Skip to content

Commit e9d6bb3

Browse files
feat: Allow multiple subscription to topics or EventSourced Entities' events (#1123)
* All functionality in place
1 parent 2ee0ee7 commit e9d6bb3

23 files changed

+387
-95
lines changed

sdk/spring-sdk/src/it/java/com/example/wiring/SpringSdkWiringIntegrationTest.java

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package com.example.wiring;
1818

19+
import akka.http.javadsl.model.StatusCode;
1920
import com.example.Main;
2021
import com.example.wiring.actions.echo.Message;
2122
import com.example.wiring.eventsourcedentities.counter.Counter;
@@ -96,6 +97,32 @@ public void verifyStreamActions() {
9697
Assertions.assertEquals(3, messageList.size());
9798
}
9899

100+
@Test
101+
public void verifyCounterEventSourceSubscription() {
102+
// GIVEN IncreaseAction is subscribed to CounterEntity events
103+
// WHEN the CounterEntity is requested to increase 42
104+
webClient
105+
.post()
106+
.uri("/counter/hello1/increase/42")
107+
.retrieve()
108+
.bodyToMono(Integer.class)
109+
.block(timeout);
110+
111+
// THEN IncreaseAction receives the event 42 and increases the counter 1 more
112+
await()
113+
.ignoreExceptions()
114+
.atMost(10, TimeUnit.of(SECONDS))
115+
.until(
116+
() ->
117+
webClient
118+
.get()
119+
.uri("/counter/hello1")
120+
.retrieve()
121+
.bodyToMono(Integer.class)
122+
.block(timeout),
123+
new IsEqual(42 + 1));
124+
}
125+
99126
@Test
100127
public void verifyCounterEventSourcedWiring() {
101128

@@ -154,6 +181,43 @@ public void verifyFindCounterByValue() {
154181
new IsEqual<Integer>(10));
155182
}
156183

