Skip to content

Commit 8472c2f

Browse files
authored
improve peer test (#288)
1 parent 269dd34 commit 8472c2f

File tree

3 files changed

+33
-19
lines changed

3 files changed

+33
-19
lines changed

Networking/Sources/Networking/Connection.swift

Lines changed: 20 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ enum ConnectionState {
2828
case connecting(continuations: [CheckedContinuation<Void, Error>])
2929
case connected(publicKey: Data)
3030
case closed
31+
case closing
3132
case reconnect(publicKey: Data)
3233
}
3334

@@ -51,7 +52,7 @@ public final class Connection<Handler: StreamHandler>: Sendable, ConnectionInfoP
5152
nil
5253
case let .connected(publicKey):
5354
publicKey
54-
case .closed:
55+
case .closed, .closing:
5556
nil
5657
case let .reconnect(publicKey):
5758
publicKey
@@ -99,30 +100,41 @@ public final class Connection<Handler: StreamHandler>: Sendable, ConnectionInfoP
99100
for continuation in continuations {
100101
continuation.resume(throwing: ConnectionError.closed)
101102
}
102-
state = .closed
103103
}
104104
state = .closed
105105
}
106106
}
107107

108+
func closing() {
109+
state.write { state in
110+
if case let .connecting(continuations) = state {
111+
for continuation in continuations {
112+
continuation.resume(throwing: ConnectionError.closed)
113+
}
114+
}
115+
state = .closing
116+
}
117+
}
118+
108119
func reconnect(publicKey: Data) {
109120
state.write { state in
110121
if case let .connecting(continuations) = state {
111122
for continuation in continuations {
112123
continuation.resume(throwing: ConnectionError.reconnect)
113124
}
114-
state = .reconnect(publicKey: publicKey)
115125
}
116126
state = .reconnect(publicKey: publicKey)
117127
}
118128
}
119129

120130
public var isClosed: Bool {
121131
state.read {
122-
if case .closed = $0 {
123-
return true
132+
switch $0 {
133+
case .closed, .closing:
134+
true
135+
case .connected, .reconnect, .connecting:
136+
false
124137
}
125-
return false
126138
}
127139
}
128140

@@ -140,11 +152,7 @@ public final class Connection<Handler: StreamHandler>: Sendable, ConnectionInfoP
140152
switch $0 {
141153
case .connecting:
142154
false
143-
case .connected:
144-
true
145-
case .closed:
146-
true
147-
case .reconnect:
155+
case .connected, .closed, .closing, .reconnect:
148156
true
149157
}
150158
}
@@ -165,6 +173,7 @@ public final class Connection<Handler: StreamHandler>: Sendable, ConnectionInfoP
165173
}
166174

167175
public func close(abort: Bool = false) {
176+
closing()
168177
try? connection.shutdown(errorCode: abort ? 1 : 0) // TODO: define some error code
169178
}
170179

Networking/Sources/Networking/Peer.swift

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -177,14 +177,17 @@ public final class Peer<Handler: StreamHandler>: Sendable {
177177
public func broadcast(
178178
kind: Handler.PresistentHandler.StreamKind, message: Handler.PresistentHandler.Message
179179
) {
180-
let connections = impl.connections.read { connections in
181-
connections.byId.values
182-
}
183180
guard let messageData = try? message.encode() else {
184181
impl.logger.warning("Failed to encode message: \(message)")
185182
return
186183
}
184+
let connections = impl.connections.read { connections in
185+
connections.byId.values
186+
}
187187
for connection in connections {
188+
if connection.isClosed {
189+
continue
190+
}
188191
if let stream = try? connection.createPreistentStream(kind: kind) {
189192
Task {
190193
let res = await Result {
@@ -210,8 +213,9 @@ public final class Peer<Handler: StreamHandler>: Sendable {
210213
}
211214

212215
// there should be only one connection per peer
216+
// exlcude closed connections
213217
public var peersCount: Int {
214-
impl.connections.read { $0.byId.count }
218+
impl.connections.read { $0.byId.count { $0.value.isClosed == false } }
215219
}
216220

217221
public var peersRole: PeerRole {

Networking/Tests/NetworkingTests/PeerTests.swift

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -166,8 +166,9 @@ struct PeerTests {
166166
let con = try peer.connect(to: centerPeer.listenAddress(), role: .builder)
167167
try await con.ready()
168168
}
169-
// Simulate close connections 1~3s
170-
try? await Task.sleep(for: .milliseconds(1000))
169+
170+
#expect(centerPeer.peersCount == 3)
171+
171172
centerPeer.broadcast(kind: .uniqueA, message: .init(kind: .uniqueA, data: Data("connection rotation strategy".utf8)))
172173
try? await Task.sleep(for: .milliseconds(100))
173174
var receivedCount = 0
@@ -546,7 +547,7 @@ struct PeerTests {
546547
#expect(receivedData == messageData + Data(" response".utf8))
547548
try? await Task.sleep(for: .milliseconds(100))
548549
// Simulate abnormal shutdown of connections
549-
connection.close(abort: true)
550+
try connection.connection.shutdown(errorCode: 1)
550551
// Wait to simulate downtime & reconnected 3~5s
551552
try? await Task.sleep(for: .milliseconds(3000))
552553
peer1.broadcast(
@@ -650,7 +651,7 @@ struct PeerTests {
650651
#expect(receivedData == messageData + Data(" response".utf8))
651652
try? await Task.sleep(for: .milliseconds(100))
652653
// Simulate a peer failure by disconnecting one peer
653-
connection.close(abort: false)
654+
try connection.connection.shutdown()
654655
// Wait to simulate downtime
655656
try? await Task.sleep(for: .milliseconds(200))
656657
// Reconnect the failing peer

0 commit comments

Comments
 (0)