10 changes: 9 additions & 1 deletion packages/batch/package.json
@@ -38,13 +38,21 @@
"default": "./lib/esm/index.js"
}
},
"./parser": {
"import": "./lib/esm/parser.js",
"require": "./lib/cjs/parser.js"
},
"./types": {
"import": "./lib/esm/types.js",
"require": "./lib/cjs/types.js"
}
},
"typesVersions": {
"*": {
"parser": [
"lib/cjs/parser.d.ts",
"lib/esm/parser.d.ts"
],
"types": [
"lib/cjs/types.d.ts",
"lib/esm/types.d.ts"
@@ -76,4 +84,4 @@
"@aws-lambda-powertools/testing-utils": "file:../testing",
"@aws-lambda-powertools/parser": "2.25.2"
}
}
}
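
The new "./parser" subpath (together with the matching "typesVersions" entries) lets consumers resolve the parser module through the package's exports map in both ESM and CJS builds. A minimal consumer-side sketch; a namespace import is used here because the symbols exported by parser.js are not visible in this diff:

// ESM: resolves to lib/esm/parser.js via the "import" condition,
// with types served from lib/esm/parser.d.ts through "typesVersions".
import * as batchParser from '@aws-lambda-powertools/batch/parser';

// CJS: the "require" condition resolves to lib/cjs/parser.js instead.
// const batchParser = require('@aws-lambda-powertools/batch/parser');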
31 changes: 23 additions & 8 deletions packages/batch/src/BasePartialBatchProcessor.ts
@@ -1,9 +1,10 @@
import type { StandardSchemaV1 } from '@standard-schema/spec';
import { getStringFromEnv } from '@aws-lambda-powertools/commons/utils/env';
import type {
DynamoDBRecord,
KinesisStreamRecord,
SQSRecord,
} from 'aws-lambda';
import type { GenericLogger } from '../../commons/lib/esm/types/GenericLogger.js';
import { BasePartialProcessor } from './BasePartialProcessor.js';
import {
DATA_CLASS_MAPPING,
@@ -12,7 +13,7 @@ import {
} from './constants.js';
import { FullBatchFailureError } from './errors.js';
import type {
BasePartialBatchProcessorConfig,
BasePartialBatchProcessorParserConfig,
EventSourceDataClassTypes,
PartialItemFailureResponse,
PartialItemFailures,
@@ -45,9 +46,16 @@ abstract class BasePartialBatchProcessor extends BasePartialProcessor {
public eventType: keyof typeof EventType;

/**
* The schema of the body of the event record for parsing
* A logger instance to be used for logging debug, warning, and error messages.
*
* When no logger is provided, we'll only log warnings and errors using the global `console` object.
*/
protected readonly logger: Pick<GenericLogger, 'debug' | 'warn' | 'error'>;

/**
* The configuration options for the parser integration
*/
protected schema?: StandardSchemaV1;
protected parserConfig?: BasePartialBatchProcessorParserConfig;

/**
* Initializes base batch processing class
@@ -56,7 +64,7 @@ abstract class BasePartialBatchProcessor extends BasePartialProcessor {
*/
public constructor(
eventType: keyof typeof EventType,
config?: BasePartialBatchProcessorConfig
parserConfig?: BasePartialBatchProcessorParserConfig
) {
super();
this.eventType = eventType;
@@ -66,9 +74,16 @@ abstract class BasePartialBatchProcessor extends BasePartialProcessor {
[EventType.KinesisDataStreams]: () => this.collectKinesisFailures(),
[EventType.DynamoDBStreams]: () => this.collectDynamoDBFailures(),
};
if (config) {
this.schema = config.schema;
}
this.parserConfig = parserConfig;
const alcLogLevel = getStringFromEnv({
key: 'AWS_LAMBDA_LOG_LEVEL',
defaultValue: '',
});
this.logger = parserConfig?.logger ?? {
debug: alcLogLevel === 'DEBUG' ? console.debug : () => undefined,
error: console.error,
warn: console.warn,
};
}

/**
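With this constructor change the processor always has a usable logger: a caller-supplied parserConfig.logger takes precedence, otherwise a console-backed fallback is built whose debug method is a no-op unless AWS_LAMBDA_LOG_LEVEL is set to DEBUG, while warnings and errors always surface. A standalone sketch of that fallback, assuming a minimal logger shape in place of the imported GenericLogger type:

type MinimalLogger = {
  debug: (...args: unknown[]) => void;
  warn: (...args: unknown[]) => void;
  error: (...args: unknown[]) => void;
};

// Mirrors the fallback built in the constructor: debug is silenced unless
// the Lambda Advanced Logging Controls level is DEBUG.
const buildFallbackLogger = (provided?: MinimalLogger): MinimalLogger => {
  const alcLogLevel = process.env.AWS_LAMBDA_LOG_LEVEL ?? '';
  return (
    provided ?? {
      debug: alcLogLevel === 'DEBUG' ? console.debug : () => undefined,
      error: console.error,
      warn: console.warn,
    }
  );
};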
180 changes: 9 additions & 171 deletions packages/batch/src/BatchProcessor.ts
@@ -1,14 +1,6 @@
import type { StandardSchemaV1 } from '@standard-schema/spec';
import type { StreamRecord } from 'aws-lambda';
import { BasePartialBatchProcessor } from './BasePartialBatchProcessor.js';
import { EventType, SchemaVendor } from './constants.js';
import { BatchProcessingError } from './errors.js';
import type {
BaseRecord,
EventSourceDataClassTypes,
FailureResponse,
SuccessResponse,
} from './types.js';
import type { BaseRecord, FailureResponse, SuccessResponse } from './types.js';

/**
* Process records in a batch asynchronously and handle partial failure cases.
@@ -108,13 +100,16 @@ class BatchProcessor extends BasePartialBatchProcessor {
record: BaseRecord
): Promise<SuccessResponse | FailureResponse> {
try {
const recordToProcess =
this.schema == null
? record
: await this.#parseRecord(record, this.eventType, this.schema);
const recordToProcess = this.parserConfig?.parser
? await this.parserConfig.parser(
record,
this.eventType,
this.logger,
this.parserConfig
)
: record;
const data = this.toBatchType(recordToProcess, this.eventType);
const result = await this.handler(data, this.options?.context);

return this.successHandler(record, result);
} catch (error) {
return this.failureHandler(record, error as Error);
@@ -133,163 +128,6 @@ class BatchProcessor extends BasePartialBatchProcessor {
'Not implemented. Use asyncProcess() instead.'
);
}

/**
* Extend the schema according to the event type passed.
*
* If useTransformers is true, extend using opinionated transformers.
* Otherwise, extend without any transformers.
*
* @param eventType - The type of event to process (SQS, Kinesis, DynamoDB)
* @param schema - The StandardSchema to be used for parsing
* @param useTransformers - Whether to use transformers for parsing
*/
async #createExtendedSchema(options: {
eventType: keyof typeof EventType;
schema: StandardSchemaV1;
useTransformers: boolean;
}) {
const { eventType, schema, useTransformers } = options;
switch (eventType) {
case EventType.SQS: {
if (useTransformers) {
const [{ JSONStringified }, { SqsRecordSchema }] = await Promise.all([
import('@aws-lambda-powertools/parser/helpers'),
import('@aws-lambda-powertools/parser/schemas/sqs'),
]);
return SqsRecordSchema.extend({
body: JSONStringified(schema as any),
});
}
const { SqsRecordSchema } = await import(
'@aws-lambda-powertools/parser/schemas/sqs'
);
return SqsRecordSchema.extend({ body: schema });
}

case EventType.KinesisDataStreams: {
if (useTransformers) {
const [
{ Base64Encoded },
{ KinesisDataStreamRecord, KinesisDataStreamRecordPayload },
] = await Promise.all([
import('@aws-lambda-powertools/parser/helpers'),
import('@aws-lambda-powertools/parser/schemas/kinesis'),
]);
return KinesisDataStreamRecord.extend({
kinesis: KinesisDataStreamRecordPayload.extend({
data: Base64Encoded(schema as any),
}),
});
}
const { KinesisDataStreamRecord, KinesisDataStreamRecordPayload } =
await import('@aws-lambda-powertools/parser/schemas/kinesis');
return KinesisDataStreamRecord.extend({
kinesis: KinesisDataStreamRecordPayload.extend({ data: schema }),
});
}

case EventType.DynamoDBStreams: {
if (useTransformers) {
const [
{ DynamoDBMarshalled },
{ DynamoDBStreamRecord, DynamoDBStreamChangeRecordBase },
] = await Promise.all([
import('@aws-lambda-powertools/parser/helpers/dynamodb'),
import('@aws-lambda-powertools/parser/schemas/dynamodb'),
]);
return DynamoDBStreamRecord.extend({
dynamodb: DynamoDBStreamChangeRecordBase.extend({
OldImage: DynamoDBMarshalled<StreamRecord['OldImage']>(
schema as any
).optional(),
NewImage: DynamoDBMarshalled<StreamRecord['NewImage']>(
schema as any
).optional(),
}),
});
}
const { DynamoDBStreamRecord, DynamoDBStreamChangeRecordBase } =
await import('@aws-lambda-powertools/parser/schemas/dynamodb');
return DynamoDBStreamRecord.extend({
dynamodb: DynamoDBStreamChangeRecordBase.extend({
OldImage: (schema as any).optional(),
NewImage: (schema as any).optional(),
}),
});
}

default: {
console.warn(
`The event type provided is not supported. Supported events: ${Object.values(EventType).join(',')}`
);
throw new Error('Unsupported event type');
}
}
}

/**
* Parse the record according to the schema and event type passed.
*
* If the passed schema is already an extended schema,
* use the schema directly to parse the record.
*
* Only Zod Schemas are supported for schema extension.
*
* @param record - The record to be parsed
* @param eventType - The type of event to process
* @param schema - The StandardSchema to be used for parsing
*/
async #parseRecord(
record: EventSourceDataClassTypes,
eventType: keyof typeof EventType,
schema: StandardSchemaV1
): Promise<EventSourceDataClassTypes> {
const { parse } = await import('@aws-lambda-powertools/parser');
// Try parsing with the original schema first
const extendedSchemaParsing = parse(record, undefined, schema, true);
if (extendedSchemaParsing.success) {
return extendedSchemaParsing.data as EventSourceDataClassTypes;
}
// Only proceed with schema extension if it's a Zod schema
if (schema['~standard'].vendor !== SchemaVendor.Zod) {
console.warn(
'The schema provided is not supported. Only Zod schemas are supported for extension.'
);
throw new Error('Unsupported schema type');
}
// Handle schema extension based on event type
// Try without transformers first, then with transformers
const schemaWithoutTransformers = await this.#createExtendedSchema({
eventType,
schema,
useTransformers: false,
});
const schemaWithoutTransformersParsing = parse(
record,
undefined,
schemaWithoutTransformers,
true
);
if (schemaWithoutTransformersParsing.success) {
return schemaWithoutTransformersParsing.data as EventSourceDataClassTypes;
}
const schemaWithTransformers = await this.#createExtendedSchema({
eventType,
schema,
useTransformers: true,
});
const schemaWithTransformersParsing = parse(
record,
undefined,
schemaWithTransformers,
true
);
if (schemaWithTransformersParsing.success) {
return schemaWithTransformersParsing.data as EventSourceDataClassTypes;
}
throw new Error('Failed to parse record');
}
}

export { BatchProcessor };
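
asyncProcessRecord now hands parsing off to the injected parserConfig.parser callable instead of the removed #parseRecord/#createExtendedSchema logic. The callable's exact type lives in BasePartialBatchProcessorParserConfig (not shown in this diff), but the call site above implies roughly the following shape; the signature, the config object, and the JSON-validation body are illustrative assumptions, not the shipped parser:

import { BatchProcessor, EventType, ParsingError } from '@aws-lambda-powertools/batch';
import type { SQSRecord } from 'aws-lambda';

// Shape inferred from the call site:
// parser(record, eventType, logger, parserConfig) -> parsed record (may be async).
const parseSqsBody = async (
  record: unknown,
  eventType: keyof typeof EventType,
  logger: { debug: (...args: unknown[]) => void },
  _config: unknown
) => {
  logger.debug(`parsing ${eventType} record`);
  const sqsRecord = record as SQSRecord;
  try {
    // Hypothetical check: make sure the body is valid JSON before the record
    // handler runs; a real parser would validate against a schema here.
    JSON.parse(sqsRecord.body);
    return sqsRecord;
  } catch {
    throw new ParsingError('Failed to parse SQS record body');
  }
};

// Assumes the parser config accepts a bare `parser` entry; other fields of
// BasePartialBatchProcessorParserConfig are not shown in this diff.
const processor = new BatchProcessor(EventType.SQS, { parser: parseSqsBody });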
11 changes: 11 additions & 0 deletions packages/batch/src/errors.ts
@@ -63,10 +63,21 @@ class UnexpectedBatchTypeError extends BatchProcessingError {
}
}

/**
* Error thrown by the Batch Processing utility when a record fails to be parsed.
*/
class ParsingError extends BatchProcessingError {
public constructor(message: string) {
super(message);
this.name = 'ParsingError';
}
}

export {
BatchProcessingError,
FullBatchFailureError,
SqsFifoShortCircuitError,
SqsFifoMessageGroupShortCircuitError,
UnexpectedBatchTypeError,
ParsingError,
};
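
The new ParsingError lets callers distinguish parse/validation failures from ordinary handler errors, for example in tests or custom failure handling. A small illustrative check, with the throwing code standing in for whatever surfaces the error:

import { ParsingError } from '@aws-lambda-powertools/batch';

try {
  // Stand-in for code that re-throws a parsing failure from the processor.
  throw new ParsingError('Failed to parse record');
} catch (error) {
  if (error instanceof ParsingError) {
    // The record payload did not match the expected schema.
    console.error(error.name, error.message);
  }
}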
1 change: 1 addition & 0 deletions packages/batch/src/index.ts
@@ -5,6 +5,7 @@ export { EventType } from './constants.js';
export {
BatchProcessingError,
FullBatchFailureError,
ParsingError,
SqsFifoMessageGroupShortCircuitError,
SqsFifoShortCircuitError,
UnexpectedBatchTypeError,