Skip to content

Commit c4a6c6d

Browse files
authored
Merge pull request #6918 from fstagni/80_fixSingleton
[8.0] make singletons of VirtualMachineDB and PilotAgentsDB
2 parents 9a91795 + 6ebf29c commit c4a6c6d

File tree

8 files changed

+60
-69
lines changed

8 files changed

+60
-69
lines changed

src/DIRAC/Core/Utilities/ServerUtils.py

Lines changed: 0 additions & 24 deletions
This file was deleted.

src/DIRAC/WorkloadManagementSystem/Agent/CloudDirector.py

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,15 +10,14 @@
1010
from DIRAC.ConfigurationSystem.Client.Helpers import CSGlobals, Registry, Resources
1111
from DIRAC.WorkloadManagementSystem.Client.MatcherClient import MatcherClient
1212
from DIRAC.Core.Utilities.List import fromChar
13-
from DIRAC.WorkloadManagementSystem.Client.ServerUtils import pilotAgentsDB
13+
from DIRAC.WorkloadManagementSystem.Client.ServerUtils import getPilotAgentsDB, getVirtualMachineDB
1414
from DIRAC.ResourceStatusSystem.Client.SiteStatus import SiteStatus
1515
from DIRAC.Resources.Cloud.EndpointFactory import EndpointFactory
1616
from DIRAC.ConfigurationSystem.Client.Helpers.Resources import (
1717
findGenericCloudCredentials,
1818
getVMTypes,
1919
getPilotBootstrapParameters,
2020
)
21-
from DIRAC.WorkloadManagementSystem.Client.ServerUtils import virtualMachineDB
2221
from DIRAC.WorkloadManagementSystem.Utilities.Utils import getProxyFileForCloud
2322

2423

@@ -54,6 +53,8 @@ def __init__(self, *args, **kwargs):
5453

5554
def initialize(self):
5655
self.siteClient = SiteStatus()
56+
self.pilotAgentsDB = getPilotAgentsDB()
57+
self.virtualMachineDB = getVirtualMachineDB()
5758
return S_OK()
5859

5960
def beginExecution(self):
@@ -324,7 +325,7 @@ def createVMs(self):
324325

325326
tqIDList = list(result["Value"].keys())
326327

327-
result = virtualMachineDB.getInstanceCounters("Status", {})
328+
result = self.virtualMachineDB.getInstanceCounters("Status", {})
328329
totalVMs = 0
329330
if result["OK"]:
330331
for status in result["Value"]:
@@ -410,7 +411,7 @@ def createVMs(self):
410411

411412
# Get the number of already instantiated VMs for these task queues
412413
totalWaitingVMs = 0
413-
result = virtualMachineDB.getInstanceCounters("Status", {"Endpoint": endpoint})
414+
result = self.virtualMachineDB.getInstanceCounters("Status", {"Endpoint": endpoint})
414415
if result["OK"]:
415416
for status in result["Value"]:
416417
if status in ["New", "Submitted"]:
@@ -465,7 +466,7 @@ def createVMs(self):
465466
for uuID in vmDict:
466467
diracUUID = vmDict[uuID]["InstanceID"]
467468
endpoint = f"{self.vmTypeDict[vmType]['Site']}::{ceName}"
468-
result = virtualMachineDB.insertInstance(uuID, vmTypeName, diracUUID, endpoint, self.vo)
469+
result = self.virtualMachineDB.insertInstance(uuID, vmTypeName, diracUUID, endpoint, self.vo)
469470
if not result["OK"]:
470471
continue
471472
pRef = "vm://" + ceName + "/" + diracUUID + ":00"
@@ -489,7 +490,9 @@ def createVMs(self):
489490
tqDict[tqID].append(pilotID)
490491

491492
for tqID, pilotList in tqDict.items():
492-
result = pilotAgentsDB.addPilotTQReference(pilotList, tqID, "", "", self.localhost, "Cloud", stampDict)
493+
result = self.pilotAgentsDB.addPilotTQReference(
494+
pilotList, tqID, "", "", self.localhost, "Cloud", stampDict
495+
)
493496
if not result["OK"]:
494497
self.log.error(f"Failed to insert pilots into the PilotAgentsDB: {result['Message']}")
495498

