Skip to content

Commit df5cf07

Browse files
committed
feat: regional data residency
1 parent bbe6625 commit df5cf07

File tree

19 files changed

+2553
-445
lines changed

19 files changed

+2553
-445
lines changed

MANIFEST.md

Lines changed: 169 additions & 142 deletions
Large diffs are not rendered by default.

crates/proto/src/generated/ledger.v1.rs

Lines changed: 146 additions & 100 deletions
Original file line numberDiff line numberDiff line change
@@ -2140,43 +2140,58 @@ pub struct EraseUserRequest {
21402140
/// Region where the user's PII resides.
21412141
#[prost(enumeration = "Region", tag = "3")]
21422142
pub region: i32,
2143-
/// Confirmation token from RequestConfirmation RPC (hex-encoded).
2144-
/// Required — erasure is irreversible.
2145-
#[prost(string, tag = "4")]
2146-
pub confirmation_token: ::prost::alloc::string::String,
21472143
}
21482144
#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)]
21492145
pub struct EraseUserResponse {
21502146
/// User ID that was erased.
21512147
#[prost(int64, tag = "1")]
21522148
pub user_id: i64,
21532149
}
2154-
/// Request a confirmation token for an irreversible operation.
2150+
/// Migrate existing users from flat \_system store to regional directory
2151+
/// structure. Explicit admin action for pre-release data migration.
21552152
#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
2156-
pub struct RequestConfirmationRequest {
2157-
/// The operation requiring confirmation.
2158-
#[prost(enumeration = "AdminPermission", tag = "1")]
2159-
pub operation: i32,
2160-
/// Target entity identifier (e.g., user ID string for EraseUser).
2153+
pub struct MigrateExistingUsersRequest {
2154+
/// Default region for users without an assigned region.
2155+
#[prost(enumeration = "Region", tag = "1")]
2156+
pub default_region: i32,
2157+
/// Hex-encoded email blinding key for HMAC computation.
2158+
/// The key stays in the handler and never enters the Raft log.
21612159
#[prost(string, tag = "2")]
2162-
pub target_id: ::prost::alloc::string::String,
2163-
/// Actor (admin user) requesting the confirmation.
2164-
#[prost(int64, tag = "3")]
2165-
pub actor_id: i64,
2160+
pub email_blinding_key: ::prost::alloc::string::String,
21662161
}
2167-
/// Response containing a time-limited, single-use confirmation token.
2168-
#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)]
2169-
pub struct RequestConfirmationResponse {
2170-
/// Opaque confirmation token (hex-encoded 32 random bytes).
2171-
/// Present this token when executing the confirmed operation.
2172-
#[prost(string, tag = "1")]
2173-
pub confirmation_token: ::prost::alloc::string::String,
2174-
/// When the token expires (RFC 3339 timestamp).
2175-
#[prost(string, tag = "2")]
2176-
pub expires_at: ::prost::alloc::string::String,
2177-
/// Human-readable summary of what the operation will do.
2178-
#[prost(string, tag = "3")]
2179-
pub operation_summary: ::prost::alloc::string::String,
2162+
#[derive(Clone, Copy, PartialEq, ::prost::Message)]
2163+
pub struct MigrateExistingUsersResponse {
2164+
/// User records found in the flat \_system store.
2165+
#[prost(uint64, tag = "1")]
2166+
pub users: u64,
2167+
/// Users successfully migrated in this run.
2168+
#[prost(uint64, tag = "2")]
2169+
pub migrated: u64,
2170+
/// Users skipped (already have directory entries).
2171+
#[prost(uint64, tag = "3")]
2172+
pub skipped: u64,
2173+
/// Users that failed migration.
2174+
#[prost(uint64, tag = "4")]
2175+
pub errors: u64,
2176+
/// Elapsed time in seconds.
2177+
#[prost(double, tag = "5")]
2178+
pub elapsed_secs: f64,
2179+
}
2180+
/// Eagerly provision a Raft group for a region.
2181+
#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)]
2182+
pub struct ProvisionRegionRequest {
2183+
/// Region to provision.
2184+
#[prost(enumeration = "Region", tag = "1")]
2185+
pub region: i32,
2186+
}
2187+
#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)]
2188+
pub struct ProvisionRegionResponse {
2189+
/// Whether the region was newly created (false if already active).
2190+
#[prost(bool, tag = "1")]
2191+
pub created: bool,
2192+
/// Region that was provisioned.
2193+
#[prost(enumeration = "Region", tag = "2")]
2194+
pub region: i32,
21802195
}
21812196
/// Geographic region for data residency. Each Raft consensus group maps 1:1 to
21822197
/// a region. Organizations declare a region governing where their data is stored.
@@ -2879,55 +2894,6 @@ impl HealthStatus {
28792894
}
28802895
}
28812896
}
2882-
/// Administrative permission for sensitive operations.
2883-
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)]
2884-
#[repr(i32)]
2885-
pub enum AdminPermission {
2886-
Unspecified = 0,
2887-
MigrateOrganization = 1,
2888-
MigrateUserRegion = 2,
2889-
EraseUser = 3,
2890-
RotateRegionKey = 4,
2891-
DecommissionRegionKey = 5,
2892-
RotateBlindingKey = 6,
2893-
ManageRegionMembership = 7,
2894-
}
2895-
impl AdminPermission {
2896-
/// String value of the enum field names used in the ProtoBuf definition.
2897-
///
2898-
/// The values are not transformed in any way and thus are considered stable
2899-
/// (if the ProtoBuf definition does not change) and safe for programmatic use.
2900-
pub fn as_str_name(&self) -> &'static str {
2901-
match self {
2902-
Self::Unspecified => "ADMIN_PERMISSION_UNSPECIFIED",
2903-
Self::MigrateOrganization => "ADMIN_PERMISSION_MIGRATE_ORGANIZATION",
2904-
Self::MigrateUserRegion => "ADMIN_PERMISSION_MIGRATE_USER_REGION",
2905-
Self::EraseUser => "ADMIN_PERMISSION_ERASE_USER",
2906-
Self::RotateRegionKey => "ADMIN_PERMISSION_ROTATE_REGION_KEY",
2907-
Self::DecommissionRegionKey => "ADMIN_PERMISSION_DECOMMISSION_REGION_KEY",
2908-
Self::RotateBlindingKey => "ADMIN_PERMISSION_ROTATE_BLINDING_KEY",
2909-
Self::ManageRegionMembership => "ADMIN_PERMISSION_MANAGE_REGION_MEMBERSHIP",
2910-
}
2911-
}
2912-
/// Creates an enum from field names used in the ProtoBuf definition.
2913-
pub fn from_str_name(value: &str) -> ::core::option::Option<Self> {
2914-
match value {
2915-
"ADMIN_PERMISSION_UNSPECIFIED" => Some(Self::Unspecified),
2916-
"ADMIN_PERMISSION_MIGRATE_ORGANIZATION" => Some(Self::MigrateOrganization),
2917-
"ADMIN_PERMISSION_MIGRATE_USER_REGION" => Some(Self::MigrateUserRegion),
2918-
"ADMIN_PERMISSION_ERASE_USER" => Some(Self::EraseUser),
2919-
"ADMIN_PERMISSION_ROTATE_REGION_KEY" => Some(Self::RotateRegionKey),
2920-
"ADMIN_PERMISSION_DECOMMISSION_REGION_KEY" => {
2921-
Some(Self::DecommissionRegionKey)
2922-
}
2923-
"ADMIN_PERMISSION_ROTATE_BLINDING_KEY" => Some(Self::RotateBlindingKey),
2924-
"ADMIN_PERMISSION_MANAGE_REGION_MEMBERSHIP" => {
2925-
Some(Self::ManageRegionMembership)
2926-
}
2927-
_ => None,
2928-
}
2929-
}
2930-
}
29312897
/// Generated client implementations.
29322898
pub mod read_service_client {
29332899
#![allow(
@@ -5346,15 +5312,14 @@ pub mod admin_service_client {
53465312
.insert(GrpcMethod::new("ledger.v1.AdminService", "EraseUser"));
53475313
self.inner.unary(req, path, codec).await
53485314
}
5349-
/// Request a confirmation token for an irreversible operation.
5350-
/// Returns a single-use, time-limited token that must be presented
5351-
/// when executing the operation. Required for EraseUser and
5352-
/// DecommissionRegionKey.
5353-
pub async fn request_confirmation(
5315+
/// Migrate existing users from flat \_system store to regional directory
5316+
/// structure. One-time admin operation for pre-release data migration.
5317+
/// Idempotent — re-run skips already-migrated users.
5318+
pub async fn migrate_existing_users(
53545319
&mut self,
5355-
request: impl tonic::IntoRequest<super::RequestConfirmationRequest>,
5320+
request: impl tonic::IntoRequest<super::MigrateExistingUsersRequest>,
53565321
) -> std::result::Result<
5357-
tonic::Response<super::RequestConfirmationResponse>,
5322+
tonic::Response<super::MigrateExistingUsersResponse>,
53585323
tonic::Status,
53595324
> {
53605325
self.inner
@@ -5367,15 +5332,42 @@ pub mod admin_service_client {
53675332
})?;
53685333
let codec = tonic_prost::ProstCodec::default();
53695334
let path = http::uri::PathAndQuery::from_static(
5370-
"/ledger.v1.AdminService/RequestConfirmation",
5335+
"/ledger.v1.AdminService/MigrateExistingUsers",
53715336
);
53725337
let mut req = request.into_request();
53735338
req.extensions_mut()
53745339
.insert(
5375-
GrpcMethod::new("ledger.v1.AdminService", "RequestConfirmation"),
5340+
GrpcMethod::new("ledger.v1.AdminService", "MigrateExistingUsers"),
53765341
);
53775342
self.inner.unary(req, path, codec).await
53785343
}
5344+
/// Eagerly provision a Raft group for a region without assigning data.
5345+
/// Normally regional groups are created lazily on first organization or
5346+
/// user assignment. This RPC allows pre-provisioning for capacity planning.
5347+
pub async fn provision_region(
5348+
&mut self,
5349+
request: impl tonic::IntoRequest<super::ProvisionRegionRequest>,
5350+
) -> std::result::Result<
5351+
tonic::Response<super::ProvisionRegionResponse>,
5352+
tonic::Status,
5353+
> {
5354+
self.inner
5355+
.ready()
5356+
.await
5357+
.map_err(|e| {
5358+
tonic::Status::unknown(
5359+
format!("Service was not ready: {}", e.into()),
5360+
)
5361+
})?;
5362+
let codec = tonic_prost::ProstCodec::default();
5363+
let path = http::uri::PathAndQuery::from_static(
5364+
"/ledger.v1.AdminService/ProvisionRegion",
5365+
);
5366+
let mut req = request.into_request();
5367+
req.extensions_mut()
5368+
.insert(GrpcMethod::new("ledger.v1.AdminService", "ProvisionRegion"));
5369+
self.inner.unary(req, path, codec).await
5370+
}
53795371
}
53805372
}
53815373
/// Generated server implementations.
@@ -5661,15 +5653,24 @@ pub mod admin_service_server {
56615653
tonic::Response<super::EraseUserResponse>,
56625654
tonic::Status,
56635655
>;
5664-
/// Request a confirmation token for an irreversible operation.
5665-
/// Returns a single-use, time-limited token that must be presented
5666-
/// when executing the operation. Required for EraseUser and
5667-
/// DecommissionRegionKey.
5668-
async fn request_confirmation(
5656+
/// Migrate existing users from flat \_system store to regional directory
5657+
/// structure. One-time admin operation for pre-release data migration.
5658+
/// Idempotent — re-run skips already-migrated users.
5659+
async fn migrate_existing_users(
5660+
&self,
5661+
request: tonic::Request<super::MigrateExistingUsersRequest>,
5662+
) -> std::result::Result<
5663+
tonic::Response<super::MigrateExistingUsersResponse>,
5664+
tonic::Status,
5665+
>;
5666+
/// Eagerly provision a Raft group for a region without assigning data.
5667+
/// Normally regional groups are created lazily on first organization or
5668+
/// user assignment. This RPC allows pre-provisioning for capacity planning.
5669+
async fn provision_region(
56695670
&self,
5670-
request: tonic::Request<super::RequestConfirmationRequest>,
5671+
request: tonic::Request<super::ProvisionRegionRequest>,
56715672
) -> std::result::Result<
5672-
tonic::Response<super::RequestConfirmationResponse>,
5673+
tonic::Response<super::ProvisionRegionResponse>,
56735674
tonic::Status,
56745675
>;
56755676
}
@@ -7116,25 +7117,25 @@ pub mod admin_service_server {
71167117
};
71177118
Box::pin(fut)
71187119
}
7119-
"/ledger.v1.AdminService/RequestConfirmation" => {
7120+
"/ledger.v1.AdminService/MigrateExistingUsers" => {
71207121
#[allow(non_camel_case_types)]
7121-
struct RequestConfirmationSvc<T: AdminService>(pub Arc<T>);
7122+
struct MigrateExistingUsersSvc<T: AdminService>(pub Arc<T>);
71227123
impl<
71237124
T: AdminService,
7124-
> tonic::server::UnaryService<super::RequestConfirmationRequest>
7125-
for RequestConfirmationSvc<T> {
7126-
type Response = super::RequestConfirmationResponse;
7125+
> tonic::server::UnaryService<super::MigrateExistingUsersRequest>
7126+
for MigrateExistingUsersSvc<T> {
7127+
type Response = super::MigrateExistingUsersResponse;
71277128
type Future = BoxFuture<
71287129
tonic::Response<Self::Response>,
71297130
tonic::Status,
71307131
>;
71317132
fn call(
71327133
&mut self,
7133-
request: tonic::Request<super::RequestConfirmationRequest>,
7134+
request: tonic::Request<super::MigrateExistingUsersRequest>,
71347135
) -> Self::Future {
71357136
let inner = Arc::clone(&self.0);
71367137
let fut = async move {
7137-
<T as AdminService>::request_confirmation(&inner, request)
7138+
<T as AdminService>::migrate_existing_users(&inner, request)
71387139
.await
71397140
};
71407141
Box::pin(fut)
@@ -7146,7 +7147,52 @@ pub mod admin_service_server {
71467147
let max_encoding_message_size = self.max_encoding_message_size;
71477148
let inner = self.inner.clone();
71487149
let fut = async move {
7149-
let method = RequestConfirmationSvc(inner);
7150+
let method = MigrateExistingUsersSvc(inner);
7151+
let codec = tonic_prost::ProstCodec::default();
7152+
let mut grpc = tonic::server::Grpc::new(codec)
7153+
.apply_compression_config(
7154+
accept_compression_encodings,
7155+
send_compression_encodings,
7156+
)
7157+
.apply_max_message_size_config(
7158+
max_decoding_message_size,
7159+
max_encoding_message_size,
7160+
);
7161+
let res = grpc.unary(method, req).await;
7162+
Ok(res)
7163+
};
7164+
Box::pin(fut)
7165+
}
7166+
"/ledger.v1.AdminService/ProvisionRegion" => {
7167+
#[allow(non_camel_case_types)]
7168+
struct ProvisionRegionSvc<T: AdminService>(pub Arc<T>);
7169+
impl<
7170+
T: AdminService,
7171+
> tonic::server::UnaryService<super::ProvisionRegionRequest>
7172+
for ProvisionRegionSvc<T> {
7173+
type Response = super::ProvisionRegionResponse;
7174+
type Future = BoxFuture<
7175+
tonic::Response<Self::Response>,
7176+
tonic::Status,
7177+
>;
7178+
fn call(
7179+
&mut self,
7180+
request: tonic::Request<super::ProvisionRegionRequest>,
7181+
) -> Self::Future {
7182+
let inner = Arc::clone(&self.0);
7183+
let fut = async move {
7184+
<T as AdminService>::provision_region(&inner, request).await
7185+
};
7186+
Box::pin(fut)
7187+
}
7188+
}
7189+
let accept_compression_encodings = self.accept_compression_encodings;
7190+
let send_compression_encodings = self.send_compression_encodings;
7191+
let max_decoding_message_size = self.max_decoding_message_size;
7192+
let max_encoding_message_size = self.max_encoding_message_size;
7193+
let inner = self.inner.clone();
7194+
let fut = async move {
7195+
let method = ProvisionRegionSvc(inner);
71507196
let codec = tonic_prost::ProstCodec::default();
71517197
let mut grpc = tonic::server::Grpc::new(codec)
71527198
.apply_compression_config(
6 Bytes
Binary file not shown.

crates/raft/src/log_storage/mod.rs

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2066,15 +2066,13 @@ mod tests {
20662066
request_hash: 0,
20672067
},
20682068
LedgerRequest::System(SystemRequest::CreateUser {
2069-
name: "Alice".to_string(),
2070-
email: "alice@example.com".to_string(),
2069+
user: UserId::new(1),
20712070
admin: false,
20722071
slug: UserSlug::new(111),
20732072
region: Region::US_EAST_VA,
20742073
}),
20752074
LedgerRequest::System(SystemRequest::CreateUser {
2076-
name: "Bob".to_string(),
2077-
email: "bob@example.com".to_string(),
2075+
user: UserId::new(2),
20782076
admin: false,
20792077
slug: UserSlug::new(222),
20802078
region: Region::US_EAST_VA,
@@ -4527,8 +4525,7 @@ mod tests {
45274525
let mut state = store.applied_state.write();
45284526

45294527
let request = LedgerRequest::System(SystemRequest::CreateUser {
4530-
name: "Alice Admin".to_string(),
4531-
email: "alice@example.com".to_string(),
4528+
user: UserId::new(1),
45324529
admin: true,
45334530
slug: UserSlug::new(333),
45344531
region: Region::US_EAST_VA,
@@ -4557,8 +4554,6 @@ mod tests {
45574554
user_event.details.contains_key("user_id"),
45584555
"UserCreated should have user_id detail"
45594556
);
4560-
assert_eq!(user_event.details.get("name").map(|s| s.as_str()), Some("Alice Admin"));
4561-
assert_eq!(user_event.details.get("email").map(|s| s.as_str()), Some("alice@example.com"));
45624557
assert_eq!(user_event.details.get("admin").map(|s| s.as_str()), Some("true"));
45634558
assert_eq!(user_event.organization_id, OrganizationId::new(0));
45644559
}

0 commit comments

Comments
 (0)