@@ -81,20 +81,16 @@ public void TestChannelClosureIsObservableOnSubscription()
81
81
[ Test ]
82
82
public void TestSubscriptionAck ( )
83
83
{
84
- Model . BasicQos ( 0 , 1 , false ) ;
85
- string q = Model . QueueDeclare ( ) ;
86
- Subscription sub = new Subscription ( Model , q , false ) ;
87
-
88
- Model . BasicPublish ( "" , q , null , enc . GetBytes ( "a message" ) ) ;
89
- BasicDeliverEventArgs res = sub . Next ( ) ;
90
- Assert . IsNotNull ( res ) ;
91
- sub . Ack ( ) ;
92
- QueueDeclareOk ok = Model . QueueDeclarePassive ( q ) ;
93
- Assert . AreEqual ( 0 , ok . MessageCount ) ;
84
+ TestSubscriptionAction ( ( s ) => s . Ack ( ) ) ;
94
85
}
95
86
96
87
[ Test ]
97
88
public void TestSubscriptionNack ( )
89
+ {
90
+ TestSubscriptionAction ( ( s ) => s . Nack ( false , false ) ) ;
91
+ }
92
+
93
+ private void TestSubscriptionAction ( SubscriptionAction action )
98
94
{
99
95
Model . BasicQos ( 0 , 1 , false ) ;
100
96
string q = Model . QueueDeclare ( ) ;
@@ -103,20 +99,22 @@ public void TestSubscriptionNack()
103
99
Model . BasicPublish ( "" , q , null , enc . GetBytes ( "a message" ) ) ;
104
100
BasicDeliverEventArgs res = sub . Next ( ) ;
105
101
Assert . IsNotNull ( res ) ;
106
- sub . Nack ( false , false ) ;
102
+ action ( sub ) ;
107
103
QueueDeclareOk ok = Model . QueueDeclarePassive ( q ) ;
108
104
Assert . AreEqual ( 0 , ok . MessageCount ) ;
109
105
}
110
106
107
+ protected delegate void SubscriptionAction ( Subscription s ) ;
108
+
111
109
protected class SubscriptionDrainer
112
110
{
113
111
protected Subscription m_subscription ;
114
- protected bool m_ack ;
112
+ private SubscriptionAction PostProcess { get ; set ; }
115
113
116
- public SubscriptionDrainer ( Subscription sub , bool ack )
114
+ public SubscriptionDrainer ( Subscription sub , SubscriptionAction op )
117
115
{
118
116
m_subscription = sub ;
119
- m_ack = ack ;
117
+ PostProcess = op ;
120
118
}
121
119
122
120
public void Drain ( )
@@ -130,7 +128,7 @@ public void Drain()
130
128
if ( ea != null )
131
129
{
132
130
Assert . That ( ea , Is . TypeOf ( typeof ( BasicDeliverEventArgs ) ) ) ;
133
- PostProcess ( ) ;
131
+ this . PostProcess ( m_subscription ) ;
134
132
} else
135
133
{
136
134
break ;
@@ -146,32 +144,21 @@ public void Drain()
146
144
#pragma warning restore
147
145
148
146
}
149
-
150
- protected void PostProcess ( )
151
- {
152
- if ( m_ack )
153
- {
154
- m_subscription . Ack ( ) ;
155
- } else
156
- {
157
- m_subscription . Nack ( false , false ) ;
158
- }
159
- }
160
147
}
161
148
162
149
[ Test ]
163
150
public void TestConcurrentIterationAndAck ( )
164
151
{
165
- TestConcurrentIterationWithDrainer ( true ) ;
152
+ TestConcurrentIterationWithDrainer ( ( s ) => s . Ack ( ) ) ;
166
153
}
167
154
168
155
[ Test ]
169
156
public void TestConcurrentIterationAndNack ( )
170
157
{
171
- TestConcurrentIterationWithDrainer ( false ) ;
158
+ TestConcurrentIterationWithDrainer ( ( s ) => s . Nack ( false , false ) ) ;
172
159
}
173
160
174
- protected void TestConcurrentIterationWithDrainer ( bool ack )
161
+ protected void TestConcurrentIterationWithDrainer ( SubscriptionAction act )
175
162
{
176
163
IDictionary < string , object > args = new Dictionary < string , object >
177
164
{
@@ -185,7 +172,7 @@ protected void TestConcurrentIterationWithDrainer(bool ack)
185
172
List < Thread > ts = new List < Thread > ( ) ;
186
173
for ( int i = 0 ; i < 50 ; i ++ )
187
174
{
188
- SubscriptionDrainer drainer = new SubscriptionDrainer ( sub , ack ) ;
175
+ SubscriptionDrainer drainer = new SubscriptionDrainer ( sub , act ) ;
189
176
Thread t = new Thread ( drainer . Drain ) ;
190
177
ts . Add ( t ) ;
191
178
t . Start ( ) ;
0 commit comments