@@ -499,7 +502,7 @@ def createVMs(self):
499502
return S_OK()
500503

501504
def getVMInstances(self, endpoint, maxInstances):
502-
result = virtualMachineDB.getInstanceCounters("Status", {"Endpoint": endpoint})
505+
result = self.virtualMachineDB.getInstanceCounters("Status", {"Endpoint": endpoint})
503506
if not result["OK"]:
504507
return result
505508

src/DIRAC/WorkloadManagementSystem/Agent/SiteDirector.py

Lines changed: 22 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
from DIRAC.ResourceStatusSystem.Client.SiteStatus import SiteStatus
3434
from DIRAC.WorkloadManagementSystem.Client import PilotStatus
3535
from DIRAC.WorkloadManagementSystem.Client.MatcherClient import MatcherClient
36-
from DIRAC.WorkloadManagementSystem.Client.ServerUtils import pilotAgentsDB
36+
from DIRAC.WorkloadManagementSystem.Client.ServerUtils import getPilotAgentsDB
3737
from DIRAC.WorkloadManagementSystem.private.ConfigHelper import findGenericPilotCredentials
3838
from DIRAC.WorkloadManagementSystem.Service.WMSUtilities import getGridEnv
3939
from DIRAC.WorkloadManagementSystem.Utilities.PilotWrapper import (
@@ -93,6 +93,7 @@ def __init__(self, *args, **kwargs):
9393
self.sendSubmissionMonitoring = False
9494
self.siteClient = None
9595
self.rssClient = None
96+
self.pilotAgentsDB = None
9697
self.rssFlag = None
9798

9899
self.globalParameters = {"NumberOfProcessors": 1, "MaxRAM": 2048}
@@ -148,6 +149,7 @@ def initialize(self):
148149
self.siteClient = SiteStatus()
149150
self.rssClient = ResourceStatus()
150151
self.matcherClient = MatcherClient()
152+
self.pilotAgentsDB = getPilotAgentsDB()
151153

152154
return S_OK()
153155

@@ -390,7 +392,7 @@ def submitPilots(self):
390392
manyWaitingPilotsFlag = False
391393
if self.pilotWaitingFlag:
392394
tqIDList = list(additionalInfo)
393-
result = pilotAgentsDB.countPilots(
395+
result = self.pilotAgentsDB.countPilots(
394396
{"TaskQueueID": tqIDList, "Status": PilotStatus.PILOT_WAITING_STATES}, None
395397
)
396398
if not result["OK"]:
@@ -533,7 +535,9 @@ def _ifAndWhereToSubmit(self):
533535
def monitorJobsQueuesPilots(self, matchingTQs):
534536
"""Just printout of jobs queues and pilots status in TQ"""
535537
tqIDList = list(matchingTQs)
536-
result = pilotAgentsDB.countPilots({"TaskQueueID": tqIDList, "Status": PilotStatus.PILOT_WAITING_STATES}, None)
538+
result = self.pilotAgentsDB.countPilots(
539+
{"TaskQueueID": tqIDList, "Status": PilotStatus.PILOT_WAITING_STATES}, None
540+
)
537541

538542
totalWaitingJobs = 0
539543
for tqDescription in matchingTQs.values():
@@ -834,7 +838,7 @@ def _addPilotTQReference(self, queue, taskQueueDict, pilotList, stampDict):
834838
tqDict[tqID].append(pilotID)
835839

836840
for tqID, pilotsList in tqDict.items():
837-
result = pilotAgentsDB.addPilotTQReference(
841+
result = self.pilotAgentsDB.addPilotTQReference(
838842
pilotsList,
839843
tqID,
840844
self.pilotDN,
@@ -847,7 +851,7 @@ def _addPilotTQReference(self, queue, taskQueueDict, pilotList, stampDict):
847851
self.log.error("Failed add pilots to the PilotAgentsDB", result["Message"])
848852
continue
849853
for pilot in pilotsList:
850-
result = pilotAgentsDB.setPilotStatus(
854+
result = self.pilotAgentsDB.setPilotStatus(
851855
pilot,
852856
PilotStatus.SUBMITTED,
853857
self.queueDict[queue]["CEName"],
@@ -871,7 +875,7 @@ def getQueueSlots(self, queue, manyWaitingPilotsFlag):
871875

872876
# See if there are waiting pilots for this queue. If not, allow submission
873877
if totalSlots and manyWaitingPilotsFlag:
874-
result = pilotAgentsDB.selectPilots(
878+
result = self.pilotAgentsDB.selectPilots(
875879
{"DestinationSite": ceName, "Queue": queueName, "Status": PilotStatus.PILOT_WAITING_STATES}
876880
)
877881
if result["OK"]:
@@ -886,7 +890,7 @@ def getQueueSlots(self, queue, manyWaitingPilotsFlag):
886890
if availableSlotsCount % self.availableSlotsUpdateCycleFactor == 0:
887891
# Get the list of already existing pilots for this queue
888892
jobIDList = None
889-
result = pilotAgentsDB.selectPilots(
893+
result = self.pilotAgentsDB.selectPilots(
890894
{"DestinationSite": ceName, "Queue": queueName, "Status": PilotStatus.PILOT_TRANSIENT_STATES}
891895
)
892896

@@ -916,7 +920,7 @@ def getQueueSlots(self, queue, manyWaitingPilotsFlag):
916920
waitingJobs = 0
917921
totalJobs = 0
918922
if jobIDList:
919-
result = pilotAgentsDB.getPilotInfo(jobIDList)
923+
result = self.pilotAgentsDB.getPilotInfo(jobIDList)
920924
if not result["OK"]:
921925
self.log.warn("Failed to check PilotAgentsDB", f"for queue {queue}: \n{result['Message']}")
922926
self.failedQueues[queue] += 1
@@ -1127,7 +1131,7 @@ def updatePilotStatus(self):
11271131
queueName = self.queueDict[queue]["QueueName"]
11281132
ceType = self.queueDict[queue]["CEType"]
11291133
siteName = self.queueDict[queue]["Site"]
1130-
result = pilotAgentsDB.selectPilots(
1134+
result = self.pilotAgentsDB.selectPilots(
11311135
{
11321136
"DestinationSite": ceName,
11331137
"Queue": queueName,
@@ -1144,7 +1148,7 @@ def updatePilotStatus(self):
11441148
pilotRefs = result["Value"]
11451149
if not pilotRefs:
11461150
continue
1147-
result = pilotAgentsDB.getPilotInfo(pilotRefs)
1151+
result = self.pilotAgentsDB.getPilotInfo(pilotRefs)
11481152
if not result["OK"]:
11491153
self.log.error("Failed to get pilots info from DB", result["Message"])
11501154
continue
@@ -1155,7 +1159,7 @@ def updatePilotStatus(self):
11551159

11561160
# Check if the accounting is to be sent
11571161
if self.sendAccounting:
1158-
result = pilotAgentsDB.selectPilots(
1162+
result = self.pilotAgentsDB.selectPilots(
11591163
{
11601164
"DestinationSite": ceName,
11611165
"Queue": queueName,
@@ -1172,7 +1176,7 @@ def updatePilotStatus(self):
11721176
pilotRefs = result["Value"]
11731177
if not pilotRefs:
11741178
continue
1175-
result = pilotAgentsDB.getPilotInfo(pilotRefs)
1179+
result = self.pilotAgentsDB.getPilotInfo(pilotRefs)
11761180
if not result["OK"]:
11771181
self.log.error("Failed to get pilots info from DB", result["Message"])
11781182
continue
@@ -1195,7 +1199,7 @@ def _updatePilotStatusPerQueue(self, queue, proxy):
11951199
ceType = self.queueDict[queue]["CEType"]
11961200
siteName = self.queueDict[queue]["Site"]
11971201

1198-
result = pilotAgentsDB.selectPilots(
1202+
result = self.pilotAgentsDB.selectPilots(
11991203
{
12001204
"DestinationSite": ceName,
12011205
"Queue": queueName,
@@ -1213,7 +1217,7 @@ def _updatePilotStatusPerQueue(self, queue, proxy):
12131217
if not pilotRefs:
12141218
return
12151219

1216-
result = pilotAgentsDB.getPilotInfo(pilotRefs)
1220+
result = self.pilotAgentsDB.getPilotInfo(pilotRefs)
12171221
if not result["OK"]:
12181222
self.log.error("Failed to get pilots info from DB", result["Message"])
12191223
return
@@ -1289,7 +1293,7 @@ def _updatePilotStatus(self, pilotRefs, pilotDict, pilotCEDict):
12891293

12901294
if newStatus:
12911295
self.log.info("Updating status", f"to {newStatus} for pilot {pRef}")
1292-
result = pilotAgentsDB.setPilotStatus(pRef, newStatus, "", "Updated by SiteDirector")
1296+
result = self.pilotAgentsDB.setPilotStatus(pRef, newStatus, "", "Updated by SiteDirector")
12931297
if not result["OK"]:
12941298
self.log.error(result["Message"])
12951299
if newStatus == "Aborted":
@@ -1328,7 +1332,7 @@ def _getPilotOutput(self, pRef, pilotDict, ce, ceName):
13281332
output, error = result["Value"]
13291333

13301334
if output:
1331-
result = pilotAgentsDB.storePilotOutput(pRef, output, error)
1335+
result = self.pilotAgentsDB.storePilotOutput(pRef, output, error)
13321336
if not result["OK"]:
13331337
self.log.error("Failed to store pilot output", result["Message"])
13341338
else:
@@ -1368,7 +1372,7 @@ def sendPilotAccounting(self, pilotDict):
13681372
self.log.error("Failed to send accounting info for pilot ", pRef)
13691373
else:
13701374
# Set up AccountingSent flag
1371-
result = pilotAgentsDB.setAccountingFlag(pRef)
1375+
result = self.pilotAgentsDB.setAccountingFlag(pRef)
13721376
if not result["OK"]:
13731377
self.log.error("Failed to set accounting flag for pilot ", pRef)
13741378

@@ -1377,7 +1381,7 @@ def sendPilotAccounting(self, pilotDict):
13771381
if result["OK"]:
13781382
for pRef in pilotDict:
13791383
self.log.verbose("Setting AccountingSent flag", f"for pilot {pRef}")
1380-
result = pilotAgentsDB.setAccountingFlag(pRef)
1384+
result = self.pilotAgentsDB.setAccountingFlag(pRef)
13811385
if not result["OK"]:
13821386
self.log.error("Failed to set accounting flag for pilot ", pRef)
13831387
else:

src/DIRAC/WorkloadManagementSystem/Agent/VirtualMachineMonitorAgent.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
from DIRAC.ConfigurationSystem.Client.Helpers import Operations
1111
from DIRAC.Core.Base.AgentModule import AgentModule
1212
from DIRAC.Core.Utilities import List, Network
13-
from DIRAC.WorkloadManagementSystem.Client.ServerUtils import virtualMachineDB
13+
from DIRAC.WorkloadManagementSystem.Client.ServerUtils import getVirtualMachineDB
1414

1515

1616
class VirtualMachineMonitorAgent(AgentModule):
@@ -61,7 +61,7 @@ def __declareInstanceRunning(self):
6161
retries = 3
6262
sleepTime = 30
6363
for i in range(retries):
64-
result = virtualMachineDB.declareInstanceRunning(self.uniqueID, self.ipAddress)
64+
result = self.virtualMachineDB.declareInstanceRunning(self.uniqueID, self.ipAddress)
6565
if result["OK"]:
6666
self.log.info("Declared instance running")
6767
return result
@@ -87,6 +87,7 @@ def initialize(self):
8787
self.heartBeatPeriod = None
8888
self.am_setOption("MaxCycles", 0)
8989
self.am_setOption("PollingTime", 60)
90+
self.virtualMachineDB = getVirtualMachineDB()
9091

9192
# Discover net address
9293
self.ipAddress = None
@@ -100,7 +101,7 @@ def initialize(self):
100101

101102
# Declare instance running
102103
self.uniqueID = ""
103-
result = virtualMachineDB.getUniqueIDByName(self.vmID)
104+
result = self.virtualMachineDB.getUniqueIDByName(self.vmID)
104105
if result["OK"]:
105106
self.uniqueID = result["Value"]
106107
result = self.__declareInstanceRunning()
@@ -169,7 +170,7 @@ def execute(self):
169170
if uptime % self.heartBeatPeriod <= self.am_getPollingTime():
170171
# Heartbeat time!
171172
self.log.info("Sending hearbeat...")
172-
result = virtualMachineDB.instanceIDHeartBeat(
173+
result = self.virtualMachineDB.instanceIDHeartBeat(
173174
self.uniqueID,
174175
avgLoad,
175176
numJobs,
@@ -223,7 +224,7 @@ def __haltInstance(self, avgLoad=0.0):
223224
retries = 3
224225
sleepTime = 10
225226
for i in range(retries):
226-
result = virtualMachineDB.declareInstanceHalting(self.uniqueID, avgLoad)
227+
result = self.virtualMachineDB.declareInstanceHalting(self.uniqueID, avgLoad)
227228
if result["OK"]:
228229
self.log.info("Declared instance halting")
229230
break

src/DIRAC/WorkloadManagementSystem/Agent/test/Test_Agent_SiteDirector.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ def sd(mocker):
5555
sd.log.setLevel("DEBUG")
5656
sd.rpcMatcher = MagicMock()
5757
sd.rssClient = MagicMock()
58+
sd.pilotAgentsDB = MagicMock()
5859
sd.workingDirectory = ""
5960
sd.queueDict = {
6061
"aQueue": {

src/DIRAC/WorkloadManagementSystem/Client/ServerUtils.py

Lines changed: 9 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -3,30 +3,24 @@
33
It always try to insert the records directly. In case of failure a WMS client is used...
44
"""
55

6-
from DIRAC.Core.Utilities.ServerUtils import getDBOrClient
6+
from DIRAC.Core.Base.Client import Client
7+
from DIRAC.WorkloadManagementSystem.DB.PilotAgentsDB import gPilotAgentsDB
8+
from DIRAC.WorkloadManagementSystem.DB.VirtualMachineDB import gVirtualMachineDB
79

810

911
def getPilotAgentsDB():
10-
serverName = "WorkloadManagement/PilotManager"
11-
PilotAgentsDB = None
1212
try:
13-
from DIRAC.WorkloadManagementSystem.DB.PilotAgentsDB import PilotAgentsDB
13+
if gPilotAgentsDB and gPilotAgentsDB._connected:
14+
return gPilotAgentsDB
1415
except Exception:
1516
pass
16-
return getDBOrClient(PilotAgentsDB, serverName)
17-
18-
19-
pilotAgentsDB = getPilotAgentsDB()
17+
return Client(url="WorkloadManagement/PilotManager")
2018

2119

2220
def getVirtualMachineDB():
23-
serverName = "WorkloadManagement/VirtualMachineManager"
24-
VirtualMachineDB = None
2521
try:
26-
from DIRAC.WorkloadManagementSystem.DB.VirtualMachineDB import VirtualMachineDB
22+
if gVirtualMachineDB and gVirtualMachineDB._connected:
23+
return gVirtualMachineDB
2724
except Exception:
2825
pass
29-
return getDBOrClient(VirtualMachineDB, serverName)
30-
31-
32-
virtualMachineDB = getVirtualMachineDB()
26+
return Client(url="WorkloadManagement/VirtualMachineManager")

src/DIRAC/WorkloadManagementSystem/DB/PilotAgentsDB.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1272,3 +1272,9 @@ def buildSQL(self, selectDict=None):
12721272

12731273
def getColumnList(self):
12741274
return self._columns
1275+
1276+
1277+
try:
1278+
gPilotAgentsDB = PilotAgentsDB()
1279+
except:
1280+
gPilotAgentsDB = None

src/DIRAC/WorkloadManagementSystem/DB/VirtualMachineDB.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1165,3 +1165,9 @@ def __setError(self, element, iD, reason):
11651165
return ret
11661166

11671167
return S_ERROR(reason)
1168+
1169+
1170+
try:
1171+
gVirtualMachineDB = VirtualMachineDB()
1172+
except:
1173+
gVirtualMachineDB = None

0 commit comments

Comments
 (0)