Skip to content

Commit c47b4e4

Browse files
authored
fix: handling workflow step call expections (#2376)
1 parent 8ce0eec commit c47b4e4

File tree

1 file changed

+40
-30
lines changed

1 file changed

+40
-30
lines changed

sdk/java-sdk-protobuf/src/main/scala/kalix/javasdk/impl/workflow/WorkflowRouter.scala

Lines changed: 40 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,14 @@ import java.nio.ByteBuffer
88
import java.util.Optional
99
import java.util.concurrent.CompletionStage
1010
import java.util.function.{ Function => JFunc }
11+
1112
import scala.concurrent.ExecutionContext
1213
import scala.concurrent.Future
1314
import scala.jdk.CollectionConverters._
1415
import scala.jdk.OptionConverters.RichOptional
1516
import scala.jdk.FutureConverters._
17+
import scala.util.Try
18+
1619
import com.google.api.HttpBody
1720
import com.google.protobuf.any.{ Any => ScalaPbAny }
1821
import kalix.javasdk.DeferredCall
@@ -176,44 +179,51 @@ abstract class WorkflowRouter[S, W <: AbstractWorkflow[S]](protected val workflo
176179
case None => null // to meet a signature of supplier expressed as a function
177180
}
178181

179-
val defCall = call.callFunc
180-
.asInstanceOf[JFunc[Any, DeferredCall[Any, Any]]]
181-
.apply(decodedInput)
182-
183-
val (commandName, serviceName) =
184-
defCall match {
185-
case grpcDefCall: GrpcDeferredCall[_, _] =>
186-
(grpcDefCall.methodName, grpcDefCall.fullServiceName)
187-
case restDefCall: RestDeferredCall[_, _] =>
188-
(restDefCall.methodName, restDefCall.fullServiceName)
189-
case _ =>
190-
// should never happen, but needs to make compiler happy
191-
throw new IllegalStateException("Unknown DeferredCall implementation")
182+
Future
183+
.fromTry(Try {
184+
call.callFunc
185+
.asInstanceOf[JFunc[Any, DeferredCall[Any, Any]]]
186+
.apply(decodedInput)
187+
})
188+
.map { defCall =>
189+
190+
val (commandName, serviceName) =
191+
defCall match {
192+
case grpcDefCall: GrpcDeferredCall[_, _] =>
193+
(grpcDefCall.methodName, grpcDefCall.fullServiceName)
194+
case restDefCall: RestDeferredCall[_, _] =>
195+
(restDefCall.methodName, restDefCall.fullServiceName)
196+
case _ =>
197+
// should never happen, but needs to make compiler happy
198+
throw new IllegalStateException("Unknown DeferredCall implementation")
199+
}
200+
201+
val stepDefCall =
202+
StepDeferredCall(
203+
serviceName,
204+
commandName,
205+
payload = Some(messageCodec.encodeScala(defCall.message())),
206+
metadata = MetadataImpl.toProtocol(defCall.metadata()))
207+
208+
StepResponse(commandId, stepName, StepResponse.Response.DeferredCall(stepDefCall))
209+
}
210+
.recover { case t: Throwable =>
211+
log.error("Workflow call failed.", t)
212+
StepResponse(commandId, stepName, StepResponse.Response.ExecutionFailed(StepExecutionFailed(t.getMessage)))
192213
}
193-
194-
val stepDefCall =
195-
StepDeferredCall(
196-
serviceName,
197-
commandName,
198-
payload = Some(messageCodec.encodeScala(defCall.message())),
199-
metadata = MetadataImpl.toProtocol(defCall.metadata()))
200-
201-
Future.successful {
202-
StepResponse(commandId, stepName, StepResponse.Response.DeferredCall(stepDefCall))
203-
}
204214

205215
case Some(call: AsyncCallStep[_, _, _]) =>
206216
val decodedInput = input match {
207217
case Some(inputValue) => decodeInput(messageCodec, inputValue, call.callInputClass)
208218
case None => null // to meet a signature of supplier expressed as a function
209219
}
210220

211-
val future = call.callFunc
212-
.asInstanceOf[JFunc[Any, CompletionStage[Any]]]
213-
.apply(decodedInput)
214-
.asScala
215-
216-
future
221+
Future {
222+
call.callFunc
223+
.asInstanceOf[JFunc[Any, CompletionStage[Any]]]
224+
.apply(decodedInput)
225+
.asScala
226+
}.flatten
217227
.map { res =>
218228
val encoded = messageCodec.encodeScala(res)
219229
val executedRes = StepExecuted(Some(encoded))

0 commit comments

Comments
 (0)