Skip to content

Commit fb09ad6

Browse files
DaveCTurner authored and mridula-s109 committed
Expose HTTP connection metrics to telemetry (elastic#130939)
Closes ES-12223
1 parent 784458c commit fb09ad6

File tree

3 files changed

+125
-1
lines changed

3 files changed

+125
-1
lines changed

docs/changelog/130939.yaml

Lines changed: 5 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,5 @@
1+
pr: 130939
2+
summary: Expose HTTP connection metrics to telemetry
3+
area: Network
4+
type: enhancement
5+
issues: []

modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4PipeliningIT.java

Lines changed: 89 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -51,6 +51,8 @@
5151
import org.elasticsearch.http.HttpServerTransport;
5252
import org.elasticsearch.plugins.ActionPlugin;
5353
import org.elasticsearch.plugins.Plugin;
54+
import org.elasticsearch.plugins.PluginsService;
55+
import org.elasticsearch.plugins.TelemetryPlugin;
5456
import org.elasticsearch.rest.BaseRestHandler;
5557
import org.elasticsearch.rest.ChunkedRestResponseBodyPart;
5658
import org.elasticsearch.rest.RestController;
@@ -60,6 +62,8 @@
6062
import org.elasticsearch.rest.RestStatus;
6163
import org.elasticsearch.rest.action.EmptyResponseListener;
6264
import org.elasticsearch.rest.action.RestToXContentListener;
65+
import org.elasticsearch.telemetry.Measurement;
66+
import org.elasticsearch.telemetry.TestTelemetryPlugin;
6367
import org.elasticsearch.test.ESIntegTestCase;
6468
import org.elasticsearch.transport.netty4.NettyAllocator;
6569
import org.elasticsearch.xcontent.ToXContentObject;
@@ -91,7 +95,7 @@ public class Netty4PipeliningIT extends ESNetty4IntegTestCase {
9195
@Override
9296
protected Collection<Class<? extends Plugin>> nodePlugins() {
9397
return CollectionUtils.concatLists(
94-
List.of(CountDown3Plugin.class, ChunkAndFailPlugin.class, KeepPipeliningPlugin.class),
98+
List.of(CountDown3Plugin.class, ChunkAndFailPlugin.class, KeepPipeliningPlugin.class, TestTelemetryPlugin.class),
9599
super.nodePlugins()
96100
);
97101
}
@@ -281,6 +285,90 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
281285
}
282286
}
283287

288+
/**
 * Checks that the HTTP connection counters are visible through the {@code TestTelemetryPlugin}.
 *
 * Scenario:
 * 1. Start a node and look up its telemetry plugin instance; both metrics must read 0.
 * 2. Open one raw Netty client channel to a randomly chosen bound HTTP address of that node.
 * 3. Randomly either poll until the metrics read total=1/current=1, or send two pipelined
 *    requests, wait for the first response and then assert total=1/current=1 directly.
 * 4. Release every client-side resource in reverse order of acquisition.
 * 5. Poll until the metrics read total=1/current=0.
 */
