Skip to content

Commit bfba294

Browse files
committed
Remove more synchronous code.
Part of the fix for #1472 * Remove `Close` from `ChannelBase` * Remove unreachable methods in `ChannelBase` * Remove `_Private_ChannelClose` * Convert `ConnectionTuneOk` into `ConnectionTuneOkAsync` * Making `HandleChannleClose` async
1 parent 2c54f96 commit bfba294

File tree

5 files changed

+15
-147
lines changed

5 files changed

+15
-147
lines changed

projects/RabbitMQ.Client/client/framing/Channel.cs

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
// Copyright (c) 2007-2020 VMware, Inc. All rights reserved.
3030
//---------------------------------------------------------------------------
3131

32-
using System.Collections.Generic;
3332
using System.Threading;
3433
using System.Threading.Tasks;
3534
using RabbitMQ.Client.client.framing;
@@ -43,19 +42,16 @@ public Channel(ConnectionConfig config, ISession session) : base(config, session
4342
{
4443
}
4544

46-
public override void ConnectionTuneOk(ushort channelMax, uint frameMax, ushort heartbeat)
45+
public override Task ConnectionTuneOkAsync(ushort channelMax, uint frameMax, ushort heartbeat, CancellationToken cancellationToken)
4746
{
48-
ChannelSend(new ConnectionTuneOk(channelMax, frameMax, heartbeat));
47+
var method = new ConnectionTuneOk(channelMax, frameMax, heartbeat);
48+
return ModelSendAsync(method, cancellationToken).AsTask();
4949
}
5050

51-
public override void _Private_ChannelClose(ushort replyCode, string replyText, ushort classId, ushort methodId)
51+
public override Task _Private_ChannelCloseOkAsync(CancellationToken cancellationToken)
5252
{
53-
ChannelSend(new ChannelClose(replyCode, replyText, classId, methodId));
54-
}
55-
56-
public override void _Private_ChannelCloseOk()
57-
{
58-
ChannelSend(new ChannelCloseOk());
53+
var method = new ChannelCloseOk();
54+
return ModelSendAsync(method, cancellationToken).AsTask();
5955
}
6056

6157
public override void _Private_ChannelFlowOk(bool active)

projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -201,20 +201,6 @@ await _connection.RecoverConsumersAsync(this, newChannel, recordedEntitiesSemaph
201201
_innerChannel.RunRecoveryEventHandlers(this);
202202
}
203203

204-
public void Close(ushort replyCode, string replyText, bool abort)
205-
{
206-
ThrowIfDisposed();
207-
try
208-
{
209-
_innerChannel.Close(replyCode, replyText, abort);
210-
}
211-
finally
212-
{
213-
_connection.DeleteRecordedChannel(this,
214-
channelsSemaphoreHeld: false, recordedEntitiesSemaphoreHeld: false);
215-
}
216-
}
217-
218204
public Task CloseAsync(ushort replyCode, string replyText, bool abort)
219205
{
220206
ThrowIfDisposed();

projects/RabbitMQ.Client/client/impl/ChannelBase.cs

Lines changed: 7 additions & 121 deletions
Original file line numberDiff line numberDiff line change
@@ -211,52 +211,6 @@ protected void TakeOver(ChannelBase other)
211211
_recoveryWrapper.Takeover(other._recoveryWrapper);
212212
}
213213

214-
public void Close(ushort replyCode, string replyText, bool abort)
215-
{
216-
var reason = new ShutdownEventArgs(ShutdownInitiator.Application, replyCode, replyText);
217-
var k = new ShutdownContinuation();
218-
ChannelShutdown += k.OnConnectionShutdown;
219-
220-
try
221-
{
222-
ConsumerDispatcher.Quiesce();
223-
224-
if (SetCloseReason(reason))
225-
{
226-
_Private_ChannelClose(reason.ReplyCode, reason.ReplyText, reason.ClassId, reason.MethodId);
227-
}
228-
229-
k.Wait(TimeSpan.FromMilliseconds(10000));
230-
231-
ConsumerDispatcher.WaitForShutdown();
232-
}
233-
catch (AlreadyClosedException)
234-
{
235-
if (!abort)
236-
{
237-
throw;
238-
}
239-
}
240-
catch (IOException)
241-
{
242-
if (!abort)
243-
{
244-
throw;
245-
}
246-
}
247-
catch (Exception)
248-
{
249-
if (!abort)
250-
{
251-
throw;
252-
}
253-
}
254-
finally
255-
{
256-
ChannelShutdown -= k.OnConnectionShutdown;
257-
}
258-
}
259-
260214
public Task CloseAsync(ushort replyCode, string replyText, bool abort)
261215
{
262216
var args = new ShutdownEventArgs(ShutdownInitiator.Application, replyCode, replyText);
@@ -448,63 +402,7 @@ private void HandleCommand(in IncomingCommand cmd)
448402
}
449403
}
450404

