diff --git a/src/correlation/correlation_utils.rs b/src/correlation/correlation_utils.rs
deleted file mode 100644
index 0975836bf..000000000
--- a/src/correlation/correlation_utils.rs
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Parseable Server (C) 2022 - 2024 Parseable, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License as
- * published by the Free Software Foundation, either version 3 of the
- * License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see .
- *
- */
-
-use itertools::Itertools;
-
-use crate::rbac::{
- map::SessionKey,
- role::{Action, Permission},
- Users,
-};
-
-use super::{CorrelationError, TableConfig};
-
-pub async fn user_auth_for_query(
- session_key: &SessionKey,
- table_configs: &[TableConfig],
-) -> Result<(), CorrelationError> {
- let tables = table_configs.iter().map(|t| &t.table_name).collect_vec();
- let permissions = Users.get_permissions(session_key);
-
- for table_name in tables {
- let mut authorized = false;
-
- // in permission check if user can run query on the stream.
- // also while iterating add any filter tags for this stream
- for permission in permissions.iter() {
- match permission {
- Permission::Stream(Action::All, _) => {
- authorized = true;
- break;
- }
- Permission::StreamWithTag(Action::Query, ref stream, _)
- if stream == table_name || stream == "*" =>
- {
- authorized = true;
- }
- _ => (),
- }
- }
-
- if !authorized {
- return Err(CorrelationError::Unauthorized);
- }
- }
-
- Ok(())
-}
diff --git a/src/correlation/mod.rs b/src/correlation/mod.rs
index a5022407a..9c1f6fb89 100644
--- a/src/correlation/mod.rs
+++ b/src/correlation/mod.rs
@@ -18,9 +18,8 @@
use std::collections::HashSet;
-use actix_web::http::header::ContentType;
+use actix_web::{http::header::ContentType, Error};
use chrono::Utc;
-use correlation_utils::user_auth_for_query;
use datafusion::error::DataFusionError;
use http::StatusCode;
use itertools::Itertools;
@@ -31,12 +30,15 @@ use tokio::sync::RwLock;
use tracing::{error, trace, warn};
use crate::{
- handlers::http::rbac::RBACError, option::CONFIG, query::QUERY_SESSION, rbac::map::SessionKey,
- storage::ObjectStorageError, users::filters::FilterQuery, utils::get_hash,
+ handlers::http::rbac::RBACError,
+ option::CONFIG,
+ query::QUERY_SESSION,
+ rbac::{map::SessionKey, Users},
+ storage::ObjectStorageError,
+ users::filters::FilterQuery,
+ utils::{get_hash, user_auth_for_query},
};
-pub mod correlation_utils;
-
pub static CORRELATIONS: Lazy = Lazy::new(Correlation::default);
#[derive(Debug, Default)]
@@ -77,11 +79,15 @@ impl Correlation {
let correlations = self.0.read().await.iter().cloned().collect_vec();
let mut user_correlations = vec![];
+ let permissions = Users.get_permissions(session_key);
+
for c in correlations {
- if user_auth_for_query(session_key, &c.table_configs)
- .await
- .is_ok()
- {
+ let tables = &c
+ .table_configs
+ .iter()
+ .map(|t| t.table_name.clone())
+ .collect_vec();
+ if user_auth_for_query(&permissions, tables).is_ok() {
user_correlations.push(c);
}
}
@@ -220,7 +226,14 @@ impl CorrelationRequest {
}
// check if user has access to table
- user_auth_for_query(session_key, &self.table_configs).await?;
+ let permissions = Users.get_permissions(session_key);
+ let tables = &self
+ .table_configs
+ .iter()
+ .map(|t| t.table_name.clone())
+ .collect_vec();
+
+ user_auth_for_query(&permissions, tables)?;
// to validate table config, we need to check whether the mentioned fields
// are present in the table or not
@@ -271,6 +284,8 @@ pub enum CorrelationError {
Unauthorized,
#[error("DataFusion Error: {0}")]
DataFusion(#[from] DataFusionError),
+ #[error("{0}")]
+ ActixError(#[from] Error),
}
impl actix_web::ResponseError for CorrelationError {
@@ -283,6 +298,7 @@ impl actix_web::ResponseError for CorrelationError {
Self::AnyhowError(_) => StatusCode::INTERNAL_SERVER_ERROR,
Self::Unauthorized => StatusCode::BAD_REQUEST,
Self::DataFusion(_) => StatusCode::INTERNAL_SERVER_ERROR,
+ Self::ActixError(_) => StatusCode::BAD_REQUEST,
}
}
diff --git a/src/handlers/airplane.rs b/src/handlers/airplane.rs
index e910c7035..899157df9 100644
--- a/src/handlers/airplane.rs
+++ b/src/handlers/airplane.rs
@@ -34,9 +34,7 @@ use tonic::transport::{Identity, Server, ServerTlsConfig};
use tonic_web::GrpcWebLayer;
use crate::handlers::http::cluster::get_ingestor_info;
-use crate::handlers::http::query::{
- authorize_and_set_filter_tags, into_query, update_schema_when_distributed,
-};
+use crate::handlers::http::query::{into_query, update_schema_when_distributed};
use crate::handlers::livetail::cross_origin_config;
use crate::metrics::QUERY_EXECUTE_TIME;
use crate::option::CONFIG;
@@ -46,6 +44,7 @@ use crate::utils::arrow::flight::{
send_to_ingester,
};
use crate::utils::time::TimeRange;
+use crate::utils::user_auth_for_query;
use arrow_flight::{
flight_service_server::FlightService, Action, ActionType, Criteria, Empty, FlightData,
FlightDescriptor, FlightInfo, HandshakeRequest, HandshakeResponse, PutResult, SchemaAsIpc,
@@ -157,12 +156,12 @@ impl FlightService for AirServiceImpl {
.ok_or_else(|| Status::aborted("Malformed SQL Provided, Table Name Not Found"))?
.to_owned();
- update_schema_when_distributed(streams)
+ update_schema_when_distributed(&streams)
.await
.map_err(|err| Status::internal(err.to_string()))?;
// map payload to query
- let mut query = into_query(&ticket, &session_state, time_range)
+ let query = into_query(&ticket, &session_state, time_range)
.await
.map_err(|_| Status::internal("Failed to parse query"))?;
@@ -202,7 +201,7 @@ impl FlightService for AirServiceImpl {
rbac::Response::Authorized => (),
rbac::Response::UnAuthorized => {
return Err(Status::permission_denied(
- "user is not authenticated to access this resource",
+ "user is not authorized to access this resource",
))
}
rbac::Response::ReloadRequired => {
@@ -212,7 +211,7 @@ impl FlightService for AirServiceImpl {
let permissions = Users.get_permissions(&key);
- authorize_and_set_filter_tags(&mut query, permissions, &stream_name).map_err(|_| {
+ user_auth_for_query(&permissions, &streams).map_err(|_| {
Status::permission_denied("User Does not have permission to access this")
})?;
let time = Instant::now();
diff --git a/src/handlers/http/correlation.rs b/src/handlers/http/correlation.rs
index dc5799c01..df0e7a86e 100644
--- a/src/handlers/http/correlation.rs
+++ b/src/handlers/http/correlation.rs
@@ -18,17 +18,17 @@
use actix_web::{web, HttpRequest, Responder};
use bytes::Bytes;
+use itertools::Itertools;
use relative_path::RelativePathBuf;
+use crate::rbac::Users;
+use crate::utils::user_auth_for_query;
use crate::{
option::CONFIG, storage::CORRELATIONS_ROOT_DIRECTORY,
utils::actix::extract_session_key_from_req,
};
-use crate::correlation::{
- correlation_utils::user_auth_for_query, CorrelationConfig, CorrelationError,
- CorrelationRequest, CORRELATIONS,
-};
+use crate::correlation::{CorrelationConfig, CorrelationError, CorrelationRequest, CORRELATIONS};
pub async fn list(req: HttpRequest) -> Result {
let session_key = extract_session_key_from_req(&req)
@@ -52,14 +52,17 @@ pub async fn get(req: HttpRequest) -> Result {
let correlation = CORRELATIONS.get_correlation_by_id(correlation_id).await?;
- if user_auth_for_query(&session_key, &correlation.table_configs)
- .await
- .is_ok()
- {
- Ok(web::Json(correlation))
- } else {
- Err(CorrelationError::Unauthorized)
- }
+ let permissions = Users.get_permissions(&session_key);
+
+ let tables = &correlation
+ .table_configs
+ .iter()
+ .map(|t| t.table_name.clone())
+ .collect_vec();
+
+ user_auth_for_query(&permissions, tables)?;
+
+ Ok(web::Json(correlation))
}
pub async fn post(req: HttpRequest, body: Bytes) -> Result {
@@ -93,7 +96,14 @@ pub async fn modify(req: HttpRequest, body: Bytes) -> Result Result Result {
Ok((format!("log stream {stream_name} deleted"), StatusCode::OK))
}
-pub async fn list(_: HttpRequest) -> impl Responder {
- //list all streams from storage
+pub async fn list(req: HttpRequest) -> Result {
+ let key = extract_session_key_from_req(&req)
+ .map_err(|err| StreamError::Anyhow(anyhow::Error::msg(err.to_string())))?;
+
+ // list all streams from storage
let res = CONFIG
.storage()
.get_object_store()
.list_streams()
.await
- .unwrap();
+ .unwrap()
+ .into_iter()
+ .filter(|logstream| {
+ warn!("logstream-\n{logstream:?}");
+
+ Users.authorize(key.clone(), Action::ListStream, Some(&logstream.name), None)
+ == crate::rbac::Response::Authorized
+ })
+ .collect_vec();
- web::Json(res)
+ Ok(web::Json(res))
}
pub async fn detect_schema(body: Bytes) -> Result {
@@ -130,7 +145,7 @@ pub async fn schema(req: HttpRequest) -> Result {
}
Err(err) => return Err(StreamError::from(err)),
};
- match update_schema_when_distributed(vec![stream_name.clone()]).await {
+ match update_schema_when_distributed(&vec![stream_name.clone()]).await {
Ok(_) => {
let schema = STREAM_INFO.schema(&stream_name)?;
Ok((web::Json(schema), StatusCode::OK))
diff --git a/src/handlers/http/query.rs b/src/handlers/http/query.rs
index 895032d20..7cecb1416 100644
--- a/src/handlers/http/query.rs
+++ b/src/handlers/http/query.rs
@@ -41,13 +41,13 @@ use crate::option::{Mode, CONFIG};
use crate::query::error::ExecuteError;
use crate::query::Query as LogicalQuery;
use crate::query::{TableScanVisitor, QUERY_SESSION};
-use crate::rbac::role::{Action, Permission};
use crate::rbac::Users;
use crate::response::QueryResponse;
use crate::storage::object_storage::commit_schema_to_storage;
use crate::storage::ObjectStorageError;
use crate::utils::actix::extract_session_key_from_req;
use crate::utils::time::{TimeParseError, TimeRange};
+use crate::utils::user_auth_for_query;
use super::modal::utils::logstream_utils::create_stream_and_schema_from_storage;
@@ -85,13 +85,13 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result Result Result) -> Result<(), QueryError> {
+pub async fn update_schema_when_distributed(tables: &Vec) -> Result<(), QueryError> {
if CONFIG.parseable.mode == Mode::Query {
for table in tables {
- if let Ok(new_schema) = fetch_schema(&table).await {
+ if let Ok(new_schema) = fetch_schema(table).await {
// commit schema merges the schema internally and updates the schema in storage.
- commit_schema_to_storage(&table, new_schema.clone()).await?;
+ commit_schema_to_storage(table, new_schema.clone()).await?;
- commit_schema(&table, Arc::new(new_schema))?;
+ commit_schema(table, Arc::new(new_schema))?;
}
}
}
@@ -153,46 +153,6 @@ pub async fn create_streams_for_querier() {
}
}
-pub fn authorize_and_set_filter_tags(
- query: &mut LogicalQuery,
- permissions: Vec,
- table_name: &str,
-) -> Result<(), QueryError> {
- // check authorization of this query if it references physical table;
- let mut authorized = false;
- let mut tags = Vec::new();
-
- // in permission check if user can run query on the stream.
- // also while iterating add any filter tags for this stream
- for permission in permissions {
- match permission {
- Permission::Stream(Action::All, _) => {
- authorized = true;
- break;
- }
- Permission::StreamWithTag(Action::Query, ref stream, tag)
- if stream == table_name || stream == "*" =>
- {
- authorized = true;
- if let Some(tag) = tag {
- tags.push(tag)
- }
- }
- _ => (),
- }
- }
-
- if !authorized {
- return Err(QueryError::Unauthorized);
- }
-
- if !tags.is_empty() {
- query.filter_tag = Some(tags)
- }
-
- Ok(())
-}
-
impl FromRequest for Query {
type Error = actix_web::Error;
type Future = Pin>>>;
diff --git a/src/rbac/mod.rs b/src/rbac/mod.rs
index adee85920..463c6cfe0 100644
--- a/src/rbac/mod.rs
+++ b/src/rbac/mod.rs
@@ -33,6 +33,7 @@ use self::map::SessionKey;
use self::role::{Permission, RoleBuilder};
use self::user::UserType;
+#[derive(PartialEq)]
pub enum Response {
Authorized,
UnAuthorized,
diff --git a/src/rbac/role.rs b/src/rbac/role.rs
index f383345ad..bc1deb58e 100644
--- a/src/rbac/role.rs
+++ b/src/rbac/role.rs
@@ -121,7 +121,6 @@ impl RoleBuilder {
| Action::CreateStream
| Action::DeleteStream
| Action::GetStreamInfo
- | Action::ListStream
| Action::ListCluster
| Action::ListClusterMetrics
| Action::CreateCorrelation
@@ -142,6 +141,7 @@ impl RoleBuilder {
| Action::DeleteFilter
| Action::GetAnalytics => Permission::Unit(action),
Action::Ingest
+ | Action::ListStream
| Action::GetSchema
| Action::DetectSchema
| Action::GetStats
diff --git a/src/storage/mod.rs b/src/storage/mod.rs
index 9f2d1d281..9c6ef9a5c 100644
--- a/src/storage/mod.rs
+++ b/src/storage/mod.rs
@@ -197,7 +197,7 @@ impl Default for ObjectStoreFormat {
}
}
-#[derive(serde::Serialize, PartialEq)]
+#[derive(serde::Serialize, PartialEq, Debug)]
pub struct LogStream {
pub name: String,
}
diff --git a/src/utils/mod.rs b/src/utils/mod.rs
index 2db5567bf..0dc5f9765 100644
--- a/src/utils/mod.rs
+++ b/src/utils/mod.rs
@@ -25,6 +25,7 @@ pub mod uid;
pub mod update;
use crate::handlers::http::rbac::RBACError;
use crate::option::CONFIG;
+use crate::rbac::role::{Action, Permission};
use crate::rbac::Users;
use actix::extract_session_key_from_req;
use actix_web::HttpRequest;
@@ -344,6 +345,40 @@ pub fn get_hash(key: &str) -> String {
result
}
+pub fn user_auth_for_query(
+ permissions: &[Permission],
+ tables: &[String],
+) -> Result<(), actix_web::error::Error> {
+ for table_name in tables {
+ let mut authorized = false;
+
+ // in permission check if user can run query on the stream.
+ // also while iterating add any filter tags for this stream
+ for permission in permissions.iter() {
+ match permission {
+ Permission::Stream(Action::All, _) => {
+ authorized = true;
+ break;
+ }
+ Permission::StreamWithTag(Action::Query, ref stream, _)
+ if stream == table_name || stream == "*" =>
+ {
+ authorized = true;
+ }
+ _ => (),
+ }
+ }
+
+ if !authorized {
+ return Err(actix_web::error::ErrorUnauthorized(format!(
+ "User does not have access to stream- {table_name}"
+ )));
+ }
+ }
+
+ Ok(())
+}
+
#[cfg(test)]
mod tests {
use chrono::DateTime;