2929import com .datastax .oss .driver .api .core .session .throttling .Throttled ;
3030import com .datastax .oss .driver .shaded .guava .common .collect .Lists ;
3131import java .util .List ;
32+ import java .util .concurrent .CountDownLatch ;
3233import java .util .function .Consumer ;
3334import org .junit .Before ;
3435import org .junit .Test ;
@@ -67,7 +68,7 @@ public void should_start_immediately_when_under_capacity() {
6768 throttler .register (request );
6869
6970 // Then
70- assertThatStage (request .started ).isSuccess (wasDelayed -> assertThat (wasDelayed ).isFalse ());
71+ assertThatStage (request .ended ).isSuccess (wasDelayed -> assertThat (wasDelayed ).isFalse ());
7172 assertThat (throttler .getConcurrentRequests ()).isEqualTo (1 );
7273 assertThat (throttler .getQueue ()).isEmpty ();
7374 }
@@ -98,7 +99,7 @@ private void should_allow_new_request_when_active_one_completes(
9899 // Given
99100 MockThrottled first = new MockThrottled ();
100101 throttler .register (first );
101- assertThatStage (first .started ).isSuccess (wasDelayed -> assertThat (wasDelayed ).isFalse ());
102+ assertThatStage (first .ended ).isSuccess (wasDelayed -> assertThat (wasDelayed ).isFalse ());
102103 for (int i = 0 ; i < 4 ; i ++) { // fill to capacity
103104 throttler .register (new MockThrottled ());
104105 }
@@ -113,7 +114,7 @@ private void should_allow_new_request_when_active_one_completes(
113114 throttler .register (incoming );
114115
115116 // Then
116- assertThatStage (incoming .started ).isSuccess (wasDelayed -> assertThat (wasDelayed ).isFalse ());
117+ assertThatStage (incoming .ended ).isSuccess (wasDelayed -> assertThat (wasDelayed ).isFalse ());
117118 assertThat (throttler .getConcurrentRequests ()).isEqualTo (5 );
118119 assertThat (throttler .getQueue ()).isEmpty ();
119120 }
@@ -132,7 +133,7 @@ public void should_enqueue_when_over_capacity() {
132133 throttler .register (incoming );
133134
134135 // Then
135- assertThatStage (incoming .started ).isNotDone ();
136+ assertThatStage (incoming .ended ).isNotDone ();
136137 assertThat (throttler .getConcurrentRequests ()).isEqualTo (5 );
137138 assertThat (throttler .getQueue ()).containsExactly (incoming );
138139 }
@@ -157,20 +158,20 @@ private void should_dequeue_when_active_completes(Consumer<Throttled> completeCa
157158 // Given
158159 MockThrottled first = new MockThrottled ();
159160 throttler .register (first );
160- assertThatStage (first .started ).isSuccess (wasDelayed -> assertThat (wasDelayed ).isFalse ());
161+ assertThatStage (first .ended ).isSuccess (wasDelayed -> assertThat (wasDelayed ).isFalse ());
161162 for (int i = 0 ; i < 4 ; i ++) {
162163 throttler .register (new MockThrottled ());
163164 }
164165
165166 MockThrottled incoming = new MockThrottled ();
166167 throttler .register (incoming );
167- assertThatStage (incoming .started ).isNotDone ();
168+ assertThatStage (incoming .ended ).isNotDone ();
168169
169170 // When
170171 completeCallback .accept (first );
171172
172173 // Then
173- assertThatStage (incoming .started ).isSuccess (wasDelayed -> assertThat (wasDelayed ).isTrue ());
174+ assertThatStage (incoming .ended ).isSuccess (wasDelayed -> assertThat (wasDelayed ).isTrue ());
174175 assertThat (throttler .getConcurrentRequests ()).isEqualTo (5 );
175176 assertThat (throttler .getQueue ()).isEmpty ();
176177 }
@@ -189,7 +190,7 @@ public void should_reject_when_queue_is_full() {
189190 throttler .register (incoming );
190191
191192 // Then
192- assertThatStage (incoming .started )
193+ assertThatStage (incoming .ended )
193194 .isFailed (error -> assertThat (error ).isInstanceOf (RequestThrottlingException .class ));
194195 }
195196
@@ -208,7 +209,7 @@ public void should_remove_timed_out_request_from_queue() {
208209 throttler .signalTimeout (queued1 );
209210
210211 // Then
211- assertThatStage (queued2 .started ).isNotDone ();
212+ assertThatStage (queued2 .ended ).isNotDone ();
212213 assertThat (throttler .getConcurrentRequests ()).isEqualTo (5 );
213214 assertThat (throttler .getQueue ()).hasSize (1 );
214215 }
@@ -223,7 +224,7 @@ public void should_reject_enqueued_when_closing() {
223224 for (int i = 0 ; i < 10 ; i ++) {
224225 MockThrottled request = new MockThrottled ();
225226 throttler .register (request );
226- assertThatStage (request .started ).isNotDone ();
227+ assertThatStage (request .ended ).isNotDone ();
227228 enqueued .add (request );
228229 }
229230
@@ -232,7 +233,7 @@ public void should_reject_enqueued_when_closing() {
232233
233234 // Then
234235 for (MockThrottled request : enqueued ) {
235- assertThatStage (request .started )
236+ assertThatStage (request .ended )
236237 .isFailed (error -> assertThat (error ).isInstanceOf (RequestThrottlingException .class ));
237238 }
238239
@@ -241,7 +242,125 @@ public void should_reject_enqueued_when_closing() {
241242 throttler .register (request );
242243
243244 // Then
244- assertThatStage (request .started )
245+ assertThatStage (request .ended )
245246 .isFailed (error -> assertThat (error ).isInstanceOf (RequestThrottlingException .class ));
246247 }
248+
249+ @ Test
250+ public void should_run_throttle_callbacks_concurrently () throws InterruptedException {
251+ // Given
252+
253+ // a task is enqueued, which when in onThrottleReady, will stall latch countDown()ed
254+ // register() should automatically start onThrottleReady on same thread
255+
256+ // start a parallel thread
257+ CountDownLatch firstRelease = new CountDownLatch (1 );
258+ MockThrottled first = new MockThrottled (firstRelease );
259+ Runnable r =
260+ () -> {
261+ throttler .register (first );
262+ first .ended .toCompletableFuture ().thenRun (() -> throttler .signalSuccess (first ));
263+ };
264+ Thread t = new Thread (r );
265+ t .start ();
266+
267+ // wait for the registration threads to reach await state
268+ assertThatStage (first .started ).isSuccess ();
269+ assertThatStage (first .ended ).isNotDone ();
270+
271+ // When
272+ // we concurrently submit a second shorter task
273+ MockThrottled second = new MockThrottled ();
274+ // (on a second thread, so that we can join and force a timeout in case
275+ // registration is delayed)
276+ Thread t2 = new Thread (() -> throttler .register (second ));
277+ t2 .start ();
278+ t2 .join (1_000 );
279+
280+ // Then
281+ // registration will trigger callback, should complete ~immediately
282+ assertThatStage (second .ended ).isSuccess (wasDelayed -> assertThat (wasDelayed ).isFalse ());
283+ // first should still be unfinished
284+ assertThatStage (first .started ).isDone ();
285+ assertThatStage (first .ended ).isNotDone ();
286+ // now finish, and verify
287+ firstRelease .countDown ();
288+ assertThatStage (first .ended ).isSuccess (wasDelayed -> assertThat (wasDelayed ).isFalse ());
289+
290+ t .join (1_000 );
291+ }
292+
293+ @ Test
294+ public void should_enqueue_tasks_quickly_when_callbacks_blocked () throws InterruptedException {
295+ // Given
296+
297+ // Multiple tasks are registered, up to the limit, and proceed into their
298+ // callback
299+
300+ // start five parallel threads
301+ final int THREADS = 5 ;
302+ Thread [] threads = new Thread [THREADS ];
303+ CountDownLatch [] latches = new CountDownLatch [THREADS ];
304+ MockThrottled [] throttled = new MockThrottled [THREADS ];
305+ for (int i = 0 ; i < threads .length ; i ++) {
306+ latches [i ] = new CountDownLatch (1 );
307+ final MockThrottled itThrottled = new MockThrottled (latches [i ]);
308+ throttled [i ] = itThrottled ;
309+ threads [i ] =
310+ new Thread (
311+ () -> {
312+ throttler .register (itThrottled );
313+ itThrottled
314+ .ended
315+ .toCompletableFuture ()
316+ .thenRun (() -> throttler .signalSuccess (itThrottled ));
317+ });
318+ threads [i ].start ();
319+ }
320+
321+ // wait for the registration threads to be launched
322+ // they are all waiting now
323+ for (int i = 0 ; i < throttled .length ; i ++) {
324+ assertThatStage (throttled [i ].started ).isSuccess ();
325+ assertThatStage (throttled [i ].ended ).isNotDone ();
326+ }
327+
328+ // When
329+ // we concurrently submit another task
330+ MockThrottled last = new MockThrottled ();
331+ throttler .register (last );
332+
333+ // Then
334+ // registration will enqueue the callback, and it should not
335+ // take any time to proceed (ie: we should not be blocked)
336+ // and there should be an element in the queue
337+ assertThatStage (last .started ).isNotDone ();
338+ assertThatStage (last .ended ).isNotDone ();
339+ assertThat (throttler .getQueue ()).containsExactly (last );
340+
341+ // we still have not released, so old throttled threads should be waiting
342+ for (int i = 0 ; i < throttled .length ; i ++) {
343+ assertThatStage (throttled [i ].started ).isDone ();
344+ assertThatStage (throttled [i ].ended ).isNotDone ();
345+ }
346+
347+ // now let us release ..
348+ for (int i = 0 ; i < latches .length ; i ++) {
349+ latches [i ].countDown ();
350+ }
351+
352+ // .. and check everything finished up OK
353+ for (int i = 0 ; i < latches .length ; i ++) {
354+ assertThatStage (throttled [i ].started ).isSuccess ();
355+ assertThatStage (throttled [i ].ended ).isSuccess ();
356+ }
357+
358+ // for good measure, we will also wait for the enqueued to complete
359+ assertThatStage (last .started ).isSuccess ();
360+ assertThatStage (last .ended ).isSuccess ();
361+
362+ for (int i = 0 ; i < threads .length ; i ++) {
363+ threads [i ].join (1_000 );
364+ }
365+ }
247366}
0 commit comments