Skip to content

Commit 898c19d

Browse files
Unlink dead nodes in MpscLinkedQueue
Similar to https://github.com/JCTools/JCTools/blob/master/jctools-core/src/main/java/org/jctools/queues/MpscLinkedQueue.java#L120, null out the next pointer in the discarded consumer node when polling from the queue. If not, we leave behind a (potentially long) chain of connected garbage nodes. If we're unlucky (for example one of the early nodes is promoted to old generation, triggering nepotism), this can cause GC issues as now we have a long linked list which must be marked by young collections. Reproducer: ``` import io.reactivex.rxjava3.internal.queue.MpscLinkedQueue; public class MpscLinkedQueueGC { public static void main(String[] args) { MpscLinkedQueue<Integer> queue = new MpscLinkedQueue<>(); for (int i = 0; i < 10; i++) System.gc(); // tenure consumer node while (true) { queue.offer(123); queue.poll(); } } } ``` ``` Before fix: $ java -Xlog:gc -Xmx1G -cp build/classes/java/main MpscLinkedQueueGC.java ... [1.261s] GC(20) Pause Young (Normal) (G1 Preventive Collection) 115M->115M(204M) 209.335ms [1.385s] GC(23) Pause Young (Normal) (G1 Evacuation Pause) 148M->149M(204M) 31.491ms [1.417s] GC(24) Pause Young (Normal) (G1 Evacuation Pause) 157M->158M(204M) 19.333ms [1.453s] GC(25) Pause Young (Normal) (G1 Evacuation Pause) 166M->167M(599M) 22.678ms [1.966s] GC(26) Pause Young (Normal) (G1 Evacuation Pause) 249M->249M(497M) 305.238ms ... After fix: $ java -Xlog:gc -Xmx1G -cp build/classes/java/main MpscLinkedQueueGC.java ... [1.169s] GC(14) Pause Young (Normal) (G1 Evacuation Pause) 304M->2M(506M) 0.755ms [1.558s] GC(15) Pause Young (Normal) (G1 Evacuation Pause) 304M->2M(506M) 0.689ms [1.948s] GC(16) Pause Young (Normal) (G1 Evacuation Pause) 304M->2M(506M) 0.800ms [2.337s] GC(17) Pause Young (Normal) (G1 Evacuation Pause) 304M->2M(506M) 0.714ms ... ```
1 parent d51d3ff commit 898c19d

File tree

1 file changed

+4
-0
lines changed

1 file changed

+4
-0
lines changed

src/main/java/io/reactivex/rxjava3/internal/queue/MpscLinkedQueue.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,8 @@ public T poll() {
9191
// we have to null out the value because we are going to hang on to the node
9292
final T nextValue = nextNode.getAndNullValue();
9393
spConsumerNode(nextNode);
94+
// unlink previous consumer to help gc
95+
currConsumerNode.soNext(null);
9496
return nextValue;
9597
}
9698
else if (currConsumerNode != lvProducerNode()) {
@@ -101,6 +103,8 @@ else if (currConsumerNode != lvProducerNode()) {
101103
// we have to null out the value because we are going to hang on to the node
102104
final T nextValue = nextNode.getAndNullValue();
103105
spConsumerNode(nextNode);
106+
// unlink previous consumer to help gc
107+
currConsumerNode.soNext(null);
104108
return nextValue;
105109
}
106110
return null;

0 commit comments

Comments
 (0)