Skip to content

Add ability to client to run other workloads than the connection manager #174

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions Sources/Valkey/Utils/ActionRunner.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the valkey-swift project
//
// Copyright (c) 2025 the valkey-swift authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See valkey-swift/CONTRIBUTORS.txt for the list of valkey-swift authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

struct ActionStream<Action: Sendable>: Sendable {
let stream: AsyncStream<Action>
let continuation: AsyncStream<Action>.Continuation

init() {
(self.stream, self.continuation) = AsyncStream.makeStream()
}
}

protocol ActionRunner: Sendable {
associatedtype Action: Sendable

/// stream of actions to execute.
var actionStream: ActionStream<Action> { get }

/// Run action
func runAction(_ action: Action) async
}

@available(valkeySwift 1.0, *)
extension ActionRunner {
/// Run discarding task group running actions
func runActionTaskGroup() async {
await withDiscardingTaskGroup { group in
for await action in actionStream.stream {
group.addTask {
await self.runAction(action)
}
}
}
}

/// Queue action to be run
func queueAction(_ action: Action) {
self.actionStream.continuation.yield(action)
}
}
23 changes: 20 additions & 3 deletions Sources/Valkey/ValkeyClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@ import ServiceLifecycle
///
/// `ValkeyClient` supports TLS using both NIOSSL and the Network framework.
@available(valkeySwift 1.0, *)
public final class ValkeyClient: Sendable {
public final class ValkeyClient: ActionRunner, Sendable {
enum Action: Sendable {
case runClient(ValkeyNodeClient)
}
let nodeClientFactory: ValkeyNodeClientFactory
/// single node
@usableFromInline
Expand All @@ -41,6 +44,8 @@ public final class ValkeyClient: Sendable {
let logger: Logger
/// running atomic
let runningAtomic: Atomic<Bool>
/// Action stream
let actionStream: ActionStream<Action>

/// Creates a new Valkey client
///
Expand Down Expand Up @@ -84,6 +89,8 @@ public final class ValkeyClient: Sendable {
self.logger = logger
self.runningAtomic = .init(false)
self.node = self.nodeClientFactory.makeConnectionPool(serverAddress: address)
self.actionStream = .init()
self.queueAction(.runClient(self.node))
}
}

Expand All @@ -95,10 +102,10 @@ extension ValkeyClient {
precondition(!atomicOp.original, "ValkeyClient.run() should just be called once!")
#if ServiceLifecycleSupport
await cancelWhenGracefulShutdown {
await self.node.run()
await self.runActionTaskGroup()
}
#else
await self.node.run()
await self.runActionTaskGroup()
#endif
}

Expand All @@ -117,6 +124,16 @@ extension ValkeyClient {
}
}

@available(valkeySwift 1.0, *)
extension ValkeyClient {
func runAction(_ action: Action) async {
switch action {
case .runClient(let nodeClient):
await nodeClient.run()
}
}
}

/// Extend ValkeyClient so we can call commands directly from it
@available(valkeySwift 1.0, *)
extension ValkeyClient: ValkeyClientProtocol {
Expand Down
116 changes: 116 additions & 0 deletions Tests/ValkeyTests/ActionRunnerTests.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the valkey-swift open source project
//
// Copyright (c) 2025 the valkey-swift project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of valkey-swift project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

import Synchronization
import Testing

@testable import Valkey

struct ActionRunnerTests {
@available(valkeySwift 1.0, *)
@Test
func testRunsAction() async throws {
final class TestActionRunner: ActionRunner {
let (stream, cont) = AsyncStream.makeStream(of: Void.self)
let actionStream: ActionStream<Action> = .init()

enum Action {
case action1
}
func runAction(_ action: Action) async {
switch action {
case .action1:
cont.yield()
}
}
}
let test = TestActionRunner()
await withTaskGroup(of: Void.self) { group in
group.addTask {
await test.runActionTaskGroup()
}
test.queueAction(.action1)
await test.stream.first { _ in true }
group.cancelAll()
}
}

@available(valkeySwift 1.0, *)
@Test
func testRunsMultipleActions() async throws {
final class TestActionRunner: ActionRunner {
let (stream, cont) = AsyncStream.makeStream(of: Void.self)
let actionStream: ActionStream<Action> = .init()
let value = Mutex<Int>(0)
enum Action {
case action1
case action2
}
func runAction(_ action: Action) async {
switch action {
case .action1:
value.withLock { $0 = $0 + 1 }
cont.yield()
case .action2:
value.withLock { $0 = $0 + 2 }
cont.yield()
}
}
}
let test = TestActionRunner()
await withTaskGroup(of: Void.self) { group in
group.addTask {
await test.runActionTaskGroup()
}
test.queueAction(.action1)
test.queueAction(.action2)
await test.stream.first { _ in true }
await test.stream.first { _ in true }
test.value.withLock {
#expect($0 == 3)
}
group.cancelAll()
}
}

@available(valkeySwift 1.0, *)
@Test
func testCancellationPropagatesToActions() async throws {
final class TestActionRunner: ActionRunner {
let (stream, cont) = AsyncStream.makeStream(of: Void.self)
let actionStream: ActionStream<Action> = .init()
enum Action {
case action1
}
func runAction(_ action: Action) async {
switch action {
case .action1:
cont.yield()
try? await Task.sleep(for: .seconds(60))
}
}
}
let test = TestActionRunner()
await withTaskGroup(of: Void.self) { group in
group.addTask {
await test.runActionTaskGroup()
}
test.queueAction(.action1)
await test.stream.first { _ in true }
let now = ContinuousClock.now
group.cancelAll()
#expect(.now - now < .seconds(1))
}
}
}