Skip to content

Commit 37c59a3

Browse files
authored
Merge pull request #5793 from fstagni/cherry-pick-2-8150a4c54-integration
[sweep:integration] Fix setJobStatus service
2 parents 352b890 + 68d9b1e commit 37c59a3

File tree

4 files changed

+311
-71
lines changed

4 files changed

+311
-71
lines changed

src/DIRAC/WorkloadManagementSystem/Agent/PilotStatusAgent.py

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,11 @@
11
""" The Pilot Status Agent updates the status of the pilot jobs in the
22
PilotAgents database.
3+
4+
.. literalinclude:: ../ConfigTemplate.cfg
5+
:start-after: ##BEGIN PilotStatusAgent
6+
:end-before: ##END
7+
:dedent: 2
8+
:caption: PilotStatusAgent options
39
"""
410

511
from DIRAC import S_OK, gConfig
@@ -15,9 +21,6 @@
1521
from DIRAC.WorkloadManagementSystem.DB.PilotAgentsDB import PilotAgentsDB
1622
from DIRAC.WorkloadManagementSystem.DB.JobDB import JobDB
1723

18-
MAX_JOBS_QUERY = 10
19-
MAX_WAITING_STATE_LENGTH = 3
20-
2124

2225
class PilotStatusAgent(AgentModule):
2326
"""
@@ -42,9 +45,7 @@ def __init__(self, *args, **kwargs):
4245
def initialize(self):
4346
"""Sets defaults"""
4447

45-
self.am_setOption("PollingTime", 120)
4648
self.am_setOption("GridEnv", "")
47-
self.am_setOption("PilotStalledDays", 3)
4849
self.pilotDB = PilotAgentsDB()
4950
self.diracadmin = DiracAdmin()
5051
self.jobDB = JobDB()
@@ -67,14 +68,13 @@ def execute(self):
6768
instance = gConfig.getValue("/DIRAC/Setups/%s/WorkloadManagement" % setup, "")
6869
if instance:
6970
self.gridEnv = gConfig.getValue("/Systems/WorkloadManagement/%s/GridEnv" % instance, "")
71+
7072
result = self.pilotDB._getConnection()
71-
if result["OK"]:
72-
connection = result["Value"]
73-
else:
73+
if not result["OK"]:
7474
return result
75+
connection = result["Value"]
7576

76-
# Now handle pilots not updated in the last N days (most likely the Broker is no
77-
# longer available) and declare them Deleted.
77+
# Now handle pilots not updated in the last N days and declare them Deleted.
7878
result = self.handleOldPilots(connection)
7979

8080
connection.close()

src/DIRAC/WorkloadManagementSystem/ConfigTemplate.cfg

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,12 +158,14 @@ Agents
158158
IncludeMasterCS = True
159159
}
160160
##END
161+
##BEGIN PilotStatusAgent
161162
PilotStatusAgent
162163
{
163164
PollingTime = 300
164165
# Flag enabling sending of the Pilot accounting info to the Accounting Service
165166
PilotAccountingEnabled = yes
166167
}
168+
##END
167169
JobAgent
168170
{
169171
FillingModeFlag = true
@@ -288,6 +290,8 @@ Agents
288290
Backends = Accounting
289291
# the name of the message queue used for the failover
290292
MessageQueue = dirac.wmshistory
293+
# Polling time. For this agent it should always be 15 minutes.
294+
PollingTime = 900
291295
}
292296
##END
293297
CloudDirector

src/DIRAC/WorkloadManagementSystem/Service/JobStateUpdateHandler.py

Lines changed: 68 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ def __setJobStatus(
124124
sDict["Source"] = source
125125
if not datetime:
126126
datetime = Time.toString()
127-
return cls.__setJobStatusBulk(jobID, {datetime: sDict}, force=force)
127+
return cls._setJobStatusBulk(jobID, {datetime: sDict}, force=force)
128128
return S_OK()
129129

130130
###########################################################################
@@ -133,31 +133,29 @@ def __setJobStatus(
133133
@classmethod
134134
def export_setJobStatusBulk(cls, jobID, statusDict, force=False):
135135
"""Set various job status fields with a time stamp and a source"""
136-
return cls.__setJobStatusBulk(jobID, statusDict, force=force)
136+
return cls._setJobStatusBulk(jobID, statusDict, force=force)
137137

138138
@classmethod
139-
def __setJobStatusBulk(cls, jobID, statusDict, force=False):
140-
"""Set various status fields for job specified by its JobId.
139+
def _setJobStatusBulk(cls, jobID, statusDict, force=False):
140+
"""Set various status fields for job specified by its jobId.
141141
Set only the last status in the JobDB, updating all the status
142142
logging information in the JobLoggingDB. The statusDict has datetime
143143
as a key and status information dictionary as values
144144
"""
145-
status = ""
146-
minor = ""
147-
application = ""
148145
jobID = int(jobID)
146+
log = cls.log.getLocalSubLogger("JobStatusBulk/Job-%d" % jobID)
149147

