@@ -31,6 +31,8 @@ namespace {
3131namespace cloud_spanner = google::cloud::spanner;
3232
3333struct Config {
34+ std::string experiment;
35+
3436 std::string project_id;
3537 std::string instance_id;
3638
@@ -44,15 +46,27 @@ struct Config {
4446 std::int64_t table_size = 10 * 1000 * 1000 ;
4547};
4648
47- struct SingleRowInsertSample {
49+ struct SingleRowThroughputSample {
4850 int client_count;
4951 int thread_count;
5052 int insert_count;
5153 std::chrono::microseconds elapsed;
5254};
5355
54- void RunExperiment (Config const & config,
55- cloud_spanner::Database const & database);
56+ using SampleSink = std::function<void (std::vector<SingleRowThroughputSample>)>;
57+
58+ class Experiment {
59+ public:
60+ virtual ~Experiment () = default ;
61+
62+ virtual void SetUp (Config const & config,
63+ cloud_spanner::Database const & database) = 0;
64+ virtual void Run (Config const & config,
65+ cloud_spanner::Database const & database,
66+ SampleSink const & sink) = 0;
67+ };
68+
69+ std::map<std::string, std::shared_ptr<Experiment>> AvailableExperiments ();
5670
5771google::cloud::StatusOr<Config> ParseArgs (std::vector<std::string> args);
5872
@@ -105,6 +119,13 @@ int main(int argc, char* argv[]) {
105119 << " \n "
106120 << std::flush;
107121
122+ auto available = AvailableExperiments ();
123+ auto e = available.find (config.experiment );
124+ if (e == available.end ()) {
125+ std::cerr << " Experiment " << config.experiment << " not found\n " ;
126+ return 1 ;
127+ }
128+
108129 cloud_spanner::DatabaseAdminClient admin_client;
109130 auto created =
110131 admin_client.CreateDatabase (database, {R"sql( CREATE TABLE KeyValue (
@@ -123,10 +144,25 @@ int main(int argc, char* argv[]) {
123144 std::cerr << " Error creating database: " << db.status () << " \n " ;
124145 return 1 ;
125146 }
147+
126148 std::cout << " ClientCount,ThreadCount,InsertCount,ElapsedTime\n "
127149 << std::flush;
128150
129- RunExperiment (config, database);
151+ std::mutex cout_mu;
152+ auto cout_sink =
153+ [&cout_mu](
154+ std::vector<SingleRowThroughputSample> const & samples) mutable {
155+ std::unique_lock<std::mutex> lk (cout_mu);
156+ for (auto const & s : samples) {
157+ std::cout << std::boolalpha << s.client_count << ' ,' << s.thread_count
158+ << ' ,' << s.insert_count << ' ,' << s.elapsed .count () << ' \n '
159+ << std::flush;
160+ }
161+ };
162+
163+ auto experiment = e->second ;
164+ experiment->SetUp (config, database);
165+ experiment->Run (config, database, cout_sink);
130166
131167 auto drop = admin_client.DropDatabase (database);
132168 if (!drop.ok ()) {
@@ -139,116 +175,118 @@ int main(int argc, char* argv[]) {
139175namespace {
140176
141177using RandomKeyGenerator = std::function<std::int64_t ()>;
142- using SampleSink = std::function<void (std::vector<SingleRowInsertSample>)>;
143178using ErrorSink = std::function<void (std::vector<google::cloud::Status>)>;
144179
145- int RunTask (Config const & config, cloud_spanner::Client client,
146- RandomKeyGenerator const & key_generator,
147- ErrorSink const & error_sink) {
148- int count = 0 ;
149- std::string value (1024 , ' A' );
150- std::vector<google::cloud::Status> errors;
151- for (auto start = std::chrono::steady_clock::now (),
152- deadline = start + config.iteration_duration ;
153- start < deadline; start = std::chrono::steady_clock::now ()) {
154- auto key = key_generator ();
155- auto m = cloud_spanner::MakeInsertOrUpdateMutation (
156- " KeyValue" , {" Key" , " Data" }, key, value);
157- auto result = client.Commit ([&m](cloud_spanner::Transaction const &) {
158- return cloud_spanner::Mutations{m};
159- });
160- if (!result) {
161- errors.push_back (std::move (result).status ());
180+ class InsertOrUpdateExperiment : public Experiment {
181+ public:
182+ void SetUp (Config const &, cloud_spanner::Database const &) override {}
183+
184+ void Run (Config const & config, cloud_spanner::Database const & database,
185+ SampleSink const & sink) override {
186+ // Create enough clients for the worst case
187+ std::vector<cloud_spanner::Client> clients;
188+ std::cout << " # Creating clients " << std::flush;
189+ for (int i = 0 ; i != config.maximum_clients ; ++i) {
190+ clients.emplace_back (cloud_spanner::Client (cloud_spanner::MakeConnection (
191+ database, cloud_spanner::ConnectionOptions ().set_channel_pool_domain (
192+ " task:" + std::to_string (i)))));
193+ std::cout << ' .' << std::flush;
194+ }
195+ std::cout << " DONE\n " ;
196+
197+ auto generator = google::cloud::internal::MakeDefaultPRNG ();
198+ std::uniform_int_distribution<int > thread_count_gen (config.minimum_threads ,
199+ config.maximum_threads );
200+
201+ for (int i = 0 ; i != config.samples ; ++i) {
202+ auto const thread_count = thread_count_gen (generator);
203+ // TODO(#1000) - avoid deadlocks with more than 100 threads per client
204+ auto min_clients =
205+ (std::max)(thread_count / 100 + 1 , config.minimum_clients );
206+ auto const client_count = std::uniform_int_distribution<std::size_t >(
207+ min_clients, clients.size () - 1 )(generator);
208+ std::vector<cloud_spanner::Client> iteration_clients (
209+ clients.begin (), clients.begin () + client_count);
210+ RunIteration (config, iteration_clients, thread_count, sink, generator);
162211 }
163- ++count;
164212 }
165- error_sink (std::move (errors));
166- return count;
167- }
168-
169- void RunIteration (Config const & config,
170- std::vector<cloud_spanner::Client> const & clients,
171- int thread_count, SampleSink const & sink,
172- google::cloud::internal::DefaultPRNG generator) {
173- std::mutex mu;
174- std::uniform_int_distribution<std::int64_t > random_key (0 , config.table_size );
175- RandomKeyGenerator locked_random_key = [&mu, &generator, &random_key] {
176- std::lock_guard<std::mutex> lk (mu);
177- return random_key (generator);
178- };
179213
180- std::mutex cerr_mu;
181- ErrorSink error_sink =
182- [&cerr_mu](std::vector<google::cloud::Status> const & errors) {
183- std::lock_guard<std::mutex> lk (cerr_mu);
184- for (auto const & e : errors) {
185- std::cerr << " # " << e << " \n " ;
186- }
187- };
214+ void RunIteration (Config const & config,
215+ std::vector<cloud_spanner::Client> const & clients,
216+ int thread_count, SampleSink const & sink,
217+ google::cloud::internal::DefaultPRNG generator) {
218+ std::mutex mu;
219+ std::uniform_int_distribution<std::int64_t > random_key (0 ,
220+ config.table_size );
221+ RandomKeyGenerator locked_random_key = [&mu, &generator, &random_key] {
222+ std::lock_guard<std::mutex> lk (mu);
223+ return random_key (generator);
224+ };
225+
226+ std::mutex cerr_mu;
227+ ErrorSink error_sink =
228+ [&cerr_mu](std::vector<google::cloud::Status> const & errors) {
229+ std::lock_guard<std::mutex> lk (cerr_mu);
230+ for (auto const & e : errors) {
231+ std::cerr << " # " << e << " \n " ;
232+ }
233+ };
234+
235+ std::vector<std::future<int >> tasks (thread_count);
236+ auto start = std::chrono::steady_clock::now ();
237+ int task_id = 0 ;
238+ for (auto & t : tasks) {
239+ auto client = clients[task_id++ % clients.size ()];
240+ t = std::async (std::launch::async, &InsertOrUpdateExperiment::RunTask,
241+ this , config, client, locked_random_key, error_sink);
242+ }
243+ int insert_count = 0 ;
244+ for (auto & t : tasks) {
245+ insert_count += t.get ();
246+ }
247+ auto elapsed = std::chrono::steady_clock::now () - start;
188248
189- std::vector<std::future<int >> tasks (thread_count);
190- auto start = std::chrono::steady_clock::now ();
191- int task_id = 0 ;
192- for (auto & t : tasks) {
193- auto client = clients[task_id++ % clients.size ()];
194- t = std::async (std::launch::async, RunTask, config, client,
195- locked_random_key, error_sink);
196- }
197- int insert_count = 0 ;
198- for (auto & t : tasks) {
199- insert_count += t.get ();
249+ sink ({SingleRowThroughputSample{
250+ static_cast <int >(clients.size ()), thread_count, insert_count,
251+ std::chrono::duration_cast<std::chrono::microseconds>(elapsed)}});
200252 }
201- auto elapsed = std::chrono::steady_clock::now () - start;
202-
203- sink ({SingleRowInsertSample{
204- static_cast <int >(clients.size ()), thread_count, insert_count,
205- std::chrono::duration_cast<std::chrono::microseconds>(elapsed)}});
206- }
207253
208- void RunExperiment (Config const & config,
209- cloud_spanner::Database const & database) {
210- // Create enough clients for the worst case
211- std::vector<cloud_spanner::Client> clients;
212- std::cout << " # Creating clients " << std::flush;
213- for (int i = 0 ; i != config.maximum_clients ; ++i) {
214- clients.emplace_back (cloud_spanner::Client (cloud_spanner::MakeConnection (
215- database, cloud_spanner::ConnectionOptions ().set_channel_pool_domain (
216- " task:" + std::to_string (i)))));
217- std::cout << ' .' << std::flush;
254+ int RunTask (Config const & config, cloud_spanner::Client client,
255+ RandomKeyGenerator const & key_generator,
256+ ErrorSink const & error_sink) {
257+ int count = 0 ;
258+ std::string value (1024 , ' A' );
259+ std::vector<google::cloud::Status> errors;
260+ for (auto start = std::chrono::steady_clock::now (),
261+ deadline = start + config.iteration_duration ;
262+ start < deadline; start = std::chrono::steady_clock::now ()) {
263+ auto key = key_generator ();
264+ auto m = cloud_spanner::MakeInsertOrUpdateMutation (
265+ " KeyValue" , {" Key" , " Data" }, key, value);
266+ auto result = client.Commit ([&m](cloud_spanner::Transaction const &) {
267+ return cloud_spanner::Mutations{m};
268+ });
269+ if (!result) {
270+ errors.push_back (std::move (result).status ());
271+ }
272+ ++count;
273+ }
274+ error_sink (std::move (errors));
275+ return count;
218276 }
219- std::cout << " DONE\n " ;
220-
221- auto generator = google::cloud::internal::MakeDefaultPRNG ();
222- std::uniform_int_distribution<int > thread_count_gen (config.minimum_threads ,
223- config.maximum_threads );
224-
225- std::mutex cout_mu;
226- auto cout_sink =
227- [&cout_mu](std::vector<SingleRowInsertSample> const & samples) mutable {
228- std::unique_lock<std::mutex> lk (cout_mu);
229- for (auto const & s : samples) {
230- std::cout << std::boolalpha << s.client_count << ' ,' << s.thread_count
231- << ' ,' << s.insert_count << ' ,' << s.elapsed .count () << ' \n '
232- << std::flush;
233- }
234- };
277+ };
235278
236- for (int i = 0 ; i != config.samples ; ++i) {
237- auto const thread_count = thread_count_gen (generator);
238- // TODO(#1000) - avoid deadlocks with more than 100 threads per client
239- auto min_clients =
240- (std::max)(thread_count / 100 + 1 , config.minimum_clients );
241- auto const client_count = std::uniform_int_distribution<std::size_t >(
242- min_clients, clients.size () - 1 )(generator);
243- std::vector<cloud_spanner::Client> iteration_clients (
244- clients.begin (), clients.begin () + client_count);
245- RunIteration (config, iteration_clients, thread_count, cout_sink, generator);
246- }
279+ std::map<std::string, std::shared_ptr<Experiment>> AvailableExperiments () {
280+ return {
281+ {" insert-or-update" , std::make_shared<InsertOrUpdateExperiment>()},
282+ };
247283}
248284
249285google::cloud::StatusOr<Config> ParseArgs (std::vector<std::string> args) {
250286 Config config;
251287
288+ config.experiment = " insert-or-update" ;
289+
252290 config.project_id =
253291 google::cloud::internal::GetEnv (" GOOGLE_CLOUD_PROJECT" ).value_or (" " );
254292 config.instance_id =
@@ -262,6 +300,8 @@ google::cloud::StatusOr<Config> ParseArgs(std::vector<std::string> args) {
262300
263301 // NOLINTNEXTLINE(modernize-avoid-c-arrays)
264302 Flag flags[] = {
303+ {" --experiment=" ,
304+ [](Config& c, std::string v) { c.experiment = std::move (v); }},
265305 {" --project=" ,
266306 [](Config& c, std::string v) { c.project_id = std::move (v); }},
267307 {" --instance=" ,
@@ -312,6 +352,10 @@ google::cloud::StatusOr<Config> ParseArgs(std::vector<std::string> args) {
312352 }
313353 }
314354
355+ if (config.experiment .empty ()) {
356+ return invalid_argument (" Missing value for --experiment flag" );
357+ }
358+
315359 if (config.project_id .empty ()) {
316360 return invalid_argument (
317361 " The project id is not set, provide a value in the --project flag,"
0 commit comments