Skip to content

Commit d52f13f

Browse files
Refactor concurrent Subscription iteration test, join created threads
1 parent c217b27 commit d52f13f

File tree

1 file changed

+23
-21
lines changed

1 file changed

+23
-21
lines changed

projects/client/Unit/src/unit/TestMessagePatternsSubscription.cs

Lines changed: 23 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ public void TestSubscriptionNack()
108108
Assert.AreEqual(0, ok.MessageCount);
109109
}
110110

111-
private class SubscriptionDrainer
111+
protected class SubscriptionDrainer
112112
{
113113
protected Subscription m_subscription;
114114
protected bool m_ack;
@@ -132,23 +132,25 @@ public void Drain()
132132
#pragma warning disable 0168
133133
try
134134
{
135-
while(!shouldStop)
135+
for(int i = 0; i < 1000; i++)
136136
{
137137
BasicDeliverEventArgs ea = m_subscription.Next();
138138
if(ea != null)
139139
{
140140
Assert.That(ea, Is.TypeOf(typeof(BasicDeliverEventArgs)));
141141
PostProcess();
142142
} else
143-
{
144-
shouldStop = true;
145-
}
143+
{
144+
break;
145+
}
146146
}
147147
} catch (AlreadyClosedException ace)
148148
{
149-
shouldStop = true;
149+
// expected
150150
}
151151
#pragma warning restore
152+
153+
m_subscription.Close();
152154
}
153155

154156
protected void PostProcess()
@@ -166,24 +168,16 @@ protected void PostProcess()
166168
[Test]
167169
public void TestConcurrentIterationAndAck()
168170
{
169-
IDictionary<string, object> args = new Dictionary<string, object>
170-
{
171-
{"x-message-ttl", 5000}
172-
};
173-
string q = Model.QueueDeclare("", false, true, false, args);
174-
Subscription sub = new Subscription(Model, q, false);
175-
176-
PreparedQueue(q);
177-
for (int i = 0; i < 10; i++)
178-
{
179-
SubscriptionDrainer drainer = new SubscriptionDrainer(sub, true);
180-
Thread t = new Thread(drainer.Drain);
181-
t.Start();
182-
}
171+
TestConcurrentIterationWithDrainer(true);
183172
}
184173

185174
[Test]
186175
public void TestConcurrentIterationAndNack()
176+
{
177+
TestConcurrentIterationWithDrainer(false);
178+
}
179+
180+
protected void TestConcurrentIterationWithDrainer(bool ack)
187181
{
188182
IDictionary<string, object> args = new Dictionary<string, object>
189183
{
@@ -193,12 +187,20 @@ public void TestConcurrentIterationAndNack()
193187
Subscription sub = new Subscription(Model, q, false);
194188

195189
PreparedQueue(q);
190+
191+
List<Thread> ts = new List<Thread>();
196192
for (int i = 0; i < 10; i++)
197193
{
198-
SubscriptionDrainer drainer = new SubscriptionDrainer(sub, false);
194+
SubscriptionDrainer drainer = new SubscriptionDrainer(sub, ack);
199195
Thread t = new Thread(drainer.Drain);
196+
ts.Add(t);
200197
t.Start();
201198
}
199+
200+
foreach(Thread t in ts)
201+
{
202+
t.Join();
203+
}
202204
}
203205

204206
private void PreparedQueue(string q)

0 commit comments

Comments
 (0)