Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
2fdbebe
Initial plan
Copilot Jan 26, 2026
cf1c2ec
fix: Correct series comparison logic in stream hub pruning tests
Copilot Jan 26, 2026
bf44234
perf: Use direct list constructor for better performance
Copilot Jan 26, 2026
3c2aee3
WIP: Attempt to fix series comparison logic with timestamp-based filt…
Copilot Jan 27, 2026
e67063e
revert: Restore original SSE test logic temporarily
Copilot Jan 27, 2026
0eb0393
Merge branch 'test-pruning' into copilot/sub-pr-1937
DaveSkender Jan 27, 2026
bd8ed2f
fix: Apply TakeLast pattern for test comparison logic
Copilot Jan 27, 2026
8dac6f2
fix: Apply Insert operations to quote list before series calculation
Copilot Jan 27, 2026
96d7034
fix: Correctly handle Insert/Remove operations as no-ops for series
Copilot Jan 27, 2026
7878213
Merge branch 'test-pruning' into copilot/sub-pr-1937
DaveSkender Jan 28, 2026
0beaf5d
Refactor ThreadSafety integration tests for clarity
DaveSkender Jan 28, 2026
f69767a
Merge branch 'test-pruning' into copilot/sub-pr-1937
DaveSkender Jan 28, 2026
bbaa89d
Merge branch 'test-pruning' into copilot/sub-pr-1937
DaveSkender Feb 2, 2026
255592e
refactor: Remove StreamHub Insert API (#1945)
DaveSkender Feb 2, 2026
d865ba1
fix: Address code review feedback
Copilot Feb 2, 2026
96e7c91
refactor: Improve thread safety documentation and constants
DaveSkender Feb 2, 2026
f8f0820
refactor: Clarify comments for QuoteAction instantiation
DaveSkender Feb 2, 2026
9661860
QuoteHub: Ignore Add operations for quotes preceding current timeline…
Copilot Feb 2, 2026
89856f4
refactor: Update migration guide and improve thread safety tests
DaveSkender Feb 2, 2026
7af82c1
fix: Trigger rebuild on out-of-order insert in QuoteHub
DaveSkender Feb 2, 2026
b78b288
refactor: Clean up whitespace in QuoteHub's insert logic
DaveSkender Feb 2, 2026
996b510
refactor: Improve comments and enhance QuoteHub functionality
DaveSkender Feb 2, 2026
7d26d79
refactor: Simplify variable assignment in MinCacheSize tests
DaveSkender Feb 2, 2026
9d13d3f
refactor: Update observer notification logic in QuoteHub
DaveSkender Feb 2, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .devcontainer/devcontainer.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
"vscode": {
"settings": {
"chat.tools.global.autoApprove": true,
"dotnet.defaultSolution": "Stock.Indicators.sln",
"terminal.integrated.defaultProfile.linux": "zsh"
},
"extensions": [
Expand Down
6 changes: 3 additions & 3 deletions docs/features/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ quoteHub.Add(quote1);
quoteHub.Add(quote2);

// late-arriving data with earlier timestamp
quoteHub.Insert(lateQuote); // triggers recalculation
quoteHub.Add(lateQuote); // triggers recalculation in dependent hubs

// remove incorrect quote
quoteHub.Remove(badQuote); // triggers recalculation
Expand All @@ -125,8 +125,8 @@ The hub automatically handles state rollback and recalculation when data arrives

Stream hubs use internal locking to protect cache operations during rebuild and rollback scenarios:

- **Internal cache operations** are thread-safe (Insert, RemoveAt, RemoveRange, Rebuild)
- **External access requires synchronization** when multiple threads call Add, Insert, or Remove
- **Internal cache operations** are thread-safe (Add, RemoveAt, RemoveRange, Rebuild)
- **External access requires synchronization** when multiple threads call Add or Remove
- **Single-threaded usage** requires no additional synchronization
- **Multi-threaded usage** should synchronize external calls to hub methods

Expand Down
4 changes: 2 additions & 2 deletions docs/guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,7 @@ quoteHub.Add(quote1);
quoteHub.Add(quote2);

// Late-arriving data with earlier timestamp
quoteHub.Insert(lateQuote); // Triggers recalculation
quoteHub.Add(lateQuote); // Triggers recalculation in dependent hubs

// Remove incorrect quote
quoteHub.Remove(badQuote); // Triggers recalculation
Expand Down Expand Up @@ -747,7 +747,7 @@ await Task.WhenAll(
**You need locks or channels when:**

- Sharing a single hub instance across multiple threads
- Multiple threads call `.Add()`, `.Insert()`, or `.Remove()` on the same hub
- Do not call `.Add()` or `.Remove()` on the same hub from multiple threads
- Reading results from one thread while another thread updates the hub
- Processing quotes from multiple concurrent sources into one hub

Expand Down
1 change: 1 addition & 0 deletions docs/migration.md
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,7 @@ Popular indicators with complete streaming documentation:
| `UseResult` | `TimeValue` | Type renamed |
| `SmaAnalysis` | `SmaAnalysisResult` | Type renamed |
| `UlcerIndexResult.UI` | `UlcerIndexResult.UlcerIndex` | Property renamed |
| `StreamHub.Insert()` | `StreamHub.Add()` | Insert removed; use Add |
| `SyncSeries()` | (removed) | Use manual alignment |
| `Find()` / `FindIndex()` | LINQ methods | Use `.FirstOrDefault()` etc. |
| `GetBaseQuote()` | `Use(CandlePart)` | Use utility instead |
Expand Down
4 changes: 2 additions & 2 deletions docs/performance.md
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ StreamHub style adds observable patterns and state management:
Series (502 quotes): ~25μs total
StreamHub (502 quotes): ~32μs total (~28% overhead)
Per-quote latency: ~64ns average
Rollback (Insert): ~2-5μs for state rebuild
Rollback (late Add): ~2-5μs for state rebuild
```

**Scaling Characteristics:**
Expand Down Expand Up @@ -206,7 +206,7 @@ Real-time performance targets for trading applications:
| Single indicator per quote | <100μs | 60-80μs |
| 5 indicators on hub per quote | <500μs | 300-400μs |
| Complex chains (EMA→RSI→Slope) | <200μs | 120-150μs |
| State rebuild (Insert/Remove) | <5ms | 2-3ms |
| State rebuild (Add/Remove) | <5ms | 2-3ms |

### When to Choose Each Style

Expand Down
2 changes: 1 addition & 1 deletion docs/plans/test-pruning.md
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ For the remaining 75 indicators, add validation following these patterns:
- **T3**: `ValidateCacheSize(lookbackPeriods * 6, Name);`
- **Multi-parameter**: Calculate `requiredWarmup` based on indicator-specific logic, then call `ValidateCacheSize(requiredWarmup, Name);`

Insert validation call immediately before `Reinitialize()` in each indicator's constructor.
Add validation call immediately before `Reinitialize()` in each indicator's constructor.

---
Last updated: January 26, 2026
7 changes: 1 addition & 6 deletions src/_common/Enums/Act.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,8 @@ internal enum Act
/// </summary>
Ignore = 1,

/// <summary>
/// Insert item without rebuilding cache.
/// </summary>
Insert = 2,

/// <summary>
/// Reset and rebuild from marker position.
/// </summary>
Rebuild = 3
Rebuild = 2
}
18 changes: 18 additions & 0 deletions src/_common/QuotePart/QuotePart.StreamHub.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,24 @@ CandlePart candlePart
/// <inheritdoc/>
public CandlePart CandlePartSelection { get; init; }

/// <inheritdoc/>
public override void OnAdd(IQuote item, bool notify, int? indexHint)
{
// Lock to prevent concurrent cache access.
lock (CacheLock)
{
(TimeValue result, int index) = ToIndicator(item, indexHint);

if (index >= 0 && index < Cache.Count)
{
InsertWithoutRebuild(result, index, notify);
return;
}

AppendCache(result, notify);
}
}


/// <inheritdoc/>
protected override (TimeValue result, int index)
Expand Down
38 changes: 38 additions & 0 deletions src/_common/Quotes/Quote.StreamHub.cs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,17 @@ public override void OnAdd(IQuote item, bool notify, int? indexHint)
{
ArgumentNullException.ThrowIfNull(item);

// Reject additions that precede the current cache timeline
// (applies to both standalone and non-standalone QuoteHub)
lock (CacheLock)
{
if (Cache.Count > 0 && item.Timestamp < Cache[0].Timestamp)
{
// Silently ignore - this prevents indeterminate gaps in the timeline
return;
}
}

// for non-standalone QuoteHub, use standard behavior (which handles locking)
if (!_isStandalone)
{
Expand All @@ -94,9 +105,19 @@ public override void OnAdd(IQuote item, bool notify, int? indexHint)
// Lock for standalone QuoteHub operations
lock (CacheLock)
{

// get result and position
(IQuote result, int index) = ToIndicator(item, indexHint);

// Reject modifications that would affect indices before MinCacheSize
// to prevent corrupted rebuilds in subscribers
// This includes both insertions and same-timestamp replacements
if (index >= 0 && index < MinCacheSize && index < Cache.Count)
{
// Silently ignore all modifications before MinCacheSize
return;
}

// check if this is a same-timestamp update (not a new item at the end)
if (Cache.Count > 0 && index < Cache.Count && Cache[index].Timestamp == result.Timestamp)
{
Expand All @@ -119,6 +140,23 @@ public override void OnAdd(IQuote item, bool notify, int? indexHint)
}
else
{
// if out-of-order insert, insert and trigger rebuild
if (index >= 0 && index < Cache.Count)
{
Cache.Insert(index, result);

// For standalone QuoteHub, notify observers to rebuild from this timestamp
// For non-standalone, this won't be reached due to earlier branch
if (notify)
{
// Notify observers directly - they will rebuild from the updated cache
// No need to call Rebuild on QuoteHub itself since cache is already updated
NotifyObserversOnRebuild(result.Timestamp);
}

return;
}

// standard add behavior for new items
AppendCache(result, notify);
}
Expand Down
13 changes: 0 additions & 13 deletions src/_common/StreamHub/IStreamHub.cs
Original file line number Diff line number Diff line change
Expand Up @@ -60,19 +60,6 @@ public interface IStreamHub<in TIn, out TOut> : IStreamObserver<TIn>, IStreamObs
/// </param>
void Add(IEnumerable<TIn> batchIn);

/// <summary>
/// Insert a new item without rebuilding the cache.
/// </summary>
/// <remarks>
/// This is used in situations when inserting an older item
/// and where newer cache entries do not need to be rebuilt.
/// Typically, this is only used for provider-only hubs.
/// </remarks>
/// <param name="newIn">
/// Item to insert
/// </param>
void Insert(TIn newIn);

/// <summary>
/// Delete an item from the cache, from a specific position.
/// </summary>
Expand Down
8 changes: 8 additions & 0 deletions src/_common/StreamHub/IStreamObservable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,14 @@ public interface IStreamObservable<out T>
/// </summary>
int MaxCacheSize { get; }

/// <summary>
/// Gets the minimum cache size required by this hub or its subscribers.
/// This value represents the maximum minimum cache size requirement from all subscribers.
/// Additions that would modify index values before this position are rejected to prevent
/// corrupted rebuilds.
/// </summary>
int MinCacheSize { get; }

/// <summary>
/// Gets the current number of subscribers.
/// </summary>
Expand Down
3 changes: 3 additions & 0 deletions src/_common/StreamHub/Providers/BaseProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ public class BaseProvider<T>(int maxCacheSize = 0)
/// <inheritdoc/>
public int MaxCacheSize { get; } = maxCacheSize;

/// <inheritdoc/>
public int MinCacheSize => 0; // Base provider has no minimum cache requirement

/// <inheritdoc />
public int ObserverCount => 0;

Expand Down
68 changes: 63 additions & 5 deletions src/_common/StreamHub/StreamHub.Observable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,13 @@
{
private readonly HashSet<IStreamObserver<TOut>> _observers = [];

/// <summary>
/// Stores the hub's own minimum cache size requirement (baseline).
/// This value represents the warmup periods needed by this hub itself,
/// independent of its subscribers.
/// </summary>
private int _minCacheSizeBaseline;

Check warning on line 14 in src/_common/StreamHub/StreamHub.Observable.cs

View check run for this annotation

Codacy Production / Codacy Static Code Analysis

src/_common/StreamHub/StreamHub.Observable.cs#L14

Remove unassigned field '_minCacheSizeBaseline', or set its value.

// PROPERTIES

/// <inheritdoc/>
Expand All @@ -14,6 +21,9 @@
/// <inheritdoc/>
public int MaxCacheSize { get; private set; }

/// <inheritdoc/>
public int MinCacheSize { get; private set; }

/// <inheritdoc/>
public int ObserverCount => _observers.Count;

Expand All @@ -30,12 +40,46 @@
public IDisposable Subscribe(IStreamObserver<TOut> observer)
{
_observers.Add(observer);
return new Unsubscriber(_observers, observer);

// Update MinCacheSize to the maximum of all subscribers
UpdateMinCacheSize();

return new Unsubscriber(_observers, observer, this);
}

/// <inheritdoc/>
public bool Unsubscribe(IStreamObserver<TOut> observer)
=> _observers.Remove(observer);
{
bool removed = _observers.Remove(observer);

// Re-evaluate MinCacheSize after unsubscribing
if (removed)
{
UpdateMinCacheSize();
}

return removed;
}

/// <summary>
/// Updates the MinCacheSize to the maximum of this hub's baseline requirement
/// and all subscribers' MinCacheSize values.
/// </summary>
private void UpdateMinCacheSize()
{
// Start from the hub's own baseline requirement
int maxMinCacheSize = _minCacheSizeBaseline;

foreach (IStreamObserver<TOut> observer in _observers)
{
if (observer is IStreamObservable<ISeries> observable)
{
maxMinCacheSize = Math.Max(maxMinCacheSize, observable.MinCacheSize);
}
}

MinCacheSize = maxMinCacheSize;
}

/// <inheritdoc/>
public void EndTransmission()
Expand All @@ -51,6 +95,9 @@
}

_observers.Clear();

// Reset to baseline when all subscribers are removed
MinCacheSize = _minCacheSizeBaseline;
}

/// <summary>
Expand All @@ -63,17 +110,28 @@
/// <param name="observer">
/// Your unique subscription as provided.
/// </param>
/// <param name="hub">
/// The parent hub that needs MinCacheSize re-evaluation on unsubscribe.
/// </param>
private sealed class Unsubscriber(
ISet<IStreamObserver<TOut>> observers,
IStreamObserver<TOut> observer) : IDisposable
IStreamObserver<TOut> observer,
StreamHub<TIn, TOut> hub) : IDisposable
{
private readonly ISet<IStreamObserver<TOut>> _observers = observers;
private readonly IStreamObserver<TOut> _observer = observer;
private readonly StreamHub<TIn, TOut> _hub = hub;

/// <summary>
/// Remove single observer.
/// Remove single observer and update parent MinCacheSize.
/// </summary>
public void Dispose() => _observers.Remove(_observer);
public void Dispose()
{
if (_observers.Remove(_observer))
{
_hub.UpdateMinCacheSize();
}
}
}

/// <summary>
Expand Down
Loading
Loading