Up until now we’ve been experimenting with Apache Kafka, a tool built with clustering and high availability in mind, but we were using exactly one host and availability settings that only a few very optimistic people would call high.
Not today.
Today we’re going to spin up a multi-host Kafka cluster and replicate a topic in it, so that if one host goes down, neither the data nor its availability will suffer.
Building up a cluster
I think three Kafka hosts will be enough to make up a nice cluster. However, there will be only one instance of ZooKeeper, so our safe and balanced cluster could still be killed by a single head shot. But just for today, let’s pretend that won’t happen.
I’m going to put ZooKeeper and every Kafka broker into its own Docker container and spin them up with docker-compose. This provides a nice level of isolation and simplifies adding and removing hosts later.
Dockerfile
Images for the Kafka and ZooKeeper containers need only two things: a JDK and the Kafka installation. They are so similar that we can actually use the same image for all containers.
We also need to make some changes to the servers’ configuration. By default a Kafka server connects to ZooKeeper at localhost:2181, which obviously won’t work between containers. Instead, we’ll name the ZooKeeper container zookeeper, which makes it reachable from the other containers at the address zookeeper:2181.
Another thing is that every broker within a cluster must have a unique broker ID. We can pass the ID from docker-compose directly into the Dockerfile with the ARG instruction and then use the sed utility to update the config file in every container:
```dockerfile
FROM openjdk
ARG brokerId=1

ADD kafka_2.11-0.10.1.0.tgz /

RUN mv /kafka_2.11-0.10.1.0 /kafka && \
    sed -i s/localhost:2181/zookeeper:2181/g /kafka/config/server.properties && \
    sed -i s/broker.id=0/broker.id=${brokerId}/g /kafka/config/server.properties

EXPOSE 2181 9092
```
The openjdk image comes with a JDK, and the ADD instruction copies and unpacks the Kafka archive I downloaded earlier into the filesystem root. Then we rename the installation folder to /kafka and update the ZooKeeper URL and broker ID with two consecutive sed -i commands. Finally, EXPOSE makes ports 2181 (ZooKeeper) and 9092 (Kafka) available to other containers.
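The two sed substitutions are easy to sanity-check outside of Docker. Here’s a small sketch that replays them on a stripped-down stand-in for server.properties (the two-line file is made up for illustration; the real config has many more keys):

```shell
# Fake, two-line stand-in for Kafka's server.properties
printf 'broker.id=0\nzookeeper.connect=localhost:2181\n' > server.properties

# Replay the Dockerfile's edits with brokerId=2
brokerId=2
sed -i s/localhost:2181/zookeeper:2181/g server.properties
sed -i "s/broker.id=0/broker.id=${brokerId}/g" server.properties

cat server.properties
# broker.id=2
# zookeeper.connect=zookeeper:2181
```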
Piece of cake.
docker-compose.yml
docker-compose.yml is going to be a little more tricky, because it contains everything we need for a fully functioning cluster: ZooKeeper, three Kafka servers, plus a message producer and a consumer to create some data flow.
ZOOKEEPER
ZooKeeper’s section is the simplest one:
```yaml
zookeeper:
  build: .
  command: /kafka/bin/zookeeper-server-start.sh /kafka/config/zookeeper.properties
  ports:
    - 2181:2181
```
It builds an image from the Dockerfile we just created, starts a container, launches ZooKeeper in it, and opens port 2181 to the outside world.
KAFKA SERVERS
The Kafka servers’ configs are slightly more complex, but not by much. For example, the second server’s config looks like this:
```yaml
kafka2:
  build:
    context: .
    args:
      brokerId: 2
  command: /kafka/bin/kafka-server-start.sh /kafka/config/server.properties
  depends_on:
    - zookeeper
```
The depends_on directive makes sure that the Kafka container starts after ZooKeeper’s, and brokerId: 2 specifies the ID to use.
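One caveat: depends_on only orders container start-up; it doesn’t wait until ZooKeeper actually accepts connections, which is why the producer below resorts to sleeping. Compose file format 2.1 introduced health-check conditions for exactly this. A hedged sketch of how that could look (it requires version: '2.1' at the top of the file, and the nc-based ruok probe assumes netcat is available in the image, which it may not be in plain openjdk):

```yaml
zookeeper:
  build: .
  command: /kafka/bin/zookeeper-server-start.sh /kafka/config/zookeeper.properties
  healthcheck:
    # ZooKeeper answers 'imok' to the four-letter command 'ruok'
    test: ["CMD-SHELL", "echo ruok | nc localhost 2181 | grep imok"]
    interval: 5s
    retries: 10

kafka2:
  build:
    context: .
    args:
      brokerId: 2
  command: /kafka/bin/kafka-server-start.sh /kafka/config/server.properties
  depends_on:
    zookeeper:
      condition: service_healthy
```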
PRODUCER
The producer container is the most confusing one, but only because its command statement has the highest commands-per-line ratio in the whole file. This is what the producer wants to do:
```bash
sleep 4

/kafka/bin/kafka-topics.sh --create --zookeeper zookeeper:2181 \
    --replication-factor 3 --partitions 2 --topic dates

while true; do
    date | /kafka/bin/kafka-console-producer.sh \
        --broker-list kafka1:9092,kafka2:9092,kafka3:9092 --topic dates
    sleep 1
done
```
It basically says:
- wait 4 seconds before starting (cheesy, but it’s simple and usually enough for the servers to become ready),
- create a new topic called ‘dates’, split into two partitions with a replication factor of three,
- publish the system date to that topic once per second.
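That initial sleep is a guess, though. A sketch of a more patient alternative is to poll until the port actually answers, using bash’s /dev/tcp (a bashism, which is fine here since the compose command already runs under bash -c; the host name matches the compose service):

```shell
# Retry a TCP connect until it succeeds or we run out of attempts.
wait_for_port() {
    local host=$1 port=$2 tries=${3:-30} i=0
    until (exec 3<>"/dev/tcp/${host}/${port}") 2>/dev/null; do
        i=$((i + 1))
        if [ "$i" -ge "$tries" ]; then
            return 1    # gave up: the port never opened
        fi
        sleep 1
    done
}

# Usage inside the producer's command, replacing "sleep 4":
#   wait_for_port zookeeper 2181 && /kafka/bin/kafka-topics.sh --create ...

# Quick local demonstration against a port nothing listens on:
wait_for_port 127.0.0.1 1 3 || echo "gave up waiting"
```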
Sounds simple, but this is how it looks as one line in the docker-compose file:
```yaml
producer:
  build: .
  command: bash -c "sleep 4 && /kafka/bin/kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 3 --partitions 2 --topic dates && while true; do date | /kafka/bin/kafka-console-producer.sh --broker-list kafka1:9092,kafka2:9092,kafka3:9092 --topic dates; sleep 1; done"
  depends_on:
    - zookeeper
    - kafka1
    - kafka2
    - kafka3
```
Insane, right?
CONSUMER
Compared to the producer, the consumer is nice and simple:
```yaml
consumer:
  build: .
  command: bash -c "sleep 6 && /kafka/bin/kafka-console-consumer.sh --topic dates --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092"
  depends_on:
    - zookeeper
    - kafka1
    - kafka2
    - kafka3
```
Both the producer and the consumer know about all three Kafka servers, so we can safely destroy some of them later.
Final look
This is the final look of docker-compose.yml:
```yaml
version: '2'
services:
  zookeeper:
    build: .
    command: /kafka/bin/zookeeper-server-start.sh /kafka/config/zookeeper.properties
    ports:
      - 2181:2181
  kafka1:
    build:
      context: .
      args:
        brokerId: 1
    command: /kafka/bin/kafka-server-start.sh /kafka/config/server.properties
    depends_on:
      - zookeeper
  kafka2:
    build:
      context: .
      args:
        brokerId: 2
    command: /kafka/bin/kafka-server-start.sh /kafka/config/server.properties
    depends_on:
      - zookeeper
  kafka3:
    build:
      context: .
      args:
        brokerId: 3
    command: /kafka/bin/kafka-server-start.sh /kafka/config/server.properties
    depends_on:
      - zookeeper
  producer:
    build: .
    command: bash -c "sleep 4 && /kafka/bin/kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 3 --partitions 2 --topic dates && while true; do date | /kafka/bin/kafka-console-producer.sh --broker-list kafka1:9092,kafka2:9092,kafka3:9092 --topic dates; sleep 1; done"
    depends_on:
      - zookeeper
      - kafka1
      - kafka2
      - kafka3
  consumer:
    build: .
    command: bash -c "sleep 6 && /kafka/bin/kafka-console-consumer.sh --topic dates --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092"
    depends_on:
      - zookeeper
      - kafka1
      - kafka2
      - kafka3
```
Running a cluster
Now, call docker-compose up, and after some time the containers’ output will calm down, leaving only the consumer’s messages:
```
# consumer_1  | Tue Dec 13 05:23:31 UTC 2016
# consumer_1  | Tue Dec 13 05:23:34 UTC 2016
# consumer_1  | Tue Dec 13 05:23:37 UTC 2016
# consumer_1  | Tue Dec 13 05:23:39 UTC 2016
# consumer_1  | Tue Dec 13 05:23:43 UTC 2016
# consumer_1  | Tue Dec 13 05:23:46 UTC 2016
```
It’s really working.
Because I left ZooKeeper’s port 2181 accessible from the host machine, we can run some queries and see what a distributed topic looks like:
```bash
./bin/kafka-topics.sh --describe --topic dates --zookeeper 127.0.0.1:2181
# Topic: dates  Partition: 0  Leader: 3  Replicas: 3,1,2  Isr: 3,1,2
# Topic: dates  Partition: 1  Leader: 1  Replicas: 1,2,3  Isr: 1,2,3
```
The command outputs quite interesting stats: there are two partitions of the ‘dates’ topic (0 and 1), with servers 3 and 1 as their leaders. As a side note, all reads and writes to a partition go through its leader, and it’s also the leader’s job to keep the replicas synchronized. At the moment every partition has three replicas, on nodes 3,1,2 and 1,2,3 respectively, and all of them are in sync (Isr stands for in-sync replica).
Because it’s almost 1 AM and I’m still awake, I think it’s only fair if the cluster suffers as well.
Tormenting Kafka cluster
Obviously, the cluster is fine with three brokers around. How would it feel if one of them went down? docker ps says that the kafka2 container has ID 12bda0311443, so we can issue a docker stop command to find out:
```bash
docker stop 12bda0311443
# 12bda0311443
```
What’s nice is that the consumer’s output didn’t stop even for a second. Now, if we request the topic statistics, something interesting shows up:
```bash
./kafka/bin/kafka-topics.sh --describe --topic dates --zookeeper 127.0.0.1:2181
# Topic: dates  PartitionCount: 2  ReplicationFactor: 3  Configs:
#   Topic: dates  Partition: 0  Leader: 3  Replicas: 3,1,2  Isr: 3,1
#   Topic: dates  Partition: 1  Leader: 1  Replicas: 1,2,3  Isr: 1,3
```
The partition leaders are the same, but replica number 2 is no longer in sync. In production that would be a bad sign.
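Eyeballing the Isr column works for two partitions, but it doesn’t scale. kafka-topics.sh has an --under-replicated-partitions flag for --describe that filters to exactly these rows; and for output you’ve already captured, a few lines of awk can make the same comparison. A self-contained sketch (the here-doc stands in for real --describe output):

```shell
# Print only the partitions whose ISR is shorter than the replica list.
check_isr() {
    awk '{
        for (i = 1; i <= NF; i++) {
            if ($i == "Replicas:") replicas = $(i + 1)
            if ($i == "Isr:")      isr      = $(i + 1)
        }
        # split() returns the number of comma-separated elements
        if (split(replicas, r, ",") > split(isr, s, ","))
            print "under-replicated ->", $0
    }'
}

# Stand-in for: ./bin/kafka-topics.sh --describe --topic dates --zookeeper 127.0.0.1:2181
check_isr <<'EOF'
Topic: dates  Partition: 0  Leader: 3  Replicas: 3,1,2  Isr: 3,1
Topic: dates  Partition: 1  Leader: 1  Replicas: 1,2,3  Isr: 1,2,3
EOF
# under-replicated -> Topic: dates  Partition: 0  Leader: 3  Replicas: 3,1,2  Isr: 3,1
```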
What happens if one of the leaders goes down as well?
Stopping kafka3 still doesn’t disrupt the consumer’s output, but partition 0’s leader has changed:
```bash
./bin/kafka-topics.sh --describe --topic dates --zookeeper 127.0.0.1:2181
# Topic: dates  PartitionCount: 2  ReplicationFactor: 3  Configs:
#   Topic: dates  Partition: 0  Leader: 1  Replicas: 3,1,2  Isr: 1
#   Topic: dates  Partition: 1  Leader: 1  Replicas: 1,2,3  Isr: 1
```
Now the leader of both partitions is the remaining kafka1 broker. Replica 3 is also no longer in sync.
Finally, will bringing kafka2 and kafka3 back online make any difference?
```bash
./bin/kafka-topics.sh --describe --topic dates --zookeeper 127.0.0.1:2181
# Topic: dates  PartitionCount: 2  ReplicationFactor: 3  Configs:
#   Topic: dates  Partition: 0  Leader: 1  Replicas: 3,1,2  Isr: 1,3,2
#   Topic: dates  Partition: 1  Leader: 1  Replicas: 1,2,3  Isr: 1,3,2
```
It will. The leaders are still the same, but all replicas are in sync again. The cluster is back to a fully replicated state.
Conclusion
We just saw some black magic happen. We created a Kafka cluster with three hosts and one distributed topic on them, destroyed the hosts one by one until only the last remained, and the message flow didn’t stop even for a second. When all hosts came back online, the cluster rebalanced itself and continued to work as if nothing had happened.
Today’s example was way more complex than what we did last time with a single Kafka host, but none of today’s complexity has anything to do with Kafka itself. It was preparing Docker, docker-compose, and configuring the services that felt heavy. The only piece of Kafka configuration we had to change this time, which we didn’t have to touch with a single-node cluster, is the broker ID. Not a big price to pay, considering how reliable the cluster has become.
I want to ask how you would run this command:

./kafka/bin/kafka-topics.sh --describe --topic dates --zookeeper 127.0.0.1:2181
I have tried to exec into the kafka1 docker container by typing the command
docker exec [kafka1 container id] /bin/bash
but when I ran that command

./kafka/bin/kafka-topics.sh --describe --topic dates --zookeeper 127.0.0.1:2181

this error appeared:
```
[2017-07-09 11:00:27,439] WARN Session 0x0 for server null, unexpected error, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
java.net.ConnectException: Connection refused
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
    at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361)
    at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1141)
Exception in thread "main" org.I0Itec.zkclient.exception.ZkTimeoutException: Unable to connect to zookeeper server within timeout: 30000
    at org.I0Itec.zkclient.ZkClient.connect(ZkClient.java:1232)
    at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:156)
    at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:130)
    at kafka.utils.ZkUtils$.createZkClientAndConnection(ZkUtils.scala:76)
    at kafka.utils.ZkUtils$.apply(ZkUtils.scala:58)
    at kafka.admin.TopicCommand$.main(TopicCommand.scala:53)
    at kafka.admin.TopicCommand.main(TopicCommand.scala)
```
Is your ZooKeeper instance running? I see the following error in the middle of the call stack:
Unable to connect to zookeeper server within timeout: 30000
Also, passing the parameter like this:

--zookeeper 127.0.0.1:2181

assumes that ZooKeeper should be running within the same container.

Can we have a docker-compose that can launch 3 containers on 3 different hosts, a.k.a. instances?
Off the top of my head, I don’t know. I’d google for distribution policies. This should be easy in Kubernetes, though; they have DaemonSets or something exactly for that.
I want to understand how scaling up brokers will work. I mean, if I scale the cluster, how will the consumer and producer start communicating with it?
My understanding is that when you provide the bootstrap_brokers parameter to your consumer/producer, the client and the broker negotiate whom the client should be talking to, which might end up being the broker you just scaled up to. I’d assume that for efficiency reasons the client talks to the node that has the ‘correct’ partition and topic, not just _any_ node. If that’s not the case, then any bootstrap broker should be able to relay the client’s communication to the target, as the cluster always knows its members.
So the bottom line is: don’t think of the bootstrap servers as the one and only nodes the clients will be talking to. They are just entry points to the cluster as a whole. At least this is what I would expect after seeing other scalable systems 🙂
Thanks @pav, I will try it out myself. Let’s see how it works.