2 changes: 1 addition & 1 deletion Makefile
@@ -87,7 +87,7 @@ test-tmt:
# set of default rustc warnings.
# We intentionally don't gate on this for local builds in cargo.toml
# because it impedes iteration speed.
CLIPPY_CONFIG = -A clippy::all -D clippy::correctness -D clippy::suspicious -Dunused_imports -Ddead_code
CLIPPY_CONFIG = -A clippy::all -D clippy::correctness -D clippy::suspicious -D clippy::disallowed-methods -Dunused_imports -Ddead_code
validate-rust:
cargo fmt -- --check -l
cargo test --no-run
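Note that -D clippy::disallowed-methods only has an effect when a clippy.toml in the workspace enumerates the methods to reject. A hypothetical configuration sketch (the path and reason below are illustrative, not taken from this repository):

# Hypothetical clippy.toml; clippy::disallowed-methods fires only on
# methods listed here, and the reason is echoed in the lint message.
disallowed-methods = [
    { path = "std::env::set_var", reason = "not thread-safe; prefer setting vars on the spawned Command" },
]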
109 changes: 57 additions & 52 deletions crates/lib/src/deploy.rs
@@ -138,26 +138,29 @@ fn prefix_of_progress(p: &ImportProgress) -> &'static str {
}
}

/// Write container fetch progress to standard output.
async fn handle_layer_progress_print(
mut layers: tokio::sync::mpsc::Receiver<ostree_container::store::ImportProgress>,
mut layer_bytes: tokio::sync::watch::Receiver<Option<ostree_container::store::LayerProgress>>,
/// Configuration for layer progress printing
struct LayerProgressConfig {
layers: tokio::sync::mpsc::Receiver<ostree_container::store::ImportProgress>,
layer_bytes: tokio::sync::watch::Receiver<Option<ostree_container::store::LayerProgress>>,
digest: Box<str>,
n_layers_to_fetch: usize,
layers_total: usize,
bytes_to_download: u64,
bytes_total: u64,
prog: ProgressWriter,
quiet: bool,
) -> ProgressWriter {
}

/// Write container fetch progress to standard output.
async fn handle_layer_progress_print(mut config: LayerProgressConfig) -> ProgressWriter {
let start = std::time::Instant::now();
let mut total_read = 0u64;
let bar = indicatif::MultiProgress::new();
if quiet {
if config.quiet {
bar.set_draw_target(indicatif::ProgressDrawTarget::hidden());
}
let layers_bar = bar.add(indicatif::ProgressBar::new(
n_layers_to_fetch.try_into().unwrap(),
config.n_layers_to_fetch.try_into().unwrap(),
));
let byte_bar = bar.add(indicatif::ProgressBar::new(0));
// let byte_bar = indicatif::ProgressBar::new(0);
Expand Down Expand Up @@ -185,7 +188,7 @@ async fn handle_layer_progress_print(
tokio::select! {
// Always handle layer changes first.
biased;
layer = layers.recv() => {
layer = config.layers.recv() => {
if let Some(l) = layer {
let layer = descriptor_of_progress(&l);
let layer_type = prefix_of_progress(&l);
@@ -213,16 +216,16 @@
// Emit an event where bytes == total to signal completion.
subtask.bytes = layer_size;
subtasks.push(subtask.clone());
prog.send(Event::ProgressBytes {
config.prog.send(Event::ProgressBytes {
task: "pulling".into(),
description: format!("Pulling Image: {digest}").into(),
id: (*digest).into(),
bytes_cached: bytes_total - bytes_to_download,
description: format!("Pulling Image: {}", config.digest).into(),
id: (*config.digest).into(),
bytes_cached: config.bytes_total - config.bytes_to_download,
bytes: total_read,
bytes_total: bytes_to_download,
steps_cached: (layers_total - n_layers_to_fetch) as u64,
bytes_total: config.bytes_to_download,
steps_cached: (config.layers_total - config.n_layers_to_fetch) as u64,
steps: layers_bar.position(),
steps_total: n_layers_to_fetch as u64,
steps_total: config.n_layers_to_fetch as u64,
subtasks: subtasks.clone(),
}).await;
}
@@ -231,28 +234,28 @@
break
};
},
r = layer_bytes.changed() => {
r = config.layer_bytes.changed() => {
if r.is_err() {
// If the receiver is disconnected, then we're done
break
}
let bytes = {
let bytes = layer_bytes.borrow_and_update();
let bytes = config.layer_bytes.borrow_and_update();
bytes.as_ref().cloned()
};
if let Some(bytes) = bytes {
byte_bar.set_position(bytes.fetched);
subtask.bytes = byte_bar.position();
prog.send_lossy(Event::ProgressBytes {
config.prog.send_lossy(Event::ProgressBytes {
task: "pulling".into(),
description: format!("Pulling Image: {digest}").into(),
id: (*digest).into(),
bytes_cached: bytes_total - bytes_to_download,
description: format!("Pulling Image: {}", config.digest).into(),
id: (*config.digest).into(),
bytes_cached: config.bytes_total - config.bytes_to_download,
bytes: total_read + byte_bar.position(),
bytes_total: bytes_to_download,
steps_cached: (layers_total - n_layers_to_fetch) as u64,
bytes_total: config.bytes_to_download,
steps_cached: (config.layers_total - config.n_layers_to_fetch) as u64,
steps: layers_bar.position(),
steps_total: n_layers_to_fetch as u64,
steps_total: config.n_layers_to_fetch as u64,
subtasks: subtasks.clone().into_iter().chain([subtask.clone()]).collect(),
}).await;
}
@@ -280,25 +283,27 @@ async fn handle_layer_progress_print(
// Since the progress notifier closed, we know import has started
// use as a heuristic to begin import progress
// Cannot be lossy or it is dropped
prog.send(Event::ProgressSteps {
task: "importing".into(),
description: "Importing Image".into(),
id: (*digest).into(),
steps_cached: 0,
steps: 0,
steps_total: 1,
subtasks: [SubTaskStep {
subtask: "importing".into(),
config
.prog
.send(Event::ProgressSteps {
task: "importing".into(),
description: "Importing Image".into(),
id: "importing".into(),
completed: false,
}]
.into(),
})
.await;
id: (*config.digest).into(),
steps_cached: 0,
steps: 0,
steps_total: 1,
subtasks: [SubTaskStep {
subtask: "importing".into(),
description: "Importing Image".into(),
id: "importing".into(),
completed: false,
}]
.into(),
})
.await;

// Return the writer
prog
config.prog
}

/// Gather all bound images in all deployments, then prune the image store,
@@ -332,7 +337,7 @@ pub(crate) struct PreparedImportMeta {
}

pub(crate) enum PreparedPullResult {
Ready(PreparedImportMeta),
Ready(Box<PreparedImportMeta>),
AlreadyPresent(Box<ImageState>),
}

@@ -372,7 +377,7 @@ pub(crate) async fn prepare_for_pull(
prep,
};

Ok(PreparedPullResult::Ready(prepared_image))
Ok(PreparedPullResult::Ready(Box::new(prepared_image)))
}

#[context("Pulling")]
@@ -388,17 +393,17 @@ pub(crate) async fn pull_from_prepared(
let digest_imp = prepared_image.digest.clone();

let printer = tokio::task::spawn(async move {
handle_layer_progress_print(
layer_progress,
layer_byte_progress,
digest.as_ref().into(),
prepared_image.n_layers_to_fetch,
prepared_image.layers_total,
prepared_image.bytes_to_fetch,
prepared_image.bytes_total,
handle_layer_progress_print(LayerProgressConfig {
layers: layer_progress,
layer_bytes: layer_byte_progress,
digest: digest.as_ref().into(),
n_layers_to_fetch: prepared_image.n_layers_to_fetch,
layers_total: prepared_image.layers_total,
bytes_to_download: prepared_image.bytes_to_fetch,
bytes_total: prepared_image.bytes_total,
prog,
quiet,
)
})
.await
});
let import = prepared_image.imp.import(prepared_image.prep).await;
@@ -444,7 +449,7 @@ pub(crate) async fn pull(
match prepare_for_pull(repo, imgref, target_imgref).await? {
PreparedPullResult::AlreadyPresent(existing) => Ok(existing),
PreparedPullResult::Ready(prepared_image_meta) => {
Ok(pull_from_prepared(imgref, quiet, prog, prepared_image_meta).await?)
Ok(pull_from_prepared(imgref, quiet, prog, *prepared_image_meta).await?)
}
}
}
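A minimal standalone sketch (names illustrative, not from this PR) of why the Ready payload is boxed: a Rust enum is as large as its largest variant, so a big inline payload inflates every value of the type, which is what clippy::large_enum_variant flags.

// An enum is sized by its largest variant.
enum Unboxed {
    Ready([u8; 1024]), // forces every Unboxed to occupy >= 1024 bytes
    AlreadyPresent,
}

enum Boxed {
    Ready(Box<[u8; 1024]>), // variant now stores only a pointer
    AlreadyPresent,
}

fn main() {
    assert!(std::mem::size_of::<Boxed>() < std::mem::size_of::<Unboxed>());
    println!(
        "unboxed: {} bytes, boxed: {} bytes",
        std::mem::size_of::<Unboxed>(),
        std::mem::size_of::<Boxed>()
    );
}

The price is one extra allocation per pull and a deref at the call sites, which is what the *prepared_image_meta changes above account for.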
2 changes: 1 addition & 1 deletion crates/lib/src/install.rs
@@ -760,7 +760,7 @@ async fn install_container(
PreparedPullResult::AlreadyPresent(existing) => existing,
PreparedPullResult::Ready(image_meta) => {
check_disk_space(root_setup.physical_root.as_fd(), &image_meta, &spec_imgref)?;
pull_from_prepared(&spec_imgref, false, ProgressWriter::default(), image_meta).await?
pull_from_prepared(&spec_imgref, false, ProgressWriter::default(), *image_meta).await?
}
};

6 changes: 2 additions & 4 deletions crates/lib/src/spec.rs
@@ -98,10 +98,8 @@ fn canonicalize_reference(reference: Reference) -> Option<Reference> {
reference.tag()?;

// No digest? Also pass through.
let Some(digest) = reference.digest() else {
return None;
};

let digest = reference.digest()?;
// Otherwise, replace with the digest
Some(reference.clone_with_digest(digest.to_owned()))
}

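The spec.rs hunk swaps a let-else for the ? operator. A small self-contained sketch of the equivalence, using an illustrative function not taken from the PR:

// In a function returning Option, `let Some(x) = expr else { return None; }`
// and `let x = expr?;` behave identically.
fn first_char_upper(s: &str) -> Option<char> {
    let c = s.chars().next()?; // same as a let-else that returns None
    Some(c.to_ascii_uppercase())
}

fn main() {
    assert_eq!(first_char_upper("abc"), Some('A'));
    assert_eq!(first_char_upper(""), None);
}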
8 changes: 4 additions & 4 deletions crates/sysusers/src/lib.rs
@@ -128,10 +128,10 @@ impl SysusersEntry {
fn next_token(s: &str) -> Option<(&str, &str)> {
let s = s.trim_start();
let (first, rest) = match s.strip_prefix('"') {
None => {
let idx = s.find(|c: char| c.is_whitespace()).unwrap_or(s.len());
s.split_at(idx)
}
None => match s.find(|c: char| c.is_whitespace()) {
Some(idx) => s.split_at(idx),
None => (s, ""),
},
Some(rest) => {
let end = rest.find('"')?;
(&rest[..end], &rest[end + 1..])
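The sysusers change rewrites the unquoted-token branch from unwrap_or + split_at into an explicit match. A standalone sketch (simplified from the real next_token) suggesting the two forms agree, since split_at(s.len()) already yields (s, ""):

fn token_old(s: &str) -> (&str, &str) {
    // No whitespace found => idx == s.len() => split_at gives (s, "").
    let idx = s.find(|c: char| c.is_whitespace()).unwrap_or(s.len());
    s.split_at(idx)
}

fn token_new(s: &str) -> (&str, &str) {
    match s.find(|c: char| c.is_whitespace()) {
        Some(idx) => s.split_at(idx),
        None => (s, ""),
    }
}

fn main() {
    for s in ["user rest", "user", ""] {
        assert_eq!(token_old(s), token_new(s));
    }
}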
72 changes: 37 additions & 35 deletions crates/tmpfiles/src/lib.rs
@@ -263,14 +263,16 @@ pub fn var_to_tmpfiles<U: uzers::Users, G: uzers::Groups>(
let mut prefix = PathBuf::from("/var");
let mut unsupported = Vec::new();
convert_path_to_tmpfiles_d_recurse(
&TmpfilesConvertConfig {
users,
groups,
rootfs,
existing: &existing_tmpfiles,
readonly: false,
},
&mut entries,
&mut unsupported,
users,
groups,
rootfs,
&existing_tmpfiles,
&mut prefix,
false,
)?;

// If there's no entries, don't write a file
@@ -309,52 +311,59 @@ pub fn var_to_tmpfiles<U: uzers::Users, G: uzers::Groups>(
})
}

/// Configuration for recursive tmpfiles conversion
struct TmpfilesConvertConfig<'a, U: uzers::Users, G: uzers::Groups> {
users: &'a U,
groups: &'a G,
rootfs: &'a Dir,
existing: &'a BTreeMap<PathBuf, String>,
readonly: bool,
}

/// Recursively explore target directory and translate content to tmpfiles.d entries. See
/// `convert_var_to_tmpfiles_d` for more background.
///
/// This proceeds depth-first and progressively deletes translated subpaths as it goes.
/// `prefix` is updated at each recursive step, so that in case of errors it can be
/// used to pinpoint the faulty path.
fn convert_path_to_tmpfiles_d_recurse<U: uzers::Users, G: uzers::Groups>(
config: &TmpfilesConvertConfig<'_, U, G>,
out_entries: &mut BTreeSet<String>,
out_unsupported: &mut Vec<PathBuf>,
users: &U,
groups: &G,
rootfs: &Dir,
existing: &BTreeMap<PathBuf, String>,
prefix: &mut PathBuf,
readonly: bool,
) -> Result<()> {
let relpath = prefix.strip_prefix("/").unwrap();
for subpath in rootfs.read_dir(relpath)? {
for subpath in config.rootfs.read_dir(relpath)? {
let subpath = subpath?;
let meta = subpath.metadata()?;
let fname = subpath.file_name();
prefix.push(fname);

let has_tmpfiles_entry = existing.contains_key(prefix);
let has_tmpfiles_entry = config.existing.contains_key(prefix);

// Translate this file entry.
if !has_tmpfiles_entry {
let entry = {
// SAFETY: We know this path is absolute
let relpath = prefix.strip_prefix("/").unwrap();
let Some(tmpfiles_meta) = FileMeta::from_fs(rootfs, &relpath)? else {
let Some(tmpfiles_meta) = FileMeta::from_fs(config.rootfs, &relpath)? else {
out_unsupported.push(relpath.into());
assert!(prefix.pop());
continue;
};
let uid = meta.uid();
let gid = meta.gid();
let user = users
let user = config
.users
.get_user_by_uid(meta.uid())
.ok_or(Error::UserNotFound(uid))?;
let username = user.name();
let username: &str = username.to_str().ok_or_else(|| Error::NonUtf8User {
uid,
name: username.to_string_lossy().into_owned(),
})?;
let group = groups
let group = config
.groups
.get_group_by_gid(gid)
.ok_or(Error::GroupNotFound(gid))?;
let groupname = group.name();
@@ -371,27 +380,18 @@ fn convert_path_to_tmpfiles_d_recurse<U: uzers::Users, G: uzers::Groups>(
// SAFETY: We know this path is absolute
let relpath = prefix.strip_prefix("/").unwrap();
// Avoid traversing mount points by default
if rootfs.open_dir_noxdev(relpath)?.is_some() {
convert_path_to_tmpfiles_d_recurse(
out_entries,
out_unsupported,
users,
groups,
rootfs,
existing,
prefix,
readonly,
)?;
if config.rootfs.open_dir_noxdev(relpath)?.is_some() {
convert_path_to_tmpfiles_d_recurse(config, out_entries, out_unsupported, prefix)?;
let relpath = prefix.strip_prefix("/").unwrap();
if !readonly {
rootfs.remove_dir_all(relpath)?;
if !config.readonly {
config.rootfs.remove_dir_all(relpath)?;
}
}
} else {
// SAFETY: We know this path is absolute
let relpath = prefix.strip_prefix("/").unwrap();
if !readonly {
rootfs.remove_file(relpath)?;
if !config.readonly {
config.rootfs.remove_file(relpath)?;
}
}
assert!(prefix.pop());
@@ -435,14 +435,16 @@ pub fn find_missing_tmpfiles_current_root() -> Result<TmpfilesResult> {
let mut tmpfiles = BTreeSet::new();
let mut unsupported = Vec::new();
convert_path_to_tmpfiles_d_recurse(
&TmpfilesConvertConfig {
users: &usergroups,
groups: &usergroups,
rootfs: &rootfs,
existing: &existing_tmpfiles,
readonly: true,
},
&mut tmpfiles,
&mut unsupported,
&usergroups,
&usergroups,
&rootfs,
&existing_tmpfiles,
&mut prefix,
true,
)?;
Ok(TmpfilesResult {
tmpfiles,
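Both deploy.rs (LayerProgressConfig) and this file (TmpfilesConvertConfig) apply the same refactor: a long positional parameter list becomes a context struct, so recursive calls forward a single borrow and call sites name every field. A generic sketch under illustrative names, not the PR's types:

// Read-only context bundled into one struct; mutable outputs stay as
// separate parameters, mirroring the shape used in the hunks above.
struct ConvertCtx<'a> {
    rootfs: &'a str,
    readonly: bool,
}

fn convert_recurse(ctx: &ConvertCtx<'_>, out: &mut Vec<String>, depth: u32) {
    if depth == 0 {
        return;
    }
    out.push(format!("{} (ro={})", ctx.rootfs, ctx.readonly));
    // The context is simply re-borrowed for the recursive call.
    convert_recurse(ctx, out, depth - 1);
}

fn main() {
    let mut out = Vec::new();
    convert_recurse(&ConvertCtx { rootfs: "/var", readonly: true }, &mut out, 2);
    assert_eq!(out.len(), 2);
}

Besides quieting clippy::too_many_arguments-style complaints, this makes it impossible to transpose two same-typed arguments (e.g. bytes_to_download and bytes_total) at a call site.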