Skip to content
This repository was archived by the owner on Dec 8, 2021. It is now read-only.

Commit 2cbd53f

Browse files
authored
feat: a benchmark for single row throughput (#1007)
Implement the first benchmark to compare throughput vs. number of threads. In this iteration just for `InsertOrMutate` mutations.
1 parent 0433f46 commit 2cbd53f

File tree

3 files changed

+353
-0
lines changed

3 files changed

+353
-0
lines changed

google/cloud/spanner/integration_tests/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ function (spanner_client_define_integration_tests)
4545
client_stress_test.cc
4646
instance_admin_integration_test.cc
4747
rpc_failure_threshold_integration_test.cc
48+
single_row_throughput_benchmark.cc
4849
throughput_benchmark.cc)
4950
set(spanner_client_install_tests spanner_install_test.cc)
5051

Lines changed: 351 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,351 @@
1+
// Copyright 2019 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#include "google/cloud/spanner/client.h"
16+
#include "google/cloud/spanner/database_admin_client.h"
17+
#include "google/cloud/spanner/internal/build_info.h"
18+
#include "google/cloud/spanner/internal/compiler_info.h"
19+
#include "google/cloud/spanner/testing/pick_random_instance.h"
20+
#include "google/cloud/spanner/testing/random_database_name.h"
21+
#include "google/cloud/internal/getenv.h"
22+
#include "google/cloud/internal/random.h"
23+
#include <algorithm>
24+
#include <future>
25+
#include <random>
26+
#include <sstream>
27+
#include <thread>
28+
29+
namespace {
30+
31+
namespace cloud_spanner = google::cloud::spanner;
32+
33+
struct Config {
34+
std::string project_id;
35+
std::string instance_id;
36+
37+
int samples = 2;
38+
std::chrono::seconds iteration_duration = std::chrono::seconds(5);
39+
40+
int minimum_threads = 1;
41+
int maximum_threads = 4;
42+
int minimum_clients = 1;
43+
int maximum_clients = 4;
44+
std::int64_t table_size = 10 * 1000 * 1000;
45+
};
46+
47+
struct SingleRowInsertSample {
48+
int client_count;
49+
int thread_count;
50+
int insert_count;
51+
std::chrono::microseconds elapsed;
52+
};
53+
54+
void RunExperiment(Config const& config,
55+
cloud_spanner::Database const& database);
56+
57+
google::cloud::StatusOr<Config> ParseArgs(std::vector<std::string> args);
58+
59+
} // namespace
60+
61+
int main(int argc, char* argv[]) {
62+
Config config;
63+
{
64+
std::vector<std::string> args{argv, argv + argc};
65+
auto c = ParseArgs(args);
66+
if (!c) {
67+
std::cerr << "Error parsing command-line arguments: " << c.status()
68+
<< "\n";
69+
return 1;
70+
}
71+
config = *std::move(c);
72+
}
73+
74+
auto generator = google::cloud::internal::MakeDefaultPRNG();
75+
if (config.instance_id.empty()) {
76+
auto instance = google::cloud::spanner_testing::PickRandomInstance(
77+
generator, config.project_id);
78+
if (!instance) {
79+
std::cerr << "Error selecting an instance to run the experiment: "
80+
<< instance.status() << "\n";
81+
return 1;
82+
}
83+
config.instance_id = *std::move(instance);
84+
}
85+
86+
cloud_spanner::Database database(
87+
config.project_id, config.instance_id,
88+
google::cloud::spanner_testing::RandomDatabaseName(generator));
89+
90+
std::cout << std::boolalpha << "# Experiment: Single Row Throughput"
91+
<< "\n# Project: " << config.project_id
92+
<< "\n# Instance: " << config.instance_id
93+
<< "\n# Database: " << database.database_id()
94+
<< "\n# Samples: " << config.samples
95+
<< "\n# Minimum Threads: " << config.minimum_threads
96+
<< "\n# Maximum Threads: " << config.maximum_threads
97+
<< "\n# Minimum Clients: " << config.minimum_clients
98+
<< "\n# Maximum Clients: " << config.maximum_clients
99+
<< "\n# Iteration Duration: " << config.iteration_duration.count()
100+
<< "s"
101+
<< "\n# Table Size: " << config.table_size
102+
<< "\n# Compiler: " << cloud_spanner::internal::CompilerId() << "-"
103+
<< cloud_spanner::internal::CompilerVersion()
104+
<< "\n# Build Flags: " << cloud_spanner::internal::BuildFlags()
105+
<< "\n"
106+
<< std::flush;
107+
108+
cloud_spanner::DatabaseAdminClient admin_client;
109+
auto created =
110+
admin_client.CreateDatabase(database, {R"sql(CREATE TABLE KeyValue (
111+
Key INT64 NOT NULL,
112+
Data STRING(1024),
113+
) PRIMARY KEY (Key))sql"});
114+
std::cout << "# Waiting for database creation to complete " << std::flush;
115+
for (;;) {
116+
auto status = created.wait_for(std::chrono::seconds(1));
117+
if (status == std::future_status::ready) break;
118+
std::cout << '.' << std::flush;
119+
}
120+
std::cout << " DONE\n";
121+
auto db = created.get();
122+
if (!db) {
123+
std::cerr << "Error creating database: " << db.status() << "\n";
124+
return 1;
125+
}
126+
std::cout << "ClientCount,ThreadCount,InsertCount,ElapsedTime\n"
127+
<< std::flush;
128+
129+
RunExperiment(config, database);
130+
131+
auto drop = admin_client.DropDatabase(database);
132+
if (!drop.ok()) {
133+
std::cerr << "Error dropping database: " << drop << "\n";
134+
}
135+
std::cout << "# Experiment finished, database dropped\n";
136+
return 0;
137+
}
138+
139+
namespace {
140+
141+
using RandomKeyGenerator = std::function<std::int64_t()>;
142+
using SampleSink = std::function<void(std::vector<SingleRowInsertSample>)>;
143+
using ErrorSink = std::function<void(std::vector<google::cloud::Status>)>;
144+
145+
int RunTask(Config const& config, cloud_spanner::Client client,
146+
RandomKeyGenerator const& key_generator,
147+
ErrorSink const& error_sink) {
148+
int count = 0;
149+
std::string value(1024, 'A');
150+
std::vector<google::cloud::Status> errors;
151+
for (auto start = std::chrono::steady_clock::now(),
152+
deadline = start + config.iteration_duration;
153+
start < deadline; start = std::chrono::steady_clock::now()) {
154+
auto key = key_generator();
155+
auto m = cloud_spanner::MakeInsertOrUpdateMutation(
156+
"KeyValue", {"Key", "Data"}, key, value);
157+
auto result = client.Commit([&m](cloud_spanner::Transaction const&) {
158+
return cloud_spanner::Mutations{m};
159+
});
160+
if (!result) {
161+
errors.push_back(std::move(result).status());
162+
}
163+
++count;
164+
}
165+
error_sink(std::move(errors));
166+
return count;
167+
}
168+
169+
void RunIteration(Config const& config,
170+
std::vector<cloud_spanner::Client> const& clients,
171+
int thread_count, SampleSink const& sink,
172+
google::cloud::internal::DefaultPRNG generator) {
173+
std::mutex mu;
174+
std::uniform_int_distribution<std::int64_t> random_key(0, config.table_size);
175+
RandomKeyGenerator locked_random_key = [&mu, &generator, &random_key] {
176+
std::lock_guard<std::mutex> lk(mu);
177+
return random_key(generator);
178+
};
179+
180+
std::mutex cerr_mu;
181+
ErrorSink error_sink =
182+
[&cerr_mu](std::vector<google::cloud::Status> const& errors) {
183+
std::lock_guard<std::mutex> lk(cerr_mu);
184+
for (auto const& e : errors) {
185+
std::cerr << "# " << e << "\n";
186+
}
187+
};
188+
189+
std::vector<std::future<int>> tasks(thread_count);
190+
auto start = std::chrono::steady_clock::now();
191+
int task_id = 0;
192+
for (auto& t : tasks) {
193+
auto client = clients[task_id++ % clients.size()];
194+
t = std::async(std::launch::async, RunTask, config, client,
195+
locked_random_key, error_sink);
196+
}
197+
int insert_count = 0;
198+
for (auto& t : tasks) {
199+
insert_count += t.get();
200+
}
201+
auto elapsed = std::chrono::steady_clock::now() - start;
202+
203+
sink({SingleRowInsertSample{
204+
static_cast<int>(clients.size()), thread_count, insert_count,
205+
std::chrono::duration_cast<std::chrono::microseconds>(elapsed)}});
206+
}
207+
208+
void RunExperiment(Config const& config,
209+
cloud_spanner::Database const& database) {
210+
// Create enough clients for the worst case
211+
std::vector<cloud_spanner::Client> clients;
212+
std::cout << "# Creating clients " << std::flush;
213+
for (int i = 0; i != config.maximum_clients; ++i) {
214+
clients.emplace_back(cloud_spanner::Client(cloud_spanner::MakeConnection(
215+
database, cloud_spanner::ConnectionOptions().set_channel_pool_domain(
216+
"task:" + std::to_string(i)))));
217+
std::cout << '.' << std::flush;
218+
}
219+
std::cout << " DONE\n";
220+
221+
auto generator = google::cloud::internal::MakeDefaultPRNG();
222+
std::uniform_int_distribution<int> thread_count_gen(config.minimum_threads,
223+
config.maximum_threads);
224+
225+
std::mutex cout_mu;
226+
auto cout_sink =
227+
[&cout_mu](std::vector<SingleRowInsertSample> const& samples) mutable {
228+
std::unique_lock<std::mutex> lk(cout_mu);
229+
for (auto const& s : samples) {
230+
std::cout << std::boolalpha << s.client_count << ',' << s.thread_count
231+
<< ',' << s.insert_count << ',' << s.elapsed.count() << '\n'
232+
<< std::flush;
233+
}
234+
};
235+
236+
for (int i = 0; i != config.samples; ++i) {
237+
auto const thread_count = thread_count_gen(generator);
238+
// TODO(#1000) - avoid deadlocks with more than 100 threads per client
239+
auto min_clients =
240+
(std::max)(thread_count / 100 + 1, config.minimum_clients);
241+
auto const client_count = std::uniform_int_distribution<std::size_t>(
242+
min_clients, clients.size() - 1)(generator);
243+
std::vector<cloud_spanner::Client> iteration_clients(
244+
clients.begin(), clients.begin() + client_count);
245+
RunIteration(config, iteration_clients, thread_count, cout_sink, generator);
246+
}
247+
}
248+
249+
google::cloud::StatusOr<Config> ParseArgs(std::vector<std::string> args) {
250+
Config config;
251+
252+
config.project_id =
253+
google::cloud::internal::GetEnv("GOOGLE_CLOUD_PROJECT").value_or("");
254+
config.instance_id =
255+
google::cloud::internal::GetEnv("GOOGLE_CLOUD_CPP_SPANNER_INSTANCE")
256+
.value_or("");
257+
258+
struct Flag {
259+
std::string flag_name;
260+
std::function<void(Config&, std::string)> parser;
261+
};
262+
263+
// NOLINTNEXTLINE(modernize-avoid-c-arrays)
264+
Flag flags[] = {
265+
{"--project=",
266+
[](Config& c, std::string v) { c.project_id = std::move(v); }},
267+
{"--instance=",
268+
[](Config& c, std::string v) { c.instance_id = std::move(v); }},
269+
{"--samples=",
270+
[](Config& c, std::string const& v) { c.samples = std::stoi(v); }},
271+
{"--iteration-duration=",
272+
[](Config& c, std::string const& v) {
273+
c.iteration_duration = std::chrono::seconds(std::stoi(v));
274+
}},
275+
{"--minimum-threads=",
276+
[](Config& c, std::string const& v) {
277+
c.minimum_threads = std::stoi(v);
278+
}},
279+
{"--maximum-threads=",
280+
[](Config& c, std::string const& v) {
281+
c.maximum_threads = std::stoi(v);
282+
}},
283+
{"--minimum-clients=",
284+
[](Config& c, std::string const& v) {
285+
c.minimum_clients = std::stoi(v);
286+
}},
287+
{"--maximum-clients=",
288+
[](Config& c, std::string const& v) {
289+
c.maximum_clients = std::stoi(v);
290+
}},
291+
{"--table-size=",
292+
[](Config& c, std::string const& v) { c.table_size = std::stol(v); }},
293+
};
294+
295+
auto invalid_argument = [](std::string msg) {
296+
return google::cloud::Status(google::cloud::StatusCode::kInvalidArgument,
297+
std::move(msg));
298+
};
299+
300+
for (auto i = std::next(args.begin()); i != args.end(); ++i) {
301+
std::string const& arg = *i;
302+
bool found = false;
303+
for (auto const& flag : flags) {
304+
if (arg.rfind(flag.flag_name, 0) != 0) continue;
305+
found = true;
306+
flag.parser(config, arg.substr(flag.flag_name.size()));
307+
308+
break;
309+
}
310+
if (!found && arg.rfind("--", 0) == 0) {
311+
return invalid_argument("Unexpected command-line flag " + arg);
312+
}
313+
}
314+
315+
if (config.project_id.empty()) {
316+
return invalid_argument(
317+
"The project id is not set, provide a value in the --project flag,"
318+
" or set the GOOGLE_CLOUD_PROJECT environment variable");
319+
}
320+
321+
if (config.minimum_threads <= 0) {
322+
std::ostringstream os;
323+
os << "The minimum number of threads (" << config.minimum_threads << ")"
324+
<< " must be greater than zero";
325+
return invalid_argument(os.str());
326+
}
327+
if (config.maximum_threads < config.minimum_threads) {
328+
std::ostringstream os;
329+
os << "The maximum number of threads (" << config.maximum_threads << ")"
330+
<< " must be greater or equal than the minimum number of threads ("
331+
<< config.minimum_threads << ")";
332+
return invalid_argument(os.str());
333+
}
334+
335+
if (config.minimum_clients <= 0) {
336+
std::ostringstream os;
337+
os << "The minimum number of clients (" << config.minimum_clients << ")"
338+
<< " must be greater than zero";
339+
return invalid_argument(os.str());
340+
}
341+
if (config.maximum_clients < config.minimum_clients) {
342+
std::ostringstream os;
343+
os << "The maximum number of clients (" << config.maximum_clients << ")"
344+
<< " must be greater or equal than the minimum number of clients ("
345+
<< config.minimum_clients << ")";
346+
return invalid_argument(os.str());
347+
}
348+
return config;
349+
}
350+
351+
} // namespace

google/cloud/spanner/integration_tests/spanner_client_integration_tests.bzl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,5 +22,6 @@ spanner_client_integration_tests = [
2222
"client_stress_test.cc",
2323
"instance_admin_integration_test.cc",
2424
"rpc_failure_threshold_integration_test.cc",
25+
"single_row_throughput_benchmark.cc",
2526
"throughput_benchmark.cc",
2627
]

0 commit comments

Comments
 (0)