Skip to content

Commit b3ab9f8

Browse files
committed
fixes for quest OSS
1 parent 9802cbf commit b3ab9f8

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

66 files changed

+1565
-640
lines changed

src/alerts/alert_structs.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -713,7 +713,7 @@ pub struct DailyMTTRStats {
713713
pub struct MTTRHistory {
714714
/// Array of daily MTTR statistics
715715
pub daily_stats: Vec<DailyMTTRStats>,
716-
pub tenant_id: Option<String>
716+
pub tenant_id: Option<String>,
717717
}
718718

719719
/// Query parameters for MTTR API endpoint

src/alerts/alert_traits.rs

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -87,27 +87,38 @@ pub trait AlertManagerTrait: Send + Sync {
8787
session: SessionKey,
8888
tags: Vec<String>,
8989
) -> Result<Vec<AlertConfig>, AlertError>;
90-
async fn get_alert_by_id(&self, id: Ulid, tenant_id: &Option<String>) -> Result<Box<dyn AlertTrait>, AlertError>;
90+
async fn get_alert_by_id(
91+
&self,
92+
id: Ulid,
93+
tenant_id: &Option<String>,
94+
) -> Result<Box<dyn AlertTrait>, AlertError>;
9195
async fn update(&self, alert: &dyn AlertTrait);
9296
async fn update_state(
9397
&self,
9498
alert_id: Ulid,
9599
new_state: AlertState,
96100
trigger_notif: Option<String>,
97-
tenant_id: &Option<String>
101+
tenant_id: &Option<String>,
98102
) -> Result<(), AlertError>;
99103
async fn update_notification_state(
100104
&self,
101105
alert_id: Ulid,
102106
new_notification_state: NotificationState,
103-
tenant_id: &Option<String>
107+
tenant_id: &Option<String>,
104108
) -> Result<(), AlertError>;
105109
async fn delete(&self, alert_id: Ulid, tenant_id: &Option<String>) -> Result<(), AlertError>;
106-
async fn get_state(&self, alert_id: Ulid, tenant_id: &Option<String>) -> Result<AlertState, AlertError>;
110+
async fn get_state(
111+
&self,
112+
alert_id: Ulid,
113+
tenant_id: &Option<String>,
114+
) -> Result<AlertState, AlertError>;
107115
async fn start_task(&self, alert: Box<dyn AlertTrait>) -> Result<(), AlertError>;
108116
async fn delete_task(&self, alert_id: Ulid) -> Result<(), AlertError>;
109117
async fn list_tags(&self, tenant_id: &Option<String>) -> Vec<String>;
110-
async fn get_all_alerts(&self, tenant_id: &Option<String>) -> HashMap<Ulid, Box<dyn AlertTrait>>;
118+
async fn get_all_alerts(
119+
&self,
120+
tenant_id: &Option<String>,
121+
) -> HashMap<Ulid, Box<dyn AlertTrait>>;
111122
}
112123

113124
#[async_trait]

