Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 78 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "riskless"
version = "0.2.1"
version = "0.3.0"
edition = "2024"
description = "A pure Rust implementation of Diskless Topics"
license = "MIT / Apache-2.0"
Expand All @@ -18,3 +18,7 @@ async-trait = "0.1.88"
tracing-subscriber = { version = "0.3"}
tracing-test = {version = "0.2.5" , features = ["no-env-filter"]}
dashmap = "6.1.0"


[dev-dependencies]
mockall = "0.13.1"
24 changes: 12 additions & 12 deletions src/batch_coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,33 +120,33 @@ pub struct ListOffsetsRequest {

#[derive(Debug)]
pub struct ListOffsetsResponse {
errors: Vec<String>, // TODO: fix this. This needs to be an Errors object.
topic_id_partition: TopicIdPartition,
timestamp: u64,
offset: u64,
pub errors: Vec<String>, // TODO: fix this. This needs to be an Errors object.
pub topic_id_partition: TopicIdPartition,
pub timestamp: u64,
pub offset: u64,
}

#[derive(Debug)]
pub struct DeleteRecordsRequest {
topic_id_partition: TopicIdPartition,
offset: u64,
pub topic_id_partition: TopicIdPartition,
pub offset: u64,
}

#[derive(Debug)]
pub struct DeleteRecordsResponse {
errors: Vec<String>, // TODO: fix this. This needs to be an Errors object.
low_watermark: u64,
pub errors: Vec<String>, // TODO: fix this. This needs to be an Errors object.
pub low_watermark: u64,
}

#[derive(Debug)]
pub struct FileToDelete {
object_key: String,
marked_for_deletion_at: SystemTime,
pub object_key: String,
pub marked_for_deletion_at: SystemTime,
}

#[derive(Debug)]
pub struct DeleteFilesRequest {
object_key_paths: HashSet<String>,
pub object_key_paths: HashSet<String>,
}

#[async_trait::async_trait]
Expand Down Expand Up @@ -221,7 +221,7 @@ where
///
/// # Errors
/// Returns an error if an unexpected error occurs.
async fn delete_topics(&self, topic_ids: HashSet<uuid::Uuid>);
async fn delete_topics(&self, topic_ids: HashSet<String>);

/// This operation allows a broker to get a list of soft deleted objects
/// for asynchronous physical deletion from the object storage.
Expand Down
33 changes: 31 additions & 2 deletions src/broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ use object_store::{ObjectStore, PutPayload, path::Path};
use tokio::sync::RwLock;

use crate::{
batch_coordinator::{BatchCoordinator, FindBatchRequest, TopicIdPartition},
error::RisklessResult,
batch_coordinator::{
BatchCoordinator, FindBatchRequest, TopicIdPartition,
},
error::{RisklessError, RisklessResult},
messages::{
commit_batch_request::CommitBatchRequest,
consume_request::ConsumeRequest,
Expand Down Expand Up @@ -119,6 +121,33 @@ impl Broker {
Ok(message)
}

/// Delete a specific record.
///
/// Important to note that this undertakes a "soft" delete, which means that
/// the record still persists in object storage, but does not persist in the BatchCoordinator.
///
/// The process to permanently delete files and records from the underlying object storage is done by a separate function.
#[tracing::instrument(skip_all, name = "delete_records")]
pub async fn delete_record(
&mut self,
request: crate::messages::delete_record_request::DeleteRecordsRequest,
) -> RisklessResult<crate::messages::delete_record_response::DeleteRecordsResponse> {
let result = self
.config
.batch_coordinator
.delete_records(vec![request.try_into().map_err(|e| {
RisklessError::Generic(format!(
"Failed to convert request into DeleteRecordsRequest with error {:#?}",
e
))
})?])
.await
.pop()
.ok_or(RisklessError::Unknown)?;

result.try_into()
}

/// Handles a consume request by retrieving messages from object storage.
#[tracing::instrument(skip_all, name = "consume")]
pub async fn consume(
Expand Down
3 changes: 3 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ pub type RisklessResult<T> = Result<T, RisklessError>;

#[derive(Error, Debug)]
pub enum RisklessError {
/// Generic Error for arbitrary errors that are generally not classified but should still convey information.
#[error("{0}")]
Generic(String),
#[error("unknown data store error")]
Unknown,

Expand Down
19 changes: 19 additions & 0 deletions src/messages/delete_record_request.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
use crate::{batch_coordinator::TopicIdPartition, error::RisklessError};

#[derive(Debug)]
pub struct DeleteRecordsRequest {
pub topic: String,
pub partition: u64,
pub offset: u64,
}

impl TryInto<crate::batch_coordinator::DeleteRecordsRequest> for DeleteRecordsRequest {
type Error = RisklessError;

fn try_into(self) -> Result<crate::batch_coordinator::DeleteRecordsRequest, Self::Error> {
Ok(crate::batch_coordinator::DeleteRecordsRequest {
topic_id_partition: TopicIdPartition(self.topic, self.partition),
offset: self.offset,
})
}
}
19 changes: 19 additions & 0 deletions src/messages/delete_record_response.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@

use crate::error::RisklessError;

#[derive(Debug)]
pub struct DeleteRecordsResponse {
pub errors: Vec<String>,
}

impl TryFrom<crate::batch_coordinator::DeleteRecordsResponse> for DeleteRecordsResponse {
type Error = RisklessError;

fn try_from(
value: crate::batch_coordinator::DeleteRecordsResponse,
) -> Result<Self, Self::Error> {
Ok(Self {
errors: value.errors,
})
}
}
6 changes: 4 additions & 2 deletions src/messages/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
pub mod batch_coordinate;
pub mod commit_batch_request;
pub mod consume_request;
pub mod consume_response;
pub mod delete_record_request;
pub mod delete_record_response;
pub mod produce_request;
pub mod produce_response;
pub mod batch_coordinate;
pub mod commit_batch_request;
39 changes: 19 additions & 20 deletions src/simple_batch_coordinator/simple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,27 +241,32 @@ impl BatchCoordinator for SimpleBatchCoordinator {
todo!()
}

/// Simply returns errors as this implementation does not support this operation.
async fn delete_records(
&self,
_requests: Vec<DeleteRecordsRequest>,
requests: Vec<DeleteRecordsRequest>,
) -> Vec<DeleteRecordsResponse> {
todo!()
}

async fn delete_topics(&self, _topic_ids: HashSet<uuid::Uuid>) {
todo!()
requests
.iter()
.map(|_req| DeleteRecordsResponse {
errors: vec!["Coordinator does not support deleting records.".to_string()],
low_watermark: 1,
})
.collect::<Vec<_>>()
}
/// No-op as this operation is not supported in the SimpleBatchCoordinator.
async fn delete_topics(&self, _topic_ids: HashSet<String>) {}

/// Returns an empty vec as this operation is not supported in SimpleBatchCoordinator.
async fn get_files_to_delete(&self) -> Vec<FileToDelete> {
todo!()
}

async fn delete_files(&self, _request: DeleteFilesRequest) {
todo!()
vec![]
}

/// No-op as this operation is not supported in the SimpleBatchCoordinator.
async fn delete_files(&self, _request: DeleteFilesRequest) {}
/// Always returns false.
async fn is_safe_to_delete_file(&self, _object_key: String) -> bool {
todo!()
false
}
}

Expand Down Expand Up @@ -450,9 +455,7 @@ mod tests {

tracing::info!(
"{:#?}",
std::fs::read_dir(&whole_dir)
.unwrap()
.collect::<Vec<_>>()
std::fs::read_dir(&whole_dir).unwrap().collect::<Vec<_>>()
);

// File should now exist
Expand Down Expand Up @@ -615,11 +618,7 @@ mod tests {

index_path.push(&topic);

tracing::info!(
"{:#?}",
std::fs::read_dir(&index_path)?
.collect::<Vec<_>>()
);
tracing::info!("{:#?}", std::fs::read_dir(&index_path)?.collect::<Vec<_>>());

index_path.push(format!("{:0>20}.index", partition.to_string()));

Expand Down
Loading