Skip to content

Commit 32cba38

Browse files
committed
Consume tasks from multiple databases
1 parent 2ce1196 commit 32cba38

File tree

7 files changed

+100
-202
lines changed

7 files changed

+100
-202
lines changed

poetry.lock

Lines changed: 2 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

settings/dev.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,3 +62,5 @@
6262

6363
# Avoid models.W042 warnings
6464
DEFAULT_AUTO_FIELD = "django.db.models.AutoField"
65+
66+
TASK_PROCESSOR_DATABASES = ["default"]

src/task_processor/processor.py

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,7 @@ def run_tasks(database: str, num_tasks: int = 1) -> list[TaskRun]:
3131
if num_tasks < 1:
3232
raise ValueError("Number of tasks to process must be at least one")
3333

34-
tasks = list(Task.objects.get_tasks_to_process(database, num_tasks))
35-
34+
tasks = Task.objects.get_tasks_to_process(database, num_tasks)
3635
if tasks:
3736
logger.debug(f"Running {len(tasks)} task(s) from database '{database}'")
3837

@@ -55,7 +54,7 @@ def run_tasks(database: str, num_tasks: int = 1) -> list[TaskRun]:
5554
if task_runs:
5655
TaskRun.objects.bulk_create(task_runs)
5756
logger.debug(
58-
f"Finished running {len(task_runs)} task(s) from database '{database}'",
57+
f"Finished running {len(task_runs)} task(s) from database '{database}'"
5958
)
6059

6160
return task_runs
@@ -69,9 +68,7 @@ def run_recurring_tasks(database: str) -> list[RecurringTaskRun]:
6968
# a problem for now, but we should be mindful of this limitation
7069
tasks = RecurringTask.objects.get_tasks_to_process(database)
7170
if tasks:
72-
logger.debug(
73-
f"Running {len(tasks)} recurring task(s) from database '{database}'",
74-
)
71+
logger.debug(f"Running {len(tasks)} recurring task(s)")
7572

7673
task_runs = []
7774

@@ -99,9 +96,7 @@ def run_recurring_tasks(database: str) -> list[RecurringTaskRun]:
9996

10097
if task_runs:
10198
RecurringTaskRun.objects.bulk_create(task_runs)
102-
logger.debug(
103-
f"Finished running {len(task_runs)} recurring task(s) from database '{database}'",
104-
)
99+
logger.debug(f"Finished running {len(task_runs)} recurring task(s)")
105100

106101
return task_runs
107102

src/task_processor/threads.py

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
from datetime import datetime, timedelta
55
from threading import Thread
66

7+
from django.conf import settings
78
from django.db import close_old_connections
89
from django.utils import timezone
910

@@ -95,23 +96,33 @@ def run(self) -> None:
9596
time.sleep(self.sleep_interval_millis / 1000)
9697

9798
def run_iteration(self) -> None:
98-
for database in ["default", "task_processor"]:
99+
"""
100+
Consume and execute tasks from the queue, and run recurring tasks
101+
102+
This method tries to consume tasks from multiple databases as to ensure
103+
that any remaining tasks are processed after opting in or out of a
104+
separate database setup.
105+
"""
106+
database_is_separate = "task_processor" in settings.TASK_PROCESSOR_DATABASES
107+
for database in settings.TASK_PROCESSOR_DATABASES:
99108
try:
100109
run_tasks(database, self.queue_pop_size)
101-
run_recurring_tasks(database)
110+
111+
# Recurring tasks are only run on one database
112+
if (database == "default") ^ database_is_separate:
113+
run_recurring_tasks(database)
102114
except Exception as exception:
103115
# To prevent task threads from dying if they get an error retrieving the tasks from the
104116
# database this will allow the thread to continue trying to retrieve tasks if it can
105117
# successfully re-establish a connection to the database.
106118
# TODO: is this also what is causing tasks to get stuck as locked? Can we unlock
107119
# tasks here?
108-
120+
exception_repr = f"{exception.__class__.__module__}.{repr(exception)}"
109121
logger.error(
110-
"Received error retrieving tasks from database '%s': %s.",
111-
database,
112-
repr(exception),
122+
f"Error handling tasks from database '{database}': {exception_repr}",
113123
exc_info=exception,
114124
)
125+
115126
close_old_connections()
116127

117128
def stop(self) -> None:

tests/unit/task_processor/test_unit_task_processor_managers.py

Lines changed: 0 additions & 166 deletions
This file was deleted.

tests/unit/task_processor/test_unit_task_processor_processor.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -432,7 +432,7 @@ def test_run_task_runs_task_and_creates_task_run_object_when_failure(
432432
assert not task.completed
433433

434434
expected_log_records = [
435-
("DEBUG", "Running 1 task(s)"),
435+
("DEBUG", "Running 1 task(s) from database 'default'"),
436436
(
437437
"DEBUG",
438438
f"Running task {task.task_identifier} id={task.id} args={task.args} kwargs={task.kwargs}",
@@ -441,7 +441,7 @@ def test_run_task_runs_task_and_creates_task_run_object_when_failure(
441441
"ERROR",
442442
f"Failed to execute task '{task.task_identifier}', with id {task.id}. Exception: {msg}",
443443
),
444-
("DEBUG", "Finished running 1 task(s)"),
444+
("DEBUG", "Finished running 1 task(s) from database 'default'"),
445445
]
446446

447447
assert expected_log_records == [
@@ -698,7 +698,7 @@ def test_run_tasks_skips_locked_tasks(
698698

699699
# When
700700
# we spawn a new thread to run the first task (configured to just sleep)
701-
task_runner_thread = Thread(target=run_tasks)
701+
task_runner_thread = Thread(target=run_tasks, args=("default",))
702702
task_runner_thread.start()
703703

704704
# and subsequently attempt to run another task in the main thread

0 commit comments

Comments
 (0)