11/*
2- * Copyright 2002-2017 the original author or authors.
2+ * Copyright 2002-2018 the original author or authors.
33 *
44 * Licensed under the Apache License, Version 2.0 (the "License");
55 * you may not use this file except in compliance with the License.
1919import static org .junit .Assert .assertEquals ;
2020import static org .mockito .Mockito .mock ;
2121
22- import java .util .Date ;
23- import java .util .concurrent .CountDownLatch ;
24- import java .util .concurrent .TimeUnit ;
25- import java .util .concurrent .atomic .AtomicBoolean ;
2622import java .util .concurrent .atomic .AtomicInteger ;
2723
2824import org .junit .After ;
3329import org .springframework .beans .factory .BeanFactory ;
3430import org .springframework .integration .MessageRejectedException ;
3531import org .springframework .integration .support .MessagingExceptionWrapper ;
32+ import org .springframework .integration .test .util .OnlyOnceTrigger ;
3633import org .springframework .messaging .Message ;
3734import org .springframework .messaging .MessageHandler ;
3835import org .springframework .messaging .PollableChannel ;
3936import org .springframework .messaging .support .GenericMessage ;
40- import org .springframework .scheduling .Trigger ;
41- import org .springframework .scheduling .TriggerContext ;
4237import org .springframework .scheduling .concurrent .ThreadPoolTaskScheduler ;
4338import org .springframework .util .ErrorHandler ;
4439
4540/**
4641 * @author Iwein Fuld
4742 * @author Mark Fisher
4843 * @author Kiel Boatman
44+ * @author Artem Bilan
4945 */
50- @ SuppressWarnings (" unchecked" )
46+ @ SuppressWarnings ({ "rawtypes" , " unchecked" } )
5147public class PollingConsumerEndpointTests {
5248
53- private PollingConsumer endpoint ;
54-
55- private final TestTrigger trigger = new TestTrigger ();
49+ private final OnlyOnceTrigger trigger = new OnlyOnceTrigger ();
5650
5751 private final TestConsumer consumer = new TestConsumer ();
5852
59- @ SuppressWarnings ("rawtypes" )
60- private final Message message = new GenericMessage <String >("test" );
53+ private final Message message = new GenericMessage <>("test" );
6154
62- @ SuppressWarnings ("rawtypes" )
63- private final Message badMessage = new GenericMessage <String >("bad" );
55+ private final Message badMessage = new GenericMessage <>("bad" );
6456
6557 private final TestErrorHandler errorHandler = new TestErrorHandler ();
6658
67- private PollableChannel channelMock ;
68-
6959 private final ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler ();
7060
61+ private PollingConsumer endpoint ;
62+
63+ private PollableChannel channelMock ;
64+
7165
7266 @ Before
73- public void init () throws Exception {
74- channelMock = Mockito .mock (PollableChannel .class );
75- consumer .counter .set (0 );
76- trigger .reset ();
77- endpoint = new PollingConsumer (channelMock , consumer );
78- taskScheduler .setPoolSize (5 );
79- endpoint .setErrorHandler (errorHandler );
80- endpoint .setTaskScheduler (taskScheduler );
81- endpoint .setTrigger (trigger );
82- endpoint .setBeanFactory (mock (BeanFactory .class ));
83- endpoint .setReceiveTimeout (-1 );
84- endpoint .afterPropertiesSet ();
85- taskScheduler .afterPropertiesSet ();
67+ public void init () {
68+ this .channelMock = mock (PollableChannel .class );
69+ this .endpoint = new PollingConsumer (this .channelMock , this .consumer );
70+ this .taskScheduler .setPoolSize (5 );
71+ this .endpoint .setErrorHandler (this .errorHandler );
72+ this .endpoint .setTaskScheduler (this .taskScheduler );
73+ this .endpoint .setTrigger (this .trigger );
74+ this .endpoint .setBeanFactory (mock (BeanFactory .class ));
75+ this .endpoint .setReceiveTimeout (-1 );
76+ this .endpoint .afterPropertiesSet ();
77+ this .taskScheduler .afterPropertiesSet ();
8678 }
8779
8880 @ After
89- public void stop () throws Exception {
81+ public void stop () {
9082 taskScheduler .destroy ();
9183 }
9284
9385
9486 @ Test
9587 public void singleMessage () {
96- Mockito .when (channelMock .receive ()).thenReturn (message );
97- endpoint .setMaxMessagesPerPoll (1 );
98- endpoint .start ();
99- trigger .await ();
100- endpoint .stop ();
101- assertEquals (1 , consumer .counter .get ());
88+ Mockito .when (this . channelMock .receive ()).thenReturn (this . message );
89+ this . endpoint .setMaxMessagesPerPoll (1 );
90+ this . endpoint .start ();
91+ this . trigger .await ();
92+ this . endpoint .stop ();
93+ assertEquals (1 , this . consumer .counter .get ());
10294 }
10395
10496 @ Test
10597 public void multipleMessages () {
106- Mockito .when (channelMock .receive ()).thenReturn (message , message , message , message , message );
107- endpoint .setMaxMessagesPerPoll (5 );
108- endpoint .start ();
109- trigger .await ();
110- endpoint .stop ();
111- assertEquals (5 , consumer .counter .get ());
98+ Mockito .when (this .channelMock .receive ())
99+ .thenReturn (this .message , this .message , this .message , this .message , this .message );
100+ this .endpoint .setMaxMessagesPerPoll (5 );
101+ this .endpoint .start ();
102+ this .trigger .await ();
103+ this .endpoint .stop ();
104+ assertEquals (5 , this .consumer .counter .get ());
112105 }
113106
114107 @ Test
115- public void multipleMessages_underrun () {
116- Mockito .when (channelMock .receive ()).thenReturn (message , message , message , message , message , null );
117- endpoint .setMaxMessagesPerPoll (6 );
118- endpoint .start ();
119- trigger .await ();
120- endpoint .stop ();
121- assertEquals (5 , consumer .counter .get ());
108+ public void multipleMessages_under_run () {
109+ Mockito .when (this .channelMock .receive ())
110+ .thenReturn (this .message , this .message , this .message , this .message , this .message , null );
111+ this .endpoint .setMaxMessagesPerPoll (6 );
112+ this .endpoint .start ();
113+ this .trigger .await ();
114+ this .endpoint .stop ();
115+ assertEquals (5 , this .consumer .counter .get ());
122116 }
123117
124118 @ Test
125- public void heavierLoadTest () throws Exception {
119+ public void heavierLoadTest () {
126120 for (int i = 0 ; i < 1000 ; i ++) {
127- this .init ();
128- this .multipleMessages ();
129- this .stop ();
121+ init ();
122+ this .trigger .reset ();
123+ this .consumer .counter .set (0 );
124+ multipleMessages ();
125+ stop ();
130126 }
131127 }
132128
133129 @ Test (expected = MessageRejectedException .class )
134130 public void rejectedMessage () throws Throwable {
135- Mockito .when (channelMock .receive ()).thenReturn (badMessage );
136- endpoint .start ();
137- trigger .await ();
138- endpoint .stop ();
139- assertEquals (1 , consumer .counter .get ());
140- errorHandler .throwLastErrorIfAvailable ();
131+ Mockito .when (this . channelMock .receive ()).thenReturn (this . badMessage );
132+ this . endpoint .start ();
133+ this . trigger .await ();
134+ this . endpoint .stop ();
135+ assertEquals (1 , this . consumer .counter .get ());
136+ this . errorHandler .throwLastErrorIfAvailable ();
141137 }
142138
143139 @ Test (expected = MessageRejectedException .class )
144140 public void droppedMessage_onePerPoll () throws Throwable {
145- Mockito .when (channelMock .receive ()).thenReturn (badMessage );
146- endpoint .setMaxMessagesPerPoll (10 );
147- endpoint .start ();
148- trigger .await ();
149- endpoint .stop ();
150- assertEquals (1 , consumer .counter .get ());
151- errorHandler .throwLastErrorIfAvailable ();
141+ Mockito .when (this . channelMock .receive ()).thenReturn (this . badMessage );
142+ this . endpoint .setMaxMessagesPerPoll (10 );
143+ this . endpoint .start ();
144+ this . trigger .await ();
145+ this . endpoint .stop ();
146+ assertEquals (1 , this . consumer .counter .get ());
147+ this . errorHandler .throwLastErrorIfAvailable ();
152148 }
153149
154150 @ Test
155151 public void blockingSourceTimedOut () {
156152 // we don't need to await the timeout, returning null suffices
157- Mockito .when (channelMock .receive ()).thenReturn (null );
158- endpoint .setReceiveTimeout (1 );
159- endpoint .start ();
160- trigger .await ();
161- endpoint .stop ();
162- assertEquals (0 , consumer .counter .get ());
153+ Mockito .when (this . channelMock .receive ()).thenReturn (null );
154+ this . endpoint .setReceiveTimeout (1 );
155+ this . endpoint .start ();
156+ this . trigger .await ();
157+ this . endpoint .stop ();
158+ assertEquals (0 , this . consumer .counter .get ());
163159 }
164160
165161 @ Test
166162 public void blockingSourceNotTimedOut () {
167- Mockito .when (channelMock .receive (Mockito .eq (1L ))).thenReturn (message );
168- endpoint .setReceiveTimeout (1 );
169- endpoint .setMaxMessagesPerPoll (1 );
170- endpoint .start ();
171- trigger .await ();
172- endpoint .stop ();
173- assertEquals (1 , consumer .counter .get ());
163+ Mockito .when (this . channelMock .receive (Mockito .eq (1L ))).thenReturn (this . message );
164+ this . endpoint .setReceiveTimeout (1 );
165+ this . endpoint .setMaxMessagesPerPoll (1 );
166+ this . endpoint .start ();
167+ this . trigger .await ();
168+ this . endpoint .stop ();
169+ assertEquals (1 , this . consumer .counter .get ());
174170 }
175171
176172
@@ -192,45 +188,6 @@ public void handleMessage(Message<?> message) {
192188 }
193189
194190
195- private static class TestTrigger implements Trigger {
196-
197- private final AtomicBoolean hasRun = new AtomicBoolean ();
198-
199- private volatile CountDownLatch latch = new CountDownLatch (1 );
200-
201-
202- TestTrigger () {
203- super ();
204- }
205-
206- @ Override
207- public Date nextExecutionTime (TriggerContext triggerContext ) {
208- if (!this .hasRun .getAndSet (true )) {
209- return new Date ();
210- }
211- this .latch .countDown ();
212- return null ;
213- }
214-
215- public void reset () {
216- this .latch = new CountDownLatch (1 );
217- this .hasRun .set (false );
218- }
219-
220- public void await () {
221- try {
222- this .latch .await (5000 , TimeUnit .MILLISECONDS );
223- if (latch .getCount () != 0 ) {
224- throw new RuntimeException ("test latch.await() did not count down" );
225- }
226- }
227- catch (InterruptedException e ) {
228- throw new RuntimeException ("test latch.await() interrupted" );
229- }
230- }
231- }
232-
233-
234191 private static class TestErrorHandler implements ErrorHandler {
235192
236193 private volatile Throwable lastError ;
@@ -252,6 +209,7 @@ public void throwLastErrorIfAvailable() throws Throwable {
252209 this .lastError = null ;
253210 throw t ;
254211 }
212+
255213 }
256214
257215}
0 commit comments