Skip to content

Commit 027670b

Browse files
committed
* Ensure other tests do not share channel across threads.
1 parent 11fe11b commit 027670b

File tree

3 files changed

+120
-93
lines changed

3 files changed

+120
-93
lines changed

projects/Test/Integration/TestAsyncConsumer.cs

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -476,14 +476,17 @@ await _channel.BasicConsumeAsync(queue: queueName, autoAck: false,
476476

477477
Task<bool> publishTask = Task.Run(async () =>
478478
{
479-
for (int i = 0; i < messageCount; i++)
479+
using (IChannel publishChannel = await _conn.CreateChannelAsync())
480480
{
481-
byte[] _body = _encoding.GetBytes(Guid.NewGuid().ToString());
482-
await _channel.BasicPublishAsync(ExchangeName.Empty, (RoutingKey)queueName, _body);
483-
await _channel.WaitForConfirmsOrDieAsync();
484-
}
481+
for (int i = 0; i < messageCount; i++)
482+
{
483+
byte[] _body = _encoding.GetBytes(Guid.NewGuid().ToString());
484+
await publishChannel.BasicPublishAsync(ExchangeName.Empty, (RoutingKey)queueName, _body);
485+
await publishChannel.WaitForConfirmsOrDieAsync();
486+
}
485487

486-
return true;
488+
return true;
489+
}
487490
});
488491

489492
Assert.True(await publishTask);

projects/Test/Integration/TestExchangeDeclare.cs

