Skip to content

Commit 2214076

Browse files
Merge pull request #30 from databrickslabs/feature/dlt-meta-uc-cli
Added unit tests and readme
2 parents 9f1521c + e0932fe commit 2214076

File tree

9 files changed

+142
-157
lines changed

9 files changed

+142
-157
lines changed

README.md

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,11 +69,14 @@ With this framework you need to record the source and target metadata in an onbo
6969
## Getting Started
7070
Refer to the [Getting Started](https://databrickslabs.github.io/dlt-meta/getting_started)
7171
### Databricks Labs DLT-META CLI lets you run onboard and deploy in interactive python terminal
72+
#### Prerequisites:
73+
- [Databricks CLI](https://docs.databricks.com/en/dev-tools/cli/tutorial.html)
74+
- Python 3.8.0 +
75+
#### Steps:
7276
- ``` git clone dlt-meta ```
7377
- ``` cd dlt-meta ```
7478
- ``` python -m venv .venv ```
7579
- ```source .venv/bin/activate ```
76-
- ``` pip install databricks ```
7780
- ``` pip install databricks-sdk ```
7881
- ```databricks labs dlt-meta onboard```
7982
- The above command will prompt you to provide onboarding details. If you have cloned the dlt-meta git repo, accept the defaults, which will launch the config from the demo folder.

src/__main__.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,10 @@ def parse_args():
3535
def main():
3636
"""Whl file entry point."""
3737
args = parse_args()
38+
onboard_dataflowspecs(args)
39+
40+
41+
def onboard_dataflowspecs(args):
3842
onboard_layer = args.onboard_layer
3943
uc_enabled = True if args.uc_enabled and args.uc_enabled.lower() == "true" else False
4044
onboarding_args_dict = args.__dict__

src/dataflow_pipeline.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -360,7 +360,6 @@ def invoke_dlt_pipeline(spark, layer):
360360
dataflowspec_list = DataflowSpecUtils.get_bronze_dataflow_spec(spark)
361361
elif "silver" == layer.lower():
362362
dataflowspec_list = DataflowSpecUtils.get_silver_dataflow_spec(spark)
363-
364363
logger.info(f"Length of Dataflow Spec {len(dataflowspec_list)}")
365364
for dataflowSpec in dataflowspec_list:
366365
logger.info("Printing Dataflow Spec")

src/pipeline_readers.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,6 @@ def read_kafka(spark, bronze_dataflow_spec, schema_json) -> DataFrame:
105105
# add date, hour, and minute columns derived from eventhub enqueued timestamp
106106
.selectExpr("*", "to_date(timestamp) as date", "hour(timestamp) as hour", "minute(timestamp) as minute")
107107
)
108-
109108
if schema_json:
110109
schema = StructType.fromJson(schema_json)
111110
return (

tests/resources/onboarding.json

Lines changed: 1 addition & 154 deletions
Original file line numberDiff line numberDiff line change
@@ -1,154 +1 @@
1-
[
2-
{
3-
"data_flow_id": "100",
4-
"data_flow_group": "A1",
5-
"source_system": "MYSQL",
6-
"source_format": "cloudFiles",
7-
"source_details": {
8-
"source_database": "APP",
9-
"source_table": "CUSTOMERS",
10-
"source_path_dev": "tests/resources/data/customers",
11-
"source_schema_path": "tests/resources/schema/customer_schema.ddl"
12-
},
13-
"bronze_database_dev": "bronze",
14-
"bronze_database_staging": "bronze",
15-
"bronze_database_prd": "bronze",
16-
"bronze_table": "customers_cdc",
17-
"bronze_reader_options": {
18-
"cloudFiles.format": "json",
19-
"cloudFiles.inferColumnTypes": "true",
20-
"cloudFiles.rescuedDataColumn": "_rescued_data"
21-
},
22-
"bronze_table_path_dev": "tests/resources/delta/customers",
23-
"bronze_table_properties": {
24-
"pipelines.autoOptimize.managed": "false",
25-
"pipelines.reset.allowed": "false"
26-
},
27-
"bronze_data_quality_expectations_json_dev": "tests/resources/dqe/customers/bronze_data_quality_expectations.json",
28-
"silver_database_dev": "silver",
29-
"silver_database_staging": "silver",
30-
"silver_database_prd": "silver",
31-
"silver_table": "customers",
32-
"silver_cdc_apply_changes": {
33-
"keys": [
34-
"id"
35-
],
36-
"sequence_by": "operation_date",
37-
"scd_type": "1",
38-
"apply_as_deletes": "operation = 'DELETE'",
39-
"except_column_list": [
40-
"operation",
41-
"operation_date",
42-
"_rescued_data"
43-
]
44-
},
45-
"silver_table_path_dev": "tests/resources/data/silver/customers",
46-
"silver_table_properties": {
47-
"pipelines.autoOptimize.managed": "false",
48-
"pipelines.reset.allowed": "false",
49-
"pipelines.autoOptimize.zOrderCols": "id,email"
50-
},
51-
"silver_transformation_json_dev": "tests/resources/silver_transformations.json"
52-
},
53-
{
54-
"data_flow_id": "101",
55-
"data_flow_group": "A1",
56-
"source_system": "MYSQL",
57-
"source_format": "cloudFiles",
58-
"source_details": {
59-
"source_database": "APP",
60-
"source_table": "TRANSACTIONS",
61-
"source_path_prd": "tests/resources/data/transactions",
62-
"source_path_dev": "tests/resources/data/transactions"
63-
},
64-
"bronze_database_dev": "bronze",
65-
"bronze_database_staging": "bronze",
66-
"bronze_database_prd": "bronze",
67-
"bronze_table": "transactions_cdc",
68-
"bronze_reader_options": {
69-
"cloudFiles.format": "json",
70-
"cloudFiles.inferColumnTypes": "true",
71-
"cloudFiles.rescuedDataColumn": "_rescued_data"
72-
},
73-
"bronze_table_path_dev": "tests/resources/delta/transactions",
74-
"bronze_table_path_staging": "s3://db-dlt-meta-staging/demo/data/bronze/transactions",
75-
"bronze_table_path_prd": "s3://db-dlt-meta-prod/demo/data/bronze/transactions",
76-
"bronze_table_properties": {
77-
"pipelines.reset.allowed": "false"
78-
},
79-
"bronze_data_quality_expectations_json_dev": "tests/resources/dqe/transactions/bronze_data_quality_expectations.json",
80-
"bronze_database_quarantine_dev": "bronze",
81-
"bronze_database_quarantine_staging": "bronze",
82-
"bronze_database_quarantine_prd": "bronze",
83-
"bronze_quarantine_table": "transactions_cdc_quarantine",
84-
"bronze_quarantine_table_path_dev": "tests/resources/data/bronze/transactions_quarantine",
85-
"silver_database_dev": "silver",
86-
"silver_database_preprd": "silver",
87-
"silver_database_prd": "silver",
88-
"silver_table": "transactions",
89-
"silver_cdc_apply_changes": {
90-
"keys": [
91-
"id"
92-
],
93-
"sequence_by": "operation_date",
94-
"scd_type": "1",
95-
"apply_as_deletes": "operation = 'DELETE'",
96-
"except_column_list": [
97-
"operation",
98-
"operation_date",
99-
"_rescued_data"
100-
]
101-
},
102-
"silver_partition_columns": "transaction_date",
103-
"silver_table_path_dev": "tests/resources/data/silver/transactions",
104-
"silver_transformation_json_dev": "tests/resources/silver_transformations.json",
105-
"silver_table_properties": {
106-
"pipelines.reset.allowed": "false",
107-
"pipelines.autoOptimize.zOrderCols": "id, customer_id"
108-
}
109-
},
110-
{
111-
"data_flow_id": "103",
112-
"data_flow_group": "A2",
113-
"source_system": "MYSQL",
114-
"source_format": "eventhub",
115-
"source_details": {
116-
"source_schema_path": "tests/resources/schema/eventhub_iot_schema.ddl",
117-
"eventhub.accessKeyName": "iotIngestionAccessKey",
118-
"eventhub.name": "iot",
119-
"eventhub.accessKeySecretName": "iotIngestionAccessKey",
120-
"eventhub.secretsScopeName": "eventhubs_creds",
121-
"kafka.sasl.mechanism": "PLAIN",
122-
"kafka.security.protocol": "SASL_SSL",
123-
"kafka.bootstrap.servers": "ganesh-standard.servicebus.windows.net:9093"
124-
},
125-
"bronze_database_dev": "bronze",
126-
"bronze_database_staging": "bronze",
127-
"bronze_database_prd": "bronze",
128-
"bronze_table": "iot_cdc",
129-
"bronze_reader_options": {
130-
"maxOffsetsPerTrigger": "50000",
131-
"startingOffsets": "latest",
132-
"failOnDataLoss": "false",
133-
"kafka.request.timeout.ms": "60000",
134-
"kafka.session.timeout.ms": "60000"
135-
},
136-
"bronze_table_path_dev": "tests/resources/delta/iot_cdc",
137-
"bronze_table_path_staging": "s3://db-dlt-meta-staging/demo/data/bronze/iot_cdc",
138-
"bronze_table_path_prd": "s3://db-dlt-meta-prod/demo/data/bronze/iot_cdc",
139-
"bronze_data_quality_expectations_json_dev": "tests/resources/dqe/iot_cdc/bronze_data_quality_expectations.json",
140-
"silver_database_dev": "silver",
141-
"silver_table": "iot_cdc",
142-
"silver_cdc_apply_changes": {
143-
"keys": [
144-
"device_id"
145-
],
146-
"sequence_by": "timestamp",
147-
"scd_type": "1",
148-
"apply_as_deletes": "operation = 'DELETE'",
149-
"except_column_list": []
150-
},
151-
"silver_table_path_dev": "tests/resources/data/silver/iot_cdc",
152-
"silver_transformation_json_dev": "tests/resources/silver_transformations.json"
153-
}
154-
]
1+
[{"data_flow_id": "100", "data_flow_group": "A1", "source_system": "MYSQL", "source_format": "cloudFiles", "source_details": {"source_database": "APP", "source_table": "CUSTOMERS", "source_path_dev": "tests/resources/data/customers", "source_schema_path": "tests/resources/schema/customer_schema.ddl"}, "bronze_database_dev": "bronze", "bronze_database_staging": "bronze", "bronze_database_prd": "bronze", "bronze_table": "customers_cdc", "bronze_reader_options": {"cloudFiles.format": "json", "cloudFiles.inferColumnTypes": "true", "cloudFiles.rescuedDataColumn": "_rescued_data"}, "bronze_table_path_dev": "tests/resources/delta/customers", "bronze_table_properties": {"pipelines.autoOptimize.managed": "false", "pipelines.reset.allowed": "false"}, "bronze_data_quality_expectations_json_dev": "tests/resources/dqe/customers/bronze_data_quality_expectations.json", "silver_database_dev": "silver", "silver_database_staging": "silver", "silver_database_prd": "silver", "silver_table": "customers", "silver_cdc_apply_changes": {"keys": ["id"], "sequence_by": "operation_date", "scd_type": "1", "apply_as_deletes": "operation = 'DELETE'", "except_column_list": ["operation", "operation_date", "_rescued_data"]}, "silver_table_path_dev": "tests/resources/data/silver/customers", "silver_table_properties": {"pipelines.autoOptimize.managed": "false", "pipelines.reset.allowed": "false", "pipelines.autoOptimize.zOrderCols": "id,email"}, "silver_transformation_json_dev": "tests/resources/silver_transformations.json"}, {"data_flow_id": "101", "data_flow_group": "A1", "source_system": "MYSQL", "source_format": "cloudFiles", "source_details": {"source_database": "APP", "source_table": "TRANSACTIONS", "source_path_prd": "tests/resources/data/transactions", "source_path_dev": "tests/resources/data/transactions"}, "bronze_database_dev": "bronze", "bronze_database_staging": "bronze", "bronze_database_prd": "bronze", "bronze_table": "transactions_cdc", "bronze_reader_options": {"cloudFiles.format": 
"json", "cloudFiles.inferColumnTypes": "true", "cloudFiles.rescuedDataColumn": "_rescued_data"}, "bronze_table_path_dev": "tests/resources/delta/transactions", "bronze_table_path_staging": "s3://db-dlt-meta-staging/demo/data/bronze/transactions", "bronze_table_path_prd": "s3://db-dlt-meta-prod/demo/data/bronze/transactions", "bronze_table_properties": {"pipelines.reset.allowed": "false"}, "bronze_data_quality_expectations_json_dev": "tests/resources/dqe/transactions/bronze_data_quality_expectations.json", "bronze_database_quarantine_dev": "bronze", "bronze_database_quarantine_staging": "bronze", "bronze_database_quarantine_prd": "bronze", "bronze_quarantine_table": "transactions_cdc_quarantine", "bronze_quarantine_table_path_dev": "tests/resources/data/bronze/transactions_quarantine", "silver_database_dev": "silver", "silver_database_preprd": "silver", "silver_database_prd": "silver", "silver_table": "transactions", "silver_cdc_apply_changes": {"keys": ["id"], "sequence_by": "operation_date", "scd_type": "1", "apply_as_deletes": "operation = 'DELETE'", "except_column_list": ["operation", "operation_date", "_rescued_data"]}, "silver_partition_columns": "transaction_date", "silver_table_path_dev": "tests/resources/data/silver/transactions", "silver_transformation_json_dev": "tests/resources/silver_transformations.json", "silver_table_properties": {"pipelines.reset.allowed": "false", "pipelines.autoOptimize.zOrderCols": "id, customer_id"}}, {"data_flow_id": "103", "data_flow_group": "A2", "source_system": "MYSQL", "source_format": "eventhub", "source_details": {"source_schema_path": "tests/resources/schema/eventhub_iot_schema.ddl", "eventhub.accessKeyName": "iotIngestionAccessKey", "eventhub.name": "iot", "eventhub.accessKeySecretName": "iotIngestionAccessKey", "eventhub.secretsScopeName": "eventhubs_creds", "kafka.sasl.mechanism": "PLAIN", "kafka.security.protocol": "SASL_SSL", "kafka.bootstrap.servers": "ganesh-standard.servicebus.windows.net:9093"}, 
"bronze_database_dev": "bronze", "bronze_database_staging": "bronze", "bronze_database_prd": "bronze", "bronze_table": "iot_cdc", "bronze_reader_options": {"maxOffsetsPerTrigger": "50000", "startingOffsets": "latest", "failOnDataLoss": "false", "kafka.request.timeout.ms": "60000", "kafka.session.timeout.ms": "60000"}, "bronze_table_path_dev": "tests/resources/delta/iot_cdc", "bronze_table_path_staging": "s3://db-dlt-meta-staging/demo/data/bronze/iot_cdc", "bronze_table_path_prd": "s3://db-dlt-meta-prod/demo/data/bronze/iot_cdc", "bronze_data_quality_expectations_json_dev": "tests/resources/dqe/iot_cdc/bronze_data_quality_expectations.json", "silver_database_dev": "silver", "silver_table": "iot_cdc", "silver_cdc_apply_changes": {"keys": ["device_id"], "sequence_by": "timestamp", "scd_type": "1", "apply_as_deletes": "operation = 'DELETE'", "except_column_list": []}, "silver_table_path_dev": "tests/resources/data/silver/iot_cdc", "silver_transformation_json_dev": "tests/resources/silver_transformations.json"}]

tests/resources/onboarding_ac_type2.json

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
"sequence_by": "dmsTimestamp",
3232
"scd_type": "2",
3333
"apply_as_deletes": "Op = 'D'",
34+
"apply_as_truncates": "Op = 'T'",
3435
"except_column_list": [
3536
"Op",
3637
"dmsTimestamp",
@@ -72,6 +73,7 @@
7273
"sequence_by": "dmsTimestamp",
7374
"scd_type": "2",
7475
"apply_as_deletes": "Op = 'D'",
76+
"apply_as_truncates": "Op = 'T'",
7577
"except_column_list": [
7678
"Op",
7779
"dmsTimestamp",

tests/test_main.py

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,10 @@
33
import sys
44
import copy
55
from src import __main__
6+
from unittest.mock import MagicMock
7+
8+
spark = MagicMock()
9+
OnboardDataflowspec = MagicMock()
610

711

812
class MainTests(DLTFrameworkTestCase):
@@ -101,3 +105,73 @@ def test_main_layer_missing(self):
101105
sys.argv = list
102106
with self.assertRaises(Exception):
103107
__main__.main()
108+
109+
def test_main_bronze_silver_uc(self):
110+
"""Test bronze and silver onboarding for uc."""
111+
OnboardDataflowspec.return_value = None
112+
spark_mock = MagicMock("SparkSession")
113+
spark.builder.appName("DLT-META_Onboarding_Task").getOrCreate().return_value = spark_mock
114+
param_map = copy.deepcopy(self.onboarding_bronze_silver_params_uc_map)
115+
param_map["onboard_layer"] = "bronze_silver"
116+
list = ["dummy_test"]
117+
for key in param_map:
118+
list.append(f"--{key}={param_map[key]}")
119+
sys.argv = list
120+
__main__.main()
121+
bronze_dataflowSpec_df = (self.spark.read.format("delta").table(
122+
f"{param_map['database']}.{param_map['bronze_dataflowspec_table']}")
123+
)
124+
self.assertEqual(bronze_dataflowSpec_df.count(), 3)
125+
silver_dataflowSpec_df = (self.spark.read.format("delta") .table(
126+
f"{param_map['database']}.{param_map['silver_dataflowspec_table']}")
127+
)
128+
self.assertEqual(silver_dataflowSpec_df.count(), 3)
129+
del param_map['onboard_layer']
130+
del param_map['uc_enabled']
131+
del param_map['bronze_dataflowspec_path']
132+
del param_map['silver_dataflowspec_path']
133+
OnboardDataflowspec.called_once_with(spark_mock, param_map, uc_enabled=True)
134+
135+
def test_onboarding(self):
136+
mock_onboard_dataflowspec = OnboardDataflowspec
137+
mock_args = MagicMock()
138+
mock_args.onboard_layer = 'bronze_silver'
139+
mock_args.uc_enabled = 'true'
140+
mock_args.__dict__ = {
141+
'onboard_layer': 'bronze_silver',
142+
'uc_enabled': 'true',
143+
'bronze_dataflowspec_path': 'path/to/bronze_dataflowspec',
144+
'silver_dataflowspec_path': 'path/to/silver_dataflowspec'
145+
}
146+
147+
spark_mock = MagicMock("SparkSession")
148+
spark.builder.appName("DLT-META_Onboarding_Task").getOrCreate().return_value = spark_mock
149+
150+
mock_onboard_obj = MagicMock()
151+
mock_onboard_dataflowspec.return_value = mock_onboard_obj
152+
153+
# Act
154+
onboard_layer = mock_args.onboard_layer
155+
uc_enabled = True if mock_args.uc_enabled and mock_args.uc_enabled.lower() == "true" else False
156+
onboarding_args_dict = mock_args.__dict__
157+
del onboarding_args_dict['onboard_layer']
158+
del onboarding_args_dict['uc_enabled']
159+
if uc_enabled:
160+
if 'bronze_dataflowspec_path' in onboarding_args_dict:
161+
del onboarding_args_dict['bronze_dataflowspec_path']
162+
if 'silver_dataflowspec_path' in onboarding_args_dict:
163+
del onboarding_args_dict['silver_dataflowspec_path']
164+
onboard_obj = mock_onboard_dataflowspec(spark, onboarding_args_dict, uc_enabled=uc_enabled)
165+
166+
if onboard_layer.lower() == "bronze_silver":
167+
onboard_obj.onboard_dataflow_specs()
168+
elif onboard_layer.lower() == "bronze":
169+
onboard_obj.onboard_bronze_dataflow_spec()
170+
elif onboard_layer.lower() == "silver":
171+
onboard_obj.onboard_silver_dataflow_spec()
172+
else:
173+
raise Exception("onboard_layer argument missing in commandline")
174+
175+
# Assert
176+
mock_onboard_dataflowspec.assert_called_once_with(spark, {}, uc_enabled=True)
177+
mock_onboard_obj.onboard_dataflow_specs.assert_called_once()

tests/test_onboard_dataflowspec.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,41 @@ def test_onboardDataFlowSpecs_with_merge(self):
121121
if bronze_row.dataFlowId == "103":
122122
self.assertEqual(bronze_row.readerConfigOptions.get("maxOffsetsPerTrigger"), "60000")
123123

124+
def test_onboardDataFlowSpecs_with_merge_uc(self):
125+
"""Test for onboardDataflowspec with merge scenario."""
126+
local_params = copy.deepcopy(self.onboarding_bronze_silver_params_uc_map)
127+
local_params["onboarding_file_path"] = self.onboarding_json_file
128+
del local_params["uc_enabled"]
129+
onboardDataFlowSpecs = OnboardDataflowspec(self.spark, local_params, uc_enabled=True)
130+
onboardDataFlowSpecs.onboard_dataflow_specs()
131+
bronze_dataflowSpec_df = self.read_dataflowspec(
132+
self.onboarding_bronze_silver_params_map['database'],
133+
self.onboarding_bronze_silver_params_map['bronze_dataflowspec_table'])
134+
silver_dataflowSpec_df = self.read_dataflowspec(
135+
self.onboarding_bronze_silver_params_map['database'],
136+
self.onboarding_bronze_silver_params_map['silver_dataflowspec_table'])
137+
self.assertEqual(bronze_dataflowSpec_df.count(), 3)
138+
self.assertEqual(silver_dataflowSpec_df.count(), 3)
139+
local_params["overwrite"] = "False"
140+
local_params["onboarding_file_path"] = self.onboarding_v2_json_file
141+
onboardDataFlowSpecs = OnboardDataflowspec(self.spark, local_params, uc_enabled=True)
142+
onboardDataFlowSpecs.onboard_dataflow_specs()
143+
bronze_dataflowSpec_df = self.read_dataflowspec(
144+
self.onboarding_bronze_silver_params_map['database'],
145+
self.onboarding_bronze_silver_params_map['bronze_dataflowspec_table'])
146+
silver_dataflowSpec_df = self.read_dataflowspec(
147+
self.onboarding_bronze_silver_params_map['database'],
148+
self.onboarding_bronze_silver_params_map['silver_dataflowspec_table'])
149+
self.assertEqual(bronze_dataflowSpec_df.count(), 3)
150+
self.assertEqual(silver_dataflowSpec_df.count(), 3)
151+
bronze_df_rows = bronze_dataflowSpec_df.collect()
152+
for bronze_df_row in bronze_df_rows:
153+
bronze_row = BronzeDataflowSpec(**bronze_df_row.asDict())
154+
if bronze_row.dataFlowId in ["101", "102"]:
155+
self.assertIsNone(bronze_row.readerConfigOptions.get("cloudFiles.rescuedDataColumn"))
156+
if bronze_row.dataFlowId == "103":
157+
self.assertEqual(bronze_row.readerConfigOptions.get("maxOffsetsPerTrigger"), "60000")
158+
124159
def test_onboardBronzeDataflowSpec_positive(self):
125160
"""Test for onboardDataflowspec."""
126161
onboarding_params_map = copy.deepcopy(self.onboarding_bronze_silver_params_map)

0 commit comments

Comments
 (0)