From 46d5b14c2146245203a3ed49955c74a0aa86d6d9 Mon Sep 17 00:00:00 2001 From: Swopnil Dangol Date: Wed, 3 Sep 2025 18:08:14 +0100 Subject: [PATCH 1/5] Created the Base64Encoded helper to decode base64 strings and validate against schema --- packages/parser/src/helpers/index.ts | 63 ++++++++++++- packages/parser/tests/unit/helpers.test.ts | 100 ++++++++++++++++++++- 2 files changed, 161 insertions(+), 2 deletions(-) diff --git a/packages/parser/src/helpers/index.ts b/packages/parser/src/helpers/index.ts index 4098233df2..a951237e85 100644 --- a/packages/parser/src/helpers/index.ts +++ b/packages/parser/src/helpers/index.ts @@ -1,5 +1,17 @@ +import { gunzipSync } from 'node:zlib'; +import { fromBase64 } from '@aws-lambda-powertools/commons/utils/base64'; import { type ZodType, z } from 'zod'; +const decoder = new TextDecoder(); + +const decompress = (data: string): string => { + try { + return JSON.parse(gunzipSync(fromBase64(data, 'base64')).toString('utf8')); + } catch { + return data; + } +}; + /** * A helper function to parse a JSON string and validate it against a schema. * @@ -54,4 +66,53 @@ const JSONStringified = (schema: T) => }) .pipe(schema); -export { JSONStringified }; +/** + * A helper function to decode a Base64 string and validate it against a schema. + * + * + * Use it for built-in schemas like `KinesisDataStreamRecordPayload` that have fields that are base64 encoded + * and extend them with your custom schema. + * + * For example, if you have an event with a base64 encoded body similar to the following: + * + * ```json + * { + * // ... other fields + * "data": "e3Rlc3Q6ICJ0ZXN0In0=", + * } + * ``` + * + * You can extend any built-in schema with your custom schema using the `Base64Encoded` helper function. + * + * @example + * ```typescript + * import { Base64Encoded } from '@aws-lambda-powertools/parser/helpers'; + * import { KinesisDataStreamRecordPayload } from '@aws-lambda-powertools/parser/schemas/kinesis'; + * import { z } from 'zod'; + * + * const extendedSchema = KinesisDataStreamRecordPayload.extend({ + * data: Base64Encoded(z.object({ + * test: z.string(), + * })) + * }); + * type _ExtendedKinesisDataStream = z.infer; + * ``` + * + * @param schema - The schema to validate the Base 64 decoded value against + */ +const Base64Encoded = (schema: T) => + z + .string() + .transform((data) => { + const decompressed = decompress(data); + const decoded = decoder.decode(fromBase64(data, 'base64')); + try { + // If data was not compressed, try to parse it as JSON otherwise it must be string + return decompressed === data ? JSON.parse(decoded) : decompressed; + } catch { + return decoded; + } + }) + .pipe(schema); + +export { JSONStringified, Base64Encoded }; diff --git a/packages/parser/tests/unit/helpers.test.ts b/packages/parser/tests/unit/helpers.test.ts index 9811234c2d..f5aa87fb68 100644 --- a/packages/parser/tests/unit/helpers.test.ts +++ b/packages/parser/tests/unit/helpers.test.ts @@ -1,7 +1,12 @@ +import { + KinesisDataStreamRecord, + KinesisDataStreamRecordPayload, + KinesisDataStreamSchema, +} from 'src/schemas/kinesis.js'; import { describe, expect, it } from 'vitest'; import { z } from 'zod'; import { DynamoDBMarshalled } from '../../src/helpers/dynamodb.js'; -import { JSONStringified } from '../../src/helpers/index.js'; +import { Base64Encoded, JSONStringified } from '../../src/helpers/index.js'; import { AlbSchema } from '../../src/schemas/alb.js'; import { DynamoDBStreamRecord, @@ -14,6 +19,7 @@ import { import { SqsRecordSchema, SqsSchema } from '../../src/schemas/sqs.js'; import type { DynamoDBStreamEvent, + KinesisDataStreamEvent, SnsEvent, SqsEvent, } from '../../src/types/schema.js'; @@ -277,3 +283,95 @@ describe('Helper: DynamoDBMarshalled', () => { expect(() => extendedSchema.parse(event)).toThrow(); }); }); + +describe('Helper: Base64Encoded', () => { + it('returns a valid base64 decoded payload', () => { + // Prepare + const data = { + body: Buffer.from(JSON.stringify(structuredClone(basePayload))).toString( + 'base64' + ), + }; + + // Act + const extendedSchema = envelopeSchema.extend({ + body: Base64Encoded(bodySchema), + }); + + // Assess + expect(extendedSchema.parse(data)).toStrictEqual({ + body: basePayload, + }); + }); + + it('throws an error if the payload is invalid', () => { + // Prepare + const data = { + body: Buffer.from( + JSON.stringify({ ...basePayload, email: 'invalid' }) + ).toString('base64'), + }; + + // Act + const extendedSchema = envelopeSchema.extend({ + body: Base64Encoded(bodySchema), + }); + + // Assess + expect(() => extendedSchema.parse(data)).toThrow(); + }); + + it('throws an error if the base64 payload is malformed', () => { + // Prepare + const data = { + body: 'invalid-base64-string', + }; + + // Act + const extendedSchema = envelopeSchema.extend({ + body: Base64Encoded(bodySchema), + }); + + // Assess + expect(() => extendedSchema.parse(data)).toThrow(); + }); + + it('parses extended KinesisDataStreamSchema', () => { + // Prepare + const testEvent = getTestEvent({ + eventsPath: 'kinesis', + filename: 'stream', + }); + const stringifiedBody = JSON.stringify(basePayload); + testEvent.Records[0].kinesis.data = + Buffer.from(stringifiedBody).toString('base64'); + testEvent.Records[1].kinesis.data = + Buffer.from(stringifiedBody).toString('base64'); + + // Act + const extendedSchema = KinesisDataStreamSchema.extend({ + Records: z.array( + KinesisDataStreamRecord.extend({ + kinesis: KinesisDataStreamRecordPayload.extend({ + data: Base64Encoded(bodySchema), + }), + }) + ), + }); + + // Assess + expect(extendedSchema.parse(testEvent)).toStrictEqual({ + ...testEvent, + Records: [ + { + ...testEvent.Records[0], + kinesis: { ...testEvent.Records[0].kinesis, data: basePayload }, + }, + { + ...testEvent.Records[1], + kinesis: { ...testEvent.Records[1].kinesis, data: basePayload }, + }, + ], + }); + }); +}); From bddcb82c7c95098135af907d3977921edf15493e Mon Sep 17 00:00:00 2001 From: Swopnil Dangol Date: Wed, 3 Sep 2025 18:34:27 +0100 Subject: [PATCH 2/5] Added test cases to cover all lines --- packages/parser/tests/unit/helpers.test.ts | 39 ++++++++++++++++++++-- 1 file changed, 37 insertions(+), 2 deletions(-) diff --git a/packages/parser/tests/unit/helpers.test.ts b/packages/parser/tests/unit/helpers.test.ts index f5aa87fb68..efb184de0c 100644 --- a/packages/parser/tests/unit/helpers.test.ts +++ b/packages/parser/tests/unit/helpers.test.ts @@ -1,3 +1,4 @@ +import { gzipSync } from 'node:zlib'; import { KinesisDataStreamRecord, KinesisDataStreamRecordPayload, @@ -285,7 +286,7 @@ describe('Helper: DynamoDBMarshalled', () => { }); describe('Helper: Base64Encoded', () => { - it('returns a valid base64 decoded payload', () => { + it('returns a valid base64 decoded object when passed an encoded object', () => { // Prepare const data = { body: Buffer.from(JSON.stringify(structuredClone(basePayload))).toString( @@ -304,7 +305,26 @@ describe('Helper: Base64Encoded', () => { }); }); - it('throws an error if the payload is invalid', () => { + it('returns a valid base64 decoded object when passed a compressed object', () => { + // Prepare + const data = { + body: Buffer.from( + gzipSync(JSON.stringify(structuredClone(basePayload))) + ).toString('base64'), + }; + + // Act + const extendedSchema = envelopeSchema.extend({ + body: Base64Encoded(bodySchema), + }); + + // Assess + expect(extendedSchema.parse(data)).toStrictEqual({ + body: basePayload, + }); + }); + + it('throws an error if the payload is does not match the schema', () => { // Prepare const data = { body: Buffer.from( @@ -321,6 +341,21 @@ describe('Helper: Base64Encoded', () => { expect(() => extendedSchema.parse(data)).toThrow(); }); + it('throws an error if the payload is malformed', () => { + // Prepare + const data = { + body: Buffer.from('{"foo": 1, }').toString('base64'), + }; + + // Act + const extendedSchema = envelopeSchema.extend({ + body: Base64Encoded(bodySchema), + }); + + // Assess + expect(() => extendedSchema.parse(data)).toThrow(); + }); + it('throws an error if the base64 payload is malformed', () => { // Prepare const data = { From 22f1c9f029b118c1e1b4201d17131ebe41784b53 Mon Sep 17 00:00:00 2001 From: Swopnil Dangol Date: Thu, 4 Sep 2025 09:44:42 +0100 Subject: [PATCH 3/5] Used the helper function in the kinesis schema example --- packages/parser/src/schemas/kinesis.ts | 23 ++--------------------- 1 file changed, 2 insertions(+), 21 deletions(-) diff --git a/packages/parser/src/schemas/kinesis.ts b/packages/parser/src/schemas/kinesis.ts index 882812e3ff..c457915971 100644 --- a/packages/parser/src/schemas/kinesis.ts +++ b/packages/parser/src/schemas/kinesis.ts @@ -1,36 +1,17 @@ -import { gunzipSync } from 'node:zlib'; import { fromBase64 } from '@aws-lambda-powertools/commons/utils/base64'; +import { Base64Encoded } from 'src/helpers/index.js'; import { z } from 'zod'; import type { KinesisDataStreamEvent } from '../types/schema.js'; import { DynamoDBStreamToKinesisRecord } from './dynamodb.js'; -const decoder = new TextDecoder(); - const KinesisDataStreamRecordPayload = z.object({ kinesisSchemaVersion: z.string(), partitionKey: z.string(), sequenceNumber: z.string(), approximateArrivalTimestamp: z.number(), - data: z.string().transform((data) => { - const decompressed = decompress(data); - const decoded = decoder.decode(fromBase64(data, 'base64')); - try { - // If data was not compressed, try to parse it as JSON otherwise it must be string - return decompressed === data ? JSON.parse(decoded) : decompressed; - } catch { - return decoded; - } - }), + data: Base64Encoded(z.any()), }); -const decompress = (data: string): string => { - try { - return JSON.parse(gunzipSync(fromBase64(data, 'base64')).toString('utf8')); - } catch { - return data; - } -}; - const KinesisDataStreamRecord = z.object({ eventSource: z.literal('aws:kinesis'), eventVersion: z.string(), From a33acaffaa76f40b940ff4c4d7d2dec1ef9f0652 Mon Sep 17 00:00:00 2001 From: Swopnil Dangol Date: Thu, 4 Sep 2025 09:47:54 +0100 Subject: [PATCH 4/5] Used the helper function for KinesisDynamoDBStreamSchema as well --- packages/parser/src/schemas/kinesis.ts | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/packages/parser/src/schemas/kinesis.ts b/packages/parser/src/schemas/kinesis.ts index c457915971..e4bb3edee6 100644 --- a/packages/parser/src/schemas/kinesis.ts +++ b/packages/parser/src/schemas/kinesis.ts @@ -1,4 +1,3 @@ -import { fromBase64 } from '@aws-lambda-powertools/commons/utils/base64'; import { Base64Encoded } from 'src/helpers/index.js'; import { z } from 'zod'; import type { KinesisDataStreamEvent } from '../types/schema.js'; @@ -27,13 +26,7 @@ const KinesisDynamoDBStreamSchema = z.object({ Records: z.array( KinesisDataStreamRecord.extend({ kinesis: KinesisDataStreamRecordPayload.extend({ - data: z - .string() - .transform((data) => { - const decoded = decoder.decode(fromBase64(data, 'base64')); - return JSON.parse(decoded); - }) - .pipe(DynamoDBStreamToKinesisRecord), + data: Base64Encoded(z.any()).pipe(DynamoDBStreamToKinesisRecord), }), }) ), From f1ab6d5c6c0e317c730b996e50a302c61a41b464 Mon Sep 17 00:00:00 2001 From: Swopnil Dangol Date: Thu, 4 Sep 2025 09:58:23 +0100 Subject: [PATCH 5/5] Fixed the import path of the helper --- packages/parser/src/schemas/kinesis.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/parser/src/schemas/kinesis.ts b/packages/parser/src/schemas/kinesis.ts index e4bb3edee6..37e0427a21 100644 --- a/packages/parser/src/schemas/kinesis.ts +++ b/packages/parser/src/schemas/kinesis.ts @@ -1,5 +1,5 @@ -import { Base64Encoded } from 'src/helpers/index.js'; import { z } from 'zod'; +import { Base64Encoded } from '../helpers/index.js'; import type { KinesisDataStreamEvent } from '../types/schema.js'; import { DynamoDBStreamToKinesisRecord } from './dynamodb.js';