Commit 2ce1196

WIP: New PoC for processing tasks from multiple databases
1 parent 6519b76 commit 2ce1196

4 files changed: +71, -161 lines


src/task_processor/managers.py

Lines changed: 12 additions & 114 deletions
@@ -1,129 +1,27 @@
 import typing
 
-from django.conf import settings
-from django.db import connections, transaction
 from django.db.models import Manager
-from django.db.utils import ProgrammingError
-from django.utils.connection import ConnectionDoesNotExist
 
 
 if typing.TYPE_CHECKING:
-    from django.db.models.query import RawQuerySet
-
     from task_processor.models import RecurringTask, Task
 
 
 class TaskManager(Manager["Task"]):
-    def get_tasks_to_process(  # noqa: C901
+    def get_tasks_to_process(
         self,
+        database: str,
         num_tasks: int,
-        skip_old_database: bool = False,
-    ) -> typing.Generator["Task", None, None]:
-        """
-        Retrieve tasks to process from the database
-
-        This does its best effort to retrieve tasks from the old database first
-        """
-        if not skip_old_database:
-            old_database = "default" if self._is_database_separate else "task_processor"
-            old_tasks = self._fetch_tasks_from(old_database, num_tasks)
-
-            # Fetch tasks from the previous database
-            try:
-                with transaction.atomic(using=old_database):
-                    first_task = next(old_tasks)
-            except StopIteration:
-                pass  # Empty set
-            except ProgrammingError:
-                pass  # Function no longer exists in old database
-            except ConnectionDoesNotExist:
-                pass  # Database not available
-            else:
-                yield first_task
-                num_tasks -= 1
-                for task in old_tasks:
-                    yield task
-                    num_tasks -= 1
-
-        if num_tasks == 0:
-            return
-
-        new_database = "task_processor" if self._is_database_separate else "default"
-        new_tasks = self._fetch_tasks_from(new_database, num_tasks)
-
-        # Fetch tasks from the new database
-        try:
-            with transaction.atomic(using=new_database):
-                first_task = next(new_tasks)
-        except StopIteration:
-            pass  # Empty set
-        except ProgrammingError:
-            # Function doesn't exist in the database yet
-            self._create_or_replace_function__get_tasks_to_process()
-            yield from self.get_tasks_to_process(num_tasks, skip_old_database=True)
-        else:
-            yield first_task
-            yield from new_tasks
-
-    @property
-    def _is_database_separate(self) -> bool:
-        """
-        Check whether the task processor database is separate from the default database
-        """
-        return "task_processor" in settings.DATABASES
-
-    def _fetch_tasks_from(
-        self, database: str, num_tasks: int
-    ) -> typing.Iterator["Task"]:
-        """
-        Retrieve tasks from the specified Django database
-        """
-        return (
-            self.using(database)
-            .raw("SELECT * FROM get_tasks_to_process(%s)", [num_tasks])
-            .iterator()
+    ) -> typing.List["Task"]:
+        return list(
+            self.using(database).raw(
+                "SELECT * FROM get_tasks_to_process(%s)",
+                [num_tasks],
+            ),
         )
 
-    def _create_or_replace_function__get_tasks_to_process(self) -> None:
-        """
-        Create or replace the function to get tasks to process.
-        """
-        database = "task_processor" if self._is_database_separate else "default"
-        with connections[database].cursor() as cursor:
-            cursor.execute(
-                """
-                CREATE OR REPLACE FUNCTION get_tasks_to_process(num_tasks integer)
-                RETURNS SETOF task_processor_task AS $$
-                DECLARE
-                    row_to_return task_processor_task;
-                BEGIN
-                    -- Select the tasks that needs to be processed
-                    FOR row_to_return IN
-                        SELECT *
-                        FROM task_processor_task
-                        WHERE num_failures < 3 AND scheduled_for < NOW() AND completed = FALSE AND is_locked = FALSE
-                        ORDER BY priority ASC, scheduled_for ASC, created_at ASC
-                        LIMIT num_tasks
-                        -- Select for update to ensure that no other workers can select these tasks while in this transaction block
-                        FOR UPDATE SKIP LOCKED
-                    LOOP
-                        -- Lock every selected task(by updating `is_locked` to true)
-                        UPDATE task_processor_task
-                        -- Lock this row by setting is_locked True, so that no other workers can select these tasks after this
-                        -- transaction is complete (but the tasks are still being executed by the current worker)
-                        SET is_locked = TRUE
-                        WHERE id = row_to_return.id;
-                        -- If we don't explicitly update the `is_locked` column here, the client will receive the row that is actually locked but has the `is_locked` value set to `False`.
-                        row_to_return.is_locked := TRUE;
-                        RETURN NEXT row_to_return;
-                    END LOOP;
-
-                    RETURN;
-                END;
-                $$ LANGUAGE plpgsql
-                """
-            )
-
 
 class RecurringTaskManager(Manager["RecurringTask"]):
-    def get_tasks_to_process(self) -> "RawQuerySet[RecurringTask]":
-        return self.raw("SELECT * FROM get_recurringtasks_to_process()")
+    def get_tasks_to_process(self, database: str) -> typing.List["RecurringTask"]:
+        return list(
+            self.using(database).raw("SELECT * FROM get_recurringtasks_to_process()"),
+        )

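Note on the managers.py change: the managers no longer fall back between an old and a new database or create the get_tasks_to_process function on demand; callers pass a Django database alias explicitly and receive a plain list. A minimal usage sketch under that assumption — the alias is illustrative, and the SQL functions are assumed to already exist in the target database (e.g. created by migrations):

from task_processor.models import RecurringTask, Task

# Illustrative alias; any key present in settings.DATABASES could be used.
DATABASE_ALIAS = "task_processor"

# Fetch up to 10 due tasks from the chosen database; the underlying SQL
# function also marks them as locked before returning them.
tasks = Task.objects.get_tasks_to_process(DATABASE_ALIAS, 10)
for task in tasks:
    print(task.pk)

# Recurring tasks are fetched the same way, but without a limit parameter.
recurring = RecurringTask.objects.get_tasks_to_process(DATABASE_ALIAS)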
src/task_processor/processor.py

Lines changed: 14 additions & 8 deletions
@@ -27,14 +27,14 @@
 UNREGISTERED_RECURRING_TASK_GRACE_PERIOD = timedelta(minutes=30)
 
 
-def run_tasks(num_tasks: int = 1) -> list[TaskRun]:
+def run_tasks(database: str, num_tasks: int = 1) -> list[TaskRun]:
     if num_tasks < 1:
         raise ValueError("Number of tasks to process must be at least one")
 
-    tasks = list(Task.objects.get_tasks_to_process(num_tasks))
+    tasks = list(Task.objects.get_tasks_to_process(database, num_tasks))
 
     if tasks:
-        logger.debug(f"Running {len(tasks)} task(s)")
+        logger.debug(f"Running {len(tasks)} task(s) from database '{database}'")
 
         executed_tasks = []
         task_runs = []
@@ -54,20 +54,24 @@ def run_tasks(num_tasks: int = 1) -> list[TaskRun]:
 
         if task_runs:
             TaskRun.objects.bulk_create(task_runs)
-            logger.debug(f"Finished running {len(task_runs)} task(s)")
+            logger.debug(
+                f"Finished running {len(task_runs)} task(s) from database '{database}'",
+            )
 
         return task_runs
 
     return []
 
 
-def run_recurring_tasks() -> list[RecurringTaskRun]:
+def run_recurring_tasks(database: str) -> list[RecurringTaskRun]:
     # NOTE: We will probably see a lot of delay in the execution of recurring tasks
     # if the tasks take longer then `run_every` to execute. This is not
     # a problem for now, but we should be mindful of this limitation
-    tasks = RecurringTask.objects.get_tasks_to_process()
+    tasks = RecurringTask.objects.get_tasks_to_process(database)
     if tasks:
-        logger.debug(f"Running {len(tasks)} recurring task(s)")
+        logger.debug(
+            f"Running {len(tasks)} recurring task(s) from database '{database}'",
+        )
 
         task_runs = []
 
@@ -95,7 +99,9 @@ def run_recurring_tasks() -> list[RecurringTaskRun]:
 
         if task_runs:
             RecurringTaskRun.objects.bulk_create(task_runs)
-            logger.debug(f"Finished running {len(task_runs)} recurring task(s)")
+            logger.debug(
+                f"Finished running {len(task_runs)} recurring task(s) from database '{database}'",
+            )
 
         return task_runs
 

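The processor entry points now take the database alias as their first argument, so a caller has to invoke them once per database it wants to drain. A rough sketch of such a caller; the alias pair mirrors the one hard-coded in threads.py below and is an assumption of this PoC, not a configuration mechanism:

from task_processor.processor import run_recurring_tasks, run_tasks

# Drain both queues once; aliases are the pair this PoC hard-codes in threads.py.
for database in ("default", "task_processor"):
    task_runs = run_tasks(database, num_tasks=5)
    recurring_runs = run_recurring_tasks(database)
    print(f"{database}: {len(task_runs)} task run(s), {len(recurring_runs)} recurring run(s)")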
src/task_processor/threads.py

Lines changed: 18 additions & 12 deletions
@@ -95,18 +95,24 @@ def run(self) -> None:
             time.sleep(self.sleep_interval_millis / 1000)
 
     def run_iteration(self) -> None:
-        try:
-            run_tasks(self.queue_pop_size)
-            run_recurring_tasks()
-        except Exception as e:
-            # To prevent task threads from dying if they get an error retrieving the tasks from the
-            # database this will allow the thread to continue trying to retrieve tasks if it can
-            # successfully re-establish a connection to the database.
-            # TODO: is this also what is causing tasks to get stuck as locked? Can we unlock
-            # tasks here?
-
-            logger.error("Received error retrieving tasks: %s.", e, exc_info=e)
-            close_old_connections()
+        for database in ["default", "task_processor"]:
+            try:
+                run_tasks(database, self.queue_pop_size)
+                run_recurring_tasks(database)
+            except Exception as exception:
+                # To prevent task threads from dying if they get an error retrieving the tasks from the
+                # database this will allow the thread to continue trying to retrieve tasks if it can
+                # successfully re-establish a connection to the database.
+                # TODO: is this also what is causing tasks to get stuck as locked? Can we unlock
+                # tasks here?
+
+                logger.error(
+                    "Received error retrieving tasks from database '%s': %s.",
+                    database,
+                    repr(exception),
+                    exc_info=exception,
+                )
+                close_old_connections()
 
     def stop(self) -> None:
         self._stopped = True

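Because run_iteration now loops over the hard-coded aliases "default" and "task_processor", both must resolve in settings.DATABASES; if one is missing, the broad except simply logs the resulting connection error on every iteration. A sketch of the settings shape this PoC appears to assume — engine and database names are illustrative, not taken from the commit:

# settings.py (illustrative): both aliases referenced by run_iteration must exist.
DATABASES = {
    "default": {
        "ENGINE": "django.db.backends.postgresql",
        "NAME": "app",
    },
    "task_processor": {
        "ENGINE": "django.db.backends.postgresql",
        "NAME": "app_task_processor",
    },
}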
0 commit comments
