@@ -28,32 +28,39 @@ async Task<TResponse> IOutputChannel.SendMessageAsync<TResponse>(IMessage messag
2828 ArgumentNullException . ThrowIfNull ( responseReader ) ;
2929
3030 var tokenSource = CombineTokens ( [ LifecycleToken , token ] ) ;
31- do
31+ try
3232 {
33- var leader = Leader ?? throw new QuorumUnreachableException ( ) ;
34- try
35- {
36- return await ( leader . IsRemote
37- ? leader . SendMessageAsync ( message , responseReader , true , tokenSource . Token )
38- : TryReceiveMessage ( leader , message , messageHandlers , responseReader , tokenSource . Token ) )
39- . ConfigureAwait ( false ) ;
40- }
41- catch ( MemberUnavailableException e )
42- {
43- Logger . FailedToRouteMessage ( message . Name , e ) ;
44- }
45- catch ( UnexpectedStatusCodeException e ) when ( e . StatusCode is HttpStatusCode . BadRequest )
46- {
47- // keep in sync with ReceiveMessage behavior
48- Logger . FailedToRouteMessage ( message . Name , e ) ;
49- }
50- catch ( OperationCanceledException e ) when ( tokenSource . Token == e . CancellationToken )
33+ do
5134 {
52- break ;
53- }
54- } while ( tokenSource . Token is { IsCancellationRequested : false } ) ;
55-
56- throw new OperationCanceledException ( tokenSource . CancellationOrigin ) ;
35+ var leader = Leader ?? throw new QuorumUnreachableException ( ) ;
36+ try
37+ {
38+ return await ( leader . IsRemote
39+ ? leader . SendMessageAsync ( message , responseReader , true , tokenSource . Token )
40+ : TryReceiveMessage ( leader , message , messageHandlers , responseReader , tokenSource . Token ) )
41+ . ConfigureAwait ( false ) ;
42+ }
43+ catch ( MemberUnavailableException e )
44+ {
45+ Logger . FailedToRouteMessage ( message . Name , e ) ;
46+ }
47+ catch ( UnexpectedStatusCodeException e ) when ( e . StatusCode is HttpStatusCode . BadRequest )
48+ {
49+ // keep in sync with ReceiveMessage behavior
50+ Logger . FailedToRouteMessage ( message . Name , e ) ;
51+ }
52+ catch ( OperationCanceledException e ) when ( tokenSource . Token == e . CancellationToken )
53+ {
54+ break ;
55+ }
56+ } while ( tokenSource is { Token . IsCancellationRequested : false } ) ;
57+
58+ throw new OperationCanceledException ( tokenSource . CancellationOrigin ) ;
59+ }
60+ finally
61+ {
62+ await tokenSource . DisposeAsync ( ) . ConfigureAwait ( false ) ;
63+ }
5764
5865 static async Task < TResponse > TryReceiveMessage ( RaftClusterMember sender , IMessage message , IEnumerable < IInputChannel > handlers , MessageReader < TResponse > responseReader , CancellationToken token )
5966 {
@@ -66,33 +73,40 @@ async Task<TResponse> IOutputChannel.SendMessageAsync<TResponse>(IMessage messag
6673 {
6774 ArgumentNullException . ThrowIfNull ( message ) ;
6875
69- using var tokenSource = CombineTokens ( [ token , LifecycleToken ] ) ;
70- do
76+ var tokenSource = CombineTokens ( [ token , LifecycleToken ] ) ;
77+ try
7178 {
72- var leader = Leader ?? throw new QuorumUnreachableException ( ) ;
73- try
74- {
75- return await ( leader . IsRemote
76- ? leader . SendMessageAsync < TResponse > ( message , true , tokenSource . Token )
77- : TryReceiveMessage ( leader , message , messageHandlers , tokenSource . Token ) )
78- . ConfigureAwait ( false ) ;
79- }
80- catch ( MemberUnavailableException e )
81- {
82- Logger . FailedToRouteMessage ( message . Name , e ) ;
83- }
84- catch ( UnexpectedStatusCodeException e ) when ( e . StatusCode is HttpStatusCode . BadRequest )
85- {
86- // keep in sync with ReceiveMessage behavior
87- Logger . FailedToRouteMessage ( message . Name , e ) ;
88- }
89- catch ( OperationCanceledException e ) when ( tokenSource . Token == e . CancellationToken )
79+ do
9080 {
91- throw new OperationCanceledException ( e . Message , e , tokenSource . CancellationOrigin ) ;
92- }
93- } while ( tokenSource . Token is { IsCancellationRequested : false } ) ;
94-
95- throw new OperationCanceledException ( tokenSource . CancellationOrigin ) ;
81+ var leader = Leader ?? throw new QuorumUnreachableException ( ) ;
82+ try
83+ {
84+ return await ( leader . IsRemote
85+ ? leader . SendMessageAsync < TResponse > ( message , true , tokenSource . Token )
86+ : TryReceiveMessage ( leader , message , messageHandlers , tokenSource . Token ) )
87+ . ConfigureAwait ( false ) ;
88+ }
89+ catch ( MemberUnavailableException e )
90+ {
91+ Logger . FailedToRouteMessage ( message . Name , e ) ;
92+ }
93+ catch ( UnexpectedStatusCodeException e ) when ( e . StatusCode is HttpStatusCode . BadRequest )
94+ {
95+ // keep in sync with ReceiveMessage behavior
96+ Logger . FailedToRouteMessage ( message . Name , e ) ;
97+ }
98+ catch ( OperationCanceledException e ) when ( tokenSource . Token == e . CancellationToken )
99+ {
100+ throw new OperationCanceledException ( e . Message , e , tokenSource . CancellationOrigin ) ;
101+ }
102+ } while ( tokenSource is { Token . IsCancellationRequested : false } ) ;
103+
104+ throw new OperationCanceledException ( tokenSource . CancellationOrigin ) ;
105+ }
106+ finally
107+ {
108+ await tokenSource . DisposeAsync ( ) . ConfigureAwait ( false ) ;
109+ }
96110
97111 static async Task < TResponse > TryReceiveMessage ( RaftClusterMember sender , IMessage message , IEnumerable < IInputChannel > handlers , CancellationToken token )
98112 {
@@ -107,44 +121,51 @@ async Task IOutputChannel.SendSignalAsync(IMessage message, CancellationToken to
107121
108122 // keep the same message between retries for correct identification of duplicate messages
109123 var signal = new CustomMessage ( LocalMemberId , message , true ) { RespectLeadership = true } ;
110- using var tokenSource = CombineTokens ( [ token , LifecycleToken ] ) ;
111- do
124+ var tokenSource = CombineTokens ( [ token , LifecycleToken ] ) ;
125+ try
112126 {
113- var leader = Leader ?? throw new QuorumUnreachableException ( ) ;
114- try
115- {
116- var response = leader . IsRemote
117- ? leader . SendSignalAsync ( signal , tokenSource . Token )
118- : ( messageHandlers . TryReceiveSignal ( leader , signal . Message , null , tokenSource . Token ) ??
119- throw new UnexpectedStatusCodeException ( new NotImplementedException ( ) ) ) ;
120- await response . ConfigureAwait ( false ) ;
121- return ;
122- }
123- catch ( MemberUnavailableException e )
124- {
125- Logger . FailedToRouteMessage ( message . Name , e ) ;
126- }
127- catch ( UnexpectedStatusCodeException e ) when ( e . StatusCode is HttpStatusCode . ServiceUnavailable )
128- {
129- // keep in sync with ReceiveMessage behavior
130- Logger . FailedToRouteMessage ( message . Name , e ) ;
131- }
132- catch ( OperationCanceledException e ) when ( tokenSource . Token == e . CancellationToken )
127+ do
133128 {
134- throw new OperationCanceledException ( e . Message , e , tokenSource . CancellationOrigin ) ;
135- }
136- } while ( tokenSource . Token is { IsCancellationRequested : false } ) ;
137-
138- throw new OperationCanceledException ( tokenSource . CancellationOrigin ) ;
129+ var leader = Leader ?? throw new QuorumUnreachableException ( ) ;
130+ try
131+ {
132+ var response = leader . IsRemote
133+ ? leader . SendSignalAsync ( signal , tokenSource . Token )
134+ : ( messageHandlers . TryReceiveSignal ( leader , signal . Message , null , tokenSource . Token ) ??
135+ throw new UnexpectedStatusCodeException ( new NotImplementedException ( ) ) ) ;
136+ await response . ConfigureAwait ( false ) ;
137+ return ;
138+ }
139+ catch ( MemberUnavailableException e )
140+ {
141+ Logger . FailedToRouteMessage ( message . Name , e ) ;
142+ }
143+ catch ( UnexpectedStatusCodeException e ) when ( e . StatusCode is HttpStatusCode . ServiceUnavailable )
144+ {
145+ // keep in sync with ReceiveMessage behavior
146+ Logger . FailedToRouteMessage ( message . Name , e ) ;
147+ }
148+ catch ( OperationCanceledException e ) when ( tokenSource . Token == e . CancellationToken )
149+ {
150+ throw new OperationCanceledException ( e . Message , e , tokenSource . CancellationOrigin ) ;
151+ }
152+ } while ( tokenSource is { Token . IsCancellationRequested : false } ) ;
153+
154+ throw new OperationCanceledException ( tokenSource . CancellationOrigin ) ;
155+ }
156+ finally
157+ {
158+ await tokenSource . DisposeAsync ( ) . ConfigureAwait ( false ) ;
159+ }
139160 }
140161
141162 IOutputChannel IMessageBus . LeaderRouter => this ;
142163
143164 private static async Task ReceiveOneWayMessageFastAckAsync ( ISubscriber sender , IMessage message , IEnumerable < IInputChannel > handlers , HttpResponse response , CancellationToken token )
144165 {
145- IInputChannel ? handler = handlers . FirstOrDefault ( message . IsSignalSupported ) ;
146- if ( handler is null )
166+ if ( handlers . FirstOrDefault ( message . IsSignalSupported ) is not { } handler )
147167 return ;
168+
148169 IBufferedMessage buffered = message . Length is { } length and < FileMessage . MinSize
149170 ? new InMemoryMessage ( message . Name , message . Type , Convert . ToInt32 ( length ) )
150171 : new FileMessage ( message . Name , message . Type ) ;
@@ -156,8 +177,14 @@ private static async Task ReceiveOneWayMessageFastAckAsync(ISubscriber sender, I
156177 // OnCompleted callback
157178 async Task ReceiveSignal ( )
158179 {
159- using ( buffered )
180+ try
181+ {
160182 await handler . ReceiveSignal ( sender , buffered , null , token ) . ConfigureAwait ( false ) ;
183+ }
184+ finally
185+ {
186+ await buffered . DisposeAsync ( ) . ConfigureAwait ( false ) ;
187+ }
161188 }
162189 }
163190
@@ -168,7 +195,8 @@ private static Task ReceiveOneWayMessageAsync(ISubscriber sender, CustomMessage
168195 // drop duplicated request
169196 if ( response . HttpContext . Features . Get < DuplicateRequestDetector > ( ) ? . IsDuplicated ( request ) ?? false )
170197 return Task . CompletedTask ;
171- Task ? task = reliable ?
198+
199+ var task = reliable ?
172200 handlers . TryReceiveSignal ( sender , request . Message , response . HttpContext , token ) :
173201 ReceiveOneWayMessageFastAckAsync ( sender , request . Message , handlers , response , token ) ;
174202 if ( task is null )
@@ -180,42 +208,45 @@ private static Task ReceiveOneWayMessageAsync(ISubscriber sender, CustomMessage
180208 return task ;
181209 }
182210
183- private static async Task ReceiveMessageAsync ( ISubscriber sender , CustomMessage request , IEnumerable < IInputChannel > handlers , HttpResponse response , CancellationToken token )
211+ private static async Task ReceiveMessageAsync ( ISubscriber sender , CustomMessage request , IEnumerable < IInputChannel > handlers ,
212+ HttpResponse response , CancellationToken token )
184213 {
185214 response . StatusCode = StatusCodes . Status200OK ;
186- var task = handlers . TryReceiveMessage ( sender , request . Message , response . HttpContext , token ) ;
187- if ( task is null )
188- response . StatusCode = StatusCodes . Status501NotImplemented ;
189- else
215+ if ( handlers . TryReceiveMessage ( sender , request . Message , response . HttpContext , token ) is { } task )
216+ {
190217 await CustomMessage . SaveResponseAsync ( response , await task . ConfigureAwait ( false ) , token ) . ConfigureAwait ( false ) ;
218+ }
219+ else
220+ {
221+ response . StatusCode = StatusCodes . Status501NotImplemented ;
222+ }
191223 }
192224
193225 private Task ReceiveMessageAsync ( CustomMessage message , HttpResponse response , CancellationToken token )
194226 {
195- var sender = TryGetMember ( message . Sender ) ;
196-
197- Task result ;
198- if ( sender is null )
227+ Task task ;
228+ if ( TryGetMember ( message . Sender ) is not { } sender )
199229 {
230+ sender = null ;
200231 response . StatusCode = StatusCodes . Status404NotFound ;
201- result = Task . CompletedTask ;
232+ task = Task . CompletedTask ;
202233 }
203234 else if ( ! message . RespectLeadership )
204235 {
205- result = ReceiveMessageAsync ( sender , message , response , token ) ;
236+ task = ReceiveMessageAsync ( sender , message , response , token ) ;
206237 }
207238 else if ( LeadershipToken is { IsCancellationRequested : false } lt )
208239 {
209- result = ReceiveMessageAsync ( sender , message , response , lt , token ) ;
240+ task = ReceiveMessageAsync ( sender , message , response , lt , token ) ;
210241 }
211242 else
212243 {
213244 response . StatusCode = StatusCodes . Status503ServiceUnavailable ;
214- result = Task . CompletedTask ;
245+ task = Task . CompletedTask ;
215246 }
216247
217248 sender ? . Touch ( ) ;
218- return result ;
249+ return task ;
219250 }
220251
221252 private async Task ReceiveMessageAsync ( RaftClusterMember sender ,
@@ -258,17 +289,15 @@ static Task BadRequest(HttpResponse response)
258289
259290 private async Task VoteAsync ( RequestVoteMessage request , HttpResponse response , CancellationToken token )
260291 {
261- var sender = TryGetMember ( request . Sender ) ;
262-
263292 Result < bool > result ;
264- if ( sender is null )
293+ if ( TryGetMember ( request . Sender ) is { } sender )
265294 {
266- result = new ( ) { Term = Term } ;
295+ sender . Touch ( ) ;
296+ result = await VoteAsync ( request . Sender , request . ConsensusTerm , request . LastLogIndex , request . LastLogTerm , token ) . ConfigureAwait ( false ) ;
267297 }
268298 else
269299 {
270- sender . Touch ( ) ;
271- result = await VoteAsync ( request . Sender , request . ConsensusTerm , request . LastLogIndex , request . LastLogTerm , token ) . ConfigureAwait ( false ) ;
300+ result = new ( ) { Term = Term } ;
272301 }
273302
274303 await RequestVoteMessage . SaveResponseAsync ( response , result , token ) . ConfigureAwait ( false ) ;
@@ -277,22 +306,21 @@ private async Task VoteAsync(RequestVoteMessage request, HttpResponse response,
277306 private async Task PreVoteAsync ( PreVoteMessage request , HttpResponse response , CancellationToken token )
278307 {
279308 TryGetMember ( request . Sender ) ? . Touch ( ) ;
280- await PreVoteMessage . SaveResponseAsync ( response , await PreVoteAsync ( request . Sender , request . ConsensusTerm + 1L , request . LastLogIndex , request . LastLogTerm , token ) . ConfigureAwait ( false ) , token ) . ConfigureAwait ( false ) ;
309+ await PreVoteMessage . SaveResponseAsync ( response ,
310+ await PreVoteAsync ( request . Sender , request . ConsensusTerm + 1L , request . LastLogIndex , request . LastLogTerm , token ) . ConfigureAwait ( false ) ,
311+ token ) . ConfigureAwait ( false ) ;
281312 }
282313
283314 private async Task ResignAsync ( ResignMessage request , HttpResponse response , CancellationToken token )
284315 {
285- var sender = TryGetMember ( request . Sender ) ;
316+ TryGetMember ( request . Sender ) ? . Touch ( ) ;
286317 await ResignMessage . SaveResponseAsync ( response , await ResignAsync ( token ) . ConfigureAwait ( false ) , token ) . ConfigureAwait ( false ) ;
287- sender ? . Touch ( ) ;
288318 }
289319
290320 private Task GetMetadataAsync ( MetadataMessage request , HttpResponse response , CancellationToken token )
291321 {
292- var sender = TryGetMember ( request . Sender ) ;
293- var result = MetadataMessage . SaveResponseAsync ( response , metadata , token ) ;
294- sender ? . Touch ( ) ;
295- return result ;
322+ TryGetMember ( request . Sender ) ? . Touch ( ) ;
323+ return MetadataMessage . SaveResponseAsync ( response , metadata , token ) ;
296324 }
297325
298326 private async Task AppendEntriesAsync ( HttpRequest request , HttpResponse response , CancellationToken token )
@@ -306,14 +334,19 @@ private async Task AppendEntriesAsync(HttpRequest request, HttpResponse response
306334 return ;
307335 }
308336
309- using var configuration = new ReceivedClusterConfiguration ( ( int ) message . ConfigurationLength ) { Fingerprint = message . ConfigurationFingerprint } ;
310- await configurationReader ( configuration . Content , token ) . ConfigureAwait ( false ) ;
311-
312- await using ( entries . ConfigureAwait ( false ) )
337+ var configuration = new ReceivedClusterConfiguration ( ( int ) message . ConfigurationLength ) { Fingerprint = message . ConfigurationFingerprint } ;
338+ try
313339 {
314- var result = await AppendEntriesAsync ( message . Sender , message . ConsensusTerm , entries , message . PrevLogIndex , message . PrevLogTerm , message . CommitIndex , configuration , message . ApplyConfiguration , token ) . ConfigureAwait ( false ) ;
340+ await configurationReader ( configuration . Content , token ) . ConfigureAwait ( false ) ;
341+ var result = await AppendEntriesAsync ( message . Sender , message . ConsensusTerm , entries , message . PrevLogIndex , message . PrevLogTerm ,
342+ message . CommitIndex , configuration , message . ApplyConfiguration , token ) . ConfigureAwait ( false ) ;
315343 await AppendEntriesMessage . SaveResponseAsync ( response , result , token ) . ConfigureAwait ( false ) ;
316344 }
345+ finally
346+ {
347+ await entries . DisposeAsync ( ) . ConfigureAwait ( false ) ;
348+ configuration . Dispose ( ) ;
349+ }
317350 }
318351
319352 private async Task InstallSnapshotAsync ( InstallSnapshotMessage message , HttpResponse response , CancellationToken token )
0 commit comments