Skip to content

Commit 05ef96f

Browse files
committed
Fixed #255
1 parent 368ef77 commit 05ef96f

File tree

3 files changed

+124
-69
lines changed

3 files changed

+124
-69
lines changed

src/DotNext.IO/IO/SparseStream.cs

Lines changed: 83 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -1,39 +1,40 @@
11
using System.Runtime.CompilerServices;
2+
using System.Runtime.InteropServices;
23

34
namespace DotNext.IO;
45

6+
using Buffers;
7+
58
/// <summary>
69
/// Represents multiple streams as a single stream.
710
/// </summary>
811
/// <remarks>
912
/// The stream is available for read-only operations.
1013
/// </remarks>
11-
internal sealed class SparseStream : Stream, IFlushable
14+
internal abstract class SparseStream : Stream, IFlushable
1215
{
13-
private readonly IEnumerator<Stream> enumerator;
14-
private bool streamAvailable;
15-
16-
/// <summary>
17-
/// Initializes a new sparse stream.
18-
/// </summary>
19-
/// <param name="streams">A collection of readable streams.</param>
20-
public SparseStream(IEnumerable<Stream> streams)
16+
private int runningIndex;
17+
18+
protected abstract ReadOnlySpan<Stream> Streams { get; }
19+
20+
private Stream? Current
2121
{
22-
enumerator = streams.GetEnumerator();
23-
streamAvailable = enumerator.MoveNext();
24-
}
22+
get
23+
{
24+
var streams = Streams;
2525

26-
[MethodImpl(MethodImplOptions.AggressiveInlining)]
27-
private void MoveToNextStream() => streamAvailable = enumerator.MoveNext();
26+
return (uint)runningIndex < (uint)streams.Length ? streams[runningIndex] : null;
27+
}
28+
}
2829

2930
/// <inheritdoc />
30-
public override int ReadByte()
31+
public sealed override int ReadByte()
3132
{
3233
var result = -1;
3334

34-
for (; streamAvailable; MoveToNextStream())
35+
for (; Current is { } current; runningIndex++)
3536
{
36-
result = enumerator.Current.ReadByte();
37+
result = current.ReadByte();
3738

3839
if (result >= 0)
3940
break;
@@ -43,12 +44,12 @@ public override int ReadByte()
4344
}
4445

4546
/// <inheritdoc />
46-
public override int Read(Span<byte> buffer)
47+
public sealed override int Read(Span<byte> buffer)
4748
{
4849
int count;
49-
for (count = 0; streamAvailable; MoveToNextStream())
50+
for (count = 0; Current is { } current; runningIndex++)
5051
{
51-
count = enumerator.Current.Read(buffer);
52+
count = current.Read(buffer);
5253

5354
if (count > 0)
5455
break;
@@ -58,7 +59,7 @@ public override int Read(Span<byte> buffer)
5859
}
5960

6061
/// <inheritdoc />
61-
public override int Read(byte[] buffer, int offset, int count)
62+
public sealed override int Read(byte[] buffer, int offset, int count)
6263
{
6364
ValidateBufferArguments(buffer, offset, count);
6465

@@ -67,12 +68,12 @@ public override int Read(byte[] buffer, int offset, int count)
6768

6869
/// <inheritdoc />
6970
[AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder<>))]
70-
public override async ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken token = default)
71+
public sealed override async ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken token = default)
7172
{
7273
int count;
73-
for (count = 0; streamAvailable; MoveToNextStream())
74+
for (count = 0; Current is { } current; runningIndex++)
7475
{
75-
count = await enumerator.Current.ReadAsync(buffer, token).ConfigureAwait(false);
76+
count = await current.ReadAsync(buffer, token).ConfigureAwait(false);
7677

7778
if (count > 0)
7879
break;
@@ -82,95 +83,118 @@ public override async ValueTask<int> ReadAsync(Memory<byte> buffer, Cancellation
8283
}
8384

8485
/// <inheritdoc />
85-
public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken token)
86+
public sealed override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken token)
8687
=> ReadAsync(buffer.AsMemory(offset, count), token).AsTask();
8788

89+
public sealed override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state)
90+
=> TaskToAsyncResult.Begin(ReadAsync(buffer, offset, count), callback, state);
91+
92+
public sealed override int EndRead(IAsyncResult asyncResult)
93+
=> TaskToAsyncResult.End<int>(asyncResult);
94+
8895
/// <inheritdoc />
89-
public override void CopyTo(Stream destination, int bufferSize)
96+
public sealed override void CopyTo(Stream destination, int bufferSize)
9097
{
9198
ValidateCopyToArguments(destination, bufferSize);
9299

93-
for (; streamAvailable; MoveToNextStream())
94-
enumerator.Current.CopyTo(destination, bufferSize);
100+
for (; Current is { } current; runningIndex++)
101+
current.CopyTo(destination, bufferSize);
95102
}
96103

