@@ -13,21 +13,11 @@ use tokio::time::Instant;
1313use crate :: types:: { AppendInput , AppendRecord , AppendRecordBatch , FencingToken , MeteredBytes } ;
1414
1515#[ derive( Debug , Clone ) ]
16- #[ non_exhaustive]
17- /// Configuration for batching [AppendRecord]s.
16+ /// Configuration for batching [`AppendRecord`]s.
1817pub struct BatchingConfig {
19- /// How long to wait for more records before flushing a batch.
20- ///
21- /// Defaults to `5ms`.
22- pub linger : Duration ,
23- /// Maximum bytes per batch.
24- ///
25- /// Defaults to `1MiB`.
26- pub max_batch_bytes : usize ,
27- /// Maximum number of records per batch.
28- ///
29- /// Defaults to `1000`.
30- pub max_batch_records : usize ,
18+ linger : Duration ,
19+ max_batch_bytes : usize ,
20+ max_batch_records : usize ,
3121}
3222
3323impl Default for BatchingConfig {
@@ -41,21 +31,27 @@ impl Default for BatchingConfig {
4131}
4232
4333impl BatchingConfig {
44- /// Create a new [BatchingConfig] with default settings.
34+ /// Create a new [` BatchingConfig` ] with default settings.
4535 pub fn new ( ) -> Self {
4636 Self :: default ( )
4737 }
4838
49- /// Set the linger duration.
39+ /// Set the duration for how long to wait for more records before flushing a batch.
40+ ///
41+ /// Defaults to `5ms`.
5042 pub fn with_linger ( self , linger : Duration ) -> Self {
5143 Self { linger, ..self }
5244 }
5345
54- /// Set the maximum bytes per batch.
46+ /// Set the maximum metered bytes per batch.
47+ ///
48+ /// **Note:** It must not exceed `1MiB`.
49+ ///
50+ /// Defaults to `1MiB`.
5551 pub fn with_max_batch_bytes ( self , max_batch_bytes : usize ) -> Result < Self , ValidationError > {
5652 if max_batch_bytes > RECORD_BATCH_MAX . bytes {
5753 return Err ( ValidationError ( format ! (
58- "max_batch_bytes: {max_batch_bytes} exceeds {}" ,
54+ "max_batch_bytes ( {max_batch_bytes}) exceeds {}" ,
5955 RECORD_BATCH_MAX . bytes
6056 ) ) ) ;
6157 }
@@ -66,10 +62,14 @@ impl BatchingConfig {
6662 }
6763
6864 /// Set the maximum number of records per batch.
65+ ///
66+ /// **Note:** It must not exceed `1000`.
67+ ///
68+ /// Defaults to `1000`.
6969 pub fn with_max_batch_records ( self , max_batch_records : usize ) -> Result < Self , ValidationError > {
7070 if max_batch_records > RECORD_BATCH_MAX . count {
7171 return Err ( ValidationError ( format ! (
72- "max_batch_records: {max_batch_records} exceeds {}" ,
72+ "max_batch_records ( {max_batch_records}) exceeds {}" ,
7373 RECORD_BATCH_MAX . count
7474 ) ) ) ;
7575 }
@@ -80,16 +80,15 @@ impl BatchingConfig {
8080 }
8181}
8282
83- #[ non_exhaustive]
84- /// A [Stream] that batches [AppendRecord]s into [AppendInput]s.
83+ /// A [`Stream`] that batches [`AppendRecord`]s into [`AppendInput`]s.
8584pub struct AppendInputs {
8685 pub ( crate ) batches : AppendRecordBatches ,
8786 pub ( crate ) fencing_token : Option < FencingToken > ,
8887 pub ( crate ) match_seq_num : Option < u64 > ,
8988}
9089
9190impl AppendInputs {
92- /// Create a new [AppendInputs] with the given records and config.
91+ /// Create a new [` AppendInputs` ] with the given records and config.
9392 pub fn new (
9493 records : impl Stream < Item = impl Into < AppendRecord > + Send > + Send + Unpin + ' static ,
9594 config : BatchingConfig ,
@@ -101,15 +100,15 @@ impl AppendInputs {
101100 }
102101 }
103102
104- /// Set the fencing token for all [AppendInput]s.
103+ /// Set the fencing token for all [` AppendInput` ]s.
105104 pub fn with_fencing_token ( self , fencing_token : FencingToken ) -> Self {
106105 Self {
107106 fencing_token : Some ( fencing_token) ,
108107 ..self
109108 }
110109 }
111110
112- /// Set the match sequence number for the initial [AppendInput]. It will be auto-incremented
111+ /// Set the match sequence number for the initial [` AppendInput` ]. It will be auto-incremented
113112 /// for the subsequent ones.
114113 pub fn with_match_seq_num ( self , seq_num : u64 ) -> Self {
115114 Self {
@@ -142,13 +141,13 @@ impl Stream for AppendInputs {
142141 }
143142}
144143
145- /// A [Stream] that batches [AppendRecord]s into [AppendRecordBatch]es.
144+ /// A [` Stream` ] that batches [` AppendRecord` ]s into [` AppendRecordBatch` ]es.
146145pub struct AppendRecordBatches {
147146 inner : Pin < Box < dyn Stream < Item = Result < AppendRecordBatch , ValidationError > > + Send > > ,
148147}
149148
150149impl AppendRecordBatches {
151- /// Create a new [AppendRecordBatches] with the given records and config.
150+ /// Create a new [` AppendRecordBatches` ] with the given records and config.
152151 pub fn new (
153152 records : impl Stream < Item = impl Into < AppendRecord > + Send > + Send + Unpin + ' static ,
154153 config : BatchingConfig ,
@@ -203,7 +202,7 @@ fn append_record_batches(
203202 let record_bytes = first_record. metered_bytes( ) ;
204203 if record_bytes > config. max_batch_bytes {
205204 yield Err ( ValidationError ( format!(
206- "record size ({record_bytes} bytes ) exceeds max_batch_bytes ({})" ,
205+ "record size in metered bytes ({record_bytes}) exceeds max_batch_bytes ({})" ,
207206 config. max_batch_bytes
208207 ) ) ) ;
209208 break ;
@@ -310,7 +309,7 @@ mod tests {
310309 }
311310
312311 #[ tokio:: test]
313- async fn batching_should_fail_when_it_sees_oversized_record ( ) -> Result < ( ) , ValidationError > {
312+ async fn batching_should_error_when_it_sees_oversized_record ( ) -> Result < ( ) , ValidationError > {
314313 let record = AppendRecord :: new ( "hello" ) ?;
315314 let record_bytes = record. metered_bytes ( ) ;
316315 let max_batch_bytes = 1 ;
@@ -324,7 +323,7 @@ mod tests {
324323 assert_matches ! ( & results[ 0 ] , Err ( err) => {
325324 assert_eq!(
326325 err. to_string( ) ,
327- format!( "record size ({record_bytes} bytes ) exceeds max_batch_bytes ({max_batch_bytes})" )
326+ format!( "record size in metered bytes ({record_bytes}) exceeds max_batch_bytes ({max_batch_bytes})" )
328327 ) ;
329328 } ) ;
330329
0 commit comments