I have created a Scala application that uses Apache Spark to read data from S3, run some transformations on it, and save the result.
I am using Apache Spark 2.0.2 in cluster mode on a 50-node cluster of r3.4xlarge instances, with the following configuration:
hive-env.export HADOOP_HEAPSIZE 8192
spark.executor.cores 5
spark.executor.instances 149
spark.driver.memory 106124M
spark.executor.memory 38000M
spark.default.parallelism 5000
spark.sql.shuffle.partitions 1000
spark.kryoserializer.buffer.max 1024m
spark.sql.hive.convertMetastoreParquet false
spark.hadoop.mapreduce.input.fileinputformat.split.maxsize 2560000000
spark.files.maxPartitionBytes 2560000000
spark.network.timeout 500s
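For context, here is a minimal sketch of how the SQL-level settings above could be applied when the session is built. This is only an assumption, since the post does not show how the job is submitted; the cluster-sizing options (driver/executor memory, executor instances) normally have to be passed at submit time rather than in code, so they are left out here.

import org.apache.spark.sql.SparkSession

// Hypothetical session setup mirroring the quoted settings; spark.driver.memory,
// spark.executor.memory and spark.executor.instances are omitted because they
// generally only take effect when supplied to spark-submit, not in application code.
val sqlSession = SparkSession.builder()
  .appName( "ItemBuilder" )                               // assumed application name
  .config( "spark.default.parallelism", "5000" )
  .config( "spark.sql.shuffle.partitions", "1000" )
  .config( "spark.kryoserializer.buffer.max", "1024m" )
  .config( "spark.sql.hive.convertMetastoreParquet", "false" )
  .config( "spark.network.timeout", "500s" )
  .enableHiveSupport()                                    // assumed, given the hive-env setting above
  .getOrCreate()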
The job has now been running for more than 2 days. I tried changing the executor size, memory, and so on, but to no avail. In the Spark UI I am seeing:
Active:
Stage 0: persist at ItemBuilder.scala:197
Stage 1: persist at ItemBuilder.scala:197
Stages 0 and 1 show Tasks: Succeeded/Total = 115475/204108

Pending:
Stage 2: persist at ItemBuilder.scala:197 (Tasks: Succeeded/Total = 0/400)
Stage 3: count at ItemBuilder.scala:202 (Tasks: Succeeded/Total = 0/200)
Stage 4: count at ItemBuilder.scala:202 (Tasks: Succeeded/Total = 0/1)
Can someone tell me why I am seeing persist three times and count twice?
Here is my code:
val textFiles = sqlSession.sparkContext.textFile( files.mkString( "," ) )
val jsonFiles = sqlSession.read.schema( schema ).json( textFiles )

log.info( "Job is in progress" )

// Keep only records with a non-empty item id.
val filteredItemDetails = jsonFiles.filter( col( ITEM_ID ).isNotNull )
  .filter( length( col( ITEM_ID ) ) > 0 )

// Explode the nested items and keep entries that have events and a timestamp.
val itemDetails = filteredItemDetails.withColumn( ITEMS, explode( filteredItemDetails( ITEMS ) ) )
  .filter( size( col( ITEM_EVENTS ) ) > 0 )
  .filter( col( ITEM_TIMESTAMP ).isNotNull )
  .select( ITEM_ID, EVENTS_ITEM_ENTRY, ITEM_TIMESTAMP )

val convertTimestamp = udf { (timestampString: String) =>
  DateUtils.getSqlTimeStamp( timestampString )
}

val itemDetailsWithTimestamp = itemDetails.withColumn( TIME_STAMP_CONVERTED, convertTimestamp( col( TIME_STAMP ) ) )

val recentTime = DateUtils.getSqlTimeStamp( endTime )

// First wide aggregation (shuffle): recency and frequency per (item, entry).
val groupedData = itemDetailsWithTimestamp.groupBy( ITEM_ID, ITEM_ENTRY_ID )
  .agg( datediff( lit( recentTime ), max( TIME_STAMP_CONVERTED ) ) as DAY_DIFFERENCE,
        count( ITEM_ENTRY_ID ) as FREQUENCY )

val toMap = udf { (itemType: String, count: Int) => Map( itemType -> count ) }

val tempResult = groupedData.withColumn( FREQUENT_DAYS, toMap( col( ITEM_ENTRY_ID ), col( DAY_DIFFERENCE ) ) )
  .withColumn( FREQUENCY_COUNT, toMap( col( ITEM_ENTRY_ID ), col( FREQUENCY ) ) )
  .drop( ITEM_ENTRY_ID )
  .drop( DAY_DIFFERENCE )
  .drop( FREQUENCY )

// Second wide aggregation (shuffle): combine the per-entry maps per item.
val result = tempResult.groupBy( ITEM_ID )
  .agg( CombineMaps( col( FREQUENT_DAYS ) ) as FREQUENT_DAYS,
        CombineMaps( col( FREQUENCY_COUNT ) ) as FREQUENCY_COUNT )
  .persist( DISK_ONLY )

log.info( "Aggregation is completed." )

val totalItems = result.count( )
log.info( "Total Items = " + totalItems )
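CombineMaps and DateUtils are my own helpers and are not shown above. For readers following along, here is a hypothetical sketch of what a map-combining aggregate like CombineMaps could look like in Spark 2.0; the actual implementation used in the job may well differ.

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{ MutableAggregationBuffer, UserDefinedAggregateFunction }
import org.apache.spark.sql.types._

// Hypothetical sketch only; the real CombineMaps is not shown in the post.
// Merges Map[String, Int] columns; duplicate keys are simply overwritten, which is
// harmless here because the earlier groupBy makes the keys distinct within each item.
object CombineMaps extends UserDefinedAggregateFunction {
  private val mapType = MapType( StringType, IntegerType )

  override def inputSchema: StructType  = new StructType().add( "value", mapType )
  override def bufferSchema: StructType = new StructType().add( "combined", mapType )
  override def dataType: DataType       = mapType
  override def deterministic: Boolean   = true

  override def initialize( buffer: MutableAggregationBuffer ): Unit =
    buffer( 0 ) = Map.empty[String, Int]

  override def update( buffer: MutableAggregationBuffer, input: Row ): Unit =
    if ( !input.isNullAt( 0 ) )
      buffer( 0 ) = buffer.getMap[String, Int]( 0 ) ++ input.getMap[String, Int]( 0 )

  override def merge( buffer1: MutableAggregationBuffer, buffer2: Row ): Unit =
    buffer1( 0 ) = buffer1.getMap[String, Int]( 0 ) ++ buffer2.getMap[String, Int]( 0 )

  override def evaluate( buffer: Row ): Any = buffer.getMap[String, Int]( 0 )
}

A UserDefinedAggregateFunction can be applied directly to columns, which is why a call like CombineMaps( col( FREQUENT_DAYS ) ) works inside agg.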
And in the Resource manager I am seeing :
Memory Used = 5.52 TB
Memory Total = 5.52 TB
Memory Reserved = 113.13 GB
VCores Used = 51
VCores Total = 51
VCores Reserved = 1

And Application Queues shows:

Used (over capacity)
Used Capacity: 101.2%
Configured Capacity: 100.0%
Can someone tell me whether I have misconfigured anything here? My job is stuck at stage 0 itself.
I tried testing with a reduced data set, and it works fine then, but when I use the original data I keep getting:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 204170 in stage 16.0 failed 4 times, most recent failure: Lost task 204170.4 in stage 16.0 (TID 1278745, ip-172-31-12-41.ec2.internal): ExecutorLostFailure (executor 520 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 626834 ms
Driver stacktrace:
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1454)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1442)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1441)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1441)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
  at scala.Option.foreach(Option.scala:257)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1667)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1622)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1611)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1873)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1886)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1899)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1913)
  at org.apache.spark.rdd.RDD.count(RDD.scala:1134)
  ... 242 elided
I am also seeing :
Dropping SparkListenerEvent because no remaining room in event queue. This likely means one of the SparkListeners is too slow and cannot keep up with the rate at which tasks are being started by the scheduler.
1 Answer
I believe those are the stages of the job, not the persist/count actually running multiple times. A stage is a group of tasks that can run in parallel without a shuffle in between, and each stage is labelled in the UI with the call site of the action it belongs to, so the stages that materialize your persisted DataFrame all show "persist at ItemBuilder.scala:197" while the stages of the final count show "count at ItemBuilder.scala:202". I see two groupBy calls in your code, each of which requires a shuffle, hence the separate stages. Does that help?
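To make the shuffle boundaries concrete, here is a small self-contained sketch (illustrative only; the names and data are made up, not taken from your job). Each groupBy aggregation adds an Exchange (shuffle) to the physical plan, and each shuffle boundary becomes its own stage when the action runs.

// Standalone illustration: two groupBy aggregations mean two shuffles,
// and each shuffle boundary shows up as a separate stage in the Spark UI.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().appName( "stage-demo" ).master( "local[*]" ).getOrCreate()
import spark.implicits._

val df = Seq( ("a", "x", 1), ("a", "y", 2), ("b", "x", 3) ).toDF( "id", "entry", "value" )

val first  = df.groupBy( "id", "entry" ).agg( sum( "value" ) as "total" )   // shuffle #1
val second = first.groupBy( "id" ).agg( count( "entry" ) as "entries" )     // shuffle #2

second.explain()  // the physical plan contains two Exchange operators (the shuffles)
second.count()    // this single action runs as multiple stages in the Spark UI

Running second.count() and opening the local UI shows one job split into several stages even though count appears only once in the code, which mirrors what you see with your persist and count.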