Skip to content

Commit b7c2392

Browse files
authored
Update kstat sampler futures when a target is updated (#8915)
- When a target is added to the `KstatSampler`, we also add a future that resolves when we need to generate a sample from that target. Prior to this, when a target was updated or removed, that future was never touched! That means we could be polling for samples for removed targets or that we might have multiple futures for the same target. This ensures that we always update or remove the futures, along with the targets themselves. - Fixes #8889
1 parent 29c5508 commit b7c2392

File tree

2 files changed

+162
-14
lines changed

2 files changed

+162
-14
lines changed

oximeter/instruments/src/kstat/link.rs

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -740,4 +740,76 @@ mod tests {
740740
// kept the final samples.
741741
assert!(count.value() > 4096);
742742
}
743+
744+
// Regression for https://github.com/oxidecomputer/omicron/issues/8889
745+
#[tokio::test]
746+
async fn updating_target_changes_existing_sampling_interval() {
747+
let log = test_logger();
748+
let sampler = KstatSampler::new(&log).unwrap();
749+
let link = TestEtherstub::new();
750+
info!(log, "created test etherstub"; "name" => &link.name);
751+
let target = SledDataLinkTarget {
752+
rack_id: RACK_ID,
753+
sled_id: SLED_ID,
754+
sled_serial: SLED_SERIAL.into(),
755+
link_name: link.name.clone().into(),
756+
kind: KIND.into(),
757+
sled_model: SLED_MODEL.into(),
758+
sled_revision: SLED_REVISION,
759+
zone_name: ZONE_NAME.into(),
760+
};
761+
let dl = SledDataLink::new(target.clone(), true);
762+
let collection_interval = Duration::from_millis(10);
763+
let details = CollectionDetails::never(collection_interval);
764+
let id = sampler.add_target(dl.clone(), details).await.unwrap();
765+
766+
// Update the target.
767+
let new_duration = Duration::from_millis(15);
768+
sampler
769+
.update_target(dl, CollectionDetails::never(new_duration))
770+
.await
771+
.unwrap();
772+
773+
// Get the futures that the sampler knows about and ensure the value has
774+
// been updated.
775+
let futs = sampler.future_details().await;
776+
assert_eq!(futs.len(), 1, "should have updated the only target");
777+
assert_eq!(
778+
futs[0],
779+
(id, new_duration),
780+
"failed to correctly update target"
781+
);
782+
}
783+
784+
#[tokio::test]
785+
async fn no_futures_to_await_after_removing_target() {
786+
let log = test_logger();
787+
let sampler = KstatSampler::new(&log).unwrap();
788+
let link = TestEtherstub::new();
789+
info!(log, "created test etherstub"; "name" => &link.name);
790+
let target = SledDataLinkTarget {
791+
rack_id: RACK_ID,
792+
sled_id: SLED_ID,
793+
sled_serial: SLED_SERIAL.into(),
794+
link_name: link.name.clone().into(),
795+
kind: KIND.into(),
796+
sled_model: SLED_MODEL.into(),
797+
sled_revision: SLED_REVISION,
798+
zone_name: ZONE_NAME.into(),
799+
};
800+
let dl = SledDataLink::new(target.clone(), true);
801+
let collection_interval = Duration::from_millis(100);
802+
let details = CollectionDetails::never(collection_interval);
803+
let id = sampler.add_target(dl.clone(), details).await.unwrap();
804+
805+
// And remove right away.
806+
sampler.remove_target(id).await.unwrap();
807+
808+
// And ensure there are zero actual futures
809+
let futs = sampler.future_details().await;
810+
assert!(
811+
futs.is_empty(),
812+
"should have zero futures to poll after removing target"
813+
);
814+
}
743815
}

oximeter/instruments/src/kstat/sampler.rs

Lines changed: 90 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,10 @@ enum Request {
187187
CreationTimes {
188188
reply_tx: oneshot::Sender<BTreeMap<KstatPath, DateTime<Utc>>>,
189189
},
190+
/// Return the list of IDs and intervals in the set of futures the sampler
191+
/// is tracking.
192+
#[cfg(all(test, target_os = "illumos"))]
193+
FutureDetails { reply_tx: oneshot::Sender<Vec<(TargetId, Duration)>> },
190194
}
191195