Lines changed: 52 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -61,18 +61,21 @@ public async Task TestConcurrentExchangeDeclareAndBind()
6161
{
6262
async Task f()
6363
{
64-
try
64+
using (IChannel ch = await _conn.CreateChannelAsync())
6565
{
66-
await Task.Delay(S_Random.Next(5, 50));
67-
ExchangeName exchangeName = GenerateExchangeName();
68-
await _channel.ExchangeDeclareAsync(exchange: exchangeName, type: ExchangeType.Fanout, false, false);
69-
await _channel.ExchangeBindAsync(destination: ex_destination, source: exchangeName,
70-
routingKey: new RoutingKey("unused"));
71-
exchangeNames.Add(exchangeName);
72-
}
73-
catch (NotSupportedException e)
74-
{
75-
nse = e;
66+
try
67+
{
68+
await Task.Delay(S_Random.Next(5, 50));
69+
ExchangeName exchangeName = GenerateExchangeName();
70+
await ch.ExchangeDeclareAsync(exchange: exchangeName, type: ExchangeType.Fanout, false, false);
71+
await ch.ExchangeBindAsync(destination: ex_destination, source: exchangeName,
72+
routingKey: new RoutingKey("unused"));
73+
exchangeNames.Add(exchangeName);
74+
}
75+
catch (NotSupportedException e)
76+
{
77+
nse = e;
78+
}
7679
}
7780
}
7881
var t = Task.Run(f);
@@ -87,16 +90,19 @@ await _channel.ExchangeBindAsync(destination: ex_destination, source: exchangeNa
8790
{
8891
async Task f()
8992
{
90-
try
91-
{
92-
await Task.Delay(S_Random.Next(5, 50));
93-
await _channel.ExchangeUnbindAsync(destination: ex_destination, source: exchangeName, routingKey: (RoutingKey)"unused",
94-
noWait: false, arguments: null);
95-
await _channel.ExchangeDeleteAsync(exchange: exchangeName, ifUnused: false);
96-
}
97-
catch (NotSupportedException e)
93+
using (IChannel ch = await _conn.CreateChannelAsync())
9894
{
99-
nse = e;
95+
try
96+
{
97+
await Task.Delay(S_Random.Next(5, 50));
98+
await _channel.ExchangeUnbindAsync(destination: ex_destination, source: exchangeName, routingKey: (RoutingKey)"unused",
99+
noWait: false, arguments: null);
100+
await _channel.ExchangeDeleteAsync(exchange: exchangeName, ifUnused: false);
101+
}
102+
catch (NotSupportedException e)
103+
{
104+
nse = e;
105+
}
100106
}
101107
}
102108
var t = Task.Run(f);
@@ -118,18 +124,21 @@ public async Task TestConcurrentExchangeDeclareAndDelete()
118124
{
119125
var t = Task.Run(async () =>
120126
{
121-
try
122-
{
123-
// sleep for a random amount of time to increase the chances
124-
// of thread interleaving. MK.
125-
await Task.Delay(_rnd.Next(5, 500));
126-
ExchangeName exchangeName = GenerateExchangeName();
127-
await _channel.ExchangeDeclareAsync(exchange: exchangeName, ExchangeType.Fanout, false, false);
128-
exchangeNames.Add(exchangeName);
129-
}
130-
catch (NotSupportedException e)
127+
using (IChannel ch = await _conn.CreateChannelAsync())
131128
{
132-
nse = e;
129+
try
130+
{
131+
// sleep for a random amount of time to increase the chances
132+
// of thread interleaving. MK.
133+
await Task.Delay(_rnd.Next(5, 500));
134+
ExchangeName exchangeName = GenerateExchangeName();
135+
await ch.ExchangeDeclareAsync(exchange: exchangeName, ExchangeType.Fanout, false, false);
136+
exchangeNames.Add(exchangeName);
137+
}
138+
catch (NotSupportedException e)
139+
{
140+
nse = e;
141+
}
133142
}
134143
});
135144
tasks.Add(t);
@@ -145,16 +154,19 @@ public async Task TestConcurrentExchangeDeclareAndDelete()
145154
ExchangeName ex = exchangeName;
146155
var t = Task.Run(async () =>
147156
{
148-
try
149-
{
150-
// sleep for a random amount of time to increase the chances
151-
// of thread interleaving. MK.
152-
await Task.Delay(_rnd.Next(5, 500));
153-
await _channel.ExchangeDeleteAsync(ex);
154-
}
155-
catch (NotSupportedException e)
157+
using (IChannel ch = await _conn.CreateChannelAsync())
156158
{
157-
nse = e;
159+
try
160+
{
161+
// sleep for a random amount of time to increase the chances
162+
// of thread interleaving. MK.
163+
await Task.Delay(_rnd.Next(5, 500));
164+
await ch.ExchangeDeleteAsync(ex);
165+
}
166+
catch (NotSupportedException e)
167+
{
168+
nse = e;
169+
}
158170
}
159171
});
160172
tasks.Add(t);

projects/Test/Integration/TestQueueDeclare.cs

Lines changed: 59 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -96,25 +96,28 @@ await _channel.ExchangeDeclareAsync(exchange: exchangeName, ExchangeType.Fanout,
9696
{
9797
async Task f()
9898
{
99-
try
99+
using (IChannel ch = await _conn.CreateChannelAsync())
100100
{
101-
// sleep for a random amount of time to increase the chances
102-
// of thread interleaving. MK.
103-
await Task.Delay(S_Random.Next(5, 50));
104-
QueueName queueName = GenerateQueueName();
105-
QueueDeclareOk r = await _channel.QueueDeclareAsync(queue: queueName,
106-
durable: false, exclusive: true, autoDelete: false);
107-
Assert.Equal(queueName, r.QueueName);
108-
await _channel.QueueBindAsync(queue: queueName,
109-
exchange: exchangeName, routingKey: (RoutingKey)queueName);
110-
if (false == queueNames.TryAdd(queueName, true))
101+
try
111102
{
112-
throw new InvalidOperationException($"queue with name {queueName} already added!");
103+
// sleep for a random amount of time to increase the chances
104+
// of thread interleaving. MK.
105+
await Task.Delay(S_Random.Next(5, 50));
106+
QueueName queueName = GenerateQueueName();
107+
QueueDeclareOk r = await ch.QueueDeclareAsync(queue: queueName,
108+
durable: false, exclusive: true, autoDelete: false);
109+
Assert.Equal(queueName, r.QueueName);
110+
await ch.QueueBindAsync(queue: queueName,
111+
exchange: exchangeName, routingKey: (RoutingKey)queueName);
112+
if (false == queueNames.TryAdd(queueName, true))
113+
{
114+
throw new InvalidOperationException($"queue with name {queueName} already added!");
115+
}
116+
}
117+
catch (NotSupportedException e)
118+
{
119+
nse = e;
113120
}
114-
}
115-
catch (NotSupportedException e)
116-
{
117-
nse = e;
118121
}
119122
}
120123
var t = Task.Run(f);
@@ -131,23 +134,26 @@ await _channel.QueueBindAsync(queue: queueName,
131134
async Task f()
132135
{
133136
QueueName qname = q;
134-
try
137+
using (IChannel ch = await _conn.CreateChannelAsync())
135138
{
136-
await Task.Delay(S_Random.Next(5, 50));
139+
try
140+
{
141+
await Task.Delay(S_Random.Next(5, 50));
137142

138-
QueueDeclareOk r = await _channel.QueueDeclarePassiveAsync(qname);
139-
Assert.Equal(qname, r.QueueName);
140-
Assert.Equal((uint)0, r.MessageCount);
143+
QueueDeclareOk r = await ch.QueueDeclarePassiveAsync(qname);
144+
Assert.Equal(qname, r.QueueName);
145+
Assert.Equal((uint)0, r.MessageCount);
141146

142-
await _channel.QueueUnbindAsync(queue: qname,
143-
exchange: exchangeName, routingKey: (RoutingKey)qname, null);
147+
await ch.QueueUnbindAsync(queue: qname,
148+
exchange: exchangeName, routingKey: (RoutingKey)qname, null);
144149

145-
uint deletedMessageCount = await _channel.QueueDeleteAsync(qname, false, false);
146-
Assert.Equal((uint)0, deletedMessageCount);
147-
}
148-
catch (NotSupportedException e)
149-
{
150-
nse = e;
150+
uint deletedMessageCount = await ch.QueueDeleteAsync(qname, false, false);
151+
Assert.Equal((uint)0, deletedMessageCount);
152+
}
153+
catch (NotSupportedException e)
154+
{
155+
nse = e;
156+
}
151157
}
152158
}
153159
var t = Task.Run(f);
@@ -169,18 +175,21 @@ public async Task TestConcurrentQueueDeclare()
169175
{
170176
var t = Task.Run(async () =>
171177
{
172-
try
178+
using (IChannel ch = await _conn.CreateChannelAsync())
173179
{
174-
// sleep for a random amount of time to increase the chances
175-
// of thread interleaving. MK.
176-
await Task.Delay(S_Random.Next(5, 50));
177-
QueueName q = GenerateQueueName();
178-
await _channel.QueueDeclareAsync(q, false, false, false);
179-
queueNames.Add(q);
180-
}
181-
catch (NotSupportedException e)
182-
{
183-
nse = e;
180+
try
181+
{
182+
// sleep for a random amount of time to increase the chances
183+
// of thread interleaving. MK.
184+
await Task.Delay(S_Random.Next(5, 50));
185+
QueueName q = GenerateQueueName();
186+
await ch.QueueDeclareAsync(q, false, false, false);
187+
queueNames.Add(q);
188+
}
189+
catch (NotSupportedException e)
190+
{
191+
nse = e;
192+
}
184193
}
185194
});
186195
tasks.Add(t);
@@ -196,14 +205,17 @@ public async Task TestConcurrentQueueDeclare()
196205
QueueName q = queueName;
197206
var t = Task.Run(async () =>
198207
{
199-
try
200-
{
201-
await Task.Delay(S_Random.Next(5, 50));
202-
await _channel.QueueDeleteAsync(queueName);
203-
}
204-
catch (NotSupportedException e)
208+
using (IChannel ch = await _conn.CreateChannelAsync())
205209
{
206-
nse = e;
210+
try
211+
{
212+
await Task.Delay(S_Random.Next(5, 50));
213+
await ch.QueueDeleteAsync(queueName);
214+
}
215+
catch (NotSupportedException e)
216+
{
217+
nse = e;
218+
}
207219
}
208220
});
209221
tasks.Add(t);

0 commit comments

Comments
 (0)