Skip to content

Commit 7e4007c

Browse files
committed
feat: token handling
1 parent a55e7f2 commit 7e4007c

File tree

19 files changed

+513
-578
lines changed

19 files changed

+513
-578
lines changed

AGENTS.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -244,7 +244,7 @@ impl TlsConfig {
244244

245245
## Code Quality
246246

247-
**Linting:** `cargo +1.92 clippy --all-targets -- -D warnings`
247+
**Linting:** `cargo +1.92 clippy --workspace --all-targets -- -D warnings`
248248

249249
**Formatting:** `cargo +nightly fmt` (nightly required)
250250

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ aes-gcm = "0.10"
5353
aes-kw = "0.2"
5454
zeroize = { version = "1", features = ["derive"] }
5555
bcrypt = "0.17"
56-
ed25519-dalek = { version = "2", features = ["pkcs8", "pem", "rand_core"] }
56+
ed25519-dalek = { version = "2", features = ["pkcs8", "pem", "rand_core", "zeroize"] }
5757
# JWT: rust_crypto backend provides CryptoProvider auto-registration; ed25519-dalek for EdDSA
5858
jsonwebtoken = { version = "10", features = ["rust_crypto"] }
5959

DESIGN.md

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -792,12 +792,12 @@ Each refresh token belongs to a **family** (random 16-byte ID assigned at initia
792792

793793
Entity lifecycle changes trigger cascade token revocation through the Raft state machine:
794794

795-
| Event | Revocation Scope | Mechanism |
796-
| ---------------------------- | ------------------------------------- | --------------------------------------------------- |
797-
| App disabled | All tokens for app (all vaults) | `RevokeAllSubjectTokens(App(slug))` |
798-
| App-vault connection removed | Tokens for app+vault pair | `RevokeAppVaultTokens(app, vault)` |
799-
| Organization deleted | All signing keys + all refresh tokens | `delete_org_signing_keys` + subject index scan |
800-
| Password change | All user sessions | `RevokeAllUserSessions` (increments `TokenVersion`) |
795+
| Event | Revocation Scope | Mechanism |
796+
| ---------------------------- | ------------------------------------- | ---------------------------------------------------------- |
797+
| App disabled | All tokens for app (all vaults) | Inline cascade in `SetAppEnabled` apply handler |
798+
| App-vault connection removed | Tokens for app+vault pair | Inline cascade in `RemoveAppVault` apply handler |
799+
| Organization deleted | All signing keys + all refresh tokens | `delete_org_signing_keys` + subject index scan |
800+
| Password change | All user sessions | `RevokeAllUserSessions` (increments `TokenVersion`) |
801801

802802
**TokenVersion for immediate user invalidation**: Each user has an atomic `TokenVersion` counter. `RevokeAllUserSessions` increments it. `ValidateToken` compares the token's `version` claim against current state — stale versions are rejected without waiting for token expiry.
803803

MANIFEST.md

Lines changed: 135 additions & 41 deletions
Large diffs are not rendered by default.

crates/proto/src/convert.rs

Lines changed: 2 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,8 @@ use inferadb_ledger_types::{
3636
events::{EventAction, EventEmission, EventEntry, EventOutcome, EventScope},
3737
merkle::MerkleProof as InternalMerkleProof,
3838
token::{
39-
TokenPair as DomainTokenPair, TokenType, UserSessionClaims as DomainUserSessionClaims,
40-
ValidatedToken, VaultTokenClaims as DomainVaultTokenClaims,
39+
TokenType, UserSessionClaims as DomainUserSessionClaims, ValidatedToken,
40+
VaultTokenClaims as DomainVaultTokenClaims,
4141
},
4242
};
4343
use openraft::Vote;
@@ -971,25 +971,6 @@ pub fn user_role_from_i32(value: i32) -> Result<inferadb_ledger_types::UserRole,
971971
inferadb_ledger_types::UserRole::try_from(proto_role)
972972
}
973973

974-
// =============================================================================
975-
// TokenPair conversions (domain::TokenPair <-> proto::TokenPair)
976-
// =============================================================================
977-
978-
/// Converts a domain [`TokenPair`](DomainTokenPair) to its protobuf representation.
979-
///
980-
/// The domain `token_type` field is intentionally dropped — the proto response
981-
/// context already implies the token type (user session vs vault access).
982-
impl From<DomainTokenPair> for proto::TokenPair {
983-
fn from(pair: DomainTokenPair) -> Self {
984-
proto::TokenPair {
985-
access_token: pair.access_token,
986-
refresh_token: pair.refresh_token,
987-
access_expires_at: Some(datetime_to_proto_timestamp(&pair.access_expires_at)),
988-
refresh_expires_at: Some(datetime_to_proto_timestamp(&pair.refresh_expires_at)),
989-
}
990-
}
991-
}
992-
993974
// =============================================================================
994975
// ValidatedToken conversions (domain::ValidatedToken -> proto::ValidateTokenResponse)
995976
// =============================================================================
@@ -2473,31 +2454,6 @@ mod tests {
24732454
assert_eq!(result.expect("unspecified should map to User"), UserRole::User);
24742455
}
24752456

2476-
// -------------------------------------------------------------------------
2477-
// TokenPair conversion tests
2478-
// -------------------------------------------------------------------------
2479-
2480-
#[test]
2481-
fn token_pair_domain_to_proto() {
2482-
use inferadb_ledger_types::token::{TokenPair as DomainTP, TokenType};
2483-
2484-
let dt = DateTime::from_timestamp(1700000000, 500_000_000).unwrap();
2485-
let pair = DomainTP {
2486-
access_token: "access.jwt".to_string(),
2487-
refresh_token: "ilrt_refresh".to_string(),
2488-
access_expires_at: dt,
2489-
refresh_expires_at: dt,
2490-
token_type: TokenType::UserSession,
2491-
};
2492-
2493-
let proto_pair: proto::TokenPair = pair.into();
2494-
assert_eq!(proto_pair.access_token, "access.jwt");
2495-
assert_eq!(proto_pair.refresh_token, "ilrt_refresh");
2496-
let ts = proto_pair.access_expires_at.unwrap();
2497-
assert_eq!(ts.seconds, 1700000000);
2498-
assert_eq!(ts.nanos, 500_000_000);
2499-
}
2500-
25012457
// -------------------------------------------------------------------------
25022458
// ValidatedToken conversion tests
25032459
// -------------------------------------------------------------------------
@@ -2699,28 +2655,4 @@ mod tests {
26992655
assert!(validate_signing_key_status("").is_err());
27002656
assert!(validate_signing_key_status("Active").is_err()); // case-sensitive
27012657
}
2702-
2703-
// -------------------------------------------------------------------------
2704-
// TokenPair roundtrip test
2705-
// -------------------------------------------------------------------------
2706-
2707-
#[test]
2708-
fn token_pair_roundtrip_timestamps_preserved() {
2709-
use inferadb_ledger_types::token::{TokenPair as DomainTP, TokenType};
2710-
2711-
let access_dt = DateTime::from_timestamp(1700001800, 0).unwrap();
2712-
let refresh_dt = DateTime::from_timestamp(1700086200, 0).unwrap();
2713-
2714-
let pair = DomainTP {
2715-
access_token: "a".to_string(),
2716-
refresh_token: "r".to_string(),
2717-
access_expires_at: access_dt,
2718-
refresh_expires_at: refresh_dt,
2719-
token_type: TokenType::UserSession,
2720-
};
2721-
2722-
let proto_pair: proto::TokenPair = pair.into();
2723-
assert_eq!(proto_pair.access_expires_at.unwrap().seconds, 1700001800);
2724-
assert_eq!(proto_pair.refresh_expires_at.unwrap().seconds, 1700086200);
2725-
}
27262658
}

