Skip to content

Commit a23c964

Browse files
authored
feat: pgboss v10 (#696)
1 parent d82ebec commit a23c964

32 files changed

+574
-448
lines changed

package-lock.json

Lines changed: 74 additions & 165 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@
7575
"multistream": "^4.1.0",
7676
"object-sizeof": "^2.6.4",
7777
"pg": "^8.12.0",
78-
"pg-boss": "^9.0.3",
78+
"pg-boss": "^10.2.0",
7979
"pg-listen": "^1.7.0",
8080
"pino": "^8.15.4",
8181
"pino-logflare": "^0.4.2",

src/admin-app.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ const build = (opts: FastifyServerOptions = {}, appInstance?: FastifyInstance):
1919
app.register(routes.jwks, { prefix: 'tenants' })
2020
app.register(routes.migrations, { prefix: 'migrations' })
2121
app.register(routes.s3Credentials, { prefix: 's3' })
22+
app.register(routes.queue, { prefix: 'queue' })
2223

2324
let registriesToMerge: Registry[] = []
2425

src/config.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,7 @@ type StorageConfigType = {
115115
pgQueueDeleteAfterDays?: number
116116
pgQueueArchiveCompletedAfterSeconds?: number
117117
pgQueueRetentionDays?: number
118+
pgQueueConcurrentTasksPerQueue: number
118119
webhookURL?: string
119120
webhookApiKey?: string
120121
webhookQueuePullInterval?: number
@@ -410,6 +411,10 @@ export function getConfig(options?: { reload?: boolean }): StorageConfigType {
410411
10
411412
),
412413
pgQueueRetentionDays: parseInt(getOptionalConfigFromEnv('PG_QUEUE_RETENTION_DAYS') || '2', 10),
414+
pgQueueConcurrentTasksPerQueue: parseInt(
415+
getOptionalConfigFromEnv('PG_QUEUE_CONCURRENT_TASKS_PER_QUEUE') || '50',
416+
10
417+
),
413418

414419
// Webhooks
415420
webhookURL: getOptionalConfigFromEnv('WEBHOOK_URL'),

src/http/routes/admin/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,3 +3,4 @@ export { default as tenants } from './tenants'
33
export { default as s3Credentials } from './s3'
44
export { default as objects } from './objects'
55
export { default as jwks } from './jwks'
6+
export { default as queue } from './queue'

src/http/routes/admin/queue.ts

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
import { FastifyInstance } from 'fastify'
2+
import apiKey from '../../plugins/apikey'
3+
import { getConfig } from '../../../config'
4+
import { UpgradePgBossV10 } from '@storage/events'
5+
6+
const { pgQueueEnable } = getConfig()
7+
8+
export default async function routes(fastify: FastifyInstance) {
9+
fastify.register(apiKey)
10+
11+
fastify.post('/migrate/pgboss-v10', async (req, reply) => {
12+
if (!pgQueueEnable) {
13+
return reply.status(400).send({ message: 'Queue is not enabled' })
14+
}
15+
16+
await UpgradePgBossV10.send({})
17+
18+
return reply.send({ message: 'Migration scheduled' })
19+
})
20+
}

src/internal/cluster/cluster.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,10 @@ export class Cluster {
2727
cluster
2828
.getClusterSize()
2929
.then((size) => {
30-
if (size !== Cluster.size) {
30+
if (size && size !== Cluster.size) {
3131
clusterEvent.emit('change', { size })
32+
Cluster.size = size
3233
}
33-
Cluster.size = size
3434
})
3535
.catch((e) => {
3636
console.error('Error getting cluster size', e)

src/internal/cluster/ecs.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ export class ClusterDiscoveryECS {
2626
const respMetadata = await axios.get(`${process.env.ECS_CONTAINER_METADATA_URI}/task`)
2727

2828
const command = new ListTasksCommand({
29-
serviceName: respMetadata.data.ServiceName,
29+
family: respMetadata.data.Family,
3030
cluster: respMetadata.data.Cluster,
3131
desiredStatus: status,
3232
})
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
import { DBMigration } from '@internal/database/migrations/types'
2+
import { ERRORS } from '@internal/errors'
3+
import { loadMigrationFiles } from 'postgres-migrations'
4+
import { getConfig } from '../../../config'
5+
6+
const { dbMigrationFreezeAt } = getConfig()
7+
8+
export const loadMigrationFilesCached = memoizePromise(loadMigrationFiles)
9+
10+
export const localMigrationFiles = () => loadMigrationFiles('./migrations/tenant')
11+
12+
export async function lastLocalMigrationName() {
13+
const migrations = await loadMigrationFilesCached('./migrations/tenant')
14+
15+
if (!dbMigrationFreezeAt) {
16+
return migrations[migrations.length - 1].name as keyof typeof DBMigration
17+
}
18+
19+
const migrationIndex = migrations.findIndex((m) => m.name === dbMigrationFreezeAt)
20+
if (migrationIndex === -1) {
21+
throw ERRORS.InternalError(undefined, `Migration ${dbMigrationFreezeAt} not found`)
22+
}
23+
return migrations[migrationIndex].name as keyof typeof DBMigration
24+
}
25+
26+
/**
27+
* Memoizes a promise
28+
* @param func
29+
*/
30+
function memoizePromise<T, Args extends unknown[]>(
31+
func: (...args: Args) => Promise<T>
32+
): (...args: Args) => Promise<T> {
33+
const cache = new Map<string, Promise<T>>()
34+
35+
function generateKey(args: Args): string {
36+
return args
37+
.map((arg) => {
38+
if (typeof arg === 'object' && arg !== null) {
39+
return Object.entries(arg).sort().toString()
40+
}
41+
return String(arg)
42+
})
43+
.join('|')
44+
}
45+
46+
return async function (...args: Args): Promise<T> {
47+
const key = generateKey(args)
48+
if (cache.has(key)) {
49+
return cache.get(key)!
50+
}
51+
52+
const result = func(...args)
53+
cache.set(key, result)
54+
return result
55+
}
56+
}
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
11
export * from './migrate'
2+
export * from './files'
23
export * from './types'

0 commit comments

Comments
 (0)