Thursday, September 22, 2016

How to fetch the offset id while consuming Kafka from Spark, save it in Cassandra, and use it to restart the Kafka stream?


I am using Spark to consume data from Kafka and save it in Cassandra. My program is written in Java and uses the spark-streaming-kafka_2.10:1.6.2 library. My code is:

SparkConf sparkConf = new SparkConf().setAppName("name");
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));

Map<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("zookeeper.connect", "127.0.0.1");
kafkaParams.put("group.id", App.GROUP);

// topicMap maps each topic name to the number of receiver threads to use for it
Map<String, Integer> topicMap = new HashMap<>();
topicMap.put("topicName", 1);

JavaPairReceiverInputDStream<String, EventLog> messages =
        KafkaUtils.createStream(jssc, String.class, EventLog.class, StringDecoder.class, EventLogDecoder.class,
                kafkaParams, topicMap, StorageLevel.MEMORY_AND_DISK_SER_2());

JavaDStream<EventLog> lines = messages.map(new Function<Tuple2<String, EventLog>, EventLog>() {
    @Override
    public EventLog call(Tuple2<String, EventLog> tuple2) {
        return tuple2._2();
    }
});

lines.foreachRDD(rdd -> {
    javaFunctions(rdd).writerBuilder("test", "event_log", mapToRow(EventLog.class)).saveToCassandra();
});

jssc.start();

In my Cassandra table event_log, there is a column named offsetid that stores the offset id of the stream. How do I get the offset up to which this stream has read from Kafka and store it in that column?

After saving it in Cassandra, I want to resume from the latest saved offset id when Spark is started again. How do I do that?

2 Answers

Answer 1

So, you want to manage Kafka offsets on your own.

For this:

  1. Use createDirectStream instead of createStream. It allows you to specify the offsets from which to start reading (fromOffsets: Map[TopicAndPartition, Long]).

  2. Collect information about the offsets you have already processed. You can do this by saving the offset with each message, or by keeping the information aggregated in a separate table. To get the offset ranges from an RDD in Scala: rdd.asInstanceOf[HasOffsetRanges].offsetRanges. In Java, according to the documentation (http://spark.apache.org/docs/latest/streaming-kafka-integration.html): OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges(); A sketch combining both steps follows this list.
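Putting the two together, here is a minimal, hedged Java sketch. Broker address, topic name, partition, and starting offset are placeholders; in your setup the starting offsets would be loaded from Cassandra instead of hard-coded:

import java.util.HashMap;
import java.util.Map;

import kafka.common.TopicAndPartition;
import kafka.message.MessageAndMetadata;
import kafka.serializer.StringDecoder;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.HasOffsetRanges;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.kafka.OffsetRange;

public class DirectStreamFromOffsets {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("offset-demo");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, new Duration(2000));

        Map<String, String> kafkaParams = new HashMap<>();
        kafkaParams.put("metadata.broker.list", "127.0.0.1:9092"); // assumed broker host:port

        // Start each partition from an explicitly chosen offset, e.g. one
        // previously saved in Cassandra. Topic, partition, and offset are placeholders.
        Map<TopicAndPartition, Long> fromOffsets = new HashMap<>();
        fromOffsets.put(new TopicAndPartition("topicName", 0), 42L);

        JavaDStream<String> stream = KafkaUtils.createDirectStream(
                jssc, String.class, String.class, StringDecoder.class, StringDecoder.class,
                String.class, kafkaParams, fromOffsets,
                new Function<MessageAndMetadata<String, String>, String>() {
                    @Override
                    public String call(MessageAndMetadata<String, String> mmd) {
                        return mmd.message();
                    }
                });

        stream.foreachRDD(rdd -> {
            // The cast must be done on the RDD returned directly by the stream,
            // before any transformation, because only the KafkaRDD implements
            // HasOffsetRanges.
            OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
            for (OffsetRange o : offsets) {
                // Here you would persist o.untilOffset() per topic/partition.
                System.out.println(o.topic() + " " + o.partition() + " "
                        + o.fromOffset() + " -> " + o.untilOffset());
            }
        });

        jssc.start();
        jssc.awaitTermination();
    }
}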

Answer 2

Below is some code for reference; you may need to adapt it to your requirements. The approach is to maintain the Kafka offset per partition, per topic, in Cassandra (as a suggestion, this could also be done in ZooKeeper using its Java API). The latest offset range for the topic is stored or updated in the event_log table with every batch of messages received. On startup, the table is queried: if offsets are present, a direct stream is created starting from them; otherwise a fresh direct stream is created.
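Before the main listing, a note on the schema it assumes: the answer never shows the event_log table. Here is a hypothetical guess at it, reconstructed from the select(...) and writerBuilder(...) calls below (offsets are stored as text because the code converts them with String.valueOf; the quoted identifiers preserve the camelCase column names the code selects). It is created via the DataStax Java driver:

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;

public class CreateOffsetTable {
    public static void main(String[] args) {
        // Contact point matches the 127.0.0.1 node used elsewhere in this post.
        try (Cluster cluster = Cluster.builder().addContactPoint("127.0.0.1").build();
             Session session = cluster.connect()) {
            session.execute("CREATE KEYSPACE IF NOT EXISTS test "
                    + "WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}");
            // One row per (topic, partition); column names and types are guesses.
            session.execute("CREATE TABLE IF NOT EXISTS test.event_log ("
                    + "\"topicName\" text, "
                    + "\"partition\" text, "
                    + "\"fromOffset\" text, "
                    + "\"untilOffset\" text, "
                    + "PRIMARY KEY (\"topicName\", \"partition\"))");
        }
    }
}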

package com.spark;

import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapRowTo;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;

import kafka.common.TopicAndPartition;
import kafka.message.MessageAndMetadata;
import kafka.serializer.StringDecoder;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.HasOffsetRanges;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.kafka.OffsetRange;

import scala.Tuple2;

public class KafkaChannelFetchOffset {
    public static void main(String[] args) {
        String topicName = "topicName";
        SparkConf sparkConf = new SparkConf().setAppName("name");
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));
        HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(topicName));
        HashMap<TopicAndPartition, Long> kafkaTopicPartition = new HashMap<TopicAndPartition, Long>();

        Map<String, String> kafkaParams = new HashMap<>();
        kafkaParams.put("zookeeper.connect", "127.0.0.1");
        kafkaParams.put("group.id", "GROUP");
        kafkaParams.put("metadata.broker.list", "127.0.0.1:9092"); // broker host:port (assumed default port)

        // Fetch the offsets saved for this topic, if any. cassandraTable needs the
        // (Java)SparkContext, not the streaming context.
        List<EventLog> eventLogList = javaFunctions(jssc.sparkContext())
                .cassandraTable("test", "event_log", mapRowTo(EventLog.class))
                .select("topicName", "partition", "fromOffset", "untilOffset")
                .where("topicName=?", topicName).collect();

        JavaDStream<String> kafkaOutStream = null;
        if (eventLogList == null || eventLogList.isEmpty()) {
            // No saved offsets yet: start a fresh direct stream.
            kafkaOutStream = KafkaUtils.createDirectStream(jssc, String.class, String.class,
                    StringDecoder.class, StringDecoder.class, kafkaParams, topicsSet)
                    .transform(new Function<JavaPairRDD<String, String>, JavaRDD<String>>() {
                        @Override
                        public JavaRDD<String> call(JavaPairRDD<String, String> pairRdd) throws Exception {
                            // Read the offset ranges from the KafkaRDD *before* mapping it;
                            // the mapped RDD no longer implements HasOffsetRanges.
                            OffsetRange[] offsets = ((HasOffsetRanges) pairRdd.rdd()).offsetRanges();
                            JavaRDD<String> rdd = pairRdd.map(new Function<Tuple2<String, String>, String>() {
                                @Override
                                public String call(Tuple2<String, String> arg0) throws Exception {
                                    return arg0._2;
                                }
                            });
                            writeOffset(rdd, offsets);
                            return rdd;
                        }
                    });
        } else {
            // Saved offsets found: resume the direct stream from them.
            for (EventLog eventLog : eventLogList) {
                kafkaTopicPartition.put(new TopicAndPartition(topicName, Integer.parseInt(eventLog.getPartition())),
                        Long.parseLong(eventLog.getUntilOffset()));
            }
            kafkaOutStream = KafkaUtils.createDirectStream(jssc, String.class, String.class,
                    StringDecoder.class, StringDecoder.class, String.class,
                    kafkaParams, kafkaTopicPartition,
                    new Function<MessageAndMetadata<String, String>, String>() {
                        @Override
                        public String call(MessageAndMetadata<String, String> arg0) throws Exception {
                            return arg0.message();
                        }
                    }).transform(new Function<JavaRDD<String>, JavaRDD<String>>() {
                        @Override
                        public JavaRDD<String> call(JavaRDD<String> rdd) throws Exception {
                            // Here the RDD comes straight from the direct stream, so the cast is safe.
                            writeOffset(rdd, ((HasOffsetRanges) rdd.rdd()).offsetRanges());
                            return rdd;
                        }
                    });
        }
        // Use kafkaOutStream for further processing.
        jssc.start();
        jssc.awaitTermination();
    }

    private static void writeOffset(JavaRDD<String> rdd, final OffsetRange[] offsets) {
        // Turn each offset range into an EventLog row and save the batch to Cassandra.
        List<EventLog> eventLogs = new ArrayList<>();
        for (OffsetRange offsetRange : offsets) {
            EventLog eventLog = new EventLog();
            eventLog.setTopicName(String.valueOf(offsetRange.topic()));
            eventLog.setPartition(String.valueOf(offsetRange.partition()));
            eventLog.setFromOffset(String.valueOf(offsetRange.fromOffset()));
            eventLog.setUntilOffset(String.valueOf(offsetRange.untilOffset()));
            eventLogs.add(eventLog);
        }
        JavaSparkContext jsc = JavaSparkContext.fromSparkContext(rdd.context());
        javaFunctions(jsc.parallelize(eventLogs))
                .writerBuilder("test", "event_log", mapToRow(EventLog.class))
                .saveToCassandra();
    }
}
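The listing relies on an EventLog JavaBean that the original answer never shows. Below is a minimal, hypothetical sketch with just the offset-tracking properties the code above needs; the asker's real class presumably also carries the event payload fields saved by the streaming job:

import java.io.Serializable;

// Hypothetical reconstruction of the EventLog bean used by KafkaChannelFetchOffset.
// The Cassandra connector requires the mapped bean to be Serializable.
public class EventLog implements Serializable {
    private String topicName;
    private String partition;
    private String fromOffset;
    private String untilOffset;

    public String getTopicName() { return topicName; }
    public void setTopicName(String topicName) { this.topicName = topicName; }

    public String getPartition() { return partition; }
    public void setPartition(String partition) { this.partition = partition; }

    public String getFromOffset() { return fromOffset; }
    public void setFromOffset(String fromOffset) { this.fromOffset = fromOffset; }

    public String getUntilOffset() { return untilOffset; }
    public void setUntilOffset(String untilOffset) { this.untilOffset = untilOffset; }
}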

Hope this helps and resolves your problem.
