20 changes: 10 additions & 10 deletions _resources/00-global-setup-v2.py
@@ -16,7 +16,7 @@
import requests
import collections
import os

import re

class DBDemos():
@staticmethod
@@ -41,24 +41,24 @@ def use_and_create_db(catalog, dbName, cloud_storage_path = None):
spark.sql(f"CREATE CATALOG IF NOT EXISTS `{catalog}`")
if catalog == 'dbdemos':
spark.sql(f"ALTER CATALOG `{catalog}` OWNER TO `account users`")
use_and_create_db(catalog, dbName)
use_and_create_db(catalog, db)

if catalog == 'dbdemos':
try:
spark.sql(f"GRANT CREATE, USAGE on DATABASE `{catalog}`.`{dbName}` TO `account users`")
spark.sql(f"ALTER SCHEMA `{catalog}`.`{dbName}` OWNER TO `account users`")
for t in spark.sql(f'SHOW TABLES in {catalog}.{dbName}').collect():
spark.sql(f"GRANT CREATE, USAGE on DATABASE `{catalog}`.`{db}` TO `account users`")
spark.sql(f"ALTER SCHEMA `{catalog}`.`{db}` OWNER TO `account users`")
for t in spark.sql(f'SHOW TABLES in {catalog}.{db}').collect():
try:
spark.sql(f'GRANT ALL PRIVILEGES ON TABLE {catalog}.{dbName}.{t["tableName"]} TO `account users`')
spark.sql(f'ALTER TABLE {catalog}.{dbName}.{t["tableName"]} OWNER TO `account users`')
spark.sql(f'GRANT ALL PRIVILEGES ON TABLE {catalog}.{db}.{t["tableName"]} TO `account users`')
spark.sql(f'ALTER TABLE {catalog}.{db}.{t["tableName"]} OWNER TO `account users`')
except Exception as e:
if "NOT_IMPLEMENTED.TRANSFER_MATERIALIZED_VIEW_OWNERSHIP" not in str(e) and "STREAMING_TABLE_OPERATION_NOT_ALLOWED.UNSUPPORTED_OPERATION" not in str(e) :
print(f'WARN: Couldn t set table {catalog}.{dbName}.{t["tableName"]} owner to account users, error: {e}')
print(f'WARN: Couldn t set table {catalog}.{db}.{t["tableName"]} owner to account users, error: {e}')
except Exception as e:
print("Couldn't grant access to the schema to all users:"+str(e))

print(f"using catalog.database `{catalog}`.`{dbName}`")
spark.sql(f"""USE `{catalog}`.`{dbName}`""")
print(f"using catalog.database `{catalog}`.`{db}`")
spark.sql(f"""USE `{catalog}`.`{db}`""")

if volume_name:
spark.sql(f'CREATE VOLUME IF NOT EXISTS {volume_name};')
133 changes: 90 additions & 43 deletions product_demos/cdc-pipeline/01-CDC-CDF-simple-pipeline.py
@@ -60,8 +60,14 @@

# COMMAND ----------

from pyspark.sql.functions import input_file_name, col
from pyspark.sql import DataFrame
import time

# COMMAND ----------

# DBTITLE 1,Let's explore our incoming data. We receive CSV files with client information
cdc_raw_data = spark.read.option('header', "true").csv(raw_data_location+'/user_csv')
cdc_raw_data = spark.read.option('header', "true").csv(volume_folder+'/user_csv')
display(cdc_raw_data)

# COMMAND ----------
@@ -73,20 +79,18 @@

# DBTITLE 1,We need to keep the CDC information; however, CSV isn't an efficient storage format. Let's put that in a Delta table instead:
bronzeDF = (spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
#.option("cloudFiles.maxFilesPerTrigger", "1") #Simulate streaming, remove in production
.option("cloudFiles.inferColumnTypes", "true")
.option("cloudFiles.schemaLocation", cloud_storage_path+"/schema_cdc_raw")
.option("cloudFiles.schemaHints", "id bigint, operation_date timestamp")
.load(raw_data_location+'/user_csv'))

(bronzeDF.withColumn("file_name", input_file_name()).writeStream
.option("checkpointLocation", cloud_storage_path+"/checkpoint_cdc_raw")
.trigger(processingTime='10 seconds')
.table("clients_cdc"))

time.sleep(20)
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("cloudFiles.inferColumnTypes", "true")
.option("cloudFiles.schemaLocation", volume_folder+"/schema_cdc_raw")
.option("cloudFiles.schemaHints", "id bigint, operation_date timestamp")
.load(volume_folder+'/user_csv'))

(bronzeDF.withColumn("file_name", col("_metadata.file_path")).writeStream
.option("checkpointLocation", volume_folder+"/checkpoint_cdc_raw")
.trigger(availableNow=True)
.table(f"`{catalog}`.`{db}`.clients_cdc")
.awaitTermination())

# COMMAND ----------

@@ -109,21 +113,46 @@

# COMMAND ----------

# DBTITLE 1,We can now create our client table using standard SQL command
# DBTITLE 1,We can now create our client table using a standard SQL command
# MAGIC %sql
# MAGIC -- we can add NOT NULL on our ID field (or even more advanced constraints)
# MAGIC CREATE TABLE IF NOT EXISTS retail_client_silver (id BIGINT NOT NULL, name STRING, address STRING, email STRING, operation STRING)
# MAGIC TBLPROPERTIES (delta.enableChangeDataFeed = true, delta.autoOptimize.optimizeWrite = true, delta.autoOptimize.autoCompact = true);
# MAGIC CREATE TABLE IF NOT EXISTS retail_client_silver (
# MAGIC id BIGINT NOT NULL,
# MAGIC name STRING,
# MAGIC address STRING,
# MAGIC email STRING,
# MAGIC operation STRING,
# MAGIC CONSTRAINT id_pk PRIMARY KEY(id))
# MAGIC TBLPROPERTIES (
# MAGIC delta.enableChangeDataFeed = true,
# MAGIC delta.autoOptimize.optimizeWrite = true,
# MAGIC delta.autoOptimize.autoCompact = true
# MAGIC );

# COMMAND ----------

# DBTITLE 1,And run our MERGE statement to upsert the CDC information into our final table
#for each batch / incremental update from the raw cdc table, we'll run a MERGE on the silver table
def merge_stream(df, i):
def merge_stream(df: DataFrame, i):
"""
Processes a microbatch of CDC (Change Data Capture) data to merge it into the 'retail_client_silver' table.
This method performs deduplication and upserts or deletes records based on the operation specified in each row.

Args:
df (DataFrame): The DataFrame representing the microbatch of CDC data.
i (int): The batch ID, not directly used in this process.

The method performs these steps:
1. Temporarily registers the DataFrame as 'clients_cdc_microbatch' to allow SQL operations.
2. Deduplicates the incoming data by 'id', keeping the latest operation for each 'id'.
3. Executes a MERGE SQL operation on 'retail_client_silver':
- Deletes records if the latest operation for an 'id' is 'DELETE'.
- Updates records for an 'id' if the latest operation is not 'DELETE'.
- Inserts new records if an 'id' does not exist in 'retail_client_silver' and the operation is not 'DELETE'.
"""

df.createOrReplaceTempView("clients_cdc_microbatch")
  #First we need to dedup the incoming data based on ID (we can have multiple updates of the same row in our incoming data)
#Then we run the merge (upsert or delete). We could do it with a window and filter on rank() == 1 too
df._jdf.sparkSession().sql("""MERGE INTO retail_client_silver target

df.sparkSession.sql("""MERGE INTO retail_client_silver target
USING
(select id, name, address, email, operation from
(SELECT *, ROW_NUMBER() OVER (PARTITION BY id ORDER BY operation_date DESC) as rank from clients_cdc_microbatch)
@@ -134,15 +163,20 @@ def merge_stream(df, i):
WHEN MATCHED AND source.operation != 'DELETE' THEN UPDATE SET *
WHEN NOT MATCHED AND source.operation != 'DELETE' THEN INSERT *""")

spark.readStream \
.table("clients_cdc") \
.writeStream \
.foreachBatch(merge_stream) \
.option("checkpointLocation", cloud_storage_path+"/checkpoint_clients_cdc") \
.trigger(processingTime='10 seconds') \
.start()
def trigger_silver_stream():
"""
Initiates a structured streaming process that reads change data capture (CDC) records from a specified table and processes them in batches using a custom merge function. The process is designed to handle streaming updates efficiently, applying changes to a 'silver' table based on the incoming stream.
"""
(spark.readStream
.table(f"`{catalog}`.`{db}`.clients_cdc")
.writeStream
.foreachBatch(merge_stream)
.option("checkpointLocation", volume_folder+"/checkpoint_clients_cdc")
.trigger(availableNow=True)
.start()
.awaitTermination())

time.sleep(20)
trigger_silver_stream()

# COMMAND ----------

@@ -153,25 +187,25 @@ def merge_stream(df, i):

# MAGIC %md
# MAGIC ### Testing the first CDC layer
# MAGIC Let's send a new CDC entry to simulate an update and a DELETE for the ID 1 and 2
# MAGIC Let's send new CDC entries to simulate an update and a DELETE for IDs 1000 and 2000

# COMMAND ----------

# DBTITLE 1,Let's UPDATE id=1 and DELETE the row with id=2
# DBTITLE 1,Let's UPDATE id=1000 and DELETE the row with id=2000
# MAGIC %sql
# MAGIC insert into clients_cdc (id, name, address, email, operation_date, operation, _rescued_data, file_name) values
# MAGIC (1000, "Quentin", "Paris 75020", "[email protected]", now(), "UPDATE", null, null),
# MAGIC (2000, null, null, null, now(), "DELETE", null, null);
# MAGIC (1000, "Quentin", "123 Paper Street, UT 75020", "[email protected]", now(), "UPDATE", null, null),
# MAGIC (2000, null, null, null, now(), "DELETE", null, null);
# MAGIC
# MAGIC select * from clients_cdc where id in (1000, 2000);

# COMMAND ----------

#wait for the stream to get the new data
time.sleep(20)
# Explicitly trigger the stream in our example; it's equally easy to just have the stream run 24/7
trigger_silver_stream()
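
As a side note, the comment above mentions letting this stream run 24/7 instead of triggering it on demand. A minimal sketch of that continuous variant (assuming the same `catalog`, `db`, `volume_folder` and `merge_stream` defined earlier, and reusing the same checkpoint, so only one of the two variants should run at a time) could look like this:

(spark.readStream
  .table(f"`{catalog}`.`{db}`.clients_cdc")
  .writeStream
  .foreachBatch(merge_stream)
  .option("checkpointLocation", volume_folder+"/checkpoint_clients_cdc")
  .trigger(processingTime='10 seconds')  # process new CDC rows every 10 seconds instead of on demand
  .start())                              # no awaitTermination(): the query keeps running until stopped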

# COMMAND ----------

# DBTITLE 1,Wait a few seconds for the stream to catch the new entry in the CDC table and check the results in the main table
# MAGIC %sql
# MAGIC select * from retail_client_silver where id in (1000, 2000);
# MAGIC -- Note that ID 1000 has been updated, and ID 2000 is deleted
@@ -203,7 +237,7 @@ def merge_stream(df, i):
# MAGIC ALTER TABLE retail_client_silver SET TBLPROPERTIES (delta.enableChangeDataFeed = true);
# MAGIC
# MAGIC -- Delta Lake CDF works using table_changes function:
# MAGIC SELECT * FROM table_changes('retail_client_silver', 1) order by id
# MAGIC SELECT * FROM table_changes('retail_client_silver', 1) order by id

# COMMAND ----------

@@ -231,7 +265,7 @@ def merge_stream(df, i):
changes = spark.read.format("delta") \
.option("readChangeData", "true") \
.option("startingVersion", int(last_version) -1) \
.table("retail_client_silver")
.table(f"`{catalog}`.`{db}`.retail_client_silver")
display(changes)
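
For reference, the change feed can also be consumed incrementally as a stream rather than as a one-off batch read; a minimal sketch over the same table (the startingVersion option is optional and only mirrors the batch read above) could look like this:

cdf_stream = (spark.readStream.format("delta")
  .option("readChangeFeed", "true")                 # stream the change data feed instead of the table's current rows
  .option("startingVersion", int(last_version) - 1)
  .table(f"`{catalog}`.`{db}`.retail_client_silver"))
display(cdf_stream)                                 # in a Databricks notebook, display() can render a streaming DataFrame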

# COMMAND ----------
@@ -250,7 +284,19 @@ def merge_stream(df, i):

# DBTITLE 1,Let's create our final GOLD table: retail_client_gold
# MAGIC %sql
# MAGIC CREATE TABLE IF NOT EXISTS retail_client_gold (id BIGINT NOT NULL, name STRING, address STRING, email STRING, gold_data STRING);
# MAGIC CREATE TABLE IF NOT EXISTS retail_client_gold (
# MAGIC id BIGINT NOT NULL,
# MAGIC name STRING,
# MAGIC address STRING,
# MAGIC email STRING,
# MAGIC gold_data STRING,
# MAGIC CONSTRAINT gold_id_pk PRIMARY KEY(id));

# COMMAND ----------

# MAGIC %md
# MAGIC
# MAGIC Now we can create our initial Gold table from the latest version of our Silver table. Keep in mind that we are **not** reading the Change Data Feed (CDF) here; we are using the current state of the Silver table, which is kept in sync with our external table. Also note that some of this data is synthetic and included only for demonstration.

# COMMAND ----------

@@ -269,7 +315,7 @@ def upsertToDelta(data, batchId):
data_deduplicated = data_deduplicated.withColumn("address", regexp_replace(col("address"), "\"", ""))

#run the merge in the gold table directly
(DeltaTable.forName(spark, "retail_client_gold").alias("target")
(DeltaTable.forName(data.sparkSession, "retail_client_gold").alias("target")
.merge(data_deduplicated.alias("source"), "source.id = target.id")
.whenMatchedDelete("source._change_type = 'delete'")
.whenMatchedUpdateAll("source._change_type != 'delete'")
@@ -290,6 +336,7 @@ def upsertToDelta(data, batchId):

# COMMAND ----------

# DBTITLE 1,Start the gold stream
# MAGIC %sql SELECT * FROM retail_client_gold

# COMMAND ----------
@@ -325,4 +372,4 @@ def upsertToDelta(data, batchId):
# COMMAND ----------

# DBTITLE 1,Make sure we stop all active streams
stop_all_streams()
DBDemos.stop_all_streams()
24 changes: 14 additions & 10 deletions product_demos/cdc-pipeline/02-CDC-CDF-full-multi-tables.py
@@ -57,8 +57,16 @@

# COMMAND ----------

from concurrent.futures import ThreadPoolExecutor
from collections import deque
from delta.tables import *
from pyspark.sql.functions import input_file_name, col, row_number
from pyspark.sql.window import Window

# COMMAND ----------

# DBTITLE 1,Let's explore our raw cdc data. We have 2 tables we want to sync (transactions and users)
base_folder = f"{raw_data_location}/cdc"
base_folder = f"{volume_folder}/cdc"
display(dbutils.fs.ls(base_folder))

# COMMAND ----------
@@ -67,7 +75,7 @@

# COMMAND ----------

dbutils.fs.rm(f"{cloud_storage_path}/cdc_full", True)
dbutils.fs.rm(f"{volume_folder}/cdc_full", True)

# COMMAND ----------

Expand All @@ -78,13 +86,13 @@ def update_bronze_layer(path, bronze_table):
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("cloudFiles.schemaLocation", f"{cloud_storage_path}/cdc_full/schemas/{bronze_table}")
.option("cloudFiles.schemaLocation", f"{volume_folder}/cdc_full/schemas/{bronze_table}")
.option("cloudFiles.schemaHints", "id bigint, operation_date timestamp")
.option("cloudFiles.inferColumnTypes", "true")
.load(path)
.withColumn("file_name", input_file_name())
.writeStream
.option("checkpointLocation", f"{cloud_storage_path}/cdc_full/checkpoints/{bronze_table}")
.option("checkpointLocation", f"{volume_folder}/cdc_full/checkpoints/{bronze_table}")
.option("mergeSchema", "true")
.trigger(once=True)
.table(bronze_table).awaitTermination())
@@ -121,7 +129,7 @@ def merge_stream(updates, i):
.table(bronze_table)
.writeStream
.foreachBatch(merge_stream)
.option("checkpointLocation", f"{cloud_storage_path}/cdc_full/checkpoints/{silver_table}")
.option("checkpointLocation", f"{volume_folder}/cdc_full/checkpoints/{silver_table}")
.trigger(once=True)
.start().awaitTermination())

@@ -133,10 +141,6 @@ def merge_stream(updates, i):

# COMMAND ----------

from concurrent.futures import ThreadPoolExecutor
from collections import deque
from delta.tables import *

def refresh_cdc_table(table):
try:
#update the bronze table
@@ -189,4 +193,4 @@ def refresh_cdc_table(table):
# COMMAND ----------

# DBTITLE 1,Make sure we stop all active streams
stop_all_streams()
DBDemos.stop_all_streams()
37 changes: 28 additions & 9 deletions product_demos/cdc-pipeline/_resources/00-setup.py
@@ -3,27 +3,46 @@

# COMMAND ----------

# MAGIC %run ../../../_resources/00-global-setup $reset_all_data=$reset_all_data $db_prefix=retail
# MAGIC %run ../../../_resources/00-global-setup-v2

# COMMAND ----------

import json
import time
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number
# Note: End users should not modify this code.
# Instead, use dbdemos.install("cdc-pipeline", catalog="..", schema="...")

catalog = "dbdemos"
db = "cdc_pipeline"
volume_name = "cdc_pipeline"
reset_all_data = dbutils.widgets.get("reset_all_data") == "true"
raw_data_location = cloud_storage_path+"/delta_cdf"

if reset_all_data or is_folder_empty(raw_data_location+"/user_csv"):
DBDemos.setup_schema(catalog, db, reset_all_data, volume_name)
volume_folder = f"/Volumes/{catalog}/{db}/{volume_name}"

# COMMAND ----------

total_databases = spark.sql(f"show databases in {catalog} like '{db}'").count()
assert (total_databases == 1), f"There should be exactly one database [{db}] within catalog [{catalog}]"

total_volumes = spark.sql(f"show volumes in `{catalog}`.`{db}`").count()
assert (total_volumes == 1), f"There should be exactly one volume [{volume_name}] within {catalog}.{db}"

# COMMAND ----------

if reset_all_data or DBDemos.is_folder_empty(volume_folder+"/user_csv"):
# delete all data
print("Dropping table...", end='')
spark.sql(f"USE `{catalog}`.`{db}`")
spark.sql("""DROP TABLE if exists clients_cdc""")
spark.sql("""DROP TABLE if exists retail_client_silver""")
spark.sql("""DROP TABLE if exists retail_client_gold""")
print(" Done")

  #data generation is done in another notebook to avoid installing libraries (it takes a few seconds to set up the pip env)
print(f"Generating data under {raw_data_location} , please wait a few sec...")
print(f"Generating data under {volume_folder}. This may take several minutes...")
path = dbutils.notebook.entry_point.getDbutils().notebook().getContext().notebookPath().get()
parent_count = path[path.rfind("Delta-Lake-CDC-CDF"):].count('/') - 1
prefix = "./" if parent_count == 0 else parent_count*"../"
prefix = f'{prefix}_resources/'
dbutils.notebook.run(prefix+"01-load-data", 120, {"raw_data_location": raw_data_location})
dbutils.notebook.run(path=prefix+"01-load-data", timeout_seconds=300, arguments={"raw_data_location": volume_folder})
else:
print("data already existing. Run with reset_all_data=true to force a data cleanup for your local demo.")