diff --git a/packages/batch/src/BasePartialProcessor.ts b/packages/batch/src/BasePartialProcessor.ts index 23b7cdc9cc..da09f22557 100644 --- a/packages/batch/src/BasePartialProcessor.ts +++ b/packages/batch/src/BasePartialProcessor.ts @@ -239,9 +239,7 @@ abstract class BasePartialProcessor { /** * Processes records in parallel using `Promise.all`. */ - async #processRecordsInParallel(): Promise< - (SuccessResponse | FailureResponse)[] - > { + #processRecordsInParallel(): Promise<(SuccessResponse | FailureResponse)[]> { return Promise.all( this.records.map((record) => this.processRecord(record)) ); diff --git a/packages/batch/src/BatchProcessorSync.ts b/packages/batch/src/BatchProcessorSync.ts index a3613a3d51..45f47a6e21 100644 --- a/packages/batch/src/BatchProcessorSync.ts +++ b/packages/batch/src/BatchProcessorSync.ts @@ -88,7 +88,7 @@ import type { BaseRecord, FailureResponse, SuccessResponse } from './types.js'; * * @param _record The record to be processed */ - public async processRecord( + public processRecord( _record: BaseRecord ): Promise { throw new BatchProcessingError('Not implemented. Use process() instead.'); diff --git a/packages/batch/tests/helpers/handlers.ts b/packages/batch/tests/helpers/handlers.ts index 24dff02799..5f0b63eb88 100644 --- a/packages/batch/tests/helpers/handlers.ts +++ b/packages/batch/tests/helpers/handlers.ts @@ -1,4 +1,6 @@ +import { setTimeout } from 'node:timers/promises'; import type { + AttributeValue, Context, DynamoDBRecord, KinesisStreamRecord, @@ -8,19 +10,25 @@ import type { const sqsRecordHandler = (record: SQSRecord): string => { const body = record.body; if (body.includes('fail')) { - throw Error('Failed to process record.'); + throw new Error('Failed to process record.'); } return body; }; -const asyncSqsRecordHandler = async (record: SQSRecord): Promise => - Promise.resolve(sqsRecordHandler(record)); +const asyncSqsRecordHandler = async (record: SQSRecord): Promise => { + const body = record.body; + if (body.includes('fail')) { + throw new Error('Failed to process record.'); + } + await setTimeout(1); // simulate some processing time + return body; +}; const kinesisRecordHandler = (record: KinesisStreamRecord): string => { const body = record.kinesis.data; if (body.includes('fail')) { - throw Error('Failed to process record.'); + throw new Error('Failed to process record.'); } return body; @@ -28,12 +36,19 @@ const kinesisRecordHandler = (record: KinesisStreamRecord): string => { const asyncKinesisRecordHandler = async ( record: KinesisStreamRecord -): Promise => Promise.resolve(kinesisRecordHandler(record)); +): Promise => { + const body = record.kinesis.data; + if (body.includes('fail')) { + throw new Error('Failed to process record.'); + } + await setTimeout(1); // simulate some processing time + return body; +}; -const dynamodbRecordHandler = (record: DynamoDBRecord): object => { +const dynamodbRecordHandler = (record: DynamoDBRecord): AttributeValue => { const body = record.dynamodb?.NewImage?.Message || { S: 'fail' }; if (body.S?.includes('fail')) { - throw Error('Failed to process record.'); + throw new Error('Failed to process record.'); } return body; @@ -41,17 +56,24 @@ const dynamodbRecordHandler = (record: DynamoDBRecord): object => { const asyncDynamodbRecordHandler = async ( record: DynamoDBRecord -): Promise => { - return Promise.resolve(dynamodbRecordHandler(record)); +): Promise => { + const body = record.dynamodb?.NewImage?.Message || { S: 'fail' }; + if (body.S?.includes('fail')) { + throw new Error('Failed to process record.'); + } + await setTimeout(1); // simulate some processing time + return body; }; const handlerWithContext = (record: SQSRecord, context: Context): string => { try { if (context.getRemainingTimeInMillis() === 0) { - throw Error('No time remaining.'); + throw new Error('No time remaining.'); } } catch { - throw Error(`Context possibly malformed. Displaying context:\n${context}`); + throw new Error( + `Context possibly malformed. Displaying context:\n${context}` + ); } return record.body; @@ -61,7 +83,17 @@ const asyncHandlerWithContext = async ( record: SQSRecord, context: Context ): Promise => { - return Promise.resolve(handlerWithContext(record, context)); + try { + if (context.getRemainingTimeInMillis() === 0) { + throw new Error('No time remaining.'); + } + } catch { + throw new Error( + `Context possibly malformed. Displaying context:\n${context}` + ); + } + await setTimeout(1); // simulate some processing time + return record.body; }; export { diff --git a/packages/batch/tests/unit/BasePartialProcessor.test.ts b/packages/batch/tests/unit/BasePartialProcessor.test.ts index 6c5421c01b..5e61ebe294 100644 --- a/packages/batch/tests/unit/BasePartialProcessor.test.ts +++ b/packages/batch/tests/unit/BasePartialProcessor.test.ts @@ -25,7 +25,7 @@ describe('Class: BasePartialBatchProcessor', () => { super(EventType.SQS); } - public async processRecord( + public processRecord( _record: BaseRecord ): Promise { throw new Error('Not implemented'); diff --git a/packages/batch/tests/unit/SqsFifoPartialProcessor.test.ts b/packages/batch/tests/unit/SqsFifoPartialProcessor.test.ts index 6071464039..a4b55da062 100644 --- a/packages/batch/tests/unit/SqsFifoPartialProcessor.test.ts +++ b/packages/batch/tests/unit/SqsFifoPartialProcessor.test.ts @@ -186,7 +186,7 @@ describe('SQS FIFO Processors', () => { }); } - it('continues processing and moves to the next group when `skipGroupOnError` is true', async () => { + it('continues processing and moves to the next group when `skipGroupOnError` is true', () => { // Prepare const firstRecord = sqsRecordFactory('fail', '1'); const secondRecord = sqsRecordFactory('success', '2'); diff --git a/packages/batch/tests/unit/processPartialResponse.test.ts b/packages/batch/tests/unit/processPartialResponse.test.ts index d9c6d56a20..60b6a31cb4 100644 --- a/packages/batch/tests/unit/processPartialResponse.test.ts +++ b/packages/batch/tests/unit/processPartialResponse.test.ts @@ -36,10 +36,10 @@ describe('Function: processPartialResponse()', () => { context, }; - const handlerWithSqsEvent = async ( + const handlerWithSqsEvent = ( event: SQSEvent, options: BatchProcessingOptions - ) => { + ): Promise => { const processor = new BatchProcessor(EventType.SQS); const handler = async ( @@ -51,10 +51,10 @@ describe('Function: processPartialResponse()', () => { return handler(event, context); }; - const handlerWithKinesisEvent = async ( + const handlerWithKinesisEvent = ( event: KinesisStreamEvent, options: BatchProcessingOptions - ) => { + ): Promise => { const processor = new BatchProcessor(EventType.KinesisDataStreams); const handler = async ( @@ -71,10 +71,10 @@ describe('Function: processPartialResponse()', () => { return handler(event, context); }; - const handlerWithDynamoDBEvent = async ( + const handlerWithDynamoDBEvent = ( event: DynamoDBStreamEvent, options: BatchProcessingOptions - ) => { + ): Promise => { const processor = new BatchProcessor(EventType.DynamoDBStreams); const handler = async (