Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions rust/cubestore/cubestore/src/app_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ use crate::util::metrics::{Counter, Gauge, Histogram};
/// The number of process startups.
pub static STARTUPS: Counter = metrics::counter("cs.startup");

/// Errots on reciving from subprocesses.
pub static WORKER_POOL_ERROR: Counter = metrics::counter("cs.worker_pool.errors");

/// Incoming SQL queries that do data reads.
pub static DATA_QUERIES: Counter = metrics::counter("cs.sql.query.data");
pub static DATA_QUERIES_CACHE_HIT: Counter = metrics::counter("cs.sql.query.data.cache.hit");
Expand Down
4 changes: 4 additions & 0 deletions rust/cubestore/cubestore/src/cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,10 @@ impl WorkerProcessing for WorkerProcessor {
.ok()
.unwrap_or("--sel-worker".to_string())
}

fn process_type() -> String {
"sel-worker".to_string()
}
}

#[cfg(not(target_os = "windows"))]
Expand Down
33 changes: 33 additions & 0 deletions rust/cubestore/cubestore/src/cluster/worker_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ use std::process::{Child, ExitStatus};
use std::sync::Arc;
use std::time::Duration;

use crate::app_metrics;
use crate::util::cancellation_token_guard::CancellationGuard;
use crate::util::metrics;
use deadqueue::unlimited;
use futures::future::join_all;
use ipc_channel::ipc;
Expand Down Expand Up @@ -266,6 +268,13 @@ impl<C: Configurator, P: WorkerProcessing, S: ServicesTransport> WorkerProcess<C
Ok((res, a, r)) => {
if sender.send(Ok(res)).is_err() {
error!("Error during worker message processing: Send Error");
app_metrics::WORKER_POOL_ERROR.add_with_tags(
1,
Some(&vec![
metrics::format_tag("subprocess_type", &P::process_type()),
metrics::format_tag("error_type", "send"),
]),
);
}
args_channel = Some((a, r));
}
Expand Down Expand Up @@ -304,6 +313,22 @@ impl<C: Configurator, P: WorkerProcessing, S: ServicesTransport> WorkerProcess<C
> {
args_tx.send(message)?;
let (res, res_rx) = cube_ext::spawn_blocking(move || (res_rx.recv(), res_rx)).await?;

if let Err(ipc_err) = res {
app_metrics::WORKER_POOL_ERROR.add_with_tags(
1,
Some(&vec![
metrics::format_tag("subprocess_type", &P::process_type()),
metrics::format_tag("error_type", "receive"),
]),
);
return Err(CubeError::internal(format!(
"Failed to receive response from subprocess {}: {}",
P::process_titile(),
ipc_err
)));
}

Ok((res??, args_tx, res_rx))
}

Expand Down Expand Up @@ -546,6 +571,10 @@ mod tests {
fn process_titile() -> String {
"--sel-worker".to_string()
}

fn process_type() -> String {
"sel-worker".to_string()
}
}

type Transport = DefaultServicesTransport<DefaultServicesServerProcessor>;
Expand Down Expand Up @@ -758,6 +787,10 @@ mod tests {
fn process_titile() -> String {
"--sel-worker".to_string()
}

fn process_type() -> String {
"sel-worker".to_string()
}
}

type ServTransport = DefaultServicesTransport<TestServicesServerProcessor>;
Expand Down
2 changes: 2 additions & 0 deletions rust/cubestore/cubestore/src/cluster/worker_services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ pub trait WorkerProcessing: Send + Sync + 'static {
fn is_single_job_process() -> bool;

fn process_titile() -> String;

fn process_type() -> String;
}

pub trait ServicesTransport {
Expand Down
Loading