diff options
author | Carlos O'Donell <carlos@systemhalted.org> | 2015-01-21 00:46:16 -0500 |
---|---|---|
committer | Carlos O'Donell <carlos@systemhalted.org> | 2015-01-21 00:46:16 -0500 |
commit | 042e1521c794a945edc43b5bfa7e69ad70420524 (patch) | |
tree | 74f44d4b065d801cfe6ace654827ca8ef351bff8 /nptl | |
parent | a8db092ec0c6742a9d41e1715946e90d4edfeec1 (diff) | |
download | glibc-042e1521c794a945edc43b5bfa7e69ad70420524.zip glibc-042e1521c794a945edc43b5bfa7e69ad70420524.tar.gz glibc-042e1521c794a945edc43b5bfa7e69ad70420524.tar.bz2 |
Fix semaphore destruction (bug 12674).
This commit fixes semaphore destruction by either using 64b atomic
operations (where available), or by using two separate fields when only
32b atomic operations are available. In the latter case, we keep a
conservative estimate of whether there are any waiting threads in one
bit of the field that counts the number of available tokens, thus
allowing sem_post to atomically both add a token and determine whether
it needs to call futex_wake.
See:
https://sourceware.org/ml/libc-alpha/2014-12/msg00155.html
Diffstat (limited to 'nptl')
-rw-r--r-- | nptl/DESIGN-sem.txt | 46 | ||||
-rw-r--r-- | nptl/Makefile | 5 | ||||
-rw-r--r-- | nptl/sem_getvalue.c | 26 | ||||
-rw-r--r-- | nptl/sem_init.c | 35 | ||||
-rw-r--r-- | nptl/sem_open.c | 9 | ||||
-rw-r--r-- | nptl/sem_post.c | 67 | ||||
-rw-r--r-- | nptl/sem_timedwait.c | 96 | ||||
-rw-r--r-- | nptl/sem_trywait.c | 50 | ||||
-rw-r--r-- | nptl/sem_wait.c | 101 | ||||
-rw-r--r-- | nptl/sem_waitcommon.c | 467 | ||||
-rw-r--r-- | nptl/structsem.sym | 12 | ||||
-rw-r--r-- | nptl/tst-sem11.c | 9 | ||||
-rw-r--r-- | nptl/tst-sem13.c | 18 |
13 files changed, 641 insertions, 300 deletions
diff --git a/nptl/DESIGN-sem.txt b/nptl/DESIGN-sem.txt deleted file mode 100644 index 17eb0c1..0000000 --- a/nptl/DESIGN-sem.txt +++ /dev/null @@ -1,46 +0,0 @@ -Semaphores pseudocode -============================== - - int sem_wait(sem_t * sem); - int sem_trywait(sem_t * sem); - int sem_post(sem_t * sem); - int sem_getvalue(sem_t * sem, int * sval); - -struct sem_t { - - unsigned int count; - - current semaphore count, also used as a futex -} - -sem_wait(sem_t *sem) -{ - for (;;) { - - if (atomic_decrement_if_positive(sem->count)) - break; - - futex_wait(&sem->count, 0) - } -} - -sem_post(sem_t *sem) -{ - n = atomic_increment(sem->count); - // Pass the new value of sem->count - futex_wake(&sem->count, n + 1); -} - -sem_trywait(sem_t *sem) -{ - if (atomic_decrement_if_positive(sem->count)) { - return 0; - } else { - return EAGAIN; - } -} - -sem_getvalue(sem_t *sem, int *sval) -{ - *sval = sem->count; - read_barrier(); -} diff --git a/nptl/Makefile b/nptl/Makefile index ce2d0e4..43d8510 100644 --- a/nptl/Makefile +++ b/nptl/Makefile @@ -100,7 +100,7 @@ libpthread-routines = nptl-init vars events version \ sem_init sem_destroy \ sem_open sem_close sem_unlink \ sem_getvalue \ - sem_wait sem_trywait sem_timedwait sem_post \ + sem_wait sem_timedwait sem_post \ cleanup cleanup_defer cleanup_compat \ cleanup_defer_compat unwind \ pt-longjmp pt-cleanup\ @@ -283,8 +283,7 @@ tests-nolibpthread = tst-unload gen-as-const-headers = pthread-errnos.sym \ lowlevelcond.sym lowlevelrwlock.sym \ lowlevelbarrier.sym unwindbuf.sym \ - lowlevelrobustlock.sym pthread-pi-defines.sym \ - structsem.sym + lowlevelrobustlock.sym pthread-pi-defines.sym LDFLAGS-pthread.so = -Wl,--enable-new-dtags,-z,nodelete,-z,initfirst diff --git a/nptl/sem_getvalue.c b/nptl/sem_getvalue.c index c3c91e1..1432cc7 100644 --- a/nptl/sem_getvalue.c +++ b/nptl/sem_getvalue.c @@ -19,23 +19,37 @@ #include <semaphore.h> #include <shlib-compat.h> #include "semaphoreP.h" +#include <atomic.h> int -__new_sem_getvalue (sem, sval) - sem_t *sem; - int *sval; +__new_sem_getvalue (sem_t *sem, int *sval) { struct new_sem *isem = (struct new_sem *) sem; /* XXX Check for valid SEM parameter. */ - - *sval = isem->value; + /* FIXME This uses relaxed MO, even though POSIX specifies that this function + should be linearizable. However, its debatable whether linearizability + is the right requirement. We need to follow up with POSIX and, if + necessary, use a stronger MO here and elsewhere (e.g., potentially + release MO in all places where we consume a token). */ + +#if __HAVE_64B_ATOMICS + *sval = atomic_load_relaxed (&isem->data) & SEM_VALUE_MASK; +#else + *sval = atomic_load_relaxed (&isem->value) >> SEM_VALUE_SHIFT; +#endif return 0; } versioned_symbol (libpthread, __new_sem_getvalue, sem_getvalue, GLIBC_2_1); #if SHLIB_COMPAT(libpthread, GLIBC_2_0, GLIBC_2_1) -strong_alias (__new_sem_getvalue, __old_sem_getvalue) +int +__old_sem_getvalue (sem_t *sem, int *sval) +{ + struct old_sem *isem = (struct old_sem *) sem; + *sval = isem->value; + return 0; +} compat_symbol (libpthread, __old_sem_getvalue, sem_getvalue, GLIBC_2_0); #endif diff --git a/nptl/sem_init.c b/nptl/sem_init.c index 5350f16..575b661 100644 --- a/nptl/sem_init.c +++ b/nptl/sem_init.c @@ -18,17 +18,29 @@ #include <errno.h> #include <semaphore.h> -#include <lowlevellock.h> #include <shlib-compat.h> #include "semaphoreP.h" #include <kernel-features.h> +/* Returns FUTEX_PRIVATE if pshared is zero and private futexes are supported; + returns FUTEX_SHARED otherwise. + TODO Remove when cleaning up the futex API throughout glibc. */ +static __always_inline int +futex_private_if_supported (int pshared) +{ + if (pshared != 0) + return LLL_SHARED; +#ifdef __ASSUME_PRIVATE_FUTEX + return LLL_PRIVATE; +#else + return THREAD_GETMEM (THREAD_SELF, header.private_futex) + ^ FUTEX_PRIVATE_FLAG; +#endif +} + int -__new_sem_init (sem, pshared, value) - sem_t *sem; - int pshared; - unsigned int value; +__new_sem_init (sem_t *sem, int pshared, unsigned int value) { /* Parameter sanity check. */ if (__glibc_unlikely (value > SEM_VALUE_MAX)) @@ -40,16 +52,15 @@ __new_sem_init (sem, pshared, value) /* Map to the internal type. */ struct new_sem *isem = (struct new_sem *) sem; - /* Use the values the user provided. */ - isem->value = value; -#ifdef __ASSUME_PRIVATE_FUTEX - isem->private = pshared ? 0 : FUTEX_PRIVATE_FLAG; + /* Use the values the caller provided. */ +#if __HAVE_64B_ATOMICS + isem->data = value; #else - isem->private = pshared ? 0 : THREAD_GETMEM (THREAD_SELF, - header.private_futex); + isem->value = value << SEM_VALUE_SHIFT; + isem->nwaiters = 0; #endif - isem->nwaiters = 0; + isem->private = futex_private_if_supported (pshared); return 0; } diff --git a/nptl/sem_open.c b/nptl/sem_open.c index b5a5de4..bfd2dea 100644 --- a/nptl/sem_open.c +++ b/nptl/sem_open.c @@ -193,9 +193,14 @@ sem_open (const char *name, int oflag, ...) struct new_sem newsem; } sem; - sem.newsem.value = value; - sem.newsem.private = 0; +#if __HAVE_64B_ATOMICS + sem.newsem.data = value; +#else + sem.newsem.value = value << SEM_VALUE_SHIFT; sem.newsem.nwaiters = 0; +#endif + /* This always is a shared semaphore. */ + sem.newsem.private = LLL_SHARED; /* Initialize the remaining bytes as well. */ memset ((char *) &sem.initsem + sizeof (struct new_sem), '\0', diff --git a/nptl/sem_post.c b/nptl/sem_post.c index d1c39ff..9162e4c 100644 --- a/nptl/sem_post.c +++ b/nptl/sem_post.c @@ -26,34 +26,78 @@ #include <shlib-compat.h> +/* Wrapper for lll_futex_wake, with error checking. + TODO Remove when cleaning up the futex API throughout glibc. */ +static __always_inline void +futex_wake (unsigned int* futex, int processes_to_wake, int private) +{ + int res = lll_futex_wake (futex, processes_to_wake, private); + /* No error. Ignore the number of woken processes. */ + if (res >= 0) + return; + switch (res) + { + case -EFAULT: /* Could have happened due to memory reuse. */ + case -EINVAL: /* Could be either due to incorrect alignment (a bug in + glibc or in the application) or due to memory being + reused for a PI futex. We cannot distinguish between the + two causes, and one of them is correct use, so we do not + act in this case. */ + return; + case -ENOSYS: /* Must have been caused by a glibc bug. */ + /* No other errors are documented at this time. */ + default: + abort (); + } +} + + +/* See sem_wait for an explanation of the algorithm. */ int __new_sem_post (sem_t *sem) { struct new_sem *isem = (struct new_sem *) sem; + int private = isem->private; - __typeof (isem->value) cur; +#if __HAVE_64B_ATOMICS + /* Add a token to the semaphore. We use release MO to make sure that a + thread acquiring this token synchronizes with us and other threads that + added tokens before (the release sequence includes atomic RMW operations + by other threads). */ + /* TODO Use atomic_fetch_add to make it scale better than a CAS loop? */ + unsigned long int d = atomic_load_relaxed (&isem->data); do { - cur = isem->value; - if (isem->value == SEM_VALUE_MAX) + if ((d & SEM_VALUE_MASK) == SEM_VALUE_MAX) { __set_errno (EOVERFLOW); return -1; } } - while (atomic_compare_and_exchange_bool_rel (&isem->value, cur + 1, cur)); + while (!atomic_compare_exchange_weak_release (&isem->data, &d, d + 1)); - atomic_full_barrier (); - if (isem->nwaiters > 0) + /* If there is any potentially blocked waiter, wake one of them. */ + if ((d >> SEM_NWAITERS_SHIFT) > 0) + futex_wake (((unsigned int *) &isem->data) + SEM_VALUE_OFFSET, 1, private); +#else + /* Add a token to the semaphore. Similar to 64b version. */ + unsigned int v = atomic_load_relaxed (&isem->value); + do { - int err = lll_futex_wake (&isem->value, 1, - isem->private ^ FUTEX_PRIVATE_FLAG); - if (__builtin_expect (err, 0) < 0) + if ((v << SEM_VALUE_SHIFT) == SEM_VALUE_MAX) { - __set_errno (-err); + __set_errno (EOVERFLOW); return -1; } } + while (!atomic_compare_exchange_weak_release (&isem->value, + &v, v + (1 << SEM_VALUE_SHIFT))); + + /* If there is any potentially blocked waiter, wake one of them. */ + if ((v & SEM_NWAITERS_MASK) != 0) + futex_wake (&isem->value, 1, private); +#endif + return 0; } versioned_symbol (libpthread, __new_sem_post, sem_post, GLIBC_2_1); @@ -66,6 +110,9 @@ __old_sem_post (sem_t *sem) { int *futex = (int *) sem; + /* We must need to synchronize with consumers of this token, so the atomic + increment must have release MO semantics. */ + atomic_write_barrier (); (void) atomic_increment_val (futex); /* We always have to assume it is a shared semaphore. */ int err = lll_futex_wake (futex, 1, LLL_SHARED); diff --git a/nptl/sem_timedwait.c b/nptl/sem_timedwait.c index d04bcdf..042b0ac 100644 --- a/nptl/sem_timedwait.c +++ b/nptl/sem_timedwait.c @@ -1,4 +1,4 @@ -/* sem_timedwait -- wait on a semaphore. Generic futex-using version. +/* sem_timedwait -- wait on a semaphore with timeout. Copyright (C) 2003-2015 Free Software Foundation, Inc. This file is part of the GNU C Library. Contributed by Paul Mackerras <paulus@au.ibm.com>, 2003. @@ -17,101 +17,21 @@ License along with the GNU C Library; if not, see <http://www.gnu.org/licenses/>. */ -#include <errno.h> -#include <sysdep.h> -#include <lowlevellock.h> -#include <internaltypes.h> -#include <semaphore.h> -#include <sys/time.h> - -#include <pthreadP.h> -#include <shlib-compat.h> - - -extern void __sem_wait_cleanup (void *arg) attribute_hidden; - -/* This is in a seperate function in order to make sure gcc - puts the call site into an exception region, and thus the - cleanups get properly run. */ -static int -__attribute__ ((noinline)) -do_futex_timed_wait (struct new_sem *isem, struct timespec *rt) -{ - int err, oldtype = __pthread_enable_asynccancel (); - - err = lll_futex_timed_wait (&isem->value, 0, rt, - isem->private ^ FUTEX_PRIVATE_FLAG); - - __pthread_disable_asynccancel (oldtype); - return err; -} +#include "sem_waitcommon.c" +/* This is in a separate file because because sem_timedwait is only provided + if __USE_XOPEN2K is defined. */ int sem_timedwait (sem_t *sem, const struct timespec *abstime) { - struct new_sem *isem = (struct new_sem *) sem; - int err; - - if (atomic_decrement_if_positive (&isem->value) > 0) - return 0; - if (abstime->tv_nsec < 0 || abstime->tv_nsec >= 1000000000) { __set_errno (EINVAL); return -1; } - atomic_increment (&isem->nwaiters); - - pthread_cleanup_push (__sem_wait_cleanup, isem); - - while (1) - { - struct timeval tv; - struct timespec rt; - int sec, nsec; - - /* Get the current time. */ - __gettimeofday (&tv, NULL); - - /* Compute relative timeout. */ - sec = abstime->tv_sec - tv.tv_sec; - nsec = abstime->tv_nsec - tv.tv_usec * 1000; - if (nsec < 0) - { - nsec += 1000000000; - --sec; - } - - /* Already timed out? */ - if (sec < 0) - { - __set_errno (ETIMEDOUT); - err = -1; - break; - } - - /* Do wait. */ - rt.tv_sec = sec; - rt.tv_nsec = nsec; - err = do_futex_timed_wait(isem, &rt); - if (err != 0 && err != -EWOULDBLOCK) - { - __set_errno (-err); - err = -1; - break; - } - - if (atomic_decrement_if_positive (&isem->value) > 0) - { - err = 0; - break; - } - } - - pthread_cleanup_pop (0); - - atomic_decrement (&isem->nwaiters); - - return err; + if (__new_sem_wait_fast ((struct new_sem *) sem, 0) == 0) + return 0; + else + return __new_sem_wait_slow((struct new_sem *) sem, abstime); } diff --git a/nptl/sem_trywait.c b/nptl/sem_trywait.c deleted file mode 100644 index d434098..0000000 --- a/nptl/sem_trywait.c +++ /dev/null @@ -1,50 +0,0 @@ -/* sem_trywait -- wait on a semaphore. Generic futex-using version. - Copyright (C) 2003-2015 Free Software Foundation, Inc. - This file is part of the GNU C Library. - Contributed by Paul Mackerras <paulus@au.ibm.com>, 2003. - - The GNU C Library is free software; you can redistribute it and/or - modify it under the terms of the GNU Lesser General Public - License as published by the Free Software Foundation; either - version 2.1 of the License, or (at your option) any later version. - - The GNU C Library is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - Lesser General Public License for more details. - - You should have received a copy of the GNU Lesser General Public - License along with the GNU C Library; if not, see - <http://www.gnu.org/licenses/>. */ - -#include <errno.h> -#include <sysdep.h> -#include <lowlevellock.h> -#include <internaltypes.h> -#include <semaphore.h> -#include <atomic.h> - -#include <shlib-compat.h> - - -int -__new_sem_trywait (sem_t *sem) -{ - int *futex = (int *) sem; - int val; - - if (*futex > 0) - { - val = atomic_decrement_if_positive (futex); - if (val > 0) - return 0; - } - - __set_errno (EAGAIN); - return -1; -} -versioned_symbol (libpthread, __new_sem_trywait, sem_trywait, GLIBC_2_1); -#if SHLIB_COMPAT (libpthread, GLIBC_2_0, GLIBC_2_1) -strong_alias (__new_sem_trywait, __old_sem_trywait) -compat_symbol (libpthread, __old_sem_trywait, sem_trywait, GLIBC_2_0); -#endif diff --git a/nptl/sem_wait.c b/nptl/sem_wait.c index df11933..c1fd10c 100644 --- a/nptl/sem_wait.c +++ b/nptl/sem_wait.c @@ -17,80 +17,18 @@ License along with the GNU C Library; if not, see <http://www.gnu.org/licenses/>. */ -#include <errno.h> -#include <sysdep.h> -#include <lowlevellock.h> -#include <internaltypes.h> -#include <semaphore.h> - -#include <pthreadP.h> -#include <shlib-compat.h> -#include <atomic.h> - - -void -attribute_hidden -__sem_wait_cleanup (void *arg) -{ - struct new_sem *isem = (struct new_sem *) arg; - - atomic_decrement (&isem->nwaiters); -} - -/* This is in a seperate function in order to make sure gcc - puts the call site into an exception region, and thus the - cleanups get properly run. */ -static int -__attribute__ ((noinline)) -do_futex_wait (struct new_sem *isem) -{ - int err, oldtype = __pthread_enable_asynccancel (); - - err = lll_futex_wait (&isem->value, 0, isem->private ^ FUTEX_PRIVATE_FLAG); - - __pthread_disable_asynccancel (oldtype); - return err; -} +#include "sem_waitcommon.c" int __new_sem_wait (sem_t *sem) { - struct new_sem *isem = (struct new_sem *) sem; - int err; - - if (atomic_decrement_if_positive (&isem->value) > 0) + if (__new_sem_wait_fast ((struct new_sem *) sem, 0) == 0) return 0; - - atomic_increment (&isem->nwaiters); - - pthread_cleanup_push (__sem_wait_cleanup, isem); - - while (1) - { - err = do_futex_wait(isem); - if (err != 0 && err != -EWOULDBLOCK) - { - __set_errno (-err); - err = -1; - break; - } - - if (atomic_decrement_if_positive (&isem->value) > 0) - { - err = 0; - break; - } - } - - pthread_cleanup_pop (0); - - atomic_decrement (&isem->nwaiters); - - return err; + else + return __new_sem_wait_slow((struct new_sem *) sem, NULL); } versioned_symbol (libpthread, __new_sem_wait, sem_wait, GLIBC_2_1); - #if SHLIB_COMPAT (libpthread, GLIBC_2_0, GLIBC_2_1) int attribute_compat_text_section @@ -121,3 +59,34 @@ __old_sem_wait (sem_t *sem) compat_symbol (libpthread, __old_sem_wait, sem_wait, GLIBC_2_0); #endif + +int +__new_sem_trywait (sem_t *sem) +{ + /* We must not fail spuriously, so require a definitive result even if this + may lead to a long execution time. */ + if (__new_sem_wait_fast ((struct new_sem *) sem, 1) == 0) + return 0; + __set_errno (EAGAIN); + return -1; +} +versioned_symbol (libpthread, __new_sem_trywait, sem_trywait, GLIBC_2_1); +#if SHLIB_COMPAT (libpthread, GLIBC_2_0, GLIBC_2_1) +int +__old_sem_trywait (sem_t *sem) +{ + int *futex = (int *) sem; + int val; + + if (*futex > 0) + { + val = atomic_decrement_if_positive (futex); + if (val > 0) + return 0; + } + + __set_errno (EAGAIN); + return -1; +} +compat_symbol (libpthread, __old_sem_trywait, sem_trywait, GLIBC_2_0); +#endif diff --git a/nptl/sem_waitcommon.c b/nptl/sem_waitcommon.c new file mode 100644 index 0000000..96848d7 --- /dev/null +++ b/nptl/sem_waitcommon.c @@ -0,0 +1,467 @@ +/* sem_waitcommon -- wait on a semaphore, shared code. + Copyright (C) 2003-2015 Free Software Foundation, Inc. + This file is part of the GNU C Library. + Contributed by Paul Mackerras <paulus@au.ibm.com>, 2003. + + The GNU C Library is free software; you can redistribute it and/or + modify it under the terms of the GNU Lesser General Public + License as published by the Free Software Foundation; either + version 2.1 of the License, or (at your option) any later version. + + The GNU C Library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public + License along with the GNU C Library; if not, see + <http://www.gnu.org/licenses/>. */ + +#include <errno.h> +#include <sysdep.h> +#include <lowlevellock.h> +#include <internaltypes.h> +#include <semaphore.h> +#include <sys/time.h> + +#include <pthreadP.h> +#include <shlib-compat.h> +#include <atomic.h> + +/* Wrapper for lll_futex_wait with absolute timeout and error checking. + TODO Remove when cleaning up the futex API throughout glibc. */ +static __always_inline int +futex_abstimed_wait (unsigned int* futex, unsigned int expected, + const struct timespec* abstime, int private, bool cancel) +{ + int err, oldtype; + if (abstime == NULL) + { + if (cancel) + oldtype = __pthread_enable_asynccancel (); + err = lll_futex_wait (futex, expected, private); + if (cancel) + __pthread_disable_asynccancel (oldtype); + } + else + { + struct timeval tv; + struct timespec rt; + int sec, nsec; + + /* Get the current time. */ + __gettimeofday (&tv, NULL); + + /* Compute relative timeout. */ + sec = abstime->tv_sec - tv.tv_sec; + nsec = abstime->tv_nsec - tv.tv_usec * 1000; + if (nsec < 0) + { + nsec += 1000000000; + --sec; + } + + /* Already timed out? */ + if (sec < 0) + return ETIMEDOUT; + + /* Do wait. */ + rt.tv_sec = sec; + rt.tv_nsec = nsec; + if (cancel) + oldtype = __pthread_enable_asynccancel (); + err = lll_futex_timed_wait (futex, expected, &rt, private); + if (cancel) + __pthread_disable_asynccancel (oldtype); + } + switch (err) + { + case 0: + case -EAGAIN: + case -EINTR: + case -ETIMEDOUT: + return -err; + + case -EFAULT: /* Must have been caused by a glibc or application bug. */ + case -EINVAL: /* Either due to wrong alignment or due to the timeout not + being normalized. Must have been caused by a glibc or + application bug. */ + case -ENOSYS: /* Must have been caused by a glibc bug. */ + /* No other errors are documented at this time. */ + default: + abort (); + } +} + +/* Wrapper for lll_futex_wake, with error checking. + TODO Remove when cleaning up the futex API throughout glibc. */ +static __always_inline void +futex_wake (unsigned int* futex, int processes_to_wake, int private) +{ + int res = lll_futex_wake (futex, processes_to_wake, private); + /* No error. Ignore the number of woken processes. */ + if (res >= 0) + return; + switch (res) + { + case -EFAULT: /* Could have happened due to memory reuse. */ + case -EINVAL: /* Could be either due to incorrect alignment (a bug in + glibc or in the application) or due to memory being + reused for a PI futex. We cannot distinguish between the + two causes, and one of them is correct use, so we do not + act in this case. */ + return; + case -ENOSYS: /* Must have been caused by a glibc bug. */ + /* No other errors are documented at this time. */ + default: + abort (); + } +} + + +/* The semaphore provides two main operations: sem_post adds a token to the + semaphore; sem_wait grabs a token from the semaphore, potentially waiting + until there is a token available. A sem_wait needs to synchronize with + the sem_post that provided the token, so that whatever lead to the sem_post + happens before the code after sem_wait. + + Conceptually, available tokens can simply be counted; let's call that the + value of the semaphore. However, we also want to know whether there might + be a sem_wait that is blocked on the value because it was zero (using a + futex with the value being the futex variable); if there is no blocked + sem_wait, sem_post does not need to execute a futex_wake call. Therefore, + we also need to count the number of potentially blocked sem_wait calls + (which we call nwaiters). + + What makes this tricky is that POSIX requires that a semaphore can be + destroyed as soon as the last remaining sem_wait has returned, and no + other sem_wait or sem_post calls are executing concurrently. However, the + sem_post call whose token was consumed by the last sem_wait is considered + to have finished once it provided the token to the sem_wait. + Thus, sem_post must not access the semaphore struct anymore after it has + made a token available; IOW, it needs to be able to atomically provide + a token and check whether any blocked sem_wait calls might exist. + + This is straightforward to do if the architecture provides 64b atomics + because we can just put both the value and nwaiters into one variable that + we access atomically: This is the data field, the value is in the + least-significant 32 bits, and nwaiters in the other bits. When sem_post + makes a value available, it can atomically check nwaiters. + + If we have only 32b atomics available, we cannot put both nwaiters and + value into one 32b value because then we might have too few bits for both + of those counters. Therefore, we need to use two distinct fields. + + To allow sem_post to atomically make a token available and check for + blocked sem_wait calls, we use one bit in value to indicate whether + nwaiters is nonzero. That allows sem_post to use basically the same + algorithm as with 64b atomics, but requires sem_wait to update the bit; it + can't do this atomically with another access to nwaiters, but it can compute + a conservative value for the bit because it's benign if the bit is set + even if nwaiters is zero (all we get is an unnecessary futex wake call by + sem_post). + Specifically, sem_wait will unset the bit speculatively if it believes that + there is no other concurrently executing sem_wait. If it misspeculated, + it will have to clean up by waking any other sem_wait call (i.e., what + sem_post would do otherwise). This does not conflict with the destruction + requirement because the semaphore must not be destructed while any sem_wait + is still executing. */ + +/* Set this to true if you assume that, in contrast to current Linux futex + documentation, lll_futex_wake can return -EINTR only if interrupted by a + signal, not spuriously due to some other reason. + TODO Discuss EINTR conditions with the Linux kernel community. For + now, we set this to true to not change behavior of semaphores compared + to previous glibc builds. */ +static const int sem_assume_only_signals_cause_futex_EINTR = 1; + +#if !__HAVE_64B_ATOMICS +static void +__sem_wait_32_finish (struct new_sem *sem); +#endif + +static void +__sem_wait_cleanup (void *arg) +{ + struct new_sem *sem = (struct new_sem *) arg; + +#if __HAVE_64B_ATOMICS + /* Stop being registered as a waiter. See below for MO. */ + atomic_fetch_add_relaxed (&sem->data, -(1UL << SEM_NWAITERS_SHIFT)); +#else + __sem_wait_32_finish (sem); +#endif +} + +/* Wait until at least one token is available, possibly with a timeout. + This is in a separate function in order to make sure gcc + puts the call site into an exception region, and thus the + cleanups get properly run. TODO still necessary? Other futex_wait + users don't seem to need it. */ +static int +__attribute__ ((noinline)) +do_futex_wait (struct new_sem *sem, const struct timespec *abstime) +{ + int err; + +#if __HAVE_64B_ATOMICS + err = futex_abstimed_wait ((unsigned int *) &sem->data + SEM_VALUE_OFFSET, 0, + abstime, sem->private, true); +#else + err = futex_abstimed_wait (&sem->value, SEM_NWAITERS_MASK, abstime, + sem->private, true); +#endif + + return err; +} + +/* Fast path: Try to grab a token without blocking. */ +static int +__new_sem_wait_fast (struct new_sem *sem, int definitive_result) +{ + /* We need acquire MO if we actually grab a token, so that this + synchronizes with all token providers (i.e., the RMW operation we read + from or all those before it in modification order; also see sem_post). + We do not need to guarantee any ordering if we observed that there is + no token (POSIX leaves it unspecified whether functions that fail + synchronize memory); thus, relaxed MO is sufficient for the initial load + and the failure path of the CAS. If the weak CAS fails and we need a + definitive result, retry. */ +#if __HAVE_64B_ATOMICS + unsigned long d = atomic_load_relaxed (&sem->data); + do + { + if ((d & SEM_VALUE_MASK) == 0) + break; + if (atomic_compare_exchange_weak_acquire (&sem->data, &d, d - 1)) + return 0; + } + while (definitive_result); + return -1; +#else + unsigned int v = atomic_load_relaxed (&sem->value); + do + { + if ((v >> SEM_VALUE_SHIFT) == 0) + break; + if (atomic_compare_exchange_weak_acquire (&sem->value, + &v, v - (1 << SEM_VALUE_SHIFT))) + return 0; + } + while (definitive_result); + return -1; +#endif +} + +/* Slow path that blocks. */ +static int +__attribute__ ((noinline)) +__new_sem_wait_slow (struct new_sem *sem, const struct timespec *abstime) +{ + int err = 0; + +#if __HAVE_64B_ATOMICS + /* Add a waiter. Relaxed MO is sufficient because we can rely on the + ordering provided by the RMW operations we use. */ + unsigned long d = atomic_fetch_add_relaxed (&sem->data, + 1UL << SEM_NWAITERS_SHIFT); + + pthread_cleanup_push (__sem_wait_cleanup, sem); + + /* Wait for a token to be available. Retry until we can grab one. */ + for (;;) + { + /* If there is no token available, sleep until there is. */ + if ((d & SEM_VALUE_MASK) == 0) + { + err = do_futex_wait (sem, abstime); + /* A futex return value of 0 or EAGAIN is due to a real or spurious + wake-up, or due to a change in the number of tokens. We retry in + these cases. + If we timed out, forward this to the caller. + EINTR could be either due to being interrupted by a signal, or + due to a spurious wake-up. Thus, we cannot distinguish between + both, and are not allowed to return EINTR to the caller but have + to retry; this is because we may not have been interrupted by a + signal. However, if we assume that only signals cause a futex + return of EINTR, we forward EINTR to the caller. + + Retrying on EINTR is technically always allowed because to + reliably interrupt sem_wait with a signal, the signal handler + must call sem_post (which is AS-Safe). In executions where the + signal handler does not do that, the implementation can correctly + claim that sem_wait hadn't actually started to execute yet, and + thus the signal never actually interrupted sem_wait. We make no + timing guarantees, so the program can never observe that sem_wait + actually did start to execute. Thus, in a correct program, we + can expect a signal that wanted to interrupt the sem_wait to have + provided a token, and can just try to grab this token if + futex_wait returns EINTR. */ + if (err == ETIMEDOUT || + (err == EINTR && sem_assume_only_signals_cause_futex_EINTR)) + { + __set_errno (err); + err = -1; + /* Stop being registered as a waiter. */ + atomic_fetch_add_relaxed (&sem->data, + -(1UL << SEM_NWAITERS_SHIFT)); + break; + } + /* Relaxed MO is sufficient; see below. */ + d = atomic_load_relaxed (&sem->data); + } + else + { + /* Try to grab both a token and stop being a waiter. We need + acquire MO so this synchronizes with all token providers (i.e., + the RMW operation we read from or all those before it in + modification order; also see sem_post). On the failure path, + relaxed MO is sufficient because we only eventually need the + up-to-date value; the futex_wait or the CAS perform the real + work. */ + if (atomic_compare_exchange_weak_acquire (&sem->data, + &d, d - 1 - (1UL << SEM_NWAITERS_SHIFT))) + { + err = 0; + break; + } + } + } + + pthread_cleanup_pop (0); +#else + /* The main difference to the 64b-atomics implementation is that we need to + access value and nwaiters in separate steps, and that the nwaiters bit + in the value can temporarily not be set even if nwaiters is nonzero. + We work around incorrectly unsetting the nwaiters bit by letting sem_wait + set the bit again and waking the number of waiters that could grab a + token. There are two additional properties we need to ensure: + (1) We make sure that whenever unsetting the bit, we see the increment of + nwaiters by the other thread that set the bit. IOW, we will notice if + we make a mistake. + (2) When setting the nwaiters bit, we make sure that we see the unsetting + of the bit by another waiter that happened before us. This avoids having + to blindly set the bit whenever we need to block on it. We set/unset + the bit while having incremented nwaiters (i.e., are a registered + waiter), and the problematic case only happens when one waiter indeed + followed another (i.e., nwaiters was never larger than 1); thus, this + works similarly as with a critical section using nwaiters (see the MOs + and related comments below). + + An alternative approach would be to unset the bit after decrementing + nwaiters; however, that would result in needing Dekker-like + synchronization and thus full memory barriers. We also would not be able + to prevent misspeculation, so this alternative scheme does not seem + beneficial. */ + unsigned int v; + + /* Add a waiter. We need acquire MO so this synchronizes with the release + MO we use when decrementing nwaiters below; it ensures that if another + waiter unset the bit before us, we see that and set it again. Also see + property (2) above. */ + atomic_fetch_add_acquire (&sem->nwaiters, 1); + + pthread_cleanup_push (__sem_wait_cleanup, sem); + + /* Wait for a token to be available. Retry until we can grab one. */ + /* We do not need any ordering wrt. to this load's reads-from, so relaxed + MO is sufficient. The acquire MO above ensures that in the problematic + case, we do see the unsetting of the bit by another waiter. */ + v = atomic_load_relaxed (&sem->value); + do + { + do + { + /* We are about to block, so make sure that the nwaiters bit is + set. We need release MO on the CAS to ensure that when another + waiter unsets the nwaiters bit, it will also observe that we + incremented nwaiters in the meantime (also see the unsetting of + the bit below). Relaxed MO on CAS failure is sufficient (see + above). */ + do + { + if ((v & SEM_NWAITERS_MASK) != 0) + break; + } + while (!atomic_compare_exchange_weak_release (&sem->value, + &v, v | SEM_NWAITERS_MASK)); + /* If there is no token, wait. */ + if ((v >> SEM_VALUE_SHIFT) == 0) + { + /* See __HAVE_64B_ATOMICS variant. */ + err = do_futex_wait(sem, abstime); + if (err == ETIMEDOUT || + (err == EINTR && sem_assume_only_signals_cause_futex_EINTR)) + { + __set_errno (err); + err = -1; + goto error; + } + err = 0; + /* We blocked, so there might be a token now. Relaxed MO is + sufficient (see above). */ + v = atomic_load_relaxed (&sem->value); + } + } + /* If there is no token, we must not try to grab one. */ + while ((v >> SEM_VALUE_SHIFT) == 0); + } + /* Try to grab a token. We need acquire MO so this synchronizes with + all token providers (i.e., the RMW operation we read from or all those + before it in modification order; also see sem_post). */ + while (!atomic_compare_exchange_weak_acquire (&sem->value, + &v, v - (1 << SEM_VALUE_SHIFT))); + +error: + pthread_cleanup_pop (0); + + __sem_wait_32_finish (sem); +#endif + + return err; +} + +/* Stop being a registered waiter (non-64b-atomics code only). */ +#if !__HAVE_64B_ATOMICS +static void +__sem_wait_32_finish (struct new_sem *sem) +{ + /* The nwaiters bit is still set, try to unset it now if this seems + necessary. We do this before decrementing nwaiters so that the unsetting + is visible to other waiters entering after us. Relaxed MO is sufficient + because we are just speculating here; a stronger MO would not prevent + misspeculation. */ + unsigned int wguess = atomic_load_relaxed (&sem->nwaiters); + if (wguess == 1) + /* We might be the last waiter, so unset. This needs acquire MO so that + it syncronizes with the release MO when setting the bit above; if we + overwrite someone else that set the bit, we'll read in the following + decrement of nwaiters at least from that release sequence, so we'll + see if the other waiter is still active or if another writer entered + in the meantime (i.e., using the check below). */ + atomic_fetch_and_acquire (&sem->value, ~SEM_NWAITERS_MASK); + + /* Now stop being a waiter, and see whether our guess was correct. + This needs release MO so that it synchronizes with the acquire MO when + a waiter increments nwaiters; this makes sure that newer writers see that + we reset the waiters_present bit. */ + unsigned int wfinal = atomic_fetch_add_release (&sem->nwaiters, -1); + if (wfinal > 1 && wguess == 1) + { + /* We guessed wrong, and so need to clean up after the mistake and + unblock any waiters that could have not been woken. There is no + additional ordering that we need to set up, so relaxed MO is + sufficient. */ + unsigned int v = atomic_fetch_or_relaxed (&sem->value, + SEM_NWAITERS_MASK); + /* If there are available tokens, then wake as many waiters. If there + aren't any, then there is no need to wake anyone because there is + none to grab for another waiter. If tokens become available + subsequently, then the respective sem_post calls will do the wake-up + due to us having set the nwaiters bit again. */ + v >>= SEM_VALUE_SHIFT; + if (v > 0) + futex_wake (&sem->value, v, sem->private); + } +} +#endif diff --git a/nptl/structsem.sym b/nptl/structsem.sym deleted file mode 100644 index 0e2a15f..0000000 --- a/nptl/structsem.sym +++ /dev/null @@ -1,12 +0,0 @@ -#include <limits.h> -#include <stddef.h> -#include <sched.h> -#include <bits/pthreadtypes.h> -#include "internaltypes.h" - --- - -VALUE offsetof (struct new_sem, value) -PRIVATE offsetof (struct new_sem, private) -NWAITERS offsetof (struct new_sem, nwaiters) -SEM_VALUE_MAX SEM_VALUE_MAX diff --git a/nptl/tst-sem11.c b/nptl/tst-sem11.c index 5248eba..1a2dbaf 100644 --- a/nptl/tst-sem11.c +++ b/nptl/tst-sem11.c @@ -34,8 +34,11 @@ main (void) puts ("sem_init failed"); return 1; } - +#if __HAVE_64B_ATOMICS + if ((u.ns.data >> SEM_NWAITERS_SHIFT) != 0) +#else if (u.ns.nwaiters != 0) +#endif { puts ("nwaiters not initialized"); return 1; @@ -68,7 +71,11 @@ main (void) goto again; } +#if __HAVE_64B_ATOMICS + if ((u.ns.data >> SEM_NWAITERS_SHIFT) != 0) +#else if (u.ns.nwaiters != 0) +#endif { puts ("nwaiters not reset"); return 1; diff --git a/nptl/tst-sem13.c b/nptl/tst-sem13.c index 068d79e..1560e91 100644 --- a/nptl/tst-sem13.c +++ b/nptl/tst-sem13.c @@ -33,9 +33,14 @@ do_test (void) perror ("sem_timedwait did not fail with EINVAL"); return 1; } - if (u.ns.nwaiters != 0) +#if __HAVE_64B_ATOMICS + unsigned int nwaiters = (u.ns.data >> SEM_NWAITERS_SHIFT); +#else + unsigned int nwaiters = u.ns.nwaiters; +#endif + if (nwaiters != 0) { - printf ("sem_timedwait modified nwaiters: %ld\n", u.ns.nwaiters); + printf ("sem_timedwait modified nwaiters: %d\n", nwaiters); return 1; } @@ -52,9 +57,14 @@ do_test (void) perror ("2nd sem_timedwait did not fail with ETIMEDOUT"); return 1; } - if (u.ns.nwaiters != 0) +#if __HAVE_64B_ATOMICS + nwaiters = (u.ns.data >> SEM_NWAITERS_SHIFT); +#else + nwaiters = u.ns.nwaiters; +#endif + if (nwaiters != 0) { - printf ("2nd sem_timedwait modified nwaiters: %ld\n", u.ns.nwaiters); + printf ("2nd sem_timedwait modified nwaiters: %d\n", nwaiters); return 1; } |