Skip to content
6 changes: 3 additions & 3 deletions src/ByteSync.Client/Exceptions/ApiException.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ public ApiException(string message, Exception innerException) : base(message, in
{
}

public ApiException(string message, HttpStatusCode httptatusCode) : base(message)
public ApiException(string message, HttpStatusCode httpStatusCode) : base(message)
{
HttptatusCode = httptatusCode;
HttpStatusCode = httpStatusCode;
}

public HttpStatusCode? HttptatusCode { get; set; }
public HttpStatusCode? HttpStatusCode { get; set; }
}
12 changes: 6 additions & 6 deletions src/ByteSync.Client/Factories/FileUploadProcessorFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,28 +23,27 @@ public FileUploadProcessorFactory(IComponentContext context)
}

public IFileUploadProcessor Create(
ISlicerEncrypter slicerEncrypter,
string? localFileToUpload,
MemoryStream? memoryStream,
SharedFileDefinition sharedFileDefinition)
{
// Create the slicer encrypter
var slicerEncrypter = _context.Resolve<ISlicerEncrypter>();

// Create coordination components
var fileUploadCoordinator = new FileUploadCoordinator(_context.Resolve<ILogger<FileUploadCoordinator>>());
var semaphoreSlim = new SemaphoreSlim(1, 1);

// Create file slicer
var adaptiveUploadController = _context.Resolve<IAdaptiveUploadController>();
var fileSlicer = new FileSlicer(slicerEncrypter, fileUploadCoordinator.AvailableSlices,
semaphoreSlim, fileUploadCoordinator.ExceptionOccurred, _context.Resolve<ILogger<FileSlicer>>());
semaphoreSlim, fileUploadCoordinator.ExceptionOccurred, _context.Resolve<ILogger<FileSlicer>>(), adaptiveUploadController);

// Create file upload worker
var policyFactory = _context.Resolve<IPolicyFactory>();
var fileTransferApiClient = _context.Resolve<IFileTransferApiClient>();
var strategies = _context.Resolve<IIndex<StorageProvider, IUploadStrategy>>();
var fileUploadWorker = new FileUploadWorker(policyFactory, fileTransferApiClient, sharedFileDefinition,
semaphoreSlim, fileUploadCoordinator.ExceptionOccurred, strategies,
fileUploadCoordinator.UploadingIsFinished, _context.Resolve<ILogger<FileUploadWorker>>());
fileUploadCoordinator.UploadingIsFinished, _context.Resolve<ILogger<FileUploadWorker>>(), adaptiveUploadController);

// Create file part upload asserter
var sessionService = _context.Resolve<ISessionService>();
Expand All @@ -57,7 +56,8 @@ public IFileUploadProcessor Create(
new TypedParameter(typeof(IFileUploadWorker), fileUploadWorker),
new TypedParameter(typeof(IFilePartUploadAsserter), filePartUploadAsserter),
new TypedParameter(typeof(string), localFileToUpload),
new TypedParameter(typeof(SemaphoreSlim), semaphoreSlim)
new TypedParameter(typeof(SemaphoreSlim), semaphoreSlim),
new TypedParameter(typeof(IAdaptiveUploadController), adaptiveUploadController)
);

return fileUploadProcessor;
Expand Down
35 changes: 7 additions & 28 deletions src/ByteSync.Client/Factories/FileUploaderFactory.cs
Original file line number Diff line number Diff line change
@@ -1,15 +1,9 @@
using System.IO;
using System.Threading;
using Autofac;
using Autofac.Features.Indexed;
using ByteSync.Common.Business.SharedFiles;
using ByteSync.Interfaces;
using ByteSync.Interfaces.Controls.Communications;
using ByteSync.Interfaces.Controls.Communications.Http;
using ByteSync.Interfaces.Controls.Encryptions;
using ByteSync.Interfaces.Factories;
using ByteSync.Interfaces.Services.Sessions;
using ByteSync.Services.Communications.Transfers.Uploading;

namespace ByteSync.Factories;

