-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathbatch_coordinator.rs
More file actions
242 lines (214 loc) · 7.34 KB
/
batch_coordinator.rs
File metadata and controls
242 lines (214 loc) · 7.34 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
//! This entire interface is generated directly from the underlying KIP-1164 interface found here:
//! <https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=350783984#KIP1164:TopicBasedBatchCoordinator-BatchCoordinatorpluggableinterface>
#![allow(dead_code)]
use std::collections::HashSet;
use std::vec::Vec;
use std::time::SystemTime;
use crate::messages::commit_batch_request::CommitBatchRequest;
/// Identifies a single partition of a topic.
///
/// Tuple fields: `.0` is the topic identifier and `.1` is the partition
/// index.
/// NOTE(review): the topic id is a `String` here while
/// `CreateTopicAndPartitionsRequest` uses `uuid::Uuid` — confirm which
/// representation is intended and unify.
#[derive(Debug, Hash, PartialEq, Eq, Clone, Default)]
pub struct TopicIdPartition(pub String, pub u64);
/// Kind of timestamp carried by a batch (e.g. create time vs. log-append
/// time in Kafka).
///
/// Currently only a placeholder variant exists; the real variants are
/// still TODO.
#[derive(Debug, Default, Clone)]
pub enum TimestampType {
    /// Placeholder until the real timestamp kinds are modelled.
    #[default]
    Dummy,
}
/// Request to initialize the logs for a newly created Diskless topic
/// (all of its partitions at once).
///
/// Derives `Hash`/`PartialEq`/`Eq` because requests are passed to
/// [`BatchCoordinator::create_topic_and_partitions`] inside a `HashSet`,
/// which callers cannot populate without those impls (`uuid::Uuid`
/// implements all three).
#[derive(Debug, Hash, PartialEq, Eq, Clone)]
pub struct CreateTopicAndPartitionsRequest {
    /// Unique id of the topic being created.
    pub topic_id: uuid::Uuid,
    /// Human-readable topic name.
    pub topic_name: String,
    /// Number of partitions to initialize for this topic.
    pub num_partitions: u32,
}
/// Per-batch outcome returned by [`BatchCoordinator::commit_file`].
#[derive(Debug)]
pub struct CommitBatchResponse {
    /// Errors encountered while committing this batch, if any.
    pub errors: Vec<String>, // TODO: fix this. This needs to be an Errors object.
    /// Base offset assigned to the batch by the coordinator.
    pub assigned_base_offset: u64,
    /// Timestamp recorded at append time
    /// (presumably milliseconds since the Unix epoch — TODO confirm units).
    pub log_append_time: u64,
    /// Current log start offset of the affected partition.
    pub log_start_offset: u64,
    /// `true` when idempotent-produce checks identified this batch as a
    /// duplicate of one already committed.
    pub is_duplicate: bool,
    /// The originating request, echoed back so the caller can correlate
    /// responses to batches.
    pub request: CommitBatchRequest,
}
/// One partition's coordinates lookup inside a Fetch request
/// (see [`BatchCoordinator::find_batches`]).
#[derive(Debug)]
pub struct FindBatchRequest {
    /// Partition to fetch from.
    pub topic_id_partition: TopicIdPartition,
    /// Offset to start fetching at.
    pub offset: u64,
    /// Upper bound on bytes returned for this single partition.
    pub max_partition_fetch_bytes: u32,
}
/// Batch coordinates for one partition, answering a [`FindBatchRequest`].
#[derive(Debug, Clone)]
pub struct FindBatchResponse {
    /// Errors encountered while locating batches, if any.
    pub errors: Vec<String>, // TODO: fix this. This needs to be an Errors object.
    /// Batches (object keys + metadata) that satisfy the fetch.
    pub batches: Vec<BatchInfo>,
    /// Current log start offset of the partition.
    pub log_start_offset: u64,
    /// Current high watermark of the partition.
    pub high_watermark: u64,
}
/// Location of one committed batch: which shared-log-segment object holds
/// it, plus its metadata.
///
/// Derives `Default` for consistency with [`BatchMetadata`] (which already
/// does), so instances can be built with struct-update syntax in tests.
#[derive(Debug, Default, Clone)]
pub struct BatchInfo {
    /// Coordinator-assigned id of the batch.
    pub batch_id: u64,
    /// Key of the object in object storage that contains the batch.
    pub object_key: String,
    /// Offsets, timestamps and producer information for the batch.
    pub metadata: BatchMetadata,
}
/// Metadata describing one batch within a shared log segment object.
#[derive(Debug, Default, Clone)]
pub struct BatchMetadata {
    /// Partition the batch belongs to.
    pub topic_id_partition: TopicIdPartition,
    /// Byte position of the batch inside the object.
    pub byte_offset: u64,
    /// Size of the batch in bytes.
    pub byte_size: u32,
    /// First logical offset covered by the batch.
    pub base_offset: u64,
    /// Last logical offset covered by the batch (inclusive, presumably —
    /// TODO confirm).
    pub last_offset: u64,
    /// Timestamp assigned at append time.
    pub log_append_timestamp: u64,
    /// Maximum record timestamp contained in the batch.
    pub batch_max_timestamp: u64,
    /// Whether the timestamps are create-time or log-append-time.
    pub timestamp_type: TimestampType,
    /// Producer id for idempotent produce.
    /// NOTE(review): Kafka models this as a signed `long` with -1 meaning
    /// "no producer id"; as `u64` that sentinel is unrepresentable — confirm.
    pub producer_id: u64,
    /// Producer epoch for idempotent produce.
    pub producer_epoch: i16,
    /// First record sequence number in the batch.
    /// NOTE(review): Kafka uses signed `int` with -1 sentinel here too — confirm.
    pub base_sequence: u32,
    /// Last record sequence number in the batch.
    pub last_sequence: u32,
}
/// Request for log offset information (earliest, latest, …) for one
/// partition (see [`BatchCoordinator::list_offsets`]).
#[derive(Debug)]
pub struct ListOffsetsRequest {
    /// Partition whose offsets are being queried.
    pub topic_id_partition: TopicIdPartition,
    /// Target timestamp to look up, or one of the sentinel constants below.
    pub timestamp: u64,
}

