Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 3 additions & 20 deletions llvm/lib/Support/Parallel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -193,16 +193,7 @@ class ThreadPoolExecutor : public Executor {
JobserverClient *TheJobserver = nullptr;
};

// A global raw pointer to the executor. Lifetime is managed by the
// objects created within createExecutor().
static Executor *TheExec = nullptr;
static std::once_flag Flag;

// This function will be called exactly once to create the executor.
// It contains the necessary platform-specific logic. Since functions
// called by std::call_once cannot return value, we have to set the
// executor as a global variable.
void createExecutor() {
Executor *Executor::getDefaultExecutor() {
#ifdef _WIN32
// The ManagedStatic enables the ThreadPoolExecutor to be stopped via
// llvm_shutdown() which allows a "clean" fast exit, e.g. via _exit(). This
Expand All @@ -221,27 +212,19 @@ void createExecutor() {
// are more frequent with the debug static runtime.
//
// This also prevents intermittent deadlocks on exit with the MinGW runtime.

static ManagedStatic<ThreadPoolExecutor, ThreadPoolExecutor::Creator,
ThreadPoolExecutor::Deleter>
ManagedExec;
static std::unique_ptr<ThreadPoolExecutor> Exec(&(*ManagedExec));
TheExec = Exec.get();
return &*ManagedExec;
#else
// ManagedStatic is not desired on other platforms. When `Exec` is destroyed
// by llvm_shutdown(), worker threads will clean up and invoke TLS
// destructors. This can lead to race conditions if other threads attempt to
// access TLS objects that have already been destroyed.
static ThreadPoolExecutor Exec(strategy);
TheExec = &Exec;
return &Exec;
#endif
}

Executor *Executor::getDefaultExecutor() {
// Use std::call_once to lazily and safely initialize the executor.
std::call_once(Flag, createExecutor);
return TheExec;
}
} // namespace
} // namespace detail

Expand Down
81 changes: 65 additions & 16 deletions llvm/unittests/Support/JobserverTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "llvm/Config/llvm-config.h"
#include "llvm/Support/Debug.h"
#include "llvm/Support/Parallel.h"
#include "llvm/Support/Program.h"
#include "llvm/Support/ThreadPool.h"
#include "llvm/Support/raw_ostream.h"
#include "gtest/gtest.h"
Expand All @@ -40,8 +41,14 @@

using namespace llvm;

// Provided by the unit test main to locate the current test binary.
extern const char *TestMainArgv0;