Expand All @@ -35,33 +29,18 @@ public IFileUploader Build(MemoryStream memoryStream, SharedFileDefinition share
private IFileUploader DoBuild(string? fullName, MemoryStream? memoryStream, SharedFileDefinition sharedFileDefinition)
{
var slicerEncrypter = _context.Resolve<ISlicerEncrypter>();

var fileUploadCoordinator = new FileUploadCoordinator(_context.Resolve<ILogger<FileUploadCoordinator>>());
var semaphoreSlim = new SemaphoreSlim(1, 1);

var fileSlicer = new FileSlicer(slicerEncrypter, fileUploadCoordinator.AvailableSlices,
semaphoreSlim, fileUploadCoordinator.ExceptionOccurred, _context.Resolve<ILogger<FileSlicer>>());

var policyFactory = _context.Resolve<IPolicyFactory>();
var fileTransferApiClient = _context.Resolve<IFileTransferApiClient>();
var strategies = _context.Resolve<IIndex<StorageProvider, IUploadStrategy>>();
var fileUploadWorker = new FileUploadWorker(policyFactory, fileTransferApiClient, sharedFileDefinition,
semaphoreSlim, fileUploadCoordinator.ExceptionOccurred, strategies,
fileUploadCoordinator.UploadingIsFinished, _context.Resolve<ILogger<FileUploadWorker>>());

var sessionService = _context.Resolve<ISessionService>();
var filePartUploadAsserter = new FilePartUploadAsserter(fileTransferApiClient, sessionService);

var preparerFactory = _context.Resolve<IFileUploadPreparerFactory>();
var processorFactory = _context.Resolve<IFileUploadProcessorFactory>();
var fileUploadPreparer = preparerFactory.Create();
var fileUploadProcessor = processorFactory.Create(slicerEncrypter, fullName, memoryStream, sharedFileDefinition);

var fileUploader = _context.Resolve<IFileUploader>(
new TypedParameter(typeof(string), fullName),
new TypedParameter(typeof(MemoryStream), memoryStream),
new TypedParameter(typeof(SharedFileDefinition), sharedFileDefinition),
new TypedParameter(typeof(IFileUploadCoordinator), fileUploadCoordinator),
new TypedParameter(typeof(IFileSlicer), fileSlicer),
new TypedParameter(typeof(IFileUploadWorker), fileUploadWorker),
new TypedParameter(typeof(IFilePartUploadAsserter), filePartUploadAsserter),
new TypedParameter(typeof(ISlicerEncrypter), slicerEncrypter),
new TypedParameter(typeof(SemaphoreSlim), semaphoreSlim)
new TypedParameter(typeof(IFileUploadPreparer), fileUploadPreparer),
new TypedParameter(typeof(IFileUploadProcessor), fileUploadProcessor)
);

return fileUploader;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
namespace ByteSync.Interfaces.Controls.Communications;

public interface IAdaptiveUploadController
{
int CurrentChunkSizeBytes { get; }
int CurrentParallelism { get; }

// Returns the chunk size to use for the next slice
int GetNextChunkSizeBytes();

// Record the result of an upload attempt for a slice
void RecordUploadResult(TimeSpan elapsed, bool isSuccess, int partNumber, int? statusCode = null, Exception? exception = null);
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ namespace ByteSync.Interfaces.Controls.Communications;

public interface IFileSlicer
{
// TODO : remove this method and its implementation
Task SliceAndEncryptAsync(SharedFileDefinition sharedFileDefinition, UploadProgressState progressState,
int? maxSliceLength = null);

Task SliceAndEncryptAdaptiveAsync(SharedFileDefinition sharedFileDefinition, UploadProgressState progressState);
}

Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,11 @@ namespace ByteSync.Interfaces.Controls.Communications;

public interface IFileUploadWorker
{
// TODO: remove this method and its implementation
Task UploadAvailableSlicesAsync(Channel<FileUploaderSlice> availableSlices, UploadProgressState progressState);

// TODO: remove this method and its implementation
void StartUploadWorkers(Channel<FileUploaderSlice> availableSlices, int workerCount, UploadProgressState progressState);

Task UploadAvailableSlicesAdaptiveAsync(Channel<FileUploaderSlice> availableSlices, UploadProgressState progressState);
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
using System.IO;
using ByteSync.Common.Business.SharedFiles;
using ByteSync.Interfaces.Controls.Communications;
using ByteSync.Interfaces.Controls.Encryptions;

namespace ByteSync.Interfaces.Factories;

public interface IFileUploadProcessorFactory
{
IFileUploadProcessor Create(
ISlicerEncrypter slicerEncrypter,
string? localFileToUpload,
MemoryStream? memoryStream,
SharedFileDefinition sharedFileDefinition);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
using ByteSync.Interfaces.Controls.Communications;

namespace ByteSync.Services.Communications.Transfers.Uploading;

public class AdaptiveUploadController : IAdaptiveUploadController
{
// Initial configuration
private const int INITIAL_CHUNK_SIZE_BYTES = 500 * 1024; // 500 KB
private const int MIN_PARALLELISM = 2;
private const int MAX_PARALLELISM = 4;

// Thresholds
private static readonly TimeSpan UpscaleThreshold = TimeSpan.FromSeconds(25);
private static readonly TimeSpan DownscaleThreshold = TimeSpan.FromSeconds(30);

// Chunk size thresholds for parallelism increases
private const int FOUR_MB = 4 * 1024 * 1024;
private const int EIGHT_MB = 8 * 1024 * 1024;

// State
private int _currentChunkSizeBytes;
private int _currentParallelism;
private readonly Queue<TimeSpan> _recentDurations;
private readonly Queue<int> _recentPartNumbers;
private readonly Queue<bool> _recentSuccesses;
private int _successesInWindow;
private int _windowSize;
private readonly ILogger<AdaptiveUploadController> _logger;

public AdaptiveUploadController(ILogger<AdaptiveUploadController> logger)
{
_logger = logger;
_currentChunkSizeBytes = INITIAL_CHUNK_SIZE_BYTES;
_currentParallelism = MIN_PARALLELISM;
_recentDurations = new Queue<TimeSpan>();
_recentPartNumbers = new Queue<int>();
_recentSuccesses = new Queue<bool>();
_windowSize = _currentParallelism;
}

public int CurrentChunkSizeBytes => _currentChunkSizeBytes;
public int CurrentParallelism => _currentParallelism;

public int GetNextChunkSizeBytes()
{
return _currentChunkSizeBytes;
}

public void RecordUploadResult(TimeSpan elapsed, bool isSuccess, int partNumber, int? statusCode = null, Exception? exception = null)
{
// Track window of last N uploads where N == current parallelism
_recentDurations.Enqueue(elapsed);
_recentPartNumbers.Enqueue(partNumber);
_recentSuccesses.Enqueue(isSuccess);
if (isSuccess) { _successesInWindow += 1; }
while (_recentDurations.Count > _windowSize)
{
_recentDurations.Dequeue();
if (_recentPartNumbers.Count > 0)
{
_recentPartNumbers.Dequeue();
}
if (_recentSuccesses.Count > 0)
{
var removedSuccess = _recentSuccesses.Dequeue();
if (removedSuccess && _successesInWindow > 0)
{
_successesInWindow -= 1;
}
}
}

// If error indicates bandwidth problems, reset chunk size
if (!isSuccess && statusCode != null)
{
if (statusCode == 429 || statusCode == 503 || statusCode == 507)
{
_logger.LogWarning("Adaptive: bandwidth error status {Status}. Resetting chunk size to {InitialKb} KB (was {PrevKb} KB)", statusCode, INITIAL_CHUNK_SIZE_BYTES / 1024, _currentChunkSizeBytes / 1024);
_currentChunkSizeBytes = INITIAL_CHUNK_SIZE_BYTES;
return;
}
}

if (_recentDurations.Count < _windowSize)
{
return; // not enough data yet
}

var maxElapsed = TimeSpan.Zero;
foreach (var d in _recentDurations)
{
if (d > maxElapsed) maxElapsed = d;
}

_logger.LogDebug(
"Adaptive: maxElapsedMs={MaxElapsedMs}, parallelism={Parallelism}, chunkKB={ChunkKb}",
maxElapsed.TotalMilliseconds, _currentParallelism, _currentChunkSizeBytes / 1024);

// Downscale path first
if (maxElapsed > DownscaleThreshold)
{
// Adjust chunk size first (incremental), then parallelism in the same evaluation
_currentChunkSizeBytes = (int)Math.Max(64 * 1024, _currentChunkSizeBytes * 0.75);
_logger.LogInformation(
"Adaptive: Downscale. maxElapsedMs={MaxElapsedMs} > {ThresholdMs}. New chunkKB={ChunkKb}.",
maxElapsed.TotalMilliseconds, DownscaleThreshold.TotalMilliseconds, _currentChunkSizeBytes / 1024);
if (_currentParallelism > MIN_PARALLELISM)
{
_logger.LogInformation(
"Adaptive: Downscale. Reducing parallelism {Prev} -> {Next}. Resetting window.",
_currentParallelism, _currentParallelism - 1);
_currentParallelism -= 1;
_windowSize = _currentParallelism;
}
// Logging before resetting window done above; now reset
ResetWindow();
return;
}

// Upscale when stable and fast (<= 25s) and all in window were successful
if (maxElapsed <= UpscaleThreshold && _successesInWindow >= _windowSize)
{
// Increase chunk size up to 25% per step, but target towards 25s heuristically
// Simple rule: +25%
var increased = (int)(_currentChunkSizeBytes * 1.25);
_currentChunkSizeBytes = increased;
_logger.LogInformation(
"Adaptive: Upscale. maxElapsedMs={MaxElapsedMs} <= {ThresholdMs}. New chunkKB={ChunkKb}.",
maxElapsed.TotalMilliseconds, UpscaleThreshold.TotalMilliseconds, _currentChunkSizeBytes / 1024);

// Increase parallelism at thresholds of chunk size
if (_currentChunkSizeBytes >= EIGHT_MB)
{
var prev = _currentParallelism;
_currentParallelism = Math.Max(_currentParallelism, 4);
if (_currentParallelism != prev)
{
_logger.LogInformation("Adaptive: Upscale. Increasing parallelism {Prev} -> {Next} due to chunk>=8MB.", prev, _currentParallelism);
}
}
else if (_currentChunkSizeBytes >= FOUR_MB)
{
var prev = _currentParallelism;
_currentParallelism = Math.Max(_currentParallelism, 3);
if (_currentParallelism != prev)
{
_logger.LogInformation("Adaptive: Upscale. Increasing parallelism {Prev} -> {Next} due to chunk>=4MB.", prev, _currentParallelism);
}
}
_currentParallelism = Math.Min(_currentParallelism, MAX_PARALLELISM);
_windowSize = _currentParallelism;
}
}

private void ResetWindow()
{
while (_recentDurations.Count > 0)
{
_recentDurations.Dequeue();
}
while (_recentPartNumbers.Count > 0)
{
_recentPartNumbers.Dequeue();
}
while (_recentSuccesses.Count > 0)
{
_recentSuccesses.Dequeue();
}
_successesInWindow = 0;
}
}
Loading
Loading