aboutsummaryrefslogtreecommitdiff
path: root/llvm/lib/Support/ThreadPool.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'llvm/lib/Support/ThreadPool.cpp')
-rw-r--r--llvm/lib/Support/ThreadPool.cpp108
1 files changed, 106 insertions, 2 deletions
diff --git a/llvm/lib/Support/ThreadPool.cpp b/llvm/lib/Support/ThreadPool.cpp
index c304f0f..6960268 100644
--- a/llvm/lib/Support/ThreadPool.cpp
+++ b/llvm/lib/Support/ThreadPool.cpp
@@ -6,6 +6,7 @@
//
//===----------------------------------------------------------------------===//
//
+//
// This file implements a crude C++11 based thread pool.
//
//===----------------------------------------------------------------------===//
@@ -14,6 +15,8 @@
#include "llvm/Config/llvm-config.h"
+#include "llvm/ADT/ScopeExit.h"
+#include "llvm/Support/ExponentialBackoff.h"
#include "llvm/Support/FormatVariadic.h"
#include "llvm/Support/Threading.h"
#include "llvm/Support/raw_ostream.h"
@@ -33,7 +36,10 @@ ThreadPoolInterface::~ThreadPoolInterface() = default;
#if LLVM_ENABLE_THREADS
StdThreadPool::StdThreadPool(ThreadPoolStrategy S)
- : Strategy(S), MaxThreadCount(S.compute_thread_count()) {}
+ : Strategy(S), MaxThreadCount(S.compute_thread_count()) {
+ if (Strategy.UseJobserver)
+ TheJobserver = JobserverClient::getInstance();
+}
void StdThreadPool::grow(int requested) {
llvm::sys::ScopedWriter LockGuard(ThreadsLock);
@@ -45,7 +51,15 @@ void StdThreadPool::grow(int requested) {
Threads.emplace_back([this, ThreadID] {
set_thread_name(formatv("llvm-worker-{0}", ThreadID));
Strategy.apply_thread_strategy(ThreadID);
- processTasks(nullptr);
+ // Note on jobserver deadlock avoidance:
+ // GNU Make grants each invoked process one implicit job slot.
+ // JobserverClient::tryAcquire() returns that implicit slot on the first
+ // successful call in a process, ensuring forward progress without a
+ // dedicated "always-on" thread.
+ if (TheJobserver)
+ processTasksWithJobserver();
+ else
+ processTasks(nullptr);
});
}
}
@@ -133,6 +147,96 @@ void StdThreadPool::processTasks(ThreadPoolTaskGroup *WaitingForGroup) {
}
}
+/// Main loop for worker threads when using a jobserver.
+/// This function uses a two-level queue; it first acquires a job slot from the
+/// external jobserver, then retrieves a task from the internal queue.
+/// This allows the thread pool to cooperate with build systems like `make -j`.
+void StdThreadPool::processTasksWithJobserver() {
+ while (true) {
+ // Acquire a job slot from the external jobserver.
+ // This polls for a slot and yields the thread to avoid a high-CPU wait.
+ JobSlot Slot;
+ // The timeout for the backoff can be very long, as the shutdown
+ // is checked on each iteration. The sleep duration is capped by MaxWait
+ // in ExponentialBackoff, so shutdown latency is not a problem.
+ ExponentialBackoff Backoff(std::chrono::hours(24));
+ bool AcquiredToken = false;
+ do {
+ // Return if the thread pool is shutting down.
+ {
+ std::unique_lock<std::mutex> LockGuard(QueueLock);
+ if (!EnableFlag)
+ return;
+ }
+
+ Slot = TheJobserver->tryAcquire();
+ if (Slot.isValid()) {
+ AcquiredToken = true;
+ break;
+ }
+ } while (Backoff.waitForNextAttempt());
+
+ if (!AcquiredToken) {
+ // This is practically unreachable with a 24h timeout and indicates a
+ // deeper problem if hit.
+ report_fatal_error("Timed out waiting for jobserver token.");
+ }
+
+ // `make_scope_exit` guarantees the job slot is released, even if the
+ // task throws or we exit early. This prevents deadlocking the build.
+ auto SlotReleaser =
+ make_scope_exit([&] { TheJobserver->release(std::move(Slot)); });
+
+ // While we hold a job slot, process tasks from the internal queue.
+ while (true) {
+ std::function<void()> Task;
+ ThreadPoolTaskGroup *GroupOfTask = nullptr;
+
+ {
+ std::unique_lock<std::mutex> LockGuard(QueueLock);
+
+ // Wait until a task is available or the pool is shutting down.
+ QueueCondition.wait(LockGuard,
+ [&] { return !EnableFlag || !Tasks.empty(); });
+
+ // If shutting down and the queue is empty, the thread can terminate.
+ if (!EnableFlag && Tasks.empty())
+ return;
+
+ // If the queue is empty, we're done processing tasks for now.
+ // Break the inner loop to release the job slot.
+ if (Tasks.empty())
+ break;
+
+ // A task is available. Mark it as active before releasing the lock
+ // to prevent race conditions with `wait()`.
+ ++ActiveThreads;
+ Task = std::move(Tasks.front().first);
+ GroupOfTask = Tasks.front().second;
+ if (GroupOfTask != nullptr)
+ ++ActiveGroups[GroupOfTask];
+ Tasks.pop_front();
+ } // The queue lock is released.
+
+ // Run the task. The job slot remains acquired during execution.
+ Task();
+
+ // The task has finished. Update the active count and notify any waiters.
+ {
+ std::lock_guard<std::mutex> LockGuard(QueueLock);
+ --ActiveThreads;
+ if (GroupOfTask != nullptr) {
+ auto A = ActiveGroups.find(GroupOfTask);
+ if (--(A->second) == 0)
+ ActiveGroups.erase(A);
+ }
+ // If all tasks are complete, notify any waiting threads.
+ if (workCompletedUnlocked(nullptr))
+ CompletionCondition.notify_all();
+ }
+ }
+ }
+}
bool StdThreadPool::workCompletedUnlocked(ThreadPoolTaskGroup *Group) const {
if (Group == nullptr)
return !ActiveThreads && Tasks.empty();