I have been working with a Kafka streaming feed of Twitter data.
I am following the sample at this link: http://www.hahaskills.com/tutorials/kafka/Twitter_doc.html
The Producer code works fine: I can fetch the Twitter feed and send it to the Kafka producer.
The Consumer code does not work, because many of the APIs it uses are reported as deprecated.
Here is the Consumer code:
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;

    import kafka.consumer.Consumer;
    //import kafka.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import kafka.consumer.ConsumerIterator;
    //import kafka.consumer.KafkaStream;
    //import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;
    //import org.apache.kafka.clients.producer.KafkaProducer;

    public class KafkaConsumer {
        private final ConsumerConnector consumer;
        private final String topic;

        public KafkaConsumer(String zookeeper, String groupId, String topic) {
            Properties props = new Properties();
            props.put("zookeeper.connect", zookeeper);
            props.put("group.id", groupId);
            props.put("zookeeper.session.timeout.ms", "500");
            props.put("zookeeper.sync.time.ms", "250");
            props.put("auto.commit.interval.ms", "1000");
            consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
            this.topic = topic;
        }

        public void testConsumer() {
            System.out.println("Test Con called");
            Map<String, Integer> topicCount = new HashMap<>();
            topicCount.put(topic, 1);
            Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams =
                    consumer.createMessageStreams(topicCount);
            List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);
            System.out.println("For");
            for (final KafkaStream stream : streams) {
                ConsumerIterator<byte[], byte[]> it = stream.iterator();
                System.out.println("Size" + it.length());
                while (it.hasNext()) {
                    System.out.println("Stream");
                    System.out.println("Message from Single Topic: " + new String(it.next().message()));
                }
            }
            if (consumer != null) {
                consumer.shutdown();
            }
        }

        public static void main(String[] args) {
            System.out.println("Started");
            String topic = "twittertopic";
            KafkaConsumer simpleTWConsumer = new KafkaConsumer("localhost:XXXX", "testgroup", topic);
            simpleTWConsumer.testConsumer();
            System.out.println("End");
        }
    }
It reports these errors: ConsumerConnector, ConsumerIterator, and KafkaStream are deprecated.
ConsumerConfig is not visible.
Is there a fixed version of this sample code (a Kafka consumer for Twitter)?
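For context, the classes named in the errors belong to the old ZooKeeper-based consumer, which as far as I can tell was deprecated in Kafka 0.11 and removed in 2.0. Its replacement is org.apache.kafka.clients.consumer.KafkaConsumer, which connects to the brokers directly. Below is a rough sketch of the configuration that client expects, not a verified fix for the tutorial. The class name NewConsumerProps, the broker address localhost:9092, and the choice of StringDeserializer are my assumptions and may need to change for a real setup.

```java
import java.util.Properties;

// Hypothetical helper: builds the settings for the newer
// org.apache.kafka.clients.consumer.KafkaConsumer client.
public class NewConsumerProps {

    public static Properties build(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        // The new client takes a broker list instead of "zookeeper.connect".
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        // Assumption: the producer wrote plain strings for keys and values.
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        // Assumption: a broker is listening on localhost:9092.
        Properties props = build("localhost:9092", "testgroup");
        props.forEach((k, v) -> System.out.println(k + "=" + v));

        // With kafka-clients on the classpath, the consuming side would look
        // roughly like this (poll(Duration) needs kafka-clients 2.0 or later):
        //
        //   KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        //   consumer.subscribe(Collections.singletonList("twittertopic"));
        //   while (true) {
        //       ConsumerRecords<String, String> records =
        //               consumer.poll(Duration.ofMillis(100));
        //       for (ConsumerRecord<String, String> record : records) {
        //           System.out.println("Message: " + record.value());
        //       }
        //   }
    }
}
```

The ZooKeeper-specific keys from the code above (zookeeper.connect, zookeeper.session.timeout.ms, zookeeper.sync.time.ms) are left out on purpose, since the newer client does not talk to ZooKeeper. Naming your own class KafkaConsumer would also clash with the library class of the same name, so a different name is probably safer.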