Skip to content

Commit a50a794

Browse files
authored
fix: Ensure streaming calls from proxy are parallel #1078 (#1081)
Calls to stream out methods from the same client are sequential with Akka HTTP/gRPC by default, this adds an async boundary to allow them to run in parallel.
1 parent 8801c9f commit a50a794

File tree

5 files changed

+5
-0
lines changed

5 files changed

+5
-0
lines changed

sdk/java-sdk/src/main/scala/kalix/javasdk/impl/action/ActionsImpl.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -266,6 +266,7 @@ private[javasdk] final class ActionsImpl(
266266
// user stream failed with an "unexpected" error
267267
handleUnexpectedException(service, in, ex)
268268
}
269+
.async
269270
} catch {
270271
case NonFatal(ex) =>
271272
// command handler threw an "unexpected" error

sdk/java-sdk/src/main/scala/kalix/javasdk/impl/eventsourcedentity/EventSourcedEntitiesImpl.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -247,6 +247,7 @@ final class EventSourcedEntitiesImpl(
247247
EventSourcedStreamOut(OutFailure(Failure(description = s"Unexpected failure [$correlationId]")))
248248
}
249249
}
250+
.async
250251
}
251252

252253
private class CommandContextImpl(

sdk/java-sdk/src/main/scala/kalix/javasdk/impl/replicatedentity/ReplicatedEntitiesImpl.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@ final class ReplicatedEntitiesImpl(system: ActorSystem, services: Map[String, Re
100100
ReplicatedEntityStreamOut(Out.Failure(Failure(description = s"Unexpected error [$correlationId]")))
101101
}
102102
}
103+
.async
103104

104105
private def runEntity(
105106
init: ReplicatedEntityInit): Flow[ReplicatedEntityStreamIn, ReplicatedEntityStreamOut, NotUsed] = {

sdk/java-sdk/src/main/scala/kalix/javasdk/impl/valueentity/ValueEntitiesImpl.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,7 @@ final class ValueEntitiesImpl(system: ActorSystem, val services: Map[String, Val
114114
ValueEntityStreamOut(OutFailure(Failure(description = s"Unexpected error [$correlationId]")))
115115
}
116116
}
117+
.async
117118

118119
private def runEntity(init: ValueEntityInit): Flow[ValueEntityStreamIn, ValueEntityStreamOut, NotUsed] = {
119120
val service =

sdk/java-sdk/src/main/scala/kalix/javasdk/impl/view/ViewsImpl.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,7 @@ final class ViewsImpl(system: ActorSystem, _services: Map[String, ViewService],
150150
s"Kalix protocol failure: expected ReceiveEvent message, but got ${other.getClass.getName}"
151151
Source.failed(new RuntimeException(errMsg))
152152
}
153+
.async
153154

154155
private final class UpdateContextImpl(
155156
override val viewId: String,

0 commit comments

Comments
 (0)