Skip to content

Commit 3555aaa

Browse files
Merge pull request #33 from rtdtwo/patch-1
Mismatched Keys: Update read_dlt_delta() with key "source_database" instead of "database"
2 parents 423a776 + efae9df commit 3555aaa

File tree

2 files changed

+4
-4
lines changed

2 files changed

+4
-4
lines changed

src/pipeline_readers.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,14 +62,14 @@ def read_dlt_delta(spark, bronze_dataflow_spec) -> DataFrame:
6262
if reader_config_options and len(reader_config_options) > 0:
6363
return (
6464
spark.readStream.options(**reader_config_options).table(
65-
f"""{bronze_dataflow_spec.sourceDetails["database"]}
65+
f"""{bronze_dataflow_spec.sourceDetails["source_database"]}
6666
.{bronze_dataflow_spec.sourceDetails["table"]}"""
6767
)
6868
)
6969
else:
7070
return (
7171
spark.readStream.table(
72-
f"""{bronze_dataflow_spec.sourceDetails["database"]}
72+
f"""{bronze_dataflow_spec.sourceDetails["source_database"]}
7373
.{bronze_dataflow_spec.sourceDetails["table"]}"""
7474
)
7575
)

tests/test_pipeline_readers.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -212,7 +212,7 @@ def test_read_delta_positive(self):
212212
full_path = os.path.abspath("tests/resources/delta/customers")
213213
self.spark.sql(f"CREATE TABLE if not exists source_bronze.customer USING DELTA LOCATION '{full_path}' ")
214214

215-
source_details_map = {"sourceDetails": {"database": "source_bronze", "table": "customer"}}
215+
source_details_map = {"sourceDetails": {"source_database": "source_bronze", "table": "customer"}}
216216

217217
bronze_map.update(source_details_map)
218218
bronze_dataflow_spec = BronzeDataflowSpec(**bronze_map)
@@ -227,7 +227,7 @@ def test_read_delta_with_read_config_positive(self):
227227
self.spark.sql("CREATE DATABASE IF NOT EXISTS source_bronze")
228228
full_path = os.path.abspath("tests/resources/delta/customers")
229229
self.spark.sql(f"CREATE TABLE if not exists source_bronze.customer USING DELTA LOCATION '{full_path}' ")
230-
source_details_map = {"sourceDetails": {"database": "source_bronze", "table": "customer"}}
230+
source_details_map = {"sourceDetails": {"source_database": "source_bronze", "table": "customer"}}
231231
bronze_map.update(source_details_map)
232232
reader_config = {"readerConfigOptions": {"maxFilesPerTrigger": "1"}}
233233
bronze_map.update(reader_config)

0 commit comments

Comments
 (0)