Skip to content

Commit 515d1ae

Browse files
authored
When connecting to Azure ServiceBus fails, log additional error information to help diagnose and resolve the issue
1 parent 919ff0e commit 515d1ae

File tree

5 files changed

+67
-26
lines changed

5 files changed

+67
-26
lines changed

src/Tests/Sending/MessageDispatcherTests.cs

Lines changed: 27 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ public async Task Should_dispatch_unicast_isolated_dispatches_individually()
1717
{
1818
var client = new FakeServiceBusClient();
1919

20-
var dispatcher = new MessageDispatcher(new MessageSenderRegistry(client), TopicTopology.FromOptions(new TopologyOptions()));
20+
var dispatcher = new MessageDispatcher(client, new MessageSenderRegistry(), TopicTopology.FromOptions(new TopologyOptions()));
2121

2222
var operation1 =
2323
new TransportOperation(new OutgoingMessage("SomeId",
@@ -51,7 +51,7 @@ public void Should_rethrow_when_unicast_dispatch_destination_not_available()
5151
{
5252
var client = new FakeServiceBusClient();
5353

54-
var dispatcher = new MessageDispatcher(new MessageSenderRegistry(client), TopicTopology.FromOptions(new TopologyOptions()));
54+
var dispatcher = new MessageDispatcher(client, new MessageSenderRegistry(), TopicTopology.FromOptions(new TopologyOptions()));
5555

5656
var sender = new FakeSender
5757
{
@@ -75,7 +75,9 @@ public async Task Should_dispatch_multicast_isolated_dispatches_individually()
7575
{
7676
var client = new FakeServiceBusClient();
7777

78-
var dispatcher = new MessageDispatcher(new MessageSenderRegistry(client),
78+
var dispatcher = new MessageDispatcher(
79+
client,
80+
new MessageSenderRegistry(),
7981
TopicTopology.FromOptions(new TopologyOptions
8082
{
8183
PublishedEventToTopicsMap = { { typeof(SomeEvent).FullName, "sometopic" } }
@@ -114,7 +116,7 @@ public void Should_swallow_when_multicast_dispatch_destination_not_available()
114116
{
115117
var client = new FakeServiceBusClient();
116118

117-
var dispatcher = new MessageDispatcher(new MessageSenderRegistry(client), TopicTopology.FromOptions(new TopologyOptions()
119+
var dispatcher = new MessageDispatcher(client, new MessageSenderRegistry(), TopicTopology.FromOptions(new TopologyOptions()
118120
{
119121
PublishedEventToTopicsMap = { { typeof(SomeEvent).FullName, "sometopic" } }
120122
}));
@@ -150,7 +152,7 @@ public async Task Should_dispatch_unicast_isolated_dispatches_individually_per_d
150152
{
151153
var client = new FakeServiceBusClient();
152154

153-
var dispatcher = new MessageDispatcher(new MessageSenderRegistry(client), TopicTopology.FromOptions(new TopologyOptions()));
155+
var dispatcher = new MessageDispatcher(client, new MessageSenderRegistry(), TopicTopology.FromOptions(new TopologyOptions()));
154156

155157
var operation1 =
156158
new TransportOperation(new OutgoingMessage("SomeId",
@@ -188,7 +190,9 @@ public async Task
188190
{
189191
var client = new FakeServiceBusClient();
190192

191-
var dispatcher = new MessageDispatcher(new MessageSenderRegistry(client),
193+
var dispatcher = new MessageDispatcher(
194+
client,
195+
new MessageSenderRegistry(),
192196
TopicTopology.FromOptions(new TopologyOptions
193197
{
194198
PublishedEventToTopicsMap =
@@ -230,7 +234,7 @@ public async Task Should_dispatch_unicast_default_dispatches_together_as_batch()
230234
{
231235
var client = new FakeServiceBusClient();
232236

233-
var dispatcher = new MessageDispatcher(new MessageSenderRegistry(client), TopicTopology.FromOptions(new TopologyOptions()));
237+
var dispatcher = new MessageDispatcher(client, new MessageSenderRegistry(), TopicTopology.FromOptions(new TopologyOptions()));
234238

235239
var operation1 =
236240
new TransportOperation(new OutgoingMessage("SomeId",
@@ -266,7 +270,9 @@ public async Task Should_dispatch_multicast_default_dispatches_together_as_batch
266270
{
267271
var client = new FakeServiceBusClient();
268272

269-
var dispatcher = new MessageDispatcher(new MessageSenderRegistry(client),
273+
var dispatcher = new MessageDispatcher(
274+
client,
275+
new MessageSenderRegistry(),
270276
TopicTopology.FromOptions(new TopologyOptions
271277
{
272278
PublishedEventToTopicsMap = { { typeof(SomeEvent).FullName, "sometopic" } }
@@ -306,7 +312,7 @@ public async Task Should_dispatch_unicast_default_dispatches_together_as_batch_p
306312
{
307313
var client = new FakeServiceBusClient();
308314

309-
var dispatcher = new MessageDispatcher(new MessageSenderRegistry(client), TopicTopology.FromOptions(new TopologyOptions()));
315+
var dispatcher = new MessageDispatcher(client, new MessageSenderRegistry(), TopicTopology.FromOptions(new TopologyOptions()));
310316

311317
var operation1 =
312318
new TransportOperation(new OutgoingMessage("SomeId",
@@ -354,7 +360,9 @@ public async Task
354360
{
355361
var client = new FakeServiceBusClient();
356362

357-
var dispatcher = new MessageDispatcher(new MessageSenderRegistry(client),
363+
var dispatcher = new MessageDispatcher(
364+
client,
365+
new MessageSenderRegistry(),
358366
TopicTopology.FromOptions(new TopologyOptions
359367
{
360368
PublishedEventToTopicsMap =
@@ -409,7 +417,9 @@ public async Task Should_allow_mixing_operations()
409417
{
410418
var client = new FakeServiceBusClient();
411419

412-
var dispatcher = new MessageDispatcher(new MessageSenderRegistry(client),
420+
var dispatcher = new MessageDispatcher(
421+
client,
422+
new MessageSenderRegistry(),
413423
TopicTopology.FromOptions(new TopologyOptions
414424
{
415425
PublishedEventToTopicsMap =
@@ -491,7 +501,7 @@ public async Task Should_use_connection_information_of_existing_service_bus_tran
491501
defaultClient.Senders["SomeDestination"] = defaultSender;
492502
var transactionalClient = new FakeServiceBusClient();
493503

494-
var dispatcher = new MessageDispatcher(new MessageSenderRegistry(defaultClient), TopicTopology.FromOptions(new TopologyOptions()));
504+
var dispatcher = new MessageDispatcher(defaultClient, new MessageSenderRegistry(), TopicTopology.FromOptions(new TopologyOptions()));
495505

496506
var operation1 =
497507
new TransportOperation(new OutgoingMessage("SomeId",
@@ -543,7 +553,7 @@ public void Should_throw_when_detecting_more_than_hundred_messages_when_transact
543553
return false;
544554
};
545555

546-
var dispatcher = new MessageDispatcher(new MessageSenderRegistry(defaultClient), TopicTopology.FromOptions(new TopologyOptions()));
556+
var dispatcher = new MessageDispatcher(defaultClient, new MessageSenderRegistry(), TopicTopology.FromOptions(new TopologyOptions()));
547557

548558
var nrOfMessages = 150;
549559
var operations = new List<TransportOperation>(nrOfMessages);
@@ -587,7 +597,7 @@ public async Task Should_split_into_multiple_batches_according_to_the_sdk()
587597
return false;
588598
};
589599

590-
var dispatcher = new MessageDispatcher(new MessageSenderRegistry(defaultClient), TopicTopology.FromOptions(new TopologyOptions()));
600+
var dispatcher = new MessageDispatcher(defaultClient, new MessageSenderRegistry(), TopicTopology.FromOptions(new TopologyOptions()));
591601

592602
var operations = new List<TransportOperation>(200);
593603
for (int i = 0; i < 200; i++)
@@ -624,7 +634,7 @@ public async Task Should_fallback_to_individual_sends_when_messages_cannot_be_ad
624634

625635
defaultSender.TryAdd = msg => false;
626636

627-
var dispatcher = new MessageDispatcher(new MessageSenderRegistry(defaultClient), TopicTopology.FromOptions(new TopologyOptions()));
637+
var dispatcher = new MessageDispatcher(defaultClient, new MessageSenderRegistry(), TopicTopology.FromOptions(new TopologyOptions()));
628638

629639
var operations = new List<TransportOperation>(5);
630640
for (int i = 0; i < 5; i++)
@@ -666,7 +676,7 @@ public async Task
666676
};
667677
};
668678

669-
var dispatcher = new MessageDispatcher(new MessageSenderRegistry(defaultClient), TopicTopology.FromOptions(new TopologyOptions()));
679+
var dispatcher = new MessageDispatcher(defaultClient, new MessageSenderRegistry(), TopicTopology.FromOptions(new TopologyOptions()));
670680

671681
var operations = new List<TransportOperation>(5);
672682
for (int i = 0; i < 10; i++)
@@ -699,7 +709,7 @@ public async Task Should_use_default_connection_information_when_existing_servic
699709
var transactionalSender = new FakeSender();
700710
transactionalClient.Senders["SomeDestination"] = transactionalSender;
701711

702-
var dispatcher = new MessageDispatcher(new MessageSenderRegistry(defaultClient), TopicTopology.FromOptions(new TopologyOptions()));
712+
var dispatcher = new MessageDispatcher(defaultClient, new MessageSenderRegistry(), TopicTopology.FromOptions(new TopologyOptions()));
703713

704714
var operation1 =
705715
new TransportOperation(new OutgoingMessage("SomeId",

src/Tests/Sending/MessageRegistryTests.cs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,16 +10,17 @@ public class MessageRegistryTests
1010
[Test]
1111
public async Task Should_get_cached_sender_per_destination()
1212
{
13-
var pool = new MessageSenderRegistry(new ServiceBusClient(FakeConnectionString));
13+
var client = new ServiceBusClient(FakeConnectionString);
14+
var pool = new MessageSenderRegistry();
1415

1516
try
1617
{
17-
var firstMessageSenderDest1 = pool.GetMessageSender("dest1", null);
18+
var firstMessageSenderDest1 = pool.GetMessageSender("dest1", client);
1819

19-
var firstMessageSenderDest2 = pool.GetMessageSender("dest2", null);
20+
var firstMessageSenderDest2 = pool.GetMessageSender("dest2", client);
2021

21-
var secondMessageSenderDest1 = pool.GetMessageSender("dest1", null);
22-
var secondMessageSenderDest2 = pool.GetMessageSender("dest2", null);
22+
var secondMessageSenderDest1 = pool.GetMessageSender("dest1", client);
23+
var secondMessageSenderDest2 = pool.GetMessageSender("dest2", client);
2324

2425
Assert.Multiple(() =>
2526
{

src/Transport/AzureServiceBusTransportInfrastructure.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,10 @@ ServiceBusAdministrationClient administrationClient
3333
this.administrationClient = administrationClient;
3434
this.receiveSettingsAndClientPairs = receiveSettingsAndClientPairs;
3535

36-
messageSenderRegistry = new MessageSenderRegistry(defaultClient);
36+
messageSenderRegistry = new MessageSenderRegistry();
3737

3838
Dispatcher = new MessageDispatcher(
39+
defaultClient,
3940
messageSenderRegistry,
4041
transportSettings.Topology,
4142
transportSettings.OutgoingNativeMessageCustomization

src/Transport/Sending/MessageDispatcher.cs

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
using Logging;
1111

1212
class MessageDispatcher(
13+
ServiceBusClient defaultClient,
1314
MessageSenderRegistry messageSenderRegistry,
1415
TopicTopology topology,
1516
OutgoingNativeMessageCustomizationAction? customizerCallback = null)
@@ -147,6 +148,7 @@ async Task DispatchBatchOrFallbackToIndividualSendsForDestination(string destina
147148
{
148149
int batchCount = 0;
149150
List<ServiceBusMessage>? messagesTooLargeToBeBatched = null;
151+
client ??= defaultClient;
150152
var sender = messageSenderRegistry.GetMessageSender(destination, client);
151153
while (messagesToSend.Count > 0)
152154
{
@@ -227,6 +229,19 @@ async Task DispatchBatchOrFallbackToIndividualSendsForDestination(string destina
227229

228230
return;
229231
}
232+
catch (ServiceBusException e) when (e.Reason == ServiceBusFailureReason.ServiceCommunicationProblem)
233+
{
234+
if (client.TransportType == ServiceBusTransportType.AmqpTcp)
235+
{
236+
Log.Error("Couldn't connect using AMQP over TCP. Ensure AMQP ports 5671 and 5672 on the server are reachable, or try using websockets");
237+
}
238+
else if (client.TransportType == ServiceBusTransportType.AmqpWebSockets)
239+
{
240+
Log.Error("Couldn't connect using AMQP over web sockets.");
241+
}
242+
243+
throw;
244+
}
230245
}
231246

232247
if (messagesTooLargeToBeBatched is not null)
@@ -292,6 +307,7 @@ Task DispatchIsolatedOperations(
292307
async Task DispatchForDestination(string destination, bool isMulticast, ServiceBusClient? client,
293308
Transaction? transaction, ServiceBusMessage message, CancellationToken cancellationToken)
294309
{
310+
client ??= defaultClient;
295311
var sender = messageSenderRegistry.GetMessageSender(destination, client);
296312
try
297313
{
@@ -307,5 +323,18 @@ async Task DispatchForDestination(string destination, bool isMulticast, ServiceB
307323
Log.Debug($"Sending message with message ID '{message.MessageId}' to topic {destination} failed because the destination does not exist.");
308324
}
309325
}
326+
catch (ServiceBusException e) when (e.Reason == ServiceBusFailureReason.ServiceCommunicationProblem)
327+
{
328+
if (client.TransportType == ServiceBusTransportType.AmqpTcp)
329+
{
330+
Log.Error("Couldn't connect using AMQP over TCP. Ensure AMQP ports 5671 and 5672 on the server are reachable, or try using websockets");
331+
}
332+
else if (client.TransportType == ServiceBusTransportType.AmqpWebSockets)
333+
{
334+
Log.Error("Couldn't connect using AMQP over web sockets.");
335+
}
336+
337+
throw;
338+
}
310339
}
311340
}

src/Transport/Sending/MessageSenderRegistry.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,14 @@
77
using System.Threading.Tasks;
88
using Azure.Messaging.ServiceBus;
99

10-
sealed class MessageSenderRegistry(ServiceBusClient defaultClient)
10+
sealed class MessageSenderRegistry
1111
{
12-
public ServiceBusSender GetMessageSender(string destination, ServiceBusClient? client)
12+
public ServiceBusSender GetMessageSender(string destination, ServiceBusClient client)
1313
{
1414
// According to the client SDK guidelines we can safely use these client objects for concurrent asynchronous
1515
// operations and from multiple threads.
1616
// see https://learn.microsoft.com/en-us/azure/service-bus-messaging/service-bus-performance-improvements
17-
var lazySender = destinationToSenderMapping.GetOrAdd((destination, client ?? defaultClient),
17+
var lazySender = destinationToSenderMapping.GetOrAdd((destination, client),
1818
static arg =>
1919
{
2020
(string innerDestination, ServiceBusClient innerClient) = arg;

0 commit comments

Comments
 (0)