@@ -1144,6 +1144,183 @@ def test_default_headers_with_token(self):
11441144 assert "Authorization" in headers
11451145 assert headers ["Authorization" ] == "Bearer test_token"
11461146
def test_setup_launcher_no_launcher(self):
    """Run ``_setup_launcher`` on an executor created without a launcher.

    Asserts that ``ntasks_per_node`` equals the ``nprocs_per_node`` passed
    to the constructor and that ``torchrun_nproc_per_node`` is either
    missing or ``None`` afterwards.
    """
    executor_kwargs = dict(
        base_url="https://dgxapi.example.com",
        kube_apiserver_url="https://127.0.0.1:443",
        app_id="test_app_id",
        app_secret="test_app_secret",
        project_name="test_project",
        container_image="nvcr.io/nvidia/test:latest",
        pvc_nemo_run_dir="/workspace/nemo_run",
        nprocs_per_node=8,
    )
    executor = DGXCloudExecutor(**executor_kwargs)

    # Job details are assigned before _setup_launcher is invoked.
    executor.job_name = "test_job"
    executor.job_dir = "/workspace/test_job"

    # CONSOLE is replaced by a mock for the duration of the call.
    console_patch = patch("nemo_run.core.execution.dgxcloud.CONSOLE")
    with console_patch:
        executor._setup_launcher()

    # No launcher: the task count stays at the configured process count.
    assert executor.ntasks_per_node == 8
    # A missing attribute and an attribute set to None are both accepted.
    assert getattr(executor, "torchrun_nproc_per_node", None) is None
1173+
def test_setup_launcher_with_torchrun(self):
    """Run ``_setup_launcher`` on an executor configured with ``Torchrun``.

    Asserts that ``ntasks_per_node`` becomes 1, that
    ``torchrun_nproc_per_node`` equals the configured ``nprocs_per_node``,
    and that the patched console logged exactly one message containing
    ``"Torchrun"``.
    """
    from nemo_run.core.execution.launcher import Torchrun

    executor_kwargs = dict(
        base_url="https://dgxapi.example.com",
        kube_apiserver_url="https://127.0.0.1:443",
        app_id="test_app_id",
        app_secret="test_app_secret",
        project_name="test_project",
        container_image="nvcr.io/nvidia/test:latest",
        pvc_nemo_run_dir="/workspace/nemo_run",
        nprocs_per_node=8,
        launcher=Torchrun(),
    )
    executor = DGXCloudExecutor(**executor_kwargs)

    executor.job_name = "test_job"
    executor.job_dir = "/workspace/test_job"

    console_patch = patch("nemo_run.core.execution.dgxcloud.CONSOLE")
    with console_patch as console_mock:
        executor._setup_launcher()

    # One task per node; the process count is carried by torchrun_nproc_per_node.
    assert executor.ntasks_per_node == 1
    assert executor.torchrun_nproc_per_node == 8

    # Exactly one log call, whose first positional argument names the launcher.
    console_mock.log.assert_called_once()
    logged_message = console_mock.log.call_args[0][0]
    assert "Torchrun" in logged_message
1201+
def test_setup_launcher_with_fault_tolerance(self):
    """Run ``_setup_launcher`` on an executor configured with ``FaultTolerance``.

    Asserts the same task/process settings as the Torchrun case, the three
    path attributes written onto the launcher, and a single console log
    message containing ``"FaultTolerance"``.
    """
    from nemo_run.core.execution.launcher import FaultTolerance

    ft_launcher = FaultTolerance()
    executor_kwargs = dict(
        base_url="https://dgxapi.example.com",
        kube_apiserver_url="https://127.0.0.1:443",
        app_id="test_app_id",
        app_secret="test_app_secret",
        project_name="test_project",
        container_image="nvcr.io/nvidia/test:latest",
        pvc_nemo_run_dir="/workspace/nemo_run",
        nprocs_per_node=4,
        launcher=ft_launcher,
    )
    executor = DGXCloudExecutor(**executor_kwargs)

    executor.job_name = "my_ft_job"
    executor.job_dir = "/workspace/jobs/my_ft_job"

    # Both patches are active together while _setup_launcher runs.
    console_patch = patch("nemo_run.core.execution.dgxcloud.CONSOLE")
    rundir_patch = patch("nemo_run.config.RUNDIR_NAME", "nemo_run")
    with console_patch as console_mock, rundir_patch:
        executor._setup_launcher()

    # Task/process settings.
    assert executor.ntasks_per_node == 1
    assert executor.torchrun_nproc_per_node == 4

    # Paths written onto the launcher instance.
    expected_cfg_path = "/workspace/jobs/my_ft_job/my_ft_job/my_ft_job_ft_cfg.yml"
    expected_flag_file = "/nemo_run/my_ft_job_finished_flag"
    expected_results_file = "/workspace/jobs/my_ft_job/my_ft_job/my_ft_job_job_results"
    assert ft_launcher.cfg_path == expected_cfg_path
    assert ft_launcher.finished_flag_file == expected_flag_file
    assert ft_launcher.job_results_file == expected_results_file

    # Exactly one log call, whose first positional argument names the launcher.
    console_mock.log.assert_called_once()
    logged_message = console_mock.log.call_args[0][0]
    assert "FaultTolerance" in logged_message
