@@ -50,17 +50,6 @@ namespace {
50
50
51
51
const int SYSTEM_PAGE_SIZE = omnisci::get_page_size();
52
52
53
- int checked_open (const char * path, const bool recover) {
54
- auto fd = omnisci::open (path, O_RDWR | O_CREAT | (recover ? O_APPEND : O_TRUNC), 0644 );
55
- if (fd > 0 ) {
56
- return fd;
57
- }
58
- auto err = std::string (" Dictionary path " ) + std::string (path) +
59
- std::string (" does not exist." );
60
- LOG (ERROR) << err;
61
- throw DictPayloadUnavailable (err);
62
- }
63
-
64
53
const uint64_t round_up_p2 (const uint64_t num) {
65
54
uint64_t in = num;
66
55
in--;
@@ -111,124 +100,21 @@ constexpr size_t StringDictionary::MAX_STRLEN;
111
100
constexpr size_t StringDictionary::MAX_STRCOUNT;
112
101
113
102
StringDictionary::StringDictionary (const DictRef& dict_ref,
114
- const std::string& folder,
115
- const bool isTemp,
116
- const bool recover,
117
103
const bool materializeHashes,
118
104
size_t initial_capacity)
119
105
: dict_ref_(dict_ref)
120
- , folder_(folder)
121
106
, str_count_(0 )
122
107
, string_id_string_dict_hash_table_(initial_capacity, INVALID_STR_ID)
123
108
, hash_cache_(initial_capacity)
124
- , isTemp_(isTemp)
125
109
, materialize_hashes_(materializeHashes)
126
- , payload_fd_(-1 )
127
- , offset_fd_(-1 )
128
110
, offset_map_(nullptr )
129
111
, payload_map_(nullptr )
130
112
, offset_file_size_(0 )
131
113
, payload_file_size_(0 )
132
114
, payload_file_off_(0 )
133
115
, strings_cache_(nullptr ) {
134
- if (!isTemp && folder.empty ()) {
135
- return ;
136
- }
137
-
138
116
// initial capacity must be a power of two for efficient bucket computation
139
117
CHECK_EQ (size_t (0 ), (initial_capacity & (initial_capacity - 1 )));
140
- if (!isTemp_) {
141
- boost::filesystem::path storage_path (folder);
142
- offsets_path_ = (storage_path / boost::filesystem::path (" DictOffsets" )).string ();
143
- const auto payload_path =
144
- (storage_path / boost::filesystem::path (" DictPayload" )).string ();
145
- payload_fd_ = checked_open (payload_path.c_str (), recover);
146
- offset_fd_ = checked_open (offsets_path_.c_str (), recover);
147
- payload_file_size_ = omnisci::file_size (payload_fd_);
148
- offset_file_size_ = omnisci::file_size (offset_fd_);
149
- }
150
- bool storage_is_empty = false ;
151
- if (payload_file_size_ == 0 ) {
152
- storage_is_empty = true ;
153
- addPayloadCapacity ();
154
- }
155
- if (offset_file_size_ == 0 ) {
156
- addOffsetCapacity ();
157
- }
158
- if (!isTemp_) { // we never mmap or recover temp dictionaries
159
- payload_map_ =
160
- reinterpret_cast <char *>(omnisci::checked_mmap (payload_fd_, payload_file_size_));
161
- offset_map_ = reinterpret_cast <StringIdxEntry*>(
162
- omnisci::checked_mmap (offset_fd_, offset_file_size_));
163
- if (recover) {
164
- const size_t bytes = omnisci::file_size (offset_fd_);
165
- if (bytes % sizeof (StringIdxEntry) != 0 ) {
166
- LOG (WARNING) << " Offsets " << offsets_path_ << " file is truncated" ;
167
- }
168
- const uint64_t str_count =
169
- storage_is_empty ? 0 : getNumStringsFromStorage (bytes / sizeof (StringIdxEntry));
170
- collisions_ = 0 ;
171
- // at this point we know the size of the StringDict we need to load
172
- // so lets reallocate the vector to the correct size
173
- const uint64_t max_entries =
174
- std::max (round_up_p2 (str_count * 2 + 1 ),
175
- round_up_p2 (std::max (initial_capacity, static_cast <size_t >(1 ))));
176
- std::vector<int32_t > new_str_ids (max_entries, INVALID_STR_ID);
177
- string_id_string_dict_hash_table_.swap (new_str_ids);
178
- if (materialize_hashes_) {
179
- std::vector<string_dict_hash_t > new_hash_cache (max_entries / 2 );
180
- hash_cache_.swap (new_hash_cache);
181
- }
182
- // Bail early if we know we don't have strings to add (i.e. a new or empty
183
- // dictionary)
184
- if (str_count == 0 ) {
185
- return ;
186
- }
187
-
188
- unsigned string_id = 0 ;
189
- mapd_lock_guard<mapd_shared_mutex> write_lock (rw_mutex_);
190
-
191
- uint32_t thread_inits = 0 ;
192
- const auto thread_count = std::thread::hardware_concurrency ();
193
- const uint32_t items_per_thread = std::max<uint32_t >(
194
- 2000 , std::min<uint32_t >(200000 , (str_count / thread_count) + 1 ));
195
- std::vector<std::future<std::vector<std::pair<string_dict_hash_t , unsigned int >>>>
196
- dictionary_futures;
197
- for (string_id = 0 ; string_id < str_count; string_id += items_per_thread) {
198
- dictionary_futures.emplace_back (std::async (
199
- std::launch::async, [string_id, str_count, items_per_thread, this ] {
200
- std::vector<std::pair<string_dict_hash_t , unsigned int >> hashVec;
201
- for (uint32_t curr_id = string_id;
202
- curr_id < string_id + items_per_thread && curr_id < str_count;
203
- curr_id++) {
204
- const auto recovered = getStringFromStorage (curr_id);
205
- if (recovered.canary ) {
206
- // hit the canary, recovery finished
207
- break ;
208
- } else {
209
- std::string_view temp (recovered.c_str_ptr , recovered.size );
210
- hashVec.emplace_back (std::make_pair (hash_string (temp), temp.size ()));
211
- }
212
- }
213
- return hashVec;
214
- }));
215
- thread_inits++;
216
- if (thread_inits % thread_count == 0 ) {
217
- processDictionaryFutures (dictionary_futures);
218
- }
219
- }
220
- // gather last few threads
221
- if (dictionary_futures.size () != 0 ) {
222
- processDictionaryFutures (dictionary_futures);
223
- }
224
- VLOG (1 ) << " Opened string dictionary " << folder << " # Strings: " << str_count_
225
- << " Hash table size: " << string_id_string_dict_hash_table_.size ()
226
- << " Fill rate: "
227
- << static_cast <double >(str_count_) * 100.0 /
228
- string_id_string_dict_hash_table_.size ()
229
- << " % Collisions: " << collisions_;
230
- }
231
- }
232
118
}
233
119
234
120
namespace {
@@ -319,19 +205,9 @@ size_t StringDictionary::getNumStringsFromStorage(
319
205
StringDictionary::~StringDictionary () noexcept {
320
206
free (CANARY_BUFFER);
321
207
if (payload_map_) {
322
- if (!isTemp_) {
323
- CHECK (offset_map_);
324
- omnisci::checked_munmap (payload_map_, payload_file_size_);
325
- omnisci::checked_munmap (offset_map_, offset_file_size_);
326
- CHECK_GE (payload_fd_, 0 );
327
- omnisci::close (payload_fd_);
328
- CHECK_GE (offset_fd_, 0 );
329
- omnisci::close (offset_fd_);
330
- } else {
331
- CHECK (offset_map_);
332
- free (payload_map_);
333
- free (offset_map_);
334
- }
208
+ CHECK (offset_map_);
209
+ free (payload_map_);
210
+ free (offset_map_);
335
211
}
336
212
}
337
213
@@ -1318,17 +1194,8 @@ void StringDictionary::checkAndConditionallyIncreasePayloadCapacity(
1318
1194
if (payload_file_off_ + write_length > payload_file_size_) {
1319
1195
const size_t min_capacity_needed =
1320
1196
write_length - (payload_file_size_ - payload_file_off_);
1321
- if (!isTemp_) {
1322
- CHECK_GE (payload_fd_, 0 );
1323
- omnisci::checked_munmap (payload_map_, payload_file_size_);
1324
- addPayloadCapacity (min_capacity_needed);
1325
- CHECK (payload_file_off_ + write_length <= payload_file_size_);
1326
- payload_map_ =
1327
- reinterpret_cast <char *>(omnisci::checked_mmap (payload_fd_, payload_file_size_));
1328
- } else {
1329
- addPayloadCapacity (min_capacity_needed);
1330
- CHECK (payload_file_off_ + write_length <= payload_file_size_);
1331
- }
1197
+ addPayloadCapacity (min_capacity_needed);
1198
+ CHECK (payload_file_off_ + write_length <= payload_file_size_);
1332
1199
}
1333
1200
}
1334
1201
@@ -1338,17 +1205,8 @@ void StringDictionary::checkAndConditionallyIncreaseOffsetCapacity(
1338
1205
if (offset_file_off + write_length >= offset_file_size_) {
1339
1206
const size_t min_capacity_needed =
1340
1207
write_length - (offset_file_size_ - offset_file_off);
1341
- if (!isTemp_) {
1342
- CHECK_GE (offset_fd_, 0 );
1343
- omnisci::checked_munmap (offset_map_, offset_file_size_);
1344
- addOffsetCapacity (min_capacity_needed);
1345
- CHECK (offset_file_off + write_length <= offset_file_size_);
1346
- offset_map_ = reinterpret_cast <StringIdxEntry*>(
1347
- omnisci::checked_mmap (offset_fd_, offset_file_size_));
1348
- } else {
1349
- addOffsetCapacity (min_capacity_needed);
1350
- CHECK (offset_file_off + write_length <= offset_file_size_);
1351
- }
1208
+ addOffsetCapacity (min_capacity_needed);
1209
+ CHECK (offset_file_off + write_length <= offset_file_size_);
1352
1210
}
1353
1211
}
1354
1212
@@ -1395,10 +1253,6 @@ std::string_view StringDictionary::getStringFromStorageFast(
1395
1253
1396
1254
StringDictionary::PayloadString StringDictionary::getStringFromStorage (
1397
1255
const int string_id) const noexcept {
1398
- if (!isTemp_) {
1399
- CHECK_GE (payload_fd_, 0 );
1400
- CHECK_GE (offset_fd_, 0 );
1401
- }
1402
1256
CHECK_GE (string_id, 0 );
1403
1257
const StringIdxEntry* str_meta = offset_map_ + string_id;
1404
1258
if (str_meta->size == 0xffff ) {
@@ -1409,42 +1263,13 @@ StringDictionary::PayloadString StringDictionary::getStringFromStorage(
1409
1263
}
1410
1264
1411
1265
void StringDictionary::addPayloadCapacity (const size_t min_capacity_requested) noexcept {
1412
- if (!isTemp_) {
1413
- payload_file_size_ += addStorageCapacity (payload_fd_, min_capacity_requested);
1414
- } else {
1415
- payload_map_ = static_cast <char *>(
1416
- addMemoryCapacity (payload_map_, payload_file_size_, min_capacity_requested));
1417
- }
1266
+ payload_map_ = static_cast <char *>(
1267
+ addMemoryCapacity (payload_map_, payload_file_size_, min_capacity_requested));
1418
1268
}
1419
1269
1420
1270
void StringDictionary::addOffsetCapacity (const size_t min_capacity_requested) noexcept {
1421
- if (!isTemp_) {
1422
- offset_file_size_ += addStorageCapacity (offset_fd_, min_capacity_requested);
1423
- } else {
1424
- offset_map_ = static_cast <StringIdxEntry*>(
1425
- addMemoryCapacity (offset_map_, offset_file_size_, min_capacity_requested));
1426
- }
1427
- }
1428
-
1429
- size_t StringDictionary::addStorageCapacity (
1430
- int fd,
1431
- const size_t min_capacity_requested) noexcept {
1432
- const size_t canary_buff_size_to_add =
1433
- std::max (static_cast <size_t >(1024 * SYSTEM_PAGE_SIZE),
1434
- (min_capacity_requested / SYSTEM_PAGE_SIZE + 1 ) * SYSTEM_PAGE_SIZE);
1435
-
1436
- if (canary_buffer_size < canary_buff_size_to_add) {
1437
- CANARY_BUFFER = static_cast <char *>(realloc (CANARY_BUFFER, canary_buff_size_to_add));
1438
- canary_buffer_size = canary_buff_size_to_add;
1439
- CHECK (CANARY_BUFFER);
1440
- memset (CANARY_BUFFER, 0xff , canary_buff_size_to_add);
1441
- }
1442
-
1443
- CHECK_NE (lseek (fd, 0 , SEEK_END), -1 );
1444
- const auto write_return = write (fd, CANARY_BUFFER, canary_buff_size_to_add);
1445
- CHECK (write_return > 0 &&
1446
- (static_cast <size_t >(write_return) == canary_buff_size_to_add));
1447
- return canary_buff_size_to_add;
1271
+ offset_map_ = static_cast <StringIdxEntry*>(
1272
+ addMemoryCapacity (offset_map_, offset_file_size_, min_capacity_requested));
1448
1273
}
1449
1274
1450
1275
void * StringDictionary::addMemoryCapacity (void * addr,
@@ -1481,22 +1306,6 @@ void StringDictionary::invalidateInvertedIndex() noexcept {
1481
1306
compare_cache_.invalidateInvertedIndex ();
1482
1307
}
1483
1308
1484
- // TODO 5 Mar 2021 Nothing will undo the writes to dictionary currently on a failed
1485
- // load. The next write to the dictionary that does checkpoint will make the
1486
- // uncheckpointed data be written to disk. Only option is a table truncate, and thats
1487
- // assuming not replicated dictionary
1488
- bool StringDictionary::checkpoint () noexcept {
1489
- CHECK (!isTemp_);
1490
- bool ret = true ;
1491
- ret = ret &&
1492
- (omnisci::msync ((void *)offset_map_, offset_file_size_, /* async=*/ false ) == 0 );
1493
- ret = ret &&
1494
- (omnisci::msync ((void *)payload_map_, payload_file_size_, /* async=*/ false ) == 0 );
1495
- ret = ret && (omnisci::fsync (offset_fd_) == 0 );
1496
- ret = ret && (omnisci::fsync (payload_fd_) == 0 );
1497
- return ret;
1498
- }
1499
-
1500
1309
void StringDictionary::buildSortedCache () {
1501
1310
// This method is not thread-safe.
1502
1311
const auto cur_cache_size = sorted_cache.size ();
@@ -1703,9 +1512,6 @@ size_t StringDictionary::buildDictionaryTranslationMap(
1703
1512
return 0 ;
1704
1513
}
1705
1514
1706
- // Sort this/source dict and dest dict on folder_ so we can enforce
1707
- // lock ordering and avoid deadlocks
1708
-
1709
1515
const int32_t dest_db_id = dest_dict->getDbId ();
1710
1516
const int32_t dest_dict_id = dest_dict->getDictId ();
1711
1517
if (getDbId () == dest_db_id && getDictId () == dest_dict_id) {
0 commit comments