I am using Spark SQL DataFrames to perform a groupBy operation and then compute the mean and median of the data for each group. The original data set is about 1 terabyte.
    val df_result = df.filter($"DayOfWeek" <= 5).groupBy("id").agg(
        count("Error").as("Count"),
        avg("Error").as("MeanError"),
        callUDF("percentile_approx", col("Error"), lit(0.05)).as("5thError"),
        callUDF("percentile_approx", col("Error"), lit(0.5)).as("MedianError"),
        callUDF("percentile_approx", col("Error"), lit(0.95)).as("95thError")).
      filter($"Count" > 1000)

    df_result.orderBy(asc("MeanError")).limit(5000)
      .write.format("csv").option("header", "true").save("/user/foo.bar/result.csv")

When I run that query, my job gets stuck and does not complete. How do I go about debugging the problem? Is there a key imbalance that causes the groupBy() to get stuck?
1 Answer
There are lots of sensible suggestions already in the comments, but for what it's worth here are my thoughts:
1) Does df.count work? If not, your problem lies upstream of the code you've posted (as suggested in comments).
2) Look in the Spark UI (as suggested in comments): do most tasks complete quickly, with a few taking a long while or appearing stuck? If so, skew is likely to be your problem.
3) You could rewrite your query to first compute only the 'count' per 'id'. Next, filter your original df down to the rows whose id appears more than 1000 times, using a broadcast inner join against the list of qualifying ids. Broadcasting avoids shuffling df, but it is only practical if there aren't too many ids with more than 1000 occurrences. Then aggregate this smaller dataframe and calculate all your statistics. If the count aggregation works, its output should also show whether there is any significant data skew!
4) Sometimes breaking the computation up into even smaller steps, writing each intermediate result to disk and then immediately reading it back, has helped me get awkward jobs to complete in the past. It can also make debugging quicker if generating df is costly in the first place.
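5) It is definitely worth increasing spark.sql.shuffle.partitions from its default of 200 (as suggested in comments). 2001 is a magic number in Spark: once a shuffle has more than 2000 partitions, Spark switches to a more compact format for tracking map output sizes. See "What should be the optimal value for spark.sql.shuffle.partitions or how do we increase partitions when using Spark SQL?"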
6) I would also try varying the amount of data: does it work if you use only day of week = 1 (as suggested in comments)?
7) Does the query run without the percentile_approx?
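For what it's worth, here is a rough sketch of how suggestions 3) and 5) might look in Scala. It is not a tested fix for your job, just an illustration under some assumptions: the object name GroupByDebugSketch, the helper names and the minCount parameter are made up for this example, the column names come from your query, and percentile_approx is called through callUDF exactly as in your code (so it needs whatever setup already makes that work for you).

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._

// Hypothetical helper object; none of these names come from the original post.
object GroupByDebugSketch {

  // Suggestion 5: raise the number of shuffle partitions before running the job.
  def raiseShufflePartitions(spark: SparkSession, n: Int = 2001): Unit =
    spark.conf.set("spark.sql.shuffle.partitions", n.toString)

  // Suggestion 3, first pass: a cheap aggregation that only counts rows per id.
  // Sorting this result by Count also reveals skewed keys.
  def countsPerId(df: DataFrame): DataFrame =
    df.groupBy("id").agg(count("Error").as("Count"))

  // Suggestion 3, second pass: keep only rows whose id passed the threshold.
  // broadcast() hints that the id list is small enough to ship to every
  // executor, so the large df does not have to be shuffled for the join.
  def keepFrequentIds(df: DataFrame, counts: DataFrame, minCount: Long): DataFrame = {
    val frequentIds = counts.filter(col("Count") > minCount).select("id")
    df.join(broadcast(frequentIds), Seq("id"), "inner")
  }

  // Suggestion 3, final pass: the same statistics as the original query,
  // now computed on the reduced dataframe.
  def errorStats(df: DataFrame): DataFrame =
    df.groupBy("id").agg(
      count("Error").as("Count"),
      avg("Error").as("MeanError"),
      callUDF("percentile_approx", col("Error"), lit(0.05)).as("5thError"),
      callUDF("percentile_approx", col("Error"), lit(0.5)).as("MedianError"),
      callUDF("percentile_approx", col("Error"), lit(0.95)).as("95thError"))
}
```

Usage would be along these lines, assuming df and spark are the ones from your session:

```scala
import org.apache.spark.sql.functions._

GroupByDebugSketch.raiseShufflePartitions(spark)
val weekdays = df.filter(col("DayOfWeek") <= 5)
val counts   = GroupByDebugSketch.countsPerId(weekdays)
counts.orderBy(desc("Count")).show(20)   // eyeball the largest groups for skew
val reduced  = GroupByDebugSketch.keepFrequentIds(weekdays, counts, 1000L)
val result   = GroupByDebugSketch.errorStats(reduced)
```

If the counts step already hangs, the percentiles are not the culprit; if it finishes, the top rows of counts tell you whether a handful of ids dominate the data.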