A Deep Dive into Custom Spark Transformers for Machine Learning Pipelines

  • Modern Spark Pipelines are a powerful way to create machine learning pipelines
  • Spark Pipelines use off-the-shelf data transformers to reduce boilerplate code and improve readability for specific use cases
  • This blog outlines how to construct custom Spark Transformers to integrate with Spark Pipelines
  • Learn how to identify the components of each Transformer class member function and correctly serialize and deserialize the transformer to and from disk 

CrowdStrike data scientists often explore novel approaches for creating machine learning pipelines especially when processing a large volume of data. The CrowdStrike Security Cloud stores more than 15 petabytes of data in the cloud and gathers data from trillions of security events per day, using it to secure millions of endpoints, cloud workloads and containers around the globe with the power of machine learning and indicators of attack.

When processing so much data, making use of modern Spark Pipelines is a powerful way to use off-the-shelf libraries to reduce boilerplate code and improve readability. Because these off-the-shelf Transformers may not fit every use case, it’s important to understand how to correctly construct a custom Spark Transformer that integrates with Spark Pipelines, and to understand the components of a Transformer.

Pipeline Framework

Note: For this blog, we assume usage of PySpark version 3.0+

Machine learning workflows generally consist of multiple high-level steps:

  • Preprocessing your input data via some extract, transform and load (ETL) steps
  • Splitting the dataset for either cross validation or train/test/validate split
  • Training the model
  • Tuning hyperparameters

The code and structure of each step vary greatly and, if inconsistently implemented, can affect the readability and flexibility of a data scientist’s workflow. In addition, data scientists often reuse components of their workflow with slight modifications in repeated experiments. This is why commonly used libraries like scikit-learn and Spark provide pipeline frameworks to more flexibly express and assemble common high-level workflows.

Such frameworks give the user a consistent approach to build out the steps required to conduct experiments, and they are easy to extend. A less obvious advantage of this framework is the reduction of complexity for collaborators: pipelines provide a common code structure which is more readable and thus reduces the barrier to entry into your codebase.

The following is a simple example of building and fitting a pipeline on a dataset:

# Setup a simple pipeline to tokenize -> hashed term frequency vector -> train logistic regression
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer

tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.001)
pipeline = Pipeline(stages=[tokenizer,
                            hashingTF,
                            lr])
 
model = pipeline.fit(input_dataset)

Because experiments should be reproducible, we may need to save information regarding the state of each transformation and the model hyperparameters. Without a pipeline, each transformer and model may need to be saved separately, and the order of transformations must be manually preserved. Using a Spark Pipeline allows us to save the entire pipeline (including transformer states, order and hyperparameters) as a single object and reload it easily. From an experimentation and engineering perspective, this reduces the ambiguity of experiment configurations and makes integrating the model and pipeline downstream more straightforward.

# save and reload the entire pipeline
model.save(save_path)
 
# reload the fitted pipeline and use it to run the entire process again
from pyspark.ml import PipelineModel

loaded_pipeline = PipelineModel.load(save_path)
loaded_predictions = loaded_pipeline.transform(test)
 
# inspecting the stages of the loaded pipeline
```
>>> loaded_pipeline.stages
[Tokenizer_7397a14f7aaa,
 HashingTF_4c188d1e40c1,
 LogisticRegressionModel: uid=LogisticRegression_a3ac1d359fb0, numClasses=2, numFeatures=12]
```

Custom Data Transformations

With improvements in flexibility and readability comes some additional work: we must conform our code to a structure that is accepted by modern pipelines. One very common set of tasks used in pipelines is the transform step of the ETL process, where we take our raw data and pass it through a series of data transformation steps. The output of these transforms is the set of vectors and labels used for model training. Though many manipulations on Spark Datasets can already be done through either native functions or Spark SQL, there are often custom transforms we must apply to every row of our data that require custom code.

Let’s take as an example a simple Spark Dataset for text manipulation, containing id, text and label columns:

df = spark_session.createDataFrame([
    (0, "a b c d e spark", 1.0),
    (1, "b d", 0.0),
    (2, "spark f g h", 1.0),
    (3, "hadoop mapreduce", 0.0)
], ["id", "text", "label"])
 
# Produces the following table:
```
+---+----------------+-----+
| id|            text|label|
+---+----------------+-----+
|  0| a b c d e spark|  1.0|
|  1|             b d|  0.0|
|  2|     spark f g h|  1.0|
|  3|hadoop mapreduce|  0.0|
+---+----------------+-----+
```

Starting with a Basic Transformation

It is recommended that data transformations be expressed as native Spark functions or Spark SQL when possible, due to their under-the-hood integration with Spark’s query optimizer and the JVM. However, this is sometimes not possible for more complex transformations. In such cases, we can use a Spark user-defined function (UDF) to write our transformations. (Note that Python UDFs are generally slower than native Spark SQL, since the data must be serialized between the JVM and the Python workers.)

We’d like to apply a transform such that if we see the string spark in the text, we append an additional signal string to the end of the text. A simple way to apply this transform to each row is to write a plain Python function and run it as a UDF:

from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType
 
# Define our transformation
def append_string(s, append_val=""):
    """
    If we see the word `spark` in s, append a string to the current string.
    """
    if s and 'spark' in s:
        return s + append_val
    return s
 
 
# Wrap the transformation as a UDF
append_udf = udf(lambda row: append_string(row, " hadoop"), StringType())
 
# Apply the UDF to our Dataset and create a resultant column called `appended_text`
df.withColumn("appended_text", append_udf(col("text"))) \
  .show()   
 
 
# Produces the following output table:
```
+---+----------------+-----+----------------------+
| id|            text|label|         appended_text|
+---+----------------+-----+----------------------+
|  0| a b c d e spark|  1.0|a b c d e spark hadoop|
|  1|             b d|  0.0|                   b d|
|  2|     spark f g h|  1.0|    spark f g h hadoop|
|  3|hadoop mapreduce|  0.0|      hadoop mapreduce|
+---+----------------+-----+----------------------+
```
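
As noted above, native column functions are preferred over UDFs when they can express the logic. For comparison, here is a minimal sketch of the same append written without a UDF, using only built-in PySpark functions (the output column name matches the UDF version):

from pyspark.sql import functions as F
 
# Append " hadoop" when the text contains "spark"; otherwise keep the text unchanged
df.withColumn(
    "appended_text",
    F.when(F.col("text").contains("spark"), F.concat(F.col("text"), F.lit(" hadoop")))
     .otherwise(F.col("text"))
).show()

For the rest of this post we stick with the UDF version, since the goal is to wrap arbitrary Python logic.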

Note that although the UDF approach applies the correct transform, there are a few inconveniences:

  • We cannot save the internal state of the transform — for example, what value we used for the append_val argument in append_string(). This is especially important if we have many inputs that need to be set before we run our transform.
  • We cannot use it as part of a Pipeline, so we would need to either create a Pipeline which starts after this transform step, or write our own subsequent data transforms manually. This means we need to programmatically ensure that code between experiments stays the same. 

Converting Transformation Function Into a Custom Transformer

To make our transformation function savable, loadable and usable as part of a Pipeline, we will inherit from the SparkML Transformer class along with a few mixins to ensure API conformity with SparkML. The converted custom transformer looks like the following:

# Note: append_string() is the transformation function we wrote above; it must be defined in (or imported into) this module
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from pyspark import keyword_only  # Note: use pyspark.ml.util.keyword_only if Spark < 2.0
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol, Param, Params, TypeConverters
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable
 
 
class StringAppender(Transformer,               # Base class
                     HasInputCol,               # Sets up an inputCol parameter
                     HasOutputCol,              # Sets up an outputCol parameter
                     DefaultParamsReadable,     # Makes parameters readable from file
                     DefaultParamsWritable      # Makes parameters writable from file
                    ):
    """
    Custom Transformer wrapper class for append_string()
    """
  
    # append_str is a value which we would like to be able to store state for, so we create a parameter.
    append_str = Param(
        Params._dummy(),
        "append_str",
        "Value we want to append with",
        typeConverter=TypeConverters.toString,   # This will allow code to automatically try to convert to string
    )
  
    @keyword_only
    def __init__(self, inputCol=None, outputCol=None, append_str=None):
        """
        Constructor: set values for all Param objects
        """
        super().__init__()
        self._setDefault(append_str=None)
        kwargs = self._input_kwargs
        self.setParams(**kwargs)
  
    @keyword_only
    def setParams(self, inputCol=None, outputCol=None, append_str=None):
        kwargs = self._input_kwargs
        return self._set(**kwargs)
  
    def setAppendStr(self, new_append_str):
        return self.setParams(append_str=new_append_str)
  
    # Required if you use Spark >= 3.0
    def setInputCol(self, new_inputCol):
        return self.setParams(inputCol=new_inputCol)
  
    # Required if you use Spark >= 3.0
    def setOutputCol(self, new_outputCol):
        return self.setParams(outputCol=new_outputCol)
  
    def getAppendStr(self):
        return self.getOrDefault(self.append_str)
  
    def _transform(self, dataset):
        """
        This is the main member function which applies the transform to transform data from the `inputCol` to the `outputCol`
        """
        if not self.isSet("inputCol"):
            raise ValueError(
                "No input column set for the "
                "StringAppenderTransformer transformer."
            )
        input_column = dataset[self.getInputCol()]
        output_column = self.getOutputCol()
        append_str = self.getAppendStr()
        udf_func = lambda x: append_string(x, append_str)
        data_type = StringType()
         
        return dataset.withColumn(output_column,
                                  udf(udf_func, data_type)(input_column))
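
Before breaking this down, here is a quick sanity check of the transformer against the example DataFrame from earlier (a brief sketch; the output column name is illustrative):

appender = StringAppender(inputCol="text", outputCol="appended_text", append_str=" hadoop")
appender.transform(df).show()
# Produces the same appended_text column as the plain UDF version above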

Let’s break down some components of this wrapper and discuss each in detail:

  • Transformer Abstract Base Class
  • Param Type Member Variables
  • Mixins: HasInputCol, HasOutputCol
  • @keyword_only Decorator, Constructor and Input Persistence
  • Traits: DefaultParamsReadable, DefaultParamsWritable

Transformer Abstract Base Class

Every custom transformer must at least inherit pyspark.ml.Transformer as the abstract base class.

We must also, at a minimum, override the _transform() function so that the Transformer knows how to transform our data. The input passed to _transform() is the entire input Dataset, including all of its columns, so we will need to retrieve the input and output column names (usually set by the constructor).

Now that we have the input dataset, the input_column name and the output_column name, we can wrap our transformation function append_string(). Note that if the transformation function requires more than a single input, you will need to convert it into a function that accepts a single input; you can do this using a lambda, as shown below.

# Code snippet of _transform():
        udf_func = lambda x: append_string(x, append_str)  # append_string() takes two inputs, we can wrap it with a lambda
        data_type = StringType()
         
        # Note we need to wrap udf_func with pyspark.sql.functions.udf
        return dataset.withColumn(output_column,
                                  udf(udf_func, data_type)(input_column))

Param Type Member Variables

As part of constructing the custom transformer, we will need to generate pyspark.ml.param.shared.Param objects for each of the following:

  • an inputCol name, which indicates the column containing the data to be transformed
  • an outputCol name, where the transformed data should be written
  • any additional state that needs to be stored by the Transformer (e.g., append_str, the string we want to append in our example)

Param objects can be set to a value like normal variables, but they enable us to more easily read and write them to and from file using Spark’s native methods. Generally these are set at initialization via the constructor (__init__()). However, because we inherit from HasInputCol and HasOutputCol, the Param type member variables inputCol and outputCol, respectively, are created automatically for us, so we only need to create the append_str Param object. See the next section for more information on the mixins.

append_str = Param(
    Params._dummy(),
    "append_str",
    "Value we want to append with",
    typeConverter=TypeConverters.toString,   # This will allow code to automatically try to convert to string
)

The typeConverter argument lets Spark implicitly convert a compatible value to the expected type when the Param is set.

Mixins: HasInputCol, HasOutputCol

Inheriting the mixins HasInputCol and HasOutputCol allows us to reduce the amount of boilerplate code we must write. HasInputCol creates a Param member variable for your custom transformer class called inputCol, which can then be set, retrieved and written to file; HasOutputCol does the same for the member variable outputCol. Additionally, each mixin initializes a default value for its member variable.

Optionally, you can implement setInputCol()  and setOutputCol() to conform more closely with standard transformers available in SparkML.

There are also additional mixins that can be inherited if needed (e.g., HasInputCols/HasOutputCols for a list of input or output columns; see the sketch below). For more information, please refer to the PySpark API documentation.
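
For instance, a transformer that consumes several columns at once could inherit HasInputCols instead. The following is an illustrative skeleton only (the class is not part of the original example); a real implementation would also add the same @keyword_only constructor and setters shown in StringAppender:

from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasInputCols, HasOutputCol
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable
from pyspark.sql.functions import concat_ws
 
class ColumnConcatenator(Transformer, HasInputCols, HasOutputCol,
                         DefaultParamsReadable, DefaultParamsWritable):
    def _transform(self, dataset):
        # HasInputCols creates an `inputCols` Param holding a list of column names;
        # here we simply concatenate them, space-separated, into the output column
        return dataset.withColumn(self.getOutputCol(),
                                  concat_ws(" ", *self.getInputCols()))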

@keyword_only Decorator, Constructor and Input Persistence

To correctly create a custom transformer, we must be able to store the inputs used to create the transformer. The inputs will be stored as Param type member variables within our custom transformer class. Let’s break down how this is done.

@keyword_only
def __init__(self, inputCol=None, outputCol=None, append_str=None):
    """
    Constructor: set values for all Param objects
    """
    super().__init__()
    self._setDefault(append_str=None)
    kwargs = self._input_kwargs
    self.setParams(**kwargs)

Here, @keyword_only will store the input keyword arguments (inputCol, outputCol and append_str in our example) as an internal map inside the Transformer (in a protected variable called _input_kwargs). After the input arguments are stored, we must manually set a default (using _setDefault()) for any custom Param we pass in that isn’t part of the mixins we inherited from. Specifically, because we inherited from HasInputCol and HasOutputCol, we do not need to set defaults for inputCol and outputCol, only for append_str. This ensures we can safely retrieve the variables later using the inherited member function getOrDefault().

Next we set the Param type member variables (by calling setParams()) using our map _input_kwargs so that we can correctly retrieve the true assigned values when we need them later. 

Finally, when we want to retrieve a variable such as inputCol or append_str, we need to call getOrDefault(), e.g., self.getOrDefault(self.append_str). This is different from how we normally retrieve a variable in Python, because each of these attributes is a Param object rather than the value itself. See the definition of getAppendStr() above, and the short example below, for more detail.
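
A brief illustration of the difference (a hypothetical interactive session; the values assume the constructor call shown):

appender = StringAppender(inputCol="text", outputCol="appended_text", append_str=" hadoop")
 
appender.append_str                          # a Param object describing the parameter, not its value
appender.getOrDefault(appender.append_str)   # " hadoop" -- the value actually stored on this instance
appender.getAppendStr()                      # the same value, via the convenience getter we defined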

Traits: DefaultParamsReadable, DefaultParamsWritable

The final component of creating a custom transformer is to inherit the traits DefaultParamsReadable and DefaultParamsWritable, which allow us to correctly write the transformer to file and read it back from file, both as part of a pipeline and by itself. These traits read and write the Params we have created.

Not inheriting these traits may lead to errors like the following when attempting to save a custom transformer:

ValueError: ('Pipeline write will fail on this pipeline because stage %s of type %s is not MLWritable', 'StringAppender_281f47e48529', <class '__main__.StringAppender'>)
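
With these traits in place, the transformer round-trips to disk on its own as well as inside a pipeline. A minimal sketch (the path is illustrative):

appender = StringAppender(inputCol="text", outputCol="appended_text", append_str=" hadoop")
appender.save("/tmp/string_appender")                    # provided via DefaultParamsWritable
 
reloaded = StringAppender.load("/tmp/string_appender")   # provided via DefaultParamsReadable
reloaded.getAppendStr()                                  # " hadoop" -- parameters survive the round trip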

Using a Custom Transformer as Part of a Pipeline

Once the custom transformer is built, it’s easy to add it as a stage in a pipeline. We initialize our custom transformer with the correct input/output columns and the string to append, then add it as a stage in our pipeline. For example, extending the pipeline from the “Pipeline Framework” section above:

from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
from custom_transformer import StringAppender  # This is the StringAppender we created above
 
appender = StringAppender(inputCol="text", outputCol="updated_text", append_str=" hadoop")  # initialize our custom transformer
tokenizer = Tokenizer(inputCol=appender.getOutputCol(), outputCol="words")  # tokenize the appended text
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.001)
pipeline = Pipeline(stages=[appender,   # add the transformer as a stage
                            tokenizer,
                            hashingTF,
                            lr])
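
With the appender added, the pipeline fits, transforms and serializes exactly as before. A short sketch (the save path is illustrative, and df is the example DataFrame from earlier):

from pyspark.ml import PipelineModel
 
pipeline_path = "/tmp/appender_pipeline"     # illustrative location
model = pipeline.fit(df)
model.transform(df).select("id", "updated_text", "prediction").show()
 
model.save(pipeline_path)                    # the custom transformer is saved along with every other stage
reloaded_model = PipelineModel.load(pipeline_path)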

As we can see, converting a custom processing function into a custom transformer step requires us to implement the pattern discussed in this post. Although there are some non-trivial components to wrapping functions, the pattern is consistent, so it can be applied to most processing functions. Additionally, custom transformers can then be used as part of a pipeline to further improve code readability and integration with the native Spark pipeline framework. Finally, setting up your processing functions as transformers allows entire pipelines to be saved to disk, where they can be more easily shared and used by collaborators downstream of your workflow.

