I am attempting to use the SimpleConsumer in Kafka 0.9 to allow users to replay events from a time offset, but the messages I am receiving back from Kafka are in a very strange encoding:
7icf-test-testEvent.ebebf1a4.2911.431d.a138.f5d6db4647d7\�W>8������{"namespace":"test","type":"testEvent.ebebf1a4.2911.431d.a138.f5d6db4647d7","received":1464819330373,"context":{"userid":0,"username":"testUser"}}�!}�a�����{"namespace":"test","type":"testEvent.ebebf1a4.2911.431d.a138.f5d6db4647d7","received":1464819331637,"context":{"userid":1,"username":"testUser"}}���r�����{"namespace":"test","type":"testEvent.ebebf1a4.2911.431d.a138.f5d6db4647d7","received":1464819332754,"context":{"userid":2,"username":"testUser"}}��������{"namespace":"test","type":"testEvent.ebebf1a4.2911.431d.a138.f5d6db4647d7","received":1464819333868,"context":{"userid":3,"username":"testUser"}}�p= ������{"namespace":"test","type":"testEvent.ebebf1a4.2911.431d.a138.f5d6db4647d7","received":1464819334997,"context":{"userid":4,"username"
Using the KafkaConsumer, these messages parse just fine. Here is the code I am using to retrieve messages with the SimpleConsumer:
for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(topic, partition)) {
    long currentOffset = messageAndOffset.offset();
    if (currentOffset < readOffset) {
        log.debug("Found an old offset - skip");
        continue;
    }
    readOffset = messageAndOffset.nextOffset();
    int payloadOffset = 14 + messageAndOffset.message().keySize(); // remove first x bytes, schema Id
    byte[] data = messageAndOffset.message().payload().array();
    byte[] realData = Arrays.copyOfRange(data, payloadOffset, data.length - payloadOffset);
    log.debug("Read " + new String(realData, "UTF-8"));
}
I added the code to skip the first x bytes after I kept getting UTF-32 errors about byte values being too high, which I assume is because Kafka prepends info like message size to the payload. Is this an Avro artifact?
3 Answers
Answer 1
I never found a good answer to this, but I switched to using the SimpleConsumer to query Kafka for the offsets I needed (per partition... though the implementation is poor) and then used the native KafkaConsumer with seek(TopicPartition, offset) or seekToBeginning(TopicPartition) to retrieve the messages. Hopefully the ability to retrieve messages from a given timestamp will be added to the native client in the next release.
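For reference, a minimal sketch of that two-step approach, assuming Kafka 0.9; the broker address broker1:9092, the topic, the timestamp, and the group id are placeholders. Note that getOffsetsBefore resolves timestamps only to log-segment granularity, which is the roughness mentioned above.

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class ReplayFromTimestamp {
    public static void main(String[] args) {
        String topic = "icf-test";         // placeholder topic
        int partition = 0;
        long timestampMs = 1464819330000L; // replay everything after this time

        // Step 1: ask the broker, via the old SimpleConsumer API, for the last
        // offset written before the given timestamp (segment granularity only).
        SimpleConsumer offsetLookup =
                new SimpleConsumer("broker1", 9092, 100000, 64 * 1024, "offsetLookup");
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<>();
        requestInfo.put(new TopicAndPartition(topic, partition),
                new PartitionOffsetRequestInfo(timestampMs, 1));
        OffsetResponse response = offsetLookup.getOffsetsBefore(
                new kafka.javaapi.OffsetRequest(requestInfo,
                        kafka.api.OffsetRequest.CurrentVersion(), "offsetLookup"));
        long offset = response.offsets(topic, partition)[0];
        offsetLookup.close();

        // Step 2: seek the native KafkaConsumer to that offset and read normally.
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092");
        props.put("group.id", "replay");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        TopicPartition tp = new TopicPartition(topic, partition);
        consumer.assign(Collections.singletonList(tp));
        consumer.seek(tp, offset);

        ConsumerRecords<String, String> records = consumer.poll(1000); // single poll for the sketch
        for (ConsumerRecord<String, String> record : records) {
            System.out.println(record.offset() + ": " + record.value());
        }
        consumer.close();
    }
}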
Answer 2
Are you looking for this?
readOffset = messageAndOffset.nextOffset();
ByteBuffer payload = messageAndOffset.message().payload();
if (payload == null) {
    System.err.println("Message is null : " + readOffset);
    continue;
}
final byte[] realData = new byte[payload.limit()];
payload.get(realData);
System.out.println("Read " + new String(realData, "UTF-8"));
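This works where the code in the question did not, most likely because payload() returns a ByteBuffer that is only a view into a larger backing buffer: calling array() on it hands back the entire backing array (message header, key and all), while payload.get(realData) reads just the bytes between the buffer's position and limit, so no manual byte-skipping is needed.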
Answer 3
You can periodically log the partition and offset you are committing, along with the timestamp of the message (maybe not on each commit), and then you have a reference point in the future for setting your consumer offsets. I presume this is for production debugging.
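A minimal sketch of that checkpointing idea, assuming the new-consumer poll loop and SLF4J logging; the 1000-record interval and the surrounding class are placeholders:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CheckpointingConsumer {
    private static final Logger log = LoggerFactory.getLogger(CheckpointingConsumer.class);
    private static final int CHECKPOINT_EVERY = 1000; // placeholder interval

    // Consume records, logging a (partition, offset, time) checkpoint every
    // CHECKPOINT_EVERY records so offsets can later be matched to wall-clock time.
    static void run(KafkaConsumer<String, String> consumer) {
        long processed = 0;
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                // ... normal message handling goes here ...
                if (++processed % CHECKPOINT_EVERY == 0) {
                    // Wall-clock time is logged here; if the event payload embeds its
                    // own timestamp (like "received" in the question), log that instead.
                    log.info("checkpoint partition={} offset={} wallclock={}",
                            record.partition(), record.offset(), System.currentTimeMillis());
                }
            }
            consumer.commitSync();
        }
    }
}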
I doubt they'd add such a feature; it seems unfeasible considering how Kafka works, although I may be mistaken, since there's always genius stuff going on. I'd do the logging thing.