Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,4 @@ sdk/java-sdk-spring/tempo-data
sdk-version-raw.txt
sdk-version.txt
sbt-plugin/src/sbt-test/sbt-kalix/*/global/
.claude/settings.local.json
2 changes: 2 additions & 0 deletions codegen/core/src/main/scala/kalix/codegen/ModelBuilder.scala
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,7 @@ object ModelBuilder {
outToTopic: Boolean,
ignore: Boolean,
handleDeletes: Boolean,
startFromSnapshots: Boolean,
viewTable: String) {

def isUnary: Boolean = !streamedInput && !streamedOutput
Expand All @@ -350,6 +351,7 @@ object ModelBuilder {
outToTopic = eventing.hasOut && eventing.getOut.hasTopic,
ignore = eventing.hasIn && eventing.getIn.getIgnore,
handleDeletes = eventing.hasIn && eventing.getIn.getHandleDeletes,
startFromSnapshots = eventing.hasIn && eventing.getIn.getStartFromSnapshots,
viewUpdate.getTable)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,12 @@ object ActionServiceSourceGenerator {
|public Effect<$outputType> ${lowerFirst(methodName)}() {
| throw new RuntimeException("The delete handler for `$methodName` is not implemented, yet");
|}""".stripMargin
} else if (cmd.startFromSnapshots) {
s"""|@Override
|public Effect<$outputType> ${lowerFirst(methodName)}($inputTypeFullName $input) {
| // Snapshot handler processes entity state before any events
| ${jsonTopicHint}throw new RuntimeException("The snapshot handler for `$methodName` is not implemented, yet");
|}""".stripMargin
} else {
s"""|@Override
|public Effect<$outputType> ${lowerFirst(methodName)}($inputTypeFullName $input) {
Expand Down Expand Up @@ -131,6 +137,9 @@ object ActionServiceSourceGenerator {
if (cmd.isUnary) {
if (cmd.handleDeletes) {
s"""|public abstract Effect<$outputType> ${lowerFirst(methodName)}();""".stripMargin
} else if (cmd.startFromSnapshots) {
s"""|/** Snapshot handler for processing entity snapshots. */
|public abstract Effect<$outputType> ${lowerFirst(methodName)}($inputTypeFullName $input);""".stripMargin
} else {
s"""|public abstract Effect<$outputType> ${lowerFirst(methodName)}($inputTypeFullName $input);""".stripMargin
}
Expand Down Expand Up @@ -180,6 +189,11 @@ object ActionServiceSourceGenerator {
| return action()
| .${lowerFirst(methodName)}();
|""".stripMargin
} else if (cmd.startFromSnapshots) {
s"""|case "$methodName":
| return action()
| .${lowerFirst(methodName)}(($inputTypeFullName) message.payload());
|""".stripMargin
} else {
s"""|case "$methodName":
| return action()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,12 @@ object ViewServiceSourceGenerator {
c"""|case "$methodName":
| return view().${lowerFirst(methodName)}(state);
|"""
} else if (cmd.startFromSnapshots) {
c"""|case "$methodName":
| return view().${lowerFirst(methodName)}(
| state,
| ($inputType) event);
|"""
} else {
c"""|case "$methodName":
| return view().${lowerFirst(methodName)}(
Expand Down Expand Up @@ -261,6 +267,15 @@ object ViewServiceSourceGenerator {
| throw new UnsupportedOperationException("Delete handler for '${update.name}' not implemented yet");
|}
|"""
} else if (update.startFromSnapshots) {
c"""|@Override
|public $View.UpdateEffect<$stateType> ${lowerFirst(update.name)}(
| $stateType state,
| ${update.inputType} ${lowerFirst(update.inputType.name)}) {
| // Snapshot handler processes entity state on view startup before any events
| throw new UnsupportedOperationException("Snapshot handler for '${update.name}' not implemented yet");
|}
|"""
} else {
c"""|@Override
|public $View.UpdateEffect<$stateType> ${lowerFirst(update.name)}(
Expand Down Expand Up @@ -342,6 +357,12 @@ object ViewServiceSourceGenerator {
c"""|public abstract $View.UpdateEffect<$stateType> ${lowerFirst(update.name)}(
| $stateType state);
|"""
} else if (update.startFromSnapshots) {
c"""|/** Snapshot handler for initializing view state from entity snapshots. */
|public abstract $View.UpdateEffect<$stateType> ${lowerFirst(update.name)}(
| $stateType state,
| ${update.inputType} ${lowerFirst(update.inputType.name)});
|"""
} else {
c"""|public abstract $View.UpdateEffect<$stateType> ${lowerFirst(update.name)}(
| $stateType state,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@ object ActionServiceSourceGenerator {
c"""|override def ${lowerFirst(methodName)}(): $Action.Effect[$outputType] = {
| ${jsonTopicHint}throw new RuntimeException("The delete handler for `$methodName` is not implemented, yet")
|}"""
} else if (cmd.startFromSnapshots) {
c"""|override def ${lowerFirst(methodName)}($input: $inputType): $Action.Effect[$outputType] = {
| // Snapshot handler processes entity state before any events
| ${jsonTopicHint}throw new RuntimeException("The snapshot handler for `$methodName` is not implemented, yet")
|}"""
} else {
c"""|override def ${lowerFirst(methodName)}($input: $inputType): $Action.Effect[$outputType] = {
| ${jsonTopicHint}throw new RuntimeException("The command handler for `$methodName` is not implemented, yet")
Expand Down Expand Up @@ -107,6 +112,9 @@ object ActionServiceSourceGenerator {
if (cmd.isUnary) {
if (cmd.handleDeletes) {
c"""|def ${lowerFirst(methodName)}(): $Action.Effect[$outputType]"""
} else if (cmd.startFromSnapshots) {
c"""|/** Snapshot handler for processing entity snapshots. */
|def ${lowerFirst(methodName)}($input: $inputType): $Action.Effect[$outputType]"""
} else {
c"""|def ${lowerFirst(methodName)}($input: $inputType): $Action.Effect[$outputType]"""
}
Expand Down Expand Up @@ -155,6 +163,10 @@ object ActionServiceSourceGenerator {
c"""|case "$methodName" =>
| action.${lowerFirst(methodName)}()
|"""
} else if (cmd.startFromSnapshots) {
c"""|case "$methodName" =>
| action.${lowerFirst(methodName)}(message.payload.asInstanceOf[$inputType])
|"""
} else {
c"""|case "$methodName" =>
| action.${lowerFirst(methodName)}(message.payload.asInstanceOf[$inputType])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,12 @@ object ViewServiceSourceGenerator {
| view.${lowerFirst(methodName)}(
| state)
|""".stripMargin
} else if (cmd.startFromSnapshots) {
s"""|case "$methodName" =>
| view.${lowerFirst(methodName)}(
| state,
| event.asInstanceOf[${typeName(cmd.inputType)}])
|""".stripMargin
} else {
s"""|case "$methodName" =>
| view.${lowerFirst(methodName)}(
Expand Down Expand Up @@ -261,6 +267,13 @@ object ViewServiceSourceGenerator {
| state: $stateType): UpdateEffect[$stateType] =
| throw new UnsupportedOperationException("Delete handler for '${update.name}' not implemented yet")
|""".stripMargin
} else if (update.startFromSnapshots) {
s"""|override def ${lowerFirst(update.name)}(
| state: $stateType,
| ${lowerFirst(update.inputType.name)}: ${typeName(update.inputType)}): UpdateEffect[$stateType] =
| // Snapshot handler processes entity state on view startup before any events
| throw new UnsupportedOperationException("Snapshot handler for '${update.name}' not implemented yet")
|""".stripMargin
} else {
s"""|override def ${lowerFirst(update.name)}(
| state: $stateType,
Expand Down Expand Up @@ -339,6 +352,12 @@ object ViewServiceSourceGenerator {
s"""|def ${lowerFirst(update.name)}(
| state: $stateType): View.UpdateEffect[$stateType]
|""".stripMargin
} else if (update.startFromSnapshots) {
s"""|/** Snapshot handler for initializing view state from entity snapshots. */
|def ${lowerFirst(update.name)}(
| state: $stateType,
| ${lowerFirst(update.inputType.name)}: ${typeName(update.inputType)}): View.UpdateEffect[$stateType]
|""".stripMargin
} else {
s"""|def ${lowerFirst(update.name)}(
| state: $stateType,
Expand Down
2 changes: 1 addition & 1 deletion project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ object Dependencies {
val RuntimeVersion = System.getProperty(
"kalix-runtime.version",
// temporarily accept the old system property name
System.getProperty("kalix-proxy.version", "1.2.38"))
System.getProperty("kalix-proxy.version", "1.2.38-1-6f468ecf-SNAPSHOT"))
}

// changing the Scala version of the Java SDK affects end users
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<parent>
<groupId>io.kalix</groupId>
<artifactId>kalix-java-sdk-protobuf-parent</artifactId>
<version>1.5.21</version>
<version>1.5.21-4-53b2fd32-dev</version>
</parent>

<groupId>io.kalix.samples</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ public CustomerApi.Customer emptyState() { // <2>

@Override // <3>
public UpdateEffect<CustomerApi.Customer> processCustomerCreated(
CustomerApi.Customer state,
CustomerDomain.CustomerCreated customerCreated) {
CustomerApi.Customer state,
CustomerDomain.CustomerCreated customerCreated) {
if (state != null) {
return effects().ignore(); // already created
} else {
Expand All @@ -44,20 +44,28 @@ public UpdateEffect<CustomerApi.Customer> processCustomerCreated(

@Override // <3>
public UpdateEffect<CustomerApi.Customer> processCustomerNameChanged(
CustomerApi.Customer state,
CustomerDomain.CustomerNameChanged customerNameChanged) {
CustomerApi.Customer state,
CustomerDomain.CustomerNameChanged customerNameChanged) {
return effects().updateState(
state.toBuilder().setName(customerNameChanged.getNewName()).build());
}

@Override // <3>
public UpdateEffect<CustomerApi.Customer> processCustomerAddressChanged(
CustomerApi.Customer state,
CustomerDomain.CustomerAddressChanged customerAddressChanged) {
CustomerApi.Customer state,
CustomerDomain.CustomerAddressChanged customerAddressChanged) {
return effects().updateState(
state.toBuilder().setAddress(convertToApi(customerAddressChanged.getNewAddress())).build());
}

@Override
public UpdateEffect<CustomerApi.Customer> processSnapshot(
CustomerApi.Customer state,
CustomerDomain.CustomerState customerState) {
// Initialize view from entity snapshot
return effects().updateState(convertToApi(customerState));
}

private CustomerApi.Customer convertToApi(CustomerDomain.CustomerState s) {
CustomerApi.Address address = CustomerApi.Address.getDefaultInstance();
if (s.hasAddress()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,18 @@ service CustomerByName {
};
// end::ignore-other-events[]

// tag::snapshot-handler[]
rpc ProcessSnapshot(domain.CustomerState) returns (api.Customer) {
option (kalix.method).eventing.in = {
event_sourced_entity: "customers"
start_from_snapshots: true
};
option (kalix.method).view.update = {
table: "customers"
};
}
// end::snapshot-handler[]
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tested more combinations with snapshots in this sample:

  • plain action
  • s2s to the subscriber service, which has a view


rpc GetCustomers(ByNameRequest) returns (stream api.Customer) {
option (kalix.method).view.query = {
query: "SELECT * FROM customers WHERE name = :customer_name"
Expand Down
Loading