Sunday, 25 October 2015

Log your Dataframes transformations with Writer Monad

In the last article we tried to use the State Monad to build a computation chain where step N depends on step N-1. We wanted a set of small, easily composable pieces of code. To step down from a high level of abstraction and check our design in a more practical environment, we are using Spark DataFrames as an example and practical illustration.

The State Monad represents a function which modifies some state and simultaneously produces a result value: S1 => (R,S2). In our case S was a Map with the DataFrames created during the computation, and R from the signature above was just the Log generated while calculating the new state.

The problem with that solution was that we had to pass Strings between transformations to build the final Log. Today we are going to check a different construct - the Writer Monad. According to its description it should handle this "logs problem" for us - let's see if I'm smart enough to use this tool properly ;)

Understanding the Writer Monad

As a preparation for our experiments let's define a couple of useful types. The first one, Dictionary, is a kind of bag for computation results like DataFrames, and the second one, Log, is just a debug message.

import scalaz._
import Scalaz._

type Dictionary=Map[String,Any]
type Log=String

val initialDict:Dictionary=Map("initial"->"initialState")

Now let's create "the subject of our research", the very first instance of the Writer Monad in this blog post:

val initWriter: Writer[List[String], Dictionary] =Writer(List.empty[String],initialDict)

At the beginning, our business logic will be represented by a method which just adds a simple value (new state) to the previously defined dictionary. Notice that what we are building here is a computation phase with the signature Dictionary => Dictionary.

def addToDictionary(key:String,value:Any) : Dictionary => Dictionary = { dict =>
  dict + (key -> value)
}

We are going to start our "lab research" by checking the simple map method on Writer - will it be enough?

val result=initWriter
  .map(addToDictionary("one",1))
  .map(addToDictionary("two",true))
  .map(addToDictionary("three","value"))

result.run

//result is :
(List(),Map(initial -> initialState, one -> 1, two -> true, three -> value))

So we are not satisfied, because a simple map does not generate any logs. Well, this is actually not a surprise, because we are not generating any logs in our "business logic" either... Let's fix it.

def addToWriter(key:String,value:Any) : Dictionary => Writer[List[String],Dictionary] = { dict =>
  Writer(List(s"adding $key -> $value"),dict + (key -> value))
}


val transformationWriter=initWriter
  .flatMap(addToWriter("one",1))
  .flatMap(addToWriter("two",true))
  .mapWritten(_ :+ "additional log") //<--- haaa this line is interesting!!
  .flatMap(addToWriter("three","value"))

This time each transformation phase builds a new Writer instance. Creating a one-element list may seem a little awkward at first, but at the end all the lists are combined into one final log. mapWritten is also interesting, because it gives us the power to add some extra logging without touching "the computation subject".

Ok time to run it!

val (fullLog,fullResult)=transformationWriter.run

fullLog
//res1: List[String] = List(adding one -> 1, adding two -> true, additional log, adding three -> value)

fullResult
//res2: Dictionary = Map(initial -> initialState, one -> 1, two -> true, three -> value)

Looks good! (Really, it looks good - this is exactly what we were expecting.) Below you can find an illustration to better understand the Writer mechanism, and now I believe we are ready for the Spark scenario.
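The same mechanism can also be sketched in code. Below is a minimal hand-rolled Writer that shows why flatMap concatenates the per-step lists while map leaves the log alone. It is a simplified sketch, not the ScalaZ implementation: SimpleWriter is a hypothetical name, and the log is fixed to List[String] instead of any Semigroup.

```scala
object WriterSketch {
  type Dictionary = Map[String, Any]

  // Hypothetical minimal Writer (not ScalaZ): a log paired with a value.
  final case class SimpleWriter[A](written: List[String], value: A) {
    // map transforms the value only; the log is untouched.
    def map[B](f: A => B): SimpleWriter[B] = SimpleWriter(written, f(value))

    // flatMap runs the next step and appends its log to the log so far.
    def flatMap[B](f: A => SimpleWriter[B]): SimpleWriter[B] = {
      val next = f(value)
      SimpleWriter(written ++ next.written, next.value)
    }

    // mapWritten changes the log without touching the value.
    def mapWritten(f: List[String] => List[String]): SimpleWriter[A] =
      SimpleWriter(f(written), value)

    def run: (List[String], A) = (written, value)
  }

  def addToWriter(key: String, value: Any): Dictionary => SimpleWriter[Dictionary] =
    dict => SimpleWriter(List(s"adding $key -> $value"), dict + (key -> value))

  val initWriter: SimpleWriter[Dictionary] =
    SimpleWriter(List.empty[String], Map("initial" -> "initialState"))

  // Only map: the dictionary changes, but the log stays empty.
  val mappedOnly: SimpleWriter[Dictionary] = initWriter.map(_ + ("one" -> 1))

  val (fullLog, fullResult) = initWriter
    .flatMap(addToWriter("one", 1))
    .flatMap(addToWriter("two", true))
    .mapWritten(_ :+ "additional log")
    .flatMap(addToWriter("three", "value"))
    .run
}
```

The order of the log entries follows the order of the flatMap and mapWritten calls, which matches the ScalaZ output shown above.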

Spark use case

In this paragraph we are going to implement a very simple but fully functional Spark example. The general concept is explained in a different post --> General concept of functional data transformation; it's about building a transformation pipeline from small and simple functional bricks which are easy to test and compose. You can find the full code on GitHub --> Full code : Dataframes transformations with Writer Monad

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.functions._

    type DataDictionary=Map[String,Any]
    type Log=String

    val addTimeStamp: DataFrame => DataFrame =
    { df =>
      df.withColumn("created",current_date())
    }

    val addLabel:String => DataFrame => DataFrame =
    {label => df =>
      df.withColumn("label",lit(label))
    }

    val businessJoin : (String,String) => (DataFrame,DataFrame) => DataFrame =
    {(column1,column2) => (df1,df2) =>
      df1.join(df2, df1(column1) === df2(column2))
    }

The functional bricks mentioned above are just simple functions which operate on DataFrames. Now we need to lift them to the level of our computation chain, which works on DataDictionary. This is the moment when we use the Writer Monad for the first time in our Spark example - take a look:

type TransformationPhase=DataDictionary => Writer[List[Log],DataDictionary]

def liftToTransformation[A:Extractor](f:A=>DataFrame)(key1:String)(resultKey:String): TransformationPhase =
    {dictionary =>
      val param1 =implicitly[Extractor[A]].extract(dictionary)(key1)
      val result=f(param1)
      val log= s"\nadding $resultKey -> $result"
      val newDictionary=dictionary + (resultKey -> result)
      Writer(List(log),newDictionary)
    }
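The Extractor type class used in liftToTransformation is defined in the previous post. The sketch below puts the lifting pieces together without Spark so it can run on its own. Assumptions: Frame is a hypothetical stand-in for DataFrame (just a list of column names), and MiniWriter is a hand-rolled minimal Writer, not the ScalaZ type.

```scala
object LiftSketch {
  type DataDictionary = Map[String, Any]

  // Hypothetical stand-in for a Spark DataFrame: just its column names.
  final case class Frame(columns: List[String]) {
    def withColumn(name: String): Frame = Frame(columns :+ name)
  }

  // Hypothetical minimal Writer (not ScalaZ): a log paired with a value.
  final case class MiniWriter[A](written: List[String], value: A) {
    def flatMap[B](f: A => MiniWriter[B]): MiniWriter[B] = {
      val next = f(value)
      MiniWriter(written ++ next.written, next.value)
    }
  }

  type TransformationPhase = DataDictionary => MiniWriter[DataDictionary]

  // Type class that knows how to read a value of type A from the dictionary.
  trait Extractor[A] {
    def extract(dictionary: DataDictionary)(key: String): A
  }

  implicit val frameExtractor: Extractor[Frame] = new Extractor[Frame] {
    def extract(dictionary: DataDictionary)(key: String): Frame =
      dictionary(key).asInstanceOf[Frame]
  }

  // Read the input under key1, apply f, store the result under resultKey and log it.
  def liftToTransformation[A: Extractor](f: A => Frame)(key1: String)(resultKey: String): TransformationPhase =
    dictionary => {
      val param1 = implicitly[Extractor[A]].extract(dictionary)(key1)
      val result = f(param1)
      MiniWriter(List(s"adding $resultKey -> $result"), dictionary + (resultKey -> result))
    }

  val addTimestampPhase: TransformationPhase =
    liftToTransformation((df: Frame) => df.withColumn("created"))("InitialFrame")("WithTimeStamp")
  val addLabelPhase: TransformationPhase =
    liftToTransformation((df: Frame) => df.withColumn("label"))("WithTimeStamp")("Labelled")

  val (log, result) = {
    val w = MiniWriter[DataDictionary](Nil, Map("InitialFrame" -> Frame(List("customerId"))))
      .flatMap(addTimestampPhase)
      .flatMap(addLabelPhase)
    (w.written, w.value)
  }
}
```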

With this lifting function we can easily create functions which operate on DataDictionary:

val addTimestampPhase=
      liftToTransformation(addTimeStamp)("InitialFrame")("WithTimeStamp")
val addLabelPhase=
      liftToTransformation(addLabel("experiment"))("WithTimeStamp")("Labelled")
val businessJoinPhase=
      liftToTransformation(businessJoin("customerId","id"))("Labelled","SomeOtherFrame")("JoinedByBusinessRules")
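businessJoin takes two DataFrames and is lifted with two input keys, so it does not fit the one-argument liftToTransformation shown above. The full code on GitHub presumably contains a two-argument variant; since it is not shown in the post, the sketch below is an assumption: liftToTransformation2 is a hypothetical name, Frame is a stand-in for DataFrame, and MiniWriter is a hand-rolled minimal Writer rather than ScalaZ's.

```scala
object TwoKeyLiftSketch {
  type DataDictionary = Map[String, Any]

  // Hypothetical stand-in for a DataFrame: column names only.
  final case class Frame(columns: List[String]) {
    def join(other: Frame): Frame = Frame(columns ++ other.columns)
  }

  // Hypothetical minimal Writer (not ScalaZ): a log paired with a value.
  final case class MiniWriter[A](written: List[String], value: A)

  type TransformationPhase = DataDictionary => MiniWriter[DataDictionary]

  trait Extractor[A] {
    def extract(dictionary: DataDictionary)(key: String): A
  }

  implicit val frameExtractor: Extractor[Frame] = new Extractor[Frame] {
    def extract(dictionary: DataDictionary)(key: String): Frame =
      dictionary(key).asInstanceOf[Frame]
  }

  // Hypothetical two-argument lift: reads two inputs, writes one result.
  def liftToTransformation2[A: Extractor, B: Extractor](f: (A, B) => Frame)(key1: String, key2: String)(resultKey: String): TransformationPhase =
    dictionary => {
      val param1 = implicitly[Extractor[A]].extract(dictionary)(key1)
      val param2 = implicitly[Extractor[B]].extract(dictionary)(key2)
      val result = f(param1, param2)
      MiniWriter(List(s"adding $resultKey -> $result"), dictionary + (resultKey -> result))
    }

  val businessJoin: (String, String) => (Frame, Frame) => Frame =
    (column1, column2) => (df1, df2) => {
      // The stand-in only checks that the join columns exist, then concatenates columns.
      require(df1.columns.contains(column1) && df2.columns.contains(column2))
      df1.join(df2)
    }

  val businessJoinPhase: TransformationPhase =
    liftToTransformation2(businessJoin("customerId", "id"))("Labelled", "SomeOtherFrame")("JoinedByBusinessRules")
}
```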

And because the lifted functions have the signature DataDictionary => Writer, we can easily compose a computation chain with flatMap, and Writer takes care of combining the logs:

val transformation1=(dictionary:DataDictionary)=>start(dictionary)
  .flatMap(addTimestampPhase)
  .flatMap(addLabelPhase)
  .mapWritten(_ :+ "before business join") 
  .flatMap(businessJoinPhase)

This leaves us with the last missing piece of the puzzle - transformation composition. Below is an example of how we can chain the first transformation with another one.

//transformation2
val importantSelect:DataFrame => DataFrame = _.select("customerId","credit","label","created")
val importantSelectPhase=liftToTransformation(importantSelect)("JoinedByBusinessRules")("BusinessReport")

val transformation2=(dictionary:DataDictionary)=>start(dictionary)
  .flatMap(importantSelectPhase)

val transformationComposed=(dictionary:DataDictionary)=>start(dictionary)
  .flatMap(transformation1)
  .flatMap(transformation2)
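start is not shown in the post. A plausible definition, which is an assumption rather than the author's code, wraps the dictionary in a Writer with an empty log. A whole transformation then has the same shape as a single phase (DataDictionary => Writer), so flatMap chains transformations exactly like phases. In this sketch MiniWriter is a hand-rolled minimal Writer, and phase is a hypothetical helper that copies a value to a new key instead of transforming a DataFrame.

```scala
object ComposeSketch {
  type DataDictionary = Map[String, Any]

  // Hypothetical minimal Writer (not ScalaZ): a log paired with a value.
  final case class MiniWriter[A](written: List[String], value: A) {
    def flatMap[B](f: A => MiniWriter[B]): MiniWriter[B] = {
      val next = f(value)
      MiniWriter(written ++ next.written, next.value)
    }
    def mapWritten(g: List[String] => List[String]): MiniWriter[A] =
      MiniWriter(g(written), value)
  }

  type TransformationPhase = DataDictionary => MiniWriter[DataDictionary]

  // Assumed definition of start: the dictionary paired with an empty log.
  def start(dictionary: DataDictionary): MiniWriter[DataDictionary] =
    MiniWriter(Nil, dictionary)

  // Hypothetical phase: copies the value stored under `from` to `to` and logs it.
  def phase(from: String, to: String): TransformationPhase =
    dict => MiniWriter(List(s"adding $to"), dict + (to -> dict(from)))

  val transformation1: TransformationPhase = { dictionary =>
    start(dictionary)
      .flatMap(phase("InitialFrame", "WithTimeStamp"))
      .flatMap(phase("WithTimeStamp", "Labelled"))
      .mapWritten(_ :+ "before business join")
  }

  val transformation2: TransformationPhase = { dictionary =>
    start(dictionary).flatMap(phase("Labelled", "BusinessReport"))
  }

  // Whole transformations have the same shape as phases, so they chain the same way.
  val transformationComposed: TransformationPhase = { dictionary =>
    start(dictionary)
      .flatMap(transformation1)
      .flatMap(transformation2)
  }
}
```

Because start contributes an empty log, composing the two transformations yields one flat log in execution order, with no extra glue code.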

Full Working Example

As I mentioned before, you can find all the code on GitHub, but for the 99% who will never go there - here is the main function :)

def main(args: Array[String]) {
      val config=new SparkConf().setMaster("local[4]").setAppName("Dataframes transformation with Writer Monad")
      val sc=new SparkContext(config)
      val sqlContext=new SQLContext(sc)
      import sqlContext.implicits._

      println("example start")

      val df1=sc.parallelize(Seq(
        (1,"cust1@gmail.com","Stefan"),
        (2,"cust2@gmail.com","Zdzislawa"),
        (3,"cust3@gmail.com","Bonifacy"),
        (4,"cust4@gmail.com","Bozebozebozenka")
      )).toDF("customerId","email","name")


      val df2=sc.parallelize(Seq(
        (1,10),
        (2,20),
        (3,30),
        (4,40)
      )).toDF("id","credit")

      val dictionary:DataDictionary=Map("InitialFrame" -> df1,"SomeOtherFrame"->df2)
      val (log,resultDictionary)=transformationComposed(dictionary).run
      println("**************LOG*************** : "+log)
      println("**************DICTIONARY********")
      resultDictionary.foreach(println)
      val result=resultDictionary("BusinessReport").asInstanceOf[DataFrame]
      result.show()

    }
And this gives us the following result in the console:
//The log list has every step logged - along with the message added in mapWritten
**************LOG*************** : List(
adding WithTimeStamp -> [customerId: int, email: string, name: string, created: date], 
adding Labelled -> [customerId: int, email: string, name: string, created: date, label: string], 
before business join, 
adding JoinedByBusinessRules -> [customerId: int, email: string, name: string, created: date, label: string, id: int, credit: int], 
adding BusinessReport -> [customerId: int, credit: int, label: string, created: date])


//If you want to analyze this example in detail, you will find a reference to each DataFrame used in the DataDictionary
**************DICTIONARY********
(JoinedByBusinessRules,[customerId: int, email: string, name: string, created: date, label: string, id: int, credit: int])
(WithTimeStamp,[customerId: int, email: string, name: string, created: date])
(BusinessReport,[customerId: int, credit: int, label: string, created: date])
(InitialFrame,[customerId: int, email: string, name: string])
(SomeOtherFrame,[id: int, credit: int])
(Labelled,[customerId: int, email: string, name: string, created: date, label: string])


//And our final result - nicely formatted by Spark - looks as expected
+----------+------+----------+----------+
|customerId|credit|     label|   created|
+----------+------+----------+----------+
|         1|    10|experiment|2015-10-24|
|         2|    20|experiment|2015-10-24|
|         3|    30|experiment|2015-10-24|
|         4|    40|experiment|2015-10-24|
+----------+------+----------+----------+

One more possibility

If you look deeper into the Writer implementation in ScalaZ, you will find a very interesting declaration which looks like this:

  type Writer[W, A] = WriterT[Id, W, A]

What is WriterT? It's a monad transformer, and that's a topic for a different article, but in short WriterT[F, W, A] wraps a value of type F[(W, A)], and Writer is just the special case where F is Id, so the wrapped value is a plain (W, A) tuple. This gives us a way to keep Writer occurrences in our code to a minimum and use a standard tuple instead (maybe for some reason you have such a need, or maybe you are just curious how to do it):

def addToTuple(key:String,value:Any) : Dictionary => (List[String],Dictionary) = { dict =>
  (List(s"adding $key -> $value"),dict + (key -> value)) // Standard Tuple
}

val tupledResult=initWriter
  .flatMapF(addToTuple("one",1))
  .flatMapF(addToTuple("two",true))
  .mapWritten(_ :+ "additional log")
  .flatMapF(addToTuple("three","value"))

val (log,r)=tupledResult.run
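To make the role of Id more concrete, here is a stripped-down WriterT written from scratch. It is not the ScalaZ code: Monad, WriterT and Id are simplified re-implementations, and the log is fixed to List[String] (ScalaZ accepts any Semigroup). What it shows is that WriterT only wraps an F[(log, value)], so with F = Id the wrapped value is a bare tuple, and flatMapF can accept a function returning a plain tuple, as in the addToTuple example above.

```scala
object WriterTSketch {
  type Dictionary = Map[String, Any]
  type Id[X] = X

  // Hypothetical minimal monad type class.
  trait Monad[F[_]] {
    def pure[A](a: A): F[A]
    def flatMap[A, B](fa: F[A])(f: A => F[B]): F[B]
  }

  implicit val idMonad: Monad[Id] = new Monad[Id] {
    def pure[A](a: A): Id[A] = a
    def flatMap[A, B](fa: Id[A])(f: A => Id[B]): Id[B] = f(fa)
  }

  // WriterT wraps an F[(log, value)]; the log type is fixed to List[String] here.
  final case class WriterT[F[_], A](run: F[(List[String], A)]) {
    // Like flatMap, but f returns F[(log, value)] instead of a WriterT.
    def flatMapF[B](f: A => F[(List[String], B)])(implicit F: Monad[F]): WriterT[F, B] =
      WriterT[F, B](F.flatMap(run) { case (w1, a) =>
        F.flatMap(f(a)) { case (w2, b) => F.pure((w1 ++ w2, b)) }
      })

    def mapWritten(g: List[String] => List[String])(implicit F: Monad[F]): WriterT[F, A] =
      WriterT[F, A](F.flatMap(run) { case (w, a) => F.pure((g(w), a)) })
  }

  // With F = Id this is the plain Writer, and F[(W, A)] is just a tuple.
  type Writer[A] = WriterT[Id, A]

  def addToTuple(key: String, value: Any): Dictionary => (List[String], Dictionary) =
    dict => (List(s"adding $key -> $value"), dict + (key -> value))

  val initWriter: Writer[Dictionary] =
    WriterT[Id, Dictionary]((Nil, Map("initial" -> "initialState")))

  val (log, result) = initWriter
    .flatMapF(addToTuple("one", 1))
    .flatMapF(addToTuple("two", true))
    .mapWritten(_ :+ "additional log")
    .flatMapF(addToTuple("three", "value"))
    .run
}
```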

What next?

We can dive deeper into WriterT and other monad transformers to learn what they are, or maybe we can focus on reading DataFrames from the filesystem into a DataDictionary? There is also something else worth checking and quite intriguing - something called Free Monads...

Sunday, 4 October 2015

Spark Dataframes transformations with state Monad

I want to make this post an interesting journey of discovery into how some super interesting functional concepts may solve very practical problems.

The problem may not be common, but it is still practical (I need to find a synonym for "practical" - is "real world" OK?).

  • How can we make a chain of DataFrame transformations in Spark easier to test, easier to maintain and easier to evolve?
I described this problem in my last post, where I also showed some attempts to solve it.

So the general idea is to chain pure functions into one transformation pipeline. A design based on small functions should ease testing and provide more opportunities to compose those small pieces of logic. On the other hand, there was no simple way to compose two transformations, and the feeling of "reinventing the wheel" was difficult to shake off.

To investigate a very promising mechanism from FP - the State Monad - we are going to look at some conceptual examples using ScalaZ. Then we will dive a little into the mechanics shown in https://www.manning.com/books/functional-programming-in-scala. And finally we will take a look at a new library, https://github.com/non/cats, which in my current understanding is supposed to become a more modular version of ScalaZ at some point in the future.

State Monad and Definition of Dictionary

Some theory worth reading:

To recall from the last episode - we want to execute a sequence of transformations of an immutable DataDictionary which contains DataFrames and some configuration parameters. Plus we want to log each step - you know - just in case...

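import scalaz._
import Scalaz._

type DataDictionary=Map[String,Any]
type Dictionary=DataDictionary //shorter alias used in the educational examples below
type Log=String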

So, according to the theory, the State Monad transforms a State into a tuple (NewState,StateResult) --> S => (S,A).
In our case that gives the signature Dictionary => (Dictionary,Log).

Take a look at this piece of code :

def pureTransformation(key: String, value: Any, currentLog: Log):
 (Dictionary) => (Dictionary, Log) = {
  dict =>
    val newDict: Dictionary = dict + (key -> value)
    (newDict, currentLog + s": added $key : $value")
}

This is a trivial method created for educational purposes. We just add a new entry to an existing dictionary and then log the operation. Notice that we already have the proper signature: (Dictionary) => (Dictionary, Log)

The State companion object in ScalaZ has a suitable constructor which takes our transforming function:

def stateTransform(key:String,value:Any,currentLog:Log="") = 
State[Dictionary,Log]{
    pureTransformation(key, value, currentLog)
}

val state=stateTransform("key4",4.0)
//state: scalaz.State[Dictionary,Log] = scalaz.package$State....

And generally that's it :) Now it is important to understand that... nothing has happened yet. We have just created a description of the transformation. To run the transformation itself we need to pass an initial state of the dictionary and trigger the execution.

val initialDictionary:Dictionary=Map()
state.run(initialDictionary)

//res0: (Dictionary, Log) = (Map(key4 -> 4.0),: added key4 : 4.0)
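The claim that building a State runs nothing can be checked with a small experiment. This sketch uses a hand-rolled State with the same (S, A) order as ScalaZ (but not the ScalaZ type; only run is needed here) and a hypothetical executions counter that records when the transformation body actually runs.

```scala
object LazyStateSketch {
  type Dictionary = Map[String, Any]
  type Log = String

  // Hypothetical minimal State in ScalaZ's (S, A) order; only run is needed here.
  final case class State[S, A](run: S => (S, A))

  // Counts how many times a transformation body has actually executed.
  var executions = 0

  def stateTransform(key: String, value: Any, currentLog: Log = ""): State[Dictionary, Log] =
    State[Dictionary, Log] { dict =>
      executions += 1
      (dict + (key -> value), currentLog + s": added $key : $value")
    }
}
```

Creating the State leaves the counter at zero; only calling run with an initial dictionary executes the body and produces the same (Map(key4 -> 4.0), ": added key4 : 4.0") pair as the res0 output above.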

That worked, but now we need to solve our main problem - how to compose multiple transformations.

Compose transformations

I don't want to go deep into for comprehension mechanics, so if someone doesn't know what those arrows in Scala's for mean - some tutorials can be found here --> https://www.google.pl/search?q=scala+for+comprehension

val transformation=for{
  log1<-stateTransform("key4",4.0)
  log2<-stateTransform("key5",true,log1)
  finalLog<-stateTransform("key6","someValue",log2)
} yield finalLog

One awkward thing about using the State Monad for this operation is that everything seems to be "log centered", while the essence is the dictionary manipulation, which is handled by the State internals.

What is the type of transformation ?

transformation: scalaz.IndexedStateT[scalaz.Id.Id,Dictionary,Dictionary,Log]
We really don't want to go there, so in this case it may be a good idea to specify the type we want to work with ourselves (in ScalaZ, State[Dictionary,Log] is an alias for exactly this type):
val transformation : State[Dictionary,Log]=for{ ...

And finally - in overview we are building something like this :

Running this educational example results in:

val dictionary:Dictionary=Map("key1"->1,"key2"->2,"key3"->"value3")
val (finalDictionary,log)=transformation.run(dictionary)
finalDictionary
/*res2: Dictionary = Map(
key4 -> 4.0, 
key5 -> true, 
key1 -> 1, 
key2 -> 2, 
key6 -> someValue, 
key3 -> value3)*/

log
/*res3: Log = : added key4 : 4.0: added key5 : true: added key6 : someValue*/

Analysis in depth

From what I have seen so far, ScalaZ has a very elegant internal design with a lot of code reuse (the "reuse concept" which was always the OOP "holy grail"); however, to fully understand the State implementation you need to grasp the wider picture.

So it may be easier to take a look at the code from https://www.manning.com/books/functional-programming-in-scala. You can find the whole example here: https://github.com/fpinscala/fpinscala/blob/master/answers/src/main/scala/fpinscala/state/State.scala but generally we need to focus on two small pieces, the run function and flatMap:

case class State[S, +A](run: S => (A, S)) {
(...)
def flatMap[B](f: A => State[S, B]): State[S, B] = 
State(s => {
    val (a, s1) = run(s)
    f(a).run(s1)
  })
}

So flatMap just chains a sequence of states together into one function called "run", which is executed when you call run with an initial state. Note that the book's State returns (A, S), the reverse of ScalaZ's (S, A). I hope the rest of the code is understandable - it always frustrates me when authors state that "the rest are just details", but in this case we are just invoking one function and passing its result to another... the rest are just details :)
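Here is a self-contained version of that snippet with map added, so that a for comprehension compiles. It keeps the book's (A, S) order, so the log comes first in the result; addEntry is a hypothetical helper mirroring pureTransformation from earlier in this post.

```scala
object BookStateSketch {
  type Dictionary = Map[String, Any]
  type Log = String

  // Same shape as the book's State: run returns (value, newState).
  final case class State[S, +A](run: S => (A, S)) {
    def flatMap[B](f: A => State[S, B]): State[S, B] =
      State[S, B](s => {
        val (a, s1) = run(s)
        f(a).run(s1)
      })

    // map is needed for the final yield of a for comprehension.
    def map[B](f: A => B): State[S, B] =
      flatMap(a => State[S, B](s => (f(a), s)))
  }

  // Hypothetical helper: add one entry and extend the log.
  def addEntry(key: String, value: Any, currentLog: Log): State[Dictionary, Log] =
    State[Dictionary, Log](dict => (currentLog + s": added $key : $value", dict + (key -> value)))

  val transformation: State[Dictionary, Log] = for {
    log1 <- addEntry("key4", 4.0, "")
    log2 <- addEntry("key5", true, log1)
    finalLog <- addEntry("key6", "someValue", log2)
  } yield finalLog
}
```

The for comprehension desugars to addEntry(...).flatMap(log1 => addEntry(...).flatMap(log2 => addEntry(...).map(finalLog => finalLog))), which is exactly the chaining that flatMap performs.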

Spark Example

If the whole concept is clear enough, let's try to implement it on DataFrames. You can find the whole example at: https://github.com/PawelWlodarski/blog/blob/master/src/main/scala/pl/pawelwlodarski/spark/functionalchain/ChainOnstateMonad.scala

We are going to simulate two transformations :

  1. The first one consists of three separate phases: it adds a couple of columns to the initial frame and then does a simple join
  2. The second one creates a business report with a single select - for simplicity this is a "one phase transformation"

First Transformation

val addTimeStamp: DataFrame => DataFrame = 
  { df =>
    df.withColumn("created",current_date())
  }

val addLabel:String => DataFrame => DataFrame = 
  {label => df =>
    df.withColumn("label",lit(label))
  }

val businessJoin : (String,String) => (DataFrame,DataFrame) => DataFrame = 
  {(column1,column2) => (df1,df2) =>
      df1.join(df2, df1(column1) === df2(column2))
  }

I tried to justify this design in the Previous Post. Generally, here we have primitive, simple, stateless functions which are easy to test and compose.

Lifting to Transformation

Now we need machinery to lift those small functions into the transformation context:

type TransformationPhase=DataDictionary => (DataDictionary,Log)

trait Extractor[A]{
    def extract(dictionary:DataDictionary)(key:String):A
}

implicit object DataFrameExtractor extends Extractor[DataFrame] {
    override def extract(dictionary:DataDictionary)(key: String): DataFrame =
      dictionary(key).asInstanceOf[DataFrame]
}


def liftToTransformation[A:Extractor](f:A=>DataFrame)(key1:String)(resultKey:String)
:String => TransformationPhase = 
{currentLog => dictionary =>
    val param1 =implicitly[Extractor[A]].extract(dictionary)(key1)
    val result=f(param1)
    val log=currentLog + s"\nadding $resultKey -> $result"
    val newDictionary=dictionary + (resultKey -> result)
    (newDictionary,log)
 }

A couple of important notes. First of all, we have separate parameter groups: the first one is the primitive function, and the rest are configuration. Maybe Lenses would be better for getting and setting values in the dictionary, but what we have is good enough for now. Extractors allow us to retrieve any type of value from the dictionary (here only DataFrame, for simplicity). And the last thing - we need to combine logs as well as results, which is why lift became a curried function Log => DataDictionary => (DataDictionary,Log).

Building transformations

val addTimestampPhase=
liftToTransformation(addTimeStamp)("InitialFrame")("WithTimeStamp")
val addLabelPhase=
liftToTransformation(addLabel("experiment"))("WithTimeStamp")("Labelled")
val businessJoinPhase=
liftToTransformation(businessJoin("customerId","id"))("Labelled","SomeOtherFrame")("JoinedByBusinessRules")

val transformation1:Log => State[DataDictionary,Log] =
initialLog => for{
    log1 <- State(addTimestampPhase(initialLog))
    log2 <- State(addLabelPhase(log1))
    log3 <- State(businessJoinPhase(log2))
} yield log3

We build a State for each transformation phase and then compose everything into one transformation. It may be more convenient to move the State(phase) wrapping into the lift method.
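One way to do this is sketched below. Assumptions: State is a hand-rolled minimal version in ScalaZ's (S, A) order (not the ScalaZ type), Frame is a hypothetical stand-in for DataFrame, liftToState is a hypothetical name, and the Extractor is dropped for brevity (the input is cast directly). Because the lift already returns Log => State, the for comprehension no longer needs State(...) around each phase.

```scala
object LiftIntoStateSketch {
  type DataDictionary = Map[String, Any]
  type Log = String

  // Hypothetical stand-in for a DataFrame: column names only.
  final case class Frame(columns: List[String]) {
    def withColumn(name: String): Frame = Frame(columns :+ name)
  }

  // Hypothetical minimal State in ScalaZ's (S, A) order.
  final case class State[S, A](run: S => (S, A)) {
    def flatMap[B](f: A => State[S, B]): State[S, B] =
      State[S, B] { s =>
        val (s1, a) = run(s)
        f(a).run(s1)
      }
    def map[B](f: A => B): State[S, B] =
      State[S, B] { s =>
        val (s1, a) = run(s)
        (s1, f(a))
      }
  }

  // The lift returns Log => State directly instead of a raw function.
  def liftToState(f: Frame => Frame)(key1: String)(resultKey: String): Log => State[DataDictionary, Log] =
    currentLog => State[DataDictionary, Log] { dictionary =>
      val result = f(dictionary(key1).asInstanceOf[Frame])
      (dictionary + (resultKey -> result), currentLog + s"\nadding $resultKey -> $result")
    }

  val addTimestampPhase = liftToState(_.withColumn("created"))("InitialFrame")("WithTimeStamp")
  val addLabelPhase = liftToState(_.withColumn("label"))("WithTimeStamp")("Labelled")

  // No State(...) wrapping inside the for comprehension any more.
  val transformation1: Log => State[DataDictionary, Log] =
    initialLog => for {
      log1 <- addTimestampPhase(initialLog)
      log2 <- addLabelPhase(log1)
    } yield log2
}
```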

And, as mentioned at the beginning, here is another simple transformation:

//transformation2
val importantSelect:DataFrame => DataFrame = _.select("customerId","credit","label","created")
val importantSelectPhase =
    liftToTransformation(importantSelect)("JoinedByBusinessRules")("BusinessReport")

val transformation2:Log => State[DataDictionary,Log]=
    initialLog => State(importantSelectPhase(initialLog))

You may have noticed one interesting thing - a transformation is actually not a State but a function Log => State[DataDictionary,Log]. The reason is that this was the only way I found to pass the log from one transformation to another and combine them. Without logs we could stay with a simple State[DataDictionary,Log].

Composition!

And now the moment everyone was waiting for! Transformation composition!

val transformationComposed:Log => State[DataDictionary,Log]=
initialLog=>for{
    logT1 <- transformation1(initialLog)
    logT2 <- transformation2(logT1)
} yield logT2

Yes, this was it. This was "The Moment". Your life has just changed forever...

The Program

Let's check if what we created actually works.

 def main(args: Array[String]) {
    val config=new SparkConf()
              .setMaster("local[4]")
              .setAppName("Dataframes transformation with State Monad")
    val sc=new SparkContext(config)
    val sqlContext=new SQLContext(sc)
    import sqlContext.implicits._

    println("example start")

    val df1=sc.parallelize(Seq(
      (1,"cust1@gmail.com","Stefan"),
      (2,"cust2@gmail.com","Zdzislawa"),
      (3,"cust3@gmail.com","Bonifacy"),
      (4,"cust4@gmail.com","Bozebozebozenka")
    )).toDF("customerId","email","name")


    val df2=sc.parallelize(Seq(
      (1,10),
      (2,20),
      (3,30),
      (4,40)
    )).toDF("id","credit")

    val dictionary:DataDictionary=Map("InitialFrame" -> df1,"SomeOtherFrame"->df2)

    val (resultDictionary,log)=transformationComposed("").run(dictionary)
    println("**************LOG*************** : "+log)
    println("**************DICTIONARY********")
    resultDictionary.foreach(println)
    val result=resultDictionary("BusinessReport").asInstanceOf[DataFrame]
    result.show()
  }

We have created two DataFrames: the first one represents some users, the second one the users' credits. Then we build the final report.

Results

Log looks good - everything in there.

**************LOG*************** : 
adding WithTimeStamp -> [customerId: int, email: string, name: string, created: date]
adding Labelled -> [customerId: int, email: string, name: string, created: date, label: string]
adding JoinedByBusinessRules -> [customerId: int, email: string, name: string, created: date, label: string, id: int, credit: int]
adding BusinessReport -> [customerId: int, credit: int, label: string, created: date]

Dictionary looks good - everything in there.

**************DICTIONARY********
(JoinedByBusinessRules,[customerId: int, email: string, name: string, created: date, label: string, id: int, credit: int])
(WithTimeStamp,[customerId: int, email: string, name: string, created: date])
(BusinessReport,[customerId: int, credit: int, label: string, created: date])
(InitialFrame,[customerId: int, email: string, name: string])
(SomeOtherFrame,[id: int, credit: int])
(Labelled,[customerId: int, email: string, name: string, created: date, label: string])

Final report looks good - everything in there.

+----------+------+----------+----------+
|customerId|credit|     label|   created|
+----------+------+----------+----------+
|         1|    10|experiment|2015-10-04|
|         2|    20|experiment|2015-10-04|
|         3|    30|experiment|2015-10-04|
|         4|    40|experiment|2015-10-04|
+----------+------+----------+----------+

Cats library

My current experience tells me that when working with Spark it is good to have as few external dependencies as possible. Another library similar to ScalaZ is being created and, from what I understood, it will be more modular. The library is called cats -> https://github.com/non/cats

You can find an educational example similar to the one at the beginning of this post --> Example In Cats

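For now it is difficult for me to say what the difference between those two libraries is, because the example looks similar. One visible difference in this version of cats is that State.run returns a Trampoline, so we need a second .run to get the result: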

import cats.free.Trampoline
import cats.state._
import cats.std.all._

type Dictionary=Map[String,Any]
type Log=String

val dictionary:Dictionary=Map("key1"->1,"key2"->2,"key3"->"value3")

def transform(key:String,value:Any,currentLog:String=""): State[Dictionary,Log] = State[Dictionary,Log]{ dict =>
  val newDict:Dictionary=dict + (key -> value)
  (newDict,currentLog + s": added $key : $value")
}


val state=transform("key4",4.0)

val toRun: Trampoline[(Dictionary, Log)] = state.run(dictionary)
toRun.run

val state2=transform("key5",true)


val transformation=for{
  s1<-transform("key4",4.0,"")
  s2<-transform("key5",5.0,s1)
} yield s2

transformation.run(dictionary).run

state.flatMap(log=>transform("key5",true,log)).run(dictionary).run

What next?

  • Can we make logging easier with the Writer Monad?
  • Is it possible to use Lenses instead of simple Strings to operate on DataDictionary?
  • Scalaz has an interesting construct - Monad.whileM - can we use it to implement some kind of composable loops in our transformations?