
Commit 51e48e2

chore(cubestore): Upgrade DF: Split SerializedPlan type into PreSerializedPlan
1 parent 9cbf9d0 commit 51e48e2
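
In short, the commit separates plan construction from plan serialization: the planner now produces a PreSerializedPlan that keeps the DataFusion LogicalPlan in memory, and the protobuf-encoded SerializedPlan is produced from it only when a plan has to cross the network to a worker. The sketch below illustrates that relationship with simplified, hypothetical stand-in types (not the actual cubestore definitions):

// A minimal sketch of the idea, assuming simplified stand-in types (these are
// NOT the real cubestore definitions): planning keeps an in-memory logical
// plan inside PreSerializedPlan, and the byte-level SerializedPlan is produced
// only when the plan has to cross a process boundary.

#[derive(Debug)]
struct LogicalPlan(String); // stand-in for DataFusion's LogicalPlan

#[derive(Debug)]
struct PreSerializedPlan {
    logical_plan: LogicalPlan, // stays usable by planner/optimizer, no decode needed
}

#[derive(Debug)]
struct SerializedPlan {
    logical_plan: Vec<u8>, // encoded bytes, ready to ship to a worker
}

impl PreSerializedPlan {
    fn logical_plan(&self) -> &LogicalPlan {
        &self.logical_plan
    }

    // Mirrors to_serialized_plan() from the diff: encode at the network boundary.
    fn to_serialized_plan(&self) -> SerializedPlan {
        SerializedPlan {
            logical_plan: self.logical_plan.0.as_bytes().to_vec(),
        }
    }
}

fn main() {
    let pre = PreSerializedPlan {
        logical_plan: LogicalPlan("SELECT 1".into()),
    };
    // Router-side planning can keep working on the logical plan directly...
    println!("planning against {:?}", pre.logical_plan());
    // ...and serializes only when dispatching to a worker.
    let wire = pre.to_serialized_plan();
    println!("sending {} bytes", wire.logical_plan.len());
}
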

File tree: 6 files changed, +175 −43 lines changed


rust/cubestore/cubestore/src/queryplanner/mod.rs

Lines changed: 3 additions & 2 deletions

@@ -13,6 +13,7 @@ pub mod serialized_plan;
 mod tail_limit;
 mod topk;
 pub mod trace_data_loaded;
+use serialized_plan::PreSerializedPlan;
 pub use topk::MIN_TOPK_STREAM_ROWS;
 use udfs::{aggregate_udf_by_kind, registerable_aggregate_udfs, registerable_scalar_udfs};
 mod filter_by_key_range;
@@ -122,7 +123,7 @@ crate::di_service!(QueryPlannerImpl, [QueryPlanner]);

 pub enum QueryPlan {
     Meta(LogicalPlan),
-    Select(SerializedPlan, /*workers*/ Vec<String>),
+    Select(PreSerializedPlan, /*workers*/ Vec<String>),
 }

 #[async_trait]
@@ -191,7 +192,7 @@ impl QueryPlanner for QueryPlannerImpl {
                &meta.multi_part_subtree,
            )?;
            QueryPlan::Select(
-               SerializedPlan::try_new(logical_plan, meta, trace_obj).await?,
+               PreSerializedPlan::try_new(logical_plan, meta, trace_obj)?,
                workers,
            )
        } else {

rust/cubestore/cubestore/src/queryplanner/optimizations/mod.rs

Lines changed: 5 additions & 3 deletions

@@ -30,17 +30,19 @@ use rewrite_plan::rewrite_physical_plan;
 use std::sync::Arc;
 use trace_data_loaded::add_trace_data_loaded_exec;

+use super::serialized_plan::PreSerializedPlan;
+
 pub struct CubeQueryPlanner {
     cluster: Option<Arc<dyn Cluster>>,
-    serialized_plan: Arc<SerializedPlan>,
+    serialized_plan: Arc<PreSerializedPlan>,
     memory_handler: Arc<dyn MemoryHandler>,
     data_loaded_size: Option<Arc<DataLoadedSize>>,
 }

 impl CubeQueryPlanner {
     pub fn new_on_router(
         cluster: Arc<dyn Cluster>,
-        serialized_plan: Arc<SerializedPlan>,
+        serialized_plan: Arc<PreSerializedPlan>,
         memory_handler: Arc<dyn MemoryHandler>,
     ) -> CubeQueryPlanner {
         CubeQueryPlanner {
@@ -52,7 +54,7 @@ impl CubeQueryPlanner {
     }

     pub fn new_on_worker(
-        serialized_plan: Arc<SerializedPlan>,
+        serialized_plan: Arc<PreSerializedPlan>,
         memory_handler: Arc<dyn MemoryHandler>,
         data_loaded_size: Option<Arc<DataLoadedSize>>,
     ) -> CubeQueryPlanner {

rust/cubestore/cubestore/src/queryplanner/planning.rs

Lines changed: 3 additions & 1 deletion

@@ -72,6 +72,8 @@ use std::cmp::Ordering;
 use std::hash::{Hash, Hasher};
 use std::iter::FromIterator;

+use super::serialized_plan::PreSerializedPlan;
+
 #[cfg(test)]
 pub async fn choose_index(
     p: LogicalPlan,
@@ -1585,7 +1587,7 @@ fn pull_up_cluster_send(mut p: LogicalPlan) -> Result<LogicalPlan, DataFusionErr

 pub struct CubeExtensionPlanner {
     pub cluster: Option<Arc<dyn Cluster>>,
-    pub serialized_plan: Arc<SerializedPlan>,
+    pub serialized_plan: Arc<PreSerializedPlan>,
 }

 #[async_trait]

rust/cubestore/cubestore/src/queryplanner/query_executor.rs

Lines changed: 19 additions & 18 deletions

@@ -93,6 +93,7 @@ use std::sync::Arc;
 use std::time::SystemTime;
 use tracing::{instrument, Instrument};

+use super::serialized_plan::PreSerializedPlan;
 use super::udfs::{
     aggregate_udf_by_kind, registerable_aggregate_udfs, registerable_arc_aggregate_udfs,
     registerable_arc_scalar_udfs, CubeAggregateUDFKind,
@@ -287,19 +288,19 @@ impl QueryExecutor for QueryExecutorImpl {
         plan: SerializedPlan,
         cluster: Arc<dyn Cluster>,
     ) -> Result<(Arc<dyn ExecutionPlan>, LogicalPlan), CubeError> {
-        let plan_to_move = plan.logical_plan(
+        let pre_serialized_plan = plan.to_pre_serialized(
             HashMap::new(),
             HashMap::new(),
             NoopParquetMetadataCache::new(),
         )?;
-        let serialized_plan = Arc::new(plan);
-        let ctx = self.router_context(cluster.clone(), serialized_plan.clone())?;
+        let pre_serialized_plan = Arc::new(pre_serialized_plan);
+        let ctx = self.router_context(cluster.clone(), pre_serialized_plan.clone())?;
         Ok((
             ctx.clone()
                 .state()
-                .create_physical_plan(&plan_to_move.clone())
+                .create_physical_plan(pre_serialized_plan.logical_plan())
                 .await?,
-            plan_to_move,
+            pre_serialized_plan.logical_plan().clone(),
         ))
     }

@@ -310,20 +311,20 @@ impl QueryExecutor for QueryExecutorImpl {
         chunk_id_to_record_batches: HashMap<u64, Vec<RecordBatch>>,
         data_loaded_size: Option<Arc<DataLoadedSize>>,
     ) -> Result<(Arc<dyn ExecutionPlan>, LogicalPlan), CubeError> {
-        let plan_to_move = plan.logical_plan(
+        let pre_serialized_plan = plan.to_pre_serialized(
             remote_to_local_names,
             chunk_id_to_record_batches,
             self.parquet_metadata_cache.cache().clone(),
         )?;
-        let plan = Arc::new(plan);
-        let ctx = self.worker_context(plan.clone(), data_loaded_size)?;
+        let pre_serialized_plan = Arc::new(pre_serialized_plan);
+        let ctx = self.worker_context(pre_serialized_plan.clone(), data_loaded_size)?;
         let plan_ctx = ctx.clone();
         Ok((
             plan_ctx
                 .state()
-                .create_physical_plan(&plan_to_move.clone())
+                .create_physical_plan(pre_serialized_plan.logical_plan())
                 .await?,
-            plan_to_move,
+            pre_serialized_plan.logical_plan().clone(),
         ))
     }

@@ -372,7 +373,7 @@ impl QueryExecutorImpl {
     fn router_context(
         &self,
         cluster: Arc<dyn Cluster>,
-        serialized_plan: Arc<SerializedPlan>,
+        serialized_plan: Arc<PreSerializedPlan>,
     ) -> Result<Arc<SessionContext>, CubeError> {
         let runtime = Arc::new(RuntimeEnv::default());
         let config = Self::session_config();
@@ -424,7 +425,7 @@ impl QueryExecutorImpl {

     fn worker_context(
         &self,
-        serialized_plan: Arc<SerializedPlan>,
+        serialized_plan: Arc<PreSerializedPlan>,
         data_loaded_size: Option<Arc<DataLoadedSize>>,
     ) -> Result<Arc<SessionContext>, CubeError> {
         let runtime = Arc::new(RuntimeEnv::default());
@@ -1229,7 +1230,7 @@ pub struct ClusterSendExec {
     /// Never executed, only stored to allow consistent optimization on router and worker.
     pub input_for_optimizations: Arc<dyn ExecutionPlan>,
     pub cluster: Arc<dyn Cluster>,
-    pub serialized_plan: Arc<SerializedPlan>,
+    pub serialized_plan: Arc<PreSerializedPlan>,
     pub use_streaming: bool,
 }

@@ -1248,7 +1249,7 @@ pub enum InlineCompoundPartition {
 impl ClusterSendExec {
     pub fn new(
         cluster: Arc<dyn Cluster>,
-        serialized_plan: Arc<SerializedPlan>,
+        serialized_plan: Arc<PreSerializedPlan>,
         union_snapshots: &[Snapshots],
         input_for_optimizations: Arc<dyn ExecutionPlan>,
         use_streaming: bool,
@@ -1503,7 +1504,7 @@ impl ClusterSendExec {
         }
     }

-    pub fn worker_plans(&self) -> Vec<(String, SerializedPlan)> {
+    pub fn worker_plans(&self) -> Vec<(String, PreSerializedPlan)> {
         let mut res = Vec::new();
         for (node_name, partitions) in self.partitions.iter() {
             res.push((
@@ -1517,7 +1518,7 @@ impl ClusterSendExec {
     fn serialized_plan_for_partitions(
         &self,
         partitions: &(Vec<(u64, RowRange)>, Vec<InlineTableId>),
-    ) -> SerializedPlan {
+    ) -> PreSerializedPlan {
         let (partitions, inline_table_ids) = partitions;
         let mut ps = HashMap::<_, RowFilter>::new();
         for (id, range) in partitions {
@@ -1583,13 +1584,13 @@ impl ExecutionPlan for ClusterSendExec {
        let node_name = node_name.to_string();
        if self.use_streaming {
            // A future that yields a stream
-           let fut = async move { cluster.run_select_stream(&node_name, plan).await };
+           let fut = async move { cluster.run_select_stream(&node_name, plan.to_serialized_plan()?).await };
            // Use TryStreamExt::try_flatten to flatten the stream of streams
            let stream = futures::stream::once(fut).try_flatten();

            Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
        } else {
-           let record_batches = async move { cluster.run_select(&node_name, plan).await };
+           let record_batches = async move { cluster.run_select(&node_name, plan.to_serialized_plan()?).await };
            let stream = futures::stream::once(record_batches).flat_map(|r| match r {
                Ok(vec) => stream::iter(vec.into_iter().map(|b| Ok(b)).collect::<Vec<_>>()),
                Err(e) => stream::iter(vec![Err(DataFusionError::Execution(e.to_string()))]),
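
The executor changes follow one pattern on both router and worker: the incoming SerializedPlan is decoded once via to_pre_serialized(), the resulting PreSerializedPlan is shared through an Arc with the session context, physical planning borrows the logical plan instead of cloning a freshly decoded copy, and ClusterSendExec re-encodes with to_serialized_plan() only at the point where a plan is actually shipped to a node. A rough, self-contained sketch of that flow, using hypothetical stand-in types rather than the real cubestore/DataFusion APIs:

// Simplified sketch of the executor flow after this change (hypothetical types
// standing in for SerializedPlan / PreSerializedPlan / the DataFusion context).
// Decode once on arrival, borrow the logical plan for planning, and re-encode
// only when sending to a node.

use std::sync::Arc;

struct SerializedPlan {
    logical_plan: Vec<u8>, // encoded plan as received from the network
}

struct PreSerializedPlan {
    logical_plan: String, // stand-in for the in-memory LogicalPlan
}

impl SerializedPlan {
    // Mirrors to_pre_serialized(): decode the stored bytes back into a plan.
    fn to_pre_serialized(&self) -> PreSerializedPlan {
        PreSerializedPlan {
            logical_plan: String::from_utf8(self.logical_plan.clone()).unwrap(),
        }
    }
}

impl PreSerializedPlan {
    fn logical_plan(&self) -> &str {
        &self.logical_plan
    }
    // Mirrors to_serialized_plan(): encode again for the wire.
    fn to_serialized_plan(&self) -> SerializedPlan {
        SerializedPlan {
            logical_plan: self.logical_plan.as_bytes().to_vec(),
        }
    }
}

// Stand-in for SessionState::create_physical_plan.
fn create_physical_plan(logical: &str) -> String {
    format!("physical({logical})")
}

fn main() {
    let incoming = SerializedPlan { logical_plan: b"SELECT 1".to_vec() };

    // Decode once, then share via Arc with the session context and the planner.
    let pre = Arc::new(incoming.to_pre_serialized());
    let physical = create_physical_plan(pre.logical_plan());
    println!("{physical}");

    // ClusterSendExec: serialize per target node, at send time only.
    for node in ["worker-1", "worker-2"] {
        let wire = pre.to_serialized_plan();
        println!("send {} bytes to {node}", wire.logical_plan.len());
    }
}
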

rust/cubestore/cubestore/src/queryplanner/serialized_plan.rs

Lines changed: 136 additions & 7 deletions

@@ -79,6 +79,16 @@ impl RowFilter {
     }
 }

+/// SerializedPlan, but before we actually serialize the LogicalPlan.
+#[derive(Debug)]
+pub struct PreSerializedPlan {
+    logical_plan: LogicalPlan,
+    schema_snapshot: Arc<SchemaSnapshot>,
+    partition_ids_to_execute: Vec<(u64, RowFilter)>,
+    inline_table_ids_to_execute: Vec<InlineTableId>,
+    trace_obj: Option<String>,
+}
+
 #[derive(Clone, Serialize, Deserialize, Debug)]
 pub struct SerializedPlan {
     logical_plan: Arc<Vec<u8>>,
@@ -1052,21 +1062,31 @@ pub enum SerializedTableSource {
     InlineTable(InlineTableProvider),
 }

-impl SerializedPlan {
-    pub async fn try_new(
-        plan: LogicalPlan,
-        index_snapshots: PlanningMeta,
-        trace_obj: Option<String>,
-    ) -> Result<Self, CubeError> {
+impl PreSerializedPlan {
+    pub fn to_serialized_plan(&self) -> Result<SerializedPlan, CubeError> {
         let serialized_logical_plan =
             datafusion_proto::bytes::logical_plan_to_bytes_with_extension_codec(
-                &plan,
+                &self.logical_plan,
                 &CubeExtensionCodec {
                     worker_context: None,
                 },
             )?;
         Ok(SerializedPlan {
             logical_plan: Arc::new(serialized_logical_plan.to_vec()),
+            schema_snapshot: self.schema_snapshot.clone(),
+            partition_ids_to_execute: self.partition_ids_to_execute.clone(),
+            inline_table_ids_to_execute: self.inline_table_ids_to_execute.clone(),
+            trace_obj: self.trace_obj.clone(),
+        })
+    }
+
+    pub fn try_new(
+        plan: LogicalPlan,
+        index_snapshots: PlanningMeta,
+        trace_obj: Option<String>,
+    ) -> Result<Self, CubeError> {
+        Ok(PreSerializedPlan {
+            logical_plan: plan,
             schema_snapshot: Arc::new(SchemaSnapshot { index_snapshots }),
             partition_ids_to_execute: Vec::new(),
             inline_table_ids_to_execute: Vec::new(),
@@ -1093,6 +1113,115 @@ impl SerializedPlan {
         }
     }

+    // TODO upgrade DF: A literal copy/paste from SerializedPlan -- delete one
+    /// Note: avoid during normal execution, workers must filter the partitions they execute.
+    pub fn all_required_files(&self) -> Vec<(IdRow<Partition>, String, Option<u64>, Option<u64>)> {
+        self.list_files_to_download(|_| true)
+    }
+
+    // TODO upgrade DF: A literal copy/paste from SerializedPlan -- delete one
+    fn list_files_to_download(
+        &self,
+        include_partition: impl Fn(u64) -> bool,
+    ) -> Vec<(
+        IdRow<Partition>,
+        /* file_name */ String,
+        /* size */ Option<u64>,
+        /* chunk_id */ Option<u64>,
+    )> {
+        let indexes = self.index_snapshots();
+
+        let mut files = Vec::new();
+
+        for index in indexes.iter() {
+            for partition in index.partitions() {
+                if !include_partition(partition.partition.get_id()) {
+                    continue;
+                }
+                if let Some(file) = partition
+                    .partition
+                    .get_row()
+                    .get_full_name(partition.partition.get_id())
+                {
+                    files.push((
+                        partition.partition.clone(),
+                        file,
+                        partition.partition.get_row().file_size(),
+                        None,
+                    ));
+                }
+
+                for chunk in partition.chunks() {
+                    if !chunk.get_row().in_memory() {
+                        files.push((
+                            partition.partition.clone(),
+                            chunk.get_row().get_full_name(chunk.get_id()),
+                            chunk.get_row().file_size(),
+                            Some(chunk.get_id()),
+                        ))
+                    }
+                }
+            }
+        }
+
+        files
+    }
+
+    pub fn index_snapshots(&self) -> &Vec<IndexSnapshot> {
+        &self.schema_snapshot.index_snapshots.indices
+    }
+
+    pub fn planning_meta(&self) -> &PlanningMeta {
+        &self.schema_snapshot.index_snapshots
+    }
+
+    pub fn logical_plan(&self) -> &LogicalPlan {
+        &self.logical_plan
+    }
+}
+
+impl SerializedPlan {
+    pub async fn try_new(
+        plan: LogicalPlan,
+        index_snapshots: PlanningMeta,
+        trace_obj: Option<String>,
+    ) -> Result<Self, CubeError> {
+        let serialized_logical_plan =
+            datafusion_proto::bytes::logical_plan_to_bytes_with_extension_codec(
+                &plan,
+                &CubeExtensionCodec {
+                    worker_context: None,
+                },
+            )?;
+        Ok(SerializedPlan {
+            logical_plan: Arc::new(serialized_logical_plan.to_vec()),
+            schema_snapshot: Arc::new(SchemaSnapshot { index_snapshots }),
+            partition_ids_to_execute: Vec::new(),
+            inline_table_ids_to_execute: Vec::new(),
+            trace_obj,
+        })
+    }
+
+    pub fn to_pre_serialized(
+        &self,
+        remote_to_local_names: HashMap<String, String>,
+        chunk_id_to_record_batches: HashMap<u64, Vec<RecordBatch>>,
+        parquet_metadata_cache: Arc<dyn ParquetFileReaderFactory>,
+    ) -> Result<PreSerializedPlan, CubeError> {
+        let plan = self.logical_plan(
+            remote_to_local_names,
+            chunk_id_to_record_batches,
+            parquet_metadata_cache,
+        )?;
+        Ok(PreSerializedPlan {
+            logical_plan: plan,
+            schema_snapshot: self.schema_snapshot.clone(),
+            partition_ids_to_execute: self.partition_ids_to_execute.clone(),
+            inline_table_ids_to_execute: self.inline_table_ids_to_execute.clone(),
+            trace_obj: self.trace_obj.clone(),
+        })
+    }
+
     pub fn logical_plan(
         &self,
         remote_to_local_names: HashMap<String, String>,
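
The two conversion paths added here (PreSerializedPlan::to_serialized_plan() and SerializedPlan::to_pre_serialized()) are meant to be inverses as far as the carried metadata is concerned: schema_snapshot, partition_ids_to_execute, inline_table_ids_to_execute and trace_obj are cloned across unchanged, and only the logical plan switches between its in-memory and encoded forms. A toy round-trip check under that assumption, again with simplified stand-in types rather than the real ones:

// Toy round-trip check for the conversion pair, assuming simplified stand-in
// types (the real types also carry schema snapshots and partition filters).

#[derive(Clone, Debug, PartialEq)]
struct PreSerializedPlan {
    logical_plan: String,      // stand-in for LogicalPlan
    trace_obj: Option<String>, // metadata carried across unchanged
}

#[derive(Clone, Debug)]
struct SerializedPlan {
    logical_plan: Vec<u8>, // encoded form of the plan
    trace_obj: Option<String>,
}

impl PreSerializedPlan {
    fn to_serialized_plan(&self) -> SerializedPlan {
        SerializedPlan {
            logical_plan: self.logical_plan.as_bytes().to_vec(),
            trace_obj: self.trace_obj.clone(),
        }
    }
}

impl SerializedPlan {
    fn to_pre_serialized(&self) -> PreSerializedPlan {
        PreSerializedPlan {
            logical_plan: String::from_utf8(self.logical_plan.clone()).unwrap(),
            trace_obj: self.trace_obj.clone(),
        }
    }
}

fn main() {
    let original = PreSerializedPlan {
        logical_plan: "SELECT 1".to_string(),
        trace_obj: Some("trace-123".to_string()), // hypothetical trace id
    };
    // Encode for the wire, then decode on the receiving side.
    let round_tripped = original.to_serialized_plan().to_pre_serialized();
    assert_eq!(original, round_tripped);
    println!("round trip preserved the plan and its metadata");
}
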
