Skip to content

Commit 96b6c7f

Browse files
authored
Merge pull request #16 from bigjools/issue/14
Signal job failures when recovering dead jobs
2 parents bdbe80d + 88108c4 commit 96b6c7f

File tree

10 files changed

+123
-16
lines changed

10 files changed

+123
-16
lines changed

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,3 +9,5 @@ doc/_build
99

1010
.coverage
1111
.pytest_cache
12+
13+
tags

spinach/brokers/base.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,9 @@ def get_all_brokers(self) -> List[Dict[str, Union[None, str, int]]]:
158158
"""Return all registered brokers."""
159159

160160
@abstractmethod
161-
def enqueue_jobs_from_dead_broker(self, dead_broker_id: uuid.UUID) -> int:
161+
def enqueue_jobs_from_dead_broker(
162+
self, dead_broker_id: uuid.UUID
163+
) -> Tuple[int, list]:
162164
"""Re-enqueue the jobs that were running on a broker.
163165
164166
Only jobs that can be retried are moved back to the queue, the others

spinach/brokers/memory.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -187,9 +187,11 @@ def get_all_brokers(self) -> List[Dict[str, Union[None, str, int]]]:
187187
# A memory broker is not connected to any other broker
188188
return [self._get_broker_info()]
189189

190-
def enqueue_jobs_from_dead_broker(self, dead_broker_id: uuid.UUID) -> int:
190+
def enqueue_jobs_from_dead_broker(
191+
self, dead_broker_id: uuid.UUID
192+
) -> Tuple[int, list]:
191193
# A memory broker cannot be dead
192-
return 0
194+
return 0, []
193195

194196
def remove_job_from_running(self, job: Job):
195197
"""Remove a job from the list of running ones.

spinach/brokers/redis.py

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
from redis.commands.core import Script
1616

1717
from ..brokers.base import Broker
18-
from ..job import Job, JobStatus
18+
from ..job import Job, JobStatus, advance_job_status
1919
from ..task import Task
2020
from ..const import (
2121
FUTURE_JOBS_KEY, NOTIFICATIONS_KEY, RUNNING_JOBS_KEY,
@@ -177,14 +177,24 @@ def move_future_jobs(self) -> int:
177177
'Worker %s on %s detected dead, re-enqueuing its jobs',
178178
dead_broker['id'], dead_broker['name']
179179
)
180-
num_jobs_re_enqueued = self.enqueue_jobs_from_dead_broker(
181-
uuid.UUID(dead_broker['id'])
180+
num_jobs_re_enqueued, failed_jobs = (
181+
self.enqueue_jobs_from_dead_broker(
182+
uuid.UUID(dead_broker['id'])
183+
)
182184
)
183185
logger.warning(
184186
'Worker %s on %s marked as dead, %d jobs were re-enqueued',
185187
dead_broker['id'], dead_broker['name'], num_jobs_re_enqueued
186188
)
187189

190+
# Mark failed jobs appropriately.
191+
jobs = [Job.deserialize(job) for job in failed_jobs]
192+
err = Exception(
193+
"Worker %s died and max_retries exceeded" % dead_broker['name']
194+
)
195+
for job in jobs:
196+
advance_job_status(self.namespace, job, duration=0.0, err=err)
197+
188198
logger.debug("Redis moved %s job(s) from future to current queues",
189199
num_jobs_moved)
190200
return num_jobs_moved
@@ -287,8 +297,10 @@ def get_all_brokers(self) -> List[Dict[str, Union[None, str, int]]]:
287297
)
288298
return [json.loads(r.decode()) for r in rv]
289299

