Skip to content

Commit c5b4e47

Browse files
committed
SNOW-1654536: async binding stage bind upload agent (#2069)
1 parent fe6faae commit c5b4e47

13 files changed

+1284
-39
lines changed
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
#!/usr/bin/env python
2+
#
3+
# Copyright (c) 2012-2023 Snowflake Computing Inc. All rights reserved.
4+
#
5+
6+
from __future__ import annotations
7+
8+
from io import BytesIO
9+
from logging import getLogger
10+
from typing import TYPE_CHECKING, cast
11+
12+
from snowflake.connector import Error
13+
from snowflake.connector.bind_upload_agent import BindUploadAgent as BindUploadAgentSync
14+
from snowflake.connector.errors import BindUploadError
15+
16+
if TYPE_CHECKING:
17+
from snowflake.connector.aio import SnowflakeCursor
18+
19+
logger = getLogger(__name__)
20+
21+
22+
class BindUploadAgent(BindUploadAgentSync):
23+
def __init__(
24+
self,
25+
cursor: SnowflakeCursor,
26+
rows: list[bytes],
27+
stream_buffer_size: int = 1024 * 1024 * 10,
28+
) -> None:
29+
super().__init__(cursor, rows, stream_buffer_size)
30+
self.cursor = cast("SnowflakeCursor", cursor)
31+
32+
async def _create_stage(self) -> None:
33+
await self.cursor.execute(self._CREATE_STAGE_STMT)
34+
35+
async def upload(self) -> None:
36+
try:
37+
await self._create_stage()
38+
except Error as err:
39+
self.cursor.connection._session_parameters[
40+
"CLIENT_STAGE_ARRAY_BINDING_THRESHOLD"
41+
] = 0
42+
logger.debug("Failed to create stage for binding.")
43+
raise BindUploadError from err
44+
45+
row_idx = 0
46+
while row_idx < len(self.rows):
47+
f = BytesIO()
48+
size = 0
49+
while True:
50+
f.write(self.rows[row_idx])
51+
size += len(self.rows[row_idx])
52+
row_idx += 1
53+
if row_idx >= len(self.rows) or size >= self._stream_buffer_size:
54+
break
55+
try:
56+
await self.cursor.execute(
57+
f"PUT file://{row_idx}.csv {self.stage_path}", file_stream=f
58+
)
59+
except Error as err:
60+
logger.debug("Failed to upload the bindings file to stage.")
61+
raise BindUploadError from err
62+
f.close()

src/snowflake/connector/aio/_cursor.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
ProgrammingError,
2828
)
2929
from snowflake.connector._sql_util import get_file_transfer_type
30+
from snowflake.connector.aio._build_upload_agent import BindUploadAgent
3031
from snowflake.connector.aio._result_batch import (
3132
ResultBatch,
3233
create_batches_from_response,
@@ -746,9 +747,10 @@ async def executemany(
746747
):
747748
# bind stage optimization
748749
try:
749-
raise NotImplementedError(
750-
"Bind stage is not supported yet in async."
751-
)
750+
rows = self.connection._write_params_to_byte_rows(seqparams)
751+
bind_uploader = BindUploadAgent(self, rows)
752+
await bind_uploader.upload()
753+
bind_stage = bind_uploader.stage_path
752754
except BindUploadError:
753755
logger.debug(
754756
"Failed to upload binds to stage, sending binds to "

test/helpers.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
from snowflake.connector.compat import OK
2222

2323
if TYPE_CHECKING:
24+
import snowflake.connector.aio
2425
import snowflake.connector.connection
2526

2627
try:

0 commit comments

Comments
 (0)