192196
/// Data about a single kstat target.
@@ -267,6 +271,17 @@ impl core::future::Future for YieldIdAfter {
267271
}
268272
}
269273

274+
// The operation we want to take on a future in our set, after handling an inbox
275+
// message.
276+
enum Operation {
277+
// We want to add a new future.
278+
Add(YieldIdAfter),
279+
// Remove a future with the existing ID.
280+
Remove(TargetId),
281+
// We want to update an existing future.
282+
Update((TargetId, Duration)),
283+
}
284+
270285
/// An owned type used to keep track of the creation time for each kstat in
271286
/// which interest has been signaled.
272287
#[derive(Clone, Debug, Eq, Ord, PartialEq, PartialOrd)]
@@ -345,6 +360,9 @@ struct KstatSamplerWorker {
345360
/// at construction time. In that case, we'll try again the next time we
346361
/// need it.
347362
self_stats: Option<self_stats::SelfStats>,
363+
364+
/// The futures that resolve when it's time to sample the next target.
365+
sample_timeouts: FuturesUnordered<YieldIdAfter>,
348366
}
349367

350368
fn hostname() -> Option<String> {
@@ -358,7 +376,7 @@ fn hostname() -> Option<String> {
358376

359377
/// Stores the number of samples taken, used for testing.
360378
#[cfg(all(test, target_os = "illumos"))]
361-
#[derive(Clone, Copy, Debug)]
379+
#[derive(Clone, Copy, Debug, Default)]
362380
pub(crate) struct SampleCounts {
363381
pub total: usize,
364382
pub overflow: usize,
@@ -393,6 +411,7 @@ impl KstatSamplerWorker {
393411
sample_limit,
394412
self_stat_queue,
395413
self_stats,
414+
sample_timeouts: FuturesUnordered::new(),
396415
})
397416
}
398417

@@ -405,7 +424,6 @@ impl KstatSamplerWorker {
405424
#[cfg(all(test, target_os = "illumos"))]
406425
sample_count_tx: mpsc::UnboundedSender<SampleCounts>,
407426
) {
408-
let mut sample_timeouts = FuturesUnordered::new();
409427
let mut creation_prune_interval =
410428
interval(CREATION_TIME_PRUNE_INTERVAL);
411429
creation_prune_interval.tick().await; // Completes immediately.
@@ -420,7 +438,7 @@ impl KstatSamplerWorker {
420438
);
421439
}
422440
}
423-
maybe_id = sample_timeouts.next(), if !sample_timeouts.is_empty() => {
441+
maybe_id = self.sample_timeouts.next(), if !self.sample_timeouts.is_empty() => {
424442
let Some((id, interval)) = maybe_id else {
425443
unreachable!();
426444
};
@@ -430,7 +448,7 @@ impl KstatSamplerWorker {
430448
#[cfg(all(test, target_os = "illumos"))]
431449
&sample_count_tx,
432450
) {
433-
sample_timeouts.push(next_timeout);
451+
self.sample_timeouts.push(next_timeout);
434452
}
435453
}
436454
maybe_request = self.inbox.recv() => {
@@ -443,19 +461,53 @@ impl KstatSamplerWorker {
443461
"received request on inbox";
444462
"request" => ?request,
445463
);
446-
if let Some(next_timeout) = self.handle_inbox_request(request) {
447-
sample_timeouts.push(next_timeout);
464+
if let Some(next_op) = self.handle_inbox_request(request) {
465+
self.update_sample_timeouts(next_op);
466+
}
467+
}
468+
}
469+
}
470+
}
471+
472+
fn update_sample_timeouts(&mut self, next_op: Operation) {
473+
match next_op {
474+
Operation::Add(fut) => self.sample_timeouts.push(fut),
475+
Operation::Remove(id) => {
476+
// Swap out all futures, and then filter out the one we're now
477+
// removing.
478+
let old = std::mem::take(&mut self.sample_timeouts);
479+
self.sample_timeouts
480+
.extend(old.into_iter().filter(|fut| fut.id != id));
481+
}
482+
Operation::Update((new_id, new_interval)) => {
483+
// Update just the one future, if it exists, or insert one.
484+
//
485+
// NOTE: we update the _interval_, not the sleep object itself,
486+
// which means this won't take effect until the next tick.
487+
match self
488+
.sample_timeouts
489+
.iter_mut()
490+
.find(|fut| fut.id == new_id)
491+
{
492+
Some(old) => old.interval = new_interval,
493+
None => {
494+
warn!(
495+
&self.log,
496+
"attempting to update the samping future \
497+
for a target, but no active future found \
498+
in the set, it will be added directly";
499+
"id" => %&new_id,
500+
);
501+
self.sample_timeouts
502+
.push(YieldIdAfter::new(new_id, new_interval));
448503
}
449504
}
450505
}
451506
}
452507
}
453508

