Skip to content

Commit ae6dd84

Browse files
authored
chore(cubestore): Small refactoring of workers startup (#7416)
1 parent 9cc7ad8 commit ae6dd84

File tree

3 files changed

+30
-21
lines changed

3 files changed

+30
-21
lines changed

rust/cubestore/cubestore/src/cluster/mod.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ use std::sync::{Arc, Mutex};
7070
use std::time::Duration;
7171
use std::time::SystemTime;
7272
use tokio::net::{TcpListener, TcpStream};
73+
use tokio::runtime::Runtime;
7374
use tokio::sync::broadcast::{Receiver, Sender};
7475
use tokio::sync::{oneshot, watch, Notify, RwLock};
7576
use tokio::time::timeout;
@@ -225,6 +226,14 @@ impl Configurator for WorkerConfigurator {
225226
type Config = Config;
226227
type ServicesRequest = ();
227228
type ServicesResponse = ();
229+
230+
fn setup(runtime: &Runtime) {
231+
let startup = SELECT_WORKER_SETUP.read().unwrap();
232+
if startup.is_some() {
233+
startup.as_ref().unwrap()(runtime);
234+
}
235+
}
236+
228237
async fn configure(
229238
_services_client: Arc<
230239
dyn Callable<Request = Self::ServicesRequest, Response = Self::ServicesResponse>,
@@ -349,6 +358,17 @@ lazy_static! {
349358
std::sync::RwLock::new(None);
350359
}
351360

361+
lazy_static! {
362+
static ref SELECT_WORKER_SETUP: std::sync::RwLock<Option<Box<dyn Fn(&Runtime) + Send + Sync>>> =
363+
std::sync::RwLock::new(None);
364+
}
365+
366+
pub fn register_select_worker_setup(f: fn(&Runtime)) {
367+
let mut setup = SELECT_WORKER_SETUP.write().unwrap();
368+
assert!(setup.is_none(), "select worker setup already registered");
369+
*setup = Some(Box::new(f));
370+
}
371+
352372
pub fn register_select_worker_configure_fn(f: fn() -> BoxFuture<'static, Config>) {
353373
let mut func = SELECT_WORKER_CONFIGURE_FN.write().unwrap();
354374
assert!(

rust/cubestore/cubestore/src/cluster/worker_pool.rs

Lines changed: 7 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use ipc_channel::ipc::{IpcReceiver, IpcSender};
1313
use log::error;
1414
use serde::de::DeserializeOwned;
1515
use serde::{Deserialize, Serialize};
16-
use tokio::runtime::{Builder, Runtime};
16+
use tokio::runtime::Builder;
1717
use tokio::sync::oneshot::Sender;
1818
use tokio::sync::{oneshot, watch, Mutex, Notify, RwLock};
1919
use tokio_util::sync::CancellationToken;
@@ -391,7 +391,7 @@ where
391391
let stack_size = env_parse("CUBESTORE_SELECT_WORKER_STACK_SIZE", 4 * 1024 * 1024);
392392
tokio_builder.thread_stack_size(stack_size);
393393
let runtime = tokio_builder.build().unwrap();
394-
worker_setup(&runtime);
394+
C::setup(&runtime);
395395
runtime.block_on(async move {
396396
let services_client = S::connect(services_sender, services_reciever, timeout);
397397
let config = match C::configure(services_client).await {
@@ -438,24 +438,6 @@ where
438438
})
439439
}
440440

441-
fn worker_setup(runtime: &Runtime) {
442-
let startup = SELECT_WORKER_SETUP.read().unwrap();
443-
if startup.is_some() {
444-
startup.as_ref().unwrap()(runtime);
445-
}
446-
}
447-
448-
lazy_static! {
449-
static ref SELECT_WORKER_SETUP: std::sync::RwLock<Option<Box<dyn Fn(&Runtime) + Send + Sync>>> =
450-
std::sync::RwLock::new(None);
451-
}
452-
453-
pub fn register_select_worker_setup(f: fn(&Runtime)) {
454-
let mut setup = SELECT_WORKER_SETUP.write().unwrap();
455-
assert!(setup.is_none(), "select worker setup already registered");
456-
*setup = Some(Box::new(f));
457-
}
458-
459441
#[cfg(test)]
460442
mod tests {
461443
use std::sync::Arc;
@@ -466,7 +448,7 @@ mod tests {
466448
use datafusion::logical_plan::ToDFSchema;
467449
use futures_timer::Delay;
468450
use serde::{Deserialize, Serialize};
469-
use tokio::runtime::Builder;
451+
use tokio::runtime::{Builder, Runtime};
470452

471453
use crate::cluster::worker_pool::{worker_main, WorkerPool};
472454
use crate::config::Config;
@@ -506,6 +488,7 @@ mod tests {
506488
type Config = Config;
507489
type ServicesRequest = ();
508490
type ServicesResponse = ();
491+
fn setup(_runtime: &Runtime) {}
509492
async fn configure(
510493
_services_client: Arc<dyn Callable<Request = (), Response = ()>>,
511494
) -> Result<Self::Config, CubeError> {
@@ -708,6 +691,9 @@ mod tests {
708691
type Config = TestConfig;
709692
type ServicesRequest = i64;
710693
type ServicesResponse = bool;
694+
695+
fn setup(_runtime: &Runtime) {}
696+
711697
async fn configure(
712698
services_client: Arc<dyn Callable<Request = i64, Response = bool>>,
713699
) -> Result<Self::Config, CubeError> {

rust/cubestore/cubestore/src/cluster/worker_services.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use std::fmt::Debug;
99
use std::marker::PhantomData;
1010
use std::sync::Arc;
1111
use std::time::Duration;
12+
use tokio::runtime::Runtime;
1213
use tokio::sync::{broadcast, oneshot, RwLock};
1314
use tokio::task::JoinHandle;
1415
use tokio_util::sync::CancellationToken;
@@ -26,6 +27,8 @@ pub trait Configurator: Send + Sync + 'static {
2627
type ServicesRequest: Debug + Serialize + DeserializeOwned + Sync + Send + 'static;
2728
type ServicesResponse: Debug + Serialize + DeserializeOwned + Sync + Send + 'static;
2829

30+
fn setup(runtime: &Runtime);
31+
2932
async fn configure(
3033
services_client: Arc<
3134
dyn Callable<Request = Self::ServicesRequest, Response = Self::ServicesResponse>,

0 commit comments

Comments
 (0)