Architecture
This document provides a comprehensive technical overview of the internal architecture, covering:
- Type Hierarchy
- State Machine Design
- Priority Queue Implementation
- Channel Architecture
- Request Lifecycle
- Container Architecture
- Thread Safety Mechanisms
- Performance Optimizations

Type Hierarchy
┌─────────────────────────────────────────────────────────────────┐
│ INTERFACES │
├─────────────────────────────────────────────────────────────────┤
│ │
│ IRequest │
│ ├─ State: RequestState │
│ ├─ StateChanged: EventHandler<RequestState> │
│ ├─ Priority: RequestPriority │
│ ├─ Task: Task │
│ ├─ Exception: AggregateException? │
│ ├─ SubsequentRequest: IRequest? │
│ ├─ StartRequestAsync(): Task │
│ ├─ Start(): void │
│ ├─ Pause(): void │
│ ├─ Cancel(): void │
│ └─ TrySetIdle(): bool │
│ │
│ IProgressableRequest : IRequest │
│ └─ Progress: Progress<float> │
│ │
│ IRequestContainer<TRequest> : IRequest, IEnumerable<TRequest> │
│ ├─ Count: int │
│ ├─ Add(TRequest): void │
│ ├─ AddRange(params TRequest[]): void │
│ ├─ Remove(params TRequest[]): void │
│ ├─ SetPriority(RequestPriority): void │
│ └─ WaitForCurrentRequestsAsync(CancellationToken): Task │
│ │
│ IRequestHandler : IRequestContainer<IRequest> │
│ ├─ CancellationToken: CancellationToken │
│ └─ DefaultSynchronizationContext: SynchronizationContext │
│ │
└─────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────┐
│ ABSTRACT CLASSES │
├─────────────────────────────────────────────────────────────────┤
│ │
│ Request<TOptions, TCompleted, TFailed> : IRequest, IDisposable │
│ └─ implements: IValueTaskSource (efficient awaiting) │
│ │
└─────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────┐
│ CONCRETE IMPLEMENTATIONS │
├─────────────────────────────────────────────────────────────────┤
│ │
│ OwnRequest : Request<...> │
│ RequestContainer<TRequest> : IRequestContainer<TRequest> │
│ ProgressableContainer<TRequest> : RequestContainer<TRequest> │
│ ParallelRequestHandler : IRequestHandler │
│ SequentialRequestHandler : IRequestHandler │
│ │
└─────────────────────────────────────────────────────────────────┘
- $\text{\color{green}Composition}$: `Request` contains `RequestOptions`, `CancellationTokenSource`, `RequestStateMachine`
- $\text{\color{blue}Aggregation}$: `RequestContainer` aggregates `IRequest[]`, `RequestHandler` manages the `IRequest` queue
- $\text{\color{red}Dependency}$: `Request` depends on `IRequestHandler` for execution, `RequestHandler` depends on priority channels
State Machine Design

public class RequestStateMachine
{
private int _stateInt;
public RequestState State => (RequestState)Volatile.Read(ref _stateInt);
public Action<RequestState, RequestState>? OnStateChanged { get; set; }
public bool TrySetState(RequestState from, RequestState to)
{
int currentInt = (int)from;
int newInt = (int)to;
// Lock-free atomic compare-and-swap
int oldInt = Interlocked.CompareExchange(ref _stateInt, newInt, currentInt);
if (oldInt == currentInt)
{
// Transition succeeded
OnStateChanged?.Invoke(from, to);
return true;
}
// Transition failed (state changed concurrently)
return false;
}
public bool TrySetStateMulti(RequestState to, params RequestState[] from)
{
foreach (var fromState in from)
{
if (TrySetState(fromState, to))
return true;
}
return false;
}
}

Individual Request (RequestStateMachine):
| From State | Valid To States |
|---|---|
| Paused | Waiting, Idle, Running, Cancelled |
| Waiting | Idle, Cancelled |
| Idle | Running, Cancelled |
| Running | Completed, Failed, Waiting, Idle, Paused, Cancelled |
| Completed | (terminal) |
| Failed | (terminal) |
| Cancelled | (terminal) |
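Transitions that are legal from several source states can use `TrySetStateMulti`, which tries each allowed source in turn. A minimal sketch (the exact source-state set used for cancellation here is illustrative, not taken from the library):

```csharp
// Attempt to move to Cancelled from any of the listed non-terminal states;
// returns false if the request has already reached a terminal state.
bool cancelled = _stateMachine.TrySetStateMulti(
    RequestState.Cancelled,
    RequestState.Paused, RequestState.Waiting, RequestState.Idle, RequestState.Running);
```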
Container/Handler (RequestContainerStateMachine): container and handler states are recalculated from the states of their child requests (see CalculateState under Container Architecture below).
Why Lock-Free?
- Performance: No contention on state reads/writes
- Predictability: No lock convoy effect
- Correctness: ABA problem avoided with validation
Why CompareExchange?
- Atomic operation ensures only one thread succeeds
- Failed transitions indicate concurrent modification
- Enables retry logic at caller level
Example Transition Flow:
// In Request.Start():
if (_stateMachine.TrySetState(RequestState.Paused, RequestState.Idle))
{
// Successfully transitioned, proceed
Options.Handler.Add(this);
}
else
{
// State was changed concurrently, check new state
if (State == RequestState.Running)
return; // Already running, nothing to do
}

Priority Queue Implementation

Why Quaternary (4-ary) instead of Binary?
- Reduced Tree Depth: $\log_4 n$ vs. $\log_2 n$ — roughly half as many levels (see the quick comparison below)
- Better Cache Locality: More children per node → fewer pointer chases
- Efficient Heap Operations: Fewer levels to traverse, so fewer swaps per enqueue/dequeue
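As a quick sanity check of the depth claim, here is a small illustrative snippet (not part of the library) comparing the two tree depths for one million queued items:

```csharp
// For n = 1,000,000 elements a binary heap needs ~20 levels, a quaternary heap ~10.
int n = 1_000_000;
int binaryLevels     = (int)Math.Ceiling(Math.Log2(n));               // 20
int quaternaryLevels = (int)Math.Ceiling(Math.Log(n) / Math.Log(4));  // 10
Console.WriteLine($"binary: {binaryLevels} levels, quaternary: {quaternaryLevels} levels");
```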
public class ConcurrentPriorityQueue<TElement>
{
private PriorityItem<TElement>[] _heap;
private int _size;
private long _globalInsertionCounter; // FIFO within same priority
private readonly object _lock = new();
// Priority item with insertion order for FIFO
private record struct PriorityItem<T>(float Priority, T Item, long Order);
}

                [0]
        /    |     |    \
      [1]   [2]   [3]   [4]
     /   |   |   \
  [5]  [6]  [7]  [8]  ...

For example, node 6 has parent (6 − 1) / 4 = 1 and first child 6 · 4 + 1 = 25.
// Parent of index i: (i - 1) / 4
private static int GetParentIndex(int index) => (index - 1) >> 2;
// First child of index i: i * 4 + 1
private static int GetFirstChildIndex(int index) => (index << 2) + 1;
// Children of index i: [i*4+1, i*4+2, i*4+3, i*4+4]

public void Enqueue(TElement item, float priority)
{
long order = Interlocked.Increment(ref _globalInsertionCounter);
lock (_lock)
{
EnsureCapacity();
int index = _size;
_heap[index] = new PriorityItem<TElement>(priority, item, order);
_size++;
MoveUp(index); // Bubble up to correct position
}
}
private void MoveUp(int index)
{
var item = _heap[index];
while (index > 0)
{
int parentIndex = GetParentIndex(index);
var parent = _heap[parentIndex];
// Compare by priority, then insertion order
if (CompareItems(item, parent) >= 0)
break; // Correct position found
// Swap with parent
_heap[index] = parent;
index = parentIndex;
}
_heap[index] = item;
}
private int CompareItems(PriorityItem<TElement> a, PriorityItem<TElement> b)
{
int priorityComp = a.Priority.CompareTo(b.Priority);
if (priorityComp != 0)
return priorityComp; // Different priorities
// Same priority: earlier insertion (lower order) = higher priority
return a.Order.CompareTo(b.Order);
}

Complexity: $O(\log_4 n)$ — at most one comparison and one element move per level while sifting up.
public bool TryDequeue(out TElement? item, out float priority)
{
lock (_lock)
{
if (_size == 0)
{
item = default;
priority = default;
return false;
}
// Extract min (root)
var minItem = _heap[0];
item = minItem.Item;
priority = minItem.Priority;
_size--;
if (_size > 0)
{
// Move last item to root
_heap[0] = _heap[_size];
MoveDown(0); // Bubble down to correct position
}
return true;
}
}
private void MoveDown(int index)
{
var item = _heap[index];
while (true)
{
int firstChildIndex = GetFirstChildIndex(index);
if (firstChildIndex >= _size)
break; // No children
// Find smallest among up to 4 children
int smallestChildIndex = firstChildIndex;
var smallestChild = _heap[firstChildIndex];
for (int i = 1; i < 4 && firstChildIndex + i < _size; i++)
{
var child = _heap[firstChildIndex + i];
if (CompareItems(child, smallestChild) < 0)
{
smallestChildIndex = firstChildIndex + i;
smallestChild = child;
}
}
if (CompareItems(item, smallestChild) <= 0)
break; // Correct position found
// Swap with smallest child
_heap[index] = smallestChild;
index = smallestChildIndex;
}
_heap[index] = item;
}

Complexity: $O(\log_4 n)$ levels, with up to four child comparisons per level while sifting down.
private long _globalInsertionCounter = 0;
// On enqueue:
long order = Interlocked.Increment(ref _globalInsertionCounter); // Thread-safe increment
// In comparison:
if (a.Priority == b.Priority)
return a.Order.CompareTo(b.Order); // Lower order = inserted earlier = higher priority

Guarantees:
- Requests with same priority are dequeued in insertion order (FIFO)
- Global counter prevents ABA problem
- Thread-safe via Interlocked operations
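A minimal usage sketch of the queue shown above, demonstrating FIFO ordering within equal priorities (the parameterless constructor and the element values are assumptions for illustration):

```csharp
var queue = new ConcurrentPriorityQueue<string>();
queue.Enqueue("normal-A", priority: 1f);
queue.Enqueue("high",     priority: 0f);
queue.Enqueue("normal-B", priority: 1f);

while (queue.TryDequeue(out var item, out var priority))
    Console.WriteLine($"{priority}: {item}");

// Expected output (lower value = dequeued first, FIFO within the same priority):
// 0: high
// 1: normal-A
// 1: normal-B
```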
Channel Architecture

Wraps ConcurrentPriorityQueue with a System.Threading.Channels-style reader/writer interface.
public class DynamicPriorityChannel<TElement> : IPriorityChannel<TElement>
{
private readonly ConcurrentPriorityQueue<TElement> _queue;
private readonly ParallelChannelOptions _options;
private readonly SemaphoreSlim _semaphore;
public ChannelReader<PriorityItem<TElement>> Reader { get; }
public ChannelWriter<PriorityItem<TElement>> Writer { get; }
}

public class DynamicPriorityChannelReader : ChannelReader<PriorityItem<TElement>>
{
public override bool TryRead(out PriorityItem<TElement> item)
{
return _parent._queue.TryDequeue(out item);
}
public override ValueTask<PriorityItem<TElement>> ReadAsync(CancellationToken ct)
{
if (TryRead(out var item))
return new ValueTask<PriorityItem<TElement>>(item);
// Wait for item availability
return ReadSlowAsync(ct);
}
private async ValueTask<PriorityItem<TElement>> ReadSlowAsync(CancellationToken ct)
{
var asyncOp = new AsyncOperation<PriorityItem<TElement>>();
lock (_parent._blockedReaders)
{
// Double-check after lock
if (TryRead(out var item))
return item;
// Add to blocked readers queue
_parent._blockedReaders.Enqueue(asyncOp);
}
// Wait for writer to signal
return await asyncOp.Task.ConfigureAwait(false);
}
}

public class DynamicPriorityChannelWriter : ChannelWriter<PriorityItem<TElement>>
{
public override bool TryWrite(PriorityItem<TElement> item)
{
_parent._queue.Enqueue(item.Item, item.Priority);
// Wake up blocked readers
lock (_parent._blockedReaders)
{
if (_parent._blockedReaders.TryDequeue(out var blockedReader))
{
// Signal waiting reader
blockedReader.TrySetResult(item);
}
}
return true;
}
}

public async Task RunParallelReader(
Func<PriorityItem<TElement>, CancellationToken, ValueTask> handler,
CancellationToken ct)
{
await Parallel.ForEachAsync(
ReadAllAsync(ct), // Async enumerable
new ParallelOptions
{
MaxDegreeOfParallelism = _options.MaxDegreeOfParallelism,
CancellationToken = ct
},
async (item, token) =>
{
// Throttle with semaphore
await _semaphore.WaitAsync(token);
try
{
await handler(item, token);
}
finally
{
_semaphore.Release();
}
});
}

_options.DegreeOfParallelismChangedDelta += (sender, delta) =>
{
if (delta > 0)
{
// Increase available permits
_semaphore.Release(delta);
}
else if (delta < 0)
{
// Decrease available permits (async wait to drain)
_ = Task.Run(async () =>
{
for (int i = 0; i < Math.Abs(delta); i++)
await _semaphore.WaitAsync(_options.CancellationToken);
});
}
};

Request Lifecycle

┌─────────────────────────────────────────────────────────────────┐
│ 1. CREATION │
├─────────────────────────────────────────────────────────────────┤
│ var request = new OwnRequest(..., options); │
│ - Captures SynchronizationContext │
│ - Initializes state machine (Paused) │
│ - Creates CancellationTokenSource │
│ - Registers callbacks │
└─────────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────────┐
│ 2. AUTO-START (if AutoStart = true) │
├─────────────────────────────────────────────────────────────────┤
│ AutoStart() called in constructor │
│ - Calls Start() │
└─────────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────────┐
│ 3. START │
├─────────────────────────────────────────────────────────────────┤
│ request.Start() │
│ - If DeployDelay: Paused → Waiting → (delay) → Idle │
│ - Else: Paused → Idle │
│ - Adds to handler: Options.Handler.Add(this) │
└─────────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────────┐
│ 4. QUEUED IN HANDLER │
├─────────────────────────────────────────────────────────────────┤
│ Handler enqueues in priority queue │
│ - Priority: High (0) > Normal (1) > Low (2) │
│ - FIFO within same priority level │
└─────────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────────┐
│ 5. DEQUEUED FOR EXECUTION │
├─────────────────────────────────────────────────────────────────┤
│ Handler dequeues highest-priority request │
│ - Calls IRequest.StartRequestAsync() │
└─────────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────────┐
│ 6. EXECUTION START │
├─────────────────────────────────────────────────────────────────┤
│ StartRequestAsync() │
│ - Idle → Running │
│ - Fires RequestStarted callback │
│ - Sets AsyncLocal context (for Yield) │
│ - Calls ExecuteInternalAsync() │
└─────────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────────┐
│ 7. REQUEST LOGIC EXECUTION │
├─────────────────────────────────────────────────────────────────┤
│ ExecuteInternalAsync() → RunRequestAsync() (user code) │
│ - May call await Request.Yield() (cooperative pause/cancel) │
│ - Returns RequestReturn { Successful, CompletedReturn, ... } │
└─────────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────────┐
│ 8. RESULT PROCESSING │
├─────────────────────────────────────────────────────────────────┤
│ ProcessResultAsync(RequestReturn result) │
│ │
│ If Successful: │
│ - Running → Completed │
│ - Fires RequestCompleted callback │
│ - Sets TaskCompletionSource result │
│ │
│ If Failed: │
│ - Increment AttemptCounter │
│ - If AttemptCounter < NumberOfAttempts: │
│ - If DelayBetweenAttempts: Running → Waiting → Idle │
│ - Else: Running → Idle │
│ - Re-add to handler (RETRY) │
│ - Else: │
│ - Running → Failed (terminal) │
│ - Fires RequestFailed callback │
│ - Sets TaskCompletionSource exception │
└─────────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────────┐
│ 9. SUBSEQUENT REQUEST HANDLING (if configured) │
├─────────────────────────────────────────────────────────────────┤
│ If SubsequentRequest != null: │
│ - If parent Completed: Start subsequent immediately │
│ - If parent Failed/Cancelled: Dispose subsequent (cascade) │
└─────────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────────┐
│ 10. DISPOSAL │
├─────────────────────────────────────────────────────────────────┤
│ request.Dispose() │
│ - Disposes CancellationTokenSource │
│ - Clears callbacks │
│ - Removes from handler │
└─────────────────────────────────────────────────────────────────┘
┌──────────────────────────────────┐
│ User calls request.Pause() │
│ - Running → Paused │
└──────────────────────────────────┘
↓
┌──────────────────────────────────┐
│ Next await Request.Yield() │
│ - Checks State == Paused │
│ - Creates TaskCompletionSource │
│ - Awaits TCS.Task (blocks) │
└──────────────────────────────────┘
↓
┌──────────────────────────────────┐
│ User calls request.Start() │
│ - Paused → Running │
│ - Calls StartRequestAsync() │
└──────────────────────────────────┘
↓
┌──────────────────────────────────┐
│ StartRequestAsync() resumes │
│ - Sets TCS result │
│ - Unblocks Yield() │
│ - Execution continues │
└──────────────────────────────────┘
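To make the cooperative pause point concrete, here is a rough sketch of user request logic. Only `await Request.Yield()` and the `RequestReturn { Successful, ... }` result come from the flow above; the exact `RunRequestAsync` override signature and the `Task.Delay` stand-in work are illustrative assumptions:

```csharp
protected override async Task<RequestReturn> RunRequestAsync()
{
    for (int chunk = 0; chunk < 100; chunk++)
    {
        // Stand-in for a real unit of work (e.g. processing one chunk).
        await Task.Delay(50);

        // Cooperative checkpoint: while the request is Paused this await
        // blocks until Start() is called again (see the flow above).
        await Request.Yield();
    }

    return new RequestReturn { Successful = true };
}
```

On the consumer side, `request.Pause()` only flips the state, so the running work stops at the next `Yield()`; `request.Start()` then resumes it.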
Container Architecture

public class RequestContainer<TRequest> : IRequestContainer<TRequest>
where TRequest : IRequest
{
private volatile TRequest?[] _requests = Array.Empty<TRequest?>();
private int _count = 0;
private readonly object _writeLock = new(); // Spin-lock
private TaskCompletionSource _taskCompletionSource;
private readonly RequestContainerStateMachine _stateMachine;
}

public void Add(TRequest request)
{
lock (_writeLock)
{
// Ensure capacity
if (_count == _requests.Length)
{
int newCapacity = _requests.Length == 0 ? 4 : _requests.Length * 2;
var newArray = new TRequest?[newCapacity];
Array.Copy(_requests, newArray, _count);
_requests = newArray; // Volatile write
}
_requests[_count++] = request;
// Subscribe to state changes
request.StateChanged += OnChildStateChanged;
// Apply container state to new request
if (State == RequestState.Running)
request.Start();
else if (State == RequestState.Paused)
request.Pause();
NewTaskCompletion(); // Recreate TCS for new total
}
}

private void OnChildStateChanged(object? sender, RequestState newState)
{
// Recalculate container state
var calculatedState = CalculateState();
if (_stateMachine.TrySetState(State, calculatedState))
{
// State changed, fire event
StateChanged?.Invoke(this, calculatedState);
}
}
private RequestState CalculateState()
{
var snapshot = _requests; // Volatile read
int count = _count;
// Count each state
var stateCounts = new Dictionary<RequestState, int>();
for (int i = 0; i < count; i++)
{
var request = snapshot[i];
if (request != null)
{
var state = request.State;
stateCounts[state] = stateCounts.GetValueOrDefault(state) + 1;
}
}
// Priority order: Failed > Running > Cancelled > Idle > Waiting > Completed > Paused
if (stateCounts.GetValueOrDefault(RequestState.Failed) > 0)
return RequestState.Failed;
if (stateCounts.GetValueOrDefault(RequestState.Running) > 0)
return RequestState.Running;
if (stateCounts.GetValueOrDefault(RequestState.Cancelled) > 0)
return RequestState.Cancelled;
if (stateCounts.GetValueOrDefault(RequestState.Idle) > 0)
return RequestState.Idle;
if (stateCounts.GetValueOrDefault(RequestState.Waiting) > 0)
return RequestState.Waiting;
if (stateCounts.GetValueOrDefault(RequestState.Completed) > 0)
return RequestState.Completed;
return RequestState.Paused;
}

For example, a container holding one Running child and one Completed child reports Running, since Running outranks Completed in the aggregation order.

public class ProgressableContainer<TRequest> : RequestContainer<TRequest>
where TRequest : IProgressableRequest
{
private readonly CombinableProgress _combinedProgress = new();
private class CombinableProgress
{
private float _currentAverage = 0f;
private readonly List<float> _values = new();
private readonly object _lock = new();
public void UpdateProgress(int index, float newValue)
{
lock (_lock)
{
float oldValue = _values[index];
// Incremental average update: O(1)
// new_avg = old_avg + (new_val - old_val) / count
_currentAverage += (newValue - oldValue) / _values.Count;
_values[index] = newValue;
// Report aggregated progress
((IProgress<float>)this).Report(_currentAverage);
}
}
}
}

Why Incremental?
- O(1) vs O(n): No need to sum all values on each update
- Lock Time: Minimal lock hold time
- Accuracy: Mathematically equivalent to full recalculation
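A quick numeric check of the incremental formula $\text{avg}_{new} = \text{avg}_{old} + \frac{v_{new} - v_{old}}{n}$: with four requests at progress $[0.2, 0.4, 0.6, 0.8]$ the average is $0.5$; updating the second value to $1.0$ gives $0.5 + (1.0 - 0.4)/4 = 0.65$, identical to recomputing $(0.2 + 1.0 + 0.6 + 0.8)/4 = 0.65$.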
Thread Safety Mechanisms

// Atomic compare-and-swap
int old = Interlocked.CompareExchange(ref _stateInt, (int)to, (int)from);
return old == (int)from; // Success if value matched expectation

private volatile TRequest?[] _requests; // Volatile ensures visibility
public TRequest?[] GetSnapshot()
{
return _requests; // Volatile read, sees latest writes
}

private readonly object _writeLock = new();
public void Add(TRequest request)
{
lock (_writeLock) // Spin-based lock for short critical sections
{
// Modify array
}
}

if (SynchronizationContext != null)
SynchronizationContext.Post(s_callback, state); // Marshal to original context
else
callback.Invoke(state); // Direct invoke (thread pool)

private long _globalInsertionCounter;
long order = Interlocked.Increment(ref _globalInsertionCounter); // Thread-safe increment

Performance Optimizations

- 50% fewer tree levels than binary heap
- Better cache locality
- No contention on state reads
- CompareExchange for atomic writes
if (!Token.IsCancellationRequested && State != Running)
return ValueTask.CompletedTask; // No allocation

- Zero-allocation awaiting for pause/resume
- O(1) updates instead of O(n) summation
private static readonly SendOrPostCallback s_callback = state => { ... };
// Shared across all instances, no allocations

- Readers lock-free (volatile read)
- Writers use lock (rare operation)
- Factor-of-2 growth prevents frequent reallocations
- Amortized O(1) append
Summary

- $\text{\color{lightblue}Type Hierarchy}$: Clean interfaces with extensible implementations
- $\text{\color{orange}State Machines}$: Lock-free atomic transitions for correctness and performance
- $\text{\color{green}Priority Queues}$: Quaternary heap with FIFO ordering for efficient scheduling
- $\text{\color{blue}Channels}$: Flexible async reader/writer pattern with dynamic parallelism
- $\text{\color{purple}Lifecycle}$: Complete flow from creation to disposal with retry and chaining
- $\text{\color{red}Containers}$: Smart state aggregation with O(1) progress tracking
- $\text{\color{orange}Thread Safety}$: Mix of lock-free, volatile, and lock-based mechanisms
- $\text{\color{green}Performance}$: Numerous optimizations for zero-allocation hot paths
This architecture enables high-throughput, low-latency async workflows suitable for production systems.