Skip to content

Commit a636e4c

Browse files
committed
blocks
1 parent af8d207 commit a636e4c

File tree

15 files changed

+943
-241
lines changed

15 files changed

+943
-241
lines changed

lading/src/generator/common.rs

Lines changed: 565 additions & 107 deletions
Large diffs are not rendered by default.

lading/src/generator/file_gen/logrotate.rs

Lines changed: 18 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ use lading_payload::block;
3636

3737
use super::General;
3838
use crate::generator::common::{
39-
BytesThrottleConfig, MetricsBuilder, ThrottleConversionError, create_throttle,
39+
BlockThrottle, MetricsBuilder, ThrottleConfig, ThrottleConversionError, create_throttle,
4040
};
4141

4242
/// An enum to allow us to determine what operation caused an IO errror as the
@@ -147,8 +147,9 @@ pub struct Config {
147147
/// Whether to use a fixed or streaming block cache
148148
#[serde(default = "lading_payload::block::default_cache_method")]
149149
block_cache_method: block::CacheMethod,
150-
/// The load throttle configuration
151-
pub throttle: Option<BytesThrottleConfig>,
150+
/// Throughput profile controlling emission rate (bytes or blocks).
151+
#[serde(default)]
152+
pub throttle: Option<ThrottleConfig>,
152153
}
153154

