Skip to content

Commit 88108c4

Browse files
committed
Signal job failures when recovering dead jobs
The `enqueue_jobs_from_dead_broker.lua` script doesn't re-enqueue jobs if `max_retries` was exceeded. This is very surprising behaviour for most people, and because the failure handler is not called, jobs can be left in an inconsistent state. This change makes sure that the failure handler is called and the job is moved to the FAILED state. Drive-by: add functional tests for concurrency that were previously forgotten in `git add`. Drive-by: add a `tags` entry to .gitignore. Drive-by: add flake8 to the tox venv dependencies so that vscode works better in that venv. Fixes: #14
1 parent bdbe80d commit 88108c4

File tree

10 files changed

+123
-16
lines changed

10 files changed

+123
-16
lines changed

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,3 +9,5 @@ doc/_build
99

1010
.coverage
1111
.pytest_cache
12+
13+
tags

spinach/brokers/base.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,9 @@ def get_all_brokers(self) -> List[Dict[str, Union[None, str, int]]]:
158158
"""Return all registered brokers."""
159159

160160
@abstractmethod
161-
def enqueue_jobs_from_dead_broker(self, dead_broker_id: uuid.UUID) -> int:
161+
def enqueue_jobs_from_dead_broker(
162+
self, dead_broker_id: uuid.UUID
163+
) -> Tuple[int, list]:
162164
"""Re-enqueue the jobs that were running on a broker.
163165
164166
Only jobs that can be retried are moved back to the queue, the others

spinach/brokers/memory.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -187,9 +187,11 @@ def get_all_brokers(self) -> List[Dict[str, Union[None, str, int]]]:
187187
# A memory broker is not connected to any other broker
188188
return [self._get_broker_info()]
189189

190-
def enqueue_jobs_from_dead_broker(self, dead_broker_id: uuid.UUID) -> int:
190+
def enqueue_jobs_from_dead_broker(
191+
self, dead_broker_id: uuid.UUID
192+
) -> Tuple[int, list]:
191193
# A memory broker cannot be dead
192-
return 0
194+
return 0, []
193195

194196
def remove_job_from_running(self, job: Job):
195197
"""Remove a job from the list of running ones.

spinach/brokers/redis.py

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
from redis.commands.core import Script
1616

1717
from ..brokers.base import Broker
18-
from ..job import Job, JobStatus
18+
from ..job import Job, JobStatus, advance_job_status
1919
from ..task import Task
2020
from ..const import (
2121
FUTURE_JOBS_KEY, NOTIFICATIONS_KEY, RUNNING_JOBS_KEY,
@@ -177,14 +177,24 @@ def move_future_jobs(self) -> int:
177177
'Worker %s on %s detected dead, re-enqueuing its jobs',
178178
dead_broker['id'], dead_broker['name']
179179
)
180-
num_jobs_re_enqueued = self.enqueue_jobs_from_dead_broker(
181-
uuid.UUID(dead_broker['id'])
180+
num_jobs_re_enqueued, failed_jobs = (
181+
self.enqueue_jobs_from_dead_broker(
182+
uuid.UUID(dead_broker['id'])
183+
)
182184
)
183185
logger.warning(
184186
'Worker %s on %s marked as dead, %d jobs were re-enqueued',
185187
dead_broker['id'], dead_broker['name'], num_jobs_re_enqueued
186188
)
187189

190+
# Mark failed jobs appropriately.
191+
jobs = [Job.deserialize(job) for job in failed_jobs]
192+
err = Exception(
193+
"Worker %s died and max_retries exceeded" % dead_broker['name']
194+
)
195+
for job in jobs:
196+
advance_job_status(self.namespace, job, duration=0.0, err=err)
197+
188198
logger.debug("Redis moved %s job(s) from future to current queues",
189199
num_jobs_moved)
190200
return num_jobs_moved
@@ -287,8 +297,10 @@ def get_all_brokers(self) -> List[Dict[str, Union[None, str, int]]]:
287297
)
288298
return [json.loads(r.decode()) for r in rv]
289299