290-
def enqueue_jobs_from_dead_broker(self, dead_broker_id: uuid.UUID) -> int:
291-
return self._run_script(
300+
def enqueue_jobs_from_dead_broker(
301+
self, dead_broker_id: uuid.UUID
302+
) -> Tuple[int, list]:
303+
return tuple(self._run_script(
292304
self._enqueue_jobs_from_dead_broker,
293305
str(dead_broker_id),
294306
self._to_namespaced(RUNNING_JOBS_KEY.format(dead_broker_id)),
@@ -298,7 +310,7 @@ def enqueue_jobs_from_dead_broker(self, dead_broker_id: uuid.UUID) -> int:
298310
self._to_namespaced(NOTIFICATIONS_KEY),
299311
self._to_namespaced(MAX_CONCURRENCY_KEY),
300312
self._to_namespaced(CURRENT_CONCURRENCY_KEY),
301-
)
313+
))
302314

303315
def register_periodic_tasks(self, tasks: Iterable[Task]):
304316
"""Register tasks that need to be scheduled periodically."""

spinach/brokers/redis_scripts/enqueue_jobs_from_dead_broker.lua

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ local max_concurrency_key = ARGV[7]
88
local current_concurrency_key = ARGV[8]
99

1010
local num_enqueued_jobs = 0
11+
local i = 1
12+
local failed_jobs = {}
1113

1214
-- Get all jobs that were running on the broker before it died
1315
local jobs_json = redis.call('hvals', running_jobs_key)
@@ -16,7 +18,6 @@ for _, job_json in ipairs(jobs_json) do
1618
local job = cjson.decode(job_json)
1719
if job["retries"] < job["max_retries"] then
1820
job["retries"] = job["retries"] + 1
19-
2021
-- Set job status to queued:
2122
-- A major difference between retrying a job failing in a worker and
2223
-- one failing from a dead broker is that the dead broker one is
@@ -39,6 +40,10 @@ for _, job_json in ipairs(jobs_json) do
3940
local queue = string.format("%s/%s", namespace, job["queue"])
4041
redis.call('rpush', queue, job_json)
4142
num_enqueued_jobs = num_enqueued_jobs + 1
43+
else
44+
-- Keep track of jobs that exceeded the max_retries
45+
failed_jobs[i] = job_json
46+
i = i + 1
4247
end
4348

4449
redis.call('hdel', running_jobs_key, job["id"])
@@ -52,4 +57,4 @@ if num_enqueued_jobs > 0 then
5257
redis.call('publish', notifications, '')
5358
end
5459

55-
return num_enqueued_jobs
60+
return {num_enqueued_jobs, failed_jobs}

tests/functional/__init__.py

Whitespace-only changes.
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
import pytest
2+
import time
3+
4+
from spinach.brokers.memory import MemoryBroker
5+
from spinach.brokers.redis import RedisBroker
6+
from spinach.engine import Engine
7+
8+
9+
@pytest.fixture(params=[MemoryBroker, RedisBroker])
10+
def spin(request):
11+
broker = request.param
12+
spin = Engine(broker(), namespace='tests')
13+
yield spin
14+
15+
16+
def test_concurrency_limit(spin):
17+
count = 0
18+
19+
@spin.task(name='do_something', max_retries=10, max_concurrency=1)
20+
def do_something(index):
21+
nonlocal count
22+
assert index == count
23+
count += 1
24+
25+
for i in range(0, 5):
26+
spin.schedule(do_something, i)
27+
28+
# Start two workers; test that only one job runs at once as per the
29+
# Task definition.
30+
spin.start_workers(number=2, block=True, stop_when_queue_empty=True)
31+
assert count == 5

tests/test_brokers.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ def test_flush(broker):
9696
def test_enqueue_jobs_from_dead_broker(broker):
9797
# Marking a broker that doesn't exist as dead
9898
broker_id = uuid.UUID('62664577-cf89-4f6a-ab16-4e20ec8fe4c2')
99-
assert broker.enqueue_jobs_from_dead_broker(broker_id) == 0
99+
assert broker.enqueue_jobs_from_dead_broker(broker_id) == (0, [])
100100

101101

102102
def test_get_broker_info(broker):

tests/test_redis_brokers.py

Lines changed: 56 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
)
1717
from spinach.job import Job, JobStatus
1818
from spinach.task import Task
19+
from spinach import signals
1920

2021

