Tuesday, July 18, 2017

Spark from_json - StructType and ArrayType


I have a data set that comes in as XML, and one of its nodes contains JSON. Spark reads that node in as a StringType, so I am trying to use from_json() to convert the JSON to a DataFrame.

I am able to convert a string of JSON, but how do I write the schema to work with an Array?

String without Array - Working nicely

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

val schemaExample = new StructType()
  .add("FirstName", StringType)
  .add("Surname", StringType)

val dfExample = spark.sql("""select "{ \"FirstName\":\"Johnny\", \"Surname\":\"Boy\" }" as theJson""")

val dfICanWorkWith = dfExample.select(from_json($"theJson", schemaExample))

dfICanWorkWith.collect()

// Results
// res19: Array[org.apache.spark.sql.Row] = Array([[Johnny,Boy]])
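As a side note, once the string is parsed, the struct's fields can be pulled out into top-level columns. This is a minimal sketch assuming a spark-shell session, where `spark` and the `$` column syntax are already available (as in the snippet above); the alias `person` is a name chosen here for illustration.

```scala
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

val schemaExample = new StructType()
  .add("FirstName", StringType)
  .add("Surname", StringType)

val dfExample = spark.sql("""select "{ \"FirstName\":\"Johnny\", \"Surname\":\"Boy\" }" as theJson""")

// Alias the parsed struct, then use dot notation to promote its
// fields to ordinary columns that can be filtered and joined on.
val dfFlattened = dfExample
  .select(from_json($"theJson", schemaExample).as("person"))
  .select($"person.FirstName", $"person.Surname")

dfFlattened.show()
```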

String with an Array - Can't figure this one out

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

val schemaExample2 = new StructType()
  .add("", ArrayType(new StructType()
    .add("FirstName", StringType)
    .add("Surname", StringType)))

val dfExample2 = spark.sql("""select "[{ \"FirstName\":\"Johnny\", \"Surname\":\"Boy\" }, { \"FirstName\":\"Franky\", \"Surname\":\"Man\" }" as theJson""")

val dfICanWorkWith = dfExample2.select(from_json($"theJson", schemaExample2))

dfICanWorkWith.collect()

// Result
// res22: Array[org.apache.spark.sql.Row] = Array([null])

1 Answer

Answer 1

The problem is that your JSON is not fully formed. It is missing a couple of things:

  • First, it is missing the surrounding {} that make it a JSON object
  • Second, it is missing the field's value (you named the field "" in the schema, but never added it to the JSON)
  • Lastly, it is missing the closing ]

Try replacing it with:

val dfExample2 = spark.sql("""select "{\"\":[{ \"FirstName\":\"Johnny\", \"Surname\":\"Boy\" }, { \"FirstName\":\"Franky\", \"Surname\":\"Man\" }]}" as theJson""")

and you will get:

scala> dfICanWorkWith.collect()
res12: Array[org.apache.spark.sql.Row] = Array([[WrappedArray([Johnny,Boy], [Franky,Man])]])
