Skip to content

Commit 2ee5592

Browse files
committed
chore(cubestore): Upgrade DF: Implement PanicWorkerNode
1 parent ddec665 commit 2ee5592

File tree

3 files changed

+33
-12
lines changed

3 files changed

+33
-12
lines changed

rust/cubestore/cubestore/src/queryplanner/panic.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use datafusion::physical_plan::{
1010
DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, Partitioning, PlanProperties,
1111
SendableRecordBatchStream,
1212
};
13+
use serde::{Deserialize, Serialize};
1314
use std::any::Any;
1415
use std::cmp::Ordering;
1516
use std::fmt::{Formatter, Pointer};
@@ -25,6 +26,16 @@ impl PanicWorkerNode {
2526
node: Arc::new(self),
2627
})
2728
}
29+
30+
pub fn from_serialized(inputs: &[LogicalPlan], serialized: PanicWorkerSerialized) -> Self {
31+
assert_eq!(0, inputs.len());
32+
let PanicWorkerSerialized {} = serialized;
33+
Self {}
34+
}
35+
36+
pub fn to_serialized(&self) -> PanicWorkerSerialized {
37+
PanicWorkerSerialized {}
38+
}
2839
}
2940

3041
lazy_static! {
@@ -81,6 +92,9 @@ impl UserDefinedLogicalNode for PanicWorkerNode {
8192
}
8293
}
8394

95+
#[derive(Clone, Serialize, Deserialize, Debug)]
96+
pub struct PanicWorkerSerialized {}
97+
8498
#[derive(Debug)]
8599
pub struct PanicWorkerExec {
86100
properties: PlanProperties,

rust/cubestore/cubestore/src/queryplanner/planning.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,9 @@ use crate::metastore::table::{Table, TablePath};
3838
use crate::metastore::{
3939
AggregateFunction, Chunk, Column, IdRow, Index, IndexType, MetaStore, Partition, Schema,
4040
};
41+
use crate::queryplanner::metadata_cache::NoopParquetMetadataCache;
4142
use crate::queryplanner::optimizations::rewrite_plan::{rewrite_plan, PlanRewriter};
43+
use crate::queryplanner::panic::PanicWorkerSerialized;
4244
use crate::queryplanner::panic::{plan_panic_worker, PanicWorkerNode};
4345
use crate::queryplanner::partition_filter::PartitionFilter;
4446
use crate::queryplanner::providers::InfoSchemaQueryCacheTableProvider;
@@ -50,7 +52,6 @@ use crate::queryplanner::topk::ClusterAggregateTopK;
5052
use crate::queryplanner::{CubeTableLogical, InfoSchemaTableProvider};
5153
use crate::table::{cmp_same_types, Row};
5254
use crate::CubeError;
53-
use crate::queryplanner::metadata_cache::NoopParquetMetadataCache;
5455
use datafusion::common;
5556
use datafusion::common::DFSchemaRef;
5657
use datafusion::datasource::DefaultTableSource;
@@ -1366,6 +1367,7 @@ pub type Snapshots = Vec<Snapshot>;
13661367
#[derive(Clone, Serialize, Deserialize, Debug)]
13671368
pub enum ExtensionNodeSerialized {
13681369
ClusterSend(ClusterSendSerialized),
1370+
PanicWorker(PanicWorkerSerialized),
13691371
}
13701372

13711373
#[derive(Debug, Clone)]

rust/cubestore/cubestore/src/queryplanner/serialized_plan.rs

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,7 @@ use crate::queryplanner::query_executor::{CubeTable, InlineTableId, InlineTableP
99
use crate::queryplanner::topk::{ClusterAggregateTopK, SortColumn};
1010
use crate::queryplanner::udfs::aggregate_udf_by_kind;
1111
use crate::queryplanner::udfs::{
12-
aggregate_kind_by_name, scalar_udf_by_kind, CubeAggregateUDFKind,
13-
CubeScalarUDFKind,
12+
aggregate_kind_by_name, scalar_udf_by_kind, CubeAggregateUDFKind, CubeScalarUDFKind,
1413
};
1514
use crate::queryplanner::{CubeTableLogical, InfoSchemaTableProvider};
1615
use crate::table::Row;
@@ -1332,23 +1331,29 @@ impl LogicalExtensionCodec for CubeExtensionCodec {
13321331
let serialized = ExtensionNodeSerialized::deserialize(r)
13331332
.map_err(|e| DataFusionError::Execution(format!("try_decode: {}", e)))?;
13341333
Ok(Extension {
1335-
node: Arc::new(match serialized {
1334+
node: match serialized {
13361335
ExtensionNodeSerialized::ClusterSend(serialized) => {
1337-
ClusterSendNode::from_serialized(inputs, serialized)
1336+
Arc::new(ClusterSendNode::from_serialized(inputs, serialized))
13381337
}
1339-
}),
1338+
ExtensionNodeSerialized::PanicWorker(serialized) => {
1339+
Arc::new(PanicWorkerNode::from_serialized(inputs, serialized))
1340+
}
1341+
},
13401342
})
13411343
}
13421344

13431345
fn try_encode(&self, node: &Extension, buf: &mut Vec<u8>) -> datafusion::common::Result<()> {
13441346
use serde::Serialize;
13451347
let mut ser = flexbuffers::FlexbufferSerializer::new();
1346-
let to_serialize =
1347-
if let Some(cluster_send) = node.node.as_any().downcast_ref::<ClusterSendNode>() {
1348-
ExtensionNodeSerialized::ClusterSend(cluster_send.to_serialized())
1349-
} else {
1350-
todo!("{:?}", node)
1351-
};
1348+
let to_serialize = if let Some(cluster_send) =
1349+
node.node.as_any().downcast_ref::<ClusterSendNode>()
1350+
{
1351+
ExtensionNodeSerialized::ClusterSend(cluster_send.to_serialized())
1352+
} else if let Some(panic_worker) = node.node.as_any().downcast_ref::<PanicWorkerNode>() {
1353+
ExtensionNodeSerialized::PanicWorker(panic_worker.to_serialized())
1354+
} else {
1355+
todo!("{:?}", node)
1356+
};
13521357
to_serialize
13531358
.serialize(&mut ser)
13541359
.map_err(|e| DataFusionError::Execution(format!("try_encode: {}", e)))?;

0 commit comments

Comments (0)