From 68d00456a279e5513ccb7266929c3f2165930432 Mon Sep 17 00:00:00 2001 From: Shubham Mishra Date: Fri, 8 Aug 2025 00:35:08 +0530 Subject: [PATCH] print-plan endpoint & accordingly change publish endpoint --- Cargo.lock | 1 + crates/client-api-messages/src/name.rs | 19 +++ crates/client-api/src/lib.rs | 16 ++- crates/client-api/src/routes/database.rs | 136 +++++++++++++++++- crates/core/src/host/host_controller.rs | 73 +++++++++- crates/core/src/host/mod.rs | 4 +- crates/core/src/host/module_host.rs | 9 +- crates/core/src/host/v8/mod.rs | 2 + .../src/host/wasm_common/module_host_actor.rs | 22 ++- crates/schema/src/auto_migrate.rs | 111 ++++++++++++-- crates/schema/src/auto_migrate/formatter.rs | 7 + crates/standalone/Cargo.toml | 1 + crates/standalone/src/lib.rs | 33 ++++- crates/testing/src/modules.rs | 2 + 14 files changed, 403 insertions(+), 33 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e6b4f6f1b5d..279208d7623 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6071,6 +6071,7 @@ dependencies = [ "spacetimedb-datastore", "spacetimedb-lib", "spacetimedb-paths", + "spacetimedb-schema", "spacetimedb-table", "tempfile", "thiserror 1.0.69", diff --git a/crates/client-api-messages/src/name.rs b/crates/client-api-messages/src/name.rs index 1966297cf26..268ffd9a246 100644 --- a/crates/client-api-messages/src/name.rs +++ b/crates/client-api-messages/src/name.rs @@ -106,6 +106,25 @@ pub enum PublishResult { PermissionDenied { name: DatabaseName }, } +#[derive(serde::Serialize, serde::Deserialize, Debug)] +pub enum MigrationPolicy { + Compatible, + BreakClients, +} + +#[derive(serde::Serialize, serde::Deserialize, Debug)] +pub enum PrettyPrintStyle { + AnsiColor, + NoColor, +} + +#[derive(serde::Serialize, serde::Deserialize, Debug)] +pub struct PrintPlanResult { + pub migrate_plan: Box, + pub break_clients: bool, + pub token: spacetimedb_lib::Hash, +} + #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub enum DnsLookupResponse { /// The lookup was successful and the domain and identity are returned. diff --git a/crates/client-api/src/lib.rs b/crates/client-api/src/lib.rs index ac82a3e142d..c96724fa05b 100644 --- a/crates/client-api/src/lib.rs +++ b/crates/client-api/src/lib.rs @@ -7,7 +7,7 @@ use http::StatusCode; use spacetimedb::client::ClientActorIndex; use spacetimedb::energy::{EnergyBalance, EnergyQuanta}; -use spacetimedb::host::{HostController, ModuleHost, NoSuchModule, UpdateDatabaseResult}; +use spacetimedb::host::{HostController, MigratePlanResult, ModuleHost, NoSuchModule, UpdateDatabaseResult}; use spacetimedb::identity::{AuthCtx, Identity}; use spacetimedb::messages::control_db::{Database, HostType, Node, Replica}; use spacetimedb::sql; @@ -15,6 +15,7 @@ use spacetimedb_client_api_messages::http::{SqlStmtResult, SqlStmtStats}; use spacetimedb_client_api_messages::name::{DomainName, InsertDomainResult, RegisterTldResult, SetDomainsResult, Tld}; use spacetimedb_lib::{ProductTypeElement, ProductValue}; use spacetimedb_paths::server::ModuleLogsDir; +use spacetimedb_schema::auto_migrate::{MigrationPolicy, PrettyPrintStyle}; use tokio::sync::watch; pub mod auth; @@ -134,9 +135,10 @@ impl Host { database: Database, host_type: HostType, program_bytes: Box<[u8]>, + policy: MigrationPolicy, ) -> anyhow::Result { self.host_controller - .update_module_host(database, host_type, self.replica_id, program_bytes) + .update_module_host(database, host_type, self.replica_id, program_bytes, policy) .await } } @@ -219,8 +221,11 @@ pub trait ControlStateWriteAccess: Send + Sync { &self, publisher: &Identity, spec: DatabaseDef, + policy: MigrationPolicy, ) -> anyhow::Result>; + async fn migrate_plan(&self, spec: DatabaseDef, style: PrettyPrintStyle) -> anyhow::Result; + async fn delete_database(&self, caller_identity: &Identity, database_identity: &Identity) -> anyhow::Result<()>; // Energy @@ -309,8 +314,13 @@ impl ControlStateWriteAccess for Arc { &self, identity: &Identity, spec: DatabaseDef, + policy: MigrationPolicy, ) -> anyhow::Result> { - (**self).publish_database(identity, spec).await + (**self).publish_database(identity, spec, policy).await + } + + async fn migrate_plan(&self, spec: DatabaseDef, style: PrettyPrintStyle) -> anyhow::Result { + (**self).migrate_plan(spec, style).await } async fn delete_database(&self, caller_identity: &Identity, database_identity: &Identity) -> anyhow::Result<()> { diff --git a/crates/client-api/src/routes/database.rs b/crates/client-api/src/routes/database.rs index 91a67f9c406..a48a169a423 100644 --- a/crates/client-api/src/routes/database.rs +++ b/crates/client-api/src/routes/database.rs @@ -20,16 +20,21 @@ use http::StatusCode; use serde::Deserialize; use spacetimedb::database_logger::DatabaseLogger; use spacetimedb::host::module_host::ClientConnectedError; -use spacetimedb::host::ReducerArgs; use spacetimedb::host::ReducerCallError; use spacetimedb::host::ReducerOutcome; use spacetimedb::host::UpdateDatabaseResult; +use spacetimedb::host::{MigratePlanResult, ReducerArgs}; use spacetimedb::identity::Identity; use spacetimedb::messages::control_db::{Database, HostType}; -use spacetimedb_client_api_messages::name::{self, DatabaseName, DomainName, PublishOp, PublishResult}; +use spacetimedb_client_api_messages::name::{ + self, DatabaseName, DomainName, MigrationPolicy, PrettyPrintStyle, PrintPlanResult, PublishOp, PublishResult, +}; use spacetimedb_lib::db::raw_def::v9::RawModuleDefV9; use spacetimedb_lib::identity::AuthCtx; use spacetimedb_lib::{sats, Timestamp}; +use spacetimedb_schema::auto_migrate::{ + MigrationPolicy as SchemaMigrationPolicy, MigrationToken, PrettyPrintStyle as AutoMigratePrettyPrintStyle, +}; use super::subscribe::{handle_websocket, HasWebSocketOptions}; @@ -469,6 +474,9 @@ pub struct PublishDatabaseQueryParams { #[serde(default)] clear: bool, num_replicas: Option, + // `Hash` of `MigrationToken` to be checked if `MigrationPolicy::BreakClients` is set. + token: Option, + policy: Option, } use std::env; @@ -496,7 +504,12 @@ fn allow_creation(auth: &SpacetimeAuth) -> Result<(), ErrorResponse> { pub async fn publish( State(ctx): State, Path(PublishDatabaseParams { name_or_identity }): Path, - Query(PublishDatabaseQueryParams { clear, num_replicas }): Query, + Query(PublishDatabaseQueryParams { + clear, + num_replicas, + token, + policy, + }): Query, Extension(auth): Extension, body: Bytes, ) -> axum::response::Result> { @@ -546,6 +559,21 @@ pub async fn publish( } }; + let policy: SchemaMigrationPolicy = match policy.unwrap_or(MigrationPolicy::Compatible) { + MigrationPolicy::BreakClients => { + if let Some(token) = token { + Ok(SchemaMigrationPolicy::BreakClients(token)) + } else { + Err(( + StatusCode::BAD_REQUEST, + "Migration policy is set to `BreakClients`, but no migration token was provided.", + )) + } + } + + MigrationPolicy::Compatible => Ok(SchemaMigrationPolicy::Compatible), + }?; + log::trace!("Publishing to the identity: {}", database_identity.to_hex()); let op = { @@ -587,6 +615,7 @@ pub async fn publish( num_replicas, host_type: HostType::Wasm, }, + policy, ) .await .map_err(log_and_500)?; @@ -614,6 +643,102 @@ pub async fn publish( })) } +#[derive(serde::Deserialize)] +pub struct PrintPlanParams { + name_or_identity: NameOrIdentity, +} + +#[derive(serde::Deserialize)] +pub struct PrintPlanQueryParams { + style: Option, +} + +pub async fn print_migration_plan( + State(ctx): State, + Path(PrintPlanParams { name_or_identity }): Path, + Query(PrintPlanQueryParams { style }): Query, + Extension(auth): Extension, + body: Bytes, +) -> axum::response::Result> { + // User should not be able to print migration plans for a database that they do not own + let database_identity = resolve_and_authenticate(&ctx, &name_or_identity, &auth).await?; + let style = style + .map(|s| match s { + PrettyPrintStyle::NoColor => AutoMigratePrettyPrintStyle::NoColor, + PrettyPrintStyle::AnsiColor => AutoMigratePrettyPrintStyle::AnsiColor, + }) + .unwrap_or_default(); + + let migrate_plan = ctx + .migrate_plan( + DatabaseDef { + database_identity, + program_bytes: body.into(), + num_replicas: None, + host_type: HostType::Wasm, + }, + style, + ) + .await + .map_err(log_and_500)?; + + match migrate_plan { + MigratePlanResult::Success { + old_module_hash, + new_module_hash, + breaks_client, + plan, + } => { + let token = MigrationToken { + database_identity, + old_module_hash, + new_module_hash, + } + .hash(); + + Ok(PrintPlanResult { + token, + migrate_plan: plan, + break_clients: breaks_client, + }) + } + MigratePlanResult::AutoMigrationError(e) => Err(( + StatusCode::BAD_REQUEST, + format!("Automatic migration is not possible: {e}"), + ) + .into()), + } + .map(axum::Json) +} + +/// Resolves the `NameOrIdentity` to a database identity and checks if the +/// `auth` identity owns the database. +async fn resolve_and_authenticate( + ctx: &S, + name_or_identity: &NameOrIdentity, + auth: &SpacetimeAuth, +) -> axum::response::Result { + let database_identity = name_or_identity.resolve(ctx).await?; + + let database = worker_ctx_find_database(ctx, &database_identity) + .await? + .ok_or(NO_SUCH_DATABASE)?; + + if database.owner_identity != auth.identity { + return Err(( + StatusCode::BAD_REQUEST, + format!( + "Identity does not own database, expected: {} got: {}", + database.owner_identity.to_hex(), + auth.identity.to_hex() + ), + ) + .into()); + } + + Ok(database_identity) +} + #[derive(Deserialize)] pub struct DeleteDatabaseParams { name_or_identity: NameOrIdentity, @@ -783,7 +908,8 @@ pub struct DatabaseRoutes { pub logs_get: MethodRouter, /// POST: /database/:name_or_identity/sql pub sql_post: MethodRouter, - + /// POST: /database/print-plan/:name_or_identity/sql + pub print_migration_plan: MethodRouter, /// GET: /database/: name_or_identity/unstable/timestamp pub timestamp_get: MethodRouter, } @@ -808,6 +934,7 @@ where schema_get: get(schema::), logs_get: get(logs::), sql_post: post(sql::), + print_migration_plan: post(print_migration_plan::), timestamp_get: get(get_timestamp::), } } @@ -835,6 +962,7 @@ where axum::Router::new() .route("/", self.root_post) + .route("/print-plan/:name_or_identity", self.print_migration_plan) .nest("/:name_or_identity", db_router) .route_layer(axum::middleware::from_fn_with_state(ctx, anon_auth_middleware::)) } diff --git a/crates/core/src/host/host_controller.rs b/crates/core/src/host/host_controller.rs index 5c9f08fff52..cf56f7c6ede 100644 --- a/crates/core/src/host/host_controller.rs +++ b/crates/core/src/host/host_controller.rs @@ -21,6 +21,7 @@ use async_trait::async_trait; use durability::{Durability, EmptyHistory}; use log::{info, trace, warn}; use parking_lot::Mutex; +use spacetimedb_data_structures::error_stream::ErrorStream; use spacetimedb_data_structures::map::IntMap; use spacetimedb_datastore::db_metrics::data_size::DATA_SIZE_METRICS; use spacetimedb_datastore::db_metrics::DB_METRICS; @@ -30,6 +31,7 @@ use spacetimedb_lib::{hash_bytes, Identity}; use spacetimedb_paths::server::{ReplicaDir, ServerDataDir}; use spacetimedb_paths::FromPathUnchecked; use spacetimedb_sats::hash::Hash; +use spacetimedb_schema::auto_migrate::{ponder_migrate, AutoMigrateError, MigrationPolicy, PrettyPrintStyle}; use spacetimedb_schema::def::ModuleDef; use spacetimedb_table::page_pool::PagePool; use std::future::Future; @@ -355,6 +357,7 @@ impl HostController { host_type: HostType, replica_id: u64, program_bytes: Box<[u8]>, + policy: MigrationPolicy, ) -> anyhow::Result { let program = Program { hash: hash_bytes(&program_bytes), @@ -404,6 +407,7 @@ impl HostController { this.runtimes.clone(), host_type, program, + policy, this.energy_monitor.clone(), this.unregister_fn(replica_id), this.db_cores.take(), @@ -418,6 +422,32 @@ impl HostController { Ok(update_result) } + pub async fn migrate_plan( + &self, + database: Database, + host_type: HostType, + replica_id: u64, + program_bytes: Box<[u8]>, + style: PrettyPrintStyle, + ) -> anyhow::Result { + let program = Program { + hash: hash_bytes(&program_bytes), + bytes: program_bytes, + }; + trace!( + "migrate plan {}/{}: genesis={} update-to={}", + database.database_identity, + replica_id, + database.initial_program, + program.hash + ); + + let guard = self.acquire_read_lock(replica_id).await; + let host = guard.as_ref().ok_or(NoSuchModule)?; + + host.migrate_plan(host_type, program, style).await + } + /// Start the host `replica_id` and conditionally update it. /// /// If the host was not initialized before, it is initialized with the @@ -503,6 +533,7 @@ impl HostController { this.runtimes.clone(), host_type, program, + MigrationPolicy::Compatible, this.energy_monitor.clone(), this.unregister_fn(replica_id), this.db_cores.take(), @@ -773,6 +804,7 @@ async fn update_module( module: &ModuleHost, program: Program, old_module_info: Arc, + policy: MigrationPolicy, ) -> anyhow::Result { let addr = db.database_identity(); match stored_program_hash(db)? { @@ -783,7 +815,7 @@ async fn update_module( UpdateDatabaseResult::NoUpdateNeeded } else { info!("updating `{}` from {} to {}", addr, stored, program.hash); - module.update_database(program, old_module_info).await? + module.update_database(program, old_module_info, policy).await? }; Ok(res) @@ -1016,11 +1048,13 @@ impl Host { /// otherwise it stays the same. /// /// Either way, the [`UpdateDatabaseResult`] is returned. + #[allow(clippy::too_many_arguments)] async fn update_module( &mut self, runtimes: Arc, host_type: HostType, program: Program, + policy: MigrationPolicy, energy_monitor: Arc, on_panic: impl Fn() + Send + Sync + 'static, core: JobCore, @@ -1043,7 +1077,8 @@ impl Host { // Get the old module info to diff against when building a migration plan. let old_module_info = self.module.borrow().info.clone(); - let update_result = update_module(&replica_ctx.relational_db, &module, program, old_module_info).await?; + let update_result = + update_module(&replica_ctx.relational_db, &module, program, old_module_info, policy).await?; trace!("update result: {update_result:?}"); // Only replace the module + scheduler if the update succeeded. // Otherwise, we want the database to continue running with the old state. @@ -1057,6 +1092,30 @@ impl Host { Ok(update_result) } + /// Generate a migration plan for the given `program`. + async fn migrate_plan( + &self, + host_type: HostType, + program: Program, + style: PrettyPrintStyle, + ) -> anyhow::Result { + let old_module = self.module.borrow().info.clone(); + + let module_def = extract_schema(program.bytes, host_type).await?; + + let res = match ponder_migrate(&old_module.module_def, &module_def) { + Ok(plan) => MigratePlanResult::Success { + old_module_hash: old_module.module_hash, + new_module_hash: program.hash, + breaks_client: plan.breaks_client(), + plan: plan.pretty_print(style)?.into(), + }, + Err(e) => MigratePlanResult::AutoMigrationError(e), + }; + + Ok(res) + } + fn db(&self) -> &RelationalDB { &self.replica_ctx.relational_db } @@ -1069,6 +1128,16 @@ impl Drop for Host { } } +pub enum MigratePlanResult { + Success { + old_module_hash: Hash, + new_module_hash: Hash, + plan: Box, + breaks_client: bool, + }, + AutoMigrationError(ErrorStream), +} + const STORAGE_METERING_INTERVAL: Duration = Duration::from_secs(15); /// Periodically collect gauge stats and update prometheus metrics. diff --git a/crates/core/src/host/mod.rs b/crates/core/src/host/mod.rs index 1434f50917c..beb72fc2f99 100644 --- a/crates/core/src/host/mod.rs +++ b/crates/core/src/host/mod.rs @@ -25,8 +25,8 @@ mod wasm_common; pub use disk_storage::DiskStorage; pub use host_controller::{ - extract_schema, DurabilityProvider, ExternalDurability, ExternalStorage, HostController, ProgramStorage, - ReducerCallResult, ReducerOutcome, StartSnapshotWatcher, + extract_schema, DurabilityProvider, ExternalDurability, ExternalStorage, HostController, MigratePlanResult, + ProgramStorage, ReducerCallResult, ReducerOutcome, StartSnapshotWatcher, }; pub use module_host::{ModuleHost, NoSuchModule, ReducerCallError, UpdateDatabaseResult}; pub use scheduler::Scheduler; diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index 29749f651db..d5beba9eb6e 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -41,7 +41,7 @@ use spacetimedb_lib::Timestamp; use spacetimedb_primitives::TableId; use spacetimedb_query::compile_subscription; use spacetimedb_sats::ProductValue; -use spacetimedb_schema::auto_migrate::AutoMigrateError; +use spacetimedb_schema::auto_migrate::{AutoMigrateError, MigrationPolicy}; use spacetimedb_schema::def::deserialize::ReducerArgsDeserializeSeed; use spacetimedb_schema::def::{ModuleDef, ReducerDef, TableDef}; use spacetimedb_schema::schema::{Schema, TableSchema}; @@ -334,6 +334,7 @@ pub trait ModuleInstance: Send + 'static { &mut self, program: Program, old_module_info: Arc, + policy: MigrationPolicy, ) -> anyhow::Result; fn call_reducer(&mut self, tx: Option, params: CallReducerParams) -> ReducerCallResult; @@ -458,8 +459,9 @@ impl ModuleInstance for AutoReplacingModuleInstance { &mut self, program: Program, old_module_info: Arc, + policy: MigrationPolicy, ) -> anyhow::Result { - let ret = self.inst.update_database(program, old_module_info); + let ret = self.inst.update_database(program, old_module_info, policy); self.check_trap(); ret } @@ -1057,9 +1059,10 @@ impl ModuleHost { &self, program: Program, old_module_info: Arc, + policy: MigrationPolicy, ) -> Result { self.call("", move |inst| { - inst.update_database(program, old_module_info) + inst.update_database(program, old_module_info, policy) }) .await? } diff --git a/crates/core/src/host/v8/mod.rs b/crates/core/src/host/v8/mod.rs index fd49d1e5a3c..e988f42176a 100644 --- a/crates/core/src/host/v8/mod.rs +++ b/crates/core/src/host/v8/mod.rs @@ -10,6 +10,7 @@ use crate::{ }; use anyhow::anyhow; use spacetimedb_datastore::locking_tx_datastore::MutTxId; +use spacetimedb_schema::auto_migrate::MigrationPolicy; use std::sync::{Arc, LazyLock}; mod error; @@ -120,6 +121,7 @@ impl ModuleInstance for JsInstance { &mut self, _program: spacetimedb_datastore::traits::Program, _old_module_info: Arc, + _policy: MigrationPolicy, ) -> anyhow::Result { todo!() } diff --git a/crates/core/src/host/wasm_common/module_host_actor.rs b/crates/core/src/host/wasm_common/module_host_actor.rs index c337140ac94..5f6d3572285 100644 --- a/crates/core/src/host/wasm_common/module_host_actor.rs +++ b/crates/core/src/host/wasm_common/module_host_actor.rs @@ -1,7 +1,7 @@ use bytes::Bytes; use prometheus::IntGauge; use spacetimedb_lib::db::raw_def::v9::Lifecycle; -use spacetimedb_schema::auto_migrate::ponder_migrate; +use spacetimedb_schema::auto_migrate::{MigratePlan, MigrationPolicy, MigrationPolicyError}; use std::sync::Arc; use std::time::Duration; @@ -245,16 +245,24 @@ impl ModuleInstance for WasmModuleInstance { &mut self, program: Program, old_module_info: Arc, + policy: MigrationPolicy, ) -> Result { - let plan = ponder_migrate(&old_module_info.module_def, &self.info.module_def); - let plan = match plan { + let stdb = &*self.replica_context().relational_db; + let plan: MigratePlan = match policy.try_migrate( + self.info.database_identity, + old_module_info.module_hash, + &old_module_info.module_def, + self.info.module_hash, + &self.info.module_def, + ) { Ok(plan) => plan, - Err(errs) => { - return Ok(UpdateDatabaseResult::AutoMigrateError(errs)); + Err(e) => { + return match e { + MigrationPolicyError::AutoMigrateFailure(e) => Ok(UpdateDatabaseResult::AutoMigrateError(e)), + _ => Ok(UpdateDatabaseResult::ErrorExecutingMigration(e.into())), + } } }; - let stdb = &*self.replica_context().relational_db; - let program_hash = program.hash; let tx = stdb.begin_mut_tx(IsolationLevel::Serializable, Workload::Internal); let (mut tx, _) = stdb.with_auto_rollback(tx, |tx| stdb.update_program(tx, HostType::Wasm, program))?; diff --git a/crates/schema/src/auto_migrate.rs b/crates/schema/src/auto_migrate.rs index 82ea3cfe5fe..e3df1dc6682 100644 --- a/crates/schema/src/auto_migrate.rs +++ b/crates/schema/src/auto_migrate.rs @@ -9,16 +9,17 @@ use spacetimedb_data_structures::{ }; use spacetimedb_lib::{ db::raw_def::v9::{RawRowLevelSecurityDefV9, TableType}, - AlgebraicType, + hash_bytes, AlgebraicType, Identity, }; use spacetimedb_sats::{ layout::{HasLayout, SumTypeLayout}, WithTypespace, }; +use thiserror::Error; mod ansi_formatter; mod formatter; mod plain_formatter; - +pub use formatter::PrettyPrintStyle; pub type Result = std::result::Result>; /// A plan for a migration. @@ -28,12 +29,6 @@ pub enum MigratePlan<'def> { Auto(AutoMigratePlan<'def>), } -#[derive(Copy, Clone, PartialEq, Eq)] -pub enum PrettyPrintStyle { - AnsiColor, - NoColor, -} - impl<'def> MigratePlan<'def> { /// Get the old `ModuleDef` for this migration plan. pub fn old_def(&self) -> &'def ModuleDef { @@ -51,9 +46,19 @@ impl<'def> MigratePlan<'def> { } } + pub fn breaks_client(&self) -> bool { + match self { + //TODO: fix it when support for manual migration plans is added. + MigratePlan::Manual(_) => true, + MigratePlan::Auto(plan) => plan + .steps + .iter() + .any(|step| matches!(step, AutoMigrateStep::DisconnectAllUsers)), + } + } + pub fn pretty_print(&self, style: PrettyPrintStyle) -> anyhow::Result { use PrettyPrintStyle::*; - match self { MigratePlan::Manual(_) => { anyhow::bail!("Manual migration plans are not yet supported for pretty printing.") @@ -74,6 +79,94 @@ impl<'def> MigratePlan<'def> { } } +/// A migration policy that determines whether a module update is allowed to break client compatibility. +/// +/// `Compatible` requires migration to maintain backward compatibility. +/// `BreakingChange` allows breaking changes but requires a valid `MigrationToken`. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum MigrationPolicy { + Compatible, + BreakClients(spacetimedb_lib::Hash), +} + +impl MigrationPolicy { + /// Verifies whether the given migration plan is allowed under the current policy. + /// + /// Returns `Ok(())` if allowed, otherwise an appropriate `MigrationPolicyError` + fn permits_plan(&self, plan: &MigratePlan<'_>, token: &MigrationToken) -> anyhow::Result<(), MigrationPolicyError> { + match self { + MigrationPolicy::Compatible => { + if plan.breaks_client() { + Err(MigrationPolicyError::ClientBreakingChangeDisallowed) + } else { + Ok(()) + } + } + MigrationPolicy::BreakClients(expected_hash) => { + if token.hash() == *expected_hash { + Ok(()) + } else { + Err(MigrationPolicyError::InvalidToken) + } + } + } + } + + /// Attempts to generate a migration plan and validate it under this policy. + /// + /// Fails if migration is not permitted by the policy or migration planning fails. + pub fn try_migrate<'def>( + &self, + database_identity: Identity, + old_module_hash: spacetimedb_lib::Hash, + old_module_def: &'def ModuleDef, + new_module_hash: spacetimedb_lib::Hash, + new_module_def: &'def ModuleDef, + ) -> anyhow::Result, MigrationPolicyError> { + let plan = ponder_migrate(old_module_def, new_module_def).map_err(MigrationPolicyError::AutoMigrateFailure)?; + + let token = MigrationToken { + database_identity, + old_module_hash, + new_module_hash, + }; + self.permits_plan(&plan, &token)?; + Ok(plan) + } +} + +#[derive(Debug, Error)] +pub enum MigrationPolicyError { + #[error("Automatic migration planning failed")] + AutoMigrateFailure(ErrorStream), + + #[error("Token provided is invalid or does not match expected hash")] + InvalidToken, + + #[error("Migration plan contains a client-breaking change which is disallowed under current policy")] + ClientBreakingChangeDisallowed, +} + +pub struct MigrationToken { + pub database_identity: Identity, + pub old_module_hash: spacetimedb_lib::Hash, + pub new_module_hash: spacetimedb_lib::Hash, +} + +impl MigrationToken { + pub fn hash(&self) -> spacetimedb_lib::Hash { + hash_bytes( + format!( + "{}{}{}", + self.database_identity.to_hex(), + self.old_module_hash.to_hex(), + self.new_module_hash.to_hex() + ) + .as_str(), + ) + } +} + /// A plan for a manual migration. /// `new` must have a reducer marked with `Lifecycle::Update`. #[derive(Debug)] diff --git a/crates/schema/src/auto_migrate/formatter.rs b/crates/schema/src/auto_migrate/formatter.rs index 7cd8ede2aa7..8f3b2d490a1 100644 --- a/crates/schema/src/auto_migrate/formatter.rs +++ b/crates/schema/src/auto_migrate/formatter.rs @@ -14,6 +14,13 @@ use spacetimedb_lib::{ use spacetimedb_sats::WithTypespace; use thiserror::Error; +#[derive(Default, Copy, Clone, PartialEq, Eq)] +pub enum PrettyPrintStyle { + #[default] + AnsiColor, + NoColor, +} + pub fn format_plan(f: &mut F, plan: &AutoMigratePlan) -> Result<(), FormattingErrors> { f.format_header(); diff --git a/crates/standalone/Cargo.toml b/crates/standalone/Cargo.toml index 4e74fd7316d..625f751457c 100644 --- a/crates/standalone/Cargo.toml +++ b/crates/standalone/Cargo.toml @@ -28,6 +28,7 @@ spacetimedb-datastore.workspace = true spacetimedb-lib.workspace = true spacetimedb-paths.workspace = true spacetimedb-table.workspace = true +spacetimedb-schema.workspace = true anyhow.workspace = true async-trait.workspace = true diff --git a/crates/standalone/src/lib.rs b/crates/standalone/src/lib.rs index cb179bb258e..9ebb9c23047 100644 --- a/crates/standalone/src/lib.rs +++ b/crates/standalone/src/lib.rs @@ -5,7 +5,7 @@ pub mod version; use crate::control_db::ControlDb; use crate::subcommands::{extract_schema, start}; -use anyhow::{ensure, Context, Ok}; +use anyhow::{ensure, Context as _, Ok}; use async_trait::async_trait; use clap::{ArgMatches, Command}; use spacetimedb::client::ClientActorIndex; @@ -13,7 +13,8 @@ use spacetimedb::config::{CertificateAuthority, MetadataFile}; use spacetimedb::db::{self, relational_db}; use spacetimedb::energy::{EnergyBalance, EnergyQuanta, NullEnergyMonitor}; use spacetimedb::host::{ - DiskStorage, DurabilityProvider, ExternalDurability, HostController, StartSnapshotWatcher, UpdateDatabaseResult, + DiskStorage, DurabilityProvider, ExternalDurability, HostController, MigratePlanResult, StartSnapshotWatcher, + UpdateDatabaseResult, }; use spacetimedb::identity::Identity; use spacetimedb::messages::control_db::{Database, Node, Replica}; @@ -28,6 +29,7 @@ use spacetimedb_datastore::db_metrics::DB_METRICS; use spacetimedb_datastore::traits::Program; use spacetimedb_paths::server::{ModuleLogsDir, PidFile, ServerDataDir}; use spacetimedb_paths::standalone::StandaloneDataDirExt; +use spacetimedb_schema::auto_migrate::{MigrationPolicy, PrettyPrintStyle}; use spacetimedb_table::page_pool::PagePool; use std::sync::Arc; @@ -239,6 +241,7 @@ impl spacetimedb_client_api::ControlStateWriteAccess for StandaloneEnv { &self, publisher: &Identity, spec: spacetimedb_client_api::DatabaseDef, + policy: MigrationPolicy, ) -> anyhow::Result> { let existing_db = self.control_db.get_database_by_identity(&spec.database_identity)?; @@ -295,7 +298,7 @@ impl spacetimedb_client_api::ControlStateWriteAccess for StandaloneEnv { .await? .ok_or_else(|| anyhow::anyhow!("No leader for database"))?; let update_result = leader - .update(database, spec.host_type, spec.program_bytes.into()) + .update(database, spec.host_type, spec.program_bytes.into(), policy) .await?; if update_result.was_successful() { let replicas = self.control_db.get_replicas_by_database(database_id)?; @@ -345,6 +348,30 @@ impl spacetimedb_client_api::ControlStateWriteAccess for StandaloneEnv { } } + async fn migrate_plan( + &self, + spec: spacetimedb_client_api::DatabaseDef, + style: PrettyPrintStyle, + ) -> anyhow::Result { + let existing_db = self.control_db.get_database_by_identity(&spec.database_identity)?; + + match existing_db { + Some(db) => { + let host = self + .leader(db.id) + .await? + .ok_or_else(|| anyhow::anyhow!("No leader for database"))?; + self.host_controller + .migrate_plan(db, spec.host_type, host.replica_id, spec.program_bytes.into(), style) + .await + } + None => anyhow::bail!( + "Database `{}` does not exist", + spec.database_identity.to_abbreviated_hex() + ), + } + } + async fn delete_database(&self, caller_identity: &Identity, database_identity: &Identity) -> anyhow::Result<()> { let Some(database) = self.control_db.get_database_by_identity(database_identity)? else { return Ok(()); diff --git a/crates/testing/src/modules.rs b/crates/testing/src/modules.rs index 8a62076b64a..eefc5791a7c 100644 --- a/crates/testing/src/modules.rs +++ b/crates/testing/src/modules.rs @@ -12,6 +12,7 @@ use spacetimedb::Identity; use spacetimedb_client_api::auth::SpacetimeAuth; use spacetimedb_client_api::routes::subscribe::{generate_random_connection_id, WebSocketOptions}; use spacetimedb_paths::{RootDir, SpacetimePaths}; +use spacetimedb_schema::auto_migrate::MigrationPolicy; use spacetimedb_schema::def::ModuleDef; use tokio::runtime::{Builder, Runtime}; @@ -205,6 +206,7 @@ impl CompiledModule { num_replicas: None, host_type: HostType::Wasm, }, + MigrationPolicy::Compatible, ) .await .unwrap();