We are receiving data in Spark Streaming from Kafka. Once execution starts, Spark Streaming processes only one batch and the remaining batches start queuing up in Kafka.
Our data is independent and can be processed in parallel.
We have tried multiple configurations with different numbers of executors, cores, backpressure and other settings, but nothing has worked so far. A lot of messages are queued, only one micro-batch is processed at a time, and the rest remain in the queue.
We want to achieve maximum parallelism so that no micro-batch is queued, since we have enough resources available. How can we reduce processing time by fully utilizing the available resources?
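For reference, the backpressure-related settings we experimented with were along these lines (a sketch only: the property names are standard Spark 2.x settings, the app name and values are examples, and spark.streaming.concurrentJobs is an undocumented knob):

import org.apache.spark.SparkConf;

SparkConf sparkConf = new SparkConf()
        .setAppName("kafka-streaming-job")
        // let Spark throttle the ingestion rate based on scheduling delay and processing time
        .set("spark.streaming.backpressure.enabled", "true")
        // cap on records read per Kafka partition per second (example value)
        .set("spark.streaming.kafka.maxRatePerPartition", "1000")
        // undocumented setting that allows more than one batch job to run at once (use with care)
        .set("spark.streaming.concurrentJobs", "2");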
// Start reading messages from Kafka and get DStream
final JavaInputDStream<ConsumerRecord<String, byte[]>> consumerStream = KafkaUtils.createDirectStream(
        getJavaStreamingContext(),
        LocationStrategies.PreferConsistent(),
        ConsumerStrategies.<String, byte[]>Subscribe("TOPIC_NAME", sparkServiceConf.getKafkaConsumeParams()));

ThreadContext.put(Constants.CommonLiterals.LOGGER_UID_VAR, CommonUtils.loggerUniqueId());

JavaDStream<byte[]> messagesStream = consumerStream.map(new Function<ConsumerRecord<String, byte[]>, byte[]>() {
    private static final long serialVersionUID = 1L;

    @Override
    public byte[] call(ConsumerRecord<String, byte[]> kafkaRecord) throws Exception {
        return kafkaRecord.value();
    }
});

// Decode each binary message and generate JSON array
JavaDStream<String> decodedStream = messagesStream.map(new Function<byte[], String>() {
    private static final long serialVersionUID = 1L;

    @Override
    public String call(byte[] asn1Data) throws Exception {
        if (asn1Data.length > 0) {
            try (InputStream inputStream = new ByteArrayInputStream(asn1Data);
                 Writer writer = new StringWriter()) {
                ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(asn1Data);
                GZIPInputStream gzipInputStream = new GZIPInputStream(byteArrayInputStream);
                byte[] buffer = new byte[1024];
                ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
                int len;
                while ((len = gzipInputStream.read(buffer)) != -1) {
                    byteArrayOutputStream.write(buffer, 0, len);
                }
                return new String(byteArrayOutputStream.toByteArray());
            } catch (Exception e) {
                // producer.flush();
                throw e;
            }
        }
        return null;
    }
});

// publish generated json gzip to kafka
cache.foreachRDD(new VoidFunction<JavaRDD<String>>() {
    private static final long serialVersionUID = 1L;

    @Override
    public void call(JavaRDD<String> jsonRdd4DF) throws Exception {
        //Dataset<Row> json = sparkSession.read().json(jsonRdd4DF);
        if (!jsonRdd4DF.isEmpty()) {
            //JavaRDD<String> jsonRddDF = getJavaSparkContext().parallelize(jsonRdd4DF.collect());
            Dataset<Row> json = sparkSession.read().json(jsonRdd4DF);
            SparkAIRMainJsonProcessor airMainJsonProcessor = new SparkAIRMainJsonProcessor();
            airMainJsonProcessor.processAIRData(json, sparkSession);
        }
    }
});

getJavaStreamingContext().start();
getJavaStreamingContext().awaitTermination();
getJavaStreamingContext().stop();
Technologies we are using:
HDFS 2.7.1.2.5
YARN + MapReduce2 2.7.1.2.5
ZooKeeper 3.4.6.2.5
Ambari Infra 0.1.0
Ambari Metrics 0.1.0
Kafka 0.10.0.2.5
Knox 0.9.0.2.5
Ranger 0.6.0.2.5
Ranger KMS 0.6.0.2.5
SmartSense 1.3.0.0-1
Spark2 2.0.x.2.5
Statistics from our different experiments:
Experiment 1
num_executors=6, executor_memory=8g, executor_cores=12
100 files processing time: 48 minutes

Experiment 2
spark.default.parallelism=12, num_executors=6, executor_memory=8g, executor_cores=12
100 files processing time: 8 minutes

Experiment 3
spark.default.parallelism=12, num_executors=6, executor_memory=8g, executor_cores=12
100 files processing time: 7 minutes

Experiment 4
spark.default.parallelism=16, num_executors=6, executor_memory=8g, executor_cores=12
100 files processing time: 10 minutes
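For context, the knobs in these experiments map onto standard Spark properties. Expressed as a SparkConf (a sketch; the same values can equivalently be passed as spark-submit arguments --num-executors 6 --executor-memory 8g --executor-cores 12 --conf spark.default.parallelism=12), experiment 2 would look like this:

import org.apache.spark.SparkConf;

// Experiment 2 settings expressed as Spark properties; values copied from the list above.
SparkConf experiment2Conf = new SparkConf()
        .set("spark.executor.instances", "6")
        .set("spark.executor.memory", "8g")
        .set("spark.executor.cores", "12")
        .set("spark.default.parallelism", "12");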
Please advise how we can maximize processing so that no micro-batches are left queued.
2 Answers
Answer 1
We want to achieve maximum parallelism so that no micro-batch is queued
That's the thing about stream processing: you process the data in the order it was received. If you process your data at a rate slower than it arrives, it will be queued. Also, don't expect that the processing of one record will suddenly be parallelized across multiple nodes.
From your screenshot, it seems your batch time is 10 seconds and your producer published 100 records over 90 seconds.
It took 36s to process 2 records and 70s to process 17 records. Clearly, there is some per-batch overhead. If this dependency is linear, it would take only 4:18 to process all 100 records in a single mini-batch thus beating your record holder.
Since your code is not complete, it's hard to tell what exactly takes so much time. The transformations in the code look fine, but the action (or subsequent transformations) is probably the real bottleneck. Also, what's with producer.flush()? That producer isn't defined anywhere in the code you posted.
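For illustration only: the batch interval that decides how many records fall into one mini-batch is fixed when the streaming context is created, so a window large enough to cover the whole 90-second publish period would be set up roughly like this (the app name and the 90-second value are examples, not recommendations):

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

// One mini-batch now spans the whole period over which the producer published,
// so all 100 records land in a single batch instead of being spread over nine 10-second batches.
SparkConf conf = new SparkConf().setAppName("kafka-streaming-job");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(90));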
Answer 2
It's hard to tell without all the details, but the general advice for tackling issues like this is to start with a very simple application, a "Hello world" of sorts: just read from the input stream and print the data to a log file. Once that works, you have proven that the problem is in your application, and you can gradually add your functionality back until you find the culprit. If even the simplest app doesn't work, you know the problem is in the configuration or in the Spark cluster itself. Hope this helps.
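As an illustration of such a stripped-down app, a minimal consumer that only reads from the topic and prints a few records per batch might look like the following (a sketch: the class name, bootstrap server, group id and batch interval are placeholders; only TOPIC_NAME is taken from the question):

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

public class HelloKafkaStream {
    public static void main(String[] args) throws InterruptedException {
        SparkConf conf = new SparkConf().setAppName("hello-kafka-stream");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));

        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", "broker:9092");   // placeholder
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", "hello-world-consumer");   // placeholder

        JavaInputDStream<ConsumerRecord<String, String>> stream =
                KafkaUtils.createDirectStream(
                        jssc,
                        LocationStrategies.PreferConsistent(),
                        ConsumerStrategies.<String, String>Subscribe(
                                Collections.singletonList("TOPIC_NAME"), kafkaParams));

        // Do nothing but print the first few records of each batch, so any delay
        // observed here is pure framework/ingestion overhead, not application logic.
        stream.map(ConsumerRecord::value).print();

        jssc.start();
        jssc.awaitTermination();
    }
}

If this bare-bones version keeps up with the topic, the bottleneck is in the added processing; if it does not, the problem is in the cluster or configuration.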