Phase 5.1: Implement MetricService #90

@avirtuos

Phase 5, Step 1: MetricService Implementation

Parent Issue: #89 - Phase 5: Observability & Production Readiness
Timeline: Days 1-2 of Phase 5
Status: Not Started

Overview

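Implement a metrics collection and exposure system whose update hot path avoids locks (atomic counters and gauges, with updates forwarded over a channel to a background aggregator), plus a Prometheus-compatible /metrics endpoint and liveness/readiness health checks for production monitoring.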

Objectives

  1. Keep metric updates lock-free on the hot path (atomics plus channel-based updates)
  2. Implement counter, gauge, and histogram types
  3. Add a background aggregation loop
  4. Create a Prometheus exporter that serves the text exposition format
  5. Implement health check endpoints

Technical Design

MetricService Architecture

use std::collections::HashMap;
use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;

use dashmap::DashMap;
use tokio::sync::mpsc;
use tokio::task::JoinHandle;

pub struct MetricService {
    registry: Arc<MetricRegistry>,
    // Sender half of the update channel; cloned into every metric handle.
    update_tx: mpsc::UnboundedSender<MetricUpdate>,
    // Background aggregation task, which owns the receiver half.
    aggregator_handle: JoinHandle<()>,
    exporter: PrometheusExporter,
    health_checker: Arc<HealthChecker>,
    http_server: Option<JoinHandle<()>>,
}

pub struct MetricRegistry {
    counters: DashMap<String, Arc<AtomicU64>>,
    gauges: DashMap<String, Arc<AtomicI64>>,
    // std::sync::Mutex, locked briefly by the aggregator and the exporter only.
    histograms: DashMap<String, Arc<Mutex<HistogramImpl>>>,
    labels: DashMap<String, HashMap<String, String>>,
}

// Owned by the aggregation task rather than shared behind an Arc:
// UnboundedReceiver::recv takes &mut self.
pub struct MetricAggregator {
    update_rx: mpsc::UnboundedReceiver<MetricUpdate>,
    registry: Arc<MetricRegistry>,
    aggregation_interval: Duration,
}

pub enum MetricUpdate {
    IncrementCounter { name: String, value: u64 },
    SetGauge { name: String, value: i64 },
    RecordHistogram { name: String, value: f64 },
}

Lock-Free Collection

impl MetricService {
    /// Must be called from within a Tokio runtime, because it spawns the aggregation task.
    pub fn new(config: MetricConfig) -> Self {
        let (update_tx, update_rx) = mpsc::unbounded_channel();

        let registry = Arc::new(MetricRegistry::new());
        let mut aggregator = MetricAggregator {
            update_rx,
            registry: Arc::clone(&registry),
            aggregation_interval: config.aggregation_interval,
        };

        // Start aggregation loop; the spawned task takes ownership of the aggregator.
        let aggregator_handle = tokio::spawn(async move {
            aggregator.run().await;
        });

        // Build the exporter before `registry` is moved into the struct below.
        let exporter = PrometheusExporter::new(Arc::clone(&registry));

        Self {
            registry,
            update_tx,
            aggregator_handle,
            exporter,
            health_checker: Arc::new(HealthChecker::new()),
            http_server: None,
        }
    }

    pub fn counter(&self, name: &str) -> Counter {
        Counter {
            name: name.to_string(),
            update_tx: self.update_tx.clone(),
            value: self.registry.counters
                .entry(name.to_string())
                .or_insert_with(|| Arc::new(AtomicU64::new(0)))
                .clone(),
        }
    }

    pub fn gauge(&self, name: &str) -> Gauge {
        Gauge {
            name: name.to_string(),
            update_tx: self.update_tx.clone(),
            value: self.registry.gauges
                .entry(name.to_string())
                .or_insert_with(|| Arc::new(AtomicI64::new(0)))
                .clone(),
        }
    }

    /// Registers the histogram on first use. Later calls with the same name reuse
    /// the existing instance, and their `buckets` argument is ignored.
    pub fn histogram(&self, name: &str, buckets: Vec<f64>) -> Histogram {
        Histogram {
            name: name.to_string(),
            update_tx: self.update_tx.clone(),
            inner: self.registry.histograms
                .entry(name.to_string())
                .or_insert_with(|| Arc::new(Mutex::new(
                    HistogramImpl::new(buckets)
                )))
                .clone(),
        }
    }
}

pub struct Counter {
    name: String,
    update_tx: mpsc::UnboundedSender<MetricUpdate>,
    value: Arc<AtomicU64>,
}

impl Counter {
    pub fn inc(&self) {
        self.inc_by(1);
    }
    
    pub fn inc_by(&self, value: u64) {
        // Lock-free increment
        self.value.fetch_add(value, Ordering::Relaxed);
        
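        // Forward the update to the aggregator. send() only fails once the aggregation
        // task has stopped; the atomic above is already current, so the error is ignored.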
        let _ = self.update_tx.send(MetricUpdate::IncrementCounter {
            name: self.name.clone(),
            value,
        });
    }
}
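The split between registration (a lock taken once per handle) and the hot path (an atomic add plus a channel send) can be exercised with the standard library alone. A minimal sketch, assuming `std::sync::Mutex<HashMap>` in place of `DashMap` and `std::sync::mpsc` in place of Tokio's unbounded channel; `Registry`, `CounterHandle`, and `Update` are illustrative stand-ins, not the types above.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::mpsc::{self, Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::thread;

/// Update forwarded to the aggregator (stand-in for MetricUpdate::IncrementCounter).
pub struct Update {
    pub name: String,
    pub value: u64,
}

/// Stand-in for MetricRegistry: the Mutex guards registration only.
pub struct Registry {
    counters: Mutex<HashMap<String, Arc<AtomicU64>>>,
    tx: Sender<Update>,
}

/// Stand-in for the Counter handle.
#[derive(Clone)]
pub struct CounterHandle {
    name: String,
    value: Arc<AtomicU64>,
    tx: Sender<Update>,
}

impl Registry {
    /// Returns the registry plus the receiver an aggregator would drain.
    pub fn create() -> (Self, Receiver<Update>) {
        let (tx, rx) = mpsc::channel();
        (Self { counters: Mutex::new(HashMap::new()), tx }, rx)
    }

    /// Get-or-create: every handle for the same name shares one atomic.
    pub fn counter(&self, name: &str) -> CounterHandle {
        let value = self
            .counters
            .lock()
            .unwrap()
            .entry(name.to_string())
            .or_insert_with(|| Arc::new(AtomicU64::new(0)))
            .clone();
        CounterHandle { name: name.to_string(), value, tx: self.tx.clone() }
    }
}

impl CounterHandle {
    /// Hot path: one atomic add plus one channel send, no lock.
    pub fn inc_by(&self, n: u64) {
        self.value.fetch_add(n, Ordering::Relaxed);
        // Ignore send errors: the atomic is already current even if the receiver is gone.
        let _ = self.tx.send(Update { name: self.name.clone(), value: n });
    }

    pub fn get(&self) -> u64 {
        self.value.load(Ordering::Relaxed)
    }
}

fn main() {
    let (registry, rx) = Registry::create();

    // Each worker looks the counter up by name; all of them share one atomic.
    let workers: Vec<_> = (0..4)
        .map(|_| {
            let reads = registry.counter("filestore_reads_total");
            thread::spawn(move || {
                for _ in 0..1000 {
                    reads.inc_by(1);
                }
            })
        })
        .collect();
    for w in workers {
        w.join().unwrap();
    }

    let reads = registry.counter("filestore_reads_total");
    assert_eq!(reads.get(), 4000);

    // Drop the remaining senders so rx.iter() ends once the queue is drained.
    drop(reads);
    drop(registry);
    let forwarded: u64 = rx.iter().map(|u| u.value).sum();
    assert_eq!(forwarded, 4000);
}
```

Because every handle for a name shares one `Arc<AtomicU64>`, readers see the live value immediately; the channel only feeds whatever rate tracking the aggregator does.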

Metric Aggregation

impl MetricAggregator {
    /// Batch size at which pending updates are flushed without waiting for the next tick.
    const MAX_BATCH: usize = 1000;

    async fn run(&mut self) {
        let mut interval = tokio::time::interval(self.aggregation_interval);
        let mut pending_updates = Vec::with_capacity(Self::MAX_BATCH);

        loop {
            tokio::select! {
                _ = interval.tick() => {
                    self.process_batch(&mut pending_updates);
                }

                // recv() parks the task until an update arrives. Polling try_recv()
                // here would complete immediately on an empty channel and spin the
                // loop at 100% CPU.
                maybe_update = self.update_rx.recv() => {
                    match maybe_update {
                        Some(update) => {
                            pending_updates.push(update);
                            if pending_updates.len() >= Self::MAX_BATCH {
                                self.process_batch(&mut pending_updates);
                            }
                        }
                        // Every sender was dropped: flush what is left and stop.
                        None => {
                            self.process_batch(&mut pending_updates);
                            break;
                        }
                    }
                }
            }
        }
    }

    fn process_batch(&self, updates: &mut Vec<MetricUpdate>) {
        for update in updates.drain(..) {
            match update {
                MetricUpdate::IncrementCounter { name, value } => {
                    // Counter already updated atomically, just track
                    self.track_counter_rate(&name, value);
                }
                MetricUpdate::SetGauge { name, value } => {
                    if let Some(gauge) = self.registry.gauges.get(&name) {
                        gauge.store(value, Ordering::Relaxed);
                    }
                }
                MetricUpdate::RecordHistogram { name, value } => {
                    if let Some(hist) = self.registry.histograms.get(&name) {
                        // lock() rather than try_lock(): try_lock() silently drops the
                        // sample whenever the exporter happens to hold the lock.
                        if let Ok(mut h) = hist.lock() {
                            h.record(value);
                        }
                    }
                }
            }
        }
    }
}
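The flush policy (drain at a batch-size threshold, on every interval tick, and once more on shutdown) can be tested without Tokio. A minimal sketch, assuming a dedicated thread with `std::sync::mpsc::Receiver::recv_timeout` standing in for `tokio::select!` over `interval.tick()` and `recv()`; `run_aggregator` and `MAX_BATCH` are illustrative names.

```rust
use std::sync::mpsc::{self, Receiver, RecvTimeoutError};
use std::thread;
use std::time::{Duration, Instant};

const MAX_BATCH: usize = 1000;

/// Drains `rx` and hands non-empty batches to `flush`: when a batch reaches
/// MAX_BATCH, when `interval` has elapsed, and once more after every sender
/// has been dropped. Blocking in recv_timeout (rather than polling try_recv)
/// means an idle aggregator sleeps instead of spinning.
fn run_aggregator<T>(rx: Receiver<T>, interval: Duration, mut flush: impl FnMut(Vec<T>)) {
    let mut pending = Vec::with_capacity(MAX_BATCH);
    let mut deadline = Instant::now() + interval;

    loop {
        let wait = deadline.saturating_duration_since(Instant::now());
        let disconnected = match rx.recv_timeout(wait) {
            Ok(update) => {
                pending.push(update);
                false
            }
            Err(RecvTimeoutError::Timeout) => false,
            Err(RecvTimeoutError::Disconnected) => true,
        };

        // Check the clock after every wake-up so a steady trickle of updates
        // cannot postpone the interval flush indefinitely.
        let tick = Instant::now() >= deadline;
        if tick {
            deadline = Instant::now() + interval;
        }
        if !pending.is_empty() && (tick || disconnected || pending.len() >= MAX_BATCH) {
            flush(std::mem::take(&mut pending));
        }
        if disconnected {
            return;
        }
    }
}

fn main() {
    let (tx, rx) = mpsc::channel::<u64>();
    let producer = thread::spawn(move || {
        for i in 0..2500 {
            tx.send(i).unwrap();
        }
        // `tx` is dropped here, which disconnects the channel.
    });

    let mut batch_sizes = Vec::new();
    run_aggregator(rx, Duration::from_millis(50), |batch| batch_sizes.push(batch.len()));
    producer.join().unwrap();

    // The exact split depends on timing, but every update arrives exactly once
    // and no batch exceeds MAX_BATCH.
    assert_eq!(batch_sizes.iter().sum::<usize>(), 2500);
    assert!(batch_sizes.iter().all(|&n| n <= MAX_BATCH));
    println!("batch sizes: {batch_sizes:?}");
}
```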

Prometheus Exporter

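use std::fmt::Write as _; // write!/writeln! into a String

// Writes the text exposition format by hand. No prometheus::TextEncoder is kept:
// it only encodes metric families gathered from a prometheus::Registry, which this
// design does not use. Clone is needed because the HTTP handler owns a copy.
#[derive(Clone)]
pub struct PrometheusExporter {
    registry: Arc<MetricRegistry>,
}

impl PrometheusExporter {
    pub fn new(registry: Arc<MetricRegistry>) -> Self {
        Self { registry }
    }

    pub fn export(&self) -> Result<String> {
        let mut buffer = String::new();

        // Export counters
        for entry in self.registry.counters.iter() {
            let (name, value) = entry.pair();
            let v = value.load(Ordering::Relaxed);

            write!(
                buffer,
                "# HELP {name} Counter metric\n# TYPE {name} counter\n{name} {v}\n"
            )?;
        }

        // Export gauges
        for entry in self.registry.gauges.iter() {
            let (name, value) = entry.pair();
            let v = value.load(Ordering::Relaxed);

            write!(
                buffer,
                "# HELP {name} Gauge metric\n# TYPE {name} gauge\n{name} {v}\n"
            )?;
        }

        // Export histograms
        for entry in self.registry.histograms.iter() {
            let (name, hist) = entry.pair();

            if let Ok(h) = hist.lock() {
                let snapshot = h.snapshot();

                write!(buffer, "# HELP {name} Histogram metric\n# TYPE {name} histogram\n")?;

                // Bucket counts must be cumulative (each `le` bucket includes every
                // smaller one); snapshot.buckets is assumed to be cumulative already.
                for (bucket, count) in &snapshot.buckets {
                    writeln!(buffer, "{name}_bucket{{le=\"{bucket}\"}} {count}")?;
                }
                // The format requires a final +Inf bucket equal to the total count.
                writeln!(buffer, "{name}_bucket{{le=\"+Inf\"}} {}", snapshot.count)?;

                // Export summary stats
                write!(
                    buffer,
                    "{name}_sum {}\n{name}_count {}\n",
                    snapshot.sum, snapshot.count
                )?;
            }
        }

        Ok(buffer)
    }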
}
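The exporter leaves bucket accumulation to `snapshot()`. A std-only sketch of what a histogram needs in order to produce valid exposition output, assuming per-bucket (non-cumulative) storage that is summed at render time; `HistogramSketch` is a hypothetical stand-in for `HistogramImpl`.

```rust
use std::fmt::Write as _;

/// Minimal histogram: per-bucket (non-cumulative) counts plus sum and count.
struct HistogramSketch {
    bounds: Vec<f64>, // upper bounds (`le`), ascending
    counts: Vec<u64>, // counts[i] = observations in (bounds[i-1], bounds[i]]
    sum: f64,
    count: u64,
}

impl HistogramSketch {
    fn new(bounds: Vec<f64>) -> Self {
        let counts = vec![0; bounds.len()];
        Self { bounds, counts, sum: 0.0, count: 0 }
    }

    fn record(&mut self, v: f64) {
        // Values above the last bound land only in the implicit +Inf bucket.
        if let Some(i) = self.bounds.iter().position(|&le| v <= le) {
            self.counts[i] += 1;
        }
        self.sum += v;
        self.count += 1;
    }

    /// Text exposition: cumulative `_bucket` lines, a final `le="+Inf"` bucket
    /// equal to `_count`, then `_sum` and `_count`.
    fn render(&self, name: &str) -> String {
        let mut out = String::new();
        writeln!(out, "# TYPE {name} histogram").unwrap();
        let mut cumulative = 0;
        for (le, n) in self.bounds.iter().zip(&self.counts) {
            cumulative += n;
            writeln!(out, "{name}_bucket{{le=\"{le}\"}} {cumulative}").unwrap();
        }
        writeln!(out, "{name}_bucket{{le=\"+Inf\"}} {}", self.count).unwrap();
        writeln!(out, "{name}_sum {}", self.sum).unwrap();
        writeln!(out, "{name}_count {}", self.count).unwrap();
        out
    }
}

fn main() {
    let mut h = HistogramSketch::new(vec![0.1, 0.5, 1.0]);
    for v in [0.0625, 0.25, 0.5, 2.0] {
        h.record(v);
    }
    print!("{}", h.render("filestore_read_duration_seconds"));
}
```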

Health Check Implementation

pub struct HealthChecker {
    components: Arc<RwLock<HashMap<String, ComponentHealth>>>,
}

pub struct ComponentHealth {
    name: String,
    status: HealthStatus,
    last_check: SystemTime,
    details: HashMap<String, String>,
}

pub enum HealthStatus {
    Healthy,
    Degraded,
    Unhealthy,
}

impl HealthChecker {
    pub async fn check_liveness(&self) -> LivenessResponse {
        // Basic liveness - is the process running?
        LivenessResponse {
            status: "alive".to_string(),
            timestamp: SystemTime::now(),
        }
    }
    
    pub async fn check_readiness(&self) -> ReadinessResponse {
        let components = self.components.read().await;
        
        // Check all critical components
        let all_healthy = components.values()
            .filter(|c| c.is_critical())
            .all(|c| matches!(c.status, HealthStatus::Healthy));
        
        ReadinessResponse {
            ready: all_healthy,
            components: components.values()
                .map(|c| ComponentStatus {
                    name: c.name.clone(),
                    healthy: matches!(c.status, HealthStatus::Healthy),
                    details: c.details.clone(),
                })
                .collect(),
            timestamp: SystemTime::now(),
        }
    }
    
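    pub async fn register_component(&self, name: String, checker: HealthCheckFn) {
        let components = self.components.clone();
        // Hard-coded to the [health] defaults (check_interval_seconds = 10,
        // timeout_seconds = 5) until configuration support is wired in.
        let check_interval = Duration::from_secs(10);
        let check_timeout = Duration::from_secs(5);

        // Start periodic health check
        tokio::spawn(async move {
            let mut interval = tokio::time::interval(check_interval);

            loop {
                interval.tick().await;

                // A hung check must not freeze the last reported status, so bound it
                // with the timeout and report the component as unhealthy if it expires.
                let (status, details) =
                    match tokio::time::timeout(check_timeout, checker()).await {
                        Ok(health) => (health.status, health.details),
                        Err(_elapsed) => (
                            HealthStatus::Unhealthy,
                            HashMap::from([(
                                "error".to_string(),
                                "health check timed out".to_string(),
                            )]),
                        ),
                    };

                components.write().await.insert(name.clone(), ComponentHealth {
                    name: name.clone(),
                    status,
                    last_check: SystemTime::now(),
                    details,
                });
            }
        });
    }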
}
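One edge case in `check_readiness`: `Iterator::all` returns `true` for an empty iterator, so before any critical component has reported (or if one was never registered), the service is declared ready. A std-only sketch of a stricter rule, assuming the critical names come from `[health].critical_components`; `Status` and `is_ready` are illustrative names.

```rust
use std::collections::HashMap;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Status {
    Healthy,
    Degraded,
    Unhealthy,
}

/// Ready only when every critical component has reported and is Healthy.
/// A critical component with no report yet counts as not ready; non-critical
/// components never block readiness.
fn is_ready(reports: &HashMap<String, Status>, critical: &[&str]) -> bool {
    critical
        .iter()
        .all(|name| reports.get(*name) == Some(&Status::Healthy))
}

fn main() {
    let critical = ["metadata_store", "file_store", "raft"];
    let mut reports = HashMap::new();

    // Nothing has reported yet; filtering an empty map and calling all() would say "ready".
    assert!(!is_ready(&reports, &critical));

    for name in critical {
        reports.insert(name.to_string(), Status::Healthy);
    }
    reports.insert("cache".to_string(), Status::Unhealthy); // not critical
    assert!(is_ready(&reports, &critical));

    reports.insert("raft".to_string(), Status::Degraded);
    assert!(!is_ready(&reports, &critical));
}
```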

HTTP Server

use std::net::SocketAddr;
use std::time::SystemTime;

use axum::http::{header, StatusCode};
use axum::routing::get;
use axum::{Json, Router};
use tracing::{error, info}; // assumption: logging via `tracing` (the `log` macros work the same way)

impl MetricService {
    // Written against axum 0.7 (`axum::serve` + `tokio::net::TcpListener`);
    // `axum::Server` was removed in 0.7.
    pub async fn start_http_server(&mut self, addr: SocketAddr) -> Result<()> {
        let health_checker = self.health_checker.clone();
        let exporter = self.exporter.clone();

        let app = Router::new()
            .route("/metrics", get(move || async move {
                match exporter.export() {
                    Ok(metrics) => (
                        StatusCode::OK,
                        [(header::CONTENT_TYPE, "text/plain; version=0.0.4")],
                        metrics,
                    ),
                    Err(e) => (
                        StatusCode::INTERNAL_SERVER_ERROR,
                        [(header::CONTENT_TYPE, "text/plain")],
                        format!("Error: {e}"),
                    ),
                }
            }))
            // Process-level check only; it does not consult component health.
            .route("/health", get(|| async {
                Json(HealthResponse {
                    status: "healthy",
                    timestamp: SystemTime::now(),
                })
            }))
            .route("/health/live", get({
                let checker = health_checker.clone();
                move || async move {
                    Json(checker.check_liveness().await)
                }
            }))
            .route("/health/ready", get(move || async move {
                let response = health_checker.check_readiness().await;
                let status = if response.ready {
                    StatusCode::OK
                } else {
                    StatusCode::SERVICE_UNAVAILABLE
                };
                (status, Json(response))
            }));

        // Bind before spawning so an address-in-use error is returned to the caller
        // instead of panicking inside the background task.
        let listener = tokio::net::TcpListener::bind(addr).await?;

        let handle = tokio::spawn(async move {
            if let Err(e) = axum::serve(listener, app).await {
                error!("Metrics server stopped: {e}");
            }
        });

        self.http_server = Some(handle);
        info!("Metrics server started on {addr}");
        Ok(())
    }
}

Integration with Components

// Example: Integrating with FileStore
impl FileStore {
    pub fn with_metrics(mut self, metrics: Arc<MetricService>) -> Self {
        self.read_counter = Some(metrics.counter("filestore_reads_total"));
        self.write_counter = Some(metrics.counter("filestore_writes_total"));
        self.read_latency = Some(metrics.histogram(
            "filestore_read_duration_seconds",
            vec![0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0],
        ));
        self.write_latency = Some(metrics.histogram(
            "filestore_write_duration_seconds",
            vec![0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0],
        ));
        self
    }
    
    pub async fn read_stripe(&self, stripe_id: &StripeId) -> Result<Vec<u8>> {
        let start = Instant::now();
        
        let result = self.read_stripe_internal(stripe_id).await;
        
        // Record metrics
        if let Some(counter) = &self.read_counter {
            counter.inc();
        }
        
        if let Some(histogram) = &self.read_latency {
            histogram.record(start.elapsed().as_secs_f64());
        }
        
        result
    }
}
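`read_stripe` records latency after `read_stripe_internal` returns, which works because the result is kept in a variable; a version that propagated errors with `?` would skip the sample on failures. One common alternative is a drop guard that records on every exit path. A minimal sketch, assuming a closure as the recording sink in place of the `Histogram` handle; `LatencyTimer`, `fetch`, and this `read_stripe` are hypothetical.

```rust
use std::time::Instant;

/// Drop guard that reports elapsed seconds to `sink` on every exit path,
/// including early returns through `?`.
struct LatencyTimer<F: FnMut(f64)> {
    start: Instant,
    sink: F,
}

impl<F: FnMut(f64)> LatencyTimer<F> {
    fn start(sink: F) -> Self {
        Self { start: Instant::now(), sink }
    }
}

impl<F: FnMut(f64)> Drop for LatencyTimer<F> {
    fn drop(&mut self) {
        (self.sink)(self.start.elapsed().as_secs_f64());
    }
}

/// Hypothetical inner read that can fail.
fn fetch(fail: bool) -> Result<Vec<u8>, String> {
    if fail {
        Err("stripe not found".to_string())
    } else {
        Ok(vec![0u8; 16])
    }
}

/// Hypothetical read path; `samples` stands in for the latency histogram.
fn read_stripe(fail: bool, samples: &mut Vec<f64>) -> Result<Vec<u8>, String> {
    // Bind to `_timer`, not `_`: `let _ = ...` would drop (and record) immediately.
    let _timer = LatencyTimer::start(|secs| samples.push(secs));
    let data = fetch(fail)?; // the early return still records a sample
    Ok(data)
}

fn main() {
    let mut samples = Vec::new();
    assert!(read_stripe(false, &mut samples).is_ok());
    assert!(read_stripe(true, &mut samples).is_err());
    // Both the successful and the failed read produced a latency sample.
    assert_eq!(samples.len(), 2);
}
```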

Implementation Tasks

Day 1: Core Metrics

  • Create MetricService structure
  • Implement lock-free collectors
  • Create aggregation loop
  • Add metric types (counter, gauge, histogram)
  • Implement metric registry

Day 2: Export & Health

  • Implement Prometheus exporter
  • Create HTTP server
  • Add health check endpoints
  • Integrate with components
  • Add configuration support

Testing Requirements

Unit Tests

  • Test metric collection
  • Test aggregation logic
  • Test Prometheus export format
  • Test health checks
  • Test concurrent updates
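For the export-format unit test, a small line checker can catch malformed output without a full Prometheus parser. A sketch under stated assumptions: label values contain no spaces and lines carry no timestamps (both hold for the exporter above), so every sample line is `series value`; `check_exposition` is a hypothetical test helper.

```rust
/// Returns the number of sample lines, or the first malformed line.
fn check_exposition(text: &str) -> Result<usize, String> {
    let mut samples = 0;
    for (i, line) in text.lines().enumerate() {
        // Skip blank lines and `# HELP` / `# TYPE` comments.
        if line.is_empty() || line.starts_with('#') {
            continue;
        }
        let mut parts = line.split_whitespace();
        let (series, value) = match (parts.next(), parts.next(), parts.next()) {
            (Some(s), Some(v), None) => (s, v),
            _ => return Err(format!("line {}: expected `series value`", i + 1)),
        };
        // Metric names match [a-zA-Z_:][a-zA-Z0-9_:]*; labels follow the first '{'.
        let name = series.split('{').next().unwrap_or("");
        let valid_name = name.chars().enumerate().all(|(j, c)| {
            c.is_ascii_alphabetic() || c == '_' || c == ':' || (j > 0 && c.is_ascii_digit())
        });
        if name.is_empty() || !valid_name {
            return Err(format!("line {}: invalid metric name {name:?}", i + 1));
        }
        if value.parse::<f64>().is_err() {
            return Err(format!("line {}: invalid value {value:?}", i + 1));
        }
        samples += 1;
    }
    Ok(samples)
}

fn main() {
    let sample = "# HELP filestore_reads_total Counter metric\n\
                  # TYPE filestore_reads_total counter\n\
                  filestore_reads_total 42\n\
                  lat_bucket{le=\"0.5\"} 3\n";
    match check_exposition(sample) {
        Ok(n) => println!("{n} valid sample lines"),
        Err(e) => println!("invalid exposition: {e}"),
    }
}
```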

Integration Tests

  • Test HTTP endpoints
  • Test component integration
  • Test under load
  • Test metric accuracy
  • Test health status changes

Configuration

[metrics]
enabled = true
listen_address = "0.0.0.0:9090"
aggregation_interval_ms = 1000
retention_period_minutes = 60

[health]
check_interval_seconds = 10
timeout_seconds = 5
critical_components = ["metadata_store", "file_store", "raft"]
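The TOML stores `aggregation_interval_ms`, while `MetricService::new` reads `config.aggregation_interval`, presumably a `Duration`. A std-only sketch of that conversion with the defaults from the `[metrics]` section; `MetricsToml` and this `MetricConfig` are hypothetical, and a real loader would deserialize the raw struct with a TOML crate first.

```rust
use std::net::SocketAddr;
use std::time::Duration;

/// Raw [metrics] values as written in the TOML (hypothetical mirror struct).
struct MetricsToml {
    enabled: bool,
    listen_address: String,
    aggregation_interval_ms: u64,
    retention_period_minutes: u64,
}

impl Default for MetricsToml {
    // Defaults copied from the [metrics] section.
    fn default() -> Self {
        Self {
            enabled: true,
            listen_address: "0.0.0.0:9090".to_string(),
            aggregation_interval_ms: 1000,
            retention_period_minutes: 60,
        }
    }
}

/// Typed configuration of the kind MetricService::new expects.
struct MetricConfig {
    enabled: bool,
    listen_address: SocketAddr,
    aggregation_interval: Duration,
    retention_period: Duration,
}

impl TryFrom<MetricsToml> for MetricConfig {
    type Error = String;

    fn try_from(raw: MetricsToml) -> Result<Self, Self::Error> {
        // tokio::time::interval panics on a zero period, so reject it here.
        if raw.aggregation_interval_ms == 0 {
            return Err("aggregation_interval_ms must be greater than 0".to_string());
        }
        let listen_address = raw
            .listen_address
            .parse::<SocketAddr>()
            .map_err(|e| format!("invalid listen_address {:?}: {e}", raw.listen_address))?;
        Ok(Self {
            enabled: raw.enabled,
            listen_address,
            aggregation_interval: Duration::from_millis(raw.aggregation_interval_ms),
            retention_period: Duration::from_secs(raw.retention_period_minutes.saturating_mul(60)),
        })
    }
}

fn main() {
    let cfg = MetricConfig::try_from(MetricsToml::default()).expect("defaults are valid");
    println!(
        "metrics enabled={} on {} every {:?}, retention {:?}",
        cfg.enabled, cfg.listen_address, cfg.aggregation_interval, cfg.retention_period
    );
}
```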

Success Criteria

  • All components expose metrics
  • Prometheus endpoint works correctly
  • Health checks respond <10ms
  • No memory leaks under load
  • Metrics accurate under concurrency
  • >95% test coverage

Dependencies

  • prometheus - Metrics library
  • axum - HTTP server
  • tokio - Async runtime
  • dashmap - Concurrent collections

References

Blocked By: None
Blocks: Phase 5.3 (Production Deployment)

Metadata

Assignees: No one assigned
Labels: enhancement (New feature or request)
Projects: No projects
Relationships: None yet
Development: No branches or pull requests