Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,10 @@ private NativeConnectionWrapper connect(
.useWebSockets(true)
.webSocketPath(connectionSettings.webSocketPath());
}
connectionOptions
.transportOptions()
.readBytesConsumer(this.environment().readBytesConsumer())
.writtenBytesConsumer(this.environment().writtenBytesConsumer());
StopWatch stopWatch = new StopWatch();
try {
LOGGER.trace("Connecting '{}' to {}...", this.name(), address);
Expand Down
12 changes: 12 additions & 0 deletions src/main/java/com/rabbitmq/client/amqp/impl/AmqpEnvironment.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.IntConsumer;
import org.apache.qpid.protonj2.client.Client;
import org.apache.qpid.protonj2.client.ClientOptions;
import org.slf4j.Logger;
Expand Down Expand Up @@ -65,6 +66,7 @@ class AmqpEnvironment implements Environment {
private final ExecutorService recoveryEventLoopExecutorService;
private final CredentialsManagerFactory credentialsManagerFactory =
new CredentialsManagerFactory(this);
private final IntConsumer readBytesConsumer, writtenBytesConsumer;

AmqpEnvironment(
ExecutorService executorService,
Expand Down Expand Up @@ -106,6 +108,8 @@ class AmqpEnvironment implements Environment {
}
this.metricsCollector =
metricsCollector == null ? NoOpMetricsCollector.INSTANCE : metricsCollector;
this.readBytesConsumer = this.metricsCollector::readBytes;
this.writtenBytesConsumer = this.metricsCollector::writtenBytes;
this.observationCollector =
observationCollector == null ? Utils.NO_OP_OBSERVATION_COLLECTOR : observationCollector;
this.recoveryEventLoopExecutorService =
Expand Down Expand Up @@ -190,6 +194,14 @@ MetricsCollector metricsCollector() {
return this.metricsCollector;
}

IntConsumer readBytesConsumer() {
return this.readBytesConsumer;
}

IntConsumer writtenBytesConsumer() {
return this.writtenBytesConsumer;
}

ObservationCollector observationCollector() {
return this.observationCollector;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,20 @@ public interface MetricsCollector {
*/
void consumeDisposition(ConsumeDisposition disposition);

/**
* Called when a connection writes bytes to its socket.
*
* @param writtenBytes the number of written bytes
*/
void writtenBytes(int writtenBytes);

/**
* Called when a connection reads bytes from its socket.
*
* @param readBytes the number of read bytes
*/
void readBytes(int readBytes);

/** The broker-to-client dispositions. */
enum PublishDisposition {
/** see {@link com.rabbitmq.client.amqp.Publisher.Status#ACCEPTED} */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ public class MicrometerMetricsCollector implements MetricsCollector {
private final AtomicLong consumers;
private final Counter publish, publishAccepted, publishRejected, publishReleased;
private final Counter consume, consumeAccepted, consumeRequeued, consumeDiscarded;
private final Counter writtenBytes;
private final Counter readBytes;

public MicrometerMetricsCollector(MeterRegistry registry) {
this(registry, "rabbitmq.amqp");
Expand Down Expand Up @@ -61,6 +63,8 @@ public MicrometerMetricsCollector(
this.consumeAccepted = registry.counter(prefix + ".consumed_accepted", tags);
this.consumeRequeued = registry.counter(prefix + ".consumed_requeued", tags);
this.consumeDiscarded = registry.counter(prefix + ".consumed_discarded", tags);
this.writtenBytes = registry.counter(prefix + ".written_bytes", tags);
this.readBytes = registry.counter(prefix + ".read_bytes", tags);
}

@Override
Expand Down Expand Up @@ -136,4 +140,14 @@ public void consumeDisposition(ConsumeDisposition disposition) {
break;
}
}

@Override
public void writtenBytes(int writtenBytes) {
this.writtenBytes.increment(writtenBytes);
}

@Override
public void readBytes(int readBytes) {
this.readBytes.increment(readBytes);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,10 @@ public void consume() {}

@Override
public void consumeDisposition(ConsumeDisposition disposition) {}

@Override
public void writtenBytes(int writtenBytes) {}

@Override
public void readBytes(int readBytes) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.IntConsumer;

/**
* Encapsulates all the Transport options in one configuration object.
Expand Down Expand Up @@ -67,7 +68,11 @@ public class TransportOptions implements Cloneable {

private final Map<String, String> webSocketHeaders = new HashMap<>();

@Override
private IntConsumer readBytesConsumer = i -> { };
private IntConsumer writtenBytesConsumer = i -> { };


@Override
public TransportOptions clone() {
return copyInto(new TransportOptions());
}
Expand Down Expand Up @@ -481,6 +486,24 @@ public TransportOptions webSocketCompression(boolean enabled) {
return this;
}

public IntConsumer readBytesConsumer() {
return readBytesConsumer;
}

public TransportOptions readBytesConsumer(IntConsumer readBytesConsumer) {
this.readBytesConsumer = readBytesConsumer;
return this;
}

public IntConsumer writtenBytesConsumer() {
return writtenBytesConsumer;
}

public TransportOptions writtenBytesConsumer(IntConsumer writtenBytesConsumer) {
this.writtenBytesConsumer = writtenBytesConsumer;
return this;
}

/**
* Copy all configuration into the given {@link TransportOptions} from this instance.
*
Expand Down Expand Up @@ -509,6 +532,8 @@ public TransportOptions copyInto(TransportOptions other) {
other.webSocketHeaders().putAll(webSocketHeaders);
other.webSocketMaxFrameSize(webSocketMaxFrameSize());
other.webSocketCompression(webSocketCompression());
other.readBytesConsumer(readBytesConsumer());
other.writtenBytesConsumer(writtenBytesConsumer());

return other;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.security.Principal;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.IntConsumer;

import io.netty.buffer.PooledByteBufAllocator;
import org.apache.qpid.protonj2.buffer.ProtonBuffer;
Expand Down Expand Up @@ -70,6 +71,8 @@ public class TcpTransport implements Transport {
protected final TransportOptions options;
protected final SslOptions sslOptions;
protected final Bootstrap bootstrap;
private final IntConsumer readBytesConsumer;
private final IntConsumer writtenBytesConsumer;

protected Channel channel;
protected volatile IOException failureCause;
Expand Down Expand Up @@ -104,6 +107,8 @@ public TcpTransport(Bootstrap bootstrap, TransportOptions options, SslOptions ss
this.sslOptions = sslOptions;
this.options = options;
this.bootstrap = bootstrap;
this.readBytesConsumer = options.readBytesConsumer();
this.writtenBytesConsumer = options.writtenBytesConsumer();
}

@Override
Expand Down Expand Up @@ -344,6 +349,8 @@ private TcpTransport writeOutputBuffer(final ProtonBuffer buffer, boolean flush,
}
}

this.writtenBytesConsumer.accept(nettyBuf.writableBytes());

if (--writeCount > 0) {
channel.write(nettyBuf, channel.voidPromise());
} else {
Expand Down Expand Up @@ -530,6 +537,7 @@ protected void dispatchReadBuffer(ByteBuf buffer) throws Exception {
final ProtonBuffer wrapped = nettyAllocator.wrap(buffer).convertToReadOnly();

try (wrapped) {
readBytesConsumer.accept(wrapped.getReadableBytes());
listener.transportRead(wrapped);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,15 @@
// [email protected].
package com.rabbitmq.client.amqp.impl;

import static com.rabbitmq.client.amqp.impl.Assertions.assertThat;
import static com.rabbitmq.client.amqp.impl.TestUtils.waitAtMost;
import static com.rabbitmq.client.amqp.metrics.MetricsCollector.ConsumeDisposition.DISCARDED;
import static com.rabbitmq.client.amqp.metrics.MetricsCollector.ConsumeDisposition.REQUEUED;
import static com.rabbitmq.client.amqp.metrics.MetricsCollector.PublishDisposition.RELEASED;
import static java.lang.String.format;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
Expand Down Expand Up @@ -55,6 +59,8 @@ void metricsShouldBeCollected() {
try (Environment environment =
TestUtils.environmentBuilder().metricsCollector(metricsCollector).build()) {
verify(metricsCollector, never()).openConnection();
verify(metricsCollector, never()).writtenBytes(anyInt());
verify(metricsCollector, never()).readBytes(anyInt());
String c1Name = UUID.randomUUID().toString();
CountDownLatch recoveredLatch = new CountDownLatch(1);
Connection c1 =
Expand All @@ -66,9 +72,11 @@ void metricsShouldBeCollected() {
.connectionBuilder()
.build();
verify(metricsCollector, times(1)).openConnection();
verify(metricsCollector, atLeastOnce()).writtenBytes(anyInt());
verify(metricsCollector, atLeastOnce()).readBytes(anyInt());

Cli.closeConnection(c1Name);
Assertions.assertThat(recoveredLatch).completes();
assertThat(recoveredLatch).completes();
// a recovered connection is not closed
verify(metricsCollector, never()).closeConnection();

Expand Down Expand Up @@ -98,7 +106,7 @@ void metricsShouldBeCollected() {
.when(metricsCollector)
.closeConnection();
Cli.closeConnection(c2Name);
Assertions.assertThat(c2ClosedLatch).completes();
assertThat(c2ClosedLatch).completes();
// the connection is closed because automatic recovery was not activated
verify(metricsCollector, times(1)).closeConnection();

Expand All @@ -112,7 +120,7 @@ void metricsShouldBeCollected() {
CountDownLatch disposed = new CountDownLatch(1);
publisher.publish(publisher.message().toAddress().queue(q).message(), disposed(disposed));
verify(metricsCollector, times(1)).publish();
Assertions.assertThat(disposed).completes();
assertThat(disposed).completes();
verify(metricsCollector, times(1))
.publishDisposition(MetricsCollector.PublishDisposition.ACCEPTED);

Expand All @@ -122,7 +130,7 @@ void metricsShouldBeCollected() {
publisher.message().toAddress().queue(UUID.randomUUID().toString()).message(),
disposed(disposed));
verify(metricsCollector, times(2)).publish();
Assertions.assertThat(disposed).completes();
assertThat(disposed).completes();
// the last message could not be routed, so its disposition state is RELEASED
verify(metricsCollector, times(1)).publishDisposition(RELEASED);
verify(metricsCollector, times(2)).publishDisposition(any());
Expand Down Expand Up @@ -160,7 +168,7 @@ void metricsShouldBeCollected() {
consumedCount.incrementAndGet();
})
.build();
TestUtils.waitAtMost(
waitAtMost(
() -> consumedCount.get() == 1,
() -> format("Expected 1 message, but got %d.", consumedCount.get()));
// the first message is accepted
Expand All @@ -172,7 +180,7 @@ void metricsShouldBeCollected() {
publisher.publish(publisher.message().toAddress().queue(q).message(), ctx -> {});

// the message is requeued, so it comes back, and it's then discarded
TestUtils.waitAtMost(() -> consumedCount.get() == 1 + 2);
waitAtMost(() -> consumedCount.get() == 1 + 2);
verify(metricsCollector, times(1 + 2)).consume();
verify(metricsCollector, times(1)).consumeDisposition(REQUEUED);
verify(metricsCollector, times(1)).consumeDisposition(DISCARDED);
Expand Down
Loading