150148
result = cls.jobDB.getJobAttributes(jobID, ["Status", "StartExecTime", "EndExecTime"])
151149
if not result["OK"]:
152150
return result
153-
154151
if not result["Value"]:
155152
# if there is no matching Job it returns an empty dictionary
156153
return S_ERROR("No Matching Job")
154+
157155
# If the current status is Stalled and we get an update, it should probably be "Running"
158156
currentStatus = result["Value"]["Status"]
159157
if currentStatus == JobStatus.STALLED:
160-
status = JobStatus.RUNNING
158+
currentStatus = JobStatus.RUNNING
161159
startTime = result["Value"].get("StartExecTime")
162160
endTime = result["Value"].get("EndExecTime")
163161
# getJobAttributes only returns strings :(
@@ -166,66 +164,75 @@ def __setJobStatusBulk(cls, jobID, statusDict, force=False):
166164
if endTime == "None":
167165
endTime = None
168166

169-
# Get the latest WN time stamps of status updates
170-
result = cls.jobLoggingDB.getWMSTimeStamps(int(jobID))
171-
if not result["OK"]:
172-
return result
173-
lastTime = max([float(t) for s, t in result["Value"].items() if s != "LastTime"])
174-
lastTime = Time.toString(Time.fromEpoch(lastTime))
175-
176-
dates = sorted(statusDict)
177-
# If real updates, start from the current status
178-
if dates[0] >= lastTime and not status:
179-
status = currentStatus
180-
log = cls.log.getLocalSubLogger("JobStatusBulk/Job-%s" % jobID)
181-
log.debug("*** New call ***", "Last update time %s - Sorted new times %s" % (lastTime, dates))
182167
# Remove useless items in order to make it simpler later, although there should not be any
183168
for sDict in statusDict.values():
184169
for item in sorted(sDict):
185170
if not sDict[item]:
186171
sDict.pop(item, None)
187-
# Pick up start and end times from all updates, if they don't exist
188-
newStat = status
189-
for date in dates:
190-
sDict = statusDict[date]
191-
# This is to recover Matched jobs that set the application status: they are running!
192-
if sDict.get("ApplicationStatus") and newStat == JobStatus.MATCHED:
193-
sDict["Status"] = JobStatus.RUNNING
172+
173+
# Get the latest time stamps of major status updates
174+
result = cls.jobLoggingDB.getWMSTimeStamps(int(jobID))
175+
if not result["OK"]:
176+
return result
177+
if not result["Value"]:
178+
return S_ERROR("No registered WMS timeStamps")
179+
# This is more precise than "LastTime". timeStamps is a sorted list of tuples...
180+
timeStamps = sorted((float(t), s) for s, t in result["Value"].items() if s != "LastTime")
181+
lastTime = Time.toString(Time.fromEpoch(timeStamps[-1][0]))
182+
183+
# Get chronological order of new updates
184+
updateTimes = sorted(statusDict)
185+
log.debug("*** New call ***", "Last update time %s - Sorted new times %s" % (lastTime, updateTimes))
186+
# Get the status (if any) at the time of the first update
187+
newStat = ""
188+
firstUpdate = Time.toEpoch(Time.fromString(updateTimes[0]))
189+
for ts, st in timeStamps:
190+
if firstUpdate >= ts:
191+
newStat = st
192+
# Pick up start and end times from all updates
193+
for updTime in updateTimes:
194+
sDict = statusDict[updTime]
194195
newStat = sDict.get("Status", newStat)
195196

