Building SparkPi in Python with Flask

These instructions will help you to create a SparkPi microservice using the Python language and the Flask framework.

You should already have the necessary prerequisites installed and configured, but if not please review the instructions.

Create the application source file

This application is relatively compact and will only need a single source file for the implementation we have chosen. By default, the Python language source-to-image builder will look for a file named app.py in the root of your repository as the entry point for the application. This can be modified by passing a parameter to the build command, but for now let’s keep things simple and create this file.

Place the following contents into your new source file:

import os

from flask import Flask
from flask import request
from pyspark.sql import SparkSession


app = Flask(__name__)


def produce_pi(scale):
    spark = SparkSession.builder.appName("PythonPi").getOrCreate()
    n = 100000 * scale

    def f(_):
        from random import random
        x = random()
        y = random()
        return 1 if x ** 2 + y ** 2 <= 1 else 0

    count = spark.sparkContext.parallelize(
        xrange(1, n + 1), scale).map(f).reduce(lambda x, y: x + y)
    spark.stop()
    pi = 4.0 * count / n
    return pi


@app.route("/")
def index():
    return "Python Flask SparkPi server running. Add the 'sparkpi' route to this URL to invoke the app."


@app.route("/sparkpi")
def sparkpi():
    scale = int(request.args.get('scale', 2))
    pi = produce_pi(scale)
    response = "Pi is roughly {}".format(pi)

    return response


if __name__ == "__main__":
    port = int(os.environ.get("PORT", 8080))
    app.run(host='0.0.0.0', port=port)

Your project directory should now look like this:

$ ls
app.py

Analysis of the source code

Let us now take a look at the individual statements of the source file and break down what each section is doing.

At the beginning of the file we import the modules and classes that will be necessary for operation. You can see that we are including a system module, the Flask-related pieces, and the Spark session class.

import os

from flask import Flask
from flask import request
from pyspark.sql import SparkSession

The next statement creates an object that we will use to control the Flask framework.

app = Flask(__name__)

The first function we define is the heart of the processing that will be done. This function will be used by our /sparkpi endpoint to perform the calculations. We begin with a simple function declaration that accepts a scale argument, and then we see our first hints of Apache Spark usage. We get a SparkSession object, which will allow us to create processing instructions for, and interact with, the Apache Spark cluster.

def produce_pi(scale):
    spark = SparkSession.builder.appName("PythonPi").getOrCreate()

As the algorithm we are using depends on generating a number of random samples, the next line sets up the count of those samples. We use the scale argument that was passed in to determine how many samples to generate; scale will also serve later as the number of partitions that help distribute the workload.

n = 100000 * scale

The method we are using to estimate Pi requires that we generate a number of random X,Y coordinate pairs and then determine whether each pair falls inside or outside a circle. To assist in this process we will create a function that simply returns a 1 if a randomly generated pair is inside the radius, or a 0 if it is outside. We begin by declaring a function f; since this function will be passed a value that we do not need, we simply name its argument _.

After the function declaration you can see that we import Python’s random.random function; importing it here helps limit the scope of the pickling needed by Spark’s task distribution process. We then call random to produce a value between 0.0 and 1.0 for both the x and the y coordinate. This is perfect for the calculations we want to do, as we can assume our imaginary circle’s radius is 1. Finally, we return a value indicating whether the point lies inside or outside that radius.

def f(_):
    from random import random
    x = random()
    y = random()
    return 1 if x ** 2 + y ** 2 <= 1 else 0
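
If you want to convince yourself of the geometry before involving Spark at all, here is a small, hypothetical pure-Python check (not part of app.py): the fraction of random points in the unit square that satisfy x² + y² <= 1 approaches Pi/4 as the number of samples grows.

# Hypothetical sanity check of the Monte Carlo idea, no Spark required
from random import random

samples = 100000
inside = sum(1 for _ in range(samples) if random() ** 2 + random() ** 2 <= 1)
# Four times the inside/total ratio should print a value close to 3.14
print(4.0 * inside / samples)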

With our random point method declared we can now use our session object to get access to the SparkContext. The context object will allow us to run commands against our Apache Spark cluster.

The parallelize function will create a resilient distributed dataset (RDD) from our input, which in this case is simply a range of numbers. The RDD we create will have a number of entries based on the value n we calculated previously, and it will be split into a number of slices defined by our scale variable. Partitioning the RDD will give us an added level of division for the work we are about to perform; Apache Spark can use this information to more thoroughly distribute the work over the cluster.

After we have defined the RDD to operate on, we call the map function to apply our random point function f to every element in the RDD. We are not actually using the values from the RDD, only determining whether each random point is inside or outside the circle, which is why f can ignore its input and only needs to return a 1 or a 0. Finally, we reduce all the results in our RDD by adding them together. This gives us the count of points inside the circle.

count = spark.sparkContext.parallelize(
    xrange(1, n + 1), scale).map(f).reduce(lambda x, y: x + y)

Next we stop the Spark session.

spark.stop()

Now that we have the number of random points inside the circle and we know the total number of samples, we can compute our estimate for Pi. This statement finds the ratio of points inside the circle to the total number of samples, then multiplies that ratio by 4 to produce our estimate. Finally we return the result of our calculation.

pi = 4.0 * count / n
return pi
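
If you have pyspark and Flask installed on your workstation, one way to exercise produce_pi outside of OpenShift is to create a session bound to a local master first; the getOrCreate() call inside produce_pi will then reuse it. This is a hypothetical smoke test, not part of the tutorial sources, and because the code uses xrange it assumes a Python 2 interpreter like the one in the builder image.

# smoke_test.py -- hypothetical local check; assumes app.py is in the same directory
from pyspark.sql import SparkSession

from app import produce_pi

if __name__ == "__main__":
    # Bind a session to a local master so produce_pi does not need a cluster
    SparkSession.builder.master("local[*]").appName("PythonPi").getOrCreate()
    print("Pi is roughly {}".format(produce_pi(2)))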

The next function we define will respond to requests at the root index (/) of our service. The first line of this is Flask’s route decorator which provides a convenient method for creating endpoint functions. The body of the function simply returns a string that we would like to display for our users. This endpoint will allow us to confirm that the server is running without needing to invoke Spark.

@app.route("/")
def index():
    return "Python Flask SparkPi server running. Add the 'sparkpi' route to this URL to invoke the app."

The third function gets into the core of this application: it is the sparkpi endpoint, which will calculate Pi for the user. The first few lines look very similar to our index function; we use the route decorator to describe the sparkpi endpoint.

@app.route("/sparkpi")
def sparkpi():

As noted earlier, we will use a number of partitions for the calculation of Pi. By default we will operate with only 2 partitions, but the user can change this by supplying a scale query parameter on the GET request. On the next line, you can see that we use Flask’s request object to check whether the user has requested a specific number of partitions, falling back to 2 if not.

scale = int(request.args.get('scale', 2))

Now that we have the number of partitions to use for our calculation, we call the produce_pi function we defined earlier to perform the processing. It will return the estimated value of Pi which we will then format into a text string before returning the result to the user.

pi = produce_pi(scale)
response = "Pi is roughly {}".format(pi)

return response
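
As a hypothetical example of calling this endpoint, assuming the service is reachable on port 8080 (for instance when running the application locally, or through a route or port-forward on OpenShift), a request for 10 partitions would look like this, and the body of the response will be the "Pi is roughly ..." string built above:

$ curl "http://localhost:8080/sparkpi?scale=10"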

The last part of this source file is a little bit of Python that will detect whether the file is being called directly as an application, and if so it will start the HTTP server. You can see that on the second line we determine the port by looking for a PORT variable in the environment, falling back to 8080 if it is not set; this gives us an easy way to change the listening port if desired. Lastly, we use the Flask application object to run the server. It is important that we instruct the server to listen on 0.0.0.0 and not 127.0.0.1: this code will be running in a container, and binding to the loopback address would make the server unreachable from outside that container.

if __name__ == "__main__":
    port = int(os.environ.get("PORT", 8080))
    app.run(host='0.0.0.0', port=port)
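
Although the tutorial runs this file on OpenShift, you could also start it directly on your workstation, assuming Flask and pyspark are installed locally; the PORT variable shows the environment override in action:

$ PORT=9090 python app.py
 * Running on http://0.0.0.0:9090/ (Press CTRL+C to quit)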

Add dependencies

There is one additional file that we will need to make our application work. If you are familiar with Python dependency management then you may have seen requirements.txt files before. This file is used by the source-to-image builder to install any extra dependencies we may need. Since we are using Flask for our HTTP framework, and it is not part of the Python standard library, we will need to install it through this file.

Create a file named requirements.txt in the root of your project and add the following contents. This line will ensure that the proper version of Flask is installed into our application image.

Flask==0.12.1

Your project directory should now look like this:

$ ls
app.py  requirements.txt
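
If you also want to run or test the application locally, as in the hypothetical examples earlier, the same dependency file can be installed into your local environment with pip; note that the radanalytics-pyspark builder image normally provides Spark and pyspark for you, so locally you would need those as well:

$ pip install -r requirements.txt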

Commit your code

The last step before we can build and run our application is to check in the files and push them to your repository. If you have followed the setup instructions and cloned your repository from an upstream that you created, this should be as simple as running the following commands:

git add .
git commit -m "add initial files"
git push

Make sure to note the location of your remote repository as you will need it in the next step.

Build and run the application

Now that all your files have been created, checked in and pushed to your online repository, you are ready to command OpenShift to build and run your application. The following command will start the process; you can see that we are telling OpenShift to use the oshinko-python-spark-build-dc template for our application. This template contains the necessary components to invoke the Oshinko source-to-image builder. We also give our application a name and tell the builder where to find our source code. Issue the following command, making sure to enter your repository location for the GIT_URI parameter:

oc new-app --template oshinko-python-spark-build-dc  \
    -p APPLICATION_NAME=sparkpi \
    -p GIT_URI=https://github.com/radanalyticsio/tutorial-sparkpi-python-flask

Running this command should look something like this:

$ oc new-app --template oshinko-python-spark-build-dc  \
>     -p APPLICATION_NAME=sparkpi \
>     -p GIT_URI=https://github.com/radanalyticsio/tutorial-sparkpi-python-flask.git
--> Deploying template "pi/oshinko-python-spark-build-dc" to project pi

     PySpark
     ---------
     Create a buildconfig, imagestream and deploymentconfig using source-to-image and pyspark source hosted in git


     * With parameters:
        * Application Name=sparkpi
        * Git Repository URL=https://github.com/radanalyticsio/tutorial-sparkpi-python-flask.git
        * Application Arguments=
        * spark-submit Options=
        * Git Reference=
        * OSHINKO_CLUSTER_NAME=
        * OSHINKO_NAMED_CONFIG=
        * OSHINKO_SPARK_DRIVER_CONFIG=
        * OSHINKO_DEL_CLUSTER=true
        * APP_FILE=

--> Creating resources ...
    imagestream "sparkpi" created
    buildconfig "sparkpi" created
    deploymentconfig "sparkpi" created
    service "sparkpi" created
--> Success
    Build scheduled, use 'oc logs -f bc/sparkpi' to track its progress.
    Run 'oc status' to view your app.

Your application is now being built on OpenShift!

A common task when building and running applications on OpenShift is to monitor the logs. You can even see a suggestion at the bottom of the oc new-app command output that we run oc logs -f bc/sparkpi. Running this command will follow (-f) the BuildConfig (bc) for your application, sparkpi. When you run that command you should see something that begins like this:

$ oc logs -f bc/sparkpi
Cloning "https://github.com/radanalyticsio/tutorial-sparkpi-python-flask.git" ...
        Commit: c8dbb96247c51ea8f13a7dfcf38fc37221378bbe (convert range to xrange)
        Author: Michael McCune <msm@redhat.com>
        Date:   Thu Aug 24 10:48:19 2017 -0400
Pulling image "radanalyticsio/radanalytics-pyspark" ...
---> Installing application source ...
---> Installing dependencies ...
You are using pip version 7.1.0, however version 9.0.1 is available.
You should consider upgrading via the 'pip install --upgrade pip' command.
Collecting Flask==0.12.1 (from -r requirements.txt (line 1))
...

The output from this call may be quite long depending on the steps required to build the application, but at the end you should see the source-to-image builder pushing the newly created image into OpenShift. You may or may not see all the "Pushed" status lines due to log output buffering, but at the end you should see "Push successful", like this:

Pushing image 172.30.1.1:5000/pi/sparkpi:latest ...
Pushed 0/19 layers, 0% complete
Pushed 1/19 layers, 5% complete
Push successful

To follow the progress further you will need to see the logs from the DeploymentConfig (dc) for your application. This can be done by changing the object type in your logs command, like this: oc logs -f dc/sparkpi. If you are quick, you might catch the log messages from OpenShift deploying your application:

$ oc logs -f dc/sparkpi
--> Scaling sparkpi-1 to 1
--> Waiting up to 10m0s for pods in rc sparkpi-1 to become ready
--> Success

If you see this output, it just means that you have caught the logs from the deployment before your application has started producing output. Run the command again and you should start to see the output from the application, which should be similar to this:

$ oc logs -f dc/sparkpi
version 1
Didn't find cluster cluster-qlcvtk, creating ephemeral cluster
Using ephemeral cluster cluster-qlcvtk
Waiting for spark master http://cluster-qlcvtk-ui:8080 to be available ...
Waiting for spark master http://cluster-qlcvtk-ui:8080 to be available ...
Waiting for spark master http://cluster-qlcvtk-ui:8080 to be available ...
Waiting for spark master http://cluster-qlcvtk-ui:8080 to be available ...
Waiting for spark master http://cluster-qlcvtk-ui:8080 to be available ...
Waiting for spark workers (1/0 alive) ...
Waiting for spark workers (1/1 alive) ...
All spark workers alive
spark-submit --master spark://cluster-qlcvtk:7077 /opt/app-root/src/app.py
 * Running on http://0.0.0.0:8080/ (Press CTRL+C to quit)

Let’s break this down a little. These first few lines are actually being generated by the Oshinko source-to-image tooling. They show that no Apache Spark cluster has been specified for the application, and as such it must create an ephemeral cluster. It then waits for the cluster to become fully active before launching the application.

On the last two lines you see the spark-submit command which will run the application and the output from Flask informing us that it is listening on the host and port we specified.

With your application now running on OpenShift please return to the My First RADanalytics Application page to learn how to interact with this new microservice.

You can find a reference implementation of this application in the RADanalytics GitHub organization at https://github.com/radanalyticsio/tutorial-sparkpi-python-flask