diff --git a/packages/event-handler/src/rest/utils.ts b/packages/event-handler/src/rest/utils.ts index caf4ad82f8..e040a3759a 100644 --- a/packages/event-handler/src/rest/utils.ts +++ b/packages/event-handler/src/rest/utils.ts @@ -1,4 +1,4 @@ -import { Readable, Writable } from 'node:stream'; +import { Duplex, Readable, Writable } from 'node:stream'; import { isRecord, isRegExp, @@ -112,7 +112,7 @@ export const isNodeReadableStream = (value: unknown): value is Readable => { return ( value != null && typeof value === 'object' && - value instanceof Readable && + (value instanceof Readable || value instanceof Duplex) && 'readable' in value && 'read' in value && typeof value.read === 'function' diff --git a/packages/event-handler/tests/unit/rest/Router/streaming.test.ts b/packages/event-handler/tests/unit/rest/Router/streaming.test.ts index 16a172b4df..46f29e834c 100644 --- a/packages/event-handler/tests/unit/rest/Router/streaming.test.ts +++ b/packages/event-handler/tests/unit/rest/Router/streaming.test.ts @@ -1,4 +1,4 @@ -import { Readable } from 'node:stream'; +import { Duplex, PassThrough, Readable } from 'node:stream'; import context from '@aws-lambda-powertools/testing-utils/context'; import { describe, expect, it, vi } from 'vitest'; import { UnauthorizedError } from '../../../../src/rest/errors.js'; @@ -306,4 +306,29 @@ describe('Class: Router - Streaming', () => { app.resolveStream(invalidEvent, context, { responseStream }) ).rejects.toThrow(); }); + + it('handles duplex stream body', async () => { + // Prepare + const app = new Router(); + const passThrough = new PassThrough(); + passThrough.write(Buffer.from('{"message":"duplex stream body"}')); + passThrough.end(); + + app.get('/test', () => ({ + statusCode: 200, + body: Duplex.from(passThrough), + })); + + const responseStream = new MockResponseStream(); + + // Act + await app.resolveStream(createTestEvent('/test', 'GET'), context, { + responseStream, + }); + + // Assess + const { prelude, body } = parseStreamOutput(responseStream.chunks); + expect(prelude.statusCode).toBe(200); + expect(JSON.parse(body)).toEqual({ message: 'duplex stream body' }); + }); });