Skip to content

Commit ce87867

Browse files
Merge pull request #228 from databrickslabs/issue_227
Added custom function support for kafka and delta tables
2 parents e2a5aa1 + 298fa4f commit ce87867

File tree

1 file changed

+2
-2
lines changed

1 file changed

+2
-2
lines changed

src/dataflow_pipeline.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -315,9 +315,9 @@ def read_bronze(self) -> DataFrame:
315315
if bronze_dataflow_spec.sourceFormat == "cloudFiles":
316316
input_df = pipeline_reader.read_dlt_cloud_files()
317317
elif bronze_dataflow_spec.sourceFormat == "delta" or bronze_dataflow_spec.sourceFormat == "snapshot":
318-
return pipeline_reader.read_dlt_delta()
318+
input_df = pipeline_reader.read_dlt_delta()
319319
elif bronze_dataflow_spec.sourceFormat == "eventhub" or bronze_dataflow_spec.sourceFormat == "kafka":
320-
return pipeline_reader.read_kafka()
320+
input_df = pipeline_reader.read_kafka()
321321
else:
322322
raise Exception(f"{bronze_dataflow_spec.sourceFormat} source format not supported")
323323
return self.apply_custom_transform_fun(input_df)

0 commit comments

Comments
 (0)