From 5485f71af19a60ccf35971f1afd1f10105f5c701 Mon Sep 17 00:00:00 2001 From: Shawn Snyder Date: Wed, 22 Oct 2025 22:03:26 -0500 Subject: [PATCH 1/4] initial --- .../BlackScholesGreekCalculatorTests.cs | 26 +++--- .../Composite/BlackScholesGreekCalculator.cs | 14 +-- Intrinio.Realtime/Composite/Greek.cs | 10 ++- Intrinio.Realtime/Composite/GreekClient.cs | 4 + Intrinio.Realtime/Intrinio.Realtime.csproj | 2 +- Intrinio.Realtime/WebSocketClient.cs | 86 ++++++++++++++++--- 6 files changed, 110 insertions(+), 32 deletions(-) diff --git a/Intrinio.Realtime.Tests/BlackScholesGreekCalculatorTests.cs b/Intrinio.Realtime.Tests/BlackScholesGreekCalculatorTests.cs index 778df3c..e981864 100644 --- a/Intrinio.Realtime.Tests/BlackScholesGreekCalculatorTests.cs +++ b/Intrinio.Realtime.Tests/BlackScholesGreekCalculatorTests.cs @@ -85,7 +85,7 @@ public void AccuracyTest_Call() #endregion #region Act - Greek greek = Intrinio.Realtime.Composite.BlackScholesGreekCalculator.Calculate(riskFreeInterestRate, dividendYield, equitiesTrade.Price, optionsQuote.Timestamp, (optionsQuote.AskPrice + optionsQuote.BidPrice) / 2, optionsQuote.IsPut(), optionsQuote.GetStrikePrice(), optionsQuote.GetExpirationDate()); + Greek greek = Intrinio.Realtime.Composite.BlackScholesGreekCalculator.Calculate(riskFreeInterestRate, dividendYield, equitiesTrade.Price, optionsQuote.Timestamp, (optionsQuote.AskPrice + optionsQuote.BidPrice) / 2, 0.0D, 0.0D, optionsQuote.IsPut(), optionsQuote.GetStrikePrice(), optionsQuote.GetExpirationDate()); #endregion #region Asserts @@ -107,7 +107,7 @@ public void Calculate_InvalidMarketPrice_ReturnsFailure() double q = 0.0; double S = 100.0; - Greek result = BlackScholesGreekCalculator.Calculate(r, q, S, quote.Timestamp, 0.0D, quote.IsPut(), quote.GetStrikePrice(), quote.GetExpirationDate()); + Greek result = BlackScholesGreekCalculator.Calculate(r, q, S, quote.Timestamp, 0.0D, 0.0D, 0.0D, quote.IsPut(), quote.GetStrikePrice(), quote.GetExpirationDate()); Assert.IsFalse(result.IsValid); Assert.AreEqual(0.0, result.ImpliedVolatility); @@ -125,7 +125,7 @@ public void Calculate_InvalidRiskFreeRate_ReturnsFailure() double q = 0.0; double S = 100.0; - Greek result = BlackScholesGreekCalculator.Calculate(r, q, S, quote.Timestamp, (quote.AskPrice + quote.BidPrice) / 2, quote.IsPut(), quote.GetStrikePrice(), quote.GetExpirationDate()); + Greek result = BlackScholesGreekCalculator.Calculate(r, q, S, quote.Timestamp, (quote.AskPrice + quote.BidPrice) / 2, 0.0D, 0.0D, quote.IsPut(), quote.GetStrikePrice(), quote.GetExpirationDate()); Assert.IsFalse(result.IsValid); Assert.AreEqual(0.0, result.ImpliedVolatility); @@ -143,7 +143,7 @@ public void Calculate_InvalidUnderlyingPrice_ReturnsFailure() double q = 0.0; double S = 0.0; - Greek result = BlackScholesGreekCalculator.Calculate(r, q, S, quote.Timestamp, (quote.AskPrice + quote.BidPrice) / 2, quote.IsPut(), quote.GetStrikePrice(), quote.GetExpirationDate()); + Greek result = BlackScholesGreekCalculator.Calculate(r, q, S, quote.Timestamp, (quote.AskPrice + quote.BidPrice) / 2, 0.0D, 0.0D, quote.IsPut(), quote.GetStrikePrice(), quote.GetExpirationDate()); Assert.IsFalse(result.IsValid); Assert.AreEqual(0.0, result.ImpliedVolatility); @@ -161,7 +161,7 @@ public void Calculate_ExpirationInPast_ReturnsFailure() double q = 0.0; double S = 100.0; - Greek result = BlackScholesGreekCalculator.Calculate(r, q, S, quote.Timestamp, (quote.AskPrice + quote.BidPrice) / 2, quote.IsPut(), quote.GetStrikePrice(), quote.GetExpirationDate()); + Greek result = BlackScholesGreekCalculator.Calculate(r, q, S, quote.Timestamp, (quote.AskPrice + quote.BidPrice) / 2, 0.0D, 0.0D, quote.IsPut(), quote.GetStrikePrice(), quote.GetExpirationDate()); Assert.IsFalse(result.IsValid); Assert.AreEqual(0.0, result.ImpliedVolatility); @@ -179,7 +179,7 @@ public void Calculate_ExpirationNow_ReturnsFailure() double q = 0.0; double S = 100.0; - Greek result = BlackScholesGreekCalculator.Calculate(r, q, S, quote.Timestamp, (quote.AskPrice + quote.BidPrice) / 2, quote.IsPut(), quote.GetStrikePrice(), quote.GetExpirationDate()); + Greek result = BlackScholesGreekCalculator.Calculate(r, q, S, quote.Timestamp, (quote.AskPrice + quote.BidPrice) / 2, 0.0D, 0.0D, quote.IsPut(), quote.GetStrikePrice(), quote.GetExpirationDate()); Assert.IsFalse(result.IsValid); Assert.AreEqual(0.0, result.ImpliedVolatility); @@ -197,7 +197,7 @@ public void Calculate_InvalidStrike_ReturnsFailure() double q = 0.0; double S = 100.0; - Greek result = BlackScholesGreekCalculator.Calculate(r, q, S, quote.Timestamp, (quote.AskPrice + quote.BidPrice) / 2, quote.IsPut(), quote.GetStrikePrice(), quote.GetExpirationDate()); + Greek result = BlackScholesGreekCalculator.Calculate(r, q, S, quote.Timestamp, (quote.AskPrice + quote.BidPrice) / 2, 0.0D, 0.0D, quote.IsPut(), quote.GetStrikePrice(), quote.GetExpirationDate()); Assert.IsFalse(result.IsValid); Assert.AreEqual(0.0, result.ImpliedVolatility); @@ -225,7 +225,7 @@ public void Calculate_ATMCall_ReturnsCorrectGreeks() Intrinio.Realtime.Options.Quote quote = CreateQuote(askPrice: marketPrice, bidPrice: marketPrice, unixTimestamp: 1755292170, expirationSecondsFromNow: tSeconds, strike: K, isPut: isPut); - Greek result = BlackScholesGreekCalculator.Calculate(r, q, S, quote.Timestamp, (quote.AskPrice + quote.BidPrice) / 2, quote.IsPut(), quote.GetStrikePrice(), quote.GetExpirationDate()); + Greek result = BlackScholesGreekCalculator.Calculate(r, q, S, quote.Timestamp, (quote.AskPrice + quote.BidPrice) / 2, 0.0D, 0.0D, quote.IsPut(), quote.GetStrikePrice(), quote.GetExpirationDate()); Assert.IsTrue(result.IsValid); Assert.AreEqual(expectedIV, result.ImpliedVolatility, Tolerance); @@ -253,7 +253,7 @@ public void Calculate_ITMCall_ReturnsCorrectGreeks() Intrinio.Realtime.Options.Quote quote = CreateQuote(askPrice: marketPrice, bidPrice: marketPrice, unixTimestamp: 1755292170, expirationSecondsFromNow: tSeconds, strike: K, isPut: isPut); - Greek result = BlackScholesGreekCalculator.Calculate(r, q, S, quote.Timestamp, (quote.AskPrice + quote.BidPrice) / 2, quote.IsPut(), quote.GetStrikePrice(), quote.GetExpirationDate()); + Greek result = BlackScholesGreekCalculator.Calculate(r, q, S, quote.Timestamp, (quote.AskPrice + quote.BidPrice) / 2, 0.0D, 0.0D, quote.IsPut(), quote.GetStrikePrice(), quote.GetExpirationDate()); Assert.IsTrue(result.IsValid); Assert.AreEqual(expectedIV, result.ImpliedVolatility, Tolerance); @@ -281,7 +281,7 @@ public void Calculate_OTMPut_ReturnsCorrectGreeks() Intrinio.Realtime.Options.Quote quote = CreateQuote(askPrice: marketPrice, bidPrice: marketPrice, unixTimestamp: 1755292170, expirationSecondsFromNow: tSeconds, strike: K, isPut: isPut); - Greek result = BlackScholesGreekCalculator.Calculate(r, q, S, quote.Timestamp, (quote.AskPrice + quote.BidPrice) / 2, quote.IsPut(), quote.GetStrikePrice(), quote.GetExpirationDate()); + Greek result = BlackScholesGreekCalculator.Calculate(r, q, S, quote.Timestamp, (quote.AskPrice + quote.BidPrice) / 2, 0.0D, 0.0D, quote.IsPut(), quote.GetStrikePrice(), quote.GetExpirationDate()); Assert.IsTrue(result.IsValid); Assert.AreEqual(expectedIV, result.ImpliedVolatility, Tolerance); @@ -309,7 +309,7 @@ public void Calculate_DeepITMCallLowVol_ReturnsCorrectGreeks() Intrinio.Realtime.Options.Quote quote = CreateQuote(askPrice: marketPrice, bidPrice: marketPrice, unixTimestamp: 1755292170, expirationSecondsFromNow: tSeconds, strike: K, isPut: isPut); - Greek result = BlackScholesGreekCalculator.Calculate(r, q, S, quote.Timestamp, (quote.AskPrice + quote.BidPrice) / 2, quote.IsPut(), quote.GetStrikePrice(), quote.GetExpirationDate()); + Greek result = BlackScholesGreekCalculator.Calculate(r, q, S, quote.Timestamp, (quote.AskPrice + quote.BidPrice) / 2, 0.0D, 0.0D, quote.IsPut(), quote.GetStrikePrice(), quote.GetExpirationDate()); Assert.IsTrue(result.IsValid); Assert.AreEqual(expectedIV, result.ImpliedVolatility, Tolerance * 750); // looser for low vol @@ -337,7 +337,7 @@ public void Calculate_DeepOTMPutHighVol_ReturnsCorrectGreeks() Intrinio.Realtime.Options.Quote quote = CreateQuote(askPrice: marketPrice, bidPrice: marketPrice, unixTimestamp: 1755292170, expirationSecondsFromNow: tSeconds, strike: K, isPut: isPut); - Greek result = BlackScholesGreekCalculator.Calculate(r, q, S, quote.Timestamp, (quote.AskPrice + quote.BidPrice) / 2, quote.IsPut(), quote.GetStrikePrice(), quote.GetExpirationDate()); + Greek result = BlackScholesGreekCalculator.Calculate(r, q, S, quote.Timestamp, (quote.AskPrice + quote.BidPrice) / 2, 0.0D, 0.0D, quote.IsPut(), quote.GetStrikePrice(), quote.GetExpirationDate()); Assert.IsTrue(result.IsValid); Assert.AreEqual(expectedIV, result.ImpliedVolatility, Tolerance); @@ -365,7 +365,7 @@ public void Calculate_VeryShortTimeATMCall_ReturnsCorrectGreeks() Intrinio.Realtime.Options.Quote quote = CreateQuote(askPrice: marketPrice, bidPrice: marketPrice, unixTimestamp: 1755292170, expirationSecondsFromNow: tSeconds, strike: K, isPut: isPut); - Greek result = BlackScholesGreekCalculator.Calculate(r, q, S, quote.Timestamp, marketPrice, quote.IsPut(), quote.GetStrikePrice(), quote.GetExpirationDate()); + Greek result = BlackScholesGreekCalculator.Calculate(r, q, S, quote.Timestamp, marketPrice, 0.0D, 0.0D, quote.IsPut(), quote.GetStrikePrice(), quote.GetExpirationDate()); Assert.IsTrue(result.IsValid); Assert.AreEqual(expectedIV, result.ImpliedVolatility, Tolerance); diff --git a/Intrinio.Realtime/Composite/BlackScholesGreekCalculator.cs b/Intrinio.Realtime/Composite/BlackScholesGreekCalculator.cs index dd6dbb9..e4ddc62 100644 --- a/Intrinio.Realtime/Composite/BlackScholesGreekCalculator.cs +++ b/Intrinio.Realtime/Composite/BlackScholesGreekCalculator.cs @@ -12,19 +12,23 @@ public static class BlackScholesGreekCalculator private const double MAX_Z_SCORE = 8.0D; private static readonly double root2Pi = Math.Sqrt(2.0D * Math.PI); - public static Greek Calculate(double riskFreeInterestRate, double dividendYield, double underlyingPrice, double latestEventUnixTimestamp, double marketPrice, bool isPut, double strike, DateTime expirationDate) + public static Greek Calculate(double riskFreeInterestRate, double dividendYield, double underlyingPrice, double latestEventUnixTimestamp, double marketPrice, double askPrice, double bidPrice, bool isPut, double strike, DateTime expirationDate) { if (marketPrice <= 0.0D || riskFreeInterestRate <= 0.0D || underlyingPrice <= 0.0D) - return new Greek(0.0D, 0.0D, 0.0D, 0.0D, 0.0D, false); + return new Greek(0.0D, 0.0D, 0.0D, 0.0D, 0.0D, 0.0D, 0.0D, false); double yearsToExpiration = GetYearsToExpiration(latestEventUnixTimestamp, expirationDate); if (yearsToExpiration <= 0.0D || strike <= 0.0D) - return new Greek(0.0D, 0.0D, 0.0D, 0.0D, 0.0D, false); + return new Greek(0.0D, 0.0D, 0.0D, 0.0D, 0.0D, 0.0D, 0.0D, false); double impliedVolatility = CalcImpliedVolatility(isPut, underlyingPrice, strike, yearsToExpiration, riskFreeInterestRate, dividendYield, marketPrice); if (impliedVolatility == 0.0D) - return new Greek(0.0D, 0.0D, 0.0D, 0.0D, 0.0D, false); + return new Greek(0.0D, 0.0D, 0.0D, 0.0D, 0.0D, 0.0D, 0.0D, false); + + + double askImpliedVolatility = (askPrice > 0.0D) ? CalcImpliedVolatility(isPut, underlyingPrice, strike, yearsToExpiration, riskFreeInterestRate, dividendYield, askPrice) : 0.0D; + double bidImpliedVolatility = (askPrice > 0.0D) ? CalcImpliedVolatility(isPut, underlyingPrice, strike, yearsToExpiration, riskFreeInterestRate, dividendYield, bidPrice) : 0.0D; // Compute common values once for all Greeks to avoid redundant calcs double sqrtT = Math.Sqrt(yearsToExpiration); @@ -46,7 +50,7 @@ public static Greek Calculate(double riskFreeInterestRate, double dividendYield, double term3 = dividendYield * underlyingPrice * expQt * (isPut ? (1.0D - nD1) : nD1); double theta = isPut ? (-term1 + term2 - term3) / 365.25D : (-term1 - term2 + term3) / 365.25D; - return new Greek(impliedVolatility, delta, gamma, theta, vega, true); + return new Greek(impliedVolatility, delta, gamma, theta, vega, askImpliedVolatility, bidImpliedVolatility, true); } [MethodImpl(MethodImplOptions.AggressiveInlining)] diff --git a/Intrinio.Realtime/Composite/Greek.cs b/Intrinio.Realtime/Composite/Greek.cs index 1ca7d0f..b1c4f72 100644 --- a/Intrinio.Realtime/Composite/Greek.cs +++ b/Intrinio.Realtime/Composite/Greek.cs @@ -9,15 +9,19 @@ public struct Greek public readonly double Gamma; public readonly double Theta; public readonly double Vega; - public readonly bool IsValid; + public readonly double AskImpliedVolatility; + public readonly double BidImpliedVolatility; + public readonly bool IsValid; - public Greek(double impliedVolatility, double delta, double gamma, double theta, double vega, bool isValid) + public Greek(double impliedVolatility, double delta, double gamma, double theta, double vega, double askImpliedVolatility, double bidImpliedVolatility, bool isValid) { ImpliedVolatility = impliedVolatility; Delta = delta; Gamma = gamma; Theta = theta; Vega = vega; + AskImpliedVolatility = askImpliedVolatility; + BidImpliedVolatility = bidImpliedVolatility; IsValid = isValid; } @@ -33,6 +37,8 @@ public bool Equals(Greek obj) && Gamma == obj.Gamma && Theta == obj.Theta && Vega == obj.Vega + && AskImpliedVolatility == obj.AskImpliedVolatility + && BidImpliedVolatility == obj.BidImpliedVolatility && IsValid == obj.IsValid; } }; \ No newline at end of file diff --git a/Intrinio.Realtime/Composite/GreekClient.cs b/Intrinio.Realtime/Composite/GreekClient.cs index ba3ff4a..4fb6a22 100644 --- a/Intrinio.Realtime/Composite/GreekClient.cs +++ b/Intrinio.Realtime/Composite/GreekClient.cs @@ -455,6 +455,8 @@ private void BlackScholesCalc(IOptionsContractData optionsContractData, ISecurit equitiesTrade.Value.Price, optionsQuote.Value.Timestamp, (optionsQuote.Value.AskPrice + optionsQuote.Value.BidPrice) / 2.0D, + optionsQuote.Value.AskPrice, + optionsQuote.Value.BidPrice, optionsQuote.Value.IsPut(), optionsQuote.Value.GetStrikePrice(), optionsQuote.Value.GetExpirationDate()); @@ -483,6 +485,8 @@ private void BlackScholesCalcOptionsEdge(IOptionsContractData optionsContractDat equitiesTrade.Value.Price, optionsTrade.Value.Timestamp, optionsTrade.Value.Price, + optionsTrade.Value.Price, + optionsTrade.Value.Price, optionsTrade.Value.IsPut(), optionsTrade.Value.GetStrikePrice(), optionsTrade.Value.GetExpirationDate()); diff --git a/Intrinio.Realtime/Intrinio.Realtime.csproj b/Intrinio.Realtime/Intrinio.Realtime.csproj index 8d46f33..814f2c3 100644 --- a/Intrinio.Realtime/Intrinio.Realtime.csproj +++ b/Intrinio.Realtime/Intrinio.Realtime.csproj @@ -25,7 +25,7 @@ - + diff --git a/Intrinio.Realtime/WebSocketClient.cs b/Intrinio.Realtime/WebSocketClient.cs index ab92bfb..8da3c88 100644 --- a/Intrinio.Realtime/WebSocketClient.cs +++ b/Intrinio.Realtime/WebSocketClient.cs @@ -34,6 +34,7 @@ public abstract class WebSocketClient private readonly uint _bufferBlockSize; private readonly SingleProducerRingBuffer _data; private readonly DropOldestRingBuffer _overflowData; + private IDynamicBlockPriorityRingBufferPool _priorityQueue; private readonly Func _tryReconnect; private readonly HttpClient _httpClient = new (); private const string ClientInfoHeaderKey = "Client-Information"; @@ -41,6 +42,7 @@ public abstract class WebSocketClient private readonly ThreadPriority _mainThreadPriority; private readonly Thread[] _threads; private Thread? _receiveThread; + private Thread? _prioritizeThread; private bool _started; #endregion //Data Members @@ -103,8 +105,11 @@ public async Task Start() if (_started) return; _started = true; + + _priorityQueue = GetPriorityRingBufferPool(); _receiveThread = new Thread(ReceiveFn); + _prioritizeThread = new Thread(PrioritizeFn); for (int i = 0; i < _threads.Length; i++) _threads[i] = new Thread(ProcessFn); @@ -141,6 +146,8 @@ public async Task Stop() _ctSource.Cancel(); if (_receiveThread != null) _receiveThread.Join(); + if (_prioritizeThread != null) + _prioritizeThread.Join(); foreach (Thread thread in _threads) if (thread != null) thread.Join(); @@ -266,7 +273,8 @@ protected async Task JoinImpl(string channel, bool skipAddCheck = false) protected abstract byte[] MakeJoinMessage(string channel); protected abstract byte[] MakeLeaveMessage(string channel); protected abstract void HandleMessage(in ReadOnlySpan bytes); - protected abstract int GetNextChunkLength(ReadOnlySpan bytes); + protected abstract ChunkInfo GetNextChunkInfo(ReadOnlySpan bytes); + protected abstract IDynamicBlockPriorityRingBufferPool GetPriorityRingBufferPool(); #endregion //Abstract Methods @@ -380,19 +388,23 @@ private void ReceiveFn() } } } - - private void ProcessFn() + + private void PrioritizeFn() { CancellationToken ct = _ctSource.Token; Thread.CurrentThread.Priority = (ThreadPriority)(Math.Max((((int)_mainThreadPriority) - 1), 0)); //Set below main thread priority so doesn't interfere with main thread accepting messages. - byte[] underlyingBuffer = new byte[_bufferBlockSize]; - Span datum = new Span(underlyingBuffer); + byte[] underlyingBuffer = new byte[_bufferBlockSize]; + Span datum = new Span(underlyingBuffer); + int iterationsSinceWork = 0; //int for the Thread.sleep arg type, and this number will never get more than 1000. + while (!ct.IsCancellationRequested) { try { if (_data.TryDequeue(datum) || _overflowData.TryDequeue(datum)) { + iterationsSinceWork = 0; + // These are grouped (many) messages. // The first byte tells us how many messages there are. // From there, for each message, check the message length at index 1 of each chunk to know how many bytes each chunk has. @@ -401,22 +413,60 @@ private void ProcessFn() int startIndex = 1; for (ulong i = 0UL; i < cnt; ++i) { - int msgLength = 1; //default value in case corrupt array so we don't reprocess same bytes over and over. + ChunkInfo chunkInfo = new ChunkInfo(1, 0); //default value in case corrupt array so we don't reprocess same bytes over and over. try { - msgLength = GetNextChunkLength(datum.Slice(startIndex)); - ReadOnlySpan chunk = datum.Slice(startIndex, msgLength); - HandleMessage(in chunk); + chunkInfo = GetNextChunkInfo(datum.Slice(startIndex)); + ReadOnlySpan chunk = datum.Slice(startIndex, chunkInfo.ChunkLength); + _priorityQueue.TryEnqueue(chunkInfo.Priority, chunk); } catch(Exception e) {LogMessage(LogLevel.ERROR, "Error parsing message: {0}; {1}", new object[]{e.Message, e.StackTrace});} finally { - startIndex += msgLength; + startIndex += chunkInfo.ChunkLength; } } } else - Thread.Sleep(10); + { + iterationsSinceWork = Math.Min(iterationsSinceWork + 1, 1000); + Thread.Sleep(iterationsSinceWork / 100); + } + } + catch (OperationCanceledException) + { + } + catch (Exception exn) + { + LogMessage(LogLevel.WARNING, "Error parsing message: {0}; {1}", new object[]{exn.Message, exn.StackTrace}); + } + }; + } + + private void ProcessFn() + { + CancellationToken ct = _ctSource.Token; + Thread.CurrentThread.Priority = (ThreadPriority)(Math.Max((((int)_mainThreadPriority) - 1), 0)); //Set below main thread priority so doesn't interfere with main thread accepting messages. + byte[] underlyingBuffer = new byte[_bufferBlockSize]; + Span datum = new Span(underlyingBuffer); + ReadOnlySpan chunk = datum; + int iterationsSinceWork = 0; //int for the Thread.sleep arg type, and this number will never get more than 1000. + + while (!ct.IsCancellationRequested) + { + try + { + if (_priorityQueue.TryDequeue(underlyingBuffer, out datum)) + { + iterationsSinceWork = 0; + chunk = datum; + HandleMessage(in chunk); + } + else + { + iterationsSinceWork = Math.Min(iterationsSinceWork + 1, 1000); + Thread.Sleep(iterationsSinceWork / 100); + } } catch (OperationCanceledException) { @@ -513,6 +563,8 @@ private async Task OnOpen() if (!thread.IsAlive && thread.ThreadState.HasFlag(ThreadState.Unstarted)) thread.Start(); } + if (!_prioritizeThread.IsAlive && _prioritizeThread.ThreadState.HasFlag(ThreadState.Unstarted)) + _prioritizeThread.Start(); if (!_receiveThread.IsAlive && _receiveThread.ThreadState.HasFlag(ThreadState.Unstarted)) _receiveThread.Start(); } @@ -590,4 +642,16 @@ private async Task InitializeWebSockets(string token) } #endregion //Private Methods +} + +public readonly struct ChunkInfo +{ + public readonly int ChunkLength; + public readonly uint Priority; + + public ChunkInfo(int chunkLength, uint priority) + { + ChunkLength = chunkLength; + Priority = priority; + } } \ No newline at end of file From 8c4bda56e40a9e6c12060126c24ae160a929caed Mon Sep 17 00:00:00 2001 From: Shawn Snyder Date: Wed, 22 Oct 2025 23:24:37 -0500 Subject: [PATCH 2/4] client stats --- Intrinio.Realtime/ClientStats.cs | 76 +++++++++++++------ .../Equities/EquitiesWebSocketClient.cs | 19 ++++- .../Options/OptionsWebSocketClient.cs | 24 ++++-- Intrinio.Realtime/WebSocketClient.cs | 76 ++++++++++--------- SampleApp/EquitiesSampleApp.cs | 6 +- SampleApp/GreekSampleApp.cs | 12 ++- SampleApp/KitchenSinkSampleApp.cs | 12 ++- SampleApp/OptionsSampleApp.cs | 6 +- 8 files changed, 153 insertions(+), 78 deletions(-) diff --git a/Intrinio.Realtime/ClientStats.cs b/Intrinio.Realtime/ClientStats.cs index c093386..47cba3d 100644 --- a/Intrinio.Realtime/ClientStats.cs +++ b/Intrinio.Realtime/ClientStats.cs @@ -6,42 +6,74 @@ public class ClientStats { private readonly UInt64 _socketDataMessages; private readonly UInt64 _socketTextMessages; - private readonly int _queueDepth; + private readonly UInt64 _queueDepth; private readonly UInt64 _eventCount; - private readonly int _queueCapacity; - private readonly int _overflowQueueDepth; - private readonly int _overflowQueueCapacity; - private readonly int _droppedCount; - private readonly int _overflowCount; + private readonly UInt64 _queueCapacity; + private readonly UInt64 _overflowQueueDepth; + private readonly UInt64 _overflowQueueCapacity; + private readonly UInt64 _droppedCount; + private readonly UInt64 _overflowCount; + private readonly UInt64 _priorityQueueDepth; + private readonly UInt64 _priorityQueueCapacity; + private readonly UInt64 _priorityQueueDroppedCount; - public ClientStats(UInt64 socketDataMessages, UInt64 socketTextMessages, int queueDepth, UInt64 eventCount, int queueCapacity, int overflowQueueDepth, int overflowQueueCapacity, int droppedCount, int overflowCount) + public ClientStats(UInt64 socketDataMessages, + UInt64 socketTextMessages, + UInt64 queueDepth, + UInt64 eventCount, + UInt64 queueCapacity, + UInt64 overflowQueueDepth, + UInt64 overflowQueueCapacity, + UInt64 droppedCount, + UInt64 overflowCount, + UInt64 priorityQueueDepth, + UInt64 priorityQueueCapacity, + UInt64 priorityQueueDroppedCount) { - _socketDataMessages = socketDataMessages; - _socketTextMessages = socketTextMessages; - _queueDepth = queueDepth; - _eventCount = eventCount; - _queueCapacity = queueCapacity; - _overflowQueueDepth = overflowQueueDepth; - _overflowQueueCapacity = overflowQueueCapacity; - _droppedCount = droppedCount; - _overflowCount = overflowCount; + _socketDataMessages = socketDataMessages; + _socketTextMessages = socketTextMessages; + _queueDepth = queueDepth; + _eventCount = eventCount; + _queueCapacity = queueCapacity; + _overflowQueueDepth = overflowQueueDepth; + _overflowQueueCapacity = overflowQueueCapacity; + _droppedCount = droppedCount; + _overflowCount = overflowCount; + _priorityQueueDepth = priorityQueueDepth; + _priorityQueueCapacity = priorityQueueCapacity; + _priorityQueueDroppedCount = priorityQueueDroppedCount; } public UInt64 SocketDataMessages { get { return _socketDataMessages; } } public UInt64 SocketTextMessages { get { return _socketTextMessages; } } - public int QueueDepth { get { return _queueDepth; } } + public UInt64 QueueDepth { get { return _queueDepth; } } - public int QueueCapacity { get { return _queueCapacity; } } + public UInt64 QueueCapacity { get { return _queueCapacity; } } - public int OverflowQueueDepth { get { return _overflowQueueDepth; } } + public UInt64 OverflowQueueDepth { get { return _overflowQueueDepth; } } - public int OverflowQueueCapacity { get { return _overflowQueueCapacity; } } + public UInt64 OverflowQueueCapacity { get { return _overflowQueueCapacity; } } public UInt64 EventCount { get { return _eventCount; } } - public int DroppedCount { get { return _droppedCount; } } + public UInt64 DroppedCount { get { return _droppedCount; } } - public int OverflowCount { get { return _overflowCount; } } + public UInt64 OverflowCount { get { return _overflowCount; } } + + public UInt64 PriorityQueueDroppedCount + { + get { return _priorityQueueDroppedCount; } + } + + public UInt64 PriorityQueueCapacity + { + get { return _priorityQueueCapacity; } + } + + public UInt64 PriorityQueueDepth + { + get { return _priorityQueueDepth; } + } } \ No newline at end of file diff --git a/Intrinio.Realtime/Equities/EquitiesWebSocketClient.cs b/Intrinio.Realtime/Equities/EquitiesWebSocketClient.cs index d5516ef..c20dadb 100644 --- a/Intrinio.Realtime/Equities/EquitiesWebSocketClient.cs +++ b/Intrinio.Realtime/Equities/EquitiesWebSocketClient.cs @@ -6,6 +6,7 @@ using System.Threading.Tasks; using System.Linq; using System.Runtime.CompilerServices; +using Intrinio.Collections.RingBuffers; using Intrinio.Realtime.Composite; namespace Intrinio.Realtime.Equities; @@ -275,11 +276,25 @@ protected override string GetWebSocketUrl(string token) break; } } + + protected override IDynamicBlockPriorityRingBufferPool GetPriorityRingBufferPool() + { + IDynamicBlockPriorityRingBufferPool queue = new DynamicBlockPriorityRingBufferPool(_bufferBlockSize, _bufferSize); + + queue.AddUpdateRingBufferToPool(0, new DynamicBlockNoLockRingBuffer(_bufferBlockSize, _bufferSize)); //trades + queue.AddUpdateRingBufferToPool(1, new DynamicBlockNoLockDropOldestRingBuffer(_bufferBlockSize, _bufferSize)); //quotes + + return queue; + } [MethodImpl(MethodImplOptions.AggressiveInlining)] - protected override int GetNextChunkLength(ReadOnlySpan bytes) + protected override ChunkInfo GetNextChunkInfo(ReadOnlySpan bytes) { - return Convert.ToInt32(bytes[1]); + int length = Convert.ToInt32(bytes[1]); + uint priority = 0u; + + + //return new ChunkInfo(length, priority); } protected override List> GetCustomSocketHeaders() diff --git a/Intrinio.Realtime/Options/OptionsWebSocketClient.cs b/Intrinio.Realtime/Options/OptionsWebSocketClient.cs index 9a374c7..7f2e938 100644 --- a/Intrinio.Realtime/Options/OptionsWebSocketClient.cs +++ b/Intrinio.Realtime/Options/OptionsWebSocketClient.cs @@ -6,6 +6,7 @@ using System.Threading.Tasks; using System.Linq; using System.Runtime.CompilerServices; +using Intrinio.Collections.RingBuffers; using Intrinio.Realtime.Composite; namespace Intrinio.Realtime.Options; @@ -308,20 +309,31 @@ protected override List> GetCustomSocketHeaders() headers.Add(new KeyValuePair(DelayHeaderKey, DelayHeaderValue)); return headers; } + + protected override IDynamicBlockPriorityRingBufferPool GetPriorityRingBufferPool() + { + IDynamicBlockPriorityRingBufferPool queue = new DynamicBlockPriorityRingBufferPool(_bufferBlockSize, _bufferSize); + + queue.AddUpdateRingBufferToPool(0, new DynamicBlockNoLockRingBuffer(_bufferBlockSize, _bufferSize)); //trades + queue.AddUpdateRingBufferToPool(1, new DynamicBlockNoLockDropOldestRingBuffer(_bufferBlockSize, _bufferSize)); //refreshes and unusual activity + queue.AddUpdateRingBufferToPool(2, new DynamicBlockNoLockDropOldestRingBuffer(_bufferBlockSize, _bufferSize)); //quotes + + return queue; + } [MethodImpl(MethodImplOptions.AggressiveInlining)] - protected override int GetNextChunkLength(ReadOnlySpan bytes) + protected override ChunkInfo GetNextChunkInfo(ReadOnlySpan bytes) { byte msgType = bytes[MessageTypeIndex]; //in the bytes, symbol length is first, then symbol, then msg type. - + //using if-else vs switch for hotpathing if (msgType == 1u) - return QuoteMessageSize; + return new ChunkInfo(QuoteMessageSize, 2u); if (msgType == 0u) - return TradeMessageSize; + return new ChunkInfo(TradeMessageSize, 0u); if (msgType == 2u) - return RefreshMessageSize; - return UnusualActivityMessageSize; + return new ChunkInfo(RefreshMessageSize, 1u); + return new ChunkInfo(UnusualActivityMessageSize, 1u); } private string FormatContract(ReadOnlySpan alternateFormattedChars) diff --git a/Intrinio.Realtime/WebSocketClient.cs b/Intrinio.Realtime/WebSocketClient.cs index 8da3c88..25a3326 100644 --- a/Intrinio.Realtime/WebSocketClient.cs +++ b/Intrinio.Realtime/WebSocketClient.cs @@ -15,35 +15,35 @@ namespace Intrinio.Realtime; public abstract class WebSocketClient { #region Data Members - private readonly uint _processingThreadsQuantity; - private readonly uint _bufferSize; - private readonly uint _overflowBufferSize; - private int[] _selfHealBackoffs = new int[] { 10_000, 30_000, 60_000, 300_000, 600_000 }; - private readonly object _tLock = new (); - private readonly object _wsLock = new (); - private Tuple _token = new (null, DateTime.Now); - private WebSocketState _wsState = null; - private UInt64 _dataMsgCount = 0UL; - private UInt64 _dataEventCount = 0UL; - private UInt64 _textMsgCount = 0UL; - private readonly HashSet _channels = new (); - protected IEnumerable Channels { get { return _channels.ToArray(); } } - private readonly CancellationTokenSource _ctSource = new (); - protected CancellationToken CancellationToken { get { return _ctSource.Token; } } - private readonly uint _maxMessageSize; - private readonly uint _bufferBlockSize; - private readonly SingleProducerRingBuffer _data; - private readonly DropOldestRingBuffer _overflowData; - private IDynamicBlockPriorityRingBufferPool _priorityQueue; - private readonly Func _tryReconnect; - private readonly HttpClient _httpClient = new (); - private const string ClientInfoHeaderKey = "Client-Information"; - private const string ClientInfoHeaderValue = "IntrinioDotNetSDKv17.2"; - private readonly ThreadPriority _mainThreadPriority; - private readonly Thread[] _threads; - private Thread? _receiveThread; - private Thread? _prioritizeThread; - private bool _started; + private readonly uint _processingThreadsQuantity; + protected readonly uint _bufferSize; + private readonly uint _overflowBufferSize; + private int[] _selfHealBackoffs = new int[] { 10_000, 30_000, 60_000, 300_000, 600_000 }; + private readonly object _tLock = new (); + private readonly object _wsLock = new (); + private Tuple _token = new (null, DateTime.Now); + private WebSocketState _wsState = null; + private UInt64 _dataMsgCount = 0UL; + private UInt64 _dataEventCount = 0UL; + private UInt64 _textMsgCount = 0UL; + private readonly HashSet _channels = new (); + protected IEnumerable Channels { get { return _channels.ToArray(); } } + private readonly CancellationTokenSource _ctSource = new (); + protected CancellationToken CancellationToken { get { return _ctSource.Token; } } + private readonly uint _maxMessageSize; + protected readonly uint _bufferBlockSize; + private readonly SingleProducerRingBuffer _data; + private readonly DropOldestRingBuffer _overflowData; + private IDynamicBlockPriorityRingBufferPool _priorityQueue; + private readonly Func _tryReconnect; + private readonly HttpClient _httpClient = new (); + private const string ClientInfoHeaderKey = "Client-Information"; + private const string ClientInfoHeaderValue = "IntrinioDotNetSDKv17.2"; + private readonly ThreadPriority _mainThreadPriority; + private readonly Thread[] _threads; + private Thread? _receiveThread; + private Thread? _prioritizeThread; + private bool _started; #endregion //Data Members #region Constuctors @@ -165,13 +165,16 @@ public ClientStats GetStats() { return new ClientStats(Interlocked.Read(ref _dataMsgCount), Interlocked.Read(ref _textMsgCount), - Convert.ToInt32(_data.Count), + _data.Count, Interlocked.Read(ref _dataEventCount), - Convert.ToInt32(_data.BlockCapacity), - Convert.ToInt32(_overflowData.Count), - Convert.ToInt32(_overflowData.BlockCapacity), - Convert.ToInt32(_overflowData.DropCount), - System.Convert.ToInt32(_data.DropCount)); + _data.BlockCapacity, + _overflowData.Count, + _overflowData.BlockCapacity, + _overflowData.DropCount, + _data.DropCount, + _priorityQueue.Count, + _priorityQueue.TotalBlockCapacity, + _priorityQueue.DropCount); } [Serilog.Core.MessageTemplateFormatMethod("messageTemplate")] @@ -418,7 +421,8 @@ private void PrioritizeFn() { chunkInfo = GetNextChunkInfo(datum.Slice(startIndex)); ReadOnlySpan chunk = datum.Slice(startIndex, chunkInfo.ChunkLength); - _priorityQueue.TryEnqueue(chunkInfo.Priority, chunk); + while(!_priorityQueue.TryEnqueue(chunkInfo.Priority, chunk)) + Thread.Sleep(0); } catch(Exception e) {LogMessage(LogLevel.ERROR, "Error parsing message: {0}; {1}", new object[]{e.Message, e.StackTrace});} finally diff --git a/SampleApp/EquitiesSampleApp.cs b/SampleApp/EquitiesSampleApp.cs index 156313f..aabf7f2 100644 --- a/SampleApp/EquitiesSampleApp.cs +++ b/SampleApp/EquitiesSampleApp.cs @@ -88,7 +88,7 @@ static void TimerCallback(object obj) { IEquitiesWebSocketClient client = (IEquitiesWebSocketClient) obj; ClientStats stats = client.GetStats(); - Log("Socket Stats - Grouped Messages: {0}, Text Messages: {1}, Queue Depth: {2}%, Overflow Queue Depth: {3}%, Drops: {4}, Overflow Count: {5}, Individual Events: {6}, Trades: {7}, Quotes: {8}", + Log("Socket Stats - Grouped Messages: {0}, Text Messages: {1}, Queue Depth: {2}%, Overflow Queue Depth: {3}%, Drops: {4}, Overflow Count: {5}, PriorityQueue Depth: {9}%; PriorityQueue Drops: {10}, Individual Events: {6}, Trades: {7}, Quotes: {8}", stats.SocketDataMessages, stats.SocketTextMessages, (stats.QueueDepth * 100) / stats.QueueCapacity, @@ -97,7 +97,9 @@ static void TimerCallback(object obj) stats.OverflowCount, stats.EventCount, client.TradeCount, - client.QuoteCount); + client.QuoteCount, + (stats.PriorityQueueDepth * 100) / stats.PriorityQueueCapacity, + stats.PriorityQueueDroppedCount); if (maxTradeCount > 0) { Log("Most active trade: {0} ({1} updates)", maxCountTrade, maxTradeCount); diff --git a/SampleApp/GreekSampleApp.cs b/SampleApp/GreekSampleApp.cs index 3c21648..d1852a9 100644 --- a/SampleApp/GreekSampleApp.cs +++ b/SampleApp/GreekSampleApp.cs @@ -58,7 +58,7 @@ static void TimerCallback(object obj) { IOptionsWebSocketClient optionsClient = _optionsClient; ClientStats optionsClientStats = optionsClient.GetStats(); - Log("Options Socket Stats - Grouped Messages: {0}, Queue Depth: {1}%, Overflow Queue Depth: {2}%, Drops: {3}, Overflow Count: {4}, Individual Events: {5}, Trades: {6}, Quotes: {7}", + Log("Options Socket Stats - Grouped Messages: {0}, Queue Depth: {1}%, Overflow Queue Depth: {2}%, Drops: {3}, Overflow Count: {4}, PriorityQueue Depth: {8}%; PriorityQueue Drops: {9}, , Individual Events: {5}, Trades: {6}, Quotes: {7}", optionsClientStats.SocketDataMessages, (optionsClientStats.QueueDepth * 100) / optionsClientStats.QueueCapacity, (optionsClientStats.OverflowQueueDepth * 100) / optionsClientStats.OverflowQueueCapacity, @@ -66,11 +66,13 @@ static void TimerCallback(object obj) optionsClientStats.OverflowCount, optionsClientStats.EventCount, optionsClient.TradeCount, - optionsClient.QuoteCount); + optionsClient.QuoteCount, + (optionsClientStats.PriorityQueueDepth * 100) / optionsClientStats.PriorityQueueCapacity, + optionsClientStats.PriorityQueueDroppedCount); IEquitiesWebSocketClient equitiesClient = _equitiesClient; ClientStats equitiesClientStats = equitiesClient.GetStats(); - Log("Equities Socket Stats - Grouped Messages: {0}, Queue Depth: {1}%, Overflow Queue Depth: {2}%, Drops: {3}, Overflow Count: {4}, Individual Events: {5}, Trades: {6}, Quotes: {7}", + Log("Equities Socket Stats - Grouped Messages: {0}, Queue Depth: {1}%, Overflow Queue Depth: {2}%, PriorityQueue Depth: {8}%; PriorityQueue Drops: {9}, Drops: {3}, Overflow Count: {4}, Individual Events: {5}, Trades: {6}, Quotes: {7}", equitiesClientStats.SocketDataMessages, (equitiesClientStats.QueueDepth * 100) / equitiesClientStats.QueueCapacity, (equitiesClientStats.OverflowQueueDepth * 100) / equitiesClientStats.OverflowQueueCapacity, @@ -78,7 +80,9 @@ static void TimerCallback(object obj) equitiesClientStats.OverflowCount, equitiesClientStats.EventCount, equitiesClient.TradeCount, - equitiesClient.QuoteCount); + equitiesClient.QuoteCount, + (equitiesClientStats.PriorityQueueDepth * 100) / equitiesClientStats.PriorityQueueCapacity, + equitiesClientStats.PriorityQueueDroppedCount); Log("Greek updates: {0}", _greekUpdatedEventCount); Log("Data Cache Security Count: {0}", _dataCache.AllSecurityData.Count); diff --git a/SampleApp/KitchenSinkSampleApp.cs b/SampleApp/KitchenSinkSampleApp.cs index affed7e..416a080 100644 --- a/SampleApp/KitchenSinkSampleApp.cs +++ b/SampleApp/KitchenSinkSampleApp.cs @@ -212,7 +212,7 @@ static void TimerCallback(object obj) { IOptionsWebSocketClient optionsClient = _optionsClient; ClientStats optionsClientStats = optionsClient.GetStats(); - Log("Options Socket Stats - Grouped Messages: {0}, Text Messages: {1}, Queue Depth: {2}%, Overflow Queue Depth: {3}%, Drops: {4}, Overflow Count: {5}, Individual Events: {6}, Trades: {7}, Quotes: {8}, Refreshes: {9}, UnusualActivities: {10}", + Log("Options Socket Stats - Grouped Messages: {0}, Text Messages: {1}, Queue Depth: {2}%, Overflow Queue Depth: {3}%, Drops: {4}, Overflow Count: {5}, PriorityQueue Depth: {11}%; PriorityQueue Drops: {12}, Individual Events: {6}, Trades: {7}, Quotes: {8}, Refreshes: {9}, UnusualActivities: {10}", optionsClientStats.SocketDataMessages, optionsClientStats.SocketTextMessages, (optionsClientStats.QueueDepth * 100) / optionsClientStats.QueueCapacity, @@ -223,7 +223,9 @@ static void TimerCallback(object obj) optionsClient.TradeCount, optionsClient.QuoteCount, optionsClient.RefreshCount, - optionsClient.UnusualActivityCount); + optionsClient.UnusualActivityCount, + (optionsClientStats.PriorityQueueDepth * 100) / optionsClientStats.PriorityQueueCapacity, + optionsClientStats.PriorityQueueDroppedCount); if (_optionsUseTradeCandleSticks) Log("OPTION TRADE CANDLESTICK STATS - TradeCandleSticks = {0}, TradeCandleSticksIncomplete = {1}", _optionsTradeCandleStickCount, _optionsTradeCandleStickCountIncomplete); @@ -232,7 +234,7 @@ static void TimerCallback(object obj) IEquitiesWebSocketClient equitiesClient = _equitiesClient; ClientStats equitiesClientStats = equitiesClient.GetStats(); - Log("Equities Socket Stats - Grouped Messages: {0}, Text Messages: {1}, Queue Depth: {2}%, Overflow Queue Depth: {3}%, Drops: {4}, Overflow Count: {5}, Individual Events: {6}, Trades: {7}, Quotes: {8}", + Log("Equities Socket Stats - Grouped Messages: {0}, Text Messages: {1}, Queue Depth: {2}%, Overflow Queue Depth: {3}%, Drops: {4}, Overflow Count: {5}, PriorityQueue Depth: {9}%; PriorityQueue Drops: {10}, Individual Events: {6}, Trades: {7}, Quotes: {8}", equitiesClientStats.SocketDataMessages, equitiesClientStats.SocketTextMessages, (equitiesClientStats.QueueDepth * 100) / equitiesClientStats.QueueCapacity, @@ -241,7 +243,9 @@ static void TimerCallback(object obj) equitiesClientStats.OverflowCount, equitiesClientStats.EventCount, equitiesClient.TradeCount, - equitiesClient.QuoteCount); + equitiesClient.QuoteCount, + (equitiesClientStats.PriorityQueueDepth * 100) / equitiesClientStats.PriorityQueueCapacity, + equitiesClientStats.PriorityQueueDroppedCount); if (_equitiesUseTradeCandleSticks) Log("EQUITIES TRADE CANDLESTICK STATS - TradeCandleSticks = {0}, TradeCandleSticksIncomplete = {1}", _equitiesTradeCandleStickCount, _equitiesTradeCandleStickCountIncomplete); diff --git a/SampleApp/OptionsSampleApp.cs b/SampleApp/OptionsSampleApp.cs index b32c7a0..d666aa8 100644 --- a/SampleApp/OptionsSampleApp.cs +++ b/SampleApp/OptionsSampleApp.cs @@ -76,7 +76,7 @@ static void TimerCallback(object obj) { IOptionsWebSocketClient client = (IOptionsWebSocketClient) obj; ClientStats stats = client.GetStats(); - Log("Socket Stats - Grouped Messages: {0}, Text Messages: {1}, Queue Depth: {2}%, Overflow Queue Depth: {3}%, Drops: {4}, Overflow Count: {5}, Individual Events: {6}, Trades: {7}, Quotes: {8}, Refreshes: {9}, UnusualActivities: {10}", + Log("Socket Stats - Grouped Messages: {0}, Text Messages: {1}, Queue Depth: {2}%, Overflow Queue Depth: {3}%, Drops: {4}, Overflow Count: {5}, PriorityQueue Depth: {11}%; PriorityQueue Drops: {12}, Individual Events: {6}, Trades: {7}, Quotes: {8}, Refreshes: {9}, UnusualActivities: {10}", stats.SocketDataMessages, stats.SocketTextMessages, (stats.QueueDepth * 100) / stats.QueueCapacity, @@ -87,7 +87,9 @@ static void TimerCallback(object obj) client.TradeCount, client.QuoteCount, client.RefreshCount, - client.UnusualActivityCount); + client.UnusualActivityCount, + (stats.PriorityQueueDepth * 100) / stats.PriorityQueueCapacity, + stats.PriorityQueueDroppedCount); if (_useTradeCandleSticks) Log("TRADE CANDLESTICK STATS - TradeCandleSticks = {0}, TradeCandleSticksIncomplete = {1}", _tradeCandleStickCount, _tradeCandleStickCountIncomplete); From a83d18d2bc28fb96932f8d7e7752070855cc2f22 Mon Sep 17 00:00:00 2001 From: Shawn Snyder Date: Wed, 22 Oct 2025 23:31:27 -0500 Subject: [PATCH 3/4] Testing --- .../Equities/EquitiesWebSocketClient.cs | 19 +++++++++++++++---- Intrinio.Realtime/Equities/ReplayClient.cs | 5 ++++- 2 files changed, 19 insertions(+), 5 deletions(-) diff --git a/Intrinio.Realtime/Equities/EquitiesWebSocketClient.cs b/Intrinio.Realtime/Equities/EquitiesWebSocketClient.cs index c20dadb..9a7a1ce 100644 --- a/Intrinio.Realtime/Equities/EquitiesWebSocketClient.cs +++ b/Intrinio.Realtime/Equities/EquitiesWebSocketClient.cs @@ -291,10 +291,21 @@ protected override IDynamicBlockPriorityRingBufferPool GetPriorityRingBufferPool protected override ChunkInfo GetNextChunkInfo(ReadOnlySpan bytes) { int length = Convert.ToInt32(bytes[1]); - uint priority = 0u; - - - //return new ChunkInfo(length, priority); + MessageType msgType = (MessageType)Convert.ToInt32(bytes[0]); + switch (msgType) + { + case MessageType.Trade: + { + return new ChunkInfo(length, 0); + } + case MessageType.Ask: + case MessageType.Bid: + { + return new ChunkInfo(length, 1); + } + default: + return new ChunkInfo(length, 1); + } } protected override List> GetCustomSocketHeaders() diff --git a/Intrinio.Realtime/Equities/ReplayClient.cs b/Intrinio.Realtime/Equities/ReplayClient.cs index 016ac91..12aa3ef 100644 --- a/Intrinio.Realtime/Equities/ReplayClient.cs +++ b/Intrinio.Realtime/Equities/ReplayClient.cs @@ -230,12 +230,15 @@ public ClientStats GetStats() return new ClientStats( Interlocked.Read(ref _dataMsgCount), Interlocked.Read(ref _textMsgCount), - _data.Count, + Convert.ToUInt64(_data.Count), Interlocked.Read(ref _dataEventCount), Int32.MaxValue, 0, Int32.MaxValue, 0, + 0, + 0, + Int32.MaxValue, 0 ); } From f98bcad2b8544f1d27954c2dba3019bd3ad278d0 Mon Sep 17 00:00:00 2001 From: Shawn Snyder Date: Wed, 22 Oct 2025 23:51:48 -0500 Subject: [PATCH 4/4] version bump --- Intrinio.Realtime/WebSocketClient.cs | 2 +- IntrinioRealTimeClient.nuspec | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/Intrinio.Realtime/WebSocketClient.cs b/Intrinio.Realtime/WebSocketClient.cs index 25a3326..6eab10e 100644 --- a/Intrinio.Realtime/WebSocketClient.cs +++ b/Intrinio.Realtime/WebSocketClient.cs @@ -38,7 +38,7 @@ public abstract class WebSocketClient private readonly Func _tryReconnect; private readonly HttpClient _httpClient = new (); private const string ClientInfoHeaderKey = "Client-Information"; - private const string ClientInfoHeaderValue = "IntrinioDotNetSDKv17.2"; + private const string ClientInfoHeaderValue = "IntrinioDotNetSDKv18.0"; private readonly ThreadPriority _mainThreadPriority; private readonly Thread[] _threads; private Thread? _receiveThread; diff --git a/IntrinioRealTimeClient.nuspec b/IntrinioRealTimeClient.nuspec index 7f550dc..240b46b 100644 --- a/IntrinioRealTimeClient.nuspec +++ b/IntrinioRealTimeClient.nuspec @@ -2,7 +2,7 @@ IntrinioRealTimeClient - 17.2.0 + 18.0.0 Intrinio SDK for Real-Time Stock and Option Prices Intrinio Intrinio @@ -10,7 +10,7 @@ https://licenses.nuget.org/MIT https://github.com/intrinio/intrinio-realtime-csharp-sdk Intrinio provides real-time stock and option prices via a two-way WebSocket connection. - Version 17.2.0 release. + Version 18.0.0 release. Copyright 2025 Intrinio fintech stocks options prices websocket real-time market finance