Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions packages/batch/src/BasePartialProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -239,9 +239,7 @@ abstract class BasePartialProcessor {
/**
* Processes records in parallel using `Promise.all`.
*/
async #processRecordsInParallel(): Promise<
(SuccessResponse | FailureResponse)[]
> {
#processRecordsInParallel(): Promise<(SuccessResponse | FailureResponse)[]> {
return Promise.all(
this.records.map((record) => this.processRecord(record))
);
Expand Down
2 changes: 1 addition & 1 deletion packages/batch/src/BatchProcessorSync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ import type { BaseRecord, FailureResponse, SuccessResponse } from './types.js';
*
* @param _record The record to be processed
*/
public async processRecord(
public processRecord(
_record: BaseRecord
): Promise<SuccessResponse | FailureResponse> {
throw new BatchProcessingError('Not implemented. Use process() instead.');
Expand Down
55 changes: 43 additions & 12 deletions packages/batch/tests/helpers/handlers.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import type {
AttributeValue,
Context,
DynamoDBRecord,
KinesisStreamRecord,
Expand All @@ -8,50 +9,70 @@ import type {
const sqsRecordHandler = (record: SQSRecord): string => {
const body = record.body;
if (body.includes('fail')) {
throw Error('Failed to process record.');
throw new Error('Failed to process record.');
}

return body;
};

const asyncSqsRecordHandler = async (record: SQSRecord): Promise<string> =>
Promise.resolve(sqsRecordHandler(record));
const asyncSqsRecordHandler = async (record: SQSRecord): Promise<string> => {
const body = record.body;
if (body.includes('fail')) {
throw new Error('Failed to process record.');
}
await new Promise((resolve) => setTimeout(resolve, 1));
return body;
};

const kinesisRecordHandler = (record: KinesisStreamRecord): string => {
const body = record.kinesis.data;
if (body.includes('fail')) {
throw Error('Failed to process record.');
throw new Error('Failed to process record.');
}

return body;
};

const asyncKinesisRecordHandler = async (
record: KinesisStreamRecord
): Promise<string> => Promise.resolve(kinesisRecordHandler(record));
): Promise<string> => {
const body = record.kinesis.data;
if (body.includes('fail')) {
throw new Error('Failed to process record.');
}
await new Promise((resolve) => setTimeout(resolve, 1));
return body;
};

const dynamodbRecordHandler = (record: DynamoDBRecord): object => {
const dynamodbRecordHandler = (record: DynamoDBRecord): AttributeValue => {
const body = record.dynamodb?.NewImage?.Message || { S: 'fail' };
if (body.S?.includes('fail')) {
throw Error('Failed to process record.');
throw new Error('Failed to process record.');
}

return body;
};

const asyncDynamodbRecordHandler = async (
record: DynamoDBRecord
): Promise<object> => {
return Promise.resolve(dynamodbRecordHandler(record));
): Promise<AttributeValue> => {
const body = record.dynamodb?.NewImage?.Message || { S: 'fail' };
if (body.S?.includes('fail')) {
throw new Error('Failed to process record.');
}
await new Promise((resolve) => setTimeout(resolve, 1));
return body;
};

const handlerWithContext = (record: SQSRecord, context: Context): string => {
try {
if (context.getRemainingTimeInMillis() === 0) {
throw Error('No time remaining.');
throw new Error('No time remaining.');
}
} catch {
throw Error(`Context possibly malformed. Displaying context:\n${context}`);
throw new Error(
`Context possibly malformed. Displaying context:\n${context}`
);
}

return record.body;
Expand All @@ -61,7 +82,17 @@ const asyncHandlerWithContext = async (
record: SQSRecord,
context: Context
): Promise<string> => {
return Promise.resolve(handlerWithContext(record, context));
try {
if (context.getRemainingTimeInMillis() === 0) {
throw new Error('No time remaining.');
}
} catch {
throw new Error(
`Context possibly malformed. Displaying context:\n${context}`
);
}
await new Promise((resolve) => setTimeout(resolve, 1));
return record.body;
};

export {
Expand Down
2 changes: 1 addition & 1 deletion packages/batch/tests/unit/BasePartialProcessor.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ describe('Class: BasePartialBatchProcessor', () => {
super(EventType.SQS);
}

public async processRecord(
public processRecord(
_record: BaseRecord
): Promise<SuccessResponse | FailureResponse> {
throw new Error('Not implemented');
Expand Down
2 changes: 1 addition & 1 deletion packages/batch/tests/unit/SqsFifoPartialProcessor.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ describe('SQS FIFO Processors', () => {
});
}

it('continues processing and moves to the next group when `skipGroupOnError` is true', async () => {
it('continues processing and moves to the next group when `skipGroupOnError` is true', () => {
// Prepare
const firstRecord = sqsRecordFactory('fail', '1');
const secondRecord = sqsRecordFactory('success', '2');
Expand Down
6 changes: 3 additions & 3 deletions packages/batch/tests/unit/processPartialResponse.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ describe('Function: processPartialResponse()', () => {
context,
};

const handlerWithSqsEvent = async (
const handlerWithSqsEvent = (
event: SQSEvent,
options: BatchProcessingOptions
) => {
Expand All @@ -51,7 +51,7 @@ describe('Function: processPartialResponse()', () => {
return handler(event, context);
};

const handlerWithKinesisEvent = async (
const handlerWithKinesisEvent = (
event: KinesisStreamEvent,
options: BatchProcessingOptions
) => {
Expand All @@ -71,7 +71,7 @@ describe('Function: processPartialResponse()', () => {
return handler(event, context);
};

const handlerWithDynamoDBEvent = async (
const handlerWithDynamoDBEvent = (
event: DynamoDBStreamEvent,
options: BatchProcessingOptions
) => {
Expand Down
Loading