Skip to content

Commit 0829f24

Browse files
authored
impl(bigquery): Job benchmark program (#12719)
1 parent e7eafe2 commit 0829f24

File tree

7 files changed

+338
-104
lines changed

7 files changed

+338
-104
lines changed

google/cloud/bigquery/v2/minimal/benchmarks/CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ endforeach ()
6767
# Runs benchmark integration tests.
6868
set(experimental_bigquery_rest_client_benchmark_programs
6969
# cmake-format: sort
70-
dataset_benchmark_programs.cc job_benchmark_programs.cc
70+
dataset_benchmark_programs.cc job_readonly_benchmark_programs.cc
7171
project_benchmark_programs.cc table_benchmark_programs.cc)
7272

7373
# Export the list of benchmark integration tests to a .bzl file so we do not

google/cloud/bigquery/v2/minimal/benchmarks/dataset_benchmark_programs.cc

Lines changed: 32 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -120,28 +120,24 @@ google::cloud::StatusOr<DatasetBenchmarkResult> RunDatasetBenchmark(
120120
if (now >= mark) {
121121
mark = now + test_duration / kBenchmarkProgressMarks;
122122
std::cout << "Start Time=" << absl::FormatTime(start, local_time_zone)
123-
<< std::endl
124-
<< "Current Progress Mark="
125-
<< absl::FormatTime(now, local_time_zone) << std::endl
126-
<< "Next Progress Mark="
127-
<< absl::FormatTime(mark, local_time_zone) << std::endl
128-
<< "End Time=" << absl::FormatTime(end, local_time_zone)
129-
<< std::endl
130-
<< "Number of GetDataset operations performed thus far= "
123+
<< "\nCurrent Progress Mark="
124+
<< absl::FormatTime(now, local_time_zone)
125+
<< "\nNext Progress Mark="
126+
<< absl::FormatTime(mark, local_time_zone)
127+
<< "\nEnd Time=" << absl::FormatTime(end, local_time_zone)
128+
<< "\nNumber of GetDataset operations performed thus far= "
131129
<< result.get_results.operations.size()
132-
<< ", Number of ListDatasets operations performed thus far= "
133-
<< result.list_results.operations.size() << std::endl;
134-
std::cout << "..." << std::endl;
130+
<< "\nNumber of ListDatasets operations performed thus far= "
131+
<< result.list_results.operations.size() << "\n...\n"
132+
<< std::flush;
135133
} else if (now > end) {
136-
std::cout << "Start Time=" << absl::FormatTime(start, local_time_zone)
137-
<< std::endl
138-
<< "End Time=" << absl::FormatTime(end, local_time_zone)
139-
<< std::endl
140-
<< "Total Number of GetDataset operations= "
134+
std::cout << "\nStart Time=" << absl::FormatTime(start, local_time_zone)
135+
<< "\nEnd Time=" << absl::FormatTime(end, local_time_zone)
136+
<< "\nTotal Number of GetDataset operations= "
141137
<< result.get_results.operations.size()
142-
<< ", Total Number of ListDatasets operations= "
143-
<< result.list_results.operations.size() << std::endl;
144-
std::cout << "..." << std::endl;
138+
<< "\nTotal Number of ListDatasets operations= "
139+
<< result.list_results.operations.size() << "\n...\n"
140+
<< std::flush;
145141
}
146142
}
147143
return result;
@@ -155,25 +151,31 @@ int main(int argc, char* argv[]) {
155151
auto c = config.ParseArgs(args);
156152
if (!c) {
157153
std::cerr << "Error parsing command-line arguments: " << c.status()
158-
<< std::endl;
154+
<< "\n"
155+
<< std::flush;
159156
return 1;
160157
}
161158
config = *std::move(c);
162159
}
163160

164161
if (config.ExitAfterParse()) {
165162
if (config.wants_description) {
166-
std::cout << kDescription << std::endl;
163+
std::cout << kDescription << "\n" << std::flush;
167164
}
168165
if (config.wants_help) {
169166
config.PrintUsage();
170167
}
171-
std::cout << "Exiting..." << std::endl;
168+
std::cout << "Exiting..."
169+
<< "\n"
170+
<< std::flush;
171+
;
172172
return 0;
173173
}
174174
std::cout << "# Dataset Benchmark STARTED For GetDataset() and "
175175
"ListDatasets() apis with test duration as ["
176-
<< config.test_duration.count() << "] seconds" << std::endl;
176+
<< config.test_duration.count() << "] seconds"
177+
<< "\n"
178+
<< std::flush;
177179

178180
DatasetBenchmark benchmark(config);
179181
// Start the threads running the dataset benchmark test.
@@ -207,7 +209,8 @@ int main(int argc, char* argv[]) {
207209
auto result = future.get();
208210
if (!result) {
209211
std::cerr << "Standard exception raised by task[" << count
210-
<< "]: " << result.status() << std::endl;
212+
<< "]: " << result.status() << "\n"
213+
<< std::flush;
211214
} else {
212215
append(combined, *result);
213216
}
@@ -218,7 +221,8 @@ int main(int argc, char* argv[]) {
218221
combined.get_results.elapsed = latency_test_elapsed;
219222
combined.list_results.elapsed = latency_test_elapsed;
220223
std::cout << " DONE. Elapsed Test Duration="
221-
<< FormatDuration(latency_test_elapsed) << std::endl;
224+
<< FormatDuration(latency_test_elapsed) << "\n"
225+
<< std::flush;
222226

223227
Benchmark::PrintLatencyResult(std::cout, "Latency-Results", "GetDataset()",
224228
combined.get_results);
@@ -229,7 +233,9 @@ int main(int argc, char* argv[]) {
229233
"GetDataset()", combined.get_results);
230234
Benchmark::PrintThroughputResult(std::cout, "Throughput-Results",
231235
"ListDatasets()", combined.list_results);
232-
std::cout << "# Dataset Benchmark ENDED" << std::endl;
236+
std::cout << "# Dataset Benchmark ENDED"
237+
<< "\n"
238+
<< std::flush;
233239

234240
return 0;
235241
}

google/cloud/bigquery/v2/minimal/benchmarks/experimental_bigquery_rest_client_benchmark_programs.bzl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818

1919
experimental_bigquery_rest_client_benchmark_programs = [
2020
"dataset_benchmark_programs.cc",
21-
"job_benchmark_programs.cc",
21+
"job_readonly_benchmark_programs.cc",
2222
"project_benchmark_programs.cc",
2323
"table_benchmark_programs.cc",
2424
]

google/cloud/bigquery/v2/minimal/benchmarks/job_benchmark_programs.cc

Lines changed: 0 additions & 25 deletions
This file was deleted.
Lines changed: 242 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,242 @@
1+
// Copyright 2023 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#include "google/cloud/bigquery/v2/minimal/benchmarks/benchmark.h"
16+
#include "google/cloud/bigquery/v2/minimal/benchmarks/benchmarks_config.h"
17+
#include "google/cloud/internal/make_status.h"
18+
#include "absl/time/clock.h"
19+
#include "absl/time/time.h"
20+
#include <cctype>
21+
#include <future>
22+
#include <iomanip>
23+
#include <sstream>
24+
25+
using ::google::cloud::StatusCode;
26+
using ::google::cloud::bigquery_v2_minimal_benchmarks::Benchmark;
27+
using ::google::cloud::bigquery_v2_minimal_benchmarks::BenchmarkResult;
28+
using ::google::cloud::bigquery_v2_minimal_benchmarks::FormatDuration;
29+
using ::google::cloud::bigquery_v2_minimal_benchmarks::JobBenchmark;
30+
using ::google::cloud::bigquery_v2_minimal_benchmarks::JobConfig;
31+
using ::google::cloud::bigquery_v2_minimal_benchmarks::OperationResult;
32+
33+
// Human-readable description of this benchmark program. `main()` prints it to
// stdout when `config.wants_description` is set.
char const kDescription[] =
34+
R"""(Measures the latency of Bigquery's `GetJob()` and
35+
`ListJobs()` apis.
36+
37+
This benchmark measures the latency of Bigquery's `GetJob()` and
38+
`ListJobs()` apis. The benchmark:
39+
- Starts T threads as supplied in the command-line, executing the
40+
following loop:
41+
- Runs for the test duration as supplied in the command-line, constantly
42+
executing this basic block:
43+
- Randomly, with 50% probability, makes a rest call to `GetJob()`
44+
and `ListJobs()` apis alternatively.
45+
- If either call fail, the test returns with the failure message.
46+
- Reports progress based on the total executing time and where the
47+
test is currently.
48+
49+
The test then waits for all the threads to finish and:
50+
51+
- Collects the results from all the threads.
52+
- Reports the total running time.
53+
- Reports the latency results, including p0 (minimum), p50, p90, p95, p99, p99.9, and
54+
p100 (maximum) latencies.
55+
)""";
56+
57+
// Helper functions and types for job benchmark program.
58+
namespace {
59+
// Divisor applied to the test duration to obtain the progress-report interval:
// `RunJobBenchmark()` schedules its next progress message
// `test_duration / kBenchmarkProgressMarks` after the previous one.
60+
constexpr int kBenchmarkProgressMarks = 4;
61+
62+
struct JobBenchmarkResult {
63+
BenchmarkResult get_results;
64+
BenchmarkResult list_results;
65+
};
66+
67+
// Gets a single job.
68+
OperationResult RunGetJob(JobBenchmark& benchmark) {
69+
auto op = [&benchmark]() -> google::cloud::Status {
70+
return benchmark.GetJob().status();
71+
};
72+
return Benchmark::TimeOperation(std::move(op));
73+
}
74+
75+
// Gets a lists of jobs.
76+
OperationResult RunListJobs(JobBenchmark& benchmark) {
77+
std::deque<OperationResult> operations;
78+
auto op = [&benchmark]() -> ::google::cloud::Status {
79+
auto jobs = benchmark.ListJobs();
80+
for (auto& job : jobs) {
81+
auto op_status = job.status();
82+
if (!op_status.ok()) {
83+
return op_status;
84+
}
85+
}
86+
87+
return ::google::cloud::Status(StatusCode::kOk, "");
88+
};
89+
return Benchmark::TimeOperation(std::move(op));
90+
}
91+
92+
// Run an iteration of the test.
93+
google::cloud::StatusOr<JobBenchmarkResult> RunJobBenchmark(
94+
JobBenchmark& benchmark, absl::Duration test_duration) {
95+
JobBenchmarkResult result = {};
96+
auto generator = google::cloud::internal::MakeDefaultPRNG();
97+
std::uniform_int_distribution<int> prng_operation(0, 1);
98+
99+
auto start = absl::Now();
100+
auto mark = start + test_duration / kBenchmarkProgressMarks;
101+
auto end = start + test_duration;
102+
auto local_time_zone = absl::LocalTimeZone();
103+
for (auto now = start; now < end; now = absl::Now()) {
104+
if (prng_operation(generator) == 0) {
105+
// Call GetJob.
106+
auto op_result = RunGetJob(benchmark);
107+
if (!op_result.status.ok()) {
108+
return op_result.status;
109+
}
110+
result.get_results.operations.emplace_back(op_result);
111+
} else {
112+
// Call ListJobs.
113+
auto op_result = RunListJobs(benchmark);
114+
if (!op_result.status.ok()) {
115+
return op_result.status;
116+
}
117+
result.list_results.operations.emplace_back(op_result);
118+
}
119+
if (now >= mark) {
120+
mark = now + test_duration / kBenchmarkProgressMarks;
121+
std::cout << "Start Time=" << absl::FormatTime(start, local_time_zone)
122+
<< "\nCurrent Progress Mark="
123+
<< absl::FormatTime(now, local_time_zone)
124+
<< "\nNext Progress Mark="
125+
<< absl::FormatTime(mark, local_time_zone)
126+
<< "\nEnd Time=" << absl::FormatTime(end, local_time_zone)
127+
<< "\nNumber of GetJob operations performed thus far= "
128+
<< result.get_results.operations.size()
129+
<< "\nNumber of ListJobs operations performed thus far= "
130+
<< result.list_results.operations.size() << "\n...\n"
131+
<< std::flush;
132+
} else if (now > end) {
133+
std::cout << "\nStart Time=" << absl::FormatTime(start, local_time_zone)
134+
<< "\nEnd Time=" << absl::FormatTime(end, local_time_zone)
135+
<< "\nTotal Number of GetJob operations= "
136+
<< result.get_results.operations.size()
137+
<< "\nTotal Number of ListJobs operations= "
138+
<< result.list_results.operations.size() << "\n...\n"
139+
<< std::flush;
140+
}
141+
}
142+
return result;
143+
}
144+
} // anonymous namespace
145+
146+
int main(int argc, char* argv[]) {
147+
JobConfig config;
148+
{
149+
std::vector<std::string> args{argv, argv + argc};
150+
auto c = config.ParseArgs(args);
151+
if (!c) {
152+
std::cerr << "Error parsing command-line arguments: " << c.status()
153+
<< "\n"
154+
<< std::flush;
155+
return 1;
156+
}
157+
config = *std::move(c);
158+
}
159+
160+
if (config.ExitAfterParse()) {
161+
if (config.wants_description) {
162+
std::cout << kDescription << "\n" << std::flush;
163+
}
164+
if (config.wants_help) {
165+
std::cout
166+
<< "The usage information for Job benchmark lists out all the "
167+
"flags needed by all the apis being benchmarked, namely: GetJob, "
168+
"ListJobs, Query, GetqueryResults and InsertJob."
169+
<< "\n"
170+
<< std::flush;
171+
config.PrintUsage();
172+
}
173+
std::cout << "Exiting..."
174+
<< "\n"
175+
<< std::flush;
176+
return 0;
177+
}
178+
std::cout << "# Job Benchmark STARTED For GetJob() and "
179+
"ListJobs() apis with test duration as ["
180+
<< config.test_duration.count() << "] seconds"
181+
<< "\n"
182+
<< std::flush;
183+
184+
JobBenchmark benchmark(config);
185+
// Start the threads running the job benchmark test.
186+
auto latency_test_start = absl::Now();
187+
std::vector<std::future<google::cloud::StatusOr<JobBenchmarkResult>>> tasks;
188+
// If the user requests only one thread, use the current thread.
189+
auto launch_policy =
190+
config.thread_count == 1 ? std::launch::deferred : std::launch::async;
191+
for (int i = 0; i != config.thread_count; ++i) {
192+
tasks.emplace_back(std::async(launch_policy, RunJobBenchmark,
193+
std::ref(benchmark),
194+
absl::FromChrono(config.test_duration)));
195+
}
196+
197+
// Wait for the threads and combine all the results.
198+
JobBenchmarkResult combined{};
199+
int count = 0;
200+
auto append = [](JobBenchmarkResult& destination,
201+
JobBenchmarkResult const& source) {
202+
auto append_ops = [](BenchmarkResult& d, BenchmarkResult const& s) {
203+
d.operations.insert(d.operations.end(), s.operations.begin(),
204+
s.operations.end());
205+
};
206+
append_ops(destination.get_results, source.get_results);
207+
append_ops(destination.list_results, source.list_results);
208+
};
209+
for (auto& future : tasks) {
210+
auto result = future.get();
211+
if (!result) {
212+
std::cerr << "Standard exception raised by task[" << count
213+
<< "]: " << result.status() << "\n"
214+
<< std::flush;
215+
} else {
216+
append(combined, *result);
217+
}
218+
++count;
219+
}
220+
auto latency_test_elapsed =
221+
absl::ToChronoMilliseconds(absl::Now() - latency_test_start);
222+
combined.get_results.elapsed = latency_test_elapsed;
223+
combined.list_results.elapsed = latency_test_elapsed;
224+
std::cout << " DONE. Elapsed Test Duration="
225+
<< FormatDuration(latency_test_elapsed) << "\n"
226+
<< std::flush;
227+
228+
Benchmark::PrintLatencyResult(std::cout, "Latency-Results", "GetJob()",
229+
combined.get_results);
230+
Benchmark::PrintLatencyResult(std::cout, "Latency-Results", "ListJobs()",
231+
combined.list_results);
232+
233+
Benchmark::PrintThroughputResult(std::cout, "Throughput-Results", "GetJob()",
234+
combined.get_results);
235+
Benchmark::PrintThroughputResult(std::cout, "Throughput-Results",
236+
"ListJobs()", combined.list_results);
237+
std::cout << "# Job Benchmark ENDED"
238+
<< "\n"
239+
<< std::flush;
240+
241+
return 0;
242+
}

0 commit comments

Comments
 (0)