Skip to content

Commit 8e77e51

Browse files
committed
feat(bigtable): throw an error if there is a schema change in ExecuteQueryResponse
1 parent a3f8ffb commit 8e77e51

File tree

2 files changed

+108
-9
lines changed

2 files changed

+108
-9
lines changed

google/cloud/bigtable/internal/data_connection_impl.cc

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -118,10 +118,12 @@ class DefaultPartialResultSetReader
118118
std::shared_ptr<OperationContext> operation_context,
119119
std::unique_ptr<internal::StreamingReadRpc<
120120
google::bigtable::v2::ExecuteQueryResponse>>
121-
reader)
121+
reader,
122+
google::bigtable::v2::ResultSetMetadata initial_metadata)
122123
: context_(std::move(context)),
123124
operation_context_(std::move(operation_context)),
124-
reader_(std::move(reader)) {}
125+
reader_(std::move(reader)),
126+
initial_metadata_(std::move(initial_metadata)) {}
125127

126128
~DefaultPartialResultSetReader() override = default;
127129

@@ -147,11 +149,20 @@ class DefaultPartialResultSetReader
147149
return true;
148150
}
149151

150-
// Ignore metadata from the stream because PartialResultSetSource already
151-
// has it set (in ExecuteQuery).
152-
// TODO(#15701): Investigate expected behavior for processing metadata.
152+
// A metadata message after the first one indicates a schema change.
153153
if (response.has_metadata()) {
154-
continue;
154+
std::string initial_metadata_str, response_metadata_str;
155+
if (response.metadata().ByteSizeLong() == 0 ||
156+
(initial_metadata_.SerializeToString(&initial_metadata_str) &&
157+
response.metadata().SerializeToString(&response_metadata_str) &&
158+
initial_metadata_str == response_metadata_str)) {
159+
continue;
160+
}
161+
final_status_ = google::cloud::Status(
162+
google::cloud::StatusCode::kAborted,
163+
"Schema changed during ExecuteQuery operation");
164+
operation_context_->PostCall(*context_, final_status_);
165+
return false;
155166
}
156167

157168
final_status_ = google::cloud::Status(
@@ -172,6 +183,7 @@ class DefaultPartialResultSetReader
172183
std::unique_ptr<
173184
internal::StreamingReadRpc<google::bigtable::v2::ExecuteQueryResponse>>
174185
reader_;
186+
google::bigtable::v2::ResultSetMetadata initial_metadata_;
175187
Status final_status_;
176188
};
177189