impl ListOffsetsRequest {
    // Sentinel timestamps mirroring
    // `org.apache.kafka.common.requests.ListOffsetsRequest`. Kafka defines
    // them as negative `long`s (-1, -2, …); because `timestamp` is `u64`
    // here, they are stored as the corresponding two's-complement bit
    // patterns.
    // NOTE(review): confirm this encoding matches what producers of this
    // request actually send; if signed timestamps are intended, `timestamp`
    // should become `i64` instead.

    /// Look up the latest offset (Kafka's -1).
    pub const LATEST_TIMESTAMP: u64 = -1i64 as u64;
    /// Look up the earliest offset (Kafka's -2).
    pub const EARLIEST_TIMESTAMP: u64 = -2i64 as u64;
    /// Look up the offset with the maximum timestamp (Kafka's -3).
    pub const MAX_TIMESTAMP: u64 = -3i64 as u64;
    /// Look up the earliest locally retained offset (Kafka's -4).
    pub const EARLIEST_LOCAL_TIMESTAMP: u64 = -4i64 as u64;
    /// Look up the latest tiered offset (Kafka's -5).
    pub const LATEST_TIERED_TIMESTAMP: u64 = -5i64 as u64;
}
/// Answer to a [`ListOffsetsRequest`] for one partition.
#[derive(Debug)]
pub struct ListOffsetsResponse {
    /// Errors encountered while resolving the offset, if any.
    pub errors: Vec<String>, // TODO: fix this. This needs to be an Errors object.
    /// Partition the response refers to.
    pub topic_id_partition: TopicIdPartition,
    /// Timestamp associated with the resolved offset.
    pub timestamp: u64,
    /// The resolved offset.
    pub offset: u64,
}
/// Request to truncate one partition
/// (see [`BatchCoordinator::delete_records`]).
#[derive(Debug)]
pub struct DeleteRecordsRequest {
    /// Partition to truncate.
    pub topic_id_partition: TopicIdPartition,
    /// Truncation point — presumably the new log start offset; records
    /// below it are deleted. TODO confirm inclusive/exclusive semantics.
    pub offset: u64,
}
/// Result of truncating one partition via a [`DeleteRecordsRequest`].
#[derive(Debug)]
pub struct DeleteRecordsResponse {
    /// Errors encountered while truncating, if any.
    pub errors: Vec<String>, // TODO: fix this. This needs to be an Errors object.
    /// The partition's low watermark after truncation.
    pub low_watermark: u64,
}
/// A soft-deleted object awaiting physical deletion from object storage
/// (see [`BatchCoordinator::get_files_to_delete`]).
#[derive(Debug)]
pub struct FileToDelete {
    /// Key of the object in object storage.
    pub object_key: String,
    /// When the object was marked for deletion.
    pub marked_for_deletion_at: SystemTime,
}
/// Notifies the coordinator that these objects were physically deleted
/// from object storage (see [`BatchCoordinator::delete_files`]).
#[derive(Debug)]
pub struct DeleteFilesRequest {
    /// Keys of the objects that were deleted.
    pub object_key_paths: HashSet<String>,
}
/// Pluggable Batch Coordinator interface for Diskless topics (KIP-1164).
///
/// Implementations own all batch/object metadata: offset assignment,
/// idempotent-produce checks, watermark tracking, and the object deletion
/// lifecycle.
#[async_trait::async_trait]
pub trait BatchCoordinator
where
    Self: Send + Sync + std::fmt::Debug,
{
    /// This operation is called when a Diskless partition
    /// (or a topic with one or more partitions) is created in the cluster.
    /// The Batch Coordinator initializes the corresponding logs.
    ///
    /// NOTE(review): the previous doc claimed "Returns an error if an
    /// unexpected error occurs", but the signature is infallible (`()`).
    /// Either return a `Result` or document how failures surface — TODO.
    async fn create_topic_and_partitions(&self, requests: HashSet<CreateTopicAndPartitionsRequest>);
    /// This operation is called by a broker after uploading the
    /// shared log segment object to the object storage.
    ///
    /// The Batch Coordinator:
    /// 1. Performs the necessary checks for idempotent produce.
    /// 2. Accordingly increases the high watermark of the affected logs.
    /// 3. Assigns offsets to the batches.
    /// 4. Saves the batch and object metadata.
    /// 5. Returns the result to the broker.
    ///
    /// `object_key` is a fixed 16-byte key — presumably a UUID's raw bytes,
    /// matching `uuid::Uuid` used elsewhere; TODO confirm with the uploader.
    ///
    /// # Errors
    /// Per-batch failures are reported via [`CommitBatchResponse::errors`];
    /// the call itself does not fail.
    async fn commit_file(
        &self,
        object_key: [u8; 16],
        uploader_broker_id: u32,
        file_size: u64,
        batches: Vec<CommitBatchRequest>,
    ) -> Vec<CommitBatchResponse>;
    /// This operation is called by a broker when it needs to serve a Fetch request.
    /// The Batch Coordinator collects the batch coordinates to satisfy
    /// this request and sends the response back to the broker.
    ///
    /// # Errors
    /// Per-partition failures are reported via [`FindBatchResponse::errors`];
    /// the call itself does not fail.
    async fn find_batches(
        &self,
        find_batch_requests: Vec<FindBatchRequest>,
        fetch_max_bytes: u32,
    ) -> Vec<FindBatchResponse>;
    /// This operation allows the broker to get the information about log offsets:
    /// earliest, latest, etc. The operation is a read-only operation.
    ///
    /// # Errors
    /// Per-partition failures are reported via [`ListOffsetsResponse::errors`];
    /// the call itself does not fail.
    async fn list_offsets(&self, requests: Vec<ListOffsetsRequest>) -> Vec<ListOffsetsResponse>;
    /// This operation is called when a partition needs to be truncated by the user.
    /// The Batch Coordinator:
    /// 1. Modifies the log start offset for the affected partitions (logs).
    /// 2. Deletes the batches that are no longer needed due to this truncation.
    /// 3. If some objects become empty after deleting these batches,
    ///    they are marked for deletion as well.
    ///
    /// # Errors
    /// Per-partition failures are reported via [`DeleteRecordsResponse::errors`];
    /// the call itself does not fail.
    async fn delete_records(
        &self,
        requests: Vec<DeleteRecordsRequest>,
    ) -> Vec<DeleteRecordsResponse>;
    /// This operation is called when topics are deleted.
    /// It's similar to deleting records, but all the associated batches
    /// are deleted and the log metadata are deleted as well.
    ///
    /// `topic_ids` are string-typed here — presumably stringified topic
    /// UUIDs rather than names; TODO confirm with callers.
    ///
    /// NOTE(review): infallible signature (`()`) despite the KIP describing
    /// error cases; consider returning `Result` — TODO.
    async fn delete_topics(&self, topic_ids: HashSet<String>);
    /// This operation allows a broker to get a list of soft deleted objects
    /// for asynchronous physical deletion from the object storage.
    ///
    /// NOTE(review): infallible signature — failures cannot currently be
    /// reported; TODO confirm intent.
    async fn get_files_to_delete(&self) -> Vec<FileToDelete>;
    /// This operation informs the Batch Coordinator that certain soft deleted
    /// objects were also deleted physically from the object storage.
    /// The Batch Coordinator removes all metadata about these objects.
    ///
    /// NOTE(review): infallible signature — failures cannot currently be
    /// reported; TODO confirm intent.
    async fn delete_files(&self, request: DeleteFilesRequest);
    /// Returns `true` when the object identified by `object_key` can be
    /// physically deleted (presumably: no batch metadata references it any
    /// more — semantics inferred from the name, TODO confirm with
    /// implementers).
    async fn is_safe_to_delete_file(&self, object_key: String) -> bool;
}