
Phase 4.4: Add Orphan Cleanup System #88

@avirtuos

Description


Phase 4, Step 4: Orphan Cleanup System

Parent Issue: #84 - Phase 4: Robustness & Recovery
Timeline: Days 9-10 of Phase 4
Status: Not Started

Overview

Implement a comprehensive system for detecting and cleaning up orphaned chunks, unreferenced metadata, and other data inconsistencies that accumulate over time in a distributed storage system.

Objectives

  1. Detect orphaned chunks not referenced by any file
  2. Identify unreferenced metadata entries
  3. Implement safe cleanup with grace periods
  4. Add audit logging for all cleanup operations
  5. Create rollback capability for accidental deletions

Technical Design

Orphan Cleanup Architecture

pub struct OrphanCleanupService {
    metadata_store: Arc<MetadataStore>,
    chunk_service: Arc<ChunkService>,
    cleanup_queue: Arc<CleanupQueue>,
    audit_log: Arc<AuditLog>,
    grace_period: Duration,
    dry_run_mode: bool,
}

pub struct CleanupQueue {
    pending: Arc<Mutex<VecDeque<CleanupItem>>>,
    in_progress: Arc<RwLock<HashMap<ItemId, CleanupItem>>>,
    completed: Arc<RwLock<Vec<CompletedCleanup>>>,
}

pub struct CleanupItem {
    id: ItemId,
    item_type: OrphanType,
    resource_id: String,
    detected_at: SystemTime,
    grace_expires_at: SystemTime,
    size_bytes: u64,
    references_checked: u32,
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub enum OrphanType {
    Chunk,
    Stripe,
    PartialFile,
    TempFile,
    StagedChunk,
    AbortedTransaction,
}
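Log lines later in this design format `item_type` with `{}` (e.g. in `execute_cleanup`), which requires a `Display` impl in addition to the derives. A minimal sketch, repeating the enum so the snippet stands alone:

```rust
use std::fmt;

// Repeated here so the snippet is self-contained.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum OrphanType {
    Chunk,
    Stripe,
    PartialFile,
    TempFile,
    StagedChunk,
    AbortedTransaction,
}

impl fmt::Display for OrphanType {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Stable, log-friendly names for each orphan kind
        let name = match self {
            OrphanType::Chunk => "chunk",
            OrphanType::Stripe => "stripe",
            OrphanType::PartialFile => "partial file",
            OrphanType::TempFile => "temp file",
            OrphanType::StagedChunk => "staged chunk",
            OrphanType::AbortedTransaction => "aborted transaction",
        };
        write!(f, "{name}")
    }
}
```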

Orphan Detection Engine

impl OrphanCleanupService {
    pub async fn run_detection_cycle(&self) -> Result<DetectionReport> {
        info!("Starting orphan detection cycle");
        let mut report = DetectionReport::new();
        
        // 1. Detect orphaned chunks
        let orphaned_chunks = self.detect_orphaned_chunks().await?;
        report.orphaned_chunks = orphaned_chunks.len();
        
        // 2. Detect unreferenced stripes
        let orphaned_stripes = self.detect_orphaned_stripes().await?;
        report.orphaned_stripes = orphaned_stripes.len();
        
        // 3. Detect partial files
        let partial_files = self.detect_partial_files().await?;
        report.partial_files = partial_files.len();
        
        // 4. Detect stale temporary files
        let temp_files = self.detect_temp_files().await?;
        report.temp_files = temp_files.len();
        
        // 5. Detect staged chunks from failed transactions
        let staged_chunks = self.detect_staged_chunks().await?;
        report.staged_chunks = staged_chunks.len();
        
        // Queue items for cleanup
        self.queue_cleanup_items(
            orphaned_chunks,
            orphaned_stripes,
            partial_files,
            temp_files,
            staged_chunks,
        ).await?;
        
        info!("Detection cycle completed: {}", report.summary());
        Ok(report)
    }
    
    async fn detect_orphaned_chunks(&self) -> Result<Vec<ChunkInfo>> {
        // 1. Build reference map from metadata
        let mut referenced = HashSet::new();
        
        let mut stripe_stream = self.metadata_store.stream_all_stripes().await?;
        
        while let Some(stripe) = stripe_stream.next().await {
            let stripe = stripe?;
            for location in &stripe.chunk_locations {
                referenced.insert(location.chunk_id.clone());
            }
        }
        
        // 2. Scan all chunks on disk
        let mut orphans = Vec::new();
        let disk_chunks = self.chunk_service.scan_chunks_with_metadata().await?;
        
        for chunk_info in disk_chunks {
            if !referenced.contains(&chunk_info.chunk_id) {
                // 3. Double-check with direct metadata query
                if !self.verify_chunk_reference(&chunk_info.chunk_id).await? {
                    orphans.push(chunk_info);
                }
            }
        }
        
        info!("Found {} orphaned chunks", orphans.len());
        Ok(orphans)
    }
    
    async fn verify_chunk_reference(&self, chunk_id: &ChunkId) -> Result<bool> {
        // Perform thorough check across all metadata
        let queries = vec![
            self.metadata_store.chunk_in_stripe(chunk_id),
            self.metadata_store.chunk_in_staging(chunk_id),
            self.metadata_store.chunk_in_repair_queue(chunk_id),
        ];
        
        let results = futures::future::join_all(queries).await;
        
        for result in results {
            if result? {
                return Ok(true); // Chunk is referenced
            }
        }
        
        Ok(false) // Chunk is orphaned
    }
}
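Stripped of the async plumbing, the core of `detect_orphaned_chunks` is a set difference: chunks present on disk minus chunks referenced by any stripe. A sketch with plain string IDs standing in for `ChunkId` (types here are for illustration only):

```rust
use std::collections::HashSet;

/// Return the disk chunks whose IDs appear in no stripe's reference set.
fn find_orphans(disk_chunks: &[String], referenced: &HashSet<String>) -> Vec<String> {
    disk_chunks
        .iter()
        .filter(|id| !referenced.contains(id.as_str()))
        .cloned()
        .collect()
}
```

In the real service the candidates from this pass are then re-checked with `verify_chunk_reference` before being queued, to guard against metadata that changed mid-scan.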

Safe Cleanup with Grace Period

impl OrphanCleanupService {
    pub async fn cleanup_worker(&self) {
        info!("Starting cleanup worker");
        
        loop {
            // Get items whose grace period has expired
            let ready_items = self.get_ready_for_cleanup().await;
            
            if ready_items.is_empty() {
                tokio::time::sleep(Duration::from_secs(60)).await;
                continue;
            }
            
            for item in ready_items {
                // Final verification before cleanup; errors are logged so
                // one bad item cannot stop the worker loop
                match self.verify_still_orphaned(&item).await {
                    Ok(true) => {
                        if let Err(e) = self.execute_cleanup(&item).await {
                            warn!("Cleanup of {} failed: {}", item.id, e);
                        }
                    }
                    Ok(false) => {
                        info!("Item {} no longer orphaned, skipping", item.id);
                        self.remove_from_queue(&item.id).await;
                    }
                    Err(e) => warn!("Verification of {} failed: {}", item.id, e),
                }
            }
        }
    }
    
    async fn get_ready_for_cleanup(&self) -> Vec<CleanupItem> {
        let now = SystemTime::now();
        let mut ready = Vec::new();
        
        let queue = self.cleanup_queue.pending.lock().await;
        for item in queue.iter() {
            if item.grace_expires_at <= now {
                ready.push(item.clone());
            }
        }
        
        ready
    }
    
    async fn verify_still_orphaned(&self, item: &CleanupItem) -> Result<bool> {
        // Note: the queue, not this method, bumps references_checked;
        // a shared reference cannot be mutated here.
        Ok(match item.item_type {
            OrphanType::Chunk => {
                !self.verify_chunk_reference(&item.resource_id.parse()?).await?
            }
            OrphanType::Stripe => {
                !self.verify_stripe_reference(&item.resource_id.parse()?).await?
            }
            OrphanType::PartialFile => {
                self.is_file_still_partial(&item.resource_id.parse()?).await?
            }
            OrphanType::TempFile => {
                self.is_temp_file_stale(&item.resource_id).await?
            }
            OrphanType::StagedChunk => {
                self.is_staging_expired(&item.resource_id.parse()?).await?
            }
            OrphanType::AbortedTransaction => {
                self.is_transaction_aborted(&item.resource_id.parse()?).await?
            }
        })
    }
    
    async fn execute_cleanup(&self, item: &CleanupItem) -> Result<()> {
        // Log cleanup action
        self.audit_log.log_cleanup(CleanupAuditEntry {
            timestamp: SystemTime::now(),
            item_type: item.item_type.clone(),
            resource_id: item.resource_id.clone(),
            size_bytes: item.size_bytes,
            detected_at: item.detected_at,
            cleaned_at: SystemTime::now(),
            dry_run: self.dry_run_mode,
        }).await?;
        
        if self.dry_run_mode {
            info!("DRY RUN: Would clean up {} {}", item.item_type, item.resource_id);
            return Ok(());
        }
        
        // Perform actual cleanup
        match item.item_type {
            OrphanType::Chunk => {
                self.chunk_service.delete_chunk(&item.resource_id.parse()?).await?;
            }
            OrphanType::Stripe => {
                self.cleanup_stripe(&item.resource_id.parse()?).await?;
            }
            OrphanType::PartialFile => {
                self.cleanup_partial_file(&item.resource_id.parse()?).await?;
            }
            OrphanType::TempFile => {
                self.chunk_service.delete_temp_file(&item.resource_id).await?;
            }
            OrphanType::StagedChunk => {
                self.cleanup_staged_chunk(&item.resource_id.parse()?).await?;
            }
            OrphanType::AbortedTransaction => {
                self.cleanup_aborted_transaction(&item.resource_id.parse()?).await?;
            }
        }
        
        // Move to completed
        self.cleanup_queue.mark_completed(item.id, SystemTime::now()).await;
        
        info!("Cleaned up {} {}", item.item_type, item.resource_id);
        Ok(())
    }
}
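The grace-period gate in `get_ready_for_cleanup` reduces to a single time comparison against the detection timestamp. A standalone sketch (function name assumed):

```rust
use std::time::{Duration, SystemTime, UNIX_EPOCH};

/// An item is eligible for cleanup once `grace_period` has elapsed
/// since it was detected. If the clock reads earlier than `detected_at`
/// (duration_since fails), the item is treated as not yet eligible.
fn grace_expired(detected_at: SystemTime, grace_period: Duration, now: SystemTime) -> bool {
    match now.duration_since(detected_at) {
        Ok(age) => age >= grace_period,
        Err(_) => false,
    }
}
```

Treating clock skew as "not eligible" is the safe default here: a deletion delayed by a bad clock is recoverable, a premature one may not be.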

Rollback Capability

pub struct CleanupRollback {
    audit_log: Arc<AuditLog>,
    restore_service: Arc<RestoreService>,
    // Needed by restore_chunk to re-store data and re-link metadata
    chunk_service: Arc<ChunkService>,
    metadata_store: Arc<MetadataStore>,
}

impl CleanupRollback {
    pub async fn rollback_cleanup(
        &self,
        cleanup_id: &CleanupId,
    ) -> Result<()> {
        // 1. Find cleanup in audit log
        let audit_entry = self.audit_log.get_entry(cleanup_id).await?
            .ok_or(Error::CleanupNotFound)?;
        
        // 2. Check if rollback is possible
        let age = SystemTime::now().duration_since(audit_entry.cleaned_at)?;
        // 7-day rollback window (std Duration has no stable from_days)
        if age > Duration::from_secs(7 * 24 * 60 * 60) {
            return Err(Error::RollbackWindowExpired);
        }
        
        // 3. Restore from backup if available
        match audit_entry.item_type {
            OrphanType::Chunk => {
                self.restore_chunk(&audit_entry.resource_id).await?;
            }
            OrphanType::Stripe => {
                self.restore_stripe(&audit_entry.resource_id).await?;
            }
            OrphanType::PartialFile => {
                self.restore_partial_file(&audit_entry.resource_id).await?;
            }
            _ => {
                return Err(Error::CannotRollback(audit_entry.item_type));
            }
        }
        
        // 4. Log rollback
        self.audit_log.log_rollback(RollbackEntry {
            cleanup_id: cleanup_id.clone(),
            restored_at: SystemTime::now(),
            resource_type: audit_entry.item_type,
            resource_id: audit_entry.resource_id,
        }).await?;
        
        info!("Successfully rolled back cleanup {}", cleanup_id);
        Ok(())
    }
    
    async fn restore_chunk(&self, chunk_id: &str) -> Result<()> {
        // Check if chunk exists in backup
        if let Some(backup_data) = self.restore_service.get_chunk_backup(chunk_id).await? {
            // Restore chunk to original location
            let location = backup_data.original_location;
            self.chunk_service.store_chunk(&location, &backup_data.data).await?;
            
            // Restore metadata references
            self.metadata_store.restore_chunk_reference(
                chunk_id.parse()?,
                backup_data.stripe_id,
            ).await?;
            
            Ok(())
        } else {
            Err(Error::BackupNotAvailable)
        }
    }
}
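The rollback-window check above can be isolated into a pure helper, with the 7-day limit spelled out in seconds. A sketch (names assumed, not part of the design above):

```rust
use std::time::{Duration, SystemTime, UNIX_EPOCH};

/// 7-day rollback window, expressed in seconds since std Duration
/// has no stable from_days constructor.
const ROLLBACK_WINDOW: Duration = Duration::from_secs(7 * 24 * 60 * 60);

/// Rollback is permitted only while the cleanup is younger than the window.
fn within_rollback_window(cleaned_at: SystemTime, now: SystemTime) -> bool {
    match now.duration_since(cleaned_at) {
        Ok(age) => age <= ROLLBACK_WINDOW,
        Err(_) => true, // clock skew: cleaned_at appears to be in the future
    }
}
```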

Monitoring and Reporting

impl OrphanCleanupService {
    pub async fn generate_cleanup_report(
        &self,
        period: Duration,
    ) -> Result<CleanupReport> {
        let now = SystemTime::now();
        let since = now - period;
        
        // Query audit log for statistics
        let entries = self.audit_log.query_range(since, now).await?;
        
        let mut report = CleanupReport {
            period_start: since,
            period_end: now,
            total_items_cleaned: entries.len(),
            space_reclaimed_bytes: 0,
            items_by_type: HashMap::new(),
            errors: Vec::new(),
            rollbacks: Vec::new(),
        };
        
        for entry in entries {
            report.space_reclaimed_bytes += entry.size_bytes;
            *report.items_by_type
                .entry(entry.item_type)
                .or_insert(0) += 1;
        }
        
        // Add error information
        report.errors = self.audit_log.get_errors(since).await?;
        
        // Add rollback information
        report.rollbacks = self.audit_log.get_rollbacks(since).await?;
        
        Ok(report)
    }
    
    pub async fn get_cleanup_metrics(&self) -> CleanupMetrics {
        CleanupMetrics {
            queue_size: self.cleanup_queue.size().await,
            in_progress: self.cleanup_queue.in_progress_count().await,
            completed_today: self.cleanup_queue.completed_today().await,
            space_reclaimed_today: self.audit_log.space_reclaimed_today().await,
            last_run: self.last_detection_time().await,
            next_run: self.next_detection_time().await,
        }
    }
}
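The aggregation loop in `generate_cleanup_report` is a fold over (type, size) pairs. A sketch with string type names standing in for `OrphanType` (for illustration only):

```rust
use std::collections::HashMap;

/// Fold audit entries into total bytes reclaimed and a per-type count.
fn aggregate(entries: &[(&str, u64)]) -> (u64, HashMap<String, usize>) {
    let mut total_bytes = 0u64;
    let mut by_type: HashMap<String, usize> = HashMap::new();
    for (item_type, size_bytes) in entries {
        total_bytes += size_bytes;
        *by_type.entry(item_type.to_string()).or_insert(0) += 1;
    }
    (total_bytes, by_type)
}
```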

Implementation Tasks

Day 9: Detection & Queueing

  • Create OrphanCleanupService struct
  • Implement orphan detection algorithms
  • Create cleanup queue management
  • Add grace period handling
  • Implement verification logic

Day 10: Cleanup & Rollback

  • Implement cleanup execution
  • Add audit logging
  • Create rollback capability
  • Implement monitoring/reporting
  • Add dry-run mode

Testing Requirements

Unit Tests

  • Test orphan detection logic
  • Test grace period calculations
  • Test verification checks
  • Test cleanup execution
  • Test rollback operations

Integration Tests

  • Test full detection cycle
  • Test cleanup with grace period
  • Test concurrent cleanup operations
  • Test rollback functionality
  • Test audit logging

Safety Tests

  • Test false positive detection
  • Test cleanup during active writes
  • Test rollback window
  • Test dry-run mode

Configuration

[orphan_cleanup]
enabled = true
detection_interval_hours = 24
grace_period_hours = 48
dry_run = false
max_queue_size = 10000
cleanup_parallelism = 5
rollback_window_days = 7
audit_retention_days = 90

Success Criteria

  • Orphans detected with 100% accuracy
  • No false positives in production
  • Grace period prevents premature deletion
  • Rollback works within window
  • Audit log captures all operations
  • >95% test coverage

Dependencies

  • Phase 1: MetadataStore
  • Phase 3: ChunkService
  • Phase 4.3: StorageWatchdog

References

Blocked By: Phase 4.3
Blocks: None (Phase 4 completion)

Metadata

Assignees: No one assigned
Labels: enhancement (New feature or request)
Projects: None
Development: No branches or pull requests