Skip to content

Commit b62ccd5

Browse files
committed
feat: proto deduplication, raft proposal timeout
1 parent 07b4e16 commit b62ccd5

18 files changed

+1694
-482
lines changed

DESIGN.md

Lines changed: 218 additions & 81 deletions
Large diffs are not rendered by default.

crates/raft/src/proto_convert.rs

Lines changed: 538 additions & 1 deletion
Large diffs are not rendered by default.

crates/raft/src/services/admin.rs

Lines changed: 8 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
55
use std::{collections::BTreeSet, net::SocketAddr, sync::Arc, time::Duration};
66

7-
use inferadb_ledger_state::{BlockArchive, StateLayer, system::NamespaceStatus};
7+
use inferadb_ledger_state::{BlockArchive, StateLayer};
88
use inferadb_ledger_store::{Database, FileBackend};
99
use inferadb_ledger_types::{
1010
NamespaceId as DomainNamespaceId, ShardId as DomainShardId, VaultEntry,
@@ -476,14 +476,7 @@ impl AdminService for AdminServiceImpl {
476476
Some(ns) => {
477477
ctx.set_namespace_id(ns.namespace_id.value());
478478
ctx.set_success();
479-
// Map internal NamespaceStatus to proto NamespaceStatus
480-
let status = match ns.status {
481-
NamespaceStatus::Active => crate::proto::NamespaceStatus::Active,
482-
NamespaceStatus::Migrating => crate::proto::NamespaceStatus::Migrating,
483-
NamespaceStatus::Suspended => crate::proto::NamespaceStatus::Suspended,
484-
NamespaceStatus::Deleting => crate::proto::NamespaceStatus::Deleting,
485-
NamespaceStatus::Deleted => crate::proto::NamespaceStatus::Deleted,
486-
};
479+
let status: crate::proto::NamespaceStatus = ns.status.into();
487480
Ok(Response::new(GetNamespaceResponse {
488481
namespace_id: Some(NamespaceId { id: ns.namespace_id.value() }),
489482
name: ns.name,
@@ -521,14 +514,7 @@ impl AdminService for AdminServiceImpl {
521514
.list_namespaces()
522515
.into_iter()
523516
.map(|ns| {
524-
// Map internal NamespaceStatus to proto NamespaceStatus
525-
let status = match ns.status {
526-
NamespaceStatus::Active => crate::proto::NamespaceStatus::Active,
527-
NamespaceStatus::Migrating => crate::proto::NamespaceStatus::Migrating,
528-
NamespaceStatus::Suspended => crate::proto::NamespaceStatus::Suspended,
529-
NamespaceStatus::Deleting => crate::proto::NamespaceStatus::Deleting,
530-
NamespaceStatus::Deleted => crate::proto::NamespaceStatus::Deleted,
531-
};
517+
let status: crate::proto::NamespaceStatus = ns.status.into();
532518
crate::proto::GetNamespaceResponse {
533519
namespace_id: Some(NamespaceId { id: ns.namespace_id.value() }),
534520
name: ns.name,
@@ -591,25 +577,12 @@ impl AdminService for AdminServiceImpl {
591577

592578
// Convert proto retention policy to internal type
593579
let retention_policy = req.retention_policy.map(|proto_policy| {
594-
use crate::proto::BlockRetentionMode as ProtoMode;
595-
let mode = match proto_policy.mode() {
596-
ProtoMode::Unspecified | ProtoMode::Full => {
597-
ctx.set_retention_mode("full");
598-
BlockRetentionMode::Full
599-
},
600-
ProtoMode::Compacted => {
601-
ctx.set_retention_mode("compacted");
602-
BlockRetentionMode::Compacted
603-
},
604-
};
605-
BlockRetentionPolicy {
606-
mode,
607-
retention_blocks: if proto_policy.retention_blocks > 0 {
608-
proto_policy.retention_blocks
609-
} else {
610-
10_000 // Default
611-
},
580+
let policy = BlockRetentionPolicy::from(&proto_policy);
581+
match policy.mode {
582+
BlockRetentionMode::Full => ctx.set_retention_mode("full"),
583+
BlockRetentionMode::Compacted => ctx.set_retention_mode("compacted"),
612584
}
585+
policy
613586
});
614587

615588
// Submit create vault through Raft

crates/raft/src/services/multi_shard_write.rs

Lines changed: 6 additions & 246 deletions
Original file line numberDiff line numberDiff line change
@@ -260,66 +260,6 @@ impl MultiShardWriteService {
260260
}
261261
}
262262

263-
/// Convert a proto SetCondition to internal SetCondition.
264-
fn convert_set_condition(
265-
proto_condition: &crate::proto::SetCondition,
266-
) -> Option<inferadb_ledger_types::SetCondition> {
267-
use crate::proto::set_condition::Condition;
268-
269-
proto_condition.condition.as_ref().map(|c| match c {
270-
Condition::NotExists(true) => inferadb_ledger_types::SetCondition::MustNotExist,
271-
Condition::NotExists(false) => inferadb_ledger_types::SetCondition::MustExist,
272-
Condition::MustExists(true) => inferadb_ledger_types::SetCondition::MustExist,
273-
Condition::MustExists(false) => inferadb_ledger_types::SetCondition::MustNotExist,
274-
Condition::Version(v) => inferadb_ledger_types::SetCondition::VersionEquals(*v),
275-
Condition::ValueEquals(v) => {
276-
inferadb_ledger_types::SetCondition::ValueEquals(v.clone())
277-
},
278-
})
279-
}
280-
281-
/// Convert a proto operation to internal operation.
282-
fn convert_operation(op: &Operation) -> Result<inferadb_ledger_types::Operation, Status> {
283-
use crate::proto::operation::Op;
284-
285-
let inner_op =
286-
op.op.as_ref().ok_or_else(|| Status::invalid_argument("Operation missing op field"))?;
287-
288-
match inner_op {
289-
Op::CreateRelationship(cr) => {
290-
Ok(inferadb_ledger_types::Operation::CreateRelationship {
291-
resource: cr.resource.clone(),
292-
relation: cr.relation.clone(),
293-
subject: cr.subject.clone(),
294-
})
295-
},
296-
Op::DeleteRelationship(dr) => {
297-
Ok(inferadb_ledger_types::Operation::DeleteRelationship {
298-
resource: dr.resource.clone(),
299-
relation: dr.relation.clone(),
300-
subject: dr.subject.clone(),
301-
})
302-
},
303-
Op::SetEntity(se) => {
304-
let condition = se.condition.as_ref().and_then(Self::convert_set_condition);
305-
306-
Ok(inferadb_ledger_types::Operation::SetEntity {
307-
key: se.key.clone(),
308-
value: se.value.clone(),
309-
condition,
310-
expires_at: se.expires_at,
311-
})
312-
},
313-
Op::DeleteEntity(de) => {
314-
Ok(inferadb_ledger_types::Operation::DeleteEntity { key: de.key.clone() })
315-
},
316-
Op::ExpireEntity(ee) => Ok(inferadb_ledger_types::Operation::ExpireEntity {
317-
key: ee.key.clone(),
318-
expired_at: ee.expired_at,
319-
}),
320-
}
321-
}
322-
323263
/// Build a ledger request from operations.
324264
///
325265
/// Server-assigned sequences: The transaction's sequence is set to 0 here;
@@ -332,8 +272,10 @@ impl MultiShardWriteService {
332272
client_id: &str,
333273
actor: &str,
334274
) -> Result<LedgerRequest, Status> {
335-
let internal_ops: Vec<inferadb_ledger_types::Operation> =
336-
operations.iter().map(Self::convert_operation).collect::<Result<Vec<_>, Status>>()?;
275+
let internal_ops: Vec<inferadb_ledger_types::Operation> = operations
276+
.iter()
277+
.map(inferadb_ledger_types::Operation::try_from)
278+
.collect::<Result<Vec<_>, Status>>()?;
337279

338280
if internal_ops.is_empty() {
339281
return Err(Status::invalid_argument("No operations provided"));
@@ -774,194 +716,12 @@ impl WriteService for MultiShardWriteService {
774716
}
775717

776718
#[cfg(test)]
777-
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic, clippy::disallowed_methods)]
778719
mod tests {
779-
use super::*;
780-
781720
#[test]
782721
fn test_multi_shard_write_service_creation() {
783722
// Basic struct test - full testing requires Raft setup
784723
}
785724

786-
#[test]
787-
fn test_convert_set_condition_not_exists() {
788-
use crate::proto::{SetCondition, set_condition::Condition};
789-
790-
let proto_condition = SetCondition { condition: Some(Condition::NotExists(true)) };
791-
792-
let result = MultiShardWriteService::convert_set_condition(&proto_condition);
793-
assert!(matches!(result, Some(inferadb_ledger_types::SetCondition::MustNotExist)));
794-
}
795-
796-
#[test]
797-
fn test_convert_set_condition_must_exists() {
798-
use crate::proto::{SetCondition, set_condition::Condition};
799-
800-
let proto_condition = SetCondition { condition: Some(Condition::MustExists(true)) };
801-
802-
let result = MultiShardWriteService::convert_set_condition(&proto_condition);
803-
assert!(matches!(result, Some(inferadb_ledger_types::SetCondition::MustExist)));
804-
}
805-
806-
#[test]
807-
fn test_convert_set_condition_version() {
808-
use crate::proto::{SetCondition, set_condition::Condition};
809-
810-
let proto_condition = SetCondition { condition: Some(Condition::Version(42)) };
811-
812-
let result = MultiShardWriteService::convert_set_condition(&proto_condition);
813-
assert!(matches!(result, Some(inferadb_ledger_types::SetCondition::VersionEquals(42))));
814-
}
815-
816-
#[test]
817-
fn test_convert_set_condition_value_equals() {
818-
use crate::proto::{SetCondition, set_condition::Condition};
819-
820-
let proto_condition =
821-
SetCondition { condition: Some(Condition::ValueEquals(b"test_value".to_vec())) };
822-
823-
let result = MultiShardWriteService::convert_set_condition(&proto_condition);
824-
match result {
825-
Some(inferadb_ledger_types::SetCondition::ValueEquals(v)) => {
826-
assert_eq!(v, b"test_value");
827-
},
828-
_ => unreachable!("Expected ValueEquals condition"),
829-
}
830-
}
831-
832-
#[test]
833-
fn test_convert_set_condition_none() {
834-
use crate::proto::SetCondition;
835-
836-
let proto_condition = SetCondition { condition: None };
837-
838-
let result = MultiShardWriteService::convert_set_condition(&proto_condition);
839-
assert!(result.is_none());
840-
}
841-
842-
#[test]
843-
fn test_convert_operation_create_relationship() {
844-
use crate::proto::{CreateRelationship, Operation, operation::Op};
845-
846-
let op = Operation {
847-
op: Some(Op::CreateRelationship(CreateRelationship {
848-
resource: "document:123".to_string(),
849-
relation: "viewer".to_string(),
850-
subject: "user:456".to_string(),
851-
})),
852-
};
853-
854-
let result = MultiShardWriteService::convert_operation(&op).unwrap();
855-
match result {
856-
inferadb_ledger_types::Operation::CreateRelationship {
857-
resource,
858-
relation,
859-
subject,
860-
} => {
861-
assert_eq!(resource, "document:123");
862-
assert_eq!(relation, "viewer");
863-
assert_eq!(subject, "user:456");
864-
},
865-
_ => unreachable!("Expected CreateRelationship operation"),
866-
}
867-
}
868-
869-
#[test]
870-
fn test_convert_operation_delete_relationship() {
871-
use crate::proto::{DeleteRelationship, Operation, operation::Op};
872-
873-
let op = Operation {
874-
op: Some(Op::DeleteRelationship(DeleteRelationship {
875-
resource: "document:123".to_string(),
876-
relation: "viewer".to_string(),
877-
subject: "user:456".to_string(),
878-
})),
879-
};
880-
881-
let result = MultiShardWriteService::convert_operation(&op).unwrap();
882-
match result {
883-
inferadb_ledger_types::Operation::DeleteRelationship {
884-
resource,
885-
relation,
886-
subject,
887-
} => {
888-
assert_eq!(resource, "document:123");
889-
assert_eq!(relation, "viewer");
890-
assert_eq!(subject, "user:456");
891-
},
892-
_ => unreachable!("Expected DeleteRelationship operation"),
893-
}
894-
}
895-
896-
#[test]
897-
fn test_convert_operation_set_entity() {
898-
use crate::proto::{Operation, SetEntity, operation::Op};
899-
900-
let op = Operation {
901-
op: Some(Op::SetEntity(SetEntity {
902-
key: "user:123".to_string(),
903-
value: b"test_data".to_vec(),
904-
condition: None,
905-
expires_at: Some(1000),
906-
})),
907-
};
908-
909-
let result = MultiShardWriteService::convert_operation(&op).unwrap();
910-
match result {
911-
inferadb_ledger_types::Operation::SetEntity { key, value, condition, expires_at } => {
912-
assert_eq!(key, "user:123");
913-
assert_eq!(value, b"test_data");
914-
assert!(condition.is_none());
915-
assert_eq!(expires_at, Some(1000));
916-
},
917-
_ => unreachable!("Expected SetEntity operation"),
918-
}
919-
}
920-
921-
#[test]
922-
fn test_convert_operation_delete_entity() {
923-
use crate::proto::{DeleteEntity, Operation, operation::Op};
924-
925-
let op =
926-
Operation { op: Some(Op::DeleteEntity(DeleteEntity { key: "user:123".to_string() })) };
927-
928-
let result = MultiShardWriteService::convert_operation(&op).unwrap();
929-
match result {
930-
inferadb_ledger_types::Operation::DeleteEntity { key } => {
931-
assert_eq!(key, "user:123");
932-
},
933-
_ => unreachable!("Expected DeleteEntity operation"),
934-
}
935-
}
936-
937-
#[test]
938-
fn test_convert_operation_expire_entity() {
939-
use crate::proto::{ExpireEntity, Operation, operation::Op};
940-
941-
let op = Operation {
942-
op: Some(Op::ExpireEntity(ExpireEntity {
943-
key: "user:123".to_string(),
944-
expired_at: 2000,
945-
})),
946-
};
947-
948-
let result = MultiShardWriteService::convert_operation(&op).unwrap();
949-
match result {
950-
inferadb_ledger_types::Operation::ExpireEntity { key, expired_at } => {
951-
assert_eq!(key, "user:123");
952-
assert_eq!(expired_at, 2000);
953-
},
954-
_ => unreachable!("Expected ExpireEntity operation"),
955-
}
956-
}
957-
958-
#[test]
959-
fn test_convert_operation_missing_op() {
960-
use crate::proto::Operation;
961-
962-
let op = Operation { op: None };
963-
964-
let result = MultiShardWriteService::convert_operation(&op);
965-
assert!(result.is_err());
966-
}
725+
// Proto↔domain conversion tests (set_condition, operation variants) are in
726+
// proto_convert.rs which owns the centralized From/TryFrom implementations.
967727
}

crates/raft/src/services/read.rs

Lines changed: 2 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1294,11 +1294,7 @@ impl ReadService for ReadServiceImpl {
12941294
.into_iter()
12951295
.filter(|r| req.resource.as_ref().is_none_or(|res| r.resource == *res))
12961296
.filter(|r| req.relation.as_ref().is_none_or(|rel| r.relation == *rel))
1297-
.map(|r| crate::proto::Relationship {
1298-
resource: r.resource,
1299-
relation: r.relation,
1300-
subject: r.subject,
1301-
})
1297+
.map(|r| r.into())
13021298
.collect()
13031299
};
13041300

@@ -1486,16 +1482,7 @@ impl ReadService for ReadServiceImpl {
14861482
.collect();
14871483

14881484
// Convert to proto entities
1489-
let entities: Vec<crate::proto::Entity> = filtered
1490-
.iter()
1491-
.map(|e| crate::proto::Entity {
1492-
key: String::from_utf8_lossy(&e.key).to_string(),
1493-
value: e.value.clone(),
1494-
version: e.version,
1495-
// Convert 0 (never expires) to None
1496-
expires_at: if e.expires_at == 0 { None } else { Some(e.expires_at) },
1497-
})
1498-
.collect();
1485+
let entities: Vec<crate::proto::Entity> = filtered.iter().map(|e| e.into()).collect();
14991486

15001487
// Create secure pagination token from last key if there are more
15011488
let next_page_token = if entities.len() >= limit {

0 commit comments

Comments
 (0)