Skip to content

Commit c86ae92

Browse files
Bump minimum protocol version to 2 (#401)
Move ctx.run feature out of experimental
1 parent 8eb78be commit c86ae92

File tree

12 files changed

+105
-229
lines changed

12 files changed

+105
-229
lines changed

README.md

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,8 @@ gradle init --type java-application
4242
Add the runtime dependency [sdk-api](sdk-api) and the annotation processor dependency [sdk-api-gen](sdk-api-gen):
4343

4444
```
45-
annotationProcessor("dev.restate:sdk-api-gen:1.0.0")
46-
implementation("dev.restate:sdk-api:1.0.0")
45+
annotationProcessor("dev.restate:sdk-api-gen:1.2.0")
46+
implementation("dev.restate:sdk-api:1.2.0")
4747
```
4848

4949
### Setup a project (Kotlin)
@@ -65,8 +65,8 @@ plugins {
6565
Add the runtime dependency [sdk-api-kotlin](sdk-api-kotlin) and the ksp dependency [sdk-api-gen](sdk-api-kotlin-gen):
6666

6767
```
68-
ksp("dev.restate:sdk-api-kotlin-gen:1.0.0")
69-
implementation("dev.restate:sdk-api-kotlin:1.0.0")
68+
ksp("dev.restate:sdk-api-kotlin-gen:1.2.0")
69+
implementation("dev.restate:sdk-api-kotlin:1.2.0")
7070
```
7171

7272
### Implement your first Restate component (Java)
@@ -98,7 +98,7 @@ public class Greeter {
9898
When using composite types/POJOs for input/output, [Jackson Databind](https://github.com/FasterXML/jackson) will be used. The Jackson dependency is not automatically included, you must add it with [`sdk-serde-jackson`](sdk-serde-jackson):
9999

100100
```
101-
implementation("dev.restate:sdk-serde-jackson:1.0.0")
101+
implementation("dev.restate:sdk-serde-jackson:1.2.0")
102102
```
103103

104104
If you want to store types/POJOs in state, use `JacksonSerdes`:
@@ -138,7 +138,7 @@ When using composite data types for input/output, [`kotlinx.serialization`](http
138138
To deploy the Restate service as HTTP server, add [`sdk-http-vertx`](sdk-http-vertx) to the dependencies. For example, in Gradle:
139139

140140
```
141-
implementation("dev.restate:sdk-http-vertx:1.0.0")
141+
implementation("dev.restate:sdk-http-vertx:1.2.0")
142142
```
143143

144144
To deploy the service, add the following code to the `main`. For example in Java:
@@ -172,7 +172,7 @@ gradle run
172172
To deploy the Restate service as Lambda, add [`sdk-lambda`](sdk-lambda) to the dependencies. For example, in Gradle:
173173

174174
```
175-
implementation("dev.restate:sdk-lambda:1.0.0")
175+
implementation("dev.restate:sdk-lambda:1.2.0")
176176
```
177177

178178
Configure the build tool to generate Fat-JARs, which are required by AWS Lambda to correctly load the JAR. For example, using Gradle:
@@ -299,10 +299,10 @@ This library follows [Semantic Versioning](https://semver.org/).
299299

300300
The compatibility with Restate is described in the following table:
301301

302-
| Restate Server\sdk-java | 1.0 | 1.1 |
303-
|-------------------------|-----|-----|
304-
| 1.0 |||
305-
| 1.1 |||
302+
| Restate Server\sdk-java | 1.0 | 1.1 | 1.2 |
303+
|-------------------------|-----|-----|-----|
304+
| 1.0 ||||
305+
| 1.1 ||||
306306

307307
## Contributing
308308

sdk-api-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/ContextImpl.kt

Lines changed: 0 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -130,77 +130,6 @@ internal class ContextImpl internal constructor(internal val syscalls: Syscalls)
130130
}
131131
}
132132

133-
override suspend fun <T : Any?> runBlock(
134-
serde: Serde<T>,
135-
name: String,
136-
block: suspend () -> T
137-
): T {
138-
val exitResult =
139-
suspendCancellableCoroutine { cont: CancellableContinuation<CompletableDeferred<ByteBuffer>>
140-
->
141-
syscalls.enterSideEffectBlock(
142-
name,
143-
object : EnterSideEffectSyscallCallback {
144-
override fun onSuccess(t: ByteBuffer?) {
145-
val deferred: CompletableDeferred<ByteBuffer> = CompletableDeferred()
146-
deferred.complete(t!!)
147-
cont.resume(deferred)
148-
}
149-
150-
override fun onFailure(t: TerminalException) {
151-
val deferred: CompletableDeferred<ByteBuffer> = CompletableDeferred()
152-
deferred.completeExceptionally(t)
153-
cont.resume(deferred)
154-
}
155-
156-
override fun onCancel(t: Throwable?) {
157-
cont.cancel(t)
158-
}
159-
160-
override fun onNotExecuted() {
161-
cont.resume(CompletableDeferred())
162-
}
163-
})
164-
}
165-
166-
if (exitResult.isCompleted) {
167-
return serde.deserializeWrappingException(syscalls, exitResult.await())!!
168-
}
169-
170-
var actionReturnValue: T? = null
171-
var actionFailure: Throwable? = null
172-
try {
173-
actionReturnValue = block()
174-
} catch (t: Throwable) {
175-
actionFailure = t
176-
}
177-
178-
val exitCallback =
179-
object : ExitSideEffectSyscallCallback {
180-
override fun onSuccess(t: ByteBuffer?) {
181-
exitResult.complete(t!!)
182-
}
183-
184-
override fun onFailure(t: TerminalException) {
185-
exitResult.completeExceptionally(t)
186-
}
187-
188-
override fun onCancel(t: Throwable?) {
189-
exitResult.cancel(CancellationException(message = null, cause = t))
190-
}
191-
}
192-
193-
if (actionFailure != null) {
194-
syscalls.exitSideEffectBlockWithException(actionFailure, null, exitCallback)
195-
} else {
196-
syscalls.exitSideEffectBlock(
197-
serde.serializeWrappingException(syscalls, actionReturnValue), exitCallback)
198-
}
199-
200-
return serde.deserializeWrappingException(syscalls, exitResult.await())
201-
}
202-
203-
@UsePreviewContext
204133
override suspend fun <T : Any?> runBlock(
205134
serde: Serde<T>,
206135
name: String,

sdk-api-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/RetryPolicy.kt

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ import kotlin.time.Duration
1212
import kotlin.time.Duration.Companion.milliseconds
1313

1414
/** Retry policy configuration. */
15-
@UsePreviewContext
1615
data class RetryPolicy(
1716
/** Initial retry delay for the first retry attempt. */
1817
val initialDelay: Duration,
@@ -46,9 +45,7 @@ data class RetryPolicy(
4645
val maxDuration: Duration? = null
4746
) {
4847

49-
@UsePreviewContext
50-
data class Builder
51-
internal constructor(
48+
data class Builder(
5249
var initialDelay: Duration = 100.milliseconds,
5350
var exponentiationFactor: Float = 2.0f,
5451
var maxDelay: Duration? = null,
@@ -65,7 +62,6 @@ data class RetryPolicy(
6562
}
6663
}
6764

68-
@UsePreviewContext
6965
fun retryPolicy(init: RetryPolicy.Builder.() -> Unit): RetryPolicy {
7066
val builder = RetryPolicy.Builder()
7167
builder.init()

sdk-api-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/api.kt

Lines changed: 4 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,8 @@ sealed interface Context {
111111
* observability tools.
112112
*
113113
* <p>The closure should tolerate retries, that is Restate might re-execute the closure multiple
114-
* times until it records a result.
114+
* times until it records a result. To control and limit the amount of retries, pass a
115+
* [RetryPolicy] to this function.
115116
*
116117
* <h2>Error handling</h2>
117118
*
@@ -147,16 +148,6 @@ sealed interface Context {
147148
* @param T type of the return value.
148149
* @return value of the runBlock operation.
149150
*/
150-
suspend fun <T : Any?> runBlock(serde: Serde<T>, name: String = "", block: suspend () -> T): T
151-
152-
/**
153-
* Like [runBlock], but using a custom retry policy.
154-
*
155-
* When a retry policy is not specified, the `runBlock` will be retried using the
156-
* [Restate invoker retry policy](https://docs.restate.dev/operate/configuration/server), which by
157-
* default retries indefinitely.
158-
*/
159-
@UsePreviewContext
160151
suspend fun <T : Any?> runBlock(
161152
serde: Serde<T>,
162153
name: String = "",
@@ -207,7 +198,8 @@ sealed interface Context {
207198
* want to perform <b>non-deterministic operations</b>.
208199
*
209200
* <p>The closure should tolerate retries, that is Restate might re-execute the closure multiple
210-
* times until it records a result.
201+
* times until it records a result. To control and limit the amount of retries, pass a [RetryPolicy]
202+
* to this function.
211203
*
212204
* <h2>Error handling</h2>
213205
*
@@ -241,14 +233,6 @@ sealed interface Context {
241233
* @param T type of the return value.
242234
* @return value of the runBlock operation.
243235
*/
244-
suspend inline fun <reified T : Any> Context.runBlock(
245-
name: String = "",
246-
noinline block: suspend () -> T
247-
): T {
248-
return this.runBlock(KtSerdes.json(), name, block)
249-
}
250-
251-
@UsePreviewContext
252236
suspend inline fun <reified T : Any> Context.runBlock(
253237
name: String = "",
254238
retryPolicy: RetryPolicy? = null,

sdk-api-kotlin/src/test/kotlin/dev/restate/sdk/kotlin/SideEffectTest.kt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,6 @@ class SideEffectTest : SideEffectTestSuite() {
7777
ctx.runBlock(name) { throw IllegalStateException(reason) }
7878
}
7979

80-
@OptIn(UsePreviewContext::class)
8180
override fun failingSideEffectWithRetryPolicy(
8281
reason: String,
8382
retryPolicy: dev.restate.sdk.common.RetryPolicy?

sdk-api/src/main/java/dev/restate/sdk/Context.java

Lines changed: 67 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,8 @@ default void sleep(Duration duration) {
104104
* the observability tools.
105105
*
106106
* <p>The closure should tolerate retries, that is Restate might re-execute the closure multiple
107-
* times until it records a result.
107+
* times until it records a result. You can control and limit the amount of retries using {@link
108+
* #run(String, Serde, RetryPolicy, ThrowingSupplier)}.
108109
*
109110
* <p><b>Error handling</b>: Errors occurring within this closure won't be propagated to the
110111
* caller, unless they are {@link TerminalException}. Consider the following code:
@@ -140,7 +141,70 @@ default void sleep(Duration duration) {
140141
* @param <T> type of the return value.
141142
* @return value of the run operation.
142143
*/
143-
<T> T run(String name, Serde<T> serde, ThrowingSupplier<T> action) throws TerminalException;
144+
default <T> T run(String name, Serde<T> serde, ThrowingSupplier<T> action)
145+
throws TerminalException {
146+
return run(name, serde, null, action);
147+
}
148+
149+
/**
150+
* Like {@link #run(String, Serde, ThrowingSupplier)}, but using a custom retry policy.
151+
*
152+
* <p>When a retry policy is not specified, the {@code run} will be retried using the <a
153+
* href="https://docs.restate.dev/operate/configuration/server">Restate invoker retry policy</a>,
154+
* which by default retries indefinitely.
155+
*
156+
* @see RetryPolicy
157+
*/
158+
<T> T run(String name, Serde<T> serde, RetryPolicy retryPolicy, ThrowingSupplier<T> action)
159+
throws TerminalException;
160+
161+
/**
162+
* Like {@link #run(String, ThrowingRunnable)}, but using a custom retry policy.
163+
*
164+
* <p>When a retry policy is not specified, the {@code run} will be retried using the <a
165+
* href="https://docs.restate.dev/operate/configuration/server">Restate invoker retry policy</a>,
166+
* which by default retries indefinitely.
167+
*
168+
* @see RetryPolicy
169+
*/
170+
default void run(String name, RetryPolicy retryPolicy, ThrowingRunnable runnable)
171+
throws TerminalException {
172+
run(
173+
name,
174+
Serde.VOID,
175+
retryPolicy,
176+
() -> {
177+
runnable.run();
178+
return null;
179+
});
180+
}
181+
182+
/**
183+
* Like {@link #run(Serde, ThrowingSupplier)}, but using a custom retry policy.
184+
*
185+
* <p>When a retry policy is not specified, the {@code run} will be retried using the <a
186+
* href="https://docs.restate.dev/operate/configuration/server">Restate invoker retry policy</a>,
187+
* which by default retries indefinitely.
188+
*
189+
* @see RetryPolicy
190+
*/
191+
default <T> T run(Serde<T> serde, RetryPolicy retryPolicy, ThrowingSupplier<T> action)
192+
throws TerminalException {
193+
return run(null, serde, retryPolicy, action);
194+
}
195+
196+
/**
197+
* Like {@link #run(ThrowingRunnable)}, but using a custom retry policy.
198+
*
199+
* <p>When a retry policy is not specified, the {@code run} will be retried using the <a
200+
* href="https://docs.restate.dev/operate/configuration/server">Restate invoker retry policy</a>,
201+
* which by default retries indefinitely.
202+
*
203+
* @see RetryPolicy
204+
*/
205+
default void run(RetryPolicy retryPolicy, ThrowingRunnable runnable) throws TerminalException {
206+
run(null, retryPolicy, runnable);
207+
}
144208

145209
/** Like {@link #run(String, Serde, ThrowingSupplier)}, but without returning a value. */
146210
default void run(String name, ThrowingRunnable runnable) throws TerminalException {
@@ -160,7 +224,7 @@ default <T> T run(Serde<T> serde, ThrowingSupplier<T> action) throws TerminalExc
160224

161225
/** Like {@link #run(String, ThrowingRunnable)}, but without a name. */
162226
default void run(ThrowingRunnable runnable) throws TerminalException {
163-
run(null, runnable);
227+
run((String) null, runnable);
164228
}
165229

166230
/**

sdk-api/src/main/java/dev/restate/sdk/ContextImpl.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,8 @@ public <T> void send(Target target, Serde<T> inputSerde, T parameter, Duration d
110110
}
111111

112112
@Override
113-
public <T> T run(String name, Serde<T> serde, ThrowingSupplier<T> action) {
113+
public <T> T run(
114+
String name, Serde<T> serde, RetryPolicy retryPolicy, ThrowingSupplier<T> action) {
114115
CompletableFuture<CompletableFuture<ByteBuffer>> enterFut = new CompletableFuture<>();
115116
syscalls.enterSideEffectBlock(
116117
name,
@@ -171,7 +172,7 @@ public void onCancel(@Nullable Throwable t) {
171172
}
172173

173174
if (failure != null) {
174-
syscalls.exitSideEffectBlockWithException(failure, null, exitCallback);
175+
syscalls.exitSideEffectBlockWithException(failure, retryPolicy, exitCallback);
175176
} else {
176177
syscalls.exitSideEffectBlock(
177178
Util.serializeWrappingException(syscalls, serde, res), exitCallback);

0 commit comments

Comments
 (0)