Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ci/cloudbuild/dockerfiles/fedora-latest-bazel.Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -62,5 +62,5 @@ ENV GO_LOCATION=/usr/local/go
ENV PATH=${GO_LOCATION}/bin:${PATH}
RUN go version
WORKDIR /var/tmp/downloads/cloud-bigtable-clients-test
RUN curl -fsSL https://github.com/googleapis/cloud-bigtable-clients-test/archive/v0.0.2.tar.gz | \
RUN curl -fsSL https://github.com/googleapis/cloud-bigtable-clients-test/archive/v0.0.4.tar.gz | \
tar -xzf - --strip-components=1
20 changes: 16 additions & 4 deletions google/cloud/bigtable/internal/data_connection_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -148,11 +148,14 @@ class DefaultPartialResultSetReader
return true;
}

// Ignore metadata from the stream because PartialResultSetSource already
// has it set (in ExecuteQuery).
// TODO(#15701): Investigate expected behavior for processing metadata.
if (response.has_metadata()) {
continue;
// ExecuteQuery should not return metadata
auto result_status =
Status(StatusCode::kInternal,
"Expected results response, but received: METADATA");
operation_context_->PostCall(*context_, result_status);
final_status_ = std::move(result_status);
return false;
}

final_status_ = google::cloud::Status(
Expand Down Expand Up @@ -785,6 +788,11 @@ StatusOr<bigtable::PreparedQuery> DataConnectionImpl::PrepareQuery(
return response;
});
};
if (!response->metadata().has_proto_schema() ||
response->metadata().proto_schema().columns().empty()) {
return Status(StatusCode::kInternal,
"Invalid empty ResultSetMetadata received");
}
auto query_plan = QueryPlan::Create(background_->cq(), *std::move(response),
std::move(refresh_fn));
return bigtable::PreparedQuery(
Expand Down Expand Up @@ -869,6 +877,10 @@ future<StatusOr<bigtable::PreparedQuery>> DataConnectionImpl::AsyncPrepareQuery(
return response;
});
};
if (!response->metadata().has_proto_schema() ||
response->metadata().proto_schema().columns().empty()) {
return Status(StatusCode::kInternal, "missing metadata");
}

