Skip to content

Commit 495a0bc

Browse files
authored
fix: use exactly once queue semantics for specific events (#755)
1 parent 6e33350 commit 495a0bc

File tree

12 files changed

+188
-21
lines changed

12 files changed

+188
-21
lines changed

package-lock.json

Lines changed: 8 additions & 9 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@
7676
"multistream": "^4.1.0",
7777
"object-sizeof": "^2.6.4",
7878
"pg": "^8.12.0",
79-
"pg-boss": "^10.2.0",
79+
"pg-boss": "github:supabase/pg-boss#feat/exactly_once",
8080
"pg-listen": "^1.7.0",
8181
"pino": "^9.7.0",
8282
"pino-logflare": "^0.5.2",

src/http/routes/admin/queue.ts

Lines changed: 44 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,34 @@
1-
import { FastifyInstance } from 'fastify'
1+
import { FastifyInstance, RequestGenericInterface } from 'fastify'
22
import apiKey from '../../plugins/apikey'
33
import { getConfig } from '../../../config'
4-
import { UpgradePgBossV10 } from '@storage/events'
4+
import { MoveJobs, UpgradePgBossV10 } from '@storage/events'
5+
import { FromSchema } from 'json-schema-to-ts'
56

67
const { pgQueueEnable } = getConfig()
78

9+
const moveJobsSchema = {
10+
body: {
11+
type: 'object',
12+
properties: {
13+
fromQueue: {
14+
type: 'string',
15+
},
16+
toQueue: {
17+
type: 'string',
18+
},
19+
deleteJobsFromOriginalQueue: {
20+
type: 'boolean',
21+
default: false,
22+
},
23+
},
24+
required: ['fromQueue', 'toQueue'],
25+
},
26+
} as const
27+
28+
interface MoveJobsRequestInterface extends RequestGenericInterface {
29+
Body: FromSchema<typeof moveJobsSchema.body>
30+
}
31+
832
export default async function routes(fastify: FastifyInstance) {
933
fastify.register(apiKey)
1034

@@ -17,4 +41,22 @@ export default async function routes(fastify: FastifyInstance) {
1741

1842
return reply.send({ message: 'Migration scheduled' })
1943
})
44+
45+
fastify.post<MoveJobsRequestInterface>('/move', async (req, reply) => {
46+
if (!pgQueueEnable) {
47+
return reply.status(400).send({ message: 'Queue is not enabled' })
48+
}
49+
50+
const fromQueue = req.body.fromQueue
51+
const toQueue = req.body.toQueue
52+
const deleteJobsFromOriginalQueue = req.body.deleteJobsFromOriginalQueue || false
53+
54+
await MoveJobs.send({
55+
fromQueue,
56+
toQueue,
57+
deleteJobsFromOriginalQueue,
58+
})
59+
60+
return reply.send({ message: 'Move jobs scheduled' })
61+
})
2062
}

