Skip to content

feat: Introduce User Groups to Parseable #1366

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Jul 10, 2025
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions src/handlers/http/cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,20 +160,29 @@ pub async fn sync_streams_with_ingestors(
pub async fn sync_users_with_roles_with_ingestors(
username: &str,
role: &HashSet<String>,
operation: &str,
) -> Result<(), RBACError> {
match operation {
"add" | "remove" => {}
_ => return Err(RBACError::InvalidSyncOperation(operation.to_string())),
}

let role_data = to_vec(&role.clone()).map_err(|err| {
error!("Fatal: failed to serialize role: {:?}", err);
RBACError::SerdeError(err)
})?;

let username = username.to_owned();

let op = operation.to_string();

for_each_live_ingestor(move |ingestor| {
let url = format!(
"{}{}/user/{}/role/sync",
"{}{}/user/{}/role/sync/{}",
ingestor.domain_name,
base_path_without_preceding_slash(),
username
username,
op
);

let role_data = role_data.clone();
Expand Down
20 changes: 14 additions & 6 deletions src/handlers/http/middleware.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ pub struct CommonAttributes {

pub trait RouteExt {
fn authorize(self, action: Action) -> Self;
fn authorize_for_stream(self, action: Action) -> Self;
fn authorize_for_resource(self, action: Action) -> Self;
fn authorize_for_user(self, action: Action) -> Self;
}

Expand All @@ -71,10 +71,10 @@ impl RouteExt for Route {
})
}

fn authorize_for_stream(self, action: Action) -> Self {
fn authorize_for_resource(self, action: Action) -> Self {
self.wrap(Auth {
action,
method: auth_stream_context,
method: auth_resource_context,
})
}

Expand Down Expand Up @@ -182,18 +182,26 @@ pub fn auth_no_context(req: &mut ServiceRequest, action: Action) -> Result<rbac:
creds.map(|key| Users.authorize(key, action, None, None))
}

pub fn auth_stream_context(
pub fn auth_resource_context(
req: &mut ServiceRequest,
action: Action,
) -> Result<rbac::Response, Error> {
let creds = extract_session_key(req);
let usergroup = req.match_info().get("usergroup");
let llmid = req.match_info().get("llmid");
let mut stream = req.match_info().get("logstream");
if stream.is_none() {
if let Some(usergroup) = usergroup {
creds.map(|key| Users.authorize(key, action, Some(usergroup), None))
} else if let Some(llmid) = llmid {
creds.map(|key| Users.authorize(key, action, Some(llmid), None))
} else if let Some(stream) = stream {
creds.map(|key| Users.authorize(key, action, Some(stream), None))
} else {
if let Some(stream_name) = req.headers().get(STREAM_NAME_HEADER_KEY) {
stream = Some(stream_name.to_str().unwrap());
}
creds.map(|key| Users.authorize(key, action, stream, None))
}
creds.map(|key| Users.authorize(key, action, stream, None))
}

pub fn auth_user_context(
Expand Down
86 changes: 78 additions & 8 deletions src/handlers/http/modal/ingest/ingestor_rbac.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use tokio::sync::Mutex;
use crate::{
handlers::http::{modal::utils::rbac_utils::get_metadata, rbac::RBACError},
rbac::{
map::roles,
user::{self, User as ParseableUser},
Users,
},
Expand All @@ -48,7 +49,7 @@ pub async fn post_user(
let _ = storage::put_staging_metadata(&metadata);
let created_role = user.roles.clone();
Users.put_user(user.clone());
Users.put_role(&username, created_role.clone());
Users.add_roles(&username, created_role.clone());
}

Ok(generated_password)
Expand All @@ -73,34 +74,103 @@ pub async fn delete_user(username: web::Path<String>) -> Result<impl Responder,
Ok(format!("deleted user: {username}"))
}

// Handler PUT /user/{username}/roles => Put roles for user
// Put roles for given user
pub async fn put_role(
// Handler PATCH /user/{username}/role/sync/add => Add roles to a user
pub async fn add_roles_to_user(
username: web::Path<String>,
role: web::Json<HashSet<String>>,
roles_to_add: web::Json<HashSet<String>>,
) -> Result<String, RBACError> {
let username = username.into_inner();
let role = role.into_inner();
let roles_to_add = roles_to_add.into_inner();

if !Users.contains(&username) {
return Err(RBACError::UserDoesNotExist);
};

// check if all roles exist
let mut non_existent_roles = Vec::new();
roles_to_add.iter().for_each(|r| {
if roles().get(r).is_none() {
non_existent_roles.push(r.clone());
}
});

if !non_existent_roles.is_empty() {
return Err(RBACError::RolesDoNotExist(non_existent_roles));
}

// update parseable.json first
let mut metadata = get_metadata().await?;
if let Some(user) = metadata
.users
.iter_mut()
.find(|user| user.username() == username)
{
user.roles.extend(roles_to_add.clone());
} else {
// should be unreachable given state is always consistent
return Err(RBACError::UserDoesNotExist);
}

let _ = storage::put_staging_metadata(&metadata);
// update in mem table
Users.add_roles(&username.clone(), roles_to_add.clone());

Ok(format!("Roles updated successfully for {username}"))
}

// Handler PATCH /user/{username}/role/sync/add => Add roles to a user
pub async fn remove_roles_from_user(
username: web::Path<String>,
roles_to_remove: web::Json<HashSet<String>>,
) -> Result<String, RBACError> {
let username = username.into_inner();
let roles_to_remove = roles_to_remove.into_inner();

if !Users.contains(&username) {
return Err(RBACError::UserDoesNotExist);
};

// check if all roles exist
let mut non_existent_roles = Vec::new();
roles_to_remove.iter().for_each(|r| {
if roles().get(r).is_none() {
non_existent_roles.push(r.clone());
}
});

if !non_existent_roles.is_empty() {
return Err(RBACError::RolesDoNotExist(non_existent_roles));
}

// check that user actually has these roles
let user_roles: HashSet<String> = HashSet::from_iter(Users.get_role(&username));
let roles_not_with_user: HashSet<String> =
HashSet::from_iter(roles_to_remove.difference(&user_roles).cloned());

if !roles_not_with_user.is_empty() {
return Err(RBACError::RolesNotAssigned(Vec::from_iter(
roles_not_with_user,
)));
}

// update parseable.json first
let mut metadata = get_metadata().await?;
if let Some(user) = metadata
.users
.iter_mut()
.find(|user| user.username() == username)
{
user.roles.clone_from(&role);
let diff: HashSet<String> =
HashSet::from_iter(user.roles.difference(&roles_to_remove).cloned());
user.roles = diff;
} else {
// should be unreachable given state is always consistent
return Err(RBACError::UserDoesNotExist);
}

let _ = storage::put_staging_metadata(&metadata);
// update in mem table
Users.put_role(&username.clone(), role.clone());
Users.remove_roles(&username.clone(), roles_to_remove.clone());

Ok(format!("Roles updated successfully for {username}"))
}
Expand Down
28 changes: 19 additions & 9 deletions src/handlers/http/modal/ingest_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,11 +198,21 @@ impl IngestServer {
.wrap(DisAllowRootUser),
)
.service(
web::resource("/{username}/role/sync")
// PUT /user/{username}/roles => Put roles for user
web::resource("/{username}/role/sync/add")
// PATCH /user/{username}/role/sync/add => Add roles to a user
.route(
web::put()
.to(ingestor_rbac::put_role)
web::patch()
.to(ingestor_rbac::add_roles_to_user)
.authorize(Action::PutUserRoles)
.wrap(DisAllowRootUser),
),
)
.service(
web::resource("/{username}/role/sync/remove")
// PATCH /user/{username}/role/sync/remove => Remove roles from a user
.route(
web::patch()
.to(ingestor_rbac::remove_roles_from_user)
.authorize(Action::PutUserRoles)
.wrap(DisAllowRootUser),
),
Expand All @@ -227,7 +237,7 @@ impl IngestServer {
.route(
web::post()
.to(ingest::post_event)
.authorize_for_stream(Action::Ingest),
.authorize_for_resource(Action::Ingest),
)
.wrap(from_fn(
resource_check::check_resource_utilization_middleware,
Expand All @@ -245,31 +255,31 @@ impl IngestServer {
.route(
web::put()
.to(ingestor_logstream::put_stream)
.authorize_for_stream(Action::CreateStream),
.authorize_for_resource(Action::CreateStream),
),
)
.service(
// GET "/logstream/{logstream}/info" ==> Get info for given log stream
web::resource("/info").route(
web::get()
.to(logstream::get_stream_info)
.authorize_for_stream(Action::GetStreamInfo),
.authorize_for_resource(Action::GetStreamInfo),
),
)
.service(
// GET "/logstream/{logstream}/stats" ==> Get stats for given log stream
web::resource("/stats").route(
web::get()
.to(logstream::get_stats)
.authorize_for_stream(Action::GetStats),
.authorize_for_resource(Action::GetStats),
),
)
.service(
web::scope("/retention").service(
web::resource("/cleanup").route(
web::post()
.to(ingestor_logstream::retention_cleanup)
.authorize_for_stream(Action::PutRetention),
.authorize_for_resource(Action::PutRetention),
),
),
),
Expand Down
Loading
Loading