sys.path.append("../../common/lib")

from databricks_ingestion_monitoring.common_ldp import (
    Configuration,
    Constants,
    MonitoringEtlPipeline,
)
from databricks_ingestion_monitoring.standard_tables import (
    EVENTS_TABLE_METRICS,
    TABLE_STATUS,
    TABLE_STATUS_PER_PIPELINE_RUN,
)

# Configure logging
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)
logger.info("Starting CDC Connector Monitoring ETL Pipeline")

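# The Configuration object reads its settings from the Spark conf; in a DLT
# pipeline these values are typically supplied through the pipeline settings
# (the exact keys depend on the common_ldp library and are not shown here).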
conf = Configuration(spark.conf)


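# Connector-specific constants. The flow_type values mirror the CASE expression
# in _get_event_logs_bronze_sql below: 'snapshot' for snapshot loads, 'cdc' for
# change-feed applies, and 'cdc_staging' for flows into the internal staging table.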
class CdcConstants:
    CDC_FLOW_TYPE = "cdc"
    SNAPSHOT_FLOW_TYPE = "snapshot"
    CDC_STAGING_TABLE_FLOW_TYPE = "cdc_staging"
    TABLE_STATUS_PER_PIPELINE_RUN = "table_status_per_pipeline_run"
    CDC_STAGING_TABLE = "cdc_staging_table"


class CdcConnectorMonitoringEtlPipeline(MonitoringEtlPipeline):
    def __init__(self, conf: Configuration, spark: SparkSession):
        super().__init__(conf, spark)

    def _get_event_logs_bronze_sql(self, event_log_source: str):
        """
        Override the base definition of the append flows from the event log sources
        into the `event_logs_bronze` table, adding CDC Connector-specific fields.
        """
        sql = super()._get_event_logs_bronze_sql(event_log_source)
        sql = sql.replace(
            Constants.sql_fields_def_extension_point,
            f""", (CASE WHEN endswith(flow_name, "_snapshot_flow") THEN '{CdcConstants.SNAPSHOT_FLOW_TYPE}'
                WHEN details:operation_progress.cdc_snapshot.table_name::string is not null THEN '{CdcConstants.SNAPSHOT_FLOW_TYPE}'
                WHEN endswith(flow_name, "_cdc_flow") THEN '{CdcConstants.CDC_FLOW_TYPE}'
                WHEN endswith(flow_name, ".{CdcConstants.CDC_STAGING_TABLE}") THEN '{CdcConstants.CDC_STAGING_TABLE_FLOW_TYPE}'
                END) flow_type{Constants.sql_fields_def_extension_point}
            """,
        )
        return sql
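
    # How the extension points work (illustrative; the SQL shown here is
    # hypothetical): the base class leaves a marker token inside its generated
    # query, e.g. "SELECT id, ts{sql_fields_def_extension_point} FROM ...".
    # Each override splices its extra fields in front of the marker and
    # re-appends the marker itself, so further subclasses can keep extending
    # the same field list.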
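
    # The three overrides below thread the derived flow_type column through the
    # errors, warnings and per-table metrics outputs via the same substitution.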
    def _get_events_errors_sql(self):
        sql = super()._get_events_errors_sql()
        sql = sql.replace(
            Constants.sql_fields_def_extension_point,
            f", flow_type{Constants.sql_fields_def_extension_point}",
        )
        return sql

    def _get_events_warnings_sql(self):
        sql = super()._get_events_warnings_sql()
        sql = sql.replace(
            Constants.sql_fields_def_extension_point,
            f", flow_type{Constants.sql_fields_def_extension_point}",
        )
        return sql

    def _get_events_table_metrics_sql(self):
        sql = super()._get_events_table_metrics_sql()
        return sql.replace(
            Constants.sql_fields_def_extension_point,
            f", flow_type{Constants.sql_fields_def_extension_point}",
        )

    def register_base_tables_and_views(self, spark: SparkSession):
        super().register_base_tables_and_views(spark)

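    # Exclude the connector's internal CDC staging tables from the per-run
    # processing state, and carry flow_type along, again via the extension points.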
    def _get_table_run_processing_state_sql(self):
        sql = super()._get_table_run_processing_state_sql()
        sql = sql.replace(
            Constants.where_clause_extension_point,
            f"AND (table_name not LIKE '%.{CdcConstants.CDC_STAGING_TABLE}') {Constants.where_clause_extension_point}",
        )
        sql = sql.replace(
            Constants.sql_fields_def_extension_point,
            f", flow_type{Constants.sql_fields_def_extension_point}",
        )
        return sql

    def register_table_status(self, spark: SparkSession):
        table_status_per_pipeline_run_cdf = f"{TABLE_STATUS_PER_PIPELINE_RUN.name}_cdf"

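        # Read the per-run status table as a Delta change data feed, keeping only
        # inserts and post-update images so every change row carries the full
        # current values (requires change data feed to be enabled on that table).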
        @dlt.view(name=table_status_per_pipeline_run_cdf)
        def table_run_processing_state_cdf():
            return (
                spark.readStream.option("readChangeFeed", "true")
                .table(TABLE_STATUS_PER_PIPELINE_RUN.name)
                .filter("_change_type IN ('insert', 'update_postimage')")
            )

        silver_table_name = f"{TABLE_STATUS.name}_silver"
        dlt.create_streaming_table(
            name=silver_table_name,
            comment="Captures information about the latest state, ingested data and errors for target tables",
            cluster_by=["pipeline_id", "table_name"],
            table_properties={"delta.enableRowTracking": "true"},
        )

        silver_latest_source_view_name = f"{silver_table_name}_latest_source"

        @dlt.view(name=silver_latest_source_view_name)
        def table_latest_run_processing_state_source():
            return spark.sql(f"""
                SELECT pipeline_id,
                       table_name,
                       pipeline_run_id AS latest_pipeline_run_id,
                       -- ...
                WHERE table_name NOT LIKE '%.{CdcConstants.CDC_STAGING_TABLE}'
            """)
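
        # Merge the latest run state into the silver table: create_auto_cdc_flow
        # upserts by (pipeline_id, table_name) ordered by updated_at, and with
        # ignore_null_updates=True NULL source columns leave existing target
        # values untouched, so each of the three flows below can own its own
        # subset of columns on the same rows.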
        dlt.create_auto_cdc_flow(
            name=f"{silver_table_name}_apply_latest",
            source=silver_latest_source_view_name,
            target=silver_table_name,
            keys=["pipeline_id", "table_name"],
            sequence_by="updated_at",
            ignore_null_updates=True,
        )

        silver_latest_cdc_changes_source_view_name = (
            f"{silver_table_name}_latest_cdc_changes_source"
        )

        @dlt.view(name=silver_latest_cdc_changes_source_view_name)
        def table_latest_cdc_changes_source():
            return spark.sql(f"""
                SELECT pipeline_id,
                       table_name,
                       null AS latest_pipeline_run_id,
                       -- ...
                AND flow_type='cdc'
            """)

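        # Fold the latest CDC-apply progress into the same silver rows.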
        dlt.create_auto_cdc_flow(
            name=f"{silver_table_name}_apply_latest_cdc_changes",
            source=silver_latest_cdc_changes_source_view_name,
            target=silver_table_name,
            keys=["pipeline_id", "table_name"],
            sequence_by="updated_at",
            ignore_null_updates=True,
        )

        silver_latest_snapshot_changes_source_view_name = (
            f"{silver_table_name}_latest_snapshot_changes_source"
        )

        @dlt.view(name=silver_latest_snapshot_changes_source_view_name)
        def table_latest_snapshot_changes_source():
            return spark.sql(f"""
                SELECT pipeline_id,
                       table_name,
                       null AS latest_pipeline_run_id,
                       -- ...
                AND flow_type='snapshot'
            """)

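        # Fold the latest snapshot progress into the same silver rows.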
        dlt.create_auto_cdc_flow(
            name=f"{silver_table_name}_apply_latest_snapshot_changes",
            source=silver_latest_snapshot_changes_source_view_name,
            target=silver_table_name,
            keys=["pipeline_id", "table_name"],
            sequence_by="updated_at",
            ignore_null_updates=True,
        )

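        # Publish the user-facing table_status table: the accumulated silver
        # state, enriched with per-run written-row counts joined from the events
        # table metrics (aliased etm below).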
        @dlt.table(
            name=TABLE_STATUS.name,
            comment=TABLE_STATUS.table_comment,
            cluster_by=["pipeline_id", "table_name"],
            table_properties={"delta.enableRowTracking": "true"},
        )
        def table_status():
            return spark.sql(f"""
                SELECT s.*,
                       latest_pipeline_run_num_written_cdc_changes,
                       latest_pipeline_run_num_written_snapshot_changes
                       -- ...
                AND s.latest_pipeline_run_id = etm.pipeline_run_id
                AND s.table_name = etm.table_name
            """)


pipeline = CdcConnectorMonitoringEtlPipeline(conf, spark)
pipeline.register_base_tables_and_views(spark)