Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/trigger-PR-pipeline.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ jobs:
/k:"POW-Software_ByteSync"
/o:"pow-software"
/d:sonar.host.url="https://sonarcloud.io"
/d:sonar.login="$SONAR_TOKEN"
/d:sonar.token="$SONAR_TOKEN"
/d:sonar.cs.opencover.reportsPaths="**/coverage.opencover.xml"
/d:sonar.cs.vstest.reportsPaths="**/*.trx"

Expand All @@ -61,7 +61,7 @@ jobs:
- name: SonarCloud End (publish)
run: >
dotnet sonarscanner end
/d:sonar.login="$SONAR_TOKEN"
/d:sonar.token="$SONAR_TOKEN"

discover-tests:
runs-on: ubuntu-latest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
using ByteSync.Services.Communications.SignalR;
using ByteSync.Services.Communications.Transfers.Downloading;
using ByteSync.Services.Inventories;
using ByteSync.Services.Communications.Transfers.Uploading;
using ByteSync.Interfaces.Controls.Communications;
using ByteSync.Services.Navigations;
using ByteSync.Services.Sessions.Connecting;
using ByteSync.Services.Synchronizations;
Expand Down Expand Up @@ -34,5 +36,8 @@ protected override void Load(ContainerBuilder builder)
builder.RegisterType<SynchronizationStarter>().SingleInstance().AsImplementedInterfaces();
builder.RegisterType<SynchronizationActionServerInformer>().SingleInstance().AsImplementedInterfaces();
builder.RegisterType<DownloadManager>().SingleInstance().AsImplementedInterfaces();

