Machine Learning with Spark on Google Cloud Dataproc

In this post you will learn how to implement logistic regression using Apache Spark's machine learning library, running on a Google Cloud Dataproc cluster, to develop a model from a multivariable dataset.

Google Cloud Dataproc is a fast, easy-to-use, fully managed cloud service for running Apache Spark and Apache Hadoop clusters in a simple, cost-efficient way. Cloud Dataproc integrates easily with other Google Cloud Platform (GCP) services, giving you a powerful and complete platform for data processing, analytics, and machine learning.

Apache Spark is an analytics engine for large-scale data processing. Logistic regression is available as a module in Apache Spark’s machine learning library, MLlib. Spark MLlib, also called Spark ML, includes implementations of most standard machine learning algorithms, such as k-means clustering, random forests, alternating least squares, decision trees, and support vector machines. Spark can run on a Hadoop cluster, such as Google Cloud Dataproc, to process very large datasets in parallel.

The base dataset used here provides historical information about internal flights in the United States, retrieved from the US Bureau of Transportation Statistics website. Using this data, you will:

  • Prepare the Spark interactive shell on a Google Cloud Dataproc cluster.
  • Create a training dataset for machine learning using Spark.
  • Develop a logistic regression machine learning model using Spark.
  • Evaluate the predictive behavior of a machine learning model using Spark on Google Cloud Datalab.

Open the GCP Console.

Prepare the Spark Interactive Shell

While you wait for Cloud Datalab to initialize on the cluster, start analyzing data using Spark with the PySpark interactive shell on the master node of the Cloud Dataproc cluster.

A quick note before you continue: this walkthrough comes from GCP’s data science training modules, so you probably won’t have any VMs in your project yet. If you are new to GCP, first create a project, then open Dataproc from the navigation panel and create a cluster. Name it ch6cluster (any name will do, but keeping this one makes the commands below copy-and-paste friendly). Increase the number of worker nodes from the default of 2 to 3, and reduce the boot disk size from the default 500GB to something smaller, for example 200GB for the master and 100GB for each worker. Create the cluster in the same zone as your Cloud Storage bucket; if you don’t have a bucket yet, create one with a globally unique name (your project ID plus a few random words works well). Click Create. Cluster creation takes up to 90 seconds, so feel free to sing a song while you wait. Once the cluster is ready, SSH into the master node.

Click Compute Engine > VM Instances.

Wait until you can see the ch6cluster-m virtual machine instance.

Click the SSH link for the ch6cluster-m virtual machine instance.

In the new SSH window, create the PROJECT_ID, BUCKET, and ZONE environment variables by running the following commands:

export PROJECT_ID=$(gcloud info --format='value(config.project)')

export BUCKET=<your bucket name>

export ZONE=<your zone>

Launch PySpark to enter the Spark interactive shell:

pyspark

Wait until you see the interactive prompt. The cluster has been configured to deploy Python 3 and to configure Spark to use it.

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.4.4
      /_/

Using Python version 3.6.9 (default, Jul 30 2019 19:07:31)
SparkSession available as 'spark'.
>>>

The SparkContext variable sc and the SparkSession variable spark are pre-configured in both the interactive Spark shell and Cloud Datalab. You don’t need to create them when working with either interactive interface, but you will need to declare them in any standalone, non-interactive code you develop. For the code used in this lab, the following declarations are only required if you want to run the steps as standalone code.


from pyspark.sql import SparkSession
from pyspark import SparkContext

sc = SparkContext('local', 'logistic')
spark = SparkSession \
    .builder \
    .appName("Logistic regression w/ Spark ML") \
    .getOrCreate()

Once that code is added at the start of any Spark Python script, any code developed using the interactive Spark shell or Jupyter notebook will also work when launched as a standalone script.

Create a Spark Dataframe for Training

To get started on this lab you need to import the Logistic Regression classes.

Enter the following commands into the Pyspark interactive console session:

from pyspark.mllib.classification import LogisticRegressionWithLBFGS
from pyspark.mllib.regression import LabeledPoint

When pasting commands into the Pyspark console remember to press enter to make sure that the last command in any sequence is executed before you proceed to the next step.

Next, fetch the Cloud Storage bucket name from the environment variable you set earlier. Using that bucket you create the traindays DataFrame by reading in a prepared CSV that was copied into the bucket for you when the lab was launched. The CSV identifies a subset of days as valid for training. This allows you to create views of the entire flights dataset which is divided into a dataset used for training your model and a dataset used to test or validate that model. The details on how to create that CSV file are covered in earlier labs in this series.

Enter the following commands into the Pyspark interactive console session:

import os
BUCKET = os.environ['BUCKET']
traindays = spark.read \
    .option("header", "true") \
    .csv('gs://{}/flights/trainday.csv'.format(BUCKET))

Note: these files will only be in your bucket if you are following Google’s data science training programme; check that programme if you want to do the lab exactly as written. Alternatively, go ahead with your own dataset, adjust the paths accordingly, and compare the results (ideally pick a gigabyte-scale classification problem; Kaggle and BigQuery both host suitable public datasets).

Make this a Spark SQL view as well to make it easier to reuse later:

traindays.createOrReplaceTempView('traindays')

Query the first few records from the training dataset view:

spark.sql("SELECT * from traindays ORDER BY FL_DATE LIMIT 5").show()

This displays the first five records in the training table:

+----------+------------+
|   FL_DATE|is_train_day|
+----------+------------+
|2015-01-01|        True|
|2015-01-02|       False|
|2015-01-03|       False|
|2015-01-04|        True|
|2015-01-05|        True|
+----------+------------+

Now you create a schema for the actual flights dataset. As you have seen in previous labs, almost all of the fields in this dataset are strings, apart from the four numeric fields for departure delay, taxi-out time, flight distance, and arrival delay:

from pyspark.sql.types \
    import StringType, FloatType, StructType, StructField
header = 'FL_DATE,UNIQUE_CARRIER,AIRLINE_ID,CARRIER,FL_NUM,ORIGIN_AIRPORT_ID,ORIGIN_AIRPORT_SEQ_ID,ORIGIN_CITY_MARKET_ID,ORIGIN,DEST_AIRPORT_ID,DEST_AIRPORT_SEQ_ID,DEST_CITY_MARKET_ID,DEST,CRS_DEP_TIME,DEP_TIME,DEP_DELAY,TAXI_OUT,WHEELS_OFF,WHEELS_ON,TAXI_IN,CRS_ARR_TIME,ARR_TIME,ARR_DELAY,CANCELLED,CANCELLATION_CODE,DIVERTED,DISTANCE,DEP_AIRPORT_LAT,DEP_AIRPORT_LON,DEP_AIRPORT_TZOFFSET,ARR_AIRPORT_LAT,ARR_AIRPORT_LON,ARR_AIRPORT_TZOFFSET,EVENT,NOTIFY_TIME'
def get_structfield(colname):
   if colname in ['ARR_DELAY', 'DEP_DELAY', 'DISTANCE', 'TAXI_OUT']:
      return StructField(colname, FloatType(), True)
   else:
      return StructField(colname, StringType(), True)

schema = StructType([get_structfield(colname) for colname in header.split(',')])

The next stage in the process is to identify the source data files.

You will use the all_flights-00004-* shard file for this, as it has a representative subset of the full dataset and can be processed in a reasonable amount of time:

inputs = 'gs://{}/flights/tzcorr/all_flights-00004-*'.format(BUCKET)

When processing the full dataset you can change the previous line to the following:

#inputs = 'gs://{}/flights/tzcorr/all_flights-*'.format(BUCKET) # FULL

Now read the data into Spark SQL from the input file using the schema definition you created:

flights = spark.read\
            .schema(schema)\
            .csv(inputs)
flights.createOrReplaceTempView('flights')
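
Optionally, you can confirm that the schema was applied as intended, with the four delay and distance columns typed as floats and everything else left as strings:

flights.printSchema()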

Next, create a query that uses data only from days identified as part of the training dataset:

trainquery = """
SELECT
  f.DEP_DELAY, f.TAXI_OUT, f.ARR_DELAY, f.DISTANCE
FROM flights f
JOIN traindays t
ON f.FL_DATE == t.FL_DATE
WHERE
  t.is_train_day == 'True'
"""
traindata = spark.sql(trainquery)

Inspect some of the data to see if it looks correct:

traindata.head(2)

You may see a warning stating "Truncated the string representation of a plan since it was too large." You can safely ignore it for the purposes of this lab; it is only relevant if you want to inspect the SQL schema logs.

Your output should be something similar to the following:

[Row(DEP_DELAY=-2.0, TAXI_OUT=26.0, ARR_DELAY=0.0, DISTANCE=677.0),
 Row(DEP_DELAY=-2.0, TAXI_OUT=22.0, ARR_DELAY=3.0, DISTANCE=451.0)]

Ask Spark to provide some analysis of the dataset:

traindata.describe().show()

This should output something similar to the following:

+-------+-----------+-----------+------------+------------+
|summary|  DEP_DELAY|   TAXI_OUT|   ARR_DELAY|    DISTANCE|
+-------+-----------+-----------+------------+------------+
|  count|     151446|     151373|      150945|      152566|
|   mean|      10.72|      16.11|        5.31|      837.42|
| stddev|      36.38|       8.89|       38.04|      623.04|
|    min|      -39.0|        1.0|       -68.0|        31.0|
|    max|     1393.0|      168.0|      1364.0|      4983.0|
+-------+-----------+-----------+------------+------------+

The mean and standard deviation values have been rounded to two decimal places for clarity in this table, but you will see the full floating point values on screen.

The table shows that there are some issues with the data: not all of the records have values for all of the variables, which is why the counts differ for DISTANCE, ARR_DELAY, DEP_DELAY, and TAXI_OUT. This happens because:

  • Flights are scheduled but never depart
  • Some depart but are cancelled before take off
  • Some flights are diverted and therefore never arrive

These records have NULL values instead of the relevant field values and Spark’s describe function does not count them.
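
If you want to see these missing values directly rather than inferring them from the count row, one optional check (not part of the original lab) is to count the NULLs per column in the same training DataFrame:

from pyspark.sql.functions import col, count, when

# Count NULL values per column; the totals account for the differences
# between the describe() counts above.
traindata.select([
    count(when(col(c).isNull(), c)).alias(c + '_nulls')
    for c in ['DEP_DELAY', 'TAXI_OUT', 'ARR_DELAY', 'DISTANCE']
]).show()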

The best way to deal with this is to remove flights that have been cancelled or diverted using the following query:

trainquery = """
SELECT
  DEP_DELAY, TAXI_OUT, ARR_DELAY, DISTANCE
FROM flights f
JOIN traindays t
ON f.FL_DATE == t.FL_DATE
WHERE
  t.is_train_day == 'True' AND
  f.CANCELLED == '0.00' AND
  f.DIVERTED == '0.00'
"""
traindata = spark.sql(trainquery)
traindata.describe().show()

This should output something similar to the following, indicating that you have correctly dealt with the problem:

+-------+-----------+-----------+------------+------------+
|summary|  DEP_DELAY|   TAXI_OUT|   ARR_DELAY|    DISTANCE|
+-------+-----------+-----------+------------+------------+
|  count|     150945|     150945|      150945|      150945|

Develop a Logistic Regression Model

Now you can create a function that converts a set of data points in your DataFrame into a training example. A training example contains a sample of the input features and the correct answer for those inputs. In this case, the correct answer is whether the arrival delay is less than 15 minutes, and the input features are the values for departure delay, taxi-out time, and flight distance.

Enter the following in the Pyspark interactive console to create the definition for the training example function:

def to_example(raw_data_point):
  return LabeledPoint(
      float(raw_data_point['ARR_DELAY'] < 15),  # on-time?
      [
          raw_data_point['DEP_DELAY'],
          raw_data_point['TAXI_OUT'],
          raw_data_point['DISTANCE'],
      ])

You will need to press return again to get back to the PySpark interactive shell prompt ( >>> ) .

You can now map this training example function over the training dataset:

examples = traindata.rdd.map(to_example)
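
Before training, you can optionally take a quick look at one converted record (this check is not part of the original lab) to confirm that the label and feature values look sensible:

examples.take(1)

The result is a list containing a single LabeledPoint, whose label (1.0 for an on-time arrival, 0.0 otherwise) is followed by the three feature values.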

The resulting RDD of labeled examples is the training input that the Spark logistic regression module uses to create a logistic regression model from your training dataset:

lrmodel = LogisticRegressionWithLBFGS.train(examples, intercept=True)

The intercept=True parameter is used because, in this case, the prediction is not expected to be zero when all of the inputs are zero. If you have a training dataset where you expect the prediction to be zero when the inputs are all zero, then you should specify intercept=False.

When this train method finishes, the lrmodel object will have weights and an intercept value that you can inspect:

print (lrmodel.weights,lrmodel.intercept)

The output will look similar to the following:

[-0.17315525007,-0.123703577812,0.00047521823417] 5.26368986835

These weights, combined with the intercept in the logistic regression formula (a linear combination of the inputs passed through the sigmoid function), allow you to re-create the model in code in any language of your choosing.
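
As an illustration (this is not part of the lab), here is a minimal sketch of that idea in plain Python, assuming the example weight and intercept values printed above:

import math

# Illustration only: the weight and intercept values below are the example
# values printed above; yours will differ slightly.
weights = [-0.17315525007, -0.123703577812, 0.00047521823417]
intercept = 5.26368986835

def predict_prob(features):
    # Logistic regression: sigmoid of the linear combination of the inputs.
    z = intercept + sum(w * x for w, x in zip(weights, features))
    return 1.0 / (1.0 + math.exp(-z))

print(predict_prob([6.0, 12.0, 594.0]))   # 6 min departure delay: close to 1
print(predict_prob([36.0, 12.0, 594.0]))  # 36 min departure delay: close to 0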

You can test this directly now by providing some input variables for a flight that has a departure delay of 6 minutes, a taxi-out time of 12 minutes, and a flight distance of 594 miles.

lrmodel.predict([6.0,12.0,594.0])

This yields a result of 1: the flight is predicted to be on time. Now try it with a much longer departure delay of 36 minutes.

lrmodel.predict([36.0,12.0,594.0])

This yields a result of 0: the flight is not predicted to be on time.

These results are not probabilities; they are returned as either true or false based on a threshold which is set by default to 0.5. You can return the actual probability by clearing the threshold:

lrmodel.clearThreshold()
print (lrmodel.predict([6.0,12.0,594.0]))
print (lrmodel.predict([36.0,12.0,594.0]))

This will produce results that are probabilities, with the first close to 1 and the second close to 0.

Now set the threshold to 0.7 to correspond to your requirement to cancel meetings if the probability of an on-time arrival falls below 70%.

lrmodel.setThreshold(0.7)
print (lrmodel.predict([6.0,12.0,594.0]))
print (lrmodel.predict([36.0,12.0,594.0]))

Your outputs are once again 1 and 0, but now they reflect the 70% probability threshold that you require and not the default 50%.

Save and Restore a Logistic Regression Model

You can save a Spark logistic regression model directly to Google Cloud Storage which allows you to save and reuse a model without having to retrain the model from scratch.

To save a model to Cloud Storage you must first make sure that the location does not already contain any model files.

Run the following:

MODEL_FILE='gs://' + BUCKET + '/flights/sparkmloutput/model'
os.system('gsutil -m rm -r ' +MODEL_FILE)

This should report an error stating CommandException: 1 files/objects could not be removed, because the model has not been saved yet. The error indicates that there are no files present in the target location. You need to be certain that this location is empty before attempting to save the model, and this command guarantees that.

Save the model by running:

lrmodel.save(sc, MODEL_FILE)
print ('{} saved'.format(MODEL_FILE))

Now destroy the model object in memory and confirm that it no longer contains any model data:

lrmodel = 0
print (lrmodel)

Now retrieve the model from storage and print it out:

from pyspark.mllib.classification import LogisticRegressionModel
lrmodel = LogisticRegressionModel.load(sc, MODEL_FILE)
lrmodel.setThreshold(0.7)
print (lrmodel.weights)

The model parameters, i.e. the weights and intercept values, have been restored.
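
Only the weights are printed above; if you also want to confirm that the intercept came back, print it as well:

print (lrmodel.intercept)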

Test the model with a scenario that will definitely not arrive on time:

print (lrmodel.predict([36.0,12.0,594.0]))

This will print out 0 indicating that the flight will probably arrive late, given your 70% probability threshold.

Finally, retest the model using data for a flight that should arrive on time:

print (lrmodel.predict([6.0,12.0,594.0]))

This will print out 1 indicating that the flight will probably arrive on time, given your 70% probability threshold.

Close the PySpark interactive shell:

exit()

You should now be back to the SSH command line.
