@@ -122,6 +122,14 @@ pub struct Config {
122122 rotate : bool ,
123123 /// The load throttle configuration
124124 pub throttle : Option < ThrottleConfig > ,
125+ /// Force flush every n blocks. This generator uses a BufWriter with capacity
126+ /// based on the throttle's maximum capacity. So when the block cache outputs data roughly
127+ /// equal to the throttle's max rate, the BufWriter will flush roughly every second.
128+ /// However when blocks are small relative to the maximum possible rate, the BufWriter
129+ /// will flush less frequently. This setting allows you to force a flush after writing to N blocks
130+ /// to create a more consistent flush interval.
131+ #[ serde( default ) ]
132+ pub flush_every_n_blocks : Option < NonZeroU32 > ,
125133}
126134
127135#[ derive( Debug ) ]
@@ -201,6 +209,7 @@ impl Server {
201209 file_index : Arc :: clone ( & file_index) ,
202210 rotate : config. rotate ,
203211 shutdown : shutdown. clone ( ) ,
212+ flush_every_n_blocks : config. flush_every_n_blocks ,
204213 } ;
205214
206215 handles. spawn ( child. spin ( ) ) ;
@@ -276,6 +285,7 @@ struct Child {
276285 rotate : bool ,
277286 file_index : Arc < AtomicU32 > ,
278287 shutdown : lading_signal:: Watcher ,
288+ flush_every_n_blocks : Option < NonZeroU32 > ,
279289}
280290
281291impl Child {
@@ -289,7 +299,10 @@ impl Child {
289299
290300 let mut handle = self . block_cache . handle ( ) ;
291301 // Setting write buffer capacity (per second) approximately equal to the throttle's maximum capacity
292- // (converted to bytes if necessary) to approximate flush every second.
302+ // (converted to bytes if necessary) to approximate flush every second (ASSUMING that you are writing
303+ // bytes at the throttle's maximum byte rate. When using a block throttler with blocks smaller than
304+ // the throttle's maximum block size, this will flush less frequently and thus it's reccomended to use
305+ // the flush_every_n_blocks setting).
293306 let buffer_capacity = match self . throttle . mode {
294307 ThrottleMode :: Bytes => self . throttle . maximum_capacity ( ) as usize ,
295308 ThrottleMode :: Blocks => usize:: try_from (
@@ -319,6 +332,7 @@ impl Child {
319332 ) ;
320333 let shutdown_wait = self . shutdown . recv ( ) ;
321334 tokio:: pin!( shutdown_wait) ;
335+ let mut blocks_since_flush = 0 ;
322336 loop {
323337 tokio:: select! {
324338 result = self . throttle. wait_for_block( & self . block_cache, & handle) => {
@@ -331,10 +345,12 @@ impl Child {
331345 fp. write_all( & block. bytes) . await ?;
332346 counter!( "bytes_written" ) . increment( total_bytes) ;
333347 total_bytes_written += total_bytes;
348+ blocks_since_flush += 1 ;
334349 }
335350
336351 if total_bytes_written > maximum_bytes_per_file {
337352 fp. flush( ) . await ?;
353+ blocks_since_flush = 0 ;
338354 if self . rotate {
339355 // Delete file, leaving any open file handlers intact. This
340356 // includes our own `fp` for the time being.
@@ -362,6 +378,11 @@ impl Child {
362378 } ) ?,
363379 ) ;
364380 total_bytes_written = 0 ;
381+ } else if let Some ( flush_every_n_blocks) = self . flush_every_n_blocks {
382+ if blocks_since_flush == u32 :: from( flush_every_n_blocks. get( ) ) {
383+ fp. flush( ) . await ?;
384+ blocks_since_flush = 0 ;
385+ }
365386 }
366387 }
367388 Err ( err) => {
0 commit comments