Skip to content

Commit 0f6d69e

Browse files
committed
splitstream: Rework file format
This changes the splitstream format a bit. The primary differences are:

* The header is not compressed.
* All referenced fs-verity objects are stored in the header, including external chunks, mapped splitstreams and (a new feature) references that are not used in chunks.
* The mapping table is separate from the reference table (and generally smaller), and indexes into it.
* There is a magic value to detect the file format.
* There is a magic content type to detect the type wrapped in the stream.
* We store a tag for what ObjectID format is used.
* The total size of the stream is stored in the header.

The ability to reference file objects in the repo even if they are not part of the splitstream "content" will be useful for the ostree support to reference file content objects.

This change also allows more efficient GC enumeration, because we don't have to parse the entire splitstream to find the referenced objects.

Signed-off-by: Alexander Larsson <[email protected]>
1 parent c932c95 commit 0f6d69e

File tree

9 files changed

+283
-112
lines changed

9 files changed

+283
-112
lines changed

crates/cfsctl/src/main.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,7 @@ async fn main() -> Result<()> {
188188
}
189189
}
190190
Command::Cat { name } => {
191-
repo.merge_splitstream(&name, None, &mut std::io::stdout())?;
191+
repo.merge_splitstream(&name, None, None, &mut std::io::stdout())?;
192192
}
193193
Command::ImportImage { reference } => {
194194
let image_id = repo.import_image(&reference, &mut std::io::stdin())?;

crates/composefs-http/src/lib.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,7 @@ use sha2::{Digest, Sha256};
1313
use tokio::task::JoinSet;
1414

1515
use composefs::{
16-
fsverity::FsVerityHashValue,
17-
repository::Repository,
18-
splitstream::{DigestMapEntry, SplitStreamReader},
16+
fsverity::FsVerityHashValue, repository::Repository, splitstream::SplitStreamReader,
1917
util::Sha256Digest,
2018
};
2119

@@ -61,7 +59,7 @@ impl<ObjectID: FsVerityHashValue> Downloader<ObjectID> {
6159
}
6260

6361
fn open_splitstream(&self, id: &ObjectID) -> Result<SplitStreamReader<File, ObjectID>> {
64-
SplitStreamReader::new(File::from(self.repo.open_object(id)?))
62+
SplitStreamReader::new(File::from(self.repo.open_object(id)?), None)
6563
}
6664

6765
fn read_object(&self, id: &ObjectID) -> Result<Vec<u8>> {
@@ -107,7 +105,7 @@ impl<ObjectID: FsVerityHashValue> Downloader<ObjectID> {
107105

108106
// this part is fast: it only touches the header
109107
let mut reader = self.open_splitstream(&id)?;
110-
for DigestMapEntry { verity, body } in &reader.refs.map {
108+
for (body, verity) in reader.iter_mappings() {
111109
match splitstreams.insert(verity.clone(), Some(*body)) {
112110
// This is the (normal) case if we encounter a splitstream we didn't see yet...
113111
None => {

crates/composefs-oci/src/image.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use composefs::{
99
tree::{Directory, FileSystem, Inode, Leaf},
1010
};
1111

12+
use crate::skopeo::{OCI_CONFIG_CONTENT_TYPE, TAR_LAYER_CONTENT_TYPE};
1213
use crate::tar::{TarEntry, TarItem};
1314

1415
pub fn process_entry<ObjectID: FsVerityHashValue>(
@@ -74,14 +75,19 @@ pub fn create_filesystem<ObjectID: FsVerityHashValue>(
7475
) -> Result<FileSystem<ObjectID>> {
7576
let mut filesystem = FileSystem::default();
7677

77-
let mut config_stream = repo.open_stream(config_name, config_verity)?;
78+
let mut config_stream =
79+
repo.open_stream(config_name, config_verity, Some(OCI_CONFIG_CONTENT_TYPE))?;
7880
let config = ImageConfiguration::from_reader(&mut config_stream)?;
7981

8082
for diff_id in config.rootfs().diff_ids() {
8183
let layer_sha256 = super::sha256_from_digest(diff_id)?;
8284
let layer_verity = config_stream.lookup(&layer_sha256)?;
8385

84-
let mut layer_stream = repo.open_stream(&hex::encode(layer_sha256), Some(layer_verity))?;
86+
let mut layer_stream = repo.open_stream(
87+
&hex::encode(layer_sha256),
88+
Some(layer_verity),
89+
Some(TAR_LAYER_CONTENT_TYPE),
90+
)?;
8591
while let Some(entry) = crate::tar::get_entry(&mut layer_stream)? {
8692
process_entry(&mut filesystem, entry)?;
8793
}

crates/composefs-oci/src/lib.rs

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ use composefs::{
1515
util::{parse_sha256, Sha256Digest},
1616
};
1717

18+
use crate::skopeo::{OCI_CONFIG_CONTENT_TYPE, TAR_LAYER_CONTENT_TYPE};
1819
use crate::tar::get_entry;
1920

2021
type ContentAndVerity<ObjectID> = (Sha256Digest, ObjectID);
@@ -39,14 +40,19 @@ pub fn import_layer<ObjectID: FsVerityHashValue>(
3940
name: Option<&str>,
4041
tar_stream: &mut impl Read,
4142
) -> Result<ObjectID> {
42-
repo.ensure_stream(sha256, |writer| tar::split(tar_stream, writer), name)
43+
repo.ensure_stream(
44+
sha256,
45+
TAR_LAYER_CONTENT_TYPE,
46+
|writer| tar::split(tar_stream, writer),
47+
name,
48+
)
4349
}
4450

4551
pub fn ls_layer<ObjectID: FsVerityHashValue>(
4652
repo: &Repository<ObjectID>,
4753
name: &str,
4854
) -> Result<()> {
49-
let mut split_stream = repo.open_stream(name, None)?;
55+
let mut split_stream = repo.open_stream(name, None, Some(TAR_LAYER_CONTENT_TYPE))?;
5056

5157
while let Some(entry) = get_entry(&mut split_stream)? {
5258
println!("{entry}");
@@ -81,9 +87,9 @@ pub fn open_config<ObjectID: FsVerityHashValue>(
8187
.with_context(|| format!("Object {name} is unknown to us"))?
8288
}
8389
};
84-
let mut stream = repo.open_stream(name, Some(id))?;
90+
let mut stream = repo.open_stream(name, Some(id), Some(OCI_CONFIG_CONTENT_TYPE))?;
8591
let config = ImageConfiguration::from_reader(&mut stream)?;
86-
Ok((config, stream.refs))
92+
Ok((config, stream.get_mappings()))
8793
}
8894

8995
fn hash(bytes: &[u8]) -> Sha256Digest {
@@ -104,7 +110,7 @@ pub fn open_config_shallow<ObjectID: FsVerityHashValue>(
104110
// we need to manually check the content digest
105111
let expected_hash = parse_sha256(name)
106112
.context("Containers must be referred to by sha256 if verity is missing")?;
107-
let mut stream = repo.open_stream(name, None)?;
113+
let mut stream = repo.open_stream(name, None, Some(OCI_CONFIG_CONTENT_TYPE))?;
108114
let mut raw_config = vec![];
109115
stream.read_to_end(&mut raw_config)?;
110116
ensure!(hash(&raw_config) == expected_hash, "Data integrity issue");
@@ -121,7 +127,8 @@ pub fn write_config<ObjectID: FsVerityHashValue>(
121127
let json = config.to_string()?;
122128
let json_bytes = json.as_bytes();
123129
let sha256 = hash(json_bytes);
124-
let mut stream = repo.create_stream(Some(sha256), Some(refs));
130+
let mut stream = repo.create_stream(OCI_CONFIG_CONTENT_TYPE, Some(sha256));
131+
stream.add_sha256_mappings(refs);
125132
stream.write_inline(json_bytes);
126133
let id = repo.write_stream(stream, None)?;
127134
Ok((sha256, id))
@@ -199,7 +206,7 @@ mod test {
199206
let id = import_layer(&repo, &layer_id, Some("name"), &mut layer.as_slice()).unwrap();
200207

201208
let mut dump = String::new();
202-
let mut split_stream = repo.open_stream("refs/name", Some(&id)).unwrap();
209+
let mut split_stream = repo.open_stream("refs/name", Some(&id), None).unwrap();
203210
while let Some(entry) = tar::get_entry(&mut split_stream).unwrap() {
204211
writeln!(dump, "{entry}").unwrap();
205212
}

crates/composefs-oci/src/skopeo.rs

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,13 @@ use oci_spec::image::{Descriptor, ImageConfiguration, ImageManifest, MediaType};
1010
use rustix::process::geteuid;
1111
use tokio::{io::AsyncReadExt, sync::Semaphore};
1212

13-
use composefs::{
14-
fsverity::FsVerityHashValue, repository::Repository, splitstream::DigestMap, util::Sha256Digest,
15-
};
13+
use composefs::{fsverity::FsVerityHashValue, repository::Repository, util::Sha256Digest};
1614

1715
use crate::{sha256_from_descriptor, sha256_from_digest, tar::split_async, ContentAndVerity};
1816

17+
pub const TAR_LAYER_CONTENT_TYPE: u64 = 0x2a037edfcae1ffea;
18+
pub const OCI_CONFIG_CONTENT_TYPE: u64 = 0x44218c839727a80b;
19+
1920
struct ImageOp<ObjectID: FsVerityHashValue> {
2021
repo: Arc<Repository<ObjectID>>,
2122
proxy: ImageProxy,
@@ -78,7 +79,9 @@ impl<ObjectID: FsVerityHashValue> ImageOp<ObjectID> {
7879
self.progress
7980
.println(format!("Fetching layer {}", hex::encode(layer_sha256)))?;
8081

81-
let mut splitstream = self.repo.create_stream(Some(layer_sha256), None);
82+
let mut splitstream = self
83+
.repo
84+
.create_stream(TAR_LAYER_CONTENT_TYPE, Some(layer_sha256));
8285
match descriptor.media_type() {
8386
MediaType::ImageLayer => {
8487
split_async(progress, &mut splitstream).await?;
@@ -155,15 +158,15 @@ impl<ObjectID: FsVerityHashValue> ImageOp<ObjectID> {
155158
entries.push((layer_sha256, future));
156159
}
157160

161+
let mut splitstream = self
162+
.repo
163+
.create_stream(OCI_CONFIG_CONTENT_TYPE, Some(config_sha256));
164+
158165
// Collect the results.
159-
let mut config_maps = DigestMap::new();
160166
for (layer_sha256, future) in entries {
161-
config_maps.insert(&layer_sha256, &future.await??);
167+
splitstream.add_sha256_mapping(&layer_sha256, &future.await??);
162168
}
163169

164-
let mut splitstream = self
165-
.repo
166-
.create_stream(Some(config_sha256), Some(config_maps));
167170
splitstream.write_inline(&raw_config);
168171
let config_id = self.repo.write_stream(splitstream, None)?;
169172

crates/composefs/src/fsverity/hashvalue.rs

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use core::{fmt, hash::Hash};
22

33
use hex::FromHexError;
44
use sha2::{digest::FixedOutputReset, digest::Output, Digest, Sha256, Sha512};
5+
use std::cmp::Ord;
56
use zerocopy::{FromBytes, Immutable, IntoBytes, KnownLayout, Unaligned};
67

78
pub trait FsVerityHashValue
@@ -12,6 +13,7 @@ where
1213
Self: Hash + Eq,
1314
Self: fmt::Debug,
1415
Self: Send + Sync + Unpin + 'static,
16+
Self: PartialOrd + Ord,
1517
{
1618
type Digest: Digest + FixedOutputReset + fmt::Debug;
1719
const ALGORITHM: u8;
@@ -93,7 +95,19 @@ impl fmt::Debug for Sha512HashValue {
9395
}
9496
}
9597

96-
#[derive(Clone, Eq, FromBytes, Hash, Immutable, IntoBytes, KnownLayout, PartialEq, Unaligned)]
98+
#[derive(
99+
Clone,
100+
Eq,
101+
FromBytes,
102+
Hash,
103+
Immutable,
104+
IntoBytes,
105+
KnownLayout,
106+
PartialEq,
107+
Unaligned,
108+
PartialOrd,
109+
Ord,
110+
)]
97111
#[repr(C)]
98112
pub struct Sha256HashValue([u8; 32]);
99113

@@ -110,7 +124,19 @@ impl FsVerityHashValue for Sha256HashValue {
110124
const ID: &str = "sha256";
111125
}
112126

113-
#[derive(Clone, Eq, FromBytes, Hash, Immutable, IntoBytes, KnownLayout, PartialEq, Unaligned)]
127+
#[derive(
128+
Clone,
129+
Eq,
130+
FromBytes,
131+
Hash,
132+
Immutable,
133+
IntoBytes,
134+
KnownLayout,
135+
PartialEq,
136+
Unaligned,
137+
PartialOrd,
138+
Ord,
139+
)]
114140
#[repr(C)]
115141
pub struct Sha512HashValue([u8; 64]);
116142

crates/composefs/src/repository.rs

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ use crate::{
2525
EnableVerityError, FsVerityHashValue, MeasureVerityError,
2626
},
2727
mount::{composefs_fsmount, mount_at},
28-
splitstream::{DigestMap, SplitStreamReader, SplitStreamWriter},
28+
splitstream::{SplitStreamReader, SplitStreamWriter},
2929
util::{proc_self_fd, replace_symlinkat, ErrnoFilter, Sha256Digest},
3030
};
3131

@@ -223,10 +223,10 @@ impl<ObjectID: FsVerityHashValue> Repository<ObjectID> {
223223
/// store the result.
224224
pub fn create_stream(
225225
self: &Arc<Self>,
226+
content_type: u64,
226227
sha256: Option<Sha256Digest>,
227-
maps: Option<DigestMap<ObjectID>>,
228228
) -> SplitStreamWriter<ObjectID> {
229-
SplitStreamWriter::new(self, maps, sha256)
229+
SplitStreamWriter::new(self, content_type, sha256)
230230
}
231231

232232
fn format_object_path(id: &ObjectID) -> String {
@@ -272,11 +272,11 @@ impl<ObjectID: FsVerityHashValue> Repository<ObjectID> {
272272
Err(other) => Err(other)?,
273273
};
274274
let mut context = Sha256::new();
275-
let mut split_stream = SplitStreamReader::new(File::from(stream))?;
275+
let mut split_stream = SplitStreamReader::new(File::from(stream), None)?;
276276

277277
// check the verity of all linked streams
278-
for entry in &split_stream.refs.map {
279-
if self.check_stream(&entry.body)?.as_ref() != Some(&entry.verity) {
278+
for (body, verity) in split_stream.iter_mappings() {
279+
if self.check_stream(body)?.as_ref() != Some(verity) {
280280
bail!("reference mismatch");
281281
}
282282
}
@@ -319,6 +319,12 @@ impl<ObjectID: FsVerityHashValue> Repository<ObjectID> {
319319
Ok(object_id)
320320
}
321321

322+
pub fn has_named_stream(&self, name: &str) -> bool {
323+
let stream_path = format!("streams/refs/{}", name);
324+
325+
readlinkat(&self.repository, &stream_path, []).is_ok()
326+
}
327+
322328
/// Assign the given name to a stream. The stream must already exist. After this operation it
323329
/// will be possible to refer to the stream by its new name 'refs/{name}'.
324330
pub fn name_stream(&self, sha256: Sha256Digest, name: &str) -> Result<()> {
@@ -345,6 +351,7 @@ impl<ObjectID: FsVerityHashValue> Repository<ObjectID> {
345351
pub fn ensure_stream(
346352
self: &Arc<Self>,
347353
sha256: &Sha256Digest,
354+
content_type: u64,
348355
callback: impl FnOnce(&mut SplitStreamWriter<ObjectID>) -> Result<()>,
349356
reference: Option<&str>,
350357
) -> Result<ObjectID> {
@@ -353,7 +360,7 @@ impl<ObjectID: FsVerityHashValue> Repository<ObjectID> {
353360
let object_id = match self.has_stream(sha256)? {
354361
Some(id) => id,
355362
None => {
356-
let mut writer = self.create_stream(Some(*sha256), None);
363+
let mut writer = self.create_stream(content_type, Some(*sha256));
357364
callback(&mut writer)?;
358365
let object_id = writer.done()?;
359366

@@ -375,6 +382,7 @@ impl<ObjectID: FsVerityHashValue> Repository<ObjectID> {
375382
&self,
376383
name: &str,
377384
verity: Option<&ObjectID>,
385+
expected_content_type: Option<u64>,
378386
) -> Result<SplitStreamReader<File, ObjectID>> {
379387
let filename = format!("streams/{name}");
380388

@@ -386,7 +394,7 @@ impl<ObjectID: FsVerityHashValue> Repository<ObjectID> {
386394
.with_context(|| format!("Opening ref 'streams/{name}'"))?
387395
});
388396

389-
SplitStreamReader::new(file)
397+
SplitStreamReader::new(file, expected_content_type)
390398
}
391399

392400
pub fn open_object(&self, id: &ObjectID) -> Result<OwnedFd> {
@@ -397,9 +405,10 @@ impl<ObjectID: FsVerityHashValue> Repository<ObjectID> {
397405
&self,
398406
name: &str,
399407
verity: Option<&ObjectID>,
408+
expected_content_type: Option<u64>,
400409
stream: &mut impl Write,
401410
) -> Result<()> {
402-
let mut split_stream = self.open_stream(name, verity)?;
411+
let mut split_stream = self.open_stream(name, verity, expected_content_type)?;
403412
split_stream.cat(stream, |id| -> Result<Vec<u8>> {
404413
let mut data = vec![];
405414
File::from(self.open_object(id)?).read_to_end(&mut data)?;
@@ -618,7 +627,7 @@ impl<ObjectID: FsVerityHashValue> Repository<ObjectID> {
618627
println!("{object:?} lives as a stream");
619628
objects.insert(object.clone());
620629

621-
let mut split_stream = self.open_stream(&object.to_hex(), None)?;
630+
let mut split_stream = self.open_stream(&object.to_hex(), None, None)?;
622631
split_stream.get_object_refs(|id| {
623632
println!(" with {id:?}");
624633
objects.insert(id.clone());

0 commit comments

Comments
 (0)