Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions docs/features/batch.md
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,9 @@ Available transformers by event type:
--8<-- "examples/snippets/batch/samples/parser_SQS.json"
```

!!! note
If `innerSchema` is used with DynamoDB streams, the schema will be applied to both the `NewImage` and the `OldImage` by default. If you want a separate schema for each, you will need to extend the schema and use the full schema for parsing.

#### Using full event schema

For complete control over validation, extend the built-in schemas with your custom payload schema. This approach gives you full control over the entire event structure.
Expand Down
78 changes: 43 additions & 35 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 4 additions & 2 deletions packages/batch/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,12 @@
"nodejs"
],
"dependencies": {
"@aws-lambda-powertools/commons": "2.26.1"
"@aws-lambda-powertools/commons": "2.26.1",
"@standard-schema/spec": "^1.0.0"
},
"devDependencies": {
"@aws-lambda-powertools/parser": "2.26.1",
"@aws-lambda-powertools/testing-utils": "file:../testing",
"@aws-lambda-powertools/parser": "2.26.1"
"zod": "^4.1.8"
}
}
6 changes: 3 additions & 3 deletions packages/batch/src/BasePartialBatchProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import {
} from './constants.js';
import { FullBatchFailureError } from './errors.js';
import type {
BasePartialBatchProcessorParserConfig,
BatchProcessorConfig,
EventSourceDataClassTypes,
PartialItemFailureResponse,
PartialItemFailures,
Expand Down Expand Up @@ -55,7 +55,7 @@ abstract class BasePartialBatchProcessor extends BasePartialProcessor {
/**
* The configuration options for the parser integration
*/
protected parserConfig?: BasePartialBatchProcessorParserConfig;
protected parserConfig?: BatchProcessorConfig;

/**
* Initializes base batch processing class
Expand All @@ -64,7 +64,7 @@ abstract class BasePartialBatchProcessor extends BasePartialProcessor {
*/
public constructor(
eventType: keyof typeof EventType,
parserConfig?: BasePartialBatchProcessorParserConfig
parserConfig?: BatchProcessorConfig
) {
super();
this.eventType = eventType;
Expand Down
35 changes: 34 additions & 1 deletion packages/batch/src/BatchProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import type { BaseRecord, FailureResponse, SuccessResponse } from './types.js';
* });
* ```
*
* **Process batch triggered by Kinesis Data Streams*
* **Process batch triggered by Kinesis Data Streams**
*
* @example
* ```typescript
Expand Down Expand Up @@ -113,6 +113,39 @@ import type { BaseRecord, FailureResponse, SuccessResponse } from './types.js';
* });
* ```
*
* **Process batch with inner schema validation**
*
* @example
* ```typescript
* import {
* BatchProcessor,
* EventType,
* processPartialResponse,
* } from '@aws-lambda-powertools/batch';
* import { parser } from '@aws-lambda-powertools/batch/parser';
* import type { SQSHandler } from 'aws-lambda';
* import { z } from 'zod';
*
* const myItemSchema = z.object({ name: z.string(), age: z.number() });
*
* const processor = new BatchProcessor(EventType.SQS, {
* parser,
* innerSchema: myItemSchema,
* });
*
* const recordHandler = async (record) => {
* // record is now fully typed and validated
* console.log(record.body.name, record.body.age);
* };
*
* export const handler: SQSHandler = async (event, context) =>
* processPartialResponse(event, recordHandler, processor, {
* context,
* });
* ```
*
 * Note: If `innerSchema` is used with DynamoDB streams, the schema will be applied to both the NewImage and the OldImage by default. If you want a separate schema for each, you will need to extend the schema and use the full schema for parsing.
*
* @param eventType - The type of event to process (SQS, Kinesis, DynamoDB)
*/
class BatchProcessor extends BasePartialBatchProcessor {
Expand Down
28 changes: 18 additions & 10 deletions packages/batch/src/parser.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,25 @@
import type { GenericLogger } from '@aws-lambda-powertools/commons/types';
import type { StandardSchemaV1 } from '@standard-schema/spec';
import type { ZodType } from 'zod';
import { EventType, SchemaVendor } from './constants.js';
import { ParsingError } from './errors.js';
import type {
BasePartialBatchProcessorParserConfig,
BatchProcessorConfig,
EventSourceDataClassTypes,
RuntimeZodType,
} from './types.js';

/**
* Type guard to check if the schema is a Zod schema
*
* @param schema - The schema to check
*/
const isZodSchema = (schema: StandardSchemaV1): schema is RuntimeZodType =>
schema['~standard'].vendor === SchemaVendor.Zod;

/**
* Extend the schema according to the event type passed.
*
* If useTransformers is true, extend using opinionated transformers.
* If `useTransformers` is true, extend using opinionated transformers.
* Otherwise, extend without any transformers.
*
* @param options - The options for creating the extended schema
Expand All @@ -22,8 +30,8 @@ import type {
*/
const createExtendedSchema = async (options: {
eventType: keyof typeof EventType;
innerSchema: ZodType;
transformer?: BasePartialBatchProcessorParserConfig['transformer'];
innerSchema: RuntimeZodType;
transformer?: BatchProcessorConfig['transformer'];
}) => {
const { eventType, innerSchema, transformer } = options;
let schema = innerSchema;
Expand Down Expand Up @@ -73,8 +81,8 @@ const createExtendedSchema = async (options: {
);
return DynamoDBStreamRecord.extend({
dynamodb: DynamoDBStreamChangeRecordBase.extend({
OldImage: schema.optional(),
NewImage: schema.optional(),
OldImage: schema,
NewImage: schema,
}),
});
};
Expand All @@ -101,7 +109,7 @@ const parseWithErrorHandling = async (
const errorMessage = issues
.map((issue) => `${issue.path?.join('.')}: ${issue.message}`)
.join('; ');
logger.debug(errorMessage);
logger.debug(`Failed to parse record: ${errorMessage}`);
throw new ParsingError(errorMessage);
};

Expand All @@ -122,7 +130,7 @@ const parser = async (
record: EventSourceDataClassTypes,
eventType: keyof typeof EventType,
logger: Pick<GenericLogger, 'debug' | 'warn' | 'error'>,
parserConfig: BasePartialBatchProcessorParserConfig
parserConfig: BatchProcessorConfig
): Promise<EventSourceDataClassTypes> => {
const { schema, innerSchema, transformer } = parserConfig;
// If the external schema is specified, use it to parse the record
Expand All @@ -131,7 +139,7 @@ const parser = async (
}
if (innerSchema) {
// Only proceed with schema extension if it's a Zod schema
if (innerSchema['~standard'].vendor !== SchemaVendor.Zod) {
if (!isZodSchema(innerSchema)) {
logger.error(
'The schema provided is not supported. Only Zod schemas are supported for extension.'
);
Expand Down
Loading
Loading