Skip to content

Commit 6316cfd

Browse files
committed
chore(cubestore): Upgrade DF: fix partial aggregate not pushed under ClusterSend
1 parent 3f8b2df commit 6316cfd

File tree

File tree: 4 files changed — 13 additions (+13) and 28 deletions (-28).

rust/cubestore/cubestore-sql-tests/src/multiproc.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ where
3737
for inputs in worker_inputs {
3838
let (send_done, recv_done) = ipc_channel::ipc::bytes_channel().unwrap();
3939
let args = (send_init.clone(), recv_done, inputs, timeout);
40-
let handle = respawn(args, &[], &[]).unwrap();
40+
let handle = respawn(args, &["--".to_string(), "--nocapture".to_string()], &[]).unwrap();
4141
// Ensure we signal completion to all started workers even if errors occur along the way.
4242
join_workers.push(scopeguard::guard(
4343
(send_done, handle),

rust/cubestore/cubestore-sql-tests/tests/cluster.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use serde_derive::{Deserialize, Serialize};
66

77
use cubestore::config::Config;
88
use cubestore::util::respawn;
9+
use cubestore::util::respawn::register_pushdownable_envs;
910
use cubestore_sql_tests::multiproc::{
1011
multiproc_child_main, run_multiproc_test, MultiProcTest, SignalInit, WaitCompletion, WorkerProc,
1112
};
@@ -16,6 +17,7 @@ const WORKER_PORTS: [u16; 2] = [51337, 51338];
1617

1718
#[cfg(not(target_os = "windows"))]
1819
fn main() {
20+
register_pushdownable_envs(&["CUBESTORE_TEST_LOG_WORKER"]);
1921
respawn::register_handler(multiproc_child_main::<ClusterSqlTest>);
2022
respawn::init(); // TODO: logs in worker processes.
2123

@@ -99,7 +101,11 @@ impl WorkerProc<WorkerArgs> for WorkerFn {
99101
}
100102
Config::test(&test_name)
101103
.update_config(|mut c| {
102-
c.select_worker_pool_size = 2;
104+
c.select_worker_pool_size = if std::env::var("CUBESTORE_TEST_LOG_WORKER").is_ok() {
105+
0
106+
} else {
107+
2
108+
};
103109
c.server_name = format!("localhost:{}", WORKER_PORTS[id]);
104110
c.worker_bind_address = Some(c.server_name.clone());
105111
c.metastore_remote_address = Some(format!("localhost:{}", METASTORE_PORT));

rust/cubestore/cubestore/src/queryplanner/planning.rs

Lines changed: 2 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1627,30 +1627,15 @@ impl CubeExtensionPlanner {
16271627
}
16281628
// Note that MergeExecs are added automatically when needed.
16291629
if let Some(c) = self.cluster.as_ref() {
1630-
let mut send: Arc<dyn ExecutionPlan> = Arc::new(ClusterSendExec::new(
1630+
Ok(Arc::new(ClusterSendExec::new(
16311631
schema,
16321632
c.clone(),
16331633
self.serialized_plan.clone(),
16341634
snapshots,
16351635
input,
16361636
use_streaming,
1637-
)?);
1638-
// TODO upgrade DF
1639-
if send.properties().partitioning.partition_count() != 1 {
1640-
send = Arc::new(RepartitionExec::try_new(
1641-
send,
1642-
Partitioning::UnknownPartitioning(1),
1643-
)?);
1644-
}
1645-
Ok(send)
1637+
)?))
16461638
} else {
1647-
// TODO upgrade DF
1648-
if input.output_partitioning().partition_count() != 1 {
1649-
input = Arc::new(RepartitionExec::try_new(
1650-
input,
1651-
Partitioning::UnknownPartitioning(1),
1652-
)?);
1653-
}
16541639
Ok(Arc::new(WorkerExec {
16551640
input,
16561641
schema,

rust/cubestore/cubestore/src/queryplanner/query_executor.rs

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -748,12 +748,9 @@ impl CubeTable {
748748
}
749749

750750
let schema = table_projected_schema;
751-
let partition_num = partition_execs
752-
.iter()
753-
.map(|c| c.properties().partitioning.partition_count())
754-
.sum();
751+
let partition_num = partition_execs.len();
755752

756-
let read_data = Arc::new(CubeTableExec {
753+
let read_data: Arc<dyn ExecutionPlan> = Arc::new(CubeTableExec {
757754
schema: schema.clone(),
758755
partition_execs,
759756
index_snapshot: self.index_snapshot.clone(),
@@ -856,10 +853,7 @@ impl CubeTable {
856853
.collect::<Result<Vec<_>, _>>()?;
857854
Arc::new(SortPreservingMergeExec::new(join_columns, read_data))
858855
} else {
859-
Arc::new(RepartitionExec::try_new(
860-
read_data,
861-
Partitioning::UnknownPartitioning(1),
862-
)?)
856+
read_data
863857
};
864858

865859
Ok(plan)

Comments: 0 commit comments.