@@ -201,13 +201,20 @@ def get_creation_expression(
201201 table_function = False ,
202202 allow_dynamic_metadata_for_data_lakes = False ,
203203 run_on_cluster = False ,
204+ explicit_metadata_path = "" ,
204205 ** kwargs ,
205206):
206- allow_dynamic_metadata_for_datalakes_suffix = (
207- " SETTINGS allow_dynamic_metadata_for_data_lakes = 1"
208- if allow_dynamic_metadata_for_data_lakes
209- else ""
210- )
207+ settings_array = []
208+ if allow_dynamic_metadata_for_data_lakes :
209+ settings_array .append ("allow_dynamic_metadata_for_data_lakes = 1" )
210+
211+ if explicit_metadata_path :
212+ settings_array .append (f"iceberg_metadata_file_path = '{ explicit_metadata_path } '" )
213+
214+ if settings_array :
215+ settings_expression = " SETTINGS " + "," .join (settings_array )
216+ else :
217+ settings_expression = ""
211218
212219 if storage_type == "s3" :
213220 if "bucket" in kwargs :
@@ -227,7 +234,7 @@ def get_creation_expression(
227234 DROP TABLE IF EXISTS { table_name } ;
228235 CREATE TABLE { table_name }
229236 ENGINE=IcebergS3(s3, filename = 'iceberg_data/default/{ table_name } /', format={ format } , url = 'http://minio1:9001/{ bucket } /')"""
230- + allow_dynamic_metadata_for_datalakes_suffix
237+ + settings_expression
231238 )
232239
233240 elif storage_type == "azure" :
@@ -247,7 +254,7 @@ def get_creation_expression(
247254 DROP TABLE IF EXISTS { table_name } ;
248255 CREATE TABLE { table_name }
249256 ENGINE=IcebergAzure(azure, container = { cluster .azure_container_name } , storage_account_url = '{ cluster .env_variables ["AZURITE_STORAGE_ACCOUNT_URL" ]} ', blob_path = '/iceberg_data/default/{ table_name } /', format={ format } )"""
250- + allow_dynamic_metadata_for_datalakes_suffix
257+ + settings_expression
251258 )
252259
253260 elif storage_type == "local" :
@@ -263,7 +270,7 @@ def get_creation_expression(
263270 DROP TABLE IF EXISTS { table_name } ;
264271 CREATE TABLE { table_name }
265272 ENGINE=IcebergLocal(local, path = '/iceberg_data/default/{ table_name } /', format={ format } )"""
266- + allow_dynamic_metadata_for_datalakes_suffix
273+ + settings_expression
267274 )
268275
269276 else :
@@ -2723,3 +2730,47 @@ def check_validity_and_get_prunned_files(select_expression):
27232730 )
27242731 == 1
27252732 )
2733+
@pytest.mark.parametrize("storage_type", ["s3", "azure", "local"])
def test_explicit_metadata_file(started_cluster, storage_type):
    """Verify that the `iceberg_metadata_file_path` table setting pins a table
    to a specific metadata snapshot.

    The test writes 50 batches of 10 rows via Spark (each INSERT produces a new
    metadata version), then recreates the ClickHouse table pointing at
    progressively older metadata files and checks the visible row count shrinks
    accordingly. Invalid or path-traversing metadata paths must be rejected at
    CREATE time.
    """
    instance = started_cluster.instances["node1"]
    spark = started_cluster.spark_session
    TABLE_NAME = f"test_explicit_metadata_file_{storage_type}_{get_uuid_str()}"

    spark.sql(
        f"CREATE TABLE {TABLE_NAME} (id bigint, data string) USING iceberg TBLPROPERTIES ('format-version' = '2', 'write.update.mode'='merge-on-read', 'write.delete.mode'='merge-on-read', 'write.merge.mode'='merge-on-read')"
    )

    # 50 separate INSERTs so that many metadata versions accumulate
    # (each one appends 10 rows).
    for _ in range(50):
        spark.sql(
            f"INSERT INTO {TABLE_NAME} select id, char(id + ascii('a')) from range(10)"
        )

    default_upload_directory(
        started_cluster,
        storage_type,
        f"/iceberg_data/default/{TABLE_NAME}/",
        f"/iceberg_data/default/{TABLE_NAME}/",
    )

    # Latest metadata (empty path = default), then two pinned older versions.
    # NOTE(review): v31 / v11 correspond to 30 / 10 completed inserts —
    # presumably version numbering starts at 1 for the empty table.
    for metadata_path, expected_rows in (
        ("", 500),
        ("metadata/v31.metadata.json", 300),
        ("metadata/v11.metadata.json", 100),
    ):
        create_iceberg_table(
            storage_type,
            instance,
            TABLE_NAME,
            started_cluster,
            explicit_metadata_path=metadata_path,
        )
        assert int(instance.query(f"SELECT count() FROM {TABLE_NAME}")) == expected_rows

    # Garbage bytes and path traversal must both be rejected.
    for bad_path in (chr(0) + chr(1), "../metadata/v11.metadata.json"):
        with pytest.raises(Exception):
            create_iceberg_table(
                storage_type,
                instance,
                TABLE_NAME,
                started_cluster,
                explicit_metadata_path=bad_path,
            )
0 commit comments