Skip to content

Commit f59f5d9

Browse files
authored
Merge pull request #10 from xledger/net48-support
Implements Stream.ToOwnedMemory for .NET Framework.
2 parents 920f063 + d6d41f0 commit f59f5d9

File tree

3 files changed

+132
-38
lines changed

3 files changed

+132
-38
lines changed

Xledger.Collections.Test/TestMemoryOwner.cs

Lines changed: 68 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,15 +23,22 @@ public void TestSlice() {
2323
Assert.Equal(array.AsMemory().Slice(3).Slice(1, 4), sliced2.Memory);
2424
}
2525

26-
#if NET
2726
[Fact]
2827
public void TestStream_ToMemoryOwner() {
2928
// This length should be larger than the default GetCopyBufferSize.
3029
byte[] array = new byte[4 * 1024 * 1024 + 3];
3130
new Random().NextBytes(array);
3231
var ms = new MemoryStream(array);
3332
using var memoryOwner = ms.ToOwnedMemory();
33+
#if NET
3434
Assert.Equal(array.AsMemory(), memoryOwner.Memory);
35+
#else
36+
var mem = memoryOwner.Memory;
37+
Assert.Equal(array.Length, mem.Length);
38+
for (int i = 0; i < array.Length; ++i) {
39+
Assert.Equal(array[i], mem.Span[i]);
40+
}
41+
#endif
3542
}
3643

3744
[Fact]
@@ -41,7 +48,67 @@ public async Task TestStream_ToMemoryOwnerAsync() {
4148
new Random().NextBytes(array);
4249
var ms = new MemoryStream(array);
4350
using var memoryOwner = await ms.ToOwnedMemoryAsync();
51+
#if NET
4452
Assert.Equal(array.AsMemory(), memoryOwner.Memory);
53+
#else
54+
var mem = memoryOwner.Memory;
55+
Assert.Equal(array.Length, mem.Length);
56+
for (int i = 0; i < array.Length; ++i) {
57+
Assert.Equal(array[i], mem.Span[i]);
58+
}
59+
#endif
4560
}
61+
62+
// Tests that reading from a stream with an unknown size does so correctly.
63+
[Fact]
64+
public void TestUnsizedStream_ToMemoryOwner() {
65+
// This length should be larger than the default GetCopyBufferSize.
66+
byte[] array = new byte[2 * 1024 * 1024 + 17];
67+
new Random().NextBytes(array);
68+
var ms = new UnsizedMemoryStream(array);
69+
using var memoryOwner = ms.ToOwnedMemory();
70+
#if NET
71+
Assert.Equal(array.AsMemory(), memoryOwner.Memory);
72+
#else
73+
var mem = memoryOwner.Memory;
74+
Assert.Equal(array.Length, mem.Length);
75+
for (int i = 0; i < array.Length; ++i) {
76+
Assert.Equal(array[i], mem.Span[i]);
77+
}
4678
#endif
79+
}
80+
81+
// Tests that reading from a stream with an unknown size does so correctly.
82+
[Fact]
83+
public async Task TestUnsizedStream_ToMemoryOwnerAsync() {
84+
// This length should be larger than the default GetCopyBufferSize.
85+
byte[] array = new byte[2 * 1024 * 1024 + 17];
86+
new Random().NextBytes(array);
87+
var ms = new UnsizedMemoryStream(array);
88+
using var memoryOwner = await ms.ToOwnedMemoryAsync();
89+
#if NET
90+
Assert.Equal(array.AsMemory(), memoryOwner.Memory);
91+
#else
92+
var mem = memoryOwner.Memory;
93+
Assert.Equal(array.Length, mem.Length);
94+
for (int i = 0; i < array.Length; ++i) {
95+
Assert.Equal(array[i], mem.Span[i]);
96+
}
97+
#endif
98+
}
99+
100+
class UnsizedMemoryStream(byte[] buffer) : MemoryStream(buffer) {
101+
public override bool CanSeek => false;
102+
103+
public override int Capacity {
104+
get => throw new NotImplementedException();
105+
set => throw new NotImplementedException();
106+
}
107+
108+
public override long Length => throw new NotImplementedException();
109+
110+
public override void SetLength(long value) {
111+
throw new NotImplementedException();
112+
}
113+
}
47114
}

Xledger.Collections/Memory/Extensions.cs

Lines changed: 63 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -24,51 +24,69 @@ public static IMemoryOwner<T> Slice<T>(this IMemoryOwner<T> memoryOwner, int sta
2424
return new SizedMemoryOwner<T>(memoryOwner, start, length);
2525
}
2626

