Skip to content

Commit 56e8707

Browse files
authored
Merge pull request #47377 from AdrianoDee/recycle_and_checks_runthematrix
Updates for `runTheMatrix.py`: input checks, GPUs repartition, input recycling
2 parents 5d817fc + 0f9e1bb commit 56e8707

File tree

8 files changed

+230
-38
lines changed

8 files changed

+230
-38
lines changed

Configuration/PyReleaseValidation/python/MatrixReader.py

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
from Configuration.PyReleaseValidation.WorkFlow import WorkFlow
44
from Configuration.PyReleaseValidation.MatrixUtil import InputInfo
5-
5+
from Configuration.PyReleaseValidation.upgradeWorkflowComponents import defaultDataSets,undefInput
66
# ================================================================================
77

88
class MatrixException(Exception):
@@ -25,8 +25,9 @@ def __init__(self, opt):
2525
self.apply=opt.apply
2626
self.commandLineWf=opt.workflow
2727
self.overWrite=opt.overWrite
28-
28+
2929
self.noRun = opt.noRun
30+
self.checkInputs = opt.checkInputs
3031
return
3132

3233
def reset(self, what='all'):
@@ -127,6 +128,21 @@ def makeStep(self,step,overrides):
127128
else:
128129
return step
129130

131+
def verifyDefaultInputs(self):
132+
for wf in self.workFlowSteps.values():
133+
undefs = [driver for driver in wf[2] if isinstance(driver,str) and undefInput in driver ]
134+
if len(undefs)>0:
135+
raise ValueError("""in MatrixReader.py:{0}
136+
=============================================================================
137+
For wf {1}(*) the default dataset not defined in defaultDataSets dictionary.
138+
With --checkInputs option this throws an error.
139+
140+
(*)
141+
{2}
142+
143+
=============================================================================
144+
""".format(sys._getframe(1).f_lineno - 1,wf[0],wf))
145+
130146
def readMatrix(self, fileNameIn, useInput=None, refRel=None, fromScratch=None):
131147

