Skip to content

Commit fe7d0d9

Browse files
authored
Merge pull request #7025 from aldbr/rel-v8r0_FIX_SSHCEConnection
[8.0] fix: SSHCE.getPilotOutput()
2 parents 2315b13 + 38d8d1e commit fe7d0d9

File tree

1 file changed

+105
-118
lines changed

1 file changed

+105
-118
lines changed

src/DIRAC/Resources/Computing/SSHComputingElement.py

Lines changed: 105 additions & 118 deletions
Original file line numberDiff line numberDiff line change
@@ -62,11 +62,12 @@
6262
6363
**Code Documentation**
6464
"""
65-
import os
65+
import errno
6666
import json
67-
import stat
67+
import os
68+
import pexpect
6869
import shutil
69-
import errno
70+
import stat
7071
from urllib.parse import urlparse
7172
from urllib.parse import quote
7273
from urllib.parse import unquote
@@ -136,46 +137,36 @@ def __ssh_call(self, command, timeout):
136137
if not timeout:
137138
timeout = 999
138139

140+
ssh_newkey = "Are you sure you want to continue connecting"
139141
try:
140-
import pexpect
141-
142-
ssh_newkey = "Are you sure you want to continue connecting"
143-
try:
144-
child = pexpect.spawn(command, timeout=timeout, encoding="utf-8")
145-
i = child.expect([pexpect.TIMEOUT, ssh_newkey, pexpect.EOF, "assword: "])
142+
child = pexpect.spawn(command, timeout=timeout, encoding="utf-8")
143+
i = child.expect([pexpect.TIMEOUT, ssh_newkey, pexpect.EOF, "assword: "])
144+
if i == 0: # Timeout
145+
return S_OK((-1, child.before, "SSH login failed"))
146+
147+
if i == 1: # SSH does not have the public key. Just accept it.
148+
child.sendline("yes")
149+
child.expect("assword: ")
150+
i = child.expect([pexpect.TIMEOUT, "assword: "])
146151
if i == 0: # Timeout
147-
return S_OK((-1, child.before, "SSH login failed"))
148-
elif i == 1: # SSH does not have the public key. Just accept it.
149-
child.sendline("yes")
150-
child.expect("assword: ")
151-
i = child.expect([pexpect.TIMEOUT, "assword: "])
152-
if i == 0: # Timeout
153-
return S_OK((-1, str(child.before) + str(child.after), "SSH login failed"))
154-
elif i == 1:
155-
child.sendline(self.password)
156-
child.expect(pexpect.EOF)
157-
return S_OK((0, child.before, ""))
158-
elif i == 2:
159-
# Passwordless login, get the output
160-
return S_OK((0, child.before, ""))
161-
162-
if self.password:
152+
return S_OK((-1, str(child.before) + str(child.after), "SSH login failed"))
153+
if i == 1:
163154
child.sendline(self.password)
164155
child.expect(pexpect.EOF)
165156
return S_OK((0, child.before, ""))
166-
return S_ERROR((-2, child.before, ""))
167-
except Exception as x:
168-
res = (-1, f"Encountered exception {Exception}: {str(x)}")
169-
return S_ERROR(res)
170-
except BaseException:
171-
from DIRAC.Core.Utilities.Subprocess import shellCall
172-
173-
# Try passwordless login
174-
result = shellCall(timeout, command)
175-
# print ( "!!! SSH command: %s returned %s\n" % (command, result) )
176-
if result["Value"][0] == 255:
177-
return S_ERROR((-1, f"Cannot connect to host {self.host}", ""))
178-
return result
157+
158+
if i == 2:
159+
# Passwordless login, get the output
160+
return S_OK((0, child.before, ""))
161+
162+
if self.password:
163+
child.sendline(self.password)
164+
child.expect(pexpect.EOF)
165+
return S_OK((0, child.before, ""))
166+
167+
return S_ERROR(f"Unknown error: {child.before}")
168+
except Exception as x:
169+
return S_ERROR(f"Encountered exception: {str(x)}")
179170

180171
def sshCall(self, timeout, cmdSeq):
181172
"""Execute remote command via a ssh remote call
@@ -423,7 +414,7 @@ def _prepareRemoteHost(self, host=None):
423414
self.log.verbose(f"Creating working directories on {self.ceParameters['SSHHost']}")
424415
result = ssh.sshCall(30, cmd)
425416
if not result["OK"]:
426-
self.log.error("Failed creating working directories", f"({result['Message'][1]})")
417+
self.log.error("Failed creating working directories", f"({result['Message']})")
427418
return result
428419
status, output, _error = result["Value"]
429420
if status == -1:
@@ -443,7 +434,7 @@ def _prepareRemoteHost(self, host=None):
443434
remoteScript = f"{self.sharedArea}/execute_batch"
444435
result = ssh.scpCall(30, localScript, remoteScript, postUploadCommand=f"chmod +x {remoteScript}")
445436
if not result["OK"]:
446-
self.log.warn(f"Failed uploading control script: {result['Message'][1]}")
437+
self.log.warn(f"Failed uploading control script: {result['Message']}")
447438
return result
448439
status, output, _error = result["Value"]
449440
if status != 0:
@@ -634,13 +625,11 @@ def killJob(self, jobIDList):
634625

