Skip to content

Commit 5163dc8

Browse files
jeremydmiller and claude committed
Latch receivers immediately on ApplicationStopping to prevent processing during shutdown
Hook into IHostApplicationLifetime.ApplicationStopping so all message receivers are latched the moment SIGTERM fires, rather than waiting for IHostedService.StopAsync which may be delayed by other hosted services stopping first. This prevents messages already in internal queues from being picked up after the shutdown signal. Also reorders StopAsync to drain endpoints before releasing ownership, adds bounded WaitForCompletionAsync calls in DurableReceiver and BufferedReceiver with configurable DrainTimeout (default 30s), and includes GracefulShutdown/RollingRestart chaos tests. Closes #2282 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent e93531b commit 5163dc8

File tree

9 files changed

+212
-15
lines changed

9 files changed

+212
-15
lines changed

src/Transports/RabbitMQ/ChaosTesting/ChaosSpecifications.cs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,4 +143,30 @@ public Task RabbitMqDurableListener_Marten_ReceiverGoesUpAndDown() =>
143143
// Runs the ReceiverGoesUpAndDown script with MartenStorageStrategy against the
// RabbitMqFiveDurableListeners configuration.
[Fact]
public Task RabbitMqFiveDurableListeners_Marten_ReceiverGoesUpAndDown()
{
    return execute<MartenStorageStrategy, ReceiverGoesUpAndDown>(RabbitMqFiveDurableListeners);
}
146+
147+
// Graceful shutdown tests
148+
// Runs the GracefulShutdown script with MartenStorageStrategy against the
// RabbitMqDurableListener configuration.
[Fact]
public Task RabbitMqDurableListener_Marten_GracefulShutdown()
{
    return execute<MartenStorageStrategy, GracefulShutdown>(RabbitMqDurableListener);
}
151+
152+
// Runs the GracefulShutdown script with MartenStorageStrategy against the
// RabbitMqFiveDurableListeners configuration.
[Fact]
public Task RabbitMqFiveDurableListeners_Marten_GracefulShutdown()
{
    return execute<MartenStorageStrategy, GracefulShutdown>(RabbitMqFiveDurableListeners);
}
155+
156+
// Runs the GracefulShutdown script with MartenStorageStrategy against the
// RabbitMqBufferedListener configuration.
[Fact]
public Task RabbitMqBufferedListener_Marten_GracefulShutdown()
{
    return execute<MartenStorageStrategy, GracefulShutdown>(RabbitMqBufferedListener);
}
159+
160+
// Runs the GracefulShutdown script with MartenStorageStrategy against the
// RabbitMqOneInlineListener configuration.
[Fact]
public Task RabbitMqOneInlineListener_Marten_GracefulShutdown()
{
    return execute<MartenStorageStrategy, GracefulShutdown>(RabbitMqOneInlineListener);
}
163+
164+
// Rolling restart tests
165+
// Runs the RollingRestart script with MartenStorageStrategy against the
// RabbitMqDurableListener configuration.
[Fact]
public Task RabbitMqDurableListener_Marten_RollingRestart()
{
    return execute<MartenStorageStrategy, RollingRestart>(RabbitMqDurableListener);
}
168+
169+
// Runs the RollingRestart script with MartenStorageStrategy against the
// RabbitMqFiveDurableListeners configuration.
[Fact]
public Task RabbitMqFiveDurableListeners_Marten_RollingRestart()
{
    return execute<MartenStorageStrategy, RollingRestart>(RabbitMqFiveDurableListeners);
}
146172
}
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
using JasperFx.Core;
2+
3+
namespace ChaosTesting.Scripts;
4+
5+
/// <summary>
/// Chaos script that stops a receiver shortly after a batch of messages has been
/// sent, then starts a replacement receiver. The intent is that in-flight messages
/// finish during the graceful stop and nothing is left orphaned.
/// NOTE(review): this script only drives the scenario; checking that every message
/// completed is presumably done by the surrounding chaos harness — confirm.
/// </summary>
public class GracefulShutdown : ChaosScript
{
    // Number of messages published before the first receiver is stopped.
    private const int BatchSize = 200;

    public GracefulShutdown() => TimeOut = 2.Minutes();

