Skip to content

Commit 0ef9884

Browse files
authored
Merge pull request #5553 from chrisburr/executor-fixes
[rel-v7r2] Fixes to executor infrastructure
2 parents 71d7512 + 63d3639 commit 0ef9884

File tree

7 files changed

+38
-29
lines changed

7 files changed

+38
-29
lines changed

src/DIRAC/Core/Base/ExecutorMindHandler.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,7 @@ def __sendTask(self, taskId, taskObj, eId, eType):
124124
taskStub = result["Value"]
125125
result = self.srv_msgCreate("ProcessTask")
126126
if not result["OK"]:
127+
gLogger.error("Failed to create message for", "%s %r" % (taskId, result))
127128
return result
128129
msgObj = result["Value"]
129130
msgObj.taskId = taskId

src/DIRAC/Core/DISET/MessageClient.py

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -83,14 +83,12 @@ def __cbDisconnect(self, trid):
8383
self.__trid = False
8484
try:
8585
self.__transport.close()
86-
except BaseException:
86+
except Exception:
8787
pass
8888
for cb in self.__specialCallbacks["drop"]:
8989
try:
9090
cb(self)
91-
except SystemExit:
92-
raise
93-
except BaseException:
91+
except Exception:
9492
gLogger.exception("Exception while processing disconnect callbacks")
9593

9694
def __cbRecvMsg(self, trid, msgObj):
@@ -107,7 +105,7 @@ def __cbRecvMsg(self, trid, msgObj):
107105
# If no specific callback but a generic one, return the generic one
108106
if msgName not in self.__callbacks:
109107
return result
110-
except BaseException:
108+
except Exception:
111109
gLogger.exception("Exception while processing callbacks", msgObj.getName())
112110
if msgName not in self.__callbacks:
113111
return S_ERROR("Unexpected message")
@@ -117,7 +115,7 @@ def __cbRecvMsg(self, trid, msgObj):
117115
gLogger.error("Callback for message does not return S_OK/S_ERROR", msgName)
118116
return S_ERROR("No response")
119117
return result
120-
except BaseException:
118+
except Exception:
121119
gLogger.exception("Exception while processing callbacks", msgName)
122120
return S_ERROR("No response")
123121

@@ -128,6 +126,7 @@ def sendMessage(self, msgObj):
128126
if not self.__trid:
129127
result = self.connect()
130128
if not result["OK"]:
129+
gLogger.error("Failed connect for sending", "%r" % msgObj)
131130
return result
132131
return self.__msgBroker.sendMessage(self.__trid, msgObj)
133132

src/DIRAC/Core/DISET/private/MessageBroker.py

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -165,12 +165,8 @@ def __listenAutoReceiveConnections(self):
165165

166166
try:
167167
events = sel.select(timeout=1)
168-
except (socket.error, select.error):
169-
# TODO: When can this happen?
170-
time.sleep(0.001)
171-
continue
172-
except Exception as e:
173-
gLogger.exception("Exception while selecting persistent connections", lException=e)
168+
except Exception:
169+
gLogger.exception("Exception while selecting persistent connections")
174170
continue
175171

176172
for key, event in events:
@@ -205,7 +201,7 @@ def __receiveMsgDataAndQueue(self, trid):
205201

206202
def __processIncomingData(self, trid, receivedResult):
207203
# If keep alive, return OK
208-
if "keepAlive" in receivedResult and receivedResult["keepAlive"]:
204+
if receivedResult.get("keepAlive"):
209205
return S_OK()
210206
# If idle read return
211207
self.__trInOutLock.acquire()

src/DIRAC/Core/Utilities/ExecutorDispatcher.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,8 @@ def getIdleExecutor(self, eType):
103103
idleId = None
104104
maxFreeSlots = 0
105105
try:
106-
for eId in self.__typeToId[eType]:
106+
# Work on a copy of self.__typeToId[eType] to avoid race conditions causing its size to change while iterating
107+
for eId in list(self.__typeToId[eType]):
107108
freeSlots = self.freeSlots(eId)
108109
if freeSlots > maxFreeSlots:
109110
maxFreeSlots = freeSlots
@@ -425,7 +426,7 @@ def addExecutor(self, eId, eTypes, maxTasks=1):
425426
self.__fillExecutors(eType)
426427