97104
/// <inheritdoc />
98-
public override async Task CopyToAsync(Stream destination, int bufferSize, CancellationToken token)
105+
public sealed override async Task CopyToAsync(Stream destination, int bufferSize, CancellationToken token)
99106
{
100107
ValidateCopyToArguments(destination, bufferSize);
101108

102-
for (; streamAvailable; MoveToNextStream())
103-
await enumerator.Current.CopyToAsync(destination, bufferSize, token).ConfigureAwait(false);
109+
for (; Current is { } current; runningIndex++)
110+
await current.CopyToAsync(destination, bufferSize, token).ConfigureAwait(false);
104111
}
105112

106113
/// <inheritdoc />
107-
public override bool CanRead => true;
114+
public sealed override bool CanRead => true;
108115

109116
/// <inheritdoc />
110-
public override bool CanWrite => false;
117+
public sealed override bool CanWrite => false;
111118

112119
/// <inheritdoc />
113-
public override bool CanSeek => false;
120+
public sealed override bool CanSeek => false;
114121

115122
/// <inheritdoc />
116-
public override long Position
123+
public sealed override long Position
117124
{
118125
get => throw new NotSupportedException();
119126
set => throw new NotSupportedException();
120127
}
121128

122129
/// <inheritdoc cref="Stream.Flush"/>
123-
public override void Flush()
124-
{
125-
if (streamAvailable)
126-
enumerator.Current.Flush();
127-
}
130+
public sealed override void Flush() => Current?.Flush();
128131

129132
/// <inheritdoc cref="Stream.FlushAsync(CancellationToken)"/>
130-
public override Task FlushAsync(CancellationToken token)
131-
=> streamAvailable ? enumerator.Current.FlushAsync(token) : Task.CompletedTask;
133+
public sealed override Task FlushAsync(CancellationToken token)
134+
=> Current?.FlushAsync(token) ?? Task.CompletedTask;
132135

133136
/// <inheritdoc />
134-
public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
137+
public sealed override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
135138

136139
/// <inheritdoc />
137-
public override long Length => throw new NotSupportedException();
140+
public sealed override long Length
141+
{
142+
get
143+
{
144+
var length = 0L;
145+
146+
foreach (var stream in Streams)
147+
{
148+
length += stream.Length;
149+
}
150+
151+
return length;
152+
}
153+
}
138154

139155
/// <inheritdoc />
140-
public override void SetLength(long value) => throw new NotSupportedException();
156+
public sealed override void SetLength(long value) => throw new NotSupportedException();
141157

142158
/// <inheritdoc/>
143-
public override void WriteByte(byte value) => throw new NotSupportedException();
159+
public sealed override void WriteByte(byte value) => throw new NotSupportedException();
144160

145161
/// <inheritdoc/>
146-
public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException();
162+
public sealed override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException();
147163

148164
/// <inheritdoc/>
149-
public override void Write(ReadOnlySpan<byte> buffer) => throw new NotSupportedException();
165+
public sealed override void Write(ReadOnlySpan<byte> buffer) => throw new NotSupportedException();
150166

151167
/// <inheritdoc/>
152-
public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken token) => Task.FromException(new NotSupportedException());
168+
public sealed override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken token) => Task.FromException(new NotSupportedException());
153169

154170
/// <inheritdoc/>
155-
public override ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default)
171+
public sealed override ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default)
156172
=> ValueTask.FromException(new NotSupportedException());
157173

158174
/// <inheritdoc/>
159-
public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state)
175+
public sealed override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state)
160176
=> throw new NotSupportedException();
161177

162178
/// <inheritdoc/>
163-
public override void EndWrite(IAsyncResult asyncResult) => throw new InvalidOperationException();
179+
public sealed override void EndWrite(IAsyncResult asyncResult) => throw new InvalidOperationException();
180+
}
181+
182+
internal sealed class SparseStream<T>(T streams) : SparseStream
183+
where T : struct, ITuple
184+
{
185+
protected override ReadOnlySpan<Stream> Streams
186+
=> MemoryMarshal.CreateReadOnlySpan(in Unsafe.As<T, Stream>(ref Unsafe.AsRef(in streams)), streams.Length);
187+
}
188+
189+
internal sealed class UnboundedSparseStream(ReadOnlySpan<Stream> streams) : SparseStream
190+
{
191+
private MemoryOwner<Stream> streams = streams.Copy();
192+
193+
protected override ReadOnlySpan<Stream> Streams => streams.Span;
164194

165-
/// <inheritdoc />
166195
protected override void Dispose(bool disposing)
167196
{
168-
if (disposing)
169-
{
170-
enumerator.Dispose();
171-
}
172-
173-
streamAvailable = false;
197+
streams.Dispose();
174198
base.Dispose(disposing);
175199
}
176200
}

