Skip to content

Commit acd2859

Browse files
authored
feat(bigtable): throw an error if there is a schema change in metadata returned by ExecuteQueryResponse (#15754)
* feat(bigtable): throw an error if there is a schema change in ExecuteQueryResponse
1 parent 829bce3 commit acd2859

File tree

2 files changed

+111
-8
lines changed

2 files changed

+111
-8
lines changed

google/cloud/bigtable/internal/data_connection_impl.cc

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -119,10 +119,12 @@ class DefaultPartialResultSetReader
119119
std::shared_ptr<OperationContext> operation_context,
120120
std::unique_ptr<internal::StreamingReadRpc<
121121
google::bigtable::v2::ExecuteQueryResponse>>
122-
reader)
122+
reader,
123+
google::bigtable::v2::ResultSetMetadata initial_metadata)
123124
: context_(std::move(context)),
124125
operation_context_(std::move(operation_context)),
125-
reader_(std::move(reader)) {}
126+
reader_(std::move(reader)),
127+
initial_metadata_(std::move(initial_metadata)) {}
126128

127129
~DefaultPartialResultSetReader() override = default;
128130

@@ -148,10 +150,22 @@ class DefaultPartialResultSetReader
148150
return true;
149151
}
150152

151-
// Ignore metadata from the stream because PartialResultSetSource already
152-
// has it set (in ExecuteQuery).
153-
// TODO(#15701): Investigate expected behavior for processing metadata.
153+
// Throw an error when there is a schema difference between
154+
// ExecuteQueryResponse and PrepareQueryResponse.
154155
if (response.has_metadata()) {
156+
std::string initial_metadata_str;
157+
std::string response_metadata_str;
158+
bool metadata_matched =
159+
initial_metadata_.SerializeToString(&initial_metadata_str) &&
160+
response.metadata().SerializeToString(&response_metadata_str) &&
161+
initial_metadata_str == response_metadata_str;
162+
if (response.metadata().ByteSizeLong() > 0 && !metadata_matched) {
163+
final_status_ = google::cloud::Status(
164+
google::cloud::StatusCode::kAborted,
165+
"Schema changed during ExecuteQuery operation");
166+
operation_context_->PostCall(*context_, final_status_);
167+
return false;
168+
}
155169
continue;
156170
}
157171

@@ -173,6 +187,7 @@ class DefaultPartialResultSetReader
173187
std::unique_ptr<
174188
internal::StreamingReadRpc<google::bigtable::v2::ExecuteQueryResponse>>
175189
reader_;
190+
google::bigtable::v2::ResultSetMetadata initial_metadata_;
176191
Status final_status_;
177192
};
178193

