Skip to content

Commit c5630b3

Browse files
authored
Merge pull request #207 from vernu/dev
enhance sms queue processing
2 parents fb3b2c1 + 4da8570 commit c5630b3

File tree

4 files changed

+196
-94
lines changed

4 files changed

+196
-94
lines changed

api/src/gateway/gateway.module.ts

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import { SMSBatch, SMSBatchSchema } from './schemas/sms-batch.schema'
1010
import { WebhookModule } from 'src/webhook/webhook.module'
1111
import { BillingModule } from 'src/billing/billing.module'
1212
import { BullModule } from '@nestjs/bull'
13-
import { ConfigModule } from '@nestjs/config'
13+
import { ConfigModule, ConfigService } from '@nestjs/config'
1414
import { SmsQueueService } from './queue/sms-queue.service'
1515
import { SmsQueueProcessor } from './queue/sms-queue.processor'
1616
import { SmsStatusUpdateTask } from './tasks/sms-status-update.task'
@@ -32,17 +32,25 @@ import { HeartbeatCheckTask } from './tasks/heartbeat-check.task'
3232
schema: SMSBatchSchema,
3333
},
3434
]),
35-
BullModule.registerQueue({
35+
BullModule.registerQueueAsync({
3636
name: 'sms',
37-
defaultJobOptions: {
38-
attempts: 2,
39-
backoff: {
40-
type: 'exponential',
41-
delay: 1000,
37+
imports: [ConfigModule],
38+
inject: [ConfigService],
39+
useFactory: async (configService: ConfigService) => ({
40+
limiter: {
41+
max: configService.get<number>('SMS_QUEUE_LIMITER_MAX', 20),
42+
duration: configService.get<number>('SMS_QUEUE_LIMITER_DURATION_MS', 1000),
4243
},
43-
removeOnComplete: { age: 24 * 3600 }, // 24 hours
44-
removeOnFail: { age: 72 * 3600 }, // 72 hours
45-
},
44+
defaultJobOptions: {
45+
attempts: 2,
46+
backoff: {
47+
type: 'exponential',
48+
delay: 1000,
49+
},
50+
removeOnComplete: { age: 24 * 3600 }, // 24 hours
51+
removeOnFail: { age: 72 * 3600 }, // 72 hours
52+
},
53+
}),
4654
}),
4755
AuthModule,
4856
UsersModule,

api/src/gateway/gateway.service.ts

Lines changed: 70 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -478,6 +478,13 @@ export class GatewayService {
478478

479479
// Track FCM messages with their calculated delays for grouping
480480
const fcmMessagesWithDelays: Array<{ message: Message; delayMs?: number }> = []
481+
const smsDocumentsToInsert: Array<Record<string, any>> = []
482+
const smsToFcmMetadata: Array<{
483+
recipient: string
484+
message: string
485+
simSubscriptionId?: number
486+
delayMs?: number
487+
}> = []
481488

482489
for (const smsData of messages) {
483490
const message = smsData.message
@@ -495,8 +502,8 @@ export class GatewayService {
495502
const delayMs = this.calculateDelayFromScheduledAt(smsData.scheduledAt)
496503

497504
for (let recipient of recipients) {
498-
recipient = recipient.replace(/\s+/g, "")
499-
const sms = await this.smsModel.create({
505+
recipient = recipient.replace(/\s+/g, "")
506+
smsDocumentsToInsert.push({
500507
device: device._id,
501508
smsBatch: smsBatch._id,
502509
message: message,
@@ -508,32 +515,73 @@ export class GatewayService {
508515
simSubscriptionId: smsData.simSubscriptionId,
509516
}),
510517
})
511-
const updatedSMSData = {
512-
smsId: sms._id,
513-
smsBatchId: smsBatch._id,
518+
smsToFcmMetadata.push({
519+
recipient,
514520
message,
515-
recipients: [recipient],
516521
...(smsData.simSubscriptionId !== undefined && {
517522
simSubscriptionId: smsData.simSubscriptionId,
518523
}),
524+
delayMs,
525+
})
526+
}
527+
}
519528

520-
// Legacy fields to be removed in the future
521-
smsBody: message,
522-
receivers: [recipient],
523-
}
524-
const stringifiedSMSData = JSON.stringify(updatedSMSData)
529+
const insertChunkSize = 500
530+
const insertedSmsDocs: any[] = []
531+
const hasInsertMany = typeof (this.smsModel as any).insertMany === 'function'
532+
for (let i = 0; i < smsDocumentsToInsert.length; i += insertChunkSize) {
533+
const chunk = smsDocumentsToInsert.slice(i, i + insertChunkSize)
534+
if (hasInsertMany) {
535+
const insertedChunk = await (this.smsModel as any).insertMany(chunk, { ordered: true })
536+
insertedSmsDocs.push(...insertedChunk)
537+
continue
538+
}
525539

526-
const fcmMessage: Message = {
527-
data: {
528-
smsData: stringifiedSMSData,
529-
},
530-
token: device.fcmToken,
531-
android: {
532-
priority: 'high',
533-
},
534-
}
535-
fcmMessagesWithDelays.push({ message: fcmMessage, delayMs })
540+
// Fallback for mocked/non-standard models that don't expose insertMany
541+
for (const smsDocument of chunk) {
542+
const createdSmsDoc = await this.smsModel.create(smsDocument)
543+
insertedSmsDocs.push(createdSmsDoc)
544+
}
545+
}
546+
547+
if (insertedSmsDocs.length !== smsToFcmMetadata.length) {
548+
throw new HttpException(
549+
{
550+
success: false,
551+
error: 'Failed to map created SMS records to queue payload',
552+
},
553+
HttpStatus.INTERNAL_SERVER_ERROR,
554+
)
555+
}
556+
557+
for (let i = 0; i < insertedSmsDocs.length; i++) {
558+
const sms = insertedSmsDocs[i]
559+
const metadata = smsToFcmMetadata[i]
560+
const updatedSMSData = {
561+
smsId: sms._id,
562+
smsBatchId: smsBatch._id,
563+
message: metadata.message,
564+
recipients: [metadata.recipient],
565+
...(metadata.simSubscriptionId !== undefined && {
566+
simSubscriptionId: metadata.simSubscriptionId,
567+
}),
568+
569+
// Legacy fields to be removed in the future
570+
smsBody: metadata.message,
571+
receivers: [metadata.recipient],
572+
}
573+
const stringifiedSMSData = JSON.stringify(updatedSMSData)
574+
575+
const fcmMessage: Message = {
576+
data: {
577+
smsData: stringifiedSMSData,
578+
},
579+
token: device.fcmToken,
580+
android: {
581+
priority: 'high',
582+
},
536583
}
584+
fcmMessagesWithDelays.push({ message: fcmMessage, delayMs: metadata.delayMs })
537585
}
538586

539587
// Check if we should use the queue
@@ -673,7 +721,7 @@ export class GatewayService {
673721
!dto.sender ||
674722
!dto.message
675723
) {
676-
console.log('Invalid received SMS data')
724+
console.error(`receiveSMS: Invalid received SMS data (sender: ${dto.sender}, message: ${dto.message}) (receivedAt: ${dto.receivedAt}, receivedAtInMillis: ${dto.receivedAtInMillis})`)
677725
throw new HttpException(
678726
{
679727
success: false,

api/src/gateway/queue/sms-queue.processor.ts

Lines changed: 100 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -89,37 +89,23 @@ export class SmsQueueProcessor {
8989
// )
9090

9191
// Mark individual SMS records as failed when their FCM push failed
92+
const failedSmsIds: string[] = []
93+
const failedUpdates: Array<{
94+
smsId: string
95+
errorCode: string
96+
errorMessage: string
97+
}> = []
9298
for (let i = 0; i < response.responses.length; i++) {
9399
if (!response.responses[i].success) {
94100
try {
95101
const smsData = JSON.parse(fcmMessages[i].data.smsData)
96102
const fcmError = response.responses[i].error
97-
const updatedSms = await this.smsModel.findByIdAndUpdate(
98-
smsData.smsId,
99-
{
100-
$set: {
101-
status: 'failed',
102-
failedAt: new Date(),
103-
errorCode: getFcmErrorCode(fcmError ?? undefined),
104-
errorMessage: getFcmErrorMessage(fcmError ?? undefined),
105-
},
106-
},
107-
{ new: true },
108-
)
109-
if (device?.user && updatedSms) {
110-
await this.webhookService
111-
.deliverNotification({
112-
sms: updatedSms,
113-
user: device.user as any,
114-
event: WebhookEvent.MESSAGE_FAILED,
115-
})
116-
.catch((e) =>
117-
this.logger.warn(
118-
`Webhook delivery failed for SMS ${updatedSms._id}`,
119-
e?.message,
120-
),
121-
)
122-
}
103+
failedSmsIds.push(String(smsData.smsId))
104+
failedUpdates.push({
105+
smsId: String(smsData.smsId),
106+
errorCode: getFcmErrorCode(fcmError ?? undefined),
107+
errorMessage: getFcmErrorMessage(fcmError ?? undefined),
108+
})
123109
} catch (parseError) {
124110
this.logger.error(
125111
`Failed to mark SMS as failed for FCM message index ${i}`,
@@ -131,13 +117,12 @@ export class SmsQueueProcessor {
131117

132118
// Mark individual SMS records as dispatched when FCM push succeeded
133119
const now = new Date()
120+
const dispatchedSmsIds: string[] = []
134121
for (let i = 0; i < response.responses.length; i++) {
135122
if (response.responses[i].success) {
136123
try {
137124
const smsData = JSON.parse(fcmMessages[i].data.smsData)
138-
await this.smsModel.findByIdAndUpdate(smsData.smsId, {
139-
$set: { status: 'dispatched', dispatchedAt: now },
140-
})
125+
dispatchedSmsIds.push(String(smsData.smsId))
141126
} catch (parseError) {
142127
this.logger.error(
143128
`Failed to mark SMS as dispatched for FCM message index ${i}`,
@@ -147,6 +132,52 @@ export class SmsQueueProcessor {
147132
}
148133
}
149134

135+
if (failedUpdates.length > 0) {
136+
const failedAt = new Date()
137+
for (const failedUpdate of failedUpdates) {
138+
await this.smsModel.updateOne(
139+
{ _id: failedUpdate.smsId as any },
140+
{
141+
$set: {
142+
status: 'failed',
143+
failedAt,
144+
errorCode: failedUpdate.errorCode,
145+
errorMessage: failedUpdate.errorMessage,
146+
},
147+
},
148+
)
149+
}
150+
}
151+
152+
if (dispatchedSmsIds.length > 0) {
153+
await this.smsModel.updateMany(
154+
{ _id: { $in: dispatchedSmsIds } as any },
155+
{
156+
$set: { status: 'dispatched', dispatchedAt: now },
157+
},
158+
)
159+
}
160+
161+
if (device?.user && failedSmsIds.length > 0) {
162+
const failedSmsDocuments = await this.smsModel.find({
163+
_id: { $in: failedSmsIds },
164+
})
165+
for (const failedSms of failedSmsDocuments) {
166+
await this.webhookService
167+
.deliverNotification({
168+
sms: failedSms,
169+
user: device.user as any,
170+
event: WebhookEvent.MESSAGE_FAILED,
171+
})
172+
.catch((e) =>
173+
this.logger.warn(
174+
`Webhook delivery failed for SMS ${failedSms._id}`,
175+
e?.message,
176+
),
177+
)
178+
}
179+
}
180+
150181
// Update device SMS count
151182
await this.deviceModel
152183
.findByIdAndUpdate(deviceId, {
@@ -181,38 +212,11 @@ export class SmsQueueProcessor {
181212
this.logger.error(`Failed to process SMS job ${job.id}`, error)
182213

183214
// Mark all individual SMS in this batch of FCM messages as failed
215+
const failedSmsIds: string[] = []
184216
for (const fcmMessage of fcmMessages) {
185217
try {
186218
const smsData = JSON.parse(fcmMessage.data.smsData)
187-
const updatedSms = await this.smsModel.findByIdAndUpdate(
188-
smsData.smsId,
189-
{
190-
$set: {
191-
status: 'failed',
192-
failedAt: new Date(),
193-
errorCode:
194-
(error as any)?.code != null
195-
? getFcmErrorCode(error as any)
196-
: 'FCM_SEND_ERROR',
197-
errorMessage: getFcmErrorMessage(error as any),
198-
},
199-
},
200-
{ new: true },
201-
)
202-
if (device?.user && updatedSms) {
203-
await this.webhookService
204-
.deliverNotification({
205-
sms: updatedSms,
206-
user: device.user as any,
207-
event: WebhookEvent.MESSAGE_FAILED,
208-
})
209-
.catch((e) =>
210-
this.logger.warn(
211-
`Webhook delivery failed for SMS ${updatedSms._id}`,
212-
e?.message,
213-
),
214-
)
215-
}
219+
failedSmsIds.push(String(smsData.smsId))
216220
} catch (parseError) {
217221
this.logger.error(
218222
'Failed to mark SMS as failed after FCM error',
@@ -221,6 +225,44 @@ export class SmsQueueProcessor {
221225
}
222226
}
223227

228+
if (failedSmsIds.length > 0) {
229+
const failedAt = new Date()
230+
await this.smsModel.updateMany(
231+
{ _id: { $in: failedSmsIds } as any },
232+
{
233+
$set: {
234+
status: 'failed',
235+
failedAt,
236+
errorCode:
237+
(error as any)?.code != null
238+
? getFcmErrorCode(error as any)
239+
: 'FCM_SEND_ERROR',
240+
errorMessage: getFcmErrorMessage(error as any),
241+
},
242+
},
243+
)
244+
}
245+
246+
if (device?.user && failedSmsIds.length > 0) {
247+
const failedSmsDocuments = await this.smsModel.find({
248+
_id: { $in: failedSmsIds },
249+
})
250+
for (const failedSms of failedSmsDocuments) {
251+
await this.webhookService
252+
.deliverNotification({
253+
sms: failedSms,
254+
user: device.user as any,
255+
event: WebhookEvent.MESSAGE_FAILED,
256+
})
257+
.catch((e) =>
258+
this.logger.warn(
259+
`Webhook delivery failed for SMS ${failedSms._id}`,
260+
e?.message,
261+
),
262+
)
263+
}
264+
}
265+
224266
const smsBatch = await this.smsBatchModel.findByIdAndUpdate(
225267
smsBatchId,
226268
{

0 commit comments

Comments
 (0)