Skip to content

Commit f5c2fd0

Browse files
authored
impl(bigtable): AsyncReadRows traced backoffs (#11599)
1 parent b89901f commit f5c2fd0

File tree

3 files changed

+83
-3
lines changed

3 files changed

+83
-3
lines changed

google/cloud/bigtable/internal/async_row_reader.cc

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
#include "google/cloud/bigtable/internal/async_row_reader.h"
1616
#include "google/cloud/bigtable/version.h"
17+
#include "google/cloud/internal/grpc_opentelemetry.h"
1718
#include "google/cloud/log.h"
1819

1920
namespace google {
@@ -40,7 +41,7 @@ void AsyncRowReader::MakeRequest() {
4041
}
4142
parser_ = bigtable::internal::ReadRowsParserFactory().Create();
4243

43-
internal::OptionsSpan span(options_);
44+
internal::ScopedCallContext scope(call_context_);
4445
auto context = std::make_shared<grpc::ClientContext>();
4546
internal::ConfigureContext(*context, internal::CurrentOptions());
4647

@@ -206,7 +207,7 @@ void AsyncRowReader::OnStreamFinished(Status status) {
206207
return;
207208
}
208209
auto self = this->shared_from_this();
209-
cq_.MakeRelativeTimer(backoff_policy_->OnCompletion())
210+
internal::TracedAsyncBackoff(cq_, backoff_policy_->OnCompletion())
210211
.then(
211212
[self](
212213
future<StatusOr<std::chrono::system_clock::time_point>> result) {

google/cloud/bigtable/internal/async_row_reader.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
#include "google/cloud/completion_queue.h"
2727
#include "google/cloud/future.h"
2828
#include "google/cloud/grpc_options.h"
29+
#include "google/cloud/internal/call_context.h"
2930
#include "google/cloud/status_or.h"
3031
#include "absl/types/optional.h"
3132
#include <chrono>
@@ -159,7 +160,7 @@ class AsyncRowReader : public std::enable_shared_from_this<AsyncRowReader> {
159160
Status status_;
160161
/// Tracks the level of recursion of TryGiveRowToUser
161162
int recursion_level_ = 0;
162-
Options options_ = internal::CurrentOptions();
163+
internal::CallContext call_context_;
163164
};
164165

165166
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END

google/cloud/bigtable/internal/async_row_reader_test.cc

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,12 @@
1515
#include "google/cloud/bigtable/internal/async_row_reader.h"
1616
#include "google/cloud/bigtable/row_reader.h"
1717
#include "google/cloud/bigtable/testing/mock_bigtable_stub.h"
18+
#include "google/cloud/internal/async_streaming_read_rpc_impl.h"
19+
#include "google/cloud/internal/background_threads_impl.h"
20+
#include "google/cloud/internal/opentelemetry.h"
1821
#include "google/cloud/testing_util/mock_backoff_policy.h"
1922
#include "google/cloud/testing_util/mock_completion_queue_impl.h"
23+
#include "google/cloud/testing_util/opentelemetry_matchers.h"
2024
#include "google/cloud/testing_util/status_matchers.h"
2125
#include "absl/strings/str_format.h"
2226
#include <gmock/gmock.h>
@@ -1115,6 +1119,80 @@ TEST(AsyncRowReaderTest, CurrentOptionsContinuedOnRetries) {
11151119
timer_promise.set_value(make_status_or(std::chrono::system_clock::now()));
11161120
}
11171121

1122+
#ifdef GOOGLE_CLOUD_CPP_HAVE_OPENTELEMETRY
1123+
using ::google::cloud::testing_util::EnableTracing;
1124+
using ::google::cloud::testing_util::IsActive;
1125+
using ::google::cloud::testing_util::SpanNamed;
1126+
using ::testing::AllOf;
1127+
using ::testing::Each;
1128+
using ::testing::SizeIs;
1129+
using ErrorStream = internal::AsyncStreamingReadRpcError<v2::ReadRowsResponse>;
1130+
1131+
TEST(AsyncRowReaderTest, TracedBackoff) {
1132+
auto span_catcher = testing_util::InstallSpanCatcher();
1133+
1134+
auto mock = std::make_shared<MockBigtableStub>();
1135+
EXPECT_CALL(*mock, AsyncReadRows).Times(kNumRetries + 1).WillRepeatedly([] {
1136+
return std::make_unique<ErrorStream>(TransientError());
1137+
});
1138+
1139+
promise<void> p;
1140+
internal::AutomaticallyCreatedBackgroundThreads background;
1141+
auto on_row = [](bigtable::Row const&) { return make_ready_future(true); };
1142+
auto on_finish = [&p](Status const&) { p.set_value(); };
1143+
1144+
auto retry = DataLimitedErrorCountRetryPolicy(kNumRetries).clone();
1145+
auto mock_b = std::make_unique<MockBackoffPolicy>();
1146+
EXPECT_CALL(*mock_b, OnCompletion).Times(kNumRetries);
1147+
1148+
internal::OptionsSpan o(EnableTracing(Options{}));
1149+
AsyncRowReader::Create(background.cq(), mock, kAppProfile, kTableName,
1150+
std::move(on_row), std::move(on_finish),
1151+
bigtable::RowSet(), bigtable::RowReader::NO_ROWS_LIMIT,
1152+
bigtable::Filter::PassAllFilter(), std::move(retry),
1153+
std::move(mock_b));
1154+
1155+
// Block until the async call has completed.
1156+
p.get_future().get();
1157+
1158+
EXPECT_THAT(span_catcher->GetSpans(),
1159+
AllOf(SizeIs(kNumRetries), Each(SpanNamed("Async Backoff"))));
1160+
}
1161+
1162+
TEST(AsyncRowReaderTest, CallSpanActiveThroughout) {
1163+
auto span_catcher = testing_util::InstallSpanCatcher();
1164+
1165+
auto span = internal::MakeSpan("span");
1166+
1167+
auto mock = std::make_shared<MockBigtableStub>();
1168+
EXPECT_CALL(*mock, AsyncReadRows)
1169+
.Times(kNumRetries + 1)
1170+
.WillRepeatedly([span] {
1171+
EXPECT_THAT(span, IsActive());
1172+
return std::make_unique<ErrorStream>(TransientError());
1173+
});
1174+
1175+
promise<void> p;
1176+
internal::AutomaticallyCreatedBackgroundThreads background;
1177+
auto on_row = [](bigtable::Row const&) { return make_ready_future(true); };
1178+
auto on_finish = [&p](Status const&) { p.set_value(); };
1179+
auto retry = DataLimitedErrorCountRetryPolicy(kNumRetries).clone();
1180+
auto mock_b = std::make_unique<MockBackoffPolicy>();
1181+
EXPECT_CALL(*mock_b, OnCompletion).Times(kNumRetries);
1182+
1183+
auto scope = opentelemetry::trace::Scope(span);
1184+
internal::OptionsSpan o(EnableTracing(Options{}));
1185+
AsyncRowReader::Create(background.cq(), mock, kAppProfile, kTableName,
1186+
std::move(on_row), std::move(on_finish),
1187+
bigtable::RowSet(), bigtable::RowReader::NO_ROWS_LIMIT,
1188+
bigtable::Filter::PassAllFilter(), std::move(retry),
1189+
std::move(mock_b));
1190+
1191+
// Block until the async call has completed.
1192+
p.get_future().get();
1193+
}
1194+
#endif // GOOGLE_CLOUD_CPP_HAVE_OPENTELEMETRY
1195+
11181196
} // namespace
11191197
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
11201198
} // namespace bigtable_internal

0 commit comments

Comments
 (0)