290-
def enqueue_jobs_from_dead_broker(self, dead_broker_id: uuid.UUID) -> int:
291-
return self._run_script(
300+
def enqueue_jobs_from_dead_broker(
301+
self, dead_broker_id: uuid.UUID
302+
) -> Tuple[int, list]:
303+
return tuple(self._run_script(
292304
self._enqueue_jobs_from_dead_broker,
293305
str(dead_broker_id),
294306
self._to_namespaced(RUNNING_JOBS_KEY.format(dead_broker_id)),
@@ -298,7 +310,7 @@ def enqueue_jobs_from_dead_broker(self, dead_broker_id: uuid.UUID) -> int:
298310
self._to_namespaced(NOTIFICATIONS_KEY),
299311
self._to_namespaced(MAX_CONCURRENCY_KEY),
300312
self._to_namespaced(CURRENT_CONCURRENCY_KEY),
301-
)
313+
))
302314

303315
def register_periodic_tasks(self, tasks: Iterable[Task]):
304316
"""Register tasks that need to be scheduled periodically."""

spinach/brokers/redis_scripts/enqueue_jobs_from_dead_broker.lua

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ local max_concurrency_key = ARGV[7]
88
local current_concurrency_key = ARGV[8]
99

1010
local num_enqueued_jobs = 0
11+
local i = 1
12+
local failed_jobs = {}
1113

1214
-- Get all jobs that were running on the broker before it died
1315
local jobs_json = redis.call('hvals', running_jobs_key)
@@ -16,7 +18,6 @@ for _, job_json in ipairs(jobs_json) do
1618
local job = cjson.decode(job_json)
1719
if job["retries"] < job["max_retries"] then
1820
job["retries"] = job["retries"] + 1
19-
2021
-- Set job status to queued:
2122
-- A major difference between retrying a job failing in a worker and
2223
-- a failing from a dead broker is that the dead broker one is
@@ -39,6 +40,10 @@ for _, job_json in ipairs(jobs_json) do
3940
local queue = string.format("%s/%s", namespace, job["queue"])
4041
redis.call('rpush', queue, job_json)
4142
num_enqueued_jobs = num_enqueued_jobs + 1
43+
else
44+
-- Keep track of jobs that exceeded the max_retries
45+
failed_jobs[i] = job_json
46+
i = i + 1
4247
end
4348

4449
redis.call('hdel', running_jobs_key, job["id"])
@@ -52,4 +57,4 @@ if num_enqueued_jobs > 0 then
5257
redis.call('publish', notifications, '')
5358
end
5459

55-
return num_enqueued_jobs
60+
return {num_enqueued_jobs, failed_jobs}

tests/functional/__init__.py

Whitespace-only changes.
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
import pytest
2+
import time
3+
4+
from spinach.brokers.memory import MemoryBroker
5+
from spinach.brokers.redis import RedisBroker
6+
from spinach.engine import Engine
7+
8+
9+
@pytest.fixture(params=[MemoryBroker, RedisBroker])
10+
def spin(request):
11+
broker = request.param
12+
spin = Engine(broker(), namespace='tests')
13+
yield spin
14+
15+
16+
def test_concurrency_limit(spin):
17+
count = 0
18+
19+
@spin.task(name='do_something', max_retries=10, max_concurrency=1)
20+
def do_something(index):
21+
nonlocal count
22+
assert index == count
23+
count += 1
24+
25+
for i in range(0, 5):
26+
spin.schedule(do_something, i)
27+
28+
# Start two workers; test that only one job runs at once as per the
29+
# Task definition.
30+
spin.start_workers(number=2, block=True, stop_when_queue_empty=True)
31+
assert count == 5

tests/test_brokers.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ def test_flush(broker):
9696
def test_enqueue_jobs_from_dead_broker(broker):
9797
# Marking a broker that doesn't exist as dead
9898
broker_id = uuid.UUID('62664577-cf89-4f6a-ab16-4e20ec8fe4c2')
99-
assert broker.enqueue_jobs_from_dead_broker(broker_id) == 0
99+
assert broker.enqueue_jobs_from_dead_broker(broker_id) == (0, [])
100100

101101

102102
def test_get_broker_info(broker):

tests/test_redis_brokers.py

