Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion docs/plans/streaming-indicators.plan.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,12 @@ Based on performance analysis (January 3, 2026), the following indicators have c
- **Status**: COMPLETE - Significant optimization achieved while maintaining mathematical correctness
- **Priority**: 🔴 HIGH

- [ ] **P006** - Prs StreamHub performance optimization (3-4 hours)
- [x] **P006** - Prs StreamHub performance optimization (3-4 hours)
- **Current**: 7.47x slower than Series (35,070 ns vs 4,694 ns)
- **Problem**: Potential state management or allocation inefficiencies
- **Action**: Review implementation for unnecessary recalculations
- **Priority**: 🔴 HIGH
- **Status**: COMPLETE - Implemented StreamHub with O(1) performance using cache reference refresh pattern

- [x] **P007** - Roc StreamHub performance optimization (3-4 hours)
- **Current**: 6.98x slower than Series (30,153 ns vs 4,322 ns)
Expand Down
119 changes: 119 additions & 0 deletions src/m-r/Prs/Prs.StreamHub.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
namespace Skender.Stock.Indicators;

/// <summary>
/// Streaming hub for Price Relative Strength (PRS).
/// </summary>
public class PrsHub
: ChainHub<IReusable, PrsResult>, IPrs
{
private readonly IStreamObservable<IReusable> _providerBase;
private IReadOnlyList<IReusable> _baseCache = null!;

internal PrsHub(
IChainProvider<IReusable> providerEval,
IChainProvider<IReusable> providerBase,
int lookbackPeriods) : base(providerEval)
{
_providerBase = providerBase ?? throw new ArgumentNullException(nameof(providerBase));
_baseCache = _providerBase.GetCacheRef();
LookbackPeriods = lookbackPeriods == int.MinValue ? int.MinValue : lookbackPeriods;

if (lookbackPeriods is <= 0 and not int.MinValue)
{
throw new ArgumentOutOfRangeException(
nameof(lookbackPeriods), lookbackPeriods,
"Lookback periods must be greater than 0 for Price Relative Strength.");
}

Name = lookbackPeriods == int.MinValue
? "PRS"
: $"PRS({lookbackPeriods})";

Reinitialize();
}

/// <inheritdoc/>
public int LookbackPeriods { get; init; }

/// <inheritdoc/>
protected override (PrsResult result, int index)
ToIndicator(IReusable itemEval, int? indexHint)
{
ArgumentNullException.ThrowIfNull(itemEval);

// refresh base cache reference
_baseCache = _providerBase.GetCacheRef();

int i = indexHint ?? ProviderCache.IndexOf(itemEval, true);

// validate matching timestamps
if (i >= _baseCache.Count)
{
throw new InvalidQuotesException(
nameof(itemEval), itemEval.Timestamp,
"Base quotes should have at least as many records as Eval quotes for PRS.");
}

IReusable itemBase = _baseCache[i];

if (itemEval.Timestamp != itemBase.Timestamp)
{
throw new InvalidQuotesException(
nameof(itemEval), itemEval.Timestamp,
"Timestamp sequence does not match. "
+ "Price Relative requires matching dates in provided histories.");
}

// calculate PRS ratio
double? prs = itemBase.Value == 0
? null
: (itemEval.Value / itemBase.Value).NaN2Null();

// calculate PRS percent if lookback is specified
double? prsPercent = null;

if (LookbackPeriods > 0 && i >= LookbackPeriods)
{
IReusable baseOld = _baseCache[i - LookbackPeriods];
IReusable evalOld = ProviderCache[i - LookbackPeriods];

if (baseOld.Value != 0 && evalOld.Value != 0)
{
double pctBase = (itemBase.Value - baseOld.Value) / baseOld.Value;
double pctEval = (itemEval.Value - evalOld.Value) / evalOld.Value;

prsPercent = (pctEval - pctBase).NaN2Null();
}
}

PrsResult r = new(
Timestamp: itemEval.Timestamp,
Prs: prs,
PrsPercent: prsPercent);

return (r, i);
}
}