451-
protected void ChannelRpc<TMethod>(in TMethod method, ProtocolCommandId returnCommandId)
452-
where TMethod : struct, IOutgoingAmqpMethod
453-
{
454-
var k = new SimpleBlockingRpcContinuation();
455-
IncomingCommand reply = default;
456-
_rpcSemaphore.Wait();
457-
try
458-
{
459-
Enqueue(k);
460-
Session.Transmit(in method);
461-
k.GetReply(ContinuationTimeout, out reply);
462-
463-
if (reply.CommandId != returnCommandId)
464-
{
465-
throw new UnexpectedMethodException(reply.CommandId, returnCommandId);
466-
}
467-
}
468-
finally
469-
{
470-
reply.ReturnBuffers();
471-
_rpcSemaphore.Release();
472-
}
473-
}
474-
475-
protected TReturn ChannelRpc<TMethod, TReturn>(in TMethod method, ProtocolCommandId returnCommandId, Func<RentedMemory, TReturn> createFunc)
476-
where TMethod : struct, IOutgoingAmqpMethod
477-
{
478-
IncomingCommand reply = default;
479-
try
480-
{
481-
var k = new SimpleBlockingRpcContinuation();
482-
483-
_rpcSemaphore.Wait();
484-
try
485-
{
486-
Enqueue(k);
487-
Session.Transmit(in method);
488-
k.GetReply(ContinuationTimeout, out reply);
489-
}
490-
finally
491-
{
492-
_rpcSemaphore.Release();
493-
}
494-
495-
if (reply.CommandId != returnCommandId)
496-
{
497-
throw new UnexpectedMethodException(reply.CommandId, returnCommandId);
498-
}
499-
500-
return createFunc(reply.Method);
501-
}
502-
finally
503-
{
504-
reply.ReturnBuffers();
505-
}
506-
}
507-
405+
// TODO REMOVE rabbitmq-dotnet-client-1472
508406
[MethodImpl(MethodImplOptions.AggressiveInlining)]
509407
protected void ChannelSend<T>(in T method) where T : struct, IOutgoingAmqpMethod
510408
{
@@ -517,19 +415,6 @@ protected ValueTask ModelSendAsync<T>(in T method, CancellationToken cancellatio
517415
return Session.TransmitAsync(in method, cancellationToken);
518416
}
519417

520-
[MethodImpl(MethodImplOptions.AggressiveInlining)]
521-
protected void ChannelSend<TMethod, THeader>(in TMethod method, in THeader header, ReadOnlyMemory<byte> body)
522-
where TMethod : struct, IOutgoingAmqpMethod
523-
where THeader : IAmqpHeader
524-
{
525-
if (!_flowControlBlock.IsSet)
526-
{
527-
_flowControlBlock.Wait();
528-
}
529-
530-
Session.Transmit(in method, in header, body);
531-
}
532-
533418
[MethodImpl(MethodImplOptions.AggressiveInlining)]
534419
protected ValueTask ModelSendAsync<TMethod, THeader>(in TMethod method, in THeader header, ReadOnlyMemory<byte> body, CancellationToken cancellationToken)
535420
where TMethod : struct, IOutgoingAmqpMethod
@@ -620,7 +505,7 @@ protected virtual void Dispose(bool disposing)
620505
// dispose unmanaged resources
621506
}
622507

623-
public abstract void ConnectionTuneOk(ushort channelMax, uint frameMax, ushort heartbeat);
508+
public abstract Task ConnectionTuneOkAsync(ushort channelMax, uint frameMax, ushort heartbeat, CancellationToken cancellationToken);
624509

625510
protected void HandleBasicAck(in IncomingCommand cmd)
626511
{
@@ -884,7 +769,8 @@ protected void HandleChannelClose(in IncomingCommand cmd)
884769

885770
Session.Close(CloseReason, false);
886771

887-
_Private_ChannelCloseOk();
772+
// TODO async
773+
_Private_ChannelCloseOkAsync(CancellationToken.None).EnsureCompleted();
888774
}
889775
finally
890776
{
@@ -1067,12 +953,12 @@ protected bool HandleQueueDeclareOk(in IncomingCommand cmd)
1067953
}
1068954
}
1069955

1070-
public abstract void _Private_ChannelClose(ushort replyCode, string replyText, ushort classId, ushort methodId);
1071-
1072-
public abstract void _Private_ChannelCloseOk();
956+
public abstract Task _Private_ChannelCloseOkAsync(CancellationToken cancellationToken);
1073957

958+
// TODO async
1074959
public abstract void _Private_ChannelFlowOk(bool active);
1075960

961+
// TODO async
1076962
public abstract void _Private_ConnectionCloseOk();
1077963

1078964
public abstract ValueTask BasicAckAsync(ulong deliveryTag, bool multiple);

projects/RabbitMQ.Client/client/impl/Connection.Commands.cs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -180,8 +180,7 @@ await _frameHandler.SendProtocolHeaderAsync(cancellationToken)
180180
uint heartbeatInSeconds = NegotiatedMaxValue((uint)_config.HeartbeatInterval.TotalSeconds, (uint)connectionTune.m_heartbeatInSeconds);
181181
Heartbeat = TimeSpan.FromSeconds(heartbeatInSeconds);
182182

183-
// TODO cancellationToken / async
184-
_channel0.ConnectionTuneOk(channelMax, frameMax, (ushort)Heartbeat.TotalSeconds);
183+
await _channel0.ConnectionTuneOkAsync(channelMax, frameMax, (ushort)Heartbeat.TotalSeconds, cancellationToken);
185184

186185
// TODO check for cancellation
187186
MaybeStartCredentialRefresher();

projects/RabbitMQ.Client/client/impl/Connection.Receive.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@ private async Task ReceiveLoopAsync(CancellationToken mainLoopCancelllationToken
111111
}
112112
}
113113

114+
// TODO async
114115
private void ProcessFrame(InboundFrame frame)
115116
{
116117
bool shallReturnPayload = true;

0 commit comments

Comments
 (0)