@@ -124,7 +124,7 @@ def __setJobStatus(
124
124
sDict ["Source" ] = source
125
125
if not datetime :
126
126
datetime = Time .toString ()
127
- return cls .__setJobStatusBulk (jobID , {datetime : sDict }, force = force )
127
+ return cls ._setJobStatusBulk (jobID , {datetime : sDict }, force = force )
128
128
return S_OK ()
129
129
130
130
###########################################################################
@@ -133,31 +133,29 @@ def __setJobStatus(
133
133
@classmethod
134
134
def export_setJobStatusBulk (cls , jobID , statusDict , force = False ):
135
135
"""Set various job status fields with a time stamp and a source.

Exported entry point that forwards its arguments unchanged to
``_setJobStatusBulk`` and returns that call's result.

:param jobID: identifier of the job; ``_setJobStatusBulk`` converts it with ``int()``
:param statusDict: dictionary with a date-time as key and a status
    information dictionary as value
:param force: passed through as-is; when false, ``_setJobStatusBulk``
    checks status changes against the jobs state machine
:return: the result structure returned by ``_setJobStatusBulk``
"""
136
- return cls .__setJobStatusBulk (jobID , statusDict , force = force )
136
+ return cls ._setJobStatusBulk (jobID , statusDict , force = force )
137
137
138
138
@classmethod
139
- def __setJobStatusBulk (cls , jobID , statusDict , force = False ):
140
- """Set various status fields for job specified by its JobId .
139
+ def _setJobStatusBulk (cls , jobID , statusDict , force = False ):
140
+ """Set various status fields for job specified by its jobId .
141
141
Set only the last status in the JobDB, updating all the status
142
142
logging information in the JobLoggingDB. The statusDict has datetime
143
143
as a key and status information dictionary as values
144
144
"""
145
- status = ""
146
- minor = ""
147
- application = ""
148
145
jobID = int (jobID )
146
+ log = cls .log .getLocalSubLogger ("JobStatusBulk/Job-%d" % jobID )
149
147
150
148
result = cls .jobDB .getJobAttributes (jobID , ["Status" , "StartExecTime" , "EndExecTime" ])
151
149
if not result ["OK" ]:
152
150
return result
153
-
154
151
if not result ["Value" ]:
155
152
# if there is no matching Job it returns an empty dictionary
156
153
return S_ERROR ("No Matching Job" )
154
+
157
155
# If the current status is Stalled and we get an update, it should probably be "Running"
158
156
currentStatus = result ["Value" ]["Status" ]
159
157
if currentStatus == JobStatus .STALLED :
160
- status = JobStatus .RUNNING
158
+ currentStatus = JobStatus .RUNNING
161
159
startTime = result ["Value" ].get ("StartExecTime" )
162
160
endTime = result ["Value" ].get ("EndExecTime" )
163
161
# getJobAttributes only returns strings :(
@@ -166,66 +164,75 @@ def __setJobStatusBulk(cls, jobID, statusDict, force=False):
166
164
if endTime == "None" :
167
165
endTime = None
168
166
169
- # Get the latest WN time stamps of status updates
170
- result = cls .jobLoggingDB .getWMSTimeStamps (int (jobID ))
171
- if not result ["OK" ]:
172
- return result
173
- lastTime = max ([float (t ) for s , t in result ["Value" ].items () if s != "LastTime" ])
174
- lastTime = Time .toString (Time .fromEpoch (lastTime ))
175
-
176
- dates = sorted (statusDict )
177
- # If real updates, start from the current status
178
- if dates [0 ] >= lastTime and not status :
179
- status = currentStatus
180
- log = cls .log .getLocalSubLogger ("JobStatusBulk/Job-%s" % jobID )
181
- log .debug ("*** New call ***" , "Last update time %s - Sorted new times %s" % (lastTime , dates ))
182
167
# Remove useless items in order to make it simpler later, although there should not be any
183
168
for sDict in statusDict .values ():
184
169
for item in sorted (sDict ):
185
170
if not sDict [item ]:
186
171
sDict .pop (item , None )
187
- # Pick up start and end times from all updates, if they don't exist
188
- newStat = status
189
- for date in dates :
190
- sDict = statusDict [date ]
191
- # This is to recover Matched jobs that set the application status: they are running!
192
- if sDict .get ("ApplicationStatus" ) and newStat == JobStatus .MATCHED :
193
- sDict ["Status" ] = JobStatus .RUNNING
172
+
173
+ # Get the latest time stamps of major status updates
174
+ result = cls .jobLoggingDB .getWMSTimeStamps (int (jobID ))
175
+ if not result ["OK" ]:
176
+ return result
177
+ if not result ["Value" ]:
178
+ return S_ERROR ("No registered WMS timeStamps" )
179
+ # This is more precise than "LastTime". timeStamps is a sorted list of tuples...
180
+ timeStamps = sorted ((float (t ), s ) for s , t in result ["Value" ].items () if s != "LastTime" )
181
+ lastTime = Time .toString (Time .fromEpoch (timeStamps [- 1 ][0 ]))
182
+
183
+ # Get chronological order of new updates
184
+ updateTimes = sorted (statusDict )
185
+ log .debug ("*** New call ***" , "Last update time %s - Sorted new times %s" % (lastTime , updateTimes ))
186
+ # Get the status (if any) at the time of the first update
187
+ newStat = ""
188
+ firstUpdate = Time .toEpoch (Time .fromString (updateTimes [0 ]))
189
+ for ts , st in timeStamps :
190
+ if firstUpdate >= ts :
191
+ newStat = st
192
+ # Pick up start and end times from all updates
193
+ for updTime in updateTimes :
194
+ sDict = statusDict [updTime ]
194
195
newStat = sDict .get ("Status" , newStat )
195
196
196
- # evaluate the state machine
197
- if not force and newStat :
198
- res = JobStatus .JobsStateMachine (currentStatus ).getNextState (newStat )
199
- if not res ["OK" ]:
200
- return res
201
- nextState = res ["Value" ]
202
-
203
- # If the JobsStateMachine does not accept the candidate, don't update
204
- if newStat != nextState :
205
- log .error (
206
- "Job Status Error" ,
207
- "%s can't move from %s to %s: using %s" % (jobID , currentStatus , newStat , nextState ),
208
- )
209
- newStat = nextState
210
- sDict ["Status" ] = newStat
211
- currentStatus = newStat
212
-
213
- if newStat == JobStatus .RUNNING and not startTime :
197
+ if not startTime and newStat == JobStatus .RUNNING :
214
198
# Pick up the start date when the job starts running if not existing
215
- startTime = date
199
+ startTime = updTime
216
200
log .debug ("Set job start time" , startTime )
217
- elif newStat in JobStatus .JOB_FINAL_STATES and not endTime :
201
+ elif not endTime and newStat in JobStatus .JOB_FINAL_STATES :
218
202
# Pick up the end time when the job is in a final status
219
- endTime = date
203
+ endTime = updTime
220
204
log .debug ("Set job end time" , endTime )
221
205
222
- # We should only update the status if its time stamp is more recent than the last update
223
- if dates [- 1 ] >= lastTime :
224
- # Get the last status values
225
- for date in [dt for dt in dates if dt >= lastTime ]:
226
- sDict = statusDict [date ]
227
- log .debug ("\t " , "Time %s - Statuses %s" % (date , str (sDict )))
228
- status = sDict .get ("Status" , status )
206
+ # We should only update the status to the last one if its time stamp is more recent than the last update
207
+ if updateTimes [- 1 ] >= lastTime :
208
+ minor = ""
209
+ application = ""
210
+ # Get the last status values looping on the most recent updateTimes in chronological order
211
+ for updTime in [dt for dt in updateTimes if dt >= lastTime ]:
212
+ sDict = statusDict [updTime ]
213
+ log .debug ("\t " , "Time %s - Statuses %s" % (updTime , str (sDict )))
214
+ status = sDict .get ("Status" , currentStatus )
215
+ # evaluate the state machine if the status is changing
216
+ if not force and status != currentStatus :
217
+ res = JobStatus .JobsStateMachine (currentStatus ).getNextState (status )
218
+ if not res ["OK" ]:
219
+ return res
220
+ newStat = res ["Value" ]
221
+ # If the JobsStateMachine does not accept the candidate, don't update
222
+ if newStat != status :
223
+ # keeping the same status
224
+ log .error (
225
+ "Job Status Error" ,
226
+ "%s can't move from %s to %s: using %s" % (jobID , currentStatus , status , newStat ),
227
+ )
228
+ status = newStat
229
+ sDict ["Status" ] = newStat
230
+ # Change the source to indicate this is not what was requested
231
+ source = sDict .get ("Source" , "" )
232
+ sDict ["Source" ] = source + "(SM)"
233
+ # at this stage status == newStat. Set currentStatus to this new status
234
+ currentStatus = newStat
235
+
229
236
minor = sDict .get ("MinorStatus" , minor )
230
237
application = sDict .get ("ApplicationStatus" , application )
231
238
@@ -257,19 +264,19 @@ def __setJobStatusBulk(cls, jobID, statusDict, force=False):
257
264
return result
258
265
259
266
# Update the JobLoggingDB records
260
- for date in dates :
261
- sDict = statusDict [date ]
267
+ for updTime in updateTimes :
268
+ sDict = statusDict [updTime ]
262
269
status = sDict .get ("Status" , "idem" )
263
270
minor = sDict .get ("MinorStatus" , "idem" )
264
271
application = sDict .get ("ApplicationStatus" , "idem" )
265
272
source = sDict .get ("Source" , "Unknown" )
266
273
result = cls .jobLoggingDB .addLoggingRecord (
267
- jobID , status = status , minorStatus = minor , applicationStatus = application , date = date , source = source
274
+ jobID , status = status , minorStatus = minor , applicationStatus = application , date = updTime , source = source
268
275
)
269
276
if not result ["OK" ]:
270
277
return result
271
278
272
- return S_OK ()
279
+ return S_OK (( attrNames , attrValues ) )
273
280
274
281
###########################################################################
275
282
types_setJobAttribute = [[str , int ], str , str ]
0 commit comments