Databricks-ML-professional-S02c-Model-Lifecycle-Automation
This Notebook adds information related to the following requirements:
Model Lifecycle Automation:
- Identify the role of automated testing in ML CI/CD pipelines
- Describe how to automate the model lifecycle using Model Registry Webhooks and Databricks Jobs
- Identify advantages of using Job clusters over all-purpose clusters
- Describe how to create a Job that triggers when a model transitions between stages, given a scenario
- Describe how to connect a Webhook with a Job
- Identify which code block will trigger a shown webhook
- Identify a use case for HTTP webhooks and where the Webhook URL needs to come from
- Describe how to list all webhooks and how to delete a webhook
Download this notebook in ipynb format here.
Automated testing in ML CI/CD pipelines plays a crucial role in ensuring the reliability, robustness, and performance of machine learning models. It helps identify errors, evaluate model accuracy, and maintain consistent behavior across deployments. Automated tests can cover unit testing for individual components, integration testing for model pipelines, and end-to-end testing for overall system functionality, providing confidence in the model's performance throughout the development lifecycle. This ensures that changes introduced in the CI/CD pipeline do not adversely impact the model's effectiveness and reliability.
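To make the idea concrete, here is a small illustrative example (not from the original notebook) of the kind of unit test an ML CI/CD pipeline would run automatically on every change. The helper function and its test are hypothetical names, testing a simple feature-engineering step:

```python
# Hypothetical preprocessing helper: replace missing values with the median
# of the observed values. A unit test like this would run automatically in
# the CI/CD pipeline before any model is promoted.

def fill_missing_with_median(values):
    """Replace None entries with the median of the non-missing values."""
    observed = sorted(v for v in values if v is not None)
    mid = len(observed) // 2
    median = (observed[mid] if len(observed) % 2 == 1
              else (observed[mid - 1] + observed[mid]) / 2)
    return [median if v is None else v for v in values]

def test_fill_missing_with_median():
    # missing value is filled with the median of 1.0 and 3.0
    assert fill_missing_with_median([1.0, None, 3.0]) == [1.0, 2.0, 3.0]
    # no missing values: input comes back unchanged
    assert fill_missing_with_median([5.0, 7.0]) == [5.0, 7.0]

test_fill_missing_with_median()
```

Integration and end-to-end tests follow the same pattern at a larger scope: they assert on pipeline outputs and deployed-model behavior instead of a single function.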
A model registry webhook - also called a model registry trigger - is tied to an event occurring within the Model Registry: when that specific event occurs, a specific action is executed.
In this particular case, we are interested in executing a Databricks job when a specific event occurs in the Model Registry.
As soon as a model is moved to the Staging stage, the Databricks job is triggered and executes the notebook that contains all the tests. This is one way to automate the testing part of an ML CI/CD pipeline using Databricks.
Steps are:
- Train a model
- Log the model to MLflow
- Register the model
- Create a notebook containing some tests
- Create a job with a single task: it should execute the test notebook created in the previous step (this can be done through the UI or programmatically)
- Create a webhook that listens to the event 'when a model is moved to Staging', i.e. MODEL_VERSION_TRANSITIONED_TO_STAGING
And that's it. As soon as the model is transitioned to Staging, the test notebook is triggered and executes any tests defined there.
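The job in the steps above can also be created programmatically via the Jobs API 2.1 instead of the UI. Below is a minimal sketch: the job name, notebook path, and cluster settings are illustrative assumptions, and `instance` and `headers` would be defined as in the webhook cells further below.

```python
# Sketch: build the payload for POST /api/2.1/jobs/create.
# The notebook path and cluster settings are made-up example values.

def build_test_job_payload(job_name, notebook_path):
    """Build the JSON payload for creating a one-task job via the Jobs API 2.1."""
    return {
        "name": job_name,
        "tasks": [{
            "task_key": "run_model_tests",
            "notebook_task": {"notebook_path": notebook_path},
            # job cluster: created for this run, terminated when the run ends
            "new_cluster": {
                "spark_version": "13.3.x-cpu-ml-scala2.12",
                "node_type_id": "Standard_DS3_v2",
                "num_workers": 1,
            },
        }],
    }

payload = build_test_job_payload("webhook_test", "/Repos/me/tests/model_tests")
# then, with instance and headers defined as later in this notebook:
# response = requests.post(f"{instance}/api/2.1/jobs/create",
#                          headers=headers, json=payload)
```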
See this page for more information about webhooks, in particular the list of possible events to listen for.
See this video for a complete example.
Here is an example of code, from training a model to the creation of a webhook that fires when the model is moved to the Staging stage. The webhook then executes the test notebook via the Databricks job.
Import libraries:
import pandas as pd
import seaborn as sns
#
from pyspark.sql.functions import *
#
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
#
import mlflow
import logging
#
import json
import requests
from mlflow.utils.rest_utils import http_request
from mlflow.utils.databricks_utils import get_databricks_host_creds
logging.getLogger("mlflow").setLevel(logging.FATAL)
Load dataset:
diamonds_df = sns.load_dataset("diamonds").drop(["cut", "color", "clarity"], axis=1)
#
diamonds_sdf = spark.createDataFrame(diamonds_df)
#
train_df, test_df = diamonds_sdf.randomSplit([.8, .2], seed=42)
Process features:
assembler_inputs = [column for column in diamonds_sdf.columns if column not in ['price']]
vec_assembler = VectorAssembler(inputCols=assembler_inputs, outputCol="features")
#
train_df_processed = vec_assembler.transform(train_df)
Instantiate ML model:
lrm = LinearRegression(featuresCol="features", labelCol='price')
Train model and log to MLflow:
model_path = 'webhook-model'
#
with mlflow.start_run(run_name="webhook-run") as run:
    model = lrm.fit(train_df_processed)
    #
    mlflow.spark.log_model(model, model_path)
Register latest logged model:
# model name
model_name = "webhook_diamonds"
#
# register the latest logged model
latest_run_id = mlflow.search_runs().sort_values(by="end_time", ascending=False)['run_id'].iloc[0]
#
mlflow.register_model(f"runs:/{latest_run_id}/{model_path}", name=model_name);
Registered model 'webhook_diamonds' already exists. Creating a new version of this model... Created version '2' of model 'webhook_diamonds'.
At this point, we manually create a test notebook and a job containing a task to execute this notebook.
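For illustration, here is a sketch of what the test notebook might contain. In a real run it would load the Staging version of the registered model with `mlflow.spark.load_model` and score a held-out set; the helper below only shows the validation logic itself, with a made-up RMSE threshold and toy numbers:

```python
# Hypothetical checks for the test notebook: predicted prices must be
# positive, and RMSE against known actuals must stay below a threshold.

def validate_predictions(predictions, actuals, max_rmse):
    """Basic sanity checks on model output; returns the computed RMSE."""
    assert all(p > 0 for p in predictions), "predicted prices must be positive"
    rmse = (sum((p - a) ** 2 for p, a in zip(predictions, actuals))
            / len(actuals)) ** 0.5
    assert rmse <= max_rmse, f"RMSE {rmse:.2f} above threshold {max_rmse}"
    return rmse

# toy usage with made-up numbers
rmse = validate_predictions([105.0, 210.0], actuals=[100.0, 205.0], max_rmse=10.0)
```

If any assertion fails, the job run fails, which is exactly the signal the CI/CD pipeline needs to block the promotion.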
The job ID is necessary for the next steps; it is available in the job definition in the UI, and it can also be retrieved programmatically with this function:
Note that the definition of the function in the next cell comes from this course.
def find_job_id(instance, headers, job_name, offset_limit=1000):
    params = {"offset": 0}
    uri = f"{instance}/api/2.1/jobs/list"
    done = False
    job_id = None
    while not done:
        done = True
        res = requests.get(uri, params=params, headers=headers)
        assert res.status_code == 200, f"Job list not returned; {res.content}"
        jobs = res.json().get("jobs", [])
        if len(jobs) > 0:
            for job in jobs:
                if job.get("settings", {}).get("name", None) == job_name:
                    job_id = job.get("job_id", None)
                    break
            # if job_id not found; update the offset and try again
            if job_id is None:
                params["offset"] += len(jobs)
                if params["offset"] < offset_limit:
                    done = False
    return job_id
We need a token for the webhook to be allowed to execute the Databricks job. The way to create a token is described in this course.
Alternatively, for the purpose of this example, a token can be retrieved with the following command:
token = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().getOrElse(None)
Let's define the required parameters for the webhook definition:
# define some parameters
job_name = "webhook_test"
headers = {"Authorization": f"Bearer {token}"}
host_creds = get_databricks_host_creds("databricks")
endpoint = "/api/2.0/mlflow/registry-webhooks/create"
instance = mlflow.utils.databricks_utils.get_webapp_url()
job_id = find_job_id(instance, headers, job_name, offset_limit=1000)
Finally, let's create the webhook:
# define job_json
job_json = {"model_name": model_name,
            "events": ["MODEL_VERSION_TRANSITIONED_TO_STAGING"],
            "description": "Job webhook trigger",
            "status": "Active",
            "job_spec": {"job_id": job_id,
                         "workspace_url": instance,
                         "access_token": token}
            }
response = http_request(
host_creds=host_creds,
endpoint=endpoint,
method="POST",
json=job_json
)
assert response.status_code == 200, f"Expected HTTP 200, received {response.status_code}"
From now on, as soon as the model is transitioned to Staging, the job is executed, running the associated notebook containing the tests. The model can be transitioned to Staging either manually in the Databricks UI or programmatically by executing the function below.
client = mlflow.MlflowClient()
#
client.transition_model_version_stage(model_name, 1, 'Staging')
Out[11]: <ModelVersion: creation_timestamp=1699615322888, current_stage='Staging', description='', last_updated_timestamp=1699616004219, name='webhook_diamonds', run_id='ed6f91126eb149e7bf39c024da865a00', run_link='', source='dbfs:/databricks/mlflow-tracking/1352035400533066/ed6f91126eb149e7bf39c024da865a00/artifacts/webhook-model', status='READY', status_message='', tags={}, user_id='2329071338839022', version='1'>
- Cost Efficiency: Job clusters are ephemeral and automatically terminate after the job completes, minimizing costs compared to continuously running all-purpose clusters.
- Resource Isolation: Job clusters provide dedicated resources for a specific job, preventing interference from other workloads and ensuring consistent performance.
- Automatic Scaling: Job clusters automatically scale resources based on the job's requirements, optimizing resource utilization and improving job execution times.
- Version Isolation: Job clusters allow you to specify the Databricks Runtime version, ensuring consistent and isolated environments for each job execution.
- Ease of Management: Job clusters are managed automatically by Databricks, reducing the operational overhead of managing long-lived clusters manually.
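The advantages above show up directly in a Jobs API task definition: the only difference is whether the task points at an existing all-purpose cluster or declares a `new_cluster` to be created for the run. The cluster ID, node type, and notebook path below are made-up example values:

```python
# all-purpose cluster: long-lived and shared, billed for as long as it is up
task_on_all_purpose = {
    "task_key": "tests",
    "notebook_task": {"notebook_path": "/tests"},
    "existing_cluster_id": "0101-123456-abcdefgh",
}

# job cluster: created for this run only, auto-terminated when the job ends
task_on_job_cluster = {
    "task_key": "tests",
    "notebook_task": {"notebook_path": "/tests"},
    "new_cluster": {
        "spark_version": "13.3.x-cpu-ml-scala2.12",
        "node_type_id": "Standard_DS3_v2",
        "num_workers": 1,
    },
}
```

Because the job cluster is declared inside the job itself, its runtime version and size are pinned per job, which is what gives the cost, isolation, and versioning benefits listed above.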
See part 2 of this notebook.
A use case for an HTTP webhook is, for example, sending a notification to a Slack channel to get informed about an event.
In this particular case, the Webhook URL is provided by the Slack application. See all the steps to create a Slack application and receive notifications on this page.
And below is the code to create the webhook that sends a message to a Slack channel when a model is moved to Staging.
Note that the difference between a job webhook and an HTTP webhook is one of the keys in the JSON dictionary: the job webhook has a job_spec key, whereas the HTTP webhook has an http_url_spec key.
from mlflow.utils.rest_utils import http_request
from mlflow.utils.databricks_utils import get_databricks_host_creds
import urllib
slack_incoming_webhook = "https://hooks.slack.com/services/T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX"
endpoint = "/api/2.0/mlflow/registry-webhooks/create"
host_creds = get_databricks_host_creds("databricks")
## specify http url of the slack notification
http_json = {"model_name": model_name,
             "events": ["MODEL_VERSION_TRANSITIONED_TO_STAGING"],
             "description": "HTTP webhook trigger",
             "status": "Active",
             "http_url_spec": {
                 "url": slack_incoming_webhook,
                 "enable_ssl_verification": "false"}}
response = http_request(
host_creds=host_creds,
endpoint=endpoint,
method="POST",
json=http_json
)
print(json.dumps(response.json(), indent=4))
Webhooks can be listed and deleted using the databricks-registry-webhooks library.
See also this page for another way to list and delete webhooks.
%sh
pip install databricks-registry-webhooks -q
Example of command to list webhooks:
from databricks_registry_webhooks import RegistryWebhooksClient
#
webhooks_list = RegistryWebhooksClient().list_webhooks(model_name=model_name)
#
for webhook in webhooks_list:
    print(dict(webhook))
{'creation_timestamp': 1699615877080, 'description': 'Job webhook trigger', 'events': ['MODEL_VERSION_TRANSITIONED_TO_STAGING'], 'http_url_spec': None, 'id': '574346f1870847db8a76e252030d33f1', 'job_spec': <JobSpec: access_token='', job_id='483529352125879', workspace_url='https://eastus-c3.azuredatabricks.net'>, 'last_updated_timestamp': 1699615877080, 'model_name': 'webhook_diamonds', 'status': 'ACTIVE'}
Example of a command to delete a webhook (the webhook ID comes from the list command above):
RegistryWebhooksClient().delete_webhook(id="574346f1870847db8a76e252030d33f1")
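The same list and delete operations can also be done against the Model Registry webhooks REST API directly, reusing `host_creds` and `http_request` from the earlier cells. A sketch, assuming the `/api/2.0/mlflow/registry-webhooks/list` and `/api/2.0/mlflow/registry-webhooks/delete` endpoints (the model name and webhook ID below come from the examples above):

```python
# Request descriptions for listing and deleting webhooks via the REST API.
# These dicts are unpacked into http_request; the calls themselves are
# commented out since they require a live Databricks workspace.

list_request = {
    "endpoint": "/api/2.0/mlflow/registry-webhooks/list",
    "method": "GET",
    "params": {"model_name": "webhook_diamonds"},
}

delete_request = {
    "endpoint": "/api/2.0/mlflow/registry-webhooks/delete",
    "method": "DELETE",
    "json": {"id": "574346f1870847db8a76e252030d33f1"},
}

# response = http_request(host_creds=host_creds, **list_request)
# webhooks = response.json().get("webhooks", [])
# response = http_request(host_creds=host_creds, **delete_request)
```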