Skip to content

Commit bd393ab

Browse files
committed
control-service: Extending Trino lineage
Fix lineage for templates. --------- Signed-off-by: Tonka Zheleva <tonka.zheleva@broadcom.com>
1 parent f8d497a commit bd393ab

File tree

4 files changed

+90
-6
lines changed

4 files changed

+90
-6
lines changed

projects/vdk-plugins/vdk-trino/src/vdk/plugin/trino/templates/load/dimension/scd1/03-handle-quality-checks_and_move_data.py

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
1-
# Copyright 2023-2024 Broadcom
1+
# Copyright 2023-2025 Broadcom
22
# SPDX-License-Identifier: Apache-2.0
33
import logging
44
import os
55
import re
66

77
from vdk.api.job_input import IJobInput
8+
from vdk.api.lineage.model.logger.lineage_logger import ILineageLogger
9+
from vdk.api.lineage.model.sql.model import LineageData
10+
from vdk.internal.core.statestore import StoreKey
811
from vdk.plugin.trino.templates.data_quality_exception import DataQualityException
912
from vdk.plugin.trino.trino_utils import CommonUtilities
1013

@@ -77,7 +80,13 @@ def run(job_input: IJobInput):
7780
# copy the data if there's no quality check configure or if it passes
7881
if not check or check(staging_table_full_name):
7982
copy_staging_table_to_target_table(
80-
job_input, target_schema, target_table, staging_schema, staging_table
83+
job_input,
84+
target_schema,
85+
target_table,
86+
staging_schema,
87+
staging_table,
88+
source_schema,
89+
source_view,
8190
)
8291
else:
8392
target_table_full_name = f"{target_schema}.{target_table}"
@@ -94,6 +103,8 @@ def copy_staging_table_to_target_table(
94103
target_table,
95104
source_schema,
96105
source_table,
106+
original_source_schema,
107+
original_source_view,
97108
):
98109
# check if table is partitioned
99110
if is_partitioned_table(job_input, target_schema, target_table):
@@ -181,6 +192,20 @@ def copy_staging_table_to_target_table(
181192
)
182193
job_input.execute_query(insert_into_table)
183194

195+
lineage_data = LineageData(
196+
query="template",
197+
query_type="template",
198+
query_status="OK",
199+
input_tables=[original_source_schema + "." + original_source_view],
200+
output_table=target_schema + "." + target_table,
201+
)
202+
203+
LINEAGE_LOGGER_KEY = StoreKey[ILineageLogger]("trino-lineage-logger")
204+
lineage_logger = job_input._JobInput__templates._core_context.state.get(
205+
LINEAGE_LOGGER_KEY
206+
)
207+
lineage_logger.send(lineage_data)
208+
184209

