Skip to content

Commit dab18a3

Browse files
authored
[feature] Improve upload management adaptive controller (#194)
1 parent c2b4704 commit dab18a3

File tree

24 files changed

+864
-209
lines changed

24 files changed

+864
-209
lines changed

src/ByteSync.Client/Exceptions/ApiException.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,10 @@ public ApiException(string message, Exception innerException) : base(message, in
1212
{
1313
}
1414

15-
public ApiException(string message, HttpStatusCode httptatusCode) : base(message)
15+
public ApiException(string message, HttpStatusCode httpStatusCode) : base(message)
1616
{
17-
HttptatusCode = httptatusCode;
17+
HttpStatusCode = httpStatusCode;
1818
}
1919

20-
public HttpStatusCode? HttptatusCode { get; set; }
20+
public HttpStatusCode? HttpStatusCode { get; set; }
2121
}

src/ByteSync.Client/Factories/FileUploadProcessorFactory.cs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,28 +23,27 @@ public FileUploadProcessorFactory(IComponentContext context)
2323
}
2424

2525
public IFileUploadProcessor Create(
26+
ISlicerEncrypter slicerEncrypter,
2627
string? localFileToUpload,
2728
MemoryStream? memoryStream,
2829
SharedFileDefinition sharedFileDefinition)
2930
{
30-
// Create the slicer encrypter
31-
var slicerEncrypter = _context.Resolve<ISlicerEncrypter>();
32-
3331
// Create coordination components
3432
var fileUploadCoordinator = new FileUploadCoordinator(_context.Resolve<ILogger<FileUploadCoordinator>>());
3533
var semaphoreSlim = new SemaphoreSlim(1, 1);
3634

3735
// Create file slicer
36+
var adaptiveUploadController = _context.Resolve<IAdaptiveUploadController>();
3837
var fileSlicer = new FileSlicer(slicerEncrypter, fileUploadCoordinator.AvailableSlices,
39-
semaphoreSlim, fileUploadCoordinator.ExceptionOccurred, _context.Resolve<ILogger<FileSlicer>>());
38+
semaphoreSlim, fileUploadCoordinator.ExceptionOccurred, _context.Resolve<ILogger<FileSlicer>>(), adaptiveUploadController);
4039

4140
// Create file upload worker
4241
var policyFactory = _context.Resolve<IPolicyFactory>();
4342
var fileTransferApiClient = _context.Resolve<IFileTransferApiClient>();
4443
var strategies = _context.Resolve<IIndex<StorageProvider, IUploadStrategy>>();
4544
var fileUploadWorker = new FileUploadWorker(policyFactory, fileTransferApiClient, sharedFileDefinition,
4645
semaphoreSlim, fileUploadCoordinator.ExceptionOccurred, strategies,
47-
fileUploadCoordinator.UploadingIsFinished, _context.Resolve<ILogger<FileUploadWorker>>());
46+
fileUploadCoordinator.UploadingIsFinished, _context.Resolve<ILogger<FileUploadWorker>>(), adaptiveUploadController);
4847

4948
// Create file part upload asserter
5049
var sessionService = _context.Resolve<ISessionService>();
@@ -57,7 +56,8 @@ public IFileUploadProcessor Create(
5756
new TypedParameter(typeof(IFileUploadWorker), fileUploadWorker),
5857
new TypedParameter(typeof(IFilePartUploadAsserter), filePartUploadAsserter),
5958
new TypedParameter(typeof(string), localFileToUpload),
60-
new TypedParameter(typeof(SemaphoreSlim), semaphoreSlim)
59+
new TypedParameter(typeof(SemaphoreSlim), semaphoreSlim),
60+
new TypedParameter(typeof(IAdaptiveUploadController), adaptiveUploadController)
6161
);
6262

6363
return fileUploadProcessor;

src/ByteSync.Client/Factories/FileUploaderFactory.cs

Lines changed: 7 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,9 @@
11
using System.IO;
2-
using System.Threading;
32
using Autofac;
4-
using Autofac.Features.Indexed;
53
using ByteSync.Common.Business.SharedFiles;
6-
using ByteSync.Interfaces;
74
using ByteSync.Interfaces.Controls.Communications;
8-
using ByteSync.Interfaces.Controls.Communications.Http;
95
using ByteSync.Interfaces.Controls.Encryptions;
106
using ByteSync.Interfaces.Factories;
11-
using ByteSync.Interfaces.Services.Sessions;
12-
using ByteSync.Services.Communications.Transfers.Uploading;
137

148
namespace ByteSync.Factories;
159

