Skip to content

Commit b923c5d

Browse files
authored
Merge pull request #226 from AllenNeuralDynamics/feat-improve-timeout-strategy
Monitor long-running RPC-spawned processes to prevent timeout errors
2 parents aa28ef0 + 1910a70 commit b923c5d

File tree

4 files changed: +148 additions, -14 deletions

src/clabe/xml_rpc/_client.py

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,12 @@ class XmlRpcClientSettings(ServiceSettings):
3737
timeout: float = Field(default=30.0, description="Default timeout for RPC calls in seconds")
3838
poll_interval: float = Field(default=0.5, description="Polling interval for job status checks in seconds")
3939
max_file_size: int = Field(default=5 * 1024 * 1024, description="Maximum file size in bytes (default 5MB)")
40+
monitor: bool = Field(
41+
default=True,
42+
description="If True, timeout becomes a liveness check window. As long as the job is confirmed "
43+
"running within the timeout period, waiting continues indefinitely. If False, timeout is the "
44+
"total execution time limit.",
45+
)
4046

4147

4248
class XmlRpcClient:
@@ -52,7 +58,9 @@ def __init__(self, settings: XmlRpcClientSettings):
5258
self.settings = settings
5359
self._client = xmlrpc.client.ServerProxy(str(settings.server_url), allow_none=True)
5460
self._token = settings.token.get_secret_value()
55-
self._executor = XmlRpcExecutor(self, timeout=settings.timeout, poll_interval=settings.poll_interval)
61+
self._executor = XmlRpcExecutor(
62+
self, timeout=settings.timeout, poll_interval=settings.poll_interval, monitor=settings.monitor
63+
)
5664

5765
logger.info(f"RPC client initialized for server: {settings.server_url}")
5866

