Skip to content

Commit 0c3a6e3

Browse files
committed
AsyncConsumerDispatcher uses ConcurrentConsumerDispatcher as shim
1 parent b3f3965 commit 0c3a6e3

File tree

9 files changed

+28
-51
lines changed

9 files changed

+28
-51
lines changed

projects/client/RabbitMQ.Client/src/client/api/IAsyncConnection.cs

Lines changed: 0 additions & 7 deletions
This file was deleted.

projects/client/RabbitMQ.Client/src/client/impl/AsyncConnection.cs

Lines changed: 0 additions & 15 deletions
This file was deleted.

projects/client/RabbitMQ.Client/src/client/impl/AsyncConsumerDispatcher.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,13 @@ public void Quiesce()
2020
public void Shutdown()
2121
{
2222
// necessary evil
23-
this.workService.StopWork().GetAwaiter().GetResult();
23+
this.workService.Stop().GetAwaiter().GetResult();
2424
}
2525

2626
public void Shutdown(IModel model)
2727
{
2828
// necessary evil
29-
this.workService.StopWork(model).GetAwaiter().GetResult();
29+
this.workService.Stop(model).GetAwaiter().GetResult();
3030
}
3131

3232
public bool IsShutdown

projects/client/RabbitMQ.Client/src/client/impl/AsyncConsumerWorkService.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
namespace RabbitMQ.Client
88
{
9-
internal class AsyncConsumerWorkService
9+
internal class AsyncConsumerWorkService : ConsumerWorkService
1010
{
1111
readonly ConcurrentDictionary<IModel, WorkPool> workPools = new ConcurrentDictionary<IModel, WorkPool>();
1212

@@ -32,7 +32,7 @@ public void Schedule<TWork>(ModelBase model, TWork work)
3232
workPool.Enqueue(work);
3333
}
3434

35-
public async Task StopWork(IModel model)
35+
public async Task Stop(IModel model)
3636
{
3737
WorkPool workPool;
3838
if (workPools.TryRemove(model, out workPool))
@@ -41,11 +41,11 @@ public async Task StopWork(IModel model)
4141
}
4242
}
4343

44-
public async Task StopWork()
44+
public async Task Stop()
4545
{
4646
foreach (var model in workPools.Keys)
4747
{
48-
await StopWork(model).ConfigureAwait(false);
48+
await Stop(model).ConfigureAwait(false);
4949
}
5050
}
5151

projects/client/RabbitMQ.Client/src/client/impl/AutorecoveringConnection.cs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -626,8 +626,8 @@ public void Init(IEndpointResolver endpoints)
626626

