@@ -16,10 +16,12 @@ use crate::telemetry::{SCHEDULED_THREAD_POOL, TELEMETRY};
1616use cpp:: cpp;
1717use datatypes:: { BaguaBucket , BaguaTensor } ;
1818use events:: BaguaEventChannel ;
19+ use flume:: RecvTimeoutError ;
1920use hashbrown:: { HashMap , HashSet } ;
2021use std:: collections:: VecDeque ;
2122use std:: fmt:: Debug ;
2223use std:: sync:: Arc ;
24+ use std:: time:: Duration ;
2325use thiserror:: Error ;
2426
2527cpp ! { {
@@ -120,6 +122,7 @@ pub struct BaguaCommBackend {
120122 channels : Arc < BaguaCommOpChannels > ,
121123 managed_ptrs : HashSet < u64 > ,
122124 comm_worker : std:: thread:: JoinHandle < ( ) > ,
125+ comm_monitor : std:: thread:: JoinHandle < ( ) > ,
123126}
124127
125128impl BaguaCommBackend {
@@ -168,6 +171,10 @@ impl BaguaCommBackend {
168171
169172 let channels = Arc :: new ( BaguaCommOpChannels :: new ( schedule_channel_cap) ) ;
170173 let channels_clone = channels. clone ( ) ;
174+ let ( monitor_op_start_channel_sender, monitor_op_start_channel_receiver) =
175+ flume:: unbounded ( ) ;
176+ let ( monitor_op_finish_channel_sender, monitor_op_finish_channel_receiver) =
177+ flume:: unbounded ( ) ;
171178
172179 BaguaCommBackend {
173180 ordered_buckets : Default :: default ( ) ,
@@ -190,6 +197,7 @@ impl BaguaCommBackend {
190197 "worker received scheduled communication operation {:?}" ,
191198 comm_op
192199 ) ;
200+ monitor_op_start_channel_sender. send ( comm_op. bucket . clone ( ) ) ;
193201 for op in & comm_op. ops {
194202 op. execute_background_communication (
195203 comm_op. bucket . clone ( ) ,
@@ -199,6 +207,18 @@ impl BaguaCommBackend {
199207 tracing:: debug!( "comm op executed: {:?}" , comm_op) ;
200208 comm_op. event_channel . finish ( ) ;
201209 tracing:: debug!( "comm op marked finished: {:?}" , comm_op) ;
210+ monitor_op_finish_channel_sender. send ( ( ) ) ;
211+ }
212+ } ) ,
213+ comm_monitor : std:: thread:: spawn ( move || loop {
214+ let op_bucket = monitor_op_start_channel_receiver
215+ . recv ( )
216+ . expect ( "monitor cannot receive next comm op bucket" ) ;
217+ match monitor_op_finish_channel_receiver. recv_timeout ( Duration :: from_secs ( 300 ) ) {
218+ Ok ( _) => { }
219+ Err ( _) => {
220+ panic ! ( "{:?} comm op has not finished for 5 min, panic" , op_bucket) ;
221+ }
202222 }
203223 } ) ,
204224 }
0 commit comments