Wednesday, June 21, 2017

Spark write to CSV fails even after 8 hours


I have a dataframe with roughly 200-600 GB of data that I am reading, manipulating, and then writing to CSV using the Spark shell (Scala) on an Elastic MapReduce (EMR) cluster.

Here's how I'm writing to CSV:

result.persist.coalesce(20000).write.option("delimiter",",").csv("s3://bucket-name/results") 

The result variable is created through a mix of columns from some other dataframes:

var result = sources.join(destinations, Seq("source_d","destination_d")).select("source_i","destination_i")

Now, I am able to read the CSV data it is based on in roughly 22 minutes. In this same program, I'm also able to write another (smaller) dataframe to CSV in 8 minutes. For this result dataframe, however, it takes 8+ hours and still fails, saying one of the connections was closed.

I'm also running this job on 13 c4.8xlarge instances on EC2, each with 36 cores and 60 GB of RAM, so I thought I'd have the capacity to write to CSV, especially after 8 hours.
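As a rough back-of-envelope check (these are assumed figures based on the instance specs above, not anything I measured), the cluster should have far more aggregate capacity than the data needs:

// Rough capacity check, assuming the upper end (~600 GB) of the data estimate.
val instances    = 13
val coresPerNode = 36
val ramPerNodeGB = 60
val totalCores   = instances * coresPerNode   // 468 cores
val totalRamGB   = instances * ramPerNodeGB   // 780 GB, more than the ~600 GB of input
println(s"total cores = $totalCores, total RAM = $totalRamGB GB")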

Many stages required retries or had failed tasks, and I can't figure out what I'm doing wrong or why it's taking so long. I can see from the Spark UI that it never even got to the write-CSV stage and was busy with persist stages, but without the persist call it was still failing after 8 hours. Any ideas? Help is greatly appreciated! (Screenshot: Spark web UI.)

Update:

I've run the following command to repartition the result variable into 66,000 partitions:

val r2 = result.repartition(66000) // confirmed with getNumPartitions
r2.write.option("delimiter",",").csv("s3://s3-bucket/results")

However, even after several hours, the jobs are still failing. What am I still doing wrong?

(Screenshots: stages breakdown; failed stages.)

Note: I'm running the Spark shell via spark-shell --master yarn --driver-memory 50G

Update 2:

I've tried running the write with a persist first:

import org.apache.spark.storage.StorageLevel
r2.persist(StorageLevel.MEMORY_AND_DISK)

But many stages failed, returning either Job aborted due to stage failure: ShuffleMapStage 10 (persist at <console>:36) has failed the maximum allowable number of times: 4. Most recent failure reason: org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 3 or Connection from ip-172-31-48-180.ec2.internal/172.31.48.180:7337 closed.

(Screenshots: Executors page; Spark web UI page for a node returning a shuffle error; Spark web UI page for a node returning an EC2 connection-closed error; overall job summary page.)

2 Answers

Answer 1

I can see from the Spark UI that it never even got to the write CSV stage and was busy with persist stages, but without the persist function it was still failing after 8 hours. Any ideas?


This is a FetchFailedException, i.e. a failure to fetch a shuffle block.

Since you are able to handle the smaller data and it only fails on the huge dataset, I strongly suspect there are not enough partitions.

First, verify/print sources.rdd.getNumPartitions, destinations.rdd.getNumPartitions, and result.rdd.getNumPartitions.

You need to repartition after the data is loaded in order to redistribute the data (via a shuffle) across the other nodes in the cluster. This will give you the parallelism you need for faster processing without failures; see the sketch below.
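In the spark-shell that could look like the following sketch, reusing the dataframes from the question; the count of 4800 is only an assumed target (roughly 128 MB per partition for ~600 GB of input), not a prescribed value:

// Print how the data is currently partitioned (in Scala, getNumPartitions takes no parentheses).
println(s"sources:      ${sources.rdd.getNumPartitions}")
println(s"destinations: ${destinations.rdd.getNumPartitions}")
println(s"result:       ${result.rdd.getNumPartitions}")

// Repartition right after loading so the join and the CSV write get real parallelism.
// 4800 is an assumed target; tune it for your actual data size.
val sourcesRep      = sources.repartition(4800)
val destinationsRep = destinations.repartition(4800)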

Furthermore, to verify the other configurations in effect, print the full config like this and adjust the values as needed:

sc.getConf.getAll 
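For example, you could narrow that output down to the settings most relevant here (just one way to inspect it, not a required set of keys):

// Print the active configuration, keeping only shuffle/network/memory related keys.
sc.getConf.getAll
  .filter { case (key, _) =>
    key.contains("shuffle") || key.contains("network") || key.contains("memory")
  }
  .sortBy(_._1)
  .foreach { case (key, value) => println(s"$key = $value") }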

Also have a look at

Answer 2

Repartition both sources and destinations before joining, choosing a number of partitions such that each partition is roughly 10 MB-128 MB (try tuning this); there is no need to make it 20,000 (IMHO, far too many). Then join on those two columns and write without repartitioning again, i.e. the number of output partitions should match the repartitioning done before the join. A sketch is below.
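A rough sketch of that flow, reusing the dataframes and column names from the question; the partition count of 5000 is only an illustrative value in that 10 MB-128 MB range:

import org.apache.spark.sql.functions.col

// Repartition both sides on the join keys so matching keys land on the same executors.
// 5000 is an assumed count; pick it so each partition ends up around 10-128 MB.
val numPartitions = 5000
val sourcesRep      = sources.repartition(numPartitions, col("source_d"), col("destination_d"))
val destinationsRep = destinations.repartition(numPartitions, col("source_d"), col("destination_d"))

// Join on the two key columns and write directly; no further repartition/coalesce,
// so the output keeps the partitioning chosen above.
sourcesRep
  .join(destinationsRep, Seq("source_d", "destination_d"))
  .select("source_i", "destination_i")
  .write
  .option("delimiter", ",")
  .csv("s3://bucket-name/results")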

If you still have trouble, try doing the same thing after converting both dataframes to RDDs (there are some differences between the APIs, especially around repartitioning, key-value RDDs, etc.); see the sketch after this paragraph.
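A hedged sketch of that RDD-level variant, keying both sides on the (source_d, destination_d) pair; the column types accessed via getAs are assumptions about the schemas, not taken from the question:

// Key both sides by (source_d, destination_d) and join at the RDD level.
val sourcesKeyed = sources.rdd.map { row =>
  ((row.getAs[String]("source_d"), row.getAs[String]("destination_d")),
   row.getAs[String]("source_i"))
}
val destinationsKeyed = destinations.rdd.map { row =>
  ((row.getAs[String]("source_d"), row.getAs[String]("destination_d")),
   row.getAs[String]("destination_i"))
}

// Pair-RDD join with an explicit partition count, then format each record as a CSV line.
val joined = sourcesKeyed.join(destinationsKeyed, 5000)
joined
  .map { case (_, (sourceI, destinationI)) => s"$sourceI,$destinationI" }
  .saveAsTextFile("s3://bucket-name/results-rdd")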
