diff --git a/Cargo.toml b/Cargo.toml index 48dc2a89..b43e5f98 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,6 +19,7 @@ composefs = { version = "0.3.0", path = "crates/composefs", default-features = f composefs-oci = { version = "0.3.0", path = "crates/composefs-oci", default-features = false } composefs-boot = { version = "0.3.0", path = "crates/composefs-boot", default-features = false } composefs-http = { version = "0.3.0", path = "crates/composefs-http", default-features = false } +composefs-ostree = { version = "0.3.0", path = "crates/composefs-ostree", default-features = false } [profile.dev.package.sha2] # this is *really* slow otherwise diff --git a/crates/cfsctl/Cargo.toml b/crates/cfsctl/Cargo.toml index 4901f490..6ffe0f66 100644 --- a/crates/cfsctl/Cargo.toml +++ b/crates/cfsctl/Cargo.toml @@ -11,9 +11,10 @@ rust-version.workspace = true version.workspace = true [features] -default = ['pre-6.15', 'oci'] +default = ['pre-6.15', 'oci','ostree'] http = ['composefs-http'] oci = ['composefs-oci'] +ostree = ['composefs-ostree'] rhel9 = ['composefs/rhel9'] 'pre-6.15' = ['composefs/pre-6.15'] @@ -24,6 +25,7 @@ composefs = { workspace = true } composefs-boot = { workspace = true } composefs-oci = { workspace = true, optional = true } composefs-http = { workspace = true, optional = true } +composefs-ostree = { workspace = true, optional = true } env_logger = { version = "0.11.0", default-features = false } hex = { version = "0.4.0", default-features = false } rustix = { version = "1.0.0", default-features = false, features = ["fs", "process"] } diff --git a/crates/cfsctl/src/main.rs b/crates/cfsctl/src/main.rs index dd66a949..7018c539 100644 --- a/crates/cfsctl/src/main.rs +++ b/crates/cfsctl/src/main.rs @@ -92,6 +92,33 @@ enum OciCommand { }, } +#[cfg(feature = "ostree")] +#[derive(Debug, Subcommand)] +enum OstreeCommand { + PullLocal { + repo_path: PathBuf, + ostree_ref: String, + name: Option, + #[clap(long)] + base_name: Option, + }, + Pull { + repo_url: String, + ostree_ref: String, + name: Option, + #[clap(long)] + base_name: Option, + }, + CreateImage { + commit_name: String, + #[clap(long)] + image_name: Option, + }, + Inspect { + commit_name: String, + }, +} + #[derive(Debug, Subcommand)] enum Command { /// Take a transaction lock on the repository. 
@@ -114,6 +141,12 @@ enum Command { #[clap(subcommand)] cmd: OciCommand, }, + /// Commands for dealing with OSTree commits + #[cfg(feature = "ostree")] + Ostree { + #[clap(subcommand)] + cmd: OstreeCommand, + }, /// Mounts a composefs, possibly enforcing fsverity of the image Mount { /// the name of the image to mount, either a sha256 digest or prefixed with 'ref/' @@ -188,7 +221,7 @@ async fn main() -> Result<()> { } } Command::Cat { name } => { - repo.merge_splitstream(&name, None, &mut std::io::stdout())?; + repo.merge_splitstream(&name, None, None, &mut std::io::stdout())?; } Command::ImportImage { reference } => { let image_id = repo.import_image(&reference, &mut std::io::stdin())?; @@ -311,6 +344,54 @@ async fn main() -> Result<()> { create_dir_all(state.join("etc/work"))?; } }, + #[cfg(feature = "ostree")] + Command::Ostree { cmd: ostree_cmd } => match ostree_cmd { + OstreeCommand::PullLocal { + ref repo_path, + ref ostree_ref, + name, + base_name, + } => { + let verity = composefs_ostree::pull_local( + &Arc::new(repo), + repo_path, + ostree_ref, + name.as_deref(), + base_name.as_deref(), + ) + .await?; + + println!("verity {}", verity.to_hex()); + } + OstreeCommand::Pull { + ref repo_url, + ref ostree_ref, + name, + base_name, + } => { + let verity = composefs_ostree::pull( + &Arc::new(repo), + repo_url, + ostree_ref, + name.as_deref(), + base_name.as_deref(), + ) + .await?; + + println!("verity {}", verity.to_hex()); + } + OstreeCommand::CreateImage { + ref commit_name, + ref image_name, + } => { + let mut fs = composefs_ostree::create_filesystem(&repo, commit_name)?; + let image_id = fs.commit_image(&repo, image_name.as_deref())?; + println!("{}", image_id.to_id()); + } + OstreeCommand::Inspect { ref commit_name } => { + composefs_ostree::inspect(&repo, commit_name)?; + } + }, Command::ComputeId { ref path, bootable, diff --git a/crates/composefs-http/src/lib.rs b/crates/composefs-http/src/lib.rs index 1d983876..390d4a02 100644 --- a/crates/composefs-http/src/lib.rs +++ b/crates/composefs-http/src/lib.rs @@ -13,9 +13,7 @@ use sha2::{Digest, Sha256}; use tokio::task::JoinSet; use composefs::{ - fsverity::FsVerityHashValue, - repository::Repository, - splitstream::{DigestMapEntry, SplitStreamReader}, + fsverity::FsVerityHashValue, repository::Repository, splitstream::SplitStreamReader, util::Sha256Digest, }; @@ -61,7 +59,7 @@ impl Downloader { } fn open_splitstream(&self, id: &ObjectID) -> Result> { - SplitStreamReader::new(File::from(self.repo.open_object(id)?)) + SplitStreamReader::new(File::from(self.repo.open_object(id)?), None) } fn read_object(&self, id: &ObjectID) -> Result> { @@ -107,7 +105,7 @@ impl Downloader { // this part is fast: it only touches the header let mut reader = self.open_splitstream(&id)?; - for DigestMapEntry { verity, body } in &reader.refs.map { + for (body, verity) in reader.iter_mappings() { match splitstreams.insert(verity.clone(), Some(*body)) { // This is the (normal) case if we encounter a splitstream we didn't see yet... 
None => { diff --git a/crates/composefs-oci/src/image.rs b/crates/composefs-oci/src/image.rs index 93c8c756..8a36bdfa 100644 --- a/crates/composefs-oci/src/image.rs +++ b/crates/composefs-oci/src/image.rs @@ -9,6 +9,7 @@ use composefs::{ tree::{Directory, FileSystem, Inode, Leaf}, }; +use crate::skopeo::{OCI_CONFIG_CONTENT_TYPE, TAR_LAYER_CONTENT_TYPE}; use crate::tar::{TarEntry, TarItem}; pub fn process_entry( @@ -74,14 +75,19 @@ pub fn create_filesystem( ) -> Result> { let mut filesystem = FileSystem::default(); - let mut config_stream = repo.open_stream(config_name, config_verity)?; + let mut config_stream = + repo.open_stream(config_name, config_verity, Some(OCI_CONFIG_CONTENT_TYPE))?; let config = ImageConfiguration::from_reader(&mut config_stream)?; for diff_id in config.rootfs().diff_ids() { let layer_sha256 = super::sha256_from_digest(diff_id)?; let layer_verity = config_stream.lookup(&layer_sha256)?; - let mut layer_stream = repo.open_stream(&hex::encode(layer_sha256), Some(layer_verity))?; + let mut layer_stream = repo.open_stream( + &hex::encode(layer_sha256), + Some(layer_verity), + Some(TAR_LAYER_CONTENT_TYPE), + )?; while let Some(entry) = crate::tar::get_entry(&mut layer_stream)? { process_entry(&mut filesystem, entry)?; } diff --git a/crates/composefs-oci/src/lib.rs b/crates/composefs-oci/src/lib.rs index 4617f905..1481050d 100644 --- a/crates/composefs-oci/src/lib.rs +++ b/crates/composefs-oci/src/lib.rs @@ -15,6 +15,7 @@ use composefs::{ util::{parse_sha256, Sha256Digest}, }; +use crate::skopeo::{OCI_CONFIG_CONTENT_TYPE, TAR_LAYER_CONTENT_TYPE}; use crate::tar::get_entry; type ContentAndVerity = (Sha256Digest, ObjectID); @@ -39,14 +40,19 @@ pub fn import_layer( name: Option<&str>, tar_stream: &mut impl Read, ) -> Result { - repo.ensure_stream(sha256, |writer| tar::split(tar_stream, writer), name) + repo.ensure_stream( + sha256, + TAR_LAYER_CONTENT_TYPE, + |writer| tar::split(tar_stream, writer), + name, + ) } pub fn ls_layer( repo: &Repository, name: &str, ) -> Result<()> { - let mut split_stream = repo.open_stream(name, None)?; + let mut split_stream = repo.open_stream(name, None, Some(TAR_LAYER_CONTENT_TYPE))?; while let Some(entry) = get_entry(&mut split_stream)? { println!("{entry}"); @@ -81,9 +87,9 @@ pub fn open_config( .with_context(|| format!("Object {name} is unknown to us"))? 
} }; - let mut stream = repo.open_stream(name, Some(id))?; + let mut stream = repo.open_stream(name, Some(id), Some(OCI_CONFIG_CONTENT_TYPE))?; let config = ImageConfiguration::from_reader(&mut stream)?; - Ok((config, stream.refs)) + Ok((config, stream.get_mappings())) } fn hash(bytes: &[u8]) -> Sha256Digest { @@ -104,7 +110,7 @@ pub fn open_config_shallow( // we need to manually check the content digest let expected_hash = parse_sha256(name) .context("Containers must be referred to by sha256 if verity is missing")?; - let mut stream = repo.open_stream(name, None)?; + let mut stream = repo.open_stream(name, None, Some(OCI_CONFIG_CONTENT_TYPE))?; let mut raw_config = vec![]; stream.read_to_end(&mut raw_config)?; ensure!(hash(&raw_config) == expected_hash, "Data integrity issue"); @@ -121,7 +127,8 @@ pub fn write_config( let json = config.to_string()?; let json_bytes = json.as_bytes(); let sha256 = hash(json_bytes); - let mut stream = repo.create_stream(Some(sha256), Some(refs)); + let mut stream = repo.create_stream(OCI_CONFIG_CONTENT_TYPE, Some(sha256)); + stream.add_sha256_mappings(refs); stream.write_inline(json_bytes); let id = repo.write_stream(stream, None)?; Ok((sha256, id)) @@ -199,7 +206,7 @@ mod test { let id = import_layer(&repo, &layer_id, Some("name"), &mut layer.as_slice()).unwrap(); let mut dump = String::new(); - let mut split_stream = repo.open_stream("refs/name", Some(&id)).unwrap(); + let mut split_stream = repo.open_stream("refs/name", Some(&id), None).unwrap(); while let Some(entry) = tar::get_entry(&mut split_stream).unwrap() { writeln!(dump, "{entry}").unwrap(); } diff --git a/crates/composefs-oci/src/skopeo.rs b/crates/composefs-oci/src/skopeo.rs index 62d981fd..2f137448 100644 --- a/crates/composefs-oci/src/skopeo.rs +++ b/crates/composefs-oci/src/skopeo.rs @@ -10,12 +10,13 @@ use oci_spec::image::{Descriptor, ImageConfiguration, ImageManifest, MediaType}; use rustix::process::geteuid; use tokio::{io::AsyncReadExt, sync::Semaphore}; -use composefs::{ - fsverity::FsVerityHashValue, repository::Repository, splitstream::DigestMap, util::Sha256Digest, -}; +use composefs::{fsverity::FsVerityHashValue, repository::Repository, util::Sha256Digest}; use crate::{sha256_from_descriptor, sha256_from_digest, tar::split_async, ContentAndVerity}; +pub const TAR_LAYER_CONTENT_TYPE: u64 = 0x2a037edfcae1ffea; +pub const OCI_CONFIG_CONTENT_TYPE: u64 = 0x44218c839727a80b; + struct ImageOp { repo: Arc>, proxy: ImageProxy, @@ -78,7 +79,9 @@ impl ImageOp { self.progress .println(format!("Fetching layer {}", hex::encode(layer_sha256)))?; - let mut splitstream = self.repo.create_stream(Some(layer_sha256), None); + let mut splitstream = self + .repo + .create_stream(TAR_LAYER_CONTENT_TYPE, Some(layer_sha256)); match descriptor.media_type() { MediaType::ImageLayer => { split_async(progress, &mut splitstream).await?; @@ -155,15 +158,15 @@ impl ImageOp { entries.push((layer_sha256, future)); } + let mut splitstream = self + .repo + .create_stream(OCI_CONFIG_CONTENT_TYPE, Some(config_sha256)); + // Collect the results. 
- let mut config_maps = DigestMap::new(); for (layer_sha256, future) in entries { - config_maps.insert(&layer_sha256, &future.await??); + splitstream.add_sha256_mapping(&layer_sha256, &future.await??); } - let mut splitstream = self - .repo - .create_stream(Some(config_sha256), Some(config_maps)); splitstream.write_inline(&raw_config); let config_id = self.repo.write_stream(splitstream, None)?; diff --git a/crates/composefs-ostree/Cargo.toml b/crates/composefs-ostree/Cargo.toml new file mode 100644 index 00000000..24c07db3 --- /dev/null +++ b/crates/composefs-ostree/Cargo.toml @@ -0,0 +1,29 @@ +[package] +name = "composefs-ostree" +description = "ostree support for composefs" +keywords = ["composefs", "ostree"] + +edition.workspace = true +license.workspace = true +readme.workspace = true +repository.workspace = true +rust-version.workspace = true +version.workspace = true + +[dependencies] +anyhow = { version = "1.0.87", default-features = false } +composefs = { workspace = true } +configparser = { version = "3.1.0", features = [] } +flate2 = { version = "1.1.2", default-features = true } +gvariant = { version = "0.5.0", default-features = true} +hex = { version = "0.4.0", default-features = false, features = ["std"] } +rustix = { version = "1.0.0", default-features = false, features = ["fs", "mount", "process", "std"] } +sha2 = { version = "0.10.1", default-features = false } +zerocopy = { version = "0.8.0", default-features = false, features = ["derive", "std"] } +reqwest = { version = "0.12.15", features = ["zstd"] } + +[dev-dependencies] +similar-asserts = "1.7.0" + +[lints] +workspace = true diff --git a/crates/composefs-ostree/src/commit.rs b/crates/composefs-ostree/src/commit.rs new file mode 100644 index 00000000..b75576ab --- /dev/null +++ b/crates/composefs-ostree/src/commit.rs @@ -0,0 +1,534 @@ +/* Commit objects are stored in a splitstream with the content being just the + * commit data. This means that the content will match the ostree commit id. + * + * Additionally there is an objmap splitstream referenced via the splitstream's + * external references. This objmap contains all the external objects referenced + * by the commit. + */ +use anyhow::{bail, Error, Result}; +use composefs::{ + fsverity::FsVerityHashValue, + repository::Repository, + tree::{Directory, FileSystem, Inode, Leaf, LeafContent, RegularFile, Stat}, + util::Sha256Digest, +}; +use gvariant::aligned_bytes::{AlignedBuf, AlignedSlice, AsAligned, TryAsAligned, A8}; +use gvariant::{gv, Marker, Structure}; +use sha2::{Digest, Sha256}; +use std::{ + collections::{BTreeMap, HashSet, VecDeque}, + ffi::OsStr, + os::unix::ffi::OsStrExt, +}; +use std::{fmt, io::Read, sync::Arc}; + +use crate::objmap::{ObjectMapReader, ObjectMapWriter}; +use crate::repo::{split_sized_variant, ObjectType, OstreeRepo}; + +pub const COMMIT_CONTENT_TYPE: u64 = 0xc72d30f121a31936; + +const S_IFMT: u32 = 0o170000; +const S_IFLNK: u32 = 0o120000; + +#[derive(Debug)] +pub struct OstreeCommit { + data: AlignedBuf, + objmap_id: ObjectID, + objmap: ObjectMapReader, +} + +impl OstreeCommit { + pub fn load(repo: &Repository, commit_name: &str) -> Result { + let mut commit_stream = repo.open_stream(commit_name, None, Some(COMMIT_CONTENT_TYPE))?; + let mut buffer = Vec::new(); + commit_stream.read_to_end(&mut buffer)?; + + // TODO: Should we somehow validate the checksum of the commit? + // We don't have anything (other than the filename) to really tie it down though. + // Maybe GPG-validate it per the ostree metadata?
+ + let Some((_objmap_sha, objmap_id)) = commit_stream.iter_mappings().next() else { + bail!("Missing objmap id mapping") + }; + + let objmap = ObjectMapReader::::load(repo, &objmap_id)?; + + Ok(OstreeCommit { + data: buffer.into(), + objmap_id: objmap_id.clone(), + objmap: objmap, + }) + } + + fn create_filesystem_file(&self, id: &Sha256Digest) -> Result> { + let (maybe_obj_id, file_header) = self.objmap.lookup(id).ok_or(Error::msg(format!( + "Unexpectedly missing ostree file object {}", + hex::encode(id) + )))?; + + let (_sized_data, variant_data, remaining_data) = split_sized_variant(&file_header)?; + + let data = gv!("(tuuuusa(ayay))").cast(variant_data.try_as_aligned()?); + let (size, uid, gid, mode, _zero, symlink_target, xattrs_data) = data.to_tuple(); + let mut xattrs = BTreeMap::, Box<[u8]>>::new(); + for x in xattrs_data.iter() { + let (key, value) = x.to_tuple(); + xattrs.insert(OsStr::from_bytes(key).into(), Box::from(value)); + } + + let stat = Stat { + st_mode: u32::from_be(*mode), + st_uid: u32::from_be(*uid), + st_gid: u32::from_be(*gid), + st_mtim_sec: 0, + xattrs: xattrs.into(), + }; + + let content = if (stat.st_mode & S_IFMT) == S_IFLNK { + LeafContent::Symlink(OsStr::new(symlink_target.to_str()).into()) + } else { + let file = if let Some(obj_id) = maybe_obj_id { + if remaining_data.len() > 0 { + bail!("Unexpected trailing file data"); + } + RegularFile::External(obj_id.clone(), u64::from_be(*size)) + } else { + RegularFile::Inline(remaining_data.into()) + }; + LeafContent::Regular(file) + }; + + Ok(Leaf { stat, content }) + } + + fn create_filesystem_dir( + &self, + dirtree_id: &Sha256Digest, + dirmeta_id: &Sha256Digest, + ) -> Result> { + let (_obj_id, dirmeta) = + self.objmap + .lookup(dirmeta_id.into()) + .ok_or(Error::msg(format!( + "Unexpectedly missing ostree dirmeta object {}", + hex::encode(dirmeta_id) + )))?; + let (_obj_id, dirtree) = + self.objmap + .lookup(dirtree_id.into()) + .ok_or(Error::msg(format!( + "Unexpectedly missing ostree dirtree object {}", + hex::encode(dirtree_id) + )))?; + + let dirmeta_sha = Sha256::digest(dirmeta); + if *dirmeta_sha != *dirmeta_id { + bail!( + "Invalid dirmeta checksum {:?}, expected {:?}", + dirmeta_sha, + dirmeta_id + ); + } + let dirtree_sha = Sha256::digest(dirtree); + if *dirtree_sha != *dirtree_id { + bail!( + "Invalid dirtree checksum {:?}, expected {:?}", + dirtree_sha, + dirtree_id + ); + } + + let data = gv!("(uuua(ayay))").cast(dirmeta.as_aligned()); + let (uid, gid, mode, xattrs_data) = data.to_tuple(); + let mut xattrs = BTreeMap::, Box<[u8]>>::new(); + for x in xattrs_data.iter() { + let (key, value) = x.to_tuple(); + xattrs.insert(OsStr::from_bytes(key).into(), Box::from(value)); + } + + let stat = Stat { + st_mode: u32::from_be(*mode), + st_uid: u32::from_be(*uid), + st_gid: u32::from_be(*gid), + st_mtim_sec: 0, + xattrs: xattrs.into(), + }; + + let mut directory = Directory::new(stat); + + let tree_data = gv!("(a(say)a(sayay))").cast(dirtree.as_aligned()); + let (files_data, dirs_data) = tree_data.to_tuple(); + + for f in files_data.iter() { + let (name, checksum) = f.to_tuple(); + + let file = self.create_filesystem_file(checksum.try_into()?)?; + directory.insert(OsStr::new(name.to_str()), Inode::Leaf(file.into())); + } + + for d in dirs_data.iter() { + let (name, tree_checksum, meta_checksum) = d.to_tuple(); + + let subdir = + self.create_filesystem_dir(tree_checksum.try_into()?, meta_checksum.try_into()?)?; + + directory.insert( + OsStr::new(name.to_str()), + Inode::Directory(Box::new(subdir)), + ); + 
} + + Ok(directory) + } + + pub fn create_filesystem(&self) -> Result> { + let data = gv!("(a{sv}aya(say)sstayay)").cast(&self.data); + let ( + _metadata_data, + _parent_checksum, + _related_objects, + _subject, + _body, + _timestamp, + root_tree, + root_metadata, + ) = data.to_tuple(); + + let root = self.create_filesystem_dir(root_tree.try_into()?, root_metadata.try_into()?)?; + + Ok(FileSystem:: { + root: root, + have_root_stat: true, + }) + } + + fn lookup_dirmeta(&self, id: &Sha256Digest) -> Option<&AlignedSlice> { + if let Some((None, data)) = self.objmap.lookup(id) { + Some(data) + } else { + None + } + } + + fn lookup_dirtree(&self, id: &Sha256Digest) -> Option<&AlignedSlice> { + if let Some((None, data)) = self.objmap.lookup(id) { + Some(data) + } else { + None + } + } + + fn lookup_file(&self, id: &Sha256Digest) -> Option<(&AlignedSlice, Option<&ObjectID>)> { + if let Some((objectid, data)) = self.objmap.lookup(id) { + Some((data, objectid)) + } else { + None + } + } + + pub(crate) fn inspect(&self) { + println!("objmap id: {:?}", &self.objmap_id); + for (ostree_digest, maybe_obj_id, _data) in self.objmap.iter() { + if let Some(obj_id) = maybe_obj_id { + println!("Ostree {} => {:?}", hex::encode(ostree_digest), obj_id); + } + } + } +} + +#[derive(Debug)] +pub struct OstreeCommitWriter { + repo: Arc>, + objmap: ObjectMapWriter, + commit_id: Option, + commit: Option, +} + +impl OstreeCommitWriter { + pub fn new(repo: &Arc>) -> Self { + OstreeCommitWriter { + repo: repo.clone(), + commit: None, + commit_id: None, + objmap: ObjectMapWriter::::new(), + } + } + + pub fn ensure_commit(&self) -> Result<(Sha256Digest, ObjectID)> { + let commit = self + .commit + .as_ref() + .ok_or(Error::msg("No commit was pulled"))?; + + let commit_id = self + .commit_id + .as_ref() + .ok_or(Error::msg("No commit was pulled"))?; + + let (objmap_id, objmap_sha256) = self.objmap.serialize(&self.repo)?; + + let mut stream = self + .repo + .create_stream(COMMIT_CONTENT_TYPE, Some(*commit_id)); + + stream.add_sha256_mapping(&objmap_sha256, &objmap_id); + + stream.write_inline(&commit); + let object_id = self.repo.write_stream(stream, None)?; + + Ok((*commit_id, object_id)) + } + + fn insert_dirmeta(&mut self, id: &Sha256Digest, data: &[u8]) { + self.objmap.insert(id, None, data); + } + + fn insert_dirtree(&mut self, id: &Sha256Digest, data: &[u8]) { + self.objmap.insert(id, None, data); + } + + fn insert_file( + &mut self, + id: &Sha256Digest, + obj_id: Option<&ObjectID>, + file_header: AlignedBuf, + ) { + self.objmap.insert(id, obj_id, &file_header); + } +} + +struct Outstanding { + id: Sha256Digest, + obj_type: ObjectType, +} + +impl fmt::Debug for Outstanding { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Outstanding") + .field("id", &hex::encode(self.id)) + .field("obj_type", &self.obj_type) + .finish() + } +} + +#[derive(Debug)] +pub struct PullOperation> { + repo: Arc>, + builder: OstreeCommitWriter, + ostree_repo: RepoType, + base_commits: Vec>, + outstanding: VecDeque, + // All ids that were ever enqueued (including already fetched and currently being fetched) + fetched: HashSet, +} + +impl> + PullOperation +{ + pub fn new(repo: &Arc>, ostree_repo: RepoType) -> Self { + PullOperation { + repo: repo.clone(), + builder: OstreeCommitWriter::::new(repo), + ostree_repo: ostree_repo, + outstanding: VecDeque::new(), + base_commits: vec![], + fetched: HashSet::new(), + } + } + + pub fn add_base(&mut self, base_name: &str) -> Result<()> { + let base = 
OstreeCommit::::load(&self.repo, base_name)?; + self.base_commits.push(base); + Ok(()) + } + + fn enqueue_fetch(&mut self, id: &Sha256Digest, obj_type: ObjectType) { + // To avoid fetching twice, we keep all ids we have ever enqueued in a set, + // even if an id is no longer in the outstanding list (for example because + // we are currently downloading it) + if self.fetched.contains(id) { + return; + } + self.fetched.insert(*id); + // We request metadata objects first + if obj_type == ObjectType::File { + self.outstanding.push_back(Outstanding { + id: *id, + obj_type: obj_type, + }); + } else { + self.outstanding.push_front(Outstanding { + id: *id, + obj_type: obj_type, + }); + } + } + + fn maybe_fetch_file(&mut self, id: &Sha256Digest) { + if self.builder.objmap.contains(id) { + return; + } + + for base in self.base_commits.iter() { + if let Some((file_header, obj_id)) = base.lookup_file(id) { + self.add_file(id, obj_id.cloned().as_ref(), file_header.to_owned()); + return; + } + } + + self.enqueue_fetch(id, ObjectType::File); + } + + fn add_file(&mut self, id: &Sha256Digest, obj_id: Option<&ObjectID>, file_header: AlignedBuf) { + self.builder.insert_file(id, obj_id, file_header); + } + + fn maybe_fetch_dirmeta(&mut self, id: &Sha256Digest) { + if self.builder.objmap.contains(id) { + return; + } + + for base in self.base_commits.iter() { + if let Some(dirmeta) = base.lookup_dirmeta(id) { + self.add_dirmeta(id, dirmeta.to_owned()); + return; + } + } + + self.enqueue_fetch(id, ObjectType::DirMeta); + } + + fn add_dirmeta(&mut self, id: &Sha256Digest, data: AlignedBuf) { + self.builder.insert_dirmeta(id, &data); + } + + fn maybe_fetch_dirtree(&mut self, id: &Sha256Digest) { + if self.builder.objmap.contains(id) { + return; + } + + for base in self.base_commits.iter() { + if let Some(dirtree) = base.lookup_dirtree(id) { + self.add_dirtree(id, dirtree.to_owned()); + return; + } + } + + self.enqueue_fetch(id, ObjectType::DirTree); + } + + fn add_dirtree(&mut self, id: &Sha256Digest, buf: AlignedBuf) { + let data = gv!("(a(say)a(sayay))").cast(buf.as_aligned()); + let (files_data, dirs_data) = data.to_tuple(); + + for f in files_data.iter() { + let (_name, checksum) = f.to_tuple(); + + self.maybe_fetch_file(checksum.try_into().unwrap()); + } + + for d in dirs_data.iter() { + let (_name, tree_checksum, meta_checksum) = d.to_tuple(); + + self.maybe_fetch_dirmeta(meta_checksum.try_into().unwrap()); + self.maybe_fetch_dirtree(tree_checksum.try_into().unwrap()); + } + + self.builder.insert_dirtree(id, &buf); + } + + fn add_commit(&mut self, id: &Sha256Digest, commit: AlignedBuf) { + let data = gv!("(a{sv}aya(say)sstayay)").cast(&commit); + let ( + _metadata_data, + _parent_checksum, + _related_objects, + _subject, + _body, + _timestamp, + root_tree, + root_metadata, + ) = data.to_tuple(); + + self.maybe_fetch_dirmeta(root_metadata.try_into().unwrap()); + self.maybe_fetch_dirtree(root_tree.try_into().unwrap()); + + self.builder.commit_id = Some(*id); + self.builder.commit = Some(commit); + } + + pub async fn pull_commit( + &mut self, + commit_id: &Sha256Digest, + ) -> Result<(Sha256Digest, ObjectID)> { + self.enqueue_fetch(commit_id, ObjectType::Commit); + + // TODO: Support deltas + + // TODO: At least for http we should make parallel fetches + while self.outstanding.len() > 0 { + let fetch = self.outstanding.pop_front().unwrap(); + println!( + "Fetching ostree {:?} object {} ", + fetch.obj_type, + hex::encode(fetch.id) + ); + + match fetch.obj_type { + ObjectType::Commit => { + let data = self + .ostree_repo +
.fetch_object(&fetch.id, fetch.obj_type) + .await?; + let data_sha = Sha256::digest(&*data); + if *data_sha != fetch.id { + bail!( + "Invalid commit checksum {:?}, expected {:?}", + data_sha, + fetch.id + ); + } + self.add_commit(&fetch.id, data); + } + ObjectType::DirMeta => { + let data = self + .ostree_repo + .fetch_object(&fetch.id, fetch.obj_type) + .await?; + let data_sha = Sha256::digest(&*data); + if *data_sha != fetch.id { + bail!( + "Invalid dirmeta checksum {:?}, expected {:?}", + data_sha, + fetch.id + ); + } + self.add_dirmeta(&fetch.id, data); + } + ObjectType::DirTree => { + let data = self + .ostree_repo + .fetch_object(&fetch.id, fetch.obj_type) + .await?; + let data_sha = Sha256::digest(&*data); + if *data_sha != fetch.id { + bail!( + "Invalid dirtree checksum {:?}, expected {:?}", + data_sha, + fetch.id + ); + } + self.add_dirtree(&fetch.id, data); + } + ObjectType::File => { + let (file_header, obj_id) = self.ostree_repo.fetch_file(&fetch.id).await?; + + self.add_file(&fetch.id, obj_id.as_ref(), file_header); + } + _ => {} + } + } + + self.builder.ensure_commit() + } +} diff --git a/crates/composefs-ostree/src/lib.rs b/crates/composefs-ostree/src/lib.rs new file mode 100644 index 00000000..28b79fb1 --- /dev/null +++ b/crates/composefs-ostree/src/lib.rs @@ -0,0 +1,102 @@ +use anyhow::Result; +use rustix::fs::CWD; +use std::{path::Path, sync::Arc}; + +use composefs::{fsverity::FsVerityHashValue, repository::Repository, tree::FileSystem}; + +pub mod commit; +pub mod objmap; +pub mod repo; + +use crate::commit::{OstreeCommit, PullOperation}; +use crate::repo::{LocalRepo, RemoteRepo}; + +pub async fn pull_local( + repo: &Arc>, + path: &Path, + ostree_ref: &str, + reference: Option<&str>, + base_reference: Option<&str>, +) -> Result { + let ostree_repo = LocalRepo::open_path(repo, CWD, path)?; + + let commit_checksum = ostree_repo.read_ref(ostree_ref)?; + + let mut op = PullOperation::>::new(repo, ostree_repo); + if let Some(base_name) = base_reference { + op.add_base(base_name)?; + } + + // If we're giving the new image a name, use any existing image + // with that name as a potential base + if let Some(reference) = reference { + if repo.has_named_stream(&reference) { + let reference_path = format!("refs/{reference}"); + op.add_base(&reference_path)?; + } + } + + let (sha256, objid) = op.pull_commit(&commit_checksum).await?; + + if let Some(name) = reference { + repo.name_stream(sha256, name)?; + } + + Ok(objid) +} + +pub async fn pull( + repo: &Arc>, + url: &str, + ostree_ref: &str, + reference: Option<&str>, + base_reference: Option<&str>, +) -> Result { + let ostree_repo = RemoteRepo::new(repo, url)?; + + let commit_checksum = ostree_repo.resolve_ref(ostree_ref).await?; + + let mut op = PullOperation::>::new(repo, ostree_repo); + if let Some(base_name) = base_reference { + op.add_base(base_name)?; + } + + // If we're giving the new image a name, use any existing image + // with that name as a potential base + if let Some(reference) = reference { + if repo.has_named_stream(&reference) { + let reference_path = format!("refs/{reference}"); + op.add_base(&reference_path)?; + } + } + + let (sha256, objid) = op.pull_commit(&commit_checksum).await?; + + if let Some(name) = reference { + repo.name_stream(sha256, name)?; + } + + Ok(objid) +} + +/// Creates a filesystem from the given OSTree commit.
+pub fn create_filesystem( + repo: &Repository, + commit_name: &str, +) -> Result> { + let image = OstreeCommit::::load(repo, commit_name)?; + let fs = image.create_filesystem()?; + + Ok(fs) +} + +/// Prints debugging information about the given OSTree commit and its object map. +pub fn inspect( + repo: &Repository, + commit_name: &str, +) -> Result<()> { + let image = OstreeCommit::::load(repo, commit_name)?; + image.inspect(); + + Ok(()) +} diff --git a/crates/composefs-ostree/src/objmap.rs b/crates/composefs-ostree/src/objmap.rs new file mode 100644 index 00000000..34b8d3c9 --- /dev/null +++ b/crates/composefs-ostree/src/objmap.rs @@ -0,0 +1,461 @@ +/* Implementation of the ObjectID Map format + * + * ObjectID maps are mappings from a foreign sha256 digest of some + * form into a header of data, and an optional reference to an + * external ObjectID (i.e. an fsverity digest) matching the composefs repo + * ObjectID format. + * + * The file format is intended to live inside a splitstream and + * uses the splitstream header to reference the external object ids. + * + * An object file has this format: + * (All integers are little-endian) + * + * buckets: + * 256 x (indexes are into mapped_ids) + * +-----------------------------------+ + * | u32: end index of bucket | + * +-----------------------------------+ + * + * mapped_ids: + * n_objects x (sorted) + * +-----------------------------------+ + * | [u8; 32] mapped object id | + * +-----------------------------------+ + * + * object_data: + * n_objects x (same order as mapped_ids) + * +-----------------------------------+ + * | u32: offset to per-object data | + * | u32: length of per-object data | + * | u32: Index of external object ref | + * | or MAXUINT32 if none. | + * +-----------------------------------+ + * + * Offsets are 8-byte aligned, measured from the end of the + * object_data array.
+ * + */ +use anyhow::{Error, Result}; +use gvariant::aligned_bytes::{AlignedBuf, AlignedSlice, TryAsAligned, A8}; +use std::{fmt, fs::File, io::Read, mem::size_of, sync::Arc}; +use zerocopy::{FromBytes, Immutable, IntoBytes, KnownLayout}; + +use composefs::{ + fsverity::FsVerityHashValue, + repository::Repository, + splitstream::{SplitStreamReader, SplitStreamWriter}, + util::Sha256Digest, +}; + +const OBJMAP_CONTENT_TYPE: u64 = 0xAFE138C18C463EF1; + +#[derive(Debug, FromBytes, Immutable, IntoBytes, KnownLayout)] +#[repr(C)] +struct ObjMapHeader { + bucket_ends: [u32; 256], +} + +#[derive(Debug, FromBytes, Immutable, KnownLayout)] +#[repr(C)] +struct Sha256DigestArray { + ids: [Sha256Digest], +} + +#[derive(Debug, FromBytes, Immutable, KnownLayout)] +#[repr(C)] +struct ObjectIDArray { + ids: [ObjectID], +} + +const NO_EXTERNAL_INDEX: u32 = u32::MAX; + +#[derive(Debug, FromBytes, Immutable, IntoBytes, KnownLayout, Clone)] +#[repr(C)] +struct DataRef { + offset: u32, + size: u32, + external_index: u32, +} + +impl DataRef { + pub fn new(offset: usize, size: usize, external_index: Option) -> Self { + DataRef { + offset: u32::to_le(offset as u32), + size: u32::to_le(size as u32), + external_index: u32::to_le(match external_index { + Some(idx) => idx as u32, + None => NO_EXTERNAL_INDEX, + }), + } + } + pub fn get_offset(&self) -> usize { + return u32::from_le(self.offset) as usize; + } + pub fn get_size(&self) -> usize { + return u32::from_le(self.size) as usize; + } + pub fn get_external_index(&self) -> Option { + match u32::from_le(self.external_index) { + NO_EXTERNAL_INDEX => None, + idx => Some(idx as usize), + } + } +} + +#[derive(Debug, FromBytes, Immutable, IntoBytes, KnownLayout)] +#[repr(C)] +struct DataRefs { + datas: [DataRef], +} + +#[derive(Debug)] +struct WriterMapEntry { + mapped_id: Sha256Digest, + verity: Option, + data: AlignedBuf, +} + +#[derive(Debug)] +pub struct ObjectMapWriter { + map: Vec>, +} + +fn align8(x: usize) -> usize { + (x + 7) & !7 +} + +impl ObjectMapWriter { + pub fn new() -> Self { + ObjectMapWriter { map: vec![] } + } + + pub fn iter( + &self, + ) -> impl Iterator, &AlignedSlice)> { + self.map + .iter() + .map(|e| (&e.mapped_id, e.verity.as_ref(), &e.data[..])) + } + + pub fn contains(&self, mapped_id: &Sha256Digest) -> bool { + match self.map.binary_search_by_key(mapped_id, |e| e.mapped_id) { + Ok(_) => true, + Err(..) => false, + } + } + + pub fn lookup( + &self, + mapped_id: &Sha256Digest, + ) -> Option<(Option<&ObjectID>, &AlignedSlice)> { + match self.map.binary_search_by_key(mapped_id, |e| e.mapped_id) { + Ok(idx) => Some((self.map[idx].verity.as_ref(), &self.map[idx].data)), + Err(..) 
=> None, + } + } + + pub fn insert(&mut self, mapped_id: &Sha256Digest, verity: Option<&ObjectID>, data: &[u8]) { + match self.map.binary_search_by_key(mapped_id, |e| e.mapped_id) { + Ok(_idx) => {} + Err(idx) => { + let mut aligned_data = AlignedBuf::new(); + aligned_data.with_vec(|v| v.extend_from_slice(data)); + self.map.insert( + idx, + WriterMapEntry { + mapped_id: *mapped_id, + verity: verity.cloned(), + data: aligned_data, + }, + ); + } + } + } + + pub fn merge_from(&mut self, reader: &ObjectMapReader) { + for (sha256, objid, data) in reader.iter() { + self.insert(sha256, objid, data); + } + } + + pub fn serialize(&self, repo: &Arc>) -> Result<(ObjectID, Sha256Digest)> { + let mut ss = SplitStreamWriter::::new(repo, OBJMAP_CONTENT_TYPE, true, None); + + /* Ensure we can index and count items using u32 (leaving one for NO_EXTERNAL_INDEX) */ + let item_count = self.map.len(); + if item_count > (NO_EXTERNAL_INDEX - 1) as usize { + return Err(Error::msg("Too many items in object map")); + } + + let mut header = ObjMapHeader { + bucket_ends: [0; 256], + }; + + // Compute data offsets and add external object references + let mut data_size = 0usize; + let mut data_offsets = vec![0usize; item_count]; + for (i, e) in self.map.iter().enumerate() { + data_offsets[i] = data_size; + data_size += align8(e.data.len()); + + if let Some(verity) = &e.verity { + ss.add_external_reference(&verity) + } + } + + // Ensure all data can be indexed by u32 + if data_size > u32::MAX as usize { + return Err(Error::msg("Too large data in object map")); + } + + // Compute bucket ends + for e in self.map.iter() { + // Initially end is just the count + header.bucket_ends[e.mapped_id[0] as usize] += 1; + } + for i in 1..256 { + // Then we sum them up to the end + header.bucket_ends[i] += header.bucket_ends[i - 1]; + } + // Convert buckets to little endian + for i in 0..256 { + header.bucket_ends[i] = u32::to_le(header.bucket_ends[i]); + } + + // Add header + ss.write_inline(header.as_bytes()); + // Add mapped ids + for e in self.map.iter() { + ss.write_inline(&e.mapped_id); + } + // Add data refs + for (i, e) in self.map.iter().enumerate() { + let idx = if let Some(verity) = &e.verity { + ss.lookup_external_reference(&verity) + } else { + None + }; + let d = DataRef::new(data_offsets[i], e.data.len(), idx); + ss.write_inline(d.as_bytes()); + } + + // Add 8-aligned data chunks + for e in self.map.iter() { + ss.write_inline(&e.data); + // Pad to 8 + let padding = align8(e.data.len()) - e.data.len(); + if padding > 0 { + ss.write_inline(&vec![0u8; padding]); + } + } + + let (objid, sha256) = ss.done()?; + + // This is safe because we passed true to compute this above + Ok((objid, sha256.unwrap())) + } +} + +pub struct ObjectMapReader { + data: AlignedBuf, + bucket_ends: [u32; 256], + mapped_ids: Vec, + datas: Vec, + pub refs: Vec, +} + +impl fmt::Debug for ObjectMapReader { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let mut m = f.debug_map(); + for mapped_id in self.mapped_ids.iter() { + m.entry( + &hex::encode(mapped_id), + &format!("{:?}", self.lookup(mapped_id).unwrap()), + ); + } + m.finish() + } +} + +fn validate_buckets(buckets: &[u32; 256]) -> Result<()> { + for i in 1..256 { + // Bucket ends are (non-strictly) increasing + if buckets[i] < buckets[i - 1] { + return Err(Error::msg(format!("Invalid objmap bucket data"))); + } + } + Ok(()) +} + +impl ObjectMapReader { + pub fn load(repo: &Repository, obj_id: &ObjectID) -> Result { + let fd = repo.open_object(obj_id)?; + + let file = 
File::from(fd); + let mut ss = SplitStreamReader::new(file, Some(OBJMAP_CONTENT_TYPE))?; + + let mut buf = AlignedBuf::new(); + + buf.with_vec(|v| v.resize(size_of::(), 0u8)); + let n_read = ss.read(&mut buf)?; + if n_read != buf.len() { + return Err(Error::msg("Not enough data")); + } + + let h = ObjMapHeader::ref_from_bytes(&buf) + .map_err(|_e| Error::msg(format!("Invalid objmap header")))?; + + let mut buckets: [u32; 256] = h.bucket_ends; + for b in buckets.iter_mut() { + *b = u32::from_le(*b); + } + validate_buckets(&buckets)?; + let item_count = buckets[255] as usize; + + buf.with_vec(|v| v.resize(item_count * size_of::(), 0u8)); + let n_read = ss.read(&mut buf)?; + if n_read != buf.len() { + return Err(Error::msg("Not enough data")); + }; + let mapped_ids = Sha256DigestArray::ref_from_bytes(&buf) + .map_err(|_e| Error::msg(format!("Invalid objmap array")))?; + + if mapped_ids.ids.len() != item_count { + return Err(Error::msg("Invalid objmap array")); + } + let mapped = mapped_ids.ids.to_vec(); + + buf.with_vec(|v| v.resize(item_count * size_of::(), 0u8)); + let n_read = ss.read(&mut buf)?; + if n_read != buf.len() { + return Err(Error::msg("Not enough data")); + }; + + let data_refs = DataRefs::ref_from_bytes(&buf) + .map_err(|_e| Error::msg(format!("Invalid objmap array")))?; + + if data_refs.datas.len() != item_count { + return Err(Error::msg("Invalid objmap array")); + } + + let datas = data_refs.datas.to_vec(); + + buf.with_vec(|v| { + v.resize(0, 0u8); + ss.read_to_end(v) + })?; + + Ok(ObjectMapReader { + data: buf, + bucket_ends: buckets, + mapped_ids: mapped, + datas: datas, + refs: ss.refs.clone(), + }) + } + + fn get_data(&self, data_ref: &DataRef) -> (Option<&ObjectID>, &AlignedSlice) { + let start = data_ref.get_offset(); + let end = start + data_ref.get_size(); + // The unwrap here is safe, because data is always 8 aligned + let data = &self.data[start..end].try_as_aligned().unwrap(); + + if let Some(index) = data_ref.get_external_index() { + (Some(&self.refs[index]), data) + } else { + (None, data) + } + } + + fn get_bucket(&self, mapped_id: &Sha256Digest) -> (usize, usize) { + let first = mapped_id[0] as usize; + let start = if first == 0 { + 0 + } else { + self.bucket_ends[first - 1] + }; + let end = self.bucket_ends[first]; + (start as usize, end as usize) + } + + pub fn contains(&self, mapped_id: &Sha256Digest) -> bool { + let (start, end) = self.get_bucket(mapped_id); + let in_bucket = &self.mapped_ids[start..end]; + match in_bucket.binary_search(mapped_id) { + Ok(_) => true, + Err(..) => false, + } + } + + pub fn lookup( + &self, + mapped_id: &Sha256Digest, + ) -> Option<(Option<&ObjectID>, &AlignedSlice)> { + let (start, end) = self.get_bucket(mapped_id); + let mapped_ids_in_bucket = &self.mapped_ids[start..end]; + let data_refs_in_bucket = &self.datas[start..end]; + let index = match mapped_ids_in_bucket.binary_search(mapped_id) { + Ok(i) => i, + Err(..) 
=> return None, + }; + Some(self.get_data(&data_refs_in_bucket[index])) + } + + pub fn iter( + &self, + ) -> impl Iterator, &AlignedSlice)> { + self.mapped_ids.iter().enumerate().map(|e| { + let (objid, data) = self.get_data(&self.datas[e.0]); + (e.1, objid, data) + }) + } +} + +#[cfg(test)] +mod test { + use super::*; + + use composefs::{fsverity::Sha256HashValue, util::parse_sha256}; + + #[test] + fn test_roundtrip() -> Result<()> { + let mut writer = ObjectMapWriter::::new(); + + let mapped_1 = + parse_sha256("84682bb6f0404ba9b81d5f3b753be2a08f1165389229ee8516acbd5700182cad")?; + let mapped_2 = + parse_sha256("4b37fb400b28a686343ba83f00789608e0b624b13bf50d713bc8a9b0de514e00")?; + let mapped_3 = + parse_sha256("4b37fb400b28a686343ba83f00789608e0b624b13bf50d713bc8a9b0de514e01")?; + let mapped_4 = + parse_sha256("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa")?; + let objid_1 = Sha256HashValue::from_hex( + "0a11b9bb6258495dbe1677b2dc4e0d6c4cc86aef8b7c274756d40a878a921a8a", + )?; + let objid_2 = Sha256HashValue::from_hex( + "a0729185616450a10bd8439549221433edc7154d9f87a454768a368de2e5967a", + )?; + let objid_3 = Sha256HashValue::from_hex( + "37d2eeabfa179742b9b490cc3072cc289124e74f5aa3d4bc270862f07890c1cc", + )?; + let data_1 = vec![42u8]; + let data_2 = vec![12u8, 17u8]; + let data_3 = vec![]; + + writer.insert(&mapped_1, Some(&objid_1), &data_1); + writer.insert(&mapped_2, Some(&objid_2), &data_2); + writer.insert(&mapped_3, Some(&objid_3), &data_3); + + let r1 = writer.lookup(&mapped_1); + assert_eq!(r1, Some((Some(&objid_1), data_1.as_slice()))); + let r2 = writer.lookup(&mapped_2); + assert_eq!(r2, Some((Some(&objid_2), data_2.as_slice()))); + let r3 = writer.lookup(&mapped_3); + assert_eq!(r3, Some((Some(&objid_3), data_3.as_slice()))); + let r4 = writer.lookup(&mapped_4); + assert_eq!(r4, None); + + Ok(()) + } +} diff --git a/crates/composefs-ostree/src/repo.rs b/crates/composefs-ostree/src/repo.rs new file mode 100644 index 00000000..00e11f14 --- /dev/null +++ b/crates/composefs-ostree/src/repo.rs @@ -0,0 +1,553 @@ +use anyhow::{bail, Context, Error, Result}; +use configparser::ini::Ini; +use flate2::read::DeflateDecoder; +use gvariant::aligned_bytes::{AlignedBuf, AlignedSlice, A8}; +use gvariant::{gv, Marker, Structure}; +use reqwest::{Client, Url}; +use rustix::fd::AsRawFd; +use rustix::fs::{fstat, openat, readlinkat, FileType, Mode, OFlags}; +use rustix::io::Errno; +use sha2::{Digest, Sha256}; +use std::{ + fs::File, + future::Future, + io::{empty, Read}, + os::fd::{AsFd, OwnedFd}, + path::Path, + sync::Arc, +}; +use zerocopy::{FromBytes, Immutable, IntoBytes, KnownLayout}; + +use composefs::{ + fsverity::FsVerityHashValue, + repository::Repository, + util::{parse_sha256, ErrnoFilter, Sha256Digest}, + INLINE_CONTENT_MAX, +}; + +#[derive(Debug, PartialEq, Copy, Clone)] +pub enum RepoMode { + Bare, + Archive, + BareUser, + BareUserOnly, + BareSplitXAttrs, +} + +#[derive(Debug, PartialEq)] +pub enum ObjectType { + File, + DirTree, + DirMeta, + Commit, + TombstoneCommit, + PayloadLink, + FileXAttrs, + FileXAttrsLink, +} + +impl ObjectType { + pub fn extension(&self, repo_mode: RepoMode) -> &'static str { + match self { + ObjectType::File => { + if repo_mode == RepoMode::Archive { + ".filez" + } else { + ".file" + } + } + ObjectType::DirTree => ".dirtree", + ObjectType::DirMeta => ".dirmeta", + ObjectType::Commit => ".commit", + ObjectType::TombstoneCommit => ".commit-tombstone", + ObjectType::PayloadLink => ".payload-link", + ObjectType::FileXAttrs => 
".file-xattrs", + ObjectType::FileXAttrsLink => ".file-xattrs-link", + } + } +} + +impl RepoMode { + pub fn parse(s: &str) -> Result { + match s { + "bare" => Ok(RepoMode::Bare), + "archive" => Ok(RepoMode::Archive), + "archive-z2" => Ok(RepoMode::Archive), + "bare-user" => Ok(RepoMode::BareUser), + "bare-user-only" => Ok(RepoMode::BareUserOnly), + "bare-split-xattrs" => Ok(RepoMode::BareSplitXAttrs), + _ => Err(Error::msg(format!("Unsupported repo mode {}", s))), + } + } +} + +/* Source for locally available data about ostree objects, typically + * in-memory caches */ +pub trait ObjectStore { + fn lookup_dirmeta(&self, _id: &Sha256Digest) -> Option<&AlignedSlice>; + fn lookup_dirtree(&self, _id: &Sha256Digest) -> Option<&AlignedSlice>; + fn lookup_file(&self, _id: &Sha256Digest) -> Option<(&AlignedSlice, &ObjectID)>; +} + +fn get_object_pathname(mode: RepoMode, checksum: &Sha256Digest, object_type: ObjectType) -> String { + format!( + "{:02x}/{}{}", + checksum[0], + hex::encode(&checksum[1..]), + object_type.extension(mode) + ) +} + +fn size_prefix(data: &[u8]) -> AlignedBuf { + let mut buf = AlignedBuf::new(); + let svh = SizedVariantHeader { + size: u32::to_be(data.len() as u32), + padding: 0, + }; + buf.with_vec(|v| v.extend_from_slice(svh.as_bytes())); + buf.with_vec(|v| v.extend_from_slice(data)); + buf +} + +pub(crate) fn get_sized_variant_size(data: &[u8]) -> Result { + let variant_header_size = size_of::(); + if data.len() < variant_header_size { + bail!("Sized variant too small"); + } + + let aligned: AlignedBuf = data[0..variant_header_size].to_vec().into(); + let h = SizedVariantHeader::ref_from_bytes(&aligned) + .map_err(|e| Error::msg(format!("Sized variant header: {:?}", e)))?; + Ok(u32::from_be(h.size) as usize) +} + +pub(crate) fn split_sized_variant(data: &[u8]) -> Result<(&[u8], &[u8], &[u8])> { + let variant_size = get_sized_variant_size(data)?; + let header_size = size_of::(); + if data.len() < header_size + variant_size { + bail!("Sized variant too small"); + } + + let sized_data = &data[0..header_size + variant_size]; + let variant_data = &data[header_size..header_size + variant_size]; + let remaining_data = &data[header_size + variant_size..]; + + Ok((sized_data, variant_data, remaining_data)) +} + +pub(crate) fn ostree_zlib_file_header_to_regular(zlib_header_data: &AlignedSlice) -> Vec { + let data = gv!("(tuuuusa(ayay))").cast(zlib_header_data); + let (_size, uid, gid, mode, zero, symlink_target, xattrs_data) = data.to_tuple(); + let mut s = Vec::<(&[u8], &[u8])>::new(); + for x in xattrs_data.iter() { + let (key, value) = x.to_tuple(); + s.push((key, value)) + } + + gv!("(uuuusa(ayay))").serialize_to_vec(&(*uid, *gid, *mode, *zero, symlink_target.to_str(), &s)) +} + +/* This is how ostree stores gvariants on disk when used as a header for filez objects */ +#[derive(Debug, FromBytes, Immutable, IntoBytes, KnownLayout)] +#[repr(C)] +pub(crate) struct SizedVariantHeader { + size: u32, + padding: u32, +} + +pub trait OstreeRepo { + fn fetch_object( + &self, + checksum: &Sha256Digest, + object_type: ObjectType, + ) -> impl Future>; + fn fetch_file( + &self, + checksum: &Sha256Digest, + ) -> impl Future)>>; +} + +#[derive(Debug)] +pub struct RemoteRepo { + repo: Arc>, + client: Client, + url: Url, +} + +impl RemoteRepo { + pub fn new(repo: &Arc>, url: &str) -> Result { + Ok(RemoteRepo { + repo: repo.clone(), + client: Client::new(), + url: Url::parse(url)?, + }) + } + + pub async fn resolve_ref(&self, ref_name: &str) -> Result { + // TODO: Support summary format + let 
path = format!("refs/heads/{}", ref_name); + let url = self.url.join(&path)?; + + let t = self + .client + .get(url.clone()) + .send() + .await? + .text() + .await + .with_context(|| format!("Cannot get ostree ref at {}", url))?; + + Ok(parse_sha256(&t.trim())?) + } +} + +impl OstreeRepo for RemoteRepo { + async fn fetch_object( + &self, + checksum: &Sha256Digest, + object_type: ObjectType, + ) -> Result { + let path = format!( + "objects/{}", + get_object_pathname(RepoMode::Archive, checksum, object_type) + ); + let url = self.url.join(&path)?; + + let response = self.client.get(url.clone()).send().await?; + response.error_for_status_ref()?; + let b = response + .bytes() + .await + .with_context(|| format!("Cannot get ostree object at {}", url))?; + + Ok(b.to_vec().into()) + } + + async fn fetch_file(&self, checksum: &Sha256Digest) -> Result<(AlignedBuf, Option)> { + let path = format!( + "objects/{}", + get_object_pathname(RepoMode::Archive, checksum, ObjectType::File) + ); + let url = self.url.join(&path)?; + + let response = self.client.get(url.clone()).send().await?; + response.error_for_status_ref()?; + + let data = response + .bytes() + .await + .with_context(|| format!("Cannot get ostree file at {}", url))?; + + let (file_header, variant_data, compressed_data) = split_sized_variant(&data)?; + + // Force align the data as there is a gvariant-rs bug (https://github.com/ostreedev/gvariant-rs/pull/9) + let mut aligned_variant_data = AlignedBuf::new(); + aligned_variant_data.with_vec(|v| v.extend_from_slice(variant_data)); + + // Compute the checksum of (regular) header + data + let mut hasher = Sha256::new(); + let regular_header = ostree_zlib_file_header_to_regular(&aligned_variant_data); + let sized_regular_header = size_prefix(®ular_header); + hasher.update(&*sized_regular_header); + + // Decompress rest + let mut uncompressed = DeflateDecoder::new(compressed_data); + + // TODO: Stream files into repo instead of reading it all + + let mut file_content = Vec::new(); + uncompressed.read_to_end(&mut file_content)?; + + hasher.update(&file_content); + let actual_checksum = hasher.finalize(); + if *actual_checksum != *checksum { + bail!( + "Unexpected file checksum {:?}, expected {:?}", + actual_checksum, + checksum + ); + } + + let mut file_data = file_header.to_vec(); + let obj_id = if file_content.len() <= INLINE_CONTENT_MAX { + file_data.extend_from_slice(&file_content); + None + } else { + Some(self.repo.ensure_object(&file_content)?) 
+ }; + + Ok((file_data.into(), obj_id)) + } +} + +#[derive(Debug)] +pub struct LocalRepo { + repo: Arc>, + mode: RepoMode, + dir: OwnedFd, + objects: OwnedFd, +} + +impl LocalRepo { + pub fn open_path( + repo: &Arc>, + dirfd: impl AsFd, + path: impl AsRef, + ) -> Result { + let path = path.as_ref(); + let repofd = openat( + &dirfd, + path, + OFlags::RDONLY | OFlags::CLOEXEC, + Mode::empty(), + ) + .with_context(|| format!("Cannot open ostree repository at {}", path.display()))?; + + let configfd = openat( + &repofd, + "config", + OFlags::RDONLY | OFlags::CLOEXEC, + Mode::empty(), + ) + .with_context(|| format!("Cannot open ostree repo config file at {}", path.display()))?; + + let mut config_data = String::new(); + + File::from(configfd) + .read_to_string(&mut config_data) + .with_context(|| format!("Can't read config file"))?; + + let mut config = Ini::new(); + let map = config + .read(config_data) + .map_err(Error::msg) + .with_context(|| format!("Can't read config file"))?; + + let core = if let Some(core_map) = map.get("core") { + core_map + } else { + return Err(Error::msg(format!("No [core] section in config"))); + }; + + let mode = if let Some(Some(mode)) = core.get("mode") { + RepoMode::parse(mode)? + } else { + return Err(Error::msg(format!("No mode in [core] section in config"))); + }; + + if mode != RepoMode::Archive && mode != RepoMode::BareUserOnly { + return Err(Error::msg(format!("Unsupported repo mode {mode:?}"))); + } + + let objectsfd = openat( + &repofd, + "objects", + OFlags::PATH | OFlags::CLOEXEC | OFlags::DIRECTORY, + 0o666.into(), + ) + .with_context(|| { + format!( + "Cannot open ostree repository objects directory at {}", + path.display() + ) + })?; + + Ok(Self { + repo: repo.clone(), + mode: mode, + dir: repofd, + objects: objectsfd, + }) + } + + pub fn open_object_flags( + &self, + checksum: &Sha256Digest, + object_type: ObjectType, + flags: OFlags, + ) -> Result { + let cs = checksum.into(); + let path = get_object_pathname(self.mode, cs, object_type); + + openat(&self.objects, &path, flags | OFlags::CLOEXEC, Mode::empty()) + .with_context(|| format!("Cannot open ostree objects object at {}", path)) + } + + pub fn open_object(&self, checksum: &Sha256Digest, object_type: ObjectType) -> Result { + self.open_object_flags(checksum, object_type, OFlags::RDONLY | OFlags::NOFOLLOW) + } + + pub fn read_ref(&self, ref_name: &str) -> Result { + let path1 = format!("refs/{}", ref_name); + let path2 = format!("refs/heads/{}", ref_name); + + let fd1 = openat( + &self.dir, + &path1, + OFlags::RDONLY | OFlags::CLOEXEC, + Mode::empty(), + ) + .filter_errno(Errno::NOENT) + .with_context(|| format!("Cannot open ostree ref at {}", path1))?; + + let fd = if let Some(fd) = fd1 { + fd + } else { + openat( + &self.dir, + &path2, + OFlags::RDONLY | OFlags::CLOEXEC, + Mode::empty(), + ) + .with_context(|| format!("Cannot open ostree ref at {}", path2))? + }; + + let mut buffer = String::new(); + File::from(fd) + .read_to_string(&mut buffer) + .with_context(|| format!("Can't read ref file"))?; + + Ok(parse_sha256(&buffer.trim())?) + } + + async fn fetch_file_bare( + &self, + checksum: &Sha256Digest, + ) -> Result<(AlignedBuf, Box)> { + let path_fd = self.open_object_flags( + checksum.into(), + ObjectType::File, + OFlags::PATH | OFlags::NOFOLLOW, + )?; + + let st = fstat(&path_fd)?; + + let filetype = FileType::from_raw_mode(st.st_mode); + + let symlink_target = if filetype.is_symlink() { + readlinkat(&path_fd, "", [])?.into_string()? 
+ } else { + String::from("") + }; + + let xattrs = Vec::<(&[u8], &[u8])>::new(); + + let (uid, gid, mode) = match self.mode { + RepoMode::Bare => { + // TODO: Read xattrs from disk + (st.st_uid, st.st_gid, st.st_mode) + } + RepoMode::BareUser => { + // TODO: read user.ostreemeta xattr + bail!("BareUser not supported yet") + } + RepoMode::BareUserOnly => (0, 0, st.st_mode), + _ => { + bail!("Unsupported repo mode {:?}", self.mode) + } + }; + + let v = gv!("(tuuuusa(ayay))").serialize_to_vec(&( + u64::to_be(st.st_size as u64), + u32::to_be(uid), + u32::to_be(gid), + u32::to_be(mode), + u32::to_be(0), // rdev + &symlink_target, + &xattrs, + )); + + let zlib_header = size_prefix(&v); + + if filetype.is_symlink() { + Ok((zlib_header, Box::new(empty()))) + } else { + let fd_path = format!("/proc/self/fd/{}", path_fd.as_fd().as_raw_fd()); + Ok((zlib_header, Box::new(File::open(fd_path)?))) + } + } + + async fn fetch_file_archive( + &self, + checksum: &Sha256Digest, + ) -> Result<(AlignedBuf, Box)> { + let fd = self.open_object(checksum.into(), ObjectType::File)?; + let mut file = File::from(fd); + + let mut header_buf = AlignedBuf::new(); + + // Read variant size header + let header_size = size_of::(); + header_buf.with_vec(|v| { + v.resize(header_size, 0u8); + file.read_exact(v) + })?; + + // Read variant + let variant_size = get_sized_variant_size(&header_buf)?; + header_buf.with_vec(|v| { + v.resize(header_size + variant_size, 0u8); + file.read_exact(&mut v[header_size..]) + })?; + + // Decompress rest + Ok((header_buf, Box::new(DeflateDecoder::new(file)))) + } +} + +impl OstreeRepo for LocalRepo { + async fn fetch_object( + &self, + checksum: &Sha256Digest, + object_type: ObjectType, + ) -> Result { + let fd = self.open_object(checksum.into(), object_type)?; + + let mut buffer = Vec::new(); + File::from(fd).read_to_end(&mut buffer)?; + Ok(buffer.into()) + } + + async fn fetch_file(&self, checksum: &Sha256Digest) -> Result<(AlignedBuf, Option)> { + let (mut header_buf, mut rest) = if self.mode == RepoMode::Archive { + self.fetch_file_archive(checksum).await? + } else { + self.fetch_file_bare(checksum).await? + }; + + // Force align the data as there is a gvariant-rs bug (https://github.com/ostreedev/gvariant-rs/pull/9) + let mut aligned_variant_data = AlignedBuf::new(); + let header_size = size_of::(); + aligned_variant_data.with_vec(|v| v.extend_from_slice(&header_buf[header_size..])); + + // Compute the checksum of (regular) header + data + let mut hasher = Sha256::new(); + let regular_header = ostree_zlib_file_header_to_regular(&aligned_variant_data); + let sized_regular_header = size_prefix(®ular_header); + hasher.update(&*sized_regular_header); + + // TODO: Stream files into repo instead of reading it all + let mut file_content = Vec::new(); + rest.read_to_end(&mut file_content)?; + hasher.update(&file_content); + + // Ensure matching checksum + let actual_checksum = hasher.finalize(); + if *actual_checksum != *checksum { + bail!( + "Unexpected file checksum {}, expected {}", + hex::encode(actual_checksum), + hex::encode(checksum) + ); + } + + let obj_id = if file_content.len() <= INLINE_CONTENT_MAX { + header_buf.with_vec(|v| v.extend_from_slice(&file_content)); + None + } else { + Some(self.repo.ensure_object(&file_content)?) 
+ }; + + Ok((header_buf.into(), obj_id)) + } +} diff --git a/crates/composefs/src/fsverity/hashvalue.rs b/crates/composefs/src/fsverity/hashvalue.rs index 5cfb6d71..f50abb64 100644 --- a/crates/composefs/src/fsverity/hashvalue.rs +++ b/crates/composefs/src/fsverity/hashvalue.rs @@ -2,6 +2,7 @@ use core::{fmt, hash::Hash}; use hex::FromHexError; use sha2::{digest::FixedOutputReset, digest::Output, Digest, Sha256, Sha512}; +use std::cmp::Ord; use zerocopy::{FromBytes, Immutable, IntoBytes, KnownLayout, Unaligned}; pub trait FsVerityHashValue @@ -12,6 +13,7 @@ where Self: Hash + Eq, Self: fmt::Debug, Self: Send + Sync + Unpin + 'static, + Self: PartialOrd + Ord, { type Digest: Digest + FixedOutputReset + fmt::Debug; const ALGORITHM: u8; @@ -93,7 +95,19 @@ impl fmt::Debug for Sha512HashValue { } } -#[derive(Clone, Eq, FromBytes, Hash, Immutable, IntoBytes, KnownLayout, PartialEq, Unaligned)] +#[derive( + Clone, + Eq, + FromBytes, + Hash, + Immutable, + IntoBytes, + KnownLayout, + PartialEq, + Unaligned, + PartialOrd, + Ord, +)] #[repr(C)] pub struct Sha256HashValue([u8; 32]); @@ -110,7 +124,19 @@ impl FsVerityHashValue for Sha256HashValue { const ID: &str = "sha256"; } -#[derive(Clone, Eq, FromBytes, Hash, Immutable, IntoBytes, KnownLayout, PartialEq, Unaligned)] +#[derive( + Clone, + Eq, + FromBytes, + Hash, + Immutable, + IntoBytes, + KnownLayout, + PartialEq, + Unaligned, + PartialOrd, + Ord, +)] #[repr(C)] pub struct Sha512HashValue([u8; 64]); diff --git a/crates/composefs/src/repository.rs b/crates/composefs/src/repository.rs index cb4c0bcd..831765fd 100644 --- a/crates/composefs/src/repository.rs +++ b/crates/composefs/src/repository.rs @@ -25,7 +25,7 @@ use crate::{ EnableVerityError, FsVerityHashValue, MeasureVerityError, }, mount::{composefs_fsmount, mount_at}, - splitstream::{DigestMap, SplitStreamReader, SplitStreamWriter}, + splitstream::{SplitStreamReader, SplitStreamWriter}, util::{proc_self_fd, replace_symlinkat, ErrnoFilter, Sha256Digest}, }; @@ -223,10 +223,10 @@ impl Repository { /// store the result. 
pub fn create_stream( self: &Arc, + content_type: u64, sha256: Option, - maps: Option>, ) -> SplitStreamWriter { - SplitStreamWriter::new(self, maps, sha256) + SplitStreamWriter::new(self, content_type, false, sha256) } fn format_object_path(id: &ObjectID) -> String { @@ -272,11 +272,11 @@ impl Repository { Err(other) => Err(other)?, }; let mut context = Sha256::new(); - let mut split_stream = SplitStreamReader::new(File::from(stream))?; + let mut split_stream = SplitStreamReader::new(File::from(stream), None)?; // check the verity of all linked streams - for entry in &split_stream.refs.map { - if self.check_stream(&entry.body)?.as_ref() != Some(&entry.verity) { + for (body, verity) in split_stream.iter_mappings() { + if self.check_stream(body)?.as_ref() != Some(verity) { bail!("reference mismatch"); } } @@ -303,11 +303,11 @@ impl Repository { writer: SplitStreamWriter, reference: Option<&str>, ) -> Result { - let Some((.., ref sha256)) = writer.sha256 else { + let Some(sha256) = writer.expected_sha256 else { bail!("Writer doesn't have sha256 enabled"); }; let stream_path = format!("streams/{}", hex::encode(sha256)); - let object_id = writer.done()?; + let (object_id, _) = writer.done()?; let object_path = Self::format_object_path(&object_id); self.symlink(&stream_path, &object_path)?; @@ -319,6 +319,12 @@ impl Repository { Ok(object_id) } + pub fn has_named_stream(&self, name: &str) -> bool { + let stream_path = format!("streams/refs/{}", name); + + readlinkat(&self.repository, &stream_path, []).is_ok() + } + /// Assign the given name to a stream. The stream must already exist. After this operation it /// will be possible to refer to the stream by its new name 'refs/{name}'. pub fn name_stream(&self, sha256: Sha256Digest, name: &str) -> Result<()> { @@ -345,6 +351,7 @@ impl Repository { pub fn ensure_stream( self: &Arc, sha256: &Sha256Digest, + content_type: u64, callback: impl FnOnce(&mut SplitStreamWriter) -> Result<()>, reference: Option<&str>, ) -> Result { @@ -353,9 +360,9 @@ impl Repository { let object_id = match self.has_stream(sha256)? { Some(id) => id, None => { - let mut writer = self.create_stream(Some(*sha256), None); + let mut writer = self.create_stream(content_type, Some(*sha256)); callback(&mut writer)?; - let object_id = writer.done()?; + let (object_id, _) = writer.done()?; let object_path = Self::format_object_path(&object_id); self.symlink(&stream_path, &object_path)?; @@ -375,6 +382,7 @@ impl Repository { &self, name: &str, verity: Option<&ObjectID>, + expected_content_type: Option, ) -> Result> { let filename = format!("streams/{name}"); @@ -386,7 +394,7 @@ impl Repository { .with_context(|| format!("Opening ref 'streams/{name}'"))? 
}); - SplitStreamReader::new(file) + SplitStreamReader::new(file, expected_content_type) } pub fn open_object(&self, id: &ObjectID) -> Result { @@ -397,9 +405,10 @@ impl Repository { &self, name: &str, verity: Option<&ObjectID>, + expected_content_type: Option, stream: &mut impl Write, ) -> Result<()> { - let mut split_stream = self.open_stream(name, verity)?; + let mut split_stream = self.open_stream(name, verity, expected_content_type)?; split_stream.cat(stream, |id| -> Result> { let mut data = vec![]; File::from(self.open_object(id)?).read_to_end(&mut data)?; @@ -618,7 +627,7 @@ impl Repository { println!("{object:?} lives as a stream"); objects.insert(object.clone()); - let mut split_stream = self.open_stream(&object.to_hex(), None)?; + let mut split_stream = self.open_stream(&object.to_hex(), None, None)?; split_stream.get_object_refs(|id| { println!(" with {id:?}"); objects.insert(id.clone()); diff --git a/crates/composefs/src/splitstream.rs b/crates/composefs/src/splitstream.rs index fe50cf4d..0e0bdf54 100644 --- a/crates/composefs/src/splitstream.rs +++ b/crates/composefs/src/splitstream.rs @@ -8,7 +8,7 @@ use std::{ sync::Arc, }; -use anyhow::{bail, Result}; +use anyhow::{bail, Error, Result}; use sha2::{Digest, Sha256}; use zerocopy::{FromBytes, Immutable, IntoBytes, KnownLayout}; use zstd::stream::{read::Decoder, write::Encoder}; @@ -19,8 +19,31 @@ use crate::{ util::{read_exactish, Sha256Digest}, }; +pub const SPLITSTREAM_MAGIC: [u8; 7] = [b'S', b'p', b'l', b't', b'S', b't', b'r']; + #[derive(Debug, FromBytes, Immutable, IntoBytes, KnownLayout)] #[repr(C)] +pub struct SplitstreamHeader { + pub magic: [u8; 7], // Contains SPLITSTREAM_MAGIC + pub algorithm: u8, + pub content_type: u64, // User can put whatever magic identifier they want there + pub total_size: u64, // total size of inline chunks and external chunks + pub n_refs: u64, + pub n_mappings: u64, + // Followed by n_refs ObjectIDs, sorted + // Followed by n_mappings MappingEntry, sorted by body + // Followed by zstd compressed chunks +} + +#[derive(Clone, Debug, FromBytes, Immutable, IntoBytes, KnownLayout)] +#[repr(C)] +pub struct MappingEntry { + pub body: Sha256Digest, + pub reference_idx: u64, +} + +// These are used during construction before we know the final reference indexes +#[derive(Debug)] pub struct DigestMapEntry { pub body: Sha256Digest, pub verity: ObjectID, @@ -65,9 +88,14 @@ impl DigestMap { pub struct SplitStreamWriter { repo: Arc>, + refs: Vec, + mappings: DigestMap, inline_content: Vec, + total_size: u64, writer: Encoder<'static, Vec>, - pub sha256: Option<(Sha256, Sha256Digest)>, + pub content_type: u64, + pub sha256: Option, + pub expected_sha256: Option, } impl std::fmt::Debug for SplitStreamWriter { @@ -76,6 +104,7 @@ impl std::fmt::Debug for SplitStreamWriter std::fmt::Debug for SplitStreamWriter SplitStreamWriter { pub fn new( repo: &Arc>, - refs: Option>, - sha256: Option, + content_type: u64, + compute_sha256: bool, + expected_sha256: Option, ) -> Self { // SAFETY: we surely can't get an error writing the header to a Vec - let mut writer = Encoder::new(vec![], 0).unwrap(); - - match refs { - Some(DigestMap { map }) => { - writer.write_all(&(map.len() as u64).to_le_bytes()).unwrap(); - writer.write_all(map.as_bytes()).unwrap(); - } - None => { - writer.write_all(&0u64.to_le_bytes()).unwrap(); - } - } + let writer = Encoder::new(vec![], 0).unwrap(); Self { repo: Arc::clone(repo), + content_type, inline_content: vec![], + refs: vec![], + total_size: 0, + mappings: DigestMap::new(), writer, - 
sha256: sha256.map(|x| (Sha256::new(), x)), + sha256: if compute_sha256 || expected_sha256.is_some() { + Some(Sha256::new()) + } else { + None + }, + expected_sha256, + } + } + + pub fn add_external_reference(&mut self, verity: &ObjectID) { + match self.refs.binary_search(verity) { + Ok(_) => {} // Already added + Err(idx) => self.refs.insert(idx, verity.clone()), + } + } + + // Note: These are only stable if no more references are added + pub fn lookup_external_reference(&self, verity: &ObjectID) -> Option { + self.refs.binary_search(verity).ok() + } + + pub fn add_sha256_mappings(&mut self, maps: DigestMap) { + for m in maps.map { + self.add_sha256_mapping(&m.body, &m.verity); + } } + pub fn add_sha256_mapping(&mut self, digest: &Sha256Digest, verity: &ObjectID) { + self.add_external_reference(verity); + self.mappings.insert(digest, verity) + } + fn write_fragment(writer: &mut impl Write, size: usize, data: &[u8]) -> Result<()> { writer.write_all(&(size as u64).to_le_bytes())?; Ok(writer.write_all(data)?) @@ -121,6 +173,7 @@ impl SplitStreamWriter { self.inline_content.len(), &self.inline_content, )?; + self.total_size += self.inline_content.len() as u64; self.inline_content = new_value; } Ok(()) @@ -129,7 +182,7 @@ impl SplitStreamWriter { /// really, "add inline content to the buffer" /// you need to call .flush_inline() later pub fn write_inline(&mut self, data: &[u8]) { - if let Some((ref mut sha256, ..)) = self.sha256 { + if let Some(ref mut sha256) = self.sha256 { sha256.update(data); } self.inline_content.extend(data); @@ -147,33 +200,69 @@ impl SplitStreamWriter { } pub fn write_external(&mut self, data: &[u8], padding: Vec) -> Result<()> { - if let Some((ref mut sha256, ..)) = self.sha256 { + if let Some(ref mut sha256) = self.sha256 { sha256.update(data); sha256.update(&padding); } let id = self.repo.ensure_object(data)?; + + self.add_external_reference(&id); + self.total_size += data.len() as u64; self.write_reference(&id, padding) } pub async fn write_external_async(&mut self, data: Vec, padding: Vec) -> Result<()> { - if let Some((ref mut sha256, ..)) = self.sha256 { + if let Some(ref mut sha256)
= self.sha256 { sha256.update(&data); sha256.update(&padding); } + self.total_size += data.len() as u64; let id = self.repo.ensure_object_async(data).await?; + self.add_external_reference(&id); self.write_reference(&id, padding) } - pub fn done(mut self) -> Result { + pub fn done(mut self) -> Result<(ObjectID, Option)> { self.flush_inline(vec![])?; - if let Some((context, expected)) = self.sha256 { - if Into::::into(context.finalize()) != expected { - bail!("Content doesn't have expected SHA256 hash value!"); + let sha256_digest = if let Some(sha256) = self.sha256 { + let actual = Into::::into(sha256.finalize()); + if let Some(expected) = self.expected_sha256 { + if actual != expected { + bail!("Content doesn't have expected SHA256 hash value!"); + } } + Some(actual) + } else { + None + }; + + let mut buf = vec![]; + let header = SplitstreamHeader { + magic: SPLITSTREAM_MAGIC, + algorithm: ObjectID::ALGORITHM, + content_type: u64::to_le(self.content_type), + total_size: u64::to_le(self.total_size), + n_refs: u64::to_le(self.refs.len() as u64), + n_mappings: u64::to_le(self.mappings.map.len() as u64), + }; + buf.extend_from_slice(header.as_bytes()); + + for ref_id in self.refs.iter() { + buf.extend_from_slice(ref_id.as_bytes()); + } + + for mapping in self.mappings.map { + let entry = MappingEntry { + body: mapping.body, + reference_idx: u64::to_le(self.refs.binary_search(&mapping.verity).unwrap() as u64), + }; + buf.extend_from_slice(entry.as_bytes()); } - self.repo.ensure_object(&self.writer.finish()?) + buf.extend_from_slice(&self.writer.finish()?); + + Ok((self.repo.ensure_object(&buf)?, sha256_digest)) } } @@ -186,8 +275,11 @@ pub enum SplitStreamData { // utility class to help read splitstreams pub struct SplitStreamReader { decoder: Decoder<'static, BufReader>, - pub refs: DigestMap, inline_bytes: usize, + pub content_type: u64, + pub total_size: u64, + pub refs: Vec, + mappings: Vec, } impl std::fmt::Debug for SplitStreamReader { @@ -226,29 +318,74 @@ enum ChunkType { } impl SplitStreamReader { - pub fn new(reader: R) -> Result { - let mut decoder = Decoder::new(reader)?; + pub fn new(mut reader: R, expected_content_type: Option) -> Result { + let header = SplitstreamHeader::read_from_io(&mut reader) + .map_err(|e| Error::msg(format!("Error reading splitstream header: {:?}", e)))?; - let n_map_entries = { - let mut buf = [0u8; 8]; - decoder.read_exact(&mut buf)?; - u64::from_le_bytes(buf) - } as usize; + if header.magic != SPLITSTREAM_MAGIC { + bail!("Invalid splitstream header magic value"); + } - let mut refs = DigestMap:: { - map: Vec::with_capacity(n_map_entries), - }; - for _ in 0..n_map_entries { - refs.map.push(DigestMapEntry::read_from_io(&mut decoder)?); + if header.algorithm != ObjectID::ALGORITHM { + bail!("Invalid splitstream algorithm type"); } + let content_type = u64::from_le(header.content_type); + if let Some(expected) = expected_content_type { + if content_type != expected { + bail!("Invalid splitstream content type"); + } + } + + let total_size = u64::from_le(header.total_size); + let n_refs = usize::try_from(u64::from_le(header.n_refs))?; + let n_mappings = usize::try_from(u64::from_le(header.n_mappings))?; + + let mut refs = Vec::::new(); + for _ in 0..n_refs { + let objid = ObjectID::read_from_io(&mut reader) + .map_err(|e| Error::msg(format!("Invalid refs array {:?}", e)))?; + refs.push(objid.clone()); + } + + let mut mappings = Vec::::new(); + for _ in 0..n_mappings { + let mut m = MappingEntry::read_from_io(&mut reader) + .map_err(|e| Error::msg(format!("Invalid
mappings array {:?}", e)))?; + m.reference_idx = u64::from_le(m.reference_idx); + if m.reference_idx >= n_refs as u64 { + bail!("Invalid mapping reference") + } + mappings.push(m.clone()); + } + + let decoder = Decoder::new(reader)?; + Ok(Self { decoder, - refs, inline_bytes: 0, + content_type, + total_size, + refs, + mappings, }) } + pub fn iter_mappings(&self) -> impl Iterator { + self.mappings + .iter() + .map(|m| (&m.body, &self.refs[m.reference_idx as usize])) + } + + pub fn get_mappings(&self) -> DigestMap { + let mut m = DigestMap::new(); + + for (body, verity) in self.iter_mappings() { + m.insert(body, verity); + } + m + } + fn ensure_chunk( &mut self, eof_ok: bool, @@ -347,36 +484,22 @@ impl SplitStreamReader { } pub fn get_object_refs(&mut self, mut callback: impl FnMut(&ObjectID)) -> Result<()> { - let mut buffer = vec![]; - - for entry in &self.refs.map { - callback(&entry.verity); - } - - loop { - match self.ensure_chunk(true, true, 0)? { - ChunkType::Eof => break Ok(()), - ChunkType::Inline => { - read_into_vec(&mut self.decoder, &mut buffer, self.inline_bytes)?; - self.inline_bytes = 0; - } - ChunkType::External(ref id) => { - callback(id); - } - } + for entry in &self.refs { + callback(entry); } + Ok(()) } pub fn get_stream_refs(&mut self, mut callback: impl FnMut(&Sha256Digest)) { - for entry in &self.refs.map { + for entry in &self.mappings { callback(&entry.body); } } pub fn lookup(&self, body: &Sha256Digest) -> Result<&ObjectID> { - match self.refs.lookup(body) { - Some(id) => Ok(id), - None => bail!("Reference is not found in splitstream"), + match self.mappings.binary_search_by_key(body, |e| e.body) { + Ok(idx) => Ok(&self.refs[self.mappings[idx].reference_idx as usize]), + Err(..) => bail!("Reference is not found in splitstream"), } } } diff --git a/crates/composefs/src/util.rs b/crates/composefs/src/util.rs index 75dbca5f..9ba09b1b 100644 --- a/crates/composefs/src/util.rs +++ b/crates/composefs/src/util.rs @@ -106,7 +106,7 @@ pub fn parse_sha256(string: impl AsRef) -> Result { Ok(value) } -pub(crate) trait ErrnoFilter { +pub trait ErrnoFilter { fn filter_errno(self, ignored: Errno) -> ErrnoResult>; } diff --git a/doc/splitstream.md b/doc/splitstream.md index 62df66e5..2bcb2230 100644 --- a/doc/splitstream.md +++ b/doc/splitstream.md @@ -22,39 +22,50 @@ extremely well. ## File format -The file format consists of a header, plus a number of data blocks. +The file format consists of a header, followed by a set of data blocks. -### Mappings +### Header -The file starts with a single u64 le integer which is the number of mapping -structures present in the file. A mapping is a relationship between a file -identified by its sha256 content hash and the fsverity hash of that same file. -These entries are encoded simply as the sha256 hash value (32 bytes) plus the -fsverity hash value (32 bytes) combined together into a single 64 byte record. 
- -For example, if we had a file that mapped `1234..` to `abcd..` and `5678..` to -`efab..`, the header would look like: +The header format looks like this, where all fields are little endian: ``` - 64bit 32 bytes 32 bytes + 32 bytes + 32 bytes - +--------+----------+----------+----------+---------+ - | 2 | 1234 | abcd | 5678 | efab | - +--------+----------+----------+----------+---------+ +pub const SPLITSTREAM_MAGIC: [u8; 7] = [b'S', b'p', b'l', b't', b'S', b't', b'r']; + +struct MappingEntry { + pub body: Sha256Digest, + pub reference_idx: u64, // index into references table +} + +struct SplitstreamHeader { + magic: [u8; 7], // Contains SPLITSTREAM_MAGIC + algorithm: u8, // The fs-verity algorithm used, 1 == sha256, 2 == sha512 + content_type: u64, // User-defined identifier for the content of the stream + total_size: u64, // total size of inline chunks and external chunks + n_refs: u64, + n_mappings: u64, + refs: [ObjectID; n_refs], // sorted + mappings: [MappingEntry; n_mappings], // sorted by body +} ``` -The mappings in the header are always sorted by their sha256 content hash -values. - -The mappings serve two purposes: - - - in the case of splitstreams which refer to other splitstreams without - directly embedding the content of the other stream, this provides a - mechanism to find out which other streams are referenced. This is used for - garbage collection. - - - for the same usecase, it provides a mechanism to be able to verify the - content of the referred splitstream (by checking its fsverity digest) before - starting to iterate it +The table of references is used to allow a splitstream to refer to +other splitstreams or to regular file content, either because that +content is embedded in the stream or because it is only referenced +indirectly. It is primarily used to keep these objects alive during +garbage collection. + +Examples of references are: + * References from an OCI manifest splitstream to the splitstreams of + its tar layers. + * External objects embedded in a splitstream, such as the objects + referenced by the external blocks of a tar layer splitstream. + * External objects referenced only indirectly by a splitstream, such as + the objects referenced from an ostree commit splitstream. + +The mapping table provides a mechanism to map the sha256 digest of a +splitstream to an fs-verity digest. This allows the fs-verity digest of +the target to be verified before use. The primary example is OCI +manifests, which reference the tar layer splitstreams: we could look up +such streams by their sha256 in the streams/ directory, but then we +would have no trusted information about which fs-verity digests to +expect for the layers. ### Data blocks @@ -81,6 +92,8 @@ There are two kinds of blocks: - "External" blocks (`size == 0`): in this case the length of the data is 32 bytes. This is the binary form of a sha256 hash value and is a reference to an object in the composefs repository (by its fs-verity digest). + Note that these references are *also* in the header, so there is no need + to read the entire file to find which objects are referenced. That's it, really. There's no header. The stream is over when there are no more blocks.
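To make the new on-disk layout concrete, here is a minimal, self-contained sketch of parsing the fixed 40-byte splitstream header described above and locating the start of the zstd-compressed chunk data. It is illustrative only and not part of the patch: the `Header` struct and `parse_header` helper are hypothetical, manual byte slicing stands in for the zerocopy-based reader, and a sha256 repository is assumed (algorithm 1, 32-byte object IDs, 40-byte mapping entries).

```rust
// Hypothetical sketch: parse a splitstream header laid out as
// magic[7] | algorithm u8 | content_type u64 | total_size u64 | n_refs u64 | n_mappings u64
// (all integers little-endian), then compute where the zstd chunk data begins,
// assuming 32-byte object IDs (sha256) and 40-byte mapping entries.
const SPLITSTREAM_MAGIC: [u8; 7] = *b"SpltStr";
const HEADER_SIZE: usize = 40;

#[derive(Debug)]
struct Header {
    algorithm: u8,
    content_type: u64,
    total_size: u64,
    n_refs: u64,
    n_mappings: u64,
}

fn parse_header(data: &[u8]) -> Result<(Header, usize), String> {
    if data.len() < HEADER_SIZE {
        return Err("short splitstream header".into());
    }
    if data[0..7] != SPLITSTREAM_MAGIC {
        return Err("bad splitstream magic".into());
    }
    let le_u64 = |off: usize| u64::from_le_bytes(data[off..off + 8].try_into().unwrap());
    let header = Header {
        algorithm: data[7],
        content_type: le_u64(8),
        total_size: le_u64(16),
        n_refs: le_u64(24),
        n_mappings: le_u64(32),
    };
    // The zstd-compressed chunks follow the header, the refs table and the mapping table.
    let data_offset = HEADER_SIZE + header.n_refs as usize * 32 + header.n_mappings as usize * 40;
    Ok((header, data_offset))
}

fn main() {
    // A header for an empty stream: no refs, no mappings, no data.
    let mut buf = Vec::new();
    buf.extend_from_slice(b"SpltStr");
    buf.push(1); // algorithm 1 == sha256
    for field in [42u64, 0, 0, 0] {
        // content_type, total_size, n_refs, n_mappings
        buf.extend_from_slice(&field.to_le_bytes());
    }
    let (header, data_offset) = parse_header(&buf).unwrap();
    println!("{header:?}, chunks start at byte {data_offset}");
}
```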