50
50
using RabbitMQ . Client . Events ;
51
51
using RabbitMQ . Client . MessagePatterns ;
52
52
53
- namespace RabbitMQ . Client . Unit {
53
+ namespace RabbitMQ . Client . Unit
54
+ {
54
55
[ TestFixture ]
55
- public class TestMessagePatternsSubscription : IntegrationFixture {
56
+ public class TestMessagePatternsSubscription : IntegrationFixture
57
+ {
56
58
UTF8Encoding enc = new UTF8Encoding ( ) ;
57
59
58
60
[ Test ]
@@ -90,6 +92,44 @@ public void TestSubscriptionNack()
90
92
TestSubscriptionAction ( ( s ) => s . Nack ( false , false ) ) ;
91
93
}
92
94
95
+ [ Test ]
96
+ public void TestConcurrentIterationAndAck ( )
97
+ {
98
+ TestConcurrentIterationWithDrainer ( ( s ) => s . Ack ( ) ) ;
99
+ }
100
+
101
+ [ Test ]
102
+ public void TestConcurrentIterationAndNack ( )
103
+ {
104
+ TestConcurrentIterationWithDrainer ( ( s ) => s . Nack ( false , false ) ) ;
105
+ }
106
+
107
+ protected void TestConcurrentIterationWithDrainer ( SubscriptionAction act )
108
+ {
109
+ IDictionary < string , object > args = new Dictionary < string , object >
110
+ {
111
+ { "x-message-ttl" , 5000 }
112
+ } ;
113
+ string q = Model . QueueDeclare ( "" , false , true , false , args ) ;
114
+ Subscription sub = new Subscription ( Model , q , false ) ;
115
+
116
+ PreparedQueue ( q ) ;
117
+
118
+ List < Thread > ts = new List < Thread > ( ) ;
119
+ for ( int i = 0 ; i < 50 ; i ++ )
120
+ {
121
+ SubscriptionDrainer drainer = new SubscriptionDrainer ( sub , act ) ;
122
+ Thread t = new Thread ( drainer . Drain ) ;
123
+ ts . Add ( t ) ;
124
+ t . Start ( ) ;
125
+ }
126
+
127
+ foreach ( Thread t in ts )
128
+ {
129
+ t . Join ( ) ;
130
+ }
131
+ }
132
+
93
133
private void TestSubscriptionAction ( SubscriptionAction action )
94
134
{
95
135
Model . BasicQos ( 0 , 1 , false ) ;
@@ -129,15 +169,18 @@ public void Drain()
129
169
{
130
170
Assert . That ( ea , Is . TypeOf ( typeof ( BasicDeliverEventArgs ) ) ) ;
131
171
this . PostProcess ( m_subscription ) ;
132
- } else
172
+ }
173
+ else
133
174
{
134
175
break ;
135
176
}
136
177
}
137
- } catch ( AlreadyClosedException ace )
178
+ }
179
+ catch ( AlreadyClosedException ace )
138
180
{
139
181
// expected
140
- } finally
182
+ }
183
+ finally
141
184
{
142
185
m_subscription . Close ( ) ;
143
186
}
@@ -146,44 +189,6 @@ public void Drain()
146
189
}
147
190
}
148
191
149
- [ Test ]
150
- public void TestConcurrentIterationAndAck ( )
151
- {
152
- TestConcurrentIterationWithDrainer ( ( s ) => s . Ack ( ) ) ;
153
- }
154
-
155
- [ Test ]
156
- public void TestConcurrentIterationAndNack ( )
157
- {
158
- TestConcurrentIterationWithDrainer ( ( s ) => s . Nack ( false , false ) ) ;
159
- }
160
-
161
- protected void TestConcurrentIterationWithDrainer ( SubscriptionAction act )
162
- {
163
- IDictionary < string , object > args = new Dictionary < string , object >
164
- {
165
- { "x-message-ttl" , 5000 }
166
- } ;
167
- string q = Model . QueueDeclare ( "" , false , true , false , args ) ;
168
- Subscription sub = new Subscription ( Model , q , false ) ;
169
-
170
- PreparedQueue ( q ) ;
171
-
172
- List < Thread > ts = new List < Thread > ( ) ;
173
- for ( int i = 0 ; i < 50 ; i ++ )
174
- {
175
- SubscriptionDrainer drainer = new SubscriptionDrainer ( sub , act ) ;
176
- Thread t = new Thread ( drainer . Drain ) ;
177
- ts . Add ( t ) ;
178
- t . Start ( ) ;
179
- }
180
-
181
- foreach ( Thread t in ts )
182
- {
183
- t . Join ( ) ;
184
- }
185
- }
186
-
187
192
private void PreparedQueue ( string q )
188
193
{
189
194
for ( int i = 0 ; i < 1024 ; i ++ )
0 commit comments