427428
def removeExecutor(self, eId):
428-
self.__log.verbose("Removing executor %s" % eId)
429+
self.__log.info("Removing executor %s" % eId)
429430
self.__executorsLock.acquire()
430431
try:
431432
if eId not in self.__idMap:
@@ -439,8 +440,8 @@ def removeExecutor(self, eId):
439440
eTask = self.__tasks[taskId]
440441
except KeyError:
441442
# Task already removed
442-
pass
443-
if eTask.eType:
443+
eTask = None
444+
if eTask and eTask.eType:
444445
self.__queues.pushTask(eTask.eType, taskId, ahead=True)
445446
else:
446447
self.__dispatchTask(taskId)
@@ -844,6 +845,8 @@ def __msgTaskToExecutor(self, taskId, eId, eType):
844845
errMsg = "Send task callback did not send back an S_OK/S_ERROR structure"
845846
self.__log.fatal(errMsg)
846847
raise ValueError(errMsg)
848+
if not result["OK"]:
849+
self.__log.error("Failed to cbSendTask", "%r" % result)
847850

848851

849852
class UnrecoverableTaskException(Exception):

src/DIRAC/WorkloadManagementSystem/Client/JobState/JobState.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,10 +37,10 @@ def reset(self):
3737
def checkDBAccess(cls):
3838
# Init DB if there
3939
if not JobState.__db.checked:
40-
JobState.__db.checked = True
4140
JobState.__db.jobDB = JobDB()
4241
JobState.__db.logDB = JobLoggingDB()
4342
JobState.__db.tqDB = TaskQueueDB()
43+
JobState.__db.checked = True
4444

4545
def __init__(self, jid, source="Unknown"):
4646
self.__jid = jid

src/DIRAC/WorkloadManagementSystem/DB/JobLoggingDB.py

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,11 +36,11 @@ def __init__(self):
3636
def addLoggingRecord(
3737
self,
3838
jobID,
39-
status="idem",
40-
minorStatus="idem",
41-
applicationStatus="idem",
39+
status=None,
40+
minorStatus=None,
41+
applicationStatus=None,
4242
date=None,
43-
source="Unknown",
43+
source=None,
4444
minor=None,
4545
application=None,
4646
):
@@ -59,6 +59,11 @@ def addLoggingRecord(
5959
if application:
6060
applicationStatus = application
6161

62+
status = status or "idem"
63+
minorStatus = minorStatus or "idem"
64+
applicationStatus = applicationStatus or "idem"
65+
source = source or "Unknown"
66+
6267
event = "status/minor/app=%s/%s/%s" % (status, minorStatus, applicationStatus)
6368
self.log.info("Adding record for job ", str(jobID) + ": '" + event + "' from " + source)
6469

src/DIRAC/WorkloadManagementSystem/Service/OptimizationMindHandler.py

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -201,15 +201,20 @@ def exec_deserializeTask(cls, taskStub):
201201

202202
@classmethod
203203
def exec_taskError(cls, jid, cachedJobState, errorMsg):
204-
result = cachedJobState.commitChanges()
205-
if not result["OK"]:
206-
cls.log.error("Cannot write changes to job %s: %s" % (jid, result["Message"]))
204+
if cachedJobState:
205+
result = cachedJobState.commitChanges()
206+
if not result["OK"]:
207+
cls.log.error("Cannot write changes to job %s: %s" % (jid, result["Message"]))
208+
else:
209+
cls.log.error(
210+
"Called exec_taskError with",
211+
"jid=%r cachedJobState=%r errorMsg=%r" % (jid, cachedJobState, errorMsg),
212+
)
207213
jobState = JobState(jid)
208214
result = jobState.getStatus()
209-
if result["OK"]:
210-
if result["Value"][0].lower() == "failed":
211-
return S_OK()
212-
else:
215+
if not result["OK"]:
213216
cls.log.error("Could not get status of job %s: %s" % (jid, result["Message "]))
217+
elif result["Value"][0].lower() == "failed":
218+
return S_OK()
214219
cls.log.notice("Job %s: Setting to Failed|%s" % (jid, errorMsg))
215220
return jobState.setStatus("Failed", errorMsg, source="OptimizationMindHandler")

0 commit comments

Comments
 (0)