From f53f4a9d5c6b78375e581cd1f700d04a6c4cdeec Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Thu, 3 Oct 2024 21:47:59 +0100 Subject: [PATCH 1/5] test otel span is passed from pekko routes to actors --- .../pekkohttp/v1_0/PekkoHttpTestSetup.java | 43 ++++++++++++ .../v1_0/PekkoHttpServerWithActorTest.scala | 47 +++++++++++++ .../PekkoHttpTestWebServerWithActor.scala | 67 +++++++++++++++++++ 3 files changed, 157 insertions(+) create mode 100644 instrumentation/pekko/pekko-http-1.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pekkohttp/v1_0/PekkoHttpTestSetup.java create mode 100644 instrumentation/pekko/pekko-http-1.0/javaagent/src/test/scala/io/opentelemetry/javaagent/instrumentation/pekkohttp/v1_0/PekkoHttpServerWithActorTest.scala create mode 100644 instrumentation/pekko/pekko-http-1.0/javaagent/src/test/scala/io/opentelemetry/javaagent/instrumentation/pekkohttp/v1_0/PekkoHttpTestWebServerWithActor.scala diff --git a/instrumentation/pekko/pekko-http-1.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pekkohttp/v1_0/PekkoHttpTestSetup.java b/instrumentation/pekko/pekko-http-1.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pekkohttp/v1_0/PekkoHttpTestSetup.java new file mode 100644 index 000000000000..746ad10a0602 --- /dev/null +++ b/instrumentation/pekko/pekko-http-1.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pekkohttp/v1_0/PekkoHttpTestSetup.java @@ -0,0 +1,43 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.pekkohttp.v1_0; + +import static io.opentelemetry.instrumentation.testing.junit.http.AbstractHttpServerUsingTest.TEST_CLIENT_IP; +import static io.opentelemetry.instrumentation.testing.junit.http.AbstractHttpServerUsingTest.TEST_USER_AGENT; + +import io.opentelemetry.instrumentation.test.utils.PortUtils; +import io.opentelemetry.testing.internal.armeria.client.ClientFactory; +import io.opentelemetry.testing.internal.armeria.client.WebClient; +import io.opentelemetry.testing.internal.armeria.client.logging.LoggingClient; +import io.opentelemetry.testing.internal.armeria.common.HttpHeaderNames; +import java.time.Duration; + +public final class PekkoHttpTestSetup { + + private final int port; + private final WebClient client; + + public PekkoHttpTestSetup() { + port = PortUtils.findOpenPort(); + client = + WebClient.builder() + .responseTimeout(Duration.ofMinutes(1)) + .writeTimeout(Duration.ofMinutes(1)) + .factory(ClientFactory.builder().connectTimeout(Duration.ofMinutes(1)).build()) + .setHeader(HttpHeaderNames.USER_AGENT, TEST_USER_AGENT) + .setHeader(HttpHeaderNames.X_FORWARDED_FOR, TEST_CLIENT_IP) + .decorator(LoggingClient.newDecorator()) + .build(); + } + + public int getPort() { + return port; + } + + public WebClient getClient() { + return client; + } +} diff --git a/instrumentation/pekko/pekko-http-1.0/javaagent/src/test/scala/io/opentelemetry/javaagent/instrumentation/pekkohttp/v1_0/PekkoHttpServerWithActorTest.scala b/instrumentation/pekko/pekko-http-1.0/javaagent/src/test/scala/io/opentelemetry/javaagent/instrumentation/pekkohttp/v1_0/PekkoHttpServerWithActorTest.scala new file mode 100644 index 000000000000..20cd05473589 --- /dev/null +++ b/instrumentation/pekko/pekko-http-1.0/javaagent/src/test/scala/io/opentelemetry/javaagent/instrumentation/pekkohttp/v1_0/PekkoHttpServerWithActorTest.scala @@ -0,0 +1,47 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.pekkohttp.v1_0 + +import java.net.URI + +import io.opentelemetry.instrumentation.test.utils.PortUtils +import io.opentelemetry.instrumentation.testing.junit.http.HttpServerInstrumentationExtension +import io.opentelemetry.testing.internal.armeria.common.{ + AggregatedHttpRequest, + HttpMethod +} +import org.assertj.core.api.Assertions.assertThat +import org.junit.jupiter.api.{AfterAll, BeforeAll, Test, TestInstance} + +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +class PekkoHttpServerWithActorTest { + + val setup = new PekkoHttpTestSetup() + + @BeforeAll def setupOptions(): Unit = { + PekkoHttpTestWebServerWithActor.start(setup.getPort()) + } + + @AfterAll def cleanup(): Unit = { + PekkoHttpTestWebServerWithActor.stop() + } + + @Test def testSpan(): Unit = { + val address = new URI(s"http://localhost:${setup.getPort()}/") + val request = AggregatedHttpRequest.of( + HttpMethod.GET, + address.resolve("/test").toString + ) + val response = setup.getClient().execute(request).aggregate.join + assertThat(response.status.code).isEqualTo(200) + val responseText = response.contentUtf8 + val splits = responseText.split("\n") + assertThat(splits.length).isEqualTo(2) + val routeSpan = splits(0).substring(6, splits(0).length) + val actorSpan = splits(1).substring(6, splits(1).length) + assertThat(routeSpan).isEqualTo(actorSpan) + } +} diff --git a/instrumentation/pekko/pekko-http-1.0/javaagent/src/test/scala/io/opentelemetry/javaagent/instrumentation/pekkohttp/v1_0/PekkoHttpTestWebServerWithActor.scala b/instrumentation/pekko/pekko-http-1.0/javaagent/src/test/scala/io/opentelemetry/javaagent/instrumentation/pekkohttp/v1_0/PekkoHttpTestWebServerWithActor.scala new file mode 100644 index 000000000000..1143e1e99ffe --- /dev/null +++ b/instrumentation/pekko/pekko-http-1.0/javaagent/src/test/scala/io/opentelemetry/javaagent/instrumentation/pekkohttp/v1_0/PekkoHttpTestWebServerWithActor.scala @@ -0,0 +1,67 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.pekkohttp.v1_0 + +import io.opentelemetry.api.trace.Span +import org.apache.pekko.actor.{Actor, ActorSystem, Props} +import org.apache.pekko.http.scaladsl.Http +import org.apache.pekko.http.scaladsl.Http.ServerBinding +import org.apache.pekko.http.scaladsl.server.Directives._ +import org.apache.pekko.pattern.ask +import org.apache.pekko.util.Timeout + +import scala.concurrent.{Await, ExecutionContext} +import scala.concurrent.duration.DurationInt + +object PekkoHttpTestWebServerWithActor { + implicit val system: ActorSystem = ActorSystem("http-server-with-actor") + // needed for the future flatMap/onComplete in the end + implicit val executionContext: ExecutionContext = system.dispatcher + + private case object TestMessage + private class KamonTestActor extends Actor { + def receive = { case TestMessage => + sender() ! spanSummary(Span.current()) + } + } + + val kamonTestActor = system.actorOf(Props[KamonTestActor]()) + + var route = get { + path("test") { + complete { + val kamonSummary = spanSummary(Span.current()) + kamonTestActor.ask(TestMessage)(Timeout(5.seconds)).mapTo[String].map { + actorSummary => + s"Route=$kamonSummary\nActor=$actorSummary" + } + } + } + } + + private var binding: ServerBinding = null + + def start(port: Int): Unit = synchronized { + if (null == binding) { + binding = + Await.result(Http().bindAndHandle(route, "localhost", port), 10.seconds) + } + } + + def stop(): Unit = synchronized { + if (null != binding) { + binding.unbind() + system.terminate() + binding = null + } + } + + def spanSummary(span: Span): String = { + val spanId = span.getSpanContext().getSpanId() + val traceId = span.getSpanContext().getTraceId() + s"Span(traceId=$traceId, spanId=$spanId)" + } +} From e38024168465cea2d59306e829250ff8e17ce94b Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Thu, 3 Oct 2024 21:52:37 +0100 Subject: [PATCH 2/5] stray imports --- .../pekkohttp/v1_0/PekkoHttpServerWithActorTest.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/instrumentation/pekko/pekko-http-1.0/javaagent/src/test/scala/io/opentelemetry/javaagent/instrumentation/pekkohttp/v1_0/PekkoHttpServerWithActorTest.scala b/instrumentation/pekko/pekko-http-1.0/javaagent/src/test/scala/io/opentelemetry/javaagent/instrumentation/pekkohttp/v1_0/PekkoHttpServerWithActorTest.scala index 20cd05473589..4c4dd7cec2cf 100644 --- a/instrumentation/pekko/pekko-http-1.0/javaagent/src/test/scala/io/opentelemetry/javaagent/instrumentation/pekkohttp/v1_0/PekkoHttpServerWithActorTest.scala +++ b/instrumentation/pekko/pekko-http-1.0/javaagent/src/test/scala/io/opentelemetry/javaagent/instrumentation/pekkohttp/v1_0/PekkoHttpServerWithActorTest.scala @@ -7,8 +7,6 @@ package io.opentelemetry.javaagent.instrumentation.pekkohttp.v1_0 import java.net.URI -import io.opentelemetry.instrumentation.test.utils.PortUtils -import io.opentelemetry.instrumentation.testing.junit.http.HttpServerInstrumentationExtension import io.opentelemetry.testing.internal.armeria.common.{ AggregatedHttpRequest, HttpMethod From ef58870bb562b66b42d9c7ce037a763b2f572f44 Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Thu, 3 Oct 2024 23:39:47 +0100 Subject: [PATCH 3/5] Update PekkoHttpTestWebServerWithActor.scala --- .../v1_0/PekkoHttpTestWebServerWithActor.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/instrumentation/pekko/pekko-http-1.0/javaagent/src/test/scala/io/opentelemetry/javaagent/instrumentation/pekkohttp/v1_0/PekkoHttpTestWebServerWithActor.scala b/instrumentation/pekko/pekko-http-1.0/javaagent/src/test/scala/io/opentelemetry/javaagent/instrumentation/pekkohttp/v1_0/PekkoHttpTestWebServerWithActor.scala index 1143e1e99ffe..17d174281a46 100644 --- a/instrumentation/pekko/pekko-http-1.0/javaagent/src/test/scala/io/opentelemetry/javaagent/instrumentation/pekkohttp/v1_0/PekkoHttpTestWebServerWithActor.scala +++ b/instrumentation/pekko/pekko-http-1.0/javaagent/src/test/scala/io/opentelemetry/javaagent/instrumentation/pekkohttp/v1_0/PekkoHttpTestWebServerWithActor.scala @@ -22,21 +22,21 @@ object PekkoHttpTestWebServerWithActor { implicit val executionContext: ExecutionContext = system.dispatcher private case object TestMessage - private class KamonTestActor extends Actor { + private class SpanTestActor extends Actor { def receive = { case TestMessage => sender() ! spanSummary(Span.current()) } } - val kamonTestActor = system.actorOf(Props[KamonTestActor]()) + val spanTestActor = system.actorOf(Props[SpanTestActor]()) var route = get { path("test") { complete { - val kamonSummary = spanSummary(Span.current()) - kamonTestActor.ask(TestMessage)(Timeout(5.seconds)).mapTo[String].map { + val otelSummary = spanSummary(Span.current()) + spanTestActor.ask(TestMessage)(Timeout(5.seconds)).mapTo[String].map { actorSummary => - s"Route=$kamonSummary\nActor=$actorSummary" + s"Route=$otelSummary\nActor=$actorSummary" } } } From 1addd436df2cd76d96ab011ce84b2f662962b175 Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Tue, 8 Oct 2024 14:51:54 +0100 Subject: [PATCH 4/5] refactor test --- .../pekkohttp/v1_0/PekkoHttpTestSetup.java | 43 ------------------- .../v1_0/PekkoHttpServerWithActorTest.scala | 30 +++++++++---- .../PekkoHttpTestWebServerWithActor.scala | 18 +++----- 3 files changed, 27 insertions(+), 64 deletions(-) delete mode 100644 instrumentation/pekko/pekko-http-1.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pekkohttp/v1_0/PekkoHttpTestSetup.java diff --git a/instrumentation/pekko/pekko-http-1.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pekkohttp/v1_0/PekkoHttpTestSetup.java b/instrumentation/pekko/pekko-http-1.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pekkohttp/v1_0/PekkoHttpTestSetup.java deleted file mode 100644 index 746ad10a0602..000000000000 --- a/instrumentation/pekko/pekko-http-1.0/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pekkohttp/v1_0/PekkoHttpTestSetup.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.javaagent.instrumentation.pekkohttp.v1_0; - -import static io.opentelemetry.instrumentation.testing.junit.http.AbstractHttpServerUsingTest.TEST_CLIENT_IP; -import static io.opentelemetry.instrumentation.testing.junit.http.AbstractHttpServerUsingTest.TEST_USER_AGENT; - -import io.opentelemetry.instrumentation.test.utils.PortUtils; -import io.opentelemetry.testing.internal.armeria.client.ClientFactory; -import io.opentelemetry.testing.internal.armeria.client.WebClient; -import io.opentelemetry.testing.internal.armeria.client.logging.LoggingClient; -import io.opentelemetry.testing.internal.armeria.common.HttpHeaderNames; -import java.time.Duration; - -public final class PekkoHttpTestSetup { - - private final int port; - private final WebClient client; - - public PekkoHttpTestSetup() { - port = PortUtils.findOpenPort(); - client = - WebClient.builder() - .responseTimeout(Duration.ofMinutes(1)) - .writeTimeout(Duration.ofMinutes(1)) - .factory(ClientFactory.builder().connectTimeout(Duration.ofMinutes(1)).build()) - .setHeader(HttpHeaderNames.USER_AGENT, TEST_USER_AGENT) - .setHeader(HttpHeaderNames.X_FORWARDED_FOR, TEST_CLIENT_IP) - .decorator(LoggingClient.newDecorator()) - .build(); - } - - public int getPort() { - return port; - } - - public WebClient getClient() { - return client; - } -} diff --git a/instrumentation/pekko/pekko-http-1.0/javaagent/src/test/scala/io/opentelemetry/javaagent/instrumentation/pekkohttp/v1_0/PekkoHttpServerWithActorTest.scala b/instrumentation/pekko/pekko-http-1.0/javaagent/src/test/scala/io/opentelemetry/javaagent/instrumentation/pekkohttp/v1_0/PekkoHttpServerWithActorTest.scala index 4c4dd7cec2cf..4dedd7f06530 100644 --- a/instrumentation/pekko/pekko-http-1.0/javaagent/src/test/scala/io/opentelemetry/javaagent/instrumentation/pekkohttp/v1_0/PekkoHttpServerWithActorTest.scala +++ b/instrumentation/pekko/pekko-http-1.0/javaagent/src/test/scala/io/opentelemetry/javaagent/instrumentation/pekkohttp/v1_0/PekkoHttpServerWithActorTest.scala @@ -7,33 +7,45 @@ package io.opentelemetry.javaagent.instrumentation.pekkohttp.v1_0 import java.net.URI +import io.opentelemetry.instrumentation.testing.junit.http.AbstractHttpServerUsingTest +import io.opentelemetry.instrumentation.test.utils.PortUtils +import io.opentelemetry.testing.internal.armeria.client.{ClientFactory, WebClient} +import io.opentelemetry.testing.internal.armeria.client.logging.LoggingClient import io.opentelemetry.testing.internal.armeria.common.{ AggregatedHttpRequest, + HttpHeaderNames, HttpMethod } import org.assertj.core.api.Assertions.assertThat -import org.junit.jupiter.api.{AfterAll, BeforeAll, Test, TestInstance} +import org.junit.jupiter.api.{AfterAll, Test, TestInstance} @TestInstance(TestInstance.Lifecycle.PER_CLASS) class PekkoHttpServerWithActorTest { - val setup = new PekkoHttpTestSetup() - - @BeforeAll def setupOptions(): Unit = { - PekkoHttpTestWebServerWithActor.start(setup.getPort()) - } + private val port = PortUtils.findOpenPort() + val server = new PekkoHttpTestWebServerWithActor(port) @AfterAll def cleanup(): Unit = { - PekkoHttpTestWebServerWithActor.stop() + server.stop() } + private val minute = java.time.Duration.ofMinutes(1) + private val client = WebClient.builder() + .responseTimeout(minute) + .writeTimeout(minute) + .factory(ClientFactory.builder().connectTimeout(minute).build()) + .setHeader(HttpHeaderNames.USER_AGENT, AbstractHttpServerUsingTest.TEST_USER_AGENT) + .setHeader(HttpHeaderNames.X_FORWARDED_FOR, AbstractHttpServerUsingTest.TEST_CLIENT_IP) + .decorator(LoggingClient.newDecorator()) + .build() + @Test def testSpan(): Unit = { - val address = new URI(s"http://localhost:${setup.getPort()}/") + val address = new URI(s"http://localhost:$port") val request = AggregatedHttpRequest.of( HttpMethod.GET, address.resolve("/test").toString ) - val response = setup.getClient().execute(request).aggregate.join + val response = client.execute(request).aggregate.join assertThat(response.status.code).isEqualTo(200) val responseText = response.contentUtf8 val splits = responseText.split("\n") diff --git a/instrumentation/pekko/pekko-http-1.0/javaagent/src/test/scala/io/opentelemetry/javaagent/instrumentation/pekkohttp/v1_0/PekkoHttpTestWebServerWithActor.scala b/instrumentation/pekko/pekko-http-1.0/javaagent/src/test/scala/io/opentelemetry/javaagent/instrumentation/pekkohttp/v1_0/PekkoHttpTestWebServerWithActor.scala index 17d174281a46..972938d43d89 100644 --- a/instrumentation/pekko/pekko-http-1.0/javaagent/src/test/scala/io/opentelemetry/javaagent/instrumentation/pekkohttp/v1_0/PekkoHttpTestWebServerWithActor.scala +++ b/instrumentation/pekko/pekko-http-1.0/javaagent/src/test/scala/io/opentelemetry/javaagent/instrumentation/pekkohttp/v1_0/PekkoHttpTestWebServerWithActor.scala @@ -16,7 +16,7 @@ import org.apache.pekko.util.Timeout import scala.concurrent.{Await, ExecutionContext} import scala.concurrent.duration.DurationInt -object PekkoHttpTestWebServerWithActor { +class PekkoHttpTestWebServerWithActor(port: Int) { implicit val system: ActorSystem = ActorSystem("http-server-with-actor") // needed for the future flatMap/onComplete in the end implicit val executionContext: ExecutionContext = system.dispatcher @@ -28,9 +28,9 @@ object PekkoHttpTestWebServerWithActor { } } - val spanTestActor = system.actorOf(Props[SpanTestActor]()) + val spanTestActor = system.actorOf(Props(new SpanTestActor)) - var route = get { + val route = get { path("test") { complete { val otelSummary = spanSummary(Span.current()) @@ -42,17 +42,11 @@ object PekkoHttpTestWebServerWithActor { } } - private var binding: ServerBinding = null - - def start(port: Int): Unit = synchronized { - if (null == binding) { - binding = - Await.result(Http().bindAndHandle(route, "localhost", port), 10.seconds) - } - } + var binding: ServerBinding = + Await.result(Http().bindAndHandle(route, "localhost", port), 10.seconds) def stop(): Unit = synchronized { - if (null != binding) { + if (binding != null) { binding.unbind() system.terminate() binding = null From dd5ea77bb9b380642a365a6227c767d687af2267 Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Tue, 8 Oct 2024 15:02:59 +0100 Subject: [PATCH 5/5] Update PekkoHttpTestWebServerWithActor.scala --- .../PekkoHttpTestWebServerWithActor.scala | 31 ++++++++++--------- 1 file changed, 17 insertions(+), 14 deletions(-) diff --git a/instrumentation/pekko/pekko-http-1.0/javaagent/src/test/scala/io/opentelemetry/javaagent/instrumentation/pekkohttp/v1_0/PekkoHttpTestWebServerWithActor.scala b/instrumentation/pekko/pekko-http-1.0/javaagent/src/test/scala/io/opentelemetry/javaagent/instrumentation/pekkohttp/v1_0/PekkoHttpTestWebServerWithActor.scala index 972938d43d89..8ce9935847bd 100644 --- a/instrumentation/pekko/pekko-http-1.0/javaagent/src/test/scala/io/opentelemetry/javaagent/instrumentation/pekkohttp/v1_0/PekkoHttpTestWebServerWithActor.scala +++ b/instrumentation/pekko/pekko-http-1.0/javaagent/src/test/scala/io/opentelemetry/javaagent/instrumentation/pekkohttp/v1_0/PekkoHttpTestWebServerWithActor.scala @@ -16,19 +16,28 @@ import org.apache.pekko.util.Timeout import scala.concurrent.{Await, ExecutionContext} import scala.concurrent.duration.DurationInt -class PekkoHttpTestWebServerWithActor(port: Int) { - implicit val system: ActorSystem = ActorSystem("http-server-with-actor") - // needed for the future flatMap/onComplete in the end - implicit val executionContext: ExecutionContext = system.dispatcher - - private case object TestMessage - private class SpanTestActor extends Actor { +object PekkoHttpTestWebServerWithActor { + case object TestMessage + class SpanTestActor extends Actor { def receive = { case TestMessage => sender() ! spanSummary(Span.current()) } } - val spanTestActor = system.actorOf(Props(new SpanTestActor)) + def spanSummary(span: Span): String = { + val spanId = span.getSpanContext().getSpanId() + val traceId = span.getSpanContext().getTraceId() + s"Span(traceId=$traceId, spanId=$spanId)" + } +} + +class PekkoHttpTestWebServerWithActor(port: Int) { + import PekkoHttpTestWebServerWithActor._ + implicit val system: ActorSystem = ActorSystem("http-server-with-actor") + // needed for the future flatMap/onComplete in the end + implicit val executionContext: ExecutionContext = system.dispatcher + + val spanTestActor = system.actorOf(Props[SpanTestActor]()) val route = get { path("test") { @@ -52,10 +61,4 @@ class PekkoHttpTestWebServerWithActor(port: Int) { binding = null } } - - def spanSummary(span: Span): String = { - val spanId = span.getSpanContext().getSpanId() - val traceId = span.getSpanContext().getTraceId() - s"Span(traceId=$traceId, spanId=$spanId)" - } }