Skip to content

Commit 0584d18

Browse files
authored
Merge pull request #768 from dotnet/fixes
Fix node.js stream blockage in mxstream channels
2 parents 1e38407 + 82b8983 commit 0584d18

File tree

3 files changed

+67
-5
lines changed

3 files changed

+67
-5
lines changed

src/nerdbank-streams/src/Channel.ts

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -216,11 +216,20 @@ export class ChannelClass extends Channel {
216216
}
217217

218218
public onContent(buffer: Buffer | null) {
219+
const priorReadableFlowing = this._duplex.readableFlowing
220+
219221
this._duplex.push(buffer)
220222

223+
// Large buffer pushes can switch a stream from flowing to non-flowing
224+
// when it meets or exceeds the highWaterMark. We need to resume the stream
225+
// in this case so that the user can continue to receive data.
226+
if (priorReadableFlowing && this._duplex.readableFlowing === false) {
227+
this._duplex.resume()
228+
}
229+
221230
// We should find a way to detect when we *actually* share the received buffer with the Channel's user
222231
// and only report consumption when they receive the buffer from us so that we effectively apply
223-
// backpressure to the remote party based on our user's actual consumption rather than keep allocating memory.
232+
// backpressure to the remote party based on our user's actual consumption rather than continually allocating memory.
224233
if (this._multiplexingStream.backpressureSupportEnabled && buffer) {
225234
this._multiplexingStream.localContentExamined(this, buffer.length)
226235
}

src/nerdbank-streams/src/Utilities.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -92,12 +92,12 @@ export async function getBufferFrom(
9292
throw new Error('Stream terminated before required bytes were read.')
9393
}
9494

95-
// Returns what has been read so far
95+
// Returns what has been read so far.
9696
if (readBuffer === null) {
9797
return null
9898
}
9999

100-
// we need trim extra spaces
100+
// We need to trim the trailing space.
101101
return readBuffer.subarray(0, index)
102102
}
103103

@@ -116,11 +116,11 @@ export async function getBufferFrom(
116116

117117
if (readBuffer === null) {
118118
if (availableSize === size || newBuffer.length < availableSize) {
119-
// in the fast pass, we read the entire data once, and donot allocate an extra array.
119+
// In the fast pass, we read the entire data once, and do not allocate an extra array.
120120
return newBuffer
121121
}
122122

123-
// if we read partial data, we need allocate a buffer to join all data together.
123+
// If we read partial data, we need to allocate a buffer to join all data together.
124124
readBuffer = Buffer.alloc(size)
125125
}
126126

src/nerdbank-streams/src/tests/MultiplexingStream.spec.ts

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,59 @@ import { Channel } from '../Channel'
88
import CancellationToken from 'cancellationtoken'
99
import * as assert from 'assert'
1010
import { nextTick } from 'process'
11+
import { Duplex } from 'stream'
12+
13+
it('highWatermark threshold does not clog', async () => {
14+
// Brokered service
15+
let bytesToReceive = 0
16+
let receivedAllBytes = new Deferred()
17+
function receiver(pipe: Duplex) {
18+
let lengths: number[] = []
19+
pipe.on('data', (data: Buffer) => {
20+
lengths.push(data.length)
21+
22+
bytesToReceive -= data.length
23+
// console.log(`recv ${data.length}. ${bytesToReceive} remaining`)
24+
if (bytesToReceive <= 0) {
25+
receivedAllBytes.resolve(undefined)
26+
}
27+
})
28+
}
29+
30+
// IServiceBroker
31+
const { first: localServicePipe, second: servicePipe } = FullDuplexStream.CreatePair()
32+
receiver(localServicePipe)
33+
34+
// MultiplexingStreamServiceBroker
35+
const simulatedMxStream = FullDuplexStream.CreatePair()
36+
const [mx1, mx2] = await Promise.all([MultiplexingStream.CreateAsync(simulatedMxStream.first), MultiplexingStream.CreateAsync(simulatedMxStream.second)])
37+
const [local, remote] = await Promise.all([mx1.offerChannelAsync(''), mx2.acceptChannelAsync('')])
38+
servicePipe.pipe(local.stream)
39+
local.stream.pipe(servicePipe)
40+
41+
global.test_servicePipe = servicePipe
42+
global.test_d = local.stream
43+
global.test_localServicePipe = localServicePipe
44+
45+
// brokered service client
46+
function writeHelper(buffer: Buffer): boolean {
47+
bytesToReceive += buffer.length
48+
const result = remote.stream.write(buffer)
49+
// console.log('written', buffer.length, result)
50+
return result
51+
}
52+
for (let i = 15; i < 20; i++) {
53+
const buffer = Buffer.alloc(i * 1024)
54+
writeHelper(buffer)
55+
await nextTickAsync()
56+
writeHelper(Buffer.alloc(10))
57+
await nextTickAsync()
58+
}
59+
60+
if (bytesToReceive > 0) {
61+
await receivedAllBytes.promise
62+
}
63+
})
1164
;[1, 2, 3].forEach(protocolMajorVersion => {
1265
describe(`MultiplexingStream v${protocolMajorVersion}`, () => {
1366
let mx1: MultiplexingStream

0 commit comments

Comments
 (0)