Skip to content

Commit 2ef5d8c

Browse files
SNOW-2223110-asyncjob-failure-status-apis (#3605)
1 parent 8b66874 commit 2ef5d8c

File tree

3 files changed

+70
-0
lines changed

3 files changed

+70
-0
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,9 @@
77
#### New Features
88

99
- `Session.create_dataframe` now accepts keyword arguments that are forwarded to the internal call to `Session.write_pandas` or `Session.write_arrow` when creating a DataFrame from a pandas DataFrame or a pyarrow Table.
10+
- Added new APIs for `AsyncJob`:
11+
- `AsyncJob.is_failed()` returns a `bool` indicating if a job has failed. Can be used in combination with `AsyncJob.is_done()` to determine if a job is finished and errored.
12+
- `AsyncJob.status()` returns a string representing the current query status (e.g., "RUNNING", "SUCCESS", "FAILED_WITH_ERROR") for detailed monitoring without calling `result()`.
1013
- Added a dataframe profiler. To use, you can call get_execution_profile() on your desired dataframe. This profiler reports the queries executed to evaluate a dataframe, and statistics about each of the query operators. Currently an experimental feature
1114

1215
## 1.35.0 (2025-07-24)

src/snowflake/snowpark/async_job.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -240,6 +240,22 @@ def is_done(self) -> bool:
240240
is_running = self._session._conn._conn.is_still_running(status)
241241
return not is_running
242242

243+
def is_failed(self) -> bool:
244+
"""
245+
Checks the status of the query associated with this instance and returns a bool value
246+
indicating whether the query has failed.
247+
"""
248+
status = self._session._conn._conn.get_query_status(self.query_id)
249+
return self._session._conn._conn.is_an_error(status)
250+
251+
def status(self) -> str:
252+
"""
253+
Returns a string representing the current status of the query.
254+
(e.g., "RUNNING", "SUCCESS", "FAILED_WITH_ERROR", "ABORTING", etc.)
255+
"""
256+
status = self._session._conn._conn.get_query_status(self.query_id)
257+
return status.name
258+
243259
def cancel(self) -> None:
244260
"""Cancels the query associated with this instance."""
245261
# stop and cancel current query id

tests/integ/scala/test_async_job_suite.py

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -378,17 +378,27 @@ def test_async_is_running_and_cancel(session):
378378
while not async_job.is_done():
379379
sleep(1.0)
380380
assert async_job.is_done()
381+
assert not async_job.is_failed()
381382

382383
# set 20s to avoid flakiness
383384
async_job2 = session.sql("select SYSTEM$WAIT(20)").collect_nowait()
384385
assert not async_job2.is_done()
386+
assert not async_job2.is_failed()
385387
async_job2.cancel()
386388
start = time()
387389
while not async_job2.is_done():
390+
assert async_job2.status() in [
391+
"RUNNING",
392+
"ABORTING",
393+
"ABORTED",
394+
"FAILED_WITH_ERROR",
395+
]
388396
sleep(1.0)
389397
# If query is canceled, it takes less time than originally needed
390398
assert (time() - start) < 20
391399
assert async_job2.is_done()
400+
# cancel/abort should fail the query
401+
assert async_job2.is_failed()
392402

393403

394404
@pytest.mark.skipif(not is_pandas_available, reason="pandas is not available")
@@ -514,3 +524,44 @@ def test_iter_cursor_wait_for_result(session, action):
514524
df = session.sql("call system$wait(5)")
515525
async_job = action(df)
516526
assert async_job.result() is not None
527+
528+
529+
@pytest.mark.skipif(IS_IN_STORED_PROC, reason="sproc is not supported in async job yet")
530+
def test_async_job_status_apis(session):
531+
successful_queries = [
532+
"select a from values (1, 2), (3, 4) as t(a, b)",
533+
"select 1 + 1",
534+
]
535+
for query in successful_queries:
536+
async_job = session.sql(query).collect_nowait()
537+
while not async_job.is_done():
538+
assert not async_job.is_failed()
539+
status = async_job.status()
540+
assert status in [
541+
"RUNNING",
542+
"QUEUED",
543+
"RESUMING_WAREHOUSE",
544+
"QUEUED_REPARING_WAREHOUSE",
545+
"SUCCESS",
546+
]
547+
sleep(1.0)
548+
assert not async_job.is_failed()
549+
assert async_job.status() == "SUCCESS"
550+
assert async_job.result() is not None
551+
552+
failed_queries = ["select c from values (1, 2), (3, 4) as t(a, b)", "select 1 / 0"]
553+
for query in failed_queries:
554+
async_job = session.sql(query).collect_nowait()
555+
while not async_job.is_done():
556+
sleep(1.0)
557+
status = async_job.status()
558+
assert status in [
559+
"RUNNING",
560+
"QUEUED",
561+
"RESUMING_WAREHOUSE",
562+
"QUEUED_REPARING_WAREHOUSE",
563+
"FAILED_WITH_ERROR",
564+
"FAILED_WITH_INCIDENT",
565+
]
566+
assert async_job.is_failed()
567+
assert async_job.status() in ["FAILED_WITH_ERROR", "FAILED_WITH_INCIDENT"]

0 commit comments

Comments
 (0)