@@ -8,6 +8,7 @@ use std::{
88} ;
99
1010use futures:: StreamExt ;
11+ use s2_common:: types:: ValidationError ;
1112use tokio:: {
1213 sync:: { OwnedSemaphorePermit , Semaphore , mpsc, oneshot} ,
1314 time:: Instant ,
@@ -26,6 +27,8 @@ use crate::{
2627 } ,
2728} ;
2829
30+ const ONE_MIB : u32 = 1024 * 1024 ;
31+
2932#[ derive( Debug , thiserror:: Error ) ]
3033pub enum AppendSessionError {
3134 #[ error( transparent) ]
@@ -87,18 +90,22 @@ impl Future for BatchSubmitTicket {
8790pub struct AppendSessionConfig {
8891 /// Limit on total metered bytes of unacknowledged [AppendInput]s held in memory.
8992 ///
93+ /// **Note:** It must be at least `1MiB`.
94+ ///
9095 /// Defaults to `10MiB`.
9196 pub max_inflight_bytes : u32 ,
9297 /// Limit on number of unacknowledged [AppendInput]s held in memory.
9398 ///
94- /// Defaults to `None`.
99+ /// **Note:** It must be at least `1`.
100+ ///
101+ /// Defaults to no limit.
95102 pub max_inflight_batches : Option < u32 > ,
96103}
97104
98105impl Default for AppendSessionConfig {
99106 fn default ( ) -> Self {
100107 Self {
101- max_inflight_bytes : 10 * 1024 * 1024 ,
108+ max_inflight_bytes : 10 * ONE_MIB ,
102109 max_inflight_batches : None ,
103110 }
104111 }
@@ -111,19 +118,28 @@ impl AppendSessionConfig {
111118 }
112119
113120 /// Set the limit on total metered bytes of unacknowledged [AppendInput]s held in memory.
114- pub fn with_max_inflight_bytes ( self , max_inflight_bytes : u32 ) -> Self {
115- Self {
121+ pub fn with_max_inflight_bytes ( self , max_inflight_bytes : u32 ) -> Result < Self , ValidationError > {
122+ if max_inflight_bytes < ONE_MIB {
123+ return Err ( format ! ( "max_inflight_bytes must be at least {ONE_MIB}" ) . into ( ) ) ;
124+ }
125+ Ok ( Self {
116126 max_inflight_bytes,
117127 ..self
118- }
128+ } )
119129 }
120130
121131 /// Set the limit on number of unacknowledged [AppendInput]s held in memory.
122- pub fn with_max_inflight_batches ( self , max_inflight_batches : u32 ) -> Self {
123- Self {
132+ pub fn with_max_inflight_batches (
133+ self ,
134+ max_inflight_batches : u32 ,
135+ ) -> Result < Self , ValidationError > {
136+ if max_inflight_batches < 1 {
137+ return Err ( "max_inflight_batches must be at least 1" . into ( ) ) ;
138+ }
139+ Ok ( Self {
124140 max_inflight_batches : Some ( max_inflight_batches) ,
125141 ..self
126- }
142+ } )
127143 }
128144}
129145
0 commit comments