@@ -160,7 +160,6 @@ def run(self, params: Dict[str, Any]) -> Optional[result.FusionSQLResult]:
160160 config = {}
161161 config .update (catalog_config )
162162 config .update (storage_config )
163- config ['table_id' ] = f'{ iceberg_database } .{ iceberg_table } '
164163 config_json = json .dumps (config )
165164
166165 creds = {}
@@ -170,6 +169,7 @@ def run(self, params: Dict[str, Any]) -> Optional[result.FusionSQLResult]:
170169
171170 # Create a unique pipeline name
172171 pipeline_name = f'iceberg_mv_{ view_database } _{ view_table } _{ uuid .uuid4 ().hex [:8 ]} '
172+ table_id = f'{ iceberg_database } .{ iceberg_table } '
173173
174174 print ('ICEBERG TABLE' , iceberg_database , iceberg_table )
175175 print ('DB TABLE' , view_database , view_table )
@@ -179,20 +179,23 @@ def run(self, params: Dict[str, Any]) -> Optional[result.FusionSQLResult]:
179179 # Create and start the pipeline
180180 with connect () as conn :
181181 with conn .cursor () as cur :
182- # Create the pipeline
182+ # Infer and create the pipeline.
183+ # It also creates a table (and optionally a view in case of merge pipeline) with the same name
183184 cur .execute (rf'''
184- CREATE PIPELINE `{ pipeline_name } ` AS
185- LOAD DATA S3 ''
185+ CREATE INFERRED PIPELINE `{ pipeline_name } ` AS
186+ LOAD DATA S3 '{ table_id } '
186187 CONFIG '{ config_json } '
187188 CREDENTIALS '{ creds_json } '
188- REPLACE INTO TABLE
189- `{ view_database } `.`{ view_table } `
190189 FORMAT ICEBERG
190+ OPTIONS = 'merge'
191191 ''' )
192192
193193 # Start the pipeline
194194 cur .execute (rf'START PIPELINE `{ pipeline_name } `' )
195195
196+ # Create view with user-provided name
197+ cur .execute (rf'CREATE VIEW `{ view_database } `.`{ view_table } ` AS SELECT * FROM `{ pipeline_name } `' )
198+
196199 # Return result
197200 res = result .FusionSQLResult ()
198201 res .add_field ('MaterializedView' , result .STRING )
0 commit comments