132148
prefix = self.filesPrefMap[fileNameIn]
@@ -332,6 +348,8 @@ def showRaw(self, useInput, refRel=None, fromScratch=None, what='all',step1Only=
332348

333349
try:
334350
self.readMatrix(matrixFile, useInput, refRel, fromScratch)
351+
if self.checkInputs:
352+
self.verifyDefaultInputs()
335353
except Exception as e:
336354
print("ERROR reading file:", matrixFile, str(e))
337355
raise
@@ -507,6 +525,8 @@ def prepare(self, useInput=None, refRel='', fromScratch=None):
507525

508526
try:
509527
self.readMatrix(matrixFile, useInput, refRel, fromScratch)
528+
if self.checkInputs:
529+
self.verifyDefaultInputs()
510530
except Exception as e:
511531
print("ERROR reading file:", matrixFile, str(e))
512532
raise

Configuration/PyReleaseValidation/python/MatrixRunner.py

Lines changed: 25 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,21 @@
11
import os, sys, time
22

3-
from Configuration.PyReleaseValidation.WorkFlow import WorkFlow
4-
from Configuration.PyReleaseValidation.WorkFlowRunner import WorkFlowRunner
3+
from collections import Counter
54

5+
from Configuration.PyReleaseValidation.WorkFlowRunner import WorkFlowRunner
6+
from Configuration.PyReleaseValidation.MatrixUtil import check_dups
67
# ================================================================================
78

89
class MatrixRunner(object):
910

10-
def __init__(self, wfIn=None, nThrMax=4, nThreads=1):
11+
def __init__(self, wfIn=None, nThrMax=4, nThreads=1, gpus=None):
1112

1213
self.workFlows = wfIn
1314

1415
self.threadList = []
1516
self.maxThreads = nThrMax
1617
self.nThreads = nThreads
18+
self.gpus = gpus
1719

1820
#the directories in which it happened
1921
self.runDirs={}
@@ -43,12 +45,22 @@ def runTests(self, opt):
4345
print('resetting to default number of process threads = %s' % self.maxThreads)
4446

4547
print('Running %s %s %s, each with %s thread%s per process' % ('up to' if self.maxThreads > 1 else '', self.maxThreads, 'concurrent jobs' if self.maxThreads > 1 else 'job', self.nThreads, 's' if self.nThreads > 1 else ''))
46-
47-
48-
for wf in self.workFlows:
49-
50-
if testList and float(wf.numId) not in [float(x) for x in testList]: continue
51-
48+
49+
njob = None
50+
wfs_to_run = self.workFlows
51+
withDups = False
52+
if testList:
53+
if opt.allowDuplicates:
54+
withDups = len(check_dups(testList))>0
55+
else:
56+
testList = set(testList)
57+
wfs_to_run = [wf for wf in self.workFlows if float(wf.numId) in testList for i in range(Counter(testList)[wf.numId])]
58+
59+
for n,wf in enumerate(wfs_to_run):
60+
61+
if opt.allowDuplicates and withDups and opt.nProcs > 1: # to avoid overwriting the work areas
62+
njob = n
63+
5264
item = wf.nameId
5365
if os.path.islink(item) : continue # ignore symlinks
5466

@@ -58,7 +70,10 @@ def runTests(self, opt):
5870

5971
print('\nPreparing to run %s %s' % (wf.numId, item))
6072
sys.stdout.flush()
61-
current = WorkFlowRunner(wf,noRun,dryRun,cafVeto, opt.dasOptions, opt.jobReports, opt.nThreads, opt.nStreams, opt.maxSteps, opt.nEvents)
73+
gpu_cmd = None
74+
if self.gpus is not None:
75+
gpu_cmd = next(self.gpus).gpuBind()
76+
current = WorkFlowRunner(wf,opt,noRun,dryRun,cafVeto,njob,gpu_cmd)
6277
self.threadList.append(current)
6378
current.start()
6479
if not dryRun:

Configuration/PyReleaseValidation/python/MatrixUtil.py

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
import os
2+
import subprocess
3+
24
class Matrix(dict):
35
def __setitem__(self,key,value):
46
if key in self:
@@ -281,3 +283,58 @@ def check_dups(input):
281283
dups = set(x for x in input if x in seen or seen.add(x))
282284

283285
return dups
286+
287+
class AvailableGPU():
288+
289+
def __init__(self, make, counter, id, capability, name):
290+
self.make = make
291+
self.counter = counter
292+
self.id = id
293+
self.capability = capability
294+
self.name = name
295+
296+
def __str__(self):
297+
return "> GPU no.{0}: {1} - {2} - {3} - {4}".format(self.counter,self.make,self.id,self.capability,self.name)
298+
299+
def isCUDA(self):
300+
return self.make == 'CUDA'
301+
def isROCM(self):
302+
return self.make == 'ROCM'
303+
304+
def gpuBind(self):
305+
306+
cmd = ''
307+
if self.make == 'CUDA':
308+
cmd = 'CUDA_VISIBLE_DEVICES=' + str(self.id) + " HIP_VISIBLE_DEVICES= "
309+
elif self.make == 'ROCM':
310+
cmd = 'CUDA_VISIBLE_DEVICES= HIP_VISIBLE_DEVICES=' + str(self.id) + " "
311+
312+
return cmd
313+
314+
def cleanComputeCapabilities(make, offset = 0):
315+
316+
# Building on top of {cuda|rocm}ComputeCapabilities
317+
# with output:
318+
# ID computeCapability Architetcure Model Info
319+
320+
out = subprocess.run(make + "ComputeCapabilities", capture_output = True, text = True)
321+
322+
if out.returncode > 0:
323+
return []
324+
325+
gpus = []
326+
for f in out.stdout.split("\n"):
327+
328+
if not len(f)>0:
329+
continue
330+
331+
if "unsupported" in f:
332+
print("> Warning! Unsupported GPU:")
333+
print(" > " + " ".join(f))
334+
continue
335+
336+
gpus.append(f.split())
337+
338+
gpus = [AvailableGPU(make.upper(), i + offset, int(f[0]),f[1]," ".join(f[2:])) for i,f in enumerate(gpus)]
339+
340+
return gpus

Configuration/PyReleaseValidation/python/WorkFlowRunner.py

Lines changed: 38 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -7,26 +7,33 @@
77
from datetime import datetime
88

99
class WorkFlowRunner(Thread):
10-
def __init__(self, wf, noRun=False,dryRun=False,cafVeto=True,dasOptions="",jobReport=False, nThreads=1, nStreams=0, maxSteps=9999, nEvents=0):
10+
def __init__(self, wf, opt, noRun=False, dryRun=False, cafVeto=True, jobNumber=None, gpu = None):
1111
Thread.__init__(self)
1212
self.wf = wf
1313

14-
self.status=-1
15-
self.report=''
16-
self.nfail=0
17-
self.npass=0
18-
self.noRun=noRun
19-
self.dryRun=dryRun
20-
self.cafVeto=cafVeto
21-
self.dasOptions=dasOptions
22-
self.jobReport=jobReport
23-
self.nThreads=nThreads
24-
self.nStreams=nStreams
25-
self.maxSteps=maxSteps
26-
self.nEvents=nEvents
27-
self.recoOutput=''
14+
self.status = -1
15+
self.report =''
16+
self.nfail = 0
17+
self.npass = 0
18+
self.noRun = noRun
19+
self.dryRun = dryRun
20+
self.cafVeto = cafVeto
21+
self.gpu = gpu
22+
23+
self.dasOptions = opt.dasOptions
24+
self.jobReport = opt.jobReports
25+
self.nThreads = opt.nThreads
26+
self.nStreams = opt.nStreams
27+
self.maxSteps = opt.maxSteps
28+
self.nEvents = opt.nEvents
29+
self.recoOutput = ''
30+
self.startFrom = opt.startFrom
31+
self.recycle = opt.recycle
2832

2933
self.wfDir=str(self.wf.numId)+'_'+self.wf.nameId
34+
if jobNumber is not None:
35+
self.wfDir = self.wfDir + '_job' + str(jobNumber)
36+
3037
return
3138

3239
def doCmd(self, cmd):
@@ -98,6 +105,9 @@ def closeCmd(i,ID):
98105
self.stat.append('NOTRUN')
99106
continue
100107
if not isinstance(com,str):
108+
if self.recycle:
109+
inFile = self.recycle
110+
continue
101111
if self.cafVeto and (com.location == 'CAF' and not onCAF):
102112
print("You need to be no CAF to run",self.wf.numId)
103113
self.npass.append(0)
@@ -146,7 +156,19 @@ def closeCmd(i,ID):
146156

147157
else:
148158
#chaining IO , which should be done in WF object already and not using stepX.root but <stepName>.root
159+
if self.gpu is not None:
160+
cmd = cmd + self.gpu
161+
149162
cmd += com
163+
164+
if self.startFrom:
165+
steps = cmd.split("-s ")[1].split(" ")[0]
166+
if self.startFrom not in steps:
167+
continue
168+
else:
169+
self.startFrom = False
170+
inFile = self.recycle
171+
150172
if self.noRun:
151173
cmd +=' --no_exec'
152174
# in case previous step used DAS query (either filelist of das:)
@@ -191,6 +213,7 @@ def closeCmd(i,ID):
191213
cmd = split[0] + event_token + '%s ' % self.nEvents + pos_cmd
192214
cmd+=closeCmd(istep,self.wf.nameId)
193215
retStep = 0
216+
194217
if istep>self.maxSteps:
195218
wf_stats = open("%s/wf_steps.txt" % self.wfDir,"a")
196219
wf_stats.write('step%s:%s\n' % (istep, cmd))

Configuration/PyReleaseValidation/python/relval_data_highstats.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@
5959
wf_number = wf_number + offset_pd * p_n
6060
wf_number = wf_number + offset_events * evs
6161
wf_number = round(wf_number,6)
62-
step_name = "Run" + pd + era.split("Run")[1] + "_10k"
62+
step_name = "Run" + pd + era.split("Run")[1] + "_" + e_key
6363
y = str(int(base_wf))
6464
suff = "ZB_" if "ZeroBias" in step_name else ""
6565
workflows[wf_number] = ['',[step_name,'HLTDR3_' + y,'RECONANORUN3_' + suff + 'reHLT_'+y,'HARVESTRUN3_' + suff + y]]

Configuration/PyReleaseValidation/python/relval_steps.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
1+
import sys
2+
13
from .MatrixUtil import *
24

35
from Configuration.HLT.autoHLT import autoHLT
46
from Configuration.AlCa.autoPCL import autoPCL
57
from Configuration.Skimming.autoSkim import autoSkim
6-
from .upgradeWorkflowComponents import step3_trackingOnly
8+
from Configuration.PyReleaseValidation.upgradeWorkflowComponents import step3_trackingOnly,undefInput
79

810
# step1 gensim: for run1
911
step1Defaults = {'--relval' : None, # need to be explicitly set
@@ -4595,13 +4597,15 @@ def gen2024HiMix(fragment,howMuch):
45954597
for gen in upgradeFragments:
45964598
for ds in defaultDataSets:
45974599
key=gen[:-4]+'_'+ds
4598-
version='1'
4600+
version = undefInput if defaultDataSets[ds] == '' else '1'
45994601
if key in versionOverrides:
46004602
version = versionOverrides[key]
46014603
baseDataSetReleaseBetter[key]=defaultDataSets[ds]+version
46024604

46034605
PUDataSets={}
46044606
for ds in defaultDataSets:
4607+
if "GenOnly" in ds:
4608+
continue
46054609
key='MinBias_14TeV_pythia8_TuneCP5'+'_'+ds
46064610
name=baseDataSetReleaseBetter[key]
46074611
if '2017' in ds:
@@ -4621,7 +4625,6 @@ def gen2024HiMix(fragment,howMuch):
46214625
#PUDataSets[ds]={'-n':10,'--pileup':'AVE_50_BX_25ns','--pileup_input':'das:/RelValMinBias_13/%s/GEN-SIM'%(name,)}
46224626
#PUDataSets[ds]={'-n':10,'--pileup':'AVE_70_BX_25ns','--pileup_input':'das:/RelValMinBias_13/%s/GEN-SIM'%(name,)}
46234627

4624-
46254628
upgradeStepDict={}
46264629
for specialType,specialWF in upgradeWFs.items():
46274630
specialWF.init(upgradeStepDict)

Configuration/PyReleaseValidation/python/upgradeWorkflowComponents.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
from .MatrixUtil import merge, Kby, Mby, check_dups
44
import re
55

6+
undefInput = "UNDEF"
7+
68
U2000by1={'--relval': '2000,1'}
79

810
# DON'T CHANGE THE ORDER, only append new keys. Otherwise the numbering for the runTheMatrix tests will change.
@@ -3164,6 +3166,8 @@ def condition(self, fragment, stepList, key, hasHarvest):
31643166

31653167
# standard PU sequences
31663168
for key in list(upgradeProperties[2017].keys()):
3169+
if "GenOnly" in key:
3170+
continue
31673171
upgradeProperties[2017][key+'PU'] = deepcopy(upgradeProperties[2017][key])
31683172
if 'FS' not in key:
31693173
# update ScenToRun list
@@ -3413,6 +3417,8 @@ def condition(self, fragment, stepList, key, hasHarvest):
34133417

34143418
# standard PU sequences
34153419
for key in list(upgradeProperties['Run4'].keys()):
3420+
if "GenOnly" in key:
3421+
continue
34163422
upgradeProperties['Run4'][key+'PU'] = deepcopy(upgradeProperties['Run4'][key])
34173423
upgradeProperties['Run4'][key+'PU']['ScenToRun'] = ['GenSimHLBeamSpot','DigiTriggerPU','RecoGlobalPU', 'HARVESTGlobalPU']
34183424

0 commit comments

Comments
 (0)