Lines changed: 56 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
)
1717
from spinach.job import Job, JobStatus
1818
from spinach.task import Task
19+
from spinach import signals
1920

2021

2122
CONCURRENT_TASK_NAME = 'i_am_concurrent'
@@ -281,7 +282,7 @@ def test_enqueue_jobs_from_dead_broker(broker, broker_2):
281282
assert current == b'1'
282283

283284
# Mark broker as dead, should re-enqueue only the idempotent jobs.
284-
assert broker_2.enqueue_jobs_from_dead_broker(broker._id) == 2
285+
assert broker_2.enqueue_jobs_from_dead_broker(broker._id) == (2, [])
285286

286287
# Check that the current_concurrency was decremented for job_3.
287288
current = broker._r.hget(
@@ -301,11 +302,36 @@ def test_enqueue_jobs_from_dead_broker(broker, broker_2):
301302

302303
# Check that a broker can be marked as dead multiple times
303304
# without duplicating jobs
304-
assert broker_2.enqueue_jobs_from_dead_broker(broker._id) == 0
305+
assert broker_2.enqueue_jobs_from_dead_broker(broker._id) == (0, [])
306+
307+
308+
def test_enqueue_fails_jobs_from_dead_broker_if_max_retries_exceeded(
309+
broker, broker_2
310+
):
311+
job_1 = Job('foo_task', 'foo_queue', datetime.now(timezone.utc), 1)
312+
job_1.retries = 1
313+
job_2 = Job('foo_task', 'foo_queue', datetime.now(timezone.utc), 10)
314+
broker.enqueue_jobs([job_1, job_2])
315+
316+
# Start the job.
317+
broker.get_jobs_from_queue('foo_queue', 100)
318+
319+
# Simulate a dead broker.
320+
num_requeued, failed_jobs = broker_2.enqueue_jobs_from_dead_broker(
321+
broker._id
322+
)
323+
324+
# Check that one was requeued and the one marked failed is job_1.
325+
assert num_requeued == 1
326+
jobs = [Job.deserialize(job.decode()) for job in failed_jobs]
327+
job_1.status = JobStatus.RUNNING
328+
assert [job_1] == jobs
305329

306330

307331
def test_detect_dead_broker(broker, broker_2):
308-
broker_2.enqueue_jobs_from_dead_broker = Mock(return_value=10)
332+
broker_2.enqueue_jobs_from_dead_broker = Mock(
333+
return_value=(10, [])
334+
)
309335

310336
# Register the first broker
311337
broker.move_future_jobs()
@@ -321,8 +347,34 @@ def test_detect_dead_broker(broker, broker_2):
321347
)
322348

323349

350+
def test_dead_jobs_exceeding_max_retries_are_marked_failed(broker, broker_2):
351+
job_1 = Job('foo_task', 'foo_queue', datetime.now(timezone.utc), 1)
352+
job_1.retries = 1
353+
# Register the first broker
354+
broker.move_future_jobs()
355+
broker_2.enqueue_jobs_from_dead_broker = Mock(
356+
return_value=(0, [job_1.serialize()])
357+
)
358+
# Set the 2nd broker to detect dead brokers after 2 seconds of inactivity
359+
broker_2.broker_dead_threshold_seconds = 2
360+
time.sleep(2.1)
361+
362+
signal_called = False
363+
364+
@signals.job_failed.connect
365+
def check_job(namespace, job, err, **kwargs):
366+
nonlocal signal_called
367+
signal_called = True
368+
assert job.status == JobStatus.FAILED
369+
370+
assert 0 == broker_2.move_future_jobs()
371+
assert True is signal_called
372+
373+
324374
def test_not_detect_deregistered_broker_as_dead(broker, broker_2):
325-
broker_2.enqueue_jobs_from_dead_broker = Mock(return_value=10)
375+
broker_2.enqueue_jobs_from_dead_broker = Mock(
376+
return_value=(10, [])
377+
)
326378

327379
# Register and de-register the first broker
328380
broker.move_future_jobs()

tox.ini

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ deps =
1414
pytest-cov
1515
pytest-threadleak
1616
pycodestyle
17+
flake8
1718
flask
1819
django
1920

0 commit comments

Comments (0)