Skip to content

Commit c6df6d3

Browse files
aludwikoefgpinto
andauthored
feat: workflow delete effect (#2305)
* feat: workflows deletion * fixing readme files * Update docs/src/modules/java-protobuf/pages/workflows.adoc Co-authored-by: Eduardo Pinto <[email protected]> * runtime bump --------- Co-authored-by: Eduardo Pinto <[email protected]>
1 parent e74f0e7 commit c6df6d3

File tree

24 files changed

+249
-30
lines changed

24 files changed

+249
-30
lines changed

docs/src/modules/java-protobuf/pages/workflows.adoc

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,36 @@ IMPORTANT: For simplicity purposes, we are returning the internal state directly
179179

180180
A full transfer workflow source code is available https://github.com/lightbend/kalix-jvm-sdk/tree/main/samples/java-protobuf-transfer-workflow[here (Java Protobuf), {tab-icon}, window="new"] or https://github.com/lightbend/kalix-jvm-sdk/tree/main/samples/scala-protobuf-transfer-workflow[here (Scala Protobuf), {tab-icon}, window="new"]. Follow the `README` file to run and test it.
181181

182+
183+
== Deleting state
184+
185+
If you want to delete the workflow, you can use the `delete` method from the effect API.
186+
187+
[.tabset]
188+
Java::
189+
+
190+
[source,java,indent=0]
191+
.src/main/java/com/example/transfer/api/TransferWorkflow.java
192+
----
193+
include::example$java-protobuf-transfer-workflow-compensation/src/main/java/com/example/transfer/api/TransferWorkflow.java[tag=delete-workflow]
194+
----
195+
<1> Deletes the workflow state.
196+
197+
Scala::
198+
+
199+
[source,scala,indent=0]
200+
.src/main/scala/com/example/transfer/api/TransferWorkflow.scala
201+
----
202+
include::example$scala-protobuf-transfer-workflow-compensation/src/main/scala/com/example/transfer/api/TransferWorkflow.scala[tag=delete-workflow]
203+
----
204+
<1> Deletes the workflow state.
205+
206+
When you give the instruction to delete a running workflow it's equivalent to ending and deleting a workflow. For already finished workflows, it is possible to delete them in the command handler, but any state changes will be ignored. By default, the existence of the workflow is completely cleaned up after a week.
207+
208+
You can still handle read requests to the workflow until it has been completely removed, but the current state will be empty (or null). To check whether the workflow has been deleted, you can use the inherited method `isDeleted`.
209+
210+
It is best to not reuse the same workflow id after deletion, but if that happens after the workflow has been completely removed it will be instantiated as a completely new workflow without any knowledge of previous state.
211+
182212
== Pausing workflow
183213

184214
A long-running workflow can be paused while waiting for some additional information to continue processing. A special `pause` transition can be used to inform Kalix that the execution of the Workflow should be postponed. By launching a request to a Workflow endpoint, the user can then resume the processing. Additionally, a Kalix Timer can be scheduled to automatically inform the Workflow that the expected time for the additional data has passed.

samples/java-protobuf-transfer-workflow-compensation/README.md

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,25 +41,25 @@ With both the Kalix Runtime and your service running, any defined endpoints shou
4141
Create wallet `a` with an initial balance
4242

4343
```shell
44-
grpcurl -plaintext -d '{"wallet_id": "a", "balance": 100}' localhost:9000 com.example.wallet.api.TransferWorkflowService/Create
44+
grpcurl -plaintext -d '{"wallet_id": "a", "balance": 100}' localhost:9000 com.example.wallet.api.WalletService/Create
4545
```
4646

4747
Create wallet `b` with an initial balance
4848

4949
```shell
50-
grpcurl -plaintext -d '{"wallet_id": "b", "balance": 100}' localhost:9000 com.example.wallet.api.TransferWorkflowService/Create
50+
grpcurl -plaintext -d '{"wallet_id": "b", "balance": 100}' localhost:9000 com.example.wallet.api.WalletService/Create
5151
```
5252

5353
Get wallet `a` current balance
5454

5555
```shell
56-
grpcurl -plaintext -d '{"wallet_id": "a"}' localhost:9000 com.example.wallet.api.TransferWorkflowService/GetWalletState
56+
grpcurl -plaintext -d '{"wallet_id": "a"}' localhost:9000 com.example.wallet.api.WalletService/GetWalletState
5757
```
5858

5959
Get wallet `b` current balance
6060

6161
```shell
62-
grpcurl -plaintext -d '{"wallet_id": "b"}' localhost:9000 com.example.wallet.api.TransferWorkflowService/GetWalletState
62+
grpcurl -plaintext -d '{"wallet_id": "b"}' localhost:9000 com.example.wallet.api.WalletService/GetWalletState
6363
```
6464

6565
Start transfer from wallet `a` to wallet `b`

samples/java-protobuf-transfer-workflow-compensation/src/it/java/com/example/transfer/api/TransferWorkflowIntegrationTest.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,12 +51,13 @@ public TransferWorkflowIntegrationTest() {
5151
public void showTransferFunds() throws Exception {
5252
String walletAId = randomId();
5353
String walletBId = randomId();
54+
String transferId = "1";
5455
InitialBalance initWalletA = InitialBalance.newBuilder().setWalletId(walletAId).setBalance(100).build();
5556
InitialBalance initWalletB = InitialBalance.newBuilder().setWalletId(walletBId).setBalance(100).build();
5657
walletClient.create(initWalletA).toCompletableFuture().get(5, SECONDS);
5758
walletClient.create(initWalletB).toCompletableFuture().get(5, SECONDS);
5859

59-
Transfer transfer = Transfer.newBuilder().setTransferId("1").setFrom(walletAId).setTo(walletBId).setAmount(10).build();
60+
Transfer transfer = Transfer.newBuilder().setTransferId(transferId).setFrom(walletAId).setTo(walletBId).setAmount(10).build();
6061
workflowClient.start(transfer).toCompletableFuture().get(5, SECONDS);
6162

6263
await().atMost(10, SECONDS).untilAsserted(() -> {
@@ -68,6 +69,14 @@ public void showTransferFunds() throws Exception {
6869
assertEquals(90, walletA.getBalance());
6970
assertEquals(110, walletB.getBalance());
7071
});
72+
73+
var response = workflowClient.hasBeenDeleted(TransferApi.HasBeenDeletedRequest.newBuilder().setTransferId(transferId).build()).toCompletableFuture().get(5, SECONDS);
74+
assertEquals(false, response.getDeleted());
75+
76+
workflowClient.delete(TransferApi.DeleteRequest.newBuilder().setTransferId(transferId).build()).toCompletableFuture().get(5, SECONDS);
77+
78+
var response2 = workflowClient.hasBeenDeleted(TransferApi.HasBeenDeletedRequest.newBuilder().setTransferId(transferId).build()).toCompletableFuture().get(5, SECONDS);
79+
assertEquals(true, response2.getDeleted());
7180
}
7281

7382
@Test

samples/java-protobuf-transfer-workflow-compensation/src/main/java/com/example/transfer/api/TransferWorkflow.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -260,4 +260,18 @@ public Effect<TransferState> getTransferState(TransferState currentState, Transf
260260
return effects().reply(currentState);
261261
}
262262
}
263+
264+
// tag::delete-workflow[]
265+
@Override
266+
public Effect<Empty> delete(TransferState currentState, TransferApi.DeleteRequest deleteRequest) {
267+
return effects()
268+
.delete() // <1>
269+
.thenReply(Empty.getDefaultInstance());
270+
}
271+
// end::delete-workflow[]
272+
273+
@Override
274+
public Effect<TransferApi.HasBeenDeletedResponse> hasBeenDeleted(TransferState currentState, TransferApi.HasBeenDeletedRequest hasBeenDeletedRequest) {
275+
return effects().reply(TransferApi.HasBeenDeletedResponse.newBuilder().setDeleted(isDeleted()).build());
276+
}
263277
}

samples/java-protobuf-transfer-workflow-compensation/src/main/proto/com/example/transfer/transfer_api.proto

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,18 @@ message AcceptationTimeoutRequest {
2828
string transfer_id = 1 [(kalix.field).id = true];
2929
}
3030

31+
message DeleteRequest{
32+
string transfer_id = 1 [(kalix.field).id = true];
33+
}
34+
35+
message HasBeenDeletedRequest{
36+
string transfer_id = 1 [(kalix.field).id = true];
37+
}
38+
39+
message HasBeenDeletedResponse{
40+
bool deleted = 1;
41+
}
42+
3143
service TransferWorkflowService {
3244
option (kalix.codegen) = {
3345
workflow: {
@@ -40,4 +52,6 @@ service TransferWorkflowService {
4052
rpc Accept(AcceptRequest) returns (google.protobuf.Empty) {}
4153
rpc AcceptationTimeout(AcceptationTimeoutRequest) returns (google.protobuf.Empty) {}
4254
rpc GetTransferState(GetRequest) returns (com.example.transfer.domain.TransferState) {}
55+
rpc Delete(DeleteRequest) returns (google.protobuf.Empty) {}
56+
rpc HasBeenDeleted(HasBeenDeletedRequest) returns (HasBeenDeletedResponse) {}
4357
}

samples/java-protobuf-transfer-workflow/README.md

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,25 +41,25 @@ With both the Kalix Runtime and your service running, any defined endpoints shou
4141
Create wallet `a` with an initial balance
4242

4343
```shell
44-
grpcurl -plaintext -d '{"wallet_id": "a", "balance": 100}' localhost:9000 com.example.wallet.api.TransferWorkflowService/Create
44+
grpcurl -plaintext -d '{"wallet_id": "a", "balance": 100}' localhost:9000 com.example.wallet.api.WalletService/Create
4545
```
4646

4747
Create wallet `b` with an initial balance
4848

4949
```shell
50-
grpcurl -plaintext -d '{"wallet_id": "b", "balance": 100}' localhost:9000 com.example.wallet.api.TransferWorkflowService/Create
50+
grpcurl -plaintext -d '{"wallet_id": "b", "balance": 100}' localhost:9000 com.example.wallet.api.WalletService/Create
5151
```
5252

5353
Get wallet `a` current balance
5454

5555
```shell
56-
grpcurl -plaintext -d '{"wallet_id": "a"}' localhost:9000 com.example.wallet.api.TransferWorkflowService/GetWalletState
56+
grpcurl -plaintext -d '{"wallet_id": "a"}' localhost:9000 com.example.wallet.api.WalletService/GetWalletState
5757
```
5858

5959
Get wallet `b` current balance
6060

6161
```shell
62-
grpcurl -plaintext -d '{"wallet_id": "b"}' localhost:9000 com.example.wallet.api.TransferWorkflowService/GetWalletState
62+
grpcurl -plaintext -d '{"wallet_id": "b"}' localhost:9000 com.example.wallet.api.WalletService/GetWalletState
6363
```
6464

6565
Start transfer from wallet `a` to wallet `b`

samples/scala-protobuf-transfer-workflow-compensation/README.md

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,25 +29,25 @@ With both the Kalix Runtime and your service running, any defined endpoints shou
2929
Create wallet `a` with an initial balance
3030

3131
```shell
32-
grpcurl -plaintext -d '{"wallet_id": "a", "balance": 100}' localhost:9000 com.example.wallet.api.TransferWorkflowService/Create
32+
grpcurl -plaintext -d '{"wallet_id": "a", "balance": 100}' localhost:9000 com.example.wallet.api.WalletService/Create
3333
```
3434

3535
Create wallet `b` with an initial balance
3636

3737
```shell
38-
grpcurl -plaintext -d '{"wallet_id": "b", "balance": 100}' localhost:9000 com.example.wallet.api.TransferWorkflowService/Create
38+
grpcurl -plaintext -d '{"wallet_id": "b", "balance": 100}' localhost:9000 com.example.wallet.api.WalletService/Create
3939
```
4040

4141
Get wallet `a` current balance
4242

4343
```shell
44-
grpcurl -plaintext -d '{"wallet_id": "a"}' localhost:9000 com.example.wallet.api.TransferWorkflowService/GetWalletState
44+
grpcurl -plaintext -d '{"wallet_id": "a"}' localhost:9000 com.example.wallet.api.WalletService/GetWalletState
4545
```
4646

4747
Get wallet `b` current balance
4848

4949
```shell
50-
grpcurl -plaintext -d '{"wallet_id": "b"}' localhost:9000 com.example.wallet.api.TransferWorkflowService/GetWalletState
50+
grpcurl -plaintext -d '{"wallet_id": "b"}' localhost:9000 com.example.wallet.api.WalletService/GetWalletState
5151
```
5252

5353
Start transfer from wallet `a` to wallet `b`
@@ -62,6 +62,10 @@ Get transfer state
6262
grpcurl -plaintext -d '{"transfer_id": "1"}' localhost:9000 com.example.transfer.api.TransferWorkflowService/GetTransferState
6363
```
6464

65+
```shell
66+
grpcurl -plaintext -d '{"transfer_id": "1"}' localhost:9000 com.example.transfer.api.TransferWorkflowService/GetTransferState
67+
```
68+
6569
## Deploying
6670

6771
To deploy your service, install the `kalix` CLI as documented in

samples/scala-protobuf-transfer-workflow-compensation/src/main/proto/com/example/transfer/transfer_api.proto

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,18 @@ message AcceptationTimeoutRequest {
2626
string transfer_id = 1 [(kalix.field).id = true];
2727
}
2828

29+
message DeleteRequest{
30+
string transfer_id = 1 [(kalix.field).id = true];
31+
}
32+
33+
message HasBeenDeletedRequest{
34+
string transfer_id = 1 [(kalix.field).id = true];
35+
}
36+
37+
message HasBeenDeletedResponse{
38+
bool deleted = 1;
39+
}
40+
2941
service TransferWorkflowService {
3042
option (kalix.codegen) = {
3143
workflow: {
@@ -38,4 +50,6 @@ service TransferWorkflowService {
3850
rpc Accept(AcceptRequest) returns (google.protobuf.Empty) {}
3951
rpc AcceptationTimeout(AcceptationTimeoutRequest) returns (google.protobuf.Empty) {}
4052
rpc GetTransferState(GetRequest) returns (com.example.transfer.domain.TransferState) {}
53+
rpc Delete(DeleteRequest) returns (google.protobuf.Empty) {}
54+
rpc HasBeenDeleted(HasBeenDeletedRequest) returns (HasBeenDeletedResponse) {}
4155
}

samples/scala-protobuf-transfer-workflow-compensation/src/main/scala/com/example/transfer/api/TransferWorkflow.scala

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ class TransferWorkflow(context: WorkflowContext) extends AbstractTransferWorkflo
160160
.addStep(waitForAcceptation)
161161
.addStep(compensateWithdraw) // <4>
162162
.addStep(failoverHandler);
163-
// end::recover-strategy[]
163+
// end::recover-strategy[]
164164
}
165165

166166
override def start(currentState: TransferState, transfer: Transfer): Effect[Empty] = {
@@ -230,4 +230,16 @@ class TransferWorkflow(context: WorkflowContext) extends AbstractTransferWorkflo
230230
}
231231
}
232232

233+
// tag::delete-workflow[]
234+
override def delete(currentState: TransferState, deleteRequest: DeleteRequest): Effect[Empty] = {
235+
effects.delete // <1>
236+
.thenReply(Empty())
237+
}
238+
// end::delete-workflow[]
239+
240+
override def hasBeenDeleted(
241+
currentState: TransferState,
242+
hasBeenDeletedRequest: HasBeenDeletedRequest): Effect[HasBeenDeletedResponse] =
243+
effects.reply(HasBeenDeletedResponse(isDeleted))
244+
233245
}

samples/scala-protobuf-transfer-workflow-compensation/src/test/scala/com/example/transfer/api/TransferWorkflowIntegrationSpec.scala

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,14 +47,23 @@ class TransferWorkflowIntegrationSpec
4747
walletClient.create(initWalletA).futureValue
4848
walletClient.create(initWalletB).futureValue
4949

50-
transferClient.start(Transfer("1", "a", "b", 10)).futureValue
50+
val transferId = "1"
51+
transferClient.start(Transfer(transferId, "a", "b", 10)).futureValue
5152

5253
eventually {
5354
val walletA = walletClient.getWalletState(api.GetRequest("a")).futureValue
5455
val walletB = walletClient.getWalletState(api.GetRequest("b")).futureValue
5556
walletA.balance shouldBe 90
5657
walletB.balance shouldBe 110
5758
}
59+
60+
val response = transferClient.hasBeenDeleted(HasBeenDeletedRequest(transferId)).futureValue
61+
response.deleted shouldBe false
62+
63+
transferClient.delete(DeleteRequest(transferId)).futureValue
64+
65+
val response2 = transferClient.hasBeenDeleted(HasBeenDeletedRequest(transferId)).futureValue
66+
response2.deleted shouldBe true
5867
}
5968

6069
"transfer funds with acceptation" in {

0 commit comments

Comments
 (0)