public void testConnectionStatsExposedToTelemetryPlugin() throws Exception {
289+
final var targetNode = internalCluster().startNode();
290+
291+
final var telemetryPlugin = asInstanceOf(
292+
TestTelemetryPlugin.class,
293+
internalCluster().getInstance(PluginsService.class, targetNode).filterPlugins(TelemetryPlugin.class).findAny().orElseThrow()
294+
);
295+
296+
assertHttpMetrics(telemetryPlugin, 0L, 0L);
297+
298+
final var releasables = new ArrayList<Releasable>(3);
299+
try {
300+
final var keepPipeliningRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, KeepPipeliningPlugin.ROUTE);
301+
releasables.add(keepPipeliningRequest::release);
302+
303+
final var responseReceivedLatch = new CountDownLatch(1);
304+
305+
final EventLoopGroup eventLoopGroup = new NioEventLoopGroup(1);
306+
releasables.add(() -> eventLoopGroup.shutdownGracefully(0, 0, TimeUnit.SECONDS).awaitUninterruptibly());
307+
final var clientBootstrap = new Bootstrap().channel(NettyAllocator.getChannelType())
308+
.option(ChannelOption.ALLOCATOR, NettyAllocator.getAllocator())
309+
.group(eventLoopGroup)
310+
.handler(new ChannelInitializer<SocketChannel>() {
311+
@Override
312+
protected void initChannel(SocketChannel ch) {
313+
ch.pipeline().addLast(new HttpClientCodec());
314+
ch.pipeline().addLast(new SimpleChannelInboundHandler<HttpResponse>() {
315+
316+
@Override
317+
protected void channelRead0(ChannelHandlerContext ctx, HttpResponse msg) {
318+
responseReceivedLatch.countDown();
319+
}
320+
321+
@Override
322+
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
323+
ExceptionsHelper.maybeDieOnAnotherThread(new AssertionError(cause));
324+
}
325+
});
326+
}
327+
});
328+
329+
final var httpServerTransport = internalCluster().getInstance(HttpServerTransport.class, targetNode);
330+
final var httpServerAddress = randomFrom(httpServerTransport.boundAddress().boundAddresses()).address();
331+
332+
// Open a channel on which we will pipeline the requests to KeepPipeliningPlugin.ROUTE
333+
final var pipeliningChannel = clientBootstrap.connect(httpServerAddress).syncUninterruptibly().channel();
334+
releasables.add(() -> pipeliningChannel.close().syncUninterruptibly());
335+
336+
if (randomBoolean()) {
337+
// assertBusy because client-side connect may complete before server-side
338+
assertBusy(() -> assertHttpMetrics(telemetryPlugin, 1L, 1L));
339+
} else {
340+
// Send two pipelined requests so that we start to receive responses
341+
pipeliningChannel.writeAndFlush(keepPipeliningRequest.retain());
342+
pipeliningChannel.writeAndFlush(keepPipeliningRequest.retain());
343+
344+
// wait until we've started to receive responses (but we won't have received them all) - server side is definitely open now
345+
safeAwait(responseReceivedLatch);
346+
assertHttpMetrics(telemetryPlugin, 1L, 1L);
347+
}
348+
} finally {
349+
// Release in reverse order of acquisition: channel first, then the event loop group, then the request.
350+
Collections.reverse(releasables);
351+
Releasables.close(releasables);
352+
}
353+
354+
// assertBusy because client-side close may complete before server-side
355+
assertBusy(() -> assertHttpMetrics(telemetryPlugin, 1L, 0L));
356+
}
356+
357+
/**
 * Triggers a metrics collection on the given plugin and asserts that the two HTTP connection
 * metrics each report exactly one measurement with the expected value.
 *
 * @param telemetryPlugin the test telemetry plugin to collect from and inspect
 * @param expectedTotal   expected value of the {@code es.http.connections.total} async counter
 * @param expectedCurrent expected value of the {@code es.http.connections.current} gauge
 */
private static void assertHttpMetrics(TestTelemetryPlugin telemetryPlugin, long expectedTotal, long expectedCurrent) {
358+
try {
359+
telemetryPlugin.collect();
360+
assertMeasurement(telemetryPlugin.getLongAsyncCounterMeasurement("es.http.connections.total"), expectedTotal);
361+
assertMeasurement(telemetryPlugin.getLongGaugeMeasurement("es.http.connections.current"), expectedCurrent);
362+
} finally {
363+
// Always reset, even when an assertion fails, so that each call (including assertBusy retries)
364+
// starts from a clean meter; assertMeasurement relies on seeing exactly one measurement.
365+
telemetryPlugin.resetMeter();
366+
}
367+
}
366+
367+
/**
 * Asserts that {@code measurements} contains exactly one entry and that its long value equals
 * {@code expectedValue}.
 *
 * @param measurements  the measurements recorded for a single metric
 * @param expectedValue the long value the single measurement must carry
 */
