<h4 style="font-variant-caps: small-caps;font-size:35pt;">Databricks-ML-professional-S02a-Preprocessing-Logic</h4>

<div style='background-color:black;border-radius:5px;border-top:1px solid'></div>
<br/>
<p>This Notebook adds information related to the following requirements:</p><br/>
<b>Preprocessing Logic:</b>
<ul>
<li>Describe an MLflow flavor and the benefits of using MLflow flavors</li>
<li>Describe the advantages of using the pyfunc MLflow flavor</li>
<li>Describe the process and benefits of including preprocessing logic and context in custom model classes and objects</li>
</ul>
<br/>
<p><b>Download this notebook in ipynb format <a href="Databricks-ML-professional-S02a-Preprocessing-Logic.ipynb">here</a>.</b></p>
<br/>
<div style='background-color:black;border-radius:5px;border-top:1px solid'></div>

<a id="mlflowflavor"></a>
<div style='background-color:rgba(30, 144, 255, 0.1);border-radius:5px;padding:2px;'>
<span style="font-variant-caps: small-caps;font-weight:700">1. Describe an MLflow flavor and the benefits of using MLflow flavors</span></div>

<b>A flavor refers to the library or framework an ML model is built on.</b>
<ul>
<li>For example, the flavor of a model can be, among others:
<ul><li><b>scikit-learn</b>: <code>mlflow.sklearn</code></li>
    <li><b>Spark ML</b>: <code>mlflow.spark</code></li>
    <li><b>Keras</b>: <code>mlflow.keras</code></li>
</ul></li>
<li><span style="text-decoration:underline">Problem</span>: keeping models in their native flavor can make deployment more challenging, because each library has its own serialization format and prediction API.</li>
<li>The idea of flavors is to unify the model artifact and API regardless of the library used to build the model.</li>
<li>This helps break down silos and makes collaboration easier.</li>
</ul>
<div style="display:block;text-align:center"><img width="500px" src="https://i.ibb.co/z4ZG1wc/mlflow7.png"/></div>

<a id="pyfuncmlflow"></a>
<div style='background-color:rgba(30, 144, 255, 0.1);border-radius:5px;padding:2px;'>
<span style="font-variant-caps: small-caps;font-weight:700">2. Describe the advantages of using the pyfunc MLflow flavor</span></div>

<p><b>The <code>python_function</code> or <code>pyfunc</code> flavor of models gives a generic way of bundling models.</b></p>
<ul>
<li>It lets you deploy a Python function without worrying about the underlying format of the model.</li>
<li>MLflow can therefore map any training framework to any deployment target through this platform-agnostic representation, which greatly reduces the complexity of inference.</li>
<li>A <code>pyfunc</code> is a generic Python model that can define arbitrary logic, regardless of the libraries used to train it.</li>
<li>This object interoperates with any MLflow functionality, especially downstream scoring tools. As such, it is defined as a class, packaged with a directory structure that contains all of its dependencies.</li>
<li>It is then "just an object" with various methods, such as a <code>predict</code> method. Since it makes very few assumptions, it can be deployed using MLflow, SageMaker, a Spark UDF, or in other environments.</li></ul>
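
<p>The idea of a single, framework-independent entry point can be sketched without MLflow at all. The snippet below is only an illustration: <code>NativeModelA</code>, <code>NativeModelB</code>, <code>PyfuncLikeWrapper</code> and <code>score</code> are hypothetical names, not MLflow classes. It only mirrors the <code>predict(context, model_input)</code> signature used by the custom models later in this notebook.</p>

```python
# Toy stand-ins for two "native" models whose prediction methods differ.
class NativeModelA:
    def predict(self, rows):
        return [2.0 * r for r in rows]


class NativeModelB:
    def infer(self, rows):  # different method name on purpose
        return [r + 1.0 for r in rows]


class PyfuncLikeWrapper:
    """Hypothetical wrapper exposing one predict(context, model_input) entry point."""

    def __init__(self, native_model, native_method_name):
        # Remember which native method does the actual work
        self._call = getattr(native_model, native_method_name)

    def predict(self, context, model_input):
        # `context` is unused in this sketch; it only mirrors the pyfunc signature
        return self._call(list(model_input))


def score(model, rows):
    """Downstream code only needs the common interface."""
    return model.predict(None, rows)


wrapped_a = PyfuncLikeWrapper(NativeModelA(), "predict")
wrapped_b = PyfuncLikeWrapper(NativeModelB(), "infer")
print(score(wrapped_a, [1.0, 2.0]))
print(score(wrapped_b, [1.0, 2.0]))
```

<p>The scoring code never needs to know which library produced the model, which is the property that makes a generic flavor convenient for deployment.</p>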

<a id="custommodels"></a>
<div style='background-color:rgba(30, 144, 255, 0.1);border-radius:5px;padding:2px;'>
<span style="font-variant-caps: small-caps;font-weight:700">3. Describe the process and benefits of including preprocessing logic and context in custom model classes and objects</span></div>

<div style='border-radius:5px;padding:2px;'><span style="font-variant-caps: small-caps;font-weight:700">Let's illustrate this requirement with an example</span></div>
<p>Log two models to MLflow using <code>mlflow.pyfunc.log_model</code>:</p>
<ul><li>1 scikit-learn model</li><li>1 xgboost model</li></ul>
<p>Then load both of them in the same way for prediction, using the <code>mlflow.pyfunc.load_model()</code> function.</p>

<b>Load some libraries</b>

In [None]:
import pandas as pd
import seaborn as sns
#
import sklearn
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error
#
import mlflow
from mlflow.models.signature import infer_signature
#
import logging
import json 
import os
from sys import version_info
#
import xgboost

In [None]:
logging.getLogger("mlflow").setLevel(logging.FATAL)

<p>Prepare train and test sets:</p>

In [None]:
diamonds_df = sns.load_dataset('diamonds').drop(['cut', 'color', 'clarity'], axis=1)
#
X_train, X_test, y_train, y_test = train_test_split(diamonds_df.drop(["price"], axis=1), diamonds_df["price"], random_state=42)

<div style='background-color:rgba(0, 139, 69, 0.2);border-radius:5px;padding:2px;padding-left:10px'>
<span style="font-variant-caps: small-caps;font-weight:700">scikit-learn</span></div>

<p>Definition of custom <b>scikit-learn model</b>:</p>

In [None]:
class sklearn_model(mlflow.pyfunc.PythonModel):

    def __init__(self, params):
        """ Initialize with just the model hyperparameters """
        #
        self.params = params
        self.rf_model = None
        self.config = None
        
    def load_context(self, context=None, config_path=None):
        """ When loading a pyfunc, this method runs automatically with the related
            context. This method is designed to perform the same functionality when
            run in a notebook or a downstream operation (like a REST endpoint).
            If the `context` object is provided, it will load the path to a config from 
            that object (this happens when `mlflow.pyfunc.load_model()` is called).
            If the `config_path` argument is provided instead, it uses this argument
            in order to load in the config. """
        #
        if context: # Reloaded or served model: MLflow supplies the context
            config_path = context.artifacts["config_path"]
        # Otherwise (notebook run), the explicit `config_path` argument is used as-is

        # Use a context manager so the file handle is closed after reading
        with open(config_path) as f:
            self.config = json.load(f)
      
    def preprocess_input(self, model_input):
        """ Return pre-processed model_input """
        #
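        # Any preprocessing can be done here. For the purpose of this example, we simply apply a RobustScaler.
        # Caveat: the scaler is refit on whatever batch is passed in, so the scaled values depend on that batch.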
        from sklearn.preprocessing import RobustScaler
        #
        for c in list(model_input.columns):
            model_input[c] = RobustScaler().fit_transform(model_input[[c]])
        #
        return model_input
  
    def fit(self, X_train, y_train):
        """ Uses the same preprocessing logic to fit the model """
        #
        from sklearn.ensemble import RandomForestRegressor
        #
        # Work on a copy so the caller's X_train is not modified in place
        processed_model_input = self.preprocess_input(X_train.copy())
        rf_model = RandomForestRegressor(**self.params)
        rf_model.fit(processed_model_input, y_train)
        #
        self.rf_model = rf_model
    
    def predict(self, context, model_input):
        """ This is the main entrance to the model in deployment systems """
        #
        processed_model_input = self.preprocess_input(model_input.copy())
        return self.rf_model.predict(processed_model_input)
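
<p>The class above refits its scaler on every input it receives. A common alternative is to fit the preprocessing once during training and store it on the model, so that a given row is always transformed in the same way. The sketch below illustrates the difference using only the standard library. <code>FittedStandardScaler</code> is a hypothetical stand-in (it uses mean and population standard deviation rather than the <code>RobustScaler</code> used above), and the numbers are made up for the illustration.</p>

```python
import statistics


class FittedStandardScaler:
    """Hypothetical stand-in for a scaler that is fit once and then reused."""

    def fit(self, values):
        self.mean = statistics.fmean(values)
        # Population standard deviation; fall back to 1.0 for constant columns
        self.std = statistics.pstdev(values) or 1.0
        return self

    def transform(self, values):
        return [(v - self.mean) / self.std for v in values]


train = [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0]
batch_a = [6.0, 7.0, 100.0]
batch_b = [6.0, 7.0, 8.0]

# Fit once on the training data: the raw value 6.0 is scaled identically in both batches.
scaler = FittedStandardScaler().fit(train)
in_batch_a = scaler.transform(batch_a)[0]
in_batch_b = scaler.transform(batch_b)[0]
print(in_batch_a == in_batch_b)

# Refit on each batch: the same raw value now depends on its neighbours.
refit_a = FittedStandardScaler().fit(batch_a).transform(batch_a)[0]
refit_b = FittedStandardScaler().fit(batch_b).transform(batch_b)[0]
print(refit_a == refit_b)
```

<p>In a custom pyfunc class, the fitted scaler could be kept as an attribute set in <code>fit</code> and reused in <code>predict</code>; this is a design suggestion, not what the notebook's class does.</p>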

<p>Definition of the parameters for the custom scikit-learn model:</p>

In [None]:
params_sklearn = {
    "n_estimators": 15, 
    "max_depth": 5
}
#
# Designate a path
config_path_sklearn = "data_sklearn.json"
#
# Save the results
with open(config_path_sklearn, "w") as f:
    json.dump(params_sklearn, f)
#
# Build the artifacts dictionary to be saved with the model
# The files these paths point to are copied over when the model is logged
artifacts_sklearn = {"config_path": config_path_sklearn}

<p>Instantiate the scikit-learn custom model:</p>

In [None]:
model_sk = sklearn_model(params_sklearn)
#
model_sk.load_context(config_path=config_path_sklearn) 
#
# Confirm the config has loaded
model_sk.config

Out[67]: {'n_estimators': 15, 'max_depth': 5}
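
<p>The two ways of reaching the config file (an explicit path in a notebook, or the <code>artifacts</code> mapping of a context when the model is reloaded) can be exercised in isolation. This is a minimal sketch: <code>resolve_config</code> is a hypothetical helper, and a <code>SimpleNamespace</code> plays the role of the MLflow context object, of which only the <code>artifacts</code> attribute is imitated.</p>

```python
import json
import os
import tempfile
from types import SimpleNamespace


def resolve_config(context=None, config_path=None):
    """Mirror the two branches of load_context: context first, explicit path otherwise."""
    if context is not None:
        config_path = context.artifacts["config_path"]
    if config_path is None:
        raise ValueError("Provide either a context or a config_path")
    with open(config_path) as f:
        return json.load(f)


with tempfile.TemporaryDirectory() as tmp_dir:
    path = os.path.join(tmp_dir, "config.json")
    with open(path, "w") as f:
        json.dump({"n_estimators": 15, "max_depth": 5}, f)

    # Notebook-style call: the path is passed directly.
    from_path = resolve_config(config_path=path)

    # Reload-style call: a stand-in object plays the role of the context.
    fake_context = SimpleNamespace(artifacts={"config_path": path})
    from_context = resolve_config(context=fake_context)

print(from_path == from_context)
```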

<p>Train the scikit-learn custom model on the training set:</p>

In [None]:
model_sk.fit(X_train, y_train)

<p>Verify that the scikit-learn custom model can produce predictions on the test set:</p>

In [None]:
predictions_sklearn = model_sk.predict(context=None, model_input=X_test)
pd.DataFrame({'actual prices': list(y_test), 'predictions': list(predictions_sklearn)}).head(5)

Unnamed: 0,actual prices,predictions
0,559,581.063801
1,2201,1898.067468
2,1238,987.555592
3,1304,1026.65966
4,6901,10610.572961


<p>Optionally, prepare model signature:</p>

In [None]:
signature_sklearn = infer_signature(X_test, predictions_sklearn)
signature_sklearn

Out[75]: inputs: 
  ['carat': double, 'depth': double, 'table': double, 'x': double, 'y': double, 'z': double]
outputs: 
  [Tensor('float64', (-1,))]

<i>Generate the conda environment, which can be arbitrarily complex. With <b>mlflow.sklearn</b>, the appropriate version of <b>sklearn</b> is logged automatically; with a <b>pyfunc</b>, we must construct the deployment environment manually. See more details in <a href="https://customer-academy.databricks.com/learn/course/1522/play/9698/model-management-demo" target="_blank">this video</a>.</i>

In [None]:
conda_env_sklearn = {
    "channels": ["defaults"],
    "dependencies": [
        f"python={version_info.major}.{version_info.minor}.{version_info.micro}",
        "pip",
        {"pip": ["mlflow",
                 f"scikit-learn=={sklearn.__version__}"]
        },
    ],
    "name": "sklearn_env"
}
conda_env_sklearn

Out[77]: {'channels': ['defaults'],
 'dependencies': ['python=3.9.5',
  'pip',
  {'pip': ['mlflow', 'scikit-learn==0.24.2']}],
 'name': 'sklearn_env'}
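
<p>Since the same dictionary shape is needed again for the xgboost model, it can be produced by a small helper. The function below is a sketch under that assumption: <code>build_conda_env</code> is a hypothetical name, and it simply reproduces the structure shown above for any list of pinned pip packages.</p>

```python
import sys


def build_conda_env(name, pip_packages):
    """Return a conda environment dict pinned to the running Python version."""
    v = sys.version_info
    return {
        "channels": ["defaults"],
        "dependencies": [
            f"python={v.major}.{v.minor}.{v.micro}",
            "pip",
            # mlflow is always included; the caller adds framework-specific pins
            {"pip": ["mlflow", *pip_packages]},
        ],
        "name": name,
    }


env = build_conda_env("sklearn_env", ["scikit-learn==0.24.2"])
print(env["dependencies"][2])
```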

<p>Log the model with <code>mlflow.pyfunc.log_model</code>, passing the objects defined previously:</p>

In [None]:
with mlflow.start_run() as run:
    mlflow.pyfunc.log_model(
        "sklearn_RFR", 
        python_model=model_sk, 
        artifacts=artifacts_sklearn,
        conda_env=conda_env_sklearn,
        signature=signature_sklearn,
        input_example=X_test[:3] 
  )

<p>It is now possible to load the logged model using <code>mlflow.pyfunc.load_model</code> and use it for predictions:</p>

In [None]:
mlflow_pyfunc_model_path_sk = f"runs:/{run.info.run_id}/sklearn_RFR"
loaded_preprocess_model_sk = mlflow.pyfunc.load_model(mlflow_pyfunc_model_path_sk)
#
y_pred = loaded_preprocess_model_sk.predict(X_test)
#
pd.DataFrame({'actual prices': list(y_test), 'predictions': list(y_pred)}).head(5)

Unnamed: 0,actual prices,predictions
0,559,581.063801
1,2201,1898.067468
2,1238,987.555592
3,1304,1026.65966
4,6901,10610.572961


<p>Let's compute the RMSE for this model:</p>

In [None]:
print("RMSE for custom scikit-learn model: ", mean_squared_error(y_test, y_pred, squared=False))

RMSE for custom scikit-learn model:  1372.2569123988917
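
<p>For reference, the RMSE is the square root of the mean of the squared errors. The pure-Python sketch below computes it on a tiny made-up example; <code>rmse</code> is a hypothetical helper. It can also serve as a fallback, since the <code>squared=False</code> argument was deprecated in scikit-learn 1.4 in favor of <code>root_mean_squared_error</code>.</p>

```python
import math


def rmse(y_true, y_pred):
    """Root mean squared error of two equally sized, non-empty sequences."""
    if len(y_true) != len(y_pred) or len(y_true) == 0:
        raise ValueError("Inputs must be non-empty and of equal length")
    squared_errors = [(t - p) ** 2 for t, p in zip(y_true, y_pred)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))


# Only the last prediction is off (by 2), so the RMSE is sqrt(4 / 3).
print(rmse([1.0, 2.0, 3.0], [1.0, 2.0, 5.0]))
```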


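<p>It's also possible to load the custom model as a <b>Spark UDF</b> using <code>mlflow.pyfunc.spark_udf</code> and apply it to a Spark DataFrame. Because <code>preprocess_input</code> refits the scaler on each batch it receives, the predictions below can differ from those computed on the full pandas DataFrame:</p>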

In [None]:
sklearn_custom_predict = mlflow.pyfunc.spark_udf(spark, mlflow_pyfunc_model_path_sk)
#
display(spark.createDataFrame(X_test).withColumn('prediction', sklearn_custom_predict(*['carat', 'depth', 'table', 'x', 'y', 'z'])).limit(5))

carat,depth,table,x,y,z,prediction
0.24,62.1,56.0,3.97,4.0,2.47,581.0638013699318
0.58,60.0,57.0,5.44,5.42,3.26,8459.162219485595
0.4,62.1,55.0,4.76,4.74,2.95,1808.31042721024
0.43,60.8,57.0,4.92,4.89,2.98,2573.66269537617
1.55,62.3,55.0,7.44,7.37,4.61,14959.410444117686


<div style='background-color:rgba(0, 139, 69, 0.2);border-radius:5px;padding:2px;padding-left:10px'>
<span style="font-variant-caps: small-caps;font-weight:700">xgboost</span></div>

<p>Definition of <b>custom xgboost model</b>:</p>

In [None]:
class xgboost_regressor(mlflow.pyfunc.PythonModel):

    def __init__(self, params):
        """ Initialize with just the model hyperparameters """
        #
        self.params = params
        self.xgb_model = None
        self.config = None
        
    def load_context(self, context=None, config_path=None):
        """ When loading a pyfunc, this method runs automatically with the related
            context. This method is designed to perform the same functionality when
            run in a notebook or a downstream operation (like a REST endpoint).
            If the `context` object is provided, it will load the path to a config from 
            that object (this happens when `mlflow.pyfunc.load_model()` is called).
            If the `config_path` argument is provided instead, it uses this argument
            in order to load in the config. """
        #
        if context: # Reloaded or served model: MLflow supplies the context
            config_path = context.artifacts["config_path"]
        # Otherwise (notebook run), the explicit `config_path` argument is used as-is

        # Use a context manager so the file handle is closed after reading
        with open(config_path) as f:
            self.config = json.load(f)
      
    def preprocess_input(self, model_input):
        """ Return pre-processed model_input """
        #
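        # Any preprocessing can be done here. For the purpose of this example, we apply a StandardScaler.
        # Caveat: the scaler is refit on whatever batch is passed in, so the scaled values depend on that batch.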
        from sklearn.preprocessing import StandardScaler
        #
        for c in list(model_input.columns):
            model_input[c] = StandardScaler().fit_transform(model_input[[c]])
        #
        return model_input
  
    def fit(self, X_train, y_train):
        """ Uses the same preprocessing logic to fit the model """
        #
        from xgboost import XGBRegressor
        #
        # Work on a copy so the caller's X_train is not modified in place
        processed_model_input = self.preprocess_input(X_train.copy())
        xgb_model = XGBRegressor(**self.params)
        xgb_model.fit(processed_model_input, y_train)
        #
        self.xgb_model = xgb_model
    
    def predict(self, context, model_input):
        """ This is the main entrance to the model in deployment systems """
        #
        processed_model_input = self.preprocess_input(model_input.copy())
        return self.xgb_model.predict(processed_model_input)

<p>Definition of the parameters for the custom xgboost model:</p>

In [None]:
params_xgb = {
    "n_estimators": 1000, 
    "max_depth": 7,
    "eta": 0.1,
    "subsample": 0.7,
    "colsample_bytree": 0.8
}

# Designate a path
config_path_xgb = "data_xgb.json"

# Save the results
with open(config_path_xgb, "w") as f:
    json.dump(params_xgb, f)

# Build the artifacts dictionary to be saved with the model
# The files these paths point to are copied over when the model is logged
artifacts_xgb = {"config_path": config_path_xgb} 

<p>Instantiate the xgboost custom model:</p>

In [None]:
model_xgb = xgboost_regressor(params_xgb)
#
model_xgb.load_context(config_path=config_path_xgb) 
#
# Confirm the config has loaded
model_xgb.config

Out[91]: {'n_estimators': 1000,
 'max_depth': 7,
 'eta': 0.1,
 'subsample': 0.7,
 'colsample_bytree': 0.8}

<p>Train the xgboost custom model on the training set:</p>

In [None]:
model_xgb.fit(X_train, y_train)

<p>Verify that the xgboost custom model can produce predictions on the test set:</p>

In [None]:
predictions_xgb = model_xgb.predict(context=None, model_input=X_test)
pd.DataFrame({'actual prices': list(y_test), 'predictions': list(predictions_xgb)}).head(5)

Unnamed: 0,actual prices,predictions
0,559,524.269531
1,2201,1795.01416
2,1238,1029.63623
3,1304,1096.781372
4,6901,10364.128906


<p>Optionally, prepare model signature:</p>

In [None]:
signature_xgb = infer_signature(X_test, predictions_xgb)
signature_xgb

Out[94]: inputs: 
  ['carat': double, 'depth': double, 'table': double, 'x': double, 'y': double, 'z': double]
outputs: 
  [Tensor('float32', (-1,))]

<i>Generate the conda environment, which can be arbitrarily complex. With a built-in flavor such as <b>mlflow.sklearn</b>, the appropriate library version is logged automatically; with a <b>pyfunc</b>, we must construct the deployment environment manually. See more details in <a href="https://customer-academy.databricks.com/learn/course/1522/play/9698/model-management-demo" target="_blank">this video</a>.</i>

In [None]:
conda_env_xgb = {
    "channels": ["defaults"],
    "dependencies": [
        f"python={version_info.major}.{version_info.minor}.{version_info.micro}",
        "pip",
        {"pip": ["mlflow",
                 f"xgboost=={xgboost.__version__}"]
        },
    ],
    "name": "xgboost_env"
}
conda_env_xgb

Out[95]: {'channels': ['defaults'],
 'dependencies': ['python=3.9.5',
  'pip',
  {'pip': ['mlflow', 'xgboost==1.6.2']}],
 'name': 'xgboost_env'}

<p>Log the model with <code>mlflow.pyfunc.log_model</code>, passing the objects defined previously:</p>

In [None]:
with mlflow.start_run() as run:
    mlflow.pyfunc.log_model(
        "xgb_regressor", 
        python_model=model_xgb, 
        artifacts=artifacts_xgb,
        conda_env=conda_env_xgb,
        signature=signature_xgb,
        input_example=X_test[:3] 
  )

<p>It is now possible to load the logged model using <code>mlflow.pyfunc.load_model</code> and use it for predictions:</p>

In [None]:
mlflow_pyfunc_model_path_xgb = f"runs:/{run.info.run_id}/xgb_regressor"
loaded_preprocess_model_xgb = mlflow.pyfunc.load_model(mlflow_pyfunc_model_path_xgb)
#
y_pred_xgb = loaded_preprocess_model_xgb.predict(X_test)
#
pd.DataFrame({'actual prices': list(y_test), 'predictions': list(y_pred_xgb)}).head(5)

Unnamed: 0,actual prices,predictions
0,559,524.269531
1,2201,1795.01416
2,1238,1029.63623
3,1304,1096.781372
4,6901,10364.128906


<p>Let's compute the RMSE for this model:</p>

In [None]:
print("RMSE for custom xgboost model: ", mean_squared_error(y_test, y_pred_xgb, squared=False))

RMSE for custom xgboost model:  1457.7130185941312


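<p>It's also possible to load the custom model as a <b>Spark UDF</b> using <code>mlflow.pyfunc.spark_udf</code> and apply it to a Spark DataFrame. As with the scikit-learn model, the scaler is refit on each batch, so the predictions below can differ from those computed on the full pandas DataFrame:</p>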

In [None]:
xgboost_custom_predict = mlflow.pyfunc.spark_udf(spark, mlflow_pyfunc_model_path_xgb)
#
display(spark.createDataFrame(X_test).withColumn('prediction', xgboost_custom_predict(*['carat', 'depth', 'table', 'x', 'y', 'z'])).limit(5))

carat,depth,table,x,y,z,prediction
0.24,62.1,56.0,3.97,4.0,2.47,818.0107421875
0.58,60.0,57.0,5.44,5.42,3.26,2764.87451171875
0.4,62.1,55.0,4.76,4.74,2.95,1567.1573486328125
0.43,60.8,57.0,4.92,4.89,2.98,1833.7303466796875
1.55,62.3,55.0,7.44,7.37,4.61,11748.9375
