
Commit d95de4f

Merge pull request ceph#58395 from pereman2/scrapper-replayer
test/allocsim: osd op scraper replayer
Reviewed-by: Adam Kupczyk <[email protected]>
2 parents 96c7a23 + 7719091 commit d95de4f

File tree

1 file changed: 369 additions & 0 deletions

@@ -0,0 +1,369 @@
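// ops_replayer: replay OSD client operations scraped from cluster logs against
// a RADOS pool. The scraped log is mmap'd, parsed in parallel into Op records,
// and the ops are then re-issued asynchronously through librados.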
#include <algorithm>
#include <cassert>
#include <fcntl.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <thread>
#include <condition_variable>
#include <cstdint>
#include <ctime>
#include <filesystem>
#include <mutex>
#include <rados/buffer_fwd.h>
#include <rados/librados.hpp>
#include <atomic>
#include <fmt/format.h>
#include <map>
#include <memory>
#include <random>
#include <string>
#include <iostream>
#include <vector>
// Headers for symbols used below (CHAR_BIT, numeric_limits, stringstream, chrono).
#include <climits>
#include <limits>
#include <sstream>
#include <chrono>


using namespace std;
using namespace ceph;


// Shared state: a global cache of deduplicated client/object/collection strings,
// plus the in-flight op counter and the condition variable/mutex that bound the
// replay queue depth.
static map<string, shared_ptr<string>> string_cache;
static std::atomic<uint64_t> in_flight_ops(0);
static std::condition_variable cv;
static std::mutex in_flight_mutex;

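// Operation types recognized in the scraped log ("sparse-read" is replayed as Read).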
enum op_type {
  Write,
  WriteFull,
  Read,
  Truncate,
  Zero
};

// One parsed log entry: when it was issued, the op kind, its offset/length, and
// shared pointers into the string cache for the object, collection, and issuing
// client. `completion` is filled in at replay time; `read_bl` receives read data.
struct Op {
  time_t at;
  op_type type;
  uint64_t offset;
  uint64_t length;
  shared_ptr<string> object;
  shared_ptr<string> collection;
  shared_ptr<string> who;
  librados::AioCompletion *completion;
  bufferlist read_bl;

  Op(
    time_t at,
    op_type type,
    uint64_t offset,
    uint64_t length,
    shared_ptr<string> object,
    shared_ptr<string> collection,
    shared_ptr<string> who
  ) : at(at), type(type), offset(offset), length(length), object(object), collection(collection), who(who), completion(nullptr) {}
};

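// Per-parser-thread context: each thread parses its own [start, end) slice of the
// mmap'd log into a private op list and string cache, which main() merges afterwards.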
struct ParserContext {
  map<string, shared_ptr<string>> string_cache;
  vector<Op> ops;
  char *start; // the slice starts and ends on a newline (or at EOF)
  char *end;
  uint64_t max_buffer_size;
};

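// Thin streambuf/istream wrappers that expose a raw memory range (one slice of the
// mmap'd log) to operator>> without copying it.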
class MemoryStreamBuf : public std::streambuf {
public:
  MemoryStreamBuf(const char* start, const char* end) {
    this->setg(const_cast<char*>(start), const_cast<char*>(start), const_cast<char*>(end));
  }
};

class MemoryInputStream : public std::istream {
  MemoryStreamBuf _buffer;
public:
  MemoryInputStream(const char* start, const char* end)
    : std::istream(&_buffer), _buffer(start, end) {
    rdbuf(&_buffer);
  }
};

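// Fill a bufferlist with `size` random bytes; each worker slices its write payloads
// out of one such buffer, sized to the largest length seen while parsing.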
void gen_buffer(bufferlist& bl, uint64_t size) {
  std::unique_ptr<char[]> buffer = std::make_unique<char[]>(size);
  std::independent_bits_engine<std::default_random_engine, CHAR_BIT, unsigned char> e;
  std::generate(buffer.get(), buffer.get() + size, std::ref(e));
  bl.append(buffer.get(), size);
}

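// AIO completion callback: free the completion, drop any read payload, decrement the
// in-flight counter, and wake a worker waiting to submit its next op.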
void completion_cb(librados::completion_t cb, void *arg) {
  Op *op = static_cast<Op*>(arg);
  // Process the completed operation here
  // std::cout << fmt::format("Completed op {} object={} range={}~{}", op->type, *op->object, op->offset, op->length) << std::endl;

  delete op->completion;
  op->completion = nullptr;
  if (op->type == Read) {
    op->read_bl.clear();
  }

  {
    std::lock_guard<std::mutex> lock(in_flight_mutex);
    in_flight_ops--;
  }
  cv.notify_one();
}

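// Parse one slice of the scraped log into Op records. Lines that do not start with a
// date are skipped; client, object and collection strings are deduplicated through
// the per-thread string cache.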
void parse_entry_point(shared_ptr<ParserContext> context) {
  string date, time, who, type, range, object, collection;
  MemoryInputStream fstream(context->start, context->end);
  const char* date_format_first_column = "%Y-%m-%d";
  // we expect this input:
  // 2024-05-10 12:06:24.990831+00:00 client.607247697.0:5632274 write 4096~4096 2:d03a455a:::08b0f2fd5f20f504e76c2dd3d24683a1:head 2.1c0b
  while (fstream >> date) {
    // cout << date << endl;
    tm t;
    char* res = strptime(date.c_str(), date_format_first_column, &t);
    if (res == nullptr) {
      // not an op line: skip to the next line
      fstream.ignore(std::numeric_limits<std::streamsize>::max(), '\n');
      continue;
    }
    fstream >> time >> who >> type >> range >> object >> collection;

    date += " " + time;
    // cout << date << endl;
    // FIXME: this is wrong but it returns a reasonable bad timestamp :P
    const char* date_format_full = "%Y-%m-%d %H:%M:%S.%f%z";
    res = strptime(date.c_str(), date_format_full, &t);
    time_t at = mktime(&t);

    // cout << fmt::format("{} {} {} {} {} {} {}", date, at, who, type, range, object, collection) << endl;

    // Deduplicate who/object/collection through the per-thread cache; main()
    // merges the per-thread caches into the global one after the parsers join.
    shared_ptr<string> who_ptr = make_shared<string>(who);
    auto who_it = context->string_cache.find(who);
    if (who_it == context->string_cache.end()) {
      context->string_cache.insert({ who, who_ptr });
    } else {
      who_ptr = who_it->second;
    }

    shared_ptr<string> object_ptr = make_shared<string>(object);
    auto object_it = context->string_cache.find(object);
    if (object_it == context->string_cache.end()) {
      context->string_cache.insert({ object, object_ptr });
    } else {
      object_ptr = object_it->second;
    }

    op_type ot;
    if (type == "write") {
      ot = Write;
    } else if (type == "writefull") {
      ot = WriteFull;
    } else if (type == "read") {
      ot = Read;
    } else if (type == "sparse-read") {
      ot = Read;
    } else if (type == "truncate") {
      ot = Truncate;
    } else if (type == "zero") {
      ot = Zero;
    } else {
      cout << "invalid type " << type << endl;
      exit(1);
    }

    shared_ptr<string> collection_ptr = make_shared<string>(collection);
    auto collection_it = context->string_cache.find(collection);
    if (collection_it == context->string_cache.end()) {
      context->string_cache.insert({ collection, collection_ptr });
    } else {
      collection_ptr = collection_it->second;
    }

    uint64_t offset = 0, length = 0;
    stringstream range_stream(range);
    string offset_str, length_str;
    getline(range_stream, offset_str, '~');
    offset = stoll(offset_str);

    if (ot != Truncate) {
      // Truncate has only one number (the new size); everything else is offset~length.
      getline(range_stream, length_str, '~');
      length = stoll(length_str);
    }

    context->max_buffer_size = max(length, context->max_buffer_size);

    context->ops.push_back(Op(at, ot, offset, length, object_ptr, collection_ptr, who_ptr));
  }
}

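// Worker thread: replays the ops whose client ("who") hashes to this worker's id,
// using a shared random buffer for write payloads and throttled by the global
// in-flight counter so that roughly io_depth operations are outstanding at a time.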
void worker_thread_entry(uint64_t id, uint64_t nworker_threads, vector<Op> &ops, uint64_t max_buffer_size, uint64_t io_depth, librados::IoCtx* io) {
  // One random payload buffer per worker, big enough for the largest op.
  bufferlist bl;
  gen_buffer(bl, max_buffer_size);
  hash<string> hasher;

  cout << "starting thread " << id << endl;
  for (auto &op : ops) {
    {
      // Throttle: wait until fewer than io_depth ops are in flight.
      std::unique_lock<std::mutex> lock(in_flight_mutex);
      cv.wait(lock, [&io_depth] { return in_flight_ops < io_depth; });
    }
    // Ops are sharded across workers by hashing the issuing client.
    size_t key = hasher(*op.who) % nworker_threads;
    if (key != id) {
      continue;
    }
    // cout << fmt::format("Running op {} object={} range={}~{}", op.type, *op.object, op.offset, op.length) << endl;
    op.completion = librados::Rados::aio_create_completion(static_cast<void*>(&op), completion_cb);
    switch (op.type) {
      case Write: {
        bufferlist trimmed;
        trimmed.substr_of(bl, 0, op.length);
        int ret = io->aio_write(*op.object, op.completion, trimmed, op.length, op.offset);
        if (ret != 0) {
          cout << fmt::format("Error writing ecode={}", ret) << endl;
        }
        break;
      }
      case WriteFull: {
        bufferlist trimmed;
        trimmed.substr_of(bl, 0, op.length);
        int ret = io->aio_write_full(*op.object, op.completion, trimmed);
        if (ret != 0) {
          cout << fmt::format("Error writing full ecode={}", ret) << endl;
        }
        break;
      }
      case Read: {
        int ret = io->aio_read(*op.object, op.completion, &op.read_bl, op.length, op.offset);
        if (ret != 0) {
          cout << fmt::format("Error reading ecode={}", ret) << endl;
        }
        break;
      }
      case Truncate: {
        librados::ObjectWriteOperation write_operation;
        write_operation.truncate(op.offset);
        int ret = io->aio_operate(*op.object, op.completion, &write_operation);
        if (ret != 0) {
          cout << fmt::format("Error truncating ecode={}", ret) << endl;
        }
        break;
      }
      case Zero: {
        librados::ObjectWriteOperation write_operation;
        write_operation.zero(op.offset, op.length);
        int ret = io->aio_operate(*op.object, op.completion, &write_operation);
        if (ret != 0) {
          cout << fmt::format("Error zeroing ecode={}", ret) << endl;
        }
        break;
      }
    }
    in_flight_ops++;
  }
}

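// main: mmap the scraped log, split it at newline boundaries across 16 parser threads,
// merge the parsed ops and string caches, connect to the cluster, and replay the ops
// with 16 worker threads.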
int main(int argc, char** argv) {
  vector<Op> ops;
  librados::Rados cluster;
  librados::IoCtx io;
  uint64_t max_buffer_size = 0;
  uint64_t io_depth = 8;
  string file;
  std::filesystem::path ceph_conf_path;

  if (argc < 3) {
    cout << fmt::format("usage: ops_replayer file ceph.conf") << endl;
    return EXIT_FAILURE;
  }
  file = argv[1];
  ceph_conf_path = argv[2];
  cout << file << endl;

  // mmap the scraped log and hand each parser thread a newline-aligned slice.
  uint64_t nthreads = 16;
  vector<std::thread> parser_threads;
  vector<shared_ptr<ParserContext>> parser_contexts;
  int fd = open(file.c_str(), O_RDONLY);
  if (fd == -1) {
    cout << "Error opening file" << endl;
    return EXIT_FAILURE;
  }
  struct stat file_stat;
  fstat(fd, &file_stat);
  char* mapped_buffer = (char*)mmap(NULL, file_stat.st_size, PROT_READ, MAP_SHARED, fd, 0);
  if (mapped_buffer == MAP_FAILED) {
    cout << "error mapping buffer" << endl;
    return EXIT_FAILURE;
  }
  uint64_t start_offset = 0;
  uint64_t step_size = file_stat.st_size / nthreads;
  for (int i = 0; i < nthreads; i++) {
    // Walk back to the previous newline so no line is split across slices.
    char* end = mapped_buffer + start_offset + step_size;
    while (*end != '\n') {
      end--;
    }
    if (i == nthreads-1) {
      end = mapped_buffer + file_stat.st_size;
    }
    shared_ptr<ParserContext> context = make_shared<ParserContext>();
    context->start = mapped_buffer + start_offset;
    context->end = end;
    context->max_buffer_size = 0;
    parser_contexts.push_back(context);
    parser_threads.push_back(std::thread(parse_entry_point, context));
    start_offset += (end - mapped_buffer - start_offset);
  }
  for (auto& t : parser_threads) {
    t.join();
  }
  // Merge the per-thread results: string caches, op lists, and the largest op length.
  for (auto context : parser_contexts) {
    string_cache.insert(context->string_cache.begin(), context->string_cache.end());
    ops.insert(ops.end(), context->ops.begin(), context->ops.end());
    max_buffer_size = max(context->max_buffer_size, max_buffer_size);
    context->string_cache.clear();
    context->ops.clear();
  }

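  // Connect to the cluster described by the given ceph.conf and open an IoCtx on "test_pool".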
  int ret = cluster.init2("client.admin", "ceph", 0);
  if (ret < 0) {
    std::cerr << "Couldn't init ceph! error " << ret << std::endl;
    return EXIT_FAILURE;
  }
  std::cout << "cluster init ready" << std::endl;

  ret = cluster.conf_read_file(ceph_conf_path.c_str());
  if (ret < 0) {
    std::cerr << "Couldn't read the Ceph configuration file! error " << ret << std::endl;
    return EXIT_FAILURE;
  }
  std::cout << "cluster config ready" << std::endl;
  ret = cluster.connect();
  if (ret < 0) {
    std::cerr << "Couldn't connect to cluster! error " << ret << std::endl;
    return EXIT_FAILURE;
  }
  std::cout << "cluster connect ready" << std::endl;
  ret = cluster.ioctx_create("test_pool", io);
  if (ret < 0) {
    std::cerr << "Couldn't set up ioctx! error " << ret << std::endl;
    exit(EXIT_FAILURE);
  }
  std::cout << "test-pool ready" << std::endl;

  // process ops
  // Each worker creates a buffer big enough for every operation and takes enough
  // bytes from it for each write.
  vector<thread> worker_threads;
  uint64_t nworker_threads = 16;
  for (int i = 0; i < nworker_threads; i++) {
    worker_threads.push_back(thread(worker_thread_entry, i, nworker_threads, std::ref(ops), max_buffer_size, io_depth, &io));
  }
  for (auto& worker : worker_threads) {
    worker.join();
  }
  // Drain any completions still outstanding after the workers finish.
  while (in_flight_ops > 0) {
    std::this_thread::sleep_for(std::chrono::milliseconds(100));
  }
  // io.write(const std::string &oid, bufferlist &bl, size_t len, uint64_t off)

  cout << ops.size() << endl;
  return 0;
}
