These instructions will help you to create a SparkPi microservice using the Java language and the Vert.x framework.
You should already have the necessary prerequisites installed and configured, but if not please review the instructions.
Although this application is relatively small overall, it is organized into three source files. If you are familiar with the structure of Java programs, you will know that the source files must be placed in the proper directories. To begin creating your source files, you will first need to create the directory structure for them. In the root of the new directory that you made for this tutorial, run the following command to make that structure:
mkdir -p src/main/java/io/radanalytics/examples/vertx
The first file to create is named SparkPiVerticle.java
and it will contain the starting point for your application.
This file also contains the code to create the Vert.x-based HTTP routes and handlers. It should contain these contents:
package io.radanalytics.examples.vertx;

import io.reactivex.Single;
import io.vertx.core.Future;
import io.vertx.core.json.JsonObject;
import io.vertx.reactivex.core.AbstractVerticle;
import io.vertx.reactivex.core.buffer.Buffer;
import io.vertx.reactivex.core.http.HttpServer;
import io.vertx.reactivex.core.http.HttpServerRequest;
import io.vertx.reactivex.core.http.HttpServerResponse;
import io.vertx.reactivex.ext.web.Router;

import org.apache.log4j.Logger;

public class SparkPiVerticle extends AbstractVerticle {

    private static final Logger log = Logger.getRootLogger();

    private JsonObject prop = null;
    private SparkPiProducer pi;

    private Single<String> loadJarProperty() {
        if (prop != null) {
            return Single.just(prop.getString("jarfile"));
        }
        return vertx.fileSystem().rxReadFile("sparkpi.json")
                .map(Buffer::toJsonObject)
                .doOnSuccess(json -> prop = json)
                .map(json -> json.getString("jarfile"));
    }

    @Override
    public void start(Future<Void> fut) {
        // Create a router object.
        Router router = Router.router(vertx);

        router.route("/").handler(routingContext -> {
            HttpServerResponse response = routingContext.response();
            response
                    .putHeader("content-type", "text/html")
                    .end("Java Vert.x SparkPi server running. Add the 'sparkpi' route to this URL to invoke the app.");
        });

        router.route("/sparkpi").handler(routingContext -> {
            HttpServerResponse response = routingContext.response();
            HttpServerRequest request = routingContext.request();
            int scale = 2;
            if (request.params().get("scale") != null) {
                scale = Integer.parseInt(request.params().get("scale"));
            }
            int computedScale = scale;
            vertx.<String>rxExecuteBlocking(future -> future.complete(pi.getPi(computedScale)))
                    .subscribe(
                            res -> response
                                    .putHeader("content-type", "text/html")
                                    .end(res),
                            routingContext::fail);
        });

        Single<String> loaded = loadJarProperty()
                .doOnSuccess(jarFile -> {
                    log.info("SparkPi submit jar is: " + jarFile);
                    if (!SparkContextProvider.init(jarFile)) {
                        throw new RuntimeException("This application is intended to be run as an oshinko S2I.");
                    }
                    pi = new SparkPiProducer();
                });

        Single<HttpServer> listening = vertx
                .createHttpServer()
                .requestHandler(router::accept)
                .rxListen(this.config().getInteger("http.port", 8080));

        Single.merge(loaded, listening)
                .ignoreElements()
                .subscribe(
                        fut::complete,
                        fut::fail
                );
    }
}
The next file you will create is named SparkContextProvider.java.
This file contains a helper class for creating the connection to the Apache Spark cluster.
It should contain these contents:
package io.radanalytics.examples.vertx;

import javax.validation.constraints.NotNull;

import org.apache.spark.SparkConf;
import org.apache.spark.SparkException;
import org.apache.spark.api.java.JavaSparkContext;

public class SparkContextProvider {

    private static SparkContextProvider INSTANCE = null;

    private SparkConf sparkConf;
    private JavaSparkContext sparkContext;

    private SparkContextProvider() {}

    private SparkContextProvider(String jarFile) {
        this.sparkConf = new SparkConf().setAppName("JavaSparkPi");
        this.sparkConf.setJars(new String[]{jarFile});
        this.sparkContext = new JavaSparkContext(sparkConf);
    }

    public static boolean init(String jarFile) {
        try {
            if (INSTANCE == null) {
                INSTANCE = new SparkContextProvider(jarFile);
            }
        } catch (Exception e) {
            System.out.println(e.getMessage());
            return false;
        }
        return true;
    }

    @NotNull
    public static JavaSparkContext getContext() {
        return INSTANCE.sparkContext;
    }
}
The last source file should be named SparkPiProducer.java
and it contains a class that will perform the Pi calculations.
It should contain these contents:
package io.radanalytics.examples.vertx;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class SparkPiProducer implements Serializable {

    public String getPi(int scale) {
        JavaSparkContext jsc = SparkContextProvider.getContext();

        int slices = scale;
        int n = 100000 * slices;
        List<Integer> l = new ArrayList<>(n);
        for (int i = 0; i < n; i++) {
            l.add(i);
        }

        JavaRDD<Integer> dataSet = jsc.parallelize(l, slices);

        int count = dataSet.map(integer -> {
            double x = Math.random() * 2 - 1;
            double y = Math.random() * 2 - 1;
            return (x * x + y * y < 1) ? 1 : 0;
        }).reduce((integer, integer2) -> integer + integer2);

        return "Pi is rouuuughly " + 4.0 * count / n;
    }
}
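If the Monte Carlo estimate used here is unfamiliar, the following minimal sketch shows the same idea in plain Java without Spark. It is illustrative only and is not one of the tutorial's source files; the class name PlainPiEstimate is purely hypothetical.

import java.util.concurrent.ThreadLocalRandom;

public class PlainPiEstimate {
    public static void main(String[] args) {
        int n = 100000;
        int inside = 0;
        for (int i = 0; i < n; i++) {
            // Sample a point uniformly from the square [-1, 1] x [-1, 1].
            double x = ThreadLocalRandom.current().nextDouble(-1, 1);
            double y = ThreadLocalRandom.current().nextDouble(-1, 1);
            // Count the points that land inside the unit circle.
            if (x * x + y * y < 1) {
                inside++;
            }
        }
        // The circle covers pi/4 of the square, so the inside ratio approximates pi/4.
        System.out.println("Pi is roughly " + 4.0 * inside / n);
    }
}

The Spark version above distributes this same sampling loop across the cluster by parallelizing the input list into slices partitions and reducing the per-point results.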
With all the source files created your project directory should now look like this:
$ ls src
$ find src -type f
src/main/java/io/radanalytics/examples/vertx/SparkPiProducer.java
src/main/java/io/radanalytics/examples/vertx/SparkPiVerticle.java
src/main/java/io/radanalytics/examples/vertx/SparkContextProvider.java
Let us now take a look at the individual statements of the source files and break down what each component is doing.
To begin with we will start with the SparkPiVerticle.java
file.
This file defines the main entry class for our application. At the beginning of the file we declare the package namespace for
this source and import several classes and packages that will be needed:
package io.radanalytics.examples.vertx;

import io.reactivex.Single;
import io.vertx.core.Future;
import io.vertx.core.json.JsonObject;
import io.vertx.reactivex.core.AbstractVerticle;
import io.vertx.reactivex.core.buffer.Buffer;
import io.vertx.reactivex.core.http.HttpServer;
import io.vertx.reactivex.core.http.HttpServerRequest;
import io.vertx.reactivex.core.http.HttpServerResponse;
import io.vertx.reactivex.ext.web.Router;

import org.apache.log4j.Logger;
The next lines set up the class that will serve as our application’s entry point.
Verticles represent the components we wish to deploy in a Vert.x instance.
To create our Verticle
we simply extend the AbstractVerticle
class and write our concrete implementation.
public class SparkPiVerticle extends AbstractVerticle {
We start by defining a method to access the application properties asynchronously. This method returns the application’s Jar file location wrapped in a Single (a specialized type of Observable that emits a single item). It loads the Jar location from the JSON file (using Vert.x’s built-in JSON reader and parser) and notifies any subscribed observers when it has successfully completed.
private Single<String> loadJarProperty() {
    if (prop != null) {
        return Single.just(prop.getString("jarfile"));
    }
    return vertx.fileSystem().rxReadFile("sparkpi.json")
            .map(Buffer::toJsonObject)
            .doOnSuccess(json -> prop = json)
            .map(json -> json.getString("jarfile"));
}
We then implement the start
method which defines the behaviour of the Verticle
when it is deployed.
@Override
public void start(Future<Void> fut) {
We instantiate the Vert.x router so that we are able to declare routes by writing
Router router = Router.router(vertx);
The first route function will register the root /
endpoint to simply return a string that we would like to display for our users.
This endpoint will allow us to confirm that the server is running without needing to invoke Spark.
router.route("/").handler(routingContext -> { HttpServerResponse response = routingContext.response(); response .putHeader("content-type", "text/html") .end("Java Vert.x SparkPi server running. Add the 'sparkpi' route to this URL to invoke the app."); });
The main route (or HTTP endpoint) to be defined is /sparkpi.
This will return the Pi estimate computed by the pi instance.
We use Vert.x’s routing context to retrieve the scale parameter (if present) from the HTTP request.
The SparkPiProducer class does the actual work of calculating Pi and we pass it the requested scale value, defaulting to 2.
Since Vert.x follows an asynchronous design, concurrent calls to the /sparkpi endpoint would in turn result in
concurrent calls to our Pi calculation service.
To mark the calls to the Spark cluster as blocking work that must be handled differently, we wrap them in an executeBlocking block (here the RxJava variant, rxExecuteBlocking).
This allows the code inside the block to run on a thread from the worker pool, rather than blocking the event loop thread, with the result delivered back on the original context.
router.route("/sparkpi").handler(routingContext -> { HttpServerResponse response = routingContext.response(); HttpServerRequest request = routingContext.request(); int scale = 2; if (request.params().get("scale") != null) { scale = Integer.parseInt(request.params().get("scale")); } int computedScale = scale; vertx.<String>rxExecuteBlocking(future -> future.complete(pi.getPi(computedScale))) .subscribe( res -> response .putHeader("content-type", "text/html") .end(res), routingContext::fail); });
Finally, we defer the instantiation of the Pi calculation class until the Jar properties have been successfully read:
Single<String> loaded = loadJarProperty()
        .doOnSuccess(jarFile -> {
            log.info("SparkPi submit jar is: " + jarFile);
            if (!SparkContextProvider.init(jarFile)) {
                throw new RuntimeException("This application is intended to be run as an oshinko S2I.");
            }
            pi = new SparkPiProducer();
        });
Having defined the most important part of the Verticle
we can then start the Vert.x server by calling
Single<HttpServer> listening = vertx
        .createHttpServer()
        .requestHandler(router::accept)
        .rxListen(this.config().getInteger("http.port", 8080));
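Both of these operations are asynchronous, so the final lines of start (visible in the full listing above) merge the two Singles and only complete, or fail, the Verticle’s startup Future once the properties have loaded and the HTTP server is listening:

Single.merge(loaded, listening)
        .ignoreElements()
        .subscribe(
                fut::complete,
                fut::fail
        );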
The next file we will examine is SparkContextProvider.java
, which will create a SparkContext using the singleton pattern.
The reasoning for this usage is to avoid threading conflicts with the Vert.x framework by having a singular connection to the Spark cluster.
As usual, at the beginning of the file we declare the package namespace and import the classes and packages that will be used.
package io.radanalytics.examples.vertx;

import javax.validation.constraints.NotNull;

import org.apache.spark.SparkConf;
import org.apache.spark.SparkException;
import org.apache.spark.api.java.JavaSparkContext;
Next we declare our provider class and set up a few internal variables. The static INSTANCE
will provide our concrete singular instantiation of this class which defines our singleton. The sparkConf
and sparkContext
variables are the actual connections to our Spark cluster.
public class SparkContextProvider {

    private static SparkContextProvider INSTANCE = null;

    private SparkConf sparkConf;
    private JavaSparkContext sparkContext;
Since this class will implement the singleton pattern, we make its constructors private to ensure that it will only be instantiated by the init
method. The second constructor is the primary one here; it accepts the Jar file location and instantiates the internal private variables. The setJars
function will instruct Spark to associate our application Jar with the SparkConf object, and subsequently the Spark context.
private SparkContextProvider() {}

private SparkContextProvider(String jarFile) {
    this.sparkConf = new SparkConf().setAppName("JavaSparkPi");
    this.sparkConf.setJars(new String[]{jarFile});
    this.sparkContext = new JavaSparkContext(sparkConf);
}
The init
function is the main entry point for constructing the context provider. It simply checks whether an instance has already been created and, if not, creates one. As there is always the possibility of failure, this function will also catch any errors that result from spawning the new instance.
public static boolean init(String jarFile) {
    try {
        if (INSTANCE == null) {
            INSTANCE = new SparkContextProvider(jarFile);
        }
    } catch (Exception e) {
        System.out.println(e.getMessage());
        return false;
    }
    return true;
}
The last function in this class is the primary means of interacting with the context. It provides a convenient method for any other class to obtain the Spark context.
@NotNull
public static JavaSparkContext getContext() {
    return INSTANCE.sparkContext;
}
In addition to the source files we also need a few resource files to set default properties and configurations for our application. To begin creating your resource files you will first need to make a directory for them by running the following command from the root of your project:
mkdir -p src/main/resources
The first file you will create in that directory is named sparkpi.json
and it should contain the following contents:
{ "jarfile":"/opt/app-root/src/@project.name@-@project.version@.jar" }
This file may look familiar: it is the sparkpi.json file that the loadJarProperty method in SparkPiVerticle reads to discover the Jar file location.
The @project.name@ and @project.version@ tokens simply allow our build process to record the location of the Jar file for our application to utilize.
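For example, with the artifact coordinates defined in the pom.xml shown later in this tutorial, Maven resource filtering should rewrite the file at build time to something like the following (illustrative only; the exact path depends on your project name and version):

{ "jarfile":"/opt/app-root/src/sparkpi-app-1.0-SNAPSHOT.jar" }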
The next file you will create in the resources directory is named log4j.properties
and will define some options to the logging system used by our application. It should contain the following content:
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p - %m%n
These configuration values define the operation of the log4j logging system; for an extended explanation of their settings, please see the Short introduction to log4j in the upstream documentation.
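To make the conversion pattern concrete, the log.info call in SparkPiVerticle would emit a line shaped roughly like this (the timestamp and Jar path shown here are illustrative):

2017-10-19 17:12:59 INFO  - SparkPi submit jar is: /opt/app-root/src/sparkpi-app-1.0-SNAPSHOT.jar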
At this point your project directory should look like this:
$ ls src
$ find src -type f
src/main/java/io/radanalytics/examples/vertx/SparkContextProvider.java
src/main/java/io/radanalytics/examples/vertx/SparkPiProducer.java
src/main/java/io/radanalytics/examples/vertx/SparkPiVerticle.java
src/main/resources/log4j.properties
src/main/resources/sparkpi.json
The last piece of our project is the build file.
If you are familiar with Java and the Maven build system then this file will look familiar.
Create a file named pom.xml
in the root of your project and add these contents to it:
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>io.radanalytics.examples.vertx</groupId>
    <artifactId>sparkpi-app</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <!-- Be careful modifying these due to joint netty dependencies -->
        <vertx.version>3.5.3</vertx.version>
        <spark.version>2.3.0</spark.version>
        <doc.skip>true</doc.skip>
    </properties>

    <dependencies>
        <dependency>
            <groupId>io.vertx</groupId>
            <artifactId>vertx-core</artifactId>
            <version>${vertx.version}</version>
        </dependency>
        <dependency>
            <groupId>io.vertx</groupId>
            <artifactId>vertx-rx-java2</artifactId>
            <version>${vertx.version}</version>
        </dependency>
        <dependency>
            <groupId>io.vertx</groupId>
            <artifactId>vertx-web</artifactId>
            <version>${vertx.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.3</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>io.reactiverse</groupId>
                <artifactId>vertx-maven-plugin</artifactId>
                <version>1.0.15</version>
                <executions>
                    <execution>
                        <id>vmp</id>
                        <goals>
                            <goal>initialize</goal>
                            <goal>package</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <redeploy>true</redeploy>
                    <classifier>vertx</classifier>
                    <verticle>io.radanalytics.examples.vertx.SparkPiVerticle</verticle>
                </configuration>
            </plugin>
        </plugins>
        <resources>
            <resource>
                <directory>src/main/resources</directory>
                <filtering>true</filtering>
            </resource>
        </resources>
    </build>
</project>
This file is quite verbose and an in-depth explanation of how it works is out of scope for this tutorial. If you are interested in learning more about how the Maven build system works, this Maven in 5 minutes tutorial is a good starting point.
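Although OpenShift will perform the build for you in a later step, you can optionally sanity-check the project locally, assuming you have a JDK 8 and Maven installed:

mvn clean package

If the build succeeds, the vertx-maven-plugin's vertx classifier Jar should appear under target/ as sparkpi-app-1.0-SNAPSHOT-vertx.jar, which matches the APP_FILE name used when deploying to OpenShift below.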
The root of your project should now look like this:
$ ls
pom.xml  src
The last step before we can build and run our application is to check in the files and push them to your repository. If you have followed the setup instructions and cloned your repository from an upstream repository that you created, this should be as simple as running the following commands:
git add .
git commit -m "add initial files"
git push
Make sure to note the location of your remote repository as you will need it in the next step.
Now that all your files have been created, checked in and pushed to your online repository you are ready to command OpenShift to build and run your application.
The following command will start the process; you can see that we are telling OpenShift to use the oshinko-java-spark-build-dc
template for our application.
This template contains the necessary components to invoke the Oshinko source-to-image builder.
We also give our application a name, tell the builder where to find our source code and the name of the Jar file that will be produced.
Issue the following command, making sure to enter your repository location for the GIT_URI
parameter:
oc new-app --template oshinko-java-spark-build-dc \
    -p APPLICATION_NAME=vertx-sparkpi \
    -p GIT_URI=https://github.com/radanalyticsio/tutorial-sparkpi-java-vertx \
    -p APP_FILE=sparkpi-app-1.0-SNAPSHOT-vertx.jar \
    -p SPARK_OPTIONS='--driver-java-options="-Dvertx.cacheDirBase=/tmp/vertx-cache"'
Running this command should look something like this:
$ oc new-app --template oshinko-java-spark-build-dc \
    -p APPLICATION_NAME=vertx-sparkpi \
    -p GIT_URI=https://github.com/radanalyticsio/tutorial-sparkpi-java-vertx \
    -p APP_FILE=sparkpi-app-1.0-SNAPSHOT-vertx.jar \
    -p SPARK_OPTIONS='--driver-java-options="-Dvertx.cacheDirBase=/tmp/vertx-cache"'
--> Deploying template "myproject/oshinko-java-spark-build-dc" to project myproject

     Apache Spark Java
     ---------
     Create a buildconfig, imagestream and deploymentconfig using source-to-image and Java Spark source files hosted in git'

     * With parameters:
        * APPLICATION_NAME=vertx-sparkpi
        * GIT_URI=https://github.com/radanalyticsio/tutorial-sparkpi-java-vertx
        * GIT_REF=master
        * CONTEXT_DIR=
        * APP_FILE=sparkpi-app-1.0-SNAPSHOT-vertx.jar
        * APP_ARGS=
        * APP_MAIN_CLASS=
        * SPARK_OPTIONS=--driver-java-options="-Dvertx.cacheDirBase=/tmp/vertx-cache"
        * OSHINKO_CLUSTER_NAME=
        * OSHINKO_NAMED_CONFIG=
        * OSHINKO_SPARK_DRIVER_CONFIG=
        * OSHINKO_DEL_CLUSTER=true

--> Creating resources ...
    imagestream "vertx-sparkpi" created
    buildconfig "vertx-sparkpi" created
    deploymentconfig "vertx-sparkpi" created
    service "vertx-sparkpi" created
--> Success
    Build scheduled, use 'oc logs -f bc/vertx-sparkpi' to track its progress.
    Application is not exposed. You can expose services to the outside world by executing one or more of the commands below:
     'oc expose svc/vertx-sparkpi'
    Run 'oc status' to view your app.
Your application is now being built on OpenShift!
A common task when building and running applications on OpenShift is to monitor the logs.
You can even see a suggestion at the bottom of the oc new-app
command output to run oc logs -f bc/vertx-sparkpi.
Running this command will follow (-f) the BuildConfig (bc) for your application vertx-sparkpi.
When you run that command you should see something that begins like this:
Cloning "https://github.com/radanalyticsio/tutorial-sparkpi-java-vertx" ... Commit: 3dd2d911b86cc8c9a89b030a5cfa68b01bf28e7d (add scale query param) Author: Michael McCune <msm@redhat.com> Date: Thu Oct 19 17:12:59 2017 -0400 Pulling image "radanalyticsio/radanalytics-java-spark:stable" ... + [[ /bin/sh -c tar -C /tmp -xf - && /usr/local/s2i/assemble == *\/\u\s\r\/\l\o\c\a\l\/\s\2\i* ]] + exec /bin/sh -c 'tar -C /tmp -xf - && /usr/local/s2i/assemble' ================================================================== Starting S2I Java Build ..... S2I source build for Maven detected Found pom.xml ... Running 'mvn -Dmaven.repo.local=/tmp/artifacts/m2 package -DskipTests -Dmaven.javadoc.skip=true -Dmaven.site.skip=true -Dmaven.source.skip=true -Djacoco.skip=true -Dcheckstyle.skip=true -Dfindbugs.skip=true -Dpmd.skip=true -Dfabric8.skip=true -e -B ' Apache Maven 3.3.3 (7994120775791599e205a5524ec3e0dfe41d4a06; 2015-04-22T11:57:37+00:00) Maven home: /opt/maven Java version: 1.8.0_161, vendor: Oracle Corporation Java home: /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.161-0.b14.el7_4.x86_64/jre Default locale: en_US, platform encoding: ANSI_X3.4-1968 OS name: "linux", version: "4.4.41-boot2docker", arch: "amd64", family: "unix" [INFO] Error stacktraces are turned on. [INFO] Scanning for projects... [INFO] [INFO] ------------------------------------------------------------------------ [INFO] Building sparkpi-app 1.0-SNAPSHOT [INFO] ------------------------------------------------------------------------ ...
The output from this call may be quite long depending on the steps required to build the application, but at the end you should see the source-to-image builder pushing the newly created image into OpenShift. You may or may not see all the "Pushed" status lines due to output buffer logging, but at the end you should see "Push successful", like this:
Pushing image 172.30.1.1:5000/myproject/vertx-sparkpi:latest ...
Pushed 0/33 layers, 0% complete
Pushed 1/33 layers, 3% complete
Pushed 2/33 layers, 6% complete
...
Push successful
To follow the progress further you will need to see the logs from the DeploymentConfig (dc) for your application.
This can be done by changing the object type in your logs command, like this: oc logs -f dc/vertx-sparkpi.
If you are quick, you might catch the log messages from OpenShift deploying your application:
$ oc logs -f dc/vertx-sparkpi
--> Scaling vertx-sparkpi-1 to 1
--> Waiting up to 10m0s for pods in rc vertx-sparkpi-1 to become ready
--> Success
If you see this output, it just means that you have caught the logs before your application has generated any output of its own. Run the command again and you should start to see the output from the application, which should be similar to this:
$ oc logs -f dc/vertx-sparkpi
+ [[ /usr/local/s2i/run == *\/\u\s\r\/\l\o\c\a\l\/\s\2\i* ]]
+ exec /usr/local/s2i/run
oshinko v0.5.3
Default spark image: radanalyticsio/openshift-spark:2.3-latest
Didn't find cluster cluster-b38445, creating ephemeral cluster
Using ephemeral cluster cluster-b38445
Waiting for spark master http://cluster-b38445-ui:8080 to be available ...
Waiting for spark master http://cluster-b38445-ui:8080 to be available ...
Waiting for spark master http://cluster-b38445-ui:8080 to be available ...
Waiting for spark master http://cluster-b38445-ui:8080 to be available ...
All spark workers alive
Cluster configuration is [
    {
        "namespace": "myproject",
        "name": "cluster-b38445",
        "href": "/clusters/cluster-b38445",
        "image": "radanalyticsio/openshift-spark:2.3-latest",
        "masterUrl": "spark://cluster-b38445:7077",
        "masterWebUrl": "http://cluster-b38445-ui:8080 ",
        "masterWebRoute": "cluster-b38445-ui-route-myproject.192.168.64.3.nip.io",
        "status": "Running",
        "workerCount": 1,
        "masterCount": 1,
        "Config": {
            "MasterCount": 1,
            "WorkerCount": 1,
            "Name": "",
            "SparkMasterConfig": "",
            "SparkWorkerConfig": "",
            "SparkImage": "radanalyticsio/openshift-spark:2.3-latest",
            "ExposeWebUI": "true",
            "Metrics": "false"
        },
        "ephemeral": "vertx-sparkpi-1"
    }
]
spark-submit --class io.vertx.core.Launcher --master spark://cluster-b38445:7077 --driver-java-options="-Dvertx.cacheDirBase=/tmp/vertx-cache" /opt/app-root/src/sparkpi-app-1.0-SNAPSHOT-vertx.jar
Let’s break this down a little. These first few lines are actually being generated by the Oshinko source-to-image tooling. They show that no Apache Spark cluster has been specified for the application, and as such it must create an ephemeral cluster. It then waits for the cluster to become fully active before launching the application.
The line beginning with spark-submit
shows us the command that will run the application, and the output that follows comes from Vert.x, informing us that the application is starting.
With your application now running on OpenShift please return to the My First RADanalytics Application page to learn how to interact with this new microservice.
You can find a reference implementation of this application in the RADanalytics GitHub organization at https://github.com/radanalyticsio/tutorial-sparkpi-java-vertx.