Skip to content

Commit 6357b50

Browse files
Stop pumps on shutdown (#1468)
* Stop pumps on shutdown * Allow stopping multiple times * Stop the pumps on shutdown * Cleanup --------- Co-authored-by: Daniel Marbach <[email protected]>
1 parent 91e5267 commit 6357b50

File tree

4 files changed

+53
-42
lines changed

4 files changed

+53
-42
lines changed

src/NServiceBus.Transport.PostgreSql/PostgreSqlTransportInfrastructure.cs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,9 +51,15 @@ public PostgreSqlTransportInfrastructure(PostgreSqlTransport transport, HostSett
5151
exceptionClassifier = new PostgreSqlExceptionClassifier();
5252
}
5353

54-
public override Task Shutdown(CancellationToken cancellationToken = default)
54+
public override async Task Shutdown(CancellationToken cancellationToken = default)
5555
{
56-
return dueDelayedMessageProcessor?.Stop(cancellationToken) ?? Task.FromResult(0);
56+
await Task.WhenAll(Receivers.Values.Select(pump => pump.StopReceive(cancellationToken)))
57+
.ConfigureAwait(false);
58+
59+
if (dueDelayedMessageProcessor != null)
60+
{
61+
await dueDelayedMessageProcessor.Stop(cancellationToken).ConfigureAwait(false);
62+
}
5763
}
5864

5965
public override string ToTransportAddress(Transport.QueueAddress address)

src/NServiceBus.Transport.PostgreSql/Receiving/QueueCreator.cs

Lines changed: 26 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -23,41 +23,36 @@ public QueueCreator(ISqlConstants sqlConstants, DbConnectionFactory connectionFa
2323

2424
public async Task CreateQueueIfNecessary(string[] queueAddresses, CanonicalQueueAddress delayedQueueAddress, CancellationToken cancellationToken = default)
2525
{
26-
using (var connection = await connectionFactory.OpenNewConnection(cancellationToken).ConfigureAwait(false))
26+
using var connection = await connectionFactory.OpenNewConnection(cancellationToken).ConfigureAwait(false);
27+
foreach (var address in queueAddresses)
2728
{
28-
foreach (var address in queueAddresses)
29-
{
30-
await CreateQueue(sqlConstants.CreateQueueText, addressTranslator(address), connection, createMessageBodyColumn, cancellationToken).ConfigureAwait(false);
31-
}
29+
await CreateQueue(sqlConstants.CreateQueueText, addressTranslator(address), connection, createMessageBodyColumn, cancellationToken).ConfigureAwait(false);
30+
}
3231

33-
if (delayedQueueAddress != null)
34-
{
35-
await CreateQueue(sqlConstants.CreateDelayedMessageStoreText, delayedQueueAddress, connection, createMessageBodyColumn, cancellationToken).ConfigureAwait(false);
36-
}
32+
if (delayedQueueAddress != null)
33+
{
34+
await CreateQueue(sqlConstants.CreateDelayedMessageStoreText, delayedQueueAddress, connection, createMessageBodyColumn, cancellationToken).ConfigureAwait(false);
3735
}
3836
}
3937

4038
async Task CreateQueue(string creationScript, CanonicalQueueAddress canonicalQueueAddress, DbConnection connection, bool createMessageBodyColumn, CancellationToken cancellationToken)
4139
{
4240
try
4341
{
44-
using (var transaction = connection.BeginTransaction())
45-
{
46-
var tableName = canonicalQueueAddress.QualifiedTableName;
42+
using var transaction = await connection.BeginTransactionAsync(cancellationToken).ConfigureAwait(false);
43+
var tableName = canonicalQueueAddress.QualifiedTableName;
4744

48-
var sql = string.Format(creationScript, tableName, canonicalQueueAddress.Table);
49-
using (var command = connection.CreateCommand())
50-
{
51-
command.Transaction = transaction;
52-
command.CommandText = sql;
53-
command.CommandType = CommandType.Text;
54-
55-
await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
56-
57-
}
45+
var sql = string.Format(creationScript, tableName, canonicalQueueAddress.Table);
46+
using (var command = connection.CreateCommand())
47+
{
48+
command.Transaction = transaction;
49+
command.CommandText = sql;
50+
command.CommandType = CommandType.Text;
5851

59-
transaction.Commit();
52+
await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
6053
}
54+
55+
await transaction.CommitAsync(cancellationToken).ConfigureAwait(false);
6156
}
6257
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
6358
{
@@ -75,19 +70,17 @@ async Task CreateQueue(string creationScript, CanonicalQueueAddress canonicalQue
7570
var advisoryLockId = CalculateLockId(canonicalQueueAddress.QualifiedTableName);
7671
var bodyStringSql = string.Format(sqlConstants.AddMessageBodyStringColumn, canonicalQueueAddress.Schema, canonicalQueueAddress.Table, advisoryLockId);
7772

78-
using (var transaction = connection.BeginTransaction())
73+
using var transaction = await connection.BeginTransactionAsync(cancellationToken).ConfigureAwait(false);
74+
using (var command = connection.CreateCommand())
7975
{
80-
using (var command = connection.CreateCommand())
81-
{
82-
command.Transaction = transaction;
83-
command.CommandText = bodyStringSql;
84-
command.CommandType = CommandType.Text;
85-
86-
await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
87-
}
76+
command.Transaction = transaction;
77+
command.CommandText = bodyStringSql;
78+
command.CommandType = CommandType.Text;
8879

89-
transaction.Commit();
80+
await command.ExecuteNonQueryAsync(cancellationToken).ConfigureAwait(false);
9081
}
82+
83+
await transaction.CommitAsync(cancellationToken).ConfigureAwait(false);
9184
}
9285
}
9386

src/NServiceBus.Transport.Sql.Shared/Receiving/MessageReceiver.cs

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -120,9 +120,15 @@ public async Task ChangeConcurrency(PushRuntimeSettings newLimitations,
120120

121121
public async Task StopReceive(CancellationToken cancellationToken = default)
122122
{
123-
messageReceivingCancellationTokenSource?.Cancel();
123+
if (messageReceivingCancellationTokenSource == null)
124+
{
125+
// already stopped or never started
126+
return;
127+
}
128+
129+
await messageReceivingCancellationTokenSource.CancelAsync().ConfigureAwait(false);
124130

125-
using (cancellationToken.Register(() => messageProcessingCancellationTokenSource?.Cancel()))
131+
await using (cancellationToken.Register(() => messageProcessingCancellationTokenSource?.Cancel()))
126132
{
127133
await messageReceivingTask.ConfigureAwait(false);
128134

@@ -140,8 +146,9 @@ public async Task StopReceive(CancellationToken cancellationToken = default)
140146
messageReceivingCircuitBreaker.Dispose();
141147
messageProcessingCircuitBreaker.Dispose();
142148
concurrencyLimiter.Dispose();
143-
messageReceivingCancellationTokenSource?.Dispose();
144-
messageProcessingCancellationTokenSource?.Dispose();
149+
messageReceivingCancellationTokenSource.Dispose();
150+
messageReceivingCancellationTokenSource = null;
151+
messageProcessingCancellationTokenSource.Dispose();
145152
}
146153

147154
async Task ReceiveMessagesAndSwallowExceptions(CancellationToken messageReceivingCancellationToken)

src/NServiceBus.Transport.SqlServer/SqlServerTransportInfrastructure.cs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -342,11 +342,16 @@ public void ConfigureSendInfrastructure()
342342
connectionFactory);
343343
}
344344

345-
public override Task Shutdown(CancellationToken cancellationToken = default)
345+
public override async Task Shutdown(CancellationToken cancellationToken = default)
346346
{
347-
return dueDelayedMessageProcessor?.Stop(cancellationToken) ?? Task.FromResult(0);
348-
}
347+
await Task.WhenAll(Receivers.Values.Select(pump => pump.StopReceive(cancellationToken)))
348+
.ConfigureAwait(false);
349349

350+
if (dueDelayedMessageProcessor != null)
351+
{
352+
await dueDelayedMessageProcessor.Stop(cancellationToken).ConfigureAwait(false);
353+
}
354+
}
350355

351356
readonly SqlServerTransport transport;
352357
readonly HostSettings hostSettings;

0 commit comments

Comments
 (0)