diff --git a/_resources/00-global-setup-v2.py b/_resources/00-global-setup-v2.py index 88ea5e47..5d791142 100644 --- a/_resources/00-global-setup-v2.py +++ b/_resources/00-global-setup-v2.py @@ -16,7 +16,7 @@ import requests import collections import os - +import re class DBDemos(): @staticmethod @@ -41,24 +41,24 @@ def use_and_create_db(catalog, dbName, cloud_storage_path = None): spark.sql(f"CREATE CATALOG IF NOT EXISTS `{catalog}`") if catalog == 'dbdemos': spark.sql(f"ALTER CATALOG `{catalog}` OWNER TO `account users`") - use_and_create_db(catalog, dbName) + use_and_create_db(catalog, db) if catalog == 'dbdemos': try: - spark.sql(f"GRANT CREATE, USAGE on DATABASE `{catalog}`.`{dbName}` TO `account users`") - spark.sql(f"ALTER SCHEMA `{catalog}`.`{dbName}` OWNER TO `account users`") - for t in spark.sql(f'SHOW TABLES in {catalog}.{dbName}').collect(): + spark.sql(f"GRANT CREATE, USAGE on DATABASE `{catalog}`.`{db}` TO `account users`") + spark.sql(f"ALTER SCHEMA `{catalog}`.`{db}` OWNER TO `account users`") + for t in spark.sql(f'SHOW TABLES in {catalog}.{db}').collect(): try: - spark.sql(f'GRANT ALL PRIVILEGES ON TABLE {catalog}.{dbName}.{t["tableName"]} TO `account users`') - spark.sql(f'ALTER TABLE {catalog}.{dbName}.{t["tableName"]} OWNER TO `account users`') + spark.sql(f'GRANT ALL PRIVILEGES ON TABLE {catalog}.{db}.{t["tableName"]} TO `account users`') + spark.sql(f'ALTER TABLE {catalog}.{db}.{t["tableName"]} OWNER TO `account users`') except Exception as e: if "NOT_IMPLEMENTED.TRANSFER_MATERIALIZED_VIEW_OWNERSHIP" not in str(e) and "STREAMING_TABLE_OPERATION_NOT_ALLOWED.UNSUPPORTED_OPERATION" not in str(e) : - print(f'WARN: Couldn t set table {catalog}.{dbName}.{t["tableName"]} owner to account users, error: {e}') + print(f'WARN: Couldn t set table {catalog}.{db}.{t["tableName"]} owner to account users, error: {e}') except Exception as e: print("Couldn't grant access to the schema to all users:"+str(e)) - print(f"using catalog.database 
`{catalog}`.`{dbName}`") - spark.sql(f"""USE `{catalog}`.`{dbName}`""") + print(f"using catalog.database `{catalog}`.`{db}`") + spark.sql(f"""USE `{catalog}`.`{db}`""") if volume_name: spark.sql(f'CREATE VOLUME IF NOT EXISTS {volume_name};') diff --git a/product_demos/cdc-pipeline/01-CDC-CDF-simple-pipeline.py b/product_demos/cdc-pipeline/01-CDC-CDF-simple-pipeline.py index 5c63845a..4efda25d 100644 --- a/product_demos/cdc-pipeline/01-CDC-CDF-simple-pipeline.py +++ b/product_demos/cdc-pipeline/01-CDC-CDF-simple-pipeline.py @@ -60,8 +60,14 @@ # COMMAND ---------- +from pyspark.sql.functions import input_file_name, col +from pyspark.sql import DataFrame +import time + +# COMMAND ---------- + # DBTITLE 1,Let's explore our incoming data. We receive CSV files with client information -cdc_raw_data = spark.read.option('header', "true").csv(raw_data_location+'/user_csv') +cdc_raw_data = spark.read.option('header', "true").csv(volume_folder+'/user_csv') display(cdc_raw_data) # COMMAND ---------- @@ -73,20 +79,18 @@ # DBTITLE 1,We need to keep the cdc information, however csv isn't a efficient storage. 
Let's put that in a Delta table instead: bronzeDF = (spark.readStream - .format("cloudFiles") - .option("cloudFiles.format", "csv") - #.option("cloudFiles.maxFilesPerTrigger", "1") #Simulate streaming, remove in production - .option("cloudFiles.inferColumnTypes", "true") - .option("cloudFiles.schemaLocation", cloud_storage_path+"/schema_cdc_raw") - .option("cloudFiles.schemaHints", "id bigint, operation_date timestamp") - .load(raw_data_location+'/user_csv')) - -(bronzeDF.withColumn("file_name", input_file_name()).writeStream - .option("checkpointLocation", cloud_storage_path+"/checkpoint_cdc_raw") - .trigger(processingTime='10 seconds') - .table("clients_cdc")) - -time.sleep(20) + .format("cloudFiles") + .option("cloudFiles.format", "csv") + .option("cloudFiles.inferColumnTypes", "true") + .option("cloudFiles.schemaLocation", volume_folder+"/schema_cdc_raw") + .option("cloudFiles.schemaHints", "id bigint, operation_date timestamp") + .load(volume_folder+'/user_csv')) + +(bronzeDF.withColumn("file_name", col("_metadata.file_path")).writeStream + .option("checkpointLocation", volume_folder+"/checkpoint_cdc_raw") + .trigger(availableNow=True) + .table(f"`{catalog}`.`{db}`.clients_cdc") + .awaitTermination()) # COMMAND ---------- @@ -109,21 +113,46 @@ # COMMAND ---------- -# DBTITLE 1,We can now create our client table using standard SQL command +# DBTITLE 1,We can now create our client table using a standard SQL command # MAGIC %sql # MAGIC -- we can add NOT NULL in our ID field (or even more advanced constraint) -# MAGIC CREATE TABLE IF NOT EXISTS retail_client_silver (id BIGINT NOT NULL, name STRING, address STRING, email STRING, operation STRING) -# MAGIC TBLPROPERTIES (delta.enableChangeDataFeed = true, delta.autoOptimize.optimizeWrite = true, delta.autoOptimize.autoCompact = true); +# MAGIC CREATE TABLE IF NOT EXISTS retail_client_silver ( +# MAGIC id BIGINT NOT NULL, +# MAGIC name STRING, +# MAGIC address STRING, +# MAGIC email STRING, +# MAGIC operation 
STRING, +# MAGIC CONSTRAINT id_pk PRIMARY KEY(id)) +# MAGIC TBLPROPERTIES ( +# MAGIC delta.enableChangeDataFeed = true, +# MAGIC delta.autoOptimize.optimizeWrite = true, +# MAGIC delta.autoOptimize.autoCompact = true +# MAGIC ); # COMMAND ---------- # DBTITLE 1,And run our MERGE statement the upsert the CDC information in our final table -#for each batch / incremental update from the raw cdc table, we'll run a MERGE on the silver table -def merge_stream(df, i): +def merge_stream(df: DataFrame, i): + """ + Processes a microbatch of CDC (Change Data Capture) data to merge it into the 'retail_client_silver' table. + This method performs deduplication and upserts or deletes records based on the operation specified in each row. + + Args: + df (DataFrame): The DataFrame representing the microbatch of CDC data. + i (int): The batch ID, not directly used in this process. + + The method performs these steps: + 1. Temporarily registers the DataFrame as 'clients_cdc_microbatch' to allow SQL operations. + 2. Deduplicates the incoming data by 'id', keeping the latest operation for each 'id'. + 3. Executes a MERGE SQL operation on 'retail_client_silver': + - Deletes records if the latest operation for an 'id' is 'DELETE'. + - Updates records for an 'id' if the latest operation is not 'DELETE'. + - Inserts new records if an 'id' does not exist in 'retail_client_silver' and the operation is not 'DELETE'. + """ + df.createOrReplaceTempView("clients_cdc_microbatch") - #First we need to dedup the incoming data based on ID (we can have multiple update of the same row in our incoming data) - #Then we run the merge (upsert or delete). 
We could do it with a window and filter on rank() == 1 too - df._jdf.sparkSession().sql("""MERGE INTO retail_client_silver target + + df.sparkSession.sql("""MERGE INTO retail_client_silver target USING (select id, name, address, email, operation from (SELECT *, ROW_NUMBER() OVER (PARTITION BY id ORDER BY operation_date DESC) as rank from clients_cdc_microbatch) @@ -134,15 +163,20 @@ def merge_stream(df, i): WHEN MATCHED AND source.operation != 'DELETE' THEN UPDATE SET * WHEN NOT MATCHED AND source.operation != 'DELETE' THEN INSERT *""") -spark.readStream \ - .table("clients_cdc") \ - .writeStream \ - .foreachBatch(merge_stream) \ - .option("checkpointLocation", cloud_storage_path+"/checkpoint_clients_cdc") \ - .trigger(processingTime='10 seconds') \ - .start() +def trigger_silver_stream(): + """ + Initiates a structured streaming process that reads change data capture (CDC) records from a specified table and processes them in batches using a custom merge function. The process is designed to handle streaming updates efficiently, applying changes to a 'silver' table based on the incoming stream. 
+ """ + (spark.readStream + .table(f"`{catalog}`.`{db}`.clients_cdc") + .writeStream + .foreachBatch(merge_stream) + .option("checkpointLocation", volume_folder+"/checkpoint_clients_cdc") + .trigger(availableNow=True) + .start() + .awaitTermination()) -time.sleep(20) +trigger_silver_stream() # COMMAND ---------- @@ -153,25 +187,25 @@ def merge_stream(df, i): # MAGIC %md # MAGIC ### Testing the first CDC layer -# MAGIC Let's send a new CDC entry to simulate an update and a DELETE for the ID 1 and 2 +# MAGIC Let's send a new CDC entry to simulate an update and a DELETE for the ID 1000 and 2000 # COMMAND ---------- -# DBTITLE 1,Let's UPDATE id=1 and DELETE the row with id=2 +# DBTITLE 1,Let's UPDATE id=1000 and DELETE the row with id=2000 # MAGIC %sql # MAGIC insert into clients_cdc (id, name, address, email, operation_date, operation, _rescued_data, file_name) values -# MAGIC (1000, "Quentin", "Paris 75020", "quentin.ambard@databricks.com", now(), "UPDATE", null, null), -# MAGIC (2000, null, null, null, now(), "DELETE", null, null); +# MAGIC (1000, "Quentin", "123 Paper Street, UT 75020", "quentin.ambard@databricks.com", now(), "UPDATE", null, null), +# MAGIC (2000, null, null, null, now(), "DELETE", null, null); +# MAGIC # MAGIC select * from clients_cdc where id in (1000, 2000); # COMMAND ---------- -#wait for the stream to get the new data -time.sleep(20) +# explicitly trigger the stream in our example; It's equally easy to just have the stream run 24/7 +trigger_silver_stream() # COMMAND ---------- -# DBTITLE 1,Wait a few seconds for the stream to catch the new entry in the CDC table and check the results in the main table # MAGIC %sql # MAGIC select * from retail_client_silver where id in (1000, 2000); # MAGIC -- Note that ID 1000 has been updated, and ID 2000 is deleted @@ -203,7 +237,7 @@ def merge_stream(df, i): # MAGIC ALTER TABLE retail_client_silver SET TBLPROPERTIES (delta.enableChangeDataFeed = true); # MAGIC # MAGIC -- Delta Lake CDF works using 
table_changes function: -# MAGIC SELECT * FROM table_changes('retail_client_silver', 1) order by id +# MAGIC SELECT * FROM table_changes('retail_client_silver', 1) order by id # COMMAND ---------- @@ -231,7 +265,7 @@ def merge_stream(df, i): changes = spark.read.format("delta") \ .option("readChangeData", "true") \ .option("startingVersion", int(last_version) -1) \ - .table("retail_client_silver") + .table(f"`{catalog}`.`{db}`.retail_client_silver") display(changes) # COMMAND ---------- @@ -250,7 +284,19 @@ def merge_stream(df, i): # DBTITLE 1,Let's create or final GOLD table: retail_client_gold # MAGIC %sql -# MAGIC CREATE TABLE IF NOT EXISTS retail_client_gold (id BIGINT NOT NULL, name STRING, address STRING, email STRING, gold_data STRING); +# MAGIC CREATE TABLE IF NOT EXISTS retail_client_gold ( +# MAGIC id BIGINT NOT NULL, +# MAGIC name STRING, +# MAGIC address STRING, +# MAGIC email STRING, +# MAGIC gold_data STRING, +# MAGIC CONSTRAINT gold_id_pk PRIMARY KEY(id)); + +# COMMAND ---------- + +# MAGIC %md +# MAGIC +# MAGIC Now we can create our initial Gold table using the latest version of our Silver table. Keep in mind that we are **not** looking at the Change Data Feed (CDF) here. We are utilizing the latest version of our Silver table that is synced with our external table. Also note that some of these states are not real, and only for demonstration. 
# COMMAND ---------- @@ -269,7 +315,7 @@ def upsertToDelta(data, batchId): data_deduplicated = data_deduplicated.withColumn("address", regexp_replace(col("address"), "\"", "")) #run the merge in the gold table directly - (DeltaTable.forName(spark, "retail_client_gold").alias("target") + (DeltaTable.forName(data.sparkSession, "retail_client_gold").alias("target") .merge(data_deduplicated.alias("source"), "source.id = target.id") .whenMatchedDelete("source._change_type = 'delete'") .whenMatchedUpdateAll("source._change_type != 'delete'") @@ -290,6 +336,7 @@ def upsertToDelta(data, batchId): # COMMAND ---------- +# DBTITLE 1,Start the gold stream # MAGIC %sql SELECT * FROM retail_client_gold # COMMAND ---------- @@ -325,4 +372,4 @@ def upsertToDelta(data, batchId): # COMMAND ---------- # DBTITLE 1,Make sure we stop all actives streams -stop_all_streams() +DBDemos.stop_all_streams() diff --git a/product_demos/cdc-pipeline/02-CDC-CDF-full-multi-tables.py b/product_demos/cdc-pipeline/02-CDC-CDF-full-multi-tables.py index da070595..3dd32a70 100644 --- a/product_demos/cdc-pipeline/02-CDC-CDF-full-multi-tables.py +++ b/product_demos/cdc-pipeline/02-CDC-CDF-full-multi-tables.py @@ -57,8 +57,16 @@ # COMMAND ---------- +from concurrent.futures import ThreadPoolExecutor +from collections import deque +from delta.tables import * +from pyspark.sql.functions import input_file_name, col, row_number +from pyspark.sql.window import Window + +# COMMAND ---------- + # DBTITLE 1,Let's explore our raw cdc data. 
We have 2 tables we want to sync (transactions and users) -base_folder = f"{raw_data_location}/cdc" +base_folder = f"{volume_folder}/cdc" display(dbutils.fs.ls(base_folder)) # COMMAND ---------- @@ -67,7 +75,7 @@ # COMMAND ---------- -dbutils.fs.rm(f"{cloud_storage_path}/cdc_full", True) +dbutils.fs.rm(f"{volume_folder}/cdc_full", True) # COMMAND ---------- @@ -78,13 +86,13 @@ def update_bronze_layer(path, bronze_table): (spark.readStream .format("cloudFiles") .option("cloudFiles.format", "csv") - .option("cloudFiles.schemaLocation", f"{cloud_storage_path}/cdc_full/schemas/{bronze_table}") + .option("cloudFiles.schemaLocation", f"{volume_folder}/cdc_full/schemas/{bronze_table}") .option("cloudFiles.schemaHints", "id bigint, operation_date timestamp") .option("cloudFiles.inferColumnTypes", "true") .load(path) .withColumn("file_name", input_file_name()) .writeStream - .option("checkpointLocation", f"{cloud_storage_path}/cdc_full/checkpoints/{bronze_table}") + .option("checkpointLocation", f"{volume_folder}/cdc_full/checkpoints/{bronze_table}") .option("mergeSchema", "true") .trigger(once=True) .table(bronze_table).awaitTermination()) @@ -121,7 +129,7 @@ def merge_stream(updates, i): .table(bronze_table) .writeStream .foreachBatch(merge_stream) - .option("checkpointLocation", f"{cloud_storage_path}/cdc_full/checkpoints/{silver_table}") + .option("checkpointLocation", f"{volume_folder}/cdc_full/checkpoints/{silver_table}") .trigger(once=True) .start().awaitTermination()) @@ -133,10 +141,6 @@ def merge_stream(updates, i): # COMMAND ---------- -from concurrent.futures import ThreadPoolExecutor -from collections import deque -from delta.tables import * - def refresh_cdc_table(table): try: #update the bronze table @@ -189,4 +193,4 @@ def refresh_cdc_table(table): # COMMAND ---------- # DBTITLE 1,Make sure we stop all actives streams -stop_all_streams() +DBDemos.stop_all_streams() diff --git a/product_demos/cdc-pipeline/_resources/00-setup.py 
b/product_demos/cdc-pipeline/_resources/00-setup.py index cf5ca476..fb77ab86 100644 --- a/product_demos/cdc-pipeline/_resources/00-setup.py +++ b/product_demos/cdc-pipeline/_resources/00-setup.py @@ -3,27 +3,46 @@ # COMMAND ---------- -# MAGIC %run ../../../_resources/00-global-setup $reset_all_data=$reset_all_data $db_prefix=retail +# MAGIC %run ../../../_resources/00-global-setup-v2 # COMMAND ---------- -import json -import time -from pyspark.sql.window import Window -from pyspark.sql.functions import row_number +# Note: End users should not modify this code. +# Instead, use dbdemos.install("cdc-pipeline", catalog="..", schema="...") +catalog = "dbdemos" +db = "cdc_pipeline" +volume_name = "cdc_pipeline" reset_all_data = dbutils.widgets.get("reset_all_data") == "true" -raw_data_location = cloud_storage_path+"/delta_cdf" -if reset_all_data or is_folder_empty(raw_data_location+"/user_csv"): +DBDemos.setup_schema(catalog, db, reset_all_data, volume_name) +volume_folder = f"/Volumes/{catalog}/{db}/{volume_name}" + +# COMMAND ---------- + +total_databases = spark.sql(f"show databases in {catalog} like '{db}'").count() +assert (total_databases == 1), f"There should be exactly one database [{db}] within catalog [{catalog}]" + +total_volumes = spark.sql(f"show volumes in `{catalog}`.`{db}`").count() +assert (total_volumes == 1), f"There should be exactly one volume [{volume_name}] within {catalog}.{db}" + +# COMMAND ---------- + +if reset_all_data or DBDemos.is_folder_empty(volume_folder+"/user_csv"): + # delete all data + print("Dropping table...", end='') + spark.sql(f"USE `{catalog}`.`{db}`") spark.sql("""DROP TABLE if exists clients_cdc""") spark.sql("""DROP TABLE if exists retail_client_silver""") + spark.sql("""DROP TABLE if exists retail_client_gold""") + print(" Done") + #data generation on another notebook to avoid installing libraries (takes a few seconds to setup pip env) - print(f"Generating data under {raw_data_location} , please wait a few sec...") + 
print(f"Generating data under {volume_folder}. This may take several minutes...") path = dbutils.notebook.entry_point.getDbutils().notebook().getContext().notebookPath().get() parent_count = path[path.rfind("Delta-Lake-CDC-CDF"):].count('/') - 1 prefix = "./" if parent_count == 0 else parent_count*"../" prefix = f'{prefix}_resources/' - dbutils.notebook.run(prefix+"01-load-data", 120, {"raw_data_location": raw_data_location}) + dbutils.notebook.run(path=prefix+"01-load-data", timeout_seconds=300, arguments={"raw_data_location": volume_folder}) else: print("data already existing. Run with reset_all_data=true to force a data cleanup for your local demo.") diff --git a/product_demos/cdc-pipeline/_resources/01-load-data.py b/product_demos/cdc-pipeline/_resources/01-load-data.py index 33f09783..cdd06b57 100644 --- a/product_demos/cdc-pipeline/_resources/01-load-data.py +++ b/product_demos/cdc-pipeline/_resources/01-load-data.py @@ -36,6 +36,9 @@ def create_dataset(df): df = df.withColumn("email", fake_email()) df = df.withColumn("address", fake_address()) return df + +print("Generating Data... ", end='') + #APPEND Faker.seed(0) df = spark.range(0, 10000) @@ -77,8 +80,11 @@ def cleanup_folder(path): df = df.withColumn("amount", (F.rand(10)*1000).cast('int')+10) df = df.withColumn("operation", F.lit('UPDATE')) df.repartition(1).write.mode("append").option("header", "true").format("csv").save(raw_data_location+"/cdc/transactions") +print("Done") +print("Cleaning up files... 
", end='') cleanup_folder(raw_data_location+"/user_csv") cleanup_folder(raw_data_location+"/cdc/users") -cleanup_folder(raw_data_location+"/cdc/transactions") +cleanup_folder(raw_data_location+"/cdc/transactions") +print("Done") diff --git a/product_demos/cdc-pipeline/_resources/bundle_config.py b/product_demos/cdc-pipeline/_resources/bundle_config.py index ed25da29..415d91b6 100644 --- a/product_demos/cdc-pipeline/_resources/bundle_config.py +++ b/product_demos/cdc-pipeline/_resources/bundle_config.py @@ -53,5 +53,20 @@ "title": "Delta Lake Performance & operation", "description": "Programatically ingest multiple CDC flows to synch all your database." } - ] + ], + "cluster": { + "spark_version": "14.3.x-scala2.12", + "spark_conf": { + "spark.master": "local[*, 4]", + "spark.databricks.cluster.profile": "singleNode" + }, + "custom_tags": { + "ResourceClass": "SingleNode" + }, + "num_workers": 0, + "single_user_name": "{{CURRENT_USER}}", + "data_security_mode": "SINGLE_USER", + "node_type_id": "m5.large", + "driver_node_type_id": "m5.large", + } }