Skip to content

Commit 433ee48

Browse files
authored
Merge pull request #5937 from DIRACGridBot/cherry-pick-2-0fdafbc52-integration
[sweep:integration] using a DErrno for no match found
2 parents 378eb49 + 057dd94 commit 433ee48

File tree

7 files changed

+41
-37
lines changed

7 files changed

+41
-37
lines changed

src/DIRAC/Core/Base/AgentModule.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,9 @@
1818
from DIRAC.Core.Utilities import Time, MemStat, Network
1919
from DIRAC.Core.Utilities.Shifter import setupShifterProxyInEnv
2020
from DIRAC.Core.Utilities.ReturnValues import isReturnStructure
21-
from DIRAC.FrameworkSystem.Client.MonitoringClient import gMonitor
2221
from DIRAC.ConfigurationSystem.Client import PathFinder
2322
from DIRAC.FrameworkSystem.Client.MonitoringClient import MonitoringClient
23+
from DIRAC.FrameworkSystem.Client.MonitoringClient import gMonitor
2424
from DIRAC.Core.Utilities.ThreadScheduler import gThreadScheduler
2525
from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations
2626

@@ -145,8 +145,6 @@ def __init__(self, agentName, loadName, baseAgentName=False, properties={}):
145145

146146
self.__monitorLastStatsUpdate = -1
147147
self.monitor = None
148-
self.__initializeMonitor()
149-
self.__initialized = False
150148

151149
def __getCodeInfo(self):
152150

@@ -187,6 +185,8 @@ def am_initialize(self, *initArgs):
187185
# Set the work directory in an environment variable available to subprocesses if needed
188186
os.environ["AGENT_WORKDIRECTORY"] = workDirectory
189187

188+
self.__initializeMonitor()
189+
190190
self.__moduleProperties["shifterProxy"] = self.am_getOption("shifterProxy")
191191
if self.am_monitoringEnabled() and not self.activityMonitoring:
192192
self.monitor.enable()
@@ -217,7 +217,6 @@ def am_initialize(self, *initArgs):
217217
else:
218218
self.log.notice(" Watchdog interval: disabled ")
219219
self.log.notice("=" * 40)
220-
self.__initialized = True
221220
return S_OK()
222221

223222
def am_getControlDirectory(self):

src/DIRAC/Core/Utilities/DErrno.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,7 @@
119119
EWMSSUBM = 1503
120120
EWMSJMAN = 1504
121121
EWMSSTATUS = 1505
122+
EWMSNOMATCH = 1510
122123
EWMSNOPILOT = 1550
123124

124125
# ## DMS/StorageManagement (16XX)
@@ -198,6 +199,7 @@
198199
1503: "EWMSSUBM",
199200
1504: "EWMSJMAN",
200201
1505: "EWMSSTATUS",
202+
1510: "EWMSNOMATCH",
201203
1550: "EWMSNOPILOT",
202204
# DMS/StorageManagement
203205
1601: "EFILESIZE",
@@ -274,6 +276,7 @@
274276
EWMSJMAN: "Job management error",
275277
EWMSSTATUS: "Job status error",
276278
EWMSNOPILOT: "No pilots found",
279+
EWMSNOMATCH: "No match found",
277280
# DMS/StorageManagement
278281
EFILESIZE: "Bad file size",
279282
EGFAL: "Error with the gfal call",

src/DIRAC/WorkloadManagementSystem/Agent/JobAgent.py

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
from DIRAC.Core.Base.AgentModule import AgentModule
2424
from DIRAC.Core.Security.ProxyInfo import getProxyInfo
2525
from DIRAC.Core.Security import Properties
26+
from DIRAC.Core.Utilities import DErrno
2627
from DIRAC.FrameworkSystem.Client.ProxyManagerClient import gProxyManager
2728
from DIRAC.Resources.Computing.BatchSystems.TimeLeft.TimeLeft import TimeLeft
2829
from DIRAC.Resources.Computing.ComputingElementFactory import ComputingElementFactory
@@ -181,7 +182,10 @@ def execute(self):
181182
# if we don't match a job, independently from the reason,
182183
# we wait a bit longer before trying again
183184
self.am_setOption("PollingTime", int(self.am_getOption("PollingTime") * 1.5))
184-
return self._checkMatchingIssues(jobRequest["Message"])
185+
res = self._checkMatchingIssues(jobRequest)
186+
if not res["OK"]:
187+
self._finish(res["Message"])
188+
return res
185189

