Skip to content

Commit 0bdf451

Browse files
authored
Add new exit code for aborted queries (#1587)
1 parent d4a9e6d commit 0bdf451

File tree

5 files changed

+162
-3
lines changed

5 files changed

+162
-3
lines changed

src/datachain/catalog/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from .catalog import (
22
QUERY_DATASET_PREFIX,
3+
QUERY_SCRIPT_ABORTED_EXIT_CODE,
34
QUERY_SCRIPT_CANCELED_EXIT_CODE,
45
Catalog,
56
is_namespace_local,
@@ -8,6 +9,7 @@
89

910
__all__ = [
1011
"QUERY_DATASET_PREFIX",
12+
"QUERY_SCRIPT_ABORTED_EXIT_CODE",
1113
"QUERY_SCRIPT_CANCELED_EXIT_CODE",
1214
"Catalog",
1315
"get_catalog",

src/datachain/catalog/catalog.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,8 @@
7878
DATASET_INTERNAL_ERROR_MESSAGE = "Internal error on creating dataset"
7979
# exit code we use if query script was canceled
8080
QUERY_SCRIPT_CANCELED_EXIT_CODE = 11
81+
# exit code we use if the job is already in a terminal state (failed/canceled elsewhere)
82+
QUERY_SCRIPT_ABORTED_EXIT_CODE = 12
8183
QUERY_SCRIPT_SIGTERM_EXIT_CODE = -15 # if query script was terminated by SIGTERM
8284

8385
# dataset pull

src/datachain/error.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,17 @@ def __init__(self, message: str, return_code: int = 0):
8585

8686

8787
class QueryScriptCancelError(QueryScriptRunError):
88-
pass
88+
"""Raised when a running script is canceled by the user."""
89+
90+
91+
class QueryScriptAbortError(QueryScriptRunError):
92+
"""Raised when execution should stop because the job is already in a terminal state.
93+
94+
Unlike QueryScriptCancelError (user-initiated cancellation), this signals that
95+
the job has already finished (e.g., failed elsewhere) and continuing execution
96+
would be wasteful. The distinction allows handlers to differentiate between
97+
"user clicked Cancel" and "job already dead, stop working".
98+
"""
8999

90100

91101
class ClientError(RuntimeError):

src/datachain/query/dataset.py

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
from datachain.dataset import DatasetDependency, DatasetStatus, RowDict
4444
from datachain.error import (
4545
DatasetNotFoundError,
46+
QueryScriptAbortError,
4647
QueryScriptCancelError,
4748
TableMissingError,
4849
)
@@ -592,7 +593,7 @@ def create_result_query(
592593
to select
593594
"""
594595

595-
def populate_udf_output_table(
596+
def populate_udf_output_table( # noqa: PLR0915
596597
self,
597598
udf_table: "Table",
598599
query: Select,
@@ -612,7 +613,10 @@ def populate_udf_output_table(
612613
)
613614
return
614615

615-
from datachain.catalog import QUERY_SCRIPT_CANCELED_EXIT_CODE
616+
from datachain.catalog import (
617+
QUERY_SCRIPT_ABORTED_EXIT_CODE,
618+
QUERY_SCRIPT_CANCELED_EXIT_CODE,
619+
)
616620
from datachain.catalog.loader import (
617621
DISTRIBUTED_IMPORT_PATH,
618622
get_udf_distributor_class,
@@ -726,6 +730,14 @@ def populate_udf_output_table(
726730
"UDF execution was canceled by the user."
727731
) from None
728732
if retval := process.poll():
733+
if retval == QUERY_SCRIPT_CANCELED_EXIT_CODE:
734+
raise QueryScriptCancelError(
735+
"UDF execution was canceled by the user."
736+
)
737+
if retval == QUERY_SCRIPT_ABORTED_EXIT_CODE:
738+
raise QueryScriptAbortError(
739+
"UDF execution aborted: job already terminated."
740+
)
729741
raise RuntimeError(
730742
f"UDF Execution Failed! Exit code: {retval}"
731743
)
@@ -761,6 +773,9 @@ def populate_udf_output_table(
761773
processed_cb.close()
762774
generated_cb.close()
763775

776+
except QueryScriptAbortError:
777+
catalog.warehouse.close()
778+
sys.exit(QUERY_SCRIPT_ABORTED_EXIT_CODE)
764779
except QueryScriptCancelError:
765780
catalog.warehouse.close()
766781
sys.exit(QUERY_SCRIPT_CANCELED_EXIT_CODE)
Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
from unittest.mock import MagicMock, patch
2+
3+
import pytest
4+
5+
import datachain as dc
6+
from datachain.catalog import (
7+
QUERY_SCRIPT_ABORTED_EXIT_CODE,
8+
QUERY_SCRIPT_CANCELED_EXIT_CODE,
9+
)
10+
11+
12+
def _make_mock_popen(poll_return_value=0, communicate_side_effect=None):
13+
mock_process = MagicMock()
14+
mock_process.__enter__ = MagicMock(return_value=mock_process)
15+
mock_process.__exit__ = MagicMock(return_value=False)
16+
if communicate_side_effect:
17+
mock_process.communicate.side_effect = communicate_side_effect
18+
else:
19+
mock_process.communicate.return_value = (b"", b"")
20+
mock_process.poll.return_value = poll_return_value
21+
return mock_process
22+
23+
24+
def _build_parallel_chain(session):
25+
def identity(x: int) -> int:
26+
return x
27+
28+
return (
29+
dc.read_values(x=list(range(5)), session=session)
30+
.settings(parallel=2)
31+
.map(identity, output={"result": int})
32+
)
33+
34+
35+
def test_aborted_exit_code_causes_sys_exit(test_session_tmpfile, monkeypatch):
36+
monkeypatch.delenv("DATACHAIN_DISTRIBUTED", raising=False)
37+
chain = _build_parallel_chain(test_session_tmpfile)
38+
39+
mock_process = _make_mock_popen(poll_return_value=QUERY_SCRIPT_ABORTED_EXIT_CODE)
40+
41+
with patch("datachain.query.dataset.subprocess.Popen", return_value=mock_process):
42+
with pytest.raises(SystemExit) as exc_info:
43+
chain.to_list()
44+
45+
assert exc_info.value.code == QUERY_SCRIPT_ABORTED_EXIT_CODE
46+
47+
48+
def test_canceled_exit_code_causes_sys_exit(test_session_tmpfile, monkeypatch):
49+
monkeypatch.delenv("DATACHAIN_DISTRIBUTED", raising=False)
50+
chain = _build_parallel_chain(test_session_tmpfile)
51+
52+
mock_process = _make_mock_popen(poll_return_value=QUERY_SCRIPT_CANCELED_EXIT_CODE)
53+
54+
with patch("datachain.query.dataset.subprocess.Popen", return_value=mock_process):
55+
with pytest.raises(SystemExit) as exc_info:
56+
chain.to_list()
57+
58+
assert exc_info.value.code == QUERY_SCRIPT_CANCELED_EXIT_CODE
59+
60+
61+
def test_unknown_exit_code_raises_runtime_error(test_session_tmpfile, monkeypatch):
62+
monkeypatch.delenv("DATACHAIN_DISTRIBUTED", raising=False)
63+
chain = _build_parallel_chain(test_session_tmpfile)
64+
65+
mock_process = _make_mock_popen(poll_return_value=42)
66+
67+
with patch("datachain.query.dataset.subprocess.Popen", return_value=mock_process):
68+
with pytest.raises(RuntimeError, match="UDF Execution Failed! Exit code: 42"):
69+
chain.to_list()
70+
71+
72+
def test_keyboard_interrupt_causes_cancel_sys_exit(test_session_tmpfile, monkeypatch):
73+
monkeypatch.delenv("DATACHAIN_DISTRIBUTED", raising=False)
74+
chain = _build_parallel_chain(test_session_tmpfile)
75+
76+
mock_process = _make_mock_popen(
77+
communicate_side_effect=KeyboardInterrupt("simulated ctrl-c")
78+
)
79+
80+
with patch("datachain.query.dataset.subprocess.Popen", return_value=mock_process):
81+
with pytest.raises(SystemExit) as exc_info:
82+
chain.to_list()
83+
84+
assert exc_info.value.code == QUERY_SCRIPT_CANCELED_EXIT_CODE
85+
86+
87+
def test_aborted_exit_code_closes_warehouse(test_session_tmpfile, monkeypatch):
88+
monkeypatch.delenv("DATACHAIN_DISTRIBUTED", raising=False)
89+
chain = _build_parallel_chain(test_session_tmpfile)
90+
91+
mock_process = _make_mock_popen(poll_return_value=QUERY_SCRIPT_ABORTED_EXIT_CODE)
92+
warehouse = test_session_tmpfile.catalog.warehouse
93+
original_close = warehouse.close
94+
close_called = False
95+
96+
def tracking_close():
97+
nonlocal close_called
98+
close_called = True
99+
original_close()
100+
101+
monkeypatch.setattr(warehouse, "close", tracking_close)
102+
103+
with patch("datachain.query.dataset.subprocess.Popen", return_value=mock_process):
104+
with pytest.raises(SystemExit):
105+
chain.to_list()
106+
107+
assert close_called
108+
109+
110+
def test_canceled_exit_code_closes_warehouse(test_session_tmpfile, monkeypatch):
111+
monkeypatch.delenv("DATACHAIN_DISTRIBUTED", raising=False)
112+
chain = _build_parallel_chain(test_session_tmpfile)
113+
114+
mock_process = _make_mock_popen(poll_return_value=QUERY_SCRIPT_CANCELED_EXIT_CODE)
115+
warehouse = test_session_tmpfile.catalog.warehouse
116+
original_close = warehouse.close
117+
close_called = False
118+
119+
def tracking_close():
120+
nonlocal close_called
121+
close_called = True
122+
original_close()
123+
124+
monkeypatch.setattr(warehouse, "close", tracking_close)
125+
126+
with patch("datachain.query.dataset.subprocess.Popen", return_value=mock_process):
127+
with pytest.raises(SystemExit):
128+
chain.to_list()
129+
130+
assert close_called

0 commit comments

Comments
 (0)