- 
                Notifications
    
You must be signed in to change notification settings  - Fork 1k
 
Closed
Labels
bug (Something isn't working), needs triage (New issue that requires triage)
Description
Describe the bug
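When `org.apache.pekko.pattern.after` is used, the instrumentation loses the current span context: the delayed block does not see the span that was current when `after` was called. In the reproduction below this happens with every execution context tried (global, parasitic, opportunistic, and the actor system dispatcher). It only works when the execution context is manually wrapped so that each runnable is run through `Context.current().wrap(...)`.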
Steps to reproduce
https://github.com/wsargent/akka-after-loses-otel-thread-context
package org.example.application
import io.opentelemetry.api.GlobalOpenTelemetry
import io.opentelemetry.api.trace.Span
import io.opentelemetry.context.Context
import org.apache.pekko.Done
import org.apache.pekko.actor.ActorSystem
import org.slf4j.LoggerFactory
import scala.language.reflectiveCalls
import scala.concurrent.duration.DurationInt
import scala.concurrent.{Await, ExecutionContext, Future}
object Main {
  // SLF4J logger named after this object's runtime class.
  private val logger = LoggerFactory.getLogger(getClass)
  // The globally registered OpenTelemetry instance.
  // NOTE(review): presumably installed by the java agent; traceSync asserts that spans are recording.
  private val openTelemetry = GlobalOpenTelemetry.get()
  // Tracer used by traceSync to start spans; "example" is the instrumentation scope name.
  private val tracer = openTelemetry.getTracer("example")
  // https://tersesystems.com/blog/2024/06/20/executioncontext.parasitic-and-friends/
  // `opportunistic` is reached through a structural-type ascription on the ExecutionContext
  // companion object, which compiles to a reflective call (hence `scala.language.reflectiveCalls`).
  // NOTE(review): presumably needed because the member is not publicly accessible in the
  // Scala version in use -- confirm against the linked post.
  private val opportunisticExecutionContext = (scala.concurrent.ExecutionContext: {def opportunistic: scala.concurrent.ExecutionContextExecutor}).opportunistic
  /** Entry point of the reproduction.
    *
    * Runs one [[operation]] per execution-context mode, strictly one after another
    * (each step starts only when the previous step's future has completed), waits up
    * to 30 seconds for the whole chain, and then shuts the actor system down.
    *
    * The actor system is terminated in a `finally` block so that it is shut down even
    * when a step fails, the wait times out, or the very first `operation` call throws
    * synchronously (for example the assertion in `traceSync` when the java agent is
    * missing). Previously termination was only reached on the success path and was
    * never awaited.
    *
    * @param args command-line arguments (unused)
    */
  def main(args: Array[String]): Unit = {
    // Explicit types on implicit definitions keep implicit resolution predictable.
    implicit val actorSystem: ActorSystem = ActorSystem("example")
    implicit val ec: ExecutionContext = ExecutionContext.global
    try {
      val allModes = for {
        _ <- operation("wrapping")
        _ <- operation("global")
        _ <- operation("parasitic")
        _ <- operation("opportunistic")
        _ <- operation("dispatcher")
      } yield ()
      Await.result(allModes, 30.seconds)
    } finally {
      // Blocking here is acceptable: this is the outermost edge of the program.
      Await.result(actorSystem.terminate(), 30.seconds)
    }
  }
  /** Runs a single reproduction step for the given execution-context mode.
    *
    * Inside a span named `root <mode>` it records the current span, then schedules a
    * block with `org.apache.pekko.pattern.after` one second later on the execution
    * context chosen by `defineExecutionContext(mode)`. When the delayed block runs it
    * compares `Span.current()` with the recorded span and logs an error if they differ.
    *
    * Note that `traceSync` closes the scope and ends the span as soon as the wrapped
    * block returns, i.e. right after the future is created and before the delayed
    * block runs; the check therefore depends on the context being carried over to the
    * delayed block by instrumentation or by the execution context itself.
    *
    * @param mode        which execution context to use; see `defineExecutionContext`
    * @param actorSystem provides the scheduler used by `after`
    * @return a future completed with `Done` once the delayed check has run
    */
  def operation(mode: String)(implicit actorSystem: ActorSystem): Future[Done] = {
    traceSync(s"root $mode") {
      val expectedSpan = Span.current()
      // Interpolate the actual mode; this line used to print the literal text "mode:",
      // unlike the two messages in the delayed block.
      logger.info(s"$mode: We expect $expectedSpan")
      val afterExecutionContext = defineExecutionContext(mode)
      org.apache.pekko.pattern.after(1.second, actorSystem.scheduler) {
        // Read the current span when the delayed block actually runs.
        val actualSpan = Span.current()
        Future.successful {
          if (!expectedSpan.equals(actualSpan)) {
            logger.error(s"$mode: Unexpected $actualSpan")
          } else {
            logger.info(s"$mode: Reached delayed with $actualSpan")
          }
          Done
        }
      }(afterExecutionContext)
    }
  }
  /** Evaluates `block` with a freshly started span made current for the calling thread.
    *
    * The span is started from `tracer` under `traceName`. The scope is closed and the
    * span is ended when `block` returns or throws; if `block` returns a future, this
    * happens without waiting for that future to complete.
    *
    * @param traceName name given to the new span
    * @param block     code to run while the span is current
    * @tparam A        result type of `block`
    * @return whatever `block` evaluates to
    */
  def traceSync[A](traceName: String)(block: => A): A = {
    val startedSpan = tracer.spanBuilder(traceName).startSpan()
    assert(startedSpan.isRecording, "No-op span, you must run this class with the java agent so it instruments correctly!")

    // Makes the span current, runs the block, and always closes the scope again.
    def runWithSpanCurrent(): A = {
      val currentScope = startedSpan.makeCurrent()
      try block
      finally currentScope.close()
    }

    // The span is ended even if making it current or running the block throws.
    try runWithSpanCurrent()
    finally startedSpan.end()
  }
  /** Picks the execution context to hand to `after` for the given mode.
    *
    *  - `"wrapping"`: the actor system dispatcher, with every runnable wrapped in the
    *    OpenTelemetry `Context` that is current when this method is called
    *  - `"global"`: `ExecutionContext.global`
    *  - `"parasitic"`: `ExecutionContext.parasitic`
    *  - `"opportunistic"`: the reflectively obtained opportunistic context
    *  - anything else (including `"dispatcher"`): the actor system dispatcher as is
    *
    * @param mode   name of the execution context to use
    * @param system actor system whose dispatcher backs the dispatcher-based modes
    * @return the selected execution context
    */
  def defineExecutionContext(mode: String)(implicit system: ActorSystem): ExecutionContext = {
    val systemDispatcher = system.classicSystem.dispatcher

    // Evaluated only for the "wrapping" mode, so the context is captured at that point.
    def contextPropagatingDispatcher: ExecutionContext = {
      val capturedContext = Context.current()
      new ExecutionContext {
        override def execute(runnable: Runnable): Unit =
          systemDispatcher.execute(capturedContext.wrap(runnable))

        override def reportFailure(cause: Throwable): Unit =
          systemDispatcher.reportFailure(cause)
      }
    }

    mode match {
      case "global"        => ExecutionContext.global
      case "parasitic"     => ExecutionContext.parasitic
      case "opportunistic" => opportunisticExecutionContext
      case "wrapping"      => contextPropagatingDispatcher
      case _               => systemDispatcher
    }
  }
}

Expected behavior
The after pattern carries over the existing span context.
Actual behavior
100.123.234.53 ❱ ./bin/akka-after-loses-thread-context
OpenJDK 64-Bit Server VM warning: Sharing is only supported for boot loader classes because bootstrap classpath has been appended
129   INFO  trace_id= span_id= [main] io.opentelemetry.javaagent.tooling.VersionLogger - opentelemetry-javaagent - version: 2.4.0
678   INFO  trace_id=8746ecc707c7eb642df61a1933b396e9 span_id=1a8c4711d7702a04 [main] org.example.application.Main$ - mode: We expect ApplicationSpan{agentSpan=SdkSpan{traceId=8746ecc707c7eb642df61a1933b396e9, spanId=1a8c4711d7702a04, parentSpanContext=ImmutableSpanContext{traceId=00000000000000000000000000000000, spanId=0000000000000000, traceFlags=00, traceState=ArrayBasedTraceState{entries=[]}, remote=false, valid=false}, name=root wrapping, kind=INTERNAL, attributes=AttributesMap{data={thread.name=main, thread.id=1}, capacity=128, totalAddedValues=2}, status=ImmutableStatusData{statusCode=UNSET, description=}, totalRecordedEvents=0, totalRecordedLinks=0, startEpochNanos=1720292847324051943, endEpochNanos=0}}
680   INFO  trace_id= span_id= [main] io.opentelemetry.exporter.logging.LoggingSpanExporter - 'root wrapping' : 8746ecc707c7eb642df61a1933b396e9 1a8c4711d7702a04 INTERNAL [tracer: example:] AttributesMap{data={thread.name=main, thread.id=1}, capacity=128, totalAddedValues=2}
1711  INFO  trace_id=8746ecc707c7eb642df61a1933b396e9 span_id=1a8c4711d7702a04 [example-pekko.actor.default-dispatcher-5] org.example.application.Main$ - wrapping: Reached delayed with ApplicationSpan{agentSpan=SdkSpan{traceId=8746ecc707c7eb642df61a1933b396e9, spanId=1a8c4711d7702a04, parentSpanContext=ImmutableSpanContext{traceId=00000000000000000000000000000000, spanId=0000000000000000, traceFlags=00, traceState=ArrayBasedTraceState{entries=[]}, remote=false, valid=false}, name=root wrapping, kind=INTERNAL, attributes=AttributesMap{data={thread.name=main, thread.id=1}, capacity=128, totalAddedValues=2}, status=ImmutableStatusData{statusCode=UNSET, description=}, totalRecordedEvents=0, totalRecordedLinks=0, startEpochNanos=1720292847324051943, endEpochNanos=1720292847338923884}}
1715  INFO  trace_id=8746ecc707c7eb642df61a1933b396e9 span_id=7246937d638eba1a [scala-execution-context-global-26] org.example.application.Main$ - mode: We expect ApplicationSpan{agentSpan=SdkSpan{traceId=8746ecc707c7eb642df61a1933b396e9, spanId=7246937d638eba1a, parentSpanContext=ImmutableSpanContext{traceId=8746ecc707c7eb642df61a1933b396e9, spanId=1a8c4711d7702a04, traceFlags=01, traceState=ArrayBasedTraceState{entries=[]}, remote=false, valid=true}, name=root global, kind=INTERNAL, attributes=AttributesMap{data={thread.name=scala-execution-context-global-26, thread.id=26}, capacity=128, totalAddedValues=2}, status=ImmutableStatusData{statusCode=UNSET, description=}, totalRecordedEvents=0, totalRecordedLinks=0, startEpochNanos=1720292848374104426, endEpochNanos=0}}
1715  INFO  trace_id=8746ecc707c7eb642df61a1933b396e9 span_id=1a8c4711d7702a04 [scala-execution-context-global-26] io.opentelemetry.exporter.logging.LoggingSpanExporter - 'root global' : 8746ecc707c7eb642df61a1933b396e9 7246937d638eba1a INTERNAL [tracer: example:] AttributesMap{data={thread.name=scala-execution-context-global-26, thread.id=26}, capacity=128, totalAddedValues=2}
2730  ERROR trace_id= span_id= [scala-execution-context-global-26] org.example.application.Main$ - global: Unexpected PropagatedSpan{ImmutableSpanContext{traceId=00000000000000000000000000000000, spanId=0000000000000000, traceFlags=00, traceState=ArrayBasedTraceState{entries=[]}, remote=false, valid=false}}
2731  INFO  trace_id=4a46e96a6cd3bb1fe64335281a6bb229 span_id=3f967afae96d6627 [scala-execution-context-global-26] org.example.application.Main$ - mode: We expect ApplicationSpan{agentSpan=SdkSpan{traceId=4a46e96a6cd3bb1fe64335281a6bb229, spanId=3f967afae96d6627, parentSpanContext=ImmutableSpanContext{traceId=00000000000000000000000000000000, spanId=0000000000000000, traceFlags=00, traceState=ArrayBasedTraceState{entries=[]}, remote=false, valid=false}, name=root parasitic, kind=INTERNAL, attributes=AttributesMap{data={thread.name=scala-execution-context-global-26, thread.id=26}, capacity=128, totalAddedValues=2}, status=ImmutableStatusData{statusCode=UNSET, description=}, totalRecordedEvents=0, totalRecordedLinks=0, startEpochNanos=1720292849389997315, endEpochNanos=0}}
2731  INFO  trace_id= span_id= [scala-execution-context-global-26] io.opentelemetry.exporter.logging.LoggingSpanExporter - 'root parasitic' : 4a46e96a6cd3bb1fe64335281a6bb229 3f967afae96d6627 INTERNAL [tracer: example:] AttributesMap{data={thread.name=scala-execution-context-global-26, thread.id=26}, capacity=128, totalAddedValues=2}
3747  ERROR trace_id= span_id= [example-scheduler-1] org.example.application.Main$ - parasitic: Unexpected PropagatedSpan{ImmutableSpanContext{traceId=00000000000000000000000000000000, spanId=0000000000000000, traceFlags=00, traceState=ArrayBasedTraceState{entries=[]}, remote=false, valid=false}}
3748  INFO  trace_id=f0c1bc85c8635a598f51244982df7750 span_id=5e10398ab14c17ee [scala-execution-context-global-26] org.example.application.Main$ - mode: We expect ApplicationSpan{agentSpan=SdkSpan{traceId=f0c1bc85c8635a598f51244982df7750, spanId=5e10398ab14c17ee, parentSpanContext=ImmutableSpanContext{traceId=00000000000000000000000000000000, spanId=0000000000000000, traceFlags=00, traceState=ArrayBasedTraceState{entries=[]}, remote=false, valid=false}, name=root opportunistic, kind=INTERNAL, attributes=AttributesMap{data={thread.name=scala-execution-context-global-26, thread.id=26}, capacity=128, totalAddedValues=2}, status=ImmutableStatusData{statusCode=UNSET, description=}, totalRecordedEvents=0, totalRecordedLinks=0, startEpochNanos=1720292850407256495, endEpochNanos=0}}
3748  INFO  trace_id= span_id= [scala-execution-context-global-26] io.opentelemetry.exporter.logging.LoggingSpanExporter - 'root opportunistic' : f0c1bc85c8635a598f51244982df7750 5e10398ab14c17ee INTERNAL [tracer: example:] AttributesMap{data={thread.name=scala-execution-context-global-26, thread.id=26}, capacity=128, totalAddedValues=2}
4767  ERROR trace_id= span_id= [scala-execution-context-global-26] org.example.application.Main$ - opportunistic: Unexpected PropagatedSpan{ImmutableSpanContext{traceId=00000000000000000000000000000000, spanId=0000000000000000, traceFlags=00, traceState=ArrayBasedTraceState{entries=[]}, remote=false, valid=false}}
4768  INFO  trace_id=077ce825f5698598d8ca24c433f812f5 span_id=0b8912756c432c1d [scala-execution-context-global-26] org.example.application.Main$ - mode: We expect ApplicationSpan{agentSpan=SdkSpan{traceId=077ce825f5698598d8ca24c433f812f5, spanId=0b8912756c432c1d, parentSpanContext=ImmutableSpanContext{traceId=00000000000000000000000000000000, spanId=0000000000000000, traceFlags=00, traceState=ArrayBasedTraceState{entries=[]}, remote=false, valid=false}, name=root dispatcher, kind=INTERNAL, attributes=AttributesMap{data={thread.name=scala-execution-context-global-26, thread.id=26}, capacity=128, totalAddedValues=2}, status=ImmutableStatusData{statusCode=UNSET, description=}, totalRecordedEvents=0, totalRecordedLinks=0, startEpochNanos=1720292851427164157, endEpochNanos=0}}
4768  INFO  trace_id= span_id= [scala-execution-context-global-26] io.opentelemetry.exporter.logging.LoggingSpanExporter - 'root dispatcher' : 077ce825f5698598d8ca24c433f812f5 0b8912756c432c1d INTERNAL [tracer: example:] AttributesMap{data={thread.name=scala-execution-context-global-26, thread.id=26}, capacity=128, totalAddedValues=2}
5788  ERROR trace_id= span_id= [example-pekko.actor.default-dispatcher-5] org.example.application.Main$ - dispatcher: Unexpected PropagatedSpan{ImmutableSpanContext{traceId=00000000000000000000000000000000, spanId=0000000000000000, traceFlags=00, traceState=ArrayBasedTraceState{entries=[]}, remote=false, valid=false}}
[INFO] [07/06/2024 12:07:32.469] [scala-execution-context-global-26] [CoordinatedShutdown(pekko://example)] Running CoordinatedShutdown with reason [ActorSystemTerminateReason]
Javaagent or library instrumentation version
1.39.0
Environment
JDK:
openjdk version "17.0.7" 2023-04-18
OpenJDK Runtime Environment Temurin-17.0.7+7 (build 17.0.7+7)
OpenJDK 64-Bit Server VM Temurin-17.0.7+7 (build 17.0.7+7, mixed mode, sharing)
OS:
Linux devserver 6.5.0-41-generic #41~22.04.2-Ubuntu SMP PREEMPT_DYNAMIC Mon Jun 3 11:32:55 UTC 2 x86_64 x86_64 x86_64 GNU/Linux
Additional context
https://github.com/wsargent/opentelemetry-with-scala-futures
Metadata
Metadata
Assignees
Labels
bug (Something isn't working), needs triage (New issue that requires triage)