186190
# Reset the Counter
187191
self.matchFailedCount = 0
@@ -543,28 +547,29 @@ def _matchAJob(self, ceDictList):
543547
break
544548
return jobRequest
545549

546-
def _checkMatchingIssues(self, issueMessage):
550+
def _checkMatchingIssues(self, jobRequest):
547551
"""Check the source of the matching issue
548552
549-
:param str issueMessage: message returned by the matcher
553+
:param dict jobRequest: S_ERROR returned by the matcher
550554
:return: S_OK/S_ERROR
551555
"""
552-
if issueMessage.find("Pilot version does not match") != -1:
556+
557+
if jobRequest["Message"].find("Pilot version does not match") != -1:
553558
errorMsg = "Pilot version does not match the production version"
554-
self.log.error(errorMsg, issueMessage.replace(errorMsg, ""))
555-
return S_ERROR(issueMessage)
559+
self.log.error(errorMsg, jobRequest["Message"].replace(errorMsg, ""))
560+
return jobRequest
556561

557-
if re.search("No match found", issueMessage):
558-
self.log.notice("Job request OK, but no match found", ": %s" % issueMessage)
559-
elif issueMessage.find("seconds timeout") != -1:
560-
self.log.error("Timeout while requesting job", issueMessage)
562+
if DErrno.cmpError(jobRequest, DErrno.EWMSNOMATCH):
563+
self.log.notice("Job request OK, but no match found", jobRequest["Message"])
564+
elif jobRequest["Message"].find("seconds timeout") != -1:
565+
self.log.error("Timeout while requesting job", jobRequest["Message"])
561566
else:
562-
self.log.notice("Failed to get jobs", ": %s" % issueMessage)
567+
self.log.notice("Failed to get jobs", jobRequest["Message"])
563568

564569
self.matchFailedCount += 1
565570
if self.matchFailedCount > self.stopAfterFailedMatches:
566571
return self._finish("Nothing to do for more than %d cycles" % self.stopAfterFailedMatches)
567-
return S_OK(issueMessage)
572+
return S_OK()
568573

569574
def _checkMatcherInfo(self, matcherInfo, matcherParams, jobReport):
570575
"""Check that all relevant information about the job are available"""

src/DIRAC/WorkloadManagementSystem/Agent/PushJobAgent.py

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,12 @@
1111

1212
import sys
1313
import re
14-
import time
15-
import six
1614
import random
1715
from collections import defaultdict
1816

1917
from DIRAC import S_OK, S_ERROR, gConfig
2018
from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader
19+
from DIRAC.Core.Utilities import DErrno
2120
from DIRAC.ConfigurationSystem.Client.PathFinder import getSystemInstance
2221
from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations
2322
from DIRAC.FrameworkSystem.Client.ProxyManagerClient import gProxyManager
@@ -336,7 +335,7 @@ def execute(self):
336335
break
337336

338337
if not jobRequest["OK"]:
339-
self._checkMatchingIssues(jobRequest["Message"])
338+
self._checkMatchingIssues(jobRequest)
340339
self.failedQueues[queueName] += 1
341340
continue
342341

@@ -402,18 +401,17 @@ def _setCEDict(self, ceDict):
402401
if project:
403402
ceDict["ReleaseProject"] = project
404403

