Skip to content

Commit e6fa149

Browse files
committed
AsyncConnectionDecorator renamed to AsyncConnection, AsyncConnection now inherits from Connection
AutorecoveringConnection and ProtocolBase create correct connection depending on settings
1 parent 9bc1ed5 commit e6fa149

File tree

4 files changed

+29
-104
lines changed

4 files changed

+29
-104
lines changed
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
using RabbitMQ.Client.Impl;
2+
3+
namespace RabbitMQ.Client.Framing.Impl
4+
{
5+
internal class AsyncConnection : Connection, IAsyncConnection
6+
{
7+
public AsyncConnection(IConnectionFactory factory, bool insist, IFrameHandler frameHandler, string clientProvidedName = null) :
8+
base(factory, insist, frameHandler, clientProvidedName)
9+
{
10+
AsyncConsumerWorkService = new AsyncConsumerWorkService();
11+
}
12+
13+
public AsyncConsumerWorkService AsyncConsumerWorkService { get; }
14+
}
15+
}

projects/client/RabbitMQ.Client/src/client/impl/AsyncConnectionDecorator.cs

Lines changed: 0 additions & 101 deletions
This file was deleted.

projects/client/RabbitMQ.Client/src/client/impl/AutorecoveringConnection.cs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -626,8 +626,8 @@ public void Init(IEndpointResolver endpoints)
626626

627627
private void Init(IFrameHandler fh)
628628
{
629-
m_delegate = new Connection(m_factory, false,
630-
fh, this.ClientProvidedName);
629+
m_delegate = m_factory.DispatchConsumersAsync ? new AsyncConnection(m_factory, false, fh, this.ClientProvidedName)
630+
: new Connection(m_factory, false, fh, this.ClientProvidedName);
631631

632632
AutorecoveringConnection self = this;
633633
EventHandler<ShutdownEventArgs> recoveryListener = (_, args) =>
@@ -831,7 +831,8 @@ protected bool RecoverConnectionDelegate()
831831
try
832832
{
833833
var fh = endpoints.SelectOne(m_factory.CreateFrameHandler);
834-
m_delegate = new Connection(m_factory, false, fh, this.ClientProvidedName);
834+
m_delegate = m_factory.DispatchConsumersAsync ? new AsyncConnection(m_factory, false, fh, this.ClientProvidedName)
835+
: new Connection(m_factory, false, fh, this.ClientProvidedName);
835836
return true;
836837
}
837838
catch (Exception e)

projects/client/RabbitMQ.Client/src/client/impl/ProtocolBase.cs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,11 @@ public IConnection CreateConnection(IConnectionFactory factory,
131131
bool insist,
132132
IFrameHandler frameHandler)
133133
{
134+
var connectionFactory = factory as IAsyncConnectionFactory;
135+
if (connectionFactory != null && connectionFactory.DispatchConsumersAsync)
136+
{
137+
return new AsyncConnection(factory, insist, frameHandler, null);
138+
}
134139
return new Connection(factory, insist, frameHandler, null);
135140
}
136141

@@ -140,6 +145,11 @@ public IConnection CreateConnection(IConnectionFactory factory,
140145
IFrameHandler frameHandler,
141146
string clientProvidedName)
142147
{
148+
var connectionFactory = factory as IAsyncConnectionFactory;
149+
if (connectionFactory != null && connectionFactory.DispatchConsumersAsync)
150+
{
151+
return new AsyncConnection(factory, insist, frameHandler, clientProvidedName);
152+
}
143153
return new Connection(factory, insist, frameHandler, clientProvidedName);
144154
}
145155

0 commit comments

Comments
 (0)