diff --git a/packages/batch/package.json b/packages/batch/package.json index 42396f9918..e280a4db4c 100644 --- a/packages/batch/package.json +++ b/packages/batch/package.json @@ -38,6 +38,10 @@ "default": "./lib/esm/index.js" } }, + "./parser": { + "import": "./lib/esm/parser.js", + "require": "./lib/cjs/parser.js" + }, "./types": { "import": "./lib/esm/types.js", "require": "./lib/cjs/types.js" @@ -45,6 +49,10 @@ }, "typesVersions": { "*": { + "parser": [ + "lib/cjs/parser.d.ts", + "lib/esm/parser.d.ts" + ], "types": [ "lib/cjs/types.d.ts", "lib/esm/types.d.ts" @@ -76,4 +84,4 @@ "@aws-lambda-powertools/testing-utils": "file:../testing", "@aws-lambda-powertools/parser": "2.25.2" } -} +} \ No newline at end of file diff --git a/packages/batch/src/BasePartialBatchProcessor.ts b/packages/batch/src/BasePartialBatchProcessor.ts index 529a3870d9..17537c47d6 100644 --- a/packages/batch/src/BasePartialBatchProcessor.ts +++ b/packages/batch/src/BasePartialBatchProcessor.ts @@ -1,9 +1,10 @@ -import type { StandardSchemaV1 } from '@standard-schema/spec'; +import { getStringFromEnv } from '@aws-lambda-powertools/commons/utils/env'; import type { DynamoDBRecord, KinesisStreamRecord, SQSRecord, } from 'aws-lambda'; +import type { GenericLogger } from '../../commons/lib/esm/types/GenericLogger.js'; import { BasePartialProcessor } from './BasePartialProcessor.js'; import { DATA_CLASS_MAPPING, @@ -12,7 +13,7 @@ import { } from './constants.js'; import { FullBatchFailureError } from './errors.js'; import type { - BasePartialBatchProcessorConfig, + BasePartialBatchProcessorParserConfig, EventSourceDataClassTypes, PartialItemFailureResponse, PartialItemFailures, @@ -45,9 +46,16 @@ abstract class BasePartialBatchProcessor extends BasePartialProcessor { public eventType: keyof typeof EventType; /** - * The schema of the body of the event record for parsing + * A logger instance to be used for logging debug, warning, and error messages. 
+ * + * When no logger is provided, we'll only log warnings and errors using the global `console` object. + */ + protected readonly logger: Pick; + + /** + * The configuration options for the parser integration */ - protected schema?: StandardSchemaV1; + protected parserConfig?: BasePartialBatchProcessorParserConfig; /** * Initializes base batch processing class @@ -56,7 +64,7 @@ abstract class BasePartialBatchProcessor extends BasePartialProcessor { */ public constructor( eventType: keyof typeof EventType, - config?: BasePartialBatchProcessorConfig + parserConfig?: BasePartialBatchProcessorParserConfig ) { super(); this.eventType = eventType; @@ -66,9 +74,16 @@ abstract class BasePartialBatchProcessor extends BasePartialProcessor { [EventType.KinesisDataStreams]: () => this.collectKinesisFailures(), [EventType.DynamoDBStreams]: () => this.collectDynamoDBFailures(), }; - if (config) { - this.schema = config.schema; - } + this.parserConfig = parserConfig; + const alcLogLevel = getStringFromEnv({ + key: 'AWS_LAMBDA_LOG_LEVEL', + defaultValue: '', + }); + this.logger = parserConfig?.logger ?? { + debug: alcLogLevel === 'DEBUG' ? 
console.debug : () => undefined, + error: console.error, + warn: console.warn, + }; } /** diff --git a/packages/batch/src/BatchProcessor.ts b/packages/batch/src/BatchProcessor.ts index f6221e9e64..1a8ec918b3 100644 --- a/packages/batch/src/BatchProcessor.ts +++ b/packages/batch/src/BatchProcessor.ts @@ -1,14 +1,6 @@ -import type { StandardSchemaV1 } from '@standard-schema/spec'; -import type { StreamRecord } from 'aws-lambda'; import { BasePartialBatchProcessor } from './BasePartialBatchProcessor.js'; -import { EventType, SchemaVendor } from './constants.js'; import { BatchProcessingError } from './errors.js'; -import type { - BaseRecord, - EventSourceDataClassTypes, - FailureResponse, - SuccessResponse, -} from './types.js'; +import type { BaseRecord, FailureResponse, SuccessResponse } from './types.js'; /** * Process records in a batch asynchronously and handle partial failure cases. @@ -108,13 +100,16 @@ class BatchProcessor extends BasePartialBatchProcessor { record: BaseRecord ): Promise { try { - const recordToProcess = - this.schema == null - ? record - : await this.#parseRecord(record, this.eventType, this.schema); + const recordToProcess = this.parserConfig?.parser + ? await this.parserConfig.parser( + record, + this.eventType, + this.logger, + this.parserConfig + ) + : record; const data = this.toBatchType(recordToProcess, this.eventType); const result = await this.handler(data, this.options?.context); - return this.successHandler(record, result); } catch (error) { return this.failureHandler(record, error as Error); @@ -133,163 +128,6 @@ class BatchProcessor extends BasePartialBatchProcessor { 'Not implemented. Use asyncProcess() instead.' ); } - - /** - * Extend the schema according to the event type passed. - * - * If useTransformers is true, extend using opinionated transformers. - * Otherwise, extend without any transformers. 
- * - * @param eventType - The type of event to process (SQS, Kinesis, DynamoDB) - * @param schema - The StandardSchema to be used for parsing - * @param useTransformers - Whether to use transformers for parsing - */ - async #createExtendedSchema(options: { - eventType: keyof typeof EventType; - schema: StandardSchemaV1; - useTransformers: boolean; - }) { - const { eventType, schema, useTransformers } = options; - switch (eventType) { - case EventType.SQS: { - if (useTransformers) { - const [{ JSONStringified }, { SqsRecordSchema }] = await Promise.all([ - import('@aws-lambda-powertools/parser/helpers'), - import('@aws-lambda-powertools/parser/schemas/sqs'), - ]); - return SqsRecordSchema.extend({ - body: JSONStringified(schema as any), - }); - } - const { SqsRecordSchema } = await import( - '@aws-lambda-powertools/parser/schemas/sqs' - ); - return SqsRecordSchema.extend({ body: schema }); - } - - case EventType.KinesisDataStreams: { - if (useTransformers) { - const [ - { Base64Encoded }, - { KinesisDataStreamRecord, KinesisDataStreamRecordPayload }, - ] = await Promise.all([ - import('@aws-lambda-powertools/parser/helpers'), - import('@aws-lambda-powertools/parser/schemas/kinesis'), - ]); - return KinesisDataStreamRecord.extend({ - kinesis: KinesisDataStreamRecordPayload.extend({ - data: Base64Encoded(schema as any), - }), - }); - } - const { KinesisDataStreamRecord, KinesisDataStreamRecordPayload } = - await import('@aws-lambda-powertools/parser/schemas/kinesis'); - return KinesisDataStreamRecord.extend({ - kinesis: KinesisDataStreamRecordPayload.extend({ data: schema }), - }); - } - - case EventType.DynamoDBStreams: { - if (useTransformers) { - const [ - { DynamoDBMarshalled }, - { DynamoDBStreamRecord, DynamoDBStreamChangeRecordBase }, - ] = await Promise.all([ - import('@aws-lambda-powertools/parser/helpers/dynamodb'), - import('@aws-lambda-powertools/parser/schemas/dynamodb'), - ]); - return DynamoDBStreamRecord.extend({ - dynamodb: 
DynamoDBStreamChangeRecordBase.extend({ - OldImage: DynamoDBMarshalled( - schema as any - ).optional(), - NewImage: DynamoDBMarshalled( - schema as any - ).optional(), - }), - }); - } - const { DynamoDBStreamRecord, DynamoDBStreamChangeRecordBase } = - await import('@aws-lambda-powertools/parser/schemas/dynamodb'); - return DynamoDBStreamRecord.extend({ - dynamodb: DynamoDBStreamChangeRecordBase.extend({ - OldImage: (schema as any).optional(), - NewImage: (schema as any).optional(), - }), - }); - } - - default: { - console.warn( - `The event type provided is not supported. Supported events: ${Object.values(EventType).join(',')}` - ); - throw new Error('Unsupported event type'); - } - } - } - - /** - * Parse the record according to the schema and event type passed. - * - * If the passed schema is already an extended schema, - * use the schema directly to parse the record. - * - * Only Zod Schemas are supported for schema extension. - * - * @param record - The record to be parsed - * @param eventType - The type of event to process - * @param schema - The StandardSchema to be used for parsing - */ - async #parseRecord( - record: EventSourceDataClassTypes, - eventType: keyof typeof EventType, - schema: StandardSchemaV1 - ): Promise { - const { parse } = await import('@aws-lambda-powertools/parser'); - // Try parsing with the original schema first - const extendedSchemaParsing = parse(record, undefined, schema, true); - if (extendedSchemaParsing.success) { - return extendedSchemaParsing.data as EventSourceDataClassTypes; - } - // Only proceed with schema extension if it's a Zod schema - if (schema['~standard'].vendor !== SchemaVendor.Zod) { - console.warn( - 'The schema provided is not supported. Only Zod schemas are supported for extension.' 
- ); - throw new Error('Unsupported schema type'); - } - // Handle schema extension based on event type - // Try without transformers first, then with transformers - const schemaWithoutTransformers = await this.#createExtendedSchema({ - eventType, - schema, - useTransformers: false, - }); - const schemaWithoutTransformersParsing = parse( - record, - undefined, - schemaWithoutTransformers, - true - ); - if (schemaWithoutTransformersParsing.success) { - return schemaWithoutTransformersParsing.data as EventSourceDataClassTypes; - } - const schemaWithTransformers = await this.#createExtendedSchema({ - eventType, - schema, - useTransformers: true, - }); - const schemaWithTransformersParsing = parse( - record, - undefined, - schemaWithTransformers, - true - ); - if (schemaWithTransformersParsing.success) { - return schemaWithTransformersParsing.data as EventSourceDataClassTypes; - } - throw new Error('Failed to parse record'); - } } export { BatchProcessor }; diff --git a/packages/batch/src/errors.ts b/packages/batch/src/errors.ts index 9467e5e0d2..a95a88cda6 100644 --- a/packages/batch/src/errors.ts +++ b/packages/batch/src/errors.ts @@ -63,10 +63,21 @@ class UnexpectedBatchTypeError extends BatchProcessingError { } } +/** + * Error thrown by the Batch Processing utility when a record fails to be parsed. 
+ */ +class ParsingError extends BatchProcessingError { + public constructor(message: string) { + super(message); + this.name = 'ParsingError'; + } +} + export { BatchProcessingError, FullBatchFailureError, SqsFifoShortCircuitError, SqsFifoMessageGroupShortCircuitError, UnexpectedBatchTypeError, + ParsingError, }; diff --git a/packages/batch/src/index.ts b/packages/batch/src/index.ts index 510c57123d..6cf3370b00 100644 --- a/packages/batch/src/index.ts +++ b/packages/batch/src/index.ts @@ -5,6 +5,7 @@ export { EventType } from './constants.js'; export { BatchProcessingError, FullBatchFailureError, + ParsingError, SqsFifoMessageGroupShortCircuitError, SqsFifoShortCircuitError, UnexpectedBatchTypeError, diff --git a/packages/batch/src/parser.ts b/packages/batch/src/parser.ts new file mode 100644 index 0000000000..fe1cb795a6 --- /dev/null +++ b/packages/batch/src/parser.ts @@ -0,0 +1,156 @@ +import type { GenericLogger } from '@aws-lambda-powertools/commons/types'; +import type { StandardSchemaV1 } from '@standard-schema/spec'; +import type { ZodType } from 'zod'; +import { EventType, SchemaVendor } from './constants.js'; +import { ParsingError } from './errors.js'; +import type { + BasePartialBatchProcessorParserConfig, + EventSourceDataClassTypes, +} from './types.js'; + +/** + * Extend the schema according to the event type passed. + * + * If a transformer is specified, the inner schema is first wrapped with the matching parser helper. + * Otherwise, the inner schema is used as-is.
+ * + * @param options - The options for creating the extended schema + * @param options.eventType - The type of event to process (SQS, Kinesis, DynamoDB) + * @param options.innerSchema - The Zod schema of the inner payload used to extend the event record schema + * @param options.transformer - The optional transformer (`json`, `base64`, or `unmarshall`) to wrap the inner schema with + * @returns The record schema for the given event type, extended with the (optionally transformed) inner schema + */ +const createExtendedSchema = async (options: { + eventType: keyof typeof EventType; + innerSchema: ZodType; + transformer?: BasePartialBatchProcessorParserConfig['transformer']; +}) => { + const { eventType, innerSchema, transformer } = options; + let schema = innerSchema; + switch (transformer) { + case 'json': { + const { JSONStringified } = await import( + '@aws-lambda-powertools/parser/helpers' + ); + schema = JSONStringified(innerSchema); + break; + } + case 'base64': { + const { Base64Encoded } = await import( + '@aws-lambda-powertools/parser/helpers' + ); + schema = Base64Encoded(innerSchema); + break; + } + case 'unmarshall': { + const { DynamoDBMarshalled } = await import( + '@aws-lambda-powertools/parser/helpers/dynamodb' + ); + schema = DynamoDBMarshalled(innerSchema); + break; + } + } + if (eventType === EventType.SQS) { + const { SqsRecordSchema } = await import( + '@aws-lambda-powertools/parser/schemas/sqs' + ); + return SqsRecordSchema.extend({ + body: schema, + }); + } + if (eventType === EventType.KinesisDataStreams) { + const { KinesisDataStreamRecord, KinesisDataStreamRecordPayload } = + await import('@aws-lambda-powertools/parser/schemas/kinesis'); + return KinesisDataStreamRecord.extend({ + kinesis: KinesisDataStreamRecordPayload.extend({ + data: schema, + }), + }); + } + + const { DynamoDBStreamRecord, DynamoDBStreamChangeRecordBase } = await import( + '@aws-lambda-powertools/parser/schemas/dynamodb' + ); + return DynamoDBStreamRecord.extend({ + dynamodb: DynamoDBStreamChangeRecordBase.extend({ + OldImage: schema.optional(), + NewImage: schema.optional(), + }), + }); +}; + +/** + *
Parse the record with the passed schema and + * return the result or throw the error depending on parsing success + * + * @param record - The record to be parsed + * @param schema - The modified schema to parse with + * @param logger - A logger instance for logging + */ +const parseWithErrorHandling = async ( + record: EventSourceDataClassTypes, + schema: StandardSchemaV1, + logger: Pick +) => { + const { parse } = await import('@aws-lambda-powertools/parser'); + const result = parse(record, undefined, schema, true); + if (result.success) { + return result.data as EventSourceDataClassTypes; + } + const issues = result.error.cause as ReadonlyArray; + const errorMessage = issues + .map((issue) => `${issue.path?.join('.')}: ${issue.message}`) + .join('; '); + logger.debug(errorMessage); + throw new ParsingError(errorMessage); +}; + +/** + * Parse the record according to the schema and event type passed. + * + * If the passed schema is already an extended schema, + * use the schema directly to parse the record. + * + * Only Zod Schemas are supported for schema extension. + * + * @param record - The record to be parsed + * @param eventType - The type of event to process + * @param logger - A logger instance for logging + * @param parserConfig - The parser configuration options + */ +const parser = async ( + record: EventSourceDataClassTypes, + eventType: keyof typeof EventType, + logger: Pick, + parserConfig: BasePartialBatchProcessorParserConfig +): Promise => { + const { schema, innerSchema, transformer } = parserConfig; + // If the external schema is specified, use it to parse the record + if (schema) { + return parseWithErrorHandling(record, schema, logger); + } + if (innerSchema) { + // Only proceed with schema extension if it's a Zod schema + if (innerSchema['~standard'].vendor !== SchemaVendor.Zod) { + logger.error( + 'The schema provided is not supported. Only Zod schemas are supported for extension.' 
+ ); + throw new ParsingError('Unsupported schema type'); + } + return parseWithErrorHandling( + record, + await createExtendedSchema({ + eventType, + innerSchema, + ...(transformer ? { transformer } : {}), + }), + logger + ); + } + logger.error('There was no schema or innerSchema provided'); + throw new ParsingError( + 'Either schema or innerSchema is required for parsing' + ); +}; + +export { parser }; diff --git a/packages/batch/src/types.ts b/packages/batch/src/types.ts index b5781c44c8..4ea03d920d 100644 --- a/packages/batch/src/types.ts +++ b/packages/batch/src/types.ts @@ -5,7 +5,8 @@ import type { KinesisStreamRecord, SQSRecord, } from 'aws-lambda'; - +import type { ZodType } from 'zod'; +import type { GenericLogger } from '../../commons/lib/esm/types/GenericLogger.js'; import type { BasePartialBatchProcessor } from './BasePartialBatchProcessor.js'; import type { SqsFifoPartialProcessor } from './SqsFifoPartialProcessor.js'; import type { SqsFifoPartialProcessorAsync } from './SqsFifoPartialProcessorAsync.js'; @@ -92,19 +93,28 @@ type PartialItemFailures = { itemIdentifier: string }; type PartialItemFailureResponse = { batchItemFailures: PartialItemFailures[] }; /** - * Type representing the configuration options passed to the BasePartialBatchProcessor class. + * Type representing the parser configuration options passed to the BasePartialBatchProcessor class. * - * @property schema - The schema to be used for parsing + * @property schema - The full event schema to be used for parsing + * @property innerSchema - The inner payload schema + * @property transformer - The transformer to be used for parsing the payload + * @property logger - The logger to be used for logging debug and warning messages. */ -type BasePartialBatchProcessorConfig = { - /** - * The schema be either of the following: - * 1. An internal schema of the payload of the supported event types. - * 2. An internal schema along with helper transformer functions. - * 3. 
An extended schema of the supported event type. - */ - schema: StandardSchemaV1; -}; +type BasePartialBatchProcessorParserConfig = + | { + parser?: CallableFunction; + schema?: StandardSchemaV1; + innerSchema?: never; + transformer?: never; + logger?: Pick; + } + | { + parser?: CallableFunction; + schema?: never; + innerSchema?: ZodType; + transformer?: 'json' | 'base64' | 'unmarshall'; + logger?: Pick; + }; export type { BatchProcessingOptions, @@ -114,5 +124,5 @@ export type { FailureResponse, PartialItemFailures, PartialItemFailureResponse, - BasePartialBatchProcessorConfig, + BasePartialBatchProcessorParserConfig, }; diff --git a/packages/batch/tests/unit/BatchProcessor.test.ts b/packages/batch/tests/unit/BatchProcessor.test.ts index 020038b3fc..650354d968 100644 --- a/packages/batch/tests/unit/BatchProcessor.test.ts +++ b/packages/batch/tests/unit/BatchProcessor.test.ts @@ -1,27 +1,6 @@ -import { - Base64Encoded, - JSONStringified, -} from '@aws-lambda-powertools/parser/helpers'; -import { DynamoDBMarshalled } from '@aws-lambda-powertools/parser/helpers/dynamodb'; -import { - KinesisDataStreamRecord, - SqsRecordSchema, -} from '@aws-lambda-powertools/parser/schemas'; -import { - DynamoDBStreamChangeRecordBase, - DynamoDBStreamRecord, -} from '@aws-lambda-powertools/parser/schemas/dynamodb'; -import { KinesisDataStreamRecordPayload } from '@aws-lambda-powertools/parser/schemas/kinesis'; import context from '@aws-lambda-powertools/testing-utils/context'; -import type { - Context, - DynamoDBRecord, - KinesisStreamRecord, - SQSRecord, -} from 'aws-lambda'; -import * as v from 'valibot'; +import type { Context } from 'aws-lambda'; import { afterAll, beforeEach, describe, expect, it, vi } from 'vitest'; -import { z } from 'zod'; import { BatchProcessingError, BatchProcessor, @@ -307,340 +286,4 @@ describe('Class: AsyncBatchProcessor', () => { // Act & Assess expect(() => processor.processSync()).toThrowError(BatchProcessingError); }); - - describe('Batch processing 
with Parser Integration', () => { - const customSchema = z.object({ - Message: z.string(), - }); - const successPayload1 = { - Message: 'test-1', - }; - const successPayload2 = { - Message: 'test-2', - }; - const failurePayload1 = { - Message: 1, - }; - const failurePayload2 = { - Message: 2, - }; - const sqsRecordHandler = async (parsedRecord: SQSRecord) => { - return parsedRecord.body; - }; - const kinesisRecordHandler = async (parsedRecord: KinesisStreamRecord) => { - return parsedRecord.kinesis.data; - }; - const dynamodbRecordHandler = async (parsedRecord: DynamoDBRecord) => { - return parsedRecord.dynamodb?.NewImage; - }; - const cases = [ - { - description: 'passing Extended Schema', - SQS: { - schema: SqsRecordSchema.extend({ - body: JSONStringified(customSchema), - }), - }, - Kinesis: { - schema: KinesisDataStreamRecord.extend({ - kinesis: KinesisDataStreamRecordPayload.extend({ - data: Base64Encoded(customSchema).optional(), - }), - }), - }, - DynamoDB: { - schema: DynamoDBStreamRecord.extend({ - dynamodb: DynamoDBStreamChangeRecordBase.extend({ - NewImage: DynamoDBMarshalled(customSchema).optional(), - }), - }), - }, - }, - { - description: 'passing Internal Schema without transformers', - SQS: { - schema: customSchema, - }, - Kinesis: { - schema: customSchema, - }, - DynamoDB: { - schema: customSchema, - }, - }, - { - description: 'passing Internal Schema with transformers', - SQS: { - schema: JSONStringified(customSchema), - }, - Kinesis: { - schema: Base64Encoded(customSchema), - }, - DynamoDB: { - schema: DynamoDBMarshalled(customSchema), - }, - }, - ]; - describe.each(cases)('SQS Record Schema $description', ({ SQS }) => { - it('completes the processing with no failures and parses the payload before passing to the record handler', async () => { - // Prepare - const firstRecord = sqsRecordFactory(JSON.stringify(successPayload1)); - const secondRecord = sqsRecordFactory(JSON.stringify(successPayload2)); - const records = [firstRecord, secondRecord]; - 
const processor = new BatchProcessor(EventType.SQS, { - schema: SQS.schema, - }); - - // Act - processor.register(records, sqsRecordHandler, options); - const processedMessages = await processor.process(); - - // Assess - expect(processedMessages).toStrictEqual([ - ['success', successPayload1, firstRecord], - ['success', successPayload2, secondRecord], - ]); - }); - - it('completes the processing with failures if some of the payload does not match the passed schema', async () => { - // Prepare - const firstRecord = sqsRecordFactory(JSON.stringify(successPayload1)); - const secondRecord = sqsRecordFactory(JSON.stringify(failurePayload1)); - const records = [firstRecord, secondRecord]; - const processor = new BatchProcessor(EventType.SQS, { - schema: SQS.schema, - }); - - // Act - processor.register(records, sqsRecordHandler, options); - const processedMessages = await processor.process(); - - // Assess - expect(processedMessages[0]).toStrictEqual([ - 'success', - successPayload1, - firstRecord, - ]); - expect(processor.failureMessages.length).toBe(1); - expect(processor.response()).toStrictEqual({ - batchItemFailures: [{ itemIdentifier: secondRecord.messageId }], - }); - }); - - it('completes processing with all failures if all the payload does not match the passed schema', async () => { - // Prepare - const firstRecord = sqsRecordFactory(JSON.stringify(failurePayload1)); - const secondRecord = sqsRecordFactory(JSON.stringify(failurePayload2)); - - const records = [firstRecord, secondRecord]; - const processor = new BatchProcessor(EventType.SQS, { - schema: SQS.schema, - }); - - // Act - processor.register(records, sqsRecordHandler, options); - - // Assess - await expect(processor.process()).rejects.toThrowError( - FullBatchFailureError - ); - }); - }); - - describe.each(cases)( - 'Kinesis Record Schema $description', - ({ Kinesis }) => { - it('completes the processing with no failures and parses the payload before passing to the record handler', async () => { - // 
Prepare - const firstRecord = kinesisRecordFactory( - Buffer.from(JSON.stringify(successPayload1)).toString('base64') - ); - const secondRecord = kinesisRecordFactory( - Buffer.from(JSON.stringify(successPayload2)).toString('base64') - ); - const records = [firstRecord, secondRecord]; - const processor = new BatchProcessor(EventType.KinesisDataStreams, { - schema: Kinesis.schema, - }); - - // Act - processor.register(records, kinesisRecordHandler, options); - const processedMessages = await processor.process(); - - // Assess - expect(processedMessages).toStrictEqual([ - ['success', successPayload1, firstRecord], - ['success', successPayload2, secondRecord], - ]); - }); - - it('completes the processing with failures if some of the payload does not match the passed schema', async () => { - // Prepare - const firstRecord = kinesisRecordFactory( - Buffer.from(JSON.stringify(successPayload1)).toString('base64') - ); - const secondRecord = kinesisRecordFactory( - Buffer.from(JSON.stringify(failurePayload1)).toString('base64') - ); - const records = [firstRecord, secondRecord]; - const processor = new BatchProcessor(EventType.KinesisDataStreams, { - schema: Kinesis.schema, - }); - - // Act - processor.register(records, kinesisRecordHandler, options); - const processedMessages = await processor.process(); - - // Assess - expect(processedMessages[0]).toStrictEqual([ - 'success', - successPayload1, - firstRecord, - ]); - expect(processor.failureMessages.length).toBe(1); - expect(processor.response()).toStrictEqual({ - batchItemFailures: [ - { itemIdentifier: secondRecord.kinesis.sequenceNumber }, - ], - }); - }); - - it('completes processing with all failures if all the payload does not match the passed schema', async () => { - // Prepare - const firstRecord = kinesisRecordFactory( - Buffer.from(JSON.stringify(failurePayload1)).toString('base64') - ); - const secondRecord = kinesisRecordFactory( - Buffer.from(JSON.stringify(failurePayload2)).toString('base64') - ); - - const 
records = [firstRecord, secondRecord]; - const processor = new BatchProcessor(EventType.KinesisDataStreams, { - schema: Kinesis.schema, - }); - - // Act - processor.register(records, sqsRecordHandler, options); - - // Assess - await expect(processor.process()).rejects.toThrowError( - FullBatchFailureError - ); - }); - } - ); - - describe.each(cases)( - 'DynamoDB Record Schema $description', - ({ DynamoDB }) => { - it('completes the processing with no failures and parses the payload before passing to the record handler', async () => { - // Prepare - const firstRecord = dynamodbRecordFactory(successPayload1.Message); - const secondRecord = dynamodbRecordFactory(successPayload2.Message); - const records = [firstRecord, secondRecord]; - const processor = new BatchProcessor(EventType.DynamoDBStreams, { - schema: DynamoDB.schema, - }); - - // Act - processor.register(records, dynamodbRecordHandler, options); - const processedMessages = await processor.process(); - - // Assess - expect(processedMessages).toStrictEqual([ - ['success', successPayload1, firstRecord], - ['success', successPayload2, secondRecord], - ]); - }); - - it('completes the processing with failures if some of the payload does not match the passed schema', async () => { - // Prepare - const firstRecord = dynamodbRecordFactory(successPayload1.Message); - //@ts-expect-error Passing an invalid payload for testing - const secondRecord = dynamodbRecordFactory(failurePayload1.Message); - const records = [firstRecord, secondRecord]; - const processor = new BatchProcessor(EventType.DynamoDBStreams, { - schema: DynamoDB.schema, - }); - - // Act - processor.register(records, dynamodbRecordHandler, options); - const processedMessages = await processor.process(); - - // Assess - expect(processedMessages[0]).toStrictEqual([ - 'success', - successPayload1, - firstRecord, - ]); - expect(processor.failureMessages.length).toBe(1); - expect(processor.response()).toStrictEqual({ - batchItemFailures: [ - { itemIdentifier: 
secondRecord.dynamodb?.SequenceNumber }, - ], - }); - }); - - it('completes processing with all failures if all the payload does not match the passed schema', async () => { - // Prepare - //@ts-expect-error Passing an invalid payload for testing - const firstRecord = dynamodbRecordFactory(failurePayload1.Message); - //@ts-expect-error Passing an invalid payload for testing - const secondRecord = dynamodbRecordFactory(failurePayload2.Message); - const records = [firstRecord, secondRecord]; - const processor = new BatchProcessor(EventType.DynamoDBStreams, { - schema: DynamoDB.schema, - }); - - // Act - processor.register(records, dynamodbRecordHandler, options); - - // Assess - await expect(processor.process()).rejects.toThrowError( - FullBatchFailureError - ); - }); - } - ); - - it('completes processing with all failures if an unsupported event type is used for parsing', async () => { - // Prepare - const firstRecord = sqsRecordFactory(JSON.stringify(successPayload1)); - const secondRecord = sqsRecordFactory(JSON.stringify(successPayload2)); - const records = [firstRecord, secondRecord]; - //@ts-expect-error - const processor = new BatchProcessor('invalid-event-type', { - schema: customSchema, - }); - - // Act - processor.register(records, sqsRecordHandler, options); - - // Assess - await expect(processor.process()).rejects.toThrowError( - FullBatchFailureError - ); - }); - - it('completes processing with failures if an unsupported schema type is used for parsing', async () => { - // Prepare - const unsupportedSchema = v.object({ - Message: v.string(), - }); - const firstRecord = sqsRecordFactory(JSON.stringify(successPayload1)); - const secondRecord = sqsRecordFactory(JSON.stringify(successPayload2)); - const records = [firstRecord, secondRecord]; - const processor = new BatchProcessor(EventType.SQS, { - schema: unsupportedSchema, - }); - - // Act - processor.register(records, sqsRecordHandler, options); - - // Assess - await 
expect(processor.process()).rejects.toThrowError( - FullBatchFailureError - ); - }); - }); }); diff --git a/packages/batch/tests/unit/parsing.test.ts b/packages/batch/tests/unit/parsing.test.ts new file mode 100644 index 0000000000..231ec162a2 --- /dev/null +++ b/packages/batch/tests/unit/parsing.test.ts @@ -0,0 +1,383 @@ +import { + Base64Encoded, + JSONStringified, +} from '@aws-lambda-powertools/parser/helpers'; +import { DynamoDBMarshalled } from '@aws-lambda-powertools/parser/helpers/dynamodb'; +import { + KinesisDataStreamRecord, + SqsRecordSchema, +} from '@aws-lambda-powertools/parser/schemas'; +import { + DynamoDBStreamChangeRecordBase, + DynamoDBStreamRecord, +} from '@aws-lambda-powertools/parser/schemas/dynamodb'; +import { KinesisDataStreamRecordPayload } from '@aws-lambda-powertools/parser/schemas/kinesis'; +import type { + DynamoDBRecord, + KinesisStreamRecord, + SQSRecord, +} from 'aws-lambda'; +import { object, string } from 'valibot'; +import { beforeEach, describe, expect, it, vi } from 'vitest'; +import { z } from 'zod'; +import { + BatchProcessor, + EventType, + FullBatchFailureError, +} from '../../src/index.js'; +import { parser } from '../../src/parser.js'; +import { + dynamodbRecordFactory, + kinesisRecordFactory, + sqsRecordFactory, +} from '../helpers/factories.js'; + +describe('Batch processing with Parser Integration', () => { + beforeEach(() => { + vi.clearAllMocks(); + }); + const customSchema = z.object({ + Message: z.string(), + }); + const successPayload1 = { + Message: 'test-1', + }; + const successPayload2 = { + Message: 'test-2', + }; + const failurePayload1 = { + Message: 1, + }; + const sqsRecordHandler = async (parsedRecord: SQSRecord) => { + return parsedRecord.body; + }; + const kinesisRecordHandler = async (parsedRecord: KinesisStreamRecord) => { + return parsedRecord.kinesis.data; + }; + const dynamodbRecordHandler = async (parsedRecord: DynamoDBRecord) => { + return parsedRecord.dynamodb?.NewImage; + }; + const cases = [ 
+ { + description: 'passing Extended Schema', + SQSParserConfig: { + parser, + schema: SqsRecordSchema.extend({ + body: JSONStringified(customSchema), + }), + }, + KinesisParserConfig: { + parser, + schema: KinesisDataStreamRecord.extend({ + kinesis: KinesisDataStreamRecordPayload.extend({ + data: Base64Encoded(customSchema).optional(), + }), + }), + }, + DynamoDBParserConfig: { + parser, + schema: DynamoDBStreamRecord.extend({ + dynamodb: DynamoDBStreamChangeRecordBase.extend({ + NewImage: DynamoDBMarshalled(customSchema).optional(), + }), + }), + }, + }, + { + description: 'passing Internal Schema without transformer property set', + SQSParserConfig: { + parser, + innerSchema: JSONStringified(customSchema), + }, + KinesisParserConfig: { + parser, + innerSchema: Base64Encoded(customSchema), + }, + DynamoDBParserConfig: { + parser, + innerSchema: DynamoDBMarshalled(customSchema), + }, + }, + { + description: 'passing Internal Schema with transformer property set', + SQSParserConfig: { + parser, + innerSchema: customSchema, + transformer: 'json' as const, + }, + KinesisParserConfig: { + parser, + innerSchema: customSchema, + transformer: 'base64' as const, + }, + DynamoDBParserConfig: { + parser, + innerSchema: customSchema, + transformer: 'unmarshall' as const, + }, + }, + ]; + describe.each(cases)( + 'SQS Record Schema $description', + ({ SQSParserConfig }) => { + it('completes the processing with no failures and parses the payload before passing to the record handler', async () => { + // Prepare + const firstRecord = sqsRecordFactory(JSON.stringify(successPayload1)); + const secondRecord = sqsRecordFactory(JSON.stringify(successPayload2)); + const records = [firstRecord, secondRecord]; + const processor = new BatchProcessor(EventType.SQS, SQSParserConfig); + + // Act + processor.register(records, sqsRecordHandler); + const processedMessages = await processor.process(); + + // Assess + expect(processedMessages).toStrictEqual([ + ['success', successPayload1, 
firstRecord], + ['success', successPayload2, secondRecord], + ]); + }); + + it('completes the processing with failures if some of the payload does not match the passed schema', async () => { + // Prepare + const firstRecord = sqsRecordFactory(JSON.stringify(successPayload1)); + const secondRecord = sqsRecordFactory(JSON.stringify(failurePayload1)); + const records = [firstRecord, secondRecord]; + const processor = new BatchProcessor(EventType.SQS, SQSParserConfig); + + // Act + processor.register(records, sqsRecordHandler); + const processedMessages = await processor.process(); + + // Assess + expect(processedMessages[0]).toStrictEqual([ + 'success', + successPayload1, + firstRecord, + ]); + expect(processor.failureMessages.length).toBe(1); + expect(processor.response()).toStrictEqual({ + batchItemFailures: [{ itemIdentifier: secondRecord.messageId }], + }); + }); + } + ); + + describe.each(cases)( + 'Kinesis Record Schema $description', + ({ KinesisParserConfig }) => { + it('completes the processing with no failures and parses the payload before passing to the record handler', async () => { + // Prepare + const firstRecord = kinesisRecordFactory( + Buffer.from(JSON.stringify(successPayload1)).toString('base64') + ); + const secondRecord = kinesisRecordFactory( + Buffer.from(JSON.stringify(successPayload2)).toString('base64') + ); + const records = [firstRecord, secondRecord]; + const processor = new BatchProcessor( + EventType.KinesisDataStreams, + KinesisParserConfig + ); + + // Act + processor.register(records, kinesisRecordHandler); + const processedMessages = await processor.process(); + + // Assess + expect(processedMessages).toStrictEqual([ + ['success', successPayload1, firstRecord], + ['success', successPayload2, secondRecord], + ]); + }); + + it('completes the processing with failures if some of the payload does not match the passed schema', async () => { + // Prepare + const firstRecord = kinesisRecordFactory( + 
Buffer.from(JSON.stringify(successPayload1)).toString('base64') + ); + const secondRecord = kinesisRecordFactory( + Buffer.from(JSON.stringify(failurePayload1)).toString('base64') + ); + const records = [firstRecord, secondRecord]; + const processor = new BatchProcessor( + EventType.KinesisDataStreams, + KinesisParserConfig + ); + + // Act + processor.register(records, kinesisRecordHandler); + const processedMessages = await processor.process(); + + // Assess + expect(processedMessages[0]).toStrictEqual([ + 'success', + successPayload1, + firstRecord, + ]); + expect(processor.failureMessages.length).toBe(1); + expect(processor.response()).toStrictEqual({ + batchItemFailures: [ + { itemIdentifier: secondRecord.kinesis.sequenceNumber }, + ], + }); + }); + } + ); + + describe.each(cases)( + 'DynamoDB Record Schema $description', + ({ DynamoDBParserConfig }) => { + it('completes the processing with no failures and parses the payload before passing to the record handler', async () => { + // Prepare + const firstRecord = dynamodbRecordFactory(successPayload1.Message); + const secondRecord = dynamodbRecordFactory(successPayload2.Message); + const records = [firstRecord, secondRecord]; + const processor = new BatchProcessor( + EventType.DynamoDBStreams, + DynamoDBParserConfig + ); + + // Act + processor.register(records, dynamodbRecordHandler); + const processedMessages = await processor.process(); + + // Assess + expect(processedMessages).toStrictEqual([ + ['success', successPayload1, firstRecord], + ['success', successPayload2, secondRecord], + ]); + }); + + it('completes the processing with failures if some of the payload does not match the passed schema', async () => { + // Prepare + vi.stubEnv('AWS_LAMBDA_LOG_LEVEL', 'DEBUG'); + const firstRecord = dynamodbRecordFactory(successPayload1.Message); + //@ts-expect-error Passing an invalid payload for testing + const secondRecord = dynamodbRecordFactory(failurePayload1.Message); + const records = [firstRecord, 
secondRecord]; + const processor = new BatchProcessor(EventType.DynamoDBStreams, { + ...DynamoDBParserConfig, + logger: console, + }); + + // Act + processor.register(records, dynamodbRecordHandler); + const processedMessages = await processor.process(); + + // Assess + expect(processedMessages[0]).toStrictEqual([ + 'success', + successPayload1, + firstRecord, + ]); + expect(processor.failureMessages.length).toBe(1); + expect(processor.response()).toStrictEqual({ + batchItemFailures: [ + { itemIdentifier: secondRecord.dynamodb?.SequenceNumber }, + ], + }); + expect(console.debug).toHaveBeenCalledWith( + 'dynamodb.NewImage.Message: Invalid input: expected string, received number' + ); + }); + } + ); + + it('completes processing with all failures if an unsupported event type is used for parsing', async () => { + // Prepare + const firstRecord = sqsRecordFactory(JSON.stringify(successPayload1)); + const secondRecord = sqsRecordFactory(JSON.stringify(successPayload2)); + const records = [firstRecord, secondRecord]; + //@ts-expect-error + const processor = new BatchProcessor('invalid-event-type', { + innerSchema: customSchema, + transformer: 'json', + }); + + // Act + processor.register(records, sqsRecordHandler); + + // Assess + await expect(processor.process()).rejects.toThrowError( + FullBatchFailureError + ); + }); + + it('completes processing with failures if an unsupported schema type is used for parsing', async () => { + // Prepare + const unsupportedSchema = object({ + Message: string(), + }); + const firstRecord = sqsRecordFactory(JSON.stringify(successPayload1)); + const secondRecord = sqsRecordFactory(JSON.stringify(successPayload2)); + const records = [firstRecord, secondRecord]; + const processor = new BatchProcessor(EventType.SQS, { + parser, + // @ts-expect-error - we are explicitly testing a wrong schema vendor + innerSchema: unsupportedSchema, + transformer: 'json', + }); + + // Act + processor.register(records, sqsRecordHandler); + + // Assess + await 
expect(processor.process()).rejects.toThrowError( + FullBatchFailureError + ); + }); + + it('completes processing with failures if the schema is not passed for the parsing', async () => { + // Prepare + const firstRecord = sqsRecordFactory(JSON.stringify(successPayload1)); + const secondRecord = sqsRecordFactory(JSON.stringify(successPayload2)); + const records = [firstRecord, secondRecord]; + + const processor = new BatchProcessor(EventType.SQS, { + parser, + transformer: 'json', + }); + + // Act + processor.register(records, sqsRecordHandler); + + // Assess + await expect(processor.process()).rejects.toThrowError( + FullBatchFailureError + ); + }); + + it('uses a custom logger when provided', async () => { + // Prepare + const logger = { + debug: vi.fn(), + warn: vi.fn(), + error: vi.fn(), + }; + const unsupportedSchema = object({ + Message: string(), + }); + const firstRecord = sqsRecordFactory(JSON.stringify(successPayload1)); + const secondRecord = sqsRecordFactory(JSON.stringify(successPayload2)); + const records = [firstRecord, secondRecord]; + const processor = new BatchProcessor(EventType.SQS, { + parser, + // @ts-expect-error - we are explicitly testing a wrong schema vendor + innerSchema: unsupportedSchema, + transformer: 'json', + logger, + }); + + // Act + processor.register(records, sqsRecordHandler); + + // Assess + await expect(processor.process()).rejects.toThrowError( + FullBatchFailureError + ); + expect(logger.error).toHaveBeenCalledWith( + 'The schema provided is not supported. Only Zod schemas are supported for extension.' + ); + }); +}); diff --git a/packages/batch/vitest.config.ts b/packages/batch/vitest.config.ts index d5aa737c68..9f1196ef1f 100644 --- a/packages/batch/vitest.config.ts +++ b/packages/batch/vitest.config.ts @@ -3,5 +3,6 @@ import { defineProject } from 'vitest/config'; export default defineProject({ test: { environment: 'node', + setupFiles: ['../testing/src/setupEnv.ts'], }, });