aboutsummaryrefslogtreecommitdiff
path: root/rt/aio_misc.c
diff options
context:
space:
mode:
Diffstat (limited to 'rt/aio_misc.c')
-rw-r--r--rt/aio_misc.c371
1 files changed, 228 insertions, 143 deletions
diff --git a/rt/aio_misc.c b/rt/aio_misc.c
index fa3c75c..97ef69f 100644
--- a/rt/aio_misc.c
+++ b/rt/aio_misc.c
@@ -1,5 +1,5 @@
/* Handle general operations.
- Copyright (C) 1997, 1998, 1999 Free Software Foundation, Inc.
+ Copyright (C) 1997, 1998, 1999, 2000 Free Software Foundation, Inc.
This file is part of the GNU C Library.
Contributed by Ulrich Drepper <drepper@cygnus.com>, 1997.
@@ -19,15 +19,19 @@
Boston, MA 02111-1307, USA. */
#include <aio.h>
+#include <assert.h>
#include <errno.h>
#include <limits.h>
#include <pthread.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/stat.h>
+#include <sys/time.h>
#include "aio_misc.h"
+static void add_request_to_runlist (struct requestlist *newrequest);
+
/* Pool of request list entries. */
static struct requestlist **pool;
@@ -55,6 +59,9 @@ static struct requestlist *requests;
/* Number of threads currently running. */
static int nthreads;
+/* Number of threads waiting for work to arrive. */
+static int idle_thread_count;
+
/* These are the values used to optimize the use of AIO. The user can
overwrite them by using the `aio_init' function. */
@@ -66,13 +73,19 @@ static struct aioinit optim =
0,
0,
0,
- { 0, }
+ 1,
+ 0
};
/* Since the list is global we need a mutex protecting it. */
pthread_mutex_t __aio_requests_mutex = PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP;
+/* When you add a request to the list and there are idle threads present,
+ you signal this condition variable. When a thread finishes work, it waits
+ on this condition variable for a time before it actually exits. */
+pthread_cond_t __aio_new_request_notification = PTHREAD_COND_INITIALIZER;
+
/* Functions to handle request list pool. */
static struct requestlist *
@@ -85,6 +98,8 @@ get_elem (void)
struct requestlist *new_row;
size_t new_size;
+ assert(sizeof(struct aiocb) == sizeof(struct aiocb64));
+
/* Compute new size. */
new_size = pool_size ? pool_size + ENTRIES_PER_ROW : optim.aio_num;
@@ -210,6 +225,9 @@ __aio_init (const struct aioinit *init)
: init->aio_num & ~ENTRIES_PER_ROW);
}
+ if (init->aio_idle_time != 0)
+ optim.aio_idle_time = init->aio_idle_time;
+
/* Release the mutex. */
pthread_mutex_unlock (&__aio_requests_mutex);
}
@@ -299,6 +317,7 @@ __aio_enqueue_request (aiocb_union *aiocbp, int operation)
}
else
{
+ running = yes;
/* Enqueue this request for a new descriptor. */
if (last == NULL)
{
@@ -320,7 +339,7 @@ __aio_enqueue_request (aiocb_union *aiocbp, int operation)
newp->next_prio = NULL;
}
- if (running == no)
+ if (running == yes)
{
/* We try to create a new thread for this file descriptor. The
function which gets called will handle all available requests
@@ -330,8 +349,8 @@ __aio_enqueue_request (aiocb_union *aiocbp, int operation)
If no new thread can be created or if the specified limit of
threads for AIO is reached we queue the request. */
- /* See if we can create a thread. */
- if (nthreads < optim.aio_threads)
+ /* See if we need to and are able to create a thread. */
+ if (nthreads < optim.aio_threads && idle_thread_count == 0)
{
pthread_t thid;
pthread_attr_t attr;
@@ -358,24 +377,14 @@ __aio_enqueue_request (aiocb_union *aiocbp, int operation)
}
/* Enqueue the request in the run queue if it is not yet running. */
- if (running < yes && result == 0)
+ if (running == yes && result == 0)
{
- if (runlist == NULL || runlist->aiocbp->aiocb.__abs_prio < prio)
- {
- newp->next_run = runlist;
- runlist = newp;
- }
- else
- {
- runp = runlist;
+ add_request_to_runlist (newp);
- while (runp->next_run != NULL
- && runp->next_run->aiocbp->aiocb.__abs_prio >= prio)
- runp = runp->next_run;
-
- newp->next_run = runp->next_run;
- runp->next_run = newp;
- }
+ /* If there is a thread waiting for work, then let it know that we
+ have just given it something to do. */
+ if (idle_thread_count > 0)
+ pthread_cond_signal (&__aio_new_request_notification);
}
if (result == 0)
@@ -408,149 +417,197 @@ handle_fildes_io (void *arg)
do
{
- /* Update our variables. */
- aiocbp = runp->aiocbp;
- fildes = aiocbp->aiocb.aio_fildes;
-
- /* Change the priority to the requested value (if necessary). */
- if (aiocbp->aiocb.__abs_prio != param.sched_priority
- || aiocbp->aiocb.__policy != policy)
+ /* If runp is NULL, then we were created to service the work queue
+ in general, not to handle any particular request. In that case we
+ skip the "do work" stuff on the first pass, and go directly to the
+ "get work off the work queue" part of this loop, which is near the
+ end. */
+ if (runp == NULL)
+ pthread_mutex_lock (&__aio_requests_mutex);
+ else
{
- param.sched_priority = aiocbp->aiocb.__abs_prio;
- policy = aiocbp->aiocb.__policy;
- pthread_setschedparam (self, policy, &param);
- }
+ /* Update our variables. */
+ aiocbp = runp->aiocbp;
+ fildes = aiocbp->aiocb.aio_fildes;
- /* Process request pointed to by RUNP. We must not be disturbed
- by signals. */
- if ((aiocbp->aiocb.aio_lio_opcode & 127) == LIO_READ)
- {
- if (aiocbp->aiocb.aio_lio_opcode & 128)
- aiocbp->aiocb.__return_value =
- TEMP_FAILURE_RETRY (__pread64 (fildes,
+ /* Change the priority to the requested value (if necessary). */
+ if (aiocbp->aiocb.__abs_prio != param.sched_priority
+ || aiocbp->aiocb.__policy != policy)
+ {
+ param.sched_priority = aiocbp->aiocb.__abs_prio;
+ policy = aiocbp->aiocb.__policy;
+ pthread_setschedparam (self, policy, &param);
+ }
+
+ /* Process request pointed to by RUNP. We must not be disturbed
+ by signals. */
+ if ((aiocbp->aiocb.aio_lio_opcode & 127) == LIO_READ)
+ {
+ if (aiocbp->aiocb.aio_lio_opcode & 128)
+ aiocbp->aiocb.__return_value =
+ TEMP_FAILURE_RETRY (__pread64 (fildes, (void *)
+ aiocbp->aiocb64.aio_buf,
+ aiocbp->aiocb64.aio_nbytes,
+ aiocbp->aiocb64.aio_offset));
+ else
+ aiocbp->aiocb.__return_value =
+ TEMP_FAILURE_RETRY (pread (fildes,
+ (void *) aiocbp->aiocb.aio_buf,
+ aiocbp->aiocb.aio_nbytes,
+ aiocbp->aiocb.aio_offset));
+
+ if (aiocbp->aiocb.__return_value == -1 && errno == ESPIPE)
+ /* The Linux kernel is different from others. It returns
+ ESPIPE if using pread on a socket. Other platforms
+ simply ignore the offset parameter and behave like
+ read. */
+ aiocbp->aiocb.__return_value =
+ TEMP_FAILURE_RETRY (read (fildes,
+ (void *) aiocbp->aiocb64.aio_buf,
+ aiocbp->aiocb64.aio_nbytes));
+ }
+ else if ((aiocbp->aiocb.aio_lio_opcode & 127) == LIO_WRITE)
+ {
+ if (aiocbp->aiocb.aio_lio_opcode & 128)
+ aiocbp->aiocb.__return_value =
+ TEMP_FAILURE_RETRY (__pwrite64 (fildes, (const void *)
+ aiocbp->aiocb64.aio_buf,
+ aiocbp->aiocb64.aio_nbytes,
+ aiocbp->aiocb64.aio_offset));
+ else
+ aiocbp->aiocb.__return_value =
+ TEMP_FAILURE_RETRY (pwrite (fildes, (const void *)
+ aiocbp->aiocb.aio_buf,
+ aiocbp->aiocb.aio_nbytes,
+ aiocbp->aiocb.aio_offset));
+
+ if (aiocbp->aiocb.__return_value == -1 && errno == ESPIPE)
+ /* The Linux kernel is different from others. It returns
+ ESPIPE if using pwrite on a socket. Other platforms
+ simply ignore the offset parameter and behave like
+ write. */
+ aiocbp->aiocb.__return_value =
+ TEMP_FAILURE_RETRY (write (fildes,
(void *) aiocbp->aiocb64.aio_buf,
- aiocbp->aiocb64.aio_nbytes,
- aiocbp->aiocb64.aio_offset));
- else
+ aiocbp->aiocb64.aio_nbytes));
+ }
+ else if (aiocbp->aiocb.aio_lio_opcode == LIO_DSYNC)
aiocbp->aiocb.__return_value =
- TEMP_FAILURE_RETRY (pread (fildes,
- (void *) aiocbp->aiocb.aio_buf,
- aiocbp->aiocb.aio_nbytes,
- aiocbp->aiocb.aio_offset));
-
- if (aiocbp->aiocb.__return_value == -1 && errno == ESPIPE)
- /* The Linux kernel is different from others. It returns
- ESPIPE if using pread on a socket. Other platforms
- simply ignore the offset parameter and behave like
- read. */
+ TEMP_FAILURE_RETRY (fdatasync (fildes));
+ else if (aiocbp->aiocb.aio_lio_opcode == LIO_SYNC)
aiocbp->aiocb.__return_value =
- TEMP_FAILURE_RETRY (read (fildes,
- (void *) aiocbp->aiocb64.aio_buf,
- aiocbp->aiocb64.aio_nbytes));
- }
- else if ((aiocbp->aiocb.aio_lio_opcode & 127) == LIO_WRITE)
- {
- if (aiocbp->aiocb.aio_lio_opcode & 128)
- aiocbp->aiocb.__return_value =
- TEMP_FAILURE_RETRY (__pwrite64 (fildes,
- (const void *) aiocbp->aiocb64.aio_buf,
- aiocbp->aiocb64.aio_nbytes,
- aiocbp->aiocb64.aio_offset));
+ TEMP_FAILURE_RETRY (fsync (fildes));
else
- aiocbp->aiocb.__return_value =
- TEMP_FAILURE_RETRY (pwrite (fildes,
- (const void *) aiocbp->aiocb.aio_buf,
- aiocbp->aiocb.aio_nbytes,
- aiocbp->aiocb.aio_offset));
-
- if (aiocbp->aiocb.__return_value == -1 && errno == ESPIPE)
- /* The Linux kernel is different from others. It returns
- ESPIPE if using pwrite on a socket. Other platforms
- simply ignore the offset parameter and behave like
- write. */
- aiocbp->aiocb.__return_value =
- TEMP_FAILURE_RETRY (write (fildes,
- (void *) aiocbp->aiocb64.aio_buf,
- aiocbp->aiocb64.aio_nbytes));
- }
- else if (aiocbp->aiocb.aio_lio_opcode == LIO_DSYNC)
- aiocbp->aiocb.__return_value = TEMP_FAILURE_RETRY (fdatasync (fildes));
- else if (aiocbp->aiocb.aio_lio_opcode == LIO_SYNC)
- aiocbp->aiocb.__return_value = TEMP_FAILURE_RETRY (fsync (fildes));
- else
- {
- /* This is an invalid opcode. */
- aiocbp->aiocb.__return_value = -1;
- __set_errno (EINVAL);
- }
+ {
+ /* This is an invalid opcode. */
+ aiocbp->aiocb.__return_value = -1;
+ __set_errno (EINVAL);
+ }
- /* Get the mutex. */
- pthread_mutex_lock (&__aio_requests_mutex);
+ /* Get the mutex. */
+ pthread_mutex_lock (&__aio_requests_mutex);
- if (aiocbp->aiocb.__return_value == -1)
- aiocbp->aiocb.__error_code = errno;
- else
- aiocbp->aiocb.__error_code = 0;
+ if (aiocbp->aiocb.__return_value == -1)
+ aiocbp->aiocb.__error_code = errno;
+ else
+ aiocbp->aiocb.__error_code = 0;
- /* Send the signal to notify about finished processing of the
- request. */
- __aio_notify (runp);
+ /* Send the signal to notify about finished processing of the
+ request. */
+ __aio_notify (runp);
- /* Now dequeue the current request. */
- if (runp->next_prio == NULL)
- {
- /* No outstanding request for this descriptor. Remove this
- descriptor from the list. */
- if (runp->next_fd != NULL)
- runp->next_fd->last_fd = runp->last_fd;
- if (runp->last_fd != NULL)
- runp->last_fd->next_fd = runp->next_fd;
- else
- requests = runp->next_fd;
- }
- else
- {
- runp->next_prio->last_fd = runp->last_fd;
- runp->next_prio->next_fd = runp->next_fd;
- runp->next_prio->running = yes;
- if (runp->next_fd != NULL)
- runp->next_fd->last_fd = runp->next_prio;
- if (runp->last_fd != NULL)
- runp->last_fd->next_fd = runp->next_prio;
+ /* Now dequeue the current request. */
+ if (runp->next_prio == NULL)
+ {
+ /* No outstanding request for this descriptor. Remove this
+ descriptor from the list. */
+ if (runp->next_fd != NULL)
+ runp->next_fd->last_fd = runp->last_fd;
+ if (runp->last_fd != NULL)
+ runp->last_fd->next_fd = runp->next_fd;
+ else
+ requests = runp->next_fd;
+ }
else
- requests = runp->next_prio;
- }
+ {
+ runp->next_prio->last_fd = runp->last_fd;
+ runp->next_prio->next_fd = runp->next_fd;
+ runp->next_prio->running = yes;
+ if (runp->next_fd != NULL)
+ runp->next_fd->last_fd = runp->next_prio;
+ if (runp->last_fd != NULL)
+ runp->last_fd->next_fd = runp->next_prio;
+ else
+ requests = runp->next_prio;
+ add_request_to_runlist (runp->next_prio);
+ }
- /* Free the old element. */
- __aio_free_request (runp);
+ /* Free the old element. */
+ __aio_free_request (runp);
+ }
runp = runlist;
- if (runp != NULL)
+
+ /* If the runlist is empty, then we sleep for a while, waiting for
+ something to arrive in it. */
+ if (runp == NULL && optim.aio_idle_time >= 0)
{
- /* We must not run requests which are not marked `running'. */
- if (runp->running == yes)
- runlist = runp->next_run;
- else
+ struct timeval now;
+ struct timespec wakeup_time;
+
+ ++idle_thread_count;
+ gettimeofday (&now, NULL);
+ wakeup_time.tv_sec = now.tv_sec + optim.aio_idle_time;
+ wakeup_time.tv_nsec = now.tv_usec * 1000;
+ if (wakeup_time.tv_nsec > 1000000000)
{
- struct requestlist *old;
-
- do
- {
- old = runp;
- runp = runp->next_run;
- }
- while (runp != NULL && runp->running != yes);
-
- if (runp != NULL)
- old->next_run = runp->next_run;
+ wakeup_time.tv_nsec -= 1000000000;
+ ++wakeup_time.tv_sec;
}
+ pthread_cond_timedwait (&__aio_new_request_notification,
+ &__aio_requests_mutex,
+ &wakeup_time);
+ --idle_thread_count;
+ runp = runlist;
}
- /* If no request to work on we will stop the thread. */
if (runp == NULL)
--nthreads;
else
- runp->running = allocated;
+ {
+ assert (runp->running == yes);
+ runp->running = allocated;
+ runlist = runp->next_run;
+
+ /* If we have a request to process, and there's still another in
+ the run list, then we need to either wake up or create a new
+ thread to service the request that is still in the run list. */
+ if (runlist != NULL)
+ {
+ /* There are at least two items in the work queue to work on.
+ If there are other idle threads, then we should wake them
+ up for these other work elements; otherwise, we should try
+ to create a new thread. */
+ if (idle_thread_count > 0)
+ pthread_cond_signal (&__aio_new_request_notification);
+ else if (nthreads < optim.aio_threads)
+ {
+ pthread_t thid;
+ pthread_attr_t attr;
+
+ /* Make sure the thread is created detached. */
+ pthread_attr_init (&attr);
+ pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
+
+ /* Now try to start a thread. If we fail, no big deal,
+ because we know that there is at least one thread (us)
+ that is working on AIO operations. */
+ if (pthread_create (&thid, &attr, handle_fildes_io, NULL)
+ == 0)
+ ++nthreads;
+ }
+ }
+ }
/* Release the mutex. */
pthread_mutex_unlock (&__aio_requests_mutex);
@@ -577,5 +634,33 @@ free_res (void)
free (pool);
}
-
text_set_element (__libc_subfreeres, free_res);
+
+
+/* Add newrequest to the runlist. The __abs_prio flag of newrequest must
+ be correctly set to do this. Also, you had better set newrequest's
+ "running" flag to "yes" before you release your lock or you'll throw an
+ assertion. */
+static void
+add_request_to_runlist (struct requestlist *newrequest)
+{
+ int prio = newrequest->aiocbp->aiocb.__abs_prio;
+ struct requestlist *runp;
+
+ if (runlist == NULL || runlist->aiocbp->aiocb.__abs_prio < prio)
+ {
+ newrequest->next_run = runlist;
+ runlist = newrequest;
+ }
+ else
+ {
+ runp = runlist;
+
+ while (runp->next_run != NULL
+ && runp->next_run->aiocbp->aiocb.__abs_prio >= prio)
+ runp = runp->next_run;
+
+ newrequest->next_run = runp->next_run;
+ runp->next_run = newrequest;
+ }
+}