Skip to content

Commit bbc914c

Browse files
committed
CCM-12875: update so invalid events are removed before attempting an upload
1 parent cc6f3b6 commit bbc914c

File tree

4 files changed

+91
-35
lines changed

4 files changed

+91
-35
lines changed

lambdas/pdm-uploader-lambda/src/__tests__/apis/sqs-trigger-lambda.test.ts

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,34 @@ describe('sqs-trigger-lambda', () => {
186186
);
187187
});
188188

189+
it('should handle unexpected error in validateRecord', async () => {
190+
const handler = createHandler({
191+
uploadToPdm: mockUploadToPdm,
192+
eventPublisher: mockEventPublisher,
193+
logger: mockLogger,
194+
});
195+
const sqsEvent = createValidSQSEvent({
196+
Records: [
197+
{
198+
...createValidSQSEvent().Records[0],
199+
body: 'I-am-not-json',
200+
},
201+
],
202+
});
203+
204+
const result = await handler(sqsEvent);
205+
206+
expect(result.batchItemFailures).toEqual([{ itemIdentifier: 'msg-1' }]);
207+
expect(mockLogger.error).toHaveBeenCalledWith(
208+
expect.objectContaining({
209+
description: 'Error parsing SQS record',
210+
err: expect.objectContaining({
211+
message: expect.stringContaining('Unexpected token'),
212+
}),
213+
}),
214+
);
215+
});
216+
189217
it('should handle invalid message body', async () => {
190218
const handler = createHandler({
191219
uploadToPdm: mockUploadToPdm,

lambdas/pdm-uploader-lambda/src/__tests__/infra/pdm-api-client.test.ts

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -155,10 +155,7 @@ describe('PdmClient', () => {
155155
conditionalRetry.mockRejectedValue(mockError);
156156

157157
await expect(
158-
pdmClient.createDocumentReference(
159-
mockFhirRequest,
160-
mockRequestId,
161-
),
158+
pdmClient.createDocumentReference(mockFhirRequest, mockRequestId),
162159
).rejects.toThrow('Network error');
163160

164161
expect(mockLogger.error).toHaveBeenCalledWith({

lambdas/pdm-uploader-lambda/src/apis/sqs-trigger-lambda.ts

Lines changed: 61 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,10 @@ import type {
1212
import messageDownloadedValidator from 'digital-letters-events/MESHInboxMessageDownloaded.js';
1313
import pdmResourceSubmittedValidator from 'digital-letters-events/PDMResourceSubmitted.js';
1414
import pdmResourceSubmissionRejectedValidator from 'digital-letters-events/PDMResourceSubmissionRejected.js';
15-
import { MESHInboxMessageDownloaded, PDMResourceSubmitted } from 'digital-letters-events';
15+
import {
16+
MESHInboxMessageDownloaded,
17+
PDMResourceSubmitted,
18+
} from 'digital-letters-events';
1619
import { EventPublisher, Logger } from 'utils';
1720

1821
interface ProcessingResult {
@@ -26,12 +29,15 @@ interface CreateHandlerDependencies {
2629
logger: Logger;
2730
}
2831

29-
async function processRecord(
32+
interface ValidatedRecord {
33+
messageId: string;
34+
event: MESHInboxMessageDownloaded;
35+
}
36+
37+
function validateRecord(
3038
{ body, messageId }: { body: string; messageId: string },
31-
uploadToPdm: UploadToPdm,
3239
logger: Logger,
33-
batchItemFailures: SQSBatchItemFailure[],
34-
): Promise<ProcessingResult> {
40+
): ValidatedRecord | null {
3541
try {
3642
const sqsEventBody = JSON.parse(body);
3743
const sqsEventDetail = sqsEventBody.detail;
@@ -42,20 +48,34 @@ async function processRecord(
4248
err: messageDownloadedValidator.errors,
4349
description: 'Error parsing queue entry',
4450
});
45-
batchItemFailures.push({ itemIdentifier: messageId });
46-
return { result: { outcome: 'failed' }, item: sqsEventDetail };
51+
return null;
4752
}
4853

49-
const messageDownloadedEvent: MESHInboxMessageDownloaded = sqsEventDetail;
54+
return { messageId, event: sqsEventDetail };
55+
} catch (error) {
56+
logger.error({
57+
err: error,
58+
description: 'Error parsing SQS record',
59+
});
60+
return null;
61+
}
62+
}
5063

51-
const result = await uploadToPdm.send(messageDownloadedEvent);
64+
async function processRecord(
65+
{ event, messageId }: ValidatedRecord,
66+
uploadToPdm: UploadToPdm,
67+
logger: Logger,
68+
batchItemFailures: SQSBatchItemFailure[],
69+
): Promise<ProcessingResult> {
70+
try {
71+
const result = await uploadToPdm.send(event);
5272

5373
if (result.outcome === 'failed') {
5474
batchItemFailures.push({ itemIdentifier: messageId });
55-
return { result: { outcome: 'failed' }, item: sqsEventDetail };
75+
return { result: { outcome: 'failed' }, item: event };
5676
}
5777

58-
return { result, item: sqsEventDetail };
78+
return { result, item: event };
5979
} catch (error) {
6080
logger.error({
6181
err: error,
@@ -120,24 +140,25 @@ async function publishSuccessfulEvents(
120140
if (successfulItems.length === 0) return;
121141

122142
try {
123-
const submittedFailedEvents = await eventPublisher.sendEvents<PDMResourceSubmitted>(
124-
successfulItems.map(({ event, resourceId }) => ({
125-
...event,
126-
id: randomUUID(),
127-
time: new Date().toISOString(),
128-
recordedtime: new Date().toISOString(),
129-
type: 'uk.nhs.notify.digital.letters.pdm.resource.submitted.v1',
130-
dataschema:
131-
'https://notify.nhs.uk/cloudevents/schemas/digital-letters/2025-10-draft/data/digital-letters-pdm-resource-submitted-data.schema.json',
132-
source: event.source.replace(/\/mesh$/, '/pdm'),
133-
data: {
134-
messageReference: event.data.messageReference,
135-
senderId: event.data.senderId,
136-
resourceId,
137-
},
138-
})),
139-
pdmResourceSubmittedValidator,
140-
);
143+
const submittedFailedEvents =
144+
await eventPublisher.sendEvents<PDMResourceSubmitted>(
145+
successfulItems.map(({ event, resourceId }) => ({
146+
...event,
147+
id: randomUUID(),
148+
time: new Date().toISOString(),
149+
recordedtime: new Date().toISOString(),
150+
type: 'uk.nhs.notify.digital.letters.pdm.resource.submitted.v1',
151+
dataschema:
152+
'https://notify.nhs.uk/cloudevents/schemas/digital-letters/2025-10-draft/data/digital-letters-pdm-resource-submitted-data.schema.json',
153+
source: event.source.replace(/\/mesh$/, '/pdm'),
154+
data: {
155+
messageReference: event.data.messageReference,
156+
senderId: event.data.senderId,
157+
resourceId,
158+
},
159+
})),
160+
pdmResourceSubmittedValidator,
161+
);
141162
if (submittedFailedEvents.length > 0) {
142163
logger.warn({
143164
description: 'Some successful events failed to publish',
@@ -203,7 +224,17 @@ export const createHandler = ({
203224
async function handler(sqsEvent: SQSEvent): Promise<SQSBatchResponse> {
204225
const batchItemFailures: SQSBatchItemFailure[] = [];
205226

206-
const promises = sqsEvent.Records.map((record) =>
227+
const validatedRecords: ValidatedRecord[] = [];
228+
for (const record of sqsEvent.Records) {
229+
const validated = validateRecord(record, logger);
230+
if (validated) {
231+
validatedRecords.push(validated);
232+
} else {
233+
batchItemFailures.push({ itemIdentifier: record.messageId });
234+
}
235+
}
236+
237+
const promises = validatedRecords.map((record) =>
207238
processRecord(record, uploadToPdm, logger, batchItemFailures),
208239
);
209240

lambdas/pdm-uploader-lambda/src/app/upload-to-pdm.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ export class UploadToPdm {
1818
async send(event: MESHInboxMessageDownloaded): Promise<UploadToPdmResult> {
1919
try {
2020
const fhirRequest = await getS3ObjectFromUri(event.data.messageUri);
21-
const messageReference = event.data.messageReference;
21+
const { messageReference } = event.data;
2222

2323
const response = await this.pdmClient.createDocumentReference(
2424
fhirRequest,

0 commit comments

Comments
 (0)