diff --git a/dev-tools/omdb/src/bin/omdb/nexus/quiesce.rs b/dev-tools/omdb/src/bin/omdb/nexus/quiesce.rs index 76c0a229c3..7372027252 100644 --- a/dev-tools/omdb/src/bin/omdb/nexus/quiesce.rs +++ b/dev-tools/omdb/src/bin/omdb/nexus/quiesce.rs @@ -11,7 +11,10 @@ use chrono::TimeDelta; use chrono::Utc; use clap::Args; use clap::Subcommand; +use nexus_client::types::PendingRecovery; use nexus_client::types::QuiesceState; +use nexus_client::types::QuiesceStatus; +use nexus_client::types::SagaQuiesceStatus; use std::time::Duration; #[derive(Debug, Args)] @@ -31,9 +34,9 @@ pub enum QuiesceCommands { #[derive(Debug, Args)] pub struct QuiesceShowArgs { - /// Show details about held database connections + /// Show stack traces for held database connections #[clap(short, long, default_value_t = false)] - verbose: bool, + stacks: bool, } pub async fn cmd_nexus_quiesce( @@ -60,7 +63,10 @@ async fn quiesce_show( .await .context("fetching quiesce state")? .into_inner(); - match quiesce.state { + + let QuiesceStatus { db_claims, sagas, state } = quiesce; + + match state { QuiesceState::Undetermined => { println!("has not yet determined if it is quiescing"); } @@ -145,25 +151,83 @@ async fn quiesce_show( } } - println!("sagas running: {}", quiesce.sagas_pending.len()); - for saga in &quiesce.sagas_pending { + let SagaQuiesceStatus { + sagas_pending, + drained_blueprint_id, + first_recovery_complete, + new_sagas_allowed, + reassignment_blueprint_id, + reassignment_generation, + reassignment_pending, + recovered_blueprint_id, + recovered_reassignment_generation, + recovery_pending, + } = sagas; + + println!("saga quiesce:"); + println!(" new sagas: {:?}", new_sagas_allowed); + println!( + " drained as of blueprint: {}", + drained_blueprint_id + .map(|s| s.to_string()) + .as_deref() + .unwrap_or("none") + ); + println!( + " blueprint for last completed recovery pass: {}", + recovered_blueprint_id + .map(|s| s.to_string()) + .as_deref() + .unwrap_or("none") + ); + println!( + " 
blueprint for last reassignment pass: {}", + reassignment_blueprint_id + .map(|s| s.to_string()) + .as_deref() + .unwrap_or("none") + ); + println!( + " reassignment generation: {} (pass running: {})", + reassignment_generation, + if reassignment_pending { "yes" } else { "no" } + ); + println!(" recovered generation: {}", recovered_reassignment_generation); + println!( + " recovered at least once successfully: {}", + if first_recovery_complete { "yes" } else { "no" }, + ); + print!(" recovery pending: "); + if let Some(PendingRecovery { generation, blueprint_id }) = recovery_pending + { + println!( + "yes (generation {}, blueprint id {})", + generation, + blueprint_id.map(|s| s.to_string()).as_deref().unwrap_or("none") + ); + } else { + println!("no"); + } + + println!(" sagas running: {}", sagas_pending.len()); + for saga in &sagas_pending { println!( - " saga {} pending since {} ({})", + " saga {} pending since {} ({})", saga.saga_id, humantime::format_rfc3339_millis(saga.time_pending.into()), saga.saga_name ); } - println!("database connections held: {}", quiesce.db_claims.len()); - for claim in &quiesce.db_claims { + println!("database connections held: {}", db_claims.len()); + for claim in &db_claims { println!( " claim {} held since {} ({} ago)", claim.id, claim.held_since, format_time_delta(Utc::now() - claim.held_since), ); - if args.verbose { + if args.stacks { println!(" acquired by:"); println!("{}", textwrap::indent(&claim.debug, " ")); } @@ -177,7 +241,7 @@ async fn quiesce_start( _token: DestructiveOperationToken, ) -> Result<(), anyhow::Error> { client.quiesce_start().await.context("quiescing Nexus")?; - quiesce_show(client, &QuiesceShowArgs { verbose: false }).await + quiesce_show(client, &QuiesceShowArgs { stacks: false }).await } fn format_duration_ms(duration: Duration) -> String { diff --git a/nexus/reconfigurator/execution/src/lib.rs b/nexus/reconfigurator/execution/src/lib.rs index a4f802b3c9..ced20480e7 100644 --- 
a/nexus/reconfigurator/execution/src/lib.rs +++ b/nexus/reconfigurator/execution/src/lib.rs @@ -22,6 +22,7 @@ use nexus_types::deployment::execution::{ StepHandle, StepResult, UpdateEngine, }; use nexus_types::quiesce::SagaQuiesceHandle; +use nexus_types::quiesce::SagaReassignmentDone; use omicron_uuid_kinds::OmicronZoneUuid; use slog::info; use slog_error_chain::InlineErrorChain; @@ -627,18 +628,16 @@ fn register_reassign_sagas_step<'a>( match reassigned { Ok(needs_saga_recovery) => ( StepSuccess::new(needs_saga_recovery).build(), - needs_saga_recovery, + SagaReassignmentDone::ReassignedAllAsOf( + blueprint.id, + needs_saga_recovery, + ), + ), + Err(error) => ( + StepWarning::new(false, error.to_string()) + .build(), + SagaReassignmentDone::Indeterminate, ), - Err(error) => { - // It's possible that we failed after having - // re-assigned sagas in the database. - let maybe_reassigned = true; - ( - StepWarning::new(false, error.to_string()) - .build(), - maybe_reassigned, - ) - } } }) .await) diff --git a/nexus/src/app/quiesce.rs b/nexus/src/app/quiesce.rs index 3b36ec9799..2270452122 100644 --- a/nexus/src/app/quiesce.rs +++ b/nexus/src/app/quiesce.rs @@ -32,9 +32,9 @@ impl super::Nexus { ) -> LookupResult { opctx.authorize(authz::Action::Read, &authz::QUIESCE_STATE).await?; let state = self.quiesce.state(); - let sagas_pending = self.quiesce.sagas().sagas_pending(); + let sagas = self.quiesce.sagas().status(); let db_claims = self.datastore().claims_held(); - Ok(QuiesceStatus { state, sagas_pending, db_claims }) + Ok(QuiesceStatus { state, sagas, db_claims }) } } @@ -281,7 +281,7 @@ mod test { assert!(duration_total >= duration_draining_db); assert!(duration_total >= duration_recording_quiesce); assert!(duration_total <= (after - before).to_std().unwrap()); - assert!(status.sagas_pending.is_empty()); + assert!(status.sagas.sagas_pending.is_empty()); assert!(status.db_claims.is_empty()); } @@ -355,7 +355,9 @@ mod test { quiesce_status.state, 
QuiesceState::DrainingSagas { .. } ); - assert!(quiesce_status.sagas_pending.contains_key(&demo_saga.saga_id)); + assert!( + quiesce_status.sagas.sagas_pending.contains_key(&demo_saga.saga_id) + ); // We should see at least one held database claim from the one we took // above. assert!(!quiesce_status.db_claims.is_empty()); @@ -419,7 +421,7 @@ mod test { if !matches!(rv.state, QuiesceState::DrainingDb { .. }) { return Err(CondCheckError::::NotYet); } - assert!(rv.sagas_pending.is_empty()); + assert!(rv.sagas.sagas_pending.is_empty()); // The database claim we took is still held. assert!(!rv.db_claims.is_empty()); Ok(()) diff --git a/nexus/types/src/internal_api/views.rs b/nexus/types/src/internal_api/views.rs index 972a0b92df..df5a0ef8ed 100644 --- a/nexus/types/src/internal_api/views.rs +++ b/nexus/types/src/internal_api/views.rs @@ -8,6 +8,7 @@ use crate::inventory::BaseboardId; use crate::inventory::Caboose; use crate::inventory::CabooseWhich; use crate::inventory::Collection; +use crate::quiesce::SagaQuiesceStatus; use chrono::DateTime; use chrono::SecondsFormat; use chrono::Utc; @@ -721,12 +722,8 @@ pub struct QuiesceStatus { /// what stage of quiescing is Nexus at pub state: QuiesceState, - /// what sagas are currently running or known needing to be recovered - /// - /// This should only be non-empty when state is `Running` or - /// `WaitingForSagas`. Entries here prevent transitioning from - /// `WaitingForSagas` to `WaitingForDb`. 
- pub sagas_pending: IdOrdMap, + /// information about saga quiescing + pub sagas: SagaQuiesceStatus, /// what database claims are currently held (by any part of Nexus) /// diff --git a/nexus/types/src/quiesce.rs b/nexus/types/src/quiesce.rs index 7c2b3ad42d..532322917a 100644 --- a/nexus/types/src/quiesce.rs +++ b/nexus/types/src/quiesce.rs @@ -11,6 +11,9 @@ use futures::future::BoxFuture; use iddqd::IdOrdMap; use omicron_common::api::external::Error; use omicron_common::api::external::Generation; +use omicron_uuid_kinds::BlueprintUuid; +use schemars::JsonSchema; +use serde::Serialize; use slog::Logger; use slog::error; use slog::info; @@ -24,7 +27,8 @@ use tokio::sync::watch; /// /// This is used by Nexus quiesce to disallow creation of new sagas when we're /// trying to quiesce Nexus. -#[derive(Debug, Clone, Copy, Eq, PartialEq)] +#[derive(Debug, Clone, Copy, Eq, PartialEq, JsonSchema, Serialize)] +#[serde(rename_all = "snake_case")] enum SagasAllowed { /// New sagas may be started (normal condition) Allowed, @@ -48,6 +52,13 @@ impl From for Error { } } +/// Describes the result of a saga re-assignment +#[derive(Debug)] +pub enum SagaReassignmentDone { + Indeterminate, + ReassignedAllAsOf(BlueprintUuid, bool), +} + /// Describes both the configuration (whether sagas are allowed to be created) /// and the state (how many sagas are pending) for the purpose of quiescing /// Nexus. @@ -93,11 +104,11 @@ pub struct SagaQuiesceHandle { // cancellation behavior is abysmal), but we don't want to block on a // std `Condvar` in an async thread. There are options here (e.g., // `block_on`), but they're not pleasant. - inner: watch::Sender, + inner: watch::Sender, } -#[derive(Debug, Clone)] -struct SagaQuiesceInner { +#[derive(Debug, Clone, Serialize, JsonSchema)] +pub struct SagaQuiesceStatus { /// current policy: are we allowed to *create* new sagas? 
/// /// This also affects re-assigning sagas from expunged Nexus instances to @@ -124,6 +135,13 @@ struct SagaQuiesceInner { /// we've recovered all sagas that could be assigned to us. reassignment_generation: Generation, + /// blueprint id associated with last successful saga reassignment + /// + /// Similar to the generation number, this is used to track whether we've + /// accounted for all sagas for all expungements up through this target + /// blueprint. + reassignment_blueprint_id: Option, + /// whether there is a saga reassignment operation happening /// /// These operatinos may assign new sagas to Nexus that must be recovered @@ -138,14 +156,50 @@ struct SagaQuiesceInner { /// given reassignment pass. See `reassignment_done()` for details. recovered_reassignment_generation: Generation, - /// whether a saga recovery operation is ongoing, and if one is, what - /// `reassignment_generation` was when it started - recovery_pending: Option, + /// blueprint id that saga recovery has "caught up to" + /// + /// This means that we have finished recovering any sagas that were + /// re-assigned to us due to expungements of other Nexus zones up through + /// this blueprint. Put differently: we know that we will never be assigned + /// more sagas due to expungement unless the target blueprint changes past + /// this one. + /// + /// This does not mean that we've fully drained all sagas up through this + /// blueprint. There may still be sagas running. 
+ recovered_blueprint_id: Option, + + /// blueprint id that we're "fully drained up to" + /// + /// If this value is non-`None`, that means that: + /// + /// - saga creation is disallowed + /// - no sagas are running + /// - we have re-assigned sagas from other Nexus instances expunged in this + /// blueprint or earlier + /// - we have finished recovery for all those sagas (that had been assigned + /// to us as of the re-assignment pass for this blueprint id) + /// + /// This means that the only way we can wind up running another saga is if + /// there's a new blueprint that expunges a different Nexus zone. + drained_blueprint_id: Option, + + /// If a recovery pass is ongoing, a snapshot of reassignment state when it + /// started (which reflects what we'll be caught up to when it finishes) + recovery_pending: Option, +} + +/// Snapshot of reassignment state when a recovery pass started +#[derive(Debug, Clone, Serialize, JsonSchema)] +struct PendingRecovery { + /// what `reassignment_generation` was when this recovery started + generation: Generation, + /// which blueprint id we'd be fully caught up to upon completion + blueprint_id: Option, } impl SagaQuiesceHandle { pub fn new(log: Logger) -> SagaQuiesceHandle { - let (inner, _) = watch::channel(SagaQuiesceInner { + let (inner, _) = watch::channel(SagaQuiesceStatus { new_sagas_allowed: SagasAllowed::DisallowedUnknown, sagas_pending: IdOrdMap::new(), first_recovery_complete: false, @@ -153,6 +207,9 @@ impl SagaQuiesceHandle { reassignment_pending: false, recovered_reassignment_generation: Generation::new(), recovery_pending: None, + reassignment_blueprint_id: None, + recovered_blueprint_id: None, + drained_blueprint_id: None, }); SagaQuiesceHandle { log, inner } } @@ -163,7 +220,7 @@ impl SagaQuiesceHandle { /// cannot then re-enable sagas. 
pub fn set_quiescing(&self, quiescing: bool) { self.inner.send_if_modified(|q| { - match q.new_sagas_allowed { + let changed = match q.new_sagas_allowed { SagasAllowed::DisallowedUnknown => { let new_state = if quiescing { SagasAllowed::DisallowedQuiesce @@ -199,15 +256,28 @@ impl SagaQuiesceHandle { // Either way, we're not changing anything. false } - } + }; + + q.latch_blueprint_if_drained(); + changed }); } + /// Returns the blueprint id as of which sagas are fully drained + /// + /// We may become un-drained if another re-assignment pass starts for a + /// subsequent blueprint, but this fact will still be true that we *were* + /// fully drained as of expungements included up through this blueprint. + pub fn fully_drained_blueprint(&self) -> Option { + self.inner.borrow().drained_blueprint_id + } + /// Returns whether sagas are fully drained /// /// Note that this state can change later if new sagas get assigned to this /// Nexus. - pub fn is_fully_drained(&self) -> bool { + #[cfg(test)] + fn is_fully_drained(&self) -> bool { self.inner.borrow().is_fully_drained() } @@ -231,8 +301,14 @@ impl SagaQuiesceHandle { .await; } + /// Returns a summary of internal state for debugging (involves a clone) + pub fn status(&self) -> SagaQuiesceStatus { + self.inner.borrow().clone() + } + /// Returns information about running sagas (involves a clone) - pub fn sagas_pending(&self) -> IdOrdMap { + #[cfg(test)] + fn sagas_pending(&self) -> IdOrdMap { self.inner.borrow().sagas_pending.clone() } @@ -261,7 +337,7 @@ impl SagaQuiesceHandle { // those. pub async fn reassign_sagas(&self, f: F) -> T where - F: AsyncFnOnce() -> (T, bool), + F: AsyncFnOnce() -> (T, SagaReassignmentDone), { let in_progress = self.reassignment_start(); let (result, maybe_reassigned) = f().await; @@ -291,24 +367,70 @@ impl SagaQuiesceHandle { /// Record that we've finished an operation that might assign new sagas to /// ourselves. 
- fn reassignment_done(&self, maybe_reassigned: bool) { + fn reassignment_done(&self, result: SagaReassignmentDone) { info!( &self.log, "saga re-assignment pass finished"; - "maybe_reassigned" => maybe_reassigned + "result" => ?result ); self.inner.send_modify(|q| { assert!(q.reassignment_pending); q.reassignment_pending = false; - // If we may have assigned new sagas to ourselves, bump the - // generation number. We won't report being drained until a - // recovery pass has finished that *started* with this generation - // number. So this ensures that we won't report being drained until - // any sagas that may have been assigned to us have been recovered. - if maybe_reassigned { - q.reassignment_generation = q.reassignment_generation.next(); + match result { + SagaReassignmentDone::ReassignedAllAsOf( + blueprint_id, + reassigned_any, + ) => { + // Record that we've completed assignments of all sagas from + // all Nexus instances expunged as of this blueprint. The + // only way we could re-assign ourselves more sagas is if + // the target blueprint changes. + q.reassignment_blueprint_id = Some(blueprint_id); + + if reassigned_any { + // If we assigned new sagas to ourselves, bump the + // generation number. We won't report being drained + // until a recovery pass has finished that *started* + // with this generation number. This ensures that we + // won't report being drained until any sagas that may + // have been assigned to us have been recovered. + q.reassignment_generation = + q.reassignment_generation.next(); + } else if q.reassignment_generation + <= q.recovered_reassignment_generation + && q.first_recovery_complete + { + // If recovery has caught up to the current reassignment + // generation, then we can also say that we're recovered + // up to this blueprint. 
+ q.recovered_blueprint_id = q.reassignment_blueprint_id; + } + } + SagaReassignmentDone::Indeterminate => { + // This means the caller doesn't know for sure whether they + // re-assigned us any sagas. (This can happen if there's a + // network error talking to the database. We don't know if + // that happened before or after the database transaction + // committed.) + // + // The comment above about the reassignment_generation + // applies in this case. We must assume in this case that + // there may be sagas that we need to recover before we + // consider ourselves drained. That means we need another + // recovery pass, which means bumping this generation + // number. + // + // However, once we *do* finish that, we won't know that + // we've finished recovering all sagas associated with Nexus + // instances expunged in this blueprint. So we *don't* + // update `reassignment_blueprint_id`. + q.reassignment_generation = + q.reassignment_generation.next(); + } } + + q.latch_blueprint_if_drained(); }); } @@ -353,7 +475,10 @@ impl SagaQuiesceHandle { "recovery_start() called twice without intervening \ recovery_done() (concurrent calls to recover()?)", ); - q.recovery_pending = Some(q.reassignment_generation); + q.recovery_pending = Some(PendingRecovery { + generation: q.reassignment_generation, + blueprint_id: q.reassignment_blueprint_id, + }); }); info!(&self.log, "saga recovery pass starting"); @@ -364,7 +489,9 @@ impl SagaQuiesceHandle { fn recovery_done(&self, success: bool) { let log = self.log.clone(); self.inner.send_modify(|q| { - let Some(generation) = q.recovery_pending.take() else { + let Some(PendingRecovery { generation, blueprint_id }) = + q.recovery_pending.take() + else { panic!("cannot finish saga recovery when it was not running"); }; @@ -372,10 +499,13 @@ impl SagaQuiesceHandle { info!( &log, "saga recovery pass finished"; - "generation" => generation.to_string() + "generation" => generation.to_string(), + "blueprint_id" => ?blueprint_id, ); + 
q.recovered_blueprint_id = blueprint_id; q.recovered_reassignment_generation = generation; q.first_recovery_complete = true; + q.latch_blueprint_if_drained(); } else { info!(&log, "saga recovery pass failed"); } @@ -487,12 +617,12 @@ impl SagaQuiesceHandle { } } -impl SagaQuiesceInner { +impl SagaQuiesceStatus { /// Returns whether sagas are fully drained /// /// This condition is not permanent. New sagas can be re-assigned to this /// Nexus. - pub fn is_fully_drained(&self) -> bool { + fn is_fully_drained(&self) -> bool { // No new sagas may be created self.new_sagas_allowed == SagasAllowed::DisallowedQuiesce // and there are none currently running @@ -507,6 +637,23 @@ impl SagaQuiesceInner { // and blueprint execution is not currently re-assigning stuff to us && !self.reassignment_pending } + + /// Invoked whenever the quiesce state changes to determine if we are + /// currently fully drained up to a given blueprint id + /// + /// We want to keep track of this even if the target blueprint moves beyond + /// this blueprint and we start re-assigning new sagas to ourselves as a + /// result of that blueprint. The rest of our bookkeeping would reflect + /// that we're not fully drained, which is true, but we still want to be + /// able to report that we were fully drained _as of this blueprint_. + fn latch_blueprint_if_drained(&mut self) { + if self.is_fully_drained() { + // If we've recovered up through a given blueprint id and are now + // fully drained, then we have definitely fully drained up through + // that blueprint id. 
+ self.drained_blueprint_id = self.recovered_blueprint_id; + } + } } /// Handle used to ensure that we clean up records for a pending saga @@ -524,7 +671,7 @@ impl SagaQuiesceInner { #[must_use = "must record the saga completion future once the saga is running"] pub struct NewlyPendingSagaRef { log: Logger, - quiesce: watch::Sender, + quiesce: watch::Sender, saga_id: steno::SagaId, init_finished: bool, } @@ -568,6 +715,7 @@ impl NewlyPendingSagaRef { q.sagas_pending .remove(&saga_id) .expect("saga should have been running"); + q.latch_blueprint_if_drained(); }); rv }); @@ -651,21 +799,25 @@ struct SagaReassignmentInProgress { } impl SagaReassignmentInProgress { - fn reassignment_done(self, maybe_reassigned: bool) { - self.q.reassignment_done(maybe_reassigned) + fn reassignment_done(self, result: SagaReassignmentDone) { + self.q.reassignment_done(result); } } #[cfg(test)] mod test { use crate::quiesce::SagaQuiesceHandle; + use crate::quiesce::SagaReassignmentDone; use futures::FutureExt; use omicron_test_utils::dev::test_setup_log; + use omicron_uuid_kinds::BlueprintUuid; use std::sync::LazyLock; use uuid::Uuid; static SAGA_ID: LazyLock = LazyLock::new(|| steno::SagaId(Uuid::new_v4())); + static BLUEPRINT_ID: LazyLock = + LazyLock::new(|| BlueprintUuid::new_v4()); static SAGA_NAME: LazyLock = LazyLock::new(|| steno::SagaName::new("test-saga")); @@ -861,7 +1013,9 @@ mod test { // When re-assignment finishes *without* having re-assigned anything, // then we're immediately all set. - reassignment.reassignment_done(false); + reassignment.reassignment_done( + SagaReassignmentDone::ReassignedAllAsOf(*BLUEPRINT_ID, false), + ); assert!(qq.is_fully_drained()); qq.wait_for_drained().await; assert!(qq.is_fully_drained()); @@ -895,7 +1049,9 @@ mod test { // When re-assignment finishes and re-assigned sagas, we're still // blocked. 
- reassignment.reassignment_done(true); + reassignment.reassignment_done( + SagaReassignmentDone::ReassignedAllAsOf(*BLUEPRINT_ID, true), + ); assert!(!qq.is_fully_drained()); // If the next recovery pass fails, we're still blocked. @@ -946,7 +1102,9 @@ mod test { // When re-assignment finishes and re-assigned sagas, we're still // blocked. - reassignment.reassignment_done(true); + reassignment.reassignment_done( + SagaReassignmentDone::ReassignedAllAsOf(*BLUEPRINT_ID, true), + ); assert!(!qq.is_fully_drained()); // Even if this recovery pass succeeds, we're still blocked, because it @@ -999,7 +1157,9 @@ mod test { // When re-assignment finishes and re-assigned sagas, we're still // blocked because we haven't run recovery. - reassignment.reassignment_done(true); + reassignment.reassignment_done( + SagaReassignmentDone::ReassignedAllAsOf(*BLUEPRINT_ID, true), + ); assert!(!qq.is_fully_drained()); // Start a recovery pass. Pretend like we found something. @@ -1087,7 +1247,9 @@ mod test { // from being drained. let reassignment = qq.reassignment_start(); assert!(!qq.is_fully_drained()); - reassignment.reassignment_done(false); + reassignment.reassignment_done( + SagaReassignmentDone::ReassignedAllAsOf(*BLUEPRINT_ID, false), + ); // We're fully drained as soon as this one is done, since we know we // didn't assign any sagas. assert!(qq.is_fully_drained()); @@ -1095,7 +1257,9 @@ mod test { // Try again. This time, we'll act like we did reassign sagas. let reassignment = qq.reassignment_start(); assert!(!qq.is_fully_drained()); - reassignment.reassignment_done(true); + reassignment.reassignment_done( + SagaReassignmentDone::ReassignedAllAsOf(*BLUEPRINT_ID, true), + ); assert!(!qq.is_fully_drained()); // Do a failed recovery pass. We still won't be fully drained. 
let recovery = qq.recovery_start(); @@ -1110,4 +1274,143 @@ mod test { logctx.cleanup_successful(); } + + /// Tests tracking of the drained blueprint id + #[tokio::test] + async fn test_drained_blueprint() { + let logctx = test_setup_log("test_drained_blueprint"); + let log = &logctx.log; + + let qq = SagaQuiesceHandle::new(log.clone()); + assert!(qq.fully_drained_blueprint().is_none()); + + // Basic tests where we're *not* fully drained + + // Recovery by itself does not mean we're fully drained. + qq.recovery_start().recovery_done(true); + assert!(qq.fully_drained_blueprint().is_none()); + + // Even if we're quiescing now, we're not fully drained. + qq.set_quiescing(true); + assert!(qq.fully_drained_blueprint().is_none()); + + // Recovery still isn't enough. We haven't done a re-assignment pass. + // We are currently drained, though. + qq.recovery_start().recovery_done(true); + assert!(qq.fully_drained_blueprint().is_none()); + assert!(qq.is_fully_drained()); + + // No change after an indeterminate re-assignment. + let reassignment = qq.reassignment_start(); + reassignment.reassignment_done(SagaReassignmentDone::Indeterminate); + assert!(qq.fully_drained_blueprint().is_none()); + assert!(!qq.is_fully_drained()); + + // Fully drained case 1: saga re-assignment causes us to become fully + // drained. + // + // First, recover whatever we may have just assigned ourselves. + qq.recovery_start().recovery_done(true); + + // Now if we do a re-assignment pass that assigns no sagas, then we + // finally are fully drained up through this blueprint. This does not + // require recovery since no sagas were re-assigned. 
+ let blueprint1_id = BlueprintUuid::new_v4(); + let reassignment = qq.reassignment_start(); + reassignment.reassignment_done( + SagaReassignmentDone::ReassignedAllAsOf(blueprint1_id, false), + ); + assert!(qq.is_fully_drained()); + assert_eq!(qq.fully_drained_blueprint(), Some(blueprint1_id)); + + // Next, test that even if we become no-longer-drained because we do + // another reassignment, we still record that we're fully drained as of + // the older blueprint. + + // Start another re-assignment pass. + let blueprint2_id = BlueprintUuid::new_v4(); + let reassignment = qq.reassignment_start(); + assert_eq!(qq.fully_drained_blueprint(), Some(blueprint1_id)); + // Act like we assigned some sagas. + reassignment.reassignment_done( + SagaReassignmentDone::ReassignedAllAsOf(blueprint2_id, true), + ); + // We're not fully drained because we haven't recovered those sagas. + assert!(!qq.is_fully_drained()); + // So the fully drained blueprint is the one from before. + assert_eq!(qq.fully_drained_blueprint(), Some(blueprint1_id)); + + // Start a recovery pass. Pretend like we found a saga. + // We'll use a oneshot channel to emulate the saga completion future. + let (tx, rx) = tokio::sync::oneshot::channel(); + let recovery = qq.recovery_start(); + let pending = recovery.record_saga_recovery(*SAGA_ID, &SAGA_NAME); + let consumer_completion = pending.saga_completion_future( + async { rx.await.expect("cannot drop this before dropping tx") } + .boxed(), + ); + recovery.recovery_done(true); + + // We're still not fully drained because we haven't finished that saga. + assert!(!qq.is_fully_drained()); + // So the fully drained blueprint is the one from before. + assert_eq!(qq.fully_drained_blueprint(), Some(blueprint1_id)); + + // Fully drained case 2: saga completion causes us to become fully + // drained. + // + // Complete the saga. + tx.send(saga_result()).unwrap(); + let _ = consumer_completion.await; + // Now, we should be fully drained up to the new blueprint. 
+ assert!(qq.is_fully_drained()); + assert_eq!(qq.fully_drained_blueprint(), Some(blueprint2_id)); + + // Fully drained case 3: saga recovery causes us to become fully + // drained. + // + // For this case, imagine that we think we may have re-assigned + // ourselves some sagas, but recovery completes with no sagas + // outstanding. + let blueprint3_id = BlueprintUuid::new_v4(); + let reassignment = qq.reassignment_start(); + assert_eq!(qq.fully_drained_blueprint(), Some(blueprint2_id)); + // Act like we assigned some sagas. + reassignment.reassignment_done( + SagaReassignmentDone::ReassignedAllAsOf(blueprint3_id, true), + ); + assert!(!qq.is_fully_drained()); + assert_eq!(qq.fully_drained_blueprint(), Some(blueprint2_id)); + + // Quick check: failed recovery changes nothing. + qq.recovery_start().recovery_done(false); + assert!(!qq.is_fully_drained()); + assert_eq!(qq.fully_drained_blueprint(), Some(blueprint2_id)); + + // Successful recovery with no sagas running means we're fully drained + // as of the new blueprint. + qq.recovery_start().recovery_done(true); + assert_eq!(qq.fully_drained_blueprint(), Some(blueprint3_id)); + assert!(qq.is_fully_drained()); + + // Fully drained case 4: quiescing itself causes us to immediately + // become fully drained. + // + // This case requires a fresh handle, since the current one is already + // quiesced.
+ let blueprint4_id = BlueprintUuid::new_v4(); + let qq = SagaQuiesceHandle::new(log.clone()); + qq.reassignment_start().reassignment_done( + SagaReassignmentDone::ReassignedAllAsOf(blueprint4_id, true), + ); + qq.recovery_start().recovery_done(true); + assert!(qq.fully_drained_blueprint().is_none()); + assert!(!qq.is_fully_drained()); + + qq.set_quiescing(true); + assert_eq!(qq.fully_drained_blueprint(), Some(blueprint4_id)); + assert!(qq.is_fully_drained()); + + logctx.cleanup_successful(); + } } diff --git a/openapi/nexus-internal.json b/openapi/nexus-internal.json index dc31e3d3ca..e54c975e8e 100644 --- a/openapi/nexus-internal.json +++ b/openapi/nexus-internal.json @@ -6527,6 +6527,32 @@ "by_baseboard" ] }, + "PendingRecovery": { + "description": "Snapshot of reassignment state when a recovery pass started", + "type": "object", + "properties": { + "blueprint_id": { + "nullable": true, + "description": "which blueprint id we'd be fully caught up to upon completion", + "allOf": [ + { + "$ref": "#/components/schemas/TypedUuidForBlueprintKind" + } + ] + }, + "generation": { + "description": "what `reassignment_generation` was when this recovery started", + "allOf": [ + { + "$ref": "#/components/schemas/Generation" + } + ] + } + }, + "required": [ + "generation" + ] + }, "PendingSagaInfo": { "description": "Describes a pending saga (for debugging why quiesce is stuck)", "type": "object", @@ -7726,24 +7752,13 @@ }, "uniqueItems": true }, - "sagas_pending": { - "title": "IdOrdMap", - "description": "what sagas are currently running or known needing to be recovered\n\nThis should only be non-empty when state is `Running` or `WaitingForSagas`. 
Entries here prevent transitioning from `WaitingForSagas` to `WaitingForDb`.", - "x-rust-type": { - "crate": "iddqd", - "parameters": [ - { - "$ref": "#/components/schemas/PendingSagaInfo" - } - ], - "path": "iddqd::IdOrdMap", - "version": "*" - }, - "type": "array", - "items": { - "$ref": "#/components/schemas/PendingSagaInfo" - }, - "uniqueItems": true + "sagas": { + "description": "information about saga quiescing", + "allOf": [ + { + "$ref": "#/components/schemas/SagaQuiesceStatus" + } + ] }, "state": { "description": "what stage of quiescing is Nexus at", @@ -7756,7 +7771,7 @@ }, "required": [ "db_claims", - "sagas_pending", + "sagas", "state" ] }, @@ -8250,6 +8265,106 @@ } ] }, + "SagaQuiesceStatus": { + "type": "object", + "properties": { + "drained_blueprint_id": { + "nullable": true, + "description": "blueprint id that we're \"fully drained up to\"\n\nIf this value is non-`None`, that means that:\n\n- saga creation is disallowed - no sagas are running - we have re-assigned sagas from other Nexus instances expunged in this blueprint or earlier - we have finished recovery for all those sagas (that had been assigned to us as of the re-assignment pass for this blueprint id)\n\nThis means that the only way we can wind up running another saga is if there's a new blueprint that expunges a different Nexus zone.", + "allOf": [ + { + "$ref": "#/components/schemas/TypedUuidForBlueprintKind" + } + ] + }, + "first_recovery_complete": { + "description": "whether at least one recovery pass has successfully completed\n\nWe have to track this because we can't quiesce until we know we've recovered all outstanding sagas.", + "type": "boolean" + }, + "new_sagas_allowed": { + "description": "current policy: are we allowed to *create* new sagas?\n\nThis also affects re-assigning sagas from expunged Nexus instances to ourselves. 
It does **not** affect saga recovery.",
+          "allOf": [
+            {
+              "$ref": "#/components/schemas/SagasAllowed"
+            }
+          ]
+        },
+        "reassignment_blueprint_id": {
+          "nullable": true,
+          "description": "blueprint id associated with last successful saga reassignment\n\nSimilar to the generation number, this is used to track whether we've accounted for all sagas for all expungements up through this target blueprint.",
+          "allOf": [
+            {
+              "$ref": "#/components/schemas/TypedUuidForBlueprintKind"
+            }
+          ]
+        },
+        "reassignment_generation": {
+          "description": "generation number for the saga reassignment\n\nThis gets bumped whenever a saga reassignment operation completes that may have re-assigned us some sagas. It's used to keep track of when we've recovered all sagas that could be assigned to us.",
+          "allOf": [
+            {
+              "$ref": "#/components/schemas/Generation"
+            }
+          ]
+        },
+        "reassignment_pending": {
+          "description": "whether there is a saga reassignment operation happening\n\nThese operations may assign new sagas to Nexus that must be recovered and completed before quiescing can finish.",
+          "type": "boolean"
+        },
+        "recovered_blueprint_id": {
+          "nullable": true,
+          "description": "blueprint id that saga recovery has \"caught up to\"\n\nThis means that we have finished recovering any sagas that were re-assigned to us due to expungements of other Nexus zones up through this blueprint. Put differently: we know that we will never be assigned more sagas due to expungement unless the target blueprint changes past this one.\n\nThis does not mean that we've fully drained all sagas up through this blueprint. 
There may still be sagas running.", + "allOf": [ + { + "$ref": "#/components/schemas/TypedUuidForBlueprintKind" + } + ] + }, + "recovered_reassignment_generation": { + "description": "\"saga reassignment generation number\" that was \"caught up to\" by the last recovery pass\n\nThis is used with `reassignment_generation` to help us know when we've recovered all the sagas that may have been assigned to us during a given reassignment pass. See `reassignment_done()` for details.", + "allOf": [ + { + "$ref": "#/components/schemas/Generation" + } + ] + }, + "recovery_pending": { + "nullable": true, + "description": "If a recovery pass is ongoing, a snapshot of reassignment state when it started (which reflects what we'll be caught up to when it finishes)", + "allOf": [ + { + "$ref": "#/components/schemas/PendingRecovery" + } + ] + }, + "sagas_pending": { + "title": "IdOrdMap", + "description": "list of sagas we need to wait to complete before quiescing\n\nThese are basically running sagas. They may have been created in this Nexus process lifetime or created in another process and then recovered in this one.", + "x-rust-type": { + "crate": "iddqd", + "parameters": [ + { + "$ref": "#/components/schemas/PendingSagaInfo" + } + ], + "path": "iddqd::IdOrdMap", + "version": "*" + }, + "type": "array", + "items": { + "$ref": "#/components/schemas/PendingSagaInfo" + }, + "uniqueItems": true + } + }, + "required": [ + "first_recovery_complete", + "new_sagas_allowed", + "reassignment_generation", + "reassignment_pending", + "recovered_reassignment_generation", + "sagas_pending" + ] + }, "SagaResultsPage": { "description": "A single page of results", "type": "object", @@ -8357,6 +8472,32 @@ } ] }, + "SagasAllowed": { + "description": "Policy determining whether new sagas are allowed to be started\n\nThis is used by Nexus quiesce to disallow creation of new sagas when we're trying to quiesce Nexus.", + "oneOf": [ + { + "description": "New sagas may be started (normal condition)", + 
"type": "string", + "enum": [ + "allowed" + ] + }, + { + "description": "New sagas may not be started because we're quiescing or quiesced", + "type": "string", + "enum": [ + "disallowed_quiesce" + ] + }, + { + "description": "New sagas may not be started because we just started up and haven't determined if we're quiescing yet", + "type": "string", + "enum": [ + "disallowed_unknown" + ] + } + ] + }, "ServerId": { "description": "A unique ID for a Clickhouse Server", "type": "integer",