{ "cells": [ { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "58fab4bb-231e-48cf-8ed4-fc15a1b22845", "showTitle": false, "title": "" } }, "source": [ "
This notebook adds information related to the following requirements:
Download this notebook in ipynb format here.
\n", "Automated testing in ML CI/CD pipelines plays a crucial role in ensuring the reliability, robustness, and performance of machine learning models. It helps identify errors, evaluate model accuracy, and maintain consistent behavior across deployments. Automated tests can cover unit testing for individual components, integration testing for model pipelines, and end-to-end testing for overall system functionality, providing confidence in the model's performance throughout the development lifecycle. This ensures that changes introduced in the CI/CD pipeline do not adversely impact the model's effectiveness and reliability.
" ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "18e681ce-93ed-4c38-814e-6d851bb56281", "showTitle": false, "title": "" } }, "source": [ "\n", "A model registry webhook - also called model registry trigger - can be related to an event occuring within model registry. It means when a specific event occurs in model registry a specific action can be executed.
\n", "In this particular case, we are interested in the execution of a Databricks job when a specific event occurs in model registry.
\n", "As soon as a model is moved to Staging stage, the Databricks jobs will be triggered executing the Notebook that contains all the tests. This describes a way to automate the testing part of an ML CI/CD pipeline using Databricks.
\n", "Steps are:
\n", "MODEL_VERSION_TRANSITIONED_TO_STAGING
And that's it. As soon as the specified model is transitioned to Staging, the job is triggered and the test notebook executes any tests defined there.
\n", "See this page for more information about webhook, in particular the list of possible events to listen for.
\n", "See this video for a complete example.
" ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "0dd6cebd-bdf8-4311-8d2d-aba1a06cb1e6", "showTitle": false, "title": "" } }, "source": [ "Here is an example of code from training a model to the creation of a webhook that should be triggered when the model is moved to Staging stage. Then the webhook will be triggered to execute the test notebook via the Databricks job.
" ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "ab13bc35-0a24-4276-803d-94f5536a34f5", "showTitle": false, "title": "" } }, "source": [ "Import libraries:
" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "703f0b5c-bfe3-4c2e-868d-272162750573", "showTitle": false, "title": "" } }, "outputs": [ { "data": { "application/vnd.databricks.v1+bamboolib_hint": "{\"pd.DataFrames\": [], \"version\": \"0.0.1\"}", "text/plain": [] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "import pandas as pd\n", "import seaborn as sns\n", "#\n", "from pyspark.sql.functions import *\n", "#\n", "from pyspark.ml.feature import VectorAssembler\n", "from pyspark.ml.regression import LinearRegression\n", "#\n", "import mlflow\n", "import logging\n", "#\n", "import json\n", "import requests\n", "from mlflow.utils.rest_utils import http_request\n", "from mlflow.utils.databricks_utils import get_databricks_host_creds" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "229568ac-4613-450d-ad58-82076441353b", "showTitle": false, "title": "" } }, "outputs": [], "source": [ "logging.getLogger(\"mlflow\").setLevel(logging.FATAL)" ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "e0bcb9d4-60d8-49f9-9af6-9d706b2badb2", "showTitle": false, "title": "" } }, "source": [ "Load dataset:
" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "4e944809-2594-4469-8289-da701a643535", "showTitle": false, "title": "" } }, "outputs": [], "source": [ "diamonds_df = sns.load_dataset(\"diamonds\").drop([\"cut\", \"color\", \"clarity\"], axis=1)\n", "#\n", "diamonds_sdf = spark.createDataFrame(diamonds_df)\n", "#\n", "train_df, test_df = diamonds_sdf.randomSplit([.8, .2], seed=42)" ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "da10c765-83d1-4733-ae05-c8893f278037", "showTitle": false, "title": "" } }, "source": [ "Process features:
" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "e2b61631-2512-4168-8009-3373ceb8ef8d", "showTitle": false, "title": "" } }, "outputs": [], "source": [ "assembler_inputs = [column for column in diamonds_sdf.columns if column not in ['price']]\n", "vec_assembler = VectorAssembler(inputCols=assembler_inputs, outputCol=\"features\")\n", "#\n", "train_df_processed = vec_assembler.transform(train_df)" ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "93d4c25d-3f1f-4a1a-a0d4-e41f75fc4cf3", "showTitle": false, "title": "" } }, "source": [ "Instantiate ML model:
" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "2fd05537-f0e8-4396-8e5a-fa6bacb94b1b", "showTitle": false, "title": "" } }, "outputs": [], "source": [ "lrm = LinearRegression(featuresCol=\"features\", labelCol='price')" ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "8d0ab702-a617-43ce-a240-24b017b90dac", "showTitle": false, "title": "" } }, "source": [ "Train model and log to MLflow:
" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "fa3a0dc8-7628-4b68-b18c-42e952c4567d", "showTitle": false, "title": "" } }, "outputs": [], "source": [ "model_path = 'webhook-model'\n", "#\n", "with mlflow.start_run(run_name=\"webhook-run\") as run:\n", " model = lrm.fit(train_df_processed)\n", " #\n", " mlflow.spark.log_model(model, model_path)" ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "01a016e5-6d3a-4d39-bd81-1e731da9f7ec", "showTitle": false, "title": "" } }, "source": [ "Register latest logged model:
" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "24c4e3c7-cdfe-42b4-9d8c-9e4a92bc652f", "showTitle": false, "title": "" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Registered model 'webhook_diamonds' already exists. Creating a new version of this model...\n", "Created version '2' of model 'webhook_diamonds'.\n" ] } ], "source": [ "# model name\n", "model_name = \"webhook_diamonds\"\n", "#\n", "# register the latest logged model\n", "latest_run_id = mlflow.search_runs().sort_values(by=\"end_time\", ascending=False).head(1)['run_id'][0]\n", "#\n", "mlflow.register_model(f\"runs:/{latest_run_id}/{model_path}\", name=model_name);" ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": {}, "inputWidgets": {}, "nuid": "b523467b-f822-47c5-8b34-b6b5277a38af", "showTitle": false, "title": "" } }, "source": [ "At this point, we manually create a test notebook and a job containing a task to execute this notebook.
\n", "The job ID is necessary for the next steps, it is available in the job definition in the UI and it can also be retrieved programmaticaly thanks to this function:
\n", "Note that the definition of the function in the next cell comes from this course.
" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "21f18e09-50b2-4f84-a031-cf8b44321f60", "showTitle": false, "title": "" } }, "outputs": [], "source": [ "def find_job_id(instance, headers, job_name, offset_limit=1000):\n", " params = {\"offset\": 0}\n", " uri = f\"{instance}/api/2.1/jobs/list\"\n", " done = False\n", " job_id = None\n", " while not done:\n", " done = True\n", " res = requests.get(uri, params=params, headers=headers)\n", " assert res.status_code == 200, f\"Job list not returned; {res.content}\"\n", "\n", " jobs = res.json().get(\"jobs\", [])\n", " if len(jobs) > 0:\n", " for job in jobs:\n", " if job.get(\"settings\", {}).get(\"name\", None) == job_name:\n", " job_id = job.get(\"job_id\", None)\n", " break\n", " \n", " # if job_id not found; update the offset and try again\n", " if job_id is None:\n", " params[\"offset\"] += len(jobs)\n", " if params[\"offset\"] < offset_limit:\n", " done = False\n", " return job_id" ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": {}, "inputWidgets": {}, "nuid": "f80768de-16f1-4940-8a5f-6bff628c02b5", "showTitle": false, "title": "" } }, "source": [ "We need a token for the webhook to be allowed to execute the Databricks job. The way to create a token is described in this course.
\n", "Alternatively, for this example purpose, a token can be retrieved with the following command:
" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "15446246-1765-4e4a-9de5-07600d985ac2", "showTitle": false, "title": "" } }, "outputs": [], "source": [ "token = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().getOrElse(None)" ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": {}, "inputWidgets": {}, "nuid": "452eecae-76e7-481a-b4ca-236398070efb", "showTitle": false, "title": "" } }, "source": [ "Let's define the required parameters for the webhook definition:
" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "2d404633-8025-446b-a48f-c62343c31250", "showTitle": false, "title": "" } }, "outputs": [], "source": [ "# define some parameters\n", "job_name = \"webhook_test\"\n", "headers = {\"Authorization\": f\"Bearer {token}\"}\n", "host_creds = get_databricks_host_creds(\"databricks\")\n", "endpoint = \"/api/2.0/mlflow/registry-webhooks/create\"\n", "instance = mlflow.utils.databricks_utils.get_webapp_url()\n", "job_id = find_job_id(instance, headers, job_name, offset_limit=1000)" ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": {}, "inputWidgets": {}, "nuid": "81d547cd-526e-4e36-9759-a34e8b58029b", "showTitle": false, "title": "" } }, "source": [ "Finally, let's create the webhook:
" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "f1f46d6d-4586-482b-ae50-60e2fe77ea64", "showTitle": false, "title": "" } }, "outputs": [], "source": [ "# define job_json\n", "job_json = {\"model_name\": model_name,\n", " \"events\": [\"MODEL_VERSION_TRANSITIONED_TO_STAGING\"],\n", " \"description\": \"Job webhook trigger\",\n", " \"status\": \"Active\",\n", " \"job_spec\": {\"job_id\": job_id,\n", " \"workspace_url\": instance,\n", " \"access_token\": token}\n", " }\n", "\n", "response = http_request(\n", " host_creds=host_creds, \n", " endpoint=endpoint,\n", " method=\"POST\",\n", " json=job_json\n", ")\n", "\n", "assert response.status_code == 200, f\"Expected HTTP 200, received {response.status_code}\"" ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": {}, "inputWidgets": {}, "nuid": "772b8b35-d0ad-4a85-b2a1-48166057e828", "showTitle": false, "title": "" } }, "source": [ "From now, as soon as the model will be transitioned to Staging, the job will be executed, executing the associated notebook containing tests. The model can be transitioned to Staging either manually in the Databricks UI or programmaticaly by executing the below function.
" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "15b0e72b-2169-4988-b9d4-e092edd9ad44", "showTitle": false, "title": "" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Out[11]:See part 2 of this notebooks.
" ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": {}, "inputWidgets": {}, "nuid": "f1327e7e-e9ca-44ba-9f8d-77089a4eeb34", "showTitle": false, "title": "" } }, "source": [ "\n", "See part 2 of this notebooks.
" ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": {}, "inputWidgets": {}, "nuid": "4a6fdf9d-a85b-4d1d-ab32-26d6d12eb15b", "showTitle": false, "title": "" } }, "source": [ "\n", "See part 2 of this notebooks.
" ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": {}, "inputWidgets": {}, "nuid": "afd06d14-a428-486e-9302-f98041a71d89", "showTitle": false, "title": "" } }, "source": [ "\n", "A use case for HTTP webhook is for example send notification to a Slack channel to get informed about something.
\n", "In this particular case, Webhook URL would be provided from Slack application. See all steps to create a Slack application and receive notifications on this page.
\n", "And below is the code to create the webhook that will send message to Slack channel when a model is moved to Staging.
\n", "Note the difference between job webhook and http webhook is one of the keys in the JSON dictionnary. In one case (job webhook), there is the job_spec
key, in the other case (http webhook), there is the http_url_spec
key.
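\n", "Here is the sketch (a hypothetical reconstruction, not the original cell: it reuses model_name, host_creds and endpoint defined above, and the Slack webhook URL is a placeholder):" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# sketch: create an HTTP registry webhook pointing at a Slack incoming-webhook URL\n", "# (the URL below is a placeholder; the real one is provided by the Slack application)\n", "slack_url = 'https://hooks.slack.com/services/...'\n", "#\n", "http_json = {'model_name': model_name,\n", "             'events': ['MODEL_VERSION_TRANSITIONED_TO_STAGING'],\n", "             'description': 'HTTP webhook trigger',\n", "             'status': 'Active',\n", "             'http_url_spec': {'url': slack_url}\n", "             }\n", "\n", "response = http_request(\n", "    host_creds=host_creds,\n", "    endpoint=endpoint,\n", "    method='POST',\n", "    json=http_json\n", ")\n", "\n", "assert response.status_code == 200, f'Expected HTTP 200, received {response.status_code}'" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "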
Webhooks can be listed and deleted using the following library: databricks-registry-webhooks
See also this page for another way to list and delete webhooks.
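\n", "As an example of the REST-API route, the list endpoint can be called directly (a sketch, not from the original notebook; it reuses host_creds and model_name defined above):" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# sketch: list the registry webhooks of the model through the REST API\n", "response = http_request(\n", "    host_creds=host_creds,\n", "    endpoint='/api/2.0/mlflow/registry-webhooks/list',\n", "    method='GET',\n", "    params={'model_name': model_name}\n", ")\n", "#\n", "print(response.json())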
" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "55f4e87b-0bd9-4929-a5fe-feb91d78900b", "showTitle": false, "title": "" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "WARNING: You are using pip version 21.2.4; however, version 23.3.1 is available.\n", "You should consider upgrading via the '/local_disk0/.ephemeral_nfs/envs/pythonEnv-02a00090-d306-4c63-a422-b79fa56b5c38/bin/python -m pip install --upgrade pip' command.\n" ] } ], "source": [ "%sh\n", "pip install databricks-registry-webhooks -q" ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": {}, "inputWidgets": {}, "nuid": "95772904-e323-4eba-b8ff-aeb8c1c2dea0", "showTitle": false, "title": "" } }, "source": [ "Example of command to list webhooks:
" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "7a39a60e-901c-401e-b74e-5c38eace3cf4", "showTitle": false, "title": "" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "{'creation_timestamp': 1699615877080, 'description': 'Job webhook trigger', 'events': ['MODEL_VERSION_TRANSITIONED_TO_STAGING'], 'http_url_spec': None, 'id': '574346f1870847db8a76e252030d33f1', 'job_spec':Example of command to delete webhooks: need webhook id from the above command
" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "c1e537fe-57ed-4dca-8eee-0815d0107b10", "showTitle": false, "title": "" } }, "outputs": [], "source": [ "RegistryWebhooksClient().delete_webhook(id=\"574346f1870847db8a76e252030d33f1\")" ] } ], "metadata": { "application/vnd.databricks.v1+notebook": { "dashboards": [], "language": "python", "notebookMetadata": { "mostRecentlyExecutedCommandWithImplicitDF": { "commandId": 1158789969180638, "dataframes": [ "_sqldf" ] }, "pythonIndentUnit": 2 }, "notebookName": "Databricks-ML-professional-S02c-Model-Lifecycle-Automation", "widgets": {} }, "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.8.10" } }, "nbformat": 4, "nbformat_minor": 4 }