Skip to content

Commit 0a5702f

Browse files
committed
GroupedAsyncAdapter, events in priority
1 parent b96baa2 commit 0a5702f

File tree

3 files changed

+55
-2
lines changed

3 files changed

+55
-2
lines changed

src/adapters/abstract/grouped-async.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@ export abstract class GroupedAsyncAdapter extends BaseAdapter implements Adapter
2020
new PQueue({ concurrency: 1 }).once('idle', () => this.queues.delete(message.key))
2121
)
2222
}
23-
resp.push(this.queues.get(message.key)!.add(() => this.baseHandleMessage(message)))
23+
resp.push(
24+
this.queues.get(message.key)!.add(() => this.baseHandleMessage(message), { priority: message.is_event ? 1 : 0 })
25+
)
2426
}
2527
return await Promise.all(resp as ReturnType<typeof this.baseHandleMessage>[])
2628
}

src/transfer.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,8 @@ export class Transfer implements StartStop {
135135
context_id,
136136
error,
137137
attempts,
138-
since_at
138+
since_at,
139+
is_event
139140
from pg_trx_outbox${
140141
this.options.outboxOptions?.partition == null ? '' : `_${this.options.outboxOptions?.partition}`
141142
}

test/adapters.e2e.ts

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -264,3 +264,53 @@ test('GroupedAsyncAdapter works', async () => {
264264
assert.deepStrictEqual(resp[0].value, handledMessages![0]?.value)
265265
assert.deepStrictEqual(resp[1].value, handledMessages![1]?.value)
266266
})
267+
268+
test('GroupedAsyncAdapter events in priority rather than commands', async () => {
269+
const handledMessages: OutboxMessage[] = []
270+
271+
pgTrxOutbox = new PgTrxOutbox({
272+
adapter: new (class extends GroupedAsyncAdapter {
273+
async start() {}
274+
async stop() {}
275+
override async onHandled(): Promise<void> {}
276+
async handleMessage(message: OutboxMessage) {
277+
handledMessages.push(message)
278+
return { value: { success: true } }
279+
}
280+
})(),
281+
pgOptions: {
282+
host: pgDocker.getHost(),
283+
port: pgDocker.getPort(),
284+
user: pgDocker.getUsername(),
285+
password: pgDocker.getPassword(),
286+
database: pgDocker.getDatabase(),
287+
},
288+
outboxOptions: {
289+
concurrency: true,
290+
pollInterval: 300,
291+
},
292+
})
293+
await pgTrxOutbox.start()
294+
await pg.query(
295+
`
296+
INSERT INTO pg_trx_outbox (topic, "key", value, is_event)
297+
VALUES
298+
('pg.trx.outbox', 'testKey', '{"ok": true}', false),
299+
('pg.trx.outbox', 'testKey', '{"ok": true}', true)
300+
`
301+
)
302+
await setTimeout(1000)
303+
304+
const resp = await pg.query<OutboxMessage>('select * from pg_trx_outbox order by id').then(r => r.rows)
305+
306+
assert.deepEqual(resp[0]?.response, { success: true })
307+
assert.strictEqual(resp[0]?.error, null)
308+
309+
assert.deepEqual(resp[1]?.response, { success: true })
310+
assert.strictEqual(resp[1]?.error, null)
311+
312+
assert.deepStrictEqual(
313+
handledMessages.map(e => e.is_event),
314+
[true, false]
315+
)
316+
})

0 commit comments

Comments
 (0)