Skip to content

Commit 9d26abf

Browse files
authored
impl(bigtable): support RetryInfo in ReadRow(s) (#13536)
1 parent 94355b0 commit 9d26abf

File tree

4 files changed

+107
-31
lines changed

4 files changed

+107
-31
lines changed

google/cloud/bigtable/internal/data_connection_impl.cc

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,9 @@ bigtable::RowReader DataConnectionImpl::ReadRowsFull(
195195
auto impl = std::make_shared<DefaultRowReader>(
196196
stub_, std::move(params.app_profile_id), std::move(params.table_name),
197197
std::move(params.row_set), params.rows_limit, std::move(params.filter),
198-
params.reverse, retry_policy(*current), backoff_policy(*current));
198+
params.reverse, retry_policy(*current), backoff_policy(*current),
199+
// TODO(#13514) - use Option value.
200+
false);
199201
return MakeRowReader(std::move(impl));
200202
}
201203

google/cloud/bigtable/internal/default_row_reader.cc

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
// limitations under the License.
1414

1515
#include "google/cloud/bigtable/internal/default_row_reader.h"
16+
#include "google/cloud/bigtable/internal/retry_info_helper.h"
1617
#include "google/cloud/bigtable/table.h"
1718
#include "google/cloud/grpc_error_delegate.h"
1819

