Skip to content

Commit fa5d46f

Browse files
committed
refactor: moved query auth function to utils
1 parent f59cc8a commit fa5d46f

File tree

6 files changed

+101
-139
lines changed

6 files changed

+101
-139
lines changed

src/correlation/correlation_utils.rs

Lines changed: 0 additions & 62 deletions
This file was deleted.

src/correlation/mod.rs

Lines changed: 27 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,8 @@
1818

1919
use std::collections::HashSet;
2020

21-
use actix_web::http::header::ContentType;
21+
use actix_web::{http::header::ContentType, Error};
2222
use chrono::Utc;
23-
use correlation_utils::user_auth_for_query;
2423
use datafusion::error::DataFusionError;
2524
use http::StatusCode;
2625
use itertools::Itertools;
@@ -31,12 +30,15 @@ use tokio::sync::RwLock;
3130
use tracing::{error, trace, warn};
3231

3332
use crate::{
34-
handlers::http::rbac::RBACError, option::CONFIG, query::QUERY_SESSION, rbac::map::SessionKey,
35-
storage::ObjectStorageError, users::filters::FilterQuery, utils::get_hash,
33+
handlers::http::rbac::RBACError,
34+
option::CONFIG,
35+
query::QUERY_SESSION,
36+
rbac::{map::SessionKey, Users},
37+
storage::ObjectStorageError,
38+
users::filters::FilterQuery,
39+
utils::{get_hash, user_auth_for_query},
3640
};
3741

38-
pub mod correlation_utils;
39-
4042
pub static CORRELATIONS: Lazy<Correlation> = Lazy::new(Correlation::default);
4143

