Commit 74492da

fix: more robust shutdown process (#511)
1 parent 187251c commit 74492da

13 files changed, +453 −139 lines
src/internal/concurrency/async-abort-controller.ts

Lines changed: 73 additions & 0 deletions
@@ -0,0 +1,73 @@
+/**
+ * This special AbortController is used to wait for all the abort handlers to finish before resolving the promise.
+ */
+export class AsyncAbortController extends AbortController {
+  protected promises: Promise<any>[] = []
+  protected priority = 0
+  protected groups = new Map<number, AsyncAbortController[]>()
+
+  constructor() {
+    super()
+
+    const originalEventListener = this.signal.addEventListener
+
+    // Patch addEventListener to keep track of listeners and their promises
+    this.signal.addEventListener = (type: string, listener: any, options: any) => {
+      if (type !== 'abort') {
+        return originalEventListener.call(this.signal, type, listener, options)
+      }
+
+      let resolving: undefined | (() => Promise<void>) = undefined
+      const promise = new Promise<void>(async (resolve, reject) => {
+        resolving = async (): Promise<void> => {
+          try {
+            const result = await listener()
+            resolve(result)
+          } catch (e) {
+            reject(e)
+          }
+        }
+      })
+      this.promises.push(promise)
+
+      if (!resolving) {
+        throw new Error('resolve is undefined')
+      }
+
+      return originalEventListener.call(this.signal, type, resolving, options)
+    }
+  }
+
+  protected _nextGroup?: AsyncAbortController
+
+  get nextGroup() {
+    if (!this._nextGroup) {
+      this._nextGroup = new AsyncAbortController()
+      this._nextGroup.priority = this.priority + 1
+    }
+
+    let existingGroups = this.groups.get(this._nextGroup.priority)
+    if (!existingGroups) {
+      existingGroups = []
+    }
+
+    existingGroups.push(this._nextGroup)
+    this.groups.set(this._nextGroup.priority, existingGroups)
+    return this._nextGroup
+  }
+
+  async abortAsync() {
+    this.abort()
+    while (this.promises.length > 0) {
+      const promises = this.promises.splice(0, 100)
+      await Promise.allSettled(promises)
+    }
+    await this.abortGroups()
+  }
+
+  protected async abortGroups() {
+    for (const [, group] of this.groups) {
+      await Promise.allSettled(group.map((g) => g.abortAsync()))
+    }
+  }
+}
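A minimal usage sketch (not part of the diff) of how the controller can coordinate an ordered shutdown. The import path assumes the barrel export added in the next file; closeHttpServer and closeDbPool are hypothetical cleanup helpers.

import { AsyncAbortController } from '@internal/concurrency'

// Hypothetical cleanup helpers, not part of this commit.
declare function closeHttpServer(): Promise<void>
declare function closeDbPool(): Promise<void>

const shutdown = new AsyncAbortController()

// Listeners added through the patched signal are tracked as promises.
shutdown.signal.addEventListener('abort', async () => {
  await closeHttpServer()
})

// Work that must only run after the first group has fully settled
// listens on the next group's signal instead.
shutdown.nextGroup.signal.addEventListener('abort', async () => {
  await closeDbPool()
})

process.once('SIGTERM', async () => {
  // Resolves only after every abort handler, group by group, has settled.
  await shutdown.abortAsync()
  process.exit(0)
})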

src/internal/concurrency/index.ts

Lines changed: 1 addition & 0 deletions
@@ -1 +1,2 @@
 export * from './mutex'
+export * from './async-abort-controller'

src/internal/database/migrations/migrate.ts

Lines changed: 10 additions & 2 deletions
@@ -11,6 +11,7 @@ import { listTenantsToMigrate } from '../tenant'
 import { multitenantKnex } from '../multitenant-db'
 import { ProgressiveMigrations } from './progressive'
 import { RunMigrationsOnTenants } from '@storage/events'
+import { ERRORS } from '@internal/errors'

 const {
   multitenantDatabaseUrl,
@@ -49,6 +50,9 @@ export const progressiveMigrations = new ProgressiveMigrations({
  * @param signal
  */
 export function startAsyncMigrations(signal: AbortSignal) {
+  if (signal.aborted) {
+    throw ERRORS.Aborted('Migration aborted')
+  }
   switch (dbMigrationStrategy) {
     case MultitenantMigrationStrategy.ON_REQUEST:
       return
@@ -127,14 +131,18 @@ export async function runMigrationsOnAllTenants(signal: AbortSignal) {
  * Runs multi-tenant migrations
  */
 export async function runMultitenantMigrations(): Promise<void> {
-  logger.info('Running multitenant migrations')
+  logSchema.info(logger, '[Migrations] Running multitenant migrations', {
+    type: 'migrations',
+  })
   await connectAndMigrate({
     databaseUrl: multitenantDatabaseUrl,
     migrationsDirectory: './migrations/multitenant',
     shouldCreateStorageSchema: false,
     waitForLock: true,
   })
-  logger.info('Multitenant migrations completed')
+  logSchema.info(logger, '[Migrations] Completed', {
+    type: 'migrations',
+  })
 }

 /**
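A hedged sketch of wiring the now-abortable entry point to a shutdown signal; the import path and the shutdownController name are illustrative, not taken from the commit.

import { startAsyncMigrations } from '@internal/database/migrations/migrate'

// Illustrative: any AbortController owned by the server bootstrap works here.
const shutdownController = new AbortController()

// Throws ERRORS.Aborted('Migration aborted') if the signal is already aborted,
// otherwise schedules async migrations according to dbMigrationStrategy.
startAsyncMigrations(shutdownController.signal)

// Later, during shutdown:
shutdownController.abort()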

src/internal/database/migrations/progressive.ts

Lines changed: 3 additions & 0 deletions
@@ -19,6 +19,9 @@ export class ProgressiveMigrations {
     signal.addEventListener('abort', () => {
       if (this.watchInterval) {
         clearInterval(this.watchInterval)
+        logSchema.info(logger, '[Migrations] Stopping', {
+          type: 'migrations',
+        })
         this.drain().catch((e) => {
           logSchema.error(logger, '[Migrations] Error creating migration jobs', {
             type: 'migrations',

src/internal/errors/codes.ts

Lines changed: 9 additions & 0 deletions
@@ -36,6 +36,7 @@ export enum ErrorCode {
   MissingPart = 'MissingPart',
   SlowDown = 'SlowDown',
   TusError = 'TusError',
+  Aborted = 'Aborted',
 }

 export const ERRORS = {
@@ -363,6 +364,14 @@ export const ERRORS = {
       httpStatusCode: 400,
       message: `Part ${partNumber} is missing for upload id ${uploadId}`,
     }),
+
+  Aborted: (message: string, originalError?: unknown) =>
+    new StorageBackendError({
+      code: ErrorCode.Aborted,
+      httpStatusCode: 500,
+      message: message,
+      originalError,
+    }),
 }

 export function isStorageError(errorType: ErrorCode, error: any): error is StorageBackendError {
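A short sketch (not from the diff) of raising and detecting the new Aborted error; the ensureNotAborted helper is hypothetical, and the re-export of ErrorCode and isStorageError from @internal/errors is assumed.

import { ERRORS, ErrorCode, isStorageError } from '@internal/errors'

// Hypothetical guard used at the start of abortable work.
function ensureNotAborted(signal: AbortSignal) {
  if (signal.aborted) {
    // Surfaces as code 'Aborted' with httpStatusCode 500.
    throw ERRORS.Aborted('Operation aborted before it could start')
  }
}

try {
  ensureNotAborted(new AbortController().signal)
} catch (e) {
  if (isStorageError(ErrorCode.Aborted, e)) {
    // Shutdown in progress: treat as a graceful stop rather than a failure.
  } else {
    throw e
  }
}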

src/internal/monitoring/otel.ts

Lines changed: 15 additions & 2 deletions
@@ -21,6 +21,7 @@ import { CompressionAlgorithm } from '@opentelemetry/otlp-exporter-base'
 import * as grpc from '@grpc/grpc-js'
 import { HttpInstrumentation } from '@opentelemetry/instrumentation-http'
 import { IncomingMessage } from 'http'
+import { logger, logSchema } from '@internal/monitoring/logger'

 const headersEnv = process.env.OTEL_EXPORTER_OTLP_TRACES_HEADERS || ''

@@ -125,9 +126,21 @@ if (process.env.OTEL_EXPORTER_OTLP_TRACES_ENDPOINT) {

   // Gracefully shutdown the SDK on process exit
   process.on('SIGTERM', () => {
+    logSchema.info(logger, '[Otel] Stopping', {
+      type: 'otel',
+    })
     sdk
       .shutdown()
-      .then(() => console.log('Tracing terminated'))
-      .catch((error) => console.error('Error terminating tracing', error))
+      .then(() => {
+        logSchema.info(logger, '[Otel] Exited', {
+          type: 'otel',
+        })
+      })
+      .catch((error) =>
+        logSchema.error(logger, '[Otel] Shutdown error', {
+          type: 'otel',
+          error: error,
+        })
+      )
   })
 }

src/internal/pubsub/adapter.ts

Lines changed: 1 addition & 1 deletion
@@ -1,5 +1,5 @@
 export interface PubSubAdapter {
-  connect(): Promise<void>
+  start(): Promise<void>
   publish(channel: string, message: any): Promise<void>
   subscribe(channel: string, cb: (message: any) => void): Promise<void>
   unsubscribe(channel: string, cb: (message: any) => void): Promise<void>

src/internal/pubsub/postgres.ts

Lines changed: 26 additions & 4 deletions
@@ -1,6 +1,8 @@
+import EventEmitter from 'events'
 import createSubscriber, { Subscriber } from 'pg-listen'
+import { ERRORS } from '@internal/errors'
+import { logger, logSchema } from '@internal/monitoring'
 import { PubSubAdapter } from './adapter'
-import EventEmitter from 'events'

 export class PostgresPubSub extends EventEmitter implements PubSubAdapter {
   isConnected = false
@@ -22,22 +24,42 @@ export class PostgresPubSub extends EventEmitter implements PubSubAdapter {
     })
   }

-  async connect(): Promise<void> {
+  async start(opts?: { signal?: AbortSignal }): Promise<void> {
+    if (opts?.signal?.aborted) {
+      throw ERRORS.Aborted('Postgres pubsub connection aborted')
+    }
+
     await this.subscriber.connect()
     this.isConnected = true

+    if (opts?.signal) {
+      opts.signal.addEventListener(
+        'abort',
+        async () => {
+          logSchema.info(logger, '[PubSub] Stopping', {
+            type: 'pubsub',
+          })
+          await this.close()
+        },
+        { once: true }
+      )
+    }
+
     await Promise.all(
       this.subscriber.notifications.eventNames().map(async (channel) => {
        return this.subscriber.listenTo(channel as string)
      })
    )
  }

-  close(): Promise<void> {
+  async close(): Promise<void> {
     this.subscriber.notifications.eventNames().forEach((event) => {
       this.subscriber.notifications.removeAllListeners(event)
     })
-    return this.subscriber.close()
+    await this.subscriber.close()
+    logSchema.info(logger, '[PubSub] Exited', {
+      type: 'pubsub',
+    })
   }

   async publish(channel: string, payload: unknown): Promise<void> {
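A hedged sketch of the new lifecycle from the caller's side. The import path and channel name are placeholders, and the PostgresPubSub instance is assumed to be constructed elsewhere (its constructor is outside this diff).

import { PostgresPubSub } from '@internal/pubsub/postgres'

// Assumed to be constructed elsewhere; the constructor is not part of this diff.
declare const pubsub: PostgresPubSub
const shutdown = new AbortController()

// Subscriptions registered before start() are picked up by its listenTo() loop.
await pubsub.subscribe('tenants_update', (message) => {
  console.log('received', message)
})

// start() throws ERRORS.Aborted if the signal is already aborted, and closes
// itself (logging '[PubSub] Stopping' / '[PubSub] Exited') when it fires.
await pubsub.start({ signal: shutdown.signal })

// Later, during shutdown:
shutdown.abort()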

src/internal/queue/queue.ts

Lines changed: 54 additions & 7 deletions
@@ -3,6 +3,7 @@ import { getConfig } from '../../config'
 import { BaseEvent, BasePayload } from '../../storage/events'
 import { QueueJobRetryFailed, QueueJobCompleted, QueueJobError } from '../monitoring/metrics'
 import { logger, logSchema } from '../monitoring'
+import { ERRORS } from '@internal/errors'

 //eslint-disable-next-line @typescript-eslint/no-explicit-any
 type SubclassOfBaseClass = (new (payload: any) => BaseEvent<any>) & {
@@ -13,11 +14,19 @@ export abstract class Queue {
   protected static events: SubclassOfBaseClass[] = []
   private static pgBoss?: PgBoss

-  static async init() {
+  static async start(opts: {
+    signal?: AbortSignal
+    onMessage?: (job: Job) => void
+    registerWorkers?: () => void
+  }) {
     if (Queue.pgBoss) {
       return Queue.pgBoss
     }

+    if (opts.signal?.aborted) {
+      throw ERRORS.Aborted('Cannot start queue with aborted signal')
+    }
+
     const {
       isMultitenant,
       databaseURL,
@@ -26,6 +35,7 @@
       pgQueueDeleteAfterDays,
       pgQueueArchiveCompletedAfterSeconds,
       pgQueueRetentionDays,
+      pgQueueEnableWorkers,
     } = getConfig()

     let url = pgQueueConnectionURL ?? databaseURL
@@ -59,7 +69,36 @@
     })

     await Queue.pgBoss.start()
-    await Queue.startWorkers()
+
+    if (opts.registerWorkers && pgQueueEnableWorkers) {
+      opts.registerWorkers()
+    }
+
+    await Queue.startWorkers(opts.onMessage)
+
+    if (opts.signal) {
+      opts.signal.addEventListener(
+        'abort',
+        async () => {
+          logSchema.info(logger, '[Queue] Stopping', {
+            type: 'queue',
+          })
+          return Queue.stop()
+            .then(() => {
+              logSchema.info(logger, '[Queue] Exited', {
+                type: 'queue',
+              })
+            })
+            .catch((e) => {
+              logSchema.error(logger, '[Queue] Error while stopping queue', {
+                error: e,
+                type: 'queue',
+              })
+            })
+        },
+        { once: true }
+      )
+    }

     return Queue.pgBoss
   }
@@ -85,25 +124,29 @@

     await boss.stop({
       timeout: 20 * 1000,
+      graceful: true,
+      destroy: true,
     })

     await new Promise((resolve) => {
-      boss.once('stopped', () => resolve(null))
+      boss.once('stopped', () => {
+        resolve(null)
+      })
     })

     Queue.pgBoss = undefined
   }

-  protected static startWorkers() {
+  protected static startWorkers(onMessage?: (job: Job) => void) {
     const workers: Promise<string>[] = []

     Queue.events.forEach((event) => {
-      workers.push(Queue.registerTask(event.getQueueName(), event, true))
+      workers.push(Queue.registerTask(event.getQueueName(), event, true, onMessage))

       const slowRetryQueue = event.withSlowRetryQueue()

       if (slowRetryQueue) {
-        workers.push(Queue.registerTask(event.getSlowRetryQueueName(), event, false))
+        workers.push(Queue.registerTask(event.getSlowRetryQueueName(), event, false, onMessage))
       }
     })

@@ -113,14 +156,18 @@
   protected static registerTask(
     queueName: string,
     event: SubclassOfBaseClass,
-    slowRetryQueueOnFail?: boolean
+    slowRetryQueueOnFail?: boolean,
+    onMessage?: (job: Job) => void
   ) {
     const hasSlowRetryQueue = event.withSlowRetryQueue()
     return Queue.getInstance().work(
       queueName,
       event.getWorkerOptions(),
       async (job: Job<BasePayload>) => {
         try {
+          if (onMessage) {
+            onMessage(job)
+          }
           const res = await event.handle(job)

           QueueJobCompleted.inc({
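A hedged sketch of booting the queue with the new options from a server entry point; the import path and the callback bodies are illustrative.

import { Job } from 'pg-boss'
import { Queue } from '@internal/queue'

const shutdown = new AbortController()

// start() throws ERRORS.Aborted on an already-aborted signal, only invokes
// registerWorkers when pgQueueEnableWorkers is set, and stops pg-boss
// gracefully (logging '[Queue] Stopping' / '[Queue] Exited') on abort.
await Queue.start({
  signal: shutdown.signal,
  registerWorkers: () => {
    // Illustrative: register BaseEvent subclasses here so their workers start.
  },
  onMessage: (job: Job) => {
    // Called before each job is handled, e.g. to track in-flight work.
  },
})

// Later, during shutdown:
shutdown.abort()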
