Skip to content

Commit 83ebf9e

Browse files
committed
Log Netty channel writability history
1 parent 3eeab50 commit 83ebf9e

File tree

3 files changed

+49
-9
lines changed

3 files changed

+49
-9
lines changed

.github/workflows/test.yml

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -46,15 +46,16 @@ jobs:
4646
-Dca.certificate=./tls-gen/basic/result/ca_certificate.pem \
4747
-Dclient.certificate=./tls-gen/basic/result/client_$(hostname)_certificate.pem \
4848
-Dmaven.javadoc.skip=true \
49-
--no-transfer-progress
50-
- name: Test with blocking IO
51-
run: |
52-
./mvnw verify -Dio.layer=socket -Drabbitmqctl.bin=DOCKER:rabbitmq0 \
53-
-Dtest-broker.A.nodename=rabbit@node0 -Dtest-broker.B.nodename=rabbit@node1 \
54-
-Dca.certificate=./tls-gen/basic/result/ca_certificate.pem \
55-
-Dclient.certificate=./tls-gen/basic/result/client_$(hostname)_certificate.pem \
56-
-Dmaven.javadoc.skip=true \
57-
--no-transfer-progress
49+
--no-transfer-progress \
50+
-Dit.test=Confirm#persistentMandatoryCombinations,RequeueOnConnectionClose#requeueInFlightConsumerNoAck
51+
# - name: Test with blocking IO
52+
# run: |
53+
# ./mvnw verify -Dio.layer=socket -Drabbitmqctl.bin=DOCKER:rabbitmq0 \
54+
# -Dtest-broker.A.nodename=rabbit@node0 -Dtest-broker.B.nodename=rabbit@node1 \
55+
# -Dca.certificate=./tls-gen/basic/result/ca_certificate.pem \
56+
# -Dclient.certificate=./tls-gen/basic/result/client_$(hostname)_certificate.pem \
57+
# -Dmaven.javadoc.skip=true \
58+
# --no-transfer-progress
5859
- name: Stop cluster
5960
run: docker compose --file ci/cluster/docker-compose.yml down
6061
- name: Publish snapshot

src/main/java/com/rabbitmq/client/impl/NettyFrameHandlerFactory.java

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@
5151
import java.net.InetSocketAddress;
5252
import java.net.SocketAddress;
5353
import java.time.Duration;
54+
import java.util.Queue;
55+
import java.util.concurrent.ConcurrentLinkedQueue;
5456
import java.util.concurrent.CountDownLatch;
5557
import java.util.concurrent.ExecutionException;
5658
import java.util.concurrent.TimeUnit;
@@ -322,6 +324,7 @@ public void writeFrame(Frame frame) throws IOException {
322324
if (canWriteNow) {
323325
this.doWriteFrame(frame);
324326
} else {
327+
this.handler.logEvents();
325328
throw new IOException("Frame enqueuing failed");
326329
}
327330
} catch (InterruptedException e) {
@@ -345,6 +348,7 @@ public void flush() {
345348
@Override
346349
public void close() {
347350
if (this.closed.compareAndSet(false, true)) {
351+
this.handler.logEvents();
348352
Runnable closing = () -> closeNettyState(this.channel, this.eventLoopGroup);
349353
if (this.channel.eventLoop().inEventLoop()) {
350354
this.channel.eventLoop().submit(closing);
@@ -438,9 +442,43 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
438442
}
439443
}
440444

445+
private static class Event {
446+
447+
private final long time;
448+
private final String label;
449+
450+
public Event(long time, String label) {
451+
this.time = time;
452+
this.label = label;
453+
}
454+
455+
@Override
456+
public String toString() {
457+
return this.label + " " + this.time;
458+
}
459+
}
460+
461+
private static final int MAX_EVENTS = 100;
462+
private final Queue<Event> events = new ConcurrentLinkedQueue<>();
463+
464+
private void logEvents() {
465+
if (this.events.size() > 0) {
466+
long start = this.events.peek().time;
467+
LOGGER.info("channel writability history:");
468+
events.forEach(e -> LOGGER.info("{}: {}", (e.time - start) / 1_000_000, e.label));
469+
}
470+
}
471+
441472
@Override
442473
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
443474
boolean canWrite = ctx.channel().isWritable();
475+
Event event = new Event(System.nanoTime(), String.valueOf(canWrite));
476+
if (this.events.size() >= MAX_EVENTS) {
477+
this.events.poll();
478+
this.events.offer(event);
479+
}
480+
this.events.add(event);
481+
444482
if (this.writable.compareAndSet(!canWrite, canWrite)) {
445483
if (canWrite) {
446484
CountDownLatch latch = writableLatch.getAndSet(new CountDownLatch(1));

src/test/resources/logback-test.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
<logger name="com.rabbitmq.client.AmqpClientTestExtension" level="debug" />
99
<logger name="com.rabbitmq.client.test.server.TopicPermissions" level="debug" />
1010
<logger name="com.rabbitmq.client.impl.AbstractMetricsCollector" level="debug" />
11+
<logger name="com.rabbitmq.client.impl.NettyFrameHandlerFactory" level="info" />
1112

1213
<root level="warn">
1314
<appender-ref ref="STDOUT" />

0 commit comments

Comments
 (0)