Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 67 additions & 26 deletions nexus/db-queries/src/db/datastore/db_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -443,32 +443,35 @@ impl DataStore {
})
}

// Ensures that the database schema matches "desired_version".
//
// - Updating the schema makes the database incompatible with older
// versions of Nexus, which are not running "desired_version".
// - This is a one-way operation that cannot be undone.
// - The caller is responsible for ensuring that the new version is valid,
// and that all running Nexus instances can understand the new schema
// version.
//
// TODO: This function assumes that all concurrently executing Nexus
// instances on the rack are operating on the same version of software.
// If that assumption is broken, nothing would stop a "new deployment"
// from making a change that invalidates the queries used by an "old
// deployment".
pub async fn ensure_schema(
/// Ensures that the database schema matches `desired_version`.
///
/// - `validated_action`: A [ValidatedDatastoreSetupAction], indicating that
/// [Self::check_schema_and_access] has already been called.
/// - `all_versions`: A description of all schema versions between
/// "whatever is in the DB" and `desired_version`, instructing
/// how to perform an update.
pub async fn update_schema(
&self,
log: &Logger,
desired_version: Version,
validated_action: ValidatedDatastoreSetupAction,
all_versions: Option<&AllSchemaVersions>,
) -> Result<(), anyhow::Error> {
let action = validated_action.action();

match action {
DatastoreSetupAction::Ready => {
bail!("No schema update is necessary")
}
DatastoreSetupAction::Update => (),
_ => bail!("Not ready for schema update"),
}

let desired_version = validated_action.desired_version().clone();
let (found_version, found_target_version) = self
.database_schema_version()
.await
.context("Cannot read database schema version")?;

let log = log.new(o!(
let log = self.log.new(o!(
"found_version" => found_version.to_string(),
"desired_version" => desired_version.to_string(),
));
Expand Down Expand Up @@ -1166,15 +1169,34 @@ mod test {
// Confirms that calling the internal "ensure_schema" function can succeed
// when the database is already at that version.
#[tokio::test]
async fn ensure_schema_is_current_version() {
let logctx = dev::test_setup_log("ensure_schema_is_current_version");
async fn check_schema_is_current_version() {
let logctx = dev::test_setup_log("check_schema_is_current_version");
let db = TestDatabase::new_with_raw_datastore(&logctx.log).await;
let datastore = db.datastore();

datastore
.ensure_schema(&logctx.log, SCHEMA_VERSION, None)
let checked_action = datastore
.check_schema_and_access(
IdentityCheckPolicy::DontCare,
SCHEMA_VERSION,
)
.await
.expect("Failed to ensure schema");
.expect("Failed to check schema and access");

assert!(
matches!(checked_action.action(), DatastoreSetupAction::Ready),
"Unexpected action: {:?}",
checked_action.action(),
);
assert_eq!(
checked_action.desired_version(),
&SCHEMA_VERSION,
"Unexpected desired version: {}",
checked_action.desired_version()
);

datastore.update_schema(checked_action, None).await.expect_err(
"Should not be able to update schema that's already up-to-date",
);

db.terminate().await;
logctx.cleanup_successful();
Expand Down Expand Up @@ -1277,8 +1299,13 @@ mod test {
let log = log.clone();
let pool = pool.clone();
tokio::task::spawn(async move {
let datastore =
DataStore::new(&log, pool, Some(&all_versions)).await?;
let datastore = DataStore::new(
&log,
pool,
Some(&all_versions),
IdentityCheckPolicy::DontCare,
)
.await?;

// This is the crux of this test: confirm that, as each
// migration completes, it's not possible to see any artifacts
Expand Down Expand Up @@ -1405,9 +1432,23 @@ mod test {

// Manually construct the datastore to avoid the backoff timeout.
// We want to trigger errors, but have no need to wait.

let datastore = DataStore::new_unchecked(log.clone(), pool.clone());
let checked_action = datastore
.check_schema_and_access(
IdentityCheckPolicy::DontCare,
SCHEMA_VERSION,
)
.await
.expect("Failed to check schema and access");

// This needs to be in a loop because we constructed a schema change
// that will intentionally fail sometimes when doing this work.
//
// This isn't a normal behavior! But we're trying to test the
// intermediate steps of a schema change here.
while let Err(e) = datastore
.ensure_schema(&log, SCHEMA_VERSION, Some(&all_versions))
.update_schema(checked_action.clone(), Some(&all_versions))
.await
{
warn!(log, "Failed to ensure schema"; "err" => %e);
Expand Down
101 changes: 89 additions & 12 deletions nexus/db-queries/src/db/datastore/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ use omicron_common::backoff::{
};
use omicron_uuid_kinds::{GenericUuid, OmicronZoneUuid, SledUuid};
use slog::Logger;
use slog_error_chain::InlineErrorChain;
use std::net::Ipv6Addr;
use std::num::NonZeroU32;
use std::sync::Arc;
Expand Down Expand Up @@ -121,6 +122,8 @@ pub mod webhook_delivery;
mod zpool;

pub use address_lot::AddressLotCreateResult;
pub use db_metadata::DatastoreSetupAction;
pub use db_metadata::ValidatedDatastoreSetupAction;
pub use dns::DataStoreDnsTest;
pub use dns::DnsVersionUpdateBuilder;
pub use ereport::EreportFilters;
Expand Down Expand Up @@ -239,16 +242,19 @@ impl DataStore {
log: &Logger,
pool: Arc<Pool>,
config: Option<&AllSchemaVersions>,
identity_check: IdentityCheckPolicy,
) -> Result<Self, String> {
Self::new_with_timeout(log, pool, config, None).await
Self::new_with_timeout(log, pool, config, None, identity_check).await
}

pub async fn new_with_timeout(
log: &Logger,
pool: Arc<Pool>,
config: Option<&AllSchemaVersions>,
try_for: Option<std::time::Duration>,
identity_check: IdentityCheckPolicy,
) -> Result<Self, String> {
use db_metadata::DatastoreSetupAction;
use nexus_db_model::SCHEMA_VERSION as EXPECTED_VERSION;

let datastore =
Expand All @@ -262,25 +268,96 @@ impl DataStore {
|| async {
if let Some(try_for) = try_for {
if std::time::Instant::now() > start + try_for {
return Err(BackoffError::permanent(()));
return Err(BackoffError::permanent(
"Timeout waiting for DataStore::new_with_timeout",
));
}
}

match datastore
.ensure_schema(&log, EXPECTED_VERSION, config)
.await
{
Ok(()) => return Ok(()),
Err(e) => {
warn!(log, "Failed to ensure schema version"; "error" => #%e);
loop {
let checked_action = datastore
.check_schema_and_access(
identity_check,
EXPECTED_VERSION,
)
.await
.map_err(|err| {
warn!(
log,
"Cannot check schema version / Nexus access";
InlineErrorChain::new(err.as_ref()),
);
BackoffError::transient(
"Cannot check schema version / Nexus access",
)
})?;

match checked_action.action() {
DatastoreSetupAction::Ready => {
info!(log, "Datastore is ready for usage");
return Ok(());
}
DatastoreSetupAction::NeedsHandoff { nexus_id } => {
info!(log, "Datastore is awaiting handoff");

datastore
.attempt_handoff(*nexus_id)
.await
.map_err(|err| {
warn!(
log,
"Could not handoff to new nexus";
err
);
BackoffError::transient(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are all the error cases here transient? Looking at 8932, maybe NexusInWrongState is a permanent error?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think even in this case, I do want us to retry.

Right now, it's actually not possible for us to return from the NexusInWrongState case:

  • It's returned from a transaction that checks "all nexus state records are either quiesced or not_yet"
  • ... and it's returned when our record specifically != not_yet, therefore it must be quiesced.
  • ... but if our record is quiesced, the previous call to check_schema_and_access would have returned Refuse, rather than NeedsHandoff
  • ... further, the transition from active to quiesced shouldn't be racy (without operator intervention) because each Nexus should be responsible for performing this transition for itself.

So, TL;DR:

  • This check is currently defensive, and not possible without someone poking db records manually
  • When we retry, we retry calling check_schema_and_access again, before re-trying attempt_handoff
  • If we do that, while also being quiesced, we'll converge to "locked out of the db"
  • ... BUT ALSO in the future, or in the face of weird concurrent events, IMO the decision to "restart all checks" doesn't seem like a bad choice.

I think that seeing a weird state in one of these branches, and choosing to "re-evaluate the world again from scratch" seems like a reasonable choice. check_schema_and_access should be able to determine a reasonable next step, and it only throws errors if we cannot access the database (which I consider to be transient).

"Could not handoff to new nexus",
)
})?;

// If the handoff was successful, immediately
// re-evaluate the schema and access policies to see
// if we should update or not.
continue;
}
DatastoreSetupAction::TryLater => {
error!(log, "Waiting for metadata; trying later");
return Err(BackoffError::permanent(
"Waiting for metadata; trying later",
));
}
DatastoreSetupAction::Update => {
info!(
log,
"Datastore should be updated before usage"
);
datastore
.update_schema(checked_action, config)
.await
.map_err(|err| {
warn!(
log,
"Failed to update schema version";
InlineErrorChain::new(err.as_ref())
);
BackoffError::transient(
"Failed to update schema version",
)
})?;
return Ok(());
}
DatastoreSetupAction::Refuse => {
error!(log, "Datastore should not be used");
return Err(BackoffError::permanent(
"Datastore should not be used",
));
}
}
};
return Err(BackoffError::transient(()));
}
},
|_, _| {},
)
.await
.map_err(|_| "Failed to read valid DB schema".to_string())?;
.map_err(|err| err.to_string())?;

Ok(datastore)
}
Expand Down
16 changes: 14 additions & 2 deletions nexus/db-queries/src/db/pub_test_utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use crate::authz;
use crate::context::OpContext;
use crate::db;
use crate::db::DataStore;
use crate::db::datastore::IdentityCheckPolicy;
use omicron_test_utils::dev::db::CockroachInstance;
use slog::Logger;
use std::sync::Arc;
Expand Down Expand Up @@ -114,7 +115,14 @@ impl TestDatabaseBuilder {
Interface::Datastore => {
let pool = new_pool(log, &db);
let datastore = Arc::new(
DataStore::new(&log, pool, None).await.unwrap(),
DataStore::new(
&log,
pool,
None,
IdentityCheckPolicy::DontCare,
)
.await
.unwrap(),
);
TestDatabase {
db,
Expand Down Expand Up @@ -300,7 +308,11 @@ async fn datastore_test(

let cfg = db::Config { url: db.pg_config().clone() };
let pool = Arc::new(db::Pool::new_single_host(&log, &cfg));
let datastore = Arc::new(DataStore::new(&log, pool, None).await.unwrap());
let datastore = Arc::new(
DataStore::new(&log, pool, None, IdentityCheckPolicy::DontCare)
.await
.unwrap(),
);

// Create an OpContext with the credentials of "db-init" just for the
// purpose of loading the built-in users, roles, and assignments.
Expand Down
3 changes: 3 additions & 0 deletions nexus/src/app/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use nexus_db_queries::authn;
use nexus_db_queries::authz;
use nexus_db_queries::context::OpContext;
use nexus_db_queries::db;
use nexus_db_queries::db::datastore::IdentityCheckPolicy;
use nexus_mgs_updates::ArtifactCache;
use nexus_mgs_updates::MgsUpdateDriver;
use nexus_types::deployment::PendingMgsUpdates;
Expand Down Expand Up @@ -306,12 +307,14 @@ impl Nexus {
.map(|s| AllSchemaVersions::load(&s.schema_dir))
.transpose()
.map_err(|error| format!("{error:#}"))?;
let nexus_id = config.deployment.id;
let db_datastore = Arc::new(
db::DataStore::new_with_timeout(
&log,
Arc::clone(&pool),
all_versions.as_ref(),
config.pkg.tunables.load_timeout,
IdentityCheckPolicy::CheckAndTakeover { nexus_id },
)
.await?,
);
Expand Down
41 changes: 36 additions & 5 deletions nexus/src/bin/schema-updater.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ use nexus_db_model::AllSchemaVersions;
use nexus_db_model::SCHEMA_VERSION;
use nexus_db_queries::db;
use nexus_db_queries::db::DataStore;
use nexus_db_queries::db::datastore::DatastoreSetupAction;
use nexus_db_queries::db::datastore::IdentityCheckPolicy;
use semver::Version;
use slog::Drain;
use slog::Level;
Expand Down Expand Up @@ -108,11 +110,40 @@ async fn main_impl() -> anyhow::Result<()> {
}
Cmd::Upgrade { version } => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not necessarily on this PR - should we do more to discourage the use of this tool? If something goes wrong with handoffs we'll needed it, but in general we shouldn't run this anymore (once the whole stack of work lands), right?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

totally agreed - I think the main usage of this is from https://github.com/oxidecomputer/meta/blob/master/engineering/dogfood/overview.adoc , which we can and should patch once we're ready to go to the full "online update" world for Nexus.

println!("Upgrading to {version}");
datastore
.ensure_schema(&log, version.clone(), Some(&all_versions))
.await
.map_err(|e| anyhow!(e))?;
println!("Upgrade to {version} complete");
let checked_action = datastore
.check_schema_and_access(
IdentityCheckPolicy::DontCare,
version.clone(),
)
.await?;

match checked_action.action() {
DatastoreSetupAction::Ready => {
println!("Already at version {version}")
}
DatastoreSetupAction::Update => {
datastore
.update_schema(checked_action, Some(&all_versions))
.await
.map_err(|e| anyhow!(e))?;
println!("Update to {version} complete");
}
DatastoreSetupAction::Refuse => {
println!("Refusing to update to version {version}")
}
DatastoreSetupAction::TryLater
| DatastoreSetupAction::NeedsHandoff { .. } => {
// This case should not happen - we supplied
// IdentityCheckPolicy::DontCare, so we should not be told
// to attempt a takeover by a specific Nexus.
println!(
"Refusing to update to version {version}. \
The schema updater tried to ignore the identity check, \
but got a response indicating handoff is needed. \
This is unexpected, and probably a bug"
)
}
}
}
}
datastore.terminate().await;
Expand Down
Loading
Loading