//===- llvm/Support/Parallel.cpp - Parallel algorithms --------------------===// // // Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions. // See https://llvm.org/LICENSE.txt for license information. // SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception // //===----------------------------------------------------------------------===// #include "llvm/Support/Parallel.h" #include "llvm/Config/llvm-config.h" #include "llvm/Support/ManagedStatic.h" #include "llvm/Support/Threading.h" #include #include #include #include #include llvm::ThreadPoolStrategy llvm::parallel::strategy; namespace llvm { namespace parallel { #if LLVM_ENABLE_THREADS #ifdef _WIN32 static thread_local unsigned threadIndex = UINT_MAX; unsigned getThreadIndex() { GET_THREAD_INDEX_IMPL; } #else thread_local unsigned threadIndex = UINT_MAX; #endif namespace detail { namespace { /// An abstract class that takes closures and runs them asynchronously. class Executor { public: virtual ~Executor() = default; virtual void add(std::function func, bool Sequential = false) = 0; virtual size_t getThreadCount() const = 0; static Executor *getDefaultExecutor(); }; /// An implementation of an Executor that runs closures on a thread pool /// in filo order. class ThreadPoolExecutor : public Executor { public: explicit ThreadPoolExecutor(ThreadPoolStrategy S = hardware_concurrency()) { ThreadCount = S.compute_thread_count(); // Spawn all but one of the threads in another thread as spawning threads // can take a while. Threads.reserve(ThreadCount); Threads.resize(1); std::lock_guard Lock(Mutex); // Use operator[] before creating the thread to avoid data race in .size() // in “safe libc++” mode. auto &Thread0 = Threads[0]; Thread0 = std::thread([this, S] { for (unsigned I = 1; I < ThreadCount; ++I) { Threads.emplace_back([=] { work(S, I); }); if (Stop) break; } ThreadsCreated.set_value(); work(S, 0); }); } void stop() { { std::lock_guard Lock(Mutex); if (Stop) return; Stop = true; } Cond.notify_all(); ThreadsCreated.get_future().wait(); } ~ThreadPoolExecutor() override { stop(); std::thread::id CurrentThreadId = std::this_thread::get_id(); for (std::thread &T : Threads) if (T.get_id() == CurrentThreadId) T.detach(); else T.join(); } struct Creator { static void *call() { return new ThreadPoolExecutor(strategy); } }; struct Deleter { static void call(void *Ptr) { ((ThreadPoolExecutor *)Ptr)->stop(); } }; void add(std::function F, bool Sequential = false) override { { std::lock_guard Lock(Mutex); if (Sequential) WorkQueueSequential.emplace_front(std::move(F)); else WorkQueue.emplace_back(std::move(F)); } Cond.notify_one(); } size_t getThreadCount() const override { return ThreadCount; } private: bool hasSequentialTasks() const { return !WorkQueueSequential.empty() && !SequentialQueueIsLocked; } bool hasGeneralTasks() const { return !WorkQueue.empty(); } void work(ThreadPoolStrategy S, unsigned ThreadID) { threadIndex = ThreadID; S.apply_thread_strategy(ThreadID); while (true) { std::unique_lock Lock(Mutex); Cond.wait(Lock, [&] { return Stop || hasGeneralTasks() || hasSequentialTasks(); }); if (Stop) break; bool Sequential = hasSequentialTasks(); if (Sequential) SequentialQueueIsLocked = true; else assert(hasGeneralTasks()); auto &Queue = Sequential ? WorkQueueSequential : WorkQueue; auto Task = std::move(Queue.back()); Queue.pop_back(); Lock.unlock(); Task(); if (Sequential) SequentialQueueIsLocked = false; } } std::atomic Stop{false}; std::atomic SequentialQueueIsLocked{false}; std::deque> WorkQueue; std::deque> WorkQueueSequential; std::mutex Mutex; std::condition_variable Cond; std::promise ThreadsCreated; std::vector Threads; unsigned ThreadCount; }; Executor *Executor::getDefaultExecutor() { // The ManagedStatic enables the ThreadPoolExecutor to be stopped via // llvm_shutdown() which allows a "clean" fast exit, e.g. via _exit(). This // stops the thread pool and waits for any worker thread creation to complete // but does not wait for the threads to finish. The wait for worker thread // creation to complete is important as it prevents intermittent crashes on // Windows due to a race condition between thread creation and process exit. // // The ThreadPoolExecutor will only be destroyed when the static unique_ptr to // it is destroyed, i.e. in a normal full exit. The ThreadPoolExecutor // destructor ensures it has been stopped and waits for worker threads to // finish. The wait is important as it prevents intermittent crashes on // Windows when the process is doing a full exit. // // The Windows crashes appear to only occur with the MSVC static runtimes and // are more frequent with the debug static runtime. // // This also prevents intermittent deadlocks on exit with the MinGW runtime. static ManagedStatic ManagedExec; static std::unique_ptr Exec(&(*ManagedExec)); return Exec.get(); } } // namespace } // namespace detail size_t getThreadCount() { return detail::Executor::getDefaultExecutor()->getThreadCount(); } #endif // Latch::sync() called by the dtor may cause one thread to block. If is a dead // lock if all threads in the default executor are blocked. To prevent the dead // lock, only allow the root TaskGroup to run tasks parallelly. In the scenario // of nested parallel_for_each(), only the outermost one runs parallelly. TaskGroup::TaskGroup() #if LLVM_ENABLE_THREADS : Parallel((parallel::strategy.ThreadsRequested != 1) && (threadIndex == UINT_MAX)) {} #else : Parallel(false) {} #endif TaskGroup::~TaskGroup() { // We must ensure that all the workloads have finished before decrementing the // instances count. L.sync(); } void TaskGroup::spawn(std::function F, bool Sequential) { #if LLVM_ENABLE_THREADS if (Parallel) { L.inc(); detail::Executor::getDefaultExecutor()->add( [&, F = std::move(F)] { F(); L.dec(); }, Sequential); return; } #endif F(); } } // namespace parallel } // namespace llvm void llvm::parallelFor(size_t Begin, size_t End, llvm::function_ref Fn) { #if LLVM_ENABLE_THREADS if (parallel::strategy.ThreadsRequested != 1) { auto NumItems = End - Begin; // Limit the number of tasks to MaxTasksPerGroup to limit job scheduling // overhead on large inputs. auto TaskSize = NumItems / parallel::detail::MaxTasksPerGroup; if (TaskSize == 0) TaskSize = 1; parallel::TaskGroup TG; for (; Begin + TaskSize < End; Begin += TaskSize) { TG.spawn([=, &Fn] { for (size_t I = Begin, E = Begin + TaskSize; I != E; ++I) Fn(I); }); } if (Begin != End) { TG.spawn([=, &Fn] { for (size_t I = Begin; I != End; ++I) Fn(I); }); } return; } #endif for (; Begin != End; ++Begin) Fn(Begin); }