Showing posts with label apache-spark. Show all posts

Saturday, October 13, 2018

How to fix data shuffling issue to avoid exception: java.lang.OutOfMemoryError: GC overhead limit exceeded while saving a dataframe as a Hive table?


I know there are lots of questions with the same title, but this scenario is different, so please read it through before deciding it is a duplicate.

I am trying to move data from a PostgreSQL table to a Hive table on HDFS. To do that, I came up with the following code:

  val conf = new SparkConf()
    .setAppName("Spark-JDBC")
    .set("spark.executor.heartbeatInterval", "120s")
    .set("spark.network.timeout", "12000s")
    .set("spark.sql.inMemoryColumnarStorage.compressed", "true")
    .set("spark.sql.orc.filterPushdown", "true")
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .set("spark.kryoserializer.buffer.max", "512m")
    .set("spark.serializer", classOf[org.apache.spark.serializer.KryoSerializer].getName) // same key again; the later value wins
    .set("spark.streaming.stopGracefullyOnShutdown", "true")
    .set("spark.yarn.driver.memoryOverhead", "7168")
    .set("spark.yarn.executor.memoryOverhead", "7168")
    .set("spark.sql.shuffle.partitions", "61")
    .set("spark.default.parallelism", "60")
    .set("spark.memory.storageFraction", "0.5")
    .set("spark.memory.fraction", "0.6")
    .set("spark.memory.offHeap.enabled", "true")
    .set("spark.memory.offHeap.size", "16g")
    .set("spark.dynamicAllocation.enabled", "false")
    .set("spark.dynamicAllocation.enabled", "true") // overrides the line above
    .set("spark.shuffle.service.enabled", "true")

  val spark = SparkSession.builder()
    .config(conf)
    .master("yarn")
    .enableHiveSupport()
    .config("hive.exec.dynamic.partition", "true")
    .config("hive.exec.dynamic.partition.mode", "nonstrict")
    .getOrCreate()

  def prepareFinalDF(splitColumns: List[String], textList: ListBuffer[String], allColumns: String,
                     dataMapper: Map[String, String], partition_columns: Array[String],
                     spark: SparkSession): DataFrame = {
    val colList               = allColumns.split(",").toList
    val (partCols, npartCols) = colList.partition(p => partition_columns.contains(p.takeWhile(x => x != ' ')))
    val queryCols             = npartCols.mkString(",") + ", 0 as " + flagCol + "," + partCols.reverse.mkString(",")
    val execQuery             = s"select ${allColumns}, 0 as ${flagCol} from schema.tablename where period_year='2017' and period_num='12'"
    val yearDF                = spark.read.format("jdbc")
      .option("url", connectionUrl)
      .option("dbtable", s"(${execQuery}) as year2017")
      .option("user", devUserName)
      .option("password", devPassword)
      .option("partitionColumn", "cast_id")
      .option("lowerBound", 1)
      .option("upperBound", 100000)
      .option("numPartitions", 70)
      .load()
    val totalCols: List[String] = splitColumns ++ textList
    val cdt                     = new ChangeDataTypes(totalCols, dataMapper)
    hiveDataTypes               = cdt.gpDetails()
    val fc                      = prepareHiveTableSchema(hiveDataTypes, partition_columns)
    val allColsOrdered          = yearDF.columns.diff(partition_columns) ++ partition_columns
    val allCols                 = allColsOrdered.map(colname => org.apache.spark.sql.functions.col(colname))
    val resultDF                = yearDF.select(allCols: _*)
    val stringColumns           = resultDF.schema.fields.filter(x => x.dataType == StringType).map(s => s.name)
    // replace line breaks and tabs in every string column with a single space
    val finalDF                 = stringColumns.foldLeft(resultDF) {
      (tempDF, colName) => tempDF.withColumn(colName, regexp_replace(regexp_replace(col(colName), "[\r\n]+", " "), "[\t]+", " "))
    }
    finalDF
  }

  val dataDF     = prepareFinalDF(splitColumns, textList, allColumns, dataMapper, partition_columns, spark)
  val dataDFPart = dataDF.repartition(30)
  dataDFPart.createOrReplaceTempView("preparedDF")
  spark.sql("set hive.exec.dynamic.partition.mode=nonstrict")
  spark.sql("set hive.exec.dynamic.partition=true")
  spark.sql(s"INSERT OVERWRITE TABLE schema.hivetable PARTITION(${prtn_String_columns}) select * from preparedDF")

The data is inserted into the Hive table with dynamic partitioning on prtn_String_columns: source_system_name, period_year, period_num.

Spark-submit used:

SPARK_MAJOR_VERSION=2 spark-submit --conf spark.ui.port=4090 \
  --driver-class-path /home/fdlhdpetl/jars/postgresql-42.1.4.jar \
  --jars /home/fdlhdpetl/jars/postgresql-42.1.4.jar \
  --num-executors 80 --executor-cores 5 --executor-memory 50G \
  --driver-memory 20G --driver-cores 3 \
  --class com.partition.source.YearPartition splinter_2.11-0.1.jar \
  --master=yarn --deploy-mode=cluster \
  --keytab /home/fdlhdpetl/fdlhdpetl.keytab --principal fdlhdpetl@FDLDEV.COM \
  --files /usr/hdp/current/spark2-client/conf/hive-site.xml,testconnection.properties \
  --name Splinter \
  --conf spark.executor.extraClassPath=/home/fdlhdpetl/jars/postgresql-42.1.4.jar

The following error messages are generated in the executor logs:

Container exited with a non-zero exit code 143.
Killed by external signal
18/10/03 15:37:24 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[SIGTERM handler,9,system]
java.lang.OutOfMemoryError: Java heap space
    at java.util.zip.InflaterInputStream.<init>(InflaterInputStream.java:88)
    at java.util.zip.ZipFile$ZipFileInflaterInputStream.<init>(ZipFile.java:393)
    at java.util.zip.ZipFile.getInputStream(ZipFile.java:374)
    at java.util.jar.JarFile.getManifestFromReference(JarFile.java:199)
    at java.util.jar.JarFile.getManifest(JarFile.java:180)
    at sun.misc.URLClassPath$JarLoader$2.getManifest(URLClassPath.java:944)
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:450)
    at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at org.apache.spark.util.SignalUtils$ActionHandler.handle(SignalUtils.scala:99)
    at sun.misc.Signal$1.run(Signal.java:212)
    at java.lang.Thread.run(Thread.java:745)

I see in the logs that the read is being executed properly with the given number of partitions as below:

Scan JDBCRelation((select column_names from schema.tablename where period_year='2017' and period_num='12') as year2017) [numPartitions=50] 

Below is the state of the executors in the stages: [four screenshots, not preserved]

The data is not being partitioned properly: some partitions are small while others become huge, so there is a skew problem. The job fails while inserting the data into the Hive table, at the line spark.sql(s"INSERT OVERWRITE TABLE schema.hivetable PARTITION(${prtn_String_columns}) select * from preparedDF"), but I understand this is happening because of the data skew.

I tried increasing the number of executors, the executor memory and the driver memory, and I also tried saving the dataframe as a CSV file instead of a Hive table, but nothing stops the execution from throwing the exception:

java.lang.OutOfMemoryError: GC overhead limit exceeded 

Is there anything in the code that I need to correct? Could anyone let me know how I can fix this problem?

3 Answers

Answers 1

  1. Determine how many partitions you need given the amount of input data and your cluster resources. As a rule of thumb, it is better to keep partition input under 1 GB unless strictly necessary, and strictly smaller than the block size limit.

    You've previously stated that you migrate 1 TB of data; the values you use in different posts (5 to 70) are likely way too low to ensure a smooth process.

    Try to use a value which won't require further repartitioning.

  2. Know your data.

    Analyze the columns available in the dataset to determine if there are any columns with high cardinality and a uniform distribution that can be spread among the desired number of partitions. These are good candidates for an import process. Additionally, you should determine the exact range of values.

    Aggregations with different centrality and skewness measures, as well as histograms and basic counts-by-key, are good exploration tools. For this part it is better to analyze the data directly in the database, instead of fetching it to Spark.

  3. If there are no columns which satisfy above criteria consider:

    • Creating a custom one and exposing it via a view. Hashes over multiple independent columns are usually good candidates. Please consult your database manual to determine functions that can be used here (DBMS_CRYPTO in Oracle, pgcrypto in PostgreSQL)*.
    • Using a set of independent columns which taken together provide high enough cardinality.

      Optionally, if you're going to write to a partitioned Hive table, you should consider including Hive partitioning columns. It might limit the number of files generated later.

  4. Prepare partitioning arguments

    • If the column selected or created in the previous steps is numeric, provide it directly as the partitionColumn and use the range values determined before to fill lowerBound and upperBound.

      If the bound values don't reflect the properties of the data (min(col) for lowerBound, max(col) for upperBound), this can result in significant data skew, so tread carefully. In the worst-case scenario, when the bounds don't cover the range of the data, all records will be fetched by a single machine, making it no better than no partitioning at all.

    • If the column selected in the previous steps is categorical, or is a set of columns, generate a list of mutually exclusive predicates that fully cover the data, in a form that can be used in a SQL where clause.

      For example if you have a column A with values {a1, a2, a3} and column B with values {b1, b2, b3}:

      val predicates: Array[String] = (for {
        a <- Seq("a1", "a2", "a3")
        b <- Seq("b1", "b2", "b3")
      } yield s"A = '$a' AND B = '$b'").toArray // string values need SQL quotes

      Double check that conditions don't overlap and all combinations are covered. If these conditions are not satisfied you end up with duplicates or missing records respectively.

      Pass the data as the predicates argument to the jdbc call. Note that the number of partitions will be exactly equal to the number of predicates.

  5. Put the database in read-only mode (any ongoing writes can cause data inconsistency). If possible, you should lock the database before you start the whole process, but this might not be possible in your organization.

  6. If the number of partitions matches the desired output, load the data without repartitioning and dump it directly to the sink; if not, you can try to repartition following the same rules as in step 1.

  7. If you still experience any problems make sure that you've properly configured Spark memory and GC options.

  8. If none of the above works:

    • Consider dumping your data to a network / distributed storage using tools like COPY TO and reading it directly from there.

      Note that for standard database utilities you will typically need a POSIX-compliant file system, so HDFS usually won't do.

      The advantage of this approach is that you don't need to worry about the column properties, and there is no need for putting data in a read-only mode, to ensure consistency.

    • Using dedicated bulk transfer tools, like Apache Sqoop, and reshaping data afterwards.


* Don't use pseudocolumns - Pseudocolumn in Spark JDBC.
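Steps 1 and 4 of this answer can be sketched in plain Scala. This is a minimal sketch, not part of the original answer: partitionsFor picks the smallest partition count that keeps each partition at or below a size target, and moduloPredicates is one hypothetical way to build mutually exclusive, fully covering predicates, assuming an integer column (cast_id from the question is used as the example) and a database with mod() and abs(), as PostgreSQL has.

```scala
object PartitionPlanning {
  // Step 1: smallest partition count that keeps every partition at or below maxBytesPerPartition
  def partitionsFor(totalBytes: Long, maxBytesPerPartition: Long): Int = {
    require(totalBytes >= 0 && maxBytesPerPartition > 0)
    val n = (totalBytes + maxBytesPerPartition - 1) / maxBytesPerPartition // ceiling division
    math.max(1L, n).toInt
  }

  // Step 4: one predicate per remainder of `column` modulo n, plus one for NULLs,
  // so every row matches exactly one predicate. abs() is needed because in
  // PostgreSQL mod() of a negative value is negative.
  def moduloPredicates(column: String, n: Int): Array[String] = {
    require(n >= 1)
    (0 until n).map(i => s"abs(mod($column, $n)) = $i").toArray :+ s"$column IS NULL"
  }
}
```

For 1 TiB at the 1 GiB guideline, PartitionPlanning.partitionsFor(1L << 40, 1L << 30) gives 1024. The predicates would then go to spark.read.jdbc(url, table, predicates, connectionProperties), which creates one partition per predicate (n + 1 here, because of the NULL predicate).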

Answers 2

In my experience there are 4 kinds of memory settings which make a difference:

A) [1] Memory for storing and processing data VS [2] Overhead memory outside the JVM heap (thread stacks, VM and other native overheads)

B) [1] Executor VS [2] Driver memory

Up to now, I was always able to get my Spark jobs running successfully by increasing the appropriate kind of memory:

A2-B1 would therefore be the A2 memory available on the executors, A1-B2 the A1 memory on the driver, etc.

The property names are as follows:

A1-B1) executor-memory

A1-B2) driver-memory

A2-B1) spark.yarn.executor.memoryOverhead

A2-B2) spark.yarn.driver.memoryOverhead

Keep in mind that the sum of all *-B1 must be less than the available memory on your workers and the sum of all *-B2 must be less than the memory on your driver node.

My bet would be that the culprit is one of the heap settings.
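To check the constraint on the executor-side (*-B1) settings, the memory YARN has to grant per executor can be estimated. A minimal sketch, assuming Spark 2.x on YARN, where the default executor overhead is 10% of executor memory with a 384 MiB minimum; YARN may also round each request up to its minimum allocation, which this ignores. The object and method names are illustrative, not a Spark API.

```scala
object YarnContainerSizing {
  // Default spark.yarn.executor.memoryOverhead in Spark 2.x: 10% of executor memory, at least 384 MiB
  def defaultOverheadMiB(executorMemoryMiB: Long): Long =
    math.max((0.10 * executorMemoryMiB).toLong, 384L)

  // Memory requested per executor container: executor-memory (A1-B1) plus overhead (A2-B1)
  def containerMiB(executorMemoryMiB: Long, overheadMiB: Option[Long] = None): Long =
    executorMemoryMiB + overheadMiB.getOrElse(defaultOverheadMiB(executorMemoryMiB))

  // How many such containers fit into the memory YARN offers on one worker node
  def executorsPerNode(nodeMemoryMiB: Long, containerSizeMiB: Long): Long = {
    require(containerSizeMiB > 0)
    nodeMemoryMiB / containerSizeMiB
  }
}
```

With the question's --executor-memory 50G and an explicit overhead of 7168 MiB, containerMiB(51200, Some(7168L)) is 58368 MiB, so a node offering a hypothetical 120000 MiB to YARN fits only two executors.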

Answers 3

Another question of yours was routed here as a duplicate:

 'How to avoid data skewing while reading huge datasets or tables into Spark? The data is not being partitioned properly. One partition is smaller while the other one becomes huge on read. I observed that one of the partitions has nearly 2 million rows and while inserting there is a skew in partition.'

If the problem is dealing with how the data is partitioned in the dataframe after the read, have you played around with increasing the "numPartitions" value?

.option("numPartitions",50) 

lowerBound and upperBound form the partition strides for the generated WHERE clause expressions, and numPartitions determines the number of splits.

Say, for example, sometable has a column ID (which we choose as partitionColumn), the values of ID in the table range from 1 to 1000, and we want to get all the records by running select * from sometable. So we go with lowerBound = 1, upperBound = 1000 and numPartitions = 4.

This will produce a dataframe with 4 partitions, each holding the result of one query that Spark builds from these settings:

select * from sometable where ID < 251 or ID is null
select * from sometable where ID >= 251 AND ID < 501
select * from sometable where ID >= 501 AND ID < 751
select * from sometable where ID >= 751

What if most of the records in our table fall within the ID range 500 to 750? That's the situation you are in.

When we increase numPartitions, the range is split even further, which reduces the number of records in each partition, but this is not a targeted fix.
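The stride arithmetic can be reproduced in plain Scala to see which rows land in which partition. This is a re-implementation for illustration, modeled on how Spark 2.x builds these WHERE clauses internally; it is not a public Spark API, and newer releases compute the stride slightly differently. Spark uses integer division for the stride (1000 / 4 - 1 / 4 = 250) and counts from lowerBound, the first partition also picks up NULLs, and values outside [lowerBound, upperBound] are not filtered out but fall into the first or last partition.

```scala
object JdbcStrides {
  // WHERE clauses for partitionColumn/lowerBound/upperBound/numPartitions, following the
  // Spark 2.x scheme: integer-division stride, open-ended first and last ranges
  def whereClauses(column: String, lowerBound: Long, upperBound: Long, numPartitions: Int): Seq[String] = {
    require(numPartitions >= 2 && lowerBound < upperBound)
    val stride = upperBound / numPartitions - lowerBound / numPartitions
    require(stride > 0, "range too narrow for this many partitions")
    (0 until numPartitions).map { i =>
      val lo = lowerBound + i * stride
      val hi = lowerBound + (i + 1) * stride
      if (i == 0) s"$column < $hi or $column is null"    // also gets every value below lowerBound
      else if (i == numPartitions - 1) s"$column >= $lo" // also gets every value above upperBound
      else s"$column >= $lo AND $column < $hi"
    }
  }
}
```

With the settings from the question, JdbcStrides.whereClauses("cast_id", 1, 100000, 70) uses a stride of 1428 and ends with cast_id >= 98533; if cast_id values go far beyond 100000, all of them land in that single last partition.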

Instead of letting Spark split the partitionColumn based on the boundaries we provide, you can feed in the splits yourself so that the data is split evenly. For that you need to switch to another JDBC method, where instead of (lowerBound, upperBound and numPartitions) we provide predicates directly:

def jdbc(url: String, table: String, predicates: Array[String], connectionProperties: Properties): DataFrame  
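One way to feed in even splits yourself is to derive range boundaries from the distribution of the column. A minimal sketch, assuming you can fetch a sample (or precomputed quantiles, for example with PostgreSQL's percentile_disc) of a numeric partition column; the helper names are hypothetical. Each non-NULL value falls into exactly one range, and NULLs go to the first one.

```scala
object EqualFrequencyPredicates {
  // n - 1 split points taken from the sorted sample, so each range holds about the same
  // number of sampled values; duplicates are dropped to avoid empty ranges
  def splitPoints(sample: Seq[Long], n: Int): Seq[Long] = {
    require(n >= 1)
    val sorted = sample.sorted.toVector
    if (sorted.isEmpty) Seq.empty
    else (1 until n).map(i => sorted(i * sorted.size / n)).distinct
  }

  // Mutually exclusive ranges that together cover every value; splits must be ascending
  def rangePredicates(column: String, splits: Seq[Long]): Array[String] =
    if (splits.isEmpty) Array("1 = 1") // a single predicate matching all rows
    else {
      val first = s"$column < ${splits.head} OR $column IS NULL"
      val middle = splits.sliding(2).collect { case Seq(lo, hi) => s"$column >= $lo AND $column < $hi" }.toList
      val last = s"$column >= ${splits.last}"
      (first +: middle :+ last).toArray
    }
}
```

The resulting array would be passed as the predicates argument of jdbc, so a skewed column such as one concentrated between 500 and 750 gets narrow ranges where the data is dense instead of equal-width strides.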


Sunday, September 30, 2018

Python lagged series to Pyspark


I am trying to adapt this Python code to PySpark:

import numpy as np
from statsmodels.tsa.tsatools import lagmat

def lag_func(data, lag):
    X = lagmat(data["diff"], lag)
    lagged = data.copy()
    for c in range(1, lag + 1):
        lagged["lag%d" % c] = X[:, c - 1]
    return lagged

def diff_creation(data):
    data["diff"] = np.nan
    # difference between consecutive values of the second column
    # (.iloc/.to_numpy replace the removed .ix/.as_matrix)
    data.iloc[1:, data.columns.get_loc("diff")] = (data.iloc[1:, 1].to_numpy()
                                                   - data.iloc[:len(data) - 1, 1].to_numpy())
    return data

The result is a dataframe with lagged columns.

I tried something like this:

from pyspark.ml import Transformer
from pyspark.sql import functions as F
from pyspark.sql.window import Window

class SerieMaker(Transformer):
    def __init__(self, inputCol='f_qty_recalc', outputCol='serie', dateCol='dt_ticket_sale',
                 idCol=['id_store', 'id_sku'], serieSize=30):
        super(SerieMaker, self).__init__()
        self.inputCol = inputCol
        self.outputCol = outputCol
        self.dateCol = dateCol
        self.serieSize = serieSize
        self.idCol = idCol

    def _transform(self, df):
        window = Window.partitionBy(self.idCol).orderBy(self.dateCol)
        series = []
        df = df.withColumn('filled_serie', F.lit(0))

        # 30 days lag
        for index in reversed(range(0, self.serieSize)):
            window2 = Window.partitionBy(self.idCol).orderBy(self.dateCol).rowsBetween((self.serieSize - index), self.serieSize)
            col_name = (self.outputCol + '%s' % index)
            series.append(col_name)
            lagged = F.lag(F.col(self.inputCol), index).over(window)
            # fill a missing lag with the first non-null value in window2
            df = df.withColumn(col_name, F.when(F.isnull(lagged),
                                                F.first(F.col(self.inputCol), ignorenulls=True).over(window2)).otherwise(lagged))
            # count how many lags had to be filled
            df = df.withColumn('filled_serie', F.when(F.isnull(lagged), F.col('filled_serie') + 1).otherwise(F.col('filled_serie')))
        df = df.withColumn('rank', F.rank().over(window))
        # gather the lag columns into one array column (F.col accepts a single name only)
        return df.withColumn(self.outputCol, F.array(*series))

My df looks like:

+------------+--------+-------------------+------------+-------+-----+
|      id_sku|id_store|     dt_ticket_sale|f_qty_recalc|prc_sku|sales|
+------------+--------+-------------------+------------+-------+-----+
|    514655.0|    1090|2017-12-20 00:00:00|           1|   1.23| 1.23|
|    823259.0|     384|2017-12-20 00:00:00|           1|   2.79| 2.79|

My expected output is a set of lags of f_qty_recalc, preceded by id_sku, id_store and the date (not shown here):

   diff  lag1  lag2  lag3  lag4  lag5  lag6  lag7  lag8  lag9  ...  lag20  lag21  lag22  lag23  lag24  lag25  lag26  lag27  lag28  lag29
0   NaN   0.0   0.0   0.0   0.0   0.0   0.0   0.0   0.0   0.0  ...    0.0    0.0    0.0    0.0    0.0    0.0    0.0    0.0    0.0    0.0
1   0.0   NaN   0.0   0.0   0.0   0.0   0.0   0.0   0.0   0.0  ...    0.0    0.0    0.0    0.0    0.0    0.0    0.0    0.0    0.0    0.0

0 Answers


Thursday, September 13, 2018

Are failed spark executors a cause for concern?


I understand that Apache Spark is designed around resilient data structures, but are failures expected in a running system, or do they typically indicate a problem?

As I begin to scale the system out to different configurations, I see ExecutorLostFailure and No more replicas (see below). The system recovers and the program finishes.

Should I be concerned with this, and are there typically things we can do to avoid it, or is this expected as the number of executors grows?

18/05/18 23:59:00 WARN TaskSetManager: Lost task 87.0 in stage 4044.0 (TID 391338, ip-10-0-0-68.eu-west-1.compute.internal, executor 11): ExecutorLostFailure (executor 11 exited caused by one of the running tasks) Reason: Container marked as failed: container_1526667532988_0010_01_000012 on host: ip-10-0-0-68.eu-west-1.compute.internal. Exit status: -100. Diagnostics: Container released on a *lost* node
18/05/18 23:59:00 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_193_7 !
18/05/18 23:59:00 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_582_50 !
18/05/18 23:59:00 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_401_91 !
18/05/18 23:59:00 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_582_186 !
18/05/18 23:59:00 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_115_139 !

2 Answers

Answers 1

As I begin to scale the system out to different configurations, I see ExecutorLostFailure and No more replicas (See below). Should I be concerned with this?

You are right, this exception does not necessarily mean that something is wrong with your Spark job, because it will be thrown even in cases where a server stopped working for physical reasons (e.g. an outage).

However, if you see multiple executor failures in your job, this is a signal that something can probably be improved. More specifically, the Spark configuration contains a parameter called spark.task.maxFailures (4 by default), which is the number of failures of any particular task after which the job is considered failed. As a result, in a well-behaved Spark job you might see some executor failures, but they should be rare, and you should rarely see a specific task failing multiple times, because that probably means it's not the fault of the executor, but that the task is extremely heavy to deal with.

Are there typically things we can do to avoid this?

That depends a lot on the nature of your job. However, as said before, the usual suspect is that the created task is too heavy for an executor (e.g. in terms of memory required). Spark creates a number of partitions for each RDD based on several factors, such as the size of your cluster. However, if, for example, your cluster is quite small, Spark might create partitions that are very big and cause problems for the executors. So, you can try repartitioning the RDDs in your code to enforce more, smaller partitions, which can be processed more easily.

Answers 2

More important than how many failures you are receiving is the cause of these failures.

If the cause of the failure is related to network problems, it is ok. That is expected on distributed systems. When you have many machines talking to each other, at some moment you will have some communication issues.

But if the cause of the error is related to resource consumption, then you may have a dangerous problem. In general, all slaves have similar specs. If some job requires more resources than are available on one slave, this will probably happen again and again on the next slaves. They will keep failing until they all become unresponsive, in a domino effect.

In this last case, you may need to rethink and rewrite your code to reduce the amount of memory or disk each step needs on each slave. Some common improvements are applying all the filters before the grouping, or changing the group-by-key strategy.


Thursday, September 6, 2018

Limiting maximum size of dataframe partition


When I write out a dataframe to, say, CSV, a .csv file is created for each partition. Suppose I want to limit the max size of each file to, say, 1 MB. I could do the write multiple times and increase the argument to repartition each time. Is there a way I can calculate ahead of time what argument to use for repartition to ensure that the max size of each file is less than some specified size?

I imagine there might be pathological cases where all the data ends up on one partition. So make the weaker assumption that we only want to ensure that the average file size is less than some specified amount, say 1 MB.

2 Answers

Answers 1

1. Single dataframe solution

I was trying to come up with some clever idea that would not kill the cluster at the same time, and the only thing that came to my mind was:

  1. Calculate the size of the serialized row
  2. Get no. of rows in your DataFrame
  3. Repartition, by dividing with the expected size
  4. Should work?

The code should look something like this:

import java.io.{ByteArrayOutputStream, ObjectOutputStream}

val df: DataFrame = ??? // your df
val rowSize = getBytes(df.head)
val rowCount = df.count()
val partitionSize = 1000000 // 1 MB, taken as a million bytes
val noPartitions: Int = math.max(1, (rowSize * rowCount / partitionSize).toInt) // at least one partition
df.repartition(noPartitions).write.format(...) // save to csv

// just a helper function from https://stackoverflow.com/a/39371571/1549135
def getBytes(value: Any): Long = {
  val stream: ByteArrayOutputStream = new ByteArrayOutputStream()
  val oos = new ObjectOutputStream(stream)
  oos.writeObject(value)
  oos.close()
  stream.toByteArray.length
}

My first choice was to calculate each row's byte size, but that would be terribly inefficient. So, unless the sizes of your rows differ greatly, I would say that this solution will work. You can also calculate the size of every n-th row. You get the idea.

Also, I just 'hope' that Long will be big enough to hold the product used to calculate noPartitions (it only overflows beyond about 9.2 × 10^18). If you have a lot of rows, it is safer to do the arithmetic in Double; simply changing the order of operations to rowSize / partitionSize * rowCount would not work, because integer division turns rowSize / partitionSize into 0 whenever a row is smaller than the target partition size:

val noPartitions: Int = math.max(1, math.ceil(rowSize.toDouble * rowCount / partitionSize).toInt)

Again, this is just a draft idea, with no domain knowledge about your data.
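The sampling variant mentioned above (measuring every n-th row instead of a single one) might look like this. A minimal sketch in plain Scala that reuses the same Java-serialization size proxy as getBytes, which only approximates the bytes a row takes in CSV; the object and parameter names are hypothetical.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

object PartitionSizing {
  // Java-serialized size of one value; a rough proxy for its size on disk
  def serializedSize(value: AnyRef): Long = {
    val stream = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(stream)
    try oos.writeObject(value) finally oos.close()
    stream.size().toLong
  }

  // Average the size of every n-th row and scale it up to the full row count
  def estimatePartitions(rows: Seq[AnyRef], totalRowCount: Long, targetBytes: Long, everyNth: Int = 100): Int = {
    require(everyNth > 0 && targetBytes > 0)
    val sampled = rows.zipWithIndex.collect { case (row, i) if i % everyNth == 0 => serializedSize(row) }
    if (sampled.isEmpty) 1
    else {
      val avgRowBytes = sampled.sum.toDouble / sampled.size
      math.max(1, math.ceil(avgRowBytes * totalRowCount / targetBytes).toInt)
    }
  }
}
```

In Spark the rows could come from something like df.take(1000) (Row is serializable), with totalRowCount from df.count().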

2. Cross system solution

While going through the apache-spark docs I have found an interesting cross-system solution:

spark.sql.files.maxPartitionBytes which sets:

The maximum number of bytes to pack into a single partition when reading files.

The default value is 134217728 (128 MB).

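So I suppose you could set it to 1000000 (1 MB) and it would apply to every DataFrame read from files in that session. Note, however, that this setting controls how input files are split into partitions when reading; it does not directly limit the size of the files you write. Also, too small a partition size may greatly impact your performance!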

You can set it up, during SparkSession creation:

val spark = SparkSession
  .builder()
  .appName("Spark SQL basic example")
  .config("spark.sql.files.maxPartitionBytes", 1000000) // 1 MB, matching the value above
  .getOrCreate()

All of the above is only valid if (I remember correctly and) the CSV output is written as one file per DataFrame partition.
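To see how spark.sql.files.maxPartitionBytes is actually applied when reading, here is roughly how Spark 2.x derives the split size for file sources. This is a sketch modeled on Spark's internal scan planning as far as I know it, not a public API; the 4 MB default is that of spark.sql.files.openCostInBytes, and defaultParallelism is usually the total number of executor cores.

```scala
object FileSplitSizing {
  // Every file is charged an extra openCostInBytes, the total is spread over the default
  // parallelism, and the per-core share is capped by spark.sql.files.maxPartitionBytes
  def maxSplitBytes(fileSizes: Seq[Long], maxPartitionBytes: Long, defaultParallelism: Int,
                    openCostInBytes: Long = 4L * 1024 * 1024): Long = {
    require(defaultParallelism > 0)
    val totalBytes = fileSizes.map(_ + openCostInBytes).sum
    val bytesPerCore = totalBytes / defaultParallelism
    math.min(maxPartitionBytes, math.max(openCostInBytes, bytesPerCore))
  }
}
```

For a single 1 GiB file on 8 cores the default 128 MB cap applies, while lowering the setting to 1000000 caps the splits at 1 MB; for small inputs the split size can also end up below the cap, because the per-core share is smaller.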

Answers 2

    val df = spark.range(10000000)
    df.cache

    val catalyst_plan = df.queryExecution.logical
    val df_size_in_bytes = spark.sessionState.executePlan(catalyst_plan).optimizedPlan.stats.sizeInBytes

df_size_in_bytes: BigInt = 80000000

The best solution would be to take 100 records, estimate their size and extrapolate to all the rows, as in the example above.


Tuesday, September 4, 2018

Calling scala code in pyspark for XSLT transformations


This might be a long shot, but figured it couldn't hurt to ask. I'm attempting to use Elsevier's open-sourced spark-xml-utils package in pyspark to transform some XML records with XSLT.

I've had a bit of success with some exploratory code getting a transformation to work:

# open XSLT processor from spark's jvm context
with open('/tmp/foo.xsl', 'r') as f:
    proc = sc._jvm.com.elsevier.spark_xml_utils.xslt.XSLTProcessor.getInstance(f.read())

# transform XML record with 'proc'
with open('/tmp/bar.xml', 'r') as f:
    transformed = proc.transform(f.read())

However, in a more realistic situation, I was unable to drop proc.transform into a lambda map function, getting errors similar to:

An error occurred while calling o55.__getstate__. Trace:
py4j.Py4JException: Method __getstate__([]) does not exist
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
    at py4j.Gateway.invoke(Gateway.java:272)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:748)

The small example that worked on a single record ran in a pyspark shell, which I assume was using the Spark driver. The map function mentioned above, however, ran in Spark via Livy and YARN, which introduces workers. This SO question/answer suggests that perhaps I cannot use the function from the JVM in that context.

Now, the spark-xml-utils library provides some examples in scala, doing precisely what I'd like to do:

import com.elsevier.spark_xml_utils.xslt.XSLTProcessor

val xmlKeyPair = sc.sequenceFile[String, String]("s3n://spark-xml-utils/xml/part*")
val stylesheet = sc.textFile("s3n://spark-xml-utils/stylesheets/srctitle.xsl").collect.head
val srctitles = xmlKeyPair.mapPartitions(recsIter => {
  val proc = XSLTProcessor.getInstance(stylesheet)
  recsIter.map(rec => proc.transform(rec._2))
})

I'm wondering, how can I translate this to pyspark code, such that I could run it over an RDD? Ideally, on an RDD with the following input and output format:

id    | document      | other | columns
-----------------------------------------------------
sprog | <xml here...> | more  | data
baz   | <xml here...> | more  | data

that could become

id    | document                    | other | columns
-----------------------------------------------------
sprog | <*transformed* xml here...> | more  | data
baz   | <*transformed* xml here...> | more  | data

Any help or suggestions would be most appreciated.

Update 8/28/2018: Also tried running through mapPartitions, no dice. Same error of __getstate__()

0 Answers


Sunday, August 26, 2018

WARN ShutdownHookManager: ShutdownHook '$anon$2' timeout,


I have a Spark job which reads a file, connects to a DB server, does some calculations and then creates a different .csv file.

Since yesterday it keeps failing with this error (even though it creates an output file):

error WARN ShutdownHookManager: ShutdownHook '$anon$2' timeout

How to fix it?

I searched and found that somebody recommends to 'explicitly invoke sparkContext.stop() before exiting the application', but when I add this line of code to my Spark code I can't compile it; compilation fails with this error:

not found: value sparkContext
[error] sparkContext.stop()
[error] ^
[error] one error found
[error] (compile:compileIncremental) Compilation failed

0 Answers


Friday, August 17, 2018

How does Apache spark handle python multithread issues?


According to Python's GIL, we cannot use threading for CPU-bound processes, so my question is: how does Apache Spark utilize Python in a multi-core environment?

1 Answers

Answers 1

Python multi-threading issues are separate from Apache Spark internals. Parallelism in Spark is dealt with inside the JVM.


And the reason is that in the Python driver program, SparkContext uses Py4J to launch a JVM and create a JavaSparkContext.

Py4J is only used on the driver for local communication between the Python and Java SparkContext objects; large data transfers are performed through a different mechanism.

RDD transformations in Python are mapped to transformations on PythonRDD objects in Java. On remote worker machines, PythonRDD objects launch Python sub-processes and communicate with them using pipes, sending the user's code and the data to be processed.

PS: I'm not sure if this actually answers your question completely.


Tuesday, August 14, 2018

Dynamic Mapping Statement in Spark Scala


In my code I change a DF into an RDD to run a function through a map call. Each run has a couple of seconds of overhead, so if I run the function call like this:

var iterator = 0 // arrays are 0-based
val inputDF = spark.sql("select * from DF")
var columnPosition = Array("Column_4")
columnPosition = columnPosition ++ Array("Column_9")
var selectedDF = inputDF
var intrimDF = inputDF
var finalDF = inputDF
while (iterator < columnPosition.length) {
  selectedDF = finalDF.selectExpr("foreign_key", columnPosition(iterator))
  // joinBack, changeColumnPositions, dropOriginalColumn and renameColumns are placeholders
  // for my own join-back, reorder, drop and rename steps (not DataFrame methods)
  intrimDF = selectedDF.rdd.map(x => (x(0), action(x(1)))).toDF
    .selectExpr("_1 as some_key", "_2 as " + columnPosition(iterator))
    .joinBack.changeColumnPositions.dropOriginalColumn.renameColumns
  finalDF = intrimDF
  iterator = iterator + 1
}

It runs in ~90 seconds for a large job, because of the join. What I am trying to do is build it like below, to cut out the join entirely and make it dynamic.

val inputDF = spark.sql("select * from DF")
val intrimDF = inputDF.rdd.map(x => (x(0), x(1), x(2), action(x(3)), x(4), x(5), x(6), x(7), action(x(8))))
val columnStatement // Create an Array with columnName changes
val finalDF = intrimDF.selectExpr(columnStatement: _*)

The issue is that I can't get past the hard-coded side of the problem. Below is an example of what I want to try to do by setting the mapping call dynamically:

val mappingStatement = "x=>(x(0),x(1),x(2),action(x(3)),x(4),x(5),x(6),x(7),action(x(8)))"
val intrimDF = inputDF.rdd.map(mappingStatement)

Everything I have tried has failed:
  1. Calling it using the Map() function
  2. Setting an Array and passing it as :_*
  3. Trying to build it as a call, but it doesn't like being dynamic

Hope this makes sense!
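Instead of holding the mapping statement in a string (turning a string into a lambda at run time would need a runtime compiler), the positions to transform can be plain data. A minimal plain-Scala sketch of that idea, not taken from the post; applyAt and positionsOf are hypothetical helpers, and action stands for the poster's own function.

```scala
object DynamicMapping {
  // Apply f at the given positions and pass every other value through unchanged,
  // replacing a hard-coded tuple such as (x(0), x(1), x(2), action(x(3)), ...)
  def applyAt(values: Seq[Any], positions: Set[Int], f: Any => Any): Seq[Any] =
    values.zipWithIndex.map { case (v, i) => if (positions.contains(i)) f(v) else v }

  // Resolve column names to positions, failing fast on unknown names
  def positionsOf(fieldNames: Seq[String], targets: Seq[String]): Set[Int] =
    targets.map { name =>
      val i = fieldNames.indexOf(name)
      require(i >= 0, s"unknown column: $name")
      i
    }.toSet
}
```

In Spark, each Row could then be mapped with Row.fromSeq(DynamicMapping.applyAt(row.toSeq, positions, action)), with positions from DynamicMapping.positionsOf(inputDF.schema.fieldNames, Seq("Column_4", "Column_9")), and turned back into a DataFrame with spark.createDataFrame(rowRdd, schema), where the schema has to describe the types action returns.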

0 Answers


Thursday, August 2, 2018

Distributed Computation for large data-data processing


I have a huge time-series dataset and I want to process it using Spark's parallel, distributed computation. The requirement is to look at the data row by row to determine the groups specified below under the desired result section. I can't really get Spark to distribute this without some kind of coordination between the executors.

t: time-series datetime sample, lat: latitude, long: longitude


For instance, taking a small part of the sample dataset to explain the case:

t   lat long
0   27  28
5   27  28
10  27  28
15  29  49
20  29  49
25  27  28
30  27  28

The desired output should be:

Lat-long    interval
(27,28)     (0,10)
(29,49)     (15,20)
(27,28)     (25,30)

I am able to get the desired result using this piece of code:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local").getOrCreate()
import spark.implicits._

val df = Seq(
  (0, 27, 28),
  (5, 27, 28),
  (10, 27, 28),
  (15, 29, 49),
  (20, 29, 49),
  (25, 27, 28),
  (30, 27, 28)
).toDF("t", "lat", "long")

val dfGrouped = df.withColumn("lat-long", struct($"lat", $"long"))

// A window without partitionBy moves all rows into a single partition
val wAll = Window.partitionBy().orderBy($"t".asc)

dfGrouped
  .withColumn("lag", lag("lat-long", 1, null).over(wAll))
  .orderBy(asc("t"))
  .withColumn("detector", when($"lat-long" === $"lag", 0).otherwise(1))
  .withColumn("runningTotal", sum("detector").over(wAll))
  .groupBy("runningTotal", "lat-long")
  .agg(struct(min("t"), max("t")).as("interval"))
  .drop("runningTotal")
  .show()

But what if the data is split across two executors? Then the data will look like this:

Data in executor 1:

t   lat long
0   27  28
5   27  28
10  27  28
15  29  49
20  29  49
25  27  28

Data in executor 2:

t   lat long
30  27  28


How should I get the desired output for a large amount of data? There must be smarter ways to do this, distributing the work with some kind of coordination between the executors so as to get that result.

Please point me in the right direction. I have researched this but haven't been able to arrive at a solution.

PS: This is just a sample.
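The lag/running-total query above works only because the unpartitioned window pulls every row into one partition. A different technique from the UDAF suggested in the answer below, the "gaps and islands" pattern with a boundary merge, can be sketched in plain Python: each partition, assumed to hold a contiguous time range sorted by t, collapses its rows into runs locally, and the short per-partition run lists are then merged in partition order, fusing any run that was split at a partition boundary. In Spark the local step might map to range-partitioning on t (for example `repartitionByRange`, available on Dataset since Spark 2.3), `sortWithinPartitions`, and `mapPartitions`, with the merge done on the driver over the collected run lists; that mapping and the names `local_runs` and `merge_runs` are assumptions for illustration, and it only pays off when there are far fewer runs than rows.

```python
from functools import reduce
from typing import List, Sequence, Tuple

Key = Tuple[int, int]        # (lat, long)
Run = Tuple[Key, int, int]   # (lat-long, first t, last t)


def local_runs(rows: Sequence[Tuple[int, int, int]]) -> List[Run]:
    """Collapse consecutive (t, lat, long) rows, already sorted by t, into runs."""
    runs: List[Run] = []
    for t, lat, lon in rows:
        key = (lat, lon)
        if runs and runs[-1][0] == key:
            runs[-1] = (key, runs[-1][1], t)   # extend the current run
        else:
            runs.append((key, t, t))           # start a new run
    return runs


def merge_runs(left: List[Run], right: List[Run]) -> List[Run]:
    """Concatenate runs of two adjacent partitions, fusing a run split at the boundary."""
    if left and right and left[-1][0] == right[0][0]:
        key, start, _ = left[-1]
        return left[:-1] + [(key, start, right[0][2])] + right[1:]
    return left + right


data = [(0, 27, 28), (5, 27, 28), (10, 27, 28), (15, 29, 49),
        (20, 29, 49), (25, 27, 28), (30, 27, 28)]
partitions = [data[:6], data[6:]]   # the two-executor split from the question

result = reduce(merge_runs, (local_runs(p) for p in partitions), [])
print(result)  # [((27, 28), 0, 10), ((29, 49), 15, 20), ((27, 28), 25, 30)]
```

The per-partition work stays fully parallel; only the compact run lists have to be brought together, and because they are merged left to right, a run spanning several partitions is fused correctly as well.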

1 Answer

Answers 1

You can address this with a UDAF. First, add a column that assigns each value of t to one of the executors you have, something like executorIndex = t % ((max(t) - min(t)) / numExecutors).

Then you can apply your UDAF grouping by executorIndex.

Your UDAF needs to store a Map with a String key (for example) that represents one lat-long pair, and an int[] that holds the minT and maxT for that lat-long key.

Please ask if you need more extensive explanation.

Hope this helps.

PS: I'm assuming there is some time relation between identical lat-long pairs, which is normal if you are tracking movement.
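The update and merge steps of the UDAF buffer described above can be sketched in plain Python (the function names are hypothetical, and a tuple key stands in for the String key). Running it on the two-executor split from the question also shows what the PS is about: keying only by lat-long folds the two separate (27,28) visits into a single interval (0,30).

```python
from typing import Dict, Iterable, Tuple

Key = Tuple[int, int]                 # (lat, long)
Buffer = Dict[Key, Tuple[int, int]]   # lat-long -> (minT, maxT)


def update(buffer: Buffer, rows: Iterable[Tuple[int, int, int]]) -> Buffer:
    """Fold (t, lat, long) rows into the buffer, widening each key's [minT, maxT]."""
    for t, lat, lon in rows:
        lo, hi = buffer.get((lat, lon), (t, t))
        buffer[(lat, lon)] = (min(lo, t), max(hi, t))
    return buffer


def merge(left: Buffer, right: Buffer) -> Buffer:
    """Combine two partial buffers, as a UDAF merge step would."""
    out = dict(left)
    for key, (lo, hi) in right.items():
        if key in out:
            out[key] = (min(out[key][0], lo), max(out[key][1], hi))
        else:
            out[key] = (lo, hi)
    return out


executor1 = [(0, 27, 28), (5, 27, 28), (10, 27, 28), (15, 29, 49), (20, 29, 49), (25, 27, 28)]
executor2 = [(30, 27, 28)]
merged = merge(update({}, executor1), update({}, executor2))
print(merged)  # {(27, 28): (0, 30), (29, 49): (15, 20)}
```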


Thursday, July 19, 2018

Spark SQL: query whether an array<string> contains a string, using predicate pushdown


I'm trying to query an array in ElasticSearch

data: "names":[{"name":"allen"},{"name":"bill"},{"name":"dave"},{"name":"poter"}]
goal: select names from table where array_contains(names.name, "bill")

but Spark won't push the predicate down if the SQL statement uses the array_contains function.
hint: names.name = ["allen","bill","dave","poter"]
I've tried:

select * from table where array_contains(names.name, "bill")
-- and
select explode(names.name) as name from table as t1; select * from t1 where name = "bill"
-- and
select * from table where cast(names.name as string) like '%bill%'

None of them was pushed down. Is there any other way to do it?

1 Answer

Answers 1

The failure to push down is expected. For a predicate to be delegated, the data source has to support it, and the Elasticsearch connector doesn't list array_contains among its pushed-down operations, which as of today include:

  • =, >, <, >=, <=
  • is_null / is_not_null
  • in
  • String[Starts|Ends]With, StringContains
  • NULL safe equality.
  • Application of Boolean operators AND / OR / NOT.

Also, any additional transformation of the column (including CAST) disables predicate pushdown.


Monday, July 16, 2018

Spark dataframe saveAsTable is using a single task


We have a pipeline for which the initial stages are properly scalable - using several dozen workers apiece.

One of the last stages is

dataFrame.write.format(outFormat).mode(saveMode)
  .partitionBy(partColVals.map(_._1): _*)
  .saveAsTable(tname)

For this stage we end up with a single worker. This clearly does not work for us - in fact the worker runs out of disk space - on top of being very slow.


Why would that command end up running on a single worker/single task only?

Update: The output format was Parquet. The number of partition columns did not affect the result (I tried one column as well as several columns).

Another update: None of the following conditions (as posited by an answer below) held:

  • coalesce or partitionBy statements
  • window / analytic functions
  • Dataset.limit
  • sql.shuffle.partitions

1 Answer

Answers 1

The problem is unlikely to be related in any way to saveAsTable.

A single task in a stage indicates that the input data (Dataset or RDD) has only one partition. This is in contrast to cases where there are multiple tasks but one or more have a significantly higher execution time, which normally corresponds to partitions containing positively skewed keys. Also, you shouldn't confuse a single-task scenario with low CPU utilization. The latter is usually a result of insufficient IO throughput (high CPU wait times are the most obvious indication of that), but in rare cases it can be traced to the use of shared objects with low-level synchronization primitives.

Since standard data sources don't shuffle data on write (including cases where the partitionBy and bucketBy options are used), it is safe to assume that the data has been repartitioned somewhere in the upstream code. Usually it means that one of the following happened:

  • Data has been explicitly moved to a single partition using coalesce(1) or repartition(1).
  • Data has been implicitly moved to a single partition for example with:

    • Dataset.limit
    • Window function applications with window definition lacking PARTITION BY clause.

      df.withColumn(
        "row_number",
        row_number().over(Window.orderBy("some_column"))
      )
    • The spark.sql.shuffle.partitions option is set to 1 and the upstream code includes a non-local operation on a Dataset.

    • The Dataset is the result of applying a global aggregate function (without a GROUP BY clause). This is usually not an issue, unless the function is non-reducing (collect_list or comparable).

While there is no evidence that this is the problem here, in the general case you should also consider the possibility that the data contains only a single partition all the way back to the source. This usually happens when input is fetched using the JDBC source, but third-party formats can exhibit the same behavior.

To identify the source of the problem, you should either check the execution plan for the input Dataset (explain(true)) or check the SQL tab of the Spark Web UI.
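For the JDBC case mentioned above, one way to get more than one input partition is to give the reader a list of non-overlapping WHERE-clause predicates; DataFrameReader.jdbc accepts a `predicates` argument and creates one partition per predicate (the built-in alternative is the partitionColumn / lowerBound / upperBound / numPartitions options). Below is a minimal sketch of building such a list, assuming a numeric column with roughly known bounds; `range_predicates` is a hypothetical helper, and the column name `id` is illustrative.

```python
from typing import List


def range_predicates(column: str, lower: int, upper: int, num_partitions: int) -> List[str]:
    """Split [lower, upper) on `column` into num_partitions equal-width WHERE clauses.

    Values below `lower` and NULLs go to the first predicate, and values at or
    above the last boundary go to the last one, so every row is read exactly once.
    """
    if num_partitions < 1 or upper <= lower:
        raise ValueError("need num_partitions >= 1 and upper > lower")
    if num_partitions == 1:
        return ["1 = 1"]  # a single predicate that matches every row
    stride = max((upper - lower) // num_partitions, 1)
    bounds = [lower + i * stride for i in range(1, num_partitions)]
    preds = [f"{column} < {bounds[0]} OR {column} IS NULL"]
    preds += [f"{column} >= {lo} AND {column} < {hi}" for lo, hi in zip(bounds, bounds[1:])]
    preds.append(f"{column} >= {bounds[-1]}")
    return preds


for p in range_predicates("id", 0, 100, 4):
    print(p)
```

In PySpark the list would then be passed as something like `spark.read.jdbc(url, "schema.tablename", predicates=range_predicates("id", 0, 1000000, 16), properties=props)`, where `url` and `props` are placeholders for the real connection settings.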


Thursday, July 12, 2018

How to avoid generating crc files and SUCCESS files while saving a DataFrame?


I am using the following code to save a Spark DataFrame to a JSON file:

unzipJSON.write.mode("append").json("/home/eranw/Workspace/JSON/output/unCompressedJson.json") 

The output is:

part-r-00000-704b5725-15ea-4705-b347-285a4b0e7fd8
.part-r-00000-704b5725-15ea-4705-b347-285a4b0e7fd8.crc
part-r-00001-704b5725-15ea-4705-b347-285a4b0e7fd8
.part-r-00001-704b5725-15ea-4705-b347-285a4b0e7fd8.crc
_SUCCESS
._SUCCESS.crc
  1. How do I generate a single JSON file and not a file per line?
  2. How can I avoid the *crc files?
  3. How can I avoid the SUCCESS file?

1 Answer

Answers 1

If you want a single file, you need to do a coalesce to a single partition before calling write, so:

unzipJSON.coalesce(1).write.mode("append").json("/home/eranw/Workspace/JSON/output/unCompressedJson.json") 

Personally, I find it rather annoying that the number of output files depends on the number of partitions you have before calling write (especially if you write with partitionBy), but as far as I know there is currently no other way.

I don't know of a way to disable the .crc files, but you can disable the _SUCCESS file by setting the following on the Hadoop configuration of the Spark context:

sc.hadoopConfiguration.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false") 

Note that, when writing Parquet, you may also want to disable generation of the summary metadata files with:

sc.hadoopConfiguration.set("parquet.enable.summary-metadata", "false") 

Apparently, generating the metadata files takes some time (see this blog post), but they aren't actually that important (according to this). Personally, I always disable them and I have had no issues.
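If a single, cleanly named file is the end goal, one option is to post-process the output directory after writing with coalesce(1). Below is a minimal sketch for a local filesystem, assuming exactly one part-* data file in the directory; `promote_single_part` is a hypothetical helper. On HDFS the same steps would go through the Hadoop FileSystem API (rename and delete) rather than Python's shutil.

```python
import glob
import os
import shutil


def promote_single_part(output_dir: str, target_file: str) -> str:
    """Move the lone part-* file out of a Spark output directory and drop the rest.

    Assumes the DataFrame was written with coalesce(1), so exactly one data file
    exists. The .crc checksum files and the _SUCCESS marker are deleted together
    with the directory.
    """
    parts = [p for p in glob.glob(os.path.join(output_dir, "part-*"))
             if not p.endswith(".crc")]
    if len(parts) != 1:
        raise ValueError(f"expected exactly one part file, found {len(parts)}")
    shutil.move(parts[0], target_file)
    shutil.rmtree(output_dir)  # removes .crc files and _SUCCESS markers
    return target_file
```

Note that Spark treats the path given to json() as a directory, so the final file name has to be chosen separately, as `target_file` here.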


Sunday, July 1, 2018

Spark Streaming Exception: java.util.NoSuchElementException: None.get


I am writing SparkStreaming data to HDFS by converting it to a dataframe:

Code

object KafkaSparkHdfs {

  val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkKafka")
  sparkConf.set("spark.driver.allowMultipleContexts", "true");
  val sc = new SparkContext(sparkConf)

  def main(args: Array[String]): Unit = {
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext.implicits._

    val ssc = new StreamingContext(sparkConf, Seconds(20))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "stream3",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val topics = Array("fridaydata")
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams)
    )

    val lines = stream.map(consumerRecord => consumerRecord.value)
    val words = lines.flatMap(_.split(" "))
    val wordMap = words.map(word => (word, 1))
    val wordCount = wordMap.reduceByKey(_ + _)

    wordCount.foreachRDD(rdd => {
      val dataframe = rdd.toDF();
      dataframe.write
        .mode(SaveMode.Append)
        .save("hdfs://localhost:9000/newfile24")
    })

    ssc.start()
    ssc.awaitTermination()
  }
}

The folder is created but the file is not written.

The program is getting terminated with the following error:

18/06/22 16:14:41 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.util.NoSuchElementException: None.get
    at scala.None$.get(Option.scala:347)
    at scala.None$.get(Option.scala:345)
    at org.apache.spark.storage.BlockInfoManager.releaseAllLocksForTask(BlockInfoManager.scala:343)
    at org.apache.spark.storage.BlockManager.releaseAllLocksForTask(BlockManager.scala:670)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:289)
    at java.lang.Thread.run(Thread.java:748)
18/06/22 16:14:41 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.util.NoSuchElementException: None.get
    at scala.None$.get(Option.scala:347)
    at scala.None$.get(Option.scala:345)
    at org.apache.spark.storage.BlockInfoManager.releaseAllLocksForTask(BlockInfoManager.scala:343)
    at org.apache.spark.storage.BlockManager.releaseAllLocksForTask(BlockManager.scala:670)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:289)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)

In my pom I am using the following dependencies:

  • spark-core_2.11
  • spark-sql_2.11
  • spark-streaming_2.11
  • spark-streaming-kafka-0-10_2.11

2 Answers

Answers 1

The error is due to trying to run multiple Spark contexts at the same time. Setting allowMultipleContexts to true is mostly meant for testing purposes and its use is discouraged. The solution to your problem is therefore to make sure that the same SparkContext is used everywhere. From the code we can see that the SparkContext (sc) is used to create a SQLContext, which is fine. However, it is not used when creating the StreamingContext; the SparkConf is used instead.

By looking at the documentation we see:

Create a StreamingContext by providing the configuration necessary for a new SparkContext

In other words, by using SparkConf as parameter a new SparkContext will be created. Now there are two separate contexts.

The easiest solution here would be to continue using the same context as before. Change the line creating the StreamingContext to:

val ssc = new StreamingContext(sc, Seconds(20)) 

Note: In newer versions of Spark (2.0+) use SparkSession instead. A new streaming context can then be created using StreamingContext(spark.sparkContext, ...). It can look as follows:

val spark = SparkSession.builder
  .master("local[*]")
  .appName("SparkKafka")
  .getOrCreate()
import spark.implicits._
val ssc = new StreamingContext(spark.sparkContext, Seconds(20))

Answers 2

There is an obvious problem here - coalesce(1).

dataframe.coalesce(1) 

While reducing the number of files might be tempting in many scenarios, it should be done if and only if the amount of data is low enough for a single node to handle (clearly it isn't here).

Furthermore, let me quote the documentation:

However, if you're doing a drastic coalesce, e.g. to numPartitions = 1, this may result in your computation taking place on fewer nodes than you like (e.g. one node in the case of numPartitions = 1). To avoid this, you can call repartition. This will add a shuffle step, but means the current upstream partitions will be executed in parallel (per whatever the current partitioning is).

The conclusion is that you should adjust the parameter according to the expected amount of data and the desired parallelism. coalesce(1) as such is rarely useful in practice, especially in a context like streaming, where data properties can differ over time.


Thursday, May 31, 2018

Pyspark: spark-submit not working like CLI


I have a PySpark script that loads data from a TSV file, saves it as Parquet files, and also saves it as a persistent SQL table.

When I run it line by line through the pyspark CLI, it works exactly as expected. When I run it as an application using spark-submit, it runs without any errors, but I get strange results:

  1. The data is overwritten instead of appended.
  2. When I run SQL queries against it, I get no data returned, even though the Parquet files are several gigabytes in size (which is what I expect).

Any suggestions?

Code:

from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql.functions import *

csv_file = '/srv/spark/data/input/ipfixminute2018-03-28.tsv'
parquet_dir = '/srv/spark/data/parquet/ipfixminute'

sc = SparkContext(appName='import-ipfixminute')
spark = SQLContext(sc)

fields = [StructField('time_stamp', TimestampType(), True),
          StructField('subscriberId', StringType(), True),
          StructField('sourceIPv4Address', StringType(), True),
          StructField('destinationIPv4Address', StringType(), True),
          StructField('service', StringType(), True),
          StructField('baseService', StringType(), True),
          StructField('serverHostname', StringType(), True),
          StructField('rat', StringType(), True),
          StructField('userAgent', StringType(), True),
          StructField('accessPoint', StringType(), True),
          StructField('station', StringType(), True),
          StructField('device', StringType(), True),
          StructField('contentCategories', StringType(), True),
          StructField('incomingOctets', LongType(), True),
          StructField('outgoingOctets', LongType(), True),
          StructField('incomingShapingDrops', IntegerType(), True),
          StructField('outgoingShapingDrops', IntegerType(), True),
          StructField('qoeIncomingInternal', DoubleType(), True),
          StructField('qoeIncomingExternal', DoubleType(), True),
          StructField('qoeOutgoingInternal', DoubleType(), True),
          StructField('qoeOutgoingExternal', DoubleType(), True),
          StructField('incomingShapingLatency', DoubleType(), True),
          StructField('outgoingShapingLatency', DoubleType(), True),
          StructField('internalRtt', DoubleType(), True),
          StructField('externalRtt', DoubleType(), True),
          StructField('HttpUrl', StringType(), True)]

schema = StructType(fields)
df = spark.read.load(csv_file, format='csv', sep='\t', header=True, schema=schema,
                     timestampFormat='yyyy-MM-dd HH:mm:ss')
df = df.drop('all')
df = df.withColumn('date', to_date('time_stamp'))
df.write.saveAsTable('test2', mode='append', partitionBy='date', path=parquet_dir)

1 Answer

Answers 1

As @user8371915 suggested it is similar to this:

Spark can access Hive table from pyspark but not from spark-submit

I needed to replace

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

with

from pyspark.sql import HiveContext
sqlContext = HiveContext(sc)

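This resolved the issue. (In Spark 2.0 and later, HiveContext is deprecated; creating the session with SparkSession.builder.enableHiveSupport().getOrCreate() is the equivalent.)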


Saturday, May 26, 2018

Spark Driver memory and Application Master memory


Am I understanding the documentation for client mode correctly?

  1. client mode is opposed to cluster mode where the driver runs within the application master?
  2. In client mode the driver and application master are separate processes and therefore spark.driver.memory + spark.yarn.am.memory must be less than the machine's memory?
  3. In client mode, is the driver memory not included in the application master memory setting?

2 Answers

Answers 1

client mode is opposed to cluster mode where the driver runs within the application master?

Yes. When a Spark application is deployed over YARN in:

  • client mode, the driver runs on the machine where the application was submitted, and that machine has to stay available on the network until the application completes.
  • cluster mode, the driver runs inside the application master (one per Spark application), and the machine that submitted the application doesn't need to stay on the network after submission.

Client mode


Cluster mode


If a Spark application is submitted in cluster mode to Spark's own resource manager (standalone), the driver runs on one of the worker nodes.

References for images and content:

In client mode the driver and application master are separate processes and therefore spark.driver.memory + spark.yarn.am.memory must be less than the machine's memory?

No. In client mode, the driver and the AM are separate processes running on different machines, so their memory does not need to be combined. However, spark.yarn.am.memory plus some overhead must be less than the YARN container memory (yarn.nodemanager.resource.memory-mb). If it exceeds that, YARN's Resource Manager will kill the container.

In client mode, is the driver memory not included in the application master memory setting?

Here, spark.driver.memory must be less than the memory available on the machine from which the Spark application is launched.

In cluster mode, however, use spark.driver.memory instead of spark.yarn.am.memory.

spark.yarn.am.memory : 512m (default)

Amount of memory to use for the YARN Application Master in client mode, in the same format as JVM memory strings (e.g. 512m, 2g). In cluster mode, use spark.driver.memory instead. Use lower-case suffixes, e.g. k, m, g, t, and p, for kibi-, mebi-, gibi-, tebi-, and pebibytes, respectively.

Check more about these properties here
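To make the "spark.yarn.am.memory plus some overhead" point concrete, here is a small arithmetic sketch. It uses the documented default for spark.yarn.am.memoryOverhead (10% of the AM memory, with a 384 MB minimum) and assumes the YARN scheduler rounds each request up to a multiple of yarn.scheduler.minimum-allocation-mb (1024 MB by default). The exact rounding depends on the scheduler configuration, so treat the numbers as an approximation; the function names are hypothetical.

```python
import math
from typing import Optional

# Defaults documented for spark.yarn.am.memoryOverhead: 10% of AM memory, at least 384 MB.
OVERHEAD_FACTOR = 0.10
OVERHEAD_MIN_MB = 384


def am_request_mb(am_memory_mb: int, overhead_mb: Optional[int] = None) -> int:
    """Container memory requested for the client-mode AM: heap plus overhead."""
    if overhead_mb is None:
        overhead_mb = max(int(am_memory_mb * OVERHEAD_FACTOR), OVERHEAD_MIN_MB)
    return am_memory_mb + overhead_mb


def normalized_mb(request_mb: int, min_allocation_mb: int = 1024) -> int:
    """Round a request up to a multiple of the minimum allocation (assumed scheduler behavior)."""
    return math.ceil(request_mb / min_allocation_mb) * min_allocation_mb


def am_fits(am_memory_mb: int, container_limit_mb: int, min_allocation_mb: int = 1024) -> bool:
    """True if the rounded AM request stays within the given YARN container limit."""
    return normalized_mb(am_request_mb(am_memory_mb), min_allocation_mb) <= container_limit_mb


print(am_request_mb(512), normalized_mb(am_request_mb(512)))    # 896 1024
print(am_request_mb(4096), normalized_mb(am_request_mb(4096)))  # 4505 5120
print(am_fits(4096, 4096))                                      # False
```

The last line shows why setting spark.yarn.am.memory equal to the container limit fails: the overhead alone pushes the request past it.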

Answers 2

In client mode, the driver is launched directly within spark-submit, i.e. the client program. The application master is created on any one of the nodes in the cluster. spark.driver.memory (plus memory overhead) must be less than the machine's memory.

In cluster mode, the driver runs inside the application master on one of the nodes in the cluster.

https://blog.cloudera.com/blog/2014/05/apache-spark-resource-management-and-yarn-app-models/


Friday, May 18, 2018

Add PySpark RDD as new column to pyspark.sql.dataframe


I have a pyspark.sql.dataframe where each row is a news article. I then have an RDD that represents the words contained in each article. I want to add the RDD of words as a column named 'words' to my DataFrame of news articles. I tried:

df.withColumn('words', words_rdd ) 

but I get the error

AssertionError: col should be Column 

The DataFrame looks something like this:

Articles
the cat and dog ran
we went to the park
today it will rain

but I have 3k news articles.

I applied a function to clean the text such as remove stop words and I have a RDD that looks like this:

[[cat, dog, ran],[we, went, park],[today, will, rain]] 

I'm trying to get my Dataframe to look like this:

Articles                 Words
the cat and dog ran      [cat, dog, ran]
we went to the park      [we, went, park]
today it will rain       [today, will, rain]

4 Answers

Answers 1

Disclaimer:

Spark DataFrame in general has no strictly defined order. Use at your own risk.

Add index to existing DataFrame:

from pyspark.sql.types import *

df_index = spark.createDataFrame(
    df.rdd.zipWithIndex(),
    StructType([StructField("data", df.schema), StructField("id", LongType())])
)

Add index to RDD and convert to DataFrame:

words_df = spark.createDataFrame(
    words_rdd.zipWithIndex(),
    StructType([
        StructField("words", ArrayType(StringType())),
        StructField("id", LongType())
    ])
)

Join both and select required fields:

df_index.join(words_df, "id").select("data.*", "words") 

Caution

There are different solutions which might work in specific cases, but they don't guarantee performance and/or correctness. These include:

  • Using monotonically_increasing_id as a join key - in general case not correct.
  • Using row_number() window function as a join key - unacceptable performance implication and in general not correct if there is no specific order defined.
  • Using zip on RDDs - can work if and only if both structures have the same data distribution (should work in this case).
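Why monotonically_increasing_id is not a safe join key can be seen from how its values are built: the Spark documentation describes the partition ID in the upper 31 bits and the record number within each partition in the lower 33 bits. The plain-Python simulation below (function names are hypothetical) assigns such ids to the three articles split into two partitions and to the word lists split into three partitions; the ids no longer line up, whereas zipWithIndex-style consecutive indices do.

```python
from typing import List, Sequence


def monotonic_ids(partitions: Sequence[Sequence[object]]) -> List[int]:
    """Simulate monotonically_increasing_id: partition index shifted into the upper
    bits, record position within the partition in the lower 33 bits."""
    return [(p << 33) + i for p, part in enumerate(partitions) for i, _ in enumerate(part)]


def zip_with_index_ids(partitions: Sequence[Sequence[object]]) -> List[int]:
    """Simulate zipWithIndex: consecutive indices 0..n-1 regardless of partitioning."""
    return list(range(sum(len(part) for part in partitions)))


articles = [["the cat and dog ran", "we went to the park"], ["today it will rain"]]      # 2 partitions
words = [[["cat", "dog", "ran"]], [["we", "went", "park"]], [["today", "will", "rain"]]]  # 3 partitions

print(monotonic_ids(articles))  # [0, 1, 8589934592]
print(monotonic_ids(words))     # [0, 8589934592, 17179869184]
print(zip_with_index_ids(articles) == zip_with_index_ids(words))  # True
```

Joining on the simulated monotonic ids would pair "today it will rain" with [we, went, park] and drop the second article entirely, which is exactly the silent mismatch the caution above warns about.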

Note:

In this specific case you shouldn't need RDD. pyspark.ml.feature provides a variety of Transformers, which should work well for you.

from pyspark.ml.feature import *
from pyspark.ml import Pipeline

df = spark.createDataFrame(
    ["the cat and dog ran", "we went to the park", "today it will rain"],
    "string"
).toDF("Articles")

Pipeline(stages=[
    RegexTokenizer(inputCol="Articles", outputCol="Tokens"),
    StopWordsRemover(inputCol="Tokens", outputCol="Words")
]).fit(df).transform(df).show()
# +-------------------+--------------------+---------------+
# |           Articles|              Tokens|          Words|
# +-------------------+--------------------+---------------+
# |the cat and dog ran|[the, cat, and, d...|[cat, dog, ran]|
# |we went to the park|[we, went, to, th...|   [went, park]|
# | today it will rain|[today, it, will,...|  [today, rain]|
# +-------------------+--------------------+---------------+

The list of stop words can be provided using stopWords parameter of the StopWordsRemover, for example:

StopWordsRemover(
    inputCol="Tokens",
    outputCol="Words",
    stopWords=["the", "and", "we", "to", "it"]
)

Answers 2

Why do you want to join the RDD back to the DataFrame? I would rather create a new column from "Articles" directly. There are multiple ways to do it; here are my 5 cents:

from pyspark.sql import Row
from pyspark.sql.context import SQLContext
sqlCtx = SQLContext(sc)    # sc is the SparkContext

x = [Row(Articles='the cat and dog ran'), Row(Articles='we went to the park'), Row(Articles='today it will rain')]
df = sqlCtx.createDataFrame(x)

# In Spark 2.x a PySpark DataFrame has no map(); go through its underlying RDD
df2 = df.rdd.map(lambda x: tuple([x.Articles, x.Articles.split(' ')])).toDF(['Articles', 'words'])
df2.show()

You get the following output:

Articles                 words
the cat and dog ran      [the, cat, and, dog, ran]
we went to the park      [we, went, to, the, park]
today it will rain       [today, it, will, rain]

Let me know if you were looking to achieve something else.

Answers 3

A simple but effective approach is to use a UDF:

from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType

df = spark.createDataFrame(["the cat and dog ran", "we went to the park", "today it will rain", None], "string").toDF("Articles")

# The UDF returns a list, so its declared return type must be an array of strings
split_words = udf(lambda x: x.split(' ') if x is not None else x, ArrayType(StringType()))
df = df.withColumn('Words', split_words(df['Articles']))

df.show(10, False)
# +-------------------+-------------------------+
# |Articles           |Words                    |
# +-------------------+-------------------------+
# |the cat and dog ran|[the, cat, and, dog, ran]|
# |we went to the park|[we, went, to, the, park]|
# |today it will rain |[today, it, will, rain]  |
# |null               |null                     |
# +-------------------+-------------------------+

I added a check for None because bad lines are very common in real data. You can easily drop them, before or after splitting, with dropna.

But in my opinion, if you want to do this as a preparation step for text analytics, it is probably in your best interest to build a Pipeline, as @user9613318 suggests in his answer.

Answers 4

rdd1 = spark.sparkContext.parallelize([1, 2, 3, 5])
# make some transformation on rdd1:
rdd2 = rdd1.map(lambda n: True if n % 2 else False)
# Append each row in rdd2 to those in rdd1.
rdd1.zip(rdd2).collect()
# [(1, True), (2, False), (3, True), (5, True)]

Wednesday, May 16, 2018

How can I get Apache Spark job progress without knowing the job id?


I have the following as an example:

val sc: SparkContext // An existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

val df = sqlContext.read.json("examples/src/main/resources/people.json")

df.count

I'm aware that I can monitor the jobs using a SparkListener using the spark context; however, that gives me events on all jobs (which I can't use as I don't know the job id).

How can I get the progress of the "count" action only?

1 Answer

Answers 1

As already suggested in the comments, one could use REST API of the Spark UI to collect the required numbers.

The main problem is to identify the stages you are interested in. There is no 1:1 mapping from code to stages. A single count, for example, will trigger two stages (one stage for counting the elements in each partition of the DataFrame and a second stage for summing up the results of the first stage). Stages usually get the name of the action that triggered their execution, although this may be changed within the code.

One can create a method that queries the REST API for all stages with a certain name and then adds up the number of tasks for these stages as well as the number of completed tasks. Assuming that all tasks take approximately the same execution time (this assumption is false if the dataset has skewed partitions), one can use the share of completed tasks as a measure of the job's progress.

def countTasks(sparkUiUrl: String, stageName: String): (Int, Int) = {
  import scala.util.parsing.json._
  import scala.collection.mutable.ListBuffer
  def get(url: String) = scala.io.Source.fromURL(url).mkString

  //get the ids of all running applications and collect them in a ListBuffer
  val applications = JSON.parseFull(get(sparkUiUrl + "/api/v1/applications?status=running"))
  val apps: ListBuffer[String] = new scala.collection.mutable.ListBuffer[String]
  applications match {
    case Some(l: List[Map[String, String]]) => l.foreach(apps += _ ("id"))
    case other => println("Unknown data structure while reading applications: " + other)
  }

  var countTasks: Int = 0;
  var countCompletedTasks: Int = 0;

  //get the stages for each application and sum up the number of tasks for each stage with the requested name
  apps.foreach(app => {
    val stages = JSON.parseFull(get(sparkUiUrl + "/api/v1/applications/" + app + "/stages"))
    stages match {
      case Some(l: List[Map[String, Any]]) => l.foreach(m => {
        if (m("name") == stageName) {
          countTasks += m("numTasks").asInstanceOf[Double].toInt
          countCompletedTasks += m("numCompleteTasks").asInstanceOf[Double].toInt
        }
      })
      case other => println("Unknown data structure while reading stages: " + other)
    }
  })

  //println(countCompletedTasks + " of " + countTasks + " tasks completed")
  (countTasks, countCompletedTasks)
}

Calling this function for the given count example:

println(countTasks("http://localhost:4040", "show at CountExample.scala:16")) 

will print out two numbers: the first one will be number of all tasks and the second one will be the number of finished tasks.

I have tested this code with Spark 2.3.0. Before using it in a production environment, it will surely need some additional polishing, especially some more sophisticated error checking. The statistic could be improved by not only counting the finished tasks but also the failed ones.
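The same aggregation can be expressed without the Scala JSON parser. Below is a plain-Python sketch that works on an already-fetched stages list, reading the numTasks and numCompleteTasks fields used by the code above. The sample JSON is invented for illustration; in practice it would come from GET <ui>/api/v1/applications/<app-id>/stages (for example via urllib.request). Matching on the exact stage name has the same caveat as above, so the result is only an approximation of job progress.

```python
import json
from typing import Iterable, Mapping, Tuple


def stage_progress(stages: Iterable[Mapping], stage_name: str) -> Tuple[int, int, float]:
    """Sum numTasks and numCompleteTasks over stages whose name matches exactly."""
    total = done = 0
    for stage in stages:
        if stage.get("name") == stage_name:
            total += int(stage.get("numTasks", 0))
            done += int(stage.get("numCompleteTasks", 0))
    return total, done, (done / total if total else 0.0)


# Hypothetical, trimmed-down response of /api/v1/applications/<app-id>/stages
sample = json.loads("""[
  {"name": "count at CountExample.scala:16", "numTasks": 8, "numCompleteTasks": 6},
  {"name": "count at CountExample.scala:16", "numTasks": 1, "numCompleteTasks": 0},
  {"name": "show at Other.scala:3", "numTasks": 4, "numCompleteTasks": 4}
]""")
print(stage_progress(sample, "count at CountExample.scala:16"))  # (9, 6, 0.6666666666666666)
```

Returning the fraction alongside the raw counts keeps the "no matching stage yet" case explicit as 0.0 instead of a division by zero.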


Tuesday, May 15, 2018

Spark error with google/guava library: java.lang.NoSuchMethodError: com.google.common.cache.CacheBuilder.refreshAfterWrite


I have a simple Spark project in which the only pom.xml dependencies are the basic Scala, ScalaTest/JUnit, and Spark ones:

<dependencies>
    <dependency>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-compiler</artifactId>
        <version>${scala.version}</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.11</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.scalatest</groupId>
        <artifactId>scalatest_${scala.binary.version}</artifactId>
        <version>3.0.1</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_${scala.binary.version}</artifactId>
        <version>${spark.version}</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-mllib_${scala.binary.version}</artifactId>
        <version>${spark.version}</version>
        <scope>compile</scope>
    </dependency>
</dependencies>

When attempting to run a basic spark program the SparkSession init fails on this line:

 SparkSession.builder.master(master).appName("sparkApp").getOrCreate 

Here is the output / error:

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
18/04/07 18:06:15 INFO SparkContext: Running Spark version 2.2.1
Exception in thread "main" java.lang.NoSuchMethodError: com.google.common.cache.CacheBuilder.refreshAfterWrite(JLjava/util/concurrent/TimeUnit;)Lcom/google/common/cache/CacheBuilder;
    at org.apache.hadoop.security.Groups.<init>(Groups.java:96)
    at org.apache.hadoop.security.Groups.<init>(Groups.java:73)
    at org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:293)
    at org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:283)
    at org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:260)
    at org.apache.hadoop.security.UserGroupInformation.loginUserFromSubject(UserGroupInformation.java:789)
    at org.apache.hadoop.security.UserGroupInformation.getLoginUser(UserGroupInformation.java:774)
    at org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:647)
    at org.apache.spark.util.Utils$$anonfun$getCurrentUserName$1.apply(Utils.scala:2424)
    at org.apache.spark.util.Utils$$anonfun$getCurrentUserName$1.apply(Utils.scala:2424)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.util.Utils$.getCurrentUserName(Utils.scala:2424)
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:295)
    at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2516)
    at org.apache.spark.sql.SparkSession$Builder$$anonfun$6.apply(SparkSession.scala:918)
    at org.apache.spark.sql.SparkSession$Builder$$anonfun$6.apply(SparkSession.scala:910)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:910)

I have run Spark locally many dozens of times on other projects. What might be wrong with this simple one? Is there a dependency on the $HADOOP_HOME environment variable or something similar?

Update: By downgrading the Spark version to 2.0.1 I was able to compile. That does not fix the problem (we need a newer version), but it helps point out the source of the problem.

Another update: In a different project, the hack of downgrading to 2.0.1 does help, i.e. execution proceeds further, but then a similar exception occurs when writing out to Parquet:

8/05/07 11:26:11 ERROR Executor: Exception in task 0.0 in stage 2741.0 (TID 2618)
java.lang.NoSuchMethodError: com.google.common.cache.CacheBuilder.build(Lcom/google/common/cache/CacheLoader;)Lcom/google/common/cache/LoadingCache;
    at org.apache.hadoop.io.compress.CodecPool.createCache(CodecPool.java:62)
    at org.apache.hadoop.io.compress.CodecPool.<clinit>(CodecPool.java:74)
    at org.apache.parquet.hadoop.CodecFactory$BytesCompressor.<init>(CodecFactory.java:92)
    at org.apache.parquet.hadoop.CodecFactory.getCompressor(CodecFactory.java:169)
    at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:303)
    at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:262)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.<init>(ParquetFileFormat.scala:562)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anon$1.newInstance(ParquetFileFormat.scala:139)
    at org.apache.spark.sql.execution.datasources.BaseWriterContainer.newOutputWriter(WriterContainer.scala:131)
    at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:247)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
    at org.apache.spark.scheduler.Task.run(Task.scala:86)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

1 Answer

Answers 1

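This error is caused by a version mismatch between Google's Guava library and the version Spark and Hadoop expect: the Hadoop classes in both stack traces call Guava methods that are missing from the Guava version that ends up on the classpath. Spark shades its own Guava, but many other libraries also depend on Guava. You can try shading the Guava dependencies as described in this post: Apache-Spark-User-List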


Friday, May 4, 2018

PySpark.sql.filter not performing as it should


I am running into a problem when executing the code below:

from pyspark.sql import functions as F
from pyspark.sql import Row, HiveContext

# sc: an existing SparkContext (e.g. the one the pyspark shell provides)
hc = HiveContext(sc)
rows1 = [Row(id1 = '2', id2 = '1', id3 = 'a'),
         Row(id1 = '3', id2 = '2', id3 = 'a'),
         Row(id1 = '4', id2 = '3', id3 = 'b')]
df1 = hc.createDataFrame(rows1)
df2 = df1.filter(F.col("id3")=="a")
df3 = df1.join(df2, df1.id2 == df2.id1, "inner")

When I run the code above, df3 is an empty DataFrame. However, if I change the code to the following, it gives the correct result (a DataFrame of 2 rows):

from pyspark.sql import functions as F
from pyspark.sql import Row, HiveContext

# sc: an existing SparkContext (e.g. the one the pyspark shell provides)
hc = HiveContext(sc)
rows1 = [Row(id1 = '2', id2 = '1', id3 = 'a'),
         Row(id1 = '3', id2 = '2', id3 = 'a'),
         Row(id1 = '4', id2 = '3', id3 = 'b')]
df1 = hc.createDataFrame(rows1)
rows2 = [Row(id1 = '2', id2 = '1', id3 = 'a'),
         Row(id1 = '3', id2 = '2', id3 = 'a'),
         Row(id1 = '4', id2 = '3', id3 = 'b')]
df1_temp = hc.createDataFrame(rows2)
df2 = df1_temp.filter(F.col("id3")=="a")
df3 = df1.join(df2, df1.id2 == df2.id1, "inner")

So my question is: why do I have to create a temporary DataFrame here? Also, if I can't get the HiveContext in my part of the project, how can I make a duplicate DataFrame on top of the existing DataFrame?

2 Answers

Answers 1

I believe that the problem you've hit here is an instance of a more general issue where certain types of DataFrame self-joins (including joins of a DataFrame against filtered copies of itself) can result in the generation of ambiguous or incorrect query plans.

There are several Spark JIRAs related to this; here are some notable ones:

There are other JIRA tickets dealing with different manifestations / aspects of these problems. Those tickets are discoverable by following chains of JIRA "relates to" links starting from the tickets listed above.

This ambiguity only crops up when referencing columns via the DataFrame instance (via subscripting, as in df["mycol"], or via field accesses, as in df.mycol). This ambiguity can be avoided by aliasing DataFrames and referring to columns via the aliases. For example, the following works correctly:

>>> from pyspark.sql import functions as F
>>> df1 = hc.createDataFrame(rows1).alias("df1")
>>> df2 = df1.filter(F.col("id3")=="a").alias("df2")
>>> df3 = df1.join(df2, F.col("df1.id2") == F.col("df2.id1"), "inner")
>>> df3.show()
+---+---+---+---+---+---+
|id1|id2|id3|id1|id2|id3|
+---+---+---+---+---+---+
|  4|  3|  b|  3|  2|  a|
|  3|  2|  a|  2|  1|  a|
+---+---+---+---+---+---+

Answers 2

I see the same behavior with this data set in Spark 2.0, but not always for the same operation. With a slightly different data frame the join does return rows, but on closer inspection they are not correct either:

df1 = spark.createDataFrame(
    [(1, 2, 'a'), (2, 2, 'a'), (3, 4, 'b')], ['id1', 'id2', 'id3']
    )
df1.show()

+---+---+---+
|id1|id2|id3|
+---+---+---+
|  1|  2|  a|
|  2|  2|  a|
|  3|  4|  b|
+---+---+---+

df2 = df1.filter(df1.id3 == 'a')
df2.show()

+---+---+---+
|id1|id2|id3|
+---+---+---+
|  1|  2|  a|
|  2|  2|  a|
+---+---+---+

df3 = df1.join(df2, df1.id2 == df2.id1, 'inner')
df3.show()

+---+---+---+---+---+---+
|id1|id2|id3|id1|id2|id3|
+---+---+---+---+---+---+
|  2|  2|  a|  1|  2|  a|
|  2|  2|  a|  2|  2|  a|
+---+---+---+---+---+---+

This looks like a bug. I have not tried later versions of Spark, though; you may want to report it.


Thursday, April 12, 2018

Spark dataframe groupby mean and median does not complete


I am using Spark SQL DataFrames to perform a groupBy operation and then compute the mean and median of the data for each group. The original amount of data is about 1 terabyte.

val df_result = df.filter($"DayOfWeek" <= 5).groupBy("id").agg(
        count("Error").as("Count"),
        avg("Error").as("MeanError"),
        callUDF("percentile_approx", col("Error"), lit(0.05)).as("5thError"),
        callUDF("percentile_approx", col("Error"), lit(0.5)).as("MedianError"),
        callUDF("percentile_approx", col("Error"), lit(0.95)).as("95thError")).
    filter($"Count" > 1000)

df_result.orderBy(asc("MeanError")).limit(5000)
    .write.format("csv").option("header", "true").save("/user/foo.bar/result.csv")

When I run that query, my job gets stuck and does not complete. How do I go about debugging the problem? Could a key imbalance be causing the groupBy() to get stuck?

1 Answer

Answers 1

There are lots of sensible suggestions already in the comments, but for what it's worth here are my thoughts:

1) Does df.count work? If not, your problem lies before the code you've posted (as suggested in the comments).

2) Look in the Spark UI (as suggested in the comments): do most tasks complete quickly, with a few taking a long while or appearing stuck? If so, skew is likely to be your problem.

3) You could potentially rewrite your query to first find only the count per id. Next, filter your original df to contain only rows whose id appears more than 1000 times, using a broadcast inner join (to avoid shuffling df), provided there aren't too many ids with more than 1000 occurrences. Then aggregate this smaller DataFrame and calculate all your statistics. If the count aggregation works, its output should also show whether there's any significant data skew!

4) In the past, breaking the computation up into even smaller steps, writing each intermediate result to disk and then immediately reading it back, has helped me get awkward jobs to complete. It can also make debugging quicker if generating df is expensive in the first place.

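5) It's definitely worth raising spark.sql.shuffle.partitions (as suggested in the comments). 2001 is a magic number in Spark: with more than 2000 shuffle partitions, Spark by default records map output sizes in the more compact HighlyCompressedMapStatus (see What should be the optimal value for spark.sql.shuffle.partitions or how do we increase partitions when using Spark SQL?).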

6) I would also try varying the amount of data: does it work if you only use day of week = 1 (as suggested in the comments)?

7) Does the query run without the percentile_approx?
