I have been working on streaming Twitter feed data into Kafka.
I am following the sample from this link: http://www.hahaskills.com/tutorials/kafka/Twitter_doc.html
The Producer code works fine: I am able to get the Twitter feed and send it to the Kafka producer.
However, I am not able to use the Consumer code, since it throws deprecation errors for many of the APIs.
Here is the Consumer code:
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
//import kafka.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
//import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
//import org.apache.kafka.clients.producer.KafkaProducer;

public class KafkaConsumer {

    private final ConsumerConnector consumer;
    private final String topic;

    public KafkaConsumer(String zookeeper, String groupId, String topic) {
        Properties props = new Properties();
        props.put("zookeeper.connect", zookeeper);
        props.put("group.id", groupId);
        props.put("zookeeper.session.timeout.ms", "500");
        props.put("zookeeper.sync.time.ms", "250");
        props.put("auto.commit.interval.ms", "1000");
        consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        this.topic = topic;
    }

    public void testConsumer() {
        System.out.println("Test Con called");
        Map<String, Integer> topicCount = new HashMap<>();
        topicCount.put(topic, 1);

        Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer.createMessageStreams(topicCount);
        List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);

        System.out.println("For");
        for (final KafkaStream stream : streams) {
            ConsumerIterator<byte[], byte[]> it = stream.iterator();
            System.out.println("Size" + it.length());
            while (it.hasNext()) {
                System.out.println("Stream");
                System.out.println("Message from Single Topic: " + new String(it.next().message()));
            }
        }

        if (consumer != null) {
            consumer.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("Started");
        String topic = "twittertopic";
        KafkaConsumer simpleTWConsumer = new KafkaConsumer("localhost:XXXX", "testgroup", topic);
        simpleTWConsumer.testConsumer();
        System.out.println("End");
    }
}
It throws errors: ConsumerConnector, ConsumerIterator, and KafkaStream are deprecated, and ConsumerConfig is not visible.
Is there a fixed version of this sample code (a Kafka consumer for Twitter)?
1 Answer
The tutorial you are following is very old and uses the legacy Scala Kafka clients, which have been deprecated; see http://kafka.apache.org/documentation/#legacyapis
The classes that have been deprecated are:

kafka.consumer.* and kafka.javaapi.consumer: instead use the newer Java consumer under org.apache.kafka.clients.consumer.*

kafka.producer.* and kafka.javaapi.producer: instead use the newer Java producer under org.apache.kafka.clients.producer.*
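For completeness, the producer side migrates the same way. Below is a minimal sketch of a producer using the new Java client; it assumes a broker reachable at localhost:9092, the same twittertopic topic, and the kafka-clients jar on the classpath, so treat the names and addresses as illustrative rather than part of the original tutorial.

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class MyProducer {

    public static void main(String[] args) {
        Properties props = new Properties();
        // bootstrap.servers replaces the old zookeeper.connect setting;
        // the new clients talk to the brokers directly, not to ZooKeeper
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // try-with-resources closes the producer, which flushes outstanding records
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // send() is asynchronous and returns a Future<RecordMetadata>
            producer.send(new ProducerRecord<>("twittertopic", "key", "a tweet"));
        }
    }
}
```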
Apart from using deprecated classes, your code was mostly correct; I only had to fix a few imports. See below for a fixed version. Using it, I was able to consume messages I was producing to a topic called twittertopic.
package example;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class MyConsumer {

    private final ConsumerConnector consumer;
    private final String topic;

    public MyConsumer(String zookeeper, String groupId, String topic) {
        Properties props = new Properties();
        props.put("zookeeper.connect", zookeeper);
        props.put("group.id", groupId);
        props.put("zookeeper.session.timeout.ms", "500");
        props.put("zookeeper.sync.time.ms", "250");
        props.put("auto.commit.interval.ms", "1000");
        consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        this.topic = topic;
    }

    public void testConsumer() {
        Map<String, Integer> topicCount = new HashMap<>();
        topicCount.put(topic, 1);

        Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer.createMessageStreams(topicCount);
        List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);

        for (final KafkaStream stream : streams) {
            ConsumerIterator<byte[], byte[]> it = stream.iterator();
            while (it.hasNext()) {
                System.out.println("Message from Single Topic: " + new String(it.next().message()));
            }
        }

        if (consumer != null) {
            consumer.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("Started");
        String topic = "twittertopic";
        MyConsumer simpleTWConsumer = new MyConsumer("localhost:2181", "testgroup", topic);
        simpleTWConsumer.testConsumer();
        System.out.println("End");
    }
}
While the code above works, the next major Kafka release is likely to remove the currently deprecated classes, so you should not write new logic using them.
Instead, you should get started with the Java clients; you can use the examples provided on GitHub: https://github.com/apache/kafka/tree/trunk/examples/src/main/java/kafka/examples
Using the new Java consumer, your logic would look like this:
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class MyConsumer {

    static final String TOPIC = "twittertopic";
    static final String GROUP = "testgroup";

    public static void main(String[] args) {
        System.out.println("Started");

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", GROUP);
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Arrays.asList(TOPIC));

            for (int i = 0; i < 1000; i++) {
                ConsumerRecords<String, String> records = consumer.poll(1000L);
                System.out.println("Size: " + records.count());
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("Received a message: " + record.key() + " " + record.value());
                }
            }
        }
        System.out.println("End");
    }
}