Skip to content

Commit 6778cce

Browse files
committed
Modify PipeStream to interrupt blocking read when stream is closed/disposed.
Added tests for PipeStream.
1 parent c6fca9d commit 6778cce

File tree

6 files changed

+314
-3
lines changed

6 files changed

+314
-3
lines changed
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
using System;
2+
using System.Threading;
3+
using Microsoft.VisualStudio.TestTools.UnitTesting;
4+
using Renci.SshNet.Common;
5+
6+
namespace Renci.SshNet.Tests.Common
7+
{
8+
[TestClass]
9+
public class PipeStream_Close_BlockingRead
10+
{
11+
private PipeStream _pipeStream;
12+
private int _bytesRead;
13+
private IAsyncResult _asyncReadResult;
14+
15+
[TestInitialize]
16+
public void Init()
17+
{
18+
_pipeStream = new PipeStream();
19+
20+
_pipeStream.WriteByte(10);
21+
_pipeStream.WriteByte(13);
22+
_pipeStream.WriteByte(25);
23+
24+
_bytesRead = 123;
25+
26+
Action readAction = () => _bytesRead = _pipeStream.Read(new byte[4], 0, 4);
27+
_asyncReadResult = readAction.BeginInvoke(null, null);
28+
_asyncReadResult.AsyncWaitHandle.WaitOne(50);
29+
30+
Act();
31+
}
32+
33+
protected void Act()
34+
{
35+
_pipeStream.Close();
36+
}
37+
38+
[TestMethod]
39+
public void AsyncReadShouldHaveFinished()
40+
{
41+
Assert.IsTrue(_asyncReadResult.IsCompleted);
42+
}
43+
44+
[TestMethod]
45+
public void ReadShouldHaveReturnedZero()
46+
{
47+
Assert.AreEqual(0, _bytesRead);
48+
}
49+
}
50+
}
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
using System;
2+
using Microsoft.VisualStudio.TestTools.UnitTesting;
3+
using Renci.SshNet.Common;
4+
5+
namespace Renci.SshNet.Tests.Common
6+
{
7+
[TestClass]
8+
public class PipeStream_Close_BlockingWrite
9+
{
10+
private PipeStream _pipeStream;
11+
private Exception _writeException;
12+
private IAsyncResult _asyncWriteResult;
13+
14+
[TestInitialize]
15+
public void Init()
16+
{
17+
_pipeStream = new PipeStream {MaxBufferLength = 3};
18+
19+
Action writeAction = () =>
20+
{
21+
_pipeStream.WriteByte(10);
22+
_pipeStream.WriteByte(13);
23+
_pipeStream.WriteByte(25);
24+
25+
try
26+
{
27+
_pipeStream.WriteByte(35);
28+
}
29+
catch (Exception ex)
30+
{
31+
_writeException = ex;
32+
throw;
33+
}
34+
};
35+
_asyncWriteResult = writeAction.BeginInvoke(null, null);
36+
_asyncWriteResult.AsyncWaitHandle.WaitOne(50);
37+
38+
Act();
39+
}
40+
41+
protected void Act()
42+
{
43+
_pipeStream.Close();
44+
}
45+
46+
[TestMethod]
47+
public void AsyncWriteShouldHaveFinished()
48+
{
49+
Assert.IsTrue(_asyncWriteResult.IsCompleted);
50+
}
51+
52+
[TestMethod]
53+
public void WriteThatExceedsMaxBufferLengthShouldHaveThrownObjectDisposedException()
54+
{
55+
Assert.IsNotNull(_writeException);
56+
Assert.AreEqual(typeof (ObjectDisposedException), _writeException.GetType());
57+
}
58+
}
59+
}
Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
using System;
2+
using System.Threading;
3+
using Microsoft.VisualStudio.TestTools.UnitTesting;
4+
using Renci.SshNet.Common;
5+
6+
namespace Renci.SshNet.Tests.Common
7+
{
8+
[TestClass]
9+
public class PipeStream_Flush_BytesRemainingAfterRead
10+
{
11+
private PipeStream _pipeStream;
12+
private byte[] _readBuffer;
13+
private int _bytesRead;
14+
private IAsyncResult _asyncReadResult;
15+
16+
[TestInitialize]
17+
public void Init()
18+
{
19+
_pipeStream = new PipeStream();
20+
_pipeStream.WriteByte(10);
21+
_pipeStream.WriteByte(13);
22+
_pipeStream.WriteByte(15);
23+
_pipeStream.WriteByte(18);
24+
_pipeStream.WriteByte(23);
25+
_pipeStream.WriteByte(28);
26+
27+
_bytesRead = 0;
28+
_readBuffer = new byte[4];
29+
30+
Action readAction = () => _bytesRead = _pipeStream.Read(_readBuffer, 0, _readBuffer.Length);
31+
_asyncReadResult = readAction.BeginInvoke(null, null);
32+
_asyncReadResult.AsyncWaitHandle.WaitOne(50);
33+
34+
Act();
35+
}
36+
37+
protected void Act()
38+
{
39+
_pipeStream.Flush();
40+
}
41+
42+
[TestMethod]
43+
public void AsyncReadShouldHaveFinished()
44+
{
45+
Assert.IsTrue(_asyncReadResult.IsCompleted);
46+
}
47+
48+
[TestMethod]
49+
public void ReadShouldReturnNumberOfBytesAvailableThatAreWrittenToBuffer()
50+
{
51+
Assert.AreEqual(4, _bytesRead);
52+
}
53+
54+
[TestMethod]
55+
public void BytesAvailableInStreamShouldHaveBeenWrittenToBuffer()
56+
{
57+
Assert.AreEqual(10, _readBuffer[0]);
58+
Assert.AreEqual(13, _readBuffer[1]);
59+
Assert.AreEqual(15, _readBuffer[2]);
60+
Assert.AreEqual(18, _readBuffer[3]);
61+
}
62+
63+
[TestMethod]
64+
public void RemainingBytesCanBeRead()
65+
{
66+
var buffer = new byte[3];
67+
68+
var bytesRead = _pipeStream.Read(buffer, 0, 2);
69+
70+
Assert.AreEqual(2, bytesRead);
71+
Assert.AreEqual(23, buffer[0]);
72+
Assert.AreEqual(28, buffer[1]);
73+
Assert.AreEqual(0, buffer[2]);
74+
}
75+
76+
[TestMethod]
77+
public void ReadingMoreBytesThanAvailableDoesNotBlock()
78+
{
79+
var buffer = new byte[4];
80+
81+
var bytesRead = _pipeStream.Read(buffer, 0, buffer.Length);
82+
83+
Assert.AreEqual(2, bytesRead);
84+
Assert.AreEqual(23, buffer[0]);
85+
Assert.AreEqual(28, buffer[1]);
86+
Assert.AreEqual(0, buffer[2]);
87+
Assert.AreEqual(0, buffer[3]);
88+
}
89+
90+
[TestMethod]
91+
public void WriteCausesSubsequentReadToBlockUntilRequestedNumberOfBytesAreAvailable()
92+
{
93+
_pipeStream.WriteByte(32);
94+
95+
var buffer = new byte[4];
96+
int bytesRead = int.MaxValue;
97+
98+
Thread readThread = new Thread(() =>
99+
{
100+
bytesRead = _pipeStream.Read(buffer, 0, buffer.Length);
101+
});
102+
readThread.Start();
103+
104+
Assert.IsFalse(readThread.Join(500));
105+
readThread.Abort();
106+
107+
Assert.AreEqual(int.MaxValue, bytesRead);
108+
Assert.AreEqual(0, buffer[0]);
109+
Assert.AreEqual(0, buffer[1]);
110+
Assert.AreEqual(0, buffer[2]);
111+
Assert.AreEqual(0, buffer[3]);
112+
}
113+
}
114+
}
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
using System;
2+
using Microsoft.VisualStudio.TestTools.UnitTesting;
3+
using Renci.SshNet.Common;
4+
5+
namespace Renci.SshNet.Tests.Common
6+
{
7+
[TestClass]
8+
public class PipeStream_Flush_NoBytesRemainingAfterRead
9+
{
10+
private PipeStream _pipeStream;
11+
private byte[] _readBuffer;
12+
private int _bytesRead;
13+
private IAsyncResult _asyncReadResult;
14+
15+
[TestInitialize]
16+
public void Init()
17+
{
18+
_pipeStream = new PipeStream();
19+
_pipeStream.WriteByte(10);
20+
_pipeStream.WriteByte(13);
21+
22+
_bytesRead = 0;
23+
_readBuffer = new byte[4];
24+
25+
Action readAction = () => _bytesRead = _pipeStream.Read(_readBuffer, 0, _readBuffer.Length);
26+
_asyncReadResult = readAction.BeginInvoke(null, null);
27+
_asyncReadResult.AsyncWaitHandle.WaitOne(50);
28+
29+
Act();
30+
}
31+
32+
protected void Act()
33+
{
34+
_pipeStream.Flush();
35+
}
36+
37+
[TestMethod]
38+
public void AsyncReadShouldHaveFinished()
39+
{
40+
Assert.IsTrue(_asyncReadResult.IsCompleted);
41+
}
42+
43+
[TestMethod]
44+
public void ReadShouldReturnNumberOfBytesAvailableThatAreWrittenToBuffer()
45+
{
46+
Assert.AreEqual(2, _bytesRead);
47+
}
48+
49+
[TestMethod]
50+
public void BytesAvailableInStreamShouldHaveBeenWrittenToBuffer()
51+
{
52+
Assert.AreEqual(10, _readBuffer[0]);
53+
Assert.AreEqual(13, _readBuffer[1]);
54+
Assert.AreEqual(0, _readBuffer[2]);
55+
Assert.AreEqual(0, _readBuffer[3]);
56+
}
57+
}
58+
}