auto query_plan = QueryPlan::Create(background_->cq(),
*std::move(response), refresh_fn);
Expand Down
75 changes: 75 additions & 0 deletions google/cloud/bigtable/test_proxy/cbt_test_proxy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#include "google/cloud/bigtable/test_proxy/cbt_test_proxy.h"
#include "google/cloud/bigtable/cell.h"
#include "google/cloud/bigtable/client.h"
#include "google/cloud/bigtable/idempotent_mutation_policy.h"
#include "google/cloud/bigtable/mutations.h"
#include "google/cloud/bigtable/table.h"
Expand All @@ -34,6 +35,8 @@ namespace {
namespace v2 = ::google::bigtable::v2;
namespace testpb = ::google::bigtable::testproxy;

using ms = std::chrono::milliseconds;

// Convert `Status` to `grpc::Status`, discarding any `details`.
::grpc::Status ToGrpcStatus(Status const& status) {
return ::grpc::Status(static_cast<grpc::StatusCode>(status.code()),
Expand Down Expand Up @@ -360,6 +363,78 @@ grpc::Status CbtTestProxy::ReadModifyWriteRow(
return grpc::Status();
}

grpc::Status CbtTestProxy::ExecuteQuery(
grpc::ServerContext*,
google::bigtable::testproxy::ExecuteQueryRequest const* request,
google::bigtable::testproxy::ExecuteQueryResult* response) {
// Client options
auto retry_policy_option = DataLimitedErrorCountRetryPolicy(0).clone();
auto backoff_policy_option =
google::cloud::internal::ExponentialBackoffPolicy(ms(0), ms(0), 2.0)
.clone();
auto query_refresh_option =
bigtable::experimental::QueryPlanRefreshLimitedErrorCountRetryPolicy(0)
.clone();
auto opts =
Options{}
.set<DataRetryPolicyOption>(std::move(retry_policy_option))
.set<DataBackoffPolicyOption>(std::move(backoff_policy_option))
.set<bigtable::experimental::QueryPlanRefreshRetryPolicyOption>(
std::move(query_refresh_option));

// Retrieve connection
auto const& conn = GetConnection(request->client_id());
if (!conn.ok()) return ToGrpcStatus(std::move(conn).status());
auto client = bigtable::Client(*conn, opts);
auto const& request_proto = request->request();

// Call prepare query
auto instance = MakeInstanceResource(request_proto.instance_name());
// NOLINTNEXTLINE(deprecated-declarations)
bigtable::SqlStatement sql_statement{request_proto.query()};
auto prepared_query =
client.PrepareQuery(*std::move(instance), sql_statement);
if (!prepared_query.ok()) {
*response->mutable_status() = ToRpcStatus(prepared_query.status());
return ::grpc::Status();
}

// Bind parameters
std::unordered_map<std::string, Value> params;
for (auto const& param : request_proto.params()) {
auto value =
bigtable_internal::FromProto(param.second.type(), param.second);
params.insert(std::make_pair(param.first, std::move(value)));
}
auto bound_query = prepared_query->BindParameters(params);
auto bound_query_metadata = bound_query.response()->metadata();
RowStream result = client.ExecuteQuery(std::move(bound_query), {});

Status status;
for (auto const& row : result) {
if (!row.ok()) {
status = row.status();
GCP_LOG(INFO) << "Error reading row: " << row.status();
continue;
}
google::bigtable::testproxy::SqlRow proxy_row;
for (auto const& value : row->values()) {
*proxy_row.add_values() = bigtable_internal::ToProto(value).second;
}
*response->add_rows() = std::move(proxy_row);
}

// populate metadata
google::bigtable::testproxy::ResultSetMetadata metadata;
for (auto const& column : bound_query_metadata.proto_schema().columns()) {
*metadata.add_columns() = column;
}
*response->mutable_metadata() = metadata;

*response->mutable_status() = ToRpcStatus(status);
return grpc::Status();
}

StatusOr<std::shared_ptr<DataConnection>> CbtTestProxy::GetConnection(
std::string const& client_id) {
std::lock_guard<std::mutex> lk(mu_);
Expand Down
5 changes: 5 additions & 0 deletions google/cloud/bigtable/test_proxy/cbt_test_proxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,11 @@ class CbtTestProxy final
::google::bigtable::testproxy::ReadModifyWriteRowRequest const* request,
::google::bigtable::testproxy::RowResult* response) override;

grpc::Status ExecuteQuery(
grpc::ServerContext* context,
google::bigtable::testproxy::ExecuteQueryRequest const* request,
google::bigtable::testproxy::ExecuteQueryResult* response) override;

private:
StatusOr<std::shared_ptr<DataConnection>> GetConnection(
std::string const& client_id);
Expand Down
52 changes: 45 additions & 7 deletions google/cloud/bigtable/value.h
Original file line number Diff line number Diff line change
Expand Up @@ -630,12 +630,6 @@ class Value {
if (!key) return std::move(key).status();
if (!value) return std::move(value).status();

// the documented behavior indicates that the last value will take
// precedence for a given key.
auto const& pos = m.find(key.value());
if (pos != m.end()) {
m.erase(pos);
}
m.insert(std::make_pair(*std::move(key), *std::move(value)));
}
return m;
Expand Down Expand Up @@ -692,6 +686,48 @@ class Value {
return std::move(*pv.mutable_array_value()->mutable_values(pos));
}

/**
* This method removes duplicate keys from a map Value proto.
* Although we represent maps with unordered_maps, in this case we use an
* ordered map to preserve the order of the array_value().
*/
static void ValidateMapValue(google::bigtable::v2::Type& t,
google::bigtable::v2::Value& v) {
if (!t.has_map_type() || v.array_value().values().empty()) return;
std::map<std::string, google::bigtable::v2::Value> m;
for (auto const& kv : v.array_value().values()) {
auto key_proto = kv.array_value().values()[0];
std::string key;
switch (key_proto.kind_case()) {
case google::bigtable::v2::Value::kStringValue:
key = key_proto.string_value();
break;
case google::bigtable::v2::Value::kBytesValue:
key = key_proto.bytes_value();
break;
case google::bigtable::v2::Value::kIntValue:
key = std::to_string(key_proto.int_value());
break;
default:
// Undefined behavior for malformed proto. We leave it as is and
// return.
return;
}
// the documented behavior indicates that the last value will take
// precedence for a given key.
auto pos = m.find(key);
if (pos != m.end()) {
m.erase(pos);
}
m.insert(std::make_pair(key, std::move(kv)));
}
auto new_array_value = google::bigtable::v2::ArrayValue();
for (auto const& kv : m) {
*new_array_value.add_values() = kv.second;
}
*v.mutable_array_value() = new_array_value;
}

// A private templated constructor that is called by all the public
// constructors to set the type_ and value_ members. The `PrivateConstructor`
// type is used so that this overload is never chosen for
Expand All @@ -704,7 +740,9 @@ class Value {
: type_(MakeTypeProto(t)), value_(MakeValueProto(std::forward<T>(t))) {}

Value(google::bigtable::v2::Type t, google::bigtable::v2::Value v)
: type_(std::move(t)), value_(std::move(v)) {}
: type_(std::move(t)), value_(std::move(v)) {
ValidateMapValue(type_, value_);
}

friend struct bigtable_internal::ValueInternals;
friend class Parameter;
Expand Down
28 changes: 28 additions & 0 deletions google/cloud/bigtable/value_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,34 @@ TEST(Value, ArrayValueBasedEquality) {
}
}

// We test that a proto with duplicate map keys will produce a Value with
// the last key-value taking precedence
TEST(Value, GetValueDuplicateKeysMap) {
// We create a Value from a simple unordered map and then add a same-key value
// to its proto value.
auto v = Value(std::unordered_map<std::string, std::string>{{"foo", "foo"}});
auto proto = bigtable_internal::ToProto(v);
EXPECT_EQ(proto.second.array_value().values_size(), 1);

auto&& key_proto = bigtable_internal::ToProto(Value("foo"));
auto&& value_proto = bigtable_internal::ToProto(Value("bar"));

google::bigtable::v2::Value item;
*item.mutable_array_value()->add_values() = key_proto.second;
*item.mutable_array_value()->add_values() = value_proto.second;
*proto.second.mutable_array_value()->add_values() = std::move(item);

EXPECT_EQ(proto.second.array_value().values_size(), 2);

// Now we convert the proto back to a Value and confirm that the value is
// "bar"
Value from_proto = bigtable_internal::FromProto(proto.first, proto.second);
using MapType = std::unordered_map<std::string, std::string>;
ASSERT_STATUS_OK(from_proto.get<MapType>());
MapType map = *from_proto.get<MapType>();
EXPECT_EQ(map["foo"], "bar");
}

TEST(Value, UnsortedKeysMapEquality) {
std::vector<std::pair<Value, Value>> const test_cases = {
{
Expand Down
Loading