Skip to content

Commit 35f0d13

Browse files
authored
Connection as actor. (#13)
* ValkeyConnection as actor * Fix * Drop Swift 6.0
1 parent 0d07442 commit 35f0d13

File tree

8 files changed

+50
-214
lines changed

8 files changed

+50
-214
lines changed

.github/workflows/ci.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ jobs:
2121
timeout-minutes: 15
2222
strategy:
2323
matrix:
24-
image: ["swift:6.0", "swift:6.1"]
24+
image: ["swift:6.1"]
2525
container:
2626
image: ${{ matrix.image }}
2727
services:

[email protected]

Lines changed: 0 additions & 93 deletions
This file was deleted.

Sources/Valkey/Connection/ValkeyConnection+ConnectionPool.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ extension ValkeyConnection: PooledConnection {
2121
// connection id
2222
public typealias ID = Int
2323
// on close
24-
public func onClose(_ closure: @escaping @Sendable ((any Error)?) -> Void) {
24+
public nonisolated func onClose(_ closure: @escaping @Sendable ((any Error)?) -> Void) {
2525
self.channel.closeFuture.whenComplete { _ in closure(nil) }
2626
}
2727
}

Sources/Valkey/Connection/ValkeyConnection.swift

Lines changed: 10 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -24,15 +24,17 @@ import NIOTransportServices
2424
#endif
2525

2626
/// Single connection to a Valkey database
27-
public final class ValkeyConnection: ValkeyConnectionProtocol, Sendable {
27+
public final actor ValkeyConnection: ValkeyConnectionProtocol, Sendable {
28+
nonisolated public let unownedExecutor: UnownedSerialExecutor
29+
2830
/// Connection ID, used by connection pool
2931
public let id: ID
3032
/// Logger used by Server
3133
let logger: Logger
3234
@usableFromInline
3335
let channel: Channel
3436
@usableFromInline
35-
let channelHandler: NIOLoopBound<ValkeyChannelHandler>
37+
let channelHandler: ValkeyChannelHandler
3638
let configuration: ValkeyClientConfiguration
3739
let isClosed: Atomic<Bool>
3840

@@ -44,8 +46,9 @@ public final class ValkeyConnection: ValkeyConnectionProtocol, Sendable {
4446
configuration: ValkeyClientConfiguration,
4547
logger: Logger
4648
) {
49+
self.unownedExecutor = channel.eventLoop.executor.asUnownedSerialExecutor()
4750
self.channel = channel
48-
self.channelHandler = .init(channelHandler, eventLoop: channel.eventLoop)
51+
self.channelHandler = channelHandler
4952
self.configuration = configuration
5053
self.id = connectionID
5154
self.logger = logger
@@ -97,7 +100,7 @@ public final class ValkeyConnection: ValkeyConnectionProtocol, Sendable {
97100

98101
/// Close connection
99102
/// - Returns: EventLoopFuture that is completed on connection closure
100-
public func close() {
103+
public nonisolated func close() {
101104
guard self.isClosed.compareExchange(expected: false, desired: true, successOrdering: .relaxed, failureOrdering: .relaxed).exchanged else {
102105
return
103106
}
@@ -110,13 +113,7 @@ public final class ValkeyConnection: ValkeyConnectionProtocol, Sendable {
110113
@inlinable
111114
public func send<Command: ValkeyCommand>(command: Command) async throws -> Command.Response {
112115
let result = try await withCheckedThrowingContinuation { continuation in
113-
if self.channel.eventLoop.inEventLoop {
114-
self.channelHandler.value.write(command: command, continuation: continuation)
115-
} else {
116-
self.channel.eventLoop.execute {
117-
self.channelHandler.value.write(command: command, continuation: continuation)
118-
}
119-
}
116+
self.channelHandler.write(command: command, continuation: continuation)
120117
}
121118
return try .init(fromRESP: result)
122119
}
@@ -129,7 +126,7 @@ public final class ValkeyConnection: ValkeyConnectionProtocol, Sendable {
129126
@inlinable
130127
public func pipeline<each Command: ValkeyCommand>(
131128
_ commands: repeat each Command
132-
) async -> (repeat Result<(each Command).Response, Error>) {
129+
) async -> sending (repeat Result<(each Command).Response, Error>) {
133130
func convert<Response: RESPTokenDecodable>(_ result: Result<RESPToken, Error>, to: Response.Type) -> Result<Response, Error> {
134131
result.flatMap {
135132
do {
@@ -149,13 +146,7 @@ public final class ValkeyConnection: ValkeyConnectionProtocol, Sendable {
149146
let outBuffer = encoder.buffer
150147
let promises = mpromises
151148
// write directly to channel handler
152-
if self.channel.eventLoop.inEventLoop {
153-
self.channelHandler.value.write(request: ValkeyRequest.multiple(buffer: outBuffer, promises: promises.map { .nio($0) }))
154-
} else {
155-
self.channel.eventLoop.execute {
156-
self.channelHandler.value.write(request: ValkeyRequest.multiple(buffer: outBuffer, promises: promises.map { .nio($0) }))
157-
}
158-
}
149+
self.channelHandler.write(request: ValkeyRequest.multiple(buffer: outBuffer, promises: promises.map { .nio($0) }))
159150

160151
// get response from channel handler
161152
var index = AutoIncrementingInteger()

Sources/Valkey/Subscriptions/ValkeyConnection+subscribe.swift

Lines changed: 14 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -168,45 +168,29 @@ extension ValkeyConnection {
168168
@usableFromInline
169169
func subscribe(
170170
command: some ValkeyCommand,
171-
filters: [ValkeySubscriptionFilter],
172-
isolation: isolated (any Actor)? = #isolation
171+
filters: [ValkeySubscriptionFilter]
173172
) async throws -> (Int, ValkeySubscription) {
174173
let (stream, streamContinuation) = ValkeySubscription.makeStream()
175174
let subscriptionID: Int = try await withCheckedThrowingContinuation { continuation in
176-
if self.channel.eventLoop.inEventLoop {
177-
self.channelHandler.value.subscribe(
178-
command: command,
179-
streamContinuation: streamContinuation,
180-
filters: filters,
181-
promise: .swift(continuation)
182-
)
183-
} else {
184-
self.channel.eventLoop.execute {
185-
self.channelHandler.value.subscribe(
186-
command: command,
187-
streamContinuation: streamContinuation,
188-
filters: filters,
189-
promise: .swift(continuation)
190-
)
191-
}
192-
}
175+
self.channelHandler.subscribe(
176+
command: command,
177+
streamContinuation: streamContinuation,
178+
filters: filters,
179+
promise: .swift(continuation)
180+
)
193181
}
194182
return (subscriptionID, stream)
195183
}
196184

197185
@usableFromInline
198-
func unsubscribe(
199-
id: Int,
200-
isolation: isolated (any Actor)? = #isolation
201-
) async throws {
186+
func unsubscribe(id: Int) async throws {
202187
try await withCheckedThrowingContinuation { continuation in
203-
if self.channel.eventLoop.inEventLoop {
204-
self.channelHandler.value.unsubscribe(id: id, promise: .swift(continuation))
205-
} else {
206-
self.channel.eventLoop.execute {
207-
self.channelHandler.value.unsubscribe(id: id, promise: .swift(continuation))
208-
}
209-
}
188+
self.channelHandler.unsubscribe(id: id, promise: .swift(continuation))
210189
}
211190
}
191+
192+
/// DEBUG function to check if the internal subscription state machine is empty
193+
package func isSubscriptionsEmpty() -> Bool {
194+
self.channelHandler.subscriptions.isEmpty
195+
}
212196
}

Sources/Valkey/ValkeyCommand.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ import NIOCore
1616

1717
/// A Valkey command that can be executed on a connection.
1818
public protocol ValkeyCommand: Sendable {
19-
associatedtype Response: RESPTokenDecodable = RESPToken
19+
associatedtype Response: RESPTokenDecodable & Sendable = RESPToken
2020
associatedtype Keys: Collection<ValkeyKey>
2121

2222
/// Keys affected by command. This is used in cluster mode to determine which

Tests/IntegrationTests/ValkeyTests.swift

Lines changed: 8 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -379,9 +379,7 @@ struct GeneratedCommands {
379379
await #expect(throws: Never.self) { try await iterator.next()?.message == "hello" }
380380
await #expect(throws: Never.self) { try await iterator.next()?.message == "goodbye" }
381381
}
382-
try await connection.channel.eventLoop.submit {
383-
#expect(connection.channelHandler.value.subscriptions.isEmpty)
384-
}.get()
382+
#expect(await connection.isSubscriptionsEmpty())
385383
}
386384
}
387385
try await client.withConnection { connection in
@@ -419,9 +417,7 @@ struct GeneratedCommands {
419417
cont.yield()
420418
await #expect(throws: Never.self) { try await iterator.next()?.message == "!" }
421419
}
422-
try await connection.channel.eventLoop.submit {
423-
#expect(connection.channelHandler.value.subscriptions.isEmpty)
424-
}.get()
420+
#expect(await connection.isSubscriptionsEmpty())
425421
}
426422
}
427423
try await client.withConnection { connection in
@@ -454,9 +450,7 @@ struct GeneratedCommands {
454450
await #expect(throws: Never.self) { try await iterator2.next()?.message == "goodbye" }
455451
}
456452
}
457-
try await connection.channel.eventLoop.submit {
458-
#expect(connection.channelHandler.value.subscriptions.isEmpty)
459-
}.get()
453+
#expect(await connection.isSubscriptionsEmpty())
460454
}
461455
}
462456
try await client.withConnection { connection in
@@ -486,9 +480,7 @@ struct GeneratedCommands {
486480
await #expect(throws: Never.self) { try await iterator.next()?.message == "2" }
487481
await #expect(throws: Never.self) { try await iterator.next()?.message == "3" }
488482
}
489-
try await connection.channel.eventLoop.submit {
490-
#expect(connection.channelHandler.value.subscriptions.isEmpty)
491-
}.get()
483+
#expect(await connection.isSubscriptionsEmpty())
492484
}
493485
}
494486
try await client.withConnection { connection in
@@ -518,9 +510,7 @@ struct GeneratedCommands {
518510
try #expect(await iterator.next() == .init(channel: "pattern.1", message: "hello"))
519511
try #expect(await iterator.next() == .init(channel: "pattern.abc", message: "goodbye"))
520512
}
521-
try await connection.channel.eventLoop.submit {
522-
#expect(connection.channelHandler.value.subscriptions.isEmpty)
523-
}.get()
513+
#expect(await connection.isSubscriptionsEmpty())
524514
}
525515
}
526516
try await client.withConnection { connection in
@@ -553,9 +543,7 @@ struct GeneratedCommands {
553543
try #expect(await iterator2.next() == .init(channel: "PatternChannelSubscriptions2", message: "goodbye"))
554544
}
555545
}
556-
try await connection.channel.eventLoop.submit {
557-
#expect(connection.channelHandler.value.subscriptions.isEmpty)
558-
}.get()
546+
#expect(await connection.isSubscriptionsEmpty())
559547
}
560548
}
561549
try await client.withConnection { connection in
@@ -583,9 +571,7 @@ struct GeneratedCommands {
583571
try #expect(await iterator.next() == .init(channel: "shard", message: "hello"))
584572
try #expect(await iterator.next() == .init(channel: "shard", message: "goodbye"))
585573
}
586-
try await connection.channel.eventLoop.submit {
587-
#expect(connection.channelHandler.value.subscriptions.isEmpty)
588-
}.get()
574+
#expect(await connection.isSubscriptionsEmpty())
589575
}
590576
}
591577
try await client.withConnection { connection in
@@ -621,9 +607,7 @@ struct GeneratedCommands {
621607

622608
await #expect(throws: Never.self) { try await iterator.next()?.message == "goodbye" }
623609
}
624-
try await connection.channel.eventLoop.submit {
625-
#expect(connection.channelHandler.value.subscriptions.isEmpty)
626-
}.get()
610+
#expect(await connection.isSubscriptionsEmpty())
627611
}
628612
}
629613
try await valkeyClient.withConnection { connection in

0 commit comments

Comments
 (0)