Skip to content

refactor: utils/time parsing #1024

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
Dec 17, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 36 additions & 32 deletions src/handlers/airplane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ use crate::utils::arrow::flight::{
append_temporary_events, get_query_from_ticket, into_flight_data, run_do_get_rpc,
send_to_ingester,
};
use crate::utils::time::TimeRange;
use arrow_flight::{
flight_service_server::FlightService, Action, ActionType, Criteria, Empty, FlightData,
FlightDescriptor, FlightInfo, HandshakeRequest, HandshakeResponse, PutResult, SchemaAsIpc,
Expand Down Expand Up @@ -149,6 +150,8 @@ impl FlightService for AirServiceImpl {
Status::internal("Failed to create logical plan")
})?;

let time_range = TimeRange::parse_human_time(&ticket.start_time, &ticket.end_time)
.map_err(|e| Status::internal(e.to_string()))?;
// create a visitor to extract the table name
let mut visitor = TableScanVisitor::default();
let _ = raw_logical_plan.visit(&mut visitor);
Expand Down Expand Up @@ -184,8 +187,7 @@ impl FlightService for AirServiceImpl {
query_cache_manager,
&stream_name,
user_id,
&ticket.start_time,
&ticket.end_time,
&time_range,
&ticket.query,
ticket.send_null,
ticket.fields,
Expand All @@ -200,38 +202,40 @@ impl FlightService for AirServiceImpl {
.map_err(|err| Status::internal(err.to_string()))?;

// map payload to query
let mut query = into_query(&ticket, &session_state)
let mut query = into_query(&ticket, &session_state, time_range)
.await
.map_err(|_| Status::internal("Failed to parse query"))?;

let event =
if send_to_ingester(query.start.timestamp_millis(), query.end.timestamp_millis()) {
let sql = format!("select * from {}", &stream_name);
let start_time = ticket.start_time.clone();
let end_time = ticket.end_time.clone();
let out_ticket = json!({
"query": sql,
"startTime": start_time,
"endTime": end_time
})
.to_string();

let ingester_metadatas = get_ingestor_info()
.await
.map_err(|err| Status::failed_precondition(err.to_string()))?;
let mut minute_result: Vec<RecordBatch> = vec![];

for im in ingester_metadatas {
if let Ok(mut batches) = run_do_get_rpc(im, out_ticket.clone()).await {
minute_result.append(&mut batches);
}
let event = if send_to_ingester(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the following is not my change, it is a side effect of cargo fmt

query.time_range.start.timestamp_millis(),
query.time_range.end.timestamp_millis(),
) {
let sql = format!("select * from {}", &stream_name);
let start_time = ticket.start_time.clone();
let end_time = ticket.end_time.clone();
let out_ticket = json!({
"query": sql,
"startTime": start_time,
"endTime": end_time
})
.to_string();

let ingester_metadatas = get_ingestor_info()
.await
.map_err(|err| Status::failed_precondition(err.to_string()))?;
let mut minute_result: Vec<RecordBatch> = vec![];

for im in ingester_metadatas {
if let Ok(mut batches) = run_do_get_rpc(im, out_ticket.clone()).await {
minute_result.append(&mut batches);
}
let mr = minute_result.iter().collect::<Vec<_>>();
let event = append_temporary_events(&stream_name, mr).await?;
Some(event)
} else {
None
};
}
let mr = minute_result.iter().collect::<Vec<_>>();
let event = append_temporary_events(&stream_name, mr).await?;
Some(event)
} else {
None
};

// try authorize
match Users.authorize(key.clone(), rbac::role::Action::Query, None, None) {
Expand Down Expand Up @@ -263,8 +267,8 @@ impl FlightService for AirServiceImpl {
query_cache_manager,
&stream_name,
&records,
query.start.to_rfc3339(),
query.end.to_rfc3339(),
query.time_range.start.to_rfc3339(),
query.time_range.end.to_rfc3339(),
ticket.query,
)
.await
Expand Down
63 changes: 14 additions & 49 deletions src/handlers/http/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use std::time::Instant;
use crate::event::error::EventError;
use crate::handlers::http::fetch_schema;
use crate::metadata::STREAM_INFO;
use crate::utils::time::{TimeParseError, TimeRange};
use arrow_array::RecordBatch;

use crate::event::commit_schema;
Expand Down Expand Up @@ -84,6 +85,10 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Respon
.await?
}
};

let time_range =
TimeRange::parse_human_time(&query_request.start_time, &query_request.end_time)?;

// create a visitor to extract the table name
let mut visitor = TableScanVisitor::default();
let _ = raw_logical_plan.visit(&mut visitor);
Expand Down Expand Up @@ -114,8 +119,7 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Respon
query_cache_manager,
stream,
user_id,
&query_request.start_time,
&query_request.end_time,
&time_range,
&query_request.query,
query_request.send_null,
query_request.fields,
Expand All @@ -127,7 +131,7 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Respon

let tables = visitor.into_inner();
update_schema_when_distributed(tables).await?;
let mut query: LogicalQuery = into_query(&query_request, &session_state).await?;
let mut query: LogicalQuery = into_query(&query_request, &session_state, time_range).await?;

let creds = extract_session_key_from_req(&req)?;
let permissions = Users.get_permissions(&creds);
Expand All @@ -147,8 +151,8 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Respon
query_cache_manager,
&table_name,
&records,
query.start.to_rfc3339(),
query.end.to_rfc3339(),
query.time_range.start.to_rfc3339(),
query.time_range.end.to_rfc3339(),
query_request.query,
)
.await
Expand Down Expand Up @@ -269,8 +273,7 @@ pub async fn get_results_from_cache(
query_cache_manager: Option<&QueryCacheManager>,
stream: &str,
user_id: Option<&str>,
start_time: &str,
end_time: &str,
TimeRange { start, end }: &TimeRange,
query: &str,
send_null: bool,
send_fields: bool,
Expand All @@ -295,8 +298,6 @@ pub async fn get_results_from_cache(

let mut query_cache = query_cache_manager.get_cache(stream, user_id).await?;

let (start, end) = parse_human_time(start_time, end_time)?;

let file_path = query_cache.get_file(&CacheMetadata::new(
query.to_string(),
start.to_rfc3339(),
Expand Down Expand Up @@ -391,6 +392,7 @@ impl FromRequest for Query {
pub async fn into_query(
query: &Query,
session_state: &SessionState,
time_range: TimeRange,
) -> Result<LogicalQuery, QueryError> {
if query.query.is_empty() {
return Err(QueryError::EmptyQuery);
Expand All @@ -404,42 +406,13 @@ pub async fn into_query(
return Err(QueryError::EmptyEndTime);
}

let (start, end) = parse_human_time(&query.start_time, &query.end_time)?;

if start.timestamp() > end.timestamp() {
return Err(QueryError::StartTimeAfterEndTime);
}

Ok(crate::query::Query {
raw_logical_plan: session_state.create_logical_plan(&query.query).await?,
start,
end,
time_range,
filter_tag: query.filter_tags.clone(),
})
}

fn parse_human_time(
start_time: &str,
end_time: &str,
) -> Result<(DateTime<Utc>, DateTime<Utc>), QueryError> {
let start: DateTime<Utc>;
let end: DateTime<Utc>;

if end_time == "now" {
end = Utc::now();
start = end - chrono::Duration::from_std(humantime::parse_duration(start_time)?)?;
} else {
start = DateTime::parse_from_rfc3339(start_time)
.map_err(|_| QueryError::StartTimeParse)?
.into();
end = DateTime::parse_from_rfc3339(end_time)
.map_err(|_| QueryError::EndTimeParse)?
.into();
};

Ok((start, end))
}

/// unused for now, might need it in the future
#[allow(unused)]
fn transform_query_for_ingestor(query: &Query) -> Option<Query> {
Expand Down Expand Up @@ -485,16 +458,8 @@ pub enum QueryError {
EmptyStartTime,
#[error("End time cannot be empty")]
EmptyEndTime,
#[error("Could not parse start time correctly")]
StartTimeParse,
#[error("Could not parse end time correctly")]
EndTimeParse,
#[error("While generating times for 'now' failed to parse duration")]
NotValidDuration(#[from] humantime::DurationError),
#[error("Parsed duration out of range")]
OutOfRange(#[from] chrono::OutOfRangeError),
#[error("Start time cannot be greater than the end time")]
StartTimeAfterEndTime,
#[error("Error while parsing provided time range: {0}")]
TimeParse(#[from] TimeParseError),
#[error("Unauthorized")]
Unauthorized,
#[error("Datafusion Error: {0}")]
Expand Down
12 changes: 6 additions & 6 deletions src/query/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ use crate::event;
use crate::metadata::STREAM_INFO;
use crate::option::CONFIG;
use crate::storage::{ObjectStorageProvider, StorageDir};
use crate::utils::time::TimeRange;

pub static QUERY_SESSION: Lazy<SessionContext> =
Lazy::new(|| Query::create_session_context(CONFIG.storage()));
Expand All @@ -54,8 +55,7 @@ pub static QUERY_SESSION: Lazy<SessionContext> =
#[derive(Debug)]
pub struct Query {
pub raw_logical_plan: LogicalPlan,
pub start: DateTime<Utc>,
pub end: DateTime<Utc>,
pub time_range: TimeRange,
pub filter_tag: Option<Vec<String>>,
}

Expand Down Expand Up @@ -164,8 +164,8 @@ impl Query {
LogicalPlan::Explain(plan) => {
let transformed = transform(
plan.plan.as_ref().clone(),
self.start.naive_utc(),
self.end.naive_utc(),
self.time_range.start.naive_utc(),
self.time_range.end.naive_utc(),
filters,
time_partition,
);
Expand All @@ -182,8 +182,8 @@ impl Query {
x => {
transform(
x,
self.start.naive_utc(),
self.end.naive_utc(),
self.time_range.start.naive_utc(),
self.time_range.end.naive_utc(),
filters,
time_partition,
)
Expand Down
1 change: 1 addition & 0 deletions src/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ pub mod actix;
pub mod arrow;
pub mod header_parsing;
pub mod json;
pub mod time;
pub mod uid;
pub mod update;
use crate::handlers::http::rbac::RBACError;
Expand Down
58 changes: 58 additions & 0 deletions src/utils/time.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Parseable Server (C) 2022 - 2024 Parseable, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/

use chrono::{DateTime, Utc};

/// Errors produced while turning caller-supplied start/end strings into a [`TimeRange`].
#[derive(Debug, thiserror::Error)]
pub enum TimeParseError {
/// Wraps a `humantime::DurationError`; produced when the relative start
/// duration (used together with an end time of `"now"`) is rejected.
#[error("Parsing humantime")]
HumanTime(#[from] humantime::DurationError),
/// Wraps a `chrono::OutOfRangeError`; produced when the parsed std duration
/// cannot be converted into a `chrono::Duration`.
#[error("Out of Range")]
OutOfRange(#[from] chrono::OutOfRangeError),
/// Wraps a `chrono::ParseError`; produced when an absolute start or end
/// time is not a valid RFC 3339 timestamp.
#[error("Error parsing time: {0}")]
Chrono(#[from] chrono::ParseError),
/// Both bounds were resolved, but the start lies after the end.
#[error("Start time cannot be greater than the end time")]
StartTimeAfterEndTime,
}

/// A span of time described by two UTC instants.
///
/// Values returned by `TimeRange::parse_human_time` satisfy `start <= end`.
/// The fields are public, so that ordering is not enforced for values that
/// are constructed directly.
#[derive(Debug)]
pub struct TimeRange {
/// First instant of the range, in UTC.
pub start: DateTime<Utc>,
/// Last instant of the range, in UTC.
pub end: DateTime<Utc>,
}

impl TimeRange {
    /// Resolves a caller-supplied start/end pair into a concrete UTC [`TimeRange`].
    ///
    /// Two input forms are accepted:
    ///
    /// * `end_time == "now"`: the range ends at the current instant and
    ///   `start_time` is a humantime duration (for example `"10m"`) that is
    ///   subtracted from it.
    /// * anything else: both `start_time` and `end_time` must be RFC 3339
    ///   timestamps; they are converted to UTC.
    ///
    /// # Errors
    ///
    /// * [`TimeParseError::HumanTime`] if the look-back duration cannot be
    ///   parsed, or if it is so large that subtracting it from the current
    ///   time leaves the range of instants chrono can represent.
    /// * [`TimeParseError::OutOfRange`] if the duration does not fit into a
    ///   `chrono::Duration`.
    /// * [`TimeParseError::Chrono`] if an absolute timestamp is not valid
    ///   RFC 3339.
    /// * [`TimeParseError::StartTimeAfterEndTime`] if the resolved start is
    ///   later than the resolved end.
    pub fn parse_human_time(start_time: &str, end_time: &str) -> Result<Self, TimeParseError> {
        let (start, end): (DateTime<Utc>, DateTime<Utc>) = if end_time == "now" {
            let end = Utc::now();
            let lookback = chrono::Duration::from_std(humantime::parse_duration(start_time)?)?;
            // `end - lookback` would panic when the result is not representable
            // (e.g. a look-back of several hundred thousand years). The input is
            // user controlled, so report it as an error instead of panicking.
            let start = end.checked_sub_signed(lookback).ok_or(TimeParseError::HumanTime(
                humantime::DurationError::NumberOverflow,
            ))?;
            (start, end)
        } else {
            // Tuple fields are evaluated left to right, so a malformed start
            // time is reported before a malformed end time.
            (
                DateTime::parse_from_rfc3339(start_time)?.into(),
                DateTime::parse_from_rfc3339(end_time)?.into(),
            )
        };

        if start > end {
            return Err(TimeParseError::StartTimeAfterEndTime);
        }

        Ok(Self { start, end })
    }
}
Loading