Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions Networking/Sources/MsQuicSwift/QuicStream.swift
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ private struct Storage {
let connection: QuicConnection
}

public enum SendError: Error {
case emptyData
}

public final class QuicStream: Sendable {
public let id: UniqueId
private let logger: Logger
Expand Down Expand Up @@ -97,18 +101,21 @@ public final class QuicStream: Sendable {
throw QuicError.alreadyClosed
}

// TODO: improve the case when data is empty
let messageLength = data.count

if messageLength == 0 {
logger.trace("No data to send.")
throw SendError.emptyData // Throw a specific error or return
}

let sendBufferRaw = UnsafeMutableRawPointer.allocate( // !! allocate
byteCount: MemoryLayout<QUIC_BUFFER>.size + messageLength,
alignment: MemoryLayout<QUIC_BUFFER>.alignment
)

let sendBuffer = sendBufferRaw.assumingMemoryBound(to: QUIC_BUFFER.self)
let bufferPointer = sendBufferRaw.advanced(by: MemoryLayout<QUIC_BUFFER>.size).assumingMemoryBound(to: UInt8.self)
data.copyBytes(to: bufferPointer, count: messageLength) // TODO: figure out a better way to avoid memory copy here

data.withUnsafeBytes { bufferPointer.update(from: $0.baseAddress!.assumingMemoryBound(to: UInt8.self), count: messageLength) }
sendBuffer.pointee.Buffer = bufferPointer
sendBuffer.pointee.Length = UInt32(messageLength)

Expand Down
12 changes: 6 additions & 6 deletions Networking/Sources/Networking/Connection.swift
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import AsyncChannels
import Foundation
import MsQuicSwift
import Synchronization
import TracingUtils
import Utils

Expand Down Expand Up @@ -36,8 +37,7 @@ public final class Connection<Handler: StreamHandler>: Sendable, ConnectionInfoP

public let role: PeerRole
public let remoteAddress: NetAddr
private let lastActive: ThreadSafeContainer<TimeInterval> = .init(0)

private let lastActive: Atomic<TimeInterval> = Atomic(0)
let presistentStreams: ThreadSafeContainer<
[Handler.PresistentHandler.StreamKind: Stream<Handler>]
> = .init([:])
Expand All @@ -59,8 +59,8 @@ public final class Connection<Handler: StreamHandler>: Sendable, ConnectionInfoP
}
}

public var lastActiveTimeStamp: TimeInterval {
lastActive.read { $0 }
func getLastActive() -> TimeInterval {
lastActive.load(ordering: .sequentiallyConsistent)
}

public var id: UniqueId {
Expand All @@ -73,11 +73,11 @@ public final class Connection<Handler: StreamHandler>: Sendable, ConnectionInfoP
self.role = role
self.remoteAddress = remoteAddress
self.initiatedByLocal = initiatedByLocal
lastActive.write { $0 = Date().timeIntervalSince1970 }
updateLastActive()
}

func updateLastActive() {
lastActive.write { $0 = Date().timeIntervalSince1970 }
lastActive.store(Date().timeIntervalSince1970, ordering: .releasing)
}

func opened(publicKey: Data) throws {
Expand Down
4 changes: 2 additions & 2 deletions Networking/Sources/Networking/Peer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public struct PeerOptions<Handler: StreamHandler>: Sendable {
}
}

// TODO: reopen UP stream, peer reputation system to ban peers not following the protocol
// TODO: peer reputation system to ban peers not following the protocol
public final class Peer<Handler: StreamHandler>: Sendable {
private let impl: PeerImpl<Handler>

Expand Down Expand Up @@ -272,7 +272,7 @@ final class PeerImpl<Handler: StreamHandler>: Sendable {
let currentCount = connections.byAddr.values.filter { $0.role == role }.count
if currentCount >= self.settings.maxBuilderConnections {
if let conn = connections.byAddr.values.filter({ $0.role == .builder })
.sorted(by: { $0.lastActiveTimeStamp < $1.lastActiveTimeStamp }).first
.sorted(by: { $0.getLastActive() < $1.getLastActive() }).first
{
self.logger.warning("Replacing least active builder connection at \(conn.remoteAddress)")
conn.close(abort: false)
Expand Down