Skip to content
169 changes: 169 additions & 0 deletions Sources/AWSLambdaRuntimeCore/DetachedTasks.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftAWSLambdaRuntime open source project
//
// Copyright (c) 2022 Apple Inc. and the SwiftAWSLambdaRuntime project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftAWSLambdaRuntime project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//
import Foundation
import NIOConcurrencyHelpers
import NIOCore
import Logging

/// A container that allows tasks to finish after a synchronous invocation
/// has produced its response.
public final class DetachedTasksContainer {

struct Context {
let eventLoop: EventLoop
let logger: Logger
}

private var context: Context
private var storage: Storage

init(context: Context) {
self.storage = Storage()
self.context = context
}

/// Adds a detached task that runs on the given event loop.
///
/// - Parameters:
/// - name: The name of the task.
/// - task: The task to execute. It receives an `EventLoop` and returns an `EventLoopFuture<Void>`.
/// - Returns: A `RegistrationKey` for the registered task.
@discardableResult
public func detached(name: String, task: @escaping (EventLoop) -> EventLoopFuture<Void>) -> RegistrationKey {
let key = RegistrationKey()
let task = task(context.eventLoop).always { _ in
self.storage.remove(key)
}
self.storage.add(key: key, name: name, task: task)
return key
}

/// Adds a detached async task.
///
/// - Parameters:
/// - name: The name of the task.
/// - task: The async task to execute.
/// - Returns: A `RegistrationKey` for the registered task.
@discardableResult
public func detached(name: String, task: @Sendable @escaping () async throws -> Void) -> RegistrationKey {
let key = RegistrationKey()
let promise = context.eventLoop.makePromise(of: Void.self)
promise.completeWithTask(task)
let task = promise.futureResult.always { result in
switch result {
case .success:
break
case .failure(let failure):
self.context.logger.warning(
"Execution of detached task failed with error.",
metadata: [
"taskName": "\(name)",
"error": "\(failure)"
]
)
}
self.storage.remove(key)
}
self.storage.add(key: key, name: name, task: task)
return key
}

/// Informs the runtime that the specified task should not be awaited anymore.
///
/// - Warning: This method does not actually stop the execution of the
/// detached task, it only prevents the runtime from waiting for it before
/// `/next` is invoked.
///
/// - Parameter key: The `RegistrationKey` of the task to cancel.
public func unsafeCancel(_ key: RegistrationKey) {
// To discuss:
// Canceling the execution doesn't seem to be an easy
// task https://github.com/apple/swift-nio/issues/2087
//
// While removing the handler will allow the runtime
// to invoke `/next` without actually awaiting for the
// task to complete, it does not actually cancel
// the execution of the dispatched task.
// Since this is a bit counter-intuitive, we might not
// want this method to exist at all.
self.storage.remove(key)
}

/// Awaits all registered tasks to complete.
///
/// - Returns: An `EventLoopFuture<Void>` that completes when all tasks have finished.
internal func awaitAll() -> EventLoopFuture<Void> {
let tasks = storage.tasks
if tasks.isEmpty {
return context.eventLoop.makeSucceededVoidFuture()
} else {
return EventLoopFuture.andAllComplete(tasks.map(\.value.task), on: context.eventLoop).flatMap {
self.awaitAll()
}
}
}
}

extension DetachedTasksContainer {
/// Lambda detached task registration key.
public struct RegistrationKey: Hashable, CustomStringConvertible {
var value: String

init() {
// UUID basically
self.value = UUID().uuidString
}

public var description: String {
self.value
}
}
}

extension DetachedTasksContainer {
fileprivate final class Storage {
private let lock: NIOLock

private var map: [RegistrationKey: (name: String, task: EventLoopFuture<Void>)]

init() {
self.lock = .init()
self.map = [:]
}

func add(key: RegistrationKey, name: String, task: EventLoopFuture<Void>) {
self.lock.withLock {
self.map[key] = (name: name, task: task)
}
}

func remove(_ key: RegistrationKey) {
self.lock.withLock {
self.map[key] = nil
}
}

var tasks: [RegistrationKey: (name: String, task: EventLoopFuture<Void>)] {
self.lock.withLock {
self.map
}
}
}
}

// Ideally this would not be @unchecked Sendable, but Sendable checks do not understand locks
// We can transition this to an actor once we drop support for older Swift versions
extension DetachedTasksContainer: @unchecked Sendable {}
extension DetachedTasksContainer.Storage: @unchecked Sendable {}
extension DetachedTasksContainer.RegistrationKey: Sendable {}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We must make sure that all those types are Sendable without using @unchecked. I'm sure we can achieve this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe we can, I took inspiration from Terminator.swift for that part, but I'll change it.

17 changes: 15 additions & 2 deletions Sources/AWSLambdaRuntimeCore/LambdaContext.swift
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ public struct LambdaContext: CustomDebugStringConvertible, Sendable {
let logger: Logger
let eventLoop: EventLoop
let allocator: ByteBufferAllocator
let tasks: DetachedTasksContainer

init(
requestID: String,
Expand All @@ -91,7 +92,8 @@ public struct LambdaContext: CustomDebugStringConvertible, Sendable {
clientContext: String?,
logger: Logger,
eventLoop: EventLoop,
allocator: ByteBufferAllocator
allocator: ByteBufferAllocator,
tasks: DetachedTasksContainer
) {
self.requestID = requestID
self.traceID = traceID
Expand All @@ -102,6 +104,7 @@ public struct LambdaContext: CustomDebugStringConvertible, Sendable {
self.logger = logger
self.eventLoop = eventLoop
self.allocator = allocator
self.tasks = tasks
}
}

Expand Down Expand Up @@ -158,6 +161,10 @@ public struct LambdaContext: CustomDebugStringConvertible, Sendable {
public var allocator: ByteBufferAllocator {
self.storage.allocator
}

public var tasks: DetachedTasksContainer {
self.storage.tasks
}

init(requestID: String,
traceID: String,
Expand All @@ -177,7 +184,13 @@ public struct LambdaContext: CustomDebugStringConvertible, Sendable {
clientContext: clientContext,
logger: logger,
eventLoop: eventLoop,
allocator: allocator
allocator: allocator,
tasks: DetachedTasksContainer(
context: DetachedTasksContainer.Context(
eventLoop: eventLoop,
logger: logger
)
)
)
}

Expand Down
12 changes: 9 additions & 3 deletions Sources/AWSLambdaRuntimeCore/LambdaRunner.swift
Original file line number Diff line number Diff line change
Expand Up @@ -95,13 +95,19 @@ internal final class LambdaRunner {
if case .failure(let error) = result {
logger.warning("lambda handler returned an error: \(error)")
}
return (invocation, result)
return (invocation, result, context)
}
}.flatMap { invocation, result in
}.flatMap { invocation, result, context in
// 3. report results to runtime engine
self.runtimeClient.reportResults(logger: logger, invocation: invocation, result: result).peekError { error in
logger.error("could not report results to lambda runtime engine: \(error)")
}
// To discuss:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the only point that remains open @fabianfett. When the runtime fails to report a result to AWS Lambda, do we want to wait for the background tasks to complete before stopping the execution of the whole process?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the error case we should await all subtasks.

// Do we want to await the tasks in this case?
return context.tasks.awaitAll()
}.map { _ in context }
}
.flatMap { context in
context.tasks.awaitAll()
}
}

Expand Down