Skip to content

Commit 09f377f

Browse files
Wording improvements for the TaskScheduler (#17992)
As I found the current docstrings a bit unclear while trying to wrap my head around this class.
1 parent f1b0f9a commit 09f377f

File tree

2 files changed

+54
-37
lines changed

2 files changed

+54
-37
lines changed

changelog.d/17992.doc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Improve documentation for the `TaskScheduler` class.

synapse/util/task_scheduler.py

Lines changed: 53 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -46,33 +46,43 @@
4646

4747
class TaskScheduler:
4848
"""
49-
This is a simple task sheduler aimed at resumable tasks: usually we use `run_in_background`
50-
to launch a background task, or Twisted `deferLater` if we want to do so later on.
51-
52-
The problem with that is that the tasks will just stop and never be resumed if synapse
53-
is stopped for whatever reason.
54-
55-
How this works:
56-
- A function mapped to a named action should first be registered with `register_action`.
57-
This function will be called when trying to resuming tasks after a synapse shutdown,
58-
so this registration should happen when synapse is initialised, NOT right before scheduling
59-
a task.
60-
- A task can then be launched using this named action with `schedule_task`. A `params` dict
61-
can be passed, and it will be available to the registered function when launched. This task
62-
can be launch either now-ish, or later on by giving a `timestamp` parameter.
63-
64-
The function may call `update_task` at any time to update the `result` of the task,
65-
and this can be used to resume the task at a specific point and/or to convey a result to
66-
the code launching the task.
67-
You can also specify the `result` (and/or an `error`) when returning from the function.
68-
69-
The reconciliation loop runs every minute, so this is not a precise scheduler.
70-
There is a limit of 10 concurrent tasks, so tasks may be delayed if the pool is already
71-
full. In this regard, please take great care that scheduled tasks can actually finished.
72-
For now there is no mechanism to stop a running task if it is stuck.
73-
74-
Tasks will be run on the worker specified with `run_background_tasks_on` config,
75-
or the main one by default.
49+
This is a simple task scheduler designed for resumable tasks. Normally,
50+
you'd use `run_in_background` to start a background task or Twisted's
51+
`deferLater` if you want to run it later.
52+
53+
The issue is that these tasks stop completely and won't resume if Synapse is
54+
shut down for any reason.
55+
56+
Here's how it works:
57+
58+
- Register an Action: First, you need to register a function to a named
59+
action using `register_action`. This function will be called to resume tasks
60+
after a Synapse shutdown. Make sure to register it when Synapse initializes,
61+
not right before scheduling the task.
62+
63+
- Schedule a Task: You can launch a task linked to the named action
64+
using `schedule_task`. You can pass a `params` dictionary, which will be
65+
passed to the registered function when it's executed. Tasks can be scheduled
66+
to run either immediately or later by specifying a `timestamp`.
67+
68+
- Update Task: The function handling the task can call `update_task` at
69+
any point to update the task's `result`. This lets you resume the task from
70+
a specific point or pass results back to the code that scheduled it. When
71+
the function completes, you can also return a `result` or an `error`.
72+
73+
Things to keep in mind:
74+
75+
- The reconciliation loop runs every minute, so this is not a high-precision
76+
scheduler.
77+
78+
- Only 10 tasks can run at the same time. If the pool is full, tasks may be
79+
delayed. Make sure your scheduled tasks can actually finish.
80+
81+
- Currently, there's no way to stop a task if it gets stuck.
82+
83+
- Tasks will run on the worker defined by the `run_background_tasks_on`
84+
setting in your configuration. If no worker is specified, they'll run on
85+
the main one by default.
7686
"""
7787

7888
# Precision of the scheduler, evaluation of tasks to run will only happen
@@ -157,7 +167,7 @@ async def schedule_task(
157167
params: Optional[JsonMapping] = None,
158168
) -> str:
159169
"""Schedule a new potentially resumable task. A function matching the specified
160-
`action` should have be registered with `register_action` before the task is run.
170+
`action` should've been registered with `register_action` before the task is run.
161171
162172
Args:
163173
action: the name of a previously registered action
@@ -210,22 +220,28 @@ async def update_task(
210220
result: Optional[JsonMapping] = None,
211221
error: Optional[str] = None,
212222
) -> bool:
213-
"""Update some task associated values. This is exposed publicly so it can
214-
be used inside task functions, mainly to update the result and be able to
215-
resume a task at a specific step after a restart of synapse.
223+
"""Update some task-associated values. This is exposed publicly so it can
224+
be used inside task functions, mainly to update the result or resume
225+
a task at a specific step after a restart of synapse.
216226
217227
It can also be used to stage a task, by setting the `status` to `SCHEDULED` with
218228
a new timestamp.
219229
220-
The `status` can only be set to `ACTIVE` or `SCHEDULED`, `COMPLETE` and `FAILED`
221-
are terminal status and can only be set by returning it in the function.
230+
The `status` can only be set to `ACTIVE` or `SCHEDULED`. `COMPLETE` and `FAILED`
231+
are terminal statuses and can only be set by returning them from the function.
222232
223233
Args:
224234
id: the id of the task to update
225235
timestamp: useful to schedule a new stage of the task at a later date
226236
status: the new `TaskStatus` of the task
227237
result: the new result of the task
228238
error: the new error of the task
239+
240+
Returns:
241+
True if the update was successful, False otherwise.
242+
243+
Raises:
244+
Exception: If a status other than `ACTIVE` or `SCHEDULED` was passed.
229245
"""
230246
if status == TaskStatus.COMPLETE or status == TaskStatus.FAILED:
231247
raise Exception(
@@ -263,9 +279,9 @@ async def get_tasks(
263279
max_timestamp: Optional[int] = None,
264280
limit: Optional[int] = None,
265281
) -> List[ScheduledTask]:
266-
"""Get a list of tasks. Returns all the tasks if no args is provided.
282+
"""Get a list of tasks. Returns all the tasks if no args are provided.
267283
268-
If an arg is `None` all tasks matching the other args will be selected.
284+
If an arg is `None`, all tasks matching the other args will be selected.
269285
If an arg is an empty list, the corresponding value of the task needs
270286
to be `None` to be selected.
271287
@@ -277,8 +293,8 @@ async def get_tasks(
277293
a timestamp inferior to the specified one
278294
limit: Only return `limit` number of rows if set.
279295
280-
Returns
281-
A list of `ScheduledTask`, ordered by increasing timestamps
296+
Returns:
297+
A list of `ScheduledTask`, ordered by increasing timestamps.
282298
"""
283299
return await self._store.get_scheduled_tasks(
284300
actions=actions,

0 commit comments

Comments
 (0)