Skip to content

Commit 5df9444

Browse files
more pios tasks, Assign pioreactors UX
1 parent 320d30a commit 5df9444

File tree

9 files changed

+1068
-24
lines changed

9 files changed

+1068
-24
lines changed

CHANGELOG.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,14 @@
1111

1212
#### Enhancements
1313

14-
- Added `/api/units/<pioreactor_unit>/jobs/stop/experiments/<experiment>` to mirror worker stop-all behavior.
1514
- Normalized calibration route parameter naming to `calibration_name` across API and unit API calibration endpoints for more consistent endpoint templates.
15+
- Added `pios jobs list` and `pios jobs list running` to inspect worker job history and running jobs from the leader CLI.
16+
- Added `GET /unit_api/jobs` for per-unit job history (ordered by newest first), complementing the existing `GET /unit_api/jobs/running` endpoint.
1617

1718
#### Bug fixes
1819

1920
- Fixed `/api/config/units/$broadcast` to correctly merge each unit's own `config_<unit>.ini` instead of using a shared `config_$broadcast.ini` path.
21+
- Improved the UI's "Assign Pioreactors" dialog: units already assigned to another experiment can now be selected for reassignment, their status text updates to "Will be unassigned from <experiment>" when selected, and "Select all" now applies consistently to all listed units.
2022

2123
### 26.2.3
2224

