Let us now take a look at the individual statements of the source file and break down what each section is doing.
At the beginning of the file we import the modules and classes that will be necessary for operation. You can see that we are including the os module from the standard library, the Flask-related pieces, and the SparkSession class.
import os
from flask import Flask
from flask import request
from pyspark.sql import SparkSession
The next statement creates an object that we will use to control the Flask framework.
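app = Flask(__name__)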
The first function we define is the heart of the processing that will be done. It will be used by our /sparkpi endpoint to perform the calculations. We begin with a simple function declaration that accepts a scale argument, and then we see our first hints of Apache Spark usage: we get a SparkSession object, which allows us to create processing instructions for, and interact with, the Apache Spark cluster.
def produce_pi(scale):
    spark = SparkSession.builder.appName("PythonPi").getOrCreate()
As the algorithm we are using depends on generating a number of random samples, the next line sets up the count of those samples. This application also uses a number of partitions to help distribute the workload, and we use the passed-in scale argument to help determine how many samples to generate.
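The statement itself is a single assignment. The exact multiplier is not shown in this walkthrough; the form below assumes the 100,000-samples-per-partition figure used by the upstream SparkPi example:
n = 100000 * scale  # total sample count; the 100000-per-partition multiplier is an assumption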
The method we are using to estimate Pi requires that we generate a number of random X,Y coordinate pairs and then determine whether each pair falls inside or outside the radius of a circle. To assist in this process we will create a function that simply returns a 1 if a randomly generated pair is inside the radius and a 0 if it is outside. We begin by declaring a function f; since this function will be passed a value that we do not need, we simply declare its argument as _.
After the function declaration you can see that we import Python's random.random function; we import it here to help limit the scope of the pickling needed by Spark's task distribution process. We then use the random function to return a value between 0.0 and 1.0 for both an x and a y value. This is perfect for the calculations we want to do, as we can assume our imaginary circle's radius is 1: because the points are drawn uniformly from the unit square, the fraction that lands inside the quarter circle approaches the ratio of the two areas, Pi/4. You can see that after the function declaration we declare our two random points and then return a value indicating whether they fall inside or outside of the radius.
def f(_):
    from random import random
    x = random()
    y = random()
    return 1 if x ** 2 + y ** 2 <= 1 else 0
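As an aside, the same sampling logic can be exercised locally in plain Python, which may help make the distributed map and reduce steps below more concrete; the sample count here is arbitrary and purely illustrative:
# Local, non-Spark illustration of the Monte Carlo estimate using f above
samples = 100000
inside = sum(f(i) for i in range(samples))
print("Local estimate:", 4.0 * inside / samples)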
With our random point method declared we can now use our session object to get access to the SparkContext. The context object will allow us to run commands against our Apache Spark cluster.
The parallelize function will create a resilient distributed dataset (RDD) from our input, which in this case is simply a range of numbers. The RDD we create will have a number of entries based on the value n we calculated previously, and it will be split into a number of slices defined by our scale variable. Partitioning the RDD gives us an added level of division for the work we are about to perform; Apache Spark can use this information to distribute the work more thoroughly over the cluster.
After we have defined the RDD to operate on, we call the map function to apply our random point function f to every element in the RDD. We are not actually using the values from the RDD, only determining whether a point falls inside or outside the circle, which explains why f can ignore its input and only needs to return a 1 or a 0. Finally, we reduce all the results in our RDD by adding them together. This gives us the count of points inside the circle.
count = spark.sparkContext.parallelize(
    xrange(1, n + 1), scale).map(f).reduce(lambda x, y: x + y)
Next, we stop the Spark context, as the distributed portion of the work is complete.
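In the source this is a single stop call; one equivalent form (stopping the SparkSession also shuts down its SparkContext) is:
spark.stop()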
Now that we have the number of random points inside the circle and we know the total number of samples, we can compute our estimate for Pi. This statement finds the fraction of points that landed inside the circle (the count divided by the total number of samples), then multiplies that fraction by 4 to produce our estimate. Finally, we return the result of our calculation.
pi = 4.0 * count / n
return pi
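To make the arithmetic concrete with made-up numbers: if 7,853 of 10,000 samples landed inside the circle, the estimate would be 4.0 * 7853 / 10000 = 3.1412.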
The next function we define will respond to requests at the root index (/) of our service. The first line is Flask's route decorator, which provides a convenient method for creating endpoint functions. The body of the function simply returns a string that we would like to display for our users. This endpoint allows us to confirm that the server is running without needing to invoke Spark.
@app.route("/")
def index():
    return "Python Flask SparkPi server running. Add the 'sparkpi' route to this URL to invoke the app."
The third function gets into the core of this application: it is the sparkpi endpoint, which will calculate Pi for the user. The first few lines look very similar to our index function; we use the route decorator to describe the sparkpi endpoint.
@app.route("/sparkpi")
def sparkpi():
As noted earlier, we will use a number of partitions for the calculation of Pi. By default we operate with only 2 partitions, but the user can increase this by supplying a query parameter on the GET request. On the next line, you can see that we use Flask's request object to detect whether the user has asked for a specific number of partitions.
scale = int(request.args.get('scale', 2))
Now that we have the number of partitions to use for our calculation, we call the produce_pi function we defined earlier to perform the processing. It will return the estimated value of Pi, which we then format into a text string before returning the result to the user.
pi = produce_pi(scale)
response = "Pi is roughly {}".format(pi)
return response
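For example, a request to /sparkpi?scale=4 would spread the work over 4 partitions (and scale up the sample count accordingly), while a plain request to /sparkpi falls back to the default of 2.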
The last part of this source file is a little bit of Python that detects whether the file is being called directly as an application and, if so, starts the HTTP server. You can see that on the second line we determine the port by looking to the environment for a PORT variable, falling back to 8080 if it is not set; setting the port this way gives us the opportunity to change listening ports if desired. Lastly, we use the Flask application object to run the server. It is important that we instruct the server to listen on 0.0.0.0 and not 127.0.0.1, as this will be running in a container and the localhost address will not resolve as expected.
if __name__ == "__main__":
    port = int(os.environ.get("PORT", 8080))
    app.run(host='0.0.0.0', port=port)
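As a quick sanity check once the server is running, a request against the sparkpi endpoint should return the formatted estimate. A minimal sketch using Python 3's standard library (the host and port here assume a local run with the defaults above):
# Query the running service; the URL assumes the default port and a local host.
from urllib.request import urlopen
print(urlopen("http://localhost:8080/sparkpi?scale=4").read().decode())
# Prints something like: Pi is roughly 3.14...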