Sunday, 20 September 2015

Functional Data Transformation with Spark

Do you need to build Spark DataFrame transformations that are easy to test and maintain? I hope this article gives you some useful ideas on how to approach the problem.

That should be enough for an introduction - let's get to the content!

An abstract understanding of the problem

Let's try to understand the problem conceptually before we dive into the details. In the picture below you can see an example transformation flow for working with DataFrames. In Spark, DataFrames are immutable, which saves us from many strange problems. Moreover, we can design our flow out of small, properly granulated methods, which simplifies testing a lot.

But what if what we just saw is only a piece of a bigger system? Technically we have created a procedure which produces a result from 5 input parameters. There is a great risk that with this design we will run into the same problems as with standard procedural programming:

  • A set of tightly coupled methods which are difficult to use outside the given procedure.
  • Catch-all containers such as "Utils classes", holding operations with common logic which may be reused "here and there" but most likely cannot be easily composed to produce new behavior.

As far as I know, a design based on Functional Programming gives us many more opportunities to compose two pieces of a program into new logic in an elegant way. It is also easier to test each piece of such a design in isolation than with the procedural approach. So let's move into the FP world and see what is waiting for us there...

What we want to achieve - a view from top

In this mysterious drawing we can see something which (let's hope) can be called a "Functional Data Processing Pipeline". Those wonderful rectangles symbolize various compositions of single functions, currying and whatever else your brain is able to see there.

Until now we have stayed at a very, very abstract level of description. To gain a better understanding, let's dive into the code and see how to build it from the bottom up.

What we want to achieve - a view from bottom (and finally some code)

The following examples may seem extraordinarily simple, but their purpose is purely educational.

In the first piece of code below we can see a transformation chain consisting of three functions. The first one, generalTimestampFunction, may represent very general logic common to many independent transformations in our system. businessOperation is a very unsophisticated select based on some business criteria. And finally we have some kind of report at the end.

val generalTimestampFunction: (DataFrame=>DataFrame) = {df =>
    import org.apache.spark.sql.functions._
    df.withColumn("created",current_date())
}

val businessOperation: DataFrame => DataFrame = { df =>
    df.select("column_with_business_meaning1","other_column1","other_column2","created")
}

val businessReport: DataFrame => DataFrame = { df =>
    import org.apache.spark.sql.functions._
    df.groupBy("created").agg(count("column_with_business_meaning1"))
}

val chain1 = generalTimestampFunction andThen businessOperation andThen businessReport

Just to stay focused on our purpose in this exercise: we want to show how moving from procedural to functional design simplifies composition and testing. Testing itself won't be covered in this article (I need material for further posts), but you can use your experience and imagination to see how easy these functions are to test. OK, next example.
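
To make the composition idea tangible without a Spark cluster, here is a minimal sketch in plain Scala. It assumes a stand-in type Frame (a list of rows, each row a column-name to value map) in place of DataFrame; the names timestamp, select, report and CompositionSketch are hypothetical and exist only for this illustration.

```scala
object CompositionSketch {
  // Stand-in for DataFrame (an assumption of this sketch): a list of rows,
  // each row being a map from column name to value.
  type Frame = List[Map[String, Any]]

  // Adds a "created" column; the date is injected so the result is deterministic.
  def timestamp(date: String): Frame => Frame =
    frame => frame.map(row => row + ("created" -> date))

  // Keeps only the listed columns, similar in spirit to a select.
  def select(columns: String*): Frame => Frame =
    frame => frame.map(row => row.filter { case (name, _) => columns.contains(name) })

  // Counts rows per "created" value, similar in spirit to groupBy + count.
  val report: Frame => Map[Any, Int] =
    frame => frame.groupBy(row => row("created")).map { case (key, rows) => key -> rows.size }

  // Each piece is an ordinary function, so andThen glues them together.
  val chain: Frame => Map[Any, Int] =
    timestamp("2015-09-20") andThen select("id", "created") andThen report

  val input: Frame = List(Map("id" -> 1, "noise" -> "x"), Map("id" -> 2, "noise" -> "y"))
}
```

Because every step is a standalone function, each one can be checked on a tiny input on its own, and the whole chain can be checked the same way.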

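val preconfiguredLogic : Long => DataFrame => DataFrame = businessKey => { inputFrame =>
    // filter first, while "businessRelation" is still available,
    // then select each column by its own name
    inputFrame
      .where(inputFrame("businessRelation") === businessKey)
      .select("column1","column2","created")
}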

val CARS_INDUSTRY=1L
val MOVIE_INDUSTRY=2L // assumed value; it only needs to differ from CARS_INDUSTRY
val chain2=generalTimestampFunction andThen preconfiguredLogic(CARS_INDUSTRY)
val chain3=generalTimestampFunction andThen preconfiguredLogic(MOVIE_INDUSTRY)

Here we are using currying to inject some configuration into our function which is then composed with the rest of the chain. It fulfills my subjective feeling of good design - I hope your perception is at least a little bit similar :)
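
The currying step can also be tried in plain Scala. This is only a sketch under assumptions: Frame is a stand-in for DataFrame, and byRelation, carsOnly and moviesOnly are hypothetical names that mirror the shape of preconfiguredLogic (configuration first, data second).

```scala
object CurryingSketch {
  // Stand-in for DataFrame (assumption): rows as column-name -> value maps.
  type Frame = List[Map[String, Any]]

  // Configuration comes first, data second; applying only the first argument
  // yields an ordinary Frame => Frame that fits into any andThen chain.
  val byRelation: Long => Frame => Frame =
    businessKey => frame =>
      frame.filter(row => row.get("businessRelation").contains(businessKey))

  val carsOnly: Frame => Frame = byRelation(1L)
  val moviesOnly: Frame => Frame = byRelation(2L)

  val input: Frame = List(
    Map("name" -> "sedan", "businessRelation" -> 1L),
    Map("name" -> "thriller", "businessRelation" -> 2L))
}
```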

Until now we have been operating on functions with one DataFrame as an argument. There are situations where two or more DataFrames interact with each other to produce some meaningful result:

val businessJoin: (DataFrame,DataFrame) => DataFrame ={ (frame1,frame2) =>
    frame1.join(frame2,Seq("column_with_business_meaning1","column_with_business_meaning2"))
}

val businessUnion: (DataFrame,DataFrame) => DataFrame ={ (frame1,frame2) =>
    //some assertions that both frames are prepared for union
    frame1 unionAll frame2
}

val businessOperation2: DataFrame => DataFrame = { df =>
      import org.apache.spark.sql.functions._
      df.groupBy("column_with_business_meaning1").agg(max("expected_column1"),count("expected_column2"))
}

In this case we are going to use Function2, and maybe later Function3 or Function4, where using andThen is not that easy. So now is a very good moment to come back to our diagram with an abstract function chain transforming the initial Data through several phases: Data => Data
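
For the two-argument case specifically, the standard library offers a partial workaround: Function2 has no andThen, but it does have tupled and curried. The sketch below uses a stand-in Frame type and hypothetical functions (union, countRows); it is not a replacement for the dictionary approach that follows, just an illustration of why arity gets in the way.

```scala
object TupledSketch {
  // Stand-in for DataFrame (assumption): rows as column-name -> value maps.
  type Frame = List[Map[String, Any]]

  val union: (Frame, Frame) => Frame = (left, right) => left ++ right
  val countRows: Frame => Int = frame => frame.size

  // tupled turns the Function2 into a Function1 over a pair, which has andThen.
  val unionThenCount: ((Frame, Frame)) => Int = union.tupled andThen countRows

  // curried turns it into Frame => Frame => Frame, so the first frame can be fixed up front.
  val appendTo: Frame => Frame => Frame = union.curried

  val first: Frame = List(Map("id" -> 1))
  val second: Frame = List(Map("id" -> 2), Map("id" -> 3))
}
```

This works for one step, but every additional arity needs its own adapter, which is one motivation for passing a single dictionary between phases instead.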

Data => Data and lifting to transformation

Let's return to this picture once again :

In our case InitialData may be a set of many DataFrames plus some additional primitives like Long, String or whatever is taken from the configuration. The requirements are quite simple, and a standard Map should be enough to fulfill them:

type Datadictionary=Map[String,Any]

So our rectangles now take the form of a function which turns one Datadictionary into another:

type TransformationPhase=Datadictionary => Datadictionary

And finally let's try to create a connection between the construct just mentioned and pure functions which operate on "transformation primitives" (DataFrame, Long etc.):

def liftToTransformation(f:DataFrame=>DataFrame)(key:String): TransformationPhase = { dictionary =>
    val frame =dictionary(key).asInstanceOf[DataFrame]
    val result=f(frame)
    //how to create a new dictionary???
    ???
}

The first problem is that we don't know how to add the produced result to the dictionary. We have two quick solutions, and both of them produce a result of the following type:

type PhaseResult=(String,DataFrame)

The first one is to make primitive functions aware of the dictionary mapping:

val primitiveFunctionAware: DataFrame => PhaseResult={df =>
    val result=df.select("important_column")
    ("KEY",result)
}

But if for some reason we want more flexibility, we can just use currying to inject the key name:

val primitiveFunctionCurried: String => DataFrame => PhaseResult=key => {df =>
    val result=df.select("important_column")
    (key,result)
}

And now we can create a new dictionary by adding the primitive result to the old one:

def liftToTransformation(f:DataFrame=>PhaseResult)(key:String): TransformationPhase = { dictionary =>
    val frame =dictionary(key).asInstanceOf[DataFrame]
    dictionary + f(frame)
}
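
To see what the lifted function does to the dictionary, here is a minimal runnable sketch. It repeats the definitions above with a stand-in Frame type instead of DataFrame (an assumption made so the snippet runs without Spark); keepIds and the keys "INPUT" and "IDS" are hypothetical.

```scala
object LiftSketch {
  // Stand-in for DataFrame (assumption): rows as column-name -> value maps.
  type Frame = List[Map[String, Any]]
  type Datadictionary = Map[String, Any]
  type TransformationPhase = Datadictionary => Datadictionary
  type PhaseResult = (String, Frame)

  // Same shape as liftToTransformation in the article, only on the stand-in type.
  def liftToTransformation(f: Frame => PhaseResult)(key: String): TransformationPhase =
    dictionary => dictionary + f(dictionary(key).asInstanceOf[Frame])

  // Curried primitive: the result key is injected, the frame comes later.
  val keepIds: String => Frame => PhaseResult =
    resultKey => frame =>
      (resultKey, frame.map(row => row.filter { case (name, _) => name == "id" }))

  val phase: TransformationPhase = liftToTransformation(keepIds("IDS"))("INPUT")

  val input: Frame = List(Map("id" -> 1, "noise" -> "x"))
  val before: Datadictionary = Map("INPUT" -> input)
  val after: Datadictionary = phase(before)
}
```

The old entries stay in place and the new result is added under the injected key, so later phases can read either of them.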

We are almost there :) The last thing in this phase: we want to use dictionary elements of any type, not only DataFrames. To solve this, let's create a mechanism responsible for extracting elements of particular types. In its basic form it can be something like this:

trait Extractor[A]{
    def extract(dictionary:Datadictionary)(key:String):A
}

implicit object DataFrameExtractor extends Extractor[DataFrame] {
    override def extract(dictionary:Datadictionary)(key: String): DataFrame =
      dictionary(key).asInstanceOf[DataFrame]
}

implicit object LongExtractor extends Extractor[Long] {
    override def extract(dictionary:Datadictionary)(key: String): Long =
      dictionary(key).asInstanceOf[Long]
}

And now we can easily prepare a set of lifting methods which lift our primitive functions into the transformation signature Datadictionary => Datadictionary:

def liftToTransformation[A:Extractor](f:A=>PhaseResult)(key1:String): TransformationPhase = { dictionary =>
    val param1 =implicitly[Extractor[A]].extract(dictionary)(key1)
    dictionary + f(param1)
}

def liftToTransformation[A:Extractor,B:Extractor](f:(A,B)=>PhaseResult)(key1:String,key2:String): TransformationPhase = { dictionary =>
    val param1 =implicitly[Extractor[A]].extract(dictionary)(key1)
    val param2 =implicitly[Extractor[B]].extract(dictionary)(key2)
    dictionary + f(param1,param2)
}

def liftToTransformation[A:Extractor,B:Extractor,C:Extractor](f:(A,B,C)=>PhaseResult)(key1:String,key2:String,key3:String): TransformationPhase = { dictionary =>
    val param1 =implicitly[Extractor[A]].extract(dictionary)(key1)
    val param2 =implicitly[Extractor[B]].extract(dictionary)(key2)
    val param3 =implicitly[Extractor[C]].extract(dictionary)(key3)
    dictionary + f(param1,param2,param3)
}

The question is: can this be implemented as one method which handles functions of various arity like Function1, Function2 or Function3? Currently I don't have an answer to this question, but I saw something similar in Scalaz and in Spark itself with UDFs, so maybe there is no other way?

One last piece of code with example usage and we are done with this section of the article:

val primitiveFunctionCurried: String => DataFrame => PhaseResult=key => {df =>
    val result=df.select("important_column")
    (key,result)
}

val functionLiftedToTransformation: TransformationPhase=
liftToTransformation(primitiveFunctionCurried("RESULT_KEY"))("INPUT_FRAME")
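
The type-class-based lifting can be exercised end to end with mixed argument types. The sketch below is an illustration under assumptions: Frame stands in for DataFrame, and filterByKey together with the keys "INPUT", "CONFIG" and "CARS" are hypothetical.

```scala
object ExtractorSketch {
  // Stand-in for DataFrame (assumption): rows as column-name -> value maps.
  type Frame = List[Map[String, Any]]
  type Datadictionary = Map[String, Any]
  type TransformationPhase = Datadictionary => Datadictionary
  type PhaseResult = (String, Frame)

  trait Extractor[A] {
    def extract(dictionary: Datadictionary)(key: String): A
  }

  implicit object FrameExtractor extends Extractor[Frame] {
    def extract(dictionary: Datadictionary)(key: String): Frame =
      dictionary(key).asInstanceOf[Frame]
  }

  implicit object LongExtractor extends Extractor[Long] {
    def extract(dictionary: Datadictionary)(key: String): Long =
      dictionary(key).asInstanceOf[Long]
  }

  // Two-argument lift: the compiler picks an Extractor for each argument type.
  def liftToTransformation[A: Extractor, B: Extractor](f: (A, B) => PhaseResult)(
      key1: String, key2: String): TransformationPhase = { dictionary =>
    val param1 = implicitly[Extractor[A]].extract(dictionary)(key1)
    val param2 = implicitly[Extractor[B]].extract(dictionary)(key2)
    dictionary + f(param1, param2)
  }

  // Primitive function taking a frame and a Long taken from configuration.
  val filterByKey: String => (Frame, Long) => PhaseResult = resultKey => {
    (frame, businessKey) =>
      (resultKey, frame.filter(row => row.get("businessRelation").contains(businessKey)))
  }

  val phase: TransformationPhase = liftToTransformation(filterByKey("CARS"))("INPUT", "CONFIG")

  val input: Frame = List(
    Map("name" -> "sedan", "businessRelation" -> 1L),
    Map("name" -> "thriller", "businessRelation" -> 2L))
  val result: Datadictionary = phase(Map("INPUT" -> input, "CONFIG" -> 1L))
}
```

Note that the extractors rely on asInstanceOf, so a value stored under the wrong type (for example an Int where a Long is expected) fails only at runtime.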

Transformation Chain

We have already handled the level of primitive functions, with all their advantages for composition. We know how to lift them to the transformation signature. Now we need to build a "transformation chain" which will execute our transformations step by step:

We can construct a simple wrapper which acts a little bit like "andThen" for our transformations:

object Transformation{
    def init(loader:String => Any)(keys:String*) : Transformation = {
      val initialDictionary=keys.map(key => (key,loader(key))).toMap
      new Transformation(initialDictionary)
    }
}

class Transformation(val dictionary:Datadictionary){
    def transform(f:Datadictionary=>Datadictionary) = new Transformation(f(dictionary))
}

And finally we can use our first version of the transformation chain. In this form some parts are still missing, but by and large we have solved our initial problems: we have chained a set of standalone functions. The only dependency is at the transformation level, where a particular lifted function expects a given frame to be in the dictionary, but how it got there is not important.

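val domainLoader:String=>Any = ???
// primitiveTwoParamFunction is not defined in this article; it is assumed to have the type
// String => (DataFrame,DataFrame) => PhaseResult
val domainTransformation1: TransformationPhase =
    liftToTransformation(primitiveFunctionCurried("RESULT_KEY1"))("FRAME1")
val domainTransformation2: TransformationPhase =
    liftToTransformation(primitiveFunctionCurried("RESULT_KEY2"))("FRAME2")
// reads the results of the two previous phases, so the keys must match theirs
val domainTransformation3: TransformationPhase =
    liftToTransformation(primitiveTwoParamFunction("FINAL_RESULT"))("RESULT_KEY1","RESULT_KEY2")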

val result=Transformation
        .init(domainLoader)("FRAME1","FRAME2","CONFIG")
        .transform(domainTransformation1)
        .transform(domainTransformation2)
        .transform(domainTransformation3)
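
Since domainLoader is left unimplemented above, here is a small self-contained sketch of the same chain that actually runs. Everything specific in it is an assumption for illustration: Frame stands in for DataFrame, the loader reads from an in-memory map, and the phases (countRows, sumCounts) are written directly against the dictionary to keep the example short.

```scala
object ChainSketch {
  // Stand-in for DataFrame (assumption): rows as column-name -> value maps.
  type Frame = List[Map[String, Any]]
  type Datadictionary = Map[String, Any]
  type TransformationPhase = Datadictionary => Datadictionary

  class Transformation(val dictionary: Datadictionary) {
    def transform(f: TransformationPhase): Transformation = new Transformation(f(dictionary))
  }

  object Transformation {
    def init(loader: String => Any)(keys: String*): Transformation =
      new Transformation(keys.map(key => key -> loader(key)).toMap)
  }

  // Hypothetical in-memory storage playing the role of the domain loader.
  val storage: Map[String, Any] = Map(
    "FRAME1" -> List(Map[String, Any]("id" -> 1)),
    "FRAME2" -> List(Map[String, Any]("id" -> 2), Map[String, Any]("id" -> 3)))
  val loader: String => Any = key => storage(key)

  // Stores the number of rows of the frame under inputKey as resultKey.
  def countRows(inputKey: String, resultKey: String): TransformationPhase =
    dictionary => dictionary + (resultKey -> dictionary(inputKey).asInstanceOf[Frame].size)

  // Depends only on the keys written by the two previous phases.
  val sumCounts: TransformationPhase = dictionary =>
    dictionary + ("TOTAL" ->
      (dictionary("COUNT1").asInstanceOf[Int] + dictionary("COUNT2").asInstanceOf[Int]))

  val result: Datadictionary = Transformation
    .init(loader)("FRAME1", "FRAME2")
    .transform(countRows("FRAME1", "COUNT1"))
    .transform(countRows("FRAME2", "COUNT2"))
    .transform(sumCounts)
    .dictionary
}
```

The last phase knows nothing about how COUNT1 and COUNT2 were produced; it only requires that they are present in the dictionary, which is the single coupling point described above.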

Improvements - logging

With the current design it is actually very easy to log the parameters passed to each phase and the result which was calculated. It is simple because we only need to modify the lift functions and make use of the logs in the transformation wrapper:

type Log = String
type LoggableTransformationPhase=Datadictionary => (Datadictionary,Log)

def liftToTransformationWithLogging[A:Extractor](f:A=>PhaseResult)(key1:String): LoggableTransformationPhase = { dictionary =>
    val param1 =implicitly[Extractor[A]].extract(dictionary)(key1)
    val result=f(param1)
    val log=s"transformed $param1 for $key1 into $result"

    (dictionary + result,log)
}

class LoggableTransformation(val dictionary:Datadictionary, val log:Seq[Log]=Seq.empty[Log]){
    def transform(f:Datadictionary=>(Datadictionary,Log),transformationTitle:String) = {
      val (nextDictionary,transformationLog: Log)=f(dictionary)
      val newLog: Seq[Log] = log :+ transformationTitle :+ transformationLog
      new LoggableTransformation(nextDictionary,newLog)
    }
}
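
A short usage sketch shows how the log accumulates across phases. It is a simplified, self-contained variant under assumptions: the dictionary holds plain numbers instead of DataFrames, and doubleIt plus the keys "N" and "DOUBLED" are hypothetical.

```scala
object LoggingSketch {
  type Datadictionary = Map[String, Any]
  type Log = String
  type LoggableTransformationPhase = Datadictionary => (Datadictionary, Log)

  class LoggableTransformation(val dictionary: Datadictionary,
                               val log: Seq[Log] = Seq.empty[Log]) {
    // Runs the phase and appends its title and its log line to the history.
    def transform(f: LoggableTransformationPhase, title: String): LoggableTransformation = {
      val (nextDictionary, phaseLog) = f(dictionary)
      new LoggableTransformation(nextDictionary, log :+ title :+ phaseLog)
    }
  }

  // Hypothetical phase: doubles the number stored under inputKey.
  def doubleIt(inputKey: String, resultKey: String): LoggableTransformationPhase = { dictionary =>
    val input = dictionary(inputKey).asInstanceOf[Int]
    val result = input * 2
    (dictionary + (resultKey -> result), s"transformed $input for $inputKey into $result")
  }

  val start = new LoggableTransformation(Map("N" -> 21))
  val finished: LoggableTransformation = start.transform(doubleIt("N", "DOUBLED"), "doubling")
}
```

One thing to keep in mind with real DataFrames: interpolating a frame into a log string only prints its schema-level description, so a real implementation would probably log keys and schemas rather than data.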

Improvements - caching

We can also introduce a caching operation at the level of our abstraction. This step moves the responsibility for caching from our functions to the transformation chain, which simplifies the functions themselves even more. A naive implementation may look like this:

class Transformation(val dictionary:Datadictionary){
    def transform(f:Datadictionary=>Datadictionary) = new Transformation(f(dictionary))
    def cache(keys:String*)={
      keys.foreach(key => dictionary(key).asInstanceOf[DataFrame].cache())
      this
    }
}

val result=Transformation
        .init(domainLoader)("FRAME1","FRAME2","CONFIG")
        .transform(domainTransformation1)
        .cache("RESULT_KEY1") // only keys already present in the dictionary can be cached at this point
        .transform(domainTransformation2)
        .transform(domainTransformation3)

We can even implement some kind of statistics which tell us if a frame was requested a couple of times - so it may be worth caching!

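class DataDictionaryWithStatistics(val dictionary:Map[String,Any]){
    // how many times each dataframe was requested
    private var statistics: Map[String,Int] = Map.empty[String,Int].withDefaultValue(0)

    def asDataFrame(key:String): DataFrame = {
      statistics = statistics.updated(key, statistics(key) + 1)
      dictionary.get(key) match {
        case Some(frame: DataFrame) => frame
        case Some(_) => throw new RuntimeException(s"$key is not a Dataframe")
        case None => throw new RuntimeException(s"$key is missing from the dictionary")
      }
    }

    // frames requested more than once are candidates for caching
    def requestCount(key:String): Int = statistics(key)
}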

Reinventing the wheel?

In this article we chose a certain direction for solving our problem. We started from a problem definition, then built a solution step by step, and finally saw what kind of tool we need to implement the solution. In my opinion this is better than taking "something with a name" and adapting the solution to it - for example compare "how can I solve this with the Writer Monad?" vs "can I use the Writer Monad to solve this problem?"

Now we can check if there is something ready to use which can help us build a DataFrame transformation solution. We are not going to dive deeply into those tools because that is a good topic for future posts.

State Monad

Here is a good link with an explanation --> http://www.slideshare.net/dgalichet/scalaio-statemonad. Conceptually a State Monad is something with the signature S=>(S,A), so in our example the state S would be a Datadictionary (or a mix of Datadictionary and Seq[Log]) and A could be a Log (or nothing if we don't want logging - kamikaze mode!)

In this article we never considered the situation where we need to somehow compose two full transformations. The State monad is something more general, created with composability in mind, so there is great potential for such operations.
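
To illustrate the S=>(S,A) idea, here is a minimal hand-rolled sketch. It is deliberately not the Scalaz State type, just a small case class with map and flatMap; the State name is reused only for readability, and put is a hypothetical phase that stores a value and returns a log line.

```scala
object StateSketch {
  type Datadictionary = Map[String, Any]

  // Hand-rolled for illustration: wraps a function S => (S, A).
  final case class State[S, A](run: S => (S, A)) {
    def map[B](f: A => B): State[S, B] = State[S, B](s => {
      val (next, a) = run(s)
      (next, f(a))
    })

    // Threads the state produced by this step into the next one.
    def flatMap[B](f: A => State[S, B]): State[S, B] = State[S, B](s => {
      val (next, a) = run(s)
      f(a).run(next)
    })
  }

  // Hypothetical phase: the dictionary is the state, the log line is the result.
  def put(key: String, value: Any): State[Datadictionary, String] =
    State[Datadictionary, String](dictionary => (dictionary + (key -> value), s"stored $key"))

  // Two phases composed with a for-comprehension; nothing runs until run is called.
  val program: State[Datadictionary, List[String]] = for {
    log1 <- put("A", 1)
    log2 <- put("B", 2)
  } yield List(log1, log2)

  val (finalDictionary, logs) = program.run(Map.empty[String, Any])
}
```

Two such programs compose with flatMap in exactly the same way as two single phases, which is the kind of composition of full transformations mentioned above.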

Writer Monad

I've never used this one in practice, but my first impression is that it was created to solve exactly our problem! I will certainly go deeper into this topic.
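
As a first impression of the idea, here is a hand-rolled sketch of a writer-like structure: a value paired with an accumulated log. It is an assumption-laden illustration, specialised to a Vector[String] log and not the Scalaz Writer API; logged and addEntry are hypothetical helpers.

```scala
object WriterSketch {
  type Datadictionary = Map[String, Any]

  // Hand-rolled for illustration: a value together with the log produced so far.
  final case class Writer[A](log: Vector[String], value: A) {
    def map[B](f: A => B): Writer[B] = Writer(log, f(value))

    // Runs the next step on the value and appends its log to the current one.
    def flatMap[B](f: A => Writer[B]): Writer[B] = {
      val next = f(value)
      Writer(log ++ next.log, next.value)
    }
  }

  def logged[A](message: String, value: A): Writer[A] = Writer(Vector(message), value)

  // Hypothetical phase: adds an entry to the dictionary and records that fact.
  def addEntry(key: String, value: Any)(dictionary: Datadictionary): Writer[Datadictionary] =
    logged(s"added $key", dictionary + (key -> value))

  val result: Writer[Datadictionary] = for {
    d1 <- logged("start", Map.empty[String, Any])
    d2 <- addEntry("A", 1)(d1)
    d3 <- addEntry("B", 2)(d2)
  } yield d3
}
```

Compared with LoggableTransformation, the log handling lives entirely in flatMap, so individual phases never touch the accumulated log.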

Summary

Please remember that the code presented in this article is just a research project. Although it compiles, its main purpose was to present some concepts, and some additional work is needed to adapt it to real-world demands. If someone wants to take a look at the code, here is the repo --> https://github.com/PawelWlodarski/blog.

And this is the end of this article and an exciting summary!