|
11 | 11 | import random
|
12 | 12 | import json
|
13 | 13 | import datetime
|
| 14 | +from functools import cached_property |
14 | 15 |
|
15 | 16 | # # from DIRAC
|
16 | 17 | from DIRAC import gLogger, S_OK, S_ERROR
|
17 | 18 | from DIRAC.Core.Utilities.List import randomize, fromChar
|
18 | 19 | from DIRAC.Core.Utilities.JEncode import strToIntDict
|
19 | 20 | from DIRAC.Core.Utilities.DEncode import ignoreEncodeWarning
|
| 21 | +from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader |
| 22 | +from DIRAC.Core.Utilities.ReturnValues import returnValueOrRaise |
20 | 23 | from DIRAC.ConfigurationSystem.Client import PathFinder
|
21 | 24 | from DIRAC.Core.Base.Client import Client, createClient
|
22 | 25 | from DIRAC.RequestManagementSystem.Client.Request import Request
|
23 | 26 | from DIRAC.RequestManagementSystem.private.RequestValidator import RequestValidator
|
24 | 27 | from DIRAC.WorkloadManagementSystem.Client import JobStatus
|
25 | 28 | from DIRAC.WorkloadManagementSystem.Client import JobMinorStatus
|
26 |
| -from DIRAC.WorkloadManagementSystem.Client.JobStateUpdateClient import JobStateUpdateClient |
27 | 29 | from DIRAC.WorkloadManagementSystem.Client.JobMonitoringClient import JobMonitoringClient
|
| 30 | +from DIRAC.WorkloadManagementSystem.Client.JobStateUpdateClient import JobStateUpdateClient |
| 31 | +from DIRAC.WorkloadManagementSystem.Utilities.JobStatusUtility import JobStatusUtility |
28 | 32 |
|
29 | 33 |
|
30 | 34 | @createClient("RequestManagement/ReqManager")
|
@@ -309,8 +313,7 @@ def finalizeRequest(self, requestID, jobID, useCertificates=True):
|
309 | 313 | :param str requestID: request id
|
310 | 314 | :param int jobID: job id
|
311 | 315 | """
|
312 |
| - |
313 |
| - stateServer = JobStateUpdateClient(useCertificates=useCertificates) |
| 316 | + stateServer = _JobDBInteraction(useCertificates) |
314 | 317 |
|
315 | 318 | # Checking if to update the job status - we should fail here, so it will be re-tried later
|
316 | 319 | # Checking the state, first
|
@@ -688,3 +691,52 @@ def recoverableRequest(request):
|
688 | 691 | return False
|
689 | 692 | return True
|
690 | 693 | return True
|
| 694 | + |
| 695 | + |
| 696 | +class _JobDBInteraction: |
| 697 | + """Class to handle the JobDB interaction. |
| 698 | +
|
| 699 | + This will either connect to the DB directly or use the client depending on |
| 700 | + if use_certificates is set or not. |
| 701 | +
|
| 702 | + WARNING: This is not intended for use outside of ReqClient! |
| 703 | + """ |
| 704 | + |
| 705 | + def __init__(self, useCertificates: bool): |
| 706 | + self._useCertificates = useCertificates |
| 707 | + |
| 708 | + def setJobParameter(self, jobID: int, key: str, value: str): |
| 709 | + if self._useCertificates: |
| 710 | + vo = returnValueOrRaise(self._jobStatusUtility.jobDB.getJobAttribute(jobID, "VO")) |
| 711 | + return self._elasticJobParametersDB.setJobParameter(int(jobID), key, value, vo=vo) |
| 712 | + else: |
| 713 | + return self._client.setJobParameter(jobID, key, value) |
| 714 | + |
| 715 | + def setJobStatus(self, jobID: int, newStatus: str, minorStatus: str, source: str): |
| 716 | + if self._useCertificates: |
| 717 | + return self._jobStatusUtility.setJobStatus( |
| 718 | + int(jobID), status=newStatus, minorStatus=minorStatus, source=source |
| 719 | + ) |
| 720 | + else: |
| 721 | + return self._client.setJobStatus(jobID, minorStatus, minorStatus, source) |
| 722 | + |
| 723 | + def setJobApplicationStatus(self, jobID: int, appStatus: str, source: str): |
| 724 | + if self._useCertificates: |
| 725 | + return self._jobStatusUtility.setJobStatus(int(jobID), appStatus=appStatus, source=source) |
| 726 | + else: |
| 727 | + return self._client.setJobApplicationStatus(jobID, appStatus, source) |
| 728 | + |
| 729 | + @cached_property |
| 730 | + def _client(self): |
| 731 | + return JobStateUpdateClient(useCertificates=False) |
| 732 | + |
| 733 | + @cached_property |
| 734 | + def _jobStatusUtility(self): |
| 735 | + return JobStatusUtility() |
| 736 | + |
| 737 | + @cached_property |
| 738 | + def _elasticJobParametersDB(self): |
| 739 | + result = ObjectLoader().loadObject("WorkloadManagementSystem.DB.JobParametersDB", "JobParametersDB") |
| 740 | + if not result["OK"]: |
| 741 | + return result |
| 742 | + return result["Value"]() |
0 commit comments