Skip to content

Commit f276565

Browse files
Cluster testing devcontainer (#153)
* Add devcontainer for testing clusters Signed-off-by: Adam Fowler <[email protected]> * Update tests, add failover test Need to improve this test to ensure we hit a moved error everytime Signed-off-by: Adam Fowler <[email protected]> * Fix parsing of IPv6 addresses in moved error Signed-off-by: Adam Fowler <[email protected]> * Fix testFailover so it always returns MOVED error Signed-off-by: Adam Fowler <[email protected]> * cap_add and security_opt Signed-off-by: Adam Fowler <[email protected]> * Add docker-compose with replicas Signed-off-by: Adam Fowler <[email protected]> * Fix YAML lint error Signed-off-by: Adam Fowler <[email protected]> * Update Sources/Valkey/Cluster/ValkeyClusterClient.swift Co-authored-by: Fabian Fett <[email protected]> Signed-off-by: Adam Fowler <[email protected]> * Add IPv6 loopback address test for move error Signed-off-by: Adam Fowler <[email protected]> --------- Signed-off-by: Adam Fowler <[email protected]> Co-authored-by: Fabian Fett <[email protected]>
1 parent 90d4172 commit f276565

File tree

9 files changed

+262
-28
lines changed

9 files changed

+262
-28
lines changed

.devcontainer/devcontainer.json

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
{
2+
"name": "Swift",
3+
"dockerComposeFile": "docker-compose-cluster.yml",
4+
"service": "app",
5+
"workspaceFolder": "/workspace",
6+
"features": {
7+
"ghcr.io/devcontainers/features/common-utils:2": {
8+
"installZsh": "false",
9+
"username": "ubuntu",
10+
"upgradePackages": "false"
11+
},
12+
"ghcr.io/devcontainers/features/git:1": {
13+
"version": "os-provided",
14+
"ppa": "false"
15+
},
16+
"ghcr.io/swift-server-community/swift-devcontainer-features/jemalloc:1": { }
17+
},
18+
// Configure tool-specific properties.
19+
"customizations": {
20+
// Configure properties specific to VS Code.
21+
"vscode": {
22+
// Set *default* container specific settings.json values on container create.
23+
"settings": {
24+
"lldb.library": "/usr/lib/liblldb.so",
25+
"swift.path": ""
26+
},
27+
// Add the IDs of extensions you want installed when the container is created.
28+
"extensions": [
29+
"swiftlang.swift-vscode"
30+
]
31+
}
32+
},
33+
// Use 'forwardPorts' to make a list of ports inside the container available locally.
34+
// "forwardPorts": [],
35+
36+
// Set `remoteUser` to `root` to connect as root instead. More info: https://aka.ms/vscode-remote/containers/non-root.
37+
"remoteUser": "ubuntu"
38+
}
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
version: '3.8'
2+
3+
services:
4+
5+
# Here we have six Valkey containers with Cluster mode enabled,
6+
# three of them will work as primary nodes and each one of
7+
# will have a replica, so in case of failures, the replica becomes the primary.
8+
# They are configured by the `cluster_initiator` container.
9+
10+
# To make Docker compatible with Valkey Cluster, you need to use Docker's host
11+
# networking mode. Please see the --net=host option in the Docker documentation
12+
# for more information.
13+
app:
14+
image: swift:6.1
15+
network_mode: "host"
16+
volumes:
17+
- ..:/workspace
18+
depends_on:
19+
- cluster_initiator
20+
- valkey
21+
cap_add:
22+
- SYS_PTRACE
23+
security_opt:
24+
- seccomp=unconfined
25+
environment:
26+
- VALKEY_NODE1_HOSTNAME=localhost
27+
- VALKEY_NODE1_PORT=36001
28+
command: sleep infinity
29+
30+
valkey:
31+
image: 'valkey/valkey:latest'
32+
network_mode: "host"
33+
command: valkey-server --port 6379
34+
35+
valkey_cluster_1:
36+
image: 'valkey/valkey:latest'
37+
network_mode: "host"
38+
command: valkey-server --port 36001 --cluster-enabled yes --cluster-config-file nodes.conf --cluster-node-timeout 5000 --appendonly yes
39+
40+
valkey_cluster_2:
41+
image: 'valkey/valkey:latest'
42+
network_mode: "host"
43+
command: valkey-server --port 36002 --cluster-enabled yes --cluster-config-file nodes.conf --cluster-node-timeout 5000 --appendonly yes
44+
45+
valkey_cluster_3:
46+
image: 'valkey/valkey:latest'
47+
network_mode: "host"
48+
command: valkey-server --port 36003 --cluster-enabled yes --cluster-config-file nodes.conf --cluster-node-timeout 5000 --appendonly yes
49+
50+
valkey_cluster_4:
51+
image: 'valkey/valkey:latest'
52+
network_mode: "host"
53+
command: valkey-server --port 36004 --cluster-enabled yes --cluster-config-file nodes.conf --cluster-node-timeout 5000 --appendonly yes
54+
55+
valkey_cluster_5:
56+
image: 'valkey/valkey:latest'
57+
network_mode: "host"
58+
command: valkey-server --port 36005 --cluster-enabled yes --cluster-config-file nodes.conf --cluster-node-timeout 5000 --appendonly yes
59+
60+
valkey_cluster_6:
61+
image: 'valkey/valkey:latest'
62+
network_mode: "host"
63+
command: valkey-server --port 36006 --cluster-enabled yes --cluster-config-file nodes.conf --cluster-node-timeout 5000 --appendonly yes
64+
65+
# Ephemeral container to create the valkey cluster connections.
66+
# Once the setup is done, this container shuts down
67+
# and the cluster can be used by the service app container
68+
cluster_initiator:
69+
image: 'valkey/valkey:latest'
70+
network_mode: "host"
71+
container_name: cluster_initiator
72+
command: valkey-cli --cluster create localhost:36001 localhost:36002 localhost:36003 localhost:36004 localhost:36005 localhost:36006 --cluster-replicas 1 --cluster-yes
73+
tty: true
74+
depends_on:
75+
- valkey_cluster_1
76+
- valkey_cluster_2
77+
- valkey_cluster_3
78+
- valkey_cluster_4
79+
- valkey_cluster_5
80+
- valkey_cluster_6
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
version: '3.8'
2+
3+
services:
4+
5+
# Here we have four Valkey containers. A primary, two replicas replicating from
6+
# the primary and an additinal replica replicating from one of the replicas
7+
8+
# To make Docker compatible with Valkey replicas, you need to use Docker's host
9+
# networking mode. Please see the --net=host option in the Docker documentation
10+
# for more information.
11+
app:
12+
image: swift:6.1
13+
network_mode: "host"
14+
volumes:
15+
- ..:/workspace
16+
depends_on:
17+
- valkey
18+
- valkey_replica_1
19+
- valkey_replica_2
20+
- valkey_replica_3
21+
cap_add:
22+
- SYS_PTRACE
23+
security_opt:
24+
- seccomp=unconfined
25+
command: sleep infinity
26+
27+
valkey:
28+
image: 'valkey/valkey:latest'
29+
network_mode: "host"
30+
command: valkey-server --port 6379
31+
32+
valkey_replica_1:
33+
image: 'valkey/valkey:latest'
34+
network_mode: "host"
35+
depends_on:
36+
- valkey
37+
command: valkey-server --port 36001 --replicaof 127.0.0.1 6379
38+
39+
valkey_replica_2:
40+
image: 'valkey/valkey:latest'
41+
network_mode: "host"
42+
depends_on:
43+
- valkey
44+
command: valkey-server --port 36002 --replicaof 127.0.0.1 6379
45+
46+
valkey_replica_3:
47+
image: 'valkey/valkey:latest'
48+
network_mode: "host"
49+
depends_on:
50+
- valkey_replica_2
51+
command: valkey-server --port 36003 --replicaof 127.0.0.1 36002

.gitignore

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,5 @@ DerivedData/
99
.vscode
1010
Package.resolved
1111
.benchmarkBaselines/
12-
.devcontainer
1312
.swift-version
1413
.docc-build

Sources/Valkey/Cluster/ValkeyClusterClient.swift

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,7 @@ public final class ValkeyClusterClient: Sendable {
167167
guard let errorMessage = error.message, let movedError = ValkeyMovedError(errorMessage) else {
168168
throw error
169169
}
170+
self.logger.trace("Received move error", metadata: ["error": "\(movedError)"])
170171
clientSelector = { try await self.client(for: movedError) }
171172
}
172173
}

