
Commit 1992851

1. PR Comment: refactored dataflow where clause
2. PR Comment: removed --- from docs
3. PR Comment: Corrected delta read condition
1 parent 832780a commit 1992851

3 files changed (+18 -7 lines changed)

docs/content/getting_started/additionals.md

Lines changed: 2 additions & 2 deletions
@@ -23,7 +23,7 @@ export DATABRICKS_TOKEN=<DATABRICKS TOKEN> # Account needs permission to create
 5. Run integration test against cloudfile or eventhub or kafka using below options:
 5a. Run the command for cloudfiles ```python integration-tests/run-integration-test.py --cloud_provider_name=aws --dbr_version=11.3.x-scala2.12 --source=cloudfiles --dbfs_path=dbfs:/tmp/DLT-META/```

-5b. Run the command for eventhub ```python integration-tests/run-integration-test.py --cloud_provider_name=azure --dbr_version=11.3.x-scala2.12 --source=eventhub --dbfs_path=dbfs:/tmp/DLT-META/ --eventhub_name=iot --eventhub_secrets_scope_name=eventhubs_creds --eventhub_namespace=int_test-standard --eventhub_port=9093 --eventhub_producer_accesskey_name=producer ----eventhub_consumer_accesskey_name=consumer```
+5b. Run the command for eventhub ```python integration-tests/run-integration-test.py --cloud_provider_name=azure --dbr_version=11.3.x-scala2.12 --source=eventhub --dbfs_path=dbfs:/tmp/DLT-META/ --eventhub_name=iot --eventhub_secrets_scope_name=eventhubs_creds --eventhub_namespace=int_test-standard --eventhub_port=9093 --eventhub_producer_accesskey_name=producer --eventhub_consumer_accesskey_name=consumer```

 For eventhub integration tests, the following are the prerequisites:
 1. Needs eventhub instance running
@@ -34,7 +34,7 @@ export DATABRICKS_TOKEN=<DATABRICKS TOKEN> # Account needs permission to create
 1. Provide your eventhub topic : --eventhub_name
 2. Provide eventhub namespace : --eventhub_namespace
 3. Provide eventhub port : --eventhub_port
-4. Provide databricks secret scope name : ----eventhub_secrets_scope_name
+4. Provide databricks secret scope name : --eventhub_secrets_scope_name
 5. Provide eventhub producer access key name : --eventhub_producer_accesskey_name
 6. Provide eventhub access key name : --eventhub_consumer_accesskey_name
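Why the extra dashes matter: a CLI parser only recognizes the exact option strings it declares, so a four-dash token like `----eventhub_secrets_scope_name` is treated as an unknown argument rather than as the intended flag. A minimal illustration with Python's argparse (whether run-integration-test.py actually uses argparse is an assumption; the option name is borrowed from the docs above):

```python
import argparse

# Hypothetical parser mirroring one documented flag of run-integration-test.py.
parser = argparse.ArgumentParser()
parser.add_argument("--eventhub_secrets_scope_name")

# The corrected two-dash form parses cleanly.
args = parser.parse_args(["--eventhub_secrets_scope_name", "eventhubs_creds"])
assert args.eventhub_secrets_scope_name == "eventhubs_creds"

# The old four-dash form is not a declared option string, so argparse
# reports "unrecognized arguments" and exits.
try:
    parser.parse_args(["----eventhub_secrets_scope_name", "eventhubs_creds"])
except SystemExit:
    print("four-dash flag rejected")
```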

src/dataflow_pipeline.py

Lines changed: 16 additions & 3 deletions
@@ -96,9 +96,9 @@ def read_bronze(self) -> DataFrame:
         """Read Bronze Table."""
         logger.info("In read_bronze func")
         bronze_dataflow_spec: BronzeDataflowSpec = self.dataflowSpec
-        if bronze_dataflow_spec.sourceFormat == "cloudFiles" or bronze_dataflow_spec.sourceFormat == "delta":
+        if bronze_dataflow_spec.sourceFormat == "cloudFiles":
             return PipelineReaders.read_dlt_cloud_files(self.spark, bronze_dataflow_spec, self.schema_json)
-        if bronze_dataflow_spec.sourceFormat == "delta":
+        elif bronze_dataflow_spec.sourceFormat == "delta":
             return PipelineReaders.read_dlt_delta(self.spark, bronze_dataflow_spec)
         elif bronze_dataflow_spec.sourceFormat == "eventhub" or bronze_dataflow_spec.sourceFormat == "kafka":
             return PipelineReaders.read_kafka(self.spark, bronze_dataflow_spec, self.schema_json)
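The point of this hunk: the old first condition matched both `cloudFiles` and `delta`, so a delta source was routed into the cloud-files reader and the dedicated `delta` check below it was unreachable. A minimal sketch of the corrected dispatch, with hypothetical stub reader names standing in for the real `PipelineReaders` calls:

```python
# Minimal sketch of the corrected dispatch; the returned names are stubs,
# not the real PipelineReaders API.
def pick_reader(source_format: str) -> str:
    if source_format == "cloudFiles":
        return "read_dlt_cloud_files"
    elif source_format == "delta":
        # Reachable again: "delta" no longer falls into the cloudFiles branch.
        return "read_dlt_delta"
    elif source_format in ("eventhub", "kafka"):
        return "read_kafka"
    raise ValueError(f"Unsupported source format: {source_format}")

assert pick_reader("delta") == "read_dlt_delta"
```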
@@ -117,12 +117,25 @@ def get_silver_schema(self):
             format="delta"
             # #f"{source_database}.{source_table}"
         ).selectExpr(*select_exp)
+        raw_delta_table_stream = self.__apply_where_clause(where_clause, raw_delta_table_stream)
+        return raw_delta_table_stream.schema
+
+    def __apply_where_clause(self, where_clause, raw_delta_table_stream):
+        """This method apply where clause provided in silver transformations
+
+        Args:
+            where_clause (_type_): _description_
+            raw_delta_table_stream (_type_): _description_
+
+        Returns:
+            _type_: _description_
+        """
         if where_clause:
             where_clause_str = " ".join(where_clause)
             if len(where_clause_str.strip()) > 0:
                 for where_clause in where_clause:
                     raw_delta_table_stream = raw_delta_table_stream.where(where_clause)
-        return raw_delta_table_stream.schema
+        return raw_delta_table_stream

     def read_silver(self) -> DataFrame:
         """Read Silver tables."""

src/pipeline_readers.py

Lines changed: 0 additions & 2 deletions
@@ -53,8 +53,6 @@ def read_dlt_delta(spark, bronze_dataflow_spec) -> DataFrame:
         Args:
             spark (_type_): _description_
             bronze_dataflow_spec (_type_): _description_
-            schema_json (_type_): _description_
-
         Returns:
             DataFrame: _description_
         """