@@ -26,7 +27,8 @@ DefaultRowReader::DefaultRowReader(
2627
std::string table_name, bigtable::RowSet row_set, std::int64_t rows_limit,
2728
bigtable::Filter filter, bool reverse,
2829
std::unique_ptr<bigtable::DataRetryPolicy> retry_policy,
29-
std::unique_ptr<BackoffPolicy> backoff_policy, Sleeper sleeper)
30+
std::unique_ptr<BackoffPolicy> backoff_policy, bool use_server_retry_info,
31+
Sleeper sleeper)
3032
: stub_(std::move(stub)),
3133
app_profile_id_(std::move(app_profile_id)),
3234
table_name_(std::move(table_name)),
@@ -36,6 +38,7 @@ DefaultRowReader::DefaultRowReader(
3638
reverse_(reverse),
3739
retry_policy_(std::move(retry_policy)),
3840
backoff_policy_(std::move(backoff_policy)),
41+
use_server_retry_info_(use_server_retry_info),
3942
sleeper_(std::move(sleeper)) {}
4043

4144
void DefaultRowReader::MakeRequest() {
@@ -125,9 +128,10 @@ absl::variant<Status, bigtable::Row> DefaultRowReader::Advance() {
125128
// If we receive an error, but the retryable set is empty, stop.
126129
if (row_set_.IsEmpty()) return Status{};
127130

128-
if (!retry_policy_->OnFailure(status)) return status;
129-
130-
sleeper_(backoff_policy_->OnCompletion());
131+
auto delay = BackoffOrBreak(use_server_retry_info_, status, *retry_policy_,
132+
*backoff_policy_);
133+
if (!delay) return status;
134+
sleeper_(*delay);
131135

132136
// If we reach this place, we failed and need to restart the call.
133137
MakeRequest();

google/cloud/bigtable/internal/default_row_reader.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ class DefaultRowReader : public RowReaderImpl {
5353
std::string table_name, bigtable::RowSet row_set, std::int64_t rows_limit,
5454
bigtable::Filter filter, bool reverse,
5555
std::unique_ptr<bigtable::DataRetryPolicy> retry_policy,
56-
std::unique_ptr<BackoffPolicy> backoff_policy,
56+
std::unique_ptr<BackoffPolicy> backoff_policy, bool use_server_retry_info,
5757
Sleeper sleeper = [](auto d) { std::this_thread::sleep_for(d); });
5858

5959
~DefaultRowReader() override;
@@ -99,6 +99,7 @@ class DefaultRowReader : public RowReaderImpl {
9999
bool reverse_;
100100
std::unique_ptr<bigtable::DataRetryPolicy> retry_policy_;
101101
std::unique_ptr<BackoffPolicy> backoff_policy_;
102+
bool use_server_retry_info_;
102103
Sleeper sleeper_;
103104
std::shared_ptr<grpc::ClientContext> context_;
104105
RetryContext retry_context_;

google/cloud/bigtable/internal/default_row_reader_test.cc

Lines changed: 94 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ TEST_F(DefaultRowReaderTest, EmptyReaderHasNoRows) {
138138
auto impl = std::make_shared<DefaultRowReader>(
139139
mock, kAppProfile, kTableName, bigtable::RowSet(),
140140
bigtable::RowReader::NO_ROWS_LIMIT, bigtable::Filter::PassAllFilter(),
141-
false, retry_.clone(), backoff_.clone());
141+
false, retry_.clone(), backoff_.clone(), false);
142142
auto reader = bigtable_internal::MakeRowReader(std::move(impl));
143143
EXPECT_THAT(StatusOrRowKeys(reader), IsEmpty());
144144
}
@@ -161,7 +161,7 @@ TEST_F(DefaultRowReaderTest, ReadOneRow) {
161161
auto impl = std::make_shared<DefaultRowReader>(
162162
mock, kAppProfile, kTableName, bigtable::RowSet(),
163163
bigtable::RowReader::NO_ROWS_LIMIT, bigtable::Filter::PassAllFilter(),
164-
false, retry_.clone(), backoff_.clone());
164+
false, retry_.clone(), backoff_.clone(), false);
165165
auto reader = bigtable_internal::MakeRowReader(std::move(impl));
166166
EXPECT_THAT(StatusOrRowKeys(reader), ElementsAre(IsOkAndHolds("r1")));
167167
}
@@ -188,7 +188,7 @@ TEST_F(DefaultRowReaderTest, StreamIsDrained) {
188188
auto impl = std::make_shared<DefaultRowReader>(
189189
mock, kAppProfile, kTableName, bigtable::RowSet(),
190190
bigtable::RowReader::NO_ROWS_LIMIT, bigtable::Filter::PassAllFilter(),
191-
false, retry_.clone(), backoff_.clone());
191+
false, retry_.clone(), backoff_.clone(), false);
192192
auto reader = bigtable_internal::MakeRowReader(std::move(impl));
193193

194194
auto it = reader.begin();
@@ -226,7 +226,7 @@ TEST_F(DefaultRowReaderTest, RetryThenSuccess) {
226226
auto impl = std::make_shared<DefaultRowReader>(
227227
mock, kAppProfile, kTableName, bigtable::RowSet(),
228228
bigtable::RowReader::NO_ROWS_LIMIT, bigtable::Filter::PassAllFilter(),
229-
false, retry_.clone(), backoff_.clone());
229+
false, retry_.clone(), backoff_.clone(), false);
230230
auto reader = bigtable_internal::MakeRowReader(std::move(impl));
231231
EXPECT_THAT(StatusOrRowKeys(reader), ElementsAre(IsOkAndHolds("r1")));
232232
}
@@ -248,7 +248,7 @@ TEST_F(DefaultRowReaderTest, NoRetryOnPermanentError) {
248248
auto impl = std::make_shared<DefaultRowReader>(
249249
mock, kAppProfile, kTableName, bigtable::RowSet(),
250250
bigtable::RowReader::NO_ROWS_LIMIT, bigtable::Filter::PassAllFilter(),
251-
false, retry_.clone(), backoff_.clone());
251+
false, retry_.clone(), backoff_.clone(), false);
252252
auto reader = bigtable_internal::MakeRowReader(std::move(impl));
253253
EXPECT_THAT(StatusOrRowKeys(reader),
254254
ElementsAre(StatusIs(StatusCode::kPermissionDenied)));
@@ -275,13 +275,14 @@ TEST_F(DefaultRowReaderTest, RetryPolicyExhausted) {
275275
.Times(kNumRetries)
276276
.WillRepeatedly(Return(ms(10)));
277277

278-
MockFunction<void(std::chrono::milliseconds)> mock_sleeper;
278+
MockFunction<void(ms)> mock_sleeper;
279279
EXPECT_CALL(mock_sleeper, Call(ms(10))).Times(kNumRetries);
280280

281281
auto impl = std::make_shared<DefaultRowReader>(
282282
mock, kAppProfile, kTableName, bigtable::RowSet(),
283283
bigtable::RowReader::NO_ROWS_LIMIT, bigtable::Filter::PassAllFilter(),
284-
false, retry_.clone(), std::move(backoff), mock_sleeper.AsStdFunction());
284+
false, retry_.clone(), std::move(backoff), false,
285+
mock_sleeper.AsStdFunction());
285286
auto reader = bigtable_internal::MakeRowReader(std::move(impl));
286287
EXPECT_THAT(StatusOrRowKeys(reader),
287288
ElementsAre(StatusIs(StatusCode::kUnavailable)));
@@ -316,7 +317,7 @@ TEST_F(DefaultRowReaderTest, RetrySkipsAlreadyReadRows) {
316317
auto impl = std::make_shared<DefaultRowReader>(
317318
mock, kAppProfile, kTableName, bigtable::RowSet("r1", "r2"),
318319
bigtable::RowReader::NO_ROWS_LIMIT, bigtable::Filter::PassAllFilter(),
319-
false, retry_.clone(), backoff_.clone());
320+
false, retry_.clone(), backoff_.clone(), false);
320321
auto reader = bigtable_internal::MakeRowReader(std::move(impl));
321322
EXPECT_THAT(StatusOrRowKeys(reader), ElementsAre(IsOkAndHolds("r1")));
322323
}
@@ -359,7 +360,7 @@ TEST_F(DefaultRowReaderTest, RetrySkipsAlreadyScannedRows) {
359360
auto impl = std::make_shared<DefaultRowReader>(
360361
mock, kAppProfile, kTableName, bigtable::RowSet("r1", "r2", "r3"),
361362
bigtable::RowReader::NO_ROWS_LIMIT, bigtable::Filter::PassAllFilter(),
362-
false, retry_.clone(), backoff_.clone());
363+
false, retry_.clone(), backoff_.clone(), false);
363364
auto reader = bigtable_internal::MakeRowReader(std::move(impl));
364365
EXPECT_THAT(StatusOrRowKeys(reader), ElementsAre(IsOkAndHolds("r1")));
365366
}
@@ -394,7 +395,7 @@ TEST_F(DefaultRowReaderTest, FailedParseIsRetried) {
394395
auto impl = std::make_shared<DefaultRowReader>(
395396
mock, kAppProfile, kTableName, bigtable::RowSet(),
396397
bigtable::RowReader::NO_ROWS_LIMIT, bigtable::Filter::PassAllFilter(),
397-
false, std::move(retry), backoff_.clone());
398+
false, std::move(retry), backoff_.clone(), false);
398399
auto reader = bigtable_internal::MakeRowReader(std::move(impl));
399400
EXPECT_THAT(StatusOrRowKeys(reader), ElementsAre(IsOkAndHolds("r1")));
400401
}
@@ -433,7 +434,7 @@ TEST_F(DefaultRowReaderTest, FailedParseSkipsAlreadyReadRows) {
433434
auto impl = std::make_shared<DefaultRowReader>(
434435
mock, kAppProfile, kTableName, bigtable::RowSet("r1", "r2"),
435436
bigtable::RowReader::NO_ROWS_LIMIT, bigtable::Filter::PassAllFilter(),
436-
false, std::move(retry), backoff_.clone());
437+
false, std::move(retry), backoff_.clone(), false);
437438
auto reader = bigtable_internal::MakeRowReader(std::move(impl));
438439
EXPECT_THAT(StatusOrRowKeys(reader), ElementsAre(IsOkAndHolds("r1")));
439440
}
@@ -481,7 +482,7 @@ TEST_F(DefaultRowReaderTest, FailedParseSkipsAlreadyScannedRows) {
481482
auto impl = std::make_shared<DefaultRowReader>(
482483
mock, kAppProfile, kTableName, bigtable::RowSet("r1", "r2", "r3"),
483484
bigtable::RowReader::NO_ROWS_LIMIT, bigtable::Filter::PassAllFilter(),
484-
false, std::move(retry), backoff_.clone());
485+
false, std::move(retry), backoff_.clone(), false);
485486
auto reader = bigtable_internal::MakeRowReader(std::move(impl));
486487
EXPECT_THAT(StatusOrRowKeys(reader), ElementsAre(IsOkAndHolds("r1")));
487488
}
@@ -506,7 +507,7 @@ TEST_F(DefaultRowReaderTest, FailedParseWithPermanentError) {
506507
auto impl = std::make_shared<DefaultRowReader>(
507508
mock, kAppProfile, kTableName, bigtable::RowSet(),
508509
bigtable::RowReader::NO_ROWS_LIMIT, bigtable::Filter::PassAllFilter(),
509-
false, retry_.clone(), backoff_.clone());
510+
false, retry_.clone(), backoff_.clone(), false);
510511
auto reader = bigtable_internal::MakeRowReader(std::move(impl));
511512
EXPECT_THAT(StatusOrRowKeys(reader),
512513
ElementsAre(StatusIs(StatusCode::kInternal)));
@@ -532,7 +533,7 @@ TEST_F(DefaultRowReaderTest, NoRetryOnEmptyRowSet) {
532533
auto impl = std::make_shared<DefaultRowReader>(
533534
mock, kAppProfile, kTableName, bigtable::RowSet("r1", "r2"),
534535
bigtable::RowReader::NO_ROWS_LIMIT, bigtable::Filter::PassAllFilter(),
535-
false, retry_.clone(), backoff_.clone());
536+
false, retry_.clone(), backoff_.clone(), false);
536537
auto reader = bigtable_internal::MakeRowReader(std::move(impl));
537538
EXPECT_THAT(StatusOrRowKeys(reader), ElementsAre(IsOkAndHolds("r2")));
538539
}
@@ -554,7 +555,7 @@ TEST_F(DefaultRowReaderTest, RowLimitIsSent) {
554555
auto impl = std::make_shared<DefaultRowReader>(
555556
mock, kAppProfile, kTableName, bigtable::RowSet(), 42,
556557
bigtable::Filter::PassAllFilter(), false, retry_.clone(),
557-
backoff_.clone());
558+
backoff_.clone(), false);
558559
auto reader = bigtable_internal::MakeRowReader(std::move(impl));
559560
EXPECT_THAT(StatusOrRowKeys(reader), IsEmpty());
560561
}
@@ -586,7 +587,7 @@ TEST_F(DefaultRowReaderTest, RowLimitIsDecreasedOnRetry) {
586587
auto impl = std::make_shared<DefaultRowReader>(
587588
mock, kAppProfile, kTableName, bigtable::RowSet(), 42,
588589
bigtable::Filter::PassAllFilter(), false, retry_.clone(),
589-
backoff_.clone());
590+
backoff_.clone(), false);
590591
auto reader = bigtable_internal::MakeRowReader(std::move(impl));
591592
EXPECT_THAT(StatusOrRowKeys(reader), ElementsAre(IsOkAndHolds("r1")));
592593
}
@@ -612,7 +613,7 @@ TEST_F(DefaultRowReaderTest, NoRetryIfRowLimitReached) {
612613
auto impl = std::make_shared<DefaultRowReader>(
613614
mock, kAppProfile, kTableName, bigtable::RowSet(), 1,
614615
bigtable::Filter::PassAllFilter(), false, retry_.clone(),
615-
backoff_.clone());
616+
backoff_.clone(), false);
616617
auto reader = bigtable_internal::MakeRowReader(std::move(impl));
617618
EXPECT_THAT(StatusOrRowKeys(reader), ElementsAre(IsOkAndHolds("r1")));
618619
}
@@ -639,7 +640,7 @@ TEST_F(DefaultRowReaderTest, CancelDrainsStream) {
639640
auto impl = std::make_shared<DefaultRowReader>(
640641
mock, kAppProfile, kTableName, bigtable::RowSet(),
641642
bigtable::RowReader::NO_ROWS_LIMIT, bigtable::Filter::PassAllFilter(),
642-
false, retry_.clone(), backoff_.clone());
643+
false, retry_.clone(), backoff_.clone(), false);
643644
auto reader = bigtable_internal::MakeRowReader(std::move(impl));
644645

