diff --git a/.gitignore b/.gitignore
index 2e6216ac4f..3296876876 100644
--- a/.gitignore
+++ b/.gitignore
@@ -51,3 +51,4 @@ sdk/java-sdk-spring/tempo-data
sdk-version-raw.txt
sdk-version.txt
sbt-plugin/src/sbt-test/sbt-kalix/*/global/
+.claude/settings.local.json
diff --git a/codegen/core/src/main/scala/kalix/codegen/ModelBuilder.scala b/codegen/core/src/main/scala/kalix/codegen/ModelBuilder.scala
index 21f4f01651..9a57800a21 100644
--- a/codegen/core/src/main/scala/kalix/codegen/ModelBuilder.scala
+++ b/codegen/core/src/main/scala/kalix/codegen/ModelBuilder.scala
@@ -326,6 +326,7 @@ object ModelBuilder {
outToTopic: Boolean,
ignore: Boolean,
handleDeletes: Boolean,
+ startFromSnapshots: Boolean,
viewTable: String) {
def isUnary: Boolean = !streamedInput && !streamedOutput
@@ -350,6 +351,7 @@ object ModelBuilder {
outToTopic = eventing.hasOut && eventing.getOut.hasTopic,
ignore = eventing.hasIn && eventing.getIn.getIgnore,
handleDeletes = eventing.hasIn && eventing.getIn.getHandleDeletes,
+ startFromSnapshots = eventing.hasIn && eventing.getIn.getStartFromSnapshots,
viewUpdate.getTable)
}
}
diff --git a/codegen/java-gen/src/main/scala/kalix/codegen/java/ActionServiceSourceGenerator.scala b/codegen/java-gen/src/main/scala/kalix/codegen/java/ActionServiceSourceGenerator.scala
index b43678a478..416877b246 100644
--- a/codegen/java-gen/src/main/scala/kalix/codegen/java/ActionServiceSourceGenerator.scala
+++ b/codegen/java-gen/src/main/scala/kalix/codegen/java/ActionServiceSourceGenerator.scala
@@ -70,6 +70,12 @@ object ActionServiceSourceGenerator {
|public Effect<$outputType> ${lowerFirst(methodName)}() {
| throw new RuntimeException("The delete handler for `$methodName` is not implemented, yet");
|}""".stripMargin
+ } else if (cmd.startFromSnapshots) {
+ s"""|@Override
+ |public Effect<$outputType> ${lowerFirst(methodName)}($inputTypeFullName $input) {
+ | // Snapshot handler processes entity state before any events
+ | ${jsonTopicHint}throw new RuntimeException("The snapshot handler for `$methodName` is not implemented, yet");
+ |}""".stripMargin
} else {
s"""|@Override
|public Effect<$outputType> ${lowerFirst(methodName)}($inputTypeFullName $input) {
@@ -131,6 +137,9 @@ object ActionServiceSourceGenerator {
if (cmd.isUnary) {
if (cmd.handleDeletes) {
s"""|public abstract Effect<$outputType> ${lowerFirst(methodName)}();""".stripMargin
+ } else if (cmd.startFromSnapshots) {
+ s"""|/** Snapshot handler for processing entity snapshots. */
+ |public abstract Effect<$outputType> ${lowerFirst(methodName)}($inputTypeFullName $input);""".stripMargin
} else {
s"""|public abstract Effect<$outputType> ${lowerFirst(methodName)}($inputTypeFullName $input);""".stripMargin
}
@@ -180,6 +189,11 @@ object ActionServiceSourceGenerator {
| return action()
| .${lowerFirst(methodName)}();
|""".stripMargin
+ } else if (cmd.startFromSnapshots) {
+ s"""|case "$methodName":
+ | return action()
+ | .${lowerFirst(methodName)}(($inputTypeFullName) message.payload());
+ |""".stripMargin
} else {
s"""|case "$methodName":
| return action()
diff --git a/codegen/java-gen/src/main/scala/kalix/codegen/java/ViewServiceSourceGenerator.scala b/codegen/java-gen/src/main/scala/kalix/codegen/java/ViewServiceSourceGenerator.scala
index 1fdfd10951..f555ad2afe 100644
--- a/codegen/java-gen/src/main/scala/kalix/codegen/java/ViewServiceSourceGenerator.scala
+++ b/codegen/java-gen/src/main/scala/kalix/codegen/java/ViewServiceSourceGenerator.scala
@@ -55,6 +55,12 @@ object ViewServiceSourceGenerator {
c"""|case "$methodName":
| return view().${lowerFirst(methodName)}(state);
|"""
+ } else if (cmd.startFromSnapshots) {
+ c"""|case "$methodName":
+ | return view().${lowerFirst(methodName)}(
+ | state,
+ | ($inputType) event);
+ |"""
} else {
c"""|case "$methodName":
| return view().${lowerFirst(methodName)}(
@@ -261,6 +267,15 @@ object ViewServiceSourceGenerator {
| throw new UnsupportedOperationException("Delete handler for '${update.name}' not implemented yet");
|}
|"""
+ } else if (update.startFromSnapshots) {
+ c"""|@Override
+ |public $View.UpdateEffect<$stateType> ${lowerFirst(update.name)}(
+ | $stateType state,
+ | ${update.inputType} ${lowerFirst(update.inputType.name)}) {
+ | // Snapshot handler processes entity state on view startup before any events
+ | throw new UnsupportedOperationException("Snapshot handler for '${update.name}' not implemented yet");
+ |}
+ |"""
} else {
c"""|@Override
|public $View.UpdateEffect<$stateType> ${lowerFirst(update.name)}(
@@ -342,6 +357,12 @@ object ViewServiceSourceGenerator {
c"""|public abstract $View.UpdateEffect<$stateType> ${lowerFirst(update.name)}(
| $stateType state);
|"""
+ } else if (update.startFromSnapshots) {
+ c"""|/** Snapshot handler for initializing view state from entity snapshots. */
+ |public abstract $View.UpdateEffect<$stateType> ${lowerFirst(update.name)}(
+ | $stateType state,
+ | ${update.inputType} ${lowerFirst(update.inputType.name)});
+ |"""
} else {
c"""|public abstract $View.UpdateEffect<$stateType> ${lowerFirst(update.name)}(
| $stateType state,
diff --git a/codegen/scala-gen/src/main/scala/kalix/codegen/scalasdk/impl/ActionServiceSourceGenerator.scala b/codegen/scala-gen/src/main/scala/kalix/codegen/scalasdk/impl/ActionServiceSourceGenerator.scala
index b16c9b6519..7a4e5821d4 100644
--- a/codegen/scala-gen/src/main/scala/kalix/codegen/scalasdk/impl/ActionServiceSourceGenerator.scala
+++ b/codegen/scala-gen/src/main/scala/kalix/codegen/scalasdk/impl/ActionServiceSourceGenerator.scala
@@ -58,6 +58,11 @@ object ActionServiceSourceGenerator {
c"""|override def ${lowerFirst(methodName)}(): $Action.Effect[$outputType] = {
| ${jsonTopicHint}throw new RuntimeException("The delete handler for `$methodName` is not implemented, yet")
|}"""
+ } else if (cmd.startFromSnapshots) {
+ c"""|override def ${lowerFirst(methodName)}($input: $inputType): $Action.Effect[$outputType] = {
+ | // Snapshot handler processes entity state before any events
+ | ${jsonTopicHint}throw new RuntimeException("The snapshot handler for `$methodName` is not implemented, yet")
+ |}"""
} else {
c"""|override def ${lowerFirst(methodName)}($input: $inputType): $Action.Effect[$outputType] = {
| ${jsonTopicHint}throw new RuntimeException("The command handler for `$methodName` is not implemented, yet")
@@ -107,6 +112,9 @@ object ActionServiceSourceGenerator {
if (cmd.isUnary) {
if (cmd.handleDeletes) {
c"""|def ${lowerFirst(methodName)}(): $Action.Effect[$outputType]"""
+ } else if (cmd.startFromSnapshots) {
+ c"""|/** Snapshot handler for processing entity snapshots. */
+ |def ${lowerFirst(methodName)}($input: $inputType): $Action.Effect[$outputType]"""
} else {
c"""|def ${lowerFirst(methodName)}($input: $inputType): $Action.Effect[$outputType]"""
}
@@ -155,6 +163,10 @@ object ActionServiceSourceGenerator {
c"""|case "$methodName" =>
| action.${lowerFirst(methodName)}()
|"""
+ } else if (cmd.startFromSnapshots) {
+ c"""|case "$methodName" =>
+ | action.${lowerFirst(methodName)}(message.payload.asInstanceOf[$inputType])
+ |"""
} else {
c"""|case "$methodName" =>
| action.${lowerFirst(methodName)}(message.payload.asInstanceOf[$inputType])
diff --git a/codegen/scala-gen/src/main/scala/kalix/codegen/scalasdk/impl/ViewServiceSourceGenerator.scala b/codegen/scala-gen/src/main/scala/kalix/codegen/scalasdk/impl/ViewServiceSourceGenerator.scala
index 9e6eedf2da..444b0a9ac0 100644
--- a/codegen/scala-gen/src/main/scala/kalix/codegen/scalasdk/impl/ViewServiceSourceGenerator.scala
+++ b/codegen/scala-gen/src/main/scala/kalix/codegen/scalasdk/impl/ViewServiceSourceGenerator.scala
@@ -72,6 +72,12 @@ object ViewServiceSourceGenerator {
| view.${lowerFirst(methodName)}(
| state)
|""".stripMargin
+ } else if (cmd.startFromSnapshots) {
+ s"""|case "$methodName" =>
+ | view.${lowerFirst(methodName)}(
+ | state,
+ | event.asInstanceOf[${typeName(cmd.inputType)}])
+ |""".stripMargin
} else {
s"""|case "$methodName" =>
| view.${lowerFirst(methodName)}(
@@ -261,6 +267,13 @@ object ViewServiceSourceGenerator {
| state: $stateType): UpdateEffect[$stateType] =
| throw new UnsupportedOperationException("Delete handler for '${update.name}' not implemented yet")
|""".stripMargin
+ } else if (update.startFromSnapshots) {
+ s"""|override def ${lowerFirst(update.name)}(
+ | state: $stateType,
+ | ${lowerFirst(update.inputType.name)}: ${typeName(update.inputType)}): UpdateEffect[$stateType] =
+ | // Snapshot handler processes entity state on view startup before any events
+ | throw new UnsupportedOperationException("Snapshot handler for '${update.name}' not implemented yet")
+ |""".stripMargin
} else {
s"""|override def ${lowerFirst(update.name)}(
| state: $stateType,
@@ -339,6 +352,12 @@ object ViewServiceSourceGenerator {
s"""|def ${lowerFirst(update.name)}(
| state: $stateType): View.UpdateEffect[$stateType]
|""".stripMargin
+ } else if (update.startFromSnapshots) {
+ s"""|/** Snapshot handler for initializing view state from entity snapshots. */
+ |def ${lowerFirst(update.name)}(
+ | state: $stateType,
+ | ${lowerFirst(update.inputType.name)}: ${typeName(update.inputType)}): View.UpdateEffect[$stateType]
+ |""".stripMargin
} else {
s"""|def ${lowerFirst(update.name)}(
| state: $stateType,
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index c3b7bc1bdd..a7910b0e62 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -9,7 +9,7 @@ object Dependencies {
val RuntimeVersion = System.getProperty(
"kalix-runtime.version",
// temporarily accept the old system property name
- System.getProperty("kalix-proxy.version", "1.2.38"))
+ System.getProperty("kalix-proxy.version", "1.2.38-1-6f468ecf-SNAPSHOT"))
}
// changing the Scala version of the Java SDK affects end users
diff --git a/samples/java-protobuf-eventsourced-customer-registry/pom.xml b/samples/java-protobuf-eventsourced-customer-registry/pom.xml
index 0a3d7b46ec..1a80a8825e 100644
--- a/samples/java-protobuf-eventsourced-customer-registry/pom.xml
+++ b/samples/java-protobuf-eventsourced-customer-registry/pom.xml
@@ -4,7 +4,7 @@
io.kalix
kalix-java-sdk-protobuf-parent
- 1.5.21
+ 1.5.21-4-53b2fd32-dev
io.kalix.samples
diff --git a/samples/java-protobuf-eventsourced-customer-registry/src/main/java/customer/view/CustomerByNameView.java b/samples/java-protobuf-eventsourced-customer-registry/src/main/java/customer/view/CustomerByNameView.java
index faa1c263ac..c5df71b97d 100644
--- a/samples/java-protobuf-eventsourced-customer-registry/src/main/java/customer/view/CustomerByNameView.java
+++ b/samples/java-protobuf-eventsourced-customer-registry/src/main/java/customer/view/CustomerByNameView.java
@@ -33,8 +33,8 @@ public CustomerApi.Customer emptyState() { // <2>
@Override // <3>
public UpdateEffect processCustomerCreated(
- CustomerApi.Customer state,
- CustomerDomain.CustomerCreated customerCreated) {
+ CustomerApi.Customer state,
+ CustomerDomain.CustomerCreated customerCreated) {
if (state != null) {
return effects().ignore(); // already created
} else {
@@ -44,20 +44,28 @@ public UpdateEffect processCustomerCreated(
@Override // <3>
public UpdateEffect processCustomerNameChanged(
- CustomerApi.Customer state,
- CustomerDomain.CustomerNameChanged customerNameChanged) {
+ CustomerApi.Customer state,
+ CustomerDomain.CustomerNameChanged customerNameChanged) {
return effects().updateState(
state.toBuilder().setName(customerNameChanged.getNewName()).build());
}
@Override // <3>
public UpdateEffect processCustomerAddressChanged(
- CustomerApi.Customer state,
- CustomerDomain.CustomerAddressChanged customerAddressChanged) {
+ CustomerApi.Customer state,
+ CustomerDomain.CustomerAddressChanged customerAddressChanged) {
return effects().updateState(
state.toBuilder().setAddress(convertToApi(customerAddressChanged.getNewAddress())).build());
}
+ @Override
+ public UpdateEffect processSnapshot(
+ CustomerApi.Customer state,
+ CustomerDomain.CustomerState customerState) {
+ // Initialize view from entity snapshot
+ return effects().updateState(convertToApi(customerState));
+ }
+
private CustomerApi.Customer convertToApi(CustomerDomain.CustomerState s) {
CustomerApi.Address address = CustomerApi.Address.getDefaultInstance();
if (s.hasAddress()) {
diff --git a/samples/java-protobuf-eventsourced-customer-registry/src/main/proto/customer/view/customer_view.proto b/samples/java-protobuf-eventsourced-customer-registry/src/main/proto/customer/view/customer_view.proto
index 7157eef000..05f708377e 100644
--- a/samples/java-protobuf-eventsourced-customer-registry/src/main/proto/customer/view/customer_view.proto
+++ b/samples/java-protobuf-eventsourced-customer-registry/src/main/proto/customer/view/customer_view.proto
@@ -73,6 +73,18 @@ service CustomerByName {
};
// end::ignore-other-events[]
+ // tag::snapshot-handler[]
+ rpc ProcessSnapshot(domain.CustomerState) returns (api.Customer) {
+ option (kalix.method).eventing.in = {
+ event_sourced_entity: "customers"
+ start_from_snapshots: true
+ };
+ option (kalix.method).view.update = {
+ table: "customers"
+ };
+ }
+ // end::snapshot-handler[]
+
rpc GetCustomers(ByNameRequest) returns (stream api.Customer) {
option (kalix.method).view.query = {
query: "SELECT * FROM customers WHERE name = :customer_name"