I have a huge time-series dataset and I want to process it using Spark's parallel/distributed computation. The requirement is to look at the data row by row to determine the groups, as specified below under the desired output section, but I can't get Spark to distribute this without some kind of coordination between the executors.
t = time-series datetime sample, lat = latitude, long = longitude
For instance, taking a small part of a sample dataset to explain the case:
t    lat  long
0    27   28
5    27   28
10   27   28
15   29   49
20   29   49
25   27   28
30   27   28
Desired output should be:
Lat-long   interval
(27,28)    (0,10)
(29,49)    (15,20)
(27,28)    (25,30)
I am able to get the desired result using this piece of code:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local").getOrCreate()
import spark.implicits._

// Same data as the sample above (the original snippet had (26,49)
// where the sample shows (29,49))
val df = Seq(
  (0, 27, 28),
  (5, 27, 28),
  (10, 27, 28),
  (15, 29, 49),
  (20, 29, 49),
  (25, 27, 28),
  (30, 27, 28)
).toDF("t", "lat", "long")

val dfGrouped = df.withColumn("lat-long", struct($"lat", $"long"))

// Unpartitioned window: pulls all rows into a single partition
val wAll = Window.partitionBy().orderBy($"t".asc)

dfGrouped
  .withColumn("lag", lag("lat-long", 1, null).over(wAll))
  .withColumn("detector", when($"lat-long" === $"lag", 0).otherwise(1))
  .withColumn("runningTotal", sum("detector").over(wAll))
  .groupBy("runningTotal", "lat-long")
  .agg(struct(min("t"), max("t")).as("interval"))
  .drop("runningTotal")
  .show()
But what if the data is split across two executors? Then the data will look like this:
Data in executor 1 :
t    lat  long
0    27   28
5    27   28
10   27   28
15   29   49
20   29   49
25   27   28
Data in executor 2 :
t    lat  long
30   27   28
How should I get the desired output for a large amount of data? There must be a smarter way to do this, distributing the work with some kind of coordination between the executors so as to get that result.
Please guide me in the right direction; I have researched this but have not been able to land on a solution.
PS: This is just a small example.
1 Answer
You can address this with a UDAF. First of all, you could add one column that represents the t column partitioned into as many buckets as you have executors. Something like executorIndex = (t - min(t)) / ((max(t) - min(t)) / numExecutors), using integer division so that consecutive timestamps land in the same bucket.
Then you can apply your UDAF, grouping by executorIndex.
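As an illustration (plain Scala, no Spark), bucketing by integer division keeps rows that are adjacent in time in the same bucket, whereas a plain modulo would scatter them round-robin. The names numExecutors, bucketSize, and executorIndex are illustrative, not Spark APIs:

```scala
// Sketch: assign each timestamp to a contiguous bucket so that
// consecutive rows land together.
val ts = Seq(0, 5, 10, 15, 20, 25, 30)
val numExecutors = 2
val minT = ts.min // 0
val maxT = ts.max // 30
// Ceiling division so the maximum timestamp still maps to
// bucket numExecutors - 1.
val bucketSize = math.max(1, (maxT - minT + numExecutors) / numExecutors) // 16
val executorIndex = ts.map(t => (t - minT) / bucketSize)
// executorIndex: Seq(0, 0, 0, 0, 1, 1, 1)
```

In Spark the same expression could be computed as a column after collecting min(t) and max(t) once.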
Your UDAF needs to store a Map with a String key (for example) that represents one lat-long pair, and an int[] that holds the maxT and the minT for that lat-long key.
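A minimal sketch of the state such an aggregator could keep, written as plain Scala without the Spark UDAF boilerplate. The names update and merge mirror the UDAF lifecycle but are illustrative; the key encodes the lat-long pair and the value holds (minT, maxT):

```scala
// State: one (minT, maxT) pair per lat-long key seen so far.
type State = Map[String, (Int, Int)]

// Fold one row into the partial state (the UDAF "update" step).
def update(state: State, t: Int, lat: Int, long: Int): State = {
  val key = s"$lat,$long"
  val (lo, hi) = state.getOrElse(key, (t, t))
  state.updated(key, (math.min(lo, t), math.max(hi, t)))
}

// Combine partial states from two executors (the UDAF "merge" step).
def merge(a: State, b: State): State =
  b.foldLeft(a) { case (acc, (k, (lo, hi))) =>
    val (alo, ahi) = acc.getOrElse(k, (lo, hi))
    acc.updated(k, (math.min(alo, lo), math.max(ahi, hi)))
  }

val rows = Seq((0, 27, 28), (5, 27, 28), (10, 27, 28), (15, 29, 49))
val empty: State = Map.empty
val state = rows.foldLeft(empty) { case (s, (t, la, lo)) => update(s, t, la, lo) }
// state("27,28") == (0, 10); state("29,49") == (15, 15)
```

Note that this keeps one interval per lat-long key, so it only reproduces the desired output when each pair occurs in a single contiguous run — the assumption the PS below alludes to.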
Please ask if you need a more extensive explanation.
Hope this helps.
PS: I'm assuming that there is some time relation between rows with the same lat and long, which is normal if you are tracking some movement.