Friday, May 25, 2018

Kafka Consumer error: Marking coordinator dead


I have a topic with 10 partitions in a Kafka 0.10.0.1 cluster. I have an application that spawns multiple consumer threads, and for this topic I spawn 5 of them. Many times in my application logs I see this entry:

INFO :: AbstractCoordinator:600 - Marking the coordinator x.x.x.x:9092 (id:2147483646 rack: null) dead for group notifications-consumer 

Then there are several entries saying (Re-)joining group notifications-consumer. Afterwards I also see this warning:

Auto commit failed for group notifications-consumer: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned by poll() with max.poll.records.

I have already adjusted my consumer config like so:

props.put("max.poll.records", 200);
props.put("heartbeat.interval.ms", 20000);
props.put("session.timeout.ms", 60000);
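
For context, here is a simplified sketch of how each consumer thread is set up and polls; the broker address, topic name, and the process() method are placeholders, not the actual code:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class NotificationsWorker implements Runnable {
    public void run() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092"); // placeholder address
        props.put("group.id", "notifications-consumer");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("enable.auto.commit", "true");
        props.put("max.poll.records", 200);
        props.put("heartbeat.interval.ms", 20000);
        props.put("session.timeout.ms", 60000);

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("notifications")); // placeholder topic name

        while (true) {
            // 0.10.x API: poll(long timeoutMs)
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                process(record); // placeholder for the real (possibly slow) per-message work
            }
        }
    }

    private void process(ConsumerRecord<String, String> record) {
        // actual message handling goes here
    }
}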

So, even after properly adjusting the config I am still getting this error. During the rebalance our app is completely unresponsive. Please help.

1 Answer

Answer 1

With session.timeout.ms you only control the timeout due to heartbeats: if session.timeout.ms milliseconds pass since the last heartbeat, the cluster declares the consumer dead and triggers a rebalance.
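
As a quick illustration of how the two heartbeat-related settings relate (the values below are just examples, not a recommendation for your workload): heartbeat.interval.ms must be lower than session.timeout.ms, and the Kafka docs suggest setting it to no more than one third of it.

props.put("session.timeout.ms", 60000);    // no heartbeat for 60 s -> consumer declared dead, group rebalances
props.put("heartbeat.interval.ms", 20000); // send a heartbeat every 20 s, i.e. 1/3 of the session timeout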

Before KIP-62 the heartbeat was sent from within poll(), but it has since been moved to a dedicated background thread so that a consumer is not evicted from the group just because it takes longer than session.timeout.ms to call poll() again. Separating the heartbeat into its own thread decouples the processing from telling the cluster that you are up and running, but it introduced the risk of "livelock" situations in which the process is alive but not making progress. So, besides making the heartbeat independent of poll(), a new timeout was introduced to ensure that the consumer is actually alive and making progress. The documentation says this about the pre-KIP-62 implementation:

As long as the consumer is sending heartbeats, it basically holds a lock on the partitions it was assigned. If the process becomes defunct in such a way that it cannot make progress but is nevertheless continuing to send heartbeats, then no other member in the group will be able to take over the partitions, which causes increasing lag. The fact that heartbeating and processing is all done in the same thread, however, guarantees that consumers must make progress to keep their assignment. Any stall which affects processing also affects heartbeats.

The changes introduced by KIP-62 include:

Decoupling the processing timeout: We propose to introduce a separate locally enforced timeout for record processing and a background thread to keep the session active until this timeout expires. We call this new timeout as the "process timeout" and expose it in the consumer's configuration as max.poll.interval.ms. This config sets the maximum delay between client calls to poll().

From the logs you posted I think you are in this situation: your app is taking longer than max.poll.interval.ms (5 minutes by default) to process the 200 polled records. If that is the case, your only options are to reduce max.poll.records even further or to increase max.poll.interval.ms.
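
If you want to confirm this before touching the configuration, a simple check is to time how long each polled batch takes and compare it against max.poll.interval.ms. A rough sketch of that check follows; pollLoop() and processRecord() are made-up names, only the Kafka calls are real:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

void pollLoop(KafkaConsumer<String, String> consumer) {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(1000); // 0.10.x API: timeout in ms
        long start = System.currentTimeMillis();
        for (ConsumerRecord<String, String> record : records) {
            processRecord(record); // placeholder for your per-message work
        }
        long elapsedMs = System.currentTimeMillis() - start;
        System.out.printf("Processed %d records in %d ms%n", records.count(), elapsedMs);
        // If elapsedMs regularly approaches max.poll.interval.ms (300000 ms by default),
        // the coordinator considers this member gone and the group rebalances.
    }
}

If the batches really are that slow, tune one of the two knobs, for example:

props.put("max.poll.records", 50);         // fewer records per batch, or
props.put("max.poll.interval.ms", 600000); // allow up to 10 minutes between poll() calls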

P.S.:

The max.poll.interval.ms configuration that appears in your log only exists from Kafka 0.10.1.0 onwards, so I assume you made a small mistake with the version you mentioned.
