Skip to content

Commit db75f78

Browse files
author
Emile Joubert
committed
Add channel flow event
1 parent 43fc3dd commit db75f78

File tree

4 files changed

+130
-1
lines changed

4 files changed

+130
-1
lines changed

projects/client/ApigenBootstrap/RabbitMQ.Client.ApigenBootstrap.csproj

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,9 @@
8787
<Compile Include="..\RabbitMQ.Client\src\client\events\ModelShutdownEventHandler.cs">
8888
<Link>src\client\events\ModelShutdownEventHandler.cs</Link>
8989
</Compile>
90+
<Compile Include="..\RabbitMQ.Client\src\client\events\FlowControlEventHandler.cs">
91+
<Link>src\client\events\FlowControlEventHandler.cs</Link>
92+
</Compile>
9093
<Compile Include="properties\AssemblyInfo.cs" />
9194
</ItemGroup>
9295

projects/client/RabbitMQ.Client/src/client/api/IModel.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,8 @@ public interface IModel: IDisposable
9393
///</remarks>
9494
event CallbackExceptionEventHandler CallbackException;
9595

96+
event FlowControlEventHandler FlowControl;
97+
9698
///<summary>Signalled when an unexpected message is delivered
9799
///
98100
/// Under certain circumstances it is possible for a channel to receive a
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
// This source code is dual-licensed under the Apache License, version
2+
// 2.0, and the Mozilla Public License, version 1.1.
3+
//
4+
// The APL v2.0:
5+
//
6+
//---------------------------------------------------------------------------
7+
// Copyright (C) 2007-2010 LShift Ltd., Cohesive Financial
8+
// Technologies LLC., and Rabbit Technologies Ltd.
9+
//
10+
// Licensed under the Apache License, Version 2.0 (the "License");
11+
// you may not use this file except in compliance with the License.
12+
// You may obtain a copy of the License at
13+
//
14+
// http://www.apache.org/licenses/LICENSE-2.0
15+
//
16+
// Unless required by applicable law or agreed to in writing, software
17+
// distributed under the License is distributed on an "AS IS" BASIS,
18+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19+
// See the License for the specific language governing permissions and
20+
// limitations under the License.
21+
//---------------------------------------------------------------------------
22+
//
23+
// The MPL v1.1:
24+
//
25+
//---------------------------------------------------------------------------
26+
// The contents of this file are subject to the Mozilla Public License
27+
// Version 1.1 (the "License"); you may not use this file except in
28+
// compliance with the License. You may obtain a copy of the License at
29+
// http://www.rabbitmq.com/mpl.html
30+
//
31+
// Software distributed under the License is distributed on an "AS IS"
32+
// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
33+
// License for the specific language governing rights and limitations
34+
// under the License.
35+
//
36+
// The Original Code is The RabbitMQ .NET Client.
37+
//
38+
// The Initial Developers of the Original Code are LShift Ltd,
39+
// Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
40+
//
41+
// Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
42+
// Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
43+
// are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
44+
// Technologies LLC, and Rabbit Technologies Ltd.
45+
//
46+
// Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
47+
// Ltd. Portions created by Cohesive Financial Technologies LLC are
48+
// Copyright (C) 2007-2010 Cohesive Financial Technologies
49+
// LLC. Portions created by Rabbit Technologies Ltd are Copyright
50+
// (C) 2007-2010 Rabbit Technologies Ltd.
51+
//
52+
// All Rights Reserved.
53+
//
54+
// Contributor(s): ______________________________________.
55+
//
56+
//---------------------------------------------------------------------------
57+
58+
using System;
59+
60+
namespace RabbitMQ.Client.Events
61+
{
62+
63+
///<summary>Delegate used to process flow control events.</summary>
64+
public delegate void FlowControlEventHandler(IModel sender, FlowControlEventArgs args);
65+
66+
///<summary>Event relating to flow control</summary>
67+
public class FlowControlEventArgs : EventArgs
68+
{
69+
private readonly bool m_active;
70+
71+
public FlowControlEventArgs(bool active)
72+
{
73+
m_active = active;
74+
}
75+
76+
///<summary>Access the flow control setting</summary>
77+
public bool Active { get { return m_active; } }
78+
}
79+
}

projects/client/RabbitMQ.Client/src/client/impl/ModelBase.cs

Lines changed: 46 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ public abstract class ModelBase : IFullModel
8282
private readonly object m_eventLock = new object();
8383
private BasicReturnEventHandler m_basicReturn;
8484
private CallbackExceptionEventHandler m_callbackException;
85+
private FlowControlEventHandler m_flowControl;
8586

8687
public ManualResetEvent m_flowControlBlock = new ManualResetEvent(true);
8788

@@ -148,6 +149,24 @@ public event CallbackExceptionEventHandler CallbackException
148149
}
149150
}
150151

152+
public event FlowControlEventHandler FlowControl
153+
{
154+
add
155+
{
156+
lock (m_eventLock)
157+
{
158+
m_flowControl += value;
159+
}
160+
}
161+
remove
162+
{
163+
lock (m_eventLock)
164+
{
165+
m_flowControl -= value;
166+
}
167+
}
168+
}
169+
151170
public IBasicConsumer DefaultConsumer { get; set; }
152171

153172
public ISession m_session;
@@ -278,7 +297,32 @@ public virtual void OnCallbackException(CallbackExceptionEventArgs args)
278297
}
279298
}
280299
}
281-
300+
301+
public virtual void OnFlowControl(FlowControlEventArgs args)
302+
{
303+
FlowControlEventHandler handler;
304+
lock (m_eventLock)
305+
{
306+
handler = m_flowControl;
307+
}
308+
if (handler != null)
309+
{
310+
foreach (FlowControlEventHandler h in handler.GetInvocationList())
311+
{
312+
try
313+
{
314+
h(this, args);
315+
}
316+
catch (Exception e)
317+
{
318+
Console.WriteLine("exception while running flow control event handler");
319+
Console.WriteLine(e + e.StackTrace);
320+
321+
}
322+
}
323+
}
324+
}
325+
282326
public void Enqueue(IRpcContinuation k)
283327
{
284328
bool ok = false;
@@ -411,6 +455,7 @@ public void HandleChannelFlow(bool active)
411455
else
412456
m_flowControlBlock.Reset();
413457
_Private_ChannelFlowOk(active);
458+
OnFlowControl(new FlowControlEventArgs(active));
414459
}
415460

416461
public void HandleConnectionStart(byte versionMajor,

0 commit comments

Comments
 (0)