diff --git a/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConnection.java b/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConnection.java index f76783eacd..17022156d0 100644 --- a/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConnection.java +++ b/src/main/java/com/rabbitmq/client/amqp/impl/AmqpConnection.java @@ -315,6 +315,10 @@ private NativeConnectionWrapper connect( .useWebSockets(true) .webSocketPath(connectionSettings.webSocketPath()); } + connectionOptions + .transportOptions() + .readBytesConsumer(this.environment().readBytesConsumer()) + .writtenBytesConsumer(this.environment().writtenBytesConsumer()); StopWatch stopWatch = new StopWatch(); try { LOGGER.trace("Connecting '{}' to {}...", this.name(), address); diff --git a/src/main/java/com/rabbitmq/client/amqp/impl/AmqpEnvironment.java b/src/main/java/com/rabbitmq/client/amqp/impl/AmqpEnvironment.java index 7d490c1daf..ef9c95a123 100644 --- a/src/main/java/com/rabbitmq/client/amqp/impl/AmqpEnvironment.java +++ b/src/main/java/com/rabbitmq/client/amqp/impl/AmqpEnvironment.java @@ -32,6 +32,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.IntConsumer; import org.apache.qpid.protonj2.client.Client; import org.apache.qpid.protonj2.client.ClientOptions; import org.slf4j.Logger; @@ -65,6 +66,7 @@ class AmqpEnvironment implements Environment { private final ExecutorService recoveryEventLoopExecutorService; private final CredentialsManagerFactory credentialsManagerFactory = new CredentialsManagerFactory(this); + private final IntConsumer readBytesConsumer, writtenBytesConsumer; AmqpEnvironment( ExecutorService executorService, @@ -106,6 +108,8 @@ class AmqpEnvironment implements Environment { } this.metricsCollector = metricsCollector == null ? NoOpMetricsCollector.INSTANCE : metricsCollector; + this.readBytesConsumer = this.metricsCollector::readBytes; + this.writtenBytesConsumer = this.metricsCollector::writtenBytes; this.observationCollector = observationCollector == null ? Utils.NO_OP_OBSERVATION_COLLECTOR : observationCollector; this.recoveryEventLoopExecutorService = @@ -190,6 +194,14 @@ MetricsCollector metricsCollector() { return this.metricsCollector; } + IntConsumer readBytesConsumer() { + return this.readBytesConsumer; + } + + IntConsumer writtenBytesConsumer() { + return this.writtenBytesConsumer; + } + ObservationCollector observationCollector() { return this.observationCollector; } diff --git a/src/main/java/com/rabbitmq/client/amqp/metrics/MetricsCollector.java b/src/main/java/com/rabbitmq/client/amqp/metrics/MetricsCollector.java index 36c39f2953..e715a3c623 100644 --- a/src/main/java/com/rabbitmq/client/amqp/metrics/MetricsCollector.java +++ b/src/main/java/com/rabbitmq/client/amqp/metrics/MetricsCollector.java @@ -62,6 +62,20 @@ public interface MetricsCollector { */ void consumeDisposition(ConsumeDisposition disposition); + /** + * Called when a connection writes bytes to its socket. + * + * @param writtenBytes the number of written bytes + */ + void writtenBytes(int writtenBytes); + + /** + * Called when a connection reads bytes from its socket. + * + * @param readBytes the number of read bytes + */ + void readBytes(int readBytes); + /** The broker-to-client dispositions. */ enum PublishDisposition { /** see {@link com.rabbitmq.client.amqp.Publisher.Status#ACCEPTED} */ diff --git a/src/main/java/com/rabbitmq/client/amqp/metrics/MicrometerMetricsCollector.java b/src/main/java/com/rabbitmq/client/amqp/metrics/MicrometerMetricsCollector.java index 4511790ac1..53b4728224 100644 --- a/src/main/java/com/rabbitmq/client/amqp/metrics/MicrometerMetricsCollector.java +++ b/src/main/java/com/rabbitmq/client/amqp/metrics/MicrometerMetricsCollector.java @@ -34,6 +34,8 @@ public class MicrometerMetricsCollector implements MetricsCollector { private final AtomicLong consumers; private final Counter publish, publishAccepted, publishRejected, publishReleased; private final Counter consume, consumeAccepted, consumeRequeued, consumeDiscarded; + private final Counter writtenBytes; + private final Counter readBytes; public MicrometerMetricsCollector(MeterRegistry registry) { this(registry, "rabbitmq.amqp"); @@ -61,6 +63,8 @@ public MicrometerMetricsCollector( this.consumeAccepted = registry.counter(prefix + ".consumed_accepted", tags); this.consumeRequeued = registry.counter(prefix + ".consumed_requeued", tags); this.consumeDiscarded = registry.counter(prefix + ".consumed_discarded", tags); + this.writtenBytes = registry.counter(prefix + ".written_bytes", tags); + this.readBytes = registry.counter(prefix + ".read_bytes", tags); } @Override @@ -136,4 +140,14 @@ public void consumeDisposition(ConsumeDisposition disposition) { break; } } + + @Override + public void writtenBytes(int writtenBytes) { + this.writtenBytes.increment(writtenBytes); + } + + @Override + public void readBytes(int readBytes) { + this.readBytes.increment(readBytes); + } } diff --git a/src/main/java/com/rabbitmq/client/amqp/metrics/NoOpMetricsCollector.java b/src/main/java/com/rabbitmq/client/amqp/metrics/NoOpMetricsCollector.java index aab6d4e03b..a5d2c85ad7 100644 --- a/src/main/java/com/rabbitmq/client/amqp/metrics/NoOpMetricsCollector.java +++ b/src/main/java/com/rabbitmq/client/amqp/metrics/NoOpMetricsCollector.java @@ -53,4 +53,10 @@ public void consume() {} @Override public void consumeDisposition(ConsumeDisposition disposition) {} + + @Override + public void writtenBytes(int writtenBytes) {} + + @Override + public void readBytes(int readBytes) {} } diff --git a/src/main/qpid/org/apache/qpid/protonj2/client/TransportOptions.java b/src/main/qpid/org/apache/qpid/protonj2/client/TransportOptions.java index 47e4f3c7c3..c6dc219e9f 100644 --- a/src/main/qpid/org/apache/qpid/protonj2/client/TransportOptions.java +++ b/src/main/qpid/org/apache/qpid/protonj2/client/TransportOptions.java @@ -21,6 +21,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.function.IntConsumer; /** * Encapsulates all the Transport options in one configuration object. @@ -67,7 +68,11 @@ public class TransportOptions implements Cloneable { private final Map webSocketHeaders = new HashMap<>(); - @Override + private IntConsumer readBytesConsumer = i -> { }; + private IntConsumer writtenBytesConsumer = i -> { }; + + + @Override public TransportOptions clone() { return copyInto(new TransportOptions()); } @@ -481,6 +486,24 @@ public TransportOptions webSocketCompression(boolean enabled) { return this; } + public IntConsumer readBytesConsumer() { + return readBytesConsumer; + } + + public TransportOptions readBytesConsumer(IntConsumer readBytesConsumer) { + this.readBytesConsumer = readBytesConsumer; + return this; + } + + public IntConsumer writtenBytesConsumer() { + return writtenBytesConsumer; + } + + public TransportOptions writtenBytesConsumer(IntConsumer writtenBytesConsumer) { + this.writtenBytesConsumer = writtenBytesConsumer; + return this; + } + /** * Copy all configuration into the given {@link TransportOptions} from this instance. * @@ -509,6 +532,8 @@ public TransportOptions copyInto(TransportOptions other) { other.webSocketHeaders().putAll(webSocketHeaders); other.webSocketMaxFrameSize(webSocketMaxFrameSize()); other.webSocketCompression(webSocketCompression()); + other.readBytesConsumer(readBytesConsumer()); + other.writtenBytesConsumer(writtenBytesConsumer()); return other; } diff --git a/src/main/qpid/org/apache/qpid/protonj2/client/transport/netty4/TcpTransport.java b/src/main/qpid/org/apache/qpid/protonj2/client/transport/netty4/TcpTransport.java index 795c0afb11..f24f5cd299 100644 --- a/src/main/qpid/org/apache/qpid/protonj2/client/transport/netty4/TcpTransport.java +++ b/src/main/qpid/org/apache/qpid/protonj2/client/transport/netty4/TcpTransport.java @@ -22,6 +22,7 @@ import java.security.Principal; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.IntConsumer; import io.netty.buffer.PooledByteBufAllocator; import org.apache.qpid.protonj2.buffer.ProtonBuffer; @@ -70,6 +71,8 @@ public class TcpTransport implements Transport { protected final TransportOptions options; protected final SslOptions sslOptions; protected final Bootstrap bootstrap; + private final IntConsumer readBytesConsumer; + private final IntConsumer writtenBytesConsumer; protected Channel channel; protected volatile IOException failureCause; @@ -104,6 +107,8 @@ public TcpTransport(Bootstrap bootstrap, TransportOptions options, SslOptions ss this.sslOptions = sslOptions; this.options = options; this.bootstrap = bootstrap; + this.readBytesConsumer = options.readBytesConsumer(); + this.writtenBytesConsumer = options.writtenBytesConsumer(); } @Override @@ -344,6 +349,8 @@ private TcpTransport writeOutputBuffer(final ProtonBuffer buffer, boolean flush, } } + this.writtenBytesConsumer.accept(nettyBuf.writableBytes()); + if (--writeCount > 0) { channel.write(nettyBuf, channel.voidPromise()); } else { @@ -530,6 +537,7 @@ protected void dispatchReadBuffer(ByteBuf buffer) throws Exception { final ProtonBuffer wrapped = nettyAllocator.wrap(buffer).convertToReadOnly(); try (wrapped) { + readBytesConsumer.accept(wrapped.getReadableBytes()); listener.transportRead(wrapped); } } diff --git a/src/test/java/com/rabbitmq/client/amqp/impl/MetricsCollectorTest.java b/src/test/java/com/rabbitmq/client/amqp/impl/MetricsCollectorTest.java index 8e600673eb..1d127d4e09 100644 --- a/src/test/java/com/rabbitmq/client/amqp/impl/MetricsCollectorTest.java +++ b/src/test/java/com/rabbitmq/client/amqp/impl/MetricsCollectorTest.java @@ -17,11 +17,15 @@ // info@rabbitmq.com. package com.rabbitmq.client.amqp.impl; +import static com.rabbitmq.client.amqp.impl.Assertions.assertThat; +import static com.rabbitmq.client.amqp.impl.TestUtils.waitAtMost; import static com.rabbitmq.client.amqp.metrics.MetricsCollector.ConsumeDisposition.DISCARDED; import static com.rabbitmq.client.amqp.metrics.MetricsCollector.ConsumeDisposition.REQUEUED; import static com.rabbitmq.client.amqp.metrics.MetricsCollector.PublishDisposition.RELEASED; import static java.lang.String.format; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -55,6 +59,8 @@ void metricsShouldBeCollected() { try (Environment environment = TestUtils.environmentBuilder().metricsCollector(metricsCollector).build()) { verify(metricsCollector, never()).openConnection(); + verify(metricsCollector, never()).writtenBytes(anyInt()); + verify(metricsCollector, never()).readBytes(anyInt()); String c1Name = UUID.randomUUID().toString(); CountDownLatch recoveredLatch = new CountDownLatch(1); Connection c1 = @@ -66,9 +72,11 @@ void metricsShouldBeCollected() { .connectionBuilder() .build(); verify(metricsCollector, times(1)).openConnection(); + verify(metricsCollector, atLeastOnce()).writtenBytes(anyInt()); + verify(metricsCollector, atLeastOnce()).readBytes(anyInt()); Cli.closeConnection(c1Name); - Assertions.assertThat(recoveredLatch).completes(); + assertThat(recoveredLatch).completes(); // a recovered connection is not closed verify(metricsCollector, never()).closeConnection(); @@ -98,7 +106,7 @@ void metricsShouldBeCollected() { .when(metricsCollector) .closeConnection(); Cli.closeConnection(c2Name); - Assertions.assertThat(c2ClosedLatch).completes(); + assertThat(c2ClosedLatch).completes(); // the connection is closed because automatic recovery was not activated verify(metricsCollector, times(1)).closeConnection(); @@ -112,7 +120,7 @@ void metricsShouldBeCollected() { CountDownLatch disposed = new CountDownLatch(1); publisher.publish(publisher.message().toAddress().queue(q).message(), disposed(disposed)); verify(metricsCollector, times(1)).publish(); - Assertions.assertThat(disposed).completes(); + assertThat(disposed).completes(); verify(metricsCollector, times(1)) .publishDisposition(MetricsCollector.PublishDisposition.ACCEPTED); @@ -122,7 +130,7 @@ void metricsShouldBeCollected() { publisher.message().toAddress().queue(UUID.randomUUID().toString()).message(), disposed(disposed)); verify(metricsCollector, times(2)).publish(); - Assertions.assertThat(disposed).completes(); + assertThat(disposed).completes(); // the last message could not be routed, so its disposition state is RELEASED verify(metricsCollector, times(1)).publishDisposition(RELEASED); verify(metricsCollector, times(2)).publishDisposition(any()); @@ -160,7 +168,7 @@ void metricsShouldBeCollected() { consumedCount.incrementAndGet(); }) .build(); - TestUtils.waitAtMost( + waitAtMost( () -> consumedCount.get() == 1, () -> format("Expected 1 message, but got %d.", consumedCount.get())); // the first message is accepted @@ -172,7 +180,7 @@ void metricsShouldBeCollected() { publisher.publish(publisher.message().toAddress().queue(q).message(), ctx -> {}); // the message is requeued, so it comes back, and it's then discarded - TestUtils.waitAtMost(() -> consumedCount.get() == 1 + 2); + waitAtMost(() -> consumedCount.get() == 1 + 2); verify(metricsCollector, times(1 + 2)).consume(); verify(metricsCollector, times(1)).consumeDisposition(REQUEUED); verify(metricsCollector, times(1)).consumeDisposition(DISCARDED); diff --git a/src/test/java/com/rabbitmq/client/amqp/metrics/MicrometerMetricsCollectorTest.java b/src/test/java/com/rabbitmq/client/amqp/metrics/MicrometerMetricsCollectorTest.java index eb91c48142..866b4ab5e5 100644 --- a/src/test/java/com/rabbitmq/client/amqp/metrics/MicrometerMetricsCollectorTest.java +++ b/src/test/java/com/rabbitmq/client/amqp/metrics/MicrometerMetricsCollectorTest.java @@ -20,7 +20,9 @@ import static com.rabbitmq.client.amqp.impl.TestUtils.waitAtMost; import static org.assertj.core.api.Assertions.assertThat; -import com.rabbitmq.client.amqp.impl.TestUtils; +import com.rabbitmq.client.amqp.impl.TestUtils.DisabledOnJavaSemeru; +import com.rabbitmq.client.amqp.metrics.MetricsCollector.ConsumeDisposition; +import com.rabbitmq.client.amqp.metrics.MetricsCollector.PublishDisposition; import io.micrometer.core.instrument.simple.SimpleMeterRegistry; import io.micrometer.prometheusmetrics.PrometheusConfig; import io.micrometer.prometheusmetrics.PrometheusMeterRegistry; @@ -65,15 +67,15 @@ void simple() { assertThat(registry.get("rabbitmq.amqp.published").counter().count()).isEqualTo(2.0); assertThat(registry.get("rabbitmq.amqp.published_accepted").counter().count()).isZero(); - collector.publishDisposition(MetricsCollector.PublishDisposition.ACCEPTED); + collector.publishDisposition(PublishDisposition.ACCEPTED); assertThat(registry.get("rabbitmq.amqp.published_accepted").counter().count()).isEqualTo(1.0); assertThat(registry.get("rabbitmq.amqp.published_rejected").counter().count()).isZero(); - collector.publishDisposition(MetricsCollector.PublishDisposition.REJECTED); + collector.publishDisposition(PublishDisposition.REJECTED); assertThat(registry.get("rabbitmq.amqp.published_rejected").counter().count()).isEqualTo(1.0); assertThat(registry.get("rabbitmq.amqp.published_released").counter().count()).isZero(); - collector.publishDisposition(MetricsCollector.PublishDisposition.RELEASED); + collector.publishDisposition(PublishDisposition.RELEASED); assertThat(registry.get("rabbitmq.amqp.published_released").counter().count()).isEqualTo(1.0); assertThat(registry.get("rabbitmq.amqp.consumed").counter().count()).isZero(); @@ -84,19 +86,31 @@ void simple() { assertThat(registry.get("rabbitmq.amqp.consumed").counter().count()).isEqualTo(3.0); assertThat(registry.get("rabbitmq.amqp.consumed_accepted").counter().count()).isZero(); - collector.consumeDisposition(MetricsCollector.ConsumeDisposition.ACCEPTED); + collector.consumeDisposition(ConsumeDisposition.ACCEPTED); assertThat(registry.get("rabbitmq.amqp.consumed_accepted").counter().count()).isEqualTo(1.0); assertThat(registry.get("rabbitmq.amqp.consumed_requeued").counter().count()).isZero(); - collector.consumeDisposition(MetricsCollector.ConsumeDisposition.REQUEUED); + collector.consumeDisposition(ConsumeDisposition.REQUEUED); assertThat(registry.get("rabbitmq.amqp.consumed_requeued").counter().count()).isEqualTo(1.0); assertThat(registry.get("rabbitmq.amqp.consumed_discarded").counter().count()).isZero(); - collector.consumeDisposition(MetricsCollector.ConsumeDisposition.DISCARDED); + collector.consumeDisposition(ConsumeDisposition.DISCARDED); assertThat(registry.get("rabbitmq.amqp.consumed_discarded").counter().count()).isEqualTo(1.0); + + assertThat(registry.get("rabbitmq.amqp.written_bytes").counter().count()).isZero(); + collector.writtenBytes(12); + assertThat(registry.get("rabbitmq.amqp.written_bytes").counter().count()).isEqualTo(12); + collector.writtenBytes(20); + assertThat(registry.get("rabbitmq.amqp.written_bytes").counter().count()).isEqualTo(32.0); + + assertThat(registry.get("rabbitmq.amqp.read_bytes").counter().count()).isZero(); + collector.readBytes(42); + assertThat(registry.get("rabbitmq.amqp.read_bytes").counter().count()).isEqualTo(42.0); + collector.readBytes(10); + assertThat(registry.get("rabbitmq.amqp.read_bytes").counter().count()).isEqualTo(52.0); } - @TestUtils.DisabledOnJavaSemeru + @DisabledOnJavaSemeru @Test void prometheus() { PrometheusMeterRegistry registry = new PrometheusMeterRegistry(PrometheusConfig.DEFAULT); @@ -117,17 +131,20 @@ void prometheus() { collector.publish(); collector.publish(); - collector.publishDisposition(MetricsCollector.PublishDisposition.ACCEPTED); - collector.publishDisposition(MetricsCollector.PublishDisposition.REJECTED); - collector.publishDisposition(MetricsCollector.PublishDisposition.RELEASED); + collector.publishDisposition(PublishDisposition.ACCEPTED); + collector.publishDisposition(PublishDisposition.REJECTED); + collector.publishDisposition(PublishDisposition.RELEASED); collector.consume(); collector.consume(); collector.consume(); - collector.consumeDisposition(MetricsCollector.ConsumeDisposition.ACCEPTED); - collector.consumeDisposition(MetricsCollector.ConsumeDisposition.REQUEUED); - collector.consumeDisposition(MetricsCollector.ConsumeDisposition.DISCARDED); + collector.consumeDisposition(ConsumeDisposition.ACCEPTED); + collector.consumeDisposition(ConsumeDisposition.REQUEUED); + collector.consumeDisposition(ConsumeDisposition.DISCARDED); + + collector.writtenBytes(42); + collector.readBytes(12); Stream.of( "# TYPE rabbitmq_amqp_connections gauge", @@ -151,7 +168,13 @@ void prometheus() { "# TYPE rabbitmq_amqp_consumed_discarded_total counter", "rabbitmq_amqp_consumed_discarded_total 1.0", "# TYPE rabbitmq_amqp_consumed_requeued_total counter", - "rabbitmq_amqp_consumed_requeued_total 1.0") + "rabbitmq_amqp_consumed_requeued_total 1.0", + "# HELP rabbitmq_amqp_read_bytes_total", + "# TYPE rabbitmq_amqp_read_bytes_total counter", + "rabbitmq_amqp_read_bytes_total 12.0", + "# HELP rabbitmq_amqp_written_bytes_total", + "# TYPE rabbitmq_amqp_written_bytes_total counter", + "rabbitmq_amqp_written_bytes_total 42.0") .forEach(expected -> waitAtMost(() -> registry.scrape().contains(expected))); } }