184+
@Test
185+
public void verifyCounterViewMultipleSubscriptions() throws InterruptedException {
186+
ResponseEntity<Integer> response1 =
187+
webClient
188+
.post()
189+
.uri("/counter/hello2/increase/1")
190+
.retrieve()
191+
.toEntity(Integer.class)
192+
.block(timeout);
193+
194+
Assertions.assertEquals(HttpStatus.OK, response1.getStatusCode());
195+
ResponseEntity<Integer> response2 =
196+
webClient
197+
.post()
198+
.uri("/counter/hello3/increase/1")
199+
.retrieve()
200+
.toEntity(Integer.class)
201+
.block(timeout);
202+
203+
Assertions.assertEquals(HttpStatus.OK, response2.getStatusCode());
204+
205+
await()
206+
.ignoreExceptions()
207+
.atMost(20, TimeUnit.SECONDS)
208+
.until(
209+
() ->
210+
webClient
211+
.get()
212+
.uri("/counters-ms/by-value/1")
213+
.retrieve()
214+
.bodyToFlux(Counter.class)
215+
.toStream()
216+
.collect(Collectors.toList())
217+
.size(),
218+
new IsEqual<>(2));
219+
}
220+
157221
@Test
158222
public void verifyTransformedUserViewWiring() throws InterruptedException {
159223

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
* Copyright 2021 Lightbend Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.example.wiring.eventsourcedentities.counter;
18+
19+
import kalix.javasdk.action.Action;
20+
import kalix.javasdk.action.ActionCreationContext;
21+
import kalix.springsdk.KalixClient;
22+
import kalix.springsdk.KalixConfigurationTest;
23+
import kalix.springsdk.annotations.Subscribe;
24+
import org.springframework.context.annotation.Import;
25+
26+
import java.util.concurrent.CompletableFuture;
27+
import java.util.concurrent.CompletionStage;
28+
29+
@Import(KalixConfigurationTest.class)
30+
public class IncreaseAction extends Action {
31+
32+
private KalixClient kalixClient;
33+
34+
private ActionCreationContext context;
35+
36+
public IncreaseAction(KalixClient kalixClient, ActionCreationContext context) {
37+
this.kalixClient = kalixClient;
38+
this.context = context;
39+
}
40+
41+
@Subscribe.EventSourcedEntity(value = CounterEntity.class)
42+
public Effect<ValueMultiplied> printMultiply(ValueMultiplied event) {
43+
return effects().reply(event);
44+
}
45+
46+
@Subscribe.EventSourcedEntity(value = CounterEntity.class)
47+
public Effect<Integer> printIncrease(ValueIncreased event) {
48+
String entityId = this.actionContext().metadata().asCloudEvent().subject().get();
49+
if (event.value == 42) {
50+
CompletionStage<Integer> res =
51+
kalixClient.post("/counter/" + entityId + "/increase/1", "", Integer.class).execute();
52+
return effects().asyncReply(res);
53+
}
54+
return effects().reply(event.value);
55+
}
56+
}

sdk/spring-sdk/src/it/java/com/example/wiring/eventsourcedentities/counter/ValueMultiplied.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ public class ValueMultiplied implements CounterEvent {
2424
public final int value;
2525

2626
@JsonCreator
27-
public ValueMultiplied(@JsonProperty Integer value) {
27+
public ValueMultiplied(@JsonProperty("value") int value) {
2828
this.value = value;
2929
}
3030
}
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Copyright 2021 Lightbend Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.example.wiring.views;
18+
19+
import com.example.wiring.eventsourcedentities.counter.*;
20+
import kalix.javasdk.view.View;
21+
import kalix.springsdk.annotations.Query;
22+
import kalix.springsdk.annotations.Subscribe;
23+
import kalix.springsdk.annotations.Table;
24+
import org.springframework.web.bind.annotation.GetMapping;
25+
import org.springframework.web.bind.annotation.PathVariable;
26+
import reactor.core.publisher.Flux;
27+
28+
// With Multiple Subscriptions
29+
@Table("counters_by_value_ms")
30+
public class CountersByValueSubscriptions extends View<Counter> {
31+
32+
@Override
33+
public Counter emptyState() {
34+
return new Counter(0);
35+
}
36+
37+
@GetMapping("/counters-ms/by-value/{value}")
38+
@Query("SELECT * FROM counters_by_value_ms WHERE value = :value")
39+
public Flux<Counter> getCounterByValue(@PathVariable Integer value) {
40+
return null;
41+
}
42+
43+
@Subscribe.EventSourcedEntity(CounterEntity.class)
44+
public UpdateEffect<Counter> onEvent(Counter counter, ValueIncreased event) {
45+
return effects().updateState(counter.onValueIncreased(event));
46+
}
47+
48+
@Subscribe.EventSourcedEntity(CounterEntity.class)
49+
public UpdateEffect<Counter> onEvent(Counter counter, ValueMultiplied event) {
50+
return effects().updateState(counter.onValueMultiplied(event));
51+
}
52+
}

sdk/spring-sdk/src/main/scala/kalix/springsdk/impl/ActionDescriptorFactory.scala

Lines changed: 25 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,12 @@ import kalix.springsdk.impl.ComponentDescriptorFactory.hasTopicPublication
2525
import kalix.springsdk.impl.ComponentDescriptorFactory.hasTopicSubscription
2626
import kalix.springsdk.impl.ComponentDescriptorFactory.hasValueEntitySubscription
2727
import kalix.springsdk.impl.ComponentDescriptorFactory.validateRestMethod
28+
import kalix.springsdk.impl.reflection.CombinedSubscriptionServiceMethod
2829
import kalix.springsdk.impl.reflection.KalixMethod
2930
import kalix.springsdk.impl.reflection.NameGenerator
3031
import kalix.springsdk.impl.reflection.ReflectionUtils
3132
import kalix.springsdk.impl.reflection.RestServiceIntrospector
32-
import kalix.springsdk.impl.reflection.RestServiceMethod
33+
import kalix.springsdk.impl.reflection.SubscriptionServiceMethod
3334

3435
private[impl] object ActionDescriptorFactory extends ComponentDescriptorFactory {
3536

@@ -51,7 +52,7 @@ private[impl] object ActionDescriptorFactory extends ComponentDescriptorFactory
5152
val kalixOptions =
5253
kalix.MethodOptions.newBuilder().setEventing(subscriptionOptions).build()
5354

54-
KalixMethod(RestServiceMethod(method))
55+
KalixMethod(SubscriptionServiceMethod(method))
5556
.withKalixOptions(kalixOptions)
5657
}
5758

@@ -63,7 +64,7 @@ private[impl] object ActionDescriptorFactory extends ComponentDescriptorFactory
6364
val kalixOptions =
6465
kalix.MethodOptions.newBuilder().setEventing(subscriptionOptions).build()
6566

66-
KalixMethod(RestServiceMethod(method))
67+
KalixMethod(SubscriptionServiceMethod(method))
6768
.withKalixOptions(kalixOptions)
6869
}
6970

@@ -75,24 +76,35 @@ private[impl] object ActionDescriptorFactory extends ComponentDescriptorFactory
7576
val kalixOptions =
7677
kalix.MethodOptions.newBuilder().setEventing(subscriptionOptions).build()
7778

78-
KalixMethod(RestServiceMethod(method))
79+
KalixMethod(SubscriptionServiceMethod(method))
7980
.withKalixOptions(kalixOptions)
8081
}
8182

