Skip to content
Open
Show file tree
Hide file tree
Changes from 31 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
1de4b19
Gently decline subsequent POST /invoke request while the Lambda handl…
Oct 14, 2025
f35685c
Update Sources/AWSLambdaRuntime/Lambda+LocalServer.swift
sebsto Oct 14, 2025
44068af
Merge branch 'main' into sebsto/fix_584
Oct 14, 2025
af43436
Merge branch 'sebsto/fix_584' of github.com:sebsto/swift-aws-lambda-r…
Oct 14, 2025
8aa9a46
fix typo and language in comments
Oct 14, 2025
6d02848
fix test
Oct 14, 2025
0566ef5
remove async constraint on `push()`
Oct 14, 2025
b341bb4
swift-format
Oct 14, 2025
d73dce6
remove the fatal error to make testinge easier
Oct 14, 2025
0cd73da
fix comment
Oct 14, 2025
5cc8ed9
Merge branch 'main' into sebsto/fix_584
Oct 14, 2025
b3576fe
use Result instead of Tupe
Oct 15, 2025
ae18d7b
swift format
Oct 15, 2025
605e86c
onCancel: resume continuation outside of the lock
Oct 15, 2025
2d6842a
swift-format
Oct 15, 2025
009b5c6
add logging trace details
Oct 16, 2025
0ed15f9
fix parallel invocation for non streaming lambda functions
Oct 16, 2025
a2ce3be
move pool to a separate file
Oct 16, 2025
5529cae
add unit test for the pool
Oct 16, 2025
9abbe8e
remove forced trace level for logging
Oct 16, 2025
6311b46
add license header
Oct 16, 2025
18e09c6
skip some tests on Swift 6.0
Oct 16, 2025
c07555a
swift-format
Oct 16, 2025
bff7507
replace a do {} catch {} with #expect(throwing:)
Oct 16, 2025
86591a7
swift-format
Oct 16, 2025
df989af
Generate time independent requestID + add a test for rapid fire
Oct 16, 2025
585a414
fix test on Linux
Oct 16, 2025
8845467
Also check that the test responses are in the correct order
Oct 16, 2025
6138790
simplify the test #expect
Oct 16, 2025
669ad43
use String(decoding:) instead of String(data:)
Oct 16, 2025
9c85560
State : use a struct instead of enum
Oct 16, 2025
54744c5
Update Sources/AWSLambdaRuntime/Lambda+LocalServer.swift
sebsto Oct 16, 2025
503f2da
Update Sources/AWSLambdaRuntime/Lambda+LocalServer.swift
sebsto Oct 16, 2025
30f19ad
Merge branch 'main' into sebsto/fix_584
Oct 16, 2025
b662024
Merge branch 'main' into sebsto/fix_584
Oct 17, 2025
ab77e5a
return an HTTP 5xx error in case we mess up something in the Pool
Oct 17, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Examples/Streaming/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ You can test the function locally before deploying:
swift run

# In another terminal, test with curl:
curl -v \
curl -v --output response.txt \
--header "Content-Type: application/json" \
--data '"this is not used"' \
http://127.0.0.1:7000/invoke
Expand Down
192 changes: 192 additions & 0 deletions Sources/AWSLambdaRuntime/Lambda+LocalServer+Pool.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftAWSLambdaRuntime open source project
//
// Copyright (c) 2025 Apple Inc. and the SwiftAWSLambdaRuntime project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

#if LocalServerSupport
import DequeModule
import Synchronization

