diff --git a/nexus/db-queries/src/db/datastore/db_metadata.rs b/nexus/db-queries/src/db/datastore/db_metadata.rs index 782753854c5..f89b557402e 100644 --- a/nexus/db-queries/src/db/datastore/db_metadata.rs +++ b/nexus/db-queries/src/db/datastore/db_metadata.rs @@ -17,7 +17,6 @@ use nexus_db_model::DbMetadataBase; use nexus_db_model::DbMetadataUpdate; use nexus_db_model::EARLIEST_SUPPORTED_VERSION; use nexus_db_model::SchemaUpgradeStep; -use nexus_db_model::SchemaVersion; use omicron_common::api::external::Error; use semver::Version; use slog::{Logger, error, info, o}; @@ -102,33 +101,157 @@ fn skippable_version( return false; } -/// Reasons why [DataStore::ensure_schema] might fail -#[derive(thiserror::Error, Debug)] -pub enum EnsureSchemaError { - /// This Nexus is too old to use the schema currently in the database, - /// or quiescing has started (meaning that the schema will be updating - /// beyond this Nexus' compatibility very soon). +/// Reports how the schema version deployed in the database compares to what +/// this Nexus expects. +/// +/// When we say "what Nexus expects", we're referring to the +/// [nexus_db_model::SCHEMA_VERSION] which this particular Nexus binary was +/// compiled with. During an update, multiple Nexuses with different +/// values of `SCHEMA_VERSION` may be running at the same time, and handoff +/// between them must be carefully coordinated. +#[derive(Debug, Clone, PartialEq)] +pub enum SchemaVersionStatus { + /// The database version matches and we're not quiescing + DatabaseMatchesReady, + /// The database version matches but we should be quiescing + DatabaseMatchesQuiescing, + /// Database is actually on a newer version, or database is on the same + /// version but `quiesce_completed` is true + DatabaseIsNewer, + /// The database is on an older version and quiesce has not completed + DatabaseIsOlderUnquiesced, + /// The database is on an older version and quiesce has completed + DatabaseIsOlderQuiesced, +} + +impl SchemaVersionStatus { + /// Interpret how the current db metadata compares to our desired version + pub fn interpret( + db_metadata: &DbMetadata, + desired_version: &Version, + ) -> SchemaVersionStatus { + use std::cmp::Ordering; + + let found_version: Version = db_metadata.version().clone().into(); + let quiesce_started = db_metadata.quiesce_started(); + let quiesce_completed = db_metadata.quiesce_completed(); + + match found_version.cmp(desired_version) { + Ordering::Greater => SchemaVersionStatus::DatabaseIsNewer, + Ordering::Equal => { + if quiesce_completed { + SchemaVersionStatus::DatabaseIsNewer + } else if quiesce_started { + SchemaVersionStatus::DatabaseMatchesQuiescing + } else { + SchemaVersionStatus::DatabaseMatchesReady + } + } + Ordering::Less => { + if quiesce_completed { + SchemaVersionStatus::DatabaseIsOlderQuiesced + } else { + SchemaVersionStatus::DatabaseIsOlderUnquiesced + } + } + } + } +} + +/// A reason why [SchemaAction::Refuse] would be returned. +#[derive(Debug, Clone, PartialEq)] +pub enum SchemaRefuseReason { + /// The database appears out-of-date, and our policy will not allow + /// us to perform an upgrade + OutOfDate, + + /// The database schema appears too new + TooNew, +} + +/// Describes how to respond to a [SchemaVersionStatus] +#[derive(Debug, Clone, PartialEq)] +pub enum SchemaAction { + /// Normal operation: use the database normally + Ready, + + /// Start quiescing Nexus /// - /// Typically, this happens during an upgrade when the older version of - /// Nexus is starting up during the handoff to the newer Nexus. - #[error("Nexus is too old to use this schema; handoff: {handoff}")] - NexusTooOld { - /// Should this Nexus still try to participate in handoff to a newer - /// Nexus? - /// - /// If "no", then we should terminate. - handoff: bool, - }, + /// It's expected that this will not be an instantaneous operation; + /// we may need to complete sagas to finish quiescing. + Quiesce, + + /// Wait for either the schema to be updated or (if willing to update it + /// ourselves but we want to wait for quiesce to complete) for quiescing to + /// finish (do not use the database, do not try to upgrade it) + Wait, + + /// Start a schema update + Update, + + /// Do not touch the database - refuse the schema for the supplied reason. + Refuse { reason: SchemaRefuseReason }, +} - /// This Nexus could upgrade, but is waiting for quiesce to complete - #[error("Nexus is waiting for older Nexus versions to quiesce")] - WaitingForQuiesce, +/// Determines how the consumer wants to behave when the schema doesn't match +/// what they expect +#[derive(Debug, Copy, Clone, PartialEq)] +pub enum ConsumerPolicy { + /// Throw an error if the schema does not exactly match + FailOnMismatch, + /// Update if we're ready to update, otherwise identify that we should wait + UpdateGracefully, + /// Force an update regardless of quiesce state + UpdateForcefully, +} + +impl SchemaAction { + pub fn new( + schema_version_status: SchemaVersionStatus, + consumer_policy: ConsumerPolicy, + ) -> SchemaAction { + use ConsumerPolicy::*; + use SchemaAction::*; + use SchemaRefuseReason::*; + use SchemaVersionStatus::*; + + match (schema_version_status, consumer_policy) { + // Database is newer than what we expect + (DatabaseIsNewer, _) => Refuse { reason: TooNew }, + + // Database matches and we're ready + (DatabaseMatchesReady, _) => Ready, + + // Database matches but we're quiescing + (DatabaseMatchesQuiescing, FailOnMismatch) => Quiesce, + (DatabaseMatchesQuiescing, UpdateGracefully) => Quiesce, + (DatabaseMatchesQuiescing, UpdateForcefully) => Ready, + + // Database is older and unquiesced + (DatabaseIsOlderUnquiesced, FailOnMismatch) => { + Refuse { reason: OutOfDate } + } + (DatabaseIsOlderUnquiesced, UpdateGracefully) => Wait, + (DatabaseIsOlderUnquiesced, UpdateForcefully) => Update, - /// The schema does not match, and we aren't considering changing it - #[error( - "DB schema {found} < Nexus schema {desired}, but schema update is disabled" - )] - SchemaUpdateDisabled { found: Version, desired: Version }, + // Database is older and quiesced (ready for update) + (DatabaseIsOlderQuiesced, FailOnMismatch) => { + Refuse { reason: OutOfDate } + } + (DatabaseIsOlderQuiesced, UpdateGracefully) => Update, + (DatabaseIsOlderQuiesced, UpdateForcefully) => Update, + } + } +} + +/// Reasons why [DataStore::ensure_schema] might fail +#[derive(thiserror::Error, Debug)] +pub enum EnsureSchemaError { + /// The schema cannot be ensured, because our interpretation + /// of the policy and database state identifies that we should + /// not proceed with database usage or a schema update. + #[error("Unexpected action for schema update: {action:?}")] + UnexpectedAction { action: SchemaAction }, /// During schema upgrade, we couldn't find the necessary upgrade steps #[error("Schema update from {found} -> {desired} missing step: {missing}")] @@ -143,158 +266,112 @@ pub enum EnsureSchemaError { Other(#[from] anyhow::Error), } -/// Describes whether or not [DataStore::ensure_schema] should proceed -/// with a schema update if it finds the currently-deployed schema is -/// out-of-date. -pub enum UpdateConfiguration<'a> { - /// The update will not be triggered - Disabled, - - /// If the schema update is possible, it will trigger - Enabled { - /// Known schema versions, and ways to upgrade between them - all_versions: &'a AllSchemaVersions, - - /// If "true", proceed with schema updates (if necessary) even when the - /// database state suggests older Nexus instances may have not quiesced. - /// - /// This should only be set to "true" cautiously (e.g., this is used by - /// the schema-updater binary, which can be used by operators when they - /// know old Nexus instances are not running). - /// - /// For most cases, this should be set to "false". - ignore_quiesce: bool, - }, -} - impl DataStore { - /// Ensures that the database schema matches "desired_version". + /// Check the current schema version status compared to the desired version + pub async fn check_schema_version( + &self, + desired_version: &Version, + ) -> Result { + let db_metadata = self.database_metadata().await?; + Ok(SchemaVersionStatus::interpret(&db_metadata, desired_version)) + } + + /// Validates that the desired version of the schema is up-to-date, + /// according to the supplied [ConsumerPolicy], which possibly causes a + /// schema update. /// - /// The `config` argument can be used to upgrade the schema to the desired - /// version (if "Enabled") or can be used as a read-only check to validate - /// the schema (if "Disabled"). + /// This function internally calls the equivalent of + /// [Self::check_schema_version], and verifies that the [SchemaAction] + /// is either "Ready" or "Update". /// - /// - Updating the schema makes the database incompatible with older - /// versions of Nexus, which are not running "desired_version". - /// - This is a one-way operation that cannot be undone. - /// - The caller is responsible for ensuring that the new version is valid, - /// and that all running Nexus instances can understand the new schema - /// version. + /// Returns errors if the schema does not match or cannot be made + /// to match. pub async fn ensure_schema( &self, log: &Logger, desired_version: Version, - config: UpdateConfiguration<'_>, + consumer_policy: ConsumerPolicy, + all_versions: Option<&AllSchemaVersions>, ) -> Result<(), EnsureSchemaError> { + // Observe current status let db_metadata = self .database_metadata() .await .context("Cannot read database schema version")?; + let status = + SchemaVersionStatus::interpret(&db_metadata, &desired_version); - let found_version: Version = db_metadata.version().clone().into(); - let found_target_version: Option = - db_metadata.target_version().map(|v| v.clone().into()); - let quiesce_started = db_metadata.quiesce_started(); - let quiesce_completed = db_metadata.quiesce_completed(); - - let log = log.new(o!( - "found_version" => found_version.to_string(), - "desired_version" => desired_version.to_string(), - "quiesce_started" => quiesce_started, - "quiesce_completed" => quiesce_completed, - )); - - // This case is effectively: Quiesce has already completed, AND the - // schema migration has already been marked as completed. - if found_version > desired_version { - warn!( - log, - "Found schema version is newer than desired schema version"; - ); - return Err(EnsureSchemaError::NexusTooOld { handoff: false }); - } - - // NOTE: We could run with a less tight restriction. - // - // If we respect the meaning of the semver version, it should be - // possible to use subsequent versions, as long as they do not introduce - // breaking changes. - // - // However, at the moment, we opt for conservatism: if the database does - // not exactly match the schema version, we refuse to continue without - // modification. - if found_version == desired_version { - if quiesce_completed { - warn!( - log, - "Schema version is up-to-date, but quiescing has completed" - ); - return Err(EnsureSchemaError::NexusTooOld { handoff: false }); - } - if quiesce_started { - warn!( - log, - "Schema version is up-to-date, but quiescing is in-progress" - ); - return Err(EnsureSchemaError::NexusTooOld { handoff: true }); - } - info!(log, "Database schema version is up to date"); - return Ok(()); - } - - // At this point, we know the "found_version < desired_version". + // Determine what action to take based on policy + let action = SchemaAction::new(status.clone(), consumer_policy); - let (all_versions, ignore_quiesce) = match config { - UpdateConfiguration::Disabled => { - error!( - log, - "Database schema version is out of date, but automatic \ - update is disabled", - ); - return Err(EnsureSchemaError::SchemaUpdateDisabled { - found: found_version.clone(), - desired: desired_version.clone(), - }); + // Execute the determined action + match action { + SchemaAction::Ready => { + info!(log, "Database schema version is up to date"); + Ok(()) } - UpdateConfiguration::Enabled { all_versions, ignore_quiesce } => { - (all_versions, ignore_quiesce) + SchemaAction::Update => { + match all_versions { + Some(versions) => { + info!(log, "Starting schema update"); + // Inline the actual schema update logic to avoid recursion + let current_version = + db_metadata.version().clone().into(); + let target_version = db_metadata + .target_version() + .map(|v| v.clone().into()); + self.perform_schema_update( + log, + current_version, + target_version, + desired_version, + versions, + ) + .await + } + None => { + error!( + log, + "Schema update requested but no version information provided" + ); + Err(EnsureSchemaError::Other(anyhow!( + "No schema versions provided for update" + ))) + } + } } - }; - - if !ignore_quiesce && !quiesce_completed { - warn!( - log, - "Schema version is older than desired; waiting for quiesce to complete"; - ); - return Err(EnsureSchemaError::WaitingForQuiesce); + _ => Err(EnsureSchemaError::UnexpectedAction { action }), } + } - // If we're here, we know the following: - // - // - The schema does not match our expected version (or at least, it - // didn't when we read it moments ago). - // - We should attempt to automatically upgrade the schema. - info!(log, "Database schema is out of date. Attempting upgrade."); - if !all_versions.contains_version(&found_version) { + /// Performs the actual schema update from current_version to desired_version + async fn perform_schema_update( + &self, + log: &Logger, + current_version: Version, + found_target_version: Option, + desired_version: Version, + all_versions: &AllSchemaVersions, + ) -> Result<(), EnsureSchemaError> { + if !all_versions.contains_version(¤t_version) { return Err(EnsureSchemaError::SchemaUpdateStepMissing { - found: found_version.clone(), + found: current_version.clone(), desired: desired_version.clone(), - missing: found_version.clone(), + missing: current_version.clone(), }); } - // TODO: Test this? if !all_versions.contains_version(&desired_version) { return Err(EnsureSchemaError::SchemaUpdateStepMissing { - found: found_version.clone(), + found: current_version.clone(), desired: desired_version.clone(), missing: desired_version.clone(), }); } - let target_versions: Vec<&SchemaVersion> = all_versions + let target_versions: Vec<_> = all_versions .versions_range(( - Bound::Excluded(&found_version), + Bound::Excluded(¤t_version), Bound::Included(&desired_version), )) .collect(); @@ -303,7 +380,7 @@ impl DataStore { // // These are the user-defined `KNOWN_VERSIONS` defined in // nexus/db-model/src/schema_versions.rs. - let mut current_version = found_version; + let mut current_version = current_version; for target_version in target_versions.into_iter() { let log = log.new(o!( "current_version" => current_version.to_string(), @@ -331,7 +408,7 @@ impl DataStore { for (i, step) in target_version.upgrade_steps().enumerate() { let target_step = - StepSemverVersion::new(&target_version.semver(), i)?; + StepSemverVersion::new(target_version.semver(), i)?; let log = log.new(o!("target_step.version" => target_step.version.to_string())); self.apply_step_version_update( @@ -341,42 +418,16 @@ impl DataStore { ¤t_version, &found_target_version, ) - .await?; + .await + .context("Failed to apply schema update step")?; last_step_version = Some(target_step.clone()); } - info!( - log, - "Applied schema upgrade"; - ); + info!(log, "Applied schema upgrade"); - // NOTE: We could execute the schema change in a background task, - // and let it propagate, while observing it with the following - // snippet of SQL: - // - // WITH - // x AS (SHOW JOBS) - // SELECT * FROM x WHERE - // job_type = 'SCHEMA CHANGE' AND - // status != 'succeeded'; - // - // This would enable concurrent operations to happen on the database - // while we're mid-update. However, there is subtlety here around - // the visibility of renamed / deleted fields, unique indices, etc, - // so in the short-term we simply block on this job performing the - // update. - // - // NOTE: If we wanted to back-fill data manually, we could do so - // here. - - // Now that the schema change has completed, set the following: - // - db_metadata.version = new version - // - db_metadata.target_version = NULL - // - db_metadata.quiesce_started = false - // - db_metadata.quiesce_completed = false let last_step_version = last_step_version - .ok_or_else(|| anyhow::anyhow!("Missing final step version"))?; + .ok_or_else(|| anyhow!("Missing final step version"))?; // We will keep "quiesced" set to true up until the last update. // @@ -392,11 +443,7 @@ impl DataStore { .await .context("Failed to finalize schema update")?; - info!( - log, - "Finalized schema upgrade"; - ); - + info!(log, "Finalized schema upgrade"); current_version = target_version.semver().clone(); } @@ -697,7 +744,8 @@ mod test { .ensure_schema( &logctx.log, SCHEMA_VERSION, - UpdateConfiguration::Disabled, + ConsumerPolicy::FailOnMismatch, + None, ) .await .expect("Failed to ensure schema"); @@ -947,10 +995,8 @@ mod test { .ensure_schema( &log, SCHEMA_VERSION, - UpdateConfiguration::Enabled { - all_versions: &all_versions, - ignore_quiesce: false, - }, + ConsumerPolicy::UpdateGracefully, + Some(&all_versions), ) .await { @@ -1025,15 +1071,21 @@ mod test { .ensure_schema( &log, SCHEMA_VERSION, - UpdateConfiguration::Disabled, + ConsumerPolicy::FailOnMismatch, + None, ) .await else { panic!("Ensuring schema mid-quiesce should fail"); }; assert!( - matches!(err, EnsureSchemaError::NexusTooOld { handoff: true },), - "Unexpected error: {err:?} (expected: Nexus too old, with handoff)", + matches!( + err, + EnsureSchemaError::UnexpectedAction { + action: SchemaAction::Quiesce + } + ), + "Unexpected error: {err:?} (expected: Should Quiesce)", ); } @@ -1048,7 +1100,8 @@ mod test { .ensure_schema( &log, SCHEMA_VERSION, - UpdateConfiguration::Disabled, + ConsumerPolicy::FailOnMismatch, + None, ) .await else { @@ -1057,9 +1110,13 @@ mod test { assert!( matches!( err, - EnsureSchemaError::NexusTooOld { handoff: false }, + EnsureSchemaError::UnexpectedAction { + action: SchemaAction::Refuse { + reason: SchemaRefuseReason::TooNew + } + }, ), - "Unexpected error: {err:?} (expected: Nexus too old, with no handoff)", + "Unexpected error: {err:?} (expected: Database too new)", ); } @@ -1074,7 +1131,12 @@ mod test { set_quiesce(&datastore, quiesce_started, quiesce_completed).await; let Err(err) = datastore - .ensure_schema(&log, old_schema, UpdateConfiguration::Disabled) + .ensure_schema( + &log, + old_schema, + ConsumerPolicy::FailOnMismatch, + None, + ) .await else { panic!("Ensuring old schema should fail"); @@ -1082,9 +1144,13 @@ mod test { assert!( matches!( err, - EnsureSchemaError::NexusTooOld { handoff: false }, + EnsureSchemaError::UnexpectedAction { + action: SchemaAction::Refuse { + reason: SchemaRefuseReason::TooNew + } + }, ), - "Unexpected error: {err:?} (expected: Nexus too old, with no handoff)", + "Unexpected error: {err:?} (expected: Database too new)", ); } @@ -1127,17 +1193,20 @@ mod test { .ensure_schema( &log, new_schema.clone(), - UpdateConfiguration::Enabled { - all_versions: &all_versions, - ignore_quiesce: false, - }, + ConsumerPolicy::UpdateGracefully, + Some(&all_versions), ) .await else { panic!("Ensuring schema pre-quiesce should fail"); }; assert!( - matches!(err, EnsureSchemaError::WaitingForQuiesce), + matches!( + err, + EnsureSchemaError::UnexpectedAction { + action: SchemaAction::Wait + } + ), "Unexpected error: {err:?} (expected: Wait for Quiesce)", ); @@ -1151,17 +1220,20 @@ mod test { .ensure_schema( &log, new_schema.clone(), - UpdateConfiguration::Enabled { - all_versions: &all_versions, - ignore_quiesce: false, - }, + ConsumerPolicy::UpdateGracefully, + Some(&all_versions), ) .await else { panic!("Ensuring schema mid-quiesce should fail"); }; assert!( - matches!(err, EnsureSchemaError::WaitingForQuiesce), + matches!( + err, + EnsureSchemaError::UnexpectedAction { + action: SchemaAction::Wait + } + ), "Unexpected error: {err:?} (expected: Wait for Quiesce)", ); @@ -1175,10 +1247,8 @@ mod test { .ensure_schema( &log, new_schema.clone(), - UpdateConfiguration::Enabled { - all_versions: &all_versions, - ignore_quiesce: false, - }, + ConsumerPolicy::UpdateGracefully, + Some(&all_versions), ) .await .expect("Ensuring schema should have succeeded"); @@ -1217,17 +1287,237 @@ mod test { // Load the new version, but don't enable update. let datastore = DataStore::new_unchecked(log.clone(), pool.clone()); let Err(err) = datastore - .ensure_schema(&log, new_schema, UpdateConfiguration::Disabled) + .ensure_schema( + &log, + new_schema, + ConsumerPolicy::FailOnMismatch, + None, + ) .await else { panic!("Ensuring schema without enabling update should fail"); }; assert!( - matches!(err, EnsureSchemaError::SchemaUpdateDisabled { .. },), - "Unexpected error: {err:?} (expected: Schema Update Disabled)", + matches!( + err, + EnsureSchemaError::UnexpectedAction { + action: SchemaAction::Refuse { + reason: SchemaRefuseReason::OutOfDate + } + } + ), + "Unexpected error: {err:?} (expected: Schema out-of-date)", ); db.terminate().await; logctx.cleanup_successful(); } + + // Tests for the new policy-based architecture + #[tokio::test] + async fn test_schema_version_status_interpret() { + // Test all combinations of version comparisons and quiesce states + let logctx = + dev::test_setup_log("test_schema_version_status_interpret"); + let db = TestDatabase::new_with_pool(&logctx.log).await; + let datastore = + DataStore::new_unchecked(logctx.log.clone(), db.pool().clone()); + + let current_version = SCHEMA_VERSION; + let mut newer_version = SCHEMA_VERSION; + newer_version.major += 1; + let mut older_version = SCHEMA_VERSION; + older_version.major = older_version.major.saturating_sub(1); + + // Test: Database matches, no quiesce + set_quiesce(&datastore, false, false).await; + let metadata = datastore.database_metadata().await.unwrap(); + let status = + SchemaVersionStatus::interpret(&metadata, ¤t_version); + assert_eq!(status, SchemaVersionStatus::DatabaseMatchesReady); + + // Test: Database matches, quiesce started + set_quiesce(&datastore, true, false).await; + let metadata = datastore.database_metadata().await.unwrap(); + let status = + SchemaVersionStatus::interpret(&metadata, ¤t_version); + assert_eq!(status, SchemaVersionStatus::DatabaseMatchesQuiescing); + + // Test: Database matches, quiesce completed + set_quiesce(&datastore, true, true).await; + let metadata = datastore.database_metadata().await.unwrap(); + let status = + SchemaVersionStatus::interpret(&metadata, ¤t_version); + assert_eq!(status, SchemaVersionStatus::DatabaseIsNewer); + + // Test: Database is newer + let metadata = datastore.database_metadata().await.unwrap(); + let status = SchemaVersionStatus::interpret(&metadata, &older_version); + assert_eq!(status, SchemaVersionStatus::DatabaseIsNewer); + + // We can't easily test older database versions without more setup, + // but the logic is straightforward from the implementation + + db.terminate().await; + logctx.cleanup_successful(); + } + + #[tokio::test] + async fn test_schema_action_policy_combinations() { + // Test the big match in SchemaAction::new for all meaningful combinations + + // DatabaseIsNewer should always result in "Refuse + TooNew" + for policy in [ + ConsumerPolicy::FailOnMismatch, + ConsumerPolicy::UpdateGracefully, + ConsumerPolicy::UpdateForcefully, + ] { + let action = + SchemaAction::new(SchemaVersionStatus::DatabaseIsNewer, policy); + assert_eq!( + action, + SchemaAction::Refuse { reason: SchemaRefuseReason::TooNew } + ); + } + + // DatabaseMatchesReady should always result in "Ready" + for policy in [ + ConsumerPolicy::FailOnMismatch, + ConsumerPolicy::UpdateGracefully, + ConsumerPolicy::UpdateForcefully, + ] { + let action = SchemaAction::new( + SchemaVersionStatus::DatabaseMatchesReady, + policy, + ); + assert_eq!(action, SchemaAction::Ready); + } + + // DatabaseMatchesQuiescing behavior varies by policy + assert_eq!( + SchemaAction::new( + SchemaVersionStatus::DatabaseMatchesQuiescing, + ConsumerPolicy::FailOnMismatch + ), + SchemaAction::Quiesce + ); + assert_eq!( + SchemaAction::new( + SchemaVersionStatus::DatabaseMatchesQuiescing, + ConsumerPolicy::UpdateGracefully + ), + SchemaAction::Quiesce + ); + assert_eq!( + SchemaAction::new( + SchemaVersionStatus::DatabaseMatchesQuiescing, + ConsumerPolicy::UpdateForcefully + ), + SchemaAction::Ready + ); + + // DatabaseIsOlderUnquiesced behavior varies by policy + assert_eq!( + SchemaAction::new( + SchemaVersionStatus::DatabaseIsOlderUnquiesced, + ConsumerPolicy::FailOnMismatch + ), + SchemaAction::Refuse { reason: SchemaRefuseReason::OutOfDate } + ); + assert_eq!( + SchemaAction::new( + SchemaVersionStatus::DatabaseIsOlderUnquiesced, + ConsumerPolicy::UpdateGracefully + ), + SchemaAction::Wait + ); + assert_eq!( + SchemaAction::new( + SchemaVersionStatus::DatabaseIsOlderUnquiesced, + ConsumerPolicy::UpdateForcefully + ), + SchemaAction::Update + ); + + // DatabaseIsOlderQuiesced behavior varies by policy + assert_eq!( + SchemaAction::new( + SchemaVersionStatus::DatabaseIsOlderQuiesced, + ConsumerPolicy::FailOnMismatch + ), + SchemaAction::Refuse { reason: SchemaRefuseReason::OutOfDate } + ); + assert_eq!( + SchemaAction::new( + SchemaVersionStatus::DatabaseIsOlderQuiesced, + ConsumerPolicy::UpdateGracefully + ), + SchemaAction::Update + ); + assert_eq!( + SchemaAction::new( + SchemaVersionStatus::DatabaseIsOlderQuiesced, + ConsumerPolicy::UpdateForcefully + ), + SchemaAction::Update + ); + } + + #[tokio::test] + async fn test_check_schema_version() { + let logctx = dev::test_setup_log("test_check_schema_version"); + let db = TestDatabase::new_with_pool(&logctx.log).await; + let datastore = + DataStore::new_unchecked(logctx.log.clone(), db.pool().clone()); + + // Test the current version and no quiesce + set_quiesce(&datastore, false, false).await; + let status = + datastore.check_schema_version(&SCHEMA_VERSION).await.unwrap(); + assert_eq!(status, SchemaVersionStatus::DatabaseMatchesReady); + + // Test the current version with quiesce started + set_quiesce(&datastore, true, false).await; + let status = + datastore.check_schema_version(&SCHEMA_VERSION).await.unwrap(); + assert_eq!(status, SchemaVersionStatus::DatabaseMatchesQuiescing); + + // Test the current version with quiesce completed + // + // (Once quiesce completes, the "current version" is equivalent + // to an old schema) + set_quiesce(&datastore, true, true).await; + let status = + datastore.check_schema_version(&SCHEMA_VERSION).await.unwrap(); + assert_eq!(status, SchemaVersionStatus::DatabaseIsNewer); + + // Test an older version without quiesce + let mut old_version = SCHEMA_VERSION; + old_version.major -= 1; + set_quiesce(&datastore, false, false).await; + let status = + datastore.check_schema_version(&old_version).await.unwrap(); + assert_eq!(status, SchemaVersionStatus::DatabaseIsNewer); + + // Test a newer schema with quiesce unfinished + let mut new_version = SCHEMA_VERSION; + new_version.major += 1; + set_quiesce(&datastore, false, false).await; + let status = + datastore.check_schema_version(&new_version).await.unwrap(); + assert_eq!(status, SchemaVersionStatus::DatabaseIsOlderUnquiesced); + set_quiesce(&datastore, true, false).await; + let status = + datastore.check_schema_version(&new_version).await.unwrap(); + assert_eq!(status, SchemaVersionStatus::DatabaseIsOlderUnquiesced); + + // Test a newer schema with quiesce finished + set_quiesce(&datastore, true, true).await; + let status = + datastore.check_schema_version(&new_version).await.unwrap(); + assert_eq!(status, SchemaVersionStatus::DatabaseIsOlderQuiesced); + + db.terminate().await; + logctx.cleanup_successful(); + } } diff --git a/nexus/db-queries/src/db/datastore/mod.rs b/nexus/db-queries/src/db/datastore/mod.rs index 2a852f173c7..5456f50995a 100644 --- a/nexus/db-queries/src/db/datastore/mod.rs +++ b/nexus/db-queries/src/db/datastore/mod.rs @@ -120,7 +120,9 @@ pub mod webhook_delivery; mod zpool; pub use address_lot::AddressLotCreateResult; -pub use db_metadata::UpdateConfiguration; +pub use db_metadata::{ + ConsumerPolicy, SchemaAction, SchemaRefuseReason, SchemaVersionStatus, +}; pub use dns::DataStoreDnsTest; pub use dns::DnsVersionUpdateBuilder; pub use ereport::EreportFilters; @@ -249,60 +251,84 @@ impl DataStore { } } - let config = if let Some(all_versions) = config { - crate::db::datastore::UpdateConfiguration::Enabled { - all_versions, - ignore_quiesce: false, - } + // Observe the schema + let schema_status = datastore.check_schema_version(&EXPECTED_VERSION).await + .map_err(|err| { + warn!(log, "Failed to check schema version"; &err); + BackoffError::transient(err.to_string()) + })?; + + let (policy, all_versions) = if let Some(all_versions) = config { + (ConsumerPolicy::UpdateGracefully, Some(all_versions)) } else { - crate::db::datastore::UpdateConfiguration::Disabled + (ConsumerPolicy::FailOnMismatch, None) }; - match datastore - .ensure_schema(&log, EXPECTED_VERSION, config) - .await - { - Ok(()) => return Ok(()), - Err(db_metadata::EnsureSchemaError::NexusTooOld { handoff }) => { - if handoff { - // We are running an out-of-date schema, but still - // need to participate in handoff. - // - // TODO(https://github.com/oxidecomputer/omicron/issues/8501): - // This should be implemented by contacting other - // Nexus instances, verifying that quiescing has - // completed, and setting "quiesce_complete" once - // that has completed. - // - // NOTE: We shouldn't hit this condition until any - // deployed Nexuses start setting the "quiesce" - // booleans of omicron.public.db_metadata to true. - error!(log, "Schema handoff from old -> new Nexus is not yet implemented"); - return Err(BackoffError::permanent(format!( - "Nexus @ schema {EXPECTED_VERSION} needs to \ - handoff, but not it is not implemented" - ))); - } else { - // We are running an out-of-date schema, and no - // longer need to participate in handoff (e.g., - // maybe we participated in handoff, rebooted, and - // the schema has progressed since then). - error!( - log, - "Our database schema appears too old"; - "version" => ?EXPECTED_VERSION - ); - return Err(BackoffError::permanent(format!( - "Schema {EXPECTED_VERSION:?} is too old" - ))); - } + // Determine what action to take based on policy + let action = SchemaAction::new(schema_status.clone(), policy); + match action { + SchemaAction::Ready => { + info!(log, "Database schema version is up to date"); + return Ok(()); } - Err(e) => { - let err = slog_error_chain::InlineErrorChain::new(&e); - warn!(log, "Failed to ensure schema version"; &err); - return Err(BackoffError::transient(err.to_string())); + SchemaAction::Quiesce => { + // We are running an out-of-date schema, but still + // need to participate in handoff. + // + // TODO(https://github.com/oxidecomputer/omicron/issues/8501): + // This should be implemented by contacting other + // Nexus instances, verifying that quiescing has + // completed, and setting "quiesce_complete" once + // that has completed. + // + // NOTE: We shouldn't hit this condition until any + // deployed Nexuses start setting the "quiesce" + // booleans of omicron.public.db_metadata to true. + error!(log, "Schema handoff from old -> new Nexus is not yet implemented"); + return Err(BackoffError::permanent(format!( + "Nexus @ schema {EXPECTED_VERSION} needs to \ + handoff, but not it is not implemented" + ))); } - }; + SchemaAction::Wait => { + warn!( + log, + "Schema version is older than desired; waiting for quiesce to complete" + ); + return Err(BackoffError::transient( + "Waiting for quiesce to complete".to_string() + )); + } + SchemaAction::Update => { + return datastore + .ensure_schema(&log, EXPECTED_VERSION, policy, all_versions) + .await + .map_err(|err| { + let err = slog_error_chain::InlineErrorChain::new(&err); + BackoffError::transient( + format!("Failed to ensure schema: {err}") + ) + }); + } + SchemaAction::Refuse { reason: SchemaRefuseReason::OutOfDate } => { + warn!( + log, + "Schema version is older than desired; waiting for operator to update schema" + ); + return Err(BackoffError::transient( + "Waiting for operator-controlled schema update".to_string() + )); + } + SchemaAction::Refuse { reason: SchemaRefuseReason::TooNew } => { + warn!( + log, + "Refusing to access too-new database"; + ); + return Err(BackoffError::permanent( + "Refusing to access too-new database".to_string() + )); + } + } }, |_, _| {}, ) diff --git a/nexus/src/bin/schema-updater.rs b/nexus/src/bin/schema-updater.rs index b6fff84d1d0..f1a9674d6cc 100644 --- a/nexus/src/bin/schema-updater.rs +++ b/nexus/src/bin/schema-updater.rs @@ -109,13 +109,13 @@ async fn main_impl() -> anyhow::Result<()> { Cmd::Upgrade { version } => { println!("Upgrading to {version}"); - let config = db::datastore::UpdateConfiguration::Enabled { - all_versions: &all_versions, - ignore_quiesce: true, - }; - datastore - .ensure_schema(&log, version.clone(), config) + .ensure_schema( + &log, + version.clone(), + db::datastore::ConsumerPolicy::UpdateForcefully, + Some(&all_versions), + ) .await .map_err(|e| anyhow!(e))?; println!("Upgrade to {version} complete");