Skip to content

Commit e9c29d3

Browse files
committed
feat(event-handler): add streaming functionality
1 parent abd7b8b commit e9c29d3

File tree

14 files changed

+978
-102
lines changed

14 files changed

+978
-102
lines changed

package-lock.json

Lines changed: 8 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

packages/event-handler/package.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,8 @@
125125
"url": "https://github.com/aws-powertools/powertools-lambda-typescript/issues"
126126
},
127127
"dependencies": {
128-
"@aws-lambda-powertools/commons": "2.27.0"
128+
"@aws-lambda-powertools/commons": "2.27.0",
129+
"lambda-stream": "0.6.0"
129130
},
130131
"keywords": [
131132
"aws",

packages/event-handler/src/rest/Router.ts

Lines changed: 83 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
1+
import { Readable } from 'node:stream';
2+
import { pipeline } from 'node:stream/promises';
3+
import type streamWeb from 'node:stream/web';
14
import type { GenericLogger } from '@aws-lambda-powertools/commons/types';
25
import {
36
getStringFromEnv,
47
isDevMode,
58
} from '@aws-lambda-powertools/commons/utils/env';
69
import type { APIGatewayProxyResult, Context } from 'aws-lambda';
7-
import type {
8-
HandlerResponse,
9-
HttpStatusCode,
10-
ResolveOptions,
11-
} from '../types/index.js';
10+
import type { ResolveStreamOptions } from '../types/common.js';
11+
import type { HandlerResponse, ResolveOptions } from '../types/index.js';
1212
import type {
1313
ErrorConstructor,
1414
ErrorHandler,
@@ -17,6 +17,7 @@ import type {
1717
Middleware,
1818
Path,
1919
RequestContext,
20+
ResponseStream,
2021
RestRouteOptions,
2122
RestRouterOptions,
2223
RouteHandler,
@@ -26,6 +27,7 @@ import {
2627
handlerResultToProxyResult,
2728
handlerResultToWebResponse,
2829
proxyEventToWebRequest,
30+
webHeadersToApiGatewayV1Headers,
2931
} from './converters.js';
3032
import { ErrorHandlerRegistry } from './ErrorHandlerRegistry.js';
3133
import {
@@ -38,8 +40,9 @@ import { Route } from './Route.js';
3840
import { RouteHandlerRegistry } from './RouteHandlerRegistry.js';
3941
import {
4042
composeMiddleware,
43+
HttpResponseStream,
4144
isAPIGatewayProxyEvent,
42-
isAPIGatewayProxyResult,
45+
isExtendedAPIGatewayProxyResult,
4346
isHttpMethod,
4447
resolvePrefixedPath,
4548
} from './utils.js';
@@ -187,21 +190,19 @@ class Router {
187190
}
188191

189192
/**
190-
* Resolves an API Gateway event by routing it to the appropriate handler
191-
* and converting the result to an API Gateway proxy result. Handles errors
192-
* using registered error handlers or falls back to default error handling
193-
* (500 Internal Server Error).
193+
* Core resolution logic shared by both resolve and resolveStream methods.
194+
* Validates the event, routes to handlers, executes middleware, and handles errors.
194195
*
195196
* @param event - The Lambda event to resolve
196197
* @param context - The Lambda context
197198
* @param options - Optional resolve options for scope binding
198-
* @returns An API Gateway proxy result
199+
* @returns A handler response (Response, JSONObject, or ExtendedAPIGatewayProxyResult)
199200
*/
200-
public async resolve(
201+
async #resolve(
201202
event: unknown,
202203
context: Context,
203204
options?: ResolveOptions
204-
): Promise<APIGatewayProxyResult> {
205+
): Promise<HandlerResponse> {
205206
if (!isAPIGatewayProxyEvent(event)) {
206207
this.logger.error(
207208
'Received an event that is not compatible with this resolver'
@@ -276,18 +277,79 @@ class Router {
276277
});
277278

278279
// middleware result takes precedence to allow short-circuiting
279-
const result = middlewareResult ?? requestContext.res;
280-
281-
return handlerResultToProxyResult(result);
280+
return middlewareResult ?? requestContext.res;
282281
} catch (error) {
283282
this.logger.debug(`There was an error processing the request: ${error}`);
284-
const result = await this.handleError(error as Error, {
283+
return this.handleError(error as Error, {
285284
...requestContext,
286285
scope: options?.scope,
287286
});
288-
const statusCode =
289-
result instanceof Response ? result.status : result.statusCode;
290-
return handlerResultToProxyResult(result, statusCode as HttpStatusCode);
287+
}
288+
}
289+
290+
/**
291+
* Resolves an API Gateway event by routing it to the appropriate handler
292+
* and converting the result to an API Gateway proxy result. Handles errors
293+
* using registered error handlers or falls back to default error handling
294+
* (500 Internal Server Error).
295+
*
296+
* @param event - The Lambda event to resolve
297+
* @param context - The Lambda context
298+
* @param options - Optional resolve options for scope binding
299+
* @returns An API Gateway proxy result
300+
*/
301+
public async resolve(
302+
event: unknown,
303+
context: Context,
304+
options?: ResolveOptions
305+
): Promise<APIGatewayProxyResult> {
306+
const result = await this.#resolve(event, context, options);
307+
return handlerResultToProxyResult(result);
308+
}
309+
310+
/**
311+
* Resolves an API Gateway event by routing it to the appropriate handler
312+
* and streaming the response directly to the provided response stream.
313+
* Used for Lambda response streaming.
314+
*
315+
* @param event - The Lambda event to resolve
316+
* @param context - The Lambda context
317+
* @param options - Stream resolve options including the response stream
318+
*/
319+
public async resolveStream(
320+
event: unknown,
321+
context: Context,
322+
options: ResolveStreamOptions
323+
): Promise<void> {
324+
const result = await this.#resolve(event, context, options);
325+
await this.#streamHandlerResponse(result, options.responseStream);
326+
}
327+
328+
/**
329+
* Streams a handler response to the Lambda response stream.
330+
* Converts the response to a web response and pipes it through the stream.
331+
*
332+
* @param response - The handler response to stream
333+
* @param responseStream - The Lambda response stream to write to
334+
*/
335+
async #streamHandlerResponse(
336+
response: HandlerResponse,
337+
responseStream: ResponseStream
338+
) {
339+
const webResponse = handlerResultToWebResponse(response);
340+
const { headers } = webHeadersToApiGatewayV1Headers(webResponse.headers);
341+
const resStream = HttpResponseStream.from(responseStream, {
342+
statusCode: webResponse.status,
343+
headers,
344+
});
345+
346+
if (webResponse.body) {
347+
const nodeStream = Readable.fromWeb(
348+
webResponse.body as streamWeb.ReadableStream
349+
);
350+
await pipeline(nodeStream, resStream);
351+
} else {
352+
resStream.write('');
291353
}
292354
}
293355

@@ -320,7 +382,7 @@ class Router {
320382
try {
321383
const { scope, ...reqCtx } = options;
322384
const body = await handler.apply(scope ?? this, [error, reqCtx]);
323-
if (body instanceof Response || isAPIGatewayProxyResult(body)) {
385+
if (body instanceof Response || isExtendedAPIGatewayProxyResult(body)) {
324386
return body;
325387
}
326388
if (!body.statusCode) {

0 commit comments

Comments
 (0)