Skip to content

Commit 5875a20

Browse files
author
Devdutt Shenoi
authored
feat: rename API datebin to counts (#1103)
1 parent 35efb17 commit 5875a20

File tree

4 files changed

+56
-46
lines changed

4 files changed

+56
-46
lines changed

src/handlers/http/modal/query_server.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,6 @@ impl ParseableServer for QueryServer {
5252
config
5353
.service(
5454
web::scope(&base_path())
55-
.service(Server::get_date_bin())
5655
.service(Server::get_correlation_webscope())
5756
.service(Server::get_query_factory())
5857
.service(Server::get_liveness_factory())
@@ -65,6 +64,7 @@ impl ParseableServer for QueryServer {
6564
.service(Server::get_llm_webscope())
6665
.service(Server::get_oauth_webscope(oidc_client))
6766
.service(Self::get_user_role_webscope())
67+
.service(Server::get_counts_webscope())
6868
.service(Server::get_metrics_webscope())
6969
.service(Self::get_cluster_web_scope()),
7070
)

src/handlers/http/modal/server.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ impl ParseableServer for Server {
8080
.service(Self::get_llm_webscope())
8181
.service(Self::get_oauth_webscope(oidc_client))
8282
.service(Self::get_user_role_webscope())
83-
.service(Self::get_date_bin())
83+
.service(Self::get_counts_webscope())
8484
.service(Self::get_metrics_webscope()),
8585
)
8686
.service(Self::get_ingest_otel_factory())
@@ -267,9 +267,8 @@ impl Server {
267267
),
268268
)
269269
}
270-
pub fn get_date_bin() -> Resource {
271-
web::resource("/datebin")
272-
.route(web::post().to(query::get_date_bin).authorize(Action::Query))
270+
pub fn get_counts_webscope() -> Resource {
271+
web::resource("/counts").route(web::post().to(query::get_counts).authorize(Action::Query))
273272
}
274273

275274
// get the query factory

src/handlers/http/query.rs

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ use crate::event::commit_schema;
4141
use crate::metrics::QUERY_EXECUTE_TIME;
4242
use crate::option::{Mode, CONFIG};
4343
use crate::query::error::ExecuteError;
44-
use crate::query::{DateBinRequest, DateBinResponse, Query as LogicalQuery};
44+
use crate::query::{CountsRequest, CountsResponse, Query as LogicalQuery};
4545
use crate::query::{TableScanVisitor, QUERY_SESSION};
4646
use crate::rbac::Users;
4747
use crate::response::QueryResponse;
@@ -104,21 +104,24 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<HttpRespons
104104
user_auth_for_query(&permissions, &tables)?;
105105

106106
let time = Instant::now();
107+
// Intercept `count(*)`` queries and use the counts API
107108
if let Some(column_name) = query.is_logical_plan_count_without_filters() {
108-
let date_bin_request = DateBinRequest {
109+
let counts_req = CountsRequest {
109110
stream: table_name.clone(),
110111
start_time: query_request.start_time.clone(),
111112
end_time: query_request.end_time.clone(),
112113
num_bins: 1,
113114
};
114-
let date_bin_records = date_bin_request.get_bin_density().await?;
115+
let count_records = counts_req.get_bin_density().await?;
116+
// NOTE: this should not panic, since there is atleast one bin, always
117+
let log_count = count_records[0].log_count;
115118
let response = if query_request.fields {
116119
json!({
117-
"fields": vec![&column_name],
118-
"records": vec![json!({column_name: date_bin_records[0].log_count})]
120+
"fields": [&column_name],
121+
"records": [json!({column_name: log_count})]
119122
})
120123
} else {
121-
Value::Array(vec![json!({column_name: date_bin_records[0].log_count})])
124+
Value::Array(vec![json!({column_name: log_count})])
122125
};
123126

124127
let time = time.elapsed().as_secs_f64();
@@ -148,21 +151,21 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<HttpRespons
148151
Ok(response)
149152
}
150153

151-
pub async fn get_date_bin(
154+
pub async fn get_counts(
152155
req: HttpRequest,
153-
date_bin: Json<DateBinRequest>,
156+
counts_request: Json<CountsRequest>,
154157
) -> Result<impl Responder, QueryError> {
155158
let creds = extract_session_key_from_req(&req)?;
156159
let permissions = Users.get_permissions(&creds);
157160

158161
// does user have access to table?
159-
user_auth_for_query(&permissions, &[date_bin.stream.clone()])?;
162+
user_auth_for_query(&permissions, &[counts_request.stream.clone()])?;
160163

161-
let date_bin_records = date_bin.get_bin_density().await?;
164+
let records = counts_request.get_bin_density().await?;
162165

163-
Ok(web::Json(DateBinResponse {
164-
fields: vec!["date_bin_timestamp".into(), "log_count".into()],
165-
records: date_bin_records,
166+
Ok(web::Json(CountsResponse {
167+
fields: vec!["counts_timestamp".into(), "log_count".into()],
168+
records,
166169
}))
167170
}
168171

src/query/mod.rs

Lines changed: 36 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -244,33 +244,39 @@ impl Query {
244244
}
245245
}
246246

247-
/// DateBinRecord
247+
/// Record of counts for a given time bin.
248248
#[derive(Debug, Serialize, Clone)]
249-
pub struct DateBinRecord {
250-
pub date_bin_timestamp: String,
249+
pub struct CountsRecord {
250+
/// Start time of the bin
251+
pub counts_timestamp: String,
252+
/// Number of logs in the bin
251253
pub log_count: u64,
252254
}
253255

254-
struct DateBinBounds {
256+
struct TimeBounds {
255257
start: DateTime<Utc>,
256258
end: DateTime<Utc>,
257259
}
258260

259-
/// DateBin Request.
261+
/// Request for counts, received from API/SQL query.
260262
#[derive(Debug, Deserialize, Clone)]
261263
#[serde(rename_all = "camelCase")]
262-
pub struct DateBinRequest {
264+
pub struct CountsRequest {
265+
/// Name of the stream to get counts for
263266
pub stream: String,
267+
/// Included start time for counts query
264268
pub start_time: String,
269+
/// Excluded end time for counts query
265270
pub end_time: String,
271+
/// Number of bins to divide the time range into
266272
pub num_bins: u64,
267273
}
268274

269-
impl DateBinRequest {
275+
impl CountsRequest {
270276
/// This function is supposed to read maninfest files for the given stream,
271277
/// get the sum of `num_rows` between the `startTime` and `endTime`,
272278
/// divide that by number of bins and return in a manner acceptable for the console
273-
pub async fn get_bin_density(&self) -> Result<Vec<DateBinRecord>, QueryError> {
279+
pub async fn get_bin_density(&self) -> Result<Vec<CountsRecord>, QueryError> {
274280
let time_partition = STREAM_INFO
275281
.get_time_partition(&self.stream.clone())
276282
.map_err(|err| anyhow::Error::msg(err.to_string()))?
@@ -279,22 +285,22 @@ impl DateBinRequest {
279285
// get time range
280286
let time_range = TimeRange::parse_human_time(&self.start_time, &self.end_time)?;
281287
let all_manifest_files = get_manifest_list(&self.stream, &time_range).await?;
282-
// get final date bins
283-
let final_date_bins = self.get_bins(&time_range);
288+
// get bounds
289+
let counts = self.get_bounds(&time_range);
284290

285291
// we have start and end times for each bin
286292
// we also have all the manifest files for the given time range
287293
// now we iterate over start and end times for each bin
288294
// then we iterate over the manifest files which are within that time range
289295
// we sum up the num_rows
290-
let mut date_bin_records = Vec::new();
296+
let mut counts_records = Vec::new();
291297

292-
for bin in final_date_bins {
298+
for bin in counts {
293299
// extract start and end time to compare
294-
let date_bin_timestamp = bin.start.timestamp_millis();
300+
let counts_timestamp = bin.start.timestamp_millis();
295301

296302
// Sum up the number of rows that fall within the bin
297-
let total_num_rows: u64 = all_manifest_files
303+
let log_count: u64 = all_manifest_files
298304
.iter()
299305
.flat_map(|m| &m.files)
300306
.filter_map(|f| {
@@ -315,18 +321,18 @@ impl DateBinRequest {
315321
})
316322
.sum();
317323

318-
date_bin_records.push(DateBinRecord {
319-
date_bin_timestamp: DateTime::from_timestamp_millis(date_bin_timestamp)
324+
counts_records.push(CountsRecord {
325+
counts_timestamp: DateTime::from_timestamp_millis(counts_timestamp)
320326
.unwrap()
321327
.to_rfc3339(),
322-
log_count: total_num_rows,
328+
log_count,
323329
});
324330
}
325-
Ok(date_bin_records)
331+
Ok(counts_records)
326332
}
327333

328334
/// Calculate the end time for each bin based on the number of bins
329-
fn get_bins(&self, time_range: &TimeRange) -> Vec<DateBinBounds> {
335+
fn get_bounds(&self, time_range: &TimeRange) -> Vec<TimeBounds> {
330336
let total_minutes = time_range
331337
.end
332338
.signed_duration_since(time_range.start)
@@ -337,9 +343,9 @@ impl DateBinRequest {
337343
let remainder = total_minutes % self.num_bins;
338344
let have_remainder = remainder > 0;
339345

340-
// now create multiple bins [startTime, endTime)
346+
// now create multiple bounds [startTime, endTime)
341347
// Should we exclude the last one???
342-
let mut final_date_bins = vec![];
348+
let mut bounds = vec![];
343349

344350
let mut start = time_range.start;
345351

@@ -352,32 +358,34 @@ impl DateBinRequest {
352358
// Create bins for all but the last date
353359
for _ in 0..loop_end {
354360
let end = start + Duration::minutes(quotient as i64);
355-
final_date_bins.push(DateBinBounds { start, end });
361+
bounds.push(TimeBounds { start, end });
356362
start = end;
357363
}
358364

359365
// Add the last bin, accounting for any remainder, should we include it?
360366
if have_remainder {
361-
final_date_bins.push(DateBinBounds {
367+
bounds.push(TimeBounds {
362368
start,
363369
end: start + Duration::minutes(remainder as i64),
364370
});
365371
} else {
366-
final_date_bins.push(DateBinBounds {
372+
bounds.push(TimeBounds {
367373
start,
368374
end: start + Duration::minutes(quotient as i64),
369375
});
370376
}
371377

372-
final_date_bins
378+
bounds
373379
}
374380
}
375381

376-
/// DateBin Response.
382+
/// Response for the counts API
377383
#[derive(Debug, Serialize, Clone)]
378-
pub struct DateBinResponse {
384+
pub struct CountsResponse {
385+
/// Fields in the log stream
379386
pub fields: Vec<String>,
380-
pub records: Vec<DateBinRecord>,
387+
/// Records in the response
388+
pub records: Vec<CountsRecord>,
381389
}
382390

383391
#[derive(Debug, Default)]

0 commit comments

Comments
 (0)