Skip to content

Commit 8f5425d

Browse files
authored
Update readme for task queues
1 parent e57e126 commit 8f5425d

File tree

1 file changed

+30
-26
lines changed

1 file changed

+30
-26
lines changed

backend/redis/README.md

Lines changed: 30 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,58 +1,62 @@
11
# Redis backend
22

3-
## Data
4-
5-
### Instances and their state
3+
## Instances and their state
64

75
Instances and their state (started_at, completed_at etc.) are stored as JSON blobs under the `instances-{instanceID}` keys.
86

9-
### History Events
7+
## History Events
108

119
Events are stored in streams per workflow instance under `events-{instanceID}`. We could use a plain list but with streams we can do `XRANGE` queries for a subset of the history for continuation tasks.
1210

13-
### Pending events
11+
## Pending events
1412

1513
Pending events are stored in streams under the `pending-{instanceID}` key.
1614

17-
### Timer events
15+
## Timer events
1816

1917
Timer events are stored in a sorted set. Whenever a client checks for a new workflow instance task, the sorted set is checked to see if any of the pending timer events is ready yet. If it is, it's added to the pending events before those are checked for pending workflow tasks.
2018

21-
### Task queues
19+
## Task queues
2220

2321
We need queues for activities and workflow instances. In both cases, we have tasks being enqueued, workers polling for works, and we have to guarantee that every task is eventually processed. So if a worker has dequeued a task and crashed, for example, eventually we need another worker to pick up the task and finish it.
2422

25-
#### Option 1: `ZSET`
23+
Task queues are implemented using Redis STREAMs.
24+
25+
<details>
26+
<summary>Alternatives considered</summary>
2627

27-
Store keys for queue items in a `ZSET`. The score for the sorted set is the timestamp when the task is unlocked. For new tasks the `SCORE` is the current timestamp.
28+
**Option 1 - `ZSET`**:
2829

29-
Workers make a `ZRANGE` query to that sorted set, looking for tasks where the `SCORE` is in `-inf, now)`. That will get tasksß where the unlock timestamp is in the past. Either because they have just been queued, or the lock has expired.
30+
Store keys for queue items in a `ZSET`. The score for the sorted set is the timestamp when the task is unlocked. For new tasks the `SCORE` is the current timestamp.
3031

31-
Once a worker picks up a task, the score is updated to `now + lock_timeout`. Query and update are done in a transaction with a `WATCH` on the queue key.
32+
Workers make a `ZRANGE` query to that sorted set, looking for tasks where the `SCORE` is in `-inf, now)`. That will get tasksß where the unlock timestamp is in the past. Either because they have just been queued, or the lock has expired.
3233

33-
When a worker is done with a task, it removes it from the `ZSET` (`ZREM`).
34+
Once a worker picks up a task, the score is updated to `now + lock_timeout`. Query and update are done in a transaction with a `WATCH` on the queue key.
3435

35-
Pro:
36-
- No special handling for recovering crashed tasks, they'll automatically unlock
36+
When a worker is done with a task, it removes it from the `ZSET` (`ZREM`).
3737

38-
Con:
39-
- Need for polling and cannot use any of the blocking redis commands
40-
- WAIT with transaction, or a script required
38+
Pro:
39+
- No special handling for recovering crashed tasks, they'll automatically unlock
4140

42-
#### Option 2: `LISTS`
41+
Con:
42+
- Need for polling and cannot use any of the blocking redis commands
43+
- WAIT with transaction, or a script required
4344

44-
Use two `LIST`s, one for the _pending_ queue, one for the _processing_ queue. To enqueue a new task, `LPUSH` it onto the _pending_ list. Also add an entry in a separate `ZSET` where the score is the unlock timestamp. Initially that timestamp will be the current timestamp. The `LPUSH` and `ZADD` are done in a transaction with a `WATCH` on the queue key, and retried if another client modified the queue in the mean time. Alternatively, the two operations can be done in a script.
45+
**Option 2 - `LISTS`**
4546

46-
For picking up tasks, we use a blocking `BLMOVE .. RIGHT LEFT` command to pick up the next available task from _pending_ and move it to _processing_ as a single atomic operation. Once picked up, the `SCORE` in the `ZSET` is adjusted to `now + lock_timeout`.
47+
Use two `LIST`s, one for the _pending_ queue, one for the _processing_ queue. To enqueue a new task, `LPUSH` it onto the _pending_ list. Also add an entry in a separate `ZSET` where the score is the unlock timestamp. Initially that timestamp will be the current timestamp. The `LPUSH` and `ZADD` are done in a transaction with a `WATCH` on the queue key, and retried if another client modified the queue in the mean time. Alternatively, the two operations can be done in a script.
4748

48-
When a worker is done with a task, it removes it from the _processing_ list (`LREM`), and the `ZSET` (`ZREM`).
49+
For picking up tasks, we use a blocking `BLMOVE .. RIGHT LEFT` command to pick up the next available task from _pending_ and move it to _processing_ as a single atomic operation. Once picked up, the `SCORE` in the `ZSET` is adjusted to `now + lock_timeout`.
4950

50-
To recover abandoned tasks, we periodically scan the _processing_ list
51+
When a worker is done with a task, it removes it from the _processing_ list (`LREM`), and the `ZSET` (`ZREM`).
5152

52-
Pro:
53+
To recover abandoned tasks, we periodically scan the _processing_ list
5354

54-
Con:
55-
- Requires periodic scans of the _processing_ list to find tasks that have been abandoned
55+
Pro:
56+
- Blocking call does not require constant polling
5657

57-
#### Option 3: `STREAMS`s
58+
Con:
59+
- Requires periodic scans of the _processing_ list to find tasks that have been abandoned
60+
- Picking up a task, adjusting its ZSET value and the periodic scan could run into race conditions
5861

62+
</details>

0 commit comments

Comments
 (0)