{ "cells": [ { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "58fab4bb-231e-48cf-8ed4-fc15a1b22845", "showTitle": false, "title": "" } }, "source": [ "

Databricks-ML-professional-S03a-Batch

" ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": {}, "inputWidgets": {}, "nuid": "76c17cec-5d2d-49f0-93d1-5ded2421fda4", "showTitle": false, "title": "" } }, "source": [ "
\n", "
\n", "

This notebook covers the following requirements:


\n", "Batch:\n", "\n", "
\n", "

Download this notebook in ipynb format here.

\n", "
\n", "
" ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "2d6aaf81-c559-44bd-bc70-25852c40193d", "showTitle": false, "title": "" } }, "source": [ "\n", "
\n", "1. Describe batch deployment as the appropriate use case for the vast majority of\n", "deployment use cases
\n", "\n", "
" ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "18e681ce-93ed-4c38-814e-6d851bb56281", "showTitle": false, "title": "" } }, "source": [ "\n", "
\n", "2. Identify how batch deployment computes predictions and saves them somewhere\n", "for later use
\n", "

This requirement refers to the process of computing predictions in a batch deployment and storing the results for later use. Let's break down the key components:

\n", "
    \n", "
  1. Batch deployment: In machine learning or data processing, batch deployment is a mode of operation where predictions or computations are performed on a set of data collected over a specific period or up to a predefined batch size.
  2. Computes predictions: The system generates predictions from the input data. In machine learning, this typically means running a trained model on a batch of input data.
  3. Saves predictions somewhere: After the predictions are computed, they are not discarded. They are stored in a designated location, such as a database, files, or any other suitable data storage system.
  4. Later use: The predictions are saved so that they can be used later, for example for analysis, reporting, or serving the predictions to end users when needed.
\n", "

In practical terms, the process might involve running a batch job that takes a set of input data, applies a trained model to make predictions, and then stores these predictions in a database, file system, or another storage solution. This approach is common in scenarios where real-time processing is not critical, and predictions can be made on a periodic basis.

\n", "

For example, in a recommendation system, batch deployment might involve processing user interactions over a day and generating personalized recommendations overnight. The computed recommendations would then be saved for use the next day when users interact with the system.
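
The compute-then-save pattern described above can be sketched in plain Python. This is a minimal illustration under stated assumptions, not the notebook's method: the `predict` function is a hypothetical stand-in for a trained model, and an in-memory SQLite table stands in for whatever table or database would hold the predictions in practice.

```python
import sqlite3

def predict(features):
    # Hypothetical stand-in for a trained model: a fixed linear rule
    return 1000.0 * features['carat'] + 10.0 * features['depth']

def run_batch_job(conn, rows):
    # Score every row of the batch and store the results for later use
    conn.execute('CREATE TABLE IF NOT EXISTS predictions (item_id INTEGER PRIMARY KEY, prediction REAL)')
    scored = [(row['item_id'], predict(row)) for row in rows]
    conn.executemany('INSERT OR REPLACE INTO predictions VALUES (?, ?)', scored)
    conn.commit()
    return len(scored)

def lookup_prediction(conn, item_id):
    # Later use: read a stored prediction instead of calling the model again
    found = conn.execute('SELECT prediction FROM predictions WHERE item_id = ?', (item_id,)).fetchone()
    return None if found is None else found[0]

batch = [
    {'item_id': 1, 'carat': 0.5, 'depth': 60.0},
    {'item_id': 2, 'carat': 1.0, 'depth': 62.0},
]
conn = sqlite3.connect(':memory:')
n_scored = run_batch_job(conn, batch)
print(n_scored, lookup_prediction(conn, 2))
```

The batch job is the only place where the model runs; anything that needs a prediction afterwards only performs a key lookup.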

" ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "1dcba302-be4c-4076-a92a-4bf0b7b4d2c4", "showTitle": false, "title": "" } }, "source": [ "\n", "
\n", "3. Identify live serving benefits of querying precomputed batch predictions
\n", "
    \n", "
  1. Reduced latency: Precomputing predictions in batch mode allows the system to process and store results ahead of time. This can lead to lower latency during live serving since the predictions are readily available and don't require real-time computation.
  2. Scalability: Batch processing can be more efficient for large-scale computations. By precomputing predictions in batches, the system can scale more easily to handle varying workloads during live serving.
  3. Resource efficiency: Computing predictions in batch mode can be resource-efficient, especially for complex models or large datasets. It allows the system to optimize resource utilization during non-peak hours.
  4. Consistency: Precomputed batch predictions can offer consistency in results, as they are generated using the same model and data. This is in contrast to real-time predictions, which might be influenced by changes in the model or input data at the moment of serving.
  5. Offline analysis: Having precomputed predictions enables offline analysis of the results, allowing organizations to gain insights, perform audits, and conduct evaluations without affecting live serving.
" ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "0dd6cebd-bdf8-4311-8d2d-aba1a06cb1e6", "showTitle": false, "title": "" } }, "source": [ "\n", "
\n", "4. Identify less performant data storage as a solution for other use cases
\n", "

In batch deployment, predictions are generated in bulk and saved for later use. Where they are saved depends on how they will be read.

For live serving, a high-performance database is usually preferred so that individual predictions can be retrieved quickly. In other scenarios, such as populating emails, rapid access is not crucial, and a less performant data store, such as a blob store, is a suitable solution.

Such storage does not offer the fastest reads, but it is chosen deliberately for use cases like email population, where efficiency matters more than lookup speed.
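
As a rough sketch of the email use case, the snippet below writes predictions to a JSON-lines file and later reads the whole file sequentially to build messages. The file in a temporary directory is only a stand-in for an object in a blob store, and the user names and recommendations are made up for illustration.

```python
import json
import os
import tempfile

def write_predictions(path, predictions):
    # Batch job side: dump all predictions into one file (stand-in for a blob)
    with open(path, 'w') as handle:
        for record in predictions:
            print(json.dumps(record), file=handle)

def build_emails(path):
    # Consumer side: a sequential scan is enough, no fast key lookup is needed
    emails = []
    with open(path) as handle:
        for line in handle:
            record = json.loads(line)
            emails.append('Hello {}, we recommend: {}'.format(record['user'], record['recommendation']))
    return emails

predictions = [
    {'user': 'alice', 'recommendation': 'ring'},
    {'user': 'bob', 'recommendation': 'necklace'},
]
with tempfile.TemporaryDirectory() as blob_dir:
    blob_path = os.path.join(blob_dir, 'predictions.jsonl')
    write_predictions(blob_path, predictions)
    emails = build_emails(blob_path)
print(emails[0])
```

Because the consumer reads every record once, the storage only needs good sequential throughput, which is what makes a slower store acceptable here.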

" ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": {}, "inputWidgets": {}, "nuid": "1fb137c6-a812-4fe9-b723-aa07ef9aa2f0", "showTitle": false, "title": "" } }, "source": [ "\n", "
\n", "5. Load registered models with load_model
\n", "

Let's look at two examples to illustrate this requirement, one with a model trained with scikit-learn and one with a model trained with Spark MLlib:

\n", "\n", "

Both models will then be loaded with the mlflow.pyfunc.load_model function and used in the same way to make predictions on the test set.
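
As a side note, mlflow.pyfunc.load_model accepts a model URI, and two common forms are a run URI and a Model Registry URI. The small helpers below are hypothetical (they are not part of MLflow) and only build the URI strings; the run id is a placeholder, while the model name and version number are taken from the outputs of this notebook.

```python
def run_model_uri(run_id, artifact_path='model'):
    # URI of a model logged under a run
    return 'runs:/{}/{}'.format(run_id, artifact_path)

def registered_model_uri(name, version_or_stage):
    # URI of a model in the Model Registry, by version number or stage name
    return 'models:/{}/{}'.format(name, version_or_stage)

print(run_model_uri('abc123'))
print(registered_model_uri('scikit-learn_model', 7))
print(registered_model_uri('scikit-learn_model', 'Production'))
# With MLflow installed and the model registered, either kind of URI can be used:
# model = mlflow.pyfunc.load_model(registered_model_uri('scikit-learn_model', 7))
```

Loading through a registry URI makes the batch job independent of the run that produced the model, whereas a run URI always points at one specific run.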

" ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": {}, "inputWidgets": {}, "nuid": "a7a110c9-0567-4cae-9512-c08076a669b9", "showTitle": false, "title": "" } }, "source": [ "Load some libraries" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "4d42b33e-8b38-464f-8132-c60c1789f9dc", "showTitle": false, "title": "" } }, "outputs": [ { "data": { "application/vnd.databricks.v1+bamboolib_hint": "{\"pd.DataFrames\": [], \"version\": \"0.0.1\"}", "text/plain": [] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "import pandas as pd\n", "import seaborn as sns\n", "import matplotlib.pyplot as plt\n", "#\n", "from pyspark.sql.functions import *\n", "#\n", "import mlflow\n", "import logging\n", "#\n", "from sklearn.ensemble import RandomForestRegressor\n", "from sklearn.metrics import mean_squared_error\n", "#\n", "from pyspark.ml.regression import LinearRegression\n", "from pyspark.ml.feature import VectorAssembler\n", "from pyspark.ml import Pipeline\n", "#\n", "from databricks import feature_store" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "3a17f196-3a17-4b6e-b0c7-dc89a780d430", "showTitle": false, "title": "" } }, "outputs": [], "source": [ "logging.getLogger(\"mlflow\").setLevel(logging.FATAL)" ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": {}, "inputWidgets": {}, "nuid": "493c1a72-5c5a-4d86-a3bf-1dad8f307290", "showTitle": false, "title": "" } }, "source": [ "

Load data into a pandas dataframe (for the sake of simplicity, let's only keep numerical columns):

" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "90352c13-b715-457a-8308-09e6c6844b1a", "showTitle": false, "title": "" } }, "outputs": [ { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
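| | carat | depth | table | price | x | y | z |
|---|---|---|---|---|---|---|---|
| 26985 | 2.01 | 60.1 | 61.0 | 17068 | 8.14 | 8.06 | 4.87 |
| 29197 | 0.33 | 59.0 | 61.0 | 694 | 4.49 | 4.56 | 2.67 |
| 32340 | 0.30 | 62.1 | 56.0 | 789 | 4.29 | 4.31 | 2.67 |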
\n", "
" ] }, "metadata": { "application/vnd.databricks.v1+output": { "addedWidgets": {}, "arguments": {}, "data": "
\n\n\n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n
\n
", "datasetInfos": [], "metadata": {}, "removedWidgets": [], "textData": null, "type": "htmlSandbox" } }, "output_type": "display_data" } ], "source": [ "diamonds_df = sns.load_dataset('diamonds').drop(columns=['cut', 'clarity', 'color'], axis=1)\n", "diamonds_df.sample(3)" ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": {}, "inputWidgets": {}, "nuid": "05149010-aa7c-427b-ad6d-0af49c0d995d", "showTitle": false, "title": "" } }, "source": [ "

Let's drop duplicates and split the data into a train set (67%) and a test set (33%):

" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "54c32173-999f-4935-95cc-9d68b05bcf84", "showTitle": false, "title": "" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Number of rows test set: 17731\n", "Number of rows train set: 36001\n", "Sum of count rows of train and test set: 53732\n", "Total number of rows of initial dataframe: 53732\n" ] } ], "source": [ "diamonds_sdf = spark.createDataFrame(diamonds_df).dropDuplicates()\n", "#\n", "# Spark Dataframes\n", "test_sdf = diamonds_sdf.orderBy(rand()).limit(int(33*diamonds_sdf.count()/100))\n", "train_sdf = diamonds_sdf.subtract(test_sdf)\n", "#\n", "# Pandas Dataframes\n", "test_df = test_sdf.toPandas()\n", "train_df = train_sdf.toPandas()\n", "#\n", "print(f\"Number of rows test set: {test_sdf.count()}\")\n", "print(f\"Number of rows train set: {train_sdf.count()}\")\n", "print(f\"Sum of count rows of train and test set: {train_sdf.count() + test_sdf.count()}\")\n", "print(f\"Total number of rows of initial dataframe: {diamonds_sdf.count()}\")" ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": {}, "inputWidgets": {}, "nuid": "86c30e4b-65a0-4e75-a7e7-155a5be096a3", "showTitle": false, "title": "" } }, "source": [ "

Scikit-learn library:

\n", "" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "b9ba297d-f5d5-464c-8465-a83126c38c37", "showTitle": false, "title": "" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Registered model 'scikit-learn_model' already exists. Creating a new version of this model...\n", "Created version '7' of model 'scikit-learn_model'.\n" ] }, { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
| | predictions |
|---|---|
| 0 | 6771.932454 |
| 1 | 10611.573560 |
| 2 | 741.122107 |
| 3 | 1805.671346 |
| 4 | 1587.993408 |
\n", "
" ] }, "metadata": { "application/vnd.databricks.v1+output": { "addedWidgets": {}, "arguments": {}, "data": "
\n\n\n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n
\n
", "datasetInfos": [], "metadata": {}, "removedWidgets": [], "textData": null, "type": "htmlSandbox" } }, "output_type": "display_data" } ], "source": [ "# Prepare features and target dataframes\n", "X = train_df.drop('price', axis=1)\n", "y = train_df['price']\n", "#\n", "# train model (is automatically logged to mlflow)\n", "rf = RandomForestRegressor(n_estimators=100, max_depth=5)\n", "rf.fit(X, y)\n", "#\n", "# get latest run_id programmaticaly\n", "latest_run_id = mlflow.search_runs().sort_values(by=\"end_time\", ascending=False).head(1)['run_id'][0]\n", "#\n", "# uri to latest run (by default, artifact_path is 'model')\n", "uri_scikit_learn = f\"runs:/{latest_run_id}/model\"\n", "#\n", "# register latest logged model\n", "mlflow.register_model(uri_scikit_learn, name=\"scikit-learn_model\")\n", "#\n", "# load latest registered model\n", "scikit_learn_model = mlflow.pyfunc.load_model(uri_scikit_learn)\n", "#\n", "# prediction of test set using loaded model\n", "pd.DataFrame(scikit_learn_model.predict(test_df.drop('price', axis=1)), columns=['predictions']).head(5)" ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": {}, "inputWidgets": {}, "nuid": "dcaf843c-eb33-4092-b312-137da9e35220", "showTitle": false, "title": "" } }, "source": [ "

MLlib library: there is an additional step, which is to convert the input columns into a single vector column using VectorAssembler. We therefore need a pipeline, and the fitted pipeline is what gets logged to MLflow.

\n", "" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "376bd9ee-157b-4964-8222-6824c01d8072", "showTitle": false, "title": "" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Registered model 'mllib_model' already exists. Creating a new version of this model...\n", "Created version '4' of model 'mllib_model'.\n" ] }, { "data": { "text/html": [ "
\n", "\n", "\n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", " \n", "
| | predictions |
|---|---|
| 0 | 6711.408932 |
| 1 | 9540.179628 |
| 2 | 647.270082 |
| 3 | 2388.329445 |
| 4 | 1695.716464 |
\n", "
" ] }, "metadata": { "application/vnd.databricks.v1+output": { "addedWidgets": {}, "arguments": {}, "data": "
\n\n\n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n \n
\n
", "datasetInfos": [], "metadata": {}, "removedWidgets": [], "textData": null, "type": "htmlSandbox" } }, "output_type": "display_data" } ], "source": [ "# set vector assembler parameters\n", "assembler_inputs = [c for c in train_sdf.columns if c not in ['price']]\n", "vec_assembler = VectorAssembler(inputCols=assembler_inputs, outputCol=\"features\")\n", "#\n", "# instantiate model\n", "mllib_rfr = LinearRegression(featuresCol=\"features\", labelCol='price')\n", "#\n", "# define pipeline stages\n", "stages = [vec_assembler, mllib_rfr]\n", "#\n", "# set pipeline\n", "pipeline = Pipeline(stages=stages)\n", "#\n", "# fit pipeline to train set\n", "model_mllib = pipeline.fit(train_sdf)\n", "#\n", "# get latest run_id programmaticaly\n", "latest_run_id = mlflow.search_runs().sort_values(by=\"end_time\", ascending=False).head(1)['run_id'][0]\n", "#\n", "# uri to latest run (by default, artifact_path is 'model')\n", "uri_mllib = f\"runs:/{latest_run_id}/model\"\n", "#\n", "# register latest logged model\n", "mlflow.register_model(uri_mllib, name=\"mllib_model\")\n", "#\n", "# load latest registered model\n", "mllib_model = mlflow.pyfunc.load_model(uri_mllib)\n", "#\n", "# Here predictions can be done using same input as for model trained using scikit learn library\n", "pd.DataFrame(mllib_model.predict(test_df.drop('price', axis=1)), columns=['predictions']).head(5)" ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": {}, "inputWidgets": {}, "nuid": "8847c4cc-4b9a-4789-8394-fab399d94983", "showTitle": false, "title": "" } }, "source": [ "\n", "
\n", "6. Deploy a single-node model in parallel using spark_udf
\n", "

The model trained with the scikit-learn library can be loaded into a Spark UDF, which applies it in parallel to a Spark dataframe to make predictions:
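
The idea of applying one single-node model independently to each partition of the data can be illustrated with a plain-Python analogy. This is only a conceptual sketch: a thread pool stands in for Spark executors, `predict_partition` is a hypothetical stand-in for the loaded model, and nothing here reflects how spark_udf is implemented.

```python
from concurrent.futures import ThreadPoolExecutor

def predict_partition(rows):
    # Hypothetical single-node model applied to one partition of the data
    return [1000.0 * row['carat'] for row in rows]

def predict_in_parallel(rows, n_partitions):
    # Split the rows into contiguous partitions and score each one independently
    size = max(1, -(-len(rows) // n_partitions))  # ceiling division
    partitions = [rows[i:i + size] for i in range(0, len(rows), size)]
    with ThreadPoolExecutor(max_workers=n_partitions) as pool:
        results = list(pool.map(predict_partition, partitions))
    # Concatenate the per-partition results in their original order
    return [prediction for part in results for prediction in part]

rows = [{'carat': c} for c in (0.5, 1.0, 1.5, 2.0, 0.25)]
predictions = predict_in_parallel(rows, n_partitions=2)
print(predictions)
```

The model itself is unchanged; only the data is divided, which is why a model trained on a single node can still score a large dataset.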

" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "e309f1ac-9d80-4c0c-85ac-7df9f314710c", "showTitle": false, "title": "" } }, "outputs": [ { "data": { "text/html": [ "
| price | prediction |
|---|---|
| 4580 | 6771.9324540939115 |
| 8408 | 10611.573559702078 |
| 1103 | 741.122106786866 |
| 1332 | 1805.6713457303665 |
| 1293 | 1587.9934075468402 |
" ] }, "metadata": { "application/vnd.databricks.v1+output": { "addedWidgets": {}, "aggData": [], "aggError": "", "aggOverflow": false, "aggSchema": [], "aggSeriesLimitReached": false, "aggType": "", "arguments": {}, "columnCustomDisplayInfos": {}, "data": [ [ 4580, 6771.9324540939115 ], [ 8408, 10611.573559702078 ], [ 1103, 741.122106786866 ], [ 1332, 1805.6713457303665 ], [ 1293, 1587.9934075468402 ] ], "datasetInfos": [], "dbfsResultPath": null, "isJsonSchema": true, "metadata": {}, "overflow": false, "plotOptions": { "customPlotOptions": {}, "displayType": "table", "pivotAggregation": null, "pivotColumns": null, "xColumns": null, "yColumns": null }, "removedWidgets": [], "schema": [ { "metadata": "{}", "name": "price", "type": "\"long\"" }, { "metadata": "{}", "name": "prediction", "type": "\"double\"" } ], "type": "table" } }, "output_type": "display_data" } ], "source": [ "# load model into a spark udf\n", "predict_scikit_learn = mlflow.pyfunc.spark_udf(spark, uri_scikit_learn)\n", "#\n", "# make predictions on the spark test dataframe\n", "display(test_sdf.withColumn(\"prediction\", predict_scikit_learn(*[c for c in test_sdf.columns if c not in ['price']])).select(\"price\", \"prediction\").limit(5))" ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": {}, "inputWidgets": {}, "nuid": "ab3e9181-55f3-4134-bb15-6b9248d1d5b9", "showTitle": false, "title": "" } }, "source": [ "\n", "
\n", "7. Identify z-ordering as a solution for reducing the amount of time to read predictions\n", "from a table
\n", "

Z-Ordering: colocates related information in the same set of files

\n", "

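Z-Ordering is a form of multi-dimensional clustering that colocates related information in the same set of files. Because rows with similar values end up in the same files, a query that filters on a Z-ordered column can skip the files that cannot contain matching rows, which reduces the amount of data that needs to be read. See more here.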
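
To make the effect of colocating data more concrete, here is a small plain-Python sketch of file skipping based on per-file min/max statistics. It is an illustration under simplifying assumptions (one column, made-up integer values standing in for carat in hundredths, 100 rows per file) and not Delta Lake's actual implementation.

```python
import random

def split_into_files(values, rows_per_file):
    # Each file keeps its rows plus min/max statistics for the column
    files = []
    for start in range(0, len(values), rows_per_file):
        chunk = values[start:start + rows_per_file]
        files.append({'min': min(chunk), 'max': max(chunk), 'rows': chunk})
    return files

def files_to_read(files, low, high):
    # A file is skipped when its [min, max] range cannot contain matching rows
    return [f for f in files if f['max'] >= low and f['min'] <= high]

random.seed(0)
carat_points = list(range(1000))  # hypothetical carat values, in hundredths
random.shuffle(carat_points)

unordered_files = split_into_files(carat_points, 100)
clustered_files = split_into_files(sorted(carat_points), 100)

n_unordered = len(files_to_read(unordered_files, 100, 150))
n_clustered = len(files_to_read(clustered_files, 100, 150))
print(n_unordered, n_clustered)
```

When the values are clustered, the filter range falls inside a single file, whereas with unordered data nearly every file has a min/max range that overlaps the filter and must be read.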

\n", "

Below is an example of Z-ordering.

\n", "

Let's first write a dataframe as a Delta table:

" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "43526c33-a6e1-43d7-9520-41f7ab1f7558", "showTitle": false, "title": "" } }, "outputs": [], "source": [ "(train_sdf.write\n", " .format(\"delta\")\n", " .mode(\"overwrite\")\n", " .option(\"overwriteSchema\", \"true\")\n", " .saveAsTable(\"train_set_diamonds\"))" ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": {}, "inputWidgets": {}, "nuid": "4a61070c-4099-4712-9c2a-8540c5afa9d9", "showTitle": false, "title": "" } }, "source": [ "

Let's get the table location:

" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "d8164ea2-5abe-45de-9625-66297e69871f", "showTitle": false, "title": "" } }, "outputs": [ { "data": { "text/html": [ "
| col_name | data_type | comment |
|---|---|---|
| Location | dbfs:/user/hive/warehouse/train_set_diamonds | |
" ] }, "metadata": { "application/vnd.databricks.v1+output": { "addedWidgets": {}, "aggData": [], "aggError": "", "aggOverflow": false, "aggSchema": [], "aggSeriesLimitReached": false, "aggType": "", "arguments": {}, "columnCustomDisplayInfos": {}, "data": [ [ "Location", "dbfs:/user/hive/warehouse/train_set_diamonds", "" ] ], "datasetInfos": [], "dbfsResultPath": null, "isJsonSchema": true, "metadata": {}, "overflow": false, "plotOptions": { "customPlotOptions": {}, "displayType": "table", "pivotAggregation": null, "pivotColumns": null, "xColumns": null, "yColumns": null }, "removedWidgets": [], "schema": [ { "metadata": "{\"comment\":\"name of the column\"}", "name": "col_name", "type": "\"string\"" }, { "metadata": "{\"comment\":\"data type of the column\"}", "name": "data_type", "type": "\"string\"" }, { "metadata": "{\"comment\":\"comment of the column\"}", "name": "comment", "type": "\"string\"" } ], "type": "table" } }, "output_type": "display_data" } ], "source": [ "display(spark.sql(\"describe table extended train_set_diamonds\").filter(\"col_name in ('Location')\"))" ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": {}, "inputWidgets": {}, "nuid": "71cb898e-8da4-456d-acb8-a55f71ab6ad0", "showTitle": false, "title": "" } }, "source": [ "

Let's Z-order the table by the feature carat:

" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "67a61c82-7059-40c4-8a56-116d86d7339b", "showTitle": false, "title": "" } }, "outputs": [], "source": [ "delta_partitioned_path = \"dbfs:/user/hive/warehouse/train_set_diamonds\"\n", "#\n", "spark.sql(f\"OPTIMIZE delta.`{delta_partitioned_path}` ZORDER BY (carat)\");" ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": {}, "inputWidgets": {}, "nuid": "cb2a00bd-a465-4ce9-8363-9ff1f189f5e2", "showTitle": false, "title": "" } }, "source": [ "\n", "
\n", "8. Identify partitioning on a common column to speed up querying
\n", "

Partitioning: stores data associated with different categorical values in different directories

\n", "

Partitioning creates as many folders as there are distinct values in the column used as the partition key. Columns with high cardinality are therefore not recommended as partition keys, since they would produce a very large number of small folders.
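
The folder-per-value layout, and why cardinality matters, can be sketched with plain Python dictionaries. This is only an illustration: the dictionary keys mimic partition folder names, the four rows are taken from the first rows of the diamonds dataset, and the helper functions are hypothetical.

```python
from collections import defaultdict

def partition_rows(rows, key):
    # One folder per distinct value of the partition column
    folders = defaultdict(list)
    for row in rows:
        folders['{}={}'.format(key, row[key])].append(row)
    return dict(folders)

def read_partition(folders, key, value):
    # A filter on the partition column only touches the matching folder
    return folders.get('{}={}'.format(key, value), [])

rows = [
    {'cut': 'Ideal', 'price': 326},
    {'cut': 'Premium', 'price': 326},
    {'cut': 'Good', 'price': 327},
    {'cut': 'Premium', 'price': 334},
]
by_cut = partition_rows(rows, 'cut')
by_price = partition_rows(rows, 'price')
print(sorted(by_cut))
print(len(read_partition(by_cut, 'cut', 'Premium')))
print(len(by_price))
```

Even on four rows, partitioning by price already yields three folders, which hints at how quickly a high-cardinality key fragments the table.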

Below is an example of partitioning.

\n", "

Let's first reload the original dataframe and save it as a managed Delta table:

" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "0ba3aed9-a669-4f36-bb12-818f98871a6a", "showTitle": false, "title": "" } }, "outputs": [], "source": [ "(spark.createDataFrame(sns.load_dataset('diamonds')).write\n", " .format(\"delta\")\n", " .mode(\"overwrite\")\n", " .option(\"overwriteSchema\", \"true\")\n", " .saveAsTable(\"diamonds_df_not_partitioned\"))" ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": {}, "inputWidgets": {}, "nuid": "860d0033-92f9-44d1-87f6-c37eccf3b835", "showTitle": false, "title": "" } }, "source": [ "

Let's have a look at the content of the Delta table folder. We see that there are four parquet files and a _delta_log folder:

" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "e450e873-f459-4e17-9c4b-531b438f3d4a", "showTitle": false, "title": "" } }, "outputs": [ { "data": { "text/html": [ "
| col_name | data_type | comment |
|---|---|---|
| Location | dbfs:/user/hive/warehouse/diamonds_df_not_partitioned | |
" ] }, "metadata": { "application/vnd.databricks.v1+output": { "addedWidgets": {}, "aggData": [], "aggError": "", "aggOverflow": false, "aggSchema": [], "aggSeriesLimitReached": false, "aggType": "", "arguments": {}, "columnCustomDisplayInfos": {}, "data": [ [ "Location", "dbfs:/user/hive/warehouse/diamonds_df_not_partitioned", "" ] ], "datasetInfos": [], "dbfsResultPath": null, "isJsonSchema": true, "metadata": {}, "overflow": false, "plotOptions": { "customPlotOptions": {}, "displayType": "table", "pivotAggregation": null, "pivotColumns": null, "xColumns": null, "yColumns": null }, "removedWidgets": [], "schema": [ { "metadata": "{\"comment\":\"name of the column\"}", "name": "col_name", "type": "\"string\"" }, { "metadata": "{\"comment\":\"data type of the column\"}", "name": "data_type", "type": "\"string\"" }, { "metadata": "{\"comment\":\"comment of the column\"}", "name": "comment", "type": "\"string\"" } ], "type": "table" } }, "output_type": "display_data" } ], "source": [ "display(spark.sql(\"describe table extended diamonds_df_not_partitioned\").filter(\"col_name in ('Location')\"))" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "c435ce0b-f242-4d60-a2e4-60ab6649056a", "showTitle": false, "title": "" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "dbfs:/user/hive/warehouse/diamonds_df_not_partitioned/_delta_log/\n", "dbfs:/user/hive/warehouse/diamonds_df_not_partitioned/part-00000-ae36b44e-0c7d-4c9f-9a6b-303df7a6b41c-c000.snappy.parquet\n", "dbfs:/user/hive/warehouse/diamonds_df_not_partitioned/part-00001-88c54dec-c999-415e-a6b7-e18f6fcf912c-c000.snappy.parquet\n", "dbfs:/user/hive/warehouse/diamonds_df_not_partitioned/part-00002-05d4b875-93c6-49c3-a176-e25ac6c39cab-c000.snappy.parquet\n", 
"dbfs:/user/hive/warehouse/diamonds_df_not_partitioned/part-00003-739559b9-0ff4-443f-a6ef-502f1733bae4-c000.snappy.parquet\n" ] } ], "source": [ "for file in dbutils.fs.ls(\"dbfs:/user/hive/warehouse/diamonds_df_not_partitioned\"):\n", " print(file.path)" ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": {}, "inputWidgets": {}, "nuid": "3a110cb9-e2fd-44f3-9701-06dde864d68e", "showTitle": false, "title": "" } }, "source": [ "

We can identify the feature cut as a good candidate for the partition key, since it has only a few distinct values:

" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "3129c2fe-338c-4478-ac5e-9634f7fdd2be", "showTitle": false, "title": "" } }, "outputs": [ { "data": { "text/html": [ "
| cut | count |
|---|---|
| Ideal | 21551 |
| Premium | 13791 |
| Very Good | 12082 |
| Good | 4906 |
| Fair | 1610 |
" ] }, "metadata": { "application/vnd.databricks.v1+output": { "addedWidgets": {}, "aggData": [], "aggError": "", "aggOverflow": false, "aggSchema": [], "aggSeriesLimitReached": false, "aggType": "", "arguments": {}, "columnCustomDisplayInfos": {}, "data": [ [ "Ideal", 21551 ], [ "Premium", 13791 ], [ "Very Good", 12082 ], [ "Good", 4906 ], [ "Fair", 1610 ] ], "datasetInfos": [], "dbfsResultPath": null, "isJsonSchema": true, "metadata": {}, "overflow": false, "plotOptions": { "customPlotOptions": {}, "displayType": "table", "pivotAggregation": null, "pivotColumns": null, "xColumns": null, "yColumns": null }, "removedWidgets": [], "schema": [ { "metadata": "{}", "name": "cut", "type": "\"string\"" }, { "metadata": "{}", "name": "count", "type": "\"long\"" } ], "type": "table" } }, "output_type": "display_data" } ], "source": [ "display(spark.table(\"diamonds_df_not_partitioned\").groupBy(\"cut\").count().orderBy(desc('count')))" ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": {}, "inputWidgets": {}, "nuid": "ac6e9f96-bf12-4291-b243-0d782ea17ea9", "showTitle": false, "title": "" } }, "source": [ "

Let's partition using cut as partition key:

" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "bdab33b6-54d0-41d7-bbab-78f1d6dc801f", "showTitle": false, "title": "" } }, "outputs": [], "source": [ "(spark.table(\"diamonds_df_not_partitioned\")\n", " .write.partitionBy(\"cut\")\n", " .format(\"delta\")\n", " .mode(\"overwrite\")\n", " .option(\"overwriteSchema\", \"true\")\n", " .saveAsTable(\"diamonds_df_partitioned\"))" ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": {}, "inputWidgets": {}, "nuid": "7fad799d-4524-40eb-a7c0-77ae6da2175d", "showTitle": false, "title": "" } }, "source": [ "

Now let's have a look at the content of the partitioned table. We see that there are as many folders as there are distinct values in the column cut. Queries that filter on cut only need to read the matching folders, which speeds them up.

" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "d003f526-9e67-44f4-9500-fc80c99a39e0", "showTitle": false, "title": "" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "dbfs:/user/hive/warehouse/diamonds_df_partitioned/_delta_log/\n", "dbfs:/user/hive/warehouse/diamonds_df_partitioned/cut=Fair/\n", "dbfs:/user/hive/warehouse/diamonds_df_partitioned/cut=Good/\n", "dbfs:/user/hive/warehouse/diamonds_df_partitioned/cut=Ideal/\n", "dbfs:/user/hive/warehouse/diamonds_df_partitioned/cut=Premium/\n", "dbfs:/user/hive/warehouse/diamonds_df_partitioned/cut=Very Good/\n" ] } ], "source": [ "for file in dbutils.fs.ls(\"dbfs:/user/hive/warehouse/diamonds_df_partitioned\"):\n", " print(file.path)" ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": {}, "inputWidgets": {}, "nuid": "088d26da-3f3c-400e-b86c-a9c5900e7c5c", "showTitle": false, "title": "" } }, "source": [ "\n", "
\n", "9. Describe the practical benefits of using the score_batch operation
\n", "

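score_batch lets us easily make predictions on a large amount of data at once, using features retrieved from the Feature Store. The input dataframe only needs to contain the lookup keys: the features the model was logged with are fetched automatically.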
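
Conceptually, this kind of batch scoring amounts to a join by key followed by a model call. The plain-Python sketch below only illustrates that idea: `score_batch_sketch`, `toy_model`, and the dictionary used as a feature table are hypothetical stand-ins, not the Feature Store API.

```python
def toy_model(features):
    # Hypothetical model: doubles the first feature
    return 2.0 * features[0]

def score_batch_sketch(feature_table, model, batch_keys, feature_names):
    # Join features by key, then apply the model to each row
    scored = []
    for key in batch_keys:
        features = [feature_table[key][name] for name in feature_names]
        scored.append({'index': key, 'prediction': model(features)})
    return scored

# Stand-in for a feature table keyed by the index column
feature_table = {
    0: {'carat': 0.23, 'depth': 61.5},
    1: {'carat': 0.21, 'depth': 59.8},
}
# The caller only supplies the keys of the rows to score
scored = score_batch_sketch(feature_table, toy_model, [0, 1], ['carat', 'depth'])
print(scored)
```

The practical benefit is that the caller does not have to rebuild the feature pipeline at scoring time, so the features used for scoring match the ones stored for training.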

\n", "

Let's have a look at an example to illustrate this requirement:

\n", "
    \n", "
  1. Load dataset: The dataset used for the example is the diamonds dataset from the Seaborn library
  2. Create a feature table in the Feature Store
  3. Push preprocessed features to the Feature Store
  4. Create train and test sets
  5. Prepare and train models
  6. Log models to associate them with features in the Feature Store
  7. Score models on the test set using score_batch
  8. Case of new data
" ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": {}, "inputWidgets": {}, "nuid": "1e8c666e-4e83-481e-ba1c-9171de353f58", "showTitle": false, "title": "" } }, "source": [ "

1. Load dataset

\n", "\n", "Looks like there is a problem with having a column named x or X... let's rename." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "e3116320-dbb9-4aea-b8d4-b826acdfc39b", "showTitle": false, "title": "" } }, "outputs": [ { "data": { "text/html": [ "
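| index | carat | cut | color | clarity | depth | table | price | x_r | y | z |
|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 0.23 | Ideal | E | SI2 | 61.5 | 55.0 | 326 | 3.95 | 3.98 | 2.43 |
| 1 | 0.21 | Premium | E | SI1 | 59.8 | 61.0 | 326 | 3.89 | 3.84 | 2.31 |
| 2 | 0.23 | Good | E | VS1 | 56.9 | 65.0 | 327 | 4.05 | 4.07 | 2.31 |
| 3 | 0.29 | Premium | I | VS2 | 62.4 | 58.0 | 334 | 4.2 | 4.23 | 2.63 |
| 4 | 0.31 | Good | J | SI2 | 63.3 | 58.0 | 335 | 4.34 | 4.35 | 2.75 |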
" ] }, "metadata": { "application/vnd.databricks.v1+output": { "addedWidgets": {}, "aggData": [], "aggError": "", "aggOverflow": false, "aggSchema": [], "aggSeriesLimitReached": false, "aggType": "", "arguments": {}, "columnCustomDisplayInfos": {}, "data": [ [ 0, 0.23, "Ideal", "E", "SI2", 61.5, 55, 326, 3.95, 3.98, 2.43 ], [ 1, 0.21, "Premium", "E", "SI1", 59.8, 61, 326, 3.89, 3.84, 2.31 ], [ 2, 0.23, "Good", "E", "VS1", 56.9, 65, 327, 4.05, 4.07, 2.31 ], [ 3, 0.29, "Premium", "I", "VS2", 62.4, 58, 334, 4.2, 4.23, 2.63 ], [ 4, 0.31, "Good", "J", "SI2", 63.3, 58, 335, 4.34, 4.35, 2.75 ] ], "datasetInfos": [], "dbfsResultPath": null, "isJsonSchema": true, "metadata": {}, "overflow": false, "plotOptions": { "customPlotOptions": {}, "displayType": "table", "pivotAggregation": null, "pivotColumns": null, "xColumns": null, "yColumns": null }, "removedWidgets": [], "schema": [ { "metadata": "{}", "name": "index", "type": "\"long\"" }, { "metadata": "{}", "name": "carat", "type": "\"double\"" }, { "metadata": "{}", "name": "cut", "type": "\"string\"" }, { "metadata": "{}", "name": "color", "type": "\"string\"" }, { "metadata": "{}", "name": "clarity", "type": "\"string\"" }, { "metadata": "{}", "name": "depth", "type": "\"double\"" }, { "metadata": "{}", "name": "table", "type": "\"double\"" }, { "metadata": "{}", "name": "price", "type": "\"long\"" }, { "metadata": "{}", "name": "x_r", "type": "\"double\"" }, { "metadata": "{}", "name": "y", "type": "\"double\"" }, { "metadata": "{}", "name": "z", "type": "\"double\"" } ], "type": "table" } }, "output_type": "display_data" } ], "source": [ "pd_diamonds = sns.load_dataset('diamonds').reset_index()\n", "#\n", "diamonds_full = spark.createDataFrame(pd_diamonds).withColumnRenamed('x', 'x_r')\n", "#\n", "display(diamonds_full.limit(5))" ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": {}, "inputWidgets": {}, "nuid": "91f71306-b829-4150-8562-2d7cc4728b48", "showTitle": 
false, "title": "" } }, "source": [ "

2. Create Feature table in Feature Store

" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "92306fc4-ca06-46bb-9aa2-f49eb4c3e6d2", "showTitle": false, "title": "" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "2023/11/23 17:32:06 WARNING databricks.feature_store._compute_client._compute_client: Deleting a feature table can lead to unexpected failures in upstream producers and downstream consumers (models, endpoints, and scheduled jobs).\n", "2023/11/23 17:32:09 INFO databricks.feature_store._compute_client._compute_client: Created feature table 'hive_metastore.default.diamonds_fs'.\n" ] } ], "source": [ "# create a feature store client\n", "fs = feature_store.FeatureStoreClient()\n", "#\n", "# fs.drop_table(\"default.diamonds_fs\")\n", "#\n", "# create feature table - as only the scema is provided in the command below, it will only create the table structure without populating it with data\n", "result = fs.create_table(name=\"diamonds_fs\", # required\n", " primary_keys=[\"index\"], # required\n", " schema=diamonds_full.drop(\"price\").schema, # need either dataframe schema\n", " #df=diamonds_full, # or dataframe itself\n", " description=\"seaborn diamonds dataset\");" ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "4e64855c-5e9e-4ebc-bd3a-168ec46d9758", "showTitle": false, "title": "" } }, "source": [ "

3. Push preprocessed features to Feature table

\n", "(No preprocessing is done here, for the sake of simplicity. Ideally, features pushed to Feature Store should already be preprocessed and ready for model training.)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "dd3f95e8-ebae-474a-8372-dd3043a63f28", "showTitle": false, "title": "" } }, "outputs": [], "source": [ "fs.write_table(name=\"diamonds_fs\",\n", " df=diamonds_full.drop('price'),\n", " mode='merge')" ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": {}, "inputWidgets": {}, "nuid": "9215ba4b-6e84-451a-ba95-d3e012c12ba5", "showTitle": false, "title": "" } }, "source": [ "

After that, the feature table is available in the Features menu, and the associated Delta table is available in the Catalog menu.
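
The write_table call above uses mode='merge', which acts as an upsert keyed on the primary key: rows whose key already exists are updated, and rows with new keys are inserted. A minimal pure-Python sketch of that behaviour follows; `merge_rows` and the dict-based table are hypothetical and only illustrate the semantics, not how Feature Store stores data.

```python
def merge_rows(table, new_rows, primary_key="index"):
    """Upsert new_rows into table (a dict keyed by primary key)."""
    merged = dict(table)  # leave the input untouched
    for row in new_rows:
        key = row[primary_key]
        existing = merged.get(key, {})
        merged[key] = {**existing, **row}  # update if present, insert if not
    return merged

# Hypothetical in-memory stand-in for a feature table.
feature_table = {
    0: {"index": 0, "carat": 0.23, "depth": 61.5},
    1: {"index": 1, "carat": 0.21, "depth": 59.8},
}
incoming = [
    {"index": 1, "carat": 0.25, "depth": 59.8},        # existing key: updated
    {"index": 88887777, "carat": 2.0, "depth": 40.0},  # new key: inserted
]

feature_table = merge_rows(feature_table, incoming)
print(sorted(feature_table))
```

Because the operation is keyed, re-running the same write does not duplicate rows.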

\n", "" ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": {}, "inputWidgets": {}, "nuid": "e96a564b-f4ac-423f-bfde-7a1ec12fa25b", "showTitle": false, "title": "" } }, "source": [ "

4. Create train and test sets

\n", "

Features are now available in Feature Store and can be loaded from there to train a model. For this example, we will train two models: one on four features (x_r, y, z, carat) and one on all numerical features.

\n", "\n", "

The primary key column, index, is what distinguishes the train set from the test set.

\n", "

Later, when scoring on the test set, we will also need the true target values from the original price column. The train and test sets therefore need only two columns: index and price.
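
One way to make such a split reproducible is to derive it from the primary key itself, so that a given index always lands in the same set. The sketch below is pure Python and is not what this notebook does (the notebook samples with rand()); `assign_split` and the 33% threshold are assumptions chosen to mirror the ratio used here.

```python
import hashlib

def assign_split(index, test_percent=33):
    """Deterministically map a primary key to 'train' or 'test'.

    Hashing the key (instead of drawing a random number) means the same
    index always gets the same answer, run after run.
    """
    digest = hashlib.sha256(str(index).encode("utf-8")).hexdigest()
    bucket = int(digest, 16) % 100  # stable bucket in [0, 100)
    return "test" if bucket < test_percent else "train"

# Hypothetical keys standing in for the `index` column.
keys = range(10_000)
test_keys = {k for k in keys if assign_split(k) == "test"}
train_keys = {k for k in keys if assign_split(k) == "train"}

print(len(train_keys), len(test_keys))
```

A split derived from the key cannot leak a row into both sets, since each key maps to exactly one side.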

\n", "" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "1317f3aa-58c4-4c96-9744-ce965b4bd720", "showTitle": false, "title": "" } }, "outputs": [ { "data": { "text/html": [ "
| price | index |
|---|---|
| 337 | 8 |
| 336 | 5 |
| 326 | 0 |
| 334 | 3 |
| 336 | 6 |
" ] }, "metadata": { "application/vnd.databricks.v1+output": { "addedWidgets": {}, "aggData": [], "aggError": "", "aggOverflow": false, "aggSchema": [], "aggSeriesLimitReached": false, "aggType": "", "arguments": {}, "columnCustomDisplayInfos": {}, "data": [ [ 337, 8 ], [ 336, 5 ], [ 326, 0 ], [ 334, 3 ], [ 336, 6 ] ], "datasetInfos": [], "dbfsResultPath": null, "isJsonSchema": true, "metadata": {}, "overflow": false, "plotOptions": { "customPlotOptions": {}, "displayType": "table", "pivotAggregation": null, "pivotColumns": null, "xColumns": null, "yColumns": null }, "removedWidgets": [], "schema": [ { "metadata": "{}", "name": "price", "type": "\"long\"" }, { "metadata": "{}", "name": "index", "type": "\"long\"" } ], "type": "table" } }, "output_type": "display_data" } ], "source": [ "y_test = diamonds_full.select(\"price\", \"index\").orderBy(rand()).limit(int(33*diamonds_full.count()/100))\n", "y_train = diamonds_full.select(\"price\", \"index\").subtract(y_test)\n", "#\n", "display(y_train.limit(5))" ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": {}, "inputWidgets": {}, "nuid": "b207fd0e-c0f0-4dbe-a031-91d8c9744087", "showTitle": false, "title": "" } }, "source": [ "

Let's create the Feature Store training sets.

\n", "" ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": {}, "inputWidgets": {}, "nuid": "609d2917-17b5-467c-8d2b-7a8fbd1beb36", "showTitle": false, "title": "" } }, "source": [ "Four features training set:" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "46d75cc8-2c72-4e8e-be93-46b2052dd79f", "showTitle": false, "title": "" } }, "outputs": [ { "data": { "text/html": [ "
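| x_r | y | z | carat | price |
|---|---|---|---|---|
| 3.87 | 3.78 | 2.49 | 0.22 | 337 |
| 3.94 | 3.96 | 2.48 | 0.24 | 336 |
| 3.95 | 3.98 | 2.43 | 0.23 | 326 |
| 4.2 | 4.23 | 2.63 | 0.29 | 334 |
| 3.95 | 3.98 | 2.47 | 0.24 | 336 |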
" ] }, "metadata": { "application/vnd.databricks.v1+output": { "addedWidgets": {}, "aggData": [], "aggError": "", "aggOverflow": false, "aggSchema": [], "aggSeriesLimitReached": false, "aggType": "", "arguments": {}, "columnCustomDisplayInfos": {}, "data": [ [ 3.87, 3.78, 2.49, 0.22, 337 ], [ 3.94, 3.96, 2.48, 0.24, 336 ], [ 3.95, 3.98, 2.43, 0.23, 326 ], [ 4.2, 4.23, 2.63, 0.29, 334 ], [ 3.95, 3.98, 2.47, 0.24, 336 ] ], "datasetInfos": [], "dbfsResultPath": null, "isJsonSchema": true, "metadata": {}, "overflow": false, "plotOptions": { "customPlotOptions": {}, "displayType": "table", "pivotAggregation": null, "pivotColumns": null, "xColumns": null, "yColumns": null }, "removedWidgets": [], "schema": [ { "metadata": "{}", "name": "x_r", "type": "\"double\"" }, { "metadata": "{}", "name": "y", "type": "\"double\"" }, { "metadata": "{}", "name": "z", "type": "\"double\"" }, { "metadata": "{}", "name": "carat", "type": "\"double\"" }, { "metadata": "{}", "name": "price", "type": "\"long\"" } ], "type": "table" } }, "output_type": "display_data" } ], "source": [ "# With 4 features: x, y, z, carat\n", "feature_lookups_4_features = [feature_store.FeatureLookup(table_name=\"diamonds_fs\",\n", " feature_names=['x_r', 'y', 'z', 'carat'],\n", " lookup_key=\"index\")]\n", "#\n", "# create associated training set\n", "train_set_4_features = fs.create_training_set(y_train,\n", " feature_lookups_4_features,\n", " label=\"price\",\n", " exclude_columns=\"index\")\n", "#\n", "# load training set\n", "train_set_4 = train_set_4_features.load_df()\n", "#\n", "# display to check\n", "display(train_set_4.limit(5))" ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": {}, "inputWidgets": {}, "nuid": "81d2a88a-d33c-44cb-80a0-143e574891bc", "showTitle": false, "title": "" } }, "source": [ "All numerical features training set:" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "application/vnd.databricks.v1+cell": { 
"cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "7d56d779-8327-4bdd-b9e7-859c60d9d935", "showTitle": false, "title": "" } }, "outputs": [ { "data": { "text/html": [ "
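| carat | depth | table | x_r | y | z | price |
|---|---|---|---|---|---|---|
| 0.22 | 65.1 | 61.0 | 3.87 | 3.78 | 2.49 | 337 |
| 0.24 | 62.8 | 57.0 | 3.94 | 3.96 | 2.48 | 336 |
| 0.23 | 61.5 | 55.0 | 3.95 | 3.98 | 2.43 | 326 |
| 0.29 | 62.4 | 58.0 | 4.2 | 4.23 | 2.63 | 334 |
| 0.24 | 62.3 | 57.0 | 3.95 | 3.98 | 2.47 | 336 |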
" ] }, "metadata": { "application/vnd.databricks.v1+output": { "addedWidgets": {}, "aggData": [], "aggError": "", "aggOverflow": false, "aggSchema": [], "aggSeriesLimitReached": false, "aggType": "", "arguments": {}, "columnCustomDisplayInfos": {}, "data": [ [ 0.22, 65.1, 61, 3.87, 3.78, 2.49, 337 ], [ 0.24, 62.8, 57, 3.94, 3.96, 2.48, 336 ], [ 0.23, 61.5, 55, 3.95, 3.98, 2.43, 326 ], [ 0.29, 62.4, 58, 4.2, 4.23, 2.63, 334 ], [ 0.24, 62.3, 57, 3.95, 3.98, 2.47, 336 ] ], "datasetInfos": [], "dbfsResultPath": null, "isJsonSchema": true, "metadata": {}, "overflow": false, "plotOptions": { "customPlotOptions": {}, "displayType": "table", "pivotAggregation": null, "pivotColumns": null, "xColumns": null, "yColumns": null }, "removedWidgets": [], "schema": [ { "metadata": "{}", "name": "carat", "type": "\"double\"" }, { "metadata": "{}", "name": "depth", "type": "\"double\"" }, { "metadata": "{}", "name": "table", "type": "\"double\"" }, { "metadata": "{}", "name": "x_r", "type": "\"double\"" }, { "metadata": "{}", "name": "y", "type": "\"double\"" }, { "metadata": "{}", "name": "z", "type": "\"double\"" }, { "metadata": "{}", "name": "price", "type": "\"long\"" } ], "type": "table" } }, "output_type": "display_data" } ], "source": [ "# With all numerical features\n", "feature_lookups_all_features = [feature_store.FeatureLookup(table_name=\"diamonds_fs\",\n", " feature_names=[c for c in diamonds_full.columns if c not in ['index', 'cut', 'clarity', 'price', 'color']],\n", " lookup_key=\"index\")]\n", "#\n", "# create associated training set\n", "train_set_all_features = fs.create_training_set(y_train,\n", " feature_lookups_all_features,\n", " label=\"price\",\n", " exclude_columns=\"index\")\n", "#\n", "# load training set\n", "train_set_all = train_set_all_features.load_df()\n", "#\n", "# display to check\n", "display(train_set_all.limit(5))" ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": {}, "inputWidgets": {}, 
"nuid": "a8693996-799e-4cae-9e14-b6ba149a1c6f", "showTitle": false, "title": "" } }, "source": [ "

5. Prepare and train models

\n", "

We train two scikit-learn models on different feature sets. The Spark training sets are converted to pandas DataFrames/Series with toPandas() before being passed to scikit-learn.

" ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": {}, "inputWidgets": {}, "nuid": "1ddce2d7-694d-42c1-ae9a-478d026a351f", "showTitle": false, "title": "" } }, "source": [ "Model trained using the four features dataset:" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "703f0b5c-bfe3-4c2e-868d-272162750573", "showTitle": false, "title": "" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Out[36]: RandomForestRegressor()" ] } ], "source": [ "X_train_4 = train_set_4.drop(\"price\").toPandas()\n", "y_train_4 = train_set_4.toPandas()[\"price\"]\n", "#\n", "rf_4_model = RandomForestRegressor()\n", "#\n", "rf_4_model.fit(X_train_4, y_train_4)" ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": {}, "inputWidgets": {}, "nuid": "0814a65e-5ff5-4c18-b53a-1fadff6ae268", "showTitle": false, "title": "" } }, "source": [ "Model trained using all numerical features dataset:" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "82e4e873-31bd-4c18-874a-f88ce8c54c82", "showTitle": false, "title": "" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Out[37]: RandomForestRegressor()" ] } ], "source": [ "X_train_all = train_set_all.drop(\"price\").toPandas()\n", "y_train_all = train_set_all.toPandas()[\"price\"]\n", "#\n", "rf_all_model = RandomForestRegressor()\n", "#\n", "rf_all_model.fit(X_train_all, y_train_all)" ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": {}, "inputWidgets": {}, "nuid": "838f0a82-3f68-4c36-b235-3ce2f873410a", "showTitle": false, "title": "" } }, "source": [ "

6. Log models to associate them with features in Feature Store

" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "cdd03ad9-eab5-420a-a198-dd2c90f71ef3", "showTitle": false, "title": "" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Registered model 'trained_with_4_features' already exists. Creating a new version of this model...\n", "Created version '7' of model 'trained_with_4_features'.\n" ] } ], "source": [ "model_name_4_features = \"trained_with_4_features\"\n", "#\n", "fs.log_model(rf_4_model,\n", " artifact_path=model_name_4_features, # parameter required\n", " flavor=mlflow.sklearn, # parameter required\n", " training_set=train_set_4_features, # either training_set or feature_spec_path parameters required\n", " registered_model_name=model_name_4_features); # not required. However model will not be linked to features in features store until model is registered" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "e44704fb-e0dd-451b-803f-70f7ba6a06c0", "showTitle": false, "title": "" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Registered model 'trained_with_all_features' already exists. Creating a new version of this model...\n", "Created version '6' of model 'trained_with_all_features'.\n" ] } ], "source": [ "model_name_all_features = \"trained_with_all_features\"\n", "#\n", "fs.log_model(rf_all_model,\n", " artifact_path=model_name_all_features, # parameter required\n", " flavor=mlflow.sklearn, # parameter required\n", " training_set=train_set_all_features, # either training_set or feature_spec_path parameters required\n", " registered_model_name=model_name_all_features); # not required. 
However model will not be linked to features in features store until model is registered" ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": {}, "inputWidgets": {}, "nuid": "61497a33-a9a4-4bf9-88b2-02e0f63f8bc3", "showTitle": false, "title": "" } }, "source": [ "

At this point, we can see in Feature Store that the models are associated with the features they were trained on:

\n", "" ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": {}, "inputWidgets": {}, "nuid": "b3a733f0-123a-4fe7-b497-70b72e58b9fc", "showTitle": false, "title": "" } }, "source": [ "

7. Score models on test set using score_batch
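
The two models in this step are compared with RMSE (root mean squared error): the square root of the mean squared difference between actual and predicted values. The short sketch below computes it by hand on the five sample (price, prediction) rows displayed in this step for the four-feature model, so its result describes only those five rows, not the full test set; the helper name `rmse` is hypothetical.

```python
import math

def rmse(actual, predicted):
    """Root mean squared error of two equal-length sequences."""
    if len(actual) != len(predicted):
        raise ValueError("actual and predicted must have the same length")
    squared_errors = [(a - p) ** 2 for a, p in zip(actual, predicted)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))

# The five (price, prediction) pairs displayed for the four-feature model.
prices = [5146, 998, 722, 756, 4312]
predictions = [6148.454523809524, 1054.7966666666666, 920.675,
               911.7239999999999, 4765.28]

sample_rmse = rmse(prices, predictions)
print(f"RMSE on the 5 displayed rows: {sample_rmse:.2f}")
```

RMSE is expressed in the same unit as the target (here, price), which makes the two models' scores directly comparable.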

" ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": {}, "inputWidgets": {}, "nuid": "b3f591e7-37f6-4ec6-a5d2-27f4db5323ec", "showTitle": false, "title": "" } }, "source": [ "

Model trained on four features:

" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "a3dac6a5-0caa-4252-8295-40383c5592aa", "showTitle": false, "title": "" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "runs:/45a6dbcde3a1464a84a61353caf60365/trained_with_4_features\n" ] }, { "data": { "text/html": [ "
| price | prediction |
|---|---|
| 5146 | 6148.454523809524 |
| 998 | 1054.7966666666666 |
| 722 | 920.675 |
| 756 | 911.7239999999999 |
| 4312 | 4765.28 |
" ] }, "metadata": { "application/vnd.databricks.v1+output": { "addedWidgets": {}, "aggData": [], "aggError": "", "aggOverflow": false, "aggSchema": [], "aggSeriesLimitReached": false, "aggType": "", "arguments": {}, "columnCustomDisplayInfos": {}, "data": [ [ 5146, 6148.454523809524 ], [ 998, 1054.7966666666666 ], [ 722, 920.675 ], [ 756, 911.7239999999999 ], [ 4312, 4765.28 ] ], "datasetInfos": [], "dbfsResultPath": null, "isJsonSchema": true, "metadata": {}, "overflow": false, "plotOptions": { "customPlotOptions": {}, "displayType": "table", "pivotAggregation": null, "pivotColumns": null, "xColumns": null, "yColumns": null }, "removedWidgets": [], "schema": [ { "metadata": "{}", "name": "price", "type": "\"long\"" }, { "metadata": "{}", "name": "prediction", "type": "\"double\"" } ], "type": "table" } }, "output_type": "display_data" } ], "source": [ "# latest run id for model named \"trained_with_4_features\"\n", "for val in mlflow.MlflowClient().get_registered_model(model_name_4_features):\n", " if val[0]=='latest_versions':\n", " run_id_4 = val[1][0].run_id\n", "#\n", "# uri to latest run\n", "uri_4_features = f\"runs:/{run_id_4}/{model_name_4_features}\"\n", "print(uri_4_features)\n", "#\n", "# predict on test set\n", "predictions_df_4_features = fs.score_batch(uri_4_features, y_test).select(\"price\", \"prediction\");\n", "display(predictions_df_4_features.orderBy(rand()).limit(5));" ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": {}, "inputWidgets": {}, "nuid": "18a6eaba-d08c-477b-ae20-a4baa09390cd", "showTitle": false, "title": "" } }, "source": [ "

RMSE on the test set for the model trained on 4 features:

" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "8df34d1d-3064-4c18-b8b7-a4ff9fb793a0", "showTitle": false, "title": "" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "RMSE for model trained on 4 features: 1448.8825377496491\n" ] } ], "source": [ "print(\"RMSE for model trained on 4 features:\",\n", " mean_squared_error(predictions_df_4_features.toPandas()['price'], predictions_df_4_features.toPandas()['prediction'], squared=False))" ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": {}, "inputWidgets": {}, "nuid": "10058824-419f-4ab9-b8d4-049a99b6db86", "showTitle": false, "title": "" } }, "source": [ "

Model trained on all numerical features:

" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "26c4c37f-55a2-463c-9eaa-a737070e550a", "showTitle": false, "title": "" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "runs:/45a6dbcde3a1464a84a61353caf60365/trained_with_all_features\n" ] }, { "data": { "text/html": [ "
| price | prediction |
|---|---|
| 3352 | 3267.8175 |
| 2281 | 1654.9560000000001 |
| 3601 | 5770.05 |
| 9032 | 8038.75 |
| 6126 | 6958.44 |
" ] }, "metadata": { "application/vnd.databricks.v1+output": { "addedWidgets": {}, "aggData": [], "aggError": "", "aggOverflow": false, "aggSchema": [], "aggSeriesLimitReached": false, "aggType": "", "arguments": {}, "columnCustomDisplayInfos": {}, "data": [ [ 3352, 3267.8175 ], [ 2281, 1654.9560000000001 ], [ 3601, 5770.05 ], [ 9032, 8038.75 ], [ 6126, 6958.44 ] ], "datasetInfos": [], "dbfsResultPath": null, "isJsonSchema": true, "metadata": {}, "overflow": false, "plotOptions": { "customPlotOptions": {}, "displayType": "table", "pivotAggregation": null, "pivotColumns": null, "xColumns": null, "yColumns": null }, "removedWidgets": [], "schema": [ { "metadata": "{}", "name": "price", "type": "\"long\"" }, { "metadata": "{}", "name": "prediction", "type": "\"double\"" } ], "type": "table" } }, "output_type": "display_data" } ], "source": [ "# latest run id for model named \"trained_with_all_features\"\n", "for val in mlflow.MlflowClient().get_registered_model(model_name_all_features):\n", " if val[0]=='latest_versions':\n", " run_id_all = val[1][0].run_id\n", "#\n", "# uri to latest run\n", "uri_all_features = f\"runs:/{run_id_all}/{model_name_all_features}\"\n", "print(uri_all_features)\n", "#\n", "# predict on test set\n", "predictions_df_all_features = fs.score_batch(uri_all_features, y_test).select(\"price\", \"prediction\", \"carat\");\n", "display(predictions_df_all_features.select(\"price\", \"prediction\").orderBy(rand()).limit(5));" ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": {}, "inputWidgets": {}, "nuid": "1553678a-e273-4b23-91ed-568c901ab53c", "showTitle": false, "title": "" } }, "source": [ "

RMSE on the test set for the model trained on all numerical features:

" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "97dab406-3bc9-4a07-9f51-02471704d3eb", "showTitle": false, "title": "" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "1401.7197923544627\n" ] } ], "source": [ "print(mean_squared_error(predictions_df_all_features.toPandas()['price'], predictions_df_all_features.toPandas()['prediction'], squared=False))" ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": {}, "inputWidgets": {}, "nuid": "48be5619-0c6d-4915-b1b5-590396271f26", "showTitle": false, "title": "" } }, "source": [ "

Comparison of actual and predicted prices as a function of carat, for the model trained on all numerical features, on a random sample of 1000 entries:

" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "f53c8f7e-9a5b-424f-9ba9-283685d3ad54", "showTitle": false, "title": "" } }, "outputs": [ { "data": { "image/png": "\n" }, "metadata": { "application/vnd.databricks.v1+output": { "addedWidgets": {}, "arguments": {}, "data": "\n", "datasetInfos": [], "metadata": {}, "removedWidgets": [], "type": "image" } }, "output_type": "display_data" } ], "source": [ "plt.figure(figsize=(17, 6))\n", "#\n", "sample_predictions = predictions_df_all_features.orderBy(rand()).limit(1000)\n", "#\n", "plt.scatter(list(sample_predictions.toPandas()['carat']), list(sample_predictions.toPandas()['price']), label='Actual Price', color='blue', marker='o')\n", "plt.scatter(list(sample_predictions.toPandas()['carat']), list(sample_predictions.toPandas()['prediction']), label='Prediction', color='orange', marker='s')\n", "#\n", "# Adding labels and title\n", "plt.xlabel('Carat')\n", "plt.ylabel('Prices')\n", "plt.title('Actual Price vs Prediction')\n", "#\n", "# Adding grid for better readability\n", "plt.grid(True, linestyle='--', alpha=0.7)\n", "#\n", "# Adding legend\n", "plt.legend()\n", "#\n", "# Show plot\n", "plt.show();" ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "e0bcb9d4-60d8-49f9-9af6-9d706b2badb2", "showTitle": false, "title": "" } }, "source": [ "

8. Case of new data

\n", "

How do we score newly arriving data that Feature Store has no information about? We first need to update Feature Store with the new data:

" ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": {}, "inputWidgets": {}, "nuid": "2b86ebd5-e014-43e9-882b-3e8dde08b4e0", "showTitle": false, "title": "" } }, "source": [ "

Let's create a new diamond record. Its schema should match the data in Feature Store:

" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "49b297af-6a07-448a-9342-e09d5847cc28", "showTitle": false, "title": "" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "root\n", " |-- index: long (nullable = true)\n", " |-- carat: double (nullable = true)\n", " |-- cut: string (nullable = true)\n", " |-- color: string (nullable = true)\n", " |-- clarity: string (nullable = true)\n", " |-- depth: double (nullable = true)\n", " |-- table: double (nullable = true)\n", " |-- price: long (nullable = true)\n", " |-- x_r: double (nullable = true)\n", " |-- y: double (nullable = true)\n", " |-- z: double (nullable = true)\n", "\n" ] } ], "source": [ "diamonds_full.printSchema()" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "f332ab5d-e77c-4880-bf10-1c0863f228a0", "showTitle": false, "title": "" } }, "outputs": [ { "data": { "text/html": [ "
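| index | carat | cut | color | clarity | depth | table | price | x_r | y | z |
|---|---|---|---|---|---|---|---|---|---|---|
| 88887777 | 2.0 | Good | E | VS1 | 40.0 | 64.0 | 326 | 4.14 | 3.5 | 2.1 |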
" ] }, "metadata": { "application/vnd.databricks.v1+output": { "addedWidgets": {}, "aggData": [], "aggError": "", "aggOverflow": false, "aggSchema": [], "aggSeriesLimitReached": false, "aggType": "", "arguments": {}, "columnCustomDisplayInfos": {}, "data": [ [ 88887777, 2, "Good", "E", "VS1", 40, 64, 326, 4.14, 3.5, 2.1 ] ], "datasetInfos": [], "dbfsResultPath": null, "isJsonSchema": true, "metadata": {}, "overflow": false, "plotOptions": { "customPlotOptions": {}, "displayType": "table", "pivotAggregation": null, "pivotColumns": null, "xColumns": null, "yColumns": null }, "removedWidgets": [], "schema": [ { "metadata": "{}", "name": "index", "type": "\"long\"" }, { "metadata": "{}", "name": "carat", "type": "\"double\"" }, { "metadata": "{}", "name": "cut", "type": "\"string\"" }, { "metadata": "{}", "name": "color", "type": "\"string\"" }, { "metadata": "{}", "name": "clarity", "type": "\"string\"" }, { "metadata": "{}", "name": "depth", "type": "\"double\"" }, { "metadata": "{}", "name": "table", "type": "\"double\"" }, { "metadata": "{}", "name": "price", "type": "\"long\"" }, { "metadata": "{}", "name": "x_r", "type": "\"double\"" }, { "metadata": "{}", "name": "y", "type": "\"double\"" }, { "metadata": "{}", "name": "z", "type": "\"double\"" } ], "type": "table" } }, "output_type": "display_data" }, { "data": { "text/html": [ "
| index | price |
|---|---|
| 88887777 | 4500 |
" ] }, "metadata": { "application/vnd.databricks.v1+output": { "addedWidgets": {}, "aggData": [], "aggError": "", "aggOverflow": false, "aggSchema": [], "aggSeriesLimitReached": false, "aggType": "", "arguments": {}, "columnCustomDisplayInfos": {}, "data": [ [ 88887777, 4500 ] ], "datasetInfos": [], "dbfsResultPath": null, "isJsonSchema": true, "metadata": {}, "overflow": false, "plotOptions": { "customPlotOptions": {}, "displayType": "table", "pivotAggregation": null, "pivotColumns": null, "xColumns": null, "yColumns": null }, "removedWidgets": [], "schema": [ { "metadata": "{}", "name": "index", "type": "\"long\"" }, { "metadata": "{}", "name": "price", "type": "\"long\"" } ], "type": "table" } }, "output_type": "display_data" }, { "data": { "text/html": [ "
| index |
|---|
| 88887777 |
" ] }, "metadata": { "application/vnd.databricks.v1+output": { "addedWidgets": {}, "aggData": [], "aggError": "", "aggOverflow": false, "aggSchema": [], "aggSeriesLimitReached": false, "aggType": "", "arguments": {}, "columnCustomDisplayInfos": {}, "data": [ [ 88887777 ] ], "datasetInfos": [], "dbfsResultPath": null, "isJsonSchema": true, "metadata": {}, "overflow": false, "plotOptions": { "customPlotOptions": {}, "displayType": "table", "pivotAggregation": null, "pivotColumns": null, "xColumns": null, "yColumns": null }, "removedWidgets": [], "schema": [ { "metadata": "{}", "name": "index", "type": "\"long\"" } ], "type": "table" } }, "output_type": "display_data" } ], "source": [ "new_diamond = (diamonds_full.limit(1).withColumn('index', lit(88887777).cast('long'))\n", " .withColumn('carat', lit(2).cast('double'))\n", " .withColumn('cut', lit('Good').cast('string'))\n", " .withColumn('color', lit('E').cast('string'))\n", " .withColumn('clarity', lit('VS1').cast('string'))\n", " .withColumn('depth', lit(40).cast('double'))\n", " .withColumn('table', lit(64).cast('double'))\n", " .withColumn('x_r', lit(4.14).cast('double'))\n", " .withColumn('y', lit(3.5).cast('double'))\n", " .withColumn('z', lit(2.1).cast('double')))\n", "#\n", "new_diamond_with_price = spark.createDataFrame(pd.DataFrame({'index': [88887777], 'price': [4500]}))\n", "#\n", "new_diamond_without_price = spark.createDataFrame(pd.DataFrame({'index': [88887777]}))\n", "#\n", "diamond_unknown = spark.createDataFrame(pd.DataFrame({'index': [98989898]}))\n", "#\n", "display(new_diamond)\n", "display(new_diamond_with_price)\n", "display(new_diamond_without_price)" ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": {}, "inputWidgets": {}, "nuid": "69917373-02c9-46f1-adc7-73a283e1505b", "showTitle": false, "title": "" } }, "source": [ "

Now, update the Feature Store with the new diamond data:

" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "ad9f4763-9446-41a4-b963-e51c0d6fcb08", "showTitle": false, "title": "" } }, "outputs": [], "source": [ "fs.write_table(name=\"diamonds_fs\",\n", " df=new_diamond,\n", " mode='merge')" ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": {}, "inputWidgets": {}, "nuid": "cdaea63c-b435-4f75-8cf0-aec1b2991b1b", "showTitle": false, "title": "" } }, "source": [ "

We verify that score_batch returns a prediction whether or not the input contains the price of the new diamond. The only requirement is that the input carries the primary key (here, the index column) and that this key exists in Feature Store:

" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "88bed0bc-f4d4-4f22-95ca-83d4a56c4c84", "showTitle": false, "title": "" } }, "outputs": [ { "data": { "text/html": [ "
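<table><tr><th>index</th><th>price</th><th>carat</th><th>depth</th><th>table</th><th>x_r</th><th>y</th><th>z</th><th>prediction</th></tr>
<tr><td>88887777</td><td>4500</td><td>2.0</td><td>40.0</td><td>64.0</td><td>4.14</td><td>3.5</td><td>2.1</td><td>9639.403333333334</td></tr></table>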
" ] }, "metadata": { "application/vnd.databricks.v1+output": { "addedWidgets": {}, "aggData": [], "aggError": "", "aggOverflow": false, "aggSchema": [], "aggSeriesLimitReached": false, "aggType": "", "arguments": {}, "columnCustomDisplayInfos": {}, "data": [ [ 88887777, 4500, 2, 40, 64, 4.14, 3.5, 2.1, 9639.403333333334 ] ], "datasetInfos": [], "dbfsResultPath": null, "isJsonSchema": true, "metadata": {}, "overflow": false, "plotOptions": { "customPlotOptions": {}, "displayType": "table", "pivotAggregation": null, "pivotColumns": null, "xColumns": null, "yColumns": null }, "removedWidgets": [], "schema": [ { "metadata": "{}", "name": "index", "type": "\"long\"" }, { "metadata": "{}", "name": "price", "type": "\"long\"" }, { "metadata": "{}", "name": "carat", "type": "\"double\"" }, { "metadata": "{}", "name": "depth", "type": "\"double\"" }, { "metadata": "{}", "name": "table", "type": "\"double\"" }, { "metadata": "{}", "name": "x_r", "type": "\"double\"" }, { "metadata": "{}", "name": "y", "type": "\"double\"" }, { "metadata": "{}", "name": "z", "type": "\"double\"" }, { "metadata": "{}", "name": "prediction", "type": "\"double\"" } ], "type": "table" } }, "output_type": "display_data" }, { "data": { "text/html": [ "
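<table><tr><th>index</th><th>carat</th><th>depth</th><th>table</th><th>x_r</th><th>y</th><th>z</th><th>prediction</th></tr>
<tr><td>88887777</td><td>2.0</td><td>40.0</td><td>64.0</td><td>4.14</td><td>3.5</td><td>2.1</td><td>9639.403333333334</td></tr></table>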
" ] }, "metadata": { "application/vnd.databricks.v1+output": { "addedWidgets": {}, "aggData": [], "aggError": "", "aggOverflow": false, "aggSchema": [], "aggSeriesLimitReached": false, "aggType": "", "arguments": {}, "columnCustomDisplayInfos": {}, "data": [ [ 88887777, 2, 40, 64, 4.14, 3.5, 2.1, 9639.403333333334 ] ], "datasetInfos": [], "dbfsResultPath": null, "isJsonSchema": true, "metadata": {}, "overflow": false, "plotOptions": { "customPlotOptions": {}, "displayType": "table", "pivotAggregation": null, "pivotColumns": null, "xColumns": null, "yColumns": null }, "removedWidgets": [], "schema": [ { "metadata": "{}", "name": "index", "type": "\"long\"" }, { "metadata": "{}", "name": "carat", "type": "\"double\"" }, { "metadata": "{}", "name": "depth", "type": "\"double\"" }, { "metadata": "{}", "name": "table", "type": "\"double\"" }, { "metadata": "{}", "name": "x_r", "type": "\"double\"" }, { "metadata": "{}", "name": "y", "type": "\"double\"" }, { "metadata": "{}", "name": "z", "type": "\"double\"" }, { "metadata": "{}", "name": "prediction", "type": "\"double\"" } ], "type": "table" } }, "output_type": "display_data" } ], "source": [ "# predict with price\n", "predictions_new_diamond_with_price = fs.score_batch(uri_all_features, new_diamond_with_price)\n", "display(predictions_new_diamond_with_price)\n", "#\n", "# predict without price\n", "predictions_new_diamond_without_price = fs.score_batch(uri_all_features, new_diamond_without_price)\n", "display(predictions_new_diamond_without_price)" ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": {}, "inputWidgets": {}, "nuid": "88f5654a-c3c1-4b83-a268-d8d67bd94906", "showTitle": false, "title": "" } }, "source": [ "

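Both calls in the previous cell return the same prediction because the feature values come from the Feature Store, not from the input DataFrame. The sketch below mimics that lookup with a dictionary keyed by `index`. It is a simplified illustration under assumptions: `feature_table`, `toy_model` and `score_rows` are hypothetical stand-ins, and the feature list simply mirrors the columns shown in the output above.

```python
# Hypothetical stand-ins for the feature table and the registered model.
FEATURE_COLUMNS = ['carat', 'depth', 'table', 'x_r', 'y', 'z']
feature_table = {
    88887777: {'carat': 2.0, 'depth': 40.0, 'table': 64.0,
               'x_r': 4.14, 'y': 3.5, 'z': 2.1},
}


def toy_model(vector):
    '''Placeholder model: any deterministic function of the features.'''
    return 1000.0 * vector[0]


def score_rows(rows, key='index'):
    '''Look up features by primary key, then append a prediction.'''
    scored = []
    for row in rows:
        looked_up = feature_table[row[key]]
        vector = [looked_up[name] for name in FEATURE_COLUMNS]
        # Extra input columns such as 'price' are carried through unchanged.
        scored.append({**row, **looked_up, 'prediction': toy_model(vector)})
    return scored


with_price = score_rows([{'index': 88887777, 'price': 4500}])
without_price = score_rows([{'index': 88887777}])
print(with_price[0]['prediction'] == without_price[0]['prediction'])
```

The `price` column only travels along with the row; it never reaches the model, which is why its presence does not change the result.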
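Finally, verify that scoring a primary key that does not exist in the Feature Store results in an error. In this run the failure is raised by the model itself (ValueError: Input contains NaN, infinity or a value too large for dtype('float32')) rather than by an explicit key check: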
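Because the failure shown in the next cell only surfaces once the job runs, it can help to separate known from unknown keys before scoring. The helper below is a minimal sketch of such a pre-check in plain Python; `split_by_known_keys` and `known_keys` are hypothetical names, and in a real notebook the set of known keys would have to be derived from the feature table itself.

```python
def split_by_known_keys(rows, known_keys, key='index'):
    '''Split rows into those whose key exists in the feature table and the rest.'''
    known = [row for row in rows if row[key] in known_keys]
    unknown = [row for row in rows if row[key] not in known_keys]
    return known, unknown


# Hypothetical set of primary keys present in the feature table.
known_keys = {88887777}
rows = [{'index': 88887777}, {'index': 98989898}]

known, unknown = split_by_known_keys(rows, known_keys)
print('score:', known)
print('skip or report:', unknown)
```

Only the rows in `known` would then be passed on for scoring, while `unknown` can be logged or handled separately.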

" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "fac28dd7-81a6-4ea8-bec1-b779aaa14d15", "showTitle": false, "title": "" } }, "outputs": [ { "data": { "text/html": [ "\n", "org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 431.0 failed 4 times, most recent failure: Lost task 0.3 in stage 431.0 (TID 1339) (10.139.64.4 executor driver): org.apache.spark.api.python.PythonException: 'ValueError: Input contains NaN, infinity or a value too large for dtype('float32').'. Full traceback below:\n", "Traceback (most recent call last):\n", " File \"/databricks/python/lib/python3.9/site-packages/mlflow/pyfunc/__init__.py\", line 1025, in udf\n", " os.kill(scoring_server_proc.pid, signal.SIGTERM)\n", " File \"/databricks/python/lib/python3.9/site-packages/mlflow/pyfunc/__init__.py\", line 866, in _predict_row_batch\n", " result = predict_fn(pdf)\n", " File \"/databricks/python/lib/python3.9/site-packages/mlflow/pyfunc/__init__.py\", line 1006, in batch_predict_fn\n", " return loaded_model.predict(pdf)\n", " File \"/databricks/python/lib/python3.9/site-packages/mlflow/pyfunc/__init__.py\", line 373, in predict\n", " return self._predict_fn(data)\n", " File \"/databricks/python/lib/python3.9/site-packages/sklearn/ensemble/_forest.py\", line 784, in predict\n", " X = self._validate_X_predict(X)\n", " File \"/databricks/python/lib/python3.9/site-packages/sklearn/ensemble/_forest.py\", line 422, in _validate_X_predict\n", " return self.estimators_[0]._validate_X_predict(X, check_input=True)\n", " File \"/databricks/python/lib/python3.9/site-packages/sklearn/tree/_classes.py\", line 407, in _validate_X_predict\n", " X = self._validate_data(X, dtype=DTYPE, accept_sparse=\"csr\",\n", " File \"/databricks/python/lib/python3.9/site-packages/sklearn/base.py\", line 421, in _validate_data\n", " X = 
check_array(X, **check_params)\n", " File \"/databricks/python/lib/python3.9/site-packages/sklearn/utils/validation.py\", line 63, in inner_f\n", " return f(*args, **kwargs)\n", " File \"/databricks/python/lib/python3.9/site-packages/sklearn/utils/validation.py\", line 720, in check_array\n", " _assert_all_finite(array,\n", " File \"/databricks/python/lib/python3.9/site-packages/sklearn/utils/validation.py\", line 103, in _assert_all_finite\n", " raise ValueError(\n", "ValueError: Input contains NaN, infinity or a value too large for dtype('float32').\n", "\n", "\tat org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:694)\n", "\tat org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:110)\n", "\tat org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:647)\n", "\tat org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)\n", "\tat scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)\n", "\tat scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)\n", "\tat org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source)\n", "\tat org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)\n", "\tat org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:761)\n", "\tat org.apache.spark.sql.execution.collect.UnsafeRowBatchUtils$.encodeUnsafeRows(UnsafeRowBatchUtils.scala:80)\n", "\tat org.apache.spark.sql.execution.collect.Collector.$anonfun$processFunc$1(Collector.scala:186)\n", "\tat org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:75)\n", "\tat com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)\n", "\tat org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:75)\n", "\tat 
com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)\n", "\tat org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:55)\n", "\tat org.apache.spark.scheduler.Task.doRunTask(Task.scala:174)\n", "\tat org.apache.spark.scheduler.Task.$anonfun$run$4(Task.scala:137)\n", "\tat com.databricks.unity.EmptyHandle$.runWithAndClose(UCSHandle.scala:125)\n", "\tat org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:137)\n", "\tat com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)\n", "\tat org.apache.spark.scheduler.Task.run(Task.scala:96)\n", "\tat org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$13(Executor.scala:902)\n", "\tat org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1697)\n", "\tat org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:905)\n", "\tat scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)\n", "\tat com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)\n", "\tat org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:760)\n", "\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n", "\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n", "\tat java.lang.Thread.run(Thread.java:750)\n", "\n", "Driver stacktrace:\n", "\tat org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:3381)\n", "\tat org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:3313)\n", "\tat org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:3304)\n", "\tat scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)\n", "\tat scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)\n", "\tat scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)\n", "\tat 
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:3304)\n", "\tat org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1428)\n", "\tat org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1428)\n", "\tat scala.Option.foreach(Option.scala:407)\n", "\tat org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1428)\n", "\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3593)\n", "\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3531)\n", "\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3519)\n", "\tat org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:51)\n", "\tat org.apache.spark.scheduler.DAGScheduler.$anonfun$runJob$1(DAGScheduler.scala:1177)\n", "\tat scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)\n", "\tat com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:80)\n", "\tat org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1165)\n", "\tat org.apache.spark.SparkContext.runJobInternal(SparkContext.scala:2746)\n", "\tat org.apache.spark.sql.execution.collect.Collector.$anonfun$runSparkJobs$1(Collector.scala:312)\n", "\tat scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)\n", "\tat com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:80)\n", "\tat org.apache.spark.sql.execution.collect.Collector.runSparkJobs(Collector.scala:271)\n", "\tat org.apache.spark.sql.execution.collect.Collector.collect(Collector.scala:322)\n", "\tat org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:105)\n", "\tat org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:112)\n", "\tat org.apache.spark.sql.execution.qrc.InternalRowFormat$.collect(cachedSparkResults.scala:115)\n", "\tat 
org.apache.spark.sql.execution.qrc.InternalRowFormat$.collect(cachedSparkResults.scala:104)\n", "\tat org.apache.spark.sql.execution.qrc.InternalRowFormat$.collect(cachedSparkResults.scala:88)\n", "\tat org.apache.spark.sql.execution.qrc.ResultCacheManager.$anonfun$computeResult$1(ResultCacheManager.scala:527)\n", "\tat com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:80)\n", "\tat org.apache.spark.sql.execution.qrc.ResultCacheManager.collectResult$1(ResultCacheManager.scala:519)\n", "\tat org.apache.spark.sql.execution.qrc.ResultCacheManager.$anonfun$computeResult$2(ResultCacheManager.scala:537)\n", "\tat org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$withFinalPlanUpdateLegacy$1(AdaptiveSparkPlanExec.scala:771)\n", "\tat com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:80)\n", "\tat org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdateLegacy(AdaptiveSparkPlanExec.scala:769)\n", "\tat org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:764)\n", "\tat org.apache.spark.sql.execution.qrc.ResultCacheManager.computeResult(ResultCacheManager.scala:537)\n", "\tat org.apache.spark.sql.execution.qrc.ResultCacheManager.$anonfun$getOrComputeResultInternal$1(ResultCacheManager.scala:396)\n", "\tat scala.Option.getOrElse(Option.scala:189)\n", "\tat org.apache.spark.sql.execution.qrc.ResultCacheManager.getOrComputeResultInternal(ResultCacheManager.scala:390)\n", "\tat org.apache.spark.sql.execution.qrc.ResultCacheManager.getOrComputeResult(ResultCacheManager.scala:292)\n", "\tat org.apache.spark.sql.execution.SparkPlan.$anonfun$executeCollectResult$1(SparkPlan.scala:433)\n", "\tat com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:80)\n", "\tat org.apache.spark.sql.execution.SparkPlan.executeCollectResult(SparkPlan.scala:430)\n", "\tat org.apache.spark.sql.Dataset.collectResult(Dataset.scala:3431)\n", "\tat 
org.apache.spark.sql.Dataset.$anonfun$collectResult$1(Dataset.scala:3422)\n", "\tat org.apache.spark.sql.Dataset.$anonfun$withAction$3(Dataset.scala:4297)\n", "\tat org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:773)\n", "\tat org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4295)\n", "\tat org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$8(SQLExecution.scala:249)\n", "\tat org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:399)\n", "\tat org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:194)\n", "\tat org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:985)\n", "\tat org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:148)\n", "\tat org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:349)\n", "\tat org.apache.spark.sql.Dataset.withAction(Dataset.scala:4295)\n", "\tat org.apache.spark.sql.Dataset.collectResult(Dataset.scala:3421)\n", "\tat com.databricks.backend.daemon.driver.OutputAggregator$.withOutputAggregation0(OutputAggregator.scala:267)\n", "\tat com.databricks.backend.daemon.driver.OutputAggregator$.withOutputAggregation(OutputAggregator.scala:101)\n", "\tat com.databricks.backend.daemon.driver.PythonDriverLocalBase.generateTableResult(PythonDriverLocalBase.scala:723)\n", "\tat com.databricks.backend.daemon.driver.JupyterDriverLocal.computeListResultsItem(JupyterDriverLocal.scala:1424)\n", "\tat com.databricks.backend.daemon.driver.JupyterDriverLocal$JupyterEntryPoint.addCustomDisplayData(JupyterDriverLocal.scala:505)\n", "\tat sun.reflect.GeneratedMethodAccessor746.invoke(Unknown Source)\n", "\tat sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n", "\tat java.lang.reflect.Method.invoke(Method.java:498)\n", "\tat py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)\n", 
"\tat py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)\n", "\tat py4j.Gateway.invoke(Gateway.java:306)\n", "\tat py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)\n", "\tat py4j.commands.CallCommand.execute(CallCommand.java:79)\n", "\tat py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:195)\n", "\tat py4j.ClientServerConnection.run(ClientServerConnection.java:115)\n", "\tat java.lang.Thread.run(Thread.java:750)\n", "Caused by: org.apache.spark.api.python.PythonException: 'ValueError: Input contains NaN, infinity or a value too large for dtype('float32').'. Full traceback below:\n", "Traceback (most recent call last):\n", " File \"/databricks/python/lib/python3.9/site-packages/mlflow/pyfunc/__init__.py\", line 1025, in udf\n", " os.kill(scoring_server_proc.pid, signal.SIGTERM)\n", " File \"/databricks/python/lib/python3.9/site-packages/mlflow/pyfunc/__init__.py\", line 866, in _predict_row_batch\n", " result = predict_fn(pdf)\n", " File \"/databricks/python/lib/python3.9/site-packages/mlflow/pyfunc/__init__.py\", line 1006, in batch_predict_fn\n", " return loaded_model.predict(pdf)\n", " File \"/databricks/python/lib/python3.9/site-packages/mlflow/pyfunc/__init__.py\", line 373, in predict\n", " return self._predict_fn(data)\n", " File \"/databricks/python/lib/python3.9/site-packages/sklearn/ensemble/_forest.py\", line 784, in predict\n", " X = self._validate_X_predict(X)\n", " File \"/databricks/python/lib/python3.9/site-packages/sklearn/ensemble/_forest.py\", line 422, in _validate_X_predict\n", " return self.estimators_[0]._validate_X_predict(X, check_input=True)\n", " File \"/databricks/python/lib/python3.9/site-packages/sklearn/tree/_classes.py\", line 407, in _validate_X_predict\n", " X = self._validate_data(X, dtype=DTYPE, accept_sparse=\"csr\",\n", " File \"/databricks/python/lib/python3.9/site-packages/sklearn/base.py\", line 421, in _validate_data\n", " X = check_array(X, **check_params)\n", " 
File \"/databricks/python/lib/python3.9/site-packages/sklearn/utils/validation.py\", line 63, in inner_f\n", " return f(*args, **kwargs)\n", " File \"/databricks/python/lib/python3.9/site-packages/sklearn/utils/validation.py\", line 720, in check_array\n", " _assert_all_finite(array,\n", " File \"/databricks/python/lib/python3.9/site-packages/sklearn/utils/validation.py\", line 103, in _assert_all_finite\n", " raise ValueError(\n", "ValueError: Input contains NaN, infinity or a value too large for dtype('float32').\n", "\n", "\tat org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:694)\n", "\tat org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:110)\n", "\tat org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:647)\n", "\tat org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)\n", "\tat scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)\n", "\tat scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)\n", "\tat org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source)\n", "\tat org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)\n", "\tat org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:761)\n", "\tat org.apache.spark.sql.execution.collect.UnsafeRowBatchUtils$.encodeUnsafeRows(UnsafeRowBatchUtils.scala:80)\n", "\tat org.apache.spark.sql.execution.collect.Collector.$anonfun$processFunc$1(Collector.scala:186)\n", "\tat org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:75)\n", "\tat com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)\n", "\tat org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:75)\n", "\tat 
com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)\n", "\tat org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:55)\n", "\tat org.apache.spark.scheduler.Task.doRunTask(Task.scala:174)\n", "\tat org.apache.spark.scheduler.Task.$anonfun$run$4(Task.scala:137)\n", "\tat com.databricks.unity.EmptyHandle$.runWithAndClose(UCSHandle.scala:125)\n", "\tat org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:137)\n", "\tat com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)\n", "\tat org.apache.spark.scheduler.Task.run(Task.scala:96)\n", "\tat org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$13(Executor.scala:902)\n", "\tat org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1697)\n", "\tat org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:905)\n", "\tat scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)\n", "\tat com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)\n", "\tat org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:760)\n", "\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n", "\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n", "\t... 1 more" ] }, "metadata": { "application/vnd.databricks.v1+output": { "arguments": {}, "data": "org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 431.0 failed 4 times, most recent failure: Lost task 0.3 in stage 431.0 (TID 1339) (10.139.64.4 executor driver): org.apache.spark.api.python.PythonException: 'ValueError: Input contains NaN, infinity or a value too large for dtype('float32').'. 
Full traceback below:\nTraceback (most recent call last):\n File \"/databricks/python/lib/python3.9/site-packages/mlflow/pyfunc/__init__.py\", line 1025, in udf\n os.kill(scoring_server_proc.pid, signal.SIGTERM)\n File \"/databricks/python/lib/python3.9/site-packages/mlflow/pyfunc/__init__.py\", line 866, in _predict_row_batch\n result = predict_fn(pdf)\n File \"/databricks/python/lib/python3.9/site-packages/mlflow/pyfunc/__init__.py\", line 1006, in batch_predict_fn\n return loaded_model.predict(pdf)\n File \"/databricks/python/lib/python3.9/site-packages/mlflow/pyfunc/__init__.py\", line 373, in predict\n return self._predict_fn(data)\n File \"/databricks/python/lib/python3.9/site-packages/sklearn/ensemble/_forest.py\", line 784, in predict\n X = self._validate_X_predict(X)\n File \"/databricks/python/lib/python3.9/site-packages/sklearn/ensemble/_forest.py\", line 422, in _validate_X_predict\n return self.estimators_[0]._validate_X_predict(X, check_input=True)\n File \"/databricks/python/lib/python3.9/site-packages/sklearn/tree/_classes.py\", line 407, in _validate_X_predict\n X = self._validate_data(X, dtype=DTYPE, accept_sparse=\"csr\",\n File \"/databricks/python/lib/python3.9/site-packages/sklearn/base.py\", line 421, in _validate_data\n X = check_array(X, **check_params)\n File \"/databricks/python/lib/python3.9/site-packages/sklearn/utils/validation.py\", line 63, in inner_f\n return f(*args, **kwargs)\n File \"/databricks/python/lib/python3.9/site-packages/sklearn/utils/validation.py\", line 720, in check_array\n _assert_all_finite(array,\n File \"/databricks/python/lib/python3.9/site-packages/sklearn/utils/validation.py\", line 103, in _assert_all_finite\n raise ValueError(\nValueError: Input contains NaN, infinity or a value too large for dtype('float32').\n\n\tat org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:694)\n\tat 
org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:110)\n\tat org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:647)\n\tat org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)\n\tat scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)\n\tat scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)\n\tat org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source)\n\tat org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)\n\tat org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:761)\n\tat org.apache.spark.sql.execution.collect.UnsafeRowBatchUtils$.encodeUnsafeRows(UnsafeRowBatchUtils.scala:80)\n\tat org.apache.spark.sql.execution.collect.Collector.$anonfun$processFunc$1(Collector.scala:186)\n\tat org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:75)\n\tat com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)\n\tat org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:75)\n\tat com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)\n\tat org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:55)\n\tat org.apache.spark.scheduler.Task.doRunTask(Task.scala:174)\n\tat org.apache.spark.scheduler.Task.$anonfun$run$4(Task.scala:137)\n\tat com.databricks.unity.EmptyHandle$.runWithAndClose(UCSHandle.scala:125)\n\tat org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:137)\n\tat com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)\n\tat org.apache.spark.scheduler.Task.run(Task.scala:96)\n\tat org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$13(Executor.scala:902)\n\tat org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1697)\n\tat 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:905)\n\tat scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)\n\tat com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)\n\tat org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:760)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:750)\n\nDriver stacktrace:\n\tat org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:3381)\n\tat org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:3313)\n\tat org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:3304)\n\tat scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)\n\tat scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)\n\tat scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)\n\tat org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:3304)\n\tat org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1428)\n\tat org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1428)\n\tat scala.Option.foreach(Option.scala:407)\n\tat org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1428)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3593)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3531)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3519)\n\tat org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:51)\n\tat org.apache.spark.scheduler.DAGScheduler.$anonfun$runJob$1(DAGScheduler.scala:1177)\n\tat 
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)\n\tat com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:80)\n\tat org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1165)\n\tat org.apache.spark.SparkContext.runJobInternal(SparkContext.scala:2746)\n\tat org.apache.spark.sql.execution.collect.Collector.$anonfun$runSparkJobs$1(Collector.scala:312)\n\tat scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)\n\tat com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:80)\n\tat org.apache.spark.sql.execution.collect.Collector.runSparkJobs(Collector.scala:271)\n\tat org.apache.spark.sql.execution.collect.Collector.collect(Collector.scala:322)\n\tat org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:105)\n\tat org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:112)\n\tat org.apache.spark.sql.execution.qrc.InternalRowFormat$.collect(cachedSparkResults.scala:115)\n\tat org.apache.spark.sql.execution.qrc.InternalRowFormat$.collect(cachedSparkResults.scala:104)\n\tat org.apache.spark.sql.execution.qrc.InternalRowFormat$.collect(cachedSparkResults.scala:88)\n\tat org.apache.spark.sql.execution.qrc.ResultCacheManager.$anonfun$computeResult$1(ResultCacheManager.scala:527)\n\tat com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:80)\n\tat org.apache.spark.sql.execution.qrc.ResultCacheManager.collectResult$1(ResultCacheManager.scala:519)\n\tat org.apache.spark.sql.execution.qrc.ResultCacheManager.$anonfun$computeResult$2(ResultCacheManager.scala:537)\n\tat org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$withFinalPlanUpdateLegacy$1(AdaptiveSparkPlanExec.scala:771)\n\tat com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:80)\n\tat org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdateLegacy(AdaptiveSparkPlanExec.scala:769)\n\tat 
org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:764)\n\tat org.apache.spark.sql.execution.qrc.ResultCacheManager.computeResult(ResultCacheManager.scala:537)\n\tat org.apache.spark.sql.execution.qrc.ResultCacheManager.$anonfun$getOrComputeResultInternal$1(ResultCacheManager.scala:396)\n\tat scala.Option.getOrElse(Option.scala:189)\n\tat org.apache.spark.sql.execution.qrc.ResultCacheManager.getOrComputeResultInternal(ResultCacheManager.scala:390)\n\tat org.apache.spark.sql.execution.qrc.ResultCacheManager.getOrComputeResult(ResultCacheManager.scala:292)\n\tat org.apache.spark.sql.execution.SparkPlan.$anonfun$executeCollectResult$1(SparkPlan.scala:433)\n\tat com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:80)\n\tat org.apache.spark.sql.execution.SparkPlan.executeCollectResult(SparkPlan.scala:430)\n\tat org.apache.spark.sql.Dataset.collectResult(Dataset.scala:3431)\n\tat org.apache.spark.sql.Dataset.$anonfun$collectResult$1(Dataset.scala:3422)\n\tat org.apache.spark.sql.Dataset.$anonfun$withAction$3(Dataset.scala:4297)\n\tat org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:773)\n\tat org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4295)\n\tat org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$8(SQLExecution.scala:249)\n\tat org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:399)\n\tat org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:194)\n\tat org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:985)\n\tat org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:148)\n\tat org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:349)\n\tat org.apache.spark.sql.Dataset.withAction(Dataset.scala:4295)\n\tat 
org.apache.spark.sql.Dataset.collectResult(Dataset.scala:3421)\n\tat com.databricks.backend.daemon.driver.OutputAggregator$.withOutputAggregation0(OutputAggregator.scala:267)\n\tat com.databricks.backend.daemon.driver.OutputAggregator$.withOutputAggregation(OutputAggregator.scala:101)\n\tat com.databricks.backend.daemon.driver.PythonDriverLocalBase.generateTableResult(PythonDriverLocalBase.scala:723)\n\tat com.databricks.backend.daemon.driver.JupyterDriverLocal.computeListResultsItem(JupyterDriverLocal.scala:1424)\n\tat com.databricks.backend.daemon.driver.JupyterDriverLocal$JupyterEntryPoint.addCustomDisplayData(JupyterDriverLocal.scala:505)\n\tat sun.reflect.GeneratedMethodAccessor746.invoke(Unknown Source)\n\tat sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n\tat java.lang.reflect.Method.invoke(Method.java:498)\n\tat py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)\n\tat py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)\n\tat py4j.Gateway.invoke(Gateway.java:306)\n\tat py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)\n\tat py4j.commands.CallCommand.execute(CallCommand.java:79)\n\tat py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:195)\n\tat py4j.ClientServerConnection.run(ClientServerConnection.java:115)\n\tat java.lang.Thread.run(Thread.java:750)\nCaused by: org.apache.spark.api.python.PythonException: 'ValueError: Input contains NaN, infinity or a value too large for dtype('float32').'. 
Full traceback below:\nTraceback (most recent call last):\n File \"/databricks/python/lib/python3.9/site-packages/mlflow/pyfunc/__init__.py\", line 1025, in udf\n os.kill(scoring_server_proc.pid, signal.SIGTERM)\n File \"/databricks/python/lib/python3.9/site-packages/mlflow/pyfunc/__init__.py\", line 866, in _predict_row_batch\n result = predict_fn(pdf)\n File \"/databricks/python/lib/python3.9/site-packages/mlflow/pyfunc/__init__.py\", line 1006, in batch_predict_fn\n return loaded_model.predict(pdf)\n File \"/databricks/python/lib/python3.9/site-packages/mlflow/pyfunc/__init__.py\", line 373, in predict\n return self._predict_fn(data)\n File \"/databricks/python/lib/python3.9/site-packages/sklearn/ensemble/_forest.py\", line 784, in predict\n X = self._validate_X_predict(X)\n File \"/databricks/python/lib/python3.9/site-packages/sklearn/ensemble/_forest.py\", line 422, in _validate_X_predict\n return self.estimators_[0]._validate_X_predict(X, check_input=True)\n File \"/databricks/python/lib/python3.9/site-packages/sklearn/tree/_classes.py\", line 407, in _validate_X_predict\n X = self._validate_data(X, dtype=DTYPE, accept_sparse=\"csr\",\n File \"/databricks/python/lib/python3.9/site-packages/sklearn/base.py\", line 421, in _validate_data\n X = check_array(X, **check_params)\n File \"/databricks/python/lib/python3.9/site-packages/sklearn/utils/validation.py\", line 63, in inner_f\n return f(*args, **kwargs)\n File \"/databricks/python/lib/python3.9/site-packages/sklearn/utils/validation.py\", line 720, in check_array\n _assert_all_finite(array,\n File \"/databricks/python/lib/python3.9/site-packages/sklearn/utils/validation.py\", line 103, in _assert_all_finite\n raise ValueError(\nValueError: Input contains NaN, infinity or a value too large for dtype('float32').\n\n\tat org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:694)\n\tat 
org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:110)\n\tat org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:647)\n\tat org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)\n\tat scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)\n\tat scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)\n\tat org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source)\n\tat org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)\n\tat org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:761)\n\tat org.apache.spark.sql.execution.collect.UnsafeRowBatchUtils$.encodeUnsafeRows(UnsafeRowBatchUtils.scala:80)\n\tat org.apache.spark.sql.execution.collect.Collector.$anonfun$processFunc$1(Collector.scala:186)\n\tat org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:75)\n\tat com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)\n\tat org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:75)\n\tat com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)\n\tat org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:55)\n\tat org.apache.spark.scheduler.Task.doRunTask(Task.scala:174)\n\tat org.apache.spark.scheduler.Task.$anonfun$run$4(Task.scala:137)\n\tat com.databricks.unity.EmptyHandle$.runWithAndClose(UCSHandle.scala:125)\n\tat org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:137)\n\tat com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)\n\tat org.apache.spark.scheduler.Task.run(Task.scala:96)\n\tat org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$13(Executor.scala:902)\n\tat org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1697)\n\tat 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:905)\n\tat scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)\n\tat com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)\n\tat org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:760)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\t... 1 more\n", "errorSummary": "PythonException: 'ValueError: Input contains NaN, infinity or a value too large for dtype('float32').'. Full traceback below:", "errorTraceType": "html", "metadata": {}, "type": "ipynbError" } }, "output_type": "display_data" } ], "source": [ "# Predict for the unknown diamond (this call fails with the ValueError shown in the cell output: the model input contains NaN or non-finite values)\n", "predictions_unknown_diamond = fs.score_batch(uri_all_features, diamond_unknown)\n", "display(predictions_unknown_diamond)" ] } ], "metadata": { "application/vnd.databricks.v1+notebook": { "dashboards": [], "language": "python", "notebookMetadata": { "mostRecentlyExecutedCommandWithImplicitDF": { "commandId": 1612971859527038, "dataframes": [ "_sqldf" ] }, "pythonIndentUnit": 2 }, "notebookName": "Databricks-ML-professional-S03a-Batch", "widgets": {} }, "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.8.10" } }, "nbformat": 4, "nbformat_minor": 4 }