Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 23 additions & 8 deletions packages/batch/src/BasePartialBatchProcessor.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import type { StandardSchemaV1 } from '@standard-schema/spec';
import { getStringFromEnv } from '@aws-lambda-powertools/commons/utils/env';
import type {
DynamoDBRecord,
KinesisStreamRecord,
SQSRecord,
} from 'aws-lambda';
import type { GenericLogger } from '../../commons/lib/esm/types/GenericLogger.js';
import { BasePartialProcessor } from './BasePartialProcessor.js';
import {
DATA_CLASS_MAPPING,
Expand All @@ -12,7 +13,7 @@ import {
} from './constants.js';
import { FullBatchFailureError } from './errors.js';
import type {
BasePartialBatchProcessorConfig,
BasePartialBatchProcessorParserConfig,
EventSourceDataClassTypes,
PartialItemFailureResponse,
PartialItemFailures,
Expand Down Expand Up @@ -45,9 +46,16 @@ abstract class BasePartialBatchProcessor extends BasePartialProcessor {
public eventType: keyof typeof EventType;

/**
* The schema of the body of the event record for parsing
* A logger instance to be used for logging debug, warning, and error messages.
*
* When no logger is provided, we'll only log warnings and errors using the global `console` object.
*/
protected readonly logger: Pick<GenericLogger, 'debug' | 'warn' | 'error'>;

/**
* The configuration options for the parser integration
*/
protected schema?: StandardSchemaV1;
protected parserConfig?: BasePartialBatchProcessorParserConfig;

/**
* Initializes base batch processing class
Expand All @@ -56,7 +64,7 @@ abstract class BasePartialBatchProcessor extends BasePartialProcessor {
*/
public constructor(
eventType: keyof typeof EventType,
config?: BasePartialBatchProcessorConfig
parserConfig?: BasePartialBatchProcessorParserConfig
) {
super();
this.eventType = eventType;
Expand All @@ -66,9 +74,16 @@ abstract class BasePartialBatchProcessor extends BasePartialProcessor {
[EventType.KinesisDataStreams]: () => this.collectKinesisFailures(),
[EventType.DynamoDBStreams]: () => this.collectDynamoDBFailures(),
};
if (config) {
this.schema = config.schema;
}
this.parserConfig = parserConfig;
const alcLogLevel = getStringFromEnv({
key: 'AWS_LAMBDA_LOG_LEVEL',
defaultValue: '',
});
this.logger = parserConfig?.logger ?? {
debug: alcLogLevel === 'DEBUG' ? console.debug : () => undefined,
error: console.error,
warn: console.warn,
};
}

/**
Expand Down
188 changes: 90 additions & 98 deletions packages/batch/src/BatchProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { BasePartialBatchProcessor } from './BasePartialBatchProcessor.js';
import { EventType, SchemaVendor } from './constants.js';
import { BatchProcessingError } from './errors.js';
import type {
BasePartialBatchProcessorParserConfig,
BaseRecord,
EventSourceDataClassTypes,
FailureResponse,
Expand Down Expand Up @@ -108,13 +109,9 @@ class BatchProcessor extends BasePartialBatchProcessor {
record: BaseRecord
): Promise<SuccessResponse | FailureResponse> {
try {
const recordToProcess =
this.schema == null
? record
: await this.#parseRecord(record, this.eventType, this.schema);
const recordToProcess = await this.#parseRecord(record, this.eventType);
const data = this.toBatchType(recordToProcess, this.eventType);
const result = await this.handler(data, this.options?.context);

return this.successHandler(record, result);
} catch (error) {
return this.failureHandler(record, error as Error);
Expand Down Expand Up @@ -146,69 +143,53 @@ class BatchProcessor extends BasePartialBatchProcessor {
*/
async #createExtendedSchema(options: {
eventType: keyof typeof EventType;
schema: StandardSchemaV1;
useTransformers: boolean;
innerSchema: StandardSchemaV1;
transformer?: BasePartialBatchProcessorParserConfig['transformer'];
}) {
const { eventType, schema, useTransformers } = options;
const { eventType, innerSchema, transformer } = options;
let schema = innerSchema;
switch (transformer) {
case 'json': {
const { JSONStringified } = await import(
'@aws-lambda-powertools/parser/helpers'
);
schema = JSONStringified(innerSchema as any);
break;
}
case 'base64': {
const { Base64Encoded } = await import(
'@aws-lambda-powertools/parser/helpers'
);
schema = Base64Encoded(innerSchema as any);
break;
}
case 'unmarshall': {
const { DynamoDBMarshalled } = await import(
'@aws-lambda-powertools/parser/helpers/dynamodb'
);
schema = DynamoDBMarshalled(innerSchema as any);
break;
}
}
switch (eventType) {
case EventType.SQS: {
if (useTransformers) {
const [{ JSONStringified }, { SqsRecordSchema }] = await Promise.all([
import('@aws-lambda-powertools/parser/helpers'),
import('@aws-lambda-powertools/parser/schemas/sqs'),
]);
return SqsRecordSchema.extend({
body: JSONStringified(schema as any),
});
}
const { SqsRecordSchema } = await import(
'@aws-lambda-powertools/parser/schemas/sqs'
);
return SqsRecordSchema.extend({ body: schema });
return SqsRecordSchema.extend({
body: schema,
});
}

case EventType.KinesisDataStreams: {
if (useTransformers) {
const [
{ Base64Encoded },
{ KinesisDataStreamRecord, KinesisDataStreamRecordPayload },
] = await Promise.all([
import('@aws-lambda-powertools/parser/helpers'),
import('@aws-lambda-powertools/parser/schemas/kinesis'),
]);
return KinesisDataStreamRecord.extend({
kinesis: KinesisDataStreamRecordPayload.extend({
data: Base64Encoded(schema as any),
}),
});
}
const { KinesisDataStreamRecord, KinesisDataStreamRecordPayload } =
await import('@aws-lambda-powertools/parser/schemas/kinesis');
return KinesisDataStreamRecord.extend({
kinesis: KinesisDataStreamRecordPayload.extend({ data: schema }),
kinesis: KinesisDataStreamRecordPayload.extend({
data: schema,
}),
});
}

case EventType.DynamoDBStreams: {
if (useTransformers) {
const [
{ DynamoDBMarshalled },
{ DynamoDBStreamRecord, DynamoDBStreamChangeRecordBase },
] = await Promise.all([
import('@aws-lambda-powertools/parser/helpers/dynamodb'),
import('@aws-lambda-powertools/parser/schemas/dynamodb'),
]);
return DynamoDBStreamRecord.extend({
dynamodb: DynamoDBStreamChangeRecordBase.extend({
OldImage: DynamoDBMarshalled<StreamRecord['OldImage']>(
schema as any
).optional(),
NewImage: DynamoDBMarshalled<StreamRecord['NewImage']>(
schema as any
).optional(),
}),
});
}
const { DynamoDBStreamRecord, DynamoDBStreamChangeRecordBase } =
await import('@aws-lambda-powertools/parser/schemas/dynamodb');
return DynamoDBStreamRecord.extend({
Expand All @@ -218,7 +199,6 @@ class BatchProcessor extends BasePartialBatchProcessor {
}),
});
}

default: {
console.warn(
`The event type provided is not supported. Supported events: ${Object.values(EventType).join(',')}`
Expand All @@ -228,6 +208,30 @@ class BatchProcessor extends BasePartialBatchProcessor {
}
}

/**
* Parse the record with the passed schema and
* return the result or throw the error depending on parsing success
*
* @param record - The record to be parsed
* @param schema - The modified schema to parse with
*/
async #parseWithErrorHandling(
record: EventSourceDataClassTypes,
schema: StandardSchemaV1
) {
const { parse } = await import('@aws-lambda-powertools/parser');
const result = parse(record, undefined, schema, true);
if (result.success) {
return result.data as EventSourceDataClassTypes;
}
const issues = result.error.cause as ReadonlyArray<StandardSchemaV1.Issue>;
const errorMessage = issues
.map((issue) => `${issue?.path?.join('.')}: ${issue.message}`)
.join('; ');
this.logger.debug(errorMessage);
throw new Error(errorMessage);
}

/**
* Parse the record according to the schema and event type passed.
*
Expand All @@ -238,57 +242,45 @@ class BatchProcessor extends BasePartialBatchProcessor {
*
* @param record - The record to be parsed
* @param eventType - The type of event to process
* @param schema - The StandardSchema to be used for parsing
*/
async #parseRecord(
record: EventSourceDataClassTypes,
eventType: keyof typeof EventType,
schema: StandardSchemaV1
eventType: keyof typeof EventType
): Promise<EventSourceDataClassTypes> {
const { parse } = await import('@aws-lambda-powertools/parser');
// Try parsing with the original schema first
const extendedSchemaParsing = parse(record, undefined, schema, true);
if (extendedSchemaParsing.success) {
return extendedSchemaParsing.data as EventSourceDataClassTypes;
if (this.parserConfig == null) {
return record;
}
// Only proceed with schema extension if it's a Zod schema
if (schema['~standard'].vendor !== SchemaVendor.Zod) {
console.warn(
'The schema provided is not supported. Only Zod schemas are supported for extension.'
);
throw new Error('Unsupported schema type');
const { schema, innerSchema, transformer } = this.parserConfig;
// If the external schema is specified, use it to parse the record
if (schema != null) {
return this.#parseWithErrorHandling(record, schema);
}
// Handle schema extension based on event type
// Try without transformers first, then with transformers
const schemaWithoutTransformers = await this.#createExtendedSchema({
eventType,
schema,
useTransformers: false,
});
const schemaWithoutTransformersParsing = parse(
record,
undefined,
schemaWithoutTransformers,
true
);
if (schemaWithoutTransformersParsing.success) {
return schemaWithoutTransformersParsing.data as EventSourceDataClassTypes;
if (innerSchema != null) {
// Only proceed with schema extension if it's a Zod schema
if (innerSchema['~standard'].vendor !== SchemaVendor.Zod) {
this.logger.error(
'The schema provided is not supported. Only Zod schemas are supported for extension.'
);
throw new Error('Unsupported schema type');
}
if (transformer != null) {
const schemaWithTransformers = await this.#createExtendedSchema({
eventType,
innerSchema,
transformer,
});
return this.#parseWithErrorHandling(record, schemaWithTransformers);
}
const schemaWithoutTransformers = await this.#createExtendedSchema({
eventType,
innerSchema,
});
return this.#parseWithErrorHandling(record, schemaWithoutTransformers);
}
const schemaWithTransformers = await this.#createExtendedSchema({
eventType,
schema,
useTransformers: true,
});
const schemaWithTransformersParsing = parse(
record,
undefined,
schemaWithTransformers,
true
this.logger.error(
'The schema provided is not supported. Only Zod schemas are supported for extension.'
);
if (schemaWithTransformersParsing.success) {
return schemaWithTransformersParsing.data as EventSourceDataClassTypes;
}
throw new Error('Failed to parse record');
throw new Error('Either schema or innerSchema is required for parsing');
}
}

