Skip to content

Commit 6731ce2

Browse files
authored
skip pruned TUF repos when creating artifact config (#9109)
Part of #7135.
1 parent a49aa1b commit 6731ce2

File tree

5 files changed

+157
-40
lines changed

5 files changed

+157
-40
lines changed

nexus/db-queries/src/db/datastore/update.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -414,6 +414,19 @@ impl DataStore {
414414
})
415415
}
416416

417+
/// List the artifacts present in a TUF repo.
418+
pub async fn tuf_list_repo_artifacts(
419+
&self,
420+
opctx: &OpContext,
421+
repo_id: TufRepoUuid,
422+
) -> ListResultVec<TufArtifact> {
423+
opctx.authorize(authz::Action::Read, &authz::FLEET).await?;
424+
let conn = self.pool_connection_authorized(opctx).await?;
425+
artifacts_for_repo(repo_id, &conn)
426+
.await
427+
.map_err(|e| public_error_from_diesel(e, ErrorHandler::Server))
428+
}
429+
417430
/// Returns the current TUF repo generation number.
418431
pub async fn tuf_get_generation(
419432
&self,

nexus/src/app/background/tasks/tuf_artifact_replication.rs

Lines changed: 23 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -62,15 +62,13 @@ use std::ops::ControlFlow;
6262
use std::str::FromStr;
6363
use std::sync::Arc;
6464

65-
use anyhow::{Context, Result};
65+
use anyhow::{Context, Result, ensure};
6666
use chrono::Utc;
6767
use futures::future::{BoxFuture, FutureExt};
6868
use futures::stream::{FuturesUnordered, Stream, StreamExt};
6969
use http::StatusCode;
7070
use nexus_auth::context::OpContext;
71-
use nexus_db_queries::db::{
72-
DataStore, datastore::SQL_BATCH_SIZE, pagination::Paginator,
73-
};
71+
use nexus_db_queries::db::DataStore;
7472
use nexus_networking::sled_client_from_address;
7573
use nexus_types::deployment::SledFilter;
7674
use nexus_types::identity::Asset;
@@ -79,7 +77,7 @@ use nexus_types::internal_api::background::{
7977
TufArtifactReplicationRequest, TufArtifactReplicationStatus,
8078
};
8179
use omicron_common::api::external::Generation;
82-
use omicron_uuid_kinds::{GenericUuid, SledUuid};
80+
use omicron_uuid_kinds::SledUuid;
8381
use rand::seq::{IndexedRandom, SliceRandom};
8482
use serde_json::json;
8583
use sled_agent_client::types::ArtifactConfig;
@@ -593,18 +591,26 @@ impl ArtifactReplication {
593591
opctx: &OpContext,
594592
) -> Result<(ArtifactConfig, Inventory)> {
595593
let generation = self.datastore.tuf_get_generation(opctx).await?;
594+
let repos =
595+
self.datastore.tuf_list_repos_unpruned_batched(opctx).await?;
596+
// `tuf_list_repos_unpruned_batched` performs pagination internally,
597+
// so check that the generation hasn't changed during our pagination to
598+
// ensure we got a consistent read.
599+
{
600+
let generation_now =
601+
self.datastore.tuf_get_generation(opctx).await?;
602+
ensure!(
603+
generation == generation_now,
604+
"generation changed from {generation} \
605+
to {generation_now}, bailing"
606+
);
607+
}
608+
596609
let mut inventory = Inventory::default();
597-
let mut paginator = Paginator::new(
598-
SQL_BATCH_SIZE,
599-
dropshot::PaginationOrder::Ascending,
600-
);
601-
while let Some(p) = paginator.next() {
602-
let batch = self
603-
.datastore
604-
.tuf_list_repos(opctx, generation, &p.current_pagparams())
605-
.await?;
606-
paginator = p.found_batch(&batch, &|a| a.id.into_untyped_uuid());
607-
for artifact in batch {
610+
for repo in repos {
611+
for artifact in
612+
self.datastore.tuf_list_repo_artifacts(opctx, repo.id()).await?
613+
{
608614
inventory.0.entry(artifact.sha256.0).or_insert_with(|| {
609615
ArtifactPresence { sleds: BTreeMap::new(), local: None }
610616
});
@@ -785,6 +791,7 @@ mod tests {
785791
use std::fmt::Write;
786792

787793
use expectorate::assert_contents;
794+
use omicron_uuid_kinds::GenericUuid;
788795
use rand::{Rng, SeedableRng, rngs::StdRng};
789796

790797
use super::*;

nexus/tests/integration_tests/updates.rs

Lines changed: 73 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -428,7 +428,7 @@ async fn test_repo_upload() -> Result<()> {
428428

429429
// Upload a new repository with a different system version but no other
430430
// changes. This should be accepted.
431-
{
431+
let initial_installinator_doc = {
432432
let tweaks = &[ManifestTweak::SystemVersion("2.0.0".parse().unwrap())];
433433
let response = trust_root
434434
.assemble_repo(&logctx.log, tweaks)
@@ -509,7 +509,9 @@ async fn test_repo_upload() -> Result<()> {
509509
description, get_description,
510510
"initial description matches fetched description"
511511
);
512-
}
512+
513+
installinator_doc_1
514+
};
513515
// The installinator document changed, so the generation number is bumped to
514516
// 3.
515517
assert_eq!(
@@ -525,6 +527,75 @@ async fn test_repo_upload() -> Result<()> {
525527
assert_eq!(status.last_run_counters.put_ok, 3);
526528
assert_eq!(status.last_run_counters.copy_ok, 1);
527529
assert_eq!(status.local_repos, 1);
530+
// Run the replication background task again; the local repos should be
531+
// dropped.
532+
let status =
533+
run_tuf_artifact_replication_step(&cptestctx.lockstep_client).await;
534+
eprintln!("{status:?}");
535+
assert_eq!(status.last_run_counters.put_config_ok, 4);
536+
assert_eq!(status.last_run_counters.list_ok, 4);
537+
assert_eq!(status.last_run_counters.sum(), 8);
538+
assert_eq!(status.local_repos, 0);
539+
540+
// Verify the initial installinator document is present on all sled-agents.
541+
let installinator_doc_hash = initial_installinator_doc.hash.to_string();
542+
for sled_agent in &cptestctx.sled_agents {
543+
for dir in sled_agent.sled_agent().artifact_store().storage_paths() {
544+
let path = dir.join(&installinator_doc_hash);
545+
assert!(path.exists(), "{path} does not exist");
546+
}
547+
}
548+
// Collect watchers for all of the sled-agent artifact delete reconcilers.
549+
let mut delete_watchers = cptestctx
550+
.sled_agents
551+
.iter()
552+
.map(|sled_agent| {
553+
sled_agent.sled_agent().artifact_store().subscribe_delete_done()
554+
})
555+
.collect::<Vec<_>>();
556+
// Manually prune the first repo.
557+
let initial_repo = datastore
558+
.tuf_repo_get_by_version(
559+
&opctx,
560+
"1.0.0".parse::<Version>().unwrap().into(),
561+
)
562+
.await?;
563+
let recent_releases =
564+
datastore.target_release_fetch_recent_distinct(&opctx, 3).await?;
565+
datastore
566+
.tuf_repo_mark_pruned(
567+
&opctx,
568+
status.generation,
569+
&recent_releases,
570+
initial_repo.repo.id(),
571+
)
572+
.await
573+
.unwrap();
574+
// Marking a repository as pruned bumps the generation number.
575+
assert_eq!(
576+
datastore.tuf_get_generation(&opctx).await.unwrap(),
577+
4u32.into()
578+
);
579+
// Run the replication background task; we should see new configs be put.
580+
let status =
581+
run_tuf_artifact_replication_step(&cptestctx.lockstep_client).await;
582+
eprintln!("{status:?}");
583+
assert_eq!(status.last_run_counters.put_config_ok, 4);
584+
assert_eq!(status.last_run_counters.list_ok, 4);
585+
assert_eq!(status.last_run_counters.sum(), 8);
586+
assert_eq!(status.generation, 4u32.into());
587+
// Wait for the delete reconciler to finish on all sled agents.
588+
futures::future::join_all(
589+
delete_watchers.iter_mut().map(|watcher| watcher.changed()),
590+
)
591+
.await;
592+
// Verify the installinator document from the initial repo is deleted.
593+
for sled_agent in &cptestctx.sled_agents {
594+
for dir in sled_agent.sled_agent().artifact_store().storage_paths() {
595+
let path = dir.join(&installinator_doc_hash);
596+
assert!(!path.exists(), "{path} was not deleted");
597+
}
598+
}
528599

529600
cptestctx.teardown().await;
530601
Ok(())

sled-agent/src/artifact_store.rs

Lines changed: 24 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -81,10 +81,6 @@ pub struct ArtifactStore<T: DatasetsManager> {
8181
ledger_tx: mpsc::Sender<LedgerManagerRequest>,
8282
config: watch::Receiver<Option<ArtifactConfig>>,
8383
pub(crate) storage: T,
84-
85-
/// Used for synchronization in unit tests.
86-
#[cfg(test)]
87-
delete_done: watch::Receiver<Generation>,
8884
}
8985

9086
impl<T: DatasetsManager> ArtifactStore<T> {
@@ -136,14 +132,10 @@ impl<T: DatasetsManager> ArtifactStore<T> {
136132
config_tx,
137133
));
138134

139-
#[cfg(test)]
140-
let (done_signal, delete_done) = watch::channel(0u32.into());
141135
tokio::task::spawn(delete_reconciler(
142136
log.clone(),
143137
storage.clone(),
144138
config.clone(),
145-
#[cfg(test)]
146-
done_signal,
147139
));
148140

149141
ArtifactStore {
@@ -155,9 +147,6 @@ impl<T: DatasetsManager> ArtifactStore<T> {
155147
ledger_tx,
156148
config,
157149
storage,
158-
159-
#[cfg(test)]
160-
delete_done,
161150
}
162151
}
163152
}
@@ -506,7 +495,6 @@ async fn delete_reconciler<T: DatasetsManager>(
506495
log: Logger,
507496
storage: T,
508497
mut receiver: watch::Receiver<Option<ArtifactConfig>>,
509-
#[cfg(test)] done_signal: watch::Sender<Generation>,
510498
) {
511499
while let Ok(()) = receiver.changed().await {
512500
let generation = match receiver.borrow_and_update().as_ref() {
@@ -580,12 +568,7 @@ async fn delete_reconciler<T: DatasetsManager>(
580568
}
581569
}
582570
}
583-
#[cfg(test)]
584-
done_signal.send_if_modified(|old| {
585-
let modified = *old != generation;
586-
*old = generation;
587-
modified
588-
});
571+
storage.signal_delete_done(generation);
589572
}
590573
warn!(log, "Delete reconciler sender dropped");
591574
}
@@ -602,6 +585,8 @@ pub trait DatasetsManager: Clone + Send + Sync + 'static {
602585
async fn copy_permit(&self) -> Option<OwnedSemaphorePermit> {
603586
None
604587
}
588+
589+
fn signal_delete_done(&self, _generation: Generation) {}
605590
}
606591

607592
impl DatasetsManager for InternalDisksReceiver {
@@ -908,16 +893,20 @@ mod test {
908893
use camino_tempfile::Utf8TempDir;
909894
use futures::stream::{self, StreamExt};
910895
use hex_literal::hex;
896+
use omicron_common::api::external::Generation;
911897
use omicron_test_utils::dev::test_setup_log;
912898
use sled_agent_api::ArtifactConfig;
913899
use tokio::io::AsyncReadExt;
914900
use tokio::sync::oneshot;
901+
use tokio::sync::watch;
915902
use tufaceous_artifact::ArtifactHash;
916903

917904
use super::{ArtifactStore, DatasetsManager, Error};
918905

919906
#[derive(Clone)]
920907
struct TestBackend {
908+
delete_done_tx: watch::Sender<Generation>,
909+
delete_done_rx: watch::Receiver<Generation>,
921910
datasets: Vec<Utf8PathBuf>,
922911
_tempdir: Arc<Utf8TempDir>,
923912
}
@@ -934,7 +923,13 @@ mod test {
934923
datasets.push(dataset)
935924
}
936925

937-
TestBackend { datasets, _tempdir: tempdir }
926+
let (delete_done_tx, delete_done_rx) = watch::channel(0u32.into());
927+
TestBackend {
928+
delete_done_tx,
929+
delete_done_rx,
930+
datasets,
931+
_tempdir: tempdir,
932+
}
938933
}
939934
}
940935

@@ -944,6 +939,14 @@ mod test {
944939
) -> impl Iterator<Item = camino::Utf8PathBuf> + '_ {
945940
self.datasets.iter().cloned()
946941
}
942+
943+
fn signal_delete_done(&self, generation: Generation) {
944+
self.delete_done_tx.send_if_modified(|old| {
945+
let modified = *old != generation;
946+
*old = generation;
947+
modified
948+
});
949+
}
947950
}
948951

949952
const TEST_ARTIFACT: Bytes = Bytes::from_static(b"I'm an artifact!\n");
@@ -1121,7 +1124,7 @@ mod test {
11211124
}
11221125

11231126
// clear `delete_done` so we can synchronize with the delete reconciler
1124-
store.delete_done.mark_unchanged();
1127+
store.storage.delete_done_rx.mark_unchanged();
11251128
// put a new config that says we don't want the artifact anymore.
11261129
config.generation = config.generation.next();
11271130
config.artifacts.remove(&TEST_HASH);
@@ -1130,7 +1133,7 @@ mod test {
11301133
// has actually occurred yet
11311134
assert!(store.list().await.unwrap().list.is_empty());
11321135
// wait for deletion to actually complete
1133-
store.delete_done.changed().await.unwrap();
1136+
store.storage.delete_done_rx.changed().await.unwrap();
11341137
// get fails, because it has been deleted
11351138
assert!(matches!(
11361139
store.get(TEST_HASH).await,

sled-agent/src/sim/artifact_store.rs

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,15 @@
77
88
use std::sync::Arc;
99

10+
use camino::Utf8Path;
1011
use camino_tempfile::Utf8TempDir;
1112
use dropshot::{
1213
Body, ConfigDropshot, FreeformBody, HttpError, HttpResponseOk, HttpServer,
1314
Path, RequestContext, ServerBuilder,
1415
};
16+
use omicron_common::api::external::Generation;
1517
use repo_depot_api::*;
16-
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
18+
use tokio::sync::{OwnedSemaphorePermit, Semaphore, watch};
1719

1820
use crate::artifact_store::{ArtifactStore, DatasetsManager};
1921

@@ -28,6 +30,10 @@ pub struct SimArtifactStorage {
2830
// Semaphore to keep track of how many copy requests are in flight, and to
2931
// be able to await on their completion. Used in integration tests.
3032
copy_semaphore: Arc<Semaphore>,
33+
34+
// Watch channel to be able to await on the delete reconciler completing in
35+
// integration tests.
36+
delete_done: watch::Sender<Generation>,
3137
}
3238

3339
impl SimArtifactStorage {
@@ -40,6 +46,7 @@ impl SimArtifactStorage {
4046
copy_semaphore: Arc::new(
4147
const { Semaphore::const_new(MAX_PERMITS as usize) },
4248
),
49+
delete_done: watch::Sender::new(0u32.into()),
4350
}
4451
}
4552
}
@@ -54,6 +61,14 @@ impl DatasetsManager for SimArtifactStorage {
5461
async fn copy_permit(&self) -> Option<OwnedSemaphorePermit> {
5562
Some(self.copy_semaphore.clone().acquire_owned().await.unwrap())
5663
}
64+
65+
fn signal_delete_done(&self, generation: Generation) {
66+
self.delete_done.send_if_modified(|old| {
67+
let modified = *old != generation;
68+
*old = generation;
69+
modified
70+
});
71+
}
5772
}
5873

5974
impl ArtifactStore<SimArtifactStorage> {
@@ -73,6 +88,10 @@ impl ArtifactStore<SimArtifactStorage> {
7388
.unwrap()
7489
}
7590

91+
pub fn storage_paths(&self) -> impl Iterator<Item = &Utf8Path> {
92+
self.storage.dirs.iter().map(|p| p.path())
93+
}
94+
7695
pub async fn wait_for_copy_tasks(&self) {
7796
// Acquire a permit for MAX_PERMITS, which requires that all copy tasks
7897
// have dropped their permits. Then immediately drop it.
@@ -83,6 +102,10 @@ impl ArtifactStore<SimArtifactStorage> {
83102
.await
84103
.unwrap();
85104
}
105+
106+
pub fn subscribe_delete_done(&self) -> watch::Receiver<Generation> {
107+
self.storage.delete_done.subscribe()
108+
}
86109
}
87110

88111
/// Implementation of the Repo Depot API backed by an

0 commit comments

Comments
 (0)