Skip to content

Commit 0296513

Browse files
committed
Limit pending writes from BufferingContextAwareWriteStream
With drain handling
1 parent 6e5e8ce commit 0296513

File tree

2 files changed

+33
-7
lines changed

2 files changed

+33
-7
lines changed

query-engine/src/main/java/uk/co/spudsoft/query/web/BufferingContextAwareWriteStream.java

Lines changed: 32 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,21 +24,26 @@
2424
import io.vertx.core.Vertx;
2525
import io.vertx.core.buffer.Buffer;
2626
import io.vertx.core.streams.WriteStream;
27+
import java.util.concurrent.atomic.AtomicInteger;
2728

2829
/**
2930
* An implementation of {@link WriteStream} that bridges Vert.x {@link Context}s.
3031
* <p>
31-
* Every request that can write to the delegate WriteStream is run using runOnContext, and if there is a return value
32+
* Every request that can write to the delegate WriteStream is run using runOnContext, and if there is a return value
3233
* it is returned by another runOnContext.
33-
*
34+
*
3435
* @author jtalbut
3536
*/
3637
public class BufferingContextAwareWriteStream implements WriteStream<Buffer> {
37-
38+
39+
private static final int MAX_PENDING_WRITES = 16;
40+
3841
private final WriteStream<Buffer> delegate;
3942
private final Context context;
4043
private final int flushThreshold;
4144
private Buffer buffer = Buffer.buffer();
45+
private final AtomicInteger pendingFlushes = new AtomicInteger();
46+
private Handler<Void> drainHandler;
4247

4348
/**
4449
* Constructor.
@@ -52,7 +57,7 @@ public BufferingContextAwareWriteStream(WriteStream<Buffer> delegate, Context co
5257
this.context = context;
5358
this.flushThreshold = flushThreshold;
5459
}
55-
60+
5661
@Override
5762
public Future<Void> write(Buffer data) {
5863
buffer.appendBuffer(data);
@@ -68,17 +73,27 @@ private Future<Void> flush() {
6873
}
6974
Buffer toWrite = buffer;
7075
buffer = Buffer.buffer();
76+
77+
pendingFlushes.incrementAndGet();
78+
7179
Promise<Void> promise = Promise.promise();
7280
Context thisContext = Vertx.currentContext();
7381
context.runOnContext(v -> {
7482
delegate.write(toWrite)
7583
.onComplete(ar -> {
84+
pendingFlushes.decrementAndGet();
7685
thisContext.runOnContext(v2 -> {
86+
int remaining = pendingFlushes.decrementAndGet();
87+
7788
if (ar.succeeded()) {
7889
promise.complete(ar.result());
7990
} else {
8091
promise.fail(ar.cause());
8192
}
93+
94+
if (remaining <= 4 && !delegate.writeQueueFull() && drainHandler != null) {
95+
drainHandler.handle(null);
96+
}
8297
});
8398
});
8499
});
@@ -120,12 +135,23 @@ public WriteStream<Buffer> setWriteQueueMaxSize(int maxSize) {
120135
@Override
121136
public boolean writeQueueFull() {
122137
// Safe to call directly; doesn't mutate state
123-
return delegate.writeQueueFull();
138+
return delegate.writeQueueFull() || pendingFlushes.get() > MAX_PENDING_WRITES;
124139
}
125140

126141
@Override
127142
public WriteStream<Buffer> drainHandler(Handler<Void> handler) {
128-
context.runOnContext(v -> delegate.drainHandler(handler));
143+
this.drainHandler = handler;
144+
Context thisContext = Vertx.currentContext();
145+
// Intercept the delegate's drain signal too
146+
context.runOnContext(v -> {
147+
delegate.drainHandler(v2 -> {
148+
thisContext.runOnContext(v3 -> {
149+
if (!writeQueueFull() && drainHandler != null) {
150+
drainHandler.handle(null);
151+
}
152+
});
153+
});
154+
});
129155
return this;
130156
}
131157

query-engine/src/main/java/uk/co/spudsoft/query/web/QueryRouter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -237,7 +237,7 @@ public void handle(RoutingContext routingContext) {
237237
queriesExecuted.increment();
238238
}
239239
if (logger.isTraceEnabled()) {
240-
Log.decorate(logger.atTrace(), requestContext).log("There are now {} currently executing queries: ", queriesExecuting.size(), Json.encode(queriesExecuting));
240+
Log.decorate(logger.atTrace(), requestContext).log("There are now {} currently executing queries: {}", queriesExecuting.size(), Json.encode(queriesExecuting));
241241
} else if (logger.isDebugEnabled()) {
242242
Log.decorate(logger.atDebug(), requestContext).log("There are now {} currently executing queries", queriesExecuting.size());
243243
}

0 commit comments

Comments
 (0)