Skip to content

Commit ef7c8be

Browse files
committed
splitstream: Rework file format
This changes the splitstream format a bit. The primary differences are:

* The header is not compressed.
* All referenced fs-verity objects are stored in the header, including external chunks, mapped splitstreams and (a new feature) references that are not used in chunks.
* The mapping table is separate from the reference table (and generally smaller), and indexes into it.
* There is a magic value to detect the file format.
* There is a magic content type to detect the type wrapped in the stream.
* We store a tag for what ObjectID format is used.
* The total size of the stream is stored in the header.

The ability to reference file objects in the repo even if they are not part of the splitstream "content" will be useful for the ostree support to reference file content objects.

This change also allows more efficient GC enumeration, because we don't have to parse the entire splitstream to find the referenced objects.
1 parent 454449c commit ef7c8be

File tree

9 files changed

+283
-112
lines changed

9 files changed

+283
-112
lines changed

crates/cfsctl/src/main.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,7 @@ async fn main() -> Result<()> {
180180
}
181181
}
182182
Command::Cat { name } => {
183-
repo.merge_splitstream(&name, None, &mut std::io::stdout())?;
183+
repo.merge_splitstream(&name, None, None, &mut std::io::stdout())?;
184184
}
185185
Command::ImportImage { reference } => {
186186
let image_id = repo.import_image(&reference, &mut std::io::stdin())?;

crates/composefs-http/src/lib.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,7 @@ use sha2::{Digest, Sha256};
1313
use tokio::task::JoinSet;
1414

1515
use composefs::{
16-
fsverity::FsVerityHashValue,
17-
repository::Repository,
18-
splitstream::{DigestMapEntry, SplitStreamReader},
16+
fsverity::FsVerityHashValue, repository::Repository, splitstream::SplitStreamReader,
1917
util::Sha256Digest,
2018
};
2119

@@ -61,7 +59,7 @@ impl<ObjectID: FsVerityHashValue> Downloader<ObjectID> {
6159
}
6260

6361
fn open_splitstream(&self, id: &ObjectID) -> Result<SplitStreamReader<File, ObjectID>> {
64-
SplitStreamReader::new(File::from(self.repo.open_object(id)?))
62+
SplitStreamReader::new(File::from(self.repo.open_object(id)?), None)
6563
}
6664

6765
fn read_object(&self, id: &ObjectID) -> Result<Vec<u8>> {
@@ -107,7 +105,7 @@ impl<ObjectID: FsVerityHashValue> Downloader<ObjectID> {
107105

108106
// this part is fast: it only touches the header
109107
let mut reader = self.open_splitstream(&id)?;
110-
for DigestMapEntry { verity, body } in &reader.refs.map {
108+
for (body, verity) in reader.iter_mappings() {
111109
match splitstreams.insert(verity.clone(), Some(*body)) {
112110
// This is the (normal) case if we encounter a splitstream we didn't see yet...
113111
None => {

crates/composefs-oci/src/image.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use composefs::{
99
tree::{Directory, FileSystem, Inode, Leaf},
1010
};
1111

12+
use crate::skopeo::{OCI_CONFIG_CONTENT_TYPE, TAR_LAYER_CONTENT_TYPE};
1213
use crate::tar::{TarEntry, TarItem};
1314

1415
pub fn process_entry<ObjectID: FsVerityHashValue>(
@@ -74,14 +75,19 @@ pub fn create_filesystem<ObjectID: FsVerityHashValue>(
7475
) -> Result<FileSystem<ObjectID>> {
7576
let mut filesystem = FileSystem::default();
7677

77-
let mut config_stream = repo.open_stream(config_name, config_verity)?;
78+
let mut config_stream =
79+
repo.open_stream(config_name, config_verity, Some(OCI_CONFIG_CONTENT_TYPE))?;
7880
let config = ImageConfiguration::from_reader(&mut config_stream)?;
7981

8082
for diff_id in config.rootfs().diff_ids() {
8183
let layer_sha256 = super::sha256_from_digest(diff_id)?;
8284
let layer_verity = config_stream.lookup(&layer_sha256)?;
8385

84-
let mut layer_stream = repo.open_stream(&hex::encode(layer_sha256), Some(layer_verity))?;
86+
let mut layer_stream = repo.open_stream(
87+
&hex::encode(layer_sha256),
88+
Some(layer_verity),
89+
Some(TAR_LAYER_CONTENT_TYPE),
90+
)?;
8591
while let Some(entry) = crate::tar::get_entry(&mut layer_stream)? {
8692
process_entry(&mut filesystem, entry)?;
8793
}

crates/composefs-oci/src/lib.rs

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ use composefs::{
1515
util::{parse_sha256, Sha256Digest},
1616
};
1717

18+
use crate::skopeo::{OCI_CONFIG_CONTENT_TYPE, TAR_LAYER_CONTENT_TYPE};
1819
use crate::tar::get_entry;
1920

2021
type ContentAndVerity<ObjectID> = (Sha256Digest, ObjectID);
@@ -39,14 +40,19 @@ pub fn import_layer<ObjectID: FsVerityHashValue>(
3940
name: Option<&str>,
4041
tar_stream: &mut impl Read,
4142
) -> Result<ObjectID> {
42-
repo.ensure_stream(sha256, |writer| tar::split(tar_stream, writer), name)
43+
repo.ensure_stream(
44+
sha256,
45+
TAR_LAYER_CONTENT_TYPE,
46+
|writer| tar::split(tar_stream, writer),
47+
name,
48+
)
4349
}
4450

4551
pub fn ls_layer<ObjectID: FsVerityHashValue>(
4652
repo: &Repository<ObjectID>,
4753
name: &str,
4854
) -> Result<()> {
49-
let mut split_stream = repo.open_stream(name, None)?;
55+
let mut split_stream = repo.open_stream(name, None, Some(TAR_LAYER_CONTENT_TYPE))?;
5056

5157
while let Some(entry) = get_entry(&mut split_stream)? {
5258
println!("{entry}");
@@ -81,9 +87,9 @@ pub fn open_config<ObjectID: FsVerityHashValue>(
8187
.with_context(|| format!("Object {name} is unknown to us"))?
8288
}
8389
};
84-
let mut stream = repo.open_stream(name, Some(id))?;
90+
let mut stream = repo.open_stream(name, Some(id), Some(OCI_CONFIG_CONTENT_TYPE))?;
8591
let config = ImageConfiguration::from_reader(&mut stream)?;
86-
Ok((config, stream.refs))
92+
Ok((config, stream.get_mappings()))
8793
}
8894

8995
fn hash(bytes: &[u8]) -> Sha256Digest {
@@ -104,7 +110,7 @@ pub fn open_config_shallow<ObjectID: FsVerityHashValue>(
104110
// we need to manually check the content digest
105111
let expected_hash = parse_sha256(name)
106112
.context("Containers must be referred to by sha256 if verity is missing")?;
107-
let mut stream = repo.open_stream(name, None)?;
113+
let mut stream = repo.open_stream(name, None, Some(OCI_CONFIG_CONTENT_TYPE))?;
108114
let mut raw_config = vec![];
109115
stream.read_to_end(&mut raw_config)?;
110116
ensure!(hash(&raw_config) == expected_hash, "Data integrity issue");
@@ -121,7 +127,8 @@ pub fn write_config<ObjectID: FsVerityHashValue>(
121127
let json = config.to_string()?;
122128
let json_bytes = json.as_bytes();
123129
let sha256 = hash(json_bytes);
124-
let mut stream = repo.create_stream(Some(sha256), Some(refs));
130+
let mut stream = repo.create_stream(OCI_CONFIG_CONTENT_TYPE, Some(sha256));
131+
stream.add_sha256_mappings(refs);
125132
stream.write_inline(json_bytes);
126133
let id = repo.write_stream(stream, None)?;
127134
Ok((sha256, id))
@@ -199,7 +206,7 @@ mod test {
199206
let id = import_layer(&repo, &layer_id, Some("name"), &mut layer.as_slice()).unwrap();
200207

201208
let mut dump = String::new();
202-
let mut split_stream = repo.open_stream("refs/name", Some(&id)).unwrap();
209+
let mut split_stream = repo.open_stream("refs/name", Some(&id), None).unwrap();
203210
while let Some(entry) = tar::get_entry(&mut split_stream).unwrap() {
204211
writeln!(dump, "{entry}").unwrap();
205212
}

crates/composefs-oci/src/skopeo.rs

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,13 @@ use oci_spec::image::{Descriptor, ImageConfiguration, ImageManifest, MediaType};
1010
use rustix::process::geteuid;
1111
use tokio::{io::AsyncReadExt, sync::Semaphore};
1212

13-
use composefs::{
14-
fsverity::FsVerityHashValue, repository::Repository, splitstream::DigestMap, util::Sha256Digest,
15-
};
13+
use composefs::{fsverity::FsVerityHashValue, repository::Repository, util::Sha256Digest};
1614

1715
use crate::{sha256_from_descriptor, sha256_from_digest, tar::split_async, ContentAndVerity};
1816

17+
pub const TAR_LAYER_CONTENT_TYPE: u64 = 0x2a037edfcae1ffea;
18+
pub const OCI_CONFIG_CONTENT_TYPE: u64 = 0x44218c839727a80b;
19+
1920
struct ImageOp<ObjectID: FsVerityHashValue> {
2021
repo: Arc<Repository<ObjectID>>,
2122
proxy: ImageProxy,
@@ -78,7 +79,9 @@ impl<ObjectID: FsVerityHashValue> ImageOp<ObjectID> {
7879
self.progress
7980
.println(format!("Fetching layer {}", hex::encode(layer_sha256)))?;
8081

81-
let mut splitstream = self.repo.create_stream(Some(layer_sha256), None);
82+
let mut splitstream = self
83+
.repo
84+
.create_stream(TAR_LAYER_CONTENT_TYPE, Some(layer_sha256));
8285
match descriptor.media_type() {
8386
MediaType::ImageLayer => {
8487
split_async(progress, &mut splitstream).await?;
@@ -155,15 +158,15 @@ impl<ObjectID: FsVerityHashValue> ImageOp<ObjectID> {
155158
entries.push((layer_sha256, future));
156159
}
157160

161+
let mut splitstream = self
162+
.repo
163+
.create_stream(OCI_CONFIG_CONTENT_TYPE, Some(config_sha256));
164+
158165
// Collect the results.
159-
let mut config_maps = DigestMap::new();
160166
for (layer_sha256, future) in entries {
161-
config_maps.insert(&layer_sha256, &future.await??);
167+
splitstream.add_sha256_mapping(&layer_sha256, &future.await??);
162168
}
163169

164-
let mut splitstream = self
165-
.repo
166-
.create_stream(Some(config_sha256), Some(config_maps));
167170
splitstream.write_inline(&raw_config);
168171
let config_id = self.repo.write_stream(splitstream, None)?;
169172

crates/composefs/src/fsverity/hashvalue.rs

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use core::{fmt, hash::Hash};
22

33
use hex::FromHexError;
44
use sha2::{digest::FixedOutputReset, digest::Output, Digest, Sha256, Sha512};
5+
use std::cmp::Ord;
56
use zerocopy::{FromBytes, Immutable, IntoBytes, KnownLayout, Unaligned};
67

78
pub trait FsVerityHashValue
@@ -12,6 +13,7 @@ where
1213
Self: Hash + Eq,
1314
Self: fmt::Debug,
1415
Self: Send + Sync + Unpin + 'static,
16+
Self: PartialOrd + Ord,
1517
{
1618
type Digest: Digest + FixedOutputReset + fmt::Debug;
1719
const ALGORITHM: u8;
@@ -93,7 +95,19 @@ impl fmt::Debug for Sha512HashValue {
9395
}
9496
}
9597

96-
#[derive(Clone, Eq, FromBytes, Hash, Immutable, IntoBytes, KnownLayout, PartialEq, Unaligned)]
98+
#[derive(
99+
Clone,
100+
Eq,
101+
FromBytes,
102+
Hash,
103+
Immutable,
104+
IntoBytes,
105+
KnownLayout,
106+
PartialEq,
107+
Unaligned,
108+
PartialOrd,
109+
Ord,
110+
)]
97111
#[repr(C)]
98112
pub struct Sha256HashValue([u8; 32]);
99113

@@ -110,7 +124,19 @@ impl FsVerityHashValue for Sha256HashValue {
110124
const ID: &str = "sha256";
111125
}
112126

113-
#[derive(Clone, Eq, FromBytes, Hash, Immutable, IntoBytes, KnownLayout, PartialEq, Unaligned)]
127+
#[derive(
128+
Clone,
129+
Eq,
130+
FromBytes,
131+
Hash,
132+
Immutable,
133+
IntoBytes,
134+
KnownLayout,
135+
PartialEq,
136+
Unaligned,
137+
PartialOrd,
138+
Ord,
139+
)]
114140
#[repr(C)]
115141
pub struct Sha512HashValue([u8; 64]);
116142

crates/composefs/src/repository.rs

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use crate::{
2424
compute_verity, enable_verity, ensure_verity_equal, measure_verity, FsVerityHashValue,
2525
},
2626
mount::mount_composefs_at,
27-
splitstream::{DigestMap, SplitStreamReader, SplitStreamWriter},
27+
splitstream::{SplitStreamReader, SplitStreamWriter},
2828
util::{proc_self_fd, replace_symlinkat, ErrnoFilter, Sha256Digest},
2929
};
3030

@@ -184,10 +184,10 @@ impl<ObjectID: FsVerityHashValue> Repository<ObjectID> {
184184
/// store the result.
185185
pub fn create_stream(
186186
self: &Arc<Self>,
187+
content_type: u64,
187188
sha256: Option<Sha256Digest>,
188-
maps: Option<DigestMap<ObjectID>>,
189189
) -> SplitStreamWriter<ObjectID> {
190-
SplitStreamWriter::new(self, maps, sha256)
190+
SplitStreamWriter::new(self, content_type, sha256)
191191
}
192192

193193
fn format_object_path(id: &ObjectID) -> String {
@@ -224,11 +224,11 @@ impl<ObjectID: FsVerityHashValue> Repository<ObjectID> {
224224
Ok(stream) => {
225225
let measured_verity: ObjectID = measure_verity(&stream)?;
226226
let mut context = Sha256::new();
227-
let mut split_stream = SplitStreamReader::new(File::from(stream))?;
227+
let mut split_stream = SplitStreamReader::new(File::from(stream), None)?;
228228

229229
// check the verity of all linked streams
230-
for entry in &split_stream.refs.map {
231-
if self.check_stream(&entry.body)?.as_ref() != Some(&entry.verity) {
230+
for (body, verity) in split_stream.iter_mappings() {
231+
if self.check_stream(body)?.as_ref() != Some(verity) {
232232
bail!("reference mismatch");
233233
}
234234
}
@@ -271,6 +271,12 @@ impl<ObjectID: FsVerityHashValue> Repository<ObjectID> {
271271
Ok(object_id)
272272
}
273273

274+
pub fn has_named_stream(&self, name: &str) -> bool {
275+
let stream_path = format!("streams/refs/{}", name);
276+
277+
readlinkat(&self.repository, &stream_path, []).is_ok()
278+
}
279+
274280
/// Assign the given name to a stream. The stream must already exist. After this operation it
275281
/// will be possible to refer to the stream by its new name 'refs/{name}'.
276282
pub fn name_stream(&self, sha256: Sha256Digest, name: &str) -> Result<()> {
@@ -297,6 +303,7 @@ impl<ObjectID: FsVerityHashValue> Repository<ObjectID> {
297303
pub fn ensure_stream(
298304
self: &Arc<Self>,
299305
sha256: &Sha256Digest,
306+
content_type: u64,
300307
callback: impl FnOnce(&mut SplitStreamWriter<ObjectID>) -> Result<()>,
301308
reference: Option<&str>,
302309
) -> Result<ObjectID> {
@@ -305,7 +312,7 @@ impl<ObjectID: FsVerityHashValue> Repository<ObjectID> {
305312
let object_id = match self.has_stream(sha256)? {
306313
Some(id) => id,
307314
None => {
308-
let mut writer = self.create_stream(Some(*sha256), None);
315+
let mut writer = self.create_stream(content_type, Some(*sha256));
309316
callback(&mut writer)?;
310317
let object_id = writer.done()?;
311318

@@ -327,6 +334,7 @@ impl<ObjectID: FsVerityHashValue> Repository<ObjectID> {
327334
&self,
328335
name: &str,
329336
verity: Option<&ObjectID>,
337+
expected_content_type: Option<u64>,
330338
) -> Result<SplitStreamReader<File, ObjectID>> {
331339
let filename = format!("streams/{name}");
332340

@@ -338,7 +346,7 @@ impl<ObjectID: FsVerityHashValue> Repository<ObjectID> {
338346
.with_context(|| format!("Opening ref 'streams/{name}'"))?
339347
});
340348

341-
SplitStreamReader::new(file)
349+
SplitStreamReader::new(file, expected_content_type)
342350
}
343351

344352
pub fn open_object(&self, id: &ObjectID) -> Result<OwnedFd> {
@@ -349,9 +357,10 @@ impl<ObjectID: FsVerityHashValue> Repository<ObjectID> {
349357
&self,
350358
name: &str,
351359
verity: Option<&ObjectID>,
360+
expected_content_type: Option<u64>,
352361
stream: &mut impl Write,
353362
) -> Result<()> {
354-
let mut split_stream = self.open_stream(name, verity)?;
363+
let mut split_stream = self.open_stream(name, verity, expected_content_type)?;
355364
split_stream.cat(stream, |id| -> Result<Vec<u8>> {
356365
let mut data = vec![];
357366
File::from(self.open_object(id)?).read_to_end(&mut data)?;
@@ -551,7 +560,7 @@ impl<ObjectID: FsVerityHashValue> Repository<ObjectID> {
551560
println!("{object:?} lives as a stream");
552561
objects.insert(object.clone());
553562

554-
let mut split_stream = self.open_stream(&object.to_hex(), None)?;
563+
let mut split_stream = self.open_stream(&object.to_hex(), None, None)?;
555564
split_stream.get_object_refs(|id| {
556565
println!(" with {id:?}");
557566
objects.insert(id.clone());

0 commit comments

Comments
 (0)