-
Notifications
You must be signed in to change notification settings - Fork 0
Open
Labels
enhancement — New feature or request
Milestone
Description
Phase 5, Step 3: Production Deployment Preparation
Parent Issue: #89 - Phase 5: Observability & Production Readiness
Timeline: Day 5 of Phase 5
Status: Not Started
Overview
Finalize WormFS for production deployment by implementing graceful shutdown, component health checks, performance benchmarks, and creating comprehensive operational documentation.
Objectives
- Implement graceful shutdown with signal handling
- Add comprehensive component health checks
- Create performance benchmarks with Criterion
- Develop production deployment guide
- Create Grafana dashboards and monitoring setup
Technical Design
Graceful Shutdown Implementation
/// Coordinates graceful shutdown: broadcasts a shutdown signal and runs each
/// registered component's shutdown hook in priority order.
pub struct ShutdownController {
    // Broadcast channel used to announce that shutdown has started.
    shutdown_tx: broadcast::Sender<()>,
    // Components to shut down; sorted by `priority` at shutdown time.
    components: Arc<RwLock<Vec<ShutdownComponent>>>,
    // Total time budget shared by the entire shutdown sequence.
    grace_period: Duration,
}
/// A single named shutdown hook with an ordering priority.
pub struct ShutdownComponent {
    // Human-readable name used in shutdown logs.
    name: String,
    priority: u32, // Lower = shutdown first
    // Async shutdown action, boxed so heterogeneous components can be stored together.
    shutdown_fn: Box<dyn Fn() -> BoxFuture<'static, Result<()>> + Send + Sync>,
}
impl ShutdownController {
    /// Creates the controller and installs SIGTERM/SIGINT handlers.
    ///
    /// The controller starts with an empty component list; callers register
    /// components via `register_component` before `shutdown` runs.
    pub async fn init() -> Self {
        // Capacity 1 is enough: shutdown is announced at most once.
        let (shutdown_tx, _) = broadcast::channel(1);
        let controller = Self {
            shutdown_tx,
            components: Arc::new(RwLock::new(Vec::new())),
            // Total budget for the whole sequence, shared by all components.
            grace_period: Duration::from_secs(30),
        };
        // Register signal handlers
        controller.register_signal_handlers().await;
        controller
    }

    /// Spawns a background task that waits for SIGTERM or SIGINT and then
    /// broadcasts the shutdown notification.
    ///
    /// NOTE(review): `signal`/`SignalKind` presumably come from
    /// `tokio::signal::unix`, which is Unix-only — confirm whether Windows
    /// support is required. The `unwrap()`s panic if handler registration fails.
    async fn register_signal_handlers(&self) {
        let shutdown_tx = self.shutdown_tx.clone();
        tokio::spawn(async move {
            let mut sigterm = signal(SignalKind::terminate()).unwrap();
            let mut sigint = signal(SignalKind::interrupt()).unwrap();
            // Wait for whichever signal arrives first.
            tokio::select! {
                _ = sigterm.recv() => {
                    info!("Received SIGTERM, initiating graceful shutdown");
                }
                _ = sigint.recv() => {
                    info!("Received SIGINT, initiating graceful shutdown");
                }
            }
            // Send fails only when there are no receivers; safe to ignore.
            let _ = shutdown_tx.send(());
        });
    }

