Diffstat (limited to 'rt/aio_misc.c')
-rw-r--r--  rt/aio_misc.c | 371
1 file changed, 228 insertions, 143 deletions
diff --git a/rt/aio_misc.c b/rt/aio_misc.c
index fa3c75c..97ef69f 100644
--- a/rt/aio_misc.c
+++ b/rt/aio_misc.c
@@ -1,5 +1,5 @@
 /* Handle general operations.
-   Copyright (C) 1997, 1998, 1999 Free Software Foundation, Inc.
+   Copyright (C) 1997, 1998, 1999, 2000 Free Software Foundation, Inc.
    This file is part of the GNU C Library.
    Contributed by Ulrich Drepper <drepper@cygnus.com>, 1997.
@@ -19,15 +19,19 @@
    Boston, MA 02111-1307, USA.  */
 
 #include <aio.h>
+#include <assert.h>
 #include <errno.h>
 #include <limits.h>
 #include <pthread.h>
 #include <stdlib.h>
 #include <unistd.h>
 #include <sys/stat.h>
+#include <sys/time.h>
 
 #include "aio_misc.h"
 
+static void add_request_to_runlist (struct requestlist *newrequest);
+
 /* Pool of request list entries.  */
 static struct requestlist **pool;
@@ -55,6 +59,9 @@ static struct requestlist *requests;
 /* Number of threads currently running.  */
 static int nthreads;
 
+/* Number of threads waiting for work to arrive.  */
+static int idle_thread_count;
+
 /* These are the values used to optimize the use of AIO.  The user can
    overwrite them by using the `aio_init' function.  */
@@ -66,13 +73,19 @@ static struct aioinit optim =
   0,
   0,
   0,
-  { 0, }
+  1,
+  0
 };
 
 
 /* Since the list is global we need a mutex protecting it.  */
 pthread_mutex_t __aio_requests_mutex = PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP;
 
+/* When you add a request to the list and there are idle threads present,
+   you signal this condition variable.  When a thread finishes work, it waits
+   on this condition variable for a time before it actually exits.  */
+pthread_cond_t __aio_new_request_notification = PTHREAD_COND_INITIALIZER;
+
 
 /* Functions to handle request list pool.  */
 static struct requestlist *
@@ -85,6 +98,8 @@ get_elem (void)
       struct requestlist *new_row;
       size_t new_size;
 
+      assert(sizeof(struct aiocb) == sizeof(struct aiocb64));
+
       /* Compute new size.  */
       new_size = pool_size ? pool_size + ENTRIES_PER_ROW : optim.aio_num;
@@ -210,6 +225,9 @@ __aio_init (const struct aioinit *init)
                      : init->aio_num & ~ENTRIES_PER_ROW);
     }
 
+  if (init->aio_idle_time != 0)
+    optim.aio_idle_time = init->aio_idle_time;
+
   /* Release the mutex.  */
   pthread_mutex_unlock (&__aio_requests_mutex);
 }
@@ -299,6 +317,7 @@ __aio_enqueue_request (aiocb_union *aiocbp, int operation)
     }
   else
     {
+      running = yes;
       /* Enqueue this request for a new descriptor.  */
       if (last == NULL)
         {
@@ -320,7 +339,7 @@ __aio_enqueue_request (aiocb_union *aiocbp, int operation)
       newp->next_prio = NULL;
     }
 
-  if (running == no)
+  if (running == yes)
     {
       /* We try to create a new thread for this file descriptor.  The
          function which gets called will handle all available requests
@@ -330,8 +349,8 @@ __aio_enqueue_request (aiocb_union *aiocbp, int operation)
          If no new thread can be created or if the specified limit of
          threads for AIO is reached we queue the request.  */
 
-      /* See if we can create a thread.  */
-      if (nthreads < optim.aio_threads)
+      /* See if we need to and are able to create a thread.  */
+      if (nthreads < optim.aio_threads && idle_thread_count == 0)
         {
           pthread_t thid;
           pthread_attr_t attr;
@@ -358,24 +377,14 @@ __aio_enqueue_request (aiocb_union *aiocbp, int operation)
     }
 
   /* Enqueue the request in the run queue if it is not yet running.  */
-  if (running < yes && result == 0)
+  if (running == yes && result == 0)
     {
-      if (runlist == NULL || runlist->aiocbp->aiocb.__abs_prio < prio)
-        {
-          newp->next_run = runlist;
-          runlist = newp;
-        }
-      else
-        {
-          runp = runlist;
+      add_request_to_runlist (newp);
 
-          while (runp->next_run != NULL
-                 && runp->next_run->aiocbp->aiocb.__abs_prio >= prio)
-            runp = runp->next_run;
-
-          newp->next_run = runp->next_run;
-          runp->next_run = newp;
-        }
+      /* If there is a thread waiting for work, then let it know that we
+         have just given it something to do.  */
+      if (idle_thread_count > 0)
+        pthread_cond_signal (&__aio_new_request_notification);
     }
 
   if (result == 0)
@@ -408,149 +417,197 @@ handle_fildes_io (void *arg)
 
   do
     {
-      /* Update our variables.  */
-      aiocbp = runp->aiocbp;
-      fildes = aiocbp->aiocb.aio_fildes;
-
-      /* Change the priority to the requested value (if necessary).  */
-      if (aiocbp->aiocb.__abs_prio != param.sched_priority
-          || aiocbp->aiocb.__policy != policy)
+      /* If runp is NULL, then we were created to service the work queue
+         in general, not to handle any particular request.  In that case we
+         skip the "do work" stuff on the first pass, and go directly to the
+         "get work off the work queue" part of this loop, which is near the
+         end.  */
+      if (runp == NULL)
+        pthread_mutex_lock (&__aio_requests_mutex);
+      else
        {
-          param.sched_priority = aiocbp->aiocb.__abs_prio;
-          policy = aiocbp->aiocb.__policy;
-          pthread_setschedparam (self, policy, &param);
-        }
+          /* Update our variables.  */
+          aiocbp = runp->aiocbp;
+          fildes = aiocbp->aiocb.aio_fildes;
 
-      /* Process request pointed to by RUNP.  We must not be disturbed
-         by signals.  */
-      if ((aiocbp->aiocb.aio_lio_opcode & 127) == LIO_READ)
-        {
-          if (aiocbp->aiocb.aio_lio_opcode & 128)
-            aiocbp->aiocb.__return_value =
-              TEMP_FAILURE_RETRY (__pread64 (fildes,
+          /* Change the priority to the requested value (if necessary).  */
+          if (aiocbp->aiocb.__abs_prio != param.sched_priority
+              || aiocbp->aiocb.__policy != policy)
+            {
+              param.sched_priority = aiocbp->aiocb.__abs_prio;
+              policy = aiocbp->aiocb.__policy;
+              pthread_setschedparam (self, policy, &param);
+            }
+
+          /* Process request pointed to by RUNP.  We must not be disturbed
+             by signals.  */
+          if ((aiocbp->aiocb.aio_lio_opcode & 127) == LIO_READ)
+            {
+              if (aiocbp->aiocb.aio_lio_opcode & 128)
+                aiocbp->aiocb.__return_value =
+                  TEMP_FAILURE_RETRY (__pread64 (fildes, (void *)
+                                                 aiocbp->aiocb64.aio_buf,
+                                                 aiocbp->aiocb64.aio_nbytes,
+                                                 aiocbp->aiocb64.aio_offset));
+              else
+                aiocbp->aiocb.__return_value =
+                  TEMP_FAILURE_RETRY (pread (fildes,
+                                             (void *) aiocbp->aiocb.aio_buf,
+                                             aiocbp->aiocb.aio_nbytes,
+                                             aiocbp->aiocb.aio_offset));
+
+              if (aiocbp->aiocb.__return_value == -1 && errno == ESPIPE)
+                /* The Linux kernel is different from others.  It returns
+                   ESPIPE if using pread on a socket.  Other platforms
+                   simply ignore the offset parameter and behave like
+                   read.  */
+                aiocbp->aiocb.__return_value =
+                  TEMP_FAILURE_RETRY (read (fildes,
+                                            (void *) aiocbp->aiocb64.aio_buf,
+                                            aiocbp->aiocb64.aio_nbytes));
+            }
+          else if ((aiocbp->aiocb.aio_lio_opcode & 127) == LIO_WRITE)
+            {
+              if (aiocbp->aiocb.aio_lio_opcode & 128)
+                aiocbp->aiocb.__return_value =
+                  TEMP_FAILURE_RETRY (__pwrite64 (fildes, (const void *)
+                                                  aiocbp->aiocb64.aio_buf,
+                                                  aiocbp->aiocb64.aio_nbytes,
+                                                  aiocbp->aiocb64.aio_offset));
+              else
+                aiocbp->aiocb.__return_value =
+                  TEMP_FAILURE_RETRY (pwrite (fildes, (const void *)
+                                              aiocbp->aiocb.aio_buf,
+                                              aiocbp->aiocb.aio_nbytes,
+                                              aiocbp->aiocb.aio_offset));
+
+              if (aiocbp->aiocb.__return_value == -1 && errno == ESPIPE)
+                /* The Linux kernel is different from others.  It returns
+                   ESPIPE if using pwrite on a socket.  Other platforms
+                   simply ignore the offset parameter and behave like
+                   write.  */
+                aiocbp->aiocb.__return_value =
+                  TEMP_FAILURE_RETRY (write (fildes,
                                             (void *) aiocbp->aiocb64.aio_buf,
-                                            aiocbp->aiocb64.aio_nbytes,
-                                            aiocbp->aiocb64.aio_offset));
-          else
+                                            aiocbp->aiocb64.aio_nbytes));
+            }
+          else if (aiocbp->aiocb.aio_lio_opcode == LIO_DSYNC)
             aiocbp->aiocb.__return_value =
-              TEMP_FAILURE_RETRY (pread (fildes,
-                                         (void *) aiocbp->aiocb.aio_buf,
-                                         aiocbp->aiocb.aio_nbytes,
-                                         aiocbp->aiocb.aio_offset));
-
-          if (aiocbp->aiocb.__return_value == -1 && errno == ESPIPE)
-            /* The Linux kernel is different from others.  It returns
-               ESPIPE if using pread on a socket.  Other platforms
-               simply ignore the offset parameter and behave like
-               read.  */
+              TEMP_FAILURE_RETRY (fdatasync (fildes));
+          else if (aiocbp->aiocb.aio_lio_opcode == LIO_SYNC)
             aiocbp->aiocb.__return_value =
-              TEMP_FAILURE_RETRY (read (fildes,
-                                        (void *) aiocbp->aiocb64.aio_buf,
-                                        aiocbp->aiocb64.aio_nbytes));
-        }
-      else if ((aiocbp->aiocb.aio_lio_opcode & 127) == LIO_WRITE)
-        {
-          if (aiocbp->aiocb.aio_lio_opcode & 128)
-            aiocbp->aiocb.__return_value =
-              TEMP_FAILURE_RETRY (__pwrite64 (fildes,
-                                              (const void *) aiocbp->aiocb64.aio_buf,
-                                              aiocbp->aiocb64.aio_nbytes,
-                                              aiocbp->aiocb64.aio_offset));
+              TEMP_FAILURE_RETRY (fsync (fildes));
           else
-            aiocbp->aiocb.__return_value =
-              TEMP_FAILURE_RETRY (pwrite (fildes,
-                                          (const void *) aiocbp->aiocb.aio_buf,
-                                          aiocbp->aiocb.aio_nbytes,
-                                          aiocbp->aiocb.aio_offset));
-
-          if (aiocbp->aiocb.__return_value == -1 && errno == ESPIPE)
-            /* The Linux kernel is different from others.  It returns
-               ESPIPE if using pwrite on a socket.  Other platforms
-               simply ignore the offset parameter and behave like
-               write.  */
-            aiocbp->aiocb.__return_value =
-              TEMP_FAILURE_RETRY (write (fildes,
-                                         (void *) aiocbp->aiocb64.aio_buf,
-                                         aiocbp->aiocb64.aio_nbytes));
-        }
-      else if (aiocbp->aiocb.aio_lio_opcode == LIO_DSYNC)
-        aiocbp->aiocb.__return_value = TEMP_FAILURE_RETRY (fdatasync (fildes));
-      else if (aiocbp->aiocb.aio_lio_opcode == LIO_SYNC)
-        aiocbp->aiocb.__return_value = TEMP_FAILURE_RETRY (fsync (fildes));
-      else
-        {
-          /* This is an invalid opcode.  */
-          aiocbp->aiocb.__return_value = -1;
-          __set_errno (EINVAL);
-        }
+            {
+              /* This is an invalid opcode.  */
+              aiocbp->aiocb.__return_value = -1;
+              __set_errno (EINVAL);
+            }
 
-      /* Get the mutex.  */
-      pthread_mutex_lock (&__aio_requests_mutex);
+          /* Get the mutex.  */
+          pthread_mutex_lock (&__aio_requests_mutex);
 
-      if (aiocbp->aiocb.__return_value == -1)
-        aiocbp->aiocb.__error_code = errno;
-      else
-        aiocbp->aiocb.__error_code = 0;
+          if (aiocbp->aiocb.__return_value == -1)
+            aiocbp->aiocb.__error_code = errno;
+          else
+            aiocbp->aiocb.__error_code = 0;
 
-      /* Send the signal to notify about finished processing of the
-         request.  */
-      __aio_notify (runp);
+          /* Send the signal to notify about finished processing of the
+             request.  */
+          __aio_notify (runp);
 
-      /* Now dequeue the current request.  */
-      if (runp->next_prio == NULL)
-        {
-          /* No outstanding request for this descriptor.  Remove this
-             descriptor from the list.  */
-          if (runp->next_fd != NULL)
-            runp->next_fd->last_fd = runp->last_fd;
-          if (runp->last_fd != NULL)
-            runp->last_fd->next_fd = runp->next_fd;
-          else
-            requests = runp->next_fd;
-        }
-      else
-        {
-          runp->next_prio->last_fd = runp->last_fd;
-          runp->next_prio->next_fd = runp->next_fd;
-          runp->next_prio->running = yes;
-          if (runp->next_fd != NULL)
-            runp->next_fd->last_fd = runp->next_prio;
-          if (runp->last_fd != NULL)
-            runp->last_fd->next_fd = runp->next_prio;
+          /* Now dequeue the current request.  */
+          if (runp->next_prio == NULL)
+            {
+              /* No outstanding request for this descriptor.  Remove this
+                 descriptor from the list.  */
+              if (runp->next_fd != NULL)
+                runp->next_fd->last_fd = runp->last_fd;
+              if (runp->last_fd != NULL)
+                runp->last_fd->next_fd = runp->next_fd;
+              else
+                requests = runp->next_fd;
+            }
           else
-            requests = runp->next_prio;
-        }
+            {
+              runp->next_prio->last_fd = runp->last_fd;
+              runp->next_prio->next_fd = runp->next_fd;
+              runp->next_prio->running = yes;
+              if (runp->next_fd != NULL)
+                runp->next_fd->last_fd = runp->next_prio;
+              if (runp->last_fd != NULL)
+                runp->last_fd->next_fd = runp->next_prio;
+              else
+                requests = runp->next_prio;
+              add_request_to_runlist (runp->next_prio);
+            }
 
-      /* Free the old element.  */
-      __aio_free_request (runp);
+          /* Free the old element.  */
+          __aio_free_request (runp);
+        }
 
       runp = runlist;
-      if (runp != NULL)
+
+      /* If the runlist is empty, then we sleep for a while, waiting for
+         something to arrive in it.  */
+      if (runp == NULL && optim.aio_idle_time >= 0)
         {
-          /* We must not run requests which are not marked `running'.  */
-          if (runp->running == yes)
-            runlist = runp->next_run;
-          else
+          struct timeval now;
+          struct timespec wakeup_time;
+
+          ++idle_thread_count;
+          gettimeofday (&now, NULL);
+          wakeup_time.tv_sec = now.tv_sec + optim.aio_idle_time;
+          wakeup_time.tv_nsec = now.tv_usec * 1000;
+          if (wakeup_time.tv_nsec > 1000000000)
            {
-              struct requestlist *old;
-
-              do
-                {
-                  old = runp;
-                  runp = runp->next_run;
-                }
-              while (runp != NULL && runp->running != yes);
-
-              if (runp != NULL)
-                old->next_run = runp->next_run;
+              wakeup_time.tv_nsec -= 1000000000;
+              ++wakeup_time.tv_sec;
            }
+          pthread_cond_timedwait (&__aio_new_request_notification,
+                                  &__aio_requests_mutex,
+                                  &wakeup_time);
+          --idle_thread_count;
+          runp = runlist;
        }
 
-      /* If no request to work on we will stop the thread.  */
       if (runp == NULL)
        --nthreads;
      else
-        runp->running = allocated;
+        {
+          assert (runp->running == yes);
+          runp->running = allocated;
+          runlist = runp->next_run;
+
+          /* If we have a request to process, and there's still another in
+             the run list, then we need to either wake up or create a new
+             thread to service the request that is still in the run list.  */
+          if (runlist != NULL)
+            {
+              /* There are at least two items in the work queue to work on.
+                 If there are other idle threads, then we should wake them
+                 up for these other work elements; otherwise, we should try
+                 to create a new thread.  */
+              if (idle_thread_count > 0)
+                pthread_cond_signal (&__aio_new_request_notification);
+              else if (nthreads < optim.aio_threads)
+                {
+                  pthread_t thid;
+                  pthread_attr_t attr;
+
+                  /* Make sure the thread is created detached.  */
+                  pthread_attr_init (&attr);
+                  pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
+
+                  /* Now try to start a thread.  If we fail, no big deal,
+                     because we know that there is at least one thread (us)
+                     that is working on AIO operations.  */
+                  if (pthread_create (&thid, &attr, handle_fildes_io, NULL)
+                      == 0)
+                    ++nthreads;
+                }
+            }
+        }
 
       /* Release the mutex.  */
       pthread_mutex_unlock (&__aio_requests_mutex);
@@ -577,5 +634,33 @@ free_res (void)
 
   free (pool);
 }
-
 text_set_element (__libc_subfreeres, free_res);
+
+
+/* Add newrequest to the runlist.  The __abs_prio flag of newrequest must
+   be correctly set to do this.  Also, you had better set newrequest's
+   "running" flag to "yes" before you release your lock or you'll throw an
+   assertion.  */
+static void
+add_request_to_runlist (struct requestlist *newrequest)
+{
+  int prio = newrequest->aiocbp->aiocb.__abs_prio;
+  struct requestlist *runp;
+
+  if (runlist == NULL || runlist->aiocbp->aiocb.__abs_prio < prio)
+    {
+      newrequest->next_run = runlist;
+      runlist = newrequest;
+    }
+  else
+    {
+      runp = runlist;
+
+      while (runp->next_run != NULL
+             && runp->next_run->aiocbp->aiocb.__abs_prio >= prio)
+        runp = runp->next_run;
+
+      newrequest->next_run = runp->next_run;
+      runp->next_run = newrequest;
+    }
+}
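Note on the pattern this patch introduces: instead of a worker thread exiting as soon as the run list is empty, it now parks on `__aio_new_request_notification` for up to `aio_idle_time` seconds (a timed wait on an absolute deadline computed with `gettimeofday`), and `__aio_enqueue_request` wakes a parked worker rather than spawning a fresh thread when `idle_thread_count > 0`. The following is a minimal standalone sketch of that idle-worker pattern, not glibc code: the `job` queue, `worker`, `enqueue`, `IDLE_SECONDS`, and `MAX_THREADS` names are invented for illustration, and the simple LIFO queue stands in for glibc's priority-ordered runlist.

/* Minimal idle-worker-pool sketch (hypothetical names, not glibc code).
   Build with: cc demo.c -lpthread */
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/time.h>

#define IDLE_SECONDS 1          /* plays the role of optim.aio_idle_time */
#define MAX_THREADS  4          /* plays the role of optim.aio_threads */

struct job { struct job *next; int payload; };

static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t new_job = PTHREAD_COND_INITIALIZER;
static struct job *queue;       /* LIFO stand-in for the runlist */
static int idle_threads;        /* like idle_thread_count */
static int nthreads;            /* like nthreads */

static void *
worker (void *arg)
{
  (void) arg;
  pthread_mutex_lock (&lock);
  for (;;)
    {
      if (queue == NULL)
        {
          /* Queue empty: park for a while instead of exiting at once,
             using an absolute deadline as pthread_cond_timedwait requires. */
          struct timeval now;
          struct timespec wakeup;

          ++idle_threads;
          gettimeofday (&now, NULL);
          wakeup.tv_sec = now.tv_sec + IDLE_SECONDS;
          wakeup.tv_nsec = now.tv_usec * 1000;
          pthread_cond_timedwait (&new_job, &lock, &wakeup);
          --idle_threads;
        }
      if (queue == NULL)
        break;                  /* still nothing after the wait: give up */

      struct job *j = queue;
      queue = j->next;
      pthread_mutex_unlock (&lock);

      printf ("processed job %d\n", j->payload);  /* do the work unlocked */
      free (j);

      pthread_mutex_lock (&lock);
    }
  --nthreads;
  pthread_mutex_unlock (&lock);
  return NULL;
}

static void
enqueue (int payload)
{
  struct job *j = malloc (sizeof *j);
  j->payload = payload;

  pthread_mutex_lock (&lock);
  j->next = queue;
  queue = j;
  if (idle_threads > 0)
    /* Wake a parked worker instead of spawning a new thread. */
    pthread_cond_signal (&new_job);
  else if (nthreads < MAX_THREADS)
    {
      pthread_t th;
      pthread_attr_t attr;
      pthread_attr_init (&attr);
      pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
      if (pthread_create (&th, &attr, worker, NULL) == 0)
        ++nthreads;
      pthread_attr_destroy (&attr);
    }
  pthread_mutex_unlock (&lock);
}

int
main (void)
{
  for (int i = 0; i < 4; ++i)
    enqueue (i);
  sleep (IDLE_SECONDS + 1);     /* crude: let detached workers drain and expire */
  return 0;
}

The design point, in both the patch and the sketch, is that thread creation and teardown are expensive; letting a finished worker linger for a bounded idle period turns a burst of requests into condition-variable signals on existing threads rather than a pthread_create per request.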