Skip to content

[core] Implement Lambda streaming with custom HTTP headers #521

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 20 commits into from
Jul 24, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 1 addition & 10 deletions Sources/AWSLambdaRuntime/LambdaHandlers.swift
Original file line number Diff line number Diff line change
Expand Up @@ -49,23 +49,14 @@ public protocol StreamingLambdaHandler: _Lambda_SendableMetatype {
public protocol LambdaResponseStreamWriter {
/// Write a response part into the stream. Bytes written are streamed continually.
/// - Parameter buffer: The buffer to write.
func write(_ buffer: ByteBuffer) async throws
func write(_ buffer: ByteBuffer, hasCustomHeaders: Bool) async throws

/// End the response stream and the underlying HTTP response.
func finish() async throws

/// Write a response part into the stream and then end the stream as well as the underlying HTTP response.
/// - Parameter buffer: The buffer to write.
func writeAndFinish(_ buffer: ByteBuffer) async throws

/// Write a response part into the stream.
// In the context of streaming Lambda, this is used to allow the user
// to send custom headers or statusCode.
/// - Note: user should use the writeStatusAndHeaders(:StreamingLambdaStatusAndHeadersResponse)
// function to write the status code and headers
/// - Parameter buffer: The buffer corresponding to the status code and headers to write.
func writeCustomHeader(_ buffer: NIOCore.ByteBuffer) async throws

}

/// This handler protocol is intended to serve the most common use-cases.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,14 @@ extension LambdaResponseStreamWriter {
buffer.writeBytes([0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00])

// Write the JSON data and the separator
try await writeCustomHeader(buffer)
try await self.write(buffer, hasCustomHeaders: true)
}

/// Write a response part into the stream. Bytes written are streamed continually.
/// - Parameter buffer: The buffer to write.
public func write(_ buffer: ByteBuffer) async throws {
// Write the buffer to the response stream
try await self.write(buffer, hasCustomHeaders: false)
}
}

Expand Down
53 changes: 25 additions & 28 deletions Sources/AWSLambdaRuntime/LambdaRuntimeClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,6 @@ import Synchronization

