Diffstat (limited to 'gdbsupport/parallel-for.h')
-rw-r--r-- | gdbsupport/parallel-for.h | 286
1 file changed, 196 insertions, 90 deletions
diff --git a/gdbsupport/parallel-for.h b/gdbsupport/parallel-for.h
index c485c36..7f3fef9 100644
--- a/gdbsupport/parallel-for.h
+++ b/gdbsupport/parallel-for.h
@@ -21,116 +21,92 @@
 #define GDBSUPPORT_PARALLEL_FOR_H
 
 #include <algorithm>
-#include <type_traits>
+#include <atomic>
+#include <tuple>
 
+#include "gdbsupport/iterator-range.h"
 #include "gdbsupport/thread-pool.h"
-#include "gdbsupport/function-view.h"
+#include "gdbsupport/work-queue.h"
 
 namespace gdb
 {
 
-/* A very simple "parallel for".  This splits the range of iterators
-   into subranges, and then passes each subrange to the callback.  The
-   work may or may not be done in separate threads.
+/* If enabled, print debug info about the inner workings of the parallel for
+   each functions.  */
+constexpr bool parallel_for_each_debug = false;
 
-   This approach was chosen over having the callback work on single
-   items because it makes it simple for the caller to do
-   once-per-subrange initialization and destruction.
+/* A "parallel-for" implementation using a shared work queue.  Work items get
+   popped in batches of size up to BATCH_SIZE from the queue and handed out to
+   worker threads.
 
-   The parameter N says how batching ought to be done -- there will be
-   at least N elements processed per thread.  Setting N to 0 is not
-   allowed.  */
+   Each worker thread instantiates an object of type Worker, forwarding ARGS to
+   its constructor.  The Worker object can be used to keep some per-worker
+   thread state.
 
-template<class RandomIt, class RangeFunction>
+   Worker threads call Worker::operator() repeatedly until the queue is
+   empty.
+
+   This function is synchronous, meaning that it blocks and returns once the
+   processing is complete.  */
+
+template<std::size_t batch_size, class RandomIt, class Worker,
+         class... WorkerArgs>
 void
-parallel_for_each (unsigned n, RandomIt first, RandomIt last,
-                   RangeFunction callback)
+parallel_for_each (const RandomIt first, const RandomIt last,
+                   WorkerArgs &&...worker_args)
 {
-  /* If enabled, print debug info about how the work is distributed across
-     the threads.  */
-  const bool parallel_for_each_debug = false;
+  gdb_assert (first <= last);
 
-  size_t n_worker_threads = thread_pool::g_thread_pool->thread_count ();
-  size_t n_threads = n_worker_threads;
-  size_t n_elements = last - first;
-  size_t elts_per_thread = 0;
-  size_t elts_left_over = 0;
-
-  if (n_threads > 1)
+  if (parallel_for_each_debug)
     {
-      /* Require that there should be at least N elements in a
-         thread.  */
-      gdb_assert (n > 0);
-      if (n_elements / n_threads < n)
-        n_threads = std::max (n_elements / n, (size_t) 1);
-      elts_per_thread = n_elements / n_threads;
-      elts_left_over = n_elements % n_threads;
-      /* n_elements == n_threads * elts_per_thread + elts_left_over.  */
+      debug_printf ("Parallel for: n elements: %zu\n",
+                    static_cast<std::size_t> (last - first));
+      debug_printf ("Parallel for: batch size: %zu\n", batch_size);
     }
 
-  size_t count = n_threads == 0 ? 0 : n_threads - 1;
   std::vector<gdb::future<void>> results;
+  work_queue<RandomIt, batch_size> queue (first, last);
 
-  if (parallel_for_each_debug)
-    {
-      debug_printf (_("Parallel for: n_elements: %zu\n"), n_elements);
-      debug_printf (_("Parallel for: minimum elements per thread: %u\n"), n);
-      debug_printf (_("Parallel for: elts_per_thread: %zu\n"), elts_per_thread);
-    }
+  /* The worker thread task.
+
+     We need to capture args as a tuple, because it's not possible to capture
+     the parameter pack directly in C++17.  Once we migrate to C++20, the
+     capture can be simplified to:
 
-  for (int i = 0; i < count; ++i)
+       ... args = std::forward<Args>(args)
+
+     and `args` can be used as-is in the lambda.  */
+  auto args_tuple
+    = std::forward_as_tuple (std::forward<WorkerArgs> (worker_args)...);
+  auto task = [&queue, first, &args_tuple] ()
     {
-      RandomIt end;
-      end = first + elts_per_thread;
-      if (i < elts_left_over)
-        /* Distribute the leftovers over the worker threads, to avoid having
-           to handle all of them in a single thread.  */
-        end++;
-
-      /* This case means we don't have enough elements to really
-         distribute them.  Rather than ever submit a task that does
-         nothing, we short-circuit here.  */
-      if (first == end)
-        end = last;
-
-      if (end == last)
-        {
-          /* We're about to dispatch the last batch of elements, which
-             we normally process in the main thread.  So just truncate
-             the result list here.  This avoids submitting empty tasks
-             to the thread pool.  */
-          count = i;
-          break;
-        }
+      /* Instantiate the user-defined worker.  */
+      auto worker = std::make_from_tuple<Worker> (args_tuple);
 
-      if (parallel_for_each_debug)
+      for (;;)
        {
-          debug_printf (_("Parallel for: elements on worker thread %i\t: %zu"),
-                        i, (size_t)(end - first));
-          debug_printf (_("\n"));
+          const auto batch = queue.pop_batch ();
+
+          if (batch.empty ())
+            break;
+
+          if (parallel_for_each_debug)
+            debug_printf ("Processing %zu items, range [%zu, %zu[\n",
+                          batch.size (),
+                          batch.begin () - first,
+                          batch.end () - first);
+
+          worker (batch);
        }
-
-      results.push_back (gdb::thread_pool::g_thread_pool->post_task ([=] ()
-        {
-          return callback (first, end);
-        }));
-      first = end;
-    }
+    };
 
-  for (int i = count; i < n_worker_threads; ++i)
-    if (parallel_for_each_debug)
-      {
-        debug_printf (_("Parallel for: elements on worker thread %i\t: 0"), i);
-        debug_printf (_("\n"));
-      }
+  /* Start N_WORKER_THREADS tasks.  */
+  const size_t n_worker_threads
    = std::max<size_t> (thread_pool::g_thread_pool->thread_count (), 1);
 
-  /* Process all the remaining elements in the main thread.  */
-  if (parallel_for_each_debug)
-    {
-      debug_printf (_("Parallel for: elements on main thread\t\t: %zu"),
-                    (size_t)(last - first));
-      debug_printf (_("\n"));
-    }
-  callback (first, last);
+  for (int i = 0; i < n_worker_threads; ++i)
+    results.push_back (gdb::thread_pool::g_thread_pool->post_task (task));
 
+  /* Wait for all of them to be finished.  */
   for (auto &fut : results)
     fut.get ();
 }
@@ -139,12 +115,142 @@ parallel_for_each (unsigned n, RandomIt first, RandomIt last,
    when debugging multi-threading behavior, and you want to limit
    multi-threading in a fine-grained way.  */
 
-template<class RandomIt, class RangeFunction>
+template<class RandomIt, class Worker, class... WorkerArgs>
+void
+sequential_for_each (RandomIt first, RandomIt last, WorkerArgs &&...worker_args)
+{
+  if (first == last)
+    return;
+
+  Worker (std::forward<WorkerArgs> (worker_args)...) ({ first, last });
+}
+
+namespace detail
+{
+
+/* Type to hold the state shared between threads of
+   gdb::parallel_for_each_async.  */
+
+template<std::size_t min_batch_size, typename RandomIt, typename... WorkerArgs>
+struct pfea_state
+{
+  pfea_state (RandomIt first, RandomIt last, std::function<void ()> &&done,
+              WorkerArgs &&...worker_args)
+    : first (first),
+      last (last),
+      worker_args_tuple (std::forward_as_tuple
+                           (std::forward<WorkerArgs> (worker_args)...)),
+      queue (first, last),
+      m_done (std::move (done))
+  {}
+
+  DISABLE_COPY_AND_ASSIGN (pfea_state);
+
+  /* This gets called by the last worker thread that drops its reference on
+     the shared state, thus when the processing is complete.  */
+  ~pfea_state ()
+  {
+    if (m_done)
+      m_done ();
+  }
+
+  /* The interval to process.  */
+  const RandomIt first, last;
+
+  /* Tuple of arguments to pass when constructing the user's worker object.
+
+     Use std::decay_t to avoid storing references to the caller's local
+     variables.  If we didn't use it and the caller passed an lvalue `foo *`,
+     we would store it as a reference to `foo *`, thus storing a reference to
+     the caller's local variable.
+
+     The downside is that it's not possible to pass arguments by reference,
+     callers need to pass pointers or std::reference_wrappers.  */
+  std::tuple<std::decay_t<WorkerArgs>...> worker_args_tuple;
+
+  /* Work queue that worker threads pull work items from.  */
+  work_queue<RandomIt, min_batch_size> queue;
+
+private:
+  /* Callable called when the parallel-for is done.  */
+  std::function<void ()> m_done;
+};
+
+} /* namespace detail */
+
+/* A "parallel-for" implementation using a shared work queue.  Work items get
+   popped in batches from the queue and handed out to worker threads.
+
+   Batch sizes are proportional to the number of remaining items in the queue,
+   but always greater or equal to MIN_BATCH_SIZE.
+
+   The DONE callback is invoked when processing is done.
+
+   Each worker thread instantiates an object of type Worker, forwarding ARGS to
+   its constructor.  The Worker object can be used to keep some per-worker
+   thread state.  This version does not support passing references as arguments
+   to the worker.  Use std::reference_wrapper or pointers instead.
+
+   Worker threads call Worker::operator() repeatedly until the queue is
+   empty.
+
+   This function is asynchronous.  An arbitrary worker thread will call the DONE
+   callback when processing is done.  */
+
+template<std::size_t min_batch_size, class RandomIt, class Worker,
+         class... WorkerArgs>
 void
-sequential_for_each (unsigned n, RandomIt first, RandomIt last,
-                     RangeFunction callback)
+parallel_for_each_async (const RandomIt first, const RandomIt last,
+                         std::function<void ()> &&done,
+                         WorkerArgs &&...worker_args)
 {
-  callback (first, last);
+  gdb_assert (first <= last);
+
+  if (parallel_for_each_debug)
+    {
+      debug_printf ("Parallel for: n elements: %zu\n",
+                    static_cast<std::size_t> (last - first));
+      debug_printf ("Parallel for: min batch size: %zu\n", min_batch_size);
+    }
+
+  const size_t n_worker_threads
+    = std::max<size_t> (thread_pool::g_thread_pool->thread_count (), 1);
+
+  /* The state shared between all worker threads.  All worker threads get a
+     reference on the shared pointer through the lambda below.  The last worker
+     thread to drop its reference will cause this object to be destroyed, which
+     will call the DONE callback.  */
+  using state_t = detail::pfea_state<min_batch_size, RandomIt, WorkerArgs...>;
+  auto state
+    = std::make_shared<state_t> (first, last, std::move (done),
+                                 std::forward<WorkerArgs> (worker_args)...);
+
+  /* The worker thread task.  */
+  auto task = [state] ()
+    {
+      /* Instantiate the user-defined worker.  */
+      auto worker = std::make_from_tuple<Worker> (state->worker_args_tuple);
+
+      for (;;)
+        {
+          const auto batch = state->queue.pop_batch ();
+
+          if (batch.empty ())
+            break;
+
+          if (parallel_for_each_debug)
+            debug_printf ("Processing %zu items, range [%zu, %zu[\n",
                          batch.size (),
+                          batch.begin () - state->first,
+                          batch.end () - state->first);
+
+          worker (batch);
        }
+    };
+
+  /* Start N_WORKER_THREADS tasks.  */
+  for (int i = 0; i < n_worker_threads; ++i)
+    gdb::thread_pool::g_thread_pool->post_task (task);
 }
 
 }
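The sketch below is not part of the commit; it illustrates how the new synchronous interface might be used. It assumes the batch handed to Worker::operator() is an iterator_range over the input iterators, which the new gdbsupport/iterator-range.h include and the `({ first, last })` call in sequential_for_each suggest but do not prove. The worker type, the summing task, and all names are invented for illustration.

#include <atomic>

#include "gdbsupport/parallel-for.h"

/* Hypothetical worker: constructed once per worker thread, then invoked
   once per batch popped from the shared work queue.  */
struct sum_worker
{
  explicit sum_worker (std::atomic<long> *total)
    : m_total (total)
  {}

  /* The batch type is assumed to be an iterator range over the inputs.  */
  void operator() (iterator_range<const int *> batch)
  {
    long local_sum = 0;

    for (int value : batch)
      local_sum += value;

    /* One atomic update per batch, rather than one per element.  */
    *m_total += local_sum;
  }

private:
  std::atomic<long> *m_total;
};

void
sum_in_parallel (const int *first, const int *last, std::atomic<long> &total)
{
  /* batch_size, RandomIt and Worker must be spelled out explicitly,
     since Worker cannot be deduced from the call arguments.  */
  gdb::parallel_for_each<16, const int *, sum_worker> (first, last, &total);
}

For debugging, the call could be swapped for gdb::sequential_for_each<const int *, sum_worker> (first, last, &total), which runs a single worker over the whole range on the calling thread; note that it takes no batch-size parameter.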
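A similarly hypothetical sketch for the asynchronous variant. Because pfea_state stores decayed copies of the worker arguments, shared mutable state has to be passed via a pointer or std::reference_wrapper, as the doc comment notes; count_worker and start_count are invented names.

#include <atomic>
#include <cstddef>
#include <functional>

#include "gdbsupport/parallel-for.h"

struct count_worker
{
  explicit count_worker (std::reference_wrapper<std::atomic<size_t>> count)
    : m_count (count)
  {}

  void operator() (iterator_range<int *> batch)
  {
    /* One atomic update per batch.  */
    m_count.get () += batch.size ();
  }

private:
  std::reference_wrapper<std::atomic<size_t>> m_count;
};

void
start_count (int *first, int *last, std::atomic<size_t> &count,
             std::function<void ()> on_done)
{
  /* min_batch_size of 8; actual batch sizes grow with the amount of
     remaining work.  std::ref is needed because the arguments are
     stored decayed in the shared state.  ON_DONE runs on whichever
     worker thread drops the last reference to that state.  */
  gdb::parallel_for_each_async<8, int *, count_worker>
    (first, last, std::move (on_done), std::ref (count));
}

The reference-counted shared state is what makes the completion notification work without a join: each posted task captures the shared_ptr by value, and the pfea_state destructor, run by the last task to finish, invokes the DONE callback.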