Skip to content

Commit 10ba2f9

Browse files
committed
blocks
1 parent 7a819ad commit 10ba2f9

File tree

15 files changed

+1301
-232
lines changed

15 files changed

+1301
-232
lines changed

lading/src/generator/common.rs

Lines changed: 740 additions & 93 deletions
Large diffs are not rendered by default.

lading/src/generator/file_gen/logrotate.rs

Lines changed: 31 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,8 @@ use lading_payload::block;
3636

3737
use super::General;
3838
use crate::generator::common::{
39-
BytesThrottleConfig, MetricsBuilder, ThrottleConversionError, create_throttle,
39+
BlockThrottle, MetricsBuilder, ThrottleConfig, ThrottleConversionError, ThrottleMode,
40+
create_throttle,
4041
};
4142

4243
/// An enum to allow us to determine what operation caused an IO errror as the
@@ -137,6 +138,7 @@ pub struct Config {
137138
/// Sets the [`crate::payload::Config`] of this template.
138139
pub variant: lading_payload::Config,
139140
/// Defines the number of bytes that written in each log file.
141+
#[deprecated(note = "Use load_profile bytes-per-second instead")]
140142
bytes_per_second: Option<Byte>,
141143
/// Defines the maximum internal cache of this log target. `file_gen` will
142144
/// pre-build its outputs up to the byte capacity specified here.
@@ -147,8 +149,9 @@ pub struct Config {
147149
/// Whether to use a fixed or streaming block cache
148150
#[serde(default = "lading_payload::block::default_cache_method")]
149151
block_cache_method: block::CacheMethod,
150-
/// The load throttle configuration
151-
pub throttle: Option<BytesThrottleConfig>,
152+
/// Throughput profile controlling emission rate (bytes or blocks).
153+
#[serde(default)]
154+
pub load_profile: Option<ThrottleConfig>,
152155
}
153156

154157
#[derive(Debug)]
@@ -214,8 +217,13 @@ impl Server {
214217
let mut handles = Vec::new();
215218

216219
for idx in 0..config.concurrent_logs {
217-
let throttle =
218-
create_throttle(config.throttle.as_ref(), config.bytes_per_second.as_ref())?;
220+
let legacy_bps = {
221+
#[allow(deprecated)]
222+
{
223+
config.bytes_per_second.as_ref()
224+
}
225+
};
226+
let throughput_throttle = create_throttle(config.load_profile.as_ref(), legacy_bps)?;
219227

220228
let mut dir_path = config.root.clone();
221229
let depth = rng.random_range(0..config.max_depth);
@@ -234,8 +242,9 @@ impl Server {
234242
&basename,
235243
config.total_rotations,
236244
maximum_bytes_per_log,
245+
maximum_block_size.get(),
237246
Arc::clone(&block_cache),
238-
throttle,
247+
throughput_throttle,
239248
shutdown.clone(),
240249
child_labels,
241250
);
@@ -283,8 +292,9 @@ struct Child {
283292
names: Vec<PathBuf>,
284293
// The soft limit bytes per file that will trigger a rotation.
285294
maximum_bytes_per_log: NonZeroU32,
295+
maximum_block_size: u32,
286296
block_cache: Arc<block::Cache>,
287-
throttle: lading_throttle::Throttle,
297+
throttle: BlockThrottle,
288298
shutdown: lading_signal::Watcher,
289299
labels: Vec<(String, String)>,
290300
}
@@ -295,8 +305,9 @@ impl Child {
295305
basename: &Path,
296306
total_rotations: u8,
297307
maximum_bytes_per_log: NonZeroU32,
308+
maximum_block_size: u32,
298309
block_cache: Arc<block::Cache>,
299-
throttle: lading_throttle::Throttle,
310+
throttle: BlockThrottle,
300311
shutdown: lading_signal::Watcher,
301312
labels: Vec<(String, String)>,
302313
) -> Self {
@@ -316,6 +327,7 @@ impl Child {
316327
Self {
317328
names,
318329
maximum_bytes_per_log,
330+
maximum_block_size,
319331
block_cache,
320332
throttle,
321333
shutdown,
@@ -324,7 +336,11 @@ impl Child {
324336
}
325337

326338
async fn spin(mut self) -> Result<(), Error> {
327-
let buffer_capacity = self.throttle.maximum_capacity() as usize;
339+
let mut handle = self.block_cache.handle();
340+
let buffer_capacity = match self.throttle.mode {
341+
ThrottleMode::Bytes => self.throttle.maximum_capacity() as usize,
342+
ThrottleMode::Blocks => self.maximum_block_size as usize,
343+
};
328344
let mut total_bytes_written: u64 = 0;
329345
let maximum_bytes_per_log: u64 = u64::from(self.maximum_bytes_per_log.get());
330346

@@ -357,21 +373,16 @@ impl Child {
357373
})?,
358374
);
359375

360-
let mut handle = self.block_cache.handle();
361-
362376
let shutdown_wait = self.shutdown.recv();
363377
tokio::pin!(shutdown_wait);
364378
loop {
365379
// SAFETY: By construction the block cache will never be empty
366380
// except in the event of a catastrophic failure.
367-
let total_bytes = self.block_cache.peek_next_size(&handle);
368-
369381
tokio::select! {
370-
result = self.throttle.wait_for(total_bytes) => {
382+
result = self.throttle.wait_for_block(&self.block_cache, &handle) => {
371383
match result {
372384
Ok(()) => {
373-
let block = self.block_cache.advance(&mut handle);
374-
write_bytes(block,
385+
write_bytes(self.block_cache.advance(&mut handle),
375386
&mut fp,
376387
&mut total_bytes_written,
377388
buffer_capacity,
@@ -382,7 +393,10 @@ impl Child {
382393
&self.labels).await?;
383394
}
384395
Err(err) => {
385-
error!("Throttle request of {} is larger than throttle capacity. Block will be discarded. Error: {}", total_bytes, err);
396+
let total_bytes = self.block_cache.peek_next_size(&handle);
397+
error!(
398+
"Throttle request for block size {total_bytes} failed. Block will be discarded. Error: {err}"
399+
);
386400
}
387401
}
388402
}

0 commit comments

Comments
 (0)