Wednesday, May 16, 2018

How can I get Apache Spark job progress without knowing the job id?

I have the following as an example:

    val sc: SparkContext // An existing SparkContext.
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)

    val df = sqlContext.read.json("examples/src/main/resources/people.json")

    df.count

I'm aware that I can monitor jobs by registering a SparkListener on the Spark context; however, that gives me events for all jobs, which I can't filter because I don't know the job id.

How can I get the progress of the "count" action only?

1 Answer

As already suggested in the comments, one could use the REST API of the Spark UI to collect the required numbers.

The main problem is to identify the stages you are interested in. There is no 1:1 mapping from code to stages. A single count, for example, triggers two stages (one that counts the elements in each partition of the DataFrame and a second that sums up the results of the first). Stages are usually named after the action that triggered their execution, although this can be changed in the code.

One can create a method that queries the REST API for all stages with a certain name and then adds up the total number of tasks for these stages as well as the number of completed tasks. Assuming that all tasks take approximately the same time to execute (an assumption that does not hold if the dataset has skewed partitions), one can use the share of completed tasks as a measure of the job's progress.

    def countTasks(sparkUiUrl: String, stageName: String): (Int, Int) = {
      import scala.util.parsing.json._
      import scala.collection.mutable.ListBuffer
      def get(url: String) = scala.io.Source.fromURL(url).mkString

      // get the ids of all running applications and collect them in a ListBuffer
      val applications = JSON.parseFull(get(sparkUiUrl + "/api/v1/applications?status=running"))
      val apps: ListBuffer[String] = new ListBuffer[String]
      applications match {
        case Some(l: List[Map[String, String]]) => l.foreach(apps += _("id"))
        case other => println("Unknown data structure while reading applications: " + other)
      }

      var countTasks: Int = 0
      var countCompletedTasks: Int = 0

      // get the stages for each application and sum up the number of tasks
      // for each stage with the requested name
      apps.foreach(app => {
        val stages = JSON.parseFull(get(sparkUiUrl + "/api/v1/applications/" + app + "/stages"))
        stages match {
          case Some(l: List[Map[String, Any]]) => l.foreach(m => {
            if (m("name") == stageName) {
              // JSON.parseFull returns every JSON number as a Double
              countTasks += m("numTasks").asInstanceOf[Double].toInt
              countCompletedTasks += m("numCompleteTasks").asInstanceOf[Double].toInt
            }
          })
          case other => println("Unknown data structure while reading stages: " + other)
        }
      })

      //println(countCompletedTasks + " of " + countTasks + " tasks completed")
      (countTasks, countCompletedTasks)
    }

Calling this function for the given count example

    println(countTasks("http://localhost:4040", "show at CountExample.scala:16"))

prints two numbers: the first is the total number of tasks and the second is the number of finished tasks. The stage name passed in has to match the name reported by the Spark UI exactly, since the function compares the two with ==.
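To turn the two numbers into a progress indicator, one could poll repeatedly and report the share of completed tasks. The following is only a minimal sketch and not part of the tested code above: progressFraction and pollProgress are hypothetical helper names, and fetch stands in for a call such as () => countTasks(sparkUiUrl, stageName). The usage at the bottom feeds made-up numbers so that the sketch runs without a Spark application.

```scala
// Hypothetical helper: share of completed tasks, between 0.0 and 1.0.
def progressFraction(totalTasks: Int, completedTasks: Int): Double =
  if (totalTasks <= 0) 0.0 // no matching stage is known yet
  else math.min(1.0, completedTasks.toDouble / totalTasks)

// Hypothetical helper: calls `fetch` up to `maxPolls` times, reports the
// progress after each call and stops early once all tasks are completed.
// Returns true if completion was observed, false if polling gave up.
def pollProgress(
    fetch: () => (Int, Int), // assumed to return (total, completed)
    intervalMs: Long,
    maxPolls: Int)(report: Double => Unit): Boolean = {
  var polls = 0
  while (polls < maxPolls) {
    val (total, completed) = fetch()
    report(progressFraction(total, completed))
    // total > 0 avoids reporting completion before any stage shows up
    if (total > 0 && completed >= total) return true
    polls += 1
    if (polls < maxPolls) Thread.sleep(intervalMs)
  }
  false
}

// Made-up (total, completed) pairs in place of real REST API results.
val fakeResults = Iterator((4, 1), (4, 3), (4, 4))
val finished = pollProgress(() => fakeResults.next(), intervalMs = 10L, maxPolls = 5) {
  fraction => println(s"progress: ${math.round(fraction * 100)}%")
}
```

Against a running application, fetch would wrap the countTasks call shown above. The skew caveat still applies: the fraction measures finished tasks, not elapsed time.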

I have tested this code with Spark 2.3.0. Before using it in a production environment, it will need some additional polishing, especially more sophisticated error checking. The statistic could also be improved by counting not only the finished tasks but the failed ones as well.
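As a rough illustration of the last point, the aggregation step could also sum up failed tasks. This is an untested sketch under assumptions: TaskCounts and summarize are hypothetical names, stages is assumed to be the already parsed result of the /stages endpoint, and numFailedTasks is assumed to be present in each stage entry next to numTasks and numCompleteTasks. The sample data is made up.

```scala
// Hypothetical result type for the aggregated counters.
case class TaskCounts(total: Int, completed: Int, failed: Int)

// Sums the task counters of all stages whose name equals `stageName`.
def summarize(stages: List[Map[String, Any]], stageName: String): TaskCounts = {
  // JSON.parseFull represents every JSON number as a Double
  def num(stage: Map[String, Any], key: String): Int = stage.get(key) match {
    case Some(d: Double) => d.toInt
    case _               => 0 // a missing or unexpected field counts as zero
  }
  stages
    .filter(_.get("name").contains(stageName))
    .foldLeft(TaskCounts(0, 0, 0)) { (acc, s) =>
      TaskCounts(
        acc.total + num(s, "numTasks"),
        acc.completed + num(s, "numCompleteTasks"),
        acc.failed + num(s, "numFailedTasks"))
    }
}

// Made-up stage entries for illustration only.
val sample: List[Map[String, Any]] = List(
  Map("name" -> "count at Example.scala:10", "numTasks" -> 8.0,
      "numCompleteTasks" -> 5.0, "numFailedTasks" -> 1.0),
  Map("name" -> "count at Example.scala:10", "numTasks" -> 1.0,
      "numCompleteTasks" -> 0.0, "numFailedTasks" -> 0.0),
  Map("name" -> "show at Example.scala:12", "numTasks" -> 4.0,
      "numCompleteTasks" -> 4.0, "numFailedTasks" -> 0.0))

println(summarize(sample, "count at Example.scala:10"))
```

Treating a missing field as zero instead of throwing keeps the sketch tolerant of differences between Spark versions, at the cost of hiding schema changes.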
