diff --git a/.gitignore b/.gitignore index b1cc2407b8..53124d257c 100644 --- a/.gitignore +++ b/.gitignore @@ -20,3 +20,6 @@ **/*.trace **/flamegraph.svg .sl + +# Claude code +.claude \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index 2e9824a6c0..fe9549cfc4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -945,7 +945,6 @@ dependencies = [ "tower", "tower-layer", "tower-service", - "tracing", ] [[package]] @@ -964,7 +963,6 @@ dependencies = [ "sync_wrapper", "tower-layer", "tower-service", - "tracing", ] [[package]] @@ -5513,50 +5511,6 @@ dependencies = [ "web-time", ] -[[package]] -name = "okapi" -version = "0.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9a64853d7ab065474e87696f7601cee817d200e86c42e04004e005cb3e20c3c5" -dependencies = [ - "log", - "schemars 0.8.22", - "serde", - "serde_json", -] - -[[package]] -name = "okapi-operation" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "62b3ac29c4611a403bd1239d580058541b18984c2e603153f754c49993f0c080" -dependencies = [ - "anyhow", - "axum", - "bytes", - "http 1.4.0", - "indexmap 2.12.0", - "mime", - "okapi", - "okapi-operation-macro", - "paste", - "serde", - "tower", -] - -[[package]] -name = "okapi-operation-macro" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5391918a0c3a742ad2e76c69581e3b142795dc4c3f3c468f37acf92e434e6a2e" -dependencies = [ - "darling 0.20.10", - "proc-macro2", - "quote", - "syn 2.0.111", - "thiserror 2.0.17", -] - [[package]] name = "once_cell" version = "1.21.3" @@ -6929,7 +6883,6 @@ dependencies = [ "itertools 0.14.0", "metrics", "mime_guess", - "okapi-operation", "parking_lot", "prost-dto", "rand 0.9.2", @@ -6948,7 +6901,6 @@ dependencies = [ "restate-wal-protocol", "restate-web-ui", "restate-workspace-hack", - "schemars 0.8.22", "serde", "serde_json", "serde_with", @@ -6960,6 +6912,8 @@ dependencies = [ "tower-http", "tracing", "urlencoding", + "utoipa", + "utoipa-axum", ] [[package]] @@ -6975,11 +6929,11 @@ dependencies = [ "restate-time-util", "restate-types", "restate-workspace-hack", - "schemars 0.8.22", "serde", "serde_json", "serde_with", "strum", + "utoipa", ] [[package]] @@ -7925,6 +7879,7 @@ dependencies = [ "serde", "serde_json", "serde_with", + "utoipa", ] [[package]] @@ -8310,6 +8265,7 @@ dependencies = [ "typed-builder", "typify", "ulid", + "utoipa", "xxhash-rust", ] @@ -8462,7 +8418,6 @@ dependencies = [ "aws-smithy-runtime-api", "aws-smithy-types", "axum", - "axum-core", "bilrost", "bitflags 2.10.0", "byteorder", @@ -8551,7 +8506,6 @@ dependencies = [ "rustls 0.23.35", "rustls-pki-types", "rustls-webpki 0.103.8", - "schemars 0.8.22", "semver", "serde", "serde_core", @@ -8576,6 +8530,8 @@ dependencies = [ "tracing-subscriber", "typenum", "ulid", + "utoipa", + "utoipa-gen", "uuid", "xxhash-rust", "zerocopy 0.8.31", @@ -8909,7 +8865,6 @@ dependencies = [ "bytes", "dyn-clone", "enumset", - "indexmap 1.9.3", "schemars_derive", "serde", "serde_json", @@ -10565,6 +10520,43 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" +[[package]] +name = "utoipa" +version = "5.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2fcc29c80c21c31608227e0912b2d7fddba57ad76b606890627ba8ee7964e993" +dependencies = [ + "indexmap 2.12.0", + "serde", + "serde_json", + "utoipa-gen", +] + +[[package]] +name = "utoipa-axum" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7c25bae5bccc842449ec0c5ddc5cbb6a3a1eaeac4503895dc105a1138f8234a0" +dependencies = [ + "axum", + "paste", + "tower-layer", + "tower-service", + "utoipa", +] + +[[package]] +name = "utoipa-gen" +version = "5.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d79d08d92ab8af4c5e8a6da20c47ae3f61a0f1dabc1997cdf2d082b757ca08b" +dependencies = [ + "proc-macro2", + "quote", + "regex", + "syn 2.0.111", +] + [[package]] name = "uuid" version = "1.19.0" diff --git a/Cargo.toml b/Cargo.toml index f2ed79cabd..8a359e91d4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -264,6 +264,8 @@ typify = { version = "0.5.0" } ulid = { version = "1.2.1" } url = { version = "2.5" } urlencoding = { version = "2.1" } +utoipa = { version = "5.4" } +utoipa-axum = "0.2" uuid = { version = "1.19.0", features = ["v7", "serde"] } vergen = { version = "8.0.0", default-features = false } xxhash-rust = { version = "0.8", features = ["xxh3"] } diff --git a/crates/admin-rest-model/Cargo.toml b/crates/admin-rest-model/Cargo.toml index de152a192e..1f75993fe3 100644 --- a/crates/admin-rest-model/Cargo.toml +++ b/crates/admin-rest-model/Cargo.toml @@ -9,7 +9,7 @@ publish = false [features] default = [] -schema = ["dep:schemars", "restate-serde-util/schema", "restate-types/schemars"] +schema = ["dep:utoipa", "restate-serde-util/utoipa-schema", "restate-types/utoipa-schema"] [dependencies] restate-workspace-hack = { workspace = true } @@ -23,7 +23,7 @@ derive_more = { workspace = true } http = { workspace = true } http-serde = { workspace = true } humantime = { workspace = true } -schemars = { workspace = true, optional = true } +utoipa = { workspace = true, optional = true } serde = { workspace = true } serde_json = { workspace = true } serde_with = { workspace = true } diff --git a/crates/admin-rest-model/src/deployments.rs b/crates/admin-rest-model/src/deployments.rs index 0886737fc1..dd9c6e912f 100644 --- a/crates/admin-rest-model/src/deployments.rs +++ b/crates/admin-rest-model/src/deployments.rs @@ -21,23 +21,18 @@ use std::collections::HashMap; // This enum could be a struct with a nested enum to avoid repeating some fields, but serde(flatten) unfortunately breaks the openapi code generation #[serde_as] -#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))] +#[cfg_attr(feature = "schema", derive(utoipa::ToSchema))] #[derive(Debug, Serialize, Deserialize)] #[serde(untagged)] pub enum RegisterDeploymentRequest { - #[cfg_attr( - feature = "schema", - schemars( - title = "RegisterHttpDeploymentRequest", - description = "Register HTTP deployment request" - ) - )] + /// Register HTTP deployment request + #[cfg_attr(feature = "schema", schema(title = "RegisterHttpDeploymentRequest"))] Http { /// # Uri /// /// Uri to use to discover/invoke the http deployment. #[serde_as(as = "serde_with::DisplayFromStr")] - #[cfg_attr(feature = "schema", schemars(with = "String"))] + #[cfg_attr(feature = "schema", schema(value_type = String, format = "uri"))] uri: Uri, /// # Additional headers @@ -80,10 +75,7 @@ pub enum RegisterDeploymentRequest { /// When set to `true`, it implies `breaking = true`. /// /// See the [versioning documentation](https://docs.restate.dev/operate/versioning) for more information. - #[cfg_attr( - feature = "schema", - schemars(default = "restate_serde_util::default::bool::") - )] + #[cfg_attr(feature = "schema", schema(default = true))] force: Option, /// # Dry-run mode @@ -94,13 +86,8 @@ pub enum RegisterDeploymentRequest { #[serde(default = "restate_serde_util::default::bool::")] dry_run: bool, }, - #[cfg_attr( - feature = "schema", - schemars( - title = "RegisterLambdaDeploymentRequest", - description = "Register Lambda deployment request" - ) - )] + /// Register Lambda deployment request + #[cfg_attr(feature = "schema", schema(title = "RegisterLambdaDeploymentRequest"))] Lambda { /// # ARN /// @@ -140,10 +127,7 @@ pub enum RegisterDeploymentRequest { /// This implies `breaking = true`. /// /// See the [versioning documentation](https://docs.restate.dev/operate/versioning) for more information. - #[cfg_attr( - feature = "schema", - schemars(default = "restate_serde_util::default::bool::") - )] + #[cfg_attr(feature = "schema", schema(default = true))] force: Option, /// # Dry-run mode @@ -156,14 +140,14 @@ pub enum RegisterDeploymentRequest { }, } -#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))] +#[cfg_attr(feature = "schema", derive(utoipa::ToSchema))] #[derive(Clone, Debug, Serialize, Deserialize)] pub struct ServiceNameRevPair { pub name: String, pub revision: ServiceRevision, } -#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))] +#[cfg_attr(feature = "schema", derive(utoipa::ToSchema))] #[derive(Debug, Serialize, Deserialize)] pub struct RegisterDeploymentResponse { pub id: DeploymentId, @@ -195,23 +179,19 @@ pub struct RegisterDeploymentResponse { pub info: Vec, } -#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))] +/// List of all registered deployments +#[cfg_attr(feature = "schema", derive(utoipa::ToSchema))] #[derive(Debug, Serialize, Deserialize)] pub struct ListDeploymentsResponse { pub deployments: Vec, } -#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))] +#[cfg_attr(feature = "schema", derive(utoipa::ToSchema))] #[derive(Clone, Debug, Serialize, Deserialize)] #[serde(untagged)] pub enum DeploymentResponse { - #[cfg_attr( - feature = "schema", - schemars( - title = "HttpDeploymentResponse", - description = "Deployment response for HTTP deployments" - ) - )] + /// Deployment response for HTTP deployments + #[cfg_attr(feature = "schema", schema(title = "HttpDeploymentResponse"))] Http { /// # Deployment ID id: DeploymentId, @@ -220,7 +200,7 @@ pub enum DeploymentResponse { /// /// URI used to invoke this service deployment. #[serde(with = "serde_with::As::")] - #[cfg_attr(feature = "schema", schemars(with = "String"))] + #[cfg_attr(feature = "schema", schema(value_type = String, format = "uri"))] uri: Uri, /// # Protocol Type @@ -232,7 +212,7 @@ pub enum DeploymentResponse { /// /// HTTP Version used to invoke this service deployment. #[serde(with = "http_serde::version")] - #[cfg_attr(feature = "schema", schemars(with = "String"))] + #[cfg_attr(feature = "schema", schema(value_type = String))] http_version: Version, /// # Additional headers @@ -249,7 +229,7 @@ pub enum DeploymentResponse { metadata: HashMap, #[serde(with = "serde_with::As::")] - #[cfg_attr(feature = "schema", schemars(with = "String"))] + #[cfg_attr(feature = "schema", schema(value_type = String))] created_at: humantime::Timestamp, /// # Minimum Service Protocol version @@ -280,13 +260,8 @@ pub enum DeploymentResponse { #[serde(default, skip_serializing_if = "Vec::is_empty")] info: Vec, }, - #[cfg_attr( - feature = "schema", - schemars( - title = "LambdaDeploymentResponse", - description = "Deployment response for Lambda deployments" - ) - )] + /// Deployment response for Lambda deployments + #[cfg_attr(feature = "schema", schema(title = "LambdaDeploymentResponse"))] Lambda { /// # Deployment ID id: DeploymentId, @@ -321,7 +296,7 @@ pub enum DeploymentResponse { metadata: HashMap, #[serde(with = "serde_with::As::")] - #[cfg_attr(feature = "schema", schemars(with = "String"))] + #[cfg_attr(feature = "schema", schema(value_type = String))] created_at: humantime::Timestamp, /// # Minimum Service Protocol version @@ -362,17 +337,13 @@ impl DeploymentResponse { } } -#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))] +/// Detailed information about Restate deployments +#[cfg_attr(feature = "schema", derive(utoipa::ToSchema))] #[derive(Clone, Debug, Serialize, Deserialize)] #[serde(untagged)] pub enum DetailedDeploymentResponse { - #[cfg_attr( - feature = "schema", - schemars( - title = "HttpDetailedDeploymentResponse", - description = "Detailed deployment response for HTTP deployments" - ) - )] + /// Detailed deployment response for HTTP deployments + #[cfg_attr(feature = "schema", schema(title = "HttpDetailedDeploymentResponse"))] Http { /// # Deployment ID id: DeploymentId, @@ -381,7 +352,7 @@ pub enum DetailedDeploymentResponse { /// /// URI used to invoke this service deployment. #[serde(with = "serde_with::As::")] - #[cfg_attr(feature = "schema", schemars(with = "String"))] + #[cfg_attr(feature = "schema", schema(value_type = String, format = "uri"))] uri: Uri, /// # Protocol Type @@ -393,7 +364,7 @@ pub enum DetailedDeploymentResponse { /// /// HTTP Version used to invoke this service deployment. #[serde(with = "http_serde::version")] - #[cfg_attr(feature = "schema", schemars(with = "String"))] + #[cfg_attr(feature = "schema", schema(value_type = String))] http_version: Version, /// # Additional headers @@ -409,7 +380,7 @@ pub enum DetailedDeploymentResponse { metadata: HashMap, #[serde(with = "serde_with::As::")] - #[cfg_attr(feature = "schema", schemars(with = "String"))] + #[cfg_attr(feature = "schema", schema(value_type = String))] created_at: humantime::Timestamp, /// # Minimum Service Protocol version @@ -439,13 +410,8 @@ pub enum DetailedDeploymentResponse { #[serde(default, skip_serializing_if = "Vec::is_empty")] info: Vec, }, - #[cfg_attr( - feature = "schema", - schemars( - title = "LambdaDetailedDeploymentResponse", - description = "Detailed deployment response for Lambda deployments" - ) - )] + /// Detailed deployment response for Lambda deployments + #[cfg_attr(feature = "schema", schema(title = "LambdaDetailedDeploymentResponse"))] Lambda { /// # Deployment ID id: DeploymentId, @@ -480,7 +446,7 @@ pub enum DetailedDeploymentResponse { metadata: HashMap, #[serde(with = "serde_with::As::")] - #[cfg_attr(feature = "schema", schemars(with = "String"))] + #[cfg_attr(feature = "schema", schema(value_type = String))] created_at: humantime::Timestamp, /// # Minimum Service Protocol version @@ -522,17 +488,12 @@ impl DetailedDeploymentResponse { } } -#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))] +#[cfg_attr(feature = "schema", derive(utoipa::ToSchema))] #[derive(Debug, Serialize, Deserialize)] #[serde(untagged)] pub enum UpdateDeploymentRequest { - #[cfg_attr( - feature = "schema", - schemars( - title = "UpdateHttpDeploymentRequest", - description = "Update HTTP deployment request" - ) - )] + /// Update HTTP deployment request + #[cfg_attr(feature = "schema", schema(title = "UpdateHttpDeploymentRequest"))] Http { /// # Uri /// @@ -541,7 +502,7 @@ pub enum UpdateDeploymentRequest { with = "serde_with::As::>", skip_serializing_if = "Option::is_none" )] - #[cfg_attr(feature = "schema", schemars(with = "Option"))] + #[cfg_attr(feature = "schema", schema(value_type = Option, format = "uri"))] uri: Option, /// # Additional headers @@ -572,13 +533,8 @@ pub enum UpdateDeploymentRequest { #[serde(default = "restate_serde_util::default::bool::")] dry_run: bool, }, - #[cfg_attr( - feature = "schema", - schemars( - title = "UpdateLambdaDeploymentRequest", - description = "Update Lambda deployment request" - ) - )] + /// Update Lambda deployment request + #[cfg_attr(feature = "schema", schema(title = "UpdateLambdaDeploymentRequest"))] Lambda { /// # ARN /// diff --git a/crates/admin-rest-model/src/handlers.rs b/crates/admin-rest-model/src/handlers.rs index ebf5866e4c..cd8e277108 100644 --- a/crates/admin-rest-model/src/handlers.rs +++ b/crates/admin-rest-model/src/handlers.rs @@ -12,7 +12,8 @@ use serde::{Deserialize, Serialize}; use restate_types::schema::service::HandlerMetadata; -#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))] +/// List of all the handlers of a service +#[cfg_attr(feature = "schema", derive(utoipa::ToSchema))] #[derive(Debug, Serialize, Deserialize)] pub struct ListServiceHandlersResponse { pub handlers: Vec, diff --git a/crates/admin-rest-model/src/invocations.rs b/crates/admin-rest-model/src/invocations.rs index 4578b6efd1..d161623f85 100644 --- a/crates/admin-rest-model/src/invocations.rs +++ b/crates/admin-rest-model/src/invocations.rs @@ -11,8 +11,9 @@ use restate_types::identifiers::InvocationId; use serde::{Deserialize, Serialize}; +/// The invocation was restarted as new. #[derive(Debug, Serialize, Deserialize)] -#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))] +#[cfg_attr(feature = "schema", derive(utoipa::ToSchema))] pub struct RestartAsNewInvocationResponse { /// The invocation id of the new invocation. pub new_invocation_id: InvocationId, diff --git a/crates/admin-rest-model/src/lib.rs b/crates/admin-rest-model/src/lib.rs index 60466ea850..bdf50563b0 100644 --- a/crates/admin-rest-model/src/lib.rs +++ b/crates/admin-rest-model/src/lib.rs @@ -11,6 +11,7 @@ pub mod deployments; pub mod handlers; pub mod invocations; +pub mod query; pub mod services; pub mod subscriptions; pub mod version; diff --git a/crates/admin-rest-model/src/query.rs b/crates/admin-rest-model/src/query.rs new file mode 100644 index 0000000000..e3e48fa694 --- /dev/null +++ b/crates/admin-rest-model/src/query.rs @@ -0,0 +1,20 @@ +// Copyright (c) 2023 - 2025 Restate Software, Inc., Restate GmbH. +// All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use serde::Deserialize; +use serde_with::serde_as; + +#[serde_as] +#[derive(Debug, Deserialize)] +#[cfg_attr(feature = "schema", derive(utoipa::ToSchema))] +pub struct QueryRequest { + /// SQL query to run against the storage + pub query: String, +} diff --git a/crates/admin-rest-model/src/services.rs b/crates/admin-rest-model/src/services.rs index 9fbe976088..f6cf2b7bf8 100644 --- a/crates/admin-rest-model/src/services.rs +++ b/crates/admin-rest-model/src/services.rs @@ -17,13 +17,14 @@ use serde::{Deserialize, Serialize}; use restate_time_util::FriendlyDuration; use restate_types::schema::service::ServiceMetadata; -#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))] +/// List of all registered services. +#[cfg_attr(feature = "schema", derive(utoipa::ToSchema))] #[derive(Debug, Serialize, Deserialize)] pub struct ListServicesResponse { pub services: Vec, } -#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))] +#[cfg_attr(feature = "schema", derive(utoipa::ToSchema))] #[derive(Debug, Serialize, Deserialize)] pub struct ModifyServiceRequest { /// # Public @@ -39,7 +40,6 @@ pub struct ModifyServiceRequest { /// /// Can be configured using the [`jiff::fmt::friendly`](https://docs.rs/jiff/latest/jiff/fmt/friendly/index.html) format or ISO8601, for example `5 hours`. #[serde(default, with = "serde_with::As::>")] - #[cfg_attr(feature = "schema", schemars(with = "Option" /* TODO(slinkydeveloper) https://github.com/restatedev/restate/issues/3766 */))] pub idempotency_retention: Option, /// # Workflow completion retention @@ -48,7 +48,6 @@ pub struct ModifyServiceRequest { /// /// Can be configured using the [`jiff::fmt::friendly`](https://docs.rs/jiff/latest/jiff/fmt/friendly/index.html) format or ISO8601, for example `5 hours`. #[serde(default, with = "serde_with::As::>")] - #[cfg_attr(feature = "schema", schemars(with = "Option" /* TODO(slinkydeveloper) https://github.com/restatedev/restate/issues/3766 */))] pub workflow_completion_retention: Option, /// # Journal retention @@ -60,7 +59,6 @@ pub struct ModifyServiceRequest { /// /// Can be configured using the [`jiff::fmt::friendly`](https://docs.rs/jiff/latest/jiff/fmt/friendly/index.html) format or ISO8601, for example `5 hours`. #[serde(default, with = "serde_with::As::>")] - #[cfg_attr(feature = "schema", schemars(with = "Option" /* TODO(slinkydeveloper) https://github.com/restatedev/restate/issues/3766 */))] pub journal_retention: Option, /// # Inactivity timeout @@ -76,7 +74,6 @@ pub struct ModifyServiceRequest { /// /// This overrides the default inactivity timeout set in invoker options. #[serde(default, with = "serde_with::As::>")] - #[cfg_attr(feature = "schema", schemars(with = "Option" /* TODO(slinkydeveloper) https://github.com/restatedev/restate/issues/3766 */))] pub inactivity_timeout: Option, /// # Abort timeout @@ -93,11 +90,10 @@ pub struct ModifyServiceRequest { /// /// This overrides the default abort timeout set in invoker options. #[serde(default, with = "serde_with::As::>")] - #[cfg_attr(feature = "schema", schemars(with = "Option" /* TODO(slinkydeveloper) https://github.com/restatedev/restate/issues/3766 */))] pub abort_timeout: Option, } -#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))] +#[cfg_attr(feature = "schema", derive(utoipa::ToSchema))] #[derive(Debug, Serialize, Deserialize)] pub struct ModifyServiceStateRequest { /// # Version @@ -114,5 +110,6 @@ pub struct ModifyServiceStateRequest { /// # New State /// /// The new state to replace the previous state with + #[cfg_attr(feature = "schema", schema(value_type = HashMap>))] pub new_state: HashMap, } diff --git a/crates/admin-rest-model/src/subscriptions.rs b/crates/admin-rest-model/src/subscriptions.rs index cb673419b7..4250d51914 100644 --- a/crates/admin-rest-model/src/subscriptions.rs +++ b/crates/admin-rest-model/src/subscriptions.rs @@ -18,7 +18,7 @@ use restate_types::identifiers::SubscriptionId; use restate_types::schema::subscriptions::Subscription; #[serde_as] -#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))] +#[cfg_attr(feature = "schema", derive(utoipa::ToSchema))] #[derive(Debug, Serialize, Deserialize)] pub struct CreateSubscriptionRequest { /// # Source @@ -27,7 +27,7 @@ pub struct CreateSubscriptionRequest { /// /// * `kafka:///`, e.g. `kafka://my-cluster/my-topic` #[serde_as(as = "serde_with::DisplayFromStr")] - #[cfg_attr(feature = "schema", schemars(with = "String"))] + #[cfg_attr(feature = "schema", schema(value_type = String, format = "uri"))] pub source: Uri, /// # Sink /// @@ -35,7 +35,7 @@ pub struct CreateSubscriptionRequest { /// /// * `service:///`, e.g. `service://Counter/count` #[serde_as(as = "serde_with::DisplayFromStr")] - #[cfg_attr(feature = "schema", schemars(with = "String"))] + #[cfg_attr(feature = "schema", schema(value_type = String, format = "uri"))] pub sink: Uri, /// # Options /// @@ -43,7 +43,8 @@ pub struct CreateSubscriptionRequest { pub options: Option>, } -#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))] +/// Subscription details. +#[cfg_attr(feature = "schema", derive(utoipa::ToSchema))] #[derive(Debug, Deserialize, Serialize)] pub struct SubscriptionResponse { pub id: SubscriptionId, @@ -63,14 +64,17 @@ impl From for SubscriptionResponse { } } -#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))] +#[cfg_attr(feature = "schema", derive(utoipa::IntoParams))] #[derive(Debug, Deserialize, Serialize)] pub struct ListSubscriptionsParams { + /// Filter by the exact specified sink. pub sink: Option, + /// Filter by the exact specified source. pub source: Option, } -#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))] +/// List of all subscriptions. +#[cfg_attr(feature = "schema", derive(utoipa::ToSchema))] #[derive(Debug, Deserialize, Serialize)] pub struct ListSubscriptionsResponse { pub subscriptions: Vec, diff --git a/crates/admin-rest-model/src/version.rs b/crates/admin-rest-model/src/version.rs index d2952546fc..b0e103d2cc 100644 --- a/crates/admin-rest-model/src/version.rs +++ b/crates/admin-rest-model/src/version.rs @@ -52,7 +52,8 @@ impl AdminApiVersion { } } -#[cfg_attr(feature = "schema", derive(schemars::JsonSchema))] +/// Admin API version information +#[cfg_attr(feature = "schema", derive(utoipa::ToSchema))] #[derive(Debug, Serialize, Deserialize)] pub struct VersionInformation { /// # Admin server version diff --git a/crates/admin/Cargo.toml b/crates/admin/Cargo.toml index 5fe1d9484d..c897f25207 100644 --- a/crates/admin/Cargo.toml +++ b/crates/admin/Cargo.toml @@ -11,7 +11,6 @@ publish = false default = ["serve-web-ui"] options_schema = ["restate-service-client/options_schema", "restate-types/schemars"] serve-web-ui = ["restate-web-ui", "mime_guess"] -storage-query = [] metadata-api = [] restate-web-ui = ["dep:restate-web-ui"] all-metadata-providers = [ @@ -58,11 +57,9 @@ hyper-util = { workspace = true } itertools = { workspace = true } metrics = { workspace = true } mime_guess = { version = "2.0.5", optional = true } -okapi-operation = { version = "0.3.0", features = ["axum-integration"] } parking_lot = { workspace = true } prost-dto = { workspace = true } rand = { workspace = true } -schemars = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } serde_with = { workspace = true } @@ -73,6 +70,8 @@ tower = { workspace = true, features = ["load-shed", "limit"] } tower-http = { workspace = true, features = ["trace"] } tracing = { workspace = true } urlencoding = { workspace = true } +utoipa = { workspace = true, features = ["axum_extras"] } +utoipa-axum = { workspace = true } [dev-dependencies] restate-bifrost = { workspace = true, features = ["test-util"] } diff --git a/crates/admin/src/lib.rs b/crates/admin/src/lib.rs index d8fd1c2119..2461cdd03a 100644 --- a/crates/admin/src/lib.rs +++ b/crates/admin/src/lib.rs @@ -19,8 +19,6 @@ pub mod schema_registry_integration; pub mod service; mod state; mod storage_accounting; -#[cfg(feature = "storage-query")] -mod storage_query; #[cfg(feature = "serve-web-ui")] mod web_ui; diff --git a/crates/admin/src/rest_api/cluster_health.rs b/crates/admin/src/rest_api/cluster_health.rs index 896df74ed1..4b3880e2e5 100644 --- a/crates/admin/src/rest_api/cluster_health.rs +++ b/crates/admin/src/rest_api/cluster_health.rs @@ -10,7 +10,6 @@ use axum::Json; use http::StatusCode; -use okapi_operation::openapi; use restate_core::network::net_util::{DNSResolution, create_tonic_channel}; use restate_core::protobuf::node_ctl_svc::new_node_ctl_client; @@ -18,17 +17,22 @@ use restate_core::{Metadata, my_node_id}; use restate_types::config::Configuration; use restate_types::{NodeId, PlainNodeId}; -use crate::rest_api::error::GenericRestError; +use crate::rest_api::error::{ErrorDescriptionResponse, GenericRestError}; /// Cluster state endpoint -#[openapi( - summary = "Cluster health", - description = "Get the cluster health.", +#[utoipa::path( + get, + path = "/cluster-health", operation_id = "cluster_health", - tags = "cluster_health", - deprecated = true + tag = "cluster_health", + responses( + (status = 200, description = "Cluster health information", body = ClusterHealthResponse), + (status = 500, description = "Internal Server Error", body = ErrorDescriptionResponse), + (status = 503, description = "The cluster does not seem to be provisioned yet.", body = ErrorDescriptionResponse), + ) )] // todo: Remove in v1.7.0 as it should no longer be actively used +#[deprecated] pub async fn cluster_health() -> Result, GenericRestError> { let nodes_configuration = Metadata::with_current(|m| m.nodes_config_ref()); let node_config = nodes_configuration @@ -59,8 +63,9 @@ pub async fn cluster_health() -> Result, GenericRest Ok(Json(cluster_health_response)) } +/// Cluster health information #[derive( - Debug, Clone, serde::Serialize, serde::Deserialize, schemars::JsonSchema, prost_dto::FromProst, + Debug, Clone, serde::Serialize, serde::Deserialize, utoipa::ToSchema, prost_dto::FromProst, )] #[prost(target = "restate_core::protobuf::node_ctl_svc::ClusterHealthResponse")] pub struct ClusterHealthResponse { @@ -71,7 +76,7 @@ pub struct ClusterHealthResponse { } #[derive( - Debug, Clone, serde::Serialize, serde::Deserialize, schemars::JsonSchema, prost_dto::FromProst, + Debug, Clone, serde::Serialize, serde::Deserialize, utoipa::ToSchema, prost_dto::FromProst, )] #[prost(target = "restate_core::protobuf::node_ctl_svc::EmbeddedMetadataClusterHealth")] pub struct EmbeddedMetadataClusterHealth { diff --git a/crates/admin/src/rest_api/deployments.rs b/crates/admin/src/rest_api/deployments.rs index 02e70dbd0a..cceffc6495 100644 --- a/crates/admin/src/rest_api/deployments.rs +++ b/crates/admin/src/rest_api/deployments.rs @@ -17,7 +17,6 @@ use axum::http::{StatusCode, header}; use axum::response::IntoResponse; use axum::{Extension, Json}; use http::{Method, Uri}; -use okapi_operation::*; use restate_admin_rest_model::deployments::*; use restate_admin_rest_model::version::AdminApiVersion; use restate_errors::warn_it; @@ -32,36 +31,30 @@ use restate_types::schema::registry::{ use restate_types::schema::service::ServiceMetadata; use serde::Deserialize; -/// Create deployment and return discovered services. -#[openapi( - summary = "Create deployment", - description = "Create and register a new deployment. \ - Restate will invoke the endpoint to gather additional information required for registration, such as the services exposed by the deployment. \ - If the deployment is already registered, this method will return 200 and no changes will be made. \ - If the deployment updates some already existing services, schema breaking changes checks will run. If you want to bypass them, use `breaking: true`. \ - To overwrite an already existing deployment, use `force: true`", +/// Register deployment +/// +/// Registers a new deployment (HTTP or Lambda). Restate will invoke the endpoint to discover available services and handlers, +/// and make them available for invocation. For more information, see the [deployment documentation](https://docs.restate.dev/services/versioning#registering-a-deployment). +#[utoipa::path( + post, + path = "/deployments", operation_id = "create_deployment", - tags = "deployment", - external_docs(url = "https://docs.restate.dev/operate/registration"), + tag = "deployment", + request_body = RegisterDeploymentRequest, responses( - ignore_return_type = true, - response( - status = "200", - description = "Already exists. No change if force = false, overwritten if force = true", - content = "Json", - ), - response( - status = "201", - description = "Created", - content = "Json", - ), - from_type = "MetaApiError", + (status = 200, description = "Deployment already exists. No change if force = false, services overwritten if force = true", body = RegisterDeploymentResponse, headers( + ("Location" = String, description = "URI of the deployment") + )), + (status = 201, description = "Deployment created successfully and services discovered", body = RegisterDeploymentResponse, headers( + ("Location" = String, description = "URI of the created deployment") + )), + MetaApiError ) )] pub async fn create_deployment( State(state): State>, Extension(version): Extension, - #[request_body(required = true)] Json(payload): Json, + Json(payload): Json, ) -> Result where Metadata: MetadataService, @@ -176,17 +169,21 @@ where )) } -/// Return deployment -#[openapi( - summary = "Get deployment", - description = "Get deployment metadata", +/// Get deployment +/// +/// Returns detailed information about a registered deployment, including deployment metadata and the services it exposes. +#[utoipa::path( + get, + path = "/deployments/{deployment}", operation_id = "get_deployment", - tags = "deployment", - parameters(path( - name = "deployment", - description = "Deployment identifier", - schema = "std::string::String" - )) + tag = "deployment", + params( + ("deployment" = String, Path, description = "Deployment identifier"), + ), + responses( + (status = 200, description = "Deployment details including services and configuration", body = DetailedDeploymentResponse), + MetaApiError + ) )] pub async fn get_deployment( State(state): State>, @@ -204,11 +201,16 @@ where } /// List deployments -#[openapi( - summary = "List deployments", - description = "List all registered deployments.", +/// +/// Returns a list of all registered deployments, including their endpoints and associated services. +#[utoipa::path( + get, + path = "/deployments", operation_id = "list_deployments", - tags = "deployment" + tag = "deployment", + responses( + (status = 200, description = "List of all registered deployments with their metadata", body = ListDeploymentsResponse) + ) )] pub async fn list_deployments( State(state): State>, @@ -226,45 +228,29 @@ where ListDeploymentsResponse { deployments }.into() } -#[derive(Debug, Deserialize, JsonSchema)] +#[derive(Debug, Deserialize, utoipa::IntoParams)] pub struct DeleteDeploymentParams { + /// If true, the deployment will be forcefully deleted. This might break in-flight invocations, use with caution. pub force: Option, } -/// Discover endpoint and return discovered endpoints. -#[openapi( - summary = "Delete deployment", - description = "Delete deployment. Currently it's supported to remove a deployment only using the force flag", +/// Delete deployment +/// +/// Delete a deployment. Currently, only forced deletions are supported. +/// **Use with caution**: forcing a deployment deletion can break in-flight invocations. +#[utoipa::path( + delete, + path = "/deployments/{deployment}", operation_id = "delete_deployment", - tags = "deployment", - parameters( - path( - name = "deployment", - description = "Deployment identifier", - schema = "std::string::String" - ), - query( - name = "force", - description = "If true, the deployment will be forcefully deleted. This might break in-flight invocations, use with caution.", - required = false, - style = "simple", - allow_empty_value = false, - schema = "bool", - ) + tag = "deployment", + params( + ("deployment" = String, Path, description = "Deployment identifier"), + DeleteDeploymentParams ), responses( - ignore_return_type = true, - response( - status = "202", - description = "Accepted", - content = "okapi_operation::Empty", - ), - response( - status = "501", - description = "Not implemented. Only using the force flag is supported at the moment.", - content = "okapi_operation::Empty", - ), - from_type = "MetaApiError", + (status = 202, description = "Deployment deletion accepted and will be processed asynchronously"), + (status = 501, description = "Not implemented. Graceful deployment deletion (force=false) is not yet supported.", body = ErrorDescriptionResponse), + MetaApiError ) )] pub async fn delete_deployment( @@ -287,27 +273,29 @@ where } } -/// Update a deployment -#[openapi( - summary = "Update deployment", - description = "Update an already existing deployment. \ - This lets you update the address and options when invoking the deployment, such as the additional headers for HTTP or the assume role for Lambda. \ - The registered services and handlers won't be overwritten, unless `overwrite: true`.", +/// Update deployment +/// +/// Updates an existing deployment configuration, such as the endpoint address or invocation headers. +/// By default, service schemas are not re-discovered. Set `overwrite: true` to trigger re-discovery. +#[utoipa::path( + patch, + path = "/deployments/{deployment}", operation_id = "update_deployment", - tags = "deployment", - external_docs(url = "https://docs.restate.dev/operate/versioning"), - parameters(path( - name = "deployment", - description = "Deployment identifier", - schema = "std::string::String" - )) + tag = "deployment", + params( + ("deployment" = String, Path, description = "Deployment identifier"), + ), + responses( + (status = 200, description = "Deployment updated successfully. Address and invocation options are updated. Service schemas are only updated if overwrite was set to true.", body = DetailedDeploymentResponse), + MetaApiError + ) )] pub async fn update_deployment( State(state): State>, Extension(version): Extension, method: Method, Path(deployment_id): Path, - #[request_body(required = true)] Json(payload): Json, + Json(payload): Json, ) -> Result, MetaApiError> where Metadata: MetadataService, diff --git a/crates/admin/src/rest_api/error.rs b/crates/admin/src/rest_api/error.rs index d35e9c3992..d92e1b3700 100644 --- a/crates/admin/src/rest_api/error.rs +++ b/crates/admin/src/rest_api/error.rs @@ -8,26 +8,21 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use assert2::let_assert; use axum::Json; use axum::http::StatusCode; use axum::response::{IntoResponse, Response}; use codederror::{Code, CodedError}; -use okapi_operation::okapi::map; -use okapi_operation::okapi::openapi3::{RefOr, Responses}; -use okapi_operation::{Components, ToMediaTypes, ToResponses, okapi}; use restate_core::ShutdownError; use restate_types::identifiers::{DeploymentId, SubscriptionId}; use restate_types::invocation::ServiceType; use restate_types::schema::registry::SchemaRegistryError; -use schemars::JsonSchema; use serde::Serialize; use std::ops::RangeInclusive; // --- Few helpers to define Admin API errors. /// Macro to generate an Admin API Error enum with the given variants. /// -/// All the errors should implement both axum IntoResponse and okapi_operation ToResponses (see macro below). +/// All the errors should implement axum IntoResponse and Utoipa's IntoResponses (see macro below). /// /// Example usage: /// @@ -59,49 +54,24 @@ macro_rules! generate_meta_api_error { } } - // Generate ToResponses implementation - impl okapi_operation::ToResponses for $enum_name { - fn generate(components: &mut okapi_operation::Components) -> Result { - // Collect responses from all variants - let responses = vec![ - $( - <$variant as okapi_operation::ToResponses>::generate(components)?, - )* - ]; + impl utoipa::IntoResponses for $enum_name { + fn responses() -> std::collections::BTreeMap> { + let mut result = std::collections::BTreeMap::default(); - // Fold the responses into one - $crate::rest_api::error::merge_error_responses(responses) + $( + result.extend(<$variant as utoipa::IntoResponses>::responses()); + )* + + result } } }; } -pub(crate) fn merge_error_responses(responses: Vec) -> Result { - let mut result_responses = Responses::default(); - for t_responses in responses { - assert!( - t_responses.default.is_none(), - "Errors should not define a default response" - ); - for (status, response) in t_responses.responses { - let_assert!(RefOr::Object(new_response) = response); - result_responses - .responses - .entry(status) - .and_modify(|res| { - let_assert!(RefOr::Object(old_response) = res); - old_response.description = - format!("{}\n{}", old_response.description, new_response.description); - }) - .or_insert_with(|| RefOr::Object(new_response)); - } - } +// merge_error_responses removed - no longer needed with utoipa - Ok(result_responses) -} - -/// Macro to implement both axum IntoResponse and okapi_operation ToResponses, -/// such that the error can be used both as value from axum handlers and to auto generate the OpenAPI documentation. +/// Macro to implement both axum IntoResponse and Utoipa's IntoResponses for error types. +/// Error responses are listed explicitly in handler #[utoipa::path] annotations. /// /// Example usage: /// @@ -125,18 +95,22 @@ macro_rules! impl_meta_api_error { } } - impl ToResponses for $error_name { - fn generate(components: &mut Components) -> Result { - let error_media_type = - as ToMediaTypes>::generate(components)?; - Ok(Responses { - responses: map! { - StatusCode::$status_code.to_string() => okapi::openapi3::RefOr::Object( - okapi::openapi3::Response { content: error_media_type.clone(), description: $description.to_owned(), ..Default::default() } - ), - }, - ..Default::default() - }) + impl utoipa::IntoResponses for $error_name { + fn responses() -> std::collections::BTreeMap> { + + utoipa::openapi::ResponsesBuilder::new() + .response( + StatusCode::$status_code.as_str(), + utoipa::openapi::ResponseBuilder::new() + .description($description) + .content( + "application/json", + utoipa::openapi::ContentBuilder::new() + .schema(Some(::schema())) + .build()) + .build()) + .build() + .into() } } }; @@ -307,7 +281,7 @@ pub enum MetaApiError { /// # Error description response /// /// Error details of the response -#[derive(Debug, Serialize, JsonSchema)] +#[derive(Debug, Serialize, utoipa::ToSchema)] pub(crate) struct ErrorDescriptionResponse { message: String, /// # Restate code @@ -346,36 +320,63 @@ impl IntoResponse for MetaApiError { } } -impl ToResponses for MetaApiError { - fn generate(components: &mut Components) -> Result { - let error_media_type = - as ToMediaTypes>::generate(components)?; - Ok(Responses { - responses: map! { - "400".into() => okapi::openapi3::RefOr::Object( - okapi::openapi3::Response { content: error_media_type.clone(), ..Default::default() } - ), - "403".into() => okapi::openapi3::RefOr::Object( - okapi::openapi3::Response { content: error_media_type.clone(), ..Default::default() } - ), - "404".into() => okapi::openapi3::RefOr::Object( - okapi::openapi3::Response { content: error_media_type.clone(), ..Default::default() } - ), - "409".into() => okapi::openapi3::RefOr::Object( - okapi::openapi3::Response { content: error_media_type.clone(), ..Default::default() } - ), - "500".into() => okapi::openapi3::RefOr::Object( - okapi::openapi3::Response { content: error_media_type.clone(), ..Default::default() } - ), - "503".into() => okapi::openapi3::RefOr::Object( - okapi::openapi3::Response { content: error_media_type, ..Default::default() } - ) - }, - ..Default::default() - }) +impl utoipa::IntoResponses for MetaApiError { + fn responses() + -> std::collections::BTreeMap> { + use std::collections::BTreeMap; + use utoipa::openapi::{Ref, RefOr}; + + let mut responses = BTreeMap::new(); + responses.insert( + "400".to_string(), + RefOr::Ref(Ref::from_response_name("BadRequest")), + ); + responses.insert( + "404".to_string(), + RefOr::Ref(Ref::from_response_name("NotFound")), + ); + responses.insert( + "405".to_string(), + RefOr::Ref(Ref::from_response_name("MethodNotAllowed")), + ); + responses.insert( + "409".to_string(), + RefOr::Ref(Ref::from_response_name("Conflict")), + ); + responses.insert( + "500".to_string(), + RefOr::Ref(Ref::from_response_name("InternalServerError")), + ); + responses } } +pub mod meta_api_error { + //! Those types are only used to generate the corresponding OpenAPI specification for error types + //! that are referenced by [`super::MetaApiError`] when calling [`utoipa::IntoResponses`]. + #![allow(dead_code)] + + /// Bad request + #[derive(utoipa::ToResponse)] + pub struct BadRequest(super::ErrorDescriptionResponse); + + /// Not found + #[derive(utoipa::ToResponse)] + pub struct NotFound(super::ErrorDescriptionResponse); + + /// Method not allowed + #[derive(utoipa::ToResponse)] + pub struct MethodNotAllowed(super::ErrorDescriptionResponse); + + /// Conflict + #[derive(utoipa::ToResponse)] + pub struct Conflict(super::ErrorDescriptionResponse); + + /// Internal server error + #[derive(utoipa::ToResponse)] + pub struct InternalServerError(super::ErrorDescriptionResponse); +} + impl From for MetaApiError { fn from(value: ShutdownError) -> Self { MetaApiError::Internal(value.to_string()) @@ -402,32 +403,4 @@ impl IntoResponse for GenericRestError { } } -impl ToResponses for GenericRestError { - fn generate(components: &mut Components) -> Result { - let error_media_type = - as ToMediaTypes>::generate(components)?; - Ok(Responses { - responses: map! { - "400".into() => okapi::openapi3::RefOr::Object( - okapi::openapi3::Response { content: error_media_type.clone(), ..Default::default() } - ), - "403".into() => okapi::openapi3::RefOr::Object( - okapi::openapi3::Response { content: error_media_type.clone(), ..Default::default() } - ), - "404".into() => okapi::openapi3::RefOr::Object( - okapi::openapi3::Response { content: error_media_type.clone(), ..Default::default() } - ), - "409".into() => okapi::openapi3::RefOr::Object( - okapi::openapi3::Response { content: error_media_type.clone(), ..Default::default() } - ), - "500".into() => okapi::openapi3::RefOr::Object( - okapi::openapi3::Response { content: error_media_type.clone(), ..Default::default() } - ), - "503".into() => okapi::openapi3::RefOr::Object( - okapi::openapi3::Response { content: error_media_type, ..Default::default() } - ) - }, - ..Default::default() - }) - } -} +// ToResponses removed - errors listed explicitly in handler annotations diff --git a/crates/admin/src/rest_api/handlers.rs b/crates/admin/src/rest_api/handlers.rs index 9750d095cf..67d729f9b0 100644 --- a/crates/admin/src/rest_api/handlers.rs +++ b/crates/admin/src/rest_api/handlers.rs @@ -13,22 +13,25 @@ use super::error::*; use crate::state::AdminServiceState; use axum::Json; use axum::extract::{Path, State}; -use okapi_operation::*; use restate_admin_rest_model::handlers::*; use restate_types::schema::registry::MetadataService; use restate_types::schema::service::HandlerMetadata; -/// List discovered handlers for service -#[openapi( - summary = "List service handlers", - description = "List all the handlers of the given service.", +/// List service handlers +/// +/// Returns a list of all handlers (methods) available in the specified service. +#[utoipa::path( + get, + path = "/services/{service}/handlers", operation_id = "list_service_handlers", - tags = "service_handler", - parameters(path( - name = "service", - description = "Fully qualified service name.", - schema = "std::string::String" - )) + tag = "service_handler", + params( + ("service" = String, Path, description = "Fully qualified service name."), + ), + responses( + (status = 200, description = "List of handlers available in the service", body = ListServiceHandlersResponse), + MetaApiError + ) )] pub async fn list_service_handlers( State(state): State>, @@ -43,23 +46,21 @@ where } } -/// Get a handler of a service -#[openapi( - summary = "Get service handler", - description = "Get the handler of a service", +/// Get service handler +/// +/// Returns detailed metadata about a specific handler within a service, including its input/output types and handler type. +#[utoipa::path( + get, + path = "/services/{service}/handlers/{handler}", operation_id = "get_service_handler", - tags = "service_handler", - parameters( - path( - name = "service", - description = "Fully qualified service name.", - schema = "std::string::String" - ), - path( - name = "handler", - description = "Handler name.", - schema = "std::string::String" - ) + tag = "service_handler", + params( + ("service" = String, Path, description = "Fully qualified service name."), + ("handler" = String, Path, description = "Handler name."), + ), + responses( + (status = 200, description = "Handler metadata including input/output types and configuration", body = HandlerMetadata), + MetaApiError ) )] pub async fn get_service_handler( diff --git a/crates/admin/src/rest_api/health.rs b/crates/admin/src/rest_api/health.rs index 4f9fa230de..c68a6575cf 100644 --- a/crates/admin/src/rest_api/health.rs +++ b/crates/admin/src/rest_api/health.rs @@ -9,17 +9,15 @@ // by the Apache License, Version 2.0. use axum::http::StatusCode; -use okapi_operation::*; /// Health check endpoint -#[openapi( - summary = "Health check", - description = "Check REST API Health.", +#[utoipa::path( + get, + path = "/health", operation_id = "health", - tags = "health", + tag = "health", responses( - ignore_return_type = true, - response(status = "200", description = "OK", content = "okapi_operation::Empty"), + (status = 200, description = "The Admin API is ready to accept requests."), ) )] pub async fn health() -> StatusCode { diff --git a/crates/admin/src/rest_api/invocations.rs b/crates/admin/src/rest_api/invocations.rs index 43518e7d62..8c0cdfe130 100644 --- a/crates/admin/src/rest_api/invocations.rs +++ b/crates/admin/src/rest_api/invocations.rs @@ -11,9 +11,6 @@ use axum::Json; use axum::extract::{Path, Query, State}; use axum::http::StatusCode; -use okapi_operation::*; -use serde::Deserialize; - use restate_admin_rest_model::invocations::RestartAsNewInvocationResponse; use restate_types::identifiers::{DeploymentId, InvocationId, PartitionProcessorRpcRequestId}; use restate_types::invocation::client::{ @@ -21,6 +18,7 @@ use restate_types::invocation::client::{ PauseInvocationResponse, PurgeInvocationResponse, ResumeInvocationResponse, }; use restate_types::journal_v2::EntryIndex; +use serde::Deserialize; use super::error::*; use crate::generate_meta_api_error; @@ -29,17 +27,22 @@ use crate::state::AdminServiceState; generate_meta_api_error!(KillInvocationError: [InvocationNotFoundError, InvocationClientError, InvalidFieldError, InvocationWasAlreadyCompletedError]); /// Kill an invocation -#[openapi( - summary = "Kill an invocation", - description = "Kill the given invocation. \ - This does not guarantee consistency for virtual object instance state, in-flight invocations to other services, etc.", +/// +/// Forcefully terminates an invocation. **Warning**: This operation does not guarantee consistency for virtual object instance state, +/// in-flight invocations to other services, or other side effects. Use with caution. +/// For more information, see the [cancellation documentation](https://docs.restate.dev/services/invocation/managing-invocations#kill). +#[utoipa::path( + patch, + path = "/invocations/{invocation_id}/kill", operation_id = "kill_invocation", - tags = "invocation", - parameters(path( - name = "invocation_id", - description = "Invocation identifier.", - schema = "std::string::String" - )) + tag = "invocation", + params( + ("invocation_id" = String, Path, description = "Invocation identifier."), + ), + responses( + (status = 200, description = "Invocation killed successfully"), + KillInvocationError + ) )] pub async fn kill_invocation( State(state): State>, @@ -73,32 +76,21 @@ where generate_meta_api_error!(CancelInvocationError: [InvocationNotFoundError, InvocationClientError, InvalidFieldError, InvocationWasAlreadyCompletedError]); /// Cancel an invocation -#[openapi( - summary = "Cancel an invocation", - description = "Cancel the given invocation. \ - Canceling an invocation allows it to free any resources it is holding and roll back any changes it has made so far, running compensation code. \ - For more details, checkout https://docs.restate.dev/guides/sagas", +/// +/// Gracefully cancels an invocation. The invocation is terminated, but its progress is persisted, allowing consistency guarantees to be maintained. +/// For more information, see the [cancellation documentation](https://docs.restate.dev/services/invocation/managing-invocations#cancel). +#[utoipa::path( + patch, + path = "/invocations/{invocation_id}/cancel", operation_id = "cancel_invocation", - tags = "invocation", - external_docs(url = "https://docs.restate.dev/guides/sagas"), - parameters(path( - name = "invocation_id", - description = "Invocation identifier.", - schema = "std::string::String" - )), + tag = "invocation", + params( + ("invocation_id" = String, Path, description = "Invocation identifier."), + ), responses( - ignore_return_type = true, - response( - status = "200", - description = "The invocation has been cancelled.", - content = "okapi_operation::Empty", - ), - response( - status = "202", - description = "The cancellation signal was appended to the journal and will be processed by the SDK.", - content = "okapi_operation::Empty", - ), - from_type = "CancelInvocationError", + (status = 200, description = "Invocation cancelled successfully"), + (status = 202, description = "Cancellation request accepted and will be processed asynchronously"), + CancelInvocationError ) )] pub async fn cancel_invocation( @@ -131,17 +123,23 @@ where generate_meta_api_error!(PurgeInvocationError: [InvocationNotFoundError, InvocationClientError, InvalidFieldError, PurgeInvocationNotCompletedError]); -/// Purge an invocation -#[openapi( - summary = "Purge an invocation", - description = "Purge the given invocation. This cleanups all the state for the given invocation. This command applies only to completed invocations.", +/// Purge a completed invocation +/// +/// Deletes all state associated with a completed invocation, including its journal and metadata. +/// This operation only applies to invocations that have already completed. For more information, +/// see the [purging documentation](https://docs.restate.dev/services/invocation/managing-invocations#purge). +#[utoipa::path( + patch, + path = "/invocations/{invocation_id}/purge", operation_id = "purge_invocation", - tags = "invocation", - parameters(path( - name = "invocation_id", - description = "Invocation identifier.", - schema = "std::string::String" - )) + tag = "invocation", + params( + ("invocation_id" = String, Path, description = "Invocation identifier."), + ), + responses( + (status = 200, description = "Invocation purged successfully"), + PurgeInvocationError + ) )] pub async fn purge_invocation( State(state): State>, @@ -174,17 +172,22 @@ where generate_meta_api_error!(PurgeJournalError: [InvocationNotFoundError, InvocationClientError, InvalidFieldError, PurgeInvocationNotCompletedError]); -/// Purge an invocation -#[openapi( - summary = "Purge an invocation journal", - description = "Purge the given invocation journal. This cleanups only the journal for the given invocation, retaining the metadata. This command applies only to completed invocations.", +/// Purge invocation journal +/// +/// Deletes only the journal entries for a completed invocation, while retaining its metadata. +/// This operation only applies to invocations that have already completed. +#[utoipa::path( + patch, + path = "/invocations/{invocation_id}/purge-journal", operation_id = "purge_journal", - tags = "invocation", - parameters(path( - name = "invocation_id", - description = "Invocation identifier.", - schema = "std::string::String" - )) + tag = "invocation", + params( + ("invocation_id" = String, Path, description = "Invocation identifier."), + ), + responses( + (status = 200, description = "Invocation journal purged successfully, metadata retained"), + PurgeJournalError + ) )] pub async fn purge_journal( State(state): State>, @@ -215,7 +218,7 @@ where Ok(()) } -#[derive(Debug, Default, Deserialize)] +#[derive(Debug, Default, Deserialize, utoipa::ToSchema)] pub enum PatchDeploymentId { #[default] #[serde(alias = "keep")] @@ -240,9 +243,19 @@ impl PatchDeploymentId { } } -#[derive(Debug, Default, Deserialize)] +#[derive(Debug, Default, Deserialize, utoipa::IntoParams)] pub struct RestartAsNewInvocationQueryParams { + /// From which entry index the invocation should restart from. + /// By default the invocation restarts from the beginning (equivalent to 'from = 0'), retaining only the input of the original invocation. + /// When greater than 0, the new invocation will copy the old journal prefix up to 'from' included, plus eventual completions for commands in the given prefix. + /// If the journal prefix contains commands that have not been completed, this operation will fail. pub from: Option, + /// When restarting from journal prefix, provide a deployment id to use to replace the currently pinned deployment id. + /// If 'latest', use the latest deployment id. If 'keep', keeps the pinned deployment id. + /// When not provided, the invocation will resume on latest. + /// Note: this parameter can be used only in combination with 'from'. + // TODO inline is a workaround for https://github.com/juhaku/utoipa/issues/1388 + #[param(inline)] pub deployment: Option, } @@ -261,43 +274,22 @@ generate_meta_api_error!(RestartInvocationError: [ RestartAsNewInvocationIncompatibleDeploymentIdError ]); -/// Restart an invocation -#[openapi( - summary = "Restart as new invocation", - description = "Restart the given invocation as new. \ - This will restart the invocation as a new invocation with a different invocation id. \ - By using the 'from' query parameter, some of the partial progress can be copied over to the new invocation.", +/// Restart invocation as new +/// +/// Creates a new invocation from a completed invocation, optionally copying partial progress from the original invocation's journal. +/// The new invocation will have a different invocation ID. Use the `from` parameter to specify how much of the original journal to preserve. +#[utoipa::path( + patch, + path = "/invocations/{invocation_id}/restart-as-new", operation_id = "restart_as_new_invocation", - tags = "invocation", - parameters( - path( - name = "invocation_id", - description = "Invocation identifier.", - schema = "std::string::String" - ), - query( - name = "from", - description = "From which entry index the invocation should restart from. \ - By default the invocation restarts from the beginning (equivalent to 'from = 0'), retaining only the input of the original invocation. \ - When greater than 0, the new invocation will copy the old journal prefix up to 'from' included, plus eventual completions for commands in the given prefix. \ - If the journal prefix contains commands that have not been completed, this operation will fail.", - required = false, - style = "simple", - allow_empty_value = false, - schema = "u32", - ), - query( - name = "deployment", - description = "When restarting from journal prefix, provide a deployment id to use to replace the currently pinned deployment id. \ - If 'latest', use the latest deployment id. If 'keep', keeps the pinned deployment id. \ - When not provided, the invocation will resume on latest. \ - Note: this parameter can be used only in combination with 'from'.", - required = false, - style = "simple", - allow_empty_value = false, - // TODO(slinkydeveloper) https://github.com/restatedev/restate/issues/3766 - schema = "String", - ), + tag = "invocation", + params( + ("invocation_id" = String, Path, description = "Invocation identifier."), + RestartAsNewInvocationQueryParams + ), + responses( + (status = 200, description = "Invocation restarted successfully with a new invocation ID", body = RestartAsNewInvocationResponse), + RestartInvocationError ) )] pub async fn restart_as_new_invocation( @@ -370,8 +362,14 @@ where } } -#[derive(Debug, Default, Deserialize)] +#[derive(Debug, Default, Deserialize, utoipa::IntoParams)] pub struct ResumeInvocationQueryParams { + /// When resuming from paused/suspended, provide a deployment id to use to replace the currently pinned deployment id. + /// If 'latest', use the latest deployment id. If 'keep', keeps the pinned deployment id. + /// When not provided, the invocation will resume on the pinned deployment id. + /// When provided and the invocation is either running, or no deployment is pinned, this operation will fail. + // TODO inline is a workaround for https://github.com/juhaku/utoipa/issues/1388 + #[param(inline)] pub deployment: Option, } @@ -387,29 +385,21 @@ generate_meta_api_error!(ResumeInvocationError: [ ]); /// Resume an invocation -#[openapi( - summary = "Resume an invocation", - description = "Resume the given invocation. In case the invocation is backing-off, this will immediately trigger the retry timer. If the invocation is suspended or paused, this will resume it.", +/// +/// Resumes a paused or suspended invocation. If the invocation is backing off due to a retry, this will immediately trigger the retry. +/// Optionally, you can change the deployment ID that will be used when the invocation resumes. For more information see [resume documentation](https://docs.restate.dev/services/invocation/managing-invocations#resume) +#[utoipa::path( + patch, + path = "/invocations/{invocation_id}/resume", operation_id = "resume_invocation", - tags = "invocation", - parameters( - path( - name = "invocation_id", - description = "Invocation identifier.", - schema = "std::string::String" - ), - query( - name = "deployment", - description = "When resuming from paused/suspended, provide a deployment id to use to replace the currently pinned deployment id. \ - If 'latest', use the latest deployment id. If 'keep', keeps the pinned deployment id. \ - When not provided, the invocation will resume on the pinned deployment id. \ - When provided and the invocation is either running, or no deployment is pinned, this operation will fail.", - required = false, - style = "simple", - allow_empty_value = false, - // TODO(slinkydeveloper) https://github.com/restatedev/restate/issues/3766 - schema = "String", - ) + tag = "invocation", + params( + ("invocation_id" = String, Path, description = "Invocation identifier."), + ResumeInvocationQueryParams + ), + responses( + (status = 200, description = "Invocation resumed successfully"), + ResumeInvocationError ) )] pub async fn resume_invocation( @@ -473,29 +463,18 @@ generate_meta_api_error!(PauseInvocationError: [ ]); /// Pause an invocation -#[openapi( - summary = "Pause an invocation", - description = "Pause the given invocation. This applies only to running invocations, and will cause them to eventually pause.", +#[utoipa::path( + patch, + path = "/invocations/{invocation_id}/pause", operation_id = "pause_invocation", - tags = "invocation", - parameters(path( - name = "invocation_id", - description = "Invocation identifier.", - schema = "std::string::String" - )), + tag = "invocation", + params( + ("invocation_id" = String, Path, description = "Invocation identifier."), + ), responses( - ignore_return_type = true, - response( - status = "200", - description = "Already paused", - content = "okapi_operation::Empty", - ), - response( - status = "202", - description = "Accepted", - content = "okapi_operation::Empty", - ), - from_type = "PauseInvocationError", + (status = 200, description = "Invocation is already paused"), + (status = 202, description = "Pausing invocation"), + PauseInvocationError, ) )] pub async fn pause_invocation( diff --git a/crates/admin/src/rest_api/mod.rs b/crates/admin/src/rest_api/mod.rs index 191534723f..aba0b67409 100644 --- a/crates/admin/src/rest_api/mod.rs +++ b/crates/admin/src/rest_api/mod.rs @@ -16,13 +16,14 @@ mod error; mod handlers; mod health; mod invocations; +mod query; mod services; mod subscriptions; mod version; -use okapi_operation::axum_integration::{delete, get, patch, post}; -use okapi_operation::okapi::openapi3::{ExternalDocs, Tag}; -use okapi_operation::*; +use utoipa::OpenApi; +use utoipa_axum::{router::OpenApiRouter, routes}; + use restate_core::network::TransportConnect; use restate_types::identifiers::PartitionKey; use restate_types::invocation::client::InvocationClient; @@ -33,6 +34,40 @@ use crate::state::AdminServiceState; pub use version::{MAX_ADMIN_API_VERSION, MIN_ADMIN_API_VERSION}; +#[derive(OpenApi)] +#[openapi( + info( + title = "Admin API", + version = env!("CARGO_PKG_VERSION"), + description = "This API exposes the admin operations of a Restate cluster, such as registering new service deployments, interacting with running invocations, register Kafka subscriptions, retrieve service metadata. For an overview, check out the [Operate documentation](https://docs.restate.dev/operate/). If you're looking for how to call your services, check out the [Ingress HTTP API](https://docs.restate.dev/invoke/http) instead.", + license( + name = "MIT", + url = "https://opensource.org/license/mit" + ), + ), + external_docs(url = "https://docs.restate.dev/operate/", description = "Restate operations documentation"), + tags( + (name = "deployment", description = "Service Deployment management"), + (name = "invocation", description = "Invocation management", + external_docs(url = "https://docs.restate.dev/operate/invocation", description = "Invocations documentation")), + (name = "subscription", description = "Subscription management", + external_docs(url = "https://docs.restate.dev/operate/invocation#managing-kafka-subscriptions", description = "Kafka subscriptions documentation")), + (name = "service", description = "Service management"), + (name = "service_handler", description = "Service handlers metadata"), + (name = "cluster_health", description = "Cluster health"), + (name = "health", description = "Admin API health"), + (name = "version", description = "API Version"), + (name = "introspection", description = "System introspection"), + ), + components(responses( + error::meta_api_error::BadRequest, + error::meta_api_error::NotFound, + error::meta_api_error::MethodNotAllowed, + error::meta_api_error::Conflict, + error::meta_api_error::InternalServerError)) +)] +struct AdminApiDoc; + pub fn create_router( state: AdminServiceState, ) -> axum::Router<()> @@ -43,168 +78,58 @@ where Invocations: InvocationClient + Send + Sync + Clone + 'static, Transport: TransportConnect, { - let mut router = axum_integration::Router::new() - .route( - "/deployments", - get(openapi_handler!(deployments::list_deployments)), - ) - .route( - "/deployments", - post(openapi_handler!(deployments::create_deployment)), - ) - .route( - "/deployments/{deployment}", - get(openapi_handler!(deployments::get_deployment)), - ) - .route( - "/deployments/{deployment}", - delete(openapi_handler!(deployments::delete_deployment)), - ) - .route( - "/deployments/{deployment}", - patch(openapi_handler!(deployments::update_deployment)), - ) + let router = { + #[allow(deprecated)] + OpenApiRouter::with_openapi(AdminApiDoc::openapi()) + .routes(routes!(health::health)) + .routes(routes!(version::version)) + .routes(routes!(cluster_health::cluster_health)) + // Deployment endpoints + .routes(routes!(deployments::list_deployments)) + .routes(routes!(deployments::create_deployment)) + .routes(routes!(deployments::get_deployment)) + .routes(routes!(deployments::delete_deployment)) + .routes(routes!(deployments::update_deployment)) + // Service endpoints + .routes(routes!(services::list_services)) + .routes(routes!(services::get_service)) + .routes(routes!(services::get_service_openapi)) + .routes(routes!(services::modify_service)) + .routes(routes!(services::modify_service_state)) + // Handler endpoints + .routes(routes!(handlers::list_service_handlers)) + .routes(routes!(handlers::get_service_handler)) + // Invocation endpoints + .routes(routes!(invocations::kill_invocation)) + .routes(routes!(invocations::cancel_invocation)) + .routes(routes!(invocations::purge_invocation)) + .routes(routes!(invocations::purge_journal)) + .routes(routes!(invocations::restart_as_new_invocation)) + .routes(routes!(invocations::resume_invocation)) + .routes(routes!(invocations::pause_invocation)) + // Subscription endpoints + .routes(routes!(subscriptions::create_subscription)) + .routes(routes!(subscriptions::list_subscriptions)) + .routes(routes!(subscriptions::get_subscription)) + .routes(routes!(subscriptions::delete_subscription)) + // Query endpoint + .routes(routes!(query::query)) + }; + + let (router, api) = router.split_for_parts(); + + // The PUT route is deprecated and intentionally not documented in OpenAPI + // Only PATCH has #[utoipa::path], PUT is kept for backward compatibility only + axum::Router::new() + .merge(router) .route( "/deployments/{deployment}", axum::routing::put(deployments::update_deployment), ) - .route("/services", get(openapi_handler!(services::list_services))) - .route( - "/services/{service}", - get(openapi_handler!(services::get_service)), - ) - .route( - "/services/{service}/openapi", - get(openapi_handler!(services::get_service_openapi)), - ) - .route( - "/services/{service}", - patch(openapi_handler!(services::modify_service)), - ) - .route( - "/services/{service}/state", - post(openapi_handler!(services::modify_service_state)), - ) - .route( - "/services/{service}/handlers", - get(openapi_handler!(handlers::list_service_handlers)), - ) - .route( - "/services/{service}/handlers/{handler}", - get(openapi_handler!(handlers::get_service_handler)), - ) - .route( - "/invocations/{invocation_id}/kill", - patch(openapi_handler!(invocations::kill_invocation)), - ) - .route( - "/invocations/{invocation_id}/cancel", - patch(openapi_handler!(invocations::cancel_invocation)), - ) - .route( - "/invocations/{invocation_id}/purge", - patch(openapi_handler!(invocations::purge_invocation)), - ) - .route( - "/invocations/{invocation_id}/purge-journal", - patch(openapi_handler!(invocations::purge_journal)), - ) - .route( - "/invocations/{invocation_id}/restart-as-new", - patch(openapi_handler!(invocations::restart_as_new_invocation)), - ) .route( - "/invocations/{invocation_id}/resume", - patch(openapi_handler!(invocations::resume_invocation)), + "/openapi", + axum::routing::get(|| async move { axum::Json(api) }), ) - .route( - "/invocations/{invocation_id}/pause", - patch(openapi_handler!(invocations::pause_invocation)), - ) - .route( - "/subscriptions", - post(openapi_handler!(subscriptions::create_subscription)), - ) - .route( - "/subscriptions", - get(openapi_handler!(subscriptions::list_subscriptions)), - ) - .route( - "/subscriptions/{subscription}", - get(openapi_handler!(subscriptions::get_subscription)), - ) - .route( - "/subscriptions/{subscription}", - delete(openapi_handler!(subscriptions::delete_subscription)), - ) - .route("/health", get(openapi_handler!(health::health))) - .route("/version", get(openapi_handler!(version::version))) - // The cluster_health endpoint is deprecated in the OpenAPI spec in preparation for removing it - .route( - "/cluster-health", - get(openapi_handler!(cluster_health::cluster_health)), - ); - - // Add some additional OpenAPI metadata - router.openapi_builder_template_mut() - .description("This API exposes the admin operations of a Restate cluster, such as registering new service deployments, interacting with running invocations, register Kafka subscriptions, retrieve service metadata. For an overview, check out the [Operate documentation](https://docs.restate.dev/operate/). If you're looking for how to call your services, check out the [Ingress HTTP API](https://docs.restate.dev/invoke/http) instead.") - .external_docs(ExternalDocs { - url: "https://docs.restate.dev/operate/".to_string(), - ..Default::default() - }) - .tag(Tag { - name: "deployment".to_string(), - description: Some("Service Deployment management".to_string()), - ..Default::default() - }) - .tag(Tag { - name: "invocation".to_string(), - description: Some("Invocation management".to_string()), - external_docs: Some(ExternalDocs { - url: "https://docs.restate.dev/operate/invocation".to_string(), - ..Default::default() - }), - ..Default::default() - }) - .tag(Tag { - name: "subscription".to_string(), - description: Some("Subscription management".to_string()), - external_docs: Some(ExternalDocs { - url: "https://docs.restate.dev/operate/invocation#managing-kafka-subscriptions".to_string(), - ..Default::default() - }), - ..Default::default() - }) - .tag(Tag { - name: "service".to_string(), - description: Some("Service management".to_string()), - ..Default::default() - }) - .tag(Tag { - name: "service_handler".to_string(), - description: Some("Service handlers metadata".to_string()), - ..Default::default() - }) - .tag(Tag { - name: "cluster_health".to_string(), - description: Some("Cluster health".to_string()), - ..Default::default() - }) - .tag(Tag { - name: "health".to_string(), - description: Some("Admin API health".to_string()), - ..Default::default() - }) - .tag(Tag { - name: "version".to_string(), - description: Some("API Version".to_string()), - ..Default::default() - }); - - // Finish router - router - .finish_openapi("/openapi", "Admin API", env!("CARGO_PKG_VERSION")) - .expect("Error when building the OpenAPI specification") .with_state(state) } diff --git a/crates/admin/src/storage_query/query.rs b/crates/admin/src/rest_api/query.rs similarity index 64% rename from crates/admin/src/storage_query/query.rs rename to crates/admin/src/rest_api/query.rs index 3ffbcee36e..bc9f6c9b08 100644 --- a/crates/admin/src/storage_query/query.rs +++ b/crates/admin/src/rest_api/query.rs @@ -12,11 +12,11 @@ use std::io::Write; use std::pin::Pin; use std::sync::Arc; +use super::error::GenericRestError; use crate::query_utils::{RecordBatchWriter, WriteRecordBatchStream}; - -use super::QueryServiceState; -use super::error::StorageQueryError; +use crate::state::AdminServiceState; use axum::extract::State; +use axum::http::StatusCode; use axum::response::{IntoResponse, Response}; use axum::{Json, http}; use bytes::Bytes; @@ -29,41 +29,58 @@ use futures::{StreamExt, TryStreamExt}; use http::{HeaderMap, HeaderValue}; use http_body::Frame; use http_body_util::StreamBody; -use okapi_operation::*; use parking_lot::Mutex; -use schemars::JsonSchema; -use serde::Deserialize; -use serde_with::serde_as; - -#[serde_as] -#[derive(Debug, Deserialize, JsonSchema)] -pub struct QueryRequest { - /// # Query - /// - /// SQL query to run against the storage - #[serde_as(as = "serde_with::DisplayFromStr")] - #[schemars(with = "String")] - pub query: String, -} - -/// Query storage -#[openapi( - summary = "Query storage", - description = "Query the storage API", +use restate_admin_rest_model::query::QueryRequest; +use restate_core::network::TransportConnect; +use restate_types::invocation::client::InvocationClient; +use restate_types::schema::registry::{DiscoveryClient, MetadataService, TelemetryClient}; + +/// Query the system and service state by using SQL. +#[utoipa::path( + post, + path = "/query", operation_id = "query", - tags = "storage", - responses(ignore_return_type = true, from_type = "StorageQueryError") + tag = "introspection", + responses( + (status = 200, description = "Query results", + content ( + ("application/vnd.apache.arrow.stream"), + ("application/json", example = json!({"rows": []})) + )), + (status = 500, description = "Internal Datafusion error"), + (status = 503, description = "Query service not available"), + ) )] -pub async fn query( - State(state): State>, +pub(crate) async fn query( + State(state): State>, headers: HeaderMap, - #[request_body(required = true)] Json(payload): Json, -) -> Result { - let record_batch_stream = state.query_context.execute(&payload.query).await?; + Json(payload): Json, +) -> Result +where + Metadata: MetadataService + Send + Sync + Clone + 'static, + Discovery: DiscoveryClient + Send + Sync + Clone + 'static, + Telemetry: TelemetryClient + Send + Sync + Clone + 'static, + Invocations: InvocationClient + Send + Sync + Clone + 'static, + Transport: TransportConnect, +{ + let Some(query_context) = state.query_context.as_ref() else { + return Err(GenericRestError::new( + StatusCode::SERVICE_UNAVAILABLE, + "Query service not available", + )); + }; + + let record_batch_stream = query_context + .execute(&payload.query) + .await + .map_err(|e| GenericRestError::new(StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?; let (result_stream, content_type) = match headers.get(http::header::ACCEPT) { Some(v) if v == HeaderValue::from_static("application/json") => ( - WriteRecordBatchStream::::new(record_batch_stream, payload.query)? + WriteRecordBatchStream::::new(record_batch_stream, payload.query) + .map_err(|e| { + GenericRestError::new(StatusCode::INTERNAL_SERVER_ERROR, e.to_string()) + })? .map_ok(Frame::data) .left_stream(), "application/json", @@ -72,7 +89,8 @@ pub async fn query( WriteRecordBatchStream::>>::new( record_batch_stream, payload.query, - )? + ) + .map_err(|e| GenericRestError::new(StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))? .map_ok(Frame::data) .right_stream(), "application/vnd.apache.arrow.stream", @@ -84,7 +102,10 @@ pub async fn query( // return an error (instead of just closing the stream) if there is a error getting the first record batch (eg, out of memory) if let Some(Err(_)) = futures::stream::Peekable::peek(Pin::new(&mut result_stream)).await { let err = result_stream.next().await.unwrap().unwrap_err(); - return Err(StorageQueryError::DataFusion(err)); + return Err(GenericRestError::new( + StatusCode::INTERNAL_SERVER_ERROR, + err.to_string(), + )); } Ok(Response::builder() @@ -120,7 +141,7 @@ impl Write for LockWriter { } } -pub struct JsonWriter { +pub(crate) struct JsonWriter { json_writer: datafusion::arrow::json::Writer, lock_writer: LockWriter, finished: bool, diff --git a/crates/admin/src/rest_api/services.rs b/crates/admin/src/rest_api/services.rs index a089915c16..cb83c3e3ba 100644 --- a/crates/admin/src/rest_api/services.rs +++ b/crates/admin/src/rest_api/services.rs @@ -14,7 +14,6 @@ use axum::Json; use axum::extract::{Path, State}; use bytes::Bytes; use http::StatusCode; -use okapi_operation::*; use restate_admin_rest_model::services::ListServicesResponse; use restate_admin_rest_model::services::*; @@ -34,11 +33,17 @@ use super::error::*; use crate::state::AdminServiceState; /// List services -#[openapi( - summary = "List services", - description = "List all registered services.", +/// +/// Returns a list of all registered services, including their metadata and configuration. +#[utoipa::path( + get, + path = "/services", operation_id = "list_services", - tags = "service" + tag = "service", + responses( + (status = 200, description = "List of all registered services with their metadata", body = ListServicesResponse), + MetaApiError + ) )] pub async fn list_services( State(state): State>, @@ -51,17 +56,21 @@ where Ok(ListServicesResponse { services }.into()) } -/// Get a service -#[openapi( - summary = "Get service", - description = "Get a registered service.", +/// Get service +/// +/// Returns detailed metadata about a specific service, including its type, handlers, and configuration settings. +#[utoipa::path( + get, + path = "/services/{service}", operation_id = "get_service", - tags = "service", - parameters(path( - name = "service", - description = "Fully qualified service name.", - schema = "std::string::String" - )) + tag = "service", + params( + ("service" = String, Path, description = "Fully qualified service name."), + ), + responses( + (status = 200, description = "Service metadata including type, revision, handlers, and configuration", body = ServiceMetadata), + MetaApiError + ) )] pub async fn get_service( State(state): State>, @@ -78,24 +87,19 @@ where } /// Get service OpenAPI definition -#[openapi( - summary = "Get service OpenAPI", - description = "Get the service OpenAPI 3.1 contract.", +/// +/// Returns the OpenAPI 3.1 specification for the service, describing all handlers and their request/response schemas. +#[utoipa::path( + get, + path = "/services/{service}/openapi", operation_id = "get_service_openapi", - tags = "service", - parameters(path( - name = "service", - description = "Fully qualified service name.", - schema = "std::string::String" - )), + tag = "service", + params( + ("service" = String, Path, description = "Fully qualified service name."), + ), responses( - ignore_return_type = true, - response( - status = "200", - description = "OpenAPI 3.1 of the service", - content = "Json", - ), - from_type = "MetaApiError", + (status = 200, description = "OpenAPI 3.1 specification document describing the service's API", body = serde_json::Value), + MetaApiError ) )] pub async fn get_service_openapi( @@ -119,22 +123,27 @@ where .ok_or_else(|| MetaApiError::ServiceNotFound(service_name)) } -/// Modify a service -#[openapi( - summary = "Modify a service", - description = "Modify a registered service configuration. NOTE: Service re-discovery will update the settings based on the service endpoint configuration.", +/// Modify service configuration +/// +/// Updates the configuration of a registered service, such as public visibility, retention policies, and timeout settings. +/// Note: Service re-discovery will update these settings based on the service endpoint configuration. +#[utoipa::path( + patch, + path = "/services/{service}", operation_id = "modify_service", - tags = "service", - parameters(path( - name = "service", - description = "Fully qualified service name.", - schema = "std::string::String" - )) + tag = "service", + params( + ("service" = String, Path, description = "Fully qualified service name."), + ), + responses( + (status = 200, description = "Service configuration updated successfully", body = ServiceMetadata), + MetaApiError + ) )] pub async fn modify_service( State(state): State>, Path(service_name): Path, - #[request_body(required = true)] Json(ModifyServiceRequest { + Json(ModifyServiceRequest { public, idempotency_retention, workflow_completion_retention, @@ -175,31 +184,26 @@ where Ok(response.into()) } -/// Modify a service state -#[openapi( - summary = "Modify a service state", - description = "Modify service state", +/// Modify service state +/// +/// Modifies the K/V state of a Virtual Object. For a detailed description of this API and how to use it, see the [state documentation](https://docs.restate.dev/operate/invocation#modifying-service-state). +#[utoipa::path( + post, + path = "/services/{service}/state", operation_id = "modify_service_state", - tags = "service", - parameters(path( - name = "service", - description = "Fully qualified service name.", - schema = "std::string::String" - )), + tag = "service", + params( + ("service" = String, Path, description = "Fully qualified service name."), + ), responses( - ignore_return_type = true, - response( - status = "202", - description = "Accepted", - content = "okapi_operation::Empty", - ), - from_type = "MetaApiError", + (status = 202, description = "State modification request accepted and will be applied asynchronously"), + MetaApiError ) )] pub async fn modify_service_state( State(state): State>, Path(service_name): Path, - #[request_body(required = true)] Json(ModifyServiceStateRequest { + Json(ModifyServiceStateRequest { version, object_key, new_state, diff --git a/crates/admin/src/rest_api/subscriptions.rs b/crates/admin/src/rest_api/subscriptions.rs index 0159fa84d3..087a9ff83f 100644 --- a/crates/admin/src/rest_api/subscriptions.rs +++ b/crates/admin/src/rest_api/subscriptions.rs @@ -18,33 +18,30 @@ use axum::extract::Query; use axum::extract::{Path, State}; use axum::http::StatusCode; use axum::{Json, http}; -use okapi_operation::*; use restate_errors::warn_it; use restate_types::identifiers::SubscriptionId; use restate_types::schema::registry::MetadataService; -/// Create subscription. -#[openapi( - summary = "Create subscription", - description = "Create subscription.", +/// Create subscription +/// +/// Creates a new subscription that connects an event source (e.g., a Kafka topic) to a Restate service handler. +/// For more information, see the [subscription documentation](https://docs.restate.dev/operate/invocation#managing-kafka-subscriptions). +#[utoipa::path( + post, + path = "/subscriptions", operation_id = "create_subscription", - tags = "subscription", - external_docs( - url = "https://docs.restate.dev/operate/invocation#managing-kafka-subscriptions" - ), + tag = "subscription", + request_body = CreateSubscriptionRequest, responses( - ignore_return_type = true, - response( - status = "201", - description = "Created", - content = "Json", - ), - from_type = "MetaApiError", + (status = 201, description = "Subscription created successfully", body = SubscriptionResponse, headers( + ("Location" = String, description = "URI of the created subscription") + )), + MetaApiError ) )] pub async fn create_subscription( State(state): State>, - #[request_body(required = true)] Json(payload): Json, + Json(payload): Json, ) -> Result where Metadata: MetadataService, @@ -65,17 +62,21 @@ where )) } -/// Get subscription. -#[openapi( - summary = "Get subscription", - description = "Get subscription", +/// Get subscription +/// +/// Returns the details of a specific subscription, including its source, sink, and configuration options. +#[utoipa::path( + get, + path = "/subscriptions/{subscription}", operation_id = "get_subscription", - tags = "subscription", - parameters(path( - name = "subscription", - description = "Subscription identifier", - schema = "std::string::String" - )) + tag = "subscription", + params( + ("subscription" = String, Path, description = "Subscription identifier"), + ), + responses( + (status = 200, description = "Subscription details including source, sink, and options", body = SubscriptionResponse), + MetaApiError + ) )] pub async fn get_subscription( State(state): State>, @@ -92,29 +93,17 @@ where Ok(SubscriptionResponse::from(subscription).into()) } -/// List subscriptions. -#[openapi( - summary = "List subscriptions", - description = "List all subscriptions.", +/// List subscriptions +/// +/// Returns a list of all registered subscriptions, optionally filtered by source or sink. +#[utoipa::path( + get, + path = "/subscriptions", operation_id = "list_subscriptions", - tags = "subscription", - parameters( - query( - name = "sink", - description = "Filter by the exact specified sink.", - required = false, - style = "simple", - allow_empty_value = false, - schema = "String", - ), - query( - name = "source", - description = "Filter by the exact specified source.", - required = false, - style = "simple", - allow_empty_value = false, - schema = "String", - ) + tag = "subscription", + params(ListSubscriptionsParams), + responses( + (status = 200, description = "List of subscriptions matching the filter criteria", body = ListSubscriptionsResponse) ) )] pub async fn list_subscriptions( @@ -147,25 +136,20 @@ where .into() } -/// Delete subscription. -#[openapi( - summary = "Delete subscription", - description = "Delete subscription.", +/// Delete subscription +/// +/// Deletes a subscription. This will stop events from the source from being forwarded to the sink. +#[utoipa::path( + delete, + path = "/subscriptions/{subscription}", operation_id = "delete_subscription", - tags = "subscription", - parameters(path( - name = "subscription", - description = "Subscription identifier", - schema = "std::string::String" - )), + tag = "subscription", + params( + ("subscription" = String, Path, description = "Subscription identifier"), + ), responses( - ignore_return_type = true, - response( - status = "202", - description = "Accepted", - content = "okapi_operation::Empty", - ), - from_type = "MetaApiError", + (status = 202, description = "Subscription deletion accepted and will be processed asynchronously"), + MetaApiError ) )] pub async fn delete_subscription( diff --git a/crates/admin/src/rest_api/version.rs b/crates/admin/src/rest_api/version.rs index 020be98c0f..7651e7e507 100644 --- a/crates/admin/src/rest_api/version.rs +++ b/crates/admin/src/rest_api/version.rs @@ -9,7 +9,6 @@ // by the Apache License, Version 2.0. use axum::Json; -use okapi_operation::*; use restate_admin_rest_model::version::{AdminApiVersion, VersionInformation}; use restate_core::TaskCenter; use restate_types::config::Configuration; @@ -22,12 +21,17 @@ use restate_types::config::Configuration; pub const MIN_ADMIN_API_VERSION: AdminApiVersion = AdminApiVersion::V3; pub const MAX_ADMIN_API_VERSION: AdminApiVersion = AdminApiVersion::V3; -/// Version information endpoint -#[openapi( - summary = "Admin version information", - description = "Obtain admin version information.", +/// Get version information +/// +/// Returns the server version, supported Admin API versions, and the advertised ingress endpoint. +#[utoipa::path( + get, + path = "/version", operation_id = "version", - tags = "version" + tag = "version", + responses( + (status = 200, description = "Server version information including supported API versions and ingress endpoint.", body = VersionInformation) + ) )] pub async fn version() -> Json { Json(VersionInformation { diff --git a/crates/admin/src/service.rs b/crates/admin/src/service.rs index 905dc6040a..37eb7ff0e3 100644 --- a/crates/admin/src/service.rs +++ b/crates/admin/src/service.rs @@ -45,7 +45,6 @@ pub struct AdminService ingestion_client: IngestionClient, schema_registry: SchemaRegistry, invocation_client: Invocations, - #[cfg(feature = "storage-query")] query_context: Option, #[cfg(feature = "metadata-api")] metadata_writer: MetadataWriter, @@ -76,12 +75,10 @@ where TelemetryClient(telemetry_http_client), ), invocation_client, - #[cfg(feature = "storage-query")] query_context: None, } } - #[cfg(feature = "storage-query")] pub fn with_query_context( self, query_context: restate_storage_query_datafusion::context::QueryContext, @@ -102,17 +99,11 @@ where self.schema_registry, self.invocation_client, self.ingestion_client, + self.query_context, ); let router = axum::Router::new(); - #[cfg(feature = "storage-query")] - let router = if let Some(query_context) = self.query_context { - router.merge(crate::storage_query::router(query_context)) - } else { - router - }; - #[cfg(feature = "metadata-api")] let router = router.merge(crate::metadata_api::router( self.metadata_writer.raw_metadata_store_client(), diff --git a/crates/admin/src/state.rs b/crates/admin/src/state.rs index 4a8cbbffaa..6b555db545 100644 --- a/crates/admin/src/state.rs +++ b/crates/admin/src/state.rs @@ -10,6 +10,7 @@ use restate_core::network::TransportConnect; use restate_ingestion_client::IngestionClient; +use restate_storage_query_datafusion::context::QueryContext; use restate_types::schema::registry::SchemaRegistry; use restate_wal_protocol::Envelope; @@ -18,6 +19,8 @@ pub struct AdminServiceState, pub invocation_client: Invocations, pub ingestion_client: IngestionClient, + // Some value if the query endpoint is activated + pub query_context: Option, } impl @@ -29,11 +32,13 @@ where schema_registry: SchemaRegistry, invocation_client: Invocations, ingestion_client: IngestionClient, + query_context: Option, ) -> Self { Self { schema_registry, invocation_client, ingestion_client, + query_context, } } } diff --git a/crates/admin/src/storage_query/error.rs b/crates/admin/src/storage_query/error.rs deleted file mode 100644 index 625fa15c6a..0000000000 --- a/crates/admin/src/storage_query/error.rs +++ /dev/null @@ -1,80 +0,0 @@ -// Copyright (c) 2023 - 2025 Restate Software, Inc., Restate GmbH. -// All rights reserved. -// -// Use of this software is governed by the Business Source License -// included in the LICENSE file. -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0. - -use axum::Json; -use axum::http::StatusCode; -use axum::response::{IntoResponse, Response}; -use datafusion::error::DataFusionError; -use okapi_operation::anyhow::Error; -use okapi_operation::okapi::map; -use okapi_operation::okapi::openapi3::Responses; -use okapi_operation::{Components, ToMediaTypes, ToResponses, okapi}; -use schemars::JsonSchema; -use serde::Serialize; - -/// This error is used by handlers to propagate API errors, -/// and later converted to a response through the IntoResponse implementation -#[derive(Debug, thiserror::Error)] -pub enum StorageQueryError { - #[error("datafusion failed: {0}")] - DataFusion(#[from] DataFusionError), -} - -/// # Error description response -/// -/// Error details of the response -#[derive(Debug, Serialize, JsonSchema)] -struct ErrorDescriptionResponse { - message: String, -} - -impl IntoResponse for StorageQueryError { - fn into_response(self) -> Response { - let status_code = StatusCode::INTERNAL_SERVER_ERROR; - - ( - status_code, - Json(ErrorDescriptionResponse { - message: self.to_string(), - }), - ) - .into_response() - } -} - -impl ToResponses for StorageQueryError { - fn generate(components: &mut Components) -> Result { - let error_media_type = - as ToMediaTypes>::generate(components)?; - Ok(Responses { - responses: map! { - "400".into() => okapi::openapi3::RefOr::Object( - okapi::openapi3::Response { content: error_media_type.clone(), ..Default::default() } - ), - "403".into() => okapi::openapi3::RefOr::Object( - okapi::openapi3::Response { content: error_media_type.clone(), ..Default::default() } - ), - "404".into() => okapi::openapi3::RefOr::Object( - okapi::openapi3::Response { content: error_media_type.clone(), ..Default::default() } - ), - "409".into() => okapi::openapi3::RefOr::Object( - okapi::openapi3::Response { content: error_media_type.clone(), ..Default::default() } - ), - "500".into() => okapi::openapi3::RefOr::Object( - okapi::openapi3::Response { content: error_media_type.clone(), ..Default::default() } - ), - "503".into() => okapi::openapi3::RefOr::Object( - okapi::openapi3::Response { content: error_media_type, ..Default::default() } - ) - }, - ..Default::default() - }) - } -} diff --git a/crates/admin/src/storage_query/mod.rs b/crates/admin/src/storage_query/mod.rs deleted file mode 100644 index 83626c2690..0000000000 --- a/crates/admin/src/storage_query/mod.rs +++ /dev/null @@ -1,31 +0,0 @@ -// Copyright (c) 2023 - 2025 Restate Software, Inc., Restate GmbH. -// All rights reserved. -// -// Use of this software is governed by the Business Source License -// included in the LICENSE file. -// -// As of the Change Date specified in that file, in accordance with -// the Business Source License, use of this software will be governed -// by the Apache License, Version 2.0. - -mod error; -mod query; - -use axum::{Router, routing::post}; -use std::sync::Arc; - -use restate_storage_query_datafusion::context::QueryContext; - -#[derive(Clone)] -pub struct QueryServiceState { - pub query_context: QueryContext, -} - -pub fn router(query_context: QueryContext) -> Router { - let query_state = Arc::new(QueryServiceState { query_context }); - - // Setup the router - axum::Router::new() - .route("/query", post(query::query)) - .with_state(query_state) -} diff --git a/crates/node/Cargo.toml b/crates/node/Cargo.toml index c25fccea62..826844131a 100644 --- a/crates/node/Cargo.toml +++ b/crates/node/Cargo.toml @@ -27,7 +27,7 @@ kafka-oidc = ["restate-worker/kafka-oidc"] [dependencies] restate-workspace-hack = { workspace = true } -restate-admin = { workspace = true, features = ["storage-query"]} +restate-admin = { workspace = true } restate-bifrost = { workspace = true, features = ["local-loglet", "replicated-loglet"] } restate-core = { workspace = true } restate-futures-util = { workspace = true } diff --git a/crates/serde-util/Cargo.toml b/crates/serde-util/Cargo.toml index 47391b7c05..e18b182e0c 100644 --- a/crates/serde-util/Cargo.toml +++ b/crates/serde-util/Cargo.toml @@ -10,6 +10,7 @@ publish = false [features] default = [] schema = ["dep:schemars", "dep:serde_json"] +utoipa-schema = ["dep:utoipa"] proto = ["dep:prost", "dep:bytes"] [dependencies] @@ -24,6 +25,7 @@ schemars = { workspace = true, optional = true } serde = { workspace = true } serde_json = { workspace = true, optional = true } serde_with = { workspace = true } +utoipa = { workspace = true, optional = true } [dev-dependencies] serde_json = { workspace = true } diff --git a/crates/serde-util/src/header_map.rs b/crates/serde-util/src/header_map.rs index 4759996692..5d4fd8248b 100644 --- a/crates/serde-util/src/header_map.rs +++ b/crates/serde-util/src/header_map.rs @@ -20,6 +20,8 @@ use std::fmt; #[derive(Debug, Default, Clone)] #[cfg_attr(feature = "schema", derive(schemars::JsonSchema))] #[cfg_attr(feature = "schema", schemars(transparent))] +#[cfg_attr(feature = "utoipa-schema", derive(utoipa::ToSchema))] +#[cfg_attr(feature = "utoipa-schema", schema(value_type = HashMap))] pub struct SerdeableHeaderHashMap( #[cfg_attr(feature = "schema", schemars(with = "HashMap"))] HashMap, diff --git a/crates/types/Cargo.toml b/crates/types/Cargo.toml index 64fea4a09d..2b13bb24e4 100644 --- a/crates/types/Cargo.toml +++ b/crates/types/Cargo.toml @@ -10,6 +10,7 @@ publish = false [features] default = [] schemars = ["dep:schemars", "restate-serde-util/schema", "restate-time-util/schema", "restate-clock/schemars"] +utoipa-schema = ["dep:utoipa",] unsafe-mutable-config = [] # Enables the tokio console subscriber and exposes its configuration console = [] @@ -98,6 +99,7 @@ tonic = { workspace = true } tracing = { workspace = true } typed-builder = { workspace = true } ulid = { workspace = true } +utoipa = { workspace = true, optional = true, features = ["time"] } xxhash-rust = { workspace = true, features = ["xxh3"] } [dev-dependencies] diff --git a/crates/types/src/identifiers.rs b/crates/types/src/identifiers.rs index d4ef656b43..2ef689526b 100644 --- a/crates/types/src/identifiers.rs +++ b/crates/types/src/identifiers.rs @@ -21,11 +21,10 @@ use bytes::Bytes; use bytestring::ByteString; use generic_array::ArrayLength; use rand::RngCore; +use restate_encoding::{BilrostNewType, NetSerde}; use sha2::{Digest, Sha256}; use ulid::Ulid; -use restate_encoding::{BilrostNewType, NetSerde}; - use crate::base62_util::{base62_encode_fixed_width_u128, base62_max_length_for_type}; use crate::errors::IdDecodeError; use crate::id_util::IdResourceType; @@ -786,6 +785,44 @@ impl schemars::JsonSchema for LambdaARN { } } +#[cfg(feature = "utoipa-schema")] +mod utoipa_schema { + use crate::identifiers::{InvocationId, LambdaARN}; + use std::borrow::Cow; + use utoipa::openapi::{Object, RefOr, Schema, SchemaFormat, Type}; + use utoipa::{PartialSchema, ToSchema}; + + impl ToSchema for LambdaARN { + fn name() -> Cow<'static, str> { + "LambdaARN".into() + } + } + + impl PartialSchema for LambdaARN { + fn schema() -> RefOr { + Schema::Object( + Object::builder() + .schema_type(Type::String) + .format(Some(SchemaFormat::Custom("arn".to_owned()))) + .build(), + ) + .into() + } + } + + impl ToSchema for InvocationId { + fn name() -> Cow<'static, str> { + String::name() + } + } + + impl PartialSchema for InvocationId { + fn schema() -> RefOr { + String::schema() + } + } +} + impl Display for LambdaARN { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { self.arn.fmt(f) @@ -961,6 +998,8 @@ macro_rules! ulid_backed_id { ::restate_encoding::BilrostAs )] #[bilrost_as([< $res_name IdMessage >])] + #[cfg_attr(feature = "utoipa-schema", derive(::utoipa::ToSchema))] + #[cfg_attr(feature = "utoipa-schema", schema(value_type = String))] pub struct [< $res_name Id >](pub(crate) Ulid); impl [< $res_name Id >] { diff --git a/crates/types/src/invocation/mod.rs b/crates/types/src/invocation/mod.rs index a6e55be717..2f98d70069 100644 --- a/crates/types/src/invocation/mod.rs +++ b/crates/types/src/invocation/mod.rs @@ -37,6 +37,7 @@ pub use opentelemetry::trace::TraceId; #[derive(Eq, Hash, PartialEq, Clone, Copy, Debug, serde::Serialize, serde::Deserialize)] #[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))] +#[cfg_attr(feature = "utoipa-schema", derive(utoipa::ToSchema))] pub enum ServiceType { Service, VirtualObject, diff --git a/crates/types/src/net/address.rs b/crates/types/src/net/address.rs index 977eb7b5bd..a343e6c12d 100644 --- a/crates/types/src/net/address.rs +++ b/crates/types/src/net/address.rs @@ -41,6 +41,7 @@ pub trait GrpcPort {} /// HTTP Ingress Service 8080 #[derive(Debug, Clone, Default, PartialEq, Eq, Hash, Serialize, Deserialize)] #[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))] +#[cfg_attr(feature = "utoipa-schema", derive(utoipa::ToSchema))] pub struct HttpIngressPort; impl ListenerPort for HttpIngressPort { const NAME: &'static str = "http-ingress-server"; @@ -307,6 +308,36 @@ impl schemars::JsonSchema for AdvertisedAddress