454509
// Handle a message on the worker's inbox.
455-
fn handle_inbox_request(
456-
&mut self,
457-
request: Request,
458-
) -> Option<YieldIdAfter> {
510+
fn handle_inbox_request(&mut self, request: Request) -> Option<Operation> {
459511
match request {
460512
Request::AddTarget { target, details, reply_tx } => {
461513
match self.add_target(target, details) {
@@ -475,7 +527,10 @@ impl KstatSamplerWorker {
475527
"error" => ?e,
476528
),
477529
}
478-
Some(YieldIdAfter::new(id, details.interval))
530+
Some(Operation::Add(YieldIdAfter::new(
531+
id,
532+
details.interval,
533+
)))
479534
}
480535
Err(e) => {
481536
error!(
@@ -513,7 +568,7 @@ impl KstatSamplerWorker {
513568
"error" => ?e,
514569
),
515570
}
516-
Some(YieldIdAfter::new(id, details.interval))
571+
Some(Operation::Update((id, details.interval)))
517572
}
518573
Err(e) => {
519574
error!(
@@ -534,7 +589,7 @@ impl KstatSamplerWorker {
534589
}
535590
}
536591
Request::RemoveTarget { id, reply_tx } => {
537-
self.targets.remove(&id);
592+
let do_remove = self.targets.remove(&id).is_some();
538593
if let Some(remaining_samples) =
539594
self.samples.lock().unwrap().remove(&id)
540595
{
@@ -555,7 +610,7 @@ impl KstatSamplerWorker {
555610
"error" => ?e,
556611
),
557612
}
558-
None
613+
if do_remove { Some(Operation::Remove(id)) } else { None }
559614
}
560615
Request::TargetStatus { id, reply_tx } => {
561616
trace!(
@@ -594,6 +649,18 @@ impl KstatSamplerWorker {
594649
debug!(self.log, "sent reply for creation times");
595650
None
596651
}
652+
#[cfg(all(test, target_os = "illumos"))]
653+
Request::FutureDetails { reply_tx } => {
654+
debug!(self.log, "request for future details");
655+
let details = self
656+
.sample_timeouts
657+
.iter()
658+
.map(|fut| (fut.id, fut.interval))
659+
.collect();
660+
reply_tx.send(details).unwrap();
661+
debug!(self.log, "sent reply for future details");
662+
None
663+
}
597664
}
598665
}
599666

@@ -1296,6 +1363,15 @@ impl KstatSampler {
12961363
self.outbox.send(request).await.map_err(|_| Error::SendError).unwrap();
12971364
reply_rx.await.map_err(|_| Error::RecvError).unwrap()
12981365
}
1366+
1367+
/// Return the IDs and sampling intervals for all futures in the sampler.
1368+
#[cfg(all(test, target_os = "illumos"))]
1369+
pub(crate) async fn future_details(&self) -> Vec<(TargetId, Duration)> {
1370+
let (reply_tx, reply_rx) = oneshot::channel();
1371+
let request = Request::FutureDetails { reply_tx };
1372+
self.outbox.send(request).await.map_err(|_| Error::SendError).unwrap();
1373+
reply_rx.await.map_err(|_| Error::RecvError).unwrap()
1374+
}
12991375
}
13001376

13011377
impl oximeter::Producer for KstatSampler {

0 commit comments

Comments
 (0)