@@ -59,74 +59,79 @@ public void TestUnthrottledFloodPublishing()
59
59
RequestedHeartbeat = TimeSpan . FromSeconds ( 60 ) ,
60
60
AutomaticRecoveryEnabled = false
61
61
} ;
62
- using var Conn = connFactory . CreateConnection ( ) ;
63
- using var Model = Conn . CreateModel ( ) ;
64
62
65
- Conn . ConnectionShutdown += ( _ , args ) =>
63
+ using ( var conn = connFactory . CreateConnection ( ) )
66
64
{
67
- if ( args . Initiator != ShutdownInitiator . Application )
65
+ using ( var model = conn . CreateModel ( ) )
68
66
{
69
- Assert . Fail ( "Unexpected connection shutdown!" ) ;
70
- }
71
- } ;
67
+ conn . ConnectionShutdown += ( _ , args ) =>
68
+ {
69
+ if ( args . Initiator != ShutdownInitiator . Application )
70
+ {
71
+ Assert . Fail ( "Unexpected connection shutdown!" ) ;
72
+ }
73
+ } ;
72
74
73
- bool elapsed = false ;
74
- using ( Timer t = new Timer ( ( _obj ) => { elapsed = true ; } , null , 1000 * 185 , - 1 ) )
75
- {
76
- while ( ! elapsed )
77
- {
78
- Model . BasicPublish ( "" , "" , null , new byte [ 2048 ] ) ;
75
+ bool elapsed = false ;
76
+ using ( Timer t = new Timer ( ( _obj ) => { elapsed = true ; } , null , 1000 * 185 , - 1 ) )
77
+ {
78
+ while ( ! elapsed )
79
+ {
80
+ model . BasicPublish ( "" , "" , null , new byte [ 2048 ] ) ;
81
+ }
82
+ Assert . IsTrue ( conn . IsOpen ) ;
83
+ }
79
84
}
80
- Assert . IsTrue ( Conn . IsOpen ) ;
81
85
}
82
86
}
83
87
84
- // TODO rabbitmq/rabbitmq-dotnet-client#802 FIX THIS
88
+ // rabbitmq/rabbitmq-dotnet-client#802 FIX THIS TODO
85
89
[ Test , Category ( "LongRunning" ) ]
86
90
public async Task TestMultithreadFloodPublishing ( )
87
91
{
88
92
string message = "test message" ;
89
93
int threadCount = 1 ;
90
94
int publishCount = 100 ;
91
- var receivedCount = 0 ;
95
+ int receivedCount = 0 ;
92
96
byte [ ] sendBody = Encoding . UTF8 . GetBytes ( message ) ;
93
97
94
98
var cf = new ConnectionFactory ( ) ;
95
99
using ( IConnection c = cf . CreateConnection ( ) )
96
- using ( IModel m = c . CreateModel ( ) )
97
100
{
98
- QueueDeclareOk q = m . QueueDeclare ( ) ;
99
- IBasicProperties bp = m . CreateBasicProperties ( ) ;
100
-
101
- var consumer = new EventingBasicConsumer ( m ) ;
102
- var tcs = new TaskCompletionSource < bool > ( ) ;
103
- consumer . Received += ( o , a ) =>
101
+ using ( IModel m = c . CreateModel ( ) )
104
102
{
105
- var receivedMessage = Encoding . UTF8 . GetString ( a . Body . ToArray ( ) ) ;
106
- Assert . AreEqual ( message , receivedMessage ) ;
103
+ QueueDeclareOk q = m . QueueDeclare ( ) ;
104
+ IBasicProperties bp = m . CreateBasicProperties ( ) ;
107
105
108
- var result = Interlocked . Increment ( ref receivedCount ) ;
109
- if ( result == threadCount * publishCount )
106
+ var consumer = new EventingBasicConsumer ( m ) ;
107
+ var tcs = new TaskCompletionSource < bool > ( ) ;
108
+ consumer . Received += ( o , a ) =>
110
109
{
111
- tcs . SetResult ( true ) ;
112
- }
113
- } ;
110
+ var receivedMessage = Encoding . UTF8 . GetString ( a . Body . ToArray ( ) ) ;
111
+ Assert . AreEqual ( message , receivedMessage ) ;
114
112
115
- string tag = m . BasicConsume ( q . QueueName , true , consumer ) ;
116
- var cts = new CancellationTokenSource ( TimeSpan . FromSeconds ( 30 ) ) ;
113
+ var result = Interlocked . Increment ( ref receivedCount ) ;
114
+ if ( result == threadCount * publishCount )
115
+ {
116
+ tcs . SetResult ( true ) ;
117
+ }
118
+ } ;
117
119
118
- using ( var timeoutRegistration = cts . Token . Register ( ( ) => tcs . SetCanceled ( ) ) )
119
- {
120
- for ( int i = 0 ; i < publishCount ; i ++ )
120
+ string tag = m . BasicConsume ( q . QueueName , true , consumer ) ;
121
+ var cts = new CancellationTokenSource ( TimeSpan . FromSeconds ( 30 ) ) ;
122
+
123
+ using ( var timeoutRegistration = cts . Token . Register ( ( ) => tcs . SetCanceled ( ) ) )
121
124
{
122
- m . BasicPublish ( string . Empty , q . QueueName , bp , sendBody ) ;
125
+ for ( int i = 0 ; i < publishCount ; i ++ )
126
+ {
127
+ m . BasicPublish ( string . Empty , q . QueueName , bp , sendBody ) ;
128
+ }
129
+ bool allMessagesReceived = await tcs . Task ;
130
+ Assert . IsTrue ( allMessagesReceived ) ;
123
131
}
124
-
125
- await tcs . Task ;
132
+ m . BasicCancel ( tag ) ;
133
+ Assert . AreEqual ( threadCount * publishCount , receivedCount ) ;
126
134
}
127
- m . BasicCancel ( tag ) ;
128
- await tcs . Task ;
129
- Assert . AreEqual ( threadCount * publishCount , receivedCount ) ;
130
135
}
131
136
}
132
137
}
0 commit comments