Skip to content

Commit 0a885cc

Browse files
committed
refactor: consolidate Pub/Sub code and reduce duplication
Move transient network error codes to shared TRANSIENT_NETWORK_CODES set in lib/consts.js, replacing 4 local copies. Extract setMetaFireAndForget and tryEnsurePubsub helpers in oauth2-apps.js. Extract getPubSubAppsForSelect helper in routes-ui.js. Simplify message processing loop in google.js with try/catch+continue flow. Simplify subscription settings POST with nullish coalescing.
1 parent 71b3ab4 commit 0a885cc

File tree

6 files changed

+105
-206
lines changed

6 files changed

+105
-206
lines changed

lib/consts.js

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,18 @@ module.exports = {
205205
OUTLOOK_RETRY_BASE_DELAY: 30, // seconds - base delay for exponential backoff
206206
OUTLOOK_RETRY_MAX_DELAY: 120, // seconds - maximum delay between retries
207207

208+
// Transient network error codes that indicate a retry-worthy connection failure
209+
TRANSIENT_NETWORK_CODES: new Set([
210+
'ENOTFOUND',
211+
'EAI_AGAIN',
212+
'ETIMEDOUT',
213+
'ECONNRESET',
214+
'ECONNREFUSED',
215+
'UND_ERR_SOCKET',
216+
'UND_ERR_CONNECT_TIMEOUT',
217+
'UND_ERR_HEADERS_TIMEOUT'
218+
]),
219+
208220
generateWebhookTable() {
209221
let entries = [];
210222

lib/email-client/gmail/gmail-api.js

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
'use strict';
22

33
const { metricsMeta } = require('../base-client');
4+
const { TRANSIENT_NETWORK_CODES } = require('../../consts');
45

56
// Gmail API configuration
67
const GMAIL_API_BASE = 'https://gmail.googleapis.com';
@@ -12,18 +13,6 @@ const LIST_BATCH_SIZE = 10;
1213
const MAX_RETRY_ATTEMPTS = 3;
1314
const RETRY_BASE_DELAY = 1000; // 1 second base delay
1415

15-
// Network-level errors that are transient and should be retried
16-
const TRANSIENT_NETWORK_CODES = new Set([
17-
'ENOTFOUND',
18-
'EAI_AGAIN',
19-
'ETIMEDOUT',
20-
'ECONNRESET',
21-
'ECONNREFUSED',
22-
'UND_ERR_SOCKET',
23-
'UND_ERR_CONNECT_TIMEOUT',
24-
'UND_ERR_HEADERS_TIMEOUT'
25-
]);
26-
2716
// Gmail API error code mapping to internal error codes
2817
// https://developers.google.com/gmail/api/reference/rest#error-codes
2918
const GMAIL_ERROR_MAP = {

lib/email-client/outlook/graph-api.js

Lines changed: 7 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2,23 +2,17 @@
22

33
const { metricsMeta } = require('../base-client');
44

5-
const { OUTLOOK_MAX_BATCH_SIZE, OUTLOOK_MAX_RETRY_ATTEMPTS, OUTLOOK_RETRY_BASE_DELAY, OUTLOOK_RETRY_MAX_DELAY } = require('../../consts');
5+
const {
6+
OUTLOOK_MAX_BATCH_SIZE,
7+
OUTLOOK_MAX_RETRY_ATTEMPTS,
8+
OUTLOOK_RETRY_BASE_DELAY,
9+
OUTLOOK_RETRY_MAX_DELAY,
10+
TRANSIENT_NETWORK_CODES
11+
} = require('../../consts');
612

713
// Maximum number of operations in a single batch request to Microsoft Graph API
814
const MAX_BATCH_SIZE = OUTLOOK_MAX_BATCH_SIZE;
915

10-
// Network-level errors that are transient and should be retried
11-
const TRANSIENT_NETWORK_CODES = new Set([
12-
'ENOTFOUND',
13-
'EAI_AGAIN',
14-
'ETIMEDOUT',
15-
'ECONNRESET',
16-
'ECONNREFUSED',
17-
'UND_ERR_SOCKET',
18-
'UND_ERR_CONNECT_TIMEOUT',
19-
'UND_ERR_HEADERS_TIMEOUT'
20-
]);
21-
2216
// MS Graph API error code mapping to internal error codes
2317
// https://learn.microsoft.com/en-us/graph/errors
2418
const GRAPH_ERROR_MAP = {

lib/oauth/pubsub/google.js

Lines changed: 20 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
'use strict';
22

33
const { redis } = require('../../db');
4-
const { REDIS_PREFIX } = require('../../consts');
4+
const { REDIS_PREFIX, TRANSIENT_NETWORK_CODES } = require('../../consts');
55
const logger = require('../../logger').child({
66
component: 'google-subscriber'
77
});
@@ -74,7 +74,7 @@ class PubSubInstance {
7474
async checkSchemaVersions() {
7575
let subscriberApps = await redis.smembers(this.getPubsubAppKey());
7676
let currentSchemaId = 3;
77-
for (let subscriberApp of subscriberApps || []) {
77+
for (let subscriberApp of subscriberApps) {
7878
let schemaVersion = Number(await redis.hget(`${REDIS_PREFIX}oapp:h:${subscriberApp}`, '__schemaVersion')) || 0;
7979
if (schemaVersion < currentSchemaId) {
8080
// migrate
@@ -111,7 +111,7 @@ class PubSubInstance {
111111

112112
let subscriberApps = await redis.smembers(this.getPubsubAppKey());
113113
let accountIds = new Set();
114-
for (let subscriberApp of subscriberApps || []) {
114+
for (let subscriberApp of subscriberApps) {
115115
let accountId = await redis.hget(`${REDIS_PREFIX}oapp:h:${subscriberApp}`, payload.emailAddress?.toLowerCase());
116116
if (accountId) {
117117
accountIds.add(accountId);
@@ -194,50 +194,27 @@ class PubSubInstance {
194194
return;
195195
}
196196

197-
let processingSuccess = false;
197+
let messageId = receivedMessage?.message?.messageId;
198+
198199
try {
199-
await this.processPulledMessage(
200-
receivedMessage?.message?.messageId,
201-
Buffer.from(receivedMessage?.message?.data || '', 'base64').toString()
202-
);
203-
processingSuccess = true;
200+
await this.processPulledMessage(messageId, Buffer.from(receivedMessage?.message?.data || '', 'base64').toString());
204201
} catch (err) {
205-
// Processing failed - don't ACK so message will be redelivered
206-
logger.error({
207-
msg: 'Failed to process subscription message',
208-
app: this.app,
209-
messageId: receivedMessage?.message?.messageId,
210-
err
211-
});
202+
// Processing failed - skip ACK so message will be redelivered
203+
logger.error({ msg: 'Failed to process subscription message', app: this.app, messageId, err });
204+
continue;
212205
}
213206

214-
// Only ACK after successful processing
215-
if (processingSuccess) {
216-
try {
217-
accessToken = await this.getAccessToken();
218-
if (!accessToken) {
219-
logger.error({
220-
msg: 'Failed to ack subscription message. No access token',
221-
app: this.app,
222-
messageId: receivedMessage?.message?.messageId
223-
});
224-
} else {
225-
await this.client.request(accessToken, acknowledgeUrl, 'POST', { ackIds: [receivedMessage?.ackId] }, { returnText: true });
226-
logger.debug({
227-
msg: 'Acked subscription message',
228-
app: this.app,
229-
messageId: receivedMessage?.message?.messageId
230-
});
231-
}
232-
} catch (err) {
233-
// failed to ack
234-
logger.error({
235-
msg: 'Failed to ack subscription message',
236-
app: this.app,
237-
messageId: receivedMessage?.message?.messageId,
238-
err
239-
});
207+
// ACK after successful processing
208+
try {
209+
accessToken = await this.getAccessToken();
210+
if (!accessToken) {
211+
logger.error({ msg: 'Failed to ack subscription message. No access token', app: this.app, messageId });
212+
} else {
213+
await this.client.request(accessToken, acknowledgeUrl, 'POST', { ackIds: [receivedMessage?.ackId] }, { returnText: true });
214+
logger.debug({ msg: 'Acked subscription message', app: this.app, messageId });
240215
}
216+
} catch (err) {
217+
logger.error({ msg: 'Failed to ack subscription message', app: this.app, messageId, err });
241218
}
242219
}
243220

@@ -247,18 +224,7 @@ class PubSubInstance {
247224
}
248225
} catch (err) {
249226
// Transient network errors are expected for long-polling connections
250-
if (
251-
[
252-
'ENOTFOUND',
253-
'EAI_AGAIN',
254-
'ETIMEDOUT',
255-
'ECONNRESET',
256-
'ECONNREFUSED',
257-
'UND_ERR_SOCKET',
258-
'UND_ERR_CONNECT_TIMEOUT',
259-
'UND_ERR_HEADERS_TIMEOUT'
260-
].includes(err.code)
261-
) {
227+
if (TRANSIENT_NETWORK_CODES.has(err.code)) {
262228
logger.warn({ msg: 'Transient error pulling subscription messages', app: this.app, code: err.code });
263229
return;
264230
}

lib/oauth2-apps.js

Lines changed: 39 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -3,25 +3,14 @@
33
const { redis } = require('./db');
44
const msgpack = require('msgpack5')();
55
const logger = require('./logger');
6-
const { REDIS_PREFIX } = require('./consts');
6+
const { REDIS_PREFIX, TRANSIENT_NETWORK_CODES } = require('./consts');
77
const { encrypt, decrypt } = require('./encrypt');
88
const Boom = require('@hapi/boom');
99
const settings = require('./settings');
1010
const Lock = require('ioredfour');
1111
const getSecret = require('./get-secret');
1212
const { parentPort } = require('worker_threads');
1313

14-
const TRANSIENT_NETWORK_CODES = new Set([
15-
'ENOTFOUND',
16-
'EAI_AGAIN',
17-
'ETIMEDOUT',
18-
'ECONNRESET',
19-
'ECONNREFUSED',
20-
'UND_ERR_SOCKET',
21-
'UND_ERR_CONNECT_TIMEOUT',
22-
'UND_ERR_HEADERS_TIMEOUT'
23-
]);
24-
2514
/**
2615
* Record metrics for OAuth2 token operations
2716
* Works in both main thread and worker threads
@@ -706,17 +695,7 @@ class OAuth2AppsHandler {
706695
created: true
707696
};
708697

709-
try {
710-
let appData = await this.get(id);
711-
if (appData.baseScopes === 'pubsub') {
712-
let pubsubUpdates = await this.ensurePubsub(appData);
713-
if (Object.keys(pubsubUpdates || {}).length) {
714-
result.pubsubUpdates = pubsubUpdates;
715-
}
716-
}
717-
} catch (err) {
718-
logger.error({ msg: 'Failed to set up pubsub', app: id, err });
719-
}
698+
await this.tryEnsurePubsub(id, result);
720699

721700
return result;
722701
}
@@ -790,17 +769,7 @@ class OAuth2AppsHandler {
790769
updated: true
791770
};
792771

793-
try {
794-
let appData = await this.get(id);
795-
if (appData.baseScopes === 'pubsub') {
796-
let pubsubUpdates = await this.ensurePubsub(appData);
797-
if (Object.keys(pubsubUpdates || {}).length) {
798-
result.pubsubUpdates = pubsubUpdates;
799-
}
800-
}
801-
} catch (err) {
802-
logger.error({ msg: 'Failed to set up pubsub', app: id, err });
803-
}
772+
await this.tryEnsurePubsub(id, result);
804773

805774
return result;
806775
}
@@ -876,6 +845,35 @@ class OAuth2AppsHandler {
876845
};
877846
}
878847

848+
/**
849+
* Fire-and-forget setMeta call that logs errors instead of throwing.
850+
* Used in ensurePubsub error handlers where we want to record a flag
851+
* but must not block or alter the original error flow.
852+
*/
853+
setMetaFireAndForget(appId, meta) {
854+
this.setMeta(appId, meta).catch(metaErr => {
855+
logger.error({ msg: 'Failed to set metadata', app: appId, err: metaErr });
856+
});
857+
}
858+
859+
/**
860+
* Try to set up Pub/Sub for the given app and attach results to the result object.
861+
* Errors are logged but not thrown so they do not prevent create/update from succeeding.
862+
*/
863+
async tryEnsurePubsub(id, result) {
864+
try {
865+
let appData = await this.get(id);
866+
if (appData.baseScopes === 'pubsub') {
867+
let pubsubUpdates = await this.ensurePubsub(appData);
868+
if (Object.keys(pubsubUpdates || {}).length) {
869+
result.pubsubUpdates = pubsubUpdates;
870+
}
871+
}
872+
} catch (err) {
873+
logger.error({ msg: 'Failed to set up pubsub', app: id, err });
874+
}
875+
}
876+
879877
async deleteTopic(appData) {
880878
let topicName = appData.pubSubTopic;
881879
let topicUrl = `https://pubsub.googleapis.com/v1/${topicName}`;
@@ -1008,22 +1006,18 @@ class OAuth2AppsHandler {
10081006
case 403:
10091007
// no permissions
10101008
if (/Cloud Pub\/Sub API has not been used in project/.test(err?.oauthRequest?.response?.error?.message)) {
1011-
this.setMeta(appData.id, {
1009+
this.setMetaFireAndForget(appData.id, {
10121010
authFlag: {
10131011
message:
10141012
'Enable the Cloud Pub/Sub API for your project before using the service client. Check the server response below for details.',
10151013
description: err?.oauthRequest?.response?.error?.message
10161014
}
1017-
}).catch(metaErr => {
1018-
logger.error({ msg: 'Failed to set authFlag metadata', app: appData.id, err: metaErr });
10191015
});
10201016
} else {
1021-
this.setMeta(appData.id, {
1017+
this.setMetaFireAndForget(appData.id, {
10221018
authFlag: {
10231019
message: 'Service client does not have permission to manage Pub/Sub topics. Grant the service user the "Pub/Sub Admin" role.'
10241020
}
1025-
}).catch(metaErr => {
1026-
logger.error({ msg: 'Failed to set authFlag metadata', app: appData.id, err: metaErr });
10271021
});
10281022
}
10291023

@@ -1057,13 +1051,11 @@ class OAuth2AppsHandler {
10571051
switch (err?.oauthRequest?.response?.error?.code) {
10581052
case 403:
10591053
// no permissions
1060-
this.setMeta(appData.id, {
1054+
this.setMetaFireAndForget(appData.id, {
10611055
authFlag: {
10621056
message:
10631057
'Service client does not have permission to manage Pub/Sub topics. Grant the service user the "Pub/Sub Admin" role.'
10641058
}
1065-
}).catch(metaErr => {
1066-
logger.error({ msg: 'Failed to set authFlag metadata', app: appData.id, err: metaErr });
10671059
});
10681060
throw err;
10691061
case 409:
@@ -1132,12 +1124,10 @@ class OAuth2AppsHandler {
11321124
switch (err?.oauthRequest?.response?.error?.code) {
11331125
case 403:
11341126
// no permissions
1135-
this.setMeta(appData.id, {
1127+
this.setMetaFireAndForget(appData.id, {
11361128
authFlag: {
11371129
message: 'Service client does not have permission to view Pub/Sub topics. Grant the service user the "Pub/Sub Admin" role.'
11381130
}
1139-
}).catch(metaErr => {
1140-
logger.error({ msg: 'Failed to set authFlag metadata', app: appData.id, err: metaErr });
11411131
});
11421132
throw err;
11431133
case 404: {
@@ -1190,13 +1180,11 @@ class OAuth2AppsHandler {
11901180
switch (err?.oauthRequest?.response?.error?.code) {
11911181
case 403:
11921182
// no permissions
1193-
this.setMeta(appData.id, {
1183+
this.setMetaFireAndForget(appData.id, {
11941184
authFlag: {
11951185
message:
11961186
'Service client does not have permission to manage Pub/Sub topics. Grant the service user the "Pub/Sub Admin" role.'
11971187
}
1198-
}).catch(metaErr => {
1199-
logger.error({ msg: 'Failed to set authFlag metadata', app: appData.id, err: metaErr });
12001188
});
12011189
throw err;
12021190
case 409:
@@ -1241,12 +1229,10 @@ class OAuth2AppsHandler {
12411229
switch (err?.oauthRequest?.response?.error?.code) {
12421230
case 403:
12431231
// no permissions
1244-
this.setMeta(appData.id, {
1232+
this.setMetaFireAndForget(appData.id, {
12451233
authFlag: {
12461234
message: 'Service client does not have permission to view Pub/Sub topics. Grant the service user the "Pub/Sub Admin" role.'
12471235
}
1248-
}).catch(metaErr => {
1249-
logger.error({ msg: 'Failed to set authFlag metadata', app: appData.id, err: metaErr });
12501236
});
12511237
throw err;
12521238
default:
@@ -1290,12 +1276,10 @@ class OAuth2AppsHandler {
12901276
switch (err?.oauthRequest?.response?.error?.code) {
12911277
case 403:
12921278
// no permissions
1293-
this.setMeta(appData.id, {
1279+
this.setMetaFireAndForget(appData.id, {
12941280
authFlag: {
12951281
message: 'Service client does not have permission to manage Pub/Sub topics. Grant the service user the "Pub/Sub Admin" role.'
12961282
}
1297-
}).catch(metaErr => {
1298-
logger.error({ msg: 'Failed to set authFlag metadata', app: appData.id, err: metaErr });
12991283
});
13001284
throw err;
13011285
default:

0 commit comments

Comments
 (0)