Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions Sources/AWSLambdaRuntime/Lambda+LocalServer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,14 @@ internal struct LambdaHTTPServer {
await self.responsePool.push(
LocalServerResponse(id: requestId, final: true)
)

// Send acknowledgment back to Lambda runtime client for streaming END
// This is the single HTTP response to the chunked HTTP request
try await self.sendResponse(
.init(id: requestId, status: .accepted, final: true),
outbound: outbound,
logger: logger
)
} else {
// process the buffered response for non streaming requests
try await self.processRequestAndSendResponse(
Expand Down
346 changes: 346 additions & 0 deletions Tests/AWSLambdaRuntimeTests/LambdaLocalServer+StreamingTests.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,346 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftAWSLambdaRuntime open source project
//
// Copyright (c) 2025 Apple Inc. and the SwiftAWSLambdaRuntime project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import Logging
import NIOCore
import NIOHTTP1
import NIOPosix
import Testing

@testable import AWSLambdaRuntime

#if canImport(FoundationEssentials)
import FoundationEssentials
#else
import Foundation
#endif

#if canImport(FoundationNetworking)
import FoundationNetworking
#else
import Foundation
#endif

extension LambdaLocalServerTest {
@Test("Streaming handler sends multiple chunks and completes successfully")
@available(LambdaSwift 2.0, *)
func testStreamingHandlerMultipleChunks() async throws {
let customPort = 8090

// Set environment variable
setenv("LOCAL_LAMBDA_PORT", "\(customPort)", 1)
defer { unsetenv("LOCAL_LAMBDA_PORT") }

let results = try await withThrowingTaskGroup(of: StreamingTestResult.self) { group in

// Start the Lambda runtime with streaming handler
group.addTask {
struct StreamingTestHandler: StreamingLambdaHandler {
func handle(
_ event: ByteBuffer,
responseWriter: some LambdaResponseStreamWriter,
context: LambdaContext
) async throws {
// Send multiple chunks with delays to test streaming
for i in 1...3 {
try await responseWriter.write(ByteBuffer(string: "Chunk \(i)\n"))
try await Task.sleep(for: .milliseconds(50))
}
try await responseWriter.finish()
}
}

let runtime = LambdaRuntime(
handler: StreamingTestHandler()
)

try await runtime._run()
return StreamingTestResult(chunks: [], statusCode: 0, completed: false)
}

// Start HTTP client to make streaming request
group.addTask {
// Give server time to start
try await Task.sleep(for: .milliseconds(200))

return try await self.makeStreamingInvokeRequest(
host: "127.0.0.1",
port: customPort,
payload: "\"test-event\""
)
}

// Get the first result (streaming response) and cancel the runtime
let first = try await group.next()
group.cancelAll()
return first ?? StreamingTestResult(chunks: [], statusCode: 0, completed: false)
}

// Verify streaming response
#expect(results.statusCode == 200, "Expected 200 OK, got \(results.statusCode)")
#expect(results.completed, "Streaming response should be completed")
#expect(results.chunks.count >= 1, "Expected at least 1 chunk, got \(results.chunks.count)")

// The streaming chunks are concatenated in the HTTP response
let fullResponse = results.chunks.joined()
let expectedContent = "Chunk 1\nChunk 2\nChunk 3\n"
#expect(fullResponse == expectedContent, "Response was '\(fullResponse)', expected '\(expectedContent)'")
}

@Test("Multiple streaming invocations work correctly")
@available(LambdaSwift 2.0, *)
func testMultipleStreamingInvocations() async throws {
let customPort = 8091

setenv("LOCAL_LAMBDA_PORT", "\(customPort)", 1)
defer { unsetenv("LOCAL_LAMBDA_PORT") }

let results = try await withThrowingTaskGroup(of: [StreamingTestResult].self) { group in

// Start the Lambda runtime
group.addTask {
struct MultiStreamingHandler: StreamingLambdaHandler {
func handle(
_ event: ByteBuffer,
responseWriter: some LambdaResponseStreamWriter,
context: LambdaContext
) async throws {
let eventString = String(buffer: event)
try await responseWriter.write(ByteBuffer(string: "Echo: \(eventString)\n"))
try await responseWriter.finish()
}
}

let runtime = LambdaRuntime(
handler: MultiStreamingHandler()
)

try await runtime._run()
return []
}

// Make multiple streaming requests
group.addTask {
try await Task.sleep(for: .milliseconds(200))

var results: [StreamingTestResult] = []

// Make 3 sequential streaming requests
for i in 1...3 {
let result = try await self.makeStreamingInvokeRequest(
host: "127.0.0.1",
port: customPort,
payload: "\"request-\(i)\""
)
results.append(result)

// Small delay between requests
try await Task.sleep(for: .milliseconds(100))
}

return results
}

let first = try await group.next()
group.cancelAll()
return first ?? []
}

// Verify all requests completed successfully
#expect(results.count == 3, "Expected 3 responses, got \(results.count)")

for (index, result) in results.enumerated() {
#expect(result.statusCode == 200, "Request \(index + 1) returned \(result.statusCode), expected 200")
#expect(result.completed, "Request \(index + 1) should be completed")
#expect(result.chunks.count == 1, "Request \(index + 1) should have 1 chunk, got \(result.chunks.count)")

let expectedContent = "Echo: \"request-\(index + 1)\"\n"
#expect(result.chunks.first == expectedContent, "Request \(index + 1) content mismatch")
}
}

@Test("Streaming handler with custom headers works correctly")
@available(LambdaSwift 2.0, *)
func testStreamingHandlerWithCustomHeaders() async throws {
let customPort = 8092

setenv("LOCAL_LAMBDA_PORT", "\(customPort)", 1)
defer { unsetenv("LOCAL_LAMBDA_PORT") }

let results = try await withThrowingTaskGroup(of: StreamingTestResult.self) { group in

group.addTask {
struct HeaderStreamingHandler: StreamingLambdaHandler {
func handle(
_ event: ByteBuffer,
responseWriter: some LambdaResponseStreamWriter,
context: LambdaContext
) async throws {
// Send custom headers
try await responseWriter.writeStatusAndHeaders(
StreamingLambdaStatusAndHeadersResponse(
statusCode: 201,
headers: [
"Content-Type": "text/plain",
"X-Custom-Header": "streaming-test",
]
)
)

try await responseWriter.write(ByteBuffer(string: "Custom response"))
try await responseWriter.finish()
}
}

let runtime = LambdaRuntime(
handler: HeaderStreamingHandler()
)

try await runtime._run()
return StreamingTestResult(chunks: [], statusCode: 0, completed: false)
}

group.addTask {
try await Task.sleep(for: .milliseconds(200))

return try await self.makeStreamingInvokeRequest(
host: "127.0.0.1",
port: customPort,
payload: "\"header-test\""
)
}

let first = try await group.next()
group.cancelAll()
return first ?? StreamingTestResult(chunks: [], statusCode: 0, completed: false)
}

// Verify response (custom headers are returned as JSON in the response body)
#expect(results.statusCode == 200, "Expected 200 OK, got \(results.statusCode)")
#expect(results.completed, "Streaming response should be completed")
#expect(results.chunks.count >= 1, "Expected at least 1 chunk, got \(results.chunks.count)")

// The response contains both the headers JSON and the content
let fullResponse = results.chunks.joined()
#expect(fullResponse.contains("\"statusCode\":201"), "Response should contain custom status code")
#expect(
fullResponse.contains("\"X-Custom-Header\":\"streaming-test\""),
"Response should contain custom header"
)
#expect(fullResponse.contains("Custom response"), "Response should contain custom content")
}

@Test("Streaming handler error handling works correctly")
@available(LambdaSwift 2.0, *)
func testStreamingHandlerErrorHandling() async throws {
let customPort = 8093

setenv("LOCAL_LAMBDA_PORT", "\(customPort)", 1)
defer { unsetenv("LOCAL_LAMBDA_PORT") }

let results = try await withThrowingTaskGroup(of: StreamingTestResult.self) { group in

group.addTask {
struct ErrorStreamingHandler: StreamingLambdaHandler {
func handle(
_ event: ByteBuffer,
responseWriter: some LambdaResponseStreamWriter,
context: LambdaContext
) async throws {
let eventString = String(buffer: event)

if eventString.contains("error") {
throw TestStreamingError.intentionalError
}

try await responseWriter.write(ByteBuffer(string: "Success"))
try await responseWriter.finish()
}
}

let runtime = LambdaRuntime(
handler: ErrorStreamingHandler()
)

try await runtime._run()
return StreamingTestResult(chunks: [], statusCode: 0, completed: false)
}

group.addTask {
try await Task.sleep(for: .milliseconds(200))

return try await self.makeStreamingInvokeRequest(
host: "127.0.0.1",
port: customPort,
payload: "\"trigger-error\""
)
}

let first = try await group.next()
group.cancelAll()
return first ?? StreamingTestResult(chunks: [], statusCode: 0, completed: false)
}

// Verify error response
#expect(results.statusCode == 500, "Expected 500 Internal Server Error, got \(results.statusCode)")
#expect(results.completed, "Error response should be completed")
}

// MARK: - Helper Methods

private func makeStreamingInvokeRequest(
host: String,
port: Int,
payload: String
) async throws -> StreamingTestResult {
let url = URL(string: "http://\(host):\(port)/invoke")!
var request = URLRequest(url: url)
request.httpMethod = "POST"
request.setValue("application/json", forHTTPHeaderField: "Content-Type")
request.httpBody = payload.data(using: .utf8)
request.timeoutInterval = 10.0

let (data, response) = try await URLSession.shared.data(for: request)

guard let httpResponse = response as? HTTPURLResponse else {
// On Linux, create a custom error since URLError might not be available
struct HTTPError: Error {
let message: String
}
throw HTTPError(message: "Bad server response")
}

// Parse the streaming response
let responseString = String(data: data, encoding: .utf8) ?? ""
let chunks = responseString.isEmpty ? [] : [responseString]

return StreamingTestResult(
chunks: chunks,
statusCode: httpResponse.statusCode,
completed: true
)
}
}

// MARK: - Test Support Types

struct StreamingTestResult {
let chunks: [String]
let statusCode: Int
let completed: Bool
}

enum TestStreamingError: Error {
case intentionalError
}
5 changes: 3 additions & 2 deletions Tests/AWSLambdaRuntimeTests/LambdaLocalServerTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@ import Testing

@testable import AWSLambdaRuntime

extension LambdaRuntimeTests {

// serialized to start only one runtime at a time
@Suite(.serialized)
struct LambdaLocalServerTest {
@Test("Local server respects LOCAL_LAMBDA_PORT environment variable")
@available(LambdaSwift 2.0, *)
func testLocalServerCustomPort() async throws {
Expand Down