diff --git a/packages/parser/src/helpers/index.ts b/packages/parser/src/helpers/index.ts
index 4098233df2..a951237e85 100644
--- a/packages/parser/src/helpers/index.ts
+++ b/packages/parser/src/helpers/index.ts
@@ -1,5 +1,26 @@
+import { gunzipSync } from 'node:zlib';
+import { fromBase64 } from '@aws-lambda-powertools/commons/utils/base64';
 import { type ZodType, z } from 'zod';
 
+const decoder = new TextDecoder();
+
+/**
+ * Try to gunzip and JSON-parse a Base64 encoded string.
+ *
+ * When any step fails (for example the payload is not gzip-compressed), the
+ * input is returned unchanged so the caller can detect the fallback by
+ * comparing the result with the original string.
+ *
+ * @param data - The Base64 encoded string to decompress
+ */
+const decompress = (data: string): unknown => {
+  try {
+    return JSON.parse(gunzipSync(fromBase64(data, 'base64')).toString('utf8'));
+  } catch {
+    return data;
+  }
+};
+
 /**
  * A helper function to parse a JSON string and validate it against a schema.
  *
@@ -54,4 +75,53 @@ const JSONStringified = (schema: T) =>
     })
     .pipe(schema);
 
-export { JSONStringified };
+/**
+ * A helper function to decode a Base64 string and validate it against a schema.
+ *
+ *
+ * Use it for built-in schemas like `KinesisDataStreamRecordPayload` that have fields that are base64 encoded
+ * and extend them with your custom schema.
+ *
+ * For example, if you have an event with a base64 encoded body similar to the following:
+ *
+ * ```json
+ * {
+ *   // ... other fields
+ *   "data": "eyJ0ZXN0IjogInRlc3QifQ==",
+ * }
+ * ```
+ *
+ * You can extend any built-in schema with your custom schema using the `Base64Encoded` helper function.
+ *
+ * @example
+ * ```typescript
+ * import { Base64Encoded } from '@aws-lambda-powertools/parser/helpers';
+ * import { KinesisDataStreamRecordPayload } from '@aws-lambda-powertools/parser/schemas/kinesis';
+ * import { z } from 'zod';
+ *
+ * const extendedSchema = KinesisDataStreamRecordPayload.extend({
+ *   data: Base64Encoded(z.object({
+ *     test: z.string(),
+ *   }))
+ * });
+ * type _ExtendedKinesisDataStream = z.infer<typeof extendedSchema>;
+ * ```
+ *
+ * @param schema - The schema to validate the Base64 decoded value against
+ */
+const Base64Encoded = <T extends ZodType>(schema: T) =>
+  z
+    .string()
+    .transform((data) => {
+      const decompressed = decompress(data);
+      const decoded = decoder.decode(fromBase64(data, 'base64'));
+      try {
+        // If data was not compressed, try to parse it as JSON otherwise it must be string
+        return decompressed === data ? JSON.parse(decoded) : decompressed;
+      } catch {
+        return decoded;
+      }
+    })
+    .pipe(schema);
+
+export { JSONStringified, Base64Encoded };
diff --git a/packages/parser/src/schemas/kinesis.ts b/packages/parser/src/schemas/kinesis.ts
index 882812e3ff..37e0427a21 100644
--- a/packages/parser/src/schemas/kinesis.ts
+++ b/packages/parser/src/schemas/kinesis.ts
@@ -1,36 +1,16 @@
-import { gunzipSync } from 'node:zlib';
-import { fromBase64 } from '@aws-lambda-powertools/commons/utils/base64';
 import { z } from 'zod';
+import { Base64Encoded } from '../helpers/index.js';
 import type { KinesisDataStreamEvent } from '../types/schema.js';
 import { DynamoDBStreamToKinesisRecord } from './dynamodb.js';
 
-const decoder = new TextDecoder();
-
 const KinesisDataStreamRecordPayload = z.object({
   kinesisSchemaVersion: z.string(),
   partitionKey: z.string(),
   sequenceNumber: z.string(),
   approximateArrivalTimestamp: z.number(),
-  data: z.string().transform((data) => {
-    const decompressed = decompress(data);
-    const decoded = decoder.decode(fromBase64(data, 'base64'));
-    try {
-      // If data was not compressed, try to parse it as JSON otherwise it must be string
-      return decompressed === data ? JSON.parse(decoded) : decompressed;
-    } catch {
-      return decoded;
-    }
-  }),
+  data: Base64Encoded(z.any()),
 });
 
