Highly available Kafka cluster in Docker

Up until now we’ve been experimenting with Apache Kafka, a tool built with clustering and high availability in mind, but we’ve been using exactly one host and availability settings that only a few very optimistic people would call high.

Not today.

Today we’re going to spin up a multi-host Kafka cluster and replicate a topic in it, so that if one host goes down, neither the data nor its availability will suffer.

Building up a cluster

I think three Kafka hosts would be enough to make up a nice cluster. However, there will be only one instance of ZooKeeper, so our safe and balanced cluster could be easily killed by one head shot after all. But just for today let’s pretend it won’t happen.

I’m going to put ZooKeeper and every Kafka broker into its own Docker container and spin them up with docker-compose. This provides a nice level of isolation and simplifies the process of adding and removing hosts.

Dockerfile

Images for the Kafka and ZooKeeper containers need only two things: a JDK and the Kafka distribution. They are so similar that we can actually use the same image for all containers.

We also need to make a few changes to the server configuration. By default a Kafka server connects to ZooKeeper at localhost:2181, which obviously won’t work between containers. Instead, we’ll name the ZooKeeper container zookeeper, and therefore will be able to connect to it using the zookeeper:2181 address.

Another thing is that every broker within a cluster must have a unique broker ID. We can pass the ID from docker-compose directly into the Dockerfile using the ARG instruction and then use the sed utility to update the config file in every container.
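
The Dockerfile itself isn’t reproduced in this copy, so here is a minimal sketch of what it could look like; the Kafka version, archive name and property values are assumptions rather than the exact original:

    # One image shared by ZooKeeper and all Kafka brokers (sketch; Kafka version is assumed)
    FROM openjdk:8
    # Broker ID comes from docker-compose via a build argument
    ARG brokerId=1
    # ADD copies the archive and unpacks it into the filesystem root
    ADD kafka_2.12-0.11.0.0.tgz /
    # Rename the unpacked folder to /kafka, then point the broker at the
    # "zookeeper" container and stamp in the unique broker ID
    RUN mv /kafka_2.12-0.11.0.0 /kafka && \
        sed -i "s/localhost:2181/zookeeper:2181/g" /kafka/config/server.properties && \
        sed -i "s/broker.id=0/broker.id=$brokerId/g" /kafka/config/server.properties
    # 2181 for ZooKeeper, 9092 for Kafka
    EXPOSE 2181 9092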

The openjdk image provides the JDK, and the ADD instruction copies and unpacks the Kafka archive I downloaded earlier into the filesystem root. Then we rename the unpacked folder to /kafka and update the ZooKeeper URL and the broker ID with two consecutive sed -i commands. Finally, EXPOSE keeps ports 2181 (ZooKeeper) and 9092 (Kafka) open.

Piece of cake.

docker-compose.yml

docker-compose.yml is going to be a little trickier, because it contains everything we need for a fully functioning cluster: ZooKeeper, three Kafka servers, and a message producer and a consumer to create some data flow.

ZOOKEEPER

ZooKeeper’s section is the simplest one:
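
Here is a rough sketch of that service (the compose syntax version and script paths are assumptions):

    zookeeper:
      build: .
      ports:
        - "2181:2181"
      command: /kafka/bin/zookeeper-server-start.sh /kafka/config/zookeeper.properties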

It builds an image from the Dockerfile we just created, starts a container, launches ZooKeeper in it and opens port 2181 to the outside world.

KAFKA SERVERS

The Kafka server configs are slightly more complex, but not by much. For example, the second server’s config looks like this:
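
A sketch of what that service could look like, with the script path assumed and depends_on plus the brokerId build argument as described below:

    kafka2:
      build:
        context: .
        args:
          brokerId: 2
      depends_on:
        - zookeeper
      command: /kafka/bin/kafka-server-start.sh /kafka/config/server.properties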

The depends_on directive makes sure that Kafka starts after ZooKeeper, and brokerId: 2 specifies the broker ID to use.

PRODUCER

The producer container is the most confusing one, but only because its command statement has the highest commands-per-line ratio in the whole file. This is what the producer wants to do:

It basically says:

  • wait 4 seconds before starting (cheesy, but it’s simple and good enough to wait until the servers are ready),
  • create a new topic called ‘dates’, split into two partitions with a replication factor of three,
  • publish the system date to that topic once per second.

Sounds simple, but this is how it looks as one line in the docker-compose file:
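
Here is a hedged reconstruction of that one-liner; the exact quoting and service name are assumptions, but the steps follow the list above:

    kafkaproducer:
      build: .
      depends_on:
        - kafka1
        - kafka2
        - kafka3
      command: bash -c "sleep 4 && /kafka/bin/kafka-topics.sh --create --topic dates --partitions 2 --replication-factor 3 --zookeeper zookeeper:2181 && while true; do date | /kafka/bin/kafka-console-producer.sh --broker-list kafka1:9092,kafka2:9092,kafka3:9092 --topic dates; sleep 1; done"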

Insane, right?

CONSUMER

Compared to the producer, the consumer is nice and simple:
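
A sketch of the consumer service, assuming the console consumer that ships with Kafka and all three brokers as bootstrap servers:

    kafkaconsumer:
      build: .
      depends_on:
        - kafka1
        - kafka2
        - kafka3
      command: /kafka/bin/kafka-console-consumer.sh --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --topic dates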

Both the producer and the consumer know about all three Kafka servers, so we can safely destroy some of them later.

Final look

This is the final look of docker-compose.yml:
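
Assembled from the sketches above (so still an approximation, with assumed service names, paths and compose file version 2 syntax):

    version: "2"
    services:
      zookeeper:
        build: .
        ports:
          - "2181:2181"
        command: /kafka/bin/zookeeper-server-start.sh /kafka/config/zookeeper.properties
      kafka1:
        build:
          context: .
          args:
            brokerId: 1
        depends_on:
          - zookeeper
        command: /kafka/bin/kafka-server-start.sh /kafka/config/server.properties
      kafka2:
        build:
          context: .
          args:
            brokerId: 2
        depends_on:
          - zookeeper
        command: /kafka/bin/kafka-server-start.sh /kafka/config/server.properties
      kafka3:
        build:
          context: .
          args:
            brokerId: 3
        depends_on:
          - zookeeper
        command: /kafka/bin/kafka-server-start.sh /kafka/config/server.properties
      kafkaproducer:
        build: .
        depends_on:
          - kafka1
          - kafka2
          - kafka3
        command: bash -c "sleep 4 && /kafka/bin/kafka-topics.sh --create --topic dates --partitions 2 --replication-factor 3 --zookeeper zookeeper:2181 && while true; do date | /kafka/bin/kafka-console-producer.sh --broker-list kafka1:9092,kafka2:9092,kafka3:9092 --topic dates; sleep 1; done"
      kafkaconsumer:
        build: .
        depends_on:
          - kafka1
          - kafka2
          - kafka3
        command: /kafka/bin/kafka-console-consumer.sh --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092 --topic dates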

Running a cluster

Now, call docker-compose up, and after some time the containers’ output will calm down, leaving only the consumer’s messages:

It’s really working.

Because I left ZooKeeper’s 2181 port accessible from the host machine, we can run some queries and see what the distributed topic looks like:
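
The describe command looks like this, and its output, reconstructed here from the stats described next rather than captured verbatim, would be roughly:

    ./kafka/bin/kafka-topics.sh --describe --topic dates --zookeeper 127.0.0.1:2181

    Topic:dates   PartitionCount:2   ReplicationFactor:3   Configs:
        Topic: dates   Partition: 0   Leader: 3   Replicas: 3,1,2   Isr: 3,1,2
        Topic: dates   Partition: 1   Leader: 1   Replicas: 1,2,3   Isr: 1,2,3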

The command outputs quite interesting stats: there are two partitions of the ‘dates’ topic, 0 and 1, with brokers 3 and 1 as their leaders. As a side note, all reads and writes for a partition go through its leader, and it’s also the leader’s job to keep the replicas synchronized. At the moment, every partition has three replicas, on nodes 3,1,2 and 1,2,3 respectively, and all of them are in sync (Isr stands for in-sync replicas).

Because it’s almost 1 AM and I’m still awake, I think it’s only fair if the cluster suffers as well.

Tormenting Kafka cluster

Obviously, the cluster is fine with three brokers around. How would it feel if one of them went down? docker ps says that the kafka2 container has ID 12bda0311443, so we can issue the docker stop 12bda0311443 command to find out.

What’s nice is that the consumer’s output didn’t stop even for a second. Now, if we request the topic statistics, something interesting happens:
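
Re-running the same describe command would now show something along these lines (reconstructed from the description below):

    Topic:dates   PartitionCount:2   ReplicationFactor:3   Configs:
        Topic: dates   Partition: 0   Leader: 3   Replicas: 3,1,2   Isr: 3,1
        Topic: dates   Partition: 1   Leader: 1   Replicas: 1,2,3   Isr: 1,3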

The partition leaders are the same, but replica number 2 is no longer in sync. In production that would be a bad sign.

What happens if one of the leaders goes down as well?

Stopping kafka3 still doesn’t disrupt the consumer’s output, but the partition leadership has changed:
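
Another describe call, again reconstructed from the description that follows, would look roughly like this:

    Topic:dates   PartitionCount:2   ReplicationFactor:3   Configs:
        Topic: dates   Partition: 0   Leader: 1   Replicas: 3,1,2   Isr: 1
        Topic: dates   Partition: 1   Leader: 1   Replicas: 1,2,3   Isr: 1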

Now the leader of both partitions is the remaining kafka1 broker. Replica #3 is also no longer in sync.

Finally, will bringing kafka2 and kafka3 back online make any difference?
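
After restarting both containers, one more describe call would, based on the description below, show roughly this:

    Topic:dates   PartitionCount:2   ReplicationFactor:3   Configs:
        Topic: dates   Partition: 0   Leader: 1   Replicas: 3,1,2   Isr: 1,2,3
        Topic: dates   Partition: 1   Leader: 1   Replicas: 1,2,3   Isr: 1,2,3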

It will. The leaders are still the same, but all replicas are in sync again. The cluster is back to a fully healthy state.

Conclusion

We just saw some black magic happening here. We created a Kafka cluster with three hosts and one distributed topic on them, destroyed hosts one by one until only the last one remained, and the message flow didn’t stop even for a second. When all hosts came back online, the whole cluster rebalanced itself and continued to work as if nothing had happened.

Today’s example was way more difficult than what we did last time with a single Kafka host, but none of today’s complexity has anything to do with Kafka itself. It was preparing Docker and docker-compose and configuring the services that looked heavy. The only piece of Kafka configuration we had to change this time that we didn’t have to with a single-node cluster is the broker ID. Not a big price to pay, considering how much more reliable the cluster has become.

 

7 thoughts on “Highly available Kafka cluster in Docker”

  1. I want to ask how you would run this command:
    ./kafka/bin/kafka-topics.sh --describe --topic dates --zookeeper 127.0.0.1:2181
    I have tried to exec into the kafka1 docker container by typing the command
    docker exec [kafka1 container id] /bin/bash
    but when I ran that command
    ./kafka/bin/kafka-topics.sh --describe --topic dates --zookeeper 127.0.0.1:2181
    this error appears to me:
    [2017-07-09 11:00:27,439] WARN Session 0x0 for server null, unexpected error, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
    java.net.ConnectException: Connection refused
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
    at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361)
    at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1141)
    Exception in thread "main" org.I0Itec.zkclient.exception.ZkTimeoutException: Unable to connect to zookeeper server within timeout: 30000
    at org.I0Itec.zkclient.ZkClient.connect(ZkClient.java:1232)
    at org.I0Itec.zkclient.ZkClient.&lt;init&gt;(ZkClient.java:156)
    at org.I0Itec.zkclient.ZkClient.&lt;init&gt;(ZkClient.java:130)
    at kafka.utils.ZkUtils$.createZkClientAndConnection(ZkUtils.scala:76)
    at kafka.utils.ZkUtils$.apply(ZkUtils.scala:58)
    at kafka.admin.TopicCommand$.main(TopicCommand.scala:53)
    at kafka.admin.TopicCommand.main(TopicCommand.scala)

    1. Is your zookeeper instance running? I see the following error in the middle of the call stack: Unable to connect to zookeeper server within timeout: 30000.
      Passing the parameter as --zookeeper 127.0.0.1:2181 assumes that ZooKeeper is running within the same container.

    1. From the top of my head – I don’t know. I’d google for distribution policies. This should be easy in Kubernetes, though. They have DaemonSet or something exactly for that.

  2. I want to understand how scaling up brokers will work. I mean, if I scale the cluster, how will the consumer and producer start communicating with it?

    1. My understanding is that when you provide the bootstrap_brokers parameter to your consumer/producer, the client and the broker will negotiate whom the client should be talking to, which might end up being the broker you just scaled up to. I’d assume that for efficiency reasons the client will talk to the node which has the ‘correct’ partition and topic, not just _any_ node. If that’s not the case, then any bootstrap broker should be able to relay the client’s communication to the target, as the cluster always knows its members.
      So the bottom line is: don’t think of the bootstrap servers as the only servers the clients will be talking to. They’re just entry points to the cluster as a whole. At least this is what I would expect after seeing other scalable systems 🙂
