Skip to content

Commit 499d9a1

Browse files
authored
refactor to rewrite shard header with footer_len set to 0 (#551)
Related to #547. When xet-core uploads the shard, the header field that specifies the footer length is set to 200, whereas it should be 0 according to the specification. Note: the server ignores this value today, but ideally we would set it correctly, since it can be useful to know whether there is a footer on the shard when reading the shard as a whole in a non-streaming fashion.
1 parent ffa9faa commit 499d9a1

File tree

1 file changed

+47
-8
lines changed

1 file changed

+47
-8
lines changed

data/src/shard_interface.rs

Lines changed: 47 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,12 @@ use std::time::SystemTime;
88
use bytes::Bytes;
99
use cas_client::Client;
1010
use error_printer::ErrorPrinter;
11-
use mdb_shard::ShardFileManager;
1211
use mdb_shard::cas_structs::MDBCASInfo;
1312
use mdb_shard::constants::MDB_SHARD_MAX_TARGET_SIZE;
1413
use mdb_shard::file_structs::{FileDataSequenceEntry, MDBFileInfo};
1514
use mdb_shard::session_directory::{ShardMergeResult, consolidate_shards_in_directory, merge_shards_background};
1615
use mdb_shard::shard_in_memory::MDBInMemoryShard;
16+
use mdb_shard::{MDBShardFile, MDBShardFileHeader, ShardFileManager};
1717
use merklehash::MerkleHash;
1818
use tempfile::TempDir;
1919
use tokio::sync::Mutex;
@@ -271,13 +271,7 @@ impl SessionShardInterface {
271271
debug!("Uploading shard {shard_prefix}/{:?} from staging area to CAS.", &si.shard_hash);
272272

273273
let data: Bytes = if !shard_client.use_shard_footer() {
274-
let split_off_index = si.shard.metadata.file_lookup_offset as usize;
275-
// Read only the portion of the shard file up to the file_lookup_offset,
276-
// which excludes the footer and lookup sections.
277-
let mut file = File::open(&si.path)?;
278-
let mut buf = vec![0u8; split_off_index];
279-
file.read_exact(&mut buf)?;
280-
Bytes::from(buf)
274+
read_shard_to_bytes_remove_footer(&si)?
281275
} else {
282276
std::fs::read(&si.path)?.into()
283277
};
@@ -328,3 +322,48 @@ impl SessionShardInterface {
328322
Ok(shard_bytes_uploaded.load(Ordering::Relaxed))
329323
}
330324
}
325+
326+
fn read_shard_to_bytes_remove_footer(si: &Arc<MDBShardFile>) -> Result<Bytes> {
327+
let split_off_index = si.shard.metadata.file_lookup_offset as usize;
328+
// Read only the portion of the shard file up to the file_lookup_offset,
329+
// which excludes the footer and lookup sections.
330+
let mut file = File::open(&si.path)?;
331+
let mut buf = vec![0u8; split_off_index];
332+
file.read_exact(&mut buf)?;
333+
// re-write the header to set footer_size to 0.
334+
let mut header = si.shard.header.clone();
335+
header.footer_size = 0;
336+
header.serialize(&mut (&mut buf[..size_of::<MDBShardFileHeader>()]))?;
337+
#[cfg(debug_assertions)]
338+
{
339+
let new_header =
340+
MDBShardFileHeader::deserialize(&mut std::io::Cursor::new(&buf[..size_of::<MDBShardFileHeader>()]))?;
341+
debug_assert_eq!(new_header.footer_size, 0);
342+
}
343+
Ok(Bytes::from(buf))
344+
}
345+
346+
#[cfg(test)]
mod tests {
    use std::io::Cursor;

    use super::*;

    /// Writes a default in-memory shard to a temporary directory, checks that the loaded
    /// shard's header reports a footer of `size_of::<MDBShardFileFooter>()` bytes, and then
    /// verifies that the bytes produced by `read_shard_to_bytes_remove_footer` carry a header
    /// whose `footer_size` is 0.
    #[test]
    fn test_read_shard_to_bytes_remove_footer() -> Result<()> {
        let scratch_dir = TempDir::with_prefix("test_read_shard_to_bytes_remove_footer")?;

        // Persist a default shard and load it back from disk.
        let shard_path = MDBInMemoryShard::default().write_to_directory(scratch_dir.path(), None)?;
        let on_disk_shard = MDBShardFile::load_from_file(&shard_path)?;

        // Before stripping, the header advertises a footer.
        let expected_footer_len = size_of::<mdb_shard::MDBShardFileFooter>() as u64;
        assert_eq!(on_disk_shard.shard.header.footer_size, expected_footer_len);

        // After stripping, the re-parsed header must report no footer.
        let stripped_bytes = read_shard_to_bytes_remove_footer(&on_disk_shard)?;
        let header_bytes = &stripped_bytes[..size_of::<MDBShardFileHeader>()];
        let reparsed_header = MDBShardFileHeader::deserialize(&mut Cursor::new(header_bytes))?;
        assert_eq!(reparsed_header.footer_size, 0);

        Ok(())
    }
}

0 commit comments

Comments
 (0)