Skip to content

Commit 8509ca4

Browse files
AntoinePrvZihan Qi
andauthored
GH-45860: [C++] Respect CPU affinity in cpu_count and ThreadPool default capacity (#47152)
### Rationale for this change We want the ThreadPool default capacity to follow the CPU affinity set by the user, if any. For example: ```console $ python -c "import pyarrow as pa; print(pa.cpu_count())" 24 $ taskset -c 5,6,7 python -c "import pyarrow as pa; print(pa.cpu_count())" 3 ``` ### What changes are included in this PR? - Implement and expose CPU affinity detection as a utility function in `arrow/io_util.h`; on non-Linux platform, it returns `Status::NotImplemented` - Use CPU affinity count, if available, to choose the default ThreadPool capacity (note: based on original changes by Zihan Qi in PR #46034) ### Are these changes tested? By unit tests on CI, and by hand locally. ### Are there any user-facing changes? ThreadPool capacity now follows CPU affinity settings on Linux. * GitHub Issue: #45860 Lead-authored-by: AntoinePrv <[email protected]> Co-authored-by: Zihan Qi <[email protected]> Signed-off-by: Antoine Pitrou <[email protected]>
1 parent c8baf8a commit 8509ca4

File tree

8 files changed

+76
-18
lines changed

8 files changed

+76
-18
lines changed

cpp/src/arrow/result.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -377,6 +377,14 @@ class [[nodiscard]] Result : public util::EqualityComparable<Result<T>> {
377377
return MoveValueUnsafe();
378378
}
379379

380+
/// Return a copy of the internally stored value or alternative if an error is stored.
381+
T ValueOr(T alternative) const& {
382+
if (!ok()) {
383+
return alternative;
384+
}
385+
return ValueUnsafe();
386+
}
387+
380388
/// Retrieve the value if ok(), falling back to an alternative generated by the provided
381389
/// factory
382390
template <typename G>

cpp/src/arrow/util/io_util.cc

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,7 @@
115115
#elif __linux__
116116
# include <sys/sysinfo.h>
117117
# include <fstream>
118+
# include <limits>
118119
#endif
119120

120121
#ifdef _WIN32
@@ -2219,6 +2220,22 @@ int64_t GetTotalMemoryBytes() {
22192220
#endif
22202221
}
22212222

2223+
Result<int32_t> GetNumAffinityCores() {
2224+
#if defined(__linux__)
2225+
cpu_set_t mask;
2226+
if (sched_getaffinity(0, sizeof(mask), &mask) == 0) {
2227+
auto count = CPU_COUNT(&mask);
2228+
if (count > 0 &&
2229+
static_cast<uint64_t>(count) < std::numeric_limits<uint32_t>::max()) {
2230+
return static_cast<uint32_t>(count);
2231+
}
2232+
}
2233+
return IOErrorFromErrno(errno, "Could not read the CPU affinity.");
2234+
#else
2235+
return Status::NotImplemented("Only implemented for Linux");
2236+
#endif
2237+
}
2238+
22222239
Result<void*> LoadDynamicLibrary(const char* path) {
22232240
#ifdef _WIN32
22242241
ARROW_ASSIGN_OR_RAISE(auto platform_path, PlatformFilename::FromString(path));

cpp/src/arrow/util/io_util.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -419,6 +419,12 @@ int64_t GetCurrentRSS();
419419
ARROW_EXPORT
420420
int64_t GetTotalMemoryBytes();
421421

422+
/// \brief Get the number of affinity core on the system.
423+
///
424+
/// This is only implemented on Linux.
425+
/// If a value is returned, it is guaranteed to be greater or equal to one.
426+
ARROW_EXPORT Result<int32_t> GetNumAffinityCores();
427+
422428
/// \brief Load a dynamic library
423429
///
424430
/// This wraps dlopen() except on Windows, where LoadLibrary() is called.

cpp/src/arrow/util/io_util_test.cc

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1123,5 +1123,16 @@ TEST(Memory, TotalMemory) {
11231123
#endif
11241124
}
11251125

1126+
TEST(CpuAffinity, NumberOfCores) {
1127+
auto maybe_affinity_cores = GetNumAffinityCores();
1128+
#ifdef __linux__
1129+
ASSERT_OK_AND_ASSIGN(auto affinity_cores, maybe_affinity_cores);
1130+
ASSERT_GE(affinity_cores, 1);
1131+
ASSERT_LE(affinity_cores, std::thread::hardware_concurrency());
1132+
#else
1133+
ASSERT_RAISES(NotImplemented, maybe_affinity_cores);
1134+
#endif
1135+
}
1136+
11261137
} // namespace internal
11271138
} // namespace arrow

cpp/src/arrow/util/thread_pool.cc

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -732,19 +732,23 @@ static int ParseOMPEnvVar(const char* name) {
732732
}
733733

