@@ -81,12 +81,12 @@ impl ReadServiceImpl {
8181 ///
8282 /// Returns false if Raft is not configured.
8383 fn is_leader ( & self ) -> bool {
84- match ( & self . raft , & self . node_id ) {
85- ( Some ( raft) , Some ( node_id ) ) => {
84+ match & self . raft {
85+ Some ( raft) => {
8686 let metrics = raft. metrics ( ) . borrow ( ) . clone ( ) ;
87- metrics. current_leader == Some ( * node_id )
87+ metrics. current_leader == Some ( metrics . id )
8888 } ,
89- _ => false ,
89+ None => false ,
9090 }
9191 }
9292
@@ -469,6 +469,13 @@ impl ReadService for ReadServiceImpl {
469469
470470 ctx. end_storage_timer ( ) ;
471471
472+ // Filter out expired entities (expires_at == 0 means never expires)
473+ let now = std:: time:: SystemTime :: now ( )
474+ . duration_since ( std:: time:: UNIX_EPOCH )
475+ . map ( |d| d. as_secs ( ) )
476+ . unwrap_or ( 0 ) ;
477+ let entity = entity. filter ( |e| e. expires_at == 0 || e. expires_at > now) ;
478+
472479 let found = entity. is_some ( ) ;
473480 let value_size = entity. as_ref ( ) . map ( |e| e. value . len ( ) ) . unwrap_or ( 0 ) ;
474481 ctx. set_found ( found) ;
@@ -558,6 +565,12 @@ impl ReadService for ReadServiceImpl {
558565 // Start storage timer
559566 ctx. start_storage_timer ( ) ;
560567
568+ // Get current time for TTL filtering
569+ let now = std:: time:: SystemTime :: now ( )
570+ . duration_since ( std:: time:: UNIX_EPOCH )
571+ . map ( |d| d. as_secs ( ) )
572+ . unwrap_or ( 0 ) ;
573+
561574 // Read all keys from state layer
562575 let state = & * self . state ;
563576 let mut results = Vec :: with_capacity ( req. keys . len ( ) ) ;
@@ -574,6 +587,9 @@ impl ReadService for ReadServiceImpl {
574587 } ,
575588 } ;
576589
590+ // Filter out expired entities (expires_at == 0 means never expires)
591+ let entity = entity. filter ( |e| e. expires_at == 0 || e. expires_at > now) ;
592+
577593 let found = entity. is_some ( ) ;
578594 if found {
579595 found_count += 1 ;
@@ -1401,24 +1417,31 @@ impl ReadService for ReadServiceImpl {
14011417 let limit = if req. limit == 0 { 100 } else { req. limit as usize } ;
14021418 let prefix = if req. key_prefix . is_empty ( ) { None } else { Some ( req. key_prefix . as_str ( ) ) } ;
14031419
1404- // Entities are namespace-level (stored in vault_id=0 by convention)
1405- let vault_id = 0i64 ;
1420+ // Use vault_id from request, defaulting to 0 for namespace-level entities
1421+ let vault_id = req . vault_id . as_ref ( ) . map_or ( 0 , |v| v . id ) ;
14061422
14071423 // Compute query hash from filter parameters for token validation
14081424 // This prevents clients from changing filters mid-pagination
1409- let query_params =
1410- format ! ( "prefix:{},include_expired:{}" , req. key_prefix, req. include_expired) ;
1425+ let query_params = format ! (
1426+ "prefix:{},include_expired:{},vault:{}" ,
1427+ req. key_prefix, req. include_expired, vault_id
1428+ ) ;
14111429 let query_hash = PageTokenCodec :: compute_query_hash ( query_params. as_bytes ( ) ) ;
14121430
14131431 // Get current block height for consistent pagination
1414- let block_height = self
1415- . applied_state
1416- . all_vault_heights ( )
1417- . iter ( )
1418- . filter ( |( ( ns, _) , _) | * ns == namespace_id)
1419- . map ( |( _, h) | * h)
1420- . max ( )
1421- . unwrap_or ( 0 ) ;
1432+ let block_height = if vault_id != 0 {
1433+ // Specific vault requested - use its height
1434+ self . applied_state . vault_height ( namespace_id, vault_id)
1435+ } else {
1436+ // Namespace-level entities - use max height across all vaults in namespace
1437+ self . applied_state
1438+ . all_vault_heights ( )
1439+ . iter ( )
1440+ . filter ( |( ( ns, _) , _) | * ns == namespace_id)
1441+ . map ( |( _, h) | * h)
1442+ . max ( )
1443+ . unwrap_or ( 0 )
1444+ } ;
14221445
14231446 // Decode and validate page token if provided
14241447 let ( resume_key, at_height) = if req. page_token . is_empty ( ) {
0 commit comments