src/alerts/alert_types.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -110,11 +110,11 @@ impl AlertTrait for ThresholdAlert {
110110
// save that user's basic auth
111111
// use that to send request
112112
None
113-
},
113+
}
114114
crate::rbac::user::UserType::OAuth(_) => {
115115
tracing::warn!("admin user is oauth");
116116
None
117-
},
117+
}
118118
}
119119
} else {
120120
None

src/alerts/alerts_utils.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ pub async fn execute_alert_query(
8282
) -> Result<AlertQueryResult, AlertError> {
8383
match PARSEABLE.options.mode {
8484
Mode::All | Mode::Query => execute_local_query(query, time_range, tenant_id).await,
85-
Mode::Prism => execute_remote_query(auth_token, query, time_range).await,
85+
Mode::Prism => execute_remote_query(auth_token, query, time_range, tenant_id).await,
8686
_ => Err(AlertError::CustomError(format!(
8787
"Unsupported mode '{:?}' for alert evaluation",
8888
PARSEABLE.options.mode
@@ -131,6 +131,7 @@ async fn execute_remote_query(
131131
auth_token: Option<String>,
132132
query: &str,
133133
time_range: &TimeRange,
134+
tenant_id: &Option<String>,
134135
) -> Result<AlertQueryResult, AlertError> {
135136
let session_state = QUERY_SESSION.get_ctx().state();
136137
let raw_logical_plan = session_state.create_logical_plan(query).await?;
@@ -145,7 +146,7 @@ async fn execute_remote_query(
145146
filter_tags: None,
146147
};
147148

148-
let (result_value, _) = send_query_request(None,&query_request)
149+
let (result_value, _) = send_query_request(None, &query_request, tenant_id)
149150
.await
150151
.map_err(|err| AlertError::CustomError(format!("Failed to send query request: {err}")))?;
151152

src/alerts/mod.rs

Lines changed: 11 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1043,7 +1043,11 @@ impl AlertManagerTrait for Alerts {
10431043
let mut map = self.alerts.write().await;
10441044

10451045
for (tenant_id, raw_bytes) in raw_objects {
1046-
let tenant = &Some(tenant_id.clone());
1046+
let tenant = if tenant_id.is_empty() {
1047+
&None
1048+
} else {
1049+
&Some(tenant_id.clone())
1050+
};
10471051
for alert_bytes in raw_bytes {
10481052
// First, try to parse as JSON Value to check version
10491053
let json_value: JsonValue = match serde_json::from_slice(&alert_bytes) {
@@ -1233,25 +1237,17 @@ impl AlertManagerTrait for Alerts {
12331237
"No alert found for the given ID- {id}"
12341238
)))
12351239
}
1236-
// if let Some(alert) = read_access.get(&id) {
1237-
// Ok(alert.clone_box())
1238-
// } else {
1239-
// Err(AlertError::CustomError(format!(
1240-
// "No alert found for the given ID- {id}"
1241-
// )))
1242-
// }
12431240
}
12441241

12451242
/// Update the in-mem vector of alerts
12461243
async fn update(&self, alert: &dyn AlertTrait) {
12471244
let tenant = alert.get_tenant_id().as_ref().map_or(DEFAULT_TENANT, |v| v);
1248-
if let Some(alerts) = self.alerts.write().await.get_mut(tenant) {
1249-
alerts.insert(*alert.get_id(), alert.clone_box());
1250-
}
1251-
// self.alerts
1252-
// .write()
1253-
// .await
1254-
// .insert(*alert.get_id(), alert.clone_box());
1245+
self.alerts
1246+
.write()
1247+
.await
1248+
.entry(tenant.to_owned())
1249+
.or_default()
1250+
.insert(*alert.get_id(), alert.clone_box());
12551251
}
12561252

12571253
/// Update the state of alert

src/alerts/target.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -341,7 +341,8 @@ impl Target {
341341

342342
match retry {
343343
Retry::Infinite => loop {
344-
let current_state = if let Ok(state) = alerts.get_state(alert_id, &tenant_id).await
344+
let current_state = if let Ok(state) =
345+
alerts.get_state(alert_id, &tenant_id).await
345346
{
346347
state
347348
} else {

src/analytics.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,8 @@ impl Report {
119119

120120
// check liveness of indexers
121121
// get the count of active and inactive indexers
122-
let indexer_infos: Vec<NodeMetadata> = cluster::get_node_info(NodeType::Indexer).await?;
122+
let indexer_infos: Vec<NodeMetadata> =
123+
cluster::get_node_info(NodeType::Indexer, &None).await?;
123124
for indexer in indexer_infos {
124125
if check_liveness(&indexer.domain_name).await {
125126
active_indexers += 1;
@@ -130,7 +131,8 @@ impl Report {
130131

131132
// check liveness of queriers
132133
// get the count of active and inactive queriers
133-
let query_infos: Vec<NodeMetadata> = cluster::get_node_info(NodeType::Querier).await?;
134+
let query_infos: Vec<NodeMetadata> =
135+
cluster::get_node_info(NodeType::Querier, &None).await?;
134136
for query in query_infos {
135137
if check_liveness(&query.domain_name).await {
136138
active_queriers += 1;
@@ -274,7 +276,8 @@ async fn fetch_ingestors_metrics()
274276
// send analytics for ingest servers
275277

276278
// ingestor infos should be valid here, if not some thing is wrong
277-
let ingestor_infos: Vec<NodeMetadata> = cluster::get_node_info(NodeType::Ingestor).await?;
279+
let ingestor_infos: Vec<NodeMetadata> =
280+
cluster::get_node_info(NodeType::Ingestor, &None).await?;
278281

279282
for im in ingestor_infos {
280283
if !check_liveness(&im.domain_name).await {

src/banner.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ fn status_info(config: &Parseable, scheme: &str, id: Uid) {
9595
/// - Store (path where the data is stored and its latency)
9696
async fn storage_info(config: &Parseable) {
9797
let storage = config.storage();
98-
let latency = storage.get_object_store().get_latency().await;
98+
let latency = storage.get_object_store().get_latency(&None).await;
9999

100100
eprintln!(
101101
"

src/catalog/mod.rs

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ use crate::{
3535
},
3636
metrics::{EVENTS_INGESTED_DATE, EVENTS_INGESTED_SIZE_DATE, EVENTS_STORAGE_SIZE_DATE},
3737
option::Mode,
38-
parseable::PARSEABLE,
38+
parseable::{DEFAULT_TENANT, PARSEABLE},
3939
query::PartialTimeFilter,
4040
stats::{event_labels_date, get_current_stats, storage_size_labels_date, update_deleted_stats},
4141
storage::{
@@ -187,10 +187,14 @@ fn create_partition_bounds(lower_bound: DateTime<Utc>) -> (DateTime<Utc>, DateTi
187187
}
188188

189189
/// Extracts statistics from live metrics for a given partition date
190-
fn extract_partition_metrics(stream_name: &str, partition_lower: DateTime<Utc>) -> (u64, u64, u64) {
190+
fn extract_partition_metrics(
191+
stream_name: &str,
192+
partition_lower: DateTime<Utc>,
193+
tenant_id: &str,
194+
) -> (u64, u64, u64) {
191195
let date_str = partition_lower.date_naive().to_string();
192-
let event_labels = event_labels_date(stream_name, "json", &date_str);
193-
let storage_labels = storage_size_labels_date(stream_name, &date_str);
196+
let event_labels = event_labels_date(stream_name, "json", &date_str, tenant_id);
197+
let storage_labels = storage_size_labels_date(stream_name, &date_str, tenant_id);
194198

195199
let events_ingested = EVENTS_INGESTED_DATE
196200
.get_metric_with_label_values(&event_labels)
@@ -220,8 +224,11 @@ async fn process_partition_groups(
220224
let mut new_manifest_entries = Vec::new();
221225

222226
for ((partition_lower, _partition_upper), partition_changes) in partition_groups {
223-
let (events_ingested, ingestion_size, storage_size) =
224-
extract_partition_metrics(stream_name, partition_lower);
227+
let (events_ingested, ingestion_size, storage_size) = extract_partition_metrics(
228+
stream_name,
229+
partition_lower,
230+
tenant_id.as_ref().map_or(DEFAULT_TENANT, |v| v),
231+
);
225232

226233
let manifest_entry = process_single_partition(
227234
partition_lower,
@@ -530,7 +537,7 @@ pub async fn remove_manifest_from_snapshot(
530537
let stream_name_clone = stream_name.to_string();
531538
let dates_clone = dates.clone();
532539

533-
for_each_live_node(move |ingestor| {
540+
for_each_live_node(tenant_id, move |ingestor| {
534541
let stream_name = stream_name_clone.clone();
535542
let dates = dates_clone.clone();
536543
async move {

src/event/format/json.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ impl EventFormat for Event {
157157
stream_type: StreamType,
158158
p_custom_fields: &HashMap<String, String>,
159159
telemetry_type: TelemetryType,
160-
tenant_id: &Option<String>
160+
tenant_id: &Option<String>,
161161
) -> Result<super::Event, anyhow::Error> {
162162
let custom_partition_values = match custom_partitions.as_ref() {
163163
Some(custom_partition) => {
@@ -191,7 +191,7 @@ impl EventFormat for Event {
191191
custom_partition_values,
192192
stream_type,
193193
telemetry_type,
194-
tenant_id: tenant_id.to_owned()
194+
tenant_id: tenant_id.to_owned(),
195195
})
196196
}
197197
}

0 commit comments

Comments
 (0)