82-
def checkNotTopicDuplication(subscriptions: Seq[KalixMethod]): Seq[KalixMethod] = {
83+
def combineByTopic(subscriptions: Seq[KalixMethod]): Seq[KalixMethod] = {
8384
def groupByTopic(methods: Seq[KalixMethod]): Map[String, Seq[KalixMethod]] = {
8485
val withTopicIn = methods.filter(kalixMethod =>
8586
kalixMethod.methodOptions.exists(option =>
8687
option.hasEventing && option.getEventing.hasIn && option.getEventing.getIn.hasTopic))
87-
//Assuming there is only one topic, therefore head is as good as any other
88+
//Assuming there is only one topic annotation per method, therefore head is as good as any other
8889
withTopicIn.groupBy(m => m.methodOptions.head.getEventing.getIn.getTopic)
8990
}
9091
groupByTopic(subscriptions).collect {
9192
case (topic, kMethods) if kMethods.size > 1 =>
92-
throw InvalidComponentException(
93-
s"topic: '$topic' it is used in multiple @Subscription.Topic annotations. Each @Subscription.Topic must point to a different topic")
94-
}
95-
subscriptions
93+
val typeUrl2Methods: Seq[TypeUrl2Method] = kMethods.map { k =>
94+
TypeUrl2Method(
95+
k.serviceMethod.javaMethodOpt.get.getParameterTypes()(0).getName,
96+
k.serviceMethod.javaMethodOpt.get)
97+
}
98+
99+
KalixMethod(
100+
CombinedSubscriptionServiceMethod(
101+
"KalixSyntheticMethodOnTopic" + topic.capitalize,
102+
kMethods.head.serviceMethod.asInstanceOf[SubscriptionServiceMethod],
103+
typeUrl2Methods))
104+
.withKalixOptions(kMethods.head.methodOptions)
105+
case (topic, kMethod +: Nil) =>
106+
kMethod
107+
}.toSeq
96108
}
97109

98110
val publicationTopicMethods = component.getMethods
@@ -103,7 +115,7 @@ private[impl] object ActionDescriptorFactory extends ComponentDescriptorFactory
103115
val kalixOptions =
104116
kalix.MethodOptions.newBuilder().setEventing(publicationOptions).build()
105117

106-
KalixMethod(RestServiceMethod(method))
118+
KalixMethod(SubscriptionServiceMethod(method))
107119
.withKalixOptions(kalixOptions)
108120
}
109121
val serviceName = nameGenerator.getName(component.getSimpleName)
@@ -135,8 +147,8 @@ private[impl] object ActionDescriptorFactory extends ComponentDescriptorFactory
135147
component.getPackageName,
136148
filterAndAddKalixOptions(springAnnotatedMethods, publicationTopicMethods)
137149
++ subscriptionValueEntityMethods
138-
++ subscriptionEventSourcedEntityMethods
139-
++ checkNotTopicDuplication(subscriptionTopicMethods)
150+
++ combineByES(subscriptionEventSourcedEntityMethods)
151+
++ combineByTopic(subscriptionTopicMethods)
140152
++ removeDuplicates(springAnnotatedMethods, publicationTopicMethods))
141153
}
142154
}

