Skip to content

Commit 82469c8

Browse files
authored
impl(bigtable): add operation context support and retry exhaustion detection to streaming types (#15726)
1 parent 4e642f7 commit 82469c8

11 files changed

+288
-46
lines changed

google/cloud/bigtable/internal/data_connection_impl.cc

Lines changed: 26 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,12 @@ class DefaultPartialResultSetReader
148148
}
149149
}
150150

151-
Status Finish() override { return final_status_; }
151+
grpc::ClientContext const& context() const override { return *context_; }
152+
153+
Status Finish() override {
154+
// operation_context_->OnDone(final_status_);
155+
return final_status_;
156+
}
152157

153158
private:
154159
std::shared_ptr<grpc::ClientContext> context_;
@@ -840,30 +845,32 @@ bigtable::RowStream DataConnectionImpl::ExecuteQuery(
840845
google::bigtable::v2::ResultSetMetadata metadata,
841846
std::unique_ptr<bigtable::DataRetryPolicy> retry_policy_prototype,
842847
std::unique_ptr<BackoffPolicy> backoff_policy_prototype,
843-
std::shared_ptr<OperationContext> const&) mutable
848+
std::shared_ptr<OperationContext> const& operation_context) mutable
844849
-> StatusOr<std::unique_ptr<bigtable::ResultSourceInterface>> {
845-
auto factory = [stub, request, tracing_enabled,
846-
tracing_options](std::string const& resume_token) mutable {
847-
if (!resume_token.empty()) request.set_resume_token(resume_token);
848-
auto context = std::make_shared<grpc::ClientContext>();
849-
auto const& options = internal::CurrentOptions();
850-
internal::ConfigureContext(*context, options);
851-
auto stream = stub->ExecuteQuery(context, options, request);
852-
std::unique_ptr<PartialResultSetReader> reader =
853-
std::make_unique<DefaultPartialResultSetReader>(std::move(context),
854-
std::move(stream));
855-
if (tracing_enabled) {
856-
reader = std::make_unique<LoggingResultSetReader>(std::move(reader),
857-
tracing_options);
858-
}
859-
return reader;
860-
};
850+
auto factory =
851+
[stub, request, tracing_enabled, tracing_options,
852+
operation_context](std::string const& resume_token) mutable {
853+
if (!resume_token.empty()) request.set_resume_token(resume_token);
854+
auto context = std::make_shared<grpc::ClientContext>();
855+
auto const& options = internal::CurrentOptions();
856+
internal::ConfigureContext(*context, options);
857+
auto stream = stub->ExecuteQuery(context, options, request);
858+
std::unique_ptr<PartialResultSetReader> reader =
859+
std::make_unique<DefaultPartialResultSetReader>(
860+
std::move(context), std::move(stream));
861+
if (tracing_enabled) {
862+
reader = std::make_unique<LoggingResultSetReader>(std::move(reader),
863+
tracing_options);
864+
}
865+
return reader;
866+
};
861867

862868
auto rpc = std::make_unique<PartialResultSetResume>(
863869
std::move(factory), Idempotency::kIdempotent,
864870
retry_policy_prototype->clone(), backoff_policy_prototype->clone());
865871

866-
return PartialResultSetSource::Create(std::move(metadata), std::move(rpc));
872+
return PartialResultSetSource::Create(std::move(metadata),
873+
operation_context, std::move(rpc));
867874
};
868875

869876
auto operation_context = std::make_shared<OperationContext>();

google/cloud/bigtable/internal/logging_result_set_reader.cc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,9 @@ bool LoggingResultSetReader::Read(
4343
}
4444
return success;
4545
}
46+
grpc::ClientContext const& LoggingResultSetReader::context() const {
47+
return impl_->context();
48+
}
4649

4750
Status LoggingResultSetReader::Finish() { return impl_->Finish(); }
4851

