diff --git a/README.md b/README.md index 2d146fd7..4ce0a0a6 100644 --- a/README.md +++ b/README.md @@ -42,8 +42,8 @@ gradle init --type java-application Add the runtime dependency [sdk-api](sdk-api) and the annotation processor dependency [sdk-api-gen](sdk-api-gen): ``` -annotationProcessor("dev.restate:sdk-api-gen:1.0.0") -implementation("dev.restate:sdk-api:1.0.0") +annotationProcessor("dev.restate:sdk-api-gen:1.2.0") +implementation("dev.restate:sdk-api:1.2.0") ``` ### Setup a project (Kotlin) @@ -65,8 +65,8 @@ plugins { Add the runtime dependency [sdk-api-kotlin](sdk-api-kotlin) and the ksp dependency [sdk-api-gen](sdk-api-kotlin-gen): ``` -ksp("dev.restate:sdk-api-kotlin-gen:1.0.0") -implementation("dev.restate:sdk-api-kotlin:1.0.0") +ksp("dev.restate:sdk-api-kotlin-gen:1.2.0") +implementation("dev.restate:sdk-api-kotlin:1.2.0") ``` ### Implement your first Restate component (Java) @@ -98,7 +98,7 @@ public class Greeter { When using composite types/POJOs for input/output, [Jackson Databind](https://github.com/FasterXML/jackson) will be used. The Jackson dependency is not automatically included, you must add it with [`sdk-serde-jackson`](sdk-serde-jackson): ``` -implementation("dev.restate:sdk-serde-jackson:1.0.0") +implementation("dev.restate:sdk-serde-jackson:1.2.0") ``` If you want to store types/POJOs in state, use `JacksonSerdes`: @@ -138,7 +138,7 @@ When using composite data types for input/output, [`kotlinx.serialization`](http To deploy the Restate service as HTTP server, add [`sdk-http-vertx`](sdk-http-vertx) to the dependencies. For example, in Gradle: ``` -implementation("dev.restate:sdk-http-vertx:1.0.0") +implementation("dev.restate:sdk-http-vertx:1.2.0") ``` To deploy the service, add the following code to the `main`. For example in Java: @@ -172,7 +172,7 @@ gradle run To deploy the Restate service as Lambda, add [`sdk-lambda`](sdk-lambda) to the dependencies. For example, in Gradle: ``` -implementation("dev.restate:sdk-lambda:1.0.0") +implementation("dev.restate:sdk-lambda:1.2.0") ``` Configure the build tool to generate Fat-JARs, which are required by AWS Lambda to correctly load the JAR. For example, using Gradle: @@ -299,10 +299,10 @@ This library follows [Semantic Versioning](https://semver.org/). The compatibility with Restate is described in the following table: -| Restate Server\sdk-java | 1.0 | 1.1 | -|-------------------------|-----|-----| -| 1.0 | ✅ | ✅ | -| 1.1 | ✅ | ✅ | +| Restate Server\sdk-java | 1.0 | 1.1 | 1.2 | +|-------------------------|-----|-----|-----| +| 1.0 | ✅ | ✅ | ❌ | +| 1.1 | ✅ | ✅ | ✅ | ## Contributing diff --git a/sdk-api-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/ContextImpl.kt b/sdk-api-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/ContextImpl.kt index 4dcc4159..84d519f1 100644 --- a/sdk-api-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/ContextImpl.kt +++ b/sdk-api-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/ContextImpl.kt @@ -130,77 +130,6 @@ internal class ContextImpl internal constructor(internal val syscalls: Syscalls) } } - override suspend fun runBlock( - serde: Serde, - name: String, - block: suspend () -> T - ): T { - val exitResult = - suspendCancellableCoroutine { cont: CancellableContinuation> - -> - syscalls.enterSideEffectBlock( - name, - object : EnterSideEffectSyscallCallback { - override fun onSuccess(t: ByteBuffer?) { - val deferred: CompletableDeferred = CompletableDeferred() - deferred.complete(t!!) - cont.resume(deferred) - } - - override fun onFailure(t: TerminalException) { - val deferred: CompletableDeferred = CompletableDeferred() - deferred.completeExceptionally(t) - cont.resume(deferred) - } - - override fun onCancel(t: Throwable?) { - cont.cancel(t) - } - - override fun onNotExecuted() { - cont.resume(CompletableDeferred()) - } - }) - } - - if (exitResult.isCompleted) { - return serde.deserializeWrappingException(syscalls, exitResult.await())!! - } - - var actionReturnValue: T? = null - var actionFailure: Throwable? = null - try { - actionReturnValue = block() - } catch (t: Throwable) { - actionFailure = t - } - - val exitCallback = - object : ExitSideEffectSyscallCallback { - override fun onSuccess(t: ByteBuffer?) { - exitResult.complete(t!!) - } - - override fun onFailure(t: TerminalException) { - exitResult.completeExceptionally(t) - } - - override fun onCancel(t: Throwable?) { - exitResult.cancel(CancellationException(message = null, cause = t)) - } - } - - if (actionFailure != null) { - syscalls.exitSideEffectBlockWithException(actionFailure, null, exitCallback) - } else { - syscalls.exitSideEffectBlock( - serde.serializeWrappingException(syscalls, actionReturnValue), exitCallback) - } - - return serde.deserializeWrappingException(syscalls, exitResult.await()) - } - - @UsePreviewContext override suspend fun runBlock( serde: Serde, name: String, diff --git a/sdk-api-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/RetryPolicy.kt b/sdk-api-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/RetryPolicy.kt index 4d0595c8..b74b236a 100644 --- a/sdk-api-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/RetryPolicy.kt +++ b/sdk-api-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/RetryPolicy.kt @@ -12,7 +12,6 @@ import kotlin.time.Duration import kotlin.time.Duration.Companion.milliseconds /** Retry policy configuration. */ -@UsePreviewContext data class RetryPolicy( /** Initial retry delay for the first retry attempt. */ val initialDelay: Duration, @@ -46,9 +45,7 @@ data class RetryPolicy( val maxDuration: Duration? = null ) { - @UsePreviewContext - data class Builder - internal constructor( + data class Builder( var initialDelay: Duration = 100.milliseconds, var exponentiationFactor: Float = 2.0f, var maxDelay: Duration? = null, @@ -65,7 +62,6 @@ data class RetryPolicy( } } -@UsePreviewContext fun retryPolicy(init: RetryPolicy.Builder.() -> Unit): RetryPolicy { val builder = RetryPolicy.Builder() builder.init() diff --git a/sdk-api-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/api.kt b/sdk-api-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/api.kt index ccbfafa9..2c8364aa 100644 --- a/sdk-api-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/api.kt +++ b/sdk-api-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/api.kt @@ -111,7 +111,8 @@ sealed interface Context { * observability tools. * *

