Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ concurrency:
jobs:
lint:
name: Swift Lint
runs-on: ubuntu-latest
runs-on: ubuntu-24.04

steps:
- uses: actions/checkout@v4
Expand Down
11 changes: 9 additions & 2 deletions Networking/Sources/MsQuicSwift/QuicStream.swift
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ private struct Storage {
let connection: QuicConnection
}

public enum SendError: Error {
case emptyData
}

public final class QuicStream: Sendable {
public let id: UniqueId
private let logger: Logger
Expand Down Expand Up @@ -97,9 +101,13 @@ public final class QuicStream: Sendable {
throw QuicError.alreadyClosed
}

// TODO: improve the case when data is empty
let messageLength = data.count

if messageLength == 0 {
logger.trace("No data to send.")
throw SendError.emptyData // Throw a specific error or return
}

let sendBufferRaw = UnsafeMutableRawPointer.allocate( // !! allocate
byteCount: MemoryLayout<QUIC_BUFFER>.size + messageLength,
alignment: MemoryLayout<QUIC_BUFFER>.alignment
Expand All @@ -108,7 +116,6 @@ public final class QuicStream: Sendable {
let sendBuffer = sendBufferRaw.assumingMemoryBound(to: QUIC_BUFFER.self)
let bufferPointer = sendBufferRaw.advanced(by: MemoryLayout<QUIC_BUFFER>.size).assumingMemoryBound(to: UInt8.self)
data.copyBytes(to: bufferPointer, count: messageLength) // TODO: figure out a better way to avoid memory copy here

sendBuffer.pointee.Buffer = bufferPointer
sendBuffer.pointee.Length = UInt32(messageLength)

Expand Down
12 changes: 11 additions & 1 deletion Networking/Sources/Networking/Connection.swift
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import AsyncChannels
import Foundation
import MsQuicSwift
import Synchronization
import TracingUtils
import Utils

Expand Down Expand Up @@ -36,7 +37,7 @@ public final class Connection<Handler: StreamHandler>: Sendable, ConnectionInfoP

public let role: PeerRole
public let remoteAddress: NetAddr

private let lastActive: Atomic<TimeInterval> = Atomic(0)
let presistentStreams: ThreadSafeContainer<
[Handler.PresistentHandler.StreamKind: Stream<Handler>]
> = .init([:])
Expand All @@ -58,6 +59,10 @@ public final class Connection<Handler: StreamHandler>: Sendable, ConnectionInfoP
}
}

func getLastActive() -> TimeInterval {
lastActive.load(ordering: .sequentiallyConsistent)
}

public var id: UniqueId {
connection.id
}
Expand All @@ -68,6 +73,11 @@ public final class Connection<Handler: StreamHandler>: Sendable, ConnectionInfoP
self.role = role
self.remoteAddress = remoteAddress
self.initiatedByLocal = initiatedByLocal
updateLastActive()
}

func updateLastActive() {
lastActive.store(Date().timeIntervalSince1970, ordering: .releasing)
}

func opened(publicKey: Data) throws {
Expand Down
18 changes: 14 additions & 4 deletions Networking/Sources/Networking/Peer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public struct PeerOptions<Handler: StreamHandler>: Sendable {
}
}

// TODO: reopen UP stream, peer reputation system to ban peers not following the protocol
// TODO: peer reputation system to ban peers not following the protocol
public final class Peer<Handler: StreamHandler>: Sendable {
private let impl: PeerImpl<Handler>

Expand Down Expand Up @@ -271,9 +271,15 @@ final class PeerImpl<Handler: StreamHandler>: Sendable {
if role == .builder {
let currentCount = connections.byAddr.values.filter { $0.role == role }.count
if currentCount >= self.settings.maxBuilderConnections {
self.logger.warning("max builder connections reached")
// TODO: consider connection rotation strategy
return false
if let conn = connections.byAddr.values.filter({ $0.role == .builder })
.sorted(by: { $0.getLastActive() < $1.getLastActive() }).first
{
self.logger.warning("Replacing least active builder connection at \(conn.remoteAddress)")
conn.close(abort: false)
} else {
self.logger.warning("Max builder connections reached, no eligible replacement found")
return false
}
}
}
if connections.byAddr[addr] != nil {
Expand Down Expand Up @@ -584,6 +590,10 @@ private struct PeerEventHandler<Handler: StreamHandler>: QuicEventHandler {
}
if let stream {
stream.received(data: data)
let connection = impl.connections.read { connections in
connections.byId[stream.connectionId]
}
connection?.updateLastActive()
}
}

Expand Down
52 changes: 52 additions & 0 deletions Networking/Tests/NetworkingTests/PeerTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,58 @@ struct PeerTests {
typealias EphemeralHandler = MockEphemeralStreamHandler
}

@Test
func connectionRotationStrategy() async throws {
var peers: [Peer<MockStreamHandler>] = []
var handlers: [MockPresentStreamHandler] = []
let centerPeer = try Peer(
options: PeerOptions<MockStreamHandler>(
role: .validator,
listenAddress: NetAddr(ipAddress: "127.0.0.1", port: 0)!,
genesisHeader: Data32(),
secretKey: Ed25519.SecretKey(from: Data32.random()),
presistentStreamHandler: MockPresentStreamHandler(),
ephemeralStreamHandler: MockEphemeralStreamHandler(),
serverSettings: .defaultSettings,
clientSettings: .defaultSettings
)
)
// Create 30 peer nodes
for _ in 0 ..< 30 {
let handler = MockPresentStreamHandler()
handlers.append(handler)
let peer = try Peer(
options: PeerOptions<MockStreamHandler>(
role: .builder,
listenAddress: NetAddr(ipAddress: "127.0.0.1", port: 0)!,
genesisHeader: Data32(),
secretKey: Ed25519.SecretKey(from: Data32.random()),
presistentStreamHandler: handler,
ephemeralStreamHandler: MockEphemeralStreamHandler(),
serverSettings: .defaultSettings,
clientSettings: .defaultSettings
)
)
peers.append(peer)
}

// Make some connections
for i in 0 ..< 30 {
let peer = peers[i]
let con = try peer.connect(to: centerPeer.listenAddress(), role: .builder)
try await con.ready()
}
// Simulate close connections 3~5s
try? await Task.sleep(for: .milliseconds(5000))
centerPeer.broadcast(kind: .uniqueA, message: .init(kind: .uniqueA, data: Data("connection rotation strategy".utf8)))
try? await Task.sleep(for: .milliseconds(1000))
var receivedCount = 0
for handler in handlers {
receivedCount += await handler.receivedData.count
}
#expect(receivedCount == PeerSettings.defaultSettings.maxBuilderConnections)
}

@Test
func mockHandshakeFailure() async throws {
let mockPeerTest = try MockPeerEventTests()
Expand Down