Saturday, March 24, 2018

Spark with Kafka Streaming: slow performance saving to Elasticsearch


I have a list of data where the value is basically a BSON document (think JSON); each document ranges from 5k to 20k in size. It can either stay in BSON object format or be converted to JSON directly:

Key, Value
--------
K1, JSON1
K1, JSON2
K2, JSON3
K2, JSON4

I expect the groupByKey would produce:

K1, (JSON1, JSON2)
K2, (JSON3, JSON4)

so that when I do:

val data = [...].map(x => (x.Key, x.Value))
val groupedData = data.groupByKey()
groupedData.foreachRDD { rdd =>
  // the elements in the rdd here are not really grouped by the Key
}
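For reference, a minimal, self-contained stand-in for that job (the Kafka source is replaced with a hypothetical queueStream and all keys/values are placeholders), showing what groupByKey on a pair DStream yields per batch:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.collection.mutable

object GroupByKeySketch extends App {
  val conf = new SparkConf().setMaster("local[*]").setAppName("GroupByKeySketch")
  val ssc = new StreamingContext(conf, Seconds(5))

  // Stand-in for the Kafka stream: one batch of (Key, Value) pairs.
  val batches = mutable.Queue(ssc.sparkContext.parallelize(Seq(
    ("K1", "JSON1"), ("K1", "JSON2"), ("K2", "JSON3"), ("K2", "JSON4"))))
  val data = ssc.queueStream(batches)

  // groupByKey on a DStream groups the values of each key within a batch.
  val groupedData = data.groupByKey()
  groupedData.foreachRDD { rdd =>
    // Each element of this batch's RDD is (key, Iterable[value]).
    rdd.collect().foreach { case (key, docs) => println(s"$key -> ${docs.toList}") }
  }

  ssc.start()
  ssc.awaitTerminationOrTimeout(15000)
  ssc.stop()
}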

I am confused by the behaviour of the RDD. I have read many articles on the internet, including the official Spark programming guide: https://spark.apache.org/docs/0.9.1/scala-programming-guide.html

I still couldn't achieve what I want.

-------- UPDATED ---------------------

Basically I really need the data grouped by the key; the key is the index to be used in Elasticsearch, so that I can perform a batch write per key via Elasticsearch for Hadoop:

EsSpark.saveToEs(rdd); 

I can't do it per partition because EsSpark only accepts an RDD. I tried sc.makeRDD and sc.parallelize, but both fail with a not-serializable error.
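If the goal is just to get each micro-batch into Elasticsearch, here is a minimal sketch (the helper name and the "docs/doc" resource are placeholders, and it writes everything to a single index rather than per key). It assumes the not-serializable error comes from using the SparkContext inside a task: the batch inside foreachRDD is already an RDD on the driver, so it can be handed straight to EsSpark without rebuilding anything with sc.makeRDD or sc.parallelize.

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream
import org.elasticsearch.spark.rdd.EsSpark

object EsBatchWriter {
  // Hypothetical helper: pairs are (key, raw JSON string) like the stream above.
  def saveBatches(pairs: DStream[(String, String)]): Unit =
    pairs.foreachRDD { rdd =>
      val jsonDocs: RDD[String] = rdd.values        // the raw JSON documents
      if (!jsonDocs.isEmpty()) {
        // "docs/doc" is a placeholder index/type; connection settings (es.nodes, ...)
        // are expected to come from the Spark configuration.
        EsSpark.saveJsonToEs(jsonDocs, "docs/doc",
          Map("es.batch.size.bytes" -> "5000000"))
      }
    }
}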

I tried to use:

EsSpark.saveToEs(rdd, Map(
  "es.resource.write" -> "{TheKeyFromTheObjectAbove}",
  "es.batch.size.bytes" -> "5000000"))

Documentation of the config is here: https://www.elastic.co/guide/en/elasticsearch/hadoop/current/configuration.html
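To make that configuration concrete, a minimal sketch (all index names and field names here are placeholders): the "{...}" placeholder in the write resource is resolved per document from a JSON field of the same name, so every document must actually carry that field.

import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark.rdd.EsSpark

object DynamicIndexSketch extends App {
  val sc = new SparkContext(
    new SparkConf().setMaster("local[*]").setAppName("DynamicIndexSketch"))

  // JSON strings that each carry the routing field "key".
  val docs = sc.parallelize(Seq(
    """{"key":"k1","body":"JSON1"}""",
    """{"key":"k1","body":"JSON2"}""",
    """{"key":"k2","body":"JSON3"}"""))

  // Every document is written to the index derived from its own "key" field,
  // e.g. docs-k1, docs-k2 (assumes a reachable cluster configured via es.nodes).
  EsSpark.saveJsonToEs(docs, "docs-{key}/doc",
    Map("es.batch.size.bytes" -> "5000000"))

  sc.stop()
}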

But it is VERY slow compared to not using the configuration that defines a dynamic index based on a value in each individual document; I suspect the connector is parsing every JSON document to fetch the index value dynamically.
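If that per-document resolution really is the bottleneck, one alternative worth sketching (unmeasured; the key-to-index naming is a placeholder, and it assumes only a handful of distinct keys per batch): group the documents by key first and issue one write per key against a fixed resource, so the connector never has to parse documents to resolve the index.

import org.apache.spark.rdd.RDD
import org.elasticsearch.spark.rdd.EsSpark

object PerKeySave {
  // Hypothetical helper: pairs are (index key, raw JSON string).
  def saveGroupedByKey(pairs: RDD[(String, String)]): Unit = {
    pairs.cache()                                   // reused once per distinct key
    val keys = pairs.keys.distinct().collect()      // assumes a modest number of keys
    keys.foreach { key =>
      val docs: RDD[String] = pairs.filter(_._1 == key).values
      // Fixed write target per key, e.g. "k1/doc"; adapt the naming to your indices.
      EsSpark.saveJsonToEs(docs, s"${key.toLowerCase}/doc",
        Map("es.batch.size.bytes" -> "5000000"))
    }
    pairs.unpersist()
  }
}

The trade-off is one filter pass over the batch per distinct key, so this only pays off when the number of indices touched per batch is small.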

1 Answer

Answer 1

Here is an example:

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

object Test extends App {

  val session: SparkSession = SparkSession
    .builder.appName("Example")
    .config(new SparkConf().setMaster("local[*]"))
    .getOrCreate()
  val sc = session.sparkContext

  import session.implicits._

  case class Message(key: String, value: String)

  val input: Seq[Message] =
    Seq(Message("K1", "foo1"),
      Message("K1", "foo2"),
      Message("K2", "foo3"),
      Message("K2", "foo4"))

  val inputRdd: RDD[Message] = sc.parallelize(input)

  // Pair the messages up as (key, value), just like data.map(x => (x.Key, x.Value)).
  val intermediate: RDD[(String, String)] =
    inputRdd.map(x => (x.key, x.value))
  intermediate.toDF().show()
  //  +---+----+
  //  | _1|  _2|
  //  +---+----+
  //  | K1|foo1|
  //  | K1|foo2|
  //  | K2|foo3|
  //  | K2|foo4|
  //  +---+----+

  // groupByKey collects all values for a key into a single element.
  val output: RDD[(String, List[String])] =
    intermediate.groupByKey().map(x => (x._1, x._2.toList))
  output.toDF().show()
  //  +---+------------+
  //  | _1|          _2|
  //  +---+------------+
  //  | K1|[foo1, foo2]|
  //  | K2|[foo3, foo4]|
  //  +---+------------+

  // Each partition now holds whole (key, values) groups.
  output.foreachPartition(partition => if (partition.nonEmpty) {
    println(partition.toList)
  })
  //  List((K1,List(foo1, foo2)))
  //  List((K2,List(foo3, foo4)))

}
