Skip to content

Commit 9dd389a

Browse files
authored
Review load-test configs (#19)
* fix(lt): dynamic engine load-test * chore(lt): update oneshot stress test config * feat(lt): review loadtest configs
1 parent d6ff2e5 commit 9dd389a

20 files changed

+625
-49
lines changed

crates/nodes/src/audio/codecs/opus.rs

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -512,11 +512,6 @@ impl ProcessorNode for OpusEncoderNode {
512512

513513
// Only clone if padding is needed, otherwise use slice directly
514514
let encode_result = if samples.len() < expected_samples {
515-
tracing::debug!(
516-
"Padding frame from {} to {} samples with silence",
517-
samples.len(),
518-
expected_samples
519-
);
520515
let mut padded = samples.as_ref().to_vec();
521516
padded.resize(expected_samples, 0.0);
522517
enc.encode_float(&padded, &mut encode_buffer)

crates/nodes/src/containers/ogg.rs

Lines changed: 11 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -208,12 +208,13 @@ impl ProcessorNode for OggMuxerNode {
208208
if let Packet::Binary { data, metadata, .. } = packet {
209209
packet_count += 1;
210210
stats_tracker.received();
211-
212-
tracing::debug!(
213-
"OggMuxer received packet #{}, {} bytes",
214-
packet_count,
215-
data.len()
216-
);
211+
if packet_count.is_multiple_of(1000) {
212+
tracing::debug!(
213+
"OggMuxer processed {} packets (last packet: {} bytes)",
214+
packet_count,
215+
data.len()
216+
);
217+
}
217218

218219
// Force every packet to end a page for maximum streaming behavior.
219220
// This allows chunk_size to work as expected by ensuring
@@ -229,21 +230,10 @@ impl ProcessorNode for OggMuxerNode {
229230
if let Some(timestamp_us) = meta.timestamp_us {
230231
// Convert timestamp from microseconds to 48kHz samples
231232
last_granule_pos = (timestamp_us * 48000) / 1_000_000;
232-
tracing::debug!(
233-
"Using metadata timestamp: {}us -> granule_pos: {}",
234-
timestamp_us,
235-
last_granule_pos
236-
);
237233
} else if let Some(duration_us) = meta.duration_us {
238234
// If we don't have timestamp but have duration, accumulate
239235
let samples = (duration_us * 48000) / 1_000_000;
240236
last_granule_pos += samples;
241-
tracing::debug!(
242-
"Using metadata duration: {}us ({} samples) -> granule_pos: {}",
243-
duration_us,
244-
samples,
245-
last_granule_pos
246-
);
247237
} else {
248238
// Fallback: assume 960 samples (20ms at 48kHz)
249239
last_granule_pos = 960 * packet_count;
@@ -253,11 +243,6 @@ impl ProcessorNode for OggMuxerNode {
253243
last_granule_pos = 960 * packet_count;
254244
}
255245

256-
tracing::debug!(
257-
"About to write packet #{} to OGG writer (granule: {})",
258-
packet_count,
259-
last_granule_pos
260-
);
261246
if let Err(e) = writer.write_packet(
262247
data.to_vec(),
263248
self.config.stream_serial,
@@ -270,7 +255,6 @@ impl ProcessorNode for OggMuxerNode {
270255
state_helpers::emit_failed(&context.state_tx, &node_name, &err_msg);
271256
return Err(StreamKitError::Runtime(err_msg));
272257
}
273-
tracing::debug!("Packet #{} written to OGG writer successfully", packet_count);
274258

275259
// Flush any bytes accumulated by the Ogg writer immediately to maximize streaming.
276260
// This avoids buffering large chunks in memory and delivers data as soon as pages are ready.
@@ -288,7 +272,6 @@ impl ProcessorNode for OggMuxerNode {
288272
};
289273

290274
if let Some(data) = data_to_send {
291-
tracing::trace!("Flushing {} bytes to output", data.len());
292275
if context
293276
.output_sender
294277
.send(
@@ -468,7 +451,7 @@ impl ProcessorNode for OggDemuxerNode {
468451
});
469452

470453
// Process packets from the async reader
471-
let mut packets_extracted = 0;
454+
let mut packets_extracted = 0u64;
472455
let mut last_granule_pos: Option<u64> = None;
473456
let mut packets_at_granule_pos = 0u64;
474457
let mut detected_frame_duration_us: Option<u64> = None;
@@ -494,6 +477,9 @@ impl ProcessorNode for OggDemuxerNode {
494477
Ok(packet) => {
495478
packets_extracted += 1;
496479
stats_tracker.received();
480+
if packets_extracted.is_multiple_of(1000) {
481+
tracing::debug!("OggDemuxer extracted {} packets", packets_extracted);
482+
}
497483

498484
// Extract granule position for timing metadata
499485
let granule_pos = packet.absgp_page();
@@ -559,14 +545,6 @@ impl ProcessorNode for OggDemuxerNode {
559545
None
560546
};
561547

562-
tracing::debug!(
563-
"Extracted Ogg packet {} with {} bytes (granule_pos: {}, metadata: {:?})",
564-
packets_extracted,
565-
packet.data.len(),
566-
granule_pos,
567-
metadata
568-
);
569-
570548
// Send the packet data to the output with timing metadata
571549
let output_packet = Packet::Binary {
572550
data: Bytes::from(packet.data),
@@ -817,7 +795,6 @@ impl ProcessorNode for SymphoniaOggDemuxerNode {
817795
let data_tx = data_tx;
818796
while let Some(packet) = input_rx.recv().await {
819797
if let Packet::Binary { data, .. } = packet {
820-
tracing::debug!("Forwarding {} bytes to Symphonia reader", data.len());
821798
if data_tx.send(data).await.is_err() {
822799
break;
823800
}

docs/astro.config.mjs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ export default defineConfig({
5757
items: [
5858
{ label: 'Creating Pipelines', slug: 'guides/creating-pipelines' },
5959
{ label: 'Performance Tuning', slug: 'guides/performance' },
60+
{ label: 'Load Testing', slug: 'guides/load-testing' },
6061
{ label: 'Observability', slug: 'guides/observability' },
6162
{ label: 'Script Node', slug: 'guides/script-node' },
6263
{ label: 'Using the Web UI', slug: 'guides/web-ui' },
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
---
2+
# SPDX-FileCopyrightText: © 2025 StreamKit Contributors
3+
# SPDX-License-Identifier: MPL-2.0
4+
title: Load Testing
5+
description: Run targeted stress tests and capture profiles for StreamKit
6+
---
7+
8+
StreamKit ships with a small load-test runner (`skit-cli loadtest`) plus a set of ready-made configs under `samples/loadtest/`.
9+
10+
Use these when you want to:
11+
12+
- Reproduce and profile a specific hotspot (codec, muxing, mixing, control plane, session lifecycle).
13+
- Compare changes across runs with consistent inputs.
14+
- Stress a single subsystem without the UI in the loop.
15+
16+
## Prerequisites
17+
18+
- Start the server: `just skit serve`
19+
- Load tests use the client binary: `just skit-cli -- lt <config>`
20+
- Some presets require a local MoQ relay at `http://localhost:4443`
21+
22+
## Running Presets
23+
24+
All configs live in `samples/loadtest/`. You can run them via a single `just` target:
25+
26+
- `just lt <id>` runs `samples/loadtest/<id>.toml`
27+
- `just lt <path>` runs an explicit TOML path
28+
29+
Examples:
30+
31+
- `just lt stress-oneshot`
32+
- `just lt oneshot-opus-transcode-fast`
33+
- `just lt dynamic-tune-heavy --cleanup`
34+
35+
### Oneshot (HTTP batch pipelines)
36+
37+
- `just lt stress-oneshot` — default oneshot stress preset
38+
- `just lt oneshot-http-passthrough` — multipart upload + oneshot engine overhead (minimal node CPU)
39+
- `just lt oneshot-graph-chain` — graph wiring + channel hops (passthrough chain)
40+
- `just lt oneshot-opus-transcode-fast` — codec-heavy (Ogg demux + Opus decode/encode), no pacer
41+
42+
### Dynamic (long-lived sessions)
43+
44+
- `just lt stress-dynamic` — default dynamic stress preset
45+
- `just lt dynamic-scale-audio-gain` — many sessions, sustained decode, low tune rate
46+
- `just lt dynamic-tune-heavy` — stresses control plane param updates (frequent tuning, many `audio::gain` nodes)
47+
- `just lt dynamic-moq-fanout` — MoQ fanout (requires relay at `http://localhost:4443`)
48+
49+
## Capturing CPU Profiles
50+
51+
The easiest workflow is:
52+
53+
1. Run a profiling build of the server: `just skit-profiling serve`
54+
2. Run a load test preset in another terminal (examples above)
55+
3. Fetch profiles:
56+
- Top view: `just profile-top 30`
57+
- Web UI: `just profile-web 30`
58+
59+
`profile-*` commands require Go (`go tool pprof`).
60+
61+
## What Each Preset Targets
62+
63+
### `oneshot-http-passthrough`
64+
65+
Targets request handling and oneshot overhead:
66+
67+
- Multipart parsing and streaming input
68+
- Pipeline compilation/validation
69+
- Graph wiring/spawn + channel plumbing
70+
71+
### `oneshot-opus-transcode-fast`
72+
73+
Targets codec throughput:
74+
75+
- `containers::ogg::demuxer` (parsing)
76+
- `audio::opus::{decoder,encoder}`
77+
78+
This intentionally runs “as fast as possible” (no pacer), so it’s useful for CPU profiling and throughput regressions.
79+
80+
### `dynamic-tune-heavy`
81+
82+
Targets control-plane churn:
83+
84+
- Session creation churn (up to `dynamic.session_count`)
85+
- Control WebSocket tuning rate (`dynamic.tune_interval_ms`)
86+
- Parameter updates to many `audio::gain` nodes
87+
88+
### `dynamic-moq-fanout`
89+
90+
Targets MoQ transport + data plane in dynamic sessions:
91+
92+
- One broadcaster session publishes to `input`
93+
- Many subscriber sessions subscribe/transcode/publish
94+
95+
## Writing Your Own Config
96+
97+
Configs are TOML and validated by `skit-cli` before running:
98+
99+
- Pick a scenario: `test.scenario = "oneshot" | "dynamic" | "mixed"`
100+
- Point to a pipeline YAML for that scenario
101+
- Adjust `oneshot.concurrency` or `dynamic.session_count`
102+
103+
See `apps/skit-cli/src/load_test/config.rs` for the full schema.
104+
105+
## Tips
106+
107+
- For profiling, keep logging quiet (e.g. `RUST_LOG=warn`) to avoid measuring log formatting instead of pipeline CPU.
108+
- For dynamic tests, pass `--cleanup` when you want sessions deleted at the end, e.g. `just lt stress-dynamic --cleanup` (or the shortcut `just lt-dynamic-cleanup`).
109+
- Prefer small input files for high-throughput profiling (e.g. `samples/audio/system/speech_2m.opus`) and larger files for sustained steady-state load.

justfile

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -99,25 +99,43 @@ skit-cli *args='':
9999
skit-lt config='loadtest.toml' *args='':
100100
@cargo run -p streamkit-client --bin skit-cli -- loadtest {{config}} {{args}}
101101

102-
# Alias for skit-lt
103-
alias lt := skit-lt
102+
# Run a load test by preset id (maps to `samples/loadtest/<id>.toml`) or by explicit path.
103+
#
104+
# Examples:
105+
# - `just lt` # runs `samples/loadtest/stress-oneshot.toml` by default
106+
# - `just lt stress-dynamic` # runs `samples/loadtest/stress-dynamic.toml`
107+
# - `just lt dynamic-tune-heavy --cleanup`
108+
# - `just lt samples/loadtest/ui-demo.toml`
109+
lt preset_or_path='stress-oneshot' *args='':
110+
@cfg=""
111+
@if [ -f "{{preset_or_path}}" ]; then \
112+
cfg="{{preset_or_path}}"; \
113+
elif [ -f "samples/loadtest/{{preset_or_path}}.toml" ]; then \
114+
cfg="samples/loadtest/{{preset_or_path}}.toml"; \
115+
else \
116+
echo "❌ Loadtest config not found: '{{preset_or_path}}'"; \
117+
echo " - If passing a preset, expected: samples/loadtest/{{preset_or_path}}.toml"; \
118+
echo " - If passing a path, ensure the file exists"; \
119+
exit 1; \
120+
fi; \
121+
just skit-lt "$cfg" {{args}}
104122

105123
# --- Load test presets ---
106124
# Run the standard oneshot stress test config
107125
lt-oneshot *args='':
108-
@just skit-lt samples/loadtest/stress-oneshot.toml {{args}}
126+
@just lt stress-oneshot {{args}}
109127

110128
# Run the standard dynamic session stress test config
111129
lt-dynamic *args='':
112-
@just skit-lt samples/loadtest/stress-dynamic.toml {{args}}
130+
@just lt stress-dynamic {{args}}
113131

114132
# Run the standard dynamic session stress test config with cleanup enabled
115133
lt-dynamic-cleanup *args='':
116-
@just skit-lt samples/loadtest/stress-dynamic.toml --cleanup {{args}}
134+
@just lt stress-dynamic --cleanup {{args}}
117135

118136
# Run the long-running UI demo config
119137
lt-ui-demo *args='':
120-
@just skit-lt samples/loadtest/ui-demo.toml {{args}}
138+
@just lt ui-demo {{args}}
121139

122140
# Run skit tests
123141
# Note: We exclude dhat-heap since it's mutually exclusive with profiling (both define global allocators)
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
# Load Test Configuration: Dynamic MoQ Fanout
2+
# Requires a MoQ relay at http://localhost:4443.
3+
# Creates a broadcaster session publishing to "input", then many subscriber sessions that transcode.
4+
5+
[server]
6+
url = "http://127.0.0.1:4545"
7+
8+
[test]
9+
duration_secs = 180
10+
scenario = "dynamic"
11+
12+
[oneshot]
13+
enabled = false
14+
concurrency = 0
15+
pipeline = ""
16+
input_file = ""
17+
18+
[dynamic]
19+
enabled = true
20+
session_count = 100
21+
tune_interval_ms = 1000
22+
pipelines = [
23+
"samples/loadtest/pipelines/moq_subscriber_transcode.yml",
24+
]
25+
26+
[dynamic.broadcaster]
27+
pipeline = "samples/loadtest/pipelines/moq_broadcaster.yml"
28+
count = 1
29+
30+
[populate]
31+
load_plugins = false
32+
plugins_native = []
33+
plugins_wasm = []
34+
35+
[output]
36+
format = "text"
37+
real_time_updates = true
38+
update_interval_ms = 2000
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
# Load Test Configuration: Dynamic Scale (Audio Gain + Sink)
2+
# Stresses dynamic engine graph management and session lifecycle at high session counts.
3+
4+
[server]
5+
url = "http://127.0.0.1:4545"
6+
7+
[test]
8+
duration_secs = 180
9+
scenario = "dynamic"
10+
11+
[oneshot]
12+
enabled = false
13+
concurrency = 0
14+
pipeline = ""
15+
input_file = ""
16+
17+
[dynamic]
18+
enabled = true
19+
session_count = 200
20+
tune_interval_ms = 5000
21+
pipelines = [
22+
"samples/loadtest/pipelines/dynamic_audio_gain_chain.yml",
23+
]
24+
25+
[populate]
26+
load_plugins = false
27+
plugins_native = []
28+
plugins_wasm = []
29+
30+
[output]
31+
format = "text"
32+
real_time_updates = true
33+
update_interval_ms = 2000
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
# Load Test Configuration: Dynamic Tune Heavy (Many Gain Nodes)
2+
# Stresses the control WebSocket + node param updates under sustained tuning.
3+
4+
[server]
5+
url = "http://127.0.0.1:4545"
6+
7+
[test]
8+
duration_secs = 180
9+
scenario = "dynamic"
10+
11+
[oneshot]
12+
enabled = false
13+
concurrency = 0
14+
pipeline = ""
15+
input_file = ""
16+
17+
[dynamic]
18+
enabled = true
19+
session_count = 50
20+
tune_interval_ms = 200
21+
pipelines = [
22+
"samples/loadtest/pipelines/dynamic_many_gains.yml",
23+
]
24+
25+
[populate]
26+
load_plugins = false
27+
plugins_native = []
28+
plugins_wasm = []
29+
30+
[output]
31+
format = "text"
32+
real_time_updates = true
33+
update_interval_ms = 2000

0 commit comments

Comments
 (0)