Skip to content

Commit 0f6a4c7

Browse files
committed
feat (diracx): add initial future client for DiracX job monitoring
1 parent 4d534a3 commit 0f6a4c7

File tree

4 files changed

+47
-0
lines changed

4 files changed

+47
-0
lines changed

docs/source/AdministratorGuide/ServerInstallations/environment_variable_configuration.rst

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@ DIRAC_DEPRECATED_FAIL
1818
If set, the use of functions or objects that are marked ``@deprecated`` will fail. Useful for example in continuous
1919
integration tests against future versions of DIRAC
2020

21+
DIRAC_ENABLE_DIRACX_JOB_MONITORING
22+
If set, calls the diracx job monitoring service. Off by default.
23+
2124
DIRAC_FEWER_CFG_LOCKS
2225
If ``true`` or ``yes`` or ``on`` or ``1`` or ``y`` or ``t``, DIRAC will reduce the number of locks used when accessing the CS for better performance (default, ``no``).
2326

src/DIRAC/WorkloadManagementSystem/Client/JobMonitoringClient.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
""" Class that contains client access to the job monitoring handler. """
22

3+
import os
34
from DIRAC.Core.Base.Client import Client, createClient
45
from DIRAC.Core.Utilities.DEncode import ignoreEncodeWarning
56
from DIRAC.Core.Utilities.JEncode import strToIntDict
@@ -11,6 +12,13 @@ def __init__(self, **kwargs):
1112
super().__init__(**kwargs)
1213
self.setServer("WorkloadManagement/JobMonitoring")
1314

15+
if os.getenv("DIRAC_ENABLE_DIRACX_JOB_MONITORING", "No").lower() in ("yes", "true"):
16+
from DIRAC.WorkloadManagementSystem.FutureClient.JobMonitoringClient import (
17+
JobMonitoringClient as futureJobMonitoringClient,
18+
)
19+
20+
httpsClient = futureJobMonitoringClient
21+
1422
@ignoreEncodeWarning
1523
def getJobsStatus(self, jobIDs):
1624
res = self._getRPC().getJobsStatus(jobIDs)
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
# pylint: disable=import-error
2+
import os
3+
4+
if os.getenv("DIRAC_ENABLE_DIRACX_JOB_MONITORING", "No").lower() in ("yes", "true"):
5+
from diracx.client import Dirac
6+
from diracx.client.models import JobSearchParams
7+
8+
from diracx.cli.utils import get_auth_headers
9+
from diracx.core.preferences import DiracxPreferences
10+
11+
from DIRAC.Core.Utilities.ReturnValues import convertToReturnValue
12+
13+
class JobMonitoringClient:
14+
def __init__(self, *args, **kwargs):
15+
self.endpoint = DiracxPreferences().url
16+
17+
def fetch(self, parameters, jobIDs):
18+
with Dirac(endpoint=self.endpoint) as api:
19+
jobs = api.jobs.search(
20+
parameters=["JobID"] + parameters,
21+
search=[{"parameter": "JobID", "operator": "in", "values": jobIDs}],
22+
headers=get_auth_headers(),
23+
)
24+
return {j["JobID"]: {param: j[param] for param in parameters} for j in jobs}
25+
26+
@convertToReturnValue
27+
def getJobsMinorStatus(self, jobIDs):
28+
return self.fetch(["MinorStatus"], jobIDs)
29+
30+
@convertToReturnValue
31+
def getJobsStates(self, jobIDs):
32+
return self.fetch(["Status", "MinorStatus", "ApplicationStatus"], jobIDs)
33+
34+
@convertToReturnValue
35+
def getJobsSites(self, jobIDs):
36+
return self.fetch(["Site"], jobIDs)

src/DIRAC/WorkloadManagementSystem/FutureClient/__init__.py

Whitespace-only changes.

0 commit comments

Comments
 (0)