405-
def _checkMatchingIssues(self, issueMessage):
404+
def _checkMatchingIssues(self, jobRequest):
406405
"""Check the source of the matching issue
407406
408-
:param str issueMessage: message returned by the matcher
409-
:return: S_OK/S_ERROR
407+
:param dict jobRequest: S_ERROR returned by the matcher
408+
:return: S_OK
410409
"""
411-
matchingFailed = False
412-
if re.search("No match found", issueMessage):
413-
self.log.notice("Job request OK, but no match found", ": %s" % issueMessage)
414-
elif issueMessage.find("seconds timeout") != -1:
415-
self.log.error("Timeout while requesting job", issueMessage)
410+
if DErrno.cmpError(jobRequest, DErrno.EWMSNOMATCH):
411+
self.log.notice("Job request OK, but no match found", jobRequest["Message"])
412+
elif jobRequest["Message"].find("seconds timeout") != -1:
413+
self.log.error("Timeout while requesting job", jobRequest["Message"])
416414
else:
417-
self.log.notice("Failed to get jobs", ": %s" % issueMessage)
415+
self.log.notice("Failed to get jobs", jobRequest["Message"])
418416

419-
return S_OK(issueMessage)
417+
return S_OK()

src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_JobAgent.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
from DIRAC.WorkloadManagementSystem.Client.JobReport import JobReport
88
from DIRAC.Resources.Computing.ComputingElementFactory import ComputingElementFactory
99
from DIRAC.Resources.Computing.BatchSystems.TimeLeft.TimeLeft import TimeLeft
10-
from DIRAC import gLogger
10+
from DIRAC import gLogger, S_ERROR
1111

1212
gLogger.setLevel("DEBUG")
1313

@@ -215,7 +215,7 @@ def test__checkMatchingIssues(mocker, issueMessage, stopAfterFailedMatches, matc
215215
jobAgent.stopAfterFailedMatches = stopAfterFailedMatches
216216
jobAgent.matchFailedCount = matchFailedCount
217217

218-
result = jobAgent._checkMatchingIssues(issueMessage)
218+
result = jobAgent._checkMatchingIssues(S_ERROR(issueMessage))
219219
assert result["OK"] == expectedResult
220220

221221

src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_PushJobAgent.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,10 @@
66
from collections import defaultdict
77

88
# DIRAC Components
9-
from DIRAC.WorkloadManagementSystem.Agent.PushJobAgent import PushJobAgent
10-
from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader
119
from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations
10+
from DIRAC.WorkloadManagementSystem.Agent.PushJobAgent import PushJobAgent
1211

13-
from DIRAC import gLogger
12+
from DIRAC import gLogger, S_ERROR
1413

1514
gLogger.setLevel("DEBUG")
1615

@@ -98,5 +97,5 @@ def test__checkMatchingIssues(mocker, issueMessage, expectedResult):
9897
jobAgent.log = gLogger
9998
jobAgent.log.setLevel("DEBUG")
10099

101-
result = jobAgent._checkMatchingIssues(issueMessage)
100+
result = jobAgent._checkMatchingIssues(S_ERROR(issueMessage))
102101
assert result["OK"] == expectedResult

src/DIRAC/WorkloadManagementSystem/Service/MatcherHandler.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations
99
from DIRAC.Core.DISET.RequestHandler import RequestHandler
10+
from DIRAC.Core.Utilities import DErrno
1011
from DIRAC.Core.Utilities.DEncode import ignoreEncodeWarning
1112
from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader
1213

@@ -87,8 +88,7 @@ def export_requestJob(self, resourceDescription):
8788
gMonitor.addMark("matchesDone")
8889
gMonitor.addMark("matchesOK")
8990
return S_OK(result)
90-
# FIXME: This is correctly interpreted by the JobAgent, but DErrno should be used instead
91-
return S_ERROR("No match found")
91+
return S_ERROR(DErrno.EWMSNOMATCH)
9292

9393
##############################################################################
9494
types_getActiveTaskQueues = []

0 commit comments

Comments
 (0)