Skip to content
32 changes: 24 additions & 8 deletions google/cloud/bigtable/internal/data_connection_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -119,10 +119,12 @@ class DefaultPartialResultSetReader
std::shared_ptr<OperationContext> operation_context,
std::unique_ptr<internal::StreamingReadRpc<
google::bigtable::v2::ExecuteQueryResponse>>
reader)
reader,
google::bigtable::v2::ResultSetMetadata initial_metadata)
: context_(std::move(context)),
operation_context_(std::move(operation_context)),
reader_(std::move(reader)) {}
reader_(std::move(reader)),
initial_metadata_(std::move(initial_metadata)) {}

~DefaultPartialResultSetReader() override = default;

Expand All @@ -148,10 +150,22 @@ class DefaultPartialResultSetReader
return true;
}

// Ignore metadata from the stream because PartialResultSetSource already
// has it set (in ExecuteQuery).
// TODO(#15701): Investigate expected behavior for processing metadata.
// Throw an error when there is a schema difference between
// ExecuteQueryResponse and PrepareQueryResponse.
if (response.has_metadata()) {
std::string initial_metadata_str;
std::string response_metadata_str;
bool metadata_matched =
initial_metadata_.SerializeToString(&initial_metadata_str) &&
response.metadata().SerializeToString(&response_metadata_str) &&
initial_metadata_str == response_metadata_str;
if (response.metadata().ByteSizeLong() > 0 && !metadata_matched) {
final_status_ = google::cloud::Status(
google::cloud::StatusCode::kAborted,
"Schema changed during ExecuteQuery operation");
operation_context_->PostCall(*context_, final_status_);
return false;
}
continue;
}

Expand All @@ -173,6 +187,7 @@ class DefaultPartialResultSetReader
std::unique_ptr<
internal::StreamingReadRpc<google::bigtable::v2::ExecuteQueryResponse>>
reader_;
google::bigtable::v2::ResultSetMetadata initial_metadata_;
Status final_status_;
};

Expand Down Expand Up @@ -896,8 +911,8 @@ bigtable::RowStream DataConnectionImpl::ExecuteQuery(
std::shared_ptr<OperationContext> const& operation_context) mutable
-> StatusOr<std::unique_ptr<bigtable::ResultSourceInterface>> {
auto factory =
[stub, request, tracing_enabled, tracing_options,
operation_context](std::string const& resume_token) mutable {
[stub, request, tracing_enabled, tracing_options, operation_context,
initial_metadata = metadata](std::string const& resume_token) mutable {
if (!resume_token.empty()) request.set_resume_token(resume_token);
auto context = std::make_shared<grpc::ClientContext>();
auto const& options = internal::CurrentOptions();
Expand All @@ -906,7 +921,8 @@ bigtable::RowStream DataConnectionImpl::ExecuteQuery(
auto stream = stub->ExecuteQuery(context, options, request);
std::unique_ptr<PartialResultSetReader> reader =
std::make_unique<DefaultPartialResultSetReader>(
std::move(context), operation_context, std::move(stream));
std::move(context), operation_context, std::move(stream),
std::move(initial_metadata));
if (tracing_enabled) {
reader = std::make_unique<LoggingResultSetReader>(std::move(reader),
tracing_options);
Expand Down
87 changes: 87 additions & 0 deletions google/cloud/bigtable/internal/data_connection_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ using ::testing::Contains;
using ::testing::ElementsAre;
using ::testing::ElementsAreArray;
using ::testing::Eq;
using ::testing::HasSubstr;
using ::testing::Matcher;
using ::testing::MockFunction;
using ::testing::Pair;
Expand Down Expand Up @@ -3695,6 +3696,92 @@ TEST_F(DataConnectionTest,
EXPECT_THAT(row2.values().at(1).get<std::string>(), IsOkAndHolds("v2"));
}

TEST_F(DataConnectionTest, ExecuteQueryFailureWithSchemaChange) {
auto factory = std::make_unique<SimpleOperationContextFactory>();

auto mock = std::make_shared<MockBigtableStub>();
auto fake_cq_impl = std::make_shared<FakeCompletionQueueImpl>();
auto mock_bg = std::make_unique<MockBackgroundThreads>();
EXPECT_CALL(*mock_bg, cq).WillRepeatedly([&]() {
return CompletionQueue{fake_cq_impl};
});
auto constexpr kResultMetadataText = R"pb(
proto_schema {
columns {
name: "row_key"
type { string_type {} }
}
columns {
name: "value"
type { string_type {} }
}
}
)pb";
v2::PrepareQueryResponse pq_response;
pq_response.set_prepared_query("test-pq-id-54321");
ASSERT_TRUE(google::protobuf::TextFormat::ParseFromString(
kResultMetadataText, pq_response.mutable_metadata()));
*pq_response.mutable_valid_until() = internal::ToProtoTimestamp(
std::chrono::system_clock::now() + std::chrono::seconds(3600));

auto constexpr kExecuteQueryResultMetadataText = R"pb(
proto_schema {
columns {
name: "row_key"
type { string_type {} }
}
columns {
name: "different_value"
type { string_type {} }
}
}
)pb";
v2::ExecuteQueryResponse eq_response;
ASSERT_TRUE(google::protobuf::TextFormat::ParseFromString(
kExecuteQueryResultMetadataText, eq_response.mutable_metadata()));

auto refresh_fn = []() {
return make_ready_future(
StatusOr<google::bigtable::v2::PrepareQueryResponse>(
Status{StatusCode::kUnimplemented, "not implemented"}));
};
EXPECT_CALL(*mock, ExecuteQuery)
.Times(3)
.WillRepeatedly(
[&](auto, auto const&,
google::bigtable::v2::ExecuteQueryRequest const& request) {
EXPECT_EQ(request.app_profile_id(), kAppProfile);
EXPECT_EQ(request.instance_name(),
"projects/test-project/instances/test-instance");
auto stream = std::make_unique<MockExecuteQueryStream>();
EXPECT_CALL(*stream, Read)
.WillOnce([&](google::bigtable::v2::ExecuteQueryResponse* r) {
*r = eq_response;
return absl::nullopt;
});
return stream;
});

auto conn = TestConnection(std::move(mock), std::move(factory));
internal::OptionsSpan span(CallOptions());
Project p("test-project");
bigtable::SqlStatement statement("SELECT * FROM the-table");
bigtable::InstanceResource instance(p, "test-instance");
auto query_plan =
QueryPlan::Create(CompletionQueue(fake_cq_impl), std::move(pq_response),
std::move(refresh_fn));
auto prepared_query =
bigtable::PreparedQuery(instance, statement, std::move(query_plan));
auto bq = prepared_query.BindParameters({});
bigtable::ExecuteQueryParams params{std::move(bq)};
auto row_stream = conn->ExecuteQuery(std::move(params));
for (auto const& row : row_stream) {
EXPECT_THAT(row,
StatusIs(StatusCode::kAborted, HasSubstr("Schema changed")));
}
fake_cq_impl->SimulateCompletion(false);
}

} // namespace
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
} // namespace bigtable_internal
Expand Down