Skip to content

Commit 627c466

Browse files
committed
test/allocsim: replayer merge files by timestmap
Signed-off-by: Pere Diaz Bou <[email protected]>
1 parent cc8c766 commit 627c466

File tree

1 file changed

+67
-45
lines changed

1 file changed

+67
-45
lines changed

src/test/objectstore/allocsim/ops_replayer.cc

Lines changed: 67 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,8 @@
1313
#include <fstream>
1414
#include <filesystem>
1515
#include <mutex>
16-
#include <rados/buffer_fwd.h>
17-
#include <rados/librados.hpp>
16+
#include "include/rados/buffer_fwd.h"
17+
#include "include/rados/librados.hpp"
1818
#include <atomic>
1919
#include <fmt/format.h>
2020
#include <map>
@@ -321,6 +321,11 @@ void worker_thread_entry(uint64_t id, uint64_t nworker_threads, vector<Op> &ops,
321321
}
322322
}
323323

324+
325+
int op_comparison_by_date(const Op& lhs, const Op& rhs) {
326+
return lhs.at < rhs.at;
327+
}
328+
324329
void usage(po::options_description &desc) {
325330
cout << desc << std::endl;
326331
}
@@ -343,7 +348,7 @@ int main(int argc, char** argv) {
343348
po::options_description po_options("Options");
344349
po_options.add_options()
345350
("help,h", "produce help message")
346-
(",i", po::value<string>(&file)->default_value("input.txt"), "Input file (output of op_scraper.py)")
351+
("input-files,i", po::value<vector<string>>()->multitoken(), "List of input files (output of op_scraper.py). Multiple files will be merged and sorted by time order")
347352
("ceph-conf", po::value<string>(&ceph_conf_path)->default_value("ceph.conf"), "Path to ceph conf")
348353
("io-depth", po::value<uint64_t>(&io_depth)->default_value(64), "I/O depth")
349354
("parser-threads", po::value<uint64_t>(&nparser_threads)->default_value(16), "Number of parser threads")
@@ -359,53 +364,73 @@ int main(int argc, char** argv) {
359364
po::parsed_options parsed = po::command_line_parser(argc, argv).options(po_all).allow_unregistered().run();
360365
po::store( parsed, vm);
361366
po::notify(vm);
367+
362368
if (vm.count("help")) {
363369
usage(po_all);
364370
exit(EXIT_SUCCESS);
365371
}
366-
367-
// Parse input file
368-
vector<std::thread> parser_threads;
369-
vector<shared_ptr<ParserContext>> parser_contexts;
370-
int fd = open(file.c_str(), O_RDONLY);
371-
if (fd == -1) {
372-
cout << "Error opening file" << endl;
373-
}
374-
struct stat file_stat;
375-
fstat(fd, &file_stat);
376-
char* mapped_buffer = (char*)mmap(NULL, file_stat.st_size, PROT_READ, MAP_SHARED, fd, 0);
377-
if (mapped_buffer == nullptr) {
378-
cout << "error mapping buffer" << endl;
379-
}
380-
uint64_t start_offset = 0;
381-
uint64_t step_size = file_stat.st_size / nparser_threads;
382-
for (int i = 0; i < nparser_threads; i++) {
383-
char* end = mapped_buffer + start_offset + step_size;
384-
while(*end != '\n') {
385-
end--;
372+
373+
assert(vm.count("input-files") > 0);
374+
375+
vector<string> input_files = vm["input-files"].as<vector<string>>();
376+
377+
vector<shared_ptr<ParserContext>> complete_parser_contexts; // list of ALL parser contexts so that shared_ptrs do not die.
378+
for (auto &file : input_files) {
379+
// Parse input file
380+
vector<std::thread> parser_threads;
381+
vector<shared_ptr<ParserContext>> parser_contexts;
382+
cout << fmt::format("parsing file {}", file) << endl;
383+
int fd = open(file.c_str(), O_RDONLY);
384+
if (fd == -1) {
385+
cout << "Error opening file" << endl;
386+
exit(EXIT_FAILURE);
386387
}
387-
if (i == nparser_threads - 1) {
388-
end = mapped_buffer + file_stat.st_size;
388+
struct stat file_stat;
389+
fstat(fd, &file_stat);
390+
char* mapped_buffer = (char*)mmap(NULL, file_stat.st_size, PROT_READ, MAP_SHARED, fd, 0);
391+
if (mapped_buffer == nullptr) {
392+
cout << "error mapping buffer" << endl;
393+
exit(EXIT_FAILURE);
389394
}
390-
shared_ptr<ParserContext> context = make_shared<ParserContext>();
391-
context->start = mapped_buffer + start_offset;
392-
context->end = end;
393-
context->max_buffer_size = 0;
394-
parser_contexts.push_back(context);
395-
parser_threads.push_back(std::thread(parse_entry_point, context));
396-
start_offset += (end - mapped_buffer - start_offset);
397-
}
398-
for (auto& t : parser_threads) {
399-
t.join();
395+
uint64_t start_offset = 0;
396+
uint64_t step_size = file_stat.st_size / nparser_threads;
397+
for (int i = 0; i < nparser_threads; i++) {
398+
char* end = mapped_buffer + start_offset + step_size;
399+
while(*end != '\n') {
400+
end--;
401+
}
402+
if (i == nparser_threads - 1) {
403+
end = mapped_buffer + file_stat.st_size;
404+
}
405+
shared_ptr<ParserContext> context = make_shared<ParserContext>();
406+
context->start = mapped_buffer + start_offset;
407+
context->end = end;
408+
context->max_buffer_size = 0;
409+
parser_contexts.push_back(context);
410+
parser_threads.push_back(std::thread(parse_entry_point, context));
411+
start_offset += (end - mapped_buffer - start_offset);
412+
}
413+
for (auto& t : parser_threads) {
414+
t.join();
415+
}
416+
// reduce
417+
for (auto context : parser_contexts) {
418+
ops.insert(ops.end(), context->ops.begin(), context->ops.end());
419+
max_buffer_size = max(context->max_buffer_size, max_buffer_size);
420+
// context->ops.clear();
421+
}
422+
munmap(mapped_buffer, file_stat.st_size);
423+
complete_parser_contexts.insert(complete_parser_contexts.end(), parser_contexts.begin(), parser_contexts.end());
400424
}
401-
// reduce
402-
for (auto context : parser_contexts) {
403-
ops.insert(ops.end(), context->ops.begin(), context->ops.end());
404-
max_buffer_size = max(context->max_buffer_size, max_buffer_size);
405-
// context->ops.clear();
425+
426+
cout << "Sorting ops by date..." << endl;
427+
sort(ops.begin(), ops.end(), op_comparison_by_date);
428+
cout << "Sorting ops by date done" << endl;
429+
430+
if (skip_do_ops) {
431+
return EXIT_SUCCESS;
406432
}
407-
408-
433+
409434
int ret = cluster.init2("client.admin", "ceph", 0);
410435
if (ret < 0) {
411436
std::cerr << "Couldn't init ceph! error " << ret << std::endl;
@@ -433,9 +458,6 @@ int main(int argc, char** argv) {
433458
std::cout << fmt::format("pool {} ready", pool) << std::endl;
434459

435460

436-
if (skip_do_ops) {
437-
return EXIT_SUCCESS;
438-
}
439461
// process ops
440462
vector<thread> worker_threads;
441463
for (int i = 0; i < nworker_threads; i++) {

0 commit comments

Comments
 (0)