Skip to content

Commit 3803dc3

Browse files
committed
splitstream: Rework file format
This changes the splitstream format a bit, with the goal of allowing splitstreams to support ostree files as well (see #144).

The primary differences are:

* The header is not compressed.
* All referenced fs-verity objects are stored in the header, including external chunks, mapped splitstreams and (a new feature) references that are not used in chunks.
* The mapping table is separate from the reference table (and generally smaller), and indexes into it.
* There is a magic value to detect the file format.
* There is a magic content type to detect the type wrapped in the stream.
* We store a tag for what ObjectID format is used.
* The total size of the stream is stored in the header.

The ability to reference file objects in the repo even if they are not part of the splitstream "content" will be useful for the ostree support to reference file content objects.

This change also allows more efficient GC enumeration, because we don't have to parse the entire splitstream to find the referenced objects.

Signed-off-by: Alexander Larsson <[email protected]>
1 parent 81a8e01 commit 3803dc3

File tree

9 files changed

+283
-112
lines changed

9 files changed

+283
-112
lines changed

crates/cfsctl/src/main.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,7 @@ async fn main() -> Result<()> {
188188
}
189189
}
190190
Command::Cat { name } => {
191-
repo.merge_splitstream(&name, None, &mut std::io::stdout())?;
191+
repo.merge_splitstream(&name, None, None, &mut std::io::stdout())?;
192192
}
193193
Command::ImportImage { reference } => {
194194
let image_id = repo.import_image(&reference, &mut std::io::stdin())?;

crates/composefs-http/src/lib.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,7 @@ use sha2::{Digest, Sha256};
1313
use tokio::task::JoinSet;
1414

1515
use composefs::{
16-
fsverity::FsVerityHashValue,
17-
repository::Repository,
18-
splitstream::{DigestMapEntry, SplitStreamReader},
16+
fsverity::FsVerityHashValue, repository::Repository, splitstream::SplitStreamReader,
1917
util::Sha256Digest,
2018
};
2119

@@ -61,7 +59,7 @@ impl<ObjectID: FsVerityHashValue> Downloader<ObjectID> {
6159
}
6260

6361
fn open_splitstream(&self, id: &ObjectID) -> Result<SplitStreamReader<File, ObjectID>> {
64-
SplitStreamReader::new(File::from(self.repo.open_object(id)?))
62+
SplitStreamReader::new(File::from(self.repo.open_object(id)?), None)
6563
}
6664

6765
fn read_object(&self, id: &ObjectID) -> Result<Vec<u8>> {
@@ -107,7 +105,7 @@ impl<ObjectID: FsVerityHashValue> Downloader<ObjectID> {
107105

108106
// this part is fast: it only touches the header
109107
let mut reader = self.open_splitstream(&id)?;
110-
for DigestMapEntry { verity, body } in &reader.refs.map {
108+
for (body, verity) in reader.iter_mappings() {
111109
match splitstreams.insert(verity.clone(), Some(*body)) {
112110
// This is the (normal) case if we encounter a splitstream we didn't see yet...
113111
None => {

crates/composefs-oci/src/image.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use composefs::{
99
tree::{Directory, FileSystem, Inode, Leaf},
1010
};
1111

12+
use crate::skopeo::{OCI_CONFIG_CONTENT_TYPE, TAR_LAYER_CONTENT_TYPE};
1213
use crate::tar::{TarEntry, TarItem};
1314

1415
pub fn process_entry<ObjectID: FsVerityHashValue>(
@@ -74,14 +75,19 @@ pub fn create_filesystem<ObjectID: FsVerityHashValue>(
7475
) -> Result<FileSystem<ObjectID>> {
7576
let mut filesystem = FileSystem::default();
7677

77-
let mut config_stream = repo.open_stream(config_name, config_verity)?;
78+
let mut config_stream =
79+
repo.open_stream(config_name, config_verity, Some(OCI_CONFIG_CONTENT_TYPE))?;
7880
let config = ImageConfiguration::from_reader(&mut config_stream)?;
7981

8082
for diff_id in config.rootfs().diff_ids() {
8183
let layer_sha256 = super::sha256_from_digest(diff_id)?;
8284
let layer_verity = config_stream.lookup(&layer_sha256)?;
8385

84-
let mut layer_stream = repo.open_stream(&hex::encode(layer_sha256), Some(layer_verity))?;
86+
let mut layer_stream = repo.open_stream(
87+
&hex::encode(layer_sha256),
88+
Some(layer_verity),
89+
Some(TAR_LAYER_CONTENT_TYPE),
90+
)?;
8591
while let Some(entry) = crate::tar::get_entry(&mut layer_stream)? {
8692
process_entry(&mut filesystem, entry)?;
8793
}

crates/composefs-oci/src/lib.rs

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ use composefs::{
1616
util::{parse_sha256, Sha256Digest},
1717
};
1818

19+
use crate::skopeo::{OCI_CONFIG_CONTENT_TYPE, TAR_LAYER_CONTENT_TYPE};
1920
use crate::tar::get_entry;
2021

2122
type ContentAndVerity<ObjectID> = (Sha256Digest, ObjectID);
@@ -40,14 +41,19 @@ pub fn import_layer<ObjectID: FsVerityHashValue>(
4041
name: Option<&str>,
4142
tar_stream: &mut impl Read,
4243
) -> Result<ObjectID> {
43-
repo.ensure_stream(sha256, |writer| tar::split(tar_stream, writer), name)
44+
repo.ensure_stream(
45+
sha256,
46+
TAR_LAYER_CONTENT_TYPE,
47+
|writer| tar::split(tar_stream, writer),
48+
name,
49+
)
4450
}
4551

4652
pub fn ls_layer<ObjectID: FsVerityHashValue>(
4753
repo: &Repository<ObjectID>,
4854
name: &str,
4955
) -> Result<()> {
50-
let mut split_stream = repo.open_stream(name, None)?;
56+
let mut split_stream = repo.open_stream(name, None, Some(TAR_LAYER_CONTENT_TYPE))?;
5157

5258
while let Some(entry) = get_entry(&mut split_stream)? {
5359
println!("{entry}");
@@ -83,9 +89,9 @@ pub fn open_config<ObjectID: FsVerityHashValue>(
8389
.with_context(|| format!("Object {name} is unknown to us"))?
8490
}
8591
};
86-
let mut stream = repo.open_stream(name, Some(id))?;
92+
let mut stream = repo.open_stream(name, Some(id), Some(OCI_CONFIG_CONTENT_TYPE))?;
8793
let config = ImageConfiguration::from_reader(&mut stream)?;
88-
Ok((config, stream.refs))
94+
Ok((config, stream.get_mappings()))
8995
}
9096

9197
fn hash(bytes: &[u8]) -> Sha256Digest {
@@ -106,7 +112,7 @@ pub fn open_config_shallow<ObjectID: FsVerityHashValue>(
106112
// we need to manually check the content digest
107113
let expected_hash = parse_sha256(name)
108114
.context("Containers must be referred to by sha256 if verity is missing")?;
109-
let mut stream = repo.open_stream(name, None)?;
115+
let mut stream = repo.open_stream(name, None, Some(OCI_CONFIG_CONTENT_TYPE))?;
110116
let mut raw_config = vec![];
111117
stream.read_to_end(&mut raw_config)?;
112118
ensure!(hash(&raw_config) == expected_hash, "Data integrity issue");
@@ -123,7 +129,8 @@ pub fn write_config<ObjectID: FsVerityHashValue>(
123129
let json = config.to_string()?;
124130
let json_bytes = json.as_bytes();
125131
let sha256 = hash(json_bytes);
126-
let mut stream = repo.create_stream(Some(sha256), Some(refs));
132+
let mut stream = repo.create_stream(OCI_CONFIG_CONTENT_TYPE, Some(sha256));
133+
stream.add_sha256_mappings(refs);
127134
stream.write_inline(json_bytes);
128135
let id = repo.write_stream(stream, None)?;
129136
Ok((sha256, id))
@@ -201,7 +208,7 @@ mod test {
201208
let id = import_layer(&repo, &layer_id, Some("name"), &mut layer.as_slice()).unwrap();
202209

203210
let mut dump = String::new();
204-
let mut split_stream = repo.open_stream("refs/name", Some(&id)).unwrap();
211+
let mut split_stream = repo.open_stream("refs/name", Some(&id), None).unwrap();
205212
while let Some(entry) = tar::get_entry(&mut split_stream).unwrap() {
206213
writeln!(dump, "{entry}").unwrap();
207214
}

crates/composefs-oci/src/skopeo.rs

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,13 @@ use oci_spec::image::{Descriptor, ImageConfiguration, ImageManifest, MediaType};
1010
use rustix::process::geteuid;
1111
use tokio::{io::AsyncReadExt, sync::Semaphore};
1212

13-
use composefs::{
14-
fsverity::FsVerityHashValue, repository::Repository, splitstream::DigestMap, util::Sha256Digest,
15-
};
13+
use composefs::{fsverity::FsVerityHashValue, repository::Repository, util::Sha256Digest};
1614

1715
use crate::{sha256_from_descriptor, sha256_from_digest, tar::split_async, ContentAndVerity};
1816

17+
pub const TAR_LAYER_CONTENT_TYPE: u64 = 0x2a037edfcae1ffea;
18+
pub const OCI_CONFIG_CONTENT_TYPE: u64 = 0x44218c839727a80b;
19+
1920
struct ImageOp<ObjectID: FsVerityHashValue> {
2021
repo: Arc<Repository<ObjectID>>,
2122
proxy: ImageProxy,
@@ -95,7 +96,9 @@ impl<ObjectID: FsVerityHashValue> ImageOp<ObjectID> {
9596
self.progress
9697
.println(format!("Fetching layer {}", hex::encode(layer_sha256)))?;
9798

98-
let mut splitstream = self.repo.create_stream(Some(layer_sha256), None);
99+
let mut splitstream = self
100+
.repo
101+
.create_stream(TAR_LAYER_CONTENT_TYPE, Some(layer_sha256));
99102
match descriptor.media_type() {
100103
MediaType::ImageLayer => {
101104
split_async(progress, &mut splitstream).await?;
@@ -172,15 +175,15 @@ impl<ObjectID: FsVerityHashValue> ImageOp<ObjectID> {
172175
entries.push((layer_sha256, future));
173176
}
174177

178+
let mut splitstream = self
179+
.repo
180+
.create_stream(OCI_CONFIG_CONTENT_TYPE, Some(config_sha256));
181+
175182
// Collect the results.
176-
let mut config_maps = DigestMap::new();
177183
for (layer_sha256, future) in entries {
178-
config_maps.insert(&layer_sha256, &future.await??);
184+
splitstream.add_sha256_mapping(&layer_sha256, &future.await??);
179185
}
180186

181-
let mut splitstream = self
182-
.repo
183-
.create_stream(Some(config_sha256), Some(config_maps));
184187
splitstream.write_inline(&raw_config);
185188
let config_id = self.repo.write_stream(splitstream, None)?;
186189

crates/composefs/src/fsverity/hashvalue.rs

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use core::{fmt, hash::Hash};
22

33
use hex::FromHexError;
44
use sha2::{digest::FixedOutputReset, digest::Output, Digest, Sha256, Sha512};
5+
use std::cmp::Ord;
56
use zerocopy::{FromBytes, Immutable, IntoBytes, KnownLayout, Unaligned};
67

78
pub trait FsVerityHashValue
@@ -12,6 +13,7 @@ where
1213
Self: Hash + Eq,
1314
Self: fmt::Debug,
1415
Self: Send + Sync + Unpin + 'static,
16+
Self: PartialOrd + Ord,
1517
{
1618
type Digest: Digest + FixedOutputReset + fmt::Debug;
1719
const ALGORITHM: u8;
@@ -93,7 +95,19 @@ impl fmt::Debug for Sha512HashValue {
9395
}
9496
}
9597

96-
#[derive(Clone, Eq, FromBytes, Hash, Immutable, IntoBytes, KnownLayout, PartialEq, Unaligned)]
98+
#[derive(
99+
Clone,
100+
Eq,
101+
FromBytes,
102+
Hash,
103+
Immutable,
104+
IntoBytes,
105+
KnownLayout,
106+
PartialEq,
107+
Unaligned,
108+
PartialOrd,
109+
Ord,
110+
)]
97111
#[repr(C)]
98112
pub struct Sha256HashValue([u8; 32]);
99113

@@ -110,7 +124,19 @@ impl FsVerityHashValue for Sha256HashValue {
110124
const ID: &str = "sha256";
111125
}
112126

113-
#[derive(Clone, Eq, FromBytes, Hash, Immutable, IntoBytes, KnownLayout, PartialEq, Unaligned)]
127+
#[derive(
128+
Clone,
129+
Eq,
130+
FromBytes,
131+
Hash,
132+
Immutable,
133+
IntoBytes,
134+
KnownLayout,
135+
PartialEq,
136+
Unaligned,
137+
PartialOrd,
138+
Ord,
139+
)]
114140
#[repr(C)]
115141
pub struct Sha512HashValue([u8; 64]);
116142

crates/composefs/src/repository.rs

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ use crate::{
2525
CompareVerityError, EnableVerityError, FsVerityHashValue, MeasureVerityError,
2626
},
2727
mount::{composefs_fsmount, mount_at},
28-
splitstream::{DigestMap, SplitStreamReader, SplitStreamWriter},
28+
splitstream::{SplitStreamReader, SplitStreamWriter},
2929
util::{proc_self_fd, replace_symlinkat, ErrnoFilter, Sha256Digest},
3030
};
3131

@@ -235,10 +235,10 @@ impl<ObjectID: FsVerityHashValue> Repository<ObjectID> {
235235
/// store the result.
236236
pub fn create_stream(
237237
self: &Arc<Self>,
238+
content_type: u64,
238239
sha256: Option<Sha256Digest>,
239-
maps: Option<DigestMap<ObjectID>>,
240240
) -> SplitStreamWriter<ObjectID> {
241-
SplitStreamWriter::new(self, maps, sha256)
241+
SplitStreamWriter::new(self, content_type, sha256)
242242
}
243243

244244
fn format_object_path(id: &ObjectID) -> String {
@@ -286,11 +286,11 @@ impl<ObjectID: FsVerityHashValue> Repository<ObjectID> {
286286
Err(other) => Err(other)?,
287287
};
288288
let mut context = Sha256::new();
289-
let mut split_stream = SplitStreamReader::new(File::from(stream))?;
289+
let mut split_stream = SplitStreamReader::new(File::from(stream), None)?;
290290

291291
// check the verity of all linked streams
292-
for entry in &split_stream.refs.map {
293-
if self.check_stream(&entry.body)?.as_ref() != Some(&entry.verity) {
292+
for (body, verity) in split_stream.iter_mappings() {
293+
if self.check_stream(body)?.as_ref() != Some(verity) {
294294
bail!("reference mismatch");
295295
}
296296
}
@@ -335,6 +335,12 @@ impl<ObjectID: FsVerityHashValue> Repository<ObjectID> {
335335
Ok(object_id)
336336
}
337337

338+
pub fn has_named_stream(&self, name: &str) -> bool {
339+
let stream_path = format!("streams/refs/{}", name);
340+
341+
readlinkat(&self.repository, &stream_path, []).is_ok()
342+
}
343+
338344
/// Assign the given name to a stream. The stream must already exist. After this operation it
339345
/// will be possible to refer to the stream by its new name 'refs/{name}'.
340346
pub fn name_stream(&self, sha256: Sha256Digest, name: &str) -> Result<()> {
@@ -361,6 +367,7 @@ impl<ObjectID: FsVerityHashValue> Repository<ObjectID> {
361367
pub fn ensure_stream(
362368
self: &Arc<Self>,
363369
sha256: &Sha256Digest,
370+
content_type: u64,
364371
callback: impl FnOnce(&mut SplitStreamWriter<ObjectID>) -> Result<()>,
365372
reference: Option<&str>,
366373
) -> Result<ObjectID> {
@@ -369,7 +376,7 @@ impl<ObjectID: FsVerityHashValue> Repository<ObjectID> {
369376
let object_id = match self.has_stream(sha256)? {
370377
Some(id) => id,
371378
None => {
372-
let mut writer = self.create_stream(Some(*sha256), None);
379+
let mut writer = self.create_stream(content_type, Some(*sha256));
373380
callback(&mut writer)?;
374381
let object_id = writer.done()?;
375382

@@ -392,6 +399,7 @@ impl<ObjectID: FsVerityHashValue> Repository<ObjectID> {
392399
&self,
393400
name: &str,
394401
verity: Option<&ObjectID>,
402+
expected_content_type: Option<u64>,
395403
) -> Result<SplitStreamReader<File, ObjectID>> {
396404
let filename = format!("streams/{name}");
397405

@@ -403,7 +411,7 @@ impl<ObjectID: FsVerityHashValue> Repository<ObjectID> {
403411
.with_context(|| format!("Opening ref 'streams/{name}'"))?
404412
});
405413

406-
SplitStreamReader::new(file)
414+
SplitStreamReader::new(file, expected_content_type)
407415
}
408416

409417
/// Given an object identifier (a digest), return a read-only file descriptor
@@ -416,9 +424,10 @@ impl<ObjectID: FsVerityHashValue> Repository<ObjectID> {
416424
&self,
417425
name: &str,
418426
verity: Option<&ObjectID>,
427+
expected_content_type: Option<u64>,
419428
stream: &mut impl Write,
420429
) -> Result<()> {
421-
let mut split_stream = self.open_stream(name, verity)?;
430+
let mut split_stream = self.open_stream(name, verity, expected_content_type)?;
422431
split_stream.cat(stream, |id| -> Result<Vec<u8>> {
423432
let mut data = vec![];
424433
File::from(self.open_object(id)?).read_to_end(&mut data)?;
@@ -659,7 +668,7 @@ impl<ObjectID: FsVerityHashValue> Repository<ObjectID> {
659668
println!("{object:?} lives as a stream");
660669
objects.insert(object.clone());
661670

662-
let mut split_stream = self.open_stream(&object.to_hex(), None)?;
671+
let mut split_stream = self.open_stream(&object.to_hex(), None, None)?;
663672
split_stream.get_object_refs(|id| {
664673
println!(" with {id:?}");
665674
objects.insert(id.clone());

0 commit comments

Comments
 (0)