Skip to content

Commit c36a597

Browse files
committed
Fix double end() calls in KtorServerTelemetry
1 parent 0c5e3ad commit c36a597

File tree

4 files changed

+132
-20
lines changed

4 files changed

+132
-20
lines changed

instrumentation/ktor/ktor-2-common/library/src/main/kotlin/io/opentelemetry/instrumentation/ktor/v2_0/common/internal/KtorServerTelemetryUtil.kt

Lines changed: 11 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
package io.opentelemetry.instrumentation.ktor.v2_0.common.internal
77

88
import io.ktor.server.application.*
9+
import io.ktor.server.application.hooks.*
910
import io.ktor.server.request.*
1011
import io.ktor.server.response.*
1112
import io.ktor.util.*
@@ -25,14 +26,15 @@ import kotlinx.coroutines.withContext
2526
*/
2627
object KtorServerTelemetryUtil {
2728

28-
fun configureTelemetry(builder: AbstractKtorServerTelemetryBuilder, application: Application) {
29+
fun PluginBuilder<*>.configureTelemetry(builder: AbstractKtorServerTelemetryBuilder, application: Application) {
2930
val contextKey = AttributeKey<Context>("OpenTelemetry")
3031
val errorKey = AttributeKey<Throwable>("OpenTelemetryException")
32+
val processedKey = AttributeKey<Unit>("OpenTelemetryProcessed")
3133

3234
val instrumenter = instrumenter(builder)
3335
val tracer = KtorServerTracer(instrumenter)
34-
val startPhase = PipelinePhase("OpenTelemetry")
3536

37+
val startPhase = PipelinePhase("OpenTelemetry")
3638
application.insertPhaseBefore(ApplicationCallPipeline.Setup, startPhase)
3739
application.intercept(startPhase) {
3840
val context = tracer.start(call)
@@ -59,22 +61,15 @@ object KtorServerTelemetryUtil {
5961
}
6062
}
6163

62-
val postSendPhase = PipelinePhase("OpenTelemetryPostSend")
63-
application.sendPipeline.insertPhaseAfter(ApplicationSendPipeline.After, postSendPhase)
64-
application.sendPipeline.intercept(postSendPhase) {
64+
on(ResponseSent) { call ->
65+
if (call.attributes.contains(processedKey)) {
66+
return@on
67+
}
68+
6569
val context = call.attributes.getOrNull(contextKey)
6670
if (context != null) {
67-
var error: Throwable? = call.attributes.getOrNull(errorKey)
68-
try {
69-
proceed()
70-
} catch (t: Throwable) {
71-
error = t
72-
throw t
73-
} finally {
74-
tracer.end(context, call, error)
75-
}
76-
} else {
77-
proceed()
71+
tracer.end(context, call, call.attributes.getOrNull(errorKey))
72+
call.attributes.put(processedKey, Unit)
7873
}
7974
}
8075
}

instrumentation/ktor/ktor-2.0/library/src/main/kotlin/io/opentelemetry/instrumentation/ktor/v2_0/KtorServerTelemetryBuilder.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import io.opentelemetry.instrumentation.api.semconv.http.HttpServerRoute
1212
import io.opentelemetry.instrumentation.api.semconv.http.HttpServerRouteSource
1313
import io.opentelemetry.instrumentation.ktor.v2_0.InstrumentationProperties.INSTRUMENTATION_NAME
1414
import io.opentelemetry.instrumentation.ktor.v2_0.common.AbstractKtorServerTelemetryBuilder
15-
import io.opentelemetry.instrumentation.ktor.v2_0.common.internal.KtorServerTelemetryUtil
15+
import io.opentelemetry.instrumentation.ktor.v2_0.common.internal.KtorServerTelemetryUtil.configureTelemetry
1616

1717
class KtorServerTelemetryBuilder internal constructor(
1818
instrumentationName: String
@@ -21,7 +21,7 @@ class KtorServerTelemetryBuilder internal constructor(
2121
val KtorServerTelemetry = createRouteScopedPlugin("OpenTelemetry", { KtorServerTelemetryBuilder(INSTRUMENTATION_NAME) }) {
2222
require(pluginConfig.isOpenTelemetryInitialized()) { "OpenTelemetry must be set" }
2323

24-
KtorServerTelemetryUtil.configureTelemetry(pluginConfig, application)
24+
configureTelemetry(pluginConfig, application)
2525

2626
application.environment.monitor.subscribe(Routing.RoutingCallStarted) { call ->
2727
HttpServerRoute.update(Context.current(), HttpServerRouteSource.SERVER, { _, arg -> arg!!.route.parent.toString() }, call)

instrumentation/ktor/ktor-3.0/library/src/main/kotlin/io/opentelemetry/instrumentation/ktor/v3_0/KtorServerTelemetryBuilder.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import io.opentelemetry.context.Context
1111
import io.opentelemetry.instrumentation.api.semconv.http.HttpServerRoute
1212
import io.opentelemetry.instrumentation.api.semconv.http.HttpServerRouteSource
1313
import io.opentelemetry.instrumentation.ktor.v2_0.common.AbstractKtorServerTelemetryBuilder
14-
import io.opentelemetry.instrumentation.ktor.v2_0.common.internal.KtorServerTelemetryUtil
14+
import io.opentelemetry.instrumentation.ktor.v2_0.common.internal.KtorServerTelemetryUtil.configureTelemetry
1515
import io.opentelemetry.instrumentation.ktor.v3_0.InstrumentationProperties.INSTRUMENTATION_NAME
1616

1717
class KtorServerTelemetryBuilder internal constructor(
@@ -21,7 +21,7 @@ class KtorServerTelemetryBuilder internal constructor(
2121
val KtorServerTelemetry = createRouteScopedPlugin("OpenTelemetry", { KtorServerTelemetryBuilder(INSTRUMENTATION_NAME) }) {
2222
require(pluginConfig.isOpenTelemetryInitialized()) { "OpenTelemetry must be set" }
2323

24-
KtorServerTelemetryUtil.configureTelemetry(pluginConfig, application)
24+
configureTelemetry(pluginConfig, application)
2525

2626
application.monitor.subscribe(RoutingRoot.RoutingCallStarted) { call ->
2727
HttpServerRoute.update(Context.current(), HttpServerRouteSource.SERVER, { _, arg -> arg!!.route.parent.toString() }, call)
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
/*
2+
* Copyright The OpenTelemetry Authors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package io.opentelemetry.instrumentation.ktor.v3_0
7+
8+
import io.ktor.http.*
9+
import io.ktor.server.application.*
10+
import io.ktor.server.engine.*
11+
import io.ktor.server.netty.*
12+
import io.ktor.server.response.*
13+
import io.ktor.server.routing.*
14+
import io.opentelemetry.instrumentation.ktor.v2_0.common.internal.Experimental
15+
import io.opentelemetry.instrumentation.ktor.v3_0.InstrumentationProperties.INSTRUMENTATION_NAME
16+
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension
17+
import io.opentelemetry.instrumentation.testing.junit.http.AbstractHttpServerUsingTest
18+
import io.opentelemetry.instrumentation.testing.junit.http.HttpServerInstrumentationExtension
19+
import io.opentelemetry.instrumentation.testing.junit.http.ServerEndpoint
20+
import io.opentelemetry.sdk.metrics.data.MetricData
21+
import io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions
22+
import io.opentelemetry.semconv.HttpAttributes
23+
import io.opentelemetry.semconv.UrlAttributes
24+
import io.opentelemetry.testing.internal.armeria.common.AggregatedHttpRequest
25+
import io.opentelemetry.testing.internal.armeria.common.HttpMethod
26+
import org.assertj.core.api.ThrowingConsumer
27+
import org.junit.jupiter.api.AfterAll
28+
import org.junit.jupiter.api.BeforeAll
29+
import org.junit.jupiter.api.extension.RegisterExtension
30+
import org.junit.jupiter.params.ParameterizedTest
31+
import org.junit.jupiter.params.provider.Arguments
32+
import org.junit.jupiter.params.provider.Arguments.arguments
33+
import org.junit.jupiter.params.provider.MethodSource
34+
import java.util.concurrent.TimeUnit
35+
import java.util.stream.Stream
36+
37+
class KtorServerMetricsTest : AbstractHttpServerUsingTest<EmbeddedServer<*, *>>() {
38+
companion object {
39+
@JvmStatic
40+
@RegisterExtension
41+
val testing: InstrumentationExtension = HttpServerInstrumentationExtension.forLibrary()
42+
}
43+
44+
private val errorDuringSendEndpoint = ServerEndpoint("errorDuringSend", "error-during-send", 500, "")
45+
private val errorAfterSendEndpoint = ServerEndpoint("errorAfterSend", "error-after-send", 200, "")
46+
47+
@BeforeAll
48+
fun setupOptions() {
49+
startServer()
50+
}
51+
52+
@AfterAll
53+
fun cleanup() {
54+
cleanupServer()
55+
}
56+
57+
override fun getContextPath() = ""
58+
59+
override fun setupServer(): EmbeddedServer<*, *> = embeddedServer(Netty, port = port) {
60+
install(KtorServerTelemetry) {
61+
setOpenTelemetry(testing.openTelemetry)
62+
Experimental.emitExperimentalTelemetry(this)
63+
}
64+
65+
routing {
66+
get(errorDuringSendEndpoint.path) {
67+
call.respondBytesWriter {
68+
throw IllegalArgumentException("exception")
69+
}
70+
}
71+
get(errorAfterSendEndpoint.path) {
72+
call.respondText(errorAfterSendEndpoint.body, status = HttpStatusCode.fromValue(errorAfterSendEndpoint.status))
73+
throw IllegalArgumentException("exception")
74+
}
75+
}
76+
}.start()
77+
78+
override fun stopServer(server: EmbeddedServer<*, *>) {
79+
server.stop(0, 10, TimeUnit.SECONDS)
80+
}
81+
82+
@ParameterizedTest
83+
@MethodSource("provideArguments")
84+
fun testActiveRequestsMetric(endpoint: ServerEndpoint) {
85+
val request = AggregatedHttpRequest.of(HttpMethod.valueOf("GET"), resolveAddress(endpoint))
86+
try {
87+
client.execute(request).aggregate().join()
88+
} catch (_: Throwable) {
89+
// we expect server error
90+
}
91+
92+
testing.waitAndAssertMetrics(
93+
INSTRUMENTATION_NAME,
94+
"http.server.active_requests"
95+
) { metrics ->
96+
metrics!!.anySatisfy(ThrowingConsumer { metric: MetricData? ->
97+
OpenTelemetryAssertions.assertThat(metric)
98+
.hasDescription("Number of active HTTP server requests.")
99+
.hasUnit("{requests}")
100+
.hasLongSumSatisfying { sum ->
101+
sum.hasPointsSatisfying({ point ->
102+
point.hasValue(0)
103+
.hasAttributesSatisfying {
104+
OpenTelemetryAssertions.equalTo(HttpAttributes.HTTP_REQUEST_METHOD, "GET")
105+
OpenTelemetryAssertions.equalTo(UrlAttributes.URL_PATH, endpoint.path)
106+
}
107+
})
108+
}
109+
})
110+
}
111+
}
112+
113+
private fun provideArguments(): Stream<Arguments> = Stream.of(
114+
arguments(errorDuringSendEndpoint),
115+
arguments(errorAfterSendEndpoint),
116+
)
117+
}

0 commit comments

Comments
 (0)