
Commit 1b99496

Authored by: sbuldeev (Stefan Buldeev), pre-commit-ci[bot], ddakov
vdk-trino: Optimize SCD1 Upsert template (#3454)
What: Optimize the template by splitting the staging table population into two steps to reduce the memory footprint.
Why: For bigger tables the template struggles to create the staging table with unions between the view data and the current table data.

Signed-off-by: Stefan Buldeev <sbuldeev@vmware.com>
Signed-off-by: Dako Dakov <ddakov@vmware.com>
Co-authored-by: Stefan Buldeev <stefan.buldeev@broadcom.com>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: ddakov <ddakov@vmware.com>
1 parent 38397a7 commit 1b99496
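
To make the new approach concrete, here is a minimal sketch of the two-step staging population described in the commit message, using hypothetical names (a target table dw.orders, a source view stg.orders_updates, a staging table stg_tmp.orders_staging, and an id column order_id); the actual template substitutes its {target_schema}, {source_view}, {id_column} and related parameters instead:

-- Illustrative sketch with hypothetical names; the real template fills in its {…} parameters.
-- Step 1: seed the staging table with the new/updated rows from the source view.
CREATE TABLE stg_tmp.orders_staging AS (SELECT * FROM stg.orders_updates);

-- Step 2: append, via an anti-join, the target rows whose ids are absent from staging,
-- i.e. the rows this run does not update.
INSERT INTO stg_tmp.orders_staging
SELECT t.*
FROM dw.orders AS t
LEFT JOIN stg_tmp.orders_staging AS s ON s.order_id = t.order_id
WHERE s.order_id IS NULL;

The previous version produced the same staging content in a single CREATE TABLE ... AS over a UNION ALL of both branches; per the description above, that one-shot statement is what strained memory on larger tables.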

3 files changed: +16 −20 lines

projects/vdk-plugins/vdk-trino/src/vdk/plugin/trino/templates/load/dimension/scd1_upsert/02-create-table-and-insert-data.sql

Lines changed: 1 addition & 12 deletions
@@ -1,12 +1 @@
-CREATE TABLE "{target_schema_staging}"."{target_table_staging}" AS
-(
-    SELECT t.*
-    FROM "{target_schema}"."{target_table}" AS t
-    LEFT JOIN "{source_schema}"."{source_view}" AS s ON s."{id_column}" = t."{id_column}"
-    WHERE s."{id_column}" IS NULL
-)
-UNION ALL
-(
-    SELECT *
-    FROM "{source_schema}"."{source_view}"
-)
+CREATE TABLE "{target_schema_staging}"."{target_table_staging}" AS (SELECT * FROM "{source_schema}"."{source_view}")

projects/vdk-plugins/vdk-trino/src/vdk/plugin/trino/templates/load/dimension/scd1_upsert/03-handle-quality-checks_and_move_data.py

Lines changed: 13 additions & 6 deletions
@@ -58,23 +58,30 @@ def run(job_input: IJobInput):
     )
     job_input.execute_query(drop_table)

-    # create staging table and insert data
+    # create staging table and insert the source (new) data
     create_table_and_insert_data_query = CommonUtilities.get_file_content(
         SQL_FILES_FOLDER, "02-create-table-and-insert-data.sql"
     )
-    create_staging_table_and_insert_data = create_table_and_insert_data_query.format(
-        target_schema=target_schema,
-        target_table=target_table,
+    create_stg_tbl_and_insert_new_data = create_table_and_insert_data_query.format(
         source_schema=source_schema,
         source_view=source_view,
         target_schema_staging=staging_schema,
         target_table_staging=staging_table,
-        id_column=id_column,
     )
-    job_input.execute_query(create_staging_table_and_insert_data)
+    job_input.execute_query(create_stg_tbl_and_insert_new_data)

     staging_table_full_name = f"{staging_schema}.{staging_table}"

+    # append the target-only (old) data to the staging table
+    append_old_to_stg = f"""
+        INSERT INTO {staging_table_full_name}
+        SELECT t.*
+        FROM {target_schema}.{target_table} AS t
+        LEFT JOIN {staging_table_full_name} AS s ON s."{id_column}" = t."{id_column}"
+        WHERE s."{id_column}" IS NULL
+    """
+    job_input.execute_query(append_old_to_stg)
+
     # copy the data if there's no quality check configure or if it passes
     if not check or check(staging_table_full_name):
         copy_staging_table_to_target_table(
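
As an aside, not part of the commit: once both queries have run, the staging table should hold exactly one row per id, provided the source view itself has at most one row per id, as SCD1-style upserts typically assume. A quick check along these lines, reusing the same template placeholders, would surface any duplicated ids:

-- Hypothetical sanity check (not part of the template): ids appearing more than once in staging.
SELECT "{id_column}", COUNT(*) AS row_count
FROM "{target_schema_staging}"."{target_table_staging}"
GROUP BY "{id_column}"
HAVING COUNT(*) > 1;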

projects/vdk-plugins/vdk-trino/tests/test_vdk_templates.py

Lines changed: 2 additions & 2 deletions
@@ -672,7 +672,7 @@ def __scd_upsert_execute(
         "run",
         get_test_job_path(
             pathlib.Path(os.path.dirname(os.path.abspath(__file__))),
-            "load_dimension_scd_upsert_template_job",
+            "load_dimension_scd1_upsert_template_job",
         ),
         "--arguments",
         json.dumps(
@@ -696,7 +696,7 @@ def __scd_upsert_execute(
         "run",
         get_test_job_path(
             pathlib.Path(os.path.dirname(os.path.abspath(__file__))),
-            "load_dimension_scd_upsert_template_job",
+            "load_dimension_scd1_upsert_template_job",
         ),
         "--arguments",
         json.dumps(

0 commit comments
