Sunday, 4 October 2015

Spark Dataframes transformations with state Monad

I want this post to be a journey of discovery: how some very interesting functional concepts can solve very practical problems.

The problem may not be common, but it is still a practical one (I need to find a synonym for "practical" - is "real world" OK?).

  • How can we make a chain of DataFrame transformations in Spark easier to test, easier to maintain and easier to evolve?
I described this problem in my last post, where I also showed some attempts to solve it.

So the general idea is to chain pure functions into one transformation pipeline. A design based on small functions should ease testing and give more opportunities to compose those small pieces of logic. On the other hand, there was no simple way to compose two transformations, and the feeling of "reinventing the wheel" was difficult to shake off.

To investigate a very promising mechanism from FP, the State Monad, we are going to look at some conceptual examples using ScalaZ. Then we will dive a little into the mechanics shown in Functional Programming in Scala (https://www.manning.com/books/functional-programming-in-scala). And finally we will take a look at a new library, https://github.com/non/cats, which in my current understanding is supposed to become a more modular version of ScalaZ somewhere in the future.

State Monad and Definition of Dictionary

Some worth reading theory :

To recall from the last episode - we want to execute a sequence of transformations on an immutable dictionary (the DataDictionary from last time) which contains DataFrames and some configuration parameters. Plus we want to log each step - you know - just in case...

type Dictionary=Map[String,Any] // same shape as DataDictionary in the Spark example later on
type Log=String

According to the theory, the State Monad wraps a function that turns a state into a tuple (NewState,StateResult) --> S => (S,A)
So in our case the signature is Dictionary => (Dictionary,Log)

Take a look at this piece of code :

def pureTransformation(key: String, value: Any, currentLog: Log):
 (Dictionary) => (Dictionary, Log) = {
  dict =>
    val newDict: Dictionary = dict + (key -> value)
    (newDict, currentLog + s": added $key : $value")
}

This is a trivial method created for educational purposes. We just add a new entry to the existing dictionary and log the operation. Notice that we already have the proper signature : (Dictionary) => (Dictionary, Log)

The State companion object in ScalaZ has a suitable constructor which takes our transforming function :

import scalaz._  // brings State into scope

def stateTransform(key:String,value:Any,currentLog:Log="") = 
State[Dictionary,Log]{
    pureTransformation(key, value, currentLog)
}

val state=stateTransform("key4",4.0)
//state: scalaz.State[Dictionary,Log] = scalaz.package$State....

And generally that's it :) Now it is important to understand that... nothing has happened yet. We have only created a description of the transformation. To run the transformation itself we need to pass in the initial state of the dictionary and trigger execution.

val initialDictionary:Dictionary=Map()
state.run(initialDictionary)

//res0: (Dictionary, Log) = (Map(key4 -> 4.0),: added key4 : 4.0)

That worked, but now we need to solve our main problem - how to compose multiple transformations.

Compose transformations

I don't want to go deep into the mechanics of for comprehensions, so if you don't know what those arrows in Scala's for mean, some tutorials can be found here --> https://www.google.pl/search?q=scala+for+comprehension

val transformation=for{
  log1<-stateTransform("key4",4.0)
  log2<-stateTransform("key5",true,log1)
  finalLog<-stateTransform("key6","someValue",log2)
} yield finalLog

One awkwardness of using the State Monad for this operation is that everything seems to be "log centered", while the essence is the dictionary manipulation, which is handled by the State internals.

What is the type of transformation ?

transformation: scalaz.IndexedStateT[scalaz.Id.Id,Dictionary,Dictionary,Log]
We really don't want to go there, so in this case it may be a good idea to state the type we want to work with ourselves :
val transformation : State[Dictionary,Log]=for{ ...
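
For readers who would rather see what the for comprehension turns into: the compiler rewrites it into nested flatMap calls that end in a map. Below is a minimal sketch of that rewrite. To keep it runnable without any dependency it uses MiniState, a hypothetical hand-written stand-in for scalaz.State with the same S => (S, A) shape; the real ScalaZ type is richer than this.

```scala
object DesugarDemo {
  type Dictionary = Map[String, Any]
  type Log = String

  // Hypothetical stand-in for scalaz.State (an assumption made for this sketch only).
  final case class MiniState[S, A](run: S => (S, A)) {
    def map[B](f: A => B): MiniState[S, B] =
      MiniState[S, B] { s =>
        val (s1, a) = run(s)
        (s1, f(a))
      }
    def flatMap[B](f: A => MiniState[S, B]): MiniState[S, B] =
      MiniState[S, B] { s =>
        val (s1, a) = run(s)
        f(a).run(s1)
      }
  }

  // Same behaviour as stateTransform from the post, built on MiniState.
  def stateTransform(key: String, value: Any, currentLog: Log = ""): MiniState[Dictionary, Log] =
    MiniState[Dictionary, Log] { dict =>
      (dict + (key -> value), currentLog + s": added $key : $value")
    }

  // The for comprehension form used in the post.
  val sugared: MiniState[Dictionary, Log] = for {
    log1     <- stateTransform("key4", 4.0)
    log2     <- stateTransform("key5", true, log1)
    finalLog <- stateTransform("key6", "someValue", log2)
  } yield finalLog

  // Roughly what the compiler generates from it.
  val desugared: MiniState[Dictionary, Log] =
    stateTransform("key4", 4.0).flatMap { log1 =>
      stateTransform("key5", true, log1).flatMap { log2 =>
        stateTransform("key6", "someValue", log2).map(finalLog => finalLog)
      }
    }
}
```

Running DesugarDemo.sugared and DesugarDemo.desugared against the same initial dictionary gives the same dictionary and the same log, which is the whole point: the arrows are just a nicer way to write the nesting.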

And finally - in overview we are building something like this :

And usage of this educational example will result in :

val dictionary:Dictionary=Map("key1"->1,"key2"->2,"key3"->"value3")
val (finalDictionary,log)=transformation.run(dictionary)
finalDictionary
/*res2: Dictionary = Map(
key4 -> 4.0, 
key5 -> true, 
key1 -> 1, 
key2 -> 2, 
key6 -> someValue, 
key3 -> value3)*/

log
/*res3: Log = : 
added key4 : 4.0:
added key5 : true:
added key6 : someValue*/

Analysis in depth

From what I have seen so far, ScalaZ has a very elegant internal design with a lot of code reuse (and "reuse" was always the OOP "holy grail"). However, to fully understand the State implementation you need to grasp a wider picture.

So it may be easier to take a look at the code from https://www.manning.com/books/functional-programming-in-scala. You can find the whole example here : https://github.com/fpinscala/fpinscala/blob/master/answers/src/main/scala/fpinscala/state/State.scala but generally we need to focus on two small methods (note that the book's State returns the tuple as (A, S), result first, while ScalaZ uses (S, A)) :

case class State[S, +A](run: S => (A, S)) {
  // (...)
  def flatMap[B](f: A => State[S, B]): State[S, B] =
    State(s => {
      val (a, s1) = run(s)
      f(a).run(s1)
    })
}

So flatMap just chains a sequence of states together into one function called "run", which is executed when you call run with an initial state. I hope the rest of the code is understandable - it always frustrates me when authors state that "the rest are just details", but in this case we are just invoking one function and passing its result to another... the rest are just details :)
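
To make the chaining visible, here is a small self-contained sketch in the book's style, with the (A, S) result order. The State class below is a cut-down copy of the shape shown above, and push is a hypothetical step invented for this illustration: it puts a label on a list and returns the new size.

```scala
object ChainDemo {
  // Same shape as the book's State: the result comes first, the new state second.
  final case class State[S, +A](run: S => (A, S)) {
    def flatMap[B](f: A => State[S, B]): State[S, B] =
      State[S, B](s => {
        val (a, s1) = run(s) // run this step on the incoming state
        f(a).run(s1)         // feed its result and its new state into the next step
      })
    def map[B](f: A => B): State[S, B] =
      flatMap(a => State[S, B](s => (f(a), s)))
  }

  // Hypothetical step: prepend a label to the list and return the list's new size.
  def push(label: String): State[List[String], Int] =
    State[List[String], Int](stack => {
      val next = label :: stack
      (next.size, next)
    })

  // Nothing runs here yet, this only describes two chained steps.
  val twoSteps: State[List[String], Int] =
    push("first").flatMap(_ => push("second"))

  // Execution starts only when an initial state is supplied.
  val (size, finalStack) = twoSteps.run(Nil)
}
```

After the run, size is 2 and finalStack is List("second", "first"): the second step saw the state produced by the first one, which is exactly what flatMap arranges.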

Spark Example

If whole concept is clear enough let's try to implement it on DataFrames. You can find the whole example on : https://github.com/PawelWlodarski/blog/blob/master/src/main/scala/pl/pawelwlodarski/spark/functionalchain/ChainOnstateMonad.scala

We are going to simulate two transformations :

  1. The first one consists of three separate phases: it adds a couple of columns to the initial frame and then does a simple join
  2. The second one creates a business report with a single select - for simplicity this is a "one phase transformation"

First Transformation

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{current_date, lit}

val addTimeStamp: DataFrame => DataFrame = 
  { df =>
    df.withColumn("created",current_date())
  }

val addLabel:String => DataFrame => DataFrame = 
  {label => df =>
    df.withColumn("label",lit(label))
  }

val businessJoin : (String,String) => (DataFrame,DataFrame) => DataFrame = 
  {(column1,column2) => (df1,df2) =>
      df1.join(df2, df1(column1) === df2(column2))
  }

I tried to justify this design in the previous post. Generally, what we have here are primitive, simple, stateless functions which are easy to test and compose.

Lifting to Transformation

Now we need machinery to lift those small functions to transformation context :

 //lift
type TransformationPhase=DataDictionary => (DataDictionary,Log)

trait Extractor[A]{
    def extract(dictionary:DataDictionary)(key:String):A
}

implicit object DataFramExtractor extends Extractor[DataFrame] {
    override def extract(dictionary:DataDictionary)(key: String): DataFrame = dictionary(key).asInstanceOf[DataFrame]
}


def liftToTransformation[A:Extractor](f:A=>DataFrame)(key1:String)(resultKey:String):String => TransformationPhase = 
{currentLog => dictionary =>
    val param1 =implicitly[Extractor[A]].extract(dictionary)(key1)
    val result=f(param1)
    val log=currentLog + s"\nadding $resultKey -> $result"
    val newDictionary=dictionary + (resultKey -> result)
    (newDictionary,log)
 }

A couple of important notes. First of all, we have separated the parameters into groups: the first one is the primitive function, the rest are configuration. Maybe Lenses would be better for getting and setting values in the dictionary, but what we have is good enough for now. Extractors allow us to retrieve any type of value from the dictionary (here only DataFrame, for simplicity). And the last thing - we need to compose both logs and results, and that's why lift returns a curried function Log => DataDictionary => (DataDictionary,Log)
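
As an illustration of the "any type of value" remark, here is a minimal sketch of how the same Extractor trait could serve other types. IntExtractor, StringExtractor and the get helper are hypothetical names added for this example; the post itself only defines the DataFrame instance.

```scala
object ExtractorDemo {
  type DataDictionary = Map[String, Any]

  trait Extractor[A] {
    def extract(dictionary: DataDictionary)(key: String): A
  }

  // Hypothetical extra instances, written the same way as the DataFrame one.
  implicit object IntExtractor extends Extractor[Int] {
    override def extract(dictionary: DataDictionary)(key: String): Int =
      dictionary(key).asInstanceOf[Int]
  }

  implicit object StringExtractor extends Extractor[String] {
    override def extract(dictionary: DataDictionary)(key: String): String =
      dictionary(key).asInstanceOf[String]
  }

  // Hypothetical helper: the requested type selects the implicit instance.
  def get[A: Extractor](dictionary: DataDictionary)(key: String): A =
    implicitly[Extractor[A]].extract(dictionary)(key)

  val dictionary: DataDictionary = Map("limit" -> 10, "label" -> "experiment")

  val limit: Int = get[Int](dictionary)("limit")
  val label: String = get[String](dictionary)("label")
}
```

The cast is still unchecked: asking for a key that is missing, or for a type that does not match the stored value, fails at runtime, so this only moves the asInstanceOf into one place rather than making the dictionary type safe.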

Building transformations

val addTimestampPhase=
liftToTransformation(addTimeStamp)("InitialFrame")("WithTimeStamp")
val addLabelPhase=
liftToTransformation(addLabel("experiment"))("WithTimeStamp")("Labelled")
val businessJoinPhase=
liftToTransformation(businessJoin("customerId","id"))("Labelled","SomeOtherFrame")("JoinedByBusinessRules")

val transformation1:Log => State[DataDictionary,Log] =
initialLog => for{
    log1 <- State(addTimestampPhase(initialLog))
    log2 <- State(addLabelPhase(log1))
    log3 <- State(businessJoinPhase(log2))
} yield log3

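We build a State for each transformation phase and then compose everything into one transformation. It may be more convenient to move the State(phase) part into the lift method. Note that businessJoinPhase passes a two-argument function and two keys, which the single-key liftToTransformation shown above does not accept, so it needs a two-key variant of the lift that is not shown in this post.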
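
The join phase takes two input frames, so it needs a lift that reads two keys from the dictionary. The following is only a sketch of what such a variant might look like: liftToTransformation2 is a hypothetical name and shape, not taken from the post, and Frame is a stand-in for DataFrame so that the example runs without Spark.

```scala
object Lift2Demo {
  type DataDictionary = Map[String, Any]
  type Log = String
  type TransformationPhase = DataDictionary => (DataDictionary, Log)

  // Stand-in for DataFrame (an assumption): just a list of column names.
  final case class Frame(columns: List[String])

  trait Extractor[A] {
    def extract(dictionary: DataDictionary)(key: String): A
  }

  implicit object FrameExtractor extends Extractor[Frame] {
    override def extract(dictionary: DataDictionary)(key: String): Frame =
      dictionary(key).asInstanceOf[Frame]
  }

  // Hypothetical two-input counterpart of liftToTransformation.
  def liftToTransformation2[A: Extractor, B: Extractor](f: (A, B) => Frame)(key1: String, key2: String)(resultKey: String): Log => TransformationPhase =
    { currentLog => dictionary =>
      val param1 = implicitly[Extractor[A]].extract(dictionary)(key1)
      val param2 = implicitly[Extractor[B]].extract(dictionary)(key2)
      val result = f(param1, param2)
      val log = currentLog + s"\nadding $resultKey -> $result"
      (dictionary + (resultKey -> result), log)
    }

  // Stand-in for businessJoin: concatenates the column lists of both inputs.
  val fakeJoin: (Frame, Frame) => Frame = (l, r) => Frame(l.columns ++ r.columns)

  val joinPhase: Log => TransformationPhase =
    liftToTransformation2(fakeJoin)("Labelled", "SomeOtherFrame")("JoinedByBusinessRules")

  val (resultDictionary, log) = joinPhase("")(
    Map(
      "Labelled" -> Frame(List("customerId", "label")),
      "SomeOtherFrame" -> Frame(List("id", "credit"))
    )
  )
}
```

The call site has the same shape as businessJoinPhase above: function first, then the two source keys, then the result key.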

As mentioned at the beginning, here is another simple transformation :

//transformation2
val importantSelect:DataFrame => DataFrame = _.select("customerId","credit","label","created")
val importantSelectPhase =
    liftToTransformation(importantSelect)("JoinedByBusinessRules")("BusinessReport")

val transformation2:Log => State[DataDictionary,Log]=
    initialLog => State(importantSelectPhase(initialLog))

You may have noticed one interesting thing - a transformation is actually not a State but a function Log => State[DataDictionary,Log]. The reason is that this was the only way I found to pass the log from one transformation to the next in order to combine them. Without logs to pass along we could stay with a simple State[DataDictionary,Log]
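
One possible alternative, sketched here purely as an assumption and not as what the code in this post does, is to keep the log inside the state next to the dictionary. A transformation is then a plain State value again and nothing has to be threaded by hand. MiniState is a hypothetical minimal stand-in for scalaz.State, and the string values stand in for DataFrames.

```scala
object LogInStateDemo {
  type DataDictionary = Map[String, Any]
  type Log = String
  type Context = (DataDictionary, Log) // dictionary and log travel together

  // Hypothetical minimal State with the S => (S, A) shape.
  final case class MiniState[S, A](run: S => (S, A)) {
    def map[B](f: A => B): MiniState[S, B] =
      MiniState[S, B] { s =>
        val (s1, a) = run(s)
        (s1, f(a))
      }
    def flatMap[B](f: A => MiniState[S, B]): MiniState[S, B] =
      MiniState[S, B] { s =>
        val (s1, a) = run(s)
        f(a).run(s1)
      }
  }

  // A phase updates the dictionary and appends to the log, both held in the state.
  def phase(key: String, value: Any): MiniState[Context, Unit] =
    MiniState[Context, Unit] { case (dict, log) =>
      ((dict + (key -> value), log + s"\nadding $key -> $value"), ())
    }

  val transformation1: MiniState[Context, Unit] = for {
    _ <- phase("WithTimeStamp", "frame1")
    _ <- phase("Labelled", "frame2")
  } yield ()

  val transformation2: MiniState[Context, Unit] = phase("BusinessReport", "frame3")

  // Both are plain State values, so they compose without passing a log around.
  val composed: MiniState[Context, Unit] = for {
    _ <- transformation1
    _ <- transformation2
  } yield ()

  val ((finalDictionary, finalLog), _) = composed.run((Map.empty[String, Any], ""))
}
```

The price is that the state type becomes a tuple, so every phase has to unpack and repack it.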

Composition!

And now the moment everyone was waiting for! Transformation composition!

val transformationComposed:Log => State[DataDictionary,Log]=
initialLog=>for{
    logT1 <- transformation1(initialLog)
    logT2 <- transformation2(logT1)
} yield logT2

Yes, this was it. This was "The Moment". Your life has just changed forever...

The Program

Let's check if what we created actually works.

 def main(args: Array[String]): Unit = {
    val config=new SparkConf()
              .setMaster("local[4]")
              .setAppName("Dataframes transformation with State Monad")
    val sc=new SparkContext(config)
    val sqlContext=new SQLContext(sc)
    import sqlContext.implicits._

    println("example start")

    val df1=sc.parallelize(Seq(
      (1,"cust1@gmail.com","Stefan"),
      (2,"cust2@gmail.com","Zdzislawa"),
      (3,"cust3@gmail.com","Bonifacy"),
      (4,"cust4@gmail.com","Bozebozebozenka")
    )).toDF("customerId","email","name")


    val df2=sc.parallelize(Seq(
      (1,10),
      (2,20),
      (3,30),
      (4,40)
    )).toDF("id","credit")

    val dictionary:DataDictionary=Map("InitialFrame" -> df1,"SomeOtherFrame"->df2)

    val (resultDictionary,log)=transformationComposed("").run(dictionary)
    println("**************LOG*************** : "+log)
    println("**************DICTIONARY********")
    resultDictionary.foreach(println)
    val result=resultDictionary("BusinessReport").asInstanceOf[DataFrame]
    result.show()
  }

We have created two DataFrames: the first one represents some users, the second one the users' credits. Then we build the final report.

Results

Log looks good - everything in there.

**************LOG*************** : 
adding WithTimeStamp -> [customerId: int, email: string, name: string, created: date]
adding Labelled -> [customerId: int, email: string, name: string, created: date, label: string]
adding JoinedByBusinessRules -> [customerId: int, email: string, name: string, created: date, label: string, id: int, credit: int]
adding BusinessReport -> [customerId: int, credit: int, label: string, created: date]

Dictionary looks good - everything in there.

**************DICTIONARY********
(JoinedByBusinessRules,[customerId: int, email: string, name: string, created: date, label: string, id: int, credit: int])
(WithTimeStamp,[customerId: int, email: string, name: string, created: date])
(BusinessReport,[customerId: int, credit: int, label: string, created: date])
(InitialFrame,[customerId: int, email: string, name: string])
(SomeOtherFrame,[id: int, credit: int])
(Labelled,[customerId: int, email: string, name: string, created: date, label: string])

Final report looks good - everything in there.

+----------+------+----------+----------+
|customerId|credit|     label|   created|
+----------+------+----------+----------+
|         1|    10|experiment|2015-10-04|
|         2|    20|experiment|2015-10-04|
|         3|    30|experiment|2015-10-04|
|         4|    40|experiment|2015-10-04|
+----------+------+----------+----------+

Cats library

My current experience tells me that when working with Spark it is good to have as few external dependencies as possible. Another library similar to ScalaZ is being created, and from what I understand it will be more modular. The library is called cats -> https://github.com/non/cats

You can find an educational example similar to the one at the beginning of this post --> Example In Cats

For now it is difficult for me to say what the difference between those two libraries is, because the example looks similar :

import cats.free.Trampoline
import cats.state._
import cats.std.all._

type Dictionary=Map[String,Any]
type Log=String

val dictionary:Dictionary=Map("key1"->1,"key2"->2,"key3"->"value3")

def transform(key:String,value:Any,currentLog:String=""): State[Dictionary,Log] = State[Dictionary,Log]{ dict =>
  val newDict:Dictionary=dict + (key -> value)
  (newDict,currentLog + s": added $key : $value")
}


val state=transform("key4",4.0)

val toRun: Trampoline[(Dictionary, Log)] = state.run(dictionary)
toRun.run

val state2=transform("key5",true)


val transformation=for{
  s1<-transform("key4",4.0,"")
  s2<-transform("key5",5.0,s1)
} yield s2

transformation.run(dictionary).run

state.flatMap(log=>transform("key5",true,log)).run(dictionary).run

What next?

  • Can we make logging easier with the Writer Monad?
  • Is it possible to use Lenses instead of simple Strings to operate on DataDictionary?
  • Scalaz has an interesting construct - Monad.whileM - can we use it to implement some kind of composable loops in our transformations?

2 comments:

  1. Why a Map[String,Any] when the def liftToTransformation[A:Extractor](f:A=>DataFrame)(key1:String)(resultKey:String) value type is a String?

  2. Hi, liftToTransformation returns a function String => TransformationPhase where input is current log. Map[String,Any] is something separated and it just contains values of various types like DataFrame but also Int,Floats or String. Inside liftToTransformation you are extracting those values and uses as input in pure function (in this simple example it was A=>DataFrame). I hope I understood your question correctly.
