Showing posts with label scala. Show all posts

Thursday, September 6, 2018

Limiting maximum size of dataframe partition

When I write a DataFrame out to, say, CSV, one .csv file is created per partition. Suppose I want to limit the maximum size of each file to, say, 1 MB. I could do the write multiple times, increasing the argument to repartition each time. Is there a way to calculate ahead of time what argument to pass to repartition so that each file is smaller than some specified size?

I imagine there might be pathological cases where all the data ends up on one partition. So let's make the weaker assumption that we only need the average file size to be below some specified amount, say 1 MB.

2 Answers

Answers 1

1. Single dataframe solution

I was trying to find a clever approach that would not kill the cluster, and the only thing that came to mind was:

  1. Calculate the size of a serialized row
  2. Get the number of rows in your DataFrame
  3. Repartition, dividing the total size by the expected partition size
  4. Should work?

The code would look something like this:

    import java.io.{ByteArrayOutputStream, ObjectOutputStream}
    import org.apache.spark.sql.DataFrame

    val df: DataFrame = ??? // your df
    val rowSize = getBytes(df.head)
    val rowCount = df.count()
    val partitionSize = 1000000 // 1 MB, taken as one million bytes
    // + 1 rounds up, so the average partition stays under partitionSize
    // and the partition count is never 0
    val noPartitions: Int = (rowSize * rowCount / partitionSize + 1).toInt
    df.repartition(noPartitions).write.format(...) // save to csv

    // just helper function from https://stackoverflow.com/a/39371571/1549135
    def getBytes(value: Any): Long = {
      val stream: ByteArrayOutputStream = new ByteArrayOutputStream()
      val oos = new ObjectOutputStream(stream)
      oos.writeObject(value)
      oos.close()
      stream.toByteArray.length
    }

While my first choice was to calculate each row's byte size, that would be terribly inefficient. So, unless the row sizes in your data differ greatly, I would say that this solution will work. You could also measure every n-th row and average. You get the idea.

Also, I just 'hope' that Long will be big enough to hold the product used to calculate noPartitions. If not (if you have a lot of rows), it may be better to change the order of operations. Do the division in floating point, though: with integer types, rowSize / partitionSize truncates to 0 whenever a row is smaller than the partition size. For example:

    val noPartitions: Int = math.ceil(rowSize.toDouble / partitionSize * rowCount).toInt

Again, this is just a draft idea, with no domain knowledge about your data.
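The arithmetic in the steps above can be sketched without Spark. This is only a minimal sketch, not the answerer's code: the object name PartitionMath and the function partitionsFor are made up for illustration. It rounds up, never returns zero, and uses BigInt so that the product of row size and row count cannot overflow.

```scala
object PartitionMath {
  /** Smallest partition count (at least 1) for which the average
    * partition size is at most `targetBytes`. */
  def partitionsFor(avgRowBytes: Long, rowCount: Long, targetBytes: Long): Int = {
    require(avgRowBytes >= 0 && rowCount >= 0 && targetBytes > 0)
    // BigInt avoids overflow of avgRowBytes * rowCount for very large inputs.
    val total  = BigInt(avgRowBytes) * BigInt(rowCount)
    val target = BigInt(targetBytes)
    // Ceiling division, then clamp into the range [1, Int.MaxValue].
    val parts = (total + target - BigInt(1)) / target
    parts.max(BigInt(1)).min(BigInt(Int.MaxValue)).toInt
  }

  def main(args: Array[String]): Unit = {
    // 250 bytes per row * 10,000 rows = 2,500,000 bytes, so 3 partitions of at most 1 MB.
    println(partitionsFor(250L, 10000L, 1000000L))
  }
}
```

In the snippet from the answer, the result would be the value passed to df.repartition(...).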

2. Cross system solution

While going through the apache-spark docs I have found an interesting cross-system solution:

spark.sql.files.maxPartitionBytes which sets:

The maximum number of bytes to pack into a single partition when reading files.

The default value is 134217728 (128 MB).

So I suppose you could set it to 1000000 (1 MB), and it would then apply to every DataFrame read from files in that session. However, partitions that are too small may greatly hurt your performance!

You can set it during SparkSession creation:

    val spark = SparkSession
      .builder()
      .appName("Spark SQL basic example")
      .config("spark.sql.files.maxPartitionBytes", 1000000)
      .getOrCreate()

All of the above is only valid if (I remember correctly and) the CSV is written with the same number of files as there are partitions in the DataFrame.

Answers 2

    val df = spark.range(10000000)
    df.cache

    val catalyst_plan = df.queryExecution.logical
    val df_size_in_bytes = spark.sessionState.executePlan(catalyst_plan).optimizedPlan.stats.sizeInBytes

    df_size_in_bytes: BigInt = 80000000

The best solution would be to take a sample of about 100 records, estimate the size per record, and extrapolate to all the rows, as in the example above.

Tuesday, September 4, 2018

Calling scala code in pyspark for XSLT transformations

This might be a long shot, but figured it couldn't hurt to ask. I'm attempting to use Elsevier's open-sourced spark-xml-utils package in pyspark to transform some XML records with XSLT.

I've had a bit of success with some exploratory code getting a transformation to work:

    # open XSLT processor from spark's jvm context
    with open('/tmp/foo.xsl', 'r') as f:
        proc = sc._jvm.com.elsevier.spark_xml_utils.xslt.XSLTProcessor.getInstance(f.read())

    # transform XML record with 'proc'
    with open('/tmp/bar.xml','r') as f:
        transformed = proc.transform(f.read())

However, in a more realistic situation, I was unable to drop the proc.transform into a lambda map function, getting errors similar to:

    "An error occurred while calling o55.__getstate__. Trace:
    py4j.Py4JException: Method __getstate__([]) does not exist
        at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
        at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
        at py4j.Gateway.invoke(Gateway.java:272)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:214)
        at java.lang.Thread.run(Thread.java:748)"

When I got the small example to work on a single record, that was operating in a pyspark shell, which I'm assuming was using the spark driver. But in the map function mentioned above, this was in Spark, via Livy and YARN, which introduces workers. This SO question/answer suggests that perhaps I cannot use the function from the jvm in that context.

Now, the spark-xml-utils library provides some examples in scala, doing precisely what I'd like to do:

    import com.elsevier.spark_xml_utils.xslt.XSLTProcessor

    val xmlKeyPair = sc.sequenceFile[String, String]("s3n://spark-xml-utils/xml/part*")

    val stylesheet = sc.textFile("s3n://spark-xml-utils/stylesheets/srctitle.xsl").collect.head

    val srctitles = xmlKeyPair.mapPartitions(recsIter => {
        val proc = XSLTProcessor.getInstance(stylesheet)
        recsIter.map(rec => proc.transform(rec._2))
    })

I'm wondering, how can I translate this to pyspark code, such that I could run it over an RDD? Ideally, on an RDD with the following input and output format:

    id    | document      | other | columns
    -----------------------------------------------------
    sprog | <xml here...> | more  | data
    baz   | <xml here...> | more  | data

that could become

    id    | document                    | other | columns
    -----------------------------------------------------
    sprog | <*transformed* xml here...> | more  | data
    baz   | <*transformed* xml here...> | more  | data

Any help or suggestions would be most appreciated.

Update 8/28/2018: Also tried running through mapPartitions, no dice. Same error of __getstate__()

0 Answers

Sunday, August 26, 2018

WARN ShutdownHookManager: ShutdownHook '$anon$2' timeout,

I have a Spark job that reads a file, connects to a DB server, does some calculations, and then creates a different .csv file.

Since yesterday it keeps failing with this error (even though it creates an output file):

    WARN ShutdownHookManager: ShutdownHook '$anon$2' timeout

How do I fix it?

I searched and found a recommendation to 'explicitly invoke sparkContext.stop() before exiting the application'. The problem is that when I add this line to my Spark code, it no longer compiles. Compilation fails with:

    not found: value sparkContext
    [error] sparkContext.stop()
    [error] ^
    [error] one error found
    [error] (compile:compileIncremental) Compilation failed

0 Answers

Thursday, August 23, 2018

How to avoid an extra trailing newline when using Scala's sys.process to shell out

I need to execute an external binary from my Scala code and get its output. The standard way to do this seems to be using scala.sys.process. The following code mostly works:

    import scala.sys.process._
    val command = Seq("python3", "-c", "print('foo', end='')")
    val result = command.!!

However, it seems like there is an extra trailing newline:

    print(result.length) // prints 4 instead of 3
    print(result) // prints an extra newline at the end

I can just modify the result string to remove the last character, but it seems weird that sys.process would add a trailing newline to output, so I thought it might either be a bug with what I'm doing or something that can be configured.

Is the trailing newline supposed to be there?

Is there a way to get rid of it without manipulating the output string?

3 Answers

Answers 1

Have you tried val result = cmd.lineStream? (Or the related .lineStream_!, which offers some protection against exceptions.)

The result is a Stream[String]. Each element of the stream is a string of output from the process with whitespace preserved but there are no newline characters because that's the stream element delimiter.


The added newline appears to be intentional for processes launched with !!.

In ProcessBuilderImpl.scala:

    def !! = slurp(None, withIn = false)

    ...

    private[this] def slurp(log: Option[ProcessLogger], withIn: Boolean): String = {
      val buffer = new StringBuffer
      val code   = this ! BasicIO(withIn, buffer, log)

      if (code == 0) buffer.toString
      else scala.sys.error("Nonzero exit value: " + code)
    }

In BasicIO.scala:

    def apply(withIn: Boolean, buffer: StringBuffer, log: Option[ProcessLogger]) =
        new ProcessIO(input(withIn), processFully(buffer), getErr(log))

    ...

    def processFully(buffer: Appendable): InputStream => Unit =
      processFully(appendLine(buffer))

    ...

    private[this] def appendLine(buffer: Appendable): String => Unit = line => {
        buffer append line
        buffer append Newline  // <-- !!
    }
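The effect of appendLine is easy to observe. The following is a small sketch, assuming a POSIX printf binary is on the PATH; TrailingNewlineDemo and run are names invented here. It shows that !! returns one extra line separator and that stripLineEnd removes exactly one.

```scala
import scala.sys.process._

object TrailingNewlineDemo {
  /** Runs `cmd` with `!!` and drops the single line terminator that `!!` appends. */
  def run(cmd: Seq[String]): String = {
    val out: String = cmd.!!
    out.stripLineEnd
  }

  def main(args: Array[String]): Unit = {
    // Assumption: `printf foo` writes exactly "foo", with no trailing newline.
    val cmd = Seq("printf", "foo")
    val raw: String = cmd.!!
    println(raw.length)      // the appended line separator is counted here
    println(run(cmd).length) // length of "foo" alone
  }
}
```

Note that this does manipulate the string, and it cannot tell a newline the process really wrote from the one appended by !!. Reading the raw stream through a ProcessIO avoids that ambiguity.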

Answers 2

sys.process isn't adding the trailing newline: echo is. From the documentation:

DESCRIPTION
The echo utility writes any specified operands, separated by single blank (' ') characters and followed by a newline ('\n') character, to the standard output.

Using trim can remove this. Otherwise, you can do this, if your shell supports it:

val result = "echo -n foo".!!

Answers 3

In order to preserve the output as presented, you have to intercept the InputStream before any NewLine interpretations (or additions).

    import sys.process._

    val cmnd = Seq("/bin/echo","-n","one\n\nthree")

    val stdOutBuf = new StringBuilder
    val pio = new ProcessIO(_.close()  //ignore STDIN
      ,out=>{                          //save STDOUT
        val src = io.Source.fromInputStream(out)
        src.foreach(stdOutBuf.+=)
        src.close()
      },_.close())                     //ignore STDERR (or not, up to you)

    val exitCode :Int = cmnd.run(pio).exitValue()
    val exactOutput :String = stdOutBuf.result()

Friday, August 17, 2018

Silently dropping a nullable database column with slick

I have a database for objects called Campaigns containing three fields :

  • Id (int, not nullable)
  • Version (int, not nullable)
  • Stuff (Text, nullable)

Let's call CampaignsRow the corresponding slick entity class

When I select rows from Campaigns, I don't always need to read Stuff, which contains big chunks of text.

However, I'd very much like to work in the codebase with the class CampaignsRow instead of a tuple, and so to be able to sometimes just drop the Stuff column while retaining the original type.

Basically, I'm trying to write the following function:

    //Force dropping the Stuff column from the current Query
    def smallCampaign(campaigns: Query[Campaigns, CampaignsRow, Seq]): Query[Campaigns, CampaignsRow, Seq] = {

      val smallCampaignQuery = campaigns.map {
        row => CampaignsRow(row.id, row.version, None : Option[String])
      }

      smallCampaignQuery /* Fails because the type is now wrong, I have a Query[(Rep[Int], Rep[Int], Rep[Option[String]]), (Int, Int, Option[String]), Seq] */
    }

Any idea how to do this ? I suspect this has to do with Shape in slick, but I can't find a resource to start understanding this class, and the slick source code is proving too complex for me to follow.

2 Answers

Answers 1

You're actually already doing almost what you want in def *, the default mapping. You can use the same tools in the map method. Your two tools are mapTo and <>.

As you've found, there is the mapTo method which you can only use if your case class exactly matches the shape of the tuple, so if you wanted a special case class just for this purpose:

    case class CampaignLite(id: Int, version: Int)

    val smallCampaignQuery = campaigns.map {
      row => (row.id, row.version).mapTo[CampaignLite]
    }

As you want to reuse your existing class, you can write your own convert functions instead of using the standard tupled and unapply and pass those to <>:

    object CampaignRow {
      def tupleLite(t: (Int, Int)) = CampaignRow(t._1, t._2, None)
      def unapplyLite(c: CampaignRow) = Some((c.id, c.version))
    }

    val smallCampaignQuery = campaigns.map {
      row => (row.id, row.version) <> (CampaignRow.tupleLite, CampaignRow.unapplyLite)
    }

This gives you the most flexibility, as you can do whatever you like in your convert functions, but it's a bit more wordy.
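To make the role of the two convert functions concrete, here is a sketch in plain Scala with no Slick involved. LiteMappingDemo is a made-up wrapper, and the functions are the same shape as tupleLite and unapplyLite above. It only shows that one function drops stuff and the other rebuilds a row with stuff set to None, which is the pair of directions that <> expects.

```scala
object LiteMappingDemo {
  final case class CampaignRow(id: Int, version: Int, stuff: Option[String])

  // Builds a full row from the two selected columns; `stuff` is left empty.
  def tupleLite(t: (Int, Int)): CampaignRow = CampaignRow(t._1, t._2, None)

  // Projects a row back to the two columns that the lite query reads.
  def unapplyLite(c: CampaignRow): Option[(Int, Int)] = Some((c.id, c.version))

  def main(args: Array[String]): Unit = {
    val full = CampaignRow(1, 3, Some("a big chunk of text"))
    // Round trip: the id and version survive, the text does not.
    println(unapplyLite(full).map(tupleLite))
  }
}
```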

As row is an instance of the Campaigns table you could always define it there alongside *, if you need to use it regularly.

    class Campaigns ... {
      ...
      def * = (id, version, stuff).mapTo[CampaignRow]
      def liteMapping = (id, version) <> (CampaignRow.tupleLite, CampaignRow.unapplyLite)
    }

    val liteCampaigns = campaigns.map(_.liteMapping)

Reference: Essential Slick 3, section 5.2.1

Answers 2

If I understand your requirement correctly, you could consider making CampaignRow a case class that models your Campaigns table class by having Campaigns extend Table[CampaignRow] and providing the bidirectional mapping for the * projection:

    case class CampaignRow(id: Int, version: Int, stuff: Option[String])

    class Campaigns(tag: Tag) extends Table[CampaignRow](tag, "CAMPAIGNS") {
      // ...
      def * = (id, version, stuff) <> (CampaignRow.tupled, CampaignRow.unapply)
    }

You should then be able to do something like below:

    val campaigns = TableQuery[Campaigns]

    val smallCampaignQuery = campaigns.map( _.copy(stuff = None) )

For a relevant example, here's a Slick doc.

Tuesday, August 14, 2018

Dynamic Mapping Statement in Spark Scala

In my code I change a DF into an RDD to run a function through a map call. With each run there is a couple seconds of overhead so if I run the function call like this:

    var iterator = 0 // zero-based index into columnPosition

    val inputDF = spark.sql("select * from DF")

    var columnPosition = Array("Column_4")
    columnPosition = columnPosition ++ Array("Column_9")
    var selectedDF = inputDF
    var intrimDF = inputDF
    var finalDF = inputDF

    while(iterator < columnPosition.length) {
      selectedDF = finalDF.selectExpr("foreign_key", columnPosition(iterator))
      // .joinBack, .changeColumnPositions, etc. are shorthand here, not real DataFrame methods
      intrimDF = selectedDF.rdd.map(x => (x(0), action(x(1)))).toDF.selectExpr("_1 as some_key", "_2 as " + columnPosition(iterator)).joinBack.changeColumnPositions.dropOriginalColumn.renameColumns

      finalDF = intrimDF
      iterator = iterator + 1
    }

It runs in ~90 seconds for a large job, because of the join. What I am trying to do is build it like below, to cut out the join entirely and have it be dynamic.

    val inputDF = spark.sql("select * from DF")
    val intrimDF = inputDF.rdd.map(x=>(x(0),x(1),x(2),action(x(3)),x(4),x(5),x(6),x(7),action(x(8))))
    val columnStatement //Create an Array with columnName changes
    val finalDF = intrimDF.selectExpr(columnStatement :_*)

The issue is that I can't get past the hard-coding side of the problem. Below is an example of what I want to do by setting the mapping call dynamically.

    val mappingStatement = "x=>(x(0),x(1),x(2),action(x(3)),x(4),x(5),x(6),x(7),action(x(8)))"
    val intrimDF = inputDF.rdd.map(mappingStatement)

Everything I have tried failed:

  1. Calling it using the Map() function
  2. Setting an Array and passing it as :_*
  3. Trying to build it as a call, but it doesn't like being dynamic
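To make the goal concrete, here is a minimal sketch of what a dynamic version of that mapping might look like, in plain Scala with a row modeled as Seq[Any]. The names DynamicRowMapping and mapColumns are hypothetical, and action is only a stand-in for the real function. With Spark, the same function could presumably be applied to row.toSeq and the result rebuilt with Row.fromSeq, but that wiring is an assumption and is not tested here.

```scala
object DynamicRowMapping {
  // Hypothetical stand-in for the question's `action` function.
  def action(v: Any): Any = v.toString.toUpperCase

  /** Applies `f` to the values at the `targets` positions and leaves the rest unchanged. */
  def mapColumns(row: Seq[Any], targets: Set[Int])(f: Any => Any): Seq[Any] =
    row.zipWithIndex.map { case (v, i) => if (targets.contains(i)) f(v) else v }

  def main(args: Array[String]): Unit = {
    val row: Seq[Any] = Seq(1, "a", "b", "c", "d")
    // Only position 3 is transformed; the positions are data, not hard-coded in the lambda.
    println(mapColumns(row, Set(3))(action))
  }
}
```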

Hope this makes sense!

0 Answers

Saturday, August 11, 2018

How to get reflected runtime Method from AnyRef object?

I am trying to get the reflected runtime method in an instance but it is not shown in the decls result:

    val foo: AnyRef = new Object {
      def bar = 1
    }
    typeOf[foo.type].decls //Does not contain bar method

I tried to use Java reflection class and it works:

foo.getClass.getDeclaredMethods //It contains bar method 

But I prefer to work with MethodSymbols and Scala Type than Java Class and Method reflection. How can I get the reflected MethodSymbol?


I want a method that looks up a method bar on an object passed as AnyRef and calls it. Something like below:

    def getBarMethodFromObj(obj: AnyRef): MethodSymbol = {
      //typeOf(obj).decl(TermName("bar")) this doesn't work
    }

I cannot use a trait because bar can have different argument types, return types, and arities. As Scala does not support variadic generic arguments, I plan to use reflection to find the method and call it, but this cannot be done in Scala either. I am currently using the Java solution:

    val bar = foo.getClass.getDeclaredMethods.find(_.getName == "bar")
    bar.map(_.invoke(foo, params: _*)) // find returns an Option[Method]

However, Java reflection does not retain generic types, which creates problems for List, Map, etc. So I would like to know whether I can implement this in Scala, or whether a solution is coming.

1 Answers

Answers 1

I don't know what you're trying to do, but removing the AnyRef annotation makes your code work:

    val foo = new { def bar = 1 }
    typeOf[foo.type].decls // Contains bar method

If you need a type annotation (for example, in a method signature), you can use the same structural type that the compiler infers:

val foo: { def bar: Int } = new { def bar = 1 } 

If you want to get the full list of methods from another method without knowing the exact type except via generics, you may be interested in TypeTag:

    import scala.reflect.runtime.universe.{ TypeTag, typeTag }
    val foo = new { def bar = 1 }
    def getMethods[T: TypeTag](t: T) = typeTag[T].tpe.decls
    getMethods(foo) // Contains bar

If you can't use TypeTag (maybe because you can't make API changes), then you're probably best off using the Java reflection API. The Scala reflection API is generally designed to use type information, so it may not work for you if you only know the type is AnyRef.
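For the Java reflection fallback, a small helper along these lines might be enough. This is a sketch under assumptions: ReflectiveCall, invokeByName and Target are invented names, and the method is matched by name and argument count only, so overloads with the same arity are not distinguished.

```scala
object ReflectiveCall {
  /** Looks up a declared method by name and arity and invokes it; None if no such method. */
  def invokeByName(obj: AnyRef, name: String, args: AnyRef*): Option[AnyRef] =
    obj.getClass.getDeclaredMethods
      .find(m => m.getName == name && m.getParameterCount == args.length)
      .map { m =>
        m.setAccessible(true) // anonymous or non-public classes may otherwise refuse access
        m.invoke(obj, args: _*)
      }

  // Hypothetical target type, for illustration only.
  class Target { def bar(x: Int): Int = x + 1 }

  def main(args: Array[String]): Unit = {
    val target: AnyRef = new Target
    println(invokeByName(target, "bar", Int.box(41))) // Some(42)
    println(invokeByName(target, "missing"))          // None
  }
}
```

The result comes back as a boxed AnyRef, so, as the question notes, any generic type information is gone at this point.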

Thursday, August 2, 2018

Distributed Computation for large data-data processing

I have a huge time-series data set, and I want to process it using Spark's parallel/distributed computation. The requirement is to look at the data row by row to determine the groups shown under the desired output below. I can't really get Spark to distribute this without some kind of coordination between the executors.

t: time-series datetime sample, lat: latitude, long: longitude


For instance, taking a small part of the sample data set to explain the case:

    t   lat long
    0   27  28
    5   27  28
    10  27  28
    15  29  49
    20  29  49
    25  27  28
    30  27  28

Desired output should be:

    Lat-long    interval
    (27,28)     (0,10)
    (29,49)     (15,20)
    (27,28)     (25,30)

I am able to get the desired result using this piece of code

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions._

    val spark = SparkSession.builder().master("local").getOrCreate()

    import spark.implicits._

    // same rows as the sample table above
    val df = Seq(
      (0, 27, 28),
      (5, 27, 28),
      (10, 27, 28),
      (15, 29, 49),
      (20, 29, 49),
      (25, 27, 28),
      (30, 27, 28)
    ).toDF("t", "lat", "long")

    val dfGrouped = df
      .withColumn("lat-long", struct($"lat", $"long"))

    val wAll = Window.partitionBy().orderBy($"t".asc)

    dfGrouped.withColumn("lag", lag("lat-long", 1, null).over(wAll))
      .orderBy(asc("t")).withColumn("detector", when($"lat-long" === $"lag", 0)
        .otherwise(1)).withColumn("runningTotal", sum("detector").over(wAll))
      .groupBy("runningTotal", "lat-long").agg(struct(min("t"), max("t")).as("interval"))
      .drop("runningTotal").show

But what if the data is split across two executors? Then the data will look like this:

Data in executor 1:

    t   lat long
    0   27  28
    5   27  28
    10  27  28
    15  29  49
    20  29  49
    25  27  28

Data in executor 2:

    t   lat long
    30  27  28

How should I get the desired output for a large amount of data? There must be smarter ways to do this, distributing the work with some kind of coordination between the executors so as to get that result.

Please point me in the right direction. I have researched this but have not been able to arrive at a solution.

PS: This is just a sample example.

1 Answers

Answers 1

You can address this with a UDAF. First of all, you could add a column that assigns each row to one of numExecutors contiguous time buckets. Something like executorIndex = floor((t - min(t)) / ((max(t) - min(t)) / numExecutors)).

Then you can apply your UDAF grouping by executorIndex.

Your UDAF needs to store a Map whose String key (for example) represents one lat-long pair, and whose int[] value holds the minT and the maxT for that lat-long key.

Please ask if you need a more extensive explanation.

Hope this helps...

PS: I'm assuming that there is some time relation between rows with the same lat and long, which is normal if you are tracking some movement...
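One way to picture the coordination between executors is sketched below in plain Scala collections rather than Spark. It is an illustrative alternative, not the UDAF described above, and Sample, Run, runsOf and stitch are hypothetical names. Each time-ordered chunk is collapsed into runs on its own, and neighbouring chunks are then stitched by merging the last run of one with the first run of the next when they share a position.

```scala
object IntervalRuns {
  // One observation: timestamp plus position.
  final case class Sample(t: Int, lat: Int, lon: Int)
  // A maximal run of consecutive samples at the same position.
  final case class Run(lat: Int, lon: Int, start: Int, end: Int)

  /** Collapses one time-ordered chunk into runs (the per-executor step). */
  def runsOf(chunk: Seq[Sample]): Vector[Run] =
    chunk.foldLeft(Vector.empty[Run]) { (acc, s) =>
      acc.lastOption match {
        case Some(r) if r.lat == s.lat && r.lon == s.lon => acc.init :+ r.copy(end = s.t)
        case _                                           => acc :+ Run(s.lat, s.lon, s.t, s.t)
      }
    }

  /** Joins the runs of two adjacent chunks, merging across the boundary if needed. */
  def stitch(left: Vector[Run], right: Vector[Run]): Vector[Run] =
    (left.lastOption, right.headOption) match {
      case (Some(l), Some(r)) if l.lat == r.lat && l.lon == r.lon =>
        (left.init :+ l.copy(end = r.end)) ++ right.tail
      case _ => left ++ right
    }

  def main(args: Array[String]): Unit = {
    // The two "executor" chunks from the question.
    val chunk1 = Seq(Sample(0, 27, 28), Sample(5, 27, 28), Sample(10, 27, 28),
                     Sample(15, 29, 49), Sample(20, 29, 49), Sample(25, 27, 28))
    val chunk2 = Seq(Sample(30, 27, 28))
    Seq(chunk1, chunk2).map(runsOf).reduce(stitch).foreach(println)
  }
}
```

The stitch step only needs the first and last run of each chunk, so the amount of data exchanged between chunks stays small. It does rely on the chunks being contiguous, ordered ranges of t.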

Monday, July 16, 2018

Spark dataframe saveAsTable is using a single task

We have a pipeline for which the initial stages are properly scalable - using several dozen workers apiece.

One of the last stages is

    dataFrame.write.format(outFormat).mode(saveMode)
      .partitionBy(partColVals.map(_._1): _*).saveAsTable(tname)

For this stage we end up with a single worker. This clearly does not work for us - in fact the worker runs out of disk space - on top of being very slow.

Why would that command end up running on a single worker/single task only?

Update The output format was parquet. The number of partition columns did not affect the result (tried one column as well as several columns).

Another update None of the following conditions (as posited by an answer below) held:

  • coalesce or partitionBy statements
  • window / analytic functions
  • Dataset.limit
  • sql.shuffle.partitions

1 Answers

Answers 1

The problem is unlikely to be related in any way to saveAsTable.

A single task in a stage indicates that the input data (Dataset or RDD) has only one partition. This is in contrast to cases where there are multiple tasks but one or more have a significantly higher execution time, which normally corresponds to partitions containing positively skewed keys. Also, you should not confound a single-task scenario with low CPU utilization. The latter is usually a result of insufficient IO throughput (high CPU wait times are the most obvious indication of that), but in rare cases can be traced to usage of shared objects with low-level synchronization primitives.

Since standard data sources don't shuffle data on write (including cases where partitionBy and bucketBy options are used) it is safe to assume that data has been repartitioned somewhere in the upstream code. Usually it means that one of the following happened:

  • Data has been explicitly moved to a single partition using coalesce(1) or repartition(1).
  • Data has been implicitly moved to a single partition for example with:

    • Dataset.limit
    • Window function applications with a window definition lacking a PARTITION BY clause:

      df.withColumn(
        "row_number",
        row_number().over(Window.orderBy("some_column"))
      )

    • sql.shuffle.partitions option is set to 1 and upstream code includes a non-local operation on a Dataset.

    • Dataset is a result of applying a global aggregate function (without a GROUP BY clause). This is usually not an issue, unless the function is non-reducing (collect_list or comparable).

While there is no evidence that it is the problem here, in the general case you should also consider the possibility that the data contains only a single partition all the way back to the source. This usually happens when input is fetched using a JDBC source, but third-party formats can exhibit the same behavior.

To identify the source of the problem you should either check the execution plan for the input Dataset (explain(true)) or check SQL tab of the Spark Web UI.

Monday, May 21, 2018

Conditional Scala Play Evolutions

I would like to implement an evolution that applies only if a condition is met on a Scala Play framework application. The condition is that the application should be in a certain environment.

I have this evolution right now:

    # payments SCHEMA

    # --- !Ups

    INSERT INTO table1 (id, provider_name, provider_country, provider_code, status, flag)
    VALUES (10, 'XXXXX', 'XX', 'XXXXX', '1', '0');

    # --- !Downs
    DELETE FROM table2 WHERE id = 10;

I want the evolution to run if this condition is met

    if(config.env == 'dev'){
       //execute evolution
    }

How do I achieve this? Is this a function of the evolution or the application logic?

3 Answers

Answers 1

Ideally production code should not be polluted with DEV artefacts, so I believe this is a function of evolutions.

Say we have production evolutions under

conf/evolutions/default 

We could create corresponding DEV evolutions under

conf/evolutions/dev 

Then we could load the appropriate DB on the basis of an env flag:

    class SomeRepository @Inject()(dbapi: DBApi, config: Configuration)(implicit ec: DatabaseExecutionContext) {

      private val db =
        if(config.env == "dev"){
          dbapi.database("dev")
        } else {
          dbapi.database("default")
        }

      def findById(id: Long): Future[Option[SomeModel]] = Future {
        db.withConnection { implicit connection =>
          SQL"select * from ...
        }
      }(ec)
      ...
    }

Answers 2

One approach might be to use a stored procedure in conjunction with a db-based app 'setting'. Assume your app had an appSetting table for storing app settings.

    create table appSetting (
      name varchar(63) not null primary key,
      value varchar(255)
    ) ;
    -- insert into appSetting values ('environment','dev');

Then, something along the following lines would create a tmpLog table (or insert a value into table1) only if appSetting has a value of 'dev' for setting 'environment' at the time of running the evolution:

    # --- !Ups
    create procedure doEvolution31()
      begin
        declare environment varchar(31);;
        select value
          into environment
          from appSetting
        where name='environment'
        ;;
        if (environment='dev') then
          create table tmpLog (id int not null primary key, text varchar(255));;
          -- or INSERT INTO table1 (id, provider_name, provider_country, provider_code, status, flag) VALUES (10, 'XXXXX', 'XX', 'XXXXX', '1', '0');
        end if;;
      end
    ;
    call doEvolution31();

    # --- !Downs
    drop procedure doEvolution31;
    drop table if exists tmpLog;
    -- or delete from table2 where id=10;

You don't mention which database you are using. The above is MySQL syntax. There might be a way to get a config value into the stored procedure, perhaps via some sbt magic, but I think we would use the above if we had such a requirement. (BTW, the double semicolons escape a single semicolon, so that the individual statements of the procedure are not executed while the procedure is being created.)

Answers 3

Why do you need it at all? Don't you use a separate database per environment, as the documentation recommends? If you do, then you probably have different database configurations, probably in different files. They probably look something like this:

    # application.conf
    db.default {
        driver=com.mysql.jdbc.Driver
        url="jdbc:mysql://localhost/playdb"
        username=playdbuser
        password="a strong password"
    }
    # dev.conf
    db.dev {
        driver=com.mysql.jdbc.Driver
        url="jdbc:mysql://localhost/playdb"
        username=playdbuser
        password="a strong password"
    }
    # staging.conf
    db.staging {
        driver=com.mysql.jdbc.Driver
        url="jdbc:mysql://localhost/playdb"
        username=playdbuser
        password="a strong password"
    }
    # prod.conf
    db.prod {
        driver=com.mysql.jdbc.Driver
        url="jdbc:mysql://localhost/playdb"
        username=playdbuser
        password="a strong password"
    }

Nothing actually stops you from pointing them all at the same database, but don't; use a proper database per environment. Assuming you are using the JDBC connector and the Play Evolutions plugin, just put your evolution in the right directory and you'll achieve what you want. The real question is then: "How do I use the proper database per environment?" And the answer depends strongly on your choice of DI.

Sunday, April 8, 2018

How to pushdown limit predicate for Cassandra when you use dataframes?

I have a large Cassandra table, and I want to load only 50 rows from it. Consider the following code:

    val ds = sparkSession.read
          .format("org.apache.spark.sql.cassandra")
          .options(Map("table" -> s"$Aggregates", "keyspace" -> s"$KeySpace"))
          .load()
          .where(col("aggregate_type") === "DAY")
          .where(col("start_time") <= "2018-03-28")
          .limit(50).collect()

This code pushes down both predicates from the where calls, but not the limit. Is it true that the whole data set (1 million records) is being fetched? If not, why is the run time of this code roughly the same as that of the code without limit(50)?

1 Answers

Answers 1

Unlike Spark Streaming, Spark itself tries to preload as much data as it can, as fast as it can, so that it can operate on it in parallel. So preloading is lazy, but greedy once it's triggered. There are, however, some factors specific to the Cassandra connector:

  • Automatic predicate pushdown of valid "where" clauses.

  • According to this answer limit(...) is not translated to CQL's LIMIT, so then its behavior depends on how many fetching jobs are created after enough data is downloaded. Quote:

calling limit will allow Spark to skip reading some portions from the underlying DataSource. These would limit the amount of data read from Cassandra by canceling tasks from being executed.

Possible solutions:

  • DataFrame limits could be partially managed by limiting numPartitions and data exchange rate (concurrent.reads and other params). If you're okay with n ~ 50 "in most cases", you could also limit something like where(dayIndex < 50 * factor * num_records).

  • There is a way to set CQL LIMIT through SparkPartitionLimit, which is directly affecting every CQL request (see more) - keep in mind that requests are per-spark-partition. It's available in CassandraRdd extension class, so you would have to convert to RDD first.

The code would be something like:

    filteredDataFrame.rdd.asInstanceOf[CassandraRDD].limit(n).take(n)

This would append LIMIT $N to every CQL request. Unlike with DataFrame's limit, if you specify the CassandraRDD limit several times (.limit(10).limit(20)), only the last one is going to be appended. Also, I used n instead of n / numPartitions + 1 because the latter (even if Spark and Cassandra partitions are one-to-one) might return fewer than n results in total when some partitions hold few rows; given that requests run in parallel, the larger limit shouldn't hurt performance too much. As a result, I had to add take(n) in order to cut the at most numPartitions * n rows down to n.

Warning: double-check that your where clauses are translatable to CQL (using explain()); otherwise LIMIT would be applied before filtering.
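The counting argument above can be checked with a tiny simulation in plain Scala. This is only a sketch with partitions modeled as in-memory lists; PerPartitionLimit and limitEachPartition are invented names, and nothing here talks to Cassandra.

```scala
object PerPartitionLimit {
  /** Applies `limit` to each partition independently, as a per-request LIMIT would. */
  def limitEachPartition[A](partitions: Seq[Seq[A]], limit: Int): Seq[Seq[A]] =
    partitions.map(_.take(limit))

  def main(args: Array[String]): Unit = {
    val n = 5

    // Three simulated partitions; with a per-partition limit of n,
    // up to numPartitions * n rows come back, hence the final take(n).
    val partitions = Seq((1 to 100).toList, (101 to 103).toList, (104 to 200).toList)
    val limited = limitEachPartition(partitions, n)
    println(limited.map(_.size).sum)   // 13
    println(limited.flatten.take(n))   // List(1, 2, 3, 4, 5)

    // Skewed partitions: a limit of n / numPartitions + 1 = 2 yields only 4 rows, fewer than n.
    val skewed = Seq(List(1), List(2), (3 to 100).toList)
    println(limitEachPartition(skewed, n / skewed.size + 1).flatten.size) // 4
  }
}
```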

P.S. You could also try to run CQL directly using sparkSession.sql(...) (like here) and compare results.

Saturday, March 24, 2018

Spark with Kafka streaming save to Elastic search slow performance

I have a list of data, the value is basically a bson document (think json), each json ranges from 5k to 20k in size. It either can be in bson object format or can be converted to json directly:

    Key, Value
    --------
    K1, JSON1
    K1, JSON2
    K2, JSON3
    K2, JSON4

I expect the groupByKey would produce:

    K1, (JSON1, JSON2)
    K2, (JSON3, JSON4)

so that when I do:

    val data = [...].map(x => (x.Key, x.Value))
    val groupedData = data.groupByKey()
    groupedData.foreachRDD { rdd =>
       //the elements in the rdd here are not really grouped by the Key
    }

I am so confused by the behaviour of the RDD. I have read many articles on the internet, including the official Spark documentation: https://spark.apache.org/docs/0.9.1/scala-programming-guide.html

Still couldn't achieve what I want.

-------- UPDATED ---------------------

Basically I really need it to be grouped by the key, the key is the index to be used in Elasticsearch, so that I can perform batch process based on the key via Elasticsearch for Hadoop:

EsSpark.saveToEs(rdd); 

I can't do it per partition because Elasticsearch only accepts an RDD. I tried to use sc.makeRDD or sc.parallelize, but both tell me it is not serializable.

I tried to use:

EsSpark.saveToEs(rdd, Map(
  "es.resource.write" -> "{TheKeyFromTheObjectAbove}",
  "es.batch.size.bytes" -> "5000000"))

Documentation of the config is here: https://www.elastic.co/guide/en/elasticsearch/hadoop/current/configuration.html

But it is VERY slow compared to not using the configuration to define a dynamic index based on the value of each individual document. I suspect it is parsing every JSON to fetch the value dynamically.

1 Answers

Answers 1

Here is the example.

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

object Test extends App {

  val session: SparkSession = SparkSession
    .builder.appName("Example")
    .config(new SparkConf().setMaster("local[*]"))
    .getOrCreate()
  val sc = session.sparkContext

  import session.implicits._

  case class Message(key: String, value: String)

  val input: Seq[Message] =
    Seq(Message("K1", "foo1"),
      Message("K1", "foo2"),
      Message("K2", "foo3"),
      Message("K2", "foo4"))

  val inputRdd: RDD[Message] = sc.parallelize(input)

  val intermediate: RDD[(String, String)] =
    inputRdd.map(x => (x.key, x.value))
  intermediate.toDF().show()
  //  +---+----+
  //  | _1|  _2|
  //  +---+----+
  //  | K1|foo1|
  //  | K1|foo2|
  //  | K2|foo3|
  //  | K2|foo4|
  //  +---+----+

  val output: RDD[(String, List[String])] =
    intermediate.groupByKey().map(x => (x._1, x._2.toList))
  output.toDF().show()
  //  +---+------------+
  //  | _1|          _2|
  //  +---+------------+
  //  | K1|[foo1, foo2]|
  //  | K2|[foo3, foo4]|
  //  +---+------------+

  output.foreachPartition(rdd => if (rdd.nonEmpty) {
    println(rdd.toList)
  })
  //  List((K1,List(foo1, foo2)))
  //  List((K2,List(foo3, foo4)))

}
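For comparison, the same grouping can be tried on plain Scala collections without a Spark session. This is only a local sketch of the semantics the question expects from groupByKey; GroupByKeySketch and groupValues are hypothetical names, and nothing here talks to Elasticsearch.

```scala
object GroupByKeySketch {
  // Groups (key, value) pairs by key, keeping the values in encounter order.
  def groupValues[K, V](pairs: Seq[(K, V)]): Map[K, List[V]] =
    pairs.groupBy(_._1).map { case (key, kvs) => (key, kvs.map(_._2).toList) }

  def main(args: Array[String]): Unit = {
    val input = Seq("K1" -> "foo1", "K1" -> "foo2", "K2" -> "foo3", "K2" -> "foo4")
    // One entry per key, each carrying every value seen for that key.
    groupValues(input).toSeq.sortBy(_._1).foreach(println)
  }
}
```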

Wednesday, February 14, 2018

How to change the attributes order in Apache SparkSQL `Project` operator?

This is a Catalyst-specific problem.

See below my queryExecution.optimizedPlan before applying my rule.

01 Project [x#9, p#10, q#11, if (isnull(q#11)) null else UDF(q#11) AS udfB_10#28, if (isnull(p#10)) null else UDF(p#10) AS udfA_99#93]
02 +- InMemoryRelation [x#9, p#10, q#11], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
03    :  +- *SerializeFromObject [assertnotnull(input[0, eic.R0, true], top level non-flat input object).x AS x#9, unwrapoption(IntegerType, assertnotnull(input[0, eic.R0, true], top level non-flat input object).p) AS p#10, unwrapoption(IntegerType, assertnotnull(input[0, eic.R0, true], top level non-flat input object).q) AS q#11]
04    :     +- *MapElements <function1>, obj#8: eic.R0
05    :        +- *DeserializeToObject newInstance(class java.lang.Long), obj#7: java.lang.Long
05    :           +- *Range (0, 3, step=1, splits=Some(2))

In line 01 I need to swap the positions of udfA and udfB, this way:

01 Project [x#9, p#10, q#11, if (isnull(p#10)) null else UDF(p#10) AS udfA_99#93, if (isnull(q#11)) null else UDF(q#11) AS udfB_10#28] 

When I try to change the order of the attributes in a Projection operation in SparkSQL via a Catalyst optimization, the result of the query is modified to an invalid value. Maybe I'm not doing everything that is needed. I'm just changing the order of the NamedExpression objects in the fields parameter:

object ReorderColumnsOnProjectOptimizationRule extends Rule[LogicalPlan] {

  def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {

    case Project(fields: Seq[NamedExpression], child) =>

      if (checkCondition(fields)) Project(newFieldsObject(fields), child) else Project(fields, child)

    case _ => plan

  }

  private def newFieldsObject(fields: Seq[NamedExpression]): Seq[NamedExpression] = {
    // compare UDFs computation cost and return the new NamedExpression list
    . . .
  }

  private def checkCondition(fields: Seq[NamedExpression]): Boolean = {
    // compare UDFs computation cost and return Boolean for decision off change order on field list.
    . . .
  }
  . . .
}

Note: I'm adding my Rule on extraOptimizations SparkSQL object:

spark.experimental.extraOptimizations = Seq(ReorderColumnsOnProjectOptimizationRule) 

Any suggestions will be of great help.

EDIT 1

By the way, I created a notebook on Databricks for testing purposes. See this link for more detail

Commenting out line 60, the optimization is invoked and an error occurs.

. . .
58     // Do UDF with less cost before, so I need change the fields order
59     myPriorityList.size == 2 && myPriorityList(0) > myPriorityList(1)
60     false
61   }

What did I miss ?

EDIT 2

Consider the following piece of code from compiler optimisation, which is almost analogous :

if ( really_slow_test(with,plenty,of,parameters)
     && slower_test(with,some,parameters)
     && fast_test // with no parameters
   )
 {
  ...then code...
 }

This code first evaluates an expensive function then, on success, proceeds to evaluate the remainder of the expression. But even if the first test fails and the evaluation is short-cut, there’s a significant performance penalty because the fat really_slow_test(...) is always evaluated. While retaining program correctness, one can rearrange the expression as follows:

if ( fast_test
     && slower_test(with,some,parameters)
     && really_slow_test(with,plenty,of,parameters)
   )
 {
  ...then code...
 }

My goal is to run the fastest UDFs first
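The reordering idea itself can be sketched in plain Scala, outside of Catalyst. This is only an illustration of "cheapest test first" with short-circuit evaluation; the Check class, its cost field and allPass are hypothetical names, and the sketch assumes the tests have no side effects that depend on evaluation order.

```scala
object CheapestFirstSketch {
  // A named test with a hypothetical relative cost; lower cost runs earlier.
  final case class Check(name: String, cost: Int, test: () => Boolean)

  // Evaluates the checks in ascending cost order. forall stops at the first
  // failure, so more expensive checks are skipped once a cheaper one is false.
  def allPass(checks: Seq[Check]): Boolean =
    checks.sortBy(_.cost).forall(_.test())

  def main(args: Array[String]): Unit = {
    var evaluated = Vector.empty[String]
    def check(name: String, cost: Int, result: Boolean): Check =
      Check(name, cost, () => { evaluated = evaluated :+ name; result })

    val checks = Seq(
      check("really_slow_test", 99, result = true),
      check("slower_test", 10, result = true),
      check("fast_test", 1, result = false)
    )

    println(allPass(checks)) // overall result of the conjunction
    println(evaluated)       // names of the checks that actually ran
  }
}
```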

2 Answers

Answers 1

As stefanobaghino said, the schema that comes out of the analyzer is cached after the analysis, and the optimizer shouldn't change it.

If you use Spark 2.2 you can take advantage of SPARK-18127 and apply the rule in the Analyzer.

If you run this dummy app

package panos.bletsos

import org.apache.spark.sql.catalyst.expressions.NamedExpression
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.SparkSessionExtensions


case class ReorderColumnsOnProjectOptimizationRule(spark: SparkSession) extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transformDown  {
    case p: Project => {
      val fields = p.projectList
      if (checkConditions(fields, p.child)) {
        val modifiedFieldsObject = optimizePlan(fields, p.child, plan)
        val projectUpdated = p.copy(modifiedFieldsObject, p.child)
        projectUpdated
      } else {
        p
      }
    }
  }

  private def checkConditions(fields: Seq[NamedExpression], child: LogicalPlan): Boolean = {
    // compare UDFs computation cost and return Boolean
    val needsOptimization = listHaveTwoUDFsEnabledForOptimization(fields)
    if (needsOptimization) println(fields.mkString(" | "))
    needsOptimization
  }

  private def listHaveTwoUDFsEnabledForOptimization(fields: Seq[NamedExpression]): Boolean = {
    // a simple priority order based on UDF name suffix
    val myPriorityList = fields.map((e) => {
      if (e.name.toString().startsWith("udf")) {
        Integer.parseInt(e.name.toString().split("_")(1))
      } else {
        0
      }
    }).filter(e => e > 0)

    // Do UDF with less cost before, so I need change the fields order
    myPriorityList.size == 2 && myPriorityList(0) > myPriorityList(1)
  }

  private def optimizePlan(fields: Seq[NamedExpression],
    child: LogicalPlan,
    plan: LogicalPlan): Seq[NamedExpression] = {
    // change order on field list. Return LogicalPlan modified
    val myListWithUDF = fields.filter((e) =>  e.name.toString().startsWith("udf"))
    if (myListWithUDF.size != 2) {
      throw new UnsupportedOperationException(
        s"The size of UDF list have ${myListWithUDF.size} elements.")
    }
    val myModifiedList: Seq[NamedExpression] = Seq(myListWithUDF(1), myListWithUDF(0))
    val myListWithoutUDF = fields.filter((e) =>  !e.name.toString().startsWith("udf"))
    val modifiedFielsObject = getFieldsReordered(myListWithoutUDF, myModifiedList)
    val msg = "•••• optimizePlan called : " + fields.size + " columns on Project.\n" +
      "•••• fields: " + fields.mkString(" | ") + "\n" +
      "•••• UDFs to reorder:\n" + myListWithUDF.mkString(" | ") + "\n" +
      "•••• field list Without UDF: " + myListWithoutUDF.mkString(" | ") + "\n" +
      "•••• modifiedFielsObject: " + modifiedFielsObject.mkString(" | ") + "\n"
    modifiedFielsObject
  }

  private def getFieldsReordered(fieldsWithoutUDFs: Seq[NamedExpression],
    fieldsWithUDFs: Seq[NamedExpression]): Seq[NamedExpression] = {
    fieldsWithoutUDFs.union(fieldsWithUDFs)
  }
}

case class R0(x: Int,
  p: Option[Int] = Some((new scala.util.Random).nextInt(999)),
  q: Option[Int] = Some((new scala.util.Random).nextInt(999))
)

object App {
  def main(args : Array[String]) {
    type ExtensionsBuilder = SparkSessionExtensions => Unit
    // inject the rule here
    val f: ExtensionsBuilder = { e =>
      e.injectResolutionRule(ReorderColumnsOnProjectOptimizationRule)
    }

    val spark = SparkSession
      .builder()
      .withExtensions(f)
      .getOrCreate()

    def createDsR0(spark: SparkSession): Dataset[R0] = {
      import spark.implicits._
      val ds = spark.range(3)
      val xdsR0 = ds.map((i) => {
        R0(i.intValue() + 1)
      })
      // IMPORTANT: The cache here is mandatory
      xdsR0.cache()
    }

    val dsR0 = createDsR0(spark)
    val udfA_99 = (p: Int) => Math.cos(p * p)  // higher cost Function
    val udfB_10 = (q: Int) => q + 1            // lower cost Function

    println("*** I' going to register my UDF ***")
    spark.udf.register("myUdfA", udfA_99)
    spark.udf.register("myUdfB", udfB_10)

    val dsR1 = {
      val ret1DS = dsR0.selectExpr("x", "p", "q", "myUdfA(p) as udfA_99")
      val result = ret1DS.cache()
      dsR0.show()
      result.show()

      result
    }

    val dsR2 = {
      val ret2DS = dsR1.selectExpr("x", "p", "q", "udfA_99", "myUdfB(p) as udfB_10")
      val result = ret2DS.cache()
      dsR0.show()
      dsR1.show()
      result.show()

      result
    }
  }
}

it will print

+---+---+---+-------+-------------------+
|  x|  p|  q|udfB_10|            udfA_99|
+---+---+---+-------+-------------------+
|  1|392|746|    393|-0.7508388993643841|
|  2|778|582|    779| 0.9310990915956336|
|  3|661| 34|    662| 0.6523545972748773|
+---+---+---+-------+-------------------+

Answers 2

I believe the answer to this question is the same as this one.

The summary is that the optimizer is not supposed to alter the schema of the output as it's cached after the analysis.

I'll quote the accepted answer here as it comes from Michael Armbrust, the lead developer of the Spark SQL project at Databricks:

As you guessed, this is failing to work because we make assumptions that the optimizer will not change the results of the query.

Specifically, we cache the schema that comes out of the analyzer (and assume the optimizer does not change it). When translating rows to the external format, we use this schema and thus are truncating the columns in the result. If you did more than truncate (i.e. changed datatypes) this might even crash.

As you can see in this notebook, it is in fact producing the result you would expect under the covers. We are planning to open up more hooks at some point in the near future that would let you modify the plan at other phases of query execution. See SPARK-18127 for more details.

Monday, February 5, 2018

Scala Sorting with Another Json's Key

I am new to Scala, so please help me out. I have two JSON files. I want to sort the first JSON by a key from the second JSON.

For example, the first JSON:

{
    "id": 1,
    "response" : [{
            "user_id" : 1,
            "products" : [
                { "product_id": 10, "price": 200 },
                { "product_id": 13, "price": 210 },
                { "product_id": 9, "price": 320 }
            ]
        },{
            "user_id" : 2,
            "products" : [
                { "product_id": 15, "price": 200 },
                { "product_id": 13, "price": 210 },
                { "product_id": 8, "price": 320 }
            ]
        }
    ]
}

And My Second Json

{
    "sort": [
        {
            "user_id": 1,
            "products": [
                { "id": 8, "rank": 5 },
                { "id": 9, "rank": 1 },
                { "id": 10, "rank": 3 },
                { "id": 13, "rank": 2 },
                { "id": 15, "rank": 6 },
                { "id": 17, "rank": 4 },
                { "id": 20, "rank": 7 },
                { "id": 21, "rank": 8 },
                { "id": 23, "rank": 9 }
            ]
        },{
            "user_id": 2,
            "products": [
                { "id": 8, "rank": 5 },
                { "id": 9, "rank": 1 },
                { "id": 10, "rank": 3 },
                { "id": 13, "rank": 2 },
                { "id": 15, "rank": 6 },
                { "id": 17, "rank": 4 },
                { "id": 20, "rank": 7 },
                { "id": 21, "rank": 8 },
                { "id": 23, "rank": 9 }
            ]
        }
    ]
}

I want to sort my first JSON with respect to the rank I have in the second JSON.

In the output, each user should have their products sorted by the rank specified for that user in the second JSON.

This is what I have tried so far:

def sortedRes() = Action.async {
    val url = //1st json url
    val sortUrl = //2nd json url
    ws.url(url).get().map { response =>
      val value: JsValue = Json.parse(response.body)
      val result: Either[Exception, SampleResponses] = value.validate[SampleResponses] match {
        case JsSuccess(searchResponse, _) =>
          Right(searchResponse)
        case JsError(errors) =>
          Left(new Exception("Couldn't parse Search API response"))
      }

      val values: List[SampleResponse] = result.right.get.responses

      ws.url(sortUrl).get().map { response =>
        val sorted: JsValue = Json.parse(response.body)

        val sortRespResult: Either[Exception, Sort] = sorted.validate[Sort] match {
          case JsSuccess(sortResponse, _) =>
            Right(sortResponse)
          case JsError(errors) =>
            Left(new Exception("Couldn't parse because of these errors : " + errors))
        }

        val prodRankDetails: List[SampleRank] = sortRespResult.right.get.sort

        println("prod = " + prodRankDetails.head.products.sortWith(_.rank > _.rank))
      }
      Ok(Json.toJson(result.right.get))
    }
  }

In the last print statement I got the second json's first users product sorted. What I am trying to get is my first json sorted based on the second user.

Here is my model class

package models

import play.api.libs.functional.syntax._
import play.api.libs.json.Reads._
import play.api.libs.json._ // Combinator syntax


object sample {
  case class SampleProduct(productId:Int, price: Int)
  case class SampleResponse(userId: Int, products: List[SampleProduct])
  case class SampleResponses(id: Int, responses: List[SampleResponse])
  case class SampleRankedProduct(id: Int, rank: Int)
  case class SampleRank(userId: Int, products: List[SampleRankedProduct])
  case class Sort(sort: List[SampleRank])

  implicit val productReads: Reads[SampleProduct] = (
    (JsPath \ "product_id").read[Int] and
      (JsPath \ "price").read[Int]
    )(SampleProduct.apply _)

  implicit val productWrites: Writes[SampleProduct] = (
    (JsPath \ "product_id").write[Int] and
      (JsPath \ "price ").write[Int]
    )(unlift(SampleProduct.unapply))

  implicit val responseReads: Reads[SampleResponse] = (
    (JsPath \ "user_id").read[Int] and
      (JsPath \ "products").read[List[SampleProduct]]
    )(SampleResponse.apply _)

  implicit val responseWrites: Writes[SampleResponse] = (
    (JsPath \ "user_id").write[Int] and
      (JsPath \ "products").write[List[SampleProduct]]
    )(unlift(SampleResponse.unapply))

  implicit val responsesReads: Reads[SampleResponses] = (
    (JsPath \ "id").read[Int] and
      (JsPath \ "response").read[List[SampleResponse]]
    )(SampleResponses.apply _)

  implicit val responsesWrites: Writes[SampleResponses] = (
    (JsPath \ "id").write[Int] and
      (JsPath \ "response").write[List[SampleResponse]]
    )(unlift(SampleResponses.unapply))

  implicit val rankedProductReads: Reads[SampleRankedProduct] = (
    (JsPath \ "id").read[Int] and
      (JsPath \ "rank").read[Int]
    )(SampleRankedProduct.apply _)

  implicit val rankedProductWrites: Writes[SampleRankedProduct] = (
    (JsPath \ "id").write[Int] and
      (JsPath \ "rank ").write[Int]
    )(unlift(SampleRankedProduct.unapply))

  implicit val rankReads: Reads[SampleRank] = (
    (JsPath \ "user_id").read[Int] and
      (JsPath \ "products").read[List[SampleRankedProduct]]
    )(SampleRank.apply _)

  implicit val rankWrites: Writes[SampleRank] = (
    (JsPath \ "user_id").write[Int] and
      (JsPath \ "products").write[List[SampleRankedProduct]]
    )(unlift(SampleRank.unapply))

  implicit val sortReads: Reads[Sort] = (JsPath \ "sort").read[List[SampleRank]].map(x ⇒ Sort(x))

  implicit val sortWrites: Writes[Sort] = (__ \ "sort").write[List[SampleRank]].contramap { (person: Sort) => person.sort }
}

1 Answers

Answers 1

An approach for sorting the first JSON using the ranking from the second one is something like this (here values is assumed to be the parsed SampleResponses object, i.e. result.right.get, rather than its list of responses):

....
val prodRankDetails: List[SampleRank] = sortRespResult.right.get.sort

val sortedResults = values.copy(
  responses = values.responses.map { resultForUser =>
    val rankingForUser = prodRankDetails
      .find(_.userId == resultForUser.userId)
      .getOrElse(throw new RuntimeException(s"Rank not found for user ${resultForUser.userId}"))
      .products
      .map(product => (product.id, product.rank))
      .toMap

    resultForUser.copy(
      products = resultForUser.products.sortWith((a, b) =>
        rankingForUser.getOrElse(a.productId, Int.MaxValue) < rankingForUser.getOrElse(b.productId, Int.MaxValue))
    )

  }
)
Ok(Json.toJson(sortedResults))

This should be performed right after you get prodRankDetails. Also, the second response is only available inside the inner .map, because you are working with a Future, so you also have to move the Ok(...) inside that map and flatMap the outer Future.
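As a self-contained illustration of both points (sorting by rank, and combining the two asynchronous calls with flatMap and map), here is a minimal sketch without Play. The case classes are simplified stand-ins for the models in the question, fetchResponses and fetchRanks are hypothetical replacements for the two ws.url(...).get() calls, and users without a rank table keep their original order here instead of throwing.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object SortByRankSketch {
  final case class Item(productId: Int, price: Int)
  final case class UserItems(userId: Int, products: List[Item])
  final case class RankedItem(id: Int, rank: Int)
  final case class UserRanks(userId: Int, products: List[RankedItem])

  // Sorts each user's products by that user's ranks; unranked products go last.
  def sortByRank(responses: List[UserItems], ranks: List[UserRanks]): List[UserItems] = {
    val ranksByUser: Map[Int, Map[Int, Int]] =
      ranks.map(r => r.userId -> r.products.map(p => p.id -> p.rank).toMap).toMap
    responses.map { user =>
      val rankOf = ranksByUser.getOrElse(user.userId, Map.empty[Int, Int])
      user.copy(products = user.products.sortBy(p => rankOf.getOrElse(p.productId, Int.MaxValue)))
    }
  }

  // Hypothetical stand-ins for the two HTTP calls, using data from the question.
  def fetchResponses(): Future[List[UserItems]] = Future.successful(List(
    UserItems(1, List(Item(10, 200), Item(13, 210), Item(9, 320))),
    UserItems(2, List(Item(15, 200), Item(13, 210), Item(8, 320)))))

  def fetchRanks(): Future[List[UserRanks]] = {
    val ranks = List(8 -> 5, 9 -> 1, 10 -> 3, 13 -> 2, 15 -> 6)
      .map { case (id, rank) => RankedItem(id, rank) }
    Future.successful(List(UserRanks(1, ranks), UserRanks(2, ranks)))
  }

  // flatMap on the outer Future plus map on the inner one gives a single Future.
  def sortedResponses(): Future[List[UserItems]] =
    fetchResponses().flatMap(responses => fetchRanks().map(ranks => sortByRank(responses, ranks)))

  def main(args: Array[String]): Unit =
    Await.result(sortedResponses(), 5.seconds).foreach(println)
}
```

In a Play action, the final map would wrap the sorted value in Ok(Json.toJson(...)) instead of returning it directly.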

Hope this helps.

Saturday, January 6, 2018

Scala - evaluate function calls sequentially until one return

I have a few 'legacy' endpoints that can return the Data I'm looking for.

def mainCall(id): Data {
     maybeMyDataInEndpoint1(id: UUID): DataA
     maybeMyDataInEndpoint2(id: UUID): DataB
     maybeMyDataInEndpoint3(id: UUID): DataC
}

  • null can be returned if no DataX is found
  • The return types of the methods are different. There is a convert method that converts each DataX to the unified Data.
  • The endpoints are not Scala-ish

What is the best Scala approach to evaluate those method calls sequentially until I have the value I need?

In pseudocode I would do something like:

val myData = maybeMyDataInEndpoint1 getOrElse maybeMyDataInEndpoint2 getOrElse maybeMyDataInEndpoint3 

5 Answers

Answers 1

I'd use an easier approach, though the other answers use more elaborate language features. Just use Option() to catch the null and chain with orElse. I'm assuming methods convertX(d: DataX): Data for explicit conversion. As the data might not be found at all, we return an Option.

def mainCall(id: UUID): Option[Data] = {
  Option(maybeMyDataInEndpoint1(id)).map(convertA)
    .orElse(Option(maybeMyDataInEndpoint2(id)).map(convertB))
    .orElse(Option(maybeMyDataInEndpoint3(id)).map(convertC))
}

Answers 2

Maybe you can lift these methods into a List of functions and use collectFirst, like:

val fs = List(maybeMyDataInEndpoint1 _, maybeMyDataInEndpoint2 _, maybeMyDataInEndpoint3 _)

val f = (a: UUID) => fs.collectFirst {
  case u if u(a) != null => u(a)
}
f(myUUID)

Answers 3

The best Scala approach IMHO is to do things in the most straightforward way.

  • To handle optional values (or nulls from Java land), use Option.
  • To sequentially evaluate a list of methods, fold over a Seq of functions.
  • To convert from one data type to another, use either (1.) implicit conversions or (2.) regular functions depending on the situation and your preference.

    1. (Edit) Assuming implicit conversions:

      def legacyEndpoint[A](endpoint: UUID => A)(implicit convert: A => Data) =
        (id: UUID) => Option(endpoint(id)).map(convert)

      val legacyEndpoints = Seq(
        legacyEndpoint(maybeMyDataInEndpoint1),
        legacyEndpoint(maybeMyDataInEndpoint2),
        legacyEndpoint(maybeMyDataInEndpoint3)
      )

      def mainCall(id: UUID): Option[Data] =
        legacyEndpoints.foldLeft(Option.empty[Data])(_ orElse _(id))

    2. (Edit) Using explicit conversions:

      def legacyEndpoint[A](endpoint: UUID => A)(convert: A => Data) =
        (id: UUID) => Option(endpoint(id)).map(convert)

      val legacyEndpoints = Seq(
        legacyEndpoint(maybeMyDataInEndpoint1)(fromDataA),
        legacyEndpoint(maybeMyDataInEndpoint2)(fromDataB),
        legacyEndpoint(maybeMyDataInEndpoint3)(fromDataC)
      )

      ... // same as before

Answers 4

Here is one way to do it.

(1) You can make your convert methods implicit (or wrap them into implicit wrappers) for convenience.

(2) Then use Stream to build chain from method calls. You should give type inference a hint that you want your stream to contain Data elements (not DataX as returned by legacy methods) so that appropriate implicit convert will be applied to each result of a legacy method call.

(3) Since Stream is lazy and evaluates its tail "by name", only the first method gets called so far. At this point you can apply a lazy filter to skip null results.

(4) Now you can actually evaluate the chain, getting the first non-null result with headOption.

(HACK) Unfortunately, Scala type inference (at the time of writing, v2.12.4) is not powerful enough to allow using the #:: stream methods unless you guide it every step of the way. Using cons makes inference happy but is cumbersome. Building the stream with the vararg apply method of the companion object is not an option either, since Scala does not support "by-name" varargs yet. In my example below I use a combination of the stream and toLazyData methods. stream is a generic helper that builds streams from 0-arg functions. toLazyData is an implicit "by-name" conversion designed to interplay with the implicit convert functions that convert from DataX to Data.

Here is the demo that demonstrates the idea with more detail:

object Demo {

  case class Data(value: String)
  class DataA
  class DataB
  class DataC

  def maybeMyDataInEndpoint1(id: String): DataA = {
    println("maybeMyDataInEndpoint1")
    null
  }

  def maybeMyDataInEndpoint2(id: String): DataB = {
    println("maybeMyDataInEndpoint2")
    new DataB
  }

  def maybeMyDataInEndpoint3(id: String): DataC = {
    println("maybeMyDataInEndpoint3")
    new DataC
  }

  implicit def convert(data: DataA): Data = if (data == null) null else Data(data.toString)
  implicit def convert(data: DataB): Data = if (data == null) null else Data(data.toString)
  implicit def convert(data: DataC): Data = if (data == null) null else Data(data.toString)

  implicit def toLazyData[T](value: => T)(implicit convert: T => Data): (() => Data) = () => convert(value)

  def stream[T](xs: (() => T)*): Stream[T] = {
    xs.toStream.map(_())
  }

  def main (args: Array[String]) {

    val chain = stream(
      maybeMyDataInEndpoint1("1"),
      maybeMyDataInEndpoint2("2"),
      maybeMyDataInEndpoint3("3")
    )

    val result = chain.filter(_ != null).headOption.getOrElse(Data("default"))

    println(result)

  }

}

This prints:

maybeMyDataInEndpoint1
maybeMyDataInEndpoint2
Data(Demo$DataB@16022d9d)

Here maybeMyDataInEndpoint1 returns null, so maybeMyDataInEndpoint2 needs to be invoked, and it delivers a DataB. maybeMyDataInEndpoint3 never gets invoked, since we already have the result.
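The same "stop at the first non-null result" behaviour can also be sketched without Stream or implicit conversions, using a lazily mapped Iterator. This is an alternative technique, not the one used in the demo above; firstDefined and the endpoint stubs are hypothetical names, and the candidates are passed explicitly as zero-argument functions.

```scala
object FirstNonNullSketch {
  // Calls the candidates one by one and returns the first non-null result.
  // Iterator.map is lazy and collectFirst stops at the first match, so
  // later candidates are never invoked once a result has been found.
  def firstDefined[A](candidates: Seq[() => A]): Option[A] =
    candidates.iterator.map(c => Option(c())).collectFirst { case Some(a) => a }

  // Hypothetical legacy endpoints; the first one finds nothing.
  def endpoint1(id: String): String = { println("endpoint1"); null }
  def endpoint2(id: String): String = { println("endpoint2"); "from endpoint2" }
  def endpoint3(id: String): String = { println("endpoint3"); "from endpoint3" }

  def main(args: Array[String]): Unit = {
    val result = firstDefined[String](Seq(
      () => endpoint1("1"),
      () => endpoint2("2"),
      () => endpoint3("3")
    ))
    println(result.getOrElse("default"))
  }
}
```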

Answers 5

I think @g.krastev's answer is perfectly good for your use case and you should accept that. I'm just expanding a bit on it to show how you can make the last step slightly better with cats.

First, the boilerplate:

import java.util.UUID

final case class DataA(i: Int)
final case class DataB(i: Int)
final case class DataC(i: Int)
type Data = Int

def convertA(a: DataA): Data = a.i
def convertB(b: DataB): Data = b.i
def convertC(c: DataC): Data = c.i

def maybeMyDataInEndpoint1(id: UUID): DataA = DataA(1)
def maybeMyDataInEndpoint2(id: UUID): DataB = DataB(2)
def maybeMyDataInEndpoint3(id: UUID): DataC = DataC(3)

This is basically what you have, in a way that you can copy/paste in the REPL and have compile.

Now, let's first declare a way to turn each of your endpoints into something safe and unified:

def makeSafe[A, B](evaluate: UUID ⇒ A, f: A ⇒ B): UUID ⇒ Option[B] =
  id ⇒ Option(evaluate(id)).map(f)

With this in place, you can, for example, call the following to turn maybeMyDataInEndpoint1 into a UUID => Option[Data]:

makeSafe(maybeMyDataInEndpoint1, convertA) 

The idea is now to turn your endpoints into a list of UUID => Option[Data] and fold over that list. Here's your list:

val endpoints = List(
  makeSafe(maybeMyDataInEndpoint1, convertA),
  makeSafe(maybeMyDataInEndpoint2, convertB),
  makeSafe(maybeMyDataInEndpoint3, convertC)
)

You can now fold on it manually, which is what @g.krastev did:

def mainCall(id: UUID): Option[Data] =
  endpoints.foldLeft(None: Option[Data])(_ orElse _(id))

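If you're fine with a cats dependency, the notion of folding over a list of options is just a concrete use case of a common pattern (the interaction of Foldable with Monoid and MonoidK). Note that the Monoid instance for Option combines the values it finds, so with Data = Int a foldMap would add the results together. To keep the first defined result, as orElse does, use the MonoidK instance through foldK:

import cats._
import cats.implicits._

def mainCall(id: UUID): Option[Data] = endpoints.map(_(id)).foldK

Unlike the manual fold, this version calls every endpoint before picking the first result.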

There are other ways to make this nicer still, but they might be overkill in this context - I'd probably declare a type class to turn any type into a Data, say, to give makeSafe a cleaner type signature.
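As a rough idea of what such a type class could look like, here is a minimal sketch in plain Scala without cats. ToData, its instances and the endpoint stubs are hypothetical names chosen for illustration; only two endpoints are shown, and the first one returns null to exercise the fallback.

```scala
import java.util.UUID

object ToDataSketch {
  type Data = Int
  final case class DataA(i: Int)
  final case class DataB(i: Int)

  // Hypothetical type class: evidence that an A can be converted to Data.
  trait ToData[A] { def toData(a: A): Data }

  object ToData {
    implicit val fromDataA: ToData[DataA] = new ToData[DataA] {
      def toData(a: DataA): Data = a.i
    }
    implicit val fromDataB: ToData[DataB] = new ToData[DataB] {
      def toData(b: DataB): Data = b.i
    }
  }

  // makeSafe no longer takes the conversion explicitly; the compiler
  // looks up the ToData instance for the endpoint's return type.
  def makeSafe[A](evaluate: UUID => A)(implicit ev: ToData[A]): UUID => Option[Data] =
    id => Option(evaluate(id)).map(ev.toData)

  // Stub endpoints standing in for the legacy calls.
  def endpoint1(id: UUID): DataA = null
  def endpoint2(id: UUID): DataB = DataB(2)

  val endpoints: List[UUID => Option[Data]] =
    List(makeSafe(endpoint1), makeSafe(endpoint2))

  // orElse takes its argument by name, so later endpoints are only
  // called while no earlier one has produced a value.
  def mainCall(id: UUID): Option[Data] =
    endpoints.foldLeft(Option.empty[Data])((acc, f) => acc.orElse(f(id)))

  def main(args: Array[String]): Unit =
    println(mainCall(UUID.randomUUID()))
}
```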

Wednesday, December 13, 2017

EMR Spark duplicating every action and job keeps running

I have created a Scala application that uses Apache Spark to retrieve data from S3, do some transformations on it and save it.

I am using Apache Spark 2.0.2 in cluster mode on a cluster of 50 r3.4xlarge nodes.

hive-env.export HADOOP_HEAPSIZE 8192
spark.executor.cores             5
spark.executor.instances         149
spark.driver.memory              106124M
spark.executor.memory            38000M
spark.default.parallelism        5000
spark.sql.shuffle.partitions     1000
spark.kryoserializer.buffer.max  1024m

spark.sql.hive.convertMetastoreParquet false
spark.hadoop.mapreduce.input.fileinputformat.split.maxsize 2560000000
spark.files.maxPartitionBytes 2560000000
spark.network.timeout            500s

The job has been running for more than 2 days now. I tried changing the executor size and memory, all to no avail. This is what I am seeing in the Spark UI:

Active :
Stage 0 persist at ItemBuilder.scala:197
Stage 1 persist at ItemBuilder.scala:197
Stage 0 and 1 persists shows : Tasks: Succeeded/Total = 115475/204108

Pending :
Stage 2 persist at ItemBuilder.scala:197
Stage 2 persists shows : Tasks: Succeeded/Total =  0/400

Stage 3 count at ItemBuilder.scala:202
Stage 3 count shows : Tasks: Succeeded/Total =  0/200

Stage 4 count at ItemBuilder.scala:202
Stage 4 count shows : Tasks: Succeeded/Total =  0/1

Can someone tell me why I am seeing persist 3 times and count 2 times?

Here is my code :

val textFiles = sqlSession.sparkContext.textFile( files.mkString( "," ) )

val jsonFiles = sqlSession.read.schema( schema ).json( textFiles )

log.info( "Job is in progress" )

val filteredItemDetails = jsonFiles.filter( col( ITEM_ID ).isNotNull ).filter( length( col( ITEM_ID ) ) > 0 )

val itemDetails = filteredItemDetails.withColumn( ITEMS, explode( filteredItemDetails( ITEMS ) ) )
  .filter( size( col(ITEM_EVENTS ) ) > 0 )
  .filter( col( ITEM_TIMESTAMP ).isNotNull )
  .select( ITEM_ID, EVENTS_ITEM_ENTRY, ITEM_TIMESTAMP )

val convertTimestamp = udf { (timestampString: String) => {
    DateUtils.getSqlTimeStamp(timestampString)
  }
}

val itemDetailsWithTimestamp = itemDetails.withColumn(TIME_STAMP_CONVERTED, convertTimestamp(col(TIME_STAMP)))

val recentTime = DateUtils.getSqlTimeStamp( endTime )

val groupedData = itemDetailsWithTimestamp.groupBy( ITEM_ID, ITEM_ENTRY_ID )
  .agg( datediff( lit( recentTime ), max( TIME_STAMP_CONVERTED ) ) as DAY_DIFFERENCE, count( ITEM_ENTRY_ID ) as FREQUENCY )

val toMap = udf { (itemType: String, count: Int) => Map( itemType -> count ) }

val tempResult = groupedData.withColumn( FREQUENT_DAYS, toMap( col( ITEM_ENTRY_ID ), col( DAY_DIFFERENCE ) ) )
  .withColumn( FREQUENCY_COUNT, toMap( col( ITEM_ENTRY_ID ), col( FREQUENCY ) ) )
  .drop( ITEM_ENTRY_ID )
  .drop( DAY_DIFFERENCE )
  .drop( FREQUENCY )

val result = tempResult.groupBy( ITEM_ID )
  .agg( CombineMaps( col( FREQUENT_DAYS ) ) as FREQUENT_DAYS,
    CombineMaps( col( FREQUENCY_COUNT ) ) as FREQUENCY_COUNT )
  .persist( DISK_ONLY )

log.info( "Aggregation is completed." )

val totalItems = result.count( )

log.info( "Total Items = " + totalItems )

And in the Resource manager I am seeing :

Memory Used = 5.52 TB
Memory Total = 5.52 TB
Memory Reserved = 113.13 GB
VCores Used = 51
VCores Total = 51
VCores Reserved = 1

And Application Queues shows :
Used (over capacity)
Used Capacity:  101.2%
Configured Capacity:    100.0%

Can someone tell me whether I have misconfigured anything here? My job is stuck at stage 0 itself.

I tried to test with a reduced data set. It works fine then, but when I use the original data I keep getting:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 204170 in stage 16.0 failed 4 times, most recent failure: Lost task 204170.4 in stage 16.0 (TID 1278745, ip-172-31-12-41.ec2.internal): ExecutorLostFailure (executor 520 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 626834 ms
Driver stacktrace:
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1454)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1442)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1441)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1441)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
  at scala.Option.foreach(Option.scala:257)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1667)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1622)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1611)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1873)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1886)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1899)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1913)
  at org.apache.spark.rdd.RDD.count(RDD.scala:1134)
  ... 242 elided

I am also seeing:

Dropping SparkListenerEvent because no remaining room in event queue. This likely means one of the SparkListeners is too slow and cannot keep up with the rate at which tasks are being started by the scheduler. 

1 Answers

Answers 1

I believe those are stages of the job, not the persist/count actually happening twice. A stage is a group of parallel tasks that can run at the same time without incurring a shuffle. I see two groupBy calls in your code, each of which requires a shuffle, hence the two stages. Does that help?


Thursday, November 23, 2017

Why does this Spark code make NullPointerException?


I have a problem executing a Spark application.

Source code:

// Read table From HDFS
val productInformation = spark.table("temp.temp_table1")
val dict = spark.table("temp.temp_table2")

// Custom UDF
val countPositiveSimilarity = udf[Long, Seq[String], Seq[String]]((a, b) =>
    dict.filter(
        (($"first".isin(a: _*) && $"second".isin(b: _*)) || ($"first".isin(b: _*) && $"second".isin(a: _*))) && $"similarity" > 0.7
    ).count
)

val result = productInformation.withColumn("positive_count", countPositiveSimilarity($"title", $"internal_category"))

// Error occurs!
result.show

Error message:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 54.0 failed 4 times, most recent failure: Lost task 0.3 in stage 54.0 (TID 5887, ip-10-211-220-33.ap-northeast-2.compute.internal, executor 150): org.apache.spark.SparkException: Failed to execute user defined function($anonfun$1: (array<string>, array<string>) => bigint)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
    at $anonfun$1.apply(<console>:45)
    at $anonfun$1.apply(<console>:43)
    ... 16 more

Driver stacktrace:
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
  at scala.Option.foreach(Option.scala:257)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1918)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1931)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:1944)
  at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:333)
  at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
  at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2371)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
  at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2765)
  at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2370)
  at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2377)
  at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2113)
  at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2112)
  at org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2795)
  at org.apache.spark.sql.Dataset.head(Dataset.scala:2112)
  at org.apache.spark.sql.Dataset.take(Dataset.scala:2327)
  at org.apache.spark.sql.Dataset.showString(Dataset.scala:248)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:636)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:595)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:604)
  ... 48 elided
Caused by: org.apache.spark.SparkException: Failed to execute user defined function($anonfun$1: (array<string>, array<string>) => bigint)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
  at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
  at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
  at org.apache.spark.scheduler.Task.run(Task.scala:99)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
  ... 3 more
Caused by: java.lang.NullPointerException
  at $anonfun$1.apply(<console>:45)
  at $anonfun$1.apply(<console>:43)
  ... 16 more

I have checked whether productInformation and dict contain null values in any column, but there are none.

Can anyone help me? I have attached example code to give more detail:

case class Target(wordListOne: Seq[String], WordListTwo: Seq[String])
val targetData = Seq(
  Target(Seq("Spark", "Wrong", "Something"), Seq("Java", "Grape", "Banana")),
  Target(Seq("Java", "Scala"), Seq("Scala", "Banana")),
  Target(Seq(""), Seq("Grape", "Banana")),
  Target(Seq(""), Seq(""))
)
val targets = spark.createDataset(targetData)

case class WordSimilarity(first: String, second: String, similarity: Double)
val similarityData = Seq(
  WordSimilarity("Spark", "Java", 0.8),
  WordSimilarity("Scala", "Spark", 0.9),
  WordSimilarity("Java", "Scala", 0.9),
  WordSimilarity("Apple", "Grape", 0.66),
  WordSimilarity("Scala", "Apple", -0.1),
  WordSimilarity("Gine", "Spark", 0.1)
)

val dict = spark.createDataset(similarityData)

val countPositiveSimilarity = udf[Long, Seq[String], Seq[String]]((a, b) =>
    dict.filter(
        (($"first".isin(a: _*) && $"second".isin(b: _*)) || ($"first".isin(b: _*) && $"second".isin(a: _*))) && $"similarity" > 0.7
    ).count
)

val countDF = targets.withColumn("positive_count", countPositiveSimilarity($"wordListOne", $"wordListTwo"))

This example code is similar to my original code, and it runs without error. What should I check in my original code and data?

2 Answers

Answers 1

Very interesting question. I had to do some searching, and here are my thoughts. I hope this helps a little.

When you create a Dataset via createDataset, Spark assigns it a LocalRelation logical query plan.

def createDataset[T : Encoder](data: Seq[T]): Dataset[T] = {
  val enc = encoderFor[T]
  val attributes = enc.schema.toAttributes
  val encoded = data.map(d => enc.toRow(d).copy())
  val plan = new LocalRelation(attributes, encoded)
  Dataset[T](self, plan)
}

According to this link, LocalRelation is a leaf logical plan that allows functions like collect or take to be executed locally, i.e. without using Spark executors.

This is confirmed by the isLocal method:

/**
 * Returns true if the `collect` and `take` methods can be run locally
 * (without any Spark executors).
 *
 * @group basic
 * @since 1.6.0
 */
def isLocal: Boolean = logicalPlan.isInstanceOf[LocalRelation]

You can check that both of your datasets are local.

Also, the show method actually calls take internally:
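As a quick check, isLocal can be called directly. This is only a sketch: it assumes a Spark shell or a project with spark-sql on the classpath, and the local session and the sample data are illustrative, not the asker's tables.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session purely for illustration; in spark-shell, `spark` already exists.
val spark = SparkSession.builder().master("local[*]").appName("is-local-check").getOrCreate()
import spark.implicits._

// Built from an in-memory Seq, so its logical plan is a LocalRelation.
val localDs = spark.createDataset(Seq("Spark", "Scala", "Java"))

// spark.range produces a Range plan rather than a LocalRelation.
val rangeDs = spark.range(3)

println(localDs.isLocal)
println(rangeDs.isLocal)
```

A Dataset read with spark.table, as in the original code, is likewise not backed by a LocalRelation.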

private[sql] def showString(_numRows: Int, truncate: Int = 20): String = {
    val numRows = _numRows.max(0)
    val takeResult = toDF().take(numRows + 1)
    val hasMoreData = takeResult.length > numRows
    val data = takeResult.take(numRows)

So, given this evidence, I think that when countDF.show is executed, it behaves similarly to calling count on the dict dataset from the driver, once for each record in targets. And, of course, the dict dataset does not need to be local for show on countDF to work.

You can try to save countDF; it will throw the same exception as in the first case: org.apache.spark.SparkException: Failed to execute user defined function($anonfun$1: (array<string>, array<string>) => bigint)

Answers 2

You cannot use a DataFrame inside a UDF. A UDF runs on the executors, while DataFrame operations can only be issued from the driver. You will need to join productInformation and dict, and apply the UDF logic after the join.
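A minimal sketch of the join-based approach, assuming a Spark shell or a project with spark-sql on the classpath. The two small DataFrames are hypothetical stand-ins for productInformation and dict (with an explicit id column added so that rows can be regrouped after the join), and the condition mirrors the predicate from the question's UDF:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{count, expr}

// Assumption: a local session for illustration; in spark-shell, `spark` already exists.
val spark = SparkSession.builder().master("local[*]").appName("join-instead-of-udf").getOrCreate()
import spark.implicits._

// Hypothetical stand-in for productInformation, with an explicit row id.
val targets = Seq(
  (0, Seq("Spark", "Wrong", "Something"), Seq("Java", "Grape", "Banana")),
  (1, Seq("Java", "Scala"), Seq("Scala", "Banana")),
  (2, Seq(""), Seq(""))
).toDF("id", "wordListOne", "wordListTwo")

// Hypothetical stand-in for dict.
val dict = Seq(
  ("Spark", "Java", 0.8),
  ("Scala", "Spark", 0.9),
  ("Java", "Scala", 0.9),
  ("Apple", "Grape", 0.66)
).toDF("first", "second", "similarity")

// Same predicate as the UDF, expressed as a join condition between the two DataFrames.
val pairMatches = expr(
  "(array_contains(wordListOne, `first`) AND array_contains(wordListTwo, `second`)) OR " +
  "(array_contains(wordListTwo, `first`) AND array_contains(wordListOne, `second`))")

// The left outer join keeps rows without any match; count(first) ignores the resulting nulls.
val countDF = targets
  .join(dict.filter($"similarity" > 0.7), pairMatches, "left_outer")
  .groupBy($"id", $"wordListOne", $"wordListTwo")
  .agg(count($"first").as("positive_count"))

countDF.orderBy("id").show(false)
```

Because the condition is not an equality, Spark has to compare every row of one side with every row of the other, so this is practical mainly when the filtered dict is small.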


Sunday, November 5, 2017

Neo4j: Which internal module responsible for verifying WHERE conditions from cypher?


My goal is to add additional filtering to the Neo4j engine that checks a property on every node and relationship. This would make it possible to split the graph into subsets and run arbitrary queries against different "layers" on demand.

According to this answer:

Cypher is build on the Traversal API

However, I tried setting breakpoints in PathExpanders.scala and StandardExpander.scala, and it seems they aren't triggered while executing a Cypher MATCH query. Modifying PathEvaluator in Evaluators.java also didn't affect the results of Cypher queries.

I also inspected ast.rewriters, which are triggered during Cypher parsing, but it seems that I need to embed global filtering at a later step, when the engine selects data from the store.

Where does the verification of node/relationship properties happen for Cypher queries?

0 Answers


Sunday, October 29, 2017

Spark UDAFs: How to minimize/eliminate datatype conversion, copying and boxing overheads


My Spark App is written in Scala. I am writing several UDAFs that perform computations on "vectorized" data, where each value is an array of constant size (say 16) of doubles, instead of just one scalar value. Calculations are done on a per-element basis. My goal is to have the UDAF execute as efficiently as possible. To this end I would like the doubles to appear contiguously in memory, and to see that Spark's code gen + JVM's JIT compiler will do the computation via SIMD instructions.

However, it would seem that writing a UDAF in the straightforward documented manner and using standard DataFrame functionality leads Spark to generate awfully inefficient layouts for its Aggregator and Row objects. The data in the arrays reaches my code with type WrappedArray[Double]. It's a proxy to an Object[] array, containing 16 boxed Doubles. Not only does that take around 6-8X the amount of memory of the actual raw data, it also makes it impossible to use SIMD instructions since the doubles themselves are not in contiguous memory locations.

So, for example, a simple UDAF that does a "vectorized sum" has an update function that looks like this:

override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
  val acc = buffer.getAs[WrappedArray[Double]](0)
  if (!input.isNullAt(0)) {
    val value = input.getAs[WrappedArray[Double]](0)
    for (i <- 0 until N) {
      acc(i) += value(i)
    }
    buffer(0) = acc
  }
}

This is an operation that, in a sanely-written native program, would look something like this:

void update(double* acc, const double* input) {
  for (size_t i = 0; i != N; ++i) {
    acc[i] += input[i];
  }
}

What's happening in my UDAF? As far as I can tell, the mere fact that I need the buffer(0) = acc line at the end (otherwise the accumulator doesn't update) means that array contents are being copied. So it first creates two new object arrays of size N (acc and value), copies the original boxed Doubles into them, then the += creates a new boxed Double with the result for each element and puts it back in acc, and then the array acc is copied back into the array in buffer(0).

That is just plain awful. I haven't run any profiling yet, but I fully expect the 2nd code fragment to run about 20-50 times faster than this.

There has to be a better way to do things. I read a while ago about "Project Tungsten" and apparently Spark can operate with unmanaged memory buffers - I'm not a Spark developer though so I don't even know if I can use these capabilities for my UDAFs, if so how? Is there some other way to at least eliminate the boxing and the useless array copying?
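For reference, the unboxed inner loop that the question is aiming for can be written in plain Scala over Array[Double], which the JVM stores as a contiguous double[]. This is only a sketch of the target loop under the assumption that both arrays have the same length; the name addInPlace is hypothetical, and the snippet does not show how to make Spark hand such arrays to a UDAF.

```scala
// Element-wise in-place sum over primitive arrays.
// Array[Double] is a JVM double[], so no boxed Double objects are created here.
def addInPlace(acc: Array[Double], input: Array[Double]): Unit = {
  require(acc.length == input.length, "arrays must have the same length")
  var i = 0
  while (i < acc.length) { // a plain while loop involves no closure or Range object
    acc(i) += input(i)
    i += 1
  }
}

val acc = Array(1.0, 2.0, 5.0, 10.0)
addInPlace(acc, Array(0.0, -1.0, 5.0, -12.0))
println(acc.mkString(", "))
```

Whether the JIT compiler turns such a loop into SIMD instructions depends on the JVM and the hardware, so that part would still need to be measured.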

Edit: example inputs and outputs are as follows -

case class TestInRow(id: Int, value: Array[Double])

// A UDAF that sums up arrays of 4 doubles
val sumVec = MyUDAFs.SumVec(4)

val inputDF = Seq(
  TestInRow(0, Array[Double](1, 2, 5, 10)),
  TestInRow(1, Array[Double](0, -1, 5, -12)),
  TestInRow(2, Array[Double](0, 0, 5, 10)),
  TestInRow(3, Array[Double](0, -1, 5, -8))
).toDF

val resultDF = inputDF.groupBy().agg(sumVec(col("value")) as "svec")

val output = resultDF.head.getAs[WrappedArray[Double]](0)
assert(output == Seq(1.0, 0.0, 20.0, 0.0))

NOTE: In the real test I'm actually making an expectedDF with that one row and the expected values in it, then call a function that compares DFs by joining on a key tuple (an empty tuple in this case) and checks all doubles in the "value" columns match to within an error tolerance.

NOTE: This is taken from a unit test. In my real usecase, the arrays are initially formed from an array(expr*) construct in a select(...) call on another DF that contains scalars. Something like this:

df.select(
  k1,
  k2,
  k3,
  array(
    sum(when(expr(predicates(0)(k4, k5)), qty).otherwise(lit(0.0))),
    sum(when(expr(predicates(1)(k4, k5)), qty).otherwise(lit(0.0))),
    sum(when(expr(predicates(2)(k4, k5)), qty).otherwise(lit(0.0))),
    ...
    sum(when(expr(predicates(N-1)(k4, k5)), qty).otherwise(lit(0.0)))
  ) as "packed_qty"
)

and then I want to aggregate with vectorizedSum(packed_qty), for example in an sql query that has GROUP BY ... GROUPING SETS (...) over several combinations of (k1, k2, k3).

There are several other UDAFs, and they all work on the same principle: the inputs are arrays of N doubles, and each UDAF performs one or more floating-point operations between them on a per-element basis and aggregates the result(s) into array(s) of size N.

0 Answers


Wednesday, September 6, 2017

Scala : Registry design pattern or similar?


I am migrating my system from Java to Scala. In my Java code I used the registry pattern to look up an implementation from a string. Is there anything similar I could do in Scala? I am new to Scala; can someone point me to proper references?

My Java code:

public class ItemRegistry {

    private final Map<String, ItemFactory> factoryRegistry;

    public ItemRegistry() {
        this.factoryRegistry = new HashMap<>();
    }

    public ItemRegistry(List<ItemFactory> factories) {
        factoryRegistry = new HashMap<>();
        for (ItemFactory factory : factories) {
            registerFactory(factory);
        }
    }

    public void registerFactory(ItemFactory factory) {
        Set<String> aliases = factory.getRegisteredItems();
        for (String alias : aliases) {
            factoryRegistry.put(alias, factory);
        }
    }

    public Item newInstance(String itemName) throws ItemException {
        ItemFactory factory = factoryRegistry.get(itemName);
        if (factory == null) {
            throw new ItemException("Unable to find factory containing alias " + itemName);
        }
        return factory.getItem(itemName);
    }

    public Set<String> getRegisteredAliases() {
        return factoryRegistry.keySet();
    }
}

My Item interface:

public interface Item {
    void apply(Order Order) throws ItemException;

    String getItemName();
}

I map the strings like this:

public interface ItemFactory {

    Item getItem(String itemName) throws ItemException;

    Set<String> getRegisteredItems();
}

public abstract class AbstractItemFactory implements ItemFactory {

    protected final Map<String, Supplier<Item>> factory = Maps.newHashMap();

    @Override
    public Item getItem(String alias) throws ItemException {
        try {
            final Supplier<Item> supplier = factory.get(alias);
            return supplier.get();
        } catch (Exception e) {
            throw new ItemException("Unable to create instance of " + alias, e);
        }
    }

    protected Supplier<Item> defaultSupplier(Class<? extends Item> itemClass) {
        return () -> {
            try {
                return itemClass.newInstance();
            } catch (InstantiationException | IllegalAccessException e) {
                throw new RuntimeException("Unable to create instance of " + itemClass, e);
            }
        };
    }

    @Override
    public Set<String> getRegisteredItems() {
        return factory.keySet();
    }
}

public class GenericItemFactory extends AbstractItemFactory {

    public GenericItemFactory() {
        factory.put("reducedPriceItem",  () -> new Discount(reducedPriceItem));
        factory.put("salePriceItem",  () -> new Sale(reducedPriceItem));
    }
}

where Sale and Discount are implementations of Item. I use the newInstance method in ItemRegistry to get the class based on the name. Can someone suggest something similar that would let me do the same in Scala?

4 Answers

Answers 1

The other answers give the following options:

  • Directly translate your existing Java code to Scala.
  • Implement another version of your existing code in Scala.
  • Use Spring for dependency injection.

This answer offers an approach that is different from the "registry pattern" and that uses the compiler instead of a string, or Spring, to resolve implementations. In Scala, we can use the language constructs to inject dependencies with the cake pattern. Below is an example using simplified versions of your classes:

case class Order(id: Int)

trait Item {
  // renamed to applyOrder to disambiguate it from apply(), which has special use in Scala
  def applyOrder(order: Order): Unit

  def name: String
}

trait Sale extends Item {
  override def applyOrder(order: Order): Unit = println(s"sale on order[${order.id}]")
  override def name: String = "sale"
}

trait Discount extends Item {
  override def applyOrder(order: Order): Unit = println(s"discount on order[${order.id}]")
  override def name: String = "discount"
}

Let's define a class Shopping that depends on an Item. We can express this dependency as a self type:

class Shopping { this: Item =>
  def shop(order: Order): Unit = {
    println(s"shopping with $name")
    applyOrder(order)
  }
}

Shopping has a single method, shop, that calls both the applyOrder and name methods on its Item. Let's create two instances of Shopping: one that has a Sale item and one that has a Discount item...

val sale = new Shopping with Sale
val discount = new Shopping with Discount

...and invoke their respective shop methods:

val order1 = new Order(123)
sale.shop(order1)
// prints:
//   shopping with sale
//   sale on order[123]

val order2 = new Order(456)
discount.shop(order2)
// prints:
//   shopping with discount
//   discount on order[456]

The compiler requires us to mix in an Item implementation when creating a Shopping instance. We have compile-time enforcement of the dependencies, and we don't need third-party libraries, with this pattern.

Answers 2

You can pretty much just translate your Java classes to Scala and use the exact same pattern as you're doing in Java.

Since Scala runs on the JVM you can also use it with Spring. It may not be the "standard" way of writing services in Scala but it's definitely a viable choice.

Answers 3

This blog is a pretty good resource and answers your question in a good, idiomatic way.

Answers 4

As others have already suggested, you can translate your code directly into Scala without changing the design pattern, if that's what you want.

Here's how that might look:

import scala.collection.Set
import scala.collection.mutable
import scala.collection.immutable

trait Item

trait ItemFactory {
  def registeredItems: Set[String]
  def getItem(alias: String): Item
}

class ItemRegistry(factories: List[ItemFactory]) {

  final private val factoryRegistry = mutable.Map[String, ItemFactory]()

  factories.foreach(this.registerFactory)

  def registerFactory(factory: ItemFactory): Unit = {
    factory.registeredItems.foreach(alias =>
      factoryRegistry.put(alias, factory))
  }

  def newInstance(itemName: String): Item = {
    val factory = this.factoryRegistry.get(itemName)
        .getOrElse(throw new Exception("Unable to find factory containing alias " + itemName))
    factory.getItem(itemName)
  }

  def getRegisteredAliases: Set[String] = this.factoryRegistry.keySet
}

I would suggest that this is a clunky pattern in both Java and Scala though. It may be useful from time to time. Could you give an example of what you want to achieve with this? When do you need to use a different factory based on a runtime value?
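If a runtime lookup by name is all that is needed, one lighter-weight sketch is an immutable Map from alias to a constructor function, returning an Option instead of throwing. The names below (SimpleRegistry and the simplified Item, Sale and Discount) are hypothetical stand-ins, not the asker's classes:

```scala
// Simplified, hypothetical item types for illustration only.
trait Item { def name: String }
final class Sale extends Item { val name = "sale" }
final class Discount extends Item { val name = "discount" }

// Immutable registry: alias -> function that builds a fresh Item.
final class SimpleRegistry(suppliers: Map[String, () => Item]) {
  // Returns None for an unknown alias instead of throwing.
  def newInstance(alias: String): Option[Item] = suppliers.get(alias).map(make => make())

  def aliases: Set[String] = suppliers.keySet

  // Registering returns a new registry; the original is left unchanged.
  def register(alias: String, supplier: () => Item): SimpleRegistry =
    new SimpleRegistry(suppliers + (alias -> supplier))
}

val suppliers: Map[String, () => Item] = Map(
  "salePriceItem" -> (() => new Sale),
  "reducedPriceItem" -> (() => new Discount)
)
val registry = new SimpleRegistry(suppliers)

println(registry.newInstance("salePriceItem").map(_.name))
println(registry.newInstance("unknownItem").map(_.name))
```

Returning an Option pushes the "alias not found" case to the caller, which replaces the checked ItemException of the Java version with a value the compiler makes the caller handle.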
