Skip to content

Commit 7a15cc4

Browse files
committed
feat(event-handler): add streaming functionality
1 parent abd7b8b commit 7a15cc4

File tree

14 files changed

+945
-62
lines changed

14 files changed

+945
-62
lines changed

package-lock.json

Lines changed: 8 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

packages/event-handler/package.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,8 @@
125125
"url": "https://github.com/aws-powertools/powertools-lambda-typescript/issues"
126126
},
127127
"dependencies": {
128-
"@aws-lambda-powertools/commons": "2.27.0"
128+
"@aws-lambda-powertools/commons": "2.27.0",
129+
"lambda-stream": "0.6.0"
129130
},
130131
"keywords": [
131132
"aws",

packages/event-handler/src/rest/Router.ts

Lines changed: 83 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
1+
import { Readable } from 'node:stream';
2+
import { pipeline } from 'node:stream/promises';
3+
import type streamWeb from 'node:stream/web';
14
import type { GenericLogger } from '@aws-lambda-powertools/commons/types';
25
import {
36
getStringFromEnv,
47
isDevMode,
58
} from '@aws-lambda-powertools/commons/utils/env';
69
import type { APIGatewayProxyResult, Context } from 'aws-lambda';
7-
import type {
8-
HandlerResponse,
9-
HttpStatusCode,
10-
ResolveOptions,
11-
} from '../types/index.js';
10+
import type { ResolveStreamOptions } from '../types/common.js';
11+
import type { HandlerResponse, ResolveOptions } from '../types/index.js';
1212
import type {
1313
ErrorConstructor,
1414
ErrorHandler,
@@ -17,6 +17,7 @@ import type {
1717
Middleware,
1818
Path,
1919
RequestContext,
20+
ResponseStream,
2021
RestRouteOptions,
2122
RestRouterOptions,
2223
RouteHandler,
@@ -26,6 +27,7 @@ import {
2627
handlerResultToProxyResult,
2728
handlerResultToWebResponse,
2829
proxyEventToWebRequest,
30+
webHeadersToApiGatewayV1Headers,
2931
} from './converters.js';
3032
import { ErrorHandlerRegistry } from './ErrorHandlerRegistry.js';
3133
import {
@@ -38,8 +40,9 @@ import { Route } from './Route.js';
3840
import { RouteHandlerRegistry } from './RouteHandlerRegistry.js';
3941
import {
4042
composeMiddleware,
43+
HttpResponseStream,
4144
isAPIGatewayProxyEvent,
42-
isAPIGatewayProxyResult,
45+
isExtendedAPIGatewayProxyResult,
4346
isHttpMethod,
4447
resolvePrefixedPath,
4548
} from './utils.js';
@@ -187,21 +190,19 @@ class Router {
187190
}
188191

189192
/**
190-
* Resolves an API Gateway event by routing it to the appropriate handler
191-
* and converting the result to an API Gateway proxy result. Handles errors
192-
* using registered error handlers or falls back to default error handling
193-
* (500 Internal Server Error).
193+
* Core resolution logic shared by both resolve and resolveStream methods.
194+
* Validates the event, routes to handlers, executes middleware, and handles errors.
194195
*
195196
* @param event - The Lambda event to resolve
196197
* @param context - The Lambda context
197198
* @param options - Optional resolve options for scope binding
198-
* @returns An API Gateway proxy result
199+
* @returns A handler response (Response, JSONObject, or ExtendedAPIGatewayProxyResult)
199200
*/
200-
public async resolve(
201+
async #resolve(
201202
event: unknown,
202203
context: Context,
203204
options?: ResolveOptions
204-
): Promise<APIGatewayProxyResult> {
205+
): Promise<HandlerResponse> {
205206
if (!isAPIGatewayProxyEvent(event)) {
206207
this.logger.error(
207208
'Received an event that is not compatible with this resolver'
@@ -276,18 +277,79 @@ class Router {
276277
});
277278

278279
// middleware result takes precedence to allow short-circuiting
279-
const result = middlewareResult ?? requestContext.res;
280-
281-
return handlerResultToProxyResult(result);
280+
return middlewareResult ?? requestContext.res;
282281
} catch (error) {
283282
this.logger.debug(`There was an error processing the request: ${error}`);
284-
const result = await this.handleError(error as Error, {
283+
return this.handleError(error as Error, {
285284
...requestContext,
286285
scope: options?.scope,
287286
});
288-
const statusCode =
289-
result instanceof Response ? result.status : result.statusCode;
290-
return handlerResultToProxyResult(result, statusCode as HttpStatusCode);
287+
}
288+
}
289+
290+
/**
291+
* Resolves an API Gateway event by routing it to the appropriate handler
292+
* and converting the result to an API Gateway proxy result. Handles errors
293+
* using registered error handlers or falls back to default error handling
294+
* (500 Internal Server Error).
295+
*
296+
* @param event - The Lambda event to resolve
297+
* @param context - The Lambda context
298+
* @param options - Optional resolve options for scope binding
299+
* @returns An API Gateway proxy result
300+
*/
301+
public async resolve(
302+
event: unknown,
303+
context: Context,
304+
options?: ResolveOptions
305+
): Promise<APIGatewayProxyResult> {
306+
const result = await this.#resolve(event, context, options);
307+
return handlerResultToProxyResult(result);
308+
}
309+
310+
/**
311+
* Resolves an API Gateway event by routing it to the appropriate handler
312+
* and streaming the response directly to the provided response stream.
313+
* Used for Lambda response streaming.
314+
*
315+
* @param event - The Lambda event to resolve
316+
* @param context - The Lambda context
317+
* @param options - Stream resolve options including the response stream
318+
*/
319+
public async resolveStream(
320+
event: unknown,
321+
context: Context,
322+
options: ResolveStreamOptions
323+
): Promise<void> {
324+
const result = await this.#resolve(event, context, options);
325+
await this.#streamHandlerResponse(result, options.responseStream);
326+
}
327+
328+
/**
329+
* Streams a handler response to the Lambda response stream.
330+
* Converts the response to a web response and pipes it through the stream.
331+
*
332+
* @param response - The handler response to stream
333+
* @param responseStream - The Lambda response stream to write to
334+
*/
335+
async #streamHandlerResponse(
336+
response: HandlerResponse,
337+
responseStream: ResponseStream
338+
) {
339+
const webResponse = handlerResultToWebResponse(response);
340+
const { headers } = webHeadersToApiGatewayV1Headers(webResponse.headers);
341+
const resStream = HttpResponseStream.from(responseStream, {
342+
statusCode: webResponse.status,
343+
headers,
344+
});
345+
346+
if (webResponse.body) {
347+
const nodeStream = Readable.fromWeb(
348+
webResponse.body as streamWeb.ReadableStream
349+
);
350+
await pipeline(nodeStream, resStream);
351+
} else {
352+
resStream.write('');
291353
}
292354
}
293355

@@ -320,7 +382,7 @@ class Router {
320382
try {
321383
const { scope, ...reqCtx } = options;
322384
const body = await handler.apply(scope ?? this, [error, reqCtx]);
323-
if (body instanceof Response || isAPIGatewayProxyResult(body)) {
385+
if (body instanceof Response || isExtendedAPIGatewayProxyResult(body)) {
324386
return body;
325387
}
326388
if (!body.statusCode) {

packages/event-handler/src/rest/converters.ts

Lines changed: 95 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,19 @@
1+
import { Readable } from 'node:stream';
2+
import type streamWeb from 'node:stream/web';
3+
import { isString } from '@aws-lambda-powertools/commons/typeutils';
14
import type { APIGatewayProxyEvent, APIGatewayProxyResult } from 'aws-lambda';
25
import type {
36
CompressionOptions,
7+
ExtendedAPIGatewayProxyResultBody,
48
HandlerResponse,
59
HttpStatusCode,
610
} from '../types/rest.js';
711
import { COMPRESSION_ENCODING_TYPES, HttpStatusCodes } from './constants.js';
8-
import { isAPIGatewayProxyResult } from './utils.js';
12+
import {
13+
isExtendedAPIGatewayProxyResult,
14+
isNodeReadableStream,
15+
isWebReadableStream,
16+
} from './utils.js';
917

1018
/**
1119
* Creates a request body from API Gateway event body, handling base64 decoding if needed.
@@ -72,18 +80,17 @@ const proxyEventToWebRequest = (event: APIGatewayProxyEvent): Request => {
7280
};
7381

7482
/**
75-
* Converts a Web API Response object to an API Gateway proxy result.
83+
* Converts Web API Headers to API Gateway v1 headers format.
84+
* Splits multi-value headers by comma and organizes them into separate objects.
7685
*
77-
* @param response - The Web API Response object
78-
* @returns An API Gateway proxy result
86+
* @param webHeaders - The Web API Headers object
87+
* @returns Object containing headers and multiValueHeaders
7988
*/
80-
const webResponseToProxyResult = async (
81-
response: Response
82-
): Promise<APIGatewayProxyResult> => {
89+
const webHeadersToApiGatewayV1Headers = (webHeaders: Headers) => {
8390
const headers: Record<string, string> = {};
8491
const multiValueHeaders: Record<string, Array<string>> = {};
8592

86-
for (const [key, value] of response.headers.entries()) {
93+
for (const [key, value] of webHeaders.entries()) {
8794
const values = value.split(',').map((v) => v.trimStart());
8895
if (values.length > 1) {
8996
multiValueHeaders[key] = values;
@@ -92,6 +99,25 @@ const webResponseToProxyResult = async (
9299
}
93100
}
94101

102+
return {
103+
headers,
104+
multiValueHeaders,
105+
};
106+
};
107+
108+
/**
109+
* Converts a Web API Response object to an API Gateway proxy result.
110+
*
111+
* @param response - The Web API Response object
112+
* @returns An API Gateway proxy result
113+
*/
114+
const webResponseToProxyResult = async (
115+
response: Response
116+
): Promise<APIGatewayProxyResult> => {
117+
const { headers, multiValueHeaders } = webHeadersToApiGatewayV1Headers(
118+
response.headers
119+
);
120+
95121
// Check if response contains compressed/binary content
96122
const contentEncoding = response.headers.get(
97123
'content-encoding'
@@ -135,12 +161,14 @@ const webResponseToProxyResult = async (
135161
*
136162
* @param response - The handler response (APIGatewayProxyResult, Response, or plain object)
137163
* @param resHeaders - Optional headers to be included in the response
164+
* @returns A Web API Response object
138165
*/
139166
const handlerResultToWebResponse = (
140167
response: HandlerResponse,
141168
resHeaders?: Headers
142169
): Response => {
143170
if (response instanceof Response) {
171+
if (resHeaders === undefined) return response;
144172
const headers = new Headers(resHeaders);
145173
for (const [key, value] of response.headers.entries()) {
146174
headers.set(key, value);
@@ -154,7 +182,7 @@ const handlerResultToWebResponse = (
154182
const headers = new Headers(resHeaders);
155183
headers.set('Content-Type', 'application/json');
156184

157-
if (isAPIGatewayProxyResult(response)) {
185+
if (isExtendedAPIGatewayProxyResult(response)) {
158186
for (const [key, value] of Object.entries(response.headers ?? {})) {
159187
if (value != null) {
160188
headers.set(key, String(value));
@@ -169,7 +197,12 @@ const handlerResultToWebResponse = (
169197
}
170198
}
171199

172-
return new Response(response.body, {
200+
const body =
201+
response.body instanceof Readable
202+
? (Readable.toWeb(response.body) as ReadableStream)
203+
: response.body;
204+
205+
return new Response(body, {
173206
status: response.statusCode,
174207
headers,
175208
});
@@ -189,8 +222,24 @@ const handlerResultToProxyResult = async (
189222
response: HandlerResponse,
190223
statusCode: HttpStatusCode = HttpStatusCodes.OK
191224
): Promise<APIGatewayProxyResult> => {
192-
if (isAPIGatewayProxyResult(response)) {
193-
return response;
225+
if (isExtendedAPIGatewayProxyResult(response)) {
226+
if (isString(response.body)) {
227+
return {
228+
...response,
229+
body: response.body,
230+
};
231+
}
232+
if (
233+
isNodeReadableStream(response.body) ||
234+
isWebReadableStream(response.body)
235+
) {
236+
const nodeStream = bodyToNodeStream(response.body);
237+
return {
238+
...response,
239+
isBase64Encoded: true,
240+
body: await nodeStreamToBase64(nodeStream),
241+
};
242+
}
194243
}
195244
if (response instanceof Response) {
196245
return await webResponseToProxyResult(response);
@@ -203,9 +252,43 @@ const handlerResultToProxyResult = async (
203252
};
204253
};
205254

255+
/**
256+
* Converts various body types to a Node.js Readable stream.
257+
* Handles Node.js streams, web streams, and string bodies.
258+
*
259+
* @param body - The body to convert (Readable, ReadableStream, or string)
260+
* @returns A Node.js Readable stream
261+
*/
262+
const bodyToNodeStream = (body: ExtendedAPIGatewayProxyResultBody) => {
263+
if (isNodeReadableStream(body)) {
264+
return body;
265+
}
266+
if (isWebReadableStream(body)) {
267+
return Readable.fromWeb(body as streamWeb.ReadableStream);
268+
}
269+
return Readable.from(Buffer.from(body as string));
270+
};
271+
272+
/**
273+
* Converts a Node.js Readable stream to a base64 encoded string.
274+
* Handles both Buffer and string chunks by converting all to Buffers.
275+
*
276+
* @param stream - The Node.js Readable stream to convert
277+
* @returns A Promise that resolves to a base64 encoded string
278+
*/
279+
async function nodeStreamToBase64(stream: Readable) {
280+
const chunks: Buffer[] = [];
281+
for await (const chunk of stream) {
282+
chunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk));
283+
}
284+
return Buffer.concat(chunks).toString('base64');
285+
}
286+
206287
export {
207288
proxyEventToWebRequest,
208289
webResponseToProxyResult,
209290
handlerResultToWebResponse,
210291
handlerResultToProxyResult,
292+
bodyToNodeStream,
293+
webHeadersToApiGatewayV1Headers,
211294
};

packages/event-handler/src/rest/index.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,6 @@ export { Router } from './Router.js';
2323
export {
2424
composeMiddleware,
2525
isAPIGatewayProxyEvent,
26-
isAPIGatewayProxyResult,
26+
isExtendedAPIGatewayProxyResult,
2727
isHttpMethod,
2828
} from './utils.js';

0 commit comments

Comments
 (0)