|
| 1 | +#include <atomic> |
| 2 | +#include <condition_variable> |
1 | 3 | #include <memory> |
| 4 | +#include <mutex> |
| 5 | +#include <random> |
| 6 | +#include <thread> |
| 7 | +#include <signal.h> |
| 8 | +#include <time.h> |
2 | 9 | #include <unistd.h> |
| 10 | +#include <boost/program_options.hpp> |
3 | 11 | #include "zlog/backend/ram.h" |
4 | 12 | #include "zlog/options.h" |
5 | 13 | #include "zlog/log.h" |
6 | 14 |
|
| 15 | +namespace po = boost::program_options; |
| 16 | + |
| 17 | +class rand_data_gen { |
| 18 | + public: |
| 19 | + rand_data_gen(size_t buf_size, size_t samp_size) : |
| 20 | + buf_size_(buf_size), |
| 21 | + dist_(0, buf_size_ - samp_size - 1) |
| 22 | + {} |
| 23 | + |
| 24 | + void generate() { |
| 25 | + std::uniform_int_distribution<uint64_t> d( |
| 26 | + std::numeric_limits<uint64_t>::min(), |
| 27 | + std::numeric_limits<uint64_t>::max()); |
| 28 | + buf_.reserve(buf_size_); |
| 29 | + while (buf_.size() < buf_size_) { |
| 30 | + uint64_t val = d(gen_); |
| 31 | + buf_.append((const char *)&val, sizeof(val)); |
| 32 | + } |
| 33 | + if (buf_.size() > buf_size_) |
| 34 | + buf_.resize(buf_size_); |
| 35 | + } |
| 36 | + |
| 37 | + inline const char *sample() { |
| 38 | + assert(!buf_.empty()); |
| 39 | + return buf_.c_str() + dist_(gen_); |
| 40 | + } |
| 41 | + |
| 42 | + private: |
| 43 | + const size_t buf_size_; |
| 44 | + std::string buf_; |
| 45 | + std::default_random_engine gen_; |
| 46 | + std::uniform_int_distribution<size_t> dist_; |
| 47 | +}; |
| 48 | + |
| 49 | +static inline uint64_t __getns(clockid_t clock) |
| 50 | +{ |
| 51 | + struct timespec ts; |
| 52 | + clock_gettime(clock, &ts); |
| 53 | + return (((uint64_t)ts.tv_sec) * 1000000000ULL) + ts.tv_nsec; |
| 54 | +} |
| 55 | + |
| 56 | +static inline uint64_t getus() |
| 57 | +{ |
| 58 | + return __getns(CLOCK_MONOTONIC) / 1000; |
| 59 | +} |
| 60 | + |
| 61 | +static std::atomic<bool> shutdown; |
| 62 | +static std::atomic<uint64_t> op_count; |
| 63 | + |
| 64 | +static std::mutex lock; |
| 65 | +static std::condition_variable cond; |
| 66 | + |
| 67 | +static void sig_handler(int sig) |
| 68 | +{ |
| 69 | + shutdown = true; |
| 70 | +} |
| 71 | + |
| 72 | +static void stats_entry() |
| 73 | +{ |
| 74 | + while (true) { |
| 75 | + auto start_ops_count = op_count.load(); |
| 76 | + auto start_us = getus(); |
| 77 | + |
| 78 | + std::unique_lock<std::mutex> lk(lock); |
| 79 | + cond.wait_for(lk, std::chrono::seconds(1), |
| 80 | + [&] { return shutdown.load(); }); |
| 81 | + if (shutdown) { |
| 82 | + break; |
| 83 | + } |
| 84 | + |
| 85 | + auto end_us = getus(); |
| 86 | + auto end_ops_count = op_count.load(); |
| 87 | + |
| 88 | + auto elapsed_us = end_us - start_us; |
| 89 | + |
| 90 | + auto iops = (double)((end_ops_count - start_ops_count) * |
| 91 | + 1000000ULL) / (double)elapsed_us; |
| 92 | + |
| 93 | + std::cout << iops << std::endl; |
| 94 | + } |
| 95 | +} |
| 96 | + |
7 | 97 | int main(int argc, char **argv) |
8 | 98 | { |
9 | | - auto backend = std::unique_ptr<zlog::storage::ram::RAMBackend>( |
10 | | - new zlog::storage::ram::RAMBackend()); |
| 99 | + std::string log_name; |
| 100 | + uint32_t width; |
| 101 | + uint32_t slots; |
| 102 | + size_t entry_size; |
| 103 | + int qdepth; |
| 104 | + bool excl_open; |
| 105 | + bool verify; |
| 106 | + int runtime; |
| 107 | + std::string backend; |
| 108 | + std::string pool; |
| 109 | + std::string db_path; |
| 110 | + bool blackhole; |
| 111 | + |
| 112 | + po::options_description opts("Benchmark options"); |
| 113 | + opts.add_options() |
| 114 | + ("help", "show help message") |
| 115 | + ("name", po::value<std::string>(&log_name)->default_value("bench"), "log name") |
| 116 | + ("width", po::value<uint32_t>(&width)->default_value(10), "stripe width") |
| 117 | + ("slots", po::value<uint32_t>(&slots)->default_value(10), "object slots") |
| 118 | + ("size", po::value<size_t>(&entry_size)->default_value(1024), "entry size") |
| 119 | + ("qdepth", po::value<int>(&qdepth)->default_value(1), "queue depth") |
| 120 | + ("excl", po::bool_switch(&excl_open), "exclusive open") |
| 121 | + ("verify", po::bool_switch(&verify), "verify writes") |
| 122 | + ("runtime", po::value<int>(&runtime)->default_value(0), "runtime") |
| 123 | + |
| 124 | + ("backend", po::value<std::string>(&backend)->required(), "backend") |
| 125 | + ("pool", po::value<std::string>(&pool)->default_value("zlog"), "pool (ceph)") |
| 126 | + ("db-path", po::value<std::string>(&db_path)->default_value("/tmp/zlog.bench.db"), "db path (lmdb)") |
| 127 | + ("blackhole", po::bool_switch(&blackhole), "black hole (ram)") |
| 128 | + ; |
| 129 | + |
| 130 | + po::variables_map vm; |
| 131 | + po::store(po::parse_command_line(argc, argv, opts), vm); |
| 132 | + |
| 133 | + if (vm.count("help")) { |
| 134 | + std::cout << opts << std::endl; |
| 135 | + return 1; |
| 136 | + } |
| 137 | + |
| 138 | + po::notify(vm); |
| 139 | + |
| 140 | + runtime = std::max(runtime, 0); |
11 | 141 |
|
12 | 142 | zlog::Options options; |
13 | | - options.backend = std::move(backend); |
| 143 | + options.backend_name = backend; |
| 144 | + |
| 145 | + if (backend == "ceph") { |
| 146 | + options.backend_options["pool"] = pool; |
| 147 | + // zero-length string here causes default path search |
| 148 | + options.backend_options["conf_file"] = ""; |
| 149 | + } |
| 150 | + |
| 151 | + if (backend == "lmdb") { |
| 152 | + options.backend_options["path"] = db_path; |
| 153 | + } |
| 154 | + |
| 155 | + if (backend == "ram") { |
| 156 | + if (blackhole) { |
| 157 | + options.backend_options["blackhole"] = "true"; |
| 158 | + } |
| 159 | + } |
| 160 | + |
14 | 161 | options.create_if_missing = true; |
15 | | - options.error_if_exists = true; |
16 | | - options.init_stripe_on_create = true; |
| 162 | + options.error_if_exists = excl_open; |
| 163 | + |
| 164 | + options.stripe_width = width; |
| 165 | + options.stripe_slots = slots; |
| 166 | + options.max_inflight_ops = qdepth; |
17 | 167 |
|
18 | 168 | zlog::Log *log; |
19 | | - int ret = zlog::Log::Open(options, "mylog", &log); |
20 | | - assert(ret == 0); |
21 | | - |
22 | | -#if 0 |
23 | | - for (int i = 0; i < 100000; i++) { |
24 | | - uint64_t pos; |
25 | | - int ret = log->Append("data", &pos); |
26 | | - assert(ret == 0); |
27 | | - std::cout << pos << std::endl; |
28 | | - } |
29 | | -#else |
30 | | - std::mutex lock; |
31 | | - for (int i = 0; i < 50000; i++) { |
32 | | - int ret = log->appendAsync("data", [&](int ret, uint64_t pos) { |
33 | | - std::lock_guard<std::mutex> lk(lock); |
34 | | - std::cout << pos << " " << ret << std::endl; |
| 169 | + int ret = zlog::Log::Open(options, log_name, &log); |
| 170 | + if (ret) { |
| 171 | + std::cerr << "log::open failed: " << strerror(-ret) << std::endl; |
| 172 | + return -1; |
| 173 | + } |
| 174 | + |
| 175 | + signal(SIGINT, sig_handler); |
| 176 | + signal(SIGALRM, sig_handler); |
| 177 | + alarm(runtime); |
| 178 | + |
| 179 | + rand_data_gen dgen(1ULL << 22, entry_size); |
| 180 | + dgen.generate(); |
| 181 | + |
| 182 | + // TODO: by always logging the same entry data we may trigger low-level |
| 183 | + // compression to take affect, if such a thing exists. something to be aware |
| 184 | + // of and watch out for. |
| 185 | + const auto entry_data = std::string(dgen.sample(), entry_size); |
| 186 | + |
| 187 | + std::thread stats_thread(stats_entry); |
| 188 | + |
| 189 | + op_count = 0; |
| 190 | + while (!shutdown) { |
| 191 | + int ret = log->appendAsync(entry_data, [&](int ret, uint64_t pos) { |
| 192 | + if (ret && ret != -ESHUTDOWN) { |
| 193 | + std::cerr << "appendAsync cb failed: " << strerror(-ret) << std::endl; |
| 194 | + assert(0); |
| 195 | + return; |
| 196 | + } |
| 197 | + op_count++; |
35 | 198 | }); |
36 | | - assert(ret == 0); |
| 199 | + if (ret) { |
| 200 | + std::cerr << "appendAsync failed: " << strerror(-ret) << std::endl; |
| 201 | + assert(0); |
| 202 | + break; |
| 203 | + } |
| 204 | + } |
| 205 | + |
| 206 | + shutdown = true; |
| 207 | + cond.notify_one(); |
| 208 | + stats_thread.join(); |
| 209 | + |
| 210 | + if (verify) { |
| 211 | + uint64_t tail; |
| 212 | + auto ret = log->CheckTail(&tail); |
| 213 | + if (ret) { |
| 214 | + std::cerr << "checktail failed: " << strerror(-ret) << std::endl; |
| 215 | + } else { |
| 216 | + for (uint64_t pos = 0; pos < tail; pos++) { |
| 217 | + std::string data; |
| 218 | + ret = log->Read(pos, &data); |
| 219 | + if (ret) { |
| 220 | + std::cerr << "read failed at pos " << pos << ": " << strerror(-ret) << std::endl; |
| 221 | + } else if (data != entry_data) { |
| 222 | + std::cerr << "verify failed at pos " << pos << std::endl; |
| 223 | + assert(0); |
| 224 | + } |
| 225 | + } |
| 226 | + } |
37 | 227 | } |
38 | | -#endif |
39 | 228 |
|
40 | | - // will wait for async ops to run callbacks |
41 | | - std::cout << "done looping" << std::endl; |
42 | | - sleep(3); |
43 | | - std::cout << "done sleeping" << std::endl; |
44 | 229 | delete log; |
45 | 230 |
|
46 | 231 | return 0; |
|
0 commit comments