diff --git a/nexus/db-queries/src/db/datastore/db_metadata.rs b/nexus/db-queries/src/db/datastore/db_metadata.rs index 4d679d2dfef..6dd529cd40a 100644 --- a/nexus/db-queries/src/db/datastore/db_metadata.rs +++ b/nexus/db-queries/src/db/datastore/db_metadata.rs @@ -443,32 +443,35 @@ impl DataStore { }) } - // Ensures that the database schema matches "desired_version". - // - // - Updating the schema makes the database incompatible with older - // versions of Nexus, which are not running "desired_version". - // - This is a one-way operation that cannot be undone. - // - The caller is responsible for ensuring that the new version is valid, - // and that all running Nexus instances can understand the new schema - // version. - // - // TODO: This function assumes that all concurrently executing Nexus - // instances on the rack are operating on the same version of software. - // If that assumption is broken, nothing would stop a "new deployment" - // from making a change that invalidates the queries used by an "old - // deployment". - pub async fn ensure_schema( + /// Ensures that the database schema matches `desired_version`. + /// + /// - `validated_action`: A [ValidatedDatastoreSetupAction], indicating that + /// [Self::check_schema_and_access] has already been called. + /// - `all_versions`: A description of all schema versions between + /// "whatever is in the DB" and `desired_version`, instructing + /// how to perform an update. + pub async fn update_schema( &self, - log: &Logger, - desired_version: Version, + validated_action: ValidatedDatastoreSetupAction, all_versions: Option<&AllSchemaVersions>, ) -> Result<(), anyhow::Error> { + let action = validated_action.action(); + + match action { + DatastoreSetupAction::Ready => { + bail!("No schema update is necessary") + } + DatastoreSetupAction::Update => (), + _ => bail!("Not ready for schema update"), + } + + let desired_version = validated_action.desired_version().clone(); let (found_version, found_target_version) = self .database_schema_version() .await .context("Cannot read database schema version")?; - let log = log.new(o!( + let log = self.log.new(o!( "found_version" => found_version.to_string(), "desired_version" => desired_version.to_string(), )); @@ -1166,15 +1169,34 @@ mod test { // Confirms that calling the internal "ensure_schema" function can succeed // when the database is already at that version. #[tokio::test] - async fn ensure_schema_is_current_version() { - let logctx = dev::test_setup_log("ensure_schema_is_current_version"); + async fn check_schema_is_current_version() { + let logctx = dev::test_setup_log("check_schema_is_current_version"); let db = TestDatabase::new_with_raw_datastore(&logctx.log).await; let datastore = db.datastore(); - datastore - .ensure_schema(&logctx.log, SCHEMA_VERSION, None) + let checked_action = datastore + .check_schema_and_access( + IdentityCheckPolicy::DontCare, + SCHEMA_VERSION, + ) .await - .expect("Failed to ensure schema"); + .expect("Failed to check schema and access"); + + assert!( + matches!(checked_action.action(), DatastoreSetupAction::Ready), + "Unexpected action: {:?}", + checked_action.action(), + ); + assert_eq!( + checked_action.desired_version(), + &SCHEMA_VERSION, + "Unexpected desired version: {}", + checked_action.desired_version() + ); + + datastore.update_schema(checked_action, None).await.expect_err( + "Should not be able to update schema that's already up-to-date", + ); db.terminate().await; logctx.cleanup_successful(); @@ -1277,8 +1299,13 @@ mod test { let log = log.clone(); let pool = pool.clone(); tokio::task::spawn(async move { - let datastore = - DataStore::new(&log, pool, Some(&all_versions)).await?; + let datastore = DataStore::new( + &log, + pool, + Some(&all_versions), + IdentityCheckPolicy::DontCare, + ) + .await?; // This is the crux of this test: confirm that, as each // migration completes, it's not possible to see any artifacts @@ -1405,9 +1432,23 @@ mod test { // Manually construct the datastore to avoid the backoff timeout. // We want to trigger errors, but have no need to wait. + let datastore = DataStore::new_unchecked(log.clone(), pool.clone()); + let checked_action = datastore + .check_schema_and_access( + IdentityCheckPolicy::DontCare, + SCHEMA_VERSION, + ) + .await + .expect("Failed to check schema and access"); + + // This needs to be in a loop because we constructed a schema change + // that will intentionally fail sometimes when doing this work. + // + // This isn't a normal behavior! But we're trying to test the + // intermediate steps of a schema change here. while let Err(e) = datastore - .ensure_schema(&log, SCHEMA_VERSION, Some(&all_versions)) + .update_schema(checked_action.clone(), Some(&all_versions)) .await { warn!(log, "Failed to ensure schema"; "err" => %e); diff --git a/nexus/db-queries/src/db/datastore/mod.rs b/nexus/db-queries/src/db/datastore/mod.rs index 23642d93340..755df6e9860 100644 --- a/nexus/db-queries/src/db/datastore/mod.rs +++ b/nexus/db-queries/src/db/datastore/mod.rs @@ -41,6 +41,7 @@ use omicron_common::backoff::{ }; use omicron_uuid_kinds::{GenericUuid, OmicronZoneUuid, SledUuid}; use slog::Logger; +use slog_error_chain::InlineErrorChain; use std::net::Ipv6Addr; use std::num::NonZeroU32; use std::sync::Arc; @@ -121,6 +122,8 @@ pub mod webhook_delivery; mod zpool; pub use address_lot::AddressLotCreateResult; +pub use db_metadata::DatastoreSetupAction; +pub use db_metadata::ValidatedDatastoreSetupAction; pub use dns::DataStoreDnsTest; pub use dns::DnsVersionUpdateBuilder; pub use ereport::EreportFilters; @@ -239,8 +242,9 @@ impl DataStore { log: &Logger, pool: Arc, config: Option<&AllSchemaVersions>, + identity_check: IdentityCheckPolicy, ) -> Result { - Self::new_with_timeout(log, pool, config, None).await + Self::new_with_timeout(log, pool, config, None, identity_check).await } pub async fn new_with_timeout( @@ -248,7 +252,9 @@ impl DataStore { pool: Arc, config: Option<&AllSchemaVersions>, try_for: Option, + identity_check: IdentityCheckPolicy, ) -> Result { + use db_metadata::DatastoreSetupAction; use nexus_db_model::SCHEMA_VERSION as EXPECTED_VERSION; let datastore = @@ -262,25 +268,96 @@ impl DataStore { || async { if let Some(try_for) = try_for { if std::time::Instant::now() > start + try_for { - return Err(BackoffError::permanent(())); + return Err(BackoffError::permanent( + "Timeout waiting for DataStore::new_with_timeout", + )); } } - match datastore - .ensure_schema(&log, EXPECTED_VERSION, config) - .await - { - Ok(()) => return Ok(()), - Err(e) => { - warn!(log, "Failed to ensure schema version"; "error" => #%e); + loop { + let checked_action = datastore + .check_schema_and_access( + identity_check, + EXPECTED_VERSION, + ) + .await + .map_err(|err| { + warn!( + log, + "Cannot check schema version / Nexus access"; + InlineErrorChain::new(err.as_ref()), + ); + BackoffError::transient( + "Cannot check schema version / Nexus access", + ) + })?; + + match checked_action.action() { + DatastoreSetupAction::Ready => { + info!(log, "Datastore is ready for usage"); + return Ok(()); + } + DatastoreSetupAction::NeedsHandoff { nexus_id } => { + info!(log, "Datastore is awaiting handoff"); + + datastore + .attempt_handoff(*nexus_id) + .await + .map_err(|err| { + warn!( + log, + "Could not handoff to new nexus"; + err + ); + BackoffError::transient( + "Could not handoff to new nexus", + ) + })?; + + // If the handoff was successful, immediately + // re-evaluate the schema and access policies to see + // if we should update or not. + continue; + } + DatastoreSetupAction::TryLater => { + error!(log, "Waiting for metadata; trying later"); + return Err(BackoffError::permanent( + "Waiting for metadata; trying later", + )); + } + DatastoreSetupAction::Update => { + info!( + log, + "Datastore should be updated before usage" + ); + datastore + .update_schema(checked_action, config) + .await + .map_err(|err| { + warn!( + log, + "Failed to update schema version"; + InlineErrorChain::new(err.as_ref()) + ); + BackoffError::transient( + "Failed to update schema version", + ) + })?; + return Ok(()); + } + DatastoreSetupAction::Refuse => { + error!(log, "Datastore should not be used"); + return Err(BackoffError::permanent( + "Datastore should not be used", + )); + } } - }; - return Err(BackoffError::transient(())); + } }, |_, _| {}, ) .await - .map_err(|_| "Failed to read valid DB schema".to_string())?; + .map_err(|err| err.to_string())?; Ok(datastore) } diff --git a/nexus/db-queries/src/db/pub_test_utils/mod.rs b/nexus/db-queries/src/db/pub_test_utils/mod.rs index db2acabd06a..420f96ee6ab 100644 --- a/nexus/db-queries/src/db/pub_test_utils/mod.rs +++ b/nexus/db-queries/src/db/pub_test_utils/mod.rs @@ -12,6 +12,7 @@ use crate::authz; use crate::context::OpContext; use crate::db; use crate::db::DataStore; +use crate::db::datastore::IdentityCheckPolicy; use omicron_test_utils::dev::db::CockroachInstance; use slog::Logger; use std::sync::Arc; @@ -114,7 +115,14 @@ impl TestDatabaseBuilder { Interface::Datastore => { let pool = new_pool(log, &db); let datastore = Arc::new( - DataStore::new(&log, pool, None).await.unwrap(), + DataStore::new( + &log, + pool, + None, + IdentityCheckPolicy::DontCare, + ) + .await + .unwrap(), ); TestDatabase { db, @@ -300,7 +308,11 @@ async fn datastore_test( let cfg = db::Config { url: db.pg_config().clone() }; let pool = Arc::new(db::Pool::new_single_host(&log, &cfg)); - let datastore = Arc::new(DataStore::new(&log, pool, None).await.unwrap()); + let datastore = Arc::new( + DataStore::new(&log, pool, None, IdentityCheckPolicy::DontCare) + .await + .unwrap(), + ); // Create an OpContext with the credentials of "db-init" just for the // purpose of loading the built-in users, roles, and assignments. diff --git a/nexus/src/app/mod.rs b/nexus/src/app/mod.rs index c9c9e151614..300869bd866 100644 --- a/nexus/src/app/mod.rs +++ b/nexus/src/app/mod.rs @@ -24,6 +24,7 @@ use nexus_db_queries::authn; use nexus_db_queries::authz; use nexus_db_queries::context::OpContext; use nexus_db_queries::db; +use nexus_db_queries::db::datastore::IdentityCheckPolicy; use nexus_mgs_updates::ArtifactCache; use nexus_mgs_updates::MgsUpdateDriver; use nexus_types::deployment::PendingMgsUpdates; @@ -306,12 +307,14 @@ impl Nexus { .map(|s| AllSchemaVersions::load(&s.schema_dir)) .transpose() .map_err(|error| format!("{error:#}"))?; + let nexus_id = config.deployment.id; let db_datastore = Arc::new( db::DataStore::new_with_timeout( &log, Arc::clone(&pool), all_versions.as_ref(), config.pkg.tunables.load_timeout, + IdentityCheckPolicy::CheckAndTakeover { nexus_id }, ) .await?, ); diff --git a/nexus/src/bin/schema-updater.rs b/nexus/src/bin/schema-updater.rs index ead8554f734..e6d10ef18c1 100644 --- a/nexus/src/bin/schema-updater.rs +++ b/nexus/src/bin/schema-updater.rs @@ -14,6 +14,8 @@ use nexus_db_model::AllSchemaVersions; use nexus_db_model::SCHEMA_VERSION; use nexus_db_queries::db; use nexus_db_queries::db::DataStore; +use nexus_db_queries::db::datastore::DatastoreSetupAction; +use nexus_db_queries::db::datastore::IdentityCheckPolicy; use semver::Version; use slog::Drain; use slog::Level; @@ -108,11 +110,40 @@ async fn main_impl() -> anyhow::Result<()> { } Cmd::Upgrade { version } => { println!("Upgrading to {version}"); - datastore - .ensure_schema(&log, version.clone(), Some(&all_versions)) - .await - .map_err(|e| anyhow!(e))?; - println!("Upgrade to {version} complete"); + let checked_action = datastore + .check_schema_and_access( + IdentityCheckPolicy::DontCare, + version.clone(), + ) + .await?; + + match checked_action.action() { + DatastoreSetupAction::Ready => { + println!("Already at version {version}") + } + DatastoreSetupAction::Update => { + datastore + .update_schema(checked_action, Some(&all_versions)) + .await + .map_err(|e| anyhow!(e))?; + println!("Update to {version} complete"); + } + DatastoreSetupAction::Refuse => { + println!("Refusing to update to version {version}") + } + DatastoreSetupAction::TryLater + | DatastoreSetupAction::NeedsHandoff { .. } => { + // This case should not happen - we supplied + // IdentityCheckPolicy::DontCare, so we should not be told + // to attempt a takeover by a specific Nexus. + println!( + "Refusing to update to version {version}. \ + The schema updater tried to ignore the identity check, \ + but got a response indicating handoff is needed. \ + This is unexpected, and probably a bug" + ) + } + } } } datastore.terminate().await; diff --git a/nexus/src/populate.rs b/nexus/src/populate.rs index b77245d5df4..93632d77c10 100644 --- a/nexus/src/populate.rs +++ b/nexus/src/populate.rs @@ -339,6 +339,7 @@ mod test { use nexus_db_queries::authz; use nexus_db_queries::context::OpContext; use nexus_db_queries::db; + use nexus_db_queries::db::datastore::IdentityCheckPolicy; use nexus_db_queries::db::pub_test_utils::TestDatabase; use omicron_common::api::external::Error; use omicron_test_utils::dev; @@ -364,7 +365,14 @@ mod test { let cfg = db::Config { url: db.crdb().pg_config().clone() }; let pool = Arc::new(db::Pool::new_single_host(&logctx.log, &cfg)); let datastore = Arc::new( - db::DataStore::new(&logctx.log, pool, None).await.unwrap(), + db::DataStore::new( + &logctx.log, + pool, + None, + IdentityCheckPolicy::DontCare, + ) + .await + .unwrap(), ); let opctx = OpContext::for_background( logctx.log.clone(), @@ -415,7 +423,14 @@ mod test { // We need to create the datastore before tearing down the database, as // it verifies the schema version of the DB while booting. let datastore = Arc::new( - db::DataStore::new(&logctx.log, pool, None).await.unwrap(), + db::DataStore::new( + &logctx.log, + pool, + None, + IdentityCheckPolicy::DontCare, + ) + .await + .unwrap(), ); let opctx = OpContext::for_background( logctx.log.clone(), diff --git a/nexus/tests/integration_tests/initialization.rs b/nexus/tests/integration_tests/initialization.rs index fa4511d6de1..32c030534f2 100644 --- a/nexus/tests/integration_tests/initialization.rs +++ b/nexus/tests/integration_tests/initialization.rs @@ -261,7 +261,7 @@ async fn test_nexus_does_not_boot_without_valid_schema() { .expect_err("Nexus should have failed to start"); assert!( - err.contains("Failed to read valid DB schema"), + err.contains("Datastore should not be used"), "Saw error: {err}" );