Skip to content

Commit b52de54

Browse files
konardclaude
andcommitted
Add timer-based subscription restart mechanism for trader
- Add SubscriptionTimeoutInterval (5 minutes) to detect silent subscription failures - Track LastTradesDataTicks and LastMarketDataTicks for both subscription streams - Implement CheckTradesTimeout and CheckMarketDataTimeout methods to monitor data flow - Update ReceiveTradesLoop and SendOrdersLoop to restart subscriptions when no data received - Fix existing Task.Delay calls to properly handle cancellation tokens Fixes issue where trades subscription was broken after 17 days of uptime. The bot now proactively restarts subscriptions if no data is received within the timeout period. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <[email protected]>
1 parent aa1323f commit b52de54

File tree

1 file changed

+98
-4
lines changed

1 file changed

+98
-4
lines changed

csharp/TraderBot/TradingService.cs

Lines changed: 98 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ public class TradingService : BackgroundService
1919
protected static readonly TimeSpan RefreshInterval = TimeSpan.FromSeconds(10);
2020
protected static readonly TimeSpan SyncInterval = TimeSpan.FromSeconds(20);
2121
protected static readonly TimeSpan WaitOutputInterval = TimeSpan.FromSeconds(20);
22+
protected static readonly TimeSpan SubscriptionTimeoutInterval = TimeSpan.FromMinutes(5);
2223
protected readonly InvestApiClient InvestApi;
2324
protected readonly ILogger<TradingService> Logger;
2425
protected readonly IHostApplicationLifetime Lifetime;
@@ -33,6 +34,8 @@ public class TradingService : BackgroundService
3334
protected long LastRefreshTicks;
3435
protected long LastSyncTicks;
3536
protected long LastWaitOutputTicks;
37+
protected long LastTradesDataTicks;
38+
protected long LastMarketDataTicks;
3639
protected TimeSpan MinimumTimeToBuy;
3740
protected TimeSpan MaximumTimeToBuy;
3841
protected readonly ConcurrentDictionary<string, OrderState> ActiveBuyOrders;
@@ -112,6 +115,9 @@ public TradingService(ILogger<TradingService> logger, InvestApiClient investApi,
112115
LotsSets = new ConcurrentDictionary<decimal, long>();
113116
ActiveSellOrderSourcePrice = new ConcurrentDictionary<string, decimal>();
114117
LastOperationsCheckpoint = settings.LoadOperationsFrom;
118+
var nowTicks = DateTime.UtcNow.Ticks;
119+
LastTradesDataTicks = nowTicks;
120+
LastMarketDataTicks = nowTicks;
115121
}
116122