public static partial class Prs
{
/// <summary>
/// Creates a PRS streaming hub from two chain providers.
/// </summary>
/// <param name="chainProviderEval">The evaluation chain provider.</param>
/// <param name="chainProviderBase">The base chain provider.</param>
/// <param name="lookbackPeriods">The number of periods for the PRS% lookback calculation. Optional.</param>
/// <returns>A PRS hub.</returns>
/// <exception cref="ArgumentNullException">Thrown when either chain provider is null.</exception>
/// <exception cref="ArgumentOutOfRangeException">Thrown when the lookback periods are invalid.</exception>
public static PrsHub ToPrsHub(
this IChainProvider<IReusable> chainProviderEval,
IChainProvider<IReusable> chainProviderBase,
int lookbackPeriods = int.MinValue)
{
ArgumentNullException.ThrowIfNull(chainProviderEval);
ArgumentNullException.ThrowIfNull(chainProviderBase);

return new PrsHub(chainProviderEval, chainProviderBase, lookbackPeriods);
}
}
222 changes: 222 additions & 0 deletions tests/indicators/m-r/Prs/Prs.StreamHub.Tests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
namespace StreamHubs;

[TestClass]
public class PrsHubTests : StreamHubTestBase, ITestChainObserver, ITestChainProvider
{
private static readonly IReadOnlyList<Quote> RevisedOtherQuotes
= OtherQuotes.Where(static (_, idx) => idx != removeAtIndex).ToList();

[TestMethod]
public void QuoteObserver_WithWarmupLateArrivalAndRemoval_MatchesSeriesExactly()
{
// setup quote provider hubs
QuoteHub quoteHubEval = new();
QuoteHub quoteHubBase = new();

// prefill quotes at both providers
quoteHubEval.Add(OtherQuotes.Take(20));
quoteHubBase.Add(Quotes.Take(20));

// initialize observer
PrsHub observer = quoteHubEval.ToPrsHub(quoteHubBase);

// fetch initial results (early)
IReadOnlyList<PrsResult> sut = observer.Results;

// emulate adding quotes to provider hubs
for (int i = 20; i < quotesCount; i++)
{
// skip one (add later)
if (i == 80) { continue; }

Quote qEval = OtherQuotes[i];
Quote qBase = Quotes[i];

// IMPORTANT: Add to base BEFORE eval to ensure base cache is updated first
quoteHubBase.Add(qBase);
quoteHubEval.Add(qEval);

// resend duplicate quotes
if (i is > 100 and < 105)
{
quoteHubBase.Add(qBase);
quoteHubEval.Add(qEval);
}
}

// late arrival, should equal series
quoteHubBase.Insert(Quotes[80]);
quoteHubEval.Insert(OtherQuotes[80]);

IReadOnlyList<PrsResult> expectedOriginal = OtherQuotes.ToPrs(Quotes);
sut.IsExactly(expectedOriginal);

// delete, should equal series (revised)
quoteHubBase.Remove(Quotes[removeAtIndex]);
quoteHubEval.Remove(OtherQuotes[removeAtIndex]);
IReadOnlyList<PrsResult> expectedRevised = RevisedOtherQuotes.ToPrs(RevisedQuotes);
sut.IsExactly(expectedRevised);
sut.Should().HaveCount(quotesCount - 1);

// cleanup
observer.Unsubscribe();
quoteHubEval.EndTransmission();
quoteHubBase.EndTransmission();
}

[TestMethod]
public void QuoteObserver_WithLookbackPeriod_MatchesSeriesExactly()
{
const int lookbackPeriods = 14;

// setup quote provider hubs
QuoteHub quoteHubEval = new();
QuoteHub quoteHubBase = new();

// prefill quotes at both providers
quoteHubEval.Add(OtherQuotes.Take(20));
quoteHubBase.Add(Quotes.Take(20));

// initialize observer with lookback
PrsHub observer = quoteHubEval.ToPrsHub(quoteHubBase, lookbackPeriods);

// emulate adding quotes to provider hubs
for (int i = 20; i < quotesCount; i++)
{
Quote qEval = OtherQuotes[i];
Quote qBase = Quotes[i];

// IMPORTANT: Add to base BEFORE eval
quoteHubBase.Add(qBase);
quoteHubEval.Add(qEval);
}

// final results
IReadOnlyList<PrsResult> sut = observer.Results;

// time-series, for comparison
IReadOnlyList<PrsResult> expected = OtherQuotes.ToPrs(Quotes, lookbackPeriods);

// assert, should equal series
sut.IsExactly(expected);
sut.Should().HaveCount(quotesCount);

// cleanup
observer.Unsubscribe();
quoteHubEval.EndTransmission();
quoteHubBase.EndTransmission();
}

[TestMethod]
public void ChainObserver_ChainedProvider_MatchesSeriesExactly()
{
const int emaPeriods = 12;

// setup quote provider hubs
QuoteHub quoteHubEval = new();
QuoteHub quoteHubBase = new();

// initialize observer with chained provider
PrsHub observer = quoteHubEval
.ToEmaHub(emaPeriods)
.ToPrsHub(quoteHubBase);

// emulate quote stream
for (int i = 0; i < quotesCount; i++)
{
// IMPORTANT: Add to base BEFORE eval
quoteHubBase.Add(Quotes[i]);
quoteHubEval.Add(OtherQuotes[i]);
}

// final results
IReadOnlyList<PrsResult> sut = observer.Results;

// time-series, for comparison
IReadOnlyList<PrsResult> expected = OtherQuotes
.ToEma(emaPeriods)
.ToPrs(Quotes);

// assert, should equal series
sut.IsExactly(expected);
sut.Should().HaveCount(quotesCount);

// cleanup
observer.Unsubscribe();
quoteHubEval.EndTransmission();
quoteHubBase.EndTransmission();
}

[TestMethod]
public void ChainProvider_MatchesSeriesExactly()
{
const int emaPeriods = 12;

// setup quote provider hubs
QuoteHub quoteHubEval = new();
QuoteHub quoteHubBase = new();

// initialize observer with PRS as provider
EmaHub observer = quoteHubEval
.ToPrsHub(quoteHubBase)
.ToEmaHub(emaPeriods);

// emulate adding quotes to provider hubs
for (int i = 0; i < quotesCount; i++)
{
// skip one (add later)
if (i == 80) { continue; }

Quote qEval = OtherQuotes[i];
Quote qBase = Quotes[i];

// IMPORTANT: Add to base BEFORE eval
quoteHubBase.Add(qBase);
quoteHubEval.Add(qEval);

// resend duplicate quotes
if (i is > 100 and < 105)
{
quoteHubBase.Add(qBase);
quoteHubEval.Add(qEval);
}
}

// late arrival
quoteHubBase.Insert(Quotes[80]);
quoteHubEval.Insert(OtherQuotes[80]);

// delete
quoteHubBase.Remove(Quotes[removeAtIndex]);
quoteHubEval.Remove(OtherQuotes[removeAtIndex]);

// final results
IReadOnlyList<EmaResult> sut = observer.Results;

// time-series, for comparison (revised)
IReadOnlyList<EmaResult> expected = RevisedOtherQuotes
.ToPrs(RevisedQuotes)
.ToEma(emaPeriods);

// assert, should equal series
sut.Should().HaveCount(quotesCount - 1);
sut.IsExactly(expected);

// cleanup
observer.Unsubscribe();
quoteHubEval.EndTransmission();
quoteHubBase.EndTransmission();
}

[TestMethod]
public override void ToStringOverride_ReturnsExpectedName()
{
QuoteHub hubEval = new();
QuoteHub hubBase = new();
PrsHub hub = new(hubEval, hubBase, int.MinValue);
hub.ToString().Should().Be("PRS");

PrsHub hubWithLookback = new(hubEval, hubBase, 14);
hubWithLookback.ToString().Should().Be("PRS(14)");
}
}
4 changes: 4 additions & 0 deletions tools/performance/Perf.StyleComparison.cs
Original file line number Diff line number Diff line change
Expand Up @@ -622,6 +622,10 @@ public void Cleanup()
[Benchmark(Baseline = true)]
public IReadOnlyList<PrsResult> PrsSeries() => quotes.ToPrs(o);

[BenchmarkCategory("Prs")]
[Benchmark]
public IReadOnlyList<PrsResult> PrsStream() => quoteHub.ToPrsHub(quoteHubOther).Results;

[BenchmarkCategory("Pvo")]
[Benchmark(Baseline = true)]
public IReadOnlyList<PvoResult> PvoSeries() => quotes.ToPvo();
Expand Down
Loading