    /// <summary>
    /// Starts receiver and sender "one", publishes the batch, pauses briefly,
    /// stops receiver "one", and finally starts receiver "two".
    /// </summary>
    /// <param name="driver">Driver used to start/stop nodes and send messages.</param>
    public override async Task Drive(ChaosDriver driver)
    {
        await driver.StartReceiver("one");
        await driver.StartSender("one");

        await driver.SendMessages("one", BatchSize);

        // Short pause so processing has begun, but is unlikely to have finished,
        // by the time the receiver is stopped.
        var headStart = 500.Milliseconds();
        await Task.Delay(headStart);

        // Stop the original receiver while work is still in flight...
        await driver.StopReceiver("one");

        // ...and bring up a second receiver to pick up whatever remains.
        await driver.StartReceiver("two");
    }
}
35+
36+
/// <summary>
/// Chaos script that cycles through three receivers in quick succession, in the
/// manner of a rolling deployment, after a single batch of messages has been sent.
/// The intent is that rapid stop/start cycles do not lose messages.
/// NOTE(review): message-completion verification is presumably performed by the
/// surrounding chaos harness rather than by this script — confirm.
/// </summary>
public class RollingRestart : ChaosScript
{
    // Number of messages published before the restarts begin.
    private const int BatchSize = 300;

    public RollingRestart() => TimeOut = 3.Minutes();

    /// <summary>
    /// Starts receiver and sender "one", publishes the batch, then replaces the
    /// receiver twice ("one" → "two" → "three") with a one second pause before
    /// each stop.
    /// </summary>
    /// <param name="driver">Driver used to start/stop nodes and send messages.</param>
    public override async Task Drive(ChaosDriver driver)
    {
        var pauseBeforeStop = 1.Seconds();

        await driver.StartReceiver("one");
        await driver.StartSender("one");

        await driver.SendMessages("one", BatchSize);

        // First hand-over: retire "one", bring up "two"
        await Task.Delay(pauseBeforeStop);
        await driver.StopReceiver("one");
        await driver.StartReceiver("two");

        // Second hand-over: retire "two", bring up "three"
        await Task.Delay(pauseBeforeStop);
        await driver.StopReceiver("two");
        await driver.StartReceiver("three");
    }
}

src/Wolverine/Configuration/EndpointCollection.cs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -382,6 +382,24 @@ private ISendingAgent buildSendingAgent(Uri uri, Action<Endpoint>? configureNewE
382382
return endpoint.StartSending(_runtime, transport.ReplyEndpoint()?.Uri);
383383
}
384384

385+
/// <summary>
/// Latches the receiver of every registered listener and of every durable local
/// queue, without stopping or draining anything. Intended to be invoked as early as
/// possible in shutdown (from IHostApplicationLifetime.ApplicationStopping) so that
/// messages already sitting in internal queues are not executed after the
/// shutdown signal.
/// </summary>
public void LatchAllReceivers()
{
    foreach (var agent in _listeners.Values)
    {
        agent.LatchReceiver();
    }

    // NOTE(review): only DurableLocalQueue senders are latched here; confirm whether
    // other local queue types need the same treatment.
    foreach (var entry in _localSenders.Enumerate())
    {
        if (entry.Value is DurableLocalQueue durableQueue)
        {
            durableQueue.LatchReceiver();
        }
    }
}
402+
385403
public async Task DrainAsync()
386404
{
387405
// Drain the listeners

src/Wolverine/DurabilitySettings.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -247,6 +247,12 @@ internal set
247247
/// </summary>
248248
public TimeSpan? NodeAssignmentHealthCheckTraceSamplingPeriod { get; set; }
249249

250+
/// <summary>
/// Upper bound on how long a receiver waits for in-flight message handlers to
/// finish while draining during graceful shutdown. When the limit elapses the
/// shutdown sequence continues regardless. Defaults to 30 seconds.
/// The limit is applied by each draining receiver individually.
/// </summary>
public TimeSpan DrainTimeout { get; set; } = TimeSpan.FromSeconds(30);
255+
250256
/// <summary>
251257
/// Get or set the logical Wolverine service name. By default, this is
252258
/// derived from the name of a custom WolverineOptions

src/Wolverine/Runtime/WolverineRuntime.HostService.cs

Lines changed: 38 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
using JasperFx.CodeGeneration;
33
using JasperFx.Core;
44
using JasperFx.Core.Reflection;
5+
using Microsoft.Extensions.Hosting;
56
using Microsoft.Extensions.Logging;
67
using Wolverine.Configuration;
78
using Wolverine.Persistence.Durability;
@@ -102,6 +103,19 @@ public async Task StartAsync(CancellationToken cancellationToken)
102103

103104
await Observer.RuntimeIsFullyStarted();
104105
_hasStarted = true;
106+
107+
// Subscribe to the host shutdown signal so we can immediately latch all receivers
108+
// the moment SIGTERM/ApplicationStopping fires, rather than waiting until our
109+
// IHostedService.StopAsync is called (which may be delayed by other hosted services)
110+
try
111+
{
112+
var lifetime = _container.Services.GetService(typeof(IHostApplicationLifetime)) as IHostApplicationLifetime;
113+
lifetime?.ApplicationStopping.Register(OnApplicationStopping);
114+
}
115+
catch (Exception e)
116+
{
117+
Logger.LogDebug(e, "Could not subscribe to IHostApplicationLifetime.ApplicationStopping");
118+
}
105119
}
106120
catch (Exception? e)
107121
{
@@ -110,6 +124,12 @@ public async Task StartAsync(CancellationToken cancellationToken)
110124
}
111125
}
112126

