Skip to content

Commit 83aa018

Browse files
committed
add support for streaming lambda function + custom status code and HTTP headers
1 parent b6fb60c commit 83aa018

13 files changed

+299
-90
lines changed

.gitignore

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,4 +13,5 @@ Package.resolved
1313
Makefile
1414
.devcontainer
1515
.amazonq
16-
.kiro
16+
.kiro
17+
nodejs

Examples/Streaming/Sources/main.swift

Lines changed: 3 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -31,21 +31,19 @@ struct SendNumbersWithPause: StreamingLambdaHandler {
3131
// Send HTTP status code and headers before streaming the response body
3232
try await responseWriter.writeStatusAndHeaders(
3333
StreamingLambdaStatusAndHeadersResponse(
34-
statusCode: 200,
34+
statusCode: 418, // I'm a tea pot
3535
headers: [
3636
"Content-Type": "text/plain",
3737
"x-my-custom-header": "streaming-example",
38-
],
39-
multiValueHeaders: [
40-
"Set-Cookie": ["session=abc123", "theme=dark"]
4138
]
4239
)
4340
)
4441

4542
// Stream numbers with pauses to demonstrate streaming functionality
46-
for i in 1...10 {
43+
for i in 1...3 {
4744
// Send partial data
4845
try await responseWriter.write(ByteBuffer(string: "Number: \(i)\n"))
46+
4947
// Perform some long asynchronous work to simulate processing
5048
try await Task.sleep(for: .milliseconds(1000))
5149
}
@@ -58,64 +56,5 @@ struct SendNumbersWithPause: StreamingLambdaHandler {
5856
}
5957
}
6058

61-
// Example of a more complex streaming handler that demonstrates different response scenarios
62-
struct ConditionalStreamingHandler: StreamingLambdaHandler {
63-
func handle(
64-
_ event: ByteBuffer,
65-
responseWriter: some LambdaResponseStreamWriter,
66-
context: LambdaContext
67-
) async throws {
68-
69-
// Parse the event to determine response type
70-
let eventString = String(buffer: event)
71-
let shouldError = eventString.contains("error")
72-
73-
if shouldError {
74-
// Send error response with appropriate status code
75-
try await responseWriter.writeStatusAndHeaders(
76-
StreamingLambdaStatusAndHeadersResponse(
77-
statusCode: 400,
78-
headers: [
79-
"Content-Type": "application/json",
80-
"x-error-type": "client-error",
81-
]
82-
)
83-
)
84-
85-
try await responseWriter.writeAndFinish(
86-
ByteBuffer(string: #"{"error": "Bad request", "message": "Error requested in input"}"#)
87-
)
88-
} else {
89-
// Send successful response with streaming data
90-
try await responseWriter.writeStatusAndHeaders(
91-
StreamingLambdaStatusAndHeadersResponse(
92-
statusCode: 200,
93-
headers: [
94-
"Content-Type": "application/json",
95-
"Cache-Control": "no-cache",
96-
]
97-
)
98-
)
99-
100-
// Stream JSON array elements
101-
try await responseWriter.write(ByteBuffer(string: "["))
102-
103-
for i in 1...5 {
104-
if i > 1 {
105-
try await responseWriter.write(ByteBuffer(string: ","))
106-
}
107-
try await responseWriter.write(
108-
ByteBuffer(string: #"{"id": \#(i), "timestamp": "\#(Date().timeIntervalSince1970)"}"#)
109-
)
110-
try await Task.sleep(for: .milliseconds(500))
111-
}
112-
113-
try await responseWriter.write(ByteBuffer(string: "]"))
114-
try await responseWriter.finish()
115-
}
116-
}
117-
}
118-
119-
// Use the simple example by default
12059
let runtime = LambdaRuntime(handler: SendNumbersWithPause())
12160
try await runtime.run()

Examples/Streaming/template.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@ Resources:
1717
FunctionUrlConfig:
1818
AuthType: AWS_IAM
1919
InvokeMode: RESPONSE_STREAM
20+
Environment:
21+
Variables:
22+
LOG_LEVEL: trace
2023

2124
Outputs:
2225
# print Lambda function URL

Sources/AWSLambdaRuntime/LambdaHandlers.swift

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,15 @@ public protocol LambdaResponseStreamWriter {
5757
/// Write a response part into the stream and then end the stream as well as the underlying HTTP response.
5858
/// - Parameter buffer: The buffer to write.
5959
func writeAndFinish(_ buffer: ByteBuffer) async throws
60+
61+
/// Write a response part into the stream.
62+
// In the context of streaming Lambda, this is used to allow the user
63+
// to send custom headers or statusCode.
64+
/// - Note: user should use the writeStatusAndHeaders(:StreamingLambdaStatusAndHeadersResponse)
65+
// function to write the status code and headers
66+
/// - Parameter buffer: The buffer corresponding to the status code and headers to write.
67+
func writeCustomHeader(_ buffer: NIOCore.ByteBuffer) async throws
68+
6069
}
6170

6271
/// This handler protocol is intended to serve the most common use-cases.

Sources/AWSLambdaRuntime/LambdaResponseStreamWriter+Headers.swift

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ public struct StreamingLambdaStatusAndHeadersResponse: Codable, Sendable {
4444
public init(
4545
statusCode: Int,
4646
headers: [String: String]? = nil,
47-
multiValueHeaders: [String: [String]]? = nil
47+
multiValueHeaders: [String: [String]]? = nil,
4848
) {
4949
self.statusCode = statusCode
5050
self.headers = headers
@@ -67,17 +67,15 @@ extension LambdaResponseStreamWriter {
6767
encoder: Encoder
6868
) async throws where Encoder.Output == StreamingLambdaStatusAndHeadersResponse {
6969

70-
// Convert Data to ByteBuffer
70+
// Convert JSON headers to an array of bytes in a ByteBuffer
7171
var buffer = ByteBuffer()
7272
try encoder.encode(response, into: &buffer)
7373

74-
// Write the JSON data
75-
try await write(buffer)
76-
7774
// Write eight null bytes as separator
78-
var separatorBuffer = ByteBuffer()
79-
separatorBuffer.writeBytes([0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00])
80-
try await write(separatorBuffer)
75+
buffer.writeBytes([0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00])
76+
77+
// Write the JSON data and the separator
78+
try await writeCustomHeader(buffer)
8179
}
8280
}
8381

@@ -95,6 +93,7 @@ extension LambdaResponseStreamWriter {
9593
_ response: StreamingLambdaStatusAndHeadersResponse,
9694
encoder: JSONEncoder = JSONEncoder()
9795
) async throws {
96+
encoder.outputFormatting = .withoutEscapingSlashes
9897
try await self.writeStatusAndHeaders(response, encoder: LambdaJSONOutputEncoder(encoder))
9998
}
10099
}

Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift

Lines changed: 57 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,13 @@ import Logging
1616
import NIOCore
1717
import NIOHTTP1
1818
import NIOPosix
19+
import Synchronization
1920

2021
@usableFromInline
2122
final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol {
23+
@usableFromInline
24+
var _hasStreamingCustomHeaders = false
25+
2226
@usableFromInline
2327
nonisolated let unownedExecutor: UnownedSerialExecutor
2428

@@ -42,6 +46,11 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol {
4246
self.runtimeClient = runtimeClient
4347
}
4448

49+
@usableFromInline
50+
func writeCustomHeader(_ buffer: NIOCore.ByteBuffer) async throws {
51+
try await self.runtimeClient.writeCustomHeader(buffer)
52+
}
53+
4554
@usableFromInline
4655
func write(_ buffer: NIOCore.ByteBuffer) async throws {
4756
try await self.runtimeClient.write(buffer)
@@ -188,6 +197,10 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol {
188197
}
189198
}
190199

200+
private func writeCustomHeader(_ buffer: NIOCore.ByteBuffer) async throws {
201+
_hasStreamingCustomHeaders = true
202+
try await self.write(buffer)
203+
}
191204
private func write(_ buffer: NIOCore.ByteBuffer) async throws {
192205
switch self.lambdaState {
193206
case .idle, .sentResponse:
@@ -210,6 +223,7 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol {
210223
}
211224

212225
private func writeAndFinish(_ buffer: NIOCore.ByteBuffer?) async throws {
226+
_hasStreamingCustomHeaders = false
213227
switch self.lambdaState {
214228
case .idle, .sentResponse:
215229
throw LambdaRuntimeError(code: .finishAfterFinishHasBeenSent)
@@ -330,7 +344,11 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol {
330344
NIOHTTPClientResponseAggregator(maxContentLength: 6 * 1024 * 1024)
331345
)
332346
try channel.pipeline.syncOperations.addHandler(
333-
LambdaChannelHandler(delegate: self, logger: self.logger, configuration: self.configuration)
347+
LambdaChannelHandler(
348+
delegate: self,
349+
logger: self.logger,
350+
configuration: self.configuration
351+
)
334352
)
335353
return channel.eventLoop.makeSucceededFuture(())
336354
} catch {
@@ -425,13 +443,17 @@ extension LambdaRuntimeClient: LambdaChannelHandlerDelegate {
425443

426444
}
427445
}
446+
}
428447

448+
func hasStreamingCustomHeaders(isolation: isolated (any Actor)? = #isolation) async -> Bool {
449+
await self._hasStreamingCustomHeaders
429450
}
430451
}
431452

432453
private protocol LambdaChannelHandlerDelegate {
433454
func connectionWillClose(channel: any Channel)
434455
func connectionErrorHappened(_ error: any Error, channel: any Channel)
456+
func hasStreamingCustomHeaders(isolation: isolated (any Actor)?) async -> Bool
435457
}
436458

437459
private final class LambdaChannelHandler<Delegate: LambdaChannelHandlerDelegate> {
@@ -467,10 +489,16 @@ private final class LambdaChannelHandler<Delegate: LambdaChannelHandlerDelegate>
467489
let defaultHeaders: HTTPHeaders
468490
/// These headers must be sent along an invocation or initialization error report
469491
let errorHeaders: HTTPHeaders
470-
/// These headers must be sent when streaming a response
492+
/// These headers must be sent when streaming a large response
493+
let largeResponseHeaders: HTTPHeaders
494+
/// These headers must be sent when the handler streams its response
471495
let streamingHeaders: HTTPHeaders
472496

473-
init(delegate: Delegate, logger: Logger, configuration: LambdaRuntimeClient.Configuration) {
497+
init(
498+
delegate: Delegate,
499+
logger: Logger,
500+
configuration: LambdaRuntimeClient.Configuration
501+
) {
474502
self.delegate = delegate
475503
self.logger = logger
476504
self.configuration = configuration
@@ -483,11 +511,23 @@ private final class LambdaChannelHandler<Delegate: LambdaChannelHandlerDelegate>
483511
"user-agent": .userAgent,
484512
"lambda-runtime-function-error-type": "Unhandled",
485513
]
486-
self.streamingHeaders = [
514+
self.largeResponseHeaders = [
487515
"host": "\(self.configuration.ip):\(self.configuration.port)",
488516
"user-agent": .userAgent,
489517
"transfer-encoding": "chunked",
490518
]
519+
// https://docs.aws.amazon.com/lambda/latest/dg/runtimes-custom.html#runtimes-custom-response-streaming
520+
// These are the headers returned by the Runtime to the Lambda Data plane.
521+
// These are not the headers the Lambda Data plane sends to the caller of the Lambda function
522+
// The developer of the function can set the caller's headers in the handler code.
523+
self.streamingHeaders = [
524+
"host": "\(self.configuration.ip):\(self.configuration.port)",
525+
"user-agent": .userAgent,
526+
"Lambda-Runtime-Function-Response-Mode": "streaming",
527+
// these are not used by this runtime client at the moment
528+
// FIXME: the eror handling should inject these headers in the streamed response to report mid-stream errors
529+
"Trailer": "Lambda-Runtime-Function-Error-Type, Lambda-Runtime-Function-Error-Body",
530+
]
491531
}
492532

493533
func nextInvocation(isolation: isolated (any Actor)? = #isolation) async throws -> Invocation {
@@ -625,11 +665,16 @@ private final class LambdaChannelHandler<Delegate: LambdaChannelHandlerDelegate>
625665
// TODO: This feels super expensive. We should be able to make this cheaper. requestIDs are fixed length
626666
let url = Consts.invocationURLPrefix + "/" + requestID + Consts.postResponseURLSuffix
627667

668+
var headers = self.streamingHeaders
669+
if await self.delegate.hasStreamingCustomHeaders(isolation: #isolation) {
670+
// this headers is required by Function URL when the user sends custom status code or headers
671+
headers.add(name: "Content-Type", value: "application/vnd.awslambda.http-integration-response")
672+
}
628673
let httpRequest = HTTPRequestHead(
629674
version: .http1_1,
630675
method: .POST,
631676
uri: url,
632-
headers: self.streamingHeaders
677+
headers: headers
633678
)
634679

635680
context.write(self.wrapOutboundOut(.head(httpRequest)), promise: nil)
@@ -652,17 +697,13 @@ private final class LambdaChannelHandler<Delegate: LambdaChannelHandlerDelegate>
652697

653698
// If we have less than 6MB, we don't want to use the streaming API. If we have more
654699
// than 6MB we must use the streaming mode.
655-
let headers: HTTPHeaders =
656-
if byteBuffer?.readableBytes ?? 0 < 6_000_000 {
657-
[
658-
"host": "\(self.configuration.ip):\(self.configuration.port)",
659-
"user-agent": .userAgent,
660-
"content-length": "\(byteBuffer?.readableBytes ?? 0)",
661-
]
662-
} else {
663-
self.streamingHeaders
664-
}
665-
700+
var headers: HTTPHeaders!
701+
if byteBuffer?.readableBytes ?? 0 < 6_000_000 {
702+
headers = self.defaultHeaders
703+
headers.add(name: "content-length", value: "\(byteBuffer?.readableBytes ?? 0)")
704+
} else {
705+
headers = self.largeResponseHeaders
706+
}
666707
let httpRequest = HTTPRequestHead(
667708
version: .http1_1,
668709
method: .POST,

Sources/AWSLambdaRuntime/LambdaRuntimeClientProtocol.swift

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ package protocol LambdaRuntimeClientResponseStreamWriter: LambdaResponseStreamWr
2020
func finish() async throws
2121
func writeAndFinish(_ buffer: ByteBuffer) async throws
2222
func reportError(_ error: any Error) async throws
23+
func writeCustomHeader(_ buffer: NIOCore.ByteBuffer) async throws
2324
}
2425

2526
@usableFromInline

Tests/AWSLambdaRuntimeTests/Lambda+CodableTests.swift

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,5 +96,11 @@ struct JSONTests {
9696
func finish() async throws {
9797
fatalError("Unexpected call")
9898
}
99+
100+
func writeCustomHeader(_ buffer: NIOCore.ByteBuffer) async throws {
101+
// This is a mock, so we don't actually write custom headers.
102+
// In a real implementation, this would handle writing custom headers.
103+
fatalError("Unexpected call to writeCustomHeader")
104+
}
99105
}
100106
}

Tests/AWSLambdaRuntimeTests/LambdaResponseStreamWriter+HeadersTests.swift

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -587,6 +587,12 @@ final class MockLambdaResponseStreamWriter: LambdaResponseStreamWriter {
587587
writtenBuffers.append(buffer)
588588
isFinished = true
589589
}
590+
591+
func writeCustomHeader(_ buffer: NIOCore.ByteBuffer) async throws {
592+
// This is a mock, so we don't actually write custom headers.
593+
// In a real implementation, this would handle writing custom headers.
594+
fatalError("Unexpected call to writeCustomHeader")
595+
}
590596
}
591597

592598
// MARK: - Error Handling Mock Implementations
@@ -620,6 +626,12 @@ final class FailingMockLambdaResponseStreamWriter: LambdaResponseStreamWriter {
620626
try await write(buffer)
621627
try await finish()
622628
}
629+
630+
func writeCustomHeader(_ buffer: NIOCore.ByteBuffer) async throws {
631+
// This is a mock, so we don't actually write custom headers.
632+
// In a real implementation, this would handle writing custom headers.
633+
fatalError("Unexpected call to writeCustomHeader")
634+
}
623635
}
624636

625637
// MARK: - Test Error Types
@@ -716,6 +728,12 @@ final class TrackingLambdaResponseStreamWriter: LambdaResponseStreamWriter {
716728
writtenBuffers.append(buffer)
717729
isFinished = true
718730
}
731+
732+
func writeCustomHeader(_ buffer: NIOCore.ByteBuffer) async throws {
733+
// This is a mock, so we don't actually write custom headers.
734+
// In a real implementation, this would handle writing custom headers.
735+
fatalError("Unexpected call to writeCustomHeader")
736+
}
719737
}
720738

721739
/// Mock implementation with custom behavior for integration testing
@@ -739,4 +757,10 @@ final class CustomBehaviorLambdaResponseStreamWriter: LambdaResponseStreamWriter
739757
writtenBuffers.append(buffer)
740758
isFinished = true
741759
}
760+
761+
func writeCustomHeader(_ buffer: NIOCore.ByteBuffer) async throws {
762+
// This is a mock, so we don't actually write custom headers.
763+
// In a real implementation, this would handle writing custom headers.
764+
fatalError("Unexpected call to writeCustomHeader")
765+
}
742766
}

0 commit comments

Comments
 (0)