Databricks-ML-professional-S01a-Data-management
This notebook covers the following requirements:
Data Management:
- Read and write a Delta table
- View Delta table history and load a previous version of a Delta table
- Create, overwrite, merge, and read Feature Store tables in machine learning workflows
Download this notebook in ipynb format here.
import pandas as pd
import seaborn as sns
#
from pyspark.sql.functions import monotonically_increasing_id
#
from databricks.feature_store import FeatureStoreClient
taxis_df = sns.load_dataset("taxis")
#
taxis_sdf = spark.createDataFrame(taxis_df)
#
display(taxis_sdf.limit(5))
pickup | dropoff | passengers | distance | fare | tip | tolls | total | color | payment | pickup_zone | dropoff_zone | pickup_borough | dropoff_borough |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|
2019-03-23 20:21:09 | 2019-03-23 20:27:24 | 1 | 1.6 | 7.0 | 2.15 | 0.0 | 12.95 | yellow | credit card | Lenox Hill West | UN/Turtle Bay South | Manhattan | Manhattan |
2019-03-04 16:11:55 | 2019-03-04 16:19:00 | 1 | 0.79 | 5.0 | 0.0 | 0.0 | 9.3 | yellow | cash | Upper West Side South | Upper West Side South | Manhattan | Manhattan |
2019-03-27 17:53:01 | 2019-03-27 18:00:25 | 1 | 1.37 | 7.5 | 2.36 | 0.0 | 14.16 | yellow | credit card | Alphabet City | West Village | Manhattan | Manhattan |
2019-03-10 01:23:59 | 2019-03-10 01:49:51 | 1 | 7.7 | 27.0 | 6.15 | 0.0 | 36.95 | yellow | credit card | Hudson Sq | Yorkville West | Manhattan | Manhattan |
2019-03-30 13:27:42 | 2019-03-30 13:37:14 | 3 | 2.16 | 9.0 | 1.1 | 0.0 | 13.4 | yellow | credit card | Midtown East | Yorkville West | Manhattan | Manhattan |
DataFrames are saved as managed Delta tables by default when no location is specified:
(taxis_sdf.write
.mode("overwrite")
.option("overwriteSchema", "True")
.format("delta")
.saveAsTable("taxis_sdf"))
When a location is specified, they are saved as external Delta tables:
(taxis_sdf.write
.mode("overwrite")
.option("overwriteSchema", "True")
.format("delta")
.save("/temp/taxis_sdf"))
Managed Delta tables can be listed with the following command:
display(spark.sql("show tables").limit(5))
database | tableName | isTemporary |
---|---|---|
default | amsterdam_airbnb_df | false |
default | csv_stud | false |
default | diamonds | false |
default | diamonds_df_not_partitioned | false |
default | diamonds_df_partitioned | false |
Delta tables can be read this way:
taxis_sdf = spark.table("taxis_sdf") # read managed delta table
taxi_sdf_ext = spark.read.load("/temp/taxis_sdf") # read external delta table
display(taxi_sdf_ext.limit(5))
pickup | dropoff | passengers | distance | fare | tip | tolls | total | color | payment | pickup_zone | dropoff_zone | pickup_borough | dropoff_borough |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|
2019-03-23 20:21:09 | 2019-03-23 20:27:24 | 1 | 1.6 | 7.0 | 2.15 | 0.0 | 12.95 | yellow | credit card | Lenox Hill West | UN/Turtle Bay South | Manhattan | Manhattan |
2019-03-04 16:11:55 | 2019-03-04 16:19:00 | 1 | 0.79 | 5.0 | 0.0 | 0.0 | 9.3 | yellow | cash | Upper West Side South | Upper West Side South | Manhattan | Manhattan |
2019-03-27 17:53:01 | 2019-03-27 18:00:25 | 1 | 1.37 | 7.5 | 2.36 | 0.0 | 14.16 | yellow | credit card | Alphabet City | West Village | Manhattan | Manhattan |
2019-03-10 01:23:59 | 2019-03-10 01:49:51 | 1 | 7.7 | 27.0 | 6.15 | 0.0 | 36.95 | yellow | credit card | Hudson Sq | Yorkville West | Manhattan | Manhattan |
2019-03-30 13:27:42 | 2019-03-30 13:37:14 | 3 | 2.16 | 9.0 | 1.1 | 0.0 | 13.4 | yellow | credit card | Midtown East | Yorkville West | Manhattan | Manhattan |
Let's make a small modification to the table and save it again:
# remove rows where payment is null
taxis_sdf = taxis_sdf.filter("payment is not null")
#
# save the change to managed delta table
(taxis_sdf.write
.mode("overwrite")
.option("overwriteSchema", "True")
.format("delta")
.saveAsTable("taxis_sdf"))
#
# save the change to external delta table
(taxis_sdf.write
.mode("overwrite")
.option("overwriteSchema", "True")
.format("delta")
.save("/temp/taxis_sdf"))
View the change history of the managed table:
display(spark.sql("DESCRIBE HISTORY taxis_sdf"))
View the change history of the external table:
display(spark.sql("DESCRIBE HISTORY '/temp/taxis_sdf/'"))
The following commands load a previous version of a table. Loading version 0 brings back the null values in the payment column:
display(spark.read
.format("delta")
.option("versionAsOf", 0)
.table("taxis_sdf")
.filter("payment is null")
.limit(5))
#
# Or for an external delta table
display(spark.read
.format("delta")
.option("versionAsOf", 0)
.load("/temp/taxis_sdf")
.filter("payment is null")
.limit(5))
pickup | dropoff | passengers | distance | fare | tip | tolls | total | color | payment | pickup_zone | dropoff_zone | pickup_borough | dropoff_borough |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|
2019-03-08 02:56:38 | 2019-03-08 03:07:24 | 1 | 2.4 | 10.5 | 0.0 | 0.0 | 14.3 | yellow | null | Murray Hill | West Village | Manhattan | Manhattan |
2019-03-02 19:01:36 | 2019-03-02 19:08:46 | 0 | 1.4 | 7.0 | 0.0 | 0.0 | 10.3 | yellow | null | Upper East Side South | Murray Hill | Manhattan | Manhattan |
2019-03-23 11:07:11 | 2019-03-23 11:32:46 | 1 | 0.0 | 19.0 | 0.0 | 0.0 | 22.3 | yellow | null | West Village | Upper East Side South | Manhattan | Manhattan |
2019-03-15 00:10:38 | 2019-03-15 00:21:39 | 1 | 3.1 | 12.5 | 0.0 | 0.0 | 16.3 | yellow | null | West Chelsea/Hudson Yards | Upper West Side South | Manhattan | Manhattan |
2019-03-30 11:38:20 | 2019-03-30 11:40:45 | 1 | 0.2 | 3.5 | 0.0 | 0.0 | 6.8 | yellow | null | Upper West Side North | Upper West Side North | Manhattan | Manhattan |
(The external table query returns the same rows.)
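Time travel works because every write appends a new commit (version) to the Delta transaction log rather than overwriting history. The toy plain-Python model below illustrates only the versioning idea; real Delta tables store data files plus a commit log, not full snapshots per version:

```python
class ToyVersionedTable:
    """Toy model of Delta time travel: every write commits a new version."""

    def __init__(self):
        self._versions = []  # version i lives at index i

    def write(self, rows):
        self._versions.append(list(rows))  # each write -> new version number

    def read(self, version_as_of=None):
        # latest version by default, or the requested historical snapshot
        idx = -1 if version_as_of is None else version_as_of
        return self._versions[idx]


t = ToyVersionedTable()
t.write([{"payment": None}, {"payment": "cash"}])  # version 0
t.write([{"payment": "cash"}])                     # version 1: nulls filtered out
assert any(r["payment"] is None for r in t.read(version_as_of=0))
assert all(r["payment"] is not None for r in t.read())
```

As in the Delta example above, reading version 0 recovers the rows with null payment even after the filtered DataFrame was written on top of them.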
Let's add an id column to the table; it will serve as the primary key of the Feature Store table:
taxis_sdf = taxis_sdf.withColumn("id", monotonically_increasing_id())
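Note that monotonically_increasing_id() guarantees unique and monotonically increasing values, not consecutive ones: per the Spark docs, the partition index goes into the upper 31 bits of the 64-bit id and the record number within the partition into the lower 33 bits. A plain-Python sketch of that layout:

```python
def sketch_mono_id(partition_index: int, record_number: int) -> int:
    # Spark packs the partition index into the upper 31 bits and the
    # record number within the partition into the lower 33 bits.
    return (partition_index << 33) | record_number


# Ids are unique and increasing across partitions, but not consecutive:
print(sketch_mono_id(0, 0))  # 0
print(sketch_mono_id(0, 1))  # 1
print(sketch_mono_id(1, 0))  # 8589934592 (first record of partition 1)
```

This is why the ids are safe to use as a primary key but should not be relied on as a dense row index.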
A catalog and a schema must exist; if either is missing, it must be created. For this example, let's use the existing default catalog hive_metastore and create a schema in it:
# use the existing default catalog and create a schema in it
spark.sql("USE CATALOG hive_metastore")
spark.sql("CREATE SCHEMA IF NOT EXISTS seaborn_db")
The following command creates and populates a feature store table in the seaborn_db schema of the hive_metastore catalog:
fs = FeatureStoreClient()
#
customer_feature_table = fs.create_table(
    name='hive_metastore.seaborn_db.taxidataset',  # format: <catalog_name>.<schema_name>.<table_name>
    primary_keys='id',                             # required
    schema=taxis_sdf.schema,                       # either schema or df is required
    df=taxis_sdf,                                  # with df, the table is populated with its data; with schema only, an empty structure is created to be populated later
    description='Seaborn taxi dataset features'
)
2023/11/22 16:27:14 WARNING databricks.feature_store._compute_client._compute_client: The feature table "hive_metastore.seaborn_db.taxidataset" already exists. Use "FeatureStoreClient.write_table" API to write to the feature table.
The previously created table in Feature Store can be deleted this way:
fs.drop_table(name='seaborn_db.taxidataset')
2023/11/22 16:27:16 WARNING databricks.feature_store._compute_client._compute_client: Deleting a feature table can lead to unexpected failures in upstream producers and downstream consumers (models, endpoints, and scheduled jobs).
Alternatively, an empty table structure can be created in the Feature Store and populated later. The command below only creates the table structure:
customer_feature_table = fs.create_table(
    name='hive_metastore.seaborn_db.taxidataset',  # format: <catalog_name>.<schema_name>.<table_name>
    primary_keys='id',                             # required
    schema=taxis_sdf.schema,                       # only schema is provided, so the feature table is created empty
    description='Seaborn taxi dataset features'
)
2023/11/22 16:27:21 INFO databricks.feature_store._compute_client._compute_client: Created feature table 'hive_metastore.seaborn_db.taxidataset'.
The next step is to write data into it:
fs.write_table(
df=taxis_sdf,
name='seaborn_db.taxidataset',
mode='merge' # mode = 'overwrite' could also be used in this particular case
)
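With mode='merge', rows in the DataFrame whose primary key already exists in the feature table update the existing rows, and rows with new keys are inserted (an upsert); mode='overwrite' instead replaces the whole table. A plain-Python sketch of the upsert semantics on the id key (not the Feature Store implementation):

```python
def merge_upsert(table, updates, key="id"):
    # index existing rows by primary key, then update-or-insert each new row
    merged = {row[key]: row for row in table}
    for row in updates:
        merged[row[key]] = row
    return list(merged.values())


existing = [{"id": 0, "fare": 7.0}]
updates = [{"id": 0, "fare": 8.0}, {"id": 1, "fare": 5.0}]
result = merge_upsert(existing, updates)
print(result)  # id 0 updated, id 1 inserted
```

This is why the id primary key added earlier matters: merge needs it to decide whether an incoming row is an update or an insert.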
Finally, the feature store table can be read this way:
display(fs.read_table(name='seaborn_db.taxidataset').limit(5))
pickup | dropoff | passengers | distance | fare | tip | tolls | total | color | payment | pickup_zone | dropoff_zone | pickup_borough | dropoff_borough | id |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
2019-03-17 13:16:13 | 2019-03-17 13:40:32 | 1 | 2.9 | 17.0 | 0.0 | 0.0 | 20.3 | yellow | cash | Clinton East | Lenox Hill East | Manhattan | Manhattan | 0 |
2019-03-22 08:43:39 | 2019-03-22 08:51:49 | 1 | 0.55 | 6.5 | 1.96 | 0.0 | 11.76 | yellow | credit card | Union Sq | Murray Hill | Manhattan | Manhattan | 1 |
2019-03-02 21:42:07 | 2019-03-02 22:01:15 | 1 | 2.5 | 13.5 | 3.45 | 0.0 | 20.75 | yellow | credit card | SoHo | Murray Hill | Manhattan | Manhattan | 2 |
2019-03-13 21:49:00 | 2019-03-13 22:02:04 | 1 | 2.68 | 11.0 | 1.0 | 0.0 | 15.8 | yellow | credit card | East Chelsea | Alphabet City | Manhattan | Manhattan | 3 |
2019-03-03 12:55:38 | 2019-03-03 13:01:05 | 1 | 0.85 | 5.5 | 0.0 | 0.0 | 8.8 | yellow | cash | Midtown Center | Murray Hill | Manhattan | Manhattan | 4 |