diff --git a/Cargo.lock b/Cargo.lock index a2488d4..6fe32c6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -41,6 +41,12 @@ dependencies = [ "libc", ] +[[package]] +name = "anstyle" +version = "1.0.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "55cc3b69f167a1ef2e161439aa98aed94e6028e5f9a59be9a6ffb47aef1651f9" + [[package]] name = "async-trait" version = "0.1.88" @@ -155,6 +161,12 @@ dependencies = [ "syn", ] +[[package]] +name = "downcast" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1435fa1053d8b2fbbe9be7e97eca7f33d37b28409959813daefc1446a14247f1" + [[package]] name = "either" version = "1.15.0" @@ -176,6 +188,12 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "fragile" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "28dd6caf6059519a65843af8fe2a3ae298b14b80179855aeb4adc2c1934ee619" + [[package]] name = "futures" version = "0.3.31" @@ -563,6 +581,32 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "mockall" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "39a6bfcc6c8c7eed5ee98b9c3e33adc726054389233e201c95dab2d41a3839d2" +dependencies = [ + "cfg-if", + "downcast", + "fragile", + "mockall_derive", + "predicates", + "predicates-tree", +] + +[[package]] +name = "mockall_derive" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "25ca3004c2efe9011bd4e461bd8256445052b9615405b4f7ea43fc8ca5c20898" +dependencies = [ + "cfg-if", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "nu-ansi-term" version = "0.46.0" @@ -666,6 +710,32 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "predicates" +version = "3.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a5d19ee57562043d37e82899fade9a22ebab7be9cef5026b07fda9cdd4293573" +dependencies = [ + "anstyle", + "predicates-core", +] + +[[package]] +name = "predicates-core" +version = "1.0.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "727e462b119fe9c93fd0eb1429a5f7647394014cf3c04ab2c0350eeb09095ffa" + +[[package]] +name = "predicates-tree" +version = "1.0.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72dd2d6d381dfb73a193c7fca536518d7caee39fc8503f74e7dc0be0531b425c" +dependencies = [ + "predicates-core", + "termtree", +] + [[package]] name = "proc-macro2" version = "1.0.95" @@ -745,12 +815,13 @@ checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c" [[package]] name = "riskless" -version = "0.2.1" +version = "0.3.0" dependencies = [ "async-trait", "bytes", "dashmap", "futures", + "mockall", "object_store", "thiserror", "tokio", @@ -884,6 +955,12 @@ dependencies = [ "syn", ] +[[package]] +name = "termtree" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f50febec83f5ee1df3015341d8bd429f2d1cc62bcba7ea2076759d315084683" + [[package]] name = "thiserror" version = "2.0.12" diff --git a/Cargo.toml b/Cargo.toml index 1e51257..18c89f1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "riskless" -version = "0.2.1" +version = "0.3.0" edition = "2024" description = "A pure Rust implementation of Diskless Topics" license = "MIT / Apache-2.0" @@ -18,3 +18,7 @@ async-trait = "0.1.88" tracing-subscriber = { version = "0.3"} tracing-test = {version = "0.2.5" , features = ["no-env-filter"]} dashmap = "6.1.0" + + +[dev-dependencies] +mockall = "0.13.1" diff --git a/src/batch_coordinator.rs b/src/batch_coordinator.rs index 72ad8af..cac207b 100644 --- a/src/batch_coordinator.rs +++ b/src/batch_coordinator.rs @@ -120,33 +120,33 @@ pub struct ListOffsetsRequest { #[derive(Debug)] pub struct ListOffsetsResponse { - errors: Vec, // TODO: fix this. This needs to be an Errors object. - topic_id_partition: TopicIdPartition, - timestamp: u64, - offset: u64, + pub errors: Vec, // TODO: fix this. This needs to be an Errors object. + pub topic_id_partition: TopicIdPartition, + pub timestamp: u64, + pub offset: u64, } #[derive(Debug)] pub struct DeleteRecordsRequest { - topic_id_partition: TopicIdPartition, - offset: u64, + pub topic_id_partition: TopicIdPartition, + pub offset: u64, } #[derive(Debug)] pub struct DeleteRecordsResponse { - errors: Vec, // TODO: fix this. This needs to be an Errors object. - low_watermark: u64, + pub errors: Vec, // TODO: fix this. This needs to be an Errors object. + pub low_watermark: u64, } #[derive(Debug)] pub struct FileToDelete { - object_key: String, - marked_for_deletion_at: SystemTime, + pub object_key: String, + pub marked_for_deletion_at: SystemTime, } #[derive(Debug)] pub struct DeleteFilesRequest { - object_key_paths: HashSet, + pub object_key_paths: HashSet, } #[async_trait::async_trait] @@ -221,7 +221,7 @@ where /// /// # Errors /// Returns an error if an unexpected error occurs. - async fn delete_topics(&self, topic_ids: HashSet); + async fn delete_topics(&self, topic_ids: HashSet); /// This operation allows a broker to get a list of soft deleted objects /// for asynchronous physical deletion from the object storage. diff --git a/src/broker.rs b/src/broker.rs index e8f5d09..058d8ed 100644 --- a/src/broker.rs +++ b/src/broker.rs @@ -5,8 +5,10 @@ use object_store::{ObjectStore, PutPayload, path::Path}; use tokio::sync::RwLock; use crate::{ - batch_coordinator::{BatchCoordinator, FindBatchRequest, TopicIdPartition}, - error::RisklessResult, + batch_coordinator::{ + BatchCoordinator, FindBatchRequest, TopicIdPartition, + }, + error::{RisklessError, RisklessResult}, messages::{ commit_batch_request::CommitBatchRequest, consume_request::ConsumeRequest, @@ -119,6 +121,33 @@ impl Broker { Ok(message) } + /// Delete a specific record. + /// + /// Important to note that this undertakes a "soft" delete, which means that + /// the record still persists in object storage, but does not persist in the BatchCoordinator. + /// + /// The process to permanently delete files and records from the underlying object storage is done by a separate function. + #[tracing::instrument(skip_all, name = "delete_records")] + pub async fn delete_record( + &mut self, + request: crate::messages::delete_record_request::DeleteRecordsRequest, + ) -> RisklessResult { + let result = self + .config + .batch_coordinator + .delete_records(vec![request.try_into().map_err(|e| { + RisklessError::Generic(format!( + "Failed to convert request into DeleteRecordsRequest with error {:#?}", + e + )) + })?]) + .await + .pop() + .ok_or(RisklessError::Unknown)?; + + result.try_into() + } + /// Handles a consume request by retrieving messages from object storage. #[tracing::instrument(skip_all, name = "consume")] pub async fn consume( diff --git a/src/error.rs b/src/error.rs index 46e33bb..0c371ce 100644 --- a/src/error.rs +++ b/src/error.rs @@ -6,6 +6,9 @@ pub type RisklessResult = Result; #[derive(Error, Debug)] pub enum RisklessError { + /// Generic Error for arbitrary errors that are generally not classified but should still convey information. + #[error("{0}")] + Generic(String), #[error("unknown data store error")] Unknown, diff --git a/src/messages/delete_record_request.rs b/src/messages/delete_record_request.rs new file mode 100644 index 0000000..50548e5 --- /dev/null +++ b/src/messages/delete_record_request.rs @@ -0,0 +1,19 @@ +use crate::{batch_coordinator::TopicIdPartition, error::RisklessError}; + +#[derive(Debug)] +pub struct DeleteRecordsRequest { + pub topic: String, + pub partition: u64, + pub offset: u64, +} + +impl TryInto for DeleteRecordsRequest { + type Error = RisklessError; + + fn try_into(self) -> Result { + Ok(crate::batch_coordinator::DeleteRecordsRequest { + topic_id_partition: TopicIdPartition(self.topic, self.partition), + offset: self.offset, + }) + } +} diff --git a/src/messages/delete_record_response.rs b/src/messages/delete_record_response.rs new file mode 100644 index 0000000..949075e --- /dev/null +++ b/src/messages/delete_record_response.rs @@ -0,0 +1,19 @@ + +use crate::error::RisklessError; + +#[derive(Debug)] +pub struct DeleteRecordsResponse { + pub errors: Vec, +} + +impl TryFrom for DeleteRecordsResponse { + type Error = RisklessError; + + fn try_from( + value: crate::batch_coordinator::DeleteRecordsResponse, + ) -> Result { + Ok(Self { + errors: value.errors, + }) + } +} diff --git a/src/messages/mod.rs b/src/messages/mod.rs index 14dd214..00145cd 100644 --- a/src/messages/mod.rs +++ b/src/messages/mod.rs @@ -1,6 +1,8 @@ +pub mod batch_coordinate; +pub mod commit_batch_request; pub mod consume_request; pub mod consume_response; +pub mod delete_record_request; +pub mod delete_record_response; pub mod produce_request; pub mod produce_response; -pub mod batch_coordinate; -pub mod commit_batch_request; \ No newline at end of file diff --git a/src/simple_batch_coordinator/simple.rs b/src/simple_batch_coordinator/simple.rs index 1b9d5c1..c675474 100644 --- a/src/simple_batch_coordinator/simple.rs +++ b/src/simple_batch_coordinator/simple.rs @@ -241,27 +241,32 @@ impl BatchCoordinator for SimpleBatchCoordinator { todo!() } + /// Simply returns errors as this implementation does not support this operation. async fn delete_records( &self, - _requests: Vec, + requests: Vec, ) -> Vec { - todo!() - } - - async fn delete_topics(&self, _topic_ids: HashSet) { - todo!() + requests + .iter() + .map(|_req| DeleteRecordsResponse { + errors: vec!["Coordinator does not support deleting records.".to_string()], + low_watermark: 1, + }) + .collect::>() } + /// No-op as this operation is not supported in the SimpleBatchCoordinator. + async fn delete_topics(&self, _topic_ids: HashSet) {} + /// Returns an empty vec as this operation is not supported in SimpleBatchCoordinator. async fn get_files_to_delete(&self) -> Vec { - todo!() - } - - async fn delete_files(&self, _request: DeleteFilesRequest) { - todo!() + vec![] } + /// No-op as this operation is not supported in the SimpleBatchCoordinator. + async fn delete_files(&self, _request: DeleteFilesRequest) {} + /// Always returns false. async fn is_safe_to_delete_file(&self, _object_key: String) -> bool { - todo!() + false } } @@ -450,9 +455,7 @@ mod tests { tracing::info!( "{:#?}", - std::fs::read_dir(&whole_dir) - .unwrap() - .collect::>() + std::fs::read_dir(&whole_dir).unwrap().collect::>() ); // File should now exist @@ -615,11 +618,7 @@ mod tests { index_path.push(&topic); - tracing::info!( - "{:#?}", - std::fs::read_dir(&index_path)? - .collect::>() - ); + tracing::info!("{:#?}", std::fs::read_dir(&index_path)?.collect::>()); index_path.push(format!("{:0>20}.index", partition.to_string())); diff --git a/tests/broker.rs b/tests/broker.rs index b5bd3a1..caee811 100644 --- a/tests/broker.rs +++ b/tests/broker.rs @@ -1,5 +1,3 @@ - - /// Awaits all the data that a receiver sends and /// returns a vec with the buffered data. async fn await_all_receiver(mut recv: tokio::sync::mpsc::Receiver) -> Vec { @@ -10,8 +8,10 @@ async fn await_all_receiver(mut recv: tokio::sync::mpsc::Receiver) -> Vec< } result -}#[cfg(test)] +} +#[cfg(test)] mod tests { + use riskless::messages::consume_request::ConsumeRequest; use riskless::messages::produce_request::ProduceRequest; use riskless::simple_batch_coordinator::SimpleBatchCoordinator; @@ -592,4 +592,41 @@ mod tests { tear_down_dirs(batch_coord_path, object_store_path); } + + #[tokio::test] + #[traced_test] + async fn can_request_deletion_of_records() { + let (batch_coord_path, object_store_path) = set_up_dirs(); + + let config = BrokerConfiguration { + object_store: Arc::new( + object_store::local::LocalFileSystem::new_with_prefix(&object_store_path).unwrap(), + ), + batch_coordinator: Arc::new(SimpleBatchCoordinator::new( + batch_coord_path.to_string_lossy().to_string(), + )), + segment_size_in_bytes: 50_000, + flush_interval_in_ms: 500, + }; + + let mut broker = Broker::new(config); + + let result = broker + .delete_record( + riskless::messages::delete_record_request::DeleteRecordsRequest { + topic: "".to_string(), + partition: 1, + offset: 0, + }, + ) + .await; + + assert!(result.is_ok()); + + let result = result.unwrap(); + + // Not really ideal test scenarios. + assert_eq!(result.errors.len(), 1); + tear_down_dirs(batch_coord_path, object_store_path); + } }