Skip to content

Commit 95b66cc

Browse files
authored
High CPU utilization due to large number of DELETE statements (#1450)
remove file
1 parent 96a9950 commit 95b66cc

File tree

8 files changed

+264
-18
lines changed

8 files changed

+264
-18
lines changed

src/NServiceBus.Transport.PostgreSql.TransportTests/ConfigurePostgreSqlTransportInfrastructure.cs

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,11 @@
1111

1212
public class ConfigurePostgreSqlTransportInfrastructure : IConfigureTransportInfrastructure
1313
{
14-
public TransportDefinition CreateTransportDefinition()
15-
{
16-
connectionString = Environment.GetEnvironmentVariable("PostgreSqlTransportConnectionString") ?? @"User ID=user;Password=admin;Host=localhost;Port=54320;Database=nservicebus;Pooling=true;Connection Lifetime=0;";
14+
public static string ConnectionString =>
15+
Environment.GetEnvironmentVariable("PostgreSqlTransportConnectionString") ??
16+
@"User ID=user;Password=admin;Host=localhost;Port=54320;Database=nservicebus;Pooling=true;Connection Lifetime=0;";
1717

18-
return new PostgreSqlTransport(connectionString);
19-
}
18+
public TransportDefinition CreateTransportDefinition() => new PostgreSqlTransport(ConnectionString);
2019

2120
public async Task<TransportInfrastructure> Configure(TransportDefinition transportDefinition, HostSettings hostSettings, QueueAddress queueAddress, string errorQueueName, CancellationToken cancellationToken = default)
2221
{
@@ -48,7 +47,7 @@ public async Task Cleanup(CancellationToken cancellationToken = default)
4847
return;
4948
}
5049

51-
if (string.IsNullOrWhiteSpace(connectionString))
50+
if (string.IsNullOrWhiteSpace(ConnectionString))
5251
{
5352
return;
5453
}
@@ -64,7 +63,7 @@ public async Task Cleanup(CancellationToken cancellationToken = default)
6463
queues.Add(delayedDeliveryQueueName);
6564
}
6665

67-
using var conn = new NpgsqlConnection(connectionString);
66+
using var conn = new NpgsqlConnection(ConnectionString);
6867
await conn.OpenAsync(cancellationToken).ConfigureAwait(false);
6968

7069
foreach (var queue in queues.Where(q => !string.IsNullOrWhiteSpace(q)))
@@ -91,7 +90,6 @@ public async Task Cleanup(CancellationToken cancellationToken = default)
9190
}
9291
}
9392

94-
string connectionString;
9593
string inputQueueName;
9694
string errorQueueName;
9795
PostgreSqlTransport postgreSqlTransport;

src/NServiceBus.Transport.PostgreSql.TransportTests/NServiceBus.Transport.PostgreSql.TransportTests.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
<ItemGroup>
1010
<ProjectReference Include="..\NServiceBus.Transport.PostgreSql\NServiceBus.Transport.PostgreSql.csproj" />
11+
<ProjectReference Include="..\NServiceBus.Transport.Sql.Shared\NServiceBus.Transport.Sql.Shared.csproj" />
1112
</ItemGroup>
1213