src/internal/queue/queue.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ export abstract class Queue {
6565

6666
Queue.pgBoss = new PgBoss({
6767
connectionString: url,
68-
68+
migrate: true,
6969
db: Queue.pgBossDb,
7070
schema: 'pgboss_v10',
7171
application_name: 'storage-pgboss',

src/storage/events/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,4 +12,5 @@ export * from './migrations/run-migrations'
1212
export * from './migrations/reset-migrations'
1313
export * from './jwks/jwks-create-signing-secret'
1414
export * from './pgboss/upgrade-v10'
15+
export * from './pgboss/move-jobs'
1516
export * from './workers'

src/storage/events/jwks/jwks-create-signing-secret.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,12 @@ interface JwksCreateSigningSecretPayload extends BasePayload {
99
}
1010

1111
export class JwksCreateSigningSecret extends BaseEvent<JwksCreateSigningSecretPayload> {
12-
static queueName = 'tenants-jwks-create'
12+
static queueName = 'tenants-jwks-create-v2'
1313

1414
static getQueueOptions(): Queue {
1515
return {
1616
name: this.queueName,
17-
policy: 'stately',
17+
policy: 'exactly_once',
1818
} as const
1919
}
2020

src/storage/events/migrations/reset-migrations.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,12 @@ interface ResetMigrationsPayload extends BasePayload {
1313
}
1414

1515
export class ResetMigrationsOnTenant extends BaseEvent<ResetMigrationsPayload> {
16-
static queueName = 'tenants-migrations-reset'
16+
static queueName = 'tenants-migrations-reset-v2'
1717

1818
static getQueueOptions(): Queue {
1919
return {
2020
name: this.queueName,
21-
policy: 'stately',
21+
policy: 'exactly_once',
2222
} as const
2323
}
2424

src/storage/events/migrations/run-migrations.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,13 @@ interface RunMigrationsPayload extends BasePayload {
1717
}
1818

1919
export class RunMigrationsOnTenants extends BaseEvent<RunMigrationsPayload> {
20-
static queueName = 'tenants-migrations'
20+
static queueName = 'tenants-migrations-v2'
2121
static allowSync = false
2222

2323
static getQueueOptions(): Queue {
2424
return {
2525
name: this.queueName,
26-
policy: 'stately',
26+
policy: 'exactly_once',
2727
} as const
2828
}
2929

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
import { BaseEvent } from '../base-event'
2+
import { Job, Queue as PgBossQueue, SendOptions, WorkOptions } from 'pg-boss'
3+
import { BasePayload, Queue } from '@internal/queue'
4+
import { multitenantKnex } from '@internal/database'
5+
import { logger, logSchema } from '@internal/monitoring'
6+
7+
interface MoveJobsPayload extends BasePayload {
8+
fromQueue: string
9+
toQueue: string
10+
deleteJobsFromOriginalQueue?: boolean
11+
}
12+
13+
export class MoveJobs extends BaseEvent<MoveJobsPayload> {
14+
static queueName = 'move-jobs'
15+
16+
static getQueueOptions(): PgBossQueue {
17+
return {
18+
name: this.queueName,
19+
policy: 'exactly_once',
20+
} as const
21+
}
22+
23+
static getWorkerOptions(): WorkOptions {
24+
return {
25+
includeMetadata: true,
26+
}
27+
}
28+
29+
static getSendOptions(payload: MoveJobsPayload): SendOptions {
30+
return {
31+
expireInHours: 2,
32+
singletonKey: `move_${payload.fromQueue}_to_${payload.toQueue}`,
33+
singletonHours: 12,
34+
retryLimit: 3,
35+
retryDelay: 5,
36+
priority: 10,
37+
}
38+
}
39+
40+
static async handle(job: Job<MoveJobsPayload>) {
41+
await multitenantKnex.transaction(async (tnx) => {
42+
const resultLock = await tnx.raw('SELECT pg_try_advisory_xact_lock(-5525285245963000611)')
43+
const lockAcquired = resultLock.rows.shift()?.pg_try_advisory_xact_lock || false
44+
45+
if (!lockAcquired) {
46+
return
47+
}
48+
49+
const schema = 'pgboss_v10'
50+
const fromQueueName = job.data.fromQueue
51+
const toQueue = await Queue.getInstance().getQueue(job.data.toQueue)
52+
53+
if (!toQueue) {
54+
logSchema.error(logger, `[PgBoss] Target queue ${job.data.toQueue} does not exist`, {
55+
type: 'pgboss',
56+
})
57+
return
58+
}
59+
60+
try {
61+
const sql = `
62+
INSERT INTO ${schema}.job (
63+
id,
64+
name,
65+
priority,
66+
data,
67+
retry_limit,
68+
retry_count,
69+
retry_delay,
70+
retry_backoff,
71+
start_after,
72+
singleton_key,
73+
singleton_on,
74+
expire_in,
75+
created_on,
76+
keep_until,
77+
output,
78+
policy,
79+
state
80+
)
81+
SELECT
82+
id,
83+
'${toQueue.name}' as name,
84+
priority,
85+
data,
86+
retry_limit,
87+
retry_count,
88+
retry_delay,
89+
retry_backoff,
90+
start_after,
91+
singleton_key,
92+
singleton_on,
93+
expire_in,
94+
created_on,
95+
keep_until,
96+
output,
97+
'${toQueue.policy}' as policy,
98+
'created' as state
99+
FROM ${schema}.job
100+
WHERE name = '${fromQueueName}'
101+
AND state IN ('created', 'active', 'retry')
102+
ON CONFLICT DO NOTHING
103+
`
104+
105+
await tnx.raw(sql)
106+
107+
if (job.data.deleteJobsFromOriginalQueue) {
108+
const deleteSql = `
109+
DELETE FROM ${schema}.job
110+
WHERE name = '${fromQueueName}'
111+
AND state IN ('created', 'active', 'retry')
112+
`
113+
await tnx.raw(deleteSql)
114+
}
115+
} catch (error) {
116+
logSchema.error(logger, '[PgBoss] Error while copying jobs', {
117+
type: 'pgboss',
118+
error,
119+
})
120+
}
121+
})
122+
}
123+
}

src/storage/events/pgboss/upgrade-v10.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ export class UpgradePgBossV10 extends BaseEvent<UpgradePgBossV10Payload> {
1212
static getQueueOptions(): PgBossQueue {
1313
return {
1414
name: this.queueName,
15-
policy: 'stately',
15+
policy: 'exactly_once',
1616
} as const
1717
}
1818

0 commit comments

Comments
 (0)