Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 11 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ gradle init --type java-application
Add the runtime dependency [sdk-api](sdk-api) and the annotation processor dependency [sdk-api-gen](sdk-api-gen):

```
annotationProcessor("dev.restate:sdk-api-gen:1.0.0")
implementation("dev.restate:sdk-api:1.0.0")
annotationProcessor("dev.restate:sdk-api-gen:1.2.0")
implementation("dev.restate:sdk-api:1.2.0")
```

### Setup a project (Kotlin)
Expand All @@ -65,8 +65,8 @@ plugins {
Add the runtime dependency [sdk-api-kotlin](sdk-api-kotlin) and the ksp dependency [sdk-api-gen](sdk-api-kotlin-gen):

```
ksp("dev.restate:sdk-api-kotlin-gen:1.0.0")
implementation("dev.restate:sdk-api-kotlin:1.0.0")
ksp("dev.restate:sdk-api-kotlin-gen:1.2.0")
implementation("dev.restate:sdk-api-kotlin:1.2.0")
```

### Implement your first Restate component (Java)
Expand Down Expand Up @@ -98,7 +98,7 @@ public class Greeter {
When using composite types/POJOs for input/output, [Jackson Databind](https://github.com/FasterXML/jackson) will be used. The Jackson dependency is not automatically included, you must add it with [`sdk-serde-jackson`](sdk-serde-jackson):

```
implementation("dev.restate:sdk-serde-jackson:1.0.0")
implementation("dev.restate:sdk-serde-jackson:1.2.0")
```

If you want to store types/POJOs in state, use `JacksonSerdes`:
Expand Down Expand Up @@ -138,7 +138,7 @@ When using composite data types for input/output, [`kotlinx.serialization`](http
To deploy the Restate service as HTTP server, add [`sdk-http-vertx`](sdk-http-vertx) to the dependencies. For example, in Gradle:

```
implementation("dev.restate:sdk-http-vertx:1.0.0")
implementation("dev.restate:sdk-http-vertx:1.2.0")
```

To deploy the service, add the following code to the `main`. For example in Java:
Expand Down Expand Up @@ -172,7 +172,7 @@ gradle run
To deploy the Restate service as Lambda, add [`sdk-lambda`](sdk-lambda) to the dependencies. For example, in Gradle:

```
implementation("dev.restate:sdk-lambda:1.0.0")
implementation("dev.restate:sdk-lambda:1.2.0")
```

Configure the build tool to generate Fat-JARs, which are required by AWS Lambda to correctly load the JAR. For example, using Gradle:
Expand Down Expand Up @@ -299,10 +299,10 @@ This library follows [Semantic Versioning](https://semver.org/).

The compatibility with Restate is described in the following table:

| Restate Server\sdk-java | 1.0 | 1.1 |
|-------------------------|-----|-----|
| 1.0 | ✅ | ✅ |
| 1.1 | ✅ | ✅ |
| Restate Server\sdk-java | 1.0 | 1.1 | 1.2 |
|-------------------------|-----|-----|-----|
| 1.0 | ✅ | ✅ | ❌ |
| 1.1 | ✅ | ✅ | ✅ |

## Contributing

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,77 +130,6 @@ internal class ContextImpl internal constructor(internal val syscalls: Syscalls)
}
}

override suspend fun <T : Any?> runBlock(
serde: Serde<T>,
name: String,
block: suspend () -> T
): T {
val exitResult =
suspendCancellableCoroutine { cont: CancellableContinuation<CompletableDeferred<ByteBuffer>>
->
syscalls.enterSideEffectBlock(
name,
object : EnterSideEffectSyscallCallback {
override fun onSuccess(t: ByteBuffer?) {
val deferred: CompletableDeferred<ByteBuffer> = CompletableDeferred()
deferred.complete(t!!)
cont.resume(deferred)
}

override fun onFailure(t: TerminalException) {
val deferred: CompletableDeferred<ByteBuffer> = CompletableDeferred()
deferred.completeExceptionally(t)
cont.resume(deferred)
}

override fun onCancel(t: Throwable?) {
cont.cancel(t)
}

override fun onNotExecuted() {
cont.resume(CompletableDeferred())
}
})
}

if (exitResult.isCompleted) {
return serde.deserializeWrappingException(syscalls, exitResult.await())!!
}

var actionReturnValue: T? = null
var actionFailure: Throwable? = null
try {
actionReturnValue = block()
} catch (t: Throwable) {
actionFailure = t
}

val exitCallback =
object : ExitSideEffectSyscallCallback {
override fun onSuccess(t: ByteBuffer?) {
exitResult.complete(t!!)
}

override fun onFailure(t: TerminalException) {
exitResult.completeExceptionally(t)
}

override fun onCancel(t: Throwable?) {
exitResult.cancel(CancellationException(message = null, cause = t))
}
}

if (actionFailure != null) {
syscalls.exitSideEffectBlockWithException(actionFailure, null, exitCallback)
} else {
syscalls.exitSideEffectBlock(
serde.serializeWrappingException(syscalls, actionReturnValue), exitCallback)
}

return serde.deserializeWrappingException(syscalls, exitResult.await())
}

@UsePreviewContext
override suspend fun <T : Any?> runBlock(
serde: Serde<T>,
name: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds

/** Retry policy configuration. */
@UsePreviewContext
data class RetryPolicy(
/** Initial retry delay for the first retry attempt. */
val initialDelay: Duration,
Expand Down Expand Up @@ -46,9 +45,7 @@ data class RetryPolicy(
val maxDuration: Duration? = null
) {

@UsePreviewContext
data class Builder
internal constructor(
data class Builder(
var initialDelay: Duration = 100.milliseconds,
var exponentiationFactor: Float = 2.0f,
var maxDelay: Duration? = null,
Expand All @@ -65,7 +62,6 @@ data class RetryPolicy(
}
}

@UsePreviewContext
fun retryPolicy(init: RetryPolicy.Builder.() -> Unit): RetryPolicy {
val builder = RetryPolicy.Builder()
builder.init()
Expand Down
24 changes: 4 additions & 20 deletions sdk-api-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/api.kt
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,8 @@ sealed interface Context {
* observability tools.
*
* <p>The closure should tolerate retries, that is Restate might re-execute the closure multiple
* times until it records a result.
* times until it records a result. To control and limit the amount of retries, pass a
* [RetryPolicy] to this function.
*
* <h2>Error handling</h2>
*
Expand Down Expand Up @@ -147,16 +148,6 @@ sealed interface Context {
* @param T type of the return value.
* @return value of the runBlock operation.
*/
suspend fun <T : Any?> runBlock(serde: Serde<T>, name: String = "", block: suspend () -> T): T

/**
* Like [runBlock], but using a custom retry policy.
*
* When a retry policy is not specified, the `runBlock` will be retried using the
* [Restate invoker retry policy](https://docs.restate.dev/operate/configuration/server), which by
* default retries indefinitely.
*/
@UsePreviewContext
suspend fun <T : Any?> runBlock(
serde: Serde<T>,
name: String = "",
Expand Down Expand Up @@ -207,7 +198,8 @@ sealed interface Context {
* want to perform <b>non-deterministic operations</b>.
*
* <p>The closure should tolerate retries, that is Restate might re-execute the closure multiple
* times until it records a result.
* times until it records a result. To control and limit the amount of retries, pass a [RetryPolicy]
* to this function.
*
* <h2>Error handling</h2>
*
Expand Down Expand Up @@ -241,14 +233,6 @@ sealed interface Context {
* @param T type of the return value.
* @return value of the runBlock operation.
*/
suspend inline fun <reified T : Any> Context.runBlock(
name: String = "",
noinline block: suspend () -> T
): T {
return this.runBlock(KtSerdes.json(), name, block)
}

@UsePreviewContext
suspend inline fun <reified T : Any> Context.runBlock(
name: String = "",
retryPolicy: RetryPolicy? = null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ class SideEffectTest : SideEffectTestSuite() {
ctx.runBlock(name) { throw IllegalStateException(reason) }
}

@OptIn(UsePreviewContext::class)
override fun failingSideEffectWithRetryPolicy(
reason: String,
retryPolicy: dev.restate.sdk.common.RetryPolicy?
Expand Down
70 changes: 67 additions & 3 deletions sdk-api/src/main/java/dev/restate/sdk/Context.java
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,8 @@ default void sleep(Duration duration) {
* the observability tools.
*
* <p>The closure should tolerate retries, that is Restate might re-execute the closure multiple
* times until it records a result.
* times until it records a result. You can control and limit the amount of retries using {@link
* #run(String, Serde, RetryPolicy, ThrowingSupplier)}.
*
* <p><b>Error handling</b>: Errors occurring within this closure won't be propagated to the
* caller, unless they are {@link TerminalException}. Consider the following code:
Expand Down Expand Up @@ -140,7 +141,70 @@ default void sleep(Duration duration) {
* @param <T> type of the return value.
* @return value of the run operation.
*/
<T> T run(String name, Serde<T> serde, ThrowingSupplier<T> action) throws TerminalException;
default <T> T run(String name, Serde<T> serde, ThrowingSupplier<T> action)
throws TerminalException {
return run(name, serde, null, action);
}

/**
* Like {@link #run(String, Serde, ThrowingSupplier)}, but using a custom retry policy.
*
* <p>When a retry policy is not specified, the {@code run} will be retried using the <a
* href="https://docs.restate.dev/operate/configuration/server">Restate invoker retry policy</a>,
* which by default retries indefinitely.
*
* @see RetryPolicy
*/
<T> T run(String name, Serde<T> serde, RetryPolicy retryPolicy, ThrowingSupplier<T> action)
throws TerminalException;

/**
* Like {@link #run(String, ThrowingRunnable)}, but using a custom retry policy.
*
* <p>When a retry policy is not specified, the {@code run} will be retried using the <a
* href="https://docs.restate.dev/operate/configuration/server">Restate invoker retry policy</a>,
* which by default retries indefinitely.
*
* @see RetryPolicy
*/
default void run(String name, RetryPolicy retryPolicy, ThrowingRunnable runnable)
throws TerminalException {
run(
name,
Serde.VOID,
retryPolicy,
() -> {
runnable.run();
return null;
});
}

/**
* Like {@link #run(Serde, ThrowingSupplier)}, but using a custom retry policy.
*
* <p>When a retry policy is not specified, the {@code run} will be retried using the <a
* href="https://docs.restate.dev/operate/configuration/server">Restate invoker retry policy</a>,
* which by default retries indefinitely.
*
* @see RetryPolicy
*/
default <T> T run(Serde<T> serde, RetryPolicy retryPolicy, ThrowingSupplier<T> action)
throws TerminalException {
return run(null, serde, retryPolicy, action);
}

/**
* Like {@link #run(ThrowingRunnable)}, but using a custom retry policy.
*
* <p>When a retry policy is not specified, the {@code run} will be retried using the <a
* href="https://docs.restate.dev/operate/configuration/server">Restate invoker retry policy</a>,
* which by default retries indefinitely.
*
* @see RetryPolicy
*/
default void run(RetryPolicy retryPolicy, ThrowingRunnable runnable) throws TerminalException {
run(null, retryPolicy, runnable);
}

/** Like {@link #run(String, Serde, ThrowingSupplier)}, but without returning a value. */
default void run(String name, ThrowingRunnable runnable) throws TerminalException {
Expand All @@ -160,7 +224,7 @@ default <T> T run(Serde<T> serde, ThrowingSupplier<T> action) throws TerminalExc

/** Like {@link #run(String, ThrowingRunnable)}, but without a name. */
default void run(ThrowingRunnable runnable) throws TerminalException {
run(null, runnable);
run((String) null, runnable);
}

/**
Expand Down
5 changes: 3 additions & 2 deletions sdk-api/src/main/java/dev/restate/sdk/ContextImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,8 @@ public <T> void send(Target target, Serde<T> inputSerde, T parameter, Duration d
}

@Override
public <T> T run(String name, Serde<T> serde, ThrowingSupplier<T> action) {
public <T> T run(
String name, Serde<T> serde, RetryPolicy retryPolicy, ThrowingSupplier<T> action) {
CompletableFuture<CompletableFuture<ByteBuffer>> enterFut = new CompletableFuture<>();
syscalls.enterSideEffectBlock(
name,
Expand Down Expand Up @@ -171,7 +172,7 @@ public void onCancel(@Nullable Throwable t) {
}

if (failure != null) {
syscalls.exitSideEffectBlockWithException(failure, null, exitCallback);
syscalls.exitSideEffectBlockWithException(failure, retryPolicy, exitCallback);
} else {
syscalls.exitSideEffectBlock(
Util.serializeWrappingException(syscalls, serde, res), exitCallback);
Expand Down
Loading
Loading