1314
<ItemGroup>
Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
#pragma warning disable PS0018
2+
namespace NServiceBus.TransportTests;
3+
4+
using System;
5+
using System.Threading;
6+
using System.Threading.Tasks;
7+
using NUnit.Framework;
8+
using Transport;
9+
using Transport.PostgreSql;
10+
11+
//HINT: This test operates on the lower level than the transport tests because we need to verify
12+
// internal behavior of the transport. In this case the peek behavior that is not exposed by the transport seam.
13+
// Therefore we are not using the transport base class.
14+
public class When_receive_takes_long_to_complete
15+
{
16+
[TestCase(TransportTransactionMode.None)]
17+
[TestCase(TransportTransactionMode.ReceiveOnly)]
18+
[TestCase(TransportTransactionMode.SendsAtomicWithReceive)]
19+
[TestCase(TransportTransactionMode.TransactionScope)]
20+
public async Task Peeker_should_provide_accurate_queue_length_estimate(TransportTransactionMode transactionMode)
21+
{
22+
await SendAMessage(connectionFactory, queue);
23+
await SendAMessage(connectionFactory, queue);
24+
25+
var (txStarted, txFinished, txCompletionSource) = SpawnALongRunningReceiveTransaction();
26+
27+
await txStarted;
28+
29+
int peekCount;
30+
31+
await using (var connection = await connectionFactory.OpenNewConnection())
32+
{
33+
var transaction = await connection.BeginTransactionAsync();
34+
35+
queue.FormatPeekCommand();
36+
peekCount = await queue.TryPeek(connection, transaction, null);
37+
}
38+
39+
txCompletionSource.SetResult();
40+
await txFinished;
41+
42+
Assert.That(peekCount, Is.EqualTo(1), "A long running receive transaction should not skew the estimation for number of messages in the queue.");
43+
}
44+
45+
static async Task<PostgreSqlTableBasedQueue> CreateATestQueue(PostgreSqlDbConnectionFactory connectionFactory)
46+
{
47+
var queueName = "queue_length_estimation_test";
48+
49+
var sqlConstants = new PostgreSqlConstants();
50+
51+
var queue = new PostgreSqlTableBasedQueue(sqlConstants, queueName, queueName, false);
52+
53+
var addressTranslator = new QueueAddressTranslator("public", null, new QueueSchemaOptions());
54+
var queueCreator = new QueueCreator(sqlConstants, connectionFactory, addressTranslator.Parse, false);
55+
56+
await queueCreator.CreateQueueIfNecessary(new[] { queueName }, null);
57+
58+
await using var connection = await connectionFactory.OpenNewConnection();
59+
await queue.Purge(connection);
60+
61+
return queue;
62+
}
63+
64+
static async Task SendAMessage(PostgreSqlDbConnectionFactory connectionFactory, PostgreSqlTableBasedQueue queue)
65+
{
66+
await using var connection = await connectionFactory.OpenNewConnection();
67+
var transaction = await connection.BeginTransactionAsync();
68+
69+
await queue.Send(
70+
new OutgoingMessage(Guid.NewGuid().ToString(), [], Array.Empty<byte>()),
71+
TimeSpan.MaxValue, connection, transaction);
72+
73+
await transaction.CommitAsync();
74+
}
75+
76+
(Task, Task, TaskCompletionSource) SpawnALongRunningReceiveTransaction()
77+
{
78+
var started = new TaskCompletionSource();
79+
var cancellationTokenSource = new TaskCompletionSource();
80+
81+
var task = Task.Run(async () =>
82+
{
83+
await using var connection = await connectionFactory.OpenNewConnection();
84+
var transaction = await connection.BeginTransactionAsync();
85+
86+
await queue.TryReceive(connection, transaction);
87+
88+
started.SetResult();
89+
90+
await cancellationTokenSource.Task;
91+
});
92+
93+
return (started.Task, task, cancellationTokenSource);
94+
}
95+
96+
[SetUp]
97+
public async Task Setup()
98+
{
99+
connectionFactory = new PostgreSqlDbConnectionFactory(ConfigurePostgreSqlTransportInfrastructure.ConnectionString);
100+
101+
queue = await CreateATestQueue(connectionFactory);
102+
}
103+
104+
[TearDown]
105+
public async Task TearDown()
106+
{
107+
if (queue == null)
108+
{
109+
return;
110+
}
111+
112+
await using var connection = await connectionFactory.OpenNewConnection(CancellationToken.None);
113+
await using var comm = connection.CreateCommand();
114+
115+
comm.CommandText = $"DROP TABLE IF EXISTS \"public\".\"{queue}\"; " +
116+
$"DROP SEQUENCE IF EXISTS \"public\".\"{queue}_seq_seq\";";
117+
118+
await comm.ExecuteNonQueryAsync(CancellationToken.None);
119+
}
120+
121+
PostgreSqlTableBasedQueue queue;
122+
PostgreSqlDbConnectionFactory connectionFactory;
123+
}

src/NServiceBus.Transport.PostgreSql/PostgreSqlConstants.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,8 @@ SELECT now() AT TIME ZONE 'UTC' as UtcNow, Due as NextDue
5353
ORDER BY Due LIMIT 1 FOR UPDATE SKIP LOCKED";
5454

