Wednesday, February 7, 2018

Spark streaming Kafka messages not consumed


I want to receive messages from a topic in Kafka (broker v 0.10.2.1) using Spark (1.6.2) Streaming.

I'm using the receiver-based approach. The code is as follows:

    public static void main(String[] args) throws Exception {
        SparkConf sparkConf = new SparkConf().setAppName("SimpleStreamingApp");
        JavaStreamingContext javaStreamingContext = new JavaStreamingContext(sparkConf, new Duration(5000));
        //
        Map<String, Integer> topicMap = new HashMap<>();
        topicMap.put("myTopic", 1);
        //
        String zkQuorum = "host1:port1,host2:port2,host3:port3";
        //
        Map<String, String> kafkaParamsMap = new HashMap<>();
        kafkaParamsMap.put("bootstraps.server", zkQuorum);
        kafkaParamsMap.put("metadata.broker.list", zkQuorum);
        kafkaParamsMap.put("zookeeper.connect", zkQuorum);
        kafkaParamsMap.put("group.id", "group_name");
        kafkaParamsMap.put("security.protocol", "SASL_PLAINTEXT");
        kafkaParamsMap.put("security.mechanism", "GSSAPI");
        kafkaParamsMap.put("ssl.kerberos.service.name", "kafka");
        kafkaParamsMap.put("key.deserializer", "kafka.serializer.StringDecoder");
        kafkaParamsMap.put("value.deserializer", "kafka.serializer.DefaultDecoder");
        //
        JavaPairReceiverInputDStream<byte[], byte[]> stream = KafkaUtils.createStream(javaStreamingContext,
                                byte[].class, byte[].class,
                                DefaultDecoder.class, DefaultDecoder.class,
                                kafkaParamsMap,
                                topicMap,
                                StorageLevel.MEMORY_ONLY());

        VoidFunction<JavaPairRDD<byte[], byte[]>> voidFunc = new VoidFunction<JavaPairRDD<byte[], byte[]>>()
        {
            public void call(JavaPairRDD<byte[], byte[]> rdd) throws Exception
            {
                List<Tuple2<byte[], byte[]>> all = rdd.collect();
                System.out.println("size of rdd: " + all.size());
            }
        };

        stream.forEach(voidFunc);

        javaStreamingContext.start();
        javaStreamingContext.awaitTermination();
    }

Access to Kafka is kerberized. When I launch the application with:

spark-submit --verbose --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=jaas.conf" --files jaas.conf,privKey.der --principal <accountName> --keytab <path to keytab file> --master yarn --jars <comma separated path to all jars> --class <fully qualified java main class> <path to jar file containing main class>

  1. The VerifiableProperties class from Kafka logs warning messages for the properties included in the kafkaParamsMap:
    INFO KafkaReceiver: connecting to zookeeper: <the correct zookeeper quorum provided in kafkaParams map>

    VerifiableProperties: Property auto.offset.reset is overridden to largest

    VerifiableProperties: Property enable.auto.commit is not valid.

    VerifiableProperties: Property sasl.kerberos.service.name is not valid

    VerifiableProperties: Property key.deserializer is not valid

    ...

    VerifiableProperties: Property zookeeper.connect is overridden to ....

I think that because these properties are not accepted, they might be affecting the stream processing.

**When I launch in cluster mode with --master yarn, these warning messages don't appear.**

  2. Later, I see the following logs repeated every 5 seconds, as configured:

    INFO BlockRDD: Removing RDD 4 from persistence list

    INFO KafkaInputDStream: Removing blocks of RDD BlockRDD[4] at createStream at ...

    INFO ReceivedBlockTracker: Deleting batches ArrayBuffer()

    INFO ... INFO BlockManager: Removing RDD 4

However, I don't see any actual message getting printed on the console.

Question: Why is my code not printing any actual messages?

My gradle dependencies are:

    compile group: 'org.apache.spark', name: 'spark-core_2.10', version: '1.6.2'

    compile group: 'org.apache.spark', name: 'spark-streaming_2.10', version: '1.6.2'

    compile group: 'org.apache.spark', name: 'spark-streaming-kafka_2.10', version: '1.6.2'

2 Answers

Answer 1

stream is an object of type JavaPairReceiverInputDStream. Convert it into a DStream and use foreachRDD to print the messages that are consumed from Kafka, as in the sketch below.
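
A minimal sketch of what this answer suggests, reusing the stream, voidFunc and javaStreamingContext variables from the question code above (the names are the asker's; the only change is registering the function with foreachRDD instead of forEach, and optionally printing the payloads, which I assume here are UTF-8 strings):

    // Register the existing VoidFunction with foreachRDD; it runs once per batch interval.
    stream.foreachRDD(voidFunc);

    // Or, to print the actual message payloads instead of just the batch size:
    stream.foreachRDD(new VoidFunction<JavaPairRDD<byte[], byte[]>>() {
        public void call(JavaPairRDD<byte[], byte[]> rdd) throws Exception {
            for (Tuple2<byte[], byte[]> record : rdd.collect()) {
                // assumption: message values are UTF-8 encoded strings
                System.out.println(new String(record._2(), "UTF-8"));
            }
        }
    });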

Answer 2

Spark 1.6.2 does not support Kafka 0.10; it only supports Kafka 0.8. For Kafka 0.10, you should use Spark 2.
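
For reference, a rough sketch of what the Spark 2 route could look like with the spark-streaming-kafka-0-10 direct-stream integration. The broker list, topic, group id and security settings are placeholders copied from the question; the exact artifact versions and Kerberos configuration would need to match your cluster:

    // Illustrative Gradle dependencies (versions are assumptions):
    //   compile group: 'org.apache.spark', name: 'spark-streaming_2.11', version: '2.1.0'
    //   compile group: 'org.apache.spark', name: 'spark-streaming-kafka-0-10_2.11', version: '2.1.0'
    //
    // Key imports: org.apache.spark.streaming.kafka010.{KafkaUtils, LocationStrategies, ConsumerStrategies},
    // org.apache.kafka.clients.consumer.ConsumerRecord, org.apache.kafka.common.serialization.ByteArrayDeserializer

    Map<String, Object> kafkaParams = new HashMap<>();
    kafkaParams.put("bootstrap.servers", "host1:port1,host2:port2,host3:port3");
    kafkaParams.put("key.deserializer", ByteArrayDeserializer.class);
    kafkaParams.put("value.deserializer", ByteArrayDeserializer.class);
    kafkaParams.put("group.id", "group_name");
    kafkaParams.put("security.protocol", "SASL_PLAINTEXT");
    kafkaParams.put("sasl.kerberos.service.name", "kafka");

    JavaInputDStream<ConsumerRecord<byte[], byte[]>> directStream =
        KafkaUtils.createDirectStream(
            javaStreamingContext,
            LocationStrategies.PreferConsistent(),
            ConsumerStrategies.<byte[], byte[]>Subscribe(Arrays.asList("myTopic"), kafkaParams));

    // Print how many records were consumed in each batch.
    directStream.foreachRDD(rdd -> System.out.println("records in batch: " + rdd.count()));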
