{ "cells": [ { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "58fab4bb-231e-48cf-8ed4-fc15a1b22845", "showTitle": false, "title": "" } }, "source": [ "
This Notebook adds information related to the following requirements:
Download this notebook in ipynb format here.
\n", "Refers to the process of making predictions in a batch deployment scenario and storing the results for future reference or utilization. Let's break down the key components:
\n", "In practical terms, the process might involve running a batch job that takes a set of input data, applies a trained model to make predictions, and then stores these predictions in a database, file system, or another storage solution. This approach is common in scenarios where real-time processing is not critical, and predictions can be made on a periodic basis.
\n", "For example, in a recommendation system, batch deployment might involve processing user interactions over a day and generating personalized recommendations overnight. The computed recommendations would then be saved for use the next day when users interact with the system.
" ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "1dcba302-be4c-4076-a92a-4bf0b7b4d2c4", "showTitle": false, "title": "" } }, "source": [ "\n", "In the context of batch deployment for machine learning models, where predictions are generated in bulk, it's common to save these predictions for later use.
For live serving, high-performance databases are often preferred for quick retrieval. However, in scenarios where rapid access is not crucial, such as populating emails, less performant storage options like a blob store can be perfectly suitable.
These storage solutions may not offer the highest performance, but they are chosen deliberately based on the specific needs of the use case, balancing performance and efficiency.
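For illustration, here is a minimal sketch of what such a batch job could look like (the input table name, registered model URI and blob-store path are hypothetical):

```python
import mlflow.pyfunc

# Hypothetical input table holding the features expected by the model
batch_df = spark.table("diamonds_new_batch")

# Hypothetical registered model wrapped as a Spark UDF for batch scoring
predict_udf = mlflow.pyfunc.spark_udf(spark, "models:/diamonds_regressor/Production")

# Score the whole batch at once
predictions_df = batch_df.withColumn("prediction", predict_udf(*batch_df.columns))

# Store for fast downstream retrieval: a Delta table
predictions_df.write.format("delta").mode("overwrite").saveAsTable("diamonds_predictions")

# Store for slower use cases (e.g. email population): plain files on a blob-store mount (hypothetical path)
predictions_df.write.mode("overwrite").parquet("dbfs:/mnt/blob-storage/diamonds_predictions")
```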
" ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": {}, "inputWidgets": {}, "nuid": "1fb137c6-a812-4fe9-b723-aa07ef9aa2f0", "showTitle": false, "title": "" } }, "source": [ "\n", "Let's see two examples to illustrate this requirement:
\n", "Then both models will be loaded using mlflow.pyfunc.load_model
function and used the same way for prediction on test set.
Load data into a pandas dataframe (for the sake of simplicity, let's only keep numerical columns):
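A minimal sketch of how this dataframe could be loaded (the notebook uses the diamonds dataset from the Seaborn library; select_dtypes keeps only the numerical columns):

```python
import seaborn as sns

# Load the diamonds dataset and keep only the numerical columns
diamonds_df = sns.load_dataset("diamonds").select_dtypes(include="number")
diamonds_df.sample(3)
```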
" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "90352c13-b715-457a-8308-09e6c6844b1a", "showTitle": false, "title": "" } }, "outputs": [ { "data": { "text/html": [ "\n", " | carat | \n", "depth | \n", "table | \n", "price | \n", "x | \n", "y | \n", "z | \n", "
---|---|---|---|---|---|---|---|
26985 | \n", "2.01 | \n", "60.1 | \n", "61.0 | \n", "17068 | \n", "8.14 | \n", "8.06 | \n", "4.87 | \n", "
29197 | \n", "0.33 | \n", "59.0 | \n", "61.0 | \n", "694 | \n", "4.49 | \n", "4.56 | \n", "2.67 | \n", "
32340 | \n", "0.30 | \n", "62.1 | \n", "56.0 | \n", "789 | \n", "4.29 | \n", "4.31 | \n", "2.67 | \n", "
Let's drop duplicates and separate into train set (67%) and test set (33%):
" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "54c32173-999f-4935-95cc-9d68b05bcf84", "showTitle": false, "title": "" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Number of rows test set: 17731\n", "Number of rows train set: 36001\n", "Sum of count rows of train and test set: 53732\n", "Total number of rows of initial dataframe: 53732\n" ] } ], "source": [ "diamonds_sdf = spark.createDataFrame(diamonds_df).dropDuplicates()\n", "#\n", "# Spark Dataframes\n", "test_sdf = diamonds_sdf.orderBy(rand()).limit(int(33*diamonds_sdf.count()/100))\n", "train_sdf = diamonds_sdf.subtract(test_sdf)\n", "#\n", "# Pandas Dataframes\n", "test_df = test_sdf.toPandas()\n", "train_df = train_sdf.toPandas()\n", "#\n", "print(f\"Number of rows test set: {test_sdf.count()}\")\n", "print(f\"Number of rows train set: {train_sdf.count()}\")\n", "print(f\"Sum of count rows of train and test set: {train_sdf.count() + test_sdf.count()}\")\n", "print(f\"Total number of rows of initial dataframe: {diamonds_sdf.count()}\")" ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": {}, "inputWidgets": {}, "nuid": "86c30e4b-65a0-4e75-a7e7-155a5be096a3", "showTitle": false, "title": "" } }, "source": [ "Scikit-learn library:
\n", "mlflow.pyfunc.load_model
\n", " | predictions | \n", "
---|---|
0 | \n", "6771.932454 | \n", "
1 | \n", "10611.573560 | \n", "
2 | \n", "741.122107 | \n", "
3 | \n", "1805.671346 | \n", "
4 | \n", "1587.993408 | \n", "
MLlib library: there is an additional step, which is to convert the input columns into a vector using VectorAssembler. Thus, we need a pipeline, and we will log the fitted pipeline to MLflow.
mlflow.pyfunc.load_model
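A minimal sketch of the MLlib variant (the regressor choice is an assumption; the fitted pipeline, VectorAssembler included, is what gets logged and reloaded):

```python
import mlflow
import mlflow.spark
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import RandomForestRegressor

feature_cols = [c for c in train_sdf.columns if c != "price"]
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
regressor = RandomForestRegressor(featuresCol="features", labelCol="price")

with mlflow.start_run() as run:
    pipeline_model = Pipeline(stages=[assembler, regressor]).fit(train_sdf)
    mlflow.spark.log_model(pipeline_model, artifact_path="spark_model")

# The pyfunc flavor hides the difference: prediction works on a pandas dataframe as before
loaded_spark_model = mlflow.pyfunc.load_model(f"runs:/{run.info.run_id}/spark_model")
predictions = loaded_spark_model.predict(test_df.drop(columns=["price"]))
```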
\n", " | predictions | \n", "
---|---|
0 | \n", "6711.408932 | \n", "
1 | \n", "9540.179628 | \n", "
2 | \n", "647.270082 | \n", "
3 | \n", "2388.329445 | \n", "
4 | \n", "1695.716464 | \n", "
With the model trained using the scikit-learn library, it is possible to load it and wrap it in a Spark UDF to make predictions:
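A minimal sketch, assuming the scikit-learn model logged in the sketch above (the run id and artifact path come from that logging step):

```python
import mlflow.pyfunc
from pyspark.sql.functions import struct

# Wrap the logged scikit-learn model in a Spark UDF for distributed batch scoring
predict_udf = mlflow.pyfunc.spark_udf(spark, f"runs:/{run.info.run_id}/sklearn_model")

feature_cols = [c for c in test_sdf.columns if c != "price"]
scored_sdf = test_sdf.withColumn("prediction", predict_udf(struct(*feature_cols)))
display(scored_sdf.select("price", "prediction"))
```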
" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "e309f1ac-9d80-4c0c-85ac-7df9f314710c", "showTitle": false, "title": "" } }, "outputs": [ { "data": { "text/html": [ "price | prediction |
---|---|
4580 | 6771.9324540939115 |
8408 | 10611.573559702078 |
1103 | 741.122106786866 |
1332 | 1805.6713457303665 |
1293 | 1587.9934075468402 |
Z-Ordering: colocates related information in the same set of files
\n", "Z-Ordering is a form of multi-dimensional clustering that colocates related information in the same set of files. It reduces the amount of data that needs to be read. See more here.
\n", "Here after is an example of use of Z-ordering.
\n", "Let's first write a dataframe as a Delta table:
" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "43526c33-a6e1-43d7-9520-41f7ab1f7558", "showTitle": false, "title": "" } }, "outputs": [], "source": [ "(train_sdf.write\n", " .format(\"delta\")\n", " .mode(\"overwrite\")\n", " .option(\"overwriteSchema\", \"true\")\n", " .saveAsTable(\"train_set_diamonds\"))" ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": {}, "inputWidgets": {}, "nuid": "4a61070c-4099-4712-9c2a-8540c5afa9d9", "showTitle": false, "title": "" } }, "source": [ "Let's get table location:
" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "d8164ea2-5abe-45de-9625-66297e69871f", "showTitle": false, "title": "" } }, "outputs": [ { "data": { "text/html": [ "col_name | data_type | comment |
---|---|---|
Location | dbfs:/user/hive/warehouse/train_set_diamonds |
Let's Z-order the table by the feature carat:
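A minimal sketch of the corresponding command (Delta's OPTIMIZE ... ZORDER BY, run here through spark.sql):

```python
# Compact the Delta table and colocate rows with close `carat` values in the same files
spark.sql("OPTIMIZE train_set_diamonds ZORDER BY (carat)")
```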
Partitioning: stores data associated with different categorical values in different directories
\n", "Partition will create as many folders as there are distinct values in the specified column for partitioning. Thus, column with high cardinality are not recommanded as partition key.
Here after is an example of use of Partitioning.
\n", "Let's first reload the original dataframe and save it as a managed Delta table:
" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "0ba3aed9-a669-4f36-bb12-818f98871a6a", "showTitle": false, "title": "" } }, "outputs": [], "source": [ "(spark.createDataFrame(sns.load_dataset('diamonds')).write\n", " .format(\"delta\")\n", " .mode(\"overwrite\")\n", " .option(\"overwriteSchema\", \"true\")\n", " .saveAsTable(\"diamonds_df_not_partitioned\"))" ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": {}, "inputWidgets": {}, "nuid": "860d0033-92f9-44d1-87f6-c37eccf3b835", "showTitle": false, "title": "" } }, "source": [ "Let's have a look at the content of the delta table folder. We see that there are four parquet files an a folder _delta_log
:
col_name | data_type | comment |
---|---|---|
Location | dbfs:/user/hive/warehouse/diamonds_df_not_partitioned |
We can identify the feature cut
as a good candidate to be the partition key:
cut | count |
---|---|
Ideal | 21551 |
Premium | 13791 |
Very Good | 12082 |
Good | 4906 |
Fair | 1610 |
Let's partition using cut as the partition key:
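A minimal sketch of the partitioned write (the target table name is an assumption):

```python
import seaborn as sns

# Same managed Delta table as before, but partitioned by the low-cardinality column `cut`
(spark.createDataFrame(sns.load_dataset("diamonds")).write
   .format("delta")
   .mode("overwrite")
   .option("overwriteSchema", "true")
   .partitionBy("cut")                       # one folder per distinct value of `cut`
   .saveAsTable("diamonds_df_partitioned"))  # assumed table name
```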
Now let's have a look at the content of the partitioned table. We see that there are as many folders as there are distinct values in the column cut. This will speed up queries that filter on this column.
score_batch lets us easily make predictions on a large amount of data at a time, using features coming from the Feature Store.
Let's have a look at an example to illustrate this requirement:
\n", "diamonds
dataset from Seaborn libraryscore_batch
1. Load dataset
\n", "diamonds
dataset from Seaborn libraryindex
column made of unique values. It will be used as primary key for the Feature Store.x
or X
... let's rename."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"application/vnd.databricks.v1+cell": {
"cellMetadata": {
"byteLimit": 2048000,
"rowLimit": 10000
},
"inputWidgets": {},
"nuid": "e3116320-dbb9-4aea-b8d4-b826acdfc39b",
"showTitle": false,
"title": ""
}
},
"outputs": [
{
"data": {
"text/html": [
"index | carat | cut | color | clarity | depth | table | price | x_r | y | z |
---|---|---|---|---|---|---|---|---|---|---|
0 | 0.23 | Ideal | E | SI2 | 61.5 | 55.0 | 326 | 3.95 | 3.98 | 2.43 |
1 | 0.21 | Premium | E | SI1 | 59.8 | 61.0 | 326 | 3.89 | 3.84 | 2.31 |
2 | 0.23 | Good | E | VS1 | 56.9 | 65.0 | 327 | 4.05 | 4.07 | 2.31 |
3 | 0.29 | Premium | I | VS2 | 62.4 | 58.0 | 334 | 4.2 | 4.23 | 2.63 |
4 | 0.31 | Good | J | SI2 | 63.3 | 58.0 | 335 | 4.34 | 4.35 | 2.75 |
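A minimal sketch of how this dataframe might be prepared (the exact code is an assumption; it adds a unique index column and renames x to x_r):

```python
import seaborn as sns
from pyspark.sql.functions import monotonically_increasing_id

# Spark dataframe with a unique `index` primary-key column and `x` renamed to `x_r`
diamonds_full = (spark.createDataFrame(sns.load_dataset("diamonds"))
                 .withColumnRenamed("x", "x_r")
                 .withColumn("index", monotonically_increasing_id()))
display(diamonds_full)
```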
2. Create Feature table in Feature Store
" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "92306fc4-ca06-46bb-9aa2-f49eb4c3e6d2", "showTitle": false, "title": "" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "2023/11/23 17:32:06 WARNING databricks.feature_store._compute_client._compute_client: Deleting a feature table can lead to unexpected failures in upstream producers and downstream consumers (models, endpoints, and scheduled jobs).\n", "2023/11/23 17:32:09 INFO databricks.feature_store._compute_client._compute_client: Created feature table 'hive_metastore.default.diamonds_fs'.\n" ] } ], "source": [ "# create a feature store client\n", "fs = feature_store.FeatureStoreClient()\n", "#\n", "# fs.drop_table(\"default.diamonds_fs\")\n", "#\n", "# create feature table - as only the scema is provided in the command below, it will only create the table structure without populating it with data\n", "result = fs.create_table(name=\"diamonds_fs\", # required\n", " primary_keys=[\"index\"], # required\n", " schema=diamonds_full.drop(\"price\").schema, # need either dataframe schema\n", " #df=diamonds_full, # or dataframe itself\n", " description=\"seaborn diamonds dataset\");" ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "4e64855c-5e9e-4ebc-bd3a-168ec46d9758", "showTitle": false, "title": "" } }, "source": [ "3. Push preprocessed features to Feature table
\n", "(There's no preprocessing done there for the sake of simplicity. Ideally, features pushed to Feature Store should be processed and ready to be used for model training)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "dd3f95e8-ebae-474a-8372-dd3043a63f28", "showTitle": false, "title": "" } }, "outputs": [], "source": [ "fs.write_table(name=\"diamonds_fs\",\n", " df=diamonds_full.drop('price'),\n", " mode='merge')" ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": {}, "inputWidgets": {}, "nuid": "9215ba4b-6e84-451a-ba95-d3e012c12ba5", "showTitle": false, "title": "" } }, "source": [ "After that, Feature table is available in Features menu as well the associated Delta table in Catalog menu.
\n", "4. Create train and test sets
\n", "Here, features are now available in Features Store. It is possible to load them from there to train a model. For this example, we will:
\n", "x_r
, y
, z
, carat
What will help to make the difference between the train set and test set is the Primary key column: index
.
Moreover, later for scoring by using the test set, we will need the initial target values from initial price
column. Thus, the columns needed for the train and test sets are: index
and price
.
index
we retrieve from the Feature Store the rows needed to train/test the models, and with price
we have the target used to train/evaluate the modelsprice | index |
---|---|
337 | 8 |
336 | 5 |
326 | 0 |
334 | 3 |
336 | 6 |
Let's create the Feature Store training sets.
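A minimal sketch of how these training sets might be built with FeatureLookup and create_training_set (train_lookup_df, the dataframe holding the index and price columns shown above, is an assumed name):

```python
from databricks.feature_store import FeatureLookup

# Look up either the four selected features or all numerical features, keyed on `index`
lookups_4 = [FeatureLookup(table_name="diamonds_fs",
                           feature_names=["x_r", "y", "z", "carat"],
                           lookup_key="index")]
lookups_all = [FeatureLookup(table_name="diamonds_fs",
                             feature_names=["carat", "depth", "table", "x_r", "y", "z"],
                             lookup_key="index")]

train_set_4_features = fs.create_training_set(train_lookup_df, lookups_4,
                                              label="price", exclude_columns=["index"])
train_set_all_features = fs.create_training_set(train_lookup_df, lookups_all,
                                                label="price", exclude_columns=["index"])

# Materialize them as Spark dataframes (displayed below)
train_set_4 = train_set_4_features.load_df()
train_set_all = train_set_all_features.load_df()
```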
\n", "x_r | y | z | carat | price |
---|---|---|---|---|
3.87 | 3.78 | 2.49 | 0.22 | 337 |
3.94 | 3.96 | 2.48 | 0.24 | 336 |
3.95 | 3.98 | 2.43 | 0.23 | 326 |
4.2 | 4.23 | 2.63 | 0.29 | 334 |
3.95 | 3.98 | 2.47 | 0.24 | 336 |
carat | depth | table | x_r | y | z | price |
---|---|---|---|---|---|---|
0.22 | 65.1 | 61.0 | 3.87 | 3.78 | 2.49 | 337 |
0.24 | 62.8 | 57.0 | 3.94 | 3.96 | 2.48 | 336 |
0.23 | 61.5 | 55.0 | 3.95 | 3.98 | 2.43 | 326 |
0.29 | 62.4 | 58.0 | 4.2 | 4.23 | 2.63 | 334 |
0.24 | 62.3 | 57.0 | 3.95 | 3.98 | 2.47 | 336 |
5. Prepare and train models
\n", "Training of two scikit-learn models based on different features sets. Input dataframes need to be pandas dataframes/series.
" ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": {}, "inputWidgets": {}, "nuid": "1ddce2d7-694d-42c1-ae9a-478d026a351f", "showTitle": false, "title": "" } }, "source": [ "Model trained using the four features dataset:" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "703f0b5c-bfe3-4c2e-868d-272162750573", "showTitle": false, "title": "" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Out[36]: RandomForestRegressor()" ] } ], "source": [ "X_train_4 = train_set_4.drop(\"price\").toPandas()\n", "y_train_4 = train_set_4.toPandas()[\"price\"]\n", "#\n", "rf_4_model = RandomForestRegressor()\n", "#\n", "rf_4_model.fit(X_train_4, y_train_4)" ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": {}, "inputWidgets": {}, "nuid": "0814a65e-5ff5-4c18-b53a-1fadff6ae268", "showTitle": false, "title": "" } }, "source": [ "Model trained using all numerical features dataset:" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "82e4e873-31bd-4c18-874a-f88ce8c54c82", "showTitle": false, "title": "" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Out[37]: RandomForestRegressor()" ] } ], "source": [ "X_train_all = train_set_all.drop(\"price\").toPandas()\n", "y_train_all = train_set_all.toPandas()[\"price\"]\n", "#\n", "rf_all_model = RandomForestRegressor()\n", "#\n", "rf_all_model.fit(X_train_all, y_train_all)" ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": {}, "inputWidgets": {}, "nuid": "838f0a82-3f68-4c36-b235-3ce2f873410a", "showTitle": false, "title": "" } }, "source": [ "6. Log models to associate them to features in Feature Store
" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "cdd03ad9-eab5-420a-a198-dd2c90f71ef3", "showTitle": false, "title": "" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Registered model 'trained_with_4_features' already exists. Creating a new version of this model...\n", "Created version '7' of model 'trained_with_4_features'.\n" ] } ], "source": [ "model_name_4_features = \"trained_with_4_features\"\n", "#\n", "fs.log_model(rf_4_model,\n", " artifact_path=model_name_4_features, # parameter required\n", " flavor=mlflow.sklearn, # parameter required\n", " training_set=train_set_4_features, # either training_set or feature_spec_path parameters required\n", " registered_model_name=model_name_4_features); # not required. However model will not be linked to features in features store until model is registered" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "e44704fb-e0dd-451b-803f-70f7ba6a06c0", "showTitle": false, "title": "" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Registered model 'trained_with_all_features' already exists. Creating a new version of this model...\n", "Created version '6' of model 'trained_with_all_features'.\n" ] } ], "source": [ "model_name_all_features = \"trained_with_all_features\"\n", "#\n", "fs.log_model(rf_all_model,\n", " artifact_path=model_name_all_features, # parameter required\n", " flavor=mlflow.sklearn, # parameter required\n", " training_set=train_set_all_features, # either training_set or feature_spec_path parameters required\n", " registered_model_name=model_name_all_features); # not required. However model will not be linked to features in features store until model is registered" ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": {}, "inputWidgets": {}, "nuid": "61497a33-a9a4-4bf9-88b2-02e0f63f8bc3", "showTitle": false, "title": "" } }, "source": [ "At this point we can see that models are associated with the features they were trained on in Features Store:
\n", "7. Score models on test set using score_batch
Model trained on four features:
" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "a3dac6a5-0caa-4252-8295-40383c5592aa", "showTitle": false, "title": "" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "runs:/45a6dbcde3a1464a84a61353caf60365/trained_with_4_features\n" ] }, { "data": { "text/html": [ "price | prediction |
---|---|
5146 | 6148.454523809524 |
998 | 1054.7966666666666 |
722 | 920.675 |
756 | 911.7239999999999 |
4312 | 4765.28 |
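The predictions above come from score_batch; a minimal sketch, where uri_4_features is the runs:/ URI printed in the cell output above and test_lookup_df (assumed name) holds the index and price columns of the test rows:

```python
# score_batch joins the required features from the Feature Store on the primary key
# `index`, applies the model, and appends a `prediction` column
predictions_df_4_features = fs.score_batch(uri_4_features, test_lookup_df)
display(predictions_df_4_features.select("price", "prediction"))
```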
Score RMSE on test set for model trained with 4 features:
" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "8df34d1d-3064-4c18-b8b7-a4ff9fb793a0", "showTitle": false, "title": "" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "RMSE for model trained on 4 features: 1448.8825377496491\n" ] } ], "source": [ "print(\"RMSE for model trained on 4 features:\",\n", " mean_squared_error(predictions_df_4_features.toPandas()['price'], predictions_df_4_features.toPandas()['prediction'], squared=False))" ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": {}, "inputWidgets": {}, "nuid": "10058824-419f-4ab9-b8d4-049a99b6db86", "showTitle": false, "title": "" } }, "source": [ "Model trained on all numerical features:
" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "26c4c37f-55a2-463c-9eaa-a737070e550a", "showTitle": false, "title": "" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "runs:/45a6dbcde3a1464a84a61353caf60365/trained_with_all_features\n" ] }, { "data": { "text/html": [ "price | prediction |
---|---|
3352 | 3267.8175 |
2281 | 1654.9560000000001 |
3601 | 5770.05 |
9032 | 8038.75 |
6126 | 6958.44 |
Score RMSE on test set for model trained with all numerical features:
" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "97dab406-3bc9-4a07-9f51-02471704d3eb", "showTitle": false, "title": "" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "1401.7197923544627\n" ] } ], "source": [ "print(mean_squared_error(predictions_df_all_features.toPandas()['price'], predictions_df_all_features.toPandas()['prediction'], squared=False))" ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": {}, "inputWidgets": {}, "nuid": "48be5619-0c6d-4915-b1b5-590396271f26", "showTitle": false, "title": "" } }, "source": [ "Comparison of actual price and predicted price according to carat
for model trained using all numerical features, on a sample of 1000 random entries:
8. Case of new data
\n", "How to predict newly arriving data if there is no information on it in Feature Store? Need to update first the Feature Store with new data:
" ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": {}, "inputWidgets": {}, "nuid": "2b86ebd5-e014-43e9-882b-3e8dde08b4e0", "showTitle": false, "title": "" } }, "source": [ "Let's create a new diamond data. Schema should match data in Feature Store:
" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "49b297af-6a07-448a-9342-e09d5847cc28", "showTitle": false, "title": "" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "root\n", " |-- index: long (nullable = true)\n", " |-- carat: double (nullable = true)\n", " |-- cut: string (nullable = true)\n", " |-- color: string (nullable = true)\n", " |-- clarity: string (nullable = true)\n", " |-- depth: double (nullable = true)\n", " |-- table: double (nullable = true)\n", " |-- price: long (nullable = true)\n", " |-- x_r: double (nullable = true)\n", " |-- y: double (nullable = true)\n", " |-- z: double (nullable = true)\n", "\n" ] } ], "source": [ "diamonds_full.printSchema()" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "f332ab5d-e77c-4880-bf10-1c0863f228a0", "showTitle": false, "title": "" } }, "outputs": [ { "data": { "text/html": [ "index | carat | cut | color | clarity | depth | table | price | x_r | y | z |
---|---|---|---|---|---|---|---|---|---|---|
88887777 | 2.0 | Good | E | VS1 | 40.0 | 64.0 | 326 | 4.14 | 3.5 | 2.1 |
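A minimal sketch of how such a row might be created (values taken from the table above; the schema reuses diamonds_full so that it matches the feature table):

```python
# One new diamond with a fresh primary key value
new_diamond = spark.createDataFrame(
    [(88887777, 2.0, "Good", "E", "VS1", 40.0, 64.0, 326, 4.14, 3.5, 2.1)],
    schema=diamonds_full.schema)
display(new_diamond)
```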
index | price |
---|---|
88887777 | 4500 |
index |
---|
88887777 |
Now, update the Feature Store with the new diamond data:
" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "ad9f4763-9446-41a4-b963-e51c0d6fcb08", "showTitle": false, "title": "" } }, "outputs": [], "source": [ "fs.write_table(name=\"diamonds_fs\",\n", " df=new_diamond,\n", " mode='merge')" ] }, { "cell_type": "markdown", "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": {}, "inputWidgets": {}, "nuid": "cdaea63c-b435-4f75-8cf0-aec1b2991b1b", "showTitle": false, "title": "" } }, "source": [ "We verify that score_batch
predicts the same result whether or not the price of the new data is provided; the only requirement is that the primary key - in this particular case, the column index - of the new diamond exists in the Feature Store:
index | price | carat | depth | table | x_r | y | z | prediction |
---|---|---|---|---|---|---|---|---|
88887777 | 4500 | 2.0 | 40.0 | 64.0 | 4.14 | 3.5 | 2.1 | 9639.403333333334 |
index | carat | depth | table | x_r | y | z | prediction |
---|---|---|---|---|---|---|---|
88887777 | 2.0 | 40.0 | 64.0 | 4.14 | 3.5 | 2.1 | 9639.403333333334 |
And let's verify that if a primary key is not found in the Feature Store, it results in an error:
" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "application/vnd.databricks.v1+cell": { "cellMetadata": { "byteLimit": 2048000, "rowLimit": 10000 }, "inputWidgets": {}, "nuid": "fac28dd7-81a6-4ea8-bec1-b779aaa14d15", "showTitle": false, "title": "" } }, "outputs": [ { "data": { "text/html": [ "\n", "org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 431.0 failed 4 times, most recent failure: Lost task 0.3 in stage 431.0 (TID 1339) (10.139.64.4 executor driver): org.apache.spark.api.python.PythonException: 'ValueError: Input contains NaN, infinity or a value too large for dtype('float32').'. Full traceback below:\n", "Traceback (most recent call last):\n", " File \"/databricks/python/lib/python3.9/site-packages/mlflow/pyfunc/__init__.py\", line 1025, in udf\n", " os.kill(scoring_server_proc.pid, signal.SIGTERM)\n", " File \"/databricks/python/lib/python3.9/site-packages/mlflow/pyfunc/__init__.py\", line 866, in _predict_row_batch\n", " result = predict_fn(pdf)\n", " File \"/databricks/python/lib/python3.9/site-packages/mlflow/pyfunc/__init__.py\", line 1006, in batch_predict_fn\n", " return loaded_model.predict(pdf)\n", " File \"/databricks/python/lib/python3.9/site-packages/mlflow/pyfunc/__init__.py\", line 373, in predict\n", " return self._predict_fn(data)\n", " File \"/databricks/python/lib/python3.9/site-packages/sklearn/ensemble/_forest.py\", line 784, in predict\n", " X = self._validate_X_predict(X)\n", " File \"/databricks/python/lib/python3.9/site-packages/sklearn/ensemble/_forest.py\", line 422, in _validate_X_predict\n", " return self.estimators_[0]._validate_X_predict(X, check_input=True)\n", " File \"/databricks/python/lib/python3.9/site-packages/sklearn/tree/_classes.py\", line 407, in _validate_X_predict\n", " X = self._validate_data(X, dtype=DTYPE, accept_sparse=\"csr\",\n", " File \"/databricks/python/lib/python3.9/site-packages/sklearn/base.py\", line 421, in _validate_data\n", " X = check_array(X, **check_params)\n", " File \"/databricks/python/lib/python3.9/site-packages/sklearn/utils/validation.py\", line 63, in inner_f\n", " return f(*args, **kwargs)\n", " File \"/databricks/python/lib/python3.9/site-packages/sklearn/utils/validation.py\", line 720, in check_array\n", " _assert_all_finite(array,\n", " File \"/databricks/python/lib/python3.9/site-packages/sklearn/utils/validation.py\", line 103, in _assert_all_finite\n", " raise ValueError(\n", "ValueError: Input contains NaN, infinity or a value too large for dtype('float32').\n", "\n", "\tat org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:694)\n", "\tat org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:110)\n", "\tat org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:647)\n", "\tat org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)\n", "\tat scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)\n", "\tat scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)\n", "\tat org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source)\n", "\tat org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)\n", "\tat org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:761)\n", "\tat 
org.apache.spark.sql.execution.collect.UnsafeRowBatchUtils$.encodeUnsafeRows(UnsafeRowBatchUtils.scala:80)\n", "\tat org.apache.spark.sql.execution.collect.Collector.$anonfun$processFunc$1(Collector.scala:186)\n", "\tat org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:75)\n", "\tat com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)\n", "\tat org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:75)\n", "\tat com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)\n", "\tat org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:55)\n", "\tat org.apache.spark.scheduler.Task.doRunTask(Task.scala:174)\n", "\tat org.apache.spark.scheduler.Task.$anonfun$run$4(Task.scala:137)\n", "\tat com.databricks.unity.EmptyHandle$.runWithAndClose(UCSHandle.scala:125)\n", "\tat org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:137)\n", "\tat com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)\n", "\tat org.apache.spark.scheduler.Task.run(Task.scala:96)\n", "\tat org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$13(Executor.scala:902)\n", "\tat org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1697)\n", "\tat org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:905)\n", "\tat scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)\n", "\tat com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)\n", "\tat org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:760)\n", "\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n", "\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n", "\tat java.lang.Thread.run(Thread.java:750)\n", "\n", "Driver stacktrace:\n", "\tat org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:3381)\n", "\tat org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:3313)\n", "\tat org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:3304)\n", "\tat scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)\n", "\tat scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)\n", "\tat scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)\n", "\tat org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:3304)\n", "\tat org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1428)\n", "\tat org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1428)\n", "\tat scala.Option.foreach(Option.scala:407)\n", "\tat org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1428)\n", "\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3593)\n", "\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3531)\n", "\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3519)\n", "\tat org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:51)\n", "\tat org.apache.spark.scheduler.DAGScheduler.$anonfun$runJob$1(DAGScheduler.scala:1177)\n", "\tat scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)\n", "\tat com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:80)\n", "\tat 
org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1165)\n", "\tat org.apache.spark.SparkContext.runJobInternal(SparkContext.scala:2746)\n", "\tat org.apache.spark.sql.execution.collect.Collector.$anonfun$runSparkJobs$1(Collector.scala:312)\n", "\tat scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)\n", "\tat com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:80)\n", "\tat org.apache.spark.sql.execution.collect.Collector.runSparkJobs(Collector.scala:271)\n", "\tat org.apache.spark.sql.execution.collect.Collector.collect(Collector.scala:322)\n", "\tat org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:105)\n", "\tat org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:112)\n", "\tat org.apache.spark.sql.execution.qrc.InternalRowFormat$.collect(cachedSparkResults.scala:115)\n", "\tat org.apache.spark.sql.execution.qrc.InternalRowFormat$.collect(cachedSparkResults.scala:104)\n", "\tat org.apache.spark.sql.execution.qrc.InternalRowFormat$.collect(cachedSparkResults.scala:88)\n", "\tat org.apache.spark.sql.execution.qrc.ResultCacheManager.$anonfun$computeResult$1(ResultCacheManager.scala:527)\n", "\tat com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:80)\n", "\tat org.apache.spark.sql.execution.qrc.ResultCacheManager.collectResult$1(ResultCacheManager.scala:519)\n", "\tat org.apache.spark.sql.execution.qrc.ResultCacheManager.$anonfun$computeResult$2(ResultCacheManager.scala:537)\n", "\tat org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$withFinalPlanUpdateLegacy$1(AdaptiveSparkPlanExec.scala:771)\n", "\tat com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:80)\n", "\tat org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdateLegacy(AdaptiveSparkPlanExec.scala:769)\n", "\tat org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:764)\n", "\tat org.apache.spark.sql.execution.qrc.ResultCacheManager.computeResult(ResultCacheManager.scala:537)\n", "\tat org.apache.spark.sql.execution.qrc.ResultCacheManager.$anonfun$getOrComputeResultInternal$1(ResultCacheManager.scala:396)\n", "\tat scala.Option.getOrElse(Option.scala:189)\n", "\tat org.apache.spark.sql.execution.qrc.ResultCacheManager.getOrComputeResultInternal(ResultCacheManager.scala:390)\n", "\tat org.apache.spark.sql.execution.qrc.ResultCacheManager.getOrComputeResult(ResultCacheManager.scala:292)\n", "\tat org.apache.spark.sql.execution.SparkPlan.$anonfun$executeCollectResult$1(SparkPlan.scala:433)\n", "\tat com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:80)\n", "\tat org.apache.spark.sql.execution.SparkPlan.executeCollectResult(SparkPlan.scala:430)\n", "\tat org.apache.spark.sql.Dataset.collectResult(Dataset.scala:3431)\n", "\tat org.apache.spark.sql.Dataset.$anonfun$collectResult$1(Dataset.scala:3422)\n", "\tat org.apache.spark.sql.Dataset.$anonfun$withAction$3(Dataset.scala:4297)\n", "\tat org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:773)\n", "\tat org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4295)\n", "\tat org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$8(SQLExecution.scala:249)\n", "\tat org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:399)\n", "\tat org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:194)\n", "\tat 
org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:985)\n", "\tat org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:148)\n", "\tat org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:349)\n", "\tat org.apache.spark.sql.Dataset.withAction(Dataset.scala:4295)\n", "\tat org.apache.spark.sql.Dataset.collectResult(Dataset.scala:3421)\n", "\tat com.databricks.backend.daemon.driver.OutputAggregator$.withOutputAggregation0(OutputAggregator.scala:267)\n", "\tat com.databricks.backend.daemon.driver.OutputAggregator$.withOutputAggregation(OutputAggregator.scala:101)\n", "\tat com.databricks.backend.daemon.driver.PythonDriverLocalBase.generateTableResult(PythonDriverLocalBase.scala:723)\n", "\tat com.databricks.backend.daemon.driver.JupyterDriverLocal.computeListResultsItem(JupyterDriverLocal.scala:1424)\n", "\tat com.databricks.backend.daemon.driver.JupyterDriverLocal$JupyterEntryPoint.addCustomDisplayData(JupyterDriverLocal.scala:505)\n", "\tat sun.reflect.GeneratedMethodAccessor746.invoke(Unknown Source)\n", "\tat sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n", "\tat java.lang.reflect.Method.invoke(Method.java:498)\n", "\tat py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)\n", "\tat py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)\n", "\tat py4j.Gateway.invoke(Gateway.java:306)\n", "\tat py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)\n", "\tat py4j.commands.CallCommand.execute(CallCommand.java:79)\n", "\tat py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:195)\n", "\tat py4j.ClientServerConnection.run(ClientServerConnection.java:115)\n", "\tat java.lang.Thread.run(Thread.java:750)\n", "Caused by: org.apache.spark.api.python.PythonException: 'ValueError: Input contains NaN, infinity or a value too large for dtype('float32').'. 
Full traceback below:\n", "Traceback (most recent call last):\n", " File \"/databricks/python/lib/python3.9/site-packages/mlflow/pyfunc/__init__.py\", line 1025, in udf\n", " os.kill(scoring_server_proc.pid, signal.SIGTERM)\n", " File \"/databricks/python/lib/python3.9/site-packages/mlflow/pyfunc/__init__.py\", line 866, in _predict_row_batch\n", " result = predict_fn(pdf)\n", " File \"/databricks/python/lib/python3.9/site-packages/mlflow/pyfunc/__init__.py\", line 1006, in batch_predict_fn\n", " return loaded_model.predict(pdf)\n", " File \"/databricks/python/lib/python3.9/site-packages/mlflow/pyfunc/__init__.py\", line 373, in predict\n", " return self._predict_fn(data)\n", " File \"/databricks/python/lib/python3.9/site-packages/sklearn/ensemble/_forest.py\", line 784, in predict\n", " X = self._validate_X_predict(X)\n", " File \"/databricks/python/lib/python3.9/site-packages/sklearn/ensemble/_forest.py\", line 422, in _validate_X_predict\n", " return self.estimators_[0]._validate_X_predict(X, check_input=True)\n", " File \"/databricks/python/lib/python3.9/site-packages/sklearn/tree/_classes.py\", line 407, in _validate_X_predict\n", " X = self._validate_data(X, dtype=DTYPE, accept_sparse=\"csr\",\n", " File \"/databricks/python/lib/python3.9/site-packages/sklearn/base.py\", line 421, in _validate_data\n", " X = check_array(X, **check_params)\n", " File \"/databricks/python/lib/python3.9/site-packages/sklearn/utils/validation.py\", line 63, in inner_f\n", " return f(*args, **kwargs)\n", " File \"/databricks/python/lib/python3.9/site-packages/sklearn/utils/validation.py\", line 720, in check_array\n", " _assert_all_finite(array,\n", " File \"/databricks/python/lib/python3.9/site-packages/sklearn/utils/validation.py\", line 103, in _assert_all_finite\n", " raise ValueError(\n", "ValueError: Input contains NaN, infinity or a value too large for dtype('float32').\n", "\n", "\tat org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:694)\n", "\tat org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:110)\n", "\tat org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:647)\n", "\tat org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)\n", "\tat scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)\n", "\tat scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)\n", "\tat org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source)\n", "\tat org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)\n", "\tat org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:761)\n", "\tat org.apache.spark.sql.execution.collect.UnsafeRowBatchUtils$.encodeUnsafeRows(UnsafeRowBatchUtils.scala:80)\n", "\tat org.apache.spark.sql.execution.collect.Collector.$anonfun$processFunc$1(Collector.scala:186)\n", "\tat org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:75)\n", "\tat com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)\n", "\tat org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:75)\n", "\tat com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)\n", "\tat org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:55)\n", "\tat org.apache.spark.scheduler.Task.doRunTask(Task.scala:174)\n", "\tat 
org.apache.spark.scheduler.Task.$anonfun$run$4(Task.scala:137)\n", "\tat com.databricks.unity.EmptyHandle$.runWithAndClose(UCSHandle.scala:125)\n", "\tat org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:137)\n", "\tat com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)\n", "\tat org.apache.spark.scheduler.Task.run(Task.scala:96)\n", "\tat org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$13(Executor.scala:902)\n", "\tat org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1697)\n", "\tat org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:905)\n", "\tat scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)\n", "\tat com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)\n", "\tat org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:760)\n", "\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n", "\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n", "\t... 1 more" ] }, "metadata": { "application/vnd.databricks.v1+output": { "arguments": {}, "data": "org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 431.0 failed 4 times, most recent failure: Lost task 0.3 in stage 431.0 (TID 1339) (10.139.64.4 executor driver): org.apache.spark.api.python.PythonException: 'ValueError: Input contains NaN, infinity or a value too large for dtype('float32').'. Full traceback below:\nTraceback (most recent call last):\n File \"/databricks/python/lib/python3.9/site-packages/mlflow/pyfunc/__init__.py\", line 1025, in udf\n os.kill(scoring_server_proc.pid, signal.SIGTERM)\n File \"/databricks/python/lib/python3.9/site-packages/mlflow/pyfunc/__init__.py\", line 866, in _predict_row_batch\n result = predict_fn(pdf)\n File \"/databricks/python/lib/python3.9/site-packages/mlflow/pyfunc/__init__.py\", line 1006, in batch_predict_fn\n return loaded_model.predict(pdf)\n File \"/databricks/python/lib/python3.9/site-packages/mlflow/pyfunc/__init__.py\", line 373, in predict\n return self._predict_fn(data)\n File \"/databricks/python/lib/python3.9/site-packages/sklearn/ensemble/_forest.py\", line 784, in predict\n X = self._validate_X_predict(X)\n File \"/databricks/python/lib/python3.9/site-packages/sklearn/ensemble/_forest.py\", line 422, in _validate_X_predict\n return self.estimators_[0]._validate_X_predict(X, check_input=True)\n File \"/databricks/python/lib/python3.9/site-packages/sklearn/tree/_classes.py\", line 407, in _validate_X_predict\n X = self._validate_data(X, dtype=DTYPE, accept_sparse=\"csr\",\n File \"/databricks/python/lib/python3.9/site-packages/sklearn/base.py\", line 421, in _validate_data\n X = check_array(X, **check_params)\n File \"/databricks/python/lib/python3.9/site-packages/sklearn/utils/validation.py\", line 63, in inner_f\n return f(*args, **kwargs)\n File \"/databricks/python/lib/python3.9/site-packages/sklearn/utils/validation.py\", line 720, in check_array\n _assert_all_finite(array,\n File \"/databricks/python/lib/python3.9/site-packages/sklearn/utils/validation.py\", line 103, in _assert_all_finite\n raise ValueError(\nValueError: Input contains NaN, infinity or a value too large for dtype('float32').\n\n\tat org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:694)\n\tat org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:110)\n\tat 
org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:137)\n\tat com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)\n\tat org.apache.spark.scheduler.Task.run(Task.scala:96)\n\tat org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$13(Executor.scala:902)\n\tat org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1697)\n\tat org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:905)\n\tat scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)\n\tat com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)\n\tat org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:760)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\t... 1 more\n", "errorSummary": "PythonException: 'ValueError: Input contains NaN, infinity or a value too large for dtype('float32').'. Full traceback below:", "errorTraceType": "html", "metadata": {}, "type": "ipynbError" } }, "output_type": "display_data" } ], "source": [ "# predict unknown diamond\n", "predictions_unknown_diamond = fs.score_batch(uri_all_features, diamond_unknown)\n", "display(predictions_unknown_diamond)" ] } ], "metadata": { "application/vnd.databricks.v1+notebook": { "dashboards": [], "language": "python", "notebookMetadata": { "mostRecentlyExecutedCommandWithImplicitDF": { "commandId": 1612971859527038, "dataframes": [ "_sqldf" ] }, "pythonIndentUnit": 2 }, "notebookName": "Databricks-ML-professional-S03a-Batch", "widgets": {} }, "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.8.10" } }, "nbformat": 4, "nbformat_minor": 4 }