Skip to content

Commit 709e125

Browse files
authored
Merge branch 'main' into gemini_styleguide_updates
2 parents 4fc6ceb + 659a261 commit 709e125

19 files changed

+399
-43
lines changed

ci/cloudbuild/dockerfiles/checkers.Dockerfile

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ ARG ARCH=amd64
2323

2424
RUN dnf makecache && \
2525
dnf install -y \
26-
cargo \
2726
cmake \
2827
clang-tools-extra \
2928
diffutils \
@@ -35,7 +34,11 @@ RUN dnf makecache && \
3534
python-pip \
3635
ShellCheck
3736

38-
RUN cargo install typos-cli --version 1.24.1 --root /usr/local
37+
RUN dnf makecache && \
38+
dnf install -y \
39+
cargo
40+
41+
RUN cargo install typos-cli --locked --version 1.24.1 --root /usr/local
3942

4043
RUN curl -L -o /usr/bin/buildifier https://github.com/bazelbuild/buildtools/releases/download/v6.4.0/buildifier-linux-amd64 && \
4144
chmod 755 /usr/bin/buildifier

google/cloud/bigtable/client.h

Lines changed: 1 addition & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -49,36 +49,7 @@ GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
4949
* For this reason, it is recommended to reuse `Client` objects when possible.
5050
*
5151
* @par Example
52-
* @code
53-
* #include "google/cloud/bigtable/client.h"
54-
* #include "google/cloud/project.h"
55-
* #include <iostream>
56-
*
57-
* int main() {
58-
* namespace cbt = google::cloud::bigtable;
59-
* cbt::Client client(cbt::MakeDataConnection());
60-
* cbt::InstanceResource instance(google::cloud::Project("my-project"),
61-
* "my-instance");
62-
*
63-
* // Declare a parameter with a type, but no value.
64-
* cbt::SqlStatement statement(
65-
* "SELECT _key, CAST(family['qual'] AS STRING) AS value "
66-
* "FROM my-table WHERE _key = @key",
67-
* {{"key", cbt::Value(cbt::Bytes())}});
68-
*
69-
* google::cloud::StatusOr<cbt::PreparedQuery> prepared_query =
70-
* client.PrepareQuery(instance, statement);
71-
* if (!prepared_query) throw std::move(prepared_query).status();
72-
*
73-
* auto bound_query = prepared_query->BindParameters(
74-
* {{"key", cbt::Value("row-key-2")}});
75-
*
76-
* RowStream results =
77-
* client.ExecuteQuery(std::move(bound_query));
78-
*
79-
* ... // process rows
80-
* }
81-
* @endcode
52+
* @snippet data_snippets.cc prepare-and-execute-query
8253
*/
8354
class Client {
8455
public:

google/cloud/bigtable/examples/bigtable_examples_common.cc

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,24 @@ Commands::value_type MakeCommandEntry(std::string const& name,
129129
return Commands::value_type{name, std::move(adapter)};
130130
}
131131

132+
Commands::value_type MakeCommandEntry(std::string const& name,
133+
std::vector<std::string> const& args,
134+
ClientCommandType const& function) {
135+
auto command = [=](std::vector<std::string> argv) {
136+
auto constexpr kFixedArguments = 0;
137+
if ((argv.size() == 1 && argv[0] == "--help") ||
138+
argv.size() != args.size() + kFixedArguments) {
139+
std::ostringstream os;
140+
os << name;
141+
if (!args.empty()) os << " " << absl::StrJoin(args, " ");
142+
throw Usage{std::move(os).str()};
143+
}
144+
auto client = bigtable::Client(bigtable::MakeDataConnection());
145+
function(client, argv);
146+
};
147+
return {name, command};
148+
}
149+
132150
} // namespace examples
133151
} // namespace bigtable
134152
} // namespace cloud

google/cloud/bigtable/examples/bigtable_examples_common.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
#include "google/cloud/bigtable/admin/bigtable_instance_admin_client.h"
1919
#include "google/cloud/bigtable/admin/bigtable_table_admin_client.h"
20+
#include "google/cloud/bigtable/client.h"
2021
#include "google/cloud/bigtable/table.h"
2122
#include "google/cloud/internal/random.h"
2223
#include "google/cloud/testing_util/example_driver.h"
@@ -85,6 +86,13 @@ Commands::value_type MakeCommandEntry(std::string const& name,
8586
std::vector<std::string> const& args,
8687
TableAsyncCommandType const& command);
8788

89+
using ClientCommandType = std::function<void(google::cloud::bigtable::Client,
90+
std::vector<std::string>)>;
91+
92+
Commands::value_type MakeCommandEntry(std::string const& name,
93+
std::vector<std::string> const& args,
94+
ClientCommandType const& function);
95+
8896
} // namespace examples
8997
} // namespace bigtable
9098
} // namespace cloud