private static void assertMeasurement(List<Measurement> measurements, long expectedValue) {
368+
assertThat(measurements, hasSize(1));
369+
assertThat(measurements.get(0).getLong(), equalTo(expectedValue));
370+
}
371+
284372
private void assertOpaqueIdsInOrder(Collection<String> opaqueIds) {
285373
// check if opaque ids are monotonically increasing
286374
int i = 0;

server/src/main/java/org/elasticsearch/http/AbstractHttpServerTransport.java

Lines changed: 31 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -41,6 +41,8 @@
4141
import org.elasticsearch.rest.RestRequest;
4242
import org.elasticsearch.tasks.Task;
4343
import org.elasticsearch.telemetry.TelemetryProvider;
44+
import org.elasticsearch.telemetry.metric.LongWithAttributes;
45+
import org.elasticsearch.telemetry.metric.MeterRegistry;
4446
import org.elasticsearch.telemetry.tracing.Tracer;
4547
import org.elasticsearch.threadpool.ThreadPool;
4648
import org.elasticsearch.transport.BindTransportException;
@@ -103,6 +105,8 @@ public abstract class AbstractHttpServerTransport extends AbstractLifecycleCompo
103105

104106
private final HttpTracer httpLogger;
105107
private final Tracer tracer;
108+
private final MeterRegistry meterRegistry;
109+
private final List<AutoCloseable> metricsToClose = new ArrayList<>(2);
106110
private volatile boolean shuttingDown;
107111
private final ReadWriteLock shuttingDownRWLock = new StampedLock().asReadWriteLock();
108112

@@ -142,6 +146,7 @@ protected AbstractHttpServerTransport(
142146

143147
this.maxContentLength = SETTING_HTTP_MAX_CONTENT_LENGTH.get(settings);
144148
this.tracer = telemetryProvider.getTracer();
149+
this.meterRegistry = telemetryProvider.getMeterRegistry();
145150
this.httpLogger = new HttpTracer(settings, clusterSettings);
146151
clusterSettings.addSettingsUpdateConsumer(
147152
TransportSettings.SLOW_OPERATION_THRESHOLD_SETTING,
@@ -238,6 +243,22 @@ private TransportAddress bindAddress(final InetAddress hostAddress) {
238243

239244
/**
 * Registers the HTTP connection metrics with the meter registry and then delegates to
 * {@code startInternal()}.
 *
 * The handle returned by each registration is stored in {@code metricsToClose}.
 */
@Override
240245
protected final void doStart() {
246+
// Total number of accepted inbound connections, read from totalChannelsAccepted.
// NOTE(review): the supplier is presumably evaluated by the registry at collection time - confirm.
metricsToClose.add(
247+
meterRegistry.registerLongAsyncCounter(
248+
"es.http.connections.total",
249+
"total number of inbound HTTP connections accepted",
250+
"count",
251+
() -> new LongWithAttributes(totalChannelsAccepted.get())
252+
)
253+
);
254+
// Number of currently open inbound connections, read from the size of httpChannels.
metricsToClose.add(
255+
meterRegistry.registerLongGauge(
256+
"es.http.connections.current",
257+
"number of inbound HTTP connections currently open",
258+
"count",
259+
() -> new LongWithAttributes(httpChannels.size())
260+
)
261+
);
241262
startInternal();
242263
}
243264

@@ -328,6 +349,16 @@ protected final void doStop() {
328349
logger.warn("unexpected exception while waiting for http channels to close", e);
329350
}
330351
}
352+
353+
for (final var metricToClose : metricsToClose) {
354+
try {
355+
metricToClose.close();
356+
} catch (Exception e) {
357+
logger.warn("unexpected exception while closing metric [{}]", metricToClose);
358+
assert false : e;
359+
}
360+
}
361+
331362
stopInternal();
332363
}
333364

0 commit comments

Comments
 (0)