Skip to content

Commit 71b3ab4

Browse files
committed
fix: harden Pub/Sub shutdown, loop scheduling, and abort lifecycle
Signal the webhooks workers to stop their Pub/Sub pull loops on SIGTERM/SIGINT before closing the BullMQ queues, preventing 90s hangs caused by outstanding long-poll requests. Use setImmediate in the startLoop success path to prevent a tight microtask loop when run() resolves instantly. Close the _abortController null window by checking the stopped flag before clearing the controller. Snapshot the Map keys in stopAll() so that iteration stays safe while entries are being removed. Extract the duplicated backoff formula into _recoveryBackoffMs(). Fix the incorrect "Topic" error message in the subscription creation path so that it refers to the subscription.
1 parent 45f7b64 commit 71b3ab4

File tree

4 files changed

+119
-9
lines changed

4 files changed

+119
-9
lines changed

lib/oauth/pubsub/google.js

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ class PubSubInstance {
1717

1818
this.stopped = false;
1919
this._loopTimer = null;
20+
this._immediateHandle = null;
2021
this._abortController = null;
2122
this.recoveryAttempts = 0;
2223
this.lastRecoveryAttempt = 0;
@@ -43,7 +44,7 @@ class PubSubInstance {
4344
this.run()
4445
.then(() => {
4546
this._lastLoopError = null;
46-
this.startLoop();
47+
this._immediateHandle = setImmediate(() => this.startLoop());
4748
})
4849
.catch(err => {
4950
if (this.stopped || err.name === 'AbortError') {
@@ -170,11 +171,11 @@ class PubSubInstance {
170171
{ returnImmediately: false, maxMessages: 100 },
171172
{ signal: this._abortController.signal }
172173
);
173-
this._abortController = null;
174174
if (this.stopped) {
175-
// ignore if stopped
175+
this._abortController = null;
176176
return;
177177
}
178+
this._abortController = null;
178179

179180
let reqTime = Date.now() - start;
180181

@@ -274,7 +275,7 @@ class PubSubInstance {
274275

275276
async attemptRecovery(reason) {
276277
let now = Date.now();
277-
let backoffMs = Math.min(3000 * Math.pow(2, Math.min(this.recoveryAttempts, 20)), 5 * 60 * 1000);
278+
let backoffMs = this._recoveryBackoffMs();
278279
if (now - this.lastRecoveryAttempt < backoffMs) {
279280
let remainingMs = backoffMs - (now - this.lastRecoveryAttempt);
280281
let err = new Error(reason);
@@ -296,7 +297,7 @@ class PubSubInstance {
296297
this.recoveryAttempts = 0;
297298
logger.info({ msg: 'Successfully recovered Pub/Sub subscription', app: this.app, reason });
298299
} catch (recoveryErr) {
299-
let nextBackoffMs = Math.min(3000 * Math.pow(2, Math.min(this.recoveryAttempts, 20)), 5 * 60 * 1000);
300+
let nextBackoffMs = this._recoveryBackoffMs();
300301
logger.warn({ msg: 'Subscription recovery failed', app: this.app, reason, err: recoveryErr, nextRetryMs: nextBackoffMs });
301302
recoveryErr.retryDelay = nextBackoffMs;
302303
throw recoveryErr;
@@ -324,6 +325,10 @@ class PubSubInstance {
324325
await this.getClient();
325326
return await oauth2Apps.getServiceAccessToken(this.appData, this.client);
326327
}
328+
329+
_recoveryBackoffMs() {
330+
return Math.min(3000 * Math.pow(2, Math.min(this.recoveryAttempts, 20)), 5 * 60 * 1000);
331+
}
327332
}
328333

329334
class GooglePubSub {
@@ -356,6 +361,7 @@ class GooglePubSub {
356361
const instance = this.pubSubInstances.get(app);
357362
instance.stopped = true;
358363
clearTimeout(instance._loopTimer);
364+
clearImmediate(instance._immediateHandle);
359365
if (instance._abortController) {
360366
instance._abortController.abort();
361367
}
@@ -364,7 +370,7 @@ class GooglePubSub {
364370
}
365371

366372
stopAll() {
367-
for (let app of this.pubSubInstances.keys()) {
373+
for (let app of Array.from(this.pubSubInstances.keys())) {
368374
this.remove(app);
369375
}
370376
}

lib/oauth2-apps.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1165,7 +1165,7 @@ class OAuth2AppsHandler {
11651165
*/
11661166

11671167
if (!subscriptionCreateRes?.name) {
1168-
throw new Error('Topic was not created');
1168+
throw new Error('Subscription was not created');
11691169
}
11701170

11711171
await this.update(

server.js

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2853,6 +2853,18 @@ async function collectMetrics() {
28532853
*/
28542854
const closeQueues = cb => {
28552855
let proms = [];
2856+
2857+
// Signal webhooks workers to stop Pub/Sub pull loops before exiting
2858+
if (workers.has('webhooks')) {
2859+
for (let worker of workers.get('webhooks')) {
2860+
proms.push(
2861+
call(worker, { cmd: 'close' }).catch(err => {
2862+
logger.error({ msg: 'Failed to signal webhooks worker to close', err });
2863+
})
2864+
);
2865+
}
2866+
}
2867+
28562868
if (queueEvents.notify) {
28572869
proms.push(queueEvents.notify.close());
28582870
}
@@ -2882,7 +2894,7 @@ const closeQueues = cb => {
28822894
}
28832895
returned = true;
28842896
cb();
2885-
}, 2500);
2897+
}, 5000);
28862898

28872899
Promise.allSettled(proms).then(() => {
28882900
clearTimeout(closeTimeout);

test/pubsub-recovery-test.js

Lines changed: 93 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,7 @@ require.cache[getSecretPath] = {
195195
};
196196

197197
// Now safe to import production modules
198-
const { PubSubInstance } = require('../lib/oauth/pubsub/google');
198+
const { PubSubInstance, GooglePubSub } = require('../lib/oauth/pubsub/google');
199199
const { oauth2Apps } = require('../lib/oauth2-apps');
200200
const msgpack = require('msgpack5')();
201201
const { REDIS_PREFIX } = require('../lib/consts');
@@ -729,6 +729,98 @@ test('Pub/Sub subscription recovery tests', async t => {
729729
});
730730
});
731731

732+
// --- stopAll() and instance lifecycle tests ---
733+
734+
await t.test('stopAll() and instance lifecycle tests', async t2 => {
735+
await t2.test('stopAll() clears all instances safely', async () => {
736+
let pubsub = new GooglePubSub({ call: async () => {} });
737+
// Manually add instances without triggering constructor side effects
738+
for (let id of ['app-a', 'app-b', 'app-c']) {
739+
let instance = Object.create(PubSubInstance.prototype);
740+
Object.assign(instance, {
741+
app: id,
742+
stopped: false,
743+
_loopTimer: null,
744+
_immediateHandle: null,
745+
_abortController: null
746+
});
747+
pubsub.pubSubInstances.set(id, instance);
748+
}
749+
750+
assert.strictEqual(pubsub.pubSubInstances.size, 3, 'should have 3 instances');
751+
pubsub.stopAll();
752+
assert.strictEqual(pubsub.pubSubInstances.size, 0, 'all instances should be removed');
753+
});
754+
755+
await t2.test('stopAll() sets stopped flag on all instances', async () => {
756+
let pubsub = new GooglePubSub({ call: async () => {} });
757+
let instances = [];
758+
for (let id of ['app-x', 'app-y']) {
759+
let instance = Object.create(PubSubInstance.prototype);
760+
Object.assign(instance, {
761+
app: id,
762+
stopped: false,
763+
_loopTimer: null,
764+
_immediateHandle: null,
765+
_abortController: null
766+
});
767+
pubsub.pubSubInstances.set(id, instance);
768+
instances.push(instance);
769+
}
770+
771+
pubsub.stopAll();
772+
for (let inst of instances) {
773+
assert.strictEqual(inst.stopped, true, 'instance should be stopped');
774+
}
775+
});
776+
777+
await t2.test('remove() clears immediate handle', async () => {
778+
let pubsub = new GooglePubSub({ call: async () => {} });
779+
let instance = Object.create(PubSubInstance.prototype);
780+
Object.assign(instance, {
781+
app: 'imm-test',
782+
stopped: false,
783+
_loopTimer: null,
784+
_immediateHandle: setImmediate(() => {}),
785+
_abortController: null
786+
});
787+
pubsub.pubSubInstances.set('imm-test', instance);
788+
789+
pubsub.remove('imm-test');
790+
assert.strictEqual(instance.stopped, true, 'instance should be stopped');
791+
assert.strictEqual(pubsub.pubSubInstances.size, 0, 'instance should be removed from map');
792+
});
793+
});
794+
795+
// --- _recoveryBackoffMs() tests ---
796+
797+
await t.test('_recoveryBackoffMs() returns expected values', async t2 => {
798+
await t2.test('backoff at 0 attempts is 3000ms', () => {
799+
let instance = createTestInstance({ recoveryAttempts: 0 });
800+
assert.strictEqual(instance._recoveryBackoffMs(), 3000);
801+
});
802+
803+
await t2.test('backoff at 1 attempt is 6000ms', () => {
804+
let instance = createTestInstance({ recoveryAttempts: 1 });
805+
assert.strictEqual(instance._recoveryBackoffMs(), 6000);
806+
});
807+
808+
await t2.test('backoff at 5 attempts is 96000ms', () => {
809+
let instance = createTestInstance({ recoveryAttempts: 5 });
810+
assert.strictEqual(instance._recoveryBackoffMs(), 96000);
811+
});
812+
813+
await t2.test('backoff caps at 300000ms (5 minutes)', () => {
814+
let instance = createTestInstance({ recoveryAttempts: 20 });
815+
assert.strictEqual(instance._recoveryBackoffMs(), 300000);
816+
});
817+
818+
await t2.test('backoff stays capped beyond 20 attempts', () => {
819+
let instance = createTestInstance({ recoveryAttempts: 25 });
820+
assert.strictEqual(instance._recoveryBackoffMs(), 300000);
821+
});
822+
});
823+
732824
// --- del() subscriber cleanup tests ---
733825

734826
await t.test('del() subscriber cleanup tests', async t2 => {

0 commit comments

Comments
 (0)