Skip to content

Commit 5f8f235

Browse files
committed
SNOW-1572316: async query timer bomb (#2059)
1 parent 74b1b87 commit 5f8f235

File tree

2 files changed

+60
-55
lines changed

2 files changed

+60
-55
lines changed

src/snowflake/connector/aio/_cursor.py

Lines changed: 60 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import asyncio
88
import collections
99
import re
10+
import signal
1011
import sys
1112
import uuid
1213
from logging import getLogger
@@ -66,6 +67,7 @@ def __init__(
6667
# the following fixes type hint
6768
self._connection: SnowflakeConnection = connection
6869
self._lock_canceling = asyncio.Lock()
70+
self._timebomb: asyncio.Task | None = None
6971

7072
def __aiter__(self):
7173
return self
@@ -98,13 +100,19 @@ async def __aexit__(
98100
"""Context manager with commit or rollback."""
99101
await self.close()
100102

103+
async def _timebomb_task(self, timeout, query):
104+
try:
105+
logger.debug("started timebomb in %ss", timeout)
106+
await asyncio.sleep(timeout)
107+
await self.__cancel_query(query)
108+
except asyncio.CancelledError:
109+
logger.debug("cancelled timebomb in timebomb task")
110+
101111
async def __cancel_query(self, query) -> None:
102112
if self._sequence_counter >= 0 and not self.is_closed():
103113
logger.debug("canceled. %s, request_id: %s", query, self._request_id)
104114
async with self._lock_canceling:
105-
raise NotImplementedError(
106-
"Canceling a query is not supported in async."
107-
)
115+
await self._connection._cancel_query(query, self._request_id)
108116

109117
async def _describe_internal(
110118
self, *args: Any, **kwargs: Any
@@ -187,44 +195,44 @@ async def _execute_helper(
187195
timeout if timeout and timeout > 0 else self._connection.network_timeout
188196
)
189197

190-
# TODO: asyncio timer bomb
191-
# if real_timeout is not None:
192-
# self._timebomb = Timer(real_timeout, self.__cancel_query, [query])
193-
# self._timebomb.start()
194-
# logger.debug("started timebomb in %ss", real_timeout)
195-
# else:
196-
# self._timebomb = None
197-
#
198-
# original_sigint = signal.getsignal(signal.SIGINT)
199-
#
200-
# def interrupt_handler(*_): # pragma: no cover
201-
# try:
202-
# signal.signal(signal.SIGINT, exit_handler)
203-
# except (ValueError, TypeError):
204-
# # ignore failures
205-
# pass
206-
# try:
207-
# if self._timebomb is not None:
208-
# self._timebomb.cancel()
209-
# logger.debug("cancelled timebomb in finally")
210-
# self._timebomb = None
211-
# self.__cancel_query(query)
212-
# finally:
213-
# if original_sigint:
214-
# try:
215-
# signal.signal(signal.SIGINT, original_sigint)
216-
# except (ValueError, TypeError):
217-
# # ignore failures
218-
# pass
219-
# raise KeyboardInterrupt
220-
#
221-
# try:
222-
# if not original_sigint == exit_handler:
223-
# signal.signal(signal.SIGINT, interrupt_handler)
224-
# except ValueError: # pragma: no cover
225-
# logger.debug(
226-
# "Failed to set SIGINT handler. " "Not in main thread. Ignored..."
227-
# )
198+
if real_timeout is not None:
199+
self._timebomb = asyncio.create_task(
200+
self._timebomb_task(real_timeout, query)
201+
)
202+
logger.debug("started timebomb in %ss", real_timeout)
203+
else:
204+
self._timebomb = None
205+
206+
original_sigint = signal.getsignal(signal.SIGINT)
207+
208+
def interrupt_handler(*_): # pragma: no cover
209+
try:
210+
signal.signal(signal.SIGINT, snowflake.connector.cursor.exit_handler)
211+
except (ValueError, TypeError):
212+
# ignore failures
213+
pass
214+
try:
215+
if self._timebomb is not None:
216+
self._timebomb.cancel()
217+
self._timebomb = None
218+
logger.debug("cancelled timebomb in finally")
219+
asyncio.create_task(self.__cancel_query(query))
220+
finally:
221+
if original_sigint:
222+
try:
223+
signal.signal(signal.SIGINT, original_sigint)
224+
except (ValueError, TypeError):
225+
# ignore failures
226+
pass
227+
raise KeyboardInterrupt
228+
229+
try:
230+
if not original_sigint == snowflake.connector.cursor.exit_handler:
231+
signal.signal(signal.SIGINT, interrupt_handler)
232+
except ValueError: # pragma: no cover
233+
logger.debug(
234+
"Failed to set SIGINT handler. " "Not in main thread. Ignored..."
235+
)
228236
ret: dict[str, Any] = {"data": {}}
229237
try:
230238
ret = await self._connection.cmd_query(
@@ -243,18 +251,17 @@ async def _execute_helper(
243251
dataframe_ast=dataframe_ast,
244252
)
245253
finally:
246-
pass
247-
# TODO: async timer bomb
248-
# try:
249-
# if original_sigint:
250-
# signal.signal(signal.SIGINT, original_sigint)
251-
# except (ValueError, TypeError): # pragma: no cover
252-
# logger.debug(
253-
# "Failed to reset SIGINT handler. Not in main " "thread. Ignored..."
254-
# )
255-
# if self._timebomb is not None:
256-
# self._timebomb.cancel()
257-
# logger.debug("cancelled timebomb in finally")
254+
try:
255+
if original_sigint:
256+
signal.signal(signal.SIGINT, original_sigint)
257+
except (ValueError, TypeError): # pragma: no cover
258+
logger.debug(
259+
"Failed to reset SIGINT handler. Not in main " "thread. Ignored..."
260+
)
261+
if self._timebomb is not None:
262+
self._timebomb.cancel()
263+
self._timebomb = None
264+
logger.debug("cancelled timebomb in finally")
258265

259266
if "data" in ret and "parameters" in ret["data"]:
260267
parameters = ret["data"].get("parameters", list())

test/integ/aio/test_cursor_async.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -759,8 +759,6 @@ async def test_invalid_bind_data_type(conn_cnx):
759759
await cnx.cursor().execute("select 1 from dual where 1=%s", ([1, 2, 3],))
760760

761761

762-
# TODO: SNOW-1657469 for timeout
763-
@pytest.mark.skip
764762
async def test_timeout_query(conn_cnx):
765763
async with conn_cnx() as cnx:
766764
async with cnx.cursor() as c:

0 commit comments

Comments
 (0)