Skip to content

Commit 79a12dd

Browse files
committed
(5/N) Read database access records on boot
1 parent c329ca8 commit 79a12dd

File tree

7 files changed

+226
-48
lines changed

7 files changed

+226
-48
lines changed

nexus/db-queries/src/db/datastore/db_metadata.rs

Lines changed: 67 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -443,32 +443,35 @@ impl DataStore {
443443
})
444444
}
445445

446-
// Ensures that the database schema matches "desired_version".
447-
//
448-
// - Updating the schema makes the database incompatible with older
449-
// versions of Nexus, which are not running "desired_version".
450-
// - This is a one-way operation that cannot be undone.
451-
// - The caller is responsible for ensuring that the new version is valid,
452-
// and that all running Nexus instances can understand the new schema
453-
// version.
454-
//
455-
// TODO: This function assumes that all concurrently executing Nexus
456-
// instances on the rack are operating on the same version of software.
457-
// If that assumption is broken, nothing would stop a "new deployment"
458-
// from making a change that invalidates the queries used by an "old
459-
// deployment".
460-
pub async fn ensure_schema(
446+
/// Ensures that the database schema matches `desired_version`.
447+
///
448+
/// - `validated_action`: A [ValidatedDatastoreSetupAction], indicating that
449+
/// [Self::check_schema_and_access] has already been called.
450+
/// - `all_versions`: A description of all schema versions between
451+
/// "whatever is in the DB" and `desired_version`, instructing
452+
/// how to perform an update.
453+
pub async fn update_schema(
461454
&self,
462-
log: &Logger,
463-
desired_version: Version,
455+
validated_action: ValidatedDatastoreSetupAction,
464456
all_versions: Option<&AllSchemaVersions>,
465457
) -> Result<(), anyhow::Error> {
458+
let action = validated_action.action();
459+
460+
match action {
461+
DatastoreSetupAction::Ready => {
462+
bail!("No schema update is necessary")
463+
}
464+
DatastoreSetupAction::Update => (),
465+
_ => bail!("Not ready for schema update"),
466+
}
467+
468+
let desired_version = validated_action.desired_version().clone();
466469
let (found_version, found_target_version) = self
467470
.database_schema_version()
468471
.await
469472
.context("Cannot read database schema version")?;
470473

471-
let log = log.new(o!(
474+
let log = self.log.new(o!(
472475
"found_version" => found_version.to_string(),
473476
"desired_version" => desired_version.to_string(),
474477
));
@@ -1166,15 +1169,34 @@ mod test {
11661169
// Confirms that calling the internal "ensure_schema" function can succeed
11671170
// when the database is already at that version.
11681171
#[tokio::test]
1169-
async fn ensure_schema_is_current_version() {
1170-
let logctx = dev::test_setup_log("ensure_schema_is_current_version");
1172+
async fn check_schema_is_current_version() {
1173+
let logctx = dev::test_setup_log("check_schema_is_current_version");
11711174
let db = TestDatabase::new_with_raw_datastore(&logctx.log).await;
11721175
let datastore = db.datastore();
11731176

1174-
datastore
1175-
.ensure_schema(&logctx.log, SCHEMA_VERSION, None)
1177+
let checked_action = datastore
1178+
.check_schema_and_access(
1179+
IdentityCheckPolicy::DontCare,
1180+
SCHEMA_VERSION,
1181+
)
11761182
.await
1177-
.expect("Failed to ensure schema");
1183+
.expect("Failed to check schema and access");
1184+
1185+
assert!(
1186+
matches!(checked_action.action(), DatastoreSetupAction::Ready),
1187+
"Unexpected action: {:?}",
1188+
checked_action.action(),
1189+
);
1190+
assert_eq!(
1191+
checked_action.desired_version(),
1192+
&SCHEMA_VERSION,
1193+
"Unexpected desired version: {}",
1194+
checked_action.desired_version()
1195+
);
1196+
1197+
datastore.update_schema(checked_action, None).await.expect_err(
1198+
"Should not be able to update schema that's already up-to-date",
1199+
);
11781200

11791201
db.terminate().await;
11801202
logctx.cleanup_successful();
@@ -1277,8 +1299,13 @@ mod test {
12771299
let log = log.clone();
12781300
let pool = pool.clone();
12791301
tokio::task::spawn(async move {
1280-
let datastore =
1281-
DataStore::new(&log, pool, Some(&all_versions)).await?;
1302+
let datastore = DataStore::new(
1303+
&log,
1304+
pool,
1305+
Some(&all_versions),
1306+
IdentityCheckPolicy::DontCare,
1307+
)
1308+
.await?;
12821309

12831310
// This is the crux of this test: confirm that, as each
12841311
// migration completes, it's not possible to see any artifacts
@@ -1405,9 +1432,23 @@ mod test {
14051432

14061433
// Manually construct the datastore to avoid the backoff timeout.
14071434
// We want to trigger errors, but have no need to wait.
1435+
14081436
let datastore = DataStore::new_unchecked(log.clone(), pool.clone());
1437+
let checked_action = datastore
1438+
.check_schema_and_access(
1439+
IdentityCheckPolicy::DontCare,
1440+
SCHEMA_VERSION,
1441+
)
1442+
.await
1443+
.expect("Failed to check schema and access");
1444+
1445+
// This needs to be in a loop because we constructed a schema change
1446+
// that will intentionally fail sometimes when doing this work.
1447+
//
1448+
// This isn't a normal behavior! But we're trying to test the
1449+
// intermediate steps of a schema change here.
14091450
while let Err(e) = datastore
1410-
.ensure_schema(&log, SCHEMA_VERSION, Some(&all_versions))
1451+
.update_schema(checked_action.clone(), Some(&all_versions))
14111452
.await
14121453
{
14131454
warn!(log, "Failed to ensure schema"; "err" => %e);

nexus/db-queries/src/db/datastore/mod.rs

Lines changed: 88 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ use omicron_common::backoff::{
4141
};
4242
use omicron_uuid_kinds::{GenericUuid, OmicronZoneUuid, SledUuid};
4343
use slog::Logger;
44+
use slog_error_chain::InlineErrorChain;
4445
use std::net::Ipv6Addr;
4546
use std::num::NonZeroU32;
4647
use std::sync::Arc;
@@ -121,6 +122,8 @@ pub mod webhook_delivery;
121122
mod zpool;
122123

123124
pub use address_lot::AddressLotCreateResult;
125+
pub use db_metadata::DatastoreSetupAction;
126+
pub use db_metadata::ValidatedDatastoreSetupAction;
124127
pub use dns::DataStoreDnsTest;
125128
pub use dns::DnsVersionUpdateBuilder;
126129
pub use ereport::EreportFilters;
@@ -239,16 +242,19 @@ impl DataStore {
239242
log: &Logger,
240243
pool: Arc<Pool>,
241244
config: Option<&AllSchemaVersions>,
245+
identity_check: IdentityCheckPolicy,
242246
) -> Result<Self, String> {
243-
Self::new_with_timeout(log, pool, config, None).await
247+
Self::new_with_timeout(log, pool, config, None, identity_check).await
244248
}
245249

246250
pub async fn new_with_timeout(
247251
log: &Logger,
248252
pool: Arc<Pool>,
249253
config: Option<&AllSchemaVersions>,
250254
try_for: Option<std::time::Duration>,
255+
identity_check: IdentityCheckPolicy,
251256
) -> Result<Self, String> {
257+
use db_metadata::DatastoreSetupAction;
252258
use nexus_db_model::SCHEMA_VERSION as EXPECTED_VERSION;
253259

254260
let datastore =
@@ -262,25 +268,95 @@ impl DataStore {
262268
|| async {
263269
if let Some(try_for) = try_for {
264270
if std::time::Instant::now() > start + try_for {
265-
return Err(BackoffError::permanent(()));
271+
return Err(BackoffError::permanent(
272+
"Timeout waiting for DataStore::new_with_timeout",
273+
));
266274
}
267275
}
268276

269-
match datastore
270-
.ensure_schema(&log, EXPECTED_VERSION, config)
271-
.await
272-
{
273-
Ok(()) => return Ok(()),
274-
Err(e) => {
275-
warn!(log, "Failed to ensure schema version"; "error" => #%e);
277+
loop {
278+
let checked_action = datastore
279+
.check_schema_and_access(
280+
identity_check,
281+
EXPECTED_VERSION,
282+
)
283+
.await
284+
.map_err(|err| {
285+
warn!(
286+
log,
287+
"Cannot check schema version / Nexus access";
288+
"error" => InlineErrorChain::new(err.as_ref()),
289+
);
290+
BackoffError::transient(
291+
"Cannot check schema version / Nexus access",
292+
)
293+
})?;
294+
295+
match checked_action.action() {
296+
DatastoreSetupAction::Ready => {
297+
info!(log, "Datastore is ready for usage");
298+
return Ok(());
299+
}
300+
DatastoreSetupAction::NeedsHandoff { nexus_id } => {
301+
info!(log, "Datastore is awaiting handoff");
302+
303+
datastore.attempt_handoff(*nexus_id).await.map_err(
304+
|err| {
305+
warn!(
306+
log,
307+
"Could not handoff to new nexus";
308+
err
309+
);
310+
BackoffError::transient(
311+
"Could not handoff to new nexus",
312+
)
313+
},
314+
)?;
315+
316+
// If the handoff was successful, immediately
317+
// re-evaluate the schema and access policies to see
318+
// if we should update or not.
319+
continue;
320+
}
321+
DatastoreSetupAction::TryLater => {
322+
error!(log, "Waiting for metadata; trying later");
323+
return Err(BackoffError::permanent(
324+
"Waiting for metadata; trying later"
325+
));
326+
}
327+
DatastoreSetupAction::Update => {
328+
info!(
329+
log,
330+
"Datastore should be updated before usage"
331+
);
332+
datastore
333+
.update_schema(checked_action, config)
334+
.await
335+
.map_err(|err| {
336+
warn!(
337+
log,
338+
"Failed to update schema version";
339+
"error" => InlineErrorChain::new(err.as_ref())
340+
);
341+
BackoffError::transient(
342+
"Failed to update schema version",
343+
)
344+
})?;
345+
return Ok(());
346+
}
347+
DatastoreSetupAction::Refuse => {
348+
error!(log, "Datastore should not be used");
349+
return Err(BackoffError::permanent(
350+
"Datastore should not be used",
351+
));
352+
}
276353
}
277-
};
278-
return Err(BackoffError::transient(()));
354+
}
279355
},
280356
|_, _| {},
281357
)
282358
.await
283-
.map_err(|_| "Failed to read valid DB schema".to_string())?;
359+
.map_err(|err| err.to_string())?;
284360

285361
Ok(datastore)
286362
}

nexus/db-queries/src/db/pub_test_utils/mod.rs

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use crate::authz;
1212
use crate::context::OpContext;
1313
use crate::db;
1414
use crate::db::DataStore;
15+
use crate::db::datastore::IdentityCheckPolicy;
1516
use omicron_test_utils::dev::db::CockroachInstance;
1617
use slog::Logger;
1718
use std::sync::Arc;
@@ -114,7 +115,14 @@ impl TestDatabaseBuilder {
114115
Interface::Datastore => {
115116
let pool = new_pool(log, &db);
116117
let datastore = Arc::new(
117-
DataStore::new(&log, pool, None).await.unwrap(),
118+
DataStore::new(
119+
&log,
120+
pool,
121+
None,
122+
IdentityCheckPolicy::DontCare,
123+
)
124+
.await
125+
.unwrap(),
118126
);
119127
TestDatabase {
120128
db,
@@ -300,7 +308,11 @@ async fn datastore_test(
300308

301309
let cfg = db::Config { url: db.pg_config().clone() };
302310
let pool = Arc::new(db::Pool::new_single_host(&log, &cfg));
303-
let datastore = Arc::new(DataStore::new(&log, pool, None).await.unwrap());
311+
let datastore = Arc::new(
312+
DataStore::new(&log, pool, None, IdentityCheckPolicy::DontCare)
313+
.await
314+
.unwrap(),
315+
);
304316

305317
// Create an OpContext with the credentials of "db-init" just for the
306318
// purpose of loading the built-in users, roles, and assignments.

nexus/src/app/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ use nexus_db_queries::authn;
2424
use nexus_db_queries::authz;
2525
use nexus_db_queries::context::OpContext;
2626
use nexus_db_queries::db;
27+
use nexus_db_queries::db::datastore::IdentityCheckPolicy;
2728
use nexus_mgs_updates::ArtifactCache;
2829
use nexus_mgs_updates::MgsUpdateDriver;
2930
use nexus_types::deployment::PendingMgsUpdates;
@@ -310,12 +311,14 @@ impl Nexus {
310311
.map(|s| AllSchemaVersions::load(&s.schema_dir))
311312
.transpose()
312313
.map_err(|error| format!("{error:#}"))?;
314+
let nexus_id = config.deployment.id;
313315
let db_datastore = Arc::new(
314316
db::DataStore::new_with_timeout(
315317
&log,
316318
Arc::clone(&pool),
317319
all_versions.as_ref(),
318320
config.pkg.tunables.load_timeout,
321+
IdentityCheckPolicy::CheckAndTakeover { nexus_id },
319322
)
320323
.await?,
321324
);

nexus/src/bin/schema-updater.rs

Lines changed: 36 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ use nexus_db_model::AllSchemaVersions;
1414
use nexus_db_model::SCHEMA_VERSION;
1515
use nexus_db_queries::db;
1616
use nexus_db_queries::db::DataStore;
17+
use nexus_db_queries::db::datastore::DatastoreSetupAction;
18+
use nexus_db_queries::db::datastore::IdentityCheckPolicy;
1719
use semver::Version;
1820
use slog::Drain;
1921
use slog::Level;
@@ -108,11 +110,40 @@ async fn main_impl() -> anyhow::Result<()> {
108110
}
109111
Cmd::Upgrade { version } => {
110112
println!("Upgrading to {version}");
111-
datastore
112-
.ensure_schema(&log, version.clone(), Some(&all_versions))
113-
.await
114-
.map_err(|e| anyhow!(e))?;
115-
println!("Upgrade to {version} complete");
113+
let checked_action = datastore
114+
.check_schema_and_access(
115+
IdentityCheckPolicy::DontCare,
116+
version.clone(),
117+
)
118+
.await?;
119+
120+
match checked_action.action() {
121+
DatastoreSetupAction::Ready => {
122+
println!("Already at version {version}")
123+
}
124+
DatastoreSetupAction::Update => {
125+
datastore
126+
.update_schema(checked_action, Some(&all_versions))
127+
.await
128+
.map_err(|e| anyhow!(e))?;
129+
println!("Update to {version} complete");
130+
}
131+
DatastoreSetupAction::Refuse => {
132+
println!("Refusing to update to version {version}")
133+
}
134+
DatastoreSetupAction::TryLater
135+
| DatastoreSetupAction::NeedsHandoff { .. } => {
136+
// This case should not happen - we supplied
137+
// IdentityCheckPolicy::DontCare, so we should not be told
138+
// to attempt a takeover by a specific Nexus.
139+
println!(
140+
"Refusing to update to version {version}. \
141+
The schema updater tried to ignore the identity check, \
142+
but got a response indicating handoff is needed. \
143+
This is unexpected, and probably a bug"
144+
)
145+
}
146+
}
116147
}
117148
}
118149
datastore.terminate().await;

0 commit comments

Comments
 (0)