I have a Spark Streaming job that uses mapWithState with an initial RDD. When the application is restarted and recovers from the checkpoint, it fails with the error:
This RDD lacks a SparkContext. It could happen in the following cases:
- RDD transformations and actions are NOT invoked by the driver, but inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.
- When a Spark Streaming job recovers from checkpoint, this exception will be hit if a reference to an RDD not defined by the streaming job is used in DStream operations. For more information, See SPARK-13758
This behavior is described in https://issues.apache.org/jira/browse/SPARK-13758, but the ticket doesn't really explain how to solve it. My RDD isn't defined by the streaming job, but I still need it in the state.
This is an example of what my graph looks like:
class EventStreamingApplication {
  private val config: Config = ConfigFactory.load()

  private val sc: SparkContext = {
    val conf = new SparkConf()
      .setAppName(config.getString("streaming.appName"))
      .set("spark.cassandra.connection.host", config.getString("streaming.cassandra.host"))
    val sparkContext = new SparkContext(conf)
    System.setProperty("com.amazonaws.services.s3.enableV4", "true")
    sparkContext.hadoopConfiguration.set("com.amazonaws.services.s3.enableV4", "true")
    sparkContext
  }

  def run(): Unit = {
    // streaming.eventCheckpointDir is an S3 bucket
    val ssc: StreamingContext = StreamingContext.getOrCreate(
      config.getString("streaming.eventCheckpointDir"), createStreamingContext)
    ssc.start()
    ssc.awaitTermination()
  }

  def receiver(ssc: StreamingContext): DStream[Event] = {
    RabbitMQUtils.createStream(ssc, Map(
      "hosts" -> config.getString("streaming.rabbitmq.host"),
      "virtualHost" -> config.getString("streaming.rabbitmq.virtualHost"),
      "userName" -> config.getString("streaming.rabbitmq.user"),
      "password" -> config.getString("streaming.rabbitmq.password"),
      "exchangeName" -> config.getString("streaming.rabbitmq.eventExchange"),
      "exchangeType" -> config.getString("streaming.rabbitmq.eventExchangeType"),
      "queueName" -> config.getString("streaming.rabbitmq.eventQueue")
    )).flatMap(EventParser.apply)
  }

  def setupStreams(ssc: StreamingContext): Unit = {
    val events = receiver(ssc)
    ExampleJob(events, sc)
  }

  private def createStreamingContext(): StreamingContext = {
    val ssc = new StreamingContext(sc, Seconds(config.getInt("streaming.batchSeconds")))
    setupStreams(ssc)
    ssc.checkpoint(config.getString("streaming.eventCheckpointDir"))
    ssc
  }
}

case class Aggregation(value: Long) // Contains aggregation values

object ExampleJob {
  def apply(events: DStream[Event], sc: SparkContext): Unit = {
    val aggregations: RDD[(String, Aggregation)] =
      sc.cassandraTable("...", "...").map(...) // some domain class mapping

    val state = StateSpec
      .function((key, value, state) => {
        val oldValue = state.getOption().map(_.value).getOrElse(0)
        val newValue = oldValue + value.getOrElse(0)
        state.update(Aggregation(newValue))
        state.get
      })
      .initialState(aggregations)
      .numPartitions(1)
      .timeout(Seconds(86400))

    events
      .filter(...) // filter out unnecessary events
      .map(...)    // domain class mapping to key, event dstream
      .groupByKey()
      .map(i => (i._1, i._2.size.toLong))
      .mapWithState(state)
      .stateSnapshots()
      .foreachRDD(rdd => {
        rdd.saveToCassandra(...)
      })
  }
}
The stacktrace thrown is:
Exception in thread "main" org.apache.spark.SparkException: This RDD lacks a SparkContext. It could happen in the following cases: (1) RDD transformations and actions are NOT invoked by the driver, but inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063. (2) When a Spark Streaming job recovers from checkpoint, this exception will be hit if a reference to an RDD not defined by the streaming job is used in DStream operations. For more information, See SPARK-13758.
  at org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$sc(RDD.scala:89)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
  at org.apache.spark.rdd.PairRDDFunctions.partitionBy(PairRDDFunctions.scala:534)
  at org.apache.spark.streaming.rdd.MapWithStateRDD$.createFromPairRDD(MapWithStateRDD.scala:193)
  at org.apache.spark.streaming.dstream.InternalMapWithStateDStream.compute(MapWithStateDStream.scala:146)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
  at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
  at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
  at scala.Option.orElse(Option.scala:289)
  at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
  at org.apache.spark.streaming.dstream.InternalMapWithStateDStream.compute(MapWithStateDStream.scala:134)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
  at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
  at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:333)
  at scala.Option.orElse(Option.scala:289)
  ... <991 lines omitted> ...
  at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:330)
  at org.apache.spark.streaming.dstream.InternalMapWithStateDStream.compute(MapWithStateDStream.scala:134)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:341)
  at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:340)
  at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:335)
  at ... run in separate thread using org.apache.spark.util.ThreadUtils ... ()
  at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:577)
  at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:571)
  at com.example.spark.EventStreamingApplication.run(EventStreamingApplication.scala:31)
  at com.example.spark.EventStreamingApplication$.main(EventStreamingApplication.scala:63)
  at com.example.spark.EventStreamingApplication.main(EventStreamingApplication.scala)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:497)
  at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:743)
  at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187)
  at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212)
  at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126)
  at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
1 Answer
Answer 1
It seems that while Spark is trying to recover, the correct latest checkpoint file is not being picked up, and because of that the wrong RDDs end up being referenced.
Spark 2.1.1 appears to be affected, since it is not listed among the fixed versions.
See the Apache JIRA issue below, where no fix release has been specified yet:
https://issues.apache.org/jira/browse/SPARK-19280
In my opinion, you can try to explore an automatic or manual workaround in which you pick the latest valid checkpoint file yourself when restarting the Spark job; a rough sketch of that idea follows below.
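This is only a minimal sketch of that approach, not a tested fix. The object CheckpointRecovery and the method recoverOrCreate are hypothetical names, and createContext stands in for the question's createStreamingContext. It assumes Spark 2.1-era APIs (StreamingContext.getOrCreate with a hadoopConf and createOnError parameter) and the fact that Spark writes checkpoint files named checkpoint-<time> into the checkpoint directory. The idea is: if loading the newest checkpoint fails, drop it and retry with the next one, and as a last resort ignore the checkpoint data entirely.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.streaming.StreamingContext

import scala.util.Try

// Hypothetical helper; not part of the question's code.
object CheckpointRecovery {

  // Checkpoint files in the directory (Spark names them "checkpoint-<time>"),
  // newest first by modification time.
  def checkpointFiles(checkpointDir: String, hadoopConf: Configuration): Seq[Path] = {
    val dir = new Path(checkpointDir)
    val fs: FileSystem = dir.getFileSystem(hadoopConf)
    if (!fs.exists(dir)) Seq.empty
    else fs.listStatus(dir).toSeq
      .filter(_.getPath.getName.startsWith("checkpoint"))
      .sortBy(-_.getModificationTime)
      .map(_.getPath)
  }

  // Try to recover from the checkpoint directory. If loading the newest checkpoint
  // fails, delete it and retry with the next one; as a last resort, ignore the
  // checkpoint data and build a fresh context (losing the accumulated state).
  def recoverOrCreate(checkpointDir: String,
                      createContext: () => StreamingContext): StreamingContext = {
    val hadoopConf = new Configuration()
    val fs = new Path(checkpointDir).getFileSystem(hadoopConf)

    def attempt(): Try[StreamingContext] =
      Try(StreamingContext.getOrCreate(checkpointDir, createContext, hadoopConf))

    var result = attempt()
    var remaining = checkpointFiles(checkpointDir, hadoopConf)
    while (result.isFailure && remaining.nonEmpty) {
      fs.delete(remaining.head, false) // discard the checkpoint that failed to load
      remaining = remaining.tail
      result = attempt()
    }
    result.getOrElse(
      StreamingContext.getOrCreate(checkpointDir, createContext, hadoopConf, createOnError = true))
  }
}

In EventStreamingApplication.run this would replace the direct StreamingContext.getOrCreate call, e.g. val ssc = CheckpointRecovery.recoverOrCreate(config.getString("streaming.eventCheckpointDir"), createStreamingContext). Two caveats: in your stack trace the exception actually surfaces in ssc.start() rather than in getOrCreate, so you may need to apply the same drop-and-retry idea around start() as well (stopping the failed context with ssc.stop(stopSparkContext = false) before retrying), and deleting checkpoint files or falling back to a fresh context discards the accumulated mapWithState state.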
I know this is not much help, but I thought it would be better to explain the root cause of the problem, the current state of the fix, and my opinion on a possible solution.