Databricks-ML-professional-S02a-Preprocessing-Logic
This Notebook adds information related to the following requirements:
Preprocessing Logic:
- Describe an MLflow flavor and the benefits of using MLflow flavors
- Describe the advantages of using the pyfunc MLflow flavor
- Describe the process and benefits of including preprocessing logic and context in custom model classes and objects
Download this notebook in ipynb format here.
Flavor refers to the library or framework an ML model is built on.
- For example, the flavor of a model can be, among others:
- scikit-learn: mlflow.sklearn
- Spark ML: mlflow.spark
- Keras: mlflow.keras
- Problem: keeping models in their native flavor can make deployment more challenging.
- The idea of flavors is to unify the model artifact and API regardless of the library used to build the model.
- This helps break silos and makes collaboration easier.
The python_function (or pyfunc) flavor gives a generic way of bundling models:
- It lets you deploy a Python function without worrying about the underlying format of the model.
- MLflow can therefore map any training framework to any deployment target using this platform-agnostic representation, massively reducing the complexity of inference.
- A pyfunc is a generic Python model that can define any arbitrary logic, regardless of the libraries used to train it.
- This object interoperates with any MLflow functionality, especially downstream scoring tools. As such, it is defined as a class with a related directory structure containing all of its dependencies.
- It is then "just an object" with various methods, such as a predict method. Since it makes very few assumptions, it can be deployed using MLflow, SageMaker, a Spark UDF, or in any other environment.
Log two models to MLflow using mlflow.pyfunc.log_model:
- 1 scikit-learn model
- 1 xgboost model
Then use both of them in exactly the same way for prediction, with the mlflow.pyfunc.load_model() function.
Load some libraries
import pandas as pd
import seaborn as sns
#
import sklearn
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error
#
import mlflow
from mlflow.models.signature import infer_signature
#
import logging
import json
import os
from sys import version_info
#
import xgboost
logging.getLogger("mlflow").setLevel(logging.FATAL)
Prepare train and test sets:
diamonds_df = sns.load_dataset('diamonds').drop(['cut', 'color', 'clarity'], axis=1)
#
X_train, X_test, y_train, y_test = train_test_split(diamonds_df.drop(["price"], axis=1), diamonds_df["price"], random_state=42)
Definition of custom scikit-learn model:
class sklearn_model(mlflow.pyfunc.PythonModel):
def __init__(self, params):
""" Initialize with just the model hyperparameters """
#
self.params = params
self.rf_model = None
self.config = None
def load_context(self, context=None, config_path=None):
""" When loading a pyfunc, this method runs automatically with the related
context. This method is designed to perform the same functionality when
run in a notebook or a downstream operation (like a REST endpoint).
If the `context` object is provided, it will load the path to a config from
that object (this happens when `mlflow.pyfunc.load_model()` is called).
If the `config_path` argument is provided instead, it uses this argument
in order to load in the config. """
#
if context: # This block executes for server run
config_path = context.artifacts["config_path"]
else: # This block executes for notebook run
pass
self.config = json.load(open(config_path))
def preprocess_input(self, model_input):
""" Return pre-processed model_input """
#
# Any preprocessing can be done here. For this example, we simply apply a
# RobustScaler (re-fitted on each call, which is fine for a demo).
from sklearn.preprocessing import RobustScaler
#
for c in list(model_input.columns):
model_input[c] = RobustScaler().fit_transform(model_input[[c]])
#
return model_input
def fit(self, X_train, y_train):
""" Uses the same preprocessing logic to fit the model """
#
from sklearn.ensemble import RandomForestRegressor
#
processed_model_input = self.preprocess_input(X_train)
rf_model = RandomForestRegressor(**self.params)
rf_model.fit(processed_model_input, y_train)
#
self.rf_model = rf_model
def predict(self, context, model_input):
""" This is the main entrance to the model in deployment systems """
#
processed_model_input = self.preprocess_input(model_input.copy())
return self.rf_model.predict(processed_model_input)
Definition of the parameters for the custom scikit-learn model:
params_sklearn = {
"n_estimators": 15,
"max_depth": 5
}
#
# Designate a path
config_path_sklearn = "data_sklearn.json"
#
# Save the results
with open(config_path_sklearn, "w") as f:
json.dump(params_sklearn, f)
#
# Generate an artifacts object to be saved
# All paths and the files they point to will be copied over when saving
artifacts_sklearn = {"config_path": config_path_sklearn}
Instantiate the scikit-learn custom model:
model_sk = sklearn_model(params_sklearn)
#
model_sk.load_context(config_path=config_path_sklearn)
#
# Confirm the config has loaded
model_sk.config
Out[67]: {'n_estimators': 15, 'max_depth': 5}
Train the scikit-learn custom model on training set:
model_sk.fit(X_train, y_train)
Verify there can be predictions on test set using scikit-learn custom model:
predictions_sklearn = model_sk.predict(context=None, model_input=X_test)
pd.DataFrame({'actual prices': list(y_test), 'predictions': list(predictions_sklearn)}).head(5)
| | actual prices | predictions |
|---|---|---|
| 0 | 559 | 581.063801 |
| 1 | 2201 | 1898.067468 |
| 2 | 1238 | 987.555592 |
| 3 | 1304 | 1026.659660 |
| 4 | 6901 | 10610.572961 |
Optionally, prepare model signature:
signature_sklearn = infer_signature(X_test, predictions_sklearn)
signature_sklearn
Out[75]: inputs: ['carat': double, 'depth': double, 'table': double, 'x': double, 'y': double, 'z': double] outputs: [Tensor('float64', (-1,))]
Generate the conda environment. This can be arbitrarily complex. This is necessary because, when we use mlflow.sklearn, the appropriate version of scikit-learn is logged automatically; with a pyfunc, we must construct our deployment environment manually. See more details about it in this video.
conda_env_sklearn = {
"channels": ["defaults"],
"dependencies": [
f"python={version_info.major}.{version_info.minor}.{version_info.micro}",
"pip",
{"pip": ["mlflow",
f"scikit-learn=={sklearn.__version__}"]
},
],
"name": "sklearn_env"
}
conda_env_sklearn
Out[77]: {'channels': ['defaults'], 'dependencies': ['python=3.9.5', 'pip', {'pip': ['mlflow', 'scikit-learn==0.24.2']}], 'name': 'sklearn_env'}
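As an aside, recent MLflow versions also accept a plain pip-style requirements list through the `pip_requirements` argument of `mlflow.pyfunc.log_model`, a lighter alternative to a full conda environment. A sketch of building such a list with the same version pins:

```python
import sklearn

# Pin the training-time library versions in pip format; this list can be
# passed as `pip_requirements=` to mlflow.pyfunc.log_model instead of
# building a full conda_env dictionary.
pip_requirements = [
    "mlflow",
    f"scikit-learn=={sklearn.__version__}",
]
```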
Save the model with mlflow.pyfunc.log_model, using the parameters defined previously:
with mlflow.start_run() as run:
mlflow.pyfunc.log_model(
"sklearn_RFR",
python_model=model_sk,
artifacts=artifacts_sklearn,
conda_env=conda_env_sklearn,
signature=signature_sklearn,
input_example=X_test[:3]
)
It is now possible to load the logged model using mlflow.pyfunc.load_model and use it for predictions:
mlflow_pyfunc_model_path_sk = f"runs:/{run.info.run_id}/sklearn_RFR"
loaded_preprocess_model_sk = mlflow.pyfunc.load_model(mlflow_pyfunc_model_path_sk)
#
y_pred = loaded_preprocess_model_sk.predict(X_test)
#
pd.DataFrame({'actual prices': list(y_test), 'predictions': list(y_pred)}).head(5)
| | actual prices | predictions |
|---|---|---|
| 0 | 559 | 581.063801 |
| 1 | 2201 | 1898.067468 |
| 2 | 1238 | 987.555592 |
| 3 | 1304 | 1026.659660 |
| 4 | 6901 | 10610.572961 |
Let's score RMSE for this model:
print("RMSE for custom scikit-learn model: ", mean_squared_error(y_test, y_pred, squared=False))
RMSE for custom scikit-learn model: 1372.2569123988917
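`squared=False` makes `mean_squared_error` return the root of the MSE; the same quantity can be computed directly (toy numbers below, not the diamonds data):

```python
import numpy as np

def rmse(y_true, y_pred):
    """Root mean squared error: square root of the average squared residual."""
    y_true = np.asarray(y_true, dtype=float)
    y_pred = np.asarray(y_pred, dtype=float)
    return float(np.sqrt(np.mean((y_true - y_pred) ** 2)))

rmse([3.0, 5.0, 2.5], [2.5, 5.0, 4.0])  # sqrt(mean([0.25, 0.0, 2.25]))
```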
It's also possible to load the custom model as a Spark UDF using mlflow.pyfunc.spark_udf and predict on a Spark DataFrame:
sklearn_custom_predict = mlflow.pyfunc.spark_udf(spark, mlflow_pyfunc_model_path_sk)
#
display(spark.createDataFrame(X_test).withColumn('prediction', sklearn_custom_predict(*['carat', 'depth', 'table', 'x', 'y', 'z'])).limit(5))
| carat | depth | table | x | y | z | prediction |
|---|---|---|---|---|---|---|
| 0.24 | 62.1 | 56.0 | 3.97 | 4.0 | 2.47 | 581.0638013699318 |
| 0.58 | 60.0 | 57.0 | 5.44 | 5.42 | 3.26 | 8459.162219485595 |
| 0.4 | 62.1 | 55.0 | 4.76 | 4.74 | 2.95 | 1808.3104272102398 |
| 0.43 | 60.8 | 57.0 | 4.92 | 4.89 | 2.98 | 2573.6626953761706 |
| 1.55 | 62.3 | 55.0 | 7.44 | 7.37 | 4.61 | 14959.410444117686 |
Definition of custom xgboost model:
class xgboost_regressor(mlflow.pyfunc.PythonModel):
def __init__(self, params):
""" Initialize with just the model hyperparameters """
#
self.params = params
self.xgb_model = None
self.config = None
def load_context(self, context=None, config_path=None):
""" When loading a pyfunc, this method runs automatically with the related
context. This method is designed to perform the same functionality when
run in a notebook or a downstream operation (like a REST endpoint).
If the `context` object is provided, it will load the path to a config from
that object (this happens when `mlflow.pyfunc.load_model()` is called).
If the `config_path` argument is provided instead, it uses this argument
in order to load in the config. """
#
if context: # This block executes for server run
config_path = context.artifacts["config_path"]
else: # This block executes for notebook run
pass
self.config = json.load(open(config_path))
def preprocess_input(self, model_input):
""" Return pre-processed model_input """
#
# Any preprocessing can be done here. For this example, we apply a
# StandardScaler (re-fitted on each call, which is fine for a demo).
from sklearn.preprocessing import StandardScaler
#
for c in list(model_input.columns):
model_input[c] = StandardScaler().fit_transform(model_input[[c]])
#
return model_input
def fit(self, X_train, y_train):
""" Uses the same preprocessing logic to fit the model """
#
from xgboost import XGBRegressor
#
processed_model_input = self.preprocess_input(X_train)
xgb_model = XGBRegressor(**self.params)
xgb_model.fit(processed_model_input, y_train)
#
self.xgb_model = xgb_model
def predict(self, context, model_input):
""" This is the main entrance to the model in deployment systems """
#
processed_model_input = self.preprocess_input(model_input.copy())
return self.xgb_model.predict(processed_model_input)
Definition of the parameters for the custom xgboost model:
params_xgb = {
"n_estimators": 1000,
"max_depth": 7,
"eta": 0.1,
"subsample": 0.7,
"colsample_bytree": 0.8
}
# Designate a path
config_path_xgb = "data_xgb.json"
# Save the results
with open(config_path_xgb, "w") as f:
json.dump(params_xgb, f)
# Generate an artifacts object to be saved
# All paths and the files they point to will be copied over when saving
artifacts_xgb = {"config_path": config_path_xgb}
Instantiate the xgboost custom model:
model_xgb = xgboost_regressor(params_xgb)
#
model_xgb.load_context(config_path=config_path_xgb)
#
# Confirm the config has loaded
model_xgb.config
Out[91]: {'n_estimators': 1000, 'max_depth': 7, 'eta': 0.1, 'subsample': 0.7, 'colsample_bytree': 0.8}
Train the xgboost custom model on training set:
model_xgb.fit(X_train, y_train)
Verify there can be predictions on test set using xgboost custom model:
predictions_xgb = model_xgb.predict(context=None, model_input=X_test)
pd.DataFrame({'actual prices': list(y_test), 'predictions': list(predictions_xgb)}).head(5)
| | actual prices | predictions |
|---|---|---|
| 0 | 559 | 524.269531 |
| 1 | 2201 | 1795.014160 |
| 2 | 1238 | 1029.636230 |
| 3 | 1304 | 1096.781372 |
| 4 | 6901 | 10364.128906 |
Optionally, prepare model signature:
signature_xgb = infer_signature(X_test, predictions_xgb)
signature_xgb
Out[94]: inputs: ['carat': double, 'depth': double, 'table': double, 'x': double, 'y': double, 'z': double] outputs: [Tensor('float32', (-1,))]
Generate the conda environment. This can be arbitrarily complex. This is necessary because, when we use a built-in flavor such as mlflow.xgboost, the appropriate version of xgboost is logged automatically; with a pyfunc, we must construct our deployment environment manually. See more details about it in this video.
conda_env_xgb = {
"channels": ["defaults"],
"dependencies": [
f"python={version_info.major}.{version_info.minor}.{version_info.micro}",
"pip",
{"pip": ["mlflow",
f"xgboost=={xgboost.__version__}"]
},
],
"name": "xgboost_env"
}
conda_env_xgb
Out[95]: {'channels': ['defaults'], 'dependencies': ['python=3.9.5', 'pip', {'pip': ['mlflow', 'xgboost==1.6.2']}], 'name': 'xgboost_env'}
Save the model with mlflow.pyfunc.log_model, using the parameters defined previously:
with mlflow.start_run() as run:
mlflow.pyfunc.log_model(
"xgb_regressor",
python_model=model_xgb,
artifacts=artifacts_xgb,
conda_env=conda_env_xgb,
signature=signature_xgb,
input_example=X_test[:3]
)
It is now possible to load the logged model using mlflow.pyfunc.load_model and use it for predictions:
mlflow_pyfunc_model_path_xgb = f"runs:/{run.info.run_id}/xgb_regressor"
loaded_preprocess_model_xgb = mlflow.pyfunc.load_model(mlflow_pyfunc_model_path_xgb)
#
y_pred_xgb = loaded_preprocess_model_xgb.predict(X_test)
#
pd.DataFrame({'actual prices': list(y_test), 'predictions': list(y_pred_xgb)}).head(5)
| | actual prices | predictions |
|---|---|---|
| 0 | 559 | 524.269531 |
| 1 | 2201 | 1795.014160 |
| 2 | 1238 | 1029.636230 |
| 3 | 1304 | 1096.781372 |
| 4 | 6901 | 10364.128906 |
Let's score RMSE for this model:
print("RMSE for custom xgboost model: ", mean_squared_error(y_test, y_pred_xgb, squared=False))
RMSE for custom xgboost model: 1457.7130185941312
It's also possible to load the custom model as a Spark UDF using mlflow.pyfunc.spark_udf and predict on a Spark DataFrame:
xgboost_custom_predict = mlflow.pyfunc.spark_udf(spark, mlflow_pyfunc_model_path_xgb)
#
display(spark.createDataFrame(X_test).withColumn('prediction', xgboost_custom_predict(*['carat', 'depth', 'table', 'x', 'y', 'z'])).limit(5))
| carat | depth | table | x | y | z | prediction |
|---|---|---|---|---|---|---|
| 0.24 | 62.1 | 56.0 | 3.97 | 4.0 | 2.47 | 818.0107421875 |
| 0.58 | 60.0 | 57.0 | 5.44 | 5.42 | 3.26 | 2764.87451171875 |
| 0.4 | 62.1 | 55.0 | 4.76 | 4.74 | 2.95 | 1567.1573486328125 |
| 0.43 | 60.8 | 57.0 | 4.92 | 4.89 | 2.98 | 1833.7303466796875 |
| 1.55 | 62.3 | 55.0 | 7.44 | 7.37 | 4.61 | 11748.9375 |