Skip to content

Commit 044389f

Browse files
committed
Introduced FixedLengthStream class
1 parent d020d52 commit 044389f

File tree

4 files changed

+317
-1
lines changed

4 files changed

+317
-1
lines changed

src/DotNext.IO/ExceptionMessages.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,4 +27,6 @@ internal static string DirectoryNotFound(string path)
2727
internal static string ReadBufferNotEmpty => (string)Resources.Get();
2828

2929
internal static string WriteBufferNotEmpty => (string)Resources.Get();
30+
31+
internal static string StreamOverflow => (string)Resources.Get();
3032
}

src/DotNext.IO/ExceptionMessages.restext

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,4 +5,5 @@ DirectoryNotFound=Directory {0} doesn't exist
55
WriterInReadMode=The writer is in read-only mode. Dispose active memory manager obtained from writer
66
FileHandleClosed=The file handle is closed
77
ReadBufferNotEmpty=The internal buffer has unconsumed data to read
8-
WriteBufferNotEmpty=The internal buffer has data to flush
8+
WriteBufferNotEmpty=The internal buffer has data to flush
9+
StreamOverflow=The internal buffer is full
Lines changed: 191 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,191 @@
1+
using System.Diagnostics;
2+
3+
namespace DotNext.IO;
4+
5+
/// <summary>
6+
/// Represents a stream wrapper over the memory block.
7+
/// </summary>
8+
/// <param name="data">The mutable memory block.</param>
9+
public sealed class FixedLengthStream(Memory<byte> data) : ModernStream
10+
{
11+
private int position, length = data.Length;
12+
13+
/// <summary>
14+
/// Gets the consumed part of the data.
15+
/// </summary>
16+
public Span<byte> ConsumedSpan => data.Span[..position];
17+
18+
/// <summary>
19+
/// Gets the remaining part of the data.
20+
/// </summary>
21+
public Span<byte> RemainingSpan => data.Span[position..length];
22+
23+
/// <summary>
24+
/// Gets or sets a value indicating that <see cref="Write"/> and <see cref="WriteAsync"/> must throw
25+
/// <see cref="IOException"/> if the caller is trying to write past to the end of the underlying buffer.
26+
/// </summary>
27+
/// <remarks>
28+
/// The default value is <see langword="false"/>.
29+
/// </remarks>
30+
public bool SkipOnOverflow { get; init; }
31+
32+
/// <inheritdoc/>
33+
public override void Flush()
34+
{
35+
}
36+
37+
/// <inheritdoc/>
38+
public override Task FlushAsync(CancellationToken token)
39+
=> token.IsCancellationRequested ? Task.FromCanceled(token) : Task.CompletedTask;
40+
41+
[Conditional("DEBUG")]
42+
private void AssertState()
43+
{
44+
Debug.Assert(position <= length);
45+
Debug.Assert(length <= data.Length);
46+
}
47+
48+
/// <inheritdoc/>
49+
public override int Read(Span<byte> buffer)
50+
{
51+
AssertState();
52+
53+
RemainingSpan.CopyTo(buffer, out var count);
54+
position += count;
55+
return count;
56+
}
57+
58+
/// <inheritdoc/>
59+
public override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken token = default)
60+
{
61+
ValueTask<int> task;
62+
if (token.IsCancellationRequested)
63+
{
64+
task = ValueTask.FromCanceled<int>(token);
65+
}
66+
else
67+
{
68+
try
69+
{
70+
task = new(Read(buffer.Span));
71+
}
72+
catch (Exception e)
73+
{
74+
task = ValueTask.FromException<int>(e);
75+
}
76+
}
77+
78+
return task;
79+
}
80+
81+
/// <inheritdoc/>
82+
public override long Seek(long offset, SeekOrigin origin)
83+
{
84+
AssertState();
85+
86+
var newPosition = origin switch
87+
{
88+
SeekOrigin.Begin => offset,
89+
SeekOrigin.Current => Position + offset,
90+
SeekOrigin.End => Length + offset,
91+
_ => throw new ArgumentOutOfRangeException(nameof(origin))
92+
};
93+
94+
if (newPosition < 0L)
95+
throw new IOException();
96+
97+
ArgumentOutOfRangeException.ThrowIfGreaterThan(newPosition, Length, nameof(offset));
98+
99+
position = (int)newPosition;
100+
return newPosition;
101+
}
102+
103+
private void SetLength(int newLength)
104+
{
105+
if (newLength > data.Length)
106+
throw new IOException(ExceptionMessages.StreamOverflow);
107+
108+
position = Math.Min(position, length = newLength);
109+
}
110+
111+
/// <inheritdoc/>
112+
public override void SetLength(long value)
113+
{
114+
ArgumentOutOfRangeException.ThrowIfGreaterThan((ulong)value, (uint)int.MaxValue, nameof(value));
115+
116+
AssertState();
117+
SetLength((int)value);
118+
}
119+
120+
/// <inheritdoc/>
121+
public override void Write(ReadOnlySpan<byte> buffer)
122+
{
123+
AssertState();
124+
125+
var remaining = RemainingSpan;
126+
if (remaining.Length >= buffer.Length)
127+
{
128+
// nothing to do
129+
}
130+
else if (SkipOnOverflow)
131+
{
132+
buffer = buffer.Slice(0, remaining.Length);
133+
}
134+
else
135+
{
136+
throw new IOException(ExceptionMessages.StreamOverflow);
137+
}
138+
139+
buffer.CopyTo(remaining);
140+
position += buffer.Length;
141+
}
142+
143+
/// <inheritdoc/>
144+
public override ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken token = default)
145+
{
146+
ValueTask task;
147+
if (token.IsCancellationRequested)
148+
{
149+
task = ValueTask.FromCanceled(token);
150+
}
151+
else
152+
{
153+
task = ValueTask.CompletedTask;
154+
try
155+
{
156+
Write(buffer.Span);
157+
}
158+
catch (Exception e)
159+
{
160+
task = ValueTask.FromException(e);
161+
}
162+
}
163+
164+
return task;
165+
}
166+
167+
/// <inheritdoc/>
168+
public override bool CanRead => true;
169+
170+
/// <inheritdoc/>
171+
public override bool CanSeek => true;
172+
173+
/// <inheritdoc/>
174+
public override bool CanWrite => true;
175+
176+
/// <inheritdoc/>
177+
public override long Length => length;
178+
179+
/// <inheritdoc/>
180+
public override long Position
181+
{
182+
get => position;
183+
set
184+
{
185+
ArgumentOutOfRangeException.ThrowIfGreaterThan((ulong)value, (uint)length, nameof(value));
186+
187+
position = (int)value;
188+
AssertState();
189+
}
190+
}
191+
}
Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
using System.Text;
2+
3+
namespace DotNext.IO;
4+
5+
public sealed class FixedLengthStreamTests : Test
6+
{
7+
private static async Task ReadWriteStringUsingEncodingAsync(string value, Encoding encoding, LengthFormat lengthEnc)
8+
{
9+
Memory<byte> buffer = new byte[16];
10+
await using var ms = new FixedLengthStream(new byte[1024]);
11+
await ms.EncodeAsync(value.AsMemory(), encoding, lengthEnc, buffer);
12+
ms.Position = 0;
13+
using var result = await ms.DecodeAsync(encoding, lengthEnc, buffer);
14+
Equal(value, result.ToString());
15+
}
16+
17+
[Theory]
18+
[InlineData(LengthFormat.Compressed)]
19+
[InlineData(LengthFormat.LittleEndian)]
20+
[InlineData(LengthFormat.BigEndian)]
21+
public static async Task ReadWriteStringAsync(LengthFormat lengthEnc)
22+
{
23+
const string testString1 = "Hello, world!&*(@&*(fghjwgfwffgw";
24+
await ReadWriteStringUsingEncodingAsync(testString1, Encoding.UTF8, lengthEnc);
25+
await ReadWriteStringUsingEncodingAsync(testString1, Encoding.Unicode, lengthEnc);
26+
await ReadWriteStringUsingEncodingAsync(testString1, Encoding.UTF32, lengthEnc);
27+
await ReadWriteStringUsingEncodingAsync(testString1, Encoding.ASCII, lengthEnc);
28+
const string testString2 = "������, ���!";
29+
await ReadWriteStringUsingEncodingAsync(testString2, Encoding.UTF8, lengthEnc);
30+
await ReadWriteStringUsingEncodingAsync(testString2, Encoding.Unicode, lengthEnc);
31+
await ReadWriteStringUsingEncodingAsync(testString2, Encoding.UTF32, lengthEnc);
32+
}
33+
34+
[Theory]
35+
[InlineData(false)]
36+
[InlineData(true)]
37+
public static void Overflow(bool skipOnOverflow)
38+
{
39+
using var stream = new FixedLengthStream(new byte[128]) { SkipOnOverflow = skipOnOverflow };
40+
ReadOnlyMemory<byte> dataToWrite = RandomBytes(256);
41+
42+
if (skipOnOverflow)
43+
{
44+
stream.Write(dataToWrite.Span);
45+
Equal(stream.ConsumedSpan, dataToWrite.Span.Slice(0, (int)stream.Length));
46+
True(stream.RemainingSpan.IsEmpty);
47+
}
48+
else
49+
{
50+
Throws<IOException>(() => stream.Write(dataToWrite.Span));
51+
}
52+
}
53+
54+
[Fact]
55+
public static void ReadWrite()
56+
{
57+
const int bufferSize = 128;
58+
Memory<byte> buffer = new byte[bufferSize];
59+
using var stream = new FixedLengthStream(buffer);
60+
True(stream.ConsumedSpan.IsEmpty);
61+
False(stream.RemainingSpan.IsEmpty);
62+
63+
ReadOnlySpan<byte> dataToWrite = RandomBytes(bufferSize);
64+
stream.Write(dataToWrite);
65+
True(stream.RemainingSpan.IsEmpty);
66+
67+
Equal(dataToWrite, stream.ConsumedSpan);
68+
69+
Span<byte> readBuffer = new byte[buffer.Length];
70+
stream.Position = 0L;
71+
Equal(buffer.Length, stream.Read(readBuffer));
72+
Equal(dataToWrite, readBuffer);
73+
}
74+
75+
[Fact]
76+
public static async Task ReadWriteAsync()
77+
{
78+
const int bufferSize = 128;
79+
Memory<byte> buffer = new byte[bufferSize];
80+
await using var stream = new FixedLengthStream(buffer);
81+
True(stream.ConsumedSpan.IsEmpty);
82+
False(stream.RemainingSpan.IsEmpty);
83+
84+
ReadOnlyMemory<byte> dataToWrite = RandomBytes(bufferSize);
85+
await stream.WriteAsync(dataToWrite);
86+
True(stream.RemainingSpan.IsEmpty);
87+
88+
Equal(dataToWrite.Span, stream.ConsumedSpan);
89+
90+
Memory<byte> readBuffer = new byte[buffer.Length];
91+
stream.Position = 0L;
92+
Equal(buffer.Length, await stream.ReadAsync(readBuffer));
93+
Equal(dataToWrite, readBuffer);
94+
}
95+
96+
[Fact]
97+
public static void Truncate()
98+
{
99+
const int bufferSize = 128;
100+
Memory<byte> buffer = new byte[bufferSize];
101+
using var stream = new FixedLengthStream(buffer);
102+
103+
ReadOnlySpan<byte> dataToWrite = RandomBytes(bufferSize);
104+
stream.Write(dataToWrite);
105+
106+
stream.SetLength(bufferSize / 2);
107+
Equal(bufferSize / 2, stream.Length);
108+
Equal(bufferSize / 2, stream.Position);
109+
Equal(dataToWrite.Slice(0, bufferSize / 2), stream.ConsumedSpan);
110+
}
111+
112+
[Fact]
113+
public static void SetInvalidLength()
114+
{
115+
const int bufferSize = 128;
116+
Memory<byte> buffer = new byte[bufferSize];
117+
using var stream = new FixedLengthStream(buffer);
118+
119+
Throws<ArgumentOutOfRangeException>(() => stream.SetLength(-1L));
120+
Throws<IOException>(() => stream.SetLength(bufferSize + 1));
121+
}
122+
}

0 commit comments

Comments
 (0)