Skip to content

Commit b33832f

Browse files
committed
Integrated Parser for the Kinesis event type
1 parent 7120337 commit b33832f

File tree

1 file changed

+23
-0
lines changed

1 file changed

+23
-0
lines changed

packages/batch/src/BatchProcessor.ts

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,29 @@ class BatchProcessor extends BasePartialBatchProcessor {
181181
);
182182
throw new Error('Unsupported schema type');
183183
}
184+
if (eventType === EventType.KinesisDataStreams) {
185+
const extendedSchemaParsing = parse(record, undefined, schema, true);
186+
if (extendedSchemaParsing.success)
187+
return extendedSchemaParsing.data as KinesisStreamRecord;
188+
if (schema['~standard'].vendor === SchemaType.Zod) {
189+
const { Base64Encoded } = await import(
190+
'@aws-lambda-powertools/parser/helpers'
191+
);
192+
const { KinesisDataStreamRecord, KinesisDataStreamRecordPayload } =
193+
await import('@aws-lambda-powertools/parser/schemas/kinesis');
194+
const extendedSchema = KinesisDataStreamRecord.extend({
195+
kinesis: KinesisDataStreamRecordPayload.extend({
196+
// biome-ignore lint/suspicious/noExplicitAny: The vendor field in the schema is verified that the schema is a Zod schema
197+
data: Base64Encoded(schema as any),
198+
}),
199+
});
200+
return parse(record, undefined, extendedSchema);
201+
}
202+
console.warn(
203+
'The schema provided is not supported. Only Zod schemas are supported for extension.'
204+
);
205+
throw new Error('Unsupported schema type');
206+
}
184207
if (eventType === EventType.DynamoDBStreams) {
185208
const extendedSchemaParsing = parse(record, undefined, schema, true);
186209
if (extendedSchemaParsing.success)

0 commit comments

Comments
 (0)