3 changes: 3 additions & 0 deletions docs/features/batch.md
@@ -358,6 +358,9 @@ Available transformers by event type:
--8<-- "examples/snippets/batch/samples/parser_SQS.json"
```

!!! note
If `innerSchema` is used with DynamoDB streams, the schema will be applied to both the `NewImage` and the `OldImage` by default. If you want a dedicated schema for each image, see the section below.

#### Using full event schema

For complete control over validation, extend the built-in schemas with your custom payload schema. This approach lets you validate the entire event structure, not just the payload.
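As a rough sketch of this approach for SQS (not one of this page's snippets; the `orderSchema` payload is illustrative and the import paths assume the parser package's subpath exports), you extend the built-in record schema and wrap your payload schema with the matching transformer helper:

```typescript
import { JSONStringified } from '@aws-lambda-powertools/parser/helpers';
import { SqsRecordSchema } from '@aws-lambda-powertools/parser/schemas/sqs';
import { z } from 'zod';

// Illustrative payload carried as a JSON string in each SQS record body
const orderSchema = z.object({ orderId: z.string(), total: z.number() });

// Extend the built-in record schema so the body is JSON-parsed and validated
const extendedRecordSchema = SqsRecordSchema.extend({
  body: JSONStringified(orderSchema),
});
```

The extended schema is then passed to the processor through the `schema` option of the parser configuration, rather than `innerSchema`.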
78 changes: 43 additions & 35 deletions package-lock.json

6 changes: 4 additions & 2 deletions packages/batch/package.json
@@ -81,10 +81,12 @@
"nodejs"
],
"dependencies": {
"@aws-lambda-powertools/commons": "2.26.1"
"@aws-lambda-powertools/commons": "2.26.1",
"@standard-schema/spec": "^1.0.0"
},
"devDependencies": {
"@aws-lambda-powertools/parser": "2.26.1",
"@aws-lambda-powertools/testing-utils": "file:../testing",
"@aws-lambda-powertools/parser": "2.26.1"
"zod": "^4.1.8"
}
}
6 changes: 3 additions & 3 deletions packages/batch/src/BasePartialBatchProcessor.ts
@@ -13,7 +13,7 @@ import {
} from './constants.js';
import { FullBatchFailureError } from './errors.js';
import type {
BasePartialBatchProcessorParserConfig,
BatchProcessorConfig,
EventSourceDataClassTypes,
PartialItemFailureResponse,
PartialItemFailures,
@@ -55,7 +55,7 @@ abstract class BasePartialBatchProcessor extends BasePartialProcessor {
/**
* The configuration options for the parser integration
*/
protected parserConfig?: BasePartialBatchProcessorParserConfig;
protected parserConfig?: BatchProcessorConfig;

/**
* Initializes base batch processing class
@@ -64,7 +64,7 @@ abstract class BasePartialBatchProcessor extends BasePartialProcessor {
*/
public constructor(
eventType: keyof typeof EventType,
parserConfig?: BasePartialBatchProcessorParserConfig
parserConfig?: BatchProcessorConfig
) {
super();
this.eventType = eventType;
36 changes: 35 additions & 1 deletion packages/batch/src/BatchProcessor.ts
@@ -33,7 +33,7 @@ import type { BaseRecord, FailureResponse, SuccessResponse } from './types.js';
* });
* ```
*
* **Process batch triggered by Kinesis Data Streams*
* **Process batch triggered by Kinesis Data Streams**
*
* @example
* ```typescript
@@ -113,6 +113,40 @@ import type { BaseRecord, FailureResponse, SuccessResponse } from './types.js';
* });
* ```
*
* **Process batch with inner schema validation**
*
* @example
* ```typescript
* import {
* BatchProcessor,
* EventType,
* processPartialResponse,
* } from '@aws-lambda-powertools/batch';
* import { parser } from '@aws-lambda-powertools/batch/parser';
* import type { SQSHandler } from 'aws-lambda';
* import { z } from 'zod';
*
* const myItemSchema = z.object({ name: z.string(), age: z.number() });
*
* const processor = new BatchProcessor(EventType.SQS, {
* parser,
* innerSchema: myItemSchema,
* transformer: 'json'
* });
*
* const recordHandler = async (record) => {
* // record is now fully typed and validated
* console.log(record.body.name, record.body.age);
* };
*
* export const handler: SQSHandler = async (event, context) =>
* processPartialResponse(event, recordHandler, processor, {
* context,
* });
* ```
*
* Note: If `innerSchema` is used with DynamoDB streams, the schema will be applied to both the NewImage and the OldImage by default. If you want a separate schema for each image, you will need to extend the full event schema and pass it via the `schema` option instead.
*
* @param eventType - The type of event to process (SQS, Kinesis, DynamoDB)
*/
class BatchProcessor extends BasePartialBatchProcessor {
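To make the docblock note above concrete, here is a minimal sketch (not part of this change) of a processor that validates `NewImage` and `OldImage` with different schemas by passing a full schema via the `schema` option; the payload schemas are hypothetical and the schema imports assume the parser package's DynamoDB subpath exports:

```typescript
import {
  BatchProcessor,
  EventType,
  processPartialResponse,
} from '@aws-lambda-powertools/batch';
import { parser } from '@aws-lambda-powertools/batch/parser';
import { DynamoDBMarshalled } from '@aws-lambda-powertools/parser/helpers/dynamodb';
import {
  DynamoDBStreamChangeRecordBase,
  DynamoDBStreamRecord,
} from '@aws-lambda-powertools/parser/schemas/dynamodb';
import type { DynamoDBStreamHandler } from 'aws-lambda';
import { z } from 'zod';

// Hypothetical payloads: the new image is validated strictly, the old image loosely
const newImageSchema = z.object({ id: z.string(), status: z.string() });
const oldImageSchema = z.object({ id: z.string() });

// Extend the full stream record schema so each image gets its own schema
const recordSchema = DynamoDBStreamRecord.extend({
  dynamodb: DynamoDBStreamChangeRecordBase.extend({
    NewImage: DynamoDBMarshalled(newImageSchema).optional(),
    OldImage: DynamoDBMarshalled(oldImageSchema).optional(),
  }),
});

const processor = new BatchProcessor(EventType.DynamoDBStreams, {
  parser,
  schema: recordSchema,
});

const recordHandler = async (record: z.infer<typeof recordSchema>) => {
  // Each image is parsed with its own schema (or undefined if absent)
  console.log(record.dynamodb.NewImage?.status, record.dynamodb.OldImage?.id);
};

export const handler: DynamoDBStreamHandler = async (event, context) =>
  processPartialResponse(event, recordHandler, processor, { context });
```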
27 changes: 16 additions & 11 deletions packages/batch/src/parser.ts
@@ -1,29 +1,30 @@
import type { GenericLogger } from '@aws-lambda-powertools/commons/types';
import type { StandardSchemaV1 } from '@standard-schema/spec';
import type { ZodType } from 'zod';
import { EventType, SchemaVendor } from './constants.js';
import { ParsingError } from './errors.js';
import type {
BasePartialBatchProcessorParserConfig,
BatchProcessorConfig,
EventSourceDataClassTypes,
} from './types.js';

/**
* Extend the schema according to the event type passed.
*
* If useTransformers is true, extend using opinionated transformers.
* If a `transformer` is specified, extend the schema using the corresponding opinionated transformer.
* Otherwise, extend without any transformers.
*
* The schema vendor is already checked at runtime (via `StandardSchemaV1['~standard'].vendor`) to ensure Zod is being used when required.
*
* @param options - The options for creating the extended schema
* @param options.eventType - The type of event to process (SQS, Kinesis, DynamoDB)
* @param options.schema - The StandardSchema to be used for parsing
* @param options.innerSchema - The StandardSchema to be used for parsing. To avoid forcing a direct dependency on Zod, we use `unknown` here, which is not ideal but necessary.
* @param options.transformer - The transformer to apply before parsing (`json`, `base64`, or `unmarshall`)
* @param options.logger - A logger instance for logging
*/
const createExtendedSchema = async (options: {
eventType: keyof typeof EventType;
innerSchema: ZodType;
transformer?: BasePartialBatchProcessorParserConfig['transformer'];
innerSchema: unknown;
transformer?: BatchProcessorConfig['transformer'];
}) => {
const { eventType, innerSchema, transformer } = options;
let schema = innerSchema;
@@ -32,20 +33,23 @@ const createExtendedSchema = async (options: {
const { JSONStringified } = await import(
'@aws-lambda-powertools/parser/helpers'
);
// @ts-expect-error - we know it's a Zod schema due to the runtime check earlier
schema = JSONStringified(innerSchema);
break;
}
case 'base64': {
const { Base64Encoded } = await import(
'@aws-lambda-powertools/parser/helpers'
);
// @ts-expect-error - we know it's a Zod schema due to the runtime check earlier
schema = Base64Encoded(innerSchema);
break;
}
case 'unmarshall': {
const { DynamoDBMarshalled } = await import(
'@aws-lambda-powertools/parser/helpers/dynamodb'
);
// @ts-expect-error - we know it's a Zod schema due to the runtime check earlier
schema = DynamoDBMarshalled(innerSchema);
break;
}
@@ -73,8 +77,8 @@ const createExtendedSchema = async (options: {
);
return DynamoDBStreamRecord.extend({
dynamodb: DynamoDBStreamChangeRecordBase.extend({
OldImage: schema.optional(),
NewImage: schema.optional(),
OldImage: schema,
NewImage: schema,
}),
});
};
@@ -101,7 +105,7 @@ const parseWithErrorHandling = async (
const errorMessage = issues
.map((issue) => `${issue.path?.join('.')}: ${issue.message}`)
.join('; ');
logger.debug(errorMessage);
logger.debug(`Failed to parse record: ${errorMessage}`);
throw new ParsingError(errorMessage);
};

@@ -111,7 +115,8 @@ const parseWithErrorHandling = async (
* If the passed schema is already an extended schema,
* use the schema directly to parse the record.
*
* Only Zod Schemas are supported for schema extension.
* Parts of the parser integration within BatchProcessor rely on Zod for schema transformations;
* however, other parts also support other Standard Schema-compatible libraries.
*
* @param record - The record to be parsed
* @param eventType - The type of event to process
@@ -122,7 +127,7 @@ const parser = async (
record: EventSourceDataClassTypes,
eventType: keyof typeof EventType,
logger: Pick<GenericLogger, 'debug' | 'warn' | 'error'>,
parserConfig: BasePartialBatchProcessorParserConfig
parserConfig: BatchProcessorConfig
): Promise<EventSourceDataClassTypes> => {
const { schema, innerSchema, transformer } = parserConfig;
// If the external schema is specified, use it to parse the record
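As an aside on the runtime vendor check mentioned in the `createExtendedSchema` docblock above, a guard along these lines (a sketch, not this package's actual implementation) is what checking `StandardSchemaV1['~standard'].vendor` looks like:

```typescript
import type { StandardSchemaV1 } from '@standard-schema/spec';

// Sketch of a vendor guard: schema transformations require Zod,
// so any other Standard Schema vendor is rejected up front.
const assertZodSchema = (schema: StandardSchemaV1): void => {
  const { vendor } = schema['~standard'];
  if (vendor !== 'zod') {
    throw new Error(
      `Schema transformations require a Zod schema, received vendor: ${vendor}`
    );
  }
};
```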