@@ -16,23 +16,20 @@ public class ServiceBusLifeCycleEventHub : ILifeCycleEventHub
16
16
private readonly ITopicClient _topicClient ;
17
17
private readonly ILogger _logger ;
18
18
private readonly ISubscriptionClient _subscriptionClient ;
19
- private readonly ICollection < Action < LifeCycleEvent > > _subscribers =
20
- new HashSet < Action < LifeCycleEvent > > ( ) ;
21
- private readonly JsonSerializerSettings _serializerSettings =
22
- new JsonSerializerSettings
23
- {
24
- TypeNameHandling = TypeNameHandling . All ,
25
- ReferenceLoopHandling = ReferenceLoopHandling . Error ,
26
- } ;
19
+ private readonly ICollection < Action < LifeCycleEvent > > _subscribers = new HashSet < Action < LifeCycleEvent > > ( ) ;
20
+ private readonly JsonSerializerSettings _serializerSettings = new JsonSerializerSettings
21
+ {
22
+ TypeNameHandling = TypeNameHandling . All ,
23
+ ReferenceLoopHandling = ReferenceLoopHandling . Error ,
24
+ } ;
27
25
28
26
public ServiceBusLifeCycleEventHub (
29
27
string connectionString ,
30
28
string topicName ,
31
29
string subscriptionName ,
32
30
ILoggerFactory logFactory )
33
31
{
34
- _subscriptionClient = new SubscriptionClient (
35
- connectionString , topicName , subscriptionName ) ;
32
+ _subscriptionClient = new SubscriptionClient ( connectionString , topicName , subscriptionName ) ;
36
33
_topicClient = new TopicClient ( connectionString , topicName ) ;
37
34
_logger = logFactory . CreateLogger ( GetType ( ) ) ;
38
35
}
@@ -55,14 +52,12 @@ public void Subscribe(Action<LifeCycleEvent> action)
55
52
56
53
public Task Start ( )
57
54
{
58
- var sessionHandlerOptions = new SessionHandlerOptions ( ExceptionHandler )
55
+ var messageHandlerOptions = new MessageHandlerOptions ( ExceptionHandler )
59
56
{
60
- MaxConcurrentSessions = 1 ,
61
57
AutoComplete = false
62
58
} ;
63
59
64
- _subscriptionClient . RegisterSessionHandler (
65
- MessageHandler , sessionHandlerOptions ) ;
60
+ _subscriptionClient . RegisterMessageHandler ( MessageHandler , messageHandlerOptions ) ;
66
61
67
62
return Task . CompletedTask ;
68
63
}
@@ -73,10 +68,7 @@ public async Task Stop()
73
68
await _subscriptionClient . CloseAsync ( ) ;
74
69
}
75
70
76
- private async Task MessageHandler (
77
- IMessageSession messageSession ,
78
- Message message ,
79
- CancellationToken cancellationToken )
71
+ private async Task MessageHandler ( Message message , CancellationToken cancellationToken )
80
72
{
81
73
try
82
74
{
@@ -92,15 +84,13 @@ await _subscriptionClient
92
84
}
93
85
catch
94
86
{
95
- await _subscriptionClient
96
- . AbandonAsync ( message . SystemProperties . LockToken ) ;
87
+ await _subscriptionClient . AbandonAsync ( message . SystemProperties . LockToken ) ;
97
88
}
98
89
}
99
90
100
91
private Task ExceptionHandler ( ExceptionReceivedEventArgs arg )
101
92
{
102
- _logger . LogWarning (
103
- default , arg . Exception , "Error on receiving events" ) ;
93
+ _logger . LogWarning ( default , arg . Exception , "Error on receiving events" ) ;
104
94
105
95
return Task . CompletedTask ;
106
96
}
0 commit comments