Wednesday, March 14, 2018

Deprecation Errors with Kafka Consumer for twitter streaming

Leave a Comment

I have been working on Kafka twitter streaming feed data.

I am following the sample from below link: http://www.hahaskills.com/tutorials/kafka/Twitter_doc.html

I am able to use Producer code and it is working fine. Able to get twitter feed and send to Kafka Producer.

I am not able to use Consumer code, since it has been throwing as deprecated error for many APIs.

Here is the Consumer code:

import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties;  import kafka.consumer.Consumer; //import kafka.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerConfig; import kafka.consumer.ConsumerIterator; //import kafka.consumer.KafkaStream; //import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector;   //import org.apache.kafka.clients.producer.KafkaProducer;  public class KafkaConsumer {     private final ConsumerConnector consumer;     private final String topic;      public KafkaConsumer(String zookeeper, String groupId, String topic) {         Properties props = new Properties();         props.put("zookeeper.connect", zookeeper);         props.put("group.id", groupId);         props.put("zookeeper.session.timeout.ms", "500");         props.put("zookeeper.sync.time.ms", "250");         props.put("auto.commit.interval.ms", "1000");          consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));          this.topic = topic;     }      public void testConsumer() {       System.out.println("Test Con called");          Map<String, Integer> topicCount = new HashMap<>();          topicCount.put(topic, 1);          Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer.createMessageStreams(topicCount);          List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);          System.out.println("For");          for (final KafkaStream stream : streams) {              ConsumerIterator<byte[], byte[]> it = stream.iterator();              System.out.println("Size"+it.length());              while (it.hasNext()) {                 System.out.println("Stream");                 System.out.println("Message from Single Topic: " + new String(it.next().message()));             }         }          if (consumer != null) {             consumer.shutdown();         }     }      public static void main(String[] args) {       System.out.println("Started");      String topic="twittertopic";      KafkaConsumer simpleTWConsumer = new KafkaConsumer("localhost:XXXX", "testgroup", topic);      simpleTWConsumer.testConsumer();      System.out.println("End");     }     } 

It throws error : ConsumerConnector, ConsumerIterator, KafkaStream are deprecated.

ConsumerConfig is not visible.

Is there fixed version of this sample code (Kafka consumer for twitter)?

0 Answers

If You Enjoyed This, Take 5 Seconds To Share It

0 comments:

Post a Comment