@@ -152,7 +160,8 @@ def wait_for_result(self, job_id: str, timeout: Optional[float] = None) -> JobRe
152160
JobResult object with execution details
153161
154162
Raises:
155-
TimeoutError: If the command doesn't complete within the timeout
163+
TimeoutError: If the command doesn't complete within the timeout (or if
164+
monitor mode is enabled and the job stops responding within the timeout window)
156165
157166
Example:
158167
```python
@@ -166,13 +175,21 @@ def wait_for_result(self, job_id: str, timeout: Optional[float] = None) -> JobRe
166175

167176
start_time = time.time()
168177

169-
while time.time() - start_time < timeout:
178+
while True:
179+
elapsed = time.time() - start_time
180+
if elapsed >= timeout:
181+
raise TimeoutError(f"Job {job_id} did not complete within {timeout} seconds")
182+
170183
result = self.get_result(job_id)
171184
if result.status == JobStatus.DONE:
172185
return result
173-
time.sleep(self.settings.poll_interval)
174186

175-
raise TimeoutError(f"Job {job_id} did not complete within {timeout} seconds")
187+
# In monitor mode, reset timer if job is still running
188+
if self.settings.monitor and self.is_running(job_id):
189+
logger.debug("Job %s is still running; resetting timeout timer", job_id)
190+
start_time = time.time()
191+
192+
time.sleep(self.settings.poll_interval)
176193

177194
def run_command(self, cmd_args: list[str] | str, timeout: Optional[float] = None) -> JobResult:
178195
"""

src/clabe/xml_rpc/_executor.py

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,11 @@ class XmlRpcExecutor:
4747
"""
4848

4949
def __init__(
50-
self, client: "XmlRpcClient", timeout: Optional[float] = None, poll_interval: Optional[float] = None
50+
self,
51+
client: "XmlRpcClient",
52+
timeout: Optional[float] = None,
53+
poll_interval: Optional[float] = None,
54+
monitor: bool = True,
5155
) -> None:
5256
"""
5357
Initialize the RPC executor.
@@ -56,6 +60,9 @@ def __init__(
5660
client: RPC client instance for communication with the remote server
5761
timeout: Maximum time to wait for command completion (uses client default if None)
5862
poll_interval: Polling interval for job status checks (uses client default if None)
63+
monitor: If True, timeout becomes a liveness check window. As long as the job
64+
is confirmed running within the timeout period, waiting continues indefinitely.
65+
If False, timeout is the total execution time limit.
5966
6067
Example:
6168
```python
@@ -72,6 +79,7 @@ def __init__(
7279
self.client = client
7380
self.timeout = timeout
7481
self.poll_interval = poll_interval
82+
self.monitor = monitor
7583

7684
logger.info(f"RPC executor initialized for server: {client.settings.server_url}")
7785

@@ -152,18 +160,26 @@ async def _wait_for_result_async(self, job_id: str):
152160
JobResult with execution details
153161
154162
Raises:
155-
TimeoutError: If job doesn't complete within timeout
163+
TimeoutError: If job doesn't complete within timeout (or if monitor mode
164+
is enabled and the job stops responding within the timeout window)
156165
"""
157166

158167
timeout = self.timeout or self.client.settings.timeout
159168
poll_interval = self.poll_interval or self.client.settings.poll_interval
160169
start_time = time.time()
161170

162-
while time.time() - start_time < timeout:
171+
while True:
172+
elapsed = time.time() - start_time
173+
if elapsed >= timeout:
174+
raise TimeoutError(f"Job {job_id} did not complete within {timeout} seconds")
175+
163176
await asyncio.sleep(poll_interval)
164177
# this is synchronous but should be fast
165178
result = self.client.get_result(job_id)
166179
if result.status == JobStatus.DONE:
167180
return result
168181

169-
raise TimeoutError(f"Job {job_id} did not complete within {timeout} seconds")
182+
# In monitor mode, reset timer if job is still running
183+
if self.monitor and self.client.is_running(job_id):
184+
logger.debug("Job %s is still running; resetting timeout timer", job_id)
185+
start_time = time.time()

tests/apps/test_rpc_executors.py

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ def mock_client(self):
1717
mock_client.settings.timeout = 30.0
1818
mock_client.settings.poll_interval = 0.5
1919
mock_client.settings.server_url = "http://localhost:8000"
20+
mock_client.settings.monitor = True
2021
return mock_client
2122

2223
@pytest.fixture
@@ -29,10 +30,19 @@ def test_init(self):
2930
mock_client = Mock()
3031
mock_client.settings.server_url = "http://localhost:8000"
3132

32-
executor = XmlRpcExecutor(mock_client, timeout=60.0, poll_interval=1.0)
33+
executor = XmlRpcExecutor(mock_client, timeout=60.0, poll_interval=1.0, monitor=False)
3334
assert executor.client == mock_client
3435
assert executor.timeout == 60.0
3536
assert executor.poll_interval == 1.0
37+
assert executor.monitor is False
38+
39+
def test_init_default_monitor(self):
40+
"""Test XmlRpcExecutor initialization with default monitor=True."""
41+
mock_client = Mock()
42+
mock_client.settings.server_url = "http://localhost:8000"
43+
44+
executor = XmlRpcExecutor(mock_client)
45+
assert executor.monitor is True
3646

3747
def test_run_success(self, executor):
3848
"""Test successful synchronous command execution."""
@@ -96,3 +106,37 @@ async def test_run_async_no_job_id(self, executor):
96106

97107
with pytest.raises(Exception, match="Job submission failed: no job ID returned"):
98108
await executor.run_async(cmd)
109+
110+
@pytest.mark.asyncio
111+
async def test_run_async_with_monitor_enabled(self, mock_client):
112+
"""Test asynchronous command execution with monitor mode enabled (default)."""
113+
# Create executor with monitor=True (default)
114+
executor = XmlRpcExecutor(mock_client, monitor=True)
115+
116+
submission_response = JobSubmissionResponse(success=True, job_id="monitor-job")
117+
# Simulate job running initially, then completing
118+
running_result = JobResult(
119+
job_id="monitor-job", status=JobStatus.RUNNING, stdout=None, stderr=None, returncode=None, error=None
120+
)
121+
done_result = JobResult(
122+
job_id="monitor-job",
123+
status=JobStatus.DONE,
124+
stdout="Monitored async output",
125+
stderr="",
126+
returncode=0,
127+
error=None,
128+
)
129+
130+
executor.client.submit_command.return_value = submission_response
131+
# First call returns running, second call returns done
132+
executor.client.get_result.side_effect = [running_result, done_result]
133+
executor.client.is_running.return_value = True
134+
135+
cmd = Command(cmd="long_async_command", output_parser=identity_parser)
136+
result = await executor.run_async(cmd)
137+
138+
assert isinstance(result, CommandResult)
139+
assert result.stdout == "Monitored async output"
140+
assert result.exit_code == 0
141+
142+
executor.client.submit_command.assert_called_once_with("long_async_command")

tests/xml-rpc/test_client.py

Lines changed: 61 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ def test_client_settings_creation(self):
9494
assert settings.timeout == 30.0
9595
assert settings.poll_interval == 0.5
9696
assert settings.max_file_size == 5 * 1024 * 1024
97+
assert settings.monitor is True # Default is True
9798

9899
def test_client_settings_custom_values(self):
99100
"""Test creating client settings with custom values."""
@@ -103,13 +104,15 @@ def test_client_settings_custom_values(self):
103104
timeout=60.0,
104105
poll_interval=1.0,
105106
max_file_size=10 * 1024 * 1024,
107+
monitor=False,
106108
)
107109

108110
assert str(settings.server_url) == "http://192.168.1.100:9000/"
109111
assert settings.token.get_secret_value() == "custom-token"
110112
assert settings.timeout == 60.0
111113
assert settings.poll_interval == 1.0
112114
assert settings.max_file_size == 10 * 1024 * 1024
115+
assert settings.monitor is False
113116

114117

115118
class TestJobResult:
@@ -234,14 +237,25 @@ def test_wait_for_result_success(self, rpc_client: XmlRpcClient):
234237
assert result.returncode == 0
235238
assert "completed" in result.stdout
236239

237-
def test_wait_for_result_timeout(self, rpc_client: XmlRpcClient):
238-
"""Test timeout when waiting for command completion."""
240+
def test_wait_for_result_timeout(self, test_server):
241+
"""Test timeout when waiting for command completion (with monitor=False)."""
239242
import sys
240243

241-
submission = rpc_client.submit_command([sys.executable, "-c", "import time; time.sleep(10)"])
244+
# Create a client with monitor=False to test actual timeout behavior
245+
_, port, token = test_server
246+
settings = XmlRpcClientSettings(
247+
server_url=HttpUrl(f"http://127.0.0.1:{port}"),
248+
token=SecretStr(token),
249+
timeout=5.0,
250+
poll_interval=0.1,
251+
monitor=False, # Disable monitor mode to test hard timeout
252+
)
253+
client = XmlRpcClient(settings)
254+
255+
submission = client.submit_command([sys.executable, "-c", "import time; time.sleep(10)"])
242256

243257
with pytest.raises(TimeoutError, match="did not complete within"):
244-
rpc_client.wait_for_result(submission.job_id, timeout=0.5)
258+
client.wait_for_result(submission.job_id, timeout=0.5)
245259

246260
def test_run_command_success(self, rpc_client: XmlRpcClient):
247261
"""Test running a command to completion."""
@@ -267,6 +281,49 @@ def test_is_running(self, rpc_client: XmlRpcClient):
267281
is_running_after = rpc_client.is_running(submission.job_id)
268282
assert is_running_after is False
269283

284+
def test_wait_for_result_with_monitor_default(self, rpc_client: XmlRpcClient):
285+
"""Test that monitor mode (default) completes successfully for long-running job."""
286+
import sys
287+
288+
# Submit a job that runs longer than the timeout
289+
submission = rpc_client.submit_command([sys.executable, "-c", "import time; time.sleep(0.5); print('done')"])
290+
291+
# The job takes 0.5s but timeout is 0.3s - with default monitor=True,
292+
# the timeout resets each time is_running returns True
293+
result = rpc_client.wait_for_result(submission.job_id, timeout=0.3)
294+
295+
assert result.status == JobStatus.DONE
296+
assert result.returncode == 0
297+
assert "done" in result.stdout
298+
299+
def test_wait_for_result_monitor_resets_timeout(self, rpc_client: XmlRpcClient):
300+
"""Test that monitor mode (default) resets timeout when job is confirmed running."""
301+
import sys
302+
303+
# Submit a job that takes longer than the timeout
304+
submission = rpc_client.submit_command(
305+
[sys.executable, "-c", "import time; time.sleep(0.8); print('finished')"]
306+
)
307+
308+
# With default monitor=True, the timeout resets each poll cycle while job is running
309+
result = rpc_client.wait_for_result(submission.job_id, timeout=0.4)
310+
311+
assert result.status == JobStatus.DONE
312+
assert "finished" in result.stdout
313+
314+
def test_run_command_with_monitor_default(self, rpc_client: XmlRpcClient):
315+
"""Test run_command with default monitor mode enabled."""
316+
import sys
317+
318+
# With default monitor=True, long-running commands complete successfully
319+
result = rpc_client.run_command(
320+
[sys.executable, "-c", "import time; time.sleep(0.5); print('monitored')"],
321+
timeout=0.3,
322+
)
323+
324+
assert result.status == JobStatus.DONE
325+
assert "monitored" in result.stdout
326+
270327
def test_list_jobs(self, rpc_client: XmlRpcClient):
271328
"""Test listing jobs."""
272329
# Submit a job

0 commit comments

Comments
 (0)