Skip to content
This repository was archived by the owner on May 9, 2024. It is now read-only.

Commit 3a634b0

Browse files
committed
Support nested dictionaries in StringDictionary::copyStrings.
Signed-off-by: ienkovich <[email protected]>
1 parent d260021 commit 3a634b0

File tree

4 files changed

+119
-29
lines changed

4 files changed

+119
-29
lines changed

omniscidb/StringDictionary/StringDictionary.cpp

Lines changed: 52 additions & 27 deletions
Original file line number | Diff line number | Diff line change
@@ -1005,52 +1005,77 @@ std::vector<int32_t> StringDictionary::getRegexpLike(const std::string& pattern,
10051005
return result;
10061006
}
10071007

1008-
std::vector<std::string> StringDictionary::copyStrings() const {
1009-
CHECK(!base_dict_) << "Not implemented";
1010-
mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
1008+
std::vector<std::string> StringDictionary::copyStrings(int64_t generation) const {
1009+
generation = generation >= 0 ? std::min(generation, static_cast<int64_t>(entryCount()))
1010+
: static_cast<int64_t>(entryCount());
1011+
if (!strings_cache_) {
1012+
strings_cache_ = std::make_shared<std::vector<std::string>>();
1013+
strings_cache_->reserve(entryCount());
1014+
copyStrings(0, entryCount(), *strings_cache_);
1015+
} else if (strings_cache_->size() < static_cast<size_t>(generation)) {
1016+
auto start = strings_cache_->size();
1017+
strings_cache_->reserve(entryCount());
1018+
copyStrings(start, entryCount(), *strings_cache_);
1019+
}
10111020

1012-
if (strings_cache_) {
1013-
return *strings_cache_;
1021+
return std::vector<std::string>(strings_cache_->begin(),
1022+
strings_cache_->begin() + generation);
1023+
}
1024+
1025+
void StringDictionary::copyStrings(int64_t string_id_start,
1026+
int64_t string_id_end,
1027+
std::vector<std::string>& out_vec) const {
1028+
CHECK_GE(string_id_start, 0);
1029+
CHECK_LE(string_id_end, static_cast<int64_t>(entryCount()));
1030+
1031+
if (base_dict_ && string_id_start < base_generation_) {
1032+
base_dict_->copyStrings(
1033+
string_id_start, std::min(base_generation_, string_id_end), out_vec);
10141034
}
10151035

1016-
strings_cache_ = std::make_shared<std::vector<std::string>>();
1017-
strings_cache_->reserve(str_count_);
1018-
const bool multithreaded = str_count_ > 10000;
1019-
const auto worker_count =
1020-
multithreaded ? static_cast<size_t>(cpu_threads()) : size_t(1);
1021-
CHECK_GT(worker_count, 0UL);
1022-
std::vector<std::vector<std::string>> worker_results(worker_count);
1036+
int64_t local_string_id_start = std::max(string_id_start, base_generation_);
1037+
int64_t local_string_id_end = string_id_end;
1038+
if (local_string_id_start >= local_string_id_end) {
1039+
return;
1040+
}
1041+
1042+
mapd_lock_guard<mapd_shared_mutex> write_lock(rw_mutex_);
1043+
const bool multithreaded = (local_string_id_end - local_string_id_start) > 10000;
10231044
auto copy = [this](std::vector<std::string>& str_list,
1024-
const size_t start_id,
1025-
const size_t end_id) {
1045+
const int64_t start_id,
1046+
const int64_t end_id) {
10261047
CHECK_LE(start_id, end_id);
10271048
str_list.reserve(end_id - start_id);
1028-
for (size_t string_id = start_id; string_id < end_id; ++string_id) {
1049+
for (int64_t string_id = start_id; string_id < end_id; ++string_id) {
10291050
str_list.push_back(getStringUnlocked(string_id));
10301051
}
10311052
};
10321053
if (multithreaded) {
1054+
const auto worker_count = cpu_threads();
1055+
CHECK_GT(worker_count, 0);
1056+
std::vector<std::vector<std::string>> worker_results(worker_count);
10331057
std::vector<std::future<void>> workers;
1034-
const auto stride = (str_count_ + (worker_count - 1)) / worker_count;
1035-
for (size_t worker_idx = 0, start = 0, end = std::min(start + stride, str_count_);
1036-
worker_idx < worker_count && start < str_count_;
1037-
++worker_idx, start += stride, end = std::min(start + stride, str_count_)) {
1058+
const auto stride =
1059+
(local_string_id_end - local_string_id_start + (worker_count - 1)) / worker_count;
1060+
for (int64_t worker_idx = 0,
1061+
start = local_string_id_start,
1062+
end = std::min(start + stride, local_string_id_end);
1063+
worker_idx < worker_count && start < local_string_id_end;
1064+
++worker_idx,
1065+
start += stride,
1066+
end = std::min(start + stride, local_string_id_end)) {
10381067
workers.push_back(std::async(
10391068
std::launch::async, copy, std::ref(worker_results[worker_idx]), start, end));
10401069
}
10411070
for (auto& worker : workers) {
10421071
worker.get();
10431072
}
1073+
for (const auto& worker_result : worker_results) {
1074+
out_vec.insert(out_vec.end(), worker_result.begin(), worker_result.end());
1075+
}
10441076
} else {
1045-
CHECK_EQ(worker_results.size(), size_t(1));
1046-
copy(worker_results[0], 0, str_count_);
1047-
}
1048-
1049-
for (const auto& worker_result : worker_results) {
1050-
strings_cache_->insert(
1051-
strings_cache_->end(), worker_result.begin(), worker_result.end());
1077+
copy(out_vec, local_string_id_start, local_string_id_end);
10521078
}
1053-
return *strings_cache_;
10541079
}
10551080

10561081
bool StringDictionary::fillRateIsHigh(const size_t num_strings) const noexcept {

omniscidb/StringDictionary/StringDictionary.h

Lines changed: 4 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -136,7 +136,7 @@ class StringDictionary {
136136
const char escape,
137137
const size_t generation) const;
138138

139-
std::vector<std::string> copyStrings() const;
139+
std::vector<std::string> copyStrings(int64_t generation = -1) const;
140140

141141
static constexpr int32_t INVALID_STR_ID = -1;
142142
static constexpr size_t MAX_STRLEN = (1 << 15) - 1;
@@ -187,6 +187,9 @@ class StringDictionary {
187187
std::pair<char*, size_t> getOwnedStringBytesChecked(const int string_id) const noexcept;
188188
template <class T, class String>
189189
void getOrAddBulkParallel(const std::vector<String>& string_vec, T* encoded_vec);
190+
void copyStrings(int64_t string_id_start,
191+
int64_t string_id_end,
192+
std::vector<std::string>& out_vec) const;
190193
template <class String>
191194
uint32_t computeBucket(
192195
const uint32_t hash,

omniscidb/StringDictionary/StringDictionaryProxy.cpp

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -645,7 +645,7 @@ bool StringDictionaryProxy::operator!=(StringDictionaryProxy const& rhs) const {
645645
}
646646

647647
std::vector<std::string> StringDictionaryProxy::copyStrings() const {
648-
auto res = string_dict_->copyStrings();
648+
auto res = string_dict_->copyStrings(generation_);
649649
res.reserve(entryCount());
650650
for (auto str_ptr : transient_string_vec_) {
651651
res.emplace_back(*str_ptr);

omniscidb/Tests/StringDictionaryTest.cpp

Lines changed: 62 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -679,6 +679,68 @@ TEST(NestedStringDictionary, EachStringSerially) {
679679
}
680680
}
681681

682+
TEST(NestedStringDictionary, CopyStrings) {
683+
auto dict1 =
684+
std::make_shared<StringDictionary>(DictRef{-1, 1}, -1, g_cache_string_hash);
685+
ASSERT_EQ(dict1->getOrAdd("str1"), 0);
686+
ASSERT_EQ(dict1->getOrAdd("str2"), 1);
687+
ASSERT_EQ(dict1->getOrAdd("str3"), 2);
688+
689+
ASSERT_EQ(dict1->copyStrings(), std::vector<std::string>({"str1"s, "str2"s, "str3"s}));
690+
ASSERT_EQ(dict1->copyStrings(10),
691+
std::vector<std::string>({"str1"s, "str2"s, "str3"s}));
692+
ASSERT_EQ(dict1->copyStrings(2), std::vector<std::string>({"str1"s, "str2"s}));
693+
694+
auto dict2 = std::make_shared<StringDictionary>(dict1, -1, g_cache_string_hash);
695+
ASSERT_EQ(dict1->getOrAdd("str4"), 3);
696+
ASSERT_EQ(dict2->getOrAdd("str5"), 3);
697+
ASSERT_EQ(dict2->getOrAdd("str6"), 4);
698+
699+
ASSERT_EQ(dict1->copyStrings(),
700+
std::vector<std::string>({"str1"s, "str2"s, "str3"s, "str4"s}));
701+
ASSERT_EQ(dict1->copyStrings(3), std::vector<std::string>({"str1"s, "str2"s, "str3"s}));
702+
703+
ASSERT_EQ(dict2->copyStrings(),
704+
std::vector<std::string>({"str1"s, "str2"s, "str3"s, "str5"s, "str6"s}));
705+
ASSERT_EQ(dict2->copyStrings(10),
706+
std::vector<std::string>({"str1"s, "str2"s, "str3"s, "str5"s, "str6"s}));
707+
ASSERT_EQ(dict2->copyStrings(4),
708+
std::vector<std::string>({"str1"s, "str2"s, "str3"s, "str5"s}));
709+
ASSERT_EQ(dict2->copyStrings(2), std::vector<std::string>({"str1"s, "str2"s}));
710+
711+
ASSERT_EQ(dict2->getOrAdd("str7"), 5);
712+
713+
ASSERT_EQ(
714+
dict2->copyStrings(),
715+
std::vector<std::string>({"str1"s, "str2"s, "str3"s, "str5"s, "str6"s, "str7"s}));
716+
}
717+
718+
TEST(NestedStringDictionary, CopyStringsParallel) {
719+
constexpr int STR_COUNT = 20'000;
720+
721+
std::vector<std::string> strings1;
722+
std::vector<std::string> strings2;
723+
for (int i = 0; i < STR_COUNT; ++i) {
724+
strings1.push_back(std::to_string(i));
725+
strings2.push_back(std::to_string(STR_COUNT + i));
726+
}
727+
strings2.insert(strings2.begin(), strings1.begin(), strings1.end());
728+
729+
auto dict1 =
730+
std::make_shared<StringDictionary>(DictRef{-1, 1}, -1, g_cache_string_hash);
731+
dict1->getOrAddBulk(strings1);
732+
733+
auto dict2 = std::make_shared<StringDictionary>(dict1, -1, g_cache_string_hash);
734+
dict2->getOrAddBulk(strings2);
735+
736+
ASSERT_EQ(dict1->copyStrings(), strings1);
737+
ASSERT_EQ(dict2->copyStrings(), strings2);
738+
739+
dict1->getOrAddBulk(strings2);
740+
741+
ASSERT_EQ(dict1->copyStrings(), strings2);
742+
}
743+
682744
TEST(StringDictionaryProxy, BuildIntersectionTranslationMapToOtherProxy) {
683745
// Use existing dictionary from GetBulk
684746
const DictRef dict_ref1(-1, 1);

0 commit comments

Comments
 (0)