Skip to content

Commit 45b56d7

Browse files
authored
feat(bigtable): add support for PartialResultSetResume (#15637)
* feat(bigtable): add support for PartialResultSetResume
1 parent 250231e commit 45b56d7

10 files changed

+519
-17
lines changed

google/cloud/bigtable/CMakeLists.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,8 @@ add_library(
206206
internal/operation_context_factory.cc
207207
internal/operation_context_factory.h
208208
internal/partial_result_set_reader.h
209+
internal/partial_result_set_resume.cc
210+
internal/partial_result_set_resume.h
209211
internal/partial_result_set_source.cc
210212
internal/partial_result_set_source.h
211213
internal/prefix_range_end.cc
@@ -476,6 +478,7 @@ if (BUILD_TESTING)
476478
internal/mutate_rows_limiter_test.cc
477479
internal/operation_context_factory_test.cc
478480
internal/operation_context_test.cc
481+
internal/partial_result_set_resume_test.cc
479482
internal/partial_result_set_source_test.cc
480483
internal/prefix_range_end_test.cc
481484
internal/rate_limiter_test.cc

google/cloud/bigtable/bigtable_client_unit_tests.bzl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ bigtable_client_unit_tests = [
6464
"internal/mutate_rows_limiter_test.cc",
6565
"internal/operation_context_factory_test.cc",
6666
"internal/operation_context_test.cc",
67+
"internal/partial_result_set_resume_test.cc",
6768
"internal/partial_result_set_source_test.cc",
6869
"internal/prefix_range_end_test.cc",
6970
"internal/rate_limiter_test.cc",

google/cloud/bigtable/google_cloud_cpp_bigtable.bzl

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ google_cloud_cpp_bigtable_hdrs = [
102102
"internal/operation_context.h",
103103
"internal/operation_context_factory.h",
104104
"internal/partial_result_set_reader.h",
105+
"internal/partial_result_set_resume.h",
105106
"internal/partial_result_set_source.h",
106107
"internal/prefix_range_end.h",
107108
"internal/rate_limiter.h",
@@ -214,6 +215,7 @@ google_cloud_cpp_bigtable_srcs = [
214215
"internal/mutate_rows_limiter.cc",
215216
"internal/operation_context.cc",
216217
"internal/operation_context_factory.cc",
218+
"internal/partial_result_set_resume.cc",
217219
"internal/partial_result_set_source.cc",
218220
"internal/prefix_range_end.cc",
219221
"internal/rate_limiter.cc",
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
// Copyright 2025 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#include "google/cloud/bigtable/internal/partial_result_set_resume.h"
16+
#include <thread>
17+
18+
namespace google {
19+
namespace cloud {
20+
namespace bigtable_internal {
21+
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
22+
23+
void PartialResultSetResume::TryCancel() { reader_->TryCancel(); }
24+
25+
bool PartialResultSetResume::Read(
26+
absl::optional<std::string> const& resume_token,
27+
UnownedPartialResultSet& result) {
28+
bool resumption = false;
29+
do {
30+
if (reader_->Read(resume_token, result)) {
31+
// Let the caller know if we recreated the PartialResultSetReader using
32+
// the resume_token so that they might discard any previous results that
33+
// will be contained in the new stream.
34+
if (resumption) result.resumption = true;
35+
return true;
36+
}
37+
auto status = Finish();
38+
if (status.ok()) return false;
39+
if (!resume_token) {
40+
// Our caller has requested that we not try to resume the stream,
41+
// probably because they have already delivered previous results that
42+
// would otherwise be replayed.
43+
return false;
44+
}
45+
if (idempotency_ == google::cloud::Idempotency::kNonIdempotent ||
46+
!retry_policy_prototype_->OnFailure(status)) {
47+
return false;
48+
}
49+
std::this_thread::sleep_for(backoff_policy_prototype_->OnCompletion());
50+
resumption = true;
51+
last_status_.reset();
52+
reader_ = factory_(*resume_token);
53+
} while (!retry_policy_prototype_->IsExhausted());
54+
return false;
55+
}
56+
57+
Status PartialResultSetResume::Finish() {
58+
// Finish() can be called only once, so cache the last result.
59+
if (last_status_.has_value()) {
60+
return *last_status_;
61+
}
62+
last_status_ = reader_->Finish();
63+
return *last_status_;
64+
}
65+
66+
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
67+
} // namespace bigtable_internal
68+
} // namespace cloud
69+
} // namespace google
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
// Copyright 2025 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// https://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#ifndef GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_BIGTABLE_INTERNAL_PARTIAL_RESULT_SET_RESUME_H
16+
#define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_BIGTABLE_INTERNAL_PARTIAL_RESULT_SET_RESUME_H
17+
18+
#include "google/cloud/bigtable/internal/partial_result_set_reader.h"
19+
#include "google/cloud/bigtable/rpc_backoff_policy.h"
20+
#include "google/cloud/bigtable/rpc_retry_policy.h"
21+
#include "google/cloud/bigtable/version.h"
22+
#include "google/cloud/backoff_policy.h"
23+
#include "absl/types/optional.h"
24+
#include <functional>
25+
#include <memory>
26+
#include <string>
27+
28+
namespace google {
29+
namespace cloud {
30+
namespace bigtable_internal {
31+
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
32+
33+
/// Create a new PartialResultSetReader given a resume token value.
34+
using PartialResultSetReaderFactory =
35+
std::function<std::unique_ptr<PartialResultSetReader>(std::string)>;
36+
37+
/**
38+
* A PartialResultSetReader that resumes the streaming RPC on retryable errors.
39+
*/
40+
class PartialResultSetResume : public PartialResultSetReader {
41+
public:
42+
PartialResultSetResume(
43+
PartialResultSetReaderFactory factory,
44+
google::cloud::Idempotency idempotency,
45+
std::unique_ptr<bigtable::RPCRetryPolicy> retry_policy,
46+
std::unique_ptr<bigtable::RPCBackoffPolicy> backoff_policy)
47+
: factory_(std::move(factory)),
48+
idempotency_(idempotency),
49+
retry_policy_prototype_(std::move(retry_policy)),
50+
backoff_policy_prototype_(std::move(backoff_policy)),
51+
reader_(factory_(std::string{})) {}
52+
53+
~PartialResultSetResume() override = default;
54+
55+
void TryCancel() override;
56+
bool Read(absl::optional<std::string> const& resume_token,
57+
UnownedPartialResultSet& result) override;
58+
Status Finish() override;
59+
60+
private:
61+
PartialResultSetReaderFactory factory_;
62+
google::cloud::Idempotency idempotency_;
63+
std::unique_ptr<bigtable::RPCRetryPolicy> retry_policy_prototype_;
64+
std::unique_ptr<bigtable::RPCBackoffPolicy> backoff_policy_prototype_;
65+
std::unique_ptr<PartialResultSetReader> reader_;
66+
absl::optional<Status> last_status_;
67+
};
68+
69+
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
70+
} // namespace bigtable_internal
71+
} // namespace cloud
72+
} // namespace google
73+
74+
#endif // GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_BIGTABLE_INTERNAL_PARTIAL_RESULT_SET_RESUME_H

0 commit comments

Comments
 (0)