Making Predictions on a PySpark DataFrame with a Scikit-Learn Model

Making Predictions on a PySpark DataFrame with a Scikit-Learn Model

1_biuUZhtJatC78spfQpC7jQ.jpeg

Introduction

A lot of Data Scientists and ML Engineers use Scikit-Learn to build and train ML models. This is because of Its open-source and easy-to-use configurations which make it widely acceptable.

Ideally, scikit learn-based models and generally all ML models are built with relatively small amounts of labeled data (which Pandas can easily manage). But making predictions with these models on hundreds of millions of rows of “unseen data” is a different story. slipping-patrick-star.gif At this point, Pandas is of no help as it does not scale well with large amounts of data and can only make use of one core at a time. Hence the need for something more scalable and efficient.

An alternative is PySpark (Spark’s API in Python). PySpark is strong where Pandas is weak, being able to read large amounts of data beyond Pandas’ limit, along with its parallel computing capabilities. But of course, a daunting task will be using your scikit learn model to make predictions (the usual way with .predict()) on the PySpark DataFrame.

A different (PySpark) DataFrame object than the usual Pandas DataFrame calls for different methods and approaches. Rebuilding the model with PySpark’s MLlib might cross your mind :(, but there’s an even quicker solution, using the scikit-learn model to make predictions on the PySpark DataFrame.

In this tutorial, I have itemized five steps you can follow to make predictions on a PySpark DataFrame with a scikit learn model. These steps can also be applied with a sklearn model pipeline object.

Prerequisites to follow through with this tutorial

To follow through with this tutorial you need a basic to intermediate understanding of:

  1. Scikit-Learn library
  2. PySpark
  3. User-Defined Functions (UDFs)
  4. Pickle or Joblib
  5. Predictive Analytics

A great article that can equally provide the needed guidance is Charu Makhijanu’s article titled Machine Learning Model Deployment Using Spark

Let's get started...

1. Ensure you have pickled (saved) your model

Here, it is expected that you have built your model and it is ready to make predictions on unseen data.

You need to save your model in the directory you are working in for use in another Jupyter Notebook or IDE where you will make predictions with the model (“score” unseen data). The process of doing this is called Pickling.

Pickling is a way of saving an object (in this case a model) by serializing it. In this process, the object you are trying to save is converted into a byte stream or a sequence of bytes and stored in your working directory.

I use Python’s Pickle Library for this process, but Sklearn’s joblib is also a good alternative to persist and/ or save your model. You can achieve this with simple lines of code, but I like to have a function for these sorts of things so I can reuse it in different instances.

import pickle

def export_object(filename = "filename", object_to_export = model):

    dump_file = open(filename, "wb")
    pickle.dump(object_to_export, dump_file)
    dump_file.close()

2. Create a spark session, unpickle and broadcast your model

In this step, our goal is to prepare the previously pickled model for application on a PySpark DataFrame. And so there are three important actions in this step

    • initializing a spark session
    • unpickling the saved model object, and
    • broadcasting it by making it available for parallel processing.
    Spark Sessions are important entities that need to be created when using PySpark. A spark session is essentially an entry point that provides a way of interacting with Spark’s capabilities which include creating RDDs (Resilient Distributed Dataset), DataFrames and so on. They have been the norm since Spark version 2.0.

    spark sessions can be easily created with the Spark Session builder pattern (SparkSession.builder()). Along with spark sessions, there are some configurations that need to be specified. These configurations will dictate PySpark’s behavior as you make use of it in reading the data for your model to make predictions.

    While the technical details of SparkSessions are beyond the scope of this article, you can find more information about it here.

    Below is how I create a SparkSession before reading the data with PySpark:

spk = SparkConf().setMaster('local[*]')\
     .set('spark.executor.memory','512g')\
     .set('spark.driver.memory', '512g')\
     .set('spark.shuffle.consolidateFiles','true')\
     .set("spark.sql.files.ignoreCorruptFiles", "true")\
     .set("spark.hadoop.fs.s3a.multipart.size", "104857600")\
     .setAppName("test")

spark=SparkSession.builder.config(conf = spk).getOrCreate()
sc = spark.sparkContext

Unpickling is the opposite of pickling an object. The unpickling process involves deserializing the pickled object, thereby loading it for use. To unpickle your model for use on a pyspark dataframe, you need the binaryFiles()function to read the serialized object, which is essentially a collection of binary files. You also need the collect() function to collate all the binary files read into a list. The last step in unpickling the model object is calling Pickle’s .loads() on the “collected” model data.

You can find how I did these in the code snippet below

model pkl = sc.binaryFiles("model")
model data = model pkl.collect()
model = pickle.loads(model data[0][1])

The last action in this step is broadcasting the model. We are able to broadcast a model object when we make the model available for parallel processing. This makes sense since PySpark has parallel processing capabilities. With the following lines of code, you can broadcast your model. You can go ahead to print your model’s configurations with the inbuilt print() function

broadcast model = sc.broadcast(model)
print(broadcast model.value)

3. Read and process your data according to the steps used in your model building phase

You need to read and transform the data according to the steps used in the model building phase. Of course, this will mean transforming some pandas preprocessing methods to suitable ones for PySpark DataFrames in your prediction/ data scoring script or code block. For guidance, you can look through this article by Yuefeng Zhang for pandas methods and their equivalent in PySpark.

Reading data in pyspark is as easy as one line of code. You just need to tell PySpark the format of the data and its location or source directory (a local directory or even an AWS s3 bucket). You can read data in Parquet format with this simple line of code. The option() function allows you to specify some configurations while reading the data.

spark.read.format('parquet').option("inferSchema",
                                            "true").load(args['source'])

To read other file formats aside Parquet please refer to this link.

4. Write a PySpark UDF to make predictions over DataFrame with your broadcast model

We are getting to the best part…
b4eea65b8b3ae2600dc4ed252b95596e_w200.gif

User-Defined Functions or UDFs are created to extend otherwise non-compatible capabilities (functions and methods) to PySpark DataFrames. They are computationally expensive, but in this case, we need them to make predictions on the PySpark DataFrame.

A UDF consists of two entities:

  • A python function
  • PySpark’s UDF decorator

The decorator will make the python function implementable on the DataFrame object. Here we are just calling our model’s predict() function on the specific data point that will be used to make or infer predictions.

To make predictions, our python function will look like this:

def predict(col):

    predictions = model.value.predict(col)
    return predictions

To apply this on the pyspark DataFrame we need to wrap it in a UDF. This can be written like so

make_predictions = udf(lambda x: predict(x))

5. Make Predictions by Applying the UDF on the PySpark DataFrame

The last and easiest step is applying the UDF on your PySpark DataFrame. The UDF we created above grants the variable make_predictions the functionalities of our model’s .predict()function. And so we are able to call this variable on the datapoint or column we need to make predictions.

I expect that you will want to save your predictions as a column in the DataFrame. PySpark has a very intuitive way of creating columns from the results of functions and methods. For this, the withColumn() function is very handy. The first argument is the name of the column that will store the predicted output of our model, the second argument is the make_predictions() UDF we created. See how to make predictions below:

df.withColumn("predicted_values", make_predictions(col("column")))

Conclusion

PySpark is a scalable library that can help you read large amounts of data for your model with little to no worry about size or the speed at which your data get’s fed into the model for predictions. And so, it is an easy substitute to Pandas especially when it comes to reading very large amounts of data.

Thankfully, the five points above suggest ways you can apply your sklearn model on a PySpark DataFrame, meaning you don’t have to rebuild the model with a different library or in this case PySpark’s MLlib.

For questions, discussions on this article on any other cool Data Science and Machine Learning stuff, I am available for discussions at . You can also catch me here on LinkedIn and Twitter