    /// Shuts down all registered components in ascending priority order,
    /// sharing the single `grace_period` budget across the whole sequence.
    ///
    /// Individual component failures are logged but do not abort the
    /// sequence; once the budget is exhausted, remaining components are
    /// skipped.
    pub async fn shutdown(&self) -> Result<()> {
        info!("Starting graceful shutdown sequence");
        let start = Instant::now();
        // Sort components by priority
        // NOTE(review): this write lock is held across the awaits below,
        // blocking concurrent `register_component` calls for the entire
        // shutdown sequence — likely acceptable at shutdown, but confirm.
        let mut components = self.components.write().await;
        components.sort_by_key(|c| c.priority);
        // Shutdown in priority order
        for component in components.iter() {
            info!("Shutting down component: {}", component.name);
            // Time left in the shared budget; saturates at zero.
            let remaining = self.grace_period.saturating_sub(start.elapsed());
            if remaining.is_zero() {
                warn!("Grace period exceeded, forcing shutdown");
                break;
            }
            // Each component gets at most the remaining budget.
            match timeout(remaining, (component.shutdown_fn)()).await {
                Ok(Ok(_)) => {
                    info!("Component {} shutdown successfully", component.name);
                }
                Ok(Err(e)) => {
                    error!("Component {} shutdown failed: {}", component.name, e);
                }
                Err(_) => {
                    error!("Component {} shutdown timed out", component.name);
                }
            }
        }
        info!("Graceful shutdown completed in {:?}", start.elapsed());
        Ok(())
    }

