43
43
using System ;
44
44
using System . Text ;
45
45
using System . Threading ;
46
+ using System . Collections . Generic ;
47
+ using System . Timers ;
46
48
47
49
using RabbitMQ . Client . Exceptions ;
48
50
using RabbitMQ . Client . Events ;
@@ -110,16 +112,23 @@ private class SubscriptionDrainer
110
112
{
111
113
protected Subscription m_subscription ;
112
114
protected bool m_ack ;
115
+ protected System . Timers . Timer m_timer ;
116
+ protected volatile bool shouldStop = false ;
113
117
114
118
public SubscriptionDrainer ( Subscription sub , bool ack )
115
119
{
116
120
m_subscription = sub ;
117
121
m_ack = ack ;
122
+ m_timer = new System . Timers . Timer ( 5000 ) ;
123
+ m_timer . Elapsed += delegate ( object o , ElapsedEventArgs args )
124
+ {
125
+ shouldStop = true ;
126
+ } ;
127
+ m_timer . Enabled = true ;
118
128
}
119
129
120
130
public void Drain ( )
121
131
{
122
- bool shouldStop = false ;
123
132
#pragma warning disable 0168
124
133
try
125
134
{
@@ -157,7 +166,11 @@ protected void PostProcess()
157
166
[ Test ]
158
167
public void TestConcurrentIterationAndAck ( )
159
168
{
160
- string q = Model . QueueDeclare ( ) ;
169
+ IDictionary < string , object > args = new Dictionary < string , object >
170
+ {
171
+ { "x-message-ttl" , 5000 }
172
+ } ;
173
+ string q = Model . QueueDeclare ( "" , false , true , false , args ) ;
161
174
Subscription sub = new Subscription ( Model , q , false ) ;
162
175
163
176
PreparedQueue ( q ) ;
@@ -172,7 +185,11 @@ public void TestConcurrentIterationAndAck()
172
185
[ Test ]
173
186
public void TestConcurrentIterationAndNack ( )
174
187
{
175
- string q = Model . QueueDeclare ( ) ;
188
+ IDictionary < string , object > args = new Dictionary < string , object >
189
+ {
190
+ { "x-message-ttl" , 5000 }
191
+ } ;
192
+ string q = Model . QueueDeclare ( "" , false , true , false , args ) ;
176
193
Subscription sub = new Subscription ( Model , q , false ) ;
177
194
178
195
PreparedQueue ( q ) ;
0 commit comments