Applying TDD to Scalding Development

In this post I would like to show a structured approach that enables a proper TDD process during the development of Scalding jobs. The post contains a lot of code so, for the more impatient, let's state clearly where we are heading. The goal of the testing approach I'm describing is to let you test a subset of the transformations composing your Scalding job in isolation, without too much boilerplate, by simply specifying the input conditions and the expected output. We also need to verify that the composition of all these operations in the final Scalding job operates as required.

That means that our tests should operate at two levels:

  • Unit Tests: operating on a relatively small set of operations
  • Integration Tests: operating at the job level

In order to test at both levels we need to structure our code in a way that allows us to extract a series of operations into a block that can be tested (i.e. has an expressible meaning), and we need some framework support to test both these operations in isolation and the final job as a whole.

The way to organise our code in order to extract meaningful blocks of operations to be tested is the External Operations Pattern I discussed in one of my previous posts. I won't describe it again here, so please have a look at that article for more details. It provides the scaffolding to support unit testing of our code, while the framework support to write unit and integration tests is provided by Scalding via two main classes/traits:

  • The BddDsl trait supports testing a single operation or a set of operations using a very compact BDD-like DSL
  • The JobTest class supports integration testing of your complete job

The Code We Are Testing

First, let's introduce the code of the job we want to test. The program we want to develop is responsible for calculating statistics on user sessions (average session duration and average number of actions per session, per user) on the basis of a series of session event logs, each containing a timestamp, a user ID, an action description and some action-specific data.

The object eventSchemas defines the shape of the data we are processing, from ingestion (EVENT) to output (SESSION_AVERAGE_STAT):

object eventSchemas {
 val EVENT = List('timestamp, 'userId, 'action, 'actionData)
 val EVENT_WITH_TIMESPAN = EVENT ::: List('timeSincePrevious)
 val EVENT_IN_SESSION = EVENT ::: List('sessionId)
 val SESSION_STAT = List('userId, 'sessionDuration, 'actionsPerSession)
 val SESSION_AVERAGE_STAT = List('userId, 'sessionsPerUser, 'avgSessionDuration, 'avgActionsPerSession)
}

The job code (already refactored by applying the External Operations Pattern) is:

import com.twitter.scalding._

trait IdGenerator[T] {
  type IdType = T

  def first: T
  def next(current: T): T
}

class ExternalOperationsJob(args: Args) extends Job(args) {
 import eventSchemas._
 import EventSessionStats._

 val simpleIdGenerator = new IdGenerator[Long] {
    override def first: Long = 0l

    override def next(current: Long): Long = current + 1
 }

 val SESSION_THRESHOLD = 100l

 // Actions: enter, click, drag
 Tsv(args("event.source.path"), EVENT)
    .read
    .addTimeSincePreviousUserEvent
    .extractSessions(simpleIdGenerator, SESSION_THRESHOLD)
    .extractPerSessionStats
    .extractAverageSessionStats
    .write(Csv(args("output.path")))
}

The macro operations addTimeSincePreviousUserEvent, extractSessions, extractPerSessionStats and extractAverageSessionStats are defined in a separate trait:

import cascading.pipe.Pipe
import com.twitter.scalding.Dsl._

trait EventSessionStats {

 def pipe: Pipe

 /**
 * Calculates the time elapsed between every user event and the previous one and adds it to the timeSincePrevious field
 * Inserts -1 for the first event of each user
 *
 * INPUT_SCHEMA: EVENT
 * OUTPUT_SCHEMA: EVENT_WITH_TIMESPAN
 */
 def addTimeSincePreviousUserEvent : Pipe =
 pipe
 .insert('tempPreviousEventTime, 0l)
 .groupBy('userId) { group =>
    group
    .sortBy('timestamp)
    .scanLeft( ('timestamp,'tempPreviousEventTime) -> ('tempTimestamp, 'timeSincePrevious) )( (0l, 0l) ) { (accumulator: (Long, Long), current: (Long, Long)) =>
      val previousEventTimestamp = accumulator._1
      val currentEventTimestamp = current._1

      val duration = if(previousEventTimestamp != 0l) (currentEventTimestamp - previousEventTimestamp) else -1l

      (currentEventTimestamp, duration)
    }
 }
 .filter('timestamp) { timestamp: Long => timestamp > 0 }
 .discard('tempPreviousEventTime)
 .discard('tempTimestamp)

 /**
 * Extracts sessions from events enriched with the time-since-previous info
 * A new session starts every time an event is farther than sessionThreshold from the previous one
 * The session ID is generated by the provided idGenerator
 *
 * INPUT_SCHEMA: EVENT_WITH_TIMESPAN
 * OUTPUT_SCHEMA: EVENT_IN_SESSION
 */
 def extractSessions[T](idGenerator: IdGenerator[T], sessionThreshold: Long): Pipe =
 pipe
 .groupBy('userId) { group =>
   group
    .sortBy('timestamp)
    .scanLeft('timeSincePrevious -> 'sessionId) (idGenerator.first) { (currSessionId: idGenerator.IdType, timeSincePrevious: Long) =>
      if(timeSincePrevious > sessionThreshold)
        idGenerator.next(currSessionId)
      else
        currSessionId
    }
 }
 .filter('timestamp) { timestamp: Long => timestamp > 0 }
 .discard('timeSincePrevious)

 /**
 * Extracts for every session:
 *
 * - Session Duration
 * - Number of actions per session
 *
 * INPUT_SCHEMA: EVENT_IN_SESSION
 * OUTPUT_SCHEMA: SESSION_STAT
 */
 def extractPerSessionStats: Pipe =
 pipe
 .groupBy('userId, 'sessionId) { group =>
   group
   .sortBy('timestamp)
   .size('actionsPerSession)
   .toList[Long]('timestamp -> 'sessionTimes)
 }
 .map('sessionTimes -> 'sessionDuration) { sessionTimes: List[Long] =>
   sessionTimes.last - sessionTimes.head
 }
 .discard('sessionTimes)

 /**
 * Calculates the average values of the per-session stats
 *
 * INPUT_SCHEMA: SESSION_STAT
 * OUTPUT_SCHEMA: SESSION_AVERAGE_STAT
 */
 def extractAverageSessionStats: Pipe =
 pipe
 .groupBy('userId) { group =>
   group
    .size('sessionsPerUser)
    .average('sessionDuration -> 'avgSessionDuration)
    .average('actionsPerSession -> 'avgActionsPerSession )
 }
}

The trait is made available for use with the fluent notation through the Scala extension method technique (an implicit wrapper class):

object EventSessionStats {
 implicit class EventSessionStatsWrapper(val pipe: Pipe) extends AnyRef with EventSessionStats
}
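
As a quick illustration of how this works (rawEvents below is a hypothetical Pipe, not something defined in the job), importing EventSessionStats._ brings the implicit wrapper into scope, so any cascading Pipe picks up the trait's methods and the fluent chain used in the job compiles:

import cascading.pipe.Pipe
import EventSessionStats._

// `rawEvents` is any Pipe already in scope; when the compiler sees
// addTimeSincePreviousUserEvent it wraps it in EventSessionStatsWrapper
def withTimespans(rawEvents: Pipe): Pipe =
  rawEvents.addTimeSincePreviousUserEvent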

Unit Testing our Code: BddDsl

In the code presented above we see four main steps that can be tested in isolation: addTimeSincePreviousUserEvent, extractSessions, extractPerSessionStats and extractAverageSessionStats. I'm going to present just some simple tests on the extractSessions function, since that is possibly the most complex one in the job. A wider description of the capabilities and syntax of the DSL can be found in the Scalding GitHub repository.

The code below shows part of the tests we should write for our function, ideally while we are writing it, to guide us towards the correct logic. Many tests are missing (such as those covering the usage of the IdGenerator), but the code should be enough to give you an idea.

import cascading.pipe.Pipe
import com.twitter.scalding.bdd.BddDsl
import org.scalatest.{FlatSpec, Matchers}

import scala.collection.mutable

class ExternalOperationsSpec extends FlatSpec with Matchers with BddDsl {
  import eventSchemas._
  import EventSessionStats._

  val simpleIdGenerator = new IdGenerator[Long] {
    override def first: Long = 0l

    override def next(current: Long): Long = current + 1
  }

  behavior of "extractSessions"

  it should "group operations closer than threshold in the same session" in {
    Given {
      List(
        (1000, "user1", "action1", "data", -1),
        (1100, "user1", "action2", "data", 100),
        (1300, "user1", "action3", "data", 200)
      ) withSchema EVENT_WITH_TIMESPAN
    } When {
      pipe: Pipe => pipe.extractSessions(simpleIdGenerator, 500l)
    } Then {
      buffer: mutable.Buffer[(Long, String, String, String, Long)] =>
        buffer.toList should equal (
          List(
            (1000, "user1", "action1", "data", 0l),
            (1100, "user1", "action2", "data", 0l),
            (1300, "user1", "action3", "data", 0l)
          )
        )
    }
  }

  it should "split operations farther than threshold in different sessions" in {
    Given {
      List(
        (1000, "user1", "action1", "data", -1),
        (1100, "user1", "action2", "data", 100),
        (1700, "user1", "action3", "data", 600)
      ) withSchema EVENT_WITH_TIMESPAN
    } When {
      pipe: Pipe => pipe.extractSessions(simpleIdGenerator, 500l)
    } Then {
      buffer: mutable.Buffer[(Long, String, String, String, Long)] =>
        buffer.toList should equal (
          List(
            (1000, "user1", "action1", "data", 0l),
            (1100, "user1", "action2", "data", 0l),
            (1700, "user1", "action3", "data", 1l)
          )
        )
    }
  }

  it should "split operations for different users in different sessions" in {
    Given {
      List(
        (1000, "user1", "action1", "data", -1),
        (1100, "user1", "action2", "data", 100),
        (1700, "user1", "action3", "data", 600),
        (1700, "user2", "action4", "data", -1)
      ) withSchema EVENT_WITH_TIMESPAN
    } When {
      pipe: Pipe => pipe.extractSessions(simpleIdGenerator, 500l)
    } Then {
      buffer: mutable.Buffer[(Long, String, String, String, Long)] =>
        buffer.toList should equal (
          List(
            (1000, "user1", "action1", "data", 0l),
            (1100, "user1", "action2", "data", 0l),
            (1700, "user1", "action3", "data", 1l),
            (1700, "user2", "action4", "data", 0l)
          )
        )
    }
  }
}

This kind of test can also be written by assembling a set of operations to see how they interact, giving a sort of limited-scope integration test, as in the sketch below.
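
As an example, here is a minimal sketch of such a test (reusing the schemas, the simpleIdGenerator and the threshold value from the specs above; the expected output follows from the unit-tested behaviour of the two operations) that chains addTimeSincePreviousUserEvent and extractSessions and checks that they compose correctly:

  it should "derive sessions directly from raw events when the two operations are chained" in {
    Given {
      List(
        (1000, "user1", "action1", "data"),
        (1100, "user1", "action2", "data"),
        (1700, "user1", "action3", "data")
      ) withSchema EVENT
    } When {
      pipe: Pipe =>
        pipe
          .addTimeSincePreviousUserEvent
          .extractSessions(simpleIdGenerator, 500l)
    } Then {
      buffer: mutable.Buffer[(Long, String, String, String, Long)] =>
        buffer.toList should equal (
          List(
            (1000, "user1", "action1", "data", 0l),
            (1100, "user1", "action2", "data", 0l),
            (1700, "user1", "action3", "data", 1l)
          )
        )
    }
  }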

The BddDsl trait has been available in Scalding since version 0.9.1. If you need to use version 0.8.x you can use the open source project ScaldingUnit, which was the original implementation and was later merged into Scalding.

Basic Job-Level Test Support in Scalding: JobTest

The com.twitter.scalding.JobTest class gives you a black-box view of your job, allowing you to test it by providing the input data programmatically and making assertions on the generated data. All of this happens without being involved with the format of the data itself, focusing only on its content. It also allows you to validate how the job arguments are handled. Here is a simple end-to-end test for our session statistics job:

import com.twitter.scalding._
import org.scalatest.{FlatSpec, Matchers}

import scala.collection.mutable

class ExternalOperationsE2ESpec extends FlatSpec with Matchers {
  import eventSchemas._
  import Dsl._

  behavior of "Average Session Stats Calculation"

  it should "Extract average session stats from user access events" in {
      val events = List(
        (1000, "1", "get", "session1_event1"),
        (1010, "1", "click", "session1_event2"),
        (1020, "1", "click", "session1_event3"),
        (1030, "1", "put", "session1_event4"),

        (1100, "2", "get", "session1_event1"),
        (1110, "2", "click", "session1_event2"),
        (1160, "2", "put", "session1_event3"),

        (1200, "1", "get", "session2_event1"),
        (1210, "1", "click", "session2_event2"),
        (1260, "1", "put", "session2_event3")
      )


    JobTest(classOf[ExternalOperationsJob].getName)
      .arg("event.source.path", "eventsIn")
      .arg("output.path", "statsOut")
      .source(Tsv("eventsIn", EVENT), events)
      .sink(Csv("statsOut")) { buffer: mutable.Buffer[(String, Long, Double, Double)] =>
        buffer.toList.sortBy(_._1) should equal (
            List(
              ("1", 2, 45.0, 3.5),
              ("2", 1, 60.0, 3.0)
            )
        )
      }
      .run
  }
}
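
A quick sanity check of the expected figures: with the session threshold of 100 defined in the job, user "1" generates two sessions (four actions between timestamps 1000 and 1030, duration 30; three actions between 1200 and 1260, duration 60), which gives 2 sessions, an average duration of 45.0 and 3.5 actions per session. User "2" generates a single session of three actions between 1100 and 1160, which gives 1 session, 60.0 and 3.0.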

 

Mixing it All Together: TDD with Scalding

Having provided all the ingredients to cook your Scalding job's tests, just a few words on how to mix them. The approach is the classical TDD one, so I won't spend much time on it. I would just like to describe how I try to reach the best code and functional coverage while minimising the testing effort.

The approach I usually follow is to:

  • Identify the macro operations and split the code using the External Operations Pattern
  • Develop the external operations guided by unit tests written using the BddDsl trait. These tests:
    • Cover the transformation specification as thoroughly as possible, with good descriptions and validation of all the corner cases
    • Ignore fields that are not necessary for the specific computation. This is not shown in my examples but, if your data structure is quite wide, you can simplify your tests and improve their readability by exploiting the fact that, with the Fields API, fields that are not explicitly referenced are just passed through your operations. That allows you to:
      • Test your function with a minimal schema, i.e. only the fields that are explicitly referenced (see the sketch after this list)
      • Add some tests to validate that the extra fields are not swallowed during the processing (as may happen with group operations) and do not clash with temporary fields or with fields introduced by a join
  • Write some reduced-scope integration tests to verify that a specific subset of operations works well together. This is usually necessary to verify that the output schema generated by one operation is consistent with the input schema required by the following one, and it protects the consistency of the code during maintenance and refactoring, since the Fields API doesn't catch these mismatches at compile time
  • Do integration tests using the JobTest class to validate:
    • The end-to-end logic, focusing mostly on the main cases, since the corner cases should already be covered by the unit tests above
    • The argument management
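
To make the minimal-schema idea concrete, here is a sketch of such a test for extractSessions, written next to the other tests in ExternalOperationsSpec. The reduced schema name and the exact output field order are my assumptions rather than part of the original code; the behaviour relied on is only the field pass-through described above.

  // Hypothetical reduced schema: only the fields extractSessions explicitly references
  val MINIMAL_EVENT_WITH_TIMESPAN = List('timestamp, 'userId, 'timeSincePrevious)

  it should "extract sessions when fed only the fields it explicitly references" in {
    Given {
      List(
        (1000, "user1", -1),
        (1100, "user1", 100),
        (1700, "user1", 600)
      ) withSchema MINIMAL_EVENT_WITH_TIMESPAN
    } When {
      pipe: Pipe => pipe.extractSessions(simpleIdGenerator, 500l)
    } Then {
      // Assumes 'sessionId is appended after the surviving input fields, as in the tests above
      buffer: mutable.Buffer[(Long, String, Long)] =>
        buffer.toList should equal (
          List(
            (1000, "user1", 0l),
            (1100, "user1", 0l),
            (1700, "user1", 1l)
          )
        )
    }
  }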