@@ -540,29 +540,32 @@ def test_run_pipeline_process_exception_handling(self, mock_launch_pipeline):
540540class TestKillPipelineProcessGroup :
541541 """Test suite for kill_pipeline_process_group function."""
542542
543- @patch ("nv_ingest.framework.orchestration.process.execution.os.getpgid" )
544- @patch ("nv_ingest.framework.orchestration.process.execution.os.killpg" )
545- def test_kill_pipeline_process_group_success (self , mock_killpg , mock_getpgid ):
543+ @patch ("nv_ingest.framework.orchestration.process.termination.os.getpgid" )
544+ @patch ("nv_ingest.framework.orchestration.process.termination.os.killpg" )
545+ @patch ("nv_ingest.framework.orchestration.process.termination.os.kill" )
546+ def test_kill_pipeline_process_group_success (self , mock_kill , mock_killpg , mock_getpgid ):
546547 """Test successful process group termination."""
547548 # Setup
548549 mock_process = Mock ()
549550 mock_process .pid = 12345
550- # Configure process to terminate gracefully after first SIGTERM
551+ # Configure process to terminate gracefully after SIGUSR1
551552 mock_process .is_alive .side_effect = [True , False ] # Alive initially, then terminates
552553 pgid = 54321
553554 mock_getpgid .return_value = pgid
554555
555556 # Execute
556557 kill_pipeline_process_group (mock_process )
557558
558- # Verify graceful termination
559+ # Verify graceful termination via SIGUSR1 (not SIGTERM, to avoid gRPC stack traces)
559560 mock_getpgid .assert_called_once_with (mock_process .pid )
560- mock_killpg .assert_called_once_with (pgid , signal .SIGTERM )
561+ mock_kill .assert_called_once_with (mock_process .pid , signal .SIGUSR1 )
562+ mock_killpg .assert_not_called ()
561563 mock_process .join .assert_called_once_with (timeout = 5.0 )
562564
563- @patch ("nv_ingest.framework.orchestration.process.execution.os.getpgid" )
564- @patch ("nv_ingest.framework.orchestration.process.execution.os.killpg" )
565- def test_kill_pipeline_process_group_already_dead (self , mock_killpg , mock_getpgid ):
565+ @patch ("nv_ingest.framework.orchestration.process.termination.os.getpgid" )
566+ @patch ("nv_ingest.framework.orchestration.process.termination.os.killpg" )
567+ @patch ("nv_ingest.framework.orchestration.process.termination.os.kill" )
568+ def test_kill_pipeline_process_group_already_dead (self , mock_kill , mock_killpg , mock_getpgid ):
566569 """Test kill_pipeline_process_group when process is already dead."""
567570 # Setup
568571 mock_process = Mock ()
@@ -574,12 +577,14 @@ def test_kill_pipeline_process_group_already_dead(self, mock_killpg, mock_getpgi
574577
575578 # Verify - should not attempt to kill if already dead
576579 mock_getpgid .assert_not_called ()
580+ mock_kill .assert_not_called ()
577581 mock_killpg .assert_not_called ()
578582 mock_process .join .assert_not_called ()
579583
580- @patch ("nv_ingest.framework.orchestration.process.execution.os.getpgid" )
581- @patch ("nv_ingest.framework.orchestration.process.execution.os.killpg" )
582- def test_kill_pipeline_process_group_getpgid_exception (self , mock_killpg , mock_getpgid ):
584+ @patch ("nv_ingest.framework.orchestration.process.termination.os.getpgid" )
585+ @patch ("nv_ingest.framework.orchestration.process.termination.os.killpg" )
586+ @patch ("nv_ingest.framework.orchestration.process.termination.os.kill" )
587+ def test_kill_pipeline_process_group_getpgid_exception (self , mock_kill , mock_killpg , mock_getpgid ):
583588 """Test kill_pipeline_process_group when getpgid fails."""
584589 # Setup
585590 mock_process = Mock ()
@@ -592,49 +597,52 @@ def test_kill_pipeline_process_group_getpgid_exception(self, mock_killpg, mock_g
592597
593598 # Verify
594599 mock_getpgid .assert_called_once_with (mock_process .pid )
600+ mock_kill .assert_not_called ()
595601 mock_killpg .assert_not_called ()
596602 mock_process .join .assert_not_called ()
597603
598- @patch ("nv_ingest.framework.orchestration.process.execution.os.getpgid" )
599- @patch ("nv_ingest.framework.orchestration.process.execution.os.killpg" )
600- def test_kill_pipeline_process_group_killpg_exception (self , mock_killpg , mock_getpgid ):
601- """Test kill_pipeline_process_group when killpg fails."""
604+ @patch ("nv_ingest.framework.orchestration.process.termination.os.getpgid" )
605+ @patch ("nv_ingest.framework.orchestration.process.termination.os.killpg" )
606+ @patch ("nv_ingest.framework.orchestration.process.termination.os.kill" )
607+ def test_kill_pipeline_process_group_kill_exception (self , mock_kill , mock_killpg , mock_getpgid ):
608+ """Test kill_pipeline_process_group when os.kill fails."""
602609 # Setup
603610 mock_process = Mock ()
604611 mock_process .pid = 12345
605612 mock_process .is_alive .return_value = True
606613 pgid = 54321
607614 mock_getpgid .return_value = pgid
608- mock_killpg .side_effect = OSError ("Permission denied" )
615+ mock_kill .side_effect = OSError ("Permission denied" )
609616
610617 # Execute - should not raise exception
611618 kill_pipeline_process_group (mock_process )
612619
613- # Verify - when killpg fails, the exception is caught and join() is not called
620+ # Verify - when os.kill fails, the exception is caught and join() is not called
614621 mock_getpgid .assert_called_once_with (mock_process .pid )
615- mock_killpg .assert_called_once_with (pgid , signal .SIGTERM )
622+ mock_kill .assert_called_once_with (mock_process . pid , signal .SIGUSR1 )
616623 mock_process .join .assert_not_called ()
617624
618- @patch ("nv_ingest.framework.orchestration.process.execution.os.getpgid" )
619- @patch ("nv_ingest.framework.orchestration.process.execution.os.killpg" )
620- def test_kill_pipeline_process_group_graceful_then_force (self , mock_killpg , mock_getpgid ):
625+ @patch ("nv_ingest.framework.orchestration.process.termination.os.getpgid" )
626+ @patch ("nv_ingest.framework.orchestration.process.termination.os.killpg" )
627+ @patch ("nv_ingest.framework.orchestration.process.termination.os.kill" )
628+ def test_kill_pipeline_process_group_graceful_then_force (self , mock_kill , mock_killpg , mock_getpgid ):
621629 """Test kill_pipeline_process_group escalates to SIGKILL if needed."""
622630 # Setup
623631 mock_process = Mock ()
624632 mock_process .pid = 12345
625- mock_process .is_alive .side_effect = [True , True , False ] # alive, still alive after SIGTERM , then dead
633+ mock_process .is_alive .side_effect = [True , True , False ] # alive, still alive after SIGUSR1 , then dead
626634 pgid = 54321
627635 mock_getpgid .return_value = pgid
628636
629637 # Execute
630638 kill_pipeline_process_group (mock_process )
631639
632- # Verify both SIGTERM and SIGKILL were sent
633- # getpgid is called twice - once for SIGTERM , once for SIGKILL
640+ # Verify SIGUSR1 was sent to main process, then SIGKILL to group
641+ # getpgid is called twice - once for initial check , once for SIGKILL
634642 assert mock_getpgid .call_count == 2
635643 mock_getpgid .assert_has_calls ([call (mock_process .pid ), call (mock_process .pid )])
636- assert mock_killpg . call_count == 2
637- mock_killpg .assert_has_calls ([ call ( pgid , signal .SIGTERM ), call ( pgid , signal . SIGKILL )] )
644+ mock_kill . assert_called_once_with ( mock_process . pid , signal . SIGUSR1 )
645+ mock_killpg .assert_called_once_with ( pgid , signal .SIGKILL )
638646 assert mock_process .join .call_count == 2
639647
640648 def test_kill_pipeline_process_group_parameter_validation (self ):
@@ -683,17 +691,18 @@ def test_launch_and_run_pipeline_process_integration(
683691 # run_pipeline_process only passes block=True to launch_pipeline
684692 mock_launch .assert_called_once_with (mock_config , block = True )
685693
686- @patch ("nv_ingest.framework.orchestration.process.execution.os.getpgid" )
687- @patch ("nv_ingest.framework.orchestration.process.execution.os.killpg" )
688- def test_process_lifecycle_management (self , mock_killpg , mock_getpgid ):
694+ @patch ("nv_ingest.framework.orchestration.process.termination.os.getpgid" )
695+ @patch ("nv_ingest.framework.orchestration.process.termination.os.killpg" )
696+ @patch ("nv_ingest.framework.orchestration.process.termination.os.kill" )
697+ def test_process_lifecycle_management (self , mock_kill , mock_killpg , mock_getpgid ):
689698 """Test complete process lifecycle management."""
690699 # Setup
691700 mock_process = Mock ()
692701 mock_process .pid = 99999
693702 pgid = 88888
694703 mock_getpgid .return_value = pgid
695704
696- # Configure process to terminate gracefully after first SIGTERM
705+ # Configure process to terminate gracefully after SIGUSR1
697706 mock_process .is_alive .side_effect = [True , False ] # Alive initially, then terminates
698707
699708 # Simulate process lifecycle
@@ -703,7 +712,8 @@ def test_process_lifecycle_management(self, mock_killpg, mock_getpgid):
703712
704713 # Verify cleanup - should only call getpgid once for graceful termination
705714 mock_getpgid .assert_called_once_with (mock_process .pid )
706- mock_killpg .assert_called_once_with (pgid , signal .SIGTERM )
715+ mock_kill .assert_called_once_with (mock_process .pid , signal .SIGUSR1 )
716+ mock_killpg .assert_not_called ()
707717 mock_process .join .assert_called_once_with (timeout = 5.0 )
708718
709719 @patch ("nv_ingest.framework.orchestration.process.execution.ray.init" )
0 commit comments