From 50444dd590182e27b9fdb778659a197005555158 Mon Sep 17 00:00:00 2001 From: svozza Date: Mon, 29 Sep 2025 18:39:27 +0100 Subject: [PATCH 1/2] feat(event-handler): add streaming functionality --- package-lock.json | 9 +- packages/event-handler/package.json | 3 +- packages/event-handler/src/rest/Router.ts | 104 ++++-- packages/event-handler/src/rest/converters.ts | 147 +++++++-- packages/event-handler/src/rest/index.ts | 2 +- packages/event-handler/src/rest/utils.ts | 62 +++- packages/event-handler/src/types/common.ts | 8 +- packages/event-handler/src/types/rest.ts | 24 +- .../tests/unit/rest/Router/decorators.test.ts | 135 ++++++-- .../tests/unit/rest/Router/middleware.test.ts | 56 ++++ .../tests/unit/rest/Router/streaming.test.ts | 309 ++++++++++++++++++ .../tests/unit/rest/converters.test.ts | 133 +++++++- .../event-handler/tests/unit/rest/helpers.ts | 43 ++- .../tests/unit/rest/utils.test.ts | 45 ++- 14 files changed, 978 insertions(+), 102 deletions(-) create mode 100644 packages/event-handler/tests/unit/rest/Router/streaming.test.ts diff --git a/package-lock.json b/package-lock.json index e2c21cb6b5..2baf438acc 100644 --- a/package-lock.json +++ b/package-lock.json @@ -7986,6 +7986,12 @@ "node": ">= 12" } }, + "node_modules/lambda-stream": { + "version": "0.6.0", + "resolved": "https://registry.npmjs.org/lambda-stream/-/lambda-stream-0.6.0.tgz", + "integrity": "sha512-S625+Jdil56zFOtfBt5r0snP8SIBY7fXn7pLKXQ8C6u9A5kNSuEWcEVC05QoDkvBLjMhwBeIUwVU6PoOB7ll5w==", + "license": "MIT" + }, "node_modules/layers": { "resolved": "layers", "link": true @@ -10686,7 +10692,8 @@ "version": "2.27.0", "license": "MIT-0", "dependencies": { - "@aws-lambda-powertools/commons": "2.27.0" + "@aws-lambda-powertools/commons": "2.27.0", + "lambda-stream": "0.6.0" } }, "packages/idempotency": { diff --git a/packages/event-handler/package.json b/packages/event-handler/package.json index 8b6e8bcc6d..9c225e3736 100644 --- a/packages/event-handler/package.json +++ b/packages/event-handler/package.json @@ -125,7 +125,8 @@ "url": "https://github.com/aws-powertools/powertools-lambda-typescript/issues" }, "dependencies": { - "@aws-lambda-powertools/commons": "2.27.0" + "@aws-lambda-powertools/commons": "2.27.0", + "lambda-stream": "0.6.0" }, "keywords": [ "aws", diff --git a/packages/event-handler/src/rest/Router.ts b/packages/event-handler/src/rest/Router.ts index 8ce3f5e191..04c679e539 100644 --- a/packages/event-handler/src/rest/Router.ts +++ b/packages/event-handler/src/rest/Router.ts @@ -1,14 +1,14 @@ +import { Readable } from 'node:stream'; +import { pipeline } from 'node:stream/promises'; +import type streamWeb from 'node:stream/web'; import type { GenericLogger } from '@aws-lambda-powertools/commons/types'; import { getStringFromEnv, isDevMode, } from '@aws-lambda-powertools/commons/utils/env'; import type { APIGatewayProxyResult, Context } from 'aws-lambda'; -import type { - HandlerResponse, - HttpStatusCode, - ResolveOptions, -} from '../types/index.js'; +import type { ResolveStreamOptions } from '../types/common.js'; +import type { HandlerResponse, ResolveOptions } from '../types/index.js'; import type { ErrorConstructor, ErrorHandler, @@ -17,6 +17,7 @@ import type { Middleware, Path, RequestContext, + ResponseStream, RestRouteOptions, RestRouterOptions, RouteHandler, @@ -26,6 +27,7 @@ import { handlerResultToProxyResult, handlerResultToWebResponse, proxyEventToWebRequest, + webHeadersToApiGatewayV1Headers, } from './converters.js'; import { ErrorHandlerRegistry } from './ErrorHandlerRegistry.js'; import { @@ -38,8 +40,9 @@ import { Route } from './Route.js'; import { RouteHandlerRegistry } from './RouteHandlerRegistry.js'; import { composeMiddleware, + HttpResponseStream, isAPIGatewayProxyEvent, - isAPIGatewayProxyResult, + isExtendedAPIGatewayProxyResult, isHttpMethod, resolvePrefixedPath, } from './utils.js'; @@ -187,21 +190,19 @@ class Router { } /** - * Resolves an API Gateway event by routing it to the appropriate handler - * and converting the result to an API Gateway proxy result. Handles errors - * using registered error handlers or falls back to default error handling - * (500 Internal Server Error). + * Core resolution logic shared by both resolve and resolveStream methods. + * Validates the event, routes to handlers, executes middleware, and handles errors. * * @param event - The Lambda event to resolve * @param context - The Lambda context * @param options - Optional resolve options for scope binding - * @returns An API Gateway proxy result + * @returns A handler response (Response, JSONObject, or ExtendedAPIGatewayProxyResult) */ - public async resolve( + async #resolve( event: unknown, context: Context, options?: ResolveOptions - ): Promise { + ): Promise { if (!isAPIGatewayProxyEvent(event)) { this.logger.error( 'Received an event that is not compatible with this resolver' @@ -276,18 +277,79 @@ class Router { }); // middleware result takes precedence to allow short-circuiting - const result = middlewareResult ?? requestContext.res; - - return handlerResultToProxyResult(result); + return middlewareResult ?? requestContext.res; } catch (error) { this.logger.debug(`There was an error processing the request: ${error}`); - const result = await this.handleError(error as Error, { + return this.handleError(error as Error, { ...requestContext, scope: options?.scope, }); - const statusCode = - result instanceof Response ? result.status : result.statusCode; - return handlerResultToProxyResult(result, statusCode as HttpStatusCode); + } + } + + /** + * Resolves an API Gateway event by routing it to the appropriate handler + * and converting the result to an API Gateway proxy result. Handles errors + * using registered error handlers or falls back to default error handling + * (500 Internal Server Error). + * + * @param event - The Lambda event to resolve + * @param context - The Lambda context + * @param options - Optional resolve options for scope binding + * @returns An API Gateway proxy result + */ + public async resolve( + event: unknown, + context: Context, + options?: ResolveOptions + ): Promise { + const result = await this.#resolve(event, context, options); + return handlerResultToProxyResult(result); + } + + /** + * Resolves an API Gateway event by routing it to the appropriate handler + * and streaming the response directly to the provided response stream. + * Used for Lambda response streaming. + * + * @param event - The Lambda event to resolve + * @param context - The Lambda context + * @param options - Stream resolve options including the response stream + */ + public async resolveStream( + event: unknown, + context: Context, + options: ResolveStreamOptions + ): Promise { + const result = await this.#resolve(event, context, options); + await this.#streamHandlerResponse(result, options.responseStream); + } + + /** + * Streams a handler response to the Lambda response stream. + * Converts the response to a web response and pipes it through the stream. + * + * @param response - The handler response to stream + * @param responseStream - The Lambda response stream to write to + */ + async #streamHandlerResponse( + response: HandlerResponse, + responseStream: ResponseStream + ) { + const webResponse = handlerResultToWebResponse(response); + const { headers } = webHeadersToApiGatewayV1Headers(webResponse.headers); + const resStream = HttpResponseStream.from(responseStream, { + statusCode: webResponse.status, + headers, + }); + + if (webResponse.body) { + const nodeStream = Readable.fromWeb( + webResponse.body as streamWeb.ReadableStream + ); + await pipeline(nodeStream, resStream); + } else { + resStream.write(''); } } @@ -320,7 +382,7 @@ class Router { try { const { scope, ...reqCtx } = options; const body = await handler.apply(scope ?? this, [error, reqCtx]); - if (body instanceof Response || isAPIGatewayProxyResult(body)) { + if (body instanceof Response || isExtendedAPIGatewayProxyResult(body)) { return body; } if (!body.statusCode) { diff --git a/packages/event-handler/src/rest/converters.ts b/packages/event-handler/src/rest/converters.ts index 56c4271d0b..73afeb4c02 100644 --- a/packages/event-handler/src/rest/converters.ts +++ b/packages/event-handler/src/rest/converters.ts @@ -1,11 +1,20 @@ +import { Readable } from 'node:stream'; +import type streamWeb from 'node:stream/web'; +import { isString } from '@aws-lambda-powertools/commons/typeutils'; import type { APIGatewayProxyEvent, APIGatewayProxyResult } from 'aws-lambda'; import type { CompressionOptions, + ExtendedAPIGatewayProxyResult, + ExtendedAPIGatewayProxyResultBody, HandlerResponse, HttpStatusCode, } from '../types/rest.js'; import { COMPRESSION_ENCODING_TYPES, HttpStatusCodes } from './constants.js'; -import { isAPIGatewayProxyResult } from './utils.js'; +import { + isExtendedAPIGatewayProxyResult, + isNodeReadableStream, + isWebReadableStream, +} from './utils.js'; /** * Creates a request body from API Gateway event body, handling base64 decoding if needed. @@ -72,18 +81,17 @@ const proxyEventToWebRequest = (event: APIGatewayProxyEvent): Request => { }; /** - * Converts a Web API Response object to an API Gateway proxy result. + * Converts Web API Headers to API Gateway v1 headers format. + * Splits multi-value headers by comma and organizes them into separate objects. * - * @param response - The Web API Response object - * @returns An API Gateway proxy result + * @param webHeaders - The Web API Headers object + * @returns Object containing headers and multiValueHeaders */ -const webResponseToProxyResult = async ( - response: Response -): Promise => { +const webHeadersToApiGatewayV1Headers = (webHeaders: Headers) => { const headers: Record = {}; const multiValueHeaders: Record> = {}; - for (const [key, value] of response.headers.entries()) { + for (const [key, value] of webHeaders.entries()) { const values = value.split(',').map((v) => v.trimStart()); if (values.length > 1) { multiValueHeaders[key] = values; @@ -92,6 +100,25 @@ const webResponseToProxyResult = async ( } } + return { + headers, + multiValueHeaders, + }; +}; + +/** + * Converts a Web API Response object to an API Gateway proxy result. + * + * @param response - The Web API Response object + * @returns An API Gateway proxy result + */ +const webResponseToProxyResult = async ( + response: Response +): Promise => { + const { headers, multiValueHeaders } = webHeadersToApiGatewayV1Headers( + response.headers + ); + // Check if response contains compressed/binary content const contentEncoding = response.headers.get( 'content-encoding' @@ -129,18 +156,47 @@ const webResponseToProxyResult = async ( return result; }; +/** + * Adds headers from an ExtendedAPIGatewayProxyResult to a Headers object. + * + * @param headers - The Headers object to mutate + * @param response - The response containing headers to add + * @remarks This function mutates the headers object by adding entries from + * response.headers and response.multiValueHeaders + */ +function addProxyEventHeaders( + headers: Headers, + response: ExtendedAPIGatewayProxyResult +) { + for (const [key, value] of Object.entries(response.headers ?? {})) { + if (value != null) { + headers.set(key, String(value)); + } + } + + for (const [key, values] of Object.entries( + response.multiValueHeaders ?? {} + )) { + for (const value of values ?? []) { + headers.append(key, String(value)); + } + } +} + /** * Converts a handler response to a Web API Response object. * Handles APIGatewayProxyResult, Response objects, and plain objects. * * @param response - The handler response (APIGatewayProxyResult, Response, or plain object) * @param resHeaders - Optional headers to be included in the response + * @returns A Web API Response object */ const handlerResultToWebResponse = ( response: HandlerResponse, resHeaders?: Headers ): Response => { if (response instanceof Response) { + if (resHeaders === undefined) return response; const headers = new Headers(resHeaders); for (const [key, value] of response.headers.entries()) { headers.set(key, value); @@ -154,22 +210,15 @@ const handlerResultToWebResponse = ( const headers = new Headers(resHeaders); headers.set('Content-Type', 'application/json'); - if (isAPIGatewayProxyResult(response)) { - for (const [key, value] of Object.entries(response.headers ?? {})) { - if (value != null) { - headers.set(key, String(value)); - } - } + if (isExtendedAPIGatewayProxyResult(response)) { + addProxyEventHeaders(headers, response); - for (const [key, values] of Object.entries( - response.multiValueHeaders ?? {} - )) { - for (const value of values ?? []) { - headers.append(key, String(value)); - } - } + const body = + response.body instanceof Readable + ? (Readable.toWeb(response.body) as ReadableStream) + : response.body; - return new Response(response.body, { + return new Response(body, { status: response.statusCode, headers, }); @@ -189,8 +238,24 @@ const handlerResultToProxyResult = async ( response: HandlerResponse, statusCode: HttpStatusCode = HttpStatusCodes.OK ): Promise => { - if (isAPIGatewayProxyResult(response)) { - return response; + if (isExtendedAPIGatewayProxyResult(response)) { + if (isString(response.body)) { + return { + ...response, + body: response.body, + }; + } + if ( + isNodeReadableStream(response.body) || + isWebReadableStream(response.body) + ) { + const nodeStream = bodyToNodeStream(response.body); + return { + ...response, + isBase64Encoded: true, + body: await nodeStreamToBase64(nodeStream), + }; + } } if (response instanceof Response) { return await webResponseToProxyResult(response); @@ -203,9 +268,43 @@ const handlerResultToProxyResult = async ( }; }; +/** + * Converts various body types to a Node.js Readable stream. + * Handles Node.js streams, web streams, and string bodies. + * + * @param body - The body to convert (Readable, ReadableStream, or string) + * @returns A Node.js Readable stream + */ +const bodyToNodeStream = (body: ExtendedAPIGatewayProxyResultBody) => { + if (isNodeReadableStream(body)) { + return body; + } + if (isWebReadableStream(body)) { + return Readable.fromWeb(body as streamWeb.ReadableStream); + } + return Readable.from(Buffer.from(body as string)); +}; + +/** + * Converts a Node.js Readable stream to a base64 encoded string. + * Handles both Buffer and string chunks by converting all to Buffers. + * + * @param stream - The Node.js Readable stream to convert + * @returns A Promise that resolves to a base64 encoded string + */ +async function nodeStreamToBase64(stream: Readable) { + const chunks: Buffer[] = []; + for await (const chunk of stream) { + chunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk)); + } + return Buffer.concat(chunks).toString('base64'); +} + export { proxyEventToWebRequest, webResponseToProxyResult, handlerResultToWebResponse, handlerResultToProxyResult, + bodyToNodeStream, + webHeadersToApiGatewayV1Headers, }; diff --git a/packages/event-handler/src/rest/index.ts b/packages/event-handler/src/rest/index.ts index 2a4d605ea4..95df1e226e 100644 --- a/packages/event-handler/src/rest/index.ts +++ b/packages/event-handler/src/rest/index.ts @@ -23,6 +23,6 @@ export { Router } from './Router.js'; export { composeMiddleware, isAPIGatewayProxyEvent, - isAPIGatewayProxyResult, + isExtendedAPIGatewayProxyResult, isHttpMethod, } from './utils.js'; diff --git a/packages/event-handler/src/rest/utils.ts b/packages/event-handler/src/rest/utils.ts index ca41f0a5a7..16b9578f25 100644 --- a/packages/event-handler/src/rest/utils.ts +++ b/packages/event-handler/src/rest/utils.ts @@ -1,11 +1,14 @@ +import { Readable } from 'node:stream'; import { isRecord, isString } from '@aws-lambda-powertools/commons/typeutils'; -import type { APIGatewayProxyEvent, APIGatewayProxyResult } from 'aws-lambda'; +import type { APIGatewayProxyEvent } from 'aws-lambda'; import type { CompiledRoute, + ExtendedAPIGatewayProxyResult, HandlerResponse, HttpMethod, Middleware, Path, + ResponseStream, ValidationResult, } from '../types/rest.js'; import { @@ -92,6 +95,28 @@ export const isHttpMethod = (method: string): method is HttpMethod => { return Object.keys(HttpVerbs).includes(method); }; +export const isNodeReadableStream = (value: unknown): value is Readable => { + return ( + value != null && + typeof value === 'object' && + value instanceof Readable && + 'readable' in value && + 'read' in value && + typeof value.read === 'function' + ); +}; + +export const isWebReadableStream = ( + value: unknown +): value is ReadableStream => { + return ( + value != null && + typeof value === 'object' && + 'getReader' in value && + typeof (value as Record).getReader === 'function' + ); +}; + /** * Type guard to check if the provided result is an API Gateway Proxy result. * @@ -100,13 +125,15 @@ export const isHttpMethod = (method: string): method is HttpMethod => { * * @param result - The result to check */ -export const isAPIGatewayProxyResult = ( +export const isExtendedAPIGatewayProxyResult = ( result: unknown -): result is APIGatewayProxyResult => { +): result is ExtendedAPIGatewayProxyResult => { if (!isRecord(result)) return false; return ( typeof result.statusCode === 'number' && - isString(result.body) && + (isString(result.body) || + isNodeReadableStream(result.body) || + isWebReadableStream(result.body)) && (result.headers === undefined || isRecord(result.headers)) && (result.multiValueHeaders === undefined || isRecord(result.multiValueHeaders)) && @@ -209,3 +236,30 @@ export const resolvePrefixedPath = (path: Path, prefix?: Path): Path => { } return path; }; + +export const HttpResponseStream = + globalThis.awslambda?.HttpResponseStream ?? + class HttpResponseStream { + static from( + underlyingStream: ResponseStream, + prelude: Record + ) { + underlyingStream.setContentType( + "'application/vnd.awslambda.http-integration-response'" + ); + + // JSON.stringify is required. NULL byte is not allowed in metadataPrelude. + const metadataPrelude = JSON.stringify(prelude); + + underlyingStream._onBeforeFirstWrite = ( + write: (data: Uint8Array | string) => void + ) => { + write(metadataPrelude); + + // Write 8 null bytes after the JSON prelude. + write(new Uint8Array(8)); + }; + + return underlyingStream; + } + }; diff --git a/packages/event-handler/src/types/common.ts b/packages/event-handler/src/types/common.ts index 8efc28bf51..b7b739d7bc 100644 --- a/packages/event-handler/src/types/common.ts +++ b/packages/event-handler/src/types/common.ts @@ -1,5 +1,6 @@ import type { AppSyncEventsResolver } from '../appsync-events/AppSyncEventsResolver.js'; import type { AppSyncGraphQLResolver } from '../appsync-graphql/AppSyncGraphQLResolver.js'; +import type { ResponseStream } from './rest.js'; // biome-ignore lint/suspicious/noExplicitAny: We intentionally use `any` here to represent any type of data and keep the logger is as flexible as possible. type Anything = any; @@ -42,4 +43,9 @@ type ResolveOptions = { scope?: unknown; }; -export type { Anything, ResolveOptions }; +type ResolveStreamOptions = { + scope?: unknown; + responseStream: ResponseStream; +}; + +export type { Anything, ResolveOptions, ResolveStreamOptions }; diff --git a/packages/event-handler/src/types/rest.ts b/packages/event-handler/src/types/rest.ts index b7a0d2e21e..5f01bcff32 100644 --- a/packages/event-handler/src/types/rest.ts +++ b/packages/event-handler/src/types/rest.ts @@ -1,11 +1,16 @@ +import type { Readable } from 'node:stream'; import type { GenericLogger, JSONObject, } from '@aws-lambda-powertools/commons/types'; -import type { APIGatewayProxyEvent, Context } from 'aws-lambda'; +import type { + APIGatewayProxyEvent, + APIGatewayProxyResult, + Context, +} from 'aws-lambda'; +import type { ResponseStream as LambdaResponseStream } from 'lambda-stream'; import type { HttpStatusCodes, HttpVerbs } from '../rest/constants.js'; import type { Route } from '../rest/Route.js'; -import type { Router } from '../rest/Router.js'; import type { ResolveOptions } from './common.js'; type RequestContext = { @@ -54,7 +59,17 @@ interface CompiledRoute { type DynamicRoute = Route & CompiledRoute; -type HandlerResponse = Response | JSONObject; +type ExtendedAPIGatewayProxyResultBody = string | Readable | ReadableStream; + +type ExtendedAPIGatewayProxyResult = Omit & { + body: ExtendedAPIGatewayProxyResultBody; +}; + +type ResponseStream = LambdaResponseStream & { + _onBeforeFirstWrite?: (write: (data: Uint8Array | string) => void) => void; +}; + +type HandlerResponse = Response | JSONObject | ExtendedAPIGatewayProxyResult; type RouteHandler = ( reqCtx: RequestContext @@ -159,6 +174,8 @@ type CompressionOptions = { }; export type { + ExtendedAPIGatewayProxyResult, + ExtendedAPIGatewayProxyResultBody, CompiledRoute, CorsOptions, DynamicRoute, @@ -173,6 +190,7 @@ export type { Path, RequestContext, RestRouterOptions, + ResponseStream, RouteHandler, RestRouteOptions, RestRouteHandlerOptions, diff --git a/packages/event-handler/tests/unit/rest/Router/decorators.test.ts b/packages/event-handler/tests/unit/rest/Router/decorators.test.ts index 98c0ab7c5b..c08914b55d 100644 --- a/packages/event-handler/tests/unit/rest/Router/decorators.test.ts +++ b/packages/event-handler/tests/unit/rest/Router/decorators.test.ts @@ -7,9 +7,27 @@ import { MethodNotAllowedError, type NotFoundError, Router, + UnauthorizedError, } from '../../../../src/rest/index.js'; import type { RequestContext } from '../../../../src/types/rest.js'; -import { createTestEvent, createTrackingMiddleware } from '../helpers.js'; +import { + createTestEvent, + createTrackingMiddleware, + MockResponseStream, + parseStreamOutput, +} from '../helpers.js'; + +const createHandler = (app: Router) => (event: unknown, _context: Context) => + app.resolve(event, _context); + +const createHandlerWithScope = + (app: Router, scope: unknown) => (event: unknown, _context: Context) => + app.resolve(event, _context, { scope }); + +const createStreamHandler = + (app: Router, scope: unknown) => + (event: unknown, _context: Context, responseStream: MockResponseStream) => + app.resolveStream(event, _context, { scope, responseStream }); describe('Class: Router - Decorators', () => { describe('decorators', () => { @@ -51,9 +69,7 @@ describe('Class: Router - Decorators', () => { return { result: 'options-test' }; } - public handler(event: unknown, _context: Context) { - return app.resolve(event, _context); - } + public handler = createHandler(app); } it.each([ @@ -101,9 +117,7 @@ describe('Class: Router - Decorators', () => { return { result: `${this.scope}: decorator-with-middleware` }; } - public handler(event: unknown, _context: Context) { - return app.resolve(event, _context, { scope: this }); - } + public handler = createHandlerWithScope(app, this); } const lambda = new Lambda(); @@ -183,9 +197,7 @@ describe('Class: Router - Decorators', () => { return { result: 'options-decorator-middleware' }; } - public handler(event: unknown, _context: Context) { - return app.resolve(event, _context); - } + public handler = createHandler(app); } const lambda = new Lambda(); @@ -231,9 +243,7 @@ describe('Class: Router - Decorators', () => { throw new BadRequestError('test error'); } - public handler(event: unknown, _context: Context) { - return app.resolve(event, _context); - } + public handler = createHandler(app); } const lambda = new Lambda(); @@ -273,9 +283,7 @@ describe('Class: Router - Decorators', () => { }; } - public handler(event: unknown, _context: Context) { - return app.resolve(event, _context, { scope: this }); - } + public handler = createHandlerWithScope(app, this); } const lambda = new Lambda(); @@ -319,9 +327,7 @@ describe('Class: Router - Decorators', () => { throw new MethodNotAllowedError('POST not allowed'); } - public handler(event: unknown, _context: Context) { - return app.resolve(event, _context); - } + public handler = createHandler(app); } const lambda = new Lambda(); @@ -366,9 +372,7 @@ describe('Class: Router - Decorators', () => { throw new BadRequestError('test error'); } - public handler(event: unknown, _context: Context) { - return app.resolve(event, _context, { scope: this }); - } + public handler = createHandlerWithScope(app, this); } const lambda = new Lambda(); @@ -407,9 +411,7 @@ describe('Class: Router - Decorators', () => { }; } - public handler(event: unknown, _context: Context) { - return app.resolve(event, _context); - } + public handler = createHandler(app); } const lambda = new Lambda(); @@ -450,9 +452,7 @@ describe('Class: Router - Decorators', () => { throw new BadRequestError('test error'); } - public handler(event: unknown, _context: Context) { - return app.resolve(event, _context); - } + public handler = createHandler(app); } const lambda = new Lambda(); @@ -481,9 +481,7 @@ describe('Class: Router - Decorators', () => { }; } - public handler(event: unknown, _context: Context) { - return app.resolve(event, _context, { scope: this }); - } + public handler = createHandlerWithScope(app, this); } const lambda = new Lambda(); @@ -503,4 +501,79 @@ describe('Class: Router - Decorators', () => { }); }); }); + + describe('streaming with decorators', () => { + it('preserves scope when using resolveStream with decorators', async () => { + // Prepare + const app = new Router(); + + class Lambda { + public scope = 'streaming-scope'; + + @app.get('/test') + public getTest() { + return { + message: `${this.scope}: streaming success`, + }; + } + + public handler = createStreamHandler(app, this); + } + + const lambda = new Lambda(); + const responseStream = new MockResponseStream(); + const handler = lambda.handler.bind(lambda); + + // Act + await handler(createTestEvent('/test', 'GET'), context, responseStream); + + // Assess + const { prelude, body } = parseStreamOutput(responseStream.chunks); + expect(prelude.statusCode).toBe(200); + expect(JSON.parse(body)).toEqual({ + message: 'streaming-scope: streaming success', + }); + }); + + it('preserves scope when handler throws error in streaming', async () => { + // Prepare + const app = new Router(); + + class Lambda { + public scope = 'error-scope'; + + @app.errorHandler(UnauthorizedError) + public handleUnauthorized(error: UnauthorizedError) { + return { + statusCode: HttpStatusCodes.UNAUTHORIZED, + error: 'Unauthorized', + message: `${this.scope}: ${error.message}`, + }; + } + + @app.get('/test') + public getTest() { + throw new UnauthorizedError('UnauthorizedError!'); + } + + public handler = createStreamHandler(app, this); + } + + const lambda = new Lambda(); + const responseStream = new MockResponseStream(); + const handler = lambda.handler.bind(lambda); + + // Act + await handler(createTestEvent('/test', 'GET'), context, responseStream); + + // Assess + const { prelude, body } = parseStreamOutput(responseStream.chunks); + expect(prelude.statusCode).toBe(401); + expect(JSON.parse(body)).toEqual({ + statusCode: HttpStatusCodes.UNAUTHORIZED, + error: 'Unauthorized', + message: 'error-scope: UnauthorizedError!', + }); + }); + }); }); diff --git a/packages/event-handler/tests/unit/rest/Router/middleware.test.ts b/packages/event-handler/tests/unit/rest/Router/middleware.test.ts index 93b6abc230..624ed246f7 100644 --- a/packages/event-handler/tests/unit/rest/Router/middleware.test.ts +++ b/packages/event-handler/tests/unit/rest/Router/middleware.test.ts @@ -1,3 +1,4 @@ +import { Readable } from 'node:stream'; import context from '@aws-lambda-powertools/testing-utils/context'; import type { Context } from 'aws-lambda'; import { describe, expect, it, vi } from 'vitest'; @@ -192,6 +193,7 @@ describe('Class: Router - Middleware', () => { app.use(({ next }) => { next(); + return Promise.resolve(); }); // Act @@ -572,6 +574,60 @@ describe('Class: Router - Middleware', () => { isBase64Encoded: false, }); }); + + it('handles middleware returning ExtendedAPIGatewayProxyResult with node stream body', async () => { + // Prepare + const app = new Router(); + const testData = 'middleware stream data'; + + app.use(async () => ({ + statusCode: 200, + body: Readable.from(Buffer.from(testData)), + })); + + app.get('/test', () => ({ success: true })); + + // Act + const result = await app.resolve( + createTestEvent('/test', 'GET'), + context + ); + + // Assess + expect(result.statusCode).toBe(200); + expect(result.isBase64Encoded).toBe(true); + expect(result.body).toBe(Buffer.from(testData).toString('base64')); + }); + + it('handles middleware returning ExtendedAPIGatewayProxyResult with web stream body', async () => { + // Prepare + const app = new Router(); + const testData = 'middleware web stream data'; + const webStream = new ReadableStream({ + start(controller) { + controller.enqueue(new TextEncoder().encode(testData)); + controller.close(); + }, + }); + + app.use(async () => ({ + statusCode: 200, + body: webStream, + })); + + app.get('/test', () => ({ success: true })); + + // Act + const result = await app.resolve( + createTestEvent('/test', 'GET'), + context + ); + + // Assess + expect(result.statusCode).toBe(200); + expect(result.isBase64Encoded).toBe(true); + expect(result.body).toBe(Buffer.from(testData).toString('base64')); + }); }); describe('middleware - route specific', () => { diff --git a/packages/event-handler/tests/unit/rest/Router/streaming.test.ts b/packages/event-handler/tests/unit/rest/Router/streaming.test.ts new file mode 100644 index 0000000000..16a172b4df --- /dev/null +++ b/packages/event-handler/tests/unit/rest/Router/streaming.test.ts @@ -0,0 +1,309 @@ +import { Readable } from 'node:stream'; +import context from '@aws-lambda-powertools/testing-utils/context'; +import { describe, expect, it, vi } from 'vitest'; +import { UnauthorizedError } from '../../../../src/rest/errors.js'; +import { Router } from '../../../../src/rest/index.js'; +import { + createTestEvent, + MockResponseStream, + parseStreamOutput, +} from '../helpers.js'; + +describe('Class: Router - Streaming', () => { + it('streams a simple JSON response', async () => { + // Prepare + const app = new Router(); + app.get('/test', async () => ({ message: 'Hello, World!' })); + + // Create a mock ResponseStream + const responseStream = new MockResponseStream(); + + // Act + await app.resolveStream(createTestEvent('/test', 'GET'), context, { + responseStream, + }); + + // Assess + const { prelude, body } = parseStreamOutput(responseStream.chunks); + expect(prelude.statusCode).toBe(200); + expect(JSON.parse(body)).toEqual({ message: 'Hello, World!' }); + }); + + it('streams a Response object', async () => { + // Prepare + const app = new Router(); + app.get('/test', () => { + return new Response(JSON.stringify({ data: 'test' }), { + status: 201, + headers: { 'Content-Type': 'application/json' }, + }); + }); + + // Create a mock ResponseStream + const responseStream = new MockResponseStream(); + + // Act + await app.resolveStream(createTestEvent('/test', 'GET'), context, { + responseStream, + }); + + // Assess + const { prelude, body } = parseStreamOutput(responseStream.chunks); + expect(prelude.statusCode).toBe(201); + expect(JSON.parse(body)).toEqual({ data: 'test' }); + }); + + it('handles route not found', async () => { + // Prepare + const app = new Router(); + const responseStream = new MockResponseStream(); + + // Act + await app.resolveStream(createTestEvent('/nonexistent', 'GET'), context, { + responseStream, + }); + + // Assess + const { prelude, body } = parseStreamOutput(responseStream.chunks); + expect(prelude.statusCode).toBe(404); + const parsedBody = JSON.parse(body); + expect(parsedBody.statusCode).toBe(404); + }); + + it('works with middleware', async () => { + // Prepare + const app = new Router(); + + // Add middleware that modifies the response + app.use(async ({ reqCtx, next }) => { + await next(); + reqCtx.res.headers.set('X-Custom-Header', 'test-value'); + }); + + app.get('/test', () => ({ message: 'middleware test' })); + + const responseStream = new MockResponseStream(); + + // Act + await app.resolveStream(createTestEvent('/test', 'GET'), context, { + responseStream, + }); + + // Assess + const { prelude, body } = parseStreamOutput(responseStream.chunks); + expect(prelude.statusCode).toBe(200); + expect(prelude.headers['x-custom-header']).toBe('test-value'); + expect(JSON.parse(body)).toEqual({ message: 'middleware test' }); + }); + + it('handles thrown errors', async () => { + // Prepare + const app = new Router(); + app.get('/test', () => { + throw new UnauthorizedError('Access denied'); + }); + + const responseStream = new MockResponseStream(); + + // Act + await app.resolveStream(createTestEvent('/test', 'GET'), context, { + responseStream, + }); + + // Assess + const { prelude, body } = parseStreamOutput(responseStream.chunks); + expect(prelude.statusCode).toBe(401); + const parsedBody = JSON.parse(body); + expect(parsedBody.message).toBe('Access denied'); + }); + + it('works with error handlers', async () => { + // Prepare + const app = new Router(); + + app.errorHandler(UnauthorizedError, async (error) => ({ + statusCode: 401, + message: `Custom: ${error.message}`, + })); + + app.get('/test', () => { + throw new UnauthorizedError('handler error'); + }); + + const responseStream = new MockResponseStream(); + + // Act + await app.resolveStream(createTestEvent('/test', 'GET'), context, { + responseStream, + }); + + // Assess + const { prelude, body } = parseStreamOutput(responseStream.chunks); + expect(prelude.statusCode).toBe(401); + expect(JSON.parse(body)).toEqual({ + statusCode: 401, + message: 'Custom: handler error', + }); + }); + + it.each([ + [ + 'string body', + () => ({ statusCode: 200, body: '{"message":"string body"}' }), + ], + [ + 'node stream body', + () => ({ + statusCode: 200, + body: Readable.from(Buffer.from('{"message":"node stream body"}')), + }), + ], + [ + 'web stream body', + () => ({ + statusCode: 200, + body: new ReadableStream({ + start(controller) { + controller.enqueue( + new TextEncoder().encode('{"message":"web stream body"}') + ); + controller.close(); + }, + }), + }), + ], + ])('handles ExtendedAPIGatewayProxyResult with %s', async (_, handlerFn) => { + // Prepare + const app = new Router(); + app.get('/test', handlerFn); + const responseStream = new MockResponseStream(); + + // Act + await app.resolveStream(createTestEvent('/test', 'GET'), context, { + responseStream, + }); + + // Assess + const { prelude, body } = parseStreamOutput(responseStream.chunks); + expect(prelude.statusCode).toBe(200); + expect(JSON.parse(body).message).toMatch(/body$/); + }); + + it('handles Response with no body', async () => { + // Prepare + const app = new Router(); + app.get('/test', () => new Response(null, { status: 204 })); + const responseStream = new MockResponseStream(); + + // Act + await app.resolveStream(createTestEvent('/test', 'GET'), context, { + responseStream, + }); + + // Assess + const { prelude, body } = parseStreamOutput(responseStream.chunks); + expect(prelude.statusCode).toBe(204); + expect(body).toBe(''); + }); + + it('handles Response with undefined body', async () => { + // Prepare + const app = new Router(); + app.get('/test', () => new Response(undefined, { status: 200 })); + const responseStream = new MockResponseStream(); + + // Act + await app.resolveStream(createTestEvent('/test', 'GET'), context, { + responseStream, + }); + + // Assess + const { prelude, body } = parseStreamOutput(responseStream.chunks); + expect(prelude.statusCode).toBe(200); + expect(body).toBe(''); + }); + + it('handles pipeline errors during streaming', async () => { + // Prepare + const app = new Router(); + const errorStream = new ReadableStream({ + start(controller) { + controller.error(new Error('Stream error')); + }, + }); + + app.get('/test', () => new Response(errorStream, { status: 200 })); + const responseStream = new MockResponseStream(); + + // Act & Assess + await expect( + app.resolveStream(createTestEvent('/test', 'GET'), context, { + responseStream, + }) + ).rejects.toThrow('Stream error'); + }); + + it('extracts route parameters correctly', async () => { + // Prepare + const app = new Router(); + let capturedParams: Record = {}; + + app.get('/users/:userId/posts/:postId', ({ params }) => { + capturedParams = params; + return { userId: params.userId, postId: params.postId }; + }); + + const responseStream = new MockResponseStream(); + + // Act + await app.resolveStream( + createTestEvent('/users/123/posts/456', 'GET'), + context, + { responseStream } + ); + + // Assess + const { prelude, body } = parseStreamOutput(responseStream.chunks); + expect(prelude.statusCode).toBe(200); + expect(capturedParams).toEqual({ userId: '123', postId: '456' }); + expect(JSON.parse(body)).toEqual({ userId: '123', postId: '456' }); + }); + + it('uses default error handler for unregistered errors', async () => { + // Prepare + vi.stubEnv('POWERTOOLS_DEV', 'true'); + const app = new Router(); + app.get('/test', () => { + throw new Error('Unhandled error'); + }); + + const responseStream = new MockResponseStream(); + + // Act + await app.resolveStream(createTestEvent('/test', 'GET'), context, { + responseStream, + }); + + // Assess + const { prelude, body } = parseStreamOutput(responseStream.chunks); + expect(prelude.statusCode).toBe(500); + const parsedBody = JSON.parse(body); + expect(parsedBody.statusCode).toBe(500); + expect(parsedBody.error).toBe('Internal Server Error'); + expect(parsedBody.message).toBe('Unhandled error'); + expect(parsedBody.stack).toBeDefined(); + expect(parsedBody.details).toEqual({ errorName: 'Error' }); + }); + + it('throws InternalServerError for invalid events', async () => { + // Prepare + const app = new Router(); + const invalidEvent = { invalid: 'event' }; + const responseStream = new MockResponseStream(); + + // Act & Assess + await expect( + app.resolveStream(invalidEvent, context, { responseStream }) + ).rejects.toThrow(); + }); +}); diff --git a/packages/event-handler/tests/unit/rest/converters.test.ts b/packages/event-handler/tests/unit/rest/converters.test.ts index e6f5a9a893..41be5e7d69 100644 --- a/packages/event-handler/tests/unit/rest/converters.test.ts +++ b/packages/event-handler/tests/unit/rest/converters.test.ts @@ -1,4 +1,6 @@ +import { Readable } from 'node:stream'; import { describe, expect, it } from 'vitest'; +import { bodyToNodeStream } from '../../../src/rest/converters.js'; import { handlerResultToProxyResult, handlerResultToWebResponse, @@ -458,7 +460,7 @@ describe('Converters', () => { }); describe('handlerResultToProxyResult', () => { - it('returns APIGatewayProxyResult as-is', async () => { + it('returns ExtendedAPIGatewayProxyResult with string body as-is', async () => { // Prepare const proxyResult = { statusCode: 200, @@ -471,7 +473,88 @@ describe('Converters', () => { const result = await handlerResultToProxyResult(proxyResult); // Assess - expect(result).toBe(proxyResult); + expect(result).toEqual({ + statusCode: 200, + body: 'test', + headers: { 'content-type': 'text/plain' }, + isBase64Encoded: false, + }); + }); + + it('converts ExtendedAPIGatewayProxyResult with Node.js Buffer stream body to base64', async () => { + // Prepare + const stream = Readable.from([ + Buffer.from('Hello'), + Buffer.from(' '), + Buffer.from('World'), + ]); + const proxyResult = { + statusCode: 200, + body: stream, + headers: { 'content-type': 'application/octet-stream' }, + isBase64Encoded: false, + }; + + // Act + const result = await handlerResultToProxyResult(proxyResult); + + // Assess + expect(result.statusCode).toBe(200); + expect(result.isBase64Encoded).toBe(true); + expect(result.body).toBe(Buffer.from('Hello World').toString('base64')); + expect(result.headers).toEqual({ + 'content-type': 'application/octet-stream', + }); + }); + + it('converts ExtendedAPIGatewayProxyResult with Node.js string stream body to base64', async () => { + // Prepare + const stream = Readable.from(['Hello', ' ', 'World']); + const proxyResult = { + statusCode: 200, + body: stream, + headers: { 'content-type': 'application/octet-stream' }, + isBase64Encoded: false, + }; + + // Act + const result = await handlerResultToProxyResult(proxyResult); + + // Assess + expect(result.statusCode).toBe(200); + expect(result.isBase64Encoded).toBe(true); + expect(result.body).toBe(Buffer.from('Hello World').toString('base64')); + expect(result.headers).toEqual({ + 'content-type': 'application/octet-stream', + }); + }); + + it('converts ExtendedAPIGatewayProxyResult with web stream body to base64', async () => { + // Prepare + const webStream = new ReadableStream({ + start(controller) { + controller.enqueue(new TextEncoder().encode('Hello')); + controller.enqueue(new TextEncoder().encode(' World')); + controller.close(); + }, + }); + const proxyResult = { + statusCode: 200, + body: webStream, + headers: { 'content-type': 'application/octet-stream' }, + isBase64Encoded: false, + }; + + // Act + const result = await handlerResultToProxyResult(proxyResult); + + // Assess + expect(result.statusCode).toBe(200); + expect(result.isBase64Encoded).toBe(true); + expect(result.body).toBe(Buffer.from('Hello World').toString('base64')); + expect(result.headers).toEqual({ + 'content-type': 'application/octet-stream', + }); }); it('converts Response object', async () => { @@ -642,4 +725,50 @@ describe('Converters', () => { expect(result.text()).resolves.toBe('Hello'); }); }); + + describe('bodyToNodeStream', () => { + it('returns Node.js Readable stream as-is', () => { + // Prepare + const nodeStream = Readable.from(['Hello World']); + + // Act + const result = bodyToNodeStream(nodeStream); + + // Assess + expect(result).toBe(nodeStream); + }); + + it('converts Web ReadableStream to Node.js Readable stream', () => { + // Prepare + const webStream = new ReadableStream({ + start(controller) { + controller.enqueue(new TextEncoder().encode('Hello World')); + controller.close(); + }, + }); + + // Act + const result = bodyToNodeStream(webStream); + + // Assess + expect(result).toBeInstanceOf(Readable); + }); + + it('converts string body to Node.js Readable stream', async () => { + // Prepare + const stringBody = 'Hello World'; + + // Act + const stream = bodyToNodeStream(stringBody); + + // Assess + expect(stream).toBeInstanceOf(Readable); + const chunks: Buffer[] = []; + for await (const chunk of stream) { + chunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk)); + } + const result = Buffer.concat(chunks).toString(); + expect(result).toBe('Hello World'); + }); + }); }); diff --git a/packages/event-handler/tests/unit/rest/helpers.ts b/packages/event-handler/tests/unit/rest/helpers.ts index aeb4e51c2d..5ec94eb621 100644 --- a/packages/event-handler/tests/unit/rest/helpers.ts +++ b/packages/event-handler/tests/unit/rest/helpers.ts @@ -1,4 +1,5 @@ import type { APIGatewayProxyEvent } from 'aws-lambda'; +import { ResponseStream } from 'lambda-stream'; import type { HandlerResponse, Middleware } from '../../../src/types/rest.js'; export const createTestEvent = ( @@ -53,7 +54,7 @@ export const createReturningMiddleware = ( ): Middleware => { return () => { executionOrder.push(name); - return response; + return Promise.resolve(response); }; }; @@ -63,6 +64,7 @@ export const createNoNextMiddleware = ( ): Middleware => { return () => { executionOrder.push(name); + return Promise.resolve(); // Intentionally doesn't call next() }; }; @@ -88,3 +90,42 @@ export const createHeaderCheckMiddleware = (headers: { await next(); }; }; + +// Mock ResponseStream that extends the actual ResponseStream class +export class MockResponseStream extends ResponseStream { + public chunks: Buffer[] = []; + public _onBeforeFirstWrite?: ( + write: (data: Uint8Array | string) => void + ) => void; + private firstWrite = true; + + _write(chunk: Buffer, _encoding: string, callback: () => void): void { + if (this.firstWrite && this._onBeforeFirstWrite) { + this._onBeforeFirstWrite((data: Uint8Array | string) => { + this.chunks.push(Buffer.from(data)); + }); + this.firstWrite = false; + } + this.chunks.push(chunk); + callback(); + } +} + +// Helper to parse streaming response format +export function parseStreamOutput(chunks: Buffer[]) { + const output = Buffer.concat(chunks); + const nullBytes = Buffer.from([0, 0, 0, 0, 0, 0, 0, 0]); + const separatorIndex = output.indexOf(nullBytes); + + if (separatorIndex === -1) { + return { prelude: null, body: output.toString() }; + } + + const preludeBuffer = output.subarray(0, separatorIndex); + const bodyBuffer = output.subarray(separatorIndex + 8); + + return { + prelude: JSON.parse(preludeBuffer.toString()), + body: bodyBuffer.toString(), + }; +} diff --git a/packages/event-handler/tests/unit/rest/utils.test.ts b/packages/event-handler/tests/unit/rest/utils.test.ts index 927fae01eb..478b7225bb 100644 --- a/packages/event-handler/tests/unit/rest/utils.test.ts +++ b/packages/event-handler/tests/unit/rest/utils.test.ts @@ -3,11 +3,11 @@ import type { APIGatewayProxyResult, Context, } from 'aws-lambda'; -import { describe, expect, it } from 'vitest'; +import { beforeEach, describe, expect, it, vi } from 'vitest'; import { composeMiddleware, isAPIGatewayProxyEvent, - isAPIGatewayProxyResult, + isExtendedAPIGatewayProxyResult, } from '../../../src/rest/index.js'; import { compilePath, @@ -21,6 +21,9 @@ import type { } from '../../../src/types/rest.js'; describe('Path Utilities', () => { + beforeEach(() => { + vi.unstubAllGlobals(); + }); describe('validatePathPattern', () => { it.each([ { path: '/users/:id', expected: true, issues: [] }, @@ -390,7 +393,7 @@ describe('Path Utilities', () => { body: 'Hello World', }; - expect(isAPIGatewayProxyResult(validResult)).toBe(true); + expect(isExtendedAPIGatewayProxyResult(validResult)).toBe(true); }); it('should return true for valid result with all optional fields', () => { @@ -402,7 +405,7 @@ describe('Path Utilities', () => { isBase64Encoded: false, }; - expect(isAPIGatewayProxyResult(validResult)).toBe(true); + expect(isExtendedAPIGatewayProxyResult(validResult)).toBe(true); }); it.each([ @@ -412,7 +415,7 @@ describe('Path Utilities', () => { { case: 'number', result: 123 }, { case: 'array', result: [] }, ])('should return false for $case', ({ result }) => { - expect(isAPIGatewayProxyResult(result)).toBe(false); + expect(isExtendedAPIGatewayProxyResult(result)).toBe(false); }); it.each([ @@ -432,7 +435,7 @@ describe('Path Utilities', () => { }; const invalidResult = { ...baseResult, [field]: value }; - expect(isAPIGatewayProxyResult(invalidResult)).toBe(false); + expect(isExtendedAPIGatewayProxyResult(invalidResult)).toBe(false); } ); @@ -442,7 +445,7 @@ describe('Path Utilities', () => { // missing body }; - expect(isAPIGatewayProxyResult(incompleteResult)).toBe(false); + expect(isExtendedAPIGatewayProxyResult(incompleteResult)).toBe(false); }); }); @@ -475,6 +478,7 @@ describe('Path Utilities', () => { reqCtx: mockOptions, next: () => { executionOrder.push('handler'); + return Promise.resolve(); }, }); @@ -493,7 +497,7 @@ describe('Path Utilities', () => { await next(); }, () => { - return { shortCircuit: true }; + return Promise.resolve({ shortCircuit: true }); }, ]; @@ -501,7 +505,7 @@ describe('Path Utilities', () => { const result = await composed({ reqCtx: mockOptions, next: () => { - return { handler: true }; + return Promise.resolve({ handler: true }); }, }); @@ -519,7 +523,7 @@ describe('Path Utilities', () => { const result = await composed({ reqCtx: mockOptions, next: () => { - return { handler: true }; + return Promise.resolve({ handler: true }); }, }); @@ -546,7 +550,7 @@ describe('Path Utilities', () => { const result = await composed({ reqCtx: mockOptions, next: () => { - return { handler: true }; + return Promise.resolve({ handler: true }); }, }); @@ -564,7 +568,7 @@ describe('Path Utilities', () => { const result = await composed({ reqCtx: mockOptions, next: () => { - return undefined; + return Promise.resolve(undefined); }, }); @@ -585,4 +589,21 @@ describe('Path Utilities', () => { expect(resolvedPath).toBe(expected); }); }); + + describe('HttpResponseStream', () => { + it('uses globalThis.awslambda.HttpResponseStream when available', async () => { + // Prepare + const mockHttpResponseStream = { from: vi.fn() }; + vi.stubGlobal('awslambda', { + HttpResponseStream: mockHttpResponseStream, + }); + + // Clear module cache and dynamically import + vi.resetModules(); + const { HttpResponseStream } = await import('../../../src/rest/utils.js'); + + // Assert + expect(HttpResponseStream).toBe(mockHttpResponseStream); + }); + }); }); From df92e5d6d13917ced105dc47c94385388e3470fa Mon Sep 17 00:00:00 2001 From: svozza Date: Wed, 1 Oct 2025 13:36:02 +0100 Subject: [PATCH 2/2] address PR comments --- package-lock.json | 9 +--- packages/event-handler/package.json | 3 +- packages/event-handler/src/rest/Router.ts | 2 +- packages/event-handler/src/rest/utils.ts | 10 +++- packages/event-handler/src/types/common.ts | 11 +--- packages/event-handler/src/types/rest.ts | 54 ++++++++++++++++--- .../tests/unit/rest/Router/middleware.test.ts | 4 +- .../event-handler/tests/unit/rest/helpers.ts | 10 ++-- 8 files changed, 67 insertions(+), 36 deletions(-) diff --git a/package-lock.json b/package-lock.json index 2baf438acc..e2c21cb6b5 100644 --- a/package-lock.json +++ b/package-lock.json @@ -7986,12 +7986,6 @@ "node": ">= 12" } }, - "node_modules/lambda-stream": { - "version": "0.6.0", - "resolved": "https://registry.npmjs.org/lambda-stream/-/lambda-stream-0.6.0.tgz", - "integrity": "sha512-S625+Jdil56zFOtfBt5r0snP8SIBY7fXn7pLKXQ8C6u9A5kNSuEWcEVC05QoDkvBLjMhwBeIUwVU6PoOB7ll5w==", - "license": "MIT" - }, "node_modules/layers": { "resolved": "layers", "link": true @@ -10692,8 +10686,7 @@ "version": "2.27.0", "license": "MIT-0", "dependencies": { - "@aws-lambda-powertools/commons": "2.27.0", - "lambda-stream": "0.6.0" + "@aws-lambda-powertools/commons": "2.27.0" } }, "packages/idempotency": { diff --git a/packages/event-handler/package.json b/packages/event-handler/package.json index 9c225e3736..8b6e8bcc6d 100644 --- a/packages/event-handler/package.json +++ b/packages/event-handler/package.json @@ -125,8 +125,7 @@ "url": "https://github.com/aws-powertools/powertools-lambda-typescript/issues" }, "dependencies": { - "@aws-lambda-powertools/commons": "2.27.0", - "lambda-stream": "0.6.0" + "@aws-lambda-powertools/commons": "2.27.0" }, "keywords": [ "aws", diff --git a/packages/event-handler/src/rest/Router.ts b/packages/event-handler/src/rest/Router.ts index 04c679e539..6c975d4f33 100644 --- a/packages/event-handler/src/rest/Router.ts +++ b/packages/event-handler/src/rest/Router.ts @@ -7,7 +7,6 @@ import { isDevMode, } from '@aws-lambda-powertools/commons/utils/env'; import type { APIGatewayProxyResult, Context } from 'aws-lambda'; -import type { ResolveStreamOptions } from '../types/common.js'; import type { HandlerResponse, ResolveOptions } from '../types/index.js'; import type { ErrorConstructor, @@ -17,6 +16,7 @@ import type { Middleware, Path, RequestContext, + ResolveStreamOptions, ResponseStream, RestRouteOptions, RestRouterOptions, diff --git a/packages/event-handler/src/rest/utils.ts b/packages/event-handler/src/rest/utils.ts index 16b9578f25..b0943d3e4f 100644 --- a/packages/event-handler/src/rest/utils.ts +++ b/packages/event-handler/src/rest/utils.ts @@ -1,4 +1,4 @@ -import { Readable } from 'node:stream'; +import { Readable, Writable } from 'node:stream'; import { isRecord, isString } from '@aws-lambda-powertools/commons/typeutils'; import type { APIGatewayProxyEvent } from 'aws-lambda'; import type { @@ -239,7 +239,13 @@ export const resolvePrefixedPath = (path: Path, prefix?: Path): Path => { export const HttpResponseStream = globalThis.awslambda?.HttpResponseStream ?? - class HttpResponseStream { + class LocalHttpResponseStream extends Writable { + #contentType: string | undefined; + + setContentType(contentType: string) { + this.#contentType = contentType; + } + static from( underlyingStream: ResponseStream, prelude: Record diff --git a/packages/event-handler/src/types/common.ts b/packages/event-handler/src/types/common.ts index b7b739d7bc..58e675e41d 100644 --- a/packages/event-handler/src/types/common.ts +++ b/packages/event-handler/src/types/common.ts @@ -1,7 +1,3 @@ -import type { AppSyncEventsResolver } from '../appsync-events/AppSyncEventsResolver.js'; -import type { AppSyncGraphQLResolver } from '../appsync-graphql/AppSyncGraphQLResolver.js'; -import type { ResponseStream } from './rest.js'; - // biome-ignore lint/suspicious/noExplicitAny: We intentionally use `any` here to represent any type of data and keep the logger is as flexible as possible. type Anything = any; @@ -43,9 +39,4 @@ type ResolveOptions = { scope?: unknown; }; -type ResolveStreamOptions = { - scope?: unknown; - responseStream: ResponseStream; -}; - -export type { Anything, ResolveOptions, ResolveStreamOptions }; +export type { Anything, ResolveOptions }; diff --git a/packages/event-handler/src/types/rest.ts b/packages/event-handler/src/types/rest.ts index 5f01bcff32..ae7f9f8d5f 100644 --- a/packages/event-handler/src/types/rest.ts +++ b/packages/event-handler/src/types/rest.ts @@ -8,9 +8,9 @@ import type { APIGatewayProxyResult, Context, } from 'aws-lambda'; -import type { ResponseStream as LambdaResponseStream } from 'lambda-stream'; import type { HttpStatusCodes, HttpVerbs } from '../rest/constants.js'; import type { Route } from '../rest/Route.js'; +import type { HttpResponseStream } from '../rest/utils.js'; import type { ResolveOptions } from './common.js'; type RequestContext = { @@ -65,10 +65,6 @@ type ExtendedAPIGatewayProxyResult = Omit & { body: ExtendedAPIGatewayProxyResultBody; }; -type ResponseStream = LambdaResponseStream & { - _onBeforeFirstWrite?: (write: (data: Uint8Array | string) => void) => void; -}; - type HandlerResponse = Response | JSONObject | ExtendedAPIGatewayProxyResult; type RouteHandler = ( @@ -126,6 +122,51 @@ type ValidationResult = { issues: string[]; }; +type ResponseStream = InstanceType & { + _onBeforeFirstWrite?: (write: (data: Uint8Array | string) => void) => void; +}; + +/** + * Object to pass to the {@link Router.resolveStream | `Router.resolveStream()`} method. + */ +type ResolveStreamOptions = { + /** + * Reference to `this` instance of the class that is calling the `resolveStream` method. + * + * This parameter should be used only when using {@link Router} route decorators like + * {@link Router.get | `Router.get()`}, {@link Router.post | `Router.post()`}, etc. as class method decorators, and + * it's used to bind the decorated methods to your class instance. + * + * @example + * ```ts + * import { Router } from '@aws-lambda-powertools/event-handler/experimental-rest'; + * + * const app = new Router(); + * + * class Lambda { + * public scope = 'scoped'; + * + * @app.get('/test') + * public async getTest() { + * return { message: `${this.scope}: success` }; + * } + * + * public async handler(event: unknown, context: Context, responseStream: ResponseStream) { + * return app.resolveStream(event, context, { scope: this, responseStream }); + * } + * } + * const lambda = new Lambda(); + * const handler = lambda.handler.bind(lambda); + * ``` + */ + scope?: unknown; + /** + * The Lambda response stream used for streaming responses directly to the client. + * This stream is provided by the AWS Lambda runtime for response streaming. + */ + responseStream: ResponseStream; +}; + /** * Configuration options for CORS middleware */ @@ -190,8 +231,9 @@ export type { Path, RequestContext, RestRouterOptions, - ResponseStream, RouteHandler, + ResolveStreamOptions, + ResponseStream, RestRouteOptions, RestRouteHandlerOptions, RouteRegistryOptions, diff --git a/packages/event-handler/tests/unit/rest/Router/middleware.test.ts b/packages/event-handler/tests/unit/rest/Router/middleware.test.ts index 624ed246f7..bcdc41c16b 100644 --- a/packages/event-handler/tests/unit/rest/Router/middleware.test.ts +++ b/packages/event-handler/tests/unit/rest/Router/middleware.test.ts @@ -191,9 +191,9 @@ describe('Class: Router - Middleware', () => { await next(); }); - app.use(({ next }) => { + // biome-ignore lint/suspicious/useAwait: This specifically tests a missing await call in an async function + app.use(async ({ next }) => { next(); - return Promise.resolve(); }); // Act diff --git a/packages/event-handler/tests/unit/rest/helpers.ts b/packages/event-handler/tests/unit/rest/helpers.ts index 5ec94eb621..1a6163e763 100644 --- a/packages/event-handler/tests/unit/rest/helpers.ts +++ b/packages/event-handler/tests/unit/rest/helpers.ts @@ -1,5 +1,5 @@ import type { APIGatewayProxyEvent } from 'aws-lambda'; -import { ResponseStream } from 'lambda-stream'; +import { HttpResponseStream } from '../../../src/rest/utils.js'; import type { HandlerResponse, Middleware } from '../../../src/types/rest.js'; export const createTestEvent = ( @@ -92,19 +92,19 @@ export const createHeaderCheckMiddleware = (headers: { }; // Mock ResponseStream that extends the actual ResponseStream class -export class MockResponseStream extends ResponseStream { +export class MockResponseStream extends HttpResponseStream { public chunks: Buffer[] = []; public _onBeforeFirstWrite?: ( write: (data: Uint8Array | string) => void ) => void; - private firstWrite = true; + #firstWrite = true; _write(chunk: Buffer, _encoding: string, callback: () => void): void { - if (this.firstWrite && this._onBeforeFirstWrite) { + if (this.#firstWrite && this._onBeforeFirstWrite) { this._onBeforeFirstWrite((data: Uint8Array | string) => { this.chunks.push(Buffer.from(data)); }); - this.firstWrite = false; + this.#firstWrite = false; } this.chunks.push(chunk); callback();