Skip to content

Commit cf63476

Browse files
Separate out splitstream writer into its own struct
Separating out the writer allows us to abstract the parallelism of writing the splitstream. Signed-off-by: Pragyan Poudyal <[email protected]>
1 parent 6e75038 commit cf63476

File tree

4 files changed

+114
-61
lines changed

4 files changed

+114
-61
lines changed

src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ pub mod repository;
1313
pub mod selabel;
1414
pub mod splitstream;
1515
pub mod util;
16+
pub mod zstd_encoder;
1617

1718
/// All files that contain 64 or fewer bytes (size <= INLINE_CONTENT_MAX) should be stored inline
1819
/// in the erofs image (and also in splitstreams). All files with 65 or more bytes (size > MAX)

src/repository.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,7 @@ impl Repository {
171171
Ok(result)
172172
}
173173

174-
fn format_object_path(id: &Sha256HashValue) -> String {
174+
pub fn format_object_path(id: &Sha256HashValue) -> String {
175175
format!("objects/{:02x}/{}", id[0], hex::encode(&id[1..]))
176176
}
177177

@@ -236,7 +236,7 @@ impl Repository {
236236
writer: SplitStreamWriter,
237237
reference: Option<&str>,
238238
) -> Result<Sha256HashValue> {
239-
let Some((.., ref sha256)) = writer.sha256 else {
239+
let Some((.., ref sha256)) = writer.get_sha_builder() else {
240240
bail!("Writer doesn't have sha256 enabled");
241241
};
242242
let stream_path = format!("streams/{}", hex::encode(sha256));

src/splitstream.rs

Lines changed: 19 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,14 @@
66
use std::io::{BufReader, Read, Write};
77

88
use anyhow::{bail, Result};
9-
use sha2::{Digest, Sha256};
10-
use zstd::stream::{read::Decoder, write::Encoder};
9+
use sha2::Sha256;
10+
use zstd::stream::read::Decoder;
1111

1212
use crate::{
1313
fsverity::{FsVerityHashValue, Sha256HashValue},
1414
repository::Repository,
1515
util::read_exactish,
16+
zstd_encoder::ZstdWriter,
1617
};
1718

1819
#[derive(Debug)]
@@ -60,9 +61,8 @@ impl DigestMap {
6061

6162
pub struct SplitStreamWriter<'a> {
6263
repo: &'a Repository,
63-
inline_content: Vec<u8>,
64-
writer: Encoder<'a, Vec<u8>>,
65-
pub sha256: Option<(Sha256, Sha256HashValue)>,
64+
pub(crate) inline_content: Vec<u8>,
65+
writer: ZstdWriter,
6666
}
6767

6868
impl std::fmt::Debug for SplitStreamWriter<'_> {
@@ -71,7 +71,6 @@ impl std::fmt::Debug for SplitStreamWriter<'_> {
7171
f.debug_struct("SplitStreamWriter")
7272
.field("repo", &self.repo)
7373
.field("inline_content", &self.inline_content)
74-
.field("sha256", &self.sha256)
7574
.finish()
7675
}
7776
}
@@ -82,85 +81,46 @@ impl SplitStreamWriter<'_> {
8281
refs: Option<DigestMap>,
8382
sha256: Option<Sha256HashValue>,
8483
) -> SplitStreamWriter {
85-
// SAFETY: we surely can't get an error writing the header to a Vec<u8>
86-
let mut writer = Encoder::new(vec![], 0).unwrap();
87-
88-
match refs {
89-
Some(DigestMap { map }) => {
90-
writer.write_all(&(map.len() as u64).to_le_bytes()).unwrap();
91-
for ref entry in map {
92-
writer.write_all(&entry.body).unwrap();
93-
writer.write_all(&entry.verity).unwrap();
94-
}
95-
}
96-
None => {
97-
writer.write_all(&0u64.to_le_bytes()).unwrap();
98-
}
99-
}
100-
10184
SplitStreamWriter {
10285
repo,
10386
inline_content: vec![],
104-
writer,
105-
sha256: sha256.map(|x| (Sha256::new(), x)),
87+
writer: ZstdWriter::new(sha256, refs),
10688
}
10789
}
10890

