diff --git a/Cargo.lock b/Cargo.lock
index bb5478d575f5c..8f39225685997 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2227,8 +2227,20 @@ version = "0.6.11"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "8b6372023ac861f6e6dc89c8344a8f398fb42aaba2b5dbc649ca0c0e9dbcb627"
 dependencies = [
- "bytecheck_derive",
- "ptr_meta",
+ "bytecheck_derive 0.6.11",
+ "ptr_meta 0.1.4",
+ "simdutf8",
+]
+
+[[package]]
+name = "bytecheck"
+version = "0.8.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0caa33a2c0edca0419d15ac723dff03f1956f7978329b1e3b5fdaaaed9d3ca8b"
+dependencies = [
+ "bytecheck_derive 0.8.2",
+ "ptr_meta 0.3.1",
+ "rancor",
  "simdutf8",
 ]
 
@@ -2243,6 +2255,17 @@ dependencies = [
  "syn 1.0.109",
 ]
 
+[[package]]
+name = "bytecheck_derive"
+version = "0.8.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "89385e82b5d1821d2219e0b095efa2cc1f246cbf99080f3be46a1a85c0d392d9"
+dependencies = [
+ "proc-macro2 1.0.101",
+ "quote 1.0.40",
+ "syn 2.0.106",
+]
+
 [[package]]
 name = "bytecount"
 version = "0.6.9"
@@ -7022,6 +7045,26 @@ version = "0.8.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a"
 
+[[package]]
+name = "munge"
+version = "0.4.7"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5e17401f259eba956ca16491461b6e8f72913a0a114e39736ce404410f915a0c"
+dependencies = [
+ "munge_macro",
+]
+
+[[package]]
+name = "munge_macro"
+version = "0.4.7"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4568f25ccbd45ab5d5603dc34318c1ec56b117531781260002151b8530a9f931"
+dependencies = [
+ "proc-macro2 1.0.101",
+ "quote 1.0.40",
+ "syn 2.0.106",
+]
+
 [[package]]
 name = "native-tls"
 version = "0.2.14"
@@ -8675,7 +8718,16 @@ version = "0.1.4"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "0738ccf7ea06b608c10564b31debd4f5bc5e197fc8bfe088f68ae5ce81e7a4f1"
 dependencies = [
- "ptr_meta_derive",
+ "ptr_meta_derive 0.1.4",
+]
+
+[[package]]
+name = "ptr_meta"
+version = "0.3.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0b9a0cf95a1196af61d4f1cbdab967179516d9a4a4312af1f31948f8f6224a79"
+dependencies = [
+ "ptr_meta_derive 0.3.1",
 ]
 
 [[package]]
@@ -8689,6 +8741,17 @@ dependencies = [
  "syn 1.0.109",
 ]
 
+[[package]]
+name = "ptr_meta_derive"
+version = "0.3.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7347867d0a7e1208d93b46767be83e2b8f978c3dad35f775ac8d8847551d6fe1"
+dependencies = [
+ "proc-macro2 1.0.101",
+ "quote 1.0.40",
+ "syn 2.0.106",
+]
+
 [[package]]
 name = "publicsuffix"
 version = "2.3.0"
@@ -8918,6 +8981,15 @@ dependencies = [
  "nibble_vec",
 ]
 
+[[package]]
+name = "rancor"
+version = "0.1.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a063ea72381527c2a0561da9c80000ef822bdd7c3241b1cc1b12100e3df081ee"
+dependencies = [
+ "ptr_meta 0.3.1",
+]
+
 [[package]]
 name = "rand"
 version = "0.7.3"
@@ -9324,7 +9396,16 @@ version = "0.4.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "a2571463863a6bd50c32f94402933f03457a3fbaf697a707c5be741e459f08fd"
 dependencies = [
- "bytecheck",
+ "bytecheck 0.6.11",
+]
+
+[[package]]
+name = "rend"
+version = "0.5.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "cadadef317c2f20755a64d7fdc48f9e7178ee6b0e1f7fce33fa60f1d68a276e6"
+dependencies = [
"bytecheck 0.8.2", ] [[package]] @@ -9507,17 +9588,36 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9008cd6385b9e161d8229e1f6549dd23c3d022f132a2ea37ac3a10ac4935779b" dependencies = [ "bitvec", - "bytecheck", + "bytecheck 0.6.11", "bytes 1.10.1", "hashbrown 0.12.3", - "ptr_meta", - "rend", - "rkyv_derive", + "ptr_meta 0.1.4", + "rend 0.4.1", + "rkyv_derive 0.7.45", "seahash", "tinyvec", "uuid", ] +[[package]] +name = "rkyv" +version = "0.8.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b2e88acca7157d83d789836a3987dafc12bc3d88a050e54b8fe9ea4aaa29d20" +dependencies = [ + "bytecheck 0.8.2", + "bytes 1.10.1", + "hashbrown 0.16.0", + "indexmap 2.12.0", + "munge", + "ptr_meta 0.3.1", + "rancor", + "rend 0.5.3", + "rkyv_derive 0.8.13", + "tinyvec", + "uuid", +] + [[package]] name = "rkyv_derive" version = "0.7.45" @@ -9529,6 +9629,17 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "rkyv_derive" +version = "0.8.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f6dffea3c91fa91a3c0fc8a061b0e27fef25c6304728038a6d6bcb1c58ba9bd" +dependencies = [ + "proc-macro2 1.0.101", + "quote 1.0.40", + "syn 2.0.106", +] + [[package]] name = "rle-decode-fast" version = "1.0.3" @@ -9663,7 +9774,7 @@ dependencies = [ "bytes 1.10.1", "num-traits", "rand 0.8.5", - "rkyv", + "rkyv 0.7.45", "serde", "serde_json", ] @@ -12747,7 +12858,7 @@ dependencies = [ "async-recursion", "async-stream", "async-trait", - "bytecheck", + "bytecheck 0.8.2", "bytes 1.10.1", "clap", "crc32fast", @@ -12769,7 +12880,7 @@ dependencies = [ "proptest", "quickcheck", "rand 0.9.2", - "rkyv", + "rkyv 0.8.13", "serde", "serde_yaml", "snafu 0.8.9", diff --git a/lib/vector-buffers/Cargo.toml b/lib/vector-buffers/Cargo.toml index 3eb7a674ca1c4..bf03cd5e72598 100644 --- a/lib/vector-buffers/Cargo.toml +++ b/lib/vector-buffers/Cargo.toml @@ -12,7 +12,7 @@ unexpected_cfgs = { level = "warn", check-cfg = ['cfg(tokio_unstable)'] } async-recursion = "1.1.1" async-stream = "0.3.6" async-trait.workspace = true -bytecheck = { version = "0.6.9", default-features = false, features = ["std"] } +bytecheck = { version = "0.8", default-features = false } bytes.workspace = true crc32fast = { version = "1.5.0", default-features = false } crossbeam-queue = { version = "0.3.12", default-features = false, features = ["std"] } @@ -24,7 +24,7 @@ memmap2 = { version = "0.9.8", default-features = false } metrics.workspace = true num-traits = { version = "0.2.19", default-features = false } paste.workspace = true -rkyv = { version = "0.7.45", default-features = false, features = ["size_32", "std", "strict", "validation"] } +rkyv = { version = "0.8.13", default-features = true, features = ["pointer_width_32", "std", "bytecheck"] } serde.workspace = true snafu.workspace = true tokio-util = { version = "0.7.0", default-features = false } diff --git a/lib/vector-buffers/src/variants/disk_v2/backed_archive.rs b/lib/vector-buffers/src/variants/disk_v2/backed_archive.rs index 264d868d1183e..48ef96da70f9d 100644 --- a/lib/vector-buffers/src/variants/disk_v2/backed_archive.rs +++ b/lib/vector-buffers/src/variants/disk_v2/backed_archive.rs @@ -1,22 +1,12 @@ use std::marker::PhantomData; -#[cfg(test)] -use std::pin::Pin; -use bytecheck::CheckBytes; use rkyv::{ - Archive, Serialize, archived_root, check_archived_root, - ser::{Serializer, serializers::AllocSerializer}, - validation::validators::DefaultValidator, + Archive, Serialize, + access_unchecked, }; use 
diff --git a/lib/vector-buffers/src/variants/disk_v2/backed_archive.rs b/lib/vector-buffers/src/variants/disk_v2/backed_archive.rs
index 264d868d1183e..48ef96da70f9d 100644
--- a/lib/vector-buffers/src/variants/disk_v2/backed_archive.rs
+++ b/lib/vector-buffers/src/variants/disk_v2/backed_archive.rs
@@ -1,22 +1,12 @@
 use std::marker::PhantomData;
-#[cfg(test)]
-use std::pin::Pin;
 
-use bytecheck::CheckBytes;
 use rkyv::{
-    Archive, Serialize, archived_root, check_archived_root,
-    ser::{Serializer, serializers::AllocSerializer},
-    validation::validators::DefaultValidator,
+    Archive, Serialize,
+    access_unchecked,
 };
 
 use super::ser::{DeserializeError, SerializeError};
 
-/// A batteries-included serializer implementation.
-///
-/// Callers do not need to know or care about this, but it must be public as it is part of the
-/// `BackedArchive` API.
-pub type DefaultSerializer = AllocSerializer<4096>;
-
 /// Backed wrapper for any type that implements [`Archive`][archive].
 ///
 /// For any backing store that can provide references to an underlying byte slice of suitable size,
@@ -59,6 +49,7 @@ impl<B, T> BackedArchive<B, T>
 where
     B: AsRef<[u8]>,
     T: Archive,
+    T::Archived: rkyv::Portable + for<'a> bytecheck::CheckBytes<rkyv::rancor::Strategy<rkyv::validation::Validator<rkyv::validation::archive::ArchiveValidator<'a>, rkyv::validation::shared::SharedValidator>, rkyv::rancor::Error>>,
 {
     /// Deserializes the archived value from the backing store and wraps it.
     ///
     /// If the data in the backing store is not valid for `T`, an error variant will be returned.
     pub fn from_backing(backing: B) -> Result<BackedArchive<B, T>, DeserializeError>
-    where
-        for<'a> T::Archived: CheckBytes<DefaultValidator<'a>>,
     {
-        // Validate that the input is, well, valid.
-        _ = check_archived_root::<T>(backing.as_ref())?;
+        // Validate the archived data using `rkyv::access` with `CheckBytes`.
+        let _ = rkyv::access::<T::Archived, rkyv::rancor::Error>(backing.as_ref())
+            .map_err(|e| DeserializeError::InvalidStructure(e.to_string()))?;
 
-        // Now that we know the buffer fits T, we're good to go!
         Ok(Self {
             backing,
             _archive: PhantomData,
         })
@@ -86,14 +75,15 @@ where
     /// Gets a reference to the archived value.
     pub fn get_archive_ref(&self) -> &T::Archived {
-        unsafe { archived_root::<T>(self.backing.as_ref()) }
+        // SAFETY: We validated the data in `from_backing`, so this is safe.
+        unsafe { access_unchecked::<T::Archived>(self.backing.as_ref()) }
     }
 }
 
 impl<B, T> BackedArchive<B, T>
 where
     B: AsMut<[u8]>,
-    T: Archive,
+    T: Archive + for<'a> Serialize<rkyv::rancor::Strategy<rkyv::ser::Serializer<rkyv::util::AlignedVec, rkyv::ser::allocator::ArenaHandle<'a>, rkyv::ser::sharing::Share>, rkyv::rancor::Error>>,
 {
     /// Serializes the provided value to the backing store and wraps it.
     ///
     /// If the backing store is not large enough to hold the serialized version of
@@ -104,17 +94,11 @@ where
     /// the value, an error variant will be returned defining the minimum size the backing store
     /// must be, as well containing the value that failed to get serialized.
     pub fn from_value(mut backing: B, value: T) -> Result<BackedArchive<B, T>, SerializeError<T>>
-    where
-        T: Serialize<DefaultSerializer>,
     {
         // Serialize our value so we can shove it into the backing.
-        let mut serializer = DefaultSerializer::default();
-        _ = serializer
-            .serialize_value(&value)
+        let src_buf = rkyv::to_bytes::<rkyv::rancor::Error>(&value)
            .map_err(|e| SerializeError::FailedToSerialize(e.to_string()))?;
-        let src_buf = serializer.into_serializer().into_inner();
-
         // Now we have to write the serialized version to the backing store. For obvious reasons,
         // the backing store needs to be able to hold the entire serialized representation, so we
         // check for that. As well, instead of using `archived_root_mut`, we use
@@ -132,13 +116,4 @@ where
             _archive: PhantomData,
         })
     }
-
-    /// Gets a reference to the archived value.
-    #[cfg(test)]
-    pub fn get_archive_mut(&mut self) -> Pin<&mut T::Archived> {
-        use rkyv::archived_root_mut;
-
-        let pinned = Pin::new(self.backing.as_mut());
-        unsafe { archived_root_mut::<T>(pinned) }
-    }
 }
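// ---------------------------------------------------------------------------
// Sketch: the validate-once / read-many pattern `BackedArchive` uses above.
// Illustrative only -- `Demo` is hypothetical. `from_backing` pays the
// bytecheck cost through `rkyv::access`, after which `get_archive_ref` can
// soundly use `access_unchecked` as long as the backing bytes do not change.
// ---------------------------------------------------------------------------
use rkyv::{Archive, Serialize, rancor::Error};

#[derive(Archive, Serialize)]
struct Demo {
    value: u32,
}

fn load(backing: &[u8]) -> Result<&ArchivedDemo, Error> {
    // Validated access: checks size, alignment, and field validity.
    rkyv::access::<ArchivedDemo, Error>(backing)
}

fn reread(backing: &[u8]) -> &ArchivedDemo {
    // SAFETY: only sound if `backing` previously passed `load` unchanged.
    unsafe { rkyv::access_unchecked::<ArchivedDemo>(backing) }
}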
diff --git a/lib/vector-buffers/src/variants/disk_v2/ledger.rs b/lib/vector-buffers/src/variants/disk_v2/ledger.rs
index e63fc5fb16973..bb96bab6ee086 100644
--- a/lib/vector-buffers/src/variants/disk_v2/ledger.rs
+++ b/lib/vector-buffers/src/variants/disk_v2/ledger.rs
@@ -8,12 +8,11 @@ use std::{
     time::Instant,
 };
 
-use bytecheck::CheckBytes;
 use bytes::BytesMut;
 use crossbeam_utils::atomic::AtomicCell;
 use fslock::LockFile;
 use futures::StreamExt;
-use rkyv::{Archive, Serialize, with::Atomic};
+use rkyv::{Archive, Serialize};
 use snafu::{ResultExt, Snafu};
 use tokio::{fs, io::AsyncWriteExt, sync::Notify};
 use vector_common::finalizer::OrderedFinalizer;
@@ -90,20 +89,16 @@ pub enum LedgerLoadCreateError {
 ///
 /// Do not do any of the listed things unless you _absolutely_ know what you're doing. :)
 #[derive(Archive, Serialize, Debug)]
-#[archive_attr(derive(CheckBytes, Debug))]
+#[rkyv(attr(derive(Debug)))]
 pub struct LedgerState {
     /// Next record ID to use when writing a record.
-    #[with(Atomic)]
-    writer_next_record: AtomicU64,
+    writer_next_record: u64,
     /// The current data file ID being written to.
-    #[with(Atomic)]
-    writer_current_data_file: AtomicU16,
+    writer_current_data_file: u16,
     /// The current data file ID being read from.
-    #[with(Atomic)]
-    reader_current_data_file: AtomicU16,
+    reader_current_data_file: u16,
     /// The last record ID read by the reader.
-    #[with(Atomic)]
-    reader_last_record: AtomicU64,
+    reader_last_record: u64,
 }
 
@@ -112,17 +107,42 @@ impl Default for LedgerState {
         // First record written is always 1, so that our default of 0 for
         // `reader_last_record_id` ensures we start up in a state of "alright, waiting to read
         // record #1 next".
-            writer_next_record: AtomicU64::new(1),
-            writer_current_data_file: AtomicU16::new(0),
-            reader_current_data_file: AtomicU16::new(0),
-            reader_last_record: AtomicU64::new(0),
+            writer_next_record: 1,
+            writer_current_data_file: 0,
+            reader_current_data_file: 0,
+            reader_last_record: 0,
         }
     }
 }
 
+// Helper functions to safely access the archived ledger fields atomically.
+//
+// SAFETY: Reinterpreting `&u64_le` as `&AtomicU64` (and `&u16_le` as `&AtomicU16`) is sound
+// because:
+// 1. The types have the same size, and the atomics are guaranteed to have the same in-memory
+//    representation as their underlying integers.
+// 2. The memory is properly aligned (guaranteed by rkyv).
+// 3. We only access it via atomic operations (no data races).
+// 4. The memory is validly initialized (guaranteed by rkyv validation).
 impl ArchivedLedgerState {
+    #[inline]
+    fn atomic_u64_from_le(value: &rkyv::rend::u64_le) -> &AtomicU64 {
+        // SAFETY: `u64_le` and `AtomicU64` have the same size, and `AtomicU64` has the same
+        // in-memory representation as the `u64` that rkyv's aligned little-endian type wraps.
+        // This is a common pattern for accessing archived atomic fields in memory-mapped
+        // rkyv data.
+        unsafe { &*(value as *const _ as *const AtomicU64) }
+    }
+
+    #[inline]
+    fn atomic_u16_from_le(value: &rkyv::rend::u16_le) -> &AtomicU16 {
+        // SAFETY: As above -- `u16_le` and `AtomicU16` have the same size and representation,
+        // since rkyv 0.8 archives integers as little-endian types with the same layout.
+        unsafe { &*(value as *const _ as *const AtomicU16) }
+    }
+
     fn get_current_writer_file_id(&self) -> u16 {
-        self.writer_current_data_file.load(Ordering::Acquire)
+        Self::atomic_u16_from_le(&self.writer_current_data_file).load(Ordering::Acquire)
     }
 
     fn get_next_writer_file_id(&self) -> u16 {
@@ -130,21 +150,22 @@ impl ArchivedLedgerState {
     }
 
     pub(super) fn increment_writer_file_id(&self) {
-        self.writer_current_data_file
+        Self::atomic_u16_from_le(&self.writer_current_data_file)
             .store(self.get_next_writer_file_id(), Ordering::Release);
     }
 
     pub(super) fn get_next_writer_record_id(&self) -> u64 {
-        self.writer_next_record.load(Ordering::Acquire)
+        Self::atomic_u64_from_le(&self.writer_next_record).load(Ordering::Acquire)
     }
 
     pub(super) fn increment_next_writer_record_id(&self, amount: u64) -> u64 {
-        let previous = self.writer_next_record.fetch_add(amount, Ordering::AcqRel);
+        let previous = Self::atomic_u64_from_le(&self.writer_next_record)
+            .fetch_add(amount, Ordering::AcqRel);
         previous.wrapping_add(amount)
     }
 
     fn get_current_reader_file_id(&self) -> u16 {
-        self.reader_current_data_file.load(Ordering::Acquire)
+        Self::atomic_u16_from_le(&self.reader_current_data_file).load(Ordering::Acquire)
     }
 
     fn get_next_reader_file_id(&self) -> u16 {
@@ -157,17 +178,16 @@ impl ArchivedLedgerState {
 
     fn increment_reader_file_id(&self) -> u16 {
         let value = self.get_next_reader_file_id();
-        self.reader_current_data_file
-            .store(value, Ordering::Release);
+        Self::atomic_u16_from_le(&self.reader_current_data_file).store(value, Ordering::Release);
         value
     }
 
     pub(super) fn get_last_reader_record_id(&self) -> u64 {
-        self.reader_last_record.load(Ordering::Acquire)
+        Self::atomic_u64_from_le(&self.reader_last_record).load(Ordering::Acquire)
     }
 
     pub(super) fn increment_last_reader_record_id(&self, amount: u64) {
-        self.reader_last_record.fetch_add(amount, Ordering::AcqRel);
+        Self::atomic_u64_from_le(&self.reader_last_record).fetch_add(amount, Ordering::AcqRel);
     }
 
     #[cfg(test)]
@@ -183,7 +203,7 @@ impl ArchivedLedgerState {
         //
         // Despite it being test-only, we're really amping up the "this is only for testing!" factor
         // by making it an actual `unsafe` function, and putting "unsafe" in the name. :)
-        self.writer_next_record.store(id, Ordering::Release);
+        Self::atomic_u64_from_le(&self.writer_next_record).store(id, Ordering::Release);
     }
 
     #[cfg(test)]
@@ -199,7 +219,7 @@ impl ArchivedLedgerState {
         //
         // Despite it being test-only, we're really amping up the "this is only for testing!" factor
        // by making it an actual `unsafe` function, and putting "unsafe" in the name. :)
-        self.reader_last_record.store(id, Ordering::Release);
+        Self::atomic_u64_from_le(&self.reader_last_record).store(id, Ordering::Release);
     }
 }
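// ---------------------------------------------------------------------------
// Sketch: compile-time backing for the layout claims in the ledger's SAFETY
// comments above. Illustrative only; the asserts make the assumptions explicit
// and fail the build on any target where they do not hold.
// ---------------------------------------------------------------------------
use std::mem::{align_of, size_of};
use std::sync::atomic::{AtomicU64, Ordering};

use rkyv::rend::u64_le;

// `AtomicU64` is documented to have the same size and bit validity as `u64`,
// and rkyv's aligned `u64_le` transparently wraps the same representation.
const _: () = assert!(size_of::<u64_le>() == size_of::<AtomicU64>());
const _: () = assert!(align_of::<u64_le>() == align_of::<AtomicU64>());

fn as_atomic(value: &u64_le) -> &AtomicU64 {
    // SAFETY: same size and alignment (asserted above), and all subsequent
    // access goes through atomic operations. Note the atomic then operates on
    // the little-endian representation, which matches the native value on
    // little-endian targets.
    unsafe { &*(value as *const u64_le as *const AtomicU64) }
}

fn increment(value: &u64_le, amount: u64) -> u64 {
    as_atomic(value)
        .fetch_add(amount, Ordering::AcqRel)
        .wrapping_add(amount)
}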
diff --git a/lib/vector-buffers/src/variants/disk_v2/reader.rs b/lib/vector-buffers/src/variants/disk_v2/reader.rs
index 8f15bfedb9fff..625b3ef32d0a2 100644
--- a/lib/vector-buffers/src/variants/disk_v2/reader.rs
+++ b/lib/vector-buffers/src/variants/disk_v2/reader.rs
@@ -8,7 +8,7 @@ use std::{
 };
 
 use crc32fast::Hasher;
-use rkyv::{AlignedVec, archived_root};
+use rkyv::{util::AlignedVec, access_unchecked};
 use snafu::{ResultExt, Snafu};
 use tokio::io::{AsyncBufReadExt, AsyncRead, BufReader};
 use vector_common::{finalization::BatchNotifier, finalizer::OrderedFinalizer};
@@ -17,7 +17,7 @@ use super::{
     Filesystem,
     common::create_crc32c_hasher,
     ledger::Ledger,
-    record::{ArchivedRecord, Record, RecordStatus, validate_record_archive},
+    record::{ArchivedRecord, RecordStatus, validate_record_archive},
 };
 use crate::{
     Bufferable,
@@ -372,7 +372,7 @@ where
         //   - `try_next_record` is the only method that can hand back a `ReadToken`
         //   - we only get a `ReadToken` if there's a valid record in `self.aligned_buf`
         //   - `try_next_record` does all the archive checks, checksum validation, etc
-        let record = unsafe { archived_root::<Record<'_>>(&self.aligned_buf) };
+        let record = unsafe { access_unchecked::<ArchivedRecord>(&self.aligned_buf) };
 
         decode_record_payload(record)
     }
@@ -1133,7 +1133,7 @@ where
 }
 
 pub(crate) fn decode_record_payload<T: Bufferable>(
-    record: &ArchivedRecord<'_>,
+    record: &ArchivedRecord,
 ) -> Result<T, ReaderError<T>> {
     // Try and convert the raw record metadata into the true metadata type used by `T`, and then
     // also verify that `T` is able to decode records with the metadata used for this record in particular.
diff --git a/lib/vector-buffers/src/variants/disk_v2/record.rs b/lib/vector-buffers/src/variants/disk_v2/record.rs
index 986beb807fafd..7404e2f404ae0 100644
--- a/lib/vector-buffers/src/variants/disk_v2/record.rs
+++ b/lib/vector-buffers/src/variants/disk_v2/record.rs
@@ -1,11 +1,8 @@
-use std::{mem, ptr::addr_of};
+use std::mem;
 
-use bytecheck::{CheckBytes, ErrorBox, StructCheckError};
 use crc32fast::Hasher;
 use rkyv::{
-    Archive, Archived, Serialize,
-    boxed::ArchivedBox,
-    with::{CopyOptimize, RefAsBox},
+    Archive, Serialize,
 };
 
 use super::{
@@ -13,7 +10,7 @@ use super::{
     ser::{DeserializeError, try_as_archive},
 };
 
-pub const RECORD_HEADER_LEN: usize = align16(mem::size_of::<ArchivedRecord<'_>>() + 8);
+pub const RECORD_HEADER_LEN: usize = align16(mem::size_of::<ArchivedRecord>() + 8);
 
 /// Result of checking if a buffer contained a valid record.
 pub enum RecordStatus {
@@ -44,11 +41,8 @@ pub enum RecordStatus {
 ///
 /// Do not do any of the listed things unless you _absolutely_ know what you're doing. :)
 #[derive(Archive, Serialize, Debug)]
-// Switch back to the derived implementation of CheckBytes once the upstream ICE issue is fixed.
-//
-// Upstream issue: https://github.com/rkyv/rkyv/issues/221
-//#[archive_attr(derive(CheckBytes))]
-pub struct Record<'a> {
+#[rkyv(attr(derive(Debug)))]
+pub struct Record {
     /// The checksum of the record.
     ///
     /// The checksum is CRC32C(BE(id) + BE(metadata) + payload), where BE(x) returns a byte slice of
@@ -68,71 +62,26 @@ pub struct Record {
     /// The record payload.
     ///
     /// This is the encoded form of the actual record itself.
-    #[with(CopyOptimize, RefAsBox)]
-    payload: &'a [u8],
+    payload: Vec<u8>,
 }
 
-// Manual implementation of CheckBytes required as the derived version currently causes an internal
-// compiler error.
-//
-// Upstream issue: https://github.com/rkyv/rkyv/issues/221
-impl<'a, C: ?Sized> CheckBytes<C> for ArchivedRecord<'a>
-where
-    rkyv::with::With<&'a [u8], RefAsBox>: Archive<Archived = ArchivedBox<[u8]>>,
-    ArchivedBox<[u8]>: CheckBytes<C>,
-{
-    type Error = StructCheckError;
-
-    unsafe fn check_bytes<'b>(
-        value: *const Self,
-        context: &mut C,
-    ) -> Result<&'b Self, Self::Error> {
-        unsafe {
-            Archived::<u32>::check_bytes(addr_of!((*value).checksum), context).map_err(|e| {
-                StructCheckError {
-                    field_name: "checksum",
-                    inner: ErrorBox::new(e),
-                }
-            })?;
-            Archived::<u64>::check_bytes(addr_of!((*value).id), context).map_err(|e| {
-                StructCheckError {
-                    field_name: "id",
-                    inner: ErrorBox::new(e),
-                }
-            })?;
-            Archived::<u32>::check_bytes(addr_of!((*value).metadata), context).map_err(|e| {
-                StructCheckError {
-                    field_name: "schema_metadata",
-                    inner: ErrorBox::new(e),
-                }
-            })?;
-            ArchivedBox::<[u8]>::check_bytes(addr_of!((*value).payload), context).map_err(|e| {
-                StructCheckError {
-                    field_name: "payload",
-                    inner: ErrorBox::new(e),
-                }
-            })?;
-            Ok(&*value)
-        }
-    }
-}
-
-impl<'a> Record<'a> {
+impl Record {
     /// Creates a [`Record`] from the ID and payload, and calculates the checksum.
-    pub fn with_checksum(id: u64, metadata: u32, payload: &'a [u8], checksummer: &Hasher) -> Self {
+    pub fn with_checksum(id: u64, metadata: u32, payload: &[u8], checksummer: &Hasher) -> Self {
         let checksum = generate_checksum(checksummer, id, metadata, payload);
         Self {
             checksum,
             id,
             metadata,
-            payload,
+            payload: payload.to_vec(),
         }
     }
 }
 
-impl ArchivedRecord<'_> {
+impl ArchivedRecord {
     /// Gets the metadata of this record.
     pub fn metadata(&self) -> u32 {
-        self.metadata
+        self.metadata.into()
     }
 
     /// Gets the payload of this record.
@@ -142,13 +91,15 @@ impl ArchivedRecord {
     /// Verifies if the stored checksum of this record matches the record itself.
     pub fn verify_checksum(&self, checksummer: &Hasher) -> RecordStatus {
-        let calculated = generate_checksum(checksummer, self.id, self.metadata, &self.payload);
-        if self.checksum == calculated {
-            RecordStatus::Valid { id: self.id }
+        let calculated = generate_checksum(checksummer, self.id.into(), self.metadata.into(), &self.payload);
+        let checksum: u32 = self.checksum.into();
+        let id: u64 = self.id.into();
+        if checksum == calculated {
+            RecordStatus::Valid { id }
         } else {
             RecordStatus::Corrupted {
                 calculated,
-                actual: self.checksum,
+                actual: checksum,
             }
         }
     }
@@ -190,6 +141,6 @@ pub fn validate_record_archive(buf: &[u8], checksummer: &Hasher) -> RecordStatus
 /// will be returned. Otherwise, the deserialization error encountered will be provided, which describes the error in a more verbose,
 /// debugging-oriented fashion.
 #[cfg_attr(test, instrument(skip_all, level = "trace"))]
-pub fn try_as_record_archive(buf: &[u8]) -> Result<&ArchivedRecord<'_>, DeserializeError> {
-    try_as_archive::<Record<'_>>(buf)
+pub fn try_as_record_archive(buf: &[u8]) -> Result<&ArchivedRecord, DeserializeError> {
+    try_as_archive::<Record>(buf)
 }
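// ---------------------------------------------------------------------------
// Sketch: why `ArchivedRecord`'s accessors now convert with `.into()`. In
// rkyv 0.8, archived integers are endian-tagged (`u32_le`, `u64_le`, ...)
// rather than bare primitives. Illustrative only -- `Header` is hypothetical.
// ---------------------------------------------------------------------------
use rkyv::{Archive, Serialize, rancor::Error};

#[derive(Archive, Serialize)]
struct Header {
    checksum: u32,
    id: u64,
}

fn read_fields(bytes: &[u8]) -> Result<(u32, u64), Error> {
    let archived = rkyv::access::<ArchivedHeader, Error>(bytes)?;

    // `Archived<u32>` is `u32_le` under the default endianness feature, so a
    // conversion to the native type is needed before comparing or returning --
    // exactly what `verify_checksum` above does before the comparison.
    let checksum: u32 = archived.checksum.into();
    let id: u64 = archived.id.to_native();
    Ok((checksum, id))
}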
diff --git a/lib/vector-buffers/src/variants/disk_v2/ser.rs b/lib/vector-buffers/src/variants/disk_v2/ser.rs
index 9f7fa08769ff9..810ef912ce518 100644
--- a/lib/vector-buffers/src/variants/disk_v2/ser.rs
+++ b/lib/vector-buffers/src/variants/disk_v2/ser.rs
@@ -1,10 +1,4 @@
-use std::fmt;
-
-use bytecheck::CheckBytes;
-use rkyv::{
-    Archive, check_archived_root,
-    validation::{CheckArchiveError, validators::DefaultValidator},
-};
+use rkyv::{Archive, rancor::Error};
 
 /// Error that occurred during serialization.
 #[derive(Debug)]
@@ -45,6 +39,7 @@ pub enum DeserializeError {
     ///
     /// The backing store that was given is returned, along with an error string that briefly
     /// describes the error in a more verbose fashion, suitable for debugging.
+    #[allow(dead_code)]
     InvalidData(String),
 }
 
@@ -58,20 +53,9 @@ impl DeserializeError {
     }
 }
 
-impl<T, C> From<CheckArchiveError<T, C>> for DeserializeError
-where
-    T: fmt::Display,
-    C: fmt::Display,
-{
-    fn from(e: CheckArchiveError<T, C>) -> Self {
-        match e {
-            CheckArchiveError::ContextError(ce) => {
-                DeserializeError::InvalidStructure(ce.to_string())
-            }
-            CheckArchiveError::CheckBytesError(cbe) => {
-                DeserializeError::InvalidData(cbe.to_string())
-            }
-        }
+impl From<Error> for DeserializeError {
+    fn from(e: Error) -> Self {
+        DeserializeError::InvalidStructure(e.to_string())
     }
 }
 
@@ -88,8 +72,11 @@ where
 pub fn try_as_archive<'a, T>(buf: &'a [u8]) -> Result<&'a T::Archived, DeserializeError>
 where
     T: Archive,
-    T::Archived: for<'b> CheckBytes<DefaultValidator<'b>>,
+    T::Archived: rkyv::Portable + for<'b> bytecheck::CheckBytes<rkyv::rancor::Strategy<rkyv::validation::Validator<rkyv::validation::archive::ArchiveValidator<'b>, rkyv::validation::shared::SharedValidator>, rkyv::rancor::Error>>,
 {
     debug_assert!(!buf.is_empty());
-    check_archived_root::<T>(buf).map_err(Into::into)
+
+    // Use `rkyv::access`, which performs `CheckBytes` validation in rkyv 0.8.
+    rkyv::access::<T::Archived, rkyv::rancor::Error>(buf)
+        .map_err(|e| DeserializeError::InvalidStructure(e.to_string()))
 }
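// ---------------------------------------------------------------------------
// Sketch: what the new `try_as_archive` bound buys the caller. With the
// "bytecheck" feature, `#[derive(Archive)]` emits a `CheckBytes` impl for the
// archived type that satisfies the validator bound, so arbitrary bytes are
// vetted before being treated as an archive. Illustrative only -- `Ping` is
// hypothetical.
// ---------------------------------------------------------------------------
use rkyv::{Archive, Serialize, rancor::Error};

#[derive(Archive, Serialize)]
struct Ping {
    seq: u64,
}

fn vet(bytes: &[u8]) -> Result<u64, Error> {
    // Equivalent to `try_as_archive::<Ping>` modulo the error mapping: a
    // too-short or malformed buffer is rejected instead of causing UB.
    let archived = rkyv::access::<ArchivedPing, Error>(bytes)?;
    Ok(archived.seq.to_native())
}

fn demo() {
    // Three bytes can never hold a valid `ArchivedPing`.
    assert!(vet(&[0xFF; 3]).is_err());
}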
diff --git a/lib/vector-buffers/src/variants/disk_v2/tests/known_errors.rs b/lib/vector-buffers/src/variants/disk_v2/tests/known_errors.rs
index 9f5ec073c00a7..bdfc020883c2d 100644
--- a/lib/vector-buffers/src/variants/disk_v2/tests/known_errors.rs
+++ b/lib/vector-buffers/src/variants/disk_v2/tests/known_errors.rs
@@ -1,5 +1,4 @@
 use bytes::{Buf, BufMut};
-use memmap2::MmapMut;
 use std::{
     io::{self, SeekFrom},
     path::PathBuf,
@@ -7,7 +6,7 @@ use std::{
 };
 use tokio::{
     fs::OpenOptions,
-    io::{AsyncSeekExt, AsyncWriteExt},
+    io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt},
     time::{Duration, timeout},
 };
 use tracing::Instrument;
@@ -23,7 +22,7 @@ use crate::{
     assert_file_exists_async, assert_reader_writer_v2_file_positions, await_timeout,
     encoding::{AsMetadata, Encodable},
     test::{SizedRecord, UndecodableRecord, acknowledge, install_tracing_helpers, with_temp_dir},
-    variants::disk_v2::{ReaderError, backed_archive::BackedArchive, record::Record},
+    variants::disk_v2::ReaderError,
 };
 
 impl AsMetadata for u32 {
@@ -488,8 +487,10 @@ async fn writer_detects_when_last_record_has_invalid_checksum() {
             // We should not have seen a call to `mark_for_skip` yet.
             assert!(!marked_for_skip.try_assert());
 
-            // Open the file, mutably deserialize the record, and flip a bit in the checksum.
-            let data_file = OpenOptions::new()
+            // Open the file and corrupt the checksum field by flipping the 15th bit.
+            // The checksum is the first field in the archived record, located at offset 8
+            // (after the 8-byte length delimiter).
+            let mut data_file = OpenOptions::new()
                 .read(true)
                 .write(true)
                 .open(&data_file_path)
@@ -503,30 +504,38 @@ async fn writer_detects_when_last_record_has_invalid_checksum() {
                 .expect("metadata should not fail");
             assert_eq!(expected_data_file_len, metadata.len());
 
-            let std_data_file = data_file.into_std().await;
-            let record_mmap =
-                unsafe { MmapMut::map_mut(&std_data_file).expect("mmap should not fail") };
-            drop(std_data_file);
-
-            let mut backed_record = BackedArchive::<_, Record>::from_backing(record_mmap)
-                .expect("archive should not fail");
-            let record = backed_record.get_archive_mut();
-
-            // Just flip the 15th bit. Should be enough. *shrug*
-            {
-                let projected_checksum =
-                    unsafe { record.map_unchecked_mut(|record| &mut record.checksum) };
-                let projected_checksum = projected_checksum.get_mut();
-                let new_checksum = *projected_checksum ^ (1 << 15);
-                *projected_checksum = new_checksum;
-            }
+            // Seek to the checksum field (8 bytes for length delimiter, then the checksum is first).
+            let checksum_offset = 8u64;
+            let pos = data_file
+                .seek(SeekFrom::Start(checksum_offset))
+                .await
+                .expect("seek should not fail");
+            assert_eq!(checksum_offset, pos);
+
+            // Read the current checksum bytes.
+            let mut checksum_bytes = [0u8; 4];
+            data_file
+                .read_exact(&mut checksum_bytes)
+                .await
+                .expect("read should not fail");
+
+            // Flip the 15th bit.
+            let current_checksum = u32::from_le_bytes(checksum_bytes);
+            let new_checksum = current_checksum ^ (1 << 15);
+            let new_checksum_bytes = new_checksum.to_le_bytes();
 
-            // Flush the memory-mapped data file to disk and we're done with our modification.
-            backed_record
-                .get_backing_ref()
-                .flush()
-                .expect("flush should not fail");
-            drop(backed_record);
+            // Write back the corrupted checksum.
+            data_file
+                .seek(SeekFrom::Start(checksum_offset))
+                .await
+                .expect("seek should not fail");
+            data_file
+                .write_all(&new_checksum_bytes)
+                .await
+                .expect("write should not fail");
+            data_file.flush().await.expect("flush should not fail");
+            data_file.sync_all().await.expect("sync should not fail");
+            drop(data_file);
 
             // Now reopen the buffer, which should trigger a `Writer::mark_for_skip` call which
             // instructs the writer to skip to the next data file, although this doesn't happen
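// ---------------------------------------------------------------------------
// Sketch: the bit flip the rewritten test performs, expressed on a plain byte
// buffer. Illustrative only; it encodes the test's stated assumption that a
// serialized record is framed as an 8-byte length delimiter followed by the
// archived record, whose first field is the u32 checksum.
// ---------------------------------------------------------------------------
fn flip_checksum_bit(frame: &mut [u8]) {
    const DELIMITER_LEN: usize = 8;

    // [ 8-byte length delimiter ][ archived Record: checksum, id, ... ]
    let field = &mut frame[DELIMITER_LEN..DELIMITER_LEN + 4];
    let mut checksum = u32::from_le_bytes((&*field).try_into().unwrap());
    checksum ^= 1 << 15; // the same bit the test flips
    field.copy_from_slice(&checksum.to_le_bytes());
}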
diff --git a/lib/vector-buffers/src/variants/disk_v2/writer.rs b/lib/vector-buffers/src/variants/disk_v2/writer.rs
index f12ce6ca94dae..ee513d7a2e350 100644
--- a/lib/vector-buffers/src/variants/disk_v2/writer.rs
+++ b/lib/vector-buffers/src/variants/disk_v2/writer.rs
@@ -1,6 +1,5 @@
 use std::{
     cmp::Ordering,
-    convert::Infallible as StdInfallible,
     fmt,
     io::{self, ErrorKind},
     marker::PhantomData,
@@ -10,16 +9,7 @@ use std::{
 
 use bytes::BufMut;
 use crc32fast::Hasher;
-use rkyv::{
-    AlignedVec, Infallible,
-    ser::{
-        Serializer,
-        serializers::{
-            AlignedSerializer, AllocScratch, AllocScratchError, BufferScratch, CompositeSerializer,
-            CompositeSerializerError, FallbackScratch,
-        },
-    },
-};
+use rkyv::util::AlignedVec;
 use snafu::{ResultExt, Snafu};
 use tokio::io::{AsyncWrite, AsyncWriteExt};
 
@@ -179,18 +169,13 @@ impl<T> PartialEq for WriterError<T> {
     }
 }
 
-impl<T> From<CompositeSerializerError<StdInfallible, AllocScratchError, StdInfallible>>
-    for WriterError<T>
+impl<T> From<rkyv::rancor::Error> for WriterError<T>
 where
     T: Bufferable,
 {
-    fn from(e: CompositeSerializerError<StdInfallible, AllocScratchError, StdInfallible>) -> Self {
-        match e {
-            CompositeSerializerError::ScratchSpaceError(sse) => WriterError::FailedToSerialize {
-                reason: format!("insufficient space to serialize encoded record: {sse}"),
-            },
-            // Only our scratch space strategy is fallible, so we should never get here.
-            _ => unreachable!(),
+    fn from(e: rkyv::rancor::Error) -> Self {
+        WriterError::FailedToSerialize {
+            reason: format!("failed to serialize encoded record: {e}"),
         }
     }
 }
 
@@ -485,9 +470,7 @@ where
             Record::with_checksum(id, metadata, &self.encode_buf, &self.checksummer);
 
         // Push 8 dummy bytes where our length delimiter will sit. We'll fix this up after
-        // serialization. Notably, `AlignedSerializer` will report the serializer position as
-        // the length of its backing store, which now includes our 8 bytes, so we _subtract_
-        // those from the position when figuring out the actual value to write back after.
+        // serialization.
        //
        // We write it this way -- in the serializer buffer, and not as a separate write -- so that
        // we can do a single write but also so that we always have an aligned buffer.
@@ -496,18 +479,10 @@ where
 
        // Now serialize the record, which puts it into its archived form. This is what powers our
        // ability to do zero-copy deserialization from disk.
-        let mut serializer = CompositeSerializer::new(
-            AlignedSerializer::new(&mut self.ser_buf),
-            FallbackScratch::new(
-                BufferScratch::new(&mut self.ser_scratch),
-                AllocScratch::new(),
-            ),
-            Infallible,
-        );
+        let serialized_record = rkyv::to_bytes::<rkyv::rancor::Error>(&wrapped_record)?;
+        self.ser_buf.extend_from_slice(&serialized_record);
 
-        let serialized_len = serializer
-            .serialize_value(&wrapped_record)
-            .map(|_| serializer.pos())?;
+        let serialized_len = self.ser_buf.len();
 
        // Sanity check before we do our length math.
        if serialized_len <= 8 || self.ser_buf.len() != serialized_len {