{ } } +#[cfg(feature = "utoipa-schema")] +mod utoipa_schema { + use crate::net::address::{AdvertisedAddress, ListenerPort}; + use std::borrow::Cow; + use utoipa::__dev::ComposeSchema; + use utoipa::ToSchema; + use utoipa::openapi::{Object, RefOr, Schema, Type}; + + impl ToSchema for AdvertisedAddress

{ + fn name() -> Cow<'static, str> { + format!("AdvertisedAddress-{}", P::NAME).into() + } + } + + impl ComposeSchema for AdvertisedAddress

{ + fn compose(_new_generics: Vec>) -> RefOr { + Schema::Object(Object::builder().schema_type(Type::String).title(Some("advertised address")).description(Some(format!( + "An externally accessible URI address for {}. This can be set to unix:restate-data/{} \ + to advertise the automatically created unix-socket instead of using tcp if needed", + P::NAME, + P::UDS_NAME + ))).examples(vec![ + format!("http//127.0.0.1:{}/", P::DEFAULT_PORT), + "https://my-host/".to_owned(), + format!("unix:/data/restate-data/{}", P::UDS_NAME), + ]).build()).into() + } + } +} + impl Default for AdvertisedAddress

{ fn default() -> Self { Self { diff --git a/crates/types/src/node_id.rs b/crates/types/src/node_id.rs index 6eec61b6e7..419ed21f67 100644 --- a/crates/types/src/node_id.rs +++ b/crates/types/src/node_id.rs @@ -175,6 +175,7 @@ impl From for GenerationalNodeId { )] #[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))] #[cfg_attr(feature = "schemars", schemars(transparent))] +#[cfg_attr(feature = "utoipa-schema", derive(utoipa::ToSchema))] #[display("N{}", _0)] #[debug("N{}", _0)] pub struct PlainNodeId(u32); diff --git a/crates/types/src/schema/deployment.rs b/crates/types/src/schema/deployment.rs index b7e79dc415..327c2369da 100644 --- a/crates/types/src/schema/deployment.rs +++ b/crates/types/src/schema/deployment.rs @@ -28,10 +28,12 @@ use serde::{Deserialize, Serialize}; use serde_with::serde_as; #[derive(Debug, Copy, Clone, Eq, PartialEq, Serialize, Deserialize, derive_more::Display)] -#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))] +#[cfg_attr(feature = "utoipa-schema", derive(utoipa::ToSchema))] pub enum ProtocolType { + /// Request/Response #[display("Request/Response")] RequestResponse, + /// Bidirectional Stream #[display("Bidirectional Stream")] BidiStream, } @@ -119,7 +121,7 @@ impl Deployment { /// Lambda compression #[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)] -#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))] +#[cfg_attr(feature = "utoipa-schema", derive(utoipa::ToSchema))] pub enum EndpointLambdaCompression { Zstd, } diff --git a/crates/types/src/schema/info.rs b/crates/types/src/schema/info.rs index 70e3074386..9b8595c41f 100644 --- a/crates/types/src/schema/info.rs +++ b/crates/types/src/schema/info.rs @@ -12,7 +12,7 @@ use codederror::Code; use serde::{Deserialize, Serialize}; #[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)] -#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))] +#[cfg_attr(feature = "utoipa-schema", derive(utoipa::ToSchema))] pub struct Info { #[serde(default, skip_serializing_if = "Option::is_none")] code: Option, diff --git a/crates/types/src/schema/invocation_target.rs b/crates/types/src/schema/invocation_target.rs index 2ffd450f42..e754735c6a 100644 --- a/crates/types/src/schema/invocation_target.rs +++ b/crates/types/src/schema/invocation_target.rs @@ -63,7 +63,7 @@ pub trait InvocationTargetResolver { } #[derive(Debug, Clone, Copy, Default, PartialEq, Serialize, Deserialize)] -#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))] +#[cfg_attr(feature = "utoipa-schema", derive(utoipa::ToSchema))] pub enum OnMaxAttempts { /// Pause the invocation when max attempts are reached. #[default] diff --git a/crates/types/src/schema/service.rs b/crates/types/src/schema/service.rs index 0c9782bcff..1fd10e9c1f 100644 --- a/crates/types/src/schema/service.rs +++ b/crates/types/src/schema/service.rs @@ -44,8 +44,9 @@ pub trait ServiceMetadataResolver { fn list_service_names(&self) -> Vec; } +/// Metadata of a registered service. #[derive(Debug, Clone, Serialize, Deserialize)] -#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))] +#[cfg_attr(feature = "utoipa-schema", derive(utoipa::ToSchema))] pub struct ServiceMetadata { /// # Name /// @@ -61,7 +62,7 @@ pub struct ServiceMetadata { /// /// Handlers for this service. #[serde(with = "serde_with::As::")] - #[cfg_attr(feature = "schemars", schemars(with = "Vec"))] + #[cfg_attr(feature = "utoipa-schema", schema(value_type = Vec))] pub handlers: HashMap, /// # Documentation @@ -103,7 +104,6 @@ pub struct ServiceMetadata { with = "serde_with::As::", default = "default_idempotency_retention" )] - #[cfg_attr(feature = "schemars", schemars(with = "String" /* TODO(slinkydeveloper) https://github.com/restatedev/restate/issues/3766 */))] pub idempotency_retention: Duration, /// # Workflow completion retention @@ -116,7 +116,6 @@ pub struct ServiceMetadata { skip_serializing_if = "Option::is_none", default )] - #[cfg_attr(feature = "schemars", schemars(with = "Option" /* TODO(slinkydeveloper) https://github.com/restatedev/restate/issues/3766 */))] pub workflow_completion_retention: Option, /// # Journal retention @@ -132,7 +131,6 @@ pub struct ServiceMetadata { skip_serializing_if = "Option::is_none", default )] - #[cfg_attr(feature = "schemars", schemars(with = "Option" /* TODO(slinkydeveloper) https://github.com/restatedev/restate/issues/3766 */))] pub journal_retention: Option, /// # Inactivity timeout @@ -151,7 +149,6 @@ pub struct ServiceMetadata { with = "serde_with::As::", default = "default_inactivity_timeout" )] - #[cfg_attr(feature = "schemars", schemars(with = "String" /* TODO(slinkydeveloper) https://github.com/restatedev/restate/issues/3766 */))] pub inactivity_timeout: Duration, /// # Abort timeout @@ -171,7 +168,6 @@ pub struct ServiceMetadata { with = "serde_with::As::", default = "default_abort_timeout" )] - #[cfg_attr(feature = "schemars", schemars(with = "String" /* TODO(slinkydeveloper) https://github.com/restatedev/restate/issues/3766 */))] pub abort_timeout: Duration, /// # Enable lazy state @@ -196,14 +192,6 @@ pub struct ServiceMetadata { pub info: Vec, } -impl restate_serde_util::MapAsVecItem for ServiceMetadata { - type Key = String; - - fn key(&self) -> Self::Key { - self.name.clone() - } -} - fn default_idempotency_retention() -> Duration { DEFAULT_IDEMPOTENCY_RETENTION } @@ -218,7 +206,7 @@ fn default_abort_timeout() -> Duration { /// # Service retry policy #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] -#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))] +#[cfg_attr(feature = "utoipa-schema", derive(utoipa::ToSchema))] pub struct ServiceRetryPolicyMetadata { /// # Initial Interval /// @@ -229,7 +217,6 @@ pub struct ServiceRetryPolicyMetadata { default = "default_initial_interval", with = "serde_with::As::" )] - #[cfg_attr(feature = "schemars", schemars(with = "String" /* TODO(slinkydeveloper) https://github.com/restatedev/restate/issues/3766 */))] pub initial_interval: Duration, /// # Factor @@ -242,6 +229,8 @@ pub struct ServiceRetryPolicyMetadata { /// /// Number of maximum attempts (including the initial) before giving up. Infinite retries if unset. No retries if set to 1. #[serde(default)] + // todo remove once utoipa supports NonZero integers (https://github.com/juhaku/utoipa/pull/1334) + #[cfg_attr(feature = "utoipa-schema", schema(value_type = Option, minimum = 1))] pub max_attempts: Option, /// # Max interval @@ -250,7 +239,6 @@ pub struct ServiceRetryPolicyMetadata { /// /// Can be configured using the [`jiff::fmt::friendly`](https://docs.rs/jiff/latest/jiff/fmt/friendly/index.html) format or ISO8601, for example `5 hours`. #[serde(default, with = "serde_with::As::>")] - #[cfg_attr(feature = "schemars", schemars(with = "Option" /* TODO(slinkydeveloper) https://github.com/restatedev/restate/issues/3766 */))] pub max_interval: Option, /// # On max attempts @@ -282,7 +270,7 @@ fn default_exponentiation_factor() -> f32 { // This type is used only for exposing the handler metadata, and not internally. See [ServiceAndHandlerType]. #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] -#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))] +#[cfg_attr(feature = "utoipa-schema", derive(utoipa::ToSchema))] pub enum HandlerMetadataType { Exclusive, Shared, @@ -305,8 +293,9 @@ impl From for Option { } } +/// Handler metadata #[derive(Debug, Clone, Serialize, Deserialize)] -#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))] +#[cfg_attr(feature = "utoipa-schema", derive(utoipa::ToSchema))] pub struct HandlerMetadata { /// # Name /// @@ -340,7 +329,6 @@ pub struct HandlerMetadata { skip_serializing_if = "Option::is_none", default )] - #[cfg_attr(feature = "schemars", schemars(with = "Option" /* TODO(slinkydeveloper) https://github.com/restatedev/restate/issues/3766 */))] pub idempotency_retention: Option, /// # Journal retention @@ -358,7 +346,6 @@ pub struct HandlerMetadata { skip_serializing_if = "Option::is_none", default )] - #[cfg_attr(feature = "schemars", schemars(with = "Option" /* TODO(slinkydeveloper) https://github.com/restatedev/restate/issues/3766 */))] pub journal_retention: Option, /// # Inactivity timeout @@ -378,7 +365,6 @@ pub struct HandlerMetadata { skip_serializing_if = "Option::is_none", default )] - #[cfg_attr(feature = "schemars", schemars(with = "Option" /* TODO(slinkydeveloper) https://github.com/restatedev/restate/issues/3766 */))] pub inactivity_timeout: Option, /// # Abort timeout @@ -399,7 +385,6 @@ pub struct HandlerMetadata { skip_serializing_if = "Option::is_none", default )] - #[cfg_attr(feature = "schemars", schemars(with = "Option" /* TODO(slinkydeveloper) https://github.com/restatedev/restate/issues/3766 */))] pub abort_timeout: Option, /// # Enable lazy state @@ -463,7 +448,7 @@ impl restate_serde_util::MapAsVecItem for HandlerMetadata { /// # Handler retry policy overrides #[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq)] -#[cfg_attr(feature = "schemars", derive(schemars::JsonSchema))] +#[cfg_attr(feature = "utoipa-schema", derive(utoipa::ToSchema))] pub struct HandlerRetryPolicyMetadata { /// # Initial Interval /// @@ -475,7 +460,6 @@ pub struct HandlerRetryPolicyMetadata { skip_serializing_if = "Option::is_none", with = "serde_with::As::>" )] - #[cfg_attr(feature = "schemars", schemars(with = "Option" /* TODO(slinkydeveloper) https://github.com/restatedev/restate/issues/3766 */))] pub initial_interval: Option, /// # Factor @@ -487,7 +471,8 @@ pub struct HandlerRetryPolicyMetadata { /// # Max attempts /// /// Number of maximum attempts (including the initial) before giving up. Infinite retries if unset. No retries if set to 1. - #[serde(default, skip_serializing_if = "Option::is_none")] + // todo remove once utoipa supports NonZero integers (https://github.com/juhaku/utoipa/pull/1334) + #[cfg_attr(feature = "utoipa-schema", schema(value_type = Option, minimum = 1))] pub max_attempts: Option, /// # Max interval @@ -500,7 +485,6 @@ pub struct HandlerRetryPolicyMetadata { skip_serializing_if = "Option::is_none", with = "serde_with::As::>" )] - #[cfg_attr(feature = "schemars", schemars(with = "Option" /* TODO(slinkydeveloper) https://github.com/restatedev/restate/issues/3766 */))] pub max_interval: Option, /// # On max attempts diff --git a/workspace-hack/Cargo.toml b/workspace-hack/Cargo.toml index 822642b2f0..9c99f4b560 100644 --- a/workspace-hack/Cargo.toml +++ b/workspace-hack/Cargo.toml @@ -33,8 +33,7 @@ aws-smithy-http-client = { version = "1", default-features = false, features = [ aws-smithy-runtime = { version = "1", default-features = false, features = ["client", "default-https-client", "rt-tokio", "tls-rustls"] } aws-smithy-runtime-api = { version = "1", features = ["client", "http-02x", "http-auth", "test-util"] } aws-smithy-types = { version = "1", default-features = false, features = ["byte-stream-poll-next", "http-body-0-4-x", "http-body-1-x", "rt-tokio", "test-util"] } -axum = { version = "0.8", features = ["http2"] } -axum-core = { version = "0.5", default-features = false, features = ["tracing"] } +axum = { version = "0.8", default-features = false, features = ["http1", "http2", "json", "query", "tokio"] } bilrost = { version = "0.1014", features = ["bytestring", "smallvec"] } byteorder = { version = "1" } bytes = { version = "1", features = ["serde"] } @@ -112,7 +111,6 @@ ring = { version = "0.17", features = ["std"] } rustls = { version = "0.23", default-features = false, features = ["logging", "prefer-post-quantum", "ring", "std", "tls12"] } rustls-pki-types = { version = "1", features = ["std"] } rustls-webpki = { version = "0.103", default-features = false, features = ["aws-lc-rs", "ring", "std"] } -schemars = { version = "0.8", features = ["bytes", "enumset", "preserve_order"] } semver = { version = "1", features = ["serde"] } serde = { version = "1", features = ["alloc", "derive", "rc"] } serde_core = { version = "1", features = ["alloc", "rc"] } @@ -129,7 +127,7 @@ tokio-util = { version = "0.7", features = ["codec", "io-util", "net", "rt", "ti toml_datetime = { version = "0.6", default-features = false, features = ["serde"] } toml_edit = { version = "0.22", features = ["serde"] } tonic = { version = "0.14", features = ["gzip", "tls-native-roots", "tls-ring", "zstd"] } -tower = { version = "0.5", default-features = false, features = ["balance", "buffer", "limit", "load-shed", "log", "retry", "timeout"] } +tower = { version = "0.5", default-features = false, features = ["balance", "buffer", "limit", "load-shed", "retry", "timeout"] } tower-http = { version = "0.6", features = ["cors", "follow-redirect", "map-response-body", "normalize-path", "trace"] } tracing = { version = "0.1", features = ["log", "max_level_trace", "release_max_level_debug"] } tracing-core = { version = "0.1" } @@ -137,6 +135,7 @@ tracing-log = { version = "0.2" } tracing-subscriber = { version = "0.3", features = ["env-filter", "json", "parking_lot"] } typenum = { version = "1", default-features = false, features = ["const-generics"] } ulid = { version = "1", features = ["serde"] } +utoipa = { version = "5", features = ["axum_extras", "time"] } uuid = { version = "1", features = ["js", "serde", "v4", "v7"] } xxhash-rust = { version = "0.8", default-features = false, features = ["std", "xxh3"] } zerocopy = { version = "0.8", default-features = false, features = ["derive", "simd"] } @@ -163,8 +162,7 @@ aws-smithy-http-client = { version = "1", default-features = false, features = [ aws-smithy-runtime = { version = "1", default-features = false, features = ["client", "default-https-client", "rt-tokio", "tls-rustls"] } aws-smithy-runtime-api = { version = "1", features = ["client", "http-02x", "http-auth", "test-util"] } aws-smithy-types = { version = "1", default-features = false, features = ["byte-stream-poll-next", "http-body-0-4-x", "http-body-1-x", "rt-tokio", "test-util"] } -axum = { version = "0.8", features = ["http2"] } -axum-core = { version = "0.5", default-features = false, features = ["tracing"] } +axum = { version = "0.8", default-features = false, features = ["http1", "http2", "json", "query", "tokio"] } bilrost = { version = "0.1014", features = ["bytestring", "smallvec"] } byteorder = { version = "1" } bytes = { version = "1", features = ["serde"] } @@ -244,7 +242,6 @@ ring = { version = "0.17", features = ["std"] } rustls = { version = "0.23", default-features = false, features = ["logging", "prefer-post-quantum", "ring", "std", "tls12"] } rustls-pki-types = { version = "1", features = ["std"] } rustls-webpki = { version = "0.103", default-features = false, features = ["aws-lc-rs", "ring", "std"] } -schemars = { version = "0.8", features = ["bytes", "enumset", "preserve_order"] } semver = { version = "1", features = ["serde"] } serde = { version = "1", features = ["alloc", "derive", "rc"] } serde_core = { version = "1", features = ["alloc", "rc"] } @@ -261,7 +258,7 @@ tokio-util = { version = "0.7", features = ["codec", "io-util", "net", "rt", "ti toml_datetime = { version = "0.6", default-features = false, features = ["serde"] } toml_edit = { version = "0.22", features = ["serde"] } tonic = { version = "0.14", features = ["gzip", "tls-native-roots", "tls-ring", "zstd"] } -tower = { version = "0.5", default-features = false, features = ["balance", "buffer", "limit", "load-shed", "log", "retry", "timeout"] } +tower = { version = "0.5", default-features = false, features = ["balance", "buffer", "limit", "load-shed", "retry", "timeout"] } tower-http = { version = "0.6", features = ["cors", "follow-redirect", "map-response-body", "normalize-path", "trace"] } tracing = { version = "0.1", features = ["log", "max_level_trace", "release_max_level_debug"] } tracing-core = { version = "0.1" } @@ -269,6 +266,8 @@ tracing-log = { version = "0.2" } tracing-subscriber = { version = "0.3", features = ["env-filter", "json", "parking_lot"] } typenum = { version = "1", default-features = false, features = ["const-generics"] } ulid = { version = "1", features = ["serde"] } +utoipa = { version = "5", features = ["axum_extras", "time"] } +utoipa-gen = { version = "5", default-features = false, features = ["axum_extras", "time"] } uuid = { version = "1", features = ["js", "serde", "v4", "v7"] } xxhash-rust = { version = "0.8", default-features = false, features = ["std", "xxh3"] } zerocopy = { version = "0.8", default-features = false, features = ["derive", "simd"] }