Skip to content

Commit a3f8ffb

Browse files
test(bigtable): add IT for fetching a column family (#15728)
* test(bigtable): add IT for fetching a column family * chore: bring latest changes of IT * chore: add latest version of tests * chore: lint * chore: use unordered_map * chore: duplicated names * chore: duplicate name ii * chore: checkers * chore: expect family column name * chore: adjust tests * chore: adjust test * chore: add debug statement * chore: final test adjustments * chore: use Bytes key * chore: remove temporary table change * chore: fix expected value * chore: use RowStream
1 parent 361f9b8 commit a3f8ffb

File tree

1 file changed

+206
-0
lines changed

1 file changed

+206
-0
lines changed

google/cloud/bigtable/tests/data_integration_test.cc

Lines changed: 206 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -610,6 +610,212 @@ TEST_P(DataIntegrationTest, TableApplyWithLogging) {
610610
EXPECT_THAT(log.ExtractLines(), Not(Contains(HasSubstr("MutateRow"))));
611611
}
612612

613+
TEST_P(DataIntegrationTest, ClientQueryColumnFamily) {
614+
if (UsingCloudBigtableEmulator()) GTEST_SKIP();
615+
auto const table_id = testing::TableTestEnvironment::table_id();
616+
auto retry_policy_option = DataLimitedErrorCountRetryPolicy(0).clone();
617+
auto backoff_policy_option =
618+
google::cloud::internal::ExponentialBackoffPolicy(ms(0), ms(0), 2.0)
619+
.clone();
620+
auto query_refresh_option =
621+
bigtable::experimental::QueryPlanRefreshLimitedErrorCountRetryPolicy(0)
622+
.clone();
623+
auto opts =
624+
Options{}
625+
.set<DataRetryPolicyOption>(std::move(retry_policy_option))
626+
.set<DataBackoffPolicyOption>(std::move(backoff_policy_option))
627+
.set<bigtable::experimental::QueryPlanRefreshRetryPolicyOption>(
628+
std::move(query_refresh_option));
629+
auto connection = google::cloud::bigtable::MakeDataConnection(opts);
630+
auto table =
631+
Table(connection, TableResource(project_id(), instance_id(), table_id));
632+
std::string const row_key = "row-key-for-client-query-test";
633+
std::string const family = kFamily4;
634+
std::string const column1 = "c1";
635+
std::string const column2 = "c2";
636+
std::string const value1 = "v1";
637+
std::string const value2 = "v2";
638+
639+
std::vector<Cell> created{
640+
{row_key, family, column1, 0, value1},
641+
{row_key, family, column2, 0, value2},
642+
};
643+
BulkApply(table, created);
644+
auto client = Client(connection, opts);
645+
std::vector<std::string> full_table_path =
646+
absl::StrSplit(table.table_name(), '/');
647+
auto table_name = full_table_path.back();
648+
std::string quoted_table_name = "`" + table_name + "`";
649+
Project project(project_id());
650+
InstanceResource instance_resource(project, instance_id());
651+
652+
auto prepared_query = client.PrepareQuery(
653+
instance_resource,
654+
SqlStatement("SELECT family4 FROM " + quoted_table_name +
655+
" WHERE _key = '" + row_key + "'"));
656+
657+
ASSERT_STATUS_OK(prepared_query);
658+
659+
auto bound_query = prepared_query->BindParameters({});
660+
auto row_stream = client.ExecuteQuery(std::move(bound_query));
661+
662+
std::vector<StatusOr<bigtable::QueryRow>> rows;
663+
for (auto& row : row_stream) {
664+
rows.push_back(std::move(row));
665+
}
666+
667+
ASSERT_EQ(rows.size(), 1);
668+
ASSERT_STATUS_OK(rows[0]);
669+
auto const& row1 = *rows[0];
670+
ASSERT_EQ(row1.columns().size(), 1);
671+
ASSERT_EQ(row1.columns().at(0), family);
672+
ASSERT_EQ(row1.values().at(0), Value(std::unordered_map<Bytes, Bytes>{
673+
{Bytes(column1), Bytes(value1)},
674+
{Bytes(column2), Bytes(value2)}}));
675+
}
676+
677+
TEST_P(DataIntegrationTest, ClientQueryColumnFamilyWithHistory) {
678+
if (UsingCloudBigtableEmulator()) GTEST_SKIP();
679+
auto const table_id = testing::TableTestEnvironment::table_id();
680+
auto retry_policy_option = DataLimitedErrorCountRetryPolicy(0).clone();
681+
auto backoff_policy_option =
682+
google::cloud::internal::ExponentialBackoffPolicy(ms(0), ms(0), 2.0)
683+
.clone();
684+
auto query_refresh_option =
685+
bigtable::experimental::QueryPlanRefreshLimitedErrorCountRetryPolicy(0)
686+
.clone();
687+
auto opts =
688+
Options{}
689+
.set<DataRetryPolicyOption>(std::move(retry_policy_option))
690+
.set<DataBackoffPolicyOption>(std::move(backoff_policy_option))
691+
.set<bigtable::experimental::QueryPlanRefreshRetryPolicyOption>(
692+
std::move(query_refresh_option));
693+
auto connection = google::cloud::bigtable::MakeDataConnection(opts);
694+
auto table =
695+
Table(connection, TableResource(project_id(), instance_id(), table_id));
696+
std::string const row_key = "row-key-for-history-test";
697+
std::string const family = kFamily4;
698+
std::string const column1 = "c1";
699+
std::string const column2 = "c2";
700+
std::string const column_1_value_old = "c1_v1_old";
701+
std::string const column_1_value_new = "c1_v2_new";
702+
std::string const column_2_value_old = "c2_v1_old";
703+
std::string const column_2_value_new = "c2_v2_new";
704+
705+
// Get times in microseconds
706+
auto now_sys = std::chrono::system_clock::now();
707+
auto current_time = std::chrono::duration_cast<std::chrono::microseconds>(
708+
now_sys.time_since_epoch())
709+
.count();
710+
auto old_time = current_time - 5000000; // 5 seconds older
711+
712+
// Apply mutations with specific timestamps
713+
SingleRowMutation mutation(row_key);
714+
mutation.emplace_back(
715+
SetCell(family, column1,
716+
duration_cast<milliseconds>(std::chrono::microseconds(old_time)),
717+
column_1_value_old));
718+
mutation.emplace_back(SetCell(
719+
family, column1,
720+
duration_cast<milliseconds>(std::chrono::microseconds(current_time)),
721+
column_1_value_new));
722+
mutation.emplace_back(
723+
SetCell(family, column2,
724+
duration_cast<milliseconds>(std::chrono::microseconds(old_time)),
725+
column_2_value_old));
726+
mutation.emplace_back(SetCell(
727+
family, column2,
728+
duration_cast<milliseconds>(std::chrono::microseconds(current_time)),
729+
column_2_value_new));
730+
auto apply_status = table.Apply(std::move(mutation));
731+
ASSERT_TRUE(apply_status.ok()) << apply_status.message();
732+
733+
// Execute query using WITH_HISTORY
734+
auto client = Client(connection, opts);
735+
std::vector<std::string> full_table_path =
736+
absl::StrSplit(table.table_name(), '/');
737+
auto table_name = full_table_path.back();
738+
std::string quoted_table_name = "`" + table_name + "`";
739+
Project project(project_id());
740+
InstanceResource instance_resource(project, instance_id());
741+
std::string query_string = absl::StrFormat(
742+
R"sql(SELECT family4 AS family4_history
743+
FROM %s(WITH_HISTORY => TRUE)
744+
WHERE _key = '%s')sql",
745+
quoted_table_name, row_key);
746+
auto prepared_query =
747+
client.PrepareQuery(instance_resource, SqlStatement(query_string));
748+
ASSERT_TRUE(prepared_query.ok()) << prepared_query.status().message();
749+
750+
auto bound_query = (*prepared_query).BindParameters({});
751+
RowStream row_stream = client.ExecuteQuery(std::move(bound_query));
752+
using HistoryEntry = std::tuple<std::pair<std::string, Timestamp>,
753+
std::pair<std::string, Bytes>>;
754+
using RowType = std::unordered_map<Bytes, std::vector<HistoryEntry>>;
755+
std::vector<StatusOr<RowType>> rows;
756+
for (auto& row : StreamOf<std::tuple<RowType>>(row_stream)) {
757+
ASSERT_STATUS_OK(row);
758+
rows.emplace_back(std::move(std::get<0>(*row)));
759+
}
760+
ASSERT_EQ(rows.size(), 1);
761+
ASSERT_TRUE(rows[0].ok()) << rows[0].status().message();
762+
auto const& value_hist = rows[0];
763+
764+
ASSERT_TRUE(value_hist.ok()) << value_hist.status().message();
765+
Value const& bigtable_val = Value(value_hist.value());
766+
auto history_map = bigtable_val.get<RowType>();
767+
ASSERT_TRUE(history_map.ok()) << history_map.status().message();
768+
ASSERT_EQ(history_map->size(), 2);
769+
770+
// Verify the new version of "c1"
771+
auto c1_entry0 = (*history_map)[Bytes(column1)][0];
772+
auto c1_ts_new =
773+
std::get<0>(c1_entry0).second.get<sys_time<std::chrono::microseconds>>();
774+
ASSERT_STATUS_OK(c1_ts_new);
775+
auto c1_expected_current_time_ms =
776+
duration_cast<milliseconds>(std::chrono::microseconds(current_time));
777+
EXPECT_EQ(duration_cast<milliseconds>(c1_ts_new->time_since_epoch()),
778+
c1_expected_current_time_ms);
779+
EXPECT_EQ(std::get<1>(c1_entry0).second.get<std::string>(),
780+
column_1_value_new);
781+
782+
// Verify the old version of "c1"
783+
auto c1_entry1 = (*history_map)[Bytes(column1)][1];
784+
auto c1_ts_old =
785+
std::get<0>(c1_entry1).second.get<sys_time<std::chrono::microseconds>>();
786+
ASSERT_STATUS_OK(c1_ts_old);
787+
auto c1_expected_old_time_ms =
788+
duration_cast<milliseconds>(std::chrono::microseconds(old_time));
789+
EXPECT_EQ(duration_cast<milliseconds>(c1_ts_old->time_since_epoch()),
790+
c1_expected_old_time_ms);
791+
EXPECT_EQ(std::get<1>(c1_entry1).second.get<std::string>(),
792+
column_1_value_old);
793+
794+
// Verify the new version of "c2"
795+
auto c2_entry0 = (*history_map)[Bytes(column2)][0];
796+
auto c2_ts_new =
797+
std::get<0>(c2_entry0).second.get<sys_time<std::chrono::microseconds>>();
798+
ASSERT_STATUS_OK(c2_ts_new);
799+
auto c2_expected_current_time_ms =
800+
duration_cast<milliseconds>(std::chrono::microseconds(current_time));
801+
EXPECT_EQ(duration_cast<milliseconds>(c2_ts_new->time_since_epoch()),
802+
c2_expected_current_time_ms);
803+
EXPECT_EQ(std::get<1>(c2_entry0).second.get<std::string>(),
804+
column_2_value_new);
805+
806+
// Verify the old version of "c2"
807+
auto c2_entry1 = (*history_map)[Bytes(column2)][1];
808+
auto c2_ts_old =
809+
std::get<0>(c2_entry1).second.get<sys_time<std::chrono::microseconds>>();
810+
ASSERT_STATUS_OK(c2_ts_old);
811+
auto c2_expected_old_time_ms =
812+
duration_cast<milliseconds>(std::chrono::microseconds(old_time));
813+
EXPECT_EQ(duration_cast<milliseconds>(c2_ts_old->time_since_epoch()),
814+
c2_expected_old_time_ms);
815+
EXPECT_EQ(std::get<1>(c2_entry1).second.get<std::string>(),
816+
column_2_value_old);
817+
}
818+
613819
// TODO(#8800) - remove after deprecation is complete
614820
#include "google/cloud/internal/disable_deprecation_warnings.inc"
615821

0 commit comments

Comments
 (0)