Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.pekkohttp.v1_0

import java.net.URI

import io.opentelemetry.instrumentation.testing.junit.http.AbstractHttpServerUsingTest
import io.opentelemetry.instrumentation.test.utils.PortUtils
import io.opentelemetry.testing.internal.armeria.client.{ClientFactory, WebClient}
import io.opentelemetry.testing.internal.armeria.client.logging.LoggingClient
import io.opentelemetry.testing.internal.armeria.common.{
AggregatedHttpRequest,
HttpHeaderNames,
HttpMethod
}
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.{AfterAll, Test, TestInstance}

@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class PekkoHttpServerWithActorTest {

private val port = PortUtils.findOpenPort()
val server = new PekkoHttpTestWebServerWithActor(port)

@AfterAll def cleanup(): Unit = {
server.stop()
}

private val minute = java.time.Duration.ofMinutes(1)
private val client = WebClient.builder()
.responseTimeout(minute)
.writeTimeout(minute)
.factory(ClientFactory.builder().connectTimeout(minute).build())
.setHeader(HttpHeaderNames.USER_AGENT, AbstractHttpServerUsingTest.TEST_USER_AGENT)
.setHeader(HttpHeaderNames.X_FORWARDED_FOR, AbstractHttpServerUsingTest.TEST_CLIENT_IP)
.decorator(LoggingClient.newDecorator())
.build()

@Test def testSpan(): Unit = {
val address = new URI(s"http://localhost:$port")
val request = AggregatedHttpRequest.of(
HttpMethod.GET,
address.resolve("/test").toString
)
val response = client.execute(request).aggregate.join
assertThat(response.status.code).isEqualTo(200)
val responseText = response.contentUtf8
val splits = responseText.split("\n")
assertThat(splits.length).isEqualTo(2)
val routeSpan = splits(0).substring(6, splits(0).length)
val actorSpan = splits(1).substring(6, splits(1).length)
assertThat(routeSpan).isEqualTo(actorSpan)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.pekkohttp.v1_0

import io.opentelemetry.api.trace.Span
import org.apache.pekko.actor.{Actor, ActorSystem, Props}
import org.apache.pekko.http.scaladsl.Http
import org.apache.pekko.http.scaladsl.Http.ServerBinding
import org.apache.pekko.http.scaladsl.server.Directives._
import org.apache.pekko.pattern.ask
import org.apache.pekko.util.Timeout

import scala.concurrent.{Await, ExecutionContext}
import scala.concurrent.duration.DurationInt

object PekkoHttpTestWebServerWithActor {
case object TestMessage
class SpanTestActor extends Actor {
def receive = { case TestMessage =>
sender() ! spanSummary(Span.current())
}
}

def spanSummary(span: Span): String = {
val spanId = span.getSpanContext().getSpanId()
val traceId = span.getSpanContext().getTraceId()
s"Span(traceId=$traceId, spanId=$spanId)"
}
}

class PekkoHttpTestWebServerWithActor(port: Int) {
import PekkoHttpTestWebServerWithActor._
implicit val system: ActorSystem = ActorSystem("http-server-with-actor")
// needed for the future flatMap/onComplete in the end
implicit val executionContext: ExecutionContext = system.dispatcher

val spanTestActor = system.actorOf(Props[SpanTestActor]())

val route = get {
path("test") {
complete {
val otelSummary = spanSummary(Span.current())
spanTestActor.ask(TestMessage)(Timeout(5.seconds)).mapTo[String].map {
actorSummary =>
s"Route=$otelSummary\nActor=$actorSummary"
}
}
}
}

var binding: ServerBinding =
Await.result(Http().bindAndHandle(route, "localhost", port), 10.seconds)

def stop(): Unit = synchronized {
if (binding != null) {
binding.unbind()
system.terminate()
binding = null
}
}
}
Loading