diff --git a/Examples/Streaming/README.md b/Examples/Streaming/README.md index 2c40df57..663ac14c 100644 --- a/Examples/Streaming/README.md +++ b/Examples/Streaming/README.md @@ -82,7 +82,7 @@ You can test the function locally before deploying: swift run # In another terminal, test with curl: -curl -v \ +curl -v --output response.txt \ --header "Content-Type: application/json" \ --data '"this is not used"' \ http://127.0.0.1:7000/invoke diff --git a/Sources/AWSLambdaRuntime/Lambda+LocalServer+Pool.swift b/Sources/AWSLambdaRuntime/Lambda+LocalServer+Pool.swift new file mode 100644 index 00000000..f7cf99e2 --- /dev/null +++ b/Sources/AWSLambdaRuntime/Lambda+LocalServer+Pool.swift @@ -0,0 +1,192 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the SwiftAWSLambdaRuntime open source project +// +// Copyright (c) 2025 Apple Inc. and the SwiftAWSLambdaRuntime project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +#if LocalServerSupport +import DequeModule +import Synchronization + +@available(LambdaSwift 2.0, *) +extension LambdaHTTPServer { + + /// A shared data structure to store the current invocation or response requests and the continuation objects. + /// This data structure is shared between instances of the HTTPHandler + /// (one instance to serve requests from the Lambda function and one instance to serve requests from the client invoking the lambda function). + internal final class Pool: AsyncSequence, AsyncIteratorProtocol, Sendable where T: Sendable { + private let poolName: String + internal init(name: String = "Pool") { self.poolName = name } + + typealias Element = T + + struct State { + var buffer: Deque = [] + var waitingForAny: CheckedContinuation? + var waitingForSpecific: [String: CheckedContinuation] = [:] + } + + private let lock = Mutex(State()) + + /// enqueue an element, or give it back immediately to the iterator if it is waiting for an element + public func push(_ item: T) { + let continuationToResume = self.lock.withLock { state -> CheckedContinuation? in + // First check if there's a waiting continuation that can handle this item + + // Check for FIFO waiter first + if let continuation = state.waitingForAny { + state.waitingForAny = nil + return continuation + } + + // Check for specific waiter + if let response = item as? LocalServerResponse, + let requestId = response.requestId, + let continuation = state.waitingForSpecific.removeValue(forKey: requestId) + { + return continuation + } + + // No waiting continuation, add to buffer + state.buffer.append(item) + return nil + } + + // Resume continuation outside the lock to prevent potential deadlocks + continuationToResume?.resume(returning: item) + } + + /// Unified next() method that handles both FIFO and requestId-specific waiting + private func _next(for requestId: String?) async throws -> T { + // exit if the task is cancelled + guard !Task.isCancelled else { + throw CancellationError() + } + + return try await withTaskCancellationHandler { + try await withCheckedThrowingContinuation { (continuation: CheckedContinuation) in + let nextAction: Result? = self.lock.withLock { state -> Result? in + if let requestId = requestId { + // Look for oldest (first) item for this requestId in buffer + if let index = state.buffer.firstIndex(where: { item in + if let response = item as? LocalServerResponse { + return response.requestId == requestId + } + return false + }) { + let item = state.buffer.remove(at: index) + return .success(item) + } else { + // Check for conflicting waiters + if state.waitingForAny != nil { + return .failure(PoolError(cause: .mixedWaitingModes)) + } + if state.waitingForSpecific[requestId] != nil { + return .failure(PoolError(cause: .duplicateRequestIdWait(requestId))) + } + + // No matching item, wait for it + state.waitingForSpecific[requestId] = continuation + return nil + } + } else { + // FIFO mode - take first item + if let first = state.buffer.popFirst() { + return .success(first) + } else { + // Check for conflicting waiters + if !state.waitingForSpecific.isEmpty { + return .failure(PoolError(cause: .mixedWaitingModes)) + } + if state.waitingForAny != nil { + return .failure(PoolError(cause: .nextCalledTwice(state.waitingForAny!))) + } + + state.waitingForAny = continuation + return nil + } + } + } + + switch nextAction { + case .success(let item): + continuation.resume(returning: item) + case .failure(let error): + if case let .nextCalledTwice(prevContinuation) = error.cause { + prevContinuation.resume(throwing: error) + } + continuation.resume(throwing: error) + case .none: + // do nothing - continuation is stored in state + break + } + } + } onCancel: { + // Ensure we properly handle cancellation by removing stored continuation + let continuationsToCancel = self.lock.withLock { state -> [CheckedContinuation] in + var toCancel: [CheckedContinuation] = [] + + if let continuation = state.waitingForAny { + toCancel.append(continuation) + state.waitingForAny = nil + } + + for continuation in state.waitingForSpecific.values { + toCancel.append(continuation) + } + state.waitingForSpecific.removeAll() + + return toCancel + } + + // Resume all continuations outside the lock to avoid potential deadlocks + for continuation in continuationsToCancel { + continuation.resume(throwing: CancellationError()) + } + } + } + + /// Simple FIFO next() method - used by AsyncIteratorProtocol + func next() async throws -> T? { + try await _next(for: nil) + } + + /// RequestId-specific next() method for LocalServerResponse - NOT part of AsyncIteratorProtocol + func next(for requestId: String) async throws -> T { + try await _next(for: requestId) + } + + func makeAsyncIterator() -> Pool { + self + } + + struct PoolError: Error { + let cause: Cause + var message: String { + switch self.cause { + case .nextCalledTwice: + return "Concurrent invocations to next(). This is not allowed." + case .duplicateRequestIdWait(let requestId): + return "Already waiting for requestId: \(requestId)" + case .mixedWaitingModes: + return "Cannot mix FIFO waiting (next()) with specific waiting (next(for:))" + } + } + + enum Cause { + case nextCalledTwice(CheckedContinuation) + case duplicateRequestIdWait(String) + case mixedWaitingModes + } + } + } +} +#endif diff --git a/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift b/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift index 1e924a55..81bc91be 100644 --- a/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift +++ b/Sources/AWSLambdaRuntime/Lambda+LocalServer.swift @@ -14,13 +14,19 @@ #if LocalServerSupport import DequeModule -import Dispatch import Logging import NIOCore import NIOHTTP1 import NIOPosix import Synchronization +// for UUID +#if canImport(FoundationEssentials) +import FoundationEssentials +#else +import Foundation +#endif + // This functionality is designed for local testing when the LocalServerSupport trait is enabled. // For example: @@ -95,8 +101,8 @@ extension Lambda { internal struct LambdaHTTPServer { private let invocationEndpoint: String - private let invocationPool = Pool() - private let responsePool = Pool() + private let invocationPool = Pool(name: "Invocation Pool") + private let responsePool = Pool(name: "Response Pool") private init( invocationEndpoint: String? @@ -125,6 +131,7 @@ internal struct LambdaHTTPServer { logger: Logger, _ closure: sending @escaping () async throws -> Result ) async throws -> Result { + let channel = try await ServerBootstrap(group: eventLoopGroup) .serverChannelOption(.backlog, value: 256) .serverChannelOption(.socketOption(.so_reuseaddr), value: 1) @@ -165,6 +172,8 @@ internal struct LambdaHTTPServer { let closureBox = UnsafeTransferBox(value: closure) let result = await withTaskGroup(of: TaskResult.self, returning: Swift.Result.self) { group in + + // this Task will run the content of the closure we received, typically the Lambda Runtime Client HTTP group.addTask { let c = closureBox.value do { @@ -175,6 +184,7 @@ internal struct LambdaHTTPServer { } } + // this Task will create one subtask to handle each individual connection group.addTask { do { // We are handling each incoming connection in a separate child task. It is important @@ -272,7 +282,7 @@ internal struct LambdaHTTPServer { // for streaming requests, push a partial head response if self.isStreamingResponse(requestHead) { - await self.responsePool.push( + self.responsePool.push( LocalServerResponse( id: requestId, status: .ok @@ -286,7 +296,7 @@ internal struct LambdaHTTPServer { // if this is a request from a Streaming Lambda Handler, // stream the response instead of buffering it if self.isStreamingResponse(requestHead) { - await self.responsePool.push( + self.responsePool.push( LocalServerResponse(id: requestId, body: body) ) } else { @@ -298,7 +308,7 @@ internal struct LambdaHTTPServer { if self.isStreamingResponse(requestHead) { // for streaming response, send the final response - await self.responsePool.push( + self.responsePool.push( LocalServerResponse(id: requestId, final: true) ) @@ -394,36 +404,42 @@ internal struct LambdaHTTPServer { ) } // we always accept the /invoke request and push them to the pool - let requestId = "\(DispatchTime.now().uptimeNanoseconds)" + let requestId = UUID().uuidString logger[metadataKey: "requestId"] = "\(requestId)" + logger.trace("/invoke received invocation, pushing it to the pool and wait for a lambda response") - await self.invocationPool.push(LocalServerInvocation(requestId: requestId, request: body)) + self.invocationPool.push(LocalServerInvocation(requestId: requestId, request: body)) // wait for the lambda function to process the request - for try await response in self.responsePool { - logger[metadataKey: "response requestId"] = "\(response.requestId ?? "nil")" - logger.trace("Received response to return to client") - if response.requestId == requestId { - logger.trace("/invoke requestId is valid, sending the response") - // send the response to the client - // if the response is final, we can send it and return - // if the response is not final, we can send it and wait for the next response + // Handle streaming responses by collecting all chunks for this requestId + do { + var isComplete = false + while !isComplete { + let response = try await self.responsePool.next(for: requestId) + logger[metadataKey: "response_requestId"] = "\(response.requestId ?? "nil")" + logger.trace("Received response chunk to return to client") + + // send the response chunk to the client try await self.sendResponse(response, outbound: outbound, logger: logger) + if response.final == true { - logger.trace("/invoke returning") - return // if the response is final, we can return and close the connection + logger.trace("/invoke complete, returning") + isComplete = true } - } else { - logger.error( - "Received response for a different request id", - metadata: ["response requestId": "\(response.requestId ?? "")"] - ) - // should we return an error here ? Or crash as this is probably a programming error? } + } catch let error as LambdaHTTPServer.Pool.PoolError { + logger.trace("PoolError caught", metadata: ["error": "\(error)"]) + // detect concurrent invocations of POST and gently decline the requests while we're processing one. + let response = LocalServerResponse( + id: requestId, + status: .internalServerError, + body: ByteBuffer( + string: + "\(error): It is not allowed to invoke multiple Lambda function executions in parallel. (The Lambda runtime environment on AWS will never do that)" + ) + ) + try await self.sendResponse(response, outbound: outbound, logger: logger) } - // What todo when there is no more responses to process? - // This should not happen as the async iterator blocks until there is a response to process - fatalError("No more responses to process - the async for loop should not return") // client uses incorrect HTTP method case (_, let url) where url.hasSuffix(self.invocationEndpoint): @@ -447,7 +463,9 @@ internal struct LambdaHTTPServer { logger[metadataKey: "requestId"] = "\(invocation.requestId)" logger.trace("/next retrieved invocation") // tell the lambda function we accepted the invocation - return try await sendResponse(invocation.acceptedResponse(), outbound: outbound, logger: logger) + try await sendResponse(invocation.acceptedResponse(), outbound: outbound, logger: logger) + logger.trace("/next accepted, returning") + return } // What todo when there is no more tasks to process? // This should not happen as the async iterator blocks until there is a task to process @@ -465,7 +483,7 @@ internal struct LambdaHTTPServer { } // enqueue the lambda function response to be served as response to the client /invoke logger.trace("/:requestId/response received response", metadata: ["requestId": "\(requestId)"]) - await self.responsePool.push( + self.responsePool.push( LocalServerResponse( id: requestId, status: .accepted, @@ -496,7 +514,7 @@ internal struct LambdaHTTPServer { } // enqueue the lambda function response to be served as response to the client /invoke logger.trace("/:requestId/response received response", metadata: ["requestId": "\(requestId)"]) - await self.responsePool.push( + self.responsePool.push( LocalServerResponse( id: requestId, status: .internalServerError, @@ -552,86 +570,7 @@ internal struct LambdaHTTPServer { } } - /// A shared data structure to store the current invocation or response requests and the continuation objects. - /// This data structure is shared between instances of the HTTPHandler - /// (one instance to serve requests from the Lambda function and one instance to serve requests from the client invoking the lambda function). - internal final class Pool: AsyncSequence, AsyncIteratorProtocol, Sendable where T: Sendable { - typealias Element = T - - enum State: ~Copyable { - case buffer(Deque) - case continuation(CheckedContinuation?) - } - - private let lock = Mutex(.buffer([])) - - /// enqueue an element, or give it back immediately to the iterator if it is waiting for an element - public func push(_ invocation: T) async { - // if the iterator is waiting for an element, give it to it - // otherwise, enqueue the element - let maybeContinuation = self.lock.withLock { state -> CheckedContinuation? in - switch consume state { - case .continuation(let continuation): - state = .buffer([]) - return continuation - - case .buffer(var buffer): - buffer.append(invocation) - state = .buffer(buffer) - return nil - } - } - - maybeContinuation?.resume(returning: invocation) - } - - func next() async throws -> T? { - // exit the async for loop if the task is cancelled - guard !Task.isCancelled else { - return nil - } - - return try await withTaskCancellationHandler { - try await withCheckedThrowingContinuation { (continuation: CheckedContinuation) in - let nextAction = self.lock.withLock { state -> T? in - switch consume state { - case .buffer(var buffer): - if let first = buffer.popFirst() { - state = .buffer(buffer) - return first - } else { - state = .continuation(continuation) - return nil - } - - case .continuation: - fatalError("Concurrent invocations to next(). This is illegal.") - } - } - - guard let nextAction else { return } - - continuation.resume(returning: nextAction) - } - } onCancel: { - self.lock.withLock { state in - switch consume state { - case .buffer(let buffer): - state = .buffer(buffer) - case .continuation(let continuation): - continuation?.resume(throwing: CancellationError()) - state = .buffer([]) - } - } - } - } - - func makeAsyncIterator() -> Pool { - self - } - } - - private struct LocalServerResponse: Sendable { + struct LocalServerResponse: Sendable { let requestId: String? let status: HTTPResponseStatus? let headers: HTTPHeaders? @@ -652,7 +591,7 @@ internal struct LambdaHTTPServer { } } - private struct LocalServerInvocation: Sendable { + struct LocalServerInvocation: Sendable { let requestId: String let request: ByteBuffer diff --git a/Tests/AWSLambdaRuntimeTests/LambdaLocalServerTests.swift b/Tests/AWSLambdaRuntimeTests/LambdaLocalServerTests.swift index 806f5c83..ac040dd6 100644 --- a/Tests/AWSLambdaRuntimeTests/LambdaLocalServerTests.swift +++ b/Tests/AWSLambdaRuntimeTests/LambdaLocalServerTests.swift @@ -19,6 +19,18 @@ import Testing @testable import AWSLambdaRuntime +#if canImport(FoundationEssentials) +import FoundationEssentials +#else +import Foundation +#endif + +#if canImport(FoundationNetworking) +import FoundationNetworking +#else +import Foundation +#endif + // serialized to start only one runtime at a time @Suite(.serialized) struct LambdaLocalServerTest { @@ -78,6 +90,117 @@ struct LambdaLocalServerTest { #expect(result == true) } + @Test("Local server handles rapid concurrent requests without HTTP 400 errors") + @available(LambdaSwift 2.0, *) + func testRapidConcurrentRequests() async throws { + let customPort = 8081 + + // Set environment variable + setenv("LOCAL_LAMBDA_PORT", "\(customPort)", 1) + defer { unsetenv("LOCAL_LAMBDA_PORT") } + + struct RequestResult { + let requestIndex: Int + let statusCode: Int + let responseBody: String + } + + let results = try await withThrowingTaskGroup(of: [RequestResult].self) { group in + + // Start the Lambda runtime with local server + group.addTask { + let runtime = LambdaRuntime { (event: String, context: LambdaContext) in + try await Task.sleep(for: .milliseconds(100)) + return "Hello \(event)" + } + + // Start runtime (this will block until cancelled) + try await runtime._run() + return [] + } + + // Start HTTP client to make rapid requests + group.addTask { + // Give server time to start + try await Task.sleep(for: .milliseconds(200)) + + // Make 10 rapid concurrent POST requests to /invoke + return try await withThrowingTaskGroup(of: RequestResult.self) { clientGroup in + var requestResults: [RequestResult] = [] + + for i in 0..<10 { + try await Task.sleep(for: .milliseconds(0)) + clientGroup.addTask { + let (data, response) = try await self.makeInvokeRequest( + host: "127.0.0.1", + port: customPort, + payload: "\"World\(i)\"" + ) + return RequestResult( + requestIndex: i, + statusCode: response.statusCode, + responseBody: String(decoding: data, as: UTF8.self) + ) + } + } + + for try await result in clientGroup { + requestResults.append(result) + } + + return requestResults + } + } + + // Get the first result (request results) and cancel the runtime + let first = try await group.next() + group.cancelAll() + return first ?? [] + } + + #expect(results.count == 10, "Expected 10 responses") + + // Verify that each request was processed correctly by checking response content + // Sort results by request index to verify proper execution order + let sortedResults = results.sorted { $0.requestIndex < $1.requestIndex } + for (index, result) in sortedResults.enumerated() { + let expectedResponse = "\"Hello World\(index)\"" + #expect( + result.responseBody == expectedResponse, + "Request \(index) response was '\(result.responseBody)', expected '\(expectedResponse)'" + ) + #expect( + result.requestIndex == index, + "Request order mismatch: got index \(result.requestIndex), expected \(index)" + ) + #expect( + result.statusCode == 202, + "Request \(result.requestIndex) returned \(result.statusCode), expected 202 OK" + ) + } + } + + private func makeInvokeRequest(host: String, port: Int, payload: String) async throws -> (Data, HTTPURLResponse) { + let url = URL(string: "http://\(host):\(port)/invoke")! + var request = URLRequest(url: url) + request.httpMethod = "POST" + request.setValue("application/json", forHTTPHeaderField: "Content-Type") + request.httpBody = payload.data(using: .utf8) + request.timeoutInterval = 10.0 + + let (data, response) = try await URLSession.shared.data(for: request) + + guard let httpResponse = response as? HTTPURLResponse else { + // Create a custom error since URLError might not be available on Linux + struct HTTPError: Error { + let message: String + } + throw HTTPError(message: "Bad server response") + } + + return (data, httpResponse) + } + private func isPortResponding(host: String, port: Int) async throws -> Bool { let group = MultiThreadedEventLoopGroup(numberOfThreads: 1) diff --git a/Tests/AWSLambdaRuntimeTests/PoolTests.swift b/Tests/AWSLambdaRuntimeTests/PoolTests.swift index 8cbe8a2e..9d5b34ed 100644 --- a/Tests/AWSLambdaRuntimeTests/PoolTests.swift +++ b/Tests/AWSLambdaRuntimeTests/PoolTests.swift @@ -12,6 +12,9 @@ // //===----------------------------------------------------------------------===// +#if LocalServerSupport + +import NIOCore import Testing @testable import AWSLambdaRuntime @@ -24,8 +27,8 @@ struct PoolTests { let pool = LambdaHTTPServer.Pool() // Push values - await pool.push("first") - await pool.push("second") + pool.push("first") + pool.push("second") // Iterate and verify order var values = [String]() @@ -53,7 +56,11 @@ struct PoolTests { task.cancel() // This should complete without receiving any values - try await task.value + do { + try await task.value + } catch is CancellationError { + // this might happen depending on the order on which the cancellation is handled + } } @Test @@ -78,7 +85,7 @@ struct PoolTests { try await withThrowingTaskGroup(of: Void.self) { group in for i in 0..=6.1) + + @Test + @available(LambdaSwift 2.0, *) + func testConcurrentNext() async throws { + let pool = LambdaHTTPServer.Pool() + + // Create two tasks that will both wait for elements to be available + let error = await #expect(throws: LambdaHTTPServer.Pool.PoolError.self) { + try await withThrowingTaskGroup(of: Void.self) { group in + + // one of the two task will throw a PoolError + + group.addTask { + for try await _ in pool { + } + Issue.record("Loop 1 should not complete") + } + + group.addTask { + for try await _ in pool { + } + Issue.record("Loop 2 should not complete") + } + try await group.waitForAll() + } + } + + // Verify it's the correct error cause + if case .nextCalledTwice = error?.cause { + // This is the expected error + } else { + Issue.record("Expected nextCalledTwice error, got: \(String(describing: error?.cause))") + } + } + + // MARK: - Invariant Tests for RequestId-specific functionality + + @Test + @available(LambdaSwift 2.0, *) + func testRequestIdSpecificNext() async throws { + let pool = LambdaHTTPServer.Pool() + + // Push responses with different requestIds + pool.push(LambdaHTTPServer.LocalServerResponse(id: "req1", body: ByteBuffer(string: "data1"))) + pool.push(LambdaHTTPServer.LocalServerResponse(id: "req2", body: ByteBuffer(string: "data2"))) + pool.push(LambdaHTTPServer.LocalServerResponse(id: "req1", body: ByteBuffer(string: "data3"))) + + // Get specific responses + let response1 = try await pool.next(for: "req1") + #expect(response1.requestId == "req1") + #expect(String(buffer: response1.body!) == "data1") + + let response2 = try await pool.next(for: "req2") + #expect(response2.requestId == "req2") + #expect(String(buffer: response2.body!) == "data2") + + let response3 = try await pool.next(for: "req1") + #expect(response3.requestId == "req1") + #expect(String(buffer: response3.body!) == "data3") + } + + @Test + @available(LambdaSwift 2.0, *) + func testStreamingResponsesWithSameRequestId() async throws { + let pool = LambdaHTTPServer.Pool() + let requestId = "streaming-req" + + let chunks = try await withThrowingTaskGroup(of: [String].self) { group in + // Start consumer task + group.addTask { + var chunks: [String] = [] + var isComplete = false + + while !isComplete { + let response = try await pool.next(for: requestId) + if let body = response.body { + chunks.append(String(buffer: body)) + } + if response.final { + isComplete = true + } + } + return chunks + } + + // Start producer task + group.addTask { + // Give consumer time to start waiting + try await Task.sleep(nanoseconds: 10_000_000) // 0.01 seconds + + // Push multiple chunks for the same requestId + pool.push( + LambdaHTTPServer.LocalServerResponse( + id: requestId, + body: ByteBuffer(string: "chunk1"), + final: false + ) + ) + pool.push( + LambdaHTTPServer.LocalServerResponse( + id: requestId, + body: ByteBuffer(string: "chunk2"), + final: false + ) + ) + pool.push( + LambdaHTTPServer.LocalServerResponse(id: requestId, body: ByteBuffer(string: "chunk3"), final: true) + ) + + return [] // Producer doesn't return chunks + } + + // Wait for consumer to complete and return its result + for try await result in group { + if !result.isEmpty { + group.cancelAll() + return result + } + } + return [] + } + + #expect(chunks == ["chunk1", "chunk2", "chunk3"]) + } + + @Test + @available(LambdaSwift 2.0, *) + func testMixedWaitingModesError() async throws { + let pool = LambdaHTTPServer.Pool() + + let error = await #expect(throws: LambdaHTTPServer.Pool.PoolError.self) { + try await withThrowingTaskGroup(of: Void.self) { group in + // Start a FIFO consumer + group.addTask { + for try await _ in pool { + // This should block waiting for any item + } + } + + // Start a requestId-specific consumer after a delay + group.addTask { + // Give FIFO task time to start waiting + try await Task.sleep(nanoseconds: 10_000_000) // 0.01 seconds + + // Try to use requestId-specific next - should fail with mixedWaitingModes + _ = try await pool.next(for: "req1") + } + + // Wait for the first task to complete (which should be the error) + try await group.next() + group.cancelAll() + } + } + + // Verify it's the correct error cause + if case .mixedWaitingModes = error?.cause { + // This is the expected error + } else { + Issue.record("Expected mixedWaitingModes error, got: \(String(describing: error?.cause))") + } + } + + @Test + @available(LambdaSwift 2.0, *) + func testMixedWaitingModesErrorReverse() async throws { + let pool = LambdaHTTPServer.Pool() + + let error = await #expect(throws: LambdaHTTPServer.Pool.PoolError.self) { + try await withThrowingTaskGroup(of: Void.self) { group in + // Start a requestId-specific consumer + group.addTask { + _ = try await pool.next(for: "req1") + } + + // Start a FIFO consumer after a delay + group.addTask { + // Give specific task time to start waiting + try await Task.sleep(nanoseconds: 10_000_000) // 0.01 seconds + + // Try to use FIFO next - should fail with mixedWaitingModes + for try await _ in pool { + break + } + } + + // Wait for the first task to complete (which should be the error) + try await group.next() + group.cancelAll() + } + } + + // Verify it's the correct error cause + if case .mixedWaitingModes = error?.cause { + // This is the expected error + } else { + Issue.record("Expected mixedWaitingModes error, got: \(String(describing: error?.cause))") + } + } + + @Test + @available(LambdaSwift 2.0, *) + func testDuplicateRequestIdWaitError() async throws { + let pool = LambdaHTTPServer.Pool() + + let error = await #expect(throws: LambdaHTTPServer.Pool.PoolError.self) { + try await withThrowingTaskGroup(of: Void.self) { group in + // Start first consumer waiting for specific requestId + group.addTask { + _ = try await pool.next(for: "req1") + } + + // Start second consumer for same requestId after a delay + group.addTask { + // Give first task time to start waiting + try await Task.sleep(nanoseconds: 10_000_000) // 0.01 seconds + + // Try to wait for the same requestId - should fail + _ = try await pool.next(for: "req1") + } + + // Wait for the first task to complete (which should be the error) + try await group.next() + group.cancelAll() + } + } + + // Verify it's the correct error cause and requestId + if case let .duplicateRequestIdWait(requestId) = error?.cause { + #expect(requestId == "req1") + } else { + Issue.record("Expected duplicateRequestIdWait error, got: \(String(describing: error?.cause))") + } + } + + @Test + @available(LambdaSwift 2.0, *) + func testConcurrentRequestIdConsumers() async throws { + let pool = LambdaHTTPServer.Pool() + + let results = try await withThrowingTaskGroup(of: (String, String).self) { group in + // Start multiple consumers for different requestIds + group.addTask { + let response = try await pool.next(for: "req1") + return ("req1", String(buffer: response.body!)) + } + + group.addTask { + let response = try await pool.next(for: "req2") + return ("req2", String(buffer: response.body!)) + } + + group.addTask { + let response = try await pool.next(for: "req3") + return ("req3", String(buffer: response.body!)) + } + + // Start producer task + group.addTask { + // Give tasks time to start waiting + try await Task.sleep(nanoseconds: 10_000_000) // 0.01 seconds + + // Push responses in different order + pool.push(LambdaHTTPServer.LocalServerResponse(id: "req3", body: ByteBuffer(string: "data3"))) + pool.push(LambdaHTTPServer.LocalServerResponse(id: "req1", body: ByteBuffer(string: "data1"))) + pool.push(LambdaHTTPServer.LocalServerResponse(id: "req2", body: ByteBuffer(string: "data2"))) + + return ("producer", "") // Producer doesn't return meaningful data + } + + // Collect results from consumers + var consumerResults: [String: String] = [:] + for try await (requestId, data) in group { + if requestId != "producer" { + consumerResults[requestId] = data + } + if consumerResults.count == 3 { + group.cancelAll() + break + } + } + return consumerResults + } + + // Verify each consumer gets the correct response + #expect(results["req1"] == "data1") + #expect(results["req2"] == "data2") + #expect(results["req3"] == "data3") + } + + @Test + @available(LambdaSwift 2.0, *) + func testCancellationCleansUpAllContinuations() async throws { + let pool = LambdaHTTPServer.Pool() + + // Test that cancellation properly cleans up all continuations + await #expect(throws: CancellationError.self) { + try await withThrowingTaskGroup(of: Void.self) { group in + // Start multiple consumers for different requestIds + group.addTask { + _ = try await pool.next(for: "req1") + } + + group.addTask { + _ = try await pool.next(for: "req2") + } + + group.addTask { + _ = try await pool.next(for: "req3") + } + + // Give tasks time to start waiting then cancel all + try await Task.sleep(nanoseconds: 10_000_000) // 0.01 seconds + group.cancelAll() + + try await group.waitForAll() + } + } + + // Pool should be back to clean state - verify by pushing and consuming normally + pool.push(LambdaHTTPServer.LocalServerResponse(id: "new-req", body: ByteBuffer(string: "new-data"))) + let response = try await pool.next(for: "new-req") + #expect(String(buffer: response.body!) == "new-data") + } + + @Test + @available(LambdaSwift 2.0, *) + func testBufferOrderingWithRequestIds() async throws { + let pool = LambdaHTTPServer.Pool() + + // Push multiple responses for the same requestId + pool.push(LambdaHTTPServer.LocalServerResponse(id: "req1", body: ByteBuffer(string: "first"))) + pool.push(LambdaHTTPServer.LocalServerResponse(id: "req2", body: ByteBuffer(string: "other"))) + pool.push(LambdaHTTPServer.LocalServerResponse(id: "req1", body: ByteBuffer(string: "second"))) + pool.push(LambdaHTTPServer.LocalServerResponse(id: "req1", body: ByteBuffer(string: "third"))) + + // Consume in order - should get FIFO order for the same requestId + let first = try await pool.next(for: "req1") + #expect(String(buffer: first.body!) == "first") + + let second = try await pool.next(for: "req1") + #expect(String(buffer: second.body!) == "second") + + let other = try await pool.next(for: "req2") + #expect(String(buffer: other.body!) == "other") + + let third = try await pool.next(for: "req1") + #expect(String(buffer: third.body!) == "third") + } + #endif //swift >= 6.1 + } +#endif // trait