@@ -165,14 +165,16 @@ mod tests {
165165 // We create a TCP collector and a TCP reporter, then we send events
166166 // to the reporter and check if the collector receives them.
167167 //
168- // We use a bounded channel to send the events from the reporter to the
169- // collector. The collector reads the events from the channel and checks
170- // if they are the same as the original events .
168+ // A channel is used so the collector thread can signal that all events
169+ // have been received, and a timeout prevents the test from hanging
170+ // indefinitely if event delivery is broken .
171171 #[ test]
172172 fn tcp_reporter_and_collectors_work ( ) {
173173 let ( collector, address) = CollectorOnTcp :: new ( ) . unwrap ( ) ;
174174 let collector_arc = Arc :: new ( collector) ;
175- let reporter = ReporterOnTcp :: new ( address) ;
175+
176+ // Channel for the collector to signal "I've received everything"
177+ let ( done_tx, done_rx) = std:: sync:: mpsc:: sync_channel :: < ( ) > ( 0 ) ;
176178
177179 // Start the collector in a separate thread using the events iterator
178180 let collector_thread = {
@@ -184,6 +186,7 @@ mod tests {
184186 Ok ( event) => {
185187 received_events. push ( event) ;
186188 if received_events. len ( ) == fixtures:: EVENTS . len ( ) {
189+ let _ = done_tx. send ( ( ) ) ;
187190 break ;
188191 }
189192 }
@@ -199,30 +202,43 @@ mod tests {
199202
200203 // Send events to the reporter.
201204 for event in fixtures:: EVENTS . iter ( ) {
205+ let reporter = ReporterOnTcp :: new ( address) ;
202206 let result = reporter. report ( event. clone ( ) ) ;
203207 assert ! ( result. is_ok( ) ) ;
204208 }
205209
206- // Give some time for events to be processed before shutdown
207- std:: thread:: sleep ( std:: time:: Duration :: from_millis ( 10 ) ) ;
208-
209- // Call the stop method to stop the collector.
210- {
211- let tcp_collector = Arc :: clone ( & collector_arc) ;
212- tcp_collector. shutdown ( ) . unwrap ( ) ;
213- }
210+ // Wait with a timeout — if delivery is broken, fail instead of hang.
211+ done_rx
212+ . recv_timeout ( std:: time:: Duration :: from_secs ( 5 ) )
213+ . expect ( "timed out waiting for collector to receive all events" ) ;
214214
215- // Wait for all events to be consumed
215+ // Now safe to shutdown and join.
216+ collector_arc. shutdown ( ) . unwrap ( ) ;
216217 let received_events = collector_thread. join ( ) . unwrap ( ) ;
217218
218219 // Assert that we received all the events.
219- assert_eq ! ( received_events . len( ) , fixtures :: EVENTS . len( ) ) ;
220+ assert_eq ! ( fixtures :: EVENTS . len( ) , received_events . len( ) ) ;
220221 for event in received_events {
221222 assert ! ( fixtures:: EVENTS . contains( & event) ) ;
222223 }
224+ }
225+
226+ // Test that calling shutdown on the collector stops the events iterator.
227+ // No events are sent — this purely tests the shutdown mechanism.
228+ #[ test]
229+ fn tcp_collector_shutdown_stops_iterator ( ) {
230+ let ( collector, _address) = CollectorOnTcp :: new ( ) . unwrap ( ) ;
231+ let collector_arc = Arc :: new ( collector) ;
232+
233+ let collector_thread = {
234+ let tcp_collector = Arc :: clone ( & collector_arc) ;
235+ std:: thread:: spawn ( move || tcp_collector. events ( ) . count ( ) )
236+ } ;
237+
238+ collector_arc. shutdown ( ) . unwrap ( ) ;
223239
224- // shutdown the receiver thread
225- // Note: collector_thread is already joined above
240+ let count = collector_thread . join ( ) . unwrap ( ) ;
241+ assert_eq ! ( count , 0 ) ;
226242 }
227243
228244 mod fixtures {
0 commit comments