Skip to content

Commit 4036a61

Browse files
authored
Merge pull request quarkusio#50376 from franz1981/backpressure
Handle backpressure whilst not on the event loop
2 parents 0565a9c + aa8a2ac commit 4036a61

File tree

1 file changed

+11
-45
lines changed

1 file changed

+11
-45
lines changed

independent-projects/vertx-utils/src/main/java/io/quarkus/vertx/utils/VertxOutputStream.java

Lines changed: 11 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,21 @@
11
package io.quarkus.vertx.utils;
22

3-
import java.io.ByteArrayOutputStream;
43
import java.io.IOException;
54
import java.io.InterruptedIOException;
65
import java.io.OutputStream;
76
import java.util.Optional;
87

8+
import io.vertx.core.Context;
99
import org.jboss.logging.Logger;
1010

1111
import io.netty.buffer.ByteBuf;
1212
import io.netty.handler.codec.http.HttpHeaderNames;
1313
import io.netty.handler.codec.http.HttpResponse;
1414
import io.vertx.core.AsyncResult;
1515
import io.vertx.core.Handler;
16-
import io.vertx.core.Vertx;
1716
import io.vertx.core.buffer.Buffer;
1817
import io.vertx.core.http.HttpServerRequest;
1918
import io.vertx.core.http.HttpServerResponse;
20-
import io.vertx.core.http.impl.HttpServerRequestInternal;
2119

2220
/**
2321
* An {@link OutputStream} forwarding the bytes to Vert.x Web {@link HttpResponse}.
@@ -35,9 +33,7 @@ public class VertxOutputStream extends OutputStream {
3533
private boolean committed;
3634
private boolean closed;
3735
private boolean waitingForDrain;
38-
private boolean first = true;
3936
private Throwable throwable;
40-
private ByteArrayOutputStream overflow;
4137

4238
public VertxOutputStream(VertxJavaIoContext context) {
4339
this.context = context;
@@ -87,29 +83,13 @@ private void write(ByteBuf data, boolean last) throws IOException {
8783
//do all this in the same lock
8884
synchronized (request.connection()) {
8985
try {
90-
boolean bufferRequired = awaitWriteable() || (overflow != null && overflow.size() > 0);
91-
if (bufferRequired) {
92-
//just buffer everything
93-
if (overflow == null) {
94-
overflow = new ByteArrayOutputStream();
86+
awaitWriteable();
87+
if (last) {
88+
if (!response.ended()) { // can happen when an exception occurs during JSON serialization with Jackson
89+
response.end(createBuffer(data), null);
9590
}
96-
if (data.hasArray()) {
97-
overflow.write(data.array(), data.arrayOffset() + data.readerIndex(), data.readableBytes());
98-
} else {
99-
data.getBytes(data.readerIndex(), overflow, data.readableBytes());
100-
}
101-
if (last) {
102-
closed = true;
103-
}
104-
data.release();
10591
} else {
106-
if (last) {
107-
if (!response.ended()) { // can happen when an exception occurs during JSON serialization with Jackson
108-
response.end(createBuffer(data), null);
109-
}
110-
} else {
111-
response.write(createBuffer(data), null);
112-
}
92+
response.write(createBuffer(data), null);
11393
}
11494
} catch (Exception e) {
11595
if (data != null && data.refCnt() > 0) {
@@ -120,13 +100,11 @@ private void write(ByteBuf data, boolean last) throws IOException {
120100
}
121101
}
122102

123-
private boolean awaitWriteable() throws IOException {
124-
if (Vertx.currentContext() == ((HttpServerRequestInternal) request).context()) {
125-
return false; // we are on the (right) event loop, so we can write - Netty will do the right thing.
126-
}
127-
if (first) {
128-
first = false;
129-
return false;
103+
private void awaitWriteable() throws IOException {
104+
// is it running in an event loop?
105+
if (Context.isOnEventLoopThread()) {
106+
// NEVER block the event loop!
107+
return;
130108
}
131109
assert Thread.holdsLock(request.connection());
132110
while (response.writeQueueFull()) {
@@ -136,7 +114,6 @@ private boolean awaitWriteable() throws IOException {
136114
if (response.closed()) {
137115
throw new IOException("Connection has been closed");
138116
}
139-
// registerDrainHandler();
140117
try {
141118
waitingForDrain = true;
142119
request.connection().wait();
@@ -146,7 +123,6 @@ private boolean awaitWriteable() throws IOException {
146123
waitingForDrain = false;
147124
}
148125
}
149-
return false;
150126
}
151127

152128
/**
@@ -258,16 +234,6 @@ public void handle(Void event) {
258234
if (out.waitingForDrain) {
259235
out.request.connection().notifyAll();
260236
}
261-
if (out.overflow != null) {
262-
if (out.overflow.size() > 0) {
263-
if (out.closed) {
264-
out.response.end(Buffer.buffer(out.overflow.toByteArray()), null);
265-
} else {
266-
out.response.write(Buffer.buffer(out.overflow.toByteArray()), null);
267-
}
268-
out.overflow.reset();
269-
}
270-
}
271237
}
272238
}
273239
}

0 commit comments

Comments
 (0)