@@ -36,7 +36,8 @@ use lading_payload::block;
3636
3737use super :: General ;
3838use crate :: generator:: common:: {
39- BytesThrottleConfig , MetricsBuilder , ThrottleConversionError , create_throttle,
39+ BlockThrottle , MetricsBuilder , ThrottleConfig , ThrottleConversionError , ThrottleMode ,
40+ create_throttle,
4041} ;
4142
4243/// An enum to allow us to determine what operation caused an IO errror as the
@@ -137,6 +138,7 @@ pub struct Config {
137138 /// Sets the [`crate::payload::Config`] of this template.
138139 pub variant : lading_payload:: Config ,
139140 /// Defines the number of bytes that written in each log file.
141+ #[ deprecated( note = "Use load_profile bytes-per-second instead" ) ]
140142 bytes_per_second : Option < Byte > ,
141143 /// Defines the maximum internal cache of this log target. `file_gen` will
142144 /// pre-build its outputs up to the byte capacity specified here.
@@ -147,8 +149,9 @@ pub struct Config {
147149 /// Whether to use a fixed or streaming block cache
148150 #[ serde( default = "lading_payload::block::default_cache_method" ) ]
149151 block_cache_method : block:: CacheMethod ,
150- /// The load throttle configuration
151- pub throttle : Option < BytesThrottleConfig > ,
152+ /// Throughput profile controlling emission rate (bytes or blocks).
153+ #[ serde( default ) ]
154+ pub load_profile : Option < ThrottleConfig > ,
152155}
153156
154157#[ derive( Debug ) ]
@@ -214,8 +217,13 @@ impl Server {
214217 let mut handles = Vec :: new ( ) ;
215218
216219 for idx in 0 ..config. concurrent_logs {
217- let throttle =
218- create_throttle ( config. throttle . as_ref ( ) , config. bytes_per_second . as_ref ( ) ) ?;
220+ let legacy_bps = {
221+ #[ allow( deprecated) ]
222+ {
223+ config. bytes_per_second . as_ref ( )
224+ }
225+ } ;
226+ let throughput_throttle = create_throttle ( config. load_profile . as_ref ( ) , legacy_bps) ?;
219227
220228 let mut dir_path = config. root . clone ( ) ;
221229 let depth = rng. random_range ( 0 ..config. max_depth ) ;
@@ -234,8 +242,9 @@ impl Server {
234242 & basename,
235243 config. total_rotations ,
236244 maximum_bytes_per_log,
245+ maximum_block_size. get ( ) ,
237246 Arc :: clone ( & block_cache) ,
238- throttle ,
247+ throughput_throttle ,
239248 shutdown. clone ( ) ,
240249 child_labels,
241250 ) ;
@@ -283,8 +292,9 @@ struct Child {
283292 names : Vec < PathBuf > ,
284293 // The soft limit bytes per file that will trigger a rotation.
285294 maximum_bytes_per_log : NonZeroU32 ,
295+ maximum_block_size : u32 ,
286296 block_cache : Arc < block:: Cache > ,
287- throttle : lading_throttle :: Throttle ,
297+ throttle : BlockThrottle ,
288298 shutdown : lading_signal:: Watcher ,
289299 labels : Vec < ( String , String ) > ,
290300}
@@ -295,8 +305,9 @@ impl Child {
295305 basename : & Path ,
296306 total_rotations : u8 ,
297307 maximum_bytes_per_log : NonZeroU32 ,
308+ maximum_block_size : u32 ,
298309 block_cache : Arc < block:: Cache > ,
299- throttle : lading_throttle :: Throttle ,
310+ throttle : BlockThrottle ,
300311 shutdown : lading_signal:: Watcher ,
301312 labels : Vec < ( String , String ) > ,
302313 ) -> Self {
@@ -316,6 +327,7 @@ impl Child {
316327 Self {
317328 names,
318329 maximum_bytes_per_log,
330+ maximum_block_size,
319331 block_cache,
320332 throttle,
321333 shutdown,
@@ -324,7 +336,11 @@ impl Child {
324336 }
325337
326338 async fn spin ( mut self ) -> Result < ( ) , Error > {
327- let buffer_capacity = self . throttle . maximum_capacity ( ) as usize ;
339+ let mut handle = self . block_cache . handle ( ) ;
340+ let buffer_capacity = match self . throttle . mode {
341+ ThrottleMode :: Bytes => self . throttle . maximum_capacity ( ) as usize ,
342+ ThrottleMode :: Blocks => self . maximum_block_size as usize ,
343+ } ;
328344 let mut total_bytes_written: u64 = 0 ;
329345 let maximum_bytes_per_log: u64 = u64:: from ( self . maximum_bytes_per_log . get ( ) ) ;
330346
@@ -357,21 +373,16 @@ impl Child {
357373 } ) ?,
358374 ) ;
359375
360- let mut handle = self . block_cache . handle ( ) ;
361-
362376 let shutdown_wait = self . shutdown . recv ( ) ;
363377 tokio:: pin!( shutdown_wait) ;
364378 loop {
365379 // SAFETY: By construction the block cache will never be empty
366380 // except in the event of a catastrophic failure.
367- let total_bytes = self . block_cache . peek_next_size ( & handle) ;
368-
369381 tokio:: select! {
370- result = self . throttle. wait_for ( total_bytes ) => {
382+ result = self . throttle. wait_for_block ( & self . block_cache , & handle ) => {
371383 match result {
372384 Ok ( ( ) ) => {
373- let block = self . block_cache. advance( & mut handle) ;
374- write_bytes( block,
385+ write_bytes( self . block_cache. advance( & mut handle) ,
375386 & mut fp,
376387 & mut total_bytes_written,
377388 buffer_capacity,
0 commit comments