
Commit fed5305

hemanandr and claude committed
fix: resolve 10 critical monitoring service issues
- Fix timer management race conditions with execution tracking
- Add thread-safe locking to MonitorState operations
- Implement graceful timer disposal with timeout protection
- Add atomic database transactions for state consistency
- Fix state initialization with automatic inconsistency correction
- Replace hardcoded monitoring gap detection with dynamic per-endpoint analysis
- Fix asymmetric state transition logic for mutual exclusivity
- Implement robust HTTP URL construction with protocol detection
- Add complete cancellation token support to all probe operations
- Fix shutdown cancellation logic with proper timeout handling

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <[email protected]>
1 parent 3a6f200 commit fed5305

4 files changed: +655 additions, −145 deletions


ThingConnect.Pulse.Server/Services/Monitoring/MonitorState.cs

Lines changed: 76 additions & 20 deletions
@@ -5,9 +5,11 @@ namespace ThingConnect.Pulse.Server.Services.Monitoring;
 /// <summary>
 /// Per-endpoint in-memory state for outage detection and flap damping.
 /// Tracks success/fail streaks and manages state transitions.
+/// Thread-safe with internal locking.
 /// </summary>
 public sealed class MonitorState
 {
+    private readonly object _lock = new object();
     /// <summary>
     /// The last publicly reported status (UP/DOWN). Null if never determined.
     /// </summary>
@@ -40,12 +42,19 @@ public sealed class MonitorState
     /// </summary>
     public bool ShouldTransitionToDown(int threshold = 2)
     {
-        // If never initialized, transition immediately on first failure
-        if (LastPublicStatus == null && FailStreak >= 1)
-            return true;
+        lock (_lock)
+        {
+            // Must have enough failures to trigger transition
+            if (FailStreak < Math.Max(1, threshold))
+                return false;
+
+            // Handle null status (never initialized) - transition on first failure
+            if (LastPublicStatus == null)
+                return FailStreak >= 1;
 
-        // Otherwise require threshold for state change from UP to DOWN
-        return LastPublicStatus != UpDown.down && FailStreak >= threshold;
+            // Only transition if currently UP (not already DOWN)
+            return LastPublicStatus == UpDown.up;
+        }
     }
 
     /// <summary>
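
A quick sanity check of the new ShouldTransitionToDown behaviour with the default threshold of 2 (a minimal sketch for reviewers, not part of the commit; it assumes LastPublicStatus is settable from the caller, which this diff does not show):

    // Endpoint is currently UP; default threshold = 2 consecutive failures.
    var state = new MonitorState { LastPublicStatus = UpDown.up };

    state.RecordFailure();
    bool down1 = state.ShouldTransitionToDown();   // false: only 1 failure, below threshold

    state.RecordFailure();
    bool down2 = state.ShouldTransitionToDown();   // true: 2 consecutive failures while UP

    // Exact epoch unit (seconds vs. milliseconds) is not visible in this diff; seconds used for illustration.
    state.TransitionToDown(DateTimeOffset.UtcNow.ToUnixTimeSeconds(), outageId: 1);
    bool down3 = state.ShouldTransitionToDown();   // false: already DOWN, no repeated transition

The old code returned true after the very first failure whenever LastPublicStatus was null, regardless of the threshold; the rewritten guard requires the threshold to be met first and only then applies the null/UP checks.
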
@@ -55,49 +64,96 @@ public bool ShouldTransitionToDown(int threshold = 2)
     /// </summary>
     public bool ShouldTransitionToUp(int threshold = 2)
     {
-        // If never initialized, transition immediately on first success
-        if (LastPublicStatus == null && SuccessStreak >= 1)
-            return true;
+        lock (_lock)
+        {
+            // Must have enough successes to trigger transition
+            if (SuccessStreak < Math.Max(1, threshold))
+                return false;
+
+            // Handle null status (never initialized) - transition on first success
+            if (LastPublicStatus == null)
+                return SuccessStreak >= 1;
 
-        // Otherwise require threshold for state change from DOWN to UP
-        return LastPublicStatus != UpDown.up && SuccessStreak >= threshold;
+            // Only transition if currently DOWN (not already UP)
+            return LastPublicStatus == UpDown.down;
+        }
     }
 
     /// <summary>
     /// Records a successful check result and updates streaks.
     /// </summary>
     public void RecordSuccess()
     {
-        SuccessStreak++;
-        FailStreak = 0;
+        lock (_lock)
+        {
+            SuccessStreak++;
+            FailStreak = 0;
+        }
     }
 
     /// <summary>
     /// Records a failed check result and updates streaks.
     /// </summary>
     public void RecordFailure()
     {
-        FailStreak++;
-        SuccessStreak = 0;
+        lock (_lock)
+        {
+            FailStreak++;
+            SuccessStreak = 0;
+        }
     }
 
     /// <summary>
     /// Transitions the state to DOWN and records the change timestamp.
     /// </summary>
     public void TransitionToDown(long timestamp, long outageId)
     {
-        LastPublicStatus = UpDown.down;
-        LastChangeTs = timestamp;
-        OpenOutageId = outageId;
+        lock (_lock)
+        {
+            LastPublicStatus = UpDown.down;
+            LastChangeTs = timestamp;
+            OpenOutageId = outageId;
+        }
     }
 
     /// <summary>
     /// Transitions the state to UP and records the change timestamp.
     /// </summary>
     public void TransitionToUp(long timestamp)
     {
-        LastPublicStatus = UpDown.up;
-        LastChangeTs = timestamp;
-        OpenOutageId = null;
+        lock (_lock)
+        {
+            LastPublicStatus = UpDown.up;
+            LastChangeTs = timestamp;
+            OpenOutageId = null;
+        }
+    }
+
+    /// <summary>
+    /// Restores streak counters to previous values (used for rollback on transaction failures).
+    /// </summary>
+    public void RestoreStreakCounters(int successStreak, int failStreak)
+    {
+        lock (_lock)
+        {
+            SuccessStreak = successStreak;
+            FailStreak = failStreak;
+        }
+    }
+
+    /// <summary>
+    /// Validates that transition logic maintains mutual exclusivity.
+    /// This is used for debugging and ensuring state machine correctness.
+    /// </summary>
+    public bool ValidateTransitionMutualExclusivity(int threshold = 2)
+    {
+        lock (_lock)
+        {
+            bool shouldTransitionDown = ShouldTransitionToDown(threshold);
+            bool shouldTransitionUp = ShouldTransitionToUp(threshold);
+
+            // Both transitions should never be true simultaneously
+            return !(shouldTransitionDown && shouldTransitionUp);
+        }
     }
 }
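
Two notes on the new methods: ValidateTransitionMutualExclusivity re-enters _lock through ShouldTransitionToDown/Up, which is safe because C#'s lock statement (Monitor) is reentrant on the owning thread. RestoreStreakCounters exists for the rollback path when the database transaction around a transition fails. A rough caller-side sketch of how the pieces fit together (hypothetical; the actual outage detection service is in one of the changed files not shown here):

    // Illustrative only - method and delegate names are assumptions, not code from this commit.
    private static async Task ApplyProbeResultAsync(MonitorState state, Guid endpointId, bool success,
        Func<Guid, long, Task<long>> openOutageAsync, long nowTs)
    {
        if (success)
        {
            state.RecordSuccess();
            if (state.ShouldTransitionToUp())
            {
                state.TransitionToUp(nowTs);                              // clears OpenOutageId
            }
        }
        else
        {
            state.RecordFailure();
            if (state.ShouldTransitionToDown())
            {
                long outageId = await openOutageAsync(endpointId, nowTs); // persist the outage row first
                state.TransitionToDown(nowTs, outageId);                  // then flip the public status
            }
        }
    }
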

ThingConnect.Pulse.Server/Services/Monitoring/MonitoringBackgroundService.cs

Lines changed: 122 additions & 10 deletions
@@ -13,6 +13,7 @@ public sealed class MonitoringBackgroundService : BackgroundService
     private readonly ILogger<MonitoringBackgroundService> _logger;
     private readonly SemaphoreSlim _concurrencySemaphore;
    private readonly ConcurrentDictionary<Guid, Timer> _endpointTimers = new();
+    private readonly ConcurrentDictionary<Guid, bool> _probeExecuting = new();
     private readonly int _maxConcurrentProbes;
 
     public MonitoringBackgroundService(IServiceProvider serviceProvider,
@@ -55,18 +56,67 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
             }
         }
 
-        // Handle graceful shutdown
-        using (IServiceScope scope = _serviceProvider.CreateScope())
+        // Handle graceful shutdown with cancellation support
+        try
         {
-            IOutageDetectionService outageService = scope.ServiceProvider.GetRequiredService<IOutageDetectionService>();
-            await outageService.HandleGracefulShutdownAsync("Service stopping", CancellationToken.None);
+            using (IServiceScope scope = _serviceProvider.CreateScope())
+            {
+                IOutageDetectionService outageService = scope.ServiceProvider.GetRequiredService<IOutageDetectionService>();
+
+                // Create a timeout for graceful shutdown (30 seconds max)
+                // This allows force shutdown while giving reasonable time for data safety
+                using var shutdownCts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
+                using var combinedCts = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken, shutdownCts.Token);
+
+                await outageService.HandleGracefulShutdownAsync("Service stopping", combinedCts.Token);
+
+                _logger.LogInformation("Graceful shutdown completed successfully");
+            }
+        }
+        catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
+        {
+            _logger.LogWarning("Graceful shutdown was cancelled - service forced to stop");
+        }
+        catch (OperationCanceledException)
+        {
+            _logger.LogWarning("Graceful shutdown timed out (30s) - proceeding with forced shutdown");
+        }
+        catch (Exception ex)
+        {
+            _logger.LogError(ex, "Error during graceful shutdown - proceeding with forced shutdown");
         }
 
-        // Clean up timers
-        foreach (Timer timer in _endpointTimers.Values)
+        // Clean up timers gracefully with cancellation support
+        var timerCleanupTasks = new List<Task>();
+        foreach (var kvp in _endpointTimers.ToList()) // ToList to avoid modification during enumeration
         {
-            timer.Dispose();
+            timerCleanupTasks.Add(StopTimerGracefullyAsync(kvp.Value, kvp.Key));
+        }
+
+        try
+        {
+            // Create timeout for timer cleanup (30 seconds max) while respecting external cancellation
+            using var timerCleanupCts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
+            using var combinedCleanupCts = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken, timerCleanupCts.Token);
+
+            // Wait for all timers to shutdown gracefully (with timeout and cancellation)
+            await Task.WhenAll(timerCleanupTasks).WaitAsync(combinedCleanupCts.Token);
+
+            _logger.LogInformation("All timers stopped gracefully");
         }
+        catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
+        {
+            _logger.LogWarning("Timer cleanup was cancelled - service forced to stop");
+        }
+        catch (OperationCanceledException)
+        {
+            _logger.LogWarning("Timer cleanup timed out (30s) - some timers may not have stopped cleanly");
+        }
+        catch (Exception ex)
+        {
+            _logger.LogError(ex, "Error during timer cleanup");
+        }
+
         _endpointTimers.Clear();
 
         _logger.LogInformation("Monitoring background service stopped");
@@ -91,7 +141,8 @@ private async Task RefreshEndpointsAsync(CancellationToken cancellationToken)
         {
             if (_endpointTimers.TryRemove(endpointId, out Timer? timer))
             {
-                timer.Dispose();
+                await StopTimerGracefullyAsync(timer, endpointId);
+                _probeExecuting.TryRemove(endpointId, out _); // Clean up execution tracking
                 _logger.LogInformation("Stopped monitoring endpoint: {EndpointId}", endpointId);
             }
         }
@@ -103,7 +154,16 @@ private async Task RefreshEndpointsAsync(CancellationToken cancellationToken)
 
             if (_endpointTimers.TryGetValue(endpoint.Id, out Timer? existingTimer))
             {
-                // Update existing timer if interval changed
+                // Stop timer to prevent race condition, then restart with new interval
+                existingTimer.Change(Timeout.Infinite, Timeout.Infinite);
+
+                // Wait briefly if probe is currently executing to avoid immediate restart
+                if (_probeExecuting.TryGetValue(endpoint.Id, out bool isExecuting) && isExecuting)
+                {
+                    await Task.Delay(100); // Brief delay to let current execution complete
+                }
+
+                // Restart with new interval
                 existingTimer.Change(TimeSpan.Zero, TimeSpan.FromMilliseconds(intervalMs));
             }
             else
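
One caveat worth noting: Timer.Change(Timeout.Infinite, Timeout.Infinite) only prevents future callbacks; a callback that is already running keeps running, which is exactly why the _probeExecuting flag, the short delay above, and the polling wait in StopTimerGracefullyAsync below are needed. Where the target framework allows it, the runtime offers a comparable guarantee directly (illustrative, not used by this commit):

    // Timer.DisposeAsync (IAsyncDisposable, .NET Core 3.0+) completes only after
    // outstanding callbacks have finished, so no manual in-flight tracking is needed.
    await timer.DisposeAsync();
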
@@ -134,6 +194,9 @@ private async Task ProbeEndpointAsync(Guid endpointId)
             return;
         }
 
+        // Mark probe as executing to prevent timer race conditions
+        _probeExecuting.TryAdd(endpointId, true);
+
         try
         {
             using IServiceScope scope = _serviceProvider.CreateScope();
@@ -171,17 +234,66 @@ private async Task ProbeEndpointAsync(Guid endpointId)
         }
         finally
         {
+            // Clear execution flag
+            _probeExecuting.TryRemove(endpointId, out _);
             _concurrencySemaphore.Release();
         }
     }
 
+    /// <summary>
+    /// Gracefully stops a timer by disabling it, waiting for current execution to complete, then disposing.
+    /// </summary>
+    private async Task StopTimerGracefullyAsync(Timer timer, Guid endpointId)
+    {
+        try
+        {
+            // Stop the timer from firing again
+            timer.Change(Timeout.Infinite, Timeout.Infinite);
+
+            // Wait for current probe execution to complete (with timeout)
+            int maxWaitMs = 30000; // 30 seconds max wait
+            int waitedMs = 0;
+            const int checkIntervalMs = 100;
+
+            while (waitedMs < maxWaitMs && _probeExecuting.TryGetValue(endpointId, out bool isExecuting) && isExecuting)
+            {
+                await Task.Delay(checkIntervalMs);
+                waitedMs += checkIntervalMs;
+            }
+
+            if (waitedMs >= maxWaitMs)
+            {
+                _logger.LogWarning("Timeout waiting for probe execution to complete for endpoint: {EndpointId}", endpointId);
+            }
+
+            // Now safe to dispose
+            timer.Dispose();
+        }
+        catch (Exception ex)
+        {
+            _logger.LogError(ex, "Error during graceful timer shutdown for endpoint: {EndpointId}", endpointId);
+            timer.Dispose(); // Force dispose on error
+        }
+    }
+
     public override void Dispose()
     {
         _concurrencySemaphore?.Dispose();
 
+        // Force dispose all remaining timers (should already be cleaned up in ExecuteAsync)
         foreach (Timer timer in _endpointTimers.Values)
         {
-            timer.Dispose();
+            try
+            {
+                // Stop timer first, then dispose (no graceful wait in synchronous dispose)
+                timer.Change(Timeout.Infinite, Timeout.Infinite);
+                timer.Dispose();
+            }
+            catch (Exception ex)
+            {
+                // Log but don't throw during disposal
+                _logger?.LogWarning(ex, "Error disposing timer during service disposal");
+            }
         }
 
         base.Dispose();
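
The commit message also lists probe-side fixes - protocol-aware HTTP URL construction and cancellation token support in probe operations - which live in the two changed files not shown in this excerpt. Purely as an illustration of those two ideas, with hypothetical names, not the actual code from this commit:

    // Hypothetical sketch: build the probe URL with protocol detection and honour the CancellationToken.
    static async Task<bool> ProbeHttpAsync(HttpClient client, string host, int port, CancellationToken ct)
    {
        // Keep an explicit scheme if one was configured; otherwise infer it from the port.
        string url = host.StartsWith("http://", StringComparison.OrdinalIgnoreCase) ||
                     host.StartsWith("https://", StringComparison.OrdinalIgnoreCase)
            ? host
            : (port == 443 ? $"https://{host}:{port}" : $"http://{host}:{port}");

        try
        {
            using HttpResponseMessage response = await client.GetAsync(url, ct); // token flows into the request
            return response.IsSuccessStatusCode;
        }
        catch (OperationCanceledException)
        {
            throw;        // let shutdown cancellation propagate rather than count as a failed probe
        }
        catch (HttpRequestException)
        {
            return false; // network-level failure counts as a probe failure
        }
    }
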
