Skip to content

Commit b02da3d

Browse files
authored
Merge pull request ceph#58748 from pereman2/replayer-merge-files
test/allocsim: Replayer merge files
2 parents 9f9be26 + 627c466 commit b02da3d

File tree

1 file changed

+149
-73
lines changed

1 file changed

+149
-73
lines changed

src/test/objectstore/allocsim/ops_replayer.cc

Lines changed: 149 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
#include <algorithm>
2+
#include <boost/program_options/value_semantic.hpp>
23
#include <cassert>
4+
#include <cctype>
35
#include <cstdlib>
46
#include <fcntl.h>
57
#include <sys/mman.h>
@@ -8,6 +10,7 @@
810
#include <condition_variable>
911
#include <cstdint>
1012
#include <ctime>
13+
#include <fstream>
1114
#include <filesystem>
1215
#include <mutex>
1316
#include "include/rados/buffer_fwd.h"
@@ -30,8 +33,20 @@ namespace po = boost::program_options;
3033
using namespace std;
3134
using namespace ceph;
3235

36+
// compare shared_ptr<string>
37+
struct StringPtrCompare
38+
{
39+
int operator()(const shared_ptr<string>& lhs, const shared_ptr<string>& rhs) const {
40+
if (lhs && rhs) {
41+
// Compare the content of the strings
42+
return *lhs < *rhs;
43+
}
44+
return lhs < rhs;
45+
}
46+
};
3347

34-
static map<string, shared_ptr<string>> string_cache;
48+
49+
static set<shared_ptr<string>, StringPtrCompare> string_cache;
3550
static std::atomic<uint64_t> in_flight_ops(0);
3651
static std::condition_variable cv;
3752
static std::mutex in_flight_mutex;
@@ -68,7 +83,9 @@ struct Op {
6883
};
6984

