Spark write to CSV fails even after 8 hours
I have a dataframe with roughly 200-600 GB of data that I am reading, manipulating, and then writing to CSV using the spark shell (Scala) on an Elastic MapReduce cluster.
Here's how I'm writing to CSV:
result.persist.coalesce(20000).write.option("delimiter",",").csv("s3://bucket-name/results")
The result variable is created from a mix of columns from some other dataframes:
var result = sources.join(destinations, Seq("source_d","destination_d")).select("source_i","destination_i")
Now, I am able to read the CSV data it is based on in roughly 22 minutes. In the same program, I'm also able to write another (smaller) dataframe to CSV in 8 minutes. However, writing this result dataframe takes 8+ hours and still fails, saying one of the connections was closed.
I'm also running this job on 13 c4.8xlarge EC2 instances, each with 36 cores and 60 GB of RAM, so I thought I'd have the capacity to write to CSV, especially after 8 hours.
Many stages required retries or had failed tasks, and I can't figure out what I'm doing wrong or why it's taking so long. I can see from the Spark UI that it never even got to the write-CSV stage and was busy with persist stages, but without the persist call it was still failing after 8 hours. Any ideas? Help is greatly appreciated!
Update:
I ran the following to repartition the result variable into 66K partitions:
val r2 = result.repartition(66000) // confirmed with getNumPartitions
r2.write.option("delimiter",",").csv("s3://s3-bucket/results")
However, even after several hours, the jobs are still failing. What am I still doing wrong?
Note: I'm running the spark shell via spark-shell --master yarn --driver-memory 50G
Update 2:
I've tried running the write with a persist first:
import org.apache.spark.storage.StorageLevel
r2.persist(StorageLevel.MEMORY_AND_DISK)
But many stages failed, returning either Job aborted due to stage failure: ShuffleMapStage 10 (persist at <console>:36) has failed the maximum allowable number of times: 4. Most recent failure reason: org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 3
or Connection from ip-172-31-48-180.ec2.internal/172.31.48.180:7337 closed
(Screenshots: Spark web UI page for a node returning a shuffle error, and for a node returning an EC2 connection-closed error.)
2 Answers
Answer 1
"I can see from the Spark UI that it never even got to the write CSV stage and was busy with persist stages, but without the persist function it was still failing after 8 hours. Any ideas?"
This is a FetchFailedException, i.e. a failure to fetch a shuffle block. Since you are able to handle the small files and only the huge data fails, I strongly suspect there are not enough partitions.
The first thing is to verify/print sources.rdd.getNumPartitions, destinations.rdd.getNumPartitions, and result.rdd.getNumPartitions.
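In the spark shell that check might look like this (a minimal sketch, assuming the three dataframes from the question are already defined):
Seq(("sources", sources), ("destinations", destinations), ("result", result))
  .foreach { case (name, df) =>
    // A low partition count on hundreds of GB means huge partitions,
    // which makes shuffles and S3 writes fragile.
    println(s"$name: ${df.rdd.getNumPartitions} partitions")
  }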
You need to repartition after the data is loaded in order to redistribute the data (via a shuffle) to the other nodes in the cluster. This gives you the parallelism you need for faster processing without failures.
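As a rough illustration only (the path, header option and partition count below are placeholders, not values taken from the question), repartitioning right after the read looks like this:
// Read the input and immediately spread it across the cluster so that
// downstream joins and writes work on many reasonably sized partitions.
val sources = spark.read
  .option("header", "true")          // placeholder: adjust to the real schema
  .csv("s3://bucket-name/sources")   // placeholder path
  .repartition(8000)                 // placeholder count - tune to the data size
println(s"sources: ${sources.rdd.getNumPartitions} partitions")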
Furthermore, verify the other configurations that are applied: print the whole config like this, and adjust values as needed.
sc.getConf.getAll
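For readability the output can be sorted before printing (a small sketch; which settings actually need changing depends on the cluster):
// Print every effective Spark setting, sorted by key, so the shuffle- and
// memory-related values in use on this cluster are easy to spot.
sc.getConf.getAll.sortBy(_._1).foreach { case (key, value) =>
  println(s"$key = $value")
}
Shuffle-related settings such as spark.sql.shuffle.partitions or spark.network.timeout are typically passed with --conf when launching spark-shell.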
Also have a look at:
- SPARK-5928
- Spark-TaskRunner-FetchFailedException (possible reasons: OOM or container memory limits)
Answer 2
Repartition both sources and destinations before joining, with a number of partitions such that each partition ends up around 10 MB - 128 MB (try to tune this); there is no need to make it 20000 (IMHO too many). Then join by those two columns and write, without repartitioning again (i.e. the output partitions should be the same as the repartitioning done before the join).
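A minimal sketch of that ordering, assuming the column names from the question and a hypothetical partition count of 8000 (tune it so each partition lands in the 10 MB - 128 MB range):
import org.apache.spark.sql.functions.col

val numParts = 8000  // hypothetical - tune so each partition is roughly 10-128 MB

// Repartition both sides on the join keys before joining, so the join can
// reuse that partitioning instead of shuffling again.
val src = sources.repartition(numParts, col("source_d"), col("destination_d"))
val dst = destinations.repartition(numParts, col("source_d"), col("destination_d"))

val joined = src
  .join(dst, Seq("source_d", "destination_d"))
  .select("source_i", "destination_i")

// Write directly, without coalesce/repartition, so the output keeps the
// partitioning established before the join.
joined.write.option("delimiter", ",").csv("s3://bucket-name/results")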
If you still have trouble, try the same thing after converting both dataframes to RDDs (there are some differences between the APIs, especially around repartitioning, key-value RDDs, etc.).
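A rough sketch of the RDD variant, assuming the key columns are strings, that source_i lives in sources and destination_i in destinations, and a hypothetical count of 8000 partitions:
import org.apache.spark.HashPartitioner

val partitioner = new HashPartitioner(8000)  // hypothetical partition count

// Key each side by (source_d, destination_d) and pre-partition explicitly.
val srcRdd = sources.rdd
  .map(r => ((r.getAs[String]("source_d"), r.getAs[String]("destination_d")),
             r.getAs[String]("source_i")))
  .partitionBy(partitioner)

val dstRdd = destinations.rdd
  .map(r => ((r.getAs[String]("source_d"), r.getAs[String]("destination_d")),
             r.getAs[String]("destination_i")))
  .partitionBy(partitioner)

// Both sides share the same partitioner, so the join itself does not need
// another full shuffle; then format each pair as a CSV line and write it out.
srcRdd.join(dstRdd)
  .map { case (_, (sourceI, destinationI)) => s"$sourceI,$destinationI" }
  .saveAsTextFile("s3://bucket-name/results-rdd")  // placeholder output path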