Skip to content

Commit fa02b1b

Browse files
authored
feat(bigtable): add simple integration test for query support (#15718)
1 parent ac0b7ca commit fa02b1b

File tree

2 files changed

+179
-10
lines changed

2 files changed

+179
-10
lines changed

google/cloud/bigtable/client.cc

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -13,30 +13,32 @@
1313
// limitations under the License.
1414

1515
#include "google/cloud/bigtable/client.h"
16+
#include "google/cloud/options.h"
1617
#include "internal/partial_result_set_source.h"
1718

1819
namespace google {
1920
namespace cloud {
2021
namespace bigtable {
2122
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
2223

23-
StatusOr<PreparedQuery> Client::PrepareQuery(
24-
InstanceResource const& instance, SqlStatement const& statement,
25-
// NOLINTNEXTLINE(performance-unnecessary-value-param)
26-
Options) {
27-
PrepareQueryParams params{std::move(instance), std::move(statement)};
28-
return conn_->PrepareQuery(std::move(params));
24+
using ::google::cloud::internal::MergeOptions;
25+
using ::google::cloud::internal::OptionsSpan;
26+
27+
StatusOr<PreparedQuery> Client::PrepareQuery(InstanceResource const& instance,
28+
SqlStatement const& statement,
29+
Options opts) {
30+
OptionsSpan span(MergeOptions(std::move(opts), opts_));
31+
return conn_->PrepareQuery({std::move(instance), std::move(statement)});
2932
}
3033

3134
future<StatusOr<PreparedQuery>> Client::AsyncPrepareQuery(
3235
// NOLINTNEXTLINE(performance-unnecessary-value-param)
3336
InstanceResource const& instance, SqlStatement const& statement, Options) {
34-
PrepareQueryParams params{std::move(instance), std::move(statement)};
35-
return conn_->AsyncPrepareQuery(std::move(params));
37+
return conn_->AsyncPrepareQuery({std::move(instance), std::move(statement)});
3638
}
3739

38-
// NOLINTNEXTLINE(performance-unnecessary-value-param)
39-
RowStream Client::ExecuteQuery(BoundQuery&& bound_query, Options) {
40+
RowStream Client::ExecuteQuery(BoundQuery&& bound_query, Options opts) {
41+
OptionsSpan span(MergeOptions(std::move(opts), opts_));
4042
return conn_->ExecuteQuery({std::move(bound_query)});
4143
}
4244

google/cloud/bigtable/tests/data_integration_test.cc

Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,17 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
#include "google/cloud/bigtable/client.h"
1516
#include "google/cloud/bigtable/internal/defaults.h"
17+
#include "google/cloud/bigtable/options.h"
18+
#include "google/cloud/bigtable/retry_policy.h"
1619
#include "google/cloud/bigtable/testing/table_integration_test.h"
1720
#include "google/cloud/log.h"
1821
#include "google/cloud/testing_util/chrono_literals.h"
1922
#include "google/cloud/testing_util/scoped_environment.h"
2023
#include "google/cloud/testing_util/scoped_log.h"
2124
#include "google/cloud/testing_util/status_matchers.h"
25+
#include "absl/strings/str_format.h"
2226
#include <thread>
2327

2428
namespace google {
@@ -36,6 +40,7 @@ using ::std::chrono::milliseconds;
3640
using ::testing::Contains;
3741
using ::testing::ElementsAre;
3842
using ::testing::HasSubstr;
43+
using ms = std::chrono::milliseconds;
3944

4045
class DataIntegrationTest : public TableIntegrationTest,
4146
public ::testing::WithParamInterface<std::string> {
@@ -673,6 +678,168 @@ TEST(ConnectionRefresh, Frequent) {
673678
Apply(table, row_key, created);
674679
}
675680

681+
TEST_P(DataIntegrationTest, SingleColumnQuery) {
682+
if (UsingCloudBigtableEmulator()) GTEST_SKIP();
683+
auto const table_id = testing::TableTestEnvironment::table_id();
684+
auto retry_policy_option = DataLimitedErrorCountRetryPolicy(0).clone();
685+
auto backoff_policy_option =
686+
google::cloud::internal::ExponentialBackoffPolicy(ms(0), ms(0), 2.0)
687+
.clone();
688+
auto query_refresh_option =
689+
bigtable::experimental::QueryPlanRefreshLimitedErrorCountRetryPolicy(0)
690+
.clone();
691+
auto opts =
692+
Options{}
693+
.set<DataRetryPolicyOption>(std::move(retry_policy_option))
694+
.set<DataBackoffPolicyOption>(std::move(backoff_policy_option))
695+
.set<bigtable::experimental::QueryPlanRefreshRetryPolicyOption>(
696+
std::move(query_refresh_option));
697+
auto connection = google::cloud::bigtable::MakeDataConnection(opts);
698+
auto table =
699+
Table(connection, TableResource(project_id(), instance_id(), table_id));
700+
std::string const row_key = "row-key-for-client-query-test";
701+
std::string const family = kFamily4;
702+
std::string const column1 = "c1";
703+
std::string const column2 = "c2";
704+
std::string const value1 = "v1";
705+
std::string const value2 = "v2";
706+
707+
std::vector<Cell> created{
708+
{row_key, family, column1, 0, value1},
709+
{row_key, family, column2, 0, value2},
710+
};
711+
BulkApply(table, created);
712+
auto client = Client(connection, opts);
713+
std::vector<std::string> full_table_path =
714+
absl::StrSplit(table.table_name(), '/');
715+
auto table_name = full_table_path.back();
716+
std::string quoted_table_name = "`" + table_name + "`";
717+
Project project(project_id());
718+
InstanceResource instance_resource(project, instance_id());
719+
auto prepared_query = client.PrepareQuery(
720+
instance_resource,
721+
SqlStatement("SELECT CAST(family4['c1'] AS STRING) AS c1 FROM " +
722+
quoted_table_name + " WHERE _key = '" + row_key + "'"));
723+
ASSERT_STATUS_OK(prepared_query);
724+
725+
auto bound_query = prepared_query->BindParameters({});
726+
auto row_stream = client.ExecuteQuery(std::move(bound_query));
727+
std::vector<StatusOr<bigtable::QueryRow>> rows;
728+
for (auto const& row : std::move(row_stream)) {
729+
rows.push_back(row);
730+
}
731+
732+
ASSERT_EQ(rows.size(), 1);
733+
ASSERT_STATUS_OK(rows[0]);
734+
auto const& row1 = *rows[0];
735+
ASSERT_EQ(row1.columns().size(), 1);
736+
EXPECT_EQ(row1.columns().at(0), "c1");
737+
auto value = row1.get<std::string>("c1");
738+
ASSERT_STATUS_OK(value);
739+
EXPECT_EQ(*value, value1);
740+
}
741+
742+
TEST_P(DataIntegrationTest, SingleColumnQueryWithHistory) {
743+
if (UsingCloudBigtableEmulator()) GTEST_SKIP();
744+
auto const table_id = testing::TableTestEnvironment::table_id();
745+
auto retry_policy_option = DataLimitedErrorCountRetryPolicy(0).clone();
746+
auto backoff_policy_option =
747+
google::cloud::internal::ExponentialBackoffPolicy(ms(0), ms(0), 2.0)
748+
.clone();
749+
auto query_refresh_option =
750+
bigtable::experimental::QueryPlanRefreshLimitedErrorCountRetryPolicy(0)
751+
.clone();
752+
auto opts =
753+
Options{}
754+
.set<DataRetryPolicyOption>(std::move(retry_policy_option))
755+
.set<DataBackoffPolicyOption>(std::move(backoff_policy_option))
756+
.set<bigtable::experimental::QueryPlanRefreshRetryPolicyOption>(
757+
std::move(query_refresh_option));
758+
auto connection = google::cloud::bigtable::MakeDataConnection(opts);
759+
auto table =
760+
Table(connection, TableResource(project_id(), instance_id(), table_id));
761+
std::string const row_key = "row-key-for-history-test";
762+
std::string const family = kFamily4;
763+
std::string const column = "c1";
764+
std::string const value_old = "v1_old";
765+
std::string const value_new = "v2_new";
766+
767+
// Get times in microseconds
768+
auto now_sys = std::chrono::system_clock::now();
769+
auto current_time = std::chrono::duration_cast<std::chrono::microseconds>(
770+
now_sys.time_since_epoch())
771+
.count();
772+
auto old_time = current_time - 5000000; // 5 seconds older
773+
774+
// Apply mutations with specific timestamps
775+
SingleRowMutation mutation(row_key);
776+
mutation.emplace_back(
777+
SetCell(family, column,
778+
duration_cast<milliseconds>(std::chrono::microseconds(old_time)),
779+
value_old));
780+
mutation.emplace_back(SetCell(
781+
family, column,
782+
duration_cast<milliseconds>(std::chrono::microseconds(current_time)),
783+
value_new));
784+
auto apply_status = table.Apply(std::move(mutation));
785+
ASSERT_TRUE(apply_status.ok()) << apply_status.message();
786+
787+
// Execute query using WITH_HISTORY
788+
auto client = Client(connection, opts);
789+
std::vector<std::string> full_table_path =
790+
absl::StrSplit(table.table_name(), '/');
791+
auto table_name = full_table_path.back();
792+
std::string quoted_table_name = "`" + table_name + "`";
793+
Project project(project_id());
794+
InstanceResource instance_resource(project, instance_id());
795+
std::string query_string = absl::StrFormat(
796+
R"sql(SELECT CAST(family4['c1'] AS ARRAY<STRUCT<timestamp TIMESTAMP, value STRING>>) AS c1_history
797+
FROM %s(WITH_HISTORY => TRUE)
798+
WHERE _key = '%s')sql",
799+
quoted_table_name, row_key);
800+
auto prepared_query =
801+
client.PrepareQuery(instance_resource, SqlStatement(query_string));
802+
ASSERT_TRUE(prepared_query.ok()) << prepared_query.status().message();
803+
804+
auto bound_query = (*prepared_query).BindParameters({});
805+
auto row_stream = client.ExecuteQuery(std::move(bound_query));
806+
std::vector<StatusOr<QueryRow>> rows;
807+
for (auto const& row : std::move(row_stream)) {
808+
rows.push_back(row);
809+
}
810+
ASSERT_EQ(rows.size(), 1);
811+
ASSERT_TRUE(rows[0].ok()) << rows[0].status().message();
812+
auto const& row = *rows[0];
813+
ASSERT_EQ(row.columns().size(), 1);
814+
EXPECT_EQ(row.columns().at(0), "c1_history");
815+
816+
auto value_hist = row.get("c1_history");
817+
ASSERT_TRUE(value_hist.ok()) << value_hist.status().message();
818+
Value const& bigtable_val = *value_hist;
819+
using HistoryEntry = std::tuple<Timestamp, std::string>;
820+
auto history_array = bigtable_val.get<std::vector<HistoryEntry>>();
821+
ASSERT_TRUE(history_array.ok()) << history_array.status().message();
822+
ASSERT_EQ(history_array->size(), 2);
823+
824+
// Verify cells returned ordered from newest to oldest.
825+
auto const& entry0 = (*history_array)[0];
826+
auto ts_new = std::get<0>(entry0).get<sys_time<std::chrono::microseconds>>();
827+
ASSERT_STATUS_OK(ts_new);
828+
auto expected_current_time_ms =
829+
duration_cast<milliseconds>(std::chrono::microseconds(current_time));
830+
EXPECT_EQ(duration_cast<milliseconds>(ts_new->time_since_epoch()),
831+
expected_current_time_ms);
832+
EXPECT_EQ(std::get<1>(entry0), value_new);
833+
auto const& entry1 = (*history_array)[1];
834+
auto ts_old = std::get<0>(entry1).get<sys_time<std::chrono::microseconds>>();
835+
ASSERT_STATUS_OK(ts_old);
836+
auto expected_old_time_ms =
837+
duration_cast<milliseconds>(std::chrono::microseconds(old_time));
838+
EXPECT_EQ(duration_cast<milliseconds>(ts_old->time_since_epoch()),
839+
expected_old_time_ms);
840+
EXPECT_EQ(std::get<1>(entry1), value_old);
841+
}
842+
676843
// TODO(#8800) - remove after deprecation is complete
677844
#include "google/cloud/internal/diagnostics_pop.inc"
678845

0 commit comments

Comments
 (0)