Skip to content

Commit 657d485

Browse files
authored
Merge pull request #587 from AArnott/fix578
Fix hang in nested multiplexing stream
2 parents 6f686a7 + d3beea2 commit 657d485

File tree

2 files changed

+49
-21
lines changed

2 files changed

+49
-21
lines changed

src/nerdbank-streams/src/MultiplexingStream.ts

Lines changed: 9 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,6 @@ export abstract class MultiplexingStream implements IDisposableObservable {
3737
/** The default window size used for new channels that do not specify a value for ChannelOptions.ChannelReceivingWindowSize. */
3838
readonly defaultChannelReceivingWindowSize: number
3939

40-
protected readonly formatter: MultiplexingStreamFormatter
41-
4240
protected get disposalToken() {
4341
return this.disposalTokenSource.token
4442
}
@@ -99,7 +97,7 @@ export abstract class MultiplexingStream implements IDisposableObservable {
9997
const handshakeResult = await formatter.readHandshakeAsync(writeHandshakeData, cancellationToken)
10098
formatter.isOdd = handshakeResult.isOdd
10199

102-
return new MultiplexingStreamClass(stream, handshakeResult.isOdd, options)
100+
return new MultiplexingStreamClass(formatter, handshakeResult.isOdd, options)
103101
}
104102

105103
/**
@@ -120,7 +118,7 @@ export abstract class MultiplexingStream implements IDisposableObservable {
120118
throw new Error(`Protocol major version ${options.protocolMajorVersion} is not supported. Try CreateAsync instead.`)
121119
}
122120

123-
return new MultiplexingStreamClass(stream, undefined, options)
121+
return new MultiplexingStreamClass(formatter, undefined, options)
124122
}
125123

126124
/**
@@ -176,22 +174,13 @@ export abstract class MultiplexingStream implements IDisposableObservable {
176174

177175
private disposalTokenSource = CancellationToken.create()
178176

179-
protected constructor(stream: NodeJS.ReadWriteStream, private readonly isOdd: boolean | undefined, options: MultiplexingStreamOptions) {
177+
protected constructor(
178+
protected readonly formatter: MultiplexingStreamFormatter,
179+
private readonly isOdd: boolean | undefined,
180+
options: MultiplexingStreamOptions
181+
) {
180182
this.defaultChannelReceivingWindowSize = options.defaultChannelReceivingWindowSize ?? MultiplexingStream.recommendedDefaultChannelReceivingWindowSize
181183
this.protocolMajorVersion = options.protocolMajorVersion ?? 1
182-
const formatter: MultiplexingStreamFormatter | undefined =
183-
options.protocolMajorVersion === 1
184-
? new MultiplexingStreamV1Formatter(stream)
185-
: options.protocolMajorVersion === 2
186-
? new MultiplexingStreamV2Formatter(stream)
187-
: options.protocolMajorVersion === 3
188-
? new MultiplexingStreamV3Formatter(stream)
189-
: undefined
190-
if (formatter === undefined) {
191-
throw new Error(`Unsupported major protocol version: ${options.protocolMajorVersion}`)
192-
}
193-
formatter.isOdd = isOdd
194-
this.formatter = formatter
195184

196185
if (options.seededChannels) {
197186
for (let i = 0; i < options.seededChannels.length; i++) {
@@ -534,8 +523,8 @@ export class MultiplexingStreamClass extends MultiplexingStream {
534523
protected lastOfferedChannelId: number
535524
private readonly sendingSemaphore = new Semaphore(1)
536525

537-
constructor(stream: NodeJS.ReadWriteStream, isOdd: boolean | undefined, options: MultiplexingStreamOptions) {
538-
super(stream, isOdd, options)
526+
constructor(formatter: MultiplexingStreamFormatter, isOdd: boolean | undefined, options: MultiplexingStreamOptions) {
527+
super(formatter, isOdd, options)
539528

540529
this.lastOfferedChannelId = isOdd ? -1 : 0 // the first channel created should be 1 or 2
541530
this.lastOfferedChannelId += options.seededChannels?.length ?? 0

src/nerdbank-streams/src/tests/MultiplexingStream.spec.ts

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ import { nextTick } from 'process'
115115

116116
// Give time for the termination fram to arrive *before* we try to accept the channel.
117117
for (let i = 0; i < 100; i++) {
118-
await new Promise<void>(resolve => nextTick(() => resolve()))
118+
await nextTickAsync()
119119
}
120120

121121
// We expect this to timeout. But we need this for the test to fail if we have unobserved promise rejections.
@@ -323,6 +323,41 @@ import { nextTick } from 'process'
323323
await Promise.all([mx1Local.offerChannelAsync(''), mx2Local.acceptChannelAsync('')])
324324
})
325325
}
326+
327+
it('nested stream does not pause', async () => {
328+
const rpcChannels = await Promise.all([mx1.offerChannelAsync('test'), mx2.acceptChannelAsync('test')])
329+
330+
const inner1 = MultiplexingStream.Create(rpcChannels[0].stream, { protocolMajorVersion: 3 })
331+
const inner2 = MultiplexingStream.Create(rpcChannels[1].stream, { protocolMajorVersion: 3 })
332+
const innerRpcChannels = await Promise.all([inner1.offerChannelAsync('test'), inner2.acceptChannelAsync('test')])
333+
334+
const iterations = 32 // a high number to exceed high water mark levels in object streams
335+
336+
const fulfilled = new Promise<Buffer>(resolve => {
337+
const chunks: Buffer[] = []
338+
innerRpcChannels[1].stream.on('data', chunk => {
339+
chunks.push(chunk)
340+
if (chunks.length === iterations) {
341+
resolve(Buffer.concat(chunks))
342+
}
343+
})
344+
})
345+
346+
for (let i = 0; i < iterations; i++) {
347+
innerRpcChannels[0].stream.write(Buffer.from([i]))
348+
}
349+
350+
await fulfilled
351+
352+
innerRpcChannels[0].stream.end()
353+
innerRpcChannels[1].stream.end()
354+
inner1.dispose()
355+
inner2.dispose()
356+
await inner1.completion
357+
await inner2.completion
358+
rpcChannels[0].stream.end()
359+
rpcChannels[1].stream.end()
360+
})
326361
})
327362

328363
async function expectThrow<T>(promise: Promise<T>): Promise<any> {
@@ -334,3 +369,7 @@ import { nextTick } from 'process'
334369
}
335370
}
336371
})
372+
373+
function nextTickAsync() {
374+
return new Promise<void>(resolve => nextTick(() => resolve()))
375+
}

0 commit comments

Comments
 (0)