1
- import type { StandardSchemaV1 } from '@standard-schema/spec' ;
2
- import type { StreamRecord } from 'aws-lambda' ;
3
1
import { BasePartialBatchProcessor } from './BasePartialBatchProcessor.js' ;
4
- import { EventType , SchemaVendor } from './constants.js' ;
5
2
import { BatchProcessingError } from './errors.js' ;
6
- import type {
7
- BasePartialBatchProcessorParserConfig ,
8
- BaseRecord ,
9
- EventSourceDataClassTypes ,
10
- FailureResponse ,
11
- SuccessResponse ,
12
- } from './types.js' ;
3
+ import type { BaseRecord , FailureResponse , SuccessResponse } from './types.js' ;
13
4
14
5
/**
15
6
* Process records in a batch asynchronously and handle partial failure cases.
@@ -109,7 +100,14 @@ class BatchProcessor extends BasePartialBatchProcessor {
109
100
record : BaseRecord
110
101
) : Promise < SuccessResponse | FailureResponse > {
111
102
try {
112
- const recordToProcess = await this . #parseRecord( record , this . eventType ) ;
103
+ const recordToProcess = this . parserConfig ?. parser
104
+ ? await this . parserConfig . parser (
105
+ record ,
106
+ this . eventType ,
107
+ this . logger ,
108
+ this . parserConfig
109
+ )
110
+ : record ;
113
111
const data = this . toBatchType ( recordToProcess , this . eventType ) ;
114
112
const result = await this . handler ( data , this . options ?. context ) ;
115
113
return this . successHandler ( record , result ) ;
@@ -130,158 +128,6 @@ class BatchProcessor extends BasePartialBatchProcessor {
130
128
'Not implemented. Use asyncProcess() instead.'
131
129
) ;
132
130
}
133
-
134
- /**
135
- * Extend the schema according to the event type passed.
136
- *
137
- * If useTransformers is true, extend using opinionated transformers.
138
- * Otherwise, extend without any transformers.
139
- *
140
- * @param eventType - The type of event to process (SQS, Kinesis, DynamoDB)
141
- * @param schema - The StandardSchema to be used for parsing
142
- * @param useTransformers - Whether to use transformers for parsing
143
- */
144
- async #createExtendedSchema( options : {
145
- eventType : keyof typeof EventType ;
146
- innerSchema : StandardSchemaV1 ;
147
- transformer ?: BasePartialBatchProcessorParserConfig [ 'transformer' ] ;
148
- } ) {
149
- const { eventType, innerSchema, transformer } = options ;
150
- let schema = innerSchema ;
151
- switch ( transformer ) {
152
- case 'json' : {
153
- const { JSONStringified } = await import (
154
- '@aws-lambda-powertools/parser/helpers'
155
- ) ;
156
- schema = JSONStringified ( innerSchema as any ) ;
157
- break ;
158
- }
159
- case 'base64' : {
160
- const { Base64Encoded } = await import (
161
- '@aws-lambda-powertools/parser/helpers'
162
- ) ;
163
- schema = Base64Encoded ( innerSchema as any ) ;
164
- break ;
165
- }
166
- case 'unmarshall' : {
167
- const { DynamoDBMarshalled } = await import (
168
- '@aws-lambda-powertools/parser/helpers/dynamodb'
169
- ) ;
170
- schema = DynamoDBMarshalled ( innerSchema as any ) ;
171
- break ;
172
- }
173
- }
174
- switch ( eventType ) {
175
- case EventType . SQS : {
176
- const { SqsRecordSchema } = await import (
177
- '@aws-lambda-powertools/parser/schemas/sqs'
178
- ) ;
179
- return SqsRecordSchema . extend ( {
180
- body : schema ,
181
- } ) ;
182
- }
183
- case EventType . KinesisDataStreams : {
184
- const { KinesisDataStreamRecord, KinesisDataStreamRecordPayload } =
185
- await import ( '@aws-lambda-powertools/parser/schemas/kinesis' ) ;
186
- return KinesisDataStreamRecord . extend ( {
187
- kinesis : KinesisDataStreamRecordPayload . extend ( {
188
- data : schema ,
189
- } ) ,
190
- } ) ;
191
- }
192
- case EventType . DynamoDBStreams : {
193
- const { DynamoDBStreamRecord, DynamoDBStreamChangeRecordBase } =
194
- await import ( '@aws-lambda-powertools/parser/schemas/dynamodb' ) ;
195
- return DynamoDBStreamRecord . extend ( {
196
- dynamodb : DynamoDBStreamChangeRecordBase . extend ( {
197
- OldImage : ( schema as any ) . optional ( ) ,
198
- NewImage : ( schema as any ) . optional ( ) ,
199
- } ) ,
200
- } ) ;
201
- }
202
- default : {
203
- console . warn (
204
- `The event type provided is not supported. Supported events: ${ Object . values ( EventType ) . join ( ',' ) } `
205
- ) ;
206
- throw new Error ( 'Unsupported event type' ) ;
207
- }
208
- }
209
- }
210
-
211
- /**
212
- * Parse the record with the passed schema and
213
- * return the result or throw the error depending on parsing success
214
- *
215
- * @param record - The record to be parsed
216
- * @param schema - The modified schema to parse with
217
- */
218
- async #parseWithErrorHandling(
219
- record : EventSourceDataClassTypes ,
220
- schema : StandardSchemaV1
221
- ) {
222
- const { parse } = await import ( '@aws-lambda-powertools/parser' ) ;
223
- const result = parse ( record , undefined , schema , true ) ;
224
- if ( result . success ) {
225
- return result . data as EventSourceDataClassTypes ;
226
- }
227
- const issues = result . error . cause as ReadonlyArray < StandardSchemaV1 . Issue > ;
228
- const errorMessage = issues
229
- . map ( ( issue ) => `${ issue ?. path ?. join ( '.' ) } : ${ issue . message } ` )
230
- . join ( '; ' ) ;
231
- this . logger . debug ( errorMessage ) ;
232
- throw new Error ( errorMessage ) ;
233
- }
234
-
235
- /**
236
- * Parse the record according to the schema and event type passed.
237
- *
238
- * If the passed schema is already an extended schema,
239
- * use the schema directly to parse the record.
240
- *
241
- * Only Zod Schemas are supported for schema extension.
242
- *
243
- * @param record - The record to be parsed
244
- * @param eventType - The type of event to process
245
- */
246
- async #parseRecord(
247
- record : EventSourceDataClassTypes ,
248
- eventType : keyof typeof EventType
249
- ) : Promise < EventSourceDataClassTypes > {
250
- if ( this . parserConfig == null ) {
251
- return record ;
252
- }
253
- const { schema, innerSchema, transformer } = this . parserConfig ;
254
- // If the external schema is specified, use it to parse the record
255
- if ( schema != null ) {
256
- return this . #parseWithErrorHandling( record , schema ) ;
257
- }
258
- if ( innerSchema != null ) {
259
- // Only proceed with schema extension if it's a Zod schema
260
- if ( innerSchema [ '~standard' ] . vendor !== SchemaVendor . Zod ) {
261
- this . logger . error (
262
- 'The schema provided is not supported. Only Zod schemas are supported for extension.'
263
- ) ;
264
- throw new Error ( 'Unsupported schema type' ) ;
265
- }
266
- if ( transformer != null ) {
267
- const schemaWithTransformers = await this . #createExtendedSchema( {
268
- eventType,
269
- innerSchema,
270
- transformer,
271
- } ) ;
272
- return this . #parseWithErrorHandling( record , schemaWithTransformers ) ;
273
- }
274
- const schemaWithoutTransformers = await this . #createExtendedSchema( {
275
- eventType,
276
- innerSchema,
277
- } ) ;
278
- return this . #parseWithErrorHandling( record , schemaWithoutTransformers ) ;
279
- }
280
- this . logger . error (
281
- 'The schema provided is not supported. Only Zod schemas are supported for extension.'
282
- ) ;
283
- throw new Error ( 'Either schema or innerSchema is required for parsing' ) ;
284
- }
285
131
}
286
132
287
133
export { BatchProcessor } ;
0 commit comments