Skip to content

Commit 348df86

Browse files
committed
Manual handling of writeQueueFull in BufferingContextAwareWriteStream
1 parent 0296513 commit 348df86

File tree

2 files changed

+129
-2
lines changed

2 files changed

+129
-2
lines changed

query-engine/src/main/java/uk/co/spudsoft/query/web/BufferingContextAwareWriteStream.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,6 @@ private Future<Void> flush() {
8181
context.runOnContext(v -> {
8282
delegate.write(toWrite)
8383
.onComplete(ar -> {
84-
pendingFlushes.decrementAndGet();
8584
thisContext.runOnContext(v2 -> {
8685
int remaining = pendingFlushes.decrementAndGet();
8786

@@ -91,7 +90,7 @@ private Future<Void> flush() {
9190
promise.fail(ar.cause());
9291
}
9392

94-
if (remaining <= 4 && !delegate.writeQueueFull() && drainHandler != null) {
93+
if (remaining <= MAX_PENDING_WRITES / 2 && !delegate.writeQueueFull() && drainHandler != null) {
9594
drainHandler.handle(null);
9695
}
9796
});
Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
/*
2+
* Copyright (C) 2025 njt
3+
*
4+
* This program is free software: you can redistribute it and/or modify
5+
* it under the terms of the GNU General Public License as published by
6+
* the Free Software Foundation, either version 3 of the License, or
7+
* (at your option) any later version.
8+
*
9+
* This program is distributed in the hope that it will be useful,
10+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
* GNU General Public License for more details.
13+
*
14+
* You should have received a copy of the GNU General Public License
15+
* along with this program. If not, see <http://www.gnu.org/licenses/>.
16+
*/
17+
package uk.co.spudsoft.query.web;
18+
19+
import io.vertx.core.Context;
20+
import io.vertx.core.Handler;
21+
import io.vertx.core.Vertx;
22+
import io.vertx.core.buffer.Buffer;
23+
import io.vertx.core.streams.WriteStream;
24+
import io.vertx.junit5.VertxExtension;
25+
import io.vertx.junit5.VertxTestContext;
26+
import org.junit.jupiter.api.Test;
27+
import org.junit.jupiter.api.extension.ExtendWith;
28+
import org.mockito.ArgumentCaptor;
29+
30+
import java.lang.reflect.Field;
31+
import java.util.concurrent.atomic.AtomicInteger;
32+
33+
import static org.junit.jupiter.api.Assertions.*;
34+
import static org.mockito.Mockito.*;
35+
36+
@ExtendWith(VertxExtension.class)
37+
public class BufferingContextAwareWriteStreamTest {
38+
39+
@Test
40+
@SuppressWarnings("unchecked")
41+
void testWriteQueueFullAndDrain(Vertx vertx, VertxTestContext testContext) throws Exception {
42+
WriteStream<Buffer> mockDelegate = mock(WriteStream.class);
43+
Context producerContext = vertx.getOrCreateContext();
44+
Context consumerContext = vertx.getOrCreateContext();
45+
46+
// Use a small threshold for testing
47+
BufferingContextAwareWriteStream stream = new BufferingContextAwareWriteStream(mockDelegate, consumerContext, 100);
48+
49+
// Get access to internal counter to simulate backlog
50+
Field pendingField = BufferingContextAwareWriteStream.class.getDeclaredField("pendingFlushes");
51+
pendingField.setAccessible(true);
52+
AtomicInteger pendingFlushes = (AtomicInteger) pendingField.get(stream);
53+
54+
// 1. Test writeQueueFull based on delegate
55+
when(mockDelegate.writeQueueFull()).thenReturn(true);
56+
assertTrue(stream.writeQueueFull(), "Should be full if delegate is full");
57+
58+
// 2. Test writeQueueFull based on pending flushes
59+
when(mockDelegate.writeQueueFull()).thenReturn(false);
60+
pendingFlushes.set(20); // Greater than MAX_PENDING_WRITES (16)
61+
assertTrue(stream.writeQueueFull(), "Should be full if too many flushes pending");
62+
63+
// 3. Test Drain Handler registration and execution
64+
producerContext.runOnContext(v -> {
65+
stream.drainHandler(v2 -> {
66+
// This is what we are waiting for
67+
testContext.verify(() -> {
68+
assertFalse(stream.writeQueueFull(), "Drain should only fire when queue is not full");
69+
});
70+
testContext.completeNow();
71+
});
72+
73+
// Capture the handler passed to the delegate
74+
consumerContext.runOnContext(v3 -> {
75+
ArgumentCaptor<Handler<Void>> handlerCaptor = ArgumentCaptor.forClass(Handler.class);
76+
verify(mockDelegate, timeout(1000)).drainHandler(handlerCaptor.capture());
77+
Handler<Void> capturedHandler = handlerCaptor.getValue();
78+
79+
// Simulate queue becoming free
80+
pendingFlushes.set(0);
81+
when(mockDelegate.writeQueueFull()).thenReturn(false);
82+
83+
// Trigger the delegate's drain signal
84+
capturedHandler.handle(null);
85+
});
86+
});
87+
}
88+
89+
@Test
90+
@SuppressWarnings("unchecked")
91+
void testDrainFiredOnFlushCompletion(Vertx vertx, VertxTestContext testContext) throws Exception {
92+
WriteStream<Buffer> mockDelegate = mock(WriteStream.class);
93+
Context consumerContext = vertx.getOrCreateContext();
94+
95+
// Setup mock to return a future we can control
96+
io.vertx.core.Promise<Void> writePromise = io.vertx.core.Promise.promise();
97+
when(mockDelegate.write(any())).thenReturn(writePromise.future());
98+
when(mockDelegate.writeQueueFull()).thenReturn(false);
99+
100+
BufferingContextAwareWriteStream stream = new BufferingContextAwareWriteStream(mockDelegate, consumerContext, 10);
101+
102+
stream.drainHandler(v -> {
103+
testContext.completeNow();
104+
});
105+
106+
// Manually pump up the pending flushes to trigger "full"
107+
Field pendingField = BufferingContextAwareWriteStream.class.getDeclaredField("pendingFlushes");
108+
pendingField.setAccessible(true);
109+
AtomicInteger pendingFlushes = (AtomicInteger) pendingField.get(stream);
110+
pendingFlushes.set(17); // > 16
111+
112+
assertTrue(stream.writeQueueFull());
113+
114+
pendingFlushes.set(8); // 16 / 2
115+
116+
consumerContext.runOnContext(v -> {
117+
// Trigger a write that will complete and decrement the counter
118+
stream.write(Buffer.buffer("0123456789012345")); // Triggers flush()
119+
});
120+
121+
// Complete the delegate write
122+
consumerContext.runOnContext(v -> {
123+
writePromise.complete();
124+
});
125+
126+
// If the drainHandler isn't called within 2s, the test will timeout (fail)
127+
}
128+
}

0 commit comments

Comments
 (0)