
Commit a4ebbd5

chore(cubestore): Upgrade DF: post_process_columns aggregate index maintaining sort order

1 parent: 5144594

File tree

1 file changed: +9 −9 lines

  • rust/cubestore/cubestore/src/store

rust/cubestore/cubestore/src/store/mod.rs

Lines changed: 9 additions & 9 deletions
@@ -2,6 +2,7 @@ pub mod compaction;
 
 use async_trait::async_trait;
 use datafusion::arrow::compute::{concat_batches, lexsort_to_indices, SortColumn, SortOptions};
+use datafusion::physical_expr::PhysicalSortExpr;
 use datafusion::physical_plan::collect;
 use datafusion::physical_plan::common::collect as common_collect;
 use datafusion::physical_plan::empty::EmptyExec;
@@ -1306,33 +1307,30 @@ impl ChunkStore {
 
         let batch = RecordBatch::try_new(schema.clone(), data)?;
 
-        let input = Arc::new(MemoryExec::try_new(&[vec![batch]], schema.clone(), None)?);
+        let memory_exec = MemoryExec::try_new(&[vec![batch]], schema.clone(), None)?;
 
         let key_size = index.get_row().sort_key_size() as usize;
         let mut groups = Vec::with_capacity(key_size);
+        let mut lex_ordering = Vec::<PhysicalSortExpr>::with_capacity(key_size);
         for i in 0..key_size {
             let f = schema.field(i);
             let col: Arc<dyn PhysicalExpr> =
                 Arc::new(FusionColumn::new(f.name().as_str(), i));
-            groups.push((col, f.name().clone()));
+            groups.push((col.clone(), f.name().clone()));
+            lex_ordering.push(PhysicalSortExpr::new(col, SortOptions::default()));
         }
 
+        let input = Arc::new(memory_exec.with_sort_information(vec![lex_ordering]));
+
         let aggregates = table
             .get_row()
             .aggregate_columns()
             .iter()
             .map(|aggr_col| aggr_col.aggregate_expr(&schema))
             .collect::<Result<Vec<_>, _>>()?;
 
-        // TODO upgrade DF
-        // let output_sort_order = (0..index.get_row().sort_key_size())
-        //     .map(|x| x as usize)
-        //     .collect();
-
-        // TODO upgrade DF: this is probably correct, but find out if we now need to supply some filter_expr from some loose end.
         let filter_expr: Vec<Option<Arc<dyn PhysicalExpr>>> = vec![None; aggregates.len()];
 
-        // TODO merge sort
         let aggregate = Arc::new(AggregateExec::try_new(
             AggregateMode::Single,
             PhysicalGroupBy::new_single(groups),
@@ -1342,6 +1340,8 @@ impl ChunkStore {
             schema.clone(),
         )?);
 
+        assert!(aggregate.properties().output_ordering().is_some_and(|ordering| ordering.len() == key_size));
+
         let batches = collect(aggregate, Arc::new(TaskContext::default())).await?;
         if batches.is_empty() {
             Ok(vec![])