Commit 92cae7a

feat(cubestore): Tracking data amount using in processing (#6887)

1 parent 68c8ed6 commit 92cae7a

File tree

14 files changed: +482 -50 lines changed

rust/cubestore/cubestore/src/cluster/mod.rs

Lines changed: 102 additions & 14 deletions
@@ -4,11 +4,14 @@ pub mod transport;
 #[cfg(not(target_os = "windows"))]
 pub mod worker_pool;

+pub mod rate_limiter;
+
 #[cfg(not(target_os = "windows"))]
 use crate::cluster::worker_pool::{worker_main, MessageProcessor, WorkerPool};

 use crate::ack_error;
 use crate::cluster::message::NetworkMessage;
+use crate::cluster::rate_limiter::{ProcessRateLimiter, TaskType};
 use crate::cluster::transport::{ClusterTransport, MetaStoreTransport, WorkerConnection};
 use crate::config::injection::{DIService, Injector};
 use crate::config::{is_router, WorkerServices};

@@ -28,6 +31,7 @@ use crate::metastore::{
 };
 use crate::queryplanner::query_executor::{QueryExecutor, SerializedRecordBatchStream};
 use crate::queryplanner::serialized_plan::SerializedPlan;
+use crate::queryplanner::trace_data_loaded::DataLoadedSize;
 use crate::remotefs::RemoteFs;
 use crate::store::compaction::CompactionService;
 use crate::store::ChunkDataStore;

@@ -183,7 +187,7 @@ pub struct ClusterImpl {
         Arc<
             WorkerPool<
                 WorkerMessage,
-                (SchemaRef, Vec<SerializedRecordBatchStream>),
+                (SchemaRef, Vec<SerializedRecordBatchStream>, usize),
                 WorkerProcessor,
             >,
         >,

@@ -195,6 +199,7 @@ pub struct ClusterImpl {
     close_worker_socket_tx: watch::Sender<bool>,
     close_worker_socket_rx: RwLock<watch::Receiver<bool>>,
     tracing_helper: Arc<dyn TracingHelper>,
+    process_rate_limiter: Arc<dyn ProcessRateLimiter>,
 }

 crate::di_service!(ClusterImpl, [Cluster]);

@@ -213,13 +218,13 @@ pub struct WorkerProcessor;

 #[cfg(not(target_os = "windows"))]
 #[async_trait]
-impl MessageProcessor<WorkerMessage, (SchemaRef, Vec<SerializedRecordBatchStream>)>
+impl MessageProcessor<WorkerMessage, (SchemaRef, Vec<SerializedRecordBatchStream>, usize)>
     for WorkerProcessor
 {
     async fn process(
         services: &WorkerServices,
         args: WorkerMessage,
-    ) -> Result<(SchemaRef, Vec<SerializedRecordBatchStream>), CubeError> {
+    ) -> Result<(SchemaRef, Vec<SerializedRecordBatchStream>, usize), CubeError> {
         match args {
             WorkerMessage::Select(
                 plan_node,

@@ -256,9 +261,9 @@ impl MessageProcessor<WorkerMessage, (SchemaRef, Vec<SerializedRecordBatchStream
                     "Running select in worker completed ({:?})",
                     time.elapsed().unwrap()
                 );
-                let (schema, records) = res?;
+                let (schema, records, data_loaded_size) = res?;
                 let records = SerializedRecordBatchStream::write(schema.as_ref(), records)?;
-                Ok((schema, records))
+                Ok((schema, records, data_loaded_size))
             };
             let span = trace_id_and_span_id.map(|(t, s)| {
                 tracing::info_span!(

@@ -281,7 +286,11 @@ impl MessageProcessor<WorkerMessage, (SchemaRef, Vec<SerializedRecordBatchStream
 #[ctor::ctor]
 fn proc_handler() {
     crate::util::respawn::register_handler(
-        worker_main::<WorkerMessage, (SchemaRef, Vec<SerializedRecordBatchStream>), WorkerProcessor>,
+        worker_main::<
+            WorkerMessage,
+            (SchemaRef, Vec<SerializedRecordBatchStream>, usize),
+            WorkerProcessor,
+        >,
     );
 }

@@ -291,6 +300,7 @@ struct JobRunner {
     chunk_store: Arc<dyn ChunkDataStore>,
     compaction_service: Arc<dyn CompactionService>,
     import_service: Arc<dyn ImportService>,
+    process_rate_limiter: Arc<dyn ProcessRateLimiter>,
     server_name: String,
     notify: Arc<Notify>,
     stop_token: CancellationToken,

@@ -903,8 +913,20 @@ impl JobRunner {
         if let RowKey::Table(TableId::Partitions, partition_id) = job.row_reference() {
             let compaction_service = self.compaction_service.clone();
             let partition_id = *partition_id;
+            let process_rate_limiter = self.process_rate_limiter.clone();
+            let timeout = Some(Duration::from_secs(self.config_obj.import_job_timeout()));
             Ok(cube_ext::spawn(async move {
-                compaction_service.compact(partition_id).await
+                process_rate_limiter
+                    .wait_for_allow(TaskType::Job, timeout)
+                    .await?; // TODO config, may be same as orphaned timeout
+                let data_loaded_size = DataLoadedSize::new();
+                let res = compaction_service
+                    .compact(partition_id, data_loaded_size.clone())
+                    .await;
+                process_rate_limiter
+                    .commit_task_usage(TaskType::Job, data_loaded_size.get() as i64)
+                    .await;
+                res
             }))
         } else {
             Self::fail_job_row_key(job)

@@ -984,11 +1006,30 @@
             let table_id = *table_id;
             let import_service = self.import_service.clone();
             let location = location.to_string();
+            let process_rate_limiter = self.process_rate_limiter.clone();
+            let timeout = Some(Duration::from_secs(self.config_obj.import_job_timeout()));
             Ok(cube_ext::spawn(async move {
-                import_service
+                let is_streaming = Table::is_stream_location(&location);
+                let data_loaded_size = if is_streaming {
+                    None
+                } else {
+                    Some(DataLoadedSize::new())
+                };
+                if !is_streaming {
+                    process_rate_limiter
+                        .wait_for_allow(TaskType::Job, timeout)
+                        .await?; // TODO config, may be same as orphaned timeout
+                }
+                let res = import_service
                     .clone()
-                    .import_table_part(table_id, &location)
-                    .await
+                    .import_table_part(table_id, &location, data_loaded_size.clone())
+                    .await;
+                if let Some(data_loaded) = &data_loaded_size {
+                    process_rate_limiter
+                        .commit_task_usage(TaskType::Job, data_loaded.get() as i64)
+                        .await;
+                }
+                res
             }))
         } else {
             Self::fail_job_row_key(job)

@@ -998,8 +1039,20 @@
         if let RowKey::Table(TableId::Chunks, chunk_id) = job.row_reference() {
             let chunk_store = self.chunk_store.clone();
             let chunk_id = *chunk_id;
+            let process_rate_limiter = self.process_rate_limiter.clone();
+            let timeout = Some(Duration::from_secs(self.config_obj.import_job_timeout()));
             Ok(cube_ext::spawn(async move {
-                chunk_store.repartition_chunk(chunk_id).await
+                process_rate_limiter
+                    .wait_for_allow(TaskType::Job, timeout)
+                    .await?; // TODO config, may be same as orphaned timeout
+                let data_loaded_size = DataLoadedSize::new();
+                let res = chunk_store
+                    .repartition_chunk(chunk_id, data_loaded_size.clone())
+                    .await;
+                process_rate_limiter
+                    .commit_task_usage(TaskType::Job, data_loaded_size.get() as i64)
+                    .await;
+                res
             }))
         } else {
             Self::fail_job_row_key(job)
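Each of these jobs hands a DataLoadedSize handle to the service doing the actual work and reads it back with get() once the work finishes. The type's definition is not part of this diff (it lives in queryplanner::trace_data_loaded); judging only by the usage above (new(), clone(), get()), it behaves like a cheaply clonable shared byte counter. A hypothetical reconstruction, for orientation only:

use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

// Hypothetical sketch; the real DataLoadedSize lives in
// queryplanner::trace_data_loaded and is not shown in this commit.
pub struct DataLoadedSize {
    size: AtomicUsize,
}

impl DataLoadedSize {
    pub fn new() -> Arc<Self> {
        Arc::new(Self {
            size: AtomicUsize::new(0),
        })
    }

    // Called by the executing service as it reads data.
    pub fn add(&self, size: usize) {
        self.size.fetch_add(size, Ordering::SeqCst);
    }

    // Read once at the end to report usage to the rate limiter.
    pub fn get(&self) -> usize {
        self.size.load(Ordering::SeqCst)
    }
}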
@@ -1030,6 +1083,7 @@ impl ClusterImpl {
         meta_store_sender: Sender<MetaStoreEvent>,
         cluster_transport: Arc<dyn ClusterTransport>,
         tracing_helper: Arc<dyn TracingHelper>,
+        process_rate_limiter: Arc<dyn ProcessRateLimiter>,
     ) -> Arc<ClusterImpl> {
         let (close_worker_socket_tx, close_worker_socket_rx) = watch::channel(false);
         Arc::new_cyclic(|this| ClusterImpl {

@@ -1052,6 +1106,7 @@
             close_worker_socket_tx,
             close_worker_socket_rx: RwLock::new(close_worker_socket_rx),
             tracing_helper,
+            process_rate_limiter,
         })
     }

@@ -1100,6 +1155,7 @@
             chunk_store: self.injector.upgrade().unwrap().get_service_typed().await,
             compaction_service: self.injector.upgrade().unwrap().get_service_typed().await,
             import_service: self.injector.upgrade().unwrap().get_service_typed().await,
+            process_rate_limiter: self.process_rate_limiter.clone(),
             server_name: self.server_name.clone(),
             notify: if is_long_running {
                 self.long_running_job_notify.clone()

@@ -1113,6 +1169,10 @@
                 job_runner.processing_loop().await;
             }));
         }
+        let process_rate_limiter = self.process_rate_limiter.clone();
+        futures.push(cube_ext::spawn(async move {
+            process_rate_limiter.wait_processing_loop().await;
+        }));

         let stop_token = self.stop_token.clone();
         let long_running_job_notify = self.long_running_job_notify.clone();

@@ -1147,6 +1207,8 @@
             pool.stop_workers().await?;
         }

+        self.process_rate_limiter.stop_processing_loops();
+
         self.close_worker_socket_tx.send(true)?;
         Ok(())
     }

@@ -1313,11 +1375,37 @@
             .await
     }

-    #[instrument(level = "trace", skip(self, plan_node))]
     async fn run_local_select_worker(
         &self,
         plan_node: SerializedPlan,
     ) -> Result<(SchemaRef, Vec<SerializedRecordBatchStream>), CubeError> {
+        self.process_rate_limiter
+            .wait_for_allow(
+                TaskType::Job,
+                Some(Duration::from_secs(self.config_obj.query_timeout())),
+            )
+            .await?;
+        let res = self.run_local_select_worker_impl(plan_node).await;
+        match res {
+            Ok((schema, records, data_loaded_size)) => {
+                self.process_rate_limiter
+                    .commit_task_usage(TaskType::Select, data_loaded_size as i64)
+                    .await;
+                Ok((schema, records))
+            }
+            Err(e) => {
+                self.process_rate_limiter
+                    .commit_task_usage(TaskType::Select, 0)
+                    .await;
+                Err(e)
+            }
+        }
+    }
+    #[instrument(level = "trace", skip(self, plan_node))]
+    async fn run_local_select_worker_impl(
+        &self,
+        plan_node: SerializedPlan,
+    ) -> Result<(SchemaRef, Vec<SerializedRecordBatchStream>, usize), CubeError> {
         let start = SystemTime::now();
         debug!("Running select");
         let remote_to_local_names = self.warmup_select_worker_files(&plan_node).await?;

@@ -1411,7 +1499,7 @@

         if res.is_none() {
             // TODO optimize for no double conversion
-            let (schema, records) = self
+            let (schema, records, data_loaded_size) = self
                 .query_executor
                 .execute_worker_plan(
                     plan_node.clone(),

@@ -1420,7 +1508,7 @@
                 )
                 .await?;
             let records = SerializedRecordBatchStream::write(schema.as_ref(), records);
-            res = Some(Ok((schema, records?)))
+            res = Some(Ok((schema, records?, data_loaded_size)))
         }

         info!("Running select completed ({:?})", start.elapsed()?);
rust/cubestore/cubestore/src/cluster/rate_limiter.rs

Lines changed: 73 additions & 0 deletions
@@ -0,0 +1,73 @@
+use crate::config::injection::DIService;
+use crate::CubeError;
+use async_trait::async_trait;
+use std::cmp::PartialEq;
+use std::hash::Hash;
+use std::sync::Arc;
+use std::time::Duration;
+
+#[derive(Eq, PartialEq, Hash)]
+pub enum TaskType {
+    Select,
+    Job,
+}
+
+#[async_trait]
+pub trait ProcessRateLimiter: DIService + Send + Sync {
+    async fn commit_task_usage(&self, task_type: TaskType, size: i64);
+
+    async fn current_budget(&self, task_type: TaskType) -> Option<i64>;
+
+    async fn current_budget_f64(&self, task_type: TaskType) -> Option<f64>;
+
+    async fn wait_for_allow(
+        &self,
+        task_type: TaskType,
+        timeout: Option<Duration>,
+    ) -> Result<(), CubeError>;
+
+    async fn wait_processing_loop(self: Arc<Self>);
+
+    async fn pending_size(&self, task_type: TaskType) -> Option<usize>;
+
+    fn stop_processing_loops(&self);
+}
+
+crate::di_service!(BasicProcessRateLimiter, [ProcessRateLimiter]);
+
+pub struct BasicProcessRateLimiter;
+
+impl BasicProcessRateLimiter {
+    pub fn new() -> Arc<Self> {
+        Arc::new(Self {})
+    }
+}
+
+#[async_trait]
+impl ProcessRateLimiter for BasicProcessRateLimiter {
+    async fn commit_task_usage(&self, _task_type: TaskType, _size: i64) {}
+
+    async fn current_budget(&self, _task_type: TaskType) -> Option<i64> {
+        None
+    }
+
+    async fn current_budget_f64(&self, _task_type: TaskType) -> Option<f64> {
+        None
+    }
+
+    async fn wait_for_allow(
+        &self,
+        _task_type: TaskType,
+        _timeout: Option<Duration>,
+    ) -> Result<(), CubeError> {
+        Ok(())
+    }
+
+    async fn wait_processing_loop(self: Arc<Self>) {}
+
+    async fn pending_size(&self, _task_type: TaskType) -> Option<usize> {
+        None
+    }
+
+    fn stop_processing_loops(&self) {}
+}
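BasicProcessRateLimiter is a no-op default: it admits every task immediately and discards the reported usage, which leaves deployments unthrottled while giving other builds a seam to hook into. To make the trait's contract concrete, here is a hypothetical budget-based implementation. It is purely illustrative and not part of this commit; the type name, budget_per_sec, and the use of CubeError::internal for the timeout error are all assumptions.

use async_trait::async_trait;
use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};

// Hypothetical sketch: a crude byte-budget limiter showing how the trait's
// halves cooperate. wait_for_allow admits tasks while any budget remains;
// commit_task_usage subtracts what a finished task actually loaded;
// wait_processing_loop restores the budget once per second.
pub struct ByteBudgetRateLimiter {
    budget: AtomicI64,
    budget_per_sec: i64,
    stopped: AtomicBool,
}

impl ByteBudgetRateLimiter {
    pub fn new(budget_per_sec: i64) -> Arc<Self> {
        Arc::new(Self {
            budget: AtomicI64::new(budget_per_sec),
            budget_per_sec,
            stopped: AtomicBool::new(false),
        })
    }
}

crate::di_service!(ByteBudgetRateLimiter, [ProcessRateLimiter]);

#[async_trait]
impl ProcessRateLimiter for ByteBudgetRateLimiter {
    async fn commit_task_usage(&self, _task_type: TaskType, size: i64) {
        self.budget.fetch_sub(size, Ordering::SeqCst);
    }

    async fn current_budget(&self, _task_type: TaskType) -> Option<i64> {
        Some(self.budget.load(Ordering::SeqCst))
    }

    async fn current_budget_f64(&self, task_type: TaskType) -> Option<f64> {
        self.current_budget(task_type).await.map(|b| b as f64)
    }

    async fn wait_for_allow(
        &self,
        _task_type: TaskType,
        timeout: Option<Duration>,
    ) -> Result<(), CubeError> {
        let started = Instant::now();
        // Poll until some budget is available or the timeout expires.
        while self.budget.load(Ordering::SeqCst) <= 0 {
            if let Some(t) = timeout {
                if started.elapsed() >= t {
                    return Err(CubeError::internal(
                        "wait_for_allow timed out".to_string(),
                    ));
                }
            }
            tokio::time::sleep(Duration::from_millis(100)).await;
        }
        Ok(())
    }

    async fn wait_processing_loop(self: Arc<Self>) {
        // Top the budget back up once per second until stopped.
        while !self.stopped.load(Ordering::SeqCst) {
            tokio::time::sleep(Duration::from_secs(1)).await;
            self.budget.store(self.budget_per_sec, Ordering::SeqCst);
        }
    }

    async fn pending_size(&self, _task_type: TaskType) -> Option<usize> {
        None
    }

    fn stop_processing_loops(&self) {
        self.stopped.store(true, Ordering::SeqCst);
    }
}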

rust/cubestore/cubestore/src/config/mod.rs

Lines changed: 9 additions & 1 deletion
@@ -1,10 +1,11 @@
1-
#![allow(deprecated)] // 'vtable' and 'TraitObject' are deprecated.
1+
#![allow(deprecated)] // 'vtable' and 'TraitObject' are deprecated.confi
22
pub mod injection;
33
pub mod processing_loop;
44

55
use crate::cachestore::{
66
CacheStore, CacheStoreSchedulerImpl, ClusterCacheStoreClient, LazyRocksCacheStore,
77
};
8+
use crate::cluster::rate_limiter::{BasicProcessRateLimiter, ProcessRateLimiter};
89
use crate::cluster::transport::{
910
ClusterTransport, ClusterTransportImpl, MetaStoreTransport, MetaStoreTransportImpl,
1011
};
@@ -1755,6 +1756,12 @@ impl Config {
17551756
})
17561757
.await;
17571758

1759+
self.injector
1760+
.register_typed::<dyn ProcessRateLimiter, _, _, _>(async move |_| {
1761+
BasicProcessRateLimiter::new()
1762+
})
1763+
.await;
1764+
17581765
let cluster_meta_store_sender = metastore_event_sender_to_move.clone();
17591766

17601767
self.injector
@@ -1774,6 +1781,7 @@ impl Config {
17741781
cluster_meta_store_sender,
17751782
i.get_service_typed().await,
17761783
i.get_service_typed().await,
1784+
i.get_service_typed().await,
17771785
)
17781786
})
17791787
.await;
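With the no-op limiter registered as the default, swapping in a different implementation only requires registering another service for the same trait. Hypothetically, reusing the ByteBudgetRateLimiter sketched earlier (an illustrative type, not one shipped by this commit):

// Hypothetical registration, mirroring the register_typed call above.
self.injector
    .register_typed::<dyn ProcessRateLimiter, _, _, _>(async move |_| {
        ByteBudgetRateLimiter::new(512 << 20) // ~512 MB of tracked reads per second
    })
    .await;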