src/DotNext.IO/IO/StreamExtensions.cs

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33

44
namespace DotNext.IO;
55

6+
using Collections.Generic;
7+
68
/// <summary>
79
/// Represents high-level read/write methods for the stream.
810
/// </summary>
@@ -24,8 +26,41 @@ internal static void ThrowIfEmpty<T>(in Memory<T> buffer, [CallerArgumentExpress
2426
/// <param name="stream">The stream to combine.</param>
2527
/// <param name="others">A collection of streams.</param>
2628
/// <returns>An object that represents multiple streams as one logical stream.</returns>
27-
public static Stream Combine(this Stream stream, ReadOnlySpan<Stream> others)
28-
=> others is { Length: > 0 } ? new SparseStream([stream, .. others]) : stream;
29+
public static Stream Combine(this Stream stream, ReadOnlySpan<Stream> others) // TODO: Use params in future
30+
=> others switch
31+
{
32+
[] => stream,
33+
[var s] => new SparseStream<(Stream, Stream)>((stream, s)),
34+
[var s1, var s2] => new SparseStream<(Stream, Stream, Stream)>((stream, s1, s2)),
35+
[var s1, var s2, var s3] => new SparseStream<(Stream, Stream, Stream, Stream)>((stream, s1, s2, s3)),
36+
[var s1, var s2, var s3, var s4] => new SparseStream<(Stream, Stream, Stream, Stream, Stream)>((stream, s1, s2, s3, s4)),
37+
[var s1, var s2, var s3, var s4, var s5] => new SparseStream<(Stream, Stream, Stream, Stream, Stream, Stream)>((stream, s1, s2, s3, s4,
38+
s5)),
39+
_ => new UnboundedSparseStream(others.ToArray()),
40+
};
41+
42+
/// <summary>
43+
/// Combines multiple readable streams.
44+
/// </summary>
45+
/// <param name="streams">A collection of streams.</param>
46+
/// <returns>An object that represents multiple streams as one logical stream.</returns>
47+
/// <exception cref="ArgumentException"><paramref name="streams"/> is empty.</exception>
48+
public static Stream Combine(this ReadOnlySpan<Stream> streams)
49+
=> streams is [var first, .. var rest]
50+
? Combine(first, rest)
51+
: throw new ArgumentException(ExceptionMessages.BufferTooSmall, nameof(streams));
52+
53+
/// <summary>
54+
/// Combines multiple readable streams.
55+
/// </summary>
56+
/// <param name="streams">A collection of streams.</param>
57+
/// <returns>An object that represents multiple streams as one logical stream.</returns>
58+
/// <exception cref="ArgumentException"><paramref name="streams"/> is empty.</exception>
59+
public static Stream Combine(this IEnumerable<Stream> streams)
60+
{
61+
using var buffer = streams.Copy();
62+
return Combine(buffer.Span);
63+
}
2964

3065
/// <summary>
3166
/// Creates a stream for the specified file handle.

src/DotNext.Tests/IO/StreamExtensionsTests.cs

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,10 @@
11
using System.Buffers;
2-
using System.Globalization;
32
using System.Text;
4-
using static System.Globalization.CultureInfo;
5-
using DateTimeStyles = System.Globalization.DateTimeStyles;
63

74
namespace DotNext.IO;
85

9-
using DotNext.Buffers.Binary;
10-
using Net.Cluster;
11-
using Text;
6+
using Buffers.Binary;
7+
using Collections.Generic;
128

139
public sealed class StreamExtensionsTests : Test
1410
{
@@ -156,7 +152,7 @@ public static async Task CombineStreamsAsync()
156152
{
157153
await using var ms1 = new MemoryStream([1, 2, 3]);
158154
await using var ms2 = new MemoryStream([4, 5, 6]);
159-
await using var combined = ms1.Combine([ms2]);
155+
await using var combined = StreamExtensions.Combine([ms1, ms2]);
160156

161157
var buffer = new byte[6];
162158
await combined.ReadExactlyAsync(buffer);
@@ -169,7 +165,7 @@ public static void CopyCombinedStreams()
169165
{
170166
using var ms1 = new MemoryStream([1, 2, 3]);
171167
using var ms2 = new MemoryStream([4, 5, 6]);
172-
using var combined = ms1.Combine([ms2]);
168+
using var combined = List.Singleton(ms1).Append(ms2).Combine();
173169
using var result = new MemoryStream();
174170

175171
combined.CopyTo(result, 128);

0 commit comments

Comments
 (0)