37
37
import java .util .concurrent .ThreadFactory ;
38
38
import java .util .concurrent .TimeUnit ;
39
39
import java .util .concurrent .atomic .AtomicBoolean ;
40
+ import java .util .concurrent .atomic .AtomicInteger ;
40
41
import org .assertj .core .api .InstanceOfAssertFactories ;
41
42
import org .junit .jupiter .api .Test ;
42
43
import org .slf4j .Logger ;
@@ -53,9 +54,8 @@ class AbstractPollingMessageSourceTests {
53
54
54
55
private static final Logger logger = LoggerFactory .getLogger (AbstractPollingMessageSourceTests .class );
55
56
56
- // @RepeatedTest(400)
57
57
@ Test
58
- void shouldAcquireAndReleaseFullPermits () throws Exception {
58
+ void shouldAcquireAndReleaseFullPermits () {
59
59
String testName = "shouldAcquireAndReleaseFullPermits" ;
60
60
61
61
SemaphoreBackPressureHandler backPressureHandler = SemaphoreBackPressureHandler .builder ()
@@ -73,36 +73,47 @@ void shouldAcquireAndReleaseFullPermits() throws Exception {
73
73
74
74
@ Override
75
75
protected CompletableFuture <Collection <Message >> doPollForMessages (int messagesToRequest ) {
76
- doSleep (100 );
77
- // Since BackPressureMode.ALWAYS_POLL_MAX_MESSAGES, should always be 10.
78
- assertThat (messagesToRequest ).isEqualTo (10 );
79
- assertAvailablePermits (backPressureHandler , 0 );
80
- boolean firstPoll = hasReceived .compareAndSet (false , true );
81
- if (firstPoll ) {
82
- // No permits released yet, should be TM low
83
- assertThroughputMode (backPressureHandler , "low" );
84
- }
85
- else if (hasMadeSecondPoll .compareAndSet (false , true )) {
86
- // Permits returned, should be high
87
- assertThroughputMode (backPressureHandler , "high" );
88
- }
89
- else {
90
- // Already returned full permits, should be low
91
- assertThroughputMode (backPressureHandler , "low" );
92
- }
93
- return CompletableFuture
94
- .supplyAsync (() -> firstPoll
76
+ return CompletableFuture .supplyAsync (() -> {
77
+ try {
78
+ // Since BackPressureMode.ALWAYS_POLL_MAX_MESSAGES, should always be 10.
79
+ assertThat (messagesToRequest ).isEqualTo (10 );
80
+ assertAvailablePermits (backPressureHandler , 0 );
81
+ boolean firstPoll = hasReceived .compareAndSet (false , true );
82
+ if (firstPoll ) {
83
+ logger .debug ("First poll" );
84
+ // No permits released yet, should be TM low
85
+ assertThroughputMode (backPressureHandler , "low" );
86
+ }
87
+ else if (hasMadeSecondPoll .compareAndSet (false , true )) {
88
+ logger .debug ("Second poll" );
89
+ // Permits returned, should be high
90
+ assertThroughputMode (backPressureHandler , "high" );
91
+ }
92
+ else {
93
+ logger .debug ("Third poll" );
94
+ // Already returned full permits, should be low
95
+ assertThroughputMode (backPressureHandler , "low" );
96
+ }
97
+ return firstPoll
95
98
? (Collection <Message >) List .of (Message .builder ()
96
99
.messageId (UUID .randomUUID ().toString ()).body ("message" ).build ())
97
- : Collections .<Message > emptyList (), threadPool )
98
- .whenComplete ((v , t ) -> pollingCounter .countDown ());
100
+ : Collections .<Message > emptyList ();
101
+ }
102
+ catch (Throwable t ) {
103
+ logger .error ("Error" , t );
104
+ throw new RuntimeException (t );
105
+ }
106
+ }, threadPool ).whenComplete ((v , t ) -> {
107
+ if (t == null ) {
108
+ pollingCounter .countDown ();
109
+ }
110
+ });
99
111
}
100
112
};
101
113
102
114
source .setBackPressureHandler (backPressureHandler );
103
115
source .setMessageSink ((msgs , context ) -> {
104
116
assertAvailablePermits (backPressureHandler , 9 );
105
- doSleep (500 ); // Longer than acquire timout + polling sleep
106
117
msgs .forEach (msg -> context .runBackPressureReleaseCallback ());
107
118
return CompletableFuture .runAsync (processingCounter ::countDown );
108
119
});
@@ -112,20 +123,23 @@ else if (hasMadeSecondPoll.compareAndSet(false, true)) {
112
123
source .setTaskExecutor (createTaskExecutor (testName ));
113
124
source .setAcknowledgementProcessor (getAcknowledgementProcessor ());
114
125
source .start ();
115
- assertThat (pollingCounter . await ( 2 , TimeUnit . SECONDS )).isTrue ();
116
- assertThat (processingCounter . await ( 2 , TimeUnit . SECONDS )).isTrue ();
126
+ assertThat (doAwait ( pollingCounter )).isTrue ();
127
+ assertThat (doAwait ( processingCounter )).isTrue ();
117
128
}
118
129
119
- // @RepeatedTest(400)
130
+ private static final AtomicInteger testCounter = new AtomicInteger ();
131
+
120
132
@ Test
121
- void shouldAcquireAndReleasePartialPermits () throws Exception {
133
+ void shouldAcquireAndReleasePartialPermits () {
122
134
String testName = "shouldAcquireAndReleasePartialPermits" ;
123
135
SemaphoreBackPressureHandler backPressureHandler = SemaphoreBackPressureHandler .builder ()
124
- .acquireTimeout (Duration .ofMillis (200 )).batchSize (10 ).totalPermits (10 )
136
+ .acquireTimeout (Duration .ofMillis (150 )).batchSize (10 ).totalPermits (10 )
125
137
.throughputConfiguration (BackPressureMode .AUTO ).build ();
126
- ExecutorService threadPool = Executors .newCachedThreadPool ();
138
+ ExecutorService threadPool = Executors
139
+ .newCachedThreadPool (new MessageExecutionThreadFactory ("test " + testCounter .incrementAndGet ()));
127
140
CountDownLatch pollingCounter = new CountDownLatch (4 );
128
141
CountDownLatch processingCounter = new CountDownLatch (1 );
142
+ CountDownLatch processingLatch = new CountDownLatch (1 );
129
143
AtomicBoolean hasThrownError = new AtomicBoolean (false );
130
144
131
145
AbstractPollingMessageSource <Object , Message > source = new AbstractPollingMessageSource <>() {
@@ -138,64 +152,67 @@ void shouldAcquireAndReleasePartialPermits() throws Exception {
138
152
139
153
@ Override
140
154
protected CompletableFuture <Collection <Message >> doPollForMessages (int messagesToRequest ) {
141
- try {
142
- // Give it some time between returning empty and polling again
143
- doSleep (100 );
144
-
145
- // Will only be true the first time it sets hasReceived to true
146
- boolean shouldReturnMessage = hasReceived .compareAndSet (false , true );
147
- if (shouldReturnMessage ) {
148
- // First poll, should have 10
149
- logger .debug ("First poll - should request 10 messages" );
150
- assertThat (messagesToRequest ).isEqualTo (10 );
151
- assertAvailablePermits (backPressureHandler , 0 );
152
- // No permits have been released yet
153
- assertThroughputMode (backPressureHandler , "low" );
154
- }
155
- else if (hasAcquired9 .compareAndSet (false , true )) {
156
- // Second poll, should have 9
157
- logger .debug ("Second poll - should request 9 messages" );
158
- assertThat (messagesToRequest ).isEqualTo (9 );
159
- assertAvailablePermitsLessThanOrEqualTo (backPressureHandler , 1 );
160
- // Has released 9 permits, should be TM HIGH
161
- assertThroughputMode (backPressureHandler , "high" );
162
- }
163
- else {
164
- boolean thirdPoll = hasMadeThirdPoll .compareAndSet (false , true );
165
- // Third poll or later, should have 10 again
166
- logger .debug ("Third poll - should request 10 messages" );
167
- assertThat (messagesToRequest ).isEqualTo (10 );
168
- assertAvailablePermits (backPressureHandler , 0 );
169
- if (thirdPoll ) {
170
- // Hasn't yet returned a full batch, should be TM High
155
+ return CompletableFuture .supplyAsync (() -> {
156
+ try {
157
+ // Give it some time between returning empty and polling again
158
+ // doSleep(100);
159
+
160
+ // Will only be true the first time it sets hasReceived to true
161
+ boolean shouldReturnMessage = hasReceived .compareAndSet (false , true );
162
+ if (shouldReturnMessage ) {
163
+ // First poll, should have 10
164
+ logger .debug ("First poll - should request 10 messages" );
165
+ assertThat (messagesToRequest ).isEqualTo (10 );
166
+ assertAvailablePermits (backPressureHandler , 0 );
167
+ // No permits have been released yet
168
+ assertThroughputMode (backPressureHandler , "low" );
169
+ }
170
+ else if (hasAcquired9 .compareAndSet (false , true )) {
171
+ // Second poll, should have 9
172
+ logger .debug ("Second poll - should request 9 messages" );
173
+ assertThat (messagesToRequest ).isEqualTo (9 );
174
+ assertAvailablePermitsLessThanOrEqualTo (backPressureHandler , 1 );
175
+ // Has released 9 permits, should be TM HIGH
171
176
assertThroughputMode (backPressureHandler , "high" );
177
+ processingLatch .countDown (); // Release processing now
172
178
}
173
179
else {
174
- // Has returned all permits in third poll
175
- assertThroughputMode (backPressureHandler , "low" );
180
+ boolean thirdPoll = hasMadeThirdPoll .compareAndSet (false , true );
181
+ // Third poll or later, should have 10 again
182
+ logger .debug ("Third poll - should request 10 messages" );
183
+ assertThat (messagesToRequest ).isEqualTo (10 );
184
+ assertAvailablePermits (backPressureHandler , 0 );
185
+ if (thirdPoll ) {
186
+ // Hasn't yet returned a full batch, should be TM High
187
+ assertThroughputMode (backPressureHandler , "high" );
188
+ }
189
+ else {
190
+ // Has returned all permits in third poll
191
+ assertThroughputMode (backPressureHandler , "low" );
192
+ }
176
193
}
177
- }
178
- return CompletableFuture .supplyAsync (() -> {
179
194
if (shouldReturnMessage ) {
180
195
logger .debug ("shouldReturnMessage, returning one message" );
181
196
return (Collection <Message >) List .of (
182
197
Message .builder ().messageId (UUID .randomUUID ().toString ()).body ("message" ).build ());
183
198
}
184
199
logger .debug ("should not return message, returning empty list" );
185
200
return Collections .<Message > emptyList ();
186
- }, threadPool ). whenComplete (( v , t ) -> pollingCounter . countDown ());
187
- }
188
- catch ( Error e ) {
189
- hasThrownError . set ( true );
190
- return CompletableFuture . failedFuture ( new RuntimeException ( e ));
191
- }
201
+ }
202
+ catch ( Error e ) {
203
+ hasThrownError . set ( true );
204
+ throw new RuntimeException ( "Error polling for messages" , e );
205
+ }
206
+ }, threadPool ). whenComplete (( v , t ) -> pollingCounter . countDown ());
192
207
}
193
208
};
194
209
195
210
source .setBackPressureHandler (backPressureHandler );
196
211
source .setMessageSink ((msgs , context ) -> {
212
+ logger .debug ("Processing {} messages" , msgs .size ());
197
213
assertAvailablePermits (backPressureHandler , 9 );
198
- doSleep (500 ); // Longer than acquire timout + polling sleep
214
+ assertThat (doAwait (processingLatch )).isTrue ();
215
+ logger .debug ("Finished processing {} messages" , msgs .size ());
199
216
msgs .forEach (msg -> context .runBackPressureReleaseCallback ());
200
217
return CompletableFuture .completedFuture (null ).thenRun (processingCounter ::countDown );
201
218
});
@@ -204,12 +221,22 @@ else if (hasAcquired9.compareAndSet(false, true)) {
204
221
source .setTaskExecutor (createTaskExecutor (testName ));
205
222
source .setAcknowledgementProcessor (getAcknowledgementProcessor ());
206
223
source .start ();
207
- assertThat (processingCounter . await ( 2 , TimeUnit . SECONDS )).isTrue ();
208
- assertThat (pollingCounter . await ( 2 , TimeUnit . SECONDS )).isTrue ();
224
+ assertThat (doAwait ( processingCounter )).isTrue ();
225
+ assertThat (doAwait ( pollingCounter )).isTrue ();
209
226
source .stop ();
210
227
assertThat (hasThrownError .get ()).isFalse ();
211
228
}
212
229
230
+ private static boolean doAwait (CountDownLatch processingLatch ) {
231
+ try {
232
+ return processingLatch .await (4 , TimeUnit .SECONDS );
233
+ }
234
+ catch (InterruptedException e ) {
235
+ Thread .currentThread ().interrupt ();
236
+ throw new RuntimeException ("Interrupted while waiting for latch" , e );
237
+ }
238
+ }
239
+
213
240
private void assertThroughputMode (SemaphoreBackPressureHandler backPressureHandler , String expectedThroughputMode ) {
214
241
assertThat (ReflectionTestUtils .getField (backPressureHandler , "currentThroughputMode" ))
215
242
.extracting (Object ::toString ).extracting (String ::toLowerCase )
@@ -243,7 +270,6 @@ protected TaskExecutor createTaskExecutor(String testName) {
243
270
int poolSize = 10 ;
244
271
executor .setMaxPoolSize (poolSize );
245
272
executor .setCorePoolSize (10 );
246
- // Necessary due to a small racing condition between releasing the permit and releasing the thread.
247
273
executor .setQueueCapacity (poolSize );
248
274
executor .setAllowCoreThreadTimeOut (true );
249
275
executor .setThreadFactory (createThreadFactory (testName ));
0 commit comments