109-
fn write_fragment(writer: &mut impl Write, size: usize, data: &[u8]) -> Result<()> {
110-
writer.write_all(&(size as u64).to_le_bytes())?;
111-
Ok(writer.write_all(data)?)
91+
pub fn get_sha_builder(&self) -> &Option<(Sha256, Sha256HashValue)> {
92+
&self.writer.sha256_builder
11293
}
11394

11495
/// flush any buffered inline data, taking new_value as the new value of the buffer
11596
fn flush_inline(&mut self, new_value: Vec<u8>) -> Result<()> {
116-
if !self.inline_content.is_empty() {
117-
SplitStreamWriter::write_fragment(
118-
&mut self.writer,
119-
self.inline_content.len(),
120-
&self.inline_content,
121-
)?;
122-
self.inline_content = new_value;
123-
}
97+
self.writer.flush_inline(&self.inline_content)?;
98+
self.inline_content = new_value;
12499
Ok(())
125100
}
126101

127102
/// really, "add inline content to the buffer"
128103
/// you need to call .flush_inline() later
129104
pub fn write_inline(&mut self, data: &[u8]) {
130-
if let Some((ref mut sha256, ..)) = self.sha256 {
131-
sha256.update(data);
132-
}
105+
self.writer.update_sha(data);
133106
self.inline_content.extend(data);
134107
}
135108

136-
/// write a reference to external data to the stream. If the external data had padding in the
137-
/// stream which is not stored in the object then pass it here as well and it will be stored
138-
/// inline after the reference.
139-
fn write_reference(&mut self, reference: Sha256HashValue, padding: Vec<u8>) -> Result<()> {
140-
// Flush the inline data before we store the external reference. Any padding from the
141-
// external data becomes the start of a new inline block.
142-
self.flush_inline(padding)?;
109+
pub fn write_external(&mut self, data: &[u8], padding: Vec<u8>) -> Result<()> {
110+
let id = self.repo.ensure_object(&data)?;
143111

144-
SplitStreamWriter::write_fragment(&mut self.writer, 0, &reference)
145-
}
112+
self.writer.update_sha(data);
113+
self.writer.update_sha(&padding);
114+
self.writer.flush_inline(&padding)?;
146115

147-
pub fn write_external(&mut self, data: &[u8], padding: Vec<u8>) -> Result<()> {
148-
if let Some((ref mut sha256, ..)) = self.sha256 {
149-
sha256.update(data);
150-
sha256.update(&padding);
151-
}
152-
let id = self.repo.ensure_object(data)?;
153-
self.write_reference(id, padding)
116+
self.writer.write_fragment(0, &id)?;
117+
Ok(())
154118
}
155119

156120
pub fn done(mut self) -> Result<Sha256HashValue> {
157121
self.flush_inline(vec![])?;
158122

159-
if let Some((context, expected)) = self.sha256 {
160-
if Into::<Sha256HashValue>::into(context.finalize()) != expected {
161-
bail!("Content doesn't have expected SHA256 hash value!");
162-
}
163-
}
123+
self.writer.finalize_sha256_builder()?;
164124

165125
self.repo.ensure_object(&self.writer.finish()?)
166126
}

src/zstd_encoder.rs

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
use std::io::{self, Write};
2+
3+
use sha2::{Digest, Sha256};
4+
5+
use anyhow::{bail, Result};
6+
7+
use crate::{
8+
fsverity::{FsVerityHashValue, Sha256HashValue},
9+
splitstream::DigestMap,
10+
};
11+
12+
pub(crate) struct ZstdWriter {
13+
writer: zstd::Encoder<'static, Vec<u8>>,
14+
pub(crate) sha256_builder: Option<(Sha256, Sha256HashValue)>,
15+
}
16+
17+
impl ZstdWriter {
18+
pub fn new(sha256: Option<Sha256HashValue>, refs: Option<DigestMap>) -> Self {
19+
Self {
20+
writer: ZstdWriter::instantiate_writer(refs),
21+
sha256_builder: sha256.map(|x| (Sha256::new(), x)),
22+
}
23+
}
24+
25+
fn instantiate_writer(refs: Option<DigestMap>) -> zstd::Encoder<'static, Vec<u8>> {
26+
let mut writer = zstd::Encoder::new(vec![], 0).unwrap();
27+
28+
match refs {
29+
Some(DigestMap { map }) => {
30+
writer.write_all(&(map.len() as u64).to_le_bytes()).unwrap();
31+
32+
for ref entry in map {
33+
writer.write_all(&entry.body).unwrap();
34+
writer.write_all(&entry.verity).unwrap();
35+
}
36+
}
37+
38+
None => {
39+
writer.write_all(&0u64.to_le_bytes()).unwrap();
40+
}
41+
}
42+
43+
return writer;
44+
}
45+
46+
pub(crate) fn write_fragment(&mut self, size: usize, data: &[u8]) -> Result<()> {
47+
self.writer.write_all(&(size as u64).to_le_bytes())?;
48+
Ok(self.writer.write_all(data)?)
49+
}
50+
51+
pub(crate) fn update_sha(&mut self, data: &[u8]) {
52+
if let Some((sha256, ..)) = &mut self.sha256_builder {
53+
sha256.update(&data);
54+
}
55+
}
56+
57+
pub(crate) fn flush_inline(&mut self, inline_content: &Vec<u8>) -> Result<()> {
58+
if inline_content.is_empty() {
59+
return Ok(());
60+
}
61+
62+
self.write_fragment(inline_content.len(), &inline_content)?;
63+
64+
Ok(())
65+
}
66+
67+
pub(crate) fn finalize_sha256_builder(&mut self) -> Result<Sha256HashValue> {
68+
let sha256_builder = std::mem::replace(&mut self.sha256_builder, None);
69+
70+
let mut sha = Sha256HashValue::EMPTY;
71+
72+
if let Some((context, expected)) = sha256_builder {
73+
let final_sha = Into::<Sha256HashValue>::into(context.finalize());
74+
75+
if final_sha != expected {
76+
bail!(
77+
"Content doesn't have expected SHA256 hash value!\nExpected: {}, final: {}",
78+
hex::encode(expected),
79+
hex::encode(final_sha)
80+
);
81+
}
82+
83+
sha = final_sha;
84+
}
85+
86+
return Ok(sha);
87+
}
88+
89+
pub(crate) fn finish(self) -> io::Result<Vec<u8>> {
90+
self.writer.finish()
91+
}
92+
}

0 commit comments

Comments
 (0)