185210
def is_partitioned_table(job_input: IJobInput, target_schema, target_table) -> bool:
186211
show_create_query = CommonUtilities.get_file_content(

projects/vdk-plugins/vdk-trino/src/vdk/plugin/trino/templates/load/dimension/scd1_upsert/03-handle-quality-checks_and_move_data.py

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
1-
# Copyright 2023-2024 Broadcom
1+
# Copyright 2023-2025 Broadcom
22
# SPDX-License-Identifier: Apache-2.0
33
import logging
44
import os
55
import re
66

77
from vdk.api.job_input import IJobInput
8+
from vdk.api.lineage.model.logger.lineage_logger import ILineageLogger
9+
from vdk.api.lineage.model.sql.model import LineageData
10+
from vdk.internal.core.statestore import StoreKey
811
from vdk.plugin.trino.templates.data_quality_exception import DataQualityException
912
from vdk.plugin.trino.trino_utils import CommonUtilities
1013

@@ -78,7 +81,13 @@ def run(job_input: IJobInput):
7881
# copy the data if there's no quality check configure or if it passes
7982
if not check or check(staging_table_full_name):
8083
copy_staging_table_to_target_table(
81-
job_input, target_schema, target_table, staging_schema, staging_table
84+
job_input,
85+
target_schema,
86+
target_table,
87+
staging_schema,
88+
staging_table,
89+
source_schema,
90+
source_view,
8291
)
8392
else:
8493
target_table_full_name = f"{target_schema}.{target_table}"
@@ -95,6 +104,8 @@ def copy_staging_table_to_target_table(
95104
target_table,
96105
source_schema,
97106
source_table,
107+
original_source_schema,
108+
original_source_view,
98109
):
99110
# non-partitioned tables:
100111
# - Since truncate and delete do not work for non-partitioned tables - get the create statement, drop the table and then re-create it - we do this to preserve and metadata like user comments
@@ -135,6 +146,20 @@ def copy_staging_table_to_target_table(
135146
)
136147
job_input.execute_query(insert_into_table)
137148

149+
lineage_data = LineageData(
150+
query="template",
151+
query_type="template",
152+
query_status="OK",
153+
input_tables=[original_source_schema + "." + original_source_view],
154+
output_table=target_schema + "." + target_table,
155+
)
156+
157+
LINEAGE_LOGGER_KEY = StoreKey[ILineageLogger]("trino-lineage-logger")
158+
lineage_logger = job_input._JobInput__templates._core_context.state.get(
159+
LINEAGE_LOGGER_KEY
160+
)
161+
lineage_logger.send(lineage_data)
162+
138163

139164
def remove_external_location(sql_statement):
140165
# Regular expression pattern to match the external_location clause

projects/vdk-plugins/vdk-trino/src/vdk/plugin/trino/templates/load/fact/insert/02-handle-quality-checks_and_move_data.py

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
1-
# Copyright 2023-2024 Broadcom
1+
# Copyright 2023-2025 Broadcom
22
# SPDX-License-Identifier: Apache-2.0
33
import logging
44
import os
55

66
from vdk.api.job_input import IJobInput
7+
from vdk.api.lineage.model.logger.lineage_logger import ILineageLogger
8+
from vdk.api.lineage.model.sql.model import LineageData
9+
from vdk.internal.core.statestore import StoreKey
710
from vdk.plugin.trino.templates.data_quality_exception import DataQualityException
811
from vdk.plugin.trino.trino_utils import CommonUtilities
912

@@ -128,3 +131,17 @@ def run(job_input: IJobInput):
128131
)
129132
else:
130133
job_input.execute_query(insert_query)
134+
135+
lineage_data = LineageData(
136+
query="template",
137+
query_type="template",
138+
query_status="OK",
139+
input_tables=[source_schema + "." + source_view],
140+
output_table=target_schema + "." + target_table,
141+
)
142+
143+
LINEAGE_LOGGER_KEY = StoreKey[ILineageLogger]("trino-lineage-logger")
144+
lineage_logger = job_input._JobInput__templates._core_context.state.get(
145+
LINEAGE_LOGGER_KEY
146+
)
147+
lineage_logger.send(lineage_data)

projects/vdk-plugins/vdk-trino/src/vdk/plugin/trino/templates/load/fact/snapshot/02-handle-quality-checks_and_move_data.py

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
1-
# Copyright 2023-2024 Broadcom
1+
# Copyright 2023-2025 Broadcom
22
# SPDX-License-Identifier: Apache-2.0
33
import logging
44
import os
55

66
from vdk.api.job_input import IJobInput
7+
from vdk.api.lineage.model.logger.lineage_logger import ILineageLogger
8+
from vdk.api.lineage.model.sql.model import LineageData
9+
from vdk.internal.core.statestore import StoreKey
710
from vdk.plugin.trino.templates.data_quality_exception import DataQualityException
811
from vdk.plugin.trino.trino_utils import CommonUtilities
912

@@ -116,6 +119,20 @@ def run(job_input: IJobInput):
116119
source_view=staging_table,
117120
)
118121
job_input.execute_query(create_and_insert_into_target)
122+
123+
lineage_data = LineageData(
124+
query="template",
125+
query_type="template",
126+
query_status="OK",
127+
input_tables=[source_schema + "." + source_view],
128+
output_table=target_schema + "." + target_table,
129+
)
130+
131+
LINEAGE_LOGGER_KEY = StoreKey[ILineageLogger]("trino-lineage-logger")
132+
lineage_logger = job_input._JobInput__templates._core_context.state.get(
133+
LINEAGE_LOGGER_KEY
134+
)
135+
lineage_logger.send(lineage_data)
119136
else:
120137
log.info(
121138
f"Target table {target_schema}.{target_table} remains unchanged "

0 commit comments

Comments
 (0)