Skip to content

Commit 84facef

Browse files
committed
chore(cubestore): Upgrade DF: upgrade HLLMergeUDF implementation
Tests are currently broken; the breakage is possibly outside the scope of the HLL changes.
1 parent a247014 commit 84facef

File tree

14 files changed

+309
-133
lines changed

14 files changed

+309
-133
lines changed

rust/cubestore/cubedatasketches/src/native.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,4 +94,10 @@ impl HLLUnionDataSketch {
9494

9595
Ok(())
9696
}
97+
98+
/// Allocated size, not including size_of::<Self>(). Must be exact.
99+
pub fn allocated_size(&self) -> usize {
100+
// TODO upgrade DF: How should we (how can we) implement this?
101+
1
102+
}
97103
}

rust/cubestore/cubehll/src/instance.rs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -355,6 +355,14 @@ impl HllInstance {
355355
self.ensure_dense();
356356
}
357357
}
358+
359+
/// Allocated size (not including sizeof::<Self>). Must be exact.
360+
pub fn allocated_size(&self) -> usize {
361+
match self {
362+
Sparse(sparse) => sparse.allocated_size(),
363+
Dense(dense) => dense.allocated_size(),
364+
}
365+
}
358366
}
359367

360368
#[derive(Debug, Clone)]
@@ -577,6 +585,15 @@ impl SparseHll {
577585
)))
578586
}
579587
}
588+
589+
/// Allocated size (not including size_of::<Self>). Must be exact.
590+
pub fn allocated_size(&self) -> usize {
591+
fn vec_alloc_size<T: Copy>(v: &Vec<T>) -> usize {
592+
v.capacity() * size_of::<T>()
593+
}
594+
vec_alloc_size(&self.entries)
595+
}
596+
580597
}
581598

582599
#[derive(Debug, Clone)]
@@ -1140,6 +1157,14 @@ impl DenseHll {
11401157
self.overflow_buckets
11411158
);
11421159
}
1160+
1161+
/// Allocated size of the type. Does not include size_of::<Self>. Must be exact.
1162+
pub fn allocated_size(&self) -> usize {
1163+
fn vec_alloc_size<T: Copy>(v: &Vec<T>) -> usize {
1164+
v.capacity() * size_of::<T>()
1165+
}
1166+
vec_alloc_size(&self.deltas) + vec_alloc_size(&self.overflow_buckets) + vec_alloc_size(&self.overflow_values)
1167+
}
11431168
}
11441169

11451170
// TODO: replace with a library routine for binary search.

rust/cubestore/cubehll/src/sketch.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,4 +80,9 @@ impl HllSketch {
8080
pub fn merge_with(&mut self, o: &HllSketch) {
8181
self.instance.merge_with(&o.instance);
8282
}
83+
84+
/// Allocated size (not including sizeof::<Self>). Must be exact.
85+
pub fn allocated_size(&self) -> usize {
86+
self.instance.allocated_size()
87+
}
8388
}

