
Commit b1f5304

1. Added delta reader as per PR comment
2. Corrected docs formatting for code blocks
3. Added unit tests for Delta reader
1 parent ca67cf2 commit b1f5304

6 files changed: +83 -29 lines changed


docs/content/faq/general.md

Lines changed: 1 addition & 1 deletion
@@ -17,7 +17,7 @@ DLT-META is a solution/framework using Databricks Delta Live Tables aka DLT whic

 **Q. What different types of reader are supported using DLT-META ?**

-DLT-META uses Databricks [Auto Loader](https://docs.databricks.com/ingestion/auto-loader/index.html) to read from s3/adls/blog stroage.
+DLT-META uses Databricks [Auto Loader](https://docs.databricks.com/ingestion/auto-loader/index.html), DELTA, KAFKA, EVENTHUB to read from s3/adls/blog stroage.

 **Q. Can DLT-META support any other readers?**

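For context on what those reader names translate to, here is a minimal PySpark sketch contrasting an Auto Loader (cloudFiles) read with a plain Delta streaming read; the paths and the `cloudFiles.format` value are illustrative placeholders, not DLT-META defaults, and `spark` is assumed to be an active SparkSession.

```python
# Illustrative sketch only: the paths and options below are hypothetical examples.

# Auto Loader: incrementally picks up new files landing in cloud object storage.
autoloader_df = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "json")           # format of the incoming raw files
    .load("s3://example-bucket/landing/customers/")
)

# Delta reader (the subject of this commit): streams from an existing Delta path.
delta_df = (
    spark.readStream.format("delta")
    .load("dbfs:/example/bronze/customers")
)
```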

docs/content/getting_started/additionals.md

Lines changed: 16 additions & 16 deletions
@@ -25,28 +25,28 @@ export DATABRICKS_TOKEN=<DATABRICKS TOKEN> # Account needs permission to create

 5b. Run the command for eventhub ```python integration-tests/run-integration-test.py --cloud_provider_name=azure --dbr_version=11.3.x-scala2.12 --source=eventhub --dbfs_path=dbfs:/tmp/DLT-META/ --eventhub_name=iot --eventhub_secrets_scope_name=eventhubs_creds --eventhub_namespace=int_test-standard --eventhub_port=9093 --eventhub_producer_accesskey_name=producer ----eventhub_consumer_accesskey_name=consumer```

-For eventhub integration tests, the following are the prerequisites:
-1. Needs eventhub instance running
-2. Using Databricks CLI, Create databricks secrets scope for eventhub keys
-3. Using Databricks CLI, Create databricks secrets to store producer and consumer keys using the scope created in step 2
+For eventhub integration tests, the following are the prerequisites:
+1. Needs eventhub instance running
+2. Using Databricks CLI, Create databricks secrets scope for eventhub keys
+3. Using Databricks CLI, Create databricks secrets to store producer and consumer keys using the scope created in step 2

-Following are the mandatory arguments for running EventHubs integration test
-1. Provide your eventhub topic : --eventhub_name
-2. Provide eventhub namespace : --eventhub_namespace
-3. Provide eventhub port : --eventhub_port
-4. Provide databricks secret scope name : ----eventhub_secrets_scope_name
-5. Provide eventhub producer access key name : --eventhub_producer_accesskey_name
-6. Provide eventhub access key name : --eventhub_consumer_accesskey_name
+Following are the mandatory arguments for running EventHubs integration test
+1. Provide your eventhub topic : --eventhub_name
+2. Provide eventhub namespace : --eventhub_namespace
+3. Provide eventhub port : --eventhub_port
+4. Provide databricks secret scope name : ----eventhub_secrets_scope_name
+5. Provide eventhub producer access key name : --eventhub_producer_accesskey_name
+6. Provide eventhub access key name : --eventhub_consumer_accesskey_name


 5c. Run the command for kafka ```python3 integration-tests/run-integration-test.py --cloud_provider_name=aws --dbr_version=11.3.x-scala2.12 --source=kafka --dbfs_path=dbfs:/tmp/DLT-META/ --kafka_topic_name=dlt-meta-integration-test --kafka_broker=host:9092```

-For kafka integration tests, the following are the prerequisites:
-1. Needs kafka instance running
+For kafka integration tests, the following are the prerequisites:
+1. Needs kafka instance running

-Following are the mandatory arguments for running EventHubs integration test
-1. Provide your kafka topic name : --kafka_topic_name
-2. Provide kafka_broker : --kafka_broker
+Following are the mandatory arguments for running EventHubs integration test
+1. Provide your kafka topic name : --kafka_topic_name
+2. Provide kafka_broker : --kafka_broker

 6. Once finished integration output file will be copied locally to
 ```integration-test-output_<run_id>.txt```
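Steps 2 and 3 in the eventhub prerequisites store the producer and consumer keys as Databricks secrets. At runtime such secrets are typically retrieved with `dbutils.secrets.get`; a small sketch, reusing the scope and key names from the sample command (`eventhubs_creds`, `producer`, `consumer`) purely for illustration:

```python
# Illustrative: fetch the Event Hubs keys referenced by the integration-test arguments.
# Scope/key names mirror the sample command above; substitute your own values.
producer_key = dbutils.secrets.get(scope="eventhubs_creds", key="producer")
consumer_key = dbutils.secrets.get(scope="eventhubs_creds", key="consumer")
```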

docs/content/getting_started/runoboardingopt2.md

Lines changed: 12 additions & 12 deletions
@@ -12,18 +12,18 @@ draft: false
 ```
 ```
 onboarding_params_map = {
-    "database": "dlt_demo",
-    "onboarding_file_path": "dbfs:/onboarding_files/users_onboarding.json",
-    "bronze_dataflowspec_table": "bronze_dataflowspec_table",
-    "bronze_dataflowspec_path": "dbfs:/onboarding_tables_cdc/bronze",
-    "silver_dataflowspec_table": "silver_dataflowspec_table",
-    "silver_dataflowspec_path": "dbfs:/onboarding_tables_cdc/silver",
-    "overwrite": "True",
-    "onboard_layer": "bronze_silver",
-    "env": "dev",
-    "version": "v1",
-    "import_author": "Ravi"
-}
+    "database": "dlt_demo",
+    "onboarding_file_path": "dbfs:/onboarding_files/users_onboarding.json",
+    "bronze_dataflowspec_table": "bronze_dataflowspec_table",
+    "bronze_dataflowspec_path": "dbfs:/onboarding_tables_cdc/bronze",
+    "silver_dataflowspec_table": "silver_dataflowspec_table",
+    "silver_dataflowspec_path": "dbfs:/onboarding_tables_cdc/silver",
+    "overwrite": "True",
+    "onboard_layer": "bronze_silver",
+    "env": "dev",
+    "version": "v1",
+    "import_author": "Ravi"
+}

 from src.onboard_dataflowspec import OnboardDataflowspec
 OnboardDataflowspec(spark, onboarding_params_map).onboard_dataflow_specs()

src/dataflow_pipeline.py

Lines changed: 2 additions & 0 deletions
@@ -98,6 +98,8 @@ def read_bronze(self) -> DataFrame:
         bronze_dataflow_spec: BronzeDataflowSpec = self.dataflowSpec
         if bronze_dataflow_spec.sourceFormat == "cloudFiles" or bronze_dataflow_spec.sourceFormat == "delta":
             return PipelineReaders.read_dlt_cloud_files(self.spark, bronze_dataflow_spec, self.schema_json)
+        if bronze_dataflow_spec.sourceFormat == "delta":
+            return PipelineReaders.read_dlt_delta(self.spark, bronze_dataflow_spec)
         elif bronze_dataflow_spec.sourceFormat == "eventhub" or bronze_dataflow_spec.sourceFormat == "kafka":
             return PipelineReaders.read_kafka(self.spark, bronze_dataflow_spec, self.schema_json)
         else:
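One thing worth flagging in this hunk: a `sourceFormat` of `"delta"` already satisfies the pre-existing `cloudFiles`-or-`delta` condition on the first `if`, so the newly added branch is unreachable as written. A minimal sketch of a dispatch with disjoint branches (a suggested restructuring, not what the commit contains; the behaviour of the final `else` is an assumption since that code is not shown):

```python
# Sketch of read_bronze with disjoint branches so the delta reader is actually used.
# Names (PipelineReaders, BronzeDataflowSpec, self.schema_json) match the diff above.
def read_bronze(self) -> DataFrame:
    bronze_dataflow_spec: BronzeDataflowSpec = self.dataflowSpec
    if bronze_dataflow_spec.sourceFormat == "cloudFiles":
        return PipelineReaders.read_dlt_cloud_files(self.spark, bronze_dataflow_spec, self.schema_json)
    elif bronze_dataflow_spec.sourceFormat == "delta":
        return PipelineReaders.read_dlt_delta(self.spark, bronze_dataflow_spec)
    elif bronze_dataflow_spec.sourceFormat in ("eventhub", "kafka"):
        return PipelineReaders.read_kafka(self.spark, bronze_dataflow_spec, self.schema_json)
    else:
        # Assumed fallback; the original else branch is not visible in this hunk.
        raise Exception(f"{bronze_dataflow_spec.sourceFormat} is not a supported bronze source format")
```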

src/pipeline_readers.py

Lines changed: 28 additions & 0 deletions
@@ -46,6 +46,34 @@ def read_dlt_cloud_files(spark, bronze_dataflow_spec, schema_json) -> DataFrame:
             .load(source_path)
         )

+    @staticmethod
+    def read_dlt_delta(spark, bronze_dataflow_spec) -> DataFrame:
+        """Read dlt delta.
+
+        Args:
+            spark (_type_): _description_
+            bronze_dataflow_spec (_type_): _description_
+            schema_json (_type_): _description_
+
+        Returns:
+            DataFrame: _description_
+        """
+        logger.info("In read_dlt_cloud_files func")
+        source_path = bronze_dataflow_spec.sourceDetails["path"]
+        reader_config_options = bronze_dataflow_spec.readerConfigOptions
+
+        if reader_config_options and len(reader_config_options) > 0:
+            return (
+                spark.readStream.format(bronze_dataflow_spec.sourceFormat)
+                .options(**reader_config_options)
+                .load(source_path)
+            )
+        else:
+            return (
+                spark.readStream.format(bronze_dataflow_spec.sourceFormat)
+                .load(source_path)
+            )
+
     @staticmethod
     def get_db_utils(spark):
         """Get databricks utils using DBUtils package."""

tests/test_pipeline_readers.py

Lines changed: 24 additions & 0 deletions
@@ -141,6 +141,30 @@ def test_read_cloud_files_positive(self):
         customer_df = PipelineReaders.read_dlt_cloud_files(self.spark, bronze_dataflow_spec, schema)
         self.assertIsNotNone(customer_df)

+    def test_read_delta_positive(self):
+        """Test read_cloud_files positive."""
+        bronze_map = PipelineReadersTests.bronze_dataflow_spec_map
+        source_format_map = {"sourceFormat": "delta"}
+        bronze_map.update(source_format_map)
+        source_details_map = {"sourceDetails": {"path": "tests/resources/delta/customers"}}
+        bronze_map.update(source_details_map)
+        bronze_dataflow_spec = BronzeDataflowSpec(**bronze_map)
+        customer_df = PipelineReaders.read_dlt_delta(self.spark, bronze_dataflow_spec)
+        self.assertIsNotNone(customer_df)
+
+    def test_read_delta_with_read_config_positive(self):
+        """Test read_cloud_files positive."""
+        bronze_map = PipelineReadersTests.bronze_dataflow_spec_map
+        source_format_map = {"sourceFormat": "delta"}
+        bronze_map.update(source_format_map)
+        source_details_map = {"sourceDetails": {"path": "tests/resources/delta/customers"}}
+        bronze_map.update(source_details_map)
+        reader_config = {"readerConfigOptions": {"maxFilesPerTrigger": "1"}}
+        bronze_map.update(reader_config)
+        bronze_dataflow_spec = BronzeDataflowSpec(**bronze_map)
+        customer_df = PipelineReaders.read_dlt_delta(self.spark, bronze_dataflow_spec)
+        self.assertIsNotNone(customer_df)
+
     @patch.object(PipelineReaders, "get_db_utils", return_value=dbutils)
     @patch.object(dbutils, "secrets.get", return_value={"called"})
     def test_get_eventhub_kafka_options(self, get_db_utils, dbutils):
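Two small observations on the new tests: both docstrings still say "Test read_cloud_files positive.", and `bronze_map` is a reference to the class-level `bronze_dataflow_spec_map`, so the `update(...)` calls mutate state shared across the test class. A minimal sketch of an isolated variant, assuming the surrounding test class stays as shown above:

```python
import copy

# Inside the PipelineReadersTests class shown above:
def test_read_delta_positive(self):
    """Test that read_dlt_delta returns a streaming DataFrame for a delta source."""
    # Copy first so the shared class-level spec map is not mutated for other tests.
    bronze_map = copy.deepcopy(PipelineReadersTests.bronze_dataflow_spec_map)
    bronze_map.update({"sourceFormat": "delta"})
    bronze_map.update({"sourceDetails": {"path": "tests/resources/delta/customers"}})
    bronze_dataflow_spec = BronzeDataflowSpec(**bronze_map)
    customer_df = PipelineReaders.read_dlt_delta(self.spark, bronze_dataflow_spec)
    self.assertIsNotNone(customer_df)
```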
