Skip to content

Commit b4a2b4b

Browse files
fix: auth flow for query and permission assignment for ListStream (parseablehq#1048)
--------- Co-authored-by: Nikhil Sinha <[email protected]>
1 parent 59cdecc commit b4a2b4b

File tree

10 files changed

+147
-130
lines changed

10 files changed

+147
-130
lines changed

src/correlation/correlation_utils.rs

Lines changed: 0 additions & 62 deletions
This file was deleted.

src/correlation/mod.rs

Lines changed: 27 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,8 @@
1818

1919
use std::collections::HashSet;
2020

21-
use actix_web::http::header::ContentType;
21+
use actix_web::{http::header::ContentType, Error};
2222
use chrono::Utc;
23-
use correlation_utils::user_auth_for_query;
2423
use datafusion::error::DataFusionError;
2524
use http::StatusCode;
2625
use itertools::Itertools;
@@ -31,12 +30,15 @@ use tokio::sync::RwLock;
3130
use tracing::{error, trace, warn};
3231

3332
use crate::{
34-
handlers::http::rbac::RBACError, option::CONFIG, query::QUERY_SESSION, rbac::map::SessionKey,
35-
storage::ObjectStorageError, users::filters::FilterQuery, utils::get_hash,
33+
handlers::http::rbac::RBACError,
34+
option::CONFIG,
35+
query::QUERY_SESSION,
36+
rbac::{map::SessionKey, Users},
37+
storage::ObjectStorageError,
38+
users::filters::FilterQuery,
39+
utils::{get_hash, user_auth_for_query},
3640
};
3741

38-
pub mod correlation_utils;
39-
4042
pub static CORRELATIONS: Lazy<Correlation> = Lazy::new(Correlation::default);
4143

4244
#[derive(Debug, Default)]
@@ -77,11 +79,15 @@ impl Correlation {
7779
let correlations = self.0.read().await.iter().cloned().collect_vec();
7880

7981
let mut user_correlations = vec![];
82+
let permissions = Users.get_permissions(session_key);
83+
8084
for c in correlations {
81-
if user_auth_for_query(session_key, &c.table_configs)
82-
.await
83-
.is_ok()
84-
{
85+
let tables = &c
86+
.table_configs
87+
.iter()
88+
.map(|t| t.table_name.clone())
89+
.collect_vec();
90+
if user_auth_for_query(&permissions, tables).is_ok() {
8591
user_correlations.push(c);
8692
}
8793
}
@@ -220,7 +226,14 @@ impl CorrelationRequest {
220226
}
221227

222228
// check if user has access to table
223-
user_auth_for_query(session_key, &self.table_configs).await?;
229+
let permissions = Users.get_permissions(session_key);
230+
let tables = &self
231+
.table_configs
232+
.iter()
233+
.map(|t| t.table_name.clone())
234+
.collect_vec();
235+
236+
user_auth_for_query(&permissions, tables)?;
224237

225238
// to validate table config, we need to check whether the mentioned fields
226239
// are present in the table or not
@@ -271,6 +284,8 @@ pub enum CorrelationError {
271284
Unauthorized,
272285
#[error("DataFusion Error: {0}")]
273286
DataFusion(#[from] DataFusionError),
287+
#[error("{0}")]
288+
ActixError(#[from] Error),
274289
}
275290

276291
impl actix_web::ResponseError for CorrelationError {
@@ -283,6 +298,7 @@ impl actix_web::ResponseError for CorrelationError {
283298
Self::AnyhowError(_) => StatusCode::INTERNAL_SERVER_ERROR,
284299
Self::Unauthorized => StatusCode::BAD_REQUEST,
285300
Self::DataFusion(_) => StatusCode::INTERNAL_SERVER_ERROR,
301+
Self::ActixError(_) => StatusCode::BAD_REQUEST,
286302
}
287303
}
288304

src/handlers/airplane.rs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,7 @@ use tonic::transport::{Identity, Server, ServerTlsConfig};
3434
use tonic_web::GrpcWebLayer;
3535

3636
use crate::handlers::http::cluster::get_ingestor_info;
37-
use crate::handlers::http::query::{
38-
authorize_and_set_filter_tags, into_query, update_schema_when_distributed,
39-
};
37+
use crate::handlers::http::query::{into_query, update_schema_when_distributed};
4038
use crate::handlers::livetail::cross_origin_config;
4139
use crate::metrics::QUERY_EXECUTE_TIME;
4240
use crate::option::CONFIG;
@@ -46,6 +44,7 @@ use crate::utils::arrow::flight::{
4644
send_to_ingester,
4745
};
4846
use crate::utils::time::TimeRange;
47+
use crate::utils::user_auth_for_query;
4948
use arrow_flight::{
5049
flight_service_server::FlightService, Action, ActionType, Criteria, Empty, FlightData,
5150
FlightDescriptor, FlightInfo, HandshakeRequest, HandshakeResponse, PutResult, SchemaAsIpc,
@@ -162,7 +161,7 @@ impl FlightService for AirServiceImpl {
162161
.map_err(|err| Status::internal(err.to_string()))?;
163162

164163
// map payload to query
165-
let mut query = into_query(&ticket, &session_state, time_range)
164+
let query = into_query(&ticket, &session_state, time_range)
166165
.await
167166
.map_err(|_| Status::internal("Failed to parse query"))?;
168167

@@ -202,7 +201,7 @@ impl FlightService for AirServiceImpl {
202201
rbac::Response::Authorized => (),
203202
rbac::Response::UnAuthorized => {
204203
return Err(Status::permission_denied(
205-
"user is not authenticated to access this resource",
204+
"user is not authorized to access this resource",
206205
))
207206
}
208207
rbac::Response::ReloadRequired => {
@@ -212,7 +211,7 @@ impl FlightService for AirServiceImpl {
212211

213212
let permissions = Users.get_permissions(&key);
214213

215-
authorize_and_set_filter_tags(&mut query, permissions, &streams).map_err(|_| {
214+
authorize_and_set_filter_tags(&mut query, permissions, &stream_name).map_err(|_| {
216215
Status::permission_denied("User Does not have permission to access this")
217216
})?;
218217
let time = Instant::now();

src/handlers/http/correlation.rs

Lines changed: 31 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -18,17 +18,17 @@
1818

1919
use actix_web::{web, HttpRequest, Responder};
2020
use bytes::Bytes;
21+
use itertools::Itertools;
2122
use relative_path::RelativePathBuf;
2223

24+
use crate::rbac::Users;
25+
use crate::utils::user_auth_for_query;
2326
use crate::{
2427
option::CONFIG, storage::CORRELATIONS_ROOT_DIRECTORY,
2528
utils::actix::extract_session_key_from_req,
2629
};
2730

28-
use crate::correlation::{
29-
correlation_utils::user_auth_for_query, CorrelationConfig, CorrelationError,
30-
CorrelationRequest, CORRELATIONS,
31-
};
31+
use crate::correlation::{CorrelationConfig, CorrelationError, CorrelationRequest, CORRELATIONS};
3232

3333
pub async fn list(req: HttpRequest) -> Result<impl Responder, CorrelationError> {
3434
let session_key = extract_session_key_from_req(&req)
@@ -52,14 +52,17 @@ pub async fn get(req: HttpRequest) -> Result<impl Responder, CorrelationError> {
5252

5353
let correlation = CORRELATIONS.get_correlation_by_id(correlation_id).await?;
5454

55-
if user_auth_for_query(&session_key, &correlation.table_configs)
56-
.await
57-
.is_ok()
58-
{
59-
Ok(web::Json(correlation))
60-
} else {
61-
Err(CorrelationError::Unauthorized)
62-
}
55+
let permissions = Users.get_permissions(&session_key);
56+
57+
let tables = &correlation
58+
.table_configs
59+
.iter()
60+
.map(|t| t.table_name.clone())
61+
.collect_vec();
62+
63+
user_auth_for_query(&permissions, tables)?;
64+
65+
Ok(web::Json(correlation))
6366
}
6467

6568
pub async fn post(req: HttpRequest, body: Bytes) -> Result<impl Responder, CorrelationError> {
@@ -93,7 +96,14 @@ pub async fn modify(req: HttpRequest, body: Bytes) -> Result<impl Responder, Cor
9396

9497
// validate whether user has access to this correlation object or not
9598
let correlation = CORRELATIONS.get_correlation_by_id(correlation_id).await?;
96-
user_auth_for_query(&session_key, &correlation.table_configs).await?;
99+
let permissions = Users.get_permissions(&session_key);
100+
let tables = &correlation
101+
.table_configs
102+
.iter()
103+
.map(|t| t.table_name.clone())
104+
.collect_vec();
105+
106+
user_auth_for_query(&permissions, tables)?;
97107

98108
let correlation_request: CorrelationRequest = serde_json::from_slice(&body)?;
99109
correlation_request.validate(&session_key).await?;
@@ -122,7 +132,14 @@ pub async fn delete(req: HttpRequest) -> Result<impl Responder, CorrelationError
122132
let correlation = CORRELATIONS.get_correlation_by_id(correlation_id).await?;
123133

124134
// validate user's query auth
125-
user_auth_for_query(&session_key, &correlation.table_configs).await?;
135+
let permissions = Users.get_permissions(&session_key);
136+
let tables = &correlation
137+
.table_configs
138+
.iter()
139+
.map(|t| t.table_name.clone())
140+
.collect_vec();
141+
142+
user_auth_for_query(&permissions, tables)?;
126143

127144
// Delete from disk
128145
let store = CONFIG.storage().get_object_store();

src/handlers/http/logstream.rs

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,12 @@ use crate::hottier::{HotTierManager, StreamHotTier, CURRENT_HOT_TIER_VERSION};
3131
use crate::metadata::{SchemaVersion, STREAM_INFO};
3232
use crate::metrics::{EVENTS_INGESTED_DATE, EVENTS_INGESTED_SIZE_DATE, EVENTS_STORAGE_SIZE_DATE};
3333
use crate::option::{Mode, CONFIG};
34+
use crate::rbac::role::Action;
35+
use crate::rbac::Users;
3436
use crate::stats::{event_labels_date, storage_size_labels_date, Stats};
3537
use crate::storage::StreamType;
3638
use crate::storage::{retention::Retention, StorageDir, StreamInfo};
39+
use crate::utils::actix::extract_session_key_from_req;
3740
use crate::{event, stats};
3841

3942
use crate::{metadata, validator};
@@ -45,6 +48,7 @@ use arrow_schema::{Field, Schema};
4548
use bytes::Bytes;
4649
use chrono::Utc;
4750
use http::{HeaderName, HeaderValue};
51+
use itertools::Itertools;
4852
use serde_json::Value;
4953
use std::collections::HashMap;
5054
use std::fs;
@@ -85,16 +89,27 @@ pub async fn delete(req: HttpRequest) -> Result<impl Responder, StreamError> {
8589
Ok((format!("log stream {stream_name} deleted"), StatusCode::OK))
8690
}
8791

88-
pub async fn list(_: HttpRequest) -> impl Responder {
89-
//list all streams from storage
92+
pub async fn list(req: HttpRequest) -> Result<impl Responder, StreamError> {
93+
let key = extract_session_key_from_req(&req)
94+
.map_err(|err| StreamError::Anyhow(anyhow::Error::msg(err.to_string())))?;
95+
96+
// list all streams from storage
9097
let res = CONFIG
9198
.storage()
9299
.get_object_store()
93100
.list_streams()
94101
.await
95-
.unwrap();
102+
.unwrap()
103+
.into_iter()
104+
.filter(|logstream| {
105+
warn!("logstream-\n{logstream:?}");
106+
107+
Users.authorize(key.clone(), Action::ListStream, Some(&logstream.name), None)
108+
== crate::rbac::Response::Authorized
109+
})
110+
.collect_vec();
96111

97-
web::Json(res)
112+
Ok(web::Json(res))
98113
}
99114

100115
pub async fn detect_schema(body: Bytes) -> Result<impl Responder, StreamError> {

0 commit comments

Comments
 (0)