Thursday, March 22, 2018

Deprecation Errors with the Kafka Consumer for Twitter Streaming


I have been working on streaming Twitter feed data into Kafka.

I am following the sample from the link below: http://www.hahaskills.com/tutorials/kafka/Twitter_doc.html

The Producer code works fine: I am able to get the Twitter feed and send it to the Kafka producer.

However, I am not able to use the Consumer code, since it throws deprecation errors for many APIs.

Here is the Consumer code:

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
//import kafka.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
//import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

//import org.apache.kafka.clients.producer.KafkaProducer;

public class KafkaConsumer {
    private final ConsumerConnector consumer;
    private final String topic;

    public KafkaConsumer(String zookeeper, String groupId, String topic) {
        Properties props = new Properties();
        props.put("zookeeper.connect", zookeeper);
        props.put("group.id", groupId);
        props.put("zookeeper.session.timeout.ms", "500");
        props.put("zookeeper.sync.time.ms", "250");
        props.put("auto.commit.interval.ms", "1000");

        consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        this.topic = topic;
    }

    public void testConsumer() {
        System.out.println("Test Con called");

        Map<String, Integer> topicCount = new HashMap<>();
        topicCount.put(topic, 1);

        Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer.createMessageStreams(topicCount);
        List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);

        System.out.println("For");

        for (final KafkaStream stream : streams) {
            ConsumerIterator<byte[], byte[]> it = stream.iterator();
            System.out.println("Size" + it.length());
            while (it.hasNext()) {
                System.out.println("Stream");
                System.out.println("Message from Single Topic: " + new String(it.next().message()));
            }
        }

        if (consumer != null) {
            consumer.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("Started");
        String topic = "twittertopic";
        KafkaConsumer simpleTWConsumer = new KafkaConsumer("localhost:XXXX", "testgroup", topic);
        simpleTWConsumer.testConsumer();
        System.out.println("End");
    }
}

It throws errors: ConsumerConnector, ConsumerIterator, and KafkaStream are deprecated.

ConsumerConfig is not visible.

Is there a fixed version of this sample code (Kafka consumer for Twitter)?

1 Answer

Answer 1

The tutorial you are following is very old, and it uses the old Scala Kafka clients that have been deprecated; see http://kafka.apache.org/documentation/#legacyapis

The classes that have been deprecated are:

  • kafka.consumer.* and kafka.javaapi.consumer.*: instead, use the newer Java consumer under org.apache.kafka.clients.consumer.*

  • kafka.producer.* and kafka.javaapi.producer.*: instead, use the newer Java producer under org.apache.kafka.clients.producer.*
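For the producer side of that migration, here is a minimal sketch using the new Java producer. The broker address localhost:9092, the message key/value, and the string serializers are assumptions; adjust them to your setup. It requires a running broker, so it is illustrative rather than something you can run standalone.

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class MyProducer {

    public static void main(String[] args) {
        Properties props = new Properties();
        // Assumption: a local broker; replace with your bootstrap servers
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // try-with-resources closes the producer and flushes pending sends
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("twittertopic", "someKey", "a tweet payload"));
        }
    }
}
```

Note that the new producer talks directly to the brokers (bootstrap.servers) rather than to ZooKeeper, which is why none of the zookeeper.* properties from the old client appear here.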

Apart from using deprecated classes, your code was mostly correct; I only had to fix a few imports. See the fixed version below. Using it, I was able to consume messages I was producing to a topic called twittertopic.

package example;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class MyConsumer {
    private final ConsumerConnector consumer;
    private final String topic;

    public MyConsumer(String zookeeper, String groupId, String topic) {
        Properties props = new Properties();
        props.put("zookeeper.connect", zookeeper);
        props.put("group.id", groupId);
        props.put("zookeeper.session.timeout.ms", "500");
        props.put("zookeeper.sync.time.ms", "250");
        props.put("auto.commit.interval.ms", "1000");
        consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        this.topic = topic;
    }

    public void testConsumer() {
        Map<String, Integer> topicCount = new HashMap<>();
        topicCount.put(topic, 1);

        Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer.createMessageStreams(topicCount);
        List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);

        for (final KafkaStream stream : streams) {
            ConsumerIterator<byte[], byte[]> it = stream.iterator();
            while (it.hasNext()) {
                System.out.println("Message from Single Topic: " + new String(it.next().message()));
            }
        }

        if (consumer != null) {
            consumer.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("Started");
        String topic = "twittertopic";
        MyConsumer simpleTWConsumer = new MyConsumer("localhost:2181", "testgroup", topic);
        simpleTWConsumer.testConsumer();
        System.out.println("End");
    }
}

While the code above can be used, the next major Kafka release is likely to remove the classes that are currently deprecated, so you should not write new logic using them.

Instead, you should get started with the Java clients; you can use the examples provided on GitHub: https://github.com/apache/kafka/tree/trunk/examples/src/main/java/kafka/examples

Using the new Java consumer, your logic would look like this:

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class MyConsumer {

    static final String TOPIC = "twittertopic";
    static final String GROUP = "testgroup";

    public static void main(String[] args) {
        System.out.println("Started");

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", GROUP);
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Arrays.asList(TOPIC));

            for (int i = 0; i < 1000; i++) {
                ConsumerRecords<String, String> records = consumer.poll(1000L);
                System.out.println("Size: " + records.count());
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("Received a message: " + record.key() + " " + record.value());
                }
            }
        }
        System.out.println("End");
    }
}