bin/propolis-server/src/lib/initializer.rs (35 changes: 19 additions & 16 deletions)

@@ -12,8 +12,8 @@ use std::time::{SystemTime, UNIX_EPOCH};
 use crate::serial::Serial;
 use crate::spec::{self, Spec, StorageBackend, StorageDevice};
 use crate::stats::{
-    track_network_interface_kstats, track_vcpu_kstats, VirtualDiskProducer,
-    VirtualMachine,
+    track_network_interface_kstats, track_vcpu_kstats, BlockMetrics,
+    VirtualDisk, VirtualMachine,
 };
 use crate::vm::{
     BlockBackendMap, CrucibleBackendMap, DeviceMap, NetworkInterfaceIds,
@@ -658,7 +658,8 @@ impl MachineInitializer<'_> {
                 let vioblk = virtio::PciVirtioBlock::new(0x100);
 
                 self.devices.insert(device_id.clone(), vioblk.clone());
-                block::attach(vioblk.clone(), backend).unwrap();
+                block::attach(&vioblk.block_attach, backend.attachment())
+                    .unwrap();
                 chipset.pci_attach(bdf, vioblk.clone());
                 vioblk
             }
@@ -678,7 +679,8 @@ impl MachineInitializer<'_> {
                     self.log.new(slog::o!("component" => component)),
                 );
                 self.devices.insert(device_id.clone(), nvme.clone());
-                block::attach(nvme.clone(), backend).unwrap();
+                block::attach(&nvme.block_attach, backend.attachment())
+                    .unwrap();
                 chipset.pci_attach(bdf, nvme.clone());
                 nvme
             }
@@ -720,14 +722,20 @@
             };
 
             if let Some(registry) = &self.producer_registry {
-                let stats = VirtualDiskProducer::new(
-                    block_size,
-                    self.properties.id,
-                    volume_id,
-                    &self.properties.metadata,
+                let block_metrics = BlockMetrics::new(
+                    VirtualDisk {
+                        attached_instance_id: self.properties.id,
+                        block_size,
+                        disk_id: volume_id,
+                        project_id: self.properties.metadata.project_id,
+                        silo_id: self.properties.metadata.silo_id,
+                    },
+                    block_dev.attachment().max_queues(),
                 );
 
-                if let Err(e) = registry.register_producer(stats.clone()) {
+                if let Err(e) =
+                    registry.register_producer(block_metrics.producer())
+                {
                     slog::error!(
                         self.log,
                         "Could not register virtual disk producer, \
@@ -739,12 +747,7 @@
                     continue;
                 };
 
-                // Set the on-completion callback for the block device, to
-                // update stats.
-                let callback = move |op, result, duration| {
-                    stats.on_completion(op, result, duration);
-                };
-                block_dev.on_completion(Box::new(callback));
+                block_dev.attachment().set_metric_consumer(block_metrics);
             };
         }
     }
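
Note: the initializer changes above replace the old boxed completion callback (block_dev.on_completion) with a shared consumer object installed via block_dev.attachment().set_metric_consumer(...). The minimal, self-contained Rust sketch below illustrates that pattern. The trait, the Attachment type, and every other name in it are simplified stand-ins for illustration only, not the actual propolis definitions.

use std::sync::{Arc, Mutex};
use std::time::Duration;

// Simplified stand-in for a per-device queue identifier.
type QueueId = u16;

// Stand-in operation type; the real one carries offsets and lengths.
#[derive(Debug, Clone, Copy)]
enum Operation {
    Read(usize),
    Write(usize),
    Flush,
}

// One shared consumer observes completions from every queue, replacing a
// per-device boxed callback.
trait MetricConsumer: Send + Sync {
    fn request_completed(&self, queue: QueueId, op: Operation, took: Duration);
}

// A backend attachment holds an optional consumer that the device's
// completion path invokes.
#[derive(Default)]
struct Attachment {
    consumer: Mutex<Option<Arc<dyn MetricConsumer>>>,
}

impl Attachment {
    fn set_metric_consumer(&self, c: Arc<dyn MetricConsumer>) {
        *self.consumer.lock().unwrap() = Some(c);
    }

    // Called by the device when an I/O finishes.
    fn complete(&self, queue: QueueId, op: Operation, took: Duration) {
        if let Some(c) = self.consumer.lock().unwrap().as_ref() {
            c.request_completed(queue, op, took);
        }
    }
}

// A trivial consumer that just counts completions.
struct CountingConsumer(Mutex<u64>);

impl MetricConsumer for CountingConsumer {
    fn request_completed(&self, _q: QueueId, _op: Operation, _t: Duration) {
        *self.0.lock().unwrap() += 1;
    }
}

fn main() {
    let attachment = Attachment::default();
    let metrics = Arc::new(CountingConsumer(Mutex::new(0)));
    attachment.set_metric_consumer(metrics.clone());
    attachment.complete(0, Operation::Read(4096), Duration::from_micros(50));
    assert_eq!(*metrics.0.lock().unwrap(), 1);
}

Because the consumer is a trait object shared by reference, every queue reports into the same object without each device holding its own closure over the stats, which is what lets the metrics type below keep one buffer per queue.
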
bin/propolis-server/src/lib/stats/mod.rs (2 changes: 1 addition & 1 deletion)

@@ -28,7 +28,7 @@ mod virtual_machine;
 #[cfg(all(not(test), target_os = "illumos"))]
 use self::network_interface::InstanceNetworkInterfaces;
 pub(crate) use self::pvpanic::PvpanicProducer;
-pub(crate) use self::virtual_disk::VirtualDiskProducer;
+pub(crate) use self::virtual_disk::{BlockMetrics, VirtualDisk};
 pub(crate) use self::virtual_machine::VirtualMachine;
 
 /// Interval on which we ask `oximeter` to poll us for metric data.
bin/propolis-server/src/lib/stats/virtual_disk.rs (159 changes: 100 additions & 59 deletions)

@@ -7,6 +7,7 @@
 //! Types for tracking statistics about virtual disks.
 
 use std::{
+    num::NonZeroUsize,
     sync::{Arc, Mutex},
     time::Duration,
 };
@@ -22,12 +23,11 @@ use oximeter::{
     MetricsError, Producer, Sample,
 };
 use propolis::block::{self, Operation};
-use propolis_api_types::InstanceMetadata;
-use uuid::Uuid;
 
+pub use self::virtual_disk::VirtualDisk;
 use self::virtual_disk::{
     BytesRead, BytesWritten, FailedFlushes, FailedReads, FailedWrites, Flushes,
-    IoLatency, IoSize, Reads, VirtualDisk, Writes,
+    IoLatency, IoSize, Reads, Writes,
 };
 
 /// Type for tracking virtual disk stats.
@@ -67,12 +67,8 @@ struct VirtualDiskStats {
 
 impl VirtualDiskStats {
     /// Update the tracked statistics with the result of an I/O completion.
-    fn on_completion(
-        &mut self,
-        op: block::Operation,
-        result: block::Result,
-        duration: Duration,
-    ) {
+    fn on_completion(&mut self, sample: BlockSample) {
+        let BlockSample { op, result, duration } = sample;
         match op {
             Operation::Read(_, len) => {
                 self.on_read_completion(result, len, duration)
@@ -190,6 +186,12 @@ const LATENCY_POWERS: (u16, u16) = (3, 10);
 /// We use 512B as the minimum since that is the minimum supported block size.
 const SIZE_POWERS: (u16, u16) = (9, 30);
 
+/// Maximum number of samples to buffer for a given device queue before
+/// consolidating them into the totals for the device.
+///
+/// This value was arbitrarily chosen.
+const MAX_BUFFERED_SAMPLES: usize = 512;
+
 /// A [`Producer`] that emits statistics about virtual disks.
 ///
 /// This type is shared between the block device that handles guest I/Os, and the
@@ -199,32 +201,19 @@ const SIZE_POWERS: (u16, u16) = (9, 30);
 ///
 /// As oximeter polls us, the producer server also collects these updated
 /// statistics.
-#[derive(Clone, Debug)]
-pub struct VirtualDiskProducer {
-    // Shareable inner type actually managing the stats.
-    inner: Arc<Mutex<VirtualDiskStats>>,
-}
-
-impl VirtualDiskProducer {
-    /// Create a producer to track a virtual disk.
-    pub fn new(
-        block_size: u32,
-        instance_id: Uuid,
-        disk_id: Uuid,
-        metadata: &InstanceMetadata,
-    ) -> Self {
-        let disk = VirtualDisk {
-            attached_instance_id: instance_id,
-            block_size,
-            disk_id,
-            project_id: metadata.project_id,
-            silo_id: metadata.silo_id,
-        };
+#[derive(Debug)]
+pub(crate) struct BlockMetrics {
+    sample_buffer: Vec<Mutex<Vec<BlockSample>>>,
+    stats: Mutex<VirtualDiskStats>,
+}
+
+impl BlockMetrics {
+    pub fn new(disk: VirtualDisk, max_queues: NonZeroUsize) -> Arc<Self> {
         let now = Utc::now();
         let datum = Cumulative::with_start_time(now, 0);
         let latency_histogram = Self::latency_histogram();
         let size_histogram = Self::size_histogram();
-        let inner = VirtualDiskStats {
+        let stats = VirtualDiskStats {
             disk,
             reads: Reads { datum },
             bytes_read: BytesRead { datum },
@@ -274,17 +263,14 @@ impl VirtualDiskProducer {
                 },
             ],
         };
-        Self { inner: Arc::new(Mutex::new(inner)) }
+        let mut sample_buffer = Vec::with_capacity(max_queues.get());
+        sample_buffer.resize_with(max_queues.get(), Default::default);
+
+        Arc::new(Self { sample_buffer, stats: Mutex::new(stats) })
     }
 
-    /// A callback that updates statistics with the result of a completed I/O.
-    pub fn on_completion(
-        &self,
-        op: block::Operation,
-        result: block::Result,
-        duration: Duration,
-    ) {
-        self.inner.lock().unwrap().on_completion(op, result, duration);
+    pub(crate) fn producer(self: &Arc<Self>) -> VirtualDiskProducer {
+        VirtualDiskProducer(self.clone())
     }
 
     /// Construct a histogram for tracking I/O latencies.
@@ -307,60 +293,115 @@ impl VirtualDiskProducer {
         // Safety: This only fails if the bins are not valid.
         Histogram::new(&bins).unwrap()
     }
+
+    fn consolidate_one(&self, idx: usize) {
+        let mut stats = self.stats.lock().unwrap();
+        let mut buf = self.sample_buffer.get(idx).unwrap().lock().unwrap();
+        for sample in buf.drain(..) {
+            stats.on_completion(sample);
+        }
+    }
+    fn consolidate_all(&self) {
+        let mut stats = self.stats.lock().unwrap();
+        for buf in self.sample_buffer.iter() {
+            let mut buf = buf.lock().unwrap();
+            for sample in buf.drain(..) {
+                stats.on_completion(sample);
+            }
+        }
+    }
 }
 
+#[derive(Debug)]
+struct BlockSample {
+    op: block::Operation,
+    result: block::Result,
+    duration: Duration,
+}
+
+#[derive(Clone, Debug)]
+pub struct VirtualDiskProducer(Arc<BlockMetrics>);
+
+impl block::MetricConsumer for BlockMetrics {
+    fn request_completed(
+        &self,
+        queue_id: block::QueueId,
+        op: Operation,
+        result: block::Result,
+        _time_queued: Duration,
+        time_processed: Duration,
+    ) {
+        let idx = usize::from(queue_id);
+        let buf = self
+            .sample_buffer
+            .get(idx)
+            .expect("queue ID should be within maximum");
+        let mut guard = buf.lock().unwrap();
+        guard.push(BlockSample { op, result, duration: time_processed });
+
+        // Do not let an unbounded number of samples accumulate.
+        if guard.len() > MAX_BUFFERED_SAMPLES {
+            drop(guard);
+            self.consolidate_one(idx);
+        }
+    }
+}
+
 impl Producer for VirtualDiskProducer {
     fn produce(
         &mut self,
     ) -> Result<Box<dyn Iterator<Item = Sample>>, MetricsError> {
+        // Consolidate any buffered samples first.
+        self.0.consolidate_all();
+
         // 5 scalar samples (reads, writes, flushes, bytes read / written)
         // 3 scalars broken out by failure kind
        // 2 histograms broken out by I/O kind
         const N_SAMPLES: usize = 5 + 3 * N_FAILURE_KINDS + 2 * N_IO_KINDS;
         let mut out = Vec::with_capacity(N_SAMPLES);
-        let inner = self.inner.lock().unwrap();
+        let stats = self.0.stats.lock().unwrap();
 
         // Read statistics.
-        out.push(Sample::new(&inner.disk, &inner.reads)?);
-        out.push(Sample::new(&inner.disk, &inner.bytes_read)?);
-        for failed in inner.failed_reads.iter() {
-            out.push(Sample::new(&inner.disk, failed)?);
+        out.push(Sample::new(&stats.disk, &stats.reads)?);
+        out.push(Sample::new(&stats.disk, &stats.bytes_read)?);
+        for failed in stats.failed_reads.iter() {
+            out.push(Sample::new(&stats.disk, failed)?);
         }
 
         // Write statistics.
-        out.push(Sample::new(&inner.disk, &inner.writes)?);
-        out.push(Sample::new(&inner.disk, &inner.bytes_written)?);
-        for failed in inner.failed_writes.iter() {
-            out.push(Sample::new(&inner.disk, failed)?);
+        out.push(Sample::new(&stats.disk, &stats.writes)?);
+        out.push(Sample::new(&stats.disk, &stats.bytes_written)?);
+        for failed in stats.failed_writes.iter() {
+            out.push(Sample::new(&stats.disk, failed)?);
         }
 
         // Flushes
-        out.push(Sample::new(&inner.disk, &inner.flushes)?);
-        for failed in inner.failed_flushes.iter() {
-            out.push(Sample::new(&inner.disk, failed)?);
+        out.push(Sample::new(&stats.disk, &stats.flushes)?);
+        for failed in stats.failed_flushes.iter() {
+            out.push(Sample::new(&stats.disk, failed)?);
         }
 
         // Histograms for latency and size.
-        for hist in inner.io_latency.iter() {
-            out.push(Sample::new(&inner.disk, hist)?);
+        for hist in stats.io_latency.iter() {
+            out.push(Sample::new(&stats.disk, hist)?);
         }
-        for hist in inner.io_size.iter() {
-            out.push(Sample::new(&inner.disk, hist)?);
+        for hist in stats.io_size.iter() {
+            out.push(Sample::new(&stats.disk, hist)?);
         }
-        drop(inner);
+        drop(stats);
         Ok(Box::new(out.into_iter()))
     }
 }
 
 #[cfg(test)]
 mod test {
-    use super::VirtualDiskProducer;
+    use super::BlockMetrics;
     use super::LATENCY_POWERS;
     use super::SIZE_POWERS;
 
     #[test]
     fn test_latency_histogram() {
-        let hist = VirtualDiskProducer::latency_histogram();
+        let hist = BlockMetrics::latency_histogram();
         println!("{:#?}", hist.iter().map(|bin| bin.range).collect::<Vec<_>>());
         // The math here is a bit silly, but we end up with 9 bins in each
         // "interior" power of 10, plus one more bin on the right and left for
@@ -373,7 +414,7 @@
 
     #[test]
     fn test_size_histogram() {
-        let hist = VirtualDiskProducer::size_histogram();
+        let hist = BlockMetrics::size_histogram();
         println!("{:#?}", hist.iter().map(|bin| bin.range).collect::<Vec<_>>());
         // 1 extra left bin for [0, 512), and 1 because the range is inclusive.
         assert_eq!(
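
Note: the buffering scheme introduced in virtual_disk.rs, with one mutex-guarded sample buffer per device queue that is drained into the shared totals either when it exceeds MAX_BUFFERED_SAMPLES or when oximeter polls, can be reduced to the toy sketch below. Samples are plain u64 values here rather than BlockSample records; this is an illustration of the locking pattern only, not the code from this change.

use std::sync::Mutex;

const MAX_BUFFERED_SAMPLES: usize = 512;

struct Metrics {
    // One independently locked buffer per device queue, so completions on
    // different queues do not contend on a single lock.
    sample_buffer: Vec<Mutex<Vec<u64>>>,
    // Consolidated totals, touched only on overflow or at poll time.
    total: Mutex<u64>,
}

impl Metrics {
    fn new(queues: usize) -> Self {
        let mut sample_buffer = Vec::with_capacity(queues);
        sample_buffer.resize_with(queues, Default::default);
        Self { sample_buffer, total: Mutex::new(0) }
    }

    // Fast path: push onto the queue-local buffer; only fold into the
    // shared totals once the buffer grows past the bound.
    fn record(&self, queue: usize, sample: u64) {
        let mut buf = self.sample_buffer[queue].lock().unwrap();
        buf.push(sample);
        if buf.len() > MAX_BUFFERED_SAMPLES {
            drop(buf);
            self.consolidate_one(queue);
        }
    }

    fn consolidate_one(&self, queue: usize) {
        let mut total = self.total.lock().unwrap();
        let mut buf = self.sample_buffer[queue].lock().unwrap();
        for s in buf.drain(..) {
            *total += s;
        }
    }

    // Called at poll time so the emitted metrics reflect everything buffered.
    fn consolidate_all(&self) -> u64 {
        let mut total = self.total.lock().unwrap();
        for buf in &self.sample_buffer {
            for s in buf.lock().unwrap().drain(..) {
                *total += s;
            }
        }
        *total
    }
}

fn main() {
    let m = Metrics::new(4);
    m.record(0, 10);
    m.record(3, 32);
    assert_eq!(m.consolidate_all(), 42);
}

The fast path releases the buffer guard before consolidating, so whenever both locks are held they are acquired in the same order (totals first, then a buffer), mirroring how consolidate_one and consolidate_all in the diff avoid lock-order inversions.
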
bin/propolis-server/src/lib/vm/objects.rs (8 changes: 2 additions & 6 deletions)

@@ -18,7 +18,7 @@ use propolis::{
     Machine,
 };
 use propolis_api_types::instance_spec::SpecKey;
-use slog::{error, info};
+use slog::info;
 use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
 
 use crate::{serial::Serial, spec::Spec, vcpu_tasks::VcpuTaskController};
@@ -383,11 +383,7 @@ impl VmObjectsLocked {
         for (id, backend) in self.block_backends.iter() {
             info!(self.log, "stopping and detaching block backend {}", id);
             backend.stop().await;
-            if let Err(err) = backend.detach() {
-                error!(self.log, "error detaching block backend";
-                       "id" => %id,
-                       "error" => ?err);
-            }
+            backend.attachment().detach();
         }
     }
 