20
20
import java .util .ArrayList ;
21
21
import java .util .Arrays ;
22
22
import java .util .List ;
23
- import java .util .concurrent .CopyOnWriteArrayList ;
23
+ import java .util .concurrent .BlockingQueue ;
24
24
import java .util .concurrent .CountDownLatch ;
25
+ import java .util .concurrent .LinkedBlockingQueue ;
25
26
import java .util .concurrent .TimeUnit ;
26
27
27
28
import org .apache .activemq .broker .BrokerService ;
38
39
import org .springframework .messaging .MessageHandler ;
39
40
import org .springframework .messaging .MessagingException ;
40
41
import org .springframework .messaging .StubMessageChannel ;
42
+ import org .springframework .messaging .simp .SimpMessageHeaderAccessor ;
41
43
import org .springframework .messaging .simp .broker .BrokerAvailabilityEvent ;
42
44
import org .springframework .messaging .simp .SimpMessageType ;
43
45
import org .springframework .messaging .support .ExecutorSubscribableChannel ;
@@ -64,9 +66,9 @@ public class StompBrokerRelayMessageHandlerIntegrationTests {
64
66
65
67
private ExecutorSubscribableChannel responseChannel ;
66
68
67
- private ExpectationMatchingMessageHandler responseHandler ;
69
+ private TestMessageHandler responseHandler ;
68
70
69
- private ExpectationMatchingEventPublisher eventPublisher ;
71
+ private TestEventPublisher eventPublisher ;
70
72
71
73
private int port ;
72
74
@@ -77,9 +79,9 @@ public void setUp() throws Exception {
77
79
this .port = SocketUtils .findAvailableTcpPort (61613 );
78
80
79
81
this .responseChannel = new ExecutorSubscribableChannel ();
80
- this .responseHandler = new ExpectationMatchingMessageHandler ();
82
+ this .responseHandler = new TestMessageHandler ();
81
83
this .responseChannel .subscribe (this .responseHandler );
82
- this .eventPublisher = new ExpectationMatchingEventPublisher ();
84
+ this .eventPublisher = new TestEventPublisher ();
83
85
84
86
startActiveMqBroker ();
85
87
createAndStartRelay ();
@@ -103,9 +105,8 @@ private void createAndStartRelay() throws InterruptedException {
103
105
this .relay .setSystemHeartbeatReceiveInterval (0 );
104
106
this .relay .setSystemHeartbeatSendInterval (0 );
105
107
106
- this .eventPublisher .expectAvailabilityStatusChanges (true );
107
108
this .relay .start ();
108
- this .eventPublisher .awaitAndAssert ( );
109
+ this .eventPublisher .expectBrokerAvailabilityEvent ( true );
109
110
}
110
111
111
112
@ After
@@ -138,94 +139,90 @@ public void run() {
138
139
@ Test
139
140
public void publishSubscribe () throws Exception {
140
141
142
+ logger .debug ("Starting test publishSubscribe()" );
143
+
141
144
String sess1 = "sess1" ;
142
145
String sess2 = "sess2" ;
143
146
MessageExchange conn1 = MessageExchangeBuilder .connect (sess1 ).build ();
144
147
MessageExchange conn2 = MessageExchangeBuilder .connect (sess2 ).build ();
145
- this .responseHandler .expect (conn1 , conn2 );
146
148
147
149
this .relay .handleMessage (conn1 .message );
148
150
this .relay .handleMessage (conn2 .message );
149
- this .responseHandler .awaitAndAssert ( );
151
+ this .responseHandler .expectMessages ( conn1 , conn2 );
150
152
151
153
String subs1 = "subs1" ;
152
154
String destination = "/topic/test" ;
153
155
154
156
MessageExchange subscribe = MessageExchangeBuilder .subscribeWithReceipt (sess1 , subs1 , destination , "r1" ).build ();
155
- this .responseHandler .expect (subscribe );
156
-
157
157
this .relay .handleMessage (subscribe .message );
158
- this .responseHandler .awaitAndAssert ( );
158
+ this .responseHandler .expectMessages ( subscribe );
159
159
160
160
MessageExchange send = MessageExchangeBuilder .send (destination , "foo" ).andExpectMessage (sess1 , subs1 ).build ();
161
- this .responseHandler .expect (send );
162
-
163
161
this .relay .handleMessage (send .message );
164
- this .responseHandler .awaitAndAssert ( );
162
+ this .responseHandler .expectMessages ( send );
165
163
}
166
164
167
165
@ Test (expected =MessageDeliveryException .class )
168
166
public void messageDeliverExceptionIfSystemSessionForwardFails () throws Exception {
167
+
168
+ logger .debug ("Starting test messageDeliveryExceptionIfSystemSessionForwardFails()" );
169
+
169
170
stopActiveMqBrokerAndAwait ();
171
+ this .eventPublisher .expectBrokerAvailabilityEvent (false );
172
+
170
173
StompHeaderAccessor headers = StompHeaderAccessor .create (StompCommand .SEND );
171
174
this .relay .handleMessage (MessageBuilder .withPayload ("test" .getBytes ()).setHeaders (headers ).build ());
172
175
}
173
176
174
177
@ Test
175
178
public void brokerBecomingUnvailableTriggersErrorFrame () throws Exception {
176
179
180
+ logger .debug ("Starting test brokerBecomingUnvailableTriggersErrorFrame()" );
181
+
177
182
String sess1 = "sess1" ;
178
183
MessageExchange connect = MessageExchangeBuilder .connect (sess1 ).build ();
179
- this .responseHandler .expect (connect );
180
-
181
184
this .relay .handleMessage (connect .message );
182
- this .responseHandler .awaitAndAssert ();
183
-
184
- this .responseHandler .expect (MessageExchangeBuilder .error (sess1 ).build ());
185
+ this .responseHandler .expectMessages (connect );
185
186
187
+ MessageExchange error = MessageExchangeBuilder .error (sess1 ).build ();
186
188
stopActiveMqBrokerAndAwait ();
187
-
188
- this .responseHandler .awaitAndAssert ( );
189
+ this . eventPublisher . expectBrokerAvailabilityEvent ( false );
190
+ this .responseHandler .expectMessages ( error );
189
191
}
190
192
191
193
@ Test
192
194
public void brokerAvailabilityEventWhenStopped () throws Exception {
193
- this .eventPublisher .expectAvailabilityStatusChanges (false );
195
+
196
+ logger .debug ("Starting test brokerAvailabilityEventWhenStopped()" );
197
+
194
198
stopActiveMqBrokerAndAwait ();
195
- this .eventPublisher .awaitAndAssert ( );
199
+ this .eventPublisher .expectBrokerAvailabilityEvent ( false );
196
200
}
197
201
198
202
@ Test
199
203
public void relayReconnectsIfBrokerComesBackUp () throws Exception {
200
204
205
+ logger .debug ("Starting test relayReconnectsIfBrokerComesBackUp()" );
206
+
201
207
String sess1 = "sess1" ;
202
208
MessageExchange conn1 = MessageExchangeBuilder .connect (sess1 ).build ();
203
- this .responseHandler .expect (conn1 );
204
-
205
209
this .relay .handleMessage (conn1 .message );
206
- this .responseHandler .awaitAndAssert ( );
210
+ this .responseHandler .expectMessages ( conn1 );
207
211
208
212
String subs1 = "subs1" ;
209
213
String destination = "/topic/test" ;
210
- MessageExchange subscribe =
211
- MessageExchangeBuilder .subscribeWithReceipt (sess1 , subs1 , destination , "r1" ).build ();
212
- this .responseHandler .expect (subscribe );
213
-
214
+ MessageExchange subscribe = MessageExchangeBuilder .subscribeWithReceipt (sess1 , subs1 , destination , "r1" ).build ();
214
215
this .relay .handleMessage (subscribe .message );
215
- this .responseHandler .awaitAndAssert ();
216
-
217
- this .responseHandler .expect (MessageExchangeBuilder .error (sess1 ).build ());
216
+ this .responseHandler .expectMessages (subscribe );
218
217
218
+ MessageExchange error = MessageExchangeBuilder .error (sess1 ).build ();
219
219
stopActiveMqBrokerAndAwait ();
220
+ this .responseHandler .expectMessages (error );
220
221
221
- this .responseHandler . awaitAndAssert ( );
222
+ this .eventPublisher . expectBrokerAvailabilityEvent ( false );
222
223
223
- this .eventPublisher .expectAvailabilityStatusChanges (false );
224
- this .eventPublisher .awaitAndAssert ();
225
-
226
- this .eventPublisher .expectAvailabilityStatusChanges (true );
227
224
startActiveMqBroker ();
228
- this .eventPublisher .awaitAndAssert ( );
225
+ this .eventPublisher .expectBrokerAvailabilityEvent ( true );
229
226
230
227
// TODO The event publisher assertions show that the broker's back up and the system relay session
231
228
// has reconnected. We need to decide what we want the reconnect behaviour to be for client relay
@@ -236,11 +233,11 @@ public void relayReconnectsIfBrokerComesBackUp() throws Exception {
236
233
@ Test
237
234
public void disconnectClosesRelaySessionCleanly () throws Exception {
238
235
239
- MessageExchange connect = MessageExchangeBuilder .connect ("sess1" ).build ();
240
- this .responseHandler .expect (connect );
236
+ logger .debug ("Starting test disconnectClosesRelaySessionCleanly()" );
241
237
238
+ MessageExchange connect = MessageExchangeBuilder .connect ("sess1" ).build ();
242
239
this .relay .handleMessage (connect .message );
243
- this .responseHandler .awaitAndAssert ( );
240
+ this .responseHandler .expectMessages ( connect );
244
241
245
242
StompHeaderAccessor headers = StompHeaderAccessor .create (StompCommand .DISCONNECT );
246
243
headers .setSessionId ("sess1" );
@@ -249,79 +246,64 @@ public void disconnectClosesRelaySessionCleanly() throws Exception {
249
246
Thread .sleep (2000 );
250
247
251
248
// Check that we have not received an ERROR as a result of the connection closing
252
- this .responseHandler .awaitAndAssert ( );
249
+ assertTrue ( "Unexpected messages: " + this .responseHandler .queue , this . responseHandler . queue . isEmpty () );
253
250
}
254
251
255
252
256
- /**
257
- * Handles messages by matching them to expectations including a latch to wait for
258
- * the completion of expected messages.
259
- */
260
- private static class ExpectationMatchingMessageHandler implements MessageHandler {
261
-
262
- private final Object monitor = new Object ();
263
-
264
- private final List <MessageExchange > expected ;
265
-
266
- private final List <MessageExchange > actual = new ArrayList <>();
253
+ private static class TestEventPublisher implements ApplicationEventPublisher {
267
254
268
- private final List < Message <?>> unexpected = new ArrayList <>();
255
+ private final BlockingQueue < BrokerAvailabilityEvent > eventQueue = new LinkedBlockingQueue <>();
269
256
270
-
271
- public ExpectationMatchingMessageHandler (MessageExchange ... expected ) {
272
- synchronized (this .monitor ) {
273
- this .expected = new CopyOnWriteArrayList <>(expected );
257
+ @ Override
258
+ public void publishEvent (ApplicationEvent event ) {
259
+ logger .debug ("Processing ApplicationEvent " + event );
260
+ if (event instanceof BrokerAvailabilityEvent ) {
261
+ this .eventQueue .add ((BrokerAvailabilityEvent ) event );
274
262
}
275
263
}
276
264
277
- public void expect ( MessageExchange ... expected ) {
278
- synchronized ( this .monitor ) {
279
- this . expected . addAll ( Arrays . asList ( expected ) );
280
- }
265
+ public void expectBrokerAvailabilityEvent ( boolean isBrokerAvailable ) throws InterruptedException {
266
+ BrokerAvailabilityEvent event = this .eventQueue . poll ( 10000 , TimeUnit . MILLISECONDS );
267
+ assertNotNull ( "Times out waiting for BrokerAvailabilityEvent[" + isBrokerAvailable + "]" , event );
268
+ assertEquals ( isBrokerAvailable , event . isBrokerAvailable ());
281
269
}
270
+ }
282
271
283
- public void awaitAndAssert () throws InterruptedException {
284
- long endTime = System .currentTimeMillis () + 10000 ;
285
- synchronized (this .monitor ) {
286
- while (!this .expected .isEmpty () && System .currentTimeMillis () < endTime ) {
287
- this .monitor .wait (500 );
288
- }
289
- boolean result = this .expected .isEmpty ();
290
- assertTrue (getAsString (), result && this .unexpected .isEmpty ());
291
- }
292
- }
272
+ private static class TestMessageHandler implements MessageHandler {
273
+
274
+ private final BlockingQueue <Message <?>> queue = new LinkedBlockingQueue <>();
293
275
294
276
@ Override
295
277
public void handleMessage (Message <?> message ) throws MessagingException {
296
- if (StompHeaderAccessor .wrap (message ).getMessageType () != SimpMessageType .HEARTBEAT ) {
297
- synchronized (this .monitor ) {
298
- for (MessageExchange exch : this .expected ) {
299
- if (exch .matchMessage (message )) {
300
- if (exch .isDone ()) {
301
- this .expected .remove (exch );
302
- this .actual .add (exch );
303
- if (this .expected .isEmpty ()) {
304
- this .monitor .notifyAll ();
305
- }
306
- }
307
- return ;
308
- }
309
- }
310
- this .unexpected .add (message );
311
- }
278
+ if (SimpMessageType .HEARTBEAT == SimpMessageHeaderAccessor .wrap (message ).getMessageType ()) {
279
+ return ;
312
280
}
281
+ this .queue .add (message );
313
282
}
314
283
315
- public String getAsString () {
316
- StringBuilder sb = new StringBuilder ("\n " );
284
+ public void expectMessages (MessageExchange ... messageExchanges ) throws InterruptedException {
317
285
318
- synchronized (this .monitor ) {
319
- sb .append ("UNMATCHED EXPECTATIONS:\n " ).append (this .expected ).append ("\n " );
320
- sb .append ("MATCHED EXPECTATIONS:\n " ).append (this .actual ).append ("\n " );
321
- sb .append ("UNEXPECTED MESSAGES:\n " ).append (this .unexpected ).append ("\n " );
286
+ List <MessageExchange > expectedMessages =
287
+ new ArrayList <MessageExchange >(Arrays .<MessageExchange >asList (messageExchanges ));
288
+
289
+ while (expectedMessages .size () > 0 ) {
290
+ Message <?> message = this .queue .poll (10000 , TimeUnit .MILLISECONDS );
291
+ assertNotNull ("Timed out waiting for messages, expected [" + expectedMessages + "]" , message );
292
+
293
+ MessageExchange match = findMatch (expectedMessages , message );
294
+ assertNotNull ("Unexpected message=" + message + ", expected [" + expectedMessages + "]" , match );
295
+
296
+ expectedMessages .remove (match );
322
297
}
298
+ }
323
299
324
- return sb .toString ();
300
+ private MessageExchange findMatch (List <MessageExchange > expectedMessages , Message <?> message ) {
301
+ for (MessageExchange exchange : expectedMessages ) {
302
+ if (exchange .matchMessage (message )) {
303
+ return exchange ;
304
+ }
305
+ }
306
+ return null ;
325
307
}
326
308
}
327
309
@@ -564,43 +546,4 @@ public StompConnectedFrameMessageMatcher(String sessionId) {
564
546
565
547
}
566
548
567
- private static class ExpectationMatchingEventPublisher implements ApplicationEventPublisher {
568
-
569
- private final List <Boolean > expected = new ArrayList <>();
570
-
571
- private final List <Boolean > actual = new ArrayList <>();
572
-
573
- private final Object monitor = new Object ();
574
-
575
-
576
- public void expectAvailabilityStatusChanges (Boolean ... expected ) {
577
- synchronized (this .monitor ) {
578
- this .expected .addAll (Arrays .asList (expected ));
579
- }
580
- }
581
-
582
- public void awaitAndAssert () throws InterruptedException {
583
- synchronized (this .monitor ) {
584
- long endTime = System .currentTimeMillis () + 60000 ;
585
- while ((this .expected .size () != this .actual .size ()) && (System .currentTimeMillis () < endTime )) {
586
- this .monitor .wait (500 );
587
- }
588
- assertEquals (this .expected , this .actual );
589
- }
590
- }
591
-
592
- @ Override
593
- public void publishEvent (ApplicationEvent event ) {
594
- logger .debug ("Processing ApplicationEvent " + event );
595
- if (event instanceof BrokerAvailabilityEvent ) {
596
- synchronized (this .monitor ) {
597
- this .actual .add (((BrokerAvailabilityEvent ) event ).isBrokerAvailable ());
598
- if (this .actual .size () == this .expected .size ()) {
599
- this .monitor .notifyAll ();
600
- }
601
- }
602
- }
603
- }
604
- }
605
-
606
549
}
0 commit comments