41
41
using NUnit . Framework ;
42
42
using RabbitMQ . Client . Events ;
43
43
using System ;
44
- using System . Collections . Generic ;
44
+ using System . Diagnostics ;
45
45
using System . Text ;
46
46
using System . Threading ;
47
47
using System . Threading . Tasks ;
@@ -89,7 +89,7 @@ public void TestUnthrottledFloodPublishing()
89
89
public async Task TestMultithreadFloodPublishing ( )
90
90
{
91
91
string message = "test message" ;
92
- int threadCount = 4 ;
92
+ int threadCount = 1 ;
93
93
int publishCount = 100 ;
94
94
var receivedCount = 0 ;
95
95
byte [ ] sendBody = Encoding . UTF8 . GetBytes ( message ) ;
@@ -105,7 +105,13 @@ public async Task TestMultithreadFloodPublishing()
105
105
var tcs = new TaskCompletionSource < bool > ( ) ;
106
106
consumer . Received += ( o , a ) =>
107
107
{
108
- Assert . AreEqual ( message , Encoding . UTF8 . GetString ( a . Body . ToArray ( ) ) ) ;
108
+ Console . WriteLine ( "Receiving" ) ;
109
+ var receivedMessage = Encoding . UTF8 . GetString ( a . Body . ToArray ( ) ) ;
110
+ if ( ! receivedMessage . Equals ( message ) )
111
+ {
112
+ Debugger . Break ( ) ;
113
+ }
114
+ Assert . AreEqual ( message , receivedMessage ) ;
109
115
110
116
var result = Interlocked . Increment ( ref receivedCount ) ;
111
117
if ( result == threadCount * publishCount )
@@ -115,24 +121,22 @@ public async Task TestMultithreadFloodPublishing()
115
121
} ;
116
122
117
123
string tag = m . BasicConsume ( q . QueueName , true , consumer ) ;
118
- var cts = new CancellationTokenSource ( TimeSpan . FromSeconds ( 5 ) ) ;
119
-
120
-
124
+ var cts = new CancellationTokenSource ( TimeSpan . FromSeconds ( 30 ) ) ;
121
125
122
126
using ( var timeoutRegistration = cts . Token . Register ( ( ) => tcs . SetCanceled ( ) ) )
123
127
{
124
- var tasks = new List < Task > ( ) ;
125
- for ( int i = 0 ; i < threadCount ; i ++ )
126
- {
127
- tasks . Add ( Task . Run ( ( ) => StartFlood ( m , q . QueueName , bp , sendBody , publishCount ) ) ) ;
128
- }
129
- await Task . WhenAll ( tasks ) ;
128
+ StartFlood ( m , q . QueueName , bp , sendBody , publishCount ) ;
129
+
130
+ //var tasks = new List<Task>();
131
+ //for (int i = 0; i < threadCount; i++)
132
+ //{
133
+ // tasks.Add(Task.Run(() => StartFlood(m, q.QueueName, bp, sendBody, publishCount)));
134
+ //}
135
+ //await Task.WhenAll(tasks);
130
136
await tcs . Task ;
131
137
}
132
138
m . BasicCancel ( tag ) ;
133
-
134
-
135
-
139
+ await tcs . Task ;
136
140
Assert . AreEqual ( threadCount * publishCount , receivedCount ) ;
137
141
}
138
142
0 commit comments