Skip to content

Commit f1e8f70

Browse files
committed
Added tests for parser integration with Kinesis and some refactoring
1 parent b33832f commit f1e8f70

File tree

2 files changed

+269
-69
lines changed

2 files changed

+269
-69
lines changed

packages/batch/src/BatchProcessor.ts

Lines changed: 73 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -140,103 +140,107 @@ class BatchProcessor extends BasePartialBatchProcessor {
140140
}
141141

142142
/**
143-
* Parse the record according to the schema passed.
144-
*
145-
* If the passed schema is already an extended schema,
146-
* it directly uses the schema to parse the record
147-
*
148-
* If the passed schema is an internal payload schema,
149-
* it checks whether it is a zod schema and
150-
* then extends the zod schema according to the passed event type for parsing
143+
* Create an extended schema according to the event type passed.
151144
*
152-
* @param record The record to be parsed
153-
* @param eventType The type of event to process
145+
* @param eventType The type of event to process (SQS, Kinesis, DynamoDB)
154146
* @param schema The StandardSchema to be used for parsing
155147
*/
156-
private async parseRecord(
157-
record: EventSourceDataClassTypes,
148+
private async createExtendedSchema(
158149
eventType: keyof typeof EventType,
159150
schema: StandardSchemaV1
160-
): Promise<SQSRecord | KinesisStreamRecord | DynamoDBRecord> {
161-
const { parse } = await import('@aws-lambda-powertools/parser');
162-
if (eventType === EventType.SQS) {
163-
const extendedSchemaParsing = parse(record, undefined, schema, true);
164-
if (extendedSchemaParsing.success)
165-
return extendedSchemaParsing.data as SQSRecord;
166-
if (schema['~standard'].vendor === SchemaType.Zod) {
167-
const { JSONStringified } = await import(
168-
'@aws-lambda-powertools/parser/helpers'
169-
);
170-
const { SqsRecordSchema } = await import(
171-
'@aws-lambda-powertools/parser/schemas/sqs'
172-
);
173-
const extendedSchema = SqsRecordSchema.extend({
174-
// biome-ignore lint/suspicious/noExplicitAny: The vendor field in the schema is verified that the schema is a Zod schema
151+
) {
152+
switch (eventType) {
153+
case EventType.SQS: {
154+
const [{ JSONStringified }, { SqsRecordSchema }] = await Promise.all([
155+
import('@aws-lambda-powertools/parser/helpers'),
156+
import('@aws-lambda-powertools/parser/schemas/sqs'),
157+
]);
158+
return SqsRecordSchema.extend({
175159
body: JSONStringified(schema as any),
176160
});
177-
return parse(record, undefined, extendedSchema);
178161
}
179-
console.warn(
180-
'The schema provided is not supported. Only Zod schemas are supported for extension.'
181-
);
182-
throw new Error('Unsupported schema type');
183-
}
184-
if (eventType === EventType.KinesisDataStreams) {
185-
const extendedSchemaParsing = parse(record, undefined, schema, true);
186-
if (extendedSchemaParsing.success)
187-
return extendedSchemaParsing.data as KinesisStreamRecord;
188-
if (schema['~standard'].vendor === SchemaType.Zod) {
189-
const { Base64Encoded } = await import(
190-
'@aws-lambda-powertools/parser/helpers'
191-
);
192-
const { KinesisDataStreamRecord, KinesisDataStreamRecordPayload } =
193-
await import('@aws-lambda-powertools/parser/schemas/kinesis');
194-
const extendedSchema = KinesisDataStreamRecord.extend({
162+
case EventType.KinesisDataStreams: {
163+
const [
164+
{ Base64Encoded },
165+
{ KinesisDataStreamRecord, KinesisDataStreamRecordPayload },
166+
] = await Promise.all([
167+
import('@aws-lambda-powertools/parser/helpers'),
168+
import('@aws-lambda-powertools/parser/schemas/kinesis'),
169+
]);
170+
return KinesisDataStreamRecord.extend({
195171
kinesis: KinesisDataStreamRecordPayload.extend({
196-
// biome-ignore lint/suspicious/noExplicitAny: The vendor field in the schema is verified that the schema is a Zod schema
197172
data: Base64Encoded(schema as any),
198173
}),
199174
});
200-
return parse(record, undefined, extendedSchema);
201175
}
202-
console.warn(
203-
'The schema provided is not supported. Only Zod schemas are supported for extension.'
204-
);
205-
throw new Error('Unsupported schema type');
206-
}
207-
if (eventType === EventType.DynamoDBStreams) {
208-
const extendedSchemaParsing = parse(record, undefined, schema, true);
209-
if (extendedSchemaParsing.success)
210-
return extendedSchemaParsing.data as DynamoDBRecord;
211-
if (schema['~standard'].vendor === SchemaType.Zod) {
212-
const { DynamoDBMarshalled } = await import(
213-
'@aws-lambda-powertools/parser/helpers/dynamodb'
214-
);
215-
const { DynamoDBStreamRecord, DynamoDBStreamChangeRecordBase } =
216-
await import('@aws-lambda-powertools/parser/schemas/dynamodb');
217-
const extendedSchema = DynamoDBStreamRecord.extend({
176+
case EventType.DynamoDBStreams: {
177+
const [
178+
{ DynamoDBMarshalled },
179+
{ DynamoDBStreamRecord, DynamoDBStreamChangeRecordBase },
180+
] = await Promise.all([
181+
import('@aws-lambda-powertools/parser/helpers/dynamodb'),
182+
import('@aws-lambda-powertools/parser/schemas/dynamodb'),
183+
]);
184+
return DynamoDBStreamRecord.extend({
218185
dynamodb: DynamoDBStreamChangeRecordBase.extend({
219-
// biome-ignore lint/suspicious/noExplicitAny: The vendor field in the schema is verified that the schema is a Zod schema
220186
OldImage: DynamoDBMarshalled<StreamRecord['OldImage']>(
221187
schema as any
222188
).optional(),
223-
// biome-ignore lint/suspicious/noExplicitAny: The vendor field in the schema is verified that the schema is a Zod schema
224189
NewImage: DynamoDBMarshalled<StreamRecord['NewImage']>(
225190
schema as any
226191
).optional(),
227192
}),
228193
});
229-
return parse(record, undefined, extendedSchema);
230194
}
195+
default:
196+
console.warn(
197+
`The event type provided is not supported. Supported events: ${Object.values(EventType).join(',')}`
198+
);
199+
throw new Error('Unsupported event type');
200+
}
201+
}
202+
203+
/**
204+
* Parse the record according to the schema passed.
205+
*
206+
* If the passed schema is already an extended schema,
207+
* it directly uses the schema to parse the record
208+
*
209+
* If the passed schema is an internal payload schema,
210+
* it checks whether it is a zod schema and
211+
* then extends the zod schema according to the passed event type for parsing
212+
*
213+
* @param record The record to be parsed
214+
* @param eventType The type of event to process
215+
* @param schema The StandardSchema to be used for parsing
216+
*/
217+
private async parseRecord(
218+
record: EventSourceDataClassTypes,
219+
eventType: keyof typeof EventType,
220+
schema: StandardSchemaV1
221+
): Promise<SQSRecord | KinesisStreamRecord | DynamoDBRecord> {
222+
const { parse } = await import('@aws-lambda-powertools/parser');
223+
// Try parsing with the original schema first
224+
const extendedSchemaParsing = parse(record, undefined, schema, true);
225+
if (extendedSchemaParsing.success) {
226+
return extendedSchemaParsing.data as
227+
| SQSRecord
228+
| KinesisStreamRecord
229+
| DynamoDBRecord;
230+
}
231+
// Only proceed with schema extension if it's a Zod schema
232+
if (schema['~standard'].vendor !== SchemaType.Zod) {
231233
console.warn(
232234
'The schema provided is not supported. Only Zod schemas are supported for extension.'
233235
);
234236
throw new Error('Unsupported schema type');
235237
}
236-
console.warn(
237-
`The event type provided is not supported. Supported events: ${Object.values(EventType).join(',')}`
238-
);
239-
throw new Error('Unsupported event type');
238+
// Handle schema extension based on event type
239+
const extendedSchema = await this.createExtendedSchema(eventType, schema);
240+
return parse(record, undefined, extendedSchema) as
241+
| SQSRecord
242+
| KinesisStreamRecord
243+
| DynamoDBRecord;
240244
}
241245
}
242246

packages/batch/tests/unit/BatchProcessor.test.ts

Lines changed: 196 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -855,4 +855,200 @@ describe('Class: AsyncBatchProcessor', () => {
855855
);
856856
});
857857
});
858+
859+
describe('Batch processing with Parser Integration: Passing Internal DynamoDB Record Schema', () => {
860+
it('completes the processing with failures if some of the payload does not match the passed schema', async () => {
861+
// Prepare
862+
const customSchema = z.string();
863+
//@ts-expect-error Passing a number
864+
const firstRecord = kinesisRecordFactory(1);
865+
const secondRecord = kinesisRecordFactory('c3VjY2Vzcw==');
866+
const records = [firstRecord, secondRecord];
867+
const processor = new BatchProcessor(EventType.KinesisDataStreams, {
868+
schema: customSchema,
869+
});
870+
871+
// Act
872+
processor.register(records, asyncKinesisRecordHandler, options);
873+
const processedMessages = await processor.process();
874+
875+
// Assess
876+
expect(processedMessages[1]).toStrictEqual([
877+
'success',
878+
'success',
879+
secondRecord,
880+
]);
881+
expect(processor.failureMessages.length).toBe(1);
882+
expect(processor.response()).toStrictEqual({
883+
batchItemFailures: [
884+
{ itemIdentifier: firstRecord.kinesis.sequenceNumber },
885+
],
886+
});
887+
});
888+
889+
it('completes the processing with no failures and parses the payload before passing to the record handler', async () => {
890+
// Prepare
891+
const customSchema = z.string();
892+
const firstRecord = kinesisRecordFactory('c3VjY2Vzcw==');
893+
const secondRecord = kinesisRecordFactory('c3VjY2Vzcw==');
894+
const records = [firstRecord, secondRecord];
895+
const processor = new BatchProcessor(EventType.KinesisDataStreams, {
896+
schema: customSchema,
897+
});
898+
899+
// Act
900+
processor.register(records, asyncKinesisRecordHandler, options);
901+
const processedMessages = await processor.process();
902+
903+
// Assess
904+
expect(processedMessages).toStrictEqual([
905+
['success', 'success', firstRecord],
906+
['success', 'success', secondRecord],
907+
]);
908+
});
909+
910+
it('completes processing with all failures if all the payload does not match the passed schema', async () => {
911+
// Prepare
912+
const customSchema = z.string();
913+
//@ts-expect-error Passing a number
914+
const firstRecord = kinesisRecordFactory(1);
915+
//@ts-expect-error Passing a number
916+
const secondRecord = kinesisRecordFactory(1);
917+
const records = [firstRecord, secondRecord];
918+
const processor = new BatchProcessor(EventType.KinesisDataStreams, {
919+
schema: customSchema,
920+
});
921+
922+
// Act
923+
processor.register(records, asyncKinesisRecordHandler, options);
924+
925+
// Assess
926+
await expect(processor.process()).rejects.toThrowError(
927+
FullBatchFailureError
928+
);
929+
});
930+
931+
it('completes processing with failures if an unsupported schema type is used for parsing', async () => {
932+
// Prepare
933+
const customSchema = v.string();
934+
935+
const firstRecord = kinesisRecordFactory('c3VjY2Vzcw==');
936+
const secondRecord = kinesisRecordFactory('c3VjY2Vzcw==');
937+
const records = [firstRecord, secondRecord];
938+
const processor = new BatchProcessor(EventType.KinesisDataStreams, {
939+
schema: customSchema,
940+
});
941+
942+
// Act
943+
processor.register(records, asyncKinesisRecordHandler, options);
944+
945+
// Assess
946+
await expect(processor.process()).rejects.toThrowError(
947+
FullBatchFailureError
948+
);
949+
});
950+
});
951+
952+
describe('Batch processing with Parser Integration: Passing Extended Kinesis Record Schema', () => {
953+
it('completes the processing with failures if some of the payload does not match the passed schema', async () => {
954+
// Prepare
955+
const customSchema = z.string();
956+
const { Base64Encoded } = await import(
957+
'@aws-lambda-powertools/parser/helpers'
958+
);
959+
const { KinesisDataStreamRecord, KinesisDataStreamRecordPayload } =
960+
await import('@aws-lambda-powertools/parser/schemas/kinesis');
961+
const extendedSchema = KinesisDataStreamRecord.extend({
962+
kinesis: KinesisDataStreamRecordPayload.extend({
963+
data: Base64Encoded(customSchema).optional(),
964+
}),
965+
});
966+
//@ts-expect-error Passing a number
967+
const firstRecord = kinesisRecordFactory(1);
968+
const secondRecord = kinesisRecordFactory('c3VjY2Vzcw==');
969+
const records = [firstRecord, secondRecord];
970+
const processor = new BatchProcessor(EventType.KinesisDataStreams, {
971+
schema: extendedSchema,
972+
});
973+
974+
// Act
975+
processor.register(records, asyncKinesisRecordHandler, options);
976+
const processedMessages = await processor.process();
977+
978+
// Assess
979+
expect(processedMessages[1]).toStrictEqual([
980+
'success',
981+
'success',
982+
secondRecord,
983+
]);
984+
expect(processor.failureMessages.length).toBe(1);
985+
expect(processor.response()).toStrictEqual({
986+
batchItemFailures: [
987+
{ itemIdentifier: firstRecord.kinesis.sequenceNumber },
988+
],
989+
});
990+
});
991+
992+
it('completes the processing with no failures and parses the payload before passing to the record handler', async () => {
993+
// Prepare
994+
const customSchema = z.string();
995+
const { Base64Encoded } = await import(
996+
'@aws-lambda-powertools/parser/helpers'
997+
);
998+
const { KinesisDataStreamRecord, KinesisDataStreamRecordPayload } =
999+
await import('@aws-lambda-powertools/parser/schemas/kinesis');
1000+
const extendedSchema = KinesisDataStreamRecord.extend({
1001+
kinesis: KinesisDataStreamRecordPayload.extend({
1002+
data: Base64Encoded(customSchema).optional(),
1003+
}),
1004+
});
1005+
const firstRecord = kinesisRecordFactory('c3VjY2Vzcw==');
1006+
const secondRecord = kinesisRecordFactory('c3VjY2Vzcw==');
1007+
const records = [firstRecord, secondRecord];
1008+
const processor = new BatchProcessor(EventType.KinesisDataStreams, {
1009+
schema: extendedSchema,
1010+
});
1011+
1012+
// Act
1013+
processor.register(records, asyncKinesisRecordHandler, options);
1014+
const processedMessages = await processor.process();
1015+
1016+
// Assess
1017+
expect(processedMessages).toStrictEqual([
1018+
['success', 'success', firstRecord],
1019+
['success', 'success', secondRecord],
1020+
]);
1021+
});
1022+
1023+
it('completes processing with all failures if all the payload does not match the passed schema', async () => {
1024+
// Prepare
1025+
const customSchema = z.string();
1026+
const { Base64Encoded } = await import(
1027+
'@aws-lambda-powertools/parser/helpers'
1028+
);
1029+
const { KinesisDataStreamRecord, KinesisDataStreamRecordPayload } =
1030+
await import('@aws-lambda-powertools/parser/schemas/kinesis');
1031+
const extendedSchema = KinesisDataStreamRecord.extend({
1032+
kinesis: KinesisDataStreamRecordPayload.extend({
1033+
data: Base64Encoded(customSchema).optional(),
1034+
}),
1035+
});
1036+
//@ts-expect-error Passing a number
1037+
const firstRecord = kinesisRecordFactory(1);
1038+
//@ts-expect-error Passing a number
1039+
const secondRecord = kinesisRecordFactory(1);
1040+
const records = [firstRecord, secondRecord];
1041+
const processor = new BatchProcessor(EventType.KinesisDataStreams, {
1042+
schema: extendedSchema,
1043+
});
1044+
1045+
// Act
1046+
processor.register(records, asyncKinesisRecordHandler, options);
1047+
1048+
// Assess
1049+
await expect(processor.process()).rejects.toThrowError(
1050+
FullBatchFailureError
1051+
);
1052+
});
1053+
});
8581054
});

0 commit comments

Comments
 (0)