
Commit b692da9: Merge pull request #49 from Flowpack/partitioned_waitlist

FEATURE: Partitioned Wait Lists

2 parents: ca3421d + c39288f

13 files changed: +703 −142 lines

β€ŽREADME.mdβ€Ž

Lines changed: 55 additions & 4 deletions
Original file line numberDiff line numberDiff line change
```diff
@@ -19,7 +19,7 @@ This is NOT a fully featured CI pipeline solution.
 <!-- TOC -->
 * [Badges](#badges)
 * [Components](#components)
-  * [prunner (this repository)](#prunner--this-repository-)
+  * [prunner (this repository)](#prunner-this-repository)
   * [prunner-ui](#prunner-ui)
   * [Flowpack.Prunner](#flowpackprunner)
 * [User guide](#user-guide)
```
```diff
@@ -32,7 +32,10 @@ This is NOT a fully featured CI pipeline solution.
   * [Limiting concurrency](#limiting-concurrency)
     * [The wait list](#the-wait-list)
     * [Debounce jobs with a start delay](#debounce-jobs-with-a-start-delay)
+    * [Partitioned Wait Lists](#partitioned-wait-lists)
   * [Disabling fail-fast behavior](#disabling-fail-fast-behavior)
+  * [Error handling with on_error](#error-handling-with-on_error)
+    * [Important notes:](#important-notes)
   * [Configuring retention period](#configuring-retention-period)
   * [Handling of child processes](#handling-of-child-processes)
     * [Graceful shutdown](#graceful-shutdown)
```
```diff
@@ -44,11 +47,11 @@ This is NOT a fully featured CI pipeline solution.
 * [Development](#development)
   * [Requirements](#requirements)
   * [Running locally](#running-locally)
-  * [IDE Setup (IntelliJ/GoLand)](#ide-setup--intellijgoland-)
+  * [IDE Setup (IntelliJ/GoLand)](#ide-setup-intellijgoland)
   * [Building for different operating systems.](#building-for-different-operating-systems)
   * [Running Tests](#running-tests)
   * [Memory Leak Debugging](#memory-leak-debugging)
-  * [Generate OpenAPI (Swagger) spec](#generate-openapi--swagger--spec)
+  * [Generate OpenAPI (Swagger) spec](#generate-openapi-swagger-spec)
 * [Releasing](#releasing)
 * [Security concept](#security-concept)
 * [License](#license)
```
```diff
@@ -222,7 +225,8 @@ pipelines:
 
 To deactivate the queuing altogether, set `queue_limit: 0`.
 
-Now, if the queue is limited, an error occurs when it is full and you try to add a new job.
+Now, if the queue is limited (and the default `queue_strategy: append` is configured),
+an error occurs when it is full and you try to add a new job.
 
 Alternatively, you can also set `queue_strategy: replace` to replace the last job in the
 queue by the newly added one:
```
````diff
@@ -257,6 +261,7 @@ in form of a zero or positive decimal value with a time unit ("ms", "s", "m", "h
 ```yaml
 pipelines:
   do_something:
+    # NOTE: to prevent starvation, use queue_limit >= 2
     queue_limit: 1
     queue_strategy: replace
     concurrency: 1
````
````diff
@@ -265,6 +270,52 @@ pipelines:
     tasks: # as usual
 ```
 
+NOTE: If you use `queue_limit: 1` and `start_delay`, you will run into **starvation** (= the job never starts)
+if jobs are submitted more quickly than `start_delay`. If you instead use `queue_limit: 2` or higher, you
+avoid this issue: the 1st slot will always be worked on after `start_delay`, while the 2nd slot is
+replaced quickly.
+
+### Partitioned Wait Lists
+
+If you have a multi-tenant application, you might want to use **one wait list per tenant** (e.g. for import jobs),
+combined with a global `concurrency` limit (depending on the globally available server resources).
+
+To enable this, do the following:
+
+- `queue_strategy: partitioned_replace`: enables the partitioned wait list.
+- `queue_partition_limit: 1` (or higher): configures the number of wait-list slots per tenant.
+  The last slot gets replaced when the wait list is full.
+- This can be combined with arbitrary `start_delay` and `concurrency` values, as expected.
+
+Full example:
+
+```
+pipelines:
+  do_something:
+    queue_strategy: partitioned_replace
+    # prevent starvation
+    queue_partition_limit: 2
+    concurrency: 1
+    # Queues a run of the job and only starts it after 10 seconds have passed
+    # (if no other run was triggered which replaced the queued job)
+    start_delay: 10s
+    tasks: # as usual
+```
+
+Additionally, when submitting a job, you need to specify the `queuePartition` argument:
+
+```
+POST /pipelines/schedule
+
+{
+  "pipeline": "my_pipeline",
+  "variables": {
+    ...
+  },
+  "queuePartition": "tenant_foo"
+}
+```
 
 ### Disabling fail-fast behavior
 
````
β€Ždefinition/loader_test.goβ€Ž

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,3 +23,28 @@ func TestLoadRecursively_WithMissingDependency(t *testing.T) {
2323
_, err := LoadRecursively("../test/fixtures/missingDep.yml")
2424
require.EqualError(t, err, `loading ../test/fixtures/missingDep.yml: invalid pipeline definition "test_it": missing task "not_existing" referenced in depends_on of task "test"`)
2525
}
26+
27+
func TestPartitionedWaitlist_OK(t *testing.T) {
28+
_, err := LoadRecursively("../test/fixtures/partitioned_waitlist.yml")
29+
require.NoError(t, err)
30+
}
31+
32+
func TestPartitionedWaitlist_err_0_partition_limit(t *testing.T) {
33+
_, err := LoadRecursively("../test/fixtures/partitioned_waitlist_err_0_partition_limit.yml")
34+
require.EqualError(t, err, `loading ../test/fixtures/partitioned_waitlist_err_0_partition_limit.yml: invalid pipeline definition "test_it": queue_partition_limit must be defined and >=1 if queue_strategy=partitioned_replace`)
35+
}
36+
37+
func TestPartitionedWaitlist_err_no_partition_limit(t *testing.T) {
38+
_, err := LoadRecursively("../test/fixtures/partitioned_waitlist_err_no_partition_limit.yml")
39+
require.EqualError(t, err, `loading ../test/fixtures/partitioned_waitlist_err_no_partition_limit.yml: invalid pipeline definition "test_it": queue_partition_limit must be defined and >=1 if queue_strategy=partitioned_replace`)
40+
}
41+
42+
func TestPartitionedWaitlist_err_queue_limit(t *testing.T) {
43+
_, err := LoadRecursively("../test/fixtures/partitioned_waitlist_err_queue_limit.yml")
44+
require.EqualError(t, err, `loading ../test/fixtures/partitioned_waitlist_err_queue_limit.yml: invalid pipeline definition "test_it": queue_limit is not allowed if queue_strategy=partitioned_replace, use queue_partition_limit instead`)
45+
}
46+
47+
func TestWaitlist_err_partitioned_queue_limit(t *testing.T) {
48+
_, err := LoadRecursively("../test/fixtures/waitlist_err_partitioned_queue_limit.yml")
49+
require.EqualError(t, err, `loading ../test/fixtures/waitlist_err_partitioned_queue_limit.yml: invalid pipeline definition "test_it": queue_partition_limit is not allowed if queue_strategy=append|replace, use queue_limit instead`)
50+
}

β€Ždefinition/pipelines.goβ€Ž

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,10 +52,15 @@ type OnErrorTaskDef struct {
5252
type PipelineDef struct {
5353
// Concurrency declares how many instances of this pipeline are allowed to execute concurrently (defaults to 1)
5454
Concurrency int `yaml:"concurrency"`
55-
// QueueLimit is the number of slots for queueing jobs if the allowed concurrency is exceeded, defaults to unbounded (nil)
55+
// QueueLimit is the number of slots for queueing jobs if the allowed concurrency is exceeded, defaults to unbounded (nil). Only allowed with queue_strategy=append|replace, not with partitioned_replace (there, use queue_partition_limit instead)
5656
QueueLimit *int `yaml:"queue_limit"`
57+
58+
// QueuePartitionLimit is the number of slots for queueing jobs per partition, if queue_strategy=partitioned_replace is used.
59+
QueuePartitionLimit *int `yaml:"queue_partition_limit"`
60+
5761
// QueueStrategy to use when adding jobs to the queue (defaults to append)
5862
QueueStrategy QueueStrategy `yaml:"queue_strategy"`
63+
5964
// StartDelay will delay the start of a job if the value is greater than zero (defaults to 0)
6065
StartDelay time.Duration `yaml:"start_delay"`
6166

@@ -100,6 +105,19 @@ func (d PipelineDef) validate() error {
100105
return errors.New("start_delay needs queue_limit > 0")
101106
}
102107

108+
if d.QueueStrategy == QueueStrategyPartitionedReplace {
109+
if d.QueueLimit != nil {
110+
return errors.New("queue_limit is not allowed if queue_strategy=partitioned_replace, use queue_partition_limit instead")
111+
}
112+
if d.QueuePartitionLimit == nil || *d.QueuePartitionLimit < 1 {
113+
return errors.New("queue_partition_limit must be defined and >=1 if queue_strategy=partitioned_replace")
114+
}
115+
} else {
116+
if d.QueuePartitionLimit != nil {
117+
return errors.New("queue_partition_limit is not allowed if queue_strategy=append|replace, use queue_limit instead")
118+
}
119+
}
120+
103121
for taskName, taskDef := range d.Tasks {
104122
for _, dependentTask := range taskDef.DependsOn {
105123
_, exists := d.Tasks[dependentTask]
@@ -164,13 +182,21 @@ func (d PipelineDef) Equals(otherDef PipelineDef) bool {
164182
return true
165183
}
166184

185+
// QueueStrategy defines the behavior when jobs wait (=are queued) before pipeline execution.
167186
type QueueStrategy int
168187

169188
const (
170-
// QueueStrategyAppend appends jobs to the queue until queue limit is reached
189+
// QueueStrategyAppend appends jobs to the queue until the queue limit is reached (FIFO)
171190
QueueStrategyAppend QueueStrategy = 0
172-
// QueueStrategyReplace replaces pending jobs (with same variables) instead of appending to the queue
191+
192+
// QueueStrategyReplace replaces the **LAST** pending job if the queue limit is reached. If the queue is not yet full, the job is appended.
193+
// NOTE: if using queue_limit=1 + replace, this can lead to starvation if rapidly enqueuing jobs. If using queue_limit >= 2, this cannot happen anymore.
194+
// (see 2025_08_14_partitioned_waitlist.md for detailed description)
173195
QueueStrategyReplace QueueStrategy = 1
196+
197+
// QueueStrategyPartitionedReplace implements the "partitioned waitlist" strategy, as explained in 2025_08_14_partitioned_waitlist.md.
198+
// -> it replaces the **LAST** pending job of a given partition, if the partition is full (=queue_partition_limit).
199+
QueueStrategyPartitionedReplace QueueStrategy = 2
174200
)
175201

176202
func (s *QueueStrategy) UnmarshalYAML(unmarshal func(interface{}) error) error {
@@ -185,6 +211,8 @@ func (s *QueueStrategy) UnmarshalYAML(unmarshal func(interface{}) error) error {
185211
*s = QueueStrategyAppend
186212
case "replace":
187213
*s = QueueStrategyReplace
214+
case "partitioned_replace":
215+
*s = QueueStrategyPartitionedReplace
188216
default:
189217
return errors.Errorf("unknown queue strategy: %q", strategyName)
190218
}
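The validation rules added in `validate()` above can be condensed into a standalone decision function. This is a sketch for illustration only: `validateQueueConfig` is a hypothetical helper (the real code works on `PipelineDef` fields and a `QueueStrategy` enum), but the rules and error messages mirror the diff.

```go
package main

import (
	"errors"
	"fmt"
)

// validateQueueConfig mirrors the queue-related validation rules from
// PipelineDef.validate() above (hypothetical standalone helper).
func validateQueueConfig(strategy string, queueLimit, queuePartitionLimit *int) error {
	if strategy == "partitioned_replace" {
		if queueLimit != nil {
			return errors.New("queue_limit is not allowed if queue_strategy=partitioned_replace, use queue_partition_limit instead")
		}
		if queuePartitionLimit == nil || *queuePartitionLimit < 1 {
			return errors.New("queue_partition_limit must be defined and >=1 if queue_strategy=partitioned_replace")
		}
		return nil
	}
	// append | replace: only queue_limit is allowed
	if queuePartitionLimit != nil {
		return errors.New("queue_partition_limit is not allowed if queue_strategy=append|replace, use queue_limit instead")
	}
	return nil
}

func main() {
	one := 1
	fmt.Println(validateQueueConfig("partitioned_replace", nil, &one)) // valid
	fmt.Println(validateQueueConfig("partitioned_replace", &one, nil)) // error: queue_limit not allowed
	fmt.Println(validateQueueConfig("replace", &one, nil))             // valid
}
```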
## New file (111 additions, 0 deletions), referenced from definition/pipelines.go as `2025_08_14_partitioned_waitlist.md`
# FEATURE: Partitioned Waitlists

# Problem description

## Current state

> **FROM README**
>
> By default, if you limit concurrency, and the limit is exceeded, further jobs are added to the
> waitlist of the pipeline.
>
> However, you have some options to configure this as well:
>
> The waitlist can have a maximum size, denoted by `queue_limit`:
>
> ```yaml
> pipelines:
>   do_something:
>     queue_limit: 1
>     concurrency: 1
>     tasks: # as usual
> ```
>
> To deactivate the queuing altogether, set `queue_limit: 0`.
>
> Now, if the queue is limited, an error occurs when it is full and you try to add a new job.
>
> Alternatively, you can also set `queue_strategy: replace` to replace the last job in the
> queue by the newly added one:
## Current state -> problem: starvation with "debounce"?

> Queues a run of the job and only starts it after 10 seconds have passed (if no other run was triggered which replaced the queued job)

```
--> time (1 char = 1 s)

# 1 waitlist slot, delay 10 s

a____ _____      <-- here the job A is queued
           A     <-- here job A starts


# 1 waitlist slot, delay 10 s
# STARVATION CAN HAPPEN
a____ b____ c_____ _____
                        C


# 2 waitlist slots, delay 10 s
# !!! PREVENTS STARVATION !!!
a____ b__c_ ______ _____
[a_]  [ab]  A    <-- here, a starts. NO STARVATION
      [ac]
      [c_]
```
SUMMARY:

- Starvation can only happen if the waitlist size is 1; with size 2 (or bigger) it cannot happen, because always the LAST job gets replaced.
- We cannot debounce immediately; we ALWAYS wait at least `start_delay`. (Not a problem for us right now.)
## Problem description

In a project, we have 50 Neos instances which use prunner for background tasks (some are short, some are long).

Currently, we have 4 pipelines globally:

- concurrency 1
- concurrency 8
- concurrency 4
- concurrency 4 (import job)
  - -> needs the global concurrency to limit the needed server resources

Now, a new pipeline should be added for "irregular import jobs" triggered by webhooks:

- These can happen very quickly after each other.
- -> The pipeline should start after a certain amount of time, and newer jobs should override older ones:
  `StartDelay` combined with QueueStrategy "Replace"
  (`waitList[len(waitList)-1] = job` -> the *LAST* element of the wait list is replaced).
- -> GLOBAL replacement does not work, because each job has arguments which are relevant (i.e. the tenant ID).

We still want to have a GLOBAL CONCURRENCY LIMIT (per pipeline), but a WAITLIST per instance.
## Solution idea

We want to be able to SEGMENT the waitlist into different partitions. `queue_strategy` and `queue_limit` should apply per partition;
`concurrency` stays per pipeline (as before).

```
**LOGICAL VIEW** (idea)
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚ Waitlist Instance 1  β”‚    β”‚ Pipeline (with concurrency 2)                  β”‚
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€   β”‚                                                β”‚
β”‚ Waitlist Instance 2  β”‚ -> β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
β”œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€   β”‚                                                β”‚
β”‚ Waitlist Instance 3  β”‚    β”‚                                                β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜    β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

     if a job is *delayed*, it
     stays in the wait list
     for this duration
```

Technically, we still have a SINGLE wait list per pipeline, but the jobs can be partitioned by `waitlist_partition_id`.

-> In this case, the `replace` strategy will replace the last element of the given partition.

-> We create a new queue strategy for the partitioned waitlist: `partitioned_replace`.

If we partition the waitlist, it can grow up to `queue_limit * number of partitions`.
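The replacement rule described above ("replace the last element of the given partition on a single shared wait list") can be sketched in a few lines of Go. The types and function name here are hypothetical, not prunner's internals; the logic follows the description.

```go
package main

import "fmt"

// queuedJob is a hypothetical stand-in for a pending pipeline job.
type queuedJob struct {
	Pipeline  string
	Partition string // corresponds to waitlist_partition_id / queuePartition
}

// enqueuePartitioned applies the partitioned_replace rule on a single shared
// wait list: if the job's partition already holds partitionLimit jobs, the
// LAST job of that partition is overwritten; otherwise the job is appended.
func enqueuePartitioned(waitList []queuedJob, j queuedJob, partitionLimit int) []queuedJob {
	count, last := 0, -1
	for i, q := range waitList {
		if q.Partition == j.Partition {
			count++
			last = i
		}
	}
	if count >= partitionLimit {
		waitList[last] = j // replace the LAST pending job of this partition
		return waitList
	}
	return append(waitList, j)
}

func main() {
	var wl []queuedJob
	limit := 1
	wl = enqueuePartitioned(wl, queuedJob{"import", "tenant_a"}, limit)
	wl = enqueuePartitioned(wl, queuedJob{"import", "tenant_b"}, limit) // other partition: appended
	wl = enqueuePartitioned(wl, queuedJob{"import", "tenant_a"}, limit) // tenant_a full: replaced
	fmt.Println(len(wl), wl) // one job per partition remains
}
```

With `partitionLimit = 1`, each partition holds at most one pending job, so the shared list grows to at most `queue_partition_limit * number of partitions`, as stated above.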
## New file (11 additions, 0 deletions), package `slice_utils`

```go
package slice_utils

// Filter returns a new slice containing only the elements of s for which p returns true.
func Filter[T any](s []T, p func(i T) bool) []T {
	var result []T
	for _, i := range s {
		if p(i) {
			result = append(result, i)
		}
	}
	return result
}
```
