Skip to content

Commit 51806c1

Browse files
committed
tools/contrib: Add balance-cpu.py to distribute Seastar reactors to CPU cores using two strategies: OSD and NUMA socket based
Signed-off-by: Jose J Palacios-Perez <[email protected]>
1 parent f83110a commit 51806c1

File tree

1 file changed

+283
-0
lines changed

1 file changed

+283
-0
lines changed

src/tools/contrib/balance_cpu.py

Lines changed: 283 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,283 @@
1+
#!/usr/bin/python
2+
"""
3+
This script gets the output from lscpu and produces a list of CPU uids
4+
corresponding to physical cores, intended to use to allocate Seastar reactors
5+
in a balanced way across sockets.
6+
7+
Two strategies of balancing reactors over CPU cores:
8+
9+
1) OSD based: all the reactors of each OSD run in the same CPU NUMA socket (default),
10+
2) Socket based: reactors for the same OSD are distributed evenly across CPU NUMA sockets.
11+
"""
12+
13+
import argparse
14+
import logging
15+
import sys
16+
import tempfile
17+
from lscpu import LsCpuJson
18+
19+
__author__ = "Jose J Palacios-Perez"
20+
21+
logger = logging.getLogger(__name__)
22+
23+
# Defaults
24+
NUM_OSD = 8
25+
NUM_REACTORS = 3
26+
27+
28+
class CpuCoreAllocator(object):
29+
"""
30+
Process a sequence of CPU core ids to be used for the allocation of Seastar reactors
31+
32+
# lscpu --json
33+
{
34+
"lscpu": [
35+
{
36+
d: {'field': 'NUMA node(s):', 'data': '2'}
37+
d: {'field': 'NUMA node0 CPU(s):', 'data': '0-27,56-83'}
38+
d: {'field': 'NUMA node1 CPU(s):', 'data': '28-55,84-111'}
39+
}
40+
:
41+
}
42+
"""
43+
44+
def __init__(self, json_file: str, num_osd: int, num_react: int):
45+
"""
46+
This class expects the output from lscpu --json, from there
47+
it works out a list of physical CPU uids to allocate Seastar reactors
48+
"""
49+
self.json_file = json_file
50+
self.num_osd = num_osd
51+
self.num_react = num_react
52+
self._dict = {}
53+
self.lscpu = LsCpuJson(json_file)
54+
# self.socket_lst = LsCpuJson(json_file)
55+
56+
def do_distrib_socket_based(self):
57+
"""
58+
Distribution criteria: the reactors of each OSD are distributed across the available
59+
NUMA sockets evenly.
60+
Each OSD uses step cores from each NUMA socket.
61+
Produces a list of ranges to use for the ceph config set CLI.
62+
"""
63+
# Init:
64+
control = []
65+
cores_to_disable = set([])
66+
num_sockets = self.lscpu.get_num_sockets()
67+
# step = self.num_react
68+
total_phys_cores = self.lscpu.get_total_physical()
69+
# Max num of OSD that can be allocated
70+
max_osd_num = total_phys_cores // self.num_react
71+
72+
# Each OSD uses num reactor//sockets cores
73+
step = self.num_react // num_sockets
74+
reminder = self.num_react % num_sockets
75+
76+
logger.debug(
77+
f"total_phys_cores: {total_phys_cores}, max_osd_num: {max_osd_num}, step:{step}"
78+
)
79+
assert max_osd_num > self.num_osd, "Not enough physical CPU cores"
80+
81+
# Copy the original physical ranges to the control dict
82+
for socket in self.lscpu.get_sockets(): # socket_lst["sockets"]:
83+
control.append(socket)
84+
# Traverse the OSD to produce an allocation
85+
# f"total_phys_cores: {total_phys_cores}, max_osd_num: {max_osd_num}, step:{step}, rem:{reminder} "
86+
for osd in range(self.num_osd):
87+
osds = []
88+
for socket in control:
89+
_start = socket["physical_start"]
90+
_step = step
91+
# If there is a reminder, use a round-robin technique so all
92+
# sockets are candidate for it
93+
_candidate = osd % num_sockets
94+
_so_id = socket["socket"]
95+
if _candidate == _so_id:
96+
_step += reminder
97+
_end = socket["physical_start"] + _step
98+
# For cephadm, construct a dictionary for these intervals
99+
logger.debug(
100+
f"osd: {osd}, socket:{_so_id}, _start:{_start}, _end:{_end - 1}"
101+
)
102+
osds.append(f"{_start}-{_end - 1}")
103+
104+
if _end <= socket["physical_end"]:
105+
socket["physical_start"] = _end
106+
# Produce the HT sibling list to disable
107+
# Consider to use sets to avoid dupes
108+
plist = list(
109+
range(
110+
socket["ht_sibling_start"],
111+
(socket["ht_sibling_start"] + _step),
112+
1,
113+
)
114+
)
115+
logger.debug(f"plist: {plist}")
116+
pset = set(plist)
117+
# _to_disable=pset.union(cores_to_disable)
118+
cores_to_disable = pset.union(cores_to_disable)
119+
logger.debug(f"cores_to_disable: {list(cores_to_disable)}")
120+
socket["ht_sibling_start"] += _step
121+
else:
122+
# bail out
123+
_sops = socket["physical_start"] + step
124+
logger.debug(f"out of range: {_sops}")
125+
break
126+
print(",".join(osds))
127+
_to_disable = sorted(list(cores_to_disable))
128+
logger.debug(f"Cores to disable: {_to_disable}")
129+
print(" ".join(map(str, _to_disable)))
130+
131+
def do_distrib_osd_based(self):
132+
"""
133+
Given a number of Seastar reactor threads and number of OSD,
134+
distributes all the reactors of the same OSD in the same NUMA socket
135+
using only physical core CPUs.
136+
Produces a list of ranges to use for the ceph config set CLI.
137+
"""
138+
control = []
139+
cores_to_disable = set([])
140+
# Each OSD uses num reactor cores from the same NUMA socket
141+
num_sockets = self.lscpu.get_num_sockets()
142+
step = self.num_react
143+
total_phys_cores = self.lscpu.get_total_physical()
144+
# Max num of OSD that can be allocated
145+
max_osd_num = total_phys_cores // self.num_react
146+
147+
logger.debug(
148+
f"total_phys_cores: {total_phys_cores}, max_osd_num: {max_osd_num}, step:{step}"
149+
)
150+
assert max_osd_num > self.num_osd, "Not enough physical CPU cores"
151+
152+
# Copy the original physical ranges to the control dict
153+
for socket in self.lscpu.get_sockets():
154+
control.append(socket)
155+
# Traverse the OSD to produce an allocation
156+
# even OSD num uses socket0, odd OSD number uses socket 1
157+
for osd in range(self.num_osd):
158+
_so_id = osd % num_sockets
159+
socket = control[_so_id]
160+
_start = socket["physical_start"]
161+
_end = socket["physical_start"] + step
162+
# For cephadm, construct a dictionary for these intervals
163+
logger.debug(
164+
f"osd: {osd}, socket:{_so_id}, _start:{_start}, _end:{_end - 1}"
165+
)
166+
print(f"{_start}-{_end - 1}")
167+
if _end <= socket["physical_end"]:
168+
socket["physical_start"] = _end
169+
# Produce the HT sibling list to disable
170+
# Consider to use sets to avoid dupes
171+
plist = list(
172+
range(
173+
socket["ht_sibling_start"],
174+
(socket["ht_sibling_start"] + step),
175+
1,
176+
)
177+
)
178+
logger.debug(f"plist: {plist}")
179+
pset = set(plist)
180+
# _to_disable = pset.union(cores_to_disable)
181+
cores_to_disable = pset.union(cores_to_disable)
182+
logger.debug(f"cores_to_disable: {list(cores_to_disable)}")
183+
socket["ht_sibling_start"] += step
184+
else:
185+
# bail out
186+
_sops = socket["physical_start"] + step
187+
logger.debug(f"Out of range: {_sops}")
188+
break
189+
_to_disable = sorted(list(cores_to_disable))
190+
logger.debug(f"Cores to disable: {_to_disable}")
191+
print(" ".join(map(str, _to_disable)))
192+
193+
def run(self, distribute_strat):
194+
"""
195+
Load the .json from lscpu, get the ranges of CPU cores per socket,
196+
produce the corresponding balance, print the balance as a list intended to be
197+
consumed by vstart.sh -- a dictionary will be used for cephadm.
198+
"""
199+
self.lscpu.load_json()
200+
self.lscpu.get_ranges()
201+
if distribute_strat == "socket":
202+
self.do_distrib_socket_based()
203+
else:
204+
self.do_distrib_osd_based()
205+
206+
207+
def main(argv):
208+
examples = """
209+
Examples:
210+
# Produce a balanced CPU distribution of physical CPU cores intended for the Seastar
211+
reactor threads
212+
%prog -u <lscpu.json> [-b <osd|socket>] [-d<dir>] [-v]
213+
[-o <num_OSDs>] [-r <num_reactors>]
214+
215+
# such a list can be used for vstart.sh/cephadm to issue ceph conf set commands.
216+
"""
217+
parser = argparse.ArgumentParser(
218+
description="""This tool is used to produce CPU core balanced allocation""",
219+
epilog=examples,
220+
formatter_class=argparse.RawDescriptionHelpFormatter,
221+
)
222+
parser.add_argument(
223+
"-u",
224+
"--lscpu",
225+
type=str,
226+
required=True,
227+
help="Input file: .json file produced by lscpu --json",
228+
default=None,
229+
)
230+
parser.add_argument(
231+
"-o",
232+
"--num_osd",
233+
type=int,
234+
required=False,
235+
help="Number of OSDs",
236+
default=NUM_OSD,
237+
)
238+
parser.add_argument(
239+
"-r",
240+
"--num_reactor", # value of --crimson-smp
241+
type=int,
242+
required=False,
243+
help="Number of Seastar reactors",
244+
default=NUM_REACTORS,
245+
)
246+
parser.add_argument(
247+
"-d", "--directory", type=str, help="Directory to examine", default="./"
248+
)
249+
parser.add_argument(
250+
"-b",
251+
"--balance",
252+
type=str,
253+
required=False,
254+
help="CPU balance strategy: osd (default), socket (NUMA)",
255+
default="osd",
256+
)
257+
parser.add_argument(
258+
"-v",
259+
"--verbose",
260+
action="store_true",
261+
help="True to enable verbose logging mode",
262+
default=False,
263+
)
264+
265+
# parser.set_defaults(numosd=1)
266+
options = parser.parse_args(argv)
267+
268+
if options.verbose:
269+
logLevel = logging.DEBUG
270+
else:
271+
logLevel = logging.INFO
272+
273+
with tempfile.NamedTemporaryFile(dir="/tmp", delete=False) as tmpfile:
274+
logging.basicConfig(filename=tmpfile.name, encoding="utf-8", level=logLevel)
275+
276+
logger.debug(f"Got options: {options}")
277+
278+
cpu_cores = CpuCoreAllocator(options.lscpu, options.num_osd, options.num_reactor)
279+
cpu_cores.run(options.balance)
280+
281+
282+
if __name__ == "__main__":
283+
main(sys.argv[1:])

0 commit comments

Comments
 (0)