@@ -5,7 +5,9 @@ use std::process::{Child, ExitStatus};
55use std:: sync:: Arc ;
66use std:: time:: Duration ;
77
8+ use crate :: app_metrics;
89use crate :: util:: cancellation_token_guard:: CancellationGuard ;
10+ use crate :: util:: metrics;
911use deadqueue:: unlimited;
1012use futures:: future:: join_all;
1113use ipc_channel:: ipc;
@@ -266,6 +268,13 @@ impl<C: Configurator, P: WorkerProcessing, S: ServicesTransport> WorkerProcess<C
266268 Ok ( ( res, a, r) ) => {
267269 if sender. send ( Ok ( res) ) . is_err ( ) {
268270 error ! ( "Error during worker message processing: Send Error" ) ;
271+ app_metrics:: WORKER_POOL_ERROR . add_with_tags (
272+ 1 ,
273+ Some ( & vec ! [
274+ metrics:: format_tag( "subprocess_type" , & P :: process_type( ) ) ,
275+ metrics:: format_tag( "error_type" , "send" ) ,
276+ ] ) ,
277+ ) ;
269278 }
270279 args_channel = Some ( ( a, r) ) ;
271280 }
@@ -304,6 +313,22 @@ impl<C: Configurator, P: WorkerProcessing, S: ServicesTransport> WorkerProcess<C
304313 > {
305314 args_tx. send ( message) ?;
306315 let ( res, res_rx) = cube_ext:: spawn_blocking ( move || ( res_rx. recv ( ) , res_rx) ) . await ?;
316+
317+ if let Err ( ipc_err) = res {
318+ app_metrics:: WORKER_POOL_ERROR . add_with_tags (
319+ 1 ,
320+ Some ( & vec ! [
321+ metrics:: format_tag( "subprocess_type" , & P :: process_type( ) ) ,
322+ metrics:: format_tag( "error_type" , "receive" ) ,
323+ ] ) ,
324+ ) ;
325+ return Err ( CubeError :: internal ( format ! (
326+ "Failed to receive response from subprocess {}: {}" ,
327+ P :: process_titile( ) ,
328+ ipc_err
329+ ) ) ) ;
330+ }
331+
307332 Ok ( ( res??, args_tx, res_rx) )
308333 }
309334
@@ -546,6 +571,10 @@ mod tests {
546571 fn process_titile ( ) -> String {
547572 "--sel-worker" . to_string ( )
548573 }
574+
575+ fn process_type ( ) -> String {
576+ "sel-worker" . to_string ( )
577+ }
549578 }
550579
551580 type Transport = DefaultServicesTransport < DefaultServicesServerProcessor > ;
@@ -758,6 +787,10 @@ mod tests {
758787 fn process_titile ( ) -> String {
759788 "--sel-worker" . to_string ( )
760789 }
790+
791+ fn process_type ( ) -> String {
792+ "sel-worker" . to_string ( )
793+ }
761794 }
762795
763796 type ServTransport = DefaultServicesTransport < TestServicesServerProcessor > ;
0 commit comments