Skip to content
This repository was archived by the owner on Dec 21, 2023. It is now read-only.

Commit a16d9ad

Browse files
author
Hoyt Koepke
authored
Fixed subtle threading bug in column indexer of ml_data. (#2979)
Fixed subtle threading bug in column indexer of ml_data.
1 parent 3fe09b5 commit a16d9ad

File tree

2 files changed

+54
-32
lines changed

2 files changed

+54
-32
lines changed

src/ml/ml_data/column_statistics.hpp

Lines changed: 53 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -165,8 +165,7 @@ class column_statistics {
165165
for(; cv_idx < cat_index_vect.size(); ++cv_idx) {
166166
size_t idx = cat_index_vect[cv_idx] - parallel_threshhold;
167167

168-
if(idx >= global_element_counts.size() || idx >= global_size)
169-
adjust_global_array_size(idx, global_element_counts);
168+
check_global_array_sizes(idx, global_element_counts);
170169

171170
if(cv_idx == 0 || (idx != cat_index_vect[cv_idx - 1]) ) {
172171
std::lock_guard<simple_spinlock> el_lg(global_element_locks[get_lock_index(idx)]);
@@ -310,12 +309,7 @@ class column_statistics {
310309
size_t idx = dict[d_idx].first - parallel_threshhold;
311310
double v = dict[d_idx].second;
312311

313-
if(idx >= global_element_counts.size()
314-
|| idx >= global_mean_var_acc.size()
315-
|| idx >= global_size) {
316-
adjust_global_array_size(idx, global_element_counts);
317-
adjust_global_array_size(idx, global_mean_var_acc);
318-
}
312+
check_global_array_sizes(idx, global_element_counts, global_mean_var_acc);
319313

320314
std::lock_guard<simple_spinlock> el_lg(global_element_locks[get_lock_index(idx)]);
321315

@@ -439,7 +433,9 @@ class column_statistics {
439433
std::vector<size_t> global_element_counts;
440434
std::vector<element_statistics_accumulator> global_mean_var_acc;
441435

442-
size_t global_size = 0;
436+
volatile size_t global_size = 0;
437+
volatile size_t global_array_buffer_size = 0;
438+
std::mutex _array_resize_lock;
443439

444440
/** Return the index of the appropriate lock.
445441
*
@@ -471,37 +467,64 @@ class column_statistics {
471467
v.resize(idx + 1);
472468
}
473469
}
474-
475-
/** Check global array size. Possibly resize it.
470+
471+
/** Check global array size. Possibly resize them.
476472
*/
477-
template <typename T>
478-
void adjust_global_array_size(size_t idx, std::vector<T>& v) {
473+
template <typename... V>
474+
inline void check_global_array_sizes(size_t idx, V&... vv) {
479475

480476
// If needed, increase the value of global_size.
481-
atomic_set_max(global_size, idx + 1);
477+
if(UNLIKELY(idx >= global_size)) {
478+
atomic_set_max(global_size, idx + 1);
479+
}
482480

483-
// See if a resize is needed.
484-
if(UNLIKELY(idx >= v.size() )) {
481+
if(UNLIKELY(idx >= global_array_buffer_size)) {
482+
resize_global_arrays(idx, vv...);
483+
}
484+
}
485485

486-
// Grow aggressively, since a resize is really expensive.
487-
size_t new_size = 2 * (parallel_threshhold + idx + 1);
488486

489-
{
490-
std::array<std::unique_lock<simple_spinlock>, n_locks> all_locks;
491-
for(size_t i = 0; i < n_locks; ++i)
492-
all_locks[i] = std::unique_lock<simple_spinlock>(global_element_locks[i], std::defer_lock);
487+
template <typename T>
488+
inline void __resize_global_array(size_t new_size, std::vector<T>& v) {
489+
// This condition should always be true as the
490+
DASSERT_EQ(v.size(), global_array_buffer_size);
491+
v.resize(new_size);
492+
}
493+
494+
template <typename V1, typename... VV>
495+
inline void __resize_global_array(size_t new_size, V1& v, VV&... other_v) {
496+
__resize_global_array(new_size, v);
497+
__resize_global_array(new_size, other_v...);
498+
}
499+
500+
501+
template <typename... V>
502+
void resize_global_arrays(size_t idx, V&... vv) {
503+
504+
// Grow aggressively, since a resize is really expensive.
505+
size_t new_size = 2 * (parallel_threshhold + idx + 1);
493506

494-
// Ensure nothing is happening with the vector by locking all
495-
// locks in a thread safe way. This prevents any thread from
496-
// accessing it while we resize it.
497-
boost::lock(all_locks.begin(), all_locks.end());
507+
// First, lock a global lock while the resize is happening, as many threads are likely to
508+
// hit this at once. This prevents multiple threads from locking all the locks in the full
509+
// array when there is contention.
510+
std::lock_guard<std::mutex> lg(_array_resize_lock);
498511

499-
// It's possible that another thread beat us to it.
500-
if(v.size() < idx + 1)
501-
v.resize(new_size);
512+
// Do we still need to resize it, or has another array hit this already?
513+
if(global_array_buffer_size <= idx) {
514+
std::array<std::unique_lock<simple_spinlock>, n_locks> all_locks;
502515

503-
// The destructor of all_locks takes care of the unlocking.
516+
for(size_t i = 0; i < n_locks; ++i) {
517+
all_locks[i] = std::unique_lock<simple_spinlock>(global_element_locks[i], std::defer_lock);
504518
}
519+
520+
// Ensure nothing is happening with the vector by locking all
521+
// locks in a thread safe way. This prevents any thread from
522+
// accessing it while we resize it.
523+
boost::lock(all_locks.begin(), all_locks.end());
524+
525+
__resize_global_array(new_size, vv...);
526+
527+
global_array_buffer_size = new_size;
505528
}
506529
}
507530

test/ml_data/CMakeLists.txt

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,7 @@ make_boost_test(dml_basic_storage.cxx REQUIRES unity_shared_for_testing)
44
make_boost_test(dml_numerics.cxx REQUIRES unity_shared_for_testing)
55
make_boost_test(dml_untranslated_columns.cxx REQUIRES unity_shared_for_testing)
66
make_boost_test(dml_reindexing.cxx REQUIRES unity_shared_for_testing)
7-
# KNOWN FAILURE
8-
# make_boost_test(dml_stats_merge_test.cxx REQUIRES unity_shared_for_testing)
7+
make_boost_test(dml_stats_merge_test.cxx REQUIRES unity_shared_for_testing)
98
make_boost_test(dml_metadata_api.cxx REQUIRES unity_shared_for_testing)
109
make_boost_test(dml_schema_errors.cxx REQUIRES unity_shared_for_testing)
1110
# KNOWN FAILURE

0 commit comments

Comments
 (0)