27+
static readonly int ArrayMaxLength =
2728
#if NET
29+
Array.MaxLength;
30+
#else
31+
int.MaxValue;
32+
#endif
33+
34+
static readonly ThreadLocal<byte[]> PROBE = new(() => new byte[1]);
35+
2836
public static IMemoryOwner<byte> ToOwnedMemory(this Stream source, bool leaveOpen = false) {
2937
if (source == null) {
3038
throw new ArgumentNullException(nameof(source));
3139
}
3240

33-
int initialBufLen = GetCopyBufferSize(source);
41+
(bool canHoldEntireStream, int initialBufLen) = GetBufferSize(source);
3442

35-
var currentOwner = MemoryPool<byte>.Shared.Rent(initialBufLen);
36-
var currentBuffer = currentOwner.Memory;
43+
var currentBuffer = ArrayPool<byte>.Shared.Rent(initialBufLen);
44+
var currentOwner = currentBuffer.ToOwnedMemory(ArrayPool<byte>.Shared);
3745
int totalBytesRead = 0;
3846

3947
try {
4048
while (true) {
41-
var dest = currentBuffer.Slice(totalBytesRead);
42-
43-
int bytesRead = source.Read(dest.Span);
49+
int bytesRead = source.Read(
50+
currentBuffer,
51+
totalBytesRead,
52+
currentBuffer.Length - totalBytesRead);
4453

4554
if (bytesRead == 0) {
4655
break;
4756
}
4857

4958
totalBytesRead += bytesRead;
5059

60+
if (canHoldEntireStream && totalBytesRead == initialBufLen) {
61+
// We've read the entire stream.
62+
break;
63+
}
64+
5165
if (totalBytesRead != currentBuffer.Length) {
5266
continue;
5367
}
5468

55-
if (currentBuffer.Length == Array.MaxLength) {
69+
if (currentBuffer.Length == ArrayMaxLength) {
70+
#if NET
5671
Span<byte> probe = stackalloc byte[1];
5772
if (source.Read(probe) > 0) {
58-
throw new IOException($"Stream exceeds the maximum bufferable array size of {Array.MaxLength} bytes.");
73+
#else
74+
if (source.Read(PROBE.Value, 0, 1) > 0) {
75+
#endif
76+
throw new IOException($"Stream exceeds the maximum bufferable array size of {ArrayMaxLength} bytes.");
5977
}
6078
break; // we are at the end of the stream
6179
}
6280

6381
var newCapacity = (long)currentBuffer.Length * 2;
64-
if (newCapacity > Array.MaxLength) {
65-
newCapacity = Array.MaxLength;
82+
if (newCapacity > ArrayMaxLength) {
83+
newCapacity = ArrayMaxLength;
6684
}
6785

68-
var newOwner = MemoryPool<byte>.Shared.Rent((int)newCapacity);
69-
var newBuffer = newOwner.Memory;
86+
var newBuffer = ArrayPool<byte>.Shared.Rent((int)newCapacity);
87+
var newOwner = newBuffer.ToOwnedMemory(ArrayPool<byte>.Shared);
7088

71-
currentBuffer.CopyTo(newBuffer);
89+
currentBuffer.CopyTo(newBuffer.AsSpan());
7290
currentOwner.Dispose();
7391
currentOwner = newOwner;
7492
currentBuffer = newBuffer;
@@ -85,53 +103,58 @@ public static IMemoryOwner<byte> ToOwnedMemory(this Stream source, bool leaveOpe
85103
return currentOwner.Slice(0, totalBytesRead);
86104
}
87105

88-
static readonly byte[] ASYNC_PROBE = new byte[1];
89-
90106
public static async Task<IMemoryOwner<byte>> ToOwnedMemoryAsync(this Stream source, bool leaveOpen = false, CancellationToken tok = default) {
91107
if (source == null) {
92108
throw new ArgumentNullException(nameof(source));
93109
}
94110

95-
int initialBufLen = GetCopyBufferSize(source);
111+
(bool canHoldEntireStream, int initialBufLen) = GetBufferSize(source);
96112

97-
var currentOwner = MemoryPool<byte>.Shared.Rent(initialBufLen);
98-
var currentBuffer = currentOwner.Memory;
113+
var currentBuffer = ArrayPool<byte>.Shared.Rent(initialBufLen);
114+
var currentOwner = currentBuffer.ToOwnedMemory(ArrayPool<byte>.Shared);
99115
int totalBytesRead = 0;
100116

101117
try {
102118
while (true) {
103119
tok.ThrowIfCancellationRequested();
104120

105-
var dest = currentBuffer.Slice(totalBytesRead);
106-
107-
int bytesRead = await source.ReadAsync(dest, tok).ConfigureAwait(false);
121+
int bytesRead = await source.ReadAsync(
122+
currentBuffer,
123+
totalBytesRead,
124+
currentBuffer.Length - totalBytesRead,
125+
tok).ConfigureAwait(false);
108126

109127
if (bytesRead == 0) {
110128
break;
111129
}
112130

113131
totalBytesRead += bytesRead;
114132

133+
if (canHoldEntireStream && totalBytesRead == initialBufLen) {
134+
// We've read the entire stream.
135+
break;
136+
}
137+
115138
if (totalBytesRead != currentBuffer.Length) {
116139
continue;
117140
}
118141

119-
if (currentBuffer.Length == Array.MaxLength) {
120-
if (await source.ReadAsync(ASYNC_PROBE, tok).ConfigureAwait(false) > 0) {
121-
throw new IOException($"Stream exceeds the maximum bufferable array size of {Array.MaxLength} bytes.");
142+
if (currentBuffer.Length == ArrayMaxLength) {
143+
if (await source.ReadAsync(PROBE.Value, 0, 1, tok).ConfigureAwait(false) > 0) {
144+
throw new IOException($"Stream exceeds the maximum bufferable array size of {ArrayMaxLength} bytes.");
122145
}
123146
break; // we are at the end of the stream
124147
}
125148

126149
var newCapacity = (long)currentBuffer.Length * 2;
127-
if (newCapacity > Array.MaxLength) {
128-
newCapacity = Array.MaxLength;
150+
if (newCapacity > ArrayMaxLength) {
151+
newCapacity = ArrayMaxLength;
129152
}
130153

131-
var newOwner = MemoryPool<byte>.Shared.Rent((int)newCapacity);
132-
var newBuffer = newOwner.Memory;
154+
var newBuffer = ArrayPool<byte>.Shared.Rent((int)newCapacity);
155+
var newOwner = newBuffer.ToOwnedMemory(ArrayPool<byte>.Shared);
133156

134-
currentBuffer.CopyTo(newBuffer);
157+
currentBuffer.CopyTo(newBuffer.AsSpan());
135158
currentOwner.Dispose();
136159
currentOwner = newOwner;
137160
currentBuffer = newBuffer;
@@ -148,8 +171,9 @@ public static async Task<IMemoryOwner<byte>> ToOwnedMemoryAsync(this Stream sour
148171
return currentOwner.Slice(0, totalBytesRead);
149172
}
150173

151-
// Copied from System.IO.Stream, adapted to be static
152-
static int GetCopyBufferSize(Stream stream) {
174+
// Initially copied from System.IO.Stream, adapted to be static and to match
175+
// the use above which is to copy an entire stream into a single array.
176+
static (bool isSufficient, int length) GetBufferSize(Stream stream) {
153177
// This value was originally picked to be the largest multiple of 4096 that is still smaller than the large object heap threshold (85K).
154178
// The CopyTo{Async} buffer is short-lived and is likely to be collected at Gen0, and it offers a significant improvement in Copy
155179
// performance. Since then, the base implementations of CopyTo{Async} have been updated to use ArrayPool, which will end up rounding
@@ -158,6 +182,7 @@ static int GetCopyBufferSize(Stream stream) {
158182
// benefits to using the larger buffer size. So, for now, this value remains.
159183
const int DefaultCopyBufferSize = 81920;
160184

185+
bool isSufficient = false;
161186
int bufferSize = DefaultCopyBufferSize;
162187

163188
if (stream.CanSeek) {
@@ -172,16 +197,18 @@ static int GetCopyBufferSize(Stream stream) {
172197
bufferSize = 1;
173198
} else {
174199
long remaining = length - position;
175-
if (remaining > 0) {
176-
// In the case of a positive overflow, stick to the default size
177-
bufferSize = (int)Math.Min(bufferSize, remaining);
200+
if (remaining > ArrayMaxLength) {
201+
throw new IOException($"Stream exceeds the maximum bufferable array size of {ArrayMaxLength} bytes.");
202+
} else if (remaining > 0) {
203+
// If there is some remaining amount in the stream, we copy into a buffer of that size.
204+
isSufficient = true;
205+
bufferSize = (int)remaining;
178206
}
179207
}
180208
}
181209

182-
return bufferSize;
210+
return (isSufficient, bufferSize);
183211
}
184-
#endif
185212

186213
/// <summary>
187214
/// Adapt an array to IMemoryOwner. If you pass in an ArrayPool owner, the Array will be returned to the pool on dispose.

Xledger.Collections/Xledger.Collections.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
<AssemblyName>Xledger.Collections</AssemblyName>
77

88
<TargetFrameworks>net48;net8.0</TargetFrameworks>
9-
<LangVersion>12.0</LangVersion>
9+
<LangVersion>13.0</LangVersion>
1010
<ImplicitUsings>disable</ImplicitUsings>
1111
<Nullable>disable</Nullable>
1212

0 commit comments

Comments
 (0)