src/Renci.SshNet.Tests/Renci.SshNet.Tests.csproj

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -441,6 +441,10 @@
441441
<Compile Include="Common\AsyncSocketListener.cs" />
442442
<Compile Include="Common\HttpProxyStub.cs" />
443443
<Compile Include="Common\HttpRequest.cs" />
444+
<Compile Include="Common\PipeStream_Close_BlockingRead.cs" />
445+
<Compile Include="Common\PipeStream_Close_BlockingWrite.cs" />
446+
<Compile Include="Common\PipeStream_Flush_BytesRemainingAfterRead.cs" />
447+
<Compile Include="Common\PipeStream_Flush_NoBytesRemainingAfterRead.cs" />
444448
<Compile Include="Common\TestBase.cs" />
445449
<Compile Include="Classes\Compression\CompressorTest.cs" />
446450
<Compile Include="Classes\Common\DerDataTest.cs" />

src/Renci.SshNet/Common/PipeStream.cs

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -122,14 +122,21 @@ public bool BlockLastReadBuffer
122122
/// </summary>
123123
/// <exception cref="IOException">An I/O error occurs.</exception>
124124
/// <exception cref="ObjectDisposedException">Methods were called after the stream was closed.</exception>
125+
/// <remarks>
126+
/// Once flushed, any subsequent read operations no longer block until requested bytes are available. Any write operation reactivates blocking
127+
/// reads.
128+
/// </remarks>
125129
public override void Flush()
126130
{
127131
if (_isDisposed)
128132
throw CreateObjectDisposedException();
129133

130134
_isFlushed = true;
131135
lock (_buffer)
136+
{
137+
// unblock read hereby allowing buffer to be partially filled
132138
Monitor.Pulse(_buffer);
139+
}
133140
}
134141

