Phase 5, Step 2: WormValidator Test Framework
Parent Issue: #89 - Phase 5: Observability & Production Readiness
Timeline: Days 3-4 of Phase 5
Status: Not Started
Overview
Build a comprehensive test framework that can bootstrap multi-node clusters, simulate FUSE clients, execute various test scenarios including chaos testing, and generate detailed test reports.
Objectives
- Create cluster bootstrapping for multi-process test clusters
- Implement FUSE client simulator with gRPC
- Design test scenarios for various failure modes
- Add chaos testing with random failures
- Create detailed test reporting with HTML output
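The scenario-driven flow these objectives describe can be sketched with a minimal, stdlib-only runner (all names here are illustrative, not the actual WormValidator API — and it is synchronous where the real framework would be async): each scenario carries execute/verify phases, and the runner tallies pass/fail.

```rust
// Illustrative sketch only: a scenario is a pair of phases, and the
// runner counts a pass only when both execute and verify succeed.
struct Scenario {
    name: &'static str,
    execute: Box<dyn Fn() -> Result<(), String>>,
    verify: Box<dyn Fn() -> Result<(), String>>,
}

struct RunSummary {
    passed: usize,
    failed: usize,
}

fn run_all(scenarios: &[Scenario]) -> RunSummary {
    let mut summary = RunSummary { passed: 0, failed: 0 };
    for s in scenarios {
        // A scenario passes only if execute and then verify both succeed.
        match (s.execute)().and_then(|_| (s.verify)()) {
            Ok(()) => summary.passed += 1,
            Err(e) => {
                eprintln!("{} failed: {}", s.name, e);
                summary.failed += 1;
            }
        }
    }
    summary
}

fn main() {
    let scenarios = vec![
        Scenario {
            name: "always_passes",
            execute: Box::new(|| Ok(())),
            verify: Box::new(|| Ok(())),
        },
        Scenario {
            name: "fails_verify",
            execute: Box::new(|| Ok(())),
            verify: Box::new(|| Err("file still exists".to_string())),
        },
    ];
    let summary = run_all(&scenarios);
    println!("passed={} failed={}", summary.passed, summary.failed);
}
```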
Technical Design
Test Framework Architecture
```rust
pub struct WormValidator {
    config: ValidatorConfig,
    cluster: Option<TestCluster>,
    client: Option<TestClient>,
    chaos_engine: Option<ChaosEngine>,
    test_runner: TestRunner,
    reporter: TestReporter,
}

pub struct TestCluster {
    nodes: Vec<TestNode>,
    network: TestNetwork,
    data_dirs: Vec<TempDir>,
}

pub struct TestNode {
    id: NodeId,
    process: Child,
    grpc_endpoint: SocketAddr,
    metrics_endpoint: SocketAddr,
    data_dir: PathBuf,
    role: NodeRole,
}

pub struct TestClient {
    grpc_client: FilesystemServiceClient,
    mount_point: Option<PathBuf>,
}
```

Cluster Bootstrapping
```rust
impl TestCluster {
    pub async fn bootstrap(config: ClusterConfig) -> Result<Self> {
        info!("Bootstrapping {}-node test cluster", config.node_count);
        let mut nodes = Vec::new();
        let mut data_dirs = Vec::new();

        // Create a temporary data directory for each node
        for _ in 0..config.node_count {
            data_dirs.push(TempDir::new()?);
        }

        // Start the first node as the bootstrap node
        let bootstrap_node = Self::start_node(NodeConfig {
            id: 0,
            data_dir: data_dirs[0].path().to_path_buf(),
            grpc_port: 50051,
            metrics_port: 9090,
            bootstrap: true,
            peers: vec![],
        })
        .await?;
        nodes.push(bootstrap_node);

        // Start the remaining nodes, pointing them at the bootstrap node
        for i in 1..config.node_count {
            let node = Self::start_node(NodeConfig {
                id: i as u64,
                data_dir: data_dirs[i].path().to_path_buf(),
                grpc_port: 50051 + i as u16,
                metrics_port: 9090 + i as u16,
                bootstrap: false,
                peers: vec![nodes[0].grpc_endpoint],
            })
            .await?;
            nodes.push(node);
        }

        // Wait for cluster formation
        Self::wait_for_cluster(&nodes).await?;

        Ok(TestCluster {
            nodes,
            network: TestNetwork::new(),
            data_dirs,
        })
    }

    async fn start_node(config: NodeConfig) -> Result<TestNode> {
        // Write the node's configuration file
        let config_path = config.data_dir.join("config.toml");
        let config_content = format!(
            r#"
[node]
id = {}
data_dir = "{}"

[grpc]
listen_address = "127.0.0.1:{}"

[metrics]
listen_address = "127.0.0.1:{}"

[cluster]
bootstrap = {}
peers = {:?}
"#,
            config.id,
            config.data_dir.display(),
            config.grpc_port,
            config.metrics_port,
            config.bootstrap,
            config.peers
        );
        fs::write(&config_path, config_content)?;

        // Spawn the node process
        let mut cmd = Command::new("target/release/wormfs");
        cmd.arg("--config").arg(config_path);
        cmd.stdout(Stdio::piped());
        cmd.stderr(Stdio::piped());
        let process = cmd.spawn()?;

        // Wait for the node to accept gRPC connections
        let grpc_endpoint = format!("127.0.0.1:{}", config.grpc_port).parse()?;
        Self::wait_for_node_ready(&grpc_endpoint).await?;

        Ok(TestNode {
            id: config.id,
            process,
            grpc_endpoint,
            metrics_endpoint: format!("127.0.0.1:{}", config.metrics_port).parse()?,
            data_dir: config.data_dir,
            role: NodeRole::Follower,
        })
    }

    async fn wait_for_cluster(nodes: &[TestNode]) -> Result<()> {
        let timeout = Duration::from_secs(30);
        let start = Instant::now();
        loop {
            // The cluster has formed once exactly one leader is elected
            let leader_count = Self::count_leaders(nodes).await?;
            if leader_count == 1 {
                info!("Cluster formed with leader");
                return Ok(());
            }
            if start.elapsed() > timeout {
                return Err(Error::ClusterFormationTimeout);
            }
            tokio::time::sleep(Duration::from_millis(500)).await;
        }
    }
}
```

Test Client Implementation
```rust
impl TestClient {
    pub async fn connect(endpoint: SocketAddr) -> Result<Self> {
        let channel = Channel::from_shared(format!("http://{}", endpoint))?
            .connect()
            .await?;
        Ok(TestClient {
            grpc_client: FilesystemServiceClient::new(channel),
            mount_point: None,
        })
    }

    pub async fn mount(&mut self, mount_point: PathBuf) -> Result<()> {
        // Create the mount point if needed
        fs::create_dir_all(&mount_point)?;

        // Mount the FUSE filesystem in a background task
        // (clone the path so it can also be recorded on self below)
        let endpoint = self.grpc_client.endpoint();
        let fuse_mount_point = mount_point.clone();
        tokio::spawn(async move {
            mount_fuse_client(endpoint, fuse_mount_point).await
        });

        self.mount_point = Some(mount_point);
        Ok(())
    }

    pub async fn write_file(&mut self, path: &str, data: &[u8]) -> Result<()> {
        let request = WriteFileRequest {
            path: path.to_string(),
            data: data.to_vec(),
        };
        self.grpc_client.write_file(request).await?;
        Ok(())
    }

    pub async fn read_file(&mut self, path: &str) -> Result<Vec<u8>> {
        let request = ReadFileRequest {
            path: path.to_string(),
        };
        let response = self.grpc_client.read_file(request).await?;
        Ok(response.into_inner().data)
    }
}
```

Test Scenarios
```rust
pub struct TestScenario {
    name: String,
    description: String,
    setup: Box<dyn Fn(&mut TestContext) -> BoxFuture<'_, Result<()>>>,
    execute: Box<dyn Fn(&mut TestContext) -> BoxFuture<'_, Result<()>>>,
    verify: Box<dyn Fn(&TestContext) -> BoxFuture<'_, Result<()>>>,
    teardown: Box<dyn Fn(&mut TestContext) -> BoxFuture<'_, Result<()>>>,
}

impl WormValidator {
    pub fn register_scenarios(&mut self) {
        // Basic file operations
        self.add_scenario(TestScenario {
            name: "basic_file_ops".to_string(),
            description: "Test basic CRUD operations".to_string(),
            setup: Box::new(|_ctx| Box::pin(async move { Ok(()) })),
            execute: Box::new(|ctx| Box::pin(async move {
                // Create
                ctx.client.write_file("/test.txt", b"Hello World").await?;
                // Read
                let data = ctx.client.read_file("/test.txt").await?;
                assert_eq!(data, b"Hello World");
                // Update
                ctx.client.write_file("/test.txt", b"Updated").await?;
                // Delete
                ctx.client.delete_file("/test.txt").await?;
                Ok(())
            })),
            verify: Box::new(|ctx| Box::pin(async move {
                // The file must no longer exist after deletion
                match ctx.client.read_file("/test.txt").await {
                    Err(_) => Ok(()),
                    Ok(_) => Err(Error::VerificationFailed("File should not exist")),
                }
            })),
            teardown: Box::new(|_ctx| Box::pin(async move { Ok(()) })),
        });

        // Large file transfer
        self.add_scenario(TestScenario {
            name: "large_file_transfer".to_string(),
            description: "Test 100MB file operations".to_string(),
            execute: Box::new(|ctx| Box::pin(async move {
                let data = vec![0u8; 100 * 1024 * 1024]; // 100MB

                let start = Instant::now();
                ctx.client.write_file("/large.bin", &data).await?;
                let write_duration = start.elapsed();

                let start = Instant::now();
                let read_data = ctx.client.read_file("/large.bin").await?;
                let read_duration = start.elapsed();

                assert_eq!(read_data.len(), data.len());
                ctx.metrics.record("large_file_write_duration", write_duration);
                ctx.metrics.record("large_file_read_duration", read_duration);
                Ok(())
            })),
            // ... other fields
        });

        // Node failure during write
        self.add_scenario(TestScenario {
            name: "node_failure_during_write".to_string(),
            description: "Test write operation with node failure".to_string(),
            execute: Box::new(|ctx| Box::pin(async move {
                // Start writing a large file in the background
                let write_handle = tokio::spawn({
                    let mut client = ctx.client.clone();
                    async move {
                        let data = vec![0u8; 50 * 1024 * 1024];
                        client.write_file("/failure_test.bin", &data).await
                    }
                });

                // Wait briefly, then kill a node mid-write
                tokio::time::sleep(Duration::from_millis(500)).await;
                ctx.cluster.kill_node(1).await?;

                // The write should still complete despite the failure
                write_handle.await??;

                // Verify data integrity
                let data = ctx.client.read_file("/failure_test.bin").await?;
                assert_eq!(data.len(), 50 * 1024 * 1024);
                Ok(())
            })),
            // ... other fields
        });
    }
}
```

Chaos Testing
```rust
pub struct ChaosEngine {
    scenarios: Vec<ChaosScenario>,
    rng: StdRng,
    active: Arc<AtomicBool>,
}

pub enum ChaosScenario {
    KillRandomNode { probability: f64 },
    NetworkPartition { probability: f64, duration: Duration },
    HighLatency { probability: f64, latency: Duration },
    DiskFailure { probability: f64 },
    CPUStress { probability: f64, load: f64 },
}

impl ChaosEngine {
    pub async fn run(&mut self, cluster: &mut TestCluster, duration: Duration) {
        let start = Instant::now();
        self.active.store(true, Ordering::Release);

        while start.elapsed() < duration && self.active.load(Ordering::Acquire) {
            // Pick a random chaos scenario each tick
            let scenario = self.scenarios.choose(&mut self.rng).unwrap();
            match scenario {
                ChaosScenario::KillRandomNode { probability } => {
                    if self.rng.gen::<f64>() < *probability {
                        let node_id = self.rng.gen_range(0..cluster.nodes.len());
                        info!("Chaos: Killing node {}", node_id);
                        cluster.kill_node(node_id).await.ok();
                        // Restart after a delay
                        tokio::time::sleep(Duration::from_secs(10)).await;
                        cluster.restart_node(node_id).await.ok();
                    }
                }
                ChaosScenario::NetworkPartition { probability, duration } => {
                    if self.rng.gen::<f64>() < *probability {
                        info!("Chaos: Creating network partition");
                        cluster.create_partition(*duration).await.ok();
                    }
                }
                // ... other scenarios
            }
            tokio::time::sleep(Duration::from_secs(1)).await;
        }
    }
}
```

Test Reporting
```rust
pub struct TestReporter {
    results: Vec<TestResult>,
    metrics: HashMap<String, Vec<f64>>,
    start_time: SystemTime,
}

impl TestReporter {
    pub fn generate_report(&self) -> Report {
        let duration = SystemTime::now()
            .duration_since(self.start_time)
            .unwrap();
        let passed = self.results.iter().filter(|r| r.passed).count();
        let failed = self.results.iter().filter(|r| !r.passed).count();

        Report {
            summary: Summary {
                total_tests: self.results.len(),
                passed,
                failed,
                duration,
                pass_rate: passed as f64 / self.results.len() as f64 * 100.0,
            },
            results: self.results.clone(),
            metrics: self.calculate_metric_stats(),
            timestamp: SystemTime::now(),
        }
    }

    pub fn generate_html(&self) -> String {
        let report = self.generate_report();
        format!(
            r#"<!DOCTYPE html>
<html>
<head>
  <title>WormFS Test Report</title>
  <style>
    .passed {{ color: green; }}
    .failed {{ color: red; }}
    table {{ border-collapse: collapse; }}
    th, td {{ border: 1px solid #ddd; padding: 8px; }}
  </style>
</head>
<body>
  <h1>WormFS Test Report</h1>
  <h2>Summary</h2>
  <p>Total Tests: {}</p>
  <p>Passed: <span class="passed">{}</span></p>
  <p>Failed: <span class="failed">{}</span></p>
  <p>Pass Rate: {:.1}%</p>
  <p>Duration: {:?}</p>
  <h2>Test Results</h2>
  <table>
    <tr><th>Test Name</th><th>Status</th><th>Duration</th><th>Error</th></tr>
    {}
  </table>
  <h2>Performance Metrics</h2>
  <table>
    <tr><th>Metric</th><th>Min</th><th>Avg</th><th>Max</th><th>P99</th></tr>
    {}
  </table>
</body>
</html>
"#,
            report.summary.total_tests,
            report.summary.passed,
            report.summary.failed,
            report.summary.pass_rate,
            report.summary.duration,
            self.format_test_results(),
            self.format_metrics(),
        )
    }
}
```

Implementation Tasks
Day 3: Framework Core
- Create WormValidator structure
- Implement cluster bootstrapping
- Create test client
- Add test context management
- Implement test runner
Day 4: Scenarios & Reporting
- Define test scenarios
- Implement chaos engine
- Create test reporter
- Add HTML report generation
- Implement CI integration
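The readiness checks in the bootstrapping code above (`wait_for_node_ready`, `wait_for_cluster`) share one poll-until-timeout shape. A synchronous, stdlib-only sketch of that shape, using a plain TCP connect as a hypothetical readiness probe (the real framework would be async and check gRPC health):

```rust
use std::net::{SocketAddr, TcpStream};
use std::time::{Duration, Instant};

// Poll until a TCP connect to the node's port succeeds or the
// overall timeout elapses. Illustrative stand-in for the async
// wait_for_node_ready in the design above.
fn wait_for_ready(addr: SocketAddr, timeout: Duration) -> Result<(), String> {
    let start = Instant::now();
    loop {
        if TcpStream::connect_timeout(&addr, Duration::from_millis(200)).is_ok() {
            return Ok(());
        }
        if start.elapsed() > timeout {
            return Err(format!("node at {} not ready within {:?}", addr, timeout));
        }
        std::thread::sleep(Duration::from_millis(100));
    }
}

fn main() {
    // Bind a listener to simulate a node that is already up.
    let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
    let addr = listener.local_addr().unwrap();
    assert!(wait_for_ready(addr, Duration::from_secs(2)).is_ok());
    println!("node ready at {}", addr);
}
```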
Testing Requirements
Self-Tests
- Test cluster bootstrapping
- Test client operations
- Test scenario execution
- Test chaos engine
- Test report generation
Validation Tests
- Validate all scenarios pass
- Test failure detection
- Test metric collection
- Test report accuracy
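Validating metric collection and report accuracy presumes a definition for the report's Min/Avg/Max/P99 columns. A stdlib-only sketch using the nearest-rank percentile (an assumption; the design above doesn't pin one down, and `MetricStats`/`metric_stats` are hypothetical names):

```rust
// Per-metric summary stats for the reporter's Performance Metrics table.
#[derive(Debug, PartialEq)]
struct MetricStats {
    min: f64,
    avg: f64,
    max: f64,
    p99: f64,
}

fn metric_stats(samples: &[f64]) -> Option<MetricStats> {
    if samples.is_empty() {
        return None; // no stats for an empty series
    }
    let mut sorted = samples.to_vec();
    sorted.sort_by(|a, b| a.partial_cmp(b).unwrap());
    let n = sorted.len();
    // Nearest-rank P99: index = ceil(0.99 * n) - 1, clamped to the last element.
    let idx = ((0.99 * n as f64).ceil() as usize).saturating_sub(1).min(n - 1);
    Some(MetricStats {
        min: sorted[0],
        avg: sorted.iter().sum::<f64>() / n as f64,
        max: sorted[n - 1],
        p99: sorted[idx],
    })
}

fn main() {
    let samples: Vec<f64> = (1..=100).map(|v| v as f64).collect();
    let stats = metric_stats(&samples).unwrap();
    println!("min={} avg={} max={} p99={}", stats.min, stats.avg, stats.max, stats.p99);
}
```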
Configuration
```toml
[validator]
cluster_size = 3
test_duration_minutes = 30
chaos_enabled = true
report_format = "html"
output_dir = "test_results"

[scenarios]
enabled = ["basic_ops", "large_files", "node_failures", "chaos"]

[chaos]
kill_node_probability = 0.1
partition_probability = 0.05
latency_probability = 0.2
```

Success Criteria
- All test scenarios execute
- Chaos testing finds no data loss
- Reports generated correctly
- CI integration works
- Performance baselines established
- >90% code coverage of validator
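One detail the "reports generated correctly" criterion implies: error strings interpolated into the HTML tables must be escaped, or a failure message containing `<` or `&` corrupts the markup. A minimal stdlib helper (the `escape_html` name is hypothetical, not part of the design above):

```rust
// Escape the characters that are unsafe inside HTML text and attributes.
fn escape_html(s: &str) -> String {
    let mut out = String::with_capacity(s.len());
    for c in s.chars() {
        match c {
            '&' => out.push_str("&amp;"),
            '<' => out.push_str("&lt;"),
            '>' => out.push_str("&gt;"),
            '"' => out.push_str("&quot;"),
            _ => out.push(c),
        }
    }
    out
}

fn main() {
    let err = r#"expected <empty>, got "data" & more"#;
    // prints: expected &lt;empty&gt;, got &quot;data&quot; &amp; more
    println!("{}", escape_html(err));
}
```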
Dependencies
- `tempfile` - Temporary directories
- `tokio` - Async runtime
- `tonic` - gRPC client
- `rand` - Random number generation
References
Blocked By: None
Blocks: Phase 5.3 (Production Deployment)