|
14 | 14 |
|
15 | 15 | #include "google/cloud/bigtable/test_proxy/cbt_test_proxy.h" |
16 | 16 | #include "google/cloud/bigtable/cell.h" |
| 17 | +#include "google/cloud/bigtable/client.h" |
17 | 18 | #include "google/cloud/bigtable/idempotent_mutation_policy.h" |
18 | 19 | #include "google/cloud/bigtable/mutations.h" |
19 | 20 | #include "google/cloud/bigtable/table.h" |
@@ -360,6 +361,70 @@ grpc::Status CbtTestProxy::ReadModifyWriteRow( |
360 | 361 | return grpc::Status(); |
361 | 362 | } |
362 | 363 |
|
| 364 | +grpc::Status CbtTestProxy::ExecuteQuery( |
| 365 | + grpc::ServerContext*, |
| 366 | + google::bigtable::testproxy::ExecuteQueryRequest const* request, |
| 367 | + google::bigtable::testproxy::ExecuteQueryResult* response) { |
| 368 | + // Retrieve connection |
| 369 | + auto const& conn = GetConnection(request->client_id()); |
| 370 | + if (!conn.ok()) return ToGrpcStatus(std::move(conn).status()); |
| 371 | + auto client = bigtable::Client(*conn); |
| 372 | + auto const& request_proto = request->request(); |
| 373 | + |
| 374 | + // Call prepare query |
| 375 | + auto instance = MakeInstanceResource(request_proto.instance_name()); |
| 376 | + // NOLINTNEXTLINE(deprecated-declarations) |
| 377 | + bigtable::SqlStatement sql_statement{request_proto.query()}; |
| 378 | + auto prepared_query = |
| 379 | + client.PrepareQuery(*std::move(instance), sql_statement); |
| 380 | + if (!prepared_query.ok()) { |
| 381 | + *response->mutable_status() = ToRpcStatus(prepared_query.status()); |
| 382 | + return ::grpc::Status(); |
| 383 | + } |
| 384 | + |
| 385 | + // Bind parameters |
| 386 | + std::unordered_map<std::string, Value> params; |
| 387 | + for (auto const& param : request_proto.params()) { |
| 388 | + auto value = |
| 389 | + bigtable_internal::FromProto(param.second.type(), param.second); |
| 390 | + params.insert(std::make_pair(param.first, std::move(value))); |
| 391 | + } |
| 392 | + auto bound_query = prepared_query->BindParameters(params); |
| 393 | + auto bound_query_metadata = bound_query.response()->metadata(); |
| 394 | + |
| 395 | + RowStream result = client.ExecuteQuery(std::move(bound_query), {}); |
| 396 | + |
| 397 | + Status status; |
| 398 | + std::vector<google::bigtable::testproxy::SqlRow> proxy_rows; |
| 399 | + for (auto& row : result) { |
| 400 | + if (!row.ok()) { |
| 401 | + status = row.status(); |
| 402 | + break; |
| 403 | + } |
| 404 | + google::bigtable::testproxy::SqlRow proxy_row; |
| 405 | + for (auto const& v : row->values()) { |
| 406 | + *proxy_row.add_values() = bigtable_internal::ToProto(v).second; |
| 407 | + } |
| 408 | + proxy_rows.push_back(std::move(proxy_row)); |
| 409 | + } |
| 410 | + |
| 411 | + if (status.ok()) { |
| 412 | + for (auto& p : proxy_rows) { |
| 413 | + *response->add_rows() = std::move(p); |
| 414 | + } |
| 415 | + } |
| 416 | + |
| 417 | + // populate metadata |
| 418 | + google::bigtable::testproxy::ResultSetMetadata metadata; |
| 419 | + for (auto const& column : bound_query_metadata.proto_schema().columns()) { |
| 420 | + *metadata.add_columns() = column; |
| 421 | + } |
| 422 | + *response->mutable_metadata() = metadata; |
| 423 | + |
| 424 | + *response->mutable_status() = ToRpcStatus(status); |
| 425 | + return grpc::Status(); |
| 426 | +} |
| 427 | + |
363 | 428 | StatusOr<std::shared_ptr<DataConnection>> CbtTestProxy::GetConnection( |
364 | 429 | std::string const& client_id) { |
365 | 430 | std::lock_guard<std::mutex> lk(mu_); |
|
0 commit comments