Skip to content

Commit 60d389f

Browse files
committed
(4/N) Read database access records on boot
1 parent c3be777 commit 60d389f

File tree

7 files changed

+219
-48
lines changed

7 files changed

+219
-48
lines changed

nexus/db-queries/src/db/datastore/db_metadata.rs

Lines changed: 67 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -422,32 +422,35 @@ impl DataStore {
422422
})
423423
}
424424

425-
// Ensures that the database schema matches "desired_version".
426-
//
427-
// - Updating the schema makes the database incompatible with older
428-
// versions of Nexus, which are not running "desired_version".
429-
// - This is a one-way operation that cannot be undone.
430-
// - The caller is responsible for ensuring that the new version is valid,
431-
// and that all running Nexus instances can understand the new schema
432-
// version.
433-
//
434-
// TODO: This function assumes that all concurrently executing Nexus
435-
// instances on the rack are operating on the same version of software.
436-
// If that assumption is broken, nothing would stop a "new deployment"
437-
// from making a change that invalidates the queries used by an "old
438-
// deployment".
439-
pub async fn ensure_schema(
425+
/// Ensures that the database schema matches `desired_version`.
426+
///
427+
/// - `validated_action`: A [ValidatedDatastoreSetupAction], indicating that
428+
/// [Self::check_schema_and_access] has already been called.
429+
/// - `all_versions`: A description of all schema versions between
430+
/// "whatever is in the DB" and `desired_version`, instructing
431+
/// how to perform an update.
432+
pub async fn update_schema(
440433
&self,
441-
log: &Logger,
442-
desired_version: Version,
434+
validated_action: ValidatedDatastoreSetupAction,
443435
all_versions: Option<&AllSchemaVersions>,
444436
) -> Result<(), anyhow::Error> {
437+
let action = validated_action.action();
438+
439+
match action {
440+
DatastoreSetupAction::Ready => {
441+
bail!("No schema update is necessary")
442+
}
443+
DatastoreSetupAction::Update => (),
444+
_ => bail!("Not ready for schema update"),
445+
}
446+
447+
let desired_version = validated_action.desired_version().clone();
445448
let (found_version, found_target_version) = self
446449
.database_schema_version()
447450
.await
448451
.context("Cannot read database schema version")?;
449452

450-
let log = log.new(o!(
453+
let log = self.log.new(o!(
451454
"found_version" => found_version.to_string(),
452455
"desired_version" => desired_version.to_string(),
453456
));
@@ -1145,15 +1148,34 @@ mod test {
11451148
// Confirms that calling the internal "ensure_schema" function can succeed
11461149
// when the database is already at that version.
11471150
#[tokio::test]
1148-
async fn ensure_schema_is_current_version() {
1149-
let logctx = dev::test_setup_log("ensure_schema_is_current_version");
1151+
async fn check_schema_is_current_version() {
1152+
let logctx = dev::test_setup_log("check_schema_is_current_version");
11501153
let db = TestDatabase::new_with_raw_datastore(&logctx.log).await;
11511154
let datastore = db.datastore();
11521155

1153-
datastore
1154-
.ensure_schema(&logctx.log, SCHEMA_VERSION, None)
1156+
let checked_action = datastore
1157+
.check_schema_and_access(
1158+
IdentityCheckPolicy::DontCare,
1159+
SCHEMA_VERSION,
1160+
)
11551161
.await
1156-
.expect("Failed to ensure schema");
1162+
.expect("Failed to check schema and access");
1163+
1164+
assert!(
1165+
matches!(checked_action.action(), DatastoreSetupAction::Ready),
1166+
"Unexpected action: {:?}",
1167+
checked_action.action(),
1168+
);
1169+
assert_eq!(
1170+
checked_action.desired_version(),
1171+
&SCHEMA_VERSION,
1172+
"Unexpected desired version: {}",
1173+
checked_action.desired_version()
1174+
);
1175+
1176+
datastore.update_schema(checked_action, None).await.expect_err(
1177+
"Should not be able to update schema that's already up-to-date",
1178+
);
11571179

11581180
db.terminate().await;
11591181
logctx.cleanup_successful();
@@ -1256,8 +1278,13 @@ mod test {
12561278
let log = log.clone();
12571279
let pool = pool.clone();
12581280
tokio::task::spawn(async move {
1259-
let datastore =
1260-
DataStore::new(&log, pool, Some(&all_versions)).await?;
1281+
let datastore = DataStore::new(
1282+
&log,
1283+
pool,
1284+
Some(&all_versions),
1285+
IdentityCheckPolicy::DontCare,
1286+
)
1287+
.await?;
12611288

12621289
// This is the crux of this test: confirm that, as each
12631290
// migration completes, it's not possible to see any artifacts
@@ -1384,9 +1411,23 @@ mod test {
13841411

13851412
// Manually construct the datastore to avoid the backoff timeout.
13861413
// We want to trigger errors, but have no need to wait.
1414+
13871415
let datastore = DataStore::new_unchecked(log.clone(), pool.clone());
1416+
let checked_action = datastore
1417+
.check_schema_and_access(
1418+
IdentityCheckPolicy::DontCare,
1419+
SCHEMA_VERSION,
1420+
)
1421+
.await
1422+
.expect("Failed to check schema and access");
1423+
1424+
// This needs to be in a loop because we constructed a schema change
1425+
// that will intentionally fail sometimes when doing this work.
1426+
//
1427+
// This isn't a normal behavior! But we're trying to test the
1428+
// intermediate steps of a schema change here.
13881429
while let Err(e) = datastore
1389-
.ensure_schema(&log, SCHEMA_VERSION, Some(&all_versions))
1430+
.update_schema(checked_action.clone(), Some(&all_versions))
13901431
.await
13911432
{
13921433
warn!(log, "Failed to ensure schema"; "err" => %e);

nexus/db-queries/src/db/datastore/mod.rs

Lines changed: 82 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ use omicron_common::backoff::{
4141
};
4242
use omicron_uuid_kinds::{GenericUuid, OmicronZoneUuid, SledUuid};
4343
use slog::Logger;
44+
use slog_error_chain::InlineErrorChain;
4445
use std::net::Ipv6Addr;
4546
use std::num::NonZeroU32;
4647
use std::sync::Arc;
@@ -121,6 +122,8 @@ pub mod webhook_delivery;
121122
mod zpool;
122123

123124
pub use address_lot::AddressLotCreateResult;
125+
pub use db_metadata::DatastoreSetupAction;
126+
pub use db_metadata::ValidatedDatastoreSetupAction;
124127
pub use dns::DataStoreDnsTest;
125128
pub use dns::DnsVersionUpdateBuilder;
126129
pub use ereport::EreportFilters;
@@ -239,16 +242,19 @@ impl DataStore {
239242
log: &Logger,
240243
pool: Arc<Pool>,
241244
config: Option<&AllSchemaVersions>,
245+
identity_check: IdentityCheckPolicy,
242246
) -> Result<Self, String> {
243-
Self::new_with_timeout(log, pool, config, None).await
247+
Self::new_with_timeout(log, pool, config, None, identity_check).await
244248
}
245249

246250
pub async fn new_with_timeout(
247251
log: &Logger,
248252
pool: Arc<Pool>,
249253
config: Option<&AllSchemaVersions>,
250254
try_for: Option<std::time::Duration>,
255+
identity_check: IdentityCheckPolicy,
251256
) -> Result<Self, String> {
257+
use db_metadata::DatastoreSetupAction;
252258
use nexus_db_model::SCHEMA_VERSION as EXPECTED_VERSION;
253259

254260
let datastore =
@@ -262,25 +268,89 @@ impl DataStore {
262268
|| async {
263269
if let Some(try_for) = try_for {
264270
if std::time::Instant::now() > start + try_for {
265-
return Err(BackoffError::permanent(()));
271+
return Err(BackoffError::permanent(
272+
"Timeout waiting for DataStore::new_with_timeout",
273+
));
266274
}
267275
}
268276

269-
match datastore
270-
.ensure_schema(&log, EXPECTED_VERSION, config)
271-
.await
272-
{
273-
Ok(()) => return Ok(()),
274-
Err(e) => {
275-
warn!(log, "Failed to ensure schema version"; "error" => #%e);
277+
loop {
278+
let checked_action = datastore
279+
.check_schema_and_access(
280+
identity_check,
281+
EXPECTED_VERSION,
282+
)
283+
.await
284+
.map_err(|err| {
285+
warn!(
286+
log,
287+
"Cannot check schema version / Nexus access";
288+
"error" => InlineErrorChain::new(err.as_ref()),
289+
);
290+
BackoffError::transient(
291+
"Cannot check schema version / Nexus access",
292+
)
293+
})?;
294+
295+
match checked_action.action() {
296+
DatastoreSetupAction::Ready => {
297+
info!(log, "Datastore is ready for usage");
298+
return Ok(());
299+
}
300+
DatastoreSetupAction::NeedsHandoff { nexus_id } => {
301+
info!(log, "Datastore is awaiting handoff");
302+
303+
datastore.attempt_handoff(*nexus_id).await.map_err(
304+
|err| {
305+
warn!(
306+
log,
307+
"Could not handoff to new nexus";
308+
err
309+
);
310+
BackoffError::transient(
311+
"Could not handoff to new nexus",
312+
)
313+
},
314+
)?;
315+
316+
// If the handoff was successful, immediately
317+
// re-evaluate the schema and access policies to see
318+
// if we should update or not.
319+
continue;
320+
}
321+
DatastoreSetupAction::Update => {
322+
info!(
323+
log,
324+
"Datastore should be updated before usage"
325+
);
326+
datastore
327+
.update_schema(checked_action, config)
328+
.await
329+
.map_err(|err| {
330+
warn!(
331+
log,
332+
"Failed to update schema version";
333+
"error" => InlineErrorChain::new(err.as_ref())
334+
);
335+
BackoffError::transient(
336+
"Failed to update schema version",
337+
)
338+
})?;
339+
return Ok(());
340+
}
341+
DatastoreSetupAction::Refuse => {
342+
error!(log, "Datastore should not be used");
343+
return Err(BackoffError::permanent(
344+
"Datastore should not be used",
345+
));
346+
}
276347
}
277-
};
278-
return Err(BackoffError::transient(()));
348+
}
279349
},
280350
|_, _| {},
281351
)
282352
.await
283-
.map_err(|_| "Failed to read valid DB schema".to_string())?;
353+
.map_err(|err| err.to_string())?;
284354

285355
Ok(datastore)
286356
}

nexus/db-queries/src/db/pub_test_utils/mod.rs

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use crate::authz;
1212
use crate::context::OpContext;
1313
use crate::db;
1414
use crate::db::DataStore;
15+
use crate::db::datastore::IdentityCheckPolicy;
1516
use omicron_test_utils::dev::db::CockroachInstance;
1617
use slog::Logger;
1718
use std::sync::Arc;
@@ -114,7 +115,14 @@ impl TestDatabaseBuilder {
114115
Interface::Datastore => {
115116
let pool = new_pool(log, &db);
116117
let datastore = Arc::new(
117-
DataStore::new(&log, pool, None).await.unwrap(),
118+
DataStore::new(
119+
&log,
120+
pool,
121+
None,
122+
IdentityCheckPolicy::DontCare,
123+
)
124+
.await
125+
.unwrap(),
118126
);
119127
TestDatabase {
120128
db,
@@ -300,7 +308,11 @@ async fn datastore_test(
300308

301309
let cfg = db::Config { url: db.pg_config().clone() };
302310
let pool = Arc::new(db::Pool::new_single_host(&log, &cfg));
303-
let datastore = Arc::new(DataStore::new(&log, pool, None).await.unwrap());
311+
let datastore = Arc::new(
312+
DataStore::new(&log, pool, None, IdentityCheckPolicy::DontCare)
313+
.await
314+
.unwrap(),
315+
);
304316

305317
// Create an OpContext with the credentials of "db-init" just for the
306318
// purpose of loading the built-in users, roles, and assignments.

nexus/src/app/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ use nexus_db_queries::authn;
2424
use nexus_db_queries::authz;
2525
use nexus_db_queries::context::OpContext;
2626
use nexus_db_queries::db;
27+
use nexus_db_queries::db::datastore::IdentityCheckPolicy;
2728
use nexus_mgs_updates::ArtifactCache;
2829
use nexus_mgs_updates::MgsUpdateDriver;
2930
use nexus_types::deployment::PendingMgsUpdates;
@@ -310,12 +311,14 @@ impl Nexus {
310311
.map(|s| AllSchemaVersions::load(&s.schema_dir))
311312
.transpose()
312313
.map_err(|error| format!("{error:#}"))?;
314+
let nexus_id = config.deployment.id;
313315
let db_datastore = Arc::new(
314316
db::DataStore::new_with_timeout(
315317
&log,
316318
Arc::clone(&pool),
317319
all_versions.as_ref(),
318320
config.pkg.tunables.load_timeout,
321+
IdentityCheckPolicy::CheckAndTakeover { nexus_id },
319322
)
320323
.await?,
321324
);

nexus/src/bin/schema-updater.rs

Lines changed: 35 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ use nexus_db_model::AllSchemaVersions;
1414
use nexus_db_model::SCHEMA_VERSION;
1515
use nexus_db_queries::db;
1616
use nexus_db_queries::db::DataStore;
17+
use nexus_db_queries::db::datastore::DatastoreSetupAction;
18+
use nexus_db_queries::db::datastore::IdentityCheckPolicy;
1719
use semver::Version;
1820
use slog::Drain;
1921
use slog::Level;
@@ -108,11 +110,39 @@ async fn main_impl() -> anyhow::Result<()> {
108110
}
109111
Cmd::Upgrade { version } => {
110112
println!("Upgrading to {version}");
111-
datastore
112-
.ensure_schema(&log, version.clone(), Some(&all_versions))
113-
.await
114-
.map_err(|e| anyhow!(e))?;
115-
println!("Upgrade to {version} complete");
113+
let checked_action = datastore
114+
.check_schema_and_access(
115+
IdentityCheckPolicy::DontCare,
116+
version.clone(),
117+
)
118+
.await?;
119+
120+
match checked_action.action() {
121+
DatastoreSetupAction::Ready => {
122+
println!("Already at version {version}")
123+
}
124+
DatastoreSetupAction::Update => {
125+
datastore
126+
.update_schema(checked_action, Some(&all_versions))
127+
.await
128+
.map_err(|e| anyhow!(e))?;
129+
println!("Update to {version} complete");
130+
}
131+
DatastoreSetupAction::Refuse => {
132+
println!("Refusing to update to version {version}")
133+
}
134+
DatastoreSetupAction::NeedsHandoff { .. } => {
135+
// This case should not happen - we supplied
136+
// IdentityCheckPolicy::DontCare, so we should not be told
137+
// to attempt a takeover by a specific Nexus.
138+
println!(
139+
"Refusing to update to version {version}. \
140+
The schema updater tried to ignore the identity check, \
141+
but got a response indicating handoff is needed. \
142+
This is unexpected, and probably a bug"
143+
)
144+
}
145+
}
116146
}
117147
}
118148
datastore.terminate().await;

0 commit comments

Comments
 (0)