Skip to content

Commit 731c79b

Browse files
committed
Merge branch 'master' into dev-coverage
* master: connection rotation strategy (#224)
2 parents dc9b21c + e50528b commit 731c79b

File tree

5 files changed

+87
-8
lines changed

5 files changed

+87
-8
lines changed

.github/workflows/ci.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ concurrency:
1313
jobs:
1414
lint:
1515
name: Swift Lint
16-
runs-on: ubuntu-latest
16+
runs-on: ubuntu-24.04
1717

1818
steps:
1919
- uses: actions/checkout@v4

Networking/Sources/MsQuicSwift/QuicStream.swift

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,10 @@ private struct Storage {
88
let connection: QuicConnection
99
}
1010

11+
public enum SendError: Error {
12+
case emptyData
13+
}
14+
1115
public final class QuicStream: Sendable {
1216
public let id: UniqueId
1317
private let logger: Logger
@@ -97,9 +101,13 @@ public final class QuicStream: Sendable {
97101
throw QuicError.alreadyClosed
98102
}
99103

100-
// TODO: improve the case when data is empty
101104
let messageLength = data.count
102105

106+
if messageLength == 0 {
107+
logger.trace("No data to send.")
108+
throw SendError.emptyData // Throw a specific error or return
109+
}
110+
103111
let sendBufferRaw = UnsafeMutableRawPointer.allocate( // !! allocate
104112
byteCount: MemoryLayout<QUIC_BUFFER>.size + messageLength,
105113
alignment: MemoryLayout<QUIC_BUFFER>.alignment
@@ -108,7 +116,6 @@ public final class QuicStream: Sendable {
108116
let sendBuffer = sendBufferRaw.assumingMemoryBound(to: QUIC_BUFFER.self)
109117
let bufferPointer = sendBufferRaw.advanced(by: MemoryLayout<QUIC_BUFFER>.size).assumingMemoryBound(to: UInt8.self)
110118
data.copyBytes(to: bufferPointer, count: messageLength) // TODO: figure out a better way to avoid memory copy here
111-
112119
sendBuffer.pointee.Buffer = bufferPointer
113120
sendBuffer.pointee.Length = UInt32(messageLength)
114121

Networking/Sources/Networking/Connection.swift

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import AsyncChannels
22
import Foundation
33
import MsQuicSwift
4+
import Synchronization
45
import TracingUtils
56
import Utils
67

@@ -36,7 +37,7 @@ public final class Connection<Handler: StreamHandler>: Sendable, ConnectionInfoP
3637

3738
public let role: PeerRole
3839
public let remoteAddress: NetAddr
39-
40+
private let lastActive: Atomic<TimeInterval> = Atomic(0)
4041
let presistentStreams: ThreadSafeContainer<
4142
[Handler.PresistentHandler.StreamKind: Stream<Handler>]
4243
> = .init([:])
@@ -58,6 +59,10 @@ public final class Connection<Handler: StreamHandler>: Sendable, ConnectionInfoP
5859
}
5960
}
6061

62+
func getLastActive() -> TimeInterval {
63+
lastActive.load(ordering: .sequentiallyConsistent)
64+
}
65+
6166
public var id: UniqueId {
6267
connection.id
6368
}
@@ -68,6 +73,11 @@ public final class Connection<Handler: StreamHandler>: Sendable, ConnectionInfoP
6873
self.role = role
6974
self.remoteAddress = remoteAddress
7075
self.initiatedByLocal = initiatedByLocal
76+
updateLastActive()
77+
}
78+
79+
func updateLastActive() {
80+
lastActive.store(Date().timeIntervalSince1970, ordering: .releasing)
7181
}
7282

7383
func opened(publicKey: Data) throws {

Networking/Sources/Networking/Peer.swift

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ public struct PeerOptions<Handler: StreamHandler>: Sendable {
6565
}
6666
}
6767

68-
// TODO: reopen UP stream, peer reputation system to ban peers not following the protocol
68+
// TODO: peer reputation system to ban peers not following the protocol
6969
public final class Peer<Handler: StreamHandler>: Sendable {
7070
private let impl: PeerImpl<Handler>
7171

@@ -266,9 +266,15 @@ final class PeerImpl<Handler: StreamHandler>: Sendable {
266266
if role == .builder {
267267
let currentCount = connections.byAddr.values.filter { $0.role == role }.count
268268
if currentCount >= self.settings.maxBuilderConnections {
269-
self.logger.warning("max builder connections reached")
270-
// TODO: consider connection rotation strategy
271-
return false
269+
if let conn = connections.byAddr.values.filter({ $0.role == .builder })
270+
.sorted(by: { $0.getLastActive() < $1.getLastActive() }).first
271+
{
272+
self.logger.warning("Replacing least active builder connection at \(conn.remoteAddress)")
273+
conn.close(abort: false)
274+
} else {
275+
self.logger.warning("Max builder connections reached, no eligible replacement found")
276+
return false
277+
}
272278
}
273279
}
274280
if connections.byAddr[addr] != nil {
@@ -579,6 +585,10 @@ private struct PeerEventHandler<Handler: StreamHandler>: QuicEventHandler {
579585
}
580586
if let stream {
581587
stream.received(data: data)
588+
let connection = impl.connections.read { connections in
589+
connections.byId[stream.connectionId]
590+
}
591+
connection?.updateLastActive()
582592
}
583593
}
584594

Networking/Tests/NetworkingTests/PeerTests.swift

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,58 @@ struct PeerTests {
124124
typealias EphemeralHandler = MockEphemeralStreamHandler
125125
}
126126

127+
@Test
128+
func connectionRotationStrategy() async throws {
129+
var peers: [Peer<MockStreamHandler>] = []
130+
var handlers: [MockPresentStreamHandler] = []
131+
let centerPeer = try Peer(
132+
options: PeerOptions<MockStreamHandler>(
133+
role: .validator,
134+
listenAddress: NetAddr(ipAddress: "127.0.0.1", port: 0)!,
135+
genesisHeader: Data32(),
136+
secretKey: Ed25519.SecretKey(from: Data32.random()),
137+
presistentStreamHandler: MockPresentStreamHandler(),
138+
ephemeralStreamHandler: MockEphemeralStreamHandler(),
139+
serverSettings: .defaultSettings,
140+
clientSettings: .defaultSettings
141+
)
142+
)
143+
// Create 30 peer nodes
144+
for _ in 0 ..< 30 {
145+
let handler = MockPresentStreamHandler()
146+
handlers.append(handler)
147+
let peer = try Peer(
148+
options: PeerOptions<MockStreamHandler>(
149+
role: .builder,
150+
listenAddress: NetAddr(ipAddress: "127.0.0.1", port: 0)!,
151+
genesisHeader: Data32(),
152+
secretKey: Ed25519.SecretKey(from: Data32.random()),
153+
presistentStreamHandler: handler,
154+
ephemeralStreamHandler: MockEphemeralStreamHandler(),
155+
serverSettings: .defaultSettings,
156+
clientSettings: .defaultSettings
157+
)
158+
)
159+
peers.append(peer)
160+
}
161+
162+
// Make some connections
163+
for i in 0 ..< 30 {
164+
let peer = peers[i]
165+
let con = try peer.connect(to: centerPeer.listenAddress(), role: .builder)
166+
try await con.ready()
167+
}
168+
// Simulate close connections 3~5s
169+
try? await Task.sleep(for: .milliseconds(5000))
170+
centerPeer.broadcast(kind: .uniqueA, message: .init(kind: .uniqueA, data: Data("connection rotation strategy".utf8)))
171+
try? await Task.sleep(for: .milliseconds(1000))
172+
var receivedCount = 0
173+
for handler in handlers {
174+
receivedCount += await handler.receivedData.count
175+
}
176+
#expect(receivedCount == PeerSettings.defaultSettings.maxBuilderConnections)
177+
}
178+
127179
@Test
128180
func mockHandshakeFailure() async throws {
129181
let mockPeerTest = try MockPeerEventTests()

0 commit comments

Comments
 (0)