Skip to content

Commit e144f77

Browse files
authored
Merge pull request #280 from rabbitmq/bytes-metrics
Add written and read bytes metrics
2 parents 9ca8df4 + 32b1bef commit e144f77

File tree

9 files changed

+136
-22
lines changed

9 files changed

+136
-22
lines changed

src/main/java/com/rabbitmq/client/amqp/impl/AmqpConnection.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -315,6 +315,10 @@ private NativeConnectionWrapper connect(
315315
.useWebSockets(true)
316316
.webSocketPath(connectionSettings.webSocketPath());
317317
}
318+
connectionOptions
319+
.transportOptions()
320+
.readBytesConsumer(this.environment().readBytesConsumer())
321+
.writtenBytesConsumer(this.environment().writtenBytesConsumer());
318322
StopWatch stopWatch = new StopWatch();
319323
try {
320324
LOGGER.trace("Connecting '{}' to {}...", this.name(), address);

src/main/java/com/rabbitmq/client/amqp/impl/AmqpEnvironment.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import java.util.concurrent.TimeUnit;
3333
import java.util.concurrent.atomic.AtomicBoolean;
3434
import java.util.concurrent.atomic.AtomicLong;
35+
import java.util.function.IntConsumer;
3536
import org.apache.qpid.protonj2.client.Client;
3637
import org.apache.qpid.protonj2.client.ClientOptions;
3738
import org.slf4j.Logger;
@@ -65,6 +66,7 @@ class AmqpEnvironment implements Environment {
6566
private final ExecutorService recoveryEventLoopExecutorService;
6667
private final CredentialsManagerFactory credentialsManagerFactory =
6768
new CredentialsManagerFactory(this);
69+
private final IntConsumer readBytesConsumer, writtenBytesConsumer;
6870

6971
AmqpEnvironment(
7072
ExecutorService executorService,
@@ -106,6 +108,8 @@ class AmqpEnvironment implements Environment {
106108
}
107109
this.metricsCollector =
108110
metricsCollector == null ? NoOpMetricsCollector.INSTANCE : metricsCollector;
111+
this.readBytesConsumer = this.metricsCollector::readBytes;
112+
this.writtenBytesConsumer = this.metricsCollector::writtenBytes;
109113
this.observationCollector =
110114
observationCollector == null ? Utils.NO_OP_OBSERVATION_COLLECTOR : observationCollector;
111115
this.recoveryEventLoopExecutorService =
@@ -190,6 +194,14 @@ MetricsCollector metricsCollector() {
190194
return this.metricsCollector;
191195
}
192196

197+
IntConsumer readBytesConsumer() {
198+
return this.readBytesConsumer;
199+
}
200+
201+
IntConsumer writtenBytesConsumer() {
202+
return this.writtenBytesConsumer;
203+
}
204+
193205
ObservationCollector observationCollector() {
194206
return this.observationCollector;
195207
}

src/main/java/com/rabbitmq/client/amqp/metrics/MetricsCollector.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,20 @@ public interface MetricsCollector {
6262
*/
6363
void consumeDisposition(ConsumeDisposition disposition);
6464

65+
/**
66+
* Called when a connection writes bytes to its socket.
67+
*
68+
* @param writtenBytes the number of written bytes
69+
*/
70+
void writtenBytes(int writtenBytes);
71+
72+
/**
73+
* Called when a connection reads bytes from its socket.
74+
*
75+
* @param readBytes the number of read bytes
76+
*/
77+
void readBytes(int readBytes);
78+
6579
/** The broker-to-client dispositions. */
6680
enum PublishDisposition {
6781
/** see {@link com.rabbitmq.client.amqp.Publisher.Status#ACCEPTED} */

src/main/java/com/rabbitmq/client/amqp/metrics/MicrometerMetricsCollector.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ public class MicrometerMetricsCollector implements MetricsCollector {
3434
private final AtomicLong consumers;
3535
private final Counter publish, publishAccepted, publishRejected, publishReleased;
3636
private final Counter consume, consumeAccepted, consumeRequeued, consumeDiscarded;
37+
private final Counter writtenBytes;
38+
private final Counter readBytes;
3739

3840
public MicrometerMetricsCollector(MeterRegistry registry) {
3941
this(registry, "rabbitmq.amqp");
@@ -61,6 +63,8 @@ public MicrometerMetricsCollector(
6163
this.consumeAccepted = registry.counter(prefix + ".consumed_accepted", tags);
6264
this.consumeRequeued = registry.counter(prefix + ".consumed_requeued", tags);
6365
this.consumeDiscarded = registry.counter(prefix + ".consumed_discarded", tags);
66+
this.writtenBytes = registry.counter(prefix + ".written_bytes", tags);
67+
this.readBytes = registry.counter(prefix + ".read_bytes", tags);
6468
}
6569

6670
@Override
@@ -136,4 +140,14 @@ public void consumeDisposition(ConsumeDisposition disposition) {
136140
break;
137141
}
138142
}
143+
144+
@Override
145+
public void writtenBytes(int writtenBytes) {
146+
this.writtenBytes.increment(writtenBytes);
147+
}
148+
149+
@Override
150+
public void readBytes(int readBytes) {
151+
this.readBytes.increment(readBytes);
152+
}
139153
}

src/main/java/com/rabbitmq/client/amqp/metrics/NoOpMetricsCollector.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,4 +53,10 @@ public void consume() {}
5353

5454
@Override
5555
public void consumeDisposition(ConsumeDisposition disposition) {}
56+
57+
@Override
58+
public void writtenBytes(int writtenBytes) {}
59+
60+
@Override
61+
public void readBytes(int readBytes) {}
5662
}

src/main/qpid/org/apache/qpid/protonj2/client/TransportOptions.java

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.util.HashMap;
2222
import java.util.List;
2323
import java.util.Map;
24+
import java.util.function.IntConsumer;
2425

2526
/**
2627
* Encapsulates all the Transport options in one configuration object.
@@ -67,7 +68,11 @@ public class TransportOptions implements Cloneable {
6768

6869
private final Map<String, String> webSocketHeaders = new HashMap<>();
6970

70-
@Override
71+
private IntConsumer readBytesConsumer = i -> { };
72+
private IntConsumer writtenBytesConsumer = i -> { };
73+
74+
75+
@Override
7176
public TransportOptions clone() {
7277
return copyInto(new TransportOptions());
7378
}
@@ -481,6 +486,24 @@ public TransportOptions webSocketCompression(boolean enabled) {
481486
return this;
482487
}
483488

489+
public IntConsumer readBytesConsumer() {
490+
return readBytesConsumer;
491+
}
492+
493+
public TransportOptions readBytesConsumer(IntConsumer readBytesConsumer) {
494+
this.readBytesConsumer = readBytesConsumer;
495+
return this;
496+
}
497+
498+
public IntConsumer writtenBytesConsumer() {
499+
return writtenBytesConsumer;
500+
}
501+
502+
public TransportOptions writtenBytesConsumer(IntConsumer writtenBytesConsumer) {
503+
this.writtenBytesConsumer = writtenBytesConsumer;
504+
return this;
505+
}
506+
484507
/**
485508
* Copy all configuration into the given {@link TransportOptions} from this instance.
486509
*
@@ -509,6 +532,8 @@ public TransportOptions copyInto(TransportOptions other) {
509532
other.webSocketHeaders().putAll(webSocketHeaders);
510533
other.webSocketMaxFrameSize(webSocketMaxFrameSize());
511534
other.webSocketCompression(webSocketCompression());
535+
other.readBytesConsumer(readBytesConsumer());
536+
other.writtenBytesConsumer(writtenBytesConsumer());
512537

513538
return other;
514539
}

src/main/qpid/org/apache/qpid/protonj2/client/transport/netty4/TcpTransport.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.security.Principal;
2323
import java.util.concurrent.CountDownLatch;
2424
import java.util.concurrent.atomic.AtomicBoolean;
25+
import java.util.function.IntConsumer;
2526

2627
import io.netty.buffer.PooledByteBufAllocator;
2728
import org.apache.qpid.protonj2.buffer.ProtonBuffer;
@@ -70,6 +71,8 @@ public class TcpTransport implements Transport {
7071
protected final TransportOptions options;
7172
protected final SslOptions sslOptions;
7273
protected final Bootstrap bootstrap;
74+
private final IntConsumer readBytesConsumer;
75+
private final IntConsumer writtenBytesConsumer;
7376

7477
protected Channel channel;
7578
protected volatile IOException failureCause;
@@ -104,6 +107,8 @@ public TcpTransport(Bootstrap bootstrap, TransportOptions options, SslOptions ss
104107
this.sslOptions = sslOptions;
105108
this.options = options;
106109
this.bootstrap = bootstrap;
110+
this.readBytesConsumer = options.readBytesConsumer();
111+
this.writtenBytesConsumer = options.writtenBytesConsumer();
107112
}
108113

109114
@Override
@@ -344,6 +349,8 @@ private TcpTransport writeOutputBuffer(final ProtonBuffer buffer, boolean flush,
344349
}
345350
}
346351

352+
this.writtenBytesConsumer.accept(nettyBuf.writableBytes());
353+
347354
if (--writeCount > 0) {
348355
channel.write(nettyBuf, channel.voidPromise());
349356
} else {
@@ -530,6 +537,7 @@ protected void dispatchReadBuffer(ByteBuf buffer) throws Exception {
530537
final ProtonBuffer wrapped = nettyAllocator.wrap(buffer).convertToReadOnly();
531538

532539
try (wrapped) {
540+
readBytesConsumer.accept(wrapped.getReadableBytes());
533541
listener.transportRead(wrapped);
534542
}
535543
}

src/test/java/com/rabbitmq/client/amqp/impl/MetricsCollectorTest.java

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,15 @@
1717
1818
package com.rabbitmq.client.amqp.impl;
1919

20+
import static com.rabbitmq.client.amqp.impl.Assertions.assertThat;
21+
import static com.rabbitmq.client.amqp.impl.TestUtils.waitAtMost;
2022
import static com.rabbitmq.client.amqp.metrics.MetricsCollector.ConsumeDisposition.DISCARDED;
2123
import static com.rabbitmq.client.amqp.metrics.MetricsCollector.ConsumeDisposition.REQUEUED;
2224
import static com.rabbitmq.client.amqp.metrics.MetricsCollector.PublishDisposition.RELEASED;
2325
import static java.lang.String.format;
2426
import static org.mockito.ArgumentMatchers.any;
27+
import static org.mockito.ArgumentMatchers.anyInt;
28+
import static org.mockito.Mockito.atLeastOnce;
2529
import static org.mockito.Mockito.doAnswer;
2630
import static org.mockito.Mockito.mock;
2731
import static org.mockito.Mockito.never;
@@ -55,6 +59,8 @@ void metricsShouldBeCollected() {
5559
try (Environment environment =
5660
TestUtils.environmentBuilder().metricsCollector(metricsCollector).build()) {
5761
verify(metricsCollector, never()).openConnection();
62+
verify(metricsCollector, never()).writtenBytes(anyInt());
63+
verify(metricsCollector, never()).readBytes(anyInt());
5864
String c1Name = UUID.randomUUID().toString();
5965
CountDownLatch recoveredLatch = new CountDownLatch(1);
6066
Connection c1 =
@@ -66,9 +72,11 @@ void metricsShouldBeCollected() {
6672
.connectionBuilder()
6773
.build();
6874
verify(metricsCollector, times(1)).openConnection();
75+
verify(metricsCollector, atLeastOnce()).writtenBytes(anyInt());
76+
verify(metricsCollector, atLeastOnce()).readBytes(anyInt());
6977

7078
Cli.closeConnection(c1Name);
71-
Assertions.assertThat(recoveredLatch).completes();
79+
assertThat(recoveredLatch).completes();
7280
// a recovered connection is not closed
7381
verify(metricsCollector, never()).closeConnection();
7482

@@ -98,7 +106,7 @@ void metricsShouldBeCollected() {
98106
.when(metricsCollector)
99107
.closeConnection();
100108
Cli.closeConnection(c2Name);
101-
Assertions.assertThat(c2ClosedLatch).completes();
109+
assertThat(c2ClosedLatch).completes();
102110
// the connection is closed because automatic recovery was not activated
103111
verify(metricsCollector, times(1)).closeConnection();
104112

@@ -112,7 +120,7 @@ void metricsShouldBeCollected() {
112120
CountDownLatch disposed = new CountDownLatch(1);
113121
publisher.publish(publisher.message().toAddress().queue(q).message(), disposed(disposed));
114122
verify(metricsCollector, times(1)).publish();
115-
Assertions.assertThat(disposed).completes();
123+
assertThat(disposed).completes();
116124
verify(metricsCollector, times(1))
117125
.publishDisposition(MetricsCollector.PublishDisposition.ACCEPTED);
118126

@@ -122,7 +130,7 @@ void metricsShouldBeCollected() {
122130
publisher.message().toAddress().queue(UUID.randomUUID().toString()).message(),
123131
disposed(disposed));
124132
verify(metricsCollector, times(2)).publish();
125-
Assertions.assertThat(disposed).completes();
133+
assertThat(disposed).completes();
126134
// the last message could not be routed, so its disposition state is RELEASED
127135
verify(metricsCollector, times(1)).publishDisposition(RELEASED);
128136
verify(metricsCollector, times(2)).publishDisposition(any());
@@ -160,7 +168,7 @@ void metricsShouldBeCollected() {
160168
consumedCount.incrementAndGet();
161169
})
162170
.build();
163-
TestUtils.waitAtMost(
171+
waitAtMost(
164172
() -> consumedCount.get() == 1,
165173
() -> format("Expected 1 message, but got %d.", consumedCount.get()));
166174
// the first message is accepted
@@ -172,7 +180,7 @@ void metricsShouldBeCollected() {
172180
publisher.publish(publisher.message().toAddress().queue(q).message(), ctx -> {});
173181

174182
// the message is requeued, so it comes back, and it's then discarded
175-
TestUtils.waitAtMost(() -> consumedCount.get() == 1 + 2);
183+
waitAtMost(() -> consumedCount.get() == 1 + 2);
176184
verify(metricsCollector, times(1 + 2)).consume();
177185
verify(metricsCollector, times(1)).consumeDisposition(REQUEUED);
178186
verify(metricsCollector, times(1)).consumeDisposition(DISCARDED);

0 commit comments

Comments
 (0)