namespace {

// Unique anchor whose address helps locate the current test binary.
static int JobserverTestAnchor = 0;

// RAII helper to set an environment variable for the duration of a test.
class ScopedEnvironment {
std::string Name;
Expand Down Expand Up @@ -382,51 +389,93 @@ TEST_F(JobserverStrategyTest, ThreadPoolConcurrencyIsLimited) {
EXPECT_EQ(CompletedTasks, NumTasks);
}

TEST_F(JobserverStrategyTest, ParallelForIsLimited) {
// Parent-side driver that spawns a fresh process to run the child test which
// validates that parallelFor respects the jobserver limit when it is the first
// user of the default executor in that process.
TEST_F(JobserverStrategyTest, ParallelForIsLimited_Subprocess) {
// Mark child execution.
setenv("LLVM_JOBSERVER_TEST_CHILD", "1", 1);

// Find the current test binary and build args to run only the child test.
std::string Executable =
sys::fs::getMainExecutable(TestMainArgv0, &JobserverTestAnchor);
ASSERT_FALSE(Executable.empty()) << "Failed to get main executable path";
SmallVector<StringRef, 4> Args{Executable,
"--gtest_filter=JobserverStrategyTest."
"ParallelForIsLimited_SubprocessChild"};

std::string Error;
bool ExecFailed = false;
int RC = sys::ExecuteAndWait(Executable, Args, std::nullopt, {}, 0, 0, &Error,
&ExecFailed);
unsetenv("LLVM_JOBSERVER_TEST_CHILD");
ASSERT_FALSE(ExecFailed) << Error;
ASSERT_EQ(RC, 0) << "Executable failed with exit code " << RC;
}

// Child-side test: create FIFO and make-proxy in this process, set the
// jobserver strategy, and then run parallelFor.
TEST_F(JobserverStrategyTest, ParallelForIsLimited_SubprocessChild) {
if (!getenv("LLVM_JOBSERVER_TEST_CHILD"))
GTEST_SKIP() << "Not running in child mode";

// This test verifies that llvm::parallelFor respects the jobserver limit.
const int NumExplicitJobs = 3;
const int ConcurrencyLimit = NumExplicitJobs + 1; // +1 implicit
const int NumTasks = 20;

LLVM_DEBUG(dbgs() << "Calling startMakeProxy with " << NumExplicitJobs
<< " jobs.\n");
startMakeProxy(NumExplicitJobs);
LLVM_DEBUG(dbgs() << "MakeProxy is running.\n");

// Set the global strategy. parallelFor will use this.
// Set the global strategy before any default executor is created.
parallel::strategy = jobserver_concurrency();

std::atomic<int> ActiveTasks{0};
std::atomic<int> MaxActiveTasks{0};

parallelFor(0, NumTasks, [&](int i) {
parallelFor(0, NumTasks, [&]([[maybe_unused]] int i) {
int CurrentActive = ++ActiveTasks;
LLVM_DEBUG(dbgs() << "Task " << i << ": Active tasks: " << CurrentActive
<< "\n");
int OldMax = MaxActiveTasks.load();
while (CurrentActive > OldMax)
MaxActiveTasks.compare_exchange_weak(OldMax, CurrentActive);

std::this_thread::sleep_for(std::chrono::milliseconds(20));
--ActiveTasks;
});

LLVM_DEBUG(dbgs() << "ParallelFor finished. Max active tasks was "
<< MaxActiveTasks << ".\n");
EXPECT_LE(MaxActiveTasks, ConcurrencyLimit);
}

TEST_F(JobserverStrategyTest, ParallelSortIsLimited) {
// This test serves as an integration test to ensure parallelSort completes
// correctly when running under the jobserver strategy. It doesn't directly
// measure concurrency but verifies correctness.
// Parent-side driver for parallelSort child test.
TEST_F(JobserverStrategyTest, ParallelSortIsLimited_Subprocess) {
setenv("LLVM_JOBSERVER_TEST_CHILD", "1", 1);

std::string Executable =
sys::fs::getMainExecutable(TestMainArgv0, &JobserverTestAnchor);
ASSERT_FALSE(Executable.empty()) << "Failed to get main executable path";
SmallVector<StringRef, 4> Args{Executable,
"--gtest_filter=JobserverStrategyTest."
"ParallelSortIsLimited_SubprocessChild"};

std::string Error;
bool ExecFailed = false;
int RC = sys::ExecuteAndWait(Executable, Args, std::nullopt, {}, 0, 0, &Error,
&ExecFailed);
unsetenv("LLVM_JOBSERVER_TEST_CHILD");
ASSERT_FALSE(ExecFailed) << Error;
ASSERT_EQ(RC, 0) << "Executable failed with exit code " << RC;
}

// Child-side test: ensure parallelSort runs and completes correctly under the
// jobserver strategy when it owns default executor initialization.
TEST_F(JobserverStrategyTest, ParallelSortIsLimited_SubprocessChild) {
if (!getenv("LLVM_JOBSERVER_TEST_CHILD"))
GTEST_SKIP() << "Not running in child mode";

const int NumExplicitJobs = 3;
startMakeProxy(NumExplicitJobs);

parallel::strategy = jobserver_concurrency();

std::vector<int> V(1024);
// Fill with random data
std::mt19937 randEngine;
std::uniform_int_distribution<int> dist;
for (int &i : V)
Expand Down
Loading