Sunday 25 October 2015

Log your Dataframes transformations with Writer Monad

In the last article we tried to use the State Monad to build a computation chain where step N depends on step N-1. We wanted to have a set of small and easily composable pieces of code. To escape from the high level of abstraction and check our design in a more practical environment, we are using Spark DataFrames as an example and practical illustration.

The State Monad represents a function which modifies some state and simultaneously generates a result value from it: S1 => (R,S2). In our case we treated S as a Map with the DataFrames created during the computation. The R from the equation above was just the log generated during calculation of the new state.

The problem with that solution was that we had to pass Strings between transformations to combine the final log. Today we are going to check a different construct - the Writer Monad. According to its description it should handle this "logs problem" for us - let's see if I'm smart enough to use this tool properly ;)
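
To recall what was annoying, here is a rough sketch (not the exact code from the previous article) of what passing those Strings around looked like - every phase returned its own message and we had to glue the messages together by hand:

def phase(key:String,value:Any) : Map[String,Any] => (String,Map[String,Any]) = { dict =>
  (s"adding $key -> $value", dict + (key -> value))
}

val dict0=Map[String,Any]("initial"->"initialState")
val (log1,dict1)=phase("one",1)(dict0)
val (log2,dict2)=phase("two",true)(dict1)
val finalLog=List(log1,log2).mkString("\n") // gluing logs manually - this is what we want to get rid of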

Understanding the Writer Monad

As preparation for our experiments let's define a couple of useful types. The first one is Dictionary, which is a form of a bag for computation results like DataFrames, and the second one, Log, is just a debug message.

import scalaz._
import Scalaz._

type Dictionary=Map[String,Any]
type Log=String

val initialDict:Dictionary=Map("initial"->"initialState")

Now let's create "the subject of our research", the very first instance of the Writer Monad in this blog post:

val initWriter: Writer[List[String], Dictionary] =Writer(List.empty[String],initialDict)

Our business logic at the beginning will be represented by a method which just adds a simple value (a new state) to the previously defined dictionary. Notice that what we are building here is a phase of computation with the signature Dictionary => Dictionary.

def addToDictionary(key:String,value:Any) : Dictionary => Dictionary = { dict =>
  dict + (key -> value)
}

We are going to start our "lab research" by checking the simple map method on Writer - will it be enough?

val result=initWriter
  .map(addToDictionary("one",1))
  .map(addToDictionary("two",true))
  .map(addToDictionary("three","value"))

result.run

//result is :
(List(),Map(initial -> initialState, one -> 1, two -> true, three -> value))

So we are not satisfied, because a simple map does not generate any logs. Well, this is actually not a surprise since we don't generate any logs in our "business logic" either... Let's fix it.

def addToWriter(key:String,value:Any) : Dictionary => Writer[List[String],Dictionary] = { dict =>
  Writer(List(s"adding $key -> $value"),dict + (key -> value))
}


val transformationWriter=initWriter
  .flatMap(addToWriter("one",1))
  .flatMap(addToWriter("two",true))
  .mapWritten(_ :+ "additional log") //<--- haaa this line is interesting!!
  .flatMap(addToWriter("three","value"))

In this case we are building a new Writer instance in each transformation phase. Creating a list with one element may seem a little awkward at first, but in the end all the lists will be combined together into one final log. Also mapWritten is interesting because it gives us the power to add some additional logging without touching "the computation subject".

OK, time to run it!

val (fullLog,fullResult)=transformationWriter.run

fullLog
//res1: List[String] = List(adding one -> 1, adding two -> true, additional log, adding three -> value)

fullResult
//res2: Dictionary = Map(initial -> initialState, one -> 1, two -> true, three -> value)

Looks good! (Really, it looks good - this is what we were actually expecting.) Below you can find an illustration to better understand the Writer mechanism, and now I believe we are ready for the Spark scenario.
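
In code terms, the mechanism boils down to this simplified model (just to show the mechanics - this is not the ScalaZ implementation):

case class MyWriter[A](log: List[String], value: A) {
  def map[B](f: A => B): MyWriter[B] =
    MyWriter(log, f(value))                 // only the value changes - that's why map alone was not enough
  def flatMap[B](f: A => MyWriter[B]): MyWriter[B] = {
    val next = f(value)
    MyWriter(log ++ next.log, next.value)   // logs are concatenated behind the scenes
  }
  def mapWritten(f: List[String] => List[String]): MyWriter[A] =
    MyWriter(f(log), value)                 // touch only the log, leave the value alone
}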

Spark use case

In this section we are going to implement a very simple but fully functional Spark example. The general concept is explained in a different post --> General concept of functional data transformation, and it's about building a transformation pipeline from small and simple functional bricks which are easy to test and compose. You can find the full code on github --> Full code : Dataframes transformations with Writer Monad

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.functions.{current_date, lit}

    type DataDictionary=Map[String,Any]
    type Log=String

    val addTimeStamp: DataFrame => DataFrame =
    { df =>
      df.withColumn("created",current_date())
    }

    val addLabel:String => DataFrame => DataFrame =
    {label => df =>
      df.withColumn("label",lit(label))
    }

    val businessJoin : (String,String) => (DataFrame,DataFrame) => DataFrame =
    {(column1,column2) => (df1,df2) =>
      df1.join(df2, df1(column1) === df2(column2))
    }

The mentioned functional bricks are just simple functions which operate on DataFrames. Now we need to lift them to the level of our computation chain, which uses DataDictionary. This is the moment when, for the first time in our Spark example, we will use the Writer Monad - take a look:

type TransformationPhase=DataDictionary => Writer[List[Log],DataDictionary]

def liftToTransformation[A:Extractor](f:A=>DataFrame)(key1:String)(resultKey:String): TransformationPhase =
    {dictionary =>
      val param1 =implicitly[Extractor[A]].extract(dictionary)(key1)
      val result=f(param1)
      val log= s"\nadding $resultKey -> $result"
      val newDictionary=dictionary + (resultKey -> result)
      Writer(List(log),newDictionary)
    }
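
The Extractor type class (and the two-parameter variant of this lifting, needed for businessJoinPhase below) is not shown in this post - both are part of the full code on github. A minimal sketch consistent with the usage here could look roughly like this; in the real code the two-parameter version is presumably an overload of liftToTransformation:

trait Extractor[A] {
  def extract(dictionary:DataDictionary)(key:String):A
}

implicit val dataFrameExtractor:Extractor[DataFrame]=new Extractor[DataFrame] {
  def extract(dictionary:DataDictionary)(key:String):DataFrame =
    dictionary(key).asInstanceOf[DataFrame]
}

//sketch of a variant for functions taking two DataFrames
def liftToTransformation2[A:Extractor,B:Extractor](f:(A,B)=>DataFrame)(key1:String,key2:String)(resultKey:String): TransformationPhase =
    {dictionary =>
      val param1=implicitly[Extractor[A]].extract(dictionary)(key1)
      val param2=implicitly[Extractor[B]].extract(dictionary)(key2)
      val result=f(param1,param2)
      val log= s"\nadding $resultKey -> $result"
      Writer(List(log),dictionary + (resultKey -> result))
    }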

Having this lifting function we can easily create functions which operate on DataDictionary:

val addTimestampPhase=
      liftToTransformation(addTimeStamp)("InitialFrame")("WithTimeStamp")
val addLabelPhase=
      liftToTransformation(addLabel("experiment"))("WithTimeStamp")("Labelled")
val businessJoinPhase=
      liftToTransformation(businessJoin("customerId","id"))("Labelled","SomeOtherFrame")("JoinedByBusinessRules")

And because the lifted functions have the signature DataDictionary => Writer, we can easily compose the computation chain with flatMap, and Writer will take care of the log composition:

val transformation1=(dictionary:DataDictionary)=>start(dictionary)
  .flatMap(addTimestampPhase)
  .flatMap(addLabelPhase)
  .mapWritten(_ :+ "before business join") 
  .flatMap(businessJoinPhase)
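
The start function used above is not defined in this post - it comes from the full code on github. Most likely it just lifts the initial dictionary into a Writer with an empty log, roughly:

def start(dictionary:DataDictionary): Writer[List[Log],DataDictionary] =
  Writer(List.empty[Log],dictionary)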

This leaves us with the last missing piece of the puzzle - transformation composition. Below we have an example of how we can chain the first transformation with another one.

//transformation2
val importantSelect:DataFrame => DataFrame = _.select("customerId","credit","label","created")
val importantSelectPhase=liftToTransformation(importantSelect)("JoinedByBusinessRules")("BusinessReport")

val transformation2=(dictionary:DataDictionary)=>start(dictionary)
  .flatMap(importantSelectPhase)

val transformationComposed=(dictionary:DataDictionary)=>start(dictionary)
  .flatMap(transformation1)
  .flatMap(transformation2)

Full Working Example

As I mentioned before, you can find all the code on github; however, for the 99% who will never go there - here is the main function :)

def main(args: Array[String]) {
      val config=new SparkConf().setMaster("local[4]").setAppName("Dataframes transformation with Writer Monad")
      val sc=new SparkContext(config)
      val sqlContext=new SQLContext(sc)
      import sqlContext.implicits._

      println("example start")

      val df1=sc.parallelize(Seq(
        (1,"cust1@gmail.com","Stefan"),
        (2,"cust2@gmail.com","Zdzislawa"),
        (3,"cust3@gmail.com","Bonifacy"),
        (4,"cust4@gmail.com","Bozebozebozenka")
      )).toDF("customerId","email","name")


      val df2=sc.parallelize(Seq(
        (1,10),
        (2,20),
        (3,30),
        (4,40)
      )).toDF("id","credit")

      val dictionary:DataDictionary=Map("InitialFrame" -> df1,"SomeOtherFrame"->df2)
      val (log,resultDictionary)=transformationComposed(dictionary).run
      println("**************LOG*************** : "+log)
      println("**************DICTIONARY********")
      resultDictionary.foreach(println)
      val result=resultDictionary("BusinessReport").asInstanceOf[DataFrame]
      result.show()

    }
And this gives us the following result in the console:
//In the log list we have every step logged - along with the message added via mapWritten
**************LOG*************** : List(
adding WithTimeStamp -> [customerId: int, email: string, name: string, created: date], 
adding Labelled -> [customerId: int, email: string, name: string, created: date, label: string], 
before business join, 
adding JoinedByBusinessRules -> [customerId: int, email: string, name: string, created: date, label: string, id: int, credit: int], 
adding BusinessReport -> [customerId: int, credit: int, label: string, created: date])


//If you want to analyze this example in detail, you will find a reference to each DataFrame used in the DataDictionary
**************DICTIONARY********
(JoinedByBusinessRules,[customerId: int, email: string, name: string, created: date, label: string, id: int, credit: int])
(WithTimeStamp,[customerId: int, email: string, name: string, created: date])
(BusinessReport,[customerId: int, credit: int, label: string, created: date])
(InitialFrame,[customerId: int, email: string, name: string])
(SomeOtherFrame,[id: int, credit: int])
(Labelled,[customerId: int, email: string, name: string, created: date, label: string])


//And our final result - nicely formatted by Spark - looks as expected
+----------+------+----------+----------+
|customerId|credit|     label|   created|
+----------+------+----------+----------+
|         1|    10|experiment|2015-10-24|
|         2|    20|experiment|2015-10-24|
|         3|    30|experiment|2015-10-24|
|         4|    40|experiment|2015-10-24|
+----------+------+----------+----------+

One more possibility

If you look deeper into the Writer implementation in ScalaZ you will find a very interesting declaration which looks like this:

  type Writer[W, A] = WriterT[Id, W, A]

What is WriterT? It's a monad transformer and a topic for a different article, but generally it can turn "something else" into a Writer. This interesting functionality gives us the possibility to limit Writer occurrences in our code to a minimum and use a standard tuple (maybe... maybe for some reason you have such a need, or maybe you are just curious how to do it).

def addToTuple(key:String,value:Any) : Dictionary => (List[String],Dictionary) = { dict =>
  (List(s"adding $key -> $value"),dict + (key -> value)) // Standard Tuple
}

val tupledResult=initWriter
  .flatMapF(addToTuple("one",1))
  .flatMapF(addToTuple("two",true))
  .mapWritten(_ :+ "additional log")
  .flatMapF(addToTuple("three","value"))

val (log,r)=tupledResult.run

What next?

We could dive deeper into WriterT and other monad transformers to learn what they are, or maybe we could focus on reading DataFrames from the filesystem into the DataDictionary? There is also something else worth checking and quite intriguing - something called Free Monads...
