Commit 2fc772e

Shard utilities needed for GC pass and server-side xorb rewriting. (#532)
This PR adds a utility that rewrites a shard to include only the relevant xorb information, dropping unreferenced file information. To preserve the global dedup tracking previously derived from that file information, it also adds a backwards-compatible flag to the chunk metadata marking a chunk as eligible for global dedup, so that eligibility can be tracked independently of the file metadata.
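
To illustrate how the two pieces combine, here is a minimal sketch of a server-side CAS-block rewrite, assuming the `serialize_with_chunk_rewrite` and `with_global_dedup_flag` APIs added in this commit; the module paths and the `rewrite_cas_block` helper are hypothetical:

```rust
use std::io::Write;

use mdb_shard::cas_structs::MDBCASInfoView;
use mdb_shard::hash_is_global_dedup_eligible;

/// Hypothetical helper: re-serialize one CAS block, stamping the global-dedup
/// flag onto each chunk whose hash passes the eligibility filter, so the
/// eligibility information survives once the file metadata is dropped.
fn rewrite_cas_block<W: Write>(cas: &MDBCASInfoView, writer: &mut W) -> std::io::Result<usize> {
    cas.serialize_with_chunk_rewrite(writer, |_idx, chunk| {
        let eligible = hash_is_global_dedup_eligible(&chunk.chunk_hash);
        chunk.with_global_dedup_flag(eligible)
    })
}
```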
1 parent 03c1903 commit 2fc772e

File tree: 6 files changed (+318, -55 lines)


Cargo.lock

Lines changed: 1 addition & 0 deletions
Generated file; diff not rendered.

hf_xet/Cargo.lock

Lines changed: 1 addition & 0 deletions
Generated file; diff not rendered.

mdb_shard/Cargo.toml

Lines changed: 1 addition & 0 deletions
```diff
@@ -26,6 +26,7 @@ tempfile = { workspace = true }
 thiserror = { workspace = true }
 tokio = { workspace = true }
 tracing = { workspace = true }
+more-asserts = { workspace = true }

 [target.'cfg(target_family = "wasm")'.dependencies]
 uuid = { workspace = true, features = ["v4", "js"] }
```

mdb_shard/src/cas_structs.rs

Lines changed: 48 additions & 6 deletions
```diff
@@ -5,8 +5,12 @@ use bytes::Bytes;
 use merklehash::MerkleHash;
 use utils::serialization_utils::*;

+use crate::hash_is_global_dedup_eligible;
+
 pub const MDB_DEFAULT_CAS_FLAG: u32 = 0;

+pub const MDB_CHUNK_WITH_GLOBAL_DEDUP_FLAG: u32 = 1 << 31;
+
 /// Each CAS consists of a CASChunkSequenceHeader following
 /// a sequence of CASChunkSequenceEntry.
@@ -89,7 +93,8 @@ pub struct CASChunkSequenceEntry {
     pub chunk_hash: MerkleHash,
     pub chunk_byte_range_start: u32,
     pub unpacked_segment_bytes: u32,
-    pub _unused: u64,
+    pub flags: u32,
+    pub _unused: u32,
 }

 impl CASChunkSequenceEntry {
@@ -106,13 +111,31 @@ impl CASChunkSequenceEntry {
             chunk_hash,
             unpacked_segment_bytes: unpacked_segment_bytes.try_into().unwrap(),
             chunk_byte_range_start: chunk_byte_range_start.try_into().unwrap(),
-            #[cfg(test)]
-            _unused: 216944691646848u64,
-            #[cfg(not(test))]
+            flags: 0,
             _unused: 0,
         }
     }

+    /// Mark this chunk as a candidate for population in the global dedup table.
+    pub fn with_global_dedup_flag(self, is_global_dedup_chunk: bool) -> Self {
+        if is_global_dedup_chunk {
+            Self {
+                flags: self.flags | MDB_CHUNK_WITH_GLOBAL_DEDUP_FLAG,
+                ..self
+            }
+        } else {
+            Self {
+                flags: self.flags & !MDB_CHUNK_WITH_GLOBAL_DEDUP_FLAG,
+                ..self
+            }
+        }
+    }
+
+    // Is this chunk eligible for a global dedup query?
+    pub fn is_global_dedup_eligible(&self) -> bool {
+        (self.flags & MDB_CHUNK_WITH_GLOBAL_DEDUP_FLAG) != 0 || hash_is_global_dedup_eligible(&self.chunk_hash)
+    }
+
     pub fn serialize<W: Write>(&self, writer: &mut W) -> Result<usize, std::io::Error> {
         let mut buf = [0u8; size_of::<Self>()];
         {
@@ -122,7 +145,8 @@ impl CASChunkSequenceEntry {
             write_hash(writer, &self.chunk_hash)?;
             write_u32(writer, self.chunk_byte_range_start)?;
             write_u32(writer, self.unpacked_segment_bytes)?;
-            write_u64(writer, self._unused)?;
+            write_u32(writer, self.flags)?;
+            write_u32(writer, self._unused)?;
         }

         writer.write_all(&buf[..])?;
@@ -140,7 +164,8 @@ impl CASChunkSequenceEntry {
             chunk_hash: read_hash(reader)?,
             chunk_byte_range_start: read_u32(reader)?,
             unpacked_segment_bytes: read_u32(reader)?,
-            _unused: read_u64(reader)?,
+            flags: read_u32(reader)?,
+            _unused: read_u32(reader)?,
         })
     }
 }
@@ -253,4 +278,21 @@ impl MDBCASInfoView {
         writer.write_all(&self.data[..n_bytes])?;
         Ok(n_bytes)
     }
+
+    #[inline]
+    pub fn serialize_with_chunk_rewrite<W: Write>(
+        &self,
+        writer: &mut W,
+        chunk_rewrite_fn: impl Fn(usize, CASChunkSequenceEntry) -> CASChunkSequenceEntry,
+    ) -> std::io::Result<usize> {
+        let mut n_out_bytes = 0;
+        n_out_bytes += self.header.serialize(writer)?;
+
+        for idx in 0..self.num_entries() {
+            let rewritten_chunk = chunk_rewrite_fn(idx, self.chunk(idx));
+            n_out_bytes += rewritten_chunk.serialize(writer)?;
+        }
+
+        Ok(n_out_bytes)
+    }
 }
```
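
The layout change keeps the entry the same size on disk, so readers that predate the flag simply see part of the old unused field set. A quick round-trip sketch; the `deserialize` name, the constructor argument order, and `MerkleHash: Default` are assumptions:

```rust
use std::io::Cursor;

use mdb_shard::cas_structs::{CASChunkSequenceEntry, MDB_CHUNK_WITH_GLOBAL_DEDUP_FLAG};
use merklehash::MerkleHash;

fn main() -> std::io::Result<()> {
    // Mark a chunk entry as eligible for global dedup; the flag lives in
    // bit 31 of the new `flags: u32` carved out of the old `_unused: u64`.
    let chunk = CASChunkSequenceEntry::new(MerkleHash::default(), 1024, 0)
        .with_global_dedup_flag(true);
    assert!(chunk.is_global_dedup_eligible());

    // Round-trip through the unchanged on-disk layout: older readers treat
    // these four bytes as unused and ignore them.
    let mut buf = Cursor::new(Vec::new());
    chunk.serialize(&mut buf)?;
    buf.set_position(0);
    let restored = CASChunkSequenceEntry::deserialize(&mut buf)?;
    assert_ne!(restored.flags & MDB_CHUNK_WITH_GLOBAL_DEDUP_FLAG, 0);
    Ok(())
}
```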

mdb_shard/src/shard_format.rs

Lines changed: 4 additions & 39 deletions
```diff
@@ -1,4 +1,4 @@
-use std::collections::{BTreeMap, HashMap};
+use std::collections::BTreeMap;
 use std::io::{Read, Seek, SeekFrom, Write, copy};
 use std::mem::size_of;
 use std::ops::Add;
@@ -13,11 +13,11 @@ use tracing::debug;
 use utils::serialization_utils::*;

 use crate::cas_structs::*;
-use crate::constants::*;
 use crate::error::{MDBShardError, Result};
 use crate::file_structs::*;
 use crate::interpolation_search::search_on_sorted_u64s;
 use crate::shard_in_memory::MDBInMemoryShard;
+use crate::streaming_shard::MDBMinimalShard;
 use crate::utils::{shard_expiry_time, truncate_hash};

 // Same size for FileDataSequenceHeader and FileDataSequenceEntry
@@ -931,44 +931,9 @@ impl MDBShardInfo {
     /// The chunk hashes are either multiples of 'hash_filter_modulus',
     /// or the hash of the first chunk of a file present in the shard.
     pub fn filter_cas_chunks_for_global_dedup<R: Read + Seek>(reader: &mut R) -> Result<Vec<MerkleHash>> {
-        let mut ret = Vec::new();
-
-        // First, go through and get all of the cas chunks. This allows us to form the lookup for the CAS block
-        // hashes later.
-        let shard = MDBShardInfo::load_from_reader(reader)?;
-
-        let cas_chunks = shard.read_all_cas_blocks_full(reader)?;
-        let mut cas_block_lookup = HashMap::<MerkleHash, usize>::with_capacity(cas_chunks.len());
-
-        for (i, cas_info) in cas_chunks.iter().enumerate() {
-            cas_block_lookup.insert(cas_info.metadata.cas_hash, i);
-            for chunk in cas_info.chunks.iter() {
-                if hash_is_global_dedup_eligible(&chunk.chunk_hash) {
-                    ret.push(chunk.chunk_hash);
-                }
-            }
-        }
-
-        // Now, go through all the files present, collecting the first chunks of the files.
-        // TODO: break this out into a utility if needed.
-        let files = shard.read_all_file_info_sections(reader)?;
+        let shard = MDBMinimalShard::from_reader(reader, true, true)?;

-        for fi in files {
-            let Some(entry) = fi.segments.first() else {
-                continue;
-            };
-
-            let Some(cas_block_index) = cas_block_lookup.get(&entry.cas_hash) else {
-                continue;
-            };
-
-            // Scan the cas entries to get the proper index
-            let first_chunk_hash = cas_chunks[*cas_block_index].chunks[entry.chunk_index_start as usize].chunk_hash;
-
-            ret.push(first_chunk_hash);
-        }
-
-        Ok(ret)
+        Ok(shard.global_dedup_eligible_chunks())
     }

     /// Export the current shard as an hmac keyed shard, returning the number of bytes written
```
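
The refactored function delegates to the new `MDBMinimalShard` (the two boolean arguments to `from_reader` presumably toggle which sections are retained). A usage sketch; the module path and shard filename are assumptions:

```rust
use std::fs::File;
use std::io::BufReader;

use mdb_shard::shard_format::MDBShardInfo;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Hypothetical path to an existing shard file.
    let mut reader = BufReader::new(File::open("example.mdb")?);

    // Collect the chunk hashes that should be registered in the global
    // dedup table; internally this now loads an MDBMinimalShard.
    let hashes = MDBShardInfo::filter_cas_chunks_for_global_dedup(&mut reader)?;
    println!("{} global-dedup-eligible chunk hashes", hashes.len());
    Ok(())
}
```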

Comments (0)