@@ -342,11 +342,13 @@ class ExperimentImpl {
342342 explicit ExperimentImpl (google::cloud::internal::DefaultPRNG const & generator)
343343 : generator_(generator) {}
344344
345+ static int constexpr kColumnCount = 10 ;
346+
345347 Status CreateTable (Config const &, cs::Database const & database,
346348 std::string const & table_name) {
347349 std::string statement = " CREATE TABLE " + table_name;
348350 statement += " (Key INT64 NOT NULL,\n " ;
349- for (int i = 0 ; i != 10 ; ++i) {
351+ for (int i = 0 ; i != kColumnCount ; ++i) {
350352 statement +=
351353 " Data" + std::to_string (i) + " " + Traits::SpannerDataType () + " ,\n " ;
352354 }
@@ -405,10 +407,14 @@ class ExperimentImpl {
405407 0 , config.table_size - 1 )(generator_);
406408 }
407409
408- cs::KeySet RandomKeySet (Config const & config) {
410+ std:: int64_t RandomKeySetBegin (Config const & config) {
409411 std::lock_guard<std::mutex> lk (mu_);
410- auto begin = std::uniform_int_distribution<std::int64_t >(
412+ return std::uniform_int_distribution<std::int64_t >(
411413 0 , config.table_size - config.query_size )(generator_);
414+ }
415+
416+ cs::KeySet RandomKeySet (Config const & config) {
417+ auto begin = RandomKeySetBegin (config);
412418 auto end = begin + config.query_size - 1 ;
413419 return cs::KeySet ().AddRange (cs::MakeKeyBoundClosed (cs::Value (begin)),
414420 cs::MakeKeyBoundClosed (cs::Value (end)));
@@ -758,6 +764,245 @@ class ReadExperiment : public Experiment {
758764 std::string table_name_;
759765};
760766
767+ /* *
768+ * Run an experiment to measure the CPU overhead of the client over raw gRPC.
769+ *
770+ * This experiments creates and populates a table with `config.table_size` rows,
771+ * each row containing an integer key and 10 columns of the types defined by
772+ * `Traits`. Then the experiment performs `config.samples` iterations of:
773+ * - Randomly pick if it will do the work using the client library or raw
774+ * gRPC
775+ * - Then for `config.iteration_duration` seconds SELECT random ranges of
776+ * `config.query_size` rows
777+ * - Measure the CPU time required by the previous step
778+ */
779+ template <typename Traits>
780+ class SelectExperiment : public Experiment {
781+ public:
782+ explicit SelectExperiment (google::cloud::internal::DefaultPRNG generator)
783+ : impl_(generator),
784+ table_name_(" SelectExperiment_" + Traits::TableSuffix()) {}
785+
786+ Status SetUp (Config const & config, cs::Database const & database) override {
787+ return impl_.FillTable (config, database, table_name_);
788+ }
789+
790+ Status TearDown (Config const &, cs::Database const &) override { return {}; }
791+
792+ Status Run (Config const & config, cs::Database const & database) override {
793+ // Create enough clients and stubs for the worst case
794+ std::vector<cs::Client> clients;
795+ std::vector<std::shared_ptr<cs::internal::SpannerStub>> stubs;
796+ std::cout << " # Creating clients and stubs " << std::flush;
797+ for (int i = 0 ; i != config.maximum_clients ; ++i) {
798+ auto options = cs::ConnectionOptions ().set_channel_pool_domain (
799+ " task:" + std::to_string (i));
800+ clients.emplace_back (cs::Client (cs::MakeConnection (database, options)));
801+ stubs.emplace_back (
802+ cs::internal::CreateDefaultSpannerStub (options, /* channel_id=*/ 0 ));
803+ std::cout << ' .' << std::flush;
804+ }
805+ std::cout << " DONE\n " ;
806+
807+ // Capture some overall getrusage() statistics as comments.
808+ SimpleTimer overall;
809+ overall.Start ();
810+ for (int i = 0 ; i != config.samples ; ++i) {
811+ auto const use_stubs = impl_.UseStub (config);
812+ auto const thread_count = impl_.ThreadCount (config);
813+ auto const client_count = impl_.ClientCount (config, thread_count);
814+ if (use_stubs) {
815+ std::vector<std::shared_ptr<cs::internal::SpannerStub>> iteration_stubs (
816+ stubs.begin (), stubs.begin () + client_count);
817+ RunIterationViaStubs (config, iteration_stubs, thread_count);
818+ continue ;
819+ }
820+ std::vector<cs::Client> iteration_clients (clients.begin (),
821+ clients.begin () + client_count);
822+ RunIterationViaClients (config, iteration_clients, thread_count);
823+ }
824+ overall.Stop ();
825+ std::cout << overall.annotations ();
826+ return {};
827+ }
828+
829+ private:
830+ void RunIterationViaStubs (
831+ Config const & config,
832+ std::vector<std::shared_ptr<cs::internal::SpannerStub>> const & stubs,
833+ int thread_count) {
834+ std::vector<std::future<std::vector<RowCpuSample>>> tasks (thread_count);
835+ int task_id = 0 ;
836+ for (auto & t : tasks) {
837+ auto client = stubs[task_id++ % stubs.size ()];
838+ t = std::async (std::launch::async, &SelectExperiment::ViaStub, this ,
839+ config, thread_count, static_cast <int >(stubs.size ()),
840+ cs::Database (config.project_id , config.instance_id ,
841+ config.database_id ),
842+ client);
843+ }
844+ for (auto & t : tasks) {
845+ impl_.DumpSamples (t.get ());
846+ }
847+ }
848+
849+ std::vector<RowCpuSample> ViaStub (
850+ Config const & config, int thread_count, int client_count,
851+ cs::Database const & database,
852+ std::shared_ptr<cs::internal::SpannerStub> const & stub) {
853+ auto session = [&]() -> google::cloud::StatusOr<std::string> {
854+ Status last_status;
855+ for (int i = 0 ; i != ExperimentImpl<Traits>::kColumnCount ; ++i) {
856+ grpc::ClientContext context;
857+ google::spanner::v1::CreateSessionRequest request{};
858+ request.set_database (database.FullName ());
859+ auto response = stub->CreateSession (context, request);
860+ if (response) return response->name ();
861+ last_status = response.status ();
862+ }
863+ return last_status;
864+ }();
865+
866+ if (!session) {
867+ std::ostringstream os;
868+ os << " SESSION ERROR = " << session.status ();
869+ impl_.LogError (os.str ());
870+ return {};
871+ }
872+
873+ std::vector<RowCpuSample> samples;
874+ // We expect about 50 reads per second per thread. Use that to estimate
875+ // the size of the vector.
876+ samples.reserve (config.iteration_duration .count () * 50 );
877+ auto const statement = CreateStatement ();
878+ for (auto start = std::chrono::steady_clock::now (),
879+ deadline = start + config.iteration_duration ;
880+ start < deadline; start = std::chrono::steady_clock::now ()) {
881+ auto key = impl_.RandomKeySetBegin (config);
882+
883+ SimpleTimer timer;
884+ timer.Start ();
885+
886+ google::spanner::v1::ExecuteSqlRequest request{};
887+ request.set_session (*session);
888+ request.mutable_transaction ()
889+ ->mutable_single_use ()
890+ ->mutable_read_only ()
891+ ->Clear ();
892+ request.set_sql (statement);
893+ auto begin_type_value = cs::internal::ToProto (cs::Value (key));
894+ (*request.mutable_param_types ())[" begin" ] =
895+ std::move (begin_type_value.first );
896+ (*request.mutable_params ()->mutable_fields ())[" begin" ] =
897+ std::move (begin_type_value.second );
898+ auto end_type_value =
899+ cs::internal::ToProto (cs::Value (key + config.query_size ));
900+ (*request.mutable_param_types ())[" end" ] = std::move (end_type_value.first );
901+ (*request.mutable_params ()->mutable_fields ())[" end" ] =
902+ std::move (end_type_value.second );
903+
904+ int row_count = 0 ;
905+ google::spanner::v1::PartialResultSet result;
906+ std::vector<google::protobuf::Value> row;
907+ grpc::ClientContext context;
908+ auto stream = stub->ExecuteStreamingSql (context, request);
909+ for (bool success = stream->Read (&result); success;
910+ success = stream->Read (&result)) {
911+ if (result.chunked_value ()) {
912+ // We do not handle chunked values in the benchmark.
913+ continue ;
914+ }
915+ row.resize (ExperimentImpl<Traits>::kColumnCount );
916+ std::size_t index = 0 ;
917+ for (auto & value : *result.mutable_values ()) {
918+ row[index] = std::move (value);
919+ if (++index == ExperimentImpl<Traits>::kColumnCount ) {
920+ ++row_count;
921+ index = 0 ;
922+ }
923+ }
924+ }
925+ auto final = stream->Finish ();
926+ timer.Stop ();
927+ samples.push_back (RowCpuSample{
928+ thread_count, client_count, true , row_count, timer.elapsed_time (),
929+ timer.cpu_time (),
930+ google::cloud::grpc_utils::MakeStatusFromRpcError (final )});
931+ }
932+ return samples;
933+ }
934+
935+ void RunIterationViaClients (Config const & config,
936+ std::vector<cs::Client> const & clients,
937+ int thread_count) {
938+ std::vector<std::future<std::vector<RowCpuSample>>> tasks (thread_count);
939+ int task_id = 0 ;
940+ for (auto & t : tasks) {
941+ auto client = clients[task_id++ % clients.size ()];
942+ t = std::async (std::launch::async, &SelectExperiment::ViaClients, this ,
943+ config, thread_count, static_cast <int >(clients.size ()),
944+ client);
945+ }
946+ for (auto & t : tasks) {
947+ impl_.DumpSamples (t.get ());
948+ }
949+ }
950+
951+ std::vector<RowCpuSample> ViaClients (Config const & config, int thread_count,
952+ int client_count, cs::Client client) {
953+ auto const statement = CreateStatement ();
954+
955+ using T = typename Traits::native_type;
956+ using RowType = std::tuple<T, T, T, T, T, T, T, T, T, T>;
957+ std::vector<RowCpuSample> samples;
958+ // We expect about 50 reads per second per thread, so allocate enough
959+ // memory to start.
960+ samples.reserve (config.iteration_duration .count () * 50 );
961+ for (auto start = std::chrono::steady_clock::now (),
962+ deadline = start + config.iteration_duration ;
963+ start < deadline; start = std::chrono::steady_clock::now ()) {
964+ auto key = impl_.RandomKeySetBegin (config);
965+
966+ SimpleTimer timer;
967+ timer.Start ();
968+ auto rows = client.ExecuteQuery (cs::SqlStatement (
969+ statement, {{" begin" , cs::Value (key)},
970+ {" end" , cs::Value (key + config.query_size )}}));
971+ int row_count = 0 ;
972+ Status status;
973+ for (auto & row : cs::StreamOf<RowType>(rows)) {
974+ if (!row) {
975+ status = std::move (row).status ();
976+ break ;
977+ }
978+ ++row_count;
979+ }
980+ timer.Stop ();
981+ samples.push_back (RowCpuSample{thread_count, client_count, false ,
982+ row_count, timer.elapsed_time (),
983+ timer.cpu_time (), std::move (status)});
984+ }
985+ return samples;
986+ }
987+
988+ std::string CreateStatement () const {
989+ std::string sql = " SELECT" ;
990+ char const * sep = " " ;
991+ for (int i = 0 ; i != ExperimentImpl<Traits>::kColumnCount ; ++i) {
992+ sql += sep;
993+ sql += " Data" + std::to_string (i);
994+ sep = " , " ;
995+ }
996+ sql += " FROM " ;
997+ sql += table_name_;
998+ sql += " WHERE Key >= @begin AND Key < @end" ;
999+ return sql;
1000+ }
1001+
1002+ ExperimentImpl<Traits> impl_;
1003+ std::string table_name_;
1004+ };
1005+
7611006/* *
7621007 * Run an experiment to measure the CPU overhead of the client over raw gRPC.
7631008 *
@@ -1352,6 +1597,14 @@ ExperimentFactory MakeReadFactory() {
13521597 };
13531598}
13541599
1600+ template <typename Trait>
1601+ ExperimentFactory MakeSelectFactory () {
1602+ using G = google::cloud::internal::DefaultPRNG;
1603+ return [](G g) {
1604+ return google::cloud::internal::make_unique<SelectExperiment<Trait>>(g);
1605+ };
1606+ }
1607+
13551608template <typename Trait>
13561609ExperimentFactory MakeUpdateFactory () {
13571610 using G = google::cloud::internal::DefaultPRNG;
@@ -1382,6 +1635,13 @@ std::map<std::string, ExperimentFactory> AvailableExperiments() {
13821635 {" read-int64" , MakeReadFactory<Int64Traits>()},
13831636 {" read-string" , MakeReadFactory<StringTraits>()},
13841637 {" read-timestamp" , MakeReadFactory<TimestampTraits>()},
1638+ {" select-bool" , MakeSelectFactory<BoolTraits>()},
1639+ {" select-bytes" , MakeSelectFactory<BytesTraits>()},
1640+ {" select-date" , MakeSelectFactory<DateTraits>()},
1641+ {" select-float64" , MakeSelectFactory<Float64Traits>()},
1642+ {" select-int64" , MakeSelectFactory<Int64Traits>()},
1643+ {" select-string" , MakeSelectFactory<StringTraits>()},
1644+ {" select-timestamp" , MakeSelectFactory<TimestampTraits>()},
13851645 {" update-bool" , MakeUpdateFactory<BoolTraits>()},
13861646 {" update-bytes" , MakeUpdateFactory<BytesTraits>()},
13871647 {" update-date" , MakeUpdateFactory<DateTraits>()},
0 commit comments