Skip to content

Commit 91e573f

Browse files
committed
chore: expand test coverage
1 parent c8121ff commit 91e573f

File tree

5 files changed

+1137
-0
lines changed

5 files changed

+1137
-0
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
11
target/
22
.DS_Store
3+
build_rs_cov.profraw
Lines changed: 375 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,375 @@
1+
//! Integration tests for the orphan cleanup background job.
2+
//!
3+
//! Tests that:
4+
//! - Orphan cleanup job starts and runs without errors
5+
//! - Orphan cleanup only runs on leader
6+
//! - Deleted users' memberships are identified as orphans
7+
//! - Orphaned memberships are cleaned up
8+
9+
#![allow(clippy::unwrap_used, clippy::expect_used, clippy::panic, clippy::disallowed_methods)]
10+
11+
mod common;
12+
13+
use std::time::Duration;
14+
15+
use common::{TestCluster, create_admin_client, create_read_client, create_write_client};
16+
use serial_test::serial;
17+
18+
// ============================================================================
19+
// Test Helpers
20+
// ============================================================================
21+
22+
/// Create a namespace and return its ID.
23+
async fn create_namespace(
24+
addr: std::net::SocketAddr,
25+
name: &str,
26+
) -> Result<i64, Box<dyn std::error::Error>> {
27+
let mut client = create_admin_client(addr).await?;
28+
let response = client
29+
.create_namespace(inferadb_ledger_raft::proto::CreateNamespaceRequest {
30+
name: name.to_string(),
31+
shard_id: None,
32+
})
33+
.await?;
34+
35+
let namespace_id =
36+
response.into_inner().namespace_id.map(|n| n.id).ok_or("No namespace_id in response")?;
37+
38+
Ok(namespace_id)
39+
}
40+
41+
/// Write an entity to a specific namespace.
42+
async fn write_entity(
43+
addr: std::net::SocketAddr,
44+
namespace_id: i64,
45+
vault_id: i64,
46+
key: &str,
47+
value: &serde_json::Value,
48+
client_id: &str,
49+
sequence: u64,
50+
) -> Result<(), Box<dyn std::error::Error>> {
51+
let mut client = create_write_client(addr).await?;
52+
53+
let request = inferadb_ledger_raft::proto::WriteRequest {
54+
namespace_id: Some(inferadb_ledger_raft::proto::NamespaceId { id: namespace_id }),
55+
vault_id: Some(inferadb_ledger_raft::proto::VaultId { id: vault_id }),
56+
client_id: Some(inferadb_ledger_raft::proto::ClientId { id: client_id.to_string() }),
57+
sequence,
58+
operations: vec![inferadb_ledger_raft::proto::Operation {
59+
op: Some(inferadb_ledger_raft::proto::operation::Op::SetEntity(
60+
inferadb_ledger_raft::proto::SetEntity {
61+
key: key.to_string(),
62+
value: serde_json::to_vec(value).unwrap(),
63+
condition: None,
64+
expires_at: None,
65+
},
66+
)),
67+
}],
68+
include_tx_proof: false,
69+
};
70+
71+
let response = client.write(request).await?.into_inner();
72+
73+
match response.result {
74+
Some(inferadb_ledger_raft::proto::write_response::Result::Success(_)) => Ok(()),
75+
Some(inferadb_ledger_raft::proto::write_response::Result::Error(e)) => {
76+
Err(format!("Write error: {:?}", e).into())
77+
},
78+
None => Err("No result in write response".into()),
79+
}
80+
}
81+
82+
/// Read an entity from a namespace.
83+
async fn read_entity(
84+
addr: std::net::SocketAddr,
85+
namespace_id: i64,
86+
vault_id: i64,
87+
key: &str,
88+
) -> Result<Option<Vec<u8>>, Box<dyn std::error::Error>> {
89+
let mut client = create_read_client(addr).await?;
90+
91+
let request = inferadb_ledger_raft::proto::ReadRequest {
92+
namespace_id: Some(inferadb_ledger_raft::proto::NamespaceId { id: namespace_id }),
93+
vault_id: Some(inferadb_ledger_raft::proto::VaultId { id: vault_id }),
94+
key: key.to_string(),
95+
consistency: 0, // EVENTUAL
96+
};
97+
98+
let response = client.read(request).await?.into_inner();
99+
Ok(response.value)
100+
}
101+
102+
// ============================================================================
103+
// Orphan Cleanup Tests
104+
// ============================================================================
105+
106+
/// Test that orphan cleanup job starts and runs without errors.
107+
#[serial]
108+
#[tokio::test]
109+
async fn test_orphan_cleanup_job_starts() {
110+
let cluster = TestCluster::new(1).await;
111+
let _leader_id = cluster.wait_for_leader().await;
112+
let leader = cluster.leader().expect("should have leader");
113+
114+
// Orphan cleanup runs in background
115+
// Verify cluster remains healthy
116+
let metrics = leader.raft.metrics().borrow().clone();
117+
assert!(metrics.current_leader.is_some(), "leader should be elected");
118+
119+
// Give cleanup job time to run at least one cycle
120+
tokio::time::sleep(Duration::from_millis(100)).await;
121+
122+
// Cluster should still be healthy
123+
let metrics = leader.raft.metrics().borrow().clone();
124+
assert!(metrics.current_leader.is_some(), "cluster should remain healthy");
125+
}
126+
127+
/// Test that orphan cleanup only runs on leader.
128+
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
129+
async fn test_orphan_cleanup_leader_only() {
130+
let cluster = TestCluster::new(3).await;
131+
let leader_id = cluster.wait_for_leader().await;
132+
133+
// Verify we have followers
134+
let followers = cluster.followers();
135+
assert_eq!(followers.len(), 2, "should have 2 followers");
136+
137+
// Give cleanup time to potentially run
138+
tokio::time::sleep(Duration::from_millis(200)).await;
139+
140+
// Cluster should remain stable
141+
let new_leader_id = cluster.wait_for_leader().await;
142+
assert_eq!(leader_id, new_leader_id, "leader should not have changed");
143+
}
144+
145+
/// Test detection of deleted users.
146+
///
147+
/// Per DESIGN.md: Users with deleted_at or status=DELETED/DELETING are considered deleted.
148+
#[serial]
149+
#[tokio::test]
150+
async fn test_deleted_user_detection() {
151+
let cluster = TestCluster::new(1).await;
152+
let _leader_id = cluster.wait_for_leader().await;
153+
let leader = cluster.leader().expect("should have leader");
154+
155+
// Create a user with deleted_at timestamp
156+
let deleted_user_id = 1001i64;
157+
let deleted_user_key = format!("user:{}", deleted_user_id);
158+
let deleted_user_value = serde_json::json!({
159+
"id": deleted_user_id,
160+
"name": "Deleted User",
161+
"email": "deleted@example.com",
162+
"deleted_at": "2024-01-15T10:00:00Z",
163+
});
164+
165+
write_entity(leader.addr, 0, 0, &deleted_user_key, &deleted_user_value, "orphan-test", 1)
166+
.await
167+
.expect("create deleted user");
168+
169+
// Create a user with DELETED status
170+
let deleted_status_user_id = 1002i64;
171+
let deleted_status_user_key = format!("user:{}", deleted_status_user_id);
172+
let deleted_status_user_value = serde_json::json!({
173+
"id": deleted_status_user_id,
174+
"name": "Status Deleted User",
175+
"email": "status-deleted@example.com",
176+
"status": "DELETED",
177+
});
178+
179+
write_entity(
180+
leader.addr,
181+
0,
182+
0,
183+
&deleted_status_user_key,
184+
&deleted_status_user_value,
185+
"orphan-test",
186+
2,
187+
)
188+
.await
189+
.expect("create status-deleted user");
190+
191+
// Create an active user
192+
let active_user_id = 1003i64;
193+
let active_user_key = format!("user:{}", active_user_id);
194+
let active_user_value = serde_json::json!({
195+
"id": active_user_id,
196+
"name": "Active User",
197+
"email": "active@example.com",
198+
"status": "ACTIVE",
199+
});
200+
201+
write_entity(leader.addr, 0, 0, &active_user_key, &active_user_value, "orphan-test", 3)
202+
.await
203+
.expect("create active user");
204+
205+
// Verify all users were written
206+
let deleted_bytes = read_entity(leader.addr, 0, 0, &deleted_user_key)
207+
.await
208+
.expect("read deleted user")
209+
.expect("deleted user should exist");
210+
211+
let deleted_user: serde_json::Value = serde_json::from_slice(&deleted_bytes).unwrap();
212+
assert!(deleted_user.get("deleted_at").is_some(), "User should have deleted_at");
213+
214+
let status_bytes = read_entity(leader.addr, 0, 0, &deleted_status_user_key)
215+
.await
216+
.expect("read status-deleted user")
217+
.expect("status-deleted user should exist");
218+
219+
let status_user: serde_json::Value = serde_json::from_slice(&status_bytes).unwrap();
220+
assert_eq!(status_user.get("status").and_then(|s| s.as_str()), Some("DELETED"));
221+
222+
let active_bytes = read_entity(leader.addr, 0, 0, &active_user_key)
223+
.await
224+
.expect("read active user")
225+
.expect("active user should exist");
226+
227+
let active_user: serde_json::Value = serde_json::from_slice(&active_bytes).unwrap();
228+
assert_eq!(active_user.get("status").and_then(|s| s.as_str()), Some("ACTIVE"));
229+
assert!(active_user.get("deleted_at").is_none(), "Active user should not have deleted_at");
230+
}
231+
232+
/// Test membership data format for orphan detection.
233+
#[serial]
234+
#[tokio::test]
235+
async fn test_membership_data_format() {
236+
let cluster = TestCluster::new(1).await;
237+
let _leader_id = cluster.wait_for_leader().await;
238+
let leader = cluster.leader().expect("should have leader");
239+
240+
// Create a namespace
241+
let ns_id =
242+
create_namespace(leader.addr, "membership-test-ns").await.expect("create namespace");
243+
244+
// Create a membership record
245+
let user_id = 2001i64;
246+
let member_key = format!("member:{}", user_id);
247+
let member_value = serde_json::json!({
248+
"user_id": user_id,
249+
"role": "member",
250+
"created_at": "2024-01-01T00:00:00Z",
251+
});
252+
253+
write_entity(leader.addr, ns_id, 0, &member_key, &member_value, "membership-test", 1)
254+
.await
255+
.expect("create membership");
256+
257+
// Verify membership was written
258+
let member_bytes = read_entity(leader.addr, ns_id, 0, &member_key)
259+
.await
260+
.expect("read membership")
261+
.expect("membership should exist");
262+
263+
let membership: serde_json::Value = serde_json::from_slice(&member_bytes).unwrap();
264+
assert_eq!(membership.get("user_id").and_then(|v| v.as_i64()), Some(user_id));
265+
assert_eq!(membership.get("role").and_then(|r| r.as_str()), Some("member"));
266+
}
267+
268+
/// Test that orphan cleanup respects system namespace boundaries.
269+
///
270+
/// Cleanup should skip the _system namespace (namespace_id = 0).
271+
#[serial]
272+
#[tokio::test]
273+
async fn test_orphan_cleanup_skips_system_namespace() {
274+
let cluster = TestCluster::new(1).await;
275+
let _leader_id = cluster.wait_for_leader().await;
276+
let leader = cluster.leader().expect("should have leader");
277+
278+
// Write a "member" record directly to _system (which shouldn't be cleaned up)
279+
let system_member_key = "member:9999";
280+
let system_member_value = serde_json::json!({
281+
"user_id": 9999,
282+
"role": "admin",
283+
"note": "This is in _system and should not be cleaned",
284+
});
285+
286+
write_entity(
287+
leader.addr,
288+
0,
289+
0,
290+
system_member_key,
291+
&system_member_value,
292+
"system-member-test",
293+
1,
294+
)
295+
.await
296+
.expect("create system member");
297+
298+
// Give cleanup time to run
299+
tokio::time::sleep(Duration::from_millis(200)).await;
300+
301+
// The system member should still exist (cleanup skips _system)
302+
let member_bytes = read_entity(leader.addr, 0, 0, system_member_key)
303+
.await
304+
.expect("read system member")
305+
.expect("system member should still exist");
306+
307+
let member: serde_json::Value = serde_json::from_slice(&member_bytes).unwrap();
308+
assert_eq!(member.get("user_id").and_then(|v| v.as_i64()), Some(9999));
309+
}
310+
311+
/// Test orphan cleanup handles empty namespaces gracefully.
312+
#[serial]
313+
#[tokio::test]
314+
async fn test_orphan_cleanup_handles_empty_namespace() {
315+
let cluster = TestCluster::new(1).await;
316+
let _leader_id = cluster.wait_for_leader().await;
317+
let leader = cluster.leader().expect("should have leader");
318+
319+
// Create a namespace with no memberships
320+
let _ns_id = create_namespace(leader.addr, "empty-ns").await.expect("create namespace");
321+
322+
// Give cleanup time to run
323+
tokio::time::sleep(Duration::from_millis(200)).await;
324+
325+
// Cluster should remain healthy (no errors from empty namespace scan)
326+
let metrics = leader.raft.metrics().borrow().clone();
327+
assert!(metrics.current_leader.is_some(), "cluster should remain healthy");
328+
}
329+
330+
/// Test concurrent background jobs don't interfere.
331+
#[serial]
332+
#[tokio::test]
333+
async fn test_orphan_cleanup_with_concurrent_jobs() {
334+
let cluster = TestCluster::new(3).await;
335+
let leader = cluster.leader().expect("has leader");
336+
337+
// Create some state to exercise all background jobs
338+
let mut client = create_admin_client(leader.addr).await.unwrap();
339+
340+
// Create namespace
341+
let ns_response = client
342+
.create_namespace(inferadb_ledger_raft::proto::CreateNamespaceRequest {
343+
name: "concurrent-jobs-test".to_string(),
344+
shard_id: None,
345+
})
346+
.await
347+
.unwrap();
348+
349+
let namespace_id = ns_response.into_inner().namespace_id.map(|n| n.id).unwrap();
350+
351+
// Create vault
352+
let _vault_response = client
353+
.create_vault(inferadb_ledger_raft::proto::CreateVaultRequest {
354+
namespace_id: Some(inferadb_ledger_raft::proto::NamespaceId { id: namespace_id }),
355+
replication_factor: 0,
356+
initial_nodes: vec![],
357+
retention_policy: None,
358+
})
359+
.await
360+
.unwrap();
361+
362+
// Let all background jobs run concurrently for a bit
363+
// OrphanCleanup, TtlGC, SagaOrchestrator, AutoRecovery, LearnerRefresh
364+
tokio::time::sleep(Duration::from_millis(500)).await;
365+
366+
// Verify cluster is still healthy
367+
let leader_id = cluster.wait_for_leader().await;
368+
assert!(leader_id > 0, "cluster should still have a leader");
369+
370+
// All nodes should still be responsive
371+
for node in cluster.nodes() {
372+
let metrics = node.raft.metrics().borrow().clone();
373+
assert!(metrics.current_leader.is_some(), "node {} should know the leader", node.id);
374+
}
375+
}

0 commit comments

Comments
 (0)