Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 83 additions & 21 deletions packages/event-handler/src/rest/Router.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
import { Readable } from 'node:stream';
import { pipeline } from 'node:stream/promises';
import type streamWeb from 'node:stream/web';
import type { GenericLogger } from '@aws-lambda-powertools/commons/types';
import {
getStringFromEnv,
isDevMode,
} from '@aws-lambda-powertools/commons/utils/env';
import type { APIGatewayProxyResult, Context } from 'aws-lambda';
import type {
HandlerResponse,
HttpStatusCode,
ResolveOptions,
} from '../types/index.js';
import type { HandlerResponse, ResolveOptions } from '../types/index.js';
import type {
ErrorConstructor,
ErrorHandler,
Expand All @@ -17,6 +16,8 @@ import type {
Middleware,
Path,
RequestContext,
ResolveStreamOptions,
ResponseStream,
RestRouteOptions,
RestRouterOptions,
RouteHandler,
Expand All @@ -26,6 +27,7 @@ import {
handlerResultToProxyResult,
handlerResultToWebResponse,
proxyEventToWebRequest,
webHeadersToApiGatewayV1Headers,
} from './converters.js';
import { ErrorHandlerRegistry } from './ErrorHandlerRegistry.js';
import {
Expand All @@ -38,8 +40,9 @@ import { Route } from './Route.js';
import { RouteHandlerRegistry } from './RouteHandlerRegistry.js';
import {
composeMiddleware,
HttpResponseStream,
isAPIGatewayProxyEvent,
isAPIGatewayProxyResult,
isExtendedAPIGatewayProxyResult,
isHttpMethod,
resolvePrefixedPath,
} from './utils.js';
Expand Down Expand Up @@ -187,21 +190,19 @@ class Router {
}

/**
* Resolves an API Gateway event by routing it to the appropriate handler
* and converting the result to an API Gateway proxy result. Handles errors
* using registered error handlers or falls back to default error handling
* (500 Internal Server Error).
* Core resolution logic shared by both resolve and resolveStream methods.
* Validates the event, routes to handlers, executes middleware, and handles errors.
*
* @param event - The Lambda event to resolve
* @param context - The Lambda context
* @param options - Optional resolve options for scope binding
* @returns An API Gateway proxy result
* @returns A handler response (Response, JSONObject, or ExtendedAPIGatewayProxyResult)
*/
public async resolve(
async #resolve(
event: unknown,
context: Context,
options?: ResolveOptions
): Promise<APIGatewayProxyResult> {
): Promise<HandlerResponse> {
if (!isAPIGatewayProxyEvent(event)) {
this.logger.error(
'Received an event that is not compatible with this resolver'
Expand Down Expand Up @@ -276,18 +277,79 @@ class Router {
});

// middleware result takes precedence to allow short-circuiting
const result = middlewareResult ?? requestContext.res;

return handlerResultToProxyResult(result);
return middlewareResult ?? requestContext.res;
} catch (error) {
this.logger.debug(`There was an error processing the request: ${error}`);
const result = await this.handleError(error as Error, {
return this.handleError(error as Error, {
...requestContext,
scope: options?.scope,
});
const statusCode =
result instanceof Response ? result.status : result.statusCode;
return handlerResultToProxyResult(result, statusCode as HttpStatusCode);
}
}

/**
* Resolves an API Gateway event by routing it to the appropriate handler
* and converting the result to an API Gateway proxy result. Handles errors
* using registered error handlers or falls back to default error handling
* (500 Internal Server Error).
*
* @param event - The Lambda event to resolve
* @param context - The Lambda context
* @param options - Optional resolve options for scope binding
* @returns An API Gateway proxy result
*/
public async resolve(
event: unknown,
context: Context,
options?: ResolveOptions
): Promise<APIGatewayProxyResult> {
const result = await this.#resolve(event, context, options);
return handlerResultToProxyResult(result);
}

/**
* Resolves an API Gateway event by routing it to the appropriate handler
* and streaming the response directly to the provided response stream.
* Used for Lambda response streaming.
*
* @param event - The Lambda event to resolve
* @param context - The Lambda context
* @param options - Stream resolve options including the response stream
*/
public async resolveStream(
event: unknown,
context: Context,
options: ResolveStreamOptions
): Promise<void> {
const result = await this.#resolve(event, context, options);
await this.#streamHandlerResponse(result, options.responseStream);
}

/**
* Streams a handler response to the Lambda response stream.
* Converts the response to a web response and pipes it through the stream.
*
* @param response - The handler response to stream
* @param responseStream - The Lambda response stream to write to
*/
async #streamHandlerResponse(
response: HandlerResponse,
responseStream: ResponseStream
) {
const webResponse = handlerResultToWebResponse(response);
const { headers } = webHeadersToApiGatewayV1Headers(webResponse.headers);
const resStream = HttpResponseStream.from(responseStream, {
statusCode: webResponse.status,
headers,
});

if (webResponse.body) {
const nodeStream = Readable.fromWeb(
webResponse.body as streamWeb.ReadableStream
);
await pipeline(nodeStream, resStream);
} else {
resStream.write('');
}
}

Expand Down Expand Up @@ -320,7 +382,7 @@ class Router {
try {
const { scope, ...reqCtx } = options;
const body = await handler.apply(scope ?? this, [error, reqCtx]);
if (body instanceof Response || isAPIGatewayProxyResult(body)) {
if (body instanceof Response || isExtendedAPIGatewayProxyResult(body)) {
return body;
}
if (!body.statusCode) {
Expand Down
147 changes: 123 additions & 24 deletions packages/event-handler/src/rest/converters.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,20 @@
import { Readable } from 'node:stream';
import type streamWeb from 'node:stream/web';
import { isString } from '@aws-lambda-powertools/commons/typeutils';
import type { APIGatewayProxyEvent, APIGatewayProxyResult } from 'aws-lambda';
import type {
CompressionOptions,
ExtendedAPIGatewayProxyResult,
ExtendedAPIGatewayProxyResultBody,
HandlerResponse,
HttpStatusCode,
} from '../types/rest.js';
import { COMPRESSION_ENCODING_TYPES, HttpStatusCodes } from './constants.js';
import { isAPIGatewayProxyResult } from './utils.js';
import {
isExtendedAPIGatewayProxyResult,
isNodeReadableStream,
isWebReadableStream,
} from './utils.js';

/**
* Creates a request body from API Gateway event body, handling base64 decoding if needed.
Expand Down Expand Up @@ -72,18 +81,17 @@ const proxyEventToWebRequest = (event: APIGatewayProxyEvent): Request => {
};

/**
* Converts a Web API Response object to an API Gateway proxy result.
* Converts Web API Headers to API Gateway v1 headers format.
* Splits multi-value headers by comma and organizes them into separate objects.
*
* @param response - The Web API Response object
* @returns An API Gateway proxy result
* @param webHeaders - The Web API Headers object
* @returns Object containing headers and multiValueHeaders
*/
const webResponseToProxyResult = async (
response: Response
): Promise<APIGatewayProxyResult> => {
const webHeadersToApiGatewayV1Headers = (webHeaders: Headers) => {
const headers: Record<string, string> = {};
const multiValueHeaders: Record<string, Array<string>> = {};

for (const [key, value] of response.headers.entries()) {
for (const [key, value] of webHeaders.entries()) {
const values = value.split(',').map((v) => v.trimStart());
if (values.length > 1) {
multiValueHeaders[key] = values;
Expand All @@ -92,6 +100,25 @@ const webResponseToProxyResult = async (
}
}

return {
headers,
multiValueHeaders,
};
};

/**
* Converts a Web API Response object to an API Gateway proxy result.
*
* @param response - The Web API Response object
* @returns An API Gateway proxy result
*/
const webResponseToProxyResult = async (
response: Response
): Promise<APIGatewayProxyResult> => {
const { headers, multiValueHeaders } = webHeadersToApiGatewayV1Headers(
response.headers
);

// Check if response contains compressed/binary content
const contentEncoding = response.headers.get(
'content-encoding'
Expand Down Expand Up @@ -129,18 +156,47 @@ const webResponseToProxyResult = async (
return result;
};

/**
* Adds headers from an ExtendedAPIGatewayProxyResult to a Headers object.
*
* @param headers - The Headers object to mutate
* @param response - The response containing headers to add
* @remarks This function mutates the headers object by adding entries from
* response.headers and response.multiValueHeaders
*/
function addProxyEventHeaders(
headers: Headers,
response: ExtendedAPIGatewayProxyResult
) {
for (const [key, value] of Object.entries(response.headers ?? {})) {
if (value != null) {
headers.set(key, String(value));
}
}

for (const [key, values] of Object.entries(
response.multiValueHeaders ?? {}
)) {
for (const value of values ?? []) {
headers.append(key, String(value));
}
}
}

/**
* Converts a handler response to a Web API Response object.
* Handles APIGatewayProxyResult, Response objects, and plain objects.
*
* @param response - The handler response (APIGatewayProxyResult, Response, or plain object)
* @param resHeaders - Optional headers to be included in the response
* @returns A Web API Response object
*/
const handlerResultToWebResponse = (
response: HandlerResponse,
resHeaders?: Headers
): Response => {
if (response instanceof Response) {
if (resHeaders === undefined) return response;
const headers = new Headers(resHeaders);
for (const [key, value] of response.headers.entries()) {
headers.set(key, value);
Expand All @@ -154,22 +210,15 @@ const handlerResultToWebResponse = (
const headers = new Headers(resHeaders);
headers.set('Content-Type', 'application/json');

if (isAPIGatewayProxyResult(response)) {
for (const [key, value] of Object.entries(response.headers ?? {})) {
if (value != null) {
headers.set(key, String(value));
}
}
if (isExtendedAPIGatewayProxyResult(response)) {
addProxyEventHeaders(headers, response);

for (const [key, values] of Object.entries(
response.multiValueHeaders ?? {}
)) {
for (const value of values ?? []) {
headers.append(key, String(value));
}
}
const body =
response.body instanceof Readable
? (Readable.toWeb(response.body) as ReadableStream)
: response.body;

return new Response(response.body, {
return new Response(body, {
status: response.statusCode,
headers,
});
Expand All @@ -189,8 +238,24 @@ const handlerResultToProxyResult = async (
response: HandlerResponse,
statusCode: HttpStatusCode = HttpStatusCodes.OK
): Promise<APIGatewayProxyResult> => {
if (isAPIGatewayProxyResult(response)) {
return response;
if (isExtendedAPIGatewayProxyResult(response)) {
if (isString(response.body)) {
return {
...response,
body: response.body,
};
}
if (
isNodeReadableStream(response.body) ||
isWebReadableStream(response.body)
) {
const nodeStream = bodyToNodeStream(response.body);
return {
...response,
isBase64Encoded: true,
body: await nodeStreamToBase64(nodeStream),
};
}
}
if (response instanceof Response) {
return await webResponseToProxyResult(response);
Expand All @@ -203,9 +268,43 @@ const handlerResultToProxyResult = async (
};
};

/**
* Converts various body types to a Node.js Readable stream.
* Handles Node.js streams, web streams, and string bodies.
*
* @param body - The body to convert (Readable, ReadableStream, or string)
* @returns A Node.js Readable stream
*/
const bodyToNodeStream = (body: ExtendedAPIGatewayProxyResultBody) => {
if (isNodeReadableStream(body)) {
return body;
}
if (isWebReadableStream(body)) {
return Readable.fromWeb(body as streamWeb.ReadableStream);
}
return Readable.from(Buffer.from(body as string));
};

/**
* Converts a Node.js Readable stream to a base64 encoded string.
* Handles both Buffer and string chunks by converting all to Buffers.
*
* @param stream - The Node.js Readable stream to convert
* @returns A Promise that resolves to a base64 encoded string
*/
async function nodeStreamToBase64(stream: Readable) {
const chunks: Buffer[] = [];
for await (const chunk of stream) {
chunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk));
}
return Buffer.concat(chunks).toString('base64');
}

export {
proxyEventToWebRequest,
webResponseToProxyResult,
handlerResultToWebResponse,
handlerResultToProxyResult,
bodyToNodeStream,
webHeadersToApiGatewayV1Headers,
};
2 changes: 1 addition & 1 deletion packages/event-handler/src/rest/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,6 @@ export { Router } from './Router.js';
export {
composeMiddleware,
isAPIGatewayProxyEvent,
isAPIGatewayProxyResult,
isExtendedAPIGatewayProxyResult,
isHttpMethod,
} from './utils.js';
Loading
Loading