Skip to content

Commit 593f293

Browse files
committed
got multithread working
1 parent 5961979 commit 593f293

File tree

4 files changed

+108
-38
lines changed

4 files changed

+108
-38
lines changed

src/bpftimeruntime.cpp

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ BpfTimeRuntime::BpfTimeRuntime(pid_t tid, std::string program_location)
3232
SPDLOG_INFO("GLOBAL memory initialized ");
3333
}
3434

35-
BpfTimeRuntime::~BpfTimeRuntime() { }
35+
BpfTimeRuntime::~BpfTimeRuntime() {}
3636
int BpfTimeRuntime::read(CXLController *controller, BPFTimeRuntimeElem *elem) {
3737
mem_stats stats;
3838
proc_info proc_info1;
@@ -42,14 +42,14 @@ int BpfTimeRuntime::read(CXLController *controller, BPFTimeRuntimeElem *elem) {
4242
uint64_t key = 0; // 改为8字节
4343
uint64_t key1 = 0; // 改为8字节
4444
void *item2 = (void *)1;
45-
while(item2){
45+
while (item2) {
4646
int ret = bpftime_map_get_next_key(i, &key1, &key); // 获取key
4747
if (ret != 0) {
4848
SPDLOG_DEBUG("Failed to get next key for map {}", i);
4949
break;
5050
}
5151

52-
item2 = (void*)bpftime_map_lookup_elem(i, &key);
52+
item2 = (void *)bpftime_map_lookup_elem(i, &key);
5353
SPDLOG_DEBUG("Process map key: {} {} thread_id:{}", key1, key,
5454
std::this_thread::get_id()); // 使用std::this_thread获取当前线程ID
5555

@@ -73,4 +73,3 @@ int BpfTimeRuntime::read(CXLController *controller, BPFTimeRuntimeElem *elem) {
7373
}
7474
return 0;
7575
}
76-

src/cxlcontroller.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -81,13 +81,13 @@ void CXLController::set_stats(mem_stats stats) {
8181
}
8282

8383
void CXLController::set_process_info(const proc_info &process_info) {
84-
monitors->enable(process_info.current_pid, process_info.current_tid, true, 1000, 0);
84+
monitors->enable(process_info.current_pid, process_info.current_tid, true, 1000, helper.num_of_cpu());
8585
}
8686

8787
void CXLController::set_thread_info(const proc_info &thread_info) {
88-
if (thread_info .parent_pid == monitors->mon[0].tgid) {
89-
monitors->enable(thread_info.current_pid, thread_info.current_tid, false, 0, 0);
90-
std::cout << "set thread info " << thread_info.current_pid << " " << thread_info.current_tid << std::endl;
88+
if (thread_info.current_pid == monitors->mon[0].tgid) {
89+
monitors->enable(thread_info.current_pid, thread_info.current_tid, false, 0, helper.num_of_cpu());
90+
// std::cout << "set thread info " << thread_info.current_pid << " " << thread_info.current_tid << std::endl;
9191
auto lbr_ = new lbr{.from = 0, .to = 0, .flags = 0};
9292
this->insert_one(thread_map[thread_info.current_tid], *lbr_);
9393
delete lbr_;

src/main.cc

Lines changed: 23 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,8 @@ int main(int argc, char *argv[]) {
3636
cxxopts::value<std::string>()->default_value("./microbench/malloc"))(
3737
"h,help", "Help for CXLMemSim", cxxopts::value<bool>()->default_value("false"))(
3838
"c,cpuset", "The CPUSET for CPU to set affinity on and only run the target process on those CPUs",
39-
cxxopts::value<std::vector<int>>()->default_value("0,1,2,3"))("d,dramlatency", "The current platform's dram latency",
40-
cxxopts::value<double>()->default_value("110"))(
39+
cxxopts::value<std::vector<int>>()->default_value("0,1,2,3"))(
40+
"d,dramlatency", "The current platform's dram latency", cxxopts::value<double>()->default_value("110"))(
4141
"p,pebsperiod", "The pebs sample period", cxxopts::value<int>()->default_value("10"))(
4242
"m,mode", "Page mode or cacheline mode", cxxopts::value<std::string>()->default_value("p"))(
4343
"o,topology", "The newick tree input for the CXL memory expander topology",
@@ -64,7 +64,8 @@ int main(int argc, char *argv[]) {
6464
"k,policy", "The policy of CXL memory controller",
6565
cxxopts::value<std::vector<std::string>>()->default_value("none,none,none,none"))(
6666
"e,env", "The environment variable for the CXL memory controller",
67-
cxxopts::value<std::vector<std::string>>()->default_value("OMP_NUM_THREADS=24"));;
67+
cxxopts::value<std::vector<std::string>>()->default_value("OMP_NUM_THREADS=24"));
68+
;
6869

6970
auto result = options.parse(argc, argv);
7071
if (result["help"].as<bool>()) {
@@ -130,7 +131,7 @@ int main(int argc, char *argv[]) {
130131
hybridPolicy->add_policy(new HeatAwareMigrationPolicy());
131132
hybridPolicy->add_policy(new FrequencyBasedMigrationPolicy());
132133
policy2 = hybridPolicy;
133-
}else {
134+
} else {
134135
SPDLOG_ERROR("Unknown migration policy: {}", policy[1]);
135136
policy2 = new MigrationPolicy();
136137
}
@@ -150,7 +151,7 @@ int main(int argc, char *argv[]) {
150151
policy4 = new FIFOPolicy();
151152
} else if (policy[3] == "frequency") {
152153
policy4 = new FrequencyBasedInvalidationPolicy();
153-
} else {
154+
} else {
154155
SPDLOG_ERROR("Unknown caching policy: {}", policy[3]);
155156
policy4 = new CachingPolicy();
156157
}
@@ -161,7 +162,7 @@ int main(int argc, char *argv[]) {
161162
for (auto i : cpuset) {
162163
if (!use_cpus || use_cpus & 1UL << i) {
163164
CPU_SET(i, &use_cpuset);
164-
SPDLOG_DEBUG("use cpuid: {}{}", i, use_cpus);
165+
std::cout << "use cpuid: " << i << " " << use_cpus << std::endl;
165166
}
166167
}
167168

@@ -229,7 +230,7 @@ int main(int argc, char *argv[]) {
229230
sleep(1);
230231
std::vector<const char *> envp;
231232
envp.emplace_back("LD_PRELOAD=/root/.bpftime/libbpftime-agent.so");
232-
envp.emplace_back("OMP_NUM_THREADS=24");
233+
envp.emplace_back("OMP_NUM_THREADS=4");
233234
while (!env.empty()) {
234235
envp.emplace_back(env.back().c_str());
235236
env.pop_back();
@@ -308,15 +309,19 @@ int main(int argc, char *argv[]) {
308309
all_llcmiss = 0, all_prefetch = 0;
309310
double writeback_latency;
310311
/* read BPFTIMERUNTIME sample */
311-
if (mon.bpftime_ctx->read(controller, &mon.after->bpftime) < 0) {
312-
SPDLOG_ERROR("[{}:{}:{}] Warning: Failed BPFTIMERUNTIME read", i, mon.tgid, mon.tid);
313-
}
314-
/* read PEBS sample */
315-
if (mon.pebs_ctx->read(controller, &mon.after->pebs) < 0) {
316-
SPDLOG_ERROR("[{}:{}:{}] Warning: Failed PEBS read", i, mon.tgid, mon.tid);
317-
}
318-
/* read LBR sample */
319-
if (mon.lbr_ctx->read(controller, &mon.after->lbr) < 0) {
312+
if (mon.is_process) {
313+
if (mon.bpftime_ctx->read(controller, &mon.after->bpftime) < 0) {
314+
SPDLOG_ERROR("[{}:{}:{}] Warning: Failed BPFTIMERUNTIME read", i, mon.tgid, mon.tid);
315+
}
316+
317+
/* read PEBS sample */
318+
if (mon.pebs_ctx->read(controller, &mon.after->pebs) < 0) {
319+
SPDLOG_ERROR("[{}:{}:{}] Warning: Failed PEBS read", i, mon.tgid, mon.tid);
320+
}
321+
/* read LBR sample */
322+
if (mon.lbr_ctx->read(controller, &mon.after->lbr) < 0) {
323+
SPDLOG_ERROR("[{}:{}:{}] Warning: Failed LBR read", i, mon.tgid, mon.tid);
324+
}
320325
}
321326
target_llcmiss = mon.after->pebs.total - mon.before->pebs.total;
322327

@@ -342,8 +347,8 @@ int main(int argc, char *argv[]) {
342347
writeback_latency = (double)target_l2stall * avg_weight *
343348
(wb_cnt * target_llcmiss / (all_llcmiss + all_prefetch + 1) /
344349
(target_llchits + avg_weight * target_llcmiss + 1));
345-
uint64_t emul_delay =0;//=
346-
//(controller->latency_lat + controller->bandwidth_lat + writeback_latency) * 1000000;
350+
uint64_t emul_delay =
351+
(controller->latency_lat + controller->bandwidth_lat + writeback_latency) * 1000000;
347352

348353
SPDLOG_DEBUG("[{}:{}:{}] pebs: total={}, ", i, mon.tgid, mon.tid, mon.after->pebs.total);
349354

src/monitor.cpp

Lines changed: 78 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -14,23 +14,78 @@
1414
#include <csignal>
1515
#include <ctime>
1616
#include <iostream>
17+
#include <vector>
1718
timespec Monitor::last_delay = {0, 0};
1819

20+
/// Collect the thread IDs listed under /proc/<pid>/task.
///
/// @param pid  Process whose task directory is scanned.
/// @return One entry for every directory entry (other than "." and "..") whose name
///         std::stoi can parse. The vector is empty when the task directory cannot be
///         opened; in that case an error message is written to std::cerr.
std::vector<pid_t> get_thread_ids(pid_t pid) {
    std::vector<pid_t> thread_ids;

    // Build the path of the task directory.
    const std::string task_dir = "/proc/" + std::to_string(pid) + "/task";

    DIR *dir = opendir(task_dir.c_str());
    if (dir == nullptr) {
        std::cerr << "无法打开目录: " << task_dir << " - " << strerror(errno) << std::endl;
        return thread_ids;
    }

    struct dirent *entry;
    while ((entry = readdir(dir)) != nullptr) {
        // Skip . and ..
        if (strcmp(entry->d_name, ".") == 0 || strcmp(entry->d_name, "..") == 0) {
            continue;
        }

        // std::stoi throws std::invalid_argument / std::out_of_range on names that are
        // not representable integers. Skip such entries instead of letting the exception
        // escape, which would also leak the open directory handle.
        pid_t tid;
        try {
            tid = std::stoi(entry->d_name);
        } catch (const std::exception &) {
            continue;
        }

        // Add the thread ID to the result.
        thread_ids.push_back(tid);
    }

    closedir(dir);
    return thread_ids;
}
47+
48+
// Set the CPU affinity of one specific thread.
//
// Builds a mask containing only cpu_id and passes it to sched_setaffinity for tid.
// Returns true when the call reports success; otherwise writes the errno text to
// std::cerr and returns false.
bool set_thread_affinity(pid_t tid, int cpu_id) {
    cpu_set_t single_cpu_mask;
    CPU_ZERO(&single_cpu_mask);
    CPU_SET(cpu_id, &single_cpu_mask);

    // Success path first: anything other than 0 from the syscall is a failure.
    if (sched_setaffinity(tid, sizeof(cpu_set_t), &single_cpu_mask) == 0) {
        return true;
    }

    std::cerr << "设置线程 " << tid << " 的CPU亲和性失败: " << strerror(errno) << std::endl;
    return false;
}
63+
1964
Monitors::Monitors(int cpu_count, cpu_set_t *use_cpuset) : print_flag(true) {
2065
mon = std::vector<Monitor>(cpu_count);
2166
/** Init mon */
2267
for (int i = 0; i < cpu_count; i++) {
2368
disable(i);
24-
int cpucnt = 0;
69+
70+
// 直接分配第i个可用的CPU
71+
int available_cpu = -1;
72+
int count = 0;
73+
2574
for (int cpuid = 0; cpuid < helper.num_of_cpu(); cpuid++) {
2675
if (!CPU_ISSET(cpuid, use_cpuset)) {
27-
if (i == cpucnt) {
28-
mon[i].cpu_core = cpuid;
76+
if (count == i) {
77+
available_cpu = cpuid;
2978
break;
3079
}
31-
cpucnt++;
80+
count++;
3281
}
3382
}
83+
84+
if (available_cpu != -1) {
85+
mon[i].cpu_core = available_cpu;
86+
} else {
87+
std::cout << "No available CPU" << std::endl;
88+
}
3489
}
3590
}
3691
void Monitors::stop_all(const int processes) {
@@ -84,12 +139,24 @@ int Monitors::enable(uint32_t tgid, uint32_t tid, bool is_process, uint64_t pebs
84139
s = sched_setaffinity(tid, sizeof(cpu_set_t), &cpuset);
85140
if (s != 0) {
86141
if (errno == ESRCH) {
87-
SPDLOG_DEBUG("Process [{}:{}] is terminated.", tgid, tid);
88-
return -2;
142+
if (tid != tgid) {
143+
static auto thread_ids = get_thread_ids(tgid);
144+
tid = thread_ids.back();
145+
if (tid) {
146+
thread_ids.pop_back();
147+
std::cout << "set affinity for thread " << tid << std::endl;
148+
s = sched_setaffinity(tid, sizeof(cpu_set_t), &cpuset);
149+
if (s != 0) {
150+
std::cout << "Failed to setaffinity for thread " << tid << std::endl;
151+
return -2;
152+
}
153+
}
154+
} else {
155+
return -2;
156+
}
89157
} else {
90-
SPDLOG_ERROR("Failed to setaffinity");
158+
std::cout << "Failed to setaffinity" << std::endl;
91159
}
92-
throw;
93160
}
94161

95162
/* init */
@@ -106,10 +173,10 @@ int Monitors::enable(uint32_t tgid, uint32_t tid, bool is_process, uint64_t pebs
106173
SPDLOG_DEBUG("{}Process [tgid={}, tid={}]: enable to pebs.", target, mon[target].tgid,
107174
mon[target].tid); // multiple tid multiple pid
108175
mon[target].lbr_ctx = new LBR(tgid, 1000);
176+
new std::jthread(mon[target].wait, &mon, target);
109177
}
110178
SPDLOG_INFO("pid {}[tgid={}, tid={}] monitoring start", target, mon[target].tgid, mon[target].tid);
111179

112-
new std::jthread(mon[target].wait, &mon, target);
113180
return target;
114181
}
115182
void Monitors::disable(const uint32_t target) {
@@ -266,7 +333,7 @@ void Monitor::run() {
266333
} else {
267334
this->status = MONITOR_UNKNOWN;
268335
perror("Failed to signal to any of the target processes");
269-
SPDLOG_ERROR("I'm dying{} {}", this->tgid, this->tid);
336+
SPDLOG_ERROR("I'm dying {} {}", this->tgid, this->tid);
270337
}
271338
} else {
272339
this->status = MONITOR_ON;
@@ -342,7 +409,7 @@ void Monitor::wait(std::vector<Monitor> *mons, int target) {
342409
sleep_target = start_ts + wanted_delay * prev_wanted_delay;
343410
target_nsec = wanted_delay - prev_wanted_delay;
344411
interval_target = end_ts + interval_delay;
345-
if (mon.bpftime_ctx->updater->get(mon.tid))
412+
if (mon.bpftime_ctx && mon.bpftime_ctx->updater->get(mon.tid))
346413
mon.bpftime_ctx->updater->update(mon.tgid, prev_wanted_delay.tv_nsec - mon.wanted_delay.tv_nsec);
347414
else
348415
clock_nanosleep(CLOCK_MONOTONIC, TIMER_ABSTIME, &interval_target, nullptr);
@@ -363,5 +430,4 @@ void Monitor::wait(std::vector<Monitor> *mons, int target) {
363430
clock_gettime(CLOCK_MONOTONIC, &end_ts);
364431
}
365432
// SPDLOG_INFO("{}:{}", prev_wanted_delay.tv_sec, prev_wanted_delay.tv_nsec);
366-
367433
}

0 commit comments

Comments
 (0)