Sources/Valkey/Cluster/ValkeyMovedError.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ extension ValkeyMovedError {
7575

7676
let firstEndpointIndex = msg.index(after: spaceAfterSlotIndex)
7777

78-
guard let colonIndex = msg[spaceAfterSlotIndex...].firstIndex(where: { $0 == ":" }) else {
78+
guard let colonIndex = msg[spaceAfterSlotIndex...].lastIndex(of: ":") else {
7979
return nil
8080
}
8181

Tests/ClusterIntegrationTests/ClusterIntegrationTests.swift

Lines changed: 68 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -16,17 +16,22 @@ import Foundation
1616
import Logging
1717
import Testing
1818
import Valkey
19+
import XCTest
1920

21+
@Suite(
22+
"Cluster Integration Tests",
23+
.serialized,
24+
.disabled(if: clusterFirstNodeHostname == nil, "VALKEY_NODE1_HOSTNAME environment variable is not set.")
25+
)
2026
struct ClusterIntegrationTests {
21-
22-
@Test(.disabled(if: ClusterIntegrationTests.firstNodeHostname == nil, "VALKEY_NODE1_HOSTNAME environment variable is not set."))
27+
@Test
2328
@available(valkeySwift 1.0, *)
2429
func testSetGet() async throws {
2530
var logger = Logger(label: "ValkeyCluster")
2631
logger.logLevel = .trace
27-
let firstNodeHostname = ClusterIntegrationTests.firstNodeHostname!
28-
let firstNodePort = ClusterIntegrationTests.firstNodePort ?? 6379
29-
try await Self.withValkeyCluster([(host: firstNodeHostname, port: firstNodePort, tls: false)]) { (client, logger) in
32+
let firstNodeHostname = clusterFirstNodeHostname!
33+
let firstNodePort = clusterFirstNodePort ?? 6379
34+
try await Self.withValkeyCluster([(host: firstNodeHostname, port: firstNodePort, tls: false)], logger: logger) { client in
3035
try await Self.withKey(connection: client) { key in
3136
try await client.set(key, value: "Hello")
3237

@@ -36,13 +41,14 @@ struct ClusterIntegrationTests {
3641
}
3742
}
3843

44+
@Test
3945
@available(valkeySwift 1.0, *)
4046
func testWithConnection() async throws {
4147
var logger = Logger(label: "ValkeyCluster")
4248
logger.logLevel = .trace
43-
let firstNodeHostname = ClusterIntegrationTests.firstNodeHostname!
44-
let firstNodePort = ClusterIntegrationTests.firstNodePort ?? 6379
45-
try await Self.withValkeyCluster([(host: firstNodeHostname, port: firstNodePort, tls: false)]) { (client, logger) in
49+
let firstNodeHostname = clusterFirstNodeHostname!
50+
let firstNodePort = clusterFirstNodePort ?? 6379
51+
try await Self.withValkeyCluster([(host: firstNodeHostname, port: firstNodePort, tls: false)], logger: logger) { client in
4652
try await Self.withKey(connection: client) { key in
4753
try await client.withConnection(forKeys: [key]) { connection in
4854
_ = try await connection.set(key, value: "Hello")
@@ -53,6 +59,36 @@ struct ClusterIntegrationTests {
5359
}
5460
}
5561

62+
@Test
63+
@available(valkeySwift 1.0, *)
64+
func testFailover() async throws {
65+
var logger = Logger(label: "ValkeyCluster")
66+
logger.logLevel = .trace
67+
let firstNodeHostname = clusterFirstNodeHostname!
68+
let firstNodePort = clusterFirstNodePort ?? 6379
69+
try await Self.withValkeyCluster([(host: firstNodeHostname, port: firstNodePort, tls: false)], logger: logger) { clusterClient in
70+
try await Self.withKey(connection: clusterClient) { key in
71+
try await clusterClient.set(key, value: "bar")
72+
let cluster = try await clusterClient.clusterShards()
73+
let shard = try #require(
74+
cluster.shards.first { shard in
75+
let hashSlot = HashSlot(key: key)
76+
return shard.slots[0].lowerBound <= hashSlot && shard.slots[0].upperBound >= hashSlot
77+
}
78+
)
79+
let replica = try #require(shard.nodes.first { $0.role == .replica })
80+
let port = try #require(replica.port)
81+
// connect to replica and call CLUSTER FAILOVER
82+
try await withValkeyClient(.hostname(replica.endpoint, port: port), logger: logger) { client in
83+
try await client.clusterFailover()
84+
}
85+
try await clusterClient.set(key, value: "baz")
86+
let response = try await clusterClient.get(key)
87+
#expect(response.map { String(buffer: $0) } == "baz")
88+
}
89+
}
90+
}
91+
5692
@available(valkeySwift 1.0, *)
5793
static func withKey<Value>(
5894
connection: some ValkeyConnectionProtocol,
@@ -73,10 +109,9 @@ struct ClusterIntegrationTests {
73109
static func withValkeyCluster<T>(
74110
_ nodeAddresses: [(host: String, port: Int, tls: Bool)],
75111
nodeClientConfiguration: ValkeyClientConfiguration = .init(),
76-
_ body: (ValkeyClusterClient, Logger) async throws -> sending T
112+
logger: Logger,
113+
_ body: (ValkeyClusterClient) async throws -> sending T
77114
) async throws -> T {
78-
var logger = Logger(label: "Valkey")
79-
logger.logLevel = .debug
80115
let client = ValkeyClusterClient(
81116
clientConfiguration: nodeClientConfiguration,
82117
nodeDiscovery: ValkeyStaticNodeDiscovery(nodeAddresses.map { .init(host: $0.host, port: $0.port, useTLS: $0.tls) }),
@@ -90,7 +125,7 @@ struct ClusterIntegrationTests {
90125

91126
let result: Result<T, any Error>
92127
do {
93-
result = try await .success(body(client, logger))
128+
result = try await .success(body(client))
94129
} catch {
95130
result = .failure(error)
96131
}
@@ -102,14 +137,26 @@ struct ClusterIntegrationTests {
102137
return try result.get()
103138
}
104139

105-
}
106-
107-
extension ClusterIntegrationTests {
108-
static var firstNodeHostname: String? {
109-
ProcessInfo.processInfo.environment["VALKEY_NODE1_HOSTNAME"]
110-
}
111-
112-
static var firstNodePort: Int? {
113-
ProcessInfo.processInfo.environment["VALKEY_NODE1_PORT"].flatMap { Int($0) }
140+
@available(valkeySwift 1.0, *)
141+
func withValkeyClient(
142+
_ address: ValkeyServerAddress,
143+
configuration: ValkeyClientConfiguration = .init(),
144+
logger: Logger,
145+
operation: @escaping @Sendable (ValkeyClient) async throws -> Void
146+
) async throws {
147+
try await withThrowingTaskGroup(of: Void.self) { group in
148+
let client = ValkeyClient(address, configuration: configuration, logger: logger)
149+
group.addTask {
150+
await client.run()
151+
}
152+
group.addTask {
153+
try await operation(client)
154+
}
155+
try await group.next()
156+
group.cancelAll()
157+
}
114158
}
115159
}
160+
161+
private let clusterFirstNodeHostname: String? = ProcessInfo.processInfo.environment["VALKEY_NODE1_HOSTNAME"]
162+
private let clusterFirstNodePort: Int? = ProcessInfo.processInfo.environment["VALKEY_NODE1_PORT"].flatMap { Int($0) }

Tests/IntegrationTests/ValkeyTests.swift

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -308,9 +308,12 @@ struct GeneratedCommands {
308308
var logger = Logger(label: "Valkey")
309309
logger.logLevel = .debug
310310
try await withValkeyConnection(.hostname(valkeyHostname, port: 6379), logger: logger) { connection in
311-
try await withKey(connection: connection) { key in
312-
let role = try await connection.role()
313-
print(role)
311+
let role = try await connection.role()
312+
switch role {
313+
case .primary:
314+
break
315+
case .replica, .sentinel:
316+
Issue.record()
314317
}
315318
}
316319
}

Tests/ValkeyTests/Cluster/ValkeyMovedErrorTests.swift

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,8 @@ struct ValkeyMovedErrorTests {
3232
#expect(movedError?.port == 6379)
3333
}
3434

35-
@Test("parseMovedError parses valid MOVED error from bulkError")
36-
func testParseValidMovedErrorFromBulkError() async throws {
35+
@Test("parseMovedError parses valid MOVED error with IPv4")
36+
func testParseValidMovedErrorWithIPv4() async throws {
3737
// Create a RESPToken with a MOVED error
3838
let errorMessage = "MOVED 5000 10.0.0.1:6380"
3939

@@ -47,6 +47,21 @@ struct ValkeyMovedErrorTests {
4747
#expect(movedError?.port == 6380)
4848
}
4949

50+
@Test("parseMovedError parses valid MOVED error with IPv6")
51+
func testParseValidMovedErrorWithIPv6() async throws {
52+
// Create a RESPToken with a MOVED error
53+
let errorMessage = "MOVED 5000 ::1:9000"
54+
55+
// Parse the moved error
56+
let movedError = ValkeyMovedError(errorMessage)
57+
58+
// Verify the moved error is parsed correctly
59+
#expect(movedError != nil)
60+
#expect(movedError?.slot.rawValue == 5000)
61+
#expect(movedError?.endpoint == "::1")
62+
#expect(movedError?.port == 9000)
63+
}
64+
5065
@Test("parseMovedError returns nil for error tokens without MOVED prefix")
5166
func testParseNonMovedError() async throws {
5267
#expect(ValkeyMovedError("ERR unknown command") == nil)

0 commit comments

Comments
 (0)