Skip to content

Commit de59b5c

Browse files
committed
control-service: Extending Trino lineage
Fix lineage for templates. --------- Signed-off-by: Tonka Zheleva <tonka.zheleva@broadcom.com>
1 parent 4e11a2e commit de59b5c

File tree

4 files changed

+12
-8
lines changed

4 files changed

+12
-8
lines changed

projects/vdk-plugins/vdk-trino/src/vdk/plugin/trino/templates/load/dimension/scd1/03-handle-quality-checks_and_move_data.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
from vdk.api.job_input import IJobInput
88
from vdk.api.lineage.model.logger.lineage_logger import ILineageLogger
99
from vdk.api.lineage.model.sql.model import LineageData
10+
from vdk.api.lineage.model.sql.model import LineageTable
1011
from vdk.internal.core.statestore import StoreKey
1112
from vdk.plugin.trino.templates.data_quality_exception import DataQualityException
1213
from vdk.plugin.trino.trino_utils import CommonUtilities
@@ -196,8 +197,8 @@ def copy_staging_table_to_target_table(
196197
query="template",
197198
query_type="template",
198199
query_status="OK",
199-
input_tables=[original_source_schema + "." + original_source_view],
200-
output_table=target_schema + "." + target_table,
200+
input_tables=[LineageTable('',original_source_schema, original_source_view)],
201+
output_table=LineageTable('', target_schema, target_table)
201202
)
202203

203204
LINEAGE_LOGGER_KEY = StoreKey[ILineageLogger]("trino-lineage-logger")

projects/vdk-plugins/vdk-trino/src/vdk/plugin/trino/templates/load/dimension/scd1_upsert/03-handle-quality-checks_and_move_data.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
from vdk.api.job_input import IJobInput
88
from vdk.api.lineage.model.logger.lineage_logger import ILineageLogger
99
from vdk.api.lineage.model.sql.model import LineageData
10+
from vdk.api.lineage.model.sql.model import LineageTable
1011
from vdk.internal.core.statestore import StoreKey
1112
from vdk.plugin.trino.templates.data_quality_exception import DataQualityException
1213
from vdk.plugin.trino.trino_utils import CommonUtilities
@@ -150,8 +151,8 @@ def copy_staging_table_to_target_table(
150151
query="template",
151152
query_type="template",
152153
query_status="OK",
153-
input_tables=[original_source_schema + "." + original_source_view],
154-
output_table=target_schema + "." + target_table,
154+
input_tables=[LineageTable('',original_source_schema, original_source_view)],
155+
output_table=LineageTable('', target_schema, target_table)
155156
)
156157

157158
LINEAGE_LOGGER_KEY = StoreKey[ILineageLogger]("trino-lineage-logger")

projects/vdk-plugins/vdk-trino/src/vdk/plugin/trino/templates/load/fact/insert/02-handle-quality-checks_and_move_data.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from vdk.api.job_input import IJobInput
77
from vdk.api.lineage.model.logger.lineage_logger import ILineageLogger
88
from vdk.api.lineage.model.sql.model import LineageData
9+
from vdk.api.lineage.model.sql.model import LineageTable
910
from vdk.internal.core.statestore import StoreKey
1011
from vdk.plugin.trino.templates.data_quality_exception import DataQualityException
1112
from vdk.plugin.trino.trino_utils import CommonUtilities
@@ -136,8 +137,8 @@ def run(job_input: IJobInput):
136137
query="template",
137138
query_type="template",
138139
query_status="OK",
139-
input_tables=[source_schema + "." + source_view],
140-
output_table=target_schema + "." + target_table,
140+
input_tables=[LineageTable('', source_schema, source_view)],
141+
output_table=LineageTable('', target_schema, target_table)
141142
)
142143

143144
LINEAGE_LOGGER_KEY = StoreKey[ILineageLogger]("trino-lineage-logger")

projects/vdk-plugins/vdk-trino/src/vdk/plugin/trino/templates/load/fact/snapshot/02-handle-quality-checks_and_move_data.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from vdk.api.job_input import IJobInput
77
from vdk.api.lineage.model.logger.lineage_logger import ILineageLogger
88
from vdk.api.lineage.model.sql.model import LineageData
9+
from vdk.api.lineage.model.sql.model import LineageTable
910
from vdk.internal.core.statestore import StoreKey
1011
from vdk.plugin.trino.templates.data_quality_exception import DataQualityException
1112
from vdk.plugin.trino.trino_utils import CommonUtilities
@@ -124,8 +125,8 @@ def run(job_input: IJobInput):
124125
query="template",
125126
query_type="template",
126127
query_status="OK",
127-
input_tables=[source_schema + "." + source_view],
128-
output_table=target_schema + "." + target_table,
128+
input_tables=[LineageTable('', source_schema, source_view)],
129+
output_table=LineageTable('', target_schema, target_table)
129130
)
130131

131132
LINEAGE_LOGGER_KEY = StoreKey[ILineageLogger]("trino-lineage-logger")

0 commit comments

Comments
 (0)