196-
# evaluate the state machine
197-
if not force and newStat:
198-
res = JobStatus.JobsStateMachine(currentStatus).getNextState(newStat)
199-
if not res["OK"]:
200-
return res
201-
nextState = res["Value"]
202-
203-
# If the JobsStateMachine does not accept the candidate, don't update
204-
if newStat != nextState:
205-
log.error(
206-
"Job Status Error",
207-
"%s can't move from %s to %s: using %s" % (jobID, currentStatus, newStat, nextState),
208-
)
209-
newStat = nextState
210-
sDict["Status"] = newStat
211-
currentStatus = newStat
212-
213-
if newStat == JobStatus.RUNNING and not startTime:
197+
if not startTime and newStat == JobStatus.RUNNING:
214198
# Pick up the start date when the job starts running if not existing
215-
startTime = date
199+
startTime = updTime
216200
log.debug("Set job start time", startTime)
217-
elif newStat in JobStatus.JOB_FINAL_STATES and not endTime:
201+
elif not endTime and newStat in JobStatus.JOB_FINAL_STATES:
218202
# Pick up the end time when the job is in a final status
219-
endTime = date
203+
endTime = updTime
220204
log.debug("Set job end time", endTime)
221205

222-
# We should only update the status if its time stamp is more recent than the last update
223-
if dates[-1] >= lastTime:
224-
# Get the last status values
225-
for date in [dt for dt in dates if dt >= lastTime]:
226-
sDict = statusDict[date]
227-
log.debug("\t", "Time %s - Statuses %s" % (date, str(sDict)))
228-
status = sDict.get("Status", status)
206+
# We should only update the status to the last one if its time stamp is more recent than the last update
207+
if updateTimes[-1] >= lastTime:
208+
minor = ""
209+
application = ""
210+
# Get the last status values looping on the most recent upupdateTimes in chronological order
211+
for updTime in [dt for dt in updateTimes if dt >= lastTime]:
212+
sDict = statusDict[updTime]
213+
log.debug("\t", "Time %s - Statuses %s" % (updTime, str(sDict)))
214+
status = sDict.get("Status", currentStatus)
215+
# evaluate the state machine if the status is changing
216+
if not force and status != currentStatus:
217+
res = JobStatus.JobsStateMachine(currentStatus).getNextState(status)
218+
if not res["OK"]:
219+
return res
220+
newStat = res["Value"]
221+
# If the JobsStateMachine does not accept the candidate, don't update
222+
if newStat != status:
223+
# keeping the same status
224+
log.error(
225+
"Job Status Error",
226+
"%s can't move from %s to %s: using %s" % (jobID, currentStatus, status, newStat),
227+
)
228+
status = newStat
229+
sDict["Status"] = newStat
230+
# Change the source to indicate this is not what was requested
231+
source = sDict.get("Source", "")
232+
sDict["Source"] = source + "(SM)"
233+
# at this stage status == newStat. Set currentStatus to this new status
234+
currentStatus = newStat
235+
229236
minor = sDict.get("MinorStatus", minor)
230237
application = sDict.get("ApplicationStatus", application)
231238

@@ -257,19 +264,19 @@ def __setJobStatusBulk(cls, jobID, statusDict, force=False):
257264
return result
258265

259266
# Update the JobLoggingDB records
260-
for date in dates:
261-
sDict = statusDict[date]
267+
for updTime in updateTimes:
268+
sDict = statusDict[updTime]
262269
status = sDict.get("Status", "idem")
263270
minor = sDict.get("MinorStatus", "idem")
264271
application = sDict.get("ApplicationStatus", "idem")
265272
source = sDict.get("Source", "Unknown")
266273
result = cls.jobLoggingDB.addLoggingRecord(
267-
jobID, status=status, minorStatus=minor, applicationStatus=application, date=date, source=source
274+
jobID, status=status, minorStatus=minor, applicationStatus=application, date=updTime, source=source
268275
)
269276
if not result["OK"]:
270277
return result
271278

272-
return S_OK()
279+
return S_OK((attrNames, attrValues))
273280

274281
###########################################################################
275282
types_setJobAttribute = [[str, int], str, str]

0 commit comments

Comments
 (0)