From c36a597b8581fd5f473d006b75bb93c2fa65130c Mon Sep 17 00:00:00 2001 From: Mariia Skripchenko <61115099+marychatte@users.noreply.github.com> Date: Tue, 25 Nov 2025 16:15:38 +0100 Subject: [PATCH 1/2] Fix double `end()` calls in `KtorServerTelemetry` --- .../internal/KtorServerTelemetryUtil.kt | 27 ++-- .../ktor/v2_0/KtorServerTelemetryBuilder.kt | 4 +- .../ktor/v3_0/KtorServerTelemetryBuilder.kt | 4 +- .../ktor/v3_0/KtorServerMetricsTest.kt | 117 ++++++++++++++++++ 4 files changed, 132 insertions(+), 20 deletions(-) create mode 100644 instrumentation/ktor/ktor-3.0/library/src/test/kotlin/io/opentelemetry/instrumentation/ktor/v3_0/KtorServerMetricsTest.kt diff --git a/instrumentation/ktor/ktor-2-common/library/src/main/kotlin/io/opentelemetry/instrumentation/ktor/v2_0/common/internal/KtorServerTelemetryUtil.kt b/instrumentation/ktor/ktor-2-common/library/src/main/kotlin/io/opentelemetry/instrumentation/ktor/v2_0/common/internal/KtorServerTelemetryUtil.kt index fa124bb24d86..d6e7d996031c 100644 --- a/instrumentation/ktor/ktor-2-common/library/src/main/kotlin/io/opentelemetry/instrumentation/ktor/v2_0/common/internal/KtorServerTelemetryUtil.kt +++ b/instrumentation/ktor/ktor-2-common/library/src/main/kotlin/io/opentelemetry/instrumentation/ktor/v2_0/common/internal/KtorServerTelemetryUtil.kt @@ -6,6 +6,7 @@ package io.opentelemetry.instrumentation.ktor.v2_0.common.internal import io.ktor.server.application.* +import io.ktor.server.application.hooks.* import io.ktor.server.request.* import io.ktor.server.response.* import io.ktor.util.* @@ -25,14 +26,15 @@ import kotlinx.coroutines.withContext */ object KtorServerTelemetryUtil { - fun configureTelemetry(builder: AbstractKtorServerTelemetryBuilder, application: Application) { + fun PluginBuilder<*>.configureTelemetry(builder: AbstractKtorServerTelemetryBuilder, application: Application) { val contextKey = AttributeKey("OpenTelemetry") val errorKey = AttributeKey("OpenTelemetryException") + val processedKey = AttributeKey("OpenTelemetryProcessed") val instrumenter = instrumenter(builder) val tracer = KtorServerTracer(instrumenter) - val startPhase = PipelinePhase("OpenTelemetry") + val startPhase = PipelinePhase("OpenTelemetry") application.insertPhaseBefore(ApplicationCallPipeline.Setup, startPhase) application.intercept(startPhase) { val context = tracer.start(call) @@ -59,22 +61,15 @@ object KtorServerTelemetryUtil { } } - val postSendPhase = PipelinePhase("OpenTelemetryPostSend") - application.sendPipeline.insertPhaseAfter(ApplicationSendPipeline.After, postSendPhase) - application.sendPipeline.intercept(postSendPhase) { + on(ResponseSent) { call -> + if (call.attributes.contains(processedKey)) { + return@on + } + val context = call.attributes.getOrNull(contextKey) if (context != null) { - var error: Throwable? = call.attributes.getOrNull(errorKey) - try { - proceed() - } catch (t: Throwable) { - error = t - throw t - } finally { - tracer.end(context, call, error) - } - } else { - proceed() + tracer.end(context, call, call.attributes.getOrNull(errorKey)) + call.attributes.put(processedKey, Unit) } } } diff --git a/instrumentation/ktor/ktor-2.0/library/src/main/kotlin/io/opentelemetry/instrumentation/ktor/v2_0/KtorServerTelemetryBuilder.kt b/instrumentation/ktor/ktor-2.0/library/src/main/kotlin/io/opentelemetry/instrumentation/ktor/v2_0/KtorServerTelemetryBuilder.kt index 9460f5d0f2e6..a6c721bcc0e6 100644 --- a/instrumentation/ktor/ktor-2.0/library/src/main/kotlin/io/opentelemetry/instrumentation/ktor/v2_0/KtorServerTelemetryBuilder.kt +++ b/instrumentation/ktor/ktor-2.0/library/src/main/kotlin/io/opentelemetry/instrumentation/ktor/v2_0/KtorServerTelemetryBuilder.kt @@ -12,7 +12,7 @@ import io.opentelemetry.instrumentation.api.semconv.http.HttpServerRoute import io.opentelemetry.instrumentation.api.semconv.http.HttpServerRouteSource import io.opentelemetry.instrumentation.ktor.v2_0.InstrumentationProperties.INSTRUMENTATION_NAME import io.opentelemetry.instrumentation.ktor.v2_0.common.AbstractKtorServerTelemetryBuilder -import io.opentelemetry.instrumentation.ktor.v2_0.common.internal.KtorServerTelemetryUtil +import io.opentelemetry.instrumentation.ktor.v2_0.common.internal.KtorServerTelemetryUtil.configureTelemetry class KtorServerTelemetryBuilder internal constructor( instrumentationName: String @@ -21,7 +21,7 @@ class KtorServerTelemetryBuilder internal constructor( val KtorServerTelemetry = createRouteScopedPlugin("OpenTelemetry", { KtorServerTelemetryBuilder(INSTRUMENTATION_NAME) }) { require(pluginConfig.isOpenTelemetryInitialized()) { "OpenTelemetry must be set" } - KtorServerTelemetryUtil.configureTelemetry(pluginConfig, application) + configureTelemetry(pluginConfig, application) application.environment.monitor.subscribe(Routing.RoutingCallStarted) { call -> HttpServerRoute.update(Context.current(), HttpServerRouteSource.SERVER, { _, arg -> arg!!.route.parent.toString() }, call) diff --git a/instrumentation/ktor/ktor-3.0/library/src/main/kotlin/io/opentelemetry/instrumentation/ktor/v3_0/KtorServerTelemetryBuilder.kt b/instrumentation/ktor/ktor-3.0/library/src/main/kotlin/io/opentelemetry/instrumentation/ktor/v3_0/KtorServerTelemetryBuilder.kt index 31d3a0892a6f..7a6c72a68564 100644 --- a/instrumentation/ktor/ktor-3.0/library/src/main/kotlin/io/opentelemetry/instrumentation/ktor/v3_0/KtorServerTelemetryBuilder.kt +++ b/instrumentation/ktor/ktor-3.0/library/src/main/kotlin/io/opentelemetry/instrumentation/ktor/v3_0/KtorServerTelemetryBuilder.kt @@ -11,7 +11,7 @@ import io.opentelemetry.context.Context import io.opentelemetry.instrumentation.api.semconv.http.HttpServerRoute import io.opentelemetry.instrumentation.api.semconv.http.HttpServerRouteSource import io.opentelemetry.instrumentation.ktor.v2_0.common.AbstractKtorServerTelemetryBuilder -import io.opentelemetry.instrumentation.ktor.v2_0.common.internal.KtorServerTelemetryUtil +import io.opentelemetry.instrumentation.ktor.v2_0.common.internal.KtorServerTelemetryUtil.configureTelemetry import io.opentelemetry.instrumentation.ktor.v3_0.InstrumentationProperties.INSTRUMENTATION_NAME class KtorServerTelemetryBuilder internal constructor( @@ -21,7 +21,7 @@ class KtorServerTelemetryBuilder internal constructor( val KtorServerTelemetry = createRouteScopedPlugin("OpenTelemetry", { KtorServerTelemetryBuilder(INSTRUMENTATION_NAME) }) { require(pluginConfig.isOpenTelemetryInitialized()) { "OpenTelemetry must be set" } - KtorServerTelemetryUtil.configureTelemetry(pluginConfig, application) + configureTelemetry(pluginConfig, application) application.monitor.subscribe(RoutingRoot.RoutingCallStarted) { call -> HttpServerRoute.update(Context.current(), HttpServerRouteSource.SERVER, { _, arg -> arg!!.route.parent.toString() }, call) diff --git a/instrumentation/ktor/ktor-3.0/library/src/test/kotlin/io/opentelemetry/instrumentation/ktor/v3_0/KtorServerMetricsTest.kt b/instrumentation/ktor/ktor-3.0/library/src/test/kotlin/io/opentelemetry/instrumentation/ktor/v3_0/KtorServerMetricsTest.kt new file mode 100644 index 000000000000..4962caed04a1 --- /dev/null +++ b/instrumentation/ktor/ktor-3.0/library/src/test/kotlin/io/opentelemetry/instrumentation/ktor/v3_0/KtorServerMetricsTest.kt @@ -0,0 +1,117 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.ktor.v3_0 + +import io.ktor.http.* +import io.ktor.server.application.* +import io.ktor.server.engine.* +import io.ktor.server.netty.* +import io.ktor.server.response.* +import io.ktor.server.routing.* +import io.opentelemetry.instrumentation.ktor.v2_0.common.internal.Experimental +import io.opentelemetry.instrumentation.ktor.v3_0.InstrumentationProperties.INSTRUMENTATION_NAME +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension +import io.opentelemetry.instrumentation.testing.junit.http.AbstractHttpServerUsingTest +import io.opentelemetry.instrumentation.testing.junit.http.HttpServerInstrumentationExtension +import io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint +import io.opentelemetry.sdk.metrics.data.MetricData +import io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions +import io.opentelemetry.semconv.HttpAttributes +import io.opentelemetry.semconv.UrlAttributes +import io.opentelemetry.testing.internal.armeria.common.AggregatedHttpRequest +import io.opentelemetry.testing.internal.armeria.common.HttpMethod +import org.assertj.core.api.ThrowingConsumer +import org.junit.jupiter.api.AfterAll +import org.junit.jupiter.api.BeforeAll +import org.junit.jupiter.api.extension.RegisterExtension +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.Arguments +import org.junit.jupiter.params.provider.Arguments.arguments +import org.junit.jupiter.params.provider.MethodSource +import java.util.concurrent.TimeUnit +import java.util.stream.Stream + +class KtorServerMetricsTest : AbstractHttpServerUsingTest>() { + companion object { + @JvmStatic + @RegisterExtension + val testing: InstrumentationExtension = HttpServerInstrumentationExtension.forLibrary() + } + + private val errorDuringSendEndpoint = ServerEndpoint("errorDuringSend", "error-during-send", 500, "") + private val errorAfterSendEndpoint = ServerEndpoint("errorAfterSend", "error-after-send", 200, "") + + @BeforeAll + fun setupOptions() { + startServer() + } + + @AfterAll + fun cleanup() { + cleanupServer() + } + + override fun getContextPath() = "" + + override fun setupServer(): EmbeddedServer<*, *> = embeddedServer(Netty, port = port) { + install(KtorServerTelemetry) { + setOpenTelemetry(testing.openTelemetry) + Experimental.emitExperimentalTelemetry(this) + } + + routing { + get(errorDuringSendEndpoint.path) { + call.respondBytesWriter { + throw IllegalArgumentException("exception") + } + } + get(errorAfterSendEndpoint.path) { + call.respondText(errorAfterSendEndpoint.body, status = HttpStatusCode.fromValue(errorAfterSendEndpoint.status)) + throw IllegalArgumentException("exception") + } + } + }.start() + + override fun stopServer(server: EmbeddedServer<*, *>) { + server.stop(0, 10, TimeUnit.SECONDS) + } + + @ParameterizedTest + @MethodSource("provideArguments") + fun testActiveRequestsMetric(endpoint: ServerEndpoint) { + val request = AggregatedHttpRequest.of(HttpMethod.valueOf("GET"), resolveAddress(endpoint)) + try { + client.execute(request).aggregate().join() + } catch (_: Throwable) { + // we expect server error + } + + testing.waitAndAssertMetrics( + INSTRUMENTATION_NAME, + "http.server.active_requests" + ) { metrics -> + metrics!!.anySatisfy(ThrowingConsumer { metric: MetricData? -> + OpenTelemetryAssertions.assertThat(metric) + .hasDescription("Number of active HTTP server requests.") + .hasUnit("{requests}") + .hasLongSumSatisfying { sum -> + sum.hasPointsSatisfying({ point -> + point.hasValue(0) + .hasAttributesSatisfying { + OpenTelemetryAssertions.equalTo(HttpAttributes.HTTP_REQUEST_METHOD, "GET") + OpenTelemetryAssertions.equalTo(UrlAttributes.URL_PATH, endpoint.path) + } + }) + } + }) + } + } + + private fun provideArguments(): Stream = Stream.of( + arguments(errorDuringSendEndpoint), + arguments(errorAfterSendEndpoint), + ) +} From 21d026880b596756d28c6fa74226ab8c3c44285c Mon Sep 17 00:00:00 2001 From: Mariia Skripchenko <61115099+marychatte@users.noreply.github.com> Date: Fri, 28 Nov 2025 11:38:05 +0100 Subject: [PATCH 2/2] Add comment --- .../instrumentation/ktor/v3_0/KtorServerMetricsTest.kt | 3 +++ 1 file changed, 3 insertions(+) diff --git a/instrumentation/ktor/ktor-3.0/library/src/test/kotlin/io/opentelemetry/instrumentation/ktor/v3_0/KtorServerMetricsTest.kt b/instrumentation/ktor/ktor-3.0/library/src/test/kotlin/io/opentelemetry/instrumentation/ktor/v3_0/KtorServerMetricsTest.kt index 4962caed04a1..a304d894e485 100644 --- a/instrumentation/ktor/ktor-3.0/library/src/test/kotlin/io/opentelemetry/instrumentation/ktor/v3_0/KtorServerMetricsTest.kt +++ b/instrumentation/ktor/ktor-3.0/library/src/test/kotlin/io/opentelemetry/instrumentation/ktor/v3_0/KtorServerMetricsTest.kt @@ -79,6 +79,9 @@ class KtorServerMetricsTest : AbstractHttpServerUsingTest>( server.stop(0, 10, TimeUnit.SECONDS) } + // regression test for + // https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/15303 + // verify that active requests are counted correctly when there is a send error @ParameterizedTest @MethodSource("provideArguments") fun testActiveRequestsMetric(endpoint: ServerEndpoint) {