5555
public string PeekText { get; set; } = @"
56-
SELECT COALESCE(cast(max(seq) - min(seq) + 1 AS int), 0) Id FROM {0}";
56+
SELECT COALESCE(cast((SELECT seq FROM {0} ORDER BY seq DESC LIMIT 1 FOR UPDATE SKIP LOCKED)
57+
- (SELECT seq FROM {0} ORDER BY seq ASC LIMIT 1 FOR UPDATE SKIP LOCKED) + 1 AS int), 0);";
5758

5859
public string AddMessageBodyStringColumn { get; set; } = @"
5960
DO $$

src/NServiceBus.Transport.Sql.Shared/NServiceBus.Transport.Sql.Shared.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
<Project Sdk="Microsoft.NET.Sdk">
1+
<Project Sdk="Microsoft.NET.Sdk">
22

33
<PropertyGroup>
44
<TargetFramework>net8.0</TargetFramework>

src/NServiceBus.Transport.SqlServer.TransportTests/ConfigureSqlServerTransportInfrastructure.cs

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,11 @@
99

1010
public class ConfigureSqlServerTransportInfrastructure : IConfigureTransportInfrastructure
1111
{
12-
public TransportDefinition CreateTransportDefinition()
13-
{
14-
connectionString = Environment.GetEnvironmentVariable("SqlServerTransportConnectionString") ?? @"Data Source=.\SQLEXPRESS;Initial Catalog=nservicebus;Integrated Security=True;TrustServerCertificate=true";
12+
public static string ConnectionString =>
13+
Environment.GetEnvironmentVariable("SqlServerTransportConnectionString")
14+
?? @"Data Source=.\SQLEXPRESS;Initial Catalog=nservicebus;Integrated Security=True;TrustServerCertificate=true";
1515

16-
return new SqlServerTransport(connectionString);
17-
}
16+
public TransportDefinition CreateTransportDefinition() => new SqlServerTransport(ConnectionString);
1817

1918
public async Task<TransportInfrastructure> Configure(TransportDefinition transportDefinition, HostSettings hostSettings, QueueAddress queueAddress, string errorQueueName, CancellationToken cancellationToken = default)
2019
{
@@ -51,7 +50,7 @@ public async Task Cleanup(CancellationToken cancellationToken = default)
5150
return;
5251
}
5352

54-
if (string.IsNullOrWhiteSpace(connectionString) == false)
53+
if (string.IsNullOrWhiteSpace(ConnectionString) == false)
5554
{
5655
var queues = new[]
5756
{
@@ -60,7 +59,7 @@ public async Task Cleanup(CancellationToken cancellationToken = default)
6059
sqlServerTransport.Testing.DelayedDeliveryQueue
6160
};
6261

63-
using (var conn = new SqlConnection(connectionString))
62+
using (var conn = new SqlConnection(ConnectionString))
6463
{
6564
await conn.OpenAsync(cancellationToken);
6665

@@ -80,7 +79,6 @@ public async Task Cleanup(CancellationToken cancellationToken = default)
8079
}
8180
}
8281

83-
string connectionString;
8482
string inputQueueName;
8583
string errorQueueName;
8684
SqlServerTransport sqlServerTransport;

src/NServiceBus.Transport.SqlServer.TransportTests/NServiceBus.Transport.SqlServer.TransportTests.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
<ItemGroup>
1010
<ProjectReference Include="..\NServiceBus.Transport.SqlServer\NServiceBus.Transport.SqlServer.csproj" />
11+
<ProjectReference Include="..\NServiceBus.Transport.Sql.Shared\NServiceBus.Transport.Sql.Shared.csproj" />
1112
</ItemGroup>
1213

1314
<ItemGroup>
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
#pragma warning disable PS0018
2+
namespace NServiceBus.TransportTests;
3+
4+
using System;
5+
using System.Threading;
6+
using System.Threading.Tasks;
7+
using NUnit.Framework;
8+
using Transport;
9+
using Transport.SqlServer;
10+
11+
//HINT: This test operates on the lower level than the transport tests because we need to verify
12+
// internal behavior of the transport. In this case the peek behavior that is not exposed by the transport seam.
13+
// Therefore we are not using the transport base class.
14+
public class When_receive_takes_long_to_complete
15+
{
16+
[TestCase(TransportTransactionMode.None)]
17+
[TestCase(TransportTransactionMode.ReceiveOnly)]
18+
[TestCase(TransportTransactionMode.SendsAtomicWithReceive)]
19+
[TestCase(TransportTransactionMode.TransactionScope)]
20+
public async Task Peeker_should_provide_accurate_queue_length_estimate(TransportTransactionMode transactionMode)
21+
{
22+
queue = await CreateATestQueue(connectionFactory);
23+
24+
await SendAMessage(connectionFactory, queue);
25+
await SendAMessage(connectionFactory, queue);
26+
27+
var (txStarted, txFinished, txCompletionSource) = SpawnALongRunningReceiveTransaction(connectionFactory, queue);
28+
29+
await txStarted;
30+
31+
int peekCount;
32+
33+
using (var connection = await connectionFactory.OpenNewConnection())
34+
{
35+
var transaction = await connection.BeginTransactionAsync();
36+
37+
queue.FormatPeekCommand();
38+
peekCount = await queue.TryPeek(connection, transaction, null);
39+
}
40+
41+
txCompletionSource.SetResult();
42+
await txFinished;
43+
44+
Assert.That(peekCount, Is.EqualTo(1), "A long running receive transaction should not skew the estimation for number of messages in the queue.");
45+
}
46+
47+
static async Task<SqlTableBasedQueue> CreateATestQueue(SqlServerDbConnectionFactory connectionFactory)
48+
{
49+
var queueName = "queue_length_estimation_test";
50+
51+
var sqlConstants = new SqlServerConstants();
52+
53+
var queue = new SqlTableBasedQueue(sqlConstants, queueName, queueName, false);
54+
55+
var addressTranslator = new QueueAddressTranslator("nservicebus", "dbo", null, null);
56+
var queueCreator = new QueueCreator(sqlConstants, connectionFactory, addressTranslator.Parse, false);
57+
58+
await queueCreator.CreateQueueIfNecessary(new[] { queueName }, null);
59+
60+
await using var connection = await connectionFactory.OpenNewConnection();
61+
await queue.Purge(connection);
62+
63+
return queue;
64+
}
65+
66+
static async Task SendAMessage(SqlServerDbConnectionFactory connectionFactory, SqlTableBasedQueue queue)
67+
{
68+
await using var connection = await connectionFactory.OpenNewConnection();
69+
var transaction = await connection.BeginTransactionAsync();
70+
71+
await queue.Send(
72+
new OutgoingMessage(Guid.NewGuid().ToString(), [], Array.Empty<byte>()),
73+
TimeSpan.MaxValue, connection, transaction);
74+
75+
await transaction.CommitAsync();
76+
}
77+
78+
(Task, Task, TaskCompletionSource) SpawnALongRunningReceiveTransaction(SqlServerDbConnectionFactory connectionFactory, SqlTableBasedQueue queue)
79+
{
80+
var started = new TaskCompletionSource();
81+
var cancellationTokenSource = new TaskCompletionSource();
82+
83+
var task = Task.Run(async () =>
84+
{
85+
await using var connection = await connectionFactory.OpenNewConnection();
86+
var transaction = await connection.BeginTransactionAsync();
87+
88+
await queue.TryReceive(connection, transaction);
89+
90+
started.SetResult();
91+
92+
await cancellationTokenSource.Task;
93+
});
94+
95+
return (started.Task, task, cancellationTokenSource);
96+
}
97+
98+
[SetUp]
99+
public async Task Setup()
100+
{
101+
connectionFactory = new SqlServerDbConnectionFactory(ConfigureSqlServerTransportInfrastructure.ConnectionString);
102+
103+
queue = await CreateATestQueue(connectionFactory);
104+
}
105+
106+
[TearDown]
107+
public async Task TearDown()
108+
{
109+
if (queue == null)
110+
{
111+
return;
112+
}
113+
114+
await using var connection = await connectionFactory.OpenNewConnection(CancellationToken.None);
115+
await using var comm = connection.CreateCommand();
116+
117+
comm.CommandText = $"IF OBJECT_ID('{queue}', 'U') IS NOT NULL DROP TABLE {queue}";
118+
119+
await comm.ExecuteNonQueryAsync(CancellationToken.None);
120+
}
121+
122+
SqlTableBasedQueue queue;
123+
SqlServerDbConnectionFactory connectionFactory;
124+
}

0 commit comments

Comments
 (0)