builder.RegisterType<AdaptiveUploadController>().SingleInstance().As<IAdaptiveUploadController>();
builder.RegisterType<UploadSlicingManager>().SingleInstance().As<IUploadSlicingManager>();
}
}
9 changes: 4 additions & 5 deletions src/ByteSync.Client/Factories/FileUploadProcessorFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,8 @@ public IFileUploadProcessor Create(
var fileUploadCoordinator = new FileUploadCoordinator(_context.Resolve<ILogger<FileUploadCoordinator>>());
var semaphoreSlim = new SemaphoreSlim(1, 1);

// Create file slicer
// Resolve adaptive upload controller
var adaptiveUploadController = _context.Resolve<IAdaptiveUploadController>();
var fileSlicer = new FileSlicer(slicerEncrypter, fileUploadCoordinator.AvailableSlices,
semaphoreSlim, fileUploadCoordinator.ExceptionOccurred, _context.Resolve<ILogger<FileSlicer>>(), adaptiveUploadController);

// Create file upload worker
var policyFactory = _context.Resolve<IPolicyFactory>();
Expand All @@ -49,15 +47,16 @@ public IFileUploadProcessor Create(
var sessionService = _context.Resolve<ISessionService>();
var filePartUploadAsserter = new FilePartUploadAsserter(fileTransferApiClient, sessionService);

var slicingManager = _context.Resolve<IUploadSlicingManager>();
var fileUploadProcessor = _context.Resolve<IFileUploadProcessor>(
new TypedParameter(typeof(ISlicerEncrypter), slicerEncrypter),
new TypedParameter(typeof(IFileUploadCoordinator), fileUploadCoordinator),
new TypedParameter(typeof(IFileSlicer), fileSlicer),
new TypedParameter(typeof(IFileUploadWorker), fileUploadWorker),
new TypedParameter(typeof(IFilePartUploadAsserter), filePartUploadAsserter),
new TypedParameter(typeof(string), localFileToUpload),
new TypedParameter(typeof(SemaphoreSlim), semaphoreSlim),
new TypedParameter(typeof(IAdaptiveUploadController), adaptiveUploadController)
new TypedParameter(typeof(IAdaptiveUploadController), adaptiveUploadController),
new TypedParameter(typeof(IUploadSlicingManager), slicingManager)
);

return fileUploadProcessor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,6 @@ namespace ByteSync.Interfaces.Controls.Communications;

public interface IFileSlicer
{
// TODO : remove this method and its implementation
Task SliceAndEncryptAsync(SharedFileDefinition sharedFileDefinition, UploadProgressState progressState,
int? maxSliceLength = null);

Task SliceAndEncryptAdaptiveAsync(SharedFileDefinition sharedFileDefinition, UploadProgressState progressState);
}

Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,5 @@ namespace ByteSync.Interfaces.Controls.Communications;

public interface IFileUploadWorker
{
// TODO: remove this method and its implementation
Task UploadAvailableSlicesAsync(Channel<FileUploaderSlice> availableSlices, UploadProgressState progressState);

// TODO: remove this method and its implementation
void StartUploadWorkers(Channel<FileUploaderSlice> availableSlices, int workerCount, UploadProgressState progressState);

Task UploadAvailableSlicesAdaptiveAsync(Channel<FileUploaderSlice> availableSlices, UploadProgressState progressState);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
using System.Threading;
using System.Threading.Channels;
using ByteSync.Business.Communications.Transfers;
using ByteSync.Common.Business.SharedFiles;
using ByteSync.Interfaces.Controls.Encryptions;

namespace ByteSync.Interfaces.Controls.Communications;

public interface IUploadSlicingManager
{
Task<UploadProgressState> Enqueue(
SharedFileDefinition sharedFileDefinition,
ISlicerEncrypter slicerEncrypter,
Channel<FileUploaderSlice> availableSlices,
SemaphoreSlim semaphoreSlim,
ManualResetEvent exceptionOccurred,
IAdaptiveUploadController adaptiveUploadController);
}


Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
using System.Reactive.Linq;
using ByteSync.Business.Sessions;
using ByteSync.Interfaces.Controls.Communications;
using ByteSync.Interfaces.Services.Sessions;

namespace ByteSync.Services.Communications.Transfers.Uploading;

Expand Down Expand Up @@ -27,15 +30,17 @@ public class AdaptiveUploadController : IAdaptiveUploadController
private int _windowSize;
private readonly ILogger<AdaptiveUploadController> _logger;

public AdaptiveUploadController(ILogger<AdaptiveUploadController> logger)
public AdaptiveUploadController(ILogger<AdaptiveUploadController> logger, ISessionService sessionService)
{
_logger = logger;
_currentChunkSizeBytes = INITIAL_CHUNK_SIZE_BYTES;
_currentParallelism = MIN_PARALLELISM;
_recentDurations = new Queue<TimeSpan>();
_recentPartNumbers = new Queue<int>();
_recentSuccesses = new Queue<bool>();
_windowSize = _currentParallelism;
ResetState();
sessionService.SessionObservable.Subscribe(_ => { ResetState(); });
sessionService.SessionStatusObservable
.Where(status => status == SessionStatus.Preparation)
.Subscribe(_ => { ResetState(); });
}

public int CurrentChunkSizeBytes => _currentChunkSizeBytes;
Expand Down Expand Up @@ -99,20 +104,27 @@ public void RecordUploadResult(TimeSpan elapsed, bool isSuccess, int partNumber,
// Downscale path first
if (maxElapsed > DownscaleThreshold)
{
// Adjust chunk size first (incremental), then parallelism in the same evaluation
_currentChunkSizeBytes = (int)Math.Max(64 * 1024, _currentChunkSizeBytes * 0.75);
_logger.LogInformation(
"Adaptive: Downscale. maxElapsedMs={MaxElapsedMs} > {ThresholdMs}. New chunkKB={ChunkKb}.",
maxElapsed.TotalMilliseconds, DownscaleThreshold.TotalMilliseconds, _currentChunkSizeBytes / 1024);
// First, reduce the number of parallel upload tasks (minimum = 2)
if (_currentParallelism > MIN_PARALLELISM)
{
_logger.LogInformation(
"Adaptive: Downscale. Reducing parallelism {Prev} -> {Next}. Resetting window.",
_currentParallelism, _currentParallelism - 1);
_currentParallelism -= 1;
_windowSize = _currentParallelism;
ResetWindow();
return;
}

// If already at minimum parallelism, reduce chunk size proportionally
var reduced = (int)Math.Max(64 * 1024, _currentChunkSizeBytes * 0.75);
if (reduced != _currentChunkSizeBytes)
{
_currentChunkSizeBytes = reduced;
_logger.LogInformation(
"Adaptive: Downscale. maxElapsedMs={MaxElapsedMs} > {ThresholdMs}. New chunkKB={ChunkKb}.",
maxElapsed.TotalMilliseconds, DownscaleThreshold.TotalMilliseconds, _currentChunkSizeBytes / 1024);
}
// Logging before resetting window done above; now reset
ResetWindow();
return;
}
Expand Down Expand Up @@ -168,4 +180,12 @@ private void ResetWindow()
}
_successesInWindow = 0;
}

private void ResetState()
{
_currentChunkSizeBytes = INITIAL_CHUNK_SIZE_BYTES;
_currentParallelism = MIN_PARALLELISM;
_windowSize = _currentParallelism;
ResetWindow();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,36 @@ public async Task SliceAndEncryptAdaptiveAsync(SharedFileDefinition sharedFileDe
return;
}

// Pause slicing if pending slices exceed 2 × current upload task count
int pending;
int threshold;
await _semaphoreSlim.WaitAsync();
try
{
pending = progressState.TotalCreatedSlices - progressState.TotalUploadedSlices;
threshold = 2 * _adaptiveUploadController.CurrentParallelism;
}
finally
{
_semaphoreSlim.Release();
}

while (pending > threshold && !_exceptionOccurred.WaitOne(0))
{
await Task.Delay(10);

await _semaphoreSlim.WaitAsync();
try
{
pending = progressState.TotalCreatedSlices - progressState.TotalUploadedSlices;
threshold = 2 * _adaptiveUploadController.CurrentParallelism;
}
finally
{
_semaphoreSlim.Release();
}
}

var nextSize = _adaptiveUploadController.GetNextChunkSizeBytes();
if (nextSize > 0)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public FileUploadCoordinator(ILogger<FileUploadCoordinator> logger)
{
_logger = logger;
_syncRoot = new object();
_availableSlices = Channel.CreateBounded<FileUploaderSlice>(2);
_availableSlices = Channel.CreateUnbounded<FileUploaderSlice>();
_uploadingIsFinished = new ManualResetEvent(false);
_exceptionOccurred = new ManualResetEvent(false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ public class FileUploadProcessor : IFileUploadProcessor
private readonly ISlicerEncrypter _slicerEncrypter;
private readonly ILogger<FileUploadProcessor> _logger;
private readonly IFileUploadCoordinator _fileUploadCoordinator;
private readonly IFileSlicer _fileSlicer;
private readonly IUploadSlicingManager _uploadSlicingManager;
private readonly IFileUploadWorker _fileUploadWorker;
private readonly IFilePartUploadAsserter _filePartUploadAsserter;
private readonly string? _localFileToUpload;
Expand All @@ -25,38 +25,40 @@ public FileUploadProcessor(
ISlicerEncrypter slicerEncrypter,
ILogger<FileUploadProcessor> logger,
IFileUploadCoordinator fileUploadCoordinator,
IFileSlicer fileSlicer,
IFileUploadWorker fileUploadWorker,
IFilePartUploadAsserter filePartUploadAsserter,
string? localFileToUpload,
SemaphoreSlim semaphoreSlim,
IAdaptiveUploadController adaptiveUploadController)
IAdaptiveUploadController adaptiveUploadController,
IUploadSlicingManager uploadSlicingManager)
{
_slicerEncrypter = slicerEncrypter;
_logger = logger;
_fileUploadCoordinator = fileUploadCoordinator;
_fileSlicer = fileSlicer;
_fileUploadWorker = fileUploadWorker;
_filePartUploadAsserter = filePartUploadAsserter;
_localFileToUpload = localFileToUpload;
_semaphoreSlim = semaphoreSlim;
_adaptiveUploadController = adaptiveUploadController;
_uploadSlicingManager = uploadSlicingManager;
}

public async Task ProcessUpload(SharedFileDefinition sharedFileDefinition, int? maxSliceLength = null)
{
_progressState = new UploadProgressState();
_progressState.StartTimeUtc = DateTimeOffset.UtcNow;

_progressState = await _uploadSlicingManager.Enqueue(
sharedFileDefinition,
_slicerEncrypter,
_fileUploadCoordinator.AvailableSlices,
_semaphoreSlim,
_fileUploadCoordinator.ExceptionOccurred,
_adaptiveUploadController);

// Start upload workers (adaptive)
for (var i = 0; i < _adaptiveUploadController.CurrentParallelism; i++)
{
_ = Task.Run(() => _fileUploadWorker.UploadAvailableSlicesAdaptiveAsync(_fileUploadCoordinator.AvailableSlices, _progressState!));
}

// Start slicer (adaptive)
await Task.Run(() => _fileSlicer.SliceAndEncryptAdaptiveAsync(sharedFileDefinition, _progressState));


// Wait for completion
await _fileUploadCoordinator.WaitForCompletionAsync();

Expand Down
Loading
Loading