Wednesday, December 13, 2017

EMR Spark duplicating every action and job keeps running


I have created a Scala application that uses Apache Spark to retrieve data from S3, run some transformations on it, and save the result.

I am using Apache Spark 2.0.2, running in cluster mode on 50 nodes (r3.4xlarge).

hive-env.export HADOOP_HEAPSIZE   8192
spark.executor.cores              5
spark.executor.instances          149
spark.driver.memory               106124M
spark.executor.memory             38000M
spark.default.parallelism         5000
spark.sql.shuffle.partitions      1000
spark.kryoserializer.buffer.max   1024m
spark.sql.hive.convertMetastoreParquet                      false
spark.hadoop.mapreduce.input.fileinputformat.split.maxsize  2560000000
spark.files.maxPartitionBytes                               2560000000
spark.network.timeout             500s
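For reference, most of these Spark properties could equally be applied in code when the session is built. The sketch below is illustration only (my job actually picks them up from the cluster configuration above), and settings such as spark.driver.memory and spark.executor.instances normally have to be supplied at launch time via spark-submit rather than from inside the application:

// Illustration only: the same properties as in the cluster configuration above.
// "item-builder" is a made-up application name.
import org.apache.spark.sql.SparkSession

val sqlSession = SparkSession.builder()
  .appName( "item-builder" )
  .config( "spark.executor.cores", "5" )
  .config( "spark.executor.instances", "149" )    // normally passed to spark-submit, not set in code
  .config( "spark.driver.memory", "106124M" )     // must be set before the driver JVM starts
  .config( "spark.executor.memory", "38000M" )
  .config( "spark.default.parallelism", "5000" )
  .config( "spark.sql.shuffle.partitions", "1000" )
  .config( "spark.kryoserializer.buffer.max", "1024m" )
  .config( "spark.sql.hive.convertMetastoreParquet", "false" )
  .config( "spark.network.timeout", "500s" )
  .getOrCreate()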

The job has been running for more than 2 days now. I tried changing the executor size, memory, and so on, with no luck. This is what I see in the Spark UI:

Active:
  Stage 0 - persist at ItemBuilder.scala:197
  Stage 1 - persist at ItemBuilder.scala:197
  Stages 0 and 1 show: Tasks: Succeeded/Total = 115475/204108

Pending:
  Stage 2 - persist at ItemBuilder.scala:197
  Stage 2 shows: Tasks: Succeeded/Total = 0/400
  Stage 3 - count at ItemBuilder.scala:202
  Stage 3 shows: Tasks: Succeeded/Total = 0/200
  Stage 4 - count at ItemBuilder.scala:202
  Stage 4 shows: Tasks: Succeeded/Total = 0/1

Can someone tell me why I am seeing persist three times and count twice?

Here is my code:

val textFiles = sqlSession.sparkContext.textFile( files.mkString( "," ) )

val jsonFiles = sqlSession.read.schema( schema ).json( textFiles )

log.info( "Job is in progress" )

val filteredItemDetails = jsonFiles.filter( col( ITEM_ID ).isNotNull )
  .filter( length( col( ITEM_ID ) ) > 0 )

val itemDetails = filteredItemDetails.withColumn( ITEMS, explode( filteredItemDetails( ITEMS ) ) )
  .filter( size( col( ITEM_EVENTS ) ) > 0 )
  .filter( col( ITEM_TIMESTAMP ).isNotNull )
  .select( ITEM_ID, EVENTS_ITEM_ENTRY, ITEM_TIMESTAMP )

val convertTimestamp = udf { (timestampString: String) =>
  DateUtils.getSqlTimeStamp( timestampString )
}

val itemDetailsWithTimestamp = itemDetails.withColumn( TIME_STAMP_CONVERTED, convertTimestamp( col( TIME_STAMP ) ) )

val recentTime = DateUtils.getSqlTimeStamp( endTime )

val groupedData = itemDetailsWithTimestamp.groupBy( ITEM_ID, ITEM_ENTRY_ID )
  .agg( datediff( lit( recentTime ), max( TIME_STAMP_CONVERTED ) ) as DAY_DIFFERENCE,
        count( ITEM_ENTRY_ID ) as FREQUENCY )

val toMap = udf { (itemType: String, count: Int) => Map( itemType -> count ) }

val tempResult = groupedData.withColumn( FREQUENT_DAYS, toMap( col( ITEM_ENTRY_ID ), col( DAY_DIFFERENCE ) ) )
  .withColumn( FREQUENCY_COUNT, toMap( col( ITEM_ENTRY_ID ), col( FREQUENCY ) ) )
  .drop( ITEM_ENTRY_ID )
  .drop( DAY_DIFFERENCE )
  .drop( FREQUENCY )

val result = tempResult.groupBy( ITEM_ID )
  .agg( CombineMaps( col( FREQUENT_DAYS ) ) as FREQUENT_DAYS,
        CombineMaps( col( FREQUENCY_COUNT ) ) as FREQUENCY_COUNT )
  .persist( DISK_ONLY )

log.info( "Aggregation is completed." )

val totalItems = result.count( )

log.info( "Total Items = " + totalItems )
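(CombineMaps above is a custom aggregator whose definition I have left out. Purely as a sketch of the idea, and not the exact code I use, a UDAF that merges Map[String, Int] columns could look roughly like this, with later values winning on duplicate keys:)

// Hypothetical sketch only -- not the real CombineMaps used in the job above.
// Merges Map[String, Int] values across rows; on a duplicate key the value seen
// later overwrites the earlier one.
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{ MutableAggregationBuffer, UserDefinedAggregateFunction }
import org.apache.spark.sql.types._

object CombineMaps extends UserDefinedAggregateFunction {
  override def inputSchema: StructType =
    StructType( StructField( "value", MapType( StringType, IntegerType ) ) :: Nil )

  override def bufferSchema: StructType =
    StructType( StructField( "combined", MapType( StringType, IntegerType ) ) :: Nil )

  override def dataType: DataType = MapType( StringType, IntegerType )
  override def deterministic: Boolean = true

  // start with an empty map in the aggregation buffer
  override def initialize( buffer: MutableAggregationBuffer ): Unit =
    buffer( 0 ) = Map.empty[String, Int]

  // fold one input row's map into the buffer
  override def update( buffer: MutableAggregationBuffer, input: Row ): Unit =
    if ( !input.isNullAt( 0 ) )
      buffer( 0 ) = buffer.getMap[String, Int]( 0 ).toMap ++ input.getMap[String, Int]( 0 ).toMap

  // combine two partial buffers (e.g. from different partitions)
  override def merge( buffer1: MutableAggregationBuffer, buffer2: Row ): Unit =
    buffer1( 0 ) = buffer1.getMap[String, Int]( 0 ).toMap ++ buffer2.getMap[String, Int]( 0 ).toMap

  override def evaluate( buffer: Row ): Any = buffer.getMap[String, Int]( 0 ).toMap
}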

And in the ResourceManager I am seeing:

Memory Used     = 5.52 TB
Memory Total    = 5.52 TB
Memory Reserved = 113.13 GB
VCores Used     = 51
VCores Total    = 51
VCores Reserved = 1

And the Application Queues section shows:

Used (over capacity)
Used Capacity:       101.2%
Configured Capacity: 100.0%

Can someone tell me whether I have misconfigured anything here? My job is stuck at stage 0 itself.

I tried testing with a reduced data set and it works fine then, but when I use the original data I keep getting:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 204170 in stage 16.0 failed 4 times, most recent failure: Lost task 204170.4 in stage 16.0 (TID 1278745, ip-172-31-12-41.ec2.internal): ExecutorLostFailure (executor 520 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 626834 ms
Driver stacktrace:
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1454)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1442)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1441)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1441)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
  at scala.Option.foreach(Option.scala:257)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1667)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1622)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1611)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1873)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1886)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1899)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1913)
  at org.apache.spark.rdd.RDD.count(RDD.scala:1134)
  ... 242 elided

I am also seeing:

Dropping SparkListenerEvent because no remaining room in event queue. This likely means one of the SparkListeners is too slow and cannot keep up with the rate at which tasks are being started by the scheduler. 

1 Answer

Answer 1

I believe those are stages of the job, not the persist/count actually running multiple times. A stage is a group of parallel tasks that can run together without incurring a shuffle, and the UI labels each stage with the call site of the action that triggered the job, which is why the same persist and count lines appear more than once. I see two groupBys in your code, each of which requires a shuffle, hence the extra stages. Does that help?
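If it helps to see it in isolation, here is a tiny self-contained sketch (the column names are made up and unrelated to your data) where two groupBy/agg steps each introduce an Exchange, i.e. a shuffle, in the physical plan. Every such exchange is a stage boundary, so a single action like count shows up in the UI as several stages that all carry the same call site:

// Minimal demo: two groupBy/agg steps -> two Exchange (shuffle) boundaries -> multiple stages per action.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object StagesFromShuffles {
  def main( args: Array[String] ): Unit = {
    val spark = SparkSession.builder().appName( "stages-demo" ).master( "local[*]" ).getOrCreate()
    import spark.implicits._

    val df = Seq( ("a", "x", 1), ("a", "y", 2), ("b", "x", 3) ).toDF( "itemId", "entryId", "value" )

    // First shuffle: group by (itemId, entryId)
    val firstAgg = df.groupBy( "itemId", "entryId" ).agg( count( "value" ) as "freq" )

    // Second shuffle: group by itemId alone
    val secondAgg = firstAgg.groupBy( "itemId" ).agg( sum( "freq" ) as "totalFreq" )

    // The physical plan prints one Exchange per shuffle; each Exchange marks a stage boundary.
    secondAgg.explain()

    // This single action runs as several stages in the UI, all labelled with this call site.
    secondAgg.show()

    spark.stop()
  }
}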
