We are running a 3-broker Kafka 0.10.0.1 cluster. We have a Java app which spawns many consumer threads, each consuming from a different topic. For every topic we have specified a different consumer group.
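Roughly, the setup looks like the sketch below. This is simplified, not our actual code: the topic names, baseConsumerProps(), and process() are placeholders.

import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

// One consumer thread per topic, each with its own consumer group.
for (String topic : Arrays.asList("otp-sms", "promo-sms")) {
    new Thread(() -> {
        Properties props = baseConsumerProps();        // shared config (shown below)
        props.put("group.id", topic + "-consumer");    // a different group per topic
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList(topic));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                records.forEach(r -> process(r));      // app-specific handling
            }
        }
    }, topic + "-consumer-thread").start();
}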
Often, whenever this application is restarted, one or more consumer groups take more than 5 minutes to receive their partition assignments. Until then the consumers for that topic don't consume anything. If I go to a Kafka broker and run kafka-consumer-groups.sh to describe that particular group, I see that it is rebalancing. In server.log I see lines such as:
Preparing to stabilize group otp-sms-consumer
Stabilized group otp-sms-consumer
Between these two log lines there is usually a gap of about 5 minutes or more. On the consumer side, when I turn on TRACE-level logging, there is literally no activity during this pause; only after a couple of minutes does a lot of activity start. Time-critical data such as OTP SMSes is stored in that topic, and we cannot tolerate such long delays. What can be the reason for such long rebalances?
Here's our consumer config:
auto.commit.interval.ms = 3000
auto.offset.reset = latest
bootstrap.servers = [x.x.x.x:9092, x.x.x.x:9092, x.x.x.x:9092]
check.crcs = true
client.id =
connections.max.idle.ms = 540000
enable.auto.commit = true
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = otp-notifications-consumer
heartbeat.interval.ms = 3000
interceptor.classes = null
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 50
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.ms = 50
request.timeout.ms = 305000
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = GSSAPI
security.protocol = SSL
send.buffer.bytes = 131072
session.timeout.ms = 300000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = /x/x/client.truststore.jks
ssl.truststore.password = [hidden]
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer

Please help.
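For reference, this is roughly how we build that configuration in code, i.e. what baseConsumerProps() in the earlier sketch returns. It's a sketch showing only the non-default settings; the broker addresses, truststore path, and password are the same redacted placeholders as above.

import java.util.Properties;

Properties props = new Properties();
props.put("bootstrap.servers", "x.x.x.x:9092,x.x.x.x:9092,x.x.x.x:9092");
props.put("group.id", "otp-notifications-consumer");   // overridden per topic
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "3000");
props.put("auto.offset.reset", "latest");
props.put("heartbeat.interval.ms", "3000");
props.put("session.timeout.ms", "300000");             // 5 minutes
props.put("max.poll.interval.ms", "300000");           // 5 minutes
props.put("max.poll.records", "50");
props.put("request.timeout.ms", "305000");
props.put("security.protocol", "SSL");
props.put("ssl.truststore.location", "/x/x/client.truststore.jks");
props.put("ssl.truststore.password", "...");           // [hidden]
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");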
2 Answers
Answer 1
Your consumer configuration seems reasonable. I would advise trying three things:
- Spawn a single consumer thread and subscribe it to only one of the topics you're trying to consume from. That single thread should be assigned all partitions for that topic and should immediately start receiving data. Print out the partition, message offset, and content to validate that it's getting all the data (see the sketch after this answer).
- Once you validate that's working, spawn a single consumer thread and subscribe it to all the topics you're trying to consume from. Do the same validation, printing out the messages.
- Finally, if that is working fine, start adding consumer threads one by one, and see if you start getting pauses when consuming.
That should allow you to pinpoint where the problem is. If you're able to consume everything with a single thread but not with multiple threads, then your threading/pooling mechanism might have issues.
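A minimal sketch of the first step's validation consumer might look like this. The topic name is a placeholder and buildConsumerProps() stands in for your existing consumer config:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

// Single-threaded validation: subscribe to one topic and print the
// partition, offset, and content of every record received.
Properties props = buildConsumerProps();  // your existing consumer config
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(Collections.singletonList("otp-sms"));  // one topic only
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(1000);
        for (ConsumerRecord<String, String> r : records) {
            System.out.printf("partition=%d offset=%d value=%s%n",
                    r.partition(), r.offset(), r.value());
        }
    }
}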
Answer 2
Check the size of the __consumer_offsets partitions on disk. We faced a similar issue that was due to compaction errors, and it leads to very long rebalances. See https://issues.apache.org/jira/browse/KAFKA-5413 for more details (fixed since Kafka 0.10.2.2 / 0.11). Another possibility is that your broker configuration is incorrect and compaction is turned off, i.e. log.cleaner.enable is false. __consumer_offsets is a compacted topic, so if the log cleaner is disabled it will never be compacted, which leads to the same symptom.
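If you want to script that check, here is a rough sketch. It assumes a single broker log directory at /var/lib/kafka, which you would replace with your broker's actual log.dirs value:

import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.stream.Stream;

public class OffsetsTopicSize {
    public static void main(String[] args) throws IOException {
        // Sum the on-disk size of every __consumer_offsets-* partition directory.
        // Abnormally large partitions suggest compaction is not running.
        Path logDir = Paths.get("/var/lib/kafka");  // assumption: your log.dirs
        try (DirectoryStream<Path> partitions =
                Files.newDirectoryStream(logDir, "__consumer_offsets-*")) {
            for (Path partition : partitions) {
                long bytes;
                try (Stream<Path> files = Files.walk(partition)) {
                    bytes = files.filter(Files::isRegularFile)
                                 .mapToLong(p -> p.toFile().length())
                                 .sum();
                }
                System.out.printf("%s: %d MB%n",
                        partition.getFileName(), bytes / (1024 * 1024));
            }
        }
    }
}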