From 0160de47c684cefb4169b2c0366040e439cc2d50 Mon Sep 17 00:00:00 2001 From: Swopnil Dangol Date: Thu, 11 Sep 2025 07:37:15 +0100 Subject: [PATCH 1/8] Refactored the code to make the API simpler for better error handling --- .../batch/src/BasePartialBatchProcessor.ts | 13 +- packages/batch/src/BatchProcessor.ts | 196 ++++++++-------- packages/batch/src/types.ts | 37 ++- .../batch/tests/unit/BatchProcessor.test.ts | 221 ++++++++++-------- 4 files changed, 258 insertions(+), 209 deletions(-) diff --git a/packages/batch/src/BasePartialBatchProcessor.ts b/packages/batch/src/BasePartialBatchProcessor.ts index 529a3870d9..0c1902a1be 100644 --- a/packages/batch/src/BasePartialBatchProcessor.ts +++ b/packages/batch/src/BasePartialBatchProcessor.ts @@ -1,4 +1,3 @@ -import type { StandardSchemaV1 } from '@standard-schema/spec'; import type { DynamoDBRecord, KinesisStreamRecord, @@ -12,7 +11,7 @@ import { } from './constants.js'; import { FullBatchFailureError } from './errors.js'; import type { - BasePartialBatchProcessorConfig, + BasePartialBatchProcessorParserConfig, EventSourceDataClassTypes, PartialItemFailureResponse, PartialItemFailures, @@ -45,9 +44,9 @@ abstract class BasePartialBatchProcessor extends BasePartialProcessor { public eventType: keyof typeof EventType; /** - * The schema of the body of the event record for parsing + * The configuration options for the parser integration */ - protected schema?: StandardSchemaV1; + protected parserConfig?: BasePartialBatchProcessorParserConfig; /** * Initializes base batch processing class @@ -56,7 +55,7 @@ abstract class BasePartialBatchProcessor extends BasePartialProcessor { */ public constructor( eventType: keyof typeof EventType, - config?: BasePartialBatchProcessorConfig + parserConfig?: BasePartialBatchProcessorParserConfig ) { super(); this.eventType = eventType; @@ -66,9 +65,7 @@ abstract class BasePartialBatchProcessor extends BasePartialProcessor { [EventType.KinesisDataStreams]: () => this.collectKinesisFailures(), [EventType.DynamoDBStreams]: () => this.collectDynamoDBFailures(), }; - if (config) { - this.schema = config.schema; - } + this.parserConfig = parserConfig; } /** diff --git a/packages/batch/src/BatchProcessor.ts b/packages/batch/src/BatchProcessor.ts index f6221e9e64..466c29eea5 100644 --- a/packages/batch/src/BatchProcessor.ts +++ b/packages/batch/src/BatchProcessor.ts @@ -4,6 +4,7 @@ import { BasePartialBatchProcessor } from './BasePartialBatchProcessor.js'; import { EventType, SchemaVendor } from './constants.js'; import { BatchProcessingError } from './errors.js'; import type { + BasePartialBatchProcessorParserConfig, BaseRecord, EventSourceDataClassTypes, FailureResponse, @@ -108,13 +109,9 @@ class BatchProcessor extends BasePartialBatchProcessor { record: BaseRecord ): Promise { try { - const recordToProcess = - this.schema == null - ? 
record - : await this.#parseRecord(record, this.eventType, this.schema); + const recordToProcess = await this.#parseRecord(record, this.eventType); const data = this.toBatchType(recordToProcess, this.eventType); const result = await this.handler(data, this.options?.context); - return this.successHandler(record, result); } catch (error) { return this.failureHandler(record, error as Error); @@ -146,69 +143,53 @@ class BatchProcessor extends BasePartialBatchProcessor { */ async #createExtendedSchema(options: { eventType: keyof typeof EventType; - schema: StandardSchemaV1; - useTransformers: boolean; + innerSchema: StandardSchemaV1; + transformer?: BasePartialBatchProcessorParserConfig['transformer']; }) { - const { eventType, schema, useTransformers } = options; + const { eventType, innerSchema, transformer } = options; + let schema = innerSchema; + switch (transformer) { + case 'json': { + const { JSONStringified } = await import( + '@aws-lambda-powertools/parser/helpers' + ); + schema = JSONStringified(innerSchema as any); + break; + } + case 'base64': { + const { Base64Encoded } = await import( + '@aws-lambda-powertools/parser/helpers' + ); + schema = Base64Encoded(innerSchema as any); + break; + } + case 'unmarshall': { + const { DynamoDBMarshalled } = await import( + '@aws-lambda-powertools/parser/helpers/dynamodb' + ); + schema = DynamoDBMarshalled(innerSchema as any); + break; + } + } switch (eventType) { case EventType.SQS: { - if (useTransformers) { - const [{ JSONStringified }, { SqsRecordSchema }] = await Promise.all([ - import('@aws-lambda-powertools/parser/helpers'), - import('@aws-lambda-powertools/parser/schemas/sqs'), - ]); - return SqsRecordSchema.extend({ - body: JSONStringified(schema as any), - }); - } const { SqsRecordSchema } = await import( '@aws-lambda-powertools/parser/schemas/sqs' ); - return SqsRecordSchema.extend({ body: schema }); + return SqsRecordSchema.extend({ + body: schema, + }); } - case EventType.KinesisDataStreams: { - if (useTransformers) { - const [ - { Base64Encoded }, - { KinesisDataStreamRecord, KinesisDataStreamRecordPayload }, - ] = await Promise.all([ - import('@aws-lambda-powertools/parser/helpers'), - import('@aws-lambda-powertools/parser/schemas/kinesis'), - ]); - return KinesisDataStreamRecord.extend({ - kinesis: KinesisDataStreamRecordPayload.extend({ - data: Base64Encoded(schema as any), - }), - }); - } const { KinesisDataStreamRecord, KinesisDataStreamRecordPayload } = await import('@aws-lambda-powertools/parser/schemas/kinesis'); return KinesisDataStreamRecord.extend({ - kinesis: KinesisDataStreamRecordPayload.extend({ data: schema }), + kinesis: KinesisDataStreamRecordPayload.extend({ + data: schema, + }), }); } - case EventType.DynamoDBStreams: { - if (useTransformers) { - const [ - { DynamoDBMarshalled }, - { DynamoDBStreamRecord, DynamoDBStreamChangeRecordBase }, - ] = await Promise.all([ - import('@aws-lambda-powertools/parser/helpers/dynamodb'), - import('@aws-lambda-powertools/parser/schemas/dynamodb'), - ]); - return DynamoDBStreamRecord.extend({ - dynamodb: DynamoDBStreamChangeRecordBase.extend({ - OldImage: DynamoDBMarshalled( - schema as any - ).optional(), - NewImage: DynamoDBMarshalled( - schema as any - ).optional(), - }), - }); - } const { DynamoDBStreamRecord, DynamoDBStreamChangeRecordBase } = await import('@aws-lambda-powertools/parser/schemas/dynamodb'); return DynamoDBStreamRecord.extend({ @@ -218,7 +199,6 @@ class BatchProcessor extends BasePartialBatchProcessor { }), }); } - default: { console.warn( `The event type 
provided is not supported. Supported events: ${Object.values(EventType).join(',')}` @@ -238,57 +218,77 @@ class BatchProcessor extends BasePartialBatchProcessor { * * @param record - The record to be parsed * @param eventType - The type of event to process - * @param schema - The StandardSchema to be used for parsing */ async #parseRecord( record: EventSourceDataClassTypes, - eventType: keyof typeof EventType, - schema: StandardSchemaV1 + eventType: keyof typeof EventType ): Promise { - const { parse } = await import('@aws-lambda-powertools/parser'); - // Try parsing with the original schema first - const extendedSchemaParsing = parse(record, undefined, schema, true); - if (extendedSchemaParsing.success) { - return extendedSchemaParsing.data as EventSourceDataClassTypes; + if (this.parserConfig == null) { + return record; } - // Only proceed with schema extension if it's a Zod schema - if (schema['~standard'].vendor !== SchemaVendor.Zod) { - console.warn( - 'The schema provided is not supported. Only Zod schemas are supported for extension.' + const { parse } = await import('@aws-lambda-powertools/parser'); + const { schema, innerSchema, transformer } = this.parserConfig; + // If the external schema is specified, use it to parse the record + if (schema != null) { + const extendedSchemaParsing = parse(record, undefined, schema, true); + if (extendedSchemaParsing.success) { + return extendedSchemaParsing.data as EventSourceDataClassTypes; + } + const issues = extendedSchemaParsing.error + .cause as ReadonlyArray; + throw new Error( + `Failed to parse record: ${issues.map((issue) => `${issue?.path?.join('.')}: ${issue.message}`).join('; ')} ` ); - throw new Error('Unsupported schema type'); - } - // Handle schema extension based on event type - // Try without transformers first, then with transformers - const schemaWithoutTransformers = await this.#createExtendedSchema({ - eventType, - schema, - useTransformers: false, - }); - const schemaWithoutTransformersParsing = parse( - record, - undefined, - schemaWithoutTransformers, - true - ); - if (schemaWithoutTransformersParsing.success) { - return schemaWithoutTransformersParsing.data as EventSourceDataClassTypes; } - const schemaWithTransformers = await this.#createExtendedSchema({ - eventType, - schema, - useTransformers: true, - }); - const schemaWithTransformersParsing = parse( - record, - undefined, - schemaWithTransformers, - true - ); - if (schemaWithTransformersParsing.success) { - return schemaWithTransformersParsing.data as EventSourceDataClassTypes; + if (innerSchema != null) { + // Only proceed with schema extension if it's a Zod schema + if (innerSchema['~standard'].vendor !== SchemaVendor.Zod) { + console.warn( + 'The schema provided is not supported. Only Zod schemas are supported for extension.' 
+ ); + throw new Error('Unsupported schema type'); + } + if (transformer != null) { + const schemaWithTransformers = await this.#createExtendedSchema({ + eventType, + innerSchema, + transformer, + }); + const schemaWithTransformersParsing = parse( + record, + undefined, + schemaWithTransformers, + true + ); + if (schemaWithTransformersParsing.success) { + return schemaWithTransformersParsing.data as EventSourceDataClassTypes; + } + const issues = schemaWithTransformersParsing.error + .cause as ReadonlyArray; + throw new Error( + `Failed to parse record: ${issues.map((issue) => `${issue?.path?.join('.')}: ${issue.message}`).join('; ')} ` + ); + } + const schemaWithoutTransformers = await this.#createExtendedSchema({ + eventType, + innerSchema, + }); + const schemaWithoutTransformersParsing = parse( + record, + undefined, + schemaWithoutTransformers, + true + ); + if (schemaWithoutTransformersParsing.success) { + return schemaWithoutTransformersParsing.data as EventSourceDataClassTypes; + } + const issues = schemaWithoutTransformersParsing.error + .cause as ReadonlyArray; + throw new Error( + `Failed to parse record: ${issues.map((issue) => `${issue?.path?.join('.')}: ${issue.message}`).join('; ')} ` + ); } - throw new Error('Failed to parse record'); + throw new Error('Either schema or innerSchema is required for parsing'); } } diff --git a/packages/batch/src/types.ts b/packages/batch/src/types.ts index b5781c44c8..34f722c3fd 100644 --- a/packages/batch/src/types.ts +++ b/packages/batch/src/types.ts @@ -5,7 +5,7 @@ import type { KinesisStreamRecord, SQSRecord, } from 'aws-lambda'; - +import type { GenericLogger } from '../../commons/lib/esm/types/GenericLogger.js'; import type { BasePartialBatchProcessor } from './BasePartialBatchProcessor.js'; import type { SqsFifoPartialProcessor } from './SqsFifoPartialProcessor.js'; import type { SqsFifoPartialProcessorAsync } from './SqsFifoPartialProcessorAsync.js'; @@ -92,18 +92,37 @@ type PartialItemFailures = { itemIdentifier: string }; type PartialItemFailureResponse = { batchItemFailures: PartialItemFailures[] }; /** - * Type representing the configuration options passed to the BasePartialBatchProcessor class. + * Type representing the parser configuration options passed to the BasePartialBatchProcessor class. * * @property schema - The schema to be used for parsing + * @property innerSchema - The schema for the inner payload + * @property transformer - The transformer to be used for parsing the payload + * @property logger - The logger to be used for logging debug and warning messages. */ -type BasePartialBatchProcessorConfig = { +type BasePartialBatchProcessorParserConfig = { + /** + * The schema for the full event including the extended inner payload schema. + * + * StandardSchema is supported. + */ + schema?: StandardSchemaV1; + /** + * The schema for the inner payload of the event. + * Only Zod schemas are supported. + */ + innerSchema?: StandardSchemaV1; + /** + * The transformer to be used for parsing the payload. + * Supported transformers are: + * 1. 'json': Uses JSONStringified helper + * 2. 'base64': Uses Base64Encoded helper + * 3. 'unmarshall': Uses DynamoDBMarshalled helper + */ + transformer?: 'json' | 'base64' | 'unmarshall'; /** - * The schema be either of the following: - * 1. An internal schema of the payload of the supported event types. - * 2. An internal schema along with helper transformer functions. - * 3. An extended schema of the supported event type. + * The logger to be used for logging debug and warning messages. 
*/ - schema: StandardSchemaV1; + logger?: Pick; }; export type { @@ -114,5 +133,5 @@ export type { FailureResponse, PartialItemFailures, PartialItemFailureResponse, - BasePartialBatchProcessorConfig, + BasePartialBatchProcessorParserConfig, }; diff --git a/packages/batch/tests/unit/BatchProcessor.test.ts b/packages/batch/tests/unit/BatchProcessor.test.ts index 020038b3fc..4d5d14312e 100644 --- a/packages/batch/tests/unit/BatchProcessor.test.ts +++ b/packages/batch/tests/unit/BatchProcessor.test.ts @@ -336,19 +336,19 @@ describe('Class: AsyncBatchProcessor', () => { const cases = [ { description: 'passing Extended Schema', - SQS: { + SQSParserConfig: { schema: SqsRecordSchema.extend({ body: JSONStringified(customSchema), }), }, - Kinesis: { + KinesisParserConfig: { schema: KinesisDataStreamRecord.extend({ kinesis: KinesisDataStreamRecordPayload.extend({ data: Base64Encoded(customSchema).optional(), }), }), }, - DynamoDB: { + DynamoDBParserConfig: { schema: DynamoDBStreamRecord.extend({ dynamodb: DynamoDBStreamChangeRecordBase.extend({ NewImage: DynamoDBMarshalled(customSchema).optional(), @@ -357,99 +357,105 @@ describe('Class: AsyncBatchProcessor', () => { }, }, { - description: 'passing Internal Schema without transformers', - SQS: { - schema: customSchema, + description: 'passing Internal Schema without transformer property set', + SQSParserConfig: { + innerSchema: JSONStringified(customSchema), }, - Kinesis: { - schema: customSchema, + KinesisParserConfig: { + innerSchema: Base64Encoded(customSchema), }, - DynamoDB: { - schema: customSchema, + DynamoDBParserConfig: { + innerSchema: DynamoDBMarshalled(customSchema), }, }, { - description: 'passing Internal Schema with transformers', - SQS: { - schema: JSONStringified(customSchema), + description: 'passing Internal Schema with transformer property set', + SQSParserConfig: { + innerSchema: customSchema, + transformer: 'json' as const, }, - Kinesis: { - schema: Base64Encoded(customSchema), + KinesisParserConfig: { + innerSchema: customSchema, + transformer: 'base64' as const, }, - DynamoDB: { - schema: DynamoDBMarshalled(customSchema), + DynamoDBParserConfig: { + innerSchema: customSchema, + transformer: 'unmarshall' as const, }, }, ]; - describe.each(cases)('SQS Record Schema $description', ({ SQS }) => { - it('completes the processing with no failures and parses the payload before passing to the record handler', async () => { - // Prepare - const firstRecord = sqsRecordFactory(JSON.stringify(successPayload1)); - const secondRecord = sqsRecordFactory(JSON.stringify(successPayload2)); - const records = [firstRecord, secondRecord]; - const processor = new BatchProcessor(EventType.SQS, { - schema: SQS.schema, - }); - - // Act - processor.register(records, sqsRecordHandler, options); - const processedMessages = await processor.process(); + describe.each(cases)( + 'SQS Record Schema $description', + ({ SQSParserConfig }) => { + it('completes the processing with no failures and parses the payload before passing to the record handler', async () => { + // Prepare + const firstRecord = sqsRecordFactory(JSON.stringify(successPayload1)); + const secondRecord = sqsRecordFactory( + JSON.stringify(successPayload2) + ); + const records = [firstRecord, secondRecord]; + const processor = new BatchProcessor(EventType.SQS, SQSParserConfig); - // Assess - expect(processedMessages).toStrictEqual([ - ['success', successPayload1, firstRecord], - ['success', successPayload2, secondRecord], - ]); - }); + // Act + processor.register(records, sqsRecordHandler, 
options); + const processedMessages = await processor.process(); - it('completes the processing with failures if some of the payload does not match the passed schema', async () => { - // Prepare - const firstRecord = sqsRecordFactory(JSON.stringify(successPayload1)); - const secondRecord = sqsRecordFactory(JSON.stringify(failurePayload1)); - const records = [firstRecord, secondRecord]; - const processor = new BatchProcessor(EventType.SQS, { - schema: SQS.schema, + // Assess + expect(processedMessages).toStrictEqual([ + ['success', successPayload1, firstRecord], + ['success', successPayload2, secondRecord], + ]); }); - // Act - processor.register(records, sqsRecordHandler, options); - const processedMessages = await processor.process(); + it('completes the processing with failures if some of the payload does not match the passed schema', async () => { + // Prepare + const firstRecord = sqsRecordFactory(JSON.stringify(successPayload1)); + const secondRecord = sqsRecordFactory( + JSON.stringify(failurePayload1) + ); + const records = [firstRecord, secondRecord]; + const processor = new BatchProcessor(EventType.SQS, SQSParserConfig); - // Assess - expect(processedMessages[0]).toStrictEqual([ - 'success', - successPayload1, - firstRecord, - ]); - expect(processor.failureMessages.length).toBe(1); - expect(processor.response()).toStrictEqual({ - batchItemFailures: [{ itemIdentifier: secondRecord.messageId }], + // Act + processor.register(records, sqsRecordHandler, options); + const processedMessages = await processor.process(); + + // Assess + expect(processedMessages[0]).toStrictEqual([ + 'success', + successPayload1, + firstRecord, + ]); + expect(processor.failureMessages.length).toBe(1); + expect(processor.response()).toStrictEqual({ + batchItemFailures: [{ itemIdentifier: secondRecord.messageId }], + }); }); - }); - it('completes processing with all failures if all the payload does not match the passed schema', async () => { - // Prepare - const firstRecord = sqsRecordFactory(JSON.stringify(failurePayload1)); - const secondRecord = sqsRecordFactory(JSON.stringify(failurePayload2)); + it('completes processing with all failures if all the payload does not match the passed schema', async () => { + // Prepare + const firstRecord = sqsRecordFactory(JSON.stringify(failurePayload1)); + const secondRecord = sqsRecordFactory( + JSON.stringify(failurePayload2) + ); - const records = [firstRecord, secondRecord]; - const processor = new BatchProcessor(EventType.SQS, { - schema: SQS.schema, - }); + const records = [firstRecord, secondRecord]; + const processor = new BatchProcessor(EventType.SQS, SQSParserConfig); - // Act - processor.register(records, sqsRecordHandler, options); + // Act + processor.register(records, sqsRecordHandler, options); - // Assess - await expect(processor.process()).rejects.toThrowError( - FullBatchFailureError - ); - }); - }); + // Assess + await expect(processor.process()).rejects.toThrowError( + FullBatchFailureError + ); + }); + } + ); describe.each(cases)( 'Kinesis Record Schema $description', - ({ Kinesis }) => { + ({ KinesisParserConfig }) => { it('completes the processing with no failures and parses the payload before passing to the record handler', async () => { // Prepare const firstRecord = kinesisRecordFactory( @@ -459,9 +465,10 @@ describe('Class: AsyncBatchProcessor', () => { Buffer.from(JSON.stringify(successPayload2)).toString('base64') ); const records = [firstRecord, secondRecord]; - const processor = new BatchProcessor(EventType.KinesisDataStreams, { - schema: 
Kinesis.schema, - }); + const processor = new BatchProcessor( + EventType.KinesisDataStreams, + KinesisParserConfig + ); // Act processor.register(records, kinesisRecordHandler, options); @@ -483,9 +490,10 @@ describe('Class: AsyncBatchProcessor', () => { Buffer.from(JSON.stringify(failurePayload1)).toString('base64') ); const records = [firstRecord, secondRecord]; - const processor = new BatchProcessor(EventType.KinesisDataStreams, { - schema: Kinesis.schema, - }); + const processor = new BatchProcessor( + EventType.KinesisDataStreams, + KinesisParserConfig + ); // Act processor.register(records, kinesisRecordHandler, options); @@ -515,9 +523,10 @@ describe('Class: AsyncBatchProcessor', () => { ); const records = [firstRecord, secondRecord]; - const processor = new BatchProcessor(EventType.KinesisDataStreams, { - schema: Kinesis.schema, - }); + const processor = new BatchProcessor( + EventType.KinesisDataStreams, + KinesisParserConfig + ); // Act processor.register(records, sqsRecordHandler, options); @@ -532,15 +541,16 @@ describe('Class: AsyncBatchProcessor', () => { describe.each(cases)( 'DynamoDB Record Schema $description', - ({ DynamoDB }) => { + ({ DynamoDBParserConfig }) => { it('completes the processing with no failures and parses the payload before passing to the record handler', async () => { // Prepare const firstRecord = dynamodbRecordFactory(successPayload1.Message); const secondRecord = dynamodbRecordFactory(successPayload2.Message); const records = [firstRecord, secondRecord]; - const processor = new BatchProcessor(EventType.DynamoDBStreams, { - schema: DynamoDB.schema, - }); + const processor = new BatchProcessor( + EventType.DynamoDBStreams, + DynamoDBParserConfig + ); // Act processor.register(records, dynamodbRecordHandler, options); @@ -559,9 +569,10 @@ describe('Class: AsyncBatchProcessor', () => { //@ts-expect-error Passing an invalid payload for testing const secondRecord = dynamodbRecordFactory(failurePayload1.Message); const records = [firstRecord, secondRecord]; - const processor = new BatchProcessor(EventType.DynamoDBStreams, { - schema: DynamoDB.schema, - }); + const processor = new BatchProcessor( + EventType.DynamoDBStreams, + DynamoDBParserConfig + ); // Act processor.register(records, dynamodbRecordHandler, options); @@ -588,9 +599,10 @@ describe('Class: AsyncBatchProcessor', () => { //@ts-expect-error Passing an invalid payload for testing const secondRecord = dynamodbRecordFactory(failurePayload2.Message); const records = [firstRecord, secondRecord]; - const processor = new BatchProcessor(EventType.DynamoDBStreams, { - schema: DynamoDB.schema, - }); + const processor = new BatchProcessor( + EventType.DynamoDBStreams, + DynamoDBParserConfig + ); // Act processor.register(records, dynamodbRecordHandler, options); @@ -610,7 +622,8 @@ describe('Class: AsyncBatchProcessor', () => { const records = [firstRecord, secondRecord]; //@ts-expect-error const processor = new BatchProcessor('invalid-event-type', { - schema: customSchema, + innerSchema: customSchema, + transformer: 'json', }); // Act @@ -631,7 +644,27 @@ describe('Class: AsyncBatchProcessor', () => { const secondRecord = sqsRecordFactory(JSON.stringify(successPayload2)); const records = [firstRecord, secondRecord]; const processor = new BatchProcessor(EventType.SQS, { - schema: unsupportedSchema, + innerSchema: unsupportedSchema, + transformer: 'json', + }); + + // Act + processor.register(records, sqsRecordHandler, options); + + // Assess + await expect(processor.process()).rejects.toThrowError( + 
FullBatchFailureError + ); + }); + + it('completes processing with failures if the schema is not passed for the parsing', async () => { + // Prepare + const firstRecord = sqsRecordFactory(JSON.stringify(successPayload1)); + const secondRecord = sqsRecordFactory(JSON.stringify(successPayload2)); + const records = [firstRecord, secondRecord]; + + const processor = new BatchProcessor(EventType.SQS, { + transformer: 'json', }); // Act From fd5e8998b87c307341bcd6dc884d5f08578159ce Mon Sep 17 00:00:00 2001 From: Swopnil Dangol Date: Thu, 11 Sep 2025 07:51:04 +0100 Subject: [PATCH 2/8] Refactored the code to remove duplicates --- packages/batch/src/BatchProcessor.ts | 63 +++++++++++----------------- 1 file changed, 25 insertions(+), 38 deletions(-) diff --git a/packages/batch/src/BatchProcessor.ts b/packages/batch/src/BatchProcessor.ts index 466c29eea5..75f3164064 100644 --- a/packages/batch/src/BatchProcessor.ts +++ b/packages/batch/src/BatchProcessor.ts @@ -208,6 +208,28 @@ class BatchProcessor extends BasePartialBatchProcessor { } } + /** + * Parse the record with the passed schema and + * return the result or throw the error depending on parsing success + * + * @param record - The record to be parsed + * @param schema - The modified schema to parse with + */ + async #parseWithErrorHandling( + record: EventSourceDataClassTypes, + schema: StandardSchemaV1 + ) { + const { parse } = await import('@aws-lambda-powertools/parser'); + const result = parse(record, undefined, schema, true); + if (result.success) { + return result.data as EventSourceDataClassTypes; + } + const issues = result.error.cause as ReadonlyArray; + throw new Error( + `Failed to parse record: ${issues.map((issue) => `${issue?.path?.join('.')}: ${issue.message}`).join('; ')}` + ); + } + /** * Parse the record according to the schema and event type passed. 
* @@ -226,19 +248,10 @@ class BatchProcessor extends BasePartialBatchProcessor { if (this.parserConfig == null) { return record; } - const { parse } = await import('@aws-lambda-powertools/parser'); const { schema, innerSchema, transformer } = this.parserConfig; // If the external schema is specified, use it to parse the record if (schema != null) { - const extendedSchemaParsing = parse(record, undefined, schema, true); - if (extendedSchemaParsing.success) { - return extendedSchemaParsing.data as EventSourceDataClassTypes; - } - const issues = extendedSchemaParsing.error - .cause as ReadonlyArray; - throw new Error( - `Failed to parse record: ${issues.map((issue) => `${issue?.path?.join('.')}: ${issue.message}`).join('; ')} ` - ); + return this.#parseWithErrorHandling(record, schema); } if (innerSchema != null) { // Only proceed with schema extension if it's a Zod schema @@ -254,39 +267,13 @@ class BatchProcessor extends BasePartialBatchProcessor { innerSchema, transformer, }); - const schemaWithTransformersParsing = parse( - record, - undefined, - schemaWithTransformers, - true - ); - if (schemaWithTransformersParsing.success) { - return schemaWithTransformersParsing.data as EventSourceDataClassTypes; - } - const issues = schemaWithTransformersParsing.error - .cause as ReadonlyArray; - throw new Error( - `Failed to parse record: ${issues.map((issue) => `${issue?.path?.join('.')}: ${issue.message}`).join('; ')} ` - ); + return this.#parseWithErrorHandling(record, schemaWithTransformers); } const schemaWithoutTransformers = await this.#createExtendedSchema({ eventType, innerSchema, }); - const schemaWithoutTransformersParsing = parse( - record, - undefined, - schemaWithoutTransformers, - true - ); - if (schemaWithoutTransformersParsing.success) { - return schemaWithoutTransformersParsing.data as EventSourceDataClassTypes; - } - const issues = schemaWithoutTransformersParsing.error - .cause as ReadonlyArray; - throw new Error( - `Failed to parse record: ${issues.map((issue) => `${issue?.path?.join('.')}: ${issue.message}`).join('; ')} ` - ); + return this.#parseWithErrorHandling(record, schemaWithoutTransformers); } throw new Error('Either schema or innerSchema is required for parsing'); } From bd9d833283e72c1976cced244668e0e25c0b8442 Mon Sep 17 00:00:00 2001 From: Swopnil Dangol Date: Thu, 11 Sep 2025 08:59:02 +0100 Subject: [PATCH 3/8] Added logger --- .../batch/src/BasePartialBatchProcessor.ts | 18 ++++++++++++++++++ packages/batch/src/BatchProcessor.ts | 13 +++++++++---- packages/batch/src/types.ts | 5 +++-- 3 files changed, 30 insertions(+), 6 deletions(-) diff --git a/packages/batch/src/BasePartialBatchProcessor.ts b/packages/batch/src/BasePartialBatchProcessor.ts index 0c1902a1be..17537c47d6 100644 --- a/packages/batch/src/BasePartialBatchProcessor.ts +++ b/packages/batch/src/BasePartialBatchProcessor.ts @@ -1,8 +1,10 @@ +import { getStringFromEnv } from '@aws-lambda-powertools/commons/utils/env'; import type { DynamoDBRecord, KinesisStreamRecord, SQSRecord, } from 'aws-lambda'; +import type { GenericLogger } from '../../commons/lib/esm/types/GenericLogger.js'; import { BasePartialProcessor } from './BasePartialProcessor.js'; import { DATA_CLASS_MAPPING, @@ -43,6 +45,13 @@ abstract class BasePartialBatchProcessor extends BasePartialProcessor { */ public eventType: keyof typeof EventType; + /** + * A logger instance to be used for logging debug, warning, and error messages. + * + * When no logger is provided, we'll only log warnings and errors using the global `console` object. 
+ */ + protected readonly logger: Pick; + /** * The configuration options for the parser integration */ @@ -66,6 +75,15 @@ abstract class BasePartialBatchProcessor extends BasePartialProcessor { [EventType.DynamoDBStreams]: () => this.collectDynamoDBFailures(), }; this.parserConfig = parserConfig; + const alcLogLevel = getStringFromEnv({ + key: 'AWS_LAMBDA_LOG_LEVEL', + defaultValue: '', + }); + this.logger = parserConfig?.logger ?? { + debug: alcLogLevel === 'DEBUG' ? console.debug : () => undefined, + error: console.error, + warn: console.warn, + }; } /** diff --git a/packages/batch/src/BatchProcessor.ts b/packages/batch/src/BatchProcessor.ts index 75f3164064..8dc60a2c5b 100644 --- a/packages/batch/src/BatchProcessor.ts +++ b/packages/batch/src/BatchProcessor.ts @@ -225,9 +225,11 @@ class BatchProcessor extends BasePartialBatchProcessor { return result.data as EventSourceDataClassTypes; } const issues = result.error.cause as ReadonlyArray; - throw new Error( - `Failed to parse record: ${issues.map((issue) => `${issue?.path?.join('.')}: ${issue.message}`).join('; ')}` - ); + const errorMessage = issues + .map((issue) => `${issue?.path?.join('.')}: ${issue.message}`) + .join('; '); + this.logger.error(errorMessage); + throw new Error(errorMessage); } /** @@ -256,7 +258,7 @@ class BatchProcessor extends BasePartialBatchProcessor { if (innerSchema != null) { // Only proceed with schema extension if it's a Zod schema if (innerSchema['~standard'].vendor !== SchemaVendor.Zod) { - console.warn( + this.logger.error( 'The schema provided is not supported. Only Zod schemas are supported for extension.' ); throw new Error('Unsupported schema type'); @@ -275,6 +277,9 @@ class BatchProcessor extends BasePartialBatchProcessor { }); return this.#parseWithErrorHandling(record, schemaWithoutTransformers); } + this.logger.error( + 'The schema provided is not supported. Only Zod schemas are supported for extension.' + ); throw new Error('Either schema or innerSchema is required for parsing'); } } diff --git a/packages/batch/src/types.ts b/packages/batch/src/types.ts index 34f722c3fd..3bc151329d 100644 --- a/packages/batch/src/types.ts +++ b/packages/batch/src/types.ts @@ -94,8 +94,8 @@ type PartialItemFailureResponse = { batchItemFailures: PartialItemFailures[] }; /** * Type representing the parser configuration options passed to the BasePartialBatchProcessor class. * - * @property schema - The schema to be used for parsing - * @property innerSchema - The schema for the inner payload + * @property schema - The full event schema to be used for parsing + * @property innerSchema - The inner payload schema * @property transformer - The transformer to be used for parsing the payload * @property logger - The logger to be used for logging debug and warning messages. */ @@ -113,6 +113,7 @@ type BasePartialBatchProcessorParserConfig = { innerSchema?: StandardSchemaV1; /** * The transformer to be used for parsing the payload. + * No transformers will be used if this is not provided. * Supported transformers are: * 1. 'json': Uses JSONStringified helper * 2. 
'base64': Uses Base64Encoded helper From 96e5b607031bb4628c0b4933af5b3508a760c8c6 Mon Sep 17 00:00:00 2001 From: Swopnil Dangol Date: Thu, 11 Sep 2025 09:46:27 +0100 Subject: [PATCH 4/8] Added tests for logger --- packages/batch/src/BatchProcessor.ts | 2 +- .../batch/tests/unit/BatchProcessor.test.ts | 53 +++++++++++++++++++ 2 files changed, 54 insertions(+), 1 deletion(-) diff --git a/packages/batch/src/BatchProcessor.ts b/packages/batch/src/BatchProcessor.ts index 8dc60a2c5b..ad39e5e893 100644 --- a/packages/batch/src/BatchProcessor.ts +++ b/packages/batch/src/BatchProcessor.ts @@ -228,7 +228,7 @@ class BatchProcessor extends BasePartialBatchProcessor { const errorMessage = issues .map((issue) => `${issue?.path?.join('.')}: ${issue.message}`) .join('; '); - this.logger.error(errorMessage); + this.logger.debug(errorMessage); throw new Error(errorMessage); } diff --git a/packages/batch/tests/unit/BatchProcessor.test.ts b/packages/batch/tests/unit/BatchProcessor.test.ts index 4d5d14312e..4671f58faf 100644 --- a/packages/batch/tests/unit/BatchProcessor.test.ts +++ b/packages/batch/tests/unit/BatchProcessor.test.ts @@ -675,5 +675,58 @@ describe('Class: AsyncBatchProcessor', () => { FullBatchFailureError ); }); + + it('uses a custom logger when provided', async () => { + // Prepare + const logger = { + debug: vi.fn(), + info: vi.fn(), + warn: vi.fn(), + error: vi.fn(), + }; + const unsupportedSchema = v.object({ + Message: v.string(), + }); + const firstRecord = sqsRecordFactory(JSON.stringify(successPayload1)); + const secondRecord = sqsRecordFactory(JSON.stringify(successPayload2)); + const records = [firstRecord, secondRecord]; + const processor = new BatchProcessor(EventType.SQS, { + innerSchema: unsupportedSchema, + transformer: 'json', + logger, + }); + + // Act + processor.register(records, sqsRecordHandler, options); + + // Assess + await expect(processor.process()).rejects.toThrowError( + FullBatchFailureError + ); + expect(logger.error).toHaveBeenCalledWith( + 'The schema provided is not supported. Only Zod schemas are supported for extension.' + ); + }); + + it('emits debug logs when AWS_LAMBDA_LOG_LEVEL is set to DEBUG', async () => { + // Prepare + const consoleSpy = vi.spyOn(console, 'debug'); + vi.stubEnv('AWS_LAMBDA_LOG_LEVEL', 'DEBUG'); + const firstRecord = sqsRecordFactory(JSON.stringify(failurePayload1)); + const records = [firstRecord]; + const processor = new BatchProcessor(EventType.SQS, { + innerSchema: customSchema, + transformer: 'json', + }); + + // Act + processor.register(records, sqsRecordHandler, options); + + // Assess + await expect(processor.process()).rejects.toThrowError( + FullBatchFailureError + ); + expect(consoleSpy).toHaveBeenCalled(); + }); }); }); From c2595e1a29613febcaf9ba4fa787e729a82e066b Mon Sep 17 00:00:00 2001 From: Swopnil Dangol Date: Thu, 11 Sep 2025 13:18:59 +0100 Subject: [PATCH 5/8] Created the type combinations for the parser config --- packages/batch/src/types.ts | 39 +++++++++++++------------------------ 1 file changed, 13 insertions(+), 26 deletions(-) diff --git a/packages/batch/src/types.ts b/packages/batch/src/types.ts index 3bc151329d..d5c7dcba8c 100644 --- a/packages/batch/src/types.ts +++ b/packages/batch/src/types.ts @@ -99,32 +99,19 @@ type PartialItemFailureResponse = { batchItemFailures: PartialItemFailures[] }; * @property transformer - The transformer to be used for parsing the payload * @property logger - The logger to be used for logging debug and warning messages. 
*/ -type BasePartialBatchProcessorParserConfig = { - /** - * The schema for the full event including the extended inner payload schema. - * - * StandardSchema is supported. - */ - schema?: StandardSchemaV1; - /** - * The schema for the inner payload of the event. - * Only Zod schemas are supported. - */ - innerSchema?: StandardSchemaV1; - /** - * The transformer to be used for parsing the payload. - * No transformers will be used if this is not provided. - * Supported transformers are: - * 1. 'json': Uses JSONStringified helper - * 2. 'base64': Uses Base64Encoded helper - * 3. 'unmarshall': Uses DynamoDBMarshalled helper - */ - transformer?: 'json' | 'base64' | 'unmarshall'; - /** - * The logger to be used for logging debug and warning messages. - */ - logger?: Pick; -}; +type BasePartialBatchProcessorParserConfig = + | { + schema?: StandardSchemaV1; + innerSchema?: never; + transformer?: never; + logger?: Pick; + } + | { + schema?: never; + innerSchema?: StandardSchemaV1; + transformer?: 'json' | 'base64' | 'unmarshall'; + logger?: Pick; + }; export type { BatchProcessingOptions, From 89509b5fc76931892b9d52ae463869f2df5b0335 Mon Sep 17 00:00:00 2001 From: Andrea Amorosi Date: Thu, 11 Sep 2025 16:43:03 +0200 Subject: [PATCH 6/8] refactor: make sub-path export --- packages/batch/package.json | 10 +- packages/batch/src/BatchProcessor.ts | 172 +------ packages/batch/src/errors.ts | 11 + packages/batch/src/index.ts | 1 + packages/batch/src/parser.ts | 158 +++++++ packages/batch/src/types.ts | 5 +- .../batch/tests/unit/BatchProcessor.test.ts | 445 +----------------- packages/batch/tests/unit/parsing.test.ts | 383 +++++++++++++++ packages/batch/vitest.config.ts | 1 + 9 files changed, 577 insertions(+), 609 deletions(-) create mode 100644 packages/batch/src/parser.ts create mode 100644 packages/batch/tests/unit/parsing.test.ts diff --git a/packages/batch/package.json b/packages/batch/package.json index 42396f9918..e280a4db4c 100644 --- a/packages/batch/package.json +++ b/packages/batch/package.json @@ -38,6 +38,10 @@ "default": "./lib/esm/index.js" } }, + "./parser": { + "import": "./lib/esm/parser.js", + "require": "./lib/cjs/parser.js" + }, "./types": { "import": "./lib/esm/types.js", "require": "./lib/cjs/types.js" @@ -45,6 +49,10 @@ }, "typesVersions": { "*": { + "parser": [ + "lib/cjs/parser.d.ts", + "lib/esm/parser.d.ts" + ], "types": [ "lib/cjs/types.d.ts", "lib/esm/types.d.ts" @@ -76,4 +84,4 @@ "@aws-lambda-powertools/testing-utils": "file:../testing", "@aws-lambda-powertools/parser": "2.25.2" } -} +} \ No newline at end of file diff --git a/packages/batch/src/BatchProcessor.ts b/packages/batch/src/BatchProcessor.ts index ad39e5e893..1a8ec918b3 100644 --- a/packages/batch/src/BatchProcessor.ts +++ b/packages/batch/src/BatchProcessor.ts @@ -1,15 +1,6 @@ -import type { StandardSchemaV1 } from '@standard-schema/spec'; -import type { StreamRecord } from 'aws-lambda'; import { BasePartialBatchProcessor } from './BasePartialBatchProcessor.js'; -import { EventType, SchemaVendor } from './constants.js'; import { BatchProcessingError } from './errors.js'; -import type { - BasePartialBatchProcessorParserConfig, - BaseRecord, - EventSourceDataClassTypes, - FailureResponse, - SuccessResponse, -} from './types.js'; +import type { BaseRecord, FailureResponse, SuccessResponse } from './types.js'; /** * Process records in a batch asynchronously and handle partial failure cases. 
@@ -109,7 +100,14 @@ class BatchProcessor extends BasePartialBatchProcessor { record: BaseRecord ): Promise { try { - const recordToProcess = await this.#parseRecord(record, this.eventType); + const recordToProcess = this.parserConfig?.parser + ? await this.parserConfig.parser( + record, + this.eventType, + this.logger, + this.parserConfig + ) + : record; const data = this.toBatchType(recordToProcess, this.eventType); const result = await this.handler(data, this.options?.context); return this.successHandler(record, result); @@ -130,158 +128,6 @@ class BatchProcessor extends BasePartialBatchProcessor { 'Not implemented. Use asyncProcess() instead.' ); } - - /** - * Extend the schema according to the event type passed. - * - * If useTransformers is true, extend using opinionated transformers. - * Otherwise, extend without any transformers. - * - * @param eventType - The type of event to process (SQS, Kinesis, DynamoDB) - * @param schema - The StandardSchema to be used for parsing - * @param useTransformers - Whether to use transformers for parsing - */ - async #createExtendedSchema(options: { - eventType: keyof typeof EventType; - innerSchema: StandardSchemaV1; - transformer?: BasePartialBatchProcessorParserConfig['transformer']; - }) { - const { eventType, innerSchema, transformer } = options; - let schema = innerSchema; - switch (transformer) { - case 'json': { - const { JSONStringified } = await import( - '@aws-lambda-powertools/parser/helpers' - ); - schema = JSONStringified(innerSchema as any); - break; - } - case 'base64': { - const { Base64Encoded } = await import( - '@aws-lambda-powertools/parser/helpers' - ); - schema = Base64Encoded(innerSchema as any); - break; - } - case 'unmarshall': { - const { DynamoDBMarshalled } = await import( - '@aws-lambda-powertools/parser/helpers/dynamodb' - ); - schema = DynamoDBMarshalled(innerSchema as any); - break; - } - } - switch (eventType) { - case EventType.SQS: { - const { SqsRecordSchema } = await import( - '@aws-lambda-powertools/parser/schemas/sqs' - ); - return SqsRecordSchema.extend({ - body: schema, - }); - } - case EventType.KinesisDataStreams: { - const { KinesisDataStreamRecord, KinesisDataStreamRecordPayload } = - await import('@aws-lambda-powertools/parser/schemas/kinesis'); - return KinesisDataStreamRecord.extend({ - kinesis: KinesisDataStreamRecordPayload.extend({ - data: schema, - }), - }); - } - case EventType.DynamoDBStreams: { - const { DynamoDBStreamRecord, DynamoDBStreamChangeRecordBase } = - await import('@aws-lambda-powertools/parser/schemas/dynamodb'); - return DynamoDBStreamRecord.extend({ - dynamodb: DynamoDBStreamChangeRecordBase.extend({ - OldImage: (schema as any).optional(), - NewImage: (schema as any).optional(), - }), - }); - } - default: { - console.warn( - `The event type provided is not supported. 
Supported events: ${Object.values(EventType).join(',')}` - ); - throw new Error('Unsupported event type'); - } - } - } - - /** - * Parse the record with the passed schema and - * return the result or throw the error depending on parsing success - * - * @param record - The record to be parsed - * @param schema - The modified schema to parse with - */ - async #parseWithErrorHandling( - record: EventSourceDataClassTypes, - schema: StandardSchemaV1 - ) { - const { parse } = await import('@aws-lambda-powertools/parser'); - const result = parse(record, undefined, schema, true); - if (result.success) { - return result.data as EventSourceDataClassTypes; - } - const issues = result.error.cause as ReadonlyArray; - const errorMessage = issues - .map((issue) => `${issue?.path?.join('.')}: ${issue.message}`) - .join('; '); - this.logger.debug(errorMessage); - throw new Error(errorMessage); - } - - /** - * Parse the record according to the schema and event type passed. - * - * If the passed schema is already an extended schema, - * use the schema directly to parse the record. - * - * Only Zod Schemas are supported for schema extension. - * - * @param record - The record to be parsed - * @param eventType - The type of event to process - */ - async #parseRecord( - record: EventSourceDataClassTypes, - eventType: keyof typeof EventType - ): Promise { - if (this.parserConfig == null) { - return record; - } - const { schema, innerSchema, transformer } = this.parserConfig; - // If the external schema is specified, use it to parse the record - if (schema != null) { - return this.#parseWithErrorHandling(record, schema); - } - if (innerSchema != null) { - // Only proceed with schema extension if it's a Zod schema - if (innerSchema['~standard'].vendor !== SchemaVendor.Zod) { - this.logger.error( - 'The schema provided is not supported. Only Zod schemas are supported for extension.' - ); - throw new Error('Unsupported schema type'); - } - if (transformer != null) { - const schemaWithTransformers = await this.#createExtendedSchema({ - eventType, - innerSchema, - transformer, - }); - return this.#parseWithErrorHandling(record, schemaWithTransformers); - } - const schemaWithoutTransformers = await this.#createExtendedSchema({ - eventType, - innerSchema, - }); - return this.#parseWithErrorHandling(record, schemaWithoutTransformers); - } - this.logger.error( - 'The schema provided is not supported. Only Zod schemas are supported for extension.' - ); - throw new Error('Either schema or innerSchema is required for parsing'); - } } export { BatchProcessor }; diff --git a/packages/batch/src/errors.ts b/packages/batch/src/errors.ts index 9467e5e0d2..a95a88cda6 100644 --- a/packages/batch/src/errors.ts +++ b/packages/batch/src/errors.ts @@ -63,10 +63,21 @@ class UnexpectedBatchTypeError extends BatchProcessingError { } } +/** + * Error thrown by the Batch Processing utility when a record fails to be parsed. 
+ */ +class ParsingError extends BatchProcessingError { + public constructor(message: string) { + super(message); + this.name = 'ParsingError'; + } +} + export { BatchProcessingError, FullBatchFailureError, SqsFifoShortCircuitError, SqsFifoMessageGroupShortCircuitError, UnexpectedBatchTypeError, + ParsingError, }; diff --git a/packages/batch/src/index.ts b/packages/batch/src/index.ts index 510c57123d..6cf3370b00 100644 --- a/packages/batch/src/index.ts +++ b/packages/batch/src/index.ts @@ -5,6 +5,7 @@ export { EventType } from './constants.js'; export { BatchProcessingError, FullBatchFailureError, + ParsingError, SqsFifoMessageGroupShortCircuitError, SqsFifoShortCircuitError, UnexpectedBatchTypeError, diff --git a/packages/batch/src/parser.ts b/packages/batch/src/parser.ts new file mode 100644 index 0000000000..54a54ee2e2 --- /dev/null +++ b/packages/batch/src/parser.ts @@ -0,0 +1,158 @@ +import type { GenericLogger } from '@aws-lambda-powertools/commons/types'; +import type { StandardSchemaV1 } from '@standard-schema/spec'; +import type { ZodType } from 'zod'; +import { EventType, SchemaVendor } from './constants.js'; +import { ParsingError } from './errors.js'; +import type { + BasePartialBatchProcessorParserConfig, + EventSourceDataClassTypes, +} from './types.js'; + +/** + * Extend the schema according to the event type passed. + * + * If useTransformers is true, extend using opinionated transformers. + * Otherwise, extend without any transformers. + * + * @param options - The options for creating the extended schema + * @param options.eventType - The type of event to process (SQS, Kinesis, DynamoDB) + * @param options.schema - The StandardSchema to be used for parsing + * @param options.useTransformers - Whether to use transformers for parsing + * @param options.logger - A logger instance for logging + */ +const createExtendedSchema = async (options: { + eventType: keyof typeof EventType; + innerSchema: ZodType; + transformer?: BasePartialBatchProcessorParserConfig['transformer']; +}) => { + const { eventType, innerSchema, transformer } = options; + let schema = innerSchema; + switch (transformer) { + case 'json': { + const { JSONStringified } = await import( + '@aws-lambda-powertools/parser/helpers' + ); + schema = JSONStringified(innerSchema); + break; + } + case 'base64': { + const { Base64Encoded } = await import( + '@aws-lambda-powertools/parser/helpers' + ); + schema = Base64Encoded(innerSchema); + break; + } + case 'unmarshall': { + const { DynamoDBMarshalled } = await import( + '@aws-lambda-powertools/parser/helpers/dynamodb' + ); + schema = DynamoDBMarshalled(innerSchema); + break; + } + } + if (eventType === EventType.SQS) { + const { SqsRecordSchema } = await import( + '@aws-lambda-powertools/parser/schemas/sqs' + ); + return SqsRecordSchema.extend({ + body: schema, + }); + } + if (eventType === EventType.KinesisDataStreams) { + const { KinesisDataStreamRecord, KinesisDataStreamRecordPayload } = + await import('@aws-lambda-powertools/parser/schemas/kinesis'); + return KinesisDataStreamRecord.extend({ + kinesis: KinesisDataStreamRecordPayload.extend({ + data: schema, + }), + }); + } + + const { DynamoDBStreamRecord, DynamoDBStreamChangeRecordBase } = await import( + '@aws-lambda-powertools/parser/schemas/dynamodb' + ); + return DynamoDBStreamRecord.extend({ + dynamodb: DynamoDBStreamChangeRecordBase.extend({ + OldImage: schema.optional(), + NewImage: schema.optional(), + }), + }); +}; + +/** + * Parse the record with the passed schema and + * return the result or throw the 
error depending on parsing success + * + * @param record - The record to be parsed + * @param schema - The modified schema to parse with + * @param logger - A logger instance for logging + */ +const parseWithErrorHandling = async ( + record: EventSourceDataClassTypes, + schema: StandardSchemaV1, + logger: Pick +) => { + const { parse } = await import('@aws-lambda-powertools/parser'); + const result = parse(record, undefined, schema, true); + if (result.success) { + return result.data as EventSourceDataClassTypes; + } + const issues = result.error.cause as ReadonlyArray; + const errorMessage = issues + .map((issue) => `${issue?.path?.join('.')}: ${issue.message}`) + .join('; '); + logger.debug(errorMessage); + throw new ParsingError(errorMessage); +}; + +/** + * Parse the record according to the schema and event type passed. + * + * If the passed schema is already an extended schema, + * use the schema directly to parse the record. + * + * Only Zod Schemas are supported for schema extension. + * + * @param record - The record to be parsed + * @param eventType - The type of event to process + * @param logger - A logger instance for logging + * @param parserConfig - The parser configuration options + */ +const parser = async ( + record: EventSourceDataClassTypes, + eventType: keyof typeof EventType, + logger: Pick, + parserConfig: BasePartialBatchProcessorParserConfig +): Promise => { + const { schema, innerSchema, transformer } = parserConfig; + // If the external schema is specified, use it to parse the record + if (schema) { + return parseWithErrorHandling(record, schema, logger); + } + if (innerSchema) { + // Only proceed with schema extension if it's a Zod schema + if (innerSchema['~standard'].vendor !== SchemaVendor.Zod) { + logger.error( + 'The schema provided is not supported. Only Zod schemas are supported for extension.' + ); + throw new ParsingError('Unsupported schema type'); + } + return parseWithErrorHandling( + record, + await createExtendedSchema({ + eventType, + innerSchema, + ...(transformer ? { transformer } : {}), + }), + logger + ); + } + logger.error( + 'The schema provided is not supported. Only Zod schemas are supported for extension.' 
+ ); + throw new ParsingError( + 'Either schema or innerSchema is required for parsing' + ); +}; + +export { parser }; diff --git a/packages/batch/src/types.ts b/packages/batch/src/types.ts index d5c7dcba8c..4ea03d920d 100644 --- a/packages/batch/src/types.ts +++ b/packages/batch/src/types.ts @@ -5,6 +5,7 @@ import type { KinesisStreamRecord, SQSRecord, } from 'aws-lambda'; +import type { ZodType } from 'zod'; import type { GenericLogger } from '../../commons/lib/esm/types/GenericLogger.js'; import type { BasePartialBatchProcessor } from './BasePartialBatchProcessor.js'; import type { SqsFifoPartialProcessor } from './SqsFifoPartialProcessor.js'; @@ -101,14 +102,16 @@ type PartialItemFailureResponse = { batchItemFailures: PartialItemFailures[] }; */ type BasePartialBatchProcessorParserConfig = | { + parser?: CallableFunction; schema?: StandardSchemaV1; innerSchema?: never; transformer?: never; logger?: Pick; } | { + parser?: CallableFunction; schema?: never; - innerSchema?: StandardSchemaV1; + innerSchema?: ZodType; transformer?: 'json' | 'base64' | 'unmarshall'; logger?: Pick; }; diff --git a/packages/batch/tests/unit/BatchProcessor.test.ts b/packages/batch/tests/unit/BatchProcessor.test.ts index 4671f58faf..650354d968 100644 --- a/packages/batch/tests/unit/BatchProcessor.test.ts +++ b/packages/batch/tests/unit/BatchProcessor.test.ts @@ -1,27 +1,6 @@ -import { - Base64Encoded, - JSONStringified, -} from '@aws-lambda-powertools/parser/helpers'; -import { DynamoDBMarshalled } from '@aws-lambda-powertools/parser/helpers/dynamodb'; -import { - KinesisDataStreamRecord, - SqsRecordSchema, -} from '@aws-lambda-powertools/parser/schemas'; -import { - DynamoDBStreamChangeRecordBase, - DynamoDBStreamRecord, -} from '@aws-lambda-powertools/parser/schemas/dynamodb'; -import { KinesisDataStreamRecordPayload } from '@aws-lambda-powertools/parser/schemas/kinesis'; import context from '@aws-lambda-powertools/testing-utils/context'; -import type { - Context, - DynamoDBRecord, - KinesisStreamRecord, - SQSRecord, -} from 'aws-lambda'; -import * as v from 'valibot'; +import type { Context } from 'aws-lambda'; import { afterAll, beforeEach, describe, expect, it, vi } from 'vitest'; -import { z } from 'zod'; import { BatchProcessingError, BatchProcessor, @@ -307,426 +286,4 @@ describe('Class: AsyncBatchProcessor', () => { // Act & Assess expect(() => processor.processSync()).toThrowError(BatchProcessingError); }); - - describe('Batch processing with Parser Integration', () => { - const customSchema = z.object({ - Message: z.string(), - }); - const successPayload1 = { - Message: 'test-1', - }; - const successPayload2 = { - Message: 'test-2', - }; - const failurePayload1 = { - Message: 1, - }; - const failurePayload2 = { - Message: 2, - }; - const sqsRecordHandler = async (parsedRecord: SQSRecord) => { - return parsedRecord.body; - }; - const kinesisRecordHandler = async (parsedRecord: KinesisStreamRecord) => { - return parsedRecord.kinesis.data; - }; - const dynamodbRecordHandler = async (parsedRecord: DynamoDBRecord) => { - return parsedRecord.dynamodb?.NewImage; - }; - const cases = [ - { - description: 'passing Extended Schema', - SQSParserConfig: { - schema: SqsRecordSchema.extend({ - body: JSONStringified(customSchema), - }), - }, - KinesisParserConfig: { - schema: KinesisDataStreamRecord.extend({ - kinesis: KinesisDataStreamRecordPayload.extend({ - data: Base64Encoded(customSchema).optional(), - }), - }), - }, - DynamoDBParserConfig: { - schema: DynamoDBStreamRecord.extend({ - dynamodb: 
DynamoDBStreamChangeRecordBase.extend({ - NewImage: DynamoDBMarshalled(customSchema).optional(), - }), - }), - }, - }, - { - description: 'passing Internal Schema without transformer property set', - SQSParserConfig: { - innerSchema: JSONStringified(customSchema), - }, - KinesisParserConfig: { - innerSchema: Base64Encoded(customSchema), - }, - DynamoDBParserConfig: { - innerSchema: DynamoDBMarshalled(customSchema), - }, - }, - { - description: 'passing Internal Schema with transformer property set', - SQSParserConfig: { - innerSchema: customSchema, - transformer: 'json' as const, - }, - KinesisParserConfig: { - innerSchema: customSchema, - transformer: 'base64' as const, - }, - DynamoDBParserConfig: { - innerSchema: customSchema, - transformer: 'unmarshall' as const, - }, - }, - ]; - describe.each(cases)( - 'SQS Record Schema $description', - ({ SQSParserConfig }) => { - it('completes the processing with no failures and parses the payload before passing to the record handler', async () => { - // Prepare - const firstRecord = sqsRecordFactory(JSON.stringify(successPayload1)); - const secondRecord = sqsRecordFactory( - JSON.stringify(successPayload2) - ); - const records = [firstRecord, secondRecord]; - const processor = new BatchProcessor(EventType.SQS, SQSParserConfig); - - // Act - processor.register(records, sqsRecordHandler, options); - const processedMessages = await processor.process(); - - // Assess - expect(processedMessages).toStrictEqual([ - ['success', successPayload1, firstRecord], - ['success', successPayload2, secondRecord], - ]); - }); - - it('completes the processing with failures if some of the payload does not match the passed schema', async () => { - // Prepare - const firstRecord = sqsRecordFactory(JSON.stringify(successPayload1)); - const secondRecord = sqsRecordFactory( - JSON.stringify(failurePayload1) - ); - const records = [firstRecord, secondRecord]; - const processor = new BatchProcessor(EventType.SQS, SQSParserConfig); - - // Act - processor.register(records, sqsRecordHandler, options); - const processedMessages = await processor.process(); - - // Assess - expect(processedMessages[0]).toStrictEqual([ - 'success', - successPayload1, - firstRecord, - ]); - expect(processor.failureMessages.length).toBe(1); - expect(processor.response()).toStrictEqual({ - batchItemFailures: [{ itemIdentifier: secondRecord.messageId }], - }); - }); - - it('completes processing with all failures if all the payload does not match the passed schema', async () => { - // Prepare - const firstRecord = sqsRecordFactory(JSON.stringify(failurePayload1)); - const secondRecord = sqsRecordFactory( - JSON.stringify(failurePayload2) - ); - - const records = [firstRecord, secondRecord]; - const processor = new BatchProcessor(EventType.SQS, SQSParserConfig); - - // Act - processor.register(records, sqsRecordHandler, options); - - // Assess - await expect(processor.process()).rejects.toThrowError( - FullBatchFailureError - ); - }); - } - ); - - describe.each(cases)( - 'Kinesis Record Schema $description', - ({ KinesisParserConfig }) => { - it('completes the processing with no failures and parses the payload before passing to the record handler', async () => { - // Prepare - const firstRecord = kinesisRecordFactory( - Buffer.from(JSON.stringify(successPayload1)).toString('base64') - ); - const secondRecord = kinesisRecordFactory( - Buffer.from(JSON.stringify(successPayload2)).toString('base64') - ); - const records = [firstRecord, secondRecord]; - const processor = new BatchProcessor( - 
EventType.KinesisDataStreams, - KinesisParserConfig - ); - - // Act - processor.register(records, kinesisRecordHandler, options); - const processedMessages = await processor.process(); - - // Assess - expect(processedMessages).toStrictEqual([ - ['success', successPayload1, firstRecord], - ['success', successPayload2, secondRecord], - ]); - }); - - it('completes the processing with failures if some of the payload does not match the passed schema', async () => { - // Prepare - const firstRecord = kinesisRecordFactory( - Buffer.from(JSON.stringify(successPayload1)).toString('base64') - ); - const secondRecord = kinesisRecordFactory( - Buffer.from(JSON.stringify(failurePayload1)).toString('base64') - ); - const records = [firstRecord, secondRecord]; - const processor = new BatchProcessor( - EventType.KinesisDataStreams, - KinesisParserConfig - ); - - // Act - processor.register(records, kinesisRecordHandler, options); - const processedMessages = await processor.process(); - - // Assess - expect(processedMessages[0]).toStrictEqual([ - 'success', - successPayload1, - firstRecord, - ]); - expect(processor.failureMessages.length).toBe(1); - expect(processor.response()).toStrictEqual({ - batchItemFailures: [ - { itemIdentifier: secondRecord.kinesis.sequenceNumber }, - ], - }); - }); - - it('completes processing with all failures if all the payload does not match the passed schema', async () => { - // Prepare - const firstRecord = kinesisRecordFactory( - Buffer.from(JSON.stringify(failurePayload1)).toString('base64') - ); - const secondRecord = kinesisRecordFactory( - Buffer.from(JSON.stringify(failurePayload2)).toString('base64') - ); - - const records = [firstRecord, secondRecord]; - const processor = new BatchProcessor( - EventType.KinesisDataStreams, - KinesisParserConfig - ); - - // Act - processor.register(records, sqsRecordHandler, options); - - // Assess - await expect(processor.process()).rejects.toThrowError( - FullBatchFailureError - ); - }); - } - ); - - describe.each(cases)( - 'DynamoDB Record Schema $description', - ({ DynamoDBParserConfig }) => { - it('completes the processing with no failures and parses the payload before passing to the record handler', async () => { - // Prepare - const firstRecord = dynamodbRecordFactory(successPayload1.Message); - const secondRecord = dynamodbRecordFactory(successPayload2.Message); - const records = [firstRecord, secondRecord]; - const processor = new BatchProcessor( - EventType.DynamoDBStreams, - DynamoDBParserConfig - ); - - // Act - processor.register(records, dynamodbRecordHandler, options); - const processedMessages = await processor.process(); - - // Assess - expect(processedMessages).toStrictEqual([ - ['success', successPayload1, firstRecord], - ['success', successPayload2, secondRecord], - ]); - }); - - it('completes the processing with failures if some of the payload does not match the passed schema', async () => { - // Prepare - const firstRecord = dynamodbRecordFactory(successPayload1.Message); - //@ts-expect-error Passing an invalid payload for testing - const secondRecord = dynamodbRecordFactory(failurePayload1.Message); - const records = [firstRecord, secondRecord]; - const processor = new BatchProcessor( - EventType.DynamoDBStreams, - DynamoDBParserConfig - ); - - // Act - processor.register(records, dynamodbRecordHandler, options); - const processedMessages = await processor.process(); - - // Assess - expect(processedMessages[0]).toStrictEqual([ - 'success', - successPayload1, - firstRecord, - ]); - 
expect(processor.failureMessages.length).toBe(1); - expect(processor.response()).toStrictEqual({ - batchItemFailures: [ - { itemIdentifier: secondRecord.dynamodb?.SequenceNumber }, - ], - }); - }); - - it('completes processing with all failures if all the payload does not match the passed schema', async () => { - // Prepare - //@ts-expect-error Passing an invalid payload for testing - const firstRecord = dynamodbRecordFactory(failurePayload1.Message); - //@ts-expect-error Passing an invalid payload for testing - const secondRecord = dynamodbRecordFactory(failurePayload2.Message); - const records = [firstRecord, secondRecord]; - const processor = new BatchProcessor( - EventType.DynamoDBStreams, - DynamoDBParserConfig - ); - - // Act - processor.register(records, dynamodbRecordHandler, options); - - // Assess - await expect(processor.process()).rejects.toThrowError( - FullBatchFailureError - ); - }); - } - ); - - it('completes processing with all failures if an unsupported event type is used for parsing', async () => { - // Prepare - const firstRecord = sqsRecordFactory(JSON.stringify(successPayload1)); - const secondRecord = sqsRecordFactory(JSON.stringify(successPayload2)); - const records = [firstRecord, secondRecord]; - //@ts-expect-error - const processor = new BatchProcessor('invalid-event-type', { - innerSchema: customSchema, - transformer: 'json', - }); - - // Act - processor.register(records, sqsRecordHandler, options); - - // Assess - await expect(processor.process()).rejects.toThrowError( - FullBatchFailureError - ); - }); - - it('completes processing with failures if an unsupported schema type is used for parsing', async () => { - // Prepare - const unsupportedSchema = v.object({ - Message: v.string(), - }); - const firstRecord = sqsRecordFactory(JSON.stringify(successPayload1)); - const secondRecord = sqsRecordFactory(JSON.stringify(successPayload2)); - const records = [firstRecord, secondRecord]; - const processor = new BatchProcessor(EventType.SQS, { - innerSchema: unsupportedSchema, - transformer: 'json', - }); - - // Act - processor.register(records, sqsRecordHandler, options); - - // Assess - await expect(processor.process()).rejects.toThrowError( - FullBatchFailureError - ); - }); - - it('completes processing with failures if the schema is not passed for the parsing', async () => { - // Prepare - const firstRecord = sqsRecordFactory(JSON.stringify(successPayload1)); - const secondRecord = sqsRecordFactory(JSON.stringify(successPayload2)); - const records = [firstRecord, secondRecord]; - - const processor = new BatchProcessor(EventType.SQS, { - transformer: 'json', - }); - - // Act - processor.register(records, sqsRecordHandler, options); - - // Assess - await expect(processor.process()).rejects.toThrowError( - FullBatchFailureError - ); - }); - - it('uses a custom logger when provided', async () => { - // Prepare - const logger = { - debug: vi.fn(), - info: vi.fn(), - warn: vi.fn(), - error: vi.fn(), - }; - const unsupportedSchema = v.object({ - Message: v.string(), - }); - const firstRecord = sqsRecordFactory(JSON.stringify(successPayload1)); - const secondRecord = sqsRecordFactory(JSON.stringify(successPayload2)); - const records = [firstRecord, secondRecord]; - const processor = new BatchProcessor(EventType.SQS, { - innerSchema: unsupportedSchema, - transformer: 'json', - logger, - }); - - // Act - processor.register(records, sqsRecordHandler, options); - - // Assess - await expect(processor.process()).rejects.toThrowError( - FullBatchFailureError - ); - 
expect(logger.error).toHaveBeenCalledWith( - 'The schema provided is not supported. Only Zod schemas are supported for extension.' - ); - }); - - it('emits debug logs when AWS_LAMBDA_LOG_LEVEL is set to DEBUG', async () => { - // Prepare - const consoleSpy = vi.spyOn(console, 'debug'); - vi.stubEnv('AWS_LAMBDA_LOG_LEVEL', 'DEBUG'); - const firstRecord = sqsRecordFactory(JSON.stringify(failurePayload1)); - const records = [firstRecord]; - const processor = new BatchProcessor(EventType.SQS, { - innerSchema: customSchema, - transformer: 'json', - }); - - // Act - processor.register(records, sqsRecordHandler, options); - - // Assess - await expect(processor.process()).rejects.toThrowError( - FullBatchFailureError - ); - expect(consoleSpy).toHaveBeenCalled(); - }); - }); }); diff --git a/packages/batch/tests/unit/parsing.test.ts b/packages/batch/tests/unit/parsing.test.ts new file mode 100644 index 0000000000..231ec162a2 --- /dev/null +++ b/packages/batch/tests/unit/parsing.test.ts @@ -0,0 +1,383 @@ +import { + Base64Encoded, + JSONStringified, +} from '@aws-lambda-powertools/parser/helpers'; +import { DynamoDBMarshalled } from '@aws-lambda-powertools/parser/helpers/dynamodb'; +import { + KinesisDataStreamRecord, + SqsRecordSchema, +} from '@aws-lambda-powertools/parser/schemas'; +import { + DynamoDBStreamChangeRecordBase, + DynamoDBStreamRecord, +} from '@aws-lambda-powertools/parser/schemas/dynamodb'; +import { KinesisDataStreamRecordPayload } from '@aws-lambda-powertools/parser/schemas/kinesis'; +import type { + DynamoDBRecord, + KinesisStreamRecord, + SQSRecord, +} from 'aws-lambda'; +import { object, string } from 'valibot'; +import { beforeEach, describe, expect, it, vi } from 'vitest'; +import { z } from 'zod'; +import { + BatchProcessor, + EventType, + FullBatchFailureError, +} from '../../src/index.js'; +import { parser } from '../../src/parser.js'; +import { + dynamodbRecordFactory, + kinesisRecordFactory, + sqsRecordFactory, +} from '../helpers/factories.js'; + +describe('Batch processing with Parser Integration', () => { + beforeEach(() => { + vi.clearAllMocks(); + }); + const customSchema = z.object({ + Message: z.string(), + }); + const successPayload1 = { + Message: 'test-1', + }; + const successPayload2 = { + Message: 'test-2', + }; + const failurePayload1 = { + Message: 1, + }; + const sqsRecordHandler = async (parsedRecord: SQSRecord) => { + return parsedRecord.body; + }; + const kinesisRecordHandler = async (parsedRecord: KinesisStreamRecord) => { + return parsedRecord.kinesis.data; + }; + const dynamodbRecordHandler = async (parsedRecord: DynamoDBRecord) => { + return parsedRecord.dynamodb?.NewImage; + }; + const cases = [ + { + description: 'passing Extended Schema', + SQSParserConfig: { + parser, + schema: SqsRecordSchema.extend({ + body: JSONStringified(customSchema), + }), + }, + KinesisParserConfig: { + parser, + schema: KinesisDataStreamRecord.extend({ + kinesis: KinesisDataStreamRecordPayload.extend({ + data: Base64Encoded(customSchema).optional(), + }), + }), + }, + DynamoDBParserConfig: { + parser, + schema: DynamoDBStreamRecord.extend({ + dynamodb: DynamoDBStreamChangeRecordBase.extend({ + NewImage: DynamoDBMarshalled(customSchema).optional(), + }), + }), + }, + }, + { + description: 'passing Internal Schema without transformer property set', + SQSParserConfig: { + parser, + innerSchema: JSONStringified(customSchema), + }, + KinesisParserConfig: { + parser, + innerSchema: Base64Encoded(customSchema), + }, + DynamoDBParserConfig: { + parser, + innerSchema: 
DynamoDBMarshalled(customSchema), + }, + }, + { + description: 'passing Internal Schema with transformer property set', + SQSParserConfig: { + parser, + innerSchema: customSchema, + transformer: 'json' as const, + }, + KinesisParserConfig: { + parser, + innerSchema: customSchema, + transformer: 'base64' as const, + }, + DynamoDBParserConfig: { + parser, + innerSchema: customSchema, + transformer: 'unmarshall' as const, + }, + }, + ]; + describe.each(cases)( + 'SQS Record Schema $description', + ({ SQSParserConfig }) => { + it('completes the processing with no failures and parses the payload before passing to the record handler', async () => { + // Prepare + const firstRecord = sqsRecordFactory(JSON.stringify(successPayload1)); + const secondRecord = sqsRecordFactory(JSON.stringify(successPayload2)); + const records = [firstRecord, secondRecord]; + const processor = new BatchProcessor(EventType.SQS, SQSParserConfig); + + // Act + processor.register(records, sqsRecordHandler); + const processedMessages = await processor.process(); + + // Assess + expect(processedMessages).toStrictEqual([ + ['success', successPayload1, firstRecord], + ['success', successPayload2, secondRecord], + ]); + }); + + it('completes the processing with failures if some of the payload does not match the passed schema', async () => { + // Prepare + const firstRecord = sqsRecordFactory(JSON.stringify(successPayload1)); + const secondRecord = sqsRecordFactory(JSON.stringify(failurePayload1)); + const records = [firstRecord, secondRecord]; + const processor = new BatchProcessor(EventType.SQS, SQSParserConfig); + + // Act + processor.register(records, sqsRecordHandler); + const processedMessages = await processor.process(); + + // Assess + expect(processedMessages[0]).toStrictEqual([ + 'success', + successPayload1, + firstRecord, + ]); + expect(processor.failureMessages.length).toBe(1); + expect(processor.response()).toStrictEqual({ + batchItemFailures: [{ itemIdentifier: secondRecord.messageId }], + }); + }); + } + ); + + describe.each(cases)( + 'Kinesis Record Schema $description', + ({ KinesisParserConfig }) => { + it('completes the processing with no failures and parses the payload before passing to the record handler', async () => { + // Prepare + const firstRecord = kinesisRecordFactory( + Buffer.from(JSON.stringify(successPayload1)).toString('base64') + ); + const secondRecord = kinesisRecordFactory( + Buffer.from(JSON.stringify(successPayload2)).toString('base64') + ); + const records = [firstRecord, secondRecord]; + const processor = new BatchProcessor( + EventType.KinesisDataStreams, + KinesisParserConfig + ); + + // Act + processor.register(records, kinesisRecordHandler); + const processedMessages = await processor.process(); + + // Assess + expect(processedMessages).toStrictEqual([ + ['success', successPayload1, firstRecord], + ['success', successPayload2, secondRecord], + ]); + }); + + it('completes the processing with failures if some of the payload does not match the passed schema', async () => { + // Prepare + const firstRecord = kinesisRecordFactory( + Buffer.from(JSON.stringify(successPayload1)).toString('base64') + ); + const secondRecord = kinesisRecordFactory( + Buffer.from(JSON.stringify(failurePayload1)).toString('base64') + ); + const records = [firstRecord, secondRecord]; + const processor = new BatchProcessor( + EventType.KinesisDataStreams, + KinesisParserConfig + ); + + // Act + processor.register(records, kinesisRecordHandler); + const processedMessages = await processor.process(); + + // Assess 
+ expect(processedMessages[0]).toStrictEqual([ + 'success', + successPayload1, + firstRecord, + ]); + expect(processor.failureMessages.length).toBe(1); + expect(processor.response()).toStrictEqual({ + batchItemFailures: [ + { itemIdentifier: secondRecord.kinesis.sequenceNumber }, + ], + }); + }); + } + ); + + describe.each(cases)( + 'DynamoDB Record Schema $description', + ({ DynamoDBParserConfig }) => { + it('completes the processing with no failures and parses the payload before passing to the record handler', async () => { + // Prepare + const firstRecord = dynamodbRecordFactory(successPayload1.Message); + const secondRecord = dynamodbRecordFactory(successPayload2.Message); + const records = [firstRecord, secondRecord]; + const processor = new BatchProcessor( + EventType.DynamoDBStreams, + DynamoDBParserConfig + ); + + // Act + processor.register(records, dynamodbRecordHandler); + const processedMessages = await processor.process(); + + // Assess + expect(processedMessages).toStrictEqual([ + ['success', successPayload1, firstRecord], + ['success', successPayload2, secondRecord], + ]); + }); + + it('completes the processing with failures if some of the payload does not match the passed schema', async () => { + // Prepare + vi.stubEnv('AWS_LAMBDA_LOG_LEVEL', 'DEBUG'); + const firstRecord = dynamodbRecordFactory(successPayload1.Message); + //@ts-expect-error Passing an invalid payload for testing + const secondRecord = dynamodbRecordFactory(failurePayload1.Message); + const records = [firstRecord, secondRecord]; + const processor = new BatchProcessor(EventType.DynamoDBStreams, { + ...DynamoDBParserConfig, + logger: console, + }); + + // Act + processor.register(records, dynamodbRecordHandler); + const processedMessages = await processor.process(); + + // Assess + expect(processedMessages[0]).toStrictEqual([ + 'success', + successPayload1, + firstRecord, + ]); + expect(processor.failureMessages.length).toBe(1); + expect(processor.response()).toStrictEqual({ + batchItemFailures: [ + { itemIdentifier: secondRecord.dynamodb?.SequenceNumber }, + ], + }); + expect(console.debug).toHaveBeenCalledWith( + 'dynamodb.NewImage.Message: Invalid input: expected string, received number' + ); + }); + } + ); + + it('completes processing with all failures if an unsupported event type is used for parsing', async () => { + // Prepare + const firstRecord = sqsRecordFactory(JSON.stringify(successPayload1)); + const secondRecord = sqsRecordFactory(JSON.stringify(successPayload2)); + const records = [firstRecord, secondRecord]; + //@ts-expect-error + const processor = new BatchProcessor('invalid-event-type', { + innerSchema: customSchema, + transformer: 'json', + }); + + // Act + processor.register(records, sqsRecordHandler); + + // Assess + await expect(processor.process()).rejects.toThrowError( + FullBatchFailureError + ); + }); + + it('completes processing with failures if an unsupported schema type is used for parsing', async () => { + // Prepare + const unsupportedSchema = object({ + Message: string(), + }); + const firstRecord = sqsRecordFactory(JSON.stringify(successPayload1)); + const secondRecord = sqsRecordFactory(JSON.stringify(successPayload2)); + const records = [firstRecord, secondRecord]; + const processor = new BatchProcessor(EventType.SQS, { + parser, + // @ts-expect-error - we are explicitly testing a wrong schema vendor + innerSchema: unsupportedSchema, + transformer: 'json', + }); + + // Act + processor.register(records, sqsRecordHandler); + + // Assess + await 
expect(processor.process()).rejects.toThrowError( + FullBatchFailureError + ); + }); + + it('completes processing with failures if the schema is not passed for the parsing', async () => { + // Prepare + const firstRecord = sqsRecordFactory(JSON.stringify(successPayload1)); + const secondRecord = sqsRecordFactory(JSON.stringify(successPayload2)); + const records = [firstRecord, secondRecord]; + + const processor = new BatchProcessor(EventType.SQS, { + parser, + transformer: 'json', + }); + + // Act + processor.register(records, sqsRecordHandler); + + // Assess + await expect(processor.process()).rejects.toThrowError( + FullBatchFailureError + ); + }); + + it('uses a custom logger when provided', async () => { + // Prepare + const logger = { + debug: vi.fn(), + warn: vi.fn(), + error: vi.fn(), + }; + const unsupportedSchema = object({ + Message: string(), + }); + const firstRecord = sqsRecordFactory(JSON.stringify(successPayload1)); + const secondRecord = sqsRecordFactory(JSON.stringify(successPayload2)); + const records = [firstRecord, secondRecord]; + const processor = new BatchProcessor(EventType.SQS, { + parser, + // @ts-expect-error - we are explicitly testing a wrong schema vendor + innerSchema: unsupportedSchema, + transformer: 'json', + logger, + }); + + // Act + processor.register(records, sqsRecordHandler); + + // Assess + await expect(processor.process()).rejects.toThrowError( + FullBatchFailureError + ); + expect(logger.error).toHaveBeenCalledWith( + 'The schema provided is not supported. Only Zod schemas are supported for extension.' + ); + }); +}); diff --git a/packages/batch/vitest.config.ts b/packages/batch/vitest.config.ts index d5aa737c68..9f1196ef1f 100644 --- a/packages/batch/vitest.config.ts +++ b/packages/batch/vitest.config.ts @@ -3,5 +3,6 @@ import { defineProject } from 'vitest/config'; export default defineProject({ test: { environment: 'node', + setupFiles: ['../testing/src/setupEnv.ts'], }, }); From 31300765a25e083ae6fc7e57feebf84971e9c8c1 Mon Sep 17 00:00:00 2001 From: Swopnil Dangol Date: Thu, 11 Sep 2025 15:49:50 +0100 Subject: [PATCH 7/8] Updated error message --- packages/batch/src/parser.ts | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/packages/batch/src/parser.ts b/packages/batch/src/parser.ts index 54a54ee2e2..3037c6adc6 100644 --- a/packages/batch/src/parser.ts +++ b/packages/batch/src/parser.ts @@ -147,9 +147,7 @@ const parser = async ( logger ); } - logger.error( - 'The schema provided is not supported. Only Zod schemas are supported for extension.' - ); + logger.error('There was no schema or innerSchema provided'); throw new ParsingError( 'Either schema or innerSchema is required for parsing' ); From b305854f8d0f728425245f014495becdcdb15f19 Mon Sep 17 00:00:00 2001 From: Swopnil Dangol Date: Thu, 11 Sep 2025 16:06:59 +0100 Subject: [PATCH 8/8] Removed optional chaining from parsing issue --- packages/batch/src/parser.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/batch/src/parser.ts b/packages/batch/src/parser.ts index 3037c6adc6..fe1cb795a6 100644 --- a/packages/batch/src/parser.ts +++ b/packages/batch/src/parser.ts @@ -99,7 +99,7 @@ const parseWithErrorHandling = async ( } const issues = result.error.cause as ReadonlyArray; const errorMessage = issues - .map((issue) => `${issue?.path?.join('.')}: ${issue.message}`) + .map((issue) => `${issue.path?.join('.')}: ${issue.message}`) .join('; '); logger.debug(errorMessage); throw new ParsingError(errorMessage);
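
Taken together, this series lets a BatchProcessor validate and transform each record before it reaches the record handler. As a rough usage sketch only (assuming the `parser` helper added in packages/batch/src/parser.ts ends up exported from the package — the exact public import path is not shown in this series — and using the existing `processPartialResponse` helper), an SQS handler could look like:

    import { BatchProcessor, EventType, processPartialResponse } from '@aws-lambda-powertools/batch';
    // Assumed import path; the series only shows the helper living in packages/batch/src/parser.ts
    import { parser } from '@aws-lambda-powertools/batch/parser';
    import type { Context, SQSEvent, SQSRecord } from 'aws-lambda';
    import { z } from 'zod';

    // Inner payload schema: only the shape of the JSON-stringified message body
    const orderSchema = z.object({ Message: z.string() });

    // innerSchema + transformer: 'json' asks the processor to wrap the schema with
    // JSONStringified() and extend SqsRecordSchema before validating each record
    const processor = new BatchProcessor(EventType.SQS, {
      parser,
      innerSchema: orderSchema,
      transformer: 'json',
    });

    // By the time the handler runs, record.body has been parsed and validated;
    // records that fail validation are reported as partial batch item failures
    const recordHandler = async (record: SQSRecord) => {
      return record.body;
    };

    export const handler = async (event: SQSEvent, context: Context) =>
      processPartialResponse(event, recordHandler, processor, { context });

With innerSchema plus transformer the processor builds the full event-source schema internally; passing a pre-extended schema instead (as in the "passing Extended Schema" test case) keeps full control over the record envelope.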