Diffstat (limited to 'gdbsupport/parallel-for.h')
-rw-r--r--  gdbsupport/parallel-for.h  286
1 file changed, 196 insertions(+), 90 deletions(-)
diff --git a/gdbsupport/parallel-for.h b/gdbsupport/parallel-for.h
index c485c36..7f3fef9 100644
--- a/gdbsupport/parallel-for.h
+++ b/gdbsupport/parallel-for.h
@@ -21,116 +21,92 @@
#define GDBSUPPORT_PARALLEL_FOR_H
#include <algorithm>
-#include <type_traits>
+#include <atomic>
+#include <tuple>
+#include "gdbsupport/iterator-range.h"
#include "gdbsupport/thread-pool.h"
-#include "gdbsupport/function-view.h"
+#include "gdbsupport/work-queue.h"
namespace gdb
{
-/* A very simple "parallel for". This splits the range of iterators
- into subranges, and then passes each subrange to the callback. The
- work may or may not be done in separate threads.
+/* If enabled, print debug info about the inner workings of the
+ parallel_for_each functions. */
+constexpr bool parallel_for_each_debug = false;
- This approach was chosen over having the callback work on single
- items because it makes it simple for the caller to do
- once-per-subrange initialization and destruction.
+/* A "parallel-for" implementation using a shared work queue. Work items get
+ popped in batches of size up to BATCH_SIZE from the queue and handed out to
+ worker threads.
- The parameter N says how batching ought to be done -- there will be
- at least N elements processed per thread. Setting N to 0 is not
- allowed. */
+ Each worker thread instantiates an object of type Worker, forwarding
+ WORKER_ARGS to its constructor. The Worker object can be used to keep
+ per-worker-thread state.
-template<class RandomIt, class RangeFunction>
+ Worker threads call Worker::operator() repeatedly until the queue is
+ empty.
+
+ This function is synchronous, meaning that it blocks and returns once the
+ processing is complete. */
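+
+/* For illustration only, a hypothetical caller could look like the
+   following sketch.  It assumes the batch passed to Worker::operator() is
+   a gdb::iterator_range<RandomIt>; `sum_worker', `data' and `size' are
+   made-up names, not part of this interface:
+
+     struct sum_worker
+     {
+       explicit sum_worker (std::atomic<long> *total) : m_total (total) {}
+
+       void operator() (gdb::iterator_range<const int *> batch)
+       {
+         long sum = 0;
+         for (int value : batch)
+           sum += value;
+         *m_total += sum;
+       }
+
+       std::atomic<long> *m_total;
+     };
+
+     std::atomic<long> total (0);
+     gdb::parallel_for_each<128, const int *, sum_worker>
+       (data, data + size, &total);  */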
+
+template<std::size_t batch_size, class RandomIt, class Worker,
+ class... WorkerArgs>
void
-parallel_for_each (unsigned n, RandomIt first, RandomIt last,
- RangeFunction callback)
+parallel_for_each (const RandomIt first, const RandomIt last,
+ WorkerArgs &&...worker_args)
{
- /* If enabled, print debug info about how the work is distributed across
- the threads. */
- const bool parallel_for_each_debug = false;
+ gdb_assert (first <= last);
- size_t n_worker_threads = thread_pool::g_thread_pool->thread_count ();
- size_t n_threads = n_worker_threads;
- size_t n_elements = last - first;
- size_t elts_per_thread = 0;
- size_t elts_left_over = 0;
-
- if (n_threads > 1)
+ if (parallel_for_each_debug)
{
- /* Require that there should be at least N elements in a
- thread. */
- gdb_assert (n > 0);
- if (n_elements / n_threads < n)
- n_threads = std::max (n_elements / n, (size_t) 1);
- elts_per_thread = n_elements / n_threads;
- elts_left_over = n_elements % n_threads;
- /* n_elements == n_threads * elts_per_thread + elts_left_over. */
+ debug_printf ("Parallel for: n elements: %zu\n",
+ static_cast<std::size_t> (last - first));
+ debug_printf ("Parallel for: batch size: %zu\n", batch_size);
}
- size_t count = n_threads == 0 ? 0 : n_threads - 1;
std::vector<gdb::future<void>> results;
+ work_queue<RandomIt, batch_size> queue (first, last);
- if (parallel_for_each_debug)
- {
- debug_printf (_("Parallel for: n_elements: %zu\n"), n_elements);
- debug_printf (_("Parallel for: minimum elements per thread: %u\n"), n);
- debug_printf (_("Parallel for: elts_per_thread: %zu\n"), elts_per_thread);
- }
+ /* The worker thread task.
+
+ We need to capture args as a tuple, because it's not possible to capture
+ the parameter pack directly in C++17. Once we migrate to C++20, the
+ capture can be simplified to:
- for (int i = 0; i < count; ++i)
+ ... args = std::forward<Args>(args)
+
+ and `args` can be used as-is in the lambda. */
+ auto args_tuple
+ = std::forward_as_tuple (std::forward<WorkerArgs> (worker_args)...);
+ auto task = [&queue, first, &args_tuple] ()
{
- RandomIt end;
- end = first + elts_per_thread;
- if (i < elts_left_over)
- /* Distribute the leftovers over the worker threads, to avoid having
- to handle all of them in a single thread. */
- end++;
-
- /* This case means we don't have enough elements to really
- distribute them. Rather than ever submit a task that does
- nothing, we short-circuit here. */
- if (first == end)
- end = last;
-
- if (end == last)
- {
- /* We're about to dispatch the last batch of elements, which
- we normally process in the main thread. So just truncate
- the result list here. This avoids submitting empty tasks
- to the thread pool. */
- count = i;
- break;
- }
+ /* Instantiate the user-defined worker. */
+ auto worker = std::make_from_tuple<Worker> (args_tuple);
- if (parallel_for_each_debug)
+ for (;;)
{
- debug_printf (_("Parallel for: elements on worker thread %i\t: %zu"),
- i, (size_t)(end - first));
- debug_printf (_("\n"));
+ const auto batch = queue.pop_batch ();
+
+ if (batch.empty ())
+ break;
+
+ if (parallel_for_each_debug)
+ debug_printf ("Processing %zu items, range [%zu, %zu[\n",
+ batch.size (),
+ batch.begin () - first,
+ batch.end () - first);
+
+ worker (batch);
}
- results.push_back (gdb::thread_pool::g_thread_pool->post_task ([=] ()
- {
- return callback (first, end);
- }));
- first = end;
- }
+ };
- for (int i = count; i < n_worker_threads; ++i)
- if (parallel_for_each_debug)
- {
- debug_printf (_("Parallel for: elements on worker thread %i\t: 0"), i);
- debug_printf (_("\n"));
- }
+ /* Start N_WORKER_THREADS tasks. */
+ const size_t n_worker_threads
+ = std::max<size_t> (thread_pool::g_thread_pool->thread_count (), 1);
- /* Process all the remaining elements in the main thread. */
- if (parallel_for_each_debug)
- {
- debug_printf (_("Parallel for: elements on main thread\t\t: %zu"),
- (size_t)(last - first));
- debug_printf (_("\n"));
- }
- callback (first, last);
+ for (size_t i = 0; i < n_worker_threads; ++i)
+ results.push_back (gdb::thread_pool::g_thread_pool->post_task (task));
+ /* Wait for all tasks to finish. */
for (auto &fut : results)
fut.get ();
}
@@ -139,12 +115,142 @@ parallel_for_each (unsigned n, RandomIt first, RandomIt last,
when debugging multi-threading behavior, and you want to limit
multi-threading in a fine-grained way. */
-template<class RandomIt, class RangeFunction>
+template<class RandomIt, class Worker, class... WorkerArgs>
+void
+sequential_for_each (RandomIt first, RandomIt last, WorkerArgs &&...worker_args)
+{
+ if (first == last)
+ return;
+
+ Worker (std::forward<WorkerArgs> (worker_args)...) ({ first, last });
+}
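+
+/* For illustration, reusing the hypothetical sum_worker sketched above:
+
+     gdb::sequential_for_each<const int *, sum_worker>
+       (data, data + size, &total);
+
+   The Worker is constructed once and invoked once, with the whole range
+   as a single batch.  */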
+
+namespace detail
+{
+
+/* Type to hold the state shared between threads of
+ gdb::parallel_for_each_async. */
+
+template<std::size_t min_batch_size, typename RandomIt, typename... WorkerArgs>
+struct pfea_state
+{
+ pfea_state (RandomIt first, RandomIt last, std::function<void ()> &&done,
+ WorkerArgs &&...worker_args)
+ : first (first),
+ last (last),
+ worker_args_tuple (std::forward_as_tuple
+ (std::forward<WorkerArgs> (worker_args)...)),
+ queue (first, last),
+ m_done (std::move (done))
+ {}
+
+ DISABLE_COPY_AND_ASSIGN (pfea_state);
+
+ /* This runs in the last worker thread to drop its reference to the shared
+ state, that is, once processing is complete. */
+ ~pfea_state ()
+ {
+ if (m_done)
+ m_done ();
+ }
+
+ /* The interval to process. */
+ const RandomIt first, last;
+
+ /* Tuple of arguments to pass when constructing the user's worker object.
+
+ Use std::decay_t to avoid storing references to the caller's local
+ variables. Without it, if the caller passed an lvalue of type `foo *`,
+ we would store a `foo *&`, i.e. a reference to the caller's local
+ variable.
+
+ The downside is that it's not possible to pass arguments by reference;
+ callers need to pass pointers or std::reference_wrappers instead (see
+ the sketch after this struct). */
+ std::tuple<std::decay_t<WorkerArgs>...> worker_args_tuple;
+
+ /* Work queue that worker threads pull work items from. */
+ work_queue<RandomIt, min_batch_size> queue;
+
+private:
+ /* Callable called when the parallel-for is done. */
+ std::function<void ()> m_done;
+};
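+
+/* A sketch of the hazard std::decay_t avoids (all names are illustrative):
+   if WORKER_ARGS_TUPLE stored references, then in
+
+     void start_processing (foo *arg)
+     {
+       gdb::parallel_for_each_async<1, foo *, my_worker>
+         (first, last, done, arg);
+     }
+
+   the shared state would hold a `foo *&' referring to ARG, which is gone
+   once start_processing returns, before the worker threads finish.  With
+   std::decay_t, a plain `foo *' copy is stored instead.  */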
+
+} /* namespace detail */
+
+/* A "parallel-for" implementation using a shared work queue. Work items get
+ popped in batches from the queue and handed out to worker threads.
+
+ Batch sizes are proportional to the number of remaining items in the queue,
+ but always greater than or equal to MIN_BATCH_SIZE.
+
+ The DONE callback is invoked when processing is done.
+
+ Each worker thread instantiates an object of type Worker, forwarding
+ WORKER_ARGS to its constructor. The Worker object can be used to keep
+ per-worker-thread state. This version does not support passing references
+ as arguments to the worker; use std::reference_wrapper or pointers
+ instead.
+
+ Worker threads call Worker::operator() repeatedly until the queue is
+ empty.
+
+ This function is asynchronous. An arbitrary worker thread will call the DONE
+ callback when processing is done. */
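+
+/* A hypothetical asynchronous caller might look like the following sketch.
+   Here `count_worker', `handle_all_done', `items' and `n_items' are
+   made-up names; `count_worker' is assumed to accept the
+   std::atomic<size_t> reference passed via std::ref, per the
+   std::reference_wrapper note above:
+
+     auto counter = std::make_shared<std::atomic<size_t>> (0);
+
+     gdb::parallel_for_each_async<64, item *, count_worker>
+       (items, items + n_items,
+        [counter] () { handle_all_done (counter->load ()); },
+        std::ref (*counter));
+
+   Capturing the shared_ptr in the DONE lambda keeps the counter alive
+   until processing completes, since this function returns immediately.  */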
+
+template<std::size_t min_batch_size, class RandomIt, class Worker,
+ class... WorkerArgs>
void
-sequential_for_each (unsigned n, RandomIt first, RandomIt last,
- RangeFunction callback)
+parallel_for_each_async (const RandomIt first, const RandomIt last,
+ std::function<void ()> &&done,
+ WorkerArgs &&...worker_args)
{
- callback (first, last);
+ gdb_assert (first <= last);
+
+ if (parallel_for_each_debug)
+ {
+ debug_printf ("Parallel for: n elements: %zu\n",
+ static_cast<std::size_t> (last - first));
+ debug_printf ("Parallel for: min batch size: %zu\n", min_batch_size);
+ }
+
+ const size_t n_worker_threads
+ = std::max<size_t> (thread_pool::g_thread_pool->thread_count (), 1);
+
+ /* The state shared between all worker threads. Each worker thread keeps
+ the shared state alive via the shared_ptr captured by the lambda below.
+ The last worker thread to drop its reference destroys this object, which
+ invokes the DONE callback. */
+ using state_t = detail::pfea_state<min_batch_size, RandomIt, WorkerArgs...>;
+ auto state
+ = std::make_shared<state_t> (first, last, std::move (done),
+ std::forward<WorkerArgs> (worker_args)...);
+
+ /* The worker thread task. */
+ auto task = [state] ()
+ {
+ /* Instantiate the user-defined worker. */
+ auto worker = std::make_from_tuple<Worker> (state->worker_args_tuple);
+
+ for (;;)
+ {
+ const auto batch = state->queue.pop_batch ();
+
+ if (batch.empty ())
+ break;
+
+ if (parallel_for_each_debug)
+ debug_printf ("Processing %zu items, range [%zu, %zu[\n",
+ batch.size (),
+ batch.begin () - state->first,
+ batch.end () - state->first);
+
+ worker (batch);
+ }
+ };
+
+ /* Start N_WORKER_THREADS tasks. */
+ for (size_t i = 0; i < n_worker_threads; ++i)
+ gdb::thread_pool::g_thread_pool->post_task (task);
}
}