Skip to content

Commit b267984

Browse files
committed
fix hyperthreading cores reservation
1 parent ce61412 commit b267984

File tree

2 files changed

+55
-26
lines changed

2 files changed

+55
-26
lines changed

rqd/rqd/rqcore.py

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -423,17 +423,24 @@ def launchFrame(self, runFrame):
423423

424424
# See if all requested cores are available
425425
with self.__threadLock:
426-
# pylint: disable=no-member
427-
if self.cores.idle_cores < runFrame.num_cores:
428-
err = "Not launching, insufficient idle cores"
429-
log.critical(err)
430-
raise rqd.rqexceptions.CoreReservationFailureException(err)
431-
# pylint: enable=no-member
432-
426+
# For hyperthreading workloads, check HT core availability first
427+
# as it uses a different counting mechanism than idle_cores
433428
if runFrame.environment.get('CUE_THREADABLE') == '1':
429+
if not self.machine.canReserveHT(runFrame.num_cores):
430+
err = "Not launching, insufficient hyperthreading cores available"
431+
log.critical(err)
432+
raise rqd.rqexceptions.CoreReservationFailureException(err)
434433
reserveHT = self.machine.reserveHT(runFrame.num_cores)
435434
if reserveHT:
436435
runFrame.attributes['CPU_LIST'] = reserveHT
436+
else:
437+
# Only check idle cores for non-hyperthreading workloads
438+
# pylint: disable=no-member
439+
if self.cores.idle_cores < runFrame.num_cores:
440+
err = "Not launching, insufficient idle cores"
441+
log.critical(err)
442+
raise rqd.rqexceptions.CoreReservationFailureException(err)
443+
# pylint: enable=no-member
437444

438445
if runFrame.num_gpus:
439446
reserveGpus = self.machine.reserveGpus(runFrame.num_gpus)

rqd/rqd/rqmachine.py

Lines changed: 41 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -936,7 +936,41 @@ def setupGpu(self):
936936
""" Setup rqd for Gpus """
937937
self.__gpusets = set(range(self.getGpuCount()))
938938

939-
def reserveHT(self, frameCores):
939+
def _getAvailableHTCores(self):
940+
"""Get available hyperthreading cores information
941+
@rtype: tuple
942+
@return: (avail_cores_dict, avail_cores_count) where avail_cores_dict
943+
maps physid -> set(coreid) and avail_cores_count is the total count
944+
"""
945+
avail_cores = {}
946+
avail_cores_count = 0
947+
reserved_cores = self.__coreInfo.reserved_cores
948+
949+
for physid, cores in self.__procs_by_physid_and_coreid.items():
950+
for coreid in cores.keys():
951+
if int(physid) in reserved_cores and \
952+
int(coreid) in reserved_cores[int(physid)].coreid:
953+
continue
954+
avail_cores.setdefault(physid, set()).add(coreid)
955+
avail_cores_count += 1
956+
957+
return avail_cores, avail_cores_count
958+
959+
def canReserveHT(self, frameCores: int):
960+
"""Check if hyperthreading cores can be reserved without actually reserving them
961+
@type frameCores: int
962+
@param frameCores: The total physical cores required by the frame.
963+
@rtype: bool
964+
@return: True if cores can be reserved, False otherwise
965+
"""
966+
if frameCores % 100:
967+
return False
968+
969+
_, avail_cores_count = self._getAvailableHTCores()
970+
remaining_cores = frameCores / 100
971+
return avail_cores_count >= remaining_cores
972+
973+
def reserveHT(self, frameCores: int):
940974
""" Reserve cores for use by taskset
941975
taskset -c 0,1,8,9 COMMAND
942976
Not thread save, use with locking.
@@ -951,21 +985,8 @@ def reserveHT(self, frameCores):
951985
return None
952986
log.info('Taskset: Requesting reserve of %d', (frameCores // 100))
953987

954-
# Look for the most idle physical cpu.
955-
# Prefer to assign cores from the same physical cpu.
956-
# Spread different frames around on different physical cpus.
957-
avail_cores = {}
958-
avail_cores_count = 0
959-
reserved_cores = self.__coreInfo.reserved_cores
960-
961-
for physid, cores in self.__procs_by_physid_and_coreid.items():
962-
for coreid in cores.keys():
963-
if int(physid) in reserved_cores and \
964-
int(coreid) in reserved_cores[int(physid)].coreid:
965-
continue
966-
avail_cores.setdefault(physid, set()).add(coreid)
967-
avail_cores_count += 1
968-
988+
# Get available cores using factored method
989+
avail_cores, avail_cores_count = self._getAvailableHTCores()
969990
remaining_cores = frameCores / 100
970991

971992
if avail_cores_count < remaining_cores:
@@ -976,6 +997,7 @@ def reserveHT(self, frameCores):
976997
raise rqd.rqexceptions.CoreReservationFailureException(err)
977998

978999
tasksets = []
1000+
reserved_cores = self.__coreInfo.reserved_cores
9791001

9801002
for physid, cores in sorted(
9811003
avail_cores.items(),
@@ -997,9 +1019,9 @@ def reserveHT(self, frameCores):
9971019
if remaining_cores == 0:
9981020
break
9991021

1000-
log.warning('Taskset: Reserving procs - %s', ','.join(tasksets))
1001-
1002-
return ','.join(tasksets)
1022+
joined_tasksets = ','.join(sorted(tasksets, key=int))
1023+
log.warning('Taskset: Reserving procs - %s', joined_tasksets)
1024+
return joined_tasksets
10031025

10041026
# pylint: disable=inconsistent-return-statements
10051027
def releaseHT(self, reservedHT):

0 commit comments

Comments
 (0)