Skip to content

Commit 5e5a4bc

Browse files
authored
Fix bug with application of max idle time (#176)
* add failing tests for lifo checkout order * add LifoQueue and use it in MemberSingle
1 parent ae50abf commit 5e5a4bc

File tree

4 files changed

+154
-2
lines changed

4 files changed

+154
-2
lines changed
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
package org.davidmoten.rx.internal;
2+
3+
import java.util.concurrent.atomic.AtomicReference;
4+
5+
import io.reactivex.annotations.NonNull;
6+
import io.reactivex.annotations.Nullable;
7+
8+
/**
9+
* Thread-safe Last-In-First-Out queue. Current usage is multi-producer, single
10+
* consumer but LIFO use case doesn't seem to offer opportunity for performance
11+
* enhancements like the MpscLinkedQueue does for FIFO use case.
12+
*
13+
* @param <T> queued item type
14+
*/
15+
public final class LifoQueue<T> {
16+
17+
private final AtomicReference<Node<T>> head = new AtomicReference<>();
18+
19+
public void offer(@NonNull T t) {
20+
while (true) {
21+
Node<T> a = head.get();
22+
Node<T> b = new Node<>(t, a);
23+
if (head.compareAndSet(a, b)) {
24+
return;
25+
}
26+
}
27+
}
28+
29+
public @Nullable T poll() {
30+
Node<T> a = head.get();
31+
if (a == null) {
32+
return null;
33+
} else {
34+
while (true) {
35+
if (head.compareAndSet(a, a.next)) {
36+
return a.value;
37+
} else {
38+
a = head.get();
39+
}
40+
}
41+
}
42+
}
43+
44+
public void clear() {
45+
head.set(null);
46+
}
47+
48+
static final class Node<T> {
49+
final @NonNull T value;
50+
final @Nullable Node<T> next;
51+
52+
Node(T value, Node<T> next) {
53+
this.value = value;
54+
this.next = next;
55+
}
56+
}
57+
58+
}

rxjava2-pool/src/main/java/org/davidmoten/rx/pool/MemberSingle.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import java.util.concurrent.atomic.AtomicReference;
1010
import java.util.function.BiFunction;
1111

12+
import org.davidmoten.rx.internal.LifoQueue;
1213
import org.slf4j.Logger;
1314
import org.slf4j.LoggerFactory;
1415

@@ -38,7 +39,7 @@ final class MemberSingle<T> extends Single<Member<T>> implements Closeable {
3839
// toBeRemoved queue
3940
private final MemberSingleObserver<T> removeAll;
4041

41-
private final SimplePlainQueue<DecoratingMember<T>> initializedAvailable;
42+
private final LifoQueue<DecoratingMember<T>> initializedAvailable;
4243
private final SimplePlainQueue<DecoratingMember<T>> notInitialized;
4344
private final SimplePlainQueue<DecoratingMember<T>> toBeReleased;
4445
private final SimplePlainQueue<DecoratingMember<T>> toBeChecked;
@@ -63,7 +64,7 @@ final class MemberSingle<T> extends Single<Member<T>> implements Closeable {
6364
MemberSingle(NonBlockingPool<T> pool) {
6465
Preconditions.checkNotNull(pool);
6566
this.notInitialized = new MpscLinkedQueue<>();
66-
this.initializedAvailable = new MpscLinkedQueue<>();
67+
this.initializedAvailable = new LifoQueue<>();
6768
this.toBeReleased = new MpscLinkedQueue<>();
6869
this.toBeChecked = new MpscLinkedQueue<>();
6970
this.toBeAdded = new MpscLinkedQueue<>();
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package org.davidmoten.rx.internal;
2+
3+
import static org.junit.Assert.assertEquals;
4+
import static org.junit.Assert.assertNull;
5+
6+
import org.junit.Test;
7+
8+
public class LifoQueueTest {
9+
10+
@Test
11+
public void testIsLifo() {
12+
LifoQueue<Integer> q = new LifoQueue<>();
13+
q.offer(1);
14+
q.offer(2);
15+
assertEquals(2, (int) q.poll());
16+
assertEquals(1, (int) q.poll());
17+
assertNull(q.poll());
18+
}
19+
20+
@Test
21+
public void testClear() {
22+
LifoQueue<Integer> q = new LifoQueue<>();
23+
q.offer(1);
24+
q.offer(2);
25+
q.clear();
26+
assertNull(q.poll());
27+
}
28+
29+
}

rxjava2-pool/src/test/java/org/davidmoten/rx/pool/NonBlockingPoolTest.java

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import static org.junit.Assert.assertEquals;
44
import static org.junit.Assert.assertFalse;
5+
import static org.junit.Assert.assertNotNull;
56
import static org.junit.Assert.assertNull;
67
import static org.junit.Assert.assertTrue;
78

@@ -239,6 +240,69 @@ public void testConnectionPoolRecylesMany() throws SQLException {
239240
}
240241
}
241242

243+
@Test
244+
public void testConnectionPoolRecylesLastInFirstOut() throws Exception {
245+
AtomicInteger count = new AtomicInteger();
246+
try (Pool<Integer> pool = NonBlockingPool //
247+
.factory(() -> count.incrementAndGet()) //
248+
.healthCheck(n -> true) //
249+
.maxSize(4) //
250+
.maxIdleTime(1, TimeUnit.MINUTES) //
251+
.build()) {
252+
Member<Integer> m1 = pool.member().blockingGet();
253+
Member<Integer> m2 = pool.member().blockingGet();
254+
m1.checkin();
255+
m2.checkin();
256+
Member<Integer> m3 = pool.member().blockingGet();
257+
assertTrue(m2 == m3);
258+
}
259+
}
260+
261+
@Test
262+
public void testMaxIdleTimeIsAppliedGivenConcurrentWorkThenMultipleSingleThreadedWorkBeforeMaxIdleTime() throws InterruptedException {
263+
TestScheduler s = new TestScheduler();
264+
AtomicInteger count = new AtomicInteger();
265+
AtomicInteger disposed = new AtomicInteger();
266+
Pool<Integer> pool = NonBlockingPool //
267+
.factory(() -> count.incrementAndGet()) //
268+
.healthCheck(n -> true) //
269+
.maxSize(4) //
270+
.maxIdleTime(2, TimeUnit.MINUTES) //
271+
.disposer(n -> disposed.incrementAndGet()) //
272+
.scheduler(s) //
273+
.build();
274+
// checkout two members concurrently
275+
AtomicReference<Member<Integer>> a = new AtomicReference<>();
276+
AtomicReference<Member<Integer>> b = new AtomicReference<>();
277+
pool.member().doOnSuccess(a::set).subscribe();
278+
pool.member().doOnSuccess(b::set).subscribe();
279+
s.triggerActions();
280+
assertNotNull(a.get());
281+
assertFalse(a.get() == b.get());
282+
283+
// check the two in again
284+
a.get().checkin();
285+
b.get().checkin();
286+
s.triggerActions();
287+
288+
// now advance time and do two non-concurrent uses of pool members
289+
// if FIFO queue used then prevents idle timeout. Code should use LIFO
290+
// under the covers
291+
s.advanceTimeBy(1, TimeUnit.MINUTES);
292+
AtomicReference<Member<Integer>> c = new AtomicReference<>();
293+
pool.member().doOnSuccess(c::set).subscribe();
294+
s.triggerActions();
295+
c.get().checkin();
296+
pool.member().doOnSuccess(c::set).subscribe();
297+
s.triggerActions();
298+
c.get().checkin();
299+
300+
// advance to timeout and ensure 1 member times out
301+
s.advanceTimeBy(1, TimeUnit.MINUTES);
302+
s.triggerActions();
303+
assertEquals(1, disposed.get());
304+
}
305+
242306
@Test
243307
public void testHealthCheckWhenFails() throws Exception {
244308
TestScheduler s = new TestScheduler();

0 commit comments

Comments
 (0)