diff --git a/docs/features/batch.md b/docs/features/batch.md index 7c5de6d3da..088deff619 100644 --- a/docs/features/batch.md +++ b/docs/features/batch.md @@ -358,6 +358,9 @@ Available transformers by event type: --8<-- "examples/snippets/batch/samples/parser_SQS.json" ``` +!!! note + If `innerSchema` is used with DynamoDB streams, the schema will be applied to both the `NewImage` and the `OldImage` by default. If you want to have dedicated schemas, see the section below. + #### Using full event schema For complete control over validation, extend the built-in schemas with your custom payload schema. This approach gives you full control over the entire event structure. diff --git a/package-lock.json b/package-lock.json index 505f393b4d..b896b6e615 100644 --- a/package-lock.json +++ b/package-lock.json @@ -9967,14 +9967,14 @@ "license": "MIT" }, "node_modules/tinyglobby": { - "version": "0.2.14", - "resolved": "https://registry.npmjs.org/tinyglobby/-/tinyglobby-0.2.14.tgz", - "integrity": "sha512-tX5e7OM1HnYr2+a2C/4V0htOcSQcoSTH9KgJnVvNm5zm/cyEWKJ7j7YutsH9CxMdtOkkLFy2AHrMci9IM8IPZQ==", + "version": "0.2.15", + "resolved": "https://registry.npmjs.org/tinyglobby/-/tinyglobby-0.2.15.tgz", + "integrity": "sha512-j2Zq4NyQYG5XMST4cbs02Ak8iJUdxRM0XI5QyxXuZOzKOINmWurp3smXu3y5wDcJrptwpSjgXHzIQxR0omXljQ==", "dev": true, "license": "MIT", "dependencies": { - "fdir": "^6.4.4", - "picomatch": "^4.0.2" + "fdir": "^6.5.0", + "picomatch": "^4.0.3" }, "engines": { "node": ">=12.0.0" @@ -9984,11 +9984,14 @@ } }, "node_modules/tinyglobby/node_modules/fdir": { - "version": "6.4.6", - "resolved": "https://registry.npmjs.org/fdir/-/fdir-6.4.6.tgz", - "integrity": "sha512-hiFoqpyZcfNm1yc4u8oWCf9A2c4D3QjCrks3zmoVKVxpQRzmPNar1hUJcBG2RQHvEVGDN+Jm81ZheVLAQMK6+w==", + "version": "6.5.0", + "resolved": "https://registry.npmjs.org/fdir/-/fdir-6.5.0.tgz", + "integrity": "sha512-tIbYtZbucOs0BRGqPJkshJUYdL+SDH7dVM8gjy+ERp3WAUjLEFJE+02kanyHtwjWOnwrKYBiwAmM0p4kLJAnXg==", "dev": true, "license": "MIT", + "engines": { + "node": ">=12.0.0" + }, "peerDependencies": { "picomatch": "^3 || ^4" }, @@ -9999,9 +10002,9 @@ } }, "node_modules/tinyglobby/node_modules/picomatch": { - "version": "4.0.2", - "resolved": "https://registry.npmjs.org/picomatch/-/picomatch-4.0.2.tgz", - "integrity": "sha512-M7BAV6Rlcy5u+m6oPhAPFgJTzAioX/6B0DxyvDlo9l8+T3nLKbrczg2WLUyzd45L8RqfUMyGPzekbMvX2Ldkwg==", + "version": "4.0.3", + "resolved": "https://registry.npmjs.org/picomatch/-/picomatch-4.0.3.tgz", + "integrity": "sha512-5gTmgEY/sqK6gFXLIsQNH19lWb4ebPDLA4SdLP7dsWkIXHWlG66oPuVvXSGFPppYZz8ZDZq0dYYrbHfBCVUb1Q==", "dev": true, "license": "MIT", "engines": { @@ -10256,24 +10259,24 @@ } }, "node_modules/vite": { - "version": "6.3.5", - "resolved": "https://registry.npmjs.org/vite/-/vite-6.3.5.tgz", - "integrity": "sha512-cZn6NDFE7wdTpINgs++ZJ4N49W2vRp8LCKrn3Ob1kYNtOo21vfDoaV5GzBfLU4MovSAB8uNRm4jgzVQZ+mBzPQ==", + "version": "7.1.5", + "resolved": "https://registry.npmjs.org/vite/-/vite-7.1.5.tgz", + "integrity": "sha512-4cKBO9wR75r0BeIWWWId9XK9Lj6La5X846Zw9dFfzMRw38IlTk2iCcUt6hsyiDRcPidc55ZParFYDXi0nXOeLQ==", "dev": true, "license": "MIT", "dependencies": { "esbuild": "^0.25.0", - "fdir": "^6.4.4", - "picomatch": "^4.0.2", - "postcss": "^8.5.3", - "rollup": "^4.34.9", - "tinyglobby": "^0.2.13" + "fdir": "^6.5.0", + "picomatch": "^4.0.3", + "postcss": "^8.5.6", + "rollup": "^4.43.0", + "tinyglobby": "^0.2.15" }, "bin": { "vite": "bin/vite.js" }, "engines": { - "node": "^18.0.0 || ^20.0.0 || >=22.0.0" + "node": "^20.19.0 || >=22.12.0" }, "funding": { "url": 
"https://github.com/vitejs/vite?sponsor=1" @@ -10282,14 +10285,14 @@ "fsevents": "~2.3.3" }, "peerDependencies": { - "@types/node": "^18.0.0 || ^20.0.0 || >=22.0.0", + "@types/node": "^20.19.0 || >=22.12.0", "jiti": ">=1.21.0", - "less": "*", + "less": "^4.0.0", "lightningcss": "^1.21.0", - "sass": "*", - "sass-embedded": "*", - "stylus": "*", - "sugarss": "*", + "sass": "^1.70.0", + "sass-embedded": "^1.70.0", + "stylus": ">=0.54.8", + "sugarss": "^5.0.0", "terser": "^5.16.0", "tsx": "^4.8.1", "yaml": "^2.4.2" @@ -10354,11 +10357,14 @@ } }, "node_modules/vite/node_modules/fdir": { - "version": "6.4.6", - "resolved": "https://registry.npmjs.org/fdir/-/fdir-6.4.6.tgz", - "integrity": "sha512-hiFoqpyZcfNm1yc4u8oWCf9A2c4D3QjCrks3zmoVKVxpQRzmPNar1hUJcBG2RQHvEVGDN+Jm81ZheVLAQMK6+w==", + "version": "6.5.0", + "resolved": "https://registry.npmjs.org/fdir/-/fdir-6.5.0.tgz", + "integrity": "sha512-tIbYtZbucOs0BRGqPJkshJUYdL+SDH7dVM8gjy+ERp3WAUjLEFJE+02kanyHtwjWOnwrKYBiwAmM0p4kLJAnXg==", "dev": true, "license": "MIT", + "engines": { + "node": ">=12.0.0" + }, "peerDependencies": { "picomatch": "^3 || ^4" }, @@ -10384,9 +10390,9 @@ } }, "node_modules/vite/node_modules/picomatch": { - "version": "4.0.2", - "resolved": "https://registry.npmjs.org/picomatch/-/picomatch-4.0.2.tgz", - "integrity": "sha512-M7BAV6Rlcy5u+m6oPhAPFgJTzAioX/6B0DxyvDlo9l8+T3nLKbrczg2WLUyzd45L8RqfUMyGPzekbMvX2Ldkwg==", + "version": "4.0.3", + "resolved": "https://registry.npmjs.org/picomatch/-/picomatch-4.0.3.tgz", + "integrity": "sha512-5gTmgEY/sqK6gFXLIsQNH19lWb4ebPDLA4SdLP7dsWkIXHWlG66oPuVvXSGFPppYZz8ZDZq0dYYrbHfBCVUb1Q==", "dev": true, "license": "MIT", "engines": { @@ -10754,11 +10760,13 @@ "version": "2.26.1", "license": "MIT-0", "dependencies": { - "@aws-lambda-powertools/commons": "2.26.1" + "@aws-lambda-powertools/commons": "2.26.1", + "@standard-schema/spec": "^1.0.0" }, "devDependencies": { "@aws-lambda-powertools/parser": "2.26.1", - "@aws-lambda-powertools/testing-utils": "file:../testing" + "@aws-lambda-powertools/testing-utils": "file:../testing", + "zod": "^4.1.8" } }, "packages/commons": { @@ -11021,4 +11029,4 @@ } } } -} +} \ No newline at end of file diff --git a/packages/batch/package.json b/packages/batch/package.json index 41512af4df..e723bbe1d6 100644 --- a/packages/batch/package.json +++ b/packages/batch/package.json @@ -81,10 +81,12 @@ "nodejs" ], "dependencies": { - "@aws-lambda-powertools/commons": "2.26.1" + "@aws-lambda-powertools/commons": "2.26.1", + "@standard-schema/spec": "^1.0.0" }, "devDependencies": { + "@aws-lambda-powertools/parser": "2.26.1", "@aws-lambda-powertools/testing-utils": "file:../testing", - "@aws-lambda-powertools/parser": "2.26.1" + "zod": "^4.1.8" } } \ No newline at end of file diff --git a/packages/batch/src/BasePartialBatchProcessor.ts b/packages/batch/src/BasePartialBatchProcessor.ts index 4d525dbded..21806be424 100644 --- a/packages/batch/src/BasePartialBatchProcessor.ts +++ b/packages/batch/src/BasePartialBatchProcessor.ts @@ -13,7 +13,7 @@ import { } from './constants.js'; import { FullBatchFailureError } from './errors.js'; import type { - BasePartialBatchProcessorParserConfig, + BatchProcessorConfig, EventSourceDataClassTypes, PartialItemFailureResponse, PartialItemFailures, @@ -55,7 +55,7 @@ abstract class BasePartialBatchProcessor extends BasePartialProcessor { /** * The configuration options for the parser integration */ - protected parserConfig?: BasePartialBatchProcessorParserConfig; + protected parserConfig?: BatchProcessorConfig; /** * Initializes base batch 
processing class @@ -64,7 +64,7 @@ abstract class BasePartialBatchProcessor extends BasePartialProcessor { */ public constructor( eventType: keyof typeof EventType, - parserConfig?: BasePartialBatchProcessorParserConfig + parserConfig?: BatchProcessorConfig ) { super(); this.eventType = eventType; diff --git a/packages/batch/src/BatchProcessor.ts b/packages/batch/src/BatchProcessor.ts index 0e1f2296df..30ae521001 100644 --- a/packages/batch/src/BatchProcessor.ts +++ b/packages/batch/src/BatchProcessor.ts @@ -33,7 +33,7 @@ import type { BaseRecord, FailureResponse, SuccessResponse } from './types.js'; * }); * ``` * - * **Process batch triggered by Kinesis Data Streams* + * **Process batch triggered by Kinesis Data Streams** * * @example * ```typescript @@ -113,6 +113,40 @@ import type { BaseRecord, FailureResponse, SuccessResponse } from './types.js'; * }); * ``` * + * **Process batch with inner schema validation** + * + * @example + * ```typescript + * import { + * BatchProcessor, + * EventType, + * processPartialResponse, + * } from '@aws-lambda-powertools/batch'; + * import { parser } from '@aws-lambda-powertools/batch/parser'; + * import type { SQSHandler } from 'aws-lambda'; + * import { z } from 'zod'; + * + * const myItemSchema = z.object({ name: z.string(), age: z.number() }); + * + * const processor = new BatchProcessor(EventType.SQS, { + * parser, + * innerSchema: myItemSchema, + * transformer: 'json' + * }); + * + * const recordHandler = async (record) => { + * // record is now fully typed and validated + * console.log(record.body.name, record.body.age); + * }; + * + * export const handler: SQSHandler = async (event, context) => + * processPartialResponse(event, recordHandler, processor, { + * context, + * }); + * ``` + * + * Note: If `innerSchema` is used with DynamoDB streams, the schema will be applied to both the `NewImage` and the `OldImage` by default. If you want separate schemas for each image, extend the built-in schema and pass the full schema via the `schema` option instead. + * + * @param eventType - The type of event to process (SQS, Kinesis, DynamoDB) */ class BatchProcessor extends BasePartialBatchProcessor { diff --git a/packages/batch/src/parser.ts b/packages/batch/src/parser.ts index fe1cb795a6..cb9374cceb 100644 --- a/packages/batch/src/parser.ts +++ b/packages/batch/src/parser.ts @@ -1,29 +1,30 @@ import type { GenericLogger } from '@aws-lambda-powertools/commons/types'; import type { StandardSchemaV1 } from '@standard-schema/spec'; -import type { ZodType } from 'zod'; import { EventType, SchemaVendor } from './constants.js'; import { ParsingError } from './errors.js'; import type { - BasePartialBatchProcessorParserConfig, + BatchProcessorConfig, EventSourceDataClassTypes, } from './types.js'; /** * Extend the schema according to the event type passed. * - * If useTransformers is true, extend using opinionated transformers. + * If a `transformer` is specified, extend using the matching opinionated transformer. * Otherwise, extend without any transformers. * + * The schema vendor is checked at runtime via `StandardSchemaV1['~standard'].vendor` to ensure a Zod schema is used where required. + * * @param options - The options for creating the extended schema * @param options.eventType - The type of event to process (SQS, Kinesis, DynamoDB) - * @param options.schema - The StandardSchema to be used for parsing + * @param options.innerSchema - The StandardSchema to be used for parsing; typed as `unknown` to avoid forcing a direct dependency on Zod. 
* @param options.useTransformers - Whether to use transformers for parsing * @param options.logger - A logger instance for logging */ const createExtendedSchema = async (options: { eventType: keyof typeof EventType; - innerSchema: ZodType; - transformer?: BasePartialBatchProcessorParserConfig['transformer']; + innerSchema: unknown; + transformer?: BatchProcessorConfig['transformer']; }) => { const { eventType, innerSchema, transformer } = options; let schema = innerSchema; @@ -32,6 +33,7 @@ const createExtendedSchema = async (options: { const { JSONStringified } = await import( '@aws-lambda-powertools/parser/helpers' ); + // @ts-expect-error - we know it's a Zod schema due to the runtime check earlier schema = JSONStringified(innerSchema); break; } @@ -39,6 +41,7 @@ const createExtendedSchema = async (options: { const { Base64Encoded } = await import( '@aws-lambda-powertools/parser/helpers' ); + // @ts-expect-error - we know it's a Zod schema due to the runtime check earlier schema = Base64Encoded(innerSchema); break; } @@ -46,6 +49,7 @@ const createExtendedSchema = async (options: { const { DynamoDBMarshalled } = await import( '@aws-lambda-powertools/parser/helpers/dynamodb' ); + // @ts-expect-error - we know it's a Zod schema due to the runtime check earlier schema = DynamoDBMarshalled(innerSchema); break; } @@ -73,8 +77,8 @@ const createExtendedSchema = async (options: { ); return DynamoDBStreamRecord.extend({ dynamodb: DynamoDBStreamChangeRecordBase.extend({ - OldImage: schema.optional(), - NewImage: schema.optional(), + OldImage: schema, + NewImage: schema, }), }); }; @@ -101,7 +105,7 @@ const parseWithErrorHandling = async ( const errorMessage = issues .map((issue) => `${issue.path?.join('.')}: ${issue.message}`) .join('; '); - logger.debug(errorMessage); + logger.debug(`Failed to parse record: ${errorMessage}`); throw new ParsingError(errorMessage); }; @@ -111,7 +115,8 @@ * * If the passed schema is already an extended schema, * use the schema directly to parse the record. * - * Only Zod Schemas are supported for schema extension. + * Parts of the parser integration within BatchProcessor rely on Zod for schema transformations; + * however, other parts also support additional Standard Schema-compatible libraries. 
* * @param record - The record to be parsed * @param eventType - The type of event to process @@ -122,7 +127,7 @@ const parser = async ( record: EventSourceDataClassTypes, eventType: keyof typeof EventType, logger: Pick, - parserConfig: BasePartialBatchProcessorParserConfig + parserConfig: BatchProcessorConfig ): Promise => { const { schema, innerSchema, transformer } = parserConfig; // If the external schema is specified, use it to parse the record diff --git a/packages/batch/src/types.ts b/packages/batch/src/types.ts index 2f2ad880cb..8cd4fa4575 100644 --- a/packages/batch/src/types.ts +++ b/packages/batch/src/types.ts @@ -7,7 +7,6 @@ import type { SQSRecord, StreamRecord, } from 'aws-lambda'; -import type { ZodType } from 'zod'; import type { BasePartialBatchProcessor } from './BasePartialBatchProcessor.js'; import type { BatchProcessor } from './BatchProcessor.js'; import type { parser } from './parser.js'; @@ -119,7 +118,7 @@ type PartialItemFailureResponse = { batchItemFailures: PartialItemFailures[] }; * @property transformer - Payload transformer (only available with innerSchema) * @property logger - Optional logger for debug/warning messages */ -type BasePartialBatchProcessorParserConfig = +type BatchProcessorConfig = | { /** * Required when using schema parsing - import from `@aws-lambda-powertools/batch/parser` @@ -144,8 +143,12 @@ type BasePartialBatchProcessorParserConfig = schema?: never; /** * Payload-only Zod schema, mutually exclusive with `schema` + * + * @remarks + * Only Zod schemas are supported for `innerSchema` as we rely on Zod's schema extension capabilities. + * If you need to use a different Standard Schema-compatible library, use `schema` instead. */ - innerSchema: ZodType; + innerSchema: StandardSchemaV1; /** * Payload transformer, only available with `innerSchema` */ @@ -247,6 +250,6 @@ export type { FailureResponse, PartialItemFailures, PartialItemFailureResponse, - BasePartialBatchProcessorParserConfig, + BatchProcessorConfig, ParsedRecord, }; diff --git a/packages/batch/tests/helpers/factories.ts b/packages/batch/tests/helpers/factories.ts index bf3b75a7cb..196d7abbd4 100644 --- a/packages/batch/tests/helpers/factories.ts +++ b/packages/batch/tests/helpers/factories.ts @@ -61,6 +61,7 @@ const dynamodbRecordFactory = (body: string): DynamoDBRecord => { eventVersion: '1.0', dynamodb: { Keys: { Id: { N: '101' } }, + OldImage: { Message: { S: body } }, NewImage: { Message: { S: body } }, StreamViewType: 'NEW_AND_OLD_IMAGES', SequenceNumber: seq, diff --git a/packages/batch/tests/unit/parsing.test.ts b/packages/batch/tests/unit/parsing.test.ts index 231ec162a2..f31dc592ee 100644 --- a/packages/batch/tests/unit/parsing.test.ts +++ b/packages/batch/tests/unit/parsing.test.ts @@ -45,339 +45,248 @@ describe('Batch processing with Parser Integration', () => { const successPayload2 = { Message: 'test-2', }; - const failurePayload1 = { - Message: 1, - }; - const sqsRecordHandler = async (parsedRecord: SQSRecord) => { - return parsedRecord.body; - }; - const kinesisRecordHandler = async (parsedRecord: KinesisStreamRecord) => { - return parsedRecord.kinesis.data; - }; - const dynamodbRecordHandler = async (parsedRecord: DynamoDBRecord) => { - return parsedRecord.dynamodb?.NewImage; - }; - const cases = [ - { - description: 'passing Extended Schema', - SQSParserConfig: { - parser, - schema: SqsRecordSchema.extend({ - body: JSONStringified(customSchema), - }), - }, - KinesisParserConfig: { - parser, - schema: KinesisDataStreamRecord.extend({ - kinesis: 
KinesisDataStreamRecordPayload.extend({ - data: Base64Encoded(customSchema).optional(), - }), - }), - }, - DynamoDBParserConfig: { - parser, - schema: DynamoDBStreamRecord.extend({ - dynamodb: DynamoDBStreamChangeRecordBase.extend({ - NewImage: DynamoDBMarshalled(customSchema).optional(), - }), - }), - }, - }, - { - description: 'passing Internal Schema without transformer property set', - SQSParserConfig: { - parser, - innerSchema: JSONStringified(customSchema), - }, - KinesisParserConfig: { - parser, - innerSchema: Base64Encoded(customSchema), - }, - DynamoDBParserConfig: { - parser, - innerSchema: DynamoDBMarshalled(customSchema), - }, - }, - { - description: 'passing Internal Schema with transformer property set', - SQSParserConfig: { - parser, - innerSchema: customSchema, - transformer: 'json' as const, - }, - KinesisParserConfig: { - parser, - innerSchema: customSchema, - transformer: 'base64' as const, - }, - DynamoDBParserConfig: { - parser, - innerSchema: customSchema, - transformer: 'unmarshall' as const, - }, - }, + const sqsRecordHandler = async (parsedRecord: SQSRecord) => parsedRecord.body; + const sqsRecords = [ + sqsRecordFactory(JSON.stringify(successPayload1)), + sqsRecordFactory(JSON.stringify(successPayload2)), + ]; + const kinesisRecordHandler = async (parsedRecord: KinesisStreamRecord) => + parsedRecord.kinesis.data; + const kinesisRecords = [ + kinesisRecordFactory( + Buffer.from(JSON.stringify(successPayload1)).toString('base64') + ), + kinesisRecordFactory( + Buffer.from(JSON.stringify(successPayload2)).toString('base64') + ), + ]; + const dynamodbRecordHandler = async (parsedRecord: DynamoDBRecord) => + parsedRecord.dynamodb?.NewImage; + const dynamodbRecords = [ + dynamodbRecordFactory(successPayload1.Message), + dynamodbRecordFactory(successPayload2.Message), ]; - describe.each(cases)( - 'SQS Record Schema $description', - ({ SQSParserConfig }) => { - it('completes the processing with no failures and parses the payload before passing to the record handler', async () => { - // Prepare - const firstRecord = sqsRecordFactory(JSON.stringify(successPayload1)); - const secondRecord = sqsRecordFactory(JSON.stringify(successPayload2)); - const records = [firstRecord, secondRecord]; - const processor = new BatchProcessor(EventType.SQS, SQSParserConfig); - - // Act - processor.register(records, sqsRecordHandler); - const processedMessages = await processor.process(); - - // Assess - expect(processedMessages).toStrictEqual([ - ['success', successPayload1, firstRecord], - ['success', successPayload2, secondRecord], - ]); - }); - - it('completes the processing with failures if some of the payload does not match the passed schema', async () => { - // Prepare - const firstRecord = sqsRecordFactory(JSON.stringify(successPayload1)); - const secondRecord = sqsRecordFactory(JSON.stringify(failurePayload1)); - const records = [firstRecord, secondRecord]; - const processor = new BatchProcessor(EventType.SQS, SQSParserConfig); - - // Act - processor.register(records, sqsRecordHandler); - const processedMessages = await processor.process(); - - // Assess - expect(processedMessages[0]).toStrictEqual([ - 'success', - successPayload1, - firstRecord, - ]); - expect(processor.failureMessages.length).toBe(1); - expect(processor.response()).toStrictEqual({ - batchItemFailures: [{ itemIdentifier: secondRecord.messageId }], - }); - }); - } - ); - - describe.each(cases)( - 'Kinesis Record Schema $description', - ({ KinesisParserConfig }) => { - it('completes the processing with no failures and 
parses the payload before passing to the record handler', async () => { - // Prepare - const firstRecord = kinesisRecordFactory( - Buffer.from(JSON.stringify(successPayload1)).toString('base64') - ); - const secondRecord = kinesisRecordFactory( - Buffer.from(JSON.stringify(successPayload2)).toString('base64') - ); - const records = [firstRecord, secondRecord]; - const processor = new BatchProcessor( - EventType.KinesisDataStreams, - KinesisParserConfig - ); - - // Act - processor.register(records, kinesisRecordHandler); - const processedMessages = await processor.process(); - - // Assess - expect(processedMessages).toStrictEqual([ - ['success', successPayload1, firstRecord], - ['success', successPayload2, secondRecord], - ]); - }); - - it('completes the processing with failures if some of the payload does not match the passed schema', async () => { - // Prepare - const firstRecord = kinesisRecordFactory( - Buffer.from(JSON.stringify(successPayload1)).toString('base64') - ); - const secondRecord = kinesisRecordFactory( - Buffer.from(JSON.stringify(failurePayload1)).toString('base64') - ); - const records = [firstRecord, secondRecord]; - const processor = new BatchProcessor( - EventType.KinesisDataStreams, - KinesisParserConfig - ); - - // Act - processor.register(records, kinesisRecordHandler); - const processedMessages = await processor.process(); - - // Assess - expect(processedMessages[0]).toStrictEqual([ - 'success', - successPayload1, - firstRecord, - ]); - expect(processor.failureMessages.length).toBe(1); - expect(processor.response()).toStrictEqual({ - batchItemFailures: [ - { itemIdentifier: secondRecord.kinesis.sequenceNumber }, - ], - }); - }); - } - ); - - describe.each(cases)( - 'DynamoDB Record Schema $description', - ({ DynamoDBParserConfig }) => { - it('completes the processing with no failures and parses the payload before passing to the record handler', async () => { - // Prepare - const firstRecord = dynamodbRecordFactory(successPayload1.Message); - const secondRecord = dynamodbRecordFactory(successPayload2.Message); - const records = [firstRecord, secondRecord]; - const processor = new BatchProcessor( - EventType.DynamoDBStreams, - DynamoDBParserConfig - ); - - // Act - processor.register(records, dynamodbRecordHandler); - const processedMessages = await processor.process(); - // Assess - expect(processedMessages).toStrictEqual([ - ['success', successPayload1, firstRecord], - ['success', successPayload2, secondRecord], - ]); - }); + it('uses a custom logger when provided', async () => { + // Prepare + const logger = { + debug: vi.fn(), + warn: vi.fn(), + error: vi.fn(), + }; + const unsupportedSchema = object({ + Message: string(), + }); + const processor = new BatchProcessor(EventType.SQS, { + parser, + innerSchema: unsupportedSchema, + transformer: 'json', + logger, + }); + processor.register(sqsRecords, sqsRecordHandler); - it('completes the processing with failures if some of the payload does not match the passed schema', async () => { - // Prepare - vi.stubEnv('AWS_LAMBDA_LOG_LEVEL', 'DEBUG'); - const firstRecord = dynamodbRecordFactory(successPayload1.Message); - //@ts-expect-error Passing an invalid payload for testing - const secondRecord = dynamodbRecordFactory(failurePayload1.Message); - const records = [firstRecord, secondRecord]; - const processor = new BatchProcessor(EventType.DynamoDBStreams, { - ...DynamoDBParserConfig, - logger: console, - }); + // Act & Assess + await expect(processor.process()).rejects.toThrowError( + FullBatchFailureError + ); + 
expect(logger.error).toHaveBeenCalledWith( + 'The schema provided is not supported. Only Zod schemas are supported for extension.' + ); + }); - // Act - processor.register(records, dynamodbRecordHandler); - const processedMessages = await processor.process(); + it('completes processing with failures if the schema is not passed for the parsing', async () => { + // Prepare + vi.stubEnv('AWS_LAMBDA_LOG_LEVEL', 'INFO'); + // @ts-expect-error - testing missing required params + const processor = new BatchProcessor(EventType.SQS, { + parser, + transformer: 'json', + }); + processor.register(sqsRecords, sqsRecordHandler); - // Assess - expect(processedMessages[0]).toStrictEqual([ - 'success', - successPayload1, - firstRecord, - ]); - expect(processor.failureMessages.length).toBe(1); - expect(processor.response()).toStrictEqual({ - batchItemFailures: [ - { itemIdentifier: secondRecord.dynamodb?.SequenceNumber }, - ], - }); - expect(console.debug).toHaveBeenCalledWith( - 'dynamodb.NewImage.Message: Invalid input: expected string, received number' - ); - }); - } - ); + // Act & Assess + await expect(processor.process()).rejects.toThrowError( + FullBatchFailureError + ); + }); - it('completes processing with all failures if an unsupported event type is used for parsing', async () => { + it('does not log debug logs if the lambda log level is set to higher than INFO', async () => { // Prepare - const firstRecord = sqsRecordFactory(JSON.stringify(successPayload1)); - const secondRecord = sqsRecordFactory(JSON.stringify(successPayload2)); - const records = [firstRecord, secondRecord]; - //@ts-expect-error - const processor = new BatchProcessor('invalid-event-type', { + vi.stubEnv('AWS_LAMBDA_LOG_LEVEL', 'INFO'); + const processor = new BatchProcessor(EventType.SQS, { + parser, innerSchema: customSchema, transformer: 'json', }); + processor.register([sqsRecordFactory('fail')], sqsRecordHandler); - // Act - processor.register(records, sqsRecordHandler); - - // Assess + // Act & Assess await expect(processor.process()).rejects.toThrowError( FullBatchFailureError ); + expect(console.debug).not.toHaveBeenCalled(); }); - it('completes processing with failures if an unsupported schema type is used for parsing', async () => { + it('reports the parsing error if the record does not conform to the schema', async () => { // Prepare - const unsupportedSchema = object({ - Message: string(), - }); - const firstRecord = sqsRecordFactory(JSON.stringify(successPayload1)); - const secondRecord = sqsRecordFactory(JSON.stringify(successPayload2)); - const records = [firstRecord, secondRecord]; + vi.stubEnv('AWS_LAMBDA_LOG_LEVEL', 'DEBUG'); + const records = [sqsRecordFactory(JSON.stringify({ Invalid: 'invalid' }))]; const processor = new BatchProcessor(EventType.SQS, { parser, - // @ts-expect-error - we are explicitly testing a wrong schema vendor - innerSchema: unsupportedSchema, + innerSchema: customSchema, transformer: 'json', + logger: console, + }); + processor.register(records, sqsRecordHandler, { + throwOnFullBatchFailure: false, }); // Act - processor.register(records, sqsRecordHandler); + const result = await processor.process(); // Assess - await expect(processor.process()).rejects.toThrowError( - FullBatchFailureError + expect(console.debug).toHaveBeenCalledWith( + 'Failed to parse record: body.Message: Invalid input: expected string, received undefined' ); + expect(result).toEqual([ + [ + 'fail', + 'body.Message: Invalid input: expected string, received undefined', + records[0], + ], + ]); }); - it('completes 
processing with failures if the schema is not passed for the parsing', async () => { + it.each([ + { + case: 'full schema', + records: sqsRecords, + params: { + schema: SqsRecordSchema.extend({ + body: JSONStringified(customSchema), + }), + }, + }, + { + case: 'inner schema', + records: sqsRecords, + params: { + innerSchema: JSONStringified(customSchema), + }, + }, + { + case: 'inner schema with transformer', + records: sqsRecords, + params: { + innerSchema: customSchema, + transformer: 'json' as const, + }, + }, + ])('processes SQS records with $case', async ({ records, params }) => { // Prepare - const firstRecord = sqsRecordFactory(JSON.stringify(successPayload1)); - const secondRecord = sqsRecordFactory(JSON.stringify(successPayload2)); - const records = [firstRecord, secondRecord]; - const processor = new BatchProcessor(EventType.SQS, { parser, - transformer: 'json', + ...params, }); + processor.register(records, sqsRecordHandler); // Act - processor.register(records, sqsRecordHandler); + const result = await processor.process(); // Assess - await expect(processor.process()).rejects.toThrowError( - FullBatchFailureError - ); + expect(result).toEqual([ + ['success', successPayload1, sqsRecords[0]], + ['success', successPayload2, sqsRecords[1]], + ]); }); - it('uses a custom logger when provided', async () => { + it.each([ + { + case: 'full schema', + records: kinesisRecords, + params: { + schema: KinesisDataStreamRecord.extend({ + kinesis: KinesisDataStreamRecordPayload.extend({ + data: Base64Encoded(customSchema), + }), + }), + }, + }, + { + case: 'inner schema', + records: kinesisRecords, + params: { + innerSchema: Base64Encoded(customSchema), + }, + }, + { + case: 'inner schema with transformer', + records: kinesisRecords, + params: { + innerSchema: customSchema, + transformer: 'base64' as const, + }, + }, + ])('processes Kinesis records with $case', async ({ records, params }) => { // Prepare - const logger = { - debug: vi.fn(), - warn: vi.fn(), - error: vi.fn(), - }; - const unsupportedSchema = object({ - Message: string(), + const processor = new BatchProcessor(EventType.KinesisDataStreams, { + parser, + ...params, }); - const firstRecord = sqsRecordFactory(JSON.stringify(successPayload1)); - const secondRecord = sqsRecordFactory(JSON.stringify(successPayload2)); - const records = [firstRecord, secondRecord]; - const processor = new BatchProcessor(EventType.SQS, { + processor.register(records, kinesisRecordHandler); + + // Act + const result = await processor.process(); + + // Assess + expect(result).toEqual([ + ['success', successPayload1, kinesisRecords[0]], + ['success', successPayload2, kinesisRecords[1]], + ]); + }); + + it.each([ + { + case: 'full schema', + records: dynamodbRecords, + params: { + schema: DynamoDBStreamRecord.extend({ + dynamodb: DynamoDBStreamChangeRecordBase.extend({ + NewImage: DynamoDBMarshalled(customSchema), + }), + }), + }, + }, + { + case: 'inner schema', + records: dynamodbRecords, + params: { + innerSchema: DynamoDBMarshalled(customSchema), + }, + }, + { + case: 'inner schema with transformer', + records: dynamodbRecords, + params: { + innerSchema: customSchema, + transformer: 'unmarshall' as const, + }, + }, + ])('processes DynamoDB records with $case', async ({ records, params }) => { + // Prepare + const processor = new BatchProcessor(EventType.DynamoDBStreams, { parser, - // @ts-expect-error - we are explicitly testing a wrong schema vendor - innerSchema: unsupportedSchema, - transformer: 'json', - logger, + ...params, }); + 
processor.register(records, dynamodbRecordHandler); // Act - processor.register(records, sqsRecordHandler); + const result = await processor.process(); // Assess - await expect(processor.process()).rejects.toThrowError( - FullBatchFailureError - ); - expect(logger.error).toHaveBeenCalledWith( - 'The schema provided is not supported. Only Zod schemas are supported for extension.' - ); + expect(result).toEqual([ + ['success', successPayload1, dynamodbRecords[0]], + ['success', successPayload2, dynamodbRecords[1]], + ]); }); });
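
For reference (not part of the diff above), here is a minimal sketch of what "extend the schema and use the full schema for parsing" could look like when the DynamoDB `NewImage` and `OldImage` need dedicated schemas, as referenced in the docs note and the `BatchProcessor` docstring. The `newImageSchema`/`oldImageSchema` names and the schema import path are illustrative assumptions; the processor configuration mirrors the full-schema DynamoDB case exercised in the tests above.

```typescript
import { BatchProcessor, EventType } from '@aws-lambda-powertools/batch';
import { parser } from '@aws-lambda-powertools/batch/parser';
import { DynamoDBMarshalled } from '@aws-lambda-powertools/parser/helpers/dynamodb';
// Assumed import path for the built-in schemas; adjust to match the test file imports
import {
  DynamoDBStreamChangeRecordBase,
  DynamoDBStreamRecord,
} from '@aws-lambda-powertools/parser/schemas/dynamodb';
import { z } from 'zod';

// Hypothetical payload schemas, one per image
const newImageSchema = z.object({ Message: z.string() });
const oldImageSchema = z.object({ Message: z.string() }).partial();

// Extend the built-in DynamoDB stream record schema so each image gets its own
// schema, then pass it via the full `schema` option instead of `innerSchema`.
const processor = new BatchProcessor(EventType.DynamoDBStreams, {
  parser,
  schema: DynamoDBStreamRecord.extend({
    dynamodb: DynamoDBStreamChangeRecordBase.extend({
      NewImage: DynamoDBMarshalled(newImageSchema),
      OldImage: DynamoDBMarshalled(oldImageSchema),
    }),
  }),
});
```

This is the escape hatch the note points to: `innerSchema` applies a single schema to both images (see `createExtendedSchema`), while the full `schema` option lets each image carry its own.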