Skip to content

Commit 76a51bb

Browse files
committed
introduce ability to define CPU affinity for input and output workers
1 parent f79342b commit 76a51bb

File tree

1 file changed

+49
-5
lines changed

1 file changed

+49
-5
lines changed

ipfixprobe.cpp

Lines changed: 49 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ void print_help(ipxp_conf_t &conf, const std::string &arg)
138138
}
139139
}
140140

141-
void process_plugin_argline(const std::string &args, std::string &plugin, std::string &params)
141+
void process_plugin_argline(const std::string &args, std::string &plugin, std::string &params, std::vector<int> &affinity)
142142
{
143143
size_t delim;
144144

@@ -148,6 +148,16 @@ void process_plugin_argline(const std::string &args, std::string &plugin, std::s
148148
plugin = params.substr(0, delim);
149149
params.erase(0, delim == std::string::npos ? delim : delim + 1);
150150

151+
delim = plugin.find('@');
152+
if (delim != std::string::npos) {
153+
try {
154+
affinity.emplace_back(std::stoi(plugin.substr(delim + 1)));
155+
} catch (const std::invalid_argument &ex) {
156+
throw IPXPError("CPU affinity must be single number: " + std::string(ex.what()));
157+
}
158+
}
159+
plugin = plugin.substr(0, delim);
160+
151161
trim_str(plugin);
152162
trim_str(params);
153163
}
@@ -168,6 +178,28 @@ telemetry::Content get_ipx_ring_telemetry(ipx_ring_t* ring)
168178
return dict;
169179
}
170180

181+
void set_thread_details(pthread_t thread, const std::string &name, const std::vector<int> &affinity)
182+
{
183+
// Set thread name and affinity
184+
if (name.length() > 0) {
185+
pthread_setname_np(thread, name.substr(0, 15).c_str());
186+
}
187+
if (affinity.size() > 0) {
188+
cpu_set_t cpuset;
189+
CPU_ZERO(&cpuset);
190+
for (auto cpu : affinity) {
191+
CPU_SET(cpu, &cpuset);
192+
}
193+
int ret = pthread_setaffinity_np(thread, sizeof(cpu_set_t), &cpuset);
194+
if (ret != 0) {
195+
throw IPXPError(
196+
"pthread_setaffinity_np failed, CPU(s) "
197+
+ vec2str(affinity) + " probably cannot be set"
198+
);
199+
}
200+
}
201+
}
202+
171203
bool process_plugin_args(ipxp_conf_t &conf, IpfixprobeOptParser &parser)
172204
{
173205
auto deleter = [&](OutputPlugin::Plugins *p) {
@@ -183,18 +215,27 @@ bool process_plugin_args(ipxp_conf_t &conf, IpfixprobeOptParser &parser)
183215
std::string output_params = "";
184216

185217
if (parser.m_storage.size()) {
186-
process_plugin_argline(parser.m_storage[0], storage_name, storage_params);
218+
std::vector<int> affinity;
219+
process_plugin_argline(parser.m_storage[0], storage_name, storage_params, affinity);
220+
if (affinity.size() != 0) {
221+
throw IPXPError("cannot set CPU affinity for storage plugin (storage plugin is invoked inside input threads)");
222+
}
187223
}
224+
std::vector<int> output_worker_affinity;
188225
if (parser.m_output.size()) {
189-
process_plugin_argline(parser.m_output[0], output_name, output_params);
226+
process_plugin_argline(parser.m_output[0], output_name, output_params, output_worker_affinity);
190227
}
191228

192229
// Process
193230
for (auto &it : parser.m_process) {
194231
ProcessPlugin *process_plugin = nullptr;
195232
std::string process_params;
196233
std::string process_name;
197-
process_plugin_argline(it, process_name, process_params);
234+
std::vector<int> affinity;
235+
process_plugin_argline(it, process_name, process_params, affinity);
236+
if (affinity.size() != 0) {
237+
throw IPXPError("cannot set CPU affinity for process plugin (process plugins are invoked inside input threads)");
238+
}
198239
for (auto &it : *process_plugins) {
199240
std::string plugin_name = it.first;
200241
if (plugin_name == process_name) {
@@ -272,6 +313,7 @@ bool process_plugin_args(ipxp_conf_t &conf, IpfixprobeOptParser &parser)
272313
output_stats,
273314
output_queue
274315
};
316+
set_thread_details(tmp.thread->native_handle(), "out_" + output_name, output_worker_affinity);
275317
conf.outputs.push_back(tmp);
276318
conf.output_fut.push_back(output_res->get_future());
277319
}
@@ -286,7 +328,8 @@ bool process_plugin_args(ipxp_conf_t &conf, IpfixprobeOptParser &parser)
286328
StoragePlugin *storage_plugin = nullptr;
287329
std::string input_params;
288330
std::string input_name;
289-
process_plugin_argline(it, input_name, input_params);
331+
std::vector<int> affinity;
332+
process_plugin_argline(it, input_name, input_params, affinity);
290333

291334
auto input_plugin_dir = input_dir->addDir(input_name);
292335
auto pipeline_queue_dir = pipeline_dir->addDir("queues")->addDir(std::to_string(pipeline_idx));
@@ -358,6 +401,7 @@ bool process_plugin_args(ipxp_conf_t &conf, IpfixprobeOptParser &parser)
358401
storage_process_plugins
359402
}
360403
};
404+
set_thread_details(tmp.input.thread->native_handle(), "in_"+ std::to_string(pipeline_idx) + "_" + input_name, affinity);
361405
conf.pipelines.push_back(tmp);
362406
pipeline_idx++;
363407
}

0 commit comments

Comments
 (0)