Tuesday, August 14, 2018

Dynamic Mapping Statement in Spark Scala


In my code I convert a DataFrame into an RDD to run a function through a map call. Each run carries a couple of seconds of overhead, so if I run the function call like this:

import spark.implicits._

var iterator = 0
val inputDF = spark.sql("select * from DF")
var columnPosition = Array("Column_4")
columnPosition = columnPosition ++ Array("Column_9")
var selectedDF = inputDF
var intrimDF = inputDF
var finalDF = inputDF
while (iterator < columnPosition.length) {
  selectedDF = finalDF.selectExpr("foreign_key", columnPosition(iterator))
  intrimDF = selectedDF.rdd
    .map(x => (x(0), action(x(1))))
    .toDF
    .selectExpr("_1 as some_key", "_2 as " + columnPosition(iterator))
    // pseudocode: join back to finalDF on the key, reorder the columns,
    // drop the original column, and rename the result
  finalDF = intrimDF
  iterator = iterator + 1
}

It runs in ~90 seconds for a large job, because of the join. What I am trying to do is build it like below, to cut out the join entirely and have it be dynamic.

val inputDF = spark.sql("select * from DF")
val intrimDF = inputDF.rdd.map(x => (x(0), x(1), x(2), action(x(3)), x(4), x(5), x(6), x(7), action(x(8))))
val columnStatement: Array[String] = ??? // create an Array with the column-name changes
val finalDF = intrimDF.toDF.selectExpr(columnStatement: _*)
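
For what it's worth, here is one way that array could be built (a sketch, assuming the map above emits the columns in their original order, so the tuple columns _1 through _9 map straight back to the input column names):

// Sketch: rename the generated tuple columns _1.._9 back to the
// original column names from inputDF
val columnStatement: Array[String] = inputDF.columns.zipWithIndex.map {
  case (name, i) => s"_${i + 1} as $name"
}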

The issue is I can't get past the hard-coding side of the problem. Below is an example of what I want to do by dynamically setting the mapping call.

val mappingStatement = "x => (x(0), x(1), x(2), action(x(3)), x(4), x(5), x(6), x(7), action(x(8)))"
// does not compile: map expects a function, not a String
val intrimDF = inputDF.rdd.map(mappingStatement)

Everything I have tried has failed:

1. Calling it through the Map() function
2. Setting an Array and passing it as :_*
3. Trying to build the call dynamically, but it doesn't like being dynamic
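
For reference, one way to sidestep generating the lambda as a string entirely is to map over each Row positionally and drive the transformation from a set of column indices. A minimal sketch, assuming action is a function of type Any => Any that preserves each column's data type (both assumptions on my part):

import org.apache.spark.sql.Row

// Hypothetical: 0-based positions of the columns to run action() on
val targetPositions = Set(3, 8)

val transformedRDD = inputDF.rdd.map { row =>
  Row.fromSeq(row.toSeq.zipWithIndex.map {
    case (value, i) if targetPositions.contains(i) => action(value)
    case (value, _)                                => value
  })
}

// Reuse the original schema; valid only while action keeps each
// column's type unchanged
val finalDF = spark.createDataFrame(transformedRDD, inputDF.schema)

Because the positions live in an ordinary Set, the same map call handles any number of columns without hard-coding a tuple, and no join is needed afterwards.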

Hope this makes sense!
