Skip to content
This repository was archived by the owner on Dec 8, 2021. It is now read-only.

Commit 272b31e

Browse files
authored
feat: implement benchmark for read throughput (#1011)
1 parent 8a0b81b commit 272b31e

File tree

1 file changed

+232
-25
lines changed

1 file changed

+232
-25
lines changed

google/cloud/spanner/integration_tests/single_row_throughput_benchmark.cc

Lines changed: 232 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ struct Config {
3535

3636
std::string project_id;
3737
std::string instance_id;
38+
std::string database_id;
3839

3940
int samples = 2;
4041
std::chrono::seconds iteration_duration = std::chrono::seconds(5);
@@ -43,13 +44,16 @@ struct Config {
4344
int maximum_threads = 4;
4445
int minimum_clients = 1;
4546
int maximum_clients = 4;
46-
std::int64_t table_size = 10 * 1000 * 1000;
47+
48+
std::int64_t table_size = 1000 * 1000L;
4749
};
4850

51+
std::ostream& operator<<(std::ostream& os, Config const& config);
52+
4953
struct SingleRowThroughputSample {
5054
int client_count;
5155
int thread_count;
52-
int insert_count;
56+
int event_count;
5357
std::chrono::microseconds elapsed;
5458
};
5559

@@ -100,24 +104,7 @@ int main(int argc, char* argv[]) {
100104
cloud_spanner::Database database(
101105
config.project_id, config.instance_id,
102106
google::cloud::spanner_testing::RandomDatabaseName(generator));
103-
104-
std::cout << std::boolalpha << "# Experiment: Single Row Throughput"
105-
<< "\n# Project: " << config.project_id
106-
<< "\n# Instance: " << config.instance_id
107-
<< "\n# Database: " << database.database_id()
108-
<< "\n# Samples: " << config.samples
109-
<< "\n# Minimum Threads: " << config.minimum_threads
110-
<< "\n# Maximum Threads: " << config.maximum_threads
111-
<< "\n# Minimum Clients: " << config.minimum_clients
112-
<< "\n# Maximum Clients: " << config.maximum_clients
113-
<< "\n# Iteration Duration: " << config.iteration_duration.count()
114-
<< "s"
115-
<< "\n# Table Size: " << config.table_size
116-
<< "\n# Compiler: " << cloud_spanner::internal::CompilerId() << "-"
117-
<< cloud_spanner::internal::CompilerVersion()
118-
<< "\n# Build Flags: " << cloud_spanner::internal::BuildFlags()
119-
<< "\n"
120-
<< std::flush;
107+
config.database_id = database.database_id();
121108

122109
auto available = AvailableExperiments();
123110
auto e = available.find(config.experiment);
@@ -145,8 +132,7 @@ int main(int argc, char* argv[]) {
145132
return 1;
146133
}
147134

148-
std::cout << "ClientCount,ThreadCount,InsertCount,ElapsedTime\n"
149-
<< std::flush;
135+
std::cout << "ClientCount,ThreadCount,EventCount,ElapsedTime\n" << std::flush;
150136

151137
std::mutex cout_mu;
152138
auto cout_sink =
@@ -155,7 +141,7 @@ int main(int argc, char* argv[]) {
155141
std::unique_lock<std::mutex> lk(cout_mu);
156142
for (auto const& s : samples) {
157143
std::cout << std::boolalpha << s.client_count << ',' << s.thread_count
158-
<< ',' << s.insert_count << ',' << s.elapsed.count() << '\n'
144+
<< ',' << s.event_count << ',' << s.elapsed.count() << '\n'
159145
<< std::flush;
160146
}
161147
};
@@ -166,14 +152,33 @@ int main(int argc, char* argv[]) {
166152

167153
auto drop = admin_client.DropDatabase(database);
168154
if (!drop.ok()) {
169-
std::cerr << "Error dropping database: " << drop << "\n";
155+
std::cerr << "# Error dropping database: " << drop << "\n";
170156
}
171157
std::cout << "# Experiment finished, database dropped\n";
172158
return 0;
173159
}
174160

175161
namespace {
176162

163+
std::ostream& operator<<(std::ostream& os, Config const& config) {
164+
return os << std::boolalpha << "# Experiment: " << config.experiment
165+
<< "\n# Project: " << config.project_id
166+
<< "\n# Instance: " << config.instance_id
167+
<< "\n# Database: " << config.database_id
168+
<< "\n# Samples: " << config.samples
169+
<< "\n# Minimum Threads: " << config.minimum_threads
170+
<< "\n# Maximum Threads: " << config.maximum_threads
171+
<< "\n# Minimum Clients: " << config.minimum_clients
172+
<< "\n# Maximum Clients: " << config.maximum_clients
173+
<< "\n# Iteration Duration: " << config.iteration_duration.count()
174+
<< "s"
175+
<< "\n# Table Size: " << config.table_size
176+
<< "\n# Compiler: " << cloud_spanner::internal::CompilerId() << "-"
177+
<< cloud_spanner::internal::CompilerVersion()
178+
<< "\n# Build Flags: " << cloud_spanner::internal::BuildFlags()
179+
<< "\n";
180+
}
181+
177182
using RandomKeyGenerator = std::function<std::int64_t()>;
178183
using ErrorSink = std::function<void(std::vector<google::cloud::Status>)>;
179184

@@ -183,6 +188,7 @@ class InsertOrUpdateExperiment : public Experiment {
183188

184189
void Run(Config const& config, cloud_spanner::Database const& database,
185190
SampleSink const& sink) override {
191+
std::cout << config << std::flush;
186192
// Create enough clients for the worst case
187193
std::vector<cloud_spanner::Client> clients;
188194
std::cout << "# Creating clients " << std::flush;
@@ -276,16 +282,217 @@ class InsertOrUpdateExperiment : public Experiment {
276282
}
277283
};
278284

285+
class ReadExperiment : public Experiment {
286+
public:
287+
ReadExperiment() : generator_(google::cloud::internal::MakeDefaultPRNG()) {}
288+
289+
void SetUpTask(Config const& config, cloud_spanner::Client client,
290+
int task_count, int task_id) {
291+
std::string value = [this] {
292+
std::lock_guard<std::mutex> lk(mu_);
293+
return google::cloud::internal::Sample(
294+
generator_, 1024, "#@$%^&*()-=+_0123456789[]{}|;:,./<>?");
295+
}();
296+
297+
auto mutation = cloud_spanner::InsertOrUpdateMutationBuilder(
298+
"KeyValue", {"Key", "Data"});
299+
int current_mutations = 0;
300+
301+
auto maybe_flush = [&mutation, &current_mutations, &client,
302+
this](bool force) {
303+
if (current_mutations == 0) {
304+
return;
305+
}
306+
if (!force && current_mutations < 1000) {
307+
return;
308+
}
309+
auto m = std::move(mutation).Build();
310+
auto result = client.Commit([&m](cloud_spanner::Transaction const&) {
311+
return cloud_spanner::Mutations{m};
312+
});
313+
if (!result) {
314+
std::lock_guard<std::mutex> lk(mu_);
315+
std::cerr << "# Error in Commit() " << result.status() << "\n";
316+
}
317+
mutation = cloud_spanner::InsertOrUpdateMutationBuilder("KeyValue",
318+
{"Key", "Data"});
319+
current_mutations = 0;
320+
};
321+
auto force_flush = [&maybe_flush] { maybe_flush(true); };
322+
auto flush_as_needed = [&maybe_flush] { maybe_flush(false); };
323+
324+
auto const report_period =
325+
(std::max)(static_cast<std::int64_t>(2), config.table_size / 50);
326+
for (std::int64_t key = 0; key != config.table_size; ++key) {
327+
// Each thread does a fraction of the key space.
328+
if (key % task_count != task_id) continue;
329+
// Have one of the threads report progress about 50 times.
330+
if (task_id == 0 && key % report_period == 0) {
331+
std::cout << '.' << std::flush;
332+
}
333+
mutation.EmplaceRow(key, value);
334+
current_mutations++;
335+
flush_as_needed();
336+
}
337+
force_flush();
338+
}
339+
340+
void SetUp(Config const& config,
341+
cloud_spanner::Database const& database) override {
342+
// We need to populate some data or all the requests to read will fail.
343+
cloud_spanner::Client client(cloud_spanner::MakeConnection(database));
344+
std::cout << "# Populating database " << std::flush;
345+
int const task_count = 16;
346+
std::vector<std::future<void>> tasks(task_count);
347+
int task_id = 0;
348+
for (auto& t : tasks) {
349+
t = std::async(
350+
std::launch::async,
351+
[this, &config, &client](int tc, int ti) {
352+
SetUpTask(config, client, tc, ti);
353+
},
354+
task_count, task_id++);
355+
}
356+
for (auto& t : tasks) {
357+
t.get();
358+
}
359+
std::cout << " DONE\n";
360+
}
361+
362+
void Run(Config const& config, cloud_spanner::Database const& database,
363+
SampleSink const& sink) override {
364+
std::cout << config << std::flush;
365+
// Create enough clients for the worst case
366+
std::vector<cloud_spanner::Client> clients;
367+
std::cout << "# Creating clients " << std::flush;
368+
for (int i = 0; i != config.maximum_clients; ++i) {
369+
clients.emplace_back(cloud_spanner::Client(cloud_spanner::MakeConnection(
370+
database, cloud_spanner::ConnectionOptions().set_channel_pool_domain(
371+
"task:" + std::to_string(i)))));
372+
std::cout << '.' << std::flush;
373+
}
374+
std::cout << " DONE\n";
375+
376+
std::uniform_int_distribution<int> thread_count_gen(config.minimum_threads,
377+
config.maximum_threads);
378+
379+
for (int i = 0; i != config.samples; ++i) {
380+
auto const thread_count = thread_count_gen(generator_);
381+
// TODO(#1000) - avoid deadlocks with more than 100 threads per client
382+
auto min_clients =
383+
(std::max)(thread_count / 100 + 1, config.minimum_clients);
384+
auto const client_count = std::uniform_int_distribution<std::size_t>(
385+
min_clients, clients.size() - 1)(generator_);
386+
std::vector<cloud_spanner::Client> iteration_clients(
387+
clients.begin(), clients.begin() + client_count);
388+
RunIteration(config, iteration_clients, thread_count, sink);
389+
}
390+
}
391+
392+
void RunIteration(Config const& config,
393+
std::vector<cloud_spanner::Client> const& clients,
394+
int thread_count, SampleSink const& sink) {
395+
std::uniform_int_distribution<std::int64_t> random_key(0,
396+
config.table_size);
397+
RandomKeyGenerator locked_random_key = [this, &random_key] {
398+
std::lock_guard<std::mutex> lk(mu_);
399+
return random_key(generator_);
400+
};
401+
402+
std::mutex cerr_mu;
403+
ErrorSink error_sink =
404+
[&cerr_mu](std::vector<google::cloud::Status> const& errors) {
405+
std::lock_guard<std::mutex> lk(cerr_mu);
406+
for (auto const& e : errors) {
407+
std::cerr << "# " << e << "\n";
408+
}
409+
};
410+
411+
std::vector<std::future<int>> tasks(thread_count);
412+
auto start = std::chrono::steady_clock::now();
413+
int task_id = 0;
414+
for (auto& t : tasks) {
415+
auto client = clients[task_id++ % clients.size()];
416+
t = std::async(std::launch::async, &ReadExperiment::RunTask, this, config,
417+
client, locked_random_key, error_sink);
418+
}
419+
int total_count = 0;
420+
for (auto& t : tasks) {
421+
total_count += t.get();
422+
}
423+
auto elapsed = std::chrono::steady_clock::now() - start;
424+
425+
sink({SingleRowThroughputSample{
426+
static_cast<int>(clients.size()), thread_count, total_count,
427+
std::chrono::duration_cast<std::chrono::microseconds>(elapsed)}});
428+
}
429+
430+
int RunTask(Config const& config, cloud_spanner::Client client,
431+
RandomKeyGenerator const& key_generator,
432+
ErrorSink const& error_sink) {
433+
int count = 0;
434+
std::string value(1024, 'A');
435+
std::vector<google::cloud::Status> errors;
436+
for (auto start = std::chrono::steady_clock::now(),
437+
deadline = start + config.iteration_duration;
438+
start < deadline; start = std::chrono::steady_clock::now()) {
439+
auto key = key_generator();
440+
auto rows = client.Read(
441+
"KeyValue",
442+
cloud_spanner::KeySet().AddKey(cloud_spanner::MakeKey(key)),
443+
{"Key", "Data"});
444+
for (auto& row :
445+
cloud_spanner::StreamOf<std::tuple<std::int64_t, std::string>>(
446+
rows)) {
447+
if (!row) {
448+
errors.push_back(std::move(row).status());
449+
break;
450+
}
451+
++count;
452+
}
453+
}
454+
error_sink(std::move(errors));
455+
return count;
456+
}
457+
458+
private:
459+
std::mutex mu_;
460+
google::cloud::internal::DefaultPRNG generator_;
461+
};
462+
463+
class RunAllExperiment : public Experiment {
464+
public:
465+
void SetUp(Config const&, cloud_spanner::Database const&) override {}
466+
467+
void Run(Config const& cfg, cloud_spanner::Database const& database,
468+
SampleSink const& sink) override {
469+
// Smoke test all the experiments by running a very small version of each.
470+
for (auto& kv : AvailableExperiments()) {
471+
// Do not recurse, skip this experiment.
472+
if (kv.first == "run-all") continue;
473+
Config config = cfg;
474+
config.table_size = 10;
475+
config.samples = 2;
476+
config.iteration_duration = std::chrono::seconds(1);
477+
std::cout << "# Smoke test for experiment: " << kv.first << "\n";
478+
kv.second->SetUp(config, database);
479+
kv.second->Run(config, database, sink);
480+
}
481+
}
482+
};
483+
279484
std::map<std::string, std::shared_ptr<Experiment>> AvailableExperiments() {
280485
return {
486+
{"run-all", std::make_shared<RunAllExperiment>()},
281487
{"insert-or-update", std::make_shared<InsertOrUpdateExperiment>()},
488+
{"read", std::make_shared<ReadExperiment>()},
282489
};
283490
}
284491

285492
google::cloud::StatusOr<Config> ParseArgs(std::vector<std::string> args) {
286493
Config config;
287494

288-
config.experiment = "insert-or-update";
495+
config.experiment = "run-all";
289496

290497
config.project_id =
291498
google::cloud::internal::GetEnv("GOOGLE_CLOUD_PROJECT").value_or("");

0 commit comments

Comments
 (0)