Showing posts with label apache-spark-sql. Show all posts

Thursday, September 6, 2018

Limiting maximum size of dataframe partition


When I write out a dataframe to, say, CSV, a .csv file is created for each partition. Suppose I want to limit the maximum size of each file to, say, 1 MB. I could do the write multiple times, increasing the argument to repartition each time. Is there a way to calculate ahead of time what argument to pass to repartition so that the maximum size of each file stays below a specified limit?

I imagine there might be pathological cases where all the data ends up on one partition. So let's make the weaker assumption that we only want to ensure the average file size is less than some specified amount, say 1 MB.

2 Answers

Answers 1

1. Single dataframe solution

I tried to come up with a clever approach that would not kill the cluster in the process, and the only thing that came to mind was:

  1. Calculate the size of the serialized row
  2. Get no. of rows in your DataFrame
  3. Repartition, by dividing with the expected size
  4. Should work?

The code should look more like this:

val df: DataFrame = ??? // your df
val rowSize = getBytes(df.head)
val rowCount = df.count()
val partitionSize = 1000000 // 1 MB in bytes
val noPartitions: Int = (rowSize * rowCount / partitionSize).toInt
df.repartition(noPartitions).write.format(...) // save to csv

// just a helper function from https://stackoverflow.com/a/39371571/1549135
def getBytes(value: Any): Long = {
  val stream: ByteArrayOutputStream = new ByteArrayOutputStream()
  val oos = new ObjectOutputStream(stream)
  oos.writeObject(value)
  oos.close()
  stream.toByteArray.length
}

While my first choice was to calculate each row's byte size, that would be terribly inefficient. So, unless the row sizes in your data differ greatly, I would say this solution will work. You can also sample every n-th row's size; you get the idea.

Also, I just 'hope' that Long will be big enough to hold the intermediate product when calculating noPartitions. If not (if you have a lot of rows), it might be better to change the order of operations, e.g.:

val noPartitions: Int = (rowSize.toDouble / partitionSize * rowCount).toInt // toDouble avoids integer division truncating to zero

Again, this is just a drafted idea with no domain knowledge about your data.
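As a sketch of the arithmetic above, outside Spark (plain Python with pickle standing in for Java serialization, and hypothetical sample data), the partition estimate from a sampled row size would look like:

```python
import math
import pickle

def estimate_partitions(sample_rows, total_rows, target_bytes=1_000_000):
    """Estimate how many partitions keep the *average* file under target_bytes.

    sample_rows: a small list of representative rows (e.g. every n-th row).
    total_rows:  full row count of the DataFrame (df.count() in Spark).
    """
    # Average serialized size of the sampled rows, used as a proxy for all rows.
    avg_row_bytes = sum(len(pickle.dumps(r)) for r in sample_rows) / len(sample_rows)
    # Work in floating point first to avoid integer-division truncation,
    # then round up so the average stays under the target.
    return max(1, math.ceil(avg_row_bytes * total_rows / target_bytes))
```

This only bounds the average file size; skewed partitioning can still make individual files larger.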

2. Cross system solution

While going through the apache-spark docs I have found an interesting cross-system solution:

spark.sql.files.maxPartitionBytes which sets:

The maximum number of bytes to pack into a single partition when reading files.

The default value is 134217728 (128 MB).

So I suppose you could set it to 1000000 (1 MB) and it will have a permanent effect on your DataFrames. However, too small a partition size may greatly impact your performance!

You can set it during SparkSession creation:

val spark = SparkSession
  .builder()
  .appName("Spark SQL basic example")
  .config("spark.sql.files.maxPartitionBytes", 1000000)
  .getOrCreate()

All of the above is only valid if (I remember correctly) the CSV is written with the same number of files as there are partitions of the DataFrame.

Answers 2

val df = spark.range(10000000)
df.cache

val catalyst_plan = df.queryExecution.logical
val df_size_in_bytes = spark.sessionState.executePlan(catalyst_plan).optimizedPlan.stats.sizeInBytes

df_size_in_bytes: BigInt = 80000000
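That figure checks out arithmetically: spark.range produces one 64-bit long per row, so the optimizer's estimate is simply rows × 8 bytes (plain Python check, not Spark):

```python
# spark.range(10_000_000) yields one 64-bit (8-byte) long per row,
# so the optimizer's size estimate is rows * 8 bytes.
rows = 10_000_000
size_in_bytes = rows * 8  # matches df_size_in_bytes above
```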

The best approach would be to take 100 records, estimate their size, and extrapolate to all rows as in the example above.


Tuesday, August 14, 2018

Implementing MERGE INTO sql in pyspark


How can a SQL MERGE INTO statement be achieved programmatically in pyspark? I have two tables which I have loaded into temporary views using createOrReplaceTempView. I then tried a MERGE INTO statement on those two temporary views, but it fails; the reason may be that MERGE is not supported in Spark SQL. Can someone give a hint on how a simple MERGE INTO SQL equivalent (something like below) can be implemented programmatically in pyspark?

MERGE INTO events
USING updates
ON events.eventId = updates.eventId
WHEN MATCHED THEN
  UPDATE SET events.data = updates.data
WHEN NOT MATCHED THEN
  INSERT (date, eventId, data) VALUES (date, eventId, data)

1 Answers

Answers 1

MERGE is not supported directly, but if you are OK with overwriting the complete table, you can follow this approach.

hiveContext.sql("select * from events").registerTempTable("temp_events")
hiveContext.sql("select * from updates").registerTempTable("temp_updates")

hiveContext.sql("""
select
  case when b.eventId is null then a.date else b.date end as date,
  case when b.eventId is null then a.eventId else b.eventId end as eventId,
  case when b.eventId is null then a.data else b.data end as data
from temp_events a
full outer join temp_updates b on a.eventId = b.eventId
""").registerTempTable("FinalData")

hiveContext.sql("INSERT OVERWRITE TABLE table_name select * from FinalData")

Using the case expressions, we make sure that if a row is available in the new set we take its values, otherwise we keep the older values.

Please check if this solution works for you.

Thanks, Manu


Thursday, July 19, 2018

Sparksql query an array<string> contains a string,use predicate pushdown


I'm trying to query an array in ElasticSearch

data: "names":[{"name":"allen"},{"name":"bill"},{"name":"dave"},{"name":"poter"}]
goal: select names from table where array_contains(names.name, "bill")

but Spark won't do predicate pushdown if the SQL statement uses the array_contains function.
hint: names.name = ["allen","bill","dave","poter"]
I've tried

select * from table where array_contains(names.name, "bill")
-- and
select explode(names.name) as name from table as t1; select * from t1 where name = "bill"
-- and
select * from table where cast(names.name as string) like '%bill%'

All failed to push down; are there any other ways to do it?

1 Answers

Answers 1

The failure to push down is expected. For a predicate to be delegated you need data source support, and the ElasticSearch connector doesn't list array_contains among pushed-down operations, which as of today include:

  • =, >, <, >=, <=
  • is_null / is_not_null
  • in
  • String[Starts|Ends]With, StringContains
  • NULL safe equality.
  • Application of Boolean operators AND / OR / NOT.

Also, any additional transformation (including CAST) disables predicate pushdown.
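As an illustration only (not the actual connector code), the supported list above amounts to a whitelist check on the predicate's operator; anything outside it falls back to filtering inside Spark:

```python
# Hypothetical sketch: a connector delegates a predicate only if its
# operator is in the supported set; array_contains, CAST-wrapped columns,
# explode, etc. are not, so Spark evaluates those filters itself.
PUSHABLE_OPS = {"=", ">", "<", ">=", "<=", "<=>",
                "is_null", "is_not_null", "in",
                "starts_with", "ends_with", "contains",
                "and", "or", "not"}

def is_pushable(op: str) -> bool:
    """Return True if a predicate with this operator could be delegated."""
    return op.lower() in PUSHABLE_OPS
```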


Sunday, July 1, 2018

Spark Streaming Exception: java.util.NoSuchElementException: None.get


I am writing SparkStreaming data to HDFS by converting it to a dataframe:

Code

object KafkaSparkHdfs {

  val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkKafka")
  sparkConf.set("spark.driver.allowMultipleContexts", "true")
  val sc = new SparkContext(sparkConf)

  def main(args: Array[String]): Unit = {
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext.implicits._

    val ssc = new StreamingContext(sparkConf, Seconds(20))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "stream3",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val topics = Array("fridaydata")
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams)
    )

    val lines = stream.map(consumerRecord => consumerRecord.value)
    val words = lines.flatMap(_.split(" "))
    val wordMap = words.map(word => (word, 1))
    val wordCount = wordMap.reduceByKey(_ + _)

    wordCount.foreachRDD(rdd => {
      val dataframe = rdd.toDF()
      dataframe.write
        .mode(SaveMode.Append)
        .save("hdfs://localhost:9000/newfile24")
    })

    ssc.start()
    ssc.awaitTermination()
  }
}

The folder is created but the file is not written.

The program is getting terminated with the following error:

18/06/22 16:14:41 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.util.NoSuchElementException: None.get
    at scala.None$.get(Option.scala:347)
    at scala.None$.get(Option.scala:345)
    at org.apache.spark.storage.BlockInfoManager.releaseAllLocksForTask(BlockInfoManager.scala:343)
    at org.apache.spark.storage.BlockManager.releaseAllLocksForTask(BlockManager.scala:670)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:289)
    at java.lang.Thread.run(Thread.java:748)
18/06/22 16:14:41 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.util.NoSuchElementException: None.get
    at scala.None$.get(Option.scala:347)
    at scala.None$.get(Option.scala:345)
    at org.apache.spark.storage.BlockInfoManager.releaseAllLocksForTask(BlockInfoManager.scala:343)
    at org.apache.spark.storage.BlockManager.releaseAllLocksForTask(BlockManager.scala:670)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:289)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)

In my pom I am using respective dependencies:

  • spark-core_2.11
  • spark-sql_2.11
  • spark-streaming_2.11
  • spark-streaming-kafka-0-10_2.11

2 Answers

Answers 1

The error is due to trying to run multiple Spark contexts at the same time. Setting allowMultipleContexts to true is mostly used for testing purposes, and its use is discouraged. The solution to your problem is therefore to make sure the same SparkContext is used everywhere. From the code we can see that the SparkContext (sc) is used to create an SQLContext, which is fine. However, when creating the StreamingContext it is not used; instead the SparkConf is used.

By looking at the documentation we see:

Create a StreamingContext by providing the configuration necessary for a new SparkContext

In other words, by using SparkConf as parameter a new SparkContext will be created. Now there are two separate contexts.

The easiest solution here would be to continue using the same context as before. Change the line creating the StreamingContext to:

val ssc = new StreamingContext(sc, Seconds(20)) 

Note: In newer versions of Spark (2.0+) use SparkSession instead. A new streaming context can then be created using StreamingContext(spark.sparkContext, ...). It can look as follows:

val spark = SparkSession.builder
  .master("local[*]")
  .appName("SparkKafka")
  .getOrCreate()

import spark.implicits._
val ssc = new StreamingContext(spark.sparkContext, Seconds(20))

Answers 2

There is an obvious problem here - coalesce(1).

dataframe.coalesce(1) 

While reducing the number of files might be tempting in many scenarios, it should be done if and only if the amount of data is low enough for the nodes to handle (clearly it isn't here).

Furthermore, let me quote the documentation:

However, if you're doing a drastic coalesce, e.g. to numPartitions = 1, this may result in your computation taking place on fewer nodes than you like (e.g. one node in the case of numPartitions = 1). To avoid this, you can call repartition. This will add a shuffle step, but means the current upstream partitions will be executed in parallel (per whatever the current partitioning is).

The conclusion is that you should adjust the parameter to the expected amount of data and desired parallelism. coalesce(1) as such is rarely useful in practice, especially in a context like streaming, where data properties can change over time.


Friday, May 4, 2018

PySpark.sql.filter not performing as it should


I am running into a problem when executing the code below:

from pyspark.sql import functions as F
from pyspark.sql import Row, HiveContext

hc = HiveContext()
rows1 = [Row(id1='2', id2='1', id3='a'),
         Row(id1='3', id2='2', id3='a'),
         Row(id1='4', id2='3', id3='b')]
df1 = hc.createDataFrame(rows1)
df2 = df1.filter(F.col("id3") == "a")
df3 = df1.join(df2, df1.id2 == df2.id1, "inner")

When I run the above code, df3 is an empty DataFrame. However, if I change the code to the following, it gives the correct result (a DataFrame of 2 rows):

from pyspark.sql import functions as F
from pyspark.sql import Row, HiveContext

hc = HiveContext()
rows1 = [Row(id1='2', id2='1', id3='a'),
         Row(id1='3', id2='2', id3='a'),
         Row(id1='4', id2='3', id3='b')]
df1 = hc.createDataFrame(rows1)
rows2 = [Row(id1='2', id2='1', id3='a'),
         Row(id1='3', id2='2', id3='a'),
         Row(id1='4', id2='3', id3='b')]
df1_temp = hc.createDataFrame(rows2)
df2 = df1_temp.filter(F.col("id3") == "a")
df3 = df1.join(df2, df1.id2 == df2.id1, "inner")

So my question is: why do I have to create a temporary dataframe here? Also, if I can't get the HiveContext in my part of the project, how can I make a duplicate dataframe on top of an existing one?

2 Answers

Answers 1

I believe that the problem you've hit here is an instance of a more general issue where certain types of DataFrame self-joins (including joins of a DataFrame against filtered copies of itself) can result in the generation of ambiguous or incorrect query plans.

There are several Spark JIRA tickets dealing with different manifestations / aspects of these problems; they are discoverable by following chains of JIRA "relates to" links between the tickets.

This ambiguity only crops up when referencing columns via the DataFrame instance (via subscripting, as in df["mycol"], or via field accesses, as in df.mycol). This ambiguity can be avoided by aliasing DataFrames and referring to columns via the aliases. For example, the following works correctly:

>>> from pyspark.sql import functions as F
>>> df1 = hc.createDataFrame(rows1).alias("df1")
>>> df2 = df1.filter(F.col("id3") == "a").alias("df2")
>>> df3 = df1.join(df2, F.col("df1.id2") == F.col("df2.id1"), "inner")
>>> df3.show()
+---+---+---+---+---+---+
|id1|id2|id3|id1|id2|id3|
+---+---+---+---+---+---+
|  4|  3|  b|  3|  2|  a|
|  3|  2|  a|  2|  1|  a|
+---+---+---+---+---+---+

Answers 2

I see the same behavior with this data set in Spark 2.0, but not always for the same operation. A slightly different data frame works fine.

df1 = spark.createDataFrame(
    [(1, 2, 'a'), (2, 2, 'a'), (3, 4, 'b')], ['id1', 'id2', 'id3']
)
df1.show()

+---+---+---+
|id1|id2|id3|
+---+---+---+
|  1|  2|  a|
|  2|  2|  a|
|  3|  4|  b|
+---+---+---+

df2 = df1.filter(df1.id3 == 'a')
df2.show()

+---+---+---+
|id1|id2|id3|
+---+---+---+
|  1|  2|  a|
|  2|  2|  a|
+---+---+---+

df3 = df1.join(df2, df1.id2 == df2.id1, 'inner')
df3.show()

+---+---+---+---+---+---+
|id1|id2|id3|id1|id2|id3|
+---+---+---+---+---+---+
|  2|  2|  a|  1|  2|  a|
|  2|  2|  a|  2|  2|  a|
+---+---+---+---+---+---+

There must be a bug somewhere. I have not tried later versions of Spark, though; you may want to report this as a bug.


Thursday, April 12, 2018

Spark dataframe groupby mean and median does not complete


I am using Spark SQL DataFrames to perform a groupBy operation and then compute the mean and median of the data for each group. The original amount of data is about 1 terabyte.

val df_result = df.filter($"DayOfWeek" <= 5).groupBy("id").agg(
    count("Error").as("Count"),
    avg("Error").as("MeanError"),
    callUDF("percentile_approx", col("Error"), lit(0.05)).as("5thError"),
    callUDF("percentile_approx", col("Error"), lit(0.5)).as("MedianError"),
    callUDF("percentile_approx", col("Error"), lit(0.95)).as("95thError"))
  .filter($"Count" > 1000)

df_result.orderBy(asc("MeanError")).limit(5000)
  .write.format("csv").option("header", "true").save("/user/foo.bar/result.csv")

When I run that query, my job gets stuck and does not complete. How do I go about debugging the problem? Is there a key imbalance that causes groupBy() to get stuck?

1 Answers

Answers 1

There are lots of sensible suggestions already in the comments, but for what it's worth here are my thoughts:

1) Does df.count work? If not, your problem is before the code you've posted (as suggested in comments)

2) Look in the Spark UI (as suggested in comments) - do most tasks complete quickly with a few taking a long while/appearing stuck? If so, skew is likely to be your problem

3) You could potentially rewrite your query to first find only the count per id. Next, filter your original df to contain only rows whose id appears more than 1000 times, via a broadcast inner join (to avoid shuffling df), provided there aren't too many such ids. Then aggregate this smaller dataframe and calculate all your statistics. If the count aggregation works, its output should also show whether there's any significant data skew!

4) Sometimes breaking the computation up into even smaller steps and writing and then immediately reading from disk has helped me get awkward jobs to complete in the past. Also can make debugging quicker if generating df is costly in the first instance.

5) Definitely worth upping spark.sql.shuffle.partitions (as suggested in comments); 2001 is a magic number in spark (What should be the optimal value for spark.sql.shuffle.partitions or how do we increase partitions when using Spark SQL?)

6) I would also try varying the amount of data: does it work if you only use day of week = 1 (as suggested in comments)?

7) Does the query run without the percentile_approx?
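The two-step approach from point 3 can be sketched outside Spark (plain Python over hypothetical (id, error) pairs) to show its shape: a cheap count pass first, then the expensive statistics only over the surviving ids:

```python
from collections import Counter

def heavy_ids(rows, min_count=1000):
    """Step 1: count rows per id, keep only ids above the threshold.

    rows: iterable of (id, error) pairs. In Spark this is the cheap count
    aggregation, and the surviving id set would be broadcast for the join.
    """
    counts = Counter(i for i, _ in rows)
    return {i for i, c in counts.items() if c > min_count}

def stats_for_heavy_ids(rows, min_count=1000):
    """Step 2: aggregate only the filtered subset (mean shown here;
    percentiles would follow the same pattern on the smaller data)."""
    keep = heavy_ids(rows, min_count)
    sums, counts = {}, Counter()
    for i, err in rows:
        if i in keep:
            sums[i] = sums.get(i, 0.0) + err
            counts[i] += 1
    return {i: sums[i] / counts[i] for i in keep}
```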


Wednesday, February 14, 2018

How to change the attributes order in Apache SparkSQL `Project` operator?


This is a Catalyst specific problem

See below my queryExecution.optimizedPlan before apply my Rule.

01 Project [x#9, p#10, q#11, if (isnull(q#11)) null else UDF(q#11) AS udfB_10#28, if (isnull(p#10)) null else UDF(p#10) AS udfA_99#93]
02 +- InMemoryRelation [x#9, p#10, q#11], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
03    :  +- *SerializeFromObject [assertnotnull(input[0, eic.R0, true], top level non-flat input object).x AS x#9, unwrapoption(IntegerType, assertnotnull(input[0, eic.R0, true], top level non-flat input object).p) AS p#10, unwrapoption(IntegerType, assertnotnull(input[0, eic.R0, true], top level non-flat input object).q) AS q#11]
04    :     +- *MapElements <function1>, obj#8: eic.R0
05    :        +- *DeserializeToObject newInstance(class java.lang.Long), obj#7: java.lang.Long
06    :           +- *Range (0, 3, step=1, splits=Some(2))

In line 01 I need to swap the positions of udfA and udfB this way:

01 Project [x#9, p#10, q#11, if (isnull(p#10)) null else UDF(p#10) AS udfA_99#93, if (isnull(q#11)) null else UDF(q#11) AS udfB_10#28] 

When I try to change the order of the attributes of a Project operation in Spark SQL via a Catalyst optimization rule, the result of the query is modified to an invalid value. Maybe I'm not doing everything that is needed. I'm just changing the order of the NamedExpression objects in the fields parameter:

object ReorderColumnsOnProjectOptimizationRule extends Rule[LogicalPlan] {

  def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
    case Project(fields: Seq[NamedExpression], child) =>
      if (checkCondition(fields)) Project(newFieldsObject(fields), child) else Project(fields, child)
    case _ => plan
  }

  private def newFieldsObject(fields: Seq[NamedExpression]): Seq[NamedExpression] = {
    // compare UDF computation costs and return the new NamedExpression list
    . . .
  }

  private def checkCondition(fields: Seq[NamedExpression]): Boolean = {
    // compare UDF computation costs and decide whether to change the field order
    . . .
  }

  . . .
}

Note: I'm adding my Rule on extraOptimizations SparkSQL object:

spark.experimental.extraOptimizations = Seq(ReorderColumnsOnProjectOptimizationRule) 

Any suggestions will be of great help.

EDIT 1

By the way, I created a notebook on Databricks for testing purposes. See this link for more detail

Commenting out line 60, the optimization is invoked and an error occurs.

. . .
58     // Do the UDF with less cost before, so I need to change the field order
59     myPriorityList.size == 2 && myPriorityList(0) > myPriorityList(1)
60     false
61   }

What did I miss ?

EDIT 2

Consider the following piece of code about compiler optimisation, which is almost analogous:

if ( really_slow_test(with,plenty,of,parameters)
     && slower_test(with,some,parameters)
     && fast_test // with no parameters
   )
{
  ...then code...
}

This code first evaluates an expensive function and then, on success, proceeds to evaluate the remainder of the expression. But even when the first test fails and the evaluation is short-circuited, there is a significant performance penalty because the fat really_slow_test(...) is always evaluated. While retaining program correctness, one can rearrange the expression as follows:

if ( fast_test
     && slower_test(with,some,parameters)
     && really_slow_test(with,plenty,of,parameters)
   )
{
  ...then code...
}

My goal is to run the fastest UDFs first
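The saving is easy to demonstrate with call counters standing in for the UDF costs (plain Python, hypothetical predicates):

```python
calls = {"slow": 0, "fast": 0}

def slow_test(x):
    """Stand-in for the expensive UDF."""
    calls["slow"] += 1
    return x % 2 == 0

def fast_test(x):
    """Stand-in for the cheap UDF."""
    calls["fast"] += 1
    return x > 50

data = range(100)

# Cheap predicate first: thanks to short-circuiting, slow_test only runs
# for the 49 values that survive fast_test, instead of all 100.
selected = [x for x in data if fast_test(x) and slow_test(x)]
```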

2 Answers

Answers 1

As stefanobaghino said, the schema produced by the analyzer is cached after analysis, and the optimizer shouldn't change it.

If you use Spark 2.2 you can take advantage of SPARK-18127 and apply the rule in Analyzer.

If you run this dummy app

package panos.bletsos

import org.apache.spark.sql.catalyst.expressions.NamedExpression
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.SparkSessionExtensions

case class ReorderColumnsOnProjectOptimizationRule(spark: SparkSession) extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
    case p: Project => {
      val fields = p.projectList
      if (checkConditions(fields, p.child)) {
        val modifiedFieldsObject = optimizePlan(fields, p.child, plan)
        val projectUpdated = p.copy(modifiedFieldsObject, p.child)
        projectUpdated
      } else {
        p
      }
    }
  }

  private def checkConditions(fields: Seq[NamedExpression], child: LogicalPlan): Boolean = {
    // compare UDF computation costs and return a Boolean
    val needsOptimization = listHaveTwoUDFsEnabledForOptimization(fields)
    if (needsOptimization) println(fields.mkString(" | "))
    needsOptimization
  }

  private def listHaveTwoUDFsEnabledForOptimization(fields: Seq[NamedExpression]): Boolean = {
    // a simple priority order based on the UDF name suffix
    val myPriorityList = fields.map((e) => {
      if (e.name.toString().startsWith("udf")) {
        Integer.parseInt(e.name.toString().split("_")(1))
      } else {
        0
      }
    }).filter(e => e > 0)

    // Do the UDF with less cost before, so I need to change the field order
    myPriorityList.size == 2 && myPriorityList(0) > myPriorityList(1)
  }

  private def optimizePlan(fields: Seq[NamedExpression],
      child: LogicalPlan,
      plan: LogicalPlan): Seq[NamedExpression] = {
    // change the order of the field list and return the modified list
    val myListWithUDF = fields.filter((e) => e.name.toString().startsWith("udf"))
    if (myListWithUDF.size != 2) {
      throw new UnsupportedOperationException(
        s"The size of UDF list have ${myListWithUDF.size} elements.")
    }
    val myModifiedList: Seq[NamedExpression] = Seq(myListWithUDF(1), myListWithUDF(0))
    val myListWithoutUDF = fields.filter((e) => !e.name.toString().startsWith("udf"))
    val modifiedFielsObject = getFieldsReordered(myListWithoutUDF, myModifiedList)
    val msg = "•••• optimizePlan called : " + fields.size + " columns on Project.\n" +
      "•••• fields: " + fields.mkString(" | ") + "\n" +
      "•••• UDFs to reorder:\n" + myListWithUDF.mkString(" | ") + "\n" +
      "•••• field list Without UDF: " + myListWithoutUDF.mkString(" | ") + "\n" +
      "•••• modifiedFielsObject: " + modifiedFielsObject.mkString(" | ") + "\n"
    modifiedFielsObject
  }

  private def getFieldsReordered(fieldsWithoutUDFs: Seq[NamedExpression],
      fieldsWithUDFs: Seq[NamedExpression]): Seq[NamedExpression] = {
    fieldsWithoutUDFs.union(fieldsWithUDFs)
  }
}

case class R0(x: Int,
  p: Option[Int] = Some((new scala.util.Random).nextInt(999)),
  q: Option[Int] = Some((new scala.util.Random).nextInt(999))
)

object App {
  def main(args: Array[String]) {
    type ExtensionsBuilder = SparkSessionExtensions => Unit
    // inject the rule here
    val f: ExtensionsBuilder = { e =>
      e.injectResolutionRule(ReorderColumnsOnProjectOptimizationRule)
    }

    val spark = SparkSession
      .builder()
      .withExtensions(f)
      .getOrCreate()

    def createDsR0(spark: SparkSession): Dataset[R0] = {
      import spark.implicits._
      val ds = spark.range(3)
      val xdsR0 = ds.map((i) => {
        R0(i.intValue() + 1)
      })
      // IMPORTANT: The cache here is mandatory
      xdsR0.cache()
    }

    val dsR0 = createDsR0(spark)
    val udfA_99 = (p: Int) => Math.cos(p * p)  // higher cost function
    val udfB_10 = (q: Int) => q + 1            // lower cost function

    println("*** I'm going to register my UDFs ***")
    spark.udf.register("myUdfA", udfA_99)
    spark.udf.register("myUdfB", udfB_10)

    val dsR1 = {
      val ret1DS = dsR0.selectExpr("x", "p", "q", "myUdfA(p) as udfA_99")
      val result = ret1DS.cache()
      dsR0.show()
      result.show()

      result
    }

    val dsR2 = {
      val ret2DS = dsR1.selectExpr("x", "p", "q", "udfA_99", "myUdfB(p) as udfB_10")
      val result = ret2DS.cache()
      dsR0.show()
      dsR1.show()
      result.show()

      result
    }
  }
}

it will print

+---+---+---+-------+-------------------+
|  x|  p|  q|udfB_10|            udfA_99|
+---+---+---+-------+-------------------+
|  1|392|746|    393|-0.7508388993643841|
|  2|778|582|    779| 0.9310990915956336|
|  3|661| 34|    662| 0.6523545972748773|
+---+---+---+-------+-------------------+

Answers 2

I believe the answer to this question is the same as this one.

The summary is that the optimizer is not supposed to alter the schema of the output as it's cached after the analysis.

I'll quote the accepted answer here as it comes from Michael Armbrust, the lead developer of the Spark SQL project at Databricks:

As you guessed, this is failing to work because we make assumptions that the optimizer will not change the results of the query.

Specifically, we cache the schema that comes out of the analyzer (and assume the optimizer does not change it). When translating rows to the external format, we use this schema and thus are truncating the columns in the result. If you did more than truncate (i.e. changed datatypes) this might even crash.

As you can see in this notebook, it is in fact producing the result you would expect under the covers. We are planning to open up more hooks at some point in the near future that would let you modify the plan at other phases of query execution. See SPARK-18127 for more details.


Monday, December 25, 2017

Setting up a Spark SQL connection with Kerberos


I have a simple Java application that can connect and query my cluster using Hive or Impala using code like

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

...

Class.forName("com.cloudera.hive.jdbc41.HS2Driver");
Connection con = DriverManager.getConnection("jdbc:hive2://myHostIP:10000/mySchemaName;hive.execution.engine=spark;AuthMech=1;KrbRealm=myHostIP;KrbHostFQDN=myHostIP;KrbServiceName=hive");
Statement stmt = con.createStatement();

ResultSet rs = stmt.executeQuery("select * from foobar");

But now I want to try the same query with Spark SQL. I'm having a hard time figuring out how to use the Spark SQL API, though, specifically how to set up the connection. I see examples of how to set up the SparkSession, but it's unclear what values I need to provide, for example:

SparkSession spark = SparkSession
  .builder()
  .appName("Java Spark SQL basic example")
  .config("spark.some.config.option", "some-value")
  .getOrCreate();

How do I tell Spark SQL what Host and Port to use, what Schema to use, and how do I tell Spark SQL which authentication technique I'm using? For example I'm using Kerberos to authenticate.

The above Spark SQL code is from https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java

UPDATE:

I was able to make a little progress and I think I figured out how to tell the Spark SQL connection what Host and Port to use.

...

SparkSession spark = SparkSession
  .builder()
  .master("spark://myHostIP:10000")
  .appName("Java Spark Hive Example")
  .enableHiveSupport()
  .getOrCreate();

And I added the following dependency in my pom.xml file

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-hive_2.11</artifactId>
  <version>2.0.0</version>
</dependency>

With this update I can see that the connection is getting further, but it appears it's now failing because I'm not authenticated. I need to figure out how to authenticate using Kerberos. Here's the relevant log data:

2017-12-19 11:17:55.717  INFO 11912 --- [o-auto-1-exec-1] org.apache.spark.util.Utils              : Successfully started service 'SparkUI' on port 4040.
2017-12-19 11:17:55.717  INFO 11912 --- [o-auto-1-exec-1] org.apache.spark.ui.SparkUI              : Bound SparkUI to 0.0.0.0, and started at http://myHostIP:4040
2017-12-19 11:17:56.065  INFO 11912 --- [er-threadpool-0] s.d.c.StandaloneAppClient$ClientEndpoint : Connecting to master spark://myHostIP:10000...
2017-12-19 11:17:56.260  INFO 11912 --- [pc-connection-0] o.a.s.n.client.TransportClientFactory    : Successfully created connection to myHostIP:10000 after 113 ms (0 ms spent in bootstraps)
2017-12-19 11:17:56.354  WARN 11912 --- [huffle-client-0] o.a.s.n.server.TransportChannelHandler   : Exception in connection from myHostIP:10000

java.io.IOException: An existing connection was forcibly closed by the remote host

0 Answers


Tuesday, July 18, 2017

Spark from_json - StructType and ArrayType


I have a data set that comes in as XML, and one of the nodes contains JSON. Spark is reading this in as a StringType, so I am trying to use from_json() to convert the JSON to a DataFrame.

I am able to convert a string of JSON, but how do I write the schema to work with an Array?

String without Array - Working nicely

import org.apache.spark.sql.functions._

val schemaExample = new StructType()
  .add("FirstName", StringType)
  .add("Surname", StringType)

val dfExample = spark.sql("""select "{ \"FirstName\":\"Johnny\", \"Surname\":\"Boy\" }" as theJson""")

val dfICanWorkWith = dfExample.select(from_json($"theJson", schemaExample))

dfICanWorkWith.collect()

// Results \\
res19: Array[org.apache.spark.sql.Row] = Array([[Johnny,Boy]])

String with an Array - Can't figure this one out

import org.apache.spark.sql.functions._

val schemaExample2 = new StructType()
  .add("", ArrayType(new StructType()
    .add("FirstName", StringType)
    .add("Surname", StringType)
  ))

val dfExample2 = spark.sql("""select "[{ \"FirstName\":\"Johnny\", \"Surname\":\"Boy\" }, { \"FirstName\":\"Franky\", \"Surname\":\"Man\" }" as theJson""")

val dfICanWorkWith = dfExample2.select(from_json($"theJson", schemaExample2))

dfICanWorkWith.collect()

// Result \\
res22: Array[org.apache.spark.sql.Row] = Array([null])

1 Answers

Answers 1

The problem is that you don't have fully qualified JSON. Your JSON is missing a couple of things:

  • First, you are missing the surrounding {} that makes it a JSON object
  • Second, you are missing the field's key (the schema declares it as "", but the JSON does not include it)
  • Lastly, you are missing the closing ]

Try replacing it with:

val dfExample2= spark.sql("""select "{\"\":[{ \"FirstName\":\"Johnny\", \"Surname\":\"Boy\" }, { \"FirstName\":\"Franky\", \"Surname\":\"Man\" }]}" as theJson""") 

and you will get:

scala> dfICanWorkWith.collect()
res12: Array[org.apache.spark.sql.Row] = Array([[WrappedArray([Johnny,Boy], [Franky,Man])]])
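An alternative worth noting (an untested sketch, assuming Spark 2.2 or later, where from_json accepts an ArrayType schema directly) is to keep the JSON as a bare array and skip the wrapping object and the empty key entirely:

```scala
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

// Sketch: top-level ArrayType schema, so the JSON string can stay a plain array.
val arraySchema = ArrayType(new StructType()
  .add("FirstName", StringType)
  .add("Surname", StringType))

val dfExample3 = spark.sql(
  """select "[{ \"FirstName\":\"Johnny\", \"Surname\":\"Boy\" }, { \"FirstName\":\"Franky\", \"Surname\":\"Man\" }]" as theJson""")

dfExample3.select(from_json($"theJson", arraySchema)).collect()
```

On older Spark versions this signature is not available, so wrapping the array in an object as shown here remains the way to go.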
Read More

Wednesday, June 28, 2017

Hive Sql dynamically get null column counts from a table

Leave a Comment

I am using datastax + spark integration and spark SQL thrift server, which gives me a Hive SQL interface to query the tables in Cassandra.

The tables in my database are created dynamically. What I want to do is get a count of the null values in each column of a table, given just the table name.

I can get the column names using describe database.table, but in Hive SQL, how do I use that output in another select query that counts the nulls in every column?

Update 1: Traceback with Dudu's solution

Error running query: TExecuteStatementResp(status=TStatus(errorCode=0, errorMessage="org.apache.spark.sql.AnalysisException: Invalid usage of '*' in explode/json_tuple/UDTF;", sqlState=None, infoMessages=["org.apache.hive.service.cli.HiveSQLException:org.apache.spark.sql.AnalysisException: Invalid usage of '' in explode/json_tuple/UDTF;:16:15", 'org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation:org$apache$spark$sql$hive$thriftserver$SparkExecuteStatementOperation$$execute:SparkExecuteStatementOperation.scala:258', 'org.apache.spark.sql.hive.thriftserver.SparkExecuteStatementOperation:runInternal:SparkExecuteStatementOperation.scala:152', 'org.apache.hive.service.cli.operation.Operation:run:Operation.java:257', 'org.apache.hive.service.cli.session.HiveSessionImpl:executeStatementInternal:HiveSessionImpl.java:388', 'org.apache.hive.service.cli.session.HiveSessionImpl:executeStatement:HiveSessionImpl.java:369', 'org.apache.hive.service.cli.CLIService:executeStatement:CLIService.java:262', 'org.apache.hive.service.cli.thrift.ThriftCLIService:ExecuteStatement:ThriftCLIService.java:437', 'org.apache.hive.service.cli.thrift.TCLIService$Processor$ExecuteStatement:getResult:TCLIService.java:1313', 'org.apache.hive.service.cli.thrift.TCLIService$Processor$ExecuteStatement:getResult:TCLIService.java:1298', 'org.apache.thrift.ProcessFunction:process:ProcessFunction.java:39', 'org.apache.thrift.TBaseProcessor:process:TBaseProcessor.java:39', 'org.apache.hive.service.auth.TSetIpAddressProcessor:process:TSetIpAddressProcessor.java:56', 'org.apache.thrift.server.TThreadPoolServer$WorkerProcess:run:TThreadPoolServer.java:286', 'java.util.concurrent.ThreadPoolExecutor:runWorker:ThreadPoolExecutor.java:1142', 'java.util.concurrent.ThreadPoolExecutor$Worker:run:ThreadPoolExecutor.java:617', 'java.lang.Thread:run:Thread.java:745'], statusCode=3), operationHandle=None)

3 Answers

Answers 1

In the following solution there is no need to deal with each column separately. The result is a column index and the number of null values in that column.
You can later join it on the column index to information retrieved from the metastore.
One limitation is that strings containing the literal text null will be counted as nulls.

Demo

The CTE (mytable, as defined by with mytable as) can obviously be replaced by an actual table

with        mytable as
            (
                select  stack
                        (
                            5
                            ,1   ,1.2     ,date '2017-06-21'     ,null
                            ,2   ,2.3     ,null                  ,null
                            ,3   ,null    ,null                  ,'hello'
                            ,4   ,4.5     ,null                  ,'world'
                            ,5   ,null    ,date '2017-07-22'     ,null
                        ) as (id,amt,dt,txt)
            )

select      pe.pos                                          as col_index
           ,count(case when pe.val='null' then 1 end)       as nulls_count

from        mytable t
            lateral view posexplode
            (split(printf(concat('%s',repeat('\u0001%s',field(unhex(1),t.*,unhex(1))-2)),t.*),'\\x01')) pe

group by    pe.pos
;

+-----------+-------------+
| col_index | nulls_count |
+-----------+-------------+
|         0 |           0 |
|         1 |           2 |
|         2 |           3 |
|         3 |           3 |
+-----------+-------------+

Answers 2

Instead of describe database.table, you can use

Select column_name from system_schema.columns where keyspace_name='YOUR KEYSPACE' and table_name='YOUR TABLE'

There is also a column called kind in the above table, with values like partition_key, clustering, and regular.

Columns whose kind is partition_key or clustering will not have null values.

For other columns you can use

select sum(CASE WHEN col1 is NULL THEN 1 ELSE 0 END) as col1_cnt,
       sum(CASE WHEN col2 is NULL THEN 1 ELSE 0 END) as col2_cnt
from table1;

(Note there should be no WHERE clause here: filtering on col1 is null would skew the counts, since the CASE expressions already do the null test per row.)

You can also try below query (Not tried myself)

SELECT COUNT(*)-COUNT(col1) AS A,
       COUNT(*)-COUNT(col2) AS B,
       COUNT(*)-COUNT(col3) AS C
FROM YourTable;

For the above query, you could compute COUNT(*) once into a variable instead of repeating it for every column.

Note: system_schema.columns is a Cassandra table, and the Cassandra user needs read permission on it.

Answers 3

You will have to count the null values for each column separately. For example -

select count(*) from mytable where col1 is null;
select count(*) from mytable where col2 is null;
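Since the setup here already runs Spark SQL behind the thrift server, another option (a sketch, assuming you can also run Scala against the same SparkSession) is to build the null-count expressions dynamically from the DataFrame's column list, so no column has to be named by hand:

```scala
import org.apache.spark.sql.functions.{col, count, when}

// One aggregate expression per column: count(when(... isNull, 1)) counts only
// the rows where that column is null, because count ignores nulls.
val df = spark.table("mytable")  // hypothetical table name
val nullCounts = df.select(
  df.columns.map(c => count(when(col(c).isNull, 1)).alias(c)): _*
)
nullCounts.show()
```

The result is a single row with one column per source column, holding that column's null count.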
Read More

Thursday, February 2, 2017

Dynamically generating JSON schema with Jackson

Leave a Comment

Is there a Jackson equivalent to Spark SQL's StructField where I can build expected schema from scratch? I already know the name of the field and the type of data.

Jackson JsonSchema module looks like the closest solution, but the examples of how to construct the JSON schema seem to be lacking, and a solution that doesn't involve creating a new POJO (perhaps JsonNode?) is preferred. The problem I have been having with using JsonNode is that it requires a value, and the schema is not supposed to have any value.
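Absent a dedicated builder, one workaround (a minimal, untested sketch) is to assemble the JSON Schema document by hand as a Jackson tree of ObjectNodes, which needs no POJO and no values, only the known field names and types:

```scala
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.node.ObjectNode

val mapper = new ObjectMapper()

// Hypothetical helper: build a JSON Schema object node from name -> type pairs.
def schemaFor(fields: Map[String, String]): ObjectNode = {
  val root = mapper.createObjectNode()
  root.put("type", "object")
  val props = root.putObject("properties")
  fields.foreach { case (name, jsonType) =>
    props.putObject(name).put("type", jsonType)
  }
  root
}

println(mapper.writerWithDefaultPrettyPrinter()
  .writeValueAsString(schemaFor(Map("FirstName" -> "string", "age" -> "integer"))))
```

This sidesteps the JsonNode-needs-a-value problem because the tree being built is the schema itself, not an instance document.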

0 Answers

Read More

Thursday, January 26, 2017

SparkSQL - The correlated scalar subquery can only contain equality predicates

Leave a Comment

I would like to execute the following query with Spark SQL 2.0

SELECT a.id as id,
       (SELECT SUM(b.points)
        FROM tableB b
        WHERE b.id = a.id AND b.date <= a.date) AS points
FROM tableA a

but I get the following error

The correlated scalar subquery can only contain equality predicates.

Any idea how I can rewrite the query, or use operations between the two dataframes tableA and tableB, to make it work?

1 Answers

Answers 1

select   a.id as id,
         sum(b.points) as points
from     a, b
where    a.id = b.id
and      b.date <= a.date
group by a.id
;

Skip the sub-select and group by id to ensure a one-to-one relationship between each id and the sum of b's points column.
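If you would rather stay in the DataFrame API than rewrite the SQL, a sketch of the same join-and-aggregate (assuming tableA and tableB are already loaded as hypothetical DataFrames dfA(id, date) and dfB(id, points, date)):

```scala
import org.apache.spark.sql.functions.sum

// Non-equi join on date replaces the correlated predicate, then group and sum,
// mirroring the SQL rewrite above.
val points = dfA
  .join(dfB, dfA("id") === dfB("id") && dfB("date") <= dfA("date"))
  .groupBy(dfA("id"))
  .agg(sum(dfB("points")).alias("points"))
```

Non-equality predicates are allowed in a plain join condition; it is only the correlated scalar subquery form that Spark restricts.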

Here's a 'down and dirty' example which I used:

select * from a ;

id|date
1|2017-01-22 17:59:49
2|2017-01-22 18:00:00
3|2017-01-22 18:00:05
4|2017-01-22 18:00:11
5|2017-01-22 18:00:15

select * from b ;

id|points|date
1|12|2017-01-21 18:03:20
3|25|2017-01-21 18:03:37
5|17|2017-01-21 18:03:55
2|-1|2017-01-22 18:04:27
4|-4|2017-01-22 18:04:35
5|400|2017-01-20 18:17:31
5|-1000|2017-01-23 18:18:36

Notice that b has three entries of id = 5, two before a.date and one after.

select a.id, sum(b.points) as points
from a, b
where a.id = b.id and b.date <= a.date
group by a.id ;

1|12
3|25
5|417

I also confirmed "group by" is supported: http://spark.apache.org/docs/latest/sql-programming-guide.html#supported-hive-features

Read More

Tuesday, October 4, 2016

Scala & Spark: Recycling SQL statements

Leave a Comment

I spent quite some time coding multiple SQL queries that were formerly used to fetch data for various R scripts. This is how it worked:

sqlContent = readSQLFile("file1.sql")
sqlContent = setSQLVariables(sqlContent, variables)
results = executeSQL(sqlContent)

The catch is that some queries require the result of a prior query, which is why creating VIEWs in the database itself does not solve the problem. With Spark 2.0, I already figured out a way to do just that:

// create a dataframe using a jdbc connection to the database
val tableDf = spark.read.jdbc(...)
var tempTableName = "TEMP_TABLE" + java.util.UUID.randomUUID.toString.replace("-", "").toUpperCase
var sqlQuery = Source.fromURL(getClass.getResource("/sql/" + sqlFileName)).mkString
sqlQuery = setSQLVariables(sqlQuery, sqlVariables)
sqlQuery = sqlQuery.replace("OLD_TABLE_NAME", tempTableName)
tableDf.createOrReplaceTempView(tempTableName)

var data = spark.sql(sqlQuery)

But this is, in my humble opinion, very fiddly. Also, more complex queries, e.g. queries that incorporate subquery factoring, currently don't work. Is there a more robust way, like re-implementing the SQL code as Spark SQL code using filter($""), .select($""), etc.?

The overall goal is to get multiple org.apache.spark.sql.DataFrames, each representing the result of one former SQL query (which always involves a few JOINs, WITHs, etc.). So n queries lead to n DataFrames.

Is there a better option than the provided two?
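One way to make the pattern shown earlier less fiddly (a sketch, under the assumption that the files can be ordered so each query's dependencies run before it) is to run the SQL files in order and register every result as a temp view named after its file, so later queries can reference earlier results by name:

```scala
import scala.io.Source
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical helper: execute SQL files in dependency order; each result is
// registered as a temp view so subsequent queries can reference it like a table.
def runQueries(spark: SparkSession,
               sqlFiles: Seq[String],
               variables: Map[String, String]): Map[String, DataFrame] =
  sqlFiles.map { file =>
    var sql = Source.fromURL(getClass.getResource("/sql/" + file)).mkString
    variables.foreach { case (k, v) => sql = sql.replace(k, v) }
    val df = spark.sql(sql)
    val viewName = file.stripSuffix(".sql").toUpperCase
    df.createOrReplaceTempView(viewName)   // visible to the queries that follow
    viewName -> df
  }.toMap
```

This keeps the n-queries-to-n-DataFrames goal while avoiding the per-query UUID and string-replacement bookkeeping.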

Setup: Hadoop v2.7.3, Spark 2.0.0, IntelliJ IDEA 2016.2, Scala 2.11.8, test cluster on a Win7 workstation

0 Answers

Read More

Sunday, June 12, 2016

Read an unsupported mix of union types from an Avro file in Apache Spark

Leave a Comment

I'm trying to switch from reading flat CSV files to Avro files on Spark. Following https://github.com/databricks/spark-avro, I use:

import com.databricks.spark.avro._
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val df = sqlContext.read.avro("gs://logs.xyz.com/raw/2016/04/20/div1/div2/2016-04-20-08-28-35.UTC.blah-blah.avro")

and get

java.lang.UnsupportedOperationException: This mix of union types is not supported (see README): ArrayBuffer(STRING) 

the readme file states clearly:

This library supports reading all Avro types, with the exception of complex union types. It uses the following mapping from Avro types to Spark SQL types:

When I try to read the same file as text, I can see the schema:

val df = sc.textFile("gs://logs.xyz.com/raw/2016/04/20/div1/div2/2016-04-20-08-28-35.UTC.blah-blah.avro")
df.take(2).foreach(println)

{"name":"log_record","type":"record","fields":[{"name":"request","type":{"type":"record","name":"request_data","fields":[{"name":"datetime","type":"string"},{"name":"ip","type":"string"},{"name":"host","type":"string"},{"name":"uri","type":"string"},{"name":"request_uri","type":"string"},{"name":"referer","type":"string"},{"name":"useragent","type":"string"}]}}

<------- an excerpt of the full reply ------->

Since I have little control over the format these files arrive in, my question is: is there a workaround someone has tested and can recommend?

I use Google Cloud Dataproc with

MASTER=yarn-cluster spark-shell --num-executors 4 --executor-memory 4G --executor-cores 4 --packages com.databricks:spark-avro_2.10:2.0.1,com.databricks:spark-csv_2.11:1.3.0

Any help would be greatly appreciated.

0 Answers

Read More


Friday, April 1, 2016

Not able to change Authentication in spark-cassandra-connector

Leave a Comment

I am creating a Spark-Cassandra app (Spark 1.6.0 & spark-cassandra-connector 1.6.0-M1) in which I ask multiple users to enter their Cassandra properties, like host, username, password, keyspace, table, and others.

To change the above properties dynamically and create a dataframe from a Cassandra table, I googled and found some information:

http://www.russellspitzer.com/2016/02/16/Multiple-Clusters-SparkSql-Cassandra/

https://github.com/datastax/spark-cassandra-connector/blob/master/doc/14_data_frames.md#setting-cluster-and-keyspace-level-options

val csc = new CassandraSQLContext(SparkConnection._sc)

csc.setConf(s"${cluster}/spark.cassandra.connection.host", host)
csc.setConf(s"${cluster}/spark.cassandra.connection.port", port)
csc.setConf(s"${cluster}/spark.cassandra.auth.username", username)
csc.setConf(s"${cluster}/spark.cassandra.auth.password", password)

csc.read.format("org.apache.spark.sql.cassandra")
   .options(Map("cluster" -> cluster, "keyspace" -> keySpace, "table" -> table))
   .load()

I tried with the mentioned properties. Clusters that don't require authentication connect successfully, but when I try to connect to a secure cluster using the username & password properties, I get an error.

Exception in thread "Thread-10" java.io.IOException: Failed to open native connection to Cassandra at {192.168.1.17}:9042     at com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSession(CassandraConnector.scala:162)     at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:148)     at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:148)     at com.datastax.spark.connector.cql.RefCountedCache.createNewValueAndKeys(RefCountedCache.scala:31)     at com.datastax.spark.connector.cql.RefCountedCache.acquire(RefCountedCache.scala:56)     at com.datastax.spark.connector.cql.CassandraConnector.openSession(CassandraConnector.scala:81)     at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:109)     at com.datastax.spark.connector.rdd.partitioner.CassandraRDDPartitioner$.getTokenFactory(CassandraRDDPartitioner.scala:184)     at org.apache.spark.sql.cassandra.CassandraSourceRelation$.apply(CassandraSourceRelation.scala:267)     at org.apache.spark.sql.cassandra.DefaultSource.createRelation(DefaultSource.scala:57)     at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:158)     at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:119)     at com.bdbizviz.pa.spark.util.ServiceUtil$.readData(ServiceUtil.scala:97)     at com.bdbizviz.pa.spark.services.SparkServices$$anon$1.run(SparkServices.scala:114)     at java.lang.Thread.run(Thread.java:745) Caused by: com.datastax.driver.core.exceptions.AuthenticationException: Authentication error on host /192.168.1.17:9042: Host /192.168.1.17:9042 requires authentication, but no authenticator found in Cluster configuration     at com.datastax.driver.core.AuthProvider$1.newAuthenticator(AuthProvider.java:40)     at com.datastax.driver.core.Connection$5.apply(Connection.java:250)     at 
com.datastax.driver.core.Connection$5.apply(Connection.java:234)     at com.google.common.util.concurrent.Futures$ChainingListenableFuture.run(Futures.java:861)     at com.google.common.util.concurrent.MoreExecutors$SameThreadExecutorService.execute(MoreExecutors.java:297)     at com.google.common.util.concurrent.ExecutionList.executeListener(ExecutionList.java:156)     at com.google.common.util.concurrent.ExecutionList.execute(ExecutionList.java:145)     at com.google.common.util.concurrent.AbstractFuture.set(AbstractFuture.java:185)     at com.datastax.driver.core.Connection$Future.onSet(Connection.java:1174)     at com.datastax.driver.core.Connection$Dispatcher.channelRead0(Connection.java:1005)     at com.datastax.driver.core.Connection$Dispatcher.channelRead0(Connection.java:928)     at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)     at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)     at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:266)     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)     at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)     at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)     at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)     at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:244)     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)     at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)     at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)     at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:831)     at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:346)     at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:254)     at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)     ... 1 more 

0 Answers

Read More

Monday, March 21, 2016

Spark: java.lang.RuntimeException: [1.226] failure: identifier expected

Leave a Comment

UPDATED 3/11

I am running into an error from Spark based on my Spark SQL query. I am running Spark version 1.2.1. I tried checking my query against several of the answers I found on Stack Overflow, but I am unable to diagnose exactly what the issue is here.

The error:

Application Failed...java.lang.RuntimeException: [1.226] failure: identifier expected

SELECT A_HOSTNAME, A_MODEL FROM
  (SELECT A_HOSTNAME, A_MODEL, COUNT(*) FROM
    (SELECT A_HOSTNAME, A_IF_DESC, A_MODEL FROM TOPOLOGY
     WHERE IS_PROD > 0 AND A_TYPE = 'GWR' AND Z_TYPE = 'LCR'
     GROUP BY A_HOSTNAME, A_IF_DESC, A_MODEL)
   GROUP BY A_HOSTNAME, A_MODEL HAVING COUNT(*) > 1)
GROUP BY A_HOSTNAME, A_MODEL

Code: modelTopology.java

public class modelTopology implements Serializable { //Variables are here + getters and setters } 

Code: Create JavaSchemaRDD from RDD as modelTopologySchema

JavaRDD<modelTopology> MODEL_TOPOLOGYRDD = TopologyRDD.map(
        new Function<Object[], modelTopology>() {
            public modelTopology call(Object[] line) throws Exception {
                modelTopology toporow = new modelTopology();
                toporow.setA_TYPE(line[0].toString().trim());
                toporow.setZ_TYPE(line[1].toString().trim());
                toporow.setA_CLLI(line[2].toString().trim());
                toporow.setZ_CLLI(line[3].toString().trim());
                toporow.setA_HOSTNAME(line[4].toString().trim());
                toporow.setZ_HOSTNAME(line[5].toString().trim());
                toporow.setA_LOCATION(line[6].toString().trim());
                toporow.setA_LOC_TYPE(line[7].toString().trim());
                toporow.setZ_LOCATION(line[8].toString().trim());
                toporow.setZ_LOC_TYPE(line[9].toString().trim());
                toporow.setA_SHELF(line[10].toString().trim());
                toporow.setA_SLOT(line[11].toString().trim());
                toporow.setA_CARD(line[12].toString().trim());
                toporow.setA_PORT(line[13].toString().trim());
                toporow.setA_INTERFACE(line[14].toString().trim());
                toporow.setA_IF_DESC(line[15].toString().trim());
                toporow.setZ_SHELF(line[16].toString().trim());
                toporow.setZ_SLOT(line[17].toString().trim());
                toporow.setZ_CARD(line[18].toString().trim());
                toporow.setZ_PORT(line[19].toString().trim());
                toporow.setZ_INTERFACE(line[20].toString().trim());
                toporow.setZ_IF_DESC(line[21].toString().trim());
                toporow.setA_CARD_NAME(line[22].toString().trim());
                toporow.setZ_CARD_NAME(line[23].toString().trim());
                toporow.setPHY_CIRCUIT_ID(line[24].toString().trim());
                toporow.setLAG_CIRCUIT_ID(line[25].toString().trim());
                toporow.setPHY_CIRCUIT_ALIAS(line[26].toString().trim());
                toporow.setA_VENDOR(line[27].toString().trim());
                toporow.setA_MODEL(line[28].toString().trim());
                toporow.setA_TECHNOLOGY(line[29].toString().trim());
                toporow.setZ_VENDOR(line[30].toString().trim());
                toporow.setZ_MODEL(line[31].toString().trim());
                toporow.setZ_TECHNOLOGY(line[32].toString().trim());
                toporow.setA_EH_ELEMENT_ID(line[33].toString().trim());
                toporow.setA_EH_MACHINE_ID(line[34].toString().trim());
                toporow.setZ_EH_ELEMENT_ID(line[35].toString().trim());
                toporow.setZ_EH_MACHINE_ID(line[36].toString().trim());
                toporow.setA_EH_SPEED(line[37].toString().trim());
                toporow.setZ_EH_SPEED(line[38].toString().trim());
                toporow.setA_EH_SPEED1(line[39].toString().trim());
                toporow.setZ_EH_SPEED1(line[40].toString().trim());
                toporow.setA_EH_EHEALTH_DOMAIN(line[41].toString().trim());
                toporow.setZ_EH_EHEALTH_DOMAIN(line[42].toString().trim());
                toporow.setA_MRTG_HOSTID(line[43].toString().trim());
                toporow.setA_MRTG_GRPID(line[44].toString().trim());
                toporow.setA_MRTG_IFID(line[45].toString().trim());
                toporow.setZ_MRTG_HOSTID(line[46].toString().trim());
                toporow.setZ_MRTG_GRPID(line[47].toString().trim());
                toporow.setZ_MRTG_IFID(line[48].toString().trim());
                toporow.setA_MGMT_IP(line[49].toString().trim());
                toporow.setZ_MGMT_IP(line[50].toString().trim());
                toporow.setA_IF_INDEX(line[51].toString().trim());
                toporow.setZ_IF_INDEX(line[52].toString().trim());
                toporow.setIS_PROD(line[53].toString().trim());
                toporow.setTOPOLOGY_KEY(line[54].toString().trim());
                toporow.setCOMMIT_TS(line[55].toString().trim());

                return toporow;
            }
        });

JavaSchemaRDD schemaTopology = sqlContext.applySchema(MODEL_TOPOLOGYRDD, modelTopology.class);
schemaTopology.registerAsTable("TOPOLOGY");

JavaSchemaRDD FILTERED_TOPOLOGY = sqlContext.sql("SELECT A_HOSTNAME, A_MODEL FROM (SELECT A_HOSTNAME, A_MODEL, COUNT(*) FROM (SELECT A_HOSTNAME, A_IF_DESC, A_MODEL FROM TOPOLOGY WHERE IS_PROD > 0 AND A_TYPE = 'GWR' AND Z_TYPE = 'LCR' GROUP BY A_HOSTNAME, A_IF_DESC, A_MODEL) GROUP BY A_HOSTNAME, A_MODEL HAVING COUNT(*) > 1) GROUP BY A_HOSTNAME, A_MODEL").cache();

JavaSchemaRDD Layout

root
 |-- COMMIT_TS: string (nullable = true)
 |-- IS_PROD: string (nullable = true)
 |-- LAG_CIRCUIT_ID: string (nullable = true)
 |-- PHY_CIRCUIT_ALIAS: string (nullable = true)
 |-- PHY_CIRCUIT_ID: string (nullable = true)
 |-- TOPOLOGY_KEY: string (nullable = true)
 |-- a_CARD: string (nullable = true)
 |-- a_CARD_NAME: string (nullable = true)
 |-- a_CLLI: string (nullable = true)
 |-- a_EH_EHEALTH_DOMAIN: string (nullable = true)
 |-- a_EH_ELEMENT_ID: string (nullable = true)
 |-- a_EH_MACHINE_ID: string (nullable = true)
 |-- a_EH_SPEED: string (nullable = true)
 |-- a_EH_SPEED1: string (nullable = true)
 |-- a_HOSTNAME: string (nullable = true)
 |-- a_IF_DESC: string (nullable = true)
 |-- a_IF_INDEX: string (nullable = true)
 |-- a_INTERFACE: string (nullable = true)
 |-- a_LOCATION: string (nullable = true)
 |-- a_LOC_TYPE: string (nullable = true)
 |-- a_MGMT_IP: string (nullable = true)
 |-- a_MODEL: string (nullable = true)
 |-- a_MRTG_GRPID: string (nullable = true)
 |-- a_MRTG_HOSTID: string (nullable = true)
 |-- a_MRTG_IFID: string (nullable = true)
 |-- a_PORT: string (nullable = true)
 |-- a_SHELF: string (nullable = true)
 |-- a_SLOT: string (nullable = true)
 |-- a_TECHNOLOGY: string (nullable = true)
 |-- a_TYPE: string (nullable = true)
 |-- a_VENDOR: string (nullable = true)
 |-- z_CARD: string (nullable = true)
 |-- z_CARD_NAME: string (nullable = true)
 |-- z_CLLI: string (nullable = true)
 |-- z_EH_EHEALTH_DOMAIN: string (nullable = true)
 |-- z_EH_ELEMENT_ID: string (nullable = true)
 |-- z_EH_MACHINE_ID: string (nullable = true)
 |-- z_EH_SPEED: string (nullable = true)
 |-- z_EH_SPEED1: string (nullable = true)
 |-- z_HOSTNAME: string (nullable = true)
 |-- z_IF_DESC: string (nullable = true)
 |-- z_IF_INDEX: string (nullable = true)
 |-- z_INTERFACE: string (nullable = true)
 |-- z_LOCATION: string (nullable = true)
 |-- z_LOC_TYPE: string (nullable = true)
 |-- z_MGMT_IP: string (nullable = true)
 |-- z_MODEL: string (nullable = true)
 |-- z_MRTG_GRPID: string (nullable = true)
 |-- z_MRTG_HOSTID: string (nullable = true)
 |-- z_MRTG_IFID: string (nullable = true)
 |-- z_PORT: string (nullable = true)
 |-- z_SHELF: string (nullable = true)
 |-- z_SLOT: string (nullable = true)
 |-- z_TECHNOLOGY: string (nullable = true)
 |-- z_TYPE: string (nullable = true)
 |-- z_VENDOR: string (nullable = true)

EDIT 3/11/2016 Edited my sql statement per subquery feedback received below.

JavaSchemaRDD FILTERED_TOPOLOGY = sqlContext.sql(
        "SELECT t2.A_HOSTNAME, t2.A_MODEL FROM " +
        "(SELECT t1.A_HOSTNAME, t1.A_MODEL, COUNT(*) FROM " +
        "(SELECT A_HOSTNAME, A_IF_DESC, A_MODEL " +
        "FROM TOPOLOGY WHERE IS_PROD > 0 AND A_TYPE = 'GWR' AND Z_TYPE = 'LCR' " +
        "GROUP BY A_HOSTNAME, A_IF_DESC, A_MODEL) t1 " +
        "GROUP BY A_HOSTNAME, A_MODEL HAVING COUNT(*) > 1) t2 " +
        "GROUP BY t2.A_HOSTNAME, t2.A_MODEL").cache();

Error:

Application Failed...org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Unresolved attributes: 't2.A_HOSTNAME.,'t2.A_MODEL.,'t2.A_HOSTNAME.,'t2.A_MODEL., tree: 'Aggregate ['t2.A_HOSTNAME.,'t2.A_MODEL.], ['t2.A_HOSTNAME.,'t2.A_MODEL.]  'Subquery t2   'Filter (COUNT(1) > CAST(1, LongType))    'Aggregate ['A_HOSTNAME,'A_MODEL], ['t1.A_HOSTNAME.,'t1.A_MODEL.,COUNT(1) AS c2#59L]     'Subquery t1      'Aggregate ['A_HOSTNAME,'A_IF_DESC,'A_MODEL], ['A_HOSTNAME,'A_IF_DESC,'A_MODEL]       'Filter (((CAST(IS_PROD#1, DoubleType) > CAST(0, DoubleType)) && ('A_TYPE = GWR)) && ('Z_TYPE = LCR))        Subquery TOPOLOGY         LogicalRDD [COMMIT_TS#0,IS_PROD#1,LAG_CIRCUIT_ID#2,PHY_CIRCUIT_ALIAS#3,PHY_CIRCUIT_ID#4,TOPOLOGY_KEY#5,a_CARD#6,a_CARD_NAME#7,a_CLLI#8,a_EH_EHEALTH_DOMAIN#9,a_EH_ELEMENT_ID#10,a_EH_MACHINE_ID#11,a_EH_SPEED#12,a_EH_SPEED1#13,a_HOSTNAME#14,a_IF_DESC#15,a_IF_INDEX#16,a_INTERFACE#17,a_LOCATION#18,a_LOC_TYPE#19,a_MGMT_IP#20,a_MODEL#21,a_MRTG_GRPID#22,a_MRTG_HOSTID#23,a_MRTG_IFID#24,a_PORT#25,a_SHELF#26,a_SLOT#27,a_TECHNOLOGY#28,a_TYPE#29,a_VENDOR#30,z_CARD#31,z_CARD_NAME#32,z_CLLI#33,z_EH_EHEALTH_DOMAIN#34,z_EH_ELEMENT_ID#35,z_EH_MACHINE_ID#36,z_EH_SPEED#37,z_EH_SPEED1#38,z_HOSTNAME#39,z_IF_DESC#40,z_IF_INDEX#41,z_INTERFACE#42,z_LOCATION#43,z_LOC_TYPE#44,z_MGMT_IP#45,z_MODEL#46,z_MRTG_GRPID#47,z_MRTG_HOSTID#48,z_MRTG_IFID#49,z_PORT#50,z_SHELF#51,z_SLOT#52,z_TECHNOLOGY#53,z_TYPE#54,z_VENDOR#55], MapPartitionsRDD[2] at mapPartitions at JavaSQLContext.scala:102 

3 Answers

Answers 1

Spark SQL is based on HiveQL and accepts subqueries only when they are followed by an alias:

SELECT ... FROM (subquery) name ...
SELECT ... FROM (subquery) AS name ...

See Hive Language Manual - SubQueries

Answers 2

Your SQL is unnecessarily complicated and is missing aliases for the subqueries.

Here is a simplified query (on multiple lines for readability):

SELECT A_HOSTNAME, A_MODEL
FROM
  (SELECT A_HOSTNAME, A_IF_DESC, A_MODEL
   FROM TOPOLOGY
   WHERE IS_PROD > 0 AND A_TYPE = 'GWR' AND Z_TYPE = 'LCR') D
GROUP BY A_HOSTNAME, A_MODEL
HAVING COUNT(*) > 1;

This returns hostname and model for devices having more than one interface, which is what I understand you want to do.

Answers 3

I don't understand your query, as it seems way too complicated. Is the following one the same:

SELECT A_HOSTNAME, A_MODEL
FROM TOPOLOGY
WHERE IS_PROD > 0 AND A_TYPE = 'GWR' AND Z_TYPE = 'LCR'
GROUP BY A_HOSTNAME, A_MODEL
HAVING COUNT(*) > 1

And, as zero323 asked: why are you running such an old version of Spark? Can you try with at least a 1.5.x version (or the new 1.6.1 version)?

Regards,

Loïc

Read More