crates/raft/src/log_storage/operations.rs

Lines changed: 41 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@ use inferadb_ledger_state::{
1717
};
1818
use inferadb_ledger_store::StorageBackend;
1919
use inferadb_ledger_types::{
20-
AppId, AppSlug, Hash, LedgerErrorCode, Operation, OrganizationId, TeamId, TeamSlug,
21-
TokenSubject, TokenType, VaultEntry, VaultId, compute_tx_merkle_root, decode, encode,
20+
AppId, AppSlug, Hash, LedgerErrorCode, Operation, OrganizationId, SetCondition, TeamId,
21+
TeamSlug, TokenSubject, TokenType, VaultEntry, VaultId, compute_tx_merkle_root, decode, encode,
2222
events::{EventAction, EventEntry, EventOutcome},
2323
};
2424

@@ -58,33 +58,55 @@ fn try_encode<T: serde::Serialize>(value: &T, context: &str) -> Option<Vec<u8>>
5858
/// the record on its next poll cycle.
5959
///
6060
/// This is a synchronous write through the state layer (not Raft) because
61-
/// the apply handler is already inside a Raft commit.
61+
/// the apply handler is already inside a Raft commit. The saga ID is derived
62+
/// deterministically from the scope to ensure all replicas produce identical
63+
/// state when applying the same log entry.
6264
fn write_signing_key_saga<B: StorageBackend>(
6365
state_layer: &Arc<StateLayer<B>>,
6466
scope: SigningKeyScope,
6567
) {
66-
let saga_id = uuid::Uuid::new_v4().to_string();
68+
let saga_id = match &scope {
69+
SigningKeyScope::Global => "create-signing-key-global".to_owned(),
70+
SigningKeyScope::Organization(org_id) => {
71+
format!("create-signing-key-org-{}", org_id.value())
72+
},
73+
};
6774
let saga = CreateSigningKeySaga::new(saga_id.clone(), CreateSigningKeyInput { scope });
6875
let wrapped = Saga::CreateSigningKey(saga);
6976

7077
let key = format!("saga:{saga_id}");
7178
match serde_json::to_vec(&wrapped) {
7279
Ok(value) => {
73-
let op =
74-
Operation::SetEntity { key: key.clone(), value, condition: None, expires_at: None };
75-
if let Err(e) = state_layer.apply_operations(SYSTEM_VAULT_ID, &[op], 0) {
76-
tracing::error!(
77-
saga_id = %saga_id,
78-
scope = ?scope,
79-
error = %e,
80-
"Failed to write CreateSigningKeySaga record"
81-
);
82-
} else {
83-
tracing::info!(
84-
saga_id = %saga_id,
85-
scope = ?scope,
86-
"Wrote CreateSigningKeySaga from apply handler"
87-
);
80+
let op = Operation::SetEntity {
81+
key: key.clone(),
82+
value,
83+
condition: Some(SetCondition::MustNotExist),
84+
expires_at: None,
85+
};
86+
match state_layer.apply_operations(SYSTEM_VAULT_ID, &[op], 0) {
87+
Ok(_) => {
88+
tracing::info!(
89+
saga_id = %saga_id,
90+
scope = ?scope,
91+
"Wrote CreateSigningKeySaga from apply handler"
92+
);
93+
},
94+
Err(StateError::PreconditionFailed { .. }) => {
95+
// Expected on log replay — saga already exists from prior apply.
96+
tracing::info!(
97+
saga_id = %saga_id,
98+
scope = ?scope,
99+
"CreateSigningKeySaga already exists (idempotent replay)"
100+
);
101+
},
102+
Err(e) => {
103+
tracing::error!(
104+
saga_id = %saga_id,
105+
scope = ?scope,
106+
error = %e,
107+
"Failed to write CreateSigningKeySaga record"
108+
);
109+
},
88110
}
89111
},
90112
Err(e) => {
@@ -4138,41 +4160,6 @@ impl<B: StorageBackend> RaftLogStore<B> {
41384160
}
41394161
},
41404162

4141-
LedgerRequest::RevokeAllSubjectTokens { subject } => {
4142-
let Some(state_layer) = &self.state_layer else {
4143-
return error_result(LedgerErrorCode::Internal, "State layer not available");
4144-
};
4145-
let sys = SystemOrganizationService::new(state_layer.clone());
4146-
4147-
match sys.revoke_all_subject_tokens(subject, block_timestamp) {
4148-
Ok(result) => {
4149-
(LedgerResponse::SubjectTokensRevoked { count: result.revoked_count }, None)
4150-
},
4151-
Err(e) => error_result(
4152-
LedgerErrorCode::Internal,
4153-
format!("Failed to revoke subject tokens: {e}"),
4154-
),
4155-
}
4156-
},
4157-
4158-
LedgerRequest::RevokeAppVaultTokens { app, vault } => {
4159-
let Some(state_layer) = &self.state_layer else {
4160-
return error_result(LedgerErrorCode::Internal, "State layer not available");
4161-
};
4162-
let sys = SystemOrganizationService::new(state_layer.clone());
4163-
4164-
match sys.revoke_app_vault_tokens(*app, *vault, block_timestamp) {
4165-
Ok(result) => (
4166-
LedgerResponse::AppVaultTokensRevoked { count: result.revoked_count },
4167-
None,
4168-
),
4169-
Err(e) => error_result(
4170-
LedgerErrorCode::Internal,
4171-
format!("Failed to revoke app vault tokens: {e}"),
4172-
),
4173-
}
4174-
},
4175-
41764163
LedgerRequest::RevokeAllUserSessions { user } => {
41774164
let Some(state_layer) = &self.state_layer else {
41784165
return error_result(LedgerErrorCode::Internal, "State layer not available");

crates/raft/src/saga_orchestrator.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1247,6 +1247,19 @@ impl<B: StorageBackend + 'static> SagaOrchestrator<B> {
12471247
&self,
12481248
saga: &mut CreateSigningKeySaga,
12491249
) -> Result<(), SagaError> {
1250+
// Check timeout before each step (30s — signing key creation is fast)
1251+
let timeout = Duration::from_secs(30);
1252+
if saga.is_timed_out(timeout) {
1253+
warn!(
1254+
saga_id = %saga.id,
1255+
scope = ?saga.input.scope,
1256+
elapsed_secs = (chrono::Utc::now() - saga.created_at).num_seconds(),
1257+
"CreateSigningKey saga timed out"
1258+
);
1259+
saga.transition(CreateSigningKeySagaState::TimedOut);
1260+
return Ok(());
1261+
}
1262+
12501263
match saga.state.clone() {
12511264
CreateSigningKeySagaState::Pending => {
12521265
// Idempotency: check if an active key already exists for this scope

crates/raft/src/token_maintenance.rs

Lines changed: 6 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -65,22 +65,6 @@ pub struct TokenMaintenanceJob<B: StorageBackend + 'static> {
6565
}
6666

6767
impl<B: StorageBackend + 'static> TokenMaintenanceJob<B> {
68-
/// Creates a maintenance job from a config interval in seconds.
69-
pub fn from_interval_secs(
70-
raft: Arc<Raft<LedgerTypeConfig>>,
71-
node_id: LedgerNodeId,
72-
state: Arc<StateLayer<B>>,
73-
interval_secs: u64,
74-
) -> Self {
75-
Self {
76-
raft,
77-
node_id,
78-
state,
79-
interval: Duration::from_secs(interval_secs),
80-
watchdog_handle: None,
81-
}
82-
}
83-
8468
/// Checks if this node is the current leader.
8569
fn is_leader(&self) -> bool {
8670
let metrics = self.raft.metrics().borrow().clone();
@@ -103,6 +87,7 @@ impl<B: StorageBackend + 'static> TokenMaintenanceJob<B> {
10387
debug!(trace_id = %trace_ctx.trace_id, "Starting token maintenance cycle");
10488

10589
let mut result = MaintenanceResult::default();
90+
let mut had_errors = false;
10691

10792
// Phase 1: Delete expired refresh tokens (the apply handler does the actual work)
10893
match self
@@ -125,6 +110,7 @@ impl<B: StorageBackend + 'static> TokenMaintenanceJob<B> {
125110
}
126111
},
127112
Err(e) => {
113+
had_errors = true;
128114
warn!(
129115
trace_id = %trace_ctx.trace_id,
130116
error = %e,
@@ -155,6 +141,7 @@ impl<B: StorageBackend + 'static> TokenMaintenanceJob<B> {
155141
);
156142
},
157143
Err(e) => {
144+
had_errors = true;
158145
warn!(
159146
trace_id = %trace_ctx.trace_id,
160147
kid = %kid,
@@ -166,6 +153,7 @@ impl<B: StorageBackend + 'static> TokenMaintenanceJob<B> {
166153
}
167154
},
168155
Err(e) => {
156+
had_errors = true;
169157
warn!(
170158
trace_id = %trace_ctx.trace_id,
171159
error = %e,
@@ -176,7 +164,8 @@ impl<B: StorageBackend + 'static> TokenMaintenanceJob<B> {
176164

177165
let duration = cycle_start.elapsed().as_secs_f64();
178166
record_background_job_duration("token_maintenance", duration);
179-
record_background_job_run("token_maintenance", "success");
167+
let status = if had_errors { "failure" } else { "success" };
168+
record_background_job_run("token_maintenance", status);
180169
record_background_job_items(
181170
"token_maintenance",
182171
result.expired_tokens_deleted + result.signing_keys_revoked,

0 commit comments

Comments
 (0)