google/cloud/bigtable/examples/data_snippets.cc

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
#include "google/cloud/bigtable/testing/cleanup_stale_resources.h"
1919
#include "google/cloud/bigtable/testing/random_names.h"
2020
//! [bigtable includes]
21+
#include "google/cloud/bigtable/client.h"
2122
#include "google/cloud/bigtable/table.h"
2223
//! [bigtable includes]
2324
#include "google/cloud/internal/getenv.h"
@@ -778,6 +779,38 @@ void RunWriteExamples(
778779
admin.DeleteTable(schema->name());
779780
}
780781

782+
void PrepareAndExecuteQuery(google::cloud::bigtable::Client client,
783+
std::vector<std::string> const& args) {
784+
// [prepare-and-execute-query] [START bigtable_api_execute_query]
785+
namespace cbt = ::google::cloud::bigtable;
786+
[](cbt::Client client, std::string const& project_id,
787+
std::string const& instance_id, std::string const& table_id) {
788+
cbt::InstanceResource instance(google::cloud::Project(project_id),
789+
instance_id);
790+
cbt::SqlStatement statement(
791+
"SELECT _key, CAST(fam['column0'] AS STRING) as value0 FROM `" +
792+
table_id + "` WHERE _key=@key",
793+
{{"key", cbt::Parameter(cbt::Bytes())}});
794+
795+
auto prepared_query = client.PrepareQuery(instance, statement);
796+
if (!prepared_query) throw std::move(prepared_query).status();
797+
798+
auto bound_query = prepared_query->BindParameters(
799+
{{"key", cbt::Value(cbt::Bytes("test-key-for-apply"))}});
800+
801+
auto results = client.ExecuteQuery(std::move(bound_query));
802+
803+
using RowType = std::tuple<cbt::Bytes, absl::optional<std::string>>;
804+
for (auto& row : cbt::StreamOf<RowType>(results)) {
805+
if (!row.ok()) throw std::move(row.status());
806+
auto v = std::get<1>(*row);
807+
std::cout << std::get<0>(*row) << "; " << (v ? *v : "null") << std::endl;
808+
}
809+
}
810+
// [prepare-and-execute-query] [END bigtable_api_execute_query]
811+
(std::move(client), args.at(0), args.at(1), args.at(2));
812+
}
813+
781814
void RunDataExamples(
782815
google::cloud::bigtable_admin::BigtableTableAdminClient admin,
783816
google::cloud::internal::DefaultPRNG& generator,
@@ -879,6 +912,11 @@ void RunDataExamples(
879912
std::cout << "Running ReadModifyWrite() example [3]" << std::endl;
880913
ReadModifyWrite(table, {"read-modify-write"});
881914

915+
if (!google::cloud::bigtable::examples::UsingEmulator()) {
916+
auto client = cbt::Client(cbt::MakeDataConnection());
917+
std::cout << "Running PrepareAndExecuteQuery() example" << std::endl;
918+
PrepareAndExecuteQuery(client, {project_id, instance_id, table_id});
919+
}
882920
admin.DeleteTable(schema->name());
883921
}
884922

@@ -946,6 +984,9 @@ int main(int argc, char* argv[]) try {
946984
MakeCommandEntry("write-batch", {}, WriteBatch),
947985
MakeCommandEntry("write-increment", {}, WriteIncrement),
948986
MakeCommandEntry("write-conditional", {}, WriteConditionally),
987+
MakeCommandEntry("prepare-and-execute-query",
988+
{"<project_id>", "<instance_id>", "<table_id>"},
989+
PrepareAndExecuteQuery),
949990
{"auto", RunAll},
950991
};
951992

google/cloud/bigtable/internal/data_connection_impl.cc

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -119,10 +119,12 @@ class DefaultPartialResultSetReader
119119
std::shared_ptr<OperationContext> operation_context,
120120
std::unique_ptr<internal::StreamingReadRpc<
121121
google::bigtable::v2::ExecuteQueryResponse>>
122-
reader)
122+
reader,
123+
google::bigtable::v2::ResultSetMetadata initial_metadata)
123124
: context_(std::move(context)),
124125
operation_context_(std::move(operation_context)),
125-
reader_(std::move(reader)) {}
126+
reader_(std::move(reader)),
127+
initial_metadata_(std::move(initial_metadata)) {}
126128

127129
~DefaultPartialResultSetReader() override = default;
128130

@@ -148,10 +150,22 @@ class DefaultPartialResultSetReader
148150
return true;
149151
}
150152

