Skip to content
Open
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
1de4b19
Gently decline subsequent POST /invoke request while the Lambda handl…
Oct 14, 2025
f35685c
Update Sources/AWSLambdaRuntime/Lambda+LocalServer.swift
sebsto Oct 14, 2025
44068af
Merge branch 'main' into sebsto/fix_584
Oct 14, 2025
af43436
Merge branch 'sebsto/fix_584' of github.com:sebsto/swift-aws-lambda-r…
Oct 14, 2025
8aa9a46
fix typo and language in comments
Oct 14, 2025
6d02848
fix test
Oct 14, 2025
0566ef5
remove async constraint on `push()`
Oct 14, 2025
b341bb4
swift-format
Oct 14, 2025
d73dce6
remove the fatal error to make testing easier
Oct 14, 2025
0cd73da
fix comment
Oct 14, 2025
5cc8ed9
Merge branch 'main' into sebsto/fix_584
Oct 14, 2025
b3576fe
use Result instead of Tuple
Oct 15, 2025
ae18d7b
swift format
Oct 15, 2025
605e86c
onCancel: resume continuation outside of the lock
Oct 15, 2025
2d6842a
swift-format
Oct 15, 2025
009b5c6
add logging trace details
Oct 16, 2025
0ed15f9
fix parallel invocation for non streaming lambda functions
Oct 16, 2025
a2ce3be
move pool to a separate file
Oct 16, 2025
5529cae
add unit test for the pool
Oct 16, 2025
9abbe8e
remove forced trace level for logging
Oct 16, 2025
6311b46
add license header
Oct 16, 2025
18e09c6
skip some tests on Swift 6.0
Oct 16, 2025
c07555a
swift-format
Oct 16, 2025
bff7507
replace a do {} catch {} with #expect(throwing:)
Oct 16, 2025
86591a7
swift-format
Oct 16, 2025
df989af
Generate time independent requestID + add a test for rapid fire
Oct 16, 2025
585a414
fix test on Linux
Oct 16, 2025
8845467
Also check that the test responses are in the correct order
Oct 16, 2025
6138790
simplify the test #expect
Oct 16, 2025
669ad43
use String(decoding:) instead of String(data:)
Oct 16, 2025
9c85560
State : use a struct instead of enum
Oct 16, 2025
54744c5
Update Sources/AWSLambdaRuntime/Lambda+LocalServer.swift
sebsto Oct 16, 2025
503f2da
Update Sources/AWSLambdaRuntime/Lambda+LocalServer.swift
sebsto Oct 16, 2025
30f19ad
Merge branch 'main' into sebsto/fix_584
Oct 16, 2025
b662024
Merge branch 'main' into sebsto/fix_584
Oct 17, 2025
ab77e5a
return an HTTP 5xx error in case we mess up something in the Pool
Oct 17, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Examples/Streaming/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ You can test the function locally before deploying:
swift run

