Skip to content

Commit 29da8ff

Browse files
committed
Implemented parsing with and without the transformers
1 parent f36d513 commit 29da8ff

File tree

2 files changed

+480
-177
lines changed

2 files changed

+480
-177
lines changed

packages/batch/src/BatchProcessor.ts

Lines changed: 87 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -142,56 +142,87 @@ class BatchProcessor extends BasePartialBatchProcessor {
142142
/**
143143
* Create an extended schema according to the event type passed.
144144
*
145+
* If useTransformers is true, parsing with transformers
146+
* else parse without transformers
147+
*
145148
* @param eventType The type of event to process (SQS, Kinesis, DynamoDB)
146149
* @param schema The StandardSchema to be used for parsing
150+
* @param useTransformers Whether to use transformers for parsing
147151
*/
148152
private async createExtendedSchema(
149153
eventType: keyof typeof EventType,
150-
schema: StandardSchemaV1
154+
schema: StandardSchemaV1,
155+
useTransformers: boolean
151156
) {
152157
switch (eventType) {
153158
case EventType.SQS: {
154-
const [{ JSONStringified }, { SqsRecordSchema }] = await Promise.all([
155-
import('@aws-lambda-powertools/parser/helpers'),
156-
import('@aws-lambda-powertools/parser/schemas/sqs'),
157-
]);
158-
return SqsRecordSchema.extend({
159-
body: JSONStringified(schema as any),
160-
});
159+
if (useTransformers) {
160+
const [{ JSONStringified }, { SqsRecordSchema }] = await Promise.all([
161+
import('@aws-lambda-powertools/parser/helpers'),
162+
import('@aws-lambda-powertools/parser/schemas/sqs'),
163+
]);
164+
return SqsRecordSchema.extend({
165+
body: JSONStringified(schema as any),
166+
});
167+
}
168+
const { SqsRecordSchema } = await import(
169+
'@aws-lambda-powertools/parser/schemas/sqs'
170+
);
171+
return SqsRecordSchema.extend({ body: schema });
161172
}
173+
162174
case EventType.KinesisDataStreams: {
163-
const [
164-
{ Base64Encoded },
165-
{ KinesisDataStreamRecord, KinesisDataStreamRecordPayload },
166-
] = await Promise.all([
167-
import('@aws-lambda-powertools/parser/helpers'),
168-
import('@aws-lambda-powertools/parser/schemas/kinesis'),
169-
]);
175+
if (useTransformers) {
176+
const [
177+
{ Base64Encoded },
178+
{ KinesisDataStreamRecord, KinesisDataStreamRecordPayload },
179+
] = await Promise.all([
180+
import('@aws-lambda-powertools/parser/helpers'),
181+
import('@aws-lambda-powertools/parser/schemas/kinesis'),
182+
]);
183+
return KinesisDataStreamRecord.extend({
184+
kinesis: KinesisDataStreamRecordPayload.extend({
185+
data: Base64Encoded(schema as any),
186+
}),
187+
});
188+
}
189+
const { KinesisDataStreamRecord, KinesisDataStreamRecordPayload } =
190+
await import('@aws-lambda-powertools/parser/schemas/kinesis');
170191
return KinesisDataStreamRecord.extend({
171-
kinesis: KinesisDataStreamRecordPayload.extend({
172-
data: Base64Encoded(schema as any),
173-
}),
192+
kinesis: KinesisDataStreamRecordPayload.extend({ data: schema }),
174193
});
175194
}
195+
176196
case EventType.DynamoDBStreams: {
177-
const [
178-
{ DynamoDBMarshalled },
179-
{ DynamoDBStreamRecord, DynamoDBStreamChangeRecordBase },
180-
] = await Promise.all([
181-
import('@aws-lambda-powertools/parser/helpers/dynamodb'),
182-
import('@aws-lambda-powertools/parser/schemas/dynamodb'),
183-
]);
197+
if (useTransformers) {
198+
const [
199+
{ DynamoDBMarshalled },
200+
{ DynamoDBStreamRecord, DynamoDBStreamChangeRecordBase },
201+
] = await Promise.all([
202+
import('@aws-lambda-powertools/parser/helpers/dynamodb'),
203+
import('@aws-lambda-powertools/parser/schemas/dynamodb'),
204+
]);
205+
return DynamoDBStreamRecord.extend({
206+
dynamodb: DynamoDBStreamChangeRecordBase.extend({
207+
OldImage: DynamoDBMarshalled<StreamRecord['OldImage']>(
208+
schema as any
209+
).optional(),
210+
NewImage: DynamoDBMarshalled<StreamRecord['NewImage']>(
211+
schema as any
212+
).optional(),
213+
}),
214+
});
215+
}
216+
const { DynamoDBStreamRecord, DynamoDBStreamChangeRecordBase } =
217+
await import('@aws-lambda-powertools/parser/schemas/dynamodb');
184218
return DynamoDBStreamRecord.extend({
185219
dynamodb: DynamoDBStreamChangeRecordBase.extend({
186-
OldImage: DynamoDBMarshalled<StreamRecord['OldImage']>(
187-
schema as any
188-
).optional(),
189-
NewImage: DynamoDBMarshalled<StreamRecord['NewImage']>(
190-
schema as any
191-
).optional(),
220+
OldImage: (schema as any).optional(),
221+
NewImage: (schema as any).optional(),
192222
}),
193223
});
194224
}
225+
195226
default: {
196227
console.warn(
197228
`The event type provided is not supported. Supported events: ${Object.values(EventType).join(',')}`
@@ -207,9 +238,7 @@ class BatchProcessor extends BasePartialBatchProcessor {
207238
* If the passed schema is already an extended schema,
208239
* it directly uses the schema to parse the record
209240
*
210-
* If the passed schema is an internal payload schema,
211-
* it checks whether it is a zod schema and
212-
* then extends the zod schema according to the passed event type for parsing
241+
* Only Zod Schemas are supported for automatic schema extension
213242
*
214243
* @param record The record to be parsed
215244
* @param eventType The type of event to process
@@ -234,12 +263,30 @@ class BatchProcessor extends BasePartialBatchProcessor {
234263
throw new Error('Unsupported schema type');
235264
}
236265
// Handle schema extension based on event type
237-
const extendedSchema = await this.createExtendedSchema(eventType, schema);
238-
return parse(
239-
record,
240-
undefined,
241-
extendedSchema
242-
) as EventSourceDataClassTypes;
266+
try {
267+
// Try without transformers first, then with transformers
268+
const extendedSchemaWithoutTransformers = await this.createExtendedSchema(
269+
eventType,
270+
schema,
271+
false
272+
);
273+
return parse(
274+
record,
275+
undefined,
276+
extendedSchemaWithoutTransformers
277+
) as EventSourceDataClassTypes;
278+
} catch {
279+
const extendedSchemaWithTransformers = await this.createExtendedSchema(
280+
eventType,
281+
schema,
282+
true
283+
);
284+
return parse(
285+
record,
286+
undefined,
287+
extendedSchemaWithTransformers
288+
) as EventSourceDataClassTypes;
289+
}
243290
}
244291
}
245292

0 commit comments

Comments
 (0)