Skip to content

Commit be1a48b

Browse files
committed
fixes for quest OSS
1 parent 9802cbf commit be1a48b

File tree

9 files changed

+74
-53
lines changed

9 files changed

+74
-53
lines changed

src/handlers/http/correlation.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ pub async fn delete(
108108
.map_err(|err| CorrelationError::AnyhowError(Error::msg(err.to_string())))?;
109109

110110
CORRELATIONS
111-
.delete(&correlation_id, &user_id, &Some(tenant_id))
111+
.delete(&correlation_id, &user_id, &tenant_id)
112112
.await?;
113113

114114
Ok(HttpResponse::Ok().finish())

src/handlers/http/middleware.rs

Lines changed: 24 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ use crate::{
3434
STREAM_NAME_HEADER_KEY, http::modal::OIDC_CLIENT,
3535
},
3636
option::Mode,
37-
parseable::PARSEABLE,
37+
parseable::{DEFAULT_TENANT, PARSEABLE},
3838
rbac::{
3939
EXPIRY_DURATION,
4040
map::{SessionKey, mut_sessions, mut_users, sessions, users},
@@ -167,10 +167,13 @@ where
167167
// append tenant id if present
168168
let user_and_tenant_id = match get_user_and_tenant_from_request(req.request()) {
169169
Ok((uid, tid)) => {
170-
req.headers_mut().insert(
171-
HeaderName::from_static("tenant"),
172-
HeaderValue::from_str(&tid).unwrap(),
173-
);
170+
if tid.is_some() {
171+
req.headers_mut().insert(
172+
HeaderName::from_static("tenant"),
173+
HeaderValue::from_str(&tid.as_ref().unwrap()).unwrap(),
174+
);
175+
}
176+
174177
Ok((uid, tid))
175178
}
176179
Err(e) => Err(e),
@@ -197,7 +200,8 @@ where
197200
&& let Ok((userid, tenant_id)) = user_and_tenant_id
198201
{
199202
let bearer_to_refresh = {
200-
if let Some(users) = users().get(&tenant_id)
203+
if let Some(users) =
204+
users().get(tenant_id.as_ref().map_or(DEFAULT_TENANT, |v| v))
201205
&& let Some(user) = users.get(&userid)
202206
{
203207
match &user.ty {
@@ -244,7 +248,8 @@ where
244248

245249
let user_roles = {
246250
let mut users_guard = mut_users();
247-
if let Some(users) = users_guard.get_mut(&tenant_id)
251+
if let Some(users) = users_guard
252+
.get_mut(tenant_id.as_ref().map_or(DEFAULT_TENANT, |v| v))
248253
&& let Some(user) = users.get_mut(&userid)
249254
{
250255
if let user::UserType::OAuth(oauth) = &mut user.ty {
@@ -262,18 +267,25 @@ where
262267
userid.clone(),
263268
key.clone(),
264269
Utc::now() + expires_in,
265-
roles_to_permission(user_roles, &tenant_id),
266-
&Some(tenant_id),
270+
roles_to_permission(
271+
user_roles,
272+
tenant_id.as_ref().map_or(DEFAULT_TENANT, |v| v),
273+
),
274+
&tenant_id,
267275
);
268-
} else if let Some(users) = users().get(&tenant_id)
276+
} else if let Some(users) =
277+
users().get(tenant_id.as_ref().map_or(DEFAULT_TENANT, |v| v))
269278
&& let Some(user) = users.get(&userid)
270279
{
271280
mut_sessions().track_new(
272281
userid.clone(),
273282
key.clone(),
274283
Utc::now() + EXPIRY_DURATION,
275-
roles_to_permission(user.roles(), &tenant_id),
276-
&Some(tenant_id),
284+
roles_to_permission(
285+
user.roles(),
286+
tenant_id.as_ref().map_or(DEFAULT_TENANT, |v| v),
287+
),
288+
&tenant_id,
277289
);
278290
}
279291
}

src/handlers/http/users/dashboards.rs

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ pub async fn create_dashboard(
100100
let user_id = get_hash(&user_id);
101101

102102
DASHBOARDS
103-
.create(&user_id, &mut dashboard, &Some(tenant_id))
103+
.create(&user_id, &mut dashboard, &tenant_id)
104104
.await?;
105105
Ok((web::Json(dashboard), StatusCode::OK))
106106
}
@@ -114,9 +114,8 @@ pub async fn update_dashboard(
114114
let user_id = get_hash(&user_id);
115115
let dashboard_id = validate_dashboard_id(dashboard_id.into_inner())?;
116116
let is_admin = is_admin(&req).map_err(|e| DashboardError::Custom(e.to_string()))?;
117-
let tenant_id = &Some(tenant_id);
118117
let mut existing_dashboard = DASHBOARDS
119-
.get_dashboard_by_user(dashboard_id, &user_id, is_admin, tenant_id)
118+
.get_dashboard_by_user(dashboard_id, &user_id, is_admin, &tenant_id)
120119
.await
121120
.ok_or(DashboardError::Metadata(
122121
"Dashboard does not exist or user is not authorized",
@@ -189,7 +188,7 @@ pub async fn update_dashboard(
189188
};
190189

191190
DASHBOARDS
192-
.update(&user_id, dashboard_id, &mut final_dashboard, tenant_id)
191+
.update(&user_id, dashboard_id, &mut final_dashboard, &tenant_id)
193192
.await?;
194193

195194
Ok((web::Json(final_dashboard), StatusCode::OK))
@@ -206,7 +205,7 @@ pub async fn delete_dashboard(
206205
let dashboard_id = validate_dashboard_id(dashboard_id.into_inner())?;
207206

208207
DASHBOARDS
209-
.delete_dashboard(&user_id, dashboard_id, is_admin, &Some(tenant_id))
208+
.delete_dashboard(&user_id, dashboard_id, is_admin, &tenant_id)
210209
.await?;
211210

212211
Ok(HttpResponse::Ok().finish())
@@ -223,12 +222,11 @@ pub async fn add_tile(
223222

224223
let (user_id, tenant_id) = get_user_and_tenant_from_request(&req)?;
225224
let user_id = get_hash(&user_id);
226-
let tenant_id = &Some(tenant_id);
227225
let dashboard_id = validate_dashboard_id(dashboard_id.into_inner())?;
228226
let is_admin = is_admin(&req).map_err(|e| DashboardError::Custom(e.to_string()))?;
229227

230228
let mut dashboard = DASHBOARDS
231-
.get_dashboard_by_user(dashboard_id, &user_id, is_admin, tenant_id)
229+
.get_dashboard_by_user(dashboard_id, &user_id, is_admin, &tenant_id)
232230
.await
233231
.ok_or(DashboardError::Unauthorized)?;
234232

@@ -241,7 +239,7 @@ pub async fn add_tile(
241239
tiles.push(tile);
242240

243241
DASHBOARDS
244-
.update(&user_id, dashboard_id, &mut dashboard, tenant_id)
242+
.update(&user_id, dashboard_id, &mut dashboard, &tenant_id)
245243
.await?;
246244

247245
Ok((web::Json(dashboard), StatusCode::OK))

src/handlers/http/users/filters.rs

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ pub async fn get(
5050
let filter_id = filter_id.into_inner();
5151
let is_admin = is_admin(&req).map_err(|e| FiltersError::Custom(e.to_string()))?;
5252
if let Some(filter) = FILTERS
53-
.get_filter(&filter_id, &get_hash(&user_id), is_admin, &Some(tenant_id))
53+
.get_filter(&filter_id, &get_hash(&user_id), is_admin, &tenant_id)
5454
.await
5555
{
5656
return Ok((web::Json(filter), StatusCode::OK));
@@ -71,9 +71,8 @@ pub async fn post(
7171
filter.filter_id = Some(filter_id.clone());
7272
filter.user_id = Some(user_id.clone());
7373
filter.version = Some(CURRENT_FILTER_VERSION.to_string());
74-
let tenant_id = &Some(tenant_id);
75-
PARSEABLE.metastore.put_filter(&filter, tenant_id).await?;
76-
FILTERS.update(&filter, tenant_id).await;
74+
PARSEABLE.metastore.put_filter(&filter, &tenant_id).await?;
75+
FILTERS.update(&filter, &tenant_id).await;
7776

7877
Ok((web::Json(filter), StatusCode::OK))
7978
}
@@ -88,9 +87,8 @@ pub async fn update(
8887
let filter_id = filter_id.into_inner();
8988
let is_admin = is_admin(&req).map_err(|e| FiltersError::Custom(e.to_string()))?;
9089

91-
let tenant_id = &Some(tenant_id);
9290
if FILTERS
93-
.get_filter(&filter_id, &user_id, is_admin, tenant_id)
91+
.get_filter(&filter_id, &user_id, is_admin, &tenant_id)
9492
.await
9593
.is_none()
9694
{
@@ -102,8 +100,8 @@ pub async fn update(
102100
filter.user_id = Some(user_id.clone());
103101
filter.version = Some(CURRENT_FILTER_VERSION.to_string());
104102

105-
PARSEABLE.metastore.put_filter(&filter, tenant_id).await?;
106-
FILTERS.update(&filter, tenant_id).await;
103+
PARSEABLE.metastore.put_filter(&filter, &tenant_id).await?;
104+
FILTERS.update(&filter, &tenant_id).await;
107105

108106
Ok((web::Json(filter), StatusCode::OK))
109107
}
@@ -115,20 +113,19 @@ pub async fn delete(
115113
let (mut user_id, tenant_id) = get_user_and_tenant_from_request(&req)?;
116114
user_id = get_hash(&user_id);
117115
let filter_id = filter_id.into_inner();
118-
let tenant_id = &Some(tenant_id);
119116
let is_admin = is_admin(&req).map_err(|e| FiltersError::Custom(e.to_string()))?;
120117
let filter = FILTERS
121-
.get_filter(&filter_id, &user_id, is_admin, tenant_id)
118+
.get_filter(&filter_id, &user_id, is_admin, &tenant_id)
122119
.await
123120
.ok_or(FiltersError::Metadata(
124121
"Filter does not exist or user is not authorized",
125122
))?;
126123

127124
PARSEABLE
128125
.metastore
129-
.delete_filter(&filter, tenant_id)
126+
.delete_filter(&filter, &tenant_id)
130127
.await?;
131-
FILTERS.delete_filter(&filter_id, tenant_id).await;
128+
FILTERS.delete_filter(&filter_id, &tenant_id).await;
132129

133130
Ok(HttpResponse::Ok().finish())
134131
}

src/parseable/mod.rs

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,10 @@ use std::{
2525
sync::{Arc, RwLock},
2626
};
2727

28-
use actix_web::http::{StatusCode, header::{CONTENT_TYPE, HeaderMap, HeaderName, HeaderValue}};
28+
use actix_web::http::{
29+
StatusCode,
30+
header::{CONTENT_TYPE, HeaderMap, HeaderName, HeaderValue},
31+
};
2932
use arrow_schema::{Field, Schema};
3033
use bytes::Bytes;
3134
use chrono::Utc;
@@ -386,6 +389,8 @@ impl Parseable {
386389
storage.create_stream_from_ingestor(stream_name, tenant_id),
387390
storage.create_schema_from_metastore(stream_name, tenant_id)
388391
)?;
392+
tracing::warn!(stream_metadata_bytes=?stream_metadata_bytes);
393+
tracing::warn!(schema_bytes=?schema_bytes);
389394

390395
let stream_metadata = if stream_metadata_bytes.is_empty() {
391396
ObjectStoreFormat::default()
@@ -452,7 +457,7 @@ impl Parseable {
452457
if let Some(hot_tier_config) = hot_tier {
453458
stream.set_hot_tier(Some(hot_tier_config));
454459
}
455-
460+
tracing::warn!(commit_schema=?schema);
456461
// commit schema in memory
457462
commit_schema(stream_name, schema, tenant_id).map_err(|e| StreamError::Anyhow(e.into()))?;
458463

@@ -556,8 +561,8 @@ impl Parseable {
556561
}
557562

558563
// For distributed deployments, if the stream not found in memory map,
559-
//check if it exists in the storage
560-
//create stream and schema from storage
564+
// check if it exists in the storage
565+
// create stream and schema from storage
561566
if self.options.mode != Mode::All
562567
&& self
563568
.create_stream_and_schema_from_storage(stream_name, tenant_id)
@@ -794,6 +799,7 @@ impl Parseable {
794799
// Proceed to create log stream if it doesn't exist
795800
let storage = self.storage.get_object_store();
796801

802+
// update owner and permissions
797803
let meta = ObjectStoreFormat {
798804
created_at: Utc::now().to_rfc3339(),
799805
permissions: vec![Permisssion::new(PARSEABLE.options.username.clone())],
@@ -812,12 +818,16 @@ impl Parseable {
812818
..Default::default()
813819
};
814820

821+
tracing::warn!(meta=?meta);
815822
match storage
816823
.create_stream(&stream_name, meta, schema.clone(), tenant_id)
817824
.await
818825
{
819826
Ok(created_at) => {
820827
tracing::warn!(created_stream_at=?created_at);
828+
tracing::warn!(stream_name=?stream_name);
829+
tracing::warn!(schema=?schema);
830+
tracing::warn!(tenant_id=?tenant_id);
821831
let mut static_schema: HashMap<String, Arc<Field>> = HashMap::new();
822832

823833
for (field_name, field) in schema
@@ -1181,7 +1191,8 @@ impl Parseable {
11811191
}
11821192

11831193
pub fn list_tenants(&self) -> Option<Vec<String>> {
1184-
if let Ok(t) = self.tenants.as_ref().read() {
1194+
if let Ok(t) = self.tenants.as_ref().read()
1195+
&& !t.is_empty() {
11851196
let t = t.clone();
11861197
Some(t)
11871198
} else {

src/parseable/streams.rs

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -646,6 +646,7 @@ impl Stream {
646646
let schema = Arc::new(merged_schema);
647647

648648
let part_path = parquet_path.with_extension("part");
649+
tracing::warn!(part_path=?part_path);
649650
if !self.write_parquet_part_file(
650651
&part_path,
651652
record_reader,
@@ -1164,6 +1165,8 @@ impl Streams {
11641165
} else {
11651166
vec![DEFAULT_TENANT.to_owned()]
11661167
};
1168+
tracing::warn!(flush_and_convert_tenants=?tenants);
1169+
tracing::warn!(parseable_streams_tenants=?self.read().unwrap().keys());
11671170
for tenant_id in tenants {
11681171
let guard = self.read().expect(LOCK_EXPECT);
11691172
let streams: Vec<Arc<Stream>> = if let Some(tenant_streams) = guard.get(&tenant_id) {
@@ -1176,12 +1179,6 @@ impl Streams {
11761179
.spawn(async move { stream.flush_and_convert(init_signal, shutdown_signal) });
11771180
}
11781181
}
1179-
// let streams: Vec<Arc<Stream>> = self
1180-
// .read()
1181-
// .expect(LOCK_EXPECT)
1182-
// .values()
1183-
// .map(Arc::clone)
1184-
// .collect();
11851182
}
11861183
}
11871184

src/rbac/user.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,7 @@ pub fn get_super_admin_user() -> User {
180180
}),
181181
roles: ["super-admin".to_string()].into(),
182182
user_groups: HashSet::new(),
183-
tenant: Some(DEFAULT_TENANT.to_owned()),
183+
tenant: None,
184184
}
185185
}
186186

src/storage/object_storage.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ use crate::metrics::increment_parquets_stored_by_date;
5353
use crate::metrics::increment_parquets_stored_size_by_date;
5454
use crate::metrics::{EVENTS_STORAGE_SIZE_DATE, LIFETIME_EVENTS_STORAGE_SIZE, STORAGE_SIZE};
5555
use crate::option::Mode;
56+
use crate::parseable::DEFAULT_TENANT;
5657
use crate::parseable::{LogStream, PARSEABLE, Stream};
5758
use crate::stats::FullStats;
5859
use crate::storage::SETTINGS_ROOT_DIRECTORY;
@@ -630,7 +631,9 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
630631
.map_err(|e| ObjectStorageError::MetastoreError(Box::new(e.to_detail())))?;
631632
return Ok(stream_metadata_bytes);
632633
}
633-
tracing::warn!("unable to find stream- {stream_name} with tenant- {tenant_id:?} in PARSEABLE.get_stream");
634+
tracing::warn!(
635+
"unable to find stream- {stream_name} with tenant- {tenant_id:?} in PARSEABLE.get_stream"
636+
);
634637
let mut all_log_sources: Vec<LogSourceEntry> = Vec::new();
635638

636639
if let Some(stream_metadata_obs) = PARSEABLE
@@ -1055,6 +1058,7 @@ async fn process_schema_files(
10551058
tenant_id: &Option<String>,
10561059
) -> Result<(), ObjectStorageError> {
10571060
for path in upload_context.stream.schema_files() {
1061+
tracing::warn!(upload_context_schema_files=?path);
10581062
let file = File::open(&path)?;
10591063
let schema: Schema = serde_json::from_reader(file)?;
10601064
commit_schema_to_storage(stream_name, schema, tenant_id).await?;
@@ -1079,7 +1083,9 @@ fn stream_relative_path(
10791083
let custom_partition_list = custom_partition_fields.split(',').collect::<Vec<&str>>();
10801084
file_suffix = str::replacen(filename, ".", "/", 3 + custom_partition_list.len());
10811085
}
1082-
if let Some(tenant) = tenant_id {
1086+
if let Some(tenant) = tenant_id
1087+
&& !tenant.eq(DEFAULT_TENANT)
1088+
{
10831089
format!("{tenant}/{stream_name}/{file_suffix}")
10841090
} else {
10851091
format!("{stream_name}/{file_suffix}")

src/utils/mod.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -58,22 +58,22 @@ pub fn extract_datetime(path: &str) -> Option<NaiveDateTime> {
5858
}
5959
}
6060

61-
pub fn get_user_and_tenant_from_request(req: &HttpRequest) -> Result<(String, String), RBACError> {
61+
pub fn get_user_and_tenant_from_request(
62+
req: &HttpRequest,
63+
) -> Result<(String, Option<String>), RBACError> {
6264
let session_key = extract_session_key_from_req(req).map_err(|_| RBACError::UserDoesNotExist)?;
6365
match &session_key {
6466
SessionKey::BasicAuth { username, password } => {
65-
if let Some(user) = Users.get_user_from_basic(&username, &password)
66-
&& let Some(tenant) = user.tenant
67-
{
68-
return Ok((username.clone(), tenant));
67+
if let Some(user) = Users.get_user_from_basic(&username, &password) {
68+
return Ok((username.clone(), user.tenant.clone()));
6969
}
7070
}
7171
SessionKey::SessionId(_) => {}
7272
}
7373
let Some((user_id, tenant_id)) = Users.get_userid_from_session(&session_key) else {
7474
return Err(RBACError::UserDoesNotExist);
7575
};
76-
Ok((user_id, tenant_id))
76+
Ok((user_id, Some(tenant_id)))
7777
}
7878

7979
pub fn get_tenant_id_from_request(req: &HttpRequest) -> Option<String> {

0 commit comments

Comments
 (0)