Skip to content

Commit d61c3eb

Browse files
committed
parallel rob and fix rob
1 parent 41e9b5d commit d61c3eb

File tree

9 files changed

+342
-46
lines changed

9 files changed

+342
-46
lines changed

include/cxlcontroller.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,8 +80,8 @@ class CXLController : public CXLSwitch {
8080
int num_end_points = 0;
8181
int last_index = 0;
8282
uint64_t freed = 0;
83-
uint64_t latency_lat{};
84-
uint64_t bandwidth_lat{};
83+
double latency_lat{};
84+
double bandwidth_lat{};
8585
double dramlatency;
8686
// ring buffer
8787
std::queue<lbr> ring_buffer;

include/cxlendpoint.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ class CXLMemExpander : public CXLEndPoint {
6464
std::vector<occupation_info> occupation; // timestamp, pa
6565
CXLMemExpanderEvent counter{};
6666
CXLMemExpanderEvent last_counter{};
67-
67+
mutable std::shared_mutex occupationMutex_; // 使用共享互斥锁允许多个读取者
6868
// LRUCache lru_cache;
6969
// tlb map and paging map -> invalidate
7070
int last_read = 0;

include/lbr.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,8 @@ struct lbr_sample {
3939
uint32_t tid;
4040
// uint64_t nr;
4141
// uint64_t ips[4];
42-
uint32_t cpu;
4342
uint64_t timestamp;
43+
uint32_t cpu;
4444
uint64_t nr2;
4545
// uint64_t hw_idx;
4646
lbr lbrs[32];

include/rob.h

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ class Rob {
3737
int64_t cur_latency = 0;
3838
int64_t totalLatency_ = 0;
3939
int64_t currentCycle_ = 0; // 当前周期
40+
int counter = 0;
4041
// 主要方法
4142
bool issue(const InstructionGroup &ins);
4243
bool canRetire(const InstructionGroup &ins);
@@ -48,5 +49,52 @@ class Rob {
4849
int64_t getCurrentCycle() const { return currentCycle_; }
4950
double getAverageLatency() const { return queue_.empty() ? 0 : static_cast<double>(totalLatency_) / queue_.size(); }
5051
};
52+
class ParallelRob {
53+
public:
54+
explicit ParallelRob(CXLController *controller, size_t size = 256, uint64_t cycle = 1687)
55+
: controller_(controller), maxSize_(size), currentCycle_(cycle) {
56+
// 初始化多个ROB分区
57+
for (int i = 0; i < NUM_PARTITIONS; ++i) {
58+
partitions_.emplace_back(RobPartition());
59+
}
60+
}
61+
62+
// 基本属性
63+
CXLController *controller_;
64+
const size_t maxSize_;
65+
std::atomic<int64_t> stallCount_{0};
66+
std::atomic<int64_t> totalLatency_{0};
67+
std::atomic<int64_t> currentCycle_{0};
68+
std::atomic<int> counter{0};
69+
70+
// 分区处理
71+
static constexpr int NUM_PARTITIONS = 8; // 可以根据CPU核心数调整
72+
73+
struct RobPartition {
74+
std::mutex mutex;
75+
std::vector<InstructionGroup> queue;
76+
int64_t cur_latency = 0;
77+
RobPartition() : mutex(std::mutex()), cur_latency(0) {};
78+
RobPartition(RobPartition&&){};
79+
};
80+
81+
// 使用智能指针容器
82+
std::vector<RobPartition> partitions_;
83+
84+
// 获取指令对应的分区
85+
size_t getPartitionIndex(const InstructionGroup& ins) {
86+
return ins.address ? (ins.address % NUM_PARTITIONS) : (ins.cycleCount % NUM_PARTITIONS);
87+
}
88+
bool processRetirement(InstructionGroup& ins, RobPartition& partition);
89+
90+
// 并行处理接口
91+
void processInstructions(const std::vector<InstructionGroup>& instructions);
92+
93+
// 性能统计
94+
int64_t getStallCount() const { return stallCount_.load(); }
95+
int64_t getCurrentCycle() const { return currentCycle_.load(); }
96+
double getAverageLatency();
97+
};
98+
5199

52100
#endif // CXLMEMSIM_ROB_H

src/cxlendpoint.cpp

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ CXLMemExpander::CXLMemExpander(int read_bw, int write_bw, int read_lat, int writ
2121
}
2222
// 修改CXLMemExpander的calculate_latency函数
2323
double CXLMemExpander::calculate_latency(const std::vector<std::tuple<uint64_t, uint64_t>> &elem, double dramlatency) {
24+
2425
if (elem.empty()) {
2526
return 0.0;
2627
}
@@ -113,6 +114,7 @@ int CXLMemExpander::insert(uint64_t timestamp, uint64_t tid, uint64_t phys_addr,
113114
if (index == this->id) {
114115
last_timestamp = last_timestamp > timestamp ? last_timestamp : timestamp; // Update the last timestamp
115116
// Check if the address is already in the map)
117+
116118
if (phys_addr != 0) {
117119
for (auto it = this->occupation.cbegin(); it != this->occupation.cend(); it++) {
118120
if (it->address == phys_addr) {
@@ -132,16 +134,24 @@ int CXLMemExpander::insert(uint64_t timestamp, uint64_t tid, uint64_t phys_addr,
132134
return 0;
133135
}
134136
std::vector<std::tuple<uint64_t, uint64_t>> CXLMemExpander::get_access(uint64_t timestamp) {
137+
// 原子操作更新计数器
135138
last_counter = CXLMemExpanderEvent(counter);
136-
// Iterate the map within the last 100ns
137-
auto res = occupation |
138-
std::views::filter([timestamp](const auto &it) { return it.timestamp > timestamp - 1000; }) |
139-
std::views::transform([](const auto &it) { return std::make_tuple(it.timestamp, it.address); }) |
140-
std::ranges::to<std::vector>();
141-
return res;
139+
140+
// 使用互斥锁保护对共享资源的访问
141+
142+
// 创建一个本地副本,减少持锁时间
143+
std::vector<std::tuple<uint64_t, uint64_t>> result;
144+
for (const auto &it : occupation) {
145+
// 如果 occupation 中的元素是指针,需要先检查指针有效性
146+
if (it.timestamp > timestamp - 1000000) {
147+
result.emplace_back(it.timestamp, it.address);
148+
}
149+
}
150+
return result;
142151
}
143152
void CXLMemExpander::set_epoch(int epoch) { this->epoch = epoch; }
144153
void CXLMemExpander::free_stats(double size) {
154+
145155
// 随机删除
146156
std::random_device rd;
147157
std::mt19937 gen(rd());

src/lbr.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -155,8 +155,8 @@ int LBR::read(CXLController *controller, LBRElem *elem) {
155155
if (this->pid == data->pid) {
156156
SPDLOG_ERROR("pid:{} tid:{} size:{} nr2:{} data-size:{} cpu:{} timestamp:{} hw_idx: lbrs:{} "
157157
"counters:{} {} {}",
158-
data->pid, data->tid, header->size, /*data->nr,*/ data->nr2, sizeof(*data),
159-
/*data->ips[0],*/ data->cpu, data->timestamp, /* data->hw_idx,*/ data->lbrs[0].from,
158+
data->pid, data->tid, header->size, data->nr2, sizeof(*data),
159+
data->cpu, data->timestamp, data->lbrs[0].from,
160160
data->counters[0].counters, data->counters[1].counters, data->counters[2].counters);
161161

162162
memcpy(&elem->branch_stack,

src/main.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -276,7 +276,7 @@ int main(int argc, char *argv[]) {
276276

277277
clflush = cha_vec[0];
278278
target_l2miss = cha_vec[2];
279-
uint64_t emul_delay = controller->latency_lat + controller->bandwidth_lat;
279+
uint64_t emul_delay = (controller->latency_lat + controller->bandwidth_lat) * 1000000;
280280

281281
SPDLOG_DEBUG("[{}:{}:{}] pebs: total={}, ", i, mon.tgid, mon.tid, mon.after->pebs.total);
282282

src/rob.cc

Lines changed: 88 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,17 @@
11
#include "rob.h"
22
#include "policy.h"
3+
#include <atomic>
34
#include <cxxopts.hpp>
45
#include <fstream>
6+
#include <future>
57
#include <iostream>
8+
#include <mutex>
9+
#include <queue>
610
#include <ranges>
711
#include <spdlog/cfg/env.h>
812
#include <sstream>
13+
#include <thread>
14+
#include <vector>
915

1016
Helper helper{};
1117
CXLController *controller;
@@ -123,6 +129,86 @@ InstructionGroup parseGroup(const std::vector<std::string> &group) {
123129
}
124130
return ig;
125131
}
132+
void parseInParallel(std::ifstream &file, std::vector<InstructionGroup> &instructions) {
133+
std::mutex queueMutex;
134+
std::condition_variable cv;
135+
std::queue<std::vector<std::string>> groupsQueue;
136+
std::atomic<bool> done{false};
137+
std::vector<std::string> groupLines;
138+
139+
// 创建解析线程池
140+
const int numThreads = std::thread::hardware_concurrency();
141+
std::vector<std::thread> parseThreads;
142+
std::mutex resultsMutex;
143+
144+
// 消费者线程函数
145+
auto parseWorker = [&]() {
146+
while (true) {
147+
std::vector<std::string> group;
148+
{
149+
std::unique_lock<std::mutex> lock(queueMutex);
150+
cv.wait(lock, [&]() { return !groupsQueue.empty() || done; });
151+
if (groupsQueue.empty() && done)
152+
break;
153+
group = std::move(groupsQueue.front());
154+
groupsQueue.pop();
155+
}
156+
157+
auto result = parseGroup(group);
158+
if (result.retireTimestamp != 0) {
159+
std::lock_guard<std::mutex> lock(resultsMutex);
160+
instructions.emplace_back(std::move(result));
161+
}
162+
}
163+
};
164+
165+
// 启动消费者线程
166+
for (int i = 0; i < numThreads; ++i) {
167+
parseThreads.emplace_back(parseWorker);
168+
}
169+
170+
// 生产者部分 - 主线程
171+
for (const std::string &line : std::ranges::istream_view<std::string>(file)) {
172+
if (line.rfind("O3PipeView:fetch:", 0) == 0) {
173+
if (!groupLines.empty()) {
174+
{
175+
std::lock_guard<std::mutex> lock(queueMutex);
176+
groupsQueue.push(groupLines);
177+
}
178+
cv.notify_one();
179+
groupLines.clear();
180+
}
181+
}
182+
if (!line.empty()) {
183+
groupLines.push_back(line);
184+
}
185+
}
186+
187+
// 处理最后一组
188+
if (!groupLines.empty()) {
189+
{
190+
std::lock_guard<std::mutex> lock(queueMutex);
191+
groupsQueue.push(std::move(groupLines));
192+
}
193+
cv.notify_one();
194+
}
195+
196+
// 通知所有消费者线程完成
197+
{
198+
std::lock_guard<std::mutex> lock(queueMutex);
199+
done = true;
200+
}
201+
cv.notify_all();
202+
203+
// 等待所有线程完成
204+
for (auto &thread : parseThreads) {
205+
thread.join();
206+
}
207+
208+
// 排序结果
209+
std::sort(instructions.begin(), instructions.end(),
210+
[](InstructionGroup &a, InstructionGroup &b) { return a.cycleCount < b.cycleCount; });
211+
}
126212

127213
int main(int argc, char *argv[]) {
128214
spdlog::cfg::load_env_levels();
@@ -194,38 +280,9 @@ int main(int argc, char *argv[]) {
194280
return 1;
195281
}
196282

197-
std::vector<std::string> groupLines;
283+
// std::vector<std::string> groupLines;
198284
std::vector<InstructionGroup> instructions;
199-
200-
// Read the file line by line using std::ranges::istream_view.
201-
for (const std::string &line : std::ranges::istream_view<std::string>(file)) {
202-
// If the line starts with "O3PipeView:fetch:" then it's the start of a new group.
203-
if (line.rfind("O3PipeView:fetch:", 0) == 0) {
204-
// If we have an existing group, process it.
205-
if (!groupLines.empty()) {
206-
instructions.emplace_back(parseGroup(groupLines));
207-
if (instructions.back().retireTimestamp == 0) {
208-
// auto& back = instructions.back();
209-
// std::cout << "throwing out: " << back.address << back.cycleCount << "[]" << back.retireTimestamp
210-
// << std::endl;
211-
instructions.pop_back();
212-
}
213-
// Clear the group for the next one.
214-
groupLines.clear();
215-
}
216-
}
217-
// Add the current line to the group if it’s not empty.
218-
if (!line.empty()) {
219-
groupLines.push_back(line);
220-
}
221-
}
222-
// Process any remaining group.
223-
if (!groupLines.empty()) {
224-
instructions.emplace_back(parseGroup(groupLines));
225-
}
226-
227-
std::sort(instructions.begin(), instructions.end(),
228-
[](InstructionGroup &a, InstructionGroup &b) { return a.cycleCount < b.cycleCount; });
285+
parseInParallel(file, instructions);
229286
// Now simulate issuing them into the ROB
230287
for (const auto &instruction : instructions) {
231288
bool issued = false;

0 commit comments

Comments
 (0)