Skip to content
Closed
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
096fc62
implement onNeighborListChanged event
ptesavol Apr 21, 2025
56de86b
eslint
ptesavol Apr 22, 2025
e4bded7
refactor to a more straightforward implementation
ptesavol Apr 22, 2025
f848537
add user-level api for the neighborListUpdated event
ptesavol Apr 22, 2025
e24664f
release: v103.0.0-rc.4
ptesavol Apr 22, 2025
361dc46
release: v103.0.0-rc.4
ptesavol Apr 22, 2025
38017ec
release: v103.0.0-rc.5
ptesavol Apr 22, 2025
03c5272
disable BrowserWebRTCClient additional buffering and disable removin…
ptesavol Apr 22, 2025
5f1b4bc
release: v103.0.0-rc.6
ptesavol Apr 22, 2025
b12be76
release: v103.0.0-rc.7
ptesavol Apr 22, 2025
c65488a
release: v103.0.0-rc.8
ptesavol Apr 22, 2025
1552d48
buffredAmout change event should now propagate to the user level
ptesavol Apr 23, 2025
00219e5
release: v103.0.0-rc.9
ptesavol Apr 23, 2025
022e300
emit statistics object instead of the mere bufferedAmount
ptesavol Apr 23, 2025
eda0e08
add calculating uploadrate from peerConnection getStats()
ptesavol Apr 23, 2025
96768cd
release: v103.0.0-rc.10
ptesavol Apr 23, 2025
1aa6409
eslint
ptesavol Apr 25, 2025
c24d69f
re-enable removing of neighbors who take too long to respond
ptesavol Apr 30, 2025
011296b
add any casting instead of useless parameters
juslesan Apr 30, 2025
769ba54
review changes
ptesavol May 2, 2025
ff16a4d
Merge branch 'main' into NET-1450-report-buffereddAmount-of-webrtc-da…
juslesan May 2, 2025
5ce3f24
add download rate metering
ptesavol May 13, 2025
293e3fe
merged with main
ptesavol May 13, 2025
19d1bc1
fix potential memory leak when removing event listener
ptesavol May 13, 2025
2d52b1e
Merge branch 'main' into NET-1450-report-buffereddAmount-of-webrtc-da…
juslesan May 16, 2025
636d79c
release: v103.0.0-rc.17
juslesan May 16, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions packages/dht/src/connection/ConnectionManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import { DhtAddress, areEqualPeerDescriptors, toNodeId } from '../identifiers'
import { getOfferer } from '../helpers/offering'
import { ConnectionsView } from './ConnectionsView'
import { OutputBuffer } from './OutputBuffer'
import { IConnection } from './IConnection'
import { ConnectionStatistics, IConnection } from './IConnection'
import { PendingConnection } from './PendingConnection'

