diff --git a/instrumentation/pekko/pekko-http-1.0/javaagent/src/test/scala/io/opentelemetry/javaagent/instrumentation/pekkohttp/v1_0/PekkoHttpServerWithActorTest.scala b/instrumentation/pekko/pekko-http-1.0/javaagent/src/test/scala/io/opentelemetry/javaagent/instrumentation/pekkohttp/v1_0/PekkoHttpServerWithActorTest.scala new file mode 100644 index 000000000000..4dedd7f06530 --- /dev/null +++ b/instrumentation/pekko/pekko-http-1.0/javaagent/src/test/scala/io/opentelemetry/javaagent/instrumentation/pekkohttp/v1_0/PekkoHttpServerWithActorTest.scala @@ -0,0 +1,57 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.pekkohttp.v1_0 + +import java.net.URI + +import io.opentelemetry.instrumentation.testing.junit.http.AbstractHttpServerUsingTest +import io.opentelemetry.instrumentation.test.utils.PortUtils +import io.opentelemetry.testing.internal.armeria.client.{ClientFactory, WebClient} +import io.opentelemetry.testing.internal.armeria.client.logging.LoggingClient +import io.opentelemetry.testing.internal.armeria.common.{ + AggregatedHttpRequest, + HttpHeaderNames, + HttpMethod +} +import org.assertj.core.api.Assertions.assertThat +import org.junit.jupiter.api.{AfterAll, Test, TestInstance} + +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +class PekkoHttpServerWithActorTest { + + private val port = PortUtils.findOpenPort() + val server = new PekkoHttpTestWebServerWithActor(port) + + @AfterAll def cleanup(): Unit = { + server.stop() + } + + private val minute = java.time.Duration.ofMinutes(1) + private val client = WebClient.builder() + .responseTimeout(minute) + .writeTimeout(minute) + .factory(ClientFactory.builder().connectTimeout(minute).build()) + .setHeader(HttpHeaderNames.USER_AGENT, AbstractHttpServerUsingTest.TEST_USER_AGENT) + .setHeader(HttpHeaderNames.X_FORWARDED_FOR, AbstractHttpServerUsingTest.TEST_CLIENT_IP) + .decorator(LoggingClient.newDecorator()) + .build() + + @Test def testSpan(): Unit = { + val address = new URI(s"http://localhost:$port") + val request = AggregatedHttpRequest.of( + HttpMethod.GET, + address.resolve("/test").toString + ) + val response = client.execute(request).aggregate.join + assertThat(response.status.code).isEqualTo(200) + val responseText = response.contentUtf8 + val splits = responseText.split("\n") + assertThat(splits.length).isEqualTo(2) + val routeSpan = splits(0).substring(6, splits(0).length) + val actorSpan = splits(1).substring(6, splits(1).length) + assertThat(routeSpan).isEqualTo(actorSpan) + } +} diff --git a/instrumentation/pekko/pekko-http-1.0/javaagent/src/test/scala/io/opentelemetry/javaagent/instrumentation/pekkohttp/v1_0/PekkoHttpTestWebServerWithActor.scala b/instrumentation/pekko/pekko-http-1.0/javaagent/src/test/scala/io/opentelemetry/javaagent/instrumentation/pekkohttp/v1_0/PekkoHttpTestWebServerWithActor.scala new file mode 100644 index 000000000000..8ce9935847bd --- /dev/null +++ b/instrumentation/pekko/pekko-http-1.0/javaagent/src/test/scala/io/opentelemetry/javaagent/instrumentation/pekkohttp/v1_0/PekkoHttpTestWebServerWithActor.scala @@ -0,0 +1,64 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.pekkohttp.v1_0 + +import io.opentelemetry.api.trace.Span +import org.apache.pekko.actor.{Actor, ActorSystem, Props} +import org.apache.pekko.http.scaladsl.Http +import org.apache.pekko.http.scaladsl.Http.ServerBinding +import org.apache.pekko.http.scaladsl.server.Directives._ +import org.apache.pekko.pattern.ask +import org.apache.pekko.util.Timeout + +import scala.concurrent.{Await, ExecutionContext} +import scala.concurrent.duration.DurationInt + +object PekkoHttpTestWebServerWithActor { + case object TestMessage + class SpanTestActor extends Actor { + def receive = { case TestMessage => + sender() ! spanSummary(Span.current()) + } + } + + def spanSummary(span: Span): String = { + val spanId = span.getSpanContext().getSpanId() + val traceId = span.getSpanContext().getTraceId() + s"Span(traceId=$traceId, spanId=$spanId)" + } +} + +class PekkoHttpTestWebServerWithActor(port: Int) { + import PekkoHttpTestWebServerWithActor._ + implicit val system: ActorSystem = ActorSystem("http-server-with-actor") + // needed for the future flatMap/onComplete in the end + implicit val executionContext: ExecutionContext = system.dispatcher + + val spanTestActor = system.actorOf(Props[SpanTestActor]()) + + val route = get { + path("test") { + complete { + val otelSummary = spanSummary(Span.current()) + spanTestActor.ask(TestMessage)(Timeout(5.seconds)).mapTo[String].map { + actorSummary => + s"Route=$otelSummary\nActor=$actorSummary" + } + } + } + } + + var binding: ServerBinding = + Await.result(Http().bindAndHandle(route, "localhost", port), 10.seconds) + + def stop(): Unit = synchronized { + if (binding != null) { + binding.unbind() + system.terminate() + binding = null + } + } +}