151-
// Ignore metadata from the stream because PartialResultSetSource already
152-
// has it set (in ExecuteQuery).
153-
// TODO(#15701): Investigate expected behavior for processing metadata.
153+
// Throw an error when there is a schema difference between
154+
// ExecuteQueryResponse and PrepareQueryResponse.
154155
if (response.has_metadata()) {
156+
std::string initial_metadata_str;
157+
std::string response_metadata_str;
158+
bool metadata_matched =
159+
initial_metadata_.SerializeToString(&initial_metadata_str) &&
160+
response.metadata().SerializeToString(&response_metadata_str) &&
161+
initial_metadata_str == response_metadata_str;
162+
if (response.metadata().ByteSizeLong() > 0 && !metadata_matched) {
163+
final_status_ = google::cloud::Status(
164+
google::cloud::StatusCode::kAborted,
165+
"Schema changed during ExecuteQuery operation");
166+
operation_context_->PostCall(*context_, final_status_);
167+
return false;
168+
}
155169
continue;
156170
}
157171

@@ -173,6 +187,7 @@ class DefaultPartialResultSetReader
173187
std::unique_ptr<
174188
internal::StreamingReadRpc<google::bigtable::v2::ExecuteQueryResponse>>
175189
reader_;
190+
google::bigtable::v2::ResultSetMetadata initial_metadata_;
176191
Status final_status_;
177192
};
178193

@@ -896,8 +911,8 @@ bigtable::RowStream DataConnectionImpl::ExecuteQuery(
896911
std::shared_ptr<OperationContext> const& operation_context) mutable
897912
-> StatusOr<std::unique_ptr<bigtable::ResultSourceInterface>> {
898913
auto factory =
899-
[stub, request, tracing_enabled, tracing_options,
900-
operation_context](std::string const& resume_token) mutable {
914+
[stub, request, tracing_enabled, tracing_options, operation_context,
915+
initial_metadata = metadata](std::string const& resume_token) mutable {
901916
if (!resume_token.empty()) request.set_resume_token(resume_token);
902917
auto context = std::make_shared<grpc::ClientContext>();
903918
auto const& options = internal::CurrentOptions();
@@ -906,7 +921,8 @@ bigtable::RowStream DataConnectionImpl::ExecuteQuery(
906921
auto stream = stub->ExecuteQuery(context, options, request);
907922
std::unique_ptr<PartialResultSetReader> reader =
908923
std::make_unique<DefaultPartialResultSetReader>(
909-
std::move(context), operation_context, std::move(stream));
924+
std::move(context), operation_context, std::move(stream),
925+
std::move(initial_metadata));
910926
if (tracing_enabled) {
911927
reader = std::make_unique<LoggingResultSetReader>(std::move(reader),
912928
tracing_options);

google/cloud/bigtable/internal/data_connection_impl_test.cc

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ using ::testing::Contains;
7777
using ::testing::ElementsAre;
7878
using ::testing::ElementsAreArray;
7979
using ::testing::Eq;
80+
using ::testing::HasSubstr;
8081
using ::testing::Matcher;
8182
using ::testing::MockFunction;
8283
using ::testing::Pair;
@@ -3695,6 +3696,92 @@ TEST_F(DataConnectionTest,
36953696
EXPECT_THAT(row2.values().at(1).get<std::string>(), IsOkAndHolds("v2"));
36963697
}
36973698

3699+
TEST_F(DataConnectionTest, ExecuteQueryFailureWithSchemaChange) {
3700+
auto factory = std::make_unique<SimpleOperationContextFactory>();
3701+
3702+
auto mock = std::make_shared<MockBigtableStub>();
3703+
auto fake_cq_impl = std::make_shared<FakeCompletionQueueImpl>();
3704+
auto mock_bg = std::make_unique<MockBackgroundThreads>();
3705+
EXPECT_CALL(*mock_bg, cq).WillRepeatedly([&]() {
3706+
return CompletionQueue{fake_cq_impl};
3707+
});
3708+
auto constexpr kResultMetadataText = R"pb(
3709+
proto_schema {
3710+
columns {
3711+
name: "row_key"
3712+
type { string_type {} }
3713+
}
3714+
columns {
3715+
name: "value"
3716+
type { string_type {} }
3717+
}
3718+
}
3719+
)pb";
3720+
v2::PrepareQueryResponse pq_response;
3721+
pq_response.set_prepared_query("test-pq-id-54321");
3722+
ASSERT_TRUE(google::protobuf::TextFormat::ParseFromString(
3723+
kResultMetadataText, pq_response.mutable_metadata()));
3724+
*pq_response.mutable_valid_until() = internal::ToProtoTimestamp(
3725+
std::chrono::system_clock::now() + std::chrono::seconds(3600));
3726+
3727+
auto constexpr kExecuteQueryResultMetadataText = R"pb(
3728+
proto_schema {
3729+
columns {
3730+
name: "row_key"
3731+
type { string_type {} }
3732+
}
3733+
columns {
3734+
name: "different_value"
3735+
type { string_type {} }
3736+
}
3737+
}
3738+
)pb";
3739+
v2::ExecuteQueryResponse eq_response;
3740+
ASSERT_TRUE(google::protobuf::TextFormat::ParseFromString(
3741+
kExecuteQueryResultMetadataText, eq_response.mutable_metadata()));
3742+
3743+
auto refresh_fn = []() {
3744+
return make_ready_future(
3745+
StatusOr<google::bigtable::v2::PrepareQueryResponse>(
3746+
Status{StatusCode::kUnimplemented, "not implemented"}));
3747+
};
3748+
EXPECT_CALL(*mock, ExecuteQuery)
3749+
.Times(3)
3750+
.WillRepeatedly(
3751+
[&](auto, auto const&,
3752+
google::bigtable::v2::ExecuteQueryRequest const& request) {
3753+
EXPECT_EQ(request.app_profile_id(), kAppProfile);
3754+
EXPECT_EQ(request.instance_name(),
3755+
"projects/test-project/instances/test-instance");
3756+
auto stream = std::make_unique<MockExecuteQueryStream>();
3757+
EXPECT_CALL(*stream, Read)
3758+
.WillOnce([&](google::bigtable::v2::ExecuteQueryResponse* r) {
3759+
*r = eq_response;
3760+
return absl::nullopt;
3761+
});
3762+
return stream;
3763+
});
3764+
3765+
auto conn = TestConnection(std::move(mock), std::move(factory));
3766+
internal::OptionsSpan span(CallOptions());
3767+
Project p("test-project");
3768+
bigtable::SqlStatement statement("SELECT * FROM the-table");
3769+
bigtable::InstanceResource instance(p, "test-instance");
3770+
auto query_plan =
3771+
QueryPlan::Create(CompletionQueue(fake_cq_impl), std::move(pq_response),
3772+
std::move(refresh_fn));
3773+
auto prepared_query =
3774+
bigtable::PreparedQuery(instance, statement, std::move(query_plan));
3775+
auto bq = prepared_query.BindParameters({});
3776+
bigtable::ExecuteQueryParams params{std::move(bq)};
3777+
auto row_stream = conn->ExecuteQuery(std::move(params));
3778+
for (auto const& row : row_stream) {
3779+
EXPECT_THAT(row,
3780+
StatusIs(StatusCode::kAborted, HasSubstr("Schema changed")));
3781+
}
3782+
fake_cq_impl->SimulateCompletion(false);
3783+
}
3784+
36983785
} // namespace
36993786
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
37003787
} // namespace bigtable_internal

