/* Parallel for loops

   Copyright (C) 2019-2025 Free Software Foundation, Inc.

   This file is part of GDB.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 3 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program.  If not, see <http://www.gnu.org/licenses/>.  */

#ifndef GDBSUPPORT_PARALLEL_FOR_H
#define GDBSUPPORT_PARALLEL_FOR_H

#include <algorithm>
#include <atomic>
#include <functional>
#include <tuple>
#include <vector>
#include "gdbsupport/iterator-range.h"
#include "gdbsupport/thread-pool.h"
#include "gdbsupport/work-queue.h"

namespace gdb
{

/* If enabled, print debug info about the inner workings of the
   parallel_for_each functions.  */
constexpr bool parallel_for_each_debug = false;

/* A "parallel-for" implementation using a shared work queue.  Work items get
   popped in batches of size up to BATCH_SIZE from the queue and handed out to
   worker threads.

   Each worker thread instantiates an object of type Worker, forwarding
   WORKER_ARGS to its constructor.  The Worker object can be used to keep
   per-worker-thread state.

   Worker threads call Worker::operator() repeatedly until the queue is
   empty.

   This function is synchronous, meaning that it blocks and returns once the
   processing is complete.  */
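
/* A minimal usage sketch (illustrative only: counting_worker, values and the
   element type are hypothetical, and the batch passed to Worker::operator()
   is assumed to be the gdb::iterator_range yielded by work_queue::pop_batch):

     struct counting_worker
     {
       explicit counting_worker (std::atomic<size_t> *total)
         : m_total (total)
       {}

       void operator() (gdb::iterator_range<int *> batch)
       { *m_total += batch.size (); }

       std::atomic<size_t> *m_total;
     };

     std::vector<int> values = make_values ();   // hypothetical helper
     std::atomic<size_t> total (0);
     gdb::parallel_for_each<128, int *, counting_worker>
       (values.data (), values.data () + values.size (), &total);
*/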

template<std::size_t batch_size, class RandomIt, class Worker,
         class... WorkerArgs>
void
parallel_for_each (const RandomIt first, const RandomIt last,
                   WorkerArgs &&...worker_args)
{
  gdb_assert (first <= last);

  if (parallel_for_each_debug)
    {
      debug_printf ("Parallel for: n elements: %zu\n",
                    static_cast<std::size_t> (last - first));
      debug_printf ("Parallel for: batch size: %zu\n", batch_size);
    }

  std::vector<gdb::future<void>> results;
  work_queue<RandomIt, batch_size> queue (first, last);

  /* The worker thread task.

     We need to capture the worker arguments as a tuple, because it's not
     possible to capture a parameter pack in an init-capture in C++17.  Once
     we migrate to C++20, the capture can be simplified to:

       ...worker_args = std::forward<WorkerArgs> (worker_args)

     and `worker_args` can be used as-is in the lambda.  */
  auto args_tuple
    = std::forward_as_tuple (std::forward<WorkerArgs> (worker_args)...);
  auto task = [&queue, first, &args_tuple] ()
    {
      /* Instantiate the user-defined worker.  */
      auto worker = std::make_from_tuple<Worker> (args_tuple);

      for (;;)
        {
          const auto batch = queue.pop_batch ();

          if (batch.empty ())
            break;

          if (parallel_for_each_debug)
            debug_printf ("Processing %zu items, range [%zu, %zu[\n",
                          batch.size (),
                          static_cast<std::size_t> (batch.begin () - first),
                          static_cast<std::size_t> (batch.end () - first));

          worker (batch);
        }
    };

  /* Start N_WORKER_THREADS tasks.  */
  const size_t n_worker_threads
    = std::max<size_t> (thread_pool::g_thread_pool->thread_count (), 1);

  for (size_t i = 0; i < n_worker_threads; ++i)
    results.push_back (gdb::thread_pool::g_thread_pool->post_task (task));

  /* Wait for all of them to be finished.  */
  for (auto &fut : results)
    fut.get ();
}

/* A sequential drop-in replacement for parallel_for_each.  This can be useful
   when debugging multi-threading behavior and you want to limit
   multi-threading in a fine-grained way.  */
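
/* Illustrative only, reusing the hypothetical counting_worker and `values`
   from the parallel_for_each sketch above; the call mirrors parallel_for_each
   minus the batch-size template parameter:

     gdb::sequential_for_each<int *, counting_worker>
       (values.data (), values.data () + values.size (), &total);
*/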

template<class RandomIt, class Worker, class... WorkerArgs>
void
sequential_for_each (RandomIt first, RandomIt last,
                     WorkerArgs &&...worker_args)
{
  if (first == last)
    return;

  Worker (std::forward<WorkerArgs> (worker_args)...) ({ first, last });
}

namespace detail
{

/* Type to hold the state shared between threads of
   gdb::parallel_for_each_async.  */

template<std::size_t min_batch_size, typename RandomIt, typename... WorkerArgs>
struct pfea_state
{
  pfea_state (RandomIt first, RandomIt last, std::function<void ()> &&done,
              WorkerArgs &&...worker_args)
    : first (first),
      last (last),
      worker_args_tuple (std::forward_as_tuple
                           (std::forward<WorkerArgs> (worker_args)...)),
      queue (first, last),
      m_done (std::move (done))
  {}

  DISABLE_COPY_AND_ASSIGN (pfea_state);

  /* This gets called by the last worker thread to drop its reference on the
     shared state, i.e. once processing is complete.  */
  ~pfea_state ()
  {
    if (m_done)
      m_done ();
  }

  /* The interval to process.  */
  const RandomIt first, last;

  /* Tuple of arguments to pass when constructing the user's worker object.

     Use std::decay_t to avoid storing references to the caller's local
     variables.  If we didn't use it and the caller passed an lvalue `foo *`,
     we would store it as a reference to `foo *`, thus storing a reference to
     the caller's local variable.

     The downside is that it's not possible to pass arguments by reference;
     callers need to pass pointers or std::reference_wrappers.  */
  std::tuple<std::decay_t<WorkerArgs>...> worker_args_tuple;

  /* Work queue that worker threads pull work items from.  */
  work_queue<RandomIt, min_batch_size> queue;

private:
  /* Callable called when the parallel-for is done.  */
  std::function<void ()> m_done;
};

} /* namespace detail */

/* A "parallel-for" implementation using a shared work queue.  Work items get
   popped in batches from the queue and handed out to worker threads.

   Batch sizes are proportional to the number of remaining items in the queue,
   but always greater than or equal to MIN_BATCH_SIZE.

   The DONE callback is invoked when processing is complete.

   Each worker thread instantiates an object of type Worker, forwarding
   WORKER_ARGS to its constructor.  The Worker object can be used to keep
   per-worker-thread state.  This version does not support passing references
   as arguments to the worker; use std::reference_wrapper or pointers instead.

   Worker threads call Worker::operator() repeatedly until the queue is
   empty.

   This function is asynchronous.  An arbitrary worker thread will call the
   DONE callback when processing is done.  */
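
/* Illustrative only, reusing the hypothetical counting_worker and `values`
   from the parallel_for_each sketch above.  Because the call returns before
   processing finishes, the shared_ptr keeps the counter alive until the DONE
   callback runs (notify_done is likewise hypothetical):

     auto total = std::make_shared<std::atomic<size_t>> (0);
     gdb::parallel_for_each_async<64, int *, counting_worker>
       (values.data (), values.data () + values.size (),
        [total] () { notify_done (total->load ()); },
        total.get ());
*/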

template<std::size_t min_batch_size, class RandomIt, class Worker,
         class... WorkerArgs>
void
parallel_for_each_async (const RandomIt first, const RandomIt last,
                         std::function<void ()> &&done,
                         WorkerArgs &&...worker_args)
{
  gdb_assert (first <= last);

  if (parallel_for_each_debug)
    {
      debug_printf ("Parallel for: n elements: %zu\n",
                    static_cast<std::size_t> (last - first));
      debug_printf ("Parallel for: min batch size: %zu\n", min_batch_size);
    }

  const size_t n_worker_threads
    = std::max<size_t> (thread_pool::g_thread_pool->thread_count (), 1);

  /* The state shared between all worker threads.  Each worker thread holds a
     reference to it through the shared pointer captured by the lambda below.
     The last worker thread to drop its reference will cause this object to be
     destroyed, which will call the DONE callback.  */
  using state_t = detail::pfea_state<min_batch_size, RandomIt, WorkerArgs...>;
  auto state
    = std::make_shared<state_t> (first, last, std::move (done),
                                 std::forward<WorkerArgs> (worker_args)...);

  /* The worker thread task.  */
  auto task = [state] ()
    {
      /* Instantiate the user-defined worker.  */
      auto worker = std::make_from_tuple<Worker> (state->worker_args_tuple);

      for (;;)
        {
          const auto batch = state->queue.pop_batch ();

          if (batch.empty ())
            break;

          if (parallel_for_each_debug)
            debug_printf ("Processing %zu items, range [%zu, %zu[\n",
                          batch.size (),
                          static_cast<std::size_t> (batch.begin ()
                                                    - state->first),
                          static_cast<std::size_t> (batch.end ()
                                                    - state->first));

          worker (batch);
        }
    };

  /* Start N_WORKER_THREADS tasks.  */
  for (size_t i = 0; i < n_worker_threads; ++i)
    gdb::thread_pool::g_thread_pool->post_task (task);
}

} /* namespace gdb */

#endif /* GDBSUPPORT_PARALLEL_FOR_H */