Expand Down
40 changes: 30 additions & 10 deletions packages/batch/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import type {
KinesisStreamRecord,
SQSRecord,
} from 'aws-lambda';

import type { GenericLogger } from '../../commons/lib/esm/types/GenericLogger.js';
import type { BasePartialBatchProcessor } from './BasePartialBatchProcessor.js';
import type { SqsFifoPartialProcessor } from './SqsFifoPartialProcessor.js';
import type { SqsFifoPartialProcessorAsync } from './SqsFifoPartialProcessorAsync.js';
Expand Down Expand Up @@ -92,18 +92,38 @@ type PartialItemFailures = { itemIdentifier: string };
type PartialItemFailureResponse = { batchItemFailures: PartialItemFailures[] };

/**
* Type representing the configuration options passed to the BasePartialBatchProcessor class.
* Type representing the parser configuration options passed to the BasePartialBatchProcessor class.
*
* @property schema - The schema to be used for parsing
* @property schema - The full event schema to be used for parsing
* @property innerSchema - The inner payload schema
* @property transformer - The transformer to be used for parsing the payload
* @property logger - The logger to be used for logging debug and warning messages.
*/
type BasePartialBatchProcessorConfig = {
type BasePartialBatchProcessorParserConfig = {
/**
* The schema for the full event including the extended inner payload schema.
*
* StandardSchema is supported.
*/
schema?: StandardSchemaV1;
/**
* The schema for the inner payload of the event.
* Only Zod schemas are supported.
*/
innerSchema?: StandardSchemaV1;
/**
* The transformer to be used for parsing the payload.
* No transformers will be used if this is not provided.
* Supported transformers are:
* 1. 'json': Uses JSONStringified helper
* 2. 'base64': Uses Base64Encoded helper
* 3. 'unmarshall': Uses DynamoDBMarshalled helper
*/
transformer?: 'json' | 'base64' | 'unmarshall';
/**
* The schema be either of the following:
* 1. An internal schema of the payload of the supported event types.
* 2. An internal schema along with helper transformer functions.
* 3. An extended schema of the supported event type.
* The logger to be used for logging debug and warning messages.
*/
schema: StandardSchemaV1;
logger?: Pick<GenericLogger, 'debug' | 'warn' | 'error'>;
};

export type {
Expand All @@ -114,5 +134,5 @@ export type {
FailureResponse,
PartialItemFailures,
PartialItemFailureResponse,
BasePartialBatchProcessorConfig,
BasePartialBatchProcessorParserConfig,
};
Loading
Loading