Skip to content

Commit d4d508c

Browse files
more timeouts
1 parent 665df8d commit d4d508c

File tree

3 files changed

+34
-30
lines changed

3 files changed

+34
-30
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
- Fix self-test logging closing prematurely.
3434
- Fix floating point error at the boundary of OD calibrations.
3535
- Fix runtime forward-reference errors in type annotations after dropping `__future__` imports.
36+
- Fix timeouts being too short on some UI export operations
3637

3738
### 25.12.10
3839

core/pioreactor/web/api.py

Lines changed: 22 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -110,20 +110,23 @@ def broadcast_get_across_cluster(endpoint: str, timeout: float = 5.0, return_raw
110110

111111

112112
def broadcast_post_across_cluster(
113-
endpoint: str, json: dict | None = None, params: dict | None = None
113+
endpoint: str,
114+
json: dict | None = None,
115+
params: dict | None = None,
116+
timeout: float = 30.0,
114117
) -> Result:
115118
assert endpoint.startswith("/unit_api")
116-
return tasks.multicast_post(endpoint, get_all_units(), json=json, params=params)
119+
return tasks.multicast_post(endpoint, get_all_units(), json=json, params=params, timeout=timeout)
117120

118121

119-
def broadcast_delete_across_cluster(endpoint: str, json: dict | None = None) -> Result:
122+
def broadcast_delete_across_cluster(endpoint: str, json: dict | None = None, timeout: float = 30.0) -> Result:
120123
assert endpoint.startswith("/unit_api")
121-
return tasks.multicast_delete(endpoint, get_all_units(), json=json)
124+
return tasks.multicast_delete(endpoint, get_all_units(), json=json, timeout=timeout)
122125

123126

124-
def broadcast_patch_across_cluster(endpoint: str, json: dict | None = None) -> Result:
127+
def broadcast_patch_across_cluster(endpoint: str, json: dict | None = None, timeout: float = 30.0) -> Result:
125128
assert endpoint.startswith("/unit_api")
126-
return tasks.multicast_patch(endpoint, get_all_units(), json=json)
129+
return tasks.multicast_patch(endpoint, get_all_units(), json=json, timeout=timeout)
127130

128131

129132
# send only to workers
@@ -147,20 +150,23 @@ def broadcast_get_across_workers_in_experiment(
147150

148151

149152
def broadcast_post_across_workers(
150-
endpoint: str, json: dict | None = None, params: dict | None = None
153+
endpoint: str,
154+
json: dict | None = None,
155+
params: dict | None = None,
156+
timeout: float = 30.0,
151157
) -> Result:
152158
assert endpoint.startswith("/unit_api")
153-
return tasks.multicast_post(endpoint, get_all_workers(), json=json, params=params)
159+
return tasks.multicast_post(endpoint, get_all_workers(), json=json, params=params, timeout=timeout)
154160

155161

156-
def broadcast_delete_across_workers(endpoint: str, json: dict | None = None) -> Result:
162+
def broadcast_delete_across_workers(endpoint: str, json: dict | None = None, timeout: float = 30.0) -> Result:
157163
assert endpoint.startswith("/unit_api")
158-
return tasks.multicast_delete(endpoint, get_all_workers(), json=json)
164+
return tasks.multicast_delete(endpoint, get_all_workers(), json=json, timeout=timeout)
159165

160166

161-
def broadcast_patch_across_workers(endpoint: str, json: dict | None = None) -> Result:
167+
def broadcast_patch_across_workers(endpoint: str, json: dict | None = None, timeout: float = 30.0) -> Result:
162168
assert endpoint.startswith("/unit_api")
163-
return tasks.multicast_patch(endpoint, get_all_workers(), json=json)
169+
return tasks.multicast_patch(endpoint, get_all_workers(), json=json, timeout=timeout)
164170

165171

166172
def _build_single_file_multipart(
@@ -1536,17 +1542,15 @@ def start_calibration_session(pioreactor_unit: str) -> ResponseReturnValue:
15361542
timeout=30,
15371543
)
15381544
response.raise_for_status()
1539-
except (HTTPErrorStatus, HTTPException) as exc:
1545+
except (HTTPErrorStatus, HTTPException):
15401546
detail = _extract_unit_api_error(response)
15411547
if detail:
1542-
publish_to_error_log(f"{exc}: {detail}", "start_calibration_session")
15431548
abort_with(502, f"{detail}")
15441549
if response is not None:
15451550
abort_with(
15461551
502,
15471552
f"Starting calibration session failed on {pioreactor_unit} (HTTP {response.status_code}).",
15481553
)
1549-
publish_to_error_log(str(exc), "start_calibration_session")
15501554
abort_with(502, f"Starting calibration session failed on {pioreactor_unit}.")
15511555

15521556
return Response(
@@ -1569,17 +1573,15 @@ def get_calibration_session(pioreactor_unit: str, session_id: str) -> ResponseRe
15691573
timeout=30,
15701574
)
15711575
response.raise_for_status()
1572-
except (HTTPErrorStatus, HTTPException) as exc:
1576+
except (HTTPErrorStatus, HTTPException):
15731577
detail = _extract_unit_api_error(response)
15741578
if detail:
1575-
publish_to_error_log(f"{exc}: {detail}", "get_calibration_session")
15761579
abort_with(502, f"Fetching calibration session failed on {pioreactor_unit}: {detail}")
15771580
if response is not None:
15781581
abort_with(
15791582
502,
15801583
f"Fetching calibration session failed on {pioreactor_unit} (HTTP {response.status_code}).",
15811584
)
1582-
publish_to_error_log(str(exc), "get_calibration_session")
15831585
abort_with(502, f"Fetching calibration session failed on {pioreactor_unit}.")
15841586

15851587
return Response(
@@ -1607,17 +1609,15 @@ def advance_calibration_session(pioreactor_unit: str, session_id: str) -> Respon
16071609
timeout=300,
16081610
)
16091611
response.raise_for_status()
1610-
except (HTTPErrorStatus, HTTPException) as exc:
1612+
except (HTTPErrorStatus, HTTPException):
16111613
detail = _extract_unit_api_error(response)
16121614
if detail:
1613-
publish_to_error_log(f"{exc}: {detail}", "advance_calibration_session")
16141615
abort_with(502, detail)
16151616
if response is not None:
16161617
abort_with(
16171618
502,
16181619
f"Updating calibration session failed on {pioreactor_unit} (HTTP {response.status_code}).",
16191620
)
1620-
publish_to_error_log(str(exc), "advance_calibration_session")
16211621
abort_with(502, f"Updating calibration session failed on {pioreactor_unit}.")
16221622

16231623
return Response(
@@ -1640,17 +1640,15 @@ def abort_calibration_session(pioreactor_unit: str, session_id: str) -> Response
16401640
timeout=30,
16411641
)
16421642
response.raise_for_status()
1643-
except (HTTPErrorStatus, HTTPException) as exc:
1643+
except (HTTPErrorStatus, HTTPException):
16441644
detail = _extract_unit_api_error(response)
16451645
if detail:
1646-
publish_to_error_log(f"{exc}: {detail}", "abort_calibration_session")
16471646
abort_with(502, f"Aborting calibration session failed on {pioreactor_unit}: {detail}")
16481647
if response is not None:
16491648
abort_with(
16501649
502,
16511650
f"Aborting calibration session failed on {pioreactor_unit} (HTTP {response.status_code}).",
16521651
)
1653-
publish_to_error_log(str(exc), "abort_calibration_session")
16541652
abort_with(502, f"Aborting calibration session failed on {pioreactor_unit}.")
16551653

16561654
return Response(

core/pioreactor/web/tasks.py

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -890,6 +890,7 @@ def multicast_post(
890890
units: list[str],
891891
json: dict | list[dict | None] | None = None,
892892
params: dict | list[dict | None] | None = None,
893+
timeout: float = 30.0,
893894
) -> dict[str, Any]:
894895
# this function "consumes" one huey thread waiting fyi
895896
assert endpoint.startswith("/unit_api")
@@ -905,7 +906,7 @@ def multicast_post(
905906
tasks = post_into_unit.map(((units[i], endpoint, json[i], params[i]) for i in range(len(units))))
906907

907908
return {
908-
unit: response for (unit, response) in tasks.get(blocking=True, timeout=30)
909+
unit: response for (unit, response) in tasks.get(blocking=True, timeout=timeout)
909910
} # add a timeout so that we don't hold up a thread forever.
910911

911912

@@ -964,7 +965,7 @@ def multicast_get(
964965

965966
tasks = get_from_unit.map(((units[i], endpoint, json[i], timeout, return_raw) for i in range(len(units))))
966967
unsorted_responses = {
967-
unit: response for (unit, response) in tasks.get(blocking=True, timeout=15)
968+
unit: response for (unit, response) in tasks.get(blocking=True, timeout=timeout)
968969
} # add a timeout so that we don't hold up a thread forever.
969970

970971
return dict(sorted(unsorted_responses.items())) # always sort alphabetically for downstream uses.
@@ -996,14 +997,16 @@ def patch_into_unit(unit: str, endpoint: str, json: dict | None = None) -> tuple
996997

997998

998999
@huey.task(priority=50)
999-
def multicast_patch(endpoint: str, units: list[str], json: dict | None = None) -> dict[str, Any]:
1000+
def multicast_patch(
1001+
endpoint: str, units: list[str], json: dict | None = None, timeout: float = 30.0
1002+
) -> dict[str, Any]:
10001003
# this function "consumes" one huey thread waiting fyi
10011004
assert endpoint.startswith("/unit_api")
10021005

10031006
tasks = patch_into_unit.map(((unit, endpoint, json) for unit in units))
10041007

10051008
return {
1006-
unit: response for (unit, response) in tasks.get(blocking=True, timeout=30)
1009+
unit: response for (unit, response) in tasks.get(blocking=True, timeout=timeout)
10071010
} # add a timeout so that we don't hold up a thread forever.
10081011

10091012

@@ -1026,12 +1029,14 @@ def delete_from_unit(unit: str, endpoint: str, json: dict | None = None) -> tupl
10261029

10271030

10281031
@huey.task(priority=5)
1029-
def multicast_delete(endpoint: str, units: list[str], json: dict | None = None) -> dict[str, Any]:
1032+
def multicast_delete(
1033+
endpoint: str, units: list[str], json: dict | None = None, timeout: float = 30.0
1034+
) -> dict[str, Any]:
10301035
# this function "consumes" one huey thread waiting fyi
10311036
assert endpoint.startswith("/unit_api")
10321037

10331038
tasks = delete_from_unit.map(((unit, endpoint, json) for unit in units))
10341039

10351040
return {
1036-
unit: response for (unit, response) in tasks.get(blocking=True, timeout=30)
1041+
unit: response for (unit, response) in tasks.get(blocking=True, timeout=timeout)
10371042
} # add a timeout so that we don't hold up a thread forever.

0 commit comments

Comments
 (0)