aboutsummaryrefslogtreecommitdiff
path: root/gprofng/src/DbeThread.cc
diff options
context:
space:
mode:
Diffstat (limited to 'gprofng/src/DbeThread.cc')
-rw-r--r--gprofng/src/DbeThread.cc224
1 files changed, 224 insertions, 0 deletions
diff --git a/gprofng/src/DbeThread.cc b/gprofng/src/DbeThread.cc
new file mode 100644
index 0000000..7eb3cb7
--- /dev/null
+++ b/gprofng/src/DbeThread.cc
@@ -0,0 +1,224 @@
+/* Copyright (C) 2021 Free Software Foundation, Inc.
+ Contributed by Oracle.
+
+ This file is part of GNU Binutils.
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 3, or (at your option)
+ any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, 51 Franklin Street - Fifth Floor, Boston,
+ MA 02110-1301, USA. */
+
+#include "config.h"
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/types.h>
+#include <unistd.h>
+
+#include "DbeThread.h"
+#include "util.h"
+#include "vec.h"
+
+static void
+cleanup_free_mutex (void* arg) {
+ // pthread_mutex_t *p_mutex = (pthread_mutex_t *) arg;
+ // if (p_mutex)
+ // pthread_mutex_unlock (p_mutex);
+}
+
+static void*
+thread_pool_loop (void* arg)
+{
+ DbeThreadPool *thrp = (DbeThreadPool*) arg;
+ Dprintf (DEBUG_THREADS, "thread_pool_loop:%d starting thread=%llu\n",
+ __LINE__, (unsigned long long) pthread_self ());
+
+ /* set my cancel state to 'enabled', and cancel type to 'defered'. */
+ pthread_setcancelstate (PTHREAD_CANCEL_ENABLE, NULL);
+ pthread_setcanceltype (PTHREAD_CANCEL_DEFERRED, NULL);
+
+ /* set thread cleanup handler */
+ pthread_cleanup_push (cleanup_free_mutex, (void*) & (thrp->p_mutex));
+ for (;;)
+ {
+ DbeQueue *q = thrp->get_queue ();
+ if (q)
+ { /* a request is pending */
+ Dprintf (DEBUG_THREADS,
+ "thread_pool_loop:%d thread=%llu queue=%d start\n",
+ __LINE__, (unsigned long long) pthread_self (), q->id);
+ q->func (q->arg);
+ Dprintf (DEBUG_THREADS,
+ "thread_pool_loop:%d thread=%llu queue=%d done\n",
+ __LINE__, (unsigned long long) pthread_self (), q->id);
+ delete q;
+ continue;
+ }
+ if (thrp->no_new_queues)
+ {
+ Dprintf (DEBUG_THREADS, "thread_pool_loop:%d exit thread=%llu\n",
+ __LINE__, (unsigned long long) pthread_self ());
+ pthread_exit (NULL);
+ }
+ Dprintf (DEBUG_THREADS,
+ "thread_pool_loop:%d before pthread_cond_wait thread=%llu\n",
+ __LINE__, (unsigned long long) pthread_self ());
+ pthread_mutex_lock (&thrp->p_mutex);
+ pthread_cond_wait (&thrp->p_cond_var, &thrp->p_mutex);
+ Dprintf (DEBUG_THREADS,
+ "thread_pool_loop:%d after pthread_cond_wait thread=%llu\n",
+ __LINE__, (unsigned long long) pthread_self ());
+ pthread_mutex_unlock (&thrp->p_mutex);
+ }
+
+ // never reached, but we must use it here. See `man pthread_cleanup_push`
+ pthread_cleanup_pop (0);
+}
+
+DbeThreadPool::DbeThreadPool (int _max_threads)
+{
+ static const int DBE_NTHREADS_DEFAULT = 4;
+ char *s = getenv ("GPROFNG_DBE_NTHREADS");
+ if (s)
+ {
+ max_threads = atoi (s);
+ if (max_threads < 0)
+ max_threads = 0;
+ if (_max_threads > 0 && max_threads < _max_threads)
+ max_threads = _max_threads;
+ }
+ else
+ {
+ max_threads = _max_threads;
+ if (max_threads < 0)
+ max_threads = DBE_NTHREADS_DEFAULT;
+ }
+ Dprintf (DEBUG_THREADS, "DbeThreadPool:%d max_threads %d ---> %d\n",
+ __LINE__, _max_threads, max_threads);
+ pthread_mutex_init (&p_mutex, NULL);
+ pthread_cond_init (&p_cond_var, NULL);
+ threads = new Vector <pthread_t>(max_threads);
+ queue = NULL;
+ last_queue = NULL;
+ no_new_queues = false;
+ queues_cnt = 0;
+ total_queues = 0;
+}
+
+DbeThreadPool::~DbeThreadPool ()
+{
+ delete threads;
+}
+
+DbeQueue *
+DbeThreadPool::get_queue ()
+{
+ pthread_mutex_lock (&p_mutex);
+ DbeQueue *q = queue;
+ Dprintf (DEBUG_THREADS,
+ "get_queue:%d thr: %lld id=%d queues_cnt=%d threads_cnt=%d max_threads=%d\n",
+ __LINE__, (unsigned long long) pthread_self (),
+ q ? q->id : -1, queues_cnt, (int) threads->size (), max_threads);
+ if (q)
+ {
+ queue = q->next;
+ queues_cnt--;
+ }
+ pthread_mutex_unlock (&p_mutex);
+ return q;
+}
+
+void
+DbeThreadPool::put_queue (DbeQueue *q)
+{
+ if (max_threads == 0)
+ {
+ // nothing runs in parallel
+ q->id = ++total_queues;
+ Dprintf (DEBUG_THREADS, NTXT ("put_queue:%d thr=%lld max_threads=%d queue (%d) runs on the worked thread\n"),
+ __LINE__, (unsigned long long) pthread_self (), max_threads, q->id);
+ q->func (q->arg);
+ delete q;
+ return;
+ }
+
+ pthread_mutex_lock (&p_mutex);
+ // nothing runs in parallel
+ q->id = ++total_queues;
+ Dprintf (DEBUG_THREADS, "put_queue:%d thr=%lld max_threads=%d queue (%d)\n",
+ __LINE__, (unsigned long long) pthread_self (), max_threads, q->id);
+ if (queue)
+ {
+ last_queue->next = q;
+ last_queue = q;
+ }
+ else
+ {
+ queue = q;
+ last_queue = q;
+ }
+ queues_cnt++;
+ Dprintf (DEBUG_THREADS,
+ "put_queue:%d id=%d queues_cnt=%d threads_cnt=%d max_threads=%d\n",
+ __LINE__, q->id, queues_cnt, (int) threads->size (), max_threads);
+ if (queues_cnt > threads->size () && threads->size () < max_threads)
+ {
+ pthread_t thr;
+ int r = pthread_create (&thr, NULL, thread_pool_loop, (void *) this);
+ Dprintf (DEBUG_THREADS,
+ "put_queue:%d pthread_create returns %d thr=%llu\n",
+ __LINE__, r, (unsigned long long) thr);
+ if (r)
+ fprintf (stderr, GTXT ("pthread_create failed. errnum=%d (%s)\n"), r,
+ STR (strerror (r)));
+ else
+ threads->append (thr);
+ }
+ pthread_cond_signal (&p_cond_var);
+ pthread_mutex_unlock (&p_mutex);
+}
+
+void
+DbeThreadPool::wait_queues ()
+{
+ pthread_mutex_lock (&p_mutex);
+ no_new_queues = true;
+ pthread_mutex_unlock (&p_mutex);
+ pthread_cond_broadcast (&p_cond_var);
+ for (;;) // Run requests on the worked thread too
+ {
+ DbeQueue *q = get_queue ();
+ if (q == NULL)
+ break;
+ Dprintf (DEBUG_THREADS, "wait_queues:%d thread=%llu queue=%d start\n",
+ __LINE__, (unsigned long long) pthread_self (), q->id);
+ q->func (q->arg);
+ Dprintf (DEBUG_THREADS, "wait_queues:%d thread=%llu queue=%d done\n",
+ __LINE__, (unsigned long long) pthread_self (), q->id);
+ delete q;
+ }
+ for (int i = 0, sz = threads->size (); i < sz; i++)
+ {
+ void *retval;
+ pthread_join (threads->get (i), &retval);
+ }
+}
+
+DbeQueue::DbeQueue (int (*_func) (void *arg), void *_arg)
+{
+ func = _func;
+ arg = _arg;
+ next = NULL;
+}
+
+DbeQueue::~DbeQueue () { }