Thursday, March 17, 2016

Kafka in Docker not working


I am trying to use the wurstmeister/kafka-docker image with docker-compose, but I am having real problems connecting everything.

All the posts and questions I have checked seem to have no problems, but I am frankly lost. (And there are at least two questions on SO that try to address the problem.)

I believe the problem is my poor understanding of Docker networking. So, the problem:

I can consume and produce from inside the Kafka container itself, but when I try to do it from another container (or from my laptop with a Python client), I get several errors related to the advertised.host.name parameter (in this image the parameter is KAFKA_ADVERTISED_HOST_NAME).

I have already tried setting this variable in lots of ways, but it simply doesn't work.

So I am looking for an authoritative answer (i.e. how to set those parameters automatically, and what they mean) on how to set up the docker-compose.yml.

This is mine:

zookeeper:
  image: wurstmeister/zookeeper
  ports:
    - "2181:2181"

kafka:
  image: wurstmeister/kafka
  # hostname: kafka
  ports:
    - "9092"
  links:
    - zookeeper:zk
  environment:
    KAFKA_ADVERTISED_HOST_NAME: "kafka"
    KAFKA_ADVERTISED_PORT: "9092"
    KAFKA_ZOOKEEPER_CONNECT: "zk:2181"

UPDATE

Following the advice of @dnephin, I modified start-kafka.sh as follows:

...
if [[ -z "$KAFKA_ADVERTISED_HOST_NAME" ]]; then
    export KAFKA_ADVERTISED_HOST_NAME=$(hostname -i)
fi
...

and removed KAFKA_ADVERTISED_HOST_NAME: "kafka" from the docker-compose.yml.

I started the containers in the canonical way:

docker-compose up -d 

Both of the containers are running:

$ docker-compose ps
           Name                          Command               State                     Ports
-----------------------------------------------------------------------------------------------------------------
infraestructura_kafka_1       start-kafka.sh                   Up      0.0.0.0:32768->9092/tcp
infraestructura_zookeeper_1   /opt/zookeeper/bin/zkServe ...   Up      0.0.0.0:2181->2181/tcp, 2888/tcp, 3888/tcp

Afterwards I did:

docker-compose logs 

And everything ran smoothly.

To check the IP addresses:

$ KAFKA_IP=$(docker inspect --format '{{ .NetworkSettings.IPAddress }}' infraestructura_kafka_1)
$ echo $KAFKA_IP
172.17.0.4

and

$ ZK_IP=$(docker inspect --format '{{ .NetworkSettings.IPAddress }}' infraestructura_zookeeper_1)
$ echo $ZK_IP
172.17.0.3
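Another quick check, as a rough sketch of my own (the container name comes from docker-compose ps above, and the config path is an assumption matching the bin path used in the commands below), is to peek at what actually ended up in the broker's server.properties, since the wurstmeister entrypoint writes the KAFKA_* variables into it:

$ docker exec infraestructura_kafka_1 \
    grep advertised /opt/kafka_2.11-0.9.0.1/config/server.properties
# should print advertised.host.name / advertised.port as the broker sees them (path assumed)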

Then I executed, in two different consoles:

A producer:

$ docker run --rm --interactive wurstmeister/kafka /opt/kafka_2.11-0.9.0.1/bin/kafka-console-producer.sh --topic grillo --broker-list 171.17.0.4:9092   

A consumer:

$ docker run --rm --interactive  wurstmeister/kafka /opt/kafka_2.11-0.9.0.1/bin/kafka-console-consumer.sh --topic grillo --from-beginning --zookeeper 172.17.0.3:2181  

Almost immediately, warnings started flying all over the screen:

[2016-03-11 00:39:17,010] WARN Fetching topic metadata with correlation id 0 for topics [Set(grillo)] from broker [BrokerEndPoint(1001,ba53d4fd7595,9092)] failed (kafka.client.ClientUtils$)
java.nio.channels.ClosedChannelException
        at kafka.network.BlockingChannel.send(BlockingChannel.scala:110)
        at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75)
        at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:74)
        at kafka.producer.SyncProducer.send(SyncProducer.scala:119)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94)
        at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
[2016-03-11 00:39:17,013] WARN [console-consumer-79688_9dd5f575d557-1457656747003-f1ed369d-leader-finder-thread], Failed to find leader for Set([grillo,0]) (kafka.consumer.ConsumerFetcherManager$LeaderFinderThread)
kafka.common.KafkaException: fetching topic metadata for topics [Set(grillo)] from broker [ArrayBuffer(BrokerEndPoint(1001,ba53d4fd7595,9092))] failed
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:73)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94)
        at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
Caused by: java.nio.channels.ClosedChannelException
        at kafka.network.BlockingChannel.send(BlockingChannel.scala:110)
        at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75)
        at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:74)
        at kafka.producer.SyncProducer.send(SyncProducer.scala:119)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)
        ... 3 more

and so on

In the console of the producer, I wrote some sentences:

$ docker run --rm --interactive klustera/kafka /opt/kafka_2.11-0.9.0.1/bin/kafka-console-producer.sh --topic grillo --broker-list 171.17.0.4:9092
Hola
¿Cómo están?
¿Todo bien?

And a few moments later, I got this response:

[2016-03-11 00:39:28,955] ERROR Error when sending message to topic grillo with key: null, value: 4 bytes with error: Failed to update metadata after 60000 ms. (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
[2016-03-11 00:40:28,956] ERROR Error when sending message to topic grillo with key: null, value: 16 bytes with error: Failed to update metadata after 60000 ms. (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
[2016-03-11 00:41:28,956] ERROR Error when sending message to topic grillo with key: null, value: 12 bytes with error: Failed to update metadata after 60000 ms. (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)

And in the docker-compose logs:

...
zookeeper_1 | 2016-03-11 00:39:07,072 [myid:] - INFO  [ProcessThread(sid:0 cport:2181)::PrepRequestProcessor@651] - Got user-level KeeperException when processing sessionid:0x153631368b1000b type:create cxid:0x2 zxid:0x47 txntype:-1 reqpath:n/a Error Path:/consumers Error:KeeperErrorCode = NodeExists for /consumers
zookeeper_1 | 2016-03-11 00:39:07,243 [myid:] - INFO  [ProcessThread(sid:0 cport:2181)::PrepRequestProcessor@651] - Got user-level KeeperException when processing sessionid:0x153631368b1000b type:create cxid:0x19 zxid:0x4b txntype:-1 reqpath:n/a Error Path:/consumers/console-consumer-79688/owners/grillo Error:KeeperErrorCode = NoNode for /consumers/console-consumer-79688/owners/grillo
zookeeper_1 | 2016-03-11 00:39:07,247 [myid:] - INFO  [ProcessThread(sid:0 cport:2181)::PrepRequestProcessor@651] - Got user-level KeeperException when processing sessionid:0x153631368b1000b type:create cxid:0x1a zxid:0x4c txntype:-1 reqpath:n/a Error Path:/consumers/console-consumer-79688/owners Error:KeeperErrorCode = NoNode for /consumers/console-consumer-79688/owners
...

UPDATE 2

I made it work, at least, with docker-machine:

First, I defined a variable with the name of the docker-machine:

DOCKER_VM=kafka_test 

Then, I modified the docker-compose.yml as follows:

KAFKA_ADVERTISED_HOST_NAME: "${DOCKER_MACHINE_IP}" 

Lastly, in the docker-machine environment, I executed:

DOCKER_MACHINE_IP=$(docker-machine ip $DOCKER_VM) docker-compose up -d 
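Putting those steps together, as a rough sketch (kafka_test is the machine name defined above; the docker-machine env line is only needed if your shell isn't already pointed at that VM):

DOCKER_VM=kafka_test
eval $(docker-machine env $DOCKER_VM)      # point the docker client at the VM (skip if already done)
DOCKER_MACHINE_IP=$(docker-machine ip $DOCKER_VM) docker-compose up -d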

But on the laptop (I mean, without using a virtual machine), it doesn't work.

2 Answers

Answer 1

I believe the value you use for KAFKA_ADVERTISED_HOST_NAME will change depending on how the container can be reached.

If you're trying to connect from another container, using kafka should be correct (as long as you set that name as the link alias).

If you're trying to connect from the host, that name isn't going to work. You'd need to use the container IP address, which you can get using docker inspect. However, the container IP address will change, so it might be better to set it from inside the container using $(hostname -i) to retrieve it.
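To illustrate the "from the host" case, a minimal sketch (not part of the original answer; 192.168.99.100 is only a placeholder for whatever address your client can actually reach, e.g. the docker-machine IP): pin the port mapping so the host port is predictable, and advertise that reachable address.

kafka:
  image: wurstmeister/kafka
  ports:
    - "9092:9092"                                  # fixed mapping so the host port is predictable
  links:
    - zookeeper:zk
  environment:
    KAFKA_ADVERTISED_HOST_NAME: "192.168.99.100"   # placeholder: an address the client can reach
    KAFKA_ADVERTISED_PORT: "9092"
    KAFKA_ZOOKEEPER_CONNECT: "zk:2181"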

Answer 2

Just try the following, and use service discovery, for example this one.

zookeeper:
  image: wurstmeister/zookeeper
  ports:
    - "2181:2181"
kafka:
  build: .
  ports:
    - "9092:9092"
  links:
    - zookeeper:zk
  environment:
    KAFKA_ADVERTISED_HOST_NAME: 192.168.59.103
    KAFKA_ADVERTISED_PORT: 9092
    KAFKA_CREATE_TOPICS: "test:1:1"
  volumes:
    - /var/run/docker.sock:/var/run/docker.sock

Or use this one:

zookeeper:
  image: wurstmeister/zookeeper
  ports:
    - "2181"
kafka:
  build: .
  ports:
    - "9092"
  links:
    - zookeeper:zk
  environment:
    KAFKA_ADVERTISED_HOST_NAME: 192.168.59.103
    DOCKER_HOST: 192.168.59.103:2375
  volumes:
    - /var/run/docker.sock:/var/run/docker.sock
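If one of these compose files works for you, a quick smoke test from the host could look like the sketch below (it reuses the console producer path from the commands earlier in the question, 192.168.59.103 is the docker host IP assumed in these compose files, and the test topic comes from KAFKA_CREATE_TOPICS in the first one):

$ docker run --rm --interactive wurstmeister/kafka \
    /opt/kafka_2.11-0.9.0.1/bin/kafka-console-producer.sh \
    --topic test --broker-list 192.168.59.103:9092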
