@@ -302,6 +302,7 @@ def _status(
302302 result = executor .tunnel .run (cmd )
303303
304304 job_id = result .stdout .strip ()
305+ job_id = job_id .split ("\n " )[- 1 ]
305306
306307 # If job not found in running jobs, check if it's in cluster_map
307308 if not job_id :
@@ -664,7 +665,11 @@ def run(self):
664665 ]
665666 )
666667
667- jump_arg_str = f"{ executor .tunnel .user } @{ executor .tunnel .host } "
668+ jump_arg_str = (
669+ f"{ executor .tunnel .user } @{ executor .tunnel .host } "
670+ if isinstance (executor .tunnel , SSHTunnel )
671+ else None
672+ )
668673 raw_jump_identity = getattr (executor .tunnel , "identity" , None )
669674 jump_identity_path_for_proxy = None
670675 if raw_jump_identity :
@@ -1106,13 +1111,14 @@ def start(
11061111 # ------------------------------------------------------------------
11071112 # Ship *workdir* over to the remote side (or package via packager)
11081113 # ------------------------------------------------------------------
1114+ cluster_dir = os .path .join (self .executor .tunnel .job_dir , self .name )
11091115 remote_workdir : Optional [str ] = None
11101116
11111117 if workdir :
1112- if isinstance ( self . executor . tunnel , SSHTunnel ):
1113- # Rsync workdir honouring .gitignore
1114- remote_workdir = os . path . join (self .executor .tunnel . job_dir , self . name , "code" )
1115- if not dryrun :
1118+ remote_workdir = os . path . join ( cluster_dir , "code" )
1119+ if not dryrun :
1120+ if isinstance (self .executor .tunnel , SSHTunnel ):
1121+ # Rsync workdir honouring .gitignore
11161122 self .executor .tunnel .connect ()
11171123 assert self .executor .tunnel .session is not None , (
11181124 "Tunnel session is not connected"
@@ -1123,11 +1129,24 @@ def start(
11231129 remote_workdir ,
11241130 rsync_opts = "--filter=':- .gitignore'" ,
11251131 )
1126- else :
1127- remote_workdir = workdir
1132+ else :
1133+ os .makedirs (remote_workdir , exist_ok = True )
1134+ subprocess .run (
1135+ [
1136+ "rsync" ,
1137+ "-pthrvz" ,
1138+ "--filter=:- .gitignore" ,
1139+ f"{ os .path .join (workdir , '' )} " ,
1140+ remote_workdir ,
1141+ ],
1142+ check = True ,
1143+ stdout = subprocess .DEVNULL ,
1144+ stderr = subprocess .DEVNULL ,
1145+ )
11281146 elif self .executor .packager is not None :
11291147 # Use the packager to create an archive which we then extract on the
11301148 # submission host and optionally rsync to the target.
1149+ remote_workdir = os .path .join (cluster_dir , "code" )
11311150 if not dryrun :
11321151 if isinstance (self .executor .tunnel , SSHTunnel ):
11331152 package_dir = tempfile .mkdtemp (prefix = "nemo_packager_" )
@@ -1157,7 +1176,6 @@ def start(
11571176 )
11581177
11591178 if isinstance (self .executor .tunnel , SSHTunnel ):
1160- remote_workdir = os .path .join (self .executor .tunnel .job_dir , self .name , "code" )
11611179 self .executor .tunnel .connect ()
11621180 assert self .executor .tunnel .session is not None , (
11631181 "Tunnel session is not connected"
@@ -1169,7 +1187,19 @@ def start(
11691187 rsync_opts = "--filter=':- .gitignore'" ,
11701188 )
11711189 else :
1172- remote_workdir = local_code_extraction_path
1190+ os .makedirs (remote_workdir , exist_ok = True )
1191+ subprocess .run (
1192+ [
1193+ "rsync" ,
1194+ "-pthrvz" ,
1195+ "--filter=:- .gitignore" ,
1196+ f"{ os .path .join (local_code_extraction_path , '' )} " ,
1197+ remote_workdir ,
1198+ ],
1199+ check = True ,
1200+ stdout = subprocess .DEVNULL ,
1201+ stderr = subprocess .DEVNULL ,
1202+ )
11731203
11741204 assert remote_workdir is not None , "workdir could not be determined"
11751205
0 commit comments