Skip to content

Commit e5f6716

Browse files
authored
Merge pull request #57 from rameel/fix-s3uploadstream-flush
Make Flash/FlashAsync no-op
2 parents 6ac1163 + fabaa0a commit e5f6716

File tree

2 files changed

+103
-18
lines changed

2 files changed

+103
-18
lines changed

src/Ramstack.FileSystem.Amazon/S3UploadStream.cs

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,11 @@ namespace Ramstack.FileSystem.Amazon;
1010

1111
/// <summary>
1212
/// Represents a stream for uploading data to Amazon S3 using multipart upload.
13-
/// This stream accumulates data in a temporary buffer and uploads it to S3 in parts
14-
/// once the buffer reaches a predefined size.
1513
/// </summary>
1614
internal sealed class S3UploadStream : Stream
1715
{
18-
private const long PartSize = 5 * 1024 * 1024;
16+
// https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html
17+
private const long MinPartSize = 5L * 1024 * 1024;
1918

2019
private readonly IAmazonS3 _client;
2120
private readonly string _bucketName;
@@ -81,7 +80,7 @@ public S3UploadStream(IAmazonS3 client, string bucketName, string key, string up
8180
FileShare.None,
8281
bufferSize: 4096,
8382
FileOptions.DeleteOnClose
84-
| FileOptions.Asynchronous);
83+
| FileOptions.Asynchronous);
8584
}
8685

8786
/// <inheritdoc />
@@ -102,7 +101,7 @@ public override void Write(ReadOnlySpan<byte> buffer)
102101
{
103102
_stream.Write(buffer);
104103

105-
if (_stream.Length >= PartSize)
104+
if (_stream.Length >= MinPartSize)
106105
UploadPart();
107106
}
108107
catch (Exception exception)
@@ -122,7 +121,7 @@ public override async ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, Cancella
122121
try
123122
{
124123
await _stream.WriteAsync(buffer, cancellationToken).ConfigureAwait(false);
125-
if (_stream.Length >= PartSize)
124+
if (_stream.Length >= MinPartSize)
126125
await UploadPartAsync(cancellationToken).ConfigureAwait(false);
127126
}
128127
catch (Exception exception)
@@ -146,16 +145,11 @@ public override void SetLength(long value) =>
146145
/// <inheritdoc />
147146
public override void Flush()
148147
{
149-
_stream.Flush();
150-
UploadPart();
151148
}
152149

153150
/// <inheritdoc />
154-
public override async Task FlushAsync(CancellationToken cancellationToken)
155-
{
156-
await _stream.FlushAsync(cancellationToken).ConfigureAwait(false);
157-
await UploadPartAsync(cancellationToken).ConfigureAwait(false);
158-
}
151+
public override Task FlushAsync(CancellationToken cancellationToken) =>
152+
Task.CompletedTask;
159153

160154
/// <inheritdoc />
161155
protected override void Dispose(bool disposing)
@@ -233,7 +227,13 @@ private async ValueTask UploadPartAsync(CancellationToken cancellationToken)
233227
_stream.Position = 0;
234228

235229
// https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html
236-
// The maximum allowed part size is 5 gigabytes.
230+
// The maximum allowed part size is 5 GiB.
231+
// -----------------------------------------------------------------------------------
232+
// We don't need to worry about S3's 5 GiB part limit because:
233+
// 1. All Write/WriteAsync methods are inherently limited by Array.MaxLength (~2 GiB).
234+
// 2. The upload starts as soon as the buffer reaches MinPartSize (5 MiB).
235+
// Even if a single write matches Array.MaxLength, the data is
236+
// uploaded immediately, staying within AWS limits.
237237

