38
38
// Copyright (c) 2007-2020 VMware, Inc. All rights reserved.
39
39
//---------------------------------------------------------------------------
40
40
41
- using System ;
42
-
43
41
using NUnit . Framework ;
44
- //using System.Timers;
42
+ using RabbitMQ . Client . Events ;
43
+ using System ;
44
+ using System . Collections . Generic ;
45
+ using System . Text ;
46
+ using System . Threading ;
47
+ using System . Threading . Tasks ;
45
48
46
49
namespace RabbitMQ . Client . Unit
47
50
{
48
51
[ TestFixture ]
49
52
public class TestFloodPublishing : IntegrationFixture
50
53
{
51
- [ SetUp ]
52
- public override void Init ( )
54
+ [ Test , Category ( "LongRunning" ) ]
55
+ public void TestUnthrottledFloodPublishing ( )
53
56
{
54
57
var connFactory = new ConnectionFactory ( )
55
58
{
56
59
RequestedHeartbeat = TimeSpan . FromSeconds ( 60 ) ,
57
60
AutomaticRecoveryEnabled = false
58
61
} ;
59
- Conn = connFactory . CreateConnection ( ) ;
60
- Model = Conn . CreateModel ( ) ;
61
- }
62
+ using var Conn = connFactory . CreateConnection ( ) ;
63
+ using var Model = Conn . CreateModel ( ) ;
62
64
63
- [ Test , Category ( "LongRunning" ) ]
64
- public void TestUnthrottledFloodPublishing ( )
65
- {
66
65
Conn . ConnectionShutdown += ( _ , args ) =>
67
66
{
68
67
if ( args . Initiator != ShutdownInitiator . Application )
@@ -85,5 +84,66 @@ public void TestUnthrottledFloodPublishing()
85
84
Assert . IsTrue ( Conn . IsOpen ) ;
86
85
t . Dispose ( ) ;
87
86
}
87
+
88
+ [ Test ]
89
+ public async Task TestMultithreadFloodPublishing ( )
90
+ {
91
+ string message = "test message" ;
92
+ int threadCount = 4 ;
93
+ int publishCount = 100 ;
94
+ var receivedCount = 0 ;
95
+ byte [ ] sendBody = Encoding . UTF8 . GetBytes ( message ) ;
96
+
97
+ var cf = new ConnectionFactory ( ) ;
98
+ using ( IConnection c = cf . CreateConnection ( ) )
99
+ using ( IModel m = c . CreateModel ( ) )
100
+ {
101
+ QueueDeclareOk q = m . QueueDeclare ( ) ;
102
+ IBasicProperties bp = m . CreateBasicProperties ( ) ;
103
+
104
+ var consumer = new EventingBasicConsumer ( m ) ;
105
+ var tcs = new TaskCompletionSource < bool > ( ) ;
106
+ consumer . Received += ( o , a ) =>
107
+ {
108
+ Assert . AreEqual ( message , Encoding . UTF8 . GetString ( a . Body . ToArray ( ) ) ) ;
109
+
110
+ var result = Interlocked . Increment ( ref receivedCount ) ;
111
+ if ( result == threadCount * publishCount )
112
+ {
113
+ tcs . SetResult ( true ) ;
114
+ }
115
+ } ;
116
+
117
+ string tag = m . BasicConsume ( q . QueueName , true , consumer ) ;
118
+ var cts = new CancellationTokenSource ( TimeSpan . FromSeconds ( 5 ) ) ;
119
+
120
+
121
+
122
+ using ( var timeoutRegistration = cts . Token . Register ( ( ) => tcs . SetCanceled ( ) ) )
123
+ {
124
+ var tasks = new List < Task > ( ) ;
125
+ for ( int i = 0 ; i < threadCount ; i ++ )
126
+ {
127
+ tasks . Add ( Task . Run ( ( ) => StartFlood ( m , q . QueueName , bp , sendBody , publishCount ) ) ) ;
128
+ }
129
+ await Task . WhenAll ( tasks ) ;
130
+ await tcs . Task ;
131
+ }
132
+ m . BasicCancel ( tag ) ;
133
+
134
+
135
+
136
+ Assert . AreEqual ( threadCount * publishCount , receivedCount ) ;
137
+ }
138
+
139
+
140
+ void StartFlood ( IModel model , string queue , IBasicProperties properties , byte [ ] body , int count )
141
+ {
142
+ for ( int i = 0 ; i < count ; i ++ )
143
+ {
144
+ model . BasicPublish ( string . Empty , queue , properties , body ) ;
145
+ }
146
+ }
147
+ }
88
148
}
89
149
}
0 commit comments