Skip to content

Commit 6a0c872

Browse files
committed
Fix tests
1 parent 5ac41d0 commit 6a0c872

File tree

10 files changed

+181
-95
lines changed

10 files changed

+181
-95
lines changed

kolibri/core/tasks/job.py

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
import traceback
55
import uuid
66
from collections import namedtuple
7-
from datetime import timedelta
87

98
from kolibri.core.tasks.constants import ( # noqa F401 - imported for backwards compatibility
109
Priority,
@@ -102,10 +101,6 @@ def default_status_text(job):
102101

103102
ALLOWED_RETRY_IN_KWARGS = {"priority", "repeat", "interval", "retry_interval"}
104103

105-
MAX_RETRIES = (
106-
3 # Maximum number of retries for a job that failed due to a retryable exception
107-
)
108-
109104

110105
class Job(object):
111106
"""

kolibri/core/tasks/storage.py

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -657,11 +657,9 @@ def reschedule_finished_job_if_needed( # noqa: C901
657657
if retry_interval is not NO_VALUE
658658
else orm_job.retry_interval,
659659
max_retries=orm_job.max_retries,
660+
retries=orm_job.retries,
660661
)
661662

662-
if exception is not None:
663-
# TODO: Implement retry on exception logic.
664-
pass
665663
# Set a null new_scheduled_time so that we finish processing if none of the cases below pertain.
666664
new_scheduled_time = None
667665
if delay is not None:
@@ -671,11 +669,9 @@ def reschedule_finished_job_if_needed( # noqa: C901
671669
# enqueuing changes - so if it is still set to repeat, it will repeat again after the
672670
# delayed rerun.
673671
new_scheduled_time = self._now() + delay
674-
elif self._should_retry_on_task_failed(
672+
elif self._should_retry_on_failed_task(
675673
orm_job, exception, kwargs["retry_interval"]
676674
):
677-
# If the task has failed, and a retry interval has been specified (either in the original enqueue,
678-
# or from the passed in kwargs) then requeue as a retry.
679675
new_scheduled_time = self._now() + timedelta(
680676
seconds=kwargs["retry_interval"]
681677
if kwargs["retry_interval"] is not None
@@ -702,7 +698,7 @@ def reschedule_finished_job_if_needed( # noqa: C901
702698
# Use the schedule method so that any scheduling hooks are run for this next run of the job.
703699
self.schedule(new_scheduled_time, job, **kwargs)
704700

705-
def _should_retry_on_task_failed(self, orm_job, exception, retry_interval):
701+
def _should_retry_on_failed_task(self, orm_job, exception, retry_interval):
706702
"""
707703
Determine if a job should be retried based on its retry settings and the exception raised.
708704
"""
@@ -719,8 +715,8 @@ def _should_retry_on_task_failed(self, orm_job, exception, retry_interval):
719715

720716
job = self._orm_to_job(orm_job)
721717
retry_on = job.task.retry_on
722-
if retry_on:
723-
return any(issubclass(exception, exc) for exc in retry_on)
718+
if retry_on and exception:
719+
return any(isinstance(exception, exc) for exc in retry_on)
724720

725721
return True
726722

@@ -831,6 +827,7 @@ def schedule(
831827
interval=0,
832828
repeat=0,
833829
retry_interval=None,
830+
retries=None,
834831
max_retries=None,
835832
):
836833
"""
@@ -867,6 +864,7 @@ def schedule(
867864
interval=interval,
868865
repeat=repeat,
869866
retry_interval=retry_interval,
867+
retries=retries,
870868
max_retries=max_retries,
871869
scheduled_time=naive_utc_datetime(dt),
872870
saved_job=job.to_json(),

kolibri/core/tasks/test/taskrunner/test_job_running.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,9 @@
1818
@pytest.fixture
1919
def storage_fixture():
2020
with connection() as conn:
21-
e = Worker(connection=conn)
2221
b = Storage(conn)
22+
b.Base.metadata.create_all(conn)
23+
e = Worker(connection=conn)
2324
b.clear(force=True)
2425
yield b
2526
e.shutdown()

kolibri/core/tasks/test/taskrunner/test_scheduler.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import pytest
44

5+
from kolibri.core.tasks.decorators import register_task
56
from kolibri.core.tasks.exceptions import JobRunning
67
from kolibri.core.tasks.job import Job
78
from kolibri.core.tasks.storage import Storage
@@ -14,15 +15,21 @@
1415
def job_storage():
1516
with connection() as c:
1617
s = Storage(connection=c)
18+
s.Base.metadata.create_all(c)
1719
s.clear(force=True)
1820
yield s
1921
s.clear(force=True)
2022

2123

24+
@register_task
25+
def add(x, y):
26+
return x + y
27+
28+
2229
class TestScheduler(object):
2330
@pytest.fixture
2431
def job(self):
25-
return Job(id)
32+
return Job(add)
2633

2734
def test_enqueue_at_a_function(self, job_storage, job):
2835
job_id = job_storage.enqueue_at(local_now(), job)
@@ -171,7 +178,7 @@ class TestReschedule(TestScheduler):
171178
@pytest.fixture
172179
def job(self, job_storage):
173180
now = local_now()
174-
job_id = job_storage.schedule(now, Job(id), interval=1, repeat=123)
181+
job_id = job_storage.schedule(now, Job(add), interval=1, repeat=123)
175182
return job_storage.get_job(job_id)
176183

177184
def test_reschedule_a_function_gives_job_running_error(self, job_storage, job):

kolibri/core/tasks/test/taskrunner/test_storage.py

Lines changed: 148 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import pytest
66
import pytz
77
from mock import patch
8+
from requests.exceptions import HTTPError
89

910
from kolibri.core.tasks.constants import DEFAULT_QUEUE
1011
from kolibri.core.tasks.constants import Priority
@@ -26,36 +27,43 @@
2627
def defaultbackend():
2728
with connection() as c:
2829
b = Storage(c)
30+
b.Base.metadata.create_all(c)
2931
b.clear(force=True)
3032
yield b
3133
b.clear(force=True)
3234

3335

34-
@pytest.fixture
35-
def func():
36-
@register_task
37-
def add(x, y):
38-
return x + y
36+
@register_task(
37+
retry_on=[ValueError, TypeError],
38+
)
39+
def add(x, y):
40+
return x + y
3941

40-
TaskRegistry["kolibri.core.tasks.test.taskrunner.test_storage.add"] = add
4142

42-
yield add
43-
TaskRegistry.clear()
43+
@pytest.fixture(autouse=True)
44+
def register_add_task():
45+
# register before tests
46+
TaskRegistry[callable_to_import_path(add)] = add
47+
try:
48+
yield
49+
finally:
50+
# clear after tests
51+
TaskRegistry.clear()
4452

4553

4654
@pytest.fixture
47-
def simplejob(func):
48-
return Job(func)
55+
def simplejob():
56+
return Job(add)
4957

5058

5159
class TestBackend:
52-
def test_can_enqueue_single_job(self, defaultbackend, simplejob, func):
60+
def test_can_enqueue_single_job(self, defaultbackend, simplejob):
5361
job_id = defaultbackend.enqueue_job(simplejob, QUEUE)
5462

5563
new_job = defaultbackend.get_job(job_id)
5664

5765
# Does the returned job record the function we set to run?
58-
assert str(new_job.func) == callable_to_import_path(func)
66+
assert str(new_job.func) == callable_to_import_path(add)
5967

6068
# Does the job have the right state (QUEUED)?
6169
assert new_job.state == State.QUEUED
@@ -310,6 +318,134 @@ def test_can_reschedule_finished_job(self, defaultbackend, simplejob):
310318
assert requeued_job.state == State.QUEUED
311319
assert requeued_orm_job.scheduled_time > previous_scheduled_time
312320

321+
def test_job_retry_on_matching_exception(self, defaultbackend, simplejob):
322+
exception = ValueError("Error")
323+
job_id = defaultbackend.enqueue_job(
324+
simplejob, QUEUE, retry_interval=5, max_retries=3
325+
)
326+
defaultbackend.mark_job_as_failed(job_id, exception, "Traceback")
327+
328+
orm_job = defaultbackend.get_orm_job(job_id)
329+
previous_scheduled_time = orm_job.scheduled_time
330+
331+
defaultbackend.reschedule_finished_job_if_needed(
332+
simplejob.job_id, exception=exception
333+
)
334+
requeued_orm_job = defaultbackend.get_orm_job(job_id)
335+
requeued_job = defaultbackend.get_job(job_id)
336+
337+
assert requeued_job.state == State.QUEUED
338+
assert requeued_orm_job.scheduled_time > previous_scheduled_time
339+
assert requeued_orm_job.retries == 1
340+
341+
def test_job_retry_on_matching_exception__no_max_retries(
342+
self, defaultbackend, simplejob
343+
):
344+
exception = ValueError("Error")
345+
job_id = defaultbackend.enqueue_job(simplejob, QUEUE, retry_interval=5)
346+
defaultbackend.mark_job_as_failed(job_id, exception, "Traceback")
347+
348+
orm_job = defaultbackend.get_orm_job(job_id)
349+
previous_scheduled_time = orm_job.scheduled_time
350+
351+
defaultbackend.reschedule_finished_job_if_needed(
352+
simplejob.job_id, exception=exception
353+
)
354+
requeued_orm_job = defaultbackend.get_orm_job(job_id)
355+
requeued_job = defaultbackend.get_job(job_id)
356+
357+
assert requeued_job.state == State.QUEUED
358+
assert requeued_orm_job.scheduled_time > previous_scheduled_time
359+
assert requeued_orm_job.retries == 1
360+
361+
def test_job_retry_on_matching_exception__no_retry_interval(
362+
self, defaultbackend, simplejob
363+
):
364+
exception = TypeError("Error")
365+
job_id = defaultbackend.enqueue_job(simplejob, QUEUE, max_retries=3)
366+
defaultbackend.mark_job_as_failed(job_id, exception, "Traceback")
367+
368+
orm_job = defaultbackend.get_orm_job(job_id)
369+
previous_scheduled_time = orm_job.scheduled_time
370+
371+
defaultbackend.reschedule_finished_job_if_needed(
372+
simplejob.job_id, exception=exception
373+
)
374+
requeued_orm_job = defaultbackend.get_orm_job(job_id)
375+
requeued_job = defaultbackend.get_job(job_id)
376+
377+
assert requeued_job.state == State.QUEUED
378+
assert requeued_orm_job.scheduled_time > previous_scheduled_time
379+
assert requeued_orm_job.retries == 1
380+
381+
def test_job_not_retry_on_matching_exception__no_retry_params(
382+
self, defaultbackend, simplejob
383+
):
384+
# If job has no retry params, it should not retry even if exception matches
385+
exception = ValueError("Error")
386+
job_id = defaultbackend.enqueue_job(simplejob, QUEUE)
387+
defaultbackend.mark_job_as_failed(job_id, exception, "Traceback")
388+
389+
orm_job = defaultbackend.get_orm_job(job_id)
390+
previous_scheduled_time = orm_job.scheduled_time
391+
392+
defaultbackend.reschedule_finished_job_if_needed(
393+
simplejob.job_id, exception=exception
394+
)
395+
requeued_orm_job = defaultbackend.get_orm_job(job_id)
396+
requeued_job = defaultbackend.get_job(job_id)
397+
398+
assert requeued_job.state == State.FAILED
399+
assert requeued_orm_job.scheduled_time == previous_scheduled_time
400+
assert requeued_orm_job.retries is None
401+
402+
def test_job_not_retry_on_non_matching_exception(self, defaultbackend, simplejob):
403+
exception = HTTPError("Error")
404+
job_id = defaultbackend.enqueue_job(
405+
simplejob, QUEUE, retry_interval=5, max_retries=3
406+
)
407+
defaultbackend.mark_job_as_failed(job_id, exception, "Traceback")
408+
409+
orm_job = defaultbackend.get_orm_job(job_id)
410+
previous_scheduled_time = orm_job.scheduled_time
411+
412+
defaultbackend.reschedule_finished_job_if_needed(
413+
simplejob.job_id, exception=exception
414+
)
415+
requeued_orm_job = defaultbackend.get_orm_job(job_id)
416+
requeued_job = defaultbackend.get_job(job_id)
417+
418+
assert requeued_job.state == State.FAILED
419+
assert requeued_orm_job.scheduled_time == previous_scheduled_time
420+
assert requeued_orm_job.retries is None
421+
422+
def test_job_not_retry_on_limit_max_retries(self, defaultbackend, simplejob):
423+
exception = ValueError("Error")
424+
job_id = defaultbackend.enqueue_job(simplejob, QUEUE, max_retries=1)
425+
defaultbackend.mark_job_as_failed(job_id, exception, "Traceback")
426+
427+
# Retry first time
428+
defaultbackend.reschedule_finished_job_if_needed(
429+
simplejob.job_id, exception=exception
430+
)
431+
defaultbackend.mark_job_as_failed(job_id, exception, "Traceback")
432+
433+
orm_job = defaultbackend.get_orm_job(job_id)
434+
previous_scheduled_time = orm_job.scheduled_time
435+
436+
# When trying to retry second time, it should not retry as max_retries is reached
437+
defaultbackend.reschedule_finished_job_if_needed(
438+
simplejob.job_id, exception=exception
439+
)
440+
441+
requeued_orm_job = defaultbackend.get_orm_job(job_id)
442+
requeued_job = defaultbackend.get_job(job_id)
443+
444+
retries = requeued_orm_job.retries
445+
assert requeued_job.state == State.FAILED
446+
assert requeued_orm_job.scheduled_time == previous_scheduled_time
447+
assert retries == 1
448+
313449
def test_reschedule_finished_job_canceled(self, defaultbackend, simplejob):
314450
# Test case where the job is canceled.
315451
job_id = defaultbackend.enqueue_job(simplejob, QUEUE)

kolibri/core/tasks/test/taskrunner/test_worker.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from kolibri.core.tasks.constants import Priority
77
from kolibri.core.tasks.job import Job
88
from kolibri.core.tasks.job import State
9+
from kolibri.core.tasks.storage import Storage
910
from kolibri.core.tasks.test.base import connection
1011
from kolibri.core.tasks.test.taskrunner.test_job_running import EventProxy
1112
from kolibri.core.tasks.worker import Worker
@@ -43,16 +44,20 @@ def toggle_flag(flag_id):
4344
@pytest.fixture
4445
def worker():
4546
with connection() as c:
47+
s = Storage(c)
48+
s.Base.metadata.create_all(c)
4649
b = Worker(c, regular_workers=1, high_workers=1)
4750
b.storage.clear(force=True)
4851
yield b
4952
b.storage.clear(force=True)
53+
s.clear(force=True)
5054
b.shutdown()
5155

5256

5357
def test_keyerror_prevention(worker):
5458
# Create a job with the same ID as the one in worker.enqueue_job_runs_job
5559
job = Job(id, args=(9,))
60+
worker.storage.Base.metadata.create_all(worker.storage.engine)
5661
worker.storage.enqueue_job(job, QUEUE)
5762

5863
while job.state != "COMPLETED":

0 commit comments

Comments
 (0)