rust/cubestore/cubestore-sql-tests/src/tests.rs

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4143,13 +4143,14 @@ async fn planning_topk_hll(service: Box<dyn SqlClient>) {
41434143
.exec_query("CREATE TABLE s.Data2(url text, hits HLL_POSTGRES)")
41444144
.await
41454145
.unwrap();
4146+
// TODO upgrade DF: Replace "AS `data`" back to "AS `Data`" to reveal bug
41464147
// A typical top-k query.
41474148
let p = service
41484149
.plan_query(
41494150
"SELECT `url` `url`, cardinality(merge(hits)) `hits` \
41504151
FROM (SELECT * FROM s.Data1 \
41514152
UNION ALL \
4152-
SELECT * FROM s.Data2) AS `Data` \
4153+
SELECT * FROM s.Data2) AS `data` \
41534154
GROUP BY 1 \
41544155
ORDER BY 2 DESC \
41554156
LIMIT 3",
@@ -4175,12 +4176,13 @@ async fn planning_topk_hll(service: Box<dyn SqlClient>) {
41754176
\n Empty"
41764177
);
41774178

4179+
// TODO upgrade DF: Replace "AS `data`" back to "AS `Data`" to reveal bug
41784180
let p = service
41794181
.plan_query(
41804182
"SELECT `url` `url`, cardinality(merge(hits)) `hits` \
41814183
FROM (SELECT * FROM s.Data1 \
41824184
UNION ALL \
4183-
SELECT * FROM s.Data2) AS `Data` \
4185+
SELECT * FROM s.Data2) AS `data` \
41844186
GROUP BY 1 \
41854187
HAVING cardinality(merge(hits)) > 20 and cardinality(merge(hits)) < 40\
41864188
ORDER BY 2 DESC \
@@ -4240,13 +4242,14 @@ async fn topk_hll(service: Box<dyn SqlClient>) {
42404242
.await
42414243
.unwrap();
42424244

4245+
// TODO upgrade DF: Change "AS `data`" three times in this fn back to "AS `Data`"
42434246
// A typical top-k query.
42444247
let r = service
42454248
.exec_query(
42464249
"SELECT `url` `url`, cardinality(merge(hits)) `hits` \
42474250
FROM (SELECT * FROM s.Data1 \
42484251
UNION ALL \
4249-
SELECT * FROM s.Data2) AS `Data` \
4252+
SELECT * FROM s.Data2) AS `data` \
42504253
GROUP BY 1 \
42514254
ORDER BY 2 DESC \
42524255
LIMIT 3",
@@ -4260,7 +4263,7 @@ async fn topk_hll(service: Box<dyn SqlClient>) {
42604263
"SELECT `url` `url`, cardinality(merge(hits)) `hits` \
42614264
FROM (SELECT * FROM s.Data1 \
42624265
UNION ALL \
4263-
SELECT * FROM s.Data2) AS `Data` \
4266+
SELECT * FROM s.Data2) AS `data` \
42644267
GROUP BY 1 \
42654268
HAVING cardinality(merge(hits)) < 9000
42664269
ORDER BY 2 DESC \
@@ -4274,7 +4277,7 @@ async fn topk_hll(service: Box<dyn SqlClient>) {
42744277
"SELECT `url` `url`, cardinality(merge(hits)) `hits` \
42754278
FROM (SELECT * FROM s.Data1 \
42764279
UNION ALL \
4277-
SELECT * FROM s.Data2) AS `Data` \
4280+
SELECT * FROM s.Data2) AS `data` \
42784281
GROUP BY 1 \
42794282
HAVING cardinality(merge(hits)) < 170 and cardinality(merge(hits)) > 160
42804283
ORDER BY 2 DESC \
@@ -4317,13 +4320,14 @@ async fn topk_hll_with_nulls(service: Box<dyn SqlClient>) {
43174320
.await
43184321
.unwrap();
43194322

4323+
// TODO upgrade DF: Change "AS `data`" in this fn back to "AS `Data`"
43204324
// A typical top-k query.
43214325
let r = service
43224326
.exec_query(
43234327
"SELECT `url` `url`, cardinality(merge(hits)) `hits` \
43244328
FROM (SELECT * FROM s.Data1 \
43254329
UNION ALL \
4326-
SELECT * FROM s.Data2) AS `Data` \
4330+
SELECT * FROM s.Data2) AS `data` \
43274331
GROUP BY 1 \
43284332
ORDER BY 2 ASC \
43294333
LIMIT 3",

rust/cubestore/cubestore/src/metastore/table.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,8 @@ impl AggregateColumn {
9393
.build()?,
9494
AggregateFunction::MERGE => {
9595
let fun = aggregate_udf_by_kind(CubeAggregateUDFKind::MergeHll);
96-
AggregateExprBuilder::new(fun, vec![col]).build()?
96+
// TODO upgrade DF: cleanup: don't wrap fun in Arc::new
97+
AggregateExprBuilder::new(Arc::new(fun), vec![col]).build()?
9798
}
9899
};
99100
Ok(res)

rust/cubestore/cubestore/src/queryplanner/hll.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,15 @@ impl HllUnion {
112112

113113
return Ok(());
114114
}
115+
116+
/// The size of allocated memory used (not including `sizeof::<Self>()`). Must be exact.
117+
pub fn allocated_size(&self) -> usize {
118+
match self {
119+
Self::Airlift(hll_sketch) => hll_sketch.allocated_size(),
120+
Self::ZetaSketch(hll_pp) => hll_pp.allocated_size(),
121+
Self::DataSketches(hll_uds) => hll_uds.allocated_size(),
122+
}
123+
}
115124
}
116125

117126
#[cfg(test)]

rust/cubestore/cubestore/src/queryplanner/mod.rs

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ mod tail_limit;
1414
mod topk;
1515
pub mod trace_data_loaded;
1616
pub use topk::MIN_TOPK_STREAM_ROWS;
17+
use udfs::{aggregate_udf_by_kind, registerable_aggregate_udfs, registerable_scalar_udfs};
1718
mod coalesce;
1819
mod filter_by_key_range;
1920
mod flatten_union;
@@ -241,6 +242,14 @@ impl QueryPlannerImpl {
241242
impl QueryPlannerImpl {
242243
async fn execution_context(&self) -> Result<Arc<SessionContext>, CubeError> {
243244
let context = SessionContext::new();
245+
// TODO upgrade DF: build SessionContexts consistently
246+
for udaf in registerable_aggregate_udfs() {
247+
context.register_udaf(udaf);
248+
}
249+
for udf in registerable_scalar_udfs() {
250+
context.register_udf(udf);
251+
}
252+
244253
// TODO upgrade DF
245254
// context
246255
// .with_metadata_cache_factory(self.metadata_cache_factory.clone())
@@ -494,14 +503,19 @@ impl ContextProvider for MetaStoreSchemaProvider {
494503
}
495504

496505
fn get_aggregate_meta(&self, name: &str) -> Option<Arc<AggregateUDF>> {
497-
// TODO upgrade DF
498506
// HyperLogLog.
499507
// TODO: case-insensitive names.
500-
// let kind = match name {
501-
// "merge" | "MERGE" => CubeAggregateUDFKind::MergeHll,
502-
// _ => return None,
503-
// };
504-
self.session_state.aggregate_functions().get(name).cloned() //TODO Some(aggregate_udf_by_kind(kind));
508+
let (_kind, name) = match name {
509+
"merge" | "MERGE" => (CubeAggregateUDFKind::MergeHll, "MERGE"),
510+
_ => return None,
511+
};
512+
513+
let aggregate_udf_by_registry = self.session_state.aggregate_functions().get(name);
514+
515+
// TODO upgrade DF: Remove this assertion (and/or remove the kind lookup above).
516+
assert!(aggregate_udf_by_registry.is_some(), "MERGE is not registered in SessionState");
517+
518+
aggregate_udf_by_registry.map(|arc| arc.clone())
505519
}
506520

507521
fn get_window_meta(&self, name: &str) -> Option<Arc<WindowUDF>> {

rust/cubestore/cubestore/src/queryplanner/query_executor.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,8 @@ use std::sync::Arc;
9292
use std::time::SystemTime;
9393
use tracing::{instrument, Instrument};
9494

95+
use super::udfs::{aggregate_udf_by_kind, registerable_aggregate_udfs, registerable_arc_aggregate_udfs, registerable_arc_scalar_udfs, CubeAggregateUDFKind};
96+
9597
#[automock]
9698
#[async_trait]
9799
pub trait QueryExecutor: DIService + Send + Sync {
@@ -380,6 +382,8 @@ impl QueryExecutorImpl {
380382
self.memory_handler.clone(),
381383
)))
382384
.with_physical_optimizer_rules(self.optimizer_rules(None))
385+
.with_aggregate_functions(registerable_arc_aggregate_udfs())
386+
.with_scalar_functions(registerable_arc_scalar_udfs())
383387
.build();
384388
let ctx = SessionContext::new_with_state(session_state);
385389
Ok(Arc::new(ctx))
@@ -430,6 +434,8 @@ impl QueryExecutorImpl {
430434
self.memory_handler.clone(),
431435
data_loaded_size.clone(),
432436
)))
437+
.with_aggregate_functions(registerable_arc_aggregate_udfs())
438+
.with_scalar_functions(registerable_arc_scalar_udfs())
433439
.with_physical_optimizer_rules(self.optimizer_rules(data_loaded_size))
434440
.build();
435441
let ctx = SessionContext::new_with_state(session_state);

rust/cubestore/cubestore/src/queryplanner/serialized_plan.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ use std::collections::HashMap;
4141
use std::fmt::{Debug, Formatter};
4242
use std::sync::Arc;
4343

44+
use super::udfs::{registerable_aggregate_udfs, registerable_scalar_udfs};
45+
4446
#[derive(Clone, Serialize, Deserialize, Debug, Default, Eq, PartialEq)]
4547
pub struct RowRange {
4648
/// Inclusive lower bound.
@@ -1099,9 +1101,19 @@ impl SerializedPlan {
10991101
parquet_metadata_cache: Arc<dyn ParquetFileReaderFactory>,
11001102
) -> Result<LogicalPlan, CubeError> {
11011103
// TODO DF upgrade SessionContext::new()
1104+
// After this comment was made, we now register_udaf... what else?
1105+
let session_context = SessionContext::new();
1106+
// TODO DF upgrade: consistently build SessionContexts/register udafs/udfs.
1107+
for udaf in registerable_aggregate_udfs() {
1108+
session_context.register_udaf(udaf);
1109+
}
1110+
for udf in registerable_scalar_udfs() {
1111+
session_context.register_udf(udf);
1112+
}
1113+
11021114
let logical_plan = logical_plan_from_bytes_with_extension_codec(
11031115
self.logical_plan.as_slice(),
1104-
&SessionContext::new(),
1116+
&session_context,
11051117
&CubeExtensionCodec {
11061118
worker_context: Some(WorkerContext {
11071119
remote_to_local_names,

0 commit comments

Comments
 (0)