Skip to content

Commit ecbf80d

Browse files
Merge branch 'main' into demo-data
2 parents 86a8bdc + c4533be commit ecbf80d

File tree

21 files changed

+1071
-239
lines changed

21 files changed

+1071
-239
lines changed

src/handlers/http/cluster/mod.rs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -216,20 +216,29 @@ pub async fn get_demo_data_from_ingestor(action: &str) -> Result<(), PostError>
216216
pub async fn sync_users_with_roles_with_ingestors(
217217
username: &str,
218218
role: &HashSet<String>,
219+
operation: &str,
219220
) -> Result<(), RBACError> {
221+
match operation {
222+
"add" | "remove" => {}
223+
_ => return Err(RBACError::InvalidSyncOperation(operation.to_string())),
224+
}
225+
220226
let role_data = to_vec(&role.clone()).map_err(|err| {
221227
error!("Fatal: failed to serialize role: {:?}", err);
222228
RBACError::SerdeError(err)
223229
})?;
224230

225231
let username = username.to_owned();
226232

233+
let op = operation.to_string();
234+
227235
for_each_live_ingestor(move |ingestor| {
228236
let url = format!(
229-
"{}{}/user/{}/role/sync",
237+
"{}{}/user/{}/role/sync/{}",
230238
ingestor.domain_name,
231239
base_path_without_preceding_slash(),
232-
username
240+
username,
241+
op
233242
);
234243

235244
let role_data = role_data.clone();

src/handlers/http/middleware.rs

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ pub struct CommonAttributes {
5959

6060
pub trait RouteExt {
6161
fn authorize(self, action: Action) -> Self;
62-
fn authorize_for_stream(self, action: Action) -> Self;
62+
fn authorize_for_resource(self, action: Action) -> Self;
6363
fn authorize_for_user(self, action: Action) -> Self;
6464
}
6565

@@ -71,10 +71,10 @@ impl RouteExt for Route {
7171
})
7272
}
7373

74-
fn authorize_for_stream(self, action: Action) -> Self {
74+
fn authorize_for_resource(self, action: Action) -> Self {
7575
self.wrap(Auth {
7676
action,
77-
method: auth_stream_context,
77+
method: auth_resource_context,
7878
})
7979
}
8080

@@ -182,18 +182,26 @@ pub fn auth_no_context(req: &mut ServiceRequest, action: Action) -> Result<rbac:
182182
creds.map(|key| Users.authorize(key, action, None, None))
183183
}
184184

185-
pub fn auth_stream_context(
185+
pub fn auth_resource_context(
186186
req: &mut ServiceRequest,
187187
action: Action,
188188
) -> Result<rbac::Response, Error> {
189189
let creds = extract_session_key(req);
190+
let usergroup = req.match_info().get("usergroup");
191+
let llmid = req.match_info().get("llmid");
190192
let mut stream = req.match_info().get("logstream");
191-
if stream.is_none() {
193+
if let Some(usergroup) = usergroup {
194+
creds.map(|key| Users.authorize(key, action, Some(usergroup), None))
195+
} else if let Some(llmid) = llmid {
196+
creds.map(|key| Users.authorize(key, action, Some(llmid), None))
197+
} else if let Some(stream) = stream {
198+
creds.map(|key| Users.authorize(key, action, Some(stream), None))
199+
} else {
192200
if let Some(stream_name) = req.headers().get(STREAM_NAME_HEADER_KEY) {
193201
stream = Some(stream_name.to_str().unwrap());
194202
}
203+
creds.map(|key| Users.authorize(key, action, stream, None))
195204
}
196-
creds.map(|key| Users.authorize(key, action, stream, None))
197205
}
198206

199207
pub fn auth_user_context(

src/handlers/http/modal/ingest/ingestor_rbac.rs

Lines changed: 78 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ use tokio::sync::Mutex;
2424
use crate::{
2525
handlers::http::{modal::utils::rbac_utils::get_metadata, rbac::RBACError},
2626
rbac::{
27+
map::roles,
2728
user::{self, User as ParseableUser},
2829
Users,
2930
},
@@ -48,7 +49,7 @@ pub async fn post_user(
4849
let _ = storage::put_staging_metadata(&metadata);
4950
let created_role = user.roles.clone();
5051
Users.put_user(user.clone());
51-
Users.put_role(&username, created_role.clone());
52+
Users.add_roles(&username, created_role.clone());
5253
}
5354

5455
Ok(generated_password)
@@ -73,34 +74,103 @@ pub async fn delete_user(username: web::Path<String>) -> Result<impl Responder,
7374
Ok(format!("deleted user: {username}"))
7475
}
7576

76-
// Handler PUT /user/{username}/roles => Put roles for user
77-
// Put roles for given user
78-
pub async fn put_role(
77+
// Handler PATCH /user/{username}/role/sync/add => Add roles to a user
78+
pub async fn add_roles_to_user(
7979
username: web::Path<String>,
80-
role: web::Json<HashSet<String>>,
80+
roles_to_add: web::Json<HashSet<String>>,
8181
) -> Result<String, RBACError> {
8282
let username = username.into_inner();
83-
let role = role.into_inner();
83+
let roles_to_add = roles_to_add.into_inner();
8484

8585
if !Users.contains(&username) {
8686
return Err(RBACError::UserDoesNotExist);
8787
};
88+
89+
// check if all roles exist
90+
let mut non_existent_roles = Vec::new();
91+
roles_to_add.iter().for_each(|r| {
92+
if roles().get(r).is_none() {
93+
non_existent_roles.push(r.clone());
94+
}
95+
});
96+
97+
if !non_existent_roles.is_empty() {
98+
return Err(RBACError::RolesDoNotExist(non_existent_roles));
99+
}
100+
101+
// update parseable.json first
102+
let mut metadata = get_metadata().await?;
103+
if let Some(user) = metadata
104+
.users
105+
.iter_mut()
106+
.find(|user| user.username() == username)
107+
{
108+
user.roles.extend(roles_to_add.clone());
109+
} else {
110+
// should be unreachable given state is always consistent
111+
return Err(RBACError::UserDoesNotExist);
112+
}
113+
114+
let _ = storage::put_staging_metadata(&metadata);
115+
// update in mem table
116+
Users.add_roles(&username.clone(), roles_to_add.clone());
117+
118+
Ok(format!("Roles updated successfully for {username}"))
119+
}
120+
121+
// Handler PATCH /user/{username}/role/sync/add => Add roles to a user
122+
pub async fn remove_roles_from_user(
123+
username: web::Path<String>,
124+
roles_to_remove: web::Json<HashSet<String>>,
125+
) -> Result<String, RBACError> {
126+
let username = username.into_inner();
127+
let roles_to_remove = roles_to_remove.into_inner();
128+
129+
if !Users.contains(&username) {
130+
return Err(RBACError::UserDoesNotExist);
131+
};
132+
133+
// check if all roles exist
134+
let mut non_existent_roles = Vec::new();
135+
roles_to_remove.iter().for_each(|r| {
136+
if roles().get(r).is_none() {
137+
non_existent_roles.push(r.clone());
138+
}
139+
});
140+
141+
if !non_existent_roles.is_empty() {
142+
return Err(RBACError::RolesDoNotExist(non_existent_roles));
143+
}
144+
145+
// check that user actually has these roles
146+
let user_roles: HashSet<String> = HashSet::from_iter(Users.get_role(&username));
147+
let roles_not_with_user: HashSet<String> =
148+
HashSet::from_iter(roles_to_remove.difference(&user_roles).cloned());
149+
150+
if !roles_not_with_user.is_empty() {
151+
return Err(RBACError::RolesNotAssigned(Vec::from_iter(
152+
roles_not_with_user,
153+
)));
154+
}
155+
88156
// update parseable.json first
89157
let mut metadata = get_metadata().await?;
90158
if let Some(user) = metadata
91159
.users
92160
.iter_mut()
93161
.find(|user| user.username() == username)
94162
{
95-
user.roles.clone_from(&role);
163+
let diff: HashSet<String> =
164+
HashSet::from_iter(user.roles.difference(&roles_to_remove).cloned());
165+
user.roles = diff;
96166
} else {
97167
// should be unreachable given state is always consistent
98168
return Err(RBACError::UserDoesNotExist);
99169
}
100170

101171
let _ = storage::put_staging_metadata(&metadata);
102172
// update in mem table
103-
Users.put_role(&username.clone(), role.clone());
173+
Users.remove_roles(&username.clone(), roles_to_remove.clone());
104174

105175
Ok(format!("Roles updated successfully for {username}"))
106176
}

src/handlers/http/modal/ingest/ingestor_role.rs

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,19 @@
1616
*
1717
*/
1818

19+
use std::collections::HashSet;
20+
1921
use actix_web::{
2022
web::{self, Json},
2123
HttpResponse, Responder,
2224
};
2325

2426
use crate::{
2527
handlers::http::{modal::utils::rbac_utils::get_metadata, role::RoleError},
26-
rbac::{map::mut_roles, role::model::DefaultPrivilege},
28+
rbac::{
29+
map::{mut_roles, mut_sessions, read_user_groups, users},
30+
role::model::DefaultPrivilege,
31+
},
2732
storage,
2833
};
2934

@@ -40,5 +45,25 @@ pub async fn put(
4045
let _ = storage::put_staging_metadata(&metadata);
4146
mut_roles().insert(name.clone(), privileges);
4247

48+
// refresh the sessions of all users using this role
49+
// for this, iterate over all user_groups and users and create a hashset of users
50+
let mut session_refresh_users: HashSet<String> = HashSet::new();
51+
for user_group in read_user_groups().values().cloned() {
52+
if user_group.roles.contains(&name) {
53+
session_refresh_users.extend(user_group.users);
54+
}
55+
}
56+
57+
// iterate over all users to see if they have this role
58+
for user in users().values().cloned() {
59+
if user.roles.contains(&name) {
60+
session_refresh_users.insert(user.username().to_string());
61+
}
62+
}
63+
64+
for username in session_refresh_users {
65+
mut_sessions().remove_user(&username);
66+
}
67+
4368
Ok(HttpResponse::Ok().finish())
4469
}

src/handlers/http/modal/ingest_server.rs

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -199,11 +199,21 @@ impl IngestServer {
199199
.wrap(DisAllowRootUser),
200200
)
201201
.service(
202-
web::resource("/{username}/role/sync")
203-
// PUT /user/{username}/roles => Put roles for user
202+
web::resource("/{username}/role/sync/add")
203+
// PATCH /user/{username}/role/sync/add => Add roles to a user
204204
.route(
205-
web::put()
206-
.to(ingestor_rbac::put_role)
205+
web::patch()
206+
.to(ingestor_rbac::add_roles_to_user)
207+
.authorize(Action::PutUserRoles)
208+
.wrap(DisAllowRootUser),
209+
),
210+
)
211+
.service(
212+
web::resource("/{username}/role/sync/remove")
213+
// PATCH /user/{username}/role/sync/remove => Remove roles from a user
214+
.route(
215+
web::patch()
216+
.to(ingestor_rbac::remove_roles_from_user)
207217
.authorize(Action::PutUserRoles)
208218
.wrap(DisAllowRootUser),
209219
),
@@ -228,7 +238,7 @@ impl IngestServer {
228238
.route(
229239
web::post()
230240
.to(ingest::post_event)
231-
.authorize_for_stream(Action::Ingest),
241+
.authorize_for_resource(Action::Ingest),
232242
)
233243
.wrap(from_fn(
234244
resource_check::check_resource_utilization_middleware,
@@ -246,31 +256,31 @@ impl IngestServer {
246256
.route(
247257
web::put()
248258
.to(ingestor_logstream::put_stream)
249-
.authorize_for_stream(Action::CreateStream),
259+
.authorize_for_resource(Action::CreateStream),
250260
),
251261
)
252262
.service(
253263
// GET "/logstream/{logstream}/info" ==> Get info for given log stream
254264
web::resource("/info").route(
255265
web::get()
256266
.to(logstream::get_stream_info)
257-
.authorize_for_stream(Action::GetStreamInfo),
267+
.authorize_for_resource(Action::GetStreamInfo),
258268
),
259269
)
260270
.service(
261271
// GET "/logstream/{logstream}/stats" ==> Get stats for given log stream
262272
web::resource("/stats").route(
263273
web::get()
264274
.to(logstream::get_stats)
265-
.authorize_for_stream(Action::GetStats),
275+
.authorize_for_resource(Action::GetStats),
266276
),
267277
)
268278
.service(
269279
web::scope("/retention").service(
270280
web::resource("/cleanup").route(
271281
web::post()
272282
.to(ingestor_logstream::retention_cleanup)
273-
.authorize_for_stream(Action::PutRetention),
283+
.authorize_for_resource(Action::PutRetention),
274284
),
275285
),
276286
),

0 commit comments

Comments
 (0)