|
1 | 1 | from unittest import mock |
2 | 2 | from contextlib import contextmanager |
3 | | - |
| 3 | +from types import SimpleNamespace |
4 | 4 | import argparse |
| 5 | +import openshift_client as oc |
5 | 6 |
|
6 | 7 | from batchtools.bj import ListJobsCommand |
7 | | -from tests.helpers import DictToObject |
| 8 | + |
| 9 | + |
| 10 | +def make_job(name: str, labels: dict | None = None): |
| 11 | + """Create a fake oc APIObject-like job with attribute access.""" |
| 12 | + md = mock.Mock() |
| 13 | + md.name = name |
| 14 | + md.labels = labels or {} |
| 15 | + model = mock.Mock() |
| 16 | + model.metadata = md |
| 17 | + job = mock.Mock() |
| 18 | + job.model = model |
| 19 | + return job |
| 20 | + |
| 21 | + |
| 22 | +def make_workload_owned_by_job(job_name: str, extra_labels: dict | None = None): |
| 23 | + """Create a fake Workload with ownerReferences pointing to the Job""" |
| 24 | + owner = SimpleNamespace(kind="Job", name=job_name) |
| 25 | + wl_md = mock.Mock() |
| 26 | + wl_md.name = f"wl-for-{job_name}" |
| 27 | + wl_md.ownerReferences = [owner] # attribute-style objects |
| 28 | + wl_md.labels = (extra_labels or {}) | {} # ensure dict |
| 29 | + wl_model = mock.Mock() |
| 30 | + wl_model.metadata = wl_md |
| 31 | + wl = mock.Mock() |
| 32 | + wl.model = wl_model |
| 33 | + return wl |
8 | 34 |
|
9 | 35 |
|
10 | 36 | @contextmanager |
11 | | -def patch_jobs_selector(jobs: list[DictToObject]): |
12 | | - with mock.patch("openshift_client.selector") as mock_selector: |
13 | | - mock_result = mock.Mock(name="result") |
14 | | - mock_result.objects.return_value = jobs |
15 | | - mock_selector.return_value = mock_result |
16 | | - yield mock_selector |
17 | | - |
18 | | - |
19 | | -def test_no_jobs(args: argparse.Namespace, capsys): |
20 | | - with patch_jobs_selector([]): |
21 | | - ListJobsCommand.run(args) |
22 | | - captured = capsys.readouterr() |
23 | | - assert "No jobs found" in captured.out |
24 | | - |
25 | | - |
26 | | -def test_list_jobs(args: argparse.Namespace, capsys): |
27 | | - jobs = [ |
28 | | - DictToObject({"model": {"metadata": {"name": "job1"}}}), |
29 | | - DictToObject({"model": {"metadata": {"name": "job2"}}}), |
30 | | - ] |
31 | | - with patch_jobs_selector(jobs): |
32 | | - ListJobsCommand.run(args) |
33 | | - captured = capsys.readouterr() |
34 | | - assert f"Found {len(jobs)} jobs" in captured.out |
35 | | - for job in jobs: |
36 | | - assert job.model.metadata.name in captured.out |
| 37 | +def patch_selector(jobs, workloads=None, raise_on: str | None = None): |
| 38 | + """ |
| 39 | + Patch oc.selector so that: |
| 40 | + - selector("jobs").objects() -> jobs |
| 41 | + - selector("workloads").objects() -> workloads or [] |
| 42 | + - if raise_on == "jobs"/"workloads", that selector raises OpenShiftPythonException |
| 43 | + """ |
| 44 | + with mock.patch("openshift_client.selector") as sel: |
| 45 | + |
| 46 | + def _sel(kind: str, *args, **kwargs): |
| 47 | + if raise_on == kind: |
| 48 | + raise oc.OpenShiftPythonException("test exception") |
| 49 | + m = mock.Mock(name=f"selector<{kind}>") |
| 50 | + if kind == "jobs": |
| 51 | + m.objects.return_value = jobs |
| 52 | + elif kind == "workloads": |
| 53 | + m.objects.return_value = workloads or [] |
| 54 | + else: |
| 55 | + m.objects.return_value = [] |
| 56 | + return m |
| 57 | + |
| 58 | + sel.side_effect = _sel |
| 59 | + yield sel |
| 60 | + |
| 61 | + |
| 62 | +def test_no_jobs(capsys): |
| 63 | + with patch_selector([]): |
| 64 | + ListJobsCommand.run(argparse.Namespace()) |
| 65 | + out = capsys.readouterr().out |
| 66 | + assert "No jobs found" in out |
| 67 | + |
| 68 | + |
| 69 | +def test_lists_only_kueue_managed_jobs_via_label(capsys): |
| 70 | + # job1 is Kueue-managed via label; job2 is not |
| 71 | + job1 = make_job("job1", labels={"kueue.x-k8s.io/queue-name": "a-queue"}) |
| 72 | + job2 = make_job("job2") |
| 73 | + |
| 74 | + with patch_selector([job1, job2], workloads=[]): |
| 75 | + ListJobsCommand.run(argparse.Namespace()) |
| 76 | + out = capsys.readouterr().out |
| 77 | + |
| 78 | + # total jobs count should reflect both jobs |
| 79 | + assert "Found 2 job(s)" in out |
| 80 | + # only the Kueue-managed job is listed |
| 81 | + assert "- job1" in out |
| 82 | + assert "job2" not in out |
| 83 | + |
| 84 | + |
| 85 | +def test_lists_kueue_managed_jobs_via_workload_owner_ref(capsys): |
| 86 | + # neither job has the Kueue label, but job-b is owned by a Workload |
| 87 | + job_a = make_job("job-a") |
| 88 | + job_b = make_job("job-b") |
| 89 | + wl_for_b = make_workload_owned_by_job("job-b") |
| 90 | + |
| 91 | + with patch_selector([job_a, job_b], workloads=[wl_for_b]): |
| 92 | + ListJobsCommand.run(argparse.Namespace()) |
| 93 | + out = capsys.readouterr().out |
| 94 | + |
| 95 | + assert "Found 2 job(s)" in out |
| 96 | + assert "- job-b" in out |
| 97 | + assert "job-a" not in out # no label and no owning workload |
| 98 | + |
| 99 | + |
| 100 | +def test_selector_exception_exits_cleanly(capsys): |
| 101 | + with patch_selector([], raise_on="jobs"): |
| 102 | + try: |
| 103 | + ListJobsCommand.run(argparse.Namespace()) |
| 104 | + assert False, "Expected SystemExit due to selector error" |
| 105 | + except SystemExit as e: |
| 106 | + assert "Error occurred while retrieving jobs: test exception" in str(e) |
0 commit comments