22# SPDX-License-Identifier: Apache-2.0
33import logging
44import os
5+ import re
56
67from vdk .api .job_input import IJobInput
78from vdk .plugin .trino .templates .data_quality_exception import DataQualityException
89from vdk .plugin .trino .trino_utils import CommonUtilities
9- from vdk .plugin .trino .trino_utils import TrinoTemplateQueries
1010
1111log = logging .getLogger (__name__ )
1212
@@ -121,23 +121,18 @@ def copy_staging_table_to_target_table(
121121 # override session properties -> "SET SESSION hdfs.insert_existing_partitions_behavior = 'OVERWRITE';"
122122 for item in original_session_properties_backup :
123123 statement = set_session_property_template .format (
124- propety_name = item [0 ], property_value = OVERWRITE
124+ property_name = item [0 ], property_value = OVERWRITE
125125 )
126126 job_input .execute_query (statement )
127127
128128 # INSERT OVERWRITE data
129- partition_clause = get_insert_sql_partition_clause (
130- job_input , target_schema , target_table
131- )
132-
133129 insert_overwrite_query = CommonUtilities .get_file_content (
134130 SQL_FILES_FOLDER , "02-insert-overwrite.sql"
135131 )
136132
137133 insert_overwrite = insert_overwrite_query .format (
138134 target_schema = target_schema ,
139135 target_table = target_table ,
140- _vdk_template_insert_partition_clause = partition_clause ,
141136 source_schema = source_schema ,
142137 source_view = source_table ,
143138 )
@@ -146,22 +141,38 @@ def copy_staging_table_to_target_table(
146141 # restore session properties
147142 for item in original_session_properties_backup :
148143 statement = set_session_property_template .format (
149- propety_name = item [0 ], property_value = item [1 ]
144+ property_name = item [0 ], property_value = item [1 ]
150145 )
151146 job_input .execute_query (statement )
152147 else :
153148 # non-partitioned tables:
154- # - Truncate the target table
149+ # - Since truncate and delete do not work for non-partitioned tables - get the create statement, drop the table and then re-create it - we do this to preserve and metadata like user comments
155150 # - Insert contents from staging table in target table
156151 # - Delete staging table
157- truncate_table_query = CommonUtilities .get_file_content (
158- SQL_FILES_FOLDER , "02-truncate -table.sql"
152+ show_create_query = CommonUtilities .get_file_content (
153+ SQL_FILES_FOLDER , "02-show-create -table.sql"
159154 )
160- truncate_table = truncate_table_query .format (
155+ show_create_target_table = show_create_query .format (
161156 target_schema = target_schema , target_table = target_table
162157 )
163- job_input .execute_query (truncate_table )
164158
159+ table_create_statement = job_input .execute_query (show_create_target_table )
160+ # remove the "external_location" clause from the create statement as it might lead to data not being cleaned up properly in hive
161+ table_create_statement = remove_external_location (table_create_statement [0 ][0 ])
162+
163+ # drop the table
164+ drop_table_query = CommonUtilities .get_file_content (
165+ SQL_FILES_FOLDER , "02-drop-table.sql"
166+ )
167+ drop_table = drop_table_query .format (
168+ target_schema = target_schema , target_table = target_table
169+ )
170+ job_input .execute_query (drop_table )
171+
172+ # re-create the table
173+ job_input .execute_query (table_create_statement )
174+
175+ # insert the data
165176 insert_into_table_query = CommonUtilities .get_file_content (
166177 SQL_FILES_FOLDER , "02-insert-into-table.sql"
167178 )
@@ -173,14 +184,6 @@ def copy_staging_table_to_target_table(
173184 )
174185 job_input .execute_query (insert_into_table )
175186
176- drop_table_query = CommonUtilities .get_file_content (
177- SQL_FILES_FOLDER , "02-drop-table.sql"
178- )
179- drop_table = drop_table_query .format (
180- target_schema = source_schema , target_table = source_table
181- )
182- job_input .execute_query (drop_table )
183-
184187
185188def is_partitioned_table (job_input : IJobInput , target_schema , target_table ) -> bool :
186189 show_create_query = CommonUtilities .get_file_content (
@@ -198,18 +201,11 @@ def is_partitioned_table(job_input: IJobInput, target_schema, target_table) -> b
198201 )
199202
200203
def remove_external_location(sql_statement):
    """Strip the ``external_location`` table property from a CREATE TABLE
    statement.

    Re-creating the target table with its original ``external_location``
    pointing at the old HDFS path might lead to data not being cleaned up
    properly in Hive, so the clause is removed before the create statement
    is re-executed.

    :param sql_statement: full SQL text, as returned by SHOW CREATE TABLE
    :return: the statement without the external_location property (and
        without one adjacent comma, if any); unchanged if the clause is
        absent
    """
    # First try to consume the clause together with a TRAILING comma
    # (covers the clause being first or in the middle of the WITH list).
    cleaned_sql = re.sub(
        r"external_location\s*=\s*'[^']*'\s*,\s*",
        "",
        sql_statement,
        flags=re.IGNORECASE,
    )
    if cleaned_sql == sql_statement:
        # Otherwise consume a LEADING comma (clause is the last property).
        # A single pattern that only ate a trailing comma left a dangling
        # comma here, producing invalid SQL like "WITH (format = 'ORC', )".
        cleaned_sql = re.sub(
            r",?\s*external_location\s*=\s*'[^']*'",
            "",
            sql_statement,
            flags=re.IGNORECASE,
        )
    return cleaned_sql
0 commit comments