Skip to content

Commit f56789c

Browse files
authored
chore(cubestore): Fix for timeout in worker services (#7359)
1 parent a788954 commit f56789c

File tree

2 files changed

+17
-6
lines changed

2 files changed

+17
-6
lines changed

rust/cubestore/cubestore/src/cluster/worker_pool.rs

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -342,6 +342,7 @@ impl<C: Configurator, P: WorkerProcessing, S: ServicesTransport> WorkerProcess<C
342342
processor: PhantomData::<(C, P, S)>::default(),
343343
services_sender: service_request_tx,
344344
services_reciever: service_response_rx,
345+
timeout: self.timeout.clone(),
345346
},
346347
&[title],
347348
&envs,
@@ -365,6 +366,7 @@ pub struct WorkerProcessArgs<C: Configurator, P: WorkerProcessing, S: ServicesTr
365366
processor: PhantomData<(C, P, S)>,
366367
services_sender: IpcSender<S::TransportRequest>,
367368
services_reciever: IpcReceiver<S::TransportResponse>,
369+
timeout: Duration,
368370
}
369371

370372
pub fn worker_main<C, P, S>(a: WorkerProcessArgs<C, P, S>) -> i32
@@ -373,8 +375,13 @@ where
373375
P: WorkerProcessing<Config = C::Config>,
374376
S: ServicesTransport<Request = C::ServicesRequest, Response = C::ServicesResponse>,
375377
{
376-
let (rx, tx, services_sender, services_reciever) =
377-
(a.args, a.results, a.services_sender, a.services_reciever);
378+
let (rx, tx, services_sender, services_reciever, timeout) = (
379+
a.args,
380+
a.results,
381+
a.services_sender,
382+
a.services_reciever,
383+
a.timeout,
384+
);
378385
let mut tokio_builder = Builder::new_multi_thread();
379386
tokio_builder.enable_all();
380387
tokio_builder.thread_name("cubestore-worker");
@@ -386,7 +393,7 @@ where
386393
let runtime = tokio_builder.build().unwrap();
387394
worker_setup(&runtime);
388395
runtime.block_on(async move {
389-
let services_client = S::connect(services_sender, services_reciever);
396+
let services_client = S::connect(services_sender, services_reciever, timeout);
390397
let config = match C::configure(services_client).await {
391398
Err(e) => {
392399
error!(

rust/cubestore/cubestore/src/cluster/worker_services.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -80,8 +80,9 @@ pub trait ServicesTransport {
8080
fn connect(
8181
sender: IpcSender<Self::TransportRequest>,
8282
reciever: IpcReceiver<Self::TransportResponse>,
83+
timeout: Duration,
8384
) -> Arc<Self::Client> {
84-
Self::Client::connect(sender, reciever)
85+
Self::Client::connect(sender, reciever, timeout)
8586
}
8687
}
8788

@@ -131,6 +132,7 @@ pub trait ServicesClient: Callable {
131132
fn connect(
132133
sender: IpcSender<Self::TransportRequest>,
133134
reciever: IpcReceiver<Self::TransportResponse>,
135+
timeout: Duration,
134136
) -> Arc<Self>;
135137

136138
fn stop(&self);
@@ -158,9 +160,10 @@ impl<P: Callable> ServicesClient for ServicesClientImpl<P> {
158160
fn connect(
159161
sender: IpcSender<Self::TransportRequest>,
160162
reciever: IpcReceiver<Self::TransportResponse>,
163+
timeout: Duration,
161164
) -> Arc<Self> {
162165
let queue = Arc::new(unlimited::Queue::new());
163-
let handle = Self::processing_loop(sender, reciever, queue.clone());
166+
let handle = Self::processing_loop(sender, reciever, queue.clone(), timeout);
164167
Arc::new(Self {
165168
processor: PhantomData,
166169
handle,
@@ -197,6 +200,7 @@ impl<P: Callable> ServicesClientImpl<P> {
197200
ServicesClientMessage<<Self as Callable>::Request, <Self as Callable>::Response>,
198201
>,
199202
>,
203+
timeout: Duration,
200204
) -> JoinHandle<()> {
201205
let (message_broadcast_tx, _) = broadcast::channel(10000);
202206

@@ -252,7 +256,7 @@ impl<P: Callable> ServicesClientImpl<P> {
252256
cube_ext::spawn(async move {
253257
loop {
254258
let broadcast_message = tokio::select! {
255-
_ = tokio::time::sleep(Duration::from_secs(5)) => { //TODO! config
259+
_ = tokio::time::sleep(timeout) => {
256260
Err(CubeError::internal(format!(
257261
"Worker service timeout for message id: {}",
258262
message_id

0 commit comments

Comments
 (0)