117123
protected async Task ReceiveTrades(CancellationToken cancellationToken)
@@ -122,6 +128,7 @@ protected async Task ReceiveTrades(CancellationToken cancellationToken)
122128
});
123129
await foreach (var data in tradesStream.ResponseStream.ReadAllAsync(cancellationToken))
124130
{
131+
Interlocked.Exchange(ref LastTradesDataTicks, DateTime.UtcNow.Ticks);
125132
Logger.LogInformation($"Trade: {data}");
126133
if (data.PayloadCase == TradesStreamResponse.PayloadOneofCase.OrderTrades)
127134
{
@@ -349,14 +356,40 @@ protected async Task SendOrdersLoop(CancellationToken cancellationToken)
349356
try
350357
{
351358
await Refresh(forceReset: true);
352-
await SendOrders(cancellationToken);
359+
360+
using var timeoutCancellationTokenSource = new CancellationTokenSource();
361+
using var combinedCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, timeoutCancellationTokenSource.Token);
362+
363+
var sendOrdersTask = SendOrders(combinedCancellationTokenSource.Token);
364+
var timeoutTask = CheckMarketDataTimeout(timeoutCancellationTokenSource, cancellationToken);
365+
366+
await Task.WhenAny(sendOrdersTask, timeoutTask);
367+
368+
if (timeoutTask.IsCompleted && !timeoutTask.IsCanceled)
369+
{
370+
Logger.LogWarning("Market data subscription timeout detected, restarting subscription.");
371+
timeoutCancellationTokenSource.Cancel();
372+
}
373+
374+
try
375+
{
376+
await sendOrdersTask;
377+
}
378+
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
379+
{
380+
throw;
381+
}
382+
catch (OperationCanceledException)
383+
{
384+
Logger.LogInformation("Market data subscription cancelled due to timeout, will restart.");
385+
}
353386
}
354387
catch (Exception ex)
355388
{
356389
if (!cancellationToken.IsCancellationRequested)
357390
{
358391
Logger.LogError(ex, "SendOrders exception.");
359-
await Task.Delay(RecoveryInterval);
392+
await Task.Delay(RecoveryInterval, cancellationToken);
360393
}
361394
}
362395
}
@@ -369,19 +402,79 @@ protected async Task ReceiveTradesLoop(CancellationToken cancellationToken)
369402
try
370403
{
371404
await Refresh(forceReset: true);
372-
await ReceiveTrades(cancellationToken);
405+
406+
using var timeoutCancellationTokenSource = new CancellationTokenSource();
407+
using var combinedCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, timeoutCancellationTokenSource.Token);
408+
409+
var receiveTradesTask = ReceiveTrades(combinedCancellationTokenSource.Token);
410+
var timeoutTask = CheckTradesTimeout(timeoutCancellationTokenSource, cancellationToken);
411+
412+
await Task.WhenAny(receiveTradesTask, timeoutTask);
413+
414+
if (timeoutTask.IsCompleted && !timeoutTask.IsCanceled)
415+
{
416+
Logger.LogWarning("Trades subscription timeout detected, restarting subscription.");
417+
timeoutCancellationTokenSource.Cancel();
418+
}
419+
420+
try
421+
{
422+
await receiveTradesTask;
423+
}
424+
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
425+
{
426+
throw;
427+
}
428+
catch (OperationCanceledException)
429+
{
430+
Logger.LogInformation("Trades subscription cancelled due to timeout, will restart.");
431+
}
373432
}
374433
catch (Exception ex)
375434
{
376435
if (!cancellationToken.IsCancellationRequested)
377436
{
378437
Logger.LogError(ex, "ReceiveTrades exception.");
379-
await Task.Delay(RecoveryInterval);
438+
await Task.Delay(RecoveryInterval, cancellationToken);
380439
}
381440
}
382441
}
383442
}
384443

444+
protected async Task CheckTradesTimeout(CancellationTokenSource timeoutCancellationTokenSource, CancellationToken cancellationToken)
445+
{
446+
while (!cancellationToken.IsCancellationRequested && !timeoutCancellationTokenSource.Token.IsCancellationRequested)
447+
{
448+
var nowTicks = DateTime.UtcNow.Ticks;
449+
var lastDataTicks = Interlocked.Read(ref LastTradesDataTicks);
450+
451+
if (nowTicks - lastDataTicks > SubscriptionTimeoutInterval.Ticks)
452+
{
453+
Logger.LogWarning($"No trades data received for {SubscriptionTimeoutInterval.TotalMinutes} minutes, triggering restart.");
454+
return;
455+
}
456+
457+
await Task.Delay(TimeSpan.FromSeconds(30), cancellationToken);
458+
}
459+
}
460+
461+
protected async Task CheckMarketDataTimeout(CancellationTokenSource timeoutCancellationTokenSource, CancellationToken cancellationToken)
462+
{
463+
while (!cancellationToken.IsCancellationRequested && !timeoutCancellationTokenSource.Token.IsCancellationRequested)
464+
{
465+
var nowTicks = DateTime.UtcNow.Ticks;
466+
var lastDataTicks = Interlocked.Read(ref LastMarketDataTicks);
467+
468+
if (nowTicks - lastDataTicks > SubscriptionTimeoutInterval.Ticks)
469+
{
470+
Logger.LogWarning($"No market data received for {SubscriptionTimeoutInterval.TotalMinutes} minutes, triggering restart.");
471+
return;
472+
}
473+
474+
await Task.Delay(TimeSpan.FromSeconds(30), cancellationToken);
475+
}
476+
}
477+
385478
protected async Task SendOrders(CancellationToken cancellationToken)
386479
{
387480
var marketDataStream = InvestApi.MarketDataStream.MarketDataStream();
@@ -402,6 +495,7 @@ await marketDataStream.RequestStream.WriteAsync(new MarketDataRequest
402495
}, cancellationToken);
403496
await foreach (var data in marketDataStream.ResponseStream.ReadAllAsync(cancellationToken))
404497
{
498+
Interlocked.Exchange(ref LastMarketDataTicks, DateTime.UtcNow.Ticks);
405499
// Logger.LogInformation($"data.PayloadCase: {data.PayloadCase}");
406500
if (data.PayloadCase == MarketDataResponse.PayloadOneofCase.SubscribeOrderBookResponse)
407501
{

0 commit comments

Comments
 (0)