Skip to content

Commit bacdd9c

Browse files
Menelionclaude
andauthored
feat: Implement bandwidth throttling (MaxBytesPerSecond) (#37)
Add built-in bandwidth throttling support for sync operations: - Add `MaxBytesPerSecond` property to `SyncOptions` for configuring maximum transfer rate (null/0 = unlimited) - Create `ThrottledStream` class that wraps streams and limits read/write speeds using a time-based throttling algorithm - Update `SyncEngine` to wrap file streams with `ThrottledStream` when bandwidth limiting is enabled - Add comprehensive unit tests for `ThrottledStream` - Add unit tests for `MaxBytesPerSecond` in `SyncOptionsTests` - Update CLAUDE.md to mark this v1.0 feature as completed This feature is useful for desktop clients like Nimbus to prevent network saturation on shared connections. Co-authored-by: Claude <noreply@anthropic.com>
1 parent 5198d3b commit bacdd9c

File tree

6 files changed

+590
-6
lines changed

6 files changed

+590
-6
lines changed

CLAUDE.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -292,7 +292,7 @@ _watcher.EnableRaisingEvents = true;
292292
| No incremental change notification | FileSystemWatcher triggers full scan | Planned: `NotifyLocalChangeAsync()` |
293293
| No virtual file awareness | Can't track placeholder vs downloaded | Planned: `VirtualFileCallback` |
294294
| Single-threaded engine | One sync at a time per instance | By design - create separate instances if needed |
295-
| No bandwidth throttling | Can saturate network | Planned: `SyncOptions.MaxBytesPerSecond` |
295+
| ~~No bandwidth throttling~~ | ~~Can saturate network~~ | `SyncOptions.MaxBytesPerSecond` IMPLEMENTED |
296296
| OCIS TUS not implemented | Falls back to generic upload | Planned for v1.0 |
297297

298298
### Required SharpSync API Additions (v1.0)
@@ -308,7 +308,7 @@ These APIs are required for v1.0 release to support Nimbus desktop client:
308308
4. OCIS TUS protocol implementation (`WebDavStorage.cs:547` currently falls back)
309309

310310
**Sync Control:**
311-
5. `SyncOptions.MaxBytesPerSecond` - Built-in bandwidth throttling
311+
5. ~~`SyncOptions.MaxBytesPerSecond`~~ Built-in bandwidth throttling (IMPLEMENTED)
312312
6. `PauseAsync()` / `ResumeAsync()` - Pause and resume long-running syncs
313313
7. `GetPendingOperationsAsync()` - Inspect sync queue for UI display
314314

@@ -500,7 +500,7 @@ Desktop Client APIs (for Nimbus):
500500
- [ ] `SyncFilesAsync(IEnumerable<string> paths)` - Sync specific files on demand
501501
- [ ] `NotifyLocalChangeAsync(string path, ChangeType type)` - Accept FileSystemWatcher events for incremental sync
502502
- [ ] OCIS TUS protocol implementation (currently falls back to generic upload at `WebDavStorage.cs:547`)
503-
- [ ] `SyncOptions.MaxBytesPerSecond` - Built-in bandwidth throttling
503+
- [x] `SyncOptions.MaxBytesPerSecond` - Built-in bandwidth throttling
504504
- [ ] `PauseAsync()` / `ResumeAsync()` - Pause and resume long-running syncs
505505
- [ ] `GetPendingOperationsAsync()` - Inspect sync queue for UI display
506506
- [ ] Per-file progress events (currently only per-sync-operation)

src/SharpSync/Core/SyncOptions.cs

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,20 @@ public class SyncOptions {
6464
/// </summary>
6565
public List<string> ExcludePatterns { get; set; } = new List<string>();
6666

67+
/// <summary>
68+
/// Gets or sets the maximum transfer rate in bytes per second.
69+
/// Set to 0 or null for unlimited bandwidth.
70+
/// </summary>
71+
/// <remarks>
72+
/// This setting applies to both upload and download operations.
73+
/// Useful for preventing network saturation on shared connections.
74+
/// Example values:
75+
/// - 1_048_576 (1 MB/s)
76+
/// - 10_485_760 (10 MB/s)
77+
/// - 104_857_600 (100 MB/s)
78+
/// </remarks>
79+
public long? MaxBytesPerSecond { get; set; }
80+
6781
/// <summary>
6882
/// Creates a copy of the sync options
6983
/// </summary>
@@ -81,7 +95,8 @@ public SyncOptions Clone() {
8195
UpdateExisting = UpdateExisting,
8296
ConflictResolution = ConflictResolution,
8397
TimeoutSeconds = TimeoutSeconds,
84-
ExcludePatterns = new List<string>(ExcludePatterns)
98+
ExcludePatterns = new List<string>(ExcludePatterns),
99+
MaxBytesPerSecond = MaxBytesPerSecond
85100
};
86101
}
87102
}
Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
using System.Diagnostics;
2+
3+
namespace Oire.SharpSync.Storage;
4+
5+
/// <summary>
6+
/// Stream wrapper that throttles read and write operations to limit bandwidth usage.
7+
/// Uses a token bucket algorithm for smooth rate limiting.
8+
/// </summary>
9+
internal sealed class ThrottledStream: Stream {
10+
private readonly Stream _innerStream;
11+
private readonly long _maxBytesPerSecond;
12+
private readonly Stopwatch _stopwatch;
13+
private long _totalBytesTransferred;
14+
private readonly object _lock = new();
15+
16+
/// <summary>
17+
/// Creates a new ThrottledStream wrapping the specified stream.
18+
/// </summary>
19+
/// <param name="innerStream">The stream to wrap.</param>
20+
/// <param name="maxBytesPerSecond">Maximum bytes per second (must be positive).</param>
21+
/// <exception cref="ArgumentNullException">Thrown when innerStream is null.</exception>
22+
/// <exception cref="ArgumentOutOfRangeException">Thrown when maxBytesPerSecond is not positive.</exception>
23+
public ThrottledStream(Stream innerStream, long maxBytesPerSecond) {
24+
_innerStream = innerStream ?? throw new ArgumentNullException(nameof(innerStream));
25+
26+
if (maxBytesPerSecond <= 0) {
27+
throw new ArgumentOutOfRangeException(nameof(maxBytesPerSecond), "Max bytes per second must be positive.");
28+
}
29+
30+
_maxBytesPerSecond = maxBytesPerSecond;
31+
_stopwatch = Stopwatch.StartNew();
32+
_totalBytesTransferred = 0;
33+
}
34+
35+
public override bool CanRead => _innerStream.CanRead;
36+
public override bool CanSeek => _innerStream.CanSeek;
37+
public override bool CanWrite => _innerStream.CanWrite;
38+
public override long Length => _innerStream.Length;
39+
40+
public override long Position {
41+
get => _innerStream.Position;
42+
set => _innerStream.Position = value;
43+
}
44+
45+
public override void Flush() => _innerStream.Flush();
46+
47+
public override Task FlushAsync(CancellationToken cancellationToken) =>
48+
_innerStream.FlushAsync(cancellationToken);
49+
50+
public override int Read(byte[] buffer, int offset, int count) {
51+
ThrottleSync(count);
52+
var bytesRead = _innerStream.Read(buffer, offset, count);
53+
RecordBytesTransferred(bytesRead);
54+
return bytesRead;
55+
}
56+
57+
public override async Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) {
58+
await ThrottleAsync(count, cancellationToken);
59+
var bytesRead = await _innerStream.ReadAsync(buffer.AsMemory(offset, count), cancellationToken);
60+
RecordBytesTransferred(bytesRead);
61+
return bytesRead;
62+
}
63+
64+
public override async ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default) {
65+
await ThrottleAsync(buffer.Length, cancellationToken);
66+
var bytesRead = await _innerStream.ReadAsync(buffer, cancellationToken);
67+
RecordBytesTransferred(bytesRead);
68+
return bytesRead;
69+
}
70+
71+
public override void Write(byte[] buffer, int offset, int count) {
72+
ThrottleSync(count);
73+
_innerStream.Write(buffer, offset, count);
74+
RecordBytesTransferred(count);
75+
}
76+
77+
public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) {
78+
await ThrottleAsync(count, cancellationToken);
79+
await _innerStream.WriteAsync(buffer.AsMemory(offset, count), cancellationToken);
80+
RecordBytesTransferred(count);
81+
}
82+
83+
public override async ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default) {
84+
await ThrottleAsync(buffer.Length, cancellationToken);
85+
await _innerStream.WriteAsync(buffer, cancellationToken);
86+
RecordBytesTransferred(buffer.Length);
87+
}
88+
89+
public override long Seek(long offset, SeekOrigin origin) => _innerStream.Seek(offset, origin);
90+
91+
public override void SetLength(long value) => _innerStream.SetLength(value);
92+
93+
/// <summary>
94+
/// Synchronously waits if the transfer rate would exceed the limit.
95+
/// </summary>
96+
private void ThrottleSync(int requestedBytes) {
97+
var delay = CalculateDelay(requestedBytes);
98+
if (delay > TimeSpan.Zero) {
99+
Thread.Sleep(delay);
100+
}
101+
}
102+
103+
/// <summary>
104+
/// Asynchronously waits if the transfer rate would exceed the limit.
105+
/// </summary>
106+
private async Task ThrottleAsync(int requestedBytes, CancellationToken cancellationToken) {
107+
var delay = CalculateDelay(requestedBytes);
108+
if (delay > TimeSpan.Zero) {
109+
await Task.Delay(delay, cancellationToken);
110+
}
111+
}
112+
113+
/// <summary>
114+
/// Calculates the delay needed to maintain the target transfer rate.
115+
/// </summary>
116+
private TimeSpan CalculateDelay(int requestedBytes) {
117+
lock (_lock) {
118+
var elapsedSeconds = _stopwatch.Elapsed.TotalSeconds;
119+
if (elapsedSeconds <= 0) {
120+
return TimeSpan.Zero;
121+
}
122+
123+
// Calculate the expected time for the bytes already transferred plus the new bytes
124+
var expectedBytes = _totalBytesTransferred + requestedBytes;
125+
var expectedTimeSeconds = (double)expectedBytes / _maxBytesPerSecond;
126+
127+
// If we're ahead of schedule, delay
128+
if (expectedTimeSeconds > elapsedSeconds) {
129+
var delaySeconds = expectedTimeSeconds - elapsedSeconds;
130+
// Cap delay to a reasonable maximum to prevent extremely long waits
131+
delaySeconds = Math.Min(delaySeconds, 5.0);
132+
return TimeSpan.FromSeconds(delaySeconds);
133+
}
134+
135+
return TimeSpan.Zero;
136+
}
137+
}
138+
139+
/// <summary>
140+
/// Records that bytes have been transferred.
141+
/// </summary>
142+
private void RecordBytesTransferred(int bytes) {
143+
if (bytes > 0) {
144+
lock (_lock) {
145+
_totalBytesTransferred += bytes;
146+
}
147+
}
148+
}
149+
150+
protected override void Dispose(bool disposing) {
151+
if (disposing) {
152+
_innerStream?.Dispose();
153+
}
154+
base.Dispose(disposing);
155+
}
156+
}

