Skip to content

Commit 7de7caf

Browse files
server bug fix (#927)
1. delete hottier should be restricted for internal streams 2. POST /logstream/{logstream} should be restricted for query mode 3. update role sync between querier to ingestors 4. get user role priviledge to reader, writer and editor roles 5. issue: any user having ingestion priviledge can ingest to all streams using /ingest API current state: check stream name in resource path for authorization fix: check the x-p-stream header for the stream name also
1 parent db8b726 commit 7de7caf

File tree

7 files changed

+112
-9
lines changed

7 files changed

+112
-9
lines changed

server/src/handlers/http/cluster/mod.rs

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,11 @@ use crate::handlers::http::cluster::utils::{
2323
};
2424
use crate::handlers::http::ingest::{ingest_internal_stream, PostError};
2525
use crate::handlers::http::logstream::error::StreamError;
26+
use crate::handlers::http::role::RoleError;
2627
use crate::option::CONFIG;
2728

2829
use crate::metrics::prom_utils::Metrics;
30+
use crate::rbac::role::model::DefaultPrivilege;
2931
use crate::rbac::user::User;
3032
use crate::stats::Stats;
3133
use crate::storage::object_storage::ingestor_metadata_path;
@@ -364,6 +366,63 @@ pub async fn sync_password_reset_with_ingestors(username: &String) -> Result<(),
364366
Ok(())
365367
}
366368

369+
// forward the put role request to all ingestors to keep them in sync
370+
pub async fn sync_role_update_with_ingestors(
371+
name: String,
372+
body: Vec<DefaultPrivilege>,
373+
) -> Result<(), RoleError> {
374+
let ingestor_infos = get_ingestor_info().await.map_err(|err| {
375+
log::error!("Fatal: failed to get ingestor info: {:?}", err);
376+
RoleError::Anyhow(err)
377+
})?;
378+
379+
let roles = to_vec(&body).map_err(|err| {
380+
log::error!("Fatal: failed to serialize roles: {:?}", err);
381+
RoleError::SerdeError(err)
382+
})?;
383+
let roles = Bytes::from(roles);
384+
let client = reqwest::Client::new();
385+
386+
for ingestor in ingestor_infos.iter() {
387+
if !utils::check_liveness(&ingestor.domain_name).await {
388+
log::warn!("Ingestor {} is not live", ingestor.domain_name);
389+
continue;
390+
}
391+
let url = format!(
392+
"{}{}/role/{}",
393+
ingestor.domain_name,
394+
base_path_without_preceding_slash(),
395+
name
396+
);
397+
398+
let res = client
399+
.put(url)
400+
.header(header::AUTHORIZATION, &ingestor.token)
401+
.header(header::CONTENT_TYPE, "application/json")
402+
.body(roles.clone())
403+
.send()
404+
.await
405+
.map_err(|err| {
406+
log::error!(
407+
"Fatal: failed to forward request to ingestor: {}\n Error: {:?}",
408+
ingestor.domain_name,
409+
err
410+
);
411+
RoleError::Network(err)
412+
})?;
413+
414+
if !res.status().is_success() {
415+
log::error!(
416+
"failed to forward request to ingestor: {}\nResponse Returned: {:?}",
417+
ingestor.domain_name,
418+
res.text().await
419+
);
420+
}
421+
}
422+
423+
Ok(())
424+
}
425+
367426
pub async fn fetch_daily_stats_from_ingestors(
368427
stream_name: &str,
369428
date: &str,

server/src/handlers/http/ingest.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,12 @@ pub async fn post_event(req: HttpRequest, body: Bytes) -> Result<HttpResponse, P
186186
if !STREAM_INFO.stream_exists(&stream_name) {
187187
return Err(PostError::StreamNotFound(stream_name));
188188
}
189+
190+
if CONFIG.parseable.mode == Mode::Query {
191+
return Err(PostError::Invalid(anyhow::anyhow!(
192+
"Ingestion is not allowed in Query mode"
193+
)));
194+
}
189195
flatten_and_push_logs(req, body, stream_name).await?;
190196
Ok(HttpResponse::Ok().finish())
191197
}

server/src/handlers/http/logstream.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1057,6 +1057,13 @@ pub async fn delete_stream_hot_tier(req: HttpRequest) -> Result<impl Responder,
10571057
return Err(StreamError::HotTierNotEnabled(stream_name));
10581058
}
10591059

1060+
if STREAM_INFO.stream_type(&stream_name).unwrap() == Some(StreamType::Internal.to_string()) {
1061+
return Err(StreamError::Custom {
1062+
msg: "Hot tier can not be deleted for internal stream".to_string(),
1063+
status: StatusCode::BAD_REQUEST,
1064+
});
1065+
}
1066+
10601067
if let Some(hot_tier_manager) = HotTierManager::global() {
10611068
hot_tier_manager.delete_hot_tier(&stream_name).await?;
10621069
}