# In another terminal, test with curl:
curl -v \
curl -v --output response.txt \
--header "Content-Type: application/json" \
--data '"this is not used"' \
http://127.0.0.1:7000/invoke
Expand Down
136 changes: 95 additions & 41 deletions Sources/AWSLambdaRuntime/Lambda+LocalServer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,8 @@ extension Lambda {
internal struct LambdaHTTPServer {
private let invocationEndpoint: String

private let invocationPool = Pool<LocalServerInvocation>()
private let responsePool = Pool<LocalServerResponse>()
private let invocationPool = Pool<LocalServerInvocation>(name: "Invocation Pool")
private let responsePool = Pool<LocalServerResponse>(name: "Response Pool")

private init(
invocationEndpoint: String?
Expand Down Expand Up @@ -272,7 +272,7 @@ internal struct LambdaHTTPServer {

// for streaming requests, push a partial head response
if self.isStreamingResponse(requestHead) {
await self.responsePool.push(
self.responsePool.push(
LocalServerResponse(
id: requestId,
status: .ok
Expand All @@ -286,7 +286,7 @@ internal struct LambdaHTTPServer {
// if this is a request from a Streaming Lambda Handler,
// stream the response instead of buffering it
if self.isStreamingResponse(requestHead) {
await self.responsePool.push(
self.responsePool.push(
LocalServerResponse(id: requestId, body: body)
)
} else {
Expand All @@ -298,7 +298,7 @@ internal struct LambdaHTTPServer {

if self.isStreamingResponse(requestHead) {
// for streaming response, send the final response
await self.responsePool.push(
self.responsePool.push(
LocalServerResponse(id: requestId, final: true)
)
} else {
Expand Down Expand Up @@ -388,34 +388,55 @@ internal struct LambdaHTTPServer {
// we always accept the /invoke request and push them to the pool
let requestId = "\(DispatchTime.now().uptimeNanoseconds)"
logger[metadataKey: "requestId"] = "\(requestId)"

logger.trace("/invoke received invocation, pushing it to the pool and wait for a lambda response")
await self.invocationPool.push(LocalServerInvocation(requestId: requestId, request: body))
self.invocationPool.push(LocalServerInvocation(requestId: requestId, request: body))

// wait for the lambda function to process the request
for try await response in self.responsePool {
logger[metadataKey: "response requestId"] = "\(response.requestId ?? "nil")"
logger.trace("Received response to return to client")
if response.requestId == requestId {
logger.trace("/invoke requestId is valid, sending the response")
// send the response to the client
// if the response is final, we can send it and return
// if the response is not final, we can send it and wait for the next response
try await self.sendResponse(response, outbound: outbound, logger: logger)
if response.final == true {
logger.trace("/invoke returning")
return // if the response is final, we can return and close the connection
// when POST /invoke is called multiple times before a response is processed,
// the `for try await ... in` loop will throw an error and we will return a 400 error to the client
do {
for try await response in self.responsePool {
logger[metadataKey: "response requestId"] = "\(response.requestId ?? "nil")"
logger.trace("Received response to return to client")
if response.requestId == requestId {
logger.trace("/invoke requestId is valid, sending the response")
// send the response to the client
// if the response is final, we can send it and return
// if the response is not final, we can send it and wait for the next response
try await self.sendResponse(response, outbound: outbound, logger: logger)
if response.final == true {
logger.trace("/invoke returning")
return // if the response is final, we can return and close the connection
}
} else {
logger.error(
"Received response for a different requestId",
metadata: ["response requestId": "\(response.requestId ?? "")"]
)
let response = LocalServerResponse(
id: requestId,
status: .badRequest,
body: ByteBuffer(string: "The responseId is not equal to the requestId.")
)
try await self.sendResponse(response, outbound: outbound, logger: logger)
}
} else {
logger.error(
"Received response for a different request id",
metadata: ["response requestId": "\(response.requestId ?? "")"]
)
// should we return an error here ? Or crash as this is probably a programming error?
}
// What to do when there are no more responses to process?
// This should not happen as the async iterator blocks until there is a response to process
fatalError("No more responses to process - the async for loop should not return")
} catch is LambdaHTTPServer.Pool<LambdaHTTPServer.LocalServerResponse>.PoolError {
// detect concurrent invocations of POST and gently decline the requests while we're processing one.
let response = LocalServerResponse(
id: requestId,
status: .badRequest,
body: ByteBuffer(
string:
"It is not allowed to invoke multiple Lambda function executions in parallel. (The Lambda runtime environment on AWS will never do that)"
)
)
try await self.sendResponse(response, outbound: outbound, logger: logger)
}
// What to do when there are no more responses to process?
// This should not happen as the async iterator blocks until there is a response to process
fatalError("No more responses to process - the async for loop should not return")

// client uses incorrect HTTP method
case (_, let url) where url.hasSuffix(self.invocationEndpoint):
Expand Down Expand Up @@ -457,7 +478,7 @@ internal struct LambdaHTTPServer {
}
// enqueue the lambda function response to be served as response to the client /invoke
logger.trace("/:requestId/response received response", metadata: ["requestId": "\(requestId)"])
await self.responsePool.push(
self.responsePool.push(
LocalServerResponse(
id: requestId,
status: .accepted,
Expand Down Expand Up @@ -488,7 +509,7 @@ internal struct LambdaHTTPServer {
}
// enqueue the lambda function response to be served as response to the client /invoke
logger.trace("/:requestId/response received response", metadata: ["requestId": "\(requestId)"])
await self.responsePool.push(
self.responsePool.push(
LocalServerResponse(
id: requestId,
status: .internalServerError,
Expand Down Expand Up @@ -548,18 +569,21 @@ internal struct LambdaHTTPServer {
/// This data structure is shared between instances of the HTTPHandler
/// (one instance to serve requests from the Lambda function and one instance to serve requests from the client invoking the lambda function).
internal final class Pool<T>: AsyncSequence, AsyncIteratorProtocol, Sendable where T: Sendable {
private let poolName: String
internal init(name: String = "Pool") { self.poolName = name }

typealias Element = T

enum State: ~Copyable {
case buffer(Deque<T>)
case continuation(CheckedContinuation<T, any Error>?)
case continuation(CheckedContinuation<T, any Error>)
}

private let lock = Mutex<State>(.buffer([]))

/// enqueue an element, or give it back immediately to the iterator if it is waiting for an element
public func push(_ invocation: T) async {
// if the iterator is waiting for an element, give it to it
public func push(_ invocation: T) {
// if the iterator is waiting for an element on `next()``, give it to it
// otherwise, enqueue the element
let maybeContinuation = self.lock.withLock { state -> CheckedContinuation<T, any Error>? in
switch consume state {
Expand All @@ -574,6 +598,7 @@ internal struct LambdaHTTPServer {
}
}

// Resume continuation outside the lock to prevent potential deadlocks
maybeContinuation?.resume(returning: invocation)
}

Expand All @@ -585,42 +610,71 @@ internal struct LambdaHTTPServer {

return try await withTaskCancellationHandler {
try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<T, any Error>) in
let nextAction = self.lock.withLock { state -> T? in
let nextAction: Result<T, PoolError>? = self.lock.withLock { state -> Result<T, PoolError>? in
switch consume state {
case .buffer(var buffer):
if let first = buffer.popFirst() {
state = .buffer(buffer)
return first
return .success(first)
} else {
state = .continuation(continuation)
return nil
}

case .continuation:
fatalError("Concurrent invocations to next(). This is illegal.")
case .continuation(let previousContinuation):
state = .buffer([])
return .failure(PoolError(cause: .nextCalledTwice(previousContinuation)))
}
}

guard let nextAction else { return }

continuation.resume(returning: nextAction)
switch nextAction {
case .success(let action):
continuation.resume(returning: action)
case .failure(let error):
if case let .nextCalledTwice(prevContinuation) = error.cause {
prevContinuation.resume(throwing: error)
}
continuation.resume(throwing: error)
case .none:
// do nothing - continuation is stored in state
break
}
}
} onCancel: {
self.lock.withLock { state in
// Ensure we properly handle cancellation by checking if we have a stored continuation
let continuationToCancel = self.lock.withLock { state -> CheckedContinuation<T, any Error>? in
switch consume state {
case .buffer(let buffer):
state = .buffer(buffer)
return nil
case .continuation(let continuation):
continuation?.resume(throwing: CancellationError())
state = .buffer([])
return continuation
}
}

// Resume the continuation outside the lock to avoid potential deadlocks
continuationToCancel?.resume(throwing: CancellationError())
}
}

func makeAsyncIterator() -> Pool {
self
}

struct PoolError: Error {
let cause: Cause
var message: String {
switch self.cause {
case .nextCalledTwice:
return "Concurrent invocations to next(). This is not allowed."
}
}

enum Cause {
case nextCalledTwice(CheckedContinuation<T, any Error>)
}
}
}

private struct LocalServerResponse: Sendable {
Expand Down
44 changes: 38 additions & 6 deletions Tests/AWSLambdaRuntimeTests/PoolTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ struct PoolTests {
let pool = LambdaHTTPServer.Pool<String>()

// Push values
await pool.push("first")
await pool.push("second")
pool.push("first")
pool.push("second")

// Iterate and verify order
var values = [String]()
Expand Down Expand Up @@ -53,7 +53,11 @@ struct PoolTests {
task.cancel()

// This should complete without receiving any values
try await task.value
do {
try await task.value
} catch is CancellationError {
// this might happen depending on the order on which the cancellation is handled
}
}

@Test
Expand All @@ -78,7 +82,7 @@ struct PoolTests {
try await withThrowingTaskGroup(of: Void.self) { group in
for i in 0..<iterations {
group.addTask {
await pool.push(i)
pool.push(i)
}
}
try await group.waitForAll()
Expand Down Expand Up @@ -110,7 +114,7 @@ struct PoolTests {
try await Task.sleep(nanoseconds: 100_000_000) // 0.1 seconds

// Push a value
await pool.push(expectedValue)
pool.push(expectedValue)

// Wait for consumer to complete
try await consumer.value
Expand Down Expand Up @@ -140,7 +144,7 @@ struct PoolTests {
for p in 0..<producerCount {
group.addTask {
for i in 0..<messagesPerProducer {
await pool.push(p * messagesPerProducer + i)
pool.push(p * messagesPerProducer + i)
}
}
}
Expand All @@ -154,4 +158,32 @@ struct PoolTests {
#expect(receivedValues.count == producerCount * messagesPerProducer)
#expect(Set(receivedValues).count == producerCount * messagesPerProducer)
}

@Test
@available(LambdaSwift 2.0, *)
func testConcurrentNext() async throws {
let pool = LambdaHTTPServer.Pool<String>()

// Create two tasks that will both wait for elements to be available
await #expect(throws: LambdaHTTPServer.Pool<Swift.String>.PoolError.self) {
try await withThrowingTaskGroup(of: Void.self) { group in

// one of the two task will throw a PoolError

group.addTask {
for try await _ in pool {
}
Issue.record("Loop 1 should not complete")
}

group.addTask {
for try await _ in pool {
}
Issue.record("Loop 2 should not complete")
}
try await group.waitForAll()
}
}
}

}