@@ -36,7 +36,7 @@ use lading_payload::block;
3636
3737use super :: General ;
3838use crate :: generator:: common:: {
39- BytesThrottleConfig , MetricsBuilder , ThrottleConversionError , create_throttle,
39+ BlockThrottle , MetricsBuilder , ThrottleConfig , ThrottleConversionError , create_throttle,
4040} ;
4141
4242/// An enum to allow us to determine what operation caused an IO errror as the
@@ -147,8 +147,9 @@ pub struct Config {
147147 /// Whether to use a fixed or streaming block cache
148148 #[ serde( default = "lading_payload::block::default_cache_method" ) ]
149149 block_cache_method : block:: CacheMethod ,
150- /// The load throttle configuration
151- pub throttle : Option < BytesThrottleConfig > ,
150+ /// Throughput profile controlling emission rate (bytes or blocks).
151+ #[ serde( default ) ]
152+ pub throttle : Option < ThrottleConfig > ,
152153}
153154
154155#[ derive( Debug ) ]
@@ -214,7 +215,7 @@ impl Server {
214215 let mut handles = Vec :: new ( ) ;
215216
216217 for idx in 0 ..config. concurrent_logs {
217- let throttle =
218+ let throughput_throttle =
218219 create_throttle ( config. throttle . as_ref ( ) , config. bytes_per_second . as_ref ( ) ) ?;
219220
220221 let mut dir_path = config. root . clone ( ) ;
@@ -234,8 +235,9 @@ impl Server {
234235 & basename,
235236 config. total_rotations ,
236237 maximum_bytes_per_log,
238+ maximum_block_size. get ( ) ,
237239 Arc :: clone ( & block_cache) ,
238- throttle ,
240+ throughput_throttle ,
239241 shutdown. clone ( ) ,
240242 child_labels,
241243 ) ;
@@ -283,8 +285,9 @@ struct Child {
283285 names : Vec < PathBuf > ,
284286 // The soft limit bytes per file that will trigger a rotation.
285287 maximum_bytes_per_log : NonZeroU32 ,
288+ maximum_block_size : u32 ,
286289 block_cache : Arc < block:: Cache > ,
287- throttle : lading_throttle :: Throttle ,
290+ throttle : BlockThrottle ,
288291 shutdown : lading_signal:: Watcher ,
289292 labels : Vec < ( String , String ) > ,
290293}
@@ -295,8 +298,9 @@ impl Child {
295298 basename : & Path ,
296299 total_rotations : u8 ,
297300 maximum_bytes_per_log : NonZeroU32 ,
301+ maximum_block_size : u32 ,
298302 block_cache : Arc < block:: Cache > ,
299- throttle : lading_throttle :: Throttle ,
303+ throttle : BlockThrottle ,
300304 shutdown : lading_signal:: Watcher ,
301305 labels : Vec < ( String , String ) > ,
302306 ) -> Self {
@@ -316,6 +320,7 @@ impl Child {
316320 Self {
317321 names,
318322 maximum_bytes_per_log,
323+ maximum_block_size,
319324 block_cache,
320325 throttle,
321326 shutdown,
@@ -324,7 +329,10 @@ impl Child {
324329 }
325330
326331 async fn spin ( mut self ) -> Result < ( ) , Error > {
327- let buffer_capacity = self . throttle . maximum_capacity ( ) as usize ;
332+ let mut handle = self . block_cache . handle ( ) ;
333+ let buffer_capacity = self
334+ . throttle
335+ . maximum_capacity_bytes ( self . maximum_block_size ) ;
328336 let mut total_bytes_written: u64 = 0 ;
329337 let maximum_bytes_per_log: u64 = u64:: from ( self . maximum_bytes_per_log . get ( ) ) ;
330338
@@ -357,21 +365,16 @@ impl Child {
357365 } ) ?,
358366 ) ;
359367
360- let mut handle = self . block_cache . handle ( ) ;
361-
362368 let shutdown_wait = self . shutdown . recv ( ) ;
363369 tokio:: pin!( shutdown_wait) ;
364370 loop {
365371 // SAFETY: By construction the block cache will never be empty
366372 // except in the event of a catastrophic failure.
367- let total_bytes = self . block_cache . peek_next_size ( & handle) ;
368-
369373 tokio:: select! {
370- result = self . throttle. wait_for ( total_bytes ) => {
374+ result = self . throttle. wait_for_block ( & self . block_cache , & handle ) => {
371375 match result {
372376 Ok ( ( ) ) => {
373- let block = self . block_cache. advance( & mut handle) ;
374- write_bytes( block,
377+ write_bytes( self . block_cache. advance( & mut handle) ,
375378 & mut fp,
376379 & mut total_bytes_written,
377380 buffer_capacity,
0 commit comments