server/src/handlers/http/middleware.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,16 @@ pub fn auth_stream_context(
190190
action: Action,
191191
) -> Result<rbac::Response, Error> {
192192
let creds = extract_session_key(req);
193-
let stream = req.match_info().get("logstream");
193+
let mut stream = req.match_info().get("logstream");
194+
if stream.is_none() {
195+
if let Some((_, stream_name)) = req
196+
.headers()
197+
.iter()
198+
.find(|&(key, _)| key == STREAM_NAME_HEADER_KEY)
199+
{
200+
stream = Some(stream_name.to_str().unwrap());
201+
}
202+
}
194203
creds.map(|key| Users.authorize(key, action, stream, None))
195204
}
196205

server/src/handlers/http/modal/ingest_server.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,7 @@ impl IngestServer {
185185
.service(Self::analytics_factory())
186186
.service(Server::get_liveness_factory())
187187
.service(Self::get_user_webscope())
188+
.service(Server::get_user_role_webscope())
188189
.service(Server::get_metrics_webscope())
189190
.service(Server::get_readiness_factory()),
190191
)

server/src/handlers/http/role.rs

Lines changed: 26 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,29 +17,38 @@
1717
*/
1818

1919
use actix_web::{http::header::ContentType, web, HttpResponse, Responder};
20+
use bytes::Bytes;
2021
use http::StatusCode;
2122

2223
use crate::{
23-
option::CONFIG,
24+
option::{Mode, CONFIG},
2425
rbac::{
2526
map::{mut_roles, DEFAULT_ROLE},
2627
role::model::DefaultPrivilege,
2728
},
2829
storage::{self, ObjectStorageError, StorageMetadata},
2930
};
3031

32+
use super::cluster::sync_role_update_with_ingestors;
33+
3134
// Handler for PUT /api/v1/role/{name}
3235
// Creates a new role or update existing one
33-
pub async fn put(
34-
name: web::Path<String>,
35-
body: web::Json<Vec<DefaultPrivilege>>,
36-
) -> Result<impl Responder, RoleError> {
36+
pub async fn put(name: web::Path<String>, body: Bytes) -> Result<impl Responder, RoleError> {
3737
let name = name.into_inner();
38-
let privileges = body.into_inner();
38+
let privileges = serde_json::from_slice::<Vec<DefaultPrivilege>>(&body)?;
3939
let mut metadata = get_metadata().await?;
4040
metadata.roles.insert(name.clone(), privileges.clone());
41-
put_metadata(&metadata).await?;
42-
mut_roles().insert(name, privileges);
41+
if CONFIG.parseable.mode == Mode::Ingest {
42+
let _ = storage::put_staging_metadata(&metadata);
43+
mut_roles().insert(name.clone(), privileges.clone());
44+
} else {
45+
put_metadata(&metadata).await?;
46+
mut_roles().insert(name.clone(), privileges.clone());
47+
if CONFIG.parseable.mode == Mode::Query {
48+
sync_role_update_with_ingestors(name.clone(), privileges.clone()).await?;
49+
}
50+
}
51+
4352
Ok(HttpResponse::Ok().finish())
4453
}
4554

@@ -118,13 +127,22 @@ pub enum RoleError {
118127
ObjectStorageError(#[from] ObjectStorageError),
119128
#[error("Cannot perform this operation as role is assigned to an existing user.")]
120129
RoleInUse,
130+
#[error("Error: {0}")]
131+
Anyhow(#[from] anyhow::Error),
132+
#[error("{0}")]
133+
SerdeError(#[from] serde_json::Error),
134+
#[error("Network Error: {0}")]
135+
Network(#[from] reqwest::Error),
121136
}
122137

123138
impl actix_web::ResponseError for RoleError {
124139
fn status_code(&self) -> http::StatusCode {
125140
match self {
126141
Self::ObjectStorageError(_) => StatusCode::INTERNAL_SERVER_ERROR,
127142
Self::RoleInUse => StatusCode::BAD_REQUEST,
143+
Self::Anyhow(_) => StatusCode::INTERNAL_SERVER_ERROR,
144+
Self::SerdeError(_) => StatusCode::BAD_REQUEST,
145+
Self::Network(_) => StatusCode::BAD_GATEWAY,
128146
}
129147
}
130148

server/src/rbac/role.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,7 @@ pub mod model {
234234
Action::GetDashboard,
235235
Action::CreateDashboard,
236236
Action::DeleteDashboard,
237+
Action::GetUserRoles,
237238
],
238239
stream: Some("*".to_string()),
239240
tag: None,
@@ -269,6 +270,7 @@ pub mod model {
269270
Action::ListFilter,
270271
Action::CreateFilter,
271272
Action::DeleteFilter,
273+
Action::GetUserRoles,
272274
],
273275
stream: None,
274276
tag: None,
@@ -294,6 +296,7 @@ pub mod model {
294296
Action::CreateDashboard,
295297
Action::DeleteDashboard,
296298
Action::GetStreamInfo,
299+
Action::GetUserRoles,
297300
],
298301
stream: None,
299302
tag: None,

0 commit comments

Comments
 (0)