Merged

Changes from all commits (24 commits)
d625ed2
Integrated Parser for the SQS event type
sdangol Sep 2, 2025
4e69aec
Removed the parsing from the toBatchType function and created it's ow…
sdangol Sep 3, 2025
0a60d4f
Added test to check for invalid event type
sdangol Sep 3, 2025
d0b3d23
Added documentation for the function
sdangol Sep 3, 2025
d6d81f9
Moved the build order of parser before the batch processor
sdangol Sep 3, 2025
b9ef42b
Added condition to do the extended schema parsing only when a zod sch…
sdangol Sep 3, 2025
8c1ad65
Moved the parser as a dev dependency
sdangol Sep 3, 2025
e7c8585
Updated lock file
sdangol Sep 3, 2025
b30e3be
Integrated Parser for the DynamoDB event type
sdangol Sep 3, 2025
f248e9f
Fixed the tests for DynamoDB record processing
sdangol Sep 3, 2025
7120337
Removed unused imports and unused comments
sdangol Sep 3, 2025
b33832f
Integrated Parser for the Kinesis event type
sdangol Sep 4, 2025
f1e8f70
Added tests for parser integration with kinesis and some refactoring
sdangol Sep 4, 2025
8322cdc
Merge branch 'main' into feat/parser-integration-batch-processing
dreamorosi Sep 4, 2025
ce3fc4f
Fixed the SonarQube finding
sdangol Sep 4, 2025
7c4cba8
Merge branch 'feat/parser-integration-batch-processing' of github.com…
sdangol Sep 4, 2025
ee115dd
Marked schema property as protected
sdangol Sep 4, 2025
f36d513
Added braces for the default block
sdangol Sep 4, 2025
29da8ff
Implemented parsing with and without the transformers
sdangol Sep 4, 2025
4f38337
Refactored the tests to reduce duplicate code
sdangol Sep 5, 2025
e73a331
Merge branch 'main' into feat/parser-integration-batch-processing
dreamorosi Sep 5, 2025
b314fb7
Fixed formatting and documentation issues
sdangol Sep 8, 2025
3ec45b2
Merge branch 'feat/parser-integration-batch-processing' of github.com…
sdangol Sep 8, 2025
016394b
Merge branch 'main' into feat/parser-integration-batch-processing
sdangol Sep 8, 2025
3 changes: 2 additions & 1 deletion package-lock.json


2 changes: 1 addition & 1 deletion package.json
@@ -10,9 +10,9 @@
     "packages/tracer",
     "packages/parameters",
     "packages/idempotency",
+    "packages/parser",
     "packages/batch",
     "packages/testing",
-    "packages/parser",
     "examples/snippets",
     "layers",
     "examples/app",
3 changes: 2 additions & 1 deletion packages/batch/package.json
@@ -73,6 +73,7 @@
     "nodejs"
   ],
   "devDependencies": {
-    "@aws-lambda-powertools/testing-utils": "file:../testing"
+    "@aws-lambda-powertools/testing-utils": "file:../testing",
+    "@aws-lambda-powertools/parser": "2.25.2"
   }
 }
15 changes: 14 additions & 1 deletion packages/batch/src/BasePartialBatchProcessor.ts
@@ -1,3 +1,4 @@
+import type { StandardSchemaV1 } from '@standard-schema/spec';
 import type {
   DynamoDBRecord,
   KinesisStreamRecord,
@@ -11,6 +12,7 @@ import {
 } from './constants.js';
 import { FullBatchFailureError } from './errors.js';
 import type {
+  BasePartialBatchProcessorConfig,
   EventSourceDataClassTypes,
   PartialItemFailureResponse,
   PartialItemFailures,
@@ -42,12 +44,20 @@ abstract class BasePartialBatchProcessor extends BasePartialProcessor {
    */
   public eventType: keyof typeof EventType;
 
+  /**
+   * The schema of the body of the event record for parsing
+   */
+  protected schema?: StandardSchemaV1;
+
   /**
    * Initializes base batch processing class
    *
    * @param eventType The type of event to process (SQS, Kinesis, DynamoDB)
    */
-  public constructor(eventType: keyof typeof EventType) {
+  public constructor(
+    eventType: keyof typeof EventType,
+    config?: BasePartialBatchProcessorConfig
+  ) {
     super();
     this.eventType = eventType;
     this.batchResponse = DEFAULT_RESPONSE;
@@ -56,6 +66,9 @@ abstract class BasePartialBatchProcessor extends BasePartialProcessor {
       [EventType.KinesisDataStreams]: () => this.collectKinesisFailures(),
       [EventType.DynamoDBStreams]: () => this.collectDynamoDBFailures(),
     };
+    if (config) {
+      this.schema = config.schema;
+    }
   }
 
   /**
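With these changes, a processor can be handed an optional schema at construction time. The following is a minimal sketch of the new surface, assuming `BasePartialBatchProcessorConfig` is shaped as `{ schema?: StandardSchemaV1 }` (its definition lives in `types.js` and is not part of this diff) and using an illustrative Zod schema:

import { z } from 'zod';
import { BatchProcessor, EventType } from '@aws-lambda-powertools/batch';

// Illustrative schema for the JSON payload carried in each record body
const orderSchema = z.object({
  orderId: z.string(),
  amount: z.number(),
});

// The optional config is stored on the protected `schema` property,
// which BatchProcessor.processRecord() later checks before parsing.
const processor = new BatchProcessor(EventType.SQS, { schema: orderSchema });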
18 changes: 9 additions & 9 deletions packages/batch/src/BasePartialProcessor.ts
@@ -74,8 +74,8 @@ abstract class BasePartialProcessor {
    * This method should be called when a record fails processing so that
    * the processor can keep track of the error and the record that failed.
    *
-   * @param record Record that failed processing
-   * @param error Error that was thrown
+   * @param record - Record that failed processing
+   * @param error - Error that was thrown
    */
   public failureHandler(
     record: EventSourceDataClassTypes,
@@ -131,7 +131,7 @@
    * This is to ensure that the processor keeps track of the results and the records
    * that succeeded and failed processing.
    *
-   * @param record Record to be processed
+   * @param record - Record to be processed
    */
   public abstract processRecord(
     record: BaseRecord
@@ -149,7 +149,7 @@
    * This is to ensure that the processor keeps track of the results and the records
    * that succeeded and failed processing.
    *
-   * @param record Record to be processed
+   * @param record - Record to be processed
    */
   public abstract processRecordSync(
     record: BaseRecord
@@ -198,9 +198,9 @@
    * to allow for reusing the processor instance across multiple invocations
    * by instantiating the processor outside of the Lambda function handler.
    *
-   * @param records Array of records to be processed
-   * @param handler CallableFunction to process each record from the batch
-   * @param options Options to be used during processing (optional)
+   * @param records - Array of records to be processed
+   * @param handler - CallableFunction to process each record from the batch
+   * @param options - Options to be used during processing (optional)
    */
   public register(
     records: BaseRecord[],
@@ -223,8 +223,8 @@
    * This method should be called when a record succeeds processing so that
    * the processor can keep track of the result and the record that succeeded.
    *
-   * @param record Record that succeeded processing
-   * @param result Result from record handler
+   * @param record - Record that succeeded processing
+   * @param result - Result from record handler
    */
   public successHandler(
     record: EventSourceDataClassTypes,
179 changes: 174 additions & 5 deletions packages/batch/src/BatchProcessor.ts
@@ -1,6 +1,14 @@
+import type { StandardSchemaV1 } from '@standard-schema/spec';
+import type { StreamRecord } from 'aws-lambda';
 import { BasePartialBatchProcessor } from './BasePartialBatchProcessor.js';
-import { EventType } from './constants.js';
+import { EventType, SchemaVendor } from './constants.js';
 import { BatchProcessingError } from './errors.js';
-import type { BaseRecord, FailureResponse, SuccessResponse } from './types.js';
+import type {
+  BaseRecord,
+  EventSourceDataClassTypes,
+  FailureResponse,
+  SuccessResponse,
+} from './types.js';
 
 /**
  * Process records in a batch asynchronously and handle partial failure cases.
@@ -79,7 +87,7 @@ import type { BaseRecord, FailureResponse, SuccessResponse } from './types.js';
  * });
  * ```
  *
- * @param eventType The type of event to process (SQS, Kinesis, DynamoDB)
+ * @param eventType - The type of event to process (SQS, Kinesis, DynamoDB)
  */
 class BatchProcessor extends BasePartialBatchProcessor {
   /**
@@ -94,13 +102,17 @@ class BatchProcessor extends BasePartialBatchProcessor {
    * If the handler function completes successfully, the method returns a success response.
    * Otherwise, it returns a failure response with the error that occurred during processing.
    *
-   * @param record The record to be processed
+   * @param record - The record to be processed
    */
   public async processRecord(
     record: BaseRecord
   ): Promise<SuccessResponse | FailureResponse> {
     try {
-      const data = this.toBatchType(record, this.eventType);
+      const recordToProcess =
+        this.schema == null
+          ? record
+          : await this.#parseRecord(record, this.eventType, this.schema);
+      const data = this.toBatchType(recordToProcess, this.eventType);
       const result = await this.handler(data, this.options?.context);
 
       return this.successHandler(record, result);
@@ -112,7 +124,7 @@
   /**
    * @throws {BatchProcessingError} This method is not implemented for synchronous processing.
    *
-   * @param _record The record to be processed
+   * @param _record - The record to be processed
    */
   public processRecordSync(
     _record: BaseRecord
@@ -121,6 +133,163 @@
       'Not implemented. Use asyncProcess() instead.'
     );
   }
+
+  /**
+   * Extend the schema according to the event type passed.
+   *
+   * If useTransformers is true, extend using opinionated transformers.
+   * Otherwise, extend without any transformers.
+   *
+   * @param eventType - The type of event to process (SQS, Kinesis, DynamoDB)
+   * @param schema - The StandardSchema to be used for parsing
+   * @param useTransformers - Whether to use transformers for parsing
+   */
+  async #createExtendedSchema(options: {
+    eventType: keyof typeof EventType;
+    schema: StandardSchemaV1;
+    useTransformers: boolean;
+  }) {
+    const { eventType, schema, useTransformers } = options;
+    switch (eventType) {
+      case EventType.SQS: {
+        if (useTransformers) {
+          const [{ JSONStringified }, { SqsRecordSchema }] = await Promise.all([
+            import('@aws-lambda-powertools/parser/helpers'),
+            import('@aws-lambda-powertools/parser/schemas/sqs'),
+          ]);
+          return SqsRecordSchema.extend({
+            body: JSONStringified(schema as any),
+          });
+        }
+        const { SqsRecordSchema } = await import(
+          '@aws-lambda-powertools/parser/schemas/sqs'
+        );
+        return SqsRecordSchema.extend({ body: schema });
+      }
+
+      case EventType.KinesisDataStreams: {
+        if (useTransformers) {
+          const [
+            { Base64Encoded },
+            { KinesisDataStreamRecord, KinesisDataStreamRecordPayload },
+          ] = await Promise.all([
+            import('@aws-lambda-powertools/parser/helpers'),
+            import('@aws-lambda-powertools/parser/schemas/kinesis'),
+          ]);
+          return KinesisDataStreamRecord.extend({
+            kinesis: KinesisDataStreamRecordPayload.extend({
+              data: Base64Encoded(schema as any),
+            }),
+          });
+        }
+        const { KinesisDataStreamRecord, KinesisDataStreamRecordPayload } =
+          await import('@aws-lambda-powertools/parser/schemas/kinesis');
+        return KinesisDataStreamRecord.extend({
+          kinesis: KinesisDataStreamRecordPayload.extend({ data: schema }),
+        });
+      }
+
+      case EventType.DynamoDBStreams: {
+        if (useTransformers) {
+          const [
+            { DynamoDBMarshalled },
+            { DynamoDBStreamRecord, DynamoDBStreamChangeRecordBase },
+          ] = await Promise.all([
+            import('@aws-lambda-powertools/parser/helpers/dynamodb'),
+            import('@aws-lambda-powertools/parser/schemas/dynamodb'),
+          ]);
+          return DynamoDBStreamRecord.extend({
+            dynamodb: DynamoDBStreamChangeRecordBase.extend({
+              OldImage: DynamoDBMarshalled<StreamRecord['OldImage']>(
+                schema as any
+              ).optional(),
+              NewImage: DynamoDBMarshalled<StreamRecord['NewImage']>(
+                schema as any
+              ).optional(),
+            }),
+          });
+        }
+        const { DynamoDBStreamRecord, DynamoDBStreamChangeRecordBase } =
+          await import('@aws-lambda-powertools/parser/schemas/dynamodb');
+        return DynamoDBStreamRecord.extend({
+          dynamodb: DynamoDBStreamChangeRecordBase.extend({
+            OldImage: (schema as any).optional(),
+            NewImage: (schema as any).optional(),
+          }),
+        });
+      }
+
+      default: {
+        console.warn(
+          `The event type provided is not supported. Supported events: ${Object.values(EventType).join(',')}`
+        );
+        throw new Error('Unsupported event type');
+      }
+    }
+  }
+
+  /**
+   * Parse the record according to the schema and event type passed.
+   *
+   * If the passed schema is already an extended schema,
+   * use the schema directly to parse the record.
+   *
+   * Only Zod Schemas are supported for schema extension.
+   *
+   * @param record - The record to be parsed
+   * @param eventType - The type of event to process
+   * @param schema - The StandardSchema to be used for parsing
+   */
+  async #parseRecord(
+    record: EventSourceDataClassTypes,
+    eventType: keyof typeof EventType,
+    schema: StandardSchemaV1
+  ): Promise<EventSourceDataClassTypes> {
+    const { parse } = await import('@aws-lambda-powertools/parser');
+    // Try parsing with the original schema first
+    const extendedSchemaParsing = parse(record, undefined, schema, true);
+    if (extendedSchemaParsing.success) {
+      return extendedSchemaParsing.data as EventSourceDataClassTypes;
+    }
+    // Only proceed with schema extension if it's a Zod schema
+    if (schema['~standard'].vendor !== SchemaVendor.Zod) {
+      console.warn(
+        'The schema provided is not supported. Only Zod schemas are supported for extension.'
+      );
+      throw new Error('Unsupported schema type');
+    }
+    // Handle schema extension based on event type
+    // Try without transformers first, then with transformers
+    const schemaWithoutTransformers = await this.#createExtendedSchema({
+      eventType,
+      schema,
+      useTransformers: false,
+    });
+    const schemaWithoutTransformersParsing = parse(
+      record,
+      undefined,
+      schemaWithoutTransformers,
+      true
+    );
+    if (schemaWithoutTransformersParsing.success) {
+      return schemaWithoutTransformersParsing.data as EventSourceDataClassTypes;
+    }
+    const schemaWithTransformers = await this.#createExtendedSchema({
+      eventType,
+      schema,
+      useTransformers: true,
+    });
+    const schemaWithTransformersParsing = parse(
      record,
+      undefined,
+      schemaWithTransformers,
+      true
+    );
+    if (schemaWithTransformersParsing.success) {
+      return schemaWithTransformersParsing.data as EventSourceDataClassTypes;
+    }
+    throw new Error('Failed to parse record');
+  }
 }
 
 export { BatchProcessor };
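End to end, `processRecord` now parses each record before handing it to the record handler: first with the schema exactly as given, then with the schema grafted onto the event-source envelope without transformers, and finally with the opinionated transformers (`JSONStringified`, `Base64Encoded`, `DynamoDBMarshalled`). A hedged usage sketch under those assumptions, with illustrative handler and schema names and using the batch utility's existing `processPartialResponse` entry point:

import type { Context, SQSEvent, SQSRecord } from 'aws-lambda';
import { z } from 'zod';
import {
  BatchProcessor,
  EventType,
  processPartialResponse,
} from '@aws-lambda-powertools/batch';

const orderSchema = z.object({ orderId: z.string(), amount: z.number() });

const processor = new BatchProcessor(EventType.SQS, { schema: orderSchema });

const recordHandler = async (record: SQSRecord) => {
  // For a JSON-encoded SQS body, the plain and non-transformer attempts fail
  // and the JSONStringified variant succeeds, so `body` reaching the handler
  // is already a parsed, schema-validated object rather than a raw string.
  const body = record.body as unknown as z.infer<typeof orderSchema>;
  console.log(body.orderId, body.amount);
};

export const handler = async (event: SQSEvent, context: Context) =>
  processPartialResponse(event, recordHandler, processor, { context });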
9 changes: 8 additions & 1 deletion packages/batch/src/constants.ts
@@ -17,6 +17,13 @@ const EventType = {
   DynamoDBStreams: 'DynamoDBStreams',
 } as const;
 
+/**
+ * Enum of supported schema vendors for the utility
+ */
+const SchemaVendor = {
+  Zod: 'zod',
+} as const;
+
 /**
  * Default response for the partial batch processor
  */
@@ -35,4 +42,4 @@ const DATA_CLASS_MAPPING = {
     record as DynamoDBRecord,
 };
 
-export { EventType, DEFAULT_RESPONSE, DATA_CLASS_MAPPING };
+export { EventType, SchemaVendor, DEFAULT_RESPONSE, DATA_CLASS_MAPPING };
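The new `SchemaVendor` constant backs the vendor guard in `BatchProcessor.#parseRecord`: under the Standard Schema spec, every implementing schema exposes `{ version, vendor, validate }` through its `~standard` property, so the vendor string identifies the producing library. A standalone sketch of that check (the `isZodSchema` helper is illustrative, not part of this PR):

import type { StandardSchemaV1 } from '@standard-schema/spec';

const SchemaVendor = { Zod: 'zod' } as const;

// A Zod schema reports vendor 'zod' through the Standard Schema interface;
// anything else is rejected before schema extension is attempted.
const isZodSchema = (schema: StandardSchemaV1): boolean =>
  schema['~standard'].vendor === SchemaVendor.Zod;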