Skip to content

Commit 19857df

Browse files
authored
feat: (java-spring) support multiple subscriptions of the same stream (#2032)
1 parent a46573a commit 19857df

File tree

5 files changed

+86
-16
lines changed

5 files changed

+86
-16
lines changed

samples/java-spring-eventsourced-customer-registry-subscriber/src/it/java/customer/api/CustomersByNameViewIntegrationTest.java

Lines changed: 23 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ public class CustomersByNameViewIntegrationTest extends KalixIntegrationTestKitS
2727
private WebClient webClient;
2828

2929
@Test
30-
public void shouldReturnCustomerByName() {
30+
public void shouldReturnCustomersFromViews() {
3131
IncomingMessages customerEvents = kalixTestKit.getStreamIncomingMessages("customer-registry", "customer_events");
3232

3333
String bob = "bob";
@@ -36,20 +36,28 @@ public void shouldReturnCustomerByName() {
3636

3737
customerEvents.publish(created1, "b");
3838
customerEvents.publish(created2, "a");
39-
39+
4040
await()
41-
.ignoreExceptions()
42-
.atMost(20, TimeUnit.SECONDS)
43-
.pollInterval(1, TimeUnit.SECONDS)
44-
.untilAsserted(() -> {
45-
Customer customer = webClient.get()
46-
.uri("/customers/by_name/" + bob)
47-
.retrieve()
48-
.bodyToFlux(Customer.class)
49-
.blockFirst(timeout);
50-
51-
assertThat(customer).isEqualTo(new Customer("b", created1.email(), created1.name()));
52-
}
53-
);
41+
.ignoreExceptions()
42+
.atMost(20, TimeUnit.SECONDS)
43+
.pollInterval(1, TimeUnit.SECONDS)
44+
.untilAsserted(() -> {
45+
Customer customer = webClient.get()
46+
.uri("/customers/by_name/" + bob)
47+
.retrieve()
48+
.bodyToFlux(Customer.class)
49+
.blockFirst(timeout);
50+
51+
assertThat(customer).isEqualTo(new Customer("b", created1.email(), created1.name()));
52+
53+
Customer customer2 = webClient.get()
54+
.uri("/customers/by_email/" + created2.email())
55+
.retrieve()
56+
.bodyToFlux(Customer.class)
57+
.blockFirst(timeout);
58+
59+
assertThat(customer2).isEqualTo(new Customer("a", created2.email(), created2.name()));
60+
}
61+
);
5462
}
5563
}
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
package customer.views;
2+
3+
import kalix.javasdk.annotations.Acl;
4+
import kalix.javasdk.annotations.Query;
5+
import kalix.javasdk.annotations.Subscribe;
6+
import kalix.javasdk.annotations.Table;
7+
import kalix.javasdk.view.View;
8+
import org.slf4j.Logger;
9+
import org.slf4j.LoggerFactory;
10+
import org.springframework.web.bind.annotation.GetMapping;
11+
import org.springframework.web.bind.annotation.PathVariable;
12+
import reactor.core.publisher.Flux;
13+
14+
// tag::view[]
15+
@Table("customers_by_email")
16+
@Subscribe.Stream( // <1>
17+
service = "customer-registry", // <2>
18+
id = "customer_events", // <3>
19+
consumerGroup = "customer-by-email-view" // <4>
20+
)
21+
public class CustomersByEmailView extends View<Customer> {
22+
// end::view[]
23+
private static final Logger logger = LoggerFactory.getLogger(CustomersByEmailView.class);
24+
// tag::view[]
25+
26+
public UpdateEffect<Customer> onEvent( // <4>
27+
CustomerPublicEvent.Created created) {
28+
// end::view[]
29+
logger.info("Received: {}", created);
30+
// tag::view[]
31+
var id = updateContext().eventSubject().get();
32+
return effects().updateState(
33+
new Customer(id, created.email(), created.name()));
34+
}
35+
36+
public UpdateEffect<Customer> onEvent(
37+
CustomerPublicEvent.NameChanged nameChanged) {
38+
// end::view[]
39+
logger.info("Received: {}", nameChanged);
40+
// tag::view[]
41+
var updated = viewState().withName(nameChanged.newName());
42+
return effects().updateState(updated);
43+
}
44+
45+
@GetMapping("/customers/by_email/{email}")
46+
@Query("SELECT * FROM customers_by_email WHERE email = :email")
47+
@Acl(allow = @Acl.Matcher(principal = Acl.Principal.INTERNET))
48+
public Flux<Customer> findByName(@PathVariable String email) {
49+
return null;
50+
}
51+
52+
}
53+
// end::view[]

samples/java-spring-eventsourced-customer-registry-subscriber/src/main/java/customer/views/CustomersByNameView.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,8 @@
1515
@Table("customers_by_name")
1616
@Subscribe.Stream( // <1>
1717
service = "customer-registry", // <2>
18-
id = "customer_events" // <3>
18+
id = "customer_events", // <3>
19+
consumerGroup = "customer-by-name-view"
1920
)
2021
public class CustomersByNameView extends View<Customer> {
2122
// end::view[]

sdk/java-sdk-spring/src/main/java/kalix/javasdk/annotations/Subscribe.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,13 @@
152152
*/
153153
String service();
154154

155+
/**
156+
* In case you need to consume the same stream multiple times, each subscription should have a unique consumer group.
157+
* <p>
158+
* Changing the consumer group will lead to starting over from the beginning of the stream.
159+
*/
160+
String consumerGroup() default "";
161+
155162
/**
156163
* When there is no method in the class whose input type matches the event type:
157164
* <ul>

sdk/java-sdk-spring/src/main/scala/kalix/javasdk/impl/ComponentDescriptorFactory.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -342,6 +342,7 @@ private[impl] object ComponentDescriptorFactory {
342342
val in = EventSource
343343
.newBuilder()
344344
.setDirect(direct)
345+
.setConsumerGroup(streamAnn.consumerGroup())
345346

346347
val eventing =
347348
ServiceEventing

0 commit comments

Comments
 (0)