@@ -45,9 +45,14 @@ namespace RabbitMQ.Client.Unit
45
45
public class TestPublisherConfirms : IntegrationFixture
46
46
{
47
47
private const string QueueName = "RabbitMQ.Client.Unit.TestPublisherConfirms" ;
48
+ private readonly byte [ ] _body ;
48
49
49
50
public TestPublisherConfirms ( ITestOutputHelper output ) : base ( output )
50
51
{
52
+ var rnd = new Random ( ) ;
53
+ _body = new byte [ 4096 ] ;
54
+ rnd . NextBytes ( _body ) ;
55
+
51
56
}
52
57
53
58
[ Fact ]
@@ -64,18 +69,22 @@ public void TestWaitForConfirmsWithTimeout()
64
69
{
65
70
TestWaitForConfirms ( 200 , ( ch ) =>
66
71
{
67
- using var cts = new CancellationTokenSource ( TimeSpan . FromSeconds ( 4 ) ) ;
68
- Assert . True ( ch . WaitForConfirmsAsync ( cts . Token ) . GetAwaiter ( ) . GetResult ( ) ) ;
72
+ using ( var cts = new CancellationTokenSource ( TimeSpan . FromSeconds ( 4 ) ) )
73
+ {
74
+ Assert . True ( ch . WaitForConfirmsAsync ( cts . Token ) . GetAwaiter ( ) . GetResult ( ) ) ;
75
+ }
69
76
} ) ;
70
77
}
71
78
72
79
[ Fact ]
73
80
public void TestWaitForConfirmsWithTimeout_AllMessagesAcked_WaitingHasTimedout_ReturnTrue ( )
74
81
{
75
- TestWaitForConfirms ( 2000 , ( ch ) =>
82
+ TestWaitForConfirms ( 10000 , ( ch ) =>
76
83
{
77
- using var cts = new CancellationTokenSource ( TimeSpan . FromMilliseconds ( 1 ) ) ;
78
- Assert . Throws < TaskCanceledException > ( ( ) => ch . WaitForConfirmsAsync ( cts . Token ) . GetAwaiter ( ) . GetResult ( ) ) ;
84
+ using ( var cts = new CancellationTokenSource ( TimeSpan . FromMilliseconds ( 1 ) ) )
85
+ {
86
+ Assert . Throws < TaskCanceledException > ( ( ) => ch . WaitForConfirmsAsync ( cts . Token ) . GetAwaiter ( ) . GetResult ( ) ) ;
87
+ }
79
88
} ) ;
80
89
}
81
90
@@ -90,67 +99,70 @@ public void TestWaitForConfirmsWithTimeout_MessageNacked_WaitingHasTimedout_Retu
90
99
. GetMethod ( "HandleAckNack" , BindingFlags . Instance | BindingFlags . NonPublic )
91
100
. Invoke ( actualModel , new object [ ] { 10UL , false , true } ) ;
92
101
93
- using var cts = new CancellationTokenSource ( TimeSpan . FromSeconds ( 4 ) ) ;
94
- Assert . False ( ch . WaitForConfirmsAsync ( cts . Token ) . GetAwaiter ( ) . GetResult ( ) ) ;
102
+ using ( var cts = new CancellationTokenSource ( TimeSpan . FromSeconds ( 4 ) ) )
103
+ {
104
+ Assert . False ( ch . WaitForConfirmsAsync ( cts . Token ) . GetAwaiter ( ) . GetResult ( ) ) ;
105
+ }
95
106
} ) ;
96
107
}
97
108
98
109
[ Fact ]
99
110
public async Task TestWaitForConfirmsWithEvents ( )
100
111
{
101
- IModel ch = _conn . CreateModel ( ) ;
102
- ch . ConfirmSelect ( ) ;
112
+ using ( IModel ch = _conn . CreateModel ( ) )
113
+ {
114
+ ch . ConfirmSelect ( ) ;
103
115
104
- ch . QueueDeclare ( QueueName ) ;
105
- int n = 200 ;
106
- // number of event handler invocations
107
- int c = 0 ;
116
+ ch . QueueDeclare ( QueueName ) ;
117
+ int n = 200 ;
118
+ // number of event handler invocations
119
+ int c = 0 ;
108
120
109
- ch . BasicAcks += ( _ , args ) =>
110
- {
111
- Interlocked . Increment ( ref c ) ;
112
- } ;
113
- try
114
- {
115
- for ( int i = 0 ; i < n ; i ++ )
121
+ ch . BasicAcks += ( _ , args ) =>
122
+ {
123
+ Interlocked . Increment ( ref c ) ;
124
+ } ;
125
+ try
116
126
{
117
- ch . BasicPublish ( "" , QueueName , _encoding . GetBytes ( "msg" ) ) ;
127
+ for ( int i = 0 ; i < n ; i ++ )
128
+ {
129
+ ch . BasicPublish ( "" , QueueName , _encoding . GetBytes ( "msg" ) ) ;
130
+ }
131
+ await ch . WaitForConfirmsAsync ( ) . ConfigureAwait ( false ) ;
132
+
133
+ // Note: number of event invocations is not guaranteed
134
+ // to be equal to N because acks can be batched,
135
+ // so we primarily care about event handlers being invoked
136
+ // in this test
137
+ Assert . True ( c > 5 ) ;
138
+ }
139
+ finally
140
+ {
141
+ ch . QueueDelete ( QueueName ) ;
118
142
}
119
- await ch . WaitForConfirmsAsync ( ) . ConfigureAwait ( false ) ;
120
-
121
- // Note: number of event invocations is not guaranteed
122
- // to be equal to N because acks can be batched,
123
- // so we primarily care about event handlers being invoked
124
- // in this test
125
- Assert . True ( c > 5 ) ;
126
- }
127
- finally
128
- {
129
- ch . QueueDelete ( QueueName ) ;
130
- ch . Close ( ) ;
131
143
}
132
144
}
133
145
134
146
protected void TestWaitForConfirms ( int numberOfMessagesToPublish , Action < IModel > fn )
135
147
{
136
- IModel ch = _conn . CreateModel ( ) ;
137
- ch . ConfirmSelect ( ) ;
138
- ch . QueueDeclare ( QueueName ) ;
139
-
140
- ReadOnlyMemory < byte > body = _encoding . GetBytes ( "msg" ) ;
141
- for ( int i = 0 ; i < numberOfMessagesToPublish ; i ++ )
148
+ using ( IModel ch = _conn . CreateModel ( ) )
142
149
{
143
- ch . BasicPublish ( "" , QueueName , body ) ;
144
- }
150
+ ch . ConfirmSelect ( ) ;
151
+ ch . QueueDeclare ( QueueName ) ;
145
152
146
- try
147
- {
148
- fn ( ch ) ;
149
- }
150
- finally
151
- {
152
- ch . QueueDelete ( QueueName ) ;
153
- ch . Close ( ) ;
153
+ for ( int i = 0 ; i < numberOfMessagesToPublish ; i ++ )
154
+ {
155
+ ch . BasicPublish ( "" , QueueName , _body ) ;
156
+ }
157
+
158
+ try
159
+ {
160
+ fn ( ch ) ;
161
+ }
162
+ finally
163
+ {
164
+ ch . QueueDelete ( QueueName ) ;
165
+ }
154
166
}
155
167
}
156
168
}
0 commit comments