### Description
When the queues table offset sequence exceeds Integer.MAX_VALUE (2,147,483,647), the worker enters an infinite loop re-executing the same tasks because queue messages can never be acknowledged.
### Context

Migration `V1_41__offset_bigint.sql` correctly changed the `offset` column from `SERIAL` to `BIGINT`:

```sql
ALTER SEQUENCE queues_offset_seq AS BIGINT;
ALTER TABLE queues ALTER COLUMN "offset" TYPE BIGINT;
```

But the Java code in `JdbcQueue.receiveImpl` still reads offsets as `Integer`:
```java
// JdbcQueue.java, line 397 (receiveImpl, inTransaction path)
result.map(record -> record.get("offset", Integer.class))

// JdbcQueue.java, line 411 (receiveImpl, non-inTransaction path)
fetch.map(record -> record.get("offset", Integer.class))
```

And `PostgresQueue.updateGroupOffsets` (line 86) receives a `List<Integer>`.
### What happens

When `offset` > 2,147,483,647, `record.get("offset", Integer.class)` either:

- throws a `DataTypeException` (jOOQ cannot convert the `BIGINT` value to `Integer`), or
- silently overflows to a wrong positive value
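The overflow case can be reproduced in plain Java, independent of jOOQ: a narrowing cast from `long` to `int` keeps only the low 32 bits, so an offset past `Integer.MAX_VALUE` silently becomes a different value (negative just past the boundary, but positive again once the sequence passes 2^32). A minimal sketch:

```java
public class OffsetOverflowDemo {
    public static void main(String[] args) {
        // Integer.MAX_VALUE + 1 wraps to a negative value
        long justPastMax = 2_147_483_648L;
        System.out.println((int) justPastMax);  // -2147483648

        // Past 2^32, the cast yields a small, wrong *positive* value:
        // only the low 32 bits survive the narrowing conversion
        long muchLater = 4_294_967_301L;        // 2^32 + 5
        System.out.println((int) muchLater);    // 5
    }
}
```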
In both cases, `updateGroupOffsets` fails to set `consumer_worker = true`:

- **Exception case:** the exception is thrown after `consumer.accept()` has already dispatched tasks to the worker thread pool (an irreversible side effect) → the transaction rolls back → the messages stay unacknowledged
- **Overflow case:** `UPDATE ... WHERE "offset" IN (<wrong values>)` matches 0 rows → `consumer_worker` stays `false`
The poll loop (`immediateRepoll = true` by default) immediately re-fetches the same unacknowledged messages and dispatches them again, so every task is executed dozens of times in parallel and pod/process creation grows without bound.
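The failure mode above can be modeled in a few lines. This is a simplified sketch of the loop's behavior under the overflow case, not the actual `JdbcQueue` code; the local names are hypothetical:

```java
import java.util.concurrent.atomic.AtomicInteger;

public class RepollSimulation {
    public static void main(String[] args) {
        AtomicInteger executions = new AtomicInteger();
        long realOffset = 2_147_483_648L;         // message offset past Integer.MAX_VALUE
        boolean acked = false;
        // The real loop never terminates; capped at 5 iterations for the demo
        for (int poll = 0; poll < 5 && !acked; poll++) {
            executions.incrementAndGet();         // consumer.accept(): task dispatched (side effect)
            long wrongOffset = (int) realOffset;  // narrowing cast corrupts the offset
            // UPDATE ... WHERE "offset" IN (wrongOffset) matches no rows
            int rowsMatched = (wrongOffset == realOffset) ? 1 : 0;
            acked = rowsMatched > 0;              // stays false -> immediate repoll
        }
        System.out.println(executions.get());     // 5: same task dispatched on every poll
    }
}
```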
### How to reproduce

1. Use the JDBC/PostgreSQL backend in distributed mode
2. Advance the `queues_offset_seq` sequence past `Integer.MAX_VALUE`:
   ```sql
   ALTER SEQUENCE queues_offset_seq RESTART WITH 2147483648;
   ```
3. Trigger any flow with a worker task
4. Observe: the task is executed repeatedly, and `consumer_worker` stays `false` for all new messages
### Impact on our production

- 16 scheduled flows triggered → 128 pods created simultaneously (20 executions per task, saturating all 128 worker threads)
- 76,583 unacknowledged messages accumulated in the `queues` table
- Complete platform outage for ~24h
### Suggested fix

Replace `Integer.class` with `Long.class` in `JdbcQueue.receiveImpl`:

```java
// Line 397
result.map(record -> record.get("offset", Long.class))

// Line 411
fetch.map(record -> record.get("offset", Long.class))
```

And update the `updateGroupOffsets` signature from `List<Integer>` to `List<Long>` in both `JdbcQueue` and `PostgresQueue`.
### Contributing issue: unacknowledged messages are never cleaned up

Migration `V1_31__queues_updated_date.sql` drops the `DEFAULT` on the `updated` column:

```sql
ALTER TABLE queues ALTER COLUMN updated DROP NOT NULL;
ALTER TABLE queues ALTER COLUMN updated DROP DEFAULT;
```

Since `produceFields()` doesn't set `updated`, new messages are inserted with `updated = NULL`. The `JdbcCleaner` deletes rows where `updated <= threshold`, which never matches `NULL` rows. Unacknowledged messages therefore accumulate indefinitely, amplifying the impact when the overflow bug triggers.
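The `NULL` behavior follows SQL's three-valued logic: `updated <= threshold` evaluates to UNKNOWN (not TRUE) when `updated IS NULL`, so the `DELETE` never touches those rows. A minimal Java model of that predicate (the method name is hypothetical, not the actual `JdbcCleaner` code):

```java
import java.time.Instant;

public class CleanerPredicateDemo {
    // Models the SQL predicate "updated <= threshold": a NULL column
    // never matches, mirroring SQL's three-valued logic
    static boolean matchesCleanup(Instant updated, Instant threshold) {
        return updated != null && !updated.isAfter(threshold);
    }

    public static void main(String[] args) {
        Instant threshold = Instant.parse("2025-01-01T00:00:00Z");
        // An old, dated row is eligible for cleanup
        System.out.println(matchesCleanup(Instant.parse("2024-12-01T00:00:00Z"), threshold)); // true
        // A row with updated = NULL is never cleaned up, however old it is
        System.out.println(matchesCleanup(null, threshold)); // false
    }
}
```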
### Minor: inverted `compareAndSet` in `JdbcWorkerJobQueueService.close()`

Line 75:

```java
// Current (cleanup never executes):
if (!isStopped.compareAndSet(true, false)) { return; }
// Should be:
if (!isStopped.compareAndSet(false, true)) { return; }
```

### Workaround
Stop all components, then:

```sql
DELETE FROM queues;
ALTER SEQUENCE queues_offset_seq RESTART WITH 1;
```

### Environment
- Kestra Version: v1.1.15 (Helm chart 1.0.41)
- Backend: PostgreSQL 17.6 (AWS RDS)
- Deployment: Distributed on EKS (separate webserver, executor, scheduler, worker)