Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/130939.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 130939
summary: Expose HTTP connection metrics to telemetry
area: Network
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@
import org.elasticsearch.http.HttpServerTransport;
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.plugins.TelemetryPlugin;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.ChunkedRestResponseBodyPart;
import org.elasticsearch.rest.RestController;
Expand All @@ -60,6 +62,8 @@
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.action.EmptyResponseListener;
import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.telemetry.Measurement;
import org.elasticsearch.telemetry.TestTelemetryPlugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.transport.netty4.NettyAllocator;
import org.elasticsearch.xcontent.ToXContentObject;
Expand Down Expand Up @@ -91,7 +95,7 @@ public class Netty4PipeliningIT extends ESNetty4IntegTestCase {
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return CollectionUtils.concatLists(
List.of(CountDown3Plugin.class, ChunkAndFailPlugin.class, KeepPipeliningPlugin.class),
List.of(CountDown3Plugin.class, ChunkAndFailPlugin.class, KeepPipeliningPlugin.class, TestTelemetryPlugin.class),
super.nodePlugins()
);
}
Expand Down Expand Up @@ -281,6 +285,90 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
}
}

public void testConnectionStatsExposedToTelemetryPlugin() throws Exception {
final var targetNode = internalCluster().startNode();

final var telemetryPlugin = asInstanceOf(
TestTelemetryPlugin.class,
internalCluster().getInstance(PluginsService.class, targetNode).filterPlugins(TelemetryPlugin.class).findAny().orElseThrow()
);

assertHttpMetrics(telemetryPlugin, 0L, 0L);

final var releasables = new ArrayList<Releasable>(3);
try {
final var keepPipeliningRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, KeepPipeliningPlugin.ROUTE);
releasables.add(keepPipeliningRequest::release);

final var responseReceivedLatch = new CountDownLatch(1);

final EventLoopGroup eventLoopGroup = new NioEventLoopGroup(1);
releasables.add(() -> eventLoopGroup.shutdownGracefully(0, 0, TimeUnit.SECONDS).awaitUninterruptibly());
final var clientBootstrap = new Bootstrap().channel(NettyAllocator.getChannelType())
.option(ChannelOption.ALLOCATOR, NettyAllocator.getAllocator())
.group(eventLoopGroup)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new HttpClientCodec());
ch.pipeline().addLast(new SimpleChannelInboundHandler<HttpResponse>() {

@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpResponse msg) {
responseReceivedLatch.countDown();
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
ExceptionsHelper.maybeDieOnAnotherThread(new AssertionError(cause));
}
});
}
});

final var httpServerTransport = internalCluster().getInstance(HttpServerTransport.class, targetNode);
final var httpServerAddress = randomFrom(httpServerTransport.boundAddress().boundAddresses()).address();

// Open a channel on which we will pipeline the requests to KeepPipeliningPlugin.ROUTE
final var pipeliningChannel = clientBootstrap.connect(httpServerAddress).syncUninterruptibly().channel();
releasables.add(() -> pipeliningChannel.close().syncUninterruptibly());

if (randomBoolean()) {
// assertBusy because client-side connect may complete before server-side
assertBusy(() -> assertHttpMetrics(telemetryPlugin, 1L, 1L));
} else {
// Send two pipelined requests so that we start to receive responses
pipeliningChannel.writeAndFlush(keepPipeliningRequest.retain());
pipeliningChannel.writeAndFlush(keepPipeliningRequest.retain());

// wait until we've started to receive responses (but we won't have received them all) - server side is definitely open now
safeAwait(responseReceivedLatch);
assertHttpMetrics(telemetryPlugin, 1L, 1L);
}
} finally {
Collections.reverse(releasables);
Releasables.close(releasables);
}

// assertBusy because client-side close may complete before server-side
assertBusy(() -> assertHttpMetrics(telemetryPlugin, 1L, 0L));
}

private static void assertHttpMetrics(TestTelemetryPlugin telemetryPlugin, long expectedTotal, long expectedCurrent) {
try {
telemetryPlugin.collect();
assertMeasurement(telemetryPlugin.getLongAsyncCounterMeasurement("es.http.connections.total"), expectedTotal);
assertMeasurement(telemetryPlugin.getLongGaugeMeasurement("es.http.connections.current"), expectedCurrent);
} finally {
telemetryPlugin.resetMeter();
}
}

private static void assertMeasurement(List<Measurement> measurements, long expectedValue) {
assertThat(measurements, hasSize(1));
assertThat(measurements.get(0).getLong(), equalTo(expectedValue));
}

private void assertOpaqueIdsInOrder(Collection<String> opaqueIds) {
// check if opaque ids are monotonically increasing
int i = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.telemetry.TelemetryProvider;
import org.elasticsearch.telemetry.metric.LongWithAttributes;
import org.elasticsearch.telemetry.metric.MeterRegistry;
import org.elasticsearch.telemetry.tracing.Tracer;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.BindTransportException;
Expand Down Expand Up @@ -103,6 +105,8 @@ public abstract class AbstractHttpServerTransport extends AbstractLifecycleCompo

private final HttpTracer httpLogger;
private final Tracer tracer;
private final MeterRegistry meterRegistry;
private final List<AutoCloseable> metricsToClose = new ArrayList<>(2);
private volatile boolean shuttingDown;
private final ReadWriteLock shuttingDownRWLock = new StampedLock().asReadWriteLock();

Expand Down Expand Up @@ -142,6 +146,7 @@ protected AbstractHttpServerTransport(

this.maxContentLength = SETTING_HTTP_MAX_CONTENT_LENGTH.get(settings);
this.tracer = telemetryProvider.getTracer();
this.meterRegistry = telemetryProvider.getMeterRegistry();
this.httpLogger = new HttpTracer(settings, clusterSettings);
clusterSettings.addSettingsUpdateConsumer(
TransportSettings.SLOW_OPERATION_THRESHOLD_SETTING,
Expand Down Expand Up @@ -238,6 +243,22 @@ private TransportAddress bindAddress(final InetAddress hostAddress) {

@Override
protected final void doStart() {
metricsToClose.add(
meterRegistry.registerLongAsyncCounter(
"es.http.connections.total",
"total number of inbound HTTP connections accepted",
"count",
() -> new LongWithAttributes(totalChannelsAccepted.get())
)
);
metricsToClose.add(
meterRegistry.registerLongGauge(
"es.http.connections.current",
"number of inbound HTTP connections currently open",
"count",
() -> new LongWithAttributes(httpChannels.size())
)
);
startInternal();
}

Expand Down Expand Up @@ -328,6 +349,16 @@ protected final void doStop() {
logger.warn("unexpected exception while waiting for http channels to close", e);
}
}

for (final var metricToClose : metricsToClose) {
try {
metricToClose.close();
} catch (Exception e) {
logger.warn("unexpected exception while closing metric [{}]", metricToClose);
assert false : e;
}
}

stopInternal();
}

Expand Down
Loading