Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion server/src/analytics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ async fn fetch_ingestors_metrics(
// send analytics for ingest servers

// ingestor infos should be valid here, if not some thing is wrong
let ingestor_infos = cluster::get_ingestor_info().await.unwrap();
let ingestor_infos = cluster::get_ingestor_info_storage().await.unwrap();

for im in ingestor_infos {
if !check_liveness(&im.domain_name).await {
Expand Down
13 changes: 6 additions & 7 deletions server/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -396,13 +396,12 @@ pub async fn get_first_event(
}
}
Mode::Query => {
let ingestor_metadata =
handlers::http::cluster::get_ingestor_info()
.await
.map_err(|err| {
log::error!("Fatal: failed to get ingestor info: {:?}", err);
ObjectStorageError::from(err)
})?;
let ingestor_metadata = handlers::http::cluster::get_ingestor_info_storage()
.await
.map_err(|err| {
log::error!("Fatal: failed to get ingestor info: {:?}", err);
ObjectStorageError::from(err)
})?;
let mut ingestors_first_event_at: Vec<String> = Vec::new();
for ingestor in ingestor_metadata {
let url = format!(
Expand Down
1 change: 1 addition & 0 deletions server/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ impl Event {
.map_err(PostError::Event)
}

#[allow(unused)]
pub fn clear(&self, stream_name: &str) {
STREAM_WRITERS.clear(stream_name);
}
Expand Down
1 change: 1 addition & 0 deletions server/src/event/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ impl WriterTable {
Ok(())
}

#[allow(unused)]
pub fn clear(&self, stream_name: &str) {
let map = self.write().unwrap();
if let Some(writer) = map.get(stream_name) {
Expand Down
26 changes: 14 additions & 12 deletions server/src/handlers/airplane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use futures_util::{Future, TryFutureExt};
use tonic::transport::{Identity, Server, ServerTlsConfig};
use tonic_web::GrpcWebLayer;

use crate::handlers::http::cluster::get_ingestor_info;
use crate::handlers::http::cluster::get_ingestor_info_storage;

use crate::handlers::{CACHE_RESULTS_HEADER_KEY, CACHE_VIEW_HEADER_KEY, USER_ID_HEADER_KEY};
use crate::metrics::QUERY_EXECUTE_TIME;
Expand All @@ -46,8 +46,7 @@ use crate::handlers::http::query::{
use crate::query::{TableScanVisitor, QUERY_SESSION};
use crate::querycache::QueryCacheManager;
use crate::utils::arrow::flight::{
append_temporary_events, get_query_from_ticket, into_flight_data, run_do_get_rpc,
send_to_ingester,
get_query_from_ticket, into_flight_data, run_do_get_rpc, send_to_ingester,
};
use arrow_flight::{
flight_service_server::FlightService, Action, ActionType, Criteria, Empty, FlightData,
Expand Down Expand Up @@ -204,6 +203,8 @@ impl FlightService for AirServiceImpl {
.await
.map_err(|_| Status::internal("Failed to parse query"))?;

// this is the flow for getting staging data from Ingestors
// fix this (it sends double records)
let event =
if send_to_ingester(query.start.timestamp_millis(), query.end.timestamp_millis()) {
let sql = format!("select * from {}", &stream_name);
Expand All @@ -216,7 +217,7 @@ impl FlightService for AirServiceImpl {
})
.to_string();

let ingester_metadatas = get_ingestor_info()
let ingester_metadatas = get_ingestor_info_storage()
.await
.map_err(|err| Status::failed_precondition(err.to_string()))?;
let mut minute_result: Vec<RecordBatch> = vec![];
Expand All @@ -226,9 +227,10 @@ impl FlightService for AirServiceImpl {
minute_result.append(&mut batches);
}
}
let mr = minute_result.iter().collect::<Vec<_>>();
let event = append_temporary_events(&stream_name, mr).await?;
Some(event)

// log::warn!("minute_result-\n{mr:?}\n");
// let event = append_temporary_events(&stream_name, mr).await?;
Some(minute_result)
} else {
None
};
Expand All @@ -252,11 +254,15 @@ impl FlightService for AirServiceImpl {
Status::permission_denied("User Does not have permission to access this")
})?;
let time = Instant::now();
let (records, _) = query
let (mut records, _) = query
.execute(stream_name.clone())
.await
.map_err(|err| Status::internal(err.to_string()))?;

// if let Some(event) = event {
// records.extend(event);
// }

if let Err(err) = put_results_in_cache(
cache_results,
user_id,
Expand Down Expand Up @@ -285,10 +291,6 @@ impl FlightService for AirServiceImpl {
*/
let out = into_flight_data(records);

if let Some(event) = event {
event.clear(&stream_name);
}

let time = time.elapsed().as_secs_f64();
QUERY_EXECUTE_TIME
.with_label_values(&[&format!("flight-query-{}", stream_name)])
Expand Down
4 changes: 2 additions & 2 deletions server/src/handlers/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use serde_json::Value;

use crate::option::CONFIG;

use self::{cluster::get_ingestor_info, query::Query};
use self::{cluster::get_ingestor_info_storage, query::Query};

pub(crate) mod about;
mod cache;
Expand Down Expand Up @@ -99,7 +99,7 @@ pub async fn fetch_schema(stream_name: &str) -> anyhow::Result<arrow_schema::Sch
pub async fn send_query_request_to_ingestor(query: &Query) -> anyhow::Result<Vec<Value>> {
// send the query request to the ingestor
let mut res = vec![];
let ima = get_ingestor_info().await?;
let ima = get_ingestor_info_storage().await?;

for im in ima.iter() {
let uri = format!(
Expand Down
Loading