Skip to content

Commit 82651ea

Browse files
authored
chore(cubestore): Memory usage tracing (#6663)
1 parent 1a494a2 commit 82651ea

File tree

11 files changed

+361
-15
lines changed

11 files changed

+361
-15
lines changed

rust/cubestore/cubestore/src/cluster/worker_pool.rs

Lines changed: 53 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use std::time::Duration;
88
use async_trait::async_trait;
99
use deadqueue::unlimited;
1010
use futures::future::join_all;
11+
use futures::future::BoxFuture;
1112
use ipc_channel::ipc;
1213
use ipc_channel::ipc::{IpcReceiver, IpcSender};
1314
use log::error;
@@ -334,10 +335,11 @@ where
334335
let runtime = tokio_builder.build().unwrap();
335336
worker_setup(&runtime);
336337
runtime.block_on(async move {
337-
let config = Config::default();
338-
config.configure_injector().await;
338+
let config = get_worker_config().await;
339339
let services = config.worker_services().await;
340340

341+
spawn_background_processes(config.clone());
342+
341343
loop {
342344
let res = rx.recv();
343345
match res {
@@ -369,15 +371,61 @@ fn worker_setup(runtime: &Runtime) {
369371
}
370372
}
371373

374+
async fn get_worker_config() -> Config {
375+
let custom_fn = SELECT_WORKER_CONFIGURE_FN.read().unwrap();
376+
if let Some(func) = custom_fn.as_ref() {
377+
func().await
378+
} else {
379+
let config = Config::default();
380+
config.configure_injector().await;
381+
config
382+
}
383+
}
384+
385+
/// Run the background-process hook registered via
/// `register_select_worker_spawn_background_fn`, if one was installed.
/// No-op otherwise.
fn spawn_background_processes(config: Config) {
    if let Some(spawn_fn) = SELECT_WORKER_SPAWN_BACKGROUND_FN.read().unwrap().as_ref() {
        spawn_fn(config);
    }
}
391+
372392
lazy_static! {
373393
static ref SELECT_WORKER_SETUP: std::sync::RwLock<Option<Box<dyn Fn(&Runtime) + Send + Sync>>> =
374394
std::sync::RwLock::new(None);
375395
}
376396

397+
lazy_static! {
398+
static ref SELECT_WORKER_CONFIGURE_FN: std::sync::RwLock<Option<Box<dyn Fn() -> BoxFuture<'static, Config> + Send + Sync>>> =
399+
std::sync::RwLock::new(None);
400+
}
401+
402+
lazy_static! {
403+
static ref SELECT_WORKER_SPAWN_BACKGROUND_FN: std::sync::RwLock<Option<Box<dyn Fn(Config) + Send + Sync>>> =
404+
std::sync::RwLock::new(None);
405+
}
406+
377407
pub fn register_select_worker_setup(f: fn(&Runtime)) {
378-
let mut startup = SELECT_WORKER_SETUP.write().unwrap();
379-
assert!(startup.is_none(), "select worker setup already registered");
380-
*startup = Some(Box::new(f));
408+
let mut setup = SELECT_WORKER_SETUP.write().unwrap();
409+
assert!(setup.is_none(), "select worker setup already registered");
410+
*setup = Some(Box::new(f));
411+
}
412+
413+
pub fn register_select_worker_configure_fn(f: fn() -> BoxFuture<'static, Config>) {
414+
let mut func = SELECT_WORKER_CONFIGURE_FN.write().unwrap();
415+
assert!(
416+
func.is_none(),
417+
"select worker configure function already registered"
418+
);
419+
*func = Some(Box::new(f));
420+
}
421+
422+
pub fn register_select_worker_spawn_background_fn(f: fn(Config)) {
423+
let mut func = SELECT_WORKER_SPAWN_BACKGROUND_FN.write().unwrap();
424+
assert!(
425+
func.is_none(),
426+
"select worker spawn background function already registered"
427+
);
428+
*func = Some(Box::new(f));
381429
}
382430

383431
#[cfg(test)]

rust/cubestore/cubestore/src/config/mod.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ use crate::telemetry::tracing::{TracingHelper, TracingHelperImpl};
3737
use crate::telemetry::{
3838
start_agent_event_loop, start_track_event_loop, stop_agent_event_loop, stop_track_event_loop,
3939
};
40+
use crate::util::memory::{MemoryHandler, MemoryHandlerImpl};
4041
use crate::CubeError;
4142
use datafusion::cube_ext;
4243
use datafusion::physical_plan::parquet::{LruParquetMetadataCache, NoopParquetMetadataCache};
@@ -1761,7 +1762,7 @@ impl Config {
17611762

17621763
self.injector
17631764
.register_typed_with_default::<dyn QueryExecutor, _, _, _>(async move |i| {
1764-
QueryExecutorImpl::new(i.get_service_typed().await)
1765+
QueryExecutorImpl::new(i.get_service_typed().await, i.get_service_typed().await)
17651766
})
17661767
.await;
17671768

@@ -1781,6 +1782,10 @@ impl Config {
17811782
})
17821783
.await;
17831784

1785+
self.injector
1786+
.register_typed::<dyn MemoryHandler, _, _, _>(async move |_| MemoryHandlerImpl::new())
1787+
.await;
1788+
17841789
let query_cache_to_move = query_cache.clone();
17851790
self.injector
17861791
.register_typed::<dyn SqlService, _, _, _>(async move |i| {
Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
use crate::util::memory::MemoryHandler;
2+
use arrow::datatypes::SchemaRef;
3+
use arrow::error::Result as ArrowResult;
4+
use arrow::record_batch::RecordBatch;
5+
use async_trait::async_trait;
6+
use datafusion::error::DataFusionError;
7+
use datafusion::physical_plan::{
8+
ExecutionPlan, OptimizerHints, Partitioning, RecordBatchStream, SendableRecordBatchStream,
9+
};
10+
use flatbuffers::bitflags::_core::any::Any;
11+
use futures::stream::Stream;
12+
use futures::StreamExt;
13+
use std::pin::Pin;
14+
use std::sync::Arc;
15+
use std::task::{Context, Poll};
16+
17+
#[derive(Debug)]
18+
pub struct CheckMemoryExec {
19+
pub input: Arc<dyn ExecutionPlan>,
20+
pub memory_handler: Arc<dyn MemoryHandler>,
21+
}
22+
23+
impl CheckMemoryExec {
24+
pub fn new(input: Arc<dyn ExecutionPlan>, memory_handler: Arc<dyn MemoryHandler>) -> Self {
25+
Self {
26+
input,
27+
memory_handler,
28+
}
29+
}
30+
}
31+
32+
#[async_trait]
33+
impl ExecutionPlan for CheckMemoryExec {
34+
fn as_any(&self) -> &dyn Any {
35+
self
36+
}
37+
38+
fn schema(&self) -> SchemaRef {
39+
self.input.schema()
40+
}
41+
42+
fn output_partitioning(&self) -> Partitioning {
43+
self.input.output_partitioning()
44+
}
45+
46+
fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
47+
vec![self.input.clone()]
48+
}
49+
50+
fn with_new_children(
51+
&self,
52+
children: Vec<Arc<dyn ExecutionPlan>>,
53+
) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
54+
assert_eq!(children.len(), 1);
55+
Ok(Arc::new(CheckMemoryExec {
56+
input: children.into_iter().next().unwrap(),
57+
memory_handler: self.memory_handler.clone(),
58+
}))
59+
}
60+
61+
fn output_hints(&self) -> OptimizerHints {
62+
self.input.output_hints()
63+
}
64+
65+
async fn execute(
66+
&self,
67+
partition: usize,
68+
) -> Result<SendableRecordBatchStream, DataFusionError> {
69+
if partition >= self.input.output_partitioning().partition_count() {
70+
return Err(DataFusionError::Internal(format!(
71+
"ExecutionPlanExec invalid partition {}",
72+
partition
73+
)));
74+
}
75+
76+
let input = self.input.execute(partition).await?;
77+
Ok(Box::pin(CheckMemoryStream {
78+
schema: self.schema(),
79+
memory_handler: self.memory_handler.clone(),
80+
input,
81+
}))
82+
}
83+
}
84+
85+
struct CheckMemoryStream {
86+
schema: SchemaRef,
87+
memory_handler: Arc<dyn MemoryHandler>,
88+
input: SendableRecordBatchStream,
89+
}
90+
91+
impl Stream for CheckMemoryStream {
92+
type Item = ArrowResult<RecordBatch>;
93+
94+
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
95+
self.input.poll_next_unpin(cx).map(|x| match x {
96+
Some(Ok(batch)) => {
97+
let r = self
98+
.memory_handler
99+
.check_memory()
100+
.map(|_| batch)
101+
.map_err(|e| e.into());
102+
Some(r)
103+
}
104+
other => other,
105+
})
106+
}
107+
108+
fn size_hint(&self) -> (usize, Option<usize>) {
109+
// same number of record batches
110+
self.input.size_hint()
111+
}
112+
}
113+
114+
impl RecordBatchStream for CheckMemoryStream {
115+
/// Get the schema
116+
fn schema(&self) -> SchemaRef {
117+
self.schema.clone()
118+
}
119+
}

rust/cubestore/cubestore/src/queryplanner/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ pub mod panic;
44
mod partition_filter;
55
mod planning;
66
pub use planning::PlanningMeta;
7+
mod check_memory;
78
pub mod pretty_printers;
89
pub mod query_executor;
910
pub mod serialized_plan;
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
use crate::queryplanner::check_memory::CheckMemoryExec;
2+
use crate::queryplanner::query_executor::ClusterSendExec;
3+
use crate::util::memory::MemoryHandler;
4+
use datafusion::error::DataFusionError;
5+
use datafusion::physical_plan::memory::MemoryExec;
6+
use datafusion::physical_plan::parquet::ParquetExec;
7+
use datafusion::physical_plan::ExecutionPlan;
8+
use std::sync::Arc;
9+
10+
/// Add `CheckMemoryExec` behind some nodes.
11+
pub fn add_check_memory_exec(
12+
p: Arc<dyn ExecutionPlan>,
13+
mem_handler: Arc<dyn MemoryHandler>,
14+
) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
15+
let p_any = p.as_any();
16+
if p_any.is::<ParquetExec>() || p_any.is::<MemoryExec>() || p_any.is::<ClusterSendExec>() {
17+
let memory_check = Arc::new(CheckMemoryExec::new(p, mem_handler.clone()));
18+
Ok(memory_check)
19+
} else {
20+
Ok(p)
21+
}
22+
}

rust/cubestore/cubestore/src/queryplanner/optimizations/mod.rs

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,17 @@
1+
mod check_memory;
2+
mod distributed_partial_aggregate;
3+
mod prefer_inplace_aggregates;
4+
pub mod rewrite_plan;
5+
16
use crate::cluster::Cluster;
27
use crate::queryplanner::optimizations::distributed_partial_aggregate::{
38
add_limit_to_workers, push_aggregate_to_workers,
49
};
510
use crate::queryplanner::optimizations::prefer_inplace_aggregates::try_switch_to_inplace_aggregates;
611
use crate::queryplanner::planning::CubeExtensionPlanner;
712
use crate::queryplanner::serialized_plan::SerializedPlan;
13+
use crate::util::memory::MemoryHandler;
14+
use check_memory::add_check_memory_exec;
815
use datafusion::error::DataFusionError;
916
use datafusion::execution::context::{ExecutionContextState, QueryPlanner};
1017
use datafusion::logical_plan::LogicalPlan;
@@ -13,30 +20,33 @@ use datafusion::physical_plan::{ExecutionPlan, PhysicalPlanner};
1320
use rewrite_plan::rewrite_physical_plan;
1421
use std::sync::Arc;
1522

16-
mod distributed_partial_aggregate;
17-
mod prefer_inplace_aggregates;
18-
pub mod rewrite_plan;
19-
2023
pub struct CubeQueryPlanner {
2124
cluster: Option<Arc<dyn Cluster>>,
2225
serialized_plan: Arc<SerializedPlan>,
26+
memory_handler: Arc<dyn MemoryHandler>,
2327
}
2428

2529
impl CubeQueryPlanner {
2630
pub fn new_on_router(
2731
cluster: Arc<dyn Cluster>,
2832
serialized_plan: Arc<SerializedPlan>,
33+
memory_handler: Arc<dyn MemoryHandler>,
2934
) -> CubeQueryPlanner {
3035
CubeQueryPlanner {
3136
cluster: Some(cluster),
3237
serialized_plan,
38+
memory_handler,
3339
}
3440
}
3541

36-
pub fn new_on_worker(serialized_plan: Arc<SerializedPlan>) -> CubeQueryPlanner {
42+
pub fn new_on_worker(
43+
serialized_plan: Arc<SerializedPlan>,
44+
memory_handler: Arc<dyn MemoryHandler>,
45+
) -> CubeQueryPlanner {
3746
CubeQueryPlanner {
3847
serialized_plan,
3948
cluster: None,
49+
memory_handler,
4050
}
4151
}
4252
}
@@ -54,14 +64,18 @@ impl QueryPlanner for CubeQueryPlanner {
5464
})])
5565
.create_physical_plan(logical_plan, ctx_state)?;
5666
// TODO: assert there is only a single ClusterSendExec in the plan.
57-
finalize_physical_plan(p)
67+
finalize_physical_plan(p, self.memory_handler.clone())
5868
}
5969
}
6070

