Skip to content

Commit 5d9bf02

Browse files
authored
[ISD-3853] Fix: add retry limit for reactive jobs (#596)
1 parent 5b822a7 commit 5d9bf02

File tree

4 files changed

+162
-28
lines changed

4 files changed

+162
-28
lines changed

docs/changelog.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,15 @@
22

33
This changelog documents user-relevant changes to the GitHub runner charm.
44

5+
## 2025-07-24
6+
7+
- Fix an issue with infinite retry of a reactive job message.
8+
59
## 2025-07-22
610

711
- Removed support for using both jobmaanger and GitHub at the same time.
812

13+
914
## 2025-07-18
1015

1116
- Fix an issue where flushing runners does not include reactive processes. This cause some reactive runner to spawn with old code after upgrade.

github-runner-manager/src/github_runner_manager/reactive/consumer.py

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929

3030
PROCESS_COUNT_HEADER_NAME = "X-Process-Count"
3131
WAIT_TIME_IN_SEC = 60
32+
RETRY_LIMIT = 5
3233
# This control message is for testing. The reactive process will stop consuming messages
3334
# when the message is sent. This message does not come from the router.
3435
END_PROCESSING_PAYLOAD = "__END__"
@@ -131,12 +132,27 @@ def consume( # noqa: C901
131132
msg.headers[PROCESS_COUNT_HEADER_NAME] = (
132133
msg.headers.get(PROCESS_COUNT_HEADER_NAME, 0) + 1
133134
)
134-
# Avoid rapid retrying to prevent overloading services, e.g., OpenStack API.
135-
if msg.headers[PROCESS_COUNT_HEADER_NAME] > 1:
136-
sleep(WAIT_TIME_IN_SEC)
135+
msg_process_count = msg.headers[PROCESS_COUNT_HEADER_NAME]
137136

138137
job_details = _parse_job_details(msg)
139138
logger.info("Received reactive job: %s", job_details)
139+
140+
if msg_process_count > RETRY_LIMIT:
141+
logger.warning(
142+
"Retry limit reach for job %s with labels: %s",
143+
job_details.url,
144+
job_details.labels,
145+
)
146+
msg.reject(requeue=False)
147+
continue
148+
149+
if msg_process_count > 1:
150+
logger.info(
151+
"Pause job %s with retry count %s", job_details.url, msg_process_count
152+
)
153+
# Avoid rapid retrying to prevent overloading services, e.g., OpenStack API.
154+
sleep(WAIT_TIME_IN_SEC)
155+
140156
if not _validate_labels(
141157
labels=job_details.labels, supported_labels=supported_labels
142158
):

github-runner-manager/tests/unit/reactive/test_consumer.py

Lines changed: 129 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@
33

44
import secrets
55
from contextlib import closing
6-
from datetime import datetime, timezone
76
from random import randint
7+
from unittest import mock
88
from unittest.mock import ANY, MagicMock
99

1010
import pytest
@@ -16,9 +16,15 @@
1616
from github_runner_manager.platform.github_provider import GitHubRunnerPlatform
1717
from github_runner_manager.platform.platform_provider import PlatformProvider
1818
from github_runner_manager.reactive import consumer
19-
from github_runner_manager.reactive.consumer import JobError, Labels, get_queue_size
19+
from github_runner_manager.reactive.consumer import (
20+
PROCESS_COUNT_HEADER_NAME,
21+
RETRY_LIMIT,
22+
WAIT_TIME_IN_SEC,
23+
JobError,
24+
Labels,
25+
get_queue_size,
26+
)
2027
from github_runner_manager.reactive.types_ import QueueConfig
21-
from github_runner_manager.types_.github import JobConclusion, JobInfo, JobStatus
2228

2329
IN_MEMORY_URI = "memory://"
2430
FAKE_JOB_ID = "8200803099"
@@ -35,9 +41,10 @@ def queue_config_fixture() -> QueueConfig:
3541

3642

3743
@pytest.fixture(name="mock_sleep", autouse=True)
38-
def mock_sleep_fixture(monkeypatch: pytest.MonkeyPatch) -> None:
44+
def mock_sleep_fixture(monkeypatch: pytest.MonkeyPatch) -> MagicMock:
3945
"""Mock the sleep function."""
40-
monkeypatch.setattr(consumer, "sleep", lambda _: None)
46+
monkeypatch.setattr(consumer, "sleep", mock_sleep := MagicMock())
47+
return mock_sleep
4148

4249

4350
@pytest.mark.parametrize(
@@ -49,11 +56,11 @@ def mock_sleep_fixture(monkeypatch: pytest.MonkeyPatch) -> None:
4956
pytest.param({"LaBeL1", "label2"}, {"label1", "laBeL2"}, id="case insensitive labels"),
5057
],
5158
)
52-
def test_consume(labels: Labels, supported_labels: Labels, queue_config: QueueConfig):
59+
def test_consume(labels: Labels, supported_labels: Labels, queue_config: QueueConfig, mock_sleep):
5360
"""
5461
arrange: A job with valid labels placed in the message queue which has not yet been picked up.
5562
act: Call consume.
56-
assert: A runner is created and the message is removed from the queue.
63+
assert: A runner is created, the message is removed from the queue, sleep is called once.
5764
"""
5865
job_details = consumer.JobDetails(
5966
labels=labels,
@@ -76,6 +83,8 @@ def test_consume(labels: Labels, supported_labels: Labels, queue_config: QueueCo
7683

7784
_assert_queue_is_empty(queue_config.queue_name)
7885

86+
mock_sleep.assert_called_once_with(WAIT_TIME_IN_SEC)
87+
7988

8089
def test_consume_job_manager(queue_config: QueueConfig):
8190
"""
@@ -187,7 +196,9 @@ def test_consume_reject_if_job_gets_not_picked_up(queue_config: QueueConfig):
187196
supported_labels=labels,
188197
)
189198

190-
_assert_msg_has_been_requeued(queue_config.queue_name, job_details.json())
199+
_assert_msg_has_been_requeued(
200+
queue_config.queue_name, job_details.json(), headers={PROCESS_COUNT_HEADER_NAME: 1}
201+
)
191202

192203

193204
def test_consume_reject_if_spawning_failed(queue_config: QueueConfig):
@@ -216,7 +227,9 @@ def test_consume_reject_if_spawning_failed(queue_config: QueueConfig):
216227
supported_labels=labels,
217228
)
218229

219-
_assert_msg_has_been_requeued(queue_config.queue_name, job_details.json())
230+
_assert_msg_has_been_requeued(
231+
queue_config.queue_name, job_details.json(), headers={PROCESS_COUNT_HEADER_NAME: 1}
232+
)
220233

221234

222235
def test_consume_raises_queue_error(monkeypatch: pytest.MonkeyPatch, queue_config: QueueConfig):
@@ -351,34 +364,119 @@ def test_consume_reject_if_labels_not_supported(
351364
_assert_queue_is_empty(queue_config.queue_name)
352365

353366

354-
def _create_job_info(status: JobStatus) -> JobInfo:
355-
"""Create a JobInfo object with the given status.
367+
def test_consume_retried_job_success(queue_config: QueueConfig, mock_sleep: MagicMock):
368+
"""
369+
arrange: A job placed in the message queue which is processed before.
370+
act: Call consume.
371+
assert: A runner is spawned, the message is removed from the queue, and sleep is called two
372+
times.
373+
"""
374+
labels = {secrets.token_hex(16), secrets.token_hex(16)}
375+
job_details = consumer.JobDetails(
376+
labels=labels,
377+
url=FAKE_JOB_URL,
378+
)
379+
_put_in_queue(
380+
job_details.json(), queue_config.queue_name, headers={PROCESS_COUNT_HEADER_NAME: 1}
381+
)
356382

357-
Args:
358-
status: The status of the job.
383+
runner_manager_mock = MagicMock(spec=consumer.RunnerManager)
384+
platform_mock = MagicMock(spec=PlatformProvider)
385+
platform_mock.check_job_been_picked_up.side_effect = [False, True]
359386

360-
Returns:
361-
The JobInfo object.
387+
consumer.consume(
388+
queue_config=queue_config,
389+
runner_manager=runner_manager_mock,
390+
platform_provider=platform_mock,
391+
supported_labels=labels,
392+
)
393+
394+
runner_manager_mock.create_runners.assert_called_once_with(1, metadata=ANY, reactive=True)
395+
396+
_assert_queue_is_empty(queue_config.queue_name)
397+
398+
mock_sleep.assert_has_calls([mock.call(WAIT_TIME_IN_SEC), mock.call(WAIT_TIME_IN_SEC)])
399+
400+
401+
def test_consume_retried_job_failure(queue_config: QueueConfig, mock_sleep: MagicMock):
402+
"""
403+
arrange: A job placed in the message queue which is processed before. Mock runner spawn fail.
404+
act: Call consume.
405+
assert: The message requeued. Sleep called once.
406+
"""
407+
labels = {secrets.token_hex(16), secrets.token_hex(16)}
408+
job_details = consumer.JobDetails(
409+
labels=labels,
410+
url=FAKE_JOB_URL,
411+
)
412+
_put_in_queue(
413+
job_details.json(), queue_config.queue_name, headers={PROCESS_COUNT_HEADER_NAME: 1}
414+
)
415+
416+
runner_manager_mock = MagicMock(spec=consumer.RunnerManager)
417+
runner_manager_mock.create_runners.return_value = tuple()
418+
419+
platform_mock = MagicMock(spec=GitHubRunnerPlatform)
420+
platform_mock.check_job_been_picked_up.side_effect = [False]
421+
422+
consumer.consume(
423+
queue_config=queue_config,
424+
runner_manager=runner_manager_mock,
425+
platform_provider=platform_mock,
426+
supported_labels=labels,
427+
)
428+
429+
_assert_msg_has_been_requeued(
430+
queue_config.queue_name, job_details.json(), headers={PROCESS_COUNT_HEADER_NAME: 2}
431+
)
432+
433+
mock_sleep.assert_called_once_with(WAIT_TIME_IN_SEC)
434+
435+
436+
def test_consume_retried_job_failure_past_limit(queue_config: QueueConfig, mock_sleep: MagicMock):
437+
"""
438+
arrange: A job placed in the message queue which is at the retry limit.
439+
act: Call consume.
440+
assert: Message not requeue, and not processed.
362441
"""
363-
return JobInfo(
364-
created_at=datetime(2021, 10, 1, 0, 0, 0, tzinfo=timezone.utc),
365-
started_at=datetime(2021, 10, 1, 1, 0, 0, tzinfo=timezone.utc),
366-
conclusion=JobConclusion.SUCCESS,
367-
status=status,
368-
job_id=randint(1, 1000),
442+
labels = {secrets.token_hex(16), secrets.token_hex(16)}
443+
job_details = consumer.JobDetails(
444+
labels=labels,
445+
url=FAKE_JOB_URL,
369446
)
447+
_put_in_queue(
448+
job_details.json(),
449+
queue_config.queue_name,
450+
headers={PROCESS_COUNT_HEADER_NAME: RETRY_LIMIT},
451+
)
452+
_put_in_queue(consumer.END_PROCESSING_PAYLOAD, queue_config.queue_name)
453+
454+
runner_manager_mock = MagicMock(spec=consumer.RunnerManager)
455+
platform_mock = MagicMock(spec=GitHubRunnerPlatform)
456+
457+
consumer.consume(
458+
queue_config=queue_config,
459+
runner_manager=runner_manager_mock,
460+
platform_provider=platform_mock,
461+
supported_labels=labels,
462+
)
463+
464+
runner_manager_mock.create_runners.assert_not_called()
465+
platform_mock.check_job_been_picked_up.assert_not_called()
466+
_assert_queue_is_empty(queue_config.queue_name)
370467

371468

372-
def _put_in_queue(msg: str, queue_name: str) -> None:
469+
def _put_in_queue(msg: str, queue_name: str, headers: dict[str, str | int] | None = None) -> None:
373470
"""Put a job in the message queue.
374471
375472
Args:
376473
msg: The job details.
377474
queue_name: The name of the queue
475+
headers: The message headers. Not set if None.
378476
"""
379477
with Connection(IN_MEMORY_URI) as conn:
380478
with closing(conn.SimpleQueue(queue_name)) as simple_queue:
381-
simple_queue.put(msg, retry=True)
479+
simple_queue.put(msg, headers=headers, retry=True)
382480

383481

384482
def _consume_from_queue(queue_name: str) -> Message:
@@ -406,15 +504,21 @@ def _assert_queue_is_empty(queue_name: str) -> None:
406504
assert simple_queue.qsize() == 0
407505

408506

409-
def _assert_msg_has_been_requeued(queue_name: str, payload: str) -> None:
507+
def _assert_msg_has_been_requeued(
508+
queue_name: str, payload: str, headers: dict[str, str | int] | None
509+
) -> None:
410510
"""Assert that the message is requeued.
411511
412512
This will consume message from the queue and assert that the payload is the same as the given.
413513
414514
Args:
415515
queue_name: The name of the queue.
416516
payload: The payload of the message to assert.
517+
headers: The headers to assert for if present.
417518
"""
418519
with Connection(IN_MEMORY_URI) as conn:
419520
with closing(conn.SimpleQueue(queue_name)) as simple_queue:
420-
assert simple_queue.get(block=False).payload == payload
521+
msg = simple_queue.get(block=False)
522+
assert msg.payload == payload
523+
if headers is not None:
524+
assert msg.headers == headers

github-runner-manager/tests/unit/test_runner_scaler.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,15 @@ def issue_events_mock_fixture(monkeypatch: pytest.MonkeyPatch):
8787
return issue_events_mock
8888

8989

90+
@pytest.fixture(scope="function", name="mock_process_manager_subprocess_run")
91+
def mock_process_manager_subprocess_run_fixture(monkeypatch: pytest.MonkeyPatch):
92+
mock_subprocess_run = MagicMock()
93+
monkeypatch.setattr(
94+
"github_runner_manager.reactive.process_manager.secure_run_subprocess", mock_subprocess_run
95+
)
96+
return mock_subprocess_run
97+
98+
9099
@pytest.fixture(scope="function", name="runner_manager")
91100
def runner_manager_fixture(
92101
monkeypatch, mock_runner_managers, github_path: GitHubPath, issue_events_mock

0 commit comments

Comments
 (0)