154155
#[derive(Debug)]
@@ -214,7 +215,7 @@ impl Server {
214215
let mut handles = Vec::new();
215216

216217
for idx in 0..config.concurrent_logs {
217-
let throttle =
218+
let throughput_throttle =
218219
create_throttle(config.throttle.as_ref(), config.bytes_per_second.as_ref())?;
219220

220221
let mut dir_path = config.root.clone();
@@ -234,8 +235,9 @@ impl Server {
234235
&basename,
235236
config.total_rotations,
236237
maximum_bytes_per_log,
238+
maximum_block_size.get(),
237239
Arc::clone(&block_cache),
238-
throttle,
240+
throughput_throttle,
239241
shutdown.clone(),
240242
child_labels,
241243
);
@@ -283,8 +285,9 @@ struct Child {
283285
names: Vec<PathBuf>,
284286
// The soft limit bytes per file that will trigger a rotation.
285287
maximum_bytes_per_log: NonZeroU32,
288+
maximum_block_size: u32,
286289
block_cache: Arc<block::Cache>,
287-
throttle: lading_throttle::Throttle,
290+
throttle: BlockThrottle,
288291
shutdown: lading_signal::Watcher,
289292
labels: Vec<(String, String)>,
290293
}
@@ -295,8 +298,9 @@ impl Child {
295298
basename: &Path,
296299
total_rotations: u8,
297300
maximum_bytes_per_log: NonZeroU32,
301+
maximum_block_size: u32,
298302
block_cache: Arc<block::Cache>,
299-
throttle: lading_throttle::Throttle,
303+
throttle: BlockThrottle,
300304
shutdown: lading_signal::Watcher,
301305
labels: Vec<(String, String)>,
302306
) -> Self {
@@ -316,6 +320,7 @@ impl Child {
316320
Self {
317321
names,
318322
maximum_bytes_per_log,
323+
maximum_block_size,
319324
block_cache,
320325
throttle,
321326
shutdown,
@@ -324,7 +329,10 @@ impl Child {
324329
}
325330

326331
async fn spin(mut self) -> Result<(), Error> {
327-
let buffer_capacity = self.throttle.maximum_capacity() as usize;
332+
let mut handle = self.block_cache.handle();
333+
let buffer_capacity = self
334+
.throttle
335+
.maximum_capacity_bytes(self.maximum_block_size);
328336
let mut total_bytes_written: u64 = 0;
329337
let maximum_bytes_per_log: u64 = u64::from(self.maximum_bytes_per_log.get());
330338

@@ -357,21 +365,16 @@ impl Child {
357365
})?,
358366
);
359367

360-
let mut handle = self.block_cache.handle();
361-
362368
let shutdown_wait = self.shutdown.recv();
363369
tokio::pin!(shutdown_wait);
364370
loop {
365371
// SAFETY: By construction the block cache will never be empty
366372
// except in the event of a catastrophic failure.
367-
let total_bytes = self.block_cache.peek_next_size(&handle);
368-
369373
tokio::select! {
370-
result = self.throttle.wait_for(total_bytes) => {
374+
result = self.throttle.wait_for_block(&self.block_cache, &handle) => {
371375
match result {
372376
Ok(()) => {
373-
let block = self.block_cache.advance(&mut handle);
374-
write_bytes(block,
377+
write_bytes(self.block_cache.advance(&mut handle),
375378
&mut fp,
376379
&mut total_bytes_written,
377380
buffer_capacity,

lading/src/generator/file_gen/logrotate_fs.rs

Lines changed: 142 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
#![allow(clippy::cast_possible_wrap)]
66

77
use crate::generator;
8+
use crate::generator::common::{RateSpec, ThrottleConversionError, ThrottleMode};
89
use fuser::{
910
BackgroundSession, FileAttr, Filesystem, MountOption, ReplyAttr, ReplyData, ReplyDirectory,
1011
ReplyEntry, Request, spawn_mount2,
@@ -55,37 +56,149 @@ pub struct Config {
5556
maximum_block_size: byte_unit::Byte,
5657
/// The mount-point for this filesystem
5758
mount_point: PathBuf,
58-
/// The load profile, controlling bytes per second as a function of time.
59+
/// The load profile, controlling bytes or blocks per second as a function of time.
5960
load_profile: LoadProfile,
6061
}
6162

6263
/// Profile for load in this filesystem.
6364
#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq)]
6465
#[serde(rename_all = "snake_case")]
6566
pub enum LoadProfile {
66-
/// Constant bytes per second
67-
Constant(byte_unit::Byte),
68-
/// Linear growth of bytes per second
67+
/// Constant rate (bytes or blocks per second).
68+
Constant(RateSpec),
69+
/// Linear growth of rate (bytes or blocks per second).
6970
Linear {
70-
/// Starting point for bytes per second
71-
initial_bytes_per_second: byte_unit::Byte,
72-
/// Amount to increase per second
73-
rate: byte_unit::Byte,
71+
/// Starting point for the rate.
72+
#[serde(alias = "initial_bytes_per_second")]
73+
initial: RateSpec,
74+
/// Amount to increase per second.
75+
rate: RateSpec,
7476
},
7577
}
7678

7779
impl LoadProfile {
78-
fn to_model(self) -> model::LoadProfile {
80+
fn to_model(self) -> Result<model::LoadProfile, ThrottleConversionError> {
7981
// For now, one tick is one second.
8082
match self {
81-
LoadProfile::Constant(bpt) => model::LoadProfile::Constant(bpt.as_u128() as u64),
82-
LoadProfile::Linear {
83-
initial_bytes_per_second,
84-
rate,
85-
} => model::LoadProfile::Linear {
86-
start: initial_bytes_per_second.as_u128() as u64,
87-
rate: rate.as_u128() as u64,
88-
},
83+
LoadProfile::Constant(rate) => {
84+
let (mode, cap) = resolve_rate(&rate)?;
85+
match mode {
86+
ThrottleMode::Bytes => Ok(model::LoadProfile::Constant(u64::from(cap.get()))),
87+
ThrottleMode::Blocks => Ok(model::LoadProfile::Blocks {
88+
blocks_per_tick: u64::from(cap.get()),
89+
}),
90+
}
91+
}
92+
LoadProfile::Linear { initial, rate } => {
93+
let (m1, init) = resolve_rate(&initial)?;
94+
let (m2, rate) = resolve_rate(&rate)?;
95+
if m1 != m2 {
96+
return Err(ThrottleConversionError::MixedModes);
97+
}
98+
match m1 {
99+
ThrottleMode::Bytes => Ok(model::LoadProfile::Linear {
100+
start: u64::from(init.get()),
101+
rate: u64::from(rate.get()),
102+
}),
103+
ThrottleMode::Blocks => Ok(model::LoadProfile::BlocksLinear {
104+
start: u64::from(init.get()),
105+
rate: u64::from(rate.get()),
106+
}),
107+
}
108+
}
109+
}
110+
}
111+
}
112+
113+
fn resolve_rate(rate: &RateSpec) -> Result<(ThrottleMode, NonZeroU32), ThrottleConversionError> {
114+
match rate {
115+
RateSpec::Bytes { bytes_per_second } => {
116+
let val = bytes_per_second.as_u128();
117+
let val = u32::try_from(val)
118+
.map_err(|_| ThrottleConversionError::ValueTooLarge(*bytes_per_second))?;
119+
NonZeroU32::new(val)
120+
.map(|n| (ThrottleMode::Bytes, n))
121+
.ok_or(ThrottleConversionError::Zero)
122+
}
123+
RateSpec::Blocks { blocks_per_second } => Ok((ThrottleMode::Blocks, *blocks_per_second)),
124+
}
125+
}
126+
127+
#[cfg(test)]
128+
mod tests {
129+
use super::{LoadProfile, RateSpec};
130+
use serde::Deserialize;
131+
132+
#[derive(Debug, Deserialize)]
133+
#[serde(deny_unknown_fields)]
134+
struct Wrapper {
135+
load_profile: LoadProfile,
136+
}
137+
138+
/// Helper to deserialize Wrapper using singleton_map_recursive
139+
/// (matches how the main config deserializes nested enums)
140+
fn parse_wrapper(yaml: &str) -> Wrapper {
141+
let value: serde_yaml::Value = serde_yaml::from_str(yaml).unwrap();
142+
serde_yaml::with::singleton_map_recursive::deserialize(value).unwrap()
143+
}
144+
145+
#[test]
146+
fn load_profile_constant_with_direct_byte_value() {
147+
// Tuple variant allows direct RateSpec values including legacy "5 MiB" format
148+
let yaml = r#"
149+
load_profile:
150+
constant: "5 MiB"
151+
"#;
152+
let w = parse_wrapper(yaml);
153+
assert!(matches!(
154+
w.load_profile,
155+
LoadProfile::Constant(RateSpec::Bytes { .. })
156+
));
157+
}
158+
159+
#[test]
160+
fn load_profile_linear_using_aliases() {
161+
// Tests legacy field name aliases: initial_bytes_per_second -> initial, rate -> rate
162+
let yaml = r#"
163+
load_profile:
164+
linear:
165+
initial_bytes_per_second: "1 MiB"
166+
rate: "100 KiB"
167+
"#;
168+
let w = parse_wrapper(yaml);
169+
assert!(matches!(w.load_profile, LoadProfile::Linear { .. }));
170+
if let LoadProfile::Linear { initial, rate } = w.load_profile {
171+
assert!(matches!(initial, RateSpec::Bytes { .. }));
172+
assert!(matches!(rate, RateSpec::Bytes { .. }));
173+
}
174+
}
175+
176+
#[test]
177+
fn load_profile_new_constant_rate_spec_is_supported() {
178+
let yaml = r#"
179+
load_profile:
180+
constant:
181+
blocks_per_second: 100
182+
"#;
183+
let w = parse_wrapper(yaml);
184+
assert!(matches!(w.load_profile, LoadProfile::Constant(_)));
185+
}
186+
187+
#[test]
188+
fn load_profile_new_linear_blocks_is_supported() {
189+
let yaml = r#"
190+
load_profile:
191+
linear:
192+
initial:
193+
blocks_per_second: 100
194+
rate:
195+
blocks_per_second: 10
196+
"#;
197+
let w = parse_wrapper(yaml);
198+
assert!(matches!(w.load_profile, LoadProfile::Linear { .. }));
199+
if let LoadProfile::Linear { initial, rate } = w.load_profile {
200+
assert!(matches!(initial, RateSpec::Blocks { .. }));
201+
assert!(matches!(rate, RateSpec::Blocks { .. }));
89202
}
90203
}
91204
}
@@ -99,6 +212,9 @@ pub enum Error {
99212
/// Creation of payload blocks failed.
100213
#[error("Block creation error: {0}")]
101214
Block(#[from] block::Error),
215+
/// Throttle conversion error
216+
#[error("Throttle configuration error: {0}")]
217+
ThrottleConversion(#[from] ThrottleConversionError),
102218
/// Failed to convert, value is 0
103219
#[error("Value provided must not be zero")]
104220
Zero,
@@ -154,10 +270,18 @@ impl Server {
154270
// divvy this up in the future.
155271
total_bytes.get() as usize,
156272
)?;
273+
let load_profile = config.load_profile.to_model()?;
157274

158275
let start_time = Instant::now();
159276
let start_time_system = SystemTime::now();
160277

278+
let block_cache_size = block_cache.total_size();
279+
info!(
280+
"LogrotateFS block cache initialized: requested={}, actual={} bytes, blocks={}",
281+
config.maximum_prebuild_cache_size_bytes,
282+
block_cache_size,
283+
block_cache.len()
284+
);
161285
let state = model::State::new(
162286
&mut rng,
163287
start_time.elapsed().as_secs(),
@@ -166,7 +290,7 @@ impl Server {
166290
block_cache,
167291
config.max_depth,
168292
config.concurrent_logs,
169-
config.load_profile.to_model(),
293+
load_profile,
170294
);
171295

172296
info!(

0 commit comments

Comments
 (0)