google/cloud/bigtable/internal/query_plan.cc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,10 @@ std::shared_ptr<QueryPlan> QueryPlan::Create(
3535
return plan;
3636
}
3737

38+
QueryPlan::~QueryPlan() {
39+
if (refresh_timer_.valid()) refresh_timer_.cancel();
40+
}
41+
3842
void QueryPlan::Initialize() {
3943
std::unique_lock<std::mutex> lock(mu_);
4044
if (state_ == RefreshState::kDone) ScheduleRefresh(lock);

google/cloud/bigtable/internal/query_plan.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@ class QueryPlan : public std::enable_shared_from_this<QueryPlan> {
4343
StatusOr<google::bigtable::v2::PrepareQueryResponse> response,
4444
RefreshFn fn, std::shared_ptr<Clock> clock = std::make_shared<Clock>());
4545

46+
~QueryPlan();
47+
4648
// Invalidates the current QueryPlan and triggers a refresh.
4749
void Invalidate(Status status, std::string const& invalid_query_plan_id);
4850

google/cloud/bigtable/internal/query_plan_test.cc

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -229,10 +229,11 @@ TEST(QueryPlanTest, CreateFailedPlanAndRefresh) {
229229
}
230230

231231
// TODO(#15695): For reasons not yet understood, the fedora m32 CI build has
232-
// failures not seen in m64 builds when the number of threads is "too" high.
232+
// failures not seen in m64 builds when the number of threads is "too" high.
233+
// These failures occur while trying to create more than 500 threads.
233234
constexpr int LimitNumThreadsOn32Bit(int num_threads) {
234235
#if INTPTR_MAX == INT32_MAX
235-
return std::min(num_threads, 500);
236+
return std::min(num_threads, 200);
236237
#else
237238
return num_threads;
238239
#endif

0 commit comments

Comments
 (0)