Commit 0160de4

Refactored the code to simplify the API and improve error handling
1 parent 131d64b commit 0160de4

4 files changed, +258 -209 lines changed

packages/batch/src/BasePartialBatchProcessor.ts

Lines changed: 5 additions & 8 deletions
@@ -1,4 +1,3 @@
-import type { StandardSchemaV1 } from '@standard-schema/spec';
 import type {
   DynamoDBRecord,
   KinesisStreamRecord,
@@ -12,7 +11,7 @@ import {
 } from './constants.js';
 import { FullBatchFailureError } from './errors.js';
 import type {
-  BasePartialBatchProcessorConfig,
+  BasePartialBatchProcessorParserConfig,
   EventSourceDataClassTypes,
   PartialItemFailureResponse,
   PartialItemFailures,
@@ -45,9 +44,9 @@ abstract class BasePartialBatchProcessor extends BasePartialProcessor {
   public eventType: keyof typeof EventType;
 
   /**
-   * The schema of the body of the event record for parsing
+   * The configuration options for the parser integration
    */
-  protected schema?: StandardSchemaV1;
+  protected parserConfig?: BasePartialBatchProcessorParserConfig;
 
   /**
    * Initializes base batch processing class
@@ -56,7 +55,7 @@ abstract class BasePartialBatchProcessor extends BasePartialProcessor {
    */
   public constructor(
     eventType: keyof typeof EventType,
-    config?: BasePartialBatchProcessorConfig
+    parserConfig?: BasePartialBatchProcessorParserConfig
   ) {
     super();
     this.eventType = eventType;
@@ -66,9 +65,7 @@ abstract class BasePartialBatchProcessor extends BasePartialProcessor {
       [EventType.KinesisDataStreams]: () => this.collectKinesisFailures(),
       [EventType.DynamoDBStreams]: () => this.collectDynamoDBFailures(),
     };
-    if (config) {
-      this.schema = config.schema;
-    }
+    this.parserConfig = parserConfig;
   }
 
   /**

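For context, a minimal sketch of the new constructor usage, assuming BatchProcessor forwards its second argument to this base class; the orderSchema payload below is hypothetical and not part of this commit:

import { BatchProcessor, EventType } from '@aws-lambda-powertools/batch';
import { z } from 'zod';

// Hypothetical inner payload schema for the SQS message body
const orderSchema = z.object({ orderId: z.string(), amount: z.number() });

// The second constructor argument is now a parser config object
// instead of the old { schema } shape
const processor = new BatchProcessor(EventType.SQS, {
  innerSchema: orderSchema,
  transformer: 'json',
});
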
packages/batch/src/BatchProcessor.ts

Lines changed: 98 additions & 98 deletions
@@ -4,6 +4,7 @@ import { BasePartialBatchProcessor } from './BasePartialBatchProcessor.js';
 import { EventType, SchemaVendor } from './constants.js';
 import { BatchProcessingError } from './errors.js';
 import type {
+  BasePartialBatchProcessorParserConfig,
   BaseRecord,
   EventSourceDataClassTypes,
   FailureResponse,
@@ -108,13 +109,9 @@ class BatchProcessor extends BasePartialBatchProcessor {
     record: BaseRecord
   ): Promise<SuccessResponse | FailureResponse> {
     try {
-      const recordToProcess =
-        this.schema == null
-          ? record
-          : await this.#parseRecord(record, this.eventType, this.schema);
+      const recordToProcess = await this.#parseRecord(record, this.eventType);
       const data = this.toBatchType(recordToProcess, this.eventType);
       const result = await this.handler(data, this.options?.context);
-
       return this.successHandler(record, result);
     } catch (error) {
       return this.failureHandler(record, error as Error);
@@ -146,69 +143,53 @@
    */
   async #createExtendedSchema(options: {
     eventType: keyof typeof EventType;
-    schema: StandardSchemaV1;
-    useTransformers: boolean;
+    innerSchema: StandardSchemaV1;
+    transformer?: BasePartialBatchProcessorParserConfig['transformer'];
   }) {
-    const { eventType, schema, useTransformers } = options;
+    const { eventType, innerSchema, transformer } = options;
+    let schema = innerSchema;
+    switch (transformer) {
+      case 'json': {
+        const { JSONStringified } = await import(
+          '@aws-lambda-powertools/parser/helpers'
+        );
+        schema = JSONStringified(innerSchema as any);
+        break;
+      }
+      case 'base64': {
+        const { Base64Encoded } = await import(
+          '@aws-lambda-powertools/parser/helpers'
+        );
+        schema = Base64Encoded(innerSchema as any);
+        break;
+      }
+      case 'unmarshall': {
+        const { DynamoDBMarshalled } = await import(
+          '@aws-lambda-powertools/parser/helpers/dynamodb'
+        );
+        schema = DynamoDBMarshalled(innerSchema as any);
+        break;
+      }
+    }
     switch (eventType) {
       case EventType.SQS: {
-        if (useTransformers) {
-          const [{ JSONStringified }, { SqsRecordSchema }] = await Promise.all([
-            import('@aws-lambda-powertools/parser/helpers'),
-            import('@aws-lambda-powertools/parser/schemas/sqs'),
-          ]);
-          return SqsRecordSchema.extend({
-            body: JSONStringified(schema as any),
-          });
-        }
         const { SqsRecordSchema } = await import(
           '@aws-lambda-powertools/parser/schemas/sqs'
         );
-        return SqsRecordSchema.extend({ body: schema });
+        return SqsRecordSchema.extend({
+          body: schema,
+        });
       }
-
       case EventType.KinesisDataStreams: {
-        if (useTransformers) {
-          const [
-            { Base64Encoded },
-            { KinesisDataStreamRecord, KinesisDataStreamRecordPayload },
-          ] = await Promise.all([
-            import('@aws-lambda-powertools/parser/helpers'),
-            import('@aws-lambda-powertools/parser/schemas/kinesis'),
-          ]);
-          return KinesisDataStreamRecord.extend({
-            kinesis: KinesisDataStreamRecordPayload.extend({
-              data: Base64Encoded(schema as any),
-            }),
-          });
-        }
         const { KinesisDataStreamRecord, KinesisDataStreamRecordPayload } =
           await import('@aws-lambda-powertools/parser/schemas/kinesis');
         return KinesisDataStreamRecord.extend({
-          kinesis: KinesisDataStreamRecordPayload.extend({ data: schema }),
+          kinesis: KinesisDataStreamRecordPayload.extend({
+            data: schema,
+          }),
         });
       }
-
       case EventType.DynamoDBStreams: {
-        if (useTransformers) {
-          const [
-            { DynamoDBMarshalled },
-            { DynamoDBStreamRecord, DynamoDBStreamChangeRecordBase },
-          ] = await Promise.all([
-            import('@aws-lambda-powertools/parser/helpers/dynamodb'),
-            import('@aws-lambda-powertools/parser/schemas/dynamodb'),
-          ]);
-          return DynamoDBStreamRecord.extend({
-            dynamodb: DynamoDBStreamChangeRecordBase.extend({
-              OldImage: DynamoDBMarshalled<StreamRecord['OldImage']>(
-                schema as any
-              ).optional(),
-              NewImage: DynamoDBMarshalled<StreamRecord['NewImage']>(
-                schema as any
-              ).optional(),
-            }),
-          });
-        }
         const { DynamoDBStreamRecord, DynamoDBStreamChangeRecordBase } =
           await import('@aws-lambda-powertools/parser/schemas/dynamodb');
         return DynamoDBStreamRecord.extend({
@@ -218,7 +199,6 @@ class BatchProcessor extends BasePartialBatchProcessor {
          }),
        });
      }
-
      default: {
        console.warn(
          `The event type provided is not supported. Supported events: ${Object.values(EventType).join(',')}`
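The transformer switch above replaces the old useTransformers flag: each value wraps the inner Zod schema with the matching parser helper before the event schema is extended. A sketch of what the 'json' + SQS combination effectively produces; orderSchema is a hypothetical payload schema, not part of this commit:

import { JSONStringified } from '@aws-lambda-powertools/parser/helpers';
import { SqsRecordSchema } from '@aws-lambda-powertools/parser/schemas/sqs';
import { z } from 'zod';

const orderSchema = z.object({ orderId: z.string() });

// Equivalent of the 'json' branch followed by the SQS branch:
// wrap the inner schema, then splice it into the record schema
const extendedSqsSchema = SqsRecordSchema.extend({
  body: JSONStringified(orderSchema),
});
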
@@ -238,57 +218,77 @@ class BatchProcessor extends BasePartialBatchProcessor {
    *
    * @param record - The record to be parsed
    * @param eventType - The type of event to process
-   * @param schema - The StandardSchema to be used for parsing
    */
   async #parseRecord(
     record: EventSourceDataClassTypes,
-    eventType: keyof typeof EventType,
-    schema: StandardSchemaV1
+    eventType: keyof typeof EventType
   ): Promise<EventSourceDataClassTypes> {
-    const { parse } = await import('@aws-lambda-powertools/parser');
-    // Try parsing with the original schema first
-    const extendedSchemaParsing = parse(record, undefined, schema, true);
-    if (extendedSchemaParsing.success) {
-      return extendedSchemaParsing.data as EventSourceDataClassTypes;
+    if (this.parserConfig == null) {
+      return record;
     }
-    // Only proceed with schema extension if it's a Zod schema
-    if (schema['~standard'].vendor !== SchemaVendor.Zod) {
-      console.warn(
-        'The schema provided is not supported. Only Zod schemas are supported for extension.'
+    const { parse } = await import('@aws-lambda-powertools/parser');
+    const { schema, innerSchema, transformer } = this.parserConfig;
+    // If the external schema is specified, use it to parse the record
+    if (schema != null) {
+      const extendedSchemaParsing = parse(record, undefined, schema, true);
+      if (extendedSchemaParsing.success) {
+        return extendedSchemaParsing.data as EventSourceDataClassTypes;
+      }
+      const issues = extendedSchemaParsing.error
+        .cause as ReadonlyArray<StandardSchemaV1.Issue>;
+      throw new Error(
+        `Failed to parse record: ${issues.map((issue) => `${issue?.path?.join('.')}: ${issue.message}`).join('; ')} `
       );
-      throw new Error('Unsupported schema type');
-    }
-    // Handle schema extension based on event type
-    // Try without transformers first, then with transformers
-    const schemaWithoutTransformers = await this.#createExtendedSchema({
-      eventType,
-      schema,
-      useTransformers: false,
-    });
-    const schemaWithoutTransformersParsing = parse(
-      record,
-      undefined,
-      schemaWithoutTransformers,
-      true
-    );
-    if (schemaWithoutTransformersParsing.success) {
-      return schemaWithoutTransformersParsing.data as EventSourceDataClassTypes;
     }
-    const schemaWithTransformers = await this.#createExtendedSchema({
-      eventType,
-      schema,
-      useTransformers: true,
-    });
-    const schemaWithTransformersParsing = parse(
-      record,
-      undefined,
-      schemaWithTransformers,
-      true
-    );
-    if (schemaWithTransformersParsing.success) {
-      return schemaWithTransformersParsing.data as EventSourceDataClassTypes;
+    if (innerSchema != null) {
+      // Only proceed with schema extension if it's a Zod schema
+      if (innerSchema['~standard'].vendor !== SchemaVendor.Zod) {
+        console.warn(
+          'The schema provided is not supported. Only Zod schemas are supported for extension.'
+        );
+        throw new Error('Unsupported schema type');
+      }
+      if (transformer != null) {
+        const schemaWithTransformers = await this.#createExtendedSchema({
+          eventType,
+          innerSchema,
+          transformer,
+        });
+        const schemaWithTransformersParsing = parse(
+          record,
+          undefined,
+          schemaWithTransformers,
+          true
+        );
+        if (schemaWithTransformersParsing.success) {
+          return schemaWithTransformersParsing.data as EventSourceDataClassTypes;
+        }
+        const issues = schemaWithTransformersParsing.error
+          .cause as ReadonlyArray<StandardSchemaV1.Issue>;
+        throw new Error(
+          `Failed to parse record: ${issues.map((issue) => `${issue?.path?.join('.')}: ${issue.message}`).join('; ')} `
+        );
+      }
+      const schemaWithoutTransformers = await this.#createExtendedSchema({
+        eventType,
+        innerSchema,
+      });
+      const schemaWithoutTransformersParsing = parse(
+        record,
+        undefined,
+        schemaWithoutTransformers,
+        true
+      );
+      if (schemaWithoutTransformersParsing.success) {
+        return schemaWithoutTransformersParsing.data as EventSourceDataClassTypes;
+      }
+      const issues = schemaWithoutTransformersParsing.error
+        .cause as ReadonlyArray<StandardSchemaV1.Issue>;
+      throw new Error(
+        `Failed to parse record: ${issues.map((issue) => `${issue?.path?.join('.')}: ${issue.message}`).join('; ')} `
      );
     }
-    throw new Error('Failed to parse record');
+    throw new Error('Either schema or innerSchema is required for parsing');
   }
 }

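#parseRecord now derives everything from parserConfig: no config returns the record untouched, a full schema is parsed as-is, an innerSchema is extended (with the transformer when one is set), and a config with neither throws. A sketch of the full-schema path, under the same assumptions as the earlier example; the composed schema mirrors the SQS branch of the diff:

import { BatchProcessor, EventType } from '@aws-lambda-powertools/batch';
import { JSONStringified } from '@aws-lambda-powertools/parser/helpers';
import { SqsRecordSchema } from '@aws-lambda-powertools/parser/schemas/sqs';
import { z } from 'zod';

// Hypothetical caller-built, fully extended event schema; with
// `schema` set, #parseRecord uses it directly and skips extension
const fullEventSchema = SqsRecordSchema.extend({
  body: JSONStringified(z.object({ orderId: z.string() })),
});

const processor = new BatchProcessor(EventType.SQS, {
  schema: fullEventSchema,
});
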
packages/batch/src/types.ts

Lines changed: 28 additions & 9 deletions
@@ -5,7 +5,7 @@ import type {
   KinesisStreamRecord,
   SQSRecord,
 } from 'aws-lambda';
-
+import type { GenericLogger } from '../../commons/lib/esm/types/GenericLogger.js';
 import type { BasePartialBatchProcessor } from './BasePartialBatchProcessor.js';
 import type { SqsFifoPartialProcessor } from './SqsFifoPartialProcessor.js';
 import type { SqsFifoPartialProcessorAsync } from './SqsFifoPartialProcessorAsync.js';
@@ -92,18 +92,37 @@ type PartialItemFailures = { itemIdentifier: string };
 type PartialItemFailureResponse = { batchItemFailures: PartialItemFailures[] };
 
 /**
- * Type representing the configuration options passed to the BasePartialBatchProcessor class.
+ * Type representing the parser configuration options passed to the BasePartialBatchProcessor class.
  *
  * @property schema - The schema to be used for parsing
+ * @property innerSchema - The schema for the inner payload
+ * @property transformer - The transformer to be used for parsing the payload
+ * @property logger - The logger to be used for logging debug and warning messages.
  */
-type BasePartialBatchProcessorConfig = {
+type BasePartialBatchProcessorParserConfig = {
+  /**
+   * The schema for the full event including the extended inner payload schema.
+   *
+   * StandardSchema is supported.
+   */
+  schema?: StandardSchemaV1;
+  /**
+   * The schema for the inner payload of the event.
+   * Only Zod schemas are supported.
+   */
+  innerSchema?: StandardSchemaV1;
+  /**
+   * The transformer to be used for parsing the payload.
+   * Supported transformers are:
+   * 1. 'json': Uses JSONStringified helper
+   * 2. 'base64': Uses Base64Encoded helper
+   * 3. 'unmarshall': Uses DynamoDBMarshalled helper
+   */
+  transformer?: 'json' | 'base64' | 'unmarshall';
   /**
-   * The schema be either of the following:
-   * 1. An internal schema of the payload of the supported event types.
-   * 2. An internal schema along with helper transformer functions.
-   * 3. An extended schema of the supported event type.
+   * The logger to be used for logging debug and warning messages.
    */
-  schema: StandardSchemaV1;
+  logger?: Pick<GenericLogger, 'debug' | 'warn' | 'error'>;
 };
 
 export type {
@@ -114,5 +133,5 @@ export type {
   FailureResponse,
   PartialItemFailures,
   PartialItemFailureResponse,
-  BasePartialBatchProcessorConfig,
+  BasePartialBatchProcessorParserConfig,
 };

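A hedged illustration of the remaining config shape from the type above, DynamoDB Streams with the 'unmarshall' transformer; the item schema is hypothetical, and note that this diff adds the logger field to the type without showing where it is consumed:

import { BatchProcessor, EventType } from '@aws-lambda-powertools/batch';
import { z } from 'zod';

// Hypothetical schema for the unmarshalled NewImage/OldImage item
const itemSchema = z.object({ pk: z.string(), sk: z.string() });

const processor = new BatchProcessor(EventType.DynamoDBStreams, {
  innerSchema: itemSchema,
  transformer: 'unmarshall', // maps to the DynamoDBMarshalled helper
  logger: console, // assumed to satisfy Pick<GenericLogger, 'debug' | 'warn' | 'error'>
});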