@@ -35,33 +29,18 @@ public IFileUploader Build(MemoryStream memoryStream, SharedFileDefinition share
3529
private IFileUploader DoBuild(string? fullName, MemoryStream? memoryStream, SharedFileDefinition sharedFileDefinition)
3630
{
3731
var slicerEncrypter = _context.Resolve<ISlicerEncrypter>();
38-
39-
var fileUploadCoordinator = new FileUploadCoordinator(_context.Resolve<ILogger<FileUploadCoordinator>>());
40-
var semaphoreSlim = new SemaphoreSlim(1, 1);
41-
42-
var fileSlicer = new FileSlicer(slicerEncrypter, fileUploadCoordinator.AvailableSlices,
43-
semaphoreSlim, fileUploadCoordinator.ExceptionOccurred, _context.Resolve<ILogger<FileSlicer>>());
44-
45-
var policyFactory = _context.Resolve<IPolicyFactory>();
46-
var fileTransferApiClient = _context.Resolve<IFileTransferApiClient>();
47-
var strategies = _context.Resolve<IIndex<StorageProvider, IUploadStrategy>>();
48-
var fileUploadWorker = new FileUploadWorker(policyFactory, fileTransferApiClient, sharedFileDefinition,
49-
semaphoreSlim, fileUploadCoordinator.ExceptionOccurred, strategies,
50-
fileUploadCoordinator.UploadingIsFinished, _context.Resolve<ILogger<FileUploadWorker>>());
51-
52-
var sessionService = _context.Resolve<ISessionService>();
53-
var filePartUploadAsserter = new FilePartUploadAsserter(fileTransferApiClient, sessionService);
54-
32+
var preparerFactory = _context.Resolve<IFileUploadPreparerFactory>();
33+
var processorFactory = _context.Resolve<IFileUploadProcessorFactory>();
34+
var fileUploadPreparer = preparerFactory.Create();
35+
var fileUploadProcessor = processorFactory.Create(slicerEncrypter, fullName, memoryStream, sharedFileDefinition);
36+
5537
var fileUploader = _context.Resolve<IFileUploader>(
5638
new TypedParameter(typeof(string), fullName),
5739
new TypedParameter(typeof(MemoryStream), memoryStream),
5840
new TypedParameter(typeof(SharedFileDefinition), sharedFileDefinition),
59-
new TypedParameter(typeof(IFileUploadCoordinator), fileUploadCoordinator),
60-
new TypedParameter(typeof(IFileSlicer), fileSlicer),
61-
new TypedParameter(typeof(IFileUploadWorker), fileUploadWorker),
62-
new TypedParameter(typeof(IFilePartUploadAsserter), filePartUploadAsserter),
6341
new TypedParameter(typeof(ISlicerEncrypter), slicerEncrypter),
64-
new TypedParameter(typeof(SemaphoreSlim), semaphoreSlim)
42+
new TypedParameter(typeof(IFileUploadPreparer), fileUploadPreparer),
43+
new TypedParameter(typeof(IFileUploadProcessor), fileUploadProcessor)
6544
);
6645

6746
return fileUploader;
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
namespace ByteSync.Interfaces.Controls.Communications;
2+
3+
public interface IAdaptiveUploadController
4+
{
5+
int CurrentChunkSizeBytes { get; }
6+
int CurrentParallelism { get; }
7+
8+
// Returns the chunk size to use for the next slice
9+
int GetNextChunkSizeBytes();
10+
11+
// Record the result of an upload attempt for a slice
12+
void RecordUploadResult(TimeSpan elapsed, bool isSuccess, int partNumber, int? statusCode = null, Exception? exception = null);
13+
}

src/ByteSync.Client/Interfaces/Controls/Communications/IFileSlicer.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,10 @@ namespace ByteSync.Interfaces.Controls.Communications;
55

66
public interface IFileSlicer
77
{
8+
// TODO : remove this method and its implementation
89
Task SliceAndEncryptAsync(SharedFileDefinition sharedFileDefinition, UploadProgressState progressState,
910
int? maxSliceLength = null);
11+
12+
Task SliceAndEncryptAdaptiveAsync(SharedFileDefinition sharedFileDefinition, UploadProgressState progressState);
1013
}
1114

src/ByteSync.Client/Interfaces/Controls/Communications/IFileUploadWorker.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,11 @@ namespace ByteSync.Interfaces.Controls.Communications;
55

66
public interface IFileUploadWorker
77
{
8+
// TODO: remove this method and its implementation
89
Task UploadAvailableSlicesAsync(Channel<FileUploaderSlice> availableSlices, UploadProgressState progressState);
910

11+
// TODO: remove this method and its implementation
1012
void StartUploadWorkers(Channel<FileUploaderSlice> availableSlices, int workerCount, UploadProgressState progressState);
13+
14+
Task UploadAvailableSlicesAdaptiveAsync(Channel<FileUploaderSlice> availableSlices, UploadProgressState progressState);
1115
}

src/ByteSync.Client/Interfaces/Factories/IFileUploadProcessorFactory.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
using System.IO;
22
using ByteSync.Common.Business.SharedFiles;
33
using ByteSync.Interfaces.Controls.Communications;
4+
using ByteSync.Interfaces.Controls.Encryptions;
45

56
namespace ByteSync.Interfaces.Factories;
67

78
public interface IFileUploadProcessorFactory
89
{
910
IFileUploadProcessor Create(
11+
ISlicerEncrypter slicerEncrypter,
1012
string? localFileToUpload,
1113
MemoryStream? memoryStream,
1214
SharedFileDefinition sharedFileDefinition);
Lines changed: 171 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,171 @@
1+
using ByteSync.Interfaces.Controls.Communications;
2+
3+
namespace ByteSync.Services.Communications.Transfers.Uploading;
4+
5+
public class AdaptiveUploadController : IAdaptiveUploadController
6+
{
7+
// Initial configuration
8+
private const int INITIAL_CHUNK_SIZE_BYTES = 500 * 1024; // 500 KB
9+
private const int MIN_PARALLELISM = 2;
10+
private const int MAX_PARALLELISM = 4;
11+
12+
// Thresholds
13+
private static readonly TimeSpan UpscaleThreshold = TimeSpan.FromSeconds(25);
14+
private static readonly TimeSpan DownscaleThreshold = TimeSpan.FromSeconds(30);
15+
16+
// Chunk size thresholds for parallelism increases
17+
private const int FOUR_MB = 4 * 1024 * 1024;
18+
private const int EIGHT_MB = 8 * 1024 * 1024;
19+
20+
// State
21+
private int _currentChunkSizeBytes;
22+
private int _currentParallelism;
23+
private readonly Queue<TimeSpan> _recentDurations;
24+
private readonly Queue<int> _recentPartNumbers;
25+
private readonly Queue<bool> _recentSuccesses;
26+
private int _successesInWindow;
27+
private int _windowSize;
28+
private readonly ILogger<AdaptiveUploadController> _logger;
29+
30+
public AdaptiveUploadController(ILogger<AdaptiveUploadController> logger)
31+
{
32+
_logger = logger;
33+
_currentChunkSizeBytes = INITIAL_CHUNK_SIZE_BYTES;
34+
_currentParallelism = MIN_PARALLELISM;
35+
_recentDurations = new Queue<TimeSpan>();
36+
_recentPartNumbers = new Queue<int>();
37+
_recentSuccesses = new Queue<bool>();
38+
_windowSize = _currentParallelism;
39+
}
40+
41+
public int CurrentChunkSizeBytes => _currentChunkSizeBytes;
42+
public int CurrentParallelism => _currentParallelism;
43+
44+
public int GetNextChunkSizeBytes()
45+
{
46+
return _currentChunkSizeBytes;
47+
}
48+
49+
public void RecordUploadResult(TimeSpan elapsed, bool isSuccess, int partNumber, int? statusCode = null, Exception? exception = null)
50+
{
51+
// Track window of last N uploads where N == current parallelism
52+
_recentDurations.Enqueue(elapsed);
53+
_recentPartNumbers.Enqueue(partNumber);
54+
_recentSuccesses.Enqueue(isSuccess);
55+
if (isSuccess) { _successesInWindow += 1; }
56+
while (_recentDurations.Count > _windowSize)
57+
{
58+
_recentDurations.Dequeue();
59+
if (_recentPartNumbers.Count > 0)
60+
{
61+
_recentPartNumbers.Dequeue();
62+
}
63+
if (_recentSuccesses.Count > 0)
64+
{
65+
var removedSuccess = _recentSuccesses.Dequeue();
66+
if (removedSuccess && _successesInWindow > 0)
67+
{
68+
_successesInWindow -= 1;
69+
}
70+
}
71+
}
72+
73+
// If error indicates bandwidth problems, reset chunk size
74+
if (!isSuccess && statusCode != null)
75+
{
76+
if (statusCode == 429 || statusCode == 503 || statusCode == 507)
77+
{
78+
_logger.LogWarning("Adaptive: bandwidth error status {Status}. Resetting chunk size to {InitialKb} KB (was {PrevKb} KB)", statusCode, INITIAL_CHUNK_SIZE_BYTES / 1024, _currentChunkSizeBytes / 1024);
79+
_currentChunkSizeBytes = INITIAL_CHUNK_SIZE_BYTES;
80+
return;
81+
}
82+
}
83+
84+
if (_recentDurations.Count < _windowSize)
85+
{
86+
return; // not enough data yet
87+
}
88+
89+
var maxElapsed = TimeSpan.Zero;
90+
foreach (var d in _recentDurations)
91+
{
92+
if (d > maxElapsed) maxElapsed = d;
93+
}
94+
95+
_logger.LogDebug(
96+
"Adaptive: maxElapsedMs={MaxElapsedMs}, parallelism={Parallelism}, chunkKB={ChunkKb}",
97+
maxElapsed.TotalMilliseconds, _currentParallelism, _currentChunkSizeBytes / 1024);
98+
99+
// Downscale path first
100+
if (maxElapsed > DownscaleThreshold)
101+
{
102+
// Adjust chunk size first (incremental), then parallelism in the same evaluation
103+
_currentChunkSizeBytes = (int)Math.Max(64 * 1024, _currentChunkSizeBytes * 0.75);
104+
_logger.LogInformation(
105+
"Adaptive: Downscale. maxElapsedMs={MaxElapsedMs} > {ThresholdMs}. New chunkKB={ChunkKb}.",
106+
maxElapsed.TotalMilliseconds, DownscaleThreshold.TotalMilliseconds, _currentChunkSizeBytes / 1024);
107+
if (_currentParallelism > MIN_PARALLELISM)
108+
{
109+
_logger.LogInformation(
110+
"Adaptive: Downscale. Reducing parallelism {Prev} -> {Next}. Resetting window.",
111+
_currentParallelism, _currentParallelism - 1);
112+
_currentParallelism -= 1;
113+
_windowSize = _currentParallelism;
114+
}
115+
// Logging before resetting window done above; now reset
116+
ResetWindow();
117+
return;
118+
}
119+
120+
// Upscale when stable and fast (<= 25s) and all in window were successful
121+
if (maxElapsed <= UpscaleThreshold && _successesInWindow >= _windowSize)
122+
{
123+
// Increase chunk size up to 25% per step, but target towards 25s heuristically
124+
// Simple rule: +25%
125+
var increased = (int)(_currentChunkSizeBytes * 1.25);
126+
_currentChunkSizeBytes = increased;
127+
_logger.LogInformation(
128+
"Adaptive: Upscale. maxElapsedMs={MaxElapsedMs} <= {ThresholdMs}. New chunkKB={ChunkKb}.",
129+
maxElapsed.TotalMilliseconds, UpscaleThreshold.TotalMilliseconds, _currentChunkSizeBytes / 1024);
130+
131+
// Increase parallelism at thresholds of chunk size
132+
if (_currentChunkSizeBytes >= EIGHT_MB)
133+
{
134+
var prev = _currentParallelism;
135+
_currentParallelism = Math.Max(_currentParallelism, 4);
136+
if (_currentParallelism != prev)
137+
{
138+
_logger.LogInformation("Adaptive: Upscale. Increasing parallelism {Prev} -> {Next} due to chunk>=8MB.", prev, _currentParallelism);
139+
}
140+
}
141+
else if (_currentChunkSizeBytes >= FOUR_MB)
142+
{
143+
var prev = _currentParallelism;
144+
_currentParallelism = Math.Max(_currentParallelism, 3);
145+
if (_currentParallelism != prev)
146+
{
147+
_logger.LogInformation("Adaptive: Upscale. Increasing parallelism {Prev} -> {Next} due to chunk>=4MB.", prev, _currentParallelism);
148+
}
149+
}
150+
_currentParallelism = Math.Min(_currentParallelism, MAX_PARALLELISM);
151+
_windowSize = _currentParallelism;
152+
}
153+
}
154+
155+
private void ResetWindow()
156+
{
157+
while (_recentDurations.Count > 0)
158+
{
159+
_recentDurations.Dequeue();
160+
}
161+
while (_recentPartNumbers.Count > 0)
162+
{
163+
_recentPartNumbers.Dequeue();
164+
}
165+
while (_recentSuccesses.Count > 0)
166+
{
167+
_recentSuccesses.Dequeue();
168+
}
169+
_successesInWindow = 0;
170+
}
171+
}

0 commit comments

Comments
 (0)