734734
int ThreadPool::DefaultCapacity() {
735-
int capacity, limit;
736-
capacity = ParseOMPEnvVar("OMP_NUM_THREADS");
737-
if (capacity == 0) {
738-
capacity = std::thread::hardware_concurrency();
735+
int capacity = ParseOMPEnvVar("OMP_NUM_THREADS");
736+
if (capacity <= 0) {
737+
capacity = static_cast<int>(GetNumAffinityCores().ValueOr(0));
739738
}
740-
limit = ParseOMPEnvVar("OMP_THREAD_LIMIT");
741-
if (limit > 0) {
742-
capacity = std::min(limit, capacity);
739+
if (capacity <= 0) {
740+
capacity = static_cast<int>(std::thread::hardware_concurrency());
743741
}
744-
if (capacity == 0) {
745-
ARROW_LOG(WARNING) << "Failed to determine the number of available threads, "
746-
"using a hardcoded arbitrary value";
742+
if (capacity <= 0) {
747743
capacity = 4;
744+
ARROW_LOG(WARNING) << "Failed to determine the number of available threads, "
745+
"using a hardcoded arbitrary value of "
746+
<< capacity;
747+
}
748+
749+
const int limit = ParseOMPEnvVar("OMP_THREAD_LIMIT");
750+
if (limit > 0) {
751+
capacity = std::min(limit, capacity);
748752
}
749753
return capacity;
750754
}

cpp/src/arrow/util/thread_pool.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -475,6 +475,7 @@ class ARROW_EXPORT ThreadPool : public Executor {
475475

476476
// Heuristic for the default capacity of a thread pool for CPU-bound tasks.
477477
// This is exposed as a static method to help with testing.
478+
// The number returned is guaranteed to be greater or equal to one.
478479
static int DefaultCapacity();
479480

480481
// Shutdown the pool. Once the pool starts shutting down, new tasks

cpp/src/arrow/util/thread_pool_test.cc

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1039,35 +1039,46 @@ TEST(TestGlobalThreadPool, Capacity) {
10391039
// Exercise default capacity heuristic
10401040
ASSERT_OK(DelEnvVar("OMP_NUM_THREADS"));
10411041
ASSERT_OK(DelEnvVar("OMP_THREAD_LIMIT"));
1042+
10421043
int hw_capacity = std::thread::hardware_concurrency();
1043-
ASSERT_EQ(ThreadPool::DefaultCapacity(), hw_capacity);
1044+
ASSERT_LE(ThreadPool::DefaultCapacity(), hw_capacity);
1045+
ASSERT_GE(ThreadPool::DefaultCapacity(), 1);
1046+
10441047
ASSERT_OK(SetEnvVar("OMP_NUM_THREADS", "13"));
10451048
ASSERT_EQ(ThreadPool::DefaultCapacity(), 13);
1049+
10461050
ASSERT_OK(SetEnvVar("OMP_NUM_THREADS", "7,5,13"));
10471051
ASSERT_EQ(ThreadPool::DefaultCapacity(), 7);
10481052
ASSERT_OK(DelEnvVar("OMP_NUM_THREADS"));
10491053

10501054
ASSERT_OK(SetEnvVar("OMP_THREAD_LIMIT", "1"));
10511055
ASSERT_EQ(ThreadPool::DefaultCapacity(), 1);
1056+
10521057
ASSERT_OK(SetEnvVar("OMP_THREAD_LIMIT", "999"));
1053-
if (hw_capacity <= 999) {
1054-
ASSERT_EQ(ThreadPool::DefaultCapacity(), hw_capacity);
1055-
}
1058+
ASSERT_LE(ThreadPool::DefaultCapacity(), std::min(999, hw_capacity));
1059+
ASSERT_GE(ThreadPool::DefaultCapacity(), 1);
1060+
10561061
ASSERT_OK(SetEnvVar("OMP_NUM_THREADS", "6,5,13"));
10571062
ASSERT_EQ(ThreadPool::DefaultCapacity(), 6);
1063+
10581064
ASSERT_OK(SetEnvVar("OMP_THREAD_LIMIT", "2"));
10591065
ASSERT_EQ(ThreadPool::DefaultCapacity(), 2);
10601066

10611067
// Invalid env values
10621068
ASSERT_OK(SetEnvVar("OMP_NUM_THREADS", "0"));
10631069
ASSERT_OK(SetEnvVar("OMP_THREAD_LIMIT", "0"));
1064-
ASSERT_EQ(ThreadPool::DefaultCapacity(), hw_capacity);
1070+
ASSERT_LE(ThreadPool::DefaultCapacity(), hw_capacity);
1071+
ASSERT_GE(ThreadPool::DefaultCapacity(), 1);
1072+
10651073
ASSERT_OK(SetEnvVar("OMP_NUM_THREADS", "zzz"));
10661074
ASSERT_OK(SetEnvVar("OMP_THREAD_LIMIT", "x"));
1067-
ASSERT_EQ(ThreadPool::DefaultCapacity(), hw_capacity);
1075+
ASSERT_LE(ThreadPool::DefaultCapacity(), hw_capacity);
1076+
ASSERT_GE(ThreadPool::DefaultCapacity(), 1);
1077+
10681078
ASSERT_OK(SetEnvVar("OMP_THREAD_LIMIT", "-1"));
10691079
ASSERT_OK(SetEnvVar("OMP_NUM_THREADS", "99999999999999999999999999"));
1070-
ASSERT_EQ(ThreadPool::DefaultCapacity(), hw_capacity);
1080+
ASSERT_LE(ThreadPool::DefaultCapacity(), hw_capacity);
1081+
ASSERT_GE(ThreadPool::DefaultCapacity(), 1);
10711082

10721083
ASSERT_OK(DelEnvVar("OMP_NUM_THREADS"));
10731084
ASSERT_OK(DelEnvVar("OMP_THREAD_LIMIT"));

docs/source/cpp/threading.rst

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ CPU vs. I/O
4444
-----------
4545

4646
In order to minimize the overhead of context switches our default thread pool
47-
for CPU-intensive tasks has a fixed size, defaulting to
47+
for CPU-intensive tasks has a fixed size, defaulting to the process CPU affinity (on Linux) or
4848
`std::thread::hardware_concurrency <https://en.cppreference.com/w/cpp/thread/thread/hardware_concurrency>`_.
4949
This means that CPU tasks should never block for long periods of time because this
5050
will result in under-utilization of the CPU. To achieve this we have a separate

0 commit comments

Comments
 (0)