2424)
2525from snowflake .connector ._sql_util import get_file_transfer_type
2626from snowflake .connector .aio ._bind_upload_agent import BindUploadAgent
27+ from snowflake .connector .aio ._file_transfer_agent import SnowflakeFileTransferAgent
2728from snowflake .connector .aio ._result_batch import (
2829 ResultBatch ,
2930 create_batches_from_response ,
@@ -664,11 +665,8 @@ async def execute(
664665 )
665666 logger .debug ("PUT OR GET: %s" , self .is_file_transfer )
666667 if self .is_file_transfer :
667- from ._file_transfer_agent import SnowflakeFileTransferAgent
668-
669668 # Decide whether to use the old, or new code path
670- sf_file_transfer_agent = SnowflakeFileTransferAgent (
671- self ,
669+ sf_file_transfer_agent = self ._create_file_transfer_agent (
672670 query ,
673671 ret ,
674672 put_callback = _put_callback ,
@@ -684,9 +682,6 @@ async def execute(
684682 skip_upload_on_content_match = _skip_upload_on_content_match ,
685683 source_from_stream = file_stream ,
686684 multipart_threshold = data .get ("threshold" ),
687- use_s3_regional_url = self ._connection .enable_stage_s3_privatelink_for_us_east_1 ,
688- unsafe_file_write = self ._connection .unsafe_file_write ,
689- reraise_error_in_file_transfer_work_function = self .connection ._reraise_error_in_file_transfer_work_function ,
690685 )
691686 await sf_file_transfer_agent .execute ()
692687 data = sf_file_transfer_agent .result ()
@@ -1082,8 +1077,6 @@ async def _download(
10821077 _do_reset (bool, optional): Whether to reset the cursor before
10831078 downloading, by default we will reset the cursor.
10841079 """
1085- from ._file_transfer_agent import SnowflakeFileTransferAgent
1086-
10871080 if _do_reset :
10881081 self .reset ()
10891082
@@ -1097,11 +1090,9 @@ async def _download(
10971090 )
10981091
10991092 # Execute the file operation based on the interpretation above.
1100- file_transfer_agent = SnowflakeFileTransferAgent (
1101- self ,
1093+ file_transfer_agent = self ._create_file_transfer_agent (
11021094 "" , # empty command because it is triggered by directly calling this util not by a SQL query
11031095 ret ,
1104- reraise_error_in_file_transfer_work_function = self .connection ._reraise_error_in_file_transfer_work_function ,
11051096 )
11061097 await file_transfer_agent .execute ()
11071098 await self ._init_result_and_meta (file_transfer_agent .result ())
@@ -1122,8 +1113,6 @@ async def _upload(
11221113 _do_reset (bool, optional): Whether to reset the cursor before
11231114 uploading, by default we will reset the cursor.
11241115 """
1125- from ._file_transfer_agent import SnowflakeFileTransferAgent
1126-
11271116 if _do_reset :
11281117 self .reset ()
11291118
@@ -1137,12 +1126,10 @@ async def _upload(
11371126 )
11381127
11391128 # Execute the file operation based on the interpretation above.
1140- file_transfer_agent = SnowflakeFileTransferAgent (
1141- self ,
1129+ file_transfer_agent = self ._create_file_transfer_agent (
11421130 "" , # empty command because it is triggered by directly calling this util not by a SQL query
11431131 ret ,
11441132 force_put_overwrite = False , # _upload should respect user decision on overwriting
1145- reraise_error_in_file_transfer_work_function = self .connection ._reraise_error_in_file_transfer_work_function ,
11461133 )
11471134 await file_transfer_agent .execute ()
11481135 await self ._init_result_and_meta (file_transfer_agent .result ())
@@ -1191,8 +1178,6 @@ async def _upload_stream(
11911178 _do_reset (bool, optional): Whether to reset the cursor before
11921179 uploading, by default we will reset the cursor.
11931180 """
1194- from ._file_transfer_agent import SnowflakeFileTransferAgent
1195-
11961181 if _do_reset :
11971182 self .reset ()
11981183
@@ -1207,13 +1192,11 @@ async def _upload_stream(
12071192 )
12081193
12091194 # Execute the file operation based on the interpretation above.
1210- file_transfer_agent = SnowflakeFileTransferAgent (
1211- self ,
1195+ file_transfer_agent = self ._create_file_transfer_agent (
12121196 "" , # empty command because it is triggered by directly calling this util not by a SQL query
12131197 ret ,
12141198 source_from_stream = input_stream ,
12151199 force_put_overwrite = False , # _upload should respect user decision on overwriting
1216- reraise_error_in_file_transfer_work_function = self .connection ._reraise_error_in_file_transfer_work_function ,
12171200 )
12181201 await file_transfer_agent .execute ()
12191202 await self ._init_result_and_meta (file_transfer_agent .result ())
@@ -1320,6 +1303,24 @@ async def query_result(self, qid: str) -> SnowflakeCursor:
13201303 )
13211304 return self
13221305
1306+ def _create_file_transfer_agent (
1307+ self ,
1308+ command : str ,
1309+ ret : dict [str , Any ],
1310+ / ,
1311+ ** kwargs ,
1312+ ) -> SnowflakeFileTransferAgent :
1313+
1314+ return SnowflakeFileTransferAgent (
1315+ self ,
1316+ command ,
1317+ ret ,
1318+ use_s3_regional_url = self ._connection .enable_stage_s3_privatelink_for_us_east_1 ,
1319+ unsafe_file_write = self ._connection .unsafe_file_write ,
1320+ reraise_error_in_file_transfer_work_function = self ._connection ._reraise_error_in_file_transfer_work_function ,
1321+ ** kwargs ,
1322+ )
1323+
13231324
13241325class DictCursor (DictCursorSync , SnowflakeCursor ):
13251326 pass
0 commit comments