Skip to content

Commit 0b29eee

Browse files
committed
chore(cubestore): Upgrade DF: Fix compaction merge_chunks in unique_key_columns case
1 parent 4f1830d commit 0b29eee

File tree

2 files changed

+14
-15
lines changed

2 files changed

+14
-15
lines changed

rust/cubestore/cubestore/src/queryplanner/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ use udfs::{aggregate_udf_by_kind, registerable_aggregate_udfs, registerable_scal
1818
mod filter_by_key_range;
1919
mod flatten_union;
2020
pub mod info_schema;
21-
mod merge_sort;
21+
pub mod merge_sort;
2222
pub mod metadata_cache;
2323
pub mod providers;
2424
#[cfg(test)]

rust/cubestore/cubestore/src/store/compaction.rs

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use crate::metastore::{
99
deactivate_table_on_corrupt_data, table::Table, Chunk, IdRow, Index, IndexType, MetaStore,
1010
Partition, PartitionData,
1111
};
12+
use crate::queryplanner::merge_sort::LastRowByUniqueKeyExec;
1213
use crate::queryplanner::metadata_cache::MetadataCacheFactory;
1314
use crate::queryplanner::trace_data_loaded::{DataLoadedSize, TraceDataLoadedExec};
1415
use crate::remotefs::{ensure_temp_file_is_dropped, RemoteFs};
@@ -1406,20 +1407,18 @@ pub async fn merge_chunks(
14061407
schema,
14071408
)?);
14081409
} else if let Some(key_columns) = unique_key_columns {
1409-
todo!();
1410-
// TODO upgrade DF
1411-
// res = Arc::new(LastRowByUniqueKeyExec::try_new(
1412-
// res.clone(),
1413-
// key_columns
1414-
// .iter()
1415-
// .map(|c| {
1416-
// datafusion::physical_plan::expressions::Column::new_with_schema(
1417-
// c.get_name().as_str(),
1418-
// &res.schema(),
1419-
// )
1420-
// })
1421-
// .collect::<Result<Vec<_>, _>>()?,
1422-
// )?);
1410+
res = Arc::new(LastRowByUniqueKeyExec::try_new(
1411+
res.clone(),
1412+
key_columns
1413+
.iter()
1414+
.map(|c| {
1415+
datafusion::physical_plan::expressions::Column::new_with_schema(
1416+
c.get_name().as_str(),
1417+
&res.schema(),
1418+
)
1419+
})
1420+
.collect::<Result<Vec<_>, _>>()?,
1421+
)?);
14231422
}
14241423

14251424
Ok(res.execute(0, Arc::new(TaskContext::default()))?)

0 commit comments

Comments
 (0)