From 91000db11b268b0f3ee5284297b297bf3a2ccba6 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Mon, 2 Feb 2026 11:06:20 +0100 Subject: [PATCH] feat: Start views and actions from snapshot --- .gitignore | 1 + .../scala/kalix/codegen/ModelBuilder.scala | 2 ++ .../java/ActionServiceSourceGenerator.scala | 14 +++++++++++++ .../java/ViewServiceSourceGenerator.scala | 21 +++++++++++++++++++ .../impl/ActionServiceSourceGenerator.scala | 12 +++++++++++ .../impl/ViewServiceSourceGenerator.scala | 19 +++++++++++++++++ project/Dependencies.scala | 2 +- .../pom.xml | 2 +- .../customer/view/CustomerByNameView.java | 20 ++++++++++++------ .../proto/customer/view/customer_view.proto | 12 +++++++++++ 10 files changed, 97 insertions(+), 8 deletions(-) diff --git a/.gitignore b/.gitignore index 2e6216ac4f..3296876876 100644 --- a/.gitignore +++ b/.gitignore @@ -51,3 +51,4 @@ sdk/java-sdk-spring/tempo-data sdk-version-raw.txt sdk-version.txt sbt-plugin/src/sbt-test/sbt-kalix/*/global/ +.claude/settings.local.json diff --git a/codegen/core/src/main/scala/kalix/codegen/ModelBuilder.scala b/codegen/core/src/main/scala/kalix/codegen/ModelBuilder.scala index 21f4f01651..9a57800a21 100644 --- a/codegen/core/src/main/scala/kalix/codegen/ModelBuilder.scala +++ b/codegen/core/src/main/scala/kalix/codegen/ModelBuilder.scala @@ -326,6 +326,7 @@ object ModelBuilder { outToTopic: Boolean, ignore: Boolean, handleDeletes: Boolean, + startFromSnapshots: Boolean, viewTable: String) { def isUnary: Boolean = !streamedInput && !streamedOutput @@ -350,6 +351,7 @@ object ModelBuilder { outToTopic = eventing.hasOut && eventing.getOut.hasTopic, ignore = eventing.hasIn && eventing.getIn.getIgnore, handleDeletes = eventing.hasIn && eventing.getIn.getHandleDeletes, + startFromSnapshots = eventing.hasIn && eventing.getIn.getStartFromSnapshots, viewUpdate.getTable) } } diff --git a/codegen/java-gen/src/main/scala/kalix/codegen/java/ActionServiceSourceGenerator.scala b/codegen/java-gen/src/main/scala/kalix/codegen/java/ActionServiceSourceGenerator.scala index b43678a478..416877b246 100644 --- a/codegen/java-gen/src/main/scala/kalix/codegen/java/ActionServiceSourceGenerator.scala +++ b/codegen/java-gen/src/main/scala/kalix/codegen/java/ActionServiceSourceGenerator.scala @@ -70,6 +70,12 @@ object ActionServiceSourceGenerator { |public Effect<$outputType> ${lowerFirst(methodName)}() { | throw new RuntimeException("The delete handler for `$methodName` is not implemented, yet"); |}""".stripMargin + } else if (cmd.startFromSnapshots) { + s"""|@Override + |public Effect<$outputType> ${lowerFirst(methodName)}($inputTypeFullName $input) { + | // Snapshot handler processes entity state before any events + | ${jsonTopicHint}throw new RuntimeException("The snapshot handler for `$methodName` is not implemented, yet"); + |}""".stripMargin } else { s"""|@Override |public Effect<$outputType> ${lowerFirst(methodName)}($inputTypeFullName $input) { @@ -131,6 +137,9 @@ object ActionServiceSourceGenerator { if (cmd.isUnary) { if (cmd.handleDeletes) { s"""|public abstract Effect<$outputType> ${lowerFirst(methodName)}();""".stripMargin + } else if (cmd.startFromSnapshots) { + s"""|/** Snapshot handler for processing entity snapshots. */ + |public abstract Effect<$outputType> ${lowerFirst(methodName)}($inputTypeFullName $input);""".stripMargin } else { s"""|public abstract Effect<$outputType> ${lowerFirst(methodName)}($inputTypeFullName $input);""".stripMargin } @@ -180,6 +189,11 @@ object ActionServiceSourceGenerator { | return action() | .${lowerFirst(methodName)}(); |""".stripMargin + } else if (cmd.startFromSnapshots) { + s"""|case "$methodName": + | return action() + | .${lowerFirst(methodName)}(($inputTypeFullName) message.payload()); + |""".stripMargin } else { s"""|case "$methodName": | return action() diff --git a/codegen/java-gen/src/main/scala/kalix/codegen/java/ViewServiceSourceGenerator.scala b/codegen/java-gen/src/main/scala/kalix/codegen/java/ViewServiceSourceGenerator.scala index 1fdfd10951..f555ad2afe 100644 --- a/codegen/java-gen/src/main/scala/kalix/codegen/java/ViewServiceSourceGenerator.scala +++ b/codegen/java-gen/src/main/scala/kalix/codegen/java/ViewServiceSourceGenerator.scala @@ -55,6 +55,12 @@ object ViewServiceSourceGenerator { c"""|case "$methodName": | return view().${lowerFirst(methodName)}(state); |""" + } else if (cmd.startFromSnapshots) { + c"""|case "$methodName": + | return view().${lowerFirst(methodName)}( + | state, + | ($inputType) event); + |""" } else { c"""|case "$methodName": | return view().${lowerFirst(methodName)}( @@ -261,6 +267,15 @@ object ViewServiceSourceGenerator { | throw new UnsupportedOperationException("Delete handler for '${update.name}' not implemented yet"); |} |""" + } else if (update.startFromSnapshots) { + c"""|@Override + |public $View.UpdateEffect<$stateType> ${lowerFirst(update.name)}( + | $stateType state, + | ${update.inputType} ${lowerFirst(update.inputType.name)}) { + | // Snapshot handler processes entity state on view startup before any events + | throw new UnsupportedOperationException("Snapshot handler for '${update.name}' not implemented yet"); + |} + |""" } else { c"""|@Override |public $View.UpdateEffect<$stateType> ${lowerFirst(update.name)}( @@ -342,6 +357,12 @@ object ViewServiceSourceGenerator { c"""|public abstract $View.UpdateEffect<$stateType> ${lowerFirst(update.name)}( | $stateType state); |""" + } else if (update.startFromSnapshots) { + c"""|/** Snapshot handler for initializing view state from entity snapshots. */ + |public abstract $View.UpdateEffect<$stateType> ${lowerFirst(update.name)}( + | $stateType state, + | ${update.inputType} ${lowerFirst(update.inputType.name)}); + |""" } else { c"""|public abstract $View.UpdateEffect<$stateType> ${lowerFirst(update.name)}( | $stateType state, diff --git a/codegen/scala-gen/src/main/scala/kalix/codegen/scalasdk/impl/ActionServiceSourceGenerator.scala b/codegen/scala-gen/src/main/scala/kalix/codegen/scalasdk/impl/ActionServiceSourceGenerator.scala index b16c9b6519..7a4e5821d4 100644 --- a/codegen/scala-gen/src/main/scala/kalix/codegen/scalasdk/impl/ActionServiceSourceGenerator.scala +++ b/codegen/scala-gen/src/main/scala/kalix/codegen/scalasdk/impl/ActionServiceSourceGenerator.scala @@ -58,6 +58,11 @@ object ActionServiceSourceGenerator { c"""|override def ${lowerFirst(methodName)}(): $Action.Effect[$outputType] = { | ${jsonTopicHint}throw new RuntimeException("The delete handler for `$methodName` is not implemented, yet") |}""" + } else if (cmd.startFromSnapshots) { + c"""|override def ${lowerFirst(methodName)}($input: $inputType): $Action.Effect[$outputType] = { + | // Snapshot handler processes entity state before any events + | ${jsonTopicHint}throw new RuntimeException("The snapshot handler for `$methodName` is not implemented, yet") + |}""" } else { c"""|override def ${lowerFirst(methodName)}($input: $inputType): $Action.Effect[$outputType] = { | ${jsonTopicHint}throw new RuntimeException("The command handler for `$methodName` is not implemented, yet") @@ -107,6 +112,9 @@ object ActionServiceSourceGenerator { if (cmd.isUnary) { if (cmd.handleDeletes) { c"""|def ${lowerFirst(methodName)}(): $Action.Effect[$outputType]""" + } else if (cmd.startFromSnapshots) { + c"""|/** Snapshot handler for processing entity snapshots. */ + |def ${lowerFirst(methodName)}($input: $inputType): $Action.Effect[$outputType]""" } else { c"""|def ${lowerFirst(methodName)}($input: $inputType): $Action.Effect[$outputType]""" } @@ -155,6 +163,10 @@ object ActionServiceSourceGenerator { c"""|case "$methodName" => | action.${lowerFirst(methodName)}() |""" + } else if (cmd.startFromSnapshots) { + c"""|case "$methodName" => + | action.${lowerFirst(methodName)}(message.payload.asInstanceOf[$inputType]) + |""" } else { c"""|case "$methodName" => | action.${lowerFirst(methodName)}(message.payload.asInstanceOf[$inputType]) diff --git a/codegen/scala-gen/src/main/scala/kalix/codegen/scalasdk/impl/ViewServiceSourceGenerator.scala b/codegen/scala-gen/src/main/scala/kalix/codegen/scalasdk/impl/ViewServiceSourceGenerator.scala index 9e6eedf2da..444b0a9ac0 100644 --- a/codegen/scala-gen/src/main/scala/kalix/codegen/scalasdk/impl/ViewServiceSourceGenerator.scala +++ b/codegen/scala-gen/src/main/scala/kalix/codegen/scalasdk/impl/ViewServiceSourceGenerator.scala @@ -72,6 +72,12 @@ object ViewServiceSourceGenerator { | view.${lowerFirst(methodName)}( | state) |""".stripMargin + } else if (cmd.startFromSnapshots) { + s"""|case "$methodName" => + | view.${lowerFirst(methodName)}( + | state, + | event.asInstanceOf[${typeName(cmd.inputType)}]) + |""".stripMargin } else { s"""|case "$methodName" => | view.${lowerFirst(methodName)}( @@ -261,6 +267,13 @@ object ViewServiceSourceGenerator { | state: $stateType): UpdateEffect[$stateType] = | throw new UnsupportedOperationException("Delete handler for '${update.name}' not implemented yet") |""".stripMargin + } else if (update.startFromSnapshots) { + s"""|override def ${lowerFirst(update.name)}( + | state: $stateType, + | ${lowerFirst(update.inputType.name)}: ${typeName(update.inputType)}): UpdateEffect[$stateType] = + | // Snapshot handler processes entity state on view startup before any events + | throw new UnsupportedOperationException("Snapshot handler for '${update.name}' not implemented yet") + |""".stripMargin } else { s"""|override def ${lowerFirst(update.name)}( | state: $stateType, @@ -339,6 +352,12 @@ object ViewServiceSourceGenerator { s"""|def ${lowerFirst(update.name)}( | state: $stateType): View.UpdateEffect[$stateType] |""".stripMargin + } else if (update.startFromSnapshots) { + s"""|/** Snapshot handler for initializing view state from entity snapshots. */ + |def ${lowerFirst(update.name)}( + | state: $stateType, + | ${lowerFirst(update.inputType.name)}: ${typeName(update.inputType)}): View.UpdateEffect[$stateType] + |""".stripMargin } else { s"""|def ${lowerFirst(update.name)}( | state: $stateType, diff --git a/project/Dependencies.scala b/project/Dependencies.scala index c3b7bc1bdd..a7910b0e62 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -9,7 +9,7 @@ object Dependencies { val RuntimeVersion = System.getProperty( "kalix-runtime.version", // temporarily accept the old system property name - System.getProperty("kalix-proxy.version", "1.2.38")) + System.getProperty("kalix-proxy.version", "1.2.38-1-6f468ecf-SNAPSHOT")) } // changing the Scala version of the Java SDK affects end users diff --git a/samples/java-protobuf-eventsourced-customer-registry/pom.xml b/samples/java-protobuf-eventsourced-customer-registry/pom.xml index 0a3d7b46ec..1a80a8825e 100644 --- a/samples/java-protobuf-eventsourced-customer-registry/pom.xml +++ b/samples/java-protobuf-eventsourced-customer-registry/pom.xml @@ -4,7 +4,7 @@ io.kalix kalix-java-sdk-protobuf-parent - 1.5.21 + 1.5.21-4-53b2fd32-dev io.kalix.samples diff --git a/samples/java-protobuf-eventsourced-customer-registry/src/main/java/customer/view/CustomerByNameView.java b/samples/java-protobuf-eventsourced-customer-registry/src/main/java/customer/view/CustomerByNameView.java index faa1c263ac..c5df71b97d 100644 --- a/samples/java-protobuf-eventsourced-customer-registry/src/main/java/customer/view/CustomerByNameView.java +++ b/samples/java-protobuf-eventsourced-customer-registry/src/main/java/customer/view/CustomerByNameView.java @@ -33,8 +33,8 @@ public CustomerApi.Customer emptyState() { // <2> @Override // <3> public UpdateEffect processCustomerCreated( - CustomerApi.Customer state, - CustomerDomain.CustomerCreated customerCreated) { + CustomerApi.Customer state, + CustomerDomain.CustomerCreated customerCreated) { if (state != null) { return effects().ignore(); // already created } else { @@ -44,20 +44,28 @@ public UpdateEffect processCustomerCreated( @Override // <3> public UpdateEffect processCustomerNameChanged( - CustomerApi.Customer state, - CustomerDomain.CustomerNameChanged customerNameChanged) { + CustomerApi.Customer state, + CustomerDomain.CustomerNameChanged customerNameChanged) { return effects().updateState( state.toBuilder().setName(customerNameChanged.getNewName()).build()); } @Override // <3> public UpdateEffect processCustomerAddressChanged( - CustomerApi.Customer state, - CustomerDomain.CustomerAddressChanged customerAddressChanged) { + CustomerApi.Customer state, + CustomerDomain.CustomerAddressChanged customerAddressChanged) { return effects().updateState( state.toBuilder().setAddress(convertToApi(customerAddressChanged.getNewAddress())).build()); } + @Override + public UpdateEffect processSnapshot( + CustomerApi.Customer state, + CustomerDomain.CustomerState customerState) { + // Initialize view from entity snapshot + return effects().updateState(convertToApi(customerState)); + } + private CustomerApi.Customer convertToApi(CustomerDomain.CustomerState s) { CustomerApi.Address address = CustomerApi.Address.getDefaultInstance(); if (s.hasAddress()) { diff --git a/samples/java-protobuf-eventsourced-customer-registry/src/main/proto/customer/view/customer_view.proto b/samples/java-protobuf-eventsourced-customer-registry/src/main/proto/customer/view/customer_view.proto index 7157eef000..05f708377e 100644 --- a/samples/java-protobuf-eventsourced-customer-registry/src/main/proto/customer/view/customer_view.proto +++ b/samples/java-protobuf-eventsourced-customer-registry/src/main/proto/customer/view/customer_view.proto @@ -73,6 +73,18 @@ service CustomerByName { }; // end::ignore-other-events[] + // tag::snapshot-handler[] + rpc ProcessSnapshot(domain.CustomerState) returns (api.Customer) { + option (kalix.method).eventing.in = { + event_sourced_entity: "customers" + start_from_snapshots: true + }; + option (kalix.method).view.update = { + table: "customers" + }; + } + // end::snapshot-handler[] + rpc GetCustomers(ByNameRequest) returns (stream api.Customer) { option (kalix.method).view.query = { query: "SELECT * FROM customers WHERE name = :customer_name"