Skip to content

Commit 8f8e333

Browse files
jeremydmillerclaude
andcommitted
Fix AutoPurgeOnStartup for session-enabled Azure Service Bus subscriptions
AzureServiceBusSubscription.PurgeAsync was using CreateReceiver which fails when sessions are enabled. Now checks Options.RequiresSession and uses AcceptNextSessionAsync for session-enabled subscriptions, matching the existing pattern in AzureServiceBusQueue.PurgeAsync. Closes #2283 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent e416e5c commit 8f8e333

File tree

2 files changed

+175
-17
lines changed

2 files changed

+175
-17
lines changed
Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
using JasperFx.Core;
2+
using Microsoft.Extensions.Hosting;
3+
using Shouldly;
4+
using Wolverine.Tracking;
5+
using Xunit;
6+
7+
namespace Wolverine.AzureServiceBus.Tests.Bugs;
8+
9+
public class Bug_2283_purge_session_subscription : IAsyncLifetime
10+
{
11+
private IHost _host;
12+
13+
public async Task InitializeAsync()
14+
{
15+
// This should not throw even though the subscription has sessions enabled
16+
// and AutoPurgeOnStartup is set. Before the fix, PurgeAsync on a session-enabled
17+
// subscription used CreateReceiver instead of AcceptNextSessionAsync, which fails.
18+
_host = await Host.CreateDefaultBuilder()
19+
.UseWolverine(opts =>
20+
{
21+
opts.UseAzureServiceBusTesting()
22+
.AutoProvision()
23+
.AutoPurgeOnStartup();
24+
25+
opts.PublishMessage<Bug2283Message>()
26+
.ToAzureServiceBusTopic("bug2283")
27+
.SendInline();
28+
29+
opts.ListenToAzureServiceBusSubscription("bug2283sub")
30+
.FromTopic("bug2283")
31+
.RequireSessions(1)
32+
.ProcessInline();
33+
}).StartAsync();
34+
}
35+
36+
public async Task DisposeAsync()
37+
{
38+
await _host.StopAsync();
39+
_host.Dispose();
40+
await AzureServiceBusTesting.DeleteAllEmulatorObjectsAsync();
41+
}
42+
43+
[Fact]
44+
public async Task can_start_with_auto_purge_and_session_enabled_subscription()
45+
{
46+
// If we got here, the host started successfully with AutoPurgeOnStartup
47+
// and a session-enabled subscription without throwing.
48+
// Now verify we can also send and receive messages through it.
49+
Func<IMessageContext, Task> sendMany = async bus =>
50+
{
51+
await bus.SendAsync(new Bug2283Message("First"), new DeliveryOptions { GroupId = "session1" });
52+
await bus.SendAsync(new Bug2283Message("Second"), new DeliveryOptions { GroupId = "session1" });
53+
await bus.SendAsync(new Bug2283Message("Third"), new DeliveryOptions { GroupId = "session1" });
54+
};
55+
56+
var session = await _host.TrackActivity()
57+
.IncludeExternalTransports()
58+
.Timeout(30.Seconds())
59+
.ExecuteAndWaitAsync(sendMany);
60+
61+
session.Received.MessagesOf<Bug2283Message>().Select(x => x.Name)
62+
.ShouldBe(["First", "Second", "Third"]);
63+
}
64+
65+
[Fact]
66+
public async Task can_purge_existing_messages_from_session_subscription()
67+
{
68+
// First, send some messages that will sit in the subscription
69+
Func<IMessageContext, Task> sendMessages = async bus =>
70+
{
71+
await bus.SendAsync(new Bug2283Message("Pre1"), new DeliveryOptions { GroupId = "purge-test" });
72+
await bus.SendAsync(new Bug2283Message("Pre2"), new DeliveryOptions { GroupId = "purge-test" });
73+
};
74+
75+
await _host.TrackActivity()
76+
.IncludeExternalTransports()
77+
.Timeout(30.Seconds())
78+
.ExecuteAndWaitAsync(sendMessages);
79+
80+
// Now start a second host with the same subscription + AutoPurgeOnStartup.
81+
// This validates that purge works when there ARE messages in a session-enabled subscription.
82+
using var host2 = await Host.CreateDefaultBuilder()
83+
.UseWolverine(opts =>
84+
{
85+
opts.UseAzureServiceBusTesting()
86+
.AutoProvision()
87+
.AutoPurgeOnStartup();
88+
89+
opts.PublishMessage<Bug2283Message>()
90+
.ToAzureServiceBusTopic("bug2283")
91+
.SendInline();
92+
93+
opts.ListenToAzureServiceBusSubscription("bug2283sub")
94+
.FromTopic("bug2283")
95+
.RequireSessions(1)
96+
.ProcessInline();
97+
}).StartAsync();
98+
99+
// Send new messages through host2 and verify only the new ones arrive
100+
Func<IMessageContext, Task> sendNew = async bus =>
101+
{
102+
await bus.SendAsync(new Bug2283Message("New1"), new DeliveryOptions { GroupId = "purge-test-2" });
103+
};
104+
105+
var session = await host2.TrackActivity()
106+
.IncludeExternalTransports()
107+
.Timeout(30.Seconds())
108+
.ExecuteAndWaitAsync(sendNew);
109+
110+
var received = session.Received.MessagesOf<Bug2283Message>().Select(x => x.Name).ToArray();
111+
received.ShouldContain("New1");
112+
113+
await host2.StopAsync();
114+
}
115+
}
116+
117+
public record Bug2283Message(string Name);
118+
119+
public static class Bug2283Handler
120+
{
121+
public static void Handle(Bug2283Message message)
122+
{
123+
// nothing
124+
}
125+
}

