Skip to content

Commit 8650786

Browse files
committed
Rent buffers used in SFTP reads
An SFTP download performs several reads from the server in parallel, allocating an array to store each result until it's ready to be consumed. Since these buffers are short-lived and normally of the same large-ish size (32KB), it seems like a good candidate for pooling.
1 parent 4e02502 commit 8650786

File tree

11 files changed

+162
-33
lines changed

11 files changed

+162
-33
lines changed

src/Renci.SshNet/Common/Extensions.cs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -417,6 +417,14 @@ async Task<T> WaitCore()
417417
return await completedTask.ConfigureAwait(false);
418418
}
419419
}
420+
421+
extension(Task t)
422+
{
423+
internal bool IsCompletedSuccessfully
424+
{
425+
get { return t.Status == TaskStatus.RanToCompletion; }
426+
}
427+
}
420428
#endif
421429
}
422430
}
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
#nullable enable
2+
using System;
3+
using System.Buffers;
4+
using System.Diagnostics;
5+
using System.Net;
6+
7+
namespace Renci.SshNet.Common
8+
{
9+
/// <summary>
10+
/// A type representing ownership of a rented, read-only buffer.
11+
/// </summary>
12+
internal sealed class ReadOnlyMemoryOwner : IMemoryOwner<byte>
13+
{
14+
private ArrayBuffer _buffer;
15+
16+
public ReadOnlyMemoryOwner(ArrayBuffer buffer)
17+
{
18+
_buffer = buffer;
19+
20+
AssertValid();
21+
}
22+
23+
[Conditional("DEBUG")]
24+
private void AssertValid()
25+
{
26+
Debug.Assert(
27+
_buffer.ActiveLength > 0 || _buffer.AvailableLength == 0,
28+
"If the buffer is empty, then it should have been returned to the pool.");
29+
}
30+
31+
public int Length
32+
{
33+
get
34+
{
35+
AssertValid();
36+
return _buffer.ActiveLength;
37+
}
38+
}
39+
40+
public bool IsEmpty
41+
{
42+
get
43+
{
44+
AssertValid();
45+
return _buffer.ActiveLength == 0;
46+
}
47+
}
48+
49+
public ReadOnlySpan<byte> Span
50+
{
51+
get
52+
{
53+
AssertValid();
54+
return _buffer.ActiveReadOnlySpan;
55+
}
56+
}
57+
58+
Memory<byte> IMemoryOwner<byte>.Memory
59+
{
60+
get
61+
{
62+
AssertValid();
63+
return _buffer.ActiveMemory;
64+
}
65+
}
66+
67+
public void Slice(int start)
68+
{
69+
AssertValid();
70+
71+
_buffer.Discard(start);
72+
73+
if (_buffer.ActiveLength == 0)
74+
{
75+
// Return the rented buffer as soon as it's no longer in use.
76+
_buffer.ClearAndReturnBuffer();
77+
}
78+
}
79+
80+
public void Dispose()
81+
{
82+
AssertValid();
83+
84+
_buffer.ClearAndReturnBuffer();
85+
}
86+
}
87+
}

src/Renci.SshNet/Sftp/ISftpSession.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
using System.Threading;
44
using System.Threading.Tasks;
55

6+
using Renci.SshNet.Common;
67
using Renci.SshNet.Sftp.Responses;
78

89
namespace Renci.SshNet.Sftp
@@ -198,7 +199,7 @@ internal interface ISftpSession : ISubsystemSession
198199
/// its <see cref="Task{Task}.Result"/> contains the data read from the file, or an empty
199200
/// array when the end of the file is reached.
200201
/// </returns>
201-
Task<byte[]> RequestReadAsync(byte[] handle, ulong offset, uint length, CancellationToken cancellationToken);
202+
Task<ReadOnlyMemoryOwner> RequestReadAsync(byte[] handle, ulong offset, uint length, CancellationToken cancellationToken);
202203

203204
/// <summary>
204205
/// Performs a <c>SSH_FXP_READDIR</c> request.
Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1-
namespace Renci.SshNet.Sftp.Responses
1+
using System;
2+
3+
namespace Renci.SshNet.Sftp.Responses
24
{
35
internal sealed class SftpDataResponse : SftpResponse
46
{
@@ -7,7 +9,7 @@ public override SftpMessageTypes SftpMessageType
79
get { return SftpMessageTypes.Data; }
810
}
911

10-
public byte[] Data { get; set; }
12+
public ArraySegment<byte> Data { get; set; }
1113

1214
public SftpDataResponse(uint protocolVersion)
1315
: base(protocolVersion)
@@ -18,14 +20,14 @@ protected override void LoadData()
1820
{
1921
base.LoadData();
2022

21-
Data = ReadBinary();
23+
Data = ReadBinarySegment();
2224
}
2325

2426
protected override void SaveData()
2527
{
2628
base.SaveData();
2729

28-
WriteBinary(Data, 0, Data.Length);
30+
WriteBinary(Data.Array, Data.Offset, Data.Count);
2931
}
3032
}
3133
}

