diff --git a/packages/kafka/src/consumer.ts b/packages/kafka/src/consumer.ts index 6f73b54351..6099f116d6 100644 --- a/packages/kafka/src/consumer.ts +++ b/packages/kafka/src/consumer.ts @@ -2,14 +2,19 @@ import type { AsyncHandler } from '@aws-lambda-powertools/commons/types'; import { isNull, isRecord } from '@aws-lambda-powertools/commons/typeutils'; import type { StandardSchemaV1 } from '@standard-schema/spec'; import type { Context, Handler } from 'aws-lambda'; +import { deserialize as deserializeJson } from './deserializer/json.js'; +import { deserialize as deserializePrimitive } from './deserializer/primitive.js'; import { KafkaConsumerAvroMissingSchemaError, + KafkaConsumerDeserializationError, + KafkaConsumerError, KafkaConsumerParserError, KafkaConsumerProtobufMissingSchemaError, } from './errors.js'; import type { ConsumerRecord, ConsumerRecords, + Deserializer, Record as KafkaRecord, MSKEvent, SchemaConfig, @@ -27,7 +32,7 @@ const assertIsMSKEvent = (event: unknown): event is MSKEvent => { !isRecord(event.records) || !Object.values(event.records).every((arr) => Array.isArray(arr)) ) { - throw new Error( + throw new KafkaConsumerError( 'Event is not a valid MSKEvent. Expected an object with a "records" property.' ); } @@ -69,69 +74,80 @@ const deserializeHeaders = (headers: Record[] | null) => { * @param config - The schema configuration to use for deserialization. See {@link SchemaConfigValue | `SchemaConfigValue`}. * If not provided, the value is decoded as a UTF-8 string. */ -const deserialize = async (value: string, config?: SchemaConfigValue) => { - // no config -> default to base64 decoding +const deserialize = ( + value: string, + deserializer: Deserializer, + config?: SchemaConfigValue +) => { if (config === undefined) { - return Buffer.from(value, 'base64').toString(); + return deserializer(value); } - - // if config is provided, we expect it to have a specific type - if (!['json', 'avro', 'protobuf'].includes(config.type)) { - throw new Error( - `Unsupported deserialization type: ${config.type}. Supported types are: json, avro, protobuf.` - ); - } - if (config.type === 'json') { - const deserializer = await import('./deserializer/json.js'); - return deserializer.deserialize(value); + return deserializer(value); } if (config.type === 'avro') { if (!config.schema) { throw new KafkaConsumerAvroMissingSchemaError( - 'Schema string is required for Avro deserialization' + 'Schema string is required for avro deserialization' ); } - const deserializer = await import('./deserializer/avro.js'); - return deserializer.deserialize(value, config.schema); + return deserializer(value, config.schema); } if (config.type === 'protobuf') { if (!config.schema) { throw new KafkaConsumerProtobufMissingSchemaError( - 'Schema string is required for Protobuf deserialization' + 'Schema string is required for protobuf deserialization' ); } - const deserializer = await import('./deserializer/protobuf.js'); - return deserializer.deserialize(value, config.schema); + return deserializer(value, config.schema); } }; /** - * Deserialize the key of a Kafka record. + * Get the deserializer function based on the provided type. * - * If the key is `undefined`, it returns `undefined`. - * - * @param key - The base64-encoded key to deserialize. - * @param config - The schema configuration for deserializing the key. See {@link SchemaConfigValue | `SchemaConfigValue`}. + * @param type - The type of deserializer to use. Supported types are: `json`, `avro`, `protobuf`, or `undefined`. 
+ * If `undefined`, it defaults to deserializing as a primitive string. */ -const deserializeKey = async (key?: string, config?: SchemaConfigValue) => { - if (key === undefined || key === '') { - return undefined; +const getDeserializer = async (type?: string) => { + if (!type) { + return deserializePrimitive as Deserializer; + } + if (type === 'json') { + return deserializeJson as Deserializer; + } + if (type === 'protobuf') { + const deserializer = await import('./deserializer/protobuf.js'); + return deserializer.deserialize as Deserializer; + } + if (type === 'avro') { + const deserializer = await import('./deserializer/avro.js'); + return deserializer.deserialize as Deserializer; } - if (isNull(key)) return null; - return await deserialize(key, config); + throw new KafkaConsumerDeserializationError( + `Unsupported deserialization type: ${type}. Supported types are: json, avro, protobuf.` + ); }; -const parseSchema = async (value: unknown, schema: StandardSchemaV1) => { - let result = schema['~standard'].validate(value); +/** + * Parse a value against a provided schema using the `~standard` property for validation. + * + * @param value - The value to parse against the schema. + * @param schema - The schema to validate against, which should be a {@link StandardSchemaV1 | `Standard Schema V1`} object. + */ +const parseSchema = (value: unknown, schema: StandardSchemaV1) => { + const result = schema['~standard'].validate(value); /* v8 ignore start */ - if (result instanceof Promise) result = await result; - /* v8 ignore stop */ - if (result.issues) { + if (result instanceof Promise) throw new KafkaConsumerParserError( - `Schema validation failed ${result.issues}` + 'Schema parsing supports only synchronous validation' ); + /* v8 ignore stop */ + if (result.issues) { + throw new KafkaConsumerParserError('Schema validation failed', { + cause: result.issues, + }); } return result.value; }; @@ -142,24 +158,45 @@ const parseSchema = async (value: unknown, schema: StandardSchemaV1) => { * @param record - A single record from the MSK event. * @param config - The schema configuration for deserializing the record's key and value. */ -const deserializeRecord = async (record: KafkaRecord, config: SchemaConfig) => { +const deserializeRecord = async ( + record: KafkaRecord, + config?: SchemaConfig +) => { const { key, value, headers, ...rest } = record; - const { key: keyConfig, value: valueConfig } = config; + const { key: keyConfig, value: valueConfig } = config || {}; - const deserializedKey = await deserializeKey(key, keyConfig); - const deserializedValue = await deserialize(value, valueConfig); + const deserializerKey = await getDeserializer(keyConfig?.type); + const deserializerValue = await getDeserializer(valueConfig?.type); return { ...rest, - key: keyConfig?.parserSchema - ? await parseSchema(deserializedKey, keyConfig.parserSchema) - : deserializedKey, - value: valueConfig?.parserSchema - ? await parseSchema(deserializedValue, valueConfig.parserSchema) - : deserializedValue, + get key() { + if (key === undefined || key === '') { + return undefined; + } + if (isNull(key)) return null; + const deserializedKey = deserialize(key, deserializerKey, keyConfig); + + return keyConfig?.parserSchema + ? parseSchema(deserializedKey, keyConfig.parserSchema) + : deserializedKey; + }, originalKey: key, + get value() { + const deserializedValue = deserialize( + value, + deserializerValue, + valueConfig + ); + + return valueConfig?.parserSchema + ? 
parseSchema(deserializedValue, valueConfig.parserSchema) + : deserializedValue; + }, originalValue: value, - headers: deserializeHeaders(headers), + get headers() { + return deserializeHeaders(headers); + }, originalHeaders: headers, }; }; @@ -202,7 +239,7 @@ const deserializeRecord = async (record: KafkaRecord, config: SchemaConfig) => { */ const kafkaConsumer = ( handler: AsyncHandler>>, - config: SchemaConfig + config?: SchemaConfig ): ((event: MSKEvent, context: Context) => Promise) => { return async (event: MSKEvent, context: Context): Promise => { assertIsMSKEvent(event); @@ -210,7 +247,12 @@ const kafkaConsumer = ( const consumerRecords: ConsumerRecord[] = []; for (const recordsArray of Object.values(event.records)) { for (const record of recordsArray) { - consumerRecords.push(await deserializeRecord(record, config)); + consumerRecords.push( + (await deserializeRecord( + record, + config + )) as unknown as ConsumerRecord + ); } } diff --git a/packages/kafka/src/deserializer/avro.ts b/packages/kafka/src/deserializer/avro.ts index 91d8721464..19d2888035 100644 --- a/packages/kafka/src/deserializer/avro.ts +++ b/packages/kafka/src/deserializer/avro.ts @@ -7,7 +7,7 @@ import { KafkaConsumerDeserializationError } from '../errors.js'; * @param data - The base64-encoded string representing the Avro binary data. * @param schema - The Avro schema as a JSON string. */ -export const deserialize = async (data: string, schema: string) => { +const deserialize = (data: string, schema: string) => { try { const type = avro.parse(schema); const buffer = Buffer.from(data, 'base64'); @@ -18,3 +18,5 @@ export const deserialize = async (data: string, schema: string) => { ); } }; + +export { deserialize }; diff --git a/packages/kafka/src/deserializer/json.ts b/packages/kafka/src/deserializer/json.ts index 30e937bff1..4b716101f5 100644 --- a/packages/kafka/src/deserializer/json.ts +++ b/packages/kafka/src/deserializer/json.ts @@ -1,19 +1,23 @@ +import { deserialize as deserializePrimitive } from './primitive.js'; + /** - * Deserializes a base64 encoded string into either a JSON object or plain string + * Deserialize a base64 encoded string into either a JSON object or plain string + * * @param data - The base64 encoded string to deserialize * @returns The deserialized data as either a JSON object or string */ -export const deserialize = async (data: string) => { - // Decode the base64 string to a buffer - const decoded = Buffer.from(data, 'base64'); +const deserialize = (data: string) => { + const plainText = deserializePrimitive(data); try { // Attempt to parse the decoded data as JSON // we assume it's a JSON but it can also be a string, we don't know - return JSON.parse(decoded.toString()); + return JSON.parse(plainText); } catch (error) { // If JSON parsing fails, log the error and return the decoded string // in case we could not parse it we return the base64 decoded value console.error(`Failed to parse JSON from base64 value: ${data}`, error); - return decoded.toString(); + return plainText; } }; + +export { deserialize }; diff --git a/packages/kafka/src/deserializer/primitive.ts b/packages/kafka/src/deserializer/primitive.ts new file mode 100644 index 0000000000..27dc286bb6 --- /dev/null +++ b/packages/kafka/src/deserializer/primitive.ts @@ -0,0 +1,16 @@ +import { fromBase64 } from '@aws-lambda-powertools/commons/utils/base64'; + +const decoder = new TextDecoder('utf-8'); + +/** + * Deserialize a base64-encoded primitive value (string). 
+ * + * When customers don't provide a schema configuration, we assume the value is a base64-encoded string. + * + * @param data - The base64-encoded string to deserialize. + */ +const deserialize = (data: string) => { + return decoder.decode(fromBase64(data, 'base64')); +}; + +export { deserialize }; diff --git a/packages/kafka/src/deserializer/protobuf.ts b/packages/kafka/src/deserializer/protobuf.ts index f6c323e5f8..e4f107e4d9 100644 --- a/packages/kafka/src/deserializer/protobuf.ts +++ b/packages/kafka/src/deserializer/protobuf.ts @@ -1,20 +1,16 @@ +import type { Message } from 'protobufjs'; import { KafkaConsumerDeserializationError } from '../errors.js'; import type { ProtobufMessage } from '../types/types.js'; /** - * Deserialises a Protobuf message from a base64-encoded string. + * Deserialize a Protobuf message from a base64-encoded string. + * + * @template T - The type of the deserialized message object. * - * @template T - The type of the deserialised message object. - * @param MessageClass - The Protobuf message type definition. - * See {@link MessageType} from '@protobuf-ts/runtime'. * @param data - The base64-encoded string representing the Protobuf binary data. - * @returns The deserialised message object of type T. - * @throws {KafkaConsumerDeserializationError} If deserialization fails. + * @param messageType - The Protobuf message type definition - see {@link Message | `Message`} from {@link https://www.npmjs.com/package/protobufjs | `protobufjs`}. */ -export const deserialize = ( - data: string, - messageType: ProtobufMessage -): T => { +const deserialize = (data: string, messageType: ProtobufMessage): T => { try { const buffer = Buffer.from(data, 'base64'); return messageType.decode(buffer, buffer.length); @@ -24,3 +20,5 @@ export const deserialize = ( ); } }; + +export { deserialize }; diff --git a/packages/kafka/src/errors.ts b/packages/kafka/src/errors.ts index 5eac573f17..dffea255a5 100644 --- a/packages/kafka/src/errors.ts +++ b/packages/kafka/src/errors.ts @@ -3,8 +3,8 @@ * All Kafka consumer errors should extend this class. */ class KafkaConsumerError extends Error { - constructor(message: string) { - super(message); + constructor(message: string, options?: ErrorOptions) { + super(message, options); this.name = 'KafkaConsumerError'; } } @@ -13,8 +13,8 @@ class KafkaConsumerError extends Error { * Error thrown when a required Protobuf schema is missing during Kafka message consumption. */ class KafkaConsumerProtobufMissingSchemaError extends KafkaConsumerError { - constructor(message: string) { - super(message); + constructor(message: string, options?: ErrorOptions) { + super(message, options); this.name = 'KafkaConsumerProtobufMissingSchemaError'; } } @@ -23,8 +23,8 @@ class KafkaConsumerProtobufMissingSchemaError extends KafkaConsumerError { * Error thrown when deserialization of a Kafka message fails. */ class KafkaConsumerDeserializationError extends KafkaConsumerError { - constructor(message: string) { - super(message); + constructor(message: string, options?: ErrorOptions) { + super(message, options); this.name = 'KafkaConsumerDeserializationError'; } } @@ -33,15 +33,18 @@ class KafkaConsumerDeserializationError extends KafkaConsumerError { * Error thrown when a required Avro schema is missing during Kafka message consumption. 
*/ class KafkaConsumerAvroMissingSchemaError extends KafkaConsumerError { - constructor(message: string) { - super(message); + constructor(message: string, options?: ErrorOptions) { + super(message, options); this.name = 'KafkaConsumerAvroMissingSchemaError'; } } +/** + * Error thrown when parsing a Kafka message fails. + */ class KafkaConsumerParserError extends KafkaConsumerError { - constructor(message: string) { - super(message); + constructor(message: string, options?: ErrorOptions) { + super(message, options); this.name = 'KafkaConsumerParserError'; } } diff --git a/packages/kafka/src/types/types.ts b/packages/kafka/src/types/types.ts index 8ea00b8cf6..645ae9d418 100644 --- a/packages/kafka/src/types/types.ts +++ b/packages/kafka/src/types/types.ts @@ -9,27 +9,47 @@ type ConsumerRecord = { /** * The deserialized key of the record */ - key: K | undefined; + key: K; /** * The deserialized value of the record */ value: V; /** - * The original (raw, encoded) key as received from Kafka, or undefined if not present + * The original (raw, encoded) key as received from Kafka, or `undefined` if not present */ - originalKey: string | undefined; + originalKey?: string; /** - * The original (raw, encoded) value as received from Kafka, or undefined if not present + * The original (raw, encoded) value as received from Kafka */ originalValue: string; /** - * Optional array of headers as key-value string pairs, or null/undefined if not present + * Optional array of headers as key-value string pairs, or `null/`undefined` if not present */ headers?: { [k: string]: string }[] | null; /** * Optional array of original record headers */ originalHeaders?: RecordHeader[] | null; + /** + * The topic from which the record was consumed + */ + topic: string; + /** + * The partition from which the record was consumed + */ + partition: number; + /** + * The offset of the record within the partition + */ + offset: number; + /** + * The timestamp of the record + */ + timestamp: number; + /** + * The type of timestamp (CREATE_TIME or LOG_APPEND_TIME) + */ + timestampType: 'CREATE_TIME' | 'LOG_APPEND_TIME'; }; /** @@ -61,7 +81,7 @@ type JsonConfig = { */ type: typeof SchemaTypeMap.JSON; /** - * Optional Zod schema for runtime validation + * Optional {@link https://github.com/standard-schema/standard-schema | Standard Schema} for runtime validation */ parserSchema?: StandardSchemaV1; }; @@ -79,10 +99,11 @@ type AvroConfig = { */ schema: string; /** - * Optional Zod schema for runtime validation + * Optional {@link https://github.com/standard-schema/standard-schema | Standard Schema} for runtime validation */ parserSchema?: StandardSchemaV1; }; + /** * Configuration for Protobuf schema validation. */ @@ -96,7 +117,7 @@ type ProtobufConfig = { */ schema: ProtobufMessage; /** - * Optional Zod schema for runtime validation + * Optional {@link https://github.com/standard-schema/standard-schema | Standard Schema} for runtime validation */ parserSchema?: StandardSchemaV1; }; @@ -165,10 +186,9 @@ interface Record { headers: RecordHeader[]; } -// https://docs.aws.amazon.com/lambda/latest/dg/with-msk.html /** * AWS Lambda event structure for MSK (Managed Streaming for Kafka). 
- * See: https://docs.aws.amazon.com/lambda/latest/dg/with-msk.html + * @see {@link https://docs.aws.amazon.com/lambda/latest/dg/with-msk.html | AWS Lambda with MSK} */ interface MSKEvent { /** @@ -195,12 +215,7 @@ interface ProtobufMessage { decode(reader: Reader | Uint8Array, length?: number): T; } -interface Deserializer { - deserialize( - input: string, - schema: string | ProtobufMessage - ): unknown; -} +type Deserializer = (input: string, schema?: unknown) => unknown; export type { ConsumerRecord, diff --git a/packages/kafka/tests/unit/consumer.test.ts b/packages/kafka/tests/unit/consumer.test.ts index 04b6caf201..9884b7a531 100644 --- a/packages/kafka/tests/unit/consumer.test.ts +++ b/packages/kafka/tests/unit/consumer.test.ts @@ -5,7 +5,6 @@ import { describe, expect, it } from 'vitest'; import { z } from 'zod'; import { KafkaConsumerAvroMissingSchemaError, - KafkaConsumerParserError, KafkaConsumerProtobufMissingSchemaError, } from '../../src/errors.js'; import { SchemaType, kafkaConsumer } from '../../src/index.js'; @@ -25,7 +24,6 @@ describe('Kafka consumer', () => { type Key = z.infer; type Product = z.infer; - type SerializationType = 'json' | 'avro' | 'protobuf'; const jsonTestEvent = JSON.parse( readFileSync(join(__dirname, '..', 'events', 'default.json'), 'utf-8') @@ -166,30 +164,51 @@ describe('Kafka consumer', () => { it.each([ { - type: 'avro' as Extract, - event: avroTestEvent, - error: KafkaConsumerAvroMissingSchemaError, - }, - { - type: 'protobuf' as Extract, + type: SchemaType.PROTOBUF, event: protobufTestEvent, error: KafkaConsumerProtobufMissingSchemaError, }, + { + type: SchemaType.AVRO, + event: avroTestEvent, + error: KafkaConsumerAvroMissingSchemaError, + }, ])( 'throws when schemaStr not passed for $type event', async ({ type, error, event }) => { // Prepare - const handler = kafkaConsumer(baseHandler, { - // @ts-expect-error - testing missing schemaStr - value: { type }, - }); + const handler = kafkaConsumer( + async (event) => { + for (const record of event.records) { + try { + return record.value; + } catch (error) { + return error; + } + } + }, + { + // @ts-expect-error - testing missing schemaStr + value: { type }, + } + ); - // Act & Assess - await expect(handler(event, context)).rejects.toThrow(error); + // Act + const result = await handler(event, context); + + // Assess + expect(result).toEqual( + expect.objectContaining({ + message: expect.stringContaining( + `Schema string is required for ${type} deserialization` + ), + name: error.name, + }) + ); } ); - it('throws if schema type is not json, avro or protobuf', async () => { + it('throws if using an unsupported schema type', async () => { // Prepare const handler = kafkaConsumer(baseHandler, { value: { @@ -199,7 +218,14 @@ describe('Kafka consumer', () => { }); // Act & Assess - await expect(handler(jsonTestEvent, context)).rejects.toThrow(); + await expect(handler(jsonTestEvent, context)).rejects.toEqual( + expect.objectContaining({ + message: expect.stringContaining( + 'Unsupported deserialization type: xml. Supported types are: json, avro, protobuf.' 
+ ), + name: 'KafkaConsumerDeserializationError', + }) + ); }); it('deserializes with no headers provided', async () => { @@ -265,24 +291,50 @@ describe('Kafka consumer', () => { }, } as unknown as MSKEvent, }, - ])('throws when zod schema validation fails for $type', async ({ event }) => { - // Prepare - const handler = kafkaConsumer(baseHandler, { - value: { - type: SchemaType.JSON, - parserSchema: valueZodSchema, - }, - key: { - type: SchemaType.JSON, - parserSchema: keyZodSchema, - }, - }); + ])( + 'throws when parser schema validation fails for $type', + async ({ event }) => { + // Prepare + const handler = kafkaConsumer( + async (event) => { + for (const record of event.records) { + try { + const { value, key } = record; + return [value, key]; + } catch (error) { + return error; + } + } + }, + { + value: { + type: SchemaType.JSON, + parserSchema: valueZodSchema, + }, + key: { + type: SchemaType.JSON, + parserSchema: keyZodSchema, + }, + } + ); - // Act & Assess - await expect(handler(event, context)).rejects.toThrow( - KafkaConsumerParserError - ); - }); + // Act & Assess + const result = await handler(event, context); + + expect(result).toEqual( + expect.objectContaining({ + message: expect.stringContaining('Schema validation failed'), + name: 'KafkaConsumerParserError', + cause: expect.arrayContaining([ + expect.objectContaining({ + code: expect.any(String), + message: expect.any(String), + }), + ]), + }) + ); + } + ); it('throws when non MSK event passed kafka consumer', async () => { // Prepare @@ -342,4 +394,104 @@ describe('Kafka consumer', () => { ...TEST_DATA.headers.withoutHeaders, }); }); + + it.each([ + { + type: 'undefined', + keyValue: undefined, + }, + { + type: 'empty string', + keyValue: '', + }, + ])('handles empty keys gracefully $type', async ({ keyValue }) => { + // Prepare + const handler = kafkaConsumer(baseHandler, { + value: { type: 'json' }, + key: { type: 'json' }, + }); + + const customEvent = { + ...jsonTestEvent, + records: { + 'test-topic': [ + { + key: keyValue, + value: TEST_DATA.json.originalValue, + headers: null, + }, + ], + }, + } as unknown as MSKEvent; + + // Act + const result = (await handler(customEvent, context)) as ConsumerRecords; + + // Assess + expect(result.records[0].key).toBeUndefined(); + }); + + it('handles null keys gracefully', async () => { + // Prepare + const handler = kafkaConsumer(baseHandler, { + value: { type: 'json' }, + key: { type: 'json' }, + }); + + const customEvent = { + ...jsonTestEvent, + records: { + 'test-topic': [ + { + key: null, + value: TEST_DATA.json.originalValue, + headers: null, + }, + ], + }, + } as unknown as MSKEvent; + + // Act + const result = (await handler(customEvent, context)) as ConsumerRecords; + + // Assess + expect(result.records[0].key).toBeNull(); + }); + + it('defaults to primitive types when no SchemaConfig is provided', async () => { + // Prepare + const handler = kafkaConsumer(baseHandler); + + // Act + const result = (await handler(jsonTestEvent, context)) as ConsumerRecords< + unknown, + unknown + >; + + // Assess + expect(result.records[0]).toEqual({ + key: 'recordKey', + value: JSON.stringify( + { id: 12345, name: 'product5', price: 45 }, + null, + 2 + ), + originalKey: 'cmVjb3JkS2V5', + originalValue: + 'ewogICJpZCI6IDEyMzQ1LAogICJuYW1lIjogInByb2R1Y3Q1IiwKICAicHJpY2UiOiA0NQp9', + headers: [ + { + headerKey: 'headerValue', + }, + ], + originalHeaders: [ + { headerKey: [104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101] }, + ], + topic: 'mytopic', + partition: 0, + offset: 15, + 
timestamp: 1545084650987, + timestampType: 'CREATE_TIME', + }); + }); }); diff --git a/packages/kafka/tests/unit/deserializer.avro.test.ts b/packages/kafka/tests/unit/deserializer.avro.test.ts index a0dd6486e8..9893b5c258 100644 --- a/packages/kafka/tests/unit/deserializer.avro.test.ts +++ b/packages/kafka/tests/unit/deserializer.avro.test.ts @@ -34,7 +34,7 @@ describe('Avro Deserializer: ', () => { }`; // Invalid schema, missing "price" field // Act & Assess - await expect(deserialize(message, schema)).rejects.toThrow( + expect(() => deserialize(message, schema)).toThrow( KafkaConsumerDeserializationError ); }); @@ -53,7 +53,7 @@ describe('Avro Deserializer: ', () => { }`; // Valid schema, but does not match the message content // Act & Assess - await expect(deserialize(message, schema)).rejects.toThrow( + expect(() => deserialize(message, schema)).toThrow( KafkaConsumerDeserializationError ); });
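
Usage sketch (illustrative, not part of the change): the snippet below is based on the changed sources and tests above and assumes the package is consumed as `@aws-lambda-powertools/kafka` (the import path and the absence of explicit generic parameters are assumptions). It shows the now-optional `SchemaConfig`, the Standard Schema `parserSchema` hook, and the lazy `key`/`value` getters, which defer deserialization and validation until the property is accessed so a handler can catch per-record failures and read the `cause` array now attached to `KafkaConsumerParserError`.

```typescript
// Sketch only: import path and typing choices are assumptions, not taken from this diff.
import { SchemaType, kafkaConsumer } from '@aws-lambda-powertools/kafka';
import { z } from 'zod';

const productSchema = z.object({
  id: z.number(),
  name: z.string(),
  price: z.number(),
});

export const handler = kafkaConsumer(
  async (event) => {
    for (const record of event.records) {
      try {
        // `value` is a getter: base64 decoding, JSON parsing, and Standard Schema
        // validation all run on access, so a bad record only throws here.
        // The cast stands in for the wrapper's generic parameters, which this sketch does not assume.
        const product = record.value as z.infer<typeof productSchema>;
        console.log(`${record.topic}[${record.partition}]@${record.offset}`, product.name);
      } catch (error) {
        // With the ErrorOptions change, validation issues are exposed on `cause`.
        if (error instanceof Error && error.name === 'KafkaConsumerParserError') {
          console.error('Validation issues:', error.cause);
        } else {
          throw error;
        }
      }
    }
  },
  {
    value: { type: SchemaType.JSON, parserSchema: productSchema },
  }
);
```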
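
When no `SchemaConfig` is passed, `getDeserializer` falls back to the primitive deserializer, so keys and values arrive as UTF-8 strings decoded from the base64 payload (as exercised by the "defaults to primitive types" test). A minimal sketch, again assuming the same import path:

```typescript
// Sketch only: the import path is an assumption.
import { kafkaConsumer } from '@aws-lambda-powertools/kafka';

// No SchemaConfig: `key` and `value` are plain UTF-8 strings decoded from base64;
// `originalKey`/`originalValue` keep the raw encoded form received from Kafka.
export const handler = kafkaConsumer(async (event) => {
  for (const record of event.records) {
    console.log(typeof record.value); // 'string'
    console.log(record.originalValue); // still base64-encoded
  }
});
```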
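
Note that `parseSchema` now rejects asynchronous validators: if `schema['~standard'].validate` returns a `Promise`, the consumer throws `KafkaConsumerParserError('Schema parsing supports only synchronous validation')` instead of awaiting it. Any Standard Schema v1 implementation with a synchronous `validate` works; the hand-rolled schema below (vendor name and validation rule chosen purely for illustration) sketches the minimal contract.

```typescript
import type { StandardSchemaV1 } from '@standard-schema/spec';

// Minimal synchronous Standard Schema: accepts non-empty strings.
// A schema whose `validate` returned a Promise would be rejected by the
// consumer with a KafkaConsumerParserError.
const nonEmptyString: StandardSchemaV1<unknown, string> = {
  '~standard': {
    version: 1,
    vendor: 'example', // illustrative vendor name
    validate: (value) =>
      typeof value === 'string' && value.length > 0
        ? { value }
        : { issues: [{ message: 'Expected a non-empty string' }] },
  },
};

// Usable anywhere a `parserSchema` is accepted, e.g.:
// key: { type: SchemaType.JSON, parserSchema: nonEmptyString }
```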