Skip to content

Commit afad110

Browse files
dahliaclaude
andcommitted
Fix PostgresMessageQueue to use distributed advisory locks
Replace the in-memory processingKeys Set with PostgreSQL advisory locks for proper distributed ordering key support across multiple workers. The previous implementation only tracked processing keys within a single process, which meant messages with the same ordering key could be processed concurrently by different worker instances. Now uses pg_try_advisory_lock(hashtext(tableName), hashtext(orderingKey)) to acquire distributed locks before processing messages. The lock is released with pg_advisory_unlock after the message handler completes. This ensures that messages with the same ordering key are processed sequentially even in a distributed multi-worker environment. Co-Authored-By: Claude <[email protected]>
1 parent 154cb91 commit afad110

File tree

1 file changed

+30
-32
lines changed
  • packages/postgres/src

1 file changed

+30
-32
lines changed

packages/postgres/src/mq.ts

Lines changed: 30 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -159,49 +159,47 @@ export class PostgresMessageQueue implements MessageQueue {
159159
): Promise<void> {
160160
await this.initialize();
161161
const { signal } = options;
162-
const processingKeys = new Set<string>();
163162
const poll = async () => {
164163
while (!signal?.aborted) {
165-
// Build the query to exclude messages with ordering keys currently
166-
// being processed. Messages without an ordering key (null) can always
167-
// be processed.
168-
const lockedKeys = [...processingKeys];
169-
const query = lockedKeys.length > 0
170-
? this.#sql`
171-
DELETE FROM ${this.#sql(this.#tableName)}
172-
WHERE id = (
173-
SELECT id
174-
FROM ${this.#sql(this.#tableName)}
175-
WHERE created + delay < CURRENT_TIMESTAMP
176-
AND (ordering_key IS NULL
177-
OR ordering_key NOT IN ${this.#sql(lockedKeys)})
178-
ORDER BY created
179-
LIMIT 1
180-
)
181-
RETURNING message, ordering_key;
182-
`.execute()
183-
: this.#sql`
184-
DELETE FROM ${this.#sql(this.#tableName)}
185-
WHERE id = (
186-
SELECT id
187-
FROM ${this.#sql(this.#tableName)}
188-
WHERE created + delay < CURRENT_TIMESTAMP
189-
ORDER BY created
190-
LIMIT 1
191-
)
192-
RETURNING message, ordering_key;
193-
`.execute();
164+
// Use PostgreSQL advisory locks for distributed ordering key locking.
165+
// pg_try_advisory_lock returns true if the lock was acquired, false
166+
// otherwise. We use hashtext() to convert the table name and ordering
167+
// key to integers for the lock ID. Messages without an ordering key
168+
// (null) can always be processed.
169+
const query = this.#sql`
170+
DELETE FROM ${this.#sql(this.#tableName)}
171+
WHERE id = (
172+
SELECT id
173+
FROM ${this.#sql(this.#tableName)}
174+
WHERE created + delay < CURRENT_TIMESTAMP
175+
AND (ordering_key IS NULL
176+
OR pg_try_advisory_lock(
177+
hashtext(${this.#tableName}),
178+
hashtext(ordering_key)
179+
))
180+
ORDER BY created
181+
LIMIT 1
182+
)
183+
RETURNING message, ordering_key;
184+
`.execute();
194185
const cancel = query.cancel.bind(query);
195186
signal?.addEventListener("abort", cancel);
196187
let i = 0;
197188
for (const row of await query) {
198189
if (signal?.aborted) return;
199190
const orderingKey = row.ordering_key as string | null;
200-
if (orderingKey != null) processingKeys.add(orderingKey);
201191
try {
202192
await handler(row.message);
203193
} finally {
204-
if (orderingKey != null) processingKeys.delete(orderingKey);
194+
// Release the distributed advisory lock if we acquired one
195+
if (orderingKey != null) {
196+
await this.#sql`
197+
SELECT pg_advisory_unlock(
198+
hashtext(${this.#tableName}),
199+
hashtext(${orderingKey})
200+
);
201+
`;
202+
}
205203
}
206204
i++;
207205
}

0 commit comments

Comments
 (0)