635626
def _killJobOnHost(self, jobIDList, host=None):
636627
"""Kill the jobs for the given list of job IDs"""
637-
jobDict = {}
638-
for job in jobIDList:
639-
stamp = os.path.basename(urlparse(job).path)
640-
jobDict[stamp] = job
641-
stampList = list(jobDict)
628+
batchSystemJobList = []
629+
for jobID in jobIDList:
630+
batchSystemJobList.append(os.path.basename(urlparse(jobID.split(":::")[0]).path))
642631

643-
commandOptions = {"JobIDList": stampList, "User": self.user}
632+
commandOptions = {"JobIDList": batchSystemJobList, "User": self.user}
644633
resultCommand = self.__executeHostCommand("killJob", commandOptions, host=host)
645634
if not resultCommand["OK"]:
646635
return resultCommand
@@ -654,18 +643,6 @@ def _killJobOnHost(self, jobIDList, host=None):
654643

655644
return S_OK(len(result["Successful"]))
656645

657-
def _getHostStatus(self, host=None):
658-
"""Get jobs running at a given host"""
659-
resultCommand = self.__executeHostCommand("getCEStatus", {}, host=host)
660-
if not resultCommand["OK"]:
661-
return resultCommand
662-
663-
result = resultCommand["Value"]
664-
if result["Status"] != 0:
665-
return S_ERROR(f"Failed to get CE status: {result['Message']}")
666-
667-
return S_OK(result)
668-
669646
def getCEStatus(self):
670647
"""Method to return information on running and pending jobs."""
671648
result = S_OK()
@@ -686,21 +663,31 @@ def getCEStatus(self):
686663

687664
return result
688665

666+
def _getHostStatus(self, host=None):
667+
"""Get jobs running at a given host"""
668+
resultCommand = self.__executeHostCommand("getCEStatus", {}, host=host)
669+
if not resultCommand["OK"]:
670+
return resultCommand
671+
672+
result = resultCommand["Value"]
673+
if result["Status"] != 0:
674+
return S_ERROR(f"Failed to get CE status: {result['Message']}")
675+
676+
return S_OK(result)
677+
689678
def getJobStatus(self, jobIDList):
690679
"""Get the status information for the given list of jobs"""
691680
return self._getJobStatusOnHost(jobIDList)
692681

693682
def _getJobStatusOnHost(self, jobIDList, host=None):
694683
"""Get the status information for the given list of jobs"""
695-
696684
resultDict = {}
697-
jobDict = {}
698-
for job in jobIDList:
699-
stamp = os.path.basename(urlparse(job).path)
700-
jobDict[stamp] = job
701-
stampList = list(jobDict)
685+
batchSystemJobDict = {}
686+
for jobID in jobIDList:
687+
batchSystemJobID = os.path.basename(urlparse(jobID.split(":::")[0]).path)
688+
batchSystemJobDict[batchSystemJobID] = jobID
702689

703-
for jobList in breakListIntoChunks(stampList, 100):
690+
for jobList in breakListIntoChunks(list(batchSystemJobDict), 100):
704691
resultCommand = self.__executeHostCommand("getJobStatus", {"JobIDList": jobList}, host=host)
705692
if not resultCommand["OK"]:
706693
return resultCommand
@@ -709,14 +696,54 @@ def _getJobStatusOnHost(self, jobIDList, host=None):
709696
if result["Status"] != 0:
710697
return S_ERROR(f"Failed to get job status: {result['Message']}")
711698

712-
for stamp in result["Jobs"]:
713-
resultDict[jobDict[stamp]] = result["Jobs"][stamp]
699+
for batchSystemJobID in result["Jobs"]:
700+
resultDict[batchSystemJobDict[batchSystemJobID]] = result["Jobs"][batchSystemJobID]
714701

715702
return S_OK(resultDict)
716703

