Skip to content

Commit cd845bf

Browse files
committed
Implement back pressure in Netty frame handler
1 parent d3413d2 commit cd845bf

File tree

1 file changed

+47
-7
lines changed

1 file changed

+47
-7
lines changed

src/main/java/com/rabbitmq/client/impl/NettyFrameHandlerFactory.java

Lines changed: 47 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -49,10 +49,12 @@
4949
import java.net.InetSocketAddress;
5050
import java.net.SocketAddress;
5151
import java.time.Duration;
52+
import java.util.concurrent.CountDownLatch;
5253
import java.util.concurrent.ExecutionException;
5354
import java.util.concurrent.TimeUnit;
5455
import java.util.concurrent.TimeoutException;
5556
import java.util.concurrent.atomic.AtomicBoolean;
57+
import java.util.concurrent.atomic.AtomicReference;
5658
import java.util.function.Consumer;
5759
import java.util.function.Function;
5860
import javax.net.ssl.SSLHandshakeException;
@@ -167,12 +169,11 @@ private NettyFrameHandler(
167169
new ChannelInitializer<SocketChannel>() {
168170
@Override
169171
public void initChannel(SocketChannel ch) {
170-
// ch.pipeline()
171-
// .addFirst(
172-
// HANDLER_FLUSH_CONSOLIDATION,
173-
// new FlushConsolidationHandler(
174-
//
175-
// FlushConsolidationHandler.DEFAULT_EXPLICIT_FLUSH_AFTER_FLUSHES, true));
172+
ch.pipeline()
173+
.addFirst(
174+
HANDLER_FLUSH_CONSOLIDATION,
175+
new FlushConsolidationHandler(
176+
FlushConsolidationHandler.DEFAULT_EXPLICIT_FLUSH_AFTER_FLUSHES, true));
176177
ch.pipeline()
177178
.addLast(
178179
HANDLER_FRAME_DECODER,
@@ -271,7 +272,28 @@ public Frame readFrame() {
271272

272273
@Override
273274
public void writeFrame(Frame frame) throws IOException {
274-
// TODO check if the frame is not too big?
275+
if (this.handler.isWritable()) {
276+
this.doWriteFrame(frame);
277+
} else {
278+
if (this.channel.eventLoop().inEventLoop()) {
279+
// we do not wait in the event loop
280+
this.doWriteFrame(frame);
281+
} else {
282+
try {
283+
boolean canWriteNow = this.handler.writableLatch().await(10, SECONDS);
284+
if (canWriteNow) {
285+
this.doWriteFrame(frame);
286+
} else {
287+
throw new IOException("Frame enqueuing failed");
288+
}
289+
} catch (InterruptedException e) {
290+
Thread.currentThread().interrupt();
291+
}
292+
}
293+
}
294+
}
295+
296+
private void doWriteFrame(Frame frame) throws IOException {
275297
ByteBuf bb = this.channel.alloc().buffer(frame.size());
276298
frame.writeToByteBuf(bb);
277299
this.channel.write(bb);
@@ -332,6 +354,9 @@ private static class AmqpHandler extends ChannelInboundHandlerAdapter {
332354
private final int maxPayloadSize;
333355
private final Runnable closeSequence;
334356
private volatile AMQConnection connection;
357+
private final AtomicBoolean writable = new AtomicBoolean(true);
358+
private final AtomicReference<CountDownLatch> writableLatch =
359+
new AtomicReference<>(new CountDownLatch(1));
335360

336361
private AmqpHandler(int maxPayloadSize, Runnable closeSequence) {
337362
this.maxPayloadSize = maxPayloadSize;
@@ -381,6 +406,13 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
381406

382407
@Override
383408
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
409+
boolean canWrite = ctx.channel().isWritable();
410+
if (this.writable.compareAndSet(!canWrite, canWrite)) {
411+
if (canWrite) {
412+
CountDownLatch latch = writableLatch.getAndSet(new CountDownLatch(1));
413+
latch.countDown();
414+
}
415+
}
384416
super.channelWritabilityChanged(ctx);
385417
}
386418

@@ -441,5 +473,13 @@ private boolean needToDispatchIoError() {
441473
AMQConnection c = this.connection;
442474
return c != null && c.isOpen();
443475
}
476+
477+
private boolean isWritable() {
478+
return this.writable.get();
479+
}
480+
481+
private CountDownLatch writableLatch() {
482+
return this.writableLatch.get();
483+
}
444484
}
445485
}

0 commit comments

Comments
 (0)