Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
using Logging;
using NServiceBus.Transport.Sql.Shared;

class PostgreSqlTransportInfrastructure : TransportInfrastructure
[Janitor.SkipWeaving]

Check failure on line 13 in src/NServiceBus.Transport.PostgreSql/PostgreSqlTransportInfrastructure.cs

View workflow job for this annotation

GitHub Actions / Linux

The type or namespace name 'Janitor' could not be found (are you missing a using directive or an assembly reference?)

Check failure on line 13 in src/NServiceBus.Transport.PostgreSql/PostgreSqlTransportInfrastructure.cs

View workflow job for this annotation

GitHub Actions / Linux

The type or namespace name 'Janitor' could not be found (are you missing a using directive or an assembly reference?)

Check failure on line 13 in src/NServiceBus.Transport.PostgreSql/PostgreSqlTransportInfrastructure.cs

View workflow job for this annotation

GitHub Actions / Windows

The type or namespace name 'Janitor' could not be found (are you missing a using directive or an assembly reference?)

Check failure on line 13 in src/NServiceBus.Transport.PostgreSql/PostgreSqlTransportInfrastructure.cs

View workflow job for this annotation

GitHub Actions / Windows

The type or namespace name 'Janitor' could not be found (are you missing a using directive or an assembly reference?)
class PostgreSqlTransportInfrastructure : TransportInfrastructure, IDisposable
{
//The limit is 55=63-max(8,8). 63 is the built-in PostgreSQL limit and we also need to reserve space for:
// - "_Seq_seq" suffix (8 bytes) used in the auto-created sequence for the main queue table
Expand Down Expand Up @@ -47,12 +48,27 @@

public override async Task Shutdown(CancellationToken cancellationToken = default)
{
await Task.WhenAll(Receivers.Values.Select(pump => pump.StopReceive(cancellationToken)))
.ConfigureAwait(false);
try
{
await Task.WhenAll(Receivers.Values.Select(pump => pump.StopReceive(cancellationToken)))
.ConfigureAwait(false);

if (dueDelayedMessageProcessor != null)
if (dueDelayedMessageProcessor != null)
{
await dueDelayedMessageProcessor.Stop(cancellationToken).ConfigureAwait(false);
}
}
finally
{
Dispose();
}
}

public void Dispose()
{
foreach (var r in Receivers.Values.Cast<MessageReceiver>())
{
await dueDelayedMessageProcessor.Stop(cancellationToken).ConfigureAwait(false);
r.Dispose();
}
}

Expand Down
55 changes: 33 additions & 22 deletions src/NServiceBus.Transport.Sql.Shared/Receiving/MessageReceiver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
using System.Threading.Tasks;
using Logging;

class MessageReceiver : IMessageReceiver
[Janitor.SkipWeaving]

Check failure on line 8 in src/NServiceBus.Transport.Sql.Shared/Receiving/MessageReceiver.cs

View workflow job for this annotation

GitHub Actions / Linux

The type or namespace name 'Janitor' could not be found (are you missing a using directive or an assembly reference?)

Check failure on line 8 in src/NServiceBus.Transport.Sql.Shared/Receiving/MessageReceiver.cs

View workflow job for this annotation

GitHub Actions / Linux

The type or namespace name 'Janitor' could not be found (are you missing a using directive or an assembly reference?)

Check failure on line 8 in src/NServiceBus.Transport.Sql.Shared/Receiving/MessageReceiver.cs

View workflow job for this annotation

GitHub Actions / Linux

The type or namespace name 'Janitor' could not be found (are you missing a using directive or an assembly reference?)

Check failure on line 8 in src/NServiceBus.Transport.Sql.Shared/Receiving/MessageReceiver.cs

View workflow job for this annotation

GitHub Actions / Linux

The type or namespace name 'Janitor' could not be found (are you missing a using directive or an assembly reference?)

Check failure on line 8 in src/NServiceBus.Transport.Sql.Shared/Receiving/MessageReceiver.cs

View workflow job for this annotation

GitHub Actions / Windows

The type or namespace name 'Janitor' could not be found (are you missing a using directive or an assembly reference?)

Check failure on line 8 in src/NServiceBus.Transport.Sql.Shared/Receiving/MessageReceiver.cs

View workflow job for this annotation

GitHub Actions / Windows

The type or namespace name 'Janitor' could not be found (are you missing a using directive or an assembly reference?)

Check failure on line 8 in src/NServiceBus.Transport.Sql.Shared/Receiving/MessageReceiver.cs

View workflow job for this annotation

GitHub Actions / Windows

The type or namespace name 'Janitor' could not be found (are you missing a using directive or an assembly reference?)

Check failure on line 8 in src/NServiceBus.Transport.Sql.Shared/Receiving/MessageReceiver.cs

View workflow job for this annotation

GitHub Actions / Windows

The type or namespace name 'Janitor' could not be found (are you missing a using directive or an assembly reference?)
class MessageReceiver : IMessageReceiver, IDisposable
{
public MessageReceiver(
TransportDefinition transport,
Expand Down Expand Up @@ -125,35 +126,45 @@

public async Task StopReceive(CancellationToken cancellationToken = default)
{
if (messageReceivingCancellationTokenSource == null)
try
{
// already stopped or never started
return;
}

await messageReceivingCancellationTokenSource.CancelAsync().ConfigureAwait(false);
if (messageReceivingCancellationTokenSource == null)
{
// already stopped or never started
return;
}

await using (cancellationToken.Register(() => messageProcessingCancellationTokenSource?.Cancel()))
{
await messageReceivingTask.ConfigureAwait(false);
await messageReceivingCancellationTokenSource.CancelAsync().ConfigureAwait(false);

while (concurrencyLimiter.CurrentCount != maxConcurrency)
await using (cancellationToken.Register(() => messageProcessingCancellationTokenSource?.Cancel()))
{
// Pass CancellationToken.None so that no exceptions will be thrown while waiting
// for the message receiver to gracefully shut down. The cancellation tokens passed to
// ProcessMessages (and thus the message processing pipelines) will be responsible
// for more forcefully shutting down message processing after the user's shutdown SLA
// is reached
await Task.Delay(50, CancellationToken.None).ConfigureAwait(false);
await messageReceivingTask.ConfigureAwait(false);

while (concurrencyLimiter.CurrentCount != maxConcurrency)
{
// Pass CancellationToken.None so that no exceptions will be thrown while waiting
// for the message receiver to gracefully shut down. The cancellation tokens passed to
// ProcessMessages (and thus the message processing pipelines) will be responsible
// for more forcefully shutting down message processing after the user's shutdown SLA
// is reached
await Task.Delay(50, CancellationToken.None).ConfigureAwait(false);
}
}
}
finally
{
Dispose();
}
}

messageReceivingCircuitBreaker.Dispose();
messageProcessingCircuitBreaker.Dispose();
concurrencyLimiter.Dispose();
messageReceivingCancellationTokenSource.Dispose();
public void Dispose()
{
messageReceivingCircuitBreaker?.Dispose();
messageProcessingCircuitBreaker?.Dispose();
concurrencyLimiter?.Dispose();
messageReceivingCancellationTokenSource?.Dispose();
messageReceivingCancellationTokenSource = null;
messageProcessingCancellationTokenSource.Dispose();
messageProcessingCancellationTokenSource?.Dispose();
}

async Task ReceiveMessagesAndSwallowExceptions(CancellationToken messageReceivingCancellationToken)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
using NServiceBus.Transport.Sql.Shared;
using Transport;

class SqlServerTransportInfrastructure : TransportInfrastructure
[Janitor.SkipWeaving]

Check failure on line 13 in src/NServiceBus.Transport.SqlServer/SqlServerTransportInfrastructure.cs

View workflow job for this annotation

GitHub Actions / Linux

The type or namespace name 'Janitor' could not be found (are you missing a using directive or an assembly reference?)

Check failure on line 13 in src/NServiceBus.Transport.SqlServer/SqlServerTransportInfrastructure.cs

View workflow job for this annotation

GitHub Actions / Linux

The type or namespace name 'Janitor' could not be found (are you missing a using directive or an assembly reference?)

Check failure on line 13 in src/NServiceBus.Transport.SqlServer/SqlServerTransportInfrastructure.cs

View workflow job for this annotation

GitHub Actions / Windows

The type or namespace name 'Janitor' could not be found (are you missing a using directive or an assembly reference?)

Check failure on line 13 in src/NServiceBus.Transport.SqlServer/SqlServerTransportInfrastructure.cs

View workflow job for this annotation

GitHub Actions / Windows

The type or namespace name 'Janitor' could not be found (are you missing a using directive or an assembly reference?)
class SqlServerTransportInfrastructure : TransportInfrastructure, IDisposable
{
public SqlServerTransportInfrastructure(SqlServerTransport transport, HostSettings hostSettings, ReceiveSettings[] receiveSettings, string[] sendingAddresses)
{
Expand Down Expand Up @@ -337,12 +338,27 @@

public override async Task Shutdown(CancellationToken cancellationToken = default)
{
await Task.WhenAll(Receivers.Values.Select(pump => pump.StopReceive(cancellationToken)))
.ConfigureAwait(false);
try
{
await Task.WhenAll(Receivers.Values.Select(pump => pump.StopReceive(cancellationToken)))
.ConfigureAwait(false);

if (dueDelayedMessageProcessor != null)
if (dueDelayedMessageProcessor != null)
{
await dueDelayedMessageProcessor.Stop(cancellationToken).ConfigureAwait(false);
}
}
finally
{
Dispose();
}
}

public void Dispose()
{
foreach (var r in Receivers.Values.Cast<MessageReceiver>())
{
await dueDelayedMessageProcessor.Stop(cancellationToken).ConfigureAwait(false);
r.Dispose();
}
}

Expand Down
Loading