140 changes: 91 additions & 49 deletions packages/kafka/src/consumer.ts
@@ -2,14 +2,19 @@ import type { AsyncHandler } from '@aws-lambda-powertools/commons/types';
import { isNull, isRecord } from '@aws-lambda-powertools/commons/typeutils';
import type { StandardSchemaV1 } from '@standard-schema/spec';
import type { Context, Handler } from 'aws-lambda';
import { deserialize as deserializeJson } from './deserializer/json.js';
import { deserialize as deserializePrimitive } from './deserializer/primitive.js';
import {
KafkaConsumerAvroMissingSchemaError,
KafkaConsumerDeserializationError,
KafkaConsumerError,
KafkaConsumerParserError,
KafkaConsumerProtobufMissingSchemaError,
} from './errors.js';
import type {
ConsumerRecord,
ConsumerRecords,
Deserializer,
Record as KafkaRecord,
MSKEvent,
SchemaConfig,
@@ -27,7 +32,7 @@ const assertIsMSKEvent = (event: unknown): event is MSKEvent => {
!isRecord(event.records) ||
!Object.values(event.records).every((arr) => Array.isArray(arr))
) {
throw new Error(
throw new KafkaConsumerError(
'Event is not a valid MSKEvent. Expected an object with a "records" property.'
);
}
@@ -69,69 +74,80 @@ const deserializeHeaders = (headers: Record<string, number[]>[] | null) => {
* @param config - The schema configuration to use for deserialization. See {@link SchemaConfigValue | `SchemaConfigValue`}.
* If not provided, the value is decoded as a UTF-8 string.
*/
const deserialize = async (value: string, config?: SchemaConfigValue) => {
// no config -> default to base64 decoding
const deserialize = (
value: string,
deserializer: Deserializer,
config?: SchemaConfigValue
) => {
if (config === undefined) {
return Buffer.from(value, 'base64').toString();
return deserializer(value);
}

// if config is provided, we expect it to have a specific type
if (!['json', 'avro', 'protobuf'].includes(config.type)) {
throw new Error(
`Unsupported deserialization type: ${config.type}. Supported types are: json, avro, protobuf.`
);
}

if (config.type === 'json') {
const deserializer = await import('./deserializer/json.js');
return deserializer.deserialize(value);
return deserializer(value);
}

if (config.type === 'avro') {
if (!config.schema) {
throw new KafkaConsumerAvroMissingSchemaError(
'Schema string is required for Avro deserialization'
'Schema string is required for avro deserialization'
);
}
const deserializer = await import('./deserializer/avro.js');
return deserializer.deserialize(value, config.schema);
return deserializer(value, config.schema);
}
if (config.type === 'protobuf') {
if (!config.schema) {
throw new KafkaConsumerProtobufMissingSchemaError(
'Schema string is required for Protobuf deserialization'
'Schema string is required for protobuf deserialization'
);
}
const deserializer = await import('./deserializer/protobuf.js');
return deserializer.deserialize(value, config.schema);
return deserializer(value, config.schema);
}
};

/**
* Deserialize the key of a Kafka record.
* Get the deserializer function based on the provided type.
*
* If the key is `undefined`, it returns `undefined`.
*
* @param key - The base64-encoded key to deserialize.
* @param config - The schema configuration for deserializing the key. See {@link SchemaConfigValue | `SchemaConfigValue`}.
* @param type - The type of deserializer to use. Supported types are: `json`, `avro`, `protobuf`, or `undefined`.
* If `undefined`, it defaults to deserializing as a primitive string.
*/
const deserializeKey = async (key?: string, config?: SchemaConfigValue) => {
if (key === undefined || key === '') {
return undefined;
const getDeserializer = async (type?: string) => {
if (!type) {
return deserializePrimitive as Deserializer;
}
if (type === 'json') {
return deserializeJson as Deserializer;
}
if (type === 'protobuf') {
const deserializer = await import('./deserializer/protobuf.js');
return deserializer.deserialize as Deserializer;
}
if (type === 'avro') {
const deserializer = await import('./deserializer/avro.js');
return deserializer.deserialize as Deserializer;
}
if (isNull(key)) return null;
return await deserialize(key, config);
throw new KafkaConsumerDeserializationError(
`Unsupported deserialization type: ${type}. Supported types are: json, avro, protobuf.`
);
};

const parseSchema = async (value: unknown, schema: StandardSchemaV1) => {
let result = schema['~standard'].validate(value);
/**
* Parse a value against a provided schema using the `~standard` property for validation.
*
* @param value - The value to parse against the schema.
* @param schema - The schema to validate against, which should be a {@link StandardSchemaV1 | `Standard Schema V1`} object.
*/
const parseSchema = (value: unknown, schema: StandardSchemaV1) => {
const result = schema['~standard'].validate(value);
/* v8 ignore start */
if (result instanceof Promise) result = await result;
/* v8 ignore stop */
if (result.issues) {
if (result instanceof Promise)
throw new KafkaConsumerParserError(
`Schema validation failed ${result.issues}`
'Schema parsing supports only synchronous validation'
);
/* v8 ignore stop */
if (result.issues) {
throw new KafkaConsumerParserError('Schema validation failed', {
cause: result.issues,
});
}
return result.value;
};
@@ -142,24 +158,45 @@ const parseSchema = async (value: unknown, schema: StandardSchemaV1) => {
* @param record - A single record from the MSK event.
* @param config - The schema configuration for deserializing the record's key and value.
*/
const deserializeRecord = async (record: KafkaRecord, config: SchemaConfig) => {
const deserializeRecord = async (
record: KafkaRecord,
config?: SchemaConfig
) => {
const { key, value, headers, ...rest } = record;
const { key: keyConfig, value: valueConfig } = config;
const { key: keyConfig, value: valueConfig } = config || {};

const deserializedKey = await deserializeKey(key, keyConfig);
const deserializedValue = await deserialize(value, valueConfig);
const deserializerKey = await getDeserializer(keyConfig?.type);
const deserializerValue = await getDeserializer(valueConfig?.type);

return {
...rest,
key: keyConfig?.parserSchema
? await parseSchema(deserializedKey, keyConfig.parserSchema)
: deserializedKey,
value: valueConfig?.parserSchema
? await parseSchema(deserializedValue, valueConfig.parserSchema)
: deserializedValue,
get key() {
if (key === undefined || key === '') {
return undefined;
}
if (isNull(key)) return null;
const deserializedKey = deserialize(key, deserializerKey, keyConfig);

return keyConfig?.parserSchema
? parseSchema(deserializedKey, keyConfig.parserSchema)
: deserializedKey;
},
originalKey: key,
get value() {
const deserializedValue = deserialize(
value,
deserializerValue,
valueConfig
);

return valueConfig?.parserSchema
? parseSchema(deserializedValue, valueConfig.parserSchema)
: deserializedValue;
},
originalValue: value,
headers: deserializeHeaders(headers),
get headers() {
return deserializeHeaders(headers);
},
originalHeaders: headers,
};
};
Expand Down Expand Up @@ -202,15 +239,20 @@ const deserializeRecord = async (record: KafkaRecord, config: SchemaConfig) => {
*/
const kafkaConsumer = <K, V>(
handler: AsyncHandler<Handler<ConsumerRecords<K, V>>>,
config: SchemaConfig
config?: SchemaConfig
): ((event: MSKEvent, context: Context) => Promise<unknown>) => {
return async (event: MSKEvent, context: Context): Promise<unknown> => {
assertIsMSKEvent(event);

const consumerRecords: ConsumerRecord<K, V>[] = [];
for (const recordsArray of Object.values(event.records)) {
for (const record of recordsArray) {
consumerRecords.push(await deserializeRecord(record, config));
consumerRecords.push(
(await deserializeRecord(
record,
config
)) as unknown as ConsumerRecord<K, V>
);
}
}

4 changes: 3 additions & 1 deletion packages/kafka/src/deserializer/avro.ts
@@ -7,7 +7,7 @@ import { KafkaConsumerDeserializationError } from '../errors.js';
* @param data - The base64-encoded string representing the Avro binary data.
* @param schema - The Avro schema as a JSON string.
*/
export const deserialize = async (data: string, schema: string) => {
const deserialize = (data: string, schema: string) => {
try {
const type = avro.parse(schema);
const buffer = Buffer.from(data, 'base64');
@@ -18,3 +18,5 @@ export const deserialize = async (data: string, schema: string) => {
);
}
};

export { deserialize };
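A rough illustration of the now-synchronous Avro path, assuming the `avsc` package that provides `avro.parse`; the `Order` schema and its value are hypothetical.

import avro from 'avsc';
import { deserialize } from './avro.js';

// Hypothetical Avro schema, passed as a JSON string exactly as the consumer does.
const schema = JSON.stringify({
  type: 'record',
  name: 'Order',
  fields: [{ name: 'id', type: 'string' }],
});

// Build a base64 payload the way a producer would, then round-trip it.
const encoded = avro.parse(schema).toBuffer({ id: '123' }).toString('base64');
deserialize(encoded, schema); // -> { id: '123' }; throws KafkaConsumerDeserializationError on malformed data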
16 changes: 10 additions & 6 deletions packages/kafka/src/deserializer/json.ts
@@ -1,19 +1,23 @@
import { deserialize as deserializePrimitive } from './primitive.js';

/**
* Deserializes a base64 encoded string into either a JSON object or plain string
* Deserialize a base64 encoded string into either a JSON object or plain string
*
* @param data - The base64 encoded string to deserialize
* @returns The deserialized data as either a JSON object or string
*/
export const deserialize = async (data: string) => {
// Decode the base64 string to a buffer
const decoded = Buffer.from(data, 'base64');
const deserialize = (data: string) => {
const plainText = deserializePrimitive(data);
try {
// Attempt to parse the decoded data as JSON
// we assume it's a JSON but it can also be a string, we don't know
return JSON.parse(decoded.toString());
return JSON.parse(plainText);
} catch (error) {
// If JSON parsing fails, log the error and return the decoded string
// in case we could not parse it we return the base64 decoded value
console.error(`Failed to parse JSON from base64 value: ${data}`, error);
return decoded.toString();
return plainText;
}
};

export { deserialize };
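A quick sketch of the fallback behaviour described in the comments above; the inputs and expected results are illustrative.

import { deserialize } from './json.js';

const asJson = Buffer.from('{"id":1}', 'utf-8').toString('base64');
const asText = Buffer.from('plain text', 'utf-8').toString('base64');

deserialize(asJson); // -> { id: 1 }
deserialize(asText); // -> 'plain text' (JSON.parse fails, so the decoded string is returned)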
16 changes: 16 additions & 0 deletions packages/kafka/src/deserializer/primitive.ts
@@ -0,0 +1,16 @@
import { fromBase64 } from '@aws-lambda-powertools/commons/utils/base64';

const decoder = new TextDecoder('utf-8');

/**
* Deserialize a base64-encoded primitive value (string).
*
* When customers don't provide a schema configuration, we assume the value is a base64-encoded string.
*
* @param data - The base64-encoded string to deserialize.
*/
const deserialize = (data: string) => {
return decoder.decode(fromBase64(data, 'base64'));
};

export { deserialize };
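A minimal sketch of the default path taken when no schema config is provided.

import { deserialize } from './primitive.js';

// MSK delivers keys and values base64-encoded; without a schema config they
// are decoded straight to UTF-8 text.
const encoded = Buffer.from('hello kafka', 'utf-8').toString('base64');
deserialize(encoded); // -> 'hello kafka'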
18 changes: 8 additions & 10 deletions packages/kafka/src/deserializer/protobuf.ts
@@ -1,20 +1,16 @@
import type { Message } from 'protobufjs';
import { KafkaConsumerDeserializationError } from '../errors.js';
import type { ProtobufMessage } from '../types/types.js';

/**
* Deserialises a Protobuf message from a base64-encoded string.
* Deserialize a Protobuf message from a base64-encoded string.
*
* @template T - The type of the deserialized message object.
*
* @template T - The type of the deserialised message object.
* @param MessageClass - The Protobuf message type definition.
* See {@link MessageType} from '@protobuf-ts/runtime'.
* @param data - The base64-encoded string representing the Protobuf binary data.
* @returns The deserialised message object of type T.
* @throws {KafkaConsumerDeserializationError} If deserialization fails.
* @param messageType - The Protobuf message type definition - see {@link Message | `Message`} from {@link https://www.npmjs.com/package/protobufjs | `protobufjs`}.
*/
export const deserialize = <T>(
data: string,
messageType: ProtobufMessage<T>
): T => {
const deserialize = <T>(data: string, messageType: ProtobufMessage<T>): T => {
try {
const buffer = Buffer.from(data, 'base64');
return messageType.decode(buffer, buffer.length);
@@ -24,3 +20,5 @@ export const deserialize = <T>(
);
}
};

export { deserialize };
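A hedged sketch of the Protobuf path: `Greeting` stands in for a class generated by protobufjs (anything exposing a static `decode`, which is what `ProtobufMessage<T>` models), and its import path is hypothetical.

import { deserialize } from './protobuf.js';
// Hypothetical class generated with protobufjs (pbjs/pbts static target).
import { Greeting } from './generated/greeting.js';

// Produce a base64 payload the way a producer would, then round-trip it.
const encoded = Buffer.from(Greeting.encode({ message: 'hello' }).finish()).toString('base64');
deserialize(encoded, Greeting); // -> { message: 'hello' }; throws KafkaConsumerDeserializationError on malformed input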
23 changes: 13 additions & 10 deletions packages/kafka/src/errors.ts
@@ -3,8 +3,8 @@
* All Kafka consumer errors should extend this class.
*/
class KafkaConsumerError extends Error {
constructor(message: string) {
super(message);
constructor(message: string, options?: ErrorOptions) {
super(message, options);
this.name = 'KafkaConsumerError';
}
}
@@ -13,8 +13,8 @@ class KafkaConsumerError {
* Error thrown when a required Protobuf schema is missing during Kafka message consumption.
*/
class KafkaConsumerProtobufMissingSchemaError extends KafkaConsumerError {
constructor(message: string) {
super(message);
constructor(message: string, options?: ErrorOptions) {
super(message, options);
this.name = 'KafkaConsumerProtobufMissingSchemaError';
}
}
@@ -23,8 +23,8 @@ class KafkaConsumerProtobufMissingSchemaError extends KafkaConsumerError {
* Error thrown when deserialization of a Kafka message fails.
*/
class KafkaConsumerDeserializationError extends KafkaConsumerError {
constructor(message: string) {
super(message);
constructor(message: string, options?: ErrorOptions) {
super(message, options);
this.name = 'KafkaConsumerDeserializationError';
}
}
@@ -33,15 +33,18 @@ class KafkaConsumerDeserializationError extends KafkaConsumerError {
* Error thrown when a required Avro schema is missing during Kafka message consumption.
*/
class KafkaConsumerAvroMissingSchemaError extends KafkaConsumerError {
constructor(message: string) {
super(message);
constructor(message: string, options?: ErrorOptions) {
super(message, options);
this.name = 'KafkaConsumerAvroMissingSchemaError';
}
}

/**
* Error thrown when parsing a Kafka message fails.
*/
class KafkaConsumerParserError extends KafkaConsumerError {
constructor(message: string) {
super(message);
constructor(message: string, options?: ErrorOptions) {
super(message, options);
this.name = 'KafkaConsumerParserError';
}
}
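With `ErrorOptions` now accepted, callers can surface the standard-schema issues attached as `cause` — a sketch, assuming the error classes are exported from this module.

import { KafkaConsumerParserError } from './errors.js';

try {
  // ... invoke the wrapped handler or access a record's lazily parsed value
} catch (error) {
  if (error instanceof KafkaConsumerParserError) {
    // `cause` carries the standard-schema validation issues set in the consumer.
    console.error('Schema validation failed', error.cause);
  }
}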