Skip to content

Commit 250223e

Browse files
feat(dfir_rs): provide DfirMetricsIntervals iterator to prevent race conditions when writing metrics (#2401)
Less confusing as `reset()` is no longer used, à la the feedback in #2241 (comment). Not actually a breaking change, as the metrics API has not been published yet.

---

This PR refactors the DFIR metrics API to replace the mutable `reset()` pattern with an immutable iterator-based approach, addressing race condition concerns when writing metrics. The key change introduces `DfirMetricsIntervals`, an infinite iterator that provides snapshots of metrics for each interval.

- Replaces the `reset()` method with an infinite iterator pattern via `DfirMetricsIntervals`
- Restructures `DfirMetrics` to be a simpler snapshot struct instead of managing current/previous state
- Adds new `metrics_intervals()` method to `Dfir` for obtaining the iterator

---------

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
1 parent fca9826 commit 250223e

File tree

4 files changed

+280
-119
lines changed

4 files changed

+280
-119
lines changed

dfir_rs/src/scheduled/graph.rs

Lines changed: 25 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use web_time::SystemTime;
2020
use super::context::Context;
2121
use super::handoff::handoff_list::PortList;
2222
use super::handoff::{Handoff, HandoffMeta, TeeingHandoff};
23-
use super::metrics::{DfirMetrics, DfirMetricsState, InstrumentSubgraph};
23+
use super::metrics::{DfirMetrics, DfirMetricsIntervals, InstrumentSubgraph};
2424
use super::port::{RECV, RecvCtx, RecvPort, SEND, SendCtx, SendPort};
2525
use super::reactor::Reactor;
2626
use super::state::StateHandle;
@@ -41,7 +41,8 @@ pub struct Dfir<'a> {
4141

4242
pub(super) handoffs: SlotVec<HandoffTag, HandoffData>,
4343

44-
metrics: Rc<DfirMetricsState>,
44+
/// Live-updating DFIR runtime metrics via interior mutability.
45+
metrics: Rc<DfirMetrics>,
4546

4647
#[cfg(feature = "meta")]
4748
/// See [`Self::meta_graph()`].
@@ -99,7 +100,7 @@ impl Dfir<'_> {
99100

100101
// Initialize handoff metrics struct.
101102
Rc::make_mut(&mut self.metrics)
102-
.handoff_metrics
103+
.handoffs
103104
.insert(new_hoff_id, Default::default());
104105

105106
let output_port = RecvPort {
@@ -303,7 +304,7 @@ impl<'a> Dfir<'a> {
303304
// times before the next subgraph consumes the output; don't double count those.
304305
// (*) - usually... always true for Hydro-generated DFIR at least.
305306
for &handoff_id in sg_data.preds.iter() {
306-
let handoff_metrics = &self.metrics.handoff_metrics[handoff_id];
307+
let handoff_metrics = &self.metrics.handoffs[handoff_id];
307308
let handoff_data = &mut self.handoffs[handoff_id];
308309
let handoff_len = handoff_data.handoff.len();
309310
handoff_metrics
@@ -413,7 +414,7 @@ impl<'a> Dfir<'a> {
413414
tracing::info!("Running subgraph.");
414415
sg_data.last_tick_run_in = Some(self.context.current_tick);
415416

416-
let sg_metrics = &self.metrics.subgraph_metrics[sg_id];
417+
let sg_metrics = &self.metrics.subgraphs[sg_id];
417418
let sg_fut =
418419
Box::into_pin(sg_data.subgraph.run(&mut self.context, &mut self.handoffs));
419420
// Update subgraph metrics.
@@ -450,7 +451,7 @@ impl<'a> Dfir<'a> {
450451
}
451452
}
452453
}
453-
let handoff_metrics = &self.metrics.handoff_metrics[handoff_id];
454+
let handoff_metrics = &self.metrics.handoffs[handoff_id];
454455
handoff_metrics.curr_items_count.set(handoff_len);
455456
}
456457

@@ -876,7 +877,7 @@ impl<'a> Dfir<'a> {
876877

877878
// Initialize subgraph metrics struct.
878879
Rc::make_mut(&mut self.metrics)
879-
.subgraph_metrics
880+
.subgraphs
880881
.insert(sg_id, Default::default());
881882

882883
sg_id
@@ -979,7 +980,7 @@ impl<'a> Dfir<'a> {
979980

980981
// Initialize subgraph metrics struct.
981982
Rc::make_mut(&mut self.metrics)
982-
.subgraph_metrics
983+
.subgraphs
983984
.insert(sg_id, Default::default());
984985

985986
sg_id
@@ -999,7 +1000,7 @@ impl<'a> Dfir<'a> {
9991000

10001001
// Initialize handoff metrics struct.
10011002
Rc::make_mut(&mut self.metrics)
1002-
.handoff_metrics
1003+
.handoffs
10031004
.insert(handoff_id, Default::default());
10041005

10051006
// Make ports.
@@ -1063,10 +1064,21 @@ impl<'a> Dfir<'a> {
10631064
loop_id
10641065
}
10651066

1066-
/// Returns DFIR runtime metrics accumulated since runtime creation.
1067-
pub fn metrics(&self) -> DfirMetrics {
1068-
DfirMetrics {
1069-
curr: Rc::clone(&self.metrics),
1067+
/// Returns a reference-counted handle to the continually-updated runtime metrics for this DFIR instance.
1068+
pub fn metrics(&self) -> Rc<DfirMetrics> {
1069+
Rc::clone(&self.metrics)
1070+
}
1071+
1072+
/// Returns an infinite iterator of [`DfirMetrics`], where each call to `next()` ends an interval.
1073+
///
1074+
/// The first call to [`DfirMetricsIntervals::next`] returns metrics since this DFIR instance was created. Each
1075+
/// subsequent call to [`DfirMetricsIntervals::next`] returns metrics since the previous call.
1076+
///
1077+
/// Cloning the iterator "forks" it from the original, as afterwards each iterator will return different metrics
1078+
/// based on when [`DfirMetricsIntervals::next`] is called.
1079+
pub fn metrics_intervals(&self) -> DfirMetricsIntervals {
1080+
DfirMetricsIntervals {
1081+
curr: self.metrics(),
10701082
prev: None,
10711083
}
10721084
}

0 commit comments

Comments
 (0)