diff --git a/examples/src/main/java/my/restate/sdk/examples/Counter.java b/examples/src/main/java/my/restate/sdk/examples/Counter.java index ab9225ff..6bba0933 100644 --- a/examples/src/main/java/my/restate/sdk/examples/Counter.java +++ b/examples/src/main/java/my/restate/sdk/examples/Counter.java @@ -38,7 +38,6 @@ public void reset(ObjectContext ctx) { /** Add the given value to the count. */ @Handler public void add(ObjectContext ctx, long request) { - long currentValue = ctx.get(TOTAL).orElse(0L); long newValue = currentValue + request; ctx.sleep(Duration.ofSeconds(120)); diff --git a/sdk-api-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/endpoint/endpoint.kt b/sdk-api-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/endpoint/endpoint.kt index d87ea1df..c8a50d1a 100644 --- a/sdk-api-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/endpoint/endpoint.kt +++ b/sdk-api-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/endpoint/endpoint.kt @@ -9,6 +9,11 @@ package dev.restate.sdk.kotlin.endpoint import dev.restate.sdk.endpoint.Endpoint +import dev.restate.sdk.endpoint.definition.HandlerDefinition +import dev.restate.sdk.endpoint.definition.ServiceDefinition +import kotlin.time.Duration +import kotlin.time.toJavaDuration +import kotlin.time.toKotlinDuration /** Endpoint builder function. */ fun endpoint(init: Endpoint.Builder.() -> Unit): Endpoint { @@ -16,3 +21,280 @@ fun endpoint(init: Endpoint.Builder.() -> Unit): Endpoint { builder.init() return builder.build() } + +/** + * Documentation as shown in the UI, Admin REST API, and the generated OpenAPI documentation of this + * service. + */ +var ServiceDefinition.Configurator.documentation: String? + get() { + return this.documentation() + } + set(value) { + this.documentation(value) + } + +/** Service metadata, as propagated in the Admin REST API. */ +var ServiceDefinition.Configurator.metadata: Map? + get() { + return this.metadata() + } + set(value) { + this.metadata(value) + } + +/** + * This timer guards against stalled invocations. Once it expires, Restate triggers a graceful + * termination by asking the invocation to suspend (which preserves intermediate progress). + * + * The [abortTimeout] is used to abort the invocation, in case it doesn't react to the request to + * suspend. + * + * This overrides the default inactivity timeout configured in the restate-server for all + * invocations to this service. + * + * *NOTE:* You can set this field only if you register this service against restate-server >= 1.4, + * otherwise the service discovery will fail. + */ +var ServiceDefinition.Configurator.inactivityTimeout: Duration? + get() { + return this.inactivityTimeout()?.toKotlinDuration() + } + set(value) { + this.inactivityTimeout(value?.toJavaDuration()) + } + +/** + * This timer guards against stalled service/handler invocations that are supposed to terminate. The + * abort timeout is started after the [inactivityTimeout] has expired and the service/handler + * invocation has been asked to gracefully terminate. Once the timer expires, it will abort the + * service/handler invocation. + * + * This timer potentially *interrupts* user code. If the user code needs longer to gracefully + * terminate, then this value needs to be set accordingly. + * + * This overrides the default abort timeout configured in the restate-server for all invocations to + * this service. + * + * *NOTE:* You can set this field only if you register this service against restate-server >= 1.4, + * otherwise the service discovery will fail. + */ +var ServiceDefinition.Configurator.abortTimeout: Duration? + get() { + return this.abortTimeout()?.toKotlinDuration() + } + set(value) { + this.abortTimeout(value?.toJavaDuration()) + } + +/** + * The retention duration of idempotent requests to this service. + * + * *NOTE:* You can set this field only if you register this service against restate-server >= 1.4, + * otherwise the service discovery will fail. + */ +var ServiceDefinition.Configurator.idempotencyRetention: Duration? + get() { + return this.idempotencyRetention()?.toKotlinDuration() + } + set(value) { + this.idempotencyRetention(value?.toJavaDuration()) + } + +/** + * The journal retention. When set, this applies to all requests to all handlers of this service. + * + * In case the request has an idempotency key, the [idempotencyRetention] caps the journal retention + * time. + * + * *NOTE:* You can set this field only if you register this service against restate-server >= 1.4, + * otherwise the service discovery will fail. + * + * @return this + */ +var ServiceDefinition.Configurator.journalRetention: Duration? + get() { + return this.journalRetention()?.toKotlinDuration() + } + set(value) { + this.journalRetention(value?.toJavaDuration()) + } + +/** + * When set to `true`, lazy state will be enabled for all invocations to this service. This is + * relevant only for workflows and virtual objects. + * + * *NOTE:* You can set this field only if you register this service against restate-server >= 1.4, + * otherwise the service discovery will fail. + */ +var ServiceDefinition.Configurator.enableLazyState: Boolean? + get() { + return this.enableLazyState() + } + set(value) { + this.enableLazyState(value) + } + +/** + * When set to `true` this service, with all its handlers, cannot be invoked from the restate-server + * HTTP and Kafka ingress, but only from other services. + * + * *NOTE:* You can set this field only if you register this service against restate-server >= 1.4, + * otherwise the service discovery will fail. + */ +var ServiceDefinition.Configurator.ingressPrivate: Boolean? + get() { + return this.ingressPrivate() + } + set(value) { + this.ingressPrivate(value) + } + +/** + * Set the acceptable content type when ingesting HTTP requests. Wildcards can be used, e.g. + * `application/*` or `*/*`. + */ +var HandlerDefinition.Configurator.acceptContentType: String? + get() { + return this.acceptContentType() + } + set(value) { + this.acceptContentType(value) + } + +/** + * Documentation as shown in the UI, Admin REST API, and the generated OpenAPI documentation of this + * handler. + */ +var HandlerDefinition.Configurator.documentation: String? + get() { + return this.documentation() + } + set(value) { + this.documentation(value) + } + +/** Handler metadata, as propagated in the Admin REST API. */ +var HandlerDefinition.Configurator.metadata: Map? + get() { + return this.metadata() + } + set(value) { + this.metadata(value) + } + +/** + * This timer guards against stalled invocations. Once it expires, Restate triggers a graceful + * termination by asking the invocation to suspend (which preserves intermediate progress). + * + * The [abortTimeout] is used to abort the invocation, in case it doesn't react to the request to + * suspend. + * + * This overrides the inactivity timeout set for the service and the default set in restate-server. + * + * *NOTE:* You can set this field only if you register this service against restate-server >= 1.4, + * otherwise the service discovery will fail. + */ +var HandlerDefinition.Configurator.inactivityTimeout: Duration? + get() { + return this.inactivityTimeout()?.toKotlinDuration() + } + set(value) { + this.inactivityTimeout(value?.toJavaDuration()) + } + +/** + * This timer guards against stalled invocations that are supposed to terminate. The abort timeout + * is started after the [inactivityTimeout] has expired and the invocation has been asked to + * gracefully terminate. Once the timer expires, it will abort the invocation. + * + * This timer potentially *interrupts* user code. If the user code needs longer to gracefully + * terminate, then this value needs to be set accordingly. + * + * This overrides the abort timeout set for the service and the default set in restate-server. + * + * *NOTE:* You can set this field only if you register this service against restate-server >= 1.4, + * otherwise the service discovery will fail. + */ +var HandlerDefinition.Configurator.abortTimeout: Duration? + get() { + return this.abortTimeout()?.toKotlinDuration() + } + set(value) { + this.abortTimeout(value?.toJavaDuration()) + } + +/** + * The retention duration of idempotent requests to this service. + * + * *NOTE:* You can set this field only if you register this service against restate-server >= 1.4, + * otherwise the service discovery will fail. + */ +var HandlerDefinition.Configurator.idempotencyRetention: Duration? + get() { + return this.idempotencyRetention()?.toKotlinDuration() + } + set(value) { + this.idempotencyRetention(value?.toJavaDuration()) + } + +/** + * The retention duration for this workflow handler. + * + * *NOTE:* You can set this field only if you register this service against restate-server >= 1.4, + * otherwise the service discovery will fail. + */ +var HandlerDefinition.Configurator.workflowRetention: Duration? + get() { + return this.workflowRetention()?.toKotlinDuration() + } + set(value) { + this.workflowRetention(value?.toJavaDuration()) + } + +/** + * The journal retention for invocations to this handler. + * + * In case the request has an idempotency key, the [idempotencyRetention] caps the journal retention + * time. + * + * *NOTE:* You can set this field only if you register this service against restate-server >= 1.4, + * otherwise the service discovery will fail. + */ +var HandlerDefinition.Configurator.journalRetention: Duration? + get() { + return this.journalRetention()?.toKotlinDuration() + } + set(value) { + this.journalRetention(value?.toJavaDuration()) + } + +/** + * When set to `true` this handler cannot be invoked from the restate-server HTTP and Kafka ingress, + * but only from other services. + * + * *NOTE:* You can set this field only if you register this service against restate-server >= 1.4, + * otherwise the service discovery will fail. + */ +var HandlerDefinition.Configurator.ingressPrivate: Boolean? + get() { + return this.ingressPrivate() + } + set(value) { + this.ingressPrivate(value) + } + +/** + * When set to `true`, lazy state will be enabled for all invocations to this handler. This is + * relevant only for workflows and virtual objects. + * + * *NOTE:* You can set this field only if you register this service against restate-server >= 1.4, + * otherwise the service discovery will fail. + */ +var HandlerDefinition.Configurator.enableLazyState: Boolean? + get() { + return this.enableLazyState() + } + set(value) { + this.enableLazyState(value) + } diff --git a/sdk-common/src/main/java/dev/restate/sdk/endpoint/Endpoint.java b/sdk-common/src/main/java/dev/restate/sdk/endpoint/Endpoint.java index 81bb10e6..024f77aa 100644 --- a/sdk-common/src/main/java/dev/restate/sdk/endpoint/Endpoint.java +++ b/sdk-common/src/main/java/dev/restate/sdk/endpoint/Endpoint.java @@ -13,6 +13,7 @@ import dev.restate.sdk.endpoint.definition.ServiceDefinitionFactories; import io.opentelemetry.api.OpenTelemetry; import java.util.*; +import java.util.function.Consumer; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -46,11 +47,14 @@ public static class Builder { * Add a Restate service to the endpoint. This will automatically discover the generated factory * based on the class name. * + *

If you want to modify some of the service definition options, such as documentation, + * inactivity timeout, and so on, use {@link #bind(Object, Consumer)} instead. + * *

You can also manually instantiate the {@link ServiceDefinition} using {@link * #bind(ServiceDefinition)}. */ public Builder bind(Object service) { - return this.bind(ServiceDefinitionFactories.discover(service).create(service, null)); + return this.bind(service, ignored -> {}); } /** @@ -61,10 +65,56 @@ public Builder bind(Object service) { *

Look at the respective documentations of the HandlerRunner class in the Java or in the * Kotlin module. * + *

If you want to modify some of the service definition options, such as documentation, + * inactivity timeout, and so on, use {@link #bind(Object, HandlerRunner.Options, Consumer)} + * instead. + * * @see #bind(Object) */ public Builder bind(Object service, HandlerRunner.Options options) { - return this.bind(ServiceDefinitionFactories.discover(service).create(service, options)); + return this.bind(service, options, ignored -> {}); + } + + /** + * Same as {@link #bind(Object)} but allows to configure the {@link ServiceDefinition} before + * binding it. + * + *

{@code
+     * Endpoint endpoint = Endpoint
+     *   .builder()
+     *   .bind(
+     *     new Counter(),
+     *     // Configure the service
+     *     s -> s.journalRetention(Duration.ofDays(1))
+     *   )
+     *   .build();
+     * }
+ * + * @see #bind(Object) + * @see ServiceDefinition.Configurator + */ + public Builder bind(Object service, Consumer configurator) { + return this.bind( + ServiceDefinitionFactories.discover(service) + .create(service, null) + .configure(configurator)); + } + + /** + * Same as {@link #bind(Object, HandlerRunner.Options)} but allows to configure the {@link + * ServiceDefinition} before binding it. + * + * @see #bind(Object, HandlerRunner.Options) + * @see ServiceDefinition.Configurator + */ + public Builder bind( + Object service, + HandlerRunner.Options options, + Consumer configurator) { + return this.bind( + ServiceDefinitionFactories.discover(service) + .create(service, options) + .configure(configurator)); } /** Add a manual {@link ServiceDefinition} to the endpoint. */ @@ -153,6 +203,23 @@ public static Builder bind(Object service, HandlerRunner.Options options) { return new Builder().bind(service, options); } + /** + * @see Builder#bind(Object, Consumer) + */ + public static Builder bind(Object object, Consumer configurator) { + return new Builder().bind(object, configurator); + } + + /** + * @see Builder#bind(Object, HandlerRunner.Options, Consumer) + */ + public static Builder bind( + Object service, + HandlerRunner.Options options, + Consumer configurator) { + return new Builder().bind(service, options, configurator); + } + /** * @see Builder#bind(ServiceDefinition) */ diff --git a/sdk-common/src/main/java/dev/restate/sdk/endpoint/definition/HandlerDefinition.java b/sdk-common/src/main/java/dev/restate/sdk/endpoint/definition/HandlerDefinition.java index 82682f45..b83e765f 100644 --- a/sdk-common/src/main/java/dev/restate/sdk/endpoint/definition/HandlerDefinition.java +++ b/sdk-common/src/main/java/dev/restate/sdk/endpoint/definition/HandlerDefinition.java @@ -9,10 +9,15 @@ package dev.restate.sdk.endpoint.definition; import dev.restate.serde.Serde; +import java.time.Duration; import java.util.Collections; +import java.util.HashMap; import java.util.Map; +import java.util.Objects; +import java.util.function.Consumer; import org.jspecify.annotations.Nullable; +/** This class represents a Restate handler. */ public final class HandlerDefinition { private final String name; @@ -23,6 +28,13 @@ public final class HandlerDefinition { private final @Nullable String documentation; private final Map metadata; private final HandlerRunner runner; + private final @Nullable Duration inactivityTimeout; + private final @Nullable Duration abortTimeout; + private final @Nullable Duration idempotencyRetention; + private final @Nullable Duration workflowRetention; + private final @Nullable Duration journalRetention; + private final @Nullable Boolean ingressPrivate; + private final @Nullable Boolean enableLazyState; HandlerDefinition( String name, @@ -32,7 +44,14 @@ public final class HandlerDefinition { Serde responseSerde, @Nullable String documentation, Map metadata, - HandlerRunner runner) { + HandlerRunner runner, + @Nullable Duration inactivityTimeout, + @Nullable Duration abortTimeout, + @Nullable Duration idempotencyRetention, + @Nullable Duration workflowRetention, + @Nullable Duration journalRetention, + @Nullable Boolean ingressPrivate, + @Nullable Boolean enableLazyState) { this.name = name; this.handlerType = handlerType; this.acceptContentType = acceptContentType; @@ -41,32 +60,62 @@ public final class HandlerDefinition { this.documentation = documentation; this.metadata = metadata; this.runner = runner; + this.inactivityTimeout = inactivityTimeout; + this.abortTimeout = abortTimeout; + this.idempotencyRetention = idempotencyRetention; + this.workflowRetention = workflowRetention; + this.journalRetention = journalRetention; + this.ingressPrivate = ingressPrivate; + this.enableLazyState = enableLazyState; } + /** + * @return handler name. + */ public String getName() { return name; } + /** + * @return handler type. + */ public HandlerType getHandlerType() { return handlerType; } + /** + * @return the acceptable content type when ingesting HTTP requests. Wildcards can be used, e.g. + * {@code application / *} or {@code * / *}. + */ public @Nullable String getAcceptContentType() { return acceptContentType; } + /** + * @return request {@link Serde} + */ public Serde getRequestSerde() { return requestSerde; } + /** + * @return response {@link Serde} + */ public Serde getResponseSerde() { return responseSerde; } + /** + * @return handler documentation. When using the annotation processor, this will contain the + * javadoc of the annotated methods. + */ public @Nullable String getDocumentation() { return documentation; } + /** + * @return metadata, as shown in the Admin REST API. + */ public Map getMetadata() { return metadata; } @@ -75,6 +124,63 @@ public HandlerRunner getRunner() { return runner; } + /** + * @return the inactivity timeout applied to this handler. + * @see Configurator#inactivityTimeout(Duration) + */ + public @Nullable Duration getInactivityTimeout() { + return inactivityTimeout; + } + + /** + * @return the abort timeout applied to this handler. + * @see Configurator#abortTimeout(Duration) + */ + public @Nullable Duration getAbortTimeout() { + return abortTimeout; + } + + /** + * @return the idempotency retention applied to this handler. + * @see Configurator#idempotencyRetention(Duration) + */ + public @Nullable Duration getIdempotencyRetention() { + return idempotencyRetention; + } + + /** + * @return the workflow retention applied to this handler. + * @see Configurator#workflowRetention(Duration) + */ + public @Nullable Duration getWorkflowRetention() { + return workflowRetention; + } + + /** + * @return the journal retention applied to this handler. + * @see Configurator#journalRetention(Duration) + */ + public @Nullable Duration getJournalRetention() { + return journalRetention; + } + + /** + * @return true if this handler cannot be invoked from the restate-server HTTP and Kafka ingress, + * but only from other services. + * @see Configurator#ingressPrivate(Boolean) + */ + public @Nullable Boolean getIngressPrivate() { + return ingressPrivate; + } + + /** + * @return true if this handler will use lazy state. + * @see Configurator#enableLazyState(Boolean) + */ + public @Nullable Boolean getEnableLazyState() { + return enableLazyState; + } + public HandlerDefinition withAcceptContentType(String acceptContentType) { return new HandlerDefinition<>( name, @@ -84,7 +190,14 @@ public HandlerDefinition withAcceptContentType(String acceptContentTyp responseSerde, documentation, metadata, - runner); + runner, + inactivityTimeout, + abortTimeout, + idempotencyRetention, + workflowRetention, + journalRetention, + ingressPrivate, + enableLazyState); } public HandlerDefinition withDocumentation(@Nullable String documentation) { @@ -96,7 +209,14 @@ public HandlerDefinition withDocumentation(@Nullable String documentat responseSerde, documentation, metadata, - runner); + runner, + inactivityTimeout, + abortTimeout, + idempotencyRetention, + journalRetention, + workflowRetention, + ingressPrivate, + enableLazyState); } public HandlerDefinition withMetadata(Map metadata) { @@ -108,7 +228,333 @@ public HandlerDefinition withMetadata(Map metadata) { responseSerde, documentation, metadata, - runner); + runner, + inactivityTimeout, + abortTimeout, + idempotencyRetention, + workflowRetention, + journalRetention, + ingressPrivate, + enableLazyState); + } + + /** + * @return a copy of this {@link HandlerDefinition}, configured with the {@link Configurator}. + */ + public HandlerDefinition configure( + Consumer configurator) { + HandlerDefinition.Configurator configuratorObj = + new HandlerDefinition.Configurator( + handlerType, + acceptContentType, + documentation, + metadata, + inactivityTimeout, + abortTimeout, + idempotencyRetention, + workflowRetention, + journalRetention, + ingressPrivate, + enableLazyState); + configurator.accept(configuratorObj); + + return new HandlerDefinition<>( + name, + handlerType, + configuratorObj.acceptContentType, + requestSerde, + responseSerde, + configuratorObj.documentation, + configuratorObj.metadata, + runner, + configuratorObj.inactivityTimeout, + configuratorObj.abortTimeout, + configuratorObj.idempotencyRetention, + configuratorObj.workflowRetention, + configuratorObj.journalRetention, + configuratorObj.ingressPrivate, + configuratorObj.enableLazyState); + } + + /** Configurator for a {@link HandlerDefinition}. */ + public static final class Configurator { + + private final HandlerType handlerType; + private @Nullable String acceptContentType; + private @Nullable String documentation; + private Map metadata; + private @Nullable Duration inactivityTimeout; + private @Nullable Duration abortTimeout; + private @Nullable Duration idempotencyRetention; + private @Nullable Duration workflowRetention; + private @Nullable Duration journalRetention; + private @Nullable Boolean ingressPrivate; + private @Nullable Boolean enableLazyState; + + private Configurator( + HandlerType handlerType, + @Nullable String acceptContentType, + @Nullable String documentation, + Map metadata, + @Nullable Duration inactivityTimeout, + @Nullable Duration abortTimeout, + @Nullable Duration idempotencyRetention, + @Nullable Duration workflowRetention, + @Nullable Duration journalRetention, + @Nullable Boolean ingressPrivate, + @Nullable Boolean enableLazyState) { + this.handlerType = handlerType; + this.acceptContentType = acceptContentType; + this.documentation = documentation; + this.metadata = new HashMap<>(metadata); + this.inactivityTimeout = inactivityTimeout; + this.abortTimeout = abortTimeout; + this.idempotencyRetention = idempotencyRetention; + this.workflowRetention = workflowRetention; + this.journalRetention = journalRetention; + this.ingressPrivate = ingressPrivate; + this.enableLazyState = enableLazyState; + } + + /** + * @return configured accepted content type. + * @see #acceptContentType(String) + */ + public @Nullable String acceptContentType() { + return acceptContentType; + } + + /** + * Set the acceptable content type when ingesting HTTP requests. Wildcards can be used, e.g. + * {@code application / *} or {@code * / *}. + * + * @return this + */ + public Configurator acceptContentType(@Nullable String acceptContentType) { + this.acceptContentType = acceptContentType; + return this; + } + + /** + * @return configured documentation. + * @see #documentation(String) + */ + public @Nullable String documentation() { + return documentation; + } + + /** + * Documentation as shown in the UI, Admin REST API, and the generated OpenAPI documentation of + * this handler. + * + * @return this + */ + public Configurator documentation(@Nullable String documentation) { + this.documentation = documentation; + return this; + } + + /** + * @return configured metadata. + * @see #metadata(Map) + */ + public Map metadata() { + return metadata; + } + + /** + * @see #metadata(Map) + */ + public Configurator addMetadata(String key, String value) { + this.metadata.put(key, value); + return this; + } + + /** + * Handler metadata, as propagated in the Admin REST API. + * + * @return this + */ + public Configurator metadata(Map metadata) { + this.metadata = metadata; + return this; + } + + /** + * @return configured inactivity timeout. + * @see #inactivityTimeout(Duration) + */ + public @Nullable Duration inactivityTimeout() { + return inactivityTimeout; + } + + /** + * This timer guards against stalled invocations. Once it expires, Restate triggers a graceful + * termination by asking the invocation to suspend (which preserves intermediate progress). + * + *

The {@link #abortTimeout(Duration)} is used to abort the invocation, in case it doesn't + * react to the request to suspend. + * + *

This overrides the inactivity timeout set for the service and the default set in + * restate-server. + * + *

NOTE: You can set this field only if you register this service against + * restate-server >= 1.4, otherwise the service discovery will fail. + * + * @return this + */ + public Configurator inactivityTimeout(@Nullable Duration inactivityTimeout) { + this.inactivityTimeout = inactivityTimeout; + return this; + } + + /** + * @return configured abort timeout. + * @see #abortTimeout(Duration) + */ + public @Nullable Duration abortTimeout() { + return abortTimeout; + } + + /** + * This timer guards against stalled invocations that are supposed to terminate. The abort + * timeout is started after the {@link #inactivityTimeout(Duration)} has expired and the + * invocation has been asked to gracefully terminate. Once the timer expires, it will abort the + * invocation. + * + *

This timer potentially interrupts user code. If the user code needs longer to + * gracefully terminate, then this value needs to be set accordingly. + * + *

This overrides the abort timeout set for the service and the default set in + * restate-server. + * + *

NOTE: You can set this field only if you register this service against + * restate-server >= 1.4, otherwise the service discovery will fail. + * + * @return this + */ + public Configurator abortTimeout(@Nullable Duration abortTimeout) { + this.abortTimeout = abortTimeout; + return this; + } + + /** + * @return configured idempotency retention. + * @see #idempotencyRetention(Duration) + */ + public @Nullable Duration idempotencyRetention() { + return idempotencyRetention; + } + + /** + * The retention duration of idempotent requests to this service. + * + *

NOTE: You can set this field only if you register this service against + * restate-server >= 1.4, otherwise the service discovery will fail. + * + * @return this + */ + public Configurator idempotencyRetention(@Nullable Duration idempotencyRetention) { + if (handlerType == HandlerType.WORKFLOW) { + throw new IllegalArgumentException( + "The idempotency retention cannot be set for workflow handlers. Use workflowRetention(Duration) instead"); + } + this.idempotencyRetention = idempotencyRetention; + return this; + } + + /** + * @return configured workflow retention. + * @see #workflowRetention(Duration) + */ + public @Nullable Duration workflowRetention() { + return workflowRetention; + } + + /** + * The retention duration for this workflow handler. + * + *

NOTE: You can set this field only if you register this service against + * restate-server >= 1.4, otherwise the service discovery will fail. + * + * @return this + */ + public Configurator workflowRetention(@Nullable Duration workflowRetention) { + if (handlerType != HandlerType.WORKFLOW) { + throw new IllegalArgumentException( + "Workflow retention can be set only for workflow handlers"); + } + this.workflowRetention = workflowRetention; + return this; + } + + /** + * @return configured journal retention. + * @see #journalRetention(Duration) + */ + public @Nullable Duration journalRetention() { + return journalRetention; + } + + /** + * The journal retention for invocations to this handler. + * + *

In case the request has an idempotency key, the {@link #idempotencyRetention(Duration)} + * caps the journal retention time. + * + *

NOTE: You can set this field only if you register this service against + * restate-server >= 1.4, otherwise the service discovery will fail. + * + * @return this + */ + public Configurator journalRetention(@Nullable Duration journalRetention) { + this.journalRetention = journalRetention; + return this; + } + + /** + * @return configured ingress private. + * @see #ingressPrivate(Boolean) + */ + public @Nullable Boolean ingressPrivate() { + return ingressPrivate; + } + + /** + * When set to {@code true} this handler cannot be invoked from the restate-server HTTP and + * Kafka ingress, but only from other services. + * + *

NOTE: You can set this field only if you register this service against + * restate-server >= 1.4, otherwise the service discovery will fail. + * + * @return this + */ + public Configurator ingressPrivate(@Nullable Boolean ingressPrivate) { + this.ingressPrivate = ingressPrivate; + return this; + } + + /** + * @return configured enable lazy state. + * @see #enableLazyState(Boolean) + */ + public @Nullable Boolean enableLazyState() { + return enableLazyState; + } + + /** + * When set to {@code true}, lazy state will be enabled for all invocations to this handler. + * This is relevant only for workflows and virtual objects. + * + *

NOTE: You can set this field only if you register this service against + * restate-server >= 1.4, otherwise the service discovery will fail. + * + * @return this + */ + public Configurator enableLazyState(@Nullable Boolean enableLazyState) { + this.enableLazyState = enableLazyState; + return this; + } } public static HandlerDefinition of( @@ -125,6 +571,53 @@ public static HandlerDefinition of( responseSerde, null, Collections.emptyMap(), - runner); + runner, + null, + null, + null, + null, + null, + null, + null); + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof HandlerDefinition that)) return false; + return Objects.equals(getName(), that.getName()) + && getHandlerType() == that.getHandlerType() + && Objects.equals(getAcceptContentType(), that.getAcceptContentType()) + && Objects.equals(getRequestSerde(), that.getRequestSerde()) + && Objects.equals(getResponseSerde(), that.getResponseSerde()) + && Objects.equals(getDocumentation(), that.getDocumentation()) + && Objects.equals(getMetadata(), that.getMetadata()) + && Objects.equals(getRunner(), that.getRunner()) + && Objects.equals(getInactivityTimeout(), that.getInactivityTimeout()) + && Objects.equals(getAbortTimeout(), that.getAbortTimeout()) + && Objects.equals(getIdempotencyRetention(), that.getIdempotencyRetention()) + && Objects.equals(getWorkflowRetention(), that.getWorkflowRetention()) + && Objects.equals(getJournalRetention(), that.getJournalRetention()) + && Objects.equals(getIngressPrivate(), that.getIngressPrivate()) + && Objects.equals(getEnableLazyState(), that.getEnableLazyState()); + } + + @Override + public int hashCode() { + return Objects.hash( + getName(), + getHandlerType(), + getAcceptContentType(), + getRequestSerde(), + getResponseSerde(), + getDocumentation(), + getMetadata(), + getRunner(), + getInactivityTimeout(), + getAbortTimeout(), + getIdempotencyRetention(), + getWorkflowRetention(), + getJournalRetention(), + getIngressPrivate(), + getEnableLazyState()); } } diff --git a/sdk-common/src/main/java/dev/restate/sdk/endpoint/definition/ServiceDefinition.java b/sdk-common/src/main/java/dev/restate/sdk/endpoint/definition/ServiceDefinition.java index 7f6d4522..8f960e6d 100644 --- a/sdk-common/src/main/java/dev/restate/sdk/endpoint/definition/ServiceDefinition.java +++ b/sdk-common/src/main/java/dev/restate/sdk/endpoint/definition/ServiceDefinition.java @@ -8,11 +8,14 @@ // https://github.com/restatedev/sdk-java/blob/main/LICENSE package dev.restate.sdk.endpoint.definition; +import java.time.Duration; import java.util.*; +import java.util.function.Consumer; import java.util.function.Function; import java.util.stream.Collectors; import org.jspecify.annotations.Nullable; +/** This class represents a Restate service. */ public final class ServiceDefinition { private final String serviceName; @@ -20,65 +23,462 @@ public final class ServiceDefinition { private final Map> handlers; private final @Nullable String documentation; private final Map metadata; + private final @Nullable Duration inactivityTimeout; + private final @Nullable Duration abortTimeout; + private final @Nullable Duration idempotencyRetention; + private final @Nullable Duration journalRetention; + private final @Nullable Boolean ingressPrivate; + private final @Nullable Boolean enableLazyState; private ServiceDefinition( String serviceName, ServiceType serviceType, Map> handlers, @Nullable String documentation, - Map metadata) { + Map metadata, + @Nullable Duration inactivityTimeout, + @Nullable Duration abortTimeout, + @Nullable Duration idempotencyRetention, + @Nullable Duration journalRetention, + @Nullable Boolean ingressPrivate, + @Nullable Boolean enableLazyState) { this.serviceName = serviceName; this.serviceType = serviceType; this.handlers = handlers; this.documentation = documentation; this.metadata = metadata; + this.inactivityTimeout = inactivityTimeout; + this.abortTimeout = abortTimeout; + this.idempotencyRetention = idempotencyRetention; + this.journalRetention = journalRetention; + this.ingressPrivate = ingressPrivate; + this.enableLazyState = enableLazyState; } + /** + * @return service name. + */ public String getServiceName() { return serviceName; } + /** + * @return service type. + */ public ServiceType getServiceType() { return serviceType; } + /** + * @return handlers. + */ public Collection> getHandlers() { return handlers.values(); } - public HandlerDefinition getHandler(String name) { + /** + * @return a specific handler. + */ + public @Nullable HandlerDefinition getHandler(String name) { return handlers.get(name); } + /** + * @return service documentation. When using the annotation processor, this will contain the + * javadoc of the annotated service class or interface. + */ public @Nullable String getDocumentation() { return documentation; } + /** + * @return metadata, as shown in the Admin REST API. + */ public Map getMetadata() { return metadata; } + /** + * @return the inactivity timeout applied to all handlers of this service. + * @see Configurator#inactivityTimeout(Duration) + */ + public @Nullable Duration getInactivityTimeout() { + return inactivityTimeout; + } + + /** + * @return the abort timeout applied to all handlers of this service. + * @see Configurator#abortTimeout(Duration) + */ + public @Nullable Duration getAbortTimeout() { + return abortTimeout; + } + + /** + * @return the idempotency retention applied to all handlers of this service. + * @see Configurator#idempotencyRetention(Duration) + */ + public @Nullable Duration getIdempotencyRetention() { + return idempotencyRetention; + } + + /** + * @return the journal retention applied to all handlers of this service. + * @see Configurator#journalRetention(Duration) + */ + public @Nullable Duration getJournalRetention() { + return journalRetention; + } + + /** + * @return true if the service, with all its handlers, cannot be invoked from the restate-server + * HTTP and Kafka ingress, but only from other services. + * @see Configurator#ingressPrivate(Boolean) + */ + public @Nullable Boolean getIngressPrivate() { + return ingressPrivate; + } + + /** + * @return true if the service, with all its handlers, will use lazy state. + * @see Configurator#enableLazyState(Boolean) + */ + public @Nullable Boolean getEnableLazyState() { + return enableLazyState; + } + public ServiceDefinition withDocumentation(@Nullable String documentation) { - return new ServiceDefinition(serviceName, serviceType, handlers, documentation, metadata); + return new ServiceDefinition( + serviceName, + serviceType, + handlers, + documentation, + metadata, + inactivityTimeout, + abortTimeout, + idempotencyRetention, + journalRetention, + ingressPrivate, + enableLazyState); } public ServiceDefinition withMetadata(Map metadata) { - return new ServiceDefinition(serviceName, serviceType, handlers, documentation, metadata); + return new ServiceDefinition( + serviceName, + serviceType, + handlers, + documentation, + metadata, + inactivityTimeout, + abortTimeout, + idempotencyRetention, + journalRetention, + ingressPrivate, + enableLazyState); + } + + /** + * @return a copy of this {@link ServiceDefinition}, configured with the {@link Configurator}. + */ + public ServiceDefinition configure(Consumer configurator) { + Configurator configuratorObj = + new Configurator( + handlers, + documentation, + metadata, + inactivityTimeout, + abortTimeout, + idempotencyRetention, + journalRetention, + ingressPrivate, + enableLazyState); + configurator.accept(configuratorObj); + return new ServiceDefinition( + serviceName, + serviceType, + configuratorObj.handlers, + configuratorObj.documentation, + configuratorObj.metadata, + configuratorObj.inactivityTimeout, + configuratorObj.abortTimeout, + configuratorObj.idempotencyRetention, + configuratorObj.journalRetention, + configuratorObj.ingressPrivate, + configuratorObj.enableLazyState); + } + + /** Configurator for a {@link ServiceDefinition}. */ + public static final class Configurator { + + private Map> handlers; + private @Nullable String documentation; + private Map metadata; + private @Nullable Duration inactivityTimeout; + private @Nullable Duration abortTimeout; + private @Nullable Duration idempotencyRetention; + private @Nullable Duration journalRetention; + private @Nullable Boolean ingressPrivate; + private @Nullable Boolean enableLazyState; + + private Configurator( + Map> handlers, + @Nullable String documentation, + Map metadata, + @Nullable Duration inactivityTimeout, + @Nullable Duration abortTimeout, + @Nullable Duration idempotencyRetention, + @Nullable Duration journalRetention, + @Nullable Boolean ingressPrivate, + @Nullable Boolean enableLazyState) { + this.handlers = new HashMap<>(handlers); + this.documentation = documentation; + this.metadata = new HashMap<>(metadata); + this.inactivityTimeout = inactivityTimeout; + this.abortTimeout = abortTimeout; + this.idempotencyRetention = idempotencyRetention; + this.journalRetention = journalRetention; + this.ingressPrivate = ingressPrivate; + this.enableLazyState = enableLazyState; + } + + /** + * @return configured documentation. + * @see #documentation(String) + */ + public @Nullable String documentation() { + return documentation; + } + + /** + * Documentation as shown in the UI, Admin REST API, and the generated OpenAPI documentation of + * this service. + * + * @return this + */ + public Configurator documentation(@Nullable String documentation) { + this.documentation = documentation; + return this; + } + + /** + * @return configured metadata. + * @see #metadata(Map) + */ + public Map metadata() { + return metadata; + } + + /** + * @see #metadata(Map) + */ + public Configurator addMetadata(String key, String value) { + this.metadata.put(key, value); + return this; + } + + /** + * Service metadata, as propagated in the Admin REST API. + * + * @return this + */ + public Configurator metadata(Map metadata) { + this.metadata = metadata; + return this; + } + + /** + * @return configured inactivity timeout. + * @see #inactivityTimeout(Duration) + */ + public @Nullable Duration inactivityTimeout() { + return inactivityTimeout; + } + + /** + * This timer guards against stalled invocations. Once it expires, Restate triggers a graceful + * termination by asking the invocation to suspend (which preserves intermediate progress). + * + *

The {@link #abortTimeout(Duration)} is used to abort the invocation, in case it doesn't + * react to the request to suspend. + * + *

This overrides the default inactivity timeout configured in the restate-server for all + * invocations to this service. + * + *

NOTE: You can set this field only if you register this service against + * restate-server >= 1.4, otherwise the service discovery will fail. + * + * @return this + */ + public Configurator inactivityTimeout(@Nullable Duration inactivityTimeout) { + this.inactivityTimeout = inactivityTimeout; + return this; + } + + /** + * @return configured abort timeout. + * @see #abortTimeout(Duration) + */ + public @Nullable Duration abortTimeout() { + return abortTimeout; + } + + /** + * This timer guards against stalled service/handler invocations that are supposed to terminate. + * The abort timeout is started after the {@link #inactivityTimeout(Duration)} has expired and + * the service/handler invocation has been asked to gracefully terminate. Once the timer + * expires, it will abort the service/handler invocation. + * + *

This timer potentially interrupts user code. If the user code needs longer to + * gracefully terminate, then this value needs to be set accordingly. + * + *

This overrides the default abort timeout configured in the restate-server for all + * invocations to this service. + * + *

NOTE: You can set this field only if you register this service against + * restate-server >= 1.4, otherwise the service discovery will fail. + * + * @return this + */ + public Configurator abortTimeout(@Nullable Duration abortTimeout) { + this.abortTimeout = abortTimeout; + return this; + } + + /** + * @return configured idempotency retention. + * @see #idempotencyRetention(Duration) + */ + public @Nullable Duration idempotencyRetention() { + return idempotencyRetention; + } + + /** + * The retention duration of idempotent requests to this service. + * + *

NOTE: You can set this field only if you register this service against + * restate-server >= 1.4, otherwise the service discovery will fail. + * + * @return this + */ + public Configurator idempotencyRetention(@Nullable Duration idempotencyRetention) { + this.idempotencyRetention = idempotencyRetention; + return this; + } + + /** + * @return configured journal retention. + * @see #journalRetention(Duration) + */ + public @Nullable Duration journalRetention() { + return journalRetention; + } + + /** + * The journal retention. When set, this applies to all requests to all handlers of this + * service. + * + *

In case the request has an idempotency key, the {@link #idempotencyRetention(Duration)} + * caps the journal retention time. + * + *

NOTE: You can set this field only if you register this service against + * restate-server >= 1.4, otherwise the service discovery will fail. + * + * @return this + */ + public Configurator journalRetention(@Nullable Duration journalRetention) { + this.journalRetention = journalRetention; + return this; + } + + /** + * @return configured ingress private. + * @see #ingressPrivate(Boolean) + */ + public @Nullable Boolean ingressPrivate() { + return ingressPrivate; + } + + /** + * When set to {@code true} this service, with all its handlers, cannot be invoked from the + * restate-server HTTP and Kafka ingress, but only from other services. + * + *

NOTE: You can set this field only if you register this service against + * restate-server >= 1.4, otherwise the service discovery will fail. + * + * @return this + */ + public Configurator ingressPrivate(@Nullable Boolean ingressPrivate) { + this.ingressPrivate = ingressPrivate; + return this; + } + + /** + * @return configured enable lazy state. + * @see #enableLazyState(Boolean) + */ + public @Nullable Boolean enableLazyState() { + return enableLazyState; + } + + /** + * When set to {@code true}, lazy state will be enabled for all invocations to this service. + * This is relevant only for workflows and virtual objects. + * + *

NOTE: You can set this field only if you register this service against + * restate-server >= 1.4, otherwise the service discovery will fail. + * + * @return this + */ + public Configurator enableLazyState(@Nullable Boolean enableLazyState) { + this.enableLazyState = enableLazyState; + return this; + } + + /** + * Configure a specific handler of this service. + * + * @return this + */ + public Configurator configureHandler( + String handlerName, Consumer configurator) { + if (!handlers.containsKey(handlerName)) { + throw new IllegalArgumentException("Handler " + handlerName + " not found"); + } + handlers.computeIfPresent(handlerName, (k, v) -> v.configure(configurator)); + return this; + } } @Override - public boolean equals(Object object) { - if (this == object) return true; - if (object == null || getClass() != object.getClass()) return false; - ServiceDefinition that = (ServiceDefinition) object; - return Objects.equals(serviceName, that.serviceName) - && serviceType == that.serviceType - && Objects.equals(handlers, that.handlers); + public boolean equals(Object o) { + if (!(o instanceof ServiceDefinition that)) return false; + return Objects.equals(getServiceName(), that.getServiceName()) + && getServiceType() == that.getServiceType() + && Objects.equals(getHandlers(), that.getHandlers()) + && Objects.equals(getDocumentation(), that.getDocumentation()) + && Objects.equals(getMetadata(), that.getMetadata()) + && Objects.equals(inactivityTimeout, that.inactivityTimeout) + && Objects.equals(abortTimeout, that.abortTimeout) + && Objects.equals(idempotencyRetention, that.idempotencyRetention) + && Objects.equals(journalRetention, that.journalRetention) + && Objects.equals(ingressPrivate, that.ingressPrivate) + && Objects.equals(enableLazyState, that.enableLazyState); } @Override public int hashCode() { - return Objects.hash(serviceName, serviceType, handlers); + return Objects.hash( + getServiceName(), + getServiceType(), + getHandlers(), + getDocumentation(), + getMetadata(), + inactivityTimeout, + abortTimeout, + idempotencyRetention, + journalRetention, + ingressPrivate, + enableLazyState); } public static ServiceDefinition of( @@ -89,6 +489,12 @@ public static ServiceDefinition of( handlers.stream() .collect(Collectors.toMap(HandlerDefinition::getName, Function.identity())), null, - Collections.emptyMap()); + Collections.emptyMap(), + null, + null, + null, + null, + null, + null); } } diff --git a/sdk-core/build.gradle.kts b/sdk-core/build.gradle.kts index 31991323..b8b99beb 100644 --- a/sdk-core/build.gradle.kts +++ b/sdk-core/build.gradle.kts @@ -84,7 +84,7 @@ jsonSchema2Pojo { targetPackage = "dev.restate.sdk.core.generated.manifest" targetDirectory = generatedJ2SPDir.get().asFile - useLongIntegers = false + useLongIntegers = true includeSetters = true includeGetters = true generateBuilders = true diff --git a/sdk-core/src/main/java/dev/restate/sdk/core/DiscoveryProtocol.java b/sdk-core/src/main/java/dev/restate/sdk/core/DiscoveryProtocol.java index c894123d..ab5fc4b5 100644 --- a/sdk-core/src/main/java/dev/restate/sdk/core/DiscoveryProtocol.java +++ b/sdk-core/src/main/java/dev/restate/sdk/core/DiscoveryProtocol.java @@ -19,12 +19,15 @@ import dev.restate.sdk.core.generated.manifest.Service; import java.util.Objects; import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; class DiscoveryProtocol { static final Discovery.ServiceDiscoveryProtocolVersion MIN_SERVICE_DISCOVERY_PROTOCOL_VERSION = Discovery.ServiceDiscoveryProtocolVersion.V1; static final Discovery.ServiceDiscoveryProtocolVersion MAX_SERVICE_DISCOVERY_PROTOCOL_VERSION = - Discovery.ServiceDiscoveryProtocolVersion.V2; + Discovery.ServiceDiscoveryProtocolVersion.V3; static boolean isSupported( Discovery.ServiceDiscoveryProtocolVersion serviceDiscoveryProtocolVersion) { @@ -78,6 +81,9 @@ static Optional parseServiceDiscovery if (versionString.equals("application/vnd.restate.endpointmanifest.v2+json")) { return Optional.of(Discovery.ServiceDiscoveryProtocolVersion.V2); } + if (versionString.equals("application/vnd.restate.endpointmanifest.v3+json")) { + return Optional.of(Discovery.ServiceDiscoveryProtocolVersion.V3); + } return Optional.empty(); } @@ -89,6 +95,9 @@ static String serviceDiscoveryProtocolVersionToHeaderValue( if (Objects.requireNonNull(version) == Discovery.ServiceDiscoveryProtocolVersion.V2) { return "application/vnd.restate.endpointmanifest.v2+json"; } + if (Objects.requireNonNull(version) == Discovery.ServiceDiscoveryProtocolVersion.V3) { + return "application/vnd.restate.endpointmanifest.v3+json"; + } throw new IllegalArgumentException( String.format( "Service discovery protocol version '%s' has no header value", version.getNumber())); @@ -96,13 +105,24 @@ static String serviceDiscoveryProtocolVersionToHeaderValue( static final ObjectMapper MANIFEST_OBJECT_MAPPER = new ObjectMapper(); - @JsonFilter("V2FieldsFilter") - interface V2Mixin {} + static final Set DISCOVERY_FIELDS_ADDED_IN_V2 = Set.of("documentation", "metadata"); + static final Set DISCOVERY_FIELDS_ADDED_IN_V3 = + Set.of( + "inactivityTimeout", + "abortTimeout", + "journalRetention", + "idempotencyRetention", + "workflowCompletionRetention", + "enableLazyState", + "ingressPrivate"); + + @JsonFilter("DiscoveryFieldsFilter") + interface FieldsMixin {} static { // Mixin to add fields filter, used to filter v2 fields - MANIFEST_OBJECT_MAPPER.addMixIn(Service.class, V2Mixin.class); - MANIFEST_OBJECT_MAPPER.addMixIn(Handler.class, V2Mixin.class); + MANIFEST_OBJECT_MAPPER.addMixIn(Service.class, FieldsMixin.class); + MANIFEST_OBJECT_MAPPER.addMixIn(Handler.class, FieldsMixin.class); } static byte[] serializeManifest( @@ -110,13 +130,22 @@ static byte[] serializeManifest( EndpointManifestSchema response) throws ProtocolException { try { - // Don't serialize the documentation and metadata fields for V1! - SimpleBeanPropertyFilter filter = - serviceDiscoveryProtocolVersion == Discovery.ServiceDiscoveryProtocolVersion.V1 - ? SimpleBeanPropertyFilter.serializeAllExcept("documentation", "metadata") - : SimpleBeanPropertyFilter.serializeAll(); + SimpleBeanPropertyFilter filter; + if (serviceDiscoveryProtocolVersion == Discovery.ServiceDiscoveryProtocolVersion.V1) { + filter = + SimpleBeanPropertyFilter.serializeAllExcept( + Stream.concat( + DISCOVERY_FIELDS_ADDED_IN_V2.stream(), + DISCOVERY_FIELDS_ADDED_IN_V3.stream()) + .collect(Collectors.toSet())); + } else if (serviceDiscoveryProtocolVersion == Discovery.ServiceDiscoveryProtocolVersion.V2) { + filter = SimpleBeanPropertyFilter.serializeAllExcept(DISCOVERY_FIELDS_ADDED_IN_V3); + } else { + filter = SimpleBeanPropertyFilter.serializeAll(); + } + return MANIFEST_OBJECT_MAPPER - .writer(new SimpleFilterProvider().addFilter("V2FieldsFilter", filter)) + .writer(new SimpleFilterProvider().addFilter("DiscoveryFieldsFilter", filter)) .writeValueAsBytes(response); } catch (JsonProcessingException e) { throw new ProtocolException( diff --git a/sdk-core/src/main/java/dev/restate/sdk/core/EndpointManifest.java b/sdk-core/src/main/java/dev/restate/sdk/core/EndpointManifest.java index da52546c..f86495ec 100644 --- a/sdk-core/src/main/java/dev/restate/sdk/core/EndpointManifest.java +++ b/sdk-core/src/main/java/dev/restate/sdk/core/EndpointManifest.java @@ -13,12 +13,15 @@ import static dev.restate.sdk.core.statemachine.ServiceProtocol.MIN_SERVICE_PROTOCOL_VERSION; import com.fasterxml.jackson.core.JsonProcessingException; +import dev.restate.sdk.core.generated.discovery.Discovery; import dev.restate.sdk.core.generated.manifest.*; import dev.restate.sdk.endpoint.definition.HandlerDefinition; import dev.restate.sdk.endpoint.definition.HandlerType; import dev.restate.sdk.endpoint.definition.ServiceDefinition; import dev.restate.sdk.endpoint.definition.ServiceType; import dev.restate.serde.Serde; +import java.util.Objects; +import java.util.function.Predicate; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -35,8 +38,8 @@ final class EndpointManifest { boolean experimentalContextEnabled) { this.manifest = new EndpointManifestSchema() - .withMinProtocolVersion(MIN_SERVICE_PROTOCOL_VERSION.getNumber()) - .withMaxProtocolVersion(MAX_SERVICE_PROTOCOL_VERSION.getNumber()) + .withMinProtocolVersion((long) MIN_SERVICE_PROTOCOL_VERSION.getNumber()) + .withMaxProtocolVersion((long) MAX_SERVICE_PROTOCOL_VERSION.getNumber()) .withProtocolMode(protocolMode) .withServices( components @@ -46,6 +49,24 @@ final class EndpointManifest { .withName(svc.getServiceName()) .withTy(convertServiceType(svc.getServiceType())) .withDocumentation(svc.getDocumentation()) + .withIdempotencyRetention( + svc.getIdempotencyRetention() != null + ? svc.getIdempotencyRetention().toMillis() + : null) + .withJournalRetention( + svc.getJournalRetention() != null + ? svc.getJournalRetention().toMillis() + : null) + .withInactivityTimeout( + svc.getInactivityTimeout() != null + ? svc.getInactivityTimeout().toMillis() + : null) + .withAbortTimeout( + svc.getAbortTimeout() != null + ? svc.getAbortTimeout().toMillis() + : null) + .withEnableLazyState(svc.getEnableLazyState()) + .withIngressPrivate(svc.getIngressPrivate()) .withMetadata( svc.getMetadata().entrySet().stream() .reduce( @@ -65,7 +86,41 @@ final class EndpointManifest { .collect(Collectors.toList())); } - EndpointManifestSchema manifest() { + EndpointManifestSchema manifest(Discovery.ServiceDiscoveryProtocolVersion version) { + // Verify that the user didn't set fields that we don't support in the discovery version we set + for (var service : this.manifest.getServices()) { + if (version.getNumber() < Discovery.ServiceDiscoveryProtocolVersion.V2.getNumber()) { + verifyFieldNotSet( + "metadata", + service, + s -> s.getMetadata() != null && !s.getMetadata().getAdditionalProperties().isEmpty()); + } + if (version.getNumber() < Discovery.ServiceDiscoveryProtocolVersion.V3.getNumber()) { + verifyFieldNull("idempotency retention", service.getIdempotencyRetention()); + verifyFieldNull("journal retention", service.getJournalRetention()); + verifyFieldNull("inactivity timeout", service.getInactivityTimeout()); + verifyFieldNull("abort timeout", service.getAbortTimeout()); + verifyFieldNull("enable lazy state", service.getEnableLazyState()); + verifyFieldNull("ingress private", service.getIngressPrivate()); + } + for (var handler : service.getHandlers()) { + if (version.getNumber() < Discovery.ServiceDiscoveryProtocolVersion.V2.getNumber()) { + verifyFieldNotSet( + "metadata", + handler, + h -> h.getMetadata() != null && !h.getMetadata().getAdditionalProperties().isEmpty()); + } + if (version.getNumber() < Discovery.ServiceDiscoveryProtocolVersion.V3.getNumber()) { + verifyFieldNull("idempotency retention", handler.getIdempotencyRetention()); + verifyFieldNull("journal retention", handler.getJournalRetention()); + verifyFieldNull("inactivity timeout", handler.getInactivityTimeout()); + verifyFieldNull("abort timeout", handler.getAbortTimeout()); + verifyFieldNull("enable lazy state", handler.getEnableLazyState()); + verifyFieldNull("ingress private", handler.getIngressPrivate()); + } + } + } + return this.manifest; } @@ -84,6 +139,24 @@ private static Handler convertHandler(HandlerDefinition handler) { .withInput(convertHandlerInput(handler)) .withOutput(convertHandlerOutput(handler)) .withDocumentation(handler.getDocumentation()) + .withIdempotencyRetention( + handler.getIdempotencyRetention() != null + ? handler.getIdempotencyRetention().toMillis() + : null) + .withWorkflowCompletionRetention( + handler.getWorkflowRetention() != null + ? handler.getWorkflowRetention().toMillis() + : null) + .withJournalRetention( + handler.getJournalRetention() != null ? handler.getJournalRetention().toMillis() : null) + .withInactivityTimeout( + handler.getInactivityTimeout() != null + ? handler.getInactivityTimeout().toMillis() + : null) + .withAbortTimeout( + handler.getAbortTimeout() != null ? handler.getAbortTimeout().toMillis() : null) + .withEnableLazyState(handler.getEnableLazyState()) + .withIngressPrivate(handler.getIngressPrivate()) .withMetadata( handler.getMetadata().entrySet().stream() .reduce( @@ -151,4 +224,19 @@ private static Handler.Ty convertHandlerType(HandlerType handlerType) { case SHARED -> Handler.Ty.SHARED; }; } + + private static void verifyFieldNotSet( + String featureName, T value, Predicate isSetPredicate) { + if (isSetPredicate.test(value)) { + throw new ProtocolException( + "The code uses the new discovery feature '" + + featureName + + "' but the runtime doesn't support it yet. Either remove the usage of this feature, or upgrade the runtime.", + 500); + } + } + + private static void verifyFieldNull(String featureName, T value) { + verifyFieldNotSet(featureName, value, Objects::nonNull); + } } diff --git a/sdk-core/src/main/java/dev/restate/sdk/core/EndpointRequestHandler.java b/sdk-core/src/main/java/dev/restate/sdk/core/EndpointRequestHandler.java index da93af28..3e845422 100644 --- a/sdk-core/src/main/java/dev/restate/sdk/core/EndpointRequestHandler.java +++ b/sdk-core/src/main/java/dev/restate/sdk/core/EndpointRequestHandler.java @@ -195,7 +195,7 @@ StaticResponseRequestProcessor handleDiscoveryRequest(HeadersAccessor headersAcc ProtocolException.UNSUPPORTED_MEDIA_TYPE_CODE); } - EndpointManifestSchema response = this.deploymentManifest.manifest(); + EndpointManifestSchema response = this.deploymentManifest.manifest(version); LOG.info( "Replying to discovery request with services [{}]", response.getServices().stream().map(Service::getName).collect(Collectors.joining(","))); diff --git a/sdk-core/src/main/service-protocol/dev/restate/service/discovery.proto b/sdk-core/src/main/service-protocol/dev/restate/service/discovery.proto index bede0be3..6c30d090 100644 --- a/sdk-core/src/main/service-protocol/dev/restate/service/discovery.proto +++ b/sdk-core/src/main/service-protocol/dev/restate/service/discovery.proto @@ -20,4 +20,6 @@ enum ServiceDiscoveryProtocolVersion { V1 = 1; // add custom metadata and documentation for services/handlers V2 = 2; + // add options for ingress private, journal retention, idempotency retention, workflow completion retention, inactivity timeout, abort timeout + V3 = 3; } diff --git a/sdk-core/src/main/service-protocol/endpoint_manifest_schema.json b/sdk-core/src/main/service-protocol/endpoint_manifest_schema.json index d7620b7c..8c0c60f9 100644 --- a/sdk-core/src/main/service-protocol/endpoint_manifest_schema.json +++ b/sdk-core/src/main/service-protocol/endpoint_manifest_schema.json @@ -3,7 +3,7 @@ "$schema": "https://json-schema.org/draft/2020-12/schema", "type": "object", "title": "Endpoint", - "description": "Restate endpoint manifest v2", + "description": "Restate endpoint manifest v3", "properties": { "protocolMode": { "title": "ProtocolMode", @@ -142,6 +142,39 @@ } } }, + "inactivityTimeout": { + "type": "integer", + "minimum": 1, + "description": "Inactivity timeout duration, expressed in milliseconds." + }, + "abortTimeout": { + "type": "integer", + "minimum": 1, + "description": "Abort timeout duration, expressed in milliseconds." + }, + "journalRetention": { + "type": "integer", + "minimum": 1, + "description": "Journal retention duration, expressed in milliseconds." + }, + "idempotencyRetention": { + "type": "integer", + "minimum": 1, + "description": "Idempotency retention duration, expressed in milliseconds. This is NOT VALID when HandlerType == WORKFLOW" + }, + "workflowCompletionRetention": { + "type": "integer", + "minimum": 1, + "description": "Workflow completion retention duration, expressed in milliseconds. This is valid ONLY when HandlerType == WORKFLOW" + }, + "enableLazyState": { + "type": "boolean", + "description": "If true, lazy state is enabled." + }, + "ingressPrivate": { + "type": "boolean", + "description": "If true, the service cannot be invoked from the HTTP nor Kafka ingress." + }, "metadata": { "type": "object", "description": "Custom metadata of this handler definition. This metadata is shown on the Admin API when querying the service/handler definition.", @@ -156,6 +189,34 @@ "additionalProperties": false } }, + "inactivityTimeout": { + "type": "integer", + "minimum": 1, + "description": "Inactivity timeout duration, expressed in milliseconds." + }, + "abortTimeout": { + "type": "integer", + "minimum": 1, + "description": "Abort timeout duration, expressed in milliseconds." + }, + "journalRetention": { + "type": "integer", + "minimum": 1, + "description": "Journal retention duration, expressed in milliseconds." + }, + "idempotencyRetention": { + "type": "integer", + "minimum": 1, + "description": "Idempotency retention duration, expressed in milliseconds. When ServiceType == WORKFLOW, this option will be applied only to the shared handlers. See workflowCompletionRetention for more details." + }, + "enableLazyState": { + "type": "boolean", + "description": "If true, lazy state is enabled." + }, + "ingressPrivate": { + "type": "boolean", + "description": "If true, the service cannot be invoked from the HTTP nor Kafka ingress." + }, "metadata": { "type": "object", "description": "Custom metadata of this service definition. This metadata is shown on the Admin API when querying the service definition.", diff --git a/sdk-core/src/test/java/dev/restate/sdk/core/AssertUtils.java b/sdk-core/src/test/java/dev/restate/sdk/core/AssertUtils.java index 369298e9..24d74c7a 100644 --- a/sdk-core/src/test/java/dev/restate/sdk/core/AssertUtils.java +++ b/sdk-core/src/test/java/dev/restate/sdk/core/AssertUtils.java @@ -83,12 +83,20 @@ public static EndpointManifestSchemaAssert assertThatDiscovery(Object... service builder.bind(svc); } + return assertThatDiscovery(builder); + } + + public static EndpointManifestSchemaAssert assertThatDiscovery(Endpoint.Builder builder) { + return assertThatDiscovery(builder.build()); + } + + public static EndpointManifestSchemaAssert assertThatDiscovery(Endpoint endpoint) { return new EndpointManifestSchemaAssert( new EndpointManifest( EndpointManifestSchema.ProtocolMode.BIDI_STREAM, - builder.build().getServiceDefinitions(), + endpoint.getServiceDefinitions(), true) - .manifest(), + .manifest(DiscoveryProtocol.MAX_SERVICE_DISCOVERY_PROTOCOL_VERSION), EndpointManifestSchemaAssert.class); } diff --git a/sdk-core/src/test/java/dev/restate/sdk/core/ComponentDiscoveryHandlerTest.java b/sdk-core/src/test/java/dev/restate/sdk/core/ComponentDiscoveryHandlerTest.java index 8e2f5e79..49da929a 100644 --- a/sdk-core/src/test/java/dev/restate/sdk/core/ComponentDiscoveryHandlerTest.java +++ b/sdk-core/src/test/java/dev/restate/sdk/core/ComponentDiscoveryHandlerTest.java @@ -37,7 +37,8 @@ void handleWithMultipleServices() { "greet", HandlerType.EXCLUSIVE, Serde.VOID, Serde.VOID, null)))), false); - EndpointManifestSchema manifest = deploymentManifest.manifest(); + EndpointManifestSchema manifest = + deploymentManifest.manifest(DiscoveryProtocol.MAX_SERVICE_DISCOVERY_PROTOCOL_VERSION); assertThat(manifest.getServices()).extracting(Service::getName).containsOnly("MyGreeter"); assertThat(manifest.getProtocolMode()) diff --git a/sdk-core/src/test/java/dev/restate/sdk/core/javaapi/CodegenDiscoveryTest.java b/sdk-core/src/test/java/dev/restate/sdk/core/javaapi/CodegenDiscoveryTest.java index f0b65b02..e9dc2e2f 100644 --- a/sdk-core/src/test/java/dev/restate/sdk/core/javaapi/CodegenDiscoveryTest.java +++ b/sdk-core/src/test/java/dev/restate/sdk/core/javaapi/CodegenDiscoveryTest.java @@ -16,6 +16,7 @@ import dev.restate.sdk.core.generated.manifest.Input; import dev.restate.sdk.core.generated.manifest.Output; import dev.restate.sdk.core.generated.manifest.Service; +import dev.restate.sdk.endpoint.Endpoint; import org.junit.jupiter.api.Test; public class CodegenDiscoveryTest { @@ -66,4 +67,20 @@ void workflowType() { .extractingHandler("run") .returns(Handler.Ty.WORKFLOW, Handler::getTy); } + + @Test + void usingTransformer() { + assertThatDiscovery( + Endpoint.bind( + new CodegenTest.RawInputOutput(), + sd -> + sd.documentation("My service documentation") + .configureHandler( + "rawInputWithCustomCt", + hd -> hd.documentation("My handler documentation")))) + .extractingService("RawInputOutput") + .returns("My service documentation", Service::getDocumentation) + .extractingHandler("rawInputWithCustomCt") + .returns("My handler documentation", Handler::getDocumentation); + } } diff --git a/sdk-core/src/test/kotlin/dev/restate/sdk/core/kotlinapi/CodegenDiscoveryTest.kt b/sdk-core/src/test/kotlin/dev/restate/sdk/core/kotlinapi/CodegenDiscoveryTest.kt index 50c28720..57138ef6 100644 --- a/sdk-core/src/test/kotlin/dev/restate/sdk/core/kotlinapi/CodegenDiscoveryTest.kt +++ b/sdk-core/src/test/kotlin/dev/restate/sdk/core/kotlinapi/CodegenDiscoveryTest.kt @@ -8,12 +8,12 @@ // https://github.com/restatedev/sdk-java/blob/main/LICENSE package dev.restate.sdk.core.kotlinapi -import dev.restate.sdk.Context import dev.restate.sdk.core.AssertUtils.assertThatDiscovery import dev.restate.sdk.core.generated.manifest.Handler import dev.restate.sdk.core.generated.manifest.Input import dev.restate.sdk.core.generated.manifest.Output import dev.restate.sdk.core.generated.manifest.Service +import dev.restate.sdk.kotlin.endpoint.* import org.assertj.core.api.Assertions import org.assertj.core.api.InstanceOfAssertFactories.type import org.junit.jupiter.api.Test @@ -72,4 +72,21 @@ class CodegenDiscoveryTest { .extractingHandler("run") .returns(Handler.Ty.WORKFLOW) { obj -> obj.ty } } + + @Test + fun usingTransformer() { + assertThatDiscovery( + endpoint { + bind(CodegenTest.RawInputOutput()) { + it.documentation = "My service documentation" + it.configureHandler("rawInputWithCustomCt") { + it.documentation = "My handler documentation" + } + } + }) + .extractingService("RawInputOutput") + .returns("My service documentation", Service::getDocumentation) + .extractingHandler("rawInputWithCustomCt") + .returns("My handler documentation", Handler::getDocumentation) + } }