@@ -31,13 +31,14 @@ use tokio::{
3131 io:: { AsyncWriteExt , BufWriter } ,
3232 task:: { JoinError , JoinSet } ,
3333} ;
34- use tracing:: { error, info} ;
34+ use tracing:: { error, info, warn } ;
3535
3636use lading_payload:: { self , block} ;
3737
3838use super :: General ;
3939use crate :: generator:: common:: {
40- BlockThrottle , MetricsBuilder , ThrottleConfig , ThrottleConversionError , create_throttle,
40+ BlockThrottle , MetricsBuilder , ThrottleConfig , ThrottleConversionError , ThrottleMode ,
41+ create_throttle,
4142} ;
4243
4344#[ derive( thiserror:: Error , Debug ) ]
@@ -121,6 +122,14 @@ pub struct Config {
121122 rotate : bool ,
122123 /// The load throttle configuration
123124 pub throttle : Option < ThrottleConfig > ,
125+ /// Force flush every n blocks. This generator uses a BufWriter with capacity
126+ /// based on the throttle's maximum capacity. So when the block cache outputs data roughly
127+ /// equal to the throttle's max rate, the BufWriter will flush roughly every second.
128+ /// However when blocks are small relative to the maximum possible rate, the BufWriter
129+ /// will flush less frequently. This setting allows you to force a flush after writing N blocks
130+ /// to create a more consistent flush interval.
131+ #[ serde( default ) ]
132+ pub flush_every_n_blocks : Option < NonZeroU32 > ,
124133}
125134
126135#[ derive( Debug ) ]
@@ -200,6 +209,7 @@ impl Server {
200209 file_index : Arc :: clone ( & file_index) ,
201210 rotate : config. rotate ,
202211 shutdown : shutdown. clone ( ) ,
212+ flush_every_n_blocks : config. flush_every_n_blocks ,
203213 } ;
204214
205215 handles. spawn ( child. spin ( ) ) ;
@@ -275,6 +285,7 @@ struct Child {
275285 rotate : bool ,
276286 file_index : Arc < AtomicU32 > ,
277287 shutdown : lading_signal:: Watcher ,
288+ flush_every_n_blocks : Option < NonZeroU32 > ,
278289}
279290
280291impl Child {
@@ -288,7 +299,16 @@ impl Child {
288299
289300 let mut handle = self . block_cache . handle ( ) ;
290301 // Setting write buffer capacity (per second) approximately equal to the throttle's maximum capacity
291- // (converted to bytes if necessary) to approximate flush every second.
302+ // (converted to bytes if necessary) to approximate flush every second (assuming that the throttler writes
303+ // bytes at the throttle's maximum byte rate). When using a block throttler with blocks smaller than
304+ // the throttle's maximum block size, this will flush less frequently, and thus it's recommended to use
305+ // the flush_every_n_blocks setting.
306+ if self . throttle . mode == ThrottleMode :: Blocks && self . flush_every_n_blocks . is_none ( ) {
307+ warn ! (
308+ "BufWriter flush frequency can be inconsistent when using block-based throttling -
309+ consider setting flush_every_n_blocks in your generator config"
310+ ) ;
311+ }
292312 let buffer_capacity = self
293313 . throttle
294314 . maximum_capacity_bytes ( self . maximum_block_size ) ;
@@ -312,6 +332,7 @@ impl Child {
312332 ) ;
313333 let shutdown_wait = self . shutdown . recv ( ) ;
314334 tokio:: pin!( shutdown_wait) ;
335+ let mut blocks_since_flush = 0 ;
315336 loop {
316337 tokio:: select! {
317338 result = self . throttle. wait_for_block( & self . block_cache, & handle) => {
@@ -324,10 +345,12 @@ impl Child {
324345 fp. write_all( & block. bytes) . await ?;
325346 counter!( "bytes_written" ) . increment( total_bytes) ;
326347 total_bytes_written += total_bytes;
348+ blocks_since_flush += 1 ;
327349 }
328350
329351 if total_bytes_written > maximum_bytes_per_file {
330352 fp. flush( ) . await ?;
353+ blocks_since_flush = 0 ;
331354 if self . rotate {
332355 // Delete file, leaving any open file handlers intact. This
333356 // includes our own `fp` for the time being.
@@ -355,6 +378,11 @@ impl Child {
355378 } ) ?,
356379 ) ;
357380 total_bytes_written = 0 ;
381+ } else if let Some ( flush_every_n_blocks) = self . flush_every_n_blocks {
382+ if blocks_since_flush == u32 :: from( flush_every_n_blocks. get( ) ) {
383+ fp. flush( ) . await ?;
384+ blocks_since_flush = 0 ;
385+ }
358386 }
359387 }
360388 Err ( err) => {
0 commit comments