For the past 15 months, I have been working on a new library on and off. So far, I have been mostly silent about it, because I didn't feel it was ready for a wider audience, even though we had been using it successfully in production for a while. However, I already broke my silence back in April, when I gave a talk about it at this year's ScalarConf in Warsaw, so a blog post explaining what this library does and why I set out to write it in the first place is overdue.

Last year, I was involved in a project that required my team to implement a few Spark applications. For most of them, the business logic was rather complex, so we tried to implement this business logic in a test-driven way, using property-based tests.

The pain of unit-testing Spark applications

At first glance, it looks like this is a great match. When it comes down to it, a Spark application consists of IO stages (reading from and writing to data sources) and transformations of data sets. The latter constitute our business logic and are relatively easy to separate from the IO parts. They are mostly built from pure functions. Functions like these are usually a perfect fit for test-driven development as well as for property-based testing.

However, all was not great. It may be old news to you if you have been working with Apache Spark for a while, but it turns out that Spark does not support writing real unit tests very well, and as a result, it can be quite painful. The thing is that in order to create an RDD, we always need a SparkContext, and the most lightweight mechanism for getting one is to create a local SparkContext. Creating a local SparkContext means starting up a server, which takes a few seconds, and testing our properties against lots of different generated input data then takes a really long time. We certainly lose the fast feedback loop we are used to from developing web applications, for example.

Abstracting over RDDs with kontextfrei

Now, we could confine ourselves to only unit-testing the functions that we pass to RDD operators, so that our unit tests do not have any dependency on Spark and can be verified as quickly as we are used to. However, this leaves quite a lot of business logic uncovered. Instead, at a Scala hackathon last May, I started to experiment with the idea of abstracting over Spark's RDD, and kontextfrei was born.
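To make the first option concrete, here is a minimal sketch of what testing only the functions passed to RDD operators could look like. The LineFunctions object and its method names are made up for this illustration and are not part of Spark or kontextfrei.

```scala
// Hypothetical helpers, invented for illustration only.
object LineFunctions {

  // Splits a line on single spaces and drops empty tokens.
  def words(line: String): List[String] =
    line.split(" ").toList.filter(_.nonEmpty)

  // Pairs a word with an initial count of one.
  def toPair(word: String): (String, Long) = (word, 1L)
}

// In a Spark job these would be passed to the RDD operators, e.g.
//   rdd.flatMap(LineFunctions.words).map(LineFunctions.toPair)
// but they can be checked without any SparkContext.
```

Helpers like these are quick to test, but the way they are chained together with operators such as reduceByKey and sortBy stays untested, which is exactly the gap described above.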

The idea is the following: By abstracting over RDD, we can write business logic that has no dependency on the RDD type. This means that we can also write test properties that are Spark-agnostic. Any Spark-agnostic code like this can either be executed on an RDD (which you would do in your actual Spark application and in your integration tests), or on a local and fast Scala collection (which is really great for unit tests that you continuously run locally during development).
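The general technique can be sketched in plain Scala. Note that CollectionOps below is a hypothetical, drastically reduced typeclass written for this illustration. It is not kontextfrei's actual DCollectionOps, and the List instance merely stands in for "a local and fast Scala collection".

```scala
import scala.language.higherKinds

object AbstractionSketch {

  // Hypothetical mini typeclass describing the operations we rely on.
  trait CollectionOps[F[_]] {
    def map[A, B](fa: F[A])(f: A => B): F[B]
    def flatMap[A, B](fa: F[A])(f: A => List[B]): F[B]
  }

  object CollectionOps {
    // Instance for a plain Scala collection, suitable for fast unit tests.
    implicit val listOps: CollectionOps[List] = new CollectionOps[List] {
      def map[A, B](fa: List[A])(f: A => B): List[B] = fa.map(f)
      def flatMap[A, B](fa: List[A])(f: A => List[B]): List[B] = fa.flatMap(f)
    }
  }

  // Business logic written once, for any F that has a CollectionOps instance.
  def wordLengths[F[_]](lines: F[String])(implicit ops: CollectionOps[F]): F[Int] =
    ops.map(ops.flatMap(lines)(_.split(" ").toList))(_.length)
}
```

An instance for RDD would delegate to the corresponding RDD methods in the same way, so wordLengths itself never needs to mention Spark.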

Obtaining the library

It's probably easier to show how this works than to describe it with words alone, so let's look at a really minimalistic example, the traditional word count. First, we need to add the necessary dependencies to our SBT build file. Kontextfrei consists of two different modules, kontextfrei-core and kontextfrei-scalatest. The former is what you need to abstract over RDD in your main code base, the latter gives you some additional support for writing your RDD-independent tests using ScalaTest with ScalaCheck. Let's add them to our build.sbt file, together with the usual Spark dependency you would need anyway:

resolvers += "dwestheide" at "https://dl.bintray.com/dwestheide/maven"
libraryDependencies += "com.danielwestheide" %% "kontextfrei-core-spark-2.2.0" % "0.6.0"
libraryDependencies += "com.danielwestheide" %% "kontextfrei-scalatest-spark-2.2.0" % "0.6.0" % "test,it"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.2.0"

Please note that in this simple example, we create a Spark application that you can execute in a self-contained way. In the real world, you would add spark-core as a provided dependency and create an assembly JAR that you pass to spark-submit.
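For reference, a real-world variant of the build could look roughly like the following sketch. It assumes sbt's built-in IntegrationTest configuration, which is what the "it" scope in the dependency above refers to, and it marks spark-core as provided. Building the assembly JAR would additionally require a plugin such as sbt-assembly, which is not shown here.

```scala
// build.sbt (sketch, not taken from the kontextfrei documentation)
lazy val root = (project in file("."))
  .configs(IntegrationTest) // enables the "it" configuration
  .settings(
    Defaults.itSettings,    // wires up the it sources directory and it:test
    // "provided": available at compile time, supplied by the cluster at runtime
    libraryDependencies += "org.apache.spark" %% "spark-core" % "2.2.0" % "provided"
  )
```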

Implementing the business logic

Now, let's see how we can implement the business logic of our word count application using kontextfrei. In our example, we define all of our business logic in a trait called WordCount:

package com.danielwestheide.kontextfrei.wordcount

import com.danielwestheide.kontextfrei.DCollectionOps
import com.danielwestheide.kontextfrei.syntax.SyntaxSupport

trait WordCount extends SyntaxSupport {

  def counts[F[_]: DCollectionOps](text: F[String]): F[(String, Long)] =
    text
      .flatMap(line => line.split(" "))
      .map(word => (word, 1L))
      .reduceByKey(_ + _)
      .sortBy(_._2, ascending = false)

  def formatted[F[_]: DCollectionOps](counts: F[(String, Long)]): F[String] =
    counts.map {
      case (word, count) => s"$word,$count"
    }
}

The first thing you'll notice is that the implementations of counts and formatted look exactly the same as they would if you were programming against Spark's RDD type. You could literally copy and paste RDD-based code into a program written with kontextfrei.

The second thing you'll notice is that the method signatures of counts and formatted contain a type constructor, declared as F[_], which is constrained by a context bound: For any concrete type constructor we pass in here, there must be an instance of kontextfrei's DCollectionOps typeclass. In our business logic, we do not care what concrete type constructor is used for F, as long as the operations defined in DCollectionOps are supported for it. This way, we are liberating our business logic from any dependency on Spark, and specifically on the annoying SparkContext.
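If context bounds on type constructors are new to you, the following self-contained sketch shows what the compiler does with them. Sized is a hypothetical one-method typeclass that stands in for DCollectionOps here, and the method names are made up as well.

```scala
import scala.language.higherKinds

object ContextBoundSketch {

  // Hypothetical typeclass, standing in for DCollectionOps.
  trait Sized[F[_]] {
    def size[A](fa: F[A]): Long
  }

  object Sized {
    // The only instance we provide: Vector. Other type constructors are rejected.
    implicit val vectorSized: Sized[Vector] = new Sized[Vector] {
      def size[A](fa: Vector[A]): Long = fa.length.toLong
    }
  }

  // Context-bound form, in the style of the WordCount signatures.
  def countA[F[_]: Sized, A](fa: F[A]): Long =
    implicitly[Sized[F]].size(fa)

  // Equivalent form with the implicit parameter written out.
  def countB[F[_], A](fa: F[A])(implicit ev: Sized[F]): Long =
    ev.size(fa)
}
```

Calling either method with a Vector compiles because an instance is in implicit scope, whereas calling it with, say, a List is a compile-time error. This is the mechanism that lets the business logic stay agnostic of the concrete collection type.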

In order to be able to use the familiar syntax we know from the RDD type, we mix in kontextfrei's SyntaxSupport trait, but you could just as well use an import instead, if that's more to your liking.

Plugging our business logic into the Spark application

At the end of the day, we want to be able to have a runnable Spark application. In order to achieve that, we must plug our Spark-agnostic business logic together with the Spark-dependent IO parts of our application. Here is what this looks like in our word count example:

package com.danielwestheide.kontextfrei.wordcount

import com.danielwestheide.kontextfrei.rdd.RDDOpsSupport
import org.apache.spark.SparkContext

object Main extends App with WordCount with RDDOpsSupport {

  implicit val sparkContext: SparkContext =
    new SparkContext("local[1]", "word-count")

  val inputFilePath = args(0)
  val outputFilePath = args(1)

  try {

    val textFile   = sparkContext.textFile(inputFilePath, minPartitions = 2)
    val wordCounts = counts(textFile)
    formatted(wordCounts).saveAsTextFile(outputFilePath)

  } finally {
    sparkContext.stop()
  }
}

Our Main object mixes in our WordCount trait as well as kontextfrei's RDDOpsSupport, which proves to the compiler that we have an instance of the DCollectionOps typeclass for the RDD type constructor. In order to prove this, we also need an implicit SparkContext. Again, instead of mixing in this trait, we can also use an import.

Now, our Main object is all about doing some IO and integrating our business logic into it.

Writing Spark-agnostic tests

So far so good. We have liberated our business logic from any dependency on Spark, but what do we gain from this? Well, now we are able to write our unit tests in a Spark-agnostic way as well. First, we define a BaseSpec which inherits from kontextfrei's KontextfreiSpec and mixes in a few other goodies from kontextfrei-scalatest and from ScalaTest itself:

package com.danielwestheide.kontextfrei.wordcount

import com.danielwestheide.kontextfrei.scalatest.KontextfreiSpec
import com.danielwestheide.kontextfrei.syntax.DistributionSyntaxSupport
import org.scalactic.anyvals.PosInt
import org.scalatest.prop.GeneratorDrivenPropertyChecks
import org.scalatest.{MustMatchers, PropSpecLike}

trait BaseSpec[F[_]]
    extends KontextfreiSpec[F]
    with DistributionSyntaxSupport
    with PropSpecLike
    with GeneratorDrivenPropertyChecks
    with MustMatchers {

  implicit val config: PropertyCheckConfiguration =
    PropertyCheckConfiguration(minSuccessful = PosInt(100))
}

BaseSpec, like our WordCount trait, takes a type constructor, which it simply passes along to the KontextfreiSpec trait. We will get back to that one in a minute.

Our actual test properties can now be implemented for any type constructor F[_] for which there is an instance of DCollectionOps. We define them in a trait WordCountProperties, which also has to be parameterized by a type constructor:

package com.danielwestheide.kontextfrei.wordcount

trait WordCountProperties[F[_]] extends BaseSpec[F] with WordCount {

  import collection.immutable._

  property("sums word counts across lines") {
    forAll { (wordA: String) =>
      whenever(wordA.nonEmpty) {
        val wordB = wordA.reverse + wordA
        val result =
          counts(Seq(s"$wordB $wordA $wordB", wordB).distributed)
            .collectAsMap()
        assert(result(wordB) === 3)
      }
    }
  }

  property("does not have duplicate keys") {
    forAll { (wordA: String) =>
      whenever(wordA.nonEmpty) {
        val wordB = wordA.reverse + wordA
        val result =
          counts(Seq(s"$wordA $wordB", s"$wordB $wordA").distributed)
        assert(
          result.keys.distinct().collect().toList === result.keys
            .collect()
            .toList)
      }
    }
  }

}

We want to be able to test our Spark-agnostic properties both against fast Scala collections and against RDDs in a local Spark cluster. To get there, we will need to define two test classes, one in the test sources directory, the other one in the it sources directory. Here is the unit test:

package com.danielwestheide.kontextfrei.wordcount

import com.danielwestheide.kontextfrei.scalatest.StreamSpec

class WordCountSpec extends BaseSpec[Stream]
  with StreamSpec
  with WordCountProperties[Stream]

We mix in BaseSpec and pass it the Stream type constructor. Stream has the same shape as RDD, but it is a Scala collection. The KontextfreiSpec trait extended by BaseSpec defines an abstract implicit DCollectionOps for its type constructor. By mixing in StreamSpec, we get an instance of DCollectionOps for Stream. When we implement our business logic, we can run the WordCountSpec test and get instantaneous feedback. We can use SBT's triggered execution and have it run our unit tests upon every detected source change, using ~test, and it will be really fast.

In order to make sure that none of the typical bugs that you would only notice in a Spark cluster have sneaked in, we also define an integration test, which tests exactly the same properties:

package com.danielwestheide.kontextfrei.wordcount

import com.danielwestheide.kontextfrei.scalatest.RDDSpec
import org.apache.spark.rdd.RDD

class WordCountIntegrationSpec extends BaseSpec[RDD]
  with RDDSpec
  with WordCountProperties[RDD]

This time, we mix in RDDSpec because we parameterize BaseSpec with the RDD type constructor.

Design goals

It was an explicit design goal to stick to the existing Spark API as closely as possible. This allows people with existing Spark code bases to switch to kontextfrei smoothly, or even to migrate only parts of their application without too much hassle, and to finally cover their business logic with the tests that were missing so far, without the usual pain.

An alternative to this, of course, would have been to build this library based on the ever popular interpreter pattern. To be honest, I wish Spark itself were using this pattern. Other libraries like Apache Crunch have shown successfully that this can help tremendously with enabling developers to write tests for the business logic of their applications. If Spark were built on those very principles, there wouldn't be any reason for kontextfrei to exist at all.
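For readers unfamiliar with the interpreter pattern, here is a deliberately tiny sketch of the idea: a program is first described as a data structure and only later executed by an interpreter. All names below are hypothetical. This shows neither how Apache Crunch is implemented nor how kontextfrei works.

```scala
object InterpreterSketch {

  // A pipeline is a description of operations, not their execution.
  sealed trait Pipeline[A]
  final case class Source[A](data: List[A]) extends Pipeline[A]
  final case class Mapped[A, B](prev: Pipeline[A], f: A => B) extends Pipeline[B]
  final case class Filtered[A](prev: Pipeline[A], p: A => Boolean) extends Pipeline[A]

  // An in-memory interpreter, the kind you would use in unit tests.
  def runLocal[A](pipeline: Pipeline[A]): List[A] = pipeline match {
    case Source(data)      => data
    case Mapped(prev, f)   => runLocal(prev).map(f)
    case Filtered(prev, p) => runLocal(prev).filter(p)
  }
}
```

A second interpreter could translate the same description into operations on a cluster, so tests and production code would share the pipeline definition while differing only in how it is run.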

Limitations

kontextfrei is still a young library, and while we have been using it in production in one project, I do not know of any other adopters. One of its limitations is that it doesn't yet support all operations defined on the RDD type, but we are getting closer. In addition, I have yet to find a clever way to support broadcast variables and accumulators. And of course, who is using RDDs anyway in 2017? While I do think that there is still room for RDD-based Spark applications, I am aware that many people have long moved on to Datasets and to Spark Streaming. It would be nice to create a similar typeclass-based abstraction for datasets and for streaming applications, but I haven't had the time to look deeper into what would be necessary to implement either of those.

Summary

kontextfrei is a Scala library that aims to provide developers with a faster feedback loop when developing Apache Spark applications. To achieve that, it enables you to write the business logic of your Spark application, as well as your test code, against an abstraction over Spark’s RDD.

I would love to hear your thoughts on this approach. Do you think it's worth defining the biggest typeclass ever and reimplementing the RDD logic for Scala collections for test purposes? Please, if this looks interesting, do try it out. I am always interested in feedback and in contributions of all kinds.