Skip to content
This repository was archived by the owner on Feb 16, 2026. It is now read-only.

Commit d8bfe57

Browse files
authored
feat!: add lower bounds for max_batch_bytes and max_batch_records (#309)
1 parent c2215e0 commit d8bfe57

File tree

1 file changed

+21
-7
lines changed

1 file changed

+21
-7
lines changed

src/batching.rs

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,15 @@ use std::{
77
};
88

99
use futures::{Stream, StreamExt};
10-
use s2_common::caps::RECORD_BATCH_MAX;
10+
use s2_common::{caps::RECORD_BATCH_MAX, read_extent::CountOrBytes};
1111
use tokio::time::Instant;
1212

1313
use crate::types::{
1414
AppendInput, AppendRecord, AppendRecordBatch, FencingToken, MeteredBytes, ValidationError,
1515
};
1616

17+
const RECORD_BATCH_MIN: CountOrBytes = CountOrBytes { count: 1, bytes: 8 };
18+
1719
#[derive(Debug, Clone)]
1820
/// Configuration for batching [`AppendRecord`]s.
1921
pub struct BatchingConfig {
@@ -47,13 +49,19 @@ impl BatchingConfig {
4749

4850
/// Set the maximum metered bytes per batch.
4951
///
50-
/// **Note:** It must not exceed `1MiB`.
52+
/// **Note:** It must be at least `8B` and must not exceed `1MiB`.
5153
///
5254
/// Defaults to `1MiB`.
5355
pub fn with_max_batch_bytes(self, max_batch_bytes: usize) -> Result<Self, ValidationError> {
56+
if max_batch_bytes < RECORD_BATCH_MIN.bytes {
57+
return Err(ValidationError(format!(
58+
"max_batch_bytes ({max_batch_bytes}) must be at least {}",
59+
RECORD_BATCH_MIN.bytes
60+
)));
61+
}
5462
if max_batch_bytes > RECORD_BATCH_MAX.bytes {
5563
return Err(ValidationError(format!(
56-
"max_batch_bytes ({max_batch_bytes}) exceeds {}",
64+
"max_batch_bytes ({max_batch_bytes}) must not exceed {}",
5765
RECORD_BATCH_MAX.bytes
5866
)));
5967
}
@@ -65,13 +73,19 @@ impl BatchingConfig {
6573

6674
/// Set the maximum number of records per batch.
6775
///
68-
/// **Note:** It must not exceed `1000`.
76+
/// **Note:** It must be at least `1` and must not exceed `1000`.
6977
///
7078
/// Defaults to `1000`.
7179
pub fn with_max_batch_records(self, max_batch_records: usize) -> Result<Self, ValidationError> {
80+
if max_batch_records < RECORD_BATCH_MIN.count {
81+
return Err(ValidationError(format!(
82+
"max_batch_records ({max_batch_records}) must be at least {}",
83+
RECORD_BATCH_MIN.count
84+
)));
85+
}
7286
if max_batch_records > RECORD_BATCH_MAX.count {
7387
return Err(ValidationError(format!(
74-
"max_batch_records ({max_batch_records}) exceeds {}",
88+
"max_batch_records ({max_batch_records}) must not exceed {}",
7589
RECORD_BATCH_MAX.count
7690
)));
7791
}
@@ -311,9 +325,9 @@ mod tests {
311325

312326
#[tokio::test]
313327
async fn batching_should_error_when_it_sees_oversized_record() -> Result<(), ValidationError> {
314-
let record = AppendRecord::new("hello")?;
328+
let record = AppendRecord::new("hello-world")?;
315329
let record_bytes = record.metered_bytes();
316-
let max_batch_bytes = 1;
330+
let max_batch_bytes = 10;
317331

318332
let config = BatchingConfig::default().with_max_batch_bytes(max_batch_bytes)?;
319333
let results: Vec<_> = AppendRecordBatches::new(futures::stream::iter(vec![record]), config)

0 commit comments

Comments (0)