704+
def getJobOutput(self, jobID, localDir=None):
705+
"""Get the specified job standard output and error files. If the localDir is provided,
706+
the output is returned as file in this directory. Otherwise, the output is returned
707+
as strings.
708+
"""
709+
self.log.verbose("Getting output for jobID", jobID)
710+
result = self._getJobOutputFiles(jobID)
711+
if not result["OK"]:
712+
return result
713+
714+
batchSystemJobID, host, outputFile, errorFile = result["Value"]
715+
716+
if localDir:
717+
localOutputFile = f"{localDir}/{batchSystemJobID}.out"
718+
localErrorFile = f"{localDir}/{batchSystemJobID}.err"
719+
else:
720+
localOutputFile = "Memory"
721+
localErrorFile = "Memory"
722+
723+
# Take into account the SSHBatch possible SSHHost syntax
724+
host = host.split("/")[0]
725+
726+
ssh = SSH(host=host, parameters=self.ceParameters)
727+
resultStdout = ssh.scpCall(30, localOutputFile, outputFile, upload=False)
728+
if not resultStdout["OK"]:
729+
return resultStdout
730+
731+
resultStderr = ssh.scpCall(30, localErrorFile, errorFile, upload=False)
732+
if not resultStderr["OK"]:
733+
return resultStderr
734+
735+
if localDir:
736+
output = localOutputFile
737+
error = localErrorFile
738+
else:
739+
output = resultStdout["Value"][1]
740+
error = resultStderr["Value"][1]
741+
742+
return S_OK((output, error))
743+
717744
def _getJobOutputFiles(self, jobID):
718745
"""Get output file names for the specific CE"""
719-
jobStamp = os.path.basename(urlparse(jobID).path)
746+
batchSystemJobID = os.path.basename(urlparse(jobID.split(":::")[0]).path)
720747
# host can be retrieved from the path of the jobID
721748
# it might not be present, in this case host is an empty string and will be defined by the CE parameters later
722749
host = os.path.dirname(urlparse(jobID).path).lstrip("/")
@@ -726,19 +753,19 @@ def _getJobOutputFiles(self, jobID):
726753
self.errorTemplate = self.ceParameters["ErrorTemplate"]
727754

728755
if self.outputTemplate:
729-
output = self.outputTemplate % jobStamp
730-
error = self.errorTemplate % jobStamp
756+
output = self.outputTemplate % batchSystemJobID
757+
error = self.errorTemplate % batchSystemJobID
731758
elif "OutputTemplate" in self.ceParameters:
732759
self.outputTemplate = self.ceParameters["OutputTemplate"]
733760
self.errorTemplate = self.ceParameters["ErrorTemplate"]
734-
output = self.outputTemplate % jobStamp
735-
error = self.errorTemplate % jobStamp
761+
output = self.outputTemplate % batchSystemJobID
762+
error = self.errorTemplate % batchSystemJobID
736763
elif hasattr(self.batchSystem, "getJobOutputFiles"):
737764
# numberOfNodes is treated as a string as it can contain values such as "2-4"
738765
# where 2 would represent the minimum number of nodes to allocate, and 4 the maximum
739766
numberOfNodes = self.ceParameters.get("NumberOfNodes", "1")
740767
commandOptions = {
741-
"JobIDList": [jobStamp],
768+
"JobIDList": [batchSystemJobID],
742769
"OutputDir": self.batchOutput,
743770
"ErrorDir": self.batchError,
744771
"NumberOfNodes": numberOfNodes,
@@ -755,50 +782,10 @@ def _getJobOutputFiles(self, jobID):
755782
self.outputTemplate = result["OutputTemplate"]
756783
self.errorTemplate = result["ErrorTemplate"]
757784

758-
output = result["Jobs"][jobStamp]["Output"]
759-
error = result["Jobs"][jobStamp]["Error"]
760-
else:
761-
output = f"{self.batchOutput}/{jobStamp}.out"
762-
error = f"{self.batchError}/{jobStamp}.err"
763-
764-
return S_OK((jobStamp, host, output, error))
765-
766-
def getJobOutput(self, jobID, localDir=None):
767-
"""Get the specified job standard output and error files. If the localDir is provided,
768-
the output is returned as file in this directory. Otherwise, the output is returned
769-
as strings.
770-
"""
771-
self.log.verbose("Getting output for jobID", jobID)
772-
result = self._getJobOutputFiles(jobID)
773-
if not result["OK"]:
774-
return result
775-
776-
jobStamp, host, outputFile, errorFile = result["Value"]
777-
778-
if localDir:
779-
localOutputFile = f"{localDir}/{jobStamp}.out"
780-
localErrorFile = f"{localDir}/{jobStamp}.err"
781-
else:
782-
localOutputFile = "Memory"
783-
localErrorFile = "Memory"
784-
785-
# Take into account the SSHBatch possible SSHHost syntax
786-
host = host.split("/")[0]
787-
788-
ssh = SSH(host=host, parameters=self.ceParameters)
789-
resultStdout = ssh.scpCall(30, localOutputFile, outputFile, upload=False)
790-
if not resultStdout["OK"]:
791-
return resultStdout
792-
793-
resultStderr = ssh.scpCall(30, localErrorFile, errorFile, upload=False)
794-
if not resultStderr["OK"]:
795-
return resultStderr
796-
797-
if localDir:
798-
output = localOutputFile
799-
error = localErrorFile
785+
output = result["Jobs"][batchSystemJobID]["Output"]
786+
error = result["Jobs"][batchSystemJobID]["Error"]
800787
else:
801-
output = resultStdout["Value"][1]
802-
error = resultStderr["Value"][1]
788+
output = f"{self.batchOutput}/{batchSystemJobID}.out"
789+
error = f"{self.batchError}/{batchSystemJobID}.err"
803790

804-
return S_OK((output, error))
791+
return S_OK((batchSystemJobID, host, output, error))

0 commit comments

Comments
 (0)