sdk/spring-sdk/src/main/scala/kalix/springsdk/impl/ComponentDescriptor.scala

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,7 @@ import com.google.protobuf.DescriptorProtos.ServiceDescriptorProto
3131
import com.google.protobuf.Descriptors
3232
import com.google.protobuf.Descriptors.FileDescriptor
3333
import com.google.protobuf.{ Any => JavaPbAny }
34-
import kalix.springsdk.annotations.Entity
35-
import kalix.springsdk.annotations.Table
34+
import kalix.springsdk.impl.reflection.CombinedSubscriptionServiceMethod
3635
import kalix.springsdk.impl.reflection.{
3736
AnyServiceMethod,
3837
DynamicMessageContext,
@@ -50,8 +49,10 @@ import kalix.springsdk.impl.reflection.RestServiceIntrospector.HeaderParameter
5049
import kalix.springsdk.impl.reflection.RestServiceIntrospector.PathParameter
5150
import kalix.springsdk.impl.reflection.RestServiceIntrospector.QueryParamParameter
5251
import kalix.springsdk.impl.reflection.RestServiceIntrospector.UnhandledParameter
52+
import kalix.springsdk.impl.reflection.SubscriptionServiceMethod
5353
import org.springframework.web.bind.annotation.RequestMethod
5454

55+
import java.lang.reflect.Method
5556
import scala.jdk.CollectionConverters.CollectionHasAsScala
5657

5758
/**
@@ -176,13 +177,27 @@ private[impl] object ComponentDescriptor {
176177
}.toArray
177178
} else Array.empty
178179

179-
ComponentMethod(serviceMethod.javaMethodOpt, grpcMethodName, parameterExtractors, message)
180-
180+
ComponentMethod(
181+
grpcMethodName,
182+
parameterExtractors,
183+
message,
184+
Seq(TypeUrl2Method(method.javaMethod.getName, method.javaMethod)))
185+
case method: CombinedSubscriptionServiceMethod =>
186+
val parameterExtractors: ParameterExtractors =
187+
method.typeUrl2Method
188+
.flatMap(each =>
189+
each.method.getParameterTypes.map(param => new ParameterExtractors.AnyBodyExtractor[AnyRef](param)))
190+
.toArray
191+
ComponentMethod(grpcMethodName, parameterExtractors, JavaPbAny.getDescriptor, method.typeUrl2Method)
181192
case method: AnyServiceMethod =>
182193
// methods that receive Any as input always default to AnyBodyExtractor
183194
val parameterExtractors: ParameterExtractors = Array(
184195
new ParameterExtractors.AnyBodyExtractor(method.inputType))
185-
ComponentMethod(serviceMethod.javaMethodOpt, grpcMethodName, parameterExtractors, JavaPbAny.getDescriptor)
196+
val typeUrl2method = serviceMethod.javaMethodOpt match {
197+
case Some(m) => Seq(TypeUrl2Method(m.getName, m))
198+
case None => Nil
199+
}
200+
ComponentMethod(grpcMethodName, parameterExtractors, JavaPbAny.getDescriptor, typeUrl2method)
186201
}
187202

188203
}

sdk/spring-sdk/src/main/scala/kalix/springsdk/impl/ComponentDescriptorFactory.scala

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,36 @@ private[impl] trait ComponentDescriptorFactory {
131131
* Inspect the component class (type), validate the annotations/methods and build a component descriptor for it.
132132
*/
133133
def buildDescriptorFor(componentClass: Class[_], nameGenerator: NameGenerator): ComponentDescriptor
134+
135+
def combineByES(subscriptions: Seq[KalixMethod]): Seq[KalixMethod] = {
136+
def groupByES(methods: Seq[KalixMethod]): Map[String, Seq[KalixMethod]] = {
137+
val withEventSourcedIn = methods.filter(kalixMethod =>
138+
kalixMethod.methodOptions.exists(option =>
139+
option.hasEventing && option.getEventing.hasIn && option.getEventing.getIn.hasEventSourcedEntity))
140+
//Assuming there is only one eventing.in annotation per method, therefore head is as good as any other
141+
withEventSourcedIn.groupBy(m => m.methodOptions.head.getEventing.getIn.getEventSourcedEntity)
142+
}
143+
groupByES(subscriptions).collect {
144+
case (eventSourcedEntity, kMethods) if kMethods.size > 1 =>
145+
val typeUrl2Method: Seq[TypeUrl2Method] = kMethods.map { k =>
146+
val methodParameterTypes = k.serviceMethod.javaMethodOpt.get.getParameterTypes();
147+
val eventParameter = methodParameterTypes(methodParameterTypes.size - 1)
148+
// it is safe to pick the last parameter. An action has one and View has two. In the View always the last is the event
149+
TypeUrl2Method(
150+
kalix.javasdk.JsonSupport.KALIX_JSON
151+
+ SpringSdkMessageCodec.findTypeHint(eventParameter),
152+
k.serviceMethod.javaMethodOpt.get)
153+
}
154+
KalixMethod(
155+
CombinedSubscriptionServiceMethod(
156+
"KalixSyntheticMethodOnES" + eventSourcedEntity.capitalize,
157+
kMethods.head.serviceMethod.asInstanceOf[SubscriptionServiceMethod],
158+
typeUrl2Method))
159+
.withKalixOptions(kMethods.head.methodOptions)
160+
case (eventSourcedEntity, kMethod +: Nil) =>
161+
kMethod
162+
}.toSeq
163+
}
134164
}
135165

136166
/**

0 commit comments

Comments
 (0)