The External Operations Pattern

This is the first pattern in the series of Scalding-specific patterns. As I mentioned in the introduction, the pattern presentation will follow this structure:

  • Motivation: where I describe why this pattern might be relevant to you
  • Structure: where I describe how the code is structured
  • Sample code: where I present a very simple solution implementing the pattern
  • Interactions: where I present how this pattern interacts with the others

The External Operations Pattern

The code of the Scalding Job class should not contain complex pipe transformations but should delegate to externally defined functions.

Motivation

There are several reasons to adopt this approach. The first is to follow the KISS principle and reduce the complexity of the code composing the Scalding Job. Extracting the operations into an external module (a Trait or an Object) also helps respect the DRY principle, since common operations can be extracted and reused by different Jobs. This pattern is also very useful to increase the testability of your code and will constitute the basis of the TDD approach described in the next chapter.

Structure

A Scalding Job is created by defining a class that extends com.twitter.scalding.Job. Its primary constructor contains all the logic to interpret the job arguments and process the job input to generate the desired output. According to this pattern, the Job class will contain only the code to:

  • Parse and validate the parameters;
  • Create the Input Sources and Output Sinks;
  • Compose a set of externally available functions (Transformation Functions) that operate on the Sources in order to generate the data to write to the Sinks.

The Transformation Functions are extracted into a module (a Trait or an Object) external to the Job. The same applies to the schema definitions for the Sources, the Sinks and the different Pipes generated by the TFs.
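
As a minimal sketch of this structure, a Job following the pattern could look like the snippet below. The job name and the argument names (input, output, sessionThreshold) are purely illustrative, while eventSchemas, EventSessionStats and IdGenerator refer to the sample definitions introduced later in this chapter.


class SessionStatsJob(args: Args) extends Job(args) {
  import eventSchemas._
  import EventSessionStats._

  // 1. Parse and validate the parameters (argument names are illustrative)
  val inputPath = args.required("input")
  val outputPath = args.required("output")
  val sessionThreshold = args.getOrElse("sessionThreshold", "100").toLong

  val simpleIdGenerator = new IdGenerator[Long] {
    override def first: Long = 0L
    override def next(current: Long): Long = current + 1
  }

  // 2. Create Input Sources and Output Sinks
  val eventsSource = Csv(inputPath, fields = EVENT)
  val statsSink = Csv(outputPath)

  // 3. Compose the externally defined Transformation Functions
  eventsSource.read
    .addTimeSincePreviousUserEvent
    .extractSessions(simpleIdGenerator, sessionThreshold)
    .extractPerSessionStats
    .extractAverageSessionStats
    .write(statsSink)
}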

Sample Code

Let’s consider a simple job responsible for analysing a series of user events related to web activity, in the format (timestamp, userId, action, actionData), and extracting statistics about the number of sessions per user, the average session duration and the average number of actions per session. A session is defined by splitting a user’s actions into clusters of actions that are closer to each other than a given time threshold. A very simple implementation of this logic is the following:


import com.twitter.scalding._

trait IdGenerator[T] {
  type IdType = T

  def first: T
  def next(current: T): T
}

class NoExternalOperationsJob(args: Args) extends Job(args) {

  val events = List(
    (1000, "1", "get", "session1_event1"),
    (1010, "1", "click", "session1_event2"),
    (1020, "1", "click", "session1_event3"),
    (1030, "1", "put", "session1_event4"),

    (1100, "2", "get", "session1_event1"),
    (1110, "2", "click", "session1_event2"),
    (1160, "2", "put", "session1_event3"),

    (1200, "1", "get", "session2_event1"),
    (1210, "1", "click", "session2_event2"),
    (1260, "1", "put", "session2_event3")
  )

  val simpleIdGenerator =  new IdGenerator[Long] {
    override def first: Long = 0l

    override def next(current: Long): Long = current + 1
  }

  val SESSION_THRESHOLD = 100l

  // Actions: get, click, put
  IterableSource(events, List('timestamp, 'userId, 'action, 'actionData)).read
    .insert('tempPreviousEventTime, 0l)
    .groupBy('userId) { group =>
      group
        .sortBy('timestamp)
        .scanLeft( ('timestamp,'tempPreviousEventTime) -> ('tempTimestamp, 'timeSincePrevious) )( (0l, 0l) ) { (accumulator: (Long, Long), current: (Long, Long)) =>
          val previousEventTimestamp = accumulator._1
          val currentEventTimestamp = current._1

          val duration = if(previousEventTimestamp != 0l) (currentEventTimestamp - previousEventTimestamp) else -1l

          (currentEventTimestamp, duration)
        }
    }
    .discard('tempPreviousEventTime)
    .discard('tempTimestamp)
    .filter('timestamp) { timestamp: Long => timestamp > 0 }
    .groupBy('userId) { group =>
        group
          .sortBy('timestamp)
          .scanLeft('timeSincePrevious -> 'sessionId) (simpleIdGenerator.first) { (currSessionId: simpleIdGenerator.IdType, timeSincePrevious: Long) =>
            if(timeSincePrevious > SESSION_THRESHOLD)
              simpleIdGenerator.next(currSessionId)
            else
              currSessionId
          }
    }
    .filter('timestamp) { timestamp: Long => timestamp > 0 }
    .discard('timeSincePrevious)
    .debug
    .groupBy('userId, 'sessionId) { group =>
      group
        .sortBy('timestamp)
        .size('actionsPerSession)
        .toList[Long]('timestamp -> 'sessionTimes)
    }
    .map('sessionTimes -> 'sessionLength) { sessionTimes: List[Long] =>
      sessionTimes.last - sessionTimes.head
    }
    .discard('sessionTimes)
    .debug
    .groupBy('userId) { group =>
      group
        .size('sessionsPerUser)
        .average('sessionLength -> 'avgSessionLength)
        .average('actionsPerSession -> 'avgActionsPerSession)
    }
    .debug
    .write(Csv("output.csv"))
}

 

The code shown above does the job, but it is quite difficult to read and does not allow testing each step in isolation. I added some debug calls at relevant points to show how the data is transformed along the pipeline.

Using the External Operations Pattern the job code will appear as follows:


class ExternalOperationsJob(args: Args) extends Job(args) {
  import eventSchemas._
  import EventSessionStats._

  val events = List(
    (1000, "1", "get", "session1_event1"),
    (1010, "1", "click", "session1_event2"),
    (1020, "1", "click", "session1_event3"),
    (1030, "1", "put", "session1_event4"),

    (1100, "2", "get", "session1_event1"),
    (1110, "2", "click", "session1_event2"),
    (1160, "2", "put", "session1_event3"),

    (1200, "1", "get", "session2_event1"),
    (1210, "1", "click", "session2_event2"),
    (1260, "1", "put", "session2_event3")
  )

  val simpleIdGenerator = new IdGenerator[Long] {
    override def first: Long = 0l

    override def next(current: Long): Long = current + 1
  }

  val SESSION_THRESHOLD = 100l

  // Actions: get, click, put
  IterableSource(events, EVENT)
    .read
    .addTimeSincePreviousUserEvent
    .extractSessions(simpleIdGenerator, SESSION_THRESHOLD)
    .extractPerSessionStats
    .extractAverageSessionStats
    .write(Csv("output.csv"))
}

The schemas describing how the data changes through the pipeline are specified in a separate object, eventSchemas. This obviously applies to the Fields API; an example with the Typed API will be given in a different post.

object eventSchemas {
  val EVENT = List('timestamp, 'userId, 'action, 'actionData)
  val EVENT_WITH_TIMESPAN = EVENT ::: List('timeSincePrevious)
  val EVENT_IN_SESSION = List('userId, 'timestamp, 'sessionId)
  val SESSION_STAT = List('userId, 'sessionDuration, 'actionsPerSession)
  val SESSION_AVERAGE_STAT = List('userId, 'avgSessionDuration, 'avgActionsPerSession)
}

The Transformation Functions are all contained in a trait, EventSessionStats, that defines all the transformations used by the job. The trait defines an abstract member pipe that refers to the Pipe the operations will be applied to. All operations use this pipe as their source and can accept other pipes or other parameters. Extracting the operations into a trait allows us to reuse them in a different project by simply extending the trait (see the code in the book examples for a concrete implementation, and the sketch after the trait definition below). It also allows us to test them independently using the BddDsl trait available in the Scalding framework. I will write a specific post on the matter, but you can check the unit tests on the trait in the Scalding source or the documentation of the ScaldingUnit framework, the original project the trait is based on.


trait EventSessionStats {
  import Dsl._

  def pipe: Pipe

  /**
   * Calculates the time passed between every user event and the previous one and adds it to the field timeSincePrevious.
   * Inserts -1 for the first event.
   *
   * INPUT_SCHEMA: EVENT
   * OUTPUT_SCHEMA: EVENT_WITH_TIMESPAN
   */
  def addTimeSincePreviousUserEvent: Pipe =
    pipe
      .insert('tempPreviousEventTime, 0l)
      .groupBy('userId) { group =>
        group
          .sortBy('timestamp)
          .scanLeft(('timestamp, 'tempPreviousEventTime) -> ('tempTimestamp, 'timeSincePrevious))((0l, 0l)) { (accumulator: (Long, Long), current: (Long, Long)) =>
            val previousEventTimestamp = accumulator._1
            val currentEventTimestamp = current._1

            val duration = if (previousEventTimestamp != 0l) (currentEventTimestamp - previousEventTimestamp) else -1l

            (currentEventTimestamp, duration)
          }
      }
      .filter('timestamp) { timestamp: Long => timestamp > 0 }
      .discard('tempPreviousEventTime)
      .discard('tempTimestamp)

  /**
   * Extracts sessions from events enriched with the time since the previous event.
   * A new session starts every time an event is farther than sessionThreshold from the previous one.
   * The sessionId is calculated by the provided idGenerator.
   *
   * INPUT_SCHEMA: EVENT_WITH_TIMESPAN
   * OUTPUT_SCHEMA: EVENT_IN_SESSION
   */
  def extractSessions[T](idGenerator: IdGenerator[T], sessionThreshold: Long): Pipe =
    pipe
      .groupBy('userId) { group =>
        group
          .sortBy('timestamp)
          .scanLeft('timeSincePrevious -> 'sessionId)(idGenerator.first) { (currSessionId: idGenerator.IdType, timeSincePrevious: Long) =>
            if (timeSincePrevious > sessionThreshold)
              idGenerator.next(currSessionId)
            else
              currSessionId
          }
      }
      .filter('timestamp) { timestamp: Long => timestamp > 0 }
      .discard('timeSincePrevious)

  /**
   * Extracts, for every session:
   *
   * - the session duration
   * - the number of actions per session
   *
   * INPUT_SCHEMA: EVENT_IN_SESSION
   * OUTPUT_SCHEMA: SESSION_STAT
   */
  def extractPerSessionStats: Pipe =
    pipe
      .groupBy('userId, 'sessionId) { group =>
        group
          .sortBy('timestamp)
          .size('actionsPerSession)
          .toList[Long]('timestamp -> 'sessionTimes)
      }
      .map('sessionTimes -> 'sessionDuration) { sessionTimes: List[Long] =>
        sessionTimes.last - sessionTimes.head
      }
      .discard('sessionTimes)

  /**
   * Calculates the average values of the per-session stats.
   *
   * INPUT_SCHEMA: SESSION_STAT
   * OUTPUT_SCHEMA: SESSION_AVERAGE_STAT
   */
  def extractAverageSessionStats: Pipe =
    pipe
      .groupBy('userId) { group =>
        group
          .size('sessionsPerUser)
          .average('sessionDuration -> 'avgSessionDuration)
          .average('actionsPerSession -> 'avgActionsPerSession)
      }
}
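
As a quick, hypothetical sketch of the reuse mentioned above (the trait ExtendedEventStats and the extractLongSessionStats transformation are illustrative and not part of the book examples), another project could build on these transformations by simply extending the trait:


trait ExtendedEventStats extends EventSessionStats {
  import Dsl._

  /**
   * Hypothetical additional transformation built on top of the shared ones:
   * keeps only the sessions lasting at least minDuration.
   *
   * INPUT_SCHEMA: EVENT_IN_SESSION
   * OUTPUT_SCHEMA: SESSION_STAT
   */
  def extractLongSessionStats(minDuration: Long): Pipe =
    extractPerSessionStats
      .filter('sessionDuration) { duration: Long => duration >= minDuration }
}

A wrapper object analogous to the EventSessionStats object shown below would then expose the new transformation as an extension method as well.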

The trait is made available to your Scalding jobs via the object EventSessionStats, whose content has to be imported in the job. Its purpose is to make the functions defined in EventSessionStats available as Extension Methods on the Pipe and RichPipe classes. The object contains:

  • A class EventSessionStatsWrapper implementing the EventSessionStats trait and extending Serializable. The class needs to be serializable because it might be transferred to different nodes of the cluster during the job execution;
  • An implicit conversion from Pipe to EventSessionStatsWrapper. Note that no conversion has to be written by hand: since the class is declared as implicit (a feature available since Scala 2.10), Scala generates the conversion automatically. Implicit conversions are a standard Scala mechanism used to provide Extension Methods (see //docs.scala-lang.org/overviews/core/value-classes.html#extension_methods) and allow the new transformations to be used as if they were part of the Pipe or RichPipe interface, thus enabling the code to be written in the standard Scalding style.

object EventSessionStats {
  implicit class EventSessionStatsWrapper(val pipe: Pipe) extends Serializable with EventSessionStats
}
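
To make the mechanism explicit, the implicit class above roughly expands to a wrapper class plus a hand-written implicit conversion, as in the following sketch (the object and method names are illustrative, pre-2.10 style):

object EventSessionStatsManual {
  // What the implicit class desugars to, approximately: a plain wrapper class...
  class EventSessionStatsWrapper(val pipe: Pipe) extends Serializable with EventSessionStats

  // ...plus an implicit conversion that lets any Pipe be treated as the wrapper,
  // so pipe.addTimeSincePreviousUserEvent resolves as an extension method.
  implicit def pipeToEventSessionStats(pipe: Pipe): EventSessionStatsWrapper =
    new EventSessionStatsWrapper(pipe)
}

With either form in scope, calling addTimeSincePreviousUserEvent on a plain Pipe compiles because the compiler applies the conversion automatically.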

Interactions

This pattern is the basis of the Dependency Injection pattern that will be presented in the next post. In particular, the Dependency Injection pattern can help us handle the two parameters used in the definition of the user session: the session id generator and the session threshold.

 
