I want to receive messages from a topic in Kafka (broker v 0.10.2.1) using Spark (1.6.2) Streaming.
I'm using the Receiver
approach. The code is as following code:
public static void main(String[] args) throws Exception { SparkConf sparkConf = new SparkConf().setAppName("SimpleStreamingApp"); JavaStreamingContext javaStreamingContext = new JavaStreamingContext(sparkConf, new Duration(5000)); // Map<String, Integer> topicMap = new HashMap<>(); topicMap.put("myTopic", 1); // String zkQuorum = "host1:port1,host2:port2,host3:port3"; // Map<String, String> kafkaParamsMap = new HashMap<>(); kafkaParamsMap.put("bootstraps.server", zkQuorum); kafkaParamsMap.put("metadata.broker.list", zkQuorum); kafkaParamsMap.put("zookeeper.connect", zkQuorum); kafkaParamsMap.put("group.id", "group_name"); kafkaParamsMap.put("security.protocol", "SASL_PLAINTEXT"); kafkaParamsMap.put("security.mechanism", "GSSAPI"); kafkaParamsMap.put("ssl.kerberos.service.name", "kafka"); kafkaParamsMap.put("key.deserializer", "kafka.serializer.StringDecoder"); kafkaParamsMap.put("value.deserializer", "kafka.serializer.DefaultDecoder"); // JavaPairReceiverInputDStream<byte[], byte[]> stream = KafkaUtils.createStream(javaStreamingContext, byte[].class, byte[].class, DefaultDecoder.class, DefaultDecoder.class, kafkaParamsMap, topicMap, StorageLevel.MEMORY_ONLY()); VoidFunction<JavaPairRDD<byte[], byte[]>> voidFunc = new VoidFunction<JavaPairRDD<byte[], byte[]>> () { public void call(JavaPairRDD<byte[], byte[]> rdd) throws Exception { List<Tuple2<byte[], byte[]>> all = rdd.collect(); System.out.println("size of red: " + all.size()); } } stream.forEach(voidFunc); javaStreamingContext.start(); javaStreamingContext.awaitTermination(); }
Access to Kafka is kerberized. When I launch
spark-submit --verbose --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=jaas.conf"
--files jaas.conf,privKey.der
--principal <accountName>
--keytab <path to keytab file>
--master yarn
--jars <comma separated path to all jars>
--class <fully qualified java main class>
<path to jar file containing main class>
VerifiableProperties
class from Kafka logs warning messages for the properties included in thekafkaParams
hashmap:
INFO KafkaReceiver: connecting to zookeeper: <the correct zookeeper quorum provided in kafkaParams map> VerifiableProperties: Property auto.offset.reset is overridden to largest VerifiableProperties: Property enable.auto.commit is not valid. VerifiableProperties: Property sasl.kerberos.service.name is not valid VerifiableProperties: Property key.deserializer is not valid ... VerifiableProperties: Property zookeeper.connect is overridden to ....
I think because these properties are not accepted, so it might be affecting the stream processing.
** when I launch in the cluster mode --master yarn
, then these warning messages don't appear**
Later, I see following logs repeated every 5 seconds as configured:
INFO BlockRDD: Removing RDD 4 from persistence list
INFO KafkaInputDStream: Removing blocks of RDD BlockRDD[4] at createStream at ...
INFO ReceivedBlockTracker: Deleting batches ArrayBuffer()
INFO ... INFO BlockManager: Removing RDD 4
However, I don't see any actual message getting printed on the console.
Question: Why is my code not printing any actual messages?
My gradle dependencies are:
compile group: 'org.apache.spark', name: 'spark-core_2.10', version: '1.6.2' compile group: 'org.apache.spark', name: 'spark-streaming_2.10', version: '1.6.2' compile group: 'org.apache.spark', name: 'spark-streaming-kafka_2.10', version: '1.6.2'
2 Answers
Answers 1
stream is an object of JavaPairReceiverInputDStream. Convert it into Dstream and use foreachRDD to print the messages that are consumed from Kafka
Answers 2
Spark 1.6.2 not support kafka 0.10 ,just support kafka0.8. For kafka 0.10 ,you should use spark 2
0 comments:
Post a Comment