2122
CONCURRENT_TASK_NAME = 'i_am_concurrent'
@@ -281,7 +282,7 @@ def test_enqueue_jobs_from_dead_broker(broker, broker_2):
281282
assert current == b'1'
282283

283284
# Mark broker as dead, should re-enqueue only the idempotent jobs.
284-
assert broker_2.enqueue_jobs_from_dead_broker(broker._id) == 2
285+
assert broker_2.enqueue_jobs_from_dead_broker(broker._id) == (2, [])
285286

286287
# Check that the current_concurrency was decremented for job_3.
287288
current = broker._r.hget(
@@ -301,11 +302,36 @@ def test_enqueue_jobs_from_dead_broker(broker, broker_2):
301302

302303
# Check that a broker can be marked as dead multiple times
303304
# without duplicating jobs
304-
assert broker_2.enqueue_jobs_from_dead_broker(broker._id) == 0
305+
assert broker_2.enqueue_jobs_from_dead_broker(broker._id) == (0, [])
306+
307+
308+
def test_enqueue_fails_jobs_from_dead_broker_if_max_retries_exceeded(
309+
broker, broker_2
310+
):
311+
job_1 = Job('foo_task', 'foo_queue', datetime.now(timezone.utc), 1)
312+
job_1.retries = 1
313+
job_2 = Job('foo_task', 'foo_queue', datetime.now(timezone.utc), 10)
314+
broker.enqueue_jobs([job_1, job_2])
315+
316+
# Start the job.
317+
broker.get_jobs_from_queue('foo_queue', 100)
318+
319+
# Simulate a dead broker.
320+
num_requeued, failed_jobs = broker_2.enqueue_jobs_from_dead_broker(
321+
broker._id
322+
)
323+
324+
# Check that one was requeued and the one marked failed is job_1.
325+
assert num_requeued == 1
326+
jobs = [Job.deserialize(job.decode()) for job in failed_jobs]
327+
job_1.status = JobStatus.RUNNING
328+
assert [job_1] == jobs
305329

306330

307331
def test_detect_dead_broker(broker, broker_2):
308-
broker_2.enqueue_jobs_from_dead_broker = Mock(return_value=10)
332+
broker_2.enqueue_jobs_from_dead_broker = Mock(
333+
return_value=(10, [])
334+
)
309335

310336
# Register the first broker
311337
broker.move_future_jobs()
@@ -321,8 +347,34 @@ def test_detect_dead_broker(broker, broker_2):
321347
)
322348

323349

350+
def test_dead_jobs_exceeding_max_retries_are_marked_failed(broker, broker_2):
351+
job_1 = Job('foo_task', 'foo_queue', datetime.now(timezone.utc), 1)
352+
job_1.retries = 1
353+
# Register the first broker
354+
broker.move_future_jobs()
355+
broker_2.enqueue_jobs_from_dead_broker = Mock(
356+
return_value=(0, [job_1.serialize()])
357+
)
358+
# Set the 2nd broker to detect dead brokers after 2 seconds of inactivity
359+
broker_2.broker_dead_threshold_seconds = 2
360+
time.sleep(2.1)
361+
362+
signal_called = False
363+
364+
@signals.job_failed.connect
365+
def check_job(namespace, job, err, **kwargs):
366+
nonlocal signal_called
367+
signal_called = True
368+
assert job.status == JobStatus.FAILED
369+
370+
assert 0 == broker_2.move_future_jobs()
371+
assert True is signal_called
372+
373+
324374
def test_not_detect_deregistered_broker_as_dead(broker, broker_2):
325-
broker_2.enqueue_jobs_from_dead_broker = Mock(return_value=10)
375+
broker_2.enqueue_jobs_from_dead_broker = Mock(
376+
return_value=(10, [])
377+
)
326378

327379
# Register and de-register the first broker
328380
broker.move_future_jobs()

tox.ini

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ deps =
1414
pytest-cov
1515
pytest-threadleak
1616
pycodestyle
17+
flake8
1718
flask
1819
django
1920

0 commit comments

Comments
 (0)