    /// Registers a component to be shut down by `shutdown`.
    ///
    /// NOTE(review): `RwLock::blocking_write` blocks the thread and panics if
    /// called from within an async runtime context (e.g. inside a tokio
    /// task) — confirm callers only invoke this from synchronous setup code.
    pub fn register_component(&self, component: ShutdownComponent) {
        self.components.blocking_write().push(component);
    }
}
// Example integration
impl StorageNode {
/// Registers this node's subsystems with the shutdown controller, in
/// shutdown order: FUSE first (stops new client I/O), then the gRPC server,
/// the Raft member, and finally the metadata store so that earlier stages
/// can still persist state while it is open.
pub fn register_shutdown(&self, controller: &ShutdownController) {
    // Unmount first so no new client I/O arrives during teardown.
    let fuse = self.fuse_handle.clone();
    controller.register_component(ShutdownComponent {
        name: "FUSE Mount".to_string(),
        priority: 0,
        shutdown_fn: Box::new(move || Box::pin(async move { fuse.unmount().await })),
    });

    // Drain in-flight RPCs before leaving the cluster.
    let server = self.grpc_server.clone();
    controller.register_component(ShutdownComponent {
        name: "gRPC Server".to_string(),
        priority: 1,
        shutdown_fn: Box::new(move || Box::pin(async move { server.graceful_shutdown().await })),
    });

    // Leave the consensus group cleanly.
    let raft = self.raft_member.clone();
    controller.register_component(ShutdownComponent {
        name: "Raft Member".to_string(),
        priority: 2,
        shutdown_fn: Box::new(move || Box::pin(async move { raft.leave_cluster().await })),
    });

    // Flush metadata last, once nothing else can write to it.
    let metadata = self.metadata_store.clone();
    controller.register_component(ShutdownComponent {
        name: "Metadata Store".to_string(),
        priority: 3,
        shutdown_fn: Box::new(move || Box::pin(async move { metadata.flush_and_close().await })),
    });
}
}
Component Health Checks
/// Common interface for component health probes.
///
/// NOTE(review): `async fn` in traits requires Rust 1.75+ and is not
/// object-safe on stable — if `dyn HealthCheck` trait objects are needed
/// (e.g. a registry of checks), use the `async-trait` crate instead. Confirm
/// the intended usage.
pub trait HealthCheck: Send + Sync {
    async fn check(&self) -> HealthResult;
}
/// Outcome of a single health probe.
pub struct HealthResult {
    // Overall status (e.g. Healthy / Degraded / Unhealthy).
    pub status: HealthStatus,
    // Optional human-readable explanation; set on failure paths.
    pub message: Option<String>,
    // Structured diagnostics for dashboards and debugging.
    pub details: HashMap<String, serde_json::Value>,
}
// Metadata Store Health Check
impl HealthCheck for MetadataStore {
async fn check(&self) -> HealthResult {
let mut details = HashMap::new();
// Check database connection
match self.pool.acquire().await {
Ok(conn) => {
details.insert("database".to_string(), json!("connected"));
// Check table accessibility
match sqlx::query("SELECT COUNT(*) FROM files")
.fetch_one(&mut conn)
.await
{
Ok(_) => {
details.insert("tables".to_string(), json!("accessible"));
}
Err(e) => {
return HealthResult {
status: HealthStatus::Unhealthy,
message: Some(format!("Table query failed: {}", e)),
details,
};
}
}
}
Err(e) => {
return HealthResult {
status: HealthStatus::Unhealthy,
message: Some(format!("Database connection failed: {}", e)),
details,
};
}
}
// Check pool stats
let pool_stats = self.pool.stats();
details.insert("pool_size".to_string(), json!(pool_stats.size));
details.insert("pool_idle".to_string(), json!(pool_stats.idle));
HealthResult {
status: HealthStatus::Healthy,
message: None,
details,
}
}
}
// Raft Health Check
impl HealthCheck for StorageRaftMember {
async fn check(&self) -> HealthResult {
let mut details = HashMap::new();
let metrics = self.raft.metrics().await;
details.insert("role".to_string(), json!(metrics.role));
details.insert("term".to_string(), json!(metrics.current_term));
details.insert("last_log_index".to_string(), json!(metrics.last_log_index));
details.insert("commit_index".to_string(), json!(metrics.commit_index));
details.insert("leader_id".to_string(), json!(metrics.current_leader));
// Check if we have a leader
let status = if metrics.current_leader.is_some() {
HealthStatus::Healthy
} else {
HealthStatus::Degraded
};
HealthResult {
status,
message: None,
details,
}
}
}
Performance Benchmarks
// benches/wormfs_benchmarks.rs
//
// Criterion benchmarks for end-to-end file operations against an in-process
// 3-node test cluster.
// Fix: `Throughput` is used below but was missing from the import list.
use criterion::{black_box, criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
use wormfs::*;

/// Benchmarks write and read throughput across a range of small file sizes.
fn bench_file_operations(c: &mut Criterion) {
    // One shared runtime and cluster for every sample: cluster startup is far
    // too expensive to pay per iteration.
    let runtime = tokio::runtime::Runtime::new().unwrap();
    let test_cluster = runtime.block_on(TestCluster::new(3)).unwrap();
    let mut group = c.benchmark_group("file_operations");
    // Benchmark small file writes
    for size in [1024, 4096, 16384, 65536].iter() {
        // Report bytes/sec rather than raw iteration time.
        group.throughput(Throughput::Bytes(*size as u64));
        group.bench_with_input(
            BenchmarkId::new("write", size),
            size,
            |b, &size| {
                let data = vec![0u8; size];
                b.to_async(&runtime).iter(|| async {
                    test_cluster.client()
                        .write_file("/bench.txt", &data)
                        .await
                        .unwrap()
                });
            }
        );
    }
    // Benchmark file reads
    for size in [1024, 4096, 16384, 65536].iter() {
        group.throughput(Throughput::Bytes(*size as u64));
        group.bench_with_input(
            BenchmarkId::new("read", size),
            size,
            |b, &size| {
                // Write the fixture once, outside the timed loop.
                let path = format!("/bench_{}.txt", size);
                runtime.block_on(async {
                    let data = vec![0u8; size];
                    test_cluster.client().write_file(&path, &data).await.unwrap();
                });
                b.to_async(&runtime).iter(|| async {
                    // black_box keeps the optimizer from discarding the read.
                    black_box(
                        test_cluster.client()
                            .read_file(&path)
                            .await
                            .unwrap()
                    )
                });
            }
        );
    }
    group.finish();
}
/// Benchmarks metadata operations: single-file creation and listing a
/// directory of 1000 entries.
///
/// NOTE(review): `metadata_store` is not declared in this function or
/// anywhere visible in this file — the sketch needs setup code that
/// constructs the store before it will compile.
fn bench_metadata_operations(c: &mut Criterion) {
    let mut group = c.benchmark_group("metadata");
    group.bench_function("create_file", |b| {
        b.iter(|| {
            metadata_store.create_file(
                black_box("test.txt"),
                black_box(FileMetadata::default()),
            )
        });
    });
    group.bench_function("list_directory", |b| {
        // Populate with 1000 files
        // (runs once at registration time, outside the timed `b.iter` loop)
        for i in 0..1000 {
            metadata_store.create_file(&format!("file_{}.txt", i), FileMetadata::default());
        }
        b.iter(|| {
            black_box(metadata_store.list_directory("/"))
        });
    });
    group.finish();
}
/// Benchmarks Reed-Solomon encode/decode across representative stripe sizes
/// (1 MB, 4 MB, 16 MB) using the production 4 data + 2 parity layout.
fn bench_erasure_coding(c: &mut Criterion) {
    let mut group = c.benchmark_group("erasure_coding");
    for stripe_size in [1024 * 1024, 4 * 1024 * 1024, 16 * 1024 * 1024].iter() {
        // Report bytes/sec of raw stripe data.
        group.throughput(Throughput::Bytes(*stripe_size as u64));
        group.bench_with_input(
            BenchmarkId::new("encode", stripe_size),
            stripe_size,
            |b, &size| {
                let data = vec![0u8; size];
                // 4 data + 2 parity shards, matching the production config.
                let encoder = ReedSolomonEncoder::new(4, 2);
                b.iter(|| {
                    black_box(encoder.encode(&data))
                });
            }
        );
        group.bench_with_input(
            BenchmarkId::new("decode", stripe_size),
            stripe_size,
            |b, &size| {
                let data = vec![0u8; size];
                let encoder = ReedSolomonEncoder::new(4, 2);
                // Encode once outside the timed loop.
                let encoded = encoder.encode(&data);
                b.iter(|| {
                    // Decodes from the first 4 shards only — presumably the
                    // data shards. NOTE(review): confirm this exercises real
                    // reconstruction rather than a trivial copy path.
                    black_box(encoder.decode(&encoded[..4]))
                });
            }
        );
    }
    group.finish();
}
// Wire all benchmark functions into Criterion's generated harness.
criterion_group!(
    benches,
    bench_file_operations,
    bench_metadata_operations,
    bench_erasure_coding
);
criterion_main!(benches);
Production Deployment Guide
# WormFS Production Deployment Guide
## Prerequisites
- Linux kernel 4.9+ with FUSE support
- Docker 20.10+ or Kubernetes 1.20+
- Prometheus 2.30+ for metrics
- Grafana 8.0+ for dashboards
## Hardware Requirements
### Minimum (3-node cluster)
- CPU: 4 cores per node
- RAM: 8GB per node
- Storage: 100GB SSD per node
- Network: 1Gbps interconnect
### Recommended (5+ node cluster)
- CPU: 8+ cores per node
- RAM: 16GB+ per node
- Storage: 1TB+ NVMe per node
- Network: 10Gbps interconnect
## Installation
### Docker Deployment
```bash
# Create network
docker network create wormfs-net
# Start first node (bootstrap)
docker run -d \
--name wormfs-node1 \
--network wormfs-net \
-v /data/node1:/var/lib/wormfs \
-p 50051:50051 \
-p 9090:9090 \
wormfs:latest \
--bootstrap
# Start additional nodes
for i in 2 3; do
docker run -d \
--name wormfs-node$i \
--network wormfs-net \
-v /data/node$i:/var/lib/wormfs \
-p 5005$i:50051 \
-p 909$i:9090 \
wormfs:latest \
--peer wormfs-node1:50051
done
Kubernetes Deployment
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: wormfs
spec:
serviceName: wormfs
replicas: 3
selector:
matchLabels:
app: wormfs
template:
metadata:
labels:
app: wormfs
spec:
containers:
- name: wormfs
image: wormfs:latest
ports:
- containerPort: 50051
name: grpc
- containerPort: 9090
name: metrics
volumeMounts:
- name: data
mountPath: /var/lib/wormfs
livenessProbe:
httpGet:
path: /health/live
port: 9090
initialDelaySeconds: 30
periodSeconds: 10
readinessProbe:
httpGet:
path: /health/ready
port: 9090
initialDelaySeconds: 10
periodSeconds: 5
volumeClaimTemplates:
- metadata:
name: data
spec:
accessModes: ["ReadWriteOnce"]
resources:
requests:
storage: 100Gi
Configuration
Production Settings
[node]
id = 1
data_dir = "/var/lib/wormfs"
[cluster]
min_nodes = 3
replication_factor = 3
[storage]
stripe_size = 4194304 # 4MB
data_shards = 4
parity_shards = 2
[performance]
cache_size_mb = 1024
write_buffer_size_mb = 256
max_concurrent_operations = 1000
[security]
tls_enabled = true
tls_cert_path = "/etc/wormfs/server.crt"
tls_key_path = "/etc/wormfs/server.key"
auth_enabled = true
[monitoring]
metrics_enabled = true
metrics_port = 9090
log_level = "info"
Monitoring Setup
Prometheus Configuration
scrape_configs:
- job_name: 'wormfs'
static_configs:
- targets: ['node1:9090', 'node2:9090', 'node3:9090']
metrics_path: '/metrics'
scrape_interval: 15s
Key Metrics to Monitor
- `wormfs_cluster_leader` — Current leader node
- `wormfs_file_operations_total` — File operation counts
- `wormfs_storage_used_bytes` — Storage utilization
- `wormfs_repair_queue_size` — Pending repairs
- `wormfs_request_duration_seconds` — Operation latency
Operational Procedures
Adding a Node
wormfs admin add-node \
--cluster-endpoint node1:50051 \
--new-node-address node4:50051
Removing a Node
wormfs admin remove-node \
--cluster-endpoint node1:50051 \
--node-id 3
Backup Procedures
# Create snapshot
wormfs admin create-snapshot \
--cluster-endpoint node1:50051 \
--output /backup/snapshot-$(date +%Y%m%d).worm
# Restore from snapshot
wormfs admin restore-snapshot \
--snapshot /backup/snapshot-20240101.worm
Troubleshooting
Common Issues
-
Cluster won't form
- Check network connectivity
- Verify all nodes can reach each other
- Check firewall rules
-
High latency
- Check network bandwidth
- Verify disk I/O performance
- Review cache hit rates
-
Data corruption
- Run deep integrity check
- Review watchdog logs
- Check for disk errors
Emergency Procedures
# Force leader election
wormfs admin force-election --cluster-endpoint node1:50051
# Emergency read-only mode
wormfs admin set-readonly --cluster-endpoint node1:50051
# Export metadata for analysis
wormfs debug export-metadata --output metadata.sql
### Grafana Dashboards
```json
{
"dashboard": {
"title": "WormFS Cluster Overview",
"panels": [
{
"title": "Cluster Health",
"type": "stat",
"targets": [
{
"expr": "up{job='wormfs'}"
}
]
},
{
"title": "File Operations/sec",
"type": "graph",
"targets": [
{
"expr": "rate(wormfs_file_operations_total[1m])"
}
]
},
{
"title": "Storage Utilization",
"type": "gauge",
"targets": [
{
"expr": "(wormfs_storage_used_bytes / wormfs_storage_total_bytes) * 100"
}
]
},
{
"title": "Request Latency (p99)",
"type": "graph",
"targets": [
{
"expr": "histogram_quantile(0.99, rate(wormfs_request_duration_seconds_bucket[5m]))"
}
]
}
]
}
}
Implementation Tasks
Day 5: Production Readiness
- Implement graceful shutdown
- Add component health checks
- Create performance benchmarks
- Write deployment documentation
- Create Grafana dashboards
Testing Requirements
Integration Tests
- Test graceful shutdown
- Test health check accuracy
- Verify benchmark results
- Test deployment procedures
Performance Tests
- Benchmark all operations
- Establish baselines
- Test under production load
Success Criteria
- Graceful shutdown completes <30s
- Health checks accurate
- Performance baselines documented
- Deployment guide complete
- Dashboards functional
- >95% uptime in testing
Dependencies
- `criterion` — Benchmarking framework
- `tokio` — Async runtime
- `signal-hook` — Signal handling
References
Blocked By: Phase 5.1, Phase 5.2
Blocks: None (Final step)
Reactions are currently unavailable
Metadata
Metadata
Assignees
Labels
enhancement — New feature or request