diff --git a/Sources/Valkey/Utils/ActionRunner.swift b/Sources/Valkey/Utils/ActionRunner.swift new file mode 100644 index 00000000..b3e76614 --- /dev/null +++ b/Sources/Valkey/Utils/ActionRunner.swift @@ -0,0 +1,51 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the valkey-swift project +// +// Copyright (c) 2025 the valkey-swift authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See valkey-swift/CONTRIBUTORS.txt for the list of valkey-swift authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +struct ActionStream: Sendable { + let stream: AsyncStream + let continuation: AsyncStream.Continuation + + init() { + (self.stream, self.continuation) = AsyncStream.makeStream() + } +} + +protocol ActionRunner: Sendable { + associatedtype Action: Sendable + + /// stream of actions to execute. + var actionStream: ActionStream { get } + + /// Run action + func runAction(_ action: Action) async +} + +@available(valkeySwift 1.0, *) +extension ActionRunner { + /// Run discarding task group running actions + func runActionTaskGroup() async { + await withDiscardingTaskGroup { group in + for await action in actionStream.stream { + group.addTask { + await self.runAction(action) + } + } + } + } + + /// Queue action to be run + func queueAction(_ action: Action) { + self.actionStream.continuation.yield(action) + } +} diff --git a/Sources/Valkey/ValkeyClient.swift b/Sources/Valkey/ValkeyClient.swift index c1b73159..f099b3d2 100644 --- a/Sources/Valkey/ValkeyClient.swift +++ b/Sources/Valkey/ValkeyClient.swift @@ -28,7 +28,10 @@ import ServiceLifecycle /// /// `ValkeyClient` supports TLS using both NIOSSL and the Network framework. @available(valkeySwift 1.0, *) -public final class ValkeyClient: Sendable { +public final class ValkeyClient: ActionRunner, Sendable { + enum Action: Sendable { + case runClient(ValkeyNodeClient) + } let nodeClientFactory: ValkeyNodeClientFactory /// single node @usableFromInline @@ -41,6 +44,8 @@ public final class ValkeyClient: Sendable { let logger: Logger /// running atomic let runningAtomic: Atomic + /// Action stream + let actionStream: ActionStream /// Creates a new Valkey client /// @@ -84,6 +89,8 @@ public final class ValkeyClient: Sendable { self.logger = logger self.runningAtomic = .init(false) self.node = self.nodeClientFactory.makeConnectionPool(serverAddress: address) + self.actionStream = .init() + self.queueAction(.runClient(self.node)) } } @@ -95,10 +102,10 @@ extension ValkeyClient { precondition(!atomicOp.original, "ValkeyClient.run() should just be called once!") #if ServiceLifecycleSupport await cancelWhenGracefulShutdown { - await self.node.run() + await self.runActionTaskGroup() } #else - await self.node.run() + await self.runActionTaskGroup() #endif } @@ -117,6 +124,16 @@ extension ValkeyClient { } } +@available(valkeySwift 1.0, *) +extension ValkeyClient { + func runAction(_ action: Action) async { + switch action { + case .runClient(let nodeClient): + await nodeClient.run() + } + } +} + /// Extend ValkeyClient so we can call commands directly from it @available(valkeySwift 1.0, *) extension ValkeyClient: ValkeyClientProtocol { diff --git a/Tests/ValkeyTests/ActionRunnerTests.swift b/Tests/ValkeyTests/ActionRunnerTests.swift new file mode 100644 index 00000000..366e07bb --- /dev/null +++ b/Tests/ValkeyTests/ActionRunnerTests.swift @@ -0,0 +1,116 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the valkey-swift open source project +// +// Copyright (c) 2025 the valkey-swift project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of valkey-swift project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import Synchronization +import Testing + +@testable import Valkey + +struct ActionRunnerTests { + @available(valkeySwift 1.0, *) + @Test + func testRunsAction() async throws { + final class TestActionRunner: ActionRunner { + let (stream, cont) = AsyncStream.makeStream(of: Void.self) + let actionStream: ActionStream = .init() + + enum Action { + case action1 + } + func runAction(_ action: Action) async { + switch action { + case .action1: + cont.yield() + } + } + } + let test = TestActionRunner() + await withTaskGroup(of: Void.self) { group in + group.addTask { + await test.runActionTaskGroup() + } + test.queueAction(.action1) + await test.stream.first { _ in true } + group.cancelAll() + } + } + + @available(valkeySwift 1.0, *) + @Test + func testRunsMultipleActions() async throws { + final class TestActionRunner: ActionRunner { + let (stream, cont) = AsyncStream.makeStream(of: Void.self) + let actionStream: ActionStream = .init() + let value = Mutex(0) + enum Action { + case action1 + case action2 + } + func runAction(_ action: Action) async { + switch action { + case .action1: + value.withLock { $0 = $0 + 1 } + cont.yield() + case .action2: + value.withLock { $0 = $0 + 2 } + cont.yield() + } + } + } + let test = TestActionRunner() + await withTaskGroup(of: Void.self) { group in + group.addTask { + await test.runActionTaskGroup() + } + test.queueAction(.action1) + test.queueAction(.action2) + await test.stream.first { _ in true } + await test.stream.first { _ in true } + test.value.withLock { + #expect($0 == 3) + } + group.cancelAll() + } + } + + @available(valkeySwift 1.0, *) + @Test + func testCancellationPropagatesToActions() async throws { + final class TestActionRunner: ActionRunner { + let (stream, cont) = AsyncStream.makeStream(of: Void.self) + let actionStream: ActionStream = .init() + enum Action { + case action1 + } + func runAction(_ action: Action) async { + switch action { + case .action1: + cont.yield() + try? await Task.sleep(for: .seconds(60)) + } + } + } + let test = TestActionRunner() + await withTaskGroup(of: Void.self) { group in + group.addTask { + await test.runActionTaskGroup() + } + test.queueAction(.action1) + await test.stream.first { _ in true } + let now = ContinuousClock.now + group.cancelAll() + #expect(.now - now < .seconds(1)) + } + } +}