645646
auto it = reader.begin();
@@ -664,7 +665,7 @@ TEST_F(DefaultRowReaderTest, CancelBeforeBegin) {
664665
auto impl = std::make_shared<DefaultRowReader>(
665666
mock, kAppProfile, kTableName, bigtable::RowSet(),
666667
bigtable::RowReader::NO_ROWS_LIMIT, bigtable::Filter::PassAllFilter(),
667-
false, retry_.clone(), backoff_.clone());
668+
false, retry_.clone(), backoff_.clone(), false);
668669
auto reader = bigtable_internal::MakeRowReader(std::move(impl));
669670

670671
// Manually cancel the call before a stream was created.
@@ -689,7 +690,7 @@ TEST_F(DefaultRowReaderTest, RowReaderConstructorDoesNotCallRpc) {
689690
auto impl = std::make_shared<DefaultRowReader>(
690691
mock, kAppProfile, kTableName, bigtable::RowSet(),
691692
bigtable::RowReader::NO_ROWS_LIMIT, bigtable::Filter::PassAllFilter(),
692-
false, retry_.clone(), backoff_.clone());
693+
false, retry_.clone(), backoff_.clone(), false);
693694
auto reader = bigtable_internal::MakeRowReader(std::move(impl));
694695
}
695696

@@ -718,7 +719,7 @@ TEST_F(DefaultRowReaderTest, RetryUsesNewContext) {
718719
auto impl = std::make_shared<DefaultRowReader>(
719720
mock, kAppProfile, kTableName, bigtable::RowSet(),
720721
bigtable::RowReader::NO_ROWS_LIMIT, bigtable::Filter::PassAllFilter(),
721-
false, retry_.clone(), backoff_.clone());
722+
false, retry_.clone(), backoff_.clone(), false);
722723
auto reader = bigtable_internal::MakeRowReader(std::move(impl));
723724
EXPECT_THAT(StatusOrRowKeys(reader),
724725
ElementsAre(StatusIs(StatusCode::kUnavailable)));
@@ -745,7 +746,7 @@ TEST_F(DefaultRowReaderTest, ReverseScanSuccess) {
745746
auto impl = std::make_shared<DefaultRowReader>(
746747
mock, kAppProfile, kTableName, bigtable::RowSet(),
747748
bigtable::RowReader::NO_ROWS_LIMIT, bigtable::Filter::PassAllFilter(),
748-
true, retry_.clone(), backoff_.clone());
749+
true, retry_.clone(), backoff_.clone(), false);
749750
auto reader = bigtable_internal::MakeRowReader(std::move(impl));
750751
EXPECT_THAT(
751752
StatusOrRowKeys(reader),
@@ -773,7 +774,7 @@ TEST_F(DefaultRowReaderTest, ReverseScanFailsOnIncreasingRowKeyOrder) {
773774
auto impl = std::make_shared<DefaultRowReader>(
774775
mock, kAppProfile, kTableName, bigtable::RowSet(),
775776
bigtable::RowReader::NO_ROWS_LIMIT, bigtable::Filter::PassAllFilter(),
776-
true, retry_.clone(), backoff_.clone());
777+
true, retry_.clone(), backoff_.clone(), false);
777778
auto reader = bigtable_internal::MakeRowReader(std::move(impl));
778779
EXPECT_THAT(
779780
StatusOrRowKeys(reader),
@@ -821,7 +822,7 @@ TEST_F(DefaultRowReaderTest, ReverseScanResumption) {
821822
auto impl = std::make_shared<DefaultRowReader>(
822823
mock, kAppProfile, kTableName, bigtable::RowSet("r1", "r2", "r3"),
823824
bigtable::RowReader::NO_ROWS_LIMIT, bigtable::Filter::PassAllFilter(),
824-
true, retry_.clone(), backoff_.clone());
825+
true, retry_.clone(), backoff_.clone(), false);
825826
auto reader = bigtable_internal::MakeRowReader(std::move(impl));
826827
EXPECT_THAT(StatusOrRowKeys(reader), ElementsAre(IsOkAndHolds("r3")));
827828
}
@@ -854,12 +855,80 @@ TEST_F(DefaultRowReaderTest, BigtableCookies) {
854855
auto impl = std::make_shared<DefaultRowReader>(
855856
mock, kAppProfile, kTableName, bigtable::RowSet("r1"),
856857
bigtable::RowReader::NO_ROWS_LIMIT, bigtable::Filter::PassAllFilter(),
857-
true, retry_.clone(), backoff_.clone());
858+
true, retry_.clone(), backoff_.clone(), false);
858859
auto reader = bigtable_internal::MakeRowReader(std::move(impl));
859860
EXPECT_THAT(StatusOrRowKeys(reader),
860861
ElementsAre(StatusIs(StatusCode::kPermissionDenied)));
861862
}
862863