@@ -893,8 +905,8 @@ bigtable::RowStream DataConnectionImpl::ExecuteQuery(
893905
std::shared_ptr<OperationContext> const& operation_context) mutable
894906
-> StatusOr<std::unique_ptr<bigtable::ResultSourceInterface>> {
895907
auto factory =
896-
[stub, request, tracing_enabled, tracing_options,
897-
operation_context](std::string const& resume_token) mutable {
908+
[stub, request, tracing_enabled, tracing_options, operation_context,
909+
metadata](std::string const& resume_token) mutable {
898910
if (!resume_token.empty()) request.set_resume_token(resume_token);
899911
auto context = std::make_shared<grpc::ClientContext>();
900912
auto const& options = internal::CurrentOptions();
@@ -903,7 +915,8 @@ bigtable::RowStream DataConnectionImpl::ExecuteQuery(
903915
auto stream = stub->ExecuteQuery(context, options, request);
904916
std::unique_ptr<PartialResultSetReader> reader =
905917
std::make_unique<DefaultPartialResultSetReader>(
906-
std::move(context), operation_context, std::move(stream));
918+
std::move(context), operation_context, std::move(stream),
919+
metadata);
907920
if (tracing_enabled) {
908921
reader = std::make_unique<LoggingResultSetReader>(std::move(reader),
909922
tracing_options);

google/cloud/bigtable/internal/data_connection_impl_test.cc

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3694,6 +3694,92 @@ TEST_F(DataConnectionTest,
36943694
EXPECT_THAT(row2.values().at(1).get<std::string>(), IsOkAndHolds("v2"));
36953695
}
36963696

3697+
TEST_F(DataConnectionTest, ExecuteQueryFailureWithSchemaChange) {
3698+
auto factory = std::make_unique<SimpleOperationContextFactory>();
3699+
3700+
auto mock = std::make_shared<MockBigtableStub>();
3701+
auto fake_cq_impl = std::make_shared<FakeCompletionQueueImpl>();
3702+
auto mock_bg = std::make_unique<MockBackgroundThreads>();
3703+
EXPECT_CALL(*mock_bg, cq).WillRepeatedly([&]() {
3704+
return CompletionQueue{fake_cq_impl};
3705+
});
3706+
auto constexpr kResultMetadataText = R"pb(
3707+
proto_schema {
3708+
columns {
3709+
name: "row_key"
3710+
type { string_type {} }
3711+
}
3712+
columns {
3713+
name: "value"
3714+
type { string_type {} }
3715+
}
3716+
}
3717+
)pb";
3718+
v2::PrepareQueryResponse pq_response;
3719+
pq_response.set_prepared_query("test-pq-id-54321");
3720+
ASSERT_TRUE(google::protobuf::TextFormat::ParseFromString(
3721+
kResultMetadataText, pq_response.mutable_metadata()));
3722+
*pq_response.mutable_valid_until() = internal::ToProtoTimestamp(
3723+
std::chrono::system_clock::now() + std::chrono::seconds(3600));
3724+
3725+
auto constexpr kExecuteQueryResultMetadataText = R"pb(
3726+
proto_schema {
3727+
columns {
3728+
name: "row_key"
3729+
type { string_type {} }
3730+
}
3731+
columns {
3732+
name: "different_value"
3733+
type { string_type {} }
3734+
}
3735+
}
3736+
)pb";
3737+
v2::ExecuteQueryResponse eq_response;
3738+
ASSERT_TRUE(google::protobuf::TextFormat::ParseFromString(
3739+
kExecuteQueryResultMetadataText, eq_response.mutable_metadata()));
3740+
3741+
auto refresh_fn = []() {
3742+
return make_ready_future(
3743+
StatusOr<google::bigtable::v2::PrepareQueryResponse>(
3744+
Status{StatusCode::kUnimplemented, "not implemented"}));
3745+
};
3746+
// EXPECT_CALL(*mock, ExecuteQuery)
3747+
EXPECT_CALL(*mock, ExecuteQuery)
3748+
.Times(3)
3749+
.WillRepeatedly(
3750+
[&](auto, auto const&,
3751+
google::bigtable::v2::ExecuteQueryRequest const& request) {
3752+
EXPECT_EQ(request.app_profile_id(), kAppProfile);
3753+
EXPECT_EQ(request.instance_name(),
3754+
"projects/test-project/instances/test-instance");
3755+
auto stream = std::make_unique<MockExecuteQueryStream>();
3756+
EXPECT_CALL(*stream, Read)
3757+
.WillOnce([&](google::bigtable::v2::ExecuteQueryResponse* r) {
3758+
*r = eq_response;
3759+
return absl::nullopt;
3760+
});
3761+
return stream;
3762+
});
3763+
3764+
auto conn = TestConnection(std::move(mock), std::move(factory));
3765+
internal::OptionsSpan span(CallOptions());
3766+
Project p("test-project");
3767+
bigtable::SqlStatement statement("SELECT * FROM the-table");
3768+
bigtable::InstanceResource instance(p, "test-instance");
3769+
auto query_plan =
3770+
QueryPlan::Create(CompletionQueue(fake_cq_impl), std::move(pq_response),
3771+
std::move(refresh_fn));
3772+
auto prepared_query =
3773+
bigtable::PreparedQuery(instance, statement, std::move(query_plan));
3774+
auto bq = prepared_query.BindParameters({});
3775+
bigtable::ExecuteQueryParams params{std::move(bq)};
3776+
auto row_stream = conn->ExecuteQuery(std::move(params));
3777+
for (auto const& row : row_stream) {
3778+
EXPECT_THAT(row, StatusIs(StatusCode::kAborted));
3779+
}
3780+
fake_cq_impl->SimulateCompletion(false);
3781+
}
3782+
36973783
} // namespace
36983784
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
36993785
} // namespace bigtable_internal

0 commit comments

Comments
 (0)