The closure should tolerate retries, that is Restate might re-execute the closure multiple - * times until it records a result. + * times until it records a result. To control and limit the amount of retries, pass a + * [RetryPolicy] to this function. * *

Error handling

* @@ -147,16 +148,6 @@ sealed interface Context { * @param T type of the return value. * @return value of the runBlock operation. */ - suspend fun runBlock(serde: Serde, name: String = "", block: suspend () -> T): T - - /** - * Like [runBlock], but using a custom retry policy. - * - * When a retry policy is not specified, the `runBlock` will be retried using the - * [Restate invoker retry policy](https://docs.restate.dev/operate/configuration/server), which by - * default retries indefinitely. - */ - @UsePreviewContext suspend fun runBlock( serde: Serde, name: String = "", @@ -207,7 +198,8 @@ sealed interface Context { * want to perform non-deterministic operations. * *

The closure should tolerate retries, that is Restate might re-execute the closure multiple - * times until it records a result. + * times until it records a result. To control and limit the amount of retries, pass a [RetryPolicy] + * to this function. * *

Error handling

* @@ -241,14 +233,6 @@ sealed interface Context { * @param T type of the return value. * @return value of the runBlock operation. */ -suspend inline fun Context.runBlock( - name: String = "", - noinline block: suspend () -> T -): T { - return this.runBlock(KtSerdes.json(), name, block) -} - -@UsePreviewContext suspend inline fun Context.runBlock( name: String = "", retryPolicy: RetryPolicy? = null, diff --git a/sdk-api-kotlin/src/test/kotlin/dev/restate/sdk/kotlin/SideEffectTest.kt b/sdk-api-kotlin/src/test/kotlin/dev/restate/sdk/kotlin/SideEffectTest.kt index d7923acd..27a2eae0 100644 --- a/sdk-api-kotlin/src/test/kotlin/dev/restate/sdk/kotlin/SideEffectTest.kt +++ b/sdk-api-kotlin/src/test/kotlin/dev/restate/sdk/kotlin/SideEffectTest.kt @@ -77,7 +77,6 @@ class SideEffectTest : SideEffectTestSuite() { ctx.runBlock(name) { throw IllegalStateException(reason) } } - @OptIn(UsePreviewContext::class) override fun failingSideEffectWithRetryPolicy( reason: String, retryPolicy: dev.restate.sdk.common.RetryPolicy? diff --git a/sdk-api/src/main/java/dev/restate/sdk/Context.java b/sdk-api/src/main/java/dev/restate/sdk/Context.java index 84c91761..6161aaea 100644 --- a/sdk-api/src/main/java/dev/restate/sdk/Context.java +++ b/sdk-api/src/main/java/dev/restate/sdk/Context.java @@ -104,7 +104,8 @@ default void sleep(Duration duration) { * the observability tools. * *

The closure should tolerate retries, that is Restate might re-execute the closure multiple - * times until it records a result. + * times until it records a result. You can control and limit the amount of retries using {@link + * #run(String, Serde, RetryPolicy, ThrowingSupplier)}. * *

Error handling: Errors occurring within this closure won't be propagated to the * caller, unless they are {@link TerminalException}. Consider the following code: @@ -140,7 +141,70 @@ default void sleep(Duration duration) { * @param type of the return value. * @return value of the run operation. */ - T run(String name, Serde serde, ThrowingSupplier action) throws TerminalException; + default T run(String name, Serde serde, ThrowingSupplier action) + throws TerminalException { + return run(name, serde, null, action); + } + + /** + * Like {@link #run(String, Serde, ThrowingSupplier)}, but using a custom retry policy. + * + *

When a retry policy is not specified, the {@code run} will be retried using the Restate invoker retry policy, + * which by default retries indefinitely. + * + * @see RetryPolicy + */ + T run(String name, Serde serde, RetryPolicy retryPolicy, ThrowingSupplier action) + throws TerminalException; + + /** + * Like {@link #run(String, ThrowingRunnable)}, but using a custom retry policy. + * + *

When a retry policy is not specified, the {@code run} will be retried using the Restate invoker retry policy, + * which by default retries indefinitely. + * + * @see RetryPolicy + */ + default void run(String name, RetryPolicy retryPolicy, ThrowingRunnable runnable) + throws TerminalException { + run( + name, + Serde.VOID, + retryPolicy, + () -> { + runnable.run(); + return null; + }); + } + + /** + * Like {@link #run(Serde, ThrowingSupplier)}, but using a custom retry policy. + * + *

When a retry policy is not specified, the {@code run} will be retried using the Restate invoker retry policy, + * which by default retries indefinitely. + * + * @see RetryPolicy + */ + default T run(Serde serde, RetryPolicy retryPolicy, ThrowingSupplier action) + throws TerminalException { + return run(null, serde, retryPolicy, action); + } + + /** + * Like {@link #run(ThrowingRunnable)}, but using a custom retry policy. + * + *

When a retry policy is not specified, the {@code run} will be retried using the Restate invoker retry policy, + * which by default retries indefinitely. + * + * @see RetryPolicy + */ + default void run(RetryPolicy retryPolicy, ThrowingRunnable runnable) throws TerminalException { + run(null, retryPolicy, runnable); + } /** Like {@link #run(String, Serde, ThrowingSupplier)}, but without returning a value. */ default void run(String name, ThrowingRunnable runnable) throws TerminalException { @@ -160,7 +224,7 @@ default T run(Serde serde, ThrowingSupplier action) throws TerminalExc /** Like {@link #run(String, ThrowingRunnable)}, but without a name. */ default void run(ThrowingRunnable runnable) throws TerminalException { - run(null, runnable); + run((String) null, runnable); } /** diff --git a/sdk-api/src/main/java/dev/restate/sdk/ContextImpl.java b/sdk-api/src/main/java/dev/restate/sdk/ContextImpl.java index db6a9f5e..3fb4ee16 100644 --- a/sdk-api/src/main/java/dev/restate/sdk/ContextImpl.java +++ b/sdk-api/src/main/java/dev/restate/sdk/ContextImpl.java @@ -110,7 +110,8 @@ public void send(Target target, Serde inputSerde, T parameter, Duration d } @Override - public T run(String name, Serde serde, ThrowingSupplier action) { + public T run( + String name, Serde serde, RetryPolicy retryPolicy, ThrowingSupplier action) { CompletableFuture> enterFut = new CompletableFuture<>(); syscalls.enterSideEffectBlock( name, @@ -171,7 +172,7 @@ public void onCancel(@Nullable Throwable t) { } if (failure != null) { - syscalls.exitSideEffectBlockWithException(failure, null, exitCallback); + syscalls.exitSideEffectBlockWithException(failure, retryPolicy, exitCallback); } else { syscalls.exitSideEffectBlock( Util.serializeWrappingException(syscalls, serde, res), exitCallback); diff --git a/sdk-api/src/main/java/dev/restate/sdk/PreviewContext.java b/sdk-api/src/main/java/dev/restate/sdk/PreviewContext.java index 017210bf..755aa9dc 100644 --- a/sdk-api/src/main/java/dev/restate/sdk/PreviewContext.java +++ b/sdk-api/src/main/java/dev/restate/sdk/PreviewContext.java @@ -13,12 +13,6 @@ import dev.restate.sdk.common.TerminalException; import dev.restate.sdk.common.function.ThrowingRunnable; import dev.restate.sdk.common.function.ThrowingSupplier; -import dev.restate.sdk.common.syscalls.EnterSideEffectSyscallCallback; -import dev.restate.sdk.common.syscalls.ExitSideEffectSyscallCallback; -import dev.restate.sdk.common.syscalls.Syscalls; -import java.nio.ByteBuffer; -import java.util.concurrent.CompletableFuture; -import org.jspecify.annotations.Nullable; /** * Preview of new context features. Please note that the methods in this class may break between @@ -30,128 +24,41 @@ public class PreviewContext { /** - * Like {@link Context#run(String, Serde, ThrowingSupplier)}, but using a custom retry policy. - * - *

When a retry policy is not specified, the {@code run} will be retried using the Restate invoker retry policy, - * which by default retries indefinitely. + * @deprecated Use {@link Context#run(String, Serde, RetryPolicy, ThrowingSupplier)} */ + @Deprecated(since = "1.2", forRemoval = true) public static T run( Context ctx, String name, Serde serde, RetryPolicy retryPolicy, ThrowingSupplier action) throws TerminalException { - Syscalls syscalls = ((ContextImpl) ctx).syscalls; - CompletableFuture> enterFut = new CompletableFuture<>(); - syscalls.enterSideEffectBlock( - name, - new EnterSideEffectSyscallCallback() { - @Override - public void onNotExecuted() { - enterFut.complete(new CompletableFuture<>()); - } - - @Override - public void onSuccess(ByteBuffer result) { - enterFut.complete(CompletableFuture.completedFuture(result)); - } - - @Override - public void onFailure(TerminalException t) { - enterFut.complete(CompletableFuture.failedFuture(t)); - } - - @Override - public void onCancel(Throwable t) { - enterFut.cancel(true); - } - }); - - // If a failure was stored, it's simply thrown here - CompletableFuture exitFut = Util.awaitCompletableFuture(enterFut); - if (exitFut.isDone()) { - // We already have a result, we don't need to execute the action - return Util.deserializeWrappingException( - syscalls, serde, Util.awaitCompletableFuture(exitFut)); - } - - ExitSideEffectSyscallCallback exitCallback = - new ExitSideEffectSyscallCallback() { - @Override - public void onSuccess(ByteBuffer result) { - exitFut.complete(result); - } - - @Override - public void onFailure(TerminalException t) { - exitFut.completeExceptionally(t); - } - - @Override - public void onCancel(@Nullable Throwable t) { - exitFut.cancel(true); - } - }; - - T res = null; - Throwable failure = null; - try { - res = action.get(); - } catch (Throwable e) { - failure = e; - } - - if (failure != null) { - syscalls.exitSideEffectBlockWithException(failure, retryPolicy, exitCallback); - } else { - syscalls.exitSideEffectBlock( - Util.serializeWrappingException(syscalls, serde, res), exitCallback); - } - - return Util.deserializeWrappingException(syscalls, serde, Util.awaitCompletableFuture(exitFut)); + return ctx.run(name, serde, retryPolicy, action); } /** - * Like {@link Context#run(String, ThrowingRunnable)}, but using a custom retry policy. - * - *

When a retry policy is not specified, the {@code run} will be retried using the Restate invoker retry policy, - * which by default retries indefinitely. + * @deprecated Use {@link Context#run(String, RetryPolicy, ThrowingRunnable)} */ + @Deprecated(since = "1.2", forRemoval = true) public static void run( Context ctx, String name, RetryPolicy retryPolicy, ThrowingRunnable runnable) throws TerminalException { - run( - ctx, - name, - Serde.VOID, - retryPolicy, - () -> { - runnable.run(); - return null; - }); + ctx.run(name, retryPolicy, runnable); } /** - * Like {@link Context#run(Serde, ThrowingSupplier)}, but using a custom retry policy. - * - *

When a retry policy is not specified, the {@code run} will be retried using the Restate invoker retry policy, - * which by default retries indefinitely. + * @deprecated Use {@link Context#run(Serde, RetryPolicy, ThrowingSupplier)} */ + @Deprecated(since = "1.2", forRemoval = true) public static T run( Context ctx, Serde serde, RetryPolicy retryPolicy, ThrowingSupplier action) throws TerminalException { - return run(ctx, null, serde, retryPolicy, action); + return ctx.run(serde, retryPolicy, action); } /** - * Like {@link Context#run(ThrowingRunnable)}, but using a custom retry policy. - * - *

When a retry policy is not specified, the {@code run} will be retried using the Restate invoker retry policy, - * which by default retries indefinitely. + * @deprecated Use {@link Context#run(RetryPolicy, ThrowingRunnable)} */ + @Deprecated(since = "1.2", forRemoval = true) public static void run(Context ctx, RetryPolicy retryPolicy, ThrowingRunnable runnable) throws TerminalException { - run(ctx, null, retryPolicy, runnable); + ctx.run(retryPolicy, runnable); } } diff --git a/sdk-core/src/main/java/dev/restate/sdk/core/ServiceProtocol.java b/sdk-core/src/main/java/dev/restate/sdk/core/ServiceProtocol.java index 6aa0033b..241aca53 100644 --- a/sdk-core/src/main/java/dev/restate/sdk/core/ServiceProtocol.java +++ b/sdk-core/src/main/java/dev/restate/sdk/core/ServiceProtocol.java @@ -23,9 +23,9 @@ class ServiceProtocol { static final Protocol.ServiceProtocolVersion MIN_SERVICE_PROTOCOL_VERSION = - Protocol.ServiceProtocolVersion.V1; + Protocol.ServiceProtocolVersion.V2; private static final Protocol.ServiceProtocolVersion MAX_SERVICE_PROTOCOL_VERSION = - Protocol.ServiceProtocolVersion.V1; + Protocol.ServiceProtocolVersion.V2; static final Discovery.ServiceDiscoveryProtocolVersion MIN_SERVICE_DISCOVERY_PROTOCOL_VERSION = Discovery.ServiceDiscoveryProtocolVersion.V1; @@ -56,10 +56,8 @@ static String serviceProtocolVersionToHeaderValue(Protocol.ServiceProtocolVersio } static Protocol.ServiceProtocolVersion maxServiceProtocolVersion( - boolean experimentalContextEnabled) { - return experimentalContextEnabled - ? Protocol.ServiceProtocolVersion.V2 - : Protocol.ServiceProtocolVersion.V1; + boolean ignoredExperimentalContextEnabled) { + return Protocol.ServiceProtocolVersion.V2; } static boolean isSupported( diff --git a/sdk-core/src/test/java/dev/restate/sdk/core/ProtoUtils.java b/sdk-core/src/test/java/dev/restate/sdk/core/ProtoUtils.java index 8390d310..ccca1423 100644 --- a/sdk-core/src/test/java/dev/restate/sdk/core/ProtoUtils.java +++ b/sdk-core/src/test/java/dev/restate/sdk/core/ProtoUtils.java @@ -28,7 +28,8 @@ public class ProtoUtils { public static String serviceProtocolContentTypeHeader() { - return ServiceProtocol.serviceProtocolVersionToHeaderValue(Protocol.ServiceProtocolVersion.V1); + return ServiceProtocol.serviceProtocolVersionToHeaderValue( + ServiceProtocol.MIN_SERVICE_PROTOCOL_VERSION); } public static String serviceProtocolContentTypeHeader(boolean enableContextPreview) { diff --git a/test-services/src/main/kotlin/dev/restate/sdk/testservices/FailingImpl.kt b/test-services/src/main/kotlin/dev/restate/sdk/testservices/FailingImpl.kt index 3d6a4b7f..362812a4 100644 --- a/test-services/src/main/kotlin/dev/restate/sdk/testservices/FailingImpl.kt +++ b/test-services/src/main/kotlin/dev/restate/sdk/testservices/FailingImpl.kt @@ -10,7 +10,6 @@ package dev.restate.sdk.testservices import dev.restate.sdk.common.TerminalException import dev.restate.sdk.kotlin.ObjectContext -import dev.restate.sdk.kotlin.UsePreviewContext import dev.restate.sdk.kotlin.retryPolicy import dev.restate.sdk.kotlin.runBlock import dev.restate.sdk.testservices.contracts.Failing @@ -65,7 +64,6 @@ class FailingImpl : Failing { throw IllegalStateException("Should not be reached.") } - @OptIn(UsePreviewContext::class) override suspend fun sideEffectSucceedsAfterGivenAttempts( context: ObjectContext, minimumAttempts: Int @@ -86,7 +84,6 @@ class FailingImpl : Failing { } } - @OptIn(UsePreviewContext::class) override suspend fun sideEffectFailsAfterGivenAttempts( context: ObjectContext, retryPolicyMaxRetryCount: Int diff --git a/test-services/src/main/kotlin/dev/restate/sdk/testservices/Main.kt b/test-services/src/main/kotlin/dev/restate/sdk/testservices/Main.kt index 8e336b33..67a244ef 100644 --- a/test-services/src/main/kotlin/dev/restate/sdk/testservices/Main.kt +++ b/test-services/src/main/kotlin/dev/restate/sdk/testservices/Main.kt @@ -32,7 +32,7 @@ val KNOWN_SERVICES_FACTORIES: Map Any> = interpreterName(2) to { ObjectInterpreterImpl.getInterpreterDefinition(2) }, ServiceInterpreterHelperDefinitions.SERVICE_NAME to { ServiceInterpreterHelperImpl() }) -val NEEDS_EXPERIMENTAL_CONTEXT: Set = setOf(FailingDefinitions.SERVICE_NAME) +val NEEDS_EXPERIMENTAL_CONTEXT: Set = setOf() fun main(args: Array) { var env = System.getenv("SERVICES")