<h4 style="font-variant-caps: small-caps;font-size:35pt;">Databricks-ML-professional-S01a-Data-management</h4>

<div style='background-color:black;border-radius:5px;border-top:1px solid'></div>
<br/>
<p>This notebook covers the following requirements:</p><br/>
<b>Data Management:</b>
<ul>
<li>Read and write a Delta table</li>
<li>View Delta table history and load a previous version of a Delta table</li>
<li>Create, overwrite, merge, and read Feature Store tables in machine learning workflows</li>
</ul>
<br/>
<p><b>Download this notebook in ipynb format <a href="Databricks-ML-professional-S01a-Data-management.ipynb">here</a>.</b></p>
<br/>
<div style='background-color:black;border-radius:5px;border-top:1px solid'></div>

<div style='background-color:rgba(30, 144, 255, 0.1);border-radius:5px;padding:2px;'>
<span style="font-variant-caps: small-caps;font-weight:700">1. Import libraries</span></div>

In [None]:
import pandas as pd
import seaborn as sns
#
from pyspark.sql.functions import monotonically_increasing_id
#
from databricks.feature_store import FeatureStoreClient

<div style='background-color:rgba(30, 144, 255, 0.1);border-radius:5px;padding:2px;'>
<span style="font-variant-caps: small-caps;font-weight:700">2. Load dataset, convert to Spark DataFrame</span></div>

In [None]:
taxis_df = sns.load_dataset("taxis")
#
taxis_sdf = spark.createDataFrame(taxis_df)
#
display(taxis_sdf.limit(5))

pickup,dropoff,passengers,distance,fare,tip,tolls,total,color,payment,pickup_zone,dropoff_zone,pickup_borough,dropoff_borough
2019-03-23 20:21:09,2019-03-23 20:27:24,1,1.6,7.0,2.15,0.0,12.95,yellow,credit card,Lenox Hill West,UN/Turtle Bay South,Manhattan,Manhattan
2019-03-04 16:11:55,2019-03-04 16:19:00,1,0.79,5.0,0.0,0.0,9.3,yellow,cash,Upper West Side South,Upper West Side South,Manhattan,Manhattan
2019-03-27 17:53:01,2019-03-27 18:00:25,1,1.37,7.5,2.36,0.0,14.16,yellow,credit card,Alphabet City,West Village,Manhattan,Manhattan
2019-03-10 01:23:59,2019-03-10 01:49:51,1,7.7,27.0,6.15,0.0,36.95,yellow,credit card,Hudson Sq,Yorkville West,Manhattan,Manhattan
2019-03-30 13:27:42,2019-03-30 13:37:14,3,2.16,9.0,1.1,0.0,13.4,yellow,credit card,Midtown East,Yorkville West,Manhattan,Manhattan


<a id="readwritedeltatable"></a>
<div style='background-color:rgba(30, 144, 255, 0.1);border-radius:5px;padding:2px;'>
<span style="font-variant-caps: small-caps;font-weight:700">3. Read and write a Delta table</span></div>

<p>When no location is specified, <code>saveAsTable</code> saves the DataFrame as a <b>managed Delta table</b>:</p>

In [None]:
(taxis_sdf.write
          .mode("overwrite")
          .option("overwriteSchema", "True")
          .format("delta")
          .saveAsTable("taxis_sdf"))

<p>When a location is specified, the data is written to that path as an <b>external Delta table</b>:</p>

In [None]:
(taxis_sdf.write
          .mode("overwrite")
          .option("overwriteSchema", "True")
          .format("delta")
          .save("/temp/taxis_sdf"))

<p>Tables registered in the current schema, including <b>managed Delta tables</b>, can be listed with the following command:</p>

In [None]:
display(spark.sql("show tables").limit(5))

database,tableName,isTemporary
default,amsterdam_airbnb_df,False
default,csv_stud,False
default,diamonds,False
default,diamonds_df_not_partitioned,False
default,diamonds_df_partitioned,False
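
<p>A Delta table written with <code>.save(path)</code> is not registered in the metastore, so it does not appear in this list. One way to make it queryable by name is to register it with <code>CREATE TABLE ... USING DELTA LOCATION</code>. The sketch below only builds the statement; the table name <code>taxis_sdf_ext</code> is a hypothetical choice, and the path is the one used with <code>.save()</code> above:</p>

```python
# Hypothetical table name; the path is the one used with .save() above
table_name = "taxis_sdf_ext"
path = "/temp/taxis_sdf"

# Statement that registers the existing Delta files as an external table
register_sql = (
    f"CREATE TABLE IF NOT EXISTS {table_name} "
    f"USING DELTA LOCATION '{path}'"
)
print(register_sql)
# In a Databricks notebook: spark.sql(register_sql)
```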


<p>Delta tables can be read this way:</p>

In [None]:
taxis_sdf    = spark.table("taxis_sdf")                           # read managed delta table
taxi_sdf_ext = spark.read.format("delta").load("/temp/taxis_sdf") # read external delta table

In [None]:
display(taxi_sdf_ext.limit(5))

pickup,dropoff,passengers,distance,fare,tip,tolls,total,color,payment,pickup_zone,dropoff_zone,pickup_borough,dropoff_borough
2019-03-23 20:21:09,2019-03-23 20:27:24,1,1.6,7.0,2.15,0.0,12.95,yellow,credit card,Lenox Hill West,UN/Turtle Bay South,Manhattan,Manhattan
2019-03-04 16:11:55,2019-03-04 16:19:00,1,0.79,5.0,0.0,0.0,9.3,yellow,cash,Upper West Side South,Upper West Side South,Manhattan,Manhattan
2019-03-27 17:53:01,2019-03-27 18:00:25,1,1.37,7.5,2.36,0.0,14.16,yellow,credit card,Alphabet City,West Village,Manhattan,Manhattan
2019-03-10 01:23:59,2019-03-10 01:49:51,1,7.7,27.0,6.15,0.0,36.95,yellow,credit card,Hudson Sq,Yorkville West,Manhattan,Manhattan
2019-03-30 13:27:42,2019-03-30 13:37:14,3,2.16,9.0,1.1,0.0,13.4,yellow,credit card,Midtown East,Yorkville West,Manhattan,Manhattan


<a id="viewdeltahistory"></a>
<div style='background-color:rgba(30, 144, 255, 0.1);border-radius:5px;padding:2px;'>
<span style="font-variant-caps: small-caps;font-weight:700">4. View Delta table history and load a previous version of a Delta table</span></div>

<p>Let's make a small modification to the table and save it again:</p>

In [None]:
# remove rows where the payment column is null
taxis_sdf = taxis_sdf.filter("payment is not null")
#
# save the change to managed delta table
(taxis_sdf.write
          .mode("overwrite")
          .option("overwriteSchema", "True")
          .format("delta")
          .saveAsTable("taxis_sdf"))
#
# save the change to external delta table
(taxis_sdf.write
          .mode("overwrite")
          .option("overwriteSchema", "True")
          .format("delta")
          .save("/temp/taxis_sdf"))

<p>View the change history of the <b>managed table</b>:</p>

In [None]:
display(spark.sql("DESCRIBE HISTORY taxis_sdf"))

<p>View the change history of the <b>external table</b>:</p>

In [None]:
display(spark.sql("DESCRIBE HISTORY '/temp/taxis_sdf/'"))

<p>The following commands load a previous version. Loading version <code>0</code> of the table brings back the null values in column <code>payment</code>:</p>

In [None]:
display(spark.read
        .format("delta")
        .option("versionAsOf", 0)
        .table("taxis_sdf")
        .filter("payment is null")
        .limit(5))
#
# Or for an external delta table
display(spark.read
        .format("delta")
        .option("versionAsOf", 0)
        .load("/temp/taxis_sdf")
        .filter("payment is null")
        .limit(5))

pickup,dropoff,passengers,distance,fare,tip,tolls,total,color,payment,pickup_zone,dropoff_zone,pickup_borough,dropoff_borough
2019-03-08 02:56:38,2019-03-08 03:07:24,1,2.4,10.5,0.0,0.0,14.3,yellow,,Murray Hill,West Village,Manhattan,Manhattan
2019-03-02 19:01:36,2019-03-02 19:08:46,0,1.4,7.0,0.0,0.0,10.3,yellow,,Upper East Side South,Murray Hill,Manhattan,Manhattan
2019-03-23 11:07:11,2019-03-23 11:32:46,1,0.0,19.0,0.0,0.0,22.3,yellow,,West Village,Upper East Side South,Manhattan,Manhattan
2019-03-15 00:10:38,2019-03-15 00:21:39,1,3.1,12.5,0.0,0.0,16.3,yellow,,West Chelsea/Hudson Yards,Upper West Side South,Manhattan,Manhattan
2019-03-30 11:38:20,2019-03-30 11:40:45,1,0.2,3.5,0.0,0.0,6.8,yellow,,Upper West Side North,Upper West Side North,Manhattan,Manhattan


pickup,dropoff,passengers,distance,fare,tip,tolls,total,color,payment,pickup_zone,dropoff_zone,pickup_borough,dropoff_borough
2019-03-08 02:56:38,2019-03-08 03:07:24,1,2.4,10.5,0.0,0.0,14.3,yellow,,Murray Hill,West Village,Manhattan,Manhattan
2019-03-02 19:01:36,2019-03-02 19:08:46,0,1.4,7.0,0.0,0.0,10.3,yellow,,Upper East Side South,Murray Hill,Manhattan,Manhattan
2019-03-23 11:07:11,2019-03-23 11:32:46,1,0.0,19.0,0.0,0.0,22.3,yellow,,West Village,Upper East Side South,Manhattan,Manhattan
2019-03-15 00:10:38,2019-03-15 00:21:39,1,3.1,12.5,0.0,0.0,16.3,yellow,,West Chelsea/Hudson Yards,Upper West Side South,Manhattan,Manhattan
2019-03-30 11:38:20,2019-03-30 11:40:45,1,0.2,3.5,0.0,0.0,6.8,yellow,,Upper West Side North,Upper West Side North,Manhattan,Manhattan
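
<p>The same time travel is also available from SQL through Delta's <code>VERSION AS OF</code> and <code>TIMESTAMP AS OF</code> clauses. A minimal sketch follows; the helper <code>time_travel_query</code> is a hypothetical convenience that only builds the query string, which would then be passed to <code>spark.sql(...)</code> in a notebook:</p>

```python
def time_travel_query(table, version=None, timestamp=None):
    """Build a Delta time travel SELECT statement for a table.

    Exactly one of `version` or `timestamp` must be given.
    """
    if (version is None) == (timestamp is None):
        raise ValueError("Provide exactly one of version or timestamp")
    if version is not None:
        return f"SELECT * FROM {table} VERSION AS OF {int(version)}"
    return f"SELECT * FROM {table} TIMESTAMP AS OF '{timestamp}'"


query = time_travel_query("taxis_sdf", version=0)
print(query)  # SELECT * FROM taxis_sdf VERSION AS OF 0
# In a Databricks notebook: display(spark.sql(query).filter("payment is null"))
```

<p>For a path-based table, the table argument would be written as <code>delta.`/temp/taxis_sdf`</code>.</p>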


<a id="actionsfeaturestoretables"></a>
<div style='background-color:rgba(30, 144, 255, 0.1);border-radius:5px;padding:2px;'>
<span style="font-variant-caps: small-caps;font-weight:700">5. Create, overwrite, merge and read Feature Store tables in machine learning workflows</span></div>

<p>Let's add a unique <code>id</code> column to the table to serve as the primary key:</p>

In [None]:
taxis_sdf = taxis_sdf.withColumn("id", monotonically_increasing_id())
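
<p><code>monotonically_increasing_id()</code> guarantees unique, increasing values, but not consecutive ones: according to the PySpark documentation, the current implementation puts the partition ID in the upper 31 bits and the record number within each partition in the lower 33 bits. The pure-Python sketch below mimics that layout to show why ids jump between partitions. The helper name <code>simulated_ids</code> is hypothetical, and this is an illustration, not Spark's code:</p>

```python
def simulated_ids(rows_per_partition):
    """Mimic the documented id layout: partition index in the upper bits,
    row position within the partition in the lower 33 bits."""
    ids = []
    for partition_index, row_count in enumerate(rows_per_partition):
        for row_position in range(row_count):
            ids.append((partition_index << 33) + row_position)
    return ids


# Two partitions holding three rows each
ids = simulated_ids([3, 3])
print(ids)  # [0, 1, 2, 8589934592, 8589934593, 8589934594]
```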

<p>A catalog and a schema must exist; if either is missing, it must be created. For this example, let's use the existing default catalog <code>hive_metastore</code> and create a schema in it:</p>

In [None]:
# Select the catalog and create the schema if it does not exist
spark.sql("USE CATALOG hive_metastore")
spark.sql("CREATE SCHEMA IF NOT EXISTS seaborn_db")

Out[77]: DataFrame[]

<p>The following command creates and populates a Feature Store table in schema <code>seaborn_db</code> of the <code>hive_metastore</code> catalog:</p>

In [None]:
fs = FeatureStoreClient()
#
customer_feature_table = fs.create_table(
  name='hive_metastore.seaborn_db.taxidataset', # format is <catalog_name>.<schema_name>.<table_name>
  primary_keys='id',                            # required
  schema=taxis_sdf.schema,                      # either schema or df is required
  df=taxis_sdf,                                 # if df is provided, the table is populated with its data; otherwise only the table structure is created and must be populated later
  description='Seaborn taxi dataset features'
)



<p>The previously created table in Feature Store can be deleted this way:</p>

In [None]:
fs.drop_table(name='seaborn_db.taxidataset')



<p>Alternatively, an empty table can be created in Feature Store and populated later. The command below creates only the table structure:</p>

In [None]:
customer_feature_table = fs.create_table(
  name='hive_metastore.seaborn_db.taxidataset', # format is <catalog_name>.<schema_name>.<table_name>
  primary_keys='id',                            # required
  schema=taxis_sdf.schema,                      # here only schema is provided, the feature table is created empty
  description='Seaborn taxi dataset features'
)

2023/11/22 16:27:21 INFO databricks.feature_store._compute_client._compute_client: Created feature table 'hive_metastore.seaborn_db.taxidataset'.


<p>The next step is to write data into it:</p>

In [None]:
fs.write_table(
  df=taxis_sdf,
  name='seaborn_db.taxidataset',
  mode='merge'                   # mode = 'overwrite' could also be used in this particular case
)
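
<p>To make the difference between the two modes concrete: <code>merge</code> upserts rows by primary key (matching keys are updated, new keys are inserted, other rows are kept), while <code>overwrite</code> replaces the whole table. The toy model below uses plain dictionaries keyed by <code>id</code>. It only illustrates the semantics and is not how Feature Store is implemented; the function names and sample rows are made up for the example:</p>

```python
def merge_rows(table, new_rows, key="id"):
    """Upsert: update rows whose key already exists, insert the others."""
    merged = dict(table)
    for row in new_rows:
        merged[row[key]] = row
    return merged


def overwrite_rows(new_rows, key="id"):
    """Overwrite: the table afterwards contains only the new rows."""
    return {row[key]: row for row in new_rows}


# Made-up existing table content and incoming rows
table = {0: {"id": 0, "fare": 17.0}, 1: {"id": 1, "fare": 6.5}}
updates = [{"id": 1, "fare": 7.0}, {"id": 2, "fare": 13.5}]

merged = merge_rows(table, updates)
overwritten = overwrite_rows(updates)
print(sorted(merged))       # [0, 1, 2]
print(sorted(overwritten))  # [1, 2]
```

<p>Because the feature table above was just created empty, both modes leave it with the same content.</p>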

<p>Finally, the Feature Store table can be read this way:</p>

In [None]:
display(fs.read_table(name='seaborn_db.taxidataset').limit(5))

pickup,dropoff,passengers,distance,fare,tip,tolls,total,color,payment,pickup_zone,dropoff_zone,pickup_borough,dropoff_borough,id
2019-03-17 13:16:13,2019-03-17 13:40:32,1,2.9,17.0,0.0,0.0,20.3,yellow,cash,Clinton East,Lenox Hill East,Manhattan,Manhattan,0
2019-03-22 08:43:39,2019-03-22 08:51:49,1,0.55,6.5,1.96,0.0,11.76,yellow,credit card,Union Sq,Murray Hill,Manhattan,Manhattan,1
2019-03-02 21:42:07,2019-03-02 22:01:15,1,2.5,13.5,3.45,0.0,20.75,yellow,credit card,SoHo,Murray Hill,Manhattan,Manhattan,2
2019-03-13 21:49:00,2019-03-13 22:02:04,1,2.68,11.0,1.0,0.0,15.8,yellow,credit card,East Chelsea,Alphabet City,Manhattan,Manhattan,3
2019-03-03 12:55:38,2019-03-03 13:01:05,1,0.85,5.5,0.0,0.0,8.8,yellow,cash,Midtown Center,Murray Hill,Manhattan,Manhattan,4


<img width="1000px" src="https://i.ibb.co/Yy56SQz/unity-catalog-1.png"/>