@@ -53,7 +53,6 @@ public class TestFloodPublishing
53
53
private readonly byte [ ] _body = new byte [ 2048 ] ;
54
54
private readonly TimeSpan _tenSeconds = TimeSpan . FromSeconds ( 10 ) ;
55
55
56
- /*
57
56
[ Test ]
58
57
public void TestUnthrottledFloodPublishing ( )
59
58
{
@@ -84,8 +83,6 @@ public void TestUnthrottledFloodPublishing()
84
83
{
85
84
now = DateTime . Now ;
86
85
shouldStop = DateTime . Now > stopTime ;
87
- TestContext.Out.WriteLine("@@@@@@@@ NUNIT Checking now {0} stopTime {1} shouldStop {2}", now, stopTime, shouldStop);
88
- Console.Error.WriteLine("@@@@@@@@ STDERR Checking now {0} stopTime {1} shouldStop {2}", now, stopTime, shouldStop);
89
86
if ( shouldStop )
90
87
{
91
88
break ;
@@ -97,16 +94,15 @@ public void TestUnthrottledFloodPublishing()
97
94
}
98
95
}
99
96
}
100
- */
101
97
102
98
[ Test ]
103
- public async Task TestMultithreadFloodPublishing ( )
99
+ public void TestMultithreadFloodPublishing ( )
104
100
{
105
- string message = "test message" ;
106
- int threadCount = 1 ;
107
- int publishCount = 100 ;
108
- int receivedCount = 0 ;
101
+ string testName = TestContext . CurrentContext . Test . FullName ;
102
+ string message = string . Format ( "Hello from test {0}" , testName ) ;
109
103
byte [ ] sendBody = Encoding . UTF8 . GetBytes ( message ) ;
104
+ int publishCount = 4096 ;
105
+ int receivedCount = 0 ;
110
106
111
107
var cf = new ConnectionFactory ( )
112
108
{
@@ -116,39 +112,37 @@ public async Task TestMultithreadFloodPublishing()
116
112
117
113
using ( IConnection c = cf . CreateConnection ( ) )
118
114
{
115
+ string queueName = null ;
119
116
using ( IModel m = c . CreateModel ( ) )
120
117
{
121
118
QueueDeclareOk q = m . QueueDeclare ( ) ;
122
- IBasicProperties bp = m . CreateBasicProperties ( ) ;
123
-
124
- var consumer = new EventingBasicConsumer ( m ) ;
125
- var tcs = new TaskCompletionSource < bool > ( ) ;
126
- consumer . Received += ( o , a ) =>
127
- {
128
- var receivedMessage = Encoding . UTF8 . GetString ( a . Body . ToArray ( ) ) ;
129
- Assert . AreEqual ( message , receivedMessage ) ;
130
-
131
- var result = Interlocked . Increment ( ref receivedCount ) ;
132
- if ( result == threadCount * publishCount )
133
- {
134
- tcs . SetResult ( true ) ;
135
- }
136
- } ;
137
-
138
- string tag = m . BasicConsume ( q . QueueName , true , consumer ) ;
139
- var cts = new CancellationTokenSource ( _tenSeconds ) ;
119
+ queueName = q . QueueName ;
120
+ }
140
121
141
- using ( var timeoutRegistration = cts . Token . Register ( ( ) => tcs . SetCanceled ( ) ) )
122
+ Task pub = Task . Run ( ( ) =>
123
+ {
124
+ using ( IModel m = c . CreateModel ( ) )
142
125
{
126
+ IBasicProperties bp = m . CreateBasicProperties ( ) ;
143
127
for ( int i = 0 ; i < publishCount ; i ++ )
144
128
{
145
- m . BasicPublish ( string . Empty , q . QueueName , bp , sendBody ) ;
129
+ m . BasicPublish ( string . Empty , queueName , bp , sendBody ) ;
146
130
}
147
- bool allMessagesReceived = await tcs . Task ;
148
- Assert . IsTrue ( allMessagesReceived ) ;
149
131
}
150
- m . BasicCancel ( tag ) ;
151
- Assert . AreEqual ( threadCount * publishCount , receivedCount ) ;
132
+ } ) ;
133
+
134
+ using ( IModel consumerModel = c . CreateModel ( ) )
135
+ {
136
+ var consumer = new EventingBasicConsumer ( consumerModel ) ;
137
+ consumer . Received += ( o , a ) =>
138
+ {
139
+ string receivedMessage = Encoding . UTF8 . GetString ( a . Body . ToArray ( ) ) ;
140
+ Assert . AreEqual ( message , receivedMessage ) ;
141
+ Interlocked . Increment ( ref receivedCount ) ;
142
+ } ;
143
+ consumerModel . BasicConsume ( queueName , true , consumer ) ;
144
+ Assert . IsTrue ( pub . Wait ( _tenSeconds ) ) ;
145
+ Assert . AreEqual ( publishCount , receivedCount ) ;
152
146
}
153
147
}
154
148
}
0 commit comments