864+
TEST_F(DefaultRowReaderTest, RetryInfoHeeded) {
865+
auto const delay = ms(std::chrono::minutes(5));
866+
auto mock = std::make_shared<MockBigtableStub>();
867+
EXPECT_CALL(*mock, ReadRows)
868+
.WillOnce([delay](auto, auto const&,
869+
google::bigtable::v2::ReadRowsRequest const&) {
870+
auto stream = std::make_unique<MockReadRowsStream>();
871+
EXPECT_CALL(*stream, Read).WillOnce([delay] {
872+
auto s = internal::ResourceExhaustedError("try again");
873+
internal::SetRetryInfo(s, internal::RetryInfo{delay});
874+
return s;
875+
});
876+
return stream;
877+
})
878+
.WillOnce([](auto, auto const&,
879+
google::bigtable::v2::ReadRowsRequest const& request) {
880+
EXPECT_THAT(request, HasCorrectResourceNames());
881+
auto stream = std::make_unique<MockReadRowsStream>();
882+
EXPECT_CALL(*stream, Read)
883+
.WillOnce(Return(MakeRow("r1")))
884+
.WillOnce(Return(Status()));
885+
return stream;
886+
});
887+
888+
MockFunction<void(ms)> mock_sleeper;
889+
EXPECT_CALL(mock_sleeper, Call(delay));
890+
891+
internal::OptionsSpan span(TestOptions(/*expected_streams=*/2));
892+
893+
auto impl = std::make_shared<DefaultRowReader>(
894+
mock, kAppProfile, kTableName, bigtable::RowSet(),
895+
bigtable::RowReader::NO_ROWS_LIMIT, bigtable::Filter::PassAllFilter(),
896+
false, retry_.clone(), backoff_.clone(), true,
897+
mock_sleeper.AsStdFunction());
898+
auto reader = bigtable_internal::MakeRowReader(std::move(impl));
899+
EXPECT_THAT(StatusOrRowKeys(reader), ElementsAre(IsOkAndHolds("r1")));
900+
}
901+
902+
TEST_F(DefaultRowReaderTest, RetryInfoIgnored) {
903+
auto const delay = ms(std::chrono::minutes(5));
904+
auto mock = std::make_shared<MockBigtableStub>();
905+
EXPECT_CALL(*mock, ReadRows)
906+
.WillOnce([delay](auto, auto const&,
907+
google::bigtable::v2::ReadRowsRequest const&) {
908+
auto stream = std::make_unique<MockReadRowsStream>();
909+
EXPECT_CALL(*stream, Read).WillOnce([delay] {
910+
auto s = internal::ResourceExhaustedError("try again");
911+
internal::SetRetryInfo(s, internal::RetryInfo{delay});
912+
return s;
913+
});
914+
return stream;
915+
});
916+
917+
MockFunction<void(ms)> mock_sleeper;
918+
EXPECT_CALL(mock_sleeper, Call).Times(0);
919+
920+
internal::OptionsSpan span(TestOptions(/*expected_streams=*/1));
921+
922+
auto impl = std::make_shared<DefaultRowReader>(
923+
mock, kAppProfile, kTableName, bigtable::RowSet(),
924+
bigtable::RowReader::NO_ROWS_LIMIT, bigtable::Filter::PassAllFilter(),
925+
false, retry_.clone(), backoff_.clone(), false,
926+
mock_sleeper.AsStdFunction());
927+
auto reader = bigtable_internal::MakeRowReader(std::move(impl));
928+
EXPECT_THAT(StatusOrRowKeys(reader),
929+
ElementsAre(StatusIs(StatusCode::kResourceExhausted)));
930+
}
931+
863932
} // namespace
864933
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
865934
} // namespace bigtable_internal

0 commit comments

Comments
 (0)