Skip to content

Commit e321526

Browse files
authored
feat(event-handler): add streaming functionality (#4586)
1 parent 92b5e92 commit e321526

File tree

12 files changed

+1010
-103
lines changed

12 files changed

+1010
-103
lines changed

packages/event-handler/src/rest/Router.ts

Lines changed: 83 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,13 @@
1+
import { Readable } from 'node:stream';
2+
import { pipeline } from 'node:stream/promises';
3+
import type streamWeb from 'node:stream/web';
14
import type { GenericLogger } from '@aws-lambda-powertools/commons/types';
25
import {
36
getStringFromEnv,
47
isDevMode,
58
} from '@aws-lambda-powertools/commons/utils/env';
69
import type { APIGatewayProxyResult, Context } from 'aws-lambda';
7-
import type {
8-
HandlerResponse,
9-
HttpStatusCode,
10-
ResolveOptions,
11-
} from '../types/index.js';
10+
import type { HandlerResponse, ResolveOptions } from '../types/index.js';
1211
import type {
1312
ErrorConstructor,
1413
ErrorHandler,
@@ -17,6 +16,8 @@ import type {
1716
Middleware,
1817
Path,
1918
RequestContext,
19+
ResolveStreamOptions,
20+
ResponseStream,
2021
RestRouteOptions,
2122
RestRouterOptions,
2223
RouteHandler,
@@ -26,6 +27,7 @@ import {
2627
handlerResultToProxyResult,
2728
handlerResultToWebResponse,
2829
proxyEventToWebRequest,
30+
webHeadersToApiGatewayV1Headers,
2931
} from './converters.js';
3032
import { ErrorHandlerRegistry } from './ErrorHandlerRegistry.js';
3133
import {
@@ -38,8 +40,9 @@ import { Route } from './Route.js';
3840
import { RouteHandlerRegistry } from './RouteHandlerRegistry.js';
3941
import {
4042
composeMiddleware,
43+
HttpResponseStream,
4144
isAPIGatewayProxyEvent,
42-
isAPIGatewayProxyResult,
45+
isExtendedAPIGatewayProxyResult,
4346
isHttpMethod,
4447
resolvePrefixedPath,
4548
} from './utils.js';
@@ -187,21 +190,19 @@ class Router {
187190
}
188191

189192
/**
190-
* Resolves an API Gateway event by routing it to the appropriate handler
191-
* and converting the result to an API Gateway proxy result. Handles errors
192-
* using registered error handlers or falls back to default error handling
193-
* (500 Internal Server Error).
193+
* Core resolution logic shared by both resolve and resolveStream methods.
194+
* Validates the event, routes to handlers, executes middleware, and handles errors.
194195
*
195196
* @param event - The Lambda event to resolve
196197
* @param context - The Lambda context
197198
* @param options - Optional resolve options for scope binding
198-
* @returns An API Gateway proxy result
199+
* @returns A handler response (Response, JSONObject, or ExtendedAPIGatewayProxyResult)
199200
*/
200-
public async resolve(
201+
async #resolve(
201202
event: unknown,
202203
context: Context,
203204
options?: ResolveOptions
204-
): Promise<APIGatewayProxyResult> {
205+
): Promise<HandlerResponse> {
205206
if (!isAPIGatewayProxyEvent(event)) {
206207
this.logger.error(
207208
'Received an event that is not compatible with this resolver'
@@ -276,18 +277,79 @@ class Router {
276277
});
277278

278279
// middleware result takes precedence to allow short-circuiting
279-
const result = middlewareResult ?? requestContext.res;
280-
281-
return handlerResultToProxyResult(result);
280+
return middlewareResult ?? requestContext.res;
282281
} catch (error) {
283282
this.logger.debug(`There was an error processing the request: ${error}`);
284-
const result = await this.handleError(error as Error, {
283+
return this.handleError(error as Error, {
285284
...requestContext,
286285
scope: options?.scope,
287286
});
288-
const statusCode =
289-
result instanceof Response ? result.status : result.statusCode;
290-
return handlerResultToProxyResult(result, statusCode as HttpStatusCode);
287+
}
288+
}
289+
290+
/**
291+
* Resolves an API Gateway event by routing it to the appropriate handler
292+
* and converting the result to an API Gateway proxy result. Handles errors
293+
* using registered error handlers or falls back to default error handling
294+
* (500 Internal Server Error).
295+
*
296+
* @param event - The Lambda event to resolve
297+
* @param context - The Lambda context
298+
* @param options - Optional resolve options for scope binding
299+
* @returns An API Gateway proxy result
300+
*/
301+
public async resolve(
302+
event: unknown,
303+
context: Context,
304+
options?: ResolveOptions
305+
): Promise<APIGatewayProxyResult> {
306+
const result = await this.#resolve(event, context, options);
307+
return handlerResultToProxyResult(result);
308+
}
309+
310+
/**
311+
* Resolves an API Gateway event by routing it to the appropriate handler
312+
* and streaming the response directly to the provided response stream.
313+
* Used for Lambda response streaming.
314+
*
315+
* @param event - The Lambda event to resolve
316+
* @param context - The Lambda context
317+
* @param options - Stream resolve options including the response stream
318+
*/
319+
public async resolveStream(
320+
event: unknown,
321+
context: Context,
322+
options: ResolveStreamOptions
323+
): Promise<void> {
324+
const result = await this.#resolve(event, context, options);
325+
await this.#streamHandlerResponse(result, options.responseStream);
326+
}
327+
328+
/**
329+
* Streams a handler response to the Lambda response stream.
330+
* Converts the response to a web response and pipes it through the stream.
331+
*
332+
* @param response - The handler response to stream
333+
* @param responseStream - The Lambda response stream to write to
334+
*/
335+
async #streamHandlerResponse(
336+
response: HandlerResponse,
337+
responseStream: ResponseStream
338+
) {
339+
const webResponse = handlerResultToWebResponse(response);
340+
const { headers } = webHeadersToApiGatewayV1Headers(webResponse.headers);
341+
const resStream = HttpResponseStream.from(responseStream, {
342+
statusCode: webResponse.status,
343+
headers,
344+
});
345+
346+
if (webResponse.body) {
347+
const nodeStream = Readable.fromWeb(
348+
webResponse.body as streamWeb.ReadableStream
349+
);
350+
await pipeline(nodeStream, resStream);
351+
} else {
352+
resStream.write('');
291353
}
292354
}
293355

@@ -320,7 +382,7 @@ class Router {
320382
try {
321383
const { scope, ...reqCtx } = options;
322384
const body = await handler.apply(scope ?? this, [error, reqCtx]);
323-
if (body instanceof Response || isAPIGatewayProxyResult(body)) {
385+
if (body instanceof Response || isExtendedAPIGatewayProxyResult(body)) {
324386
return body;
325387
}
326388
if (!body.statusCode) {

packages/event-handler/src/rest/converters.ts

Lines changed: 123 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,20 @@
1+
import { Readable } from 'node:stream';
2+
import type streamWeb from 'node:stream/web';
3+
import { isString } from '@aws-lambda-powertools/commons/typeutils';
14
import type { APIGatewayProxyEvent, APIGatewayProxyResult } from 'aws-lambda';
25
import type {
36
CompressionOptions,
7+
ExtendedAPIGatewayProxyResult,
8+
ExtendedAPIGatewayProxyResultBody,
49
HandlerResponse,
510
HttpStatusCode,
611
} from '../types/rest.js';
712
import { COMPRESSION_ENCODING_TYPES, HttpStatusCodes } from './constants.js';
8-
import { isAPIGatewayProxyResult } from './utils.js';
13+
import {
14+
isExtendedAPIGatewayProxyResult,
15+
isNodeReadableStream,
16+
isWebReadableStream,
17+
} from './utils.js';
918

1019
/**
1120
* Creates a request body from API Gateway event body, handling base64 decoding if needed.
@@ -72,18 +81,17 @@ const proxyEventToWebRequest = (event: APIGatewayProxyEvent): Request => {
7281
};
7382

7483
/**
75-
* Converts a Web API Response object to an API Gateway proxy result.
84+
* Converts Web API Headers to API Gateway v1 headers format.
85+
* Splits multi-value headers by comma and organizes them into separate objects.
7686
*
77-
* @param response - The Web API Response object
78-
* @returns An API Gateway proxy result
87+
* @param webHeaders - The Web API Headers object
88+
* @returns Object containing headers and multiValueHeaders
7989
*/
80-
const webResponseToProxyResult = async (
81-
response: Response
82-
): Promise<APIGatewayProxyResult> => {
90+
const webHeadersToApiGatewayV1Headers = (webHeaders: Headers) => {
8391
const headers: Record<string, string> = {};
8492
const multiValueHeaders: Record<string, Array<string>> = {};
8593

86-
for (const [key, value] of response.headers.entries()) {
94+
for (const [key, value] of webHeaders.entries()) {
8795
const values = value.split(',').map((v) => v.trimStart());
8896
if (values.length > 1) {
8997
multiValueHeaders[key] = values;
@@ -92,6 +100,25 @@ const webResponseToProxyResult = async (
92100
}
93101
}
94102

103+
return {
104+
headers,
105+
multiValueHeaders,
106+
};
107+
};
108+
109+
/**
110+
* Converts a Web API Response object to an API Gateway proxy result.
111+
*
112+
* @param response - The Web API Response object
113+
* @returns An API Gateway proxy result
114+
*/
115+
const webResponseToProxyResult = async (
116+
response: Response
117+
): Promise<APIGatewayProxyResult> => {
118+
const { headers, multiValueHeaders } = webHeadersToApiGatewayV1Headers(
119+
response.headers
120+
);
121+
95122
// Check if response contains compressed/binary content
96123
const contentEncoding = response.headers.get(
97124
'content-encoding'
@@ -129,18 +156,47 @@ const webResponseToProxyResult = async (
129156
return result;
130157
};
131158

159+
/**
160+
* Adds headers from an ExtendedAPIGatewayProxyResult to a Headers object.
161+
*
162+
* @param headers - The Headers object to mutate
163+
* @param response - The response containing headers to add
164+
* @remarks This function mutates the headers object by adding entries from
165+
* response.headers and response.multiValueHeaders
166+
*/
167+
function addProxyEventHeaders(
168+
headers: Headers,
169+
response: ExtendedAPIGatewayProxyResult
170+
) {
171+
for (const [key, value] of Object.entries(response.headers ?? {})) {
172+
if (value != null) {
173+
headers.set(key, String(value));
174+
}
175+
}
176+
177+
for (const [key, values] of Object.entries(
178+
response.multiValueHeaders ?? {}
179+
)) {
180+
for (const value of values ?? []) {
181+
headers.append(key, String(value));
182+
}
183+
}
184+
}
185+
132186
/**
133187
* Converts a handler response to a Web API Response object.
134188
* Handles APIGatewayProxyResult, Response objects, and plain objects.
135189
*
136190
* @param response - The handler response (APIGatewayProxyResult, Response, or plain object)
137191
* @param resHeaders - Optional headers to be included in the response
192+
* @returns A Web API Response object
138193
*/
139194
const handlerResultToWebResponse = (
140195
response: HandlerResponse,
141196
resHeaders?: Headers
142197
): Response => {
143198
if (response instanceof Response) {
199+
if (resHeaders === undefined) return response;
144200
const headers = new Headers(resHeaders);
145201
for (const [key, value] of response.headers.entries()) {
146202
headers.set(key, value);
@@ -154,22 +210,15 @@ const handlerResultToWebResponse = (
154210
const headers = new Headers(resHeaders);
155211
headers.set('Content-Type', 'application/json');
156212

157-
if (isAPIGatewayProxyResult(response)) {
158-
for (const [key, value] of Object.entries(response.headers ?? {})) {
159-
if (value != null) {
160-
headers.set(key, String(value));
161-
}
162-
}
213+
if (isExtendedAPIGatewayProxyResult(response)) {
214+
addProxyEventHeaders(headers, response);
163215

164-
for (const [key, values] of Object.entries(
165-
response.multiValueHeaders ?? {}
166-
)) {
167-
for (const value of values ?? []) {
168-
headers.append(key, String(value));
169-
}
170-
}
216+
const body =
217+
response.body instanceof Readable
218+
? (Readable.toWeb(response.body) as ReadableStream)
219+
: response.body;
171220

172-
return new Response(response.body, {
221+
return new Response(body, {
173222
status: response.statusCode,
174223
headers,
175224
});
@@ -189,8 +238,24 @@ const handlerResultToProxyResult = async (
189238
response: HandlerResponse,
190239
statusCode: HttpStatusCode = HttpStatusCodes.OK
191240
): Promise<APIGatewayProxyResult> => {
192-
if (isAPIGatewayProxyResult(response)) {
193-
return response;
241+
if (isExtendedAPIGatewayProxyResult(response)) {
242+
if (isString(response.body)) {
243+
return {
244+
...response,
245+
body: response.body,
246+
};
247+
}
248+
if (
249+
isNodeReadableStream(response.body) ||
250+
isWebReadableStream(response.body)
251+
) {
252+
const nodeStream = bodyToNodeStream(response.body);
253+
return {
254+
...response,
255+
isBase64Encoded: true,
256+
body: await nodeStreamToBase64(nodeStream),
257+
};
258+
}
194259
}
195260
if (response instanceof Response) {
196261
return await webResponseToProxyResult(response);
@@ -203,9 +268,43 @@ const handlerResultToProxyResult = async (
203268
};
204269
};
205270

271+
/**
272+
* Converts various body types to a Node.js Readable stream.
273+
* Handles Node.js streams, web streams, and string bodies.
274+
*
275+
* @param body - The body to convert (Readable, ReadableStream, or string)
276+
* @returns A Node.js Readable stream
277+
*/
278+
const bodyToNodeStream = (body: ExtendedAPIGatewayProxyResultBody) => {
279+
if (isNodeReadableStream(body)) {
280+
return body;
281+
}
282+
if (isWebReadableStream(body)) {
283+
return Readable.fromWeb(body as streamWeb.ReadableStream);
284+
}
285+
return Readable.from(Buffer.from(body as string));
286+
};
287+
288+
/**
289+
* Converts a Node.js Readable stream to a base64 encoded string.
290+
* Handles both Buffer and string chunks by converting all to Buffers.
291+
*
292+
* @param stream - The Node.js Readable stream to convert
293+
* @returns A Promise that resolves to a base64 encoded string
294+
*/
295+
async function nodeStreamToBase64(stream: Readable) {
296+
const chunks: Buffer[] = [];
297+
for await (const chunk of stream) {
298+
chunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk));
299+
}
300+
return Buffer.concat(chunks).toString('base64');
301+
}
302+
206303
export {
207304
proxyEventToWebRequest,
208305
webResponseToProxyResult,
209306
handlerResultToWebResponse,
210307
handlerResultToProxyResult,
308+
bodyToNodeStream,
309+
webHeadersToApiGatewayV1Headers,
211310
};

packages/event-handler/src/rest/index.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,6 @@ export { Router } from './Router.js';
2323
export {
2424
composeMiddleware,
2525
isAPIGatewayProxyEvent,
26-
isAPIGatewayProxyResult,
26+
isExtendedAPIGatewayProxyResult,
2727
isHttpMethod,
2828
} from './utils.js';

0 commit comments

Comments
 (0)