src/Transports/Azure/Wolverine.AzureServiceBus/Internal/AzureServiceBusSubscription.cs

Lines changed: 50 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -120,29 +120,62 @@ internal async ValueTask SetupAsync(ServiceBusAdministrationClient client, ILogg
120120

121121
public async ValueTask PurgeAsync(ILogger logger)
122122
{
123-
try
123+
await Parent.WithServiceBusClientAsync(async client =>
124124
{
125-
await Parent.WithServiceBusClientAsync(async client =>
125+
try
126126
{
127-
var receiver = client.CreateReceiver(Topic.TopicName, SubscriptionName);
128-
129-
var stopwatch = new Stopwatch();
130-
stopwatch.Start();
131-
while (stopwatch.ElapsedMilliseconds < 2000)
127+
if (Options.RequiresSession)
132128
{
133-
var messages = await receiver.ReceiveMessagesAsync(25, 1.Seconds());
134-
if (!messages.Any())
135-
{
136-
return;
137-
}
138-
139-
foreach (var message in messages) await receiver.CompleteMessageAsync(message);
129+
await purgeWithSessions(client);
130+
}
131+
else
132+
{
133+
await purgeWithoutSessions(client);
140134
}
141-
});
135+
}
136+
catch (Exception e)
137+
{
138+
logger.LogDebug(e, "Error trying to purge Azure Service Bus subscription {SubscriptionName} for topic {TopicName}", SubscriptionName, Topic.TopicName);
139+
}
140+
});
141+
}
142+
143+
private async Task purgeWithSessions(ServiceBusClient client)
144+
{
145+
var cancellation = new CancellationTokenSource();
146+
cancellation.CancelAfter(2000);
147+
148+
var stopwatch = new Stopwatch();
149+
stopwatch.Start();
150+
while (stopwatch.ElapsedMilliseconds < 2000)
151+
{
152+
var session = await client.AcceptNextSessionAsync(Topic.TopicName, SubscriptionName, cancellationToken: cancellation.Token);
153+
154+
var messages = await session.ReceiveMessagesAsync(25, 1.Seconds(), cancellation.Token);
155+
foreach (var message in messages) await session.CompleteMessageAsync(message, cancellation.Token);
156+
while (messages.Any())
157+
{
158+
messages = await session.ReceiveMessagesAsync(25, 1.Seconds(), cancellation.Token);
159+
foreach (var message in messages) await session.CompleteMessageAsync(message, cancellation.Token);
160+
}
142161
}
143-
catch (Exception e)
162+
}
163+
164+
private async Task purgeWithoutSessions(ServiceBusClient client)
165+
{
166+
var receiver = client.CreateReceiver(Topic.TopicName, SubscriptionName);
167+
168+
var stopwatch = new Stopwatch();
169+
stopwatch.Start();
170+
while (stopwatch.ElapsedMilliseconds < 2000)
144171
{
145-
logger.LogError(e, "Error trying to purge Azure Service Bus subscription {SubscriptionName} for topic {TopicName}", SubscriptionName, Topic.TopicName);
172+
var messages = await receiver.ReceiveMessagesAsync(25, 1.Seconds());
173+
if (!messages.Any())
174+
{
175+
return;
176+
}
177+
178+
foreach (var message in messages) await receiver.CompleteMessageAsync(message);
146179
}
147180
}
148181

0 commit comments

Comments
 (0)