7085
struct ParserContext {
71-
map<string, shared_ptr<string>> string_cache;
86+
set<shared_ptr<string>, StringPtrCompare> collection_cache;
87+
set<shared_ptr<string>, StringPtrCompare> object_cache;
88+
set<shared_ptr<string>, StringPtrCompare> who_cache;
7289
vector<Op> ops;
7390
char *start; // starts and ends in new line or eof
7491
char *end;
@@ -116,73 +133,106 @@ void completion_cb(librados::completion_t cb, void *arg) {
116133
cv.notify_one();
117134
}
118135

136+
137+
uint64_t timestamp_parser(std::string& date) {
138+
uint64_t timestamp = 0;
139+
uint64_t year, month, day, hour, minute, second;
140+
// expeted format
141+
// 2024-05-10 12:06:24.792232+00:00
142+
// 0123456789012345678------------
143+
year = std::stoull(date.substr(0, 4));
144+
month = std::stoull(date.substr(5, 2));
145+
day = std::stoull(date.substr(8, 2));
146+
hour = std::stoull(date.substr(11, 2));
147+
minute = std::stoull(date.substr(14, 2));
148+
second = std::stoull(date.substr(17, 2));
149+
// SECONDS SINCE JAN 01 1970. (UTC), we don't care about timestamp timezone accuracy
150+
timestamp += (year - 1970) * 365 * 24 * 60 * 60;
151+
timestamp += (month * 30 * 24 * 60 * 60); // Yes, 30 day month is the best format ever and you cannot complain
152+
timestamp += (day * 24 * 60 * 60);
153+
timestamp += (hour * 60 * 60);
154+
timestamp += (minute * 60);
155+
timestamp += second;
156+
return timestamp;
157+
}
158+
119159
void parse_entry_point(shared_ptr<ParserContext> context) {
120160
cout << fmt::format("Starting parser thread start={:p} end={:p}", context->start, context->end) << endl;
121161

122162
string date, time, who, type, range, object, collection;
123163
MemoryInputStream fstream(context->start, context->end);
124-
const char* date_format_first_column = "%Y-%m-%d";
125164
// we expect this input:
126165
// 2024-05-10 12:06:24.990831+00:00 client.607247697.0:5632274 write 4096~4096 2:d03a455a:::08b0f2fd5f20f504e76c2dd3d24683a1:head 2.1c0b
127166
while (fstream >> date){
128167
// cout << date << endl;
129-
tm t;
130-
char* res = strptime(date.c_str(), date_format_first_column, &t);
131-
if (res == nullptr) {
168+
if (!(date.size() > 4 && isdigit(date[0]) && isdigit(date[1]) && isdigit(date[2]) && isdigit(date[3]) && date[4] == '-')) {
132169
fstream.ignore(std::numeric_limits<std::streamsize>::max(), '\n');
133170
continue;
171+
134172
}
135173
fstream >> time >> who >> type >> range >> object >> collection;
136174

137175
date += " " + time;
138176
// cout << date << endl;
139177
// FIXME: this is wrong but it returns a reasonable bad timestamp :P
140-
const char* date_format_full = "%Y-%m-%d %H:%M:%S.%f%z";
141-
res = strptime(date.c_str(), date_format_full, &t);
142-
time_t at = mktime(&t);
178+
time_t at = timestamp_parser(date);
143179

144180
// cout << fmt::format("{} {} {} {} {} {} {}", date, at, who, type, range, object, collection) << endl;
145181

146182
shared_ptr<string> who_ptr = make_shared<string>(who);
147-
auto who_it = string_cache.find(who);
148-
if (who_it == string_cache.end()) {
149-
string_cache.insert({ who, who_ptr });
183+
auto who_it = context->who_cache.find(who_ptr);
184+
if (who_it == context->who_cache.end()) {
185+
context->who_cache.insert(who_ptr);
150186
} else {
151-
who_ptr = who_it->second;
187+
who_ptr = *who_it;
152188
}
153189

154190
shared_ptr<string> object_ptr = make_shared<string>(object);
155-
auto object_it = string_cache.find(object);
156-
if (object_it == string_cache.end()) {
157-
string_cache.insert({ object, object_ptr });
191+
auto object_it = context->object_cache.find(object_ptr);
192+
if (object_it == context->object_cache.end()) {
193+
context->object_cache.insert(object_ptr);
158194
} else {
159-
object_ptr = object_it->second;
195+
object_ptr = *object_it;
160196
}
161197

162198
op_type ot;
163-
if (type == "write") {
164-
ot = Write;
165-
} else if (type == "writefull") {
166-
ot = WriteFull;
167-
} else if (type == "read") {
199+
switch (type[0]) {
200+
case 'r': {
168201
ot = Read;
169-
} else if (type == "sparse-read") {
202+
break;
203+
}
204+
case 's': {
170205
ot = Read;
171-
} else if (type == "truncate") {
172-
ot = Truncate;
173-
} else if (type == "zero") {
206+
break;
207+
}
208+
case 'z': {
174209
ot = Zero;
175-
} else {
210+
break;
211+
}
212+
case 't': {
213+
ot = Truncate;
214+
break;
215+
}
216+
case 'w': {
217+
if (type.size() > 6) {
218+
ot = WriteFull;
219+
} else {
220+
ot = Write;
221+
}
222+
break;
223+
}
224+
default: {
176225
cout << "invalid type " << type << endl;
177226
exit(1);
227+
}
178228
}
179229

180230
shared_ptr<string> collection_ptr = make_shared<string>(collection);
181-
auto collection_it = string_cache.find(collection);
182-
if (collection_it == string_cache.end()) {
183-
string_cache.insert({ collection, collection_ptr });
231+
auto collection_it = context->collection_cache.find(collection_ptr);
232+
if (collection_it == context->collection_cache.end()) {
233+
context->collection_cache.insert(collection_ptr);
184234
} else {
185-
collection_ptr = collection_it->second;
235+
collection_ptr = *collection_it;
186236
}
187237

188238
uint64_t offset = 0, length = 0;
@@ -271,6 +321,11 @@ void worker_thread_entry(uint64_t id, uint64_t nworker_threads, vector<Op> &ops,
271321
}
272322
}
273323

324+
325+
int op_comparison_by_date(const Op& lhs, const Op& rhs) {
326+
return lhs.at < rhs.at;
327+
}
328+
274329
void usage(po::options_description &desc) {
275330
cout << desc << std::endl;
276331
}
@@ -288,16 +343,18 @@ int main(int argc, char** argv) {
288343
string file("input.txt");
289344
string ceph_conf_path("./ceph.conf");
290345
string pool("test_pool");
346+
bool skip_do_ops = false;
291347

292348
po::options_description po_options("Options");
293349
po_options.add_options()
294350
("help,h", "produce help message")
295-
(",i", po::value<string>(&file)->default_value("input.txt"), "Input file (output of op_scraper.py)")
351+
("input-files,i", po::value<vector<string>>()->multitoken(), "List of input files (output of op_scraper.py). Multiple files will be merged and sorted by time order")
296352
("ceph-conf", po::value<string>(&ceph_conf_path)->default_value("ceph.conf"), "Path to ceph conf")
297353
("io-depth", po::value<uint64_t>(&io_depth)->default_value(64), "I/O depth")
298354
("parser-threads", po::value<uint64_t>(&nparser_threads)->default_value(16), "Number of parser threads")
299355
("worker-threads", po::value<uint64_t>(&nworker_threads)->default_value(16), "Number of I/O worker threads")
300356
("pool", po::value<string>(&pool)->default_value("test_pool"), "Pool to use for I/O")
357+
("skip-do-ops", po::bool_switch(&skip_do_ops)->default_value(false), "Skip doing operations")
301358
;
302359

303360
po::options_description po_all("All options");
@@ -307,54 +364,73 @@ int main(int argc, char** argv) {
307364
po::parsed_options parsed = po::command_line_parser(argc, argv).options(po_all).allow_unregistered().run();
308365
po::store( parsed, vm);
309366
po::notify(vm);
367+
310368
if (vm.count("help")) {
311369
usage(po_all);
312370
exit(EXIT_SUCCESS);
313371
}
314-
315-
// Parse input file
316-
vector<std::thread> parser_threads;
317-
vector<shared_ptr<ParserContext>> parser_contexts;
318-
int fd = open(file.c_str(), O_RDONLY);
319-
if (fd == -1) {
320-
cout << "Error opening file" << endl;
321-
}
322-
struct stat file_stat;
323-
fstat(fd, &file_stat);
324-
char* mapped_buffer = (char*)mmap(NULL, file_stat.st_size, PROT_READ, MAP_SHARED, fd, 0);
325-
if (mapped_buffer == nullptr) {
326-
cout << "error mapping buffer" << endl;
327-
}
328-
uint64_t start_offset = 0;
329-
uint64_t step_size = file_stat.st_size / nparser_threads;
330-
for (int i = 0; i < nparser_threads; i++) {
331-
char* end = mapped_buffer + start_offset + step_size;
332-
while(*end != '\n') {
333-
end--;
372+
373+
assert(vm.count("input-files") > 0);
374+
375+
vector<string> input_files = vm["input-files"].as<vector<string>>();
376+
377+
vector<shared_ptr<ParserContext>> complete_parser_contexts; // list of ALL parser contexts so that shared_ptrs do not die.
378+
for (auto &file : input_files) {
379+
// Parse input file
380+
vector<std::thread> parser_threads;
381+
vector<shared_ptr<ParserContext>> parser_contexts;
382+
cout << fmt::format("parsing file {}", file) << endl;
383+
int fd = open(file.c_str(), O_RDONLY);
384+
if (fd == -1) {
385+
cout << "Error opening file" << endl;
386+
exit(EXIT_FAILURE);
334387
}
335-
if (i == nparser_threads - 1) {
336-
end = mapped_buffer + file_stat.st_size;
388+
struct stat file_stat;
389+
fstat(fd, &file_stat);
390+
char* mapped_buffer = (char*)mmap(NULL, file_stat.st_size, PROT_READ, MAP_SHARED, fd, 0);
391+
if (mapped_buffer == nullptr) {
392+
cout << "error mapping buffer" << endl;
393+
exit(EXIT_FAILURE);
337394
}
338-
shared_ptr<ParserContext> context = make_shared<ParserContext>();
339-
context->start = mapped_buffer + start_offset;
340-
context->end = end;
341-
context->max_buffer_size = 0;
342-
parser_contexts.push_back(context);
343-
parser_threads.push_back(std::thread(parse_entry_point, context));
344-
start_offset += (end - mapped_buffer - start_offset);
345-
}
346-
for (auto& t : parser_threads) {
347-
t.join();
395+
uint64_t start_offset = 0;
396+
uint64_t step_size = file_stat.st_size / nparser_threads;
397+
for (int i = 0; i < nparser_threads; i++) {
398+
char* end = mapped_buffer + start_offset + step_size;
399+
while(*end != '\n') {
400+
end--;
401+
}
402+
if (i == nparser_threads - 1) {
403+
end = mapped_buffer + file_stat.st_size;
404+
}
405+
shared_ptr<ParserContext> context = make_shared<ParserContext>();
406+
context->start = mapped_buffer + start_offset;
407+
context->end = end;
408+
context->max_buffer_size = 0;
409+
parser_contexts.push_back(context);
410+
parser_threads.push_back(std::thread(parse_entry_point, context));
411+
start_offset += (end - mapped_buffer - start_offset);
412+
}
413+
for (auto& t : parser_threads) {
414+
t.join();
415+
}
416+
// reduce
417+
for (auto context : parser_contexts) {
418+
ops.insert(ops.end(), context->ops.begin(), context->ops.end());
419+
max_buffer_size = max(context->max_buffer_size, max_buffer_size);
420+
// context->ops.clear();
421+
}
422+
munmap(mapped_buffer, file_stat.st_size);
423+
complete_parser_contexts.insert(complete_parser_contexts.end(), parser_contexts.begin(), parser_contexts.end());
348424
}
349-
// reduce
350-
for (auto context : parser_contexts) {
351-
string_cache.insert(context->string_cache.begin(), context->string_cache.end());
352-
ops.insert(ops.end(), context->ops.begin(), context->ops.end());
353-
max_buffer_size = max(context->max_buffer_size, max_buffer_size);
354-
context->string_cache.clear();
355-
context->ops.clear();
425+
426+
cout << "Sorting ops by date..." << endl;
427+
sort(ops.begin(), ops.end(), op_comparison_by_date);
428+
cout << "Sorting ops by date done" << endl;
429+
430+
if (skip_do_ops) {
431+
return EXIT_SUCCESS;
356432
}
357-
433+
358434
int ret = cluster.init2("client.admin", "ceph", 0);
359435
if (ret < 0) {
360436
std::cerr << "Couldn't init ceph! error " << ret << std::endl;
@@ -395,5 +471,5 @@ int main(int argc, char** argv) {
395471
}
396472

397473
cout << ops.size() << endl;
398-
return 0;
474+
return EXIT_SUCCESS;
399475
}

0 commit comments

Comments
 (0)