| 
 | 1 | +/*  | 
 | 2 | + * Copyright The OpenTelemetry Authors  | 
 | 3 | + * SPDX-License-Identifier: Apache-2.0  | 
 | 4 | + */  | 
 | 5 | + | 
 | 6 | +package io.opentelemetry.instrumentation.ktor.v1_0  | 
 | 7 | + | 
 | 8 | +import io.ktor.application.*  | 
 | 9 | +import io.ktor.request.*  | 
 | 10 | +import io.ktor.response.*  | 
 | 11 | +import io.ktor.routing.*  | 
 | 12 | +import io.ktor.util.*  | 
 | 13 | +import io.ktor.util.pipeline.*  | 
 | 14 | +import io.opentelemetry.api.OpenTelemetry  | 
 | 15 | +import io.opentelemetry.context.Context  | 
 | 16 | +import io.opentelemetry.extension.kotlin.asContextElement  | 
 | 17 | +import io.opentelemetry.instrumentation.api.incubator.builder.internal.DefaultHttpServerInstrumenterBuilder  | 
 | 18 | +import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor  | 
 | 19 | +import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter  | 
 | 20 | +import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor  | 
 | 21 | +import io.opentelemetry.instrumentation.api.instrumenter.SpanStatusBuilder  | 
 | 22 | +import io.opentelemetry.instrumentation.api.instrumenter.SpanStatusExtractor  | 
 | 23 | +import io.opentelemetry.instrumentation.api.internal.InstrumenterUtil  | 
 | 24 | +import io.opentelemetry.instrumentation.api.semconv.http.HttpServerRoute  | 
 | 25 | +import io.opentelemetry.instrumentation.api.semconv.http.HttpServerRouteSource  | 
 | 26 | +import kotlinx.coroutines.withContext  | 
 | 27 | + | 
 | 28 | +class KtorServerTelemetry private constructor(  | 
 | 29 | +  private val instrumenter: Instrumenter<ApplicationRequest, ApplicationResponse>,  | 
 | 30 | +) {  | 
 | 31 | + | 
 | 32 | +  class Configuration {  | 
 | 33 | +    internal lateinit var builder: DefaultHttpServerInstrumenterBuilder<ApplicationRequest, ApplicationResponse>  | 
 | 34 | + | 
 | 35 | +    internal var spanKindExtractor:  | 
 | 36 | +      (SpanKindExtractor<ApplicationRequest>) -> SpanKindExtractor<ApplicationRequest> = { a -> a }  | 
 | 37 | + | 
 | 38 | +    fun setOpenTelemetry(openTelemetry: OpenTelemetry) {  | 
 | 39 | +      this.builder =  | 
 | 40 | +        DefaultHttpServerInstrumenterBuilder.create(  | 
 | 41 | +          INSTRUMENTATION_NAME,  | 
 | 42 | +          openTelemetry,  | 
 | 43 | +          KtorHttpServerAttributesGetter.INSTANCE  | 
 | 44 | +        )  | 
 | 45 | +    }  | 
 | 46 | + | 
 | 47 | +    fun setStatusExtractor(  | 
 | 48 | +      extractor: (SpanStatusExtractor<in ApplicationRequest, in ApplicationResponse>) -> SpanStatusExtractor<in ApplicationRequest, in ApplicationResponse>  | 
 | 49 | +    ) {  | 
 | 50 | +      builder.setStatusExtractor { prevExtractor ->  | 
 | 51 | +        SpanStatusExtractor { spanStatusBuilder: SpanStatusBuilder,  | 
 | 52 | +                              request: ApplicationRequest,  | 
 | 53 | +                              response: ApplicationResponse?,  | 
 | 54 | +                              throwable: Throwable? ->  | 
 | 55 | +          extractor(prevExtractor).extract(spanStatusBuilder, request, response, throwable)  | 
 | 56 | +        }  | 
 | 57 | +      }  | 
 | 58 | +    }  | 
 | 59 | + | 
 | 60 | +    fun setSpanKindExtractor(extractor: (SpanKindExtractor<ApplicationRequest>) -> SpanKindExtractor<ApplicationRequest>) {  | 
 | 61 | +      this.spanKindExtractor = extractor  | 
 | 62 | +    }  | 
 | 63 | + | 
 | 64 | +    fun addAttributeExtractor(extractor: AttributesExtractor<in ApplicationRequest, in ApplicationResponse>) {  | 
 | 65 | +      builder.addAttributesExtractor(extractor)  | 
 | 66 | +    }  | 
 | 67 | + | 
 | 68 | +    fun setCapturedRequestHeaders(requestHeaders: List<String>) {  | 
 | 69 | +      builder.setCapturedRequestHeaders(requestHeaders)  | 
 | 70 | +    }  | 
 | 71 | + | 
 | 72 | +    fun setCapturedResponseHeaders(responseHeaders: List<String>) {  | 
 | 73 | +      builder.setCapturedResponseHeaders(responseHeaders)  | 
 | 74 | +    }  | 
 | 75 | + | 
 | 76 | +    fun setKnownMethods(knownMethods: Set<String>) {  | 
 | 77 | +      builder.setKnownMethods(knownMethods)  | 
 | 78 | +    }  | 
 | 79 | + | 
 | 80 | +    internal fun isOpenTelemetryInitialized(): Boolean = this::builder.isInitialized  | 
 | 81 | +  }  | 
 | 82 | + | 
 | 83 | +  private fun start(call: ApplicationCall): Context? {  | 
 | 84 | +    val parentContext = Context.current()  | 
 | 85 | +    if (!instrumenter.shouldStart(parentContext, call.request)) {  | 
 | 86 | +      return null  | 
 | 87 | +    }  | 
 | 88 | + | 
 | 89 | +    return instrumenter.start(parentContext, call.request)  | 
 | 90 | +  }  | 
 | 91 | + | 
 | 92 | +  private fun end(context: Context, call: ApplicationCall, error: Throwable?) {  | 
 | 93 | +    instrumenter.end(context, call.request, call.response, error)  | 
 | 94 | +  }  | 
 | 95 | + | 
 | 96 | +  companion object Feature : ApplicationFeature<Application, Configuration, KtorServerTelemetry> {  | 
 | 97 | +    private const val INSTRUMENTATION_NAME = "io.opentelemetry.ktor-1.0"  | 
 | 98 | + | 
 | 99 | +    private val contextKey = AttributeKey<Context>("OpenTelemetry")  | 
 | 100 | +    private val errorKey = AttributeKey<Throwable>("OpenTelemetryException")  | 
 | 101 | + | 
 | 102 | +    override val key: AttributeKey<KtorServerTelemetry> = AttributeKey("OpenTelemetry")  | 
 | 103 | + | 
 | 104 | +    override fun install(pipeline: Application, configure: Configuration.() -> Unit): KtorServerTelemetry {  | 
 | 105 | +      val configuration = Configuration().apply(configure)  | 
 | 106 | + | 
 | 107 | +      if (!configuration.isOpenTelemetryInitialized()) {  | 
 | 108 | +        throw IllegalArgumentException("OpenTelemetry must be set")  | 
 | 109 | +      }  | 
 | 110 | + | 
 | 111 | +      val instrumenter = InstrumenterUtil.buildUpstreamInstrumenter(  | 
 | 112 | +        configuration.builder.instrumenterBuilder(),  | 
 | 113 | +        ApplicationRequestGetter,  | 
 | 114 | +        configuration.spanKindExtractor(SpanKindExtractor.alwaysServer())  | 
 | 115 | +      )  | 
 | 116 | + | 
 | 117 | +      val feature = KtorServerTelemetry(instrumenter)  | 
 | 118 | + | 
 | 119 | +      val startPhase = PipelinePhase("OpenTelemetry")  | 
 | 120 | +      pipeline.insertPhaseBefore(ApplicationCallPipeline.Monitoring, startPhase)  | 
 | 121 | +      pipeline.intercept(startPhase) {  | 
 | 122 | +        val context = feature.start(call)  | 
 | 123 | + | 
 | 124 | +        if (context != null) {  | 
 | 125 | +          call.attributes.put(contextKey, context)  | 
 | 126 | +          withContext(context.asContextElement()) {  | 
 | 127 | +            try {  | 
 | 128 | +              proceed()  | 
 | 129 | +            } catch (err: Throwable) {  | 
 | 130 | +              // Stash error for reporting later since need ktor to finish setting up the response  | 
 | 131 | +              call.attributes.put(errorKey, err)  | 
 | 132 | +              throw err  | 
 | 133 | +            }  | 
 | 134 | +          }  | 
 | 135 | +        } else {  | 
 | 136 | +          proceed()  | 
 | 137 | +        }  | 
 | 138 | +      }  | 
 | 139 | + | 
 | 140 | +      val postSendPhase = PipelinePhase("OpenTelemetryPostSend")  | 
 | 141 | +      pipeline.sendPipeline.insertPhaseAfter(ApplicationSendPipeline.After, postSendPhase)  | 
 | 142 | +      pipeline.sendPipeline.intercept(postSendPhase) {  | 
 | 143 | +        val context = call.attributes.getOrNull(contextKey)  | 
 | 144 | +        if (context != null) {  | 
 | 145 | +          var error: Throwable? = call.attributes.getOrNull(errorKey)  | 
 | 146 | +          try {  | 
 | 147 | +            proceed()  | 
 | 148 | +          } catch (t: Throwable) {  | 
 | 149 | +            error = t  | 
 | 150 | +            throw t  | 
 | 151 | +          } finally {  | 
 | 152 | +            feature.end(context, call, error)  | 
 | 153 | +          }  | 
 | 154 | +        } else {  | 
 | 155 | +          proceed()  | 
 | 156 | +        }  | 
 | 157 | +      }  | 
 | 158 | + | 
 | 159 | +      pipeline.environment.monitor.subscribe(Routing.RoutingCallStarted) { call ->  | 
 | 160 | +        val context = call.attributes.getOrNull(contextKey)  | 
 | 161 | +        if (context != null) {  | 
 | 162 | +          HttpServerRoute.update(context, HttpServerRouteSource.SERVER, { _, arg -> arg.route.parent.toString() }, call)  | 
 | 163 | +        }  | 
 | 164 | +      }  | 
 | 165 | + | 
 | 166 | +      return feature  | 
 | 167 | +    }  | 
 | 168 | +  }  | 
 | 169 | +}  | 
0 commit comments