Skip to content

Commit 6e91c5c

Browse files
authored
Merge pull request #37 from ilikepi63/feat/decompose-batch-coordinator-trait
feat: Decompose BatchCoordinator Trait
2 parents 95c6cb3 + bfd02c8 commit 6e91c5c

File tree

5 files changed

+47
-53
lines changed

5 files changed

+47
-53
lines changed

Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "riskless"
3-
version = "0.5.2"
3+
version = "0.6.1"
44
edition = "2024"
55
description = "A pure Rust implementation of Diskless Topics"
66
license = "MIT / Apache-2.0"

src/batch_coordinator/mod.rs

Lines changed: 27 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -213,62 +213,59 @@ pub struct DeleteFilesRequest {
213213
pub object_key_paths: HashSet<String>,
214214
}
215215

216-
/// The BatchCoordinator trait.
217-
///
218-
/// This structure is responsible for handling the indexing of offsets within a topic's partitions.
219-
/// It is designed to be reimplementable for custom use cases, depending on what the specific need is.
216+
/// Trait for enabling the creation of topics and partitions.
220217
#[async_trait::async_trait]
221-
pub trait BatchCoordinator
222-
where
223-
Self: Send + Sync + std::fmt::Debug,
224-
{
218+
pub trait CreateTopicAndPartitions {
225219
/// This operation is called when a Diskless partition
226220
/// (or a topic with one or more partitions) is created in the cluster.
227221
/// The Batch Coordinator initializes the corresponding logs.
228222
///
229223
/// # Errors
230224
/// Returns an error if an unexpected error occurs.
231225
async fn create_topic_and_partitions(&self, requests: HashSet<CreateTopicAndPartitionsRequest>);
226+
}
232227

228+
/// This operation is called by a broker after uploading the
229+
/// shared log segment object to the object storage.
230+
#[async_trait::async_trait]
231+
pub trait CommitFile
232+
where
233+
Self: Send + Sync + std::fmt::Debug,
234+
{
233235
/// This operation is called by a broker after uploading the
234236
/// shared log segment object to the object storage.
235-
///
236-
/// The Batch Coordinator:
237-
/// 1. Performs the necessary checks for idempotent produce.
238-
/// 2. Accordingly increases the high watermark of the affected logs.
239-
/// 3. Assigns offsets to the batches.
240-
/// 4. Saves the batch and object metadata.
241-
/// 5. Returns the result to the broker.
242-
///
243-
/// # Errors
244-
/// Returns an error if an unexpected error occurs.
245237
async fn commit_file(
246238
&self,
247239
object_key: [u8; 16],
248240
uploader_broker_id: u32,
249241
file_size: u64,
250242
batches: Vec<CommitBatchRequest>,
251243
) -> Vec<CommitBatchResponse>;
244+
}
252245

246+
/// Trait for implementing `find_batches`.
247+
#[async_trait::async_trait]
248+
pub trait FindBatches
249+
where
250+
Self: Send + Sync + std::fmt::Debug,
251+
{
253252
/// This operation is called by a broker when it needs to serve a Fetch request.
254-
/// The Batch Coordinator collects the batch coordinates to satisfy
255-
/// this request and sends the response back to the broker.
256-
///
257-
/// # Errors
258-
/// Returns an error if an unexpected error occurs.
259253
async fn find_batches(
260254
&self,
261255
find_batch_requests: Vec<FindBatchRequest>,
262256
fetch_max_bytes: u32,
263257
) -> Vec<FindBatchResponse>;
258+
}
264259

265-
/// This operation allows the broker to get the information about log offsets:
266-
/// earliest, latest, etc. The operation is a read-only operation.
267-
///
268-
/// # Errors
269-
/// Returns an error if an unexpected error occurs.
270-
async fn list_offsets(&self, requests: Vec<ListOffsetsRequest>) -> Vec<ListOffsetsResponse>;
271-
260+
/// The DeleteFiles trait.
261+
///
262+
/// This structure is responsible for handling the deletion of files and records within a topic's partitions.
263+
/// It is designed to be reimplementable for custom use cases, depending on what the specific need is.
264+
#[async_trait::async_trait]
265+
pub trait DeleteFiles
266+
where
267+
Self: Send + Sync + std::fmt::Debug,
268+
{
272269
/// This operation is called when a partition needs to be truncated by the user.
273270
/// The Batch Coordinator:
274271
/// 1. Modifies the log start offset for the affected partitions (logs).

src/batch_coordinator/simple/mod.rs

Lines changed: 11 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,12 @@ use uuid::Uuid;
1515
use crate::{batch_coordinator::BatchInfo, error::RisklessResult, messages::CommitBatchRequest};
1616

1717
use crate::batch_coordinator::{
18-
BatchCoordinator, BatchMetadata, CommitBatchResponse, CreateTopicAndPartitionsRequest,
19-
DeleteFilesRequest, DeleteRecordsRequest, DeleteRecordsResponse, FileToDelete,
20-
FindBatchRequest, FindBatchResponse, ListOffsetsRequest, ListOffsetsResponse,
18+
BatchMetadata, CommitBatchResponse, DeleteFilesRequest, DeleteRecordsRequest,
19+
DeleteRecordsResponse, FileToDelete, FindBatchRequest, FindBatchResponse,
2120
};
2221

22+
use super::{CommitFile, DeleteFiles, FindBatches};
23+
2324
/// The SimpleBatchCoordinator is a default implementation that is
2425
///
2526
/// - Single Node
@@ -95,15 +96,7 @@ impl SimpleBatchCoordinator {
9596
}
9697

9798
#[async_trait::async_trait]
98-
impl BatchCoordinator for SimpleBatchCoordinator {
99-
async fn create_topic_and_partitions(
100-
&self,
101-
_requests: HashSet<CreateTopicAndPartitionsRequest>,
102-
) {
103-
// This is not implemented for SimpleBatchCoordinator as the topics + partitions get
104-
// created as they have data produced to them.
105-
}
106-
99+
impl CommitFile for SimpleBatchCoordinator {
107100
async fn commit_file(
108101
&self,
109102
object_key: [u8; 16],
@@ -163,7 +156,10 @@ impl BatchCoordinator for SimpleBatchCoordinator {
163156

164157
commit_batch_responses
165158
}
159+
}
166160

161+
#[async_trait::async_trait]
162+
impl FindBatches for SimpleBatchCoordinator {
167163
async fn find_batches(
168164
&self,
169165
find_batch_requests: Vec<FindBatchRequest>,
@@ -295,11 +291,10 @@ impl BatchCoordinator for SimpleBatchCoordinator {
295291

296292
results
297293
}
294+
}
298295

299-
async fn list_offsets(&self, _requests: Vec<ListOffsetsRequest>) -> Vec<ListOffsetsResponse> {
300-
todo!()
301-
}
302-
296+
#[async_trait::async_trait]
297+
impl DeleteFiles for SimpleBatchCoordinator {
303298
/// Simply returns errors as this implementation does not support this operation.
304299
async fn delete_records(
305300
&self,

src/lib.rs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,9 @@ mod shared_log_segment;
5555

5656
use std::{collections::HashSet, sync::Arc};
5757

58-
use batch_coordinator::{BatchCoordinator, DeleteFilesRequest, FindBatchRequest, TopicIdPartition};
58+
use batch_coordinator::{
59+
CommitFile, DeleteFiles, DeleteFilesRequest, FindBatchRequest, FindBatches, TopicIdPartition,
60+
};
5961
use bytes::Bytes;
6062
use messages::{
6163
CommitBatchRequest, ConsumeBatch, ConsumeRequest, ConsumeResponse, ProduceRequestCollection,
@@ -72,7 +74,7 @@ use shared_log_segment::SharedLogSegment;
7274
pub async fn flush(
7375
reqs: ProduceRequestCollection,
7476
object_storage: Arc<dyn ObjectStore>,
75-
batch_coordinator: Arc<dyn BatchCoordinator>,
77+
batch_coordinator: Arc<dyn CommitFile>,
7678
) -> RisklessResult<Vec<ProduceResponse>> {
7779
tracing::info!("Produce Requests: {:#?}", reqs);
7880

@@ -120,7 +122,7 @@ pub async fn flush(
120122
pub async fn consume(
121123
request: ConsumeRequest,
122124
object_storage: Arc<dyn ObjectStore>,
123-
batch_coordinator: Arc<dyn BatchCoordinator>,
125+
batch_coordinator: Arc<dyn FindBatches>,
124126
) -> RisklessResult<tokio::sync::mpsc::Receiver<ConsumeResponse>> {
125127
let batch_responses = batch_coordinator
126128
.find_batches(
@@ -207,7 +209,7 @@ pub async fn consume(
207209
#[tracing::instrument(skip_all, name = "delete_records")]
208210
pub async fn delete_record(
209211
request: crate::messages::DeleteRecordsRequest,
210-
batch_coordinator: Arc<dyn BatchCoordinator>,
212+
batch_coordinator: Arc<dyn DeleteFiles>,
211213
) -> RisklessResult<crate::messages::DeleteRecordsResponse> {
212214
let result = batch_coordinator
213215
.delete_records(vec![request.try_into().map_err(|e| {
@@ -232,7 +234,7 @@ pub async fn delete_record(
232234
/// The interval at which this happens is delegated to the implementor.
233235
#[tracing::instrument(skip_all, name = "heartbeat_permanent_delete")]
234236
pub async fn scan_and_permanently_delete_records(
235-
batch_coordinator: Arc<dyn BatchCoordinator>,
237+
batch_coordinator: Arc<dyn DeleteFiles>,
236238
object_store: Arc<dyn ObjectStore>,
237239
) -> RisklessResult<()> {
238240
let files_to_delete = batch_coordinator.get_files_to_delete().await;

0 commit comments

Comments
 (0)