src/Renci.SshNet/Sftp/SftpFileReader.cs

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,7 @@
66
using System.Threading;
77
using System.Threading.Tasks;
88

9-
#if !NET
109
using Renci.SshNet.Common;
11-
#endif
1210

1311
namespace Renci.SshNet.Sftp
1412
{
@@ -58,7 +56,7 @@ public SftpFileReader(byte[] handle, ISftpSession sftpSession, int chunkSize, lo
5856
_cts = new CancellationTokenSource();
5957
}
6058

61-
public async Task<byte[]> ReadAsync(CancellationToken cancellationToken)
59+
public async Task<ReadOnlyMemoryOwner> ReadAsync(CancellationToken cancellationToken)
6260
{
6361
_exception?.Throw();
6462

@@ -172,14 +170,21 @@ public void Dispose()
172170

173171
if (_requests.Count > 0)
174172
{
175-
// Cancel outstanding requests and observe the exception on them
176-
// as an effort to prevent unhandled exceptions.
177-
178173
_cts.Cancel();
179174

180175
foreach (var request in _requests.Values)
181176
{
182-
_ = request.Task.Exception;
177+
// Return rented buffers to the pool, or observe exception on
178+
// the task as an effort to prevent unhandled exceptions.
179+
180+
if (request.Task.IsCompletedSuccessfully)
181+
{
182+
request.Task.GetAwaiter().GetResult().Dispose();
183+
}
184+
else
185+
{
186+
_ = request.Task.Exception;
187+
}
183188
}
184189

185190
_requests.Clear();
@@ -190,7 +195,7 @@ public void Dispose()
190195

191196
private sealed class Request
192197
{
193-
public Request(ulong offset, uint count, Task<byte[]> task)
198+
public Request(ulong offset, uint count, Task<ReadOnlyMemoryOwner> task)
194199
{
195200
Offset = offset;
196201
Count = count;
@@ -199,7 +204,7 @@ public Request(ulong offset, uint count, Task<byte[]> task)
199204

200205
public ulong Offset { get; }
201206
public uint Count { get; }
202-
public Task<byte[]> Task { get; }
207+
public Task<ReadOnlyMemoryOwner> Task { get; }
203208
}
204209
}
205210
}

src/Renci.SshNet/Sftp/SftpFileStream.cs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ public sealed partial class SftpFileStream : Stream
2626
private readonly int _readBufferSize;
2727

2828
private SftpFileReader? _sftpFileReader;
29-
private ReadOnlyMemory<byte> _readBuffer;
29+
private ReadOnlyMemoryOwner _readBuffer;
3030
private System.Net.ArrayBuffer _writeBuffer;
3131

3232
private long _position;
@@ -153,6 +153,7 @@ private SftpFileStream(
153153
_readBufferSize = readBufferSize;
154154
_position = position;
155155
_writeBuffer = new System.Net.ArrayBuffer(writeBufferSize);
156+
_readBuffer = new ReadOnlyMemoryOwner(new System.Net.ArrayBuffer(0, usePool: true));
156157
_sftpFileReader = initialReader;
157158
}
158159

@@ -390,7 +391,7 @@ await _session.RequestWriteAsync(
390391

391392
private void InvalidateReads()
392393
{
393-
_readBuffer = ReadOnlyMemory<byte>.Empty;
394+
_readBuffer.Dispose();
394395
_sftpFileReader?.Dispose();
395396
_sftpFileReader = null;
396397
}
@@ -441,7 +442,7 @@ private int Read(Span<byte> buffer)
441442
var bytesRead = Math.Min(buffer.Length, _readBuffer.Length);
442443

443444
_readBuffer.Span.Slice(0, bytesRead).CopyTo(buffer);
444-
_readBuffer = _readBuffer.Slice(bytesRead);
445+
_readBuffer.Slice(bytesRead);
445446

446447
_position += bytesRead;
447448

@@ -494,8 +495,8 @@ private async ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken ca
494495

495496
var bytesRead = Math.Min(buffer.Length, _readBuffer.Length);
496497

497-
_readBuffer.Slice(0, bytesRead).CopyTo(buffer);
498-
_readBuffer = _readBuffer.Slice(bytesRead);
498+
_readBuffer.Span.Slice(0, bytesRead).CopyTo(buffer.Span);
499+
_readBuffer.Slice(bytesRead);
499500

500501
_position += bytesRead;
501502

@@ -649,7 +650,7 @@ public override long Seek(long offset, SeekOrigin origin)
649650

650651
if (readBufferStart <= newPosition && newPosition <= readBufferEnd)
651652
{
652-
_readBuffer = _readBuffer.Slice((int)(newPosition - readBufferStart));
653+
_readBuffer.Slice((int)(newPosition - readBufferStart));
653654
}
654655
else
655656
{

src/Renci.SshNet/Sftp/SftpSession.cs

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
using System.Collections.Generic;
44
using System.Diagnostics;
55
using System.Globalization;
6+
using System.Net;
67
using System.Text;
78
using System.Threading;
89
using System.Threading.Tasks;
@@ -24,7 +25,7 @@ internal sealed class SftpSession : SubsystemSession, ISftpSession
2425
private readonly Dictionary<uint, SftpRequest> _requests = new Dictionary<uint, SftpRequest>();
2526
private readonly ISftpResponseFactory _sftpResponseFactory;
2627
private readonly Encoding _encoding;
27-
private System.Net.ArrayBuffer _buffer = new(32 * 1024);
28+
private ArrayBuffer _buffer = new(32 * 1024);
2829
private EventWaitHandle _sftpVersionConfirmed = new AutoResetEvent(initialState: false);
2930
private IDictionary<string, string> _supportedExtensions;
3031

@@ -495,7 +496,7 @@ public byte[] RequestRead(byte[] handle, ulong offset, uint length)
495496
length,
496497
response =>
497498
{
498-
data = response.Data;
499+
data = response.Data.ToArray();
499500
wait.SetIgnoringObjectDisposed();
500501
},
501502
response =>
@@ -526,28 +527,42 @@ public byte[] RequestRead(byte[] handle, ulong offset, uint length)
526527
}
527528

528529
/// <inheritdoc/>
529-
public Task<byte[]> RequestReadAsync(byte[] handle, ulong offset, uint length, CancellationToken cancellationToken)
530+
public Task<ReadOnlyMemoryOwner> RequestReadAsync(byte[] handle, ulong offset, uint length, CancellationToken cancellationToken)
530531
{
531532
Debug.Assert(length > 0, "This implementation cannot distinguish between EOF and zero-length reads");
532533

533534
if (cancellationToken.IsCancellationRequested)
534535
{
535-
return Task.FromCanceled<byte[]>(cancellationToken);
536+
return Task.FromCanceled<ReadOnlyMemoryOwner>(cancellationToken);
536537
}
537538

538-
var tcs = new TaskCompletionSource<byte[]>(TaskCreationOptions.RunContinuationsAsynchronously);
539+
var tcs = new TaskCompletionSource<ReadOnlyMemoryOwner>(TaskCreationOptions.RunContinuationsAsynchronously);
539540

540541
SendRequest(new SftpReadRequest(ProtocolVersion,
541542
NextRequestId,
542543
handle,
543544
offset,
544545
length,
545-
response => tcs.TrySetResult(response.Data),
546+
response =>
547+
{
548+
ArrayBuffer buffer = new(response.Data.Count, usePool: true);
549+
550+
response.Data.AsSpan().CopyTo(buffer.AvailableSpan);
551+
552+
buffer.Commit(response.Data.Count);
553+
554+
ReadOnlyMemoryOwner owner = new(buffer);
555+
556+
if (!tcs.TrySetResult(owner))
557+
{
558+
owner.Dispose();
559+
}
560+
},
546561
response =>
547562
{
548563
if (response.StatusCode == StatusCode.Eof)
549564
{
550-
_ = tcs.TrySetResult(Array.Empty<byte>());
565+
_ = tcs.TrySetResult(new(new(0, usePool: true)));
551566
}
552567
else
553568
{

test/Renci.SshNet.Tests/Classes/Sftp/Responses/SftpDataResponseTest.cs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ public void Constructor()
3232
{
3333
var target = new SftpDataResponse(_protocolVersion);
3434

35-
Assert.IsNull(target.Data);
35+
Assert.AreEqual(default, target.Data);
3636
Assert.AreEqual(_protocolVersion, target.ProtocolVersion);
3737
Assert.AreEqual((uint)0, target.ResponseId);
3838
Assert.AreEqual(SftpMessageTypes.Data, target.SftpMessageType);
@@ -52,7 +52,6 @@ public void Load()
5252

5353
target.Load(sshData);
5454

55-
Assert.IsNotNull(target.Data);
5655
Assert.IsTrue(target.Data.SequenceEqual(_data));
5756
Assert.AreEqual(_protocolVersion, target.ProtocolVersion);
5857
Assert.AreEqual(_responseId, target.ResponseId);

test/Renci.SshNet.Tests/Classes/Sftp/SftpDataResponseBuilder.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1-
using Renci.SshNet.Sftp.Responses;
1+
using System;
2+
3+
using Renci.SshNet.Sftp.Responses;
24

35
namespace Renci.SshNet.Tests.Classes.Sftp
46
{
@@ -31,7 +33,7 @@ public SftpDataResponse Build()
3133
return new SftpDataResponse(_protocolVersion)
3234
{
3335
ResponseId = _responseId,
34-
Data = _data
36+
Data = new ArraySegment<byte>(_data)
3537
};
3638
}
3739
}

0 commit comments

Comments
 (0)