135142
/// <summary>
@@ -160,7 +167,7 @@ public override void SetLength(long value)
160167
///When overridden in a derived class, reads a sequence of bytes from the current stream and advances the position within the stream by the number of bytes read.
161168
///</summary>
162169
///<returns>
163-
///The total number of bytes read into the buffer. This can be less than the number of bytes requested if that many bytes are not currently available, or zero if the end of the stream has been reached.
170+
///The total number of bytes read into the buffer. This can be less than the number of bytes requested if that many bytes are not currently available, or zero if the stream is closed or end of the stream has been reached.
164171
///</returns>
165172
///<param name="offset">The zero-based byte offset in buffer at which to begin storing the data read from the current stream.</param>
166173
///<param name="count">The maximum number of bytes to be read from the current stream.</param>
@@ -192,8 +199,16 @@ public override int Read(byte[] buffer, int offset, int count)
192199

193200
lock (_buffer)
194201
{
195-
while (!ReadAvailable(count))
202+
while (!_isDisposed && !ReadAvailable(count))
203+
{
196204
Monitor.Wait(_buffer);
205+
}
206+
207+
// return zero when the read is interrupted by a close/dispose of the stream
208+
if (_isDisposed)
209+
{
210+
return 0;
211+
}
197212

198213
// fill the read buffer
199214
for (; readLength < count && Length > 0 && _buffer.Count > 0; readLength++)
@@ -203,6 +218,7 @@ public override int Read(byte[] buffer, int offset, int count)
203218

204219
Monitor.Pulse(_buffer);
205220
}
221+
206222
return readLength;
207223
}
208224

@@ -264,11 +280,21 @@ public override void Write(byte[] buffer, int offset, int count)
264280
/// Releases the unmanaged resources used by the Stream and optionally releases the managed resources.
265281
/// </summary>
266282
/// <param name="disposing"><c>true</c> to release both managed and unmanaged resources; <c>false</c> to release only unmanaged resources.</param>
283+
/// <remarks>
284+
/// Disposing a <see cref="PipeStream"/> will interrupt blocking read and write operations.
285+
/// </remarks>
267286
protected override void Dispose(bool disposing)
268287
{
269288
base.Dispose(disposing);
270289

271-
_isDisposed = true;
290+
if (!_isDisposed)
291+
{
292+
lock (_buffer)
293+
{
294+
_isDisposed = true;
295+
Monitor.Pulse(_buffer);
296+
}
297+
}
272298
}
273299

274300
///<summary>

0 commit comments

Comments
 (0)