Skip to content

Commit f46d958

Browse files
avoid a worker being held up indefintely by constantly checking
1 parent b155352 commit f46d958

File tree

2 files changed

+66
-2
lines changed

2 files changed

+66
-2
lines changed

core/pioreactor/web/tasks.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ def _process_delayed_json_response(
163163
unit: str,
164164
response: Response,
165165
*,
166-
max_attempts: int = 50,
166+
max_attempts: int = 60,
167167
retry_sleep_s: float = 0.1,
168168
) -> tuple[str, Any]:
169169
"""
@@ -801,7 +801,7 @@ def _get_from_unit(
801801
json: dict | None = None,
802802
timeout=5.0,
803803
return_raw=False,
804-
max_attempts: int = 30,
804+
max_attempts: int = 60,
805805
) -> tuple[str, Any]:
806806
try:
807807
address = resolve_to_address(unit)

core/tests/web/test_tasks.py

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
# -*- coding: utf-8 -*-
2+
from __future__ import annotations
3+
4+
import json
5+
from typing import Any
6+
7+
import pytest
8+
from pioreactor.mureq import Response
9+
from pioreactor.web import tasks
10+
11+
12+
def _response(status_code: int, payload: dict[str, Any]) -> Response:
13+
return Response("http://unit.local", status_code, {}, json.dumps(payload).encode())
14+
15+
16+
def test_get_from_unit_retries_until_result(monkeypatch: pytest.MonkeyPatch) -> None:
17+
# Simulate two pending responses followed by a completed task.
18+
responses = [
19+
_response(202, {"result_url_path": "/unit_api/task_results/abc"}),
20+
_response(202, {"result_url_path": "/unit_api/task_results/abc"}),
21+
_response(200, {"task_id": "abc", "result": {"ok": True}}),
22+
]
23+
24+
# Each request pops the next response in sequence.
25+
def fake_get_from(
26+
address: str, endpoint: str, json: dict | None = None, timeout: float = 5.0
27+
) -> Response:
28+
return responses.pop(0)
29+
30+
monkeypatch.setattr(tasks, "get_from", fake_get_from)
31+
monkeypatch.setattr(tasks, "resolve_to_address", lambda unit: "http://unit.local")
32+
# Avoid test delays from retry sleeps.
33+
monkeypatch.setattr(tasks, "sleep", lambda _: None)
34+
35+
unit, result = tasks._get_from_unit("unit1", "/unit_api/do", max_attempts=2)
36+
37+
assert unit == "unit1"
38+
assert result == {"ok": True}
39+
assert responses == []
40+
41+
42+
def test_get_from_unit_stops_after_max_attempts(monkeypatch: pytest.MonkeyPatch) -> None:
43+
# Simulate a pending response that never resolves within the attempt limit.
44+
responses = [
45+
_response(202, {"result_url_path": "/unit_api/task_results/abc"}),
46+
_response(202, {"result_url_path": "/unit_api/task_results/abc"}),
47+
]
48+
49+
# Each request pops the next response in sequence.
50+
def fake_get_from(
51+
address: str, endpoint: str, json: dict | None = None, timeout: float = 5.0
52+
) -> Response:
53+
return responses.pop(0)
54+
55+
monkeypatch.setattr(tasks, "get_from", fake_get_from)
56+
monkeypatch.setattr(tasks, "resolve_to_address", lambda unit: "http://unit.local")
57+
# Avoid test delays from retry sleeps.
58+
monkeypatch.setattr(tasks, "sleep", lambda _: None)
59+
60+
unit, result = tasks._get_from_unit("unit1", "/unit_api/do", max_attempts=1)
61+
62+
assert unit == "unit1"
63+
assert result is None
64+
assert responses == []

0 commit comments

Comments
 (0)