-const decompress = (data: string): string => {
-  try {
-    return JSON.parse(gunzipSync(fromBase64(data, 'base64')).toString('utf8'));
-  } catch {
-    return data;
-  }
-};
-
 const KinesisDataStreamRecord = z.object({
   eventSource: z.literal('aws:kinesis'),
   eventVersion: z.string(),
@@ -46,13 +26,7 @@ const KinesisDynamoDBStreamSchema = z.object({
   Records: z.array(
     KinesisDataStreamRecord.extend({
       kinesis: KinesisDataStreamRecordPayload.extend({
-        data: z
-          .string()
-          .transform((data) => {
-            const decoded = decoder.decode(fromBase64(data, 'base64'));
-            return JSON.parse(decoded);
-          })
-          .pipe(DynamoDBStreamToKinesisRecord),
+        data: Base64Encoded(z.any()).pipe(DynamoDBStreamToKinesisRecord),
       }),
     })
   ),
diff --git a/packages/parser/tests/unit/helpers.test.ts b/packages/parser/tests/unit/helpers.test.ts
index 9811234c2d..efb184de0c 100644
--- a/packages/parser/tests/unit/helpers.test.ts
+++ b/packages/parser/tests/unit/helpers.test.ts
@@ -1,7 +1,13 @@
+import { gzipSync } from 'node:zlib';
+import {
+  KinesisDataStreamRecord,
+  KinesisDataStreamRecordPayload,
+  KinesisDataStreamSchema,
+} from 'src/schemas/kinesis.js';
 import { describe, expect, it } from 'vitest';
 import { z } from 'zod';
 import { DynamoDBMarshalled } from '../../src/helpers/dynamodb.js';
-import { JSONStringified } from '../../src/helpers/index.js';
+import { Base64Encoded, JSONStringified } from '../../src/helpers/index.js';
 import { AlbSchema } from '../../src/schemas/alb.js';
 import {
   DynamoDBStreamRecord,
@@ -14,6 +20,7 @@ import {
 import { SqsRecordSchema, SqsSchema } from '../../src/schemas/sqs.js';
 import type {
   DynamoDBStreamEvent,
+  KinesisDataStreamEvent,
   SnsEvent,
   SqsEvent,
 } from '../../src/types/schema.js';
@@ -277,3 +284,129 @@ describe('Helper: DynamoDBMarshalled', () => {
     expect(() => extendedSchema.parse(event)).toThrow();
   });
 });
+
+describe('Helper: Base64Encoded', () => {
+  it('returns a valid base64 decoded object when passed an encoded object', () => {
+    // Prepare
+    const data = {
+      body: Buffer.from(JSON.stringify(structuredClone(basePayload))).toString(
+        'base64'
+      ),
+    };
+
+    // Act
+    const extendedSchema = envelopeSchema.extend({
+      body: Base64Encoded(bodySchema),
+    });
+
+    // Assess
+    expect(extendedSchema.parse(data)).toStrictEqual({
+      body: basePayload,
+    });
+  });
+
+  it('returns a valid base64 decoded object when passed a compressed object', () => {
+    // Prepare
+    const data = {
+      body: Buffer.from(
+        gzipSync(JSON.stringify(structuredClone(basePayload)))
+      ).toString('base64'),
+    };
+
+    // Act
+    const extendedSchema = envelopeSchema.extend({
+      body: Base64Encoded(bodySchema),
+    });
+
+    // Assess
+    expect(extendedSchema.parse(data)).toStrictEqual({
+      body: basePayload,
+    });
+  });
+
+  it('throws an error if the payload does not match the schema', () => {
+    // Prepare
+    const data = {
+      body: Buffer.from(
+        JSON.stringify({ ...basePayload, email: 'invalid' })
+      ).toString('base64'),
+    };
+
+    // Act
+    const extendedSchema = envelopeSchema.extend({
+      body: Base64Encoded(bodySchema),
+    });
+
+    // Assess
+    expect(() => extendedSchema.parse(data)).toThrow();
+  });
+
+  it('throws an error if the payload is malformed', () => {
+    // Prepare
+    const data = {
+      body: Buffer.from('{"foo": 1, }').toString('base64'),
+    };
+
+    // Act
+    const extendedSchema = envelopeSchema.extend({
+      body: Base64Encoded(bodySchema),
+    });
+
+    // Assess
+    expect(() => extendedSchema.parse(data)).toThrow();
+  });
+
+  it('throws an error if the base64 payload is malformed', () => {
+    // Prepare
+    const data = {
+      body: 'invalid-base64-string',
+    };
+
+    // Act
+    const extendedSchema = envelopeSchema.extend({
+      body: Base64Encoded(bodySchema),
+    });
+
+    // Assess
+    expect(() => extendedSchema.parse(data)).toThrow();
+  });
+
+  it('parses extended KinesisDataStreamSchema', () => {
+    // Prepare
+    const testEvent = getTestEvent<KinesisDataStreamEvent>({
+      eventsPath: 'kinesis',
+      filename: 'stream',
+    });
+    const stringifiedBody = JSON.stringify(basePayload);
+    testEvent.Records[0].kinesis.data =
+      Buffer.from(stringifiedBody).toString('base64');
+    testEvent.Records[1].kinesis.data =
+      Buffer.from(stringifiedBody).toString('base64');
+
+    // Act
+    const extendedSchema = KinesisDataStreamSchema.extend({
+      Records: z.array(
+        KinesisDataStreamRecord.extend({
+          kinesis: KinesisDataStreamRecordPayload.extend({
+            data: Base64Encoded(bodySchema),
+          }),
+        })
+      ),
+    });
+
+    // Assess
+    expect(extendedSchema.parse(testEvent)).toStrictEqual({
+      ...testEvent,
+      Records: [
+        {
+          ...testEvent.Records[0],
+          kinesis: { ...testEvent.Records[0].kinesis, data: basePayload },
+        },
+        {
+          ...testEvent.Records[1],
+          kinesis: { ...testEvent.Records[1].kinesis, data: basePayload },
+        },
+      ],
+    });
+  });
+});