diff --git a/package-lock.json b/package-lock.json
index 58b25704c5..b414fbfb70 100644
--- a/package-lock.json
+++ b/package-lock.json
@@ -16,9 +16,9 @@
         "packages/tracer",
         "packages/parameters",
         "packages/idempotency",
+        "packages/parser",
         "packages/batch",
         "packages/testing",
-        "packages/parser",
         "examples/snippets",
         "layers",
         "examples/app",
@@ -10734,6 +10734,7 @@
       "version": "2.25.2",
       "license": "MIT-0",
       "devDependencies": {
+        "@aws-lambda-powertools/parser": "2.25.2",
         "@aws-lambda-powertools/testing-utils": "file:../testing"
       }
     },
diff --git a/package.json b/package.json
index c7192c6a4d..d82f098014 100644
--- a/package.json
+++ b/package.json
@@ -10,9 +10,9 @@
     "packages/tracer",
     "packages/parameters",
     "packages/idempotency",
+    "packages/parser",
     "packages/batch",
     "packages/testing",
-    "packages/parser",
     "examples/snippets",
     "layers",
     "examples/app",
diff --git a/packages/batch/package.json b/packages/batch/package.json
index e91a4f2a30..42396f9918 100644
--- a/packages/batch/package.json
+++ b/packages/batch/package.json
@@ -73,6 +73,7 @@
     "nodejs"
   ],
   "devDependencies": {
-    "@aws-lambda-powertools/testing-utils": "file:../testing"
+    "@aws-lambda-powertools/testing-utils": "file:../testing",
+    "@aws-lambda-powertools/parser": "2.25.2"
   }
 }
diff --git a/packages/batch/src/BasePartialBatchProcessor.ts b/packages/batch/src/BasePartialBatchProcessor.ts
index f4ac5ac6c9..529a3870d9 100644
--- a/packages/batch/src/BasePartialBatchProcessor.ts
+++ b/packages/batch/src/BasePartialBatchProcessor.ts
@@ -1,3 +1,4 @@
+import type { StandardSchemaV1 } from '@standard-schema/spec';
 import type {
   DynamoDBRecord,
   KinesisStreamRecord,
@@ -11,6 +12,7 @@ import {
 } from './constants.js';
 import { FullBatchFailureError } from './errors.js';
 import type {
+  BasePartialBatchProcessorConfig,
   EventSourceDataClassTypes,
   PartialItemFailureResponse,
   PartialItemFailures,
@@ -42,12 +44,20 @@ abstract class BasePartialBatchProcessor extends BasePartialProcessor {
    */
   public eventType: keyof typeof EventType;

+  /**
+   * The schema of the body of the event record for parsing
+   */
+  protected schema?: StandardSchemaV1;
+
   /**
    * Initializes base batch processing class
    *
    * @param eventType The type of event to process (SQS, Kinesis, DynamoDB)
    */
-  public constructor(eventType: keyof typeof EventType) {
+  public constructor(
+    eventType: keyof typeof EventType,
+    config?: BasePartialBatchProcessorConfig
+  ) {
     super();
     this.eventType = eventType;
     this.batchResponse = DEFAULT_RESPONSE;
@@ -56,6 +66,9 @@
       [EventType.KinesisDataStreams]: () => this.collectKinesisFailures(),
       [EventType.DynamoDBStreams]: () => this.collectDynamoDBFailures(),
     };
+    if (config) {
+      this.schema = config.schema;
+    }
   }

   /**
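The constructor change above is the public entry point for the rest of this diff: `BasePartialBatchProcessor` (and therefore `BatchProcessor`) now accepts an optional config object carrying a Standard Schema compliant `schema`. A minimal sketch of the resulting call site, with a hypothetical `mySchema` payload shape that is not part of this diff:

```typescript
import { BatchProcessor, EventType } from '@aws-lambda-powertools/batch';
import { z } from 'zod';

// Hypothetical payload schema; any Standard Schema-compliant schema is accepted,
// though only Zod schemas can be auto-extended to the full event record shape.
const mySchema = z.object({ Message: z.string() });

// The new optional second argument is a BasePartialBatchProcessorConfig
const processor = new BatchProcessor(EventType.SQS, { schema: mySchema });
```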
diff --git a/packages/batch/src/BasePartialProcessor.ts b/packages/batch/src/BasePartialProcessor.ts
index 1640cc4355..23b7cdc9cc 100644
--- a/packages/batch/src/BasePartialProcessor.ts
+++ b/packages/batch/src/BasePartialProcessor.ts
@@ -74,8 +74,8 @@ abstract class BasePartialProcessor {
    * This method should be called when a record fails processing so that
    * the processor can keep track of the error and the record that failed.
    *
-   * @param record Record that failed processing
-   * @param error Error that was thrown
+   * @param record - Record that failed processing
+   * @param error - Error that was thrown
    */
   public failureHandler(
     record: EventSourceDataClassTypes,
@@ -131,7 +131,7 @@ abstract class BasePartialProcessor {
    * This is to ensure that the processor keeps track of the results and the records
    * that succeeded and failed processing.
    *
-   * @param record Record to be processed
+   * @param record - Record to be processed
    */
   public abstract processRecord(
     record: BaseRecord
@@ -149,7 +149,7 @@ abstract class BasePartialProcessor {
    * This is to ensure that the processor keeps track of the results and the records
    * that succeeded and failed processing.
    *
-   * @param record Record to be processed
+   * @param record - Record to be processed
    */
   public abstract processRecordSync(
     record: BaseRecord
@@ -198,9 +198,9 @@ abstract class BasePartialProcessor {
    * to allow for reusing the processor instance across multiple invocations
    * by instantiating the processor outside of the Lambda function handler.
    *
-   * @param records Array of records to be processed
-   * @param handler CallableFunction to process each record from the batch
-   * @param options Options to be used during processing (optional)
+   * @param records - Array of records to be processed
+   * @param handler - CallableFunction to process each record from the batch
+   * @param options - Options to be used during processing (optional)
    */
   public register(
     records: BaseRecord[],
@@ -223,8 +223,8 @@ abstract class BasePartialProcessor {
    * This method should be called when a record succeeds processing so that
    * the processor can keep track of the result and the record that succeeded.
    *
-   * @param record Record that succeeded processing
-   * @param result Result from record handler
+   * @param record - Record that succeeded processing
+   * @param result - Result from record handler
    */
   public successHandler(
     record: EventSourceDataClassTypes,
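The `BatchProcessor` changes that follow gate schema extension on the schema's vendor, read from the `~standard` property that every Standard Schema compliant library exposes. A small sketch of that mechanism, assuming Zod and Valibot versions that implement the spec:

```typescript
import type { StandardSchemaV1 } from '@standard-schema/spec';
import * as v from 'valibot';
import { z } from 'zod';

// Standard Schema-compliant libraries identify themselves via `~standard.vendor`,
// which is what the vendor check in #parseRecord below relies on.
const zodSchema: StandardSchemaV1 = z.object({ Message: z.string() });
const valibotSchema: StandardSchemaV1 = v.object({ Message: v.string() });

console.log(zodSchema['~standard'].vendor); // 'zod'
console.log(valibotSchema['~standard'].vendor); // 'valibot'
```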
diff --git a/packages/batch/src/BatchProcessor.ts b/packages/batch/src/BatchProcessor.ts
index 4cff0ee8c9..f6221e9e64 100644
--- a/packages/batch/src/BatchProcessor.ts
+++ b/packages/batch/src/BatchProcessor.ts
@@ -1,6 +1,14 @@
+import type { StandardSchemaV1 } from '@standard-schema/spec';
+import type { StreamRecord } from 'aws-lambda';
 import { BasePartialBatchProcessor } from './BasePartialBatchProcessor.js';
+import { EventType, SchemaVendor } from './constants.js';
 import { BatchProcessingError } from './errors.js';
-import type { BaseRecord, FailureResponse, SuccessResponse } from './types.js';
+import type {
+  BaseRecord,
+  EventSourceDataClassTypes,
+  FailureResponse,
+  SuccessResponse,
+} from './types.js';

 /**
  * Process records in a batch asynchronously and handle partial failure cases.
@@ -79,7 +87,7 @@ import type { BaseRecord, FailureResponse, SuccessResponse } from './types.js';
  * });
  * ```
  *
- * @param eventType The type of event to process (SQS, Kinesis, DynamoDB)
+ * @param eventType - The type of event to process (SQS, Kinesis, DynamoDB)
  */
 class BatchProcessor extends BasePartialBatchProcessor {
   /**
@@ -94,13 +102,17 @@ class BatchProcessor extends BasePartialBatchProcessor {
    * If the handler function completes successfully, the method returns a success response.
    * Otherwise, it returns a failure response with the error that occurred during processing.
    *
-   * @param record The record to be processed
+   * @param record - The record to be processed
    */
   public async processRecord(
     record: BaseRecord
   ): Promise<SuccessResponse | FailureResponse> {
     try {
-      const data = this.toBatchType(record, this.eventType);
+      const recordToProcess =
+        this.schema == null
+          ? record
+          : await this.#parseRecord(record, this.eventType, this.schema);
+      const data = this.toBatchType(recordToProcess, this.eventType);
       const result = await this.handler(data, this.options?.context);

       return this.successHandler(record, result);
@@ -112,7 +124,7 @@ class BatchProcessor extends BasePartialBatchProcessor {
   /**
    * @throws {BatchProcessingError} This method is not implemented for synchronous processing.
    *
-   * @param _record The record to be processed
+   * @param _record - The record to be processed
    */
   public processRecordSync(
     _record: BaseRecord
@@ -121,6 +133,163 @@ class BatchProcessor extends BasePartialBatchProcessor {
       'Not implemented. Use asyncProcess() instead.'
     );
   }
+
+  /**
+   * Extend the schema according to the event type passed.
+   *
+   * If useTransformers is true, extend using opinionated transformers.
+   * Otherwise, extend without any transformers.
+   *
+   * @param eventType - The type of event to process (SQS, Kinesis, DynamoDB)
+   * @param schema - The StandardSchema to be used for parsing
+   * @param useTransformers - Whether to use transformers for parsing
+   */
+  async #createExtendedSchema(options: {
+    eventType: keyof typeof EventType;
+    schema: StandardSchemaV1;
+    useTransformers: boolean;
+  }) {
+    const { eventType, schema, useTransformers } = options;
+    switch (eventType) {
+      case EventType.SQS: {
+        if (useTransformers) {
+          const [{ JSONStringified }, { SqsRecordSchema }] = await Promise.all([
+            import('@aws-lambda-powertools/parser/helpers'),
+            import('@aws-lambda-powertools/parser/schemas/sqs'),
+          ]);
+          return SqsRecordSchema.extend({
+            body: JSONStringified(schema as any),
+          });
+        }
+        const { SqsRecordSchema } = await import(
+          '@aws-lambda-powertools/parser/schemas/sqs'
+        );
+        return SqsRecordSchema.extend({ body: schema });
+      }
+
+      case EventType.KinesisDataStreams: {
+        if (useTransformers) {
+          const [
+            { Base64Encoded },
+            { KinesisDataStreamRecord, KinesisDataStreamRecordPayload },
+          ] = await Promise.all([
+            import('@aws-lambda-powertools/parser/helpers'),
+            import('@aws-lambda-powertools/parser/schemas/kinesis'),
+          ]);
+          return KinesisDataStreamRecord.extend({
+            kinesis: KinesisDataStreamRecordPayload.extend({
+              data: Base64Encoded(schema as any),
+            }),
+          });
+        }
+        const { KinesisDataStreamRecord, KinesisDataStreamRecordPayload } =
+          await import('@aws-lambda-powertools/parser/schemas/kinesis');
+        return KinesisDataStreamRecord.extend({
+          kinesis: KinesisDataStreamRecordPayload.extend({ data: schema }),
+        });
+      }
+
+      case EventType.DynamoDBStreams: {
+        if (useTransformers) {
+          const [
+            { DynamoDBMarshalled },
+            { DynamoDBStreamRecord, DynamoDBStreamChangeRecordBase },
+          ] = await Promise.all([
+            import('@aws-lambda-powertools/parser/helpers/dynamodb'),
+            import('@aws-lambda-powertools/parser/schemas/dynamodb'),
+          ]);
+          return DynamoDBStreamRecord.extend({
+            dynamodb: DynamoDBStreamChangeRecordBase.extend({
+              OldImage: DynamoDBMarshalled(
+                schema as any
+              ).optional(),
+              NewImage: DynamoDBMarshalled(
+                schema as any
+              ).optional(),
+            }),
+          });
+        }
+        const { DynamoDBStreamRecord, DynamoDBStreamChangeRecordBase } =
+          await import('@aws-lambda-powertools/parser/schemas/dynamodb');
+        return DynamoDBStreamRecord.extend({
+          dynamodb: DynamoDBStreamChangeRecordBase.extend({
+            OldImage: (schema as any).optional(),
+            NewImage: (schema as any).optional(),
+          }),
+        });
+      }
+
+      default: {
+        console.warn(
+          `The event type provided is not supported. Supported events: ${Object.values(EventType).join(',')}`
+        );
+        throw new Error('Unsupported event type');
+      }
+    }
+  }
+
+  /**
+   * Parse the record according to the schema and event type passed.
+   *
+   * If the passed schema is already an extended schema,
+   * use the schema directly to parse the record.
+   *
+   * Only Zod schemas are supported for schema extension.
+   *
+   * @param record - The record to be parsed
+   * @param eventType - The type of event to process
+   * @param schema - The StandardSchema to be used for parsing
+   */
+  async #parseRecord(
+    record: EventSourceDataClassTypes,
+    eventType: keyof typeof EventType,
+    schema: StandardSchemaV1
+  ): Promise<EventSourceDataClassTypes> {
+    const { parse } = await import('@aws-lambda-powertools/parser');
+    // Try parsing with the original schema first
+    const extendedSchemaParsing = parse(record, undefined, schema, true);
+    if (extendedSchemaParsing.success) {
+      return extendedSchemaParsing.data as EventSourceDataClassTypes;
+    }
+    // Only proceed with schema extension if it's a Zod schema
+    if (schema['~standard'].vendor !== SchemaVendor.Zod) {
+      console.warn(
+        'The schema provided is not supported. Only Zod schemas are supported for extension.'
+      );
+      throw new Error('Unsupported schema type');
+    }
+    // Handle schema extension based on event type
+    // Try without transformers first, then with transformers
+    const schemaWithoutTransformers = await this.#createExtendedSchema({
+      eventType,
+      schema,
+      useTransformers: false,
+    });
+    const schemaWithoutTransformersParsing = parse(
+      record,
+      undefined,
+      schemaWithoutTransformers,
+      true
+    );
+    if (schemaWithoutTransformersParsing.success) {
+      return schemaWithoutTransformersParsing.data as EventSourceDataClassTypes;
+    }
+    const schemaWithTransformers = await this.#createExtendedSchema({
+      eventType,
+      schema,
+      useTransformers: true,
+    });
+    const schemaWithTransformersParsing = parse(
+      record,
+      undefined,
+      schemaWithTransformers,
+      true
+    );
+    if (schemaWithTransformersParsing.success) {
+      return schemaWithTransformersParsing.data as EventSourceDataClassTypes;
+    }
+    throw new Error('Failed to parse record');
+  }
 }

 export { BatchProcessor };
diff --git a/packages/batch/src/constants.ts b/packages/batch/src/constants.ts
index 159b12df6a..f6827774ef 100644
--- a/packages/batch/src/constants.ts
+++ b/packages/batch/src/constants.ts
@@ -17,6 +17,13 @@ const EventType = {
   DynamoDBStreams: 'DynamoDBStreams',
 } as const;

+/**
+ * Enum of supported schema vendors for the utility
+ */
+const SchemaVendor = {
+  Zod: 'zod',
+} as const;
+
 /**
  * Default response for the partial batch processor
  */
@@ -35,4 +42,4 @@ const DATA_CLASS_MAPPING = {
     record as DynamoDBRecord,
 };

-export { EventType, DEFAULT_RESPONSE, DATA_CLASS_MAPPING };
+export { EventType, SchemaVendor, DEFAULT_RESPONSE, DATA_CLASS_MAPPING };
diff --git a/packages/batch/src/types.ts b/packages/batch/src/types.ts
index e94b811f6c..b5781c44c8 100644
--- a/packages/batch/src/types.ts
+++ b/packages/batch/src/types.ts
@@ -1,9 +1,11 @@
+import type { StandardSchemaV1 } from '@standard-schema/spec';
 import type {
   Context,
   DynamoDBRecord,
   KinesisStreamRecord,
   SQSRecord,
 } from 'aws-lambda';
+
 import type { BasePartialBatchProcessor } from './BasePartialBatchProcessor.js';
 import type { SqsFifoPartialProcessor } from './SqsFifoPartialProcessor.js';
 import type { SqsFifoPartialProcessorAsync } from './SqsFifoPartialProcessorAsync.js';
@@ -89,6 +91,21 @@ type PartialItemFailures = { itemIdentifier: string };
  */
 type PartialItemFailureResponse = { batchItemFailures: PartialItemFailures[] };

+/**
+ * Type representing the configuration options passed to the BasePartialBatchProcessor class.
+ *
+ * @property schema - The schema to be used for parsing
+ */
+type BasePartialBatchProcessorConfig = {
+  /**
+   * The schema can be either of the following:
+   * 1. An internal schema of the payload of the supported event types.
+   * 2. An internal schema along with helper transformer functions.
+   * 3. An extended schema of the supported event type.
+   */
+  schema: StandardSchemaV1;
+};
+
 export type {
   BatchProcessingOptions,
   BaseRecord,
@@ -97,4 +114,5 @@ export type {
   FailureResponse,
   PartialItemFailures,
   PartialItemFailureResponse,
+  BasePartialBatchProcessorConfig,
 };
diff --git a/packages/batch/tests/unit/BatchProcessor.test.ts b/packages/batch/tests/unit/BatchProcessor.test.ts
index 650354d968..020038b3fc 100644
--- a/packages/batch/tests/unit/BatchProcessor.test.ts
+++ b/packages/batch/tests/unit/BatchProcessor.test.ts
@@ -1,6 +1,27 @@
+import {
+  Base64Encoded,
+  JSONStringified,
+} from '@aws-lambda-powertools/parser/helpers';
+import { DynamoDBMarshalled } from '@aws-lambda-powertools/parser/helpers/dynamodb';
+import {
+  KinesisDataStreamRecord,
+  SqsRecordSchema,
+} from '@aws-lambda-powertools/parser/schemas';
+import {
+  DynamoDBStreamChangeRecordBase,
+  DynamoDBStreamRecord,
+} from '@aws-lambda-powertools/parser/schemas/dynamodb';
+import { KinesisDataStreamRecordPayload } from '@aws-lambda-powertools/parser/schemas/kinesis';
 import context from '@aws-lambda-powertools/testing-utils/context';
-import type { Context } from 'aws-lambda';
+import type {
+  Context,
+  DynamoDBRecord,
+  KinesisStreamRecord,
+  SQSRecord,
+} from 'aws-lambda';
+import * as v from 'valibot';
 import { afterAll, beforeEach, describe, expect, it, vi } from 'vitest';
+import { z } from 'zod';
 import {
   BatchProcessingError,
   BatchProcessor,
@@ -286,4 +307,340 @@ describe('Class: AsyncBatchProcessor', () => {
     // Act & Assess
     expect(() => processor.processSync()).toThrowError(BatchProcessingError);
   });
+
+  describe('Batch processing with Parser Integration', () => {
+    const customSchema = z.object({
+      Message: z.string(),
+    });
+    const successPayload1 = {
+      Message: 'test-1',
+    };
+    const successPayload2 = {
+      Message: 'test-2',
+    };
+    const failurePayload1 = {
+      Message: 1,
+    };
+    const failurePayload2 = {
+      Message: 2,
+    };
+    const sqsRecordHandler = async (parsedRecord: SQSRecord) => {
+      return parsedRecord.body;
+    };
+    const kinesisRecordHandler = async (parsedRecord: KinesisStreamRecord) => {
+      return parsedRecord.kinesis.data;
+    };
+    const dynamodbRecordHandler = async (parsedRecord: DynamoDBRecord) => {
+      return parsedRecord.dynamodb?.NewImage;
+    };
+    const cases = [
+      {
+        description: 'passing Extended Schema',
+        SQS: {
+          schema: SqsRecordSchema.extend({
+            body: JSONStringified(customSchema),
+          }),
+        },
+        Kinesis: {
+          schema: KinesisDataStreamRecord.extend({
+            kinesis: KinesisDataStreamRecordPayload.extend({
+              data: Base64Encoded(customSchema).optional(),
+            }),
+          }),
+        },
+        DynamoDB: {
+          schema: DynamoDBStreamRecord.extend({
+            dynamodb: DynamoDBStreamChangeRecordBase.extend({
+              NewImage: DynamoDBMarshalled(customSchema).optional(),
+            }),
+          }),
+        },
+      },
+      {
+        description: 'passing Internal Schema without transformers',
+        SQS: {
+          schema: customSchema,
+        },
+        Kinesis: {
+          schema: customSchema,
+        },
+        DynamoDB: {
+          schema: customSchema,
+        },
+      },
+      {
+        description: 'passing Internal Schema with transformers',
+        SQS: {
+          schema: JSONStringified(customSchema),
+        },
+        Kinesis: {
+          schema: Base64Encoded(customSchema),
+        },
+        DynamoDB: {
+          schema: DynamoDBMarshalled(customSchema),
+        },
+      },
+    ];
+    describe.each(cases)('SQS Record Schema $description', ({ SQS }) => {
+      it('completes the processing with no failures and parses the payload before passing to the record handler', async () => {
+        // Prepare
+        const firstRecord = sqsRecordFactory(JSON.stringify(successPayload1));
+        const secondRecord = sqsRecordFactory(JSON.stringify(successPayload2));
+        const records = [firstRecord, secondRecord];
+        const processor = new BatchProcessor(EventType.SQS, {
+          schema: SQS.schema,
+        });
+
+        // Act
+        processor.register(records, sqsRecordHandler, options);
+        const processedMessages = await processor.process();
+
+        // Assess
+        expect(processedMessages).toStrictEqual([
+          ['success', successPayload1, firstRecord],
+          ['success', successPayload2, secondRecord],
+        ]);
+      });
+
+      it('completes the processing with failures if some of the payload does not match the passed schema', async () => {
+        // Prepare
+        const firstRecord = sqsRecordFactory(JSON.stringify(successPayload1));
+        const secondRecord = sqsRecordFactory(JSON.stringify(failurePayload1));
+        const records = [firstRecord, secondRecord];
+        const processor = new BatchProcessor(EventType.SQS, {
+          schema: SQS.schema,
+        });
+
+        // Act
+        processor.register(records, sqsRecordHandler, options);
+        const processedMessages = await processor.process();
+
+        // Assess
+        expect(processedMessages[0]).toStrictEqual([
+          'success',
+          successPayload1,
+          firstRecord,
+        ]);
+        expect(processor.failureMessages.length).toBe(1);
+        expect(processor.response()).toStrictEqual({
+          batchItemFailures: [{ itemIdentifier: secondRecord.messageId }],
+        });
+      });
+
+      it('completes processing with all failures if all the payload does not match the passed schema', async () => {
+        // Prepare
+        const firstRecord = sqsRecordFactory(JSON.stringify(failurePayload1));
+        const secondRecord = sqsRecordFactory(JSON.stringify(failurePayload2));
+
+        const records = [firstRecord, secondRecord];
+        const processor = new BatchProcessor(EventType.SQS, {
+          schema: SQS.schema,
+        });
+
+        // Act
+        processor.register(records, sqsRecordHandler, options);
+
+        // Assess
+        await expect(processor.process()).rejects.toThrowError(
+          FullBatchFailureError
+        );
+      });
+    });
+
+    describe.each(cases)(
+      'Kinesis Record Schema $description',
+      ({ Kinesis }) => {
+        it('completes the processing with no failures and parses the payload before passing to the record handler', async () => {
+          // Prepare
+          const firstRecord = kinesisRecordFactory(
+            Buffer.from(JSON.stringify(successPayload1)).toString('base64')
+          );
+          const secondRecord = kinesisRecordFactory(
+            Buffer.from(JSON.stringify(successPayload2)).toString('base64')
+          );
+          const records = [firstRecord, secondRecord];
+          const processor = new BatchProcessor(EventType.KinesisDataStreams, {
+            schema: Kinesis.schema,
+          });
+
+          // Act
+          processor.register(records, kinesisRecordHandler, options);
+          const processedMessages = await processor.process();
+
+          // Assess
+          expect(processedMessages).toStrictEqual([
+            ['success', successPayload1, firstRecord],
+            ['success', successPayload2, secondRecord],
+          ]);
+        });
+
+        it('completes the processing with failures if some of the payload does not match the passed schema', async () => {
+          // Prepare
+          const firstRecord = kinesisRecordFactory(
+            Buffer.from(JSON.stringify(successPayload1)).toString('base64')
+          );
+          const secondRecord = kinesisRecordFactory(
+            Buffer.from(JSON.stringify(failurePayload1)).toString('base64')
+          );
+          const records = [firstRecord, secondRecord];
+          const processor = new BatchProcessor(EventType.KinesisDataStreams, {
+            schema: Kinesis.schema,
+          });
+
+          // Act
+          processor.register(records, kinesisRecordHandler, options);
+          const processedMessages = await processor.process();
+
+          // Assess
+          expect(processedMessages[0]).toStrictEqual([
+            'success',
+            successPayload1,
+            firstRecord,
+          ]);
+          expect(processor.failureMessages.length).toBe(1);
+          expect(processor.response()).toStrictEqual({
+            batchItemFailures: [
+              { itemIdentifier: secondRecord.kinesis.sequenceNumber },
+            ],
+          });
+        });
+
+        it('completes processing with all failures if all the payload does not match the passed schema', async () => {
+          // Prepare
+          const firstRecord = kinesisRecordFactory(
+            Buffer.from(JSON.stringify(failurePayload1)).toString('base64')
+          );
+          const secondRecord = kinesisRecordFactory(
+            Buffer.from(JSON.stringify(failurePayload2)).toString('base64')
+          );
+
+          const records = [firstRecord, secondRecord];
+          const processor = new BatchProcessor(EventType.KinesisDataStreams, {
+            schema: Kinesis.schema,
+          });
+
+          // Act
+          processor.register(records, kinesisRecordHandler, options);
+
+          // Assess
+          await expect(processor.process()).rejects.toThrowError(
+            FullBatchFailureError
+          );
+        });
+      }
+    );
+
+    describe.each(cases)(
+      'DynamoDB Record Schema $description',
+      ({ DynamoDB }) => {
+        it('completes the processing with no failures and parses the payload before passing to the record handler', async () => {
+          // Prepare
+          const firstRecord = dynamodbRecordFactory(successPayload1.Message);
+          const secondRecord = dynamodbRecordFactory(successPayload2.Message);
+          const records = [firstRecord, secondRecord];
+          const processor = new BatchProcessor(EventType.DynamoDBStreams, {
+            schema: DynamoDB.schema,
+          });
+
+          // Act
+          processor.register(records, dynamodbRecordHandler, options);
+          const processedMessages = await processor.process();
+
+          // Assess
+          expect(processedMessages).toStrictEqual([
+            ['success', successPayload1, firstRecord],
+            ['success', successPayload2, secondRecord],
+          ]);
+        });
+
+        it('completes the processing with failures if some of the payload does not match the passed schema', async () => {
+          // Prepare
+          const firstRecord = dynamodbRecordFactory(successPayload1.Message);
+          //@ts-expect-error Passing an invalid payload for testing
+          const secondRecord = dynamodbRecordFactory(failurePayload1.Message);
+          const records = [firstRecord, secondRecord];
+          const processor = new BatchProcessor(EventType.DynamoDBStreams, {
+            schema: DynamoDB.schema,
+          });
+
+          // Act
+          processor.register(records, dynamodbRecordHandler, options);
+          const processedMessages = await processor.process();
+
+          // Assess
+          expect(processedMessages[0]).toStrictEqual([
+            'success',
+            successPayload1,
+            firstRecord,
+          ]);
+          expect(processor.failureMessages.length).toBe(1);
+          expect(processor.response()).toStrictEqual({
+            batchItemFailures: [
+              { itemIdentifier: secondRecord.dynamodb?.SequenceNumber },
+            ],
+          });
+        });
+
+        it('completes processing with all failures if all the payload does not match the passed schema', async () => {
+          // Prepare
+          //@ts-expect-error Passing an invalid payload for testing
+          const firstRecord = dynamodbRecordFactory(failurePayload1.Message);
+          //@ts-expect-error Passing an invalid payload for testing
+          const secondRecord = dynamodbRecordFactory(failurePayload2.Message);
+          const records = [firstRecord, secondRecord];
+          const processor = new BatchProcessor(EventType.DynamoDBStreams, {
+            schema: DynamoDB.schema,
+          });
+
+          // Act
+          processor.register(records, dynamodbRecordHandler, options);
+
+          // Assess
+          await expect(processor.process()).rejects.toThrowError(
+            FullBatchFailureError
+          );
+        });
+      }
+    );
+
+    it('completes processing with all failures if an unsupported event type is used for parsing', async () => {
+      // Prepare
+      const firstRecord = sqsRecordFactory(JSON.stringify(successPayload1));
+      const secondRecord = sqsRecordFactory(JSON.stringify(successPayload2));
+      const records = [firstRecord, secondRecord];
+      //@ts-expect-error Passing an invalid event type for testing
+      const processor = new BatchProcessor('invalid-event-type', {
+        schema: customSchema,
+      });
+
+      // Act
+      processor.register(records, sqsRecordHandler, options);
+
+      // Assess
+      await expect(processor.process()).rejects.toThrowError(
+        FullBatchFailureError
+      );
+    });
+
+    it('completes processing with failures if an unsupported schema type is used for parsing', async () => {
+      // Prepare
+      const unsupportedSchema = v.object({
+        Message: v.string(),
+      });
+      const firstRecord = sqsRecordFactory(JSON.stringify(successPayload1));
+      const secondRecord = sqsRecordFactory(JSON.stringify(successPayload2));
+      const records = [firstRecord, secondRecord];
+      const processor = new BatchProcessor(EventType.SQS, {
+        schema: unsupportedSchema,
+      });
+
+      // Act
+      processor.register(records, sqsRecordHandler, options);
+
+      // Assess
+      await expect(processor.process()).rejects.toThrowError(
+        FullBatchFailureError
+      );
+    });
+  });
 });
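End-to-end, the new option means a record handler can rely on the payload having already been validated. A usage sketch under the assumptions of this diff (the `orderSchema` shape is hypothetical; `processPartialResponse` is the batch utility's existing entry point): parsing is attempted with the schema as given, then with the schema grafted into the event-record schema without transformers, then with transformers such as `JSONStringified`, so a plain payload schema suffices for JSON-stringified SQS bodies.

```typescript
import {
  BatchProcessor,
  EventType,
  processPartialResponse,
} from '@aws-lambda-powertools/batch';
import type { Context, SQSEvent, SQSRecord } from 'aws-lambda';
import { z } from 'zod';

// Hypothetical shape of the SQS message body
const orderSchema = z.object({ orderId: z.string(), amount: z.number() });

const processor = new BatchProcessor(EventType.SQS, { schema: orderSchema });

const recordHandler = async (record: SQSRecord) => {
  // By the time the handler runs, record.body has been parsed and validated
  // against orderSchema (via the JSONStringified fallback); records that fail
  // validation are reported as batch item failures instead of reaching here.
  return record.body;
};

export const handler = async (event: SQSEvent, context: Context) =>
  processPartialResponse(event, recordHandler, processor, { context });
```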