Skip to content

Commit 8b0de17

Browse files
author
Simon MacMullen
committed
Merging bug22814 (race in handling (synchronous) basic.recover, .net client)
2 parents a3df973 + 92091c5 commit 8b0de17

File tree

5 files changed

+170
-1
lines changed

5 files changed

+170
-1
lines changed

projects/client/ApigenBootstrap/RabbitMQ.Client.ApigenBootstrap.csproj

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,9 @@
7272
<Compile Include="..\RabbitMQ.Client\src\client\api\ShutdownInitiator.cs">
7373
<Link>src\client\api\ShutdownInitiator.cs</Link>
7474
</Compile>
75+
<Compile Include="..\RabbitMQ.Client\src\client\events\BasicRecoverOkEventHandler.cs">
76+
<Link>src\client\events\BasicRecoverOkEventHandler.cs</Link>
77+
</Compile>
7578
<Compile Include="..\RabbitMQ.Client\src\client\events\BasicReturnEventArgs.cs">
7679
<Link>src\client\events\BasicReturnEventArgs.cs</Link>
7780
</Compile>

projects/client/RabbitMQ.Client/src/client/api/IModel.cs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,16 @@ public interface IModel: IDisposable
9393
///</remarks>
9494
event CallbackExceptionEventHandler CallbackException;
9595

96+
///<summary>All messages received before this fires that haven't been
97+
///ack'ed will be redelivered. All messages received afterwards won't
98+
///be.
99+
///
100+
///Handlers for this event are invoked by the connection thread.
101+
///It is sometimes useful to allow that thread to know that a recover-ok
102+
///has been received, rather than the thread that invoked BasicRecover().
103+
///</summary>
104+
event BasicRecoverOkEventHandler BasicRecoverOk;
105+
96106
///<summary>Signalled when an unexpected message is delivered
97107
///
98108
/// Under certain circumstances it is possible for a channel to receive a
@@ -364,6 +374,7 @@ void BasicReject(ulong deliveryTag,
364374
bool requeue);
365375

366376
///<summary>(Spec method)</summary>
377+
[AmqpMethodDoNotImplement(null)]
367378
void BasicRecover(bool requeue);
368379

369380
///<summary>(Spec method)</summary>
@@ -622,6 +633,15 @@ void HandleBasicGetOk(ulong deliveryTag,
622633
///</remarks>
623634
void HandleBasicGetEmpty();
624635

636+
///<summary>Handle incoming Basic.RecoverOk methods
637+
///received in reply to Basic.Recover.
638+
///</summary>
639+
void HandleBasicRecoverOk();
640+
641+
[AmqpForceOneWay]
642+
[AmqpMethodMapping(null, "basic", "recover")]
643+
void _Private_BasicRecover(bool requeue);
644+
625645
///<summary>Handle incoming Basic.Deliver methods. Dispatches
626646
///to waiting consumers.</summary>
627647
void HandleBasicDeliver(string consumerTag,
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
// This source code is dual-licensed under the Apache License, version
2+
// 2.0, and the Mozilla Public License, version 1.1.
3+
//
4+
// The APL v2.0:
5+
//
6+
//---------------------------------------------------------------------------
7+
// Copyright (C) 2007-2010 LShift Ltd., Cohesive Financial
8+
// Technologies LLC., and Rabbit Technologies Ltd.
9+
//
10+
// Licensed under the Apache License, Version 2.0 (the "License");
11+
// you may not use this file except in compliance with the License.
12+
// You may obtain a copy of the License at
13+
//
14+
// http://www.apache.org/licenses/LICENSE-2.0
15+
//
16+
// Unless required by applicable law or agreed to in writing, software
17+
// distributed under the License is distributed on an "AS IS" BASIS,
18+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19+
// See the License for the specific language governing permissions and
20+
// limitations under the License.
21+
//---------------------------------------------------------------------------
22+
//
23+
// The MPL v1.1:
24+
//
25+
//---------------------------------------------------------------------------
26+
// The contents of this file are subject to the Mozilla Public License
27+
// Version 1.1 (the "License"); you may not use this file except in
28+
// compliance with the License. You may obtain a copy of the License at
29+
// http://www.rabbitmq.com/mpl.html
30+
//
31+
// Software distributed under the License is distributed on an "AS IS"
32+
// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
33+
// License for the specific language governing rights and limitations
34+
// under the License.
35+
//
36+
// The Original Code is The RabbitMQ .NET Client.
37+
//
38+
// The Initial Developers of the Original Code are LShift Ltd,
39+
// Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
40+
//
41+
// Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
42+
// Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
43+
// are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
44+
// Technologies LLC, and Rabbit Technologies Ltd.
45+
//
46+
// Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
47+
// Ltd. Portions created by Cohesive Financial Technologies LLC are
48+
// Copyright (C) 2007-2010 Cohesive Financial Technologies
49+
// LLC. Portions created by Rabbit Technologies Ltd are Copyright
50+
// (C) 2007-2010 Rabbit Technologies Ltd.
51+
//
52+
// All Rights Reserved.
53+
//
54+
// Contributor(s): ______________________________________.
55+
//
56+
//---------------------------------------------------------------------------
57+
using System;
58+
59+
namespace RabbitMQ.Client.Events
60+
{
61+
///<summary>Delegate used to process Basic.RecoverOk events.</summary>
62+
public delegate void BasicRecoverOkEventHandler(IModel model, EventArgs args);
63+
}
64+

projects/client/RabbitMQ.Client/src/client/impl/ModelBase.cs

Lines changed: 73 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ public abstract class ModelBase : IFullModel
8282
private readonly object m_eventLock = new object();
8383
private BasicReturnEventHandler m_basicReturn;
8484
private CallbackExceptionEventHandler m_callbackException;
85+
private BasicRecoverOkEventHandler m_basicRecoverOk;
8586

8687
public ManualResetEvent m_flowControlBlock = new ManualResetEvent(true);
8788

@@ -148,6 +149,24 @@ public event CallbackExceptionEventHandler CallbackException
148149
}
149150
}
150151

