Skip to content

Commit 5a824ed

Browse files
authored
feat: implement batch count limit, fix response handling (#12)
- Add max_requests_per_batch option (default: 1000) matching Go client - Implement can_add() in ReadBatch and WriteBatch using request count - Fix WriteBatch::flush() to properly error remaining inflight operations when server returns fewer responses than expected - Add doc comments to OxiaClientOptions fields Signed-off-by: mattisonchao <mattisonchao@gmail.com>
1 parent 96bbfcc commit 5a824ed

File tree

5 files changed

+82
-28
lines changed

5 files changed

+82
-28
lines changed

liboxia-native/src/batch.rs

Lines changed: 41 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -52,18 +52,21 @@ pub(crate) struct ReadBatch {
5252
shard_id: i64,
5353
shard_manager: Arc<ShardManager>,
5454
provider_manager: Arc<ProviderManager>,
55+
max_requests: u32,
5556
}
5657
impl ReadBatch {
5758
pub fn new(
5859
shard_id: i64,
5960
shard_manager: Arc<ShardManager>,
6061
provider_manager: Arc<ProviderManager>,
62+
max_requests: u32,
6163
) -> Self {
6264
ReadBatch {
6365
get_inflight: Vec::new(),
6466
shard_id,
6567
shard_manager,
6668
provider_manager,
69+
max_requests,
6770
}
6871
}
6972

@@ -72,8 +75,7 @@ impl ReadBatch {
7275
}
7376

7477
fn can_add(&self, _: &Operation) -> bool {
75-
//todo: support it
76-
true
78+
(self.get_inflight.len() as u32) < self.max_requests
7779
}
7880
fn add(&mut self, operation: Operation) {
7981
if let Operation::Get(get) = operation {
@@ -155,16 +157,22 @@ pub(crate) struct WriteBatch {
155157
delete_inflight: Vec<DeleteOperation>,
156158
delete_range_inflight: Vec<DeleteRangeOperation>,
157159
write_stream_manager: Arc<WriteStreamManager>,
160+
max_requests: u32,
158161
}
159162

160163
impl WriteBatch {
161-
pub fn new(shard_id: i64, write_stream_manager: Arc<WriteStreamManager>) -> Self {
164+
pub fn new(
165+
shard_id: i64,
166+
write_stream_manager: Arc<WriteStreamManager>,
167+
max_requests: u32,
168+
) -> Self {
162169
WriteBatch {
163170
shard_id,
164171
put_inflight: Vec::new(),
165172
delete_inflight: Vec::new(),
166173
delete_range_inflight: Vec::new(),
167174
write_stream_manager,
175+
max_requests,
168176
}
169177
}
170178

@@ -174,9 +182,13 @@ impl WriteBatch {
174182
&& self.delete_range_inflight.is_empty()
175183
}
176184

185+
fn total_count(&self) -> u32 {
186+
(self.put_inflight.len() + self.delete_inflight.len() + self.delete_range_inflight.len())
187+
as u32
188+
}
189+
177190
fn can_add(&self, _: &Operation) -> bool {
178-
//todo: support it
179-
true
191+
self.total_count() < self.max_requests
180192
}
181193

182194
fn add(&mut self, operation: Operation) {
@@ -212,24 +224,32 @@ impl WriteBatch {
212224
}
213225
match self.write_stream_manager.write(write_request).await {
214226
Ok(response) => {
215-
for (mut operation, put_response) in
216-
self.put_inflight.drain(..).zip(response.puts.into_iter())
217-
{
218-
operation.complete(put_response);
227+
let mut put_responses = response.puts.into_iter();
228+
for mut operation in self.put_inflight.drain(..) {
229+
match put_responses.next() {
230+
Some(put_response) => operation.complete(put_response),
231+
None => operation.complete_exception(UnexpectedStatus(
232+
"missing put response from server".to_string(),
233+
)),
234+
}
219235
}
220-
for (mut operation, delete_response) in self
221-
.delete_inflight
222-
.drain(..)
223-
.zip(response.deletes.into_iter())
224-
{
225-
operation.complete(delete_response);
236+
let mut delete_responses = response.deletes.into_iter();
237+
for mut operation in self.delete_inflight.drain(..) {
238+
match delete_responses.next() {
239+
Some(delete_response) => operation.complete(delete_response),
240+
None => operation.complete_exception(UnexpectedStatus(
241+
"missing delete response from server".to_string(),
242+
)),
243+
}
226244
}
227-
for (mut operation, delete_range_response) in self
228-
.delete_range_inflight
229-
.drain(..)
230-
.zip(response.delete_ranges.into_iter())
231-
{
232-
operation.complete(delete_range_response);
245+
let mut delete_range_responses = response.delete_ranges.into_iter();
246+
for mut operation in self.delete_range_inflight.drain(..) {
247+
match delete_range_responses.next() {
248+
Some(delete_range_response) => operation.complete(delete_range_response),
249+
None => operation.complete_exception(UnexpectedStatus(
250+
"missing delete_range response from server".to_string(),
251+
)),
252+
}
233253
}
234254
}
235255
Err(err) => {

liboxia-native/src/batch_manager.rs

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,20 @@ impl Batcher {
2727
shard_manager: Arc<ShardManager>,
2828
provider_manager: Arc<ProviderManager>,
2929
write_stream_manager: Arc<WriteStreamManager>,
30+
max_requests_per_batch: u32,
3031
) -> Batch {
3132
match self {
32-
Batcher::Read => Batch::Read(ReadBatch::new(shard_id, shard_manager, provider_manager)),
33-
Batcher::Write => Batch::Write(WriteBatch::new(shard_id, write_stream_manager)),
33+
Batcher::Read => Batch::Read(ReadBatch::new(
34+
shard_id,
35+
shard_manager,
36+
provider_manager,
37+
max_requests_per_batch,
38+
)),
39+
Batcher::Write => Batch::Write(WriteBatch::new(
40+
shard_id,
41+
write_stream_manager,
42+
max_requests_per_batch,
43+
)),
3444
}
3545
}
3646
}
@@ -48,14 +58,16 @@ impl Drop for BatchManager {
4858
}
4959

5060
impl BatchManager {
61+
#[allow(clippy::too_many_arguments)]
5162
pub fn new(
5263
shard_id: i64,
5364
batcher: Batcher,
5465
shard_manager: Arc<ShardManager>,
5566
provider_manager: Arc<ProviderManager>,
5667
write_stream_manager: Arc<WriteStreamManager>,
5768
batch_linger: Duration,
58-
batch_max_size: u32,
69+
_batch_max_size: u32,
70+
max_requests_per_batch: u32,
5971
) -> Self {
6072
let (tx, rx) = mpsc::unbounded_channel();
6173
let context = CancellationToken::new();
@@ -68,7 +80,7 @@ impl BatchManager {
6880
provider_manager,
6981
write_stream_manager,
7082
batch_linger,
71-
batch_max_size,
83+
max_requests_per_batch,
7284
));
7385
BatchManager {
7486
context,
@@ -106,14 +118,15 @@ async fn start_batcher(
106118
provider_manager: Arc<ProviderManager>,
107119
write_stream_manager: Arc<WriteStreamManager>,
108120
batch_linger: Duration,
109-
_batch_max_size: u32,
121+
max_requests_per_batch: u32,
110122
) {
111123
let mut buffer = Vec::new();
112124
let mut batch = batcher.create_batch(
113125
shard_id,
114126
shard_manager.clone(),
115127
provider_manager.clone(),
116128
write_stream_manager.clone(),
129+
max_requests_per_batch,
117130
);
118131
let mut interval = interval(batch_linger);
119132
loop {
@@ -127,7 +140,7 @@ async fn start_batcher(
127140
continue
128141
}
129142
batch.flush().await;
130-
batch = batcher.create_batch( shard_id, shard_manager.clone(), provider_manager.clone(), write_stream_manager.clone());
143+
batch = batcher.create_batch( shard_id, shard_manager.clone(), provider_manager.clone(), write_stream_manager.clone(), max_requests_per_batch);
131144
}
132145
size = rx.recv_many(&mut buffer, usize::MAX) => {
133146
if size == 0 {
@@ -137,7 +150,7 @@ async fn start_batcher(
137150
for operation in buffer.drain(..) {
138151
if !batch.can_add(&operation) {
139152
batch.flush().await;
140-
batch = batcher.create_batch( shard_id, shard_manager.clone(), provider_manager.clone(), write_stream_manager.clone());
153+
batch = batcher.create_batch( shard_id, shard_manager.clone(), provider_manager.clone(), write_stream_manager.clone(), max_requests_per_batch);
141154
interval.reset();
142155
}
143156
batch.add(operation);

liboxia-native/src/client.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -516,6 +516,7 @@ impl OxiaClient {
516516
self.inner.write_stream_manager.clone(),
517517
self.inner.options.batch_linger,
518518
self.inner.options.batch_max_size,
519+
self.inner.options.max_requests_per_batch,
519520
))
520521
};
521522
let ref_mut = match batcher {

liboxia-native/src/client_builder.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ pub struct OxiaClientBuilder {
2828
identity: Option<String>,
2929
batch_linger: Option<Duration>,
3030
batch_max_size: Option<u32>,
31+
max_requests_per_batch: Option<u32>,
3132
session_timeout: Option<Duration>,
3233
request_timeout: Option<Duration>,
3334
}
@@ -62,6 +63,11 @@ impl OxiaClientBuilder {
6263
self
6364
}
6465

66+
pub fn max_requests_per_batch(mut self, max_requests_per_batch: u32) -> Self {
67+
self.max_requests_per_batch = Some(max_requests_per_batch);
68+
self
69+
}
70+
6571
pub fn session_timeout(mut self, session_timeout: Duration) -> Self {
6672
self.session_timeout = Some(session_timeout);
6773
self
@@ -89,6 +95,9 @@ impl OxiaClientBuilder {
8995
if let Some(batch_max_size) = self.batch_max_size {
9096
options.batch_max_size = batch_max_size;
9197
}
98+
if let Some(max_requests_per_batch) = self.max_requests_per_batch {
99+
options.max_requests_per_batch = max_requests_per_batch;
100+
}
92101
if let Some(session_timeout) = self.session_timeout {
93102
options.session_timeout = session_timeout;
94103
}

liboxia-native/src/client_options.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,24 @@
11
use std::time::Duration;
22
use uuid::Uuid;
33

4+
/// Configuration options for the Oxia client.
45
#[derive(Debug, Clone)]
56
pub struct OxiaClientOptions {
7+
/// The address of the Oxia service (e.g., "http://localhost:6648").
68
pub service_address: String,
9+
/// The Oxia namespace to use (default: "default").
710
pub namespace: String,
11+
/// Client identity string, used for ephemeral record tracking.
812
pub identity: String,
13+
/// Wait time before sending a batch (default: 5ms).
914
pub batch_linger: Duration,
15+
/// Maximum batch size in bytes (default: 128KB).
1016
pub batch_max_size: u32,
17+
/// Maximum number of requests per batch (default: 1000).
18+
pub max_requests_per_batch: u32,
19+
/// Session timeout for ephemeral records (default: 15s).
1120
pub session_timeout: Duration,
21+
/// Timeout for individual requests (default: 30s).
1222
pub request_timeout: Duration,
1323
}
1424

@@ -20,6 +30,7 @@ impl Default for OxiaClientOptions {
2030
identity: Uuid::new_v4().to_string(),
2131
batch_linger: Duration::from_millis(5),
2232
batch_max_size: 128 * 1024,
33+
max_requests_per_batch: 1000,
2334
session_timeout: Duration::from_secs(15),
2435
request_timeout: Duration::from_secs(30),
2536
}

0 commit comments

Comments
 (0)