127+
/// <summary>
/// Callback for the host's ApplicationStopping signal. Logs that the signal was
/// received and then latches every message receiver through the endpoint
/// collection, ahead of the later StopAsync call.
/// </summary>
internal void OnApplicationStopping()
{
    // Log first so the latch is traceable in shutdown diagnostics
    Logger.LogInformation("Application stopping signal received, latching all message receivers");

    _endpoints.LatchAllReceivers();
}
132+
113133
private bool _hasMigratedStorage;
114134

115135
private async Task tryMigrateStorage()
@@ -196,7 +216,21 @@ public async Task StopAsync(CancellationToken cancellationToken)
196216
DisableHealthChecks();
197217

198218
_idleAgentCleanupLoop?.SafeDispose();
199-
219+
220+
if (StopMode == StopMode.Normal)
221+
{
222+
// Step 1: Drain endpoints first — stop listeners from accepting new messages
223+
// and wait for in-flight handlers to complete before releasing ownership.
224+
// Receivers were already latched via IHostApplicationLifetime.ApplicationStopping
225+
// to prevent new messages from being picked up, so this just waits for completion.
226+
await _endpoints.DrainAsync();
227+
228+
if (_accumulator.IsValueCreated)
229+
{
230+
await _accumulator.Value.DrainAsync();
231+
}
232+
}
233+
200234
if (_stores.IsValueCreated && StopMode == StopMode.Normal)
201235
{
202236
try
@@ -210,7 +244,8 @@ public async Task StopAsync(CancellationToken cancellationToken)
210244

211245
try
212246
{
213-
// New to 3.0, try to release any ownership on the way out. Do this *after* the drain
247+
// Release any ownership on the way out. Do this *after* draining endpoints
248+
// so in-flight messages complete before their ownership is released.
214249
await _stores.Value.ReleaseAllOwnershipAsync(DurabilitySettings.AssignedNodeNumber);
215250
}
216251
catch (ObjectDisposedException)
@@ -221,15 +256,8 @@ public async Task StopAsync(CancellationToken cancellationToken)
221256

222257
if (StopMode == StopMode.Normal)
223258
{
224-
// This MUST be called before draining the endpoints
259+
// Step 2: Now teardown agents — safe after endpoints drained and ownership released
225260
await teardownAgentsAsync();
226-
227-
await _endpoints.DrainAsync();
228-
229-
if (_accumulator.IsValueCreated)
230-
{
231-
await _accumulator.Value.DrainAsync();
232-
}
233261
}
234262

235263
DurabilitySettings.Cancel();

src/Wolverine/Runtime/WorkerQueues/BufferedReceiver.cs

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -114,21 +114,38 @@ async ValueTask IChannelCallback.DeferAsync(Envelope envelope)
114114

115115
public int QueueCount => (int)_receivingBlock.Count;
116116

117+
/// <summary>
/// Sets the latched flag right away. Unlike <see cref="DrainAsync"/>, this neither
/// completes the receiving block nor waits for anything to finish.
/// </summary>
public void Latch() => _latched = true;
124+
117125
/// <summary>
/// Latches this receiver, completes the receiving block, and waits for in-flight
/// handler executions to finish before draining the completion, defer and
/// (if present) move-to-errors blocks. The wait is bounded by
/// DurabilitySettings.DrainTimeout so that shutdown cannot hang indefinitely.
/// </summary>
/// <remarks>
/// Failures while waiting are logged and swallowed: draining is best-effort and
/// must not abort the rest of the shutdown sequence.
/// </remarks>
public async ValueTask DrainAsync()
{
    _latched = true;
    _receivingBlock.Complete();

    // Task.WaitAsync both bounds the wait and rethrows a fault from the completion
    // task, so the catch blocks below actually see it. It also releases its timer as
    // soon as the block completes instead of leaving a Task.Delay running for the
    // full timeout.
    var completion = _receivingBlock.WaitForCompletionAsync();
    try
    {
        await completion.WaitAsync(_settings.DrainTimeout);
    }
    catch (TimeoutException) when (!completion.IsCompleted)
    {
        // The block is still busy: proceed with shutdown, but make the timeout visible
        _logger.LogWarning(
            "Timed out after {DrainTimeout} waiting for in-flight message processing to complete at {Uri}",
            _settings.DrainTimeout, Uri);
    }
    catch (Exception e)
    {
        _logger.LogDebug(e, "Error waiting for in-flight message processing to complete at {Uri}", Uri);
    }

    await _completeBlock.DrainAsync();
    await _deferBlock.DrainAsync();

    if (_moveToErrors != null)
    {
        await _moveToErrors.DrainAsync();
    }
}
133150

134151
public void Enqueue(Envelope envelope)

src/Wolverine/Runtime/WorkerQueues/DurableReceiver.cs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -286,8 +286,17 @@ public async ValueTask DrainAsync()
286286
_latched = true;
287287
_receiver.Complete();
288288

289-
// Latching is the best you can do here, otherwise it can hang
290-
//await _receiver.Completion;
289+
// Wait for in-flight handler executions to complete, bounded by a timeout
290+
// to prevent hanging during shutdown
291+
try
292+
{
293+
var completion = _receiver.WaitForCompletionAsync();
294+
await Task.WhenAny(completion, Task.Delay(_settings.DrainTimeout));
295+
}
296+
catch (Exception e)
297+
{
298+
_logger.LogDebug(e, "Error waiting for in-flight message processing to complete at {Uri}", Uri);
299+
}
291300

292301
await _incrementAttempts.DrainAsync();
293302
await _scheduleExecution.DrainAsync();

src/Wolverine/Transports/ListeningAgent.cs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,24 @@ public async Task EnqueueDirectlyAsync(IEnumerable<Envelope> envelopes)
139139

140140
public ListeningStatus Status { get; private set; } = ListeningStatus.Stopped;
141141

142+
/// <summary>
/// Latches the underlying receiver immediately when it is a
/// <see cref="DurableReceiver"/> or a <see cref="BufferedReceiver"/>. The listener
/// itself is neither stopped nor drained, and any other receiver type is left
/// untouched.
/// </summary>
public void LatchReceiver()
{
    switch (_receiver)
    {
        case DurableReceiver durable:
            durable.Latch();
            break;

        // BufferedReceiver also latches itself inside DrainAsync, but shutdown needs
        // the latch applied right now rather than when draining begins.
        case BufferedReceiver buffered:
            buffered.Latch();
            break;
    }
}
159+
142160
public async ValueTask StopAndDrainAsync()
143161
{
144162
if (Status == ListeningStatus.Stopped || Status == ListeningStatus.GloballyLatched)

src/Wolverine/Transports/Local/DurableLocalQueue.cs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,15 @@ public DurableLocalQueue(Endpoint endpoint, WolverineRuntime runtime)
6363

6464
public CircuitBreaker? CircuitBreaker { get; }
6565

66+
/// <summary>
/// Marks this queue as latched and, when a receiver has been created, latches that
/// receiver as well.
/// </summary>
public void LatchReceiver()
{
    Latched = true;

    if (_receiver is { } receiver)
    {
        receiver.Latch();
    }
}
74+
6675
int IListenerCircuit.QueueCount => _receiver?.QueueCount ?? 0;
6776

6877
async Task IListenerCircuit.EnqueueDirectlyAsync(IEnumerable<Envelope> envelopes)

0 commit comments

Comments
 (0)