Skip to content

Commit 447cfb9

Browse files
committed
Merged 19561 into default
2 parents 344c73b + e71cc1c commit 447cfb9

File tree

2 files changed

+26
-0
lines changed

2 files changed

+26
-0
lines changed

src/client/api/IModel.cs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -601,6 +601,15 @@ void HandleBasicReturn(ushort replyCode,
601601
IBasicProperties basicProperties,
602602
[AmqpContentBodyMapping]
603603
byte[] body);
604+
605+
///<summary>Used to send a Channel.FlowOk. Confirms that
606+
///Channel.Flow from the broker was processed.</summary>
607+
[AmqpMethodMapping(null, "channel", "flow-ok")]
608+
void _Private_ChannelFlowOk();
609+
610+
///<summary>Handle incoming Channel.Flow methods. Either
611+
///stops or resumes sending the methods that have content.</summary>
612+
void HandleChannelFlow(bool active);
604613

605614
///<summary>Handle an incoming Channel.Close. Shuts down the
606615
///session and model.</summary>

src/client/impl/ModelBase.cs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,8 @@ public abstract class ModelBase : IFullModel
7676
private readonly object m_eventLock = new object();
7777
private BasicReturnEventHandler m_basicReturn;
7878
private CallbackExceptionEventHandler m_callbackException;
79+
80+
public ManualResetEvent m_flowControlBlock = new ManualResetEvent(true);
7981

8082
public event ModelShutdownEventHandler ModelShutdown
8183
{
@@ -225,6 +227,7 @@ public virtual void OnModelShutdown(ShutdownEventArgs reason)
225227
}
226228
}
227229
}
230+
m_flowControlBlock.Set();
228231
}
229232

230233
public virtual void OnBasicReturn(BasicReturnEventArgs args)
@@ -318,6 +321,9 @@ public bool IsOpen
318321

319322
public void ModelSend(MethodBase method, ContentHeaderBase header, byte[] body)
320323
{
324+
if (method.HasContent) {
325+
m_flowControlBlock.WaitOne();
326+
}
321327
m_session.Transmit(new Command(method, header, body));
322328
}
323329

@@ -381,6 +387,17 @@ public void HandleBasicReturn(ushort replyCode,
381387
e.Body = body;
382388
OnBasicReturn(e);
383389
}
390+
391+
public abstract void _Private_ChannelFlowOk();
392+
393+
public void HandleChannelFlow(bool active)
394+
{
395+
if (active)
396+
m_flowControlBlock.Set();
397+
else
398+
m_flowControlBlock.Reset();
399+
_Private_ChannelFlowOk();
400+
}
384401

385402
public void HandleConnectionStart(byte versionMajor,
386403
byte versionMinor,

0 commit comments

Comments
 (0)