6171
fn finalize_physical_plan(
6272
p: Arc<dyn ExecutionPlan>,
73+
memory_handler: Arc<dyn MemoryHandler>,
6374
) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
6475
let p = rewrite_physical_plan(p.as_ref(), &mut |p| try_switch_to_inplace_aggregates(p))?;
6576
let p = rewrite_physical_plan(p.as_ref(), &mut |p| push_aggregate_to_workers(p))?;
77+
let p = rewrite_physical_plan(p.as_ref(), &mut |p| {
78+
add_check_memory_exec(p, memory_handler.clone())
79+
})?;
6680
rewrite_physical_plan(p.as_ref(), &mut |p| add_limit_to_workers(p))
6781
}

rust/cubestore/cubestore/src/queryplanner/pretty_printers.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use datafusion::physical_plan::sort::SortExec;
1919
use datafusion::physical_plan::ExecutionPlan;
2020
use itertools::{repeat_n, Itertools};
2121

22+
use crate::queryplanner::check_memory::CheckMemoryExec;
2223
use crate::queryplanner::filter_by_key_range::FilterByKeyRangeExec;
2324
use crate::queryplanner::panic::{PanicWorkerExec, PanicWorkerNode};
2425
use crate::queryplanner::planning::{ClusterSendNode, Snapshot, WorkerExec};
@@ -50,6 +51,7 @@ pub struct PPOptions {
5051
pub show_aggregations: bool,
5152
// Applies only to physical plan.
5253
pub show_output_hints: bool,
54+
pub show_check_memory_nodes: bool,
5355
}
5456

5557
pub fn pp_phys_plan(p: &dyn ExecutionPlan) -> String {
@@ -279,6 +281,13 @@ fn pp_sort_columns(first_agg: usize, cs: &[SortColumn]) -> String {
279281
}
280282

281283
fn pp_phys_plan_indented(p: &dyn ExecutionPlan, indent: usize, o: &PPOptions, out: &mut String) {
284+
if p.as_any().is::<CheckMemoryExec>() && !o.show_check_memory_nodes {
285+
//We don't show CheckMemoryExec in plan by default
286+
if let Some(child) = p.children().first() {
287+
pp_phys_plan_indented(child.as_ref(), indent, o, out)
288+
}
289+
return;
290+
}
282291
pp_instance(p, indent, o, out);
283292
if p.as_any().is::<ClusterSendExec>() {
284293
// Do not show children of ClusterSend. This is a hack to avoid rewriting all tests.

0 commit comments

Comments
 (0)