Skip to content

Commit 3f4ffe0

Browse files
committed
test/allocsim: osd op scraper replayer
For now this is a simple single-threaded replayer with an I/O depth of 64: we load all ops from a file and submit each one, in order, as soon as the pipeline has room. Signed-off-by: Pere Diaz Bou <[email protected]>
1 parent c1b69fb commit 3f4ffe0

File tree

1 file changed

+230
-0
lines changed

1 file changed

+230
-0
lines changed
Lines changed: 230 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,230 @@
1+
#include <algorithm>
#include <atomic>
#include <cassert>
#include <chrono>
#include <climits>
#include <condition_variable>
#include <cstdint>
#include <cstdlib>
#include <ctime>
#include <filesystem>
#include <fstream>
#include <iostream>
#include <limits>
#include <map>
#include <memory>
#include <mutex>
#include <random>
#include <sstream>
#include <string>
#include <thread>
#include <vector>

#include <fmt/format.h>
#include <rados/buffer_fwd.h>
#include <rados/librados.hpp>
20+
21+
22+
using namespace std;
23+
using namespace ceph;
24+
25+
26+
// Deduplication cache: each distinct who/object/collection string from the
// trace is stored once and shared (via shared_ptr) between all ops.
static map<string, shared_ptr<string>> string_cache;
// Number of AIO operations submitted but not yet completed.
static std::atomic<uint64_t> in_flight_ops(0);
// Signaled by completion_cb whenever an op finishes, so the submitter can
// keep the pipeline filled up to io_depth.
static std::condition_variable cv;
static std::mutex in_flight_mutex;  // pairs with cv for the wait/notify handshake
30+
31+
// Kind of replayed operation, parsed from the "write"/"read" column of the
// input trace (anything other than "write" is treated as Read).
enum op_type {
  Write,
  Read
};
35+
36+
// One parsed trace line, plus the asynchronous state (completion handle,
// read destination buffer) used while the op is in flight.
struct Op {
  time_t at;        // wall-clock timestamp of the original op (from the trace)
  op_type type;     // Write or Read
  uint64_t offset;  // byte offset within the object
  uint64_t length;  // byte length of the I/O
  // Shared, deduplicated strings (see string_cache) so repeated names cost
  // one allocation total instead of one per op.
  shared_ptr<string> object;      // object name
  shared_ptr<string> collection;  // collection/PG identifier (unused during replay)
  shared_ptr<string> who;         // client that issued the op (unused during replay)
  // Owned while the op is in flight; allocated by main() right before submit
  // and freed by completion_cb. nullptr when idle.
  librados::AioCompletion *completion;
  bufferlist read_bl;  // destination buffer for reads; cleared on completion

