From bed66dca015d217aad82fa55275c7290a8033d3f Mon Sep 17 00:00:00 2001 From: Alexander Larsson Date: Wed, 18 Jun 2025 15:26:15 +0200 Subject: [PATCH] splitstream: Rework file format This changes the splitstream format a bit, with the goal of allowing splitstreams to support ostree files as well (see #144) The primary differences are: * The header is not compressed * All referenced fs-verity objects are stored in the header, including external chunks, mapped splitstreams and (a new feature) references that are not used in chunks. * The mapping table is separate from the reference table (and generally smaller), and indexes into it. * There is a magic value to detect the file format. * There is a magic content type to detect the type wrapped in the stream. * We store a tag for what ObjectID format is used * The total size of the stream is stored in the header. The ability to reference file objects in the repo even if they are not part of the splitstream "content" will be useful for the ostree support to reference file content objects. This change also allows more efficient GC enumeration, because we don't have to parse the entire splitstream to find the referenced objects. Signed-off-by: Alexander Larsson --- crates/cfsctl/src/main.rs | 2 +- crates/composefs-http/src/lib.rs | 8 +- crates/composefs-oci/src/image.rs | 10 +- crates/composefs-oci/src/lib.rs | 21 +- crates/composefs-oci/src/skopeo.rs | 22 +- crates/composefs-oci/src/tar.rs | 47 +++-- crates/composefs/src/fsverity/hashvalue.rs | 30 ++- crates/composefs/src/repository.rs | 37 ++-- crates/composefs/src/splitstream.rs | 221 ++++++++++++++++----- doc/splitstream.md | 67 ++++--- 10 files changed, 336 insertions(+), 129 deletions(-) diff --git a/crates/cfsctl/src/main.rs b/crates/cfsctl/src/main.rs index 0fabcb2d..25aafbff 100644 --- a/crates/cfsctl/src/main.rs +++ b/crates/cfsctl/src/main.rs @@ -188,7 +188,7 @@ async fn main() -> Result<()> { } } Command::Cat { name } => { - repo.merge_splitstream(&name, None, &mut std::io::stdout())?; + repo.merge_splitstream(&name, None, None, &mut std::io::stdout())?; } Command::ImportImage { reference } => { let image_id = repo.import_image(&reference, &mut std::io::stdin())?; diff --git a/crates/composefs-http/src/lib.rs b/crates/composefs-http/src/lib.rs index 1d983876..390d4a02 100644 --- a/crates/composefs-http/src/lib.rs +++ b/crates/composefs-http/src/lib.rs @@ -13,9 +13,7 @@ use sha2::{Digest, Sha256}; use tokio::task::JoinSet; use composefs::{ - fsverity::FsVerityHashValue, - repository::Repository, - splitstream::{DigestMapEntry, SplitStreamReader}, + fsverity::FsVerityHashValue, repository::Repository, splitstream::SplitStreamReader, util::Sha256Digest, }; @@ -61,7 +59,7 @@ impl Downloader { } fn open_splitstream(&self, id: &ObjectID) -> Result> { - SplitStreamReader::new(File::from(self.repo.open_object(id)?)) + SplitStreamReader::new(File::from(self.repo.open_object(id)?), None) } fn read_object(&self, id: &ObjectID) -> Result> { @@ -107,7 +105,7 @@ impl Downloader { // this part is fast: it only touches the header let mut reader = self.open_splitstream(&id)?; - for DigestMapEntry { verity, body } in &reader.refs.map { + for (body, verity) in reader.iter_mappings() { match splitstreams.insert(verity.clone(), Some(*body)) { // This is the (normal) case if we encounter a splitstream we didn't see yet... None => { diff --git a/crates/composefs-oci/src/image.rs b/crates/composefs-oci/src/image.rs index 93c8c756..8a36bdfa 100644 --- a/crates/composefs-oci/src/image.rs +++ b/crates/composefs-oci/src/image.rs @@ -9,6 +9,7 @@ use composefs::{ tree::{Directory, FileSystem, Inode, Leaf}, }; +use crate::skopeo::{OCI_CONFIG_CONTENT_TYPE, TAR_LAYER_CONTENT_TYPE}; use crate::tar::{TarEntry, TarItem}; pub fn process_entry( @@ -74,14 +75,19 @@ pub fn create_filesystem( ) -> Result> { let mut filesystem = FileSystem::default(); - let mut config_stream = repo.open_stream(config_name, config_verity)?; + let mut config_stream = + repo.open_stream(config_name, config_verity, Some(OCI_CONFIG_CONTENT_TYPE))?; let config = ImageConfiguration::from_reader(&mut config_stream)?; for diff_id in config.rootfs().diff_ids() { let layer_sha256 = super::sha256_from_digest(diff_id)?; let layer_verity = config_stream.lookup(&layer_sha256)?; - let mut layer_stream = repo.open_stream(&hex::encode(layer_sha256), Some(layer_verity))?; + let mut layer_stream = repo.open_stream( + &hex::encode(layer_sha256), + Some(layer_verity), + Some(TAR_LAYER_CONTENT_TYPE), + )?; while let Some(entry) = crate::tar::get_entry(&mut layer_stream)? { process_entry(&mut filesystem, entry)?; } diff --git a/crates/composefs-oci/src/lib.rs b/crates/composefs-oci/src/lib.rs index 6e77de29..64545aa6 100644 --- a/crates/composefs-oci/src/lib.rs +++ b/crates/composefs-oci/src/lib.rs @@ -16,6 +16,7 @@ use composefs::{ util::{parse_sha256, Sha256Digest}, }; +use crate::skopeo::{OCI_CONFIG_CONTENT_TYPE, TAR_LAYER_CONTENT_TYPE}; use crate::tar::get_entry; type ContentAndVerity = (Sha256Digest, ObjectID); @@ -40,14 +41,19 @@ pub fn import_layer( name: Option<&str>, tar_stream: &mut impl Read, ) -> Result { - repo.ensure_stream(sha256, |writer| tar::split(tar_stream, writer), name) + repo.ensure_stream( + sha256, + TAR_LAYER_CONTENT_TYPE, + |writer| tar::split(tar_stream, writer), + name, + ) } pub fn ls_layer( repo: &Repository, name: &str, ) -> Result<()> { - let mut split_stream = repo.open_stream(name, None)?; + let mut split_stream = repo.open_stream(name, None, Some(TAR_LAYER_CONTENT_TYPE))?; while let Some(entry) = get_entry(&mut split_stream)? { println!("{entry}"); @@ -83,9 +89,9 @@ pub fn open_config( .with_context(|| format!("Object {name} is unknown to us"))? } }; - let mut stream = repo.open_stream(name, Some(id))?; + let mut stream = repo.open_stream(name, Some(id), Some(OCI_CONFIG_CONTENT_TYPE))?; let config = ImageConfiguration::from_reader(&mut stream)?; - Ok((config, stream.refs)) + Ok((config, stream.get_mappings())) } fn hash(bytes: &[u8]) -> Sha256Digest { @@ -106,7 +112,7 @@ pub fn open_config_shallow( // we need to manually check the content digest let expected_hash = parse_sha256(name) .context("Containers must be referred to by sha256 if verity is missing")?; - let mut stream = repo.open_stream(name, None)?; + let mut stream = repo.open_stream(name, None, Some(OCI_CONFIG_CONTENT_TYPE))?; let mut raw_config = vec![]; stream.read_to_end(&mut raw_config)?; ensure!(hash(&raw_config) == expected_hash, "Data integrity issue"); @@ -123,7 +129,8 @@ pub fn write_config( let json = config.to_string()?; let json_bytes = json.as_bytes(); let sha256 = hash(json_bytes); - let mut stream = repo.create_stream(Some(sha256), Some(refs)); + let mut stream = repo.create_stream(OCI_CONFIG_CONTENT_TYPE, Some(sha256)); + stream.add_sha256_mappings(refs); stream.write_inline(json_bytes); let id = repo.write_stream(stream, None)?; Ok((sha256, id)) @@ -201,7 +208,7 @@ mod test { let id = import_layer(&repo, &layer_id, Some("name"), &mut layer.as_slice()).unwrap(); let mut dump = String::new(); - let mut split_stream = repo.open_stream("refs/name", Some(&id)).unwrap(); + let mut split_stream = repo.open_stream("refs/name", Some(&id), None).unwrap(); while let Some(entry) = tar::get_entry(&mut split_stream).unwrap() { writeln!(dump, "{entry}").unwrap(); } diff --git a/crates/composefs-oci/src/skopeo.rs b/crates/composefs-oci/src/skopeo.rs index a184d372..06c1448d 100644 --- a/crates/composefs-oci/src/skopeo.rs +++ b/crates/composefs-oci/src/skopeo.rs @@ -10,12 +10,14 @@ use oci_spec::image::{Descriptor, ImageConfiguration, ImageManifest, MediaType}; use rustix::process::geteuid; use tokio::{io::AsyncReadExt, sync::Semaphore}; -use composefs::{ - fsverity::FsVerityHashValue, repository::Repository, splitstream::DigestMap, util::Sha256Digest, -}; +use composefs::{fsverity::FsVerityHashValue, repository::Repository, util::Sha256Digest}; use crate::{sha256_from_descriptor, sha256_from_digest, tar::split_async, ContentAndVerity}; +// These are randomly generated UUID-like content types +pub const TAR_LAYER_CONTENT_TYPE: u64 = 0x2a037edfcae1ffea; +pub const OCI_CONFIG_CONTENT_TYPE: u64 = 0x44218c839727a80b; + struct ImageOp { repo: Arc>, proxy: ImageProxy, @@ -95,7 +97,9 @@ impl ImageOp { self.progress .println(format!("Fetching layer {}", hex::encode(layer_sha256)))?; - let mut splitstream = self.repo.create_stream(Some(layer_sha256), None); + let mut splitstream = self + .repo + .create_stream(TAR_LAYER_CONTENT_TYPE, Some(layer_sha256)); match descriptor.media_type() { MediaType::ImageLayer => { split_async(progress, &mut splitstream).await?; @@ -172,15 +176,15 @@ impl ImageOp { entries.push((layer_sha256, future)); } + let mut splitstream = self + .repo + .create_stream(OCI_CONFIG_CONTENT_TYPE, Some(config_sha256)); + // Collect the results. - let mut config_maps = DigestMap::new(); for (layer_sha256, future) in entries { - config_maps.insert(&layer_sha256, &future.await??); + splitstream.add_sha256_mapping(&layer_sha256, &future.await??); } - let mut splitstream = self - .repo - .create_stream(Some(config_sha256), Some(config_maps)); splitstream.write_inline(&raw_config); let config_id = self.repo.write_stream(splitstream, None)?; diff --git a/crates/composefs-oci/src/tar.rs b/crates/composefs-oci/src/tar.rs index 6ea651be..c613946c 100644 --- a/crates/composefs-oci/src/tar.rs +++ b/crates/composefs-oci/src/tar.rs @@ -281,7 +281,10 @@ pub fn get_entry( } #[cfg(test)] + mod tests { + use crate::TAR_LAYER_CONTENT_TYPE; + use super::*; use composefs::{ fsverity::Sha256HashValue, generic_tree::LeafContent, repository::Repository, @@ -338,13 +341,15 @@ mod tests { fn read_all_via_splitstream(tar_data: Vec) -> Result>> { let mut tar_cursor = Cursor::new(tar_data); let repo = create_test_repository()?; - let mut writer = repo.create_stream(None, None); + let mut writer = repo.create_stream(TAR_LAYER_CONTENT_TYPE, None); split(&mut tar_cursor, &mut writer)?; let object_id = writer.done()?; - let mut reader: SplitStreamReader = - SplitStreamReader::new(repo.open_object(&object_id)?.into())?; + let mut reader: SplitStreamReader = SplitStreamReader::new( + repo.open_object(&object_id)?.into(), + Some(TAR_LAYER_CONTENT_TYPE), + )?; let mut entries = Vec::new(); while let Some(entry) = get_entry(&mut reader)? { @@ -363,13 +368,16 @@ mod tests { let mut tar_cursor = Cursor::new(tar_data); let repo = create_test_repository().unwrap(); - let mut writer = repo.create_stream(None, None); + let mut writer = repo.create_stream(TAR_LAYER_CONTENT_TYPE, None); split(&mut tar_cursor, &mut writer).unwrap(); let object_id = writer.done().unwrap(); - let mut reader: SplitStreamReader = - SplitStreamReader::new(repo.open_object(&object_id).unwrap().into()).unwrap(); + let mut reader: SplitStreamReader = SplitStreamReader::new( + repo.open_object(&object_id).unwrap().into(), + Some(TAR_LAYER_CONTENT_TYPE), + ) + .unwrap(); assert!(get_entry(&mut reader).unwrap().is_none()); } @@ -389,13 +397,16 @@ mod tests { let mut tar_cursor = Cursor::new(tar_data); let repo = create_test_repository().unwrap(); - let mut writer = repo.create_stream(None, None); + let mut writer = repo.create_stream(TAR_LAYER_CONTENT_TYPE, None); split(&mut tar_cursor, &mut writer).unwrap(); let object_id = writer.done().unwrap(); - let mut reader: SplitStreamReader = - SplitStreamReader::new(repo.open_object(&object_id).unwrap().into()).unwrap(); + let mut reader: SplitStreamReader = SplitStreamReader::new( + repo.open_object(&object_id).unwrap().into(), + Some(TAR_LAYER_CONTENT_TYPE), + ) + .unwrap(); // Should have exactly one entry let entry = get_entry(&mut reader) @@ -444,13 +455,16 @@ mod tests { let mut tar_cursor = Cursor::new(tar_data); let repo = create_test_repository().unwrap(); - let mut writer = repo.create_stream(None, None); + let mut writer = repo.create_stream(TAR_LAYER_CONTENT_TYPE, None); split(&mut tar_cursor, &mut writer).unwrap(); let object_id = writer.done().unwrap(); - let mut reader: SplitStreamReader = - SplitStreamReader::new(repo.open_object(&object_id).unwrap().into()).unwrap(); + let mut reader: SplitStreamReader = SplitStreamReader::new( + repo.open_object(&object_id).unwrap().into(), + Some(TAR_LAYER_CONTENT_TYPE), + ) + .unwrap(); let mut entries = Vec::new(); while let Some(entry) = get_entry(&mut reader).unwrap() { @@ -508,13 +522,16 @@ mod tests { // Split the tar let mut tar_cursor = Cursor::new(original_tar.clone()); let repo = create_test_repository().unwrap(); - let mut writer = repo.create_stream(None, None); + let mut writer = repo.create_stream(TAR_LAYER_CONTENT_TYPE, None); split(&mut tar_cursor, &mut writer).unwrap(); let object_id = writer.done().unwrap(); // Read back entries and compare with original headers - let mut reader: SplitStreamReader = - SplitStreamReader::new(repo.open_object(&object_id).unwrap().into()).unwrap(); + let mut reader: SplitStreamReader = SplitStreamReader::new( + repo.open_object(&object_id).unwrap().into(), + Some(TAR_LAYER_CONTENT_TYPE), + ) + .unwrap(); let mut entries = Vec::new(); while let Some(entry) = get_entry(&mut reader).unwrap() { diff --git a/crates/composefs/src/fsverity/hashvalue.rs b/crates/composefs/src/fsverity/hashvalue.rs index 5cfb6d71..f50abb64 100644 --- a/crates/composefs/src/fsverity/hashvalue.rs +++ b/crates/composefs/src/fsverity/hashvalue.rs @@ -2,6 +2,7 @@ use core::{fmt, hash::Hash}; use hex::FromHexError; use sha2::{digest::FixedOutputReset, digest::Output, Digest, Sha256, Sha512}; +use std::cmp::Ord; use zerocopy::{FromBytes, Immutable, IntoBytes, KnownLayout, Unaligned}; pub trait FsVerityHashValue @@ -12,6 +13,7 @@ where Self: Hash + Eq, Self: fmt::Debug, Self: Send + Sync + Unpin + 'static, + Self: PartialOrd + Ord, { type Digest: Digest + FixedOutputReset + fmt::Debug; const ALGORITHM: u8; @@ -93,7 +95,19 @@ impl fmt::Debug for Sha512HashValue { } } -#[derive(Clone, Eq, FromBytes, Hash, Immutable, IntoBytes, KnownLayout, PartialEq, Unaligned)] +#[derive( + Clone, + Eq, + FromBytes, + Hash, + Immutable, + IntoBytes, + KnownLayout, + PartialEq, + Unaligned, + PartialOrd, + Ord, +)] #[repr(C)] pub struct Sha256HashValue([u8; 32]); @@ -110,7 +124,19 @@ impl FsVerityHashValue for Sha256HashValue { const ID: &str = "sha256"; } -#[derive(Clone, Eq, FromBytes, Hash, Immutable, IntoBytes, KnownLayout, PartialEq, Unaligned)] +#[derive( + Clone, + Eq, + FromBytes, + Hash, + Immutable, + IntoBytes, + KnownLayout, + PartialEq, + Unaligned, + PartialOrd, + Ord, +)] #[repr(C)] pub struct Sha512HashValue([u8; 64]); diff --git a/crates/composefs/src/repository.rs b/crates/composefs/src/repository.rs index bc7652f0..00a89916 100644 --- a/crates/composefs/src/repository.rs +++ b/crates/composefs/src/repository.rs @@ -12,8 +12,8 @@ use anyhow::{bail, ensure, Context, Result}; use once_cell::sync::OnceCell; use rustix::{ fs::{ - fdatasync, flock, linkat, mkdirat, open, openat, readlinkat, AtFlags, Dir, FileType, - FlockOperation, Mode, OFlags, CWD, + fdatasync, flock, linkat, mkdirat, open, openat, readlinkat, statat, AtFlags, Dir, + FileType, FlockOperation, Mode, OFlags, CWD, }, io::{Errno, Result as ErrnoResult}, }; @@ -25,7 +25,7 @@ use crate::{ CompareVerityError, EnableVerityError, FsVerityHashValue, MeasureVerityError, }, mount::{composefs_fsmount, mount_at}, - splitstream::{DigestMap, SplitStreamReader, SplitStreamWriter}, + splitstream::{SplitStreamReader, SplitStreamWriter}, util::{proc_self_fd, replace_symlinkat, ErrnoFilter, Sha256Digest}, }; @@ -235,10 +235,10 @@ impl Repository { /// store the result. pub fn create_stream( self: &Arc, + content_type: u64, sha256: Option, - maps: Option>, ) -> SplitStreamWriter { - SplitStreamWriter::new(self, maps, sha256) + SplitStreamWriter::new(self, content_type, sha256) } fn format_object_path(id: &ObjectID) -> String { @@ -286,11 +286,11 @@ impl Repository { Err(other) => Err(other)?, }; let mut context = Sha256::new(); - let mut split_stream = SplitStreamReader::new(File::from(stream))?; + let mut split_stream = SplitStreamReader::new(File::from(stream), None)?; // check the verity of all linked streams - for entry in &split_stream.refs.map { - if self.check_stream(&entry.body)?.as_ref() != Some(&entry.verity) { + for (body, verity) in split_stream.iter_mappings() { + if self.check_stream(body)?.as_ref() != Some(verity) { bail!("reference mismatch"); } } @@ -335,6 +335,16 @@ impl Repository { Ok(object_id) } + pub fn has_named_stream(&self, name: &str) -> Result { + let stream_path = format!("streams/refs/{}", name); + + Ok(statat(&self.repository, &stream_path, AtFlags::empty()) + .filter_errno(Errno::NOENT) + .context("Looking for stream {name} in repository")? + .map(|s| FileType::from_raw_mode(s.st_mode).is_symlink()) + .unwrap_or(false)) + } + /// Assign the given name to a stream. The stream must already exist. After this operation it /// will be possible to refer to the stream by its new name 'refs/{name}'. pub fn name_stream(&self, sha256: Sha256Digest, name: &str) -> Result<()> { @@ -361,6 +371,7 @@ impl Repository { pub fn ensure_stream( self: &Arc, sha256: &Sha256Digest, + content_type: u64, callback: impl FnOnce(&mut SplitStreamWriter) -> Result<()>, reference: Option<&str>, ) -> Result { @@ -369,7 +380,7 @@ impl Repository { let object_id = match self.has_stream(sha256)? { Some(id) => id, None => { - let mut writer = self.create_stream(Some(*sha256), None); + let mut writer = self.create_stream(content_type, Some(*sha256)); callback(&mut writer)?; let object_id = writer.done()?; @@ -392,6 +403,7 @@ impl Repository { &self, name: &str, verity: Option<&ObjectID>, + expected_content_type: Option, ) -> Result> { let filename = format!("streams/{name}"); @@ -403,7 +415,7 @@ impl Repository { .with_context(|| format!("Opening ref 'streams/{name}'"))? }); - SplitStreamReader::new(file) + SplitStreamReader::new(file, expected_content_type) } /// Given an object identifier (a digest), return a read-only file descriptor @@ -416,9 +428,10 @@ impl Repository { &self, name: &str, verity: Option<&ObjectID>, + expected_content_type: Option, stream: &mut impl Write, ) -> Result<()> { - let mut split_stream = self.open_stream(name, verity)?; + let mut split_stream = self.open_stream(name, verity, expected_content_type)?; split_stream.cat(stream, |id| -> Result> { let mut data = vec![]; File::from(self.open_object(id)?).read_to_end(&mut data)?; @@ -659,7 +672,7 @@ impl Repository { println!("{object:?} lives as a stream"); objects.insert(object.clone()); - let mut split_stream = self.open_stream(&object.to_hex(), None)?; + let mut split_stream = self.open_stream(&object.to_hex(), None, None)?; split_stream.get_object_refs(|id| { println!(" with {id:?}"); objects.insert(id.clone()); diff --git a/crates/composefs/src/splitstream.rs b/crates/composefs/src/splitstream.rs index fe50cf4d..bc686d40 100644 --- a/crates/composefs/src/splitstream.rs +++ b/crates/composefs/src/splitstream.rs @@ -4,11 +4,12 @@ */ use std::{ + io, io::{BufReader, Read, Write}, sync::Arc, }; -use anyhow::{bail, Result}; +use anyhow::{bail, Error, Result}; use sha2::{Digest, Sha256}; use zerocopy::{FromBytes, Immutable, IntoBytes, KnownLayout}; use zstd::stream::{read::Decoder, write::Encoder}; @@ -19,8 +20,33 @@ use crate::{ util::{read_exactish, Sha256Digest}, }; +pub const SPLITSTREAM_MAGIC: [u8; 7] = [b'S', b'p', b'l', b't', b'S', b't', b'r']; + #[derive(Debug, FromBytes, Immutable, IntoBytes, KnownLayout)] #[repr(C)] +pub struct SplitstreamHeader { + pub magic: [u8; 7], // Contains SPLITSTREAM_MAGIC + pub algorithm: u8, + pub content_type: u64, // User can put whatever magic identifier they want there + pub total_size: u64, // total size of inline chunks and external chunks + pub n_refs: u64, + pub n_mappings: u64, + pub extension_size: u64, + // Followed by extension_extension bytes of data, for extensibility + // Followed by n_refs ObjectIDs, sorted + // Followed by n_mappings MappingEntry, sorted by body + // Followed by zstd compressed chunks +} + +#[derive(Clone, Debug, FromBytes, Immutable, IntoBytes, KnownLayout)] +#[repr(C)] +pub struct MappingEntry { + pub body: Sha256Digest, + pub reference_idx: u64, +} + +// These are used during construction before we know the final reference indexes +#[derive(Debug)] pub struct DigestMapEntry { pub body: Sha256Digest, pub verity: ObjectID, @@ -65,8 +91,12 @@ impl DigestMap { pub struct SplitStreamWriter { repo: Arc>, + refs: Vec, + mappings: DigestMap, inline_content: Vec, + total_size: u64, writer: Encoder<'static, Vec>, + pub content_type: u64, pub sha256: Option<(Sha256, Sha256Digest)>, } @@ -84,30 +114,47 @@ impl std::fmt::Debug for SplitStreamWriter SplitStreamWriter { pub fn new( repo: &Arc>, - refs: Option>, + content_type: u64, sha256: Option, ) -> Self { // SAFETY: we surely can't get an error writing the header to a Vec - let mut writer = Encoder::new(vec![], 0).unwrap(); - - match refs { - Some(DigestMap { map }) => { - writer.write_all(&(map.len() as u64).to_le_bytes()).unwrap(); - writer.write_all(map.as_bytes()).unwrap(); - } - None => { - writer.write_all(&0u64.to_le_bytes()).unwrap(); - } - } + let writer = Encoder::new(vec![], 0).unwrap(); Self { repo: Arc::clone(repo), + content_type, inline_content: vec![], + refs: vec![], + total_size: 0, + mappings: DigestMap::new(), writer, sha256: sha256.map(|x| (Sha256::new(), x)), } } + pub fn add_external_reference(&mut self, verity: &ObjectID) { + match self.refs.binary_search(verity) { + Ok(_) => {} // Already added + Err(idx) => self.refs.insert(idx, verity.clone()), + } + } + + // Note: These are only stable if no more references are added + pub fn lookup_external_reference(&self, verity: &ObjectID) -> Option { + self.refs.binary_search(verity).ok() + } + + pub fn add_sha256_mappings(&mut self, maps: DigestMap) { + for m in maps.map { + self.add_sha256_mapping(&m.body, &m.verity); + } + } + + pub fn add_sha256_mapping(&mut self, digest: &Sha256Digest, verity: &ObjectID) { + self.add_external_reference(verity); + self.mappings.insert(digest, verity) + } + fn write_fragment(writer: &mut impl Write, size: usize, data: &[u8]) -> Result<()> { writer.write_all(&(size as u64).to_le_bytes())?; Ok(writer.write_all(data)?) @@ -121,6 +168,7 @@ impl SplitStreamWriter { self.inline_content.len(), &self.inline_content, )?; + self.total_size += self.inline_content.len() as u64; self.inline_content = new_value; } Ok(()) @@ -152,6 +200,9 @@ impl SplitStreamWriter { sha256.update(&padding); } let id = self.repo.ensure_object(data)?; + + self.add_external_reference(&id); + self.total_size += data.len() as u64; self.write_reference(&id, padding) } @@ -160,7 +211,9 @@ impl SplitStreamWriter { sha256.update(&data); sha256.update(&padding); } + self.total_size += data.len() as u64; let id = self.repo.ensure_object_async(data).await?; + self.add_external_reference(&id); self.write_reference(&id, padding) } @@ -173,7 +226,33 @@ impl SplitStreamWriter { } } - self.repo.ensure_object(&self.writer.finish()?) + let mut buf = vec![]; + let header = SplitstreamHeader { + magic: SPLITSTREAM_MAGIC, + algorithm: ObjectID::ALGORITHM, + content_type: self.content_type, + total_size: u64::to_le(self.total_size), + n_refs: u64::to_le(self.refs.len() as u64), + n_mappings: u64::to_le(self.mappings.map.len() as u64), + extension_size: u64::to_le(0u64), + }; + buf.extend_from_slice(header.as_bytes()); + + for ref_id in self.refs.iter() { + buf.extend_from_slice(ref_id.as_bytes()); + } + + for mapping in self.mappings.map { + let entry = MappingEntry { + body: mapping.body, + reference_idx: u64::to_le(self.refs.binary_search(&mapping.verity).unwrap() as u64), + }; + buf.extend_from_slice(entry.as_bytes()); + } + + buf.extend_from_slice(&self.writer.finish()?); + + self.repo.ensure_object(&buf) } } @@ -186,8 +265,11 @@ pub enum SplitStreamData { // utility class to help read splitstreams pub struct SplitStreamReader { decoder: Decoder<'static, BufReader>, - pub refs: DigestMap, inline_bytes: usize, + pub content_type: u64, + pub total_size: u64, + pub refs: Vec, + mappings: Vec, } impl std::fmt::Debug for SplitStreamReader { @@ -226,29 +308,84 @@ enum ChunkType { } impl SplitStreamReader { - pub fn new(reader: R) -> Result { - let mut decoder = Decoder::new(reader)?; + pub fn new(mut reader: R, expected_content_type: Option) -> Result { + let header = SplitstreamHeader::read_from_io(&mut reader) + .map_err(|e| Error::msg(format!("Error reading splitstream header: {:?}", e)))?; - let n_map_entries = { - let mut buf = [0u8; 8]; - decoder.read_exact(&mut buf)?; - u64::from_le_bytes(buf) - } as usize; + if header.magic != SPLITSTREAM_MAGIC { + bail!("Invalid splitstream header magic value"); + } - let mut refs = DigestMap:: { - map: Vec::with_capacity(n_map_entries), - }; - for _ in 0..n_map_entries { - refs.map.push(DigestMapEntry::read_from_io(&mut decoder)?); + if header.algorithm != ObjectID::ALGORITHM { + bail!("Invalid splitstream algorithm type"); } + let content_type = u64::from_le(header.content_type); + if let Some(expected) = expected_content_type { + if content_type != expected { + bail!("Invalid splitstream content type"); + } + } + + let total_size = u64::from_le(header.total_size); + let n_refs = usize::try_from(u64::from_le(header.n_refs))?; + let n_mappings = usize::try_from(u64::from_le(header.n_mappings))?; + let extension_size = u64::from_le(header.extension_size); + + if extension_size > 0 { + // Skip header_extension we don't know to handle + if io::copy(&mut reader.by_ref().take(extension_size), &mut io::sink())? + != extension_size + { + bail!("Error reading splitstream header, not enough header extension bytes"); + } + } + + let mut refs = Vec::::new(); + for _ in 0..n_refs { + let objid = ObjectID::read_from_io(&mut reader) + .map_err(|e| Error::msg(format!("Invalid refs array {:?}", e)))?; + refs.push(objid.clone()); + } + + let mut mappings = Vec::::new(); + for _ in 0..n_mappings { + let mut m = MappingEntry::read_from_io(&mut reader) + .map_err(|e| Error::msg(format!("Invalid mappings array {:?}", e)))?; + m.reference_idx = u64::from_le(m.reference_idx); + if m.reference_idx >= n_refs as u64 { + bail!("Invalid mapping reference") + } + mappings.push(m.clone()); + } + + let decoder = Decoder::new(reader)?; + Ok(Self { decoder, - refs, inline_bytes: 0, + content_type, + total_size, + refs, + mappings, }) } + pub fn iter_mappings(&self) -> impl Iterator { + self.mappings + .iter() + .map(|m| (&m.body, &self.refs[m.reference_idx as usize])) + } + + pub fn get_mappings(&self) -> DigestMap { + let mut m = DigestMap::new(); + + for (body, verity) in self.iter_mappings() { + m.insert(body, verity); + } + m + } + fn ensure_chunk( &mut self, eof_ok: bool, @@ -347,36 +484,22 @@ impl SplitStreamReader { } pub fn get_object_refs(&mut self, mut callback: impl FnMut(&ObjectID)) -> Result<()> { - let mut buffer = vec![]; - - for entry in &self.refs.map { - callback(&entry.verity); - } - - loop { - match self.ensure_chunk(true, true, 0)? { - ChunkType::Eof => break Ok(()), - ChunkType::Inline => { - read_into_vec(&mut self.decoder, &mut buffer, self.inline_bytes)?; - self.inline_bytes = 0; - } - ChunkType::External(ref id) => { - callback(id); - } - } + for entry in &self.refs { + callback(entry); } + Ok(()) } pub fn get_stream_refs(&mut self, mut callback: impl FnMut(&Sha256Digest)) { - for entry in &self.refs.map { + for entry in &self.mappings { callback(&entry.body); } } pub fn lookup(&self, body: &Sha256Digest) -> Result<&ObjectID> { - match self.refs.lookup(body) { - Some(id) => Ok(id), - None => bail!("Reference is not found in splitstream"), + match self.mappings.binary_search_by_key(body, |e| e.body) { + Ok(idx) => Ok(&self.refs[self.mappings[idx].reference_idx as usize]), + Err(..) => bail!("Reference is not found in splitstream"), } } } diff --git a/doc/splitstream.md b/doc/splitstream.md index 62df66e5..2bcb2230 100644 --- a/doc/splitstream.md +++ b/doc/splitstream.md @@ -22,39 +22,50 @@ extremely well. ## File format -The file format consists of a header, plus a number of data blocks. +The file format consists of a header, followed by a set of data blocks. -### Mappings +### Header -The file starts with a single u64 le integer which is the number of mapping -structures present in the file. A mapping is a relationship between a file -identified by its sha256 content hash and the fsverity hash of that same file. -These entries are encoded simply as the sha256 hash value (32 bytes) plus the -fsverity hash value (32 bytes) combined together into a single 64 byte record. - -For example, if we had a file that mapped `1234..` to `abcd..` and `5678..` to -`efab..`, the header would look like: +The header format looks like this, where all fields are little endian: ``` - 64bit 32 bytes 32 bytes + 32 bytes + 32 bytes - +--------+----------+----------+----------+---------+ - | 2 | 1234 | abcd | 5678 | efab | - +--------+----------+----------+----------+---------+ +pub const SPLITSTREAM_MAGIC : [u8; 7] = [b'S', b'p', b'l', b't', b'S', b't', b'r']; + +struct MappingEntry { + pub body: Sha256Digest, + pub reference_idx: u64, // index into references table +} + +struct SplitstreamHeader { + magic: [u8; 7], // Contains SPLITSTREAM_MAGIC + algorithm: u8, // The fs-verity algorithm used, 1 == sha256, 2 == sha512 + total_size: u64, // total size of inline chunks and external chunks + n_refs: u64, + n_mappings: u64, + refs: [ObjectID; n_refs] // sorted + mappings: [MappingEntry; n_mappings] // sorted by body +} ``` -The mappings in the header are always sorted by their sha256 content hash -values. - -The mappings serve two purposes: - - - in the case of splitstreams which refer to other splitstreams without - directly embedding the content of the other stream, this provides a - mechanism to find out which other streams are referenced. This is used for - garbage collection. - - - for the same usecase, it provides a mechanism to be able to verify the - content of the referred splitstream (by checking its fsverity digest) before - starting to iterate it +The table of references are used to allow splitstreams to refer to +other splitstreams or regular file content, either because it is +included in the stream, or just indirectly referenced. This is primarily +used to keep these objects alive during garbage collection. + +Examples of references are: + * OCI manifests reference splitstreams for tar layer split streams. + * External objects embedded in a splitstream, such as a tar layer + splitstream + * External objects indirectly references in a splitstream, such as + references from an ostree commit splitstream + +The mapping table provides a mechanismn to map the sha256 digest of a +split stream to a fs-verity digest. This allows checking of the target +fs-verity digest before use. The primary example here is OCI manifests +which reference the tar layer splitstreams. We could look up such +streams by the sha256 in the streams/ directory, but then we will not +have trusted information about what expected fs-verity the layers +would have. ### Data blocks @@ -81,6 +92,8 @@ There are two kinds of blocks: - "External" blocks (`size == 0`): in this case the length of the data is 32 bytes. This is the binary form of a sha256 hash value and is a reference to an object in the composefs repository (by its fs-verity digest). + Note that these references are *also* in the header, so there is no need + to read the entire file to find what objects are referenced. That's it, really. There's no header. The stream is over when there are no more blocks.