|
51 | 51 | import org.elasticsearch.http.HttpServerTransport; |
52 | 52 | import org.elasticsearch.plugins.ActionPlugin; |
53 | 53 | import org.elasticsearch.plugins.Plugin; |
| 54 | +import org.elasticsearch.plugins.PluginsService; |
| 55 | +import org.elasticsearch.plugins.TelemetryPlugin; |
54 | 56 | import org.elasticsearch.rest.BaseRestHandler; |
55 | 57 | import org.elasticsearch.rest.ChunkedRestResponseBodyPart; |
56 | 58 | import org.elasticsearch.rest.RestController; |
|
60 | 62 | import org.elasticsearch.rest.RestStatus; |
61 | 63 | import org.elasticsearch.rest.action.EmptyResponseListener; |
62 | 64 | import org.elasticsearch.rest.action.RestToXContentListener; |
| 65 | +import org.elasticsearch.telemetry.Measurement; |
| 66 | +import org.elasticsearch.telemetry.TestTelemetryPlugin; |
63 | 67 | import org.elasticsearch.test.ESIntegTestCase; |
64 | 68 | import org.elasticsearch.transport.netty4.NettyAllocator; |
65 | 69 | import org.elasticsearch.xcontent.ToXContentObject; |
@@ -91,7 +95,7 @@ public class Netty4PipeliningIT extends ESNetty4IntegTestCase { |
91 | 95 | @Override |
92 | 96 | protected Collection<Class<? extends Plugin>> nodePlugins() { |
93 | 97 | return CollectionUtils.concatLists( |
94 | | - List.of(CountDown3Plugin.class, ChunkAndFailPlugin.class, KeepPipeliningPlugin.class), |
| 98 | + List.of(CountDown3Plugin.class, ChunkAndFailPlugin.class, KeepPipeliningPlugin.class, TestTelemetryPlugin.class), |
95 | 99 | super.nodePlugins() |
96 | 100 | ); |
97 | 101 | } |
@@ -281,6 +285,90 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { |
281 | 285 | } |
282 | 286 | } |
283 | 287 |
|
| 288 | + public void testConnectionStatsExposedToTelemetryPlugin() throws Exception { |
| 289 | + final var targetNode = internalCluster().startNode(); |
| 290 | + |
| 291 | + final var telemetryPlugin = asInstanceOf( |
| 292 | + TestTelemetryPlugin.class, |
| 293 | + internalCluster().getInstance(PluginsService.class, targetNode).filterPlugins(TelemetryPlugin.class).findAny().orElseThrow() |
| 294 | + ); |
| 295 | + |
| 296 | + assertHttpMetrics(telemetryPlugin, 0L, 0L); |
| 297 | + |
| 298 | + final var releasables = new ArrayList<Releasable>(3); |
| 299 | + try { |
| 300 | + final var keepPipeliningRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, KeepPipeliningPlugin.ROUTE); |
| 301 | + releasables.add(keepPipeliningRequest::release); |
| 302 | + |
| 303 | + final var responseReceivedLatch = new CountDownLatch(1); |
| 304 | + |
| 305 | + final EventLoopGroup eventLoopGroup = new NioEventLoopGroup(1); |
| 306 | + releasables.add(() -> eventLoopGroup.shutdownGracefully(0, 0, TimeUnit.SECONDS).awaitUninterruptibly()); |
| 307 | + final var clientBootstrap = new Bootstrap().channel(NettyAllocator.getChannelType()) |
| 308 | + .option(ChannelOption.ALLOCATOR, NettyAllocator.getAllocator()) |
| 309 | + .group(eventLoopGroup) |
| 310 | + .handler(new ChannelInitializer<SocketChannel>() { |
| 311 | + @Override |
| 312 | + protected void initChannel(SocketChannel ch) { |
| 313 | + ch.pipeline().addLast(new HttpClientCodec()); |
| 314 | + ch.pipeline().addLast(new SimpleChannelInboundHandler<HttpResponse>() { |
| 315 | + |
| 316 | + @Override |
| 317 | + protected void channelRead0(ChannelHandlerContext ctx, HttpResponse msg) { |
| 318 | + responseReceivedLatch.countDown(); |
| 319 | + } |
| 320 | + |
| 321 | + @Override |
| 322 | + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { |
| 323 | + ExceptionsHelper.maybeDieOnAnotherThread(new AssertionError(cause)); |
| 324 | + } |
| 325 | + }); |
| 326 | + } |
| 327 | + }); |
| 328 | + |
| 329 | + final var httpServerTransport = internalCluster().getInstance(HttpServerTransport.class, targetNode); |
| 330 | + final var httpServerAddress = randomFrom(httpServerTransport.boundAddress().boundAddresses()).address(); |
| 331 | + |
| 332 | + // Open a channel on which we will pipeline the requests to KeepPipeliningPlugin.ROUTE |
| 333 | + final var pipeliningChannel = clientBootstrap.connect(httpServerAddress).syncUninterruptibly().channel(); |
| 334 | + releasables.add(() -> pipeliningChannel.close().syncUninterruptibly()); |
| 335 | + |
| 336 | + if (randomBoolean()) { |
| 337 | + // assertBusy because client-side connect may complete before server-side |
| 338 | + assertBusy(() -> assertHttpMetrics(telemetryPlugin, 1L, 1L)); |
| 339 | + } else { |
| 340 | + // Send two pipelined requests so that we start to receive responses |
| 341 | + pipeliningChannel.writeAndFlush(keepPipeliningRequest.retain()); |
| 342 | + pipeliningChannel.writeAndFlush(keepPipeliningRequest.retain()); |
| 343 | + |
| 344 | + // wait until we've started to receive responses (but we won't have received them all) - server side is definitely open now |
| 345 | + safeAwait(responseReceivedLatch); |
| 346 | + assertHttpMetrics(telemetryPlugin, 1L, 1L); |
| 347 | + } |
| 348 | + } finally { |
| 349 | + Collections.reverse(releasables); |
| 350 | + Releasables.close(releasables); |
| 351 | + } |
| 352 | + |
| 353 | + // assertBusy because client-side close may complete before server-side |
| 354 | + assertBusy(() -> assertHttpMetrics(telemetryPlugin, 1L, 0L)); |
| 355 | + } |
| 356 | + |
| 357 | + private static void assertHttpMetrics(TestTelemetryPlugin telemetryPlugin, long expectedTotal, long expectedCurrent) { |
| 358 | + try { |
| 359 | + telemetryPlugin.collect(); |
| 360 | + assertMeasurement(telemetryPlugin.getLongAsyncCounterMeasurement("es.http.connections.total"), expectedTotal); |
| 361 | + assertMeasurement(telemetryPlugin.getLongGaugeMeasurement("es.http.connections.current"), expectedCurrent); |
| 362 | + } finally { |
| 363 | + telemetryPlugin.resetMeter(); |
| 364 | + } |
| 365 | + } |
| 366 | + |
| 367 | + private static void assertMeasurement(List<Measurement> measurements, long expectedValue) { |
| 368 | + assertThat(measurements, hasSize(1)); |
| 369 | + assertThat(measurements.get(0).getLong(), equalTo(expectedValue)); |
| 370 | + } |
| 371 | + |
284 | 372 | private void assertOpaqueIdsInOrder(Collection<String> opaqueIds) { |
285 | 373 | // check if opaque ids are monotonically increasing |
286 | 374 | int i = 0; |
|
0 commit comments