In my code I convert a DataFrame to an RDD to run a function through a map call. Each run carries a couple of seconds of overhead, so currently I run the function call like this:
var iterator = 0
val inputDF = spark.sql("select * from DF")
var columnPosition = Array("Column_4")
columnPosition = columnPosition ++ Array("Column_9")
var selectedDF = inputDF
var intrimDF = inputDF
var finalDF = inputDF
while (iterator < columnPosition.length) {
  selectedDF = finalDF.selectExpr("foreign_key", columnPosition(iterator))
  intrimDF = selectedDF.rdd
    .map(x => (x(0), action(x(1))))
    .toDF
    .selectExpr("_1 as some_key", "_2 as " + columnPosition(iterator))
    .joinBack.changeColumnPositions.dropOriginalColumn.renameColumns // pseudocode helpers
  finalDF = intrimDF
  iterator = iterator + 1
}
It runs in ~90 seconds on a large job, because of the join. What I am trying to do is build it like below, to cut out the join entirely and have it be dynamic.
val inputDF = spark.sql("select * from DF")
val intrimDF = inputDF.rdd.map(x =>
  (x(0), x(1), x(2), action(x(3)), x(4), x(5), x(6), x(7), action(x(8))))
val columnStatement = ??? // create an Array with the column-name changes
val finalDF = intrimDF.toDF.selectExpr(columnStatement: _*)
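The rename array itself can be built instead of hard-coded. A minimal sketch, assuming the target column names are known (the names below are hypothetical, for illustration):

```scala
// Build "_1 as foreign_key", "_2 as Column_1", ... dynamically from the
// desired output column names, instead of typing each alias by hand.
// `columnNames` is a hypothetical list standing in for the real schema.
val columnNames = Seq("foreign_key", "Column_1", "Column_2")

val columnStatement: Array[String] = columnNames.zipWithIndex.map {
  case (name, i) => s"_${i + 1} as $name" // tuple fields are _1, _2, ...
}.toArray

// Then, in Spark: intrimDF.toDF.selectExpr(columnStatement: _*)
```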
The issue is I can't get past the hard-coding side of the problem. Below is an example of what I want to do by dynamically setting the mapping call.
val mappingStatement = "x => (x(0), x(1), x(2), action(x(3)), x(4), x(5), x(6), x(7), action(x(8)))"
val intrimDF = inputDF.rdd.map(mappingStatement)
Everything I have tried has failed:
1. Calling it through the Map() function
2. Setting up an Array and passing it as `: _*`
3. Trying to build the expression as a string and calling it, but it doesn't like being dynamic
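One thing worth noting: `rdd.map` needs a function value, not a string, so the string can't be passed directly. Instead of generating source text, the per-column choice can be made at runtime from a set of target indices. A minimal sketch of that idea, assuming `action` is the existing transformation (stubbed here as a hypothetical uppercase just so the code is self-contained):

```scala
// Stub standing in for the real `action` -- hypothetical, for illustration.
def action(v: Any): Any = v.toString.toUpperCase

// Indices of the columns to transform; build this Set dynamically instead
// of hard-coding action(x(3)), action(x(8)), ... inside the lambda.
val targetIdx = Set(3, 8)

// Apply `action` only at the target positions; pass every other value through.
def transformRow(row: Seq[Any]): Seq[Any] =
  row.zipWithIndex.map {
    case (v, i) if targetIdx.contains(i) => action(v)
    case (v, _)                          => v
  }

// In Spark this would plug in as:
//   val rdd = inputDF.rdd.map(r => Row.fromSeq(transformRow(r.toSeq)))
//   spark.createDataFrame(rdd, inputDF.schema)
// which keeps the original column layout and avoids the join entirely.
```

Because `targetIdx` is ordinary data, the same lambda handles any combination of columns, so no per-job string generation is needed.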
Hope this makes sense!