Building a Spark PI Microservice with Scalatra

These instructions will help you create a SparkPi microservice using Scala and the Scalatra framework. The project also demonstrates how to unit test your Spark jobs using Spark Testing Base.

You should already have the necessary prerequisites installed and configured, but if not, please review the instructions.

Create the Build

This project is built using SBT. First, create the following file/directory structure:

tutorial-sparkpi-scala-scalatra (root)
    \_ build.sbt
    \_ project
        \_ Dependencies.scala
        \_ build.properties
        \_ plugins.sbt

The primary build definition is contained in build.sbt (analogous to pom.xml for Maven or build.gradle for Gradle); however, there are also configurations in the project directory that are applied project-wide. For an application as small and simple as this, those configurations are superfluous (i.e., they could reside directly in build.sbt), but they become much more important in complex multi-module builds. As a matter of best practice, this tutorial is set up that way.

The file build.properties would normally be generated for you if you are using an IDE like Eclipse or IntelliJ IDEA; in this case it will be created by hand. For now, it simply contains the SBT version that the project is built with.

sbt.version = 1.0.2

The plugins.sbt file enables plugins globally for the project by adding extra settings and tasks. For this project, a few plugins are required to enable the Scalatra framework, and additional plugins are added so the project can be built and packaged as a deployable "fat jar".

logLevel := Level.Warn

resolvers += "Typesafe repository" at "http://repo.typesafe.com/typesafe/releases/"

addSbtPlugin( "com.eed3si9n" % "sbt-assembly" % "0.14.6" )
addSbtPlugin( "com.typesafe.sbt" % "sbt-native-packager" % "1.3.2" )
addSbtPlugin( "com.typesafe.sbt" % "sbt-twirl" % "1.3.13" )
addSbtPlugin( "org.scalatra.sbt" % "sbt-scalatra" % "1.0.2" )

Dependencies.scala is for dependency management; it centralizes all project dependencies and versions in one place. As mentioned above, this is overkill for a simple project like this; however, for complex multi-module builds it is invaluable in preventing configuration drift and dependency conflicts.

import sbt._

object Dependencies {

    val slf4jVersion = "1.7.5"
    val logbackVersion = "1.2.3"
    val sparkVersion = "2.3.0"
    val scalaTestVersion = "3.0.4"
    val scalatraVersion = "2.5.4"
    val jettyWebappVersion = "9.2.19.v20160908"
    val servletApiVersion = "3.1.0"
    val sparkTestBaseVersion = "2.2.0_0.8.0"

    val slf4j = Seq( "org.slf4j" % "slf4j-api" % slf4jVersion )

    val logback = Seq( "ch.qos.logback" % "logback-classic" % logbackVersion )

    val scalaTest = Seq( "org.scalatest" %% "scalatest" % scalaTestVersion % "test" )

    val scalatra = Seq( "org.scalatra" %% "scalatra" % scalatraVersion,
                        "org.scalatra" %% "scalatra-scalatest" % scalatraVersion % "test",
                        "org.eclipse.jetty" % "jetty-webapp" % jettyWebappVersion,
                        "javax.servlet" % "javax.servlet-api" % servletApiVersion )

    val spark = Seq( "org.apache.spark" %% "spark-core" % sparkVersion % "provided" )

    val sparkTestBase = Seq( "com.holdenkarau" %% "spark-testing-base" % sparkTestBaseVersion % "test" )


}

Finally, we come to the build definition itself, build.sbt. Comments have been added inline to the source code to explain the various components of the build.

import sbt._
import Dependencies._
organization := "io.radanalytics"
name := "tutorial-sparkpi-scala-scalatra"
version := "0.0.1-SNAPSHOT"
scalaVersion in ThisBuild := "2.11.11"

// 1. This is where SBT can reach out to resolve dependencies. SBT uses Apache Ivy to resolve dependencies by default, but can work with Maven repositories as well
resolvers += Resolver.sbtPluginRepo( "releases" )
resolvers += Classpaths.typesafeReleases
resolvers in ThisBuild ++= Seq( "Sonatype releases" at "https://oss.sonatype.org/content/repositories/releases",
                                "Spray IO Repository" at "http://repo.spray.io/",
                                "Maven Central" at "https://repo1.maven.org/maven2/",
                                "Typesafe repository" at "http://repo.typesafe.com/typesafe/releases/" )

// 2. Define the class to run when calling "java -jar ..."
mainClass in(Compile, run) := Some( "io.radanalytics.examples.scalatra.sparkpi.Main" )

// 3. Build the project to Java JAR conventions and add metadata to make Scalatra run
enablePlugins( JavaAppPackaging )
enablePlugins( ScalatraPlugin )


// 4. Add the project dependencies, see project/Dependencies.scala for dependency management
libraryDependencies ++= slf4j ++ logback ++ scalatra ++ scalaTest ++ spark ++ sparkTestBase


// 5. Deployment of this artifact should be part of a CI/CD pipeline. Running the unit tests while building the "fat jar" is very expensive;
//    therefore, don't do it during the "assembly" phase (which will be run on OpenShift).
test in assembly := {}

// 6. Resolve any conflicts when merging into a "fat jar"
assemblyMergeStrategy in assembly := {
    case PathList( "META-INF", "MANIFEST.MF" ) => MergeStrategy.discard
    case PathList( "reference.conf" ) => MergeStrategy.concat
    case x => MergeStrategy.last
}

Create the SparkPi Job

Now it is time to implement the actual Spark job that will calculate π. For this example, the code is adapted directly from the SparkPi Example: the job samples random points in the unit square and uses the fraction that lands inside the unit circle (which approaches π/4) to approximate π. The job is encapsulated in a class that exposes a single method, calculate(). Create a new Scala class at src/main/scala/io/radanalytics/examples/scalatra/sparkpi/SparkPi.scala and implement the calculate() method as shown below:

package io.radanalytics.examples.scalatra.sparkpi

import org.apache.spark.SparkContext

import scala.math.random

class SparkPI( spark : SparkContext, scale : Int ) {

    val applicationName = "Spark PI Scalatra Tutorial"

    def calculate( ) : Double = {
        val n = math.min( 100000L * scale, Int.MaxValue ).toInt // avoid overflow
        val count = spark.parallelize( 1 until n, scale ).map( i => {
            val x = random
            val y = random
            if ( x * x + y * y < 1 ) 1 else 0
        } ).reduce( _ + _ )
        4.0 * count / ( n - 1 )
    }

}
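
If you want to exercise the job outside of the web service (and outside the unit test in the next section), the class can be driven from a throwaway main method against an in-process Spark master. The following sketch is not part of the tutorial sources; the object name and the local[*] master are assumptions for local experimentation only, and it can be launched with sbt runMain once the project compiles.

package io.radanalytics.examples.scalatra.sparkpi

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical helper (not part of the tutorial sources) that runs SparkPI against an in-process master
object SparkPiLocalCheck {

    def main( args : Array[ String ] ) : Unit = {
        val conf = new SparkConf().setAppName( "SparkPi local check" ).setMaster( "local[*]" )
        val spark = new SparkContext( conf )
        try {
            println( s"Pi is roughly ${new SparkPI( spark, 2 ).calculate()}" )
        } finally {
            spark.stop()
        }
    }

}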

Test the Spark Job

Thankfully, there is a library that helps with testing Spark jobs in a unit-test-like environment. By using some utilities from hadoop-minicluster, this framework can stand up an entire Spark environment inside a ScalaTest fixture, execute jobs, and compare results. The SparkPi example relies on random numbers for its computation, which makes it hard to truly unit test. However, for real-life use cases, Spark Testing Base includes a slew of testing capabilities, including RDD comparisons, DataFrame comparisons, and Spark Streaming test utilities.

As a reference, an example of a Spark unit test using this framework would look like the following file, src/test/scala/io/radanalytics/examples/scalatra/sparkpi/SparkPiTest.scala:

package io.radanalytics.examples.scalatra.sparkpi

import com.holdenkarau.spark.testing.SharedSparkContext
import org.scalatest.FlatSpec
import org.slf4j.{Logger, LoggerFactory}

class SparkPiTest extends FlatSpec with SharedSparkContext {

    val LOG : Logger = LoggerFactory.getLogger( this.getClass )

    "SparkPI" should "calculate to scale 2" in {
        val sparkPi : Double = new SparkPI( sc, 2 ).calculate()

        LOG.info( "--------------------------------------------" )
        LOG.info( s"---   Pi is roughly + $sparkPi" )
        LOG.info( "--------------------------------------------" )

        // NOTE - here is where you would put assertions, however, comparing floating point numbers that use random
        //        numbers in the algorithm is tricky so we don't do it here
        assert( true )
    }

}
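
The SparkPi job's reliance on random numbers is why the test above only logs the result. For a deterministic job, the same SharedSparkContext trait allows exact assertions against collected results. The sketch below is purely illustrative and not part of the tutorial sources; the test class and its sample data are hypothetical:

package io.radanalytics.examples.scalatra.sparkpi

import com.holdenkarau.spark.testing.SharedSparkContext
import org.scalatest.FlatSpec

// Hypothetical example of asserting on a deterministic Spark job with the same shared context
class WordCountTest extends FlatSpec with SharedSparkContext {

    "a word count job" should "count each word once per occurrence" in {
        val words = sc.parallelize( Seq( "pi", "spark", "pi" ) )
        val counts = words.map( word => ( word, 1 ) ).reduceByKey( _ + _ ).collectAsMap()

        assert( counts( "pi" ) === 2 )
        assert( counts( "spark" ) === 1 )
    }

}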

Implement the Service Endpoint

Scalatra is designed from the ground up to be an easy-to-use microservice framework. It is based on the similarly named Ruby framework, Sinatra, but with a Scala DSL and idioms. Setting up a SparkPi service is fairly easy and only requires the following:

1) Implement the servlet in src/main/scala/io/radanalytics/examples/scalatra/sparkpi/SparkPiServlet.scala to handle requests. This handler uses the SparkPI class implemented in the previous step to perform the calculation.

package io.radanalytics.examples.scalatra.sparkpi

import org.apache.spark.{SparkConf, SparkContext}
import org.scalatra.{Ok, ScalatraServlet}

class SparkPiServlet extends ScalatraServlet {

    get("/"){
        Ok( "Scala Scalatra SparkPi server running. Add the 'sparkpi' route to this URL to invoke the app." )
    }

    get( "/sparkpi" ) {
        val spark = new SparkContext( new SparkConf().setAppName( "RADAnalytics IO Scalatra Tutorial" ) )
        val sparkPi = new SparkPI( spark, 2 ).calculate()
        spark.stop()
        Ok( "Pi is roughly " + sparkPi )
    }
}
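
Because the build already declares scalatra-scalatest (see project/Dependencies.scala), the servlet's non-Spark route can also be covered by a lightweight HTTP-level test. The following sketch is optional and not part of the tutorial sources; it only exercises the root route, since the /sparkpi route requires a reachable Spark cluster:

package io.radanalytics.examples.scalatra.sparkpi

import org.scalatra.test.scalatest.ScalatraFlatSpec

// Hypothetical servlet test using the scalatra-scalatest helpers already present in the build
class SparkPiServletSpec extends ScalatraFlatSpec {

    // Mount the servlet under test, mirroring what ScalatraInit does at runtime
    addServlet( classOf[ SparkPiServlet ], "/*" )

    "GET /" should "respond with a 200 status" in {
        get( "/" ) {
            status should equal( 200 )
            body should include( "SparkPi" )
        }
    }

}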

2) Set up the Scalatra initialization and plumbing. By default, Scalatra expects this class to be named ScalatraBootstrap and to live in the default package; that behavior will be overridden when the application bootstraps, to encourage better code organization. This class will be located in src/main/scala/io/radanalytics/examples/scalatra/sparkpi/ScalatraInit.scala.

package io.radanalytics.examples.scalatra.sparkpi

import javax.servlet.ServletContext
import org.scalatra.LifeCycle

class ScalatraInit extends LifeCycle {

    override def init( context : ServletContext ) {
        context.mount( classOf[ SparkPiServlet ], "/*" )
    }

}

3) Implement the main application class, which bootstraps Jetty and binds the SparkPiServlet so that it can service requests. Notice the addition of an init parameter, which overrides the Scalatra default mentioned in the previous step. This class, located at src/main/scala/io/radanalytics/examples/scalatra/sparkpi/Main.scala, is shown below.

package io.radanalytics.examples.scalatra.sparkpi

import org.eclipse.jetty.server.Server
import org.eclipse.jetty.servlet.DefaultServlet
import org.eclipse.jetty.webapp.WebAppContext
import org.scalatra.servlet.ScalatraListener

object Main {

    def main( args : Array[ String ] ) : Unit = {
        val port = 8080
        val server = new Server( port )
        val context = new WebAppContext()

        context.setContextPath( "/" )
        context.setResourceBase( "src/main/webapp" )
        context.setInitParameter( ScalatraListener.LifeCycleKey, "io.radanalytics.examples.scalatra.sparkpi.ScalatraInit" ) // Override the Scalatra default for ScalatraBootstrap in default package
        context.addEventListener( new ScalatraListener )

        server.setHandler( context )
        server.start()
        server.join()
    }

}
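
The port is hard-coded to 8080 above. If you would like it to be configurable when running the jar locally, one option is to read it from an environment variable instead; the snippet below is a hypothetical drop-in replacement for the val port line (the HTTP_PORT variable name is an assumption, not something provided by the template):

// Hypothetical alternative to the hard-coded port: fall back to 8080 when HTTP_PORT is unset
val port = sys.env.get( "HTTP_PORT" ).map( _.toInt ).getOrElse( 8080 )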

Build the Application and Verify Locally

The directory structure should now look like this:

tutorial-sparkpi-scala-scalatra (root)
    \_ build.sbt
    \_ project
        \_ Dependencies.scala
        \_ build.properties
        \_ plugins.sbt
    \_ src
        \_ main
            \_ scala
                \_ io
                    \_ radanalytics
                        \_ examples
                            \_ scalatra
                                \_ sparkpi
                                    \_ Main.scala
                                    \_ ScalatraInit.scala
                                    \_ SparkPi.scala
                                    \_ SparkPiServlet.scala
        \_ test
            \_ scala
                \_ io
                    \_ radanalytics
                        \_ examples
                            \_ scalatra
                                \_ sparkpi
                                    \_ SparkPiTest.scala

At this point it is possible to build and unit test the application locally. In the context of this tutorial, whose goal is to deploy the application to OpenShift, this step is optional; however, it can be helpful in validating that the code builds and passes all the tests before being deployed. If you have SBT installed, you can run the tests and generate the executable jar with the following command:

# if this is your first SBT build this could take quite some time.
sbt clean test assembly

If this build succeeds, the deployable "fat jar" will be located in the following location:

tutorial-sparkpi-scala-scalatra (root)
    \_ build.sbt
    ...
    \_ target
        \_scala-2.11
            \_ tutorial-sparkpi-scala-scalatra-assembly-0.0.1-SNAPSHOT.jar

This deployment can be tested by running java -jar target/scala-2.11/tutorial-sparkpi-scala-scalatra-assembly-0.0.1-SNAPSHOT.jar. The service should start, but requests to the /sparkpi route will fail unless you have a Spark cluster running locally.
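
Even without a Spark cluster, the root route can serve as a quick smoke test once the jar is running. Below is a minimal, hypothetical check written in Scala (a browser or curl works just as well); it assumes the service is listening on the default port 8080 and is not part of the tutorial sources:

import scala.io.Source

// Hypothetical smoke test: fetch the root route of the locally running service (assumes port 8080)
object SmokeTest {

    def main( args : Array[ String ] ) : Unit = {
        val source = Source.fromURL( "http://localhost:8080/" )
        try {
            println( source.mkString )
        } finally {
            source.close()
        }
    }

}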

Commit your code and push it to a hosted Git repository

Now that the core project code has been implemented, it's time to push it to a hosted source code repository. In this example, GitHub will be the source code host; however, any Git hosting service (such as GitLab or Bitbucket) will also work. Start by creating a GitHub repository for the project. Then, the following commands will add all the source code to the repository and push it to GitHub.

cd <project_directory>  # hint: this is the directory that contains build.sbt

# initialize the local Git repository
git init
git add --all
git commit -m "implement SparkPI microservice with Scalatra"

# add the remote repo and push to GitHub (or other Git hosting provider)
git remote add origin <git_repo_url>
git push origin master

The code is now hosted on GitHub and will be available to the s2i builder in the next step.

Deploy the Application to OpenShift

The application can be deployed to OpenShift using the RADanalytics s2i images by executing the following command:

# replace <your_git_repo_url> with the GitHub repository URL from the previous step
GIT_REPO_URL=<your_git_repo_url>

oc new-app --template oshinko-scala-spark-build-dc \
    -p APPLICATION_NAME=sparkpi \
    -p GIT_URI=$GIT_REPO_URL \
    -p APP_MAIN_CLASS=io.radanalytics.examples.scalatra.sparkpi.Main \
    -p APP_FILE=tutorial-sparkpi-scala-scalatra-assembly-0.0.1-SNAPSHOT.jar \
    -p SBT_ARGS="clean assembly" \
    -p APP_ARGS="-Xms1024M, -Xmx2048M, -XX:MaxMetaspaceSize=1024M"

Use oc logs -f bc/sparkpi to tail the build logs. The SBT assembly task will be run, so the build could take some time to complete. Once the application is ready, you can expose it with oc expose svc/sparkpi.

Now that the service is up and running, you can return to the My First RADanalytics Application tutorial to learn how to interact with this new microservice.

Supplementary Materials and Further Reading