@usableFromInline
final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol {
@usableFromInline
var _hasStreamingCustomHeaders = false

@usableFromInline
nonisolated let unownedExecutor: UnownedSerialExecutor

Expand All @@ -47,13 +44,8 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol {
}

@usableFromInline
func writeCustomHeader(_ buffer: NIOCore.ByteBuffer) async throws {
try await self.runtimeClient.writeCustomHeader(buffer)
}

@usableFromInline
func write(_ buffer: NIOCore.ByteBuffer) async throws {
try await self.runtimeClient.write(buffer)
func write(_ buffer: NIOCore.ByteBuffer, hasCustomHeaders: Bool = false) async throws {
try await self.runtimeClient.write(buffer, hasCustomHeaders: hasCustomHeaders)
}

@usableFromInline
Expand Down Expand Up @@ -197,11 +189,7 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol {
}
}

private func writeCustomHeader(_ buffer: NIOCore.ByteBuffer) async throws {
_hasStreamingCustomHeaders = true
try await self.write(buffer)
}
private func write(_ buffer: NIOCore.ByteBuffer) async throws {
private func write(_ buffer: NIOCore.ByteBuffer, hasCustomHeaders: Bool = false) async throws {
switch self.lambdaState {
case .idle, .sentResponse:
throw LambdaRuntimeError(code: .writeAfterFinishHasBeenSent)
Expand All @@ -218,12 +206,15 @@ final actor LambdaRuntimeClient: LambdaRuntimeClientProtocol {
guard case .sendingResponse(requestID) = self.lambdaState else {
fatalError("Invalid state: \(self.lambdaState)")
}
return try await handler.writeResponseBodyPart(buffer, requestID: requestID)
return try await handler.writeResponseBodyPart(
buffer,
requestID: requestID,
hasCustomHeaders: hasCustomHeaders
)
}
}

private func writeAndFinish(_ buffer: NIOCore.ByteBuffer?) async throws {
_hasStreamingCustomHeaders = false
switch self.lambdaState {
case .idle, .sentResponse:
throw LambdaRuntimeError(code: .finishAfterFinishHasBeenSent)
Expand Down Expand Up @@ -444,16 +435,11 @@ extension LambdaRuntimeClient: LambdaChannelHandlerDelegate {
}
}
}

func hasStreamingCustomHeaders(isolation: isolated (any Actor)? = #isolation) async -> Bool {
await self._hasStreamingCustomHeaders
}
}

private protocol LambdaChannelHandlerDelegate {
func connectionWillClose(channel: any Channel)
func connectionErrorHappened(_ error: any Error, channel: any Channel)
func hasStreamingCustomHeaders(isolation: isolated (any Actor)?) async -> Bool
}

private final class LambdaChannelHandler<Delegate: LambdaChannelHandlerDelegate> {
Expand Down Expand Up @@ -596,18 +582,29 @@ private final class LambdaChannelHandler<Delegate: LambdaChannelHandlerDelegate>
func writeResponseBodyPart(
isolation: isolated (any Actor)? = #isolation,
_ byteBuffer: ByteBuffer,
requestID: String
requestID: String,
hasCustomHeaders: Bool
) async throws {
switch self.state {
case .connected(_, .waitingForNextInvocation):
fatalError("Invalid state: \(self.state)")

case .connected(let context, .waitingForResponse):
self.state = .connected(context, .sendingResponse)
try await self.sendResponseBodyPart(byteBuffer, sendHeadWithRequestID: requestID, context: context)
try await self.sendResponseBodyPart(
byteBuffer,
sendHeadWithRequestID: requestID,
context: context,
hasCustomHeaders: hasCustomHeaders
)

case .connected(let context, .sendingResponse):
try await self.sendResponseBodyPart(byteBuffer, sendHeadWithRequestID: nil, context: context)
try await self.sendResponseBodyPart(
byteBuffer,
sendHeadWithRequestID: nil,
context: context,
hasCustomHeaders: hasCustomHeaders
)

case .connected(_, .idle),
.connected(_, .sentResponse):
Expand Down Expand Up @@ -658,15 +655,16 @@ private final class LambdaChannelHandler<Delegate: LambdaChannelHandlerDelegate>
isolation: isolated (any Actor)? = #isolation,
_ byteBuffer: ByteBuffer,
sendHeadWithRequestID: String?,
context: ChannelHandlerContext
context: ChannelHandlerContext,
hasCustomHeaders: Bool
) async throws {

if let requestID = sendHeadWithRequestID {
// TODO: This feels super expensive. We should be able to make this cheaper. requestIDs are fixed length.
let url = Consts.invocationURLPrefix + "/" + requestID + Consts.postResponseURLSuffix

var headers = self.streamingHeaders
if await self.delegate.hasStreamingCustomHeaders(isolation: #isolation) {
if hasCustomHeaders {
// this header is required by Function URL when the user sends custom status code or headers
headers.add(name: "Content-Type", value: "application/vnd.awslambda.http-integration-response")
}
Expand Down Expand Up @@ -764,7 +762,6 @@ private final class LambdaChannelHandler<Delegate: LambdaChannelHandlerDelegate>
}

private func sendResponseStreamingFailure(error: any Error, context: ChannelHandlerContext) {
// TODO: Use base64 here
let trailers: HTTPHeaders = [
"Lambda-Runtime-Function-Error-Type": "Unhandled",
"Lambda-Runtime-Function-Error-Body": "Requires base64",
Expand Down
3 changes: 1 addition & 2 deletions Sources/AWSLambdaRuntime/LambdaRuntimeClientProtocol.swift
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,10 @@ import NIOCore

@usableFromInline
package protocol LambdaRuntimeClientResponseStreamWriter: LambdaResponseStreamWriter {
func write(_ buffer: ByteBuffer) async throws
func write(_ buffer: ByteBuffer, hasCustomHeaders: Bool) async throws
func finish() async throws
func writeAndFinish(_ buffer: ByteBuffer) async throws
func reportError(_ error: any Error) async throws
func writeCustomHeader(_ buffer: NIOCore.ByteBuffer) async throws
}

@usableFromInline
Expand Down
8 changes: 1 addition & 7 deletions Tests/AWSLambdaRuntimeTests/Lambda+CodableTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -89,18 +89,12 @@ struct JSONTests {
self._buffer = buffer
}

func write(_ buffer: ByteBuffer) async throws {
func write(_ buffer: ByteBuffer, hasCustomHeaders: Bool = false) async throws {
fatalError("Unexpected call")
}

func finish() async throws {
fatalError("Unexpected call")
}

func writeCustomHeader(_ buffer: NIOCore.ByteBuffer) async throws {
// This is a mock, so we don't actually write custom headers.
// In a real implementation, this would handle writing custom headers.
fatalError("Unexpected call to writeCustomHeader")
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -537,11 +537,12 @@ final class MockLambdaResponseStreamWriter: LambdaResponseStreamWriter {
let nullBytes: [UInt8] = [0, 0, 0, 0, 0, 0, 0, 0]
buffer.writeBytes(nullBytes)

try await self.writeCustomHeader(buffer)
try await self.write(buffer, hasCustomHeaders: true)
}

func write(_ buffer: ByteBuffer) async throws {
func write(_ buffer: ByteBuffer, hasCustomHeaders: Bool = false) async throws {
writtenBuffers.append(buffer)
self.hasCustomHeaders = hasCustomHeaders
}

func finish() async throws {
Expand All @@ -552,11 +553,6 @@ final class MockLambdaResponseStreamWriter: LambdaResponseStreamWriter {
writtenBuffers.append(buffer)
isFinished = true
}

func writeCustomHeader(_ buffer: NIOCore.ByteBuffer) async throws {
hasCustomHeaders = true
try await self.write(buffer)
}
}

// MARK: - Error Handling Mock Implementations
Expand All @@ -579,11 +575,12 @@ final class FailingMockLambdaResponseStreamWriter: LambdaResponseStreamWriter {
) async throws {
var buffer = ByteBuffer()
buffer.writeString("{\"statusCode\":200}")
try await writeCustomHeader(buffer)
try await write(buffer, hasCustomHeaders: true)
}

func write(_ buffer: ByteBuffer) async throws {
func write(_ buffer: ByteBuffer, hasCustomHeaders: Bool = false) async throws {
writeCallCount += 1
self.hasCustomHeaders = hasCustomHeaders

if writeCallCount == failOnWriteCall {
throw TestWriteError()
Expand All @@ -601,10 +598,6 @@ final class FailingMockLambdaResponseStreamWriter: LambdaResponseStreamWriter {
try await finish()
}

func writeCustomHeader(_ buffer: NIOCore.ByteBuffer) async throws {
hasCustomHeaders = true
try await write(buffer)
}
}

// MARK: - Test Error Types
Expand Down Expand Up @@ -693,11 +686,12 @@ final class TrackingLambdaResponseStreamWriter: LambdaResponseStreamWriter {
) async throws {
var buffer = ByteBuffer()
buffer.writeString("{\"statusCode\":200}")
try await writeCustomHeader(buffer)
try await write(buffer, hasCustomHeaders: true)
}

func write(_ buffer: ByteBuffer) async throws {
func write(_ buffer: ByteBuffer, hasCustomHeaders: Bool = false) async throws {
writeCallCount += 1
self.hasCustomHeaders = hasCustomHeaders
writtenBuffers.append(buffer)
}

Expand All @@ -712,10 +706,6 @@ final class TrackingLambdaResponseStreamWriter: LambdaResponseStreamWriter {
isFinished = true
}

func writeCustomHeader(_ buffer: NIOCore.ByteBuffer) async throws {
hasCustomHeaders = true
try await write(buffer)
}
}

/// Mock implementation with custom behavior for integration testing
Expand All @@ -732,12 +722,13 @@ final class CustomBehaviorLambdaResponseStreamWriter: LambdaResponseStreamWriter
customBehaviorTriggered = true
var buffer = ByteBuffer()
buffer.writeString("{\"statusCode\":200}")
try await writeCustomHeader(buffer)
try await write(buffer, hasCustomHeaders: true)
}

func write(_ buffer: ByteBuffer) async throws {
func write(_ buffer: ByteBuffer, hasCustomHeaders: Bool = false) async throws {
// Trigger custom behavior on any write
customBehaviorTriggered = true
self.hasCustomHeaders = hasCustomHeaders
writtenBuffers.append(buffer)
}

Expand All @@ -750,9 +741,4 @@ final class CustomBehaviorLambdaResponseStreamWriter: LambdaResponseStreamWriter
writtenBuffers.append(buffer)
isFinished = true
}

func writeCustomHeader(_ buffer: NIOCore.ByteBuffer) async throws {
hasCustomHeaders = true
try await write(buffer)
}
}
13 changes: 5 additions & 8 deletions Tests/AWSLambdaRuntimeTests/MockLambdaClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ struct MockLambdaWriter: LambdaRuntimeClientResponseStreamWriter {
self.underlying = underlying
}

func write(_ buffer: ByteBuffer) async throws {
try await self.underlying.write(buffer)
func write(_ buffer: ByteBuffer, hasCustomHeaders: Bool = false) async throws {
try await self.underlying.write(buffer, hasCustomHeaders: hasCustomHeaders)
}

func finish() async throws {
Expand All @@ -45,9 +45,6 @@ struct MockLambdaWriter: LambdaRuntimeClientResponseStreamWriter {
func reportError(_ error: any Error) async throws {
await self.underlying.reportError(error)
}

func writeCustomHeader(_ buffer: NIOCore.ByteBuffer) async throws {
}
}

enum LambdaError: Error, Equatable {
Expand Down Expand Up @@ -158,7 +155,7 @@ final actor MockLambdaClient: LambdaRuntimeClientProtocol {
}
}

mutating func writeResult(buffer: ByteBuffer) -> ResultAction {
mutating func writeResult(buffer: ByteBuffer, hasCustomHeaders: Bool = false) -> ResultAction {
switch self.state {
case .handlerIsProcessing(var accumulatedResponse, let eventProcessedHandler):
accumulatedResponse.append(buffer)
Expand Down Expand Up @@ -279,8 +276,8 @@ final actor MockLambdaClient: LambdaRuntimeClientProtocol {
}
}

func write(_ buffer: ByteBuffer) async throws {
switch self.stateMachine.writeResult(buffer: buffer) {
func write(_ buffer: ByteBuffer, hasCustomHeaders: Bool = false) async throws {
switch self.stateMachine.writeResult(buffer: buffer, hasCustomHeaders: hasCustomHeaders) {
case .readyForMore:
break
case .fail(let error):
Expand Down