From 1de4b1946fe44909550dfc77b8c9f844de3ab179 Mon Sep 17 00:00:00 2001 From: Sebastien Stormacq Date: Tue, 14 Oct 2025 11:23:39 +0200 Subject: [PATCH 01/31] Gently decline subsequent POST /invoke request while the Lambda handler processes a request --- Examples/Streaming/README.md | 2 +- .../AWSLambdaRuntime/Lambda+LocalServer.swift | 47 +++++++++++++++---- 2 files changed, 39 insertions(+), 10 deletions(-) diff --git a/Examples/Streaming/README.md b/Examples/Streaming/README.md index 2c40df57..663ac14c 100644 --- a/Examples/Streaming/README.md +++ b/Examples/Streaming/README.md @@ -82,7 +82,7 @@ You can test the function locally before deploying: swift run # In another terminal, test with curl: -curl -v \ +curl -v --output response.txt \ --header "Content-Type: application/json" \ --data '"this is not used"' \ http://127.0.0.1:7000/invoke diff --git a/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift b/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift index c3fa2e5d..dcbd88b7 100644 --- a/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift +++ b/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift @@ -95,8 +95,8 @@ extension Lambda { internal struct LambdaHTTPServer { private let invocationEndpoint: String - private let invocationPool = Pool() - private let responsePool = Pool() + private let invocationPool = Pool(name: "Invocation Pool") + private let responsePool = Pool(name: "Response Pool") private init( invocationEndpoint: String? @@ -388,8 +388,21 @@ internal struct LambdaHTTPServer { // we always accept the /invoke request and push them to the pool let requestId = "\(DispatchTime.now().uptimeNanoseconds)" logger[metadataKey: "requestId"] = "\(requestId)" + logger.trace("/invoke received invocation, pushing it to the pool and wait for a lambda response") - await self.invocationPool.push(LocalServerInvocation(requestId: requestId, request: body)) + // detect concurrent invocations of POST and gently decline the requests while we're processing one. + if await !self.invocationPool.push(LocalServerInvocation(requestId: requestId, request: body)) { + let response = LocalServerResponse( + id: requestId, + status: .badRequest, + body: ByteBuffer( + string: + "It's illegal to invoke multiple Lambda function executions in parallel. (The Lambda runtime environment on AWS will never do that)" + ) + ) + try await self.sendResponse(response, outbound: outbound, logger: logger) + return + } // wait for the lambda function to process the request for try await response in self.responsePool { @@ -410,7 +423,12 @@ internal struct LambdaHTTPServer { "Received response for a different request id", metadata: ["response requestId": "\(response.requestId ?? "")"] ) - // should we return an error here ? Or crash as this is probably a programming error? + let response = LocalServerResponse( + id: requestId, + status: .badRequest, + body: ByteBuffer(string: "The response Id not equal to the request Id.") + ) + try await self.sendResponse(response, outbound: outbound, logger: logger) } } // What todo when there is no more responses to process? @@ -548,6 +566,9 @@ internal struct LambdaHTTPServer { /// This data structure is shared between instances of the HTTPHandler /// (one instance to serve requests from the Lambda function and one instance to serve requests from the client invoking the lambda function). internal final class Pool: AsyncSequence, AsyncIteratorProtocol, Sendable where T: Sendable { + private let poolName: String + internal init(name: String) { self.poolName = name } + typealias Element = T enum State: ~Copyable { @@ -558,8 +579,11 @@ internal struct LambdaHTTPServer { private let lock = Mutex(.buffer([])) /// enqueue an element, or give it back immediately to the iterator if it is waiting for an element - public func push(_ invocation: T) async { - // if the iterator is waiting for an element, give it to it + /// Returns true when we receive a element and the pool was in "waiting for continuation" state, false otherwise + @discardableResult + public func push(_ invocation: T) async -> Bool { + + // if the iterator is waiting for an element on `next()``, give it to it // otherwise, enqueue the element let maybeContinuation = self.lock.withLock { state -> CheckedContinuation? in switch consume state { @@ -574,7 +598,12 @@ internal struct LambdaHTTPServer { } } - maybeContinuation?.resume(returning: invocation) + if let maybeContinuation { + maybeContinuation.resume(returning: invocation) + return true + } else { + return false + } } func next() async throws -> T? { @@ -596,8 +625,8 @@ internal struct LambdaHTTPServer { return nil } - case .continuation: - fatalError("Concurrent invocations to next(). This is illegal.") + case .continuation(_): + fatalError("\(self.poolName) : Concurrent invocations to next(). This is illegal.") } } From f35685c05c8090ff81d64f09f197d476cbc7bd46 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=A9bastien=20Stormacq?= Date: Tue, 14 Oct 2025 11:33:21 +0200 Subject: [PATCH 02/31] Update Sources/AWSLambdaRuntime/Lambda+LocalServer.swift Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- Sources/AWSLambdaRuntime/Lambda+LocalServer.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift b/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift index dcbd88b7..46327984 100644 --- a/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift +++ b/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift @@ -397,7 +397,7 @@ internal struct LambdaHTTPServer { status: .badRequest, body: ByteBuffer( string: - "It's illegal to invoke multiple Lambda function executions in parallel. (The Lambda runtime environment on AWS will never do that)" + "It is not allowed to invoke multiple Lambda function executions in parallel. (The Lambda runtime environment on AWS will never do that)" ) ) try await self.sendResponse(response, outbound: outbound, logger: logger) From 8aa9a46da61fb02c4cdf9161a332b50b7dbaccc3 Mon Sep 17 00:00:00 2001 From: Sebastien Stormacq Date: Tue, 14 Oct 2025 11:36:17 +0200 Subject: [PATCH 03/31] fix typo and language in comments --- Sources/AWSLambdaRuntime/Lambda+LocalServer.swift | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift b/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift index 46327984..ec17742b 100644 --- a/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift +++ b/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift @@ -420,13 +420,13 @@ internal struct LambdaHTTPServer { } } else { logger.error( - "Received response for a different request id", + "Received response for a different requestId", metadata: ["response requestId": "\(response.requestId ?? "")"] ) let response = LocalServerResponse( id: requestId, status: .badRequest, - body: ByteBuffer(string: "The response Id not equal to the request Id.") + body: ByteBuffer(string: "The responseId is not equal to the requestId.") ) try await self.sendResponse(response, outbound: outbound, logger: logger) } @@ -626,7 +626,7 @@ internal struct LambdaHTTPServer { } case .continuation(_): - fatalError("\(self.poolName) : Concurrent invocations to next(). This is illegal.") + fatalError("\(self.poolName) : Concurrent invocations to next(). This is not allowed.") } } From 6d028481b333fda952d29d8856d6e46e036dff86 Mon Sep 17 00:00:00 2001 From: Sebastien Stormacq Date: Tue, 14 Oct 2025 11:39:44 +0200 Subject: [PATCH 04/31] fix test --- Sources/AWSLambdaRuntime/Lambda+LocalServer.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift b/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift index ec17742b..e4a36036 100644 --- a/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift +++ b/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift @@ -567,7 +567,7 @@ internal struct LambdaHTTPServer { /// (one instance to serve requests from the Lambda function and one instance to serve requests from the client invoking the lambda function). internal final class Pool: AsyncSequence, AsyncIteratorProtocol, Sendable where T: Sendable { private let poolName: String - internal init(name: String) { self.poolName = name } + internal init(name: String = "Pool") { self.poolName = name } typealias Element = T From 0566ef5d54253ce430b829652d6bbf15847c072f Mon Sep 17 00:00:00 2001 From: Sebastien Stormacq Date: Tue, 14 Oct 2025 13:32:42 +0200 Subject: [PATCH 05/31] remove async constraint on `push()` --- Sources/AWSLambdaRuntime/Lambda+LocalServer.swift | 14 +++++++------- Tests/AWSLambdaRuntimeTests/PoolTests.swift | 14 ++++++++------ 2 files changed, 15 insertions(+), 13 deletions(-) diff --git a/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift b/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift index e4a36036..c7687c63 100644 --- a/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift +++ b/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift @@ -272,7 +272,7 @@ internal struct LambdaHTTPServer { // for streaming requests, push a partial head response if self.isStreamingResponse(requestHead) { - await self.responsePool.push( + self.responsePool.push( LocalServerResponse( id: requestId, status: .ok @@ -286,7 +286,7 @@ internal struct LambdaHTTPServer { // if this is a request from a Streaming Lambda Handler, // stream the response instead of buffering it if self.isStreamingResponse(requestHead) { - await self.responsePool.push( + self.responsePool.push( LocalServerResponse(id: requestId, body: body) ) } else { @@ -298,7 +298,7 @@ internal struct LambdaHTTPServer { if self.isStreamingResponse(requestHead) { // for streaming response, send the final response - await self.responsePool.push( + self.responsePool.push( LocalServerResponse(id: requestId, final: true) ) } else { @@ -391,7 +391,7 @@ internal struct LambdaHTTPServer { logger.trace("/invoke received invocation, pushing it to the pool and wait for a lambda response") // detect concurrent invocations of POST and gently decline the requests while we're processing one. - if await !self.invocationPool.push(LocalServerInvocation(requestId: requestId, request: body)) { + if !self.invocationPool.push(LocalServerInvocation(requestId: requestId, request: body)) { let response = LocalServerResponse( id: requestId, status: .badRequest, @@ -475,7 +475,7 @@ internal struct LambdaHTTPServer { } // enqueue the lambda function response to be served as response to the client /invoke logger.trace("/:requestId/response received response", metadata: ["requestId": "\(requestId)"]) - await self.responsePool.push( + self.responsePool.push( LocalServerResponse( id: requestId, status: .accepted, @@ -506,7 +506,7 @@ internal struct LambdaHTTPServer { } // enqueue the lambda function response to be served as response to the client /invoke logger.trace("/:requestId/response received response", metadata: ["requestId": "\(requestId)"]) - await self.responsePool.push( + self.responsePool.push( LocalServerResponse( id: requestId, status: .internalServerError, @@ -581,7 +581,7 @@ internal struct LambdaHTTPServer { /// enqueue an element, or give it back immediately to the iterator if it is waiting for an element /// Returns true when we receive a element and the pool was in "waiting for continuation" state, false otherwise @discardableResult - public func push(_ invocation: T) async -> Bool { + public func push(_ invocation: T) -> Bool { // if the iterator is waiting for an element on `next()``, give it to it // otherwise, enqueue the element diff --git a/Tests/AWSLambdaRuntimeTests/PoolTests.swift b/Tests/AWSLambdaRuntimeTests/PoolTests.swift index 8cbe8a2e..84be4178 100644 --- a/Tests/AWSLambdaRuntimeTests/PoolTests.swift +++ b/Tests/AWSLambdaRuntimeTests/PoolTests.swift @@ -24,8 +24,8 @@ struct PoolTests { let pool = LambdaHTTPServer.Pool() // Push values - await pool.push("first") - await pool.push("second") + pool.push("first") + pool.push("second") // Iterate and verify order var values = [String]() @@ -53,7 +53,9 @@ struct PoolTests { task.cancel() // This should complete without receiving any values - try await task.value + do { + try await task.value + } catch is CancellationError {} // this might happen depending on the order on which the cancellation is handled } @Test @@ -78,7 +80,7 @@ struct PoolTests { try await withThrowingTaskGroup(of: Void.self) { group in for i in 0.. Date: Tue, 14 Oct 2025 13:42:33 +0200 Subject: [PATCH 06/31] swift-format --- Tests/AWSLambdaRuntimeTests/PoolTests.swift | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/Tests/AWSLambdaRuntimeTests/PoolTests.swift b/Tests/AWSLambdaRuntimeTests/PoolTests.swift index 84be4178..2e042914 100644 --- a/Tests/AWSLambdaRuntimeTests/PoolTests.swift +++ b/Tests/AWSLambdaRuntimeTests/PoolTests.swift @@ -55,7 +55,9 @@ struct PoolTests { // This should complete without receiving any values do { try await task.value - } catch is CancellationError {} // this might happen depending on the order on which the cancellation is handled + } catch is CancellationError { + // this might happen depending on the order on which the cancellation is handled + } } @Test From d73dce678a8f155ea4f760b49d0eefae7395c805 Mon Sep 17 00:00:00 2001 From: Sebastien Stormacq Date: Tue, 14 Oct 2025 14:25:21 +0200 Subject: [PATCH 07/31] remove the fatal error to make testinge easier --- .../AWSLambdaRuntime/Lambda+LocalServer.swift | 117 ++++++++++-------- Tests/AWSLambdaRuntimeTests/PoolTests.swift | 28 +++++ 2 files changed, 94 insertions(+), 51 deletions(-) diff --git a/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift b/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift index c7687c63..0b661ad7 100644 --- a/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift +++ b/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift @@ -391,7 +391,42 @@ internal struct LambdaHTTPServer { logger.trace("/invoke received invocation, pushing it to the pool and wait for a lambda response") // detect concurrent invocations of POST and gently decline the requests while we're processing one. - if !self.invocationPool.push(LocalServerInvocation(requestId: requestId, request: body)) { + self.invocationPool.push(LocalServerInvocation(requestId: requestId, request: body)) + + // wait for the lambda function to process the request + // when POST /invoke is called multiple times before a response is process, the + // `for try await ... in` loop will throw an error and we will return a 400 error to the client + do { + for try await response in self.responsePool { + logger[metadataKey: "response requestId"] = "\(response.requestId ?? "nil")" + logger.trace("Received response to return to client") + if response.requestId == requestId { + logger.trace("/invoke requestId is valid, sending the response") + // send the response to the client + // if the response is final, we can send it and return + // if the response is not final, we can send it and wait for the next response + try await self.sendResponse(response, outbound: outbound, logger: logger) + if response.final == true { + logger.trace("/invoke returning") + return // if the response is final, we can return and close the connection + } + } else { + logger.error( + "Received response for a different requestId", + metadata: ["response requestId": "\(response.requestId ?? "")"] + ) + let response = LocalServerResponse( + id: requestId, + status: .badRequest, + body: ByteBuffer(string: "The responseId is not equal to the requestId.") + ) + try await self.sendResponse(response, outbound: outbound, logger: logger) + } + } + // What todo when there is no more responses to process? + // This should not happen as the async iterator blocks until there is a response to process + fatalError("No more responses to process - the async for loop should not return") + } catch is LambdaHTTPServer.Pool.PoolError { let response = LocalServerResponse( id: requestId, status: .badRequest, @@ -401,39 +436,7 @@ internal struct LambdaHTTPServer { ) ) try await self.sendResponse(response, outbound: outbound, logger: logger) - return - } - - // wait for the lambda function to process the request - for try await response in self.responsePool { - logger[metadataKey: "response requestId"] = "\(response.requestId ?? "nil")" - logger.trace("Received response to return to client") - if response.requestId == requestId { - logger.trace("/invoke requestId is valid, sending the response") - // send the response to the client - // if the response is final, we can send it and return - // if the response is not final, we can send it and wait for the next response - try await self.sendResponse(response, outbound: outbound, logger: logger) - if response.final == true { - logger.trace("/invoke returning") - return // if the response is final, we can return and close the connection - } - } else { - logger.error( - "Received response for a different requestId", - metadata: ["response requestId": "\(response.requestId ?? "")"] - ) - let response = LocalServerResponse( - id: requestId, - status: .badRequest, - body: ByteBuffer(string: "The responseId is not equal to the requestId.") - ) - try await self.sendResponse(response, outbound: outbound, logger: logger) - } } - // What todo when there is no more responses to process? - // This should not happen as the async iterator blocks until there is a response to process - fatalError("No more responses to process - the async for loop should not return") // client uses incorrect HTTP method case (_, let url) where url.hasSuffix(self.invocationEndpoint): @@ -579,9 +582,7 @@ internal struct LambdaHTTPServer { private let lock = Mutex(.buffer([])) /// enqueue an element, or give it back immediately to the iterator if it is waiting for an element - /// Returns true when we receive a element and the pool was in "waiting for continuation" state, false otherwise - @discardableResult - public func push(_ invocation: T) -> Bool { + public func push(_ invocation: T) { // if the iterator is waiting for an element on `next()``, give it to it // otherwise, enqueue the element @@ -598,12 +599,7 @@ internal struct LambdaHTTPServer { } } - if let maybeContinuation { - maybeContinuation.resume(returning: invocation) - return true - } else { - return false - } + maybeContinuation?.resume(returning: invocation) } func next() async throws -> T? { @@ -614,25 +610,30 @@ internal struct LambdaHTTPServer { return try await withTaskCancellationHandler { try await withCheckedThrowingContinuation { (continuation: CheckedContinuation) in - let nextAction = self.lock.withLock { state -> T? in + let (nextAction, nextError) = self.lock.withLock { state -> (T?, PoolError?) in switch consume state { case .buffer(var buffer): if let first = buffer.popFirst() { state = .buffer(buffer) - return first + return (first, nil) } else { state = .continuation(continuation) - return nil + return (nil, nil) } - case .continuation(_): - fatalError("\(self.poolName) : Concurrent invocations to next(). This is not allowed.") + case .continuation(let previousContinuation): + state = .buffer([]) + return (nil, PoolError(cause: .nextCalledTwice([previousContinuation, continuation]))) } } - guard let nextAction else { return } - - continuation.resume(returning: nextAction) + if let nextError, + case let .nextCalledTwice(continuations) = nextError.cause + { + for continuation in continuations { continuation?.resume(throwing: nextError) } + } else if let nextAction { + continuation.resume(returning: nextAction) + } } } onCancel: { self.lock.withLock { state in @@ -640,8 +641,8 @@ internal struct LambdaHTTPServer { case .buffer(let buffer): state = .buffer(buffer) case .continuation(let continuation): - continuation?.resume(throwing: CancellationError()) state = .buffer([]) + continuation?.resume(throwing: CancellationError()) } } } @@ -650,6 +651,20 @@ internal struct LambdaHTTPServer { func makeAsyncIterator() -> Pool { self } + + struct PoolError: Error { + let cause: Cause + var message: String { + switch self.cause { + case .nextCalledTwice: + return "Concurrent invocations to next(). This is not allowed." + } + } + + enum Cause { + case nextCalledTwice([CheckedContinuation?]) + } + } } private struct LocalServerResponse: Sendable { diff --git a/Tests/AWSLambdaRuntimeTests/PoolTests.swift b/Tests/AWSLambdaRuntimeTests/PoolTests.swift index 2e042914..1e2fff2e 100644 --- a/Tests/AWSLambdaRuntimeTests/PoolTests.swift +++ b/Tests/AWSLambdaRuntimeTests/PoolTests.swift @@ -158,4 +158,32 @@ struct PoolTests { #expect(receivedValues.count == producerCount * messagesPerProducer) #expect(Set(receivedValues).count == producerCount * messagesPerProducer) } + + @Test + @available(LambdaSwift 2.0, *) + func testConcurrentNext() async throws { + let pool = LambdaHTTPServer.Pool() + + // Create two tasks that will both wait for elements to be available + await #expect(throws: LambdaHTTPServer.Pool.PoolError.self) { + try await withThrowingTaskGroup(of: Void.self) { group in + + // one of the two task will throw a PoolError + + group.addTask { + for try await _ in pool { + } + Issue.record("Loop 1 should not complete") + } + + group.addTask { + for try await _ in pool { + } + Issue.record("Loop 2 should not complete") + } + try await group.waitForAll() + } + } + } + } From 0cd73dad2ad8a5bb4f7a3e8ac62730246605fbb6 Mon Sep 17 00:00:00 2001 From: Sebastien Stormacq Date: Tue, 14 Oct 2025 14:34:42 +0200 Subject: [PATCH 08/31] fix comment --- Sources/AWSLambdaRuntime/Lambda+LocalServer.swift | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift b/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift index 0b661ad7..498d3e79 100644 --- a/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift +++ b/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift @@ -390,12 +390,11 @@ internal struct LambdaHTTPServer { logger[metadataKey: "requestId"] = "\(requestId)" logger.trace("/invoke received invocation, pushing it to the pool and wait for a lambda response") - // detect concurrent invocations of POST and gently decline the requests while we're processing one. self.invocationPool.push(LocalServerInvocation(requestId: requestId, request: body)) // wait for the lambda function to process the request - // when POST /invoke is called multiple times before a response is process, the - // `for try await ... in` loop will throw an error and we will return a 400 error to the client + // when POST /invoke is called multiple times before a response is processed, + // the `for try await ... in` loop will throw an error and we will return a 400 error to the client do { for try await response in self.responsePool { logger[metadataKey: "response requestId"] = "\(response.requestId ?? "nil")" @@ -427,6 +426,7 @@ internal struct LambdaHTTPServer { // This should not happen as the async iterator blocks until there is a response to process fatalError("No more responses to process - the async for loop should not return") } catch is LambdaHTTPServer.Pool.PoolError { + // detect concurrent invocations of POST and gently decline the requests while we're processing one. let response = LocalServerResponse( id: requestId, status: .badRequest, From b3576fe094c1c575fc6b9f9558993d01f333ec7b Mon Sep 17 00:00:00 2001 From: Sebastien Stormacq Date: Wed, 15 Oct 2025 09:56:22 +0200 Subject: [PATCH 09/31] use Result instead of Tupe --- .../AWSLambdaRuntime/Lambda+LocalServer.swift | 31 +++++++++++-------- 1 file changed, 18 insertions(+), 13 deletions(-) diff --git a/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift b/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift index 498d3e79..0e571673 100644 --- a/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift +++ b/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift @@ -576,7 +576,7 @@ internal struct LambdaHTTPServer { enum State: ~Copyable { case buffer(Deque) - case continuation(CheckedContinuation?) + case continuation(CheckedContinuation) } private let lock = Mutex(.buffer([])) @@ -610,29 +610,34 @@ internal struct LambdaHTTPServer { return try await withTaskCancellationHandler { try await withCheckedThrowingContinuation { (continuation: CheckedContinuation) in - let (nextAction, nextError) = self.lock.withLock { state -> (T?, PoolError?) in + let nextAction: Result? = self.lock.withLock { state -> Result? in switch consume state { case .buffer(var buffer): if let first = buffer.popFirst() { state = .buffer(buffer) - return (first, nil) + return .success(first) } else { state = .continuation(continuation) - return (nil, nil) + return nil } case .continuation(let previousContinuation): state = .buffer([]) - return (nil, PoolError(cause: .nextCalledTwice([previousContinuation, continuation]))) + return .failure(PoolError(cause: .nextCalledTwice(previousContinuation))) } } - if let nextError, - case let .nextCalledTwice(continuations) = nextError.cause - { - for continuation in continuations { continuation?.resume(throwing: nextError) } - } else if let nextAction { - continuation.resume(returning: nextAction) + switch nextAction { + case .success(let action): + continuation.resume(returning: action) + case .failure(let error): + if case let .nextCalledTwice(prevContinuation) = error.cause { + prevContinuation.resume(throwing: error) + } + continuation.resume(throwing: error) + case .none: + // do nothing + break } } } onCancel: { @@ -642,7 +647,7 @@ internal struct LambdaHTTPServer { state = .buffer(buffer) case .continuation(let continuation): state = .buffer([]) - continuation?.resume(throwing: CancellationError()) + continuation.resume(throwing: CancellationError()) } } } @@ -662,7 +667,7 @@ internal struct LambdaHTTPServer { } enum Cause { - case nextCalledTwice([CheckedContinuation?]) + case nextCalledTwice(CheckedContinuation) } } } From ae18d7baa9759031d3d4ae2a0d29ca7120b9b5ca Mon Sep 17 00:00:00 2001 From: Sebastien Stormacq Date: Wed, 15 Oct 2025 10:02:34 +0200 Subject: [PATCH 10/31] swift format --- Sources/AWSLambdaRuntime/Lambda+LocalServer.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift b/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift index 0e571673..f3356a96 100644 --- a/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift +++ b/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift @@ -610,7 +610,7 @@ internal struct LambdaHTTPServer { return try await withTaskCancellationHandler { try await withCheckedThrowingContinuation { (continuation: CheckedContinuation) in - let nextAction: Result? = self.lock.withLock { state -> Result? in + let nextAction: Result? = self.lock.withLock { state -> Result? in switch consume state { case .buffer(var buffer): if let first = buffer.popFirst() { From 605e86c7879a9de7866ba982e05415b2aa63f2d1 Mon Sep 17 00:00:00 2001 From: Sebastien Stormacq Date: Wed, 15 Oct 2025 10:15:26 +0200 Subject: [PATCH 11/31] onCancel: resume continuation outside of the lock --- Sources/AWSLambdaRuntime/Lambda+LocalServer.swift | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift b/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift index f3356a96..055c9f1c 100644 --- a/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift +++ b/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift @@ -583,7 +583,6 @@ internal struct LambdaHTTPServer { /// enqueue an element, or give it back immediately to the iterator if it is waiting for an element public func push(_ invocation: T) { - // if the iterator is waiting for an element on `next()``, give it to it // otherwise, enqueue the element let maybeContinuation = self.lock.withLock { state -> CheckedContinuation? in @@ -599,6 +598,7 @@ internal struct LambdaHTTPServer { } } + // Resume continuation outside the lock to prevent potential deadlocks maybeContinuation?.resume(returning: invocation) } @@ -636,20 +636,25 @@ internal struct LambdaHTTPServer { } continuation.resume(throwing: error) case .none: - // do nothing + // do nothing - continuation is stored in state break } } } onCancel: { - self.lock.withLock { state in + // Ensure we properly handle cancellation by checking if we have a stored continuation + let continuationToCancel = self.lock.withLock { state -> CheckedContinuation? in switch consume state { case .buffer(let buffer): state = .buffer(buffer) + return nil case .continuation(let continuation): state = .buffer([]) - continuation.resume(throwing: CancellationError()) + return continuation } } + + // Resume the continuation outside the lock to avoid potential deadlocks + continuationToCancel?.resume(throwing: CancellationError()) } } From 2d6842a195e8e2ba8783240694d8af95bc001a56 Mon Sep 17 00:00:00 2001 From: Sebastien Stormacq Date: Wed, 15 Oct 2025 10:17:38 +0200 Subject: [PATCH 12/31] swift-format --- Sources/AWSLambdaRuntime/Lambda+LocalServer.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift b/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift index 055c9f1c..da5f2597 100644 --- a/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift +++ b/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift @@ -652,7 +652,7 @@ internal struct LambdaHTTPServer { return continuation } } - + // Resume the continuation outside the lock to avoid potential deadlocks continuationToCancel?.resume(throwing: CancellationError()) } From 009b5c6a1444dbb9928087fe474c4473b3d42823 Mon Sep 17 00:00:00 2001 From: Sebastien Stormacq Date: Thu, 16 Oct 2025 09:12:35 +0200 Subject: [PATCH 13/31] add logging trace details --- Sources/AWSLambdaRuntime/Lambda+LocalServer.swift | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift b/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift index da5f2597..58d763e7 100644 --- a/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift +++ b/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift @@ -125,6 +125,11 @@ internal struct LambdaHTTPServer { logger: Logger, _ closure: sending @escaping () async throws -> Result ) async throws -> Result { + + var l = Logger(label: "HTTPServer") + l.logLevel = .trace + let logger = l + let channel = try await ServerBootstrap(group: eventLoopGroup) .serverChannelOption(.backlog, value: 256) .serverChannelOption(.socketOption(.so_reuseaddr), value: 1) @@ -165,6 +170,8 @@ internal struct LambdaHTTPServer { let closureBox = UnsafeTransferBox(value: closure) let result = await withTaskGroup(of: TaskResult.self, returning: Swift.Result.self) { group in + + // this Task will run the content of the closure we received, typically the Lambda Runtime Client HTTP group.addTask { let c = closureBox.value do { @@ -175,6 +182,7 @@ internal struct LambdaHTTPServer { } } + // this Task will create one subtask to handle each individual connection group.addTask { do { // We are handling each incoming connection in a separate child task. It is important @@ -397,7 +405,7 @@ internal struct LambdaHTTPServer { // the `for try await ... in` loop will throw an error and we will return a 400 error to the client do { for try await response in self.responsePool { - logger[metadataKey: "response requestId"] = "\(response.requestId ?? "nil")" + logger[metadataKey: "response_requestId"] = "\(response.requestId ?? "nil")" logger.trace("Received response to return to client") if response.requestId == requestId { logger.trace("/invoke requestId is valid, sending the response") @@ -426,6 +434,7 @@ internal struct LambdaHTTPServer { // This should not happen as the async iterator blocks until there is a response to process fatalError("No more responses to process - the async for loop should not return") } catch is LambdaHTTPServer.Pool.PoolError { + logger.trace("PoolError catched") // detect concurrent invocations of POST and gently decline the requests while we're processing one. let response = LocalServerResponse( id: requestId, @@ -460,7 +469,9 @@ internal struct LambdaHTTPServer { logger[metadataKey: "requestId"] = "\(invocation.requestId)" logger.trace("/next retrieved invocation") // tell the lambda function we accepted the invocation - return try await sendResponse(invocation.acceptedResponse(), outbound: outbound, logger: logger) + try await sendResponse(invocation.acceptedResponse(), outbound: outbound, logger: logger) + logger.trace("/next accepted, returning") + return } // What todo when there is no more tasks to process? // This should not happen as the async iterator blocks until there is a task to process From 0ed15f9cdc22dd4052e031911d6041bcc45b9a88 Mon Sep 17 00:00:00 2001 From: Sebastien Stormacq Date: Thu, 16 Oct 2025 12:18:17 +0200 Subject: [PATCH 14/31] fix parallel invocation for non streaming lambda functions --- .../AWSLambdaRuntime/Lambda+LocalServer.swift | 188 ++++++++++++------ 1 file changed, 130 insertions(+), 58 deletions(-) diff --git a/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift b/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift index 58d763e7..0cf6060f 100644 --- a/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift +++ b/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift @@ -401,38 +401,22 @@ internal struct LambdaHTTPServer { self.invocationPool.push(LocalServerInvocation(requestId: requestId, request: body)) // wait for the lambda function to process the request - // when POST /invoke is called multiple times before a response is processed, - // the `for try await ... in` loop will throw an error and we will return a 400 error to the client + // Handle streaming responses by collecting all chunks for this requestId do { - for try await response in self.responsePool { + var isComplete = false + while !isComplete { + let response = try await self.responsePool.next(for: requestId) logger[metadataKey: "response_requestId"] = "\(response.requestId ?? "nil")" - logger.trace("Received response to return to client") - if response.requestId == requestId { - logger.trace("/invoke requestId is valid, sending the response") - // send the response to the client - // if the response is final, we can send it and return - // if the response is not final, we can send it and wait for the next response - try await self.sendResponse(response, outbound: outbound, logger: logger) - if response.final == true { - logger.trace("/invoke returning") - return // if the response is final, we can return and close the connection - } - } else { - logger.error( - "Received response for a different requestId", - metadata: ["response requestId": "\(response.requestId ?? "")"] - ) - let response = LocalServerResponse( - id: requestId, - status: .badRequest, - body: ByteBuffer(string: "The responseId is not equal to the requestId.") - ) - try await self.sendResponse(response, outbound: outbound, logger: logger) + logger.trace("Received response chunk to return to client") + + // send the response chunk to the client + try await self.sendResponse(response, outbound: outbound, logger: logger) + + if response.final == true { + logger.trace("/invoke complete, returning") + isComplete = true } } - // What todo when there is no more responses to process? - // This should not happen as the async iterator blocks until there is a response to process - fatalError("No more responses to process - the async for loop should not return") } catch is LambdaHTTPServer.Pool.PoolError { logger.trace("PoolError catched") // detect concurrent invocations of POST and gently decline the requests while we're processing one. @@ -587,36 +571,58 @@ internal struct LambdaHTTPServer { enum State: ~Copyable { case buffer(Deque) - case continuation(CheckedContinuation) + case waitingForAny(CheckedContinuation) // FIFO waiting (for invocations) + case waitingForSpecific([String: CheckedContinuation]) // RequestId-based waiting (for responses) } private let lock = Mutex(.buffer([])) /// enqueue an element, or give it back immediately to the iterator if it is waiting for an element - public func push(_ invocation: T) { - // if the iterator is waiting for an element on `next()``, give it to it - // otherwise, enqueue the element - let maybeContinuation = self.lock.withLock { state -> CheckedContinuation? in + public func push(_ item: T) { + let continuationToResume = self.lock.withLock { state -> CheckedContinuation? in switch consume state { - case .continuation(let continuation): - state = .buffer([]) - return continuation - case .buffer(var buffer): - buffer.append(invocation) + buffer.append(item) state = .buffer(buffer) return nil + + case .waitingForAny(let continuation): + // Someone is waiting for any item (FIFO) + state = .buffer([]) + return continuation + + case .waitingForSpecific(var continuations): + // Check if this item matches any waiting continuation + if let response = item as? LocalServerResponse, + let requestId = response.requestId, + let continuation = continuations.removeValue(forKey: requestId) + { + // Found a matching continuation + if continuations.isEmpty { + state = .buffer([]) + } else { + state = .waitingForSpecific(continuations) + } + return continuation + } else { + // No matching continuation, add to buffer + var buffer = Deque() + buffer.append(item) + state = .buffer(buffer) + return nil + } } } // Resume continuation outside the lock to prevent potential deadlocks - maybeContinuation?.resume(returning: invocation) + continuationToResume?.resume(returning: item) } - func next() async throws -> T? { - // exit the async for loop if the task is cancelled + /// Unified next() method that handles both FIFO and requestId-specific waiting + private func _next(for requestId: String?) async throws -> T { + // exit if the task is cancelled guard !Task.isCancelled else { - return nil + throw CancellationError() } return try await withTaskCancellationHandler { @@ -624,23 +630,68 @@ internal struct LambdaHTTPServer { let nextAction: Result? = self.lock.withLock { state -> Result? in switch consume state { case .buffer(var buffer): - if let first = buffer.popFirst() { - state = .buffer(buffer) - return .success(first) + if let requestId = requestId { + // Look for oldest (first) item for this requestId in buffer + if let index = buffer.firstIndex(where: { item in + if let response = item as? LocalServerResponse { + return response.requestId == requestId + } + return false + }) { + let item = buffer.remove(at: index) + state = .buffer(buffer) + return .success(item) + } else { + // No matching item, wait for it + var continuations: [String: CheckedContinuation] = [:] + continuations[requestId] = continuation + state = .waitingForSpecific(continuations) + return nil + } } else { - state = .continuation(continuation) - return nil + // FIFO mode - take first item + if let first = buffer.popFirst() { + state = .buffer(buffer) + return .success(first) + } else { + state = .waitingForAny(continuation) + return nil + } } - case .continuation(let previousContinuation): - state = .buffer([]) - return .failure(PoolError(cause: .nextCalledTwice(previousContinuation))) + case .waitingForAny(let previousContinuation): + if requestId == nil { + // Another FIFO call while already waiting + state = .buffer([]) + return .failure(PoolError(cause: .nextCalledTwice(previousContinuation))) + } else { + // Can't mix FIFO and specific waiting + state = .waitingForAny(previousContinuation) + return .failure(PoolError(cause: .mixedWaitingModes)) + } + + case .waitingForSpecific(var continuations): + if let requestId = requestId { + if continuations[requestId] != nil { + // Already waiting for this requestId + state = .waitingForSpecific(continuations) + return .failure(PoolError(cause: .duplicateRequestIdWait(requestId))) + } else { + continuations[requestId] = continuation + state = .waitingForSpecific(continuations) + return nil + } + } else { + // Can't mix FIFO and specific waiting + state = .waitingForSpecific(continuations) + return .failure(PoolError(cause: .mixedWaitingModes)) + } } } switch nextAction { - case .success(let action): - continuation.resume(returning: action) + case .success(let item): + continuation.resume(returning: item) case .failure(let error): if case let .nextCalledTwice(prevContinuation) = error.cause { prevContinuation.resume(throwing: error) @@ -653,22 +704,37 @@ internal struct LambdaHTTPServer { } } onCancel: { // Ensure we properly handle cancellation by checking if we have a stored continuation - let continuationToCancel = self.lock.withLock { state -> CheckedContinuation? in + let continuationsToCancel = self.lock.withLock { state -> [String: CheckedContinuation] in switch consume state { case .buffer(let buffer): state = .buffer(buffer) - return nil - case .continuation(let continuation): + return [:] + case .waitingForAny(let continuation): state = .buffer([]) - return continuation + return ["": continuation] // Use empty string as key for single continuation + case .waitingForSpecific(let continuations): + state = .buffer([]) + return continuations } } - // Resume the continuation outside the lock to avoid potential deadlocks - continuationToCancel?.resume(throwing: CancellationError()) + // Resume all continuations outside the lock to avoid potential deadlocks + for continuation in continuationsToCancel.values { + continuation.resume(throwing: CancellationError()) + } } } + /// Simple FIFO next() method - used by AsyncIteratorProtocol + func next() async throws -> T? { + try await _next(for: nil) + } + + /// RequestId-specific next() method for LocalServerResponse - NOT part of AsyncIteratorProtocol + func next(for requestId: String) async throws -> T { + try await _next(for: requestId) + } + func makeAsyncIterator() -> Pool { self } @@ -679,11 +745,17 @@ internal struct LambdaHTTPServer { switch self.cause { case .nextCalledTwice: return "Concurrent invocations to next(). This is not allowed." + case .duplicateRequestIdWait(let requestId): + return "Already waiting for requestId: \(requestId)" + case .mixedWaitingModes: + return "Cannot mix FIFO waiting (next()) with specific waiting (next(for:))" } } enum Cause { case nextCalledTwice(CheckedContinuation) + case duplicateRequestIdWait(String) + case mixedWaitingModes } } } From a2ce3be9139eb8d5a7270857b4f818d0629905af Mon Sep 17 00:00:00 2001 From: Sebastien Stormacq Date: Thu, 16 Oct 2025 12:25:44 +0200 Subject: [PATCH 15/31] move pool to a separate file --- .../Lambda+LocalServer+Pool.swift | 210 ++++++++++++++++++ .../AWSLambdaRuntime/Lambda+LocalServer.swift | 207 +---------------- 2 files changed, 213 insertions(+), 204 deletions(-) create mode 100644 Sources/AWSLambdaRuntime/Lambda+LocalServer+Pool.swift diff --git a/Sources/AWSLambdaRuntime/Lambda+LocalServer+Pool.swift b/Sources/AWSLambdaRuntime/Lambda+LocalServer+Pool.swift new file mode 100644 index 00000000..0e7814cd --- /dev/null +++ b/Sources/AWSLambdaRuntime/Lambda+LocalServer+Pool.swift @@ -0,0 +1,210 @@ +#if LocalServerSupport +import DequeModule +import Synchronization + +@available(LambdaSwift 2.0, *) +extension LambdaHTTPServer { + + /// A shared data structure to store the current invocation or response requests and the continuation objects. + /// This data structure is shared between instances of the HTTPHandler + /// (one instance to serve requests from the Lambda function and one instance to serve requests from the client invoking the lambda function). + internal final class Pool: AsyncSequence, AsyncIteratorProtocol, Sendable where T: Sendable { + private let poolName: String + internal init(name: String = "Pool") { self.poolName = name } + + typealias Element = T + + enum State: ~Copyable { + case buffer(Deque) + // FIFO waiting (for invocations) + case waitingForAny(CheckedContinuation) + // RequestId-based waiting (for responses) + case waitingForSpecific([String: CheckedContinuation]) + } + + private let lock = Mutex(.buffer([])) + + /// enqueue an element, or give it back immediately to the iterator if it is waiting for an element + public func push(_ item: T) { + let continuationToResume = self.lock.withLock { state -> CheckedContinuation? in + switch consume state { + case .buffer(var buffer): + buffer.append(item) + state = .buffer(buffer) + return nil + + case .waitingForAny(let continuation): + // Someone is waiting for any item (FIFO) + state = .buffer([]) + return continuation + + case .waitingForSpecific(var continuations): + // Check if this item matches any waiting continuation + if let response = item as? LocalServerResponse, + let requestId = response.requestId, + let continuation = continuations.removeValue(forKey: requestId) + { + // Found a matching continuation + if continuations.isEmpty { + state = .buffer([]) + } else { + state = .waitingForSpecific(continuations) + } + return continuation + } else { + // No matching continuation, add to buffer + var buffer = Deque() + buffer.append(item) + state = .buffer(buffer) + return nil + } + } + } + + // Resume continuation outside the lock to prevent potential deadlocks + continuationToResume?.resume(returning: item) + } + + /// Unified next() method that handles both FIFO and requestId-specific waiting + private func _next(for requestId: String?) async throws -> T { + // exit if the task is cancelled + guard !Task.isCancelled else { + throw CancellationError() + } + + return try await withTaskCancellationHandler { + try await withCheckedThrowingContinuation { (continuation: CheckedContinuation) in + let nextAction: Result? = self.lock.withLock { state -> Result? in + switch consume state { + case .buffer(var buffer): + if let requestId = requestId { + // Look for oldest (first) item for this requestId in buffer + if let index = buffer.firstIndex(where: { item in + if let response = item as? LocalServerResponse { + return response.requestId == requestId + } + return false + }) { + let item = buffer.remove(at: index) + state = .buffer(buffer) + return .success(item) + } else { + // No matching item, wait for it + var continuations: [String: CheckedContinuation] = [:] + continuations[requestId] = continuation + state = .waitingForSpecific(continuations) + return nil + } + } else { + // FIFO mode - take first item + if let first = buffer.popFirst() { + state = .buffer(buffer) + return .success(first) + } else { + state = .waitingForAny(continuation) + return nil + } + } + + case .waitingForAny(let previousContinuation): + if requestId == nil { + // Another FIFO call while already waiting + state = .buffer([]) + return .failure(PoolError(cause: .nextCalledTwice(previousContinuation))) + } else { + // Can't mix FIFO and specific waiting + state = .waitingForAny(previousContinuation) + return .failure(PoolError(cause: .mixedWaitingModes)) + } + + case .waitingForSpecific(var continuations): + if let requestId = requestId { + if continuations[requestId] != nil { + // Already waiting for this requestId + state = .waitingForSpecific(continuations) + return .failure(PoolError(cause: .duplicateRequestIdWait(requestId))) + } else { + continuations[requestId] = continuation + state = .waitingForSpecific(continuations) + return nil + } + } else { + // Can't mix FIFO and specific waiting + state = .waitingForSpecific(continuations) + return .failure(PoolError(cause: .mixedWaitingModes)) + } + } + } + + switch nextAction { + case .success(let item): + continuation.resume(returning: item) + case .failure(let error): + if case let .nextCalledTwice(prevContinuation) = error.cause { + prevContinuation.resume(throwing: error) + } + continuation.resume(throwing: error) + case .none: + // do nothing - continuation is stored in state + break + } + } + } onCancel: { + // Ensure we properly handle cancellation by checking if we have a stored continuation + let continuationsToCancel = self.lock.withLock { state -> [String: CheckedContinuation] in + switch consume state { + case .buffer(let buffer): + state = .buffer(buffer) + return [:] + case .waitingForAny(let continuation): + state = .buffer([]) + return ["": continuation] // Use empty string as key for single continuation + case .waitingForSpecific(let continuations): + state = .buffer([]) + return continuations + } + } + + // Resume all continuations outside the lock to avoid potential deadlocks + for continuation in continuationsToCancel.values { + continuation.resume(throwing: CancellationError()) + } + } + } + + /// Simple FIFO next() method - used by AsyncIteratorProtocol + func next() async throws -> T? { + try await _next(for: nil) + } + + /// RequestId-specific next() method for LocalServerResponse - NOT part of AsyncIteratorProtocol + func next(for requestId: String) async throws -> T { + try await _next(for: requestId) + } + + func makeAsyncIterator() -> Pool { + self + } + + struct PoolError: Error { + let cause: Cause + var message: String { + switch self.cause { + case .nextCalledTwice: + return "Concurrent invocations to next(). This is not allowed." + case .duplicateRequestIdWait(let requestId): + return "Already waiting for requestId: \(requestId)" + case .mixedWaitingModes: + return "Cannot mix FIFO waiting (next()) with specific waiting (next(for:))" + } + } + + enum Cause { + case nextCalledTwice(CheckedContinuation) + case duplicateRequestIdWait(String) + case mixedWaitingModes + } + } + } +} +#endif diff --git a/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift b/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift index 0cf6060f..dca1e055 100644 --- a/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift +++ b/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift @@ -14,7 +14,6 @@ #if LocalServerSupport import DequeModule -import Dispatch import Logging import NIOCore import NIOHTTP1 @@ -394,7 +393,7 @@ internal struct LambdaHTTPServer { ) } // we always accept the /invoke request and push them to the pool - let requestId = "\(DispatchTime.now().uptimeNanoseconds)" + let requestId = "\(LambdaClock().now))" logger[metadataKey: "requestId"] = "\(requestId)" logger.trace("/invoke received invocation, pushing it to the pool and wait for a lambda response") @@ -560,207 +559,7 @@ internal struct LambdaHTTPServer { } } - /// A shared data structure to store the current invocation or response requests and the continuation objects. - /// This data structure is shared between instances of the HTTPHandler - /// (one instance to serve requests from the Lambda function and one instance to serve requests from the client invoking the lambda function). - internal final class Pool: AsyncSequence, AsyncIteratorProtocol, Sendable where T: Sendable { - private let poolName: String - internal init(name: String = "Pool") { self.poolName = name } - - typealias Element = T - - enum State: ~Copyable { - case buffer(Deque) - case waitingForAny(CheckedContinuation) // FIFO waiting (for invocations) - case waitingForSpecific([String: CheckedContinuation]) // RequestId-based waiting (for responses) - } - - private let lock = Mutex(.buffer([])) - - /// enqueue an element, or give it back immediately to the iterator if it is waiting for an element - public func push(_ item: T) { - let continuationToResume = self.lock.withLock { state -> CheckedContinuation? in - switch consume state { - case .buffer(var buffer): - buffer.append(item) - state = .buffer(buffer) - return nil - - case .waitingForAny(let continuation): - // Someone is waiting for any item (FIFO) - state = .buffer([]) - return continuation - - case .waitingForSpecific(var continuations): - // Check if this item matches any waiting continuation - if let response = item as? LocalServerResponse, - let requestId = response.requestId, - let continuation = continuations.removeValue(forKey: requestId) - { - // Found a matching continuation - if continuations.isEmpty { - state = .buffer([]) - } else { - state = .waitingForSpecific(continuations) - } - return continuation - } else { - // No matching continuation, add to buffer - var buffer = Deque() - buffer.append(item) - state = .buffer(buffer) - return nil - } - } - } - - // Resume continuation outside the lock to prevent potential deadlocks - continuationToResume?.resume(returning: item) - } - - /// Unified next() method that handles both FIFO and requestId-specific waiting - private func _next(for requestId: String?) async throws -> T { - // exit if the task is cancelled - guard !Task.isCancelled else { - throw CancellationError() - } - - return try await withTaskCancellationHandler { - try await withCheckedThrowingContinuation { (continuation: CheckedContinuation) in - let nextAction: Result? = self.lock.withLock { state -> Result? in - switch consume state { - case .buffer(var buffer): - if let requestId = requestId { - // Look for oldest (first) item for this requestId in buffer - if let index = buffer.firstIndex(where: { item in - if let response = item as? LocalServerResponse { - return response.requestId == requestId - } - return false - }) { - let item = buffer.remove(at: index) - state = .buffer(buffer) - return .success(item) - } else { - // No matching item, wait for it - var continuations: [String: CheckedContinuation] = [:] - continuations[requestId] = continuation - state = .waitingForSpecific(continuations) - return nil - } - } else { - // FIFO mode - take first item - if let first = buffer.popFirst() { - state = .buffer(buffer) - return .success(first) - } else { - state = .waitingForAny(continuation) - return nil - } - } - - case .waitingForAny(let previousContinuation): - if requestId == nil { - // Another FIFO call while already waiting - state = .buffer([]) - return .failure(PoolError(cause: .nextCalledTwice(previousContinuation))) - } else { - // Can't mix FIFO and specific waiting - state = .waitingForAny(previousContinuation) - return .failure(PoolError(cause: .mixedWaitingModes)) - } - - case .waitingForSpecific(var continuations): - if let requestId = requestId { - if continuations[requestId] != nil { - // Already waiting for this requestId - state = .waitingForSpecific(continuations) - return .failure(PoolError(cause: .duplicateRequestIdWait(requestId))) - } else { - continuations[requestId] = continuation - state = .waitingForSpecific(continuations) - return nil - } - } else { - // Can't mix FIFO and specific waiting - state = .waitingForSpecific(continuations) - return .failure(PoolError(cause: .mixedWaitingModes)) - } - } - } - - switch nextAction { - case .success(let item): - continuation.resume(returning: item) - case .failure(let error): - if case let .nextCalledTwice(prevContinuation) = error.cause { - prevContinuation.resume(throwing: error) - } - continuation.resume(throwing: error) - case .none: - // do nothing - continuation is stored in state - break - } - } - } onCancel: { - // Ensure we properly handle cancellation by checking if we have a stored continuation - let continuationsToCancel = self.lock.withLock { state -> [String: CheckedContinuation] in - switch consume state { - case .buffer(let buffer): - state = .buffer(buffer) - return [:] - case .waitingForAny(let continuation): - state = .buffer([]) - return ["": continuation] // Use empty string as key for single continuation - case .waitingForSpecific(let continuations): - state = .buffer([]) - return continuations - } - } - - // Resume all continuations outside the lock to avoid potential deadlocks - for continuation in continuationsToCancel.values { - continuation.resume(throwing: CancellationError()) - } - } - } - - /// Simple FIFO next() method - used by AsyncIteratorProtocol - func next() async throws -> T? { - try await _next(for: nil) - } - - /// RequestId-specific next() method for LocalServerResponse - NOT part of AsyncIteratorProtocol - func next(for requestId: String) async throws -> T { - try await _next(for: requestId) - } - - func makeAsyncIterator() -> Pool { - self - } - - struct PoolError: Error { - let cause: Cause - var message: String { - switch self.cause { - case .nextCalledTwice: - return "Concurrent invocations to next(). This is not allowed." - case .duplicateRequestIdWait(let requestId): - return "Already waiting for requestId: \(requestId)" - case .mixedWaitingModes: - return "Cannot mix FIFO waiting (next()) with specific waiting (next(for:))" - } - } - - enum Cause { - case nextCalledTwice(CheckedContinuation) - case duplicateRequestIdWait(String) - case mixedWaitingModes - } - } - } - - private struct LocalServerResponse: Sendable { + struct LocalServerResponse: Sendable { let requestId: String? let status: HTTPResponseStatus? let headers: HTTPHeaders? @@ -781,7 +580,7 @@ internal struct LambdaHTTPServer { } } - private struct LocalServerInvocation: Sendable { + struct LocalServerInvocation: Sendable { let requestId: String let request: ByteBuffer From 5529caeaa10fb9c642618406a64bc114cd11547e Mon Sep 17 00:00:00 2001 From: Sebastien Stormacq Date: Thu, 16 Oct 2025 12:46:35 +0200 Subject: [PATCH 16/31] add unit test for the pool --- Tests/AWSLambdaRuntimeTests/PoolTests.swift | 326 +++++++++++++++++++- 1 file changed, 325 insertions(+), 1 deletion(-) diff --git a/Tests/AWSLambdaRuntimeTests/PoolTests.swift b/Tests/AWSLambdaRuntimeTests/PoolTests.swift index 1e2fff2e..93149655 100644 --- a/Tests/AWSLambdaRuntimeTests/PoolTests.swift +++ b/Tests/AWSLambdaRuntimeTests/PoolTests.swift @@ -12,6 +12,7 @@ // //===----------------------------------------------------------------------===// +import NIOCore import Testing @testable import AWSLambdaRuntime @@ -165,7 +166,7 @@ struct PoolTests { let pool = LambdaHTTPServer.Pool() // Create two tasks that will both wait for elements to be available - await #expect(throws: LambdaHTTPServer.Pool.PoolError.self) { + let error = await #expect(throws: LambdaHTTPServer.Pool.PoolError.self) { try await withThrowingTaskGroup(of: Void.self) { group in // one of the two task will throw a PoolError @@ -184,6 +185,329 @@ struct PoolTests { try await group.waitForAll() } } + + // Verify it's the correct error cause + if case .nextCalledTwice = error?.cause { + // This is the expected error + } else { + Issue.record("Expected nextCalledTwice error, got: \(String(describing: error?.cause))") + } + } + + // MARK: - Invariant Tests for RequestId-specific functionality + + @Test + @available(LambdaSwift 2.0, *) + func testRequestIdSpecificNext() async throws { + let pool = LambdaHTTPServer.Pool() + + // Push responses with different requestIds + pool.push(LambdaHTTPServer.LocalServerResponse(id: "req1", body: ByteBuffer(string: "data1"))) + pool.push(LambdaHTTPServer.LocalServerResponse(id: "req2", body: ByteBuffer(string: "data2"))) + pool.push(LambdaHTTPServer.LocalServerResponse(id: "req1", body: ByteBuffer(string: "data3"))) + + // Get specific responses + let response1 = try await pool.next(for: "req1") + #expect(response1.requestId == "req1") + #expect(String(buffer: response1.body!) == "data1") + + let response2 = try await pool.next(for: "req2") + #expect(response2.requestId == "req2") + #expect(String(buffer: response2.body!) == "data2") + + let response3 = try await pool.next(for: "req1") + #expect(response3.requestId == "req1") + #expect(String(buffer: response3.body!) == "data3") + } + + @Test + @available(LambdaSwift 2.0, *) + func testStreamingResponsesWithSameRequestId() async throws { + let pool = LambdaHTTPServer.Pool() + let requestId = "streaming-req" + + let chunks = try await withThrowingTaskGroup(of: [String].self) { group in + // Start consumer task + group.addTask { + var chunks: [String] = [] + var isComplete = false + + while !isComplete { + let response = try await pool.next(for: requestId) + if let body = response.body { + chunks.append(String(buffer: body)) + } + if response.final { + isComplete = true + } + } + return chunks + } + + // Start producer task + group.addTask { + // Give consumer time to start waiting + try await Task.sleep(nanoseconds: 10_000_000) // 0.01 seconds + + // Push multiple chunks for the same requestId + pool.push( + LambdaHTTPServer.LocalServerResponse( + id: requestId, + body: ByteBuffer(string: "chunk1"), + final: false + ) + ) + pool.push( + LambdaHTTPServer.LocalServerResponse( + id: requestId, + body: ByteBuffer(string: "chunk2"), + final: false + ) + ) + pool.push( + LambdaHTTPServer.LocalServerResponse(id: requestId, body: ByteBuffer(string: "chunk3"), final: true) + ) + + return [] // Producer doesn't return chunks + } + + // Wait for consumer to complete and return its result + for try await result in group { + if !result.isEmpty { + group.cancelAll() + return result + } + } + return [] + } + + #expect(chunks == ["chunk1", "chunk2", "chunk3"]) + } + + @Test + @available(LambdaSwift 2.0, *) + func testMixedWaitingModesError() async throws { + let pool = LambdaHTTPServer.Pool() + + let error = await #expect(throws: LambdaHTTPServer.Pool.PoolError.self) { + try await withThrowingTaskGroup(of: Void.self) { group in + // Start a FIFO consumer + group.addTask { + for try await _ in pool { + // This should block waiting for any item + } + } + + // Start a requestId-specific consumer after a delay + group.addTask { + // Give FIFO task time to start waiting + try await Task.sleep(nanoseconds: 10_000_000) // 0.01 seconds + + // Try to use requestId-specific next - should fail with mixedWaitingModes + _ = try await pool.next(for: "req1") + } + + // Wait for the first task to complete (which should be the error) + try await group.next() + group.cancelAll() + } + } + + // Verify it's the correct error cause + if case .mixedWaitingModes = error?.cause { + // This is the expected error + } else { + Issue.record("Expected mixedWaitingModes error, got: \(String(describing: error?.cause))") + } + } + + @Test + @available(LambdaSwift 2.0, *) + func testMixedWaitingModesErrorReverse() async throws { + let pool = LambdaHTTPServer.Pool() + + let error = await #expect(throws: LambdaHTTPServer.Pool.PoolError.self) { + try await withThrowingTaskGroup(of: Void.self) { group in + // Start a requestId-specific consumer + group.addTask { + _ = try await pool.next(for: "req1") + } + + // Start a FIFO consumer after a delay + group.addTask { + // Give specific task time to start waiting + try await Task.sleep(nanoseconds: 10_000_000) // 0.01 seconds + + // Try to use FIFO next - should fail with mixedWaitingModes + for try await _ in pool { + break + } + } + + // Wait for the first task to complete (which should be the error) + try await group.next() + group.cancelAll() + } + } + + // Verify it's the correct error cause + if case .mixedWaitingModes = error?.cause { + // This is the expected error + } else { + Issue.record("Expected mixedWaitingModes error, got: \(String(describing: error?.cause))") + } + } + + @Test + @available(LambdaSwift 2.0, *) + func testDuplicateRequestIdWaitError() async throws { + let pool = LambdaHTTPServer.Pool() + + let error = await #expect(throws: LambdaHTTPServer.Pool.PoolError.self) { + try await withThrowingTaskGroup(of: Void.self) { group in + // Start first consumer waiting for specific requestId + group.addTask { + _ = try await pool.next(for: "req1") + } + + // Start second consumer for same requestId after a delay + group.addTask { + // Give first task time to start waiting + try await Task.sleep(nanoseconds: 10_000_000) // 0.01 seconds + + // Try to wait for the same requestId - should fail + _ = try await pool.next(for: "req1") + } + + // Wait for the first task to complete (which should be the error) + try await group.next() + group.cancelAll() + } + } + + // Verify it's the correct error cause and requestId + if case let .duplicateRequestIdWait(requestId) = error?.cause { + #expect(requestId == "req1") + } else { + Issue.record("Expected duplicateRequestIdWait error, got: \(String(describing: error?.cause))") + } + } + + @Test + @available(LambdaSwift 2.0, *) + func testConcurrentRequestIdConsumers() async throws { + let pool = LambdaHTTPServer.Pool() + + let results = try await withThrowingTaskGroup(of: (String, String).self) { group in + // Start multiple consumers for different requestIds + group.addTask { + let response = try await pool.next(for: "req1") + return ("req1", String(buffer: response.body!)) + } + + group.addTask { + let response = try await pool.next(for: "req2") + return ("req2", String(buffer: response.body!)) + } + + group.addTask { + let response = try await pool.next(for: "req3") + return ("req3", String(buffer: response.body!)) + } + + // Start producer task + group.addTask { + // Give tasks time to start waiting + try await Task.sleep(nanoseconds: 10_000_000) // 0.01 seconds + + // Push responses in different order + pool.push(LambdaHTTPServer.LocalServerResponse(id: "req3", body: ByteBuffer(string: "data3"))) + pool.push(LambdaHTTPServer.LocalServerResponse(id: "req1", body: ByteBuffer(string: "data1"))) + pool.push(LambdaHTTPServer.LocalServerResponse(id: "req2", body: ByteBuffer(string: "data2"))) + + return ("producer", "") // Producer doesn't return meaningful data + } + + // Collect results from consumers + var consumerResults: [String: String] = [:] + for try await (requestId, data) in group { + if requestId != "producer" { + consumerResults[requestId] = data + } + if consumerResults.count == 3 { + group.cancelAll() + break + } + } + return consumerResults + } + + // Verify each consumer gets the correct response + #expect(results["req1"] == "data1") + #expect(results["req2"] == "data2") + #expect(results["req3"] == "data3") + } + + @Test + @available(LambdaSwift 2.0, *) + func testCancellationCleansUpAllContinuations() async throws { + let pool = LambdaHTTPServer.Pool() + + // Test that cancellation properly cleans up all continuations + do { + try await withThrowingTaskGroup(of: Void.self) { group in + // Start multiple consumers for different requestIds + group.addTask { + _ = try await pool.next(for: "req1") + } + + group.addTask { + _ = try await pool.next(for: "req2") + } + + group.addTask { + _ = try await pool.next(for: "req3") + } + + // Give tasks time to start waiting then cancel all + try await Task.sleep(nanoseconds: 10_000_000) // 0.01 seconds + group.cancelAll() + + try await group.waitForAll() + } + } catch is CancellationError { + // Expected - tasks should be cancelled + } + + // Pool should be back to clean state - verify by pushing and consuming normally + pool.push(LambdaHTTPServer.LocalServerResponse(id: "new-req", body: ByteBuffer(string: "new-data"))) + let response = try await pool.next(for: "new-req") + #expect(String(buffer: response.body!) == "new-data") + } + + @Test + @available(LambdaSwift 2.0, *) + func testBufferOrderingWithRequestIds() async throws { + let pool = LambdaHTTPServer.Pool() + + // Push multiple responses for the same requestId + pool.push(LambdaHTTPServer.LocalServerResponse(id: "req1", body: ByteBuffer(string: "first"))) + pool.push(LambdaHTTPServer.LocalServerResponse(id: "req2", body: ByteBuffer(string: "other"))) + pool.push(LambdaHTTPServer.LocalServerResponse(id: "req1", body: ByteBuffer(string: "second"))) + pool.push(LambdaHTTPServer.LocalServerResponse(id: "req1", body: ByteBuffer(string: "third"))) + + // Consume in order - should get FIFO order for the same requestId + let first = try await pool.next(for: "req1") + #expect(String(buffer: first.body!) == "first") + + let second = try await pool.next(for: "req1") + #expect(String(buffer: second.body!) == "second") + + let other = try await pool.next(for: "req2") + #expect(String(buffer: other.body!) == "other") + + let third = try await pool.next(for: "req1") + #expect(String(buffer: third.body!) == "third") } } From 9abbe8e1041f33c80b0073b27364477206f2cde0 Mon Sep 17 00:00:00 2001 From: Sebastien Stormacq Date: Thu, 16 Oct 2025 12:53:48 +0200 Subject: [PATCH 17/31] remove forced trace level for logging --- Sources/AWSLambdaRuntime/Lambda+LocalServer.swift | 4 ---- 1 file changed, 4 deletions(-) diff --git a/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift b/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift index dca1e055..7e462d95 100644 --- a/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift +++ b/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift @@ -125,10 +125,6 @@ internal struct LambdaHTTPServer { _ closure: sending @escaping () async throws -> Result ) async throws -> Result { - var l = Logger(label: "HTTPServer") - l.logLevel = .trace - let logger = l - let channel = try await ServerBootstrap(group: eventLoopGroup) .serverChannelOption(.backlog, value: 256) .serverChannelOption(.socketOption(.so_reuseaddr), value: 1) From 6311b4678a35e97e76235f81318d236286f63bbf Mon Sep 17 00:00:00 2001 From: Sebastien Stormacq Date: Thu, 16 Oct 2025 13:04:03 +0200 Subject: [PATCH 18/31] add license header --- .../AWSLambdaRuntime/Lambda+LocalServer+Pool.swift | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/Sources/AWSLambdaRuntime/Lambda+LocalServer+Pool.swift b/Sources/AWSLambdaRuntime/Lambda+LocalServer+Pool.swift index 0e7814cd..cb3646ca 100644 --- a/Sources/AWSLambdaRuntime/Lambda+LocalServer+Pool.swift +++ b/Sources/AWSLambdaRuntime/Lambda+LocalServer+Pool.swift @@ -1,3 +1,17 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftAWSLambdaRuntime open source project +// +// Copyright (c) 2025 Apple Inc. and the SwiftAWSLambdaRuntime project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + #if LocalServerSupport import DequeModule import Synchronization From 18e09c638cc08453007080779bcfe686d07d046a Mon Sep 17 00:00:00 2001 From: Sebastien Stormacq Date: Thu, 16 Oct 2025 13:20:33 +0200 Subject: [PATCH 19/31] skip some tests on Swift 6.0 --- Tests/AWSLambdaRuntimeTests/PoolTests.swift | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/Tests/AWSLambdaRuntimeTests/PoolTests.swift b/Tests/AWSLambdaRuntimeTests/PoolTests.swift index 93149655..da66a79e 100644 --- a/Tests/AWSLambdaRuntimeTests/PoolTests.swift +++ b/Tests/AWSLambdaRuntimeTests/PoolTests.swift @@ -12,6 +12,8 @@ // //===----------------------------------------------------------------------===// +#if LocalServerSupport + import NIOCore import Testing @@ -160,6 +162,10 @@ struct PoolTests { #expect(Set(receivedValues).count == producerCount * messagesPerProducer) } +// in Swift 6.0, the error returned by #expect(throwing:) macro is a tuple () +// I decided to skip these tests on Swift 6.0 +#if swift(>=6.1) + @Test @available(LambdaSwift 2.0, *) func testConcurrentNext() async throws { @@ -509,5 +515,7 @@ struct PoolTests { let third = try await pool.next(for: "req1") #expect(String(buffer: third.body!) == "third") } +#endif //swift >= 6.1 } +#endif // trait From c07555a97cfbc8701875cd60991c8adf8ccf0f1c Mon Sep 17 00:00:00 2001 From: Sebastien Stormacq Date: Thu, 16 Oct 2025 13:21:22 +0200 Subject: [PATCH 20/31] swift-format --- Tests/AWSLambdaRuntimeTests/PoolTests.swift | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/Tests/AWSLambdaRuntimeTests/PoolTests.swift b/Tests/AWSLambdaRuntimeTests/PoolTests.swift index da66a79e..0ce34044 100644 --- a/Tests/AWSLambdaRuntimeTests/PoolTests.swift +++ b/Tests/AWSLambdaRuntimeTests/PoolTests.swift @@ -162,9 +162,9 @@ struct PoolTests { #expect(Set(receivedValues).count == producerCount * messagesPerProducer) } -// in Swift 6.0, the error returned by #expect(throwing:) macro is a tuple () -// I decided to skip these tests on Swift 6.0 -#if swift(>=6.1) + // in Swift 6.0, the error returned by #expect(throwing:) macro is a tuple () + // I decided to skip these tests on Swift 6.0 + #if swift(>=6.1) @Test @available(LambdaSwift 2.0, *) @@ -515,7 +515,7 @@ struct PoolTests { let third = try await pool.next(for: "req1") #expect(String(buffer: third.body!) == "third") } -#endif //swift >= 6.1 + #endif //swift >= 6.1 } -#endif // trait +#endif // trait From bff7507a64f03f5f5b64e82e04fb211f94ddea8d Mon Sep 17 00:00:00 2001 From: Sebastien Stormacq Date: Thu, 16 Oct 2025 14:21:50 +0200 Subject: [PATCH 21/31] replace a do {} catch {} with #expect(throwing:) --- Tests/AWSLambdaRuntimeTests/PoolTests.swift | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/Tests/AWSLambdaRuntimeTests/PoolTests.swift b/Tests/AWSLambdaRuntimeTests/PoolTests.swift index 0ce34044..257521ef 100644 --- a/Tests/AWSLambdaRuntimeTests/PoolTests.swift +++ b/Tests/AWSLambdaRuntimeTests/PoolTests.swift @@ -460,7 +460,7 @@ struct PoolTests { let pool = LambdaHTTPServer.Pool() // Test that cancellation properly cleans up all continuations - do { + await #expect(throws: CancellationError.self) { try await withThrowingTaskGroup(of: Void.self) { group in // Start multiple consumers for different requestIds group.addTask { @@ -481,9 +481,7 @@ struct PoolTests { try await group.waitForAll() } - } catch is CancellationError { - // Expected - tasks should be cancelled - } + } // Pool should be back to clean state - verify by pushing and consuming normally pool.push(LambdaHTTPServer.LocalServerResponse(id: "new-req", body: ByteBuffer(string: "new-data"))) From 86591a74105b916a300cf56530ae1e6085a7061a Mon Sep 17 00:00:00 2001 From: Sebastien Stormacq Date: Thu, 16 Oct 2025 14:30:55 +0200 Subject: [PATCH 22/31] swift-format --- Tests/AWSLambdaRuntimeTests/PoolTests.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Tests/AWSLambdaRuntimeTests/PoolTests.swift b/Tests/AWSLambdaRuntimeTests/PoolTests.swift index 257521ef..9d5b34ed 100644 --- a/Tests/AWSLambdaRuntimeTests/PoolTests.swift +++ b/Tests/AWSLambdaRuntimeTests/PoolTests.swift @@ -481,7 +481,7 @@ struct PoolTests { try await group.waitForAll() } - } + } // Pool should be back to clean state - verify by pushing and consuming normally pool.push(LambdaHTTPServer.LocalServerResponse(id: "new-req", body: ByteBuffer(string: "new-data"))) From df989af0105d4c570ffb2159acdddab24726d742 Mon Sep 17 00:00:00 2001 From: Sebastien Stormacq Date: Thu, 16 Oct 2025 17:41:56 +0200 Subject: [PATCH 23/31] Generate time independent requestID + add a test for rapid fire --- .../AWSLambdaRuntime/Lambda+LocalServer.swift | 13 ++- .../LambdaLocalServerTests.swift | 88 +++++++++++++++++++ 2 files changed, 98 insertions(+), 3 deletions(-) diff --git a/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift b/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift index 7e462d95..72c705b2 100644 --- a/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift +++ b/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift @@ -20,6 +20,13 @@ import NIOHTTP1 import NIOPosix import Synchronization +// for UUID +#if canImport(FoundationEssentials) +import FoundationEssentials +#else +import Foundation +#endif + // This functionality is designed for local testing when the LocalServerSupport trait is enabled. // For example: @@ -389,7 +396,7 @@ internal struct LambdaHTTPServer { ) } // we always accept the /invoke request and push them to the pool - let requestId = "\(LambdaClock().now))" + let requestId = "\(UUID().uuidString))" logger[metadataKey: "requestId"] = "\(requestId)" logger.trace("/invoke received invocation, pushing it to the pool and wait for a lambda response") @@ -412,7 +419,7 @@ internal struct LambdaHTTPServer { isComplete = true } } - } catch is LambdaHTTPServer.Pool.PoolError { + } catch let error as LambdaHTTPServer.Pool.PoolError { logger.trace("PoolError catched") // detect concurrent invocations of POST and gently decline the requests while we're processing one. let response = LocalServerResponse( @@ -420,7 +427,7 @@ internal struct LambdaHTTPServer { status: .badRequest, body: ByteBuffer( string: - "It is not allowed to invoke multiple Lambda function executions in parallel. (The Lambda runtime environment on AWS will never do that)" + "\(error): It is not allowed to invoke multiple Lambda function executions in parallel. (The Lambda runtime environment on AWS will never do that)" ) ) try await self.sendResponse(response, outbound: outbound, logger: logger) diff --git a/Tests/AWSLambdaRuntimeTests/LambdaLocalServerTests.swift b/Tests/AWSLambdaRuntimeTests/LambdaLocalServerTests.swift index 1bcf0033..0f9a55e2 100644 --- a/Tests/AWSLambdaRuntimeTests/LambdaLocalServerTests.swift +++ b/Tests/AWSLambdaRuntimeTests/LambdaLocalServerTests.swift @@ -19,6 +19,12 @@ import Testing @testable import AWSLambdaRuntime +#if canImport(FoundationEssentials) +import FoundationEssentials +#else +import Foundation +#endif + extension LambdaRuntimeTests { @Test("Local server respects LOCAL_LAMBDA_PORT environment variable") @@ -77,6 +83,88 @@ extension LambdaRuntimeTests { #expect(result == true) } + @Test("Local server handles rapid concurrent requests without HTTP 400 errors") + @available(LambdaSwift 2.0, *) + func testRapidConcurrentRequests() async throws { + let customPort = 8081 + + // Set environment variable + setenv("LOCAL_LAMBDA_PORT", "\(customPort)", 1) + defer { unsetenv("LOCAL_LAMBDA_PORT") } + + let results = try await withThrowingTaskGroup(of: [Int].self) { group in + + // Start the Lambda runtime with local server + group.addTask { + let runtime = LambdaRuntime { (event: String, context: LambdaContext) in + try await Task.sleep(for: .milliseconds(100)) + return "Hello \(event)" + } + + // Start runtime (this will block until cancelled) + try await runtime._run() + return [] + } + + // Start HTTP client to make rapid requests + group.addTask { + // Give server time to start + try await Task.sleep(for: .milliseconds(200)) + + // Make 10 rapid concurrent POST requests to /invoke + return try await withThrowingTaskGroup(of: Int.self) { clientGroup in + var statuses: [Int] = [] + + for i in 0..<10 { + try await Task.sleep(for: .milliseconds(0)) + clientGroup.addTask { + let (_, response) = try await self.makeInvokeRequest( + host: "127.0.0.1", + port: customPort, + payload: "\"World\(i)\"" + ) + return response.statusCode + } + } + + for try await status in clientGroup { + statuses.append(status) + } + + return statuses + } + } + + // Get the first result (HTTP statuses) and cancel the runtime + let first = try await group.next() + group.cancelAll() + return first ?? [] + } + + // Verify all requests returned 200 OK (no HTTP 400 errors) + #expect(results.count == 10, "Expected 10 responses") + for (index, status) in results.enumerated() { + #expect(status == 202, "Request \(index) returned \(status), expected 202 OK") + } + } + + private func makeInvokeRequest(host: String, port: Int, payload: String) async throws -> (Data, HTTPURLResponse) { + let url = URL(string: "http://\(host):\(port)/invoke")! + var request = URLRequest(url: url) + request.httpMethod = "POST" + request.setValue("application/json", forHTTPHeaderField: "Content-Type") + request.httpBody = payload.data(using: .utf8) + request.timeoutInterval = 10.0 + + let (data, response) = try await URLSession.shared.data(for: request) + + guard let httpResponse = response as? HTTPURLResponse else { + throw URLError(.badServerResponse) + } + + return (data, httpResponse) + } + private func isPortResponding(host: String, port: Int) async throws -> Bool { let group = MultiThreadedEventLoopGroup(numberOfThreads: 1) From 585a41466d4db92353a0e16cbbafe861248e833e Mon Sep 17 00:00:00 2001 From: Sebastien Stormacq Date: Thu, 16 Oct 2025 17:50:37 +0200 Subject: [PATCH 24/31] fix test on Linux --- .../LambdaLocalServerTests.swift | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/Tests/AWSLambdaRuntimeTests/LambdaLocalServerTests.swift b/Tests/AWSLambdaRuntimeTests/LambdaLocalServerTests.swift index 0f9a55e2..9043d1a6 100644 --- a/Tests/AWSLambdaRuntimeTests/LambdaLocalServerTests.swift +++ b/Tests/AWSLambdaRuntimeTests/LambdaLocalServerTests.swift @@ -25,6 +25,12 @@ import FoundationEssentials import Foundation #endif +#if canImport(FoundationNetworking) +import FoundationNetworking +#else +import Foundation +#endif + extension LambdaRuntimeTests { @Test("Local server respects LOCAL_LAMBDA_PORT environment variable") @@ -159,7 +165,11 @@ extension LambdaRuntimeTests { let (data, response) = try await URLSession.shared.data(for: request) guard let httpResponse = response as? HTTPURLResponse else { - throw URLError(.badServerResponse) + // Create a custom error since URLError might not be available on Linux + struct HTTPError: Error { + let message: String + } + throw HTTPError(message: "Bad server response") } return (data, httpResponse) From 88454674aae6053beb66f361805ef7a38c9464ed Mon Sep 17 00:00:00 2001 From: Sebastien Stormacq Date: Thu, 16 Oct 2025 18:01:22 +0200 Subject: [PATCH 25/31] Also check that the test responses are in the correct order --- .../LambdaLocalServerTests.swift | 53 ++++++++++++++----- 1 file changed, 41 insertions(+), 12 deletions(-) diff --git a/Tests/AWSLambdaRuntimeTests/LambdaLocalServerTests.swift b/Tests/AWSLambdaRuntimeTests/LambdaLocalServerTests.swift index 9043d1a6..0079d372 100644 --- a/Tests/AWSLambdaRuntimeTests/LambdaLocalServerTests.swift +++ b/Tests/AWSLambdaRuntimeTests/LambdaLocalServerTests.swift @@ -98,7 +98,13 @@ extension LambdaRuntimeTests { setenv("LOCAL_LAMBDA_PORT", "\(customPort)", 1) defer { unsetenv("LOCAL_LAMBDA_PORT") } - let results = try await withThrowingTaskGroup(of: [Int].self) { group in + struct RequestResult { + let requestIndex: Int + let statusCode: Int + let responseBody: String + } + + let results = try await withThrowingTaskGroup(of: [RequestResult].self) { group in // Start the Lambda runtime with local server group.addTask { @@ -118,39 +124,62 @@ extension LambdaRuntimeTests { try await Task.sleep(for: .milliseconds(200)) // Make 10 rapid concurrent POST requests to /invoke - return try await withThrowingTaskGroup(of: Int.self) { clientGroup in - var statuses: [Int] = [] + return try await withThrowingTaskGroup(of: RequestResult.self) { clientGroup in + var requestResults: [RequestResult] = [] for i in 0..<10 { try await Task.sleep(for: .milliseconds(0)) clientGroup.addTask { - let (_, response) = try await self.makeInvokeRequest( + let (data, response) = try await self.makeInvokeRequest( host: "127.0.0.1", port: customPort, payload: "\"World\(i)\"" ) - return response.statusCode + let responseBody = String(data: data, encoding: .utf8) ?? "" + return RequestResult( + requestIndex: i, + statusCode: response.statusCode, + responseBody: responseBody + ) } } - for try await status in clientGroup { - statuses.append(status) + for try await result in clientGroup { + requestResults.append(result) } - return statuses + return requestResults } } - // Get the first result (HTTP statuses) and cancel the runtime + // Get the first result (request results) and cancel the runtime let first = try await group.next() group.cancelAll() return first ?? [] } - // Verify all requests returned 200 OK (no HTTP 400 errors) + // Verify all requests returned 202 OK (no HTTP 400 errors) #expect(results.count == 10, "Expected 10 responses") - for (index, status) in results.enumerated() { - #expect(status == 202, "Request \(index) returned \(status), expected 202 OK") + for result in results { + #expect( + result.statusCode == 202, + "Request \(result.requestIndex) returned \(result.statusCode), expected 202 OK" + ) + } + + // Verify that each request was processed correctly by checking response content + // Sort results by request index to verify proper execution order + let sortedResults = results.sorted { $0.requestIndex < $1.requestIndex } + for (index, result) in sortedResults.enumerated() { + let expectedResponse = "\"Hello World\(index)\"" + #expect( + result.responseBody == expectedResponse, + "Request \(index) response was '\(result.responseBody)', expected '\(expectedResponse)'" + ) + #expect( + result.requestIndex == index, + "Request order mismatch: got index \(result.requestIndex), expected \(index)" + ) } } From 613879033ba8cf6f766c66625c7bdb836aabb4e8 Mon Sep 17 00:00:00 2001 From: Sebastien Stormacq Date: Thu, 16 Oct 2025 18:17:24 +0200 Subject: [PATCH 26/31] simplify the test #expect --- .../LambdaLocalServerTests.swift | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/Tests/AWSLambdaRuntimeTests/LambdaLocalServerTests.swift b/Tests/AWSLambdaRuntimeTests/LambdaLocalServerTests.swift index 0079d372..9787aa2e 100644 --- a/Tests/AWSLambdaRuntimeTests/LambdaLocalServerTests.swift +++ b/Tests/AWSLambdaRuntimeTests/LambdaLocalServerTests.swift @@ -158,14 +158,7 @@ extension LambdaRuntimeTests { return first ?? [] } - // Verify all requests returned 202 OK (no HTTP 400 errors) #expect(results.count == 10, "Expected 10 responses") - for result in results { - #expect( - result.statusCode == 202, - "Request \(result.requestIndex) returned \(result.statusCode), expected 202 OK" - ) - } // Verify that each request was processed correctly by checking response content // Sort results by request index to verify proper execution order @@ -180,6 +173,10 @@ extension LambdaRuntimeTests { result.requestIndex == index, "Request order mismatch: got index \(result.requestIndex), expected \(index)" ) + #expect( + result.statusCode == 202, + "Request \(result.requestIndex) returned \(result.statusCode), expected 202 OK" + ) } } From 669ad43f347ba0f7e8f9090eb2496f61c57a5c83 Mon Sep 17 00:00:00 2001 From: Sebastien Stormacq Date: Thu, 16 Oct 2025 18:24:01 +0200 Subject: [PATCH 27/31] use String(decoding:) instead of String(data:) --- Tests/AWSLambdaRuntimeTests/LambdaLocalServerTests.swift | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/Tests/AWSLambdaRuntimeTests/LambdaLocalServerTests.swift b/Tests/AWSLambdaRuntimeTests/LambdaLocalServerTests.swift index 9787aa2e..7fbabd5f 100644 --- a/Tests/AWSLambdaRuntimeTests/LambdaLocalServerTests.swift +++ b/Tests/AWSLambdaRuntimeTests/LambdaLocalServerTests.swift @@ -135,11 +135,10 @@ extension LambdaRuntimeTests { port: customPort, payload: "\"World\(i)\"" ) - let responseBody = String(data: data, encoding: .utf8) ?? "" return RequestResult( requestIndex: i, statusCode: response.statusCode, - responseBody: responseBody + responseBody: String(decoding: data, as: UTF8.self) ) } } From 9c85560c45ead7ade9cfcecdead7b0d04ac96931 Mon Sep 17 00:00:00 2001 From: Sebastien Stormacq Date: Thu, 16 Oct 2025 20:14:06 +0200 Subject: [PATCH 28/31] State : use a struct instead of enum --- .../Lambda+LocalServer+Pool.swift | 168 +++++++----------- 1 file changed, 68 insertions(+), 100 deletions(-) diff --git a/Sources/AWSLambdaRuntime/Lambda+LocalServer+Pool.swift b/Sources/AWSLambdaRuntime/Lambda+LocalServer+Pool.swift index cb3646ca..f7cf99e2 100644 --- a/Sources/AWSLambdaRuntime/Lambda+LocalServer+Pool.swift +++ b/Sources/AWSLambdaRuntime/Lambda+LocalServer+Pool.swift @@ -28,51 +28,36 @@ extension LambdaHTTPServer { typealias Element = T - enum State: ~Copyable { - case buffer(Deque) - // FIFO waiting (for invocations) - case waitingForAny(CheckedContinuation) - // RequestId-based waiting (for responses) - case waitingForSpecific([String: CheckedContinuation]) + struct State { + var buffer: Deque = [] + var waitingForAny: CheckedContinuation? + var waitingForSpecific: [String: CheckedContinuation] = [:] } - private let lock = Mutex(.buffer([])) + private let lock = Mutex(State()) /// enqueue an element, or give it back immediately to the iterator if it is waiting for an element public func push(_ item: T) { let continuationToResume = self.lock.withLock { state -> CheckedContinuation? in - switch consume state { - case .buffer(var buffer): - buffer.append(item) - state = .buffer(buffer) - return nil - - case .waitingForAny(let continuation): - // Someone is waiting for any item (FIFO) - state = .buffer([]) + // First check if there's a waiting continuation that can handle this item + + // Check for FIFO waiter first + if let continuation = state.waitingForAny { + state.waitingForAny = nil return continuation + } - case .waitingForSpecific(var continuations): - // Check if this item matches any waiting continuation - if let response = item as? LocalServerResponse, - let requestId = response.requestId, - let continuation = continuations.removeValue(forKey: requestId) - { - // Found a matching continuation - if continuations.isEmpty { - state = .buffer([]) - } else { - state = .waitingForSpecific(continuations) - } - return continuation - } else { - // No matching continuation, add to buffer - var buffer = Deque() - buffer.append(item) - state = .buffer(buffer) - return nil - } + // Check for specific waiter + if let response = item as? LocalServerResponse, + let requestId = response.requestId, + let continuation = state.waitingForSpecific.removeValue(forKey: requestId) + { + return continuation } + + // No waiting continuation, add to buffer + state.buffer.append(item) + return nil } // Resume continuation outside the lock to prevent potential deadlocks @@ -89,63 +74,44 @@ extension LambdaHTTPServer { return try await withTaskCancellationHandler { try await withCheckedThrowingContinuation { (continuation: CheckedContinuation) in let nextAction: Result? = self.lock.withLock { state -> Result? in - switch consume state { - case .buffer(var buffer): - if let requestId = requestId { - // Look for oldest (first) item for this requestId in buffer - if let index = buffer.firstIndex(where: { item in - if let response = item as? LocalServerResponse { - return response.requestId == requestId - } - return false - }) { - let item = buffer.remove(at: index) - state = .buffer(buffer) - return .success(item) - } else { - // No matching item, wait for it - var continuations: [String: CheckedContinuation] = [:] - continuations[requestId] = continuation - state = .waitingForSpecific(continuations) - return nil + if let requestId = requestId { + // Look for oldest (first) item for this requestId in buffer + if let index = state.buffer.firstIndex(where: { item in + if let response = item as? LocalServerResponse { + return response.requestId == requestId } + return false + }) { + let item = state.buffer.remove(at: index) + return .success(item) } else { - // FIFO mode - take first item - if let first = buffer.popFirst() { - state = .buffer(buffer) - return .success(first) - } else { - state = .waitingForAny(continuation) - return nil + // Check for conflicting waiters + if state.waitingForAny != nil { + return .failure(PoolError(cause: .mixedWaitingModes)) } - } - - case .waitingForAny(let previousContinuation): - if requestId == nil { - // Another FIFO call while already waiting - state = .buffer([]) - return .failure(PoolError(cause: .nextCalledTwice(previousContinuation))) - } else { - // Can't mix FIFO and specific waiting - state = .waitingForAny(previousContinuation) - return .failure(PoolError(cause: .mixedWaitingModes)) - } - - case .waitingForSpecific(var continuations): - if let requestId = requestId { - if continuations[requestId] != nil { - // Already waiting for this requestId - state = .waitingForSpecific(continuations) + if state.waitingForSpecific[requestId] != nil { return .failure(PoolError(cause: .duplicateRequestIdWait(requestId))) - } else { - continuations[requestId] = continuation - state = .waitingForSpecific(continuations) - return nil } + + // No matching item, wait for it + state.waitingForSpecific[requestId] = continuation + return nil + } + } else { + // FIFO mode - take first item + if let first = state.buffer.popFirst() { + return .success(first) } else { - // Can't mix FIFO and specific waiting - state = .waitingForSpecific(continuations) - return .failure(PoolError(cause: .mixedWaitingModes)) + // Check for conflicting waiters + if !state.waitingForSpecific.isEmpty { + return .failure(PoolError(cause: .mixedWaitingModes)) + } + if state.waitingForAny != nil { + return .failure(PoolError(cause: .nextCalledTwice(state.waitingForAny!))) + } + + state.waitingForAny = continuation + return nil } } } @@ -164,23 +130,25 @@ extension LambdaHTTPServer { } } } onCancel: { - // Ensure we properly handle cancellation by checking if we have a stored continuation - let continuationsToCancel = self.lock.withLock { state -> [String: CheckedContinuation] in - switch consume state { - case .buffer(let buffer): - state = .buffer(buffer) - return [:] - case .waitingForAny(let continuation): - state = .buffer([]) - return ["": continuation] // Use empty string as key for single continuation - case .waitingForSpecific(let continuations): - state = .buffer([]) - return continuations + // Ensure we properly handle cancellation by removing stored continuation + let continuationsToCancel = self.lock.withLock { state -> [CheckedContinuation] in + var toCancel: [CheckedContinuation] = [] + + if let continuation = state.waitingForAny { + toCancel.append(continuation) + state.waitingForAny = nil } + + for continuation in state.waitingForSpecific.values { + toCancel.append(continuation) + } + state.waitingForSpecific.removeAll() + + return toCancel } // Resume all continuations outside the lock to avoid potential deadlocks - for continuation in continuationsToCancel.values { + for continuation in continuationsToCancel { continuation.resume(throwing: CancellationError()) } } From 54744c55d62d8de813fb187470dadfb0a669f367 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=A9bastien=20Stormacq?= Date: Thu, 16 Oct 2025 20:21:38 +0200 Subject: [PATCH 29/31] Update Sources/AWSLambdaRuntime/Lambda+LocalServer.swift Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- Sources/AWSLambdaRuntime/Lambda+LocalServer.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift b/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift index 72c705b2..902b51f6 100644 --- a/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift +++ b/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift @@ -396,7 +396,7 @@ internal struct LambdaHTTPServer { ) } // we always accept the /invoke request and push them to the pool - let requestId = "\(UUID().uuidString))" + let requestId = UUID().uuidString logger[metadataKey: "requestId"] = "\(requestId)" logger.trace("/invoke received invocation, pushing it to the pool and wait for a lambda response") From 503f2dae1f3c5e684de80d4443eb246ed321c7f8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=A9bastien=20Stormacq?= Date: Thu, 16 Oct 2025 20:21:49 +0200 Subject: [PATCH 30/31] Update Sources/AWSLambdaRuntime/Lambda+LocalServer.swift Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- Sources/AWSLambdaRuntime/Lambda+LocalServer.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift b/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift index 902b51f6..0aab679a 100644 --- a/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift +++ b/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift @@ -420,7 +420,7 @@ internal struct LambdaHTTPServer { } } } catch let error as LambdaHTTPServer.Pool.PoolError { - logger.trace("PoolError catched") + logger.trace("PoolError caught") // detect concurrent invocations of POST and gently decline the requests while we're processing one. let response = LocalServerResponse( id: requestId, From ab77e5a8bb47d2b0e6871e620c7345e98c12cd0b Mon Sep 17 00:00:00 2001 From: Sebastien Stormacq Date: Fri, 17 Oct 2025 12:35:08 +0200 Subject: [PATCH 31/31] return an HTTP 5xx error in case we mess up something in the Pool --- Sources/AWSLambdaRuntime/Lambda+LocalServer.swift | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift b/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift index 4d17ed55..81bc91be 100644 --- a/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift +++ b/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift @@ -428,11 +428,11 @@ internal struct LambdaHTTPServer { } } } catch let error as LambdaHTTPServer.Pool.PoolError { - logger.trace("PoolError caught") + logger.trace("PoolError caught", metadata: ["error": "\(error)"]) // detect concurrent invocations of POST and gently decline the requests while we're processing one. let response = LocalServerResponse( id: requestId, - status: .badRequest, + status: .internalServerError, body: ByteBuffer( string: "\(error): It is not allowed to invoke multiple Lambda function executions in parallel. (The Lambda runtime environment on AWS will never do that)"