1241+
def test_setup_launcher_fault_tolerance_paths(self):
    """Check the ``FaultTolerance`` paths produced for a second job layout.

    Uses a different job name, job directory and patched ``RUNDIR_NAME``
    value, then asserts the resulting ``cfg_path``, ``finished_flag_file``
    and ``job_results_file`` on the launcher.
    """
    from nemo_run.core.execution.launcher import FaultTolerance

    ft_launcher = FaultTolerance()
    executor_kwargs = dict(
        base_url="https://dgxapi.example.com",
        kube_apiserver_url="https://127.0.0.1:443",
        app_id="test_app_id",
        app_secret="test_app_secret",
        project_name="test_project",
        container_image="nvcr.io/nvidia/test:latest",
        pvc_nemo_run_dir="/workspace/nemo_run",
        launcher=ft_launcher,
    )
    executor = DGXCloudExecutor(**executor_kwargs)

    executor.job_name = "test_training"
    executor.job_dir = "/mnt/workspace/test_training"

    console_patch = patch("nemo_run.core.execution.dgxcloud.CONSOLE")
    rundir_patch = patch("nemo_run.config.RUNDIR_NAME", "custom_rundir")
    with console_patch, rundir_patch:
        executor._setup_launcher()

    # Expected locations: two files under the job directory, one under the run dir.
    expected_root = "/mnt/workspace/test_training/test_training"
    expected_cfg_path = f"{expected_root}/test_training_ft_cfg.yml"
    expected_flag_file = "/custom_rundir/test_training_finished_flag"
    expected_results_file = f"{expected_root}/test_training_job_results"

    assert ft_launcher.cfg_path == expected_cfg_path
    assert ft_launcher.finished_flag_file == expected_flag_file
    assert ft_launcher.job_results_file == expected_results_file
1270+
def test_setup_launcher_with_different_nprocs(self):
    """Run ``_setup_launcher`` with ``Torchrun`` across several process counts.

    For each of 1, 2, 4, 8 and 16, a fresh executor is built and the test
    asserts that ``torchrun_nproc_per_node`` matches the configured count
    while ``ntasks_per_node`` is 1.
    """
    from nemo_run.core.execution.launcher import Torchrun

    console_target = "nemo_run.core.execution.dgxcloud.CONSOLE"
    process_counts = (1, 2, 4, 8, 16)

    for process_count in process_counts:
        # A new executor and launcher per iteration keeps the cases independent.
        executor_kwargs = dict(
            base_url="https://dgxapi.example.com",
            kube_apiserver_url="https://127.0.0.1:443",
            app_id="test_app_id",
            app_secret="test_app_secret",
            project_name="test_project",
            container_image="nvcr.io/nvidia/test:latest",
            pvc_nemo_run_dir="/workspace/nemo_run",
            nprocs_per_node=process_count,
            launcher=Torchrun(),
        )
        executor = DGXCloudExecutor(**executor_kwargs)

        executor.job_name = "test_job"
        executor.job_dir = "/workspace/test_job"

        with patch(console_target):
            executor._setup_launcher()

        assert executor.torchrun_nproc_per_node == process_count
        assert executor.ntasks_per_node == 1
1296+
def test_setup_launcher_super_called(self):
    """Check that ``_setup_launcher`` invokes the first base class's version.

    The ``_setup_launcher`` attribute on the executor class's first base
    is replaced with a mock, and the test asserts that the mock was called
    exactly once.
    """
    from nemo_run.core.execution.launcher import Torchrun

    executor_kwargs = dict(
        base_url="https://dgxapi.example.com",
        kube_apiserver_url="https://127.0.0.1:443",
        app_id="test_app_id",
        app_secret="test_app_secret",
        project_name="test_project",
        container_image="nvcr.io/nvidia/test:latest",
        pvc_nemo_run_dir="/workspace/nemo_run",
        launcher=Torchrun(),
    )
    executor = DGXCloudExecutor(**executor_kwargs)

    executor.job_name = "test_job"
    executor.job_dir = "/workspace/test_job"

    # The first entry of __bases__ is the class whose method gets mocked.
    first_base = type(executor).__bases__[0]
    console_patch = patch("nemo_run.core.execution.dgxcloud.CONSOLE")
    base_setup_patch = patch.object(first_base, "_setup_launcher")

    with console_patch, base_setup_patch as base_setup_mock:
        executor._setup_launcher()

        # Checked while the patch is still active, as in the nested-with form.
        base_setup_mock.assert_called_once()
1323+
11471324
11481325class TestDGXCloudRequest :
11491326 """Test DGXCloudRequest dataclass and its methods."""
0 commit comments