diff options
Diffstat (limited to 'llvm/lib/Support/ThreadPool.cpp')
-rw-r--r-- | llvm/lib/Support/ThreadPool.cpp | 108 |
1 files changed, 106 insertions, 2 deletions
diff --git a/llvm/lib/Support/ThreadPool.cpp b/llvm/lib/Support/ThreadPool.cpp index c304f0f..6960268 100644 --- a/llvm/lib/Support/ThreadPool.cpp +++ b/llvm/lib/Support/ThreadPool.cpp @@ -6,6 +6,7 @@ // //===----------------------------------------------------------------------===// // +// // This file implements a crude C++11 based thread pool. // //===----------------------------------------------------------------------===// @@ -14,6 +15,8 @@ #include "llvm/Config/llvm-config.h" +#include "llvm/ADT/ScopeExit.h" +#include "llvm/Support/ExponentialBackoff.h" #include "llvm/Support/FormatVariadic.h" #include "llvm/Support/Threading.h" #include "llvm/Support/raw_ostream.h" @@ -33,7 +36,10 @@ ThreadPoolInterface::~ThreadPoolInterface() = default; #if LLVM_ENABLE_THREADS StdThreadPool::StdThreadPool(ThreadPoolStrategy S) - : Strategy(S), MaxThreadCount(S.compute_thread_count()) {} + : Strategy(S), MaxThreadCount(S.compute_thread_count()) { + if (Strategy.UseJobserver) + TheJobserver = JobserverClient::getInstance(); +} void StdThreadPool::grow(int requested) { llvm::sys::ScopedWriter LockGuard(ThreadsLock); @@ -45,7 +51,15 @@ void StdThreadPool::grow(int requested) { Threads.emplace_back([this, ThreadID] { set_thread_name(formatv("llvm-worker-{0}", ThreadID)); Strategy.apply_thread_strategy(ThreadID); - processTasks(nullptr); + // Note on jobserver deadlock avoidance: + // GNU Make grants each invoked process one implicit job slot. + // JobserverClient::tryAcquire() returns that implicit slot on the first + // successful call in a process, ensuring forward progress without a + // dedicated "always-on" thread. + if (TheJobserver) + processTasksWithJobserver(); + else + processTasks(nullptr); }); } } @@ -133,6 +147,96 @@ void StdThreadPool::processTasks(ThreadPoolTaskGroup *WaitingForGroup) { } } +/// Main loop for worker threads when using a jobserver. +/// This function uses a two-level queue; it first acquires a job slot from the +/// external jobserver, then retrieves a task from the internal queue. +/// This allows the thread pool to cooperate with build systems like `make -j`. +void StdThreadPool::processTasksWithJobserver() { + while (true) { + // Acquire a job slot from the external jobserver. + // This polls for a slot and yields the thread to avoid a high-CPU wait. + JobSlot Slot; + // The timeout for the backoff can be very long, as the shutdown + // is checked on each iteration. The sleep duration is capped by MaxWait + // in ExponentialBackoff, so shutdown latency is not a problem. + ExponentialBackoff Backoff(std::chrono::hours(24)); + bool AcquiredToken = false; + do { + // Return if the thread pool is shutting down. + { + std::unique_lock<std::mutex> LockGuard(QueueLock); + if (!EnableFlag) + return; + } + + Slot = TheJobserver->tryAcquire(); + if (Slot.isValid()) { + AcquiredToken = true; + break; + } + } while (Backoff.waitForNextAttempt()); + + if (!AcquiredToken) { + // This is practically unreachable with a 24h timeout and indicates a + // deeper problem if hit. + report_fatal_error("Timed out waiting for jobserver token."); + } + + // `make_scope_exit` guarantees the job slot is released, even if the + // task throws or we exit early. This prevents deadlocking the build. + auto SlotReleaser = + make_scope_exit([&] { TheJobserver->release(std::move(Slot)); }); + + // While we hold a job slot, process tasks from the internal queue. + while (true) { + std::function<void()> Task; + ThreadPoolTaskGroup *GroupOfTask = nullptr; + + { + std::unique_lock<std::mutex> LockGuard(QueueLock); + + // Wait until a task is available or the pool is shutting down. + QueueCondition.wait(LockGuard, + [&] { return !EnableFlag || !Tasks.empty(); }); + + // If shutting down and the queue is empty, the thread can terminate. + if (!EnableFlag && Tasks.empty()) + return; + + // If the queue is empty, we're done processing tasks for now. + // Break the inner loop to release the job slot. + if (Tasks.empty()) + break; + + // A task is available. Mark it as active before releasing the lock + // to prevent race conditions with `wait()`. + ++ActiveThreads; + Task = std::move(Tasks.front().first); + GroupOfTask = Tasks.front().second; + if (GroupOfTask != nullptr) + ++ActiveGroups[GroupOfTask]; + Tasks.pop_front(); + } // The queue lock is released. + + // Run the task. The job slot remains acquired during execution. + Task(); + + // The task has finished. Update the active count and notify any waiters. + { + std::lock_guard<std::mutex> LockGuard(QueueLock); + --ActiveThreads; + if (GroupOfTask != nullptr) { + auto A = ActiveGroups.find(GroupOfTask); + if (--(A->second) == 0) + ActiveGroups.erase(A); + } + // If all tasks are complete, notify any waiting threads. + if (workCompletedUnlocked(nullptr)) + CompletionCondition.notify_all(); + } + } + } +} bool StdThreadPool::workCompletedUnlocked(ThreadPoolTaskGroup *Group) const { if (Group == nullptr) return !ActiveThreads && Tasks.empty(); |