//===- llvm/Support/Parallel.cpp - Parallel algorithms --------------------===// // // Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions. // See https://llvm.org/LICENSE.txt for license information. // SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception // //===----------------------------------------------------------------------===// #include "llvm/Support/Parallel.h" #include "llvm/ADT/ScopeExit.h" #include "llvm/Config/llvm-config.h" #include "llvm/Support/ExponentialBackoff.h" #include "llvm/Support/Jobserver.h" #include "llvm/Support/ManagedStatic.h" #include "llvm/Support/Threading.h" #include #include #include #include #include #include llvm::ThreadPoolStrategy llvm::parallel::strategy; namespace llvm { namespace parallel { #if LLVM_ENABLE_THREADS #ifdef _WIN32 static thread_local unsigned threadIndex = UINT_MAX; unsigned getThreadIndex() { GET_THREAD_INDEX_IMPL; } #else thread_local unsigned threadIndex = UINT_MAX; #endif namespace detail { namespace { /// An abstract class that takes closures and runs them asynchronously. class Executor { public: virtual ~Executor() = default; virtual void add(std::function func) = 0; virtual size_t getThreadCount() const = 0; static Executor *getDefaultExecutor(); }; /// An implementation of an Executor that runs closures on a thread pool /// in filo order. class ThreadPoolExecutor : public Executor { public: explicit ThreadPoolExecutor(ThreadPoolStrategy S) { if (S.UseJobserver) TheJobserver = JobserverClient::getInstance(); ThreadCount = S.compute_thread_count(); // Spawn all but one of the threads in another thread as spawning threads // can take a while. Threads.reserve(ThreadCount); Threads.resize(1); std::lock_guard Lock(Mutex); // Use operator[] before creating the thread to avoid data race in .size() // in 'safe libc++' mode. auto &Thread0 = Threads[0]; Thread0 = std::thread([this, S] { for (unsigned I = 1; I < ThreadCount; ++I) { Threads.emplace_back([this, S, I] { work(S, I); }); if (Stop) break; } ThreadsCreated.set_value(); work(S, 0); }); } // To make sure the thread pool executor can only be created with a parallel // strategy. ThreadPoolExecutor() = delete; void stop() { { std::lock_guard Lock(Mutex); if (Stop) return; Stop = true; } Cond.notify_all(); ThreadsCreated.get_future().wait(); } ~ThreadPoolExecutor() override { stop(); std::thread::id CurrentThreadId = std::this_thread::get_id(); for (std::thread &T : Threads) if (T.get_id() == CurrentThreadId) T.detach(); else T.join(); } struct Creator { static void *call() { return new ThreadPoolExecutor(strategy); } }; struct Deleter { static void call(void *Ptr) { ((ThreadPoolExecutor *)Ptr)->stop(); } }; void add(std::function F) override { { std::lock_guard Lock(Mutex); WorkStack.push_back(std::move(F)); } Cond.notify_one(); } size_t getThreadCount() const override { return ThreadCount; } private: void work(ThreadPoolStrategy S, unsigned ThreadID) { threadIndex = ThreadID; S.apply_thread_strategy(ThreadID); // Note on jobserver deadlock avoidance: // GNU Make grants each invoked process one implicit job slot. Our // JobserverClient models this by returning an implicit JobSlot on the // first successful tryAcquire() in a process. This guarantees forward // progress without requiring a dedicated "always-on" thread here. static thread_local std::unique_ptr Backoff; while (true) { if (TheJobserver) { // Jobserver-mode scheduling: // - Acquire one job slot (with exponential backoff to avoid busy-wait). // - While holding the slot, drain and run tasks from the local queue. // - Release the slot when the queue is empty or when shutting down. // Rationale: Holding a slot amortizes acquire/release overhead over // multiple tasks and avoids requeue/yield churn, while still enforcing // the jobserver’s global concurrency limit. With K available slots, // up to K workers run tasks in parallel; within each worker tasks run // sequentially until the local queue is empty. ExponentialBackoff Backoff(std::chrono::hours(24)); JobSlot Slot; do { if (Stop) return; Slot = TheJobserver->tryAcquire(); if (Slot.isValid()) break; } while (Backoff.waitForNextAttempt()); auto SlotReleaser = llvm::make_scope_exit( [&] { TheJobserver->release(std::move(Slot)); }); while (true) { std::function Task; { std::unique_lock Lock(Mutex); Cond.wait(Lock, [&] { return Stop || !WorkStack.empty(); }); if (Stop && WorkStack.empty()) return; if (WorkStack.empty()) break; Task = std::move(WorkStack.back()); WorkStack.pop_back(); } Task(); } } else { std::unique_lock Lock(Mutex); Cond.wait(Lock, [&] { return Stop || !WorkStack.empty(); }); if (Stop) break; auto Task = std::move(WorkStack.back()); WorkStack.pop_back(); Lock.unlock(); Task(); } } } std::atomic Stop{false}; std::vector> WorkStack; std::mutex Mutex; std::condition_variable Cond; std::promise ThreadsCreated; std::vector Threads; unsigned ThreadCount; JobserverClient *TheJobserver = nullptr; }; // A global raw pointer to the executor. Lifetime is managed by the // objects created within createExecutor(). static Executor *TheExec = nullptr; static std::once_flag Flag; // This function will be called exactly once to create the executor. // It contains the necessary platform-specific logic. Since functions // called by std::call_once cannot return value, we have to set the // executor as a global variable. void createExecutor() { #ifdef _WIN32 // The ManagedStatic enables the ThreadPoolExecutor to be stopped via // llvm_shutdown() which allows a "clean" fast exit, e.g. via _exit(). This // stops the thread pool and waits for any worker thread creation to complete // but does not wait for the threads to finish. The wait for worker thread // creation to complete is important as it prevents intermittent crashes on // Windows due to a race condition between thread creation and process exit. // // The ThreadPoolExecutor will only be destroyed when the static unique_ptr to // it is destroyed, i.e. in a normal full exit. The ThreadPoolExecutor // destructor ensures it has been stopped and waits for worker threads to // finish. The wait is important as it prevents intermittent crashes on // Windows when the process is doing a full exit. // // The Windows crashes appear to only occur with the MSVC static runtimes and // are more frequent with the debug static runtime. // // This also prevents intermittent deadlocks on exit with the MinGW runtime. static ManagedStatic ManagedExec; static std::unique_ptr Exec(&(*ManagedExec)); TheExec = Exec.get(); #else // ManagedStatic is not desired on other platforms. When `Exec` is destroyed // by llvm_shutdown(), worker threads will clean up and invoke TLS // destructors. This can lead to race conditions if other threads attempt to // access TLS objects that have already been destroyed. static ThreadPoolExecutor Exec(strategy); TheExec = &Exec; #endif } Executor *Executor::getDefaultExecutor() { // Use std::call_once to lazily and safely initialize the executor. std::call_once(Flag, createExecutor); return TheExec; } } // namespace } // namespace detail size_t getThreadCount() { return detail::Executor::getDefaultExecutor()->getThreadCount(); } #endif // Latch::sync() called by the dtor may cause one thread to block. If is a dead // lock if all threads in the default executor are blocked. To prevent the dead // lock, only allow the root TaskGroup to run tasks parallelly. In the scenario // of nested parallel_for_each(), only the outermost one runs parallelly. TaskGroup::TaskGroup() #if LLVM_ENABLE_THREADS : Parallel((parallel::strategy.ThreadsRequested != 1) && (threadIndex == UINT_MAX)) {} #else : Parallel(false) {} #endif TaskGroup::~TaskGroup() { // We must ensure that all the workloads have finished before decrementing the // instances count. L.sync(); } void TaskGroup::spawn(std::function F) { #if LLVM_ENABLE_THREADS if (Parallel) { L.inc(); detail::Executor::getDefaultExecutor()->add([&, F = std::move(F)] { F(); L.dec(); }); return; } #endif F(); } } // namespace parallel } // namespace llvm void llvm::parallelFor(size_t Begin, size_t End, llvm::function_ref Fn) { #if LLVM_ENABLE_THREADS if (parallel::strategy.ThreadsRequested != 1) { auto NumItems = End - Begin; // Limit the number of tasks to MaxTasksPerGroup to limit job scheduling // overhead on large inputs. auto TaskSize = NumItems / parallel::detail::MaxTasksPerGroup; if (TaskSize == 0) TaskSize = 1; parallel::TaskGroup TG; for (; Begin + TaskSize < End; Begin += TaskSize) { TG.spawn([=, &Fn] { for (size_t I = Begin, E = Begin + TaskSize; I != E; ++I) Fn(I); }); } if (Begin != End) { TG.spawn([=, &Fn] { for (size_t I = Begin; I != End; ++I) Fn(I); }); } return; } #endif for (; Begin != End; ++Begin) Fn(Begin); }