google/cloud/bigtable/internal/logging_result_set_reader.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ class LoggingResultSetReader : public PartialResultSetReader {
4141
void TryCancel() override;
4242
bool Read(absl::optional<std::string> const& resume_token,
4343
UnownedPartialResultSet& result) override;
44+
grpc::ClientContext const& context() const override;
4445
Status Finish() override;
4546

4647
private:

google/cloud/bigtable/internal/partial_result_set_reader.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
#include "google/cloud/status.h"
1919
#include <google/bigtable/v2/data.pb.h>
20+
#include <grpcpp/client_context.h>
2021

2122
namespace google {
2223
namespace cloud {
@@ -55,6 +56,7 @@ class PartialResultSetReader {
5556
virtual void TryCancel() = 0;
5657
virtual bool Read(absl::optional<std::string> const& resume_token,
5758
UnownedPartialResultSet& result) = 0;
59+
virtual grpc::ClientContext const& context() const = 0;
5860
virtual Status Finish() = 0;
5961
};
6062

google/cloud/bigtable/internal/partial_result_set_resume.cc

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
// limitations under the License.
1414

1515
#include "google/cloud/bigtable/internal/partial_result_set_resume.h"
16+
#include "google/cloud/internal/retry_loop_helpers.h"
1617
#include <thread>
1718

1819
namespace google {
@@ -43,17 +44,26 @@ bool PartialResultSetResume::Read(
4344
return false;
4445
}
4546
if (idempotency_ == google::cloud::Idempotency::kNonIdempotent ||
46-
!retry_policy_prototype_->OnFailure(status)) {
47+
!retry_policy_->OnFailure(status)) {
48+
if (retry_policy_->IsExhausted()) break;
4749
return false;
4850
}
49-
std::this_thread::sleep_for(backoff_policy_prototype_->OnCompletion());
51+
std::this_thread::sleep_for(backoff_policy_->OnCompletion());
5052
resumption = true;
5153
last_status_.reset();
5254
reader_ = factory_(*resume_token);
53-
} while (!retry_policy_prototype_->IsExhausted());
55+
} while (!retry_policy_->IsExhausted());
56+
if (last_status_) {
57+
last_status_ = internal::RetryLoopError(*last_status_, __func__,
58+
retry_policy_->IsExhausted());
59+
}
5460
return false;
5561
}
5662

63+
grpc::ClientContext const& PartialResultSetResume::context() const {
64+
return reader_->context();
65+
}
66+
5767
Status PartialResultSetResume::Finish() {
5868
// Finish() can be called only once, so cache the last result.
5969
if (last_status_.has_value()) {

google/cloud/bigtable/internal/partial_result_set_resume.h

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -45,22 +45,23 @@ class PartialResultSetResume : public PartialResultSetReader {
4545
std::unique_ptr<BackoffPolicy> backoff_policy)
4646
: factory_(std::move(factory)),
4747
idempotency_(idempotency),
48-
retry_policy_prototype_(std::move(retry_policy)),
49-
backoff_policy_prototype_(std::move(backoff_policy)),
48+
retry_policy_(std::move(retry_policy)),
49+
backoff_policy_(std::move(backoff_policy)),
5050
reader_(factory_(std::string{})) {}
5151

5252
~PartialResultSetResume() override = default;
5353

5454
void TryCancel() override;
5555
bool Read(absl::optional<std::string> const& resume_token,
5656
UnownedPartialResultSet& result) override;
57+
grpc::ClientContext const& context() const override;
5758
Status Finish() override;
5859

5960
private:
6061
PartialResultSetReaderFactory factory_;
6162
google::cloud::Idempotency idempotency_;
62-
std::unique_ptr<bigtable::DataRetryPolicy> retry_policy_prototype_;
63-
std::unique_ptr<BackoffPolicy> backoff_policy_prototype_;
63+
std::unique_ptr<bigtable::DataRetryPolicy> retry_policy_;
64+
std::unique_ptr<BackoffPolicy> backoff_policy_;
6465
std::unique_ptr<PartialResultSetReader> reader_;
6566
absl::optional<Status> last_status_;
6667
};

google/cloud/bigtable/internal/partial_result_set_resume_test.cc

Lines changed: 29 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -303,6 +303,9 @@ TEST(PartialResultSetResume, TooManyTransients) {
303303
auto status = reader->Finish();
304304
EXPECT_THAT(status,
305305
StatusIs(StatusCode::kUnavailable, HasSubstr("Try again")));
306+
EXPECT_THAT(status.error_info().metadata(),
307+
::testing::Contains(::testing::Pair("gcloud-cpp.retry.reason",
308+
"retry-policy-exhausted")));
306309
}
307310

308311
TEST(PartialResultSetResume, ResumptionStart) {
@@ -321,8 +324,9 @@ TEST(PartialResultSetResume, ResumptionStart) {
321324
response.push_back(r56);
322325

323326
MockFactory mock_factory;
327+
grpc::ClientContext context;
324328
EXPECT_CALL(mock_factory, MakeReader)
325-
.WillOnce([&response](std::string const& token) {
329+
.WillOnce([&](std::string const& token) {
326330
EXPECT_TRUE(token.empty());
327331
auto mock = std::make_unique<MockPartialResultSetReader>();
328332
EXPECT_CALL(*mock, Read(_, _))
@@ -332,9 +336,13 @@ TEST(PartialResultSetResume, ResumptionStart) {
332336
EXPECT_CALL(*mock, TryCancel()).Times(0);
333337
EXPECT_CALL(*mock, Finish())
334338
.WillOnce(Return(Status(StatusCode::kUnavailable, "Try again")));
339+
EXPECT_CALL(*mock, context)
340+
.Times(1)
341+
.WillRepeatedly(
342+
[&]() -> grpc::ClientContext const& { return context; });
335343
return mock;
336344
})
337-
.WillOnce([&response](std::string const& token) {
345+
.WillOnce([&](std::string const& token) {
338346
EXPECT_TRUE(token.empty());
339347
auto mock = std::make_unique<MockPartialResultSetReader>();
340348
EXPECT_CALL(*mock, Read(_, _))
@@ -344,15 +352,19 @@ TEST(PartialResultSetResume, ResumptionStart) {
344352
.WillOnce(Return(false));
345353
EXPECT_CALL(*mock, TryCancel()).Times(0);
346354
EXPECT_CALL(*mock, Finish()).WillOnce(Return(Status()));
355+
EXPECT_CALL(*mock, context)
356+
.Times(13)
357+
.WillRepeatedly(
358+
[&]() -> grpc::ClientContext const& { return context; });
347359
return mock;
348360
});
349361

350362
auto factory = [&mock_factory](std::string const& token) {
351363
return mock_factory.MakeReader(token);
352364
};
353365
auto grpc_reader = MakeTestResume(factory, Idempotency::kIdempotent);
354-
auto reader =
355-
PartialResultSetSource::Create(metadata, std::move(grpc_reader));
366+
auto reader = PartialResultSetSource::Create(
367+
metadata, std::make_shared<OperationContext>(), std::move(grpc_reader));
356368
ASSERT_STATUS_OK(reader);
357369

358370
// Verify the returned rows are correct, despite the resumption from the
@@ -384,8 +396,9 @@ TEST(PartialResultSetResume, ResumptionMidway) {
384396
response.push_back(r56);
385397

386398
MockFactory mock_factory;
399+
grpc::ClientContext context;
387400
EXPECT_CALL(mock_factory, MakeReader)
388-
.WillOnce([&response](std::string const& token) {
401+
.WillOnce([&](std::string const& token) {
389402
EXPECT_TRUE(token.empty());
390403
auto mock = std::make_unique<MockPartialResultSetReader>();
391404
EXPECT_CALL(*mock, Read(_, _))
@@ -395,25 +408,33 @@ TEST(PartialResultSetResume, ResumptionMidway) {
395408
EXPECT_CALL(*mock, TryCancel()).Times(0);
396409
EXPECT_CALL(*mock, Finish())
397410
.WillOnce(Return(Status(StatusCode::kUnavailable, "Try again")));
411+
EXPECT_CALL(*mock, context)
412+
.Times(9)
413+
.WillRepeatedly(
414+
[&]() -> grpc::ClientContext const& { return context; });
398415
return mock;
399416
})
400-
.WillOnce([&response](std::string const& token) {
417+
.WillOnce([&](std::string const& token) {
401418
EXPECT_EQ("resume-after-4", token);
402419
auto mock = std::make_unique<MockPartialResultSetReader>();
403420
EXPECT_CALL(*mock, Read(_, _))
404421
.WillOnce(ReadAction(response[2], false))
405422
.WillOnce(Return(false));
406423
EXPECT_CALL(*mock, TryCancel()).Times(0);
407424
EXPECT_CALL(*mock, Finish()).WillOnce(Return(Status()));
425+
EXPECT_CALL(*mock, context)
426+
.Times(5)
427+
.WillRepeatedly(
428+
[&]() -> grpc::ClientContext const& { return context; });
408429
return mock;
409430
});
410431

411432
auto factory = [&mock_factory](std::string const& token) {
412433
return mock_factory.MakeReader(token);
413434
};
414435
auto grpc_reader = MakeTestResume(factory, Idempotency::kIdempotent);
415-
auto reader =
416-
PartialResultSetSource::Create(metadata, std::move(grpc_reader));
436+
auto reader = PartialResultSetSource::Create(
437+
metadata, std::make_shared<OperationContext>(), std::move(grpc_reader));
417438
ASSERT_STATUS_OK(reader);
418439

419440
// Verify the returned rows are correct, despite the resumption from a

google/cloud/bigtable/internal/partial_result_set_source.cc

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,10 @@ std::string AsString(T const& s) {
4343
StatusOr<std::unique_ptr<bigtable::ResultSourceInterface>>
4444
PartialResultSetSource::Create(
4545
absl::optional<google::bigtable::v2::ResultSetMetadata> metadata,
46+
std::shared_ptr<OperationContext> operation_context,
4647
std::unique_ptr<PartialResultSetReader> reader) {
47-
std::unique_ptr<PartialResultSetSource> source(
48-
new PartialResultSetSource(std::move(metadata), std::move(reader)));
48+
std::unique_ptr<PartialResultSetSource> source(new PartialResultSetSource(
49+
std::move(metadata), std::move(operation_context), std::move(reader)));
4950

5051
// Do an initial read from the stream to determine the fate of the factory.
5152
auto status = source->ReadFromStream();
@@ -59,9 +60,11 @@ PartialResultSetSource::Create(
5960

6061
PartialResultSetSource::PartialResultSetSource(
6162
absl::optional<google::bigtable::v2::ResultSetMetadata> metadata,
63+
std::shared_ptr<OperationContext> operation_context,
6264
std::unique_ptr<PartialResultSetReader> reader)
6365
: options_(internal::CurrentOptions()),
6466
reader_(std::move(reader)),
67+
operation_context_(std::move(operation_context)),
6568
metadata_(std::move(metadata)) {
6669
if (metadata_.has_value()) {
6770
columns_ = std::make_shared<std::vector<std::string>>();
@@ -91,19 +94,27 @@ PartialResultSetSource::~PartialResultSetSource() {
9194
}
9295
state_ = State::kFinished;
9396
}
97+
98+
operation_context_->OnDone(last_status_);
9499
}
95100

96101
StatusOr<bigtable::QueryRow> PartialResultSetSource::NextRow() {
102+
operation_context_->ElementRequest(reader_->context());
97103
while (rows_.empty()) {
98-
if (state_ == State::kFinished) return bigtable::QueryRow();
104+
if (state_ == State::kFinished) {
105+
operation_context_->ElementDelivery(reader_->context());
106+
return bigtable::QueryRow();
107+
}
99108
internal::OptionsSpan span(options_);
100109
// Continue fetching if there are more rows in the stream.
101110
auto status = ReadFromStream();
111+
last_status_ = status;
102112
if (!status.ok()) return status;
103113
}
104114
// Returns the row at the front of the queue
105115
auto row = std::move(rows_.front());
106116
rows_.pop_front();
117+
operation_context_->ElementDelivery(reader_->context());
107118
return row;
108119
}
109120

@@ -137,7 +148,8 @@ Status PartialResultSetSource::ReadFromStream() {
137148
return internal::InternalError("Stream ended with uncommitted rows.",
138149
GCP_ERROR_INFO());
139150
}
140-
return reader_->Finish();
151+
last_status_ = reader_->Finish();
152+
return last_status_;
141153
}
142154

143155
Status PartialResultSetSource::ProcessDataFromStream(

google/cloud/bigtable/internal/partial_result_set_source.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
#ifndef GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_BIGTABLE_INTERNAL_PARTIAL_RESULT_SET_SOURCE_H
1616
#define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_BIGTABLE_INTERNAL_PARTIAL_RESULT_SET_SOURCE_H
1717

18+
#include "google/cloud/bigtable/internal/operation_context.h"
1819
#include "google/cloud/bigtable/internal/partial_result_set_reader.h"
1920
#include "google/cloud/bigtable/results.h"
2021
#include "google/cloud/bigtable/value.h"
@@ -43,6 +44,7 @@ class PartialResultSetSource : public bigtable::ResultSourceInterface {
4344
/// Factory method to create a PartialResultSetSource.
4445
static StatusOr<std::unique_ptr<bigtable::ResultSourceInterface>> Create(
4546
absl::optional<google::bigtable::v2::ResultSetMetadata> metadata,
47+
std::shared_ptr<OperationContext> operation_context,
4648
std::unique_ptr<PartialResultSetReader> reader);
4749

4850
~PartialResultSetSource() override;
@@ -56,6 +58,7 @@ class PartialResultSetSource : public bigtable::ResultSourceInterface {
5658
private:
5759
explicit PartialResultSetSource(
5860
absl::optional<google::bigtable::v2::ResultSetMetadata> metadata,
61+
std::shared_ptr<OperationContext> operation_context,
5962
std::unique_ptr<PartialResultSetReader> reader);
6063

6164
Status ReadFromStream();
@@ -68,7 +71,7 @@ class PartialResultSetSource : public bigtable::ResultSourceInterface {
6871

6972
Options options_;
7073
std::unique_ptr<PartialResultSetReader> reader_;
71-
74+
std::shared_ptr<OperationContext> operation_context_;
7275
// The ResultSetMetadata is received in the first response. It is received
7376
// from ExecuteQueryResponse
7477
absl::optional<google::bigtable::v2::ResultSetMetadata> metadata_;
@@ -84,6 +87,7 @@ class PartialResultSetSource : public bigtable::ResultSourceInterface {
8487
// see a new token.
8588
absl::optional<std::string> resume_token_ = "";
8689

90+
Status last_status_;
8791
// The state of our PartialResultSetReader.
8892
enum class State {
8993
// `Read()` has yet to return nullopt.

0 commit comments

Comments
 (0)