627627
private void Init(IFrameHandler fh)
628628
{
629-
m_delegate = m_factory.DispatchConsumersAsync ? new AsyncConnection(m_factory, false, fh, this.ClientProvidedName)
630-
: new Connection(m_factory, false, fh, this.ClientProvidedName);
629+
m_delegate = new Connection(m_factory, false,
630+
fh, this.ClientProvidedName);
631631

632632
AutorecoveringConnection self = this;
633633
EventHandler<ShutdownEventArgs> recoveryListener = (_, args) =>
@@ -831,8 +831,7 @@ protected bool RecoverConnectionDelegate()
831831
try
832832
{
833833
var fh = endpoints.SelectOne(m_factory.CreateFrameHandler);
834-
m_delegate = m_factory.DispatchConsumersAsync ? new AsyncConnection(m_factory, false, fh, this.ClientProvidedName)
835-
: new Connection(m_factory, false, fh, this.ClientProvidedName);
834+
m_delegate = new Connection(m_factory, false, fh, this.ClientProvidedName);
836835
return true;
837836
}
838837
catch (Exception e)

projects/client/RabbitMQ.Client/src/client/impl/Connection.cs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,16 @@ public Connection(IConnectionFactory factory, bool insist, IFrameHandler frameHa
120120
FrameMax = 0;
121121
m_factory = factory;
122122
m_frameHandler = frameHandler;
123-
ConsumerWorkService = new ConsumerWorkService();
123+
124+
var asyncConnectionFactory = factory as IAsyncConnectionFactory;
125+
if (asyncConnectionFactory != null && asyncConnectionFactory.DispatchConsumersAsync)
126+
{
127+
ConsumerWorkService = new AsyncConsumerWorkService();
128+
}
129+
else
130+
{
131+
ConsumerWorkService = new ConsumerWorkService();
132+
}
124133

125134
m_sessionManager = new SessionManager(this, 0);
126135
m_session0 = new MainSession(this) { Handler = NotifyReceivedCloseOk };

projects/client/RabbitMQ.Client/src/client/impl/ModelBase.cs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -96,10 +96,10 @@ public ModelBase(ISession session)
9696

9797
public ModelBase(ISession session, ConsumerWorkService workService)
9898
{
99-
var asyncConnection = session.Connection as IAsyncConnection;
100-
if (asyncConnection != null)
99+
var asyncConsumerWorkService = workService as AsyncConsumerWorkService;
100+
if (asyncConsumerWorkService != null)
101101
{
102-
ConsumerDispatcher = new AsyncConsumerDispatcher(this, asyncConnection.AsyncConsumerWorkService);
102+
ConsumerDispatcher = new AsyncConsumerDispatcher(this, asyncConsumerWorkService);
103103
}
104104
else
105105
{
@@ -1168,8 +1168,9 @@ public string BasicConsume(string queue,
11681168
IDictionary<string, object> arguments,
11691169
IBasicConsumer consumer)
11701170
{
1171-
var sessionConnection = Session.Connection as IAsyncConnection;
1172-
if (sessionConnection != null)
1171+
// TODO: Replace with flag
1172+
var asyncDispatcher = ConsumerDispatcher as AsyncConsumerDispatcher;
1173+
if (asyncDispatcher != null)
11731174
{
11741175
var asyncConsumer = consumer as IAsyncBasicConsumer;
11751176
if (asyncConsumer == null)

projects/client/RabbitMQ.Client/src/client/impl/ProtocolBase.cs

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -131,11 +131,6 @@ public IConnection CreateConnection(IConnectionFactory factory,
131131
bool insist,
132132
IFrameHandler frameHandler)
133133
{
134-
var connectionFactory = factory as IAsyncConnectionFactory;
135-
if (connectionFactory != null && connectionFactory.DispatchConsumersAsync)
136-
{
137-
return new AsyncConnection(factory, insist, frameHandler, null);
138-
}
139134
return new Connection(factory, insist, frameHandler, null);
140135
}
141136

@@ -145,11 +140,6 @@ public IConnection CreateConnection(IConnectionFactory factory,
145140
IFrameHandler frameHandler,
146141
string clientProvidedName)
147142
{
148-
var connectionFactory = factory as IAsyncConnectionFactory;
149-
if (connectionFactory != null && connectionFactory.DispatchConsumersAsync)
150-
{
151-
return new AsyncConnection(factory, insist, frameHandler, clientProvidedName);
152-
}
153143
return new Connection(factory, insist, frameHandler, clientProvidedName);
154144
}
155145

projects/client/RabbitMQ.Client/src/client/impl/Work.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,15 +12,15 @@ protected Work(IBasicConsumer consumer)
1212
asyncConsumer = (IAsyncBasicConsumer)consumer;
1313
}
1414

15-
public Task Execute(ModelBase model)
15+
public async Task Execute(ModelBase model)
1616
{
1717
try
1818
{
19-
return Execute(model, asyncConsumer);
19+
await Execute(model, asyncConsumer).ConfigureAwait(false);
2020
}
2121
catch (Exception)
2222
{
23-
return TaskExtensions.CompletedTask;
23+
// intentionally caught
2424
}
2525
}
2626

0 commit comments

Comments
 (0)