diff --git a/package-lock.json b/package-lock.json index 985212baa8..c2a4763823 100644 --- a/package-lock.json +++ b/package-lock.json @@ -27683,11 +27683,11 @@ }, "packages/autocertifier-client": { "name": "@streamr/autocertifier-client", - "version": "103.0.0-rc.16", + "version": "103.0.0-rc.17", "license": "STREAMR NETWORK OPEN SOURCE LICENSE", "dependencies": { "@protobuf-ts/runtime-rpc": "^2.8.2", - "@streamr/utils": "103.0.0-rc.16", + "@streamr/utils": "103.0.0-rc.17", "eventemitter3": "^5.0.0", "node-forge": "^1.3.1" }, @@ -27697,14 +27697,14 @@ }, "packages/autocertifier-server": { "name": "@streamr/autocertifier-server", - "version": "103.0.0-rc.16", + "version": "103.0.0-rc.17", "license": "STREAMR NETWORK OPEN SOURCE LICENSE", "dependencies": { "@aws-sdk/client-route-53": "^3.806.0", - "@streamr/autocertifier-client": "103.0.0-rc.16", - "@streamr/dht": "103.0.0-rc.16", - "@streamr/proto-rpc": "103.0.0-rc.16", - "@streamr/utils": "103.0.0-rc.16", + "@streamr/autocertifier-client": "103.0.0-rc.17", + "@streamr/dht": "103.0.0-rc.17", + "@streamr/proto-rpc": "103.0.0-rc.17", + "@streamr/utils": "103.0.0-rc.17", "acme-client": "^5.4.0", "body-parser": "^2.2.0", "dns2": "^2.1.0", @@ -27750,10 +27750,10 @@ }, "packages/cdn-location": { "name": "@streamr/cdn-location", - "version": "103.0.0-rc.16", + "version": "103.0.0-rc.17", "license": "Apache-2.0", "dependencies": { - "@streamr/utils": "103.0.0-rc.16", + "@streamr/utils": "103.0.0-rc.17", "haversine": "^1.1.1" }, "devDependencies": { @@ -27762,15 +27762,15 @@ }, "packages/cli-tools": { "name": "@streamr/cli-tools", - "version": "103.0.0-rc.16", + "version": "103.0.0-rc.17", "license": "AGPL-3.0", "dependencies": { "@streamr/config": "^5.5.6", - "@streamr/dht": "103.0.0-rc.16", + "@streamr/dht": "103.0.0-rc.17", "@streamr/network-contracts": "^9.1.0", - "@streamr/sdk": "103.0.0-rc.16", - "@streamr/trackerless-network": "103.0.0-rc.16", - "@streamr/utils": "103.0.0-rc.16", + "@streamr/sdk": "103.0.0-rc.17", + "@streamr/trackerless-network": "103.0.0-rc.17", + "@streamr/utils": "103.0.0-rc.17", "commander": "^13.1.0", "easy-table": "^1.1.1", "ethers": "^6.13.0", @@ -27782,7 +27782,7 @@ "streamr": "dist/bin/streamr.js" }, "devDependencies": { - "@streamr/test-utils": "103.0.0-rc.16", + "@streamr/test-utils": "103.0.0-rc.17", "@types/event-stream": "^4.0.5", "@types/lodash": "^4.17.16", "@types/merge2": "^1.4.4", @@ -27792,17 +27792,17 @@ }, "packages/dht": { "name": "@streamr/dht", - "version": "103.0.0-rc.16", + "version": "103.0.0-rc.17", "license": "STREAMR NETWORK OPEN SOURCE LICENSE", "dependencies": { "@js-sdsl/ordered-map": "^4.4.2", "@protobuf-ts/runtime": "^2.8.2", "@protobuf-ts/runtime-rpc": "^2.8.2", - "@streamr/autocertifier-client": "103.0.0-rc.16", - "@streamr/cdn-location": "103.0.0-rc.16", - "@streamr/geoip-location": "103.0.0-rc.16", - "@streamr/proto-rpc": "103.0.0-rc.16", - "@streamr/utils": "103.0.0-rc.16", + "@streamr/autocertifier-client": "103.0.0-rc.17", + "@streamr/cdn-location": "103.0.0-rc.17", + "@streamr/geoip-location": "103.0.0-rc.17", + "@streamr/proto-rpc": "103.0.0-rc.17", + "@streamr/utils": "103.0.0-rc.17", "eventemitter3": "^5.0.0", "heap": "^0.2.6", "ipaddr.js": "^2.0.1", @@ -27816,7 +27816,7 @@ }, "devDependencies": { "@streamr/browser-test-runner": "^0.0.1", - "@streamr/test-utils": "103.0.0-rc.16", + "@streamr/test-utils": "103.0.0-rc.17", "@types/heap": "^0.2.34", "@types/k-bucket": "^5.0.1", "@types/lodash": "^4.17.16", @@ -27843,10 +27843,10 @@ }, "packages/geoip-location": { "name": "@streamr/geoip-location", - "version": "103.0.0-rc.16", + "version": "103.0.0-rc.17", "license": "Apache-2.0", "dependencies": { - "@streamr/utils": "103.0.0-rc.16", + "@streamr/utils": "103.0.0-rc.17", "eventemitter3": "^5.0.0", "long-timeout": "^0.1.1", "mmdb-lib": "^3.0.0", @@ -27968,14 +27968,14 @@ }, "packages/node": { "name": "@streamr/node", - "version": "103.0.0-rc.16", + "version": "103.0.0-rc.17", "license": "STREAMR NETWORK OPEN SOURCE LICENSE", "dependencies": { "@inquirer/prompts": "^7.5.1", "@streamr/config": "^5.5.6", - "@streamr/dht": "103.0.0-rc.16", - "@streamr/sdk": "103.0.0-rc.16", - "@streamr/utils": "103.0.0-rc.16", + "@streamr/dht": "103.0.0-rc.17", + "@streamr/sdk": "103.0.0-rc.17", + "@streamr/utils": "103.0.0-rc.17", "aedes": "^0.51.3", "ajv": "^8.17.1", "ajv-formats": "^3.0.1", @@ -28005,7 +28005,7 @@ "devDependencies": { "@inquirer/testing": "^2.1.46", "@streamr/network-contracts": "^9.1.0", - "@streamr/test-utils": "103.0.0-rc.16", + "@streamr/test-utils": "103.0.0-rc.17", "@types/cors": "^2.8.18", "@types/express": "^5.0.1", "@types/heap": "^0.2.34", @@ -28037,19 +28037,19 @@ }, "packages/proto-rpc": { "name": "@streamr/proto-rpc", - "version": "103.0.0-rc.16", + "version": "103.0.0-rc.17", "license": "(Apache-2.0 AND BSD-3-Clause)", "dependencies": { "@protobuf-ts/runtime": "^2.8.2", "@protobuf-ts/runtime-rpc": "^2.8.2", - "@streamr/utils": "103.0.0-rc.16", + "@streamr/utils": "103.0.0-rc.17", "eventemitter3": "^5.0.0", "lodash": "^4.17.21", "uuid": "^11.1.0" }, "devDependencies": { "@streamr/browser-test-runner": "^0.0.1", - "@streamr/test-utils": "103.0.0-rc.16", + "@streamr/test-utils": "103.0.0-rc.17", "@types/lodash": "^4.17.16" }, "optionalDependencies": { @@ -28059,7 +28059,7 @@ }, "packages/sdk": { "name": "@streamr/sdk", - "version": "103.0.0-rc.16", + "version": "103.0.0-rc.17", "license": "Apache-2.0", "dependencies": { "@babel/runtime": "^7.27.1", @@ -28068,11 +28068,11 @@ "@protobuf-ts/runtime": "^2.8.2", "@protobuf-ts/runtime-rpc": "^2.8.2", "@streamr/config": "^5.5.6", - "@streamr/dht": "103.0.0-rc.16", + "@streamr/dht": "103.0.0-rc.17", "@streamr/network-contracts": "^9.1.0", - "@streamr/proto-rpc": "103.0.0-rc.16", - "@streamr/trackerless-network": "103.0.0-rc.16", - "@streamr/utils": "103.0.0-rc.16", + "@streamr/proto-rpc": "103.0.0-rc.17", + "@streamr/trackerless-network": "103.0.0-rc.17", + "@streamr/utils": "103.0.0-rc.17", "core-js": "^3.42.0", "env-paths": "^2.2.1", "ethers": "^6.13.0", @@ -28100,7 +28100,7 @@ "@babel/preset-env": "^7.27.2", "@babel/preset-typescript": "^7.27.1", "@jest/globals": "^29.7.0", - "@streamr/test-utils": "103.0.0-rc.16", + "@streamr/test-utils": "103.0.0-rc.17", "@types/heap": "^0.2.34", "@types/lodash": "^4.17.16", "ajv": "^8.17.1", @@ -28152,12 +28152,12 @@ }, "packages/test-utils": { "name": "@streamr/test-utils", - "version": "103.0.0-rc.16", + "version": "103.0.0-rc.17", "license": "Apache-2.0", "dependencies": { "@streamr/config": "^5.5.6", "@streamr/network-contracts": "^9.1.0", - "@streamr/utils": "103.0.0-rc.16", + "@streamr/utils": "103.0.0-rc.17", "cors": "^2.8.5", "ethers": "^6.13.0", "express": "^5.1.0", @@ -28172,14 +28172,14 @@ }, "packages/trackerless-network": { "name": "@streamr/trackerless-network", - "version": "103.0.0-rc.16", + "version": "103.0.0-rc.17", "license": "STREAMR NETWORK OPEN SOURCE LICENSE", "dependencies": { "@protobuf-ts/runtime": "^2.8.2", "@protobuf-ts/runtime-rpc": "^2.8.2", - "@streamr/dht": "103.0.0-rc.16", - "@streamr/proto-rpc": "103.0.0-rc.16", - "@streamr/utils": "103.0.0-rc.16", + "@streamr/dht": "103.0.0-rc.17", + "@streamr/proto-rpc": "103.0.0-rc.17", + "@streamr/utils": "103.0.0-rc.17", "eventemitter3": "^5.0.0", "lodash": "^4.17.21", "ts-essentials": "^10.0.4", @@ -28188,7 +28188,7 @@ }, "devDependencies": { "@streamr/browser-test-runner": "^0.0.1", - "@streamr/test-utils": "103.0.0-rc.16", + "@streamr/test-utils": "103.0.0-rc.17", "@types/lodash": "^4.17.16", "@types/yallist": "^4.0.1", "expect": "^29.6.2", @@ -28206,7 +28206,7 @@ }, "packages/utils": { "name": "@streamr/utils", - "version": "103.0.0-rc.16", + "version": "103.0.0-rc.17", "license": "Apache-2.0", "dependencies": { "@noble/curves": "^1.9.0", diff --git a/packages/autocertifier-client/package.json b/packages/autocertifier-client/package.json index 083f26e598..a59799db50 100644 --- a/packages/autocertifier-client/package.json +++ b/packages/autocertifier-client/package.json @@ -1,6 +1,6 @@ { "name": "@streamr/autocertifier-client", - "version": "103.0.0-rc.16", + "version": "103.0.0-rc.17", "description": "Autocertifier Client for Streamr Network", "repository": { "type": "git", @@ -26,7 +26,7 @@ }, "dependencies": { "@protobuf-ts/runtime-rpc": "^2.8.2", - "@streamr/utils": "103.0.0-rc.16", + "@streamr/utils": "103.0.0-rc.17", "eventemitter3": "^5.0.0", "node-forge": "^1.3.1" }, diff --git a/packages/autocertifier-server/package.json b/packages/autocertifier-server/package.json index 63e778abdb..37c4afb608 100644 --- a/packages/autocertifier-server/package.json +++ b/packages/autocertifier-server/package.json @@ -1,6 +1,6 @@ { "name": "@streamr/autocertifier-server", - "version": "103.0.0-rc.16", + "version": "103.0.0-rc.17", "description": "Server for providing TLS Certificates", "repository": { "type": "git", @@ -35,10 +35,10 @@ }, "dependencies": { "@aws-sdk/client-route-53": "^3.806.0", - "@streamr/autocertifier-client": "103.0.0-rc.16", - "@streamr/dht": "103.0.0-rc.16", - "@streamr/proto-rpc": "103.0.0-rc.16", - "@streamr/utils": "103.0.0-rc.16", + "@streamr/autocertifier-client": "103.0.0-rc.17", + "@streamr/dht": "103.0.0-rc.17", + "@streamr/proto-rpc": "103.0.0-rc.17", + "@streamr/utils": "103.0.0-rc.17", "acme-client": "^5.4.0", "body-parser": "^2.2.0", "dns2": "^2.1.0", diff --git a/packages/cdn-location/package.json b/packages/cdn-location/package.json index d5d97b4a62..1f8adcbc1a 100644 --- a/packages/cdn-location/package.json +++ b/packages/cdn-location/package.json @@ -1,6 +1,6 @@ { "name": "@streamr/cdn-location", - "version": "103.0.0-rc.16", + "version": "103.0.0-rc.17", "description": "Library for getting own approximate location by querying CDN servers", "repository": { "type": "git", @@ -28,7 +28,7 @@ "generate-data-from-tsp-solution": "./data-generation/generateDataFromTSPSolverResult.sh" }, "dependencies": { - "@streamr/utils": "103.0.0-rc.16", + "@streamr/utils": "103.0.0-rc.17", "haversine": "^1.1.1" }, "devDependencies": { diff --git a/packages/cli-tools/package.json b/packages/cli-tools/package.json index 287d65d080..4c4f96f898 100644 --- a/packages/cli-tools/package.json +++ b/packages/cli-tools/package.json @@ -1,6 +1,6 @@ { "name": "@streamr/cli-tools", - "version": "103.0.0-rc.16", + "version": "103.0.0-rc.17", "description": "Command line tools for Streamr", "repository": { "type": "git", @@ -33,11 +33,11 @@ "license": "AGPL-3.0", "dependencies": { "@streamr/config": "^5.5.6", - "@streamr/dht": "103.0.0-rc.16", + "@streamr/dht": "103.0.0-rc.17", "@streamr/network-contracts": "^9.1.0", - "@streamr/sdk": "103.0.0-rc.16", - "@streamr/trackerless-network": "103.0.0-rc.16", - "@streamr/utils": "103.0.0-rc.16", + "@streamr/sdk": "103.0.0-rc.17", + "@streamr/trackerless-network": "103.0.0-rc.17", + "@streamr/utils": "103.0.0-rc.17", "commander": "^13.1.0", "easy-table": "^1.1.1", "ethers": "^6.13.0", @@ -46,7 +46,7 @@ "semver": "^7.7.1" }, "devDependencies": { - "@streamr/test-utils": "103.0.0-rc.16", + "@streamr/test-utils": "103.0.0-rc.17", "@types/event-stream": "^4.0.5", "@types/lodash": "^4.17.16", "@types/merge2": "^1.4.4", diff --git a/packages/dht/package.json b/packages/dht/package.json index e6eb036392..82c3f8d84f 100644 --- a/packages/dht/package.json +++ b/packages/dht/package.json @@ -1,6 +1,6 @@ { "name": "@streamr/dht", - "version": "103.0.0-rc.16", + "version": "103.0.0-rc.17", "description": "Streamr Network DHT", "repository": { "type": "git", @@ -40,11 +40,11 @@ "@js-sdsl/ordered-map": "^4.4.2", "@protobuf-ts/runtime": "^2.8.2", "@protobuf-ts/runtime-rpc": "^2.8.2", - "@streamr/autocertifier-client": "103.0.0-rc.16", - "@streamr/cdn-location": "103.0.0-rc.16", - "@streamr/geoip-location": "103.0.0-rc.16", - "@streamr/proto-rpc": "103.0.0-rc.16", - "@streamr/utils": "103.0.0-rc.16", + "@streamr/autocertifier-client": "103.0.0-rc.17", + "@streamr/cdn-location": "103.0.0-rc.17", + "@streamr/geoip-location": "103.0.0-rc.17", + "@streamr/proto-rpc": "103.0.0-rc.17", + "@streamr/utils": "103.0.0-rc.17", "eventemitter3": "^5.0.0", "heap": "^0.2.6", "ipaddr.js": "^2.0.1", @@ -58,7 +58,7 @@ }, "devDependencies": { "@streamr/browser-test-runner": "^0.0.1", - "@streamr/test-utils": "103.0.0-rc.16", + "@streamr/test-utils": "103.0.0-rc.17", "@types/heap": "^0.2.34", "@types/k-bucket": "^5.0.1", "@types/lodash": "^4.17.16", diff --git a/packages/dht/src/connection/ConnectionManager.ts b/packages/dht/src/connection/ConnectionManager.ts index 2c1dbf68c8..b1d152fbb6 100644 --- a/packages/dht/src/connection/ConnectionManager.ts +++ b/packages/dht/src/connection/ConnectionManager.ts @@ -26,7 +26,7 @@ import { DhtAddress, areEqualPeerDescriptors, toNodeId } from '../identifiers' import { getOfferer } from '../helpers/offering' import { ConnectionsView } from './ConnectionsView' import { OutputBuffer } from './OutputBuffer' -import { IConnection } from './IConnection' +import { ConnectionStatistics, IConnection } from './IConnection' import { PendingConnection } from './PendingConnection' export interface ConnectionManagerOptions { @@ -399,7 +399,7 @@ export class ConnectionManager extends EventEmitter implements const managedConnection = new ManagedConnection(peerDescriptor, connection) managedConnection.on('managedData', this.onData) managedConnection.once('disconnected', (gracefulLeave: boolean) => this.onDisconnected(peerDescriptor, gracefulLeave)) - + managedConnection.on('statisticsUpdated', (statistics: ConnectionStatistics) => this.emit('statisticsUpdated', peerDescriptor, statistics)) const nodeId = toNodeId(peerDescriptor) const endpoint = this.endpoints.get(nodeId)! as ConnectingEndpoint const outputBuffer = endpoint.buffer diff --git a/packages/dht/src/connection/IConnection.ts b/packages/dht/src/connection/IConnection.ts index 9adad2213f..64b886c8f6 100644 --- a/packages/dht/src/connection/IConnection.ts +++ b/packages/dht/src/connection/IConnection.ts @@ -5,6 +5,7 @@ export interface ConnectionEvents { connected: () => void disconnected: (gracefulLeave: boolean, code?: number, reason?: string) => void error: (name: string) => void + statisticsUpdated: (statistics: ConnectionStatistics) => void } export enum ConnectionType { @@ -17,23 +18,32 @@ export enum ConnectionType { export type ConnectionID = BrandedString<'ConnectionID'> +export interface ConnectionStatistics { + uploadRateBytesPerSecond: number + downloadRateBytesPerSecond: number + bufferedAmount: number +} + export interface IConnection { on(event: 'data', listener: (bytes: Uint8Array) => void): this on(event: 'error', listener: (name: string) => void): this on(event: 'connected', listener: () => void): this on(event: 'disconnected', listener: (gracefulLeave: boolean, code?: number, reason?: string) => void): this - + on(event: 'statisticsUpdated', listener: (statistics: ConnectionStatistics) => void): this + once(event: 'data', listener: (bytes: Uint8Array) => void): this once(event: 'error', listener: (name: string) => void): this once(event: 'connected', listener: () => void): this once(event: 'disconnected', listener: (gracefulLeave: boolean, code?: number, reason?: string) => void): this + once(event: 'statisticsUpdated', listener: (statistics: ConnectionStatistics) => void): this off(event: 'data', listener: (bytes: Uint8Array) => void): void off(event: 'error', listener: (name: string) => void): void off(event: 'connected', listener: () => void): void off(event: 'disconnected', listener: (gracefulLeave: boolean, code?: number, reason?: string) => void): void - + off(event: 'statisticsUpdated', listener: (statistics: ConnectionStatistics) => void): void + send(data: Uint8Array): void close(gracefulLeave: boolean): Promise destroy(): void diff --git a/packages/dht/src/connection/ManagedConnection.ts b/packages/dht/src/connection/ManagedConnection.ts index fd65e27d55..fffc7a4ebd 100644 --- a/packages/dht/src/connection/ManagedConnection.ts +++ b/packages/dht/src/connection/ManagedConnection.ts @@ -1,4 +1,4 @@ -import { ConnectionID, IConnection } from './IConnection' +import { ConnectionID, ConnectionStatistics, IConnection } from './IConnection' import * as Err from '../helpers/errors' import { PeerDescriptor } from '../../generated/packages/dht/protos/DhtRpc' import { Logger } from '@streamr/utils' @@ -10,6 +10,7 @@ import { createRandomConnectionId } from './Connection' export interface ManagedConnectionEvents { managedData: (bytes: Uint8Array, remotePeerDescriptor: PeerDescriptor) => void disconnected: (gracefulLeave: boolean) => void + statisticsUpdated: (statistics: ConnectionStatistics) => void } const logger = new Logger(module) @@ -42,7 +43,7 @@ export class ManagedConnection extends EventEmitter { this.emit('managedData', bytes, this.getPeerDescriptor()!) }) connection.on('disconnected', (gracefulLeave) => this.onDisconnected(gracefulLeave)) - + connection.on('statisticsUpdated', (statistics: ConnectionStatistics) => this.emit('statisticsUpdated', statistics)) this.lastUsedTimestamp = Date.now() this.remotePeerDescriptor = peerDescriptor } diff --git a/packages/dht/src/connection/webrtc/BrowserWebrtcConnection.ts b/packages/dht/src/connection/webrtc/BrowserWebrtcConnection.ts index 65ccdd3124..f56eb6eea0 100644 --- a/packages/dht/src/connection/webrtc/BrowserWebrtcConnection.ts +++ b/packages/dht/src/connection/webrtc/BrowserWebrtcConnection.ts @@ -2,7 +2,7 @@ import EventEmitter from 'eventemitter3' import { WebrtcConnectionEvents, IWebrtcConnection, RtcDescription } from './IWebrtcConnection' -import { IConnection, ConnectionID, ConnectionType } from '../IConnection' +import { IConnection, ConnectionID, ConnectionType, ConnectionStatistics } from '../IConnection' import { Logger } from '@streamr/utils' import { EARLY_TIMEOUT, IceServer } from './WebrtcConnector' import { createRandomConnectionId } from '../Connection' @@ -28,14 +28,23 @@ export class NodeWebrtcConnection extends EventEmitter i private lastState: RTCPeerConnectionState = 'connecting' private readonly iceServers: IceServer[] private peerConnection?: RTCPeerConnection - private readonly bufferThresholdHigh = 2 ** 17 - private readonly bufferThresholdLow = 2 ** 15 private dataChannel?: RTCDataChannel private makingOffer = false private isOffering = false private closed = false private earlyTimeout: NodeJS.Timeout private readonly messageQueue: Uint8Array[] = [] + private currentBufferedAmount = 0 + private statistics: ConnectionStatistics = { + uploadRateBytesPerSecond: 0, + downloadRateBytesPerSecond: 0, + bufferedAmount: 0 + } + private lastBytesSent = 0 + private lastBytesReceived = 0 + private lastStatsCollectionTime = 0 + private statsInterval?: NodeJS.Timeout + private readonly statsUpdateInterval = 200 // ms, adjust as needed for X times per second constructor(params: Params) { super() @@ -142,6 +151,12 @@ export class NodeWebrtcConnection extends EventEmitter i this.closed = true this.lastState = 'closed' clearTimeout(this.earlyTimeout) + + // Clear the stats collection interval + if (this.statsInterval) { + clearInterval(this.statsInterval) + this.statsInterval = undefined + } this.stopListening() this.emit('disconnected', gracefulLeave, undefined, reason) @@ -177,10 +192,16 @@ export class NodeWebrtcConnection extends EventEmitter i public send(data: Uint8Array): void { if (this.lastState === 'connected') { - if (this.dataChannel!.bufferedAmount > this.bufferThresholdHigh) { - this.messageQueue.push(data) - } else { - this.dataChannel?.send(data as Buffer) + + this.dataChannel?.send(data as Buffer) + + let bufferedAmountRemainder = this.dataChannel!.bufferedAmount - data.length + bufferedAmountRemainder = Math.max(0, bufferedAmountRemainder) + if (bufferedAmountRemainder !== this.currentBufferedAmount) { + this.currentBufferedAmount = bufferedAmountRemainder + // Update bufferedAmount in statistics but keep the upload rate the same + this.statistics.bufferedAmount = this.currentBufferedAmount + this.emit('statisticsUpdated', this.statistics) } } else { logger.warn('Tried to send on a connection with last state ' + this.lastState) @@ -190,7 +211,6 @@ export class NodeWebrtcConnection extends EventEmitter i private setupDataChannel(dataChannel: RTCDataChannel): void { this.dataChannel = dataChannel this.dataChannel.binaryType = 'arraybuffer' - this.dataChannel.bufferedAmountLowThreshold = this.bufferThresholdLow dataChannel.onopen = () => { logger.trace('dc.onOpen') this.onDataChannelOpen() @@ -209,13 +229,6 @@ export class NodeWebrtcConnection extends EventEmitter i logger.trace('dc.onmessage') this.emit('data', new Uint8Array(msg.data)) } - dataChannel.onbufferedamountlow = () => { - logger.trace('dc.onBufferedAmountLow') - while (this.messageQueue.length > 0 && this.dataChannel!.bufferedAmount < this.bufferThresholdHigh) { - const data = this.messageQueue.shift()! - this.dataChannel!.send(data as Buffer) - } - } } private stopListening() { @@ -239,6 +252,72 @@ export class NodeWebrtcConnection extends EventEmitter i private onDataChannelOpen(): void { this.lastState = 'connected' this.emit('connected') + this.startStatsCollection() + } + + private startStatsCollection(): void { + // Clear any existing interval + if (this.statsInterval) { + clearInterval(this.statsInterval) + } + + // Initialize the last collection time + this.lastStatsCollectionTime = Date.now() + + // Start collecting stats periodically + this.statsInterval = setInterval(() => { + this.collectAndEmitStats() + }, this.statsUpdateInterval) + } + + private async collectAndEmitStats(): Promise { + if (!this.peerConnection || this.lastState !== 'connected') { + return + } + + try { + const currentTime = Date.now() + const elapsedTimeSeconds = (currentTime - this.lastStatsCollectionTime) / 1000 + + // Avoid division by zero or very small time intervals + if (elapsedTimeSeconds < 0.1) { + return + } + + const stats = await this.peerConnection.getStats() + let currentBytesSent = 0 + let currentBytesReceived = 0 + stats.forEach((report) => { + if (report.type === 'transport') { + currentBytesSent = report.bytesSent ?? 0 + currentBytesReceived = report.bytesReceived ?? 0 + } + }) + + // Calculate upload and download rates based on actual elapsed time + const bytesSentDelta = Math.max(0, currentBytesSent - this.lastBytesSent) + const uploadRateBytesPerSecond = Math.round(bytesSentDelta / elapsedTimeSeconds) + + const bytesReceivedDelta = Math.max(0, currentBytesReceived - this.lastBytesReceived) + const downloadRateBytesPerSecond = Math.round(bytesReceivedDelta / elapsedTimeSeconds) + + this.lastBytesSent = currentBytesSent + this.lastBytesReceived = currentBytesReceived + + this.lastStatsCollectionTime = currentTime + + // Update statistics and emit event + this.statistics = { + uploadRateBytesPerSecond, + downloadRateBytesPerSecond, + bufferedAmount: this.currentBufferedAmount + } + + this.emit('statisticsUpdated', this.statistics) + + } catch (err) { + logger.warn('Failed to collect WebRTC stats', { err }) + } } private onStateChange(): void { @@ -246,6 +325,11 @@ export class NodeWebrtcConnection extends EventEmitter i || this.peerConnection!.connectionState === DisconnectedRtcPeerConnectionStateEnum.DISCONNECTED || this.peerConnection!.connectionState === DisconnectedRtcPeerConnectionStateEnum.FAILED ) { + // Clear stats interval when connection is closed + if (this.statsInterval) { + clearInterval(this.statsInterval) + this.statsInterval = undefined + } this.doClose(false) } } diff --git a/packages/dht/src/exports.ts b/packages/dht/src/exports.ts index 6fe245fe1e..67bb382187 100644 --- a/packages/dht/src/exports.ts +++ b/packages/dht/src/exports.ts @@ -19,6 +19,7 @@ export { ManagedConnection } from './connection/ManagedConnection' export { PendingConnection } from './connection/PendingConnection' export type { IConnection } from './connection/IConnection' export { ConnectionType } from './connection/IConnection' +export type { ConnectionStatistics } from './connection/IConnection' export type { ServiceID } from './types/ServiceID' export type { RingContacts } from './dht/contact/RingContactList' export { createOutgoingHandshaker } from './connection/Handshaker' diff --git a/packages/dht/src/transport/ITransport.ts b/packages/dht/src/transport/ITransport.ts index cb98a71c25..1c13e4d81b 100644 --- a/packages/dht/src/transport/ITransport.ts +++ b/packages/dht/src/transport/ITransport.ts @@ -1,9 +1,11 @@ import { Message, PeerDescriptor } from '../../generated/packages/dht/protos/DhtRpc' +import { ConnectionStatistics } from '../connection/IConnection' export interface TransportEvents { connected: (peerDescriptor: PeerDescriptor) => void disconnected: (peerDescriptor: PeerDescriptor, gracefulLeave: boolean) => void message: (message: Message) => void + statisticsUpdated: (peerDescriptor: PeerDescriptor, statistics: ConnectionStatistics) => void } export interface SendOptions { @@ -22,15 +24,18 @@ export interface ITransport { on(eventName: T, listener: (message: Message) => void): void on(eventName: T, listener: (peerDescriptor: PeerDescriptor) => void): void on(eventName: T, listener: (peerDescriptor: PeerDescriptor, gracefulLeave: boolean) => void): void + on(eventName: T, listener: (peerDescriptor: PeerDescriptor, statistics: ConnectionStatistics) => void): void once(eventName: T, listener: (message: Message) => void): void once(eventName: T, listener: (peerDescriptor: PeerDescriptor) => void): void once(eventName: T, listener: (peerDescriptor: PeerDescriptor, gracefulLeave: boolean) => void): void + once(eventName: T, listener: (peerDescriptor: PeerDescriptor, statistics: ConnectionStatistics) => void): void off(eventName: T, listener: (message: Message) => void): void off(eventName: T, listener: (peerDescriptor: PeerDescriptor) => void): void off(eventName: T, listener: (peerDescriptor: PeerDescriptor, gracefulLeave: boolean) => void): void + off(eventName: T, listener: (peerDescriptor: PeerDescriptor, statistics: ConnectionStatistics) => void): void send(msg: Message, opts?: SendOptions): Promise getLocalPeerDescriptor(): PeerDescriptor diff --git a/packages/geoip-location/package.json b/packages/geoip-location/package.json index d76d0bfc8a..43bad4a6e3 100644 --- a/packages/geoip-location/package.json +++ b/packages/geoip-location/package.json @@ -1,6 +1,6 @@ { "name": "@streamr/geoip-location", - "version": "103.0.0-rc.16", + "version": "103.0.0-rc.17", "description": "Library for getting location information from IP addresses based on MaxMind GeoLite2 databases", "repository": { "type": "git", @@ -31,7 +31,7 @@ "test-unit": "jest test/unit" }, "dependencies": { - "@streamr/utils": "103.0.0-rc.16", + "@streamr/utils": "103.0.0-rc.17", "eventemitter3": "^5.0.0", "long-timeout": "^0.1.1", "mmdb-lib": "^3.0.0", diff --git a/packages/node/package.json b/packages/node/package.json index 29bd07328b..c8c07e4c07 100644 --- a/packages/node/package.json +++ b/packages/node/package.json @@ -1,6 +1,6 @@ { "name": "@streamr/node", - "version": "103.0.0-rc.16", + "version": "103.0.0-rc.17", "description": "A full-featured node implementation for the Streamr Network", "repository": { "type": "git", @@ -39,9 +39,9 @@ "dependencies": { "@inquirer/prompts": "^7.5.1", "@streamr/config": "^5.5.6", - "@streamr/dht": "103.0.0-rc.16", - "@streamr/sdk": "103.0.0-rc.16", - "@streamr/utils": "103.0.0-rc.16", + "@streamr/dht": "103.0.0-rc.17", + "@streamr/sdk": "103.0.0-rc.17", + "@streamr/utils": "103.0.0-rc.17", "aedes": "^0.51.3", "ajv": "^8.17.1", "ajv-formats": "^3.0.1", @@ -65,7 +65,7 @@ "devDependencies": { "@inquirer/testing": "^2.1.46", "@streamr/network-contracts": "^9.1.0", - "@streamr/test-utils": "103.0.0-rc.16", + "@streamr/test-utils": "103.0.0-rc.17", "@types/cors": "^2.8.18", "@types/express": "^5.0.1", "@types/heap": "^0.2.34", diff --git a/packages/proto-rpc/package.json b/packages/proto-rpc/package.json index 2df639a672..961497ae6e 100644 --- a/packages/proto-rpc/package.json +++ b/packages/proto-rpc/package.json @@ -1,6 +1,6 @@ { "name": "@streamr/proto-rpc", - "version": "103.0.0-rc.16", + "version": "103.0.0-rc.17", "description": "Proto-RPC", "repository": { "type": "git", @@ -32,14 +32,14 @@ "dependencies": { "@protobuf-ts/runtime": "^2.8.2", "@protobuf-ts/runtime-rpc": "^2.8.2", - "@streamr/utils": "103.0.0-rc.16", + "@streamr/utils": "103.0.0-rc.17", "eventemitter3": "^5.0.0", "lodash": "^4.17.21", "uuid": "^11.1.0" }, "devDependencies": { "@streamr/browser-test-runner": "^0.0.1", - "@streamr/test-utils": "103.0.0-rc.16", + "@streamr/test-utils": "103.0.0-rc.17", "@types/lodash": "^4.17.16" }, "optionalDependencies": { diff --git a/packages/sdk/package.json b/packages/sdk/package.json index d1f00c05a0..3689c74fb5 100644 --- a/packages/sdk/package.json +++ b/packages/sdk/package.json @@ -1,6 +1,6 @@ { "name": "@streamr/sdk", - "version": "103.0.0-rc.16", + "version": "103.0.0-rc.17", "description": "JavaScript / TypeScript SDK for Streamr", "repository": { "type": "git", @@ -53,7 +53,7 @@ "@babel/preset-env": "^7.27.2", "@babel/preset-typescript": "^7.27.1", "@jest/globals": "^29.7.0", - "@streamr/test-utils": "103.0.0-rc.16", + "@streamr/test-utils": "103.0.0-rc.17", "@types/heap": "^0.2.34", "@types/lodash": "^4.17.16", "ajv": "^8.17.1", @@ -91,11 +91,11 @@ "@protobuf-ts/runtime": "^2.8.2", "@protobuf-ts/runtime-rpc": "^2.8.2", "@streamr/config": "^5.5.6", - "@streamr/dht": "103.0.0-rc.16", + "@streamr/dht": "103.0.0-rc.17", "@streamr/network-contracts": "^9.1.0", - "@streamr/proto-rpc": "103.0.0-rc.16", - "@streamr/trackerless-network": "103.0.0-rc.16", - "@streamr/utils": "103.0.0-rc.16", + "@streamr/proto-rpc": "103.0.0-rc.17", + "@streamr/trackerless-network": "103.0.0-rc.17", + "@streamr/utils": "103.0.0-rc.17", "core-js": "^3.42.0", "env-paths": "^2.2.1", "ethers": "^6.13.0", diff --git a/packages/test-utils/package.json b/packages/test-utils/package.json index 046565146b..7df4d971a8 100644 --- a/packages/test-utils/package.json +++ b/packages/test-utils/package.json @@ -1,6 +1,6 @@ { "name": "@streamr/test-utils", - "version": "103.0.0-rc.16", + "version": "103.0.0-rc.17", "description": "A collection of shared test utilities", "repository": { "type": "git", @@ -27,7 +27,7 @@ "dependencies": { "@streamr/config": "^5.5.6", "@streamr/network-contracts": "^9.1.0", - "@streamr/utils": "103.0.0-rc.16", + "@streamr/utils": "103.0.0-rc.17", "cors": "^2.8.5", "ethers": "^6.13.0", "express": "^5.1.0", diff --git a/packages/trackerless-network/package.json b/packages/trackerless-network/package.json index 6abf6a96b6..7951dcede3 100644 --- a/packages/trackerless-network/package.json +++ b/packages/trackerless-network/package.json @@ -1,6 +1,6 @@ { "name": "@streamr/trackerless-network", - "version": "103.0.0-rc.16", + "version": "103.0.0-rc.17", "description": "Minimal and extendable implementation of the Streamr Network node.", "repository": { "type": "git", @@ -36,9 +36,9 @@ "dependencies": { "@protobuf-ts/runtime": "^2.8.2", "@protobuf-ts/runtime-rpc": "^2.8.2", - "@streamr/dht": "103.0.0-rc.16", - "@streamr/proto-rpc": "103.0.0-rc.16", - "@streamr/utils": "103.0.0-rc.16", + "@streamr/dht": "103.0.0-rc.17", + "@streamr/proto-rpc": "103.0.0-rc.17", + "@streamr/utils": "103.0.0-rc.17", "eventemitter3": "^5.0.0", "lodash": "^4.17.21", "ts-essentials": "^10.0.4", @@ -47,7 +47,7 @@ }, "devDependencies": { "@streamr/browser-test-runner": "^0.0.1", - "@streamr/test-utils": "103.0.0-rc.16", + "@streamr/test-utils": "103.0.0-rc.17", "@types/lodash": "^4.17.16", "@types/yallist": "^4.0.1", "expect": "^29.6.2", diff --git a/packages/trackerless-network/src/ContentDeliveryManager.ts b/packages/trackerless-network/src/ContentDeliveryManager.ts index 25bd0d1f00..df8be37e7e 100644 --- a/packages/trackerless-network/src/ContentDeliveryManager.ts +++ b/packages/trackerless-network/src/ContentDeliveryManager.ts @@ -36,6 +36,7 @@ import { DEFAULT_MIN_PROPAGATION_TARGETS, DEFAULT_PROPAGATION_BUFFER_TTL } from './content-delivery-layer/propagation/Propagation' +import { ContentDeliveryRpcRemote } from './content-delivery-layer/ContentDeliveryRpcRemote' export type StreamPartDelivery = { broadcast: (msg: StreamMessage) => void @@ -54,6 +55,7 @@ export type StreamPartDelivery = { export interface Events { newMessage: (msg: StreamMessage) => void + neighborListUpdated: (streamPartId: StreamPartID, neighbors: ContentDeliveryRpcRemote[]) => void } const logger = new Logger(module) @@ -194,6 +196,9 @@ export class ContentDeliveryManager extends EventEmitter { node.on('message', (message: StreamMessage) => { this.emit('newMessage', message) }) + node.on('neighborListUpdated', (neighbors: ContentDeliveryRpcRemote[]) => { + this.emit('neighborListUpdated', streamPartId, neighbors) + }) const handleEntryPointLeave = async () => { if (this.destroyed || peerDescriptorStoreManager.isLocalNodeStored() || this.knownStreamPartEntryPoints.has(streamPartId)) { return diff --git a/packages/trackerless-network/src/NetworkNode.ts b/packages/trackerless-network/src/NetworkNode.ts index 73ffce8827..8e85f21253 100644 --- a/packages/trackerless-network/src/NetworkNode.ts +++ b/packages/trackerless-network/src/NetworkNode.ts @@ -7,6 +7,7 @@ import { ExternalNetworkRpc, ExternalRpcClient, ExternalRpcClientClass } from '. import { NetworkOptions, NetworkStack } from './NetworkStack' import { ProxyDirection, StreamMessage } from '../generated/packages/trackerless-network/protos/NetworkRpc' import { NodeInfo } from './types' +import { ContentDeliveryRpcRemote } from './content-delivery-layer/ContentDeliveryRpcRemote' import { StreamPartDeliveryOptions } from './ContentDeliveryManager' export const createNetworkNode = (opts: NetworkOptions): NetworkNode => { @@ -66,6 +67,10 @@ export class NetworkNode { this.stack.getContentDeliveryManager().on('newMessage', listener) } + addNeighborListUpdatedListener(listener: (streamPartId: StreamPartID, neighbors: ContentDeliveryRpcRemote[]) => void): void { + this.stack.getContentDeliveryManager().on('neighborListUpdated', listener) + } + setStreamPartEntryPoints(streamPartId: StreamPartID, contactPeerDescriptors: PeerDescriptor[]): void { this.stack.getContentDeliveryManager().setStreamPartEntryPoints(streamPartId, contactPeerDescriptors) } @@ -74,6 +79,10 @@ export class NetworkNode { this.stack.getContentDeliveryManager().off('newMessage', listener) } + removeNeighborListUpdatedListener(listener: (streamPartId: StreamPartID, neighbors: ContentDeliveryRpcRemote[]) => void): void { + this.stack.getContentDeliveryManager().off('neighborListUpdated', listener) + } + async leave(streamPartId: StreamPartID): Promise { if (this.stopped) { return diff --git a/packages/trackerless-network/src/content-delivery-layer/ContentDeliveryLayerNode.ts b/packages/trackerless-network/src/content-delivery-layer/ContentDeliveryLayerNode.ts index d22157410b..300dfea660 100644 --- a/packages/trackerless-network/src/content-delivery-layer/ContentDeliveryLayerNode.ts +++ b/packages/trackerless-network/src/content-delivery-layer/ContentDeliveryLayerNode.ts @@ -5,6 +5,7 @@ import { ListeningRpcCommunicator, PeerDescriptor, toNodeId, + ConnectionStatistics } from '@streamr/dht' import { Logger, StreamPartID, addManagedEventListener } from '@streamr/utils' import { EventEmitter } from 'eventemitter3' @@ -37,9 +38,9 @@ import { PlumTreeManager } from './plum-tree/PlumTreeManager' export interface Events { message: (message: StreamMessage) => void neighborConnected: (nodeId: DhtAddress) => void + neighborListUpdated: (neighbors: ContentDeliveryRpcRemote[]) => void entryPointLeaveDetected: () => void } - export interface StrictContentDeliveryLayerNodeOptions { streamPartId: StreamPartID discoveryLayerNode: DiscoveryLayerNode @@ -166,7 +167,13 @@ export class ContentDeliveryLayerNode extends EventEmitter { this.abortController.signal ) addManagedEventListener( - this.options.neighbors, + this.options.transport, + 'statisticsUpdated', + (peerDescriptor, statistics) => this.onStatisticsUpdated(peerDescriptor, statistics), + this.abortController.signal + ) + addManagedEventListener( + this.options.neighbors as any, 'nodeAdded', (id, remote) => { this.options.propagation.onNeighborJoined(id) @@ -179,7 +186,7 @@ export class ContentDeliveryLayerNode extends EventEmitter { this.abortController.signal ) addManagedEventListener( - this.options.neighbors, + this.options.neighbors as any, 'nodeRemoved', (_id, remote) => { this.options.connectionLocker.weakUnlockConnection( @@ -189,6 +196,12 @@ export class ContentDeliveryLayerNode extends EventEmitter { }, this.abortController.signal ) + addManagedEventListener( + this.options.neighbors, + 'nodeListUpdated', + () => this.emit('neighborListUpdated', this.options.neighbors.getAll()), + this.abortController.signal + ) if (this.options.proxyConnectionRpcLocal !== undefined) { addManagedEventListener( this.options.proxyConnectionRpcLocal, @@ -338,6 +351,12 @@ export class ContentDeliveryLayerNode extends EventEmitter { } } + private onStatisticsUpdated(peerDescriptor: PeerDescriptor, statistics: ConnectionStatistics): void { + if (this.options.neighbors.has(toNodeId(peerDescriptor))) { + this.options.neighbors.get(toNodeId(peerDescriptor))!.setStatistics(statistics) + } + } + hasProxyConnection(nodeId: DhtAddress): boolean { if (this.options.proxyConnectionRpcLocal) { return this.options.proxyConnectionRpcLocal.hasConnection(nodeId) diff --git a/packages/trackerless-network/src/content-delivery-layer/ContentDeliveryRpcRemote.ts b/packages/trackerless-network/src/content-delivery-layer/ContentDeliveryRpcRemote.ts index 96e5d713a1..d9178d5a69 100644 --- a/packages/trackerless-network/src/content-delivery-layer/ContentDeliveryRpcRemote.ts +++ b/packages/trackerless-network/src/content-delivery-layer/ContentDeliveryRpcRemote.ts @@ -1,17 +1,27 @@ -import { RpcRemote } from '@streamr/dht' +import { ConnectionStatistics, RpcRemote } from '@streamr/dht' import { Logger, StreamPartID } from '@streamr/utils' import { LeaveStreamPartNotice, StreamMessage } from '../../generated/packages/trackerless-network/protos/NetworkRpc' import { ContentDeliveryRpcClient } from '../../generated/packages/trackerless-network/protos/NetworkRpc.client' - +import { EventEmitter } from 'eventemitter3' const logger = new Logger(module) +export interface ContentDeliveryRpcRemoteEvents { + statisticsChanged: (statistics: ConnectionStatistics) => void +} + export class ContentDeliveryRpcRemote extends RpcRemote { private rtt?: number - + private statistics: ConnectionStatistics = { + uploadRateBytesPerSecond: 0, + downloadRateBytesPerSecond: 0, + bufferedAmount: 0 + } + public readonly emitter: EventEmitter = new EventEmitter() + async sendStreamMessage(msg: StreamMessage, doNotBufferWhileConnecting?: boolean): Promise { const options = this.formDhtRpcOptions({ notification: true, @@ -42,4 +52,15 @@ export class ContentDeliveryRpcRemote extends RpcRemote void nodeRemoved: (id: DhtAddress, remote: ContentDeliveryRpcRemote) => void + nodeListUpdated: () => void } const getValuesOfIncludedKeys = ( @@ -35,14 +36,20 @@ export class NodeList extends EventEmitter { this.ownId = ownId } + private onStatisticsChanged = (): void => { + this.emit('nodeListUpdated') + } + add(remote: ContentDeliveryRpcRemote): void { const nodeId = toNodeId(remote.getPeerDescriptor()) if ((this.ownId !== nodeId) && (this.nodes.size < this.limit)) { + remote.emitter.on('statisticsChanged', this.onStatisticsChanged) const isExistingNode = this.nodes.has(nodeId) this.nodes.set(nodeId, remote) if (!isExistingNode) { this.emit('nodeAdded', nodeId, remote) + this.emit('nodeListUpdated') } } } @@ -50,8 +57,10 @@ export class NodeList extends EventEmitter { remove(nodeId: DhtAddress): void { if (this.nodes.has(nodeId)) { const remote = this.nodes.get(nodeId)! + remote.emitter.off('statisticsChanged', this.onStatisticsChanged) this.nodes.delete(nodeId) this.emit('nodeRemoved', nodeId, remote) + this.emit('nodeListUpdated') } } diff --git a/packages/trackerless-network/src/content-delivery-layer/neighbor-discovery/NeighborUpdateManager.ts b/packages/trackerless-network/src/content-delivery-layer/neighbor-discovery/NeighborUpdateManager.ts index 259fbb963c..adc5fef581 100644 --- a/packages/trackerless-network/src/content-delivery-layer/neighbor-discovery/NeighborUpdateManager.ts +++ b/packages/trackerless-network/src/content-delivery-layer/neighbor-discovery/NeighborUpdateManager.ts @@ -53,10 +53,12 @@ export class NeighborUpdateManager { const res = await this.createRemote(neighbor.getPeerDescriptor()).updateNeighbors(this.options.streamPartId, neighborDescriptors) const nodeId = toNodeId(neighbor.getPeerDescriptor()) this.options.neighbors.get(nodeId)!.setRtt(Date.now() - startTime) + if (res.removeMe) { this.options.neighbors.remove(nodeId) this.options.neighborFinder.start([nodeId]) } + })) } diff --git a/packages/trackerless-network/src/discovery-layer/DiscoveryLayerNode.ts b/packages/trackerless-network/src/discovery-layer/DiscoveryLayerNode.ts index 0ab8a29af6..84a0f6508d 100644 --- a/packages/trackerless-network/src/discovery-layer/DiscoveryLayerNode.ts +++ b/packages/trackerless-network/src/discovery-layer/DiscoveryLayerNode.ts @@ -1,5 +1,4 @@ import { DhtAddress, PeerDescriptor, RingContacts } from '@streamr/dht' - export interface DiscoveryLayerNodeEvents { manualRejoinRequired: () => void nearbyContactAdded: (peerDescriptor: PeerDescriptor) => void @@ -17,6 +16,7 @@ export interface DiscoveryLayerNode { off(eventName: T, listener: () => void): void once(eventName: T, listener: (peerDescriptor: PeerDescriptor) => void): void once(eventName: T, listener: () => void): void + removeContact: (nodeId: DhtAddress) => void getClosestContacts: (maxCount?: number) => PeerDescriptor[] getRandomContacts: (maxCount?: number) => PeerDescriptor[] diff --git a/packages/utils/package.json b/packages/utils/package.json index 8f0d984843..5b2039ffca 100644 --- a/packages/utils/package.json +++ b/packages/utils/package.json @@ -1,6 +1,6 @@ { "name": "@streamr/utils", - "version": "103.0.0-rc.16", + "version": "103.0.0-rc.17", "description": "A collection of shared common utilities", "repository": { "type": "git",