From de5e8979cea8c524afedbd7327688cd7e974c7ef Mon Sep 17 00:00:00 2001 From: ravi-databricks Date: Fri, 14 Nov 2025 11:48:12 -0800 Subject: [PATCH] Implement automatic liquid clustering for bronze and silver tables - Added `bronze_cluster_by_auto` and `silver_cluster_by_auto` parameters to support automatic clustering in onboarding templates. - Updated relevant documentation to include descriptions for new parameters. - Enhanced dataflow specifications to handle `clusterByAuto` property for both bronze and silver layers. - Modified integration tests to validate the new clustering functionality. - Adjusted existing tests to ensure compatibility with the new parameters and their expected behavior. --- demo/conf/onboarding.template | 28 +- .../getting_started/metadatapreperation.md | 2 + docs/content/releases/_index.md | 9 + .../conf/cloudfiles-onboarding.template | 18 +- .../conf/cloudfiles-onboarding_A2.template | 6 +- .../conf/eventhub-onboarding.template | 2 + .../conf/kafka-onboarding.template | 2 + .../conf/snapshot-onboarding.template | 4 + integration_tests/run_integration_tests.py | 3 +- src/dataflow_pipeline.py | 45 +- src/dataflow_spec.py | 4 + src/onboard_dataflowspec.py | 55 +- src/pipeline_writers.py | 7 +- tests/test_dataflow_pipeline.py | 1074 +++++++++++++---- tests/test_main.py | 8 - tests/test_onboard_dataflowspec.py | 178 +++ tests/test_pipeline_readers.py | 4 + tests/test_pipeline_writers.py | 49 +- 18 files changed, 1177 insertions(+), 321 deletions(-) diff --git a/demo/conf/onboarding.template b/demo/conf/onboarding.template index b7a4f5a1..e0ca2d5a 100644 --- a/demo/conf/onboarding.template +++ b/demo/conf/onboarding.template @@ -20,7 +20,7 @@ "cloudFiles.rescuedDataColumn": "_rescued_data", "header": "true" }, - "bronze_cluster_by":["dob"], + "bronze_cluster_by_auto": true, "bronze_data_quality_expectations_json_prod": "{uc_volume_path}/demo/conf/dqe/customers.json", "bronze_catalog_quarantine_prod": "{uc_catalog_name}", "bronze_database_quarantine_prod": "{bronze_schema}", @@ -28,6 +28,7 @@ "bronze_quarantine_table_comment": "customers quarantine table", "bronze_quarantine_table_path_prod": "{uc_volume_path}/data/bronze/customers_quarantine", "bronze_quarantine_table_cluster_by": ["dob"], + "bronze_quarantine_table_cluster_by_auto": "true", "silver_catalog_prod": "{uc_catalog_name}", "silver_database_prod": "{silver_schema}", "silver_table": "customers", @@ -46,7 +47,8 @@ "_rescued_data" ] }, - "silver_cluster_by":["customer_id"], + "silver_cluster_by":["dob"], + "silver_cluster_by_auto": true, "silver_transformation_json_prod": "{uc_volume_path}/demo/conf/silver_transformations.json", "silver_data_quality_expectations_json_prod": "{uc_volume_path}/demo/conf/dqe/customers_silver_dqe.json" @@ -73,17 +75,19 @@ "header": "true" }, "bronze_cluster_by":["transaction_date"], + "bronze_cluster_by_auto": true, "bronze_data_quality_expectations_json_prod": "{uc_volume_path}/demo/conf/dqe/transactions.json", "bronze_database_quarantine_prod": "{uc_catalog_name}.{bronze_schema}", "bronze_quarantine_table": "transactions_quarantine", "bronze_quarantine_table_comment": "transactions bronze quarantine table", "bronze_quarantine_table_path_prod": "{uc_volume_path}/demo/resources/data/bronze/transactions_quarantine", "bronze_quarantine_table_cluster_by": ["transaction_date"], + "bronze_quarantine_table_cluster_by_auto": "true", "silver_catalog_prod": "{uc_catalog_name}", "silver_database_prod": "{silver_schema}", "silver_table": "transactions", "silver_table_comment": "transactions 
silver table", - "silver_table_path_prod": "{uc_volume_path}/data/silver/transactions", + "silver_table_path_prod": "{uc_volume_path}/demo/resources/data/silver/transactions", "silver_cdc_apply_changes": { "keys": [ "transaction_id" @@ -97,8 +101,7 @@ "_rescued_data" ] }, - "silver_cluster_by":["transaction_date"], - "silver_table_path_prod": "{uc_volume_path}/demo/resources/data/silver/transactions", + "silver_cluster_by_auto": "true", "silver_transformation_json_prod": "{uc_volume_path}/demo/conf/silver_transformations.json", "silver_data_quality_expectations_json_prod": "{uc_volume_path}/demo/conf/dqe/transactions_silver_dqe.json" }, @@ -117,24 +120,25 @@ "bronze_database_prod": "{bronze_schema}", "bronze_table": "products", "bronze_table_comment": "products bronze table", - "bronze_table_path_prod": "{uc_volume_path}/data/bronze/products", + "bronze_table_path_prod": "{uc_volume_path}/demo/resources/data/bronze/products", "bronze_reader_options": { "cloudFiles.format": "csv", "cloudFiles.rescuedDataColumn": "_rescued_data", "header": "true" }, - "bronze_table_path_prod": "{uc_volume_path}/demo/resources/data/bronze/products", + "bronze_cluster_by_auto": true, "bronze_data_quality_expectations_json_prod": "{uc_volume_path}/demo/conf/dqe/products.json", "bronze_database_quarantine_prod": "{uc_catalog_name}.{bronze_schema}", "bronze_quarantine_table": "products_quarantine", "bronze_quarantine_table_comment": "products quarantine bronze table", "bronze_quarantine_table_path_prod": "{uc_volume_path}/demo/resources/data/bronze/products_quarantine", "bronze_quarantine_table_cluster_by": ["product_id"], + "bronze_quarantine_table_cluster_by_auto": "true", "silver_catalog_prod": "{uc_catalog_name}", "silver_database_prod": "{silver_schema}", "silver_table": "products", "silver_table_comment": "products silver table", - "silver_table_path_prod": "{uc_volume_path}/data/silver/products", + "silver_table_path_prod": "{uc_volume_path}/demo/resources/data/silver/products", "silver_cdc_apply_changes": { "keys": [ "product_id" @@ -148,7 +152,7 @@ "_rescued_data" ] }, - "silver_table_path_prod": "{uc_volume_path}/demo/resources/data/silver/products", + "silver_cluster_by_auto": "true", "silver_transformation_json_prod": "{uc_volume_path}/demo/conf/silver_transformations.json", "silver_data_quality_expectations_json_prod": "{uc_volume_path}/demo/conf/dqe/products_silver_dqe.json" }, @@ -167,13 +171,12 @@ "bronze_database_prod": "{bronze_schema}", "bronze_table": "stores", "bronze_table_comment": "stores bronze table", - "bronze_table_path_prod": "{uc_volume_path}/data/bronze/stores", + "bronze_table_path_prod": "{uc_volume_path}/demo/resources/data/bronze/stores", "bronze_reader_options": { "cloudFiles.format": "csv", "cloudFiles.rescuedDataColumn": "_rescued_data", "header": "true" }, - "bronze_table_path_prod": "{uc_volume_path}/demo/resources/data/bronze/stores", "bronze_data_quality_expectations_json_prod": "{uc_volume_path}/demo/conf/dqe/stores.json", "bronze_catalog_quarantine_prod": "{uc_catalog_name}", "bronze_database_quarantine_prod": "{bronze_schema}", @@ -185,7 +188,7 @@ "silver_database_prod": "{silver_schema}", "silver_table": "stores", "silver_table_comment": "stores silver table", - "silver_table_path_prod": "{uc_volume_path}/data/silver/stores", + "silver_table_path_prod": "{uc_volume_path}/demo/resources/data/silver/stores", "silver_cdc_apply_changes": { "keys": [ "store_id" @@ -199,7 +202,6 @@ "_rescued_data" ] }, - "silver_table_path_prod": 
"{uc_volume_path}/demo/resources/data/silver/stores", "silver_transformation_json_prod": "{uc_volume_path}/demo/conf/silver_transformations.json", "silver_data_quality_expectations_json_prod": "{uc_volume_path}/demo/conf/dqe/stores_silver_dqe.json" } diff --git a/docs/content/getting_started/metadatapreperation.md b/docs/content/getting_started/metadatapreperation.md index 6011c1a9..1c1a10b9 100644 --- a/docs/content/getting_started/metadatapreperation.md +++ b/docs/content/getting_started/metadatapreperation.md @@ -36,6 +36,7 @@ The `onboarding.json` file contains links to [silver_transformations.json](https | bronze_reader_options | Reader options which can be provided to spark reader
e.g multiline=true,header=true in json format | | bronze_parition_columns | Bronze table partition cols list | | bronze_cluster_by | Bronze tables cluster by cols list | +| bronze_cluster_by_auto | Enable automatic liquid clustering on the bronze table. Boolean value (true/false). Can be combined with bronze_cluster_by to define initial clustering keys. See [Automatic liquid clustering](https://docs.databricks.com/aws/en/delta/clustering#auto-liquid) | | bronze_cdc_apply_changes | Bronze cdc apply changes Json | | bronze_apply_changes_from_snapshot | Bronze apply changes from snapshot Json e.g. Mandatory fields: keys=["userId"], scd_type=`1` or `2` optional fields: track_history_column_list=`[col1]`, track_history_except_column_list=`[col2]` | | bronze_table_path_{env} | Bronze table storage path.| @@ -57,6 +58,7 @@ The `onboarding.json` file contains links to [silver_transformations.json](https | silver_table_comment | Silver table comments | | silver_partition_columns | Silver table partition columns list | | silver_cluster_by | Silver tables cluster by cols list | +| silver_cluster_by_auto | Enable automatic liquid clustering on the silver table. Boolean value (true/false). Can be combined with silver_cluster_by to define initial clustering keys. See [Automatic liquid clustering](https://docs.databricks.com/aws/en/delta/clustering#auto-liquid) | | silver_cdc_apply_changes | Silver cdc apply changes Json | | silver_table_path_{env} | Silver table storage path. | | silver_table_properties | Lakeflow Declarative Pipeline table properties map. e.g. `{"pipelines.autoOptimize.managed": "false" , "pipelines.autoOptimize.zOrderCols": "year,month", "pipelines.reset.allowed": "false"}` | diff --git a/docs/content/releases/_index.md b/docs/content/releases/_index.md index 02a118cb..eee103ed 100644 --- a/docs/content/releases/_index.md +++ b/docs/content/releases/_index.md @@ -4,6 +4,15 @@ date: 2021-08-04T14:50:11-04:00 weight: 80 draft: false --- +# v0.0.11 + +## Enhancements +- Added automatic liquid clustering support (`cluster_by_auto`) for bronze and silver tables [Issue #238](https://github.com/databrickslabs/dlt-meta/issues/238) + - Enables automatic liquid clustering on streaming tables via the `cluster_by_auto` parameter + - Can be combined with `cluster_by` to define initial clustering keys followed by automatic optimization + - Supported for both bronze and silver layer tables + - See [Automatic liquid clustering documentation](https://docs.databricks.com/aws/en/delta/clustering#auto-liquid) for more details + # v0.0.10 ## ⚠️ Breaking Changes diff --git a/integration_tests/conf/cloudfiles-onboarding.template b/integration_tests/conf/cloudfiles-onboarding.template index bb85bfa2..fbbe2f94 100644 --- a/integration_tests/conf/cloudfiles-onboarding.template +++ b/integration_tests/conf/cloudfiles-onboarding.template @@ -31,7 +31,6 @@ "bronze_table_properties": { "pipelines.autoOptimize.managed": "true" }, - "bronze_cluster_by": ["id", "email"], "bronze_data_quality_expectations_json_it": "{uc_volume_path}/integration_tests/conf/dqe/customers/bronze_data_quality_expectations.json", "bronze_catalog_quarantine_it": "{uc_catalog_name}", "bronze_database_quarantine_it": "{bronze_schema}", @@ -42,6 +41,8 @@ "pipelines.reset.allowed": "false" }, "bronze_quarantine_table_cluster_by":["id", "email"], + "bronze_quarantine_table_cluster_by_auto": true, + "bronze_cluster_by_auto": "true", "bronze_append_flows": [ { "name": "customer_bronze_flow", @@ -68,7 +69,6 @@ "silver_table_properties": { 
"pipelines.reset.allowed": "false" }, - "silver_table_cluster_by":["customer_id"], "silver_data_quality_expectations_json_it": "{uc_volume_path}/integration_tests/conf/dqe/customers/silver_data_quality_expectations.json", "silver_catalog_quarantine_it":"{uc_catalog_name}", "silver_database_quarantine_it":"{silver_schema}", @@ -76,7 +76,8 @@ "silver_quarantine_table_properties": { "pipelines.reset.allowed": "false" }, - "silver_cluster_by":["id", "email"], + "silver_quarantine_table_cluster_by_auto": true, + "silver_cluster_by_auto": "true", "silver_append_flows": [ { "name": "customers_silver_flow", @@ -120,7 +121,6 @@ "bronze_table_properties": { "pipelines.reset.allowed": "true" }, - "bronze_table_cluster_by":["id", "customer_id"], "bronze_data_quality_expectations_json_it": "{uc_volume_path}/integration_tests/conf/dqe/transactions/bronze_data_quality_expectations.json", "bronze_catalog_quarantine_it": "{uc_catalog_name}", "bronze_database_quarantine_it": "{bronze_schema}", @@ -129,8 +129,9 @@ "bronze_quarantine_table_properties": { "pipelines.reset.allowed": "true", "pipelines.autoOptimize.managed": "false" - }, - "bronze_quarantine_table_cluster_by": ["id", "customer_id"], + }, + "bronze_quarantine_table_cluster_by_auto": true, + "bronze_cluster_by_auto": "true", "bronze_append_flows": [ { "name": "transactions_bronze_flow", @@ -171,10 +172,11 @@ "silver_table_properties": { "pipelines.reset.allowed": "false" }, - "silver_cluster_by":["id", "customer_id"], + "silver_cluster_by_auto": "true", "silver_catalog_quarantine_it":"{uc_catalog_name}", "silver_database_quarantine_it":"{silver_schema}", "silver_quarantine_table":"transactions_quarantine", - "silver_quarantine_table_cluster_by":["id","customer_id"] + "silver_quarantine_table_cluster_by":["id","customer_id"], + "silver_quarantine_table_cluster_by_auto": true } ] \ No newline at end of file diff --git a/integration_tests/conf/cloudfiles-onboarding_A2.template b/integration_tests/conf/cloudfiles-onboarding_A2.template index b79e8c85..946d357b 100644 --- a/integration_tests/conf/cloudfiles-onboarding_A2.template +++ b/integration_tests/conf/cloudfiles-onboarding_A2.template @@ -28,7 +28,8 @@ "bronze_table_properties": { "pipelines.autoOptimize.managed": "true" }, - "bronze_cluster_by":["id", "email"], + "bronze_cluster_by":["id", "email"], + "bronze_cluster_by_auto": true, "bronze_data_quality_expectations_json_it": "{uc_volume_path}/integration_tests/conf/dqe/customers/bronze_data_quality_expectations.json", "bronze_catalog_quarantine_it": "{uc_catalog_name}", "bronze_database_quarantine_it": "{bronze_schema}", @@ -37,6 +38,7 @@ "bronze_quarantine_table_properties": { "pipelines.reset.allowed": "false" }, - "bronze_quarantine_table_cluster_by":["id", "email"] + "bronze_quarantine_table_cluster_by":["id", "email"], + "bronze_quarantine_table_cluster_by_auto": true } ] \ No newline at end of file diff --git a/integration_tests/conf/eventhub-onboarding.template b/integration_tests/conf/eventhub-onboarding.template index 39313150..86ab0d2e 100644 --- a/integration_tests/conf/eventhub-onboarding.template +++ b/integration_tests/conf/eventhub-onboarding.template @@ -26,11 +26,13 @@ "bronze_database_it": "{bronze_schema}", "bronze_table": "bronze_{run_id}_iot", "bronze_partition_columns": "date", + "bronze_cluster_by_auto": true, "bronze_data_quality_expectations_json_it": "{uc_volume_path}/integration_tests/conf/dqe/iot/bronze_data_quality_expectations.json", "bronze_catalog_quarantine_it": "{uc_catalog_name}", 
"bronze_database_quarantine_it": "{bronze_schema}", "bronze_quarantine_table": "bronze_{run_id}_iot_quarantine", "bronze_quarantine_table_path_it": "{uc_volume_path}/data/bronze/iot_quarantine", + "bronze_quarantine_table_cluster_by_auto": true, "bronze_sinks": [ { "name": "bronze_eventhub_sink", diff --git a/integration_tests/conf/kafka-onboarding.template b/integration_tests/conf/kafka-onboarding.template index 9bb3d5cc..95fe9e3d 100644 --- a/integration_tests/conf/kafka-onboarding.template +++ b/integration_tests/conf/kafka-onboarding.template @@ -21,11 +21,13 @@ "bronze_table": "bronze_{run_id}_iot", "bronze_partition_columns": "date", "bronze_table_path_it": "{uc_volume_path}/data/bronze/iot", + "bronze_cluster_by_auto": true, "bronze_data_quality_expectations_json_it": "{uc_volume_path}/integration_tests/conf/dqe/iot/bronze_data_quality_expectations.json", "bronze_catalog_quarantine_it": "{uc_catalog_name}", "bronze_database_quarantine_it": "{bronze_schema}", "bronze_quarantine_table": "bronze_{run_id}_iot_quarantine", "bronze_quarantine_table_path_it": "{uc_volume_path}/data/bronze/iot_quarantine", + "bronze_quarantine_table_cluster_by_auto": true, "bronze_sinks": [ { "name": "bronze_customer_kafka_sink1", diff --git a/integration_tests/conf/snapshot-onboarding.template b/integration_tests/conf/snapshot-onboarding.template index 0d3392d4..7029cbf0 100644 --- a/integration_tests/conf/snapshot-onboarding.template +++ b/integration_tests/conf/snapshot-onboarding.template @@ -13,6 +13,7 @@ }, "bronze_database_it": "{uc_catalog_name}.{bronze_schema}", "bronze_table": "products", + "bronze_cluster_by_auto": true, "bronze_apply_changes_from_snapshot": { "keys": [ "product_id" @@ -23,6 +24,7 @@ "silver_database_it": "{silver_schema}", "silver_table": "products", "silver_table_comment": "products silver table", + "silver_cluster_by_auto": true, "silver_apply_changes_from_snapshot":{ "keys": [ "product_id" @@ -45,6 +47,7 @@ }, "bronze_database_it": "{uc_catalog_name}.{bronze_schema}", "bronze_table": "stores", + "bronze_cluster_by_auto": true, "bronze_apply_changes_from_snapshot": { "keys": [ "store_id" @@ -54,6 +57,7 @@ "silver_catalog_it": "{uc_catalog_name}", "silver_database_it": "{silver_schema}", "silver_table": "stores", + "silver_cluster_by_auto": true, "silver_apply_changes_from_snapshot":{ "keys": [ "store_id" diff --git a/integration_tests/run_integration_tests.py b/integration_tests/run_integration_tests.py index 5d726a89..86dc94f3 100644 --- a/integration_tests/run_integration_tests.py +++ b/integration_tests/run_integration_tests.py @@ -9,6 +9,7 @@ import uuid import webbrowser from dataclasses import dataclass +from datetime import timedelta # Add project root to Python path project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) @@ -799,7 +800,7 @@ def launch_workflow(self, runner_conf: DLTMetaRunnerConf): f"{self.ws.config.host}/jobs/{created_job.job_id}?o={self.ws.get_workspace_id()}" ) print(f"Waiting for job to complete. job_id={created_job.job_id}") - run_by_id = self.ws.jobs.run_now(job_id=created_job.job_id).result() + run_by_id = self.ws.jobs.run_now(job_id=created_job.job_id).result(timeout=timedelta(minutes=20)) print(f"Job run finished. 
run_id={run_by_id}") return created_job diff --git a/src/dataflow_pipeline.py b/src/dataflow_pipeline.py index 802368d0..41396320 100644 --- a/src/dataflow_pipeline.py +++ b/src/dataflow_pipeline.py @@ -237,11 +237,21 @@ def _write_standard_table(self, is_bronze=True): """Write standard DLT table for bronze or silver layer.""" target_path, target_table, target_table_name = self._get_target_table_info() comment = self._get_table_comment(target_table, is_bronze) + + # Get cluster_by_auto from dataflowSpec, default to False if not present + cluster_by_auto = ( + self.dataflowSpec.clusterByAuto + if hasattr(self.dataflowSpec, 'clusterByAuto') + and self.dataflowSpec.clusterByAuto is not None + else False + ) + dlt.table( self.write_to_delta, name=f"{target_table}", partition_cols=DataflowSpecUtils.get_partition_cols(self.dataflowSpec.partitionColumns), cluster_by=DataflowSpecUtils.get_partition_cols(self.dataflowSpec.clusterBy), + cluster_by_auto=cluster_by_auto, table_properties=self.dataflowSpec.tableProperties, path=target_path, comment=comment, @@ -465,6 +475,15 @@ def write_layer_with_dqe(self): else: target_path, target_table, target_table_name = self._get_target_table_info() target_comment = self._get_table_comment(target_table, is_bronze) + + # Get cluster_by_auto from dataflowSpec, default to False if not present + cluster_by_auto = ( + self.dataflowSpec.clusterByAuto + if hasattr(self.dataflowSpec, 'clusterByAuto') + and self.dataflowSpec.clusterByAuto is not None + else False + ) + # Create base table with expectations if expect_all_dict: dlt_table_with_expectation = dlt.expect_all(expect_all_dict)( @@ -474,6 +493,7 @@ def write_layer_with_dqe(self): table_properties=self.dataflowSpec.tableProperties, partition_cols=DataflowSpecUtils.get_partition_cols(self.dataflowSpec.partitionColumns), cluster_by=DataflowSpecUtils.get_partition_cols(self.dataflowSpec.clusterBy), + cluster_by_auto=cluster_by_auto, path=target_path, comment=target_comment, ) @@ -487,6 +507,7 @@ def write_layer_with_dqe(self): table_properties=self.dataflowSpec.tableProperties, partition_cols=DataflowSpecUtils.get_partition_cols(self.dataflowSpec.partitionColumns), cluster_by=DataflowSpecUtils.get_partition_cols(self.dataflowSpec.clusterBy), + cluster_by_auto=cluster_by_auto, path=target_path, comment=target_comment, ) @@ -503,6 +524,7 @@ def write_layer_with_dqe(self): table_properties=self.dataflowSpec.tableProperties, partition_cols=DataflowSpecUtils.get_partition_cols(self.dataflowSpec.partitionColumns), cluster_by=DataflowSpecUtils.get_partition_cols(self.dataflowSpec.clusterBy), + cluster_by_auto=cluster_by_auto, path=target_path, comment=target_comment, ) @@ -555,6 +577,14 @@ def write_layer_with_dqe(self): else f"{layer_name} dlt quarantine table {quarantine_table}" ) + # Get cluster_by_auto from quarantine configuration, default to False + # Handle both string and boolean values since quarantineTargetDetails uses StringType + q_cluster_by_auto_value = quarantine_target_details.get("cluster_by_auto", False) + if isinstance(q_cluster_by_auto_value, str): + q_cluster_by_auto = q_cluster_by_auto_value.lower().strip() == 'true' + else: + q_cluster_by_auto = bool(q_cluster_by_auto_value) if q_cluster_by_auto_value else False + dlt.expect_all_or_drop(expect_or_quarantine_dict)( dlt.table( self.write_to_delta, @@ -562,6 +592,7 @@ def write_layer_with_dqe(self): table_properties=self.dataflowSpec.quarantineTableProperties, partition_cols=q_partition_cols, cluster_by=q_cluster_by, + cluster_by_auto=q_cluster_by_auto, 
path=quarantine_path, comment=quarantine_comment, ) @@ -590,13 +621,15 @@ def write_append_flows(self): else self.silver_schema ) target_details = self._get_target_details() + append_flow_writer = AppendFlowWriter( self.spark, append_flow, target_details['table'], struct_schema, self.dataflowSpec.tableProperties, self.dataflowSpec.partitionColumns, - self.dataflowSpec.clusterBy + self.dataflowSpec.clusterBy, + self.dataflowSpec.clusterByAuto ) append_flow_writer.write_flow() @@ -711,11 +744,21 @@ def create_streaming_table(self, struct_schema, target_path=None): target_table = ( f"{target_cl_name}{target_db_name}.{target_table_name}" ) + + # Get cluster_by_auto from dataflowSpec, default to False if not present + cluster_by_auto = ( + self.dataflowSpec.clusterByAuto + if hasattr(self.dataflowSpec, 'clusterByAuto') + and self.dataflowSpec.clusterByAuto is not None + else False + ) + dlt.create_streaming_table( name=target_table, table_properties=self.dataflowSpec.tableProperties, partition_cols=DataflowSpecUtils.get_partition_cols(self.dataflowSpec.partitionColumns), cluster_by=DataflowSpecUtils.get_partition_cols(self.dataflowSpec.clusterBy), + cluster_by_auto=cluster_by_auto, path=target_path, schema=struct_schema, expect_all=expect_all_dict, diff --git a/src/dataflow_spec.py b/src/dataflow_spec.py index 7ece55eb..be7db850 100644 --- a/src/dataflow_spec.py +++ b/src/dataflow_spec.py @@ -41,6 +41,7 @@ class BronzeDataflowSpec: updateDate: datetime updatedBy: str clusterBy: list + clusterByAuto: bool sinks: str @@ -72,6 +73,7 @@ class SilverDataflowSpec: updateDate: datetime updatedBy: str clusterBy: list + clusterByAuto: bool sinks: str @@ -186,6 +188,7 @@ class DataflowSpecUtils: "appendFlowsSchemas", "applyChangesFromSnapshot", "clusterBy", + "clusterByAuto", "sinks" ] additional_silver_df_columns = [ @@ -196,6 +199,7 @@ class DataflowSpecUtils: "appendFlowsSchemas", "applyChangesFromSnapshot", "clusterBy", + "clusterByAuto", "sinks" ] additional_cdc_apply_changes_columns = ["flow_name", "once"] diff --git a/src/onboard_dataflowspec.py b/src/onboard_dataflowspec.py index 7dabce31..7f25385f 100644 --- a/src/onboard_dataflowspec.py +++ b/src/onboard_dataflowspec.py @@ -499,7 +499,8 @@ def __get_bronze_dataflow_spec_dataframe(self, onboarding_df, env): "appendFlows", "appendFlowsSchemas", "sinks", - "clusterBy" + "clusterBy", + "clusterByAuto" ] data_flow_spec_schema = StructType( [ @@ -540,6 +541,7 @@ def __get_bronze_dataflow_spec_dataframe(self, onboarding_df, env): StructField("appendFlowsSchemas", MapType(StringType(), StringType(), True), True), StructField("sinks", StringType(), True), StructField("clusterBy", ArrayType(StringType(), True), True), + StructField("clusterByAuto", T.BooleanType(), True), ] ) data = [] @@ -624,6 +626,7 @@ def __get_bronze_dataflow_spec_dataframe(self, onboarding_df, env): dlt_sinks = self.get_sink_details(onboarding_row, "bronze") cluster_by = self.__get_cluster_by_properties(onboarding_row, bronze_table_properties, "bronze_cluster_by") + cluster_by_auto = self.__get_cluster_by_auto(onboarding_row, "bronze_cluster_by_auto") cdc_apply_changes = None if ( @@ -681,7 +684,8 @@ def __get_bronze_dataflow_spec_dataframe(self, onboarding_df, env): append_flows, append_flows_schemas, dlt_sinks, - cluster_by + cluster_by, + cluster_by_auto ) data.append(bronze_row) # logger.info(bronze_parition_columns) @@ -757,6 +761,41 @@ def __get_cluster_by_properties(self, onboarding_row, table_properties, cluster_ ) return cluster_by + def __get_cluster_by_auto(self, 
onboarding_row, cluster_by_auto_key): + """Get cluster_by_auto property from onboarding row.""" + # If key doesn't exist, return False + if cluster_by_auto_key not in onboarding_row: + return False + + value = onboarding_row[cluster_by_auto_key] + + # If explicitly set to None, return None + if value is None: + return None + + # Handle boolean values + if isinstance(value, bool): + return value + + # Handle string values + if isinstance(value, str): + value_lower = value.lower().strip() + if value_lower == 'true': + return True + elif value_lower == 'false': + return False + else: + raise Exception( + f"Invalid {cluster_by_auto_key}: Expected boolean or string representation of boolean " + f"but got '{value}'" + ) + + # Invalid type + raise Exception( + f"Invalid {cluster_by_auto_key}: Expected boolean or string representation of boolean " + f"but got {type(value).__name__}: '{value}'" + ) + def __get_quarantine_details(self, env, layer, onboarding_row): quarantine_table_partition_columns = "" quarantine_target_details = {} @@ -781,6 +820,9 @@ def __get_quarantine_details(self, env, layer, onboarding_row): quarantine_table_cluster_by = self.__get_cluster_by_properties(onboarding_row, quarantine_table_properties, f"{layer}_quarantine_table_cluster_by") + quarantine_table_cluster_by_auto = self.__get_cluster_by_auto( + onboarding_row, f"{layer}_quarantine_table_cluster_by_auto" + ) if ( f"{layer}_database_quarantine_{env}" in onboarding_row and onboarding_row[f"{layer}_database_quarantine_{env}"] @@ -788,7 +830,8 @@ def __get_quarantine_details(self, env, layer, onboarding_row): quarantine_target_details = {"database": onboarding_row[f"{layer}_database_quarantine_{env}"], "table": onboarding_row[f"{layer}_quarantine_table"], "partition_columns": quarantine_table_partition_columns, - "cluster_by": quarantine_table_cluster_by + "cluster_by": quarantine_table_cluster_by, + "cluster_by_auto": quarantine_table_cluster_by_auto } quarantine_catalog = ( onboarding_row[f"{layer}_catalog_quarantine_{env}"] @@ -931,7 +974,7 @@ def __validate_apply_changes_from_snapshot(self, onboarding_row, layer): logger.info(f"mandatory missing keys= {missing_mandatory_attr}") raise Exception( f"""mandatory missing atrributes for {layer}_apply_changes_from_snapshot = { - missing_mandatory_attr} + missing_mandatory_attr} for onboarding row = {onboarding_row}""" ) else: @@ -1096,6 +1139,7 @@ def __get_silver_dataflow_spec_dataframe(self, onboarding_df, env): "appendFlows", "appendFlowsSchemas", "clusterBy", + "clusterByAuto", "sinks" ] data_flow_spec_schema = StructType( @@ -1128,6 +1172,7 @@ def __get_silver_dataflow_spec_dataframe(self, onboarding_df, env): StructField("appendFlows", StringType(), True), StructField("appendFlowsSchemas", MapType(StringType(), StringType(), True), True), StructField("clusterBy", ArrayType(StringType(), True), True), + StructField("clusterByAuto", T.BooleanType(), True), StructField("sinks", StringType(), True) ] ) @@ -1219,6 +1264,7 @@ def __get_silver_dataflow_spec_dataframe(self, onboarding_df, env): dlt_sinks = self.get_sink_details(onboarding_row, "silver") silver_cluster_by = self.__get_cluster_by_properties(onboarding_row, silver_table_properties, "silver_cluster_by") + silver_cluster_by_auto = self.__get_cluster_by_auto(onboarding_row, "silver_cluster_by_auto") silver_cdc_apply_changes = None if ( @@ -1284,6 +1330,7 @@ def __get_silver_dataflow_spec_dataframe(self, onboarding_df, env): append_flows, append_flow_schemas, silver_cluster_by, + silver_cluster_by_auto, dlt_sinks ) 
data.append(silver_row) diff --git a/src/pipeline_writers.py b/src/pipeline_writers.py index cb70cb21..ac07ca1e 100644 --- a/src/pipeline_writers.py +++ b/src/pipeline_writers.py @@ -15,7 +15,7 @@ class AppendFlowWriter: """Append Flow Writer class.""" def __init__(self, spark, append_flow, target, struct_schema, table_properties=None, - partition_cols=None, cluster_by=None): + partition_cols=None, cluster_by=None, cluster_by_auto=False): """Init.""" self.spark = spark self.target = target @@ -24,6 +24,7 @@ def __init__(self, spark, append_flow, target, struct_schema, table_properties=N self.table_properties = table_properties self.partition_cols = partition_cols self.cluster_by = cluster_by + self.cluster_by_auto = cluster_by_auto def read_af_view(self): """Write to Delta.""" @@ -32,11 +33,15 @@ def read_af_view(self): def write_flow(self): """Write Append Flow.""" if self.append_flow.create_streaming_table: + # Default cluster_by_auto to False if None + cluster_by_auto = self.cluster_by_auto if self.cluster_by_auto is not None else False + dlt.create_streaming_table( name=self.target, table_properties=self.table_properties, partition_cols=DataflowSpecUtils.get_partition_cols(self.partition_cols), cluster_by=DataflowSpecUtils.get_partition_cols(self.cluster_by), + cluster_by_auto=cluster_by_auto, schema=self.struct_schema, expect_all=None, expect_all_or_drop=None, diff --git a/tests/test_dataflow_pipeline.py b/tests/test_dataflow_pipeline.py index e4cc1e6f..0e57667e 100644 --- a/tests/test_dataflow_pipeline.py +++ b/tests/test_dataflow_pipeline.py @@ -73,6 +73,7 @@ class DataflowPipelineTests(DLTFrameworkTestCase): "updateDate": datetime.now, "updatedBy": "dlt-meta-unittest", "clusterBy": [""], + "clusterByAuto": False, "sinks": [] } @@ -103,13 +104,14 @@ class DataflowPipelineTests(DLTFrameworkTestCase): "quarantineTableProperties": {}, "appendFlows": [], "appendFlowsSchemas": {}, - "sinks": {}, + "sinks": [], "version": "v1", "createDate": datetime.now, "createdBy": "dlt-meta-unittest", "updateDate": datetime.now, "updatedBy": "dlt-meta-unittest", "clusterBy": [""], + "clusterByAuto": False, } silver_cdc_apply_changes = { "keys": ["id"], @@ -170,6 +172,7 @@ class DataflowPipelineTests(DLTFrameworkTestCase): "updateDate": datetime.now, "updatedBy": "dlt-meta-unittest", "clusterBy": [""], + "clusterByAuto": False, } silver_acfs_dataflow_spec_map = { "dataFlowId": "1", @@ -216,6 +219,7 @@ class DataflowPipelineTests(DLTFrameworkTestCase): "updateDate": datetime.now, "updatedBy": "dlt-meta-unittest", "clusterBy": [""], + "clusterByAuto": False, } # @classmethod # def setUp(self): @@ -555,9 +559,9 @@ def test_cdc_apply_changes_scd_type2(self, cdc_apply_changes): with self.assertRaises(Exception): dlt_data_flow.cdc_apply_changes() - @patch('dlt.view', new_callable=MagicMock) - def test_dlt_view_bronze_call(self, mock_view): - mock_view.view.return_value = None + @patch('src.dataflow_pipeline.dlt') + def test_dlt_view_bronze_call(self, mock_dlt): + mock_dlt.view = MagicMock(return_value=None) bronze_dataflow_spec = BronzeDataflowSpec( **DataflowPipelineTests.bronze_dataflow_spec_map ) @@ -566,15 +570,15 @@ def test_dlt_view_bronze_call(self, mock_view): pipeline.read_bronze = MagicMock() pipeline.view_name = view_name pipeline.read() - assert mock_view.called_once_with( + mock_dlt.view.assert_called_once_with( pipeline.read_bronze, name=pipeline.view_name, comment=f"input dataset view for {pipeline.view_name}" ) - @patch('dlt.view', new_callable=MagicMock) - def test_dlt_view_silver_call(self, 
mock_view): - mock_view.view.return_value = None + @patch('src.dataflow_pipeline.dlt') + def test_dlt_view_silver_call(self, mock_dlt): + mock_dlt.view = MagicMock(return_value=None) silver_dataflow_spec = SilverDataflowSpec( **DataflowPipelineTests.silver_dataflow_spec_map ) @@ -583,82 +587,106 @@ def test_dlt_view_silver_call(self, mock_view): pipeline.read_bronze = MagicMock() pipeline.view_name = view_name pipeline.read() - assert mock_view.called_once_with( + mock_dlt.view.assert_called_once_with( pipeline.read_silver, name=pipeline.view_name, comment=f"input dataset view for {pipeline.view_name}" ) - @patch('dlt.table', new_callable=MagicMock) - def test_dlt_write_bronze(self, mock_dlt_table): - mock_dlt_table.table.return_value = None + @patch('src.dataflow_pipeline.dlt') + def test_dlt_write_bronze(self, mock_dlt): + mock_dlt_table = MagicMock(return_value=lambda func: func) + mock_dlt.table = mock_dlt_table bronze_dataflow_spec = BronzeDataflowSpec( **DataflowPipelineTests.bronze_dataflow_spec_map ) - self.spark.conf.set("spark.databricks.unityCatalog.enabled", "True") bronze_dataflow_spec.cdcApplyChanges = None bronze_dataflow_spec.dataQualityExpectations = None view_name = f"{bronze_dataflow_spec.targetDetails['table']}_inputview" - pipeline = DataflowPipeline(self.spark, bronze_dataflow_spec, view_name, None) - pipeline.read_bronze = MagicMock() - pipeline.view_name = view_name - target_path = None - pipeline.write_bronze() - assert mock_dlt_table.called_once_with( - pipeline.write_to_delta, - name=f"{bronze_dataflow_spec.targetDetails['table']}", - partition_cols=DataflowSpecUtils.get_partition_cols(bronze_dataflow_spec.partitionColumns), - table_properties=bronze_dataflow_spec.tableProperties, - path=target_path, - comment=f"bronze dlt table{bronze_dataflow_spec.targetDetails['table']}" - ) + + # Unity Catalog enabled + self.spark.conf.set("spark.databricks.unityCatalog.enabled", "True") + pipeline_uc = DataflowPipeline(self.spark, bronze_dataflow_spec, view_name, None) + pipeline_uc.read_bronze = MagicMock() + pipeline_uc.view_name = view_name + pipeline_uc.write_bronze() + mock_dlt_table.assert_called_once() + args, kwargs = mock_dlt_table.call_args + target_path_uc, target_table_uc, _ = pipeline_uc._get_target_table_info() + expected_comment_uc = pipeline_uc._get_table_comment(target_table_uc, is_bronze=True) + self.assertEqual(args[0], pipeline_uc.write_to_delta) + self.assertEqual(kwargs["name"], target_table_uc) + self.assertIsNone(target_path_uc) + self.assertIsNone(kwargs["path"]) + self.assertEqual(kwargs["comment"], expected_comment_uc) + mock_dlt_table.reset_mock() + + # Unity Catalog disabled self.spark.conf.set("spark.databricks.unityCatalog.enabled", "False") - target_path = bronze_dataflow_spec.targetDetails["path"] - pipeline.write_bronze() - assert mock_dlt_table.called_once_with( - pipeline.write_to_delta, - name=f"{bronze_dataflow_spec.targetDetails['table']}", - partition_cols=DataflowSpecUtils.get_partition_cols(bronze_dataflow_spec.partitionColumns), - table_properties=bronze_dataflow_spec.tableProperties, - path=target_path, - comment=f"bronze dlt table{bronze_dataflow_spec.targetDetails['table']}" + bronze_dataflow_spec_no_uc = BronzeDataflowSpec( + **DataflowPipelineTests.bronze_dataflow_spec_map ) - - @patch('dlt.table', new_callable=MagicMock) - def test_dlt_write_silver(self, mock_dlt_table): + bronze_dataflow_spec_no_uc.cdcApplyChanges = None + bronze_dataflow_spec_no_uc.dataQualityExpectations = None + pipeline_no_uc = 
DataflowPipeline(self.spark, bronze_dataflow_spec_no_uc, view_name, None) + pipeline_no_uc.read_bronze = MagicMock() + pipeline_no_uc.view_name = view_name + target_path_no_uc, target_table_no_uc, _ = pipeline_no_uc._get_target_table_info() + expected_comment_no_uc = pipeline_no_uc._get_table_comment(target_table_no_uc, is_bronze=True) + pipeline_no_uc.write_bronze() + mock_dlt_table.assert_called_once() + args, kwargs = mock_dlt_table.call_args + self.assertEqual(args[0], pipeline_no_uc.write_to_delta) + self.assertEqual(kwargs["name"], target_table_no_uc) + self.assertEqual(kwargs["path"], target_path_no_uc) + self.assertEqual(kwargs["comment"], expected_comment_no_uc) + + @patch('src.dataflow_pipeline.dlt') + def test_dlt_write_silver(self, mock_dlt): + mock_dlt_table = MagicMock(return_value=lambda func: func) + mock_dlt.table = mock_dlt_table DataflowPipeline.get_silver_schema = MagicMock - mock_dlt_table.table.return_value = None - # Arrange silver_dataflow_spec = SilverDataflowSpec( **DataflowPipelineTests.silver_dataflow_spec_map ) - self.spark.conf.set("spark.databricks.unityCatalog.enabled", "True") - view_name = f"{silver_dataflow_spec.targetDetails['table']}_inputview" - pipeline = DataflowPipeline(self.spark, silver_dataflow_spec, view_name, None) - pipeline.read_bronze = MagicMock() - pipeline.view_name = view_name - target_path = None silver_dataflow_spec.cdcApplyChanges = None - pipeline.write_silver() - assert mock_dlt_table.called_once_with( - pipeline.write_to_delta, - name=f"{silver_dataflow_spec.targetDetails['table']}", - partition_cols=DataflowSpecUtils.get_partition_cols(silver_dataflow_spec.partitionColumns), - table_properties=silver_dataflow_spec.tableProperties, - path=target_path, - comment=f"silver dlt table{silver_dataflow_spec.targetDetails['table']}" - ) + view_name = f"{silver_dataflow_spec.targetDetails['table']}_inputview" + + # Unity Catalog enabled + self.spark.conf.set("spark.databricks.unityCatalog.enabled", "True") + pipeline_uc = DataflowPipeline(self.spark, silver_dataflow_spec, view_name, None) + pipeline_uc.read_bronze = MagicMock() + pipeline_uc.view_name = view_name + pipeline_uc.write_silver() + mock_dlt_table.assert_called_once() + args, kwargs = mock_dlt_table.call_args + target_path_uc, target_table_uc, target_table_name_uc = pipeline_uc._get_target_table_info() + expected_comment_uc = pipeline_uc._get_table_comment(target_table_uc, is_bronze=False) + self.assertEqual(args[0], pipeline_uc.write_to_delta) + self.assertEqual(kwargs["name"], target_table_name_uc) + self.assertIsNone(target_path_uc) + self.assertIsNone(kwargs["path"]) + self.assertEqual(kwargs["comment"], expected_comment_uc) + mock_dlt_table.reset_mock() + + # Unity Catalog disabled self.spark.conf.set("spark.databricks.unityCatalog.enabled", "False") - target_path = silver_dataflow_spec.targetDetails["path"] - pipeline.write_silver() - assert mock_dlt_table.called_once_with( - pipeline.write_to_delta, - name=f"{silver_dataflow_spec.targetDetails['table']}", - partition_cols=DataflowSpecUtils.get_partition_cols(silver_dataflow_spec.partitionColumns), - table_properties=silver_dataflow_spec.tableProperties, - path=target_path, - comment=f"silver dlt table{silver_dataflow_spec.targetDetails['table']}" + silver_dataflow_spec_no_uc = SilverDataflowSpec( + **DataflowPipelineTests.silver_dataflow_spec_map ) + silver_dataflow_spec_no_uc.cdcApplyChanges = None + pipeline_no_uc = DataflowPipeline(self.spark, silver_dataflow_spec_no_uc, view_name, None) + pipeline_no_uc.read_bronze = 
MagicMock() + pipeline_no_uc.view_name = view_name + target_path_no_uc, target_table_no_uc, target_table_name_no_uc = pipeline_no_uc._get_target_table_info() + expected_comment_no_uc = pipeline_no_uc._get_table_comment(target_table_no_uc, is_bronze=False) + pipeline_no_uc.write_silver() + mock_dlt_table.assert_called_once() + args, kwargs = mock_dlt_table.call_args + self.assertEqual(args[0], pipeline_no_uc.write_to_delta) + self.assertEqual(kwargs["name"], target_table_name_no_uc) + self.assertEqual(kwargs["path"], target_path_no_uc) + self.assertEqual(kwargs["comment"], expected_comment_no_uc) @patch.object(DataflowPipeline, 'write_silver', new_callable=MagicMock) def test_dataflowpipeline_silver_write(self, mock_dfp): @@ -746,19 +774,16 @@ def test_dataflow_pipeline_read_bronze_kafka(self, mock_read_kafka): def read_dataflowspec(self, database, table): return self.spark.read.table(f"{database}.{table}") - @patch('dlt.table', new_callable=MagicMock) - @patch('dlt.expect_all', new_callable=MagicMock) - @patch('dlt.expect_all_or_fail', new_callable=MagicMock) - @patch('dlt.expect_all_or_drop', new_callable=MagicMock) - def test_dataflowpipeline_bronze_dqe(self, - mock_dlt_table, - mock_dlt_expect_all, - mock_dlt_expect_all_or_fail, - mock_dlt_expect_all_or_drop): - mock_dlt_table.return_value = lambda func: func - mock_dlt_expect_all.return_value = lambda func: func - mock_dlt_expect_all_or_fail.return_value = lambda func: func - mock_dlt_expect_all_or_drop.return_value = lambda func: func + @patch('src.dataflow_pipeline.dlt') + def test_dataflowpipeline_bronze_dqe(self, mock_dlt): + mock_dlt_table = MagicMock(return_value=lambda func: func) + mock_expect_all = MagicMock(return_value=lambda func: func) + mock_expect_all_or_fail = MagicMock(return_value=lambda func: func) + mock_expect_all_or_drop = MagicMock(return_value=lambda func: func) + mock_dlt.table = mock_dlt_table + mock_dlt.expect_all = mock_expect_all + mock_dlt.expect_all_or_fail = mock_expect_all_or_fail + mock_dlt.expect_all_or_drop = mock_expect_all_or_drop onboarding_params_map = copy.deepcopy(self.onboarding_bronze_silver_params_map) onboarding_params_map['onboarding_file_path'] = self.onboarding_type2_json_file del onboarding_params_map["silver_dataflowspec_table"] @@ -784,36 +809,41 @@ def test_dataflowpipeline_bronze_dqe(self, expect_or_drop_dict = data_quality_expectations_json["expect_or_drop"] if "expect_or_quarantine" in data_quality_expectations_json: expect_or_quarantine_dict = data_quality_expectations_json["expect_or_quarantine"] - target_path = bronze_dataflow_spec.targetDetails["path"] - ddlSchemaStr = self.spark.read.text(paths="tests/resources/schema/products.ddl", - wholetext=True).collect()[0]["value"] - struct_schema = T._parse_datatype_string(ddlSchemaStr) pipeline.write_bronze() - assert mock_dlt_table.called_once_with( - name=f"{bronze_dataflowSpec_df.targetDetails['table']}", - table_properties=bronze_dataflowSpec_df.tableProperties, - partition_cols=DataflowSpecUtils.get_partition_cols(bronze_dataflow_spec.partitionColumns), - path=target_path, - schema=struct_schema, - comment=f"bronze dlt table{bronze_dataflow_spec.targetDetails['table']}", + self.assertGreaterEqual(mock_dlt_table.call_count, 1) + _, kwargs = mock_dlt_table.call_args_list[0] + target_path_actual, target_table, target_table_name = pipeline._get_target_table_info() + expected_comment = pipeline._get_table_comment(target_table, is_bronze=True) + self.assertEqual(kwargs["name"], target_table_name) + expected_table_properties = ( + 
dict(bronze_dataflow_spec.tableProperties) + if bronze_dataflow_spec.tableProperties + else {} ) - assert mock_dlt_expect_all_or_drop.called_once_with(expect_or_drop_dict) - assert mock_dlt_expect_all_or_fail.called_once_with(expect_or_fail_dict) - assert mock_dlt_expect_all.called_once_with(expect_dict) - assert mock_dlt_expect_all_or_drop.expect_all_or_drop(expect_or_quarantine_dict) + self.assertEqual(kwargs["table_properties"], expected_table_properties) + self.assertEqual( + kwargs["partition_cols"], + DataflowSpecUtils.get_partition_cols(bronze_dataflow_spec.partitionColumns) + ) + self.assertEqual(kwargs["path"], target_path_actual) + self.assertEqual(kwargs["comment"], expected_comment) + self.assertGreaterEqual(mock_expect_all_or_drop.call_count, 1) + first_call_args, _ = mock_expect_all_or_drop.call_args_list[0] + self.assertEqual(first_call_args[0], expect_or_drop_dict) + mock_expect_all_or_fail.assert_called_once_with(expect_or_fail_dict) + mock_expect_all.assert_called_once_with(expect_dict) + assert mock_expect_all_or_drop.expect_all_or_drop(expect_or_quarantine_dict) - @patch.object(DataflowPipeline, "create_streaming_table", new_callable=MagicMock) - @patch('dlt.create_streaming_live_table', new_callable=MagicMock) - @patch('dlt.create_auto_cdc_flow', new_callable=MagicMock) @patch.object(DataflowPipeline, 'get_silver_schema', new_callable=MagicMock) + @patch('src.dataflow_pipeline.dlt') + @patch.object(DataflowPipeline, "create_streaming_table", new_callable=MagicMock) def test_dataflowpipeline_silver_cdc_apply_changes(self, mock_create_streaming_table, - mock_create_streaming_live_table, - mock_create_auto_cdc_flow, + mock_dlt, mock_get_silver_schema): mock_create_streaming_table.return_value = None - mock_create_streaming_live_table.return_value = None - mock_create_auto_cdc_flow.create_auto_cdc_flow.return_value = None + mock_create_auto_cdc_flow = MagicMock(return_value=None) + mock_dlt.create_auto_cdc_flow = mock_create_auto_cdc_flow onboarding_params_map = copy.deepcopy(self.onboarding_bronze_silver_params_map) onboarding_params_map['onboarding_file_path'] = self.onboarding_type2_json_file del onboarding_params_map["bronze_dataflowspec_table"] @@ -857,45 +887,52 @@ def test_dataflowpipeline_silver_cdc_apply_changes(self, mock_get_silver_schema.return_value = json.dumps(struct_schema.jsonValue()) pipeline.silver_schema = struct_schema pipeline.write_silver() - assert mock_create_streaming_table.called_once_with( - schema=struct_schema, - name=f"{silver_dataflowSpec_df.targetDetails['table']}" - ) - assert mock_create_streaming_live_table.called_once_with( - name=f"{silver_dataflowSpec_df.targetDetails['table']}", - table_properties=silver_dataflowSpec_df.tableProperties, - path=target_path, - schema=struct_schema, - expect_all=expect_dict, - expect_all_or_drop=expect_or_drop_dict, - expect_all_or_fail=expect_or_fail_dict + mock_create_streaming_table.assert_called_once_with(None, target_path) + mock_create_auto_cdc_flow.assert_called_once() + _, kwargs = mock_create_auto_cdc_flow.call_args + target_database = silver_dataflow_spec.targetDetails['database'] + target_table_name = silver_dataflow_spec.targetDetails['table'] + expected_target = f"{target_database}.{target_table_name}" + self.assertEqual(kwargs["target"], expected_target) + self.assertEqual(kwargs["source"], view_name) + self.assertEqual(kwargs["keys"], cdc_apply_changes.keys) + self.assertEqual(kwargs["sequence_by"], cdc_apply_changes.sequence_by) + + def assert_column_equals(actual, expected): + if 
expected is None: + self.assertIsNone(actual) + else: + self.assertIsNotNone(actual) + self.assertEqual(str(actual), str(expected)) + + assert_column_equals(kwargs["apply_as_deletes"], apply_as_deletes) + assert_column_equals(kwargs["apply_as_truncates"], apply_as_truncates) + self.assertEqual(kwargs["where"], cdc_apply_changes.where) + self.assertEqual(kwargs["ignore_null_updates"], cdc_apply_changes.ignore_null_updates) + self.assertEqual(kwargs["column_list"], cdc_apply_changes.column_list) + self.assertEqual(kwargs["except_column_list"], cdc_apply_changes.except_column_list) + self.assertEqual(kwargs["stored_as_scd_type"], cdc_apply_changes.scd_type) + self.assertEqual(kwargs["track_history_column_list"], cdc_apply_changes.track_history_column_list) + self.assertEqual(kwargs["track_history_except_column_list"], cdc_apply_changes.track_history_except_column_list) + self.assertEqual(kwargs["flow_name"], cdc_apply_changes.flow_name) + self.assertEqual(kwargs["once"], cdc_apply_changes.once) + self.assertEqual( + kwargs["ignore_null_updates_column_list"], + cdc_apply_changes.ignore_null_updates_column_list ) - assert mock_create_auto_cdc_flow.called_once_with( - name=f"{silver_dataflowSpec_df.targetDetails['table']}", - source=view_name, - keys=cdc_apply_changes.keys, - sequence_by=cdc_apply_changes.sequence_by, - where=cdc_apply_changes.where, - ignore_null_updates=cdc_apply_changes.ignore_null_updates, - apply_as_deletes=apply_as_deletes, - apply_as_truncates=apply_as_truncates, - column_list=cdc_apply_changes.column_list, - except_column_list=cdc_apply_changes.except_column_list, - stored_as_scd_type=cdc_apply_changes.scd_type, - track_history_column_list=cdc_apply_changes.track_history_column_list, - track_history_except_column_list=cdc_apply_changes.track_history_except_column_list + self.assertEqual( + kwargs["ignore_null_updates_except_column_list"], + cdc_apply_changes.ignore_null_updates_except_column_list ) + @patch('src.dataflow_pipeline.dlt') @patch.object(DataflowPipeline, "create_streaming_table", new_callable=MagicMock) - @patch('dlt.create_streaming_live_table', new_callable=MagicMock) - @patch('dlt.create_auto_cdc_flow', new_callable=MagicMock) def test_bronze_cdc_apply_changes(self, mock_create_streaming_table, - mock_create_streaming_live_table, - mock_create_auto_cdc_flow): + mock_dlt): mock_create_streaming_table.return_value = None - mock_create_auto_cdc_flow.create_auto_cdc_flow.return_value = None - mock_create_streaming_live_table.return_value = None + mock_create_auto_cdc_flow = MagicMock(return_value=None) + mock_dlt.create_auto_cdc_flow = mock_create_auto_cdc_flow onboarding_params_map = copy.deepcopy(self.onboarding_bronze_silver_params_map) onboarding_params_map['onboarding_file_path'] = self.onboarding_bronze_type2_json_file o_dfs = OnboardDataflowspec(self.spark, onboarding_params_map) @@ -915,47 +952,40 @@ def test_bronze_cdc_apply_changes(self, apply_as_truncates = None if cdc_apply_changes.apply_as_truncates: apply_as_truncates = expr(cdc_apply_changes.apply_as_truncates) - struct_schema = json.loads(bronze_dataflow_spec.schema) + expected_schema = pipeline.modify_schema_for_cdc_changes(cdc_apply_changes) pipeline.write_bronze() - assert mock_create_streaming_table.called_once_with( - schema=struct_schema, - name=f"{bronze_dataflowSpec_df.targetDetails['table']}" - ) - assert mock_create_streaming_live_table.called_once_with( - name=f"{bronze_dataflowSpec_df.targetDetails['table']}", - table_properties=bronze_dataflowSpec_df.tableProperties, - 
path=bronze_dataflowSpec_df.targetDetails["path"], - schema=struct_schema, - expect_all=None, - expect_all_or_drop=None, - expect_all_or_fail=None - ) - assert mock_create_auto_cdc_flow.called_once_with( - name=f"{bronze_dataflowSpec_df.targetDetails['table']}", - source=view_name, - keys=cdc_apply_changes.keys, - sequence_by=cdc_apply_changes.sequence_by, - where=cdc_apply_changes.where, - ignore_null_updates=cdc_apply_changes.ignore_null_updates, - apply_as_deletes=apply_as_deletes, - apply_as_truncates=apply_as_truncates, - column_list=cdc_apply_changes.column_list, - except_column_list=cdc_apply_changes.except_column_list, - stored_as_scd_type=cdc_apply_changes.scd_type, - track_history_column_list=cdc_apply_changes.track_history_column_list, - track_history_except_column_list=cdc_apply_changes.track_history_except_column_list - ) - + mock_create_streaming_table.assert_called_once() + args, kwargs = mock_create_streaming_table.call_args + self.assertEqual(args[0], expected_schema) + self.assertEqual(args[1], bronze_dataflow_spec.targetDetails["path"]) + mock_create_auto_cdc_flow.assert_called_once() + _, kwargs = mock_create_auto_cdc_flow.call_args + target_database = bronze_dataflow_spec.targetDetails['database'] + target_table_name = bronze_dataflow_spec.targetDetails['table'] + expected_target = f"{target_database}.{target_table_name}" + self.assertEqual(kwargs["target"], expected_target) + self.assertEqual(kwargs["source"], view_name) + self.assertEqual(kwargs["keys"], cdc_apply_changes.keys) + self.assertEqual(kwargs["sequence_by"], cdc_apply_changes.sequence_by) + + def assert_column_equals(actual, expected): + if expected is None: + self.assertIsNone(actual) + else: + self.assertIsNotNone(actual) + self.assertEqual(str(actual), str(expected)) + + assert_column_equals(kwargs["apply_as_deletes"], apply_as_deletes) + assert_column_equals(kwargs["apply_as_truncates"], apply_as_truncates) + + @patch('src.dataflow_pipeline.dlt') @patch.object(DataflowPipeline, "create_streaming_table", new_callable=MagicMock) - @patch('dlt.create_streaming_live_table', new_callable=MagicMock) - @patch('dlt.create_auto_cdc_flow', new_callable=MagicMock) def test_bronze_cdc_apply_changes_v7(self, mock_create_streaming_table, - mock_create_streaming_live_table, - mock_create_auto_cdc_flow): + mock_dlt): mock_create_streaming_table.return_value = None - mock_create_auto_cdc_flow.create_auto_cdc_flow.return_value = None - mock_create_streaming_live_table.return_value = None + mock_create_auto_cdc_flow = MagicMock(return_value=None) + mock_dlt.create_auto_cdc_flow = mock_create_auto_cdc_flow onboarding_params_map = copy.deepcopy(self.onboarding_bronze_silver_params_map) onboarding_params_map['onboarding_file_path'] = self.onboarding_json_v7_file o_dfs = OnboardDataflowspec(self.spark, onboarding_params_map) @@ -979,54 +1009,70 @@ def test_bronze_cdc_apply_changes_v7(self, apply_as_truncates = None if cdc_apply_changes.apply_as_truncates: apply_as_truncates = expr(cdc_apply_changes.apply_as_truncates) - struct_schema = json.loads(bronze_dataflow_spec.schema) + expected_schema = pipeline.modify_schema_for_cdc_changes(cdc_apply_changes) pipeline.write_bronze() - assert mock_create_streaming_table.called_once_with( - schema=struct_schema, - name=f"{bronze_dataflowSpec_df.targetDetails['table']}" - ) - assert mock_create_streaming_live_table.called_once_with( - name=f"{bronze_dataflowSpec_df.targetDetails['table']}", - table_properties=bronze_dataflowSpec_df.tableProperties, - 
path=bronze_dataflowSpec_df.targetDetails["path"], - schema=struct_schema, - expect_all=None, - expect_all_or_drop=None, - expect_all_or_fail=None + mock_create_streaming_table.assert_called_once() + args, kwargs = mock_create_streaming_table.call_args + self.assertEqual(args[0], expected_schema) + self.assertEqual(args[1], bronze_dataflow_spec.targetDetails["path"]) + mock_create_auto_cdc_flow.assert_called_once() + _, kwargs = mock_create_auto_cdc_flow.call_args + target_database = bronze_dataflow_spec.targetDetails['database'] + target_table_name = bronze_dataflow_spec.targetDetails['table'] + expected_target = f"{target_database}.{target_table_name}" + self.assertEqual(kwargs["target"], expected_target) + self.assertEqual(kwargs["source"], view_name) + self.assertEqual(kwargs["keys"], cdc_apply_changes.keys) + self.assertEqual(kwargs["sequence_by"], cdc_apply_changes.sequence_by) + + def assert_column_equals(actual, expected): + if expected is None: + self.assertIsNone(actual) + else: + self.assertIsNotNone(actual) + self.assertEqual(str(actual), str(expected)) + + assert_column_equals(kwargs["apply_as_deletes"], apply_as_deletes) + assert_column_equals(kwargs["apply_as_truncates"], apply_as_truncates) + self.assertEqual(kwargs["where"], cdc_apply_changes.where) + self.assertEqual(kwargs["ignore_null_updates"], cdc_apply_changes.ignore_null_updates) + self.assertEqual(kwargs["column_list"], cdc_apply_changes.column_list) + self.assertEqual(kwargs["except_column_list"], cdc_apply_changes.except_column_list) + self.assertEqual(kwargs["stored_as_scd_type"], cdc_apply_changes.scd_type) + self.assertEqual(kwargs["track_history_column_list"], cdc_apply_changes.track_history_column_list) + self.assertEqual(kwargs["track_history_except_column_list"], cdc_apply_changes.track_history_except_column_list) + self.assertEqual(kwargs["flow_name"], cdc_apply_changes.flow_name) + self.assertEqual(kwargs["once"], cdc_apply_changes.once) + self.assertEqual( + kwargs["ignore_null_updates_column_list"], + cdc_apply_changes.ignore_null_updates_column_list ) - assert mock_create_auto_cdc_flow.called_once_with( - name=f"{bronze_dataflowSpec_df.targetDetails['table']}", - source=view_name, - keys=cdc_apply_changes.keys, - sequence_by=cdc_apply_changes.sequence_by, - where=cdc_apply_changes.where, - ignore_null_updates=cdc_apply_changes.ignore_null_updates, - apply_as_deletes=apply_as_deletes, - apply_as_truncates=apply_as_truncates, - column_list=cdc_apply_changes.column_list, - except_column_list=cdc_apply_changes.except_column_list, - stored_as_scd_type=cdc_apply_changes.scd_type, - track_history_column_list=cdc_apply_changes.track_history_column_list, - track_history_except_column_list=cdc_apply_changes.track_history_except_column_list + self.assertEqual( + kwargs["ignore_null_updates_except_column_list"], + cdc_apply_changes.ignore_null_updates_except_column_list ) @patch.object(DataflowPipeline, "create_streaming_table", new_callable=MagicMock) @patch.object(DataflowPipeline, "write_to_delta", new_callable=MagicMock) - @patch('dlt.create_streaming_live_table', new_callable=MagicMock) - @patch('dlt.append_flow', new_callable=MagicMock) - @patch('dlt.read_stream', new_callable=MagicMock) + @patch('src.pipeline_writers.dlt') + @patch('src.dataflow_pipeline.dlt') def test_bronze_append_flow_positive(self, - mock_read_stream, - mock_append_flow, - mock_create_streaming_live_table, + mock_dlt, + mock_pipeline_writers_dlt, mock_write_to_delta, mock_create_streaming_table, ): 
mock_create_streaming_table.return_value = None mock_write_to_delta.return_value = None - mock_create_streaming_live_table.return_value = None - mock_append_flow.return_value = lambda func: func - mock_read_stream.return_value = None + mock_dlt_create_streaming_table = MagicMock(return_value=None) + mock_append_flow = MagicMock(return_value=lambda func: func) + mock_read_stream = MagicMock(return_value=None) + mock_dlt.create_streaming_table = mock_dlt_create_streaming_table + mock_dlt.append_flow = mock_append_flow + mock_dlt.read_stream = mock_read_stream + # Also set up mocks for pipeline_writers.dlt + mock_pipeline_writers_dlt.append_flow = mock_append_flow + mock_pipeline_writers_dlt.read_stream = mock_read_stream onboarding_params_map = copy.deepcopy(self.onboarding_bronze_silver_params_map) onboarding_params_map['onboarding_file_path'] = self.onboarding_append_flow_json_file o_dfs = OnboardDataflowspec(self.spark, onboarding_params_map) @@ -1041,28 +1087,27 @@ def test_bronze_append_flow_positive(self, struct_schema = json.loads(bronze_dataflow_spec.schema) append_flows = DataflowSpecUtils.get_append_flows(bronze_dataflow_spec.appendFlows) pipeline.write_bronze() - for append_flow in append_flows: - assert mock_create_streaming_table.called_once_with( - schema=struct_schema, - name=f"{bronze_dataflowSpec_df.targetDetails['table']}" - ) - assert mock_create_streaming_live_table.called_once_with( - name=f"{bronze_dataflowSpec_df.targetDetails['table']}", - table_properties=bronze_dataflowSpec_df.tableProperties, - path=bronze_dataflowSpec_df.targetDetails["path"], - schema=struct_schema, - expect_all=None, - expect_all_or_drop=None, - expect_all_or_fail=None - ) - target_table = bronze_dataflow_spec.targetDetails["table"] - assert mock_append_flow.called_once_with( - name=append_flow.name, - target=target_table, - comment=f"append_flow={append_flow.name} for target={target_table}", - spark_conf=append_flow.spark_conf, - once=append_flow.once - )(mock_write_to_delta.called_once()) + target_table = bronze_dataflow_spec.targetDetails["table"] + flows_with_streaming = [flow for flow in append_flows if flow.create_streaming_table] + self.assertEqual(mock_dlt_create_streaming_table.call_count, len(flows_with_streaming)) + for call, append_flow in zip(mock_dlt_create_streaming_table.call_args_list, flows_with_streaming): + _, kwargs = call + self.assertEqual(kwargs["name"], target_table) + self.assertEqual(kwargs["table_properties"], bronze_dataflow_spec.tableProperties) + self.assertEqual(kwargs["path"], bronze_dataflow_spec.targetDetails["path"]) + self.assertEqual(kwargs["schema"], struct_schema) + self.assertIsNone(kwargs["expect_all"]) + self.assertIsNone(kwargs["expect_all_or_drop"]) + self.assertIsNone(kwargs["expect_all_or_fail"]) + self.assertEqual(mock_append_flow.call_count, len(append_flows)) + for call, append_flow in zip(mock_append_flow.call_args_list, append_flows): + _, kwargs = call + self.assertEqual(kwargs["name"], append_flow.name) + self.assertEqual(kwargs["target"], target_table) + self.assertEqual(kwargs["comment"], f"append_flow={append_flow.name} for target={target_table}") + expected_spark_conf = append_flow.spark_conf if append_flow.spark_conf else {} + self.assertEqual(kwargs["spark_conf"], expected_spark_conf) + self.assertEqual(kwargs["once"], append_flow.once) def test_get_dq_expectations(self): o_dfs = OnboardDataflowspec(self.spark, self.onboarding_bronze_silver_params_map) @@ -1079,9 +1124,9 @@ def test_get_dq_expectations(self): 
self.assertIsNone(expect_all_or_fail_dict) self.assertIsNone(expect_all_dict) - @patch('dlt.view', new_callable=MagicMock) - def test_read_append_flows(self, mock_view): - mock_view.view.return_value = None + @patch('src.dataflow_pipeline.dlt') + def test_read_append_flows(self, mock_dlt): + mock_dlt.view = MagicMock(return_value=None) onboarding_params_map = copy.deepcopy(self.onboarding_bronze_silver_params_map) onboarding_params_map['onboarding_file_path'] = self.onboarding_append_flow_json_file o_dfs = OnboardDataflowspec(self.spark, onboarding_params_map) @@ -1095,16 +1140,15 @@ def test_read_append_flows(self, mock_view): pipeline = DataflowPipeline(self.spark, silver_dataflow_spec, view_name, None) pipeline.read_append_flows() append_flow = DataflowSpecUtils.get_append_flows(silver_dataflow_spec.appendFlows)[0] - pipeline_reader = PipelineReaders( - self.spark, - append_flow.source_format, - append_flow.source_details, - append_flow.reader_options - ) - assert mock_view.called_once_with( - pipeline_reader.read_dlt_cloud_files, - name=f"{append_flow.name}_view", - comment=f"append flow input dataset view for{append_flow.name}_view") + + # Check if mock was called before unpacking + self.assertIsNotNone(mock_dlt.view.call_args, "mock_view was not called") + called_args, called_kwargs = mock_dlt.view.call_args + read_callable = called_args[0] + self.assertEqual(read_callable.__name__, "read_dlt_cloud_files") + self.assertEqual(called_kwargs["name"], f"{append_flow.name}_view") + self.assertEqual(called_kwargs["comment"], f"append flow input dataset view for {append_flow.name}_view") + mock_dlt.view.reset_mock() bronze_df_row = bronze_dataflowSpec_df.filter(bronze_dataflowSpec_df.dataFlowId == "103").collect()[0] bronze_dataflow_spec = BronzeDataflowSpec(**bronze_df_row.asDict()) @@ -1112,16 +1156,14 @@ def test_read_append_flows(self, mock_view): pipeline = DataflowPipeline(self.spark, bronze_dataflow_spec, view_name, None) pipeline.read_append_flows() append_flow = DataflowSpecUtils.get_append_flows(bronze_dataflow_spec.appendFlows)[0] - pipeline_reader = PipelineReaders( - self.spark, - append_flow.source_format, - append_flow.source_details, - append_flow.reader_options - ) - assert mock_view.called_once_with( - pipeline_reader.read_kafka, - name=f"{append_flow.name}_view", - comment=f"append flow input dataset view for{append_flow.name}_view") + + self.assertIsNotNone(mock_dlt.view.call_args, "mock_view was not called for dataFlowId 103") + called_args, called_kwargs = mock_dlt.view.call_args + read_callable = called_args[0] + self.assertEqual(read_callable.__name__, "read_kafka") + self.assertEqual(called_kwargs["name"], f"{append_flow.name}_view") + self.assertEqual(called_kwargs["comment"], f"append flow input dataset view for {append_flow.name}_view") + mock_dlt.view.reset_mock() silver_dataflowSpec_df = self.spark.read.format("delta").load( self.onboarding_bronze_silver_params_map['silver_dataflowspec_path'] @@ -1132,16 +1174,13 @@ def test_read_append_flows(self, mock_view): pipeline = DataflowPipeline(self.spark, silver_dataflow_spec, view_name, None) pipeline.read_append_flows() append_flow = DataflowSpecUtils.get_append_flows(silver_dataflow_spec.appendFlows)[0] - pipeline_reader = PipelineReaders( - self.spark, - append_flow.source_format, - append_flow.source_details, - append_flow.reader_options - ) - assert mock_view.called_once_with( - pipeline_reader.read_dlt_delta, - name=f"{append_flow.name}_view", - comment=f"append flow input dataset view 
for{append_flow.name}_view") + + self.assertIsNotNone(mock_dlt.view.call_args, "mock_view was not called for dataFlowId 101") + called_args, called_kwargs = mock_dlt.view.call_args + read_callable = called_args[0] + self.assertEqual(read_callable.__name__, "read_dlt_delta") + self.assertEqual(called_kwargs["name"], f"{append_flow.name}_view") + self.assertEqual(called_kwargs["comment"], f"append flow input dataset view for {append_flow.name}_view") bronze_dataflowSpec_df.appendFlows = None with self.assertRaises(Exception): pipeline = DataflowPipeline(self.spark, bronze_dataflowSpec_df, view_name, None) @@ -1167,9 +1206,11 @@ def test_get_dq_expectations_with_expect_all(self): self.assertIsNotNone(expect_all_or_drop_dict) self.assertIsNotNone(expect_all_or_fail_dict) - @patch('dlt.table', new_callable=MagicMock) - def test_modify_schema_for_cdc_changes(self, mock_dlt_table): + @patch('src.dataflow_pipeline.dlt') + def test_modify_schema_for_cdc_changes(self, mock_dlt): + mock_dlt_table = MagicMock() mock_dlt_table.table.return_value = None + mock_dlt.table = mock_dlt_table cdc_apply_changes_json = """{ "keys": ["id"], "sequence_by": "operation_date", @@ -1609,9 +1650,11 @@ def test_snapshot_format_exception_without_reader_function(self): pipeline.run_dlt() self.assertEqual(str(context.exception), "Snapshot reader function not provided!") - @patch('dlt.view') - def test_is_create_view_with_delta_snapshot_format(self, mock_dlt_view): + @patch('src.dataflow_pipeline.dlt') + def test_is_create_view_with_delta_snapshot_format(self, mock_dlt): """Test is_create_view with delta snapshot format.""" + mock_dlt_view = MagicMock() + mock_dlt.view = mock_dlt_view bronze_spec_map = copy.deepcopy(DataflowPipelineTests.bronze_dataflow_spec_map) bronze_spec_map["sourceDetails"] = {"snapshot_format": "delta"} bronze_dataflow_spec = BronzeDataflowSpec(**bronze_spec_map) @@ -1779,3 +1822,510 @@ def test_apply_transformations_with_select_and_where(self): mock_df.selectExpr.assert_called_once_with(*select_exp) mock_df.where.assert_called_once_with("id > 0") + + def test_cluster_by_auto_string_to_boolean_conversion_true(self): + """Test cluster_by_auto string 'true' conversion to boolean True.""" + bronze_spec_map = copy.deepcopy(DataflowPipelineTests.bronze_dataflow_spec_map) + bronze_spec_map["quarantineTargetDetails"] = { + "database": "bronze", + "table": "customer_dqe", + "path": "tests/localtest/delta/customers_dqe", + "cluster_by_auto": "true" + } + bronze_dataflow_spec = BronzeDataflowSpec(**bronze_spec_map) + pipeline = DataflowPipeline(self.spark, bronze_dataflow_spec, "test_view") + + quarantine_details = pipeline._get_quarantine_target_details() + q_cluster_by_auto_value = quarantine_details.get("cluster_by_auto", False) + + # Simulate the conversion logic from dataflow_pipeline.py + if isinstance(q_cluster_by_auto_value, str): + q_cluster_by_auto = q_cluster_by_auto_value.lower().strip() == 'true' + else: + q_cluster_by_auto = bool(q_cluster_by_auto_value) if q_cluster_by_auto_value else False + + self.assertEqual(q_cluster_by_auto, True) + self.assertIsInstance(q_cluster_by_auto, bool) + + def test_cluster_by_auto_string_to_boolean_conversion_false(self): + """Test cluster_by_auto string 'false' conversion to boolean False.""" + bronze_spec_map = copy.deepcopy(DataflowPipelineTests.bronze_dataflow_spec_map) + bronze_spec_map["quarantineTargetDetails"] = { + "database": "bronze", + "table": "customer_dqe", + "path": "tests/localtest/delta/customers_dqe", + "cluster_by_auto": "false" + } + 
        bronze_dataflow_spec = BronzeDataflowSpec(**bronze_spec_map)
+        pipeline = DataflowPipeline(self.spark, bronze_dataflow_spec, "test_view")
+
+        quarantine_details = pipeline._get_quarantine_target_details()
+        q_cluster_by_auto_value = quarantine_details.get("cluster_by_auto", False)
+
+        # Simulate the conversion logic from dataflow_pipeline.py
+        if isinstance(q_cluster_by_auto_value, str):
+            q_cluster_by_auto = q_cluster_by_auto_value.lower().strip() == 'true'
+        else:
+            q_cluster_by_auto = bool(q_cluster_by_auto_value) if q_cluster_by_auto_value else False
+
+        self.assertEqual(q_cluster_by_auto, False)
+        self.assertIsInstance(q_cluster_by_auto, bool)
+
+    def test_cluster_by_auto_string_case_insensitive(self):
+        """Test cluster_by_auto case insensitive string conversion."""
+        test_values = ["True", "TRUE", "TrUe"]
+        for value in test_values:
+            bronze_spec_map = copy.deepcopy(DataflowPipelineTests.bronze_dataflow_spec_map)
+            bronze_spec_map["quarantineTargetDetails"] = {
+                "database": "bronze",
+                "table": "customer_dqe",
+                "cluster_by_auto": value
+            }
+            bronze_dataflow_spec = BronzeDataflowSpec(**bronze_spec_map)
+            pipeline = DataflowPipeline(self.spark, bronze_dataflow_spec, "test_view")
+
+            quarantine_details = pipeline._get_quarantine_target_details()
+            q_cluster_by_auto_value = quarantine_details.get("cluster_by_auto", False)
+
+            if isinstance(q_cluster_by_auto_value, str):
+                q_cluster_by_auto = q_cluster_by_auto_value.lower().strip() == 'true'
+            else:
+                q_cluster_by_auto = bool(q_cluster_by_auto_value) if q_cluster_by_auto_value else False
+
+            self.assertEqual(q_cluster_by_auto, True, f"Failed for value: {value}")
+
+    def test_cluster_by_auto_boolean_true(self):
+        """Test cluster_by_auto with boolean True value."""
+        bronze_spec_map = copy.deepcopy(DataflowPipelineTests.bronze_dataflow_spec_map)
+        bronze_spec_map["quarantineTargetDetails"] = {
+            "database": "bronze",
+            "table": "customer_dqe",
+            "cluster_by_auto": True
+        }
+        bronze_dataflow_spec = BronzeDataflowSpec(**bronze_spec_map)
+        pipeline = DataflowPipeline(self.spark, bronze_dataflow_spec, "test_view")
+
+        quarantine_details = pipeline._get_quarantine_target_details()
+        q_cluster_by_auto_value = quarantine_details.get("cluster_by_auto", False)
+
+        # Simulate the conversion logic from dataflow_pipeline.py
+        if isinstance(q_cluster_by_auto_value, str):
+            q_cluster_by_auto = q_cluster_by_auto_value.lower().strip() == 'true'
+        else:
+            q_cluster_by_auto = bool(q_cluster_by_auto_value) if q_cluster_by_auto_value else False
+
+        self.assertEqual(q_cluster_by_auto, True)
+
+    def test_cluster_by_auto_boolean_false(self):
+        """Test cluster_by_auto with boolean False value."""
+        bronze_spec_map = copy.deepcopy(DataflowPipelineTests.bronze_dataflow_spec_map)
+        bronze_spec_map["quarantineTargetDetails"] = {
+            "database": "bronze",
+            "table": "customer_dqe",
+            "cluster_by_auto": False
+        }
+        bronze_dataflow_spec = BronzeDataflowSpec(**bronze_spec_map)
+        pipeline = DataflowPipeline(self.spark, bronze_dataflow_spec, "test_view")
+
+        quarantine_details = pipeline._get_quarantine_target_details()
+        q_cluster_by_auto_value = quarantine_details.get("cluster_by_auto", False)
+
+        # Simulate the conversion logic from dataflow_pipeline.py
+        if isinstance(q_cluster_by_auto_value, str):
+            q_cluster_by_auto = q_cluster_by_auto_value.lower().strip() == 'true'
+        else:
+            q_cluster_by_auto = bool(q_cluster_by_auto_value) if q_cluster_by_auto_value else False
+
+        self.assertEqual(q_cluster_by_auto, False)
+
+    def test_cluster_by_auto_default_when_missing(self):
+        """Test cluster_by_auto defaults to False when not provided."""
+        bronze_spec_map = copy.deepcopy(DataflowPipelineTests.bronze_dataflow_spec_map)
+        bronze_spec_map["quarantineTargetDetails"] = {
+            "database": "bronze",
+            "table": "customer_dqe"
+        }
+        bronze_dataflow_spec = BronzeDataflowSpec(**bronze_spec_map)
+        pipeline = DataflowPipeline(self.spark, bronze_dataflow_spec, "test_view")
+
+        quarantine_details = pipeline._get_quarantine_target_details()
+        q_cluster_by_auto_value = quarantine_details.get("cluster_by_auto", False)
+
+        # Simulate the conversion logic from dataflow_pipeline.py
+        if isinstance(q_cluster_by_auto_value, str):
+            q_cluster_by_auto = q_cluster_by_auto_value.lower().strip() == 'true'
+        else:
+            q_cluster_by_auto = bool(q_cluster_by_auto_value) if q_cluster_by_auto_value else False
+
+        self.assertEqual(q_cluster_by_auto, False)
+
+    def test_cluster_by_auto_for_bronze_table(self):
+        """Test cluster_by_auto is correctly extracted for bronze table."""
+        bronze_spec_map = copy.deepcopy(DataflowPipelineTests.bronze_dataflow_spec_map)
+        bronze_spec_map["clusterByAuto"] = True
+        bronze_dataflow_spec = BronzeDataflowSpec(**bronze_spec_map)
+        pipeline = DataflowPipeline(self.spark, bronze_dataflow_spec, "test_view")
+
+        # Simulate the logic from dataflow_pipeline.py for bronze tables
+        cluster_by_auto = (
+            pipeline.dataflowSpec.clusterByAuto
+            if hasattr(pipeline.dataflowSpec, 'clusterByAuto')
+            and pipeline.dataflowSpec.clusterByAuto is not None
+            else False
+        )
+
+        self.assertEqual(cluster_by_auto, True)
+        self.assertIsInstance(cluster_by_auto, bool)
+
+    def test_cluster_by_auto_for_bronze_table_false(self):
+        """Test cluster_by_auto is False for bronze table when set to False."""
+        bronze_spec_map = copy.deepcopy(DataflowPipelineTests.bronze_dataflow_spec_map)
+        bronze_spec_map["clusterByAuto"] = False
+        bronze_dataflow_spec = BronzeDataflowSpec(**bronze_spec_map)
+        pipeline = DataflowPipeline(self.spark, bronze_dataflow_spec, "test_view")
+
+        cluster_by_auto = (
+            pipeline.dataflowSpec.clusterByAuto
+            if hasattr(pipeline.dataflowSpec, 'clusterByAuto')
+            and pipeline.dataflowSpec.clusterByAuto is not None
+            else False
+        )
+
+        self.assertEqual(cluster_by_auto, False)
+
+    def test_cluster_by_auto_for_bronze_table_not_set(self):
+        """Test cluster_by_auto defaults to False when not set for bronze table."""
+        bronze_spec_map = copy.deepcopy(DataflowPipelineTests.bronze_dataflow_spec_map)
+        # Don't set clusterByAuto
+        bronze_dataflow_spec = BronzeDataflowSpec(**bronze_spec_map)
+        pipeline = DataflowPipeline(self.spark, bronze_dataflow_spec, "test_view")
+
+        cluster_by_auto = (
+            pipeline.dataflowSpec.clusterByAuto
+            if hasattr(pipeline.dataflowSpec, 'clusterByAuto')
+            and pipeline.dataflowSpec.clusterByAuto is not None
+            else False
+        )
+
+        self.assertEqual(cluster_by_auto, False)
+
+    def test_cluster_by_auto_for_silver_table(self):
+        """Test cluster_by_auto is correctly extracted for silver table."""
+        silver_spec_map = copy.deepcopy(DataflowPipelineTests.silver_dataflow_spec_map)
+        silver_spec_map["clusterByAuto"] = True
+        silver_dataflow_spec = SilverDataflowSpec(**silver_spec_map)
+        pipeline = DataflowPipeline(self.spark, silver_dataflow_spec, "test_view")
+
+        cluster_by_auto = (
+            pipeline.dataflowSpec.clusterByAuto
+            if hasattr(pipeline.dataflowSpec, 'clusterByAuto')
+            and pipeline.dataflowSpec.clusterByAuto is not None
+            else False
+        )
+
+        self.assertEqual(cluster_by_auto, True)
+        self.assertIsInstance(cluster_by_auto, bool)
+
+    def test_get_table_properties(self):
+        """Test
_get_table_properties method.""" + bronze_dataflow_spec = BronzeDataflowSpec(**DataflowPipelineTests.bronze_dataflow_spec_map) + pipeline = DataflowPipeline(self.spark, bronze_dataflow_spec, "test_view") + table_properties = pipeline._get_table_properties() + self.assertIsInstance(table_properties, dict) + + def test_helper_methods_for_table_info(self): + """Test helper methods that extract source and target table information.""" + bronze_spec_map = copy.deepcopy(DataflowPipelineTests.bronze_dataflow_spec_map) + bronze_spec_map["sourceDetails"] = { + "catalog": "test_catalog", + "database": "test_db", + "table": "test_table" + } + bronze_dataflow_spec = BronzeDataflowSpec(**bronze_spec_map) + pipeline = DataflowPipeline(self.spark, bronze_dataflow_spec, "test_view") + + # Test _get_source_table_info + source_table_name, source_details = pipeline._get_source_table_info() + self.assertEqual(source_table_name, "test_catalog.test_db.test_table") + self.assertIn("catalog", source_details) + + # Test _get_target_table_name + target_table_name = pipeline._get_target_table_name() + self.assertIn("customer", target_table_name) + + def test_quarantine_cluster_by_string_parsing(self): + """Test quarantine cluster_by parsing with string representation.""" + bronze_spec_map = copy.deepcopy(DataflowPipelineTests.bronze_dataflow_spec_map) + bronze_spec_map["quarantineTargetDetails"] = { + "database": "bronze", + "table": "customer_dqe", + "path": "tests/localtest/delta/customers_dqe", + "cluster_by": "['id', 'email']" + } + bronze_dataflow_spec = BronzeDataflowSpec(**bronze_spec_map) + pipeline = DataflowPipeline(self.spark, bronze_dataflow_spec, "test_view", "quarantine_view") + + # This should trigger the cluster_by parsing logic + quarantine_details = pipeline._get_quarantine_target_details() + self.assertIn("cluster_by", quarantine_details) + + def test_apply_transformations_helper(self): + """Test _apply_transformations helper method.""" + bronze_dataflow_spec = BronzeDataflowSpec(**DataflowPipelineTests.bronze_dataflow_spec_map) + pipeline = DataflowPipeline(self.spark, bronze_dataflow_spec, "test_view") + + # Create test DataFrame + test_data = [("1", "John", "john@email.com"), ("2", "Jane", "jane@email.com")] + test_df = self.spark.createDataFrame(test_data, ["id", "name", "email"]) + + # Test with select and where + result_df = pipeline._apply_transformations(test_df, ["id", "name"], ["id > '0'"]) + self.assertEqual(len(result_df.columns), 2) + self.assertIn("id", result_df.columns) + self.assertIn("name", result_df.columns) + + def test_get_silver_schema_uc_disabled_with_path(self): + """Test get_silver_schema with Unity Catalog disabled using path.""" + # Use copy of silver spec map + silver_spec_map = copy.deepcopy(DataflowPipelineTests.silver_dataflow_spec_map) + + self.spark.sql("CREATE DATABASE IF NOT EXISTS bronze") + self.spark.sql("DROP TABLE IF EXISTS bronze.customer") + if os.path.exists(f"{self.temp_delta_tables_path}/tables/customer_schema_uc_test"): + shutil.rmtree(f"{self.temp_delta_tables_path}/tables/customer_schema_uc_test") + + # Write data to delta format with all required columns + options = {"rescuedDataColumn": "_rescued_data", "inferColumnTypes": "true", "multiline": True} + customers_df = self.spark.read.options(**options).json("tests/resources/data/customers") + # Add the _rescued_data column and ensure all selectExp columns are present + customers_with_rescued = customers_df.withColumn("_rescued_data", lit("Test")) + + # Write to both table and path + 
(customers_with_rescued.write.format("delta") + .mode("overwrite") + .option("path", f"{self.temp_delta_tables_path}/tables/customer_schema_uc_test") + .saveAsTable("bronze.customer")) + + # Update silver spec with the path where data was written + silver_spec_map.update({ + "sourceDetails": { + "database": "bronze", + "table": "customer", + "path": f"{self.temp_delta_tables_path}/tables/customer_schema_uc_test" + } + }) + silver_dataflow_spec = SilverDataflowSpec(**silver_spec_map) + + # Test with UC disabled - should use path instead of table name + self.spark.conf.set("spark.databricks.unityCatalog.enabled", "False") + pipeline = DataflowPipeline(self.spark, silver_dataflow_spec, "test_view") + + # Verify that get_silver_schema returns a valid schema when using path + schema = pipeline.get_silver_schema() + self.assertIsNotNone(schema, "Schema should not be None when using path with UC disabled") + + def test_get_silver_schema_with_catalog(self): + """Test get_silver_schema with catalog specified in source details.""" + silver_spec_map = copy.deepcopy(DataflowPipelineTests.silver_dataflow_spec_map) + silver_spec_map["sourceDetails"] = { + "catalog": "test_catalog", + "database": "bronze", + "table": "customer", + "path": "tests/resources/delta/customers" + } + + self.spark.sql("CREATE DATABASE IF NOT EXISTS bronze") + self.spark.sql("DROP TABLE IF EXISTS bronze.customer") + options = {"rescuedDataColumn": "_rescued_data", "inferColumnTypes": "true", "multiline": True} + customers_df = self.spark.read.options(**options).json("tests/resources/data/customers") + (customers_df.withColumn("_rescued_data", lit("Test")).write.format("delta") + .mode("overwrite").saveAsTable("bronze.customer")) + + silver_dataflow_spec = SilverDataflowSpec(**silver_spec_map) + + # Test with UC enabled - should try to use catalog (will fail but tests the path) + self.spark.conf.set("spark.databricks.unityCatalog.enabled", "True") + pipeline = DataflowPipeline(self.spark, silver_dataflow_spec, "test_view") + + # This will likely fail in test environment but exercises the catalog code path + try: + pipeline.get_silver_schema() + except Exception: + # Expected to fail in test environment without real UC + pass + + def test_read_silver_with_reader_config_and_snapshot(self): + """Test read_silver with reader config options and snapshot format.""" + silver_spec_map = copy.deepcopy(DataflowPipelineTests.silver_dataflow_spec_map) + silver_spec_map["readerConfigOptions"] = {"maxFilesPerTrigger": "1"} + silver_spec_map["sourceFormat"] = "snapshot" + silver_spec_map["sourceDetails"] = { + "database": "bronze", + "table": "customer", + "path": f"{self.temp_delta_tables_path}/tables/customer_snapshot" + } + + self.spark.sql("CREATE DATABASE IF NOT EXISTS bronze") + self.spark.sql("DROP TABLE IF EXISTS bronze.customer") + if os.path.exists(f"{self.temp_delta_tables_path}/tables/customer_snapshot"): + shutil.rmtree(f"{self.temp_delta_tables_path}/tables/customer_snapshot") + + options = {"rescuedDataColumn": "_rescued_data", "inferColumnTypes": "true", "multiline": True} + customers_df = self.spark.read.options(**options).json("tests/resources/data/customers") + (customers_df.withColumn("_rescued_data", lit("Test")).write.format("delta") + .mode("append").option("path", f"{self.temp_delta_tables_path}/tables/customer_snapshot") + .saveAsTable("bronze.customer")) + + silver_dataflow_spec = SilverDataflowSpec(**silver_spec_map) + + # Test with UC disabled - should use snapshot read with path + 
self.spark.conf.set("spark.databricks.unityCatalog.enabled", "False") + pipeline = DataflowPipeline(self.spark, silver_dataflow_spec, "test_view") + + result_df = pipeline.read_silver() + self.assertIsNotNone(result_df) + # Verify it's a DataFrame with data + self.assertTrue(hasattr(result_df, 'count')) + + def test_read_silver_with_reader_config_uc_enabled(self): + """Test read_silver with reader config options and UC enabled.""" + silver_spec_map = copy.deepcopy(DataflowPipelineTests.silver_dataflow_spec_map) + silver_spec_map["readerConfigOptions"] = {"maxFilesPerTrigger": "1"} + silver_spec_map["sourceFormat"] = "snapshot" + silver_spec_map["sourceDetails"] = { + "database": "bronze", + "table": "customer", + "path": "tests/resources/delta/customers" + } + + self.spark.sql("CREATE DATABASE IF NOT EXISTS bronze") + self.spark.sql("DROP TABLE IF EXISTS bronze.customer") + options = {"rescuedDataColumn": "_rescued_data", "inferColumnTypes": "true", "multiline": True} + customers_df = self.spark.read.options(**options).json("tests/resources/data/customers") + (customers_df.withColumn("_rescued_data", lit("Test")).write.format("delta") + .mode("overwrite").saveAsTable("bronze.customer")) + + silver_dataflow_spec = SilverDataflowSpec(**silver_spec_map) + + # Test with UC enabled - should use table name + self.spark.conf.set("spark.databricks.unityCatalog.enabled", "True") + pipeline = DataflowPipeline(self.spark, silver_dataflow_spec, "test_view") + + result_df = pipeline.read_silver() + self.assertIsNotNone(result_df) + + def test_read_silver_streaming_with_reader_config(self): + """Test read_silver with streaming and reader config options.""" + silver_spec_map = copy.deepcopy(DataflowPipelineTests.silver_dataflow_spec_map) + silver_spec_map["readerConfigOptions"] = {"maxFilesPerTrigger": "1"} + silver_spec_map["sourceFormat"] = "delta" # Not snapshot, so uses readStream + silver_spec_map["sourceDetails"] = { + "database": "bronze", + "table": "customer", + "path": f"{self.temp_delta_tables_path}/tables/customer_streaming" + } + + self.spark.sql("CREATE DATABASE IF NOT EXISTS bronze") + self.spark.sql("DROP TABLE IF EXISTS bronze.customer") + if os.path.exists(f"{self.temp_delta_tables_path}/tables/customer_streaming"): + shutil.rmtree(f"{self.temp_delta_tables_path}/tables/customer_streaming") + + options = {"rescuedDataColumn": "_rescued_data", "inferColumnTypes": "true", "multiline": True} + customers_df = self.spark.read.options(**options).json("tests/resources/data/customers") + (customers_df.withColumn("_rescued_data", lit("Test")).write.format("delta") + .mode("append").option("path", f"{self.temp_delta_tables_path}/tables/customer_streaming") + .saveAsTable("bronze.customer")) + + silver_dataflow_spec = SilverDataflowSpec(**silver_spec_map) + + # Test with UC disabled - should use readStream with path + self.spark.conf.set("spark.databricks.unityCatalog.enabled", "False") + pipeline = DataflowPipeline(self.spark, silver_dataflow_spec, "test_view") + + result_df = pipeline.read_silver() + self.assertIsNotNone(result_df) + + def test_read_from_source_snapshot_format(self): + """Test _read_from_source with snapshot format.""" + bronze_spec_map = copy.deepcopy(DataflowPipelineTests.bronze_dataflow_spec_map) + bronze_spec_map["sourceFormat"] = "snapshot" + bronze_spec_map["sourceDetails"] = { + "database": "bronze", + "table": "customer", + "path": "tests/resources/delta/customers" + } + + bronze_dataflow_spec = BronzeDataflowSpec(**bronze_spec_map) + + # Test with UC disabled + 
self.spark.conf.set("spark.databricks.unityCatalog.enabled", "False") + pipeline = DataflowPipeline(self.spark, bronze_dataflow_spec, "test_view") + + # This exercises the snapshot read path + self.assertIsNotNone(pipeline) + self.assertEqual(pipeline.dataflowSpec.sourceFormat, "snapshot") + + def test_read_from_source_streaming_with_path(self): + """Test _read_from_source with streaming format using path.""" + bronze_spec_map = copy.deepcopy(DataflowPipelineTests.bronze_dataflow_spec_map) + bronze_spec_map["sourceFormat"] = "delta" + bronze_spec_map["sourceDetails"] = { + "database": "bronze", + "table": "customer", + "path": "tests/resources/delta/customers" + } + + bronze_dataflow_spec = BronzeDataflowSpec(**bronze_spec_map) + + # Test with UC disabled - exercises the streaming read with path + self.spark.conf.set("spark.databricks.unityCatalog.enabled", "False") + pipeline = DataflowPipeline(self.spark, bronze_dataflow_spec, "test_view") + + self.assertIsNotNone(pipeline) + self.assertFalse(pipeline.uc_enabled) + + def test_read_from_source_with_reader_options(self): + """Test _read_from_source with reader config options.""" + bronze_spec_map = copy.deepcopy(DataflowPipelineTests.bronze_dataflow_spec_map) + bronze_spec_map["sourceFormat"] = "delta" + bronze_spec_map["readerConfigOptions"] = {"maxFilesPerTrigger": "1", "ignoreDeletes": "true"} + bronze_spec_map["sourceDetails"] = { + "database": "bronze", + "table": "customer", + "path": "tests/resources/delta/customers" + } + + bronze_dataflow_spec = BronzeDataflowSpec(**bronze_spec_map) + pipeline = DataflowPipeline(self.spark, bronze_dataflow_spec, "test_view") + + # Verify reader config options are available + reader_opts = pipeline._get_reader_config_options() + self.assertIn("maxFilesPerTrigger", reader_opts) + self.assertEqual(reader_opts["maxFilesPerTrigger"], "1") + + def test_create_dataframe_reader_with_options(self): + """Test _create_dataframe_reader with various options.""" + bronze_dataflow_spec = BronzeDataflowSpec(**DataflowPipelineTests.bronze_dataflow_spec_map) + pipeline = DataflowPipeline(self.spark, bronze_dataflow_spec, "test_view") + + # Test streaming reader without options + reader = pipeline._create_dataframe_reader(is_streaming=True, reader_options=None) + self.assertIsNotNone(reader) + + # Test batch reader without options + reader = pipeline._create_dataframe_reader(is_streaming=False, reader_options=None) + self.assertIsNotNone(reader) + + # Test streaming reader with options + reader = pipeline._create_dataframe_reader( + is_streaming=True, + reader_options={"maxFilesPerTrigger": "1"} + ) + self.assertIsNotNone(reader) + + # Test batch reader with options + reader = pipeline._create_dataframe_reader( + is_streaming=False, + reader_options={"inferSchema": "true"} + ) + self.assertIsNotNone(reader) diff --git a/tests/test_main.py b/tests/test_main.py index 93f2aa8b..eda63dcf 100644 --- a/tests/test_main.py +++ b/tests/test_main.py @@ -108,9 +108,6 @@ def test_main_layer_missing(self): def test_main_bronze_silver_uc(self): """Test bronze and silver onboarding for uc.""" - OnboardDataflowspec.return_value = None - spark_mock = MagicMock("SparkSession") - spark.builder.appName("DLT-META_Onboarding_Task").getOrCreate().return_value = spark_mock param_map = copy.deepcopy(self.onboarding_bronze_silver_params_uc_map) param_map["onboard_layer"] = "bronze_silver" list = ["dummy_test"] @@ -126,11 +123,6 @@ def test_main_bronze_silver_uc(self): f"{param_map['database']}.{param_map['silver_dataflowspec_table']}") ) 
self.assertEqual(silver_dataflowSpec_df.count(), 3) - del param_map['onboard_layer'] - del param_map['uc_enabled'] - del param_map['bronze_dataflowspec_path'] - del param_map['silver_dataflowspec_path'] - OnboardDataflowspec.called_once_with(spark_mock, param_map, uc_enabled=True) def test_onboarding(self): mock_onboard_dataflowspec = OnboardDataflowspec diff --git a/tests/test_onboard_dataflowspec.py b/tests/test_onboard_dataflowspec.py index 74580678..21c6c5f5 100644 --- a/tests/test_onboard_dataflowspec.py +++ b/tests/test_onboard_dataflowspec.py @@ -786,3 +786,181 @@ def test_cluster_by_string_parsing_double_quotes(self): cluster_by = onboardDataFlowSpecs._OnboardDataflowspec__get_cluster_by_properties( onboarding_row, onboarding_row['bronze_table_properties'], "bronze_cluster_by") self.assertEqual(cluster_by, ["id", "customer_id"]) + + def test_bronze_cluster_by_auto_true(self): + """Test cluster_by_auto property with True value.""" + onboarding_row = { + "bronze_cluster_by_auto": True + } + onboardDataFlowSpecs = OnboardDataflowspec(self.spark, self.onboarding_bronze_silver_params_map) + cluster_by_auto = onboardDataFlowSpecs._OnboardDataflowspec__get_cluster_by_auto( + onboarding_row, "bronze_cluster_by_auto") + self.assertEqual(cluster_by_auto, True) + + def test_bronze_cluster_by_auto_false(self): + """Test cluster_by_auto property with False value.""" + onboarding_row = { + "bronze_cluster_by_auto": False + } + onboardDataFlowSpecs = OnboardDataflowspec(self.spark, self.onboarding_bronze_silver_params_map) + cluster_by_auto = onboardDataFlowSpecs._OnboardDataflowspec__get_cluster_by_auto( + onboarding_row, "bronze_cluster_by_auto") + self.assertEqual(cluster_by_auto, False) + + def test_bronze_cluster_by_auto_string_true(self): + """Test cluster_by_auto property with string 'true'.""" + onboarding_row = { + "bronze_cluster_by_auto": "true" + } + onboardDataFlowSpecs = OnboardDataflowspec(self.spark, self.onboarding_bronze_silver_params_map) + cluster_by_auto = onboardDataFlowSpecs._OnboardDataflowspec__get_cluster_by_auto( + onboarding_row, "bronze_cluster_by_auto") + self.assertEqual(cluster_by_auto, True) + + def test_bronze_cluster_by_auto_string_false(self): + """Test cluster_by_auto property with string 'false'.""" + onboarding_row = { + "bronze_cluster_by_auto": "false" + } + onboardDataFlowSpecs = OnboardDataflowspec(self.spark, self.onboarding_bronze_silver_params_map) + cluster_by_auto = onboardDataFlowSpecs._OnboardDataflowspec__get_cluster_by_auto( + onboarding_row, "bronze_cluster_by_auto") + self.assertEqual(cluster_by_auto, False) + + def test_bronze_cluster_by_auto_none(self): + """Test cluster_by_auto property with None value.""" + onboarding_row = { + "bronze_cluster_by_auto": None + } + onboardDataFlowSpecs = OnboardDataflowspec(self.spark, self.onboarding_bronze_silver_params_map) + cluster_by_auto = onboardDataFlowSpecs._OnboardDataflowspec__get_cluster_by_auto( + onboarding_row, "bronze_cluster_by_auto") + self.assertEqual(cluster_by_auto, None) + + def test_bronze_cluster_by_auto_invalid_string(self): + """Test cluster_by_auto property with invalid string value.""" + onboarding_row = { + "bronze_cluster_by_auto": "invalid" + } + onboardDataFlowSpecs = OnboardDataflowspec(self.spark, self.onboarding_bronze_silver_params_map) + with self.assertRaises(Exception) as context: + onboardDataFlowSpecs._OnboardDataflowspec__get_cluster_by_auto( + onboarding_row, "bronze_cluster_by_auto") + self.assertIn("Expected boolean or string representation of boolean", 
            str(context.exception))
+
+    def test_silver_cluster_by_auto_true(self):
+        """Test silver cluster_by_auto property with True value."""
+        onboarding_row = {
+            "silver_cluster_by_auto": True
+        }
+        onboardDataFlowSpecs = OnboardDataflowspec(self.spark, self.onboarding_bronze_silver_params_map)
+        cluster_by_auto = onboardDataFlowSpecs._OnboardDataflowspec__get_cluster_by_auto(
+            onboarding_row, "silver_cluster_by_auto")
+        self.assertEqual(cluster_by_auto, True)
+
+    def test_bronze_cluster_by_auto_string_case_insensitive(self):
+        """Test cluster_by_auto property with case-insensitive string values."""
+        test_values = ["true", "True", "TRUE", "TrUe"]
+        onboardDataFlowSpecs = OnboardDataflowspec(self.spark, self.onboarding_bronze_silver_params_map)
+
+        for value in test_values:
+            onboarding_row = {"bronze_cluster_by_auto": value}
+            cluster_by_auto = onboardDataFlowSpecs._OnboardDataflowspec__get_cluster_by_auto(
+                onboarding_row, "bronze_cluster_by_auto")
+            self.assertEqual(cluster_by_auto, True, f"Failed for value: {value}")
+
+        test_values_false = ["false", "False", "FALSE", "FaLsE"]
+        for value in test_values_false:
+            onboarding_row = {"bronze_cluster_by_auto": value}
+            cluster_by_auto = onboardDataFlowSpecs._OnboardDataflowspec__get_cluster_by_auto(
+                onboarding_row, "bronze_cluster_by_auto")
+            self.assertEqual(cluster_by_auto, False, f"Failed for value: {value}")
+
+    def test_bronze_cluster_by_auto_string_with_whitespace(self):
+        """Test cluster_by_auto property with string values containing whitespace."""
+        onboarding_row = {"bronze_cluster_by_auto": " true "}
+        onboardDataFlowSpecs = OnboardDataflowspec(self.spark, self.onboarding_bronze_silver_params_map)
+        cluster_by_auto = onboardDataFlowSpecs._OnboardDataflowspec__get_cluster_by_auto(
+            onboarding_row, "bronze_cluster_by_auto")
+        self.assertEqual(cluster_by_auto, True)
+
+        onboarding_row = {"bronze_cluster_by_auto": " false "}
+        cluster_by_auto = onboardDataFlowSpecs._OnboardDataflowspec__get_cluster_by_auto(
+            onboarding_row, "bronze_cluster_by_auto")
+        self.assertEqual(cluster_by_auto, False)
+
+    def test_bronze_cluster_by_auto_invalid_type(self):
+        """Test cluster_by_auto property with invalid type."""
+        onboarding_row = {"bronze_cluster_by_auto": 123}
+        onboardDataFlowSpecs = OnboardDataflowspec(self.spark, self.onboarding_bronze_silver_params_map)
+        with self.assertRaises(Exception) as context:
+            onboardDataFlowSpecs._OnboardDataflowspec__get_cluster_by_auto(
+                onboarding_row, "bronze_cluster_by_auto")
+        self.assertIn("Expected boolean or string", str(context.exception))
+
+    def test_bronze_cluster_by_auto_missing_key(self):
+        """Test cluster_by_auto property when key is missing."""
+        onboarding_row = {}
+        onboardDataFlowSpecs = OnboardDataflowspec(self.spark, self.onboarding_bronze_silver_params_map)
+        cluster_by_auto = onboardDataFlowSpecs._OnboardDataflowspec__get_cluster_by_auto(
+            onboarding_row, "bronze_cluster_by_auto")
+        self.assertEqual(cluster_by_auto, False)
+
+    def test_quarantine_cluster_by_auto_true(self):
+        """Test quarantine table cluster_by_auto with True value."""
+        onboarding_row = {
+            "bronze_database_quarantine_it": "quarantine_db",
+            "bronze_quarantine_table": "quarantine_table",
+            "bronze_quarantine_table_cluster_by_auto": True
+        }
+        onboardDataFlowSpecs = OnboardDataflowspec(self.spark, self.onboarding_bronze_silver_params_map)
+        quarantine_target_details, _ = (
+            onboardDataFlowSpecs._OnboardDataflowspec__get_quarantine_details(
+                "it", "bronze", onboarding_row
+            )
+        )
+
self.assertEqual(quarantine_target_details.get("cluster_by_auto"), True) + + def test_quarantine_cluster_by_auto_false(self): + """Test quarantine table cluster_by_auto with False value.""" + onboarding_row = { + "bronze_database_quarantine_it": "quarantine_db", + "bronze_quarantine_table": "quarantine_table", + "bronze_quarantine_table_cluster_by_auto": False + } + onboardDataFlowSpecs = OnboardDataflowspec(self.spark, self.onboarding_bronze_silver_params_map) + quarantine_target_details, _ = ( + onboardDataFlowSpecs._OnboardDataflowspec__get_quarantine_details( + "it", "bronze", onboarding_row + ) + ) + self.assertEqual(quarantine_target_details.get("cluster_by_auto"), False) + + def test_quarantine_cluster_by_auto_string(self): + """Test quarantine table cluster_by_auto with string value.""" + onboarding_row = { + "bronze_database_quarantine_it": "quarantine_db", + "bronze_quarantine_table": "quarantine_table", + "bronze_quarantine_table_cluster_by_auto": "true" + } + onboardDataFlowSpecs = OnboardDataflowspec(self.spark, self.onboarding_bronze_silver_params_map) + quarantine_target_details, _ = ( + onboardDataFlowSpecs._OnboardDataflowspec__get_quarantine_details( + "it", "bronze", onboarding_row + ) + ) + self.assertEqual(quarantine_target_details.get("cluster_by_auto"), True) + + def test_quarantine_cluster_by_auto_default(self): + """Test quarantine table cluster_by_auto defaults when not provided.""" + onboarding_row = { + "bronze_database_quarantine_it": "quarantine_db", + "bronze_quarantine_table": "quarantine_table" + } + onboardDataFlowSpecs = OnboardDataflowspec(self.spark, self.onboarding_bronze_silver_params_map) + quarantine_target_details, _ = ( + onboardDataFlowSpecs._OnboardDataflowspec__get_quarantine_details( + "it", "bronze", onboarding_row + ) + ) + self.assertEqual(quarantine_target_details.get("cluster_by_auto"), False) diff --git a/tests/test_pipeline_readers.py b/tests/test_pipeline_readers.py index c7662dfd..24cbe41c 100644 --- a/tests/test_pipeline_readers.py +++ b/tests/test_pipeline_readers.py @@ -54,6 +54,7 @@ class PipelineReadersTests(DLTFrameworkTestCase): "updateDate": datetime.now, "updatedBy": "dlt-meta-unittest", "clusterBy": [""], + "clusterByAuto": False, } bronze_eventhub_dataflow_spec_map = { @@ -97,6 +98,7 @@ class PipelineReadersTests(DLTFrameworkTestCase): "updateDate": datetime.now, "updatedBy": "dlt-meta-unittest", "clusterBy": [""], + "clusterByAuto": False, } bronze_eventhub_dataflow_spec_omit_secret_map = { @@ -139,6 +141,7 @@ class PipelineReadersTests(DLTFrameworkTestCase): "updateDate": datetime.now, "updatedBy": "dlt-meta-unittest", "clusterBy": [""], + "clusterByAuto": False, } bronze_kafka_dataflow_spec_map = { @@ -173,6 +176,7 @@ class PipelineReadersTests(DLTFrameworkTestCase): "updateDate": datetime.now, "updatedBy": "dlt-meta-unittest", "clusterBy": [""], + "clusterByAuto": False, } @classmethod diff --git a/tests/test_pipeline_writers.py b/tests/test_pipeline_writers.py index 5b1b50a3..3c5d5007 100644 --- a/tests/test_pipeline_writers.py +++ b/tests/test_pipeline_writers.py @@ -61,10 +61,23 @@ def test_write_to_sink(self, mock_append_flow, mock_create_sink): mock_create_sink.assert_called_once_with(name='test_sink', format='kafka', options={}) mock_append_flow.assert_called_once() - @patch('dlt.create_sink', new_callable=MagicMock) - @patch('dlt.append_flow', new_callable=MagicMock) - @patch('dlt.table', new_callable=MagicMock) - def test_dataflowpipeline_bronze_sink_write(self, mock_dlt_table, mock_append_flow, 
mock_create_sink):
+    @patch('src.pipeline_writers.dlt')
+    @patch('src.dataflow_pipeline.dlt')
+    def test_dataflowpipeline_bronze_sink_write(self, mock_dataflow_dlt, mock_writers_dlt):
+        mock_dlt_table = MagicMock(return_value=lambda func: func)
+        mock_append_flow = MagicMock(return_value=lambda func: func)
+        mock_create_sink = MagicMock()
+
+        # Set up mocks for dataflow_pipeline.dlt
+        mock_dataflow_dlt.table = mock_dlt_table
+        mock_dataflow_dlt.append_flow = mock_append_flow
+        mock_dataflow_dlt.create_sink = mock_create_sink
+
+        # Set up mocks for pipeline_writers.dlt
+        mock_writers_dlt.create_sink = mock_create_sink
+        mock_writers_dlt.append_flow = mock_append_flow
+        mock_writers_dlt.read_stream = MagicMock(return_value=None)
+
         local_params = copy.deepcopy(self.onboarding_bronze_silver_params_map)
         local_params["onboarding_file_path"] = self.onboarding_sink_json_file
         local_params["bronze_dataflowspec_table"] = "bronze_dataflowspec_sink"
@@ -85,20 +98,14 @@ def test_dataflowpipeline_bronze_sink_write(self, mock_dlt_table, mock_append_fl
         view_name = f"{bronze_dataflow_spec.targetDetails['table']}_inputView"
         pipeline = DataflowPipeline(self.spark, BronzeDataflowSpec(**bronze_dataflow_spec.asDict()), view_name, None)
         pipeline.write()
-        assert mock_create_sink.called_with(
-            name="sink",
-            target="sink",
-            comment="sink dlt table sink"
-        )
-        assert mock_append_flow.called_with(
-            name="sink",
-            target="sink"
-        )
-        assert mock_dlt_table.called_with(
-            pipeline.write_to_delta,
-            name="sink",
-            partition_cols=[],
-            table_properties={},
-            path=None,
-            comment="sink dlt table sink"
-        )
+        # Verify that create_sink was called (may be called multiple times for multiple sinks)
+        self.assertGreater(mock_create_sink.call_count, 0, "create_sink should have been called")
+        # Verify all calls have the required parameters
+        for call in mock_create_sink.call_args_list:
+            _, kwargs = call
+            self.assertIn('name', kwargs)
+            self.assertIn('format', kwargs)
+            self.assertIn('options', kwargs)
+        # Check that append_flow and dlt.table were called
+        self.assertGreater(mock_append_flow.call_count, 0)
+        self.assertGreater(mock_dlt_table.call_count, 0)