Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 122 additions & 11 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions lib/vector-buffers/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ unexpected_cfgs = { level = "warn", check-cfg = ['cfg(tokio_unstable)'] }
async-recursion = "1.1.1"
async-stream = "0.3.6"
async-trait.workspace = true
bytecheck = { version = "0.6.9", default-features = false, features = ["std"] }
bytecheck = { version = "0.8", default-features = false }
bytes.workspace = true
crc32fast = { version = "1.5.0", default-features = false }
crossbeam-queue = { version = "0.3.12", default-features = false, features = ["std"] }
Expand All @@ -24,7 +24,7 @@ memmap2 = { version = "0.9.8", default-features = false }
metrics.workspace = true
num-traits = { version = "0.2.19", default-features = false }
paste.workspace = true
rkyv = { version = "0.7.45", default-features = false, features = ["size_32", "std", "strict", "validation"] }
rkyv = { version = "0.8.13", default-features = true, features = ["pointer_width_32", "std", "bytecheck"] }
serde.workspace = true
snafu.workspace = true
tokio-util = { version = "0.7.0", default-features = false }
Expand Down
45 changes: 10 additions & 35 deletions lib/vector-buffers/src/variants/disk_v2/backed_archive.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,12 @@
use std::marker::PhantomData;
#[cfg(test)]
use std::pin::Pin;

use bytecheck::CheckBytes;
use rkyv::{
Archive, Serialize, archived_root, check_archived_root,
ser::{Serializer, serializers::AllocSerializer},
validation::validators::DefaultValidator,
Archive, Serialize,
access_unchecked,
};

use super::ser::{DeserializeError, SerializeError};

/// A batteries-included serializer implementation.
///
/// Callers do not need to know or care about this, but it must be public as it is part of the
/// `BackedArchive` API.
pub type DefaultSerializer = AllocSerializer<4096>;

/// Backed wrapper for any type that implements [`Archive`][archive].
///
/// For any backing store that can provide references to an underlying byte slice of suitable size,
Expand Down Expand Up @@ -59,20 +49,19 @@ impl<B, T> BackedArchive<B, T>
where
B: AsRef<[u8]>,
T: Archive,
T::Archived: rkyv::Portable + for<'a> bytecheck::CheckBytes<rkyv::rancor::Strategy<rkyv::validation::Validator<rkyv::validation::archive::ArchiveValidator<'a>, rkyv::validation::shared::SharedValidator>, rkyv::rancor::Error>>,
{
/// Deserializes the archived value from the backing store and wraps it.
///
/// # Errors
///
/// If the data in the backing store is not valid for `T`, an error variant will be returned.
pub fn from_backing(backing: B) -> Result<BackedArchive<B, T>, DeserializeError>
where
for<'a> T::Archived: CheckBytes<DefaultValidator<'a>>,
{
// Validate that the input is, well, valid.
_ = check_archived_root::<T>(backing.as_ref())?;
// Validate the archived data using rkyv::access with CheckBytes
let _ = rkyv::access::<T::Archived, rkyv::rancor::Error>(backing.as_ref())
.map_err(|e| DeserializeError::InvalidStructure(e.to_string()))?;

// Now that we know the buffer fits T, we're good to go!
Ok(Self {
backing,
_archive: PhantomData,
Expand All @@ -86,14 +75,15 @@ where

/// Gets a reference to the archived value.
pub fn get_archive_ref(&self) -> &T::Archived {
unsafe { archived_root::<T>(self.backing.as_ref()) }
// SAFETY: We validated the data in from_backing, so this is safe
unsafe { access_unchecked::<T::Archived>(self.backing.as_ref()) }
}
}

impl<B, T> BackedArchive<B, T>
where
B: AsMut<[u8]>,
T: Archive,
T: Archive + for<'a> Serialize<rkyv::rancor::Strategy<rkyv::ser::Serializer<rkyv::util::AlignedVec, rkyv::ser::allocator::ArenaHandle<'a>, rkyv::ser::sharing::Share>, rkyv::rancor::Error>>,
{
/// Serializes the provided value to the backing store and wraps it.
///
Expand All @@ -104,17 +94,11 @@ where
/// the value, an error variant will be returned defining the minimum size the backing store
/// must be, as well containing the value that failed to get serialized.
pub fn from_value(mut backing: B, value: T) -> Result<BackedArchive<B, T>, SerializeError<T>>
where
T: Serialize<DefaultSerializer>,
{
// Serialize our value so we can shove it into the backing.
let mut serializer = DefaultSerializer::default();
_ = serializer
.serialize_value(&value)
let src_buf = rkyv::to_bytes::<rkyv::rancor::Error>(&value)
.map_err(|e| SerializeError::FailedToSerialize(e.to_string()))?;

let src_buf = serializer.into_serializer().into_inner();

// Now we have to write the serialized version to the backing store. For obvious reasons,
// the backing store needs to be able to hold the entire serialized representation, so we
// check for that. As well, instead of using `archived_root_mut`, we use
Expand All @@ -132,13 +116,4 @@ where
_archive: PhantomData,
})
}

/// Gets a reference to the archived value.
#[cfg(test)]
pub fn get_archive_mut(&mut self) -> Pin<&mut T::Archived> {
use rkyv::archived_root_mut;

let pinned = Pin::new(self.backing.as_mut());
unsafe { archived_root_mut::<T>(pinned) }
}
}
Loading
Loading