152+
public event BasicRecoverOkEventHandler BasicRecoverOk
153+
{
154+
add
155+
{
156+
lock (m_eventLock)
157+
{
158+
m_basicRecoverOk += value;
159+
}
160+
}
161+
remove
162+
{
163+
lock (m_eventLock)
164+
{
165+
m_basicRecoverOk -= value;
166+
}
167+
}
168+
}
169+
151170
public IBasicConsumer DefaultConsumer { get; set; }
152171

153172
public ISession m_session;
@@ -278,6 +297,31 @@ public virtual void OnCallbackException(CallbackExceptionEventArgs args)
278297
}
279298
}
280299
}
300+
301+
public virtual void OnBasicRecoverOk(EventArgs args)
302+
{
303+
BasicRecoverOkEventHandler handler;
304+
lock (m_eventLock)
305+
{
306+
handler = m_basicRecoverOk;
307+
}
308+
if (handler != null)
309+
{
310+
foreach (BasicRecoverOkEventHandler h in handler.GetInvocationList())
311+
{
312+
try
313+
{
314+
h(this, args);
315+
}
316+
catch (Exception e)
317+
{
318+
CallbackExceptionEventArgs exnArgs = new CallbackExceptionEventArgs(e);
319+
exnArgs.Detail["context"] = "OnBasicRecoverOk";
320+
OnCallbackException(exnArgs);
321+
}
322+
}
323+
}
324+
}
281325

282326
public void Enqueue(IRpcContinuation k)
283327
{
@@ -734,6 +778,28 @@ public BasicGetResult BasicGet(string queue,
734778
return k.m_result;
735779
}
736780

781+
public abstract void _Private_BasicRecover(bool requeue);
782+
783+
public void BasicRecover(bool requeue)
784+
{
785+
SimpleBlockingRpcContinuation k = new SimpleBlockingRpcContinuation();
786+
787+
Enqueue(k);
788+
789+
try
790+
{
791+
_Private_BasicRecover(requeue);
792+
}
793+
catch (AlreadyClosedException)
794+
{
795+
// Ignored, since the continuation will be told about
796+
// the closure via an OperationInterruptedException because
797+
// of the shutdown event propagation.
798+
}
799+
800+
k.GetReply();
801+
}
802+
737803
public abstract void BasicQos(uint prefetchSize,
738804
ushort prefetchCount,
739805
bool global);
@@ -804,7 +870,6 @@ public abstract void BasicAck(ulong deliveryTag,
804870
public abstract void BasicReject(ulong deliveryTag,
805871
bool requeue);
806872

807-
public abstract void BasicRecover(bool requeue);
808873
public abstract void BasicRecoverAsync(bool requeue);
809874

810875
public abstract void TxSelect();
@@ -904,6 +969,13 @@ public void HandleBasicGetEmpty()
904969
k.HandleCommand(null); // release the continuation.
905970
}
906971

972+
public void HandleBasicRecoverOk()
973+
{
974+
SimpleBlockingRpcContinuation k = (SimpleBlockingRpcContinuation)m_continuationQueue.Next();
975+
OnBasicRecoverOk(new EventArgs());
976+
k.HandleCommand(null);
977+
}
978+
907979
public abstract ConnectionTuneDetails ConnectionStartOk(IDictionary clientProperties,
908980
string mechanism,
909981
byte[] response,

projects/client/Unit/src/unit/TestRecoverAfterCancel.cs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,5 +115,15 @@ public void TestRecoverAfterCancel_()
115115
Assert.IsFalse(Event.Redelivered);
116116
Assert.IsTrue(Event2.Redelivered);
117117
}
118+
119+
[Test]
120+
public void TestRecoverCallback()
121+
{
122+
int callbackCount = 0;
123+
Channel.BasicRecoverOk += (sender, eventArgs) => callbackCount++;
124+
Channel.BasicRecover(false);
125+
Assert.AreEqual(1, callbackCount);
126+
}
127+
118128
}
119129
}

0 commit comments

Comments
 (0)