@@ -7,12 +7,13 @@ public class ServiceBusMessageBus : MessageBusBase<ServiceBusMessageBusSettings>
77 private readonly ILogger _logger ;
88 private ServiceBusClient _client ;
99 private SafeDictionaryWrapper < string , ServiceBusSender > _producerByPath ;
10+ private ServiceBusTopologyService _topologyService ;
1011
1112 public ServiceBusMessageBus ( MessageBusSettings settings , ServiceBusMessageBusSettings providerSettings )
1213 : base ( settings , providerSettings )
1314 {
1415 _logger = LoggerFactory . CreateLogger < ServiceBusMessageBus > ( ) ;
15-
16+ _topologyService = new ServiceBusTopologyService ( LoggerFactory . CreateLogger < ServiceBusTopologyService > ( ) , Settings , ProviderSettings ) ;
1617 OnBuildProvider ( ) ;
1718 }
1819
@@ -46,9 +47,7 @@ protected override async ValueTask DisposeAsyncCore()
4647 public override async Task ProvisionTopology ( )
4748 {
4849 await base . ProvisionTopology ( ) ;
49-
50- var provisioningService = new ServiceBusTopologyService ( LoggerFactory . CreateLogger < ServiceBusTopologyService > ( ) , Settings , ProviderSettings ) ;
51- await provisioningService . ProvisionTopology ( ) ; // provisioning happens asynchronously
50+ await _topologyService . ProvisionTopology ( ) ; // provisioning happens asynchronously
5251 }
5352
5453 #region Overrides of MessageBusBase
@@ -133,15 +132,48 @@ public override async Task ProduceToTransport(object message, Type messageType,
133132 {
134133 var transportMessage = GetTransportMessage ( message , messageType , messageHeaders , path ) ;
135134 var senderClient = _producerByPath . GetOrAdd ( path ) ;
136- await senderClient . SendMessageAsync ( transportMessage , cancellationToken ) . ConfigureAwait ( false ) ;
137- _logger . LogDebug ( "Delivered item {Message} of type {MessageType} to {Path}" , message , messageType ? . Name , path ) ;
135+
136+ try
137+ {
138+ await senderClient . SendMessageAsync ( transportMessage , cancellationToken ) . ConfigureAwait ( false ) ;
139+ }
140+ catch ( ServiceBusException ex ) when ( ex . Reason == ServiceBusFailureReason . MessagingEntityNotFound )
141+ {
142+ await EnsurePathExists ( messageType , path ) ;
143+ // Resend messages after the path has been created
144+ await senderClient . SendMessageAsync ( transportMessage , cancellationToken ) . ConfigureAwait ( false ) ;
145+ }
146+
147+ _logger . LogDebug ( "Delivered message {Message} of type {MessageType} to {Path}" , message , messageType ? . Name , path ) ;
138148 }
139149 catch ( Exception ex ) when ( ex is not ProducerMessageBusException && ex is not TaskCanceledException )
140150 {
141151 throw new ProducerMessageBusException ( GetProducerErrorMessage ( path , message , messageType , ex ) , ex ) ;
142152 }
143153 }
144154
155+ /// <summary>
156+ /// When the topic or queue does not exist, we can try to create it.
157+ /// This happens in cases where the path is dynamically set upon publish/send (different from default path)
158+ /// </summary>
159+ /// <param name="messageType"></param>
160+ /// <param name="path">topic or queue</param>
161+ /// <returns></returns>
162+ private async Task EnsurePathExists ( Type messageType , string path )
163+ {
164+ var producerSettings = GetProducerSettings ( messageType ) ;
165+ if ( producerSettings . PathKind == PathKind . Topic )
166+ {
167+ _logger . LogInformation ( "Topic {Path} does not exist, trying to create it" , path ) ;
168+ await _topologyService . TryCreateTopic ( path , ProviderSettings . TopologyProvisioning . CanProducerCreateTopic ) ;
169+ }
170+ else
171+ {
172+ _logger . LogInformation ( "Queue {Path} does not exist, trying to create it" , path ) ;
173+ await _topologyService . TryCreateQueue ( path , ProviderSettings . TopologyProvisioning . CanProducerCreateQueue ) ;
174+ }
175+ }
176+
145177 public override async Task < ProduceToTransportBulkResult < T > > ProduceToTransportBulk < T > ( IReadOnlyCollection < T > envelopes , string path , IMessageBusTarget targetBus , CancellationToken cancellationToken )
146178 {
147179 AssertActive ( ) ;
@@ -150,7 +182,21 @@ Task SendBatchAsync(ServiceBusSender senderClient, ServiceBusMessageBatch batch,
150182 Retry . WithDelay (
151183 operation : async cancellationToken =>
152184 {
153- await senderClient . SendMessagesAsync ( batch , cancellationToken ) . ConfigureAwait ( false ) ;
185+ try
186+ {
187+ await senderClient . SendMessagesAsync ( batch , cancellationToken ) . ConfigureAwait ( false ) ;
188+ }
189+ catch ( ServiceBusException ex ) when ( ex . Reason == ServiceBusFailureReason . MessagingEntityNotFound )
190+ {
191+ var messageType = envelopes . FirstOrDefault ( ) . MessageType ;
192+ if ( messageType != null )
193+ {
194+ await EnsurePathExists ( messageType , path ) ;
195+ // Resend messages after the path has been created
196+ await senderClient . SendMessagesAsync ( batch , cancellationToken ) . ConfigureAwait ( false ) ;
197+ }
198+ }
199+
154200 _logger . LogDebug ( "Batch of {BatchSize} message(s) dispatched to {Path} ({SizeInBytes} bytes)" , batch . Count , path , batch . SizeInBytes ) ;
155201 } ,
156202 shouldRetry : ( exception , attempt ) =>
0 commit comments