@available(LambdaSwift 2.0, *)
extension LambdaHTTPServer {

/// A shared data structure to store the current invocation or response requests and the continuation objects.
/// This data structure is shared between instances of the HTTPHandler
/// (one instance to serve requests from the Lambda function and one instance to serve requests from the client invoking the lambda function).
internal final class Pool<T>: AsyncSequence, AsyncIteratorProtocol, Sendable where T: Sendable {
private let poolName: String
internal init(name: String = "Pool") { self.poolName = name }

typealias Element = T

struct State {
var buffer: Deque<T> = []
var waitingForAny: CheckedContinuation<T, any Error>?
var waitingForSpecific: [String: CheckedContinuation<T, any Error>] = [:]
}

private let lock = Mutex<State>(State())

/// enqueue an element, or give it back immediately to the iterator if it is waiting for an element
public func push(_ item: T) {
let continuationToResume = self.lock.withLock { state -> CheckedContinuation<T, any Error>? in
// First check if there's a waiting continuation that can handle this item

// Check for FIFO waiter first
if let continuation = state.waitingForAny {
state.waitingForAny = nil
return continuation
}

// Check for specific waiter
if let response = item as? LocalServerResponse,
let requestId = response.requestId,
let continuation = state.waitingForSpecific.removeValue(forKey: requestId)
{
return continuation
}

// No waiting continuation, add to buffer
state.buffer.append(item)
return nil
}

// Resume continuation outside the lock to prevent potential deadlocks
continuationToResume?.resume(returning: item)
}

/// Unified next() method that handles both FIFO and requestId-specific waiting
private func _next(for requestId: String?) async throws -> T {
// exit if the task is cancelled
guard !Task.isCancelled else {
throw CancellationError()
}

return try await withTaskCancellationHandler {
try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<T, any Error>) in
let nextAction: Result<T, PoolError>? = self.lock.withLock { state -> Result<T, PoolError>? in
if let requestId = requestId {
// Look for oldest (first) item for this requestId in buffer
if let index = state.buffer.firstIndex(where: { item in
if let response = item as? LocalServerResponse {
return response.requestId == requestId
}
return false
}) {
let item = state.buffer.remove(at: index)
return .success(item)
} else {
// Check for conflicting waiters
if state.waitingForAny != nil {
return .failure(PoolError(cause: .mixedWaitingModes))
}
if state.waitingForSpecific[requestId] != nil {
return .failure(PoolError(cause: .duplicateRequestIdWait(requestId)))
}

// No matching item, wait for it
state.waitingForSpecific[requestId] = continuation
return nil
}
} else {
// FIFO mode - take first item
if let first = state.buffer.popFirst() {
return .success(first)
} else {
// Check for conflicting waiters
if !state.waitingForSpecific.isEmpty {
return .failure(PoolError(cause: .mixedWaitingModes))
}
if state.waitingForAny != nil {
return .failure(PoolError(cause: .nextCalledTwice(state.waitingForAny!)))
}

state.waitingForAny = continuation
return nil
}
}
}

switch nextAction {
case .success(let item):
continuation.resume(returning: item)
case .failure(let error):
if case let .nextCalledTwice(prevContinuation) = error.cause {
prevContinuation.resume(throwing: error)
}
continuation.resume(throwing: error)
case .none:
// do nothing - continuation is stored in state
break
}
}
} onCancel: {
// Ensure we properly handle cancellation by removing stored continuation
let continuationsToCancel = self.lock.withLock { state -> [CheckedContinuation<T, any Error>] in
var toCancel: [CheckedContinuation<T, any Error>] = []

if let continuation = state.waitingForAny {
toCancel.append(continuation)
state.waitingForAny = nil
}

for continuation in state.waitingForSpecific.values {
toCancel.append(continuation)
}
state.waitingForSpecific.removeAll()

return toCancel
}

// Resume all continuations outside the lock to avoid potential deadlocks
for continuation in continuationsToCancel {
continuation.resume(throwing: CancellationError())
}
}
}

/// Simple FIFO next() method - used by AsyncIteratorProtocol
func next() async throws -> T? {
try await _next(for: nil)
}

/// RequestId-specific next() method for LocalServerResponse - NOT part of AsyncIteratorProtocol
func next(for requestId: String) async throws -> T {
try await _next(for: requestId)
}

func makeAsyncIterator() -> Pool {
self
}

struct PoolError: Error {
let cause: Cause
var message: String {
switch self.cause {
case .nextCalledTwice:
return "Concurrent invocations to next(). This is not allowed."
case .duplicateRequestIdWait(let requestId):
return "Already waiting for requestId: \(requestId)"
case .mixedWaitingModes:
return "Cannot mix FIFO waiting (next()) with specific waiting (next(for:))"
}
}

enum Cause {
case nextCalledTwice(CheckedContinuation<T, any Error>)
case duplicateRequestIdWait(String)
case mixedWaitingModes
}
}
}
}
#endif
Loading