src/SharpSync/Sync/SyncEngine.cs

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
using System.Collections.Concurrent;
22
using System.Diagnostics;
33
using Oire.SharpSync.Core;
4+
using Oire.SharpSync.Storage;
45

56
namespace Oire.SharpSync.Sync;
67

@@ -24,6 +25,7 @@ public class SyncEngine: ISyncEngine {
2425
private bool _disposed;
2526
private readonly SemaphoreSlim _syncSemaphore;
2627
private CancellationTokenSource? _currentSyncCts;
28+
private long? _currentMaxBytesPerSecond;
2729

2830
/// <summary>
2931
/// Gets whether the engine is currently synchronizing
@@ -113,6 +115,7 @@ public async Task<SyncResult> SynchronizeAsync(SyncOptions? options = null, Canc
113115
try {
114116
using var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
115117
_currentSyncCts = linkedCts;
118+
_currentMaxBytesPerSecond = options?.MaxBytesPerSecond;
116119
var syncToken = linkedCts.Token;
117120
var result = new SyncResult();
118121
var sw = Stopwatch.StartNew();
@@ -157,6 +160,7 @@ public async Task<SyncResult> SynchronizeAsync(SyncOptions? options = null, Canc
157160
return result;
158161
} finally {
159162
_currentSyncCts = null;
163+
_currentMaxBytesPerSecond = null;
160164
_syncSemaphore.Release();
161165
}
162166
}
@@ -830,7 +834,8 @@ private async Task DownloadFileAsync(SyncAction action, ThreadSafeSyncResult res
830834
await _localStorage.CreateDirectoryAsync(action.Path, cancellationToken);
831835
} else {
832836
using var remoteStream = await _remoteStorage.ReadFileAsync(action.Path, cancellationToken);
833-
await _localStorage.WriteFileAsync(action.Path, remoteStream, cancellationToken);
837+
var streamToRead = WrapWithThrottling(remoteStream);
838+
await _localStorage.WriteFileAsync(action.Path, streamToRead, cancellationToken);
834839
}
835840

836841
result.IncrementFilesSynchronized();
@@ -841,7 +846,8 @@ private async Task UploadFileAsync(SyncAction action, ThreadSafeSyncResult resul
841846
await _remoteStorage.CreateDirectoryAsync(action.Path, cancellationToken);
842847
} else {
843848
using var localStream = await _localStorage.ReadFileAsync(action.Path, cancellationToken);
844-
await _remoteStorage.WriteFileAsync(action.Path, localStream, cancellationToken);
849+
var streamToRead = WrapWithThrottling(localStream);
850+
await _remoteStorage.WriteFileAsync(action.Path, streamToRead, cancellationToken);
845851
}
846852

847853
result.IncrementFilesSynchronized();
@@ -1091,6 +1097,19 @@ private async Task UpdateDatabaseStateAsync(ChangeSet changes, CancellationToken
10911097
_ => SyncOperation.Unknown
10921098
};
10931099

1100+
/// <summary>
1101+
/// Wraps the provided stream with a throttled stream if bandwidth limiting is enabled.
1102+
/// </summary>
1103+
/// <param name="stream">The stream to potentially wrap.</param>
1104+
/// <returns>The original stream or a throttled wrapper.</returns>
1105+
private Stream WrapWithThrottling(Stream stream) {
1106+
if (_currentMaxBytesPerSecond.HasValue && _currentMaxBytesPerSecond.Value > 0) {
1107+
return new ThrottledStream(stream, _currentMaxBytesPerSecond.Value);
1108+
}
1109+
1110+
return stream;
1111+
}
1112+
10941113
private void RaiseProgress(SyncProgress progress, SyncOperation operation) {
10951114
ProgressChanged?.Invoke(this, new SyncProgressEventArgs(progress, progress.CurrentItem, operation));
10961115
}

0 commit comments

Comments
 (0)