Skip to content

Commit 8041814

Browse files
CopilotDaveSkendercoderabbitai[bot]
authored
Fix SSE integration test comparison logic to use correct series pattern (#1941)
Signed-off-by: Dave Skender <[email protected]> Signed-off-by: GitHub <[email protected]> Co-authored-by: copilot-swe-agent[bot] <[email protected]> Co-authored-by: DaveSkender <[email protected]> Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
1 parent 4309092 commit 8041814

File tree

115 files changed

+1272
-448
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

115 files changed

+1272
-448
lines changed

.devcontainer/devcontainer.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
"vscode": {
2222
"settings": {
2323
"chat.tools.global.autoApprove": true,
24+
"dotnet.defaultSolution": "Stock.Indicators.sln",
2425
"terminal.integrated.defaultProfile.linux": "zsh"
2526
},
2627
"extensions": [

docs/features/stream.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ quoteHub.Add(quote1);
106106
quoteHub.Add(quote2);
107107

108108
// late-arriving data with earlier timestamp
109-
quoteHub.Insert(lateQuote); // triggers recalculation
109+
quoteHub.Add(lateQuote); // triggers recalculation in dependent hubs
110110
111111
// remove incorrect quote
112112
quoteHub.Remove(badQuote); // triggers recalculation
@@ -125,8 +125,8 @@ The hub automatically handles state rollback and recalculation when data arrives
125125

126126
Stream hubs use internal locking to protect cache operations during rebuild and rollback scenarios:
127127

128-
- **Internal cache operations** are thread-safe (Insert, RemoveAt, RemoveRange, Rebuild)
129-
- **External access requires synchronization** when multiple threads call Add, Insert, or Remove
128+
- **Internal cache operations** are thread-safe (Add, RemoveAt, RemoveRange, Rebuild)
129+
- **External access requires synchronization** when multiple threads call Add or Remove
130130
- **Single-threaded usage** requires no additional synchronization
131131
- **Multi-threaded usage** should synchronize external calls to hub methods
132132

docs/guide.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -451,7 +451,7 @@ quoteHub.Add(quote1);
451451
quoteHub.Add(quote2);
452452

453453
// Late-arriving data with earlier timestamp
454-
quoteHub.Insert(lateQuote); // Triggers recalculation
454+
quoteHub.Add(lateQuote); // Triggers recalculation in dependent hubs
455455
456456
// Remove incorrect quote
457457
quoteHub.Remove(badQuote); // Triggers recalculation
@@ -747,7 +747,7 @@ await Task.WhenAll(
747747
**You need locks or channels when:**
748748

749749
- Sharing a single hub instance across multiple threads
750-
- Multiple threads call `.Add()`, `.Insert()`, or `.Remove()` on the same hub
750+
- Do not call `.Add()` or `.Remove()` on the same hub from multiple threads
751751
- Reading results from one thread while another thread updates the hub
752752
- Processing quotes from multiple concurrent sources into one hub
753753

docs/migration.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -347,6 +347,7 @@ Popular indicators with complete streaming documentation:
347347
| `UseResult` | `TimeValue` | Type renamed |
348348
| `SmaAnalysis` | `SmaAnalysisResult` | Type renamed |
349349
| `UlcerIndexResult.UI` | `UlcerIndexResult.UlcerIndex` | Property renamed |
350+
| `StreamHub.Insert()` | `StreamHub.Add()` | Insert removed; use Add |
350351
| `SyncSeries()` | (removed) | Use manual alignment |
351352
| `Find()` / `FindIndex()` | LINQ methods | Use `.FirstOrDefault()` etc. |
352353
| `GetBaseQuote()` | `Use(CandlePart)` | Use utility instead |

docs/performance.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ StreamHub style adds observable patterns and state management:
170170
Series (502 quotes): ~25μs total
171171
StreamHub (502 quotes): ~32μs total (~28% overhead)
172172
Per-quote latency: ~64ns average
173-
Rollback (Insert): ~2-5μs for state rebuild
173+
Rollback (late Add): ~2-5μs for state rebuild
174174
```
175175

176176
**Scaling Characteristics:**
@@ -206,7 +206,7 @@ Real-time performance targets for trading applications:
206206
| Single indicator per quote | <100μs | 60-80μs |
207207
| 5 indicators on hub per quote | <500μs | 300-400μs |
208208
| Complex chains (EMA→RSI→Slope) | <200μs | 120-150μs |
209-
| State rebuild (Insert/Remove) | <5ms | 2-3ms |
209+
| State rebuild (Add/Remove) | <5ms | 2-3ms |
210210

211211
### When to Choose Each Style
212212

docs/plans/test-pruning.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ For the remaining 75 indicators, add validation following these patterns:
123123
- **T3**: `ValidateCacheSize(lookbackPeriods * 6, Name);`
124124
- **Multi-parameter**: Calculate `requiredWarmup` based on indicator-specific logic, then call `ValidateCacheSize(requiredWarmup, Name);`
125125

126-
Insert validation call immediately before `Reinitialize()` in each indicator's constructor.
126+
Add validation call immediately before `Reinitialize()` in each indicator's constructor.
127127

128128
---
129129
Last updated: January 26, 2026

src/_common/Enums/Act.cs

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,8 @@ internal enum Act
1515
/// </summary>
1616
Ignore = 1,
1717

18-
/// <summary>
19-
/// Insert item without rebuilding cache.
20-
/// </summary>
21-
Insert = 2,
22-
2318
/// <summary>
2419
/// Reset and rebuild from marker position.
2520
/// </summary>
26-
Rebuild = 3
21+
Rebuild = 2
2722
}

src/_common/QuotePart/QuotePart.StreamHub.cs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,24 @@ CandlePart candlePart
1919
/// <inheritdoc/>
2020
public CandlePart CandlePartSelection { get; init; }
2121

22+
/// <inheritdoc/>
23+
public override void OnAdd(IQuote item, bool notify, int? indexHint)
24+
{
25+
// Lock to prevent concurrent cache access.
26+
lock (CacheLock)
27+
{
28+
(TimeValue result, int index) = ToIndicator(item, indexHint);
29+
30+
if (index >= 0 && index < Cache.Count)
31+
{
32+
InsertWithoutRebuild(result, index, notify);
33+
return;
34+
}
35+
36+
AppendCache(result, notify);
37+
}
38+
}
39+
2240

2341
/// <inheritdoc/>
2442
protected override (TimeValue result, int index)

src/_common/Quotes/Quote.StreamHub.cs

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,17 @@ public override void OnAdd(IQuote item, bool notify, int? indexHint)
8484
{
8585
ArgumentNullException.ThrowIfNull(item);
8686

87+
// Reject additions that precede the current cache timeline
88+
// (applies to both standalone and non-standalone QuoteHub)
89+
lock (CacheLock)
90+
{
91+
if (Cache.Count > 0 && item.Timestamp < Cache[0].Timestamp)
92+
{
93+
// Silently ignore - this prevents indeterminate gaps in the timeline
94+
return;
95+
}
96+
}
97+
8798
// for non-standalone QuoteHub, use standard behavior (which handles locking)
8899
if (!_isStandalone)
89100
{
@@ -94,9 +105,19 @@ public override void OnAdd(IQuote item, bool notify, int? indexHint)
94105
// Lock for standalone QuoteHub operations
95106
lock (CacheLock)
96107
{
108+
97109
// get result and position
98110
(IQuote result, int index) = ToIndicator(item, indexHint);
99111

112+
// Reject modifications that would affect indices before MinCacheSize
113+
// to prevent corrupted rebuilds in subscribers
114+
// This includes both insertions and same-timestamp replacements
115+
if (index >= 0 && index < MinCacheSize && index < Cache.Count)
116+
{
117+
// Silently ignore all modifications before MinCacheSize
118+
return;
119+
}
120+
100121
// check if this is a same-timestamp update (not a new item at the end)
101122
if (Cache.Count > 0 && index < Cache.Count && Cache[index].Timestamp == result.Timestamp)
102123
{
@@ -119,6 +140,23 @@ public override void OnAdd(IQuote item, bool notify, int? indexHint)
119140
}
120141
else
121142
{
143+
// if out-of-order insert, insert and trigger rebuild
144+
if (index >= 0 && index < Cache.Count)
145+
{
146+
Cache.Insert(index, result);
147+
148+
// For standalone QuoteHub, notify observers to rebuild from this timestamp
149+
// For non-standalone, this won't be reached due to earlier branch
150+
if (notify)
151+
{
152+
// Notify observers directly - they will rebuild from the updated cache
153+
// No need to call Rebuild on QuoteHub itself since cache is already updated
154+
NotifyObserversOnRebuild(result.Timestamp);
155+
}
156+
157+
return;
158+
}
159+
122160
// standard add behavior for new items
123161
AppendCache(result, notify);
124162
}

src/_common/StreamHub/IStreamHub.cs

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -60,19 +60,6 @@ public interface IStreamHub<in TIn, out TOut> : IStreamObserver<TIn>, IStreamObs
6060
/// </param>
6161
void Add(IEnumerable<TIn> batchIn);
6262

63-
/// <summary>
64-
/// Insert a new item without rebuilding the cache.
65-
/// </summary>
66-
/// <remarks>
67-
/// This is used in situations when inserting an older item
68-
/// and where newer cache entries do not need to be rebuilt.
69-
/// Typically, this is only used for provider-only hubs.
70-
/// </remarks>
71-
/// <param name="newIn">
72-
/// Item to insert
73-
/// </param>
74-
void Insert(TIn newIn);
75-
7663
/// <summary>
7764
/// Delete an item from the cache, from a specific position.
7865
/// </summary>

0 commit comments

Comments
 (0)