2424#include " google/cloud/internal/random.h"
2525#include < algorithm>
2626#include < future>
27+ #include < numeric>
2728#include < random>
2829#include < sstream>
2930#include < thread>
@@ -1024,6 +1025,255 @@ class UpdateExperiment : public Experiment {
10241025 std::string table_name_;
10251026};
10261027
1028+ /* *
1029+ * Run an experiment to measure the CPU overhead of the client over raw gRPC.
1030+ *
1031+ * These experiments create an empty table. The table schema has an integer key
1032+ * and 10 columns of the types defined by `Traits`. Then the experiment performs
1033+ * M iterations of:
1034+ * - Randomly select if it will insert using the client library or raw gRPC.
1035+ * - Then for N seconds insert random rows
1036+ * - Measure the CPU time required by the previous step
1037+ *
1038+ * The values of K, M, N are configurable.
1039+ */
1040+ template <typename Traits>
1041+ class MutationExperiment : public Experiment {
1042+ public:
1043+ explicit MutationExperiment (
1044+ google::cloud::internal::DefaultPRNG const & generator)
1045+ : impl_(generator),
1046+ table_name_(" MutationExperiment_" + Traits::TableSuffix()) {}
1047+
1048+ Status SetUp (Config const & config, cs::Database const & database) override {
1049+ return impl_.CreateTable (config, database, table_name_);
1050+ }
1051+
1052+ Status TearDown (Config const &, cs::Database const &) override { return {}; }
1053+
1054+ Status Run (Config const & config, cs::Database const & database) override {
1055+ // Create enough clients and stubs for the worst case
1056+ std::vector<cs::Client> clients;
1057+ std::vector<std::shared_ptr<cs::internal::SpannerStub>> stubs;
1058+ std::cout << " # Creating clients and stubs " << std::flush;
1059+ for (int i = 0 ; i != config.maximum_clients ; ++i) {
1060+ auto options = cs::ConnectionOptions ().set_channel_pool_domain (
1061+ " task:" + std::to_string (i));
1062+ clients.emplace_back (cs::Client (cs::MakeConnection (database, options)));
1063+ stubs.emplace_back (
1064+ cs::internal::CreateDefaultSpannerStub (options, /* channel_id=*/ 0 ));
1065+ std::cout << ' .' << std::flush;
1066+ }
1067+ std::cout << " DONE\n " ;
1068+
1069+ random_keys_.resize (config.table_size );
1070+ std::iota (random_keys_.begin (), random_keys_.end (), 0 );
1071+ auto generator = impl_.Generator ();
1072+ std::shuffle (random_keys_.begin (), random_keys_.end (), generator);
1073+
1074+ // Capture some overall getrusage() statistics as comments.
1075+ SimpleTimer overall;
1076+ overall.Start ();
1077+ for (int i = 0 ; i != config.samples ; ++i) {
1078+ auto const use_stubs = impl_.UseStub (config);
1079+ auto const thread_count = impl_.ThreadCount (config);
1080+ auto const client_count = impl_.ClientCount (config, thread_count);
1081+ if (use_stubs) {
1082+ std::vector<std::shared_ptr<cs::internal::SpannerStub>> iteration_stubs (
1083+ stubs.begin (), stubs.begin () + client_count);
1084+ RunIterationViaStubs (config, iteration_stubs, thread_count);
1085+ continue ;
1086+ }
1087+ std::vector<cs::Client> iteration_clients (clients.begin (),
1088+ clients.begin () + client_count);
1089+ RunIterationViaClients (config, iteration_clients, thread_count);
1090+ }
1091+ overall.Stop ();
1092+ std::cout << overall.annotations ();
1093+ return {};
1094+ }
1095+
1096+ private:
1097+ void RunIterationViaStubs (
1098+ Config const & config,
1099+ std::vector<std::shared_ptr<cs::internal::SpannerStub>> const & stubs,
1100+ int thread_count) {
1101+ std::vector<std::future<std::vector<RowCpuSample>>> tasks (thread_count);
1102+ int task_id = 0 ;
1103+ for (auto & t : tasks) {
1104+ auto client = stubs[task_id++ % stubs.size ()];
1105+ t = std::async (std::launch::async, &MutationExperiment::InsertRowsViaStub,
1106+ this , config, thread_count, static_cast <int >(stubs.size ()),
1107+ cs::Database (config.project_id , config.instance_id ,
1108+ config.database_id ),
1109+ client);
1110+ }
1111+ for (auto & t : tasks) {
1112+ impl_.DumpSamples (t.get ());
1113+ }
1114+ }
1115+
1116+ std::vector<RowCpuSample> InsertRowsViaStub (
1117+ Config const & config, int thread_count, int client_count,
1118+ cs::Database const & database,
1119+ std::shared_ptr<cs::internal::SpannerStub> const & stub) {
1120+ std::vector<std::string> const column_names{
1121+ " Key" , " Data0" , " Data1" , " Data2" , " Data3" , " Data4" ,
1122+ " Data5" , " Data6" , " Data7" , " Data8" , " Data9" };
1123+
1124+ auto session = [&]() -> google::cloud::StatusOr<std::string> {
1125+ Status last_status;
1126+ for (int i = 0 ; i != 10 ; ++i) {
1127+ grpc::ClientContext context;
1128+ google::spanner::v1::CreateSessionRequest request{};
1129+ request.set_database (database.FullName ());
1130+ auto response = stub->CreateSession (context, request);
1131+ if (response) return response->name ();
1132+ last_status = response.status ();
1133+ }
1134+ return last_status;
1135+ }();
1136+
1137+ if (!session) {
1138+ std::ostringstream os;
1139+ os << " SESSION ERROR = " << session.status ();
1140+ impl_.LogError (os.str ());
1141+ return {};
1142+ }
1143+
1144+ std::vector<RowCpuSample> samples;
1145+ // We expect about 50 reads per second per thread. Use that to estimate
1146+ // the size of the vector.
1147+ samples.reserve (config.iteration_duration .count () * 50 );
1148+ for (auto start = std::chrono::steady_clock::now (),
1149+ deadline = start + config.iteration_duration ;
1150+ start < deadline; start = std::chrono::steady_clock::now ()) {
1151+ std::int64_t key = 0 ;
1152+ {
1153+ std::lock_guard<std::mutex> lk (mu_);
1154+ if (random_keys_.empty ()) {
1155+ return samples;
1156+ }
1157+ key = random_keys_.back ();
1158+ random_keys_.pop_back ();
1159+ }
1160+
1161+ using T = typename Traits::native_type;
1162+ std::vector<T> const values{
1163+ impl_.GenerateRandomValue (), impl_.GenerateRandomValue (),
1164+ impl_.GenerateRandomValue (), impl_.GenerateRandomValue (),
1165+ impl_.GenerateRandomValue (), impl_.GenerateRandomValue (),
1166+ impl_.GenerateRandomValue (), impl_.GenerateRandomValue (),
1167+ impl_.GenerateRandomValue (), impl_.GenerateRandomValue (),
1168+ };
1169+
1170+ SimpleTimer timer;
1171+ timer.Start ();
1172+
1173+ grpc::ClientContext context;
1174+ google::spanner::v1::CommitRequest commit_request;
1175+ commit_request.set_session (*session);
1176+ commit_request.mutable_single_use_transaction ()
1177+ ->mutable_read_write ()
1178+ ->Clear ();
1179+ auto & mutation =
1180+ *commit_request.add_mutations ()->mutable_insert_or_update ();
1181+ mutation.set_table (table_name_);
1182+ for (auto const & c : column_names) {
1183+ mutation.add_columns (c);
1184+ }
1185+ auto & row = *mutation.add_values ();
1186+ row.add_values ()->set_string_value (std::to_string (key));
1187+ for (auto v : values) {
1188+ *row.add_values () =
1189+ cs::internal::ToProto (cs::Value (std::move (v))).second ;
1190+ }
1191+ auto response = stub->Commit (context, commit_request);
1192+
1193+ timer.Stop ();
1194+ samples.push_back (RowCpuSample{
1195+ thread_count, client_count, true , commit_request.mutations_size (),
1196+ timer.elapsed_time (), timer.cpu_time (), response.status ()});
1197+ }
1198+ return samples;
1199+ }
1200+
1201+ void RunIterationViaClients (Config const & config,
1202+ std::vector<cs::Client> const & clients,
1203+ int thread_count) {
1204+ std::vector<std::future<std::vector<RowCpuSample>>> tasks (thread_count);
1205+ int task_id = 0 ;
1206+ for (auto & t : tasks) {
1207+ auto client = clients[task_id++ % clients.size ()];
1208+ t = std::async (std::launch::async,
1209+ &MutationExperiment::InsertRowsViaClient, this , config,
1210+ thread_count, static_cast <int >(clients.size ()), client);
1211+ }
1212+ for (auto & t : tasks) {
1213+ impl_.DumpSamples (t.get ());
1214+ }
1215+ }
1216+
1217+ std::vector<RowCpuSample> InsertRowsViaClient (Config const & config,
1218+ int thread_count,
1219+ int client_count,
1220+ cs::Client client) {
1221+ std::vector<std::string> const column_names{
1222+ " Key" , " Data0" , " Data1" , " Data2" , " Data3" , " Data4" ,
1223+ " Data5" , " Data6" , " Data7" , " Data8" , " Data9" };
1224+
1225+ std::vector<RowCpuSample> samples;
1226+ // We expect about 50 reads per second per thread, so allocate enough
1227+ // memory to start.
1228+ samples.reserve (config.iteration_duration .count () * 50 );
1229+ for (auto start = std::chrono::steady_clock::now (),
1230+ deadline = start + config.iteration_duration ;
1231+ start < deadline; start = std::chrono::steady_clock::now ()) {
1232+ std::int64_t key = 0 ;
1233+ {
1234+ std::lock_guard<std::mutex> lk (mu_);
1235+ if (random_keys_.empty ()) {
1236+ return samples;
1237+ }
1238+ key = random_keys_.back ();
1239+ random_keys_.pop_back ();
1240+ }
1241+
1242+ using T = typename Traits::native_type;
1243+ std::vector<T> const values{
1244+ impl_.GenerateRandomValue (), impl_.GenerateRandomValue (),
1245+ impl_.GenerateRandomValue (), impl_.GenerateRandomValue (),
1246+ impl_.GenerateRandomValue (), impl_.GenerateRandomValue (),
1247+ impl_.GenerateRandomValue (), impl_.GenerateRandomValue (),
1248+ impl_.GenerateRandomValue (), impl_.GenerateRandomValue (),
1249+ };
1250+
1251+ SimpleTimer timer;
1252+ timer.Start ();
1253+
1254+ int row_count = 0 ;
1255+ auto commit_result =
1256+ client.Commit ([&](cs::Transaction const &)
1257+ -> google::cloud::StatusOr<cs::Mutations> {
1258+ return cs::Mutations{cs::MakeInsertOrUpdateMutation (
1259+ table_name_, column_names, key, values[0 ], values[1 ], values[2 ],
1260+ values[3 ], values[4 ], values[5 ], values[6 ], values[7 ],
1261+ values[8 ], values[9 ])};
1262+ });
1263+ timer.Stop ();
1264+ samples.push_back (RowCpuSample{
1265+ thread_count, client_count, false , row_count, timer.elapsed_time (),
1266+ timer.cpu_time (), std::move (commit_result).status ()});
1267+ }
1268+ return samples;
1269+ }
1270+
1271+ ExperimentImpl<Traits> impl_;
1272+ std::string table_name_;
1273+ std::mutex mu_;
1274+ std::vector<int64_t > random_keys_;
1275+ };
1276+
10271277class RunAllExperiment : public Experiment {
10281278 public:
10291279 explicit RunAllExperiment (google::cloud::internal::DefaultPRNG generator)
@@ -1110,6 +1360,14 @@ ExperimentFactory MakeUpdateFactory() {
11101360 };
11111361}
11121362
1363+ template <typename Trait>
1364+ ExperimentFactory MakeMutationFactory () {
1365+ using G = google::cloud::internal::DefaultPRNG;
1366+ return [](G g) {
1367+ return google::cloud::internal::make_unique<MutationExperiment<Trait>>(g);
1368+ };
1369+ }
1370+
11131371std::map<std::string, ExperimentFactory> AvailableExperiments () {
11141372 auto make_run_all = [](google::cloud::internal::DefaultPRNG g) {
11151373 return google::cloud::internal::make_unique<RunAllExperiment>(g);
@@ -1131,6 +1389,13 @@ std::map<std::string, ExperimentFactory> AvailableExperiments() {
11311389 {" update-int64" , MakeUpdateFactory<Int64Traits>()},
11321390 {" update-string" , MakeUpdateFactory<StringTraits>()},
11331391 {" update-timestamp" , MakeUpdateFactory<TimestampTraits>()},
1392+ {" mutation-bool" , MakeMutationFactory<BoolTraits>()},
1393+ {" mutation-bytes" , MakeMutationFactory<BytesTraits>()},
1394+ {" mutation-date" , MakeMutationFactory<DateTraits>()},
1395+ {" mutation-float64" , MakeMutationFactory<Float64Traits>()},
1396+ {" mutation-int64" , MakeMutationFactory<Int64Traits>()},
1397+ {" mutation-string" , MakeMutationFactory<StringTraits>()},
1398+ {" mutation-timestamp" , MakeMutationFactory<TimestampTraits>()},
11341399 };
11351400}
11361401
0 commit comments