Skip to content

Commit 2de7f48

Browse files
juslesanteogeb
andauthored
feat(sdk): Add content delivery layer buffer configurations (#3305)
## Background There are two different buffers related to this PR. The propagation buffer and connection buffers. The propagation buffer is used when a node is broadcasting data but does not have neighbors to send the data to. The connection buffer is used when a new connection is formed in the network. ## Summary We can now configure these two options for the content delivery layer: - `streamPartitionMaxPropagationBufferSize` can be used to set the size of the propagation buffer. - `contentDeliveryBufferWhileConnecting` can be used to stop buffering while a connection is being formed. ## Related changes - renamed `doNotBufferWhileConnecting` to `bufferWhileConnecting` in the `trackerless-network` and `dht` packages. ## Example use case In a case where a publisher joins the network before subscribers the publisher stores the data to a buffer as it cannot send it to anyone. When a subscriber joins the content of the buffer is sent to the subscriber first and after that the real-time data is sent. If we don't want that the buffer content is sent then we can use set `streamPartitionMaxBufferSize` to 0 to avoid propagation buffering. Additionally if the opening of connections takes long it is possible to disable buffering on connecting connections to avoid sending of old messages by setting the `contentDeliveryBufferWhileConnecting` as false. --------- Co-authored-by: Teo Gebhard <[email protected]>
1 parent 2502c44 commit 2de7f48

File tree

14 files changed

+40
-17
lines changed

14 files changed

+40
-17
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ Changes before Tatum release are not documented in this file.
1717
- Add `StreamrClient#publishRaw()` for publishing raw messages (https://github.com/streamr-dev/network/pull/3280)
1818
- Add new `keys` configuration to the `encryption` section (https://github.com/streamr-dev/network/pull/3284)
1919
- Add new `validation` configuration section (https://github.com/streamr-dev/network/pull/3302)
20+
- Add new configuration options for controlling content delivery buffering (https://github.com/streamr-dev/network/pull/3305)
2021

2122
#### Changed
2223

packages/dht/src/connection/ConnectionManager.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -309,7 +309,7 @@ export class ConnectionManager extends EventEmitter<TransportEvents> implements
309309

310310
if (this.endpoints.get(nodeId)!.connected) {
311311
(connection as ManagedConnection).send(binary)
312-
} else if (!opts.doNotBufferWhileConnecting) {
312+
} else if (opts.bufferWhileConnecting) {
313313
return (this.endpoints.get(nodeId)! as ConnectingEndpoint).buffer.push(binary)
314314
}
315315
}

packages/dht/src/rpc-protocol/DhtCallContext.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ export class DhtCallContext extends ProtoCallContext implements DhtRpcOptions {
99
clientId?: number
1010
connect?: boolean
1111
sendIfStopped?: boolean
12-
doNotBufferWhileConnecting?: boolean
12+
bufferWhileConnecting?: boolean
1313
//used in incoming calls
1414
incomingSourceDescriptor?: PeerDescriptor
1515
}

packages/dht/src/rpc-protocol/DhtRpcOptions.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,5 +7,5 @@ export interface DhtRpcOptions extends ProtoRpcOptions {
77
clientId?: number
88
connect?: boolean
99
sendIfStopped?: boolean
10-
doNotBufferWhileConnecting?: boolean
10+
bufferWhileConnecting?: boolean
1111
}

packages/dht/src/transport/ITransport.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,13 @@ export interface TransportEvents {
99
export interface SendOptions {
1010
connect: boolean
1111
sendIfStopped: boolean
12-
doNotBufferWhileConnecting: boolean
12+
bufferWhileConnecting: boolean
1313
}
1414

1515
export const DEFAULT_SEND_OPTIONS = {
1616
connect: true,
1717
sendIfStopped: false,
18-
doNotBufferWhileConnecting: false
18+
bufferWhileConnecting: true
1919
}
2020

2121
export interface ITransport {

packages/dht/src/transport/RoutingRpcCommunicator.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,11 +47,11 @@ export class RoutingRpcCommunicator extends RpcCommunicator<DhtCallContext> {
4747
// TODO maybe this options could be removed?
4848
sendIfStopped: true,
4949
// Responses should be buffered if necessary
50-
doNotBufferWhileConnecting: false
50+
bufferWhileConnecting: true
5151
} : {
5252
connect: callContext?.connect ?? DEFAULT_SEND_OPTIONS.connect,
5353
sendIfStopped: callContext?.sendIfStopped ?? DEFAULT_SEND_OPTIONS.sendIfStopped,
54-
doNotBufferWhileConnecting: callContext?.doNotBufferWhileConnecting ?? DEFAULT_SEND_OPTIONS.doNotBufferWhileConnecting
54+
bufferWhileConnecting: callContext?.bufferWhileConnecting ?? DEFAULT_SEND_OPTIONS.bufferWhileConnecting
5555
}
5656
return sendFn(message, sendOpts)
5757
})

packages/dht/test/integration/ConnectionManager.test.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -523,7 +523,7 @@ describe('ConnectionManager', () => {
523523

524524
})
525525

526-
it('send doNotBufferWhileConnecting', async () => {
526+
it('send bufferWhileConnecting as false', async () => {
527527
const connectionManager1 = createConnectionManager({
528528
transport: mockTransport,
529529
websocketHost: '127.0.0.1',
@@ -572,7 +572,7 @@ describe('ConnectionManager', () => {
572572
const sendOptions = {
573573
connect: true,
574574
sendIfStopped: false,
575-
doNotBufferWhileConnecting: true
575+
bufferWhileConnecting: false
576576
}
577577
await Promise.all([connectedPromise1, connectedPromise2, connectionManager2.send(msg, sendOptions)])
578578
await wait(1000)

packages/sdk/src/Config.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,16 @@ export interface NetworkNodeConfig {
183183
*/
184184
streamPartitionMinPropagationTargets?: number
185185

186+
/**
187+
* The maximum number of messages that can be buffered in the propagation buffer.
188+
*/
189+
streamPartitionMaxPropagationBufferSize?: number
190+
191+
/**
192+
* Whether to buffer while connecting in the content delivery layer.
193+
*/
194+
contentDeliveryBufferWhileConnecting?: boolean
195+
186196
/**
187197
* Whether to accept proxy connections. Enabling this option allows
188198
* this network node to act as proxy on behalf of other nodes / clients.

packages/sdk/src/NetworkNodeFacade.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import {
1717
import { Logger, MetricsContext, StreamPartID, StreamPartIDUtils, UserID } from '@streamr/utils'
1818
import EventEmitter from 'eventemitter3'
1919
import pull from 'lodash/pull'
20+
import omit from 'lodash/omit'
2021
import { Lifecycle, inject, scoped } from 'tsyringe'
2122
import { Identity, IdentityInjectionToken } from './identity/Identity'
2223
import { ConfigInjectionToken, NetworkPeerDescriptor, StrictStreamrClientConfig } from './Config'
@@ -140,7 +141,10 @@ export class NetworkNodeFacade {
140141
? this.config.network.controlLayer.websocketPortRange
141142
: undefined
142143
},
143-
networkNode: this.config.network.node,
144+
networkNode: {
145+
...omit(this.config.network.node, 'contentDeliveryBufferWhileConnecting'),
146+
bufferWhileConnecting: this.config.network.node.contentDeliveryBufferWhileConnecting
147+
},
144148
metricsContext: new MetricsContext()
145149
}
146150
}

packages/sdk/src/config.schema.json

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -275,6 +275,14 @@
275275
"type": "number",
276276
"default": 2
277277
},
278+
"streamPartitionMaxPropagationBufferSize": {
279+
"type": "number",
280+
"default": 150
281+
},
282+
"contentDeliveryBufferWhileConnecting": {
283+
"type": "boolean",
284+
"default": true
285+
},
278286
"acceptProxyConnections": {
279287
"type": "boolean",
280288
"default": false

0 commit comments

Comments
 (0)