core/pioreactor/cli/pios.py

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
"""
55
import os
66
from concurrent.futures import ThreadPoolExecutor
7+
from typing import Any
78

89
import click
910
from msgspec import DecodeError
@@ -18,6 +19,7 @@
1819
from pioreactor.exc import SSHError
1920
from pioreactor.logging import create_logger
2021
from pioreactor.mureq import HTTPException
22+
from pioreactor.pubsub import get_from
2123
from pioreactor.pubsub import post_into
2224
from pioreactor.utils.job_manager import ClusterJobManager
2325
from pioreactor.utils.networking import cp_file_across_cluster
@@ -209,6 +211,107 @@ def parse_click_arguments(input_list: list[str]) -> dict: # TODO: typed dict
209211

210212
return {"args": args, "options": opts}
211213

214+
def _format_timestamp_to_seconds(timestamp: str | None) -> str:
215+
if timestamp is None:
216+
return ""
217+
218+
from pioreactor.utils.timing import to_datetime
219+
220+
try:
221+
dt = to_datetime(timestamp)
222+
except ValueError:
223+
return timestamp
224+
225+
return dt.replace(microsecond=0).strftime("%Y-%m-%dT%H:%M:%S")
226+
227+
def _format_job_history_line(
    job_id: int,
    job_name: str,
    experiment: str,
    job_source: str | None,
    unit: str,
    started_at: str,
    ended_at: str | None,
) -> str:
    """Format one job row for terminal display.

    A cluster-wide listing merges rows from many workers, so the originating
    `unit` is included in the line (previously the parameter was accepted but
    never rendered, leaving no way to tell which worker ran the job).
    Timestamps are trimmed to whole seconds; a missing `ended_at` is shown,
    highlighted, as "still running".
    """
    job_source_display = job_source or "unknown"
    ended_at_display = _format_timestamp_to_seconds(ended_at) or "still running"
    job_id_label = click.style(f"[job_id={job_id}]", fg="cyan")
    job_name_label = click.style(job_name, fg="green", bold=True)
    # highlight still-running jobs so they stand out in a long history
    ended_at_label = (
        click.style(ended_at_display, fg="yellow", bold=True) if ended_at is None else ended_at_display
    )

    return (
        f"{job_id_label} {job_name_label} "
        f"unit={unit}, experiment={experiment}, source={job_source_display}, "
        f"started_at={_format_timestamp_to_seconds(started_at)}, ended_at={ended_at_label}"
    )
249+
250+
def _show_cluster_job_history(
    units: tuple[str, ...],
    experiments: tuple[str, ...],
    *,
    running_only: bool = False,
) -> None:
    """Fetch job rows from each targeted worker and print them newest-first.

    Workers that cannot be reached, or that return a malformed payload, are
    reported on stderr and skipped — one bad worker never aborts the listing.
    Malformed individual rows are dropped silently.
    """
    units = resolve_target_units(units, experiments, active_only=True, include_leader=None)
    if not units:
        click.echo("No jobs recorded.")
        return

    endpoint = "/unit_api/jobs" if not running_only else "/unit_api/jobs/running"

    def _fetch(unit: str) -> tuple[bool, str, list[dict[str, Any]]]:
        # best-effort per-unit fetch; failures are reported, not raised
        try:
            response = get_from(resolve_to_address(unit), endpoint)
            response.raise_for_status()
            payload = response.json()
            if not isinstance(payload, list):
                raise ValueError("Expected list payload")
        except (HTTPException, ValueError) as e:
            click.echo(f"Unable to list jobs on {unit}: {e}", err=True)
            return False, unit, []
        return True, unit, payload

    with ThreadPoolExecutor(max_workers=len(units)) as executor:
        per_unit_results = executor.map(_fetch, units)

    collected: list[tuple[int, str, str, str | None, str, str, str | None]] = []
    for _, unit, payload_rows in per_unit_results:
        for entry in payload_rows:
            if not isinstance(entry, dict):
                continue
            try:
                # normalize into the positional tuple _format_job_history_line expects
                normalized = (
                    int(entry["job_id"]),
                    str(entry["job_name"]),
                    str(entry["experiment"]),
                    None if entry.get("job_source") is None else str(entry.get("job_source")),
                    str(entry.get("unit", unit)),  # fall back to the queried unit
                    str(entry["started_at"]),
                    None if entry.get("ended_at") is None else str(entry.get("ended_at")),
                )
            except (KeyError, TypeError, ValueError):
                # row is missing keys or has unconvertible values — skip it
                continue
            collected.append(normalized)

    if not collected:
        click.echo("No jobs recorded.")
        return

    # index 5 is started_at; ISO-8601 strings sort chronologically
    collected.sort(key=lambda row: row[5], reverse=True)
    for row in collected:
        click.echo(_format_job_history_line(*row))
314+
212315
def universal_identifier_to_all_active_workers(workers: tuple[str, ...]) -> tuple[str, ...]:
213316
try:
214317
active_workers = get_active_workers_in_inventory()
@@ -782,6 +885,23 @@ def _thread_function(unit: str) -> bool:
782885
if not all(results):
783886
raise click.Abort()
784887

888+
@pios.group(name="jobs", short_help="job-related commands")
def jobs():
    """Interact with worker jobs."""


@jobs.group(name="list", short_help="list jobs current and previous", invoke_without_command=True)
@which_units
@click.pass_context
def list_jobs(ctx, units: tuple[str, ...], experiments: tuple[str, ...]) -> None:
    # `pios jobs list` with no subcommand prints full job history;
    # a subcommand (e.g. `running`) takes over instead.
    if ctx.invoked_subcommand is not None:
        return
    _show_cluster_job_history(units, experiments, running_only=False)


@list_jobs.command(name="running", short_help="show status of running job(s)")
@which_units
def list_running_jobs(units: tuple[str, ...], experiments: tuple[str, ...]) -> None:
    # same listing, restricted to currently-running jobs
    _show_cluster_job_history(units, experiments, running_only=True)
904+
785905
@pios.command("kill", short_help="kill a job(s) on workers")
786906
@click.option("--all-jobs", is_flag=True, help="kill all worker jobs")
787907
@click.option("--experiment", type=click.STRING)

core/pioreactor/web/tasks.py

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
from pioreactor import whoami
3939
from pioreactor.config import config as pioreactor_config
4040
from pioreactor.logging import create_logger
41+
from pioreactor.models import CORE_MODELS
4142
from pioreactor.mureq import HTTPErrorStatus
4243
from pioreactor.mureq import HTTPException
4344
from pioreactor.mureq import Response
@@ -286,6 +287,11 @@ def check_model_hardware(model_name: str, model_version: str) -> None:
286287
if model_version != "1.5":
287288
return
288289

290+
if (model_name, model_version) in CORE_MODELS:
291+
display_name = CORE_MODELS[(model_name, model_version)].display_name
292+
else:
293+
display_name = f"{model_name} {model_version}"
294+
289295
try:
290296
addresses = _get_adc_addresses_for_model(model_name, model_version)
291297
except exc.HardwareNotFoundError as err:
@@ -295,21 +301,19 @@ def check_model_hardware(model_name: str, model_version: str) -> None:
295301
return
296302

297303
if not addresses:
298-
logger.debug(
299-
f"Hardware check found no ADC addresses for {model_name} {model_version} on {get_unit_name()}."
300-
)
304+
logger.debug(f"Hardware check found no ADC addresses for {display_name} on {get_unit_name()}.")
301305
return
302306

303307
missing = sorted(addr for addr in addresses if not hardware.is_i2c_device_present(addr))
304308
if missing:
305309
missing_hex = ", ".join(hex(addr) for addr in missing)
306310
logger.warning(
307-
f"Hardware check failed for {model_name} {model_version} on {get_unit_name()}: "
311+
f"Hardware check failed for {display_name} on {get_unit_name()}: "
308312
f"missing I2C devices at {missing_hex}."
309313
)
310314
return
311315

312-
logger.notice(f"Correct hardware found for {model_name} {model_version} on {get_unit_name()}.")
316+
logger.notice(f"Correct hardware found for {display_name} on {get_unit_name()}.")
313317
return
314318

315319

core/pioreactor/web/unit_api.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -594,6 +594,25 @@ def stop_jobs() -> DelayedResponseReturnValue:
594594
return create_task_response(task)
595595

596596

597+
@unit_api_bp.route("/jobs", methods=["GET"])
def get_jobs() -> ResponseReturnValue:
    """Return this unit's full job history as JSON, newest first.

    Complements the /jobs/running endpoints by also including jobs that
    have already ended.
    """
    rows = query_temp_local_metadata_db(
        """
        SELECT
            job_id,
            job_name,
            experiment,
            job_source,
            unit,
            started_at,
            ended_at
        FROM pio_job_metadata
        ORDER BY started_at DESC
        """
    )
    return jsonify(rows)
614+
615+
597616
@unit_api_bp.route("/jobs/running/experiments/<experiment>", methods=["GET"])
598617
def get_running_jobs_for_experiment(experiment: str) -> ResponseReturnValue:
599618
jobs = query_temp_local_metadata_db(

core/tests/test_cli.py

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,74 @@ def test_pios_kill_requests() -> None:
243243
assert bucket[1].json == {"experiment": "demo"}
244244

245245

246+
def _invoke_pios_jobs_list(monkeypatch, cli_args: list[str], job_row: dict):
    """Run `pios jobs list ...` with a stubbed `get_from` returning [job_row].

    Factors out the response stub and request-capture scaffolding that was
    duplicated verbatim between the two tests below. Returns
    (click result, captured (address, endpoint) pairs).
    """

    class DummyResponse:
        def raise_for_status(self) -> None:
            return

        def json(self) -> list[dict]:
            return [job_row]

    captured: list[tuple[str, str]] = []

    def fake_get_from(address: str, endpoint: str, **_kwargs):
        captured.append((address, endpoint))
        return DummyResponse()

    monkeypatch.setattr("pioreactor.cli.pios.get_from", fake_get_from)

    runner = CliRunner()
    return runner.invoke(pios, cli_args), captured


def test_pios_jobs_list_requests_history_endpoint(monkeypatch) -> None:
    result, captured = _invoke_pios_jobs_list(
        monkeypatch,
        ["jobs", "list", "--units", "unit1"],
        {
            "job_id": 42,
            "job_name": "stirring",
            "experiment": "_testing_experiment",
            "job_source": "cli",
            "unit": "unit1",
            "started_at": "2026-01-01T00:00:00.000Z",
            "ended_at": "2026-01-01T00:10:00.000Z",
        },
    )
    assert result.exit_code == 0
    assert captured == [("unit1.local", "/unit_api/jobs")]
    assert "[job_id=42]" in result.output
    assert "stirring" in result.output


def test_pios_jobs_list_running_requests_running_endpoint(monkeypatch) -> None:
    result, captured = _invoke_pios_jobs_list(
        monkeypatch,
        ["jobs", "list", "running", "--units", "unit1"],
        {
            "job_id": 43,
            "job_name": "od_reading",
            "experiment": "_testing_experiment",
            "job_source": "cli",
            "unit": "unit1",
            "started_at": "2026-01-01T00:00:00.000Z",
            "ended_at": None,  # still running — rendered as "still running"
        },
    )
    assert result.exit_code == 0
    assert captured == [("unit1.local", "/unit_api/jobs/running")]
    assert "[job_id=43]" in result.output
    assert "still running" in result.output
312+
313+
246314
def test_pio_job_info_lists_job() -> None:
247315
runner = CliRunner()
248316
job_name = "test_job"

core/tests/web/test_unit_api.py

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,3 +167,80 @@ class DummyTask:
167167
data = resp.get_json()
168168
assert data["task_id"] == "task-123"
169169
assert captured["args"] == ("install", "pioreactor-air-bubbler")
170+
171+
172+
def _seed_job_history(base_pid: int) -> tuple[int, int]:
    """Register one stopped then one running job; return (stopped_id, running_id).

    Factors out the JobManager seeding that was duplicated between the two
    tests below. A short sleep between registrations guarantees distinct
    started_at timestamps so newest-first ordering is deterministic (the
    original second test omitted this).
    """
    from time import sleep

    from pioreactor.utils.job_manager import JobManager

    with JobManager() as jm:
        stopped_id = jm.register_and_set_running(
            unit="unit1",
            experiment="exp_old",
            job_name="old_job",
            job_source="test",
            pid=base_pid,
            leader="leader",
            is_long_running_job=False,
        )
        jm.set_not_running(stopped_id)

        sleep(0.02)

        running_id = jm.register_and_set_running(
            unit="unit1",
            experiment="exp_new",
            job_name="new_job",
            job_source="test",
            pid=base_pid + 1,
            leader="leader",
            is_long_running_job=False,
        )

    return stopped_id, running_id


def test_get_jobs_returns_history(client) -> None:
    old_job_id, newest_job_id = _seed_job_history(1001)

    response = client.get("/unit_api/jobs")
    assert response.status_code == 200
    rows = response.get_json()
    assert isinstance(rows, list)
    # newest job first, then the stopped one
    assert [row["job_id"] for row in rows[:2]] == [newest_job_id, old_job_id]
    assert set(rows[0]) == {
        "job_id",
        "job_name",
        "experiment",
        "job_source",
        "unit",
        "started_at",
        "ended_at",
    }


def test_get_running_jobs_endpoint_filters_results(client) -> None:
    _stopped_job_id, running_job_id = _seed_job_history(1003)

    response = client.get("/unit_api/jobs/running")
    assert response.status_code == 200
    rows = response.get_json()
    assert isinstance(rows, list)
    # the stopped job must be filtered out
    assert [row["job_id"] for row in rows] == [running_job_id]

0 commit comments

Comments
 (0)