  Op(
    time_t at,
    op_type type,
    uint64_t offset,
    uint64_t length,
    shared_ptr<string> object,
    shared_ptr<string> collection,
    shared_ptr<string> who
  ) : at(at), type(type), offset(offset), length(length), object(object), collection(collection), who(who), completion(nullptr) {}
};
58+
59+
// Fill bl with `size` pseudo-random bytes (deterministic: the engine is
// default-seeded). The buffer is copied into the bufferlist, so the local
// scratch allocation is released on return.
//
// BUGFIX: the previous version instantiated independent_bits_engine with an
// `unsigned char` result type, which is not a valid UIntType per the standard
// (only unsigned short/int/long/long long are allowed) and is therefore UB;
// it also relied on CHAR_BIT without including <climits>. Generate 8 random
// bits per call from a conforming engine instead and narrow to char.
void gen_buffer(bufferlist& bl, uint64_t size) {
  std::unique_ptr<char[]> buffer = std::make_unique<char[]>(size);
  std::independent_bits_engine<std::default_random_engine, 8, unsigned short> e;
  std::generate(buffer.get(), buffer.get() + size,
                [&e] { return static_cast<char>(e()); });
  bl.append(buffer.get(), size);
}
65+
66+
// librados AIO callback: logs the finished op, frees its completion and read
// buffer, then wakes the submitter that may be blocked on the io_depth limit.
// `arg` is the Op* registered in aio_create_completion; the Op itself lives
// in main()'s ops vector and is not freed here.
void completion_cb(librados::completion_t cb, void *arg) {
  Op *op = static_cast<Op*>(arg);
  // Process the completed operation here
  std::cout << fmt::format("Completed op {} object={} range={}~{}", op->type, *op->object, op->offset, op->length) << std::endl;

  // NOTE(review): this deletes the AioCompletion from inside its own
  // callback; librados documents release() as the disposal path — confirm
  // deleting here is safe.
  delete op->completion;
  op->completion = nullptr;
  if (op->type == Read) {
    // Replay only; the data read is discarded, freeing the buffer early.
    op->read_bl.clear();
  }

  {
    // Decrement under the mutex so the submitter's cv predicate cannot
    // observe a stale count between its check and its wait.
    std::lock_guard<std::mutex> lock(in_flight_mutex);
    in_flight_ops--;
  }
  cv.notify_one();
}
83+
84+
int main(int argc, char** argv) {
85+
vector<Op> ops;
86+
librados::Rados cluster;
87+
librados::IoCtx io;
88+
uint64_t max_buffer_size = 0;
89+
uint64_t io_depth = 64;
90+
string file;
91+
std::filesystem::path ceph_conf_path;
92+
93+
if (argc < 3) {
94+
cout << fmt::format("usage: ops_replayer file ceph.conf") << endl;
95+
}
96+
file = argv[1];
97+
ceph_conf_path = argv[2];
98+
cout << file << endl;
99+
100+
101+
102+
string date, time, who, type, range, object, collection;
103+
ifstream fstream(file, ifstream::in);
104+
const char* date_format_first_column = "%Y-%m-%d";
105+
// we expect this input:
106+
// 2024-05-10 12:06:24.990831+00:00 client.607247697.0:5632274 write 4096~4096 2:d03a455a:::08b0f2fd5f20f504e76c2dd3d24683a1:head 2.1c0b
107+
while (fstream >> date){
108+
cout << date << endl;
109+
tm t;
110+
char* res = strptime(date.c_str(), date_format_first_column, &t);
111+
if (res == nullptr) {
112+
fstream.ignore(std::numeric_limits<std::streamsize>::max(), '\n');
113+
continue;
114+
}
115+
fstream >> time >> who >> type >> range >> object >> collection;
116+
117+
date += " " + time;
118+
cout << date << endl;
119+
// FIXME: this is wrong but it returns a reasonable bad timestamp :P
120+
const char* date_format_full = "%Y-%m-%d %H:%M:%S.%f%z";
121+
res = strptime(date.c_str(), date_format_full, &t);
122+
time_t at = mktime(&t);
123+
124+
cout << fmt::format("{} {} {} {} {} {} {}", date, at, who, type, range, object, collection) << endl;
125+
126+
shared_ptr<string> who_ptr = make_shared<string>(who);
127+
auto who_it = string_cache.find(who);
128+
if (who_it == string_cache.end()) {
129+
string_cache.insert({ who, who_ptr });
130+
} else {
131+
who_ptr = who_it->second;
132+
}
133+
134+
shared_ptr<string> object_ptr = make_shared<string>(object);
135+
auto object_it = string_cache.find(object);
136+
if (object_it == string_cache.end()) {
137+
string_cache.insert({ object, object_ptr });
138+
} else {
139+
object_ptr = object_it->second;
140+
}
141+
142+
shared_ptr<string> collection_ptr = make_shared<string>(collection);
143+
auto collection_it = string_cache.find(collection);
144+
if (collection_it == string_cache.end()) {
145+
string_cache.insert({ collection, collection_ptr });
146+
} else {
147+
collection_ptr = collection_it->second;
148+
}
149+
150+
uint64_t offset = 0, length = 0;
151+
stringstream range_stream(range);
152+
string offset_str, length_str;
153+
getline(range_stream, offset_str, '~');
154+
getline(range_stream, length_str, '~');
155+
offset = stoll(offset_str);
156+
length = stoll(length_str);
157+
158+
max_buffer_size = max(length, max_buffer_size);
159+
160+
op_type ot = type == "write" ? Write : Read;
161+
ops.push_back(Op(at, ot, offset, length, object_ptr, collection_ptr, who_ptr));
162+
}
163+
164+
int ret = cluster.init2("client.admin", "ceph", 0);
165+
if (ret < 0) {
166+
std::cerr << "Couldn't init ceph! error " << ret << std::endl;
167+
return EXIT_FAILURE;
168+
}
169+
std::cout << "cluster init ready" << std::endl;
170+
171+
ret = cluster.conf_read_file(ceph_conf_path.c_str());
172+
if (ret < 0) {
173+
std::cerr << "Couldn't read the Ceph configuration file! error " << ret << std::endl;
174+
return EXIT_FAILURE;
175+
}
176+
std::cout << "cluster config ready" << std::endl;
177+
ret = cluster.connect();
178+
if (ret < 0) {
179+
std::cerr << "Couldn't connect to cluster! error " << ret << std::endl;
180+
return EXIT_FAILURE;
181+
}
182+
std::cout << "cluster connect ready" << std::endl;
183+
cluster.ioctx_create("test_pool", io);
184+
if (ret < 0) {
185+
std::cerr << "Couldn't set up ioctx! error " << ret << std::endl;
186+
exit(EXIT_FAILURE);
187+
}
188+
std::cout << "test-pool ready" << std::endl;
189+
190+
191+
// process ops
192+
// Create a buffer big enough for every operation. We will take enoguh bytes from it for every operation
193+
bufferlist bl;
194+
gen_buffer(bl, max_buffer_size);
195+
196+
for (auto &op : ops) {
197+
{
198+
std::unique_lock<std::mutex> lock(in_flight_mutex);
199+
cv.wait(lock, [&io_depth] { return in_flight_ops < io_depth; });
200+
201+
}
202+
cout << fmt::format("Running op {} object={} range={}~{}", op.type, *op.object, op.offset, op.length) << endl;
203+
op.completion = librados::Rados::aio_create_completion(static_cast<void*>(&op), completion_cb);
204+
switch (op.type) {
205+
case Write: {
206+
int ret = io.aio_write(*op.object, op.completion, bl, op.length, op.offset);
207+
if (ret != 0) {
208+
cout << fmt::format("Error writing ecode={}", ret) << endl;;
209+
}
210+
break;
211+
}
212+
case Read: {
213+
bufferlist read;
214+
int ret = io.aio_read(*op.object, op.completion, &op.read_bl, op.length, op.offset);
215+
if (ret != 0) {
216+
cout << fmt::format("Error reading ecode={}", ret) << endl;;
217+
}
218+
break;
219+
}
220+
}
221+
in_flight_ops++;
222+
}
223+
while (in_flight_ops > 0) {
224+
std::this_thread::sleep_for(std::chrono::milliseconds(100));
225+
}
226+
// io.write(const std::string &oid, bufferlist &bl, size_t len, uint64_t off)
227+
228+
cout << ops.size() << endl;
229+
return 0;
230+
}

0 commit comments

Comments
 (0)