Skip to content

Commit e50f540

Browse files
authored
fix: stabilize load tests and monitoring (#23)
1 parent befa722 commit e50f540

File tree

11 files changed

+239
-25
lines changed

11 files changed

+239
-25
lines changed

apps/skit-cli/src/load_test/mod.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ use crate::load_test::scenarios::{run_dynamic_scenario, run_mixed_scenario, run_
2929
pub async fn run_load_test(
3030
config_path: &str,
3131
server_override: Option<String>,
32+
sessions_override: Option<usize>,
3233
duration_override: Option<u64>,
3334
cleanup: bool,
3435
) -> Result<()> {
@@ -43,13 +44,29 @@ pub async fn run_load_test(
4344
if let Some(duration) = duration_override {
4445
config.test.duration_secs = duration;
4546
}
47+
if let Some(sessions) = sessions_override {
48+
if sessions == 0 {
49+
anyhow::bail!("--sessions must be > 0");
50+
}
51+
match config.test.scenario {
52+
Scenario::Dynamic | Scenario::Mixed => {
53+
config.dynamic.session_count = sessions;
54+
},
55+
Scenario::OneShot => {
56+
warn!("Ignoring --sessions because test.scenario=oneshot");
57+
},
58+
}
59+
}
4660

4761
config.validate()?;
4862

4963
info!("Load test configuration loaded from: {}", config_path);
5064
info!("Server: {}", config.server.url);
5165
info!("Scenario: {:?}", config.test.scenario);
5266
info!("Duration: {}s", config.test.duration_secs);
67+
if matches!(config.test.scenario, Scenario::Dynamic | Scenario::Mixed) {
68+
info!("Dynamic sessions: {}", config.dynamic.session_count);
69+
}
5370

5471
// Set up graceful shutdown handler
5572
let shutdown_token = tokio_util::sync::CancellationToken::new();

apps/skit-cli/src/main.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,11 @@ enum Commands {
8484
/// Override server URL from config
8585
#[arg(long)]
8686
server: Option<String>,
87+
/// Override dynamic.session_count from config
88+
///
89+
/// Useful for quickly scaling down presets like `stress-dynamic` on laptops.
90+
#[arg(long)]
91+
sessions: Option<usize>,
8792
/// Override test duration (seconds)
8893
#[arg(short, long)]
8994
duration: Option<u64>,
@@ -382,7 +387,7 @@ async fn main() {
382387
std::process::exit(1);
383388
}
384389
},
385-
Commands::LoadTest { config_path, config, server, duration, cleanup } => {
390+
Commands::LoadTest { config_path, config, server, sessions, duration, cleanup } => {
386391
info!("Starting StreamKit load test");
387392

388393
let config = match (config_path, config) {
@@ -397,7 +402,7 @@ async fn main() {
397402
};
398403

399404
if let Err(e) =
400-
streamkit_client::run_load_test(&config, server, duration, cleanup).await
405+
streamkit_client::run_load_test(&config, server, sessions, duration, cleanup).await
401406
{
402407
// Error already logged via tracing above
403408
error!(error = %e, "Load test failed");

apps/skit-cli/src/shell.rs

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -636,7 +636,7 @@ impl Shell {
636636
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
637637
if args.is_empty() {
638638
eprintln!(
639-
"Usage: loadtest <config.toml> [--server <url>] [--duration <seconds>] [--cleanup]"
639+
"Usage: loadtest <config.toml> [--server <url>] [--sessions <n>] [--duration <seconds>] [--cleanup]"
640640
);
641641
eprintln!("Example: loadtest samples/loadtest/stress-moq-peer.toml --duration 30");
642642
return Ok(());
@@ -654,6 +654,7 @@ impl Shell {
654654
}
655655

656656
let mut server_override = None;
657+
let mut sessions_override = None;
657658
let mut duration_override = None;
658659
let mut cleanup = false;
659660

@@ -684,6 +685,20 @@ impl Shell {
684685
return Ok(());
685686
}
686687
},
688+
"--sessions" => {
689+
if i + 1 < args.len() {
690+
if let Ok(sessions) = args[i + 1].parse::<usize>() {
691+
sessions_override = Some(sessions);
692+
i += 2;
693+
} else {
694+
eprintln!("--sessions requires a numeric value");
695+
return Ok(());
696+
}
697+
} else {
698+
eprintln!("--sessions requires a value");
699+
return Ok(());
700+
}
701+
},
687702
"--cleanup" => {
688703
cleanup = true;
689704
i += 1;
@@ -701,6 +716,7 @@ impl Shell {
701716
match crate::load_test::run_load_test(
702717
config_path,
703718
server_override,
719+
sessions_override,
704720
duration_override,
705721
cleanup,
706722
)

crates/engine/src/dynamic_actor.rs

Lines changed: 29 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -303,8 +303,22 @@ impl DynamicEngine {
303303

304304
// Broadcast to all subscribers
305305
self.state_subscribers.retain(|subscriber| {
306-
// If send fails, the subscriber has disconnected, so we remove it
307-
subscriber.try_send(update.clone()).is_ok()
306+
// Keep subscribers on transient backpressure (Full); remove only when Closed.
307+
//
308+
// For state updates we also try to deliver eventually: dropping a state transition
309+
// (e.g. Running -> Recovering) can leave clients showing a stale "healthy" status.
310+
match subscriber.try_send(update.clone()) {
311+
Ok(()) => true,
312+
Err(mpsc::error::TrySendError::Full(_)) => {
313+
let subscriber = subscriber.clone();
314+
let update = update.clone();
315+
tokio::spawn(async move {
316+
let _ = subscriber.send(update).await;
317+
});
318+
true
319+
},
320+
Err(mpsc::error::TrySendError::Closed(_)) => false,
321+
}
308322
});
309323
}
310324

@@ -362,8 +376,13 @@ impl DynamicEngine {
362376

363377
// Broadcast to all subscribers
364378
self.stats_subscribers.retain(|subscriber| {
365-
// If send fails, the subscriber has disconnected, so we remove it
366-
subscriber.try_send(update.clone()).is_ok()
379+
// Keep subscribers on transient backpressure (Full); remove only when Closed.
380+
//
381+
// Stats are high-frequency, best-effort updates; dropping an update is acceptable.
382+
match subscriber.try_send(update.clone()) {
383+
Ok(()) | Err(mpsc::error::TrySendError::Full(_)) => true,
384+
Err(mpsc::error::TrySendError::Closed(_)) => false,
385+
}
367386
});
368387
}
369388

@@ -463,10 +482,12 @@ impl DynamicEngine {
463482
};
464483

465484
// 5. Spawn Node
466-
let task_handle =
467-
tokio::spawn(node.run(context).instrument(
468-
tracing::info_span!("node_run", node.name = %node_id, node.kind = %kind),
469-
));
485+
let task_handle = tokio::spawn(node.run(context).instrument(tracing::info_span!(
486+
"node_run",
487+
session.id = %self.session_id.as_deref().unwrap_or("<unknown>"),
488+
node.name = %node_id,
489+
node.kind = %kind
490+
)));
470491
self.live_nodes
471492
.insert(node_id.to_string(), graph_builder::LiveNode { control_tx, task_handle });
472493
self.nodes_active_gauge.record(self.live_nodes.len() as u64, &[]);

crates/nodes/src/transport/moq/pull.rs

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -541,6 +541,8 @@ impl MoqPullNode {
541541
let mut current_group: Option<moq_lite::GroupConsumer> = None;
542542

543543
let mut session_packet_count: u32 = 0;
544+
let mut consecutive_cancels: u32 = 0;
545+
let mut last_payload_at = tokio::time::Instant::now();
544546

545547
// Stats tracking
546548
let node_name = context.output_sender.node_name().to_string();
@@ -601,6 +603,8 @@ impl MoqPullNode {
601603

602604
match read_result {
603605
Ok(Some(first_payload)) => {
606+
consecutive_cancels = 0;
607+
last_payload_at = tokio::time::Instant::now();
604608
// Batching is disabled by default (batch_ms=0).
605609
if self.config.batch_ms > 0 {
606610
let mut batch = Vec::with_capacity(context.batch_size);
@@ -719,12 +723,30 @@ impl MoqPullNode {
719723
return Ok(StreamEndReason::Natural);
720724
},
721725
Err(moq_lite::Error::Cancel) => {
726+
// moq_lite cancels groups when the producer advances and drops old groups.
727+
// This is expected with our "latest group" semantics under load: skip to the
728+
// next group rather than tearing down the entire WebTransport connection.
729+
consecutive_cancels = consecutive_cancels.saturating_add(1);
722730
tracing::debug!(
723731
session_packet_count,
724732
total_packet_count = *total_packet_count,
725-
"Track read cancelled"
733+
consecutive_cancels,
734+
"Track read cancelled (skipping to next group)"
726735
);
727-
return Ok(StreamEndReason::Reconnect);
736+
737+
// Safety valve: if we see cancels for too long with no payloads, reconnect.
738+
if last_payload_at.elapsed() > Duration::from_secs(5)
739+
&& consecutive_cancels >= 50
740+
{
741+
tracing::warn!(
742+
session_packet_count,
743+
total_packet_count = *total_packet_count,
744+
consecutive_cancels,
745+
elapsed_ms = last_payload_at.elapsed().as_millis(),
746+
"Excessive track cancels without payloads; reconnecting"
747+
);
748+
return Ok(StreamEndReason::Reconnect);
749+
}
728750
},
729751
Err(e) => {
730752
tracing::error!(error = %e, session_packet_count, "Error reading from track");

docs/src/content/docs/guides/load-testing.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ Examples:
3030

3131
- `just lt stress-oneshot`
3232
- `just lt oneshot-opus-transcode-fast`
33+
- `just lt stress-dynamic sessions=10` (or `just lt stress-dynamic --sessions 10`)
3334
- `just lt dynamic-tune-heavy --cleanup`
3435

3536
### Oneshot (HTTP batch pipelines)

justfile

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,8 @@ skit-lt config='loadtest.toml' *args='':
104104
# Examples:
105105
# - `just lt` # runs `samples/loadtest/stress-oneshot.toml` by default
106106
# - `just lt stress-dynamic` # runs `samples/loadtest/stress-dynamic.toml`
107+
# - `just lt stress-dynamic sessions=10` # shorthand for `--sessions 10`
108+
# - `just lt stress-dynamic --sessions 10`
107109
# - `just lt dynamic-tune-heavy --cleanup`
108110
# - `just lt samples/loadtest/ui-demo.toml`
109111
lt preset_or_path='stress-oneshot' *args='':
@@ -118,7 +120,21 @@ lt preset_or_path='stress-oneshot' *args='':
118120
echo " - If passing a path, ensure the file exists"; \
119121
exit 1; \
120122
fi; \
121-
just skit-lt "$cfg" {{args}}
123+
sessions=""; \
124+
set -- {{args}}; \
125+
if [ $# -ge 1 ]; then \
126+
case "$1" in \
127+
sessions=*) sessions="${1#sessions=}"; shift;; \
128+
[0-9]*) sessions="$1"; shift;; \
129+
esac; \
130+
fi; \
131+
if [ -n "$sessions" ]; then \
132+
case "$sessions" in \
133+
''|*[!0-9]*) echo "❌ sessions must be an integer (got: '$sessions')"; exit 1;; \
134+
esac; \
135+
set -- --sessions "$sessions" "$@"; \
136+
fi; \
137+
just skit-lt "$cfg" "$@"
122138

123139
# --- Load test presets ---
124140
# Run the standard oneshot stress test config

ui/src/components/NodeStateIndicator.tsx

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -413,15 +413,22 @@ export const NodeStateIndicator: React.FC<NodeStateIndicatorProps> = ({
413413
nodeId,
414414
sessionId,
415415
}) => {
416-
// Get live stats for error badge display
416+
const [isTooltipOpen, setIsTooltipOpen] = React.useState(false);
417+
418+
// IMPORTANT: avoid subscribing to node stats while the tooltip is closed.
419+
// Stats are high-frequency and would otherwise cause constant re-renders of all node indicators.
417420
const liveStats = useSessionStore(
418421
React.useCallback(
419-
(s) => (nodeId && sessionId ? s.sessions.get(sessionId)?.nodeStats[nodeId] : undefined),
420-
[nodeId, sessionId]
422+
(s) =>
423+
isTooltipOpen && nodeId && sessionId
424+
? s.sessions.get(sessionId)?.nodeStats[nodeId]
425+
: undefined,
426+
[isTooltipOpen, nodeId, sessionId]
421427
)
422428
);
429+
423430
const stats = liveStats ?? propStats;
424-
const hasErrors = stats && stats.errored > 0;
431+
const hasErrors = isTooltipOpen && stats && stats.errored > 0;
425432

426433
const color = getStateColor(state);
427434
const label = getStateLabel(state);
@@ -439,7 +446,7 @@ export const NodeStateIndicator: React.FC<NodeStateIndicatorProps> = ({
439446
);
440447

441448
return (
442-
<SKTooltip content={content} side="top">
449+
<SKTooltip content={content} side="top" onOpenChange={setIsTooltipOpen}>
443450
<div
444451
className="nodrag"
445452
style={{ display: 'flex', alignItems: 'center', gap: 6, cursor: 'help' }}

ui/src/services/websocket.reconnection.test.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -325,12 +325,12 @@ describe('WebSocketService reconnection', () => {
325325
expect(session?.isConnected).toBe(true);
326326
});
327327

328-
it('should unsubscribe from session and clear it', () => {
328+
it('should unsubscribe from session and keep cached session', () => {
329329
service.subscribeToSession('session-1');
330330
service.unsubscribeFromSession('session-1');
331331

332332
const session = useSessionStore.getState().getSession('session-1');
333-
expect(session).toBeUndefined();
333+
expect(session?.isConnected).toBe(false);
334334
});
335335

336336
it('should update all subscribed sessions on connection status change', () => {

ui/src/services/websocket.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,9 @@ export class WebSocketService {
201201
}
202202

203203
private handleSessionDestroyed(payload: SessionDestroyedPayload): void {
204+
this.subscribedSessions.delete(payload.session_id);
205+
useSessionStore.getState().clearSession(payload.session_id);
206+
useNodeParamsStore.getState().resetSession(payload.session_id);
204207
useTelemetryStore.getState().clearSession(payload.session_id);
205208
}
206209

@@ -350,7 +353,9 @@ export class WebSocketService {
350353

351354
unsubscribeFromSession(sessionId: string): void {
352355
this.subscribedSessions.delete(sessionId);
353-
useSessionStore.getState().clearSession(sessionId);
356+
// Keep the session entry so the Monitor session list can display the latest known status
357+
// even when a session is not actively selected/subscribed.
358+
useSessionStore.getState().setConnected(sessionId, false);
354359
useNodeParamsStore.getState().resetSession(sessionId);
355360
}
356361

0 commit comments

Comments
 (0)