@@ -896,8 +911,8 @@ bigtable::RowStream DataConnectionImpl::ExecuteQuery(
896911
std::shared_ptr<OperationContext> const& operation_context) mutable
897912
-> StatusOr<std::unique_ptr<bigtable::ResultSourceInterface>> {
898913
auto factory =
899-
[stub, request, tracing_enabled, tracing_options,
900-
operation_context](std::string const& resume_token) mutable {
914+
[stub, request, tracing_enabled, tracing_options, operation_context,
915+
initial_metadata = metadata](std::string const& resume_token) mutable {
901916
if (!resume_token.empty()) request.set_resume_token(resume_token);
902917
auto context = std::make_shared<grpc::ClientContext>();
903918
auto const& options = internal::CurrentOptions();
@@ -906,7 +921,8 @@ bigtable::RowStream DataConnectionImpl::ExecuteQuery(
906921
auto stream = stub->ExecuteQuery(context, options, request);
907922
std::unique_ptr<PartialResultSetReader> reader =
908923
std::make_unique<DefaultPartialResultSetReader>(
909-
std::move(context), operation_context, std::move(stream));
924+
std::move(context), operation_context, std::move(stream),
925+
std::move(initial_metadata));
910926
if (tracing_enabled) {
911927
reader = std::make_unique<LoggingResultSetReader>(std::move(reader),
912928
tracing_options);

google/cloud/bigtable/internal/data_connection_impl_test.cc

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ using ::testing::Contains;
7777
using ::testing::ElementsAre;
7878
using ::testing::ElementsAreArray;
7979
using ::testing::Eq;
80+
using ::testing::HasSubstr;
8081
using ::testing::Matcher;
8182
using ::testing::MockFunction;
8283
using ::testing::Pair;
@@ -3695,6 +3696,92 @@ TEST_F(DataConnectionTest,
36953696
EXPECT_THAT(row2.values().at(1).get<std::string>(), IsOkAndHolds("v2"));
36963697
}
36973698

3699+
TEST_F(DataConnectionTest, ExecuteQueryFailureWithSchemaChange) {
3700+
auto factory = std::make_unique<SimpleOperationContextFactory>();
3701+
3702+
auto mock = std::make_shared<MockBigtableStub>();
3703+
auto fake_cq_impl = std::make_shared<FakeCompletionQueueImpl>();
3704+
auto mock_bg = std::make_unique<MockBackgroundThreads>();
3705+
EXPECT_CALL(*mock_bg, cq).WillRepeatedly([&]() {
3706+
return CompletionQueue{fake_cq_impl};
3707+
});
3708+
auto constexpr kResultMetadataText = R"pb(
3709+
proto_schema {
3710+
columns {
3711+
name: "row_key"
3712+
type { string_type {} }
3713+
}
3714+
columns {
3715+
name: "value"
3716+
type { string_type {} }
3717+
}
3718+
}
3719+
)pb";
3720+
v2::PrepareQueryResponse pq_response;
3721+
pq_response.set_prepared_query("test-pq-id-54321");
3722+
ASSERT_TRUE(google::protobuf::TextFormat::ParseFromString(
3723+
kResultMetadataText, pq_response.mutable_metadata()));
3724+
*pq_response.mutable_valid_until() = internal::ToProtoTimestamp(
3725+
std::chrono::system_clock::now() + std::chrono::seconds(3600));
3726+
3727+
auto constexpr kExecuteQueryResultMetadataText = R"pb(
3728+
proto_schema {
3729+
columns {
3730+
name: "row_key"
3731+
type { string_type {} }
3732+
}
3733+
columns {
3734+
name: "different_value"
3735+
type { string_type {} }
3736+
}
3737+
}
3738+
)pb";
3739+
v2::ExecuteQueryResponse eq_response;
3740+
ASSERT_TRUE(google::protobuf::TextFormat::ParseFromString(
3741+
kExecuteQueryResultMetadataText, eq_response.mutable_metadata()));
3742+
3743+
auto refresh_fn = []() {
3744+
return make_ready_future(
3745+
StatusOr<google::bigtable::v2::PrepareQueryResponse>(
3746+
Status{StatusCode::kUnimplemented, "not implemented"}));
3747+
};
3748+
EXPECT_CALL(*mock, ExecuteQuery)
3749+
.Times(3)
3750+
.WillRepeatedly(
3751+
[&](auto, auto const&,
3752+
google::bigtable::v2::ExecuteQueryRequest const& request) {
3753+
EXPECT_EQ(request.app_profile_id(), kAppProfile);
3754+
EXPECT_EQ(request.instance_name(),
3755+
"projects/test-project/instances/test-instance");
3756+
auto stream = std::make_unique<MockExecuteQueryStream>();
3757+
EXPECT_CALL(*stream, Read)
3758+
.WillOnce([&](google::bigtable::v2::ExecuteQueryResponse* r) {
3759+
*r = eq_response;
3760+
return absl::nullopt;
3761+
});
3762+
return stream;
3763+
});
3764+
3765+
auto conn = TestConnection(std::move(mock), std::move(factory));
3766+
internal::OptionsSpan span(CallOptions());
3767+
Project p("test-project");
3768+
bigtable::SqlStatement statement("SELECT * FROM the-table");
3769+
bigtable::InstanceResource instance(p, "test-instance");
3770+
auto query_plan =
3771+
QueryPlan::Create(CompletionQueue(fake_cq_impl), std::move(pq_response),
3772+
std::move(refresh_fn));
3773+
auto prepared_query =
3774+
bigtable::PreparedQuery(instance, statement, std::move(query_plan));
3775+
auto bq = prepared_query.BindParameters({});
3776+
bigtable::ExecuteQueryParams params{std::move(bq)};
3777+
auto row_stream = conn->ExecuteQuery(std::move(params));
3778+
for (auto const& row : row_stream) {
3779+
EXPECT_THAT(row,
3780+
StatusIs(StatusCode::kAborted, HasSubstr("Schema changed")));
3781+
}
3782+
fake_cq_impl->SimulateCompletion(false);
3783+
}
3784+
36983785
} // namespace
36993786
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
37003787
} // namespace bigtable_internal

0 commit comments

Comments
 (0)