Let’s talk about tests. The Apache Spark official guide doesn’t suggest an approach for testing, so we have to do the first steps ourselves.
First, we should understand what we are going to achieve with testing. Often, developers think all testing is unit testing. Actually, there are a lot of tests: unit, integration, component, contract, end-to-end, you name it. I used to separate it into two categories: unit tests and integration tests. The names are unimportant. The border between these terms is vague. Let’s look at what I mean by those words.
Unit Testing
By “unit” I mean isolating code for the test. It doesn’t matter what it would be: a function, a class, or a whole package. The main thing that makes something a “unit” is the isolation of part of your code.
Pros of unit tests:
- These tests are fast and small. When a test fails, you are able to find mistakes quickly
- It’s easy to achieve a wide coverage of code. When you test component A, which has 3 test cases, and component B, which has 3 test cases, you need to write only 6 tests. But for the integration test with components A and B, you need to test any combination of these cases, so 3 * 3 = 9 cases
Cons:
- A lot of code. You need to write a lot of tests to get a whole system tested, even with small coverage. On the other hand, it’s possible to test a whole system with several integration tests
- Unit tests often require a lot of mocking, which makes it harder to write
- It becomes completely unreadable with a lot of mocking
- Unit tests fix the implementation of your functions
There is one big problem with unit testing. I deliberately didn’t add the commonly mentioned advantage of unit testing: fearless refactoring. Because the last drawback ruins this item completely: you are able to do only simple things within a function, like renaming or method extraction. But often you want to completely rewrite some part of a system without breaking it. And this is a place where integration tests come to the rescue.
Integration/e2e Testing
The main reasons why I love integration tests:
- You can cover a big part of your code with a few tests
- You can throw away a big part of your system and write it from scratch without rewriting integration tests
This makes it a good choice for a beginning, when you have requirements that change as the project progresses. I split integration tests into two categories. One involves infrastructure (databases, HTTP, etc.) code. The other doesn’t. The latter makes this kind of integration testing look like a “big unit test”. Also, while you are testing a bigger part with a smaller test code, those tests can be considered as documentation.
What We Have Now
Let’s define some “lab rat” code, for example:
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
import java.util.Properties
class TestJob() {
def run(): Unit = {
val spark: SparkSession = SparkSession.builder()
.master("local[1]")
.getOrCreate()
val users = spark.read
.jdbc("/database", "users", new Properties()) // simplified for example
val adults = users
.withColumn("full_name", concat(col("first_name"), lit(" "), col("last_name")))
.filter(col("age") >= lit(18))
.drop(col("first_name"))
.drop(col("last_name"))
adults.write
.jdbc("/database", "adults", new Properties()) // simplified for example
}
}Unit testing with Spark
To test your code in Spark, you must divide your code into at least 2 parts: domain/computation, and input/output. When your Spark transformation doesn’t have code that writes or reads to/from an outer world, it’s really easy to write unit tests:
// class TestJob() ...
def run(users: DataFrame): DataFrame = {
return users
.withColumn... // the same as in TestJob original class
}And the test (using scalatest):
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
import org.scalatest.freespec.AnyFreeSpec
class Test extends AnyFreeSpec {
"TestJob test" in {
implicit val spark: SparkSession = SparkSession.builder()
.master("local[1]")
.getOrCreate()
import spark.implicits._
val users = Seq(
("John", "Johnson", 17),
("Henry", "Petrovich", 18),
("Harry", "Harrison", 19)
).toDF("first_name", "last_name", "age")
val adults = new TestJob(users).run()
val expected = Seq(
("Henry Petrovich", 18),
("Harry Harrison", 19)
).toDF("full_name", "age")
val diff = adults.except(expected)
.union(expected.except(adults))
diff.count() should be 0
}
}Here we are asserting that data frames are the same by .except method that removes rows from the first dataset that exists in the second dataset. Of course, it’s just a simple demo. In the real world, you should write a custom assert function with at least a printing of the difference in case of an error. An example of such a function can be found in the repository for this article.
Integration/e2e Testing with Spark
The simplest way to write e2e tests for Spark is to use TestContainers to run real databases in Docker, set up input data, and, after a Spark job finishes, check output tables. We won’t discuss it in this article, because it doesn’t have any Spark-specific details, and I think those tests are going to be really long-running, making them boring. This is not our way. Instead, let’s use dependency injection to abstract read sources and write targets. Also, let’s add Guice as a dependency injection framework:
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
import org.scalatest.freespec.AnyFreeSpec
import javax.inject.Inject
import java.util.Properties
trait UsersDao {
def getAll: DataFrame
}
class SparkUsersDao @Inject()(spark: SparkSession) extends UsersDao {
override def getAll: DataFrame = spark.read
.jdbc("/database", "users", new Properties())
}
trait AdultsDao {
def save(dataFrame: DataFrame)
}
class SparkAdultsDao @Inject()(spark: SparkSession) extends AdultsDao {
override def save(dataFrame: DataFrame): Unit = dataFrame.write
.jdbc("/database", "adults", new Properties())
}
class TestJob @Inject()(spark: SparkSession,
usersDao: UsersDao,
adultsDao: AdultsDao) {
def run(): Unit = {
val users = usersDao.getAll
val adults = users
.withColumn... // the same as in TestJob original class
adultsDao.save(adults)
}
}Now it becomes easy to mock implementations with Mockito or Scalamock and write integration tests.
Also, we can consider another mocking option. When we are reading and writing, we are using Spark, why not mock it? Is it possible? Yes, but I don’t recommend doing this, despite the convenience of using such a mock, Spark was not designed to be replaced with a mock, so mocking Spark will involve a lot of reflection usage. And reflection leads to unstable code: library developers usually provide some backward compatibility for open API, but don’t (and they shouldn’t) provide any guarantees about private API.
But if you are really curious, you can see at SparkStub class in the repository, an example that contains an experimental implementation of that approach. Now the test job will look like:
class TestJob @Inject()(spark: SparkSession) {
def run(): Unit = {
spark.read... // the same as in TestJob original class
val adults = users
.withColumn... // the same as in TestJob original class
spark.save... // the same as in TestJob original class
}
}But the call of this method will look like the screenshot below:
import com.sysgears.{DataFrames, SparkStub}
// ...
implicit val spark: SparkSession = SparkStub.create()
// ...
DataFrames.threadLocal.addTableToRead("jdbc", "users", users)
new TestJob(spark).run()
DataFrames.threadLocal.written("jdbc", "adults").show()In this case, the Spark context was mocked by SparkStub util, and – using the Mockito implementation of SparkSession and Dataset classes – was modified to read from and write to DataFrames.threadLocal (that actually is just a map with dataframes).
Cucumber
Foreword. This paragraph may require a basic knowledge of Cucumber. For this article, it’s enough to think about Cucumber as a middle layer that provides human-readable BDD test syntax. Instead of writing Scala test code as above, we are going to get this syntax:
Feature: Cucumber demo
Scenario: run test job
Given table: "users" has data:
| first_name STRING | last_name STRING | age INT |
| John | Petrovich | 17 |
| Henry | Johnson | 18 |
| Harry | Potter | 19 |
When test job is started
Then table: "adults" was written with data:
| full_name STRING | age INT |
| Henry Johnson | 18 |
| Harry Potter | 19 |If you want to dive deeper into Cucumber, you can start by reading the official introduction here and/or reading about Cucumber’s DataTable here.
We can slightly increase readability. Let’s use the infix method call, and define methods named: ! as a unary operator for creating a row builder, binary | to add more values to a row, and a method | with no parameters that transform the builder to a dataframe. So we can define dataframes as in the screenshot below:
import com.sysgears.DataFrameBuilder._
// ...
implicit val spark: SparkSession = // ...
val users: DataFrame =
! "first_name" | "last_name" | "age" |
! "John" | "Johnson" | 17 |
! "Henry" | "Petrovich" | 18 |
! "Harry" | "Harrison" | 19 |The full code for DataFrameBuilder can be found in the repository.
Another approach is to use Cucumber for providing a human-readable syntax. I’m not going to dwell on connecting Cucumber to your project; it’s well described in the official documentation and other guides. Let’s jump directly to the step definitions.
First of all, we need to store datasets somewhere that can be read and written. A sample version of this will look like this:
class DataFrames() {
val readDataFrames = mutable.HashMap[String, DataFrame]
val writeDataFrames = mutable.HashMap[String, DataFrame]
}A more mature version can be found in the repository, but for our example, it will be enough. Next, we should define the DataFrame data type to allow conversion from Cucumber’s DataTable to Spark DataFrame:
import io.cucumber.java.en.{Given, Then}
import io.cucumber.java.DataTableType
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import scala.collection.JavaConversions._
import scala.language.implicitConversions
class SparkSteps @Inject()(spark: SparkSession, dataFrames: DataFrames) {
private implicit val sparkSession: SparkSession = spark
@DataTableType
def toDataFrame(table: DataTable): DataFrame = {
def parseValue(`type`: String, value: String) = {
`type` match {
case "BOOLEAN" => value.toBoolean
case "BYTE" | "TINYINT" => value.toByte
... // the same for other spark types
case _ => value
}
}
val header = dataTable.asLists().get(0)
val namesAndTypes = header
.map { nameAndType => nameAndType.split(" ") }
.map { nameAndTypeSplit => (nameAndTypeSplit(0), nameAndTypeSplit(1)) }
spark.createDataFrame(
dataTable.asMaps()
.map { rows => rows.map { case (key, value) => (key.split(" ")(0), value) } }
.map { nameAndValueRows =>
namesAndTypes
.map { case (name, typ) =>
parseValue(typ, nameAndValueRows.getOrDefault(name, ""))
}
}
.map(Row.fromSeq(_)),
StructType.fromDDL((header ++ defaultsHeader).mkString(", "))
)
}
}Next, step definitions for input data and output assertion:
@Given("table: {string} has data:")
def setDataFrame(table: String, dataFrame: DataFrame): Unit = {
dataFrames.read.put(table, dataFrame)
}
@Then("table: {string} was written with data:")
def assertDataFrame(table: String, expected: DataFrame): Unit = {
val actual = dataFrames.write(table)
val diff = actual.except(expected)
.union(expected.except(actual))
diff.count() should be 0
}Next, we must define a step for running the job. We considered three options above in this article, let’s use the first one here:
@When("test job is started")
def runTestJob() = {
val adults = job.run(dataFrames.read("users"))
dataFrames.write.put("adults", adults)
}Eventually, we can write our test:
Feature: Cucumber demo
Scenario: run test job
Given table: "users" has data:
| first_name STRING | last_name STRING | age INT |
| John | Petrovich | 17 |
| Henry | Johnson | 18 |
| Harry | Potter | 19 |
When test job is started
Then table: "adults" was written with data:
| full_name STRING | age INT |
| Henry Johnson | 18 |
| Harry Potter | 19 |Looks good, doesn’t it?
Conclusions
So what approach should you choose? Let’s define some pros and cons.
Unit testing. Easy to set up, Easy to write. But requires your transformation to be isolated from the input and output code. Also, it forces you to write a lot of assertion code.
Integration testing. The same as unit testing, but with dependency injection and some general interface for your DAOs, you can remove some boilerplate code. Doesn’t require the transformation logic to be isolated, but requires the input/output to be abstracted.
e2e. Good option, when you don’t have isolated logic and abstracted input/output/ But bear in mind that this requires real databases, and execution speed of these tests can be disappointing.
Cucumber vs plain Scala code. Cucumber is the perfect solution when you want to combine your code with documentation. But it will only work when you can cover all job start cases with several Cucumber steps (it may require some general interface for every Spark Job). Also, the format of the data table looks close to Spark’s .show() output, and sometimes it makes test writing easier.
What you can find in the repository:
- Test demo for this article,
DataFrameBuilderthat brings table syntax DSL to Scala (as in the beginning of the Cucumber section) SparkStubclass that allows you to mock Spark sessions,DataFramesclass that makes it easy to implement integration between the main code and tests- Steps definitions for Cucumber that allow you to set default columns, parsing tables as specific Scala classes
SysGears is happy to support you in doing magic with your data by implementing Big Data pipelines in the SMACK stack. Feel free to reach out to us at info@sysgears.com.
