Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/130939.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 130939
summary: Expose HTTP connection metrics to telemetry
area: Network
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@
import org.elasticsearch.http.HttpServerTransport;
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.plugins.TelemetryPlugin;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.ChunkedRestResponseBodyPart;
import org.elasticsearch.rest.RestController;
Expand All @@ -60,6 +62,8 @@
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.action.EmptyResponseListener;
import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.telemetry.Measurement;
import org.elasticsearch.telemetry.TestTelemetryPlugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.transport.netty4.NettyAllocator;
import org.elasticsearch.xcontent.ToXContentObject;
Expand Down Expand Up @@ -91,7 +95,7 @@ public class Netty4PipeliningIT extends ESNetty4IntegTestCase {
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return CollectionUtils.concatLists(
List.of(CountDown3Plugin.class, ChunkAndFailPlugin.class, KeepPipeliningPlugin.class),
List.of(CountDown3Plugin.class, ChunkAndFailPlugin.class, KeepPipeliningPlugin.class, TestTelemetryPlugin.class),
super.nodePlugins()
);
}
Expand Down Expand Up @@ -281,6 +285,87 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
}
}

public void testConnectionStatsExposedToTelemetryPlugin() throws Exception {
final var targetNode = internalCluster().startNode();

final var telemetryPlugin = asInstanceOf(
TestTelemetryPlugin.class,
internalCluster().getInstance(PluginsService.class, targetNode).filterPlugins(TelemetryPlugin.class).findAny().orElseThrow()
);

assertHttpMetrics(telemetryPlugin, 0L, 0L);

final var releasables = new ArrayList<Releasable>(3);
try {
final var keepPipeliningRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, KeepPipeliningPlugin.ROUTE);
releasables.add(keepPipeliningRequest::release);

final var responseReceivedLatch = new CountDownLatch(1);

final EventLoopGroup eventLoopGroup = new NioEventLoopGroup(1);
releasables.add(() -> eventLoopGroup.shutdownGracefully(0, 0, TimeUnit.SECONDS).awaitUninterruptibly());
final var clientBootstrap = new Bootstrap().channel(NettyAllocator.getChannelType())
.option(ChannelOption.ALLOCATOR, NettyAllocator.getAllocator())
.group(eventLoopGroup)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new HttpClientCodec());
ch.pipeline().addLast(new SimpleChannelInboundHandler<HttpResponse>() {

@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpResponse msg) {
responseReceivedLatch.countDown();
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
ExceptionsHelper.maybeDieOnAnotherThread(new AssertionError(cause));
}
});
}
});

final var httpServerTransport = internalCluster().getInstance(HttpServerTransport.class, targetNode);
final var httpServerAddress = randomFrom(httpServerTransport.boundAddress().boundAddresses()).address();

// Open a channel on which we will pipeline the requests to KeepPipeliningPlugin.ROUTE
final var pipeliningChannel = clientBootstrap.connect(httpServerAddress).syncUninterruptibly().channel();
releasables.add(() -> pipeliningChannel.close().syncUninterruptibly());

if (randomBoolean()) {
// assertBusy because client-side connect may complete before server-side
assertBusy(() -> assertHttpMetrics(telemetryPlugin, 1L, 1L));
} else {
// Send two pipelined requests so that we start to receive responses
pipeliningChannel.writeAndFlush(keepPipeliningRequest.retain());
pipeliningChannel.writeAndFlush(keepPipeliningRequest.retain());

// wait until we've started to receive responses (but we won't have received them all) - server side is definitely open now
safeAwait(responseReceivedLatch);
assertHttpMetrics(telemetryPlugin, 1L, 1L);
}
} finally {
Collections.reverse(releasables);
Releasables.close(releasables);
}

// assertBusy because client-side close may complete before server-side
assertBusy(() -> assertHttpMetrics(telemetryPlugin, 1L, 0L));
}

private static void assertHttpMetrics(TestTelemetryPlugin telemetryPlugin, long expectedTotal, long expectedCurrent) {
telemetryPlugin.collect();
assertMeasurement(telemetryPlugin.getLongAsyncCounterMeasurement("es.http.connections.total"), expectedTotal);
assertMeasurement(telemetryPlugin.getLongGaugeMeasurement("es.http.connections.current"), expectedCurrent);
telemetryPlugin.resetMeter();
}

private static void assertMeasurement(List<Measurement> measurements, long expectedValue) {
assertThat(measurements, hasSize(1));
assertThat(measurements.get(0).getLong(), equalTo(expectedValue));
}

private void assertOpaqueIdsInOrder(Collection<String> opaqueIds) {
// check if opaque ids are monotonically increasing
int i = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.telemetry.TelemetryProvider;
import org.elasticsearch.telemetry.metric.LongWithAttributes;
import org.elasticsearch.telemetry.metric.MeterRegistry;
import org.elasticsearch.telemetry.tracing.Tracer;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.BindTransportException;
Expand Down Expand Up @@ -103,6 +105,8 @@ public abstract class AbstractHttpServerTransport extends AbstractLifecycleCompo

private final HttpTracer httpLogger;
private final Tracer tracer;
private final MeterRegistry meterRegistry;
private final List<AutoCloseable> metricsToClose = new ArrayList<>(2);
private volatile boolean shuttingDown;
private final ReadWriteLock shuttingDownRWLock = new StampedLock().asReadWriteLock();

Expand Down Expand Up @@ -142,6 +146,7 @@ protected AbstractHttpServerTransport(

this.maxContentLength = SETTING_HTTP_MAX_CONTENT_LENGTH.get(settings);
this.tracer = telemetryProvider.getTracer();
this.meterRegistry = telemetryProvider.getMeterRegistry();
this.httpLogger = new HttpTracer(settings, clusterSettings);
clusterSettings.addSettingsUpdateConsumer(
TransportSettings.SLOW_OPERATION_THRESHOLD_SETTING,
Expand Down Expand Up @@ -238,6 +243,22 @@ private TransportAddress bindAddress(final InetAddress hostAddress) {

@Override
protected final void doStart() {
metricsToClose.add(
meterRegistry.registerLongAsyncCounter(
"es.http.connections.total",
"total number of inbound HTTP connections accepted",
"count",
() -> new LongWithAttributes(totalChannelsAccepted.get())
)
);
metricsToClose.add(
meterRegistry.registerLongGauge(
"es.http.connections.current",
"number of inbound HTTP connections currently open",
"count",
() -> new LongWithAttributes(httpChannels.size())
)
);
startInternal();
}

Expand Down Expand Up @@ -328,6 +349,16 @@ protected final void doStop() {
logger.warn("unexpected exception while waiting for http channels to close", e);
}
}

for (final var metricToClose : metricsToClose) {
try {
metricToClose.close();
} catch (Exception e) {
logger.warn("unexpected exception while closing metric [{}]", metricToClose);
assert false : e;
}
}

stopInternal();
}

Expand Down
Loading