export interface ConnectionManagerOptions {
Expand Down Expand Up @@ -399,7 +399,7 @@ export class ConnectionManager extends EventEmitter<TransportEvents> implements
const managedConnection = new ManagedConnection(peerDescriptor, connection)
managedConnection.on('managedData', this.onData)
managedConnection.once('disconnected', (gracefulLeave: boolean) => this.onDisconnected(peerDescriptor, gracefulLeave))

managedConnection.on('statisticsUpdated', (statistics: ConnectionStatistics) => this.emit('statisticsUpdated', peerDescriptor, statistics))
const nodeId = toNodeId(peerDescriptor)
const endpoint = this.endpoints.get(nodeId)! as ConnectingEndpoint
const outputBuffer = endpoint.buffer
Expand Down
14 changes: 12 additions & 2 deletions packages/dht/src/connection/IConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ export interface ConnectionEvents {
connected: () => void
disconnected: (gracefulLeave: boolean, code?: number, reason?: string) => void
error: (name: string) => void
statisticsUpdated: (statistics: ConnectionStatistics) => void
}

export enum ConnectionType {
Expand All @@ -17,23 +18,32 @@ export enum ConnectionType {

export type ConnectionID = BrandedString<'ConnectionID'>

export interface ConnectionStatistics {
uploadRateBytesPerSecond: number
downloadRateBytesPerSecond: number
bufferedAmount: number
}

export interface IConnection {

on(event: 'data', listener: (bytes: Uint8Array) => void): this
on(event: 'error', listener: (name: string) => void): this
on(event: 'connected', listener: () => void): this
on(event: 'disconnected', listener: (gracefulLeave: boolean, code?: number, reason?: string) => void): this

on(event: 'statisticsUpdated', listener: (statistics: ConnectionStatistics) => void): this

once(event: 'data', listener: (bytes: Uint8Array) => void): this
once(event: 'error', listener: (name: string) => void): this
once(event: 'connected', listener: () => void): this
once(event: 'disconnected', listener: (gracefulLeave: boolean, code?: number, reason?: string) => void): this
once(event: 'statisticsUpdated', listener: (statistics: ConnectionStatistics) => void): this

off(event: 'data', listener: (bytes: Uint8Array) => void): void
off(event: 'error', listener: (name: string) => void): void
off(event: 'connected', listener: () => void): void
off(event: 'disconnected', listener: (gracefulLeave: boolean, code?: number, reason?: string) => void): void

off(event: 'statisticsUpdated', listener: (statistics: ConnectionStatistics) => void): void

send(data: Uint8Array): void
close(gracefulLeave: boolean): Promise<void>
destroy(): void
Expand Down
5 changes: 3 additions & 2 deletions packages/dht/src/connection/ManagedConnection.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { ConnectionID, IConnection } from './IConnection'
import { ConnectionID, ConnectionStatistics, IConnection } from './IConnection'
import * as Err from '../helpers/errors'
import { PeerDescriptor } from '../../generated/packages/dht/protos/DhtRpc'
import { Logger } from '@streamr/utils'
Expand All @@ -10,6 +10,7 @@ import { createRandomConnectionId } from './Connection'
export interface ManagedConnectionEvents {
managedData: (bytes: Uint8Array, remotePeerDescriptor: PeerDescriptor) => void
disconnected: (gracefulLeave: boolean) => void
statisticsUpdated: (statistics: ConnectionStatistics) => void
}

const logger = new Logger(module)
Expand Down Expand Up @@ -44,7 +45,7 @@ export class ManagedConnection extends EventEmitter<ManagedConnectionEvents> {
this.emit('managedData', bytes, this.getPeerDescriptor()!)
})
connection.on('disconnected', (gracefulLeave) => this.onDisconnected(gracefulLeave))

connection.on('statisticsUpdated', (statistics: ConnectionStatistics) => this.emit('statisticsUpdated', statistics))
this.lastUsedTimestamp = Date.now()
this.remotePeerDescriptor = peerDescriptor
}
Expand Down
114 changes: 99 additions & 15 deletions packages/dht/src/connection/webrtc/BrowserWebrtcConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import EventEmitter from 'eventemitter3'
import { WebrtcConnectionEvents, IWebrtcConnection, RtcDescription } from './IWebrtcConnection'
import { IConnection, ConnectionID, ConnectionEvents, ConnectionType } from '../IConnection'
import { IConnection, ConnectionID, ConnectionEvents, ConnectionType, ConnectionStatistics } from '../IConnection'
import { Logger } from '@streamr/utils'
import { EARLY_TIMEOUT, IceServer } from './WebrtcConnector'
import { createRandomConnectionId } from '../Connection'
Expand Down Expand Up @@ -30,14 +30,23 @@ export class NodeWebrtcConnection extends EventEmitter<Events> implements IWebrt
private lastState: RTCPeerConnectionState = 'connecting'
private readonly iceServers: IceServer[]
private peerConnection?: RTCPeerConnection
private readonly bufferThresholdHigh = 2 ** 17
private readonly bufferThresholdLow = 2 ** 15
private dataChannel?: RTCDataChannel
private makingOffer = false
private isOffering = false
private closed = false
private earlyTimeout: NodeJS.Timeout
private readonly messageQueue: Uint8Array[] = []
private currentBufferedAmount = 0
private statistics: ConnectionStatistics = {
uploadRateBytesPerSecond: 0,
downloadRateBytesPerSecond: 0,
bufferedAmount: 0
}
private lastBytesSent = 0
private lastBytesReceived = 0
private lastStatsCollectionTime = 0
private statsInterval?: NodeJS.Timeout
private readonly statsUpdateInterval = 200 // ms, adjust as needed for X times per second

constructor(params: Params) {
super()
Expand Down Expand Up @@ -144,6 +153,12 @@ export class NodeWebrtcConnection extends EventEmitter<Events> implements IWebrt
this.closed = true
this.lastState = 'closed'
clearTimeout(this.earlyTimeout)

// Clear the stats collection interval
if (this.statsInterval) {
clearInterval(this.statsInterval)
this.statsInterval = undefined
}

this.stopListening()
this.emit('disconnected', gracefulLeave, undefined, reason)
Expand Down Expand Up @@ -179,10 +194,16 @@ export class NodeWebrtcConnection extends EventEmitter<Events> implements IWebrt

public send(data: Uint8Array): void {
if (this.lastState === 'connected') {
if (this.dataChannel!.bufferedAmount > this.bufferThresholdHigh) {
this.messageQueue.push(data)
} else {
this.dataChannel?.send(data as Buffer)

this.dataChannel?.send(data as Buffer)

let bufferedAmountRemainder = this.dataChannel!.bufferedAmount - data.length
bufferedAmountRemainder = Math.max(0, bufferedAmountRemainder)
if (bufferedAmountRemainder !== this.currentBufferedAmount) {
this.currentBufferedAmount = bufferedAmountRemainder
// Update bufferedAmount in statistics but keep the upload rate the same
this.statistics.bufferedAmount = this.currentBufferedAmount
this.emit('statisticsUpdated', this.statistics)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this event end up being called quite often? Could there be some performance related if the bufferedAmount changes on each send

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The buffered amount should stay at 0 in normal operation, and be non-zero only if you are sending too fast. Maybe we should test this out with streamrtv by making a rc release of this branch?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, I suppose it would make sense to do an internal release of these changes

}
} else {
logger.warn('Tried to send on a connection with last state ' + this.lastState)
Expand All @@ -192,7 +213,6 @@ export class NodeWebrtcConnection extends EventEmitter<Events> implements IWebrt
private setupDataChannel(dataChannel: RTCDataChannel): void {
this.dataChannel = dataChannel
this.dataChannel.binaryType = 'arraybuffer'
this.dataChannel.bufferedAmountLowThreshold = this.bufferThresholdLow
dataChannel.onopen = () => {
logger.trace('dc.onOpen')
this.onDataChannelOpen()
Expand All @@ -211,13 +231,6 @@ export class NodeWebrtcConnection extends EventEmitter<Events> implements IWebrt
logger.trace('dc.onmessage')
this.emit('data', new Uint8Array(msg.data))
}
dataChannel.onbufferedamountlow = () => {
logger.trace('dc.onBufferedAmountLow')
while (this.messageQueue.length > 0 && this.dataChannel!.bufferedAmount < this.bufferThresholdHigh) {
const data = this.messageQueue.shift()!
this.dataChannel!.send(data as Buffer)
}
}
}

private stopListening() {
Expand All @@ -241,13 +254,84 @@ export class NodeWebrtcConnection extends EventEmitter<Events> implements IWebrt
private onDataChannelOpen(): void {
this.lastState = 'connected'
this.emit('connected')
this.startStatsCollection()
}

private startStatsCollection(): void {
// Clear any existing interval
if (this.statsInterval) {
clearInterval(this.statsInterval)
}

// Initialize the last collection time
this.lastStatsCollectionTime = Date.now()

// Start collecting stats periodically
this.statsInterval = setInterval(() => {
this.collectAndEmitStats()
}, this.statsUpdateInterval)
}

private async collectAndEmitStats(): Promise<void> {
if (!this.peerConnection || this.lastState !== 'connected') {
return
}

try {
const currentTime = Date.now()
const elapsedTimeSeconds = (currentTime - this.lastStatsCollectionTime) / 1000

// Avoid division by zero or very small time intervals
if (elapsedTimeSeconds < 0.1) {
return
}

const stats = await this.peerConnection.getStats()
let currentBytesSent = 0
let currentBytesReceived = 0
stats.forEach((report) => {
if (report.type === 'transport') {
currentBytesSent = report.bytesSent ?? 0
currentBytesReceived = report.bytesReceived ?? 0
}
})

// Calculate upload and download rates based on actual elapsed time
const bytesSentDelta = Math.max(0, currentBytesSent - this.lastBytesSent)
const uploadRateBytesPerSecond = Math.round(bytesSentDelta / elapsedTimeSeconds)

const bytesReceivedDelta = Math.max(0, currentBytesReceived - this.lastBytesReceived)
const downloadRateBytesPerSecond = Math.round(bytesReceivedDelta / elapsedTimeSeconds)

this.lastBytesSent = currentBytesSent
this.lastBytesReceived = currentBytesReceived

this.lastStatsCollectionTime = currentTime

// Update statistics and emit event
this.statistics = {
uploadRateBytesPerSecond,
downloadRateBytesPerSecond,
bufferedAmount: this.currentBufferedAmount
}

this.emit('statisticsUpdated', this.statistics)

} catch (err) {
logger.warn('Failed to collect WebRTC stats', { err })
}
}

private onStateChange(): void {
if (this.peerConnection!.connectionState === DisconnectedRtcPeerConnectionStateEnum.CLOSED
|| this.peerConnection!.connectionState === DisconnectedRtcPeerConnectionStateEnum.DISCONNECTED
|| this.peerConnection!.connectionState === DisconnectedRtcPeerConnectionStateEnum.FAILED
) {
// Clear stats interval when connection is closed
if (this.statsInterval) {
clearInterval(this.statsInterval)
this.statsInterval = undefined
}
this.doClose(false)
}
}
Expand Down
1 change: 1 addition & 0 deletions packages/dht/src/exports.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ export { ManagedConnection } from './connection/ManagedConnection'
export { PendingConnection } from './connection/PendingConnection'
export type { IConnection } from './connection/IConnection'
export { ConnectionType } from './connection/IConnection'
export type { ConnectionStatistics } from './connection/IConnection'
export type { ServiceID } from './types/ServiceID'
export type { RingContacts } from './dht/contact/RingContactList'
export { createOutgoingHandshaker } from './connection/Handshaker'
Expand Down
5 changes: 5 additions & 0 deletions packages/dht/src/transport/ITransport.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import { Message, PeerDescriptor } from '../../generated/packages/dht/protos/DhtRpc'
import { ConnectionStatistics } from '../connection/IConnection'

export interface TransportEvents {
disconnected: (peerDescriptor: PeerDescriptor, gracefulLeave: boolean) => void
message: (message: Message) => void
connected: (peerDescriptor: PeerDescriptor) => void
statisticsUpdated: (peerDescriptor: PeerDescriptor, statistics: ConnectionStatistics) => void
}

export interface SendOptions {
Expand All @@ -22,15 +24,18 @@ export interface ITransport {
on<T extends keyof TransportEvents>(eventName: T, listener: (message: Message) => void): void
on<T extends keyof TransportEvents>(eventName: T, listener: (peerDescriptor: PeerDescriptor) => void): void
on<T extends keyof TransportEvents>(eventName: T, listener: (peerDescriptor: PeerDescriptor, gracefulLeave: boolean) => void): void
on<T extends keyof TransportEvents>(eventName: T, listener: (peerDescriptor: PeerDescriptor, statistics: ConnectionStatistics) => void): void

once<T extends keyof TransportEvents>(eventName: T, listener: (message: Message) => void): void
once<T extends keyof TransportEvents>(eventName: T, listener: (peerDescriptor: PeerDescriptor) => void): void
once<T extends keyof TransportEvents>(eventName: T, listener: (peerDescriptor: PeerDescriptor,
gracefulLeave: boolean) => void): void
once<T extends keyof TransportEvents>(eventName: T, listener: (peerDescriptor: PeerDescriptor, statistics: ConnectionStatistics) => void): void

off<T extends keyof TransportEvents>(eventName: T, listener: (message: Message) => void): void
off<T extends keyof TransportEvents>(eventName: T, listener: (peerDescriptor: PeerDescriptor) => void): void
off<T extends keyof TransportEvents>(eventName: T, listener: (peerDescriptor: PeerDescriptor, gracefulLeave: boolean) => void): void
off<T extends keyof TransportEvents>(eventName: T, listener: (peerDescriptor: PeerDescriptor, statistics: ConnectionStatistics) => void): void

send(msg: Message, opts?: SendOptions): Promise<void>
getLocalPeerDescriptor(): PeerDescriptor
Expand Down
5 changes: 5 additions & 0 deletions packages/trackerless-network/src/ContentDeliveryManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import {
DEFAULT_MIN_PROPAGATION_TARGETS,
DEFAULT_PROPAGATION_BUFFER_TTL
} from './content-delivery-layer/propagation/Propagation'
import { ContentDeliveryRpcRemote } from './content-delivery-layer/ContentDeliveryRpcRemote'

export type StreamPartDelivery = {
broadcast: (msg: StreamMessage) => void
Expand All @@ -54,6 +55,7 @@ export type StreamPartDelivery = {

export interface Events {
newMessage: (msg: StreamMessage) => void
neighborListUpdated: (streamPartId: StreamPartID, neighbors: ContentDeliveryRpcRemote[]) => void
}

const logger = new Logger(module)
Expand Down Expand Up @@ -194,6 +196,9 @@ export class ContentDeliveryManager extends EventEmitter<Events> {
node.on('message', (message: StreamMessage) => {
this.emit('newMessage', message)
})
node.on('neighborListUpdated', (neighbors: ContentDeliveryRpcRemote[]) => {
this.emit('neighborListUpdated', streamPartId, neighbors)
})
const handleEntryPointLeave = async () => {
if (this.destroyed || peerDescriptorStoreManager.isLocalNodeStored() || this.knownStreamPartEntryPoints.has(streamPartId)) {
return
Expand Down
9 changes: 9 additions & 0 deletions packages/trackerless-network/src/NetworkNode.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { ExternalNetworkRpc, ExternalRpcClient, ExternalRpcClientClass } from '.
import { NetworkOptions, NetworkStack } from './NetworkStack'
import { ProxyDirection, StreamMessage } from '../generated/packages/trackerless-network/protos/NetworkRpc'
import { NodeInfo } from './types'
import { ContentDeliveryRpcRemote } from './content-delivery-layer/ContentDeliveryRpcRemote'
import { StreamPartDeliveryOptions } from './ContentDeliveryManager'

export const createNetworkNode = (opts: NetworkOptions): NetworkNode => {
Expand Down Expand Up @@ -66,6 +67,10 @@ export class NetworkNode {
this.stack.getContentDeliveryManager().on('newMessage', listener)
}

addNeighborListUpdatedListener(listener: (streamPartId: StreamPartID, neighbors: ContentDeliveryRpcRemote[]) => void): void {
this.stack.getContentDeliveryManager().on('neighborListUpdated', listener)
}

setStreamPartEntryPoints(streamPartId: StreamPartID, contactPeerDescriptors: PeerDescriptor[]): void {
this.stack.getContentDeliveryManager().setStreamPartEntryPoints(streamPartId, contactPeerDescriptors)
}
Expand All @@ -74,6 +79,10 @@ export class NetworkNode {
this.stack.getContentDeliveryManager().off('newMessage', listener)
}

removeNeighborListUpdatedListener(listener: (streamPartId: StreamPartID, neighbors: ContentDeliveryRpcRemote[]) => void): void {
this.stack.getContentDeliveryManager().off('neighborListUpdated', listener)
}

async leave(streamPartId: StreamPartID): Promise<void> {
if (this.stopped) {
return
Expand Down
Loading