238238
var request = new UploadPartRequest
239239
{

tests/Ramstack.FileSystem.Amazon.Tests/WritableAmazonFileSystemTests.cs

Lines changed: 89 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -60,10 +60,8 @@ public async Task File_OpenWrite_InternalBufferWriteError_DoesNotCreateFile()
6060
var underlying = (FileStream)stream.GetType().GetField("_stream", BindingFlags.NonPublic | BindingFlags.Instance)!.GetValue(stream)!;
6161
Assert.That(underlying, Is.Not.Null);
6262

63-
await stream.WriteAsync(new ReadOnlyMemory<byte>(new byte[1024]));
64-
65-
// Forces to upload buffer.
66-
await stream.FlushAsync();
63+
// Write enough data to trigger automatic part upload (>= 5 MiB).
64+
await stream.WriteAsync(new ReadOnlyMemory<byte>(new byte[6 * 1024 * 1024]));
6765

6866
// Simulates an internal buffer write error.
6967
await underlying.DisposeAsync();
@@ -195,6 +193,93 @@ await reader.ReadToEndAsync(),
195193
await destination.DeleteAsync();
196194
}
197195

196+
[Test]
197+
public async Task File_OpenWrite_FlushDoesNotCauseUndersizedParts()
198+
{
199+
using var fs = GetFileSystem();
200+
201+
const string Content = "Hello, World!";
202+
203+
{
204+
await using var stream = await fs.OpenWriteAsync("/flush-test.txt");
205+
await using var writer = new StreamWriter(stream);
206+
207+
// Write small data and flush multiple times.
208+
// Flush should be a no-op and not upload undersized parts.
209+
foreach (var ch in Content)
210+
{
211+
await writer.WriteAsync(ch);
212+
await writer.FlushAsync();
213+
}
214+
}
215+
{
216+
// ReSharper disable once UseAwaitUsing
217+
using var stream = await fs.OpenReadAsync("/flush-test.txt");
218+
using var reader = new StreamReader(stream);
219+
220+
Assert.That(await reader.ReadToEndAsync(), Is.EqualTo(Content));
221+
}
222+
223+
await fs.DeleteFileAsync("/flush-test.txt");
224+
}
225+
226+
[Test]
227+
public async Task File_OpenWrite_FlushWithMultipartUpload()
228+
{
229+
using var fs = GetFileSystem();
230+
231+
const int Count = 5;
232+
const string FileName = "/flush-multipart-test.bin";
233+
234+
var chunk = new byte[3 * 1024 * 1024];
235+
Random.Shared.NextBytes(chunk);
236+
237+
{
238+
await using var stream = await fs.OpenWriteAsync(FileName);
239+
for (var i = 0; i < Count; i++)
240+
await stream.WriteAsync(chunk);
241+
}
242+
243+
{
244+
var file = fs.GetFile(FileName);
245+
246+
Assert.That(await file.ExistsAsync(), Is.True);
247+
Assert.That(await file.GetLengthAsync(), Is.EqualTo(chunk.Length * Count));
248+
249+
// ReSharper disable once UseAwaitUsing
250+
using var stream = await file.OpenReadAsync();
251+
252+
var bytes = new byte[chunk.Length];
253+
254+
for (var i = 0; i < Count; i++)
255+
{
256+
var n = await ReadBlockAsync(stream, bytes);
257+
Assert.That(n, Is.EqualTo(bytes.Length));
258+
259+
Assert.That(
260+
bytes.AsSpan().SequenceEqual(chunk),
261+
Is.True);
262+
}
263+
}
264+
265+
await fs.DeleteFileAsync(FileName);
266+
267+
static async Task<int> ReadBlockAsync(Stream stream, Memory<byte> memory)
268+
{
269+
var count = memory.Length;
270+
271+
while (!memory.IsEmpty)
272+
{
273+
var n = await stream.ReadAsync(memory);
274+
if (n == 0)
275+
return 0;
276+
277+
memory = memory[n..];
278+
}
279+
280+
return count;
281+
}
282+
}
198283

199284
[Test]
200285
public async Task Directory_BatchDeleting()

0 commit comments

Comments
 (0)