1- namespace Microsoft . eShopOnContainers . BuildingBlocks . EventBusServiceBus
1+ using Azure . Messaging . ServiceBus ;
2+ using Azure . Messaging . ServiceBus . Administration ;
3+ using Autofac ;
4+ using Microsoft . eShopOnContainers . BuildingBlocks . EventBus ;
5+ using Microsoft . eShopOnContainers . BuildingBlocks . EventBus . Abstractions ;
6+ using Microsoft . eShopOnContainers . BuildingBlocks . EventBus . Events ;
7+ using Microsoft . Extensions . Logging ;
8+ using System ;
9+ using System . Text . Json ;
10+ using System . Threading . Tasks ;
11+ using System . Text ;
12+
13+ namespace Microsoft . eShopOnContainers . BuildingBlocks . EventBusServiceBus
214{
3- using Autofac ;
4- using Microsoft . Azure . ServiceBus ;
5- using Microsoft . eShopOnContainers . BuildingBlocks . EventBus ;
6- using Microsoft . eShopOnContainers . BuildingBlocks . EventBus . Abstractions ;
7- using Microsoft . eShopOnContainers . BuildingBlocks . EventBus . Events ;
8- using Microsoft . Extensions . Logging ;
9- using System ;
10- using System . Text ;
11- using System . Text . Json ;
12- using System . Threading . Tasks ;
13-
14- public class EventBusServiceBus : IEventBus
15+ public class EventBusServiceBus : IEventBus , IDisposable
1516 {
1617 private readonly IServiceBusPersisterConnection _serviceBusPersisterConnection ;
1718 private readonly ILogger < EventBusServiceBus > _logger ;
1819 private readonly IEventBusSubscriptionsManager _subsManager ;
1920 private readonly ILifetimeScope _autofac ;
21+ private readonly string _topicName = "eshop_event_bus" ;
22+ private readonly string _subscriptionName ;
23+ private ServiceBusSender _sender ;
24+ private ServiceBusProcessor _processor ;
2025 private readonly string AUTOFAC_SCOPE_NAME = "eshop_event_bus" ;
2126 private const string INTEGRATION_EVENT_SUFFIX = "IntegrationEvent" ;
2227
2328 public EventBusServiceBus ( IServiceBusPersisterConnection serviceBusPersisterConnection ,
24- ILogger < EventBusServiceBus > logger , IEventBusSubscriptionsManager subsManager , ILifetimeScope autofac )
29+ ILogger < EventBusServiceBus > logger , IEventBusSubscriptionsManager subsManager , ILifetimeScope autofac , string subscriptionClientName )
2530 {
2631 _serviceBusPersisterConnection = serviceBusPersisterConnection ;
2732 _logger = logger ?? throw new ArgumentNullException ( nameof ( logger ) ) ;
2833 _subsManager = subsManager ?? new InMemoryEventBusSubscriptionsManager ( ) ;
2934 _autofac = autofac ;
35+ _subscriptionName = subscriptionClientName ;
36+ _sender = _serviceBusPersisterConnection . TopicClient . CreateSender ( _topicName ) ;
37+ ServiceBusProcessorOptions options = new ServiceBusProcessorOptions { MaxConcurrentCalls = 10 , AutoCompleteMessages = false } ;
38+ _processor = _serviceBusPersisterConnection . TopicClient . CreateProcessor ( _topicName , _subscriptionName , options ) ;
3039
3140 RemoveDefaultRule ( ) ;
32- RegisterSubscriptionClientMessageHandler ( ) ;
41+ RegisterSubscriptionClientMessageHandlerAsync ( ) . GetAwaiter ( ) . GetResult ( ) ;
3342 }
3443
3544 public void Publish ( IntegrationEvent @event )
@@ -38,14 +47,14 @@ public void Publish(IntegrationEvent @event)
3847 var jsonMessage = JsonSerializer . Serialize ( @event , @event . GetType ( ) ) ;
3948 var body = Encoding . UTF8 . GetBytes ( jsonMessage ) ;
4049
41- var message = new Message
50+ var message = new ServiceBusMessage
4251 {
4352 MessageId = Guid . NewGuid ( ) . ToString ( ) ,
44- Body = body ,
45- Label = eventName ,
53+ Body = new BinaryData ( body ) ,
54+ Subject = eventName ,
4655 } ;
4756
48- _serviceBusPersisterConnection . TopicClient . SendAsync ( message )
57+ _sender . SendMessageAsync ( message )
4958 . GetAwaiter ( )
5059 . GetResult ( ) ;
5160 }
@@ -69,9 +78,9 @@ public void Subscribe<T, TH>()
6978 {
7079 try
7180 {
72- _serviceBusPersisterConnection . SubscriptionClient . AddRuleAsync ( new RuleDescription
81+ _serviceBusPersisterConnection . AdministrationClient . CreateRuleAsync ( _topicName , _subscriptionName , new CreateRuleOptions
7382 {
74- Filter = new CorrelationFilter { Label = eventName } ,
83+ Filter = new CorrelationRuleFilter ( ) { Subject = eventName } ,
7584 Name = eventName
7685 } ) . GetAwaiter ( ) . GetResult ( ) ;
7786 }
@@ -95,12 +104,12 @@ public void Unsubscribe<T, TH>()
95104 try
96105 {
97106 _serviceBusPersisterConnection
98- . SubscriptionClient
99- . RemoveRuleAsync ( eventName )
107+ . AdministrationClient
108+ . DeleteRuleAsync ( _topicName , _subscriptionName , eventName )
100109 . GetAwaiter ( )
101110 . GetResult ( ) ;
102111 }
103- catch ( MessagingEntityNotFoundException )
112+ catch ( ServiceBusException ex ) when ( ex . Reason == ServiceBusFailureReason . MessagingEntityNotFound )
104113 {
105114 _logger . LogWarning ( "The messaging entity {eventName} Could not be found." , eventName ) ;
106115 }
@@ -118,32 +127,35 @@ public void UnsubscribeDynamic<TH>(string eventName)
118127 _subsManager . RemoveDynamicSubscription < TH > ( eventName ) ;
119128 }
120129
121- public void Dispose ( )
122- {
123- _subsManager . Clear ( ) ;
124- }
125-
126- private void RegisterSubscriptionClientMessageHandler ( )
130+ private async Task RegisterSubscriptionClientMessageHandlerAsync ( )
127131 {
128- _serviceBusPersisterConnection . SubscriptionClient . RegisterMessageHandler (
129- async ( message , token ) =>
132+ _processor . ProcessMessageAsync +=
133+ async ( args ) =>
130134 {
131- var eventName = $ "{ message . Label } { INTEGRATION_EVENT_SUFFIX } ";
132- var messageData = Encoding . UTF8 . GetString ( message . Body ) ;
135+ var eventName = $ "{ args . Message . Subject } { INTEGRATION_EVENT_SUFFIX } ";
136+ string messageData = args . Message . Body . ToString ( ) ;
133137
134138 // Complete the message so that it is not received again.
135139 if ( await ProcessEvent ( eventName , messageData ) )
136140 {
137- await _serviceBusPersisterConnection . SubscriptionClient . CompleteAsync ( message . SystemProperties . LockToken ) ;
141+ await args . CompleteMessageAsync ( args . Message ) ;
138142 }
139- } ,
140- new MessageHandlerOptions ( ExceptionReceivedHandler ) { MaxConcurrentCalls = 10 , AutoComplete = false } ) ;
143+ } ;
144+
145+ _processor . ProcessErrorAsync += ErrorHandler ;
146+ await _processor . StartProcessingAsync ( ) ;
147+ }
148+
149+ public void Dispose ( )
150+ {
151+ _subsManager . Clear ( ) ;
152+ _processor . CloseAsync ( ) . GetAwaiter ( ) . GetResult ( ) ;
141153 }
142154
143- private Task ExceptionReceivedHandler ( ExceptionReceivedEventArgs exceptionReceivedEventArgs )
155+ private Task ErrorHandler ( ProcessErrorEventArgs args )
144156 {
145- var ex = exceptionReceivedEventArgs . Exception ;
146- var context = exceptionReceivedEventArgs . ExceptionReceivedContext ;
157+ var ex = args . Exception ;
158+ var context = args . ErrorSource ;
147159
148160 _logger . LogError ( ex , "ERROR handling message: {ExceptionMessage} - Context: {@ExceptionContext}" , ex . Message , context ) ;
149161
@@ -189,14 +201,14 @@ private void RemoveDefaultRule()
189201 try
190202 {
191203 _serviceBusPersisterConnection
192- . SubscriptionClient
193- . RemoveRuleAsync ( RuleDescription . DefaultRuleName )
204+ . AdministrationClient
205+ . DeleteRuleAsync ( _topicName , _subscriptionName , RuleProperties . DefaultRuleName )
194206 . GetAwaiter ( )
195207 . GetResult ( ) ;
196208 }
197- catch ( MessagingEntityNotFoundException )
209+ catch ( ServiceBusException ex ) when ( ex . Reason == ServiceBusFailureReason . MessagingEntityNotFound )
198210 {
199- _logger . LogWarning ( "The messaging entity {DefaultRuleName} Could not be found." , RuleDescription . DefaultRuleName ) ;
211+ _logger . LogWarning ( "The messaging entity {DefaultRuleName} Could not be found." , RuleProperties . DefaultRuleName ) ;
200212 }
201213 }
202214 }
0 commit comments