Skip to content

Commit d18a8d9

Browse files
committed
BugFix: Fixed query auth
Auth flow for query and permission assignment for `ListStream` changed
1 parent 503eaa0 commit d18a8d9

File tree

7 files changed

+64
-41
lines changed

7 files changed

+64
-41
lines changed

src/cli.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -529,7 +529,9 @@ impl FromArgMatches for Cli {
529529
self.kafka_host = m.get_one::<String>(Self::KAFKA_HOST).cloned();
530530
self.kafka_group = m.get_one::<String>(Self::KAFKA_GROUP).cloned();
531531
self.kafka_client_id = m.get_one::<String>(Self::KAFKA_CLIENT_ID).cloned();
532-
self.kafka_security_protocol = m.get_one::<SslProtocol>(Self::KAFKA_SECURITY_PROTOCOL).cloned();
532+
self.kafka_security_protocol = m
533+
.get_one::<SslProtocol>(Self::KAFKA_SECURITY_PROTOCOL)
534+
.cloned();
533535
self.kafka_partitions = m.get_one::<String>(Self::KAFKA_PARTITIONS).cloned();
534536

535537
self.tls_cert_path = m.get_one::<PathBuf>(Self::TLS_CERT).cloned();

src/handlers/airplane.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ impl FlightService for AirServiceImpl {
157157
.ok_or_else(|| Status::aborted("Malformed SQL Provided, Table Name Not Found"))?
158158
.to_owned();
159159

160-
update_schema_when_distributed(streams)
160+
update_schema_when_distributed(&streams)
161161
.await
162162
.map_err(|err| Status::internal(err.to_string()))?;
163163

@@ -212,7 +212,7 @@ impl FlightService for AirServiceImpl {
212212

213213
let permissions = Users.get_permissions(&key);
214214

215-
authorize_and_set_filter_tags(&mut query, permissions, &stream_name).map_err(|_| {
215+
authorize_and_set_filter_tags(&mut query, permissions, &streams).map_err(|_| {
216216
Status::permission_denied("User Does not have permission to access this")
217217
})?;
218218
let time = Instant::now();

src/handlers/http/logstream.rs

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,12 @@ use crate::hottier::{HotTierManager, StreamHotTier, CURRENT_HOT_TIER_VERSION};
3232
use crate::metadata::STREAM_INFO;
3333
use crate::metrics::{EVENTS_INGESTED_DATE, EVENTS_INGESTED_SIZE_DATE, EVENTS_STORAGE_SIZE_DATE};
3434
use crate::option::{Mode, CONFIG};
35+
use crate::rbac::role::Action;
36+
use crate::rbac::Users;
3537
use crate::stats::{event_labels_date, storage_size_labels_date, Stats};
3638
use crate::storage::StreamType;
3739
use crate::storage::{retention::Retention, StorageDir, StreamInfo};
40+
use crate::utils::actix::extract_session_key_from_req;
3841
use crate::{event, stats};
3942

4043
use crate::{metadata, validator};
@@ -46,6 +49,7 @@ use arrow_schema::{Field, Schema};
4649
use bytes::Bytes;
4750
use chrono::Utc;
4851
use http::{HeaderName, HeaderValue};
52+
use itertools::Itertools;
4953
use serde_json::Value;
5054
use std::collections::HashMap;
5155
use std::fs;
@@ -85,16 +89,27 @@ pub async fn delete(req: HttpRequest) -> Result<impl Responder, StreamError> {
8589
Ok((format!("log stream {stream_name} deleted"), StatusCode::OK))
8690
}
8791

88-
pub async fn list(_: HttpRequest) -> impl Responder {
89-
//list all streams from storage
92+
pub async fn list(req: HttpRequest) -> Result<impl Responder, StreamError> {
93+
let key = extract_session_key_from_req(&req)
94+
.map_err(|err| StreamError::Anyhow(anyhow::Error::msg(err.to_string())))?;
95+
96+
// list all streams from storage
9097
let res = CONFIG
9198
.storage()
9299
.get_object_store()
93100
.list_streams()
94101
.await
95-
.unwrap();
102+
.unwrap()
103+
.into_iter()
104+
.filter(|logstream| {
105+
warn!("logstream-\n{logstream:?}");
106+
107+
Users.authorize(key.clone(), Action::ListStream, Some(&logstream.name), None)
108+
== crate::rbac::Response::Authorized
109+
})
110+
.collect_vec();
96111

97-
web::Json(res)
112+
Ok(web::Json(res))
98113
}
99114

100115
pub async fn detect_schema(body: Bytes) -> Result<impl Responder, StreamError> {
@@ -129,7 +144,7 @@ pub async fn schema(req: HttpRequest) -> Result<impl Responder, StreamError> {
129144
}
130145
Err(err) => return Err(StreamError::from(err)),
131146
};
132-
match update_schema_when_distributed(vec![stream_name.clone()]).await {
147+
match update_schema_when_distributed(&vec![stream_name.clone()]).await {
133148
Ok(_) => {
134149
let schema = STREAM_INFO.schema(&stream_name)?;
135150
Ok((web::Json(schema), StatusCode::OK))

src/handlers/http/query.rs

Lines changed: 36 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ use std::collections::HashMap;
2929
use std::pin::Pin;
3030
use std::sync::Arc;
3131
use std::time::Instant;
32-
use tracing::error;
32+
use tracing::{error, trace};
3333

3434
use crate::event::error::EventError;
3535
use crate::handlers::http::fetch_schema;
@@ -90,7 +90,7 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Respon
9090
let _ = raw_logical_plan.visit(&mut visitor);
9191

9292
let tables = visitor.into_inner();
93-
update_schema_when_distributed(tables).await?;
93+
update_schema_when_distributed(&tables).await?;
9494
let mut query: LogicalQuery = into_query(&query_request, &session_state, time_range).await?;
9595

9696
let creds = extract_session_key_from_req(&req)?;
@@ -100,7 +100,7 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Respon
100100
.first_table_name()
101101
.ok_or_else(|| QueryError::MalformedQuery("No table name found in query"))?;
102102

103-
authorize_and_set_filter_tags(&mut query, permissions, &table_name)?;
103+
authorize_and_set_filter_tags(&mut query, permissions, &tables)?;
104104

105105
let time = Instant::now();
106106
let (records, fields) = query.execute(table_name.clone()).await?;
@@ -122,14 +122,14 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Respon
122122
Ok(response)
123123
}
124124

125-
pub async fn update_schema_when_distributed(tables: Vec<String>) -> Result<(), QueryError> {
125+
pub async fn update_schema_when_distributed(tables: &Vec<String>) -> Result<(), QueryError> {
126126
if CONFIG.parseable.mode == Mode::Query {
127127
for table in tables {
128-
if let Ok(new_schema) = fetch_schema(&table).await {
128+
if let Ok(new_schema) = fetch_schema(table).await {
129129
// commit schema merges the schema internally and updates the schema in storage.
130-
commit_schema_to_storage(&table, new_schema.clone()).await?;
130+
commit_schema_to_storage(table, new_schema.clone()).await?;
131131

132-
commit_schema(&table, Arc::new(new_schema))?;
132+
commit_schema(table, Arc::new(new_schema))?;
133133
}
134134
}
135135
}
@@ -153,41 +153,46 @@ pub async fn create_streams_for_querier() {
153153
}
154154
}
155155

156+
// check auth for each table in the tables vector
156157
pub fn authorize_and_set_filter_tags(
157158
query: &mut LogicalQuery,
158159
permissions: Vec<Permission>,
159-
table_name: &str,
160+
tables: &Vec<String>,
160161
) -> Result<(), QueryError> {
161162
// check authorization of this query if it references physical table;
162163
let mut authorized = false;
163-
let mut tags = Vec::new();
164-
165-
// in permission check if user can run query on the stream.
166-
// also while iterating add any filter tags for this stream
167-
for permission in permissions {
168-
match permission {
169-
Permission::Stream(Action::All, _) => {
170-
authorized = true;
171-
break;
172-
}
173-
Permission::StreamWithTag(Action::Query, ref stream, tag)
174-
if stream == table_name || stream == "*" =>
175-
{
176-
authorized = true;
177-
if let Some(tag) = tag {
178-
tags.push(tag)
164+
165+
trace!("table names in auth- {tables:?}");
166+
for table_name in tables.iter() {
167+
let mut tags = Vec::new();
168+
169+
// in permission check if user can run query on the stream.
170+
// also while iterating add any filter tags for this stream
171+
for permission in &permissions {
172+
match permission {
173+
Permission::Stream(Action::All, _) => {
174+
authorized = true;
175+
break;
179176
}
177+
Permission::StreamWithTag(Action::Query, ref stream, tag)
178+
if stream == table_name || stream == "*" =>
179+
{
180+
authorized = true;
181+
if let Some(tag) = tag {
182+
tags.push(tag.clone())
183+
}
184+
}
185+
_ => (),
180186
}
181-
_ => (),
182187
}
183-
}
184188

185-
if !authorized {
186-
return Err(QueryError::Unauthorized);
187-
}
189+
if !authorized {
190+
return Err(QueryError::Unauthorized);
191+
}
188192

189-
if !tags.is_empty() {
190-
query.filter_tag = Some(tags)
193+
if !tags.is_empty() {
194+
query.filter_tag = Some(tags)
195+
}
191196
}
192197

193198
Ok(())

src/rbac/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ use self::map::SessionKey;
3333
use self::role::{Permission, RoleBuilder};
3434
use self::user::UserType;
3535

36+
#[derive(PartialEq)]
3637
pub enum Response {
3738
Authorized,
3839
UnAuthorized,

src/rbac/role.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,6 @@ impl RoleBuilder {
117117
| Action::CreateStream
118118
| Action::DeleteStream
119119
| Action::GetStreamInfo
120-
| Action::ListStream
121120
| Action::ListCluster
122121
| Action::ListClusterMetrics
123122
| Action::Deleteingestor
@@ -134,6 +133,7 @@ impl RoleBuilder {
134133
| Action::DeleteFilter
135134
| Action::GetAnalytics => Permission::Unit(action),
136135
Action::Ingest
136+
| Action::ListStream
137137
| Action::GetSchema
138138
| Action::DetectSchema
139139
| Action::GetStats

src/storage/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,7 @@ impl ObjectStoreFormat {
202202
}
203203
}
204204

205-
#[derive(serde::Serialize, PartialEq)]
205+
#[derive(serde::Serialize, PartialEq, Debug)]
206206
pub struct LogStream {
207207
pub name: String,
208208
}

0 commit comments

Comments
 (0)