Tuesday, September 4, 2018

Calling Scala code in PySpark for XSLT transformations


This might be a long shot, but I figured it couldn't hurt to ask. I'm attempting to use Elsevier's open-source spark-xml-utils package in PySpark to transform some XML records with XSLT.

I've had a bit of success with some exploratory code getting a transformation to work:

# open XSLT processor from spark's jvm context
with open('/tmp/foo.xsl', 'r') as f:
    proc = sc._jvm.com.elsevier.spark_xml_utils.xslt.XSLTProcessor.getInstance(f.read())

# transform XML record with 'proc'
with open('/tmp/bar.xml', 'r') as f:
    transformed = proc.transform(f.read())

However, in a more realistic situation, I was unable to drop proc.transform into a lambda map function, getting errors similar to:

"An error occurred while calling o55.getstate. Trace: py4j.Py4JException: Method getstate([]) does not exist at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318) at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326) at py4j.Gateway.invoke(Gateway.java:272) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:214) at java.lang.Thread.run(Thread.java:748)"

The small example above, which worked on a single record, was run in a pyspark shell, which I'm assuming means everything happened on the Spark driver. The map function mentioned above, however, was run against Spark via Livy and YARN, which introduces workers. This SO question/answer suggests that perhaps I cannot use a function from the driver's JVM in that context.

Now, the spark-xml-utils library provides some examples in Scala that do precisely what I'd like to do:

import com.elsevier.spark_xml_utils.xslt.XSLTProcessor

val xmlKeyPair = sc.sequenceFile[String, String]("s3n://spark-xml-utils/xml/part*")

val stylesheet = sc.textFile("s3n://spark-xml-utils/stylesheets/srctitle.xsl").collect.head

val srctitles = xmlKeyPair.mapPartitions(recsIter => {
    val proc = XSLTProcessor.getInstance(stylesheet)
    recsIter.map(rec => proc.transform(rec._2))
})

I'm wondering: how can I translate this to PySpark code so that I can run it over an RDD? Ideally, over an RDD with the following input and output format:

id    | document      | other | columns
----------------------------------------
sprog | <xml here...> | more  | data
baz   | <xml here...> | more  | data

that could become

id    | document                    | other | columns
------------------------------------------------------
sprog | <*transformed* xml here...> | more  | data
baz   | <*transformed* xml here...> | more  | data
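The most literal translation I can come up with mirrors the Scala mapPartitions example, but I don't see how to get a handle on the JVM from inside the partition function, since sc._jvm is the driver-side py4j gateway. A sketch of what I mean, where 'records' is a hypothetical RDD of (id, document, other, columns) tuples and the marked line is the part I can't make work:

stylesheet = sc.textFile("s3n://spark-xml-utils/stylesheets/srctitle.xsl").collect()[0]

def transform_partition(rows):
    # sticking point: sc._jvm only exists on the driver, so this call is not
    # available inside a function that Spark runs on the executors
    proc = sc._jvm.com.elsevier.spark_xml_utils.xslt.XSLTProcessor.getInstance(stylesheet)
    for row in rows:
        # rows assumed to be shaped like (id, document, other, columns)
        yield (row[0], proc.transform(row[1])) + tuple(row[2:])

transformed = records.mapPartitions(transform_partition)

Like the Scala example, this instantiates one processor per partition rather than per record, which is the part I'd like to preserve.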

Any help or suggestions would be most appreciated.

Update 8/28/2018: I also tried running the transform through mapPartitions; no dice, same __getstate__() error.