4244
#[derive(Debug, Default)]
@@ -77,11 +79,15 @@ impl Correlation {
7779
let correlations = self.0.read().await.iter().cloned().collect_vec();
7880

7981
let mut user_correlations = vec![];
82+
let permissions = Users.get_permissions(session_key);
83+
8084
for c in correlations {
81-
if user_auth_for_query(session_key, &c.table_configs)
82-
.await
83-
.is_ok()
84-
{
85+
let tables = &c
86+
.table_configs
87+
.iter()
88+
.map(|t| t.table_name.clone())
89+
.collect_vec();
90+
if user_auth_for_query(&permissions, tables).is_ok() {
8591
user_correlations.push(c);
8692
}
8793
}
@@ -220,7 +226,14 @@ impl CorrelationRequest {
220226
}
221227

222228
// check if user has access to table
223-
user_auth_for_query(session_key, &self.table_configs).await?;
229+
let permissions = Users.get_permissions(session_key);
230+
let tables = &self
231+
.table_configs
232+
.iter()
233+
.map(|t| t.table_name.clone())
234+
.collect_vec();
235+
236+
user_auth_for_query(&permissions, tables)?;
224237

225238
// to validate table config, we need to check whether the mentioned fields
226239
// are present in the table or not
@@ -271,6 +284,8 @@ pub enum CorrelationError {
271284
Unauthorized,
272285
#[error("DataFusion Error: {0}")]
273286
DataFusion(#[from] DataFusionError),
287+
#[error("{0}")]
288+
ActixError(#[from] Error),
274289
}
275290

276291
impl actix_web::ResponseError for CorrelationError {
@@ -283,6 +298,7 @@ impl actix_web::ResponseError for CorrelationError {
283298
Self::AnyhowError(_) => StatusCode::INTERNAL_SERVER_ERROR,
284299
Self::Unauthorized => StatusCode::BAD_REQUEST,
285300
Self::DataFusion(_) => StatusCode::INTERNAL_SERVER_ERROR,
301+
Self::ActixError(_) => StatusCode::BAD_REQUEST,
286302
}
287303
}
288304

src/handlers/airplane.rs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,7 @@ use tonic::transport::{Identity, Server, ServerTlsConfig};
3434
use tonic_web::GrpcWebLayer;
3535

3636
use crate::handlers::http::cluster::get_ingestor_info;
37-
use crate::handlers::http::query::{
38-
authorize_and_set_filter_tags, into_query, update_schema_when_distributed,
39-
};
37+
use crate::handlers::http::query::{into_query, update_schema_when_distributed};
4038
use crate::handlers::livetail::cross_origin_config;
4139
use crate::metrics::QUERY_EXECUTE_TIME;
4240
use crate::option::CONFIG;
@@ -46,6 +44,7 @@ use crate::utils::arrow::flight::{
4644
send_to_ingester,
4745
};
4846
use crate::utils::time::TimeRange;
47+
use crate::utils::user_auth_for_query;
4948
use arrow_flight::{
5049
flight_service_server::FlightService, Action, ActionType, Criteria, Empty, FlightData,
5150
FlightDescriptor, FlightInfo, HandshakeRequest, HandshakeResponse, PutResult, SchemaAsIpc,
@@ -162,7 +161,7 @@ impl FlightService for AirServiceImpl {
162161
.map_err(|err| Status::internal(err.to_string()))?;
163162

164163
// map payload to query
165-
let mut query = into_query(&ticket, &session_state, time_range)
164+
let query = into_query(&ticket, &session_state, time_range)
166165
.await
167166
.map_err(|_| Status::internal("Failed to parse query"))?;
168167

@@ -202,7 +201,7 @@ impl FlightService for AirServiceImpl {
202201
rbac::Response::Authorized => (),
203202
rbac::Response::UnAuthorized => {
204203
return Err(Status::permission_denied(
205-
"user is not authenticated to access this resource",
204+
"user is not authorized to access this resource",
206205
))
207206
}
208207
rbac::Response::ReloadRequired => {
@@ -212,7 +211,7 @@ impl FlightService for AirServiceImpl {
212211

213212
let permissions = Users.get_permissions(&key);
214213

215-
authorize_and_set_filter_tags(&mut query, permissions, &streams).map_err(|_| {
214+
user_auth_for_query(&permissions, &streams).map_err(|_| {
216215
Status::permission_denied("User Does not have permission to access this")
217216
})?;
218217
let time = Instant::now();

src/handlers/http/correlation.rs

Lines changed: 31 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -18,17 +18,17 @@
1818

1919
use actix_web::{web, HttpRequest, Responder};
2020
use bytes::Bytes;
21+
use itertools::Itertools;
2122
use relative_path::RelativePathBuf;
2223

24+
use crate::rbac::Users;
25+
use crate::utils::user_auth_for_query;
2326
use crate::{
2427
option::CONFIG, storage::CORRELATIONS_ROOT_DIRECTORY,
2528
utils::actix::extract_session_key_from_req,
2629
};
2730

28-
use crate::correlation::{
29-
correlation_utils::user_auth_for_query, CorrelationConfig, CorrelationError,
30-
CorrelationRequest, CORRELATIONS,
31-
};
31+
use crate::correlation::{CorrelationConfig, CorrelationError, CorrelationRequest, CORRELATIONS};
3232

3333
pub async fn list(req: HttpRequest) -> Result<impl Responder, CorrelationError> {
3434
let session_key = extract_session_key_from_req(&req)
@@ -52,14 +52,17 @@ pub async fn get(req: HttpRequest) -> Result<impl Responder, CorrelationError> {
5252

5353
let correlation = CORRELATIONS.get_correlation_by_id(correlation_id).await?;
5454

55-
if user_auth_for_query(&session_key, &correlation.table_configs)
56-
.await
57-
.is_ok()
58-
{
59-
Ok(web::Json(correlation))
60-
} else {
61-
Err(CorrelationError::Unauthorized)
62-
}
55+
let permissions = Users.get_permissions(&session_key);
56+
57+
let tables = &correlation
58+
.table_configs
59+
.iter()
60+
.map(|t| t.table_name.clone())
61+
.collect_vec();
62+
63+
user_auth_for_query(&permissions, tables)?;
64+
65+
Ok(web::Json(correlation))
6366
}
6467

6568
pub async fn post(req: HttpRequest, body: Bytes) -> Result<impl Responder, CorrelationError> {
@@ -93,7 +96,14 @@ pub async fn modify(req: HttpRequest, body: Bytes) -> Result<impl Responder, Cor
9396

9497
// validate whether user has access to this correlation object or not
9598
let correlation = CORRELATIONS.get_correlation_by_id(correlation_id).await?;
96-
user_auth_for_query(&session_key, &correlation.table_configs).await?;
99+
let permissions = Users.get_permissions(&session_key);
100+
let tables = &correlation
101+
.table_configs
102+
.iter()
103+
.map(|t| t.table_name.clone())
104+
.collect_vec();
105+
106+
user_auth_for_query(&permissions, tables)?;
97107

98108
let correlation_request: CorrelationRequest = serde_json::from_slice(&body)?;
99109
correlation_request.validate(&session_key).await?;
@@ -122,7 +132,14 @@ pub async fn delete(req: HttpRequest) -> Result<impl Responder, CorrelationError
122132
let correlation = CORRELATIONS.get_correlation_by_id(correlation_id).await?;
123133

124134
// validate user's query auth
125-
user_auth_for_query(&session_key, &correlation.table_configs).await?;
135+
let permissions = Users.get_permissions(&session_key);
136+
let tables = &correlation
137+
.table_configs
138+
.iter()
139+
.map(|t| t.table_name.clone())
140+
.collect_vec();
141+
142+
user_auth_for_query(&permissions, tables)?;
126143

127144
// Delete from disk
128145
let store = CONFIG.storage().get_object_store();

src/handlers/http/query.rs

Lines changed: 3 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -41,13 +41,13 @@ use crate::option::{Mode, CONFIG};
4141
use crate::query::error::ExecuteError;
4242
use crate::query::Query as LogicalQuery;
4343
use crate::query::{TableScanVisitor, QUERY_SESSION};
44-
use crate::rbac::role::{Action, Permission};
4544
use crate::rbac::Users;
4645
use crate::response::QueryResponse;
4746
use crate::storage::object_storage::commit_schema_to_storage;
4847
use crate::storage::ObjectStorageError;
4948
use crate::utils::actix::extract_session_key_from_req;
5049
use crate::utils::time::{TimeParseError, TimeRange};
50+
use crate::utils::user_auth_for_query;
5151

5252
use super::modal::utils::logstream_utils::create_stream_and_schema_from_storage;
5353

@@ -91,7 +91,7 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Respon
9191

9292
let tables = visitor.into_inner();
9393
update_schema_when_distributed(&tables).await?;
94-
let mut query: LogicalQuery = into_query(&query_request, &session_state, time_range).await?;
94+
let query: LogicalQuery = into_query(&query_request, &session_state, time_range).await?;
9595

9696
let creds = extract_session_key_from_req(&req)?;
9797
let permissions = Users.get_permissions(&creds);
@@ -100,7 +100,7 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Respon
100100
.first_table_name()
101101
.ok_or_else(|| QueryError::MalformedQuery("No table name found in query"))?;
102102

103-
authorize_and_set_filter_tags(&mut query, permissions, &tables)?;
103+
user_auth_for_query(&permissions, &tables)?;
104104

105105
let time = Instant::now();
106106
let (records, fields) = query.execute(table_name.clone()).await?;
@@ -153,49 +153,6 @@ pub async fn create_streams_for_querier() {
153153
}
154154
}
155155

156-
pub fn authorize_and_set_filter_tags(
157-
query: &mut LogicalQuery,
158-
permissions: Vec<Permission>,
159-
tables: &[String],
160-
) -> Result<(), QueryError> {
161-
for table_name in tables.iter() {
162-
// check authorization of this query if it references physical table;
163-
let mut authorized = false;
164-
165-
let mut tags = Vec::new();
166-
167-
// in permission check if user can run query on the stream.
168-
// also while iterating add any filter tags for this stream
169-
for permission in &permissions {
170-
match permission {
171-
Permission::Stream(Action::All, _) => {
172-
authorized = true;
173-
break;
174-
}
175-
Permission::StreamWithTag(Action::Query, ref stream, tag)
176-
if stream == table_name || stream == "*" =>
177-
{
178-
authorized = true;
179-
if let Some(tag) = tag {
180-
tags.push(tag.clone())
181-
}
182-
}
183-
_ => (),
184-
}
185-
}
186-
187-
if !authorized {
188-
return Err(QueryError::Unauthorized);
189-
}
190-
191-
if !tags.is_empty() {
192-
query.filter_tag = Some(tags)
193-
}
194-
}
195-
196-
Ok(())
197-
}
198-
199156
impl FromRequest for Query {
200157
type Error = actix_web::Error;
201158
type Future = Pin<Box<dyn Future<Output = Result<Self, Self::Error>>>>;

0 commit comments

Comments
 (0)