Skip to content

Commit d2ce1e3

Browse files
feat: tracing support for timer calls (#2044)
* feat: tracing support for timer calls * fixing test * Apply suggestions from code review Co-authored-by: Francisco Lopez-Sancho <[email protected]> * fixing compilation --------- Co-authored-by: Francisco Lopez-Sancho <[email protected]>
1 parent 51e37ad commit d2ce1e3

File tree

8 files changed

+71
-27
lines changed

8 files changed

+71
-27
lines changed

samples/java-spring-choreography-saga-quickstart/src/it/java/user/registry/UserCreationIntegrationTest.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
import java.time.Duration;
1010

1111
import static org.assertj.core.api.Assertions.assertThat;
12-
import static org.testcontainers.shaded.org.awaitility.Awaitility.await;
12+
import static org.awaitility.Awaitility.await;
1313

1414
/**
1515
* This is a skeleton for implementing integration tests for a Kalix application built with the Java SDK.
@@ -108,10 +108,7 @@ public void testUserCreationFailureDueToInvalidInput() throws Exception {
108108

109109
await()
110110
.ignoreExceptions()
111-
// timer will fire in 3 seconds and un-reserve the email
112-
// see it/resources/application.conf for the configuration
113-
// we only start to polling after 3 seconds to give the timer a chance to fire
114-
.between(Duration.ofSeconds(3), Duration.ofSeconds(6))
111+
.timeout(Duration.ofSeconds(10)) //3 seconds for the projection lag + 3 seconds for the timer to fire
115112
.untilAsserted(() -> {
116113
assertThat(callGetEmailInfo.execute())
117114
.succeedsWithin(timeout)

sdk/java-sdk-protobuf/src/main/java/kalix/javasdk/action/Action.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ public final TimerScheduler timers() {
103103
ActionContextImpl impl =
104104
(ActionContextImpl)
105105
actionContext("Timers can only be scheduled or cancelled when handling a message.");
106-
return new TimerSchedulerImpl(impl.messageCodec(), impl.system());
106+
return new TimerSchedulerImpl(impl.messageCodec(), impl.system(), impl.componentCallMetadata());
107107
}
108108

109109
/**

sdk/java-sdk-protobuf/src/main/scala/kalix/javasdk/impl/telemetry/Telemetry.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,7 @@ private final class TraceInstrumentation(
190190
if (logger.isTraceEnabled) logger.trace("`traceparent` found")
191191

192192
val context = openTelemetry.getPropagators.getTextMapPropagator
193-
.extract(OtelContext.current(), metadata, otelGetter.asInstanceOf[TextMapGetter[Object]])
193+
.extract(OtelContext.current(), metadata, otelGetter)
194194

195195
val span = openTelemetry
196196
.getTracer("java-sdk")
@@ -217,7 +217,7 @@ private final class TraceInstrumentation(
217217
if (logger.isTraceEnabled) logger.trace("`traceparent` found")
218218

219219
val context = openTelemetry.getPropagators.getTextMapPropagator
220-
.extract(OtelContext.current(), metadata, otelGetter.asInstanceOf[TextMapGetter[Object]])
220+
.extract(OtelContext.current(), metadata, otelGetter)
221221

222222
val span = getTracer()
223223
.spanBuilder(command.name)

sdk/java-sdk-protobuf/src/main/scala/kalix/javasdk/impl/timer/TimerSchedulerImpl.scala

Lines changed: 28 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,22 +19,32 @@ package kalix.javasdk.impl.timer
1919
import java.time.Duration
2020
import java.util.concurrent.CompletionStage
2121

22+
import scala.jdk.CollectionConverters._
2223
import scala.jdk.FutureConverters.FutureOps
2324

2425
import akka.Done
2526
import akka.actor.ActorSystem
27+
import akka.grpc.scaladsl.SingleResponseRequestBuilder
28+
import com.google.protobuf.any.{ Any => ScalaPbAny }
2629
import com.google.protobuf.duration.{ Duration => ProtoDuration }
2730
import com.google.protobuf.wrappers.StringValue
28-
import com.google.protobuf.any.{ Any => ScalaPbAny }
2931
import kalix.javasdk.DeferredCall
30-
import kalix.javasdk.impl.{ GrpcClients, GrpcDeferredCall, MessageCodec, RestDeferredCall }
32+
import kalix.javasdk.Metadata
33+
import kalix.javasdk.impl.GrpcClients
34+
import kalix.javasdk.impl.GrpcDeferredCall
35+
import kalix.javasdk.impl.MessageCodec
36+
import kalix.javasdk.impl.RestDeferredCall
3137
import kalix.javasdk.timer.TimerScheduler
3238
import kalix.timers.timers.Call
3339
import kalix.timers.timers.SingleTimer
3440
import kalix.timers.timers.TimerService
41+
import kalix.timers.timers.TimerServiceClient
3542

3643
/** INTERNAL API */
37-
private[kalix] final class TimerSchedulerImpl(val messageCodec: MessageCodec, val system: ActorSystem)
44+
private[kalix] final class TimerSchedulerImpl(
45+
val messageCodec: MessageCodec,
46+
val system: ActorSystem,
47+
val metadata: Metadata)
3848
extends TimerScheduler {
3949

4050
override def startSingleTimer[I, O](
@@ -48,7 +58,8 @@ private[kalix] final class TimerSchedulerImpl(val messageCodec: MessageCodec, va
4858
delay: Duration,
4959
maxRetries: Int,
5060
deferredCall: DeferredCall[I, O]): CompletionStage[Done] = {
51-
val timerServiceClient = GrpcClients(system).getProxyGrpcClient(classOf[TimerService])
61+
val timerServiceClient =
62+
GrpcClients(system).getProxyGrpcClient(classOf[TimerService]).asInstanceOf[TimerServiceClient]
5263

5364
val call = deferredCall match {
5465
case grpcDeferredCall: GrpcDeferredCall[I, O] =>
@@ -64,13 +75,22 @@ private[kalix] final class TimerSchedulerImpl(val messageCodec: MessageCodec, va
6475
}
6576

6677
val singleTimer = SingleTimer(name, Some(call), Some(ProtoDuration(delay)), maxRetries)
67-
68-
timerServiceClient.addSingle(singleTimer).asJava.thenApply(_ => Done)
78+
addHeaders(timerServiceClient.addSingle(), metadata).invoke(singleTimer).asJava.thenApply(_ => Done)
6979
}
7080

7181
def cancel(name: String): CompletionStage[Done] = {
72-
val timerServiceClient = GrpcClients(system).getProxyGrpcClient(classOf[TimerService])
73-
timerServiceClient.remove(StringValue(name)).asJava.thenApply(_ => Done)
82+
val timerServiceClient =
83+
GrpcClients(system).getProxyGrpcClient(classOf[TimerService]).asInstanceOf[TimerServiceClient]
84+
addHeaders(timerServiceClient.remove(), metadata).invoke(StringValue(name)).asJava.thenApply(_ => Done)
85+
}
86+
87+
private def addHeaders[I, O](
88+
callBuilder: SingleResponseRequestBuilder[I, O],
89+
metadata: Metadata): SingleResponseRequestBuilder[I, O] = {
90+
metadata.asScala.foldLeft(callBuilder) { case (builder, entry) =>
91+
if (entry.isText) builder.addHeader(entry.getKey, entry.getValue)
92+
else builder
93+
}
7494
}
7595

7696
}

sdk/java-sdk-protobuf/src/main/scala/kalix/javasdk/impl/workflow/WorkflowImpl.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -282,7 +282,7 @@ final class WorkflowImpl(system: ActorSystem, val services: Map[String, Workflow
282282
val metadata = new MetadataImpl(command.metadata.map(_.entries.toVector).getOrElse(Nil))
283283

284284
val context = new CommandContextImpl(workflowId, command.name, command.id, metadata, system)
285-
val timerScheduler = new TimerSchedulerImpl(service.messageCodec, system)
285+
val timerScheduler = new TimerSchedulerImpl(service.messageCodec, system, context.componentCallMetadata)
286286

287287
val cmd =
288288
service.messageCodec.decodeMessage(
@@ -306,7 +306,7 @@ final class WorkflowImpl(system: ActorSystem, val services: Map[String, Workflow
306306
case Step(executeStep) =>
307307
val context =
308308
new CommandContextImpl(workflowId, executeStep.stepName, executeStep.commandId, Metadata.EMPTY, system)
309-
val timerScheduler = new TimerSchedulerImpl(service.messageCodec, system)
309+
val timerScheduler = new TimerSchedulerImpl(service.messageCodec, system, context.componentCallMetadata)
310310
val stepResponse =
311311
try {
312312
executeStep.userState.foreach { state =>

sdk/scala-sdk-protobuf/src/main/scala/kalix/scalasdk/action/Action.scala

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,9 @@ import scala.concurrent.Future
2222
import kalix.scalasdk.{ DeferredCall, Metadata, SideEffect }
2323
import kalix.scalasdk.impl.action.ActionEffectImpl
2424
import io.grpc.Status
25+
import kalix.javasdk.impl
2526
import kalix.javasdk.impl.action.ActionContextImpl
27+
import kalix.scalasdk.impl.MetadataImpl
2628
import kalix.scalasdk.impl.action.ScalaActionContextAdapter
2729
import kalix.scalasdk.timer.TimerScheduler
2830
import kalix.scalasdk.impl.timer.TimerSchedulerImpl
@@ -259,7 +261,10 @@ abstract class Action {
259261
s"Incompatible ActionContext instance. Found ${other.getClass}, expecting ${classOf[ActionContextImpl].getName}")
260262
}
261263

262-
new TimerSchedulerImpl(javaActionContextImpl.messageCodec, javaActionContextImpl.system)
264+
new TimerSchedulerImpl(
265+
javaActionContextImpl.messageCodec,
266+
javaActionContextImpl.system,
267+
MetadataImpl(javaActionContextImpl.metadata.asInstanceOf[impl.MetadataImpl]))
263268
}
264269

265270
protected final def effects[T]: Action.Effect.Builder =

sdk/scala-sdk-protobuf/src/main/scala/kalix/scalasdk/impl/timer/TimerSchedulerImpl.scala

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,20 +23,24 @@ import scala.jdk.DurationConverters.ScalaDurationOps
2323

2424
import akka.Done
2525
import akka.actor.ActorSystem
26+
import akka.grpc.scaladsl.SingleResponseRequestBuilder
2627
import com.google.protobuf.duration.{ Duration => ProtoDuration }
2728
import com.google.protobuf.wrappers.StringValue
2829
import kalix.javasdk.impl.GrpcDeferredCall
2930
import kalix.javasdk.impl.GrpcClients
3031
import kalix.javasdk.impl.MessageCodec
3132
import kalix.scalasdk.DeferredCall
33+
import kalix.scalasdk.Metadata
3234
import kalix.scalasdk.impl.ScalaDeferredCallAdapter
3335
import kalix.scalasdk.timer.TimerScheduler
3436
import kalix.timers.timers.Call
3537
import kalix.timers.timers.SingleTimer
3638
import kalix.timers.timers.TimerService
39+
import kalix.timers.timers.TimerServiceClient
3740

3841
/** INTERNAL API */
39-
private[kalix] final class TimerSchedulerImpl(messageCodec: MessageCodec, system: ActorSystem) extends TimerScheduler {
42+
private[kalix] final class TimerSchedulerImpl(messageCodec: MessageCodec, system: ActorSystem, metadata: Metadata)
43+
extends TimerScheduler {
4044

4145
override def startSingleTimer[I, O](
4246
name: String,
@@ -49,7 +53,8 @@ private[kalix] final class TimerSchedulerImpl(messageCodec: MessageCodec, system
4953
delay: FiniteDuration,
5054
maxRetries: Int,
5155
deferredCall: DeferredCall[I, O]): Future[Done] = {
52-
val timerServiceClient = GrpcClients(system).getProxyGrpcClient(classOf[TimerService])
56+
val timerServiceClient =
57+
GrpcClients(system).getProxyGrpcClient(classOf[TimerService]).asInstanceOf[TimerServiceClient]
5358

5459
val deferredCallImpl =
5560
deferredCall match {
@@ -67,12 +72,25 @@ private[kalix] final class TimerSchedulerImpl(messageCodec: MessageCodec, system
6772
Some(messageCodec.encodeScala(deferredCall.message)))
6873

6974
val singleTimer = SingleTimer(name, Some(call), Some(ProtoDuration(delay.toJava)))
70-
timerServiceClient.addSingle(singleTimer).map(_ => Done)(ExecutionContext.parasitic)
71-
75+
addHeaders(timerServiceClient.addSingle(), metadata)
76+
.invoke(singleTimer)
77+
.map(_ => Done)(ExecutionContext.parasitic)
7278
}
7379

7480
override def cancel(name: String): Future[Done] = {
75-
val timerServiceClient = GrpcClients(system).getProxyGrpcClient(classOf[TimerService])
76-
timerServiceClient.remove(StringValue(name)).map(_ => Done)(ExecutionContext.parasitic)
81+
val timerServiceClient =
82+
GrpcClients(system).getProxyGrpcClient(classOf[TimerService]).asInstanceOf[TimerServiceClient]
83+
addHeaders(timerServiceClient.remove(), metadata)
84+
.invoke(StringValue(name))
85+
.map(_ => Done)(ExecutionContext.parasitic)
86+
}
87+
88+
private def addHeaders[I, O](
89+
callBuilder: SingleResponseRequestBuilder[I, O],
90+
metadata: Metadata): SingleResponseRequestBuilder[I, O] = {
91+
metadata.foldLeft(callBuilder) { case (builder, entry) =>
92+
if (entry.isText) builder.addHeader(entry.key, entry.value)
93+
else builder
94+
}
7795
}
7896
}

sdk/scala-sdk-protobuf/src/main/scala/kalix/scalasdk/impl/workflow/WorkflowAdapters.scala

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,11 @@ import scala.jdk.OptionConverters._
2727
import akka.stream.Materializer
2828
import com.google.protobuf.Descriptors
2929
import kalix.javasdk
30+
import kalix.javasdk.impl
3031
import kalix.javasdk.timer.TimerScheduler
31-
import kalix.javasdk.workflow.AbstractWorkflow.RecoverStrategy.MaxRetries
3232
import kalix.scalasdk.impl.InternalContext
3333
import kalix.scalasdk.impl.MetadataConverters
34+
import kalix.scalasdk.impl.MetadataImpl
3435
import kalix.scalasdk.impl.ScalaDeferredCallAdapter
3536
import kalix.scalasdk.impl.timer.TimerSchedulerImpl
3637
import kalix.scalasdk.workflow.AbstractWorkflow
@@ -57,7 +58,10 @@ private[scalasdk] final class JavaWorkflowAdapter[S >: Null](scalaSdkWorkflow: A
5758
override def _internalSetTimerScheduler(timerScheduler: Optional[TimerScheduler]): Unit = {
5859
scalaSdkWorkflow._internalSetTimerScheduler(timerScheduler.toScala.map {
5960
case javaTimerScheduler: kalix.javasdk.impl.timer.TimerSchedulerImpl =>
60-
new TimerSchedulerImpl(javaTimerScheduler.messageCodec, javaTimerScheduler.system)
61+
new TimerSchedulerImpl(
62+
javaTimerScheduler.messageCodec,
63+
javaTimerScheduler.system,
64+
MetadataImpl(javaTimerScheduler.metadata.asInstanceOf[impl.MetadataImpl]))
6165
})
6266
}
6367

0 commit comments

Comments
 (0)