path: root/libgo/go/sync
author     Ian Lance Taylor <ian@gcc.gnu.org>   2011-09-16 15:47:21 +0000
committer  Ian Lance Taylor <ian@gcc.gnu.org>   2011-09-16 15:47:21 +0000
commit     adb0401dac41c81571722312d4586b2693f95aa6 (patch)
tree       ea2b52e3c258d6b6d9356977c683c7f72a4a5fd5 /libgo/go/sync
parent     5548ca3540bccbc908a45942896d635ea5f1c23f (diff)
Update Go library to r60.
From-SVN: r178910
Diffstat (limited to 'libgo/go/sync')
-rw-r--r--   libgo/go/sync/atomic/atomic.c        |  28
-rw-r--r--   libgo/go/sync/atomic/atomic_test.go  | 102
-rw-r--r--   libgo/go/sync/atomic/doc.go          |   6
-rw-r--r--   libgo/go/sync/cond.go                |  69
-rw-r--r--   libgo/go/sync/cond_test.go           |  27
-rw-r--r--   libgo/go/sync/mutex.go               |  63
-rw-r--r--   libgo/go/sync/mutex_test.go          | 102
-rw-r--r--   libgo/go/sync/once.go                |  14
-rw-r--r--   libgo/go/sync/once_test.go           |  25
-rw-r--r--   libgo/go/sync/rwmutex.go             |  67
-rw-r--r--   libgo/go/sync/rwmutex_test.go        |  83
-rw-r--r--   libgo/go/sync/waitgroup.go           |  41
-rw-r--r--   libgo/go/sync/waitgroup_test.go      | 105
13 files changed, 628 insertions(+), 104 deletions(-)
diff --git a/libgo/go/sync/atomic/atomic.c b/libgo/go/sync/atomic/atomic.c
index e2d9b24..6660a7d 100644
--- a/libgo/go/sync/atomic/atomic.c
+++ b/libgo/go/sync/atomic/atomic.c
@@ -95,3 +95,31 @@ AddUintptr (uintptr_t *val, uintptr_t delta)
{
return __sync_add_and_fetch (val, delta);
}
+
+int32_t LoadInt32 (int32_t *addr)
+ asm ("libgo_sync.atomic.LoadInt32");
+
+int32_t
+LoadInt32 (int32_t *addr)
+{
+ int32_t v;
+
+ v = *addr;
+ while (! __sync_bool_compare_and_swap (addr, v, v))
+ v = *addr;
+ return v;
+}
+
+uint32_t LoadUint32 (uint32_t *addr)
+ asm ("libgo_sync.atomic.LoadUint32");
+
+uint32_t
+LoadUint32 (uint32_t *addr)
+{
+ uint32_t v;
+
+ v = *addr;
+ while (! __sync_bool_compare_and_swap (addr, v, v))
+ v = *addr;
+ return v;
+}
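
The C fallback above implements an atomic load as a compare-and-swap of the current value with itself: the loop only exits once __sync_bool_compare_and_swap confirms a stable value, and the builtin supplies the memory barrier a plain read would lack. A rough Go rendering of the same idea, illustrative only and assuming sync/atomic's CompareAndSwapInt32 (the real entry points are the C functions bound to the libgo_sync.atomic.* symbols above):

package main

import (
    "fmt"
    "sync/atomic"
)

// loadInt32ViaCAS mimics the C loop: re-read *addr until a
// compare-and-swap of the value with itself succeeds, which both
// validates the value and acts as a memory barrier.
func loadInt32ViaCAS(addr *int32) int32 {
    v := *addr
    for !atomic.CompareAndSwapInt32(addr, v, v) {
        v = *addr
    }
    return v
}

func main() {
    var x int32 = 42
    fmt.Println(loadInt32ViaCAS(&x)) // 42
}
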
diff --git a/libgo/go/sync/atomic/atomic_test.go b/libgo/go/sync/atomic/atomic_test.go
index 119ad00..2229e58 100644
--- a/libgo/go/sync/atomic/atomic_test.go
+++ b/libgo/go/sync/atomic/atomic_test.go
@@ -308,6 +308,46 @@ func TestCompareAndSwapUintptr(t *testing.T) {
}
}
+func TestLoadInt32(t *testing.T) {
+ var x struct {
+ before int32
+ i int32
+ after int32
+ }
+ x.before = magic32
+ x.after = magic32
+ for delta := int32(1); delta+delta > delta; delta += delta {
+ k := LoadInt32(&x.i)
+ if k != x.i {
+ t.Fatalf("delta=%d i=%d k=%d", delta, x.i, k)
+ }
+ x.i += delta
+ }
+ if x.before != magic32 || x.after != magic32 {
+ t.Fatalf("wrong magic: %#x _ %#x != %#x _ %#x", x.before, x.after, magic32, magic32)
+ }
+}
+
+func TestLoadUint32(t *testing.T) {
+ var x struct {
+ before uint32
+ i uint32
+ after uint32
+ }
+ x.before = magic32
+ x.after = magic32
+ for delta := uint32(1); delta+delta > delta; delta += delta {
+ k := LoadUint32(&x.i)
+ if k != x.i {
+ t.Fatalf("delta=%d i=%d k=%d", delta, x.i, k)
+ }
+ x.i += delta
+ }
+ if x.before != magic32 || x.after != magic32 {
+ t.Fatalf("wrong magic: %#x _ %#x != %#x _ %#x", x.before, x.after, magic32, magic32)
+ }
+}
+
// Tests of correct behavior, with contention.
// (Is the function atomic?)
//
@@ -537,3 +577,65 @@ func TestHammer64(t *testing.T) {
}
}
}
+
+func hammerLoadInt32(t *testing.T, uval *uint32) {
+ val := (*int32)(unsafe.Pointer(uval))
+ for {
+ v := LoadInt32(val)
+ vlo := v & ((1 << 16) - 1)
+ vhi := v >> 16
+ if vlo != vhi {
+ t.Fatalf("LoadInt32: %#x != %#x", vlo, vhi)
+ }
+ new := v + 1 + 1<<16
+ if vlo == 1e4 {
+ new = 0
+ }
+ if CompareAndSwapInt32(val, v, new) {
+ break
+ }
+ }
+}
+
+func hammerLoadUint32(t *testing.T, val *uint32) {
+ for {
+ v := LoadUint32(val)
+ vlo := v & ((1 << 16) - 1)
+ vhi := v >> 16
+ if vlo != vhi {
+ t.Fatalf("LoadUint32: %#x != %#x", vlo, vhi)
+ }
+ new := v + 1 + 1<<16
+ if vlo == 1e4 {
+ new = 0
+ }
+ if CompareAndSwapUint32(val, v, new) {
+ break
+ }
+ }
+}
+
+func TestHammerLoad(t *testing.T) {
+ tests := [...]func(*testing.T, *uint32){hammerLoadInt32, hammerLoadUint32}
+ n := 100000
+ if testing.Short() {
+ n = 10000
+ }
+ const procs = 8
+ defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(procs))
+ for _, tt := range tests {
+ c := make(chan int)
+ var val uint32
+ for p := 0; p < procs; p++ {
+ go func() {
+ for i := 0; i < n; i++ {
+ tt(t, &val)
+ }
+ c <- 1
+ }()
+ }
+ for p := 0; p < procs; p++ {
+ <-c
+ }
+ }
+}
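
The hammer tests above rely on one invariant: every value stored into val keeps its low 16 bits equal to its high 16 bits (both halves are incremented together and reset together at 1e4), so a load that is not atomic with respect to the CompareAndSwap updates can show up as mismatched halves. A standalone sketch of that invariant, with helper names that are illustrative rather than part of the test:

package main

import "fmt"

// bothHalves packs n (n < 1<<16) into the low and high halves, the
// shape every value stored by the hammer loops has.
func bothHalves(n uint32) uint32 { return n | n<<16 }

// consistent reports whether a loaded value could have come from a
// single atomic update: its two 16-bit halves must agree.
func consistent(v uint32) bool { return v&((1<<16)-1) == v>>16 }

func main() {
    fmt.Println(consistent(bothHalves(7)))     // true
    fmt.Println(consistent(bothHalves(7) + 1)) // false: only the low half moved
}
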
diff --git a/libgo/go/sync/atomic/doc.go b/libgo/go/sync/atomic/doc.go
index ec5a0d3..b35eb53 100644
--- a/libgo/go/sync/atomic/doc.go
+++ b/libgo/go/sync/atomic/doc.go
@@ -56,6 +56,12 @@ func AddUint64(val *uint64, delta uint64) (new uint64)
// AddUintptr atomically adds delta to *val and returns the new value.
func AddUintptr(val *uintptr, delta uintptr) (new uintptr)
+// LoadInt32 atomically loads *addr.
+func LoadInt32(addr *int32) (val int32)
+
+// LoadUint32 atomically loads *addr.
+func LoadUint32(addr *uint32) (val uint32)
+
// Helper for ARM. Linker will discard on other systems
func panic64() {
panic("sync/atomic: broken 64-bit atomic operations (buggy QEMU)")
diff --git a/libgo/go/sync/cond.go b/libgo/go/sync/cond.go
index ea48f2e..75494b5 100644
--- a/libgo/go/sync/cond.go
+++ b/libgo/go/sync/cond.go
@@ -14,10 +14,26 @@ import "runtime"
// which must be held when changing the condition and
// when calling the Wait method.
type Cond struct {
- L Locker // held while observing or changing the condition
- m Mutex // held to avoid internal races
- waiters int // number of goroutines blocked on Wait
- sema *uint32
+ L Locker // held while observing or changing the condition
+ m Mutex // held to avoid internal races
+
+ // We must be careful to make sure that when Signal
+ // releases a semaphore, the corresponding acquire is
+ // executed by a goroutine that was already waiting at
+ // the time of the call to Signal, not one that arrived later.
+ // To ensure this, we segment waiting goroutines into
+ // generations punctuated by calls to Signal. Each call to
+ // Signal begins another generation if there are no goroutines
+ // left in older generations for it to wake. Because of this
+ // optimization (only begin another generation if there
+ // are no older goroutines left), we only need to keep track
+ // of the two most recent generations, which we call old
+ // and new.
+ oldWaiters int // number of waiters in old generation...
+ oldSema *uint32 // ... waiting on this semaphore
+
+ newWaiters int // number of waiters in new generation...
+ newSema *uint32 // ... waiting on this semaphore
}
// NewCond returns a new Cond with Locker l.
@@ -42,11 +58,11 @@ func NewCond(l Locker) *Cond {
//
func (c *Cond) Wait() {
c.m.Lock()
- if c.sema == nil {
- c.sema = new(uint32)
+ if c.newSema == nil {
+ c.newSema = new(uint32)
}
- s := c.sema
- c.waiters++
+ s := c.newSema
+ c.newWaiters++
c.m.Unlock()
c.L.Unlock()
runtime.Semacquire(s)
@@ -59,9 +75,16 @@ func (c *Cond) Wait() {
// during the call.
func (c *Cond) Signal() {
c.m.Lock()
- if c.waiters > 0 {
- c.waiters--
- runtime.Semrelease(c.sema)
+ if c.oldWaiters == 0 && c.newWaiters > 0 {
+ // Retire old generation; rename new to old.
+ c.oldWaiters = c.newWaiters
+ c.oldSema = c.newSema
+ c.newWaiters = 0
+ c.newSema = nil
+ }
+ if c.oldWaiters > 0 {
+ c.oldWaiters--
+ runtime.Semrelease(c.oldSema)
}
c.m.Unlock()
}
@@ -72,19 +95,19 @@ func (c *Cond) Signal() {
// during the call.
func (c *Cond) Broadcast() {
c.m.Lock()
- if c.waiters > 0 {
- s := c.sema
- n := c.waiters
- for i := 0; i < n; i++ {
- runtime.Semrelease(s)
+ // Wake both generations.
+ if c.oldWaiters > 0 {
+ for i := 0; i < c.oldWaiters; i++ {
+ runtime.Semrelease(c.oldSema)
+ }
+ c.oldWaiters = 0
+ }
+ if c.newWaiters > 0 {
+ for i := 0; i < c.newWaiters; i++ {
+ runtime.Semrelease(c.newSema)
}
- // We just issued n wakeups via the semaphore s.
- // To ensure that they wake up the existing waiters
- // and not waiters that arrive after Broadcast returns,
- // clear c.sema. The next operation will allocate
- // a new one.
- c.sema = nil
- c.waiters = 0
+ c.newWaiters = 0
+ c.newSema = nil
}
c.m.Unlock()
}
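
The caller-visible contract is unchanged by the generation bookkeeping: the condition must still be re-checked in a loop around Wait, because Signal only promises to wake a goroutine that was already waiting, not that the condition still holds when that goroutine reacquires L. A minimal usage sketch (the queue is illustrative):

package main

import (
    "fmt"
    "sync"
)

func main() {
    var mu sync.Mutex
    c := sync.NewCond(&mu)
    var queue []int

    go func() {
        mu.Lock()
        queue = append(queue, 42)
        mu.Unlock()
        c.Signal() // wakes one goroutine from an existing generation, if any
    }()

    mu.Lock()
    for len(queue) == 0 { // always re-check the condition after waking
        c.Wait() // releases mu while blocked, reacquires it before returning
    }
    fmt.Println("got:", queue[0])
    mu.Unlock()
}
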
diff --git a/libgo/go/sync/cond_test.go b/libgo/go/sync/cond_test.go
index 846f98b..cefacb1 100644
--- a/libgo/go/sync/cond_test.go
+++ b/libgo/go/sync/cond_test.go
@@ -46,6 +46,33 @@ func TestCondSignal(t *testing.T) {
c.Signal()
}
+func TestCondSignalGenerations(t *testing.T) {
+ var m Mutex
+ c := NewCond(&m)
+ n := 100
+ running := make(chan bool, n)
+ awake := make(chan int, n)
+ for i := 0; i < n; i++ {
+ go func(i int) {
+ m.Lock()
+ running <- true
+ c.Wait()
+ awake <- i
+ m.Unlock()
+ }(i)
+ if i > 0 {
+ a := <-awake
+ if a != i-1 {
+ t.Fatalf("wrong goroutine woke up: want %d, got %d", i-1, a)
+ }
+ }
+ <-running
+ m.Lock()
+ c.Signal()
+ m.Unlock()
+ }
+}
+
func TestCondBroadcast(t *testing.T) {
var m Mutex
c := NewCond(&m)
diff --git a/libgo/go/sync/mutex.go b/libgo/go/sync/mutex.go
index 13f03ca..2d46c89 100644
--- a/libgo/go/sync/mutex.go
+++ b/libgo/go/sync/mutex.go
@@ -17,8 +17,8 @@ import (
// Mutexes can be created as part of other structures;
// the zero value for a Mutex is an unlocked mutex.
type Mutex struct {
- key int32
- sema uint32
+ state int32
+ sema uint32
}
// A Locker represents an object that can be locked and unlocked.
@@ -27,15 +27,41 @@ type Locker interface {
Unlock()
}
+const (
+ mutexLocked = 1 << iota // mutex is locked
+ mutexWoken
+ mutexWaiterShift = iota
+)
+
// Lock locks m.
// If the lock is already in use, the calling goroutine
// blocks until the mutex is available.
func (m *Mutex) Lock() {
- if atomic.AddInt32(&m.key, 1) == 1 {
- // changed from 0 to 1; we hold lock
+ // Fast path: grab unlocked mutex.
+ if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
return
}
- runtime.Semacquire(&m.sema)
+
+ awoke := false
+ for {
+ old := m.state
+ new := old | mutexLocked
+ if old&mutexLocked != 0 {
+ new = old + 1<<mutexWaiterShift
+ }
+ if awoke {
+ // The goroutine has been woken from sleep,
+ // so we need to reset the flag in either case.
+ new &^= mutexWoken
+ }
+ if atomic.CompareAndSwapInt32(&m.state, old, new) {
+ if old&mutexLocked == 0 {
+ break
+ }
+ runtime.Semacquire(&m.sema)
+ awoke = true
+ }
+ }
}
// Unlock unlocks m.
@@ -45,14 +71,25 @@ func (m *Mutex) Lock() {
// It is allowed for one goroutine to lock a Mutex and then
// arrange for another goroutine to unlock it.
func (m *Mutex) Unlock() {
- switch v := atomic.AddInt32(&m.key, -1); {
- case v == 0:
- // changed from 1 to 0; no contention
- return
- case v == -1:
- // changed from 0 to -1: wasn't locked
- // (or there are 4 billion goroutines waiting)
+ // Fast path: drop lock bit.
+ new := atomic.AddInt32(&m.state, -mutexLocked)
+ if (new+mutexLocked)&mutexLocked == 0 {
panic("sync: unlock of unlocked mutex")
}
- runtime.Semrelease(&m.sema)
+
+ old := new
+ for {
+ // If there are no waiters or a goroutine has already
+ // been woken or grabbed the lock, no need to wake anyone.
+ if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken) != 0 {
+ return
+ }
+ // Grab the right to wake someone.
+ new = (old - 1<<mutexWaiterShift) | mutexWoken
+ if atomic.CompareAndSwapInt32(&m.state, old, new) {
+ runtime.Semrelease(&m.sema)
+ return
+ }
+ old = m.state
+ }
}
diff --git a/libgo/go/sync/mutex_test.go b/libgo/go/sync/mutex_test.go
index f5c20ca..4775884 100644
--- a/libgo/go/sync/mutex_test.go
+++ b/libgo/go/sync/mutex_test.go
@@ -9,6 +9,7 @@ package sync_test
import (
"runtime"
. "sync"
+ "sync/atomic"
"testing"
)
@@ -43,7 +44,7 @@ func BenchmarkContendedSemaphore(b *testing.B) {
s := new(uint32)
*s = 1
c := make(chan bool)
- runtime.GOMAXPROCS(2)
+ defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(2))
b.StartTimer()
go HammerSemaphore(s, b.N/2, c)
@@ -52,7 +53,6 @@ func BenchmarkContendedSemaphore(b *testing.B) {
<-c
}
-
func HammerMutex(m *Mutex, loops int, cdone chan bool) {
for i := 0; i < loops; i++ {
m.Lock()
@@ -72,24 +72,6 @@ func TestMutex(t *testing.T) {
}
}
-func BenchmarkUncontendedMutex(b *testing.B) {
- m := new(Mutex)
- HammerMutex(m, b.N, make(chan bool, 2))
-}
-
-func BenchmarkContendedMutex(b *testing.B) {
- b.StopTimer()
- m := new(Mutex)
- c := make(chan bool)
- runtime.GOMAXPROCS(2)
- b.StartTimer()
-
- go HammerMutex(m, b.N/2, c)
- go HammerMutex(m, b.N/2, c)
- <-c
- <-c
-}
-
func TestMutexPanic(t *testing.T) {
defer func() {
if recover() == nil {
@@ -102,3 +84,83 @@ func TestMutexPanic(t *testing.T) {
mu.Unlock()
mu.Unlock()
}
+
+func BenchmarkMutexUncontended(b *testing.B) {
+ type PaddedMutex struct {
+ Mutex
+ pad [128]uint8
+ }
+ const CallsPerSched = 1000
+ procs := runtime.GOMAXPROCS(-1)
+ N := int32(b.N / CallsPerSched)
+ c := make(chan bool, procs)
+ for p := 0; p < procs; p++ {
+ go func() {
+ var mu PaddedMutex
+ for atomic.AddInt32(&N, -1) >= 0 {
+ runtime.Gosched()
+ for g := 0; g < CallsPerSched; g++ {
+ mu.Lock()
+ mu.Unlock()
+ }
+ }
+ c <- true
+ }()
+ }
+ for p := 0; p < procs; p++ {
+ <-c
+ }
+}
+
+func benchmarkMutex(b *testing.B, slack, work bool) {
+ const (
+ CallsPerSched = 1000
+ LocalWork = 100
+ GoroutineSlack = 10
+ )
+ procs := runtime.GOMAXPROCS(-1)
+ if slack {
+ procs *= GoroutineSlack
+ }
+ N := int32(b.N / CallsPerSched)
+ c := make(chan bool, procs)
+ var mu Mutex
+ for p := 0; p < procs; p++ {
+ go func() {
+ foo := 0
+ for atomic.AddInt32(&N, -1) >= 0 {
+ runtime.Gosched()
+ for g := 0; g < CallsPerSched; g++ {
+ mu.Lock()
+ mu.Unlock()
+ if work {
+ for i := 0; i < LocalWork; i++ {
+ foo *= 2
+ foo /= 2
+ }
+ }
+ }
+ }
+ c <- foo == 42
+ }()
+ }
+ for p := 0; p < procs; p++ {
+ <-c
+ }
+}
+
+func BenchmarkMutex(b *testing.B) {
+ benchmarkMutex(b, false, false)
+}
+
+func BenchmarkMutexSlack(b *testing.B) {
+ benchmarkMutex(b, true, false)
+}
+
+func BenchmarkMutexWork(b *testing.B) {
+ benchmarkMutex(b, false, true)
+}
+
+func BenchmarkMutexWorkSlack(b *testing.B) {
+ benchmarkMutex(b, true, true)
+}
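
The uncontended benchmark wraps the Mutex in a padded struct so that its state word does not share a cache line with unrelated hot data and skew the measurement; the 128 bytes are a guess at an upper bound on common cache-line sizes. The same idiom applied to a plain counter, as an illustrative sketch:

package main

import (
    "fmt"
    "sync/atomic"
    "unsafe"
)

// paddedCounter keeps each counter on (at least) its own cache line so
// that updates to neighbouring slots do not contend through false sharing.
type paddedCounter struct {
    n   int32
    pad [128 - 4]byte
}

func main() {
    counters := make([]paddedCounter, 4) // e.g. one slot per worker
    atomic.AddInt32(&counters[0].n, 1)
    fmt.Println(unsafe.Sizeof(counters[0]), counters[0].n) // 128 1
}
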
diff --git a/libgo/go/sync/once.go b/libgo/go/sync/once.go
index b6f5f5a..04b714a 100644
--- a/libgo/go/sync/once.go
+++ b/libgo/go/sync/once.go
@@ -4,10 +4,14 @@
package sync
+import (
+ "sync/atomic"
+)
+
// Once is an object that will perform exactly one action.
type Once struct {
m Mutex
- done bool
+ done uint32
}
// Do calls the function f if and only if the method is being called for the
@@ -26,10 +30,14 @@ type Once struct {
// Do to be called, it will deadlock.
//
func (o *Once) Do(f func()) {
+ if atomic.LoadUint32(&o.done) == 1 {
+ return
+ }
+ // Slow-path.
o.m.Lock()
defer o.m.Unlock()
- if !o.done {
- o.done = true
+ if o.done == 0 {
f()
+ atomic.CompareAndSwapUint32(&o.done, 0, 1)
}
}
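
Do is now a double-checked pattern: an atomic load of done on the fast path, and the mutex plus the CompareAndSwap store only on the first call. Usage is unchanged; a minimal sketch:

package main

import (
    "fmt"
    "sync"
)

var (
    once   sync.Once
    config string
)

func loadConfig() { config = "loaded" } // runs exactly once

func main() {
    var wg sync.WaitGroup
    for i := 0; i < 4; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            once.Do(loadConfig) // later calls return via the atomic fast path
        }()
    }
    wg.Wait()
    fmt.Println(config) // loaded
}
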
diff --git a/libgo/go/sync/once_test.go b/libgo/go/sync/once_test.go
index 155954a..157a366 100644
--- a/libgo/go/sync/once_test.go
+++ b/libgo/go/sync/once_test.go
@@ -6,6 +6,8 @@ package sync_test
import (
. "sync"
+ "sync/atomic"
+ "runtime"
"testing"
)
@@ -35,3 +37,26 @@ func TestOnce(t *testing.T) {
t.Errorf("once failed: %d is not 1", *o)
}
}
+
+func BenchmarkOnce(b *testing.B) {
+ const CallsPerSched = 1000
+ procs := runtime.GOMAXPROCS(-1)
+ N := int32(b.N / CallsPerSched)
+ var once Once
+ f := func() {}
+ c := make(chan bool, procs)
+ for p := 0; p < procs; p++ {
+ go func() {
+ for atomic.AddInt32(&N, -1) >= 0 {
+ runtime.Gosched()
+ for g := 0; g < CallsPerSched; g++ {
+ once.Do(f)
+ }
+ }
+ c <- true
+ }()
+ }
+ for p := 0; p < procs; p++ {
+ <-c
+ }
+}
diff --git a/libgo/go/sync/rwmutex.go b/libgo/go/sync/rwmutex.go
index 9248b4b..cb1a477 100644
--- a/libgo/go/sync/rwmutex.go
+++ b/libgo/go/sync/rwmutex.go
@@ -4,7 +4,10 @@
package sync
-import "sync/atomic"
+import (
+ "runtime"
+ "sync/atomic"
+)
// An RWMutex is a reader/writer mutual exclusion lock.
// The lock can be held by an arbitrary number of readers
@@ -12,35 +15,22 @@ import "sync/atomic"
// RWMutexes can be created as part of other
// structures; the zero value for a RWMutex is
// an unlocked mutex.
-//
-// Writers take priority over Readers: no new RLocks
-// are granted while a blocked Lock call is waiting.
type RWMutex struct {
- w Mutex // held if there are pending readers or writers
- r Mutex // held if the w is being rd
- readerCount int32 // number of pending readers
+ w Mutex // held if there are pending writers
+ writerSem uint32 // semaphore for writers to wait for completing readers
+ readerSem uint32 // semaphore for readers to wait for completing writers
+ readerCount int32 // number of pending readers
+ readerWait int32 // number of departing readers
}
+const rwmutexMaxReaders = 1 << 30
+
// RLock locks rw for reading.
-// If the lock is already locked for writing or there is a writer already waiting
-// to release the lock, RLock blocks until the writer has released the lock.
func (rw *RWMutex) RLock() {
- // Use rw.r.Lock() to block granting the RLock if a goroutine
- // is waiting for its Lock. This is the prevent starvation of W in
- // this situation:
- // A: rw.RLock() // granted
- // W: rw.Lock() // waiting for rw.w().Lock()
- // B: rw.RLock() // granted
- // C: rw.RLock() // granted
- // B: rw.RUnlock()
- // ... (new readers come and go indefinitely, W is starving)
- rw.r.Lock()
- if atomic.AddInt32(&rw.readerCount, 1) == 1 {
- // The first reader locks rw.w, so writers will be blocked
- // while the readers have the RLock.
- rw.w.Lock()
+ if atomic.AddInt32(&rw.readerCount, 1) < 0 {
+ // A writer is pending, wait for it.
+ runtime.Semacquire(&rw.readerSem)
}
- rw.r.Unlock()
}
// RUnlock undoes a single RLock call;
@@ -48,9 +38,12 @@ func (rw *RWMutex) RLock() {
// It is a run-time error if rw is not locked for reading
// on entry to RUnlock.
func (rw *RWMutex) RUnlock() {
- if atomic.AddInt32(&rw.readerCount, -1) == 0 {
- // last reader finished, enable writers
- rw.w.Unlock()
+ if atomic.AddInt32(&rw.readerCount, -1) < 0 {
+ // A writer is pending.
+ if atomic.AddInt32(&rw.readerWait, -1) == 0 {
+ // The last reader unblocks the writer.
+ runtime.Semrelease(&rw.writerSem)
+ }
}
}
@@ -61,9 +54,14 @@ func (rw *RWMutex) RUnlock() {
// a blocked Lock call excludes new readers from acquiring
// the lock.
func (rw *RWMutex) Lock() {
- rw.r.Lock()
+ // First, resolve competition with other writers.
rw.w.Lock()
- rw.r.Unlock()
+ // Announce to readers there is a pending writer.
+ r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
+ // Wait for active readers.
+ if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
+ runtime.Semacquire(&rw.writerSem)
+ }
}
// Unlock unlocks rw for writing. It is a run-time error if rw is
@@ -72,7 +70,16 @@ func (rw *RWMutex) Lock() {
// As with Mutexes, a locked RWMutex is not associated with a particular
// goroutine. One goroutine may RLock (Lock) an RWMutex and then
// arrange for another goroutine to RUnlock (Unlock) it.
-func (rw *RWMutex) Unlock() { rw.w.Unlock() }
+func (rw *RWMutex) Unlock() {
+ // Announce to readers there is no active writer.
+ r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
+ // Unblock blocked readers, if any.
+ for i := 0; i < int(r); i++ {
+ runtime.Semrelease(&rw.readerSem)
+ }
+ // Allow other writers to proceed.
+ rw.w.Unlock()
+}
// RLocker returns a Locker interface that implements
// the Lock and Unlock methods by calling rw.RLock and rw.RUnlock.
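
The writer announces itself by subtracting rwmutexMaxReaders from readerCount, driving it negative: readers that arrive afterwards see a negative count in RLock and park on readerSem, while readerWait counts down the readers the writer still has to wait out before Lock returns. Caller-visible behavior is the usual one; a short usage sketch (the cache map is illustrative):

package main

import (
    "fmt"
    "sync"
)

func main() {
    var rw sync.RWMutex
    cache := map[string]int{}
    var wg sync.WaitGroup

    // One writer; readers run concurrently with each other but never with it.
    wg.Add(1)
    go func() {
        defer wg.Done()
        rw.Lock()
        cache["answer"] = 42
        rw.Unlock()
    }()
    for i := 0; i < 3; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            rw.RLock()
            _ = cache["answer"] // sees the map either before or after the write
            rw.RUnlock()
        }()
    }
    wg.Wait()

    rw.RLock()
    fmt.Println(cache["answer"]) // 42
    rw.RUnlock()
}
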
diff --git a/libgo/go/sync/rwmutex_test.go b/libgo/go/sync/rwmutex_test.go
index 9fb89f8..dc8ce96 100644
--- a/libgo/go/sync/rwmutex_test.go
+++ b/libgo/go/sync/rwmutex_test.go
@@ -45,6 +45,7 @@ func doTestParallelReaders(numReaders, gomaxprocs int) {
}
func TestParallelReaders(t *testing.T) {
+ defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(-1))
doTestParallelReaders(1, 4)
doTestParallelReaders(3, 4)
doTestParallelReaders(4, 2)
@@ -102,6 +103,7 @@ func HammerRWMutex(gomaxprocs, numReaders, num_iterations int) {
}
func TestRWMutex(t *testing.T) {
+ defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(-1))
n := 1000
if testing.Short() {
n = 5
@@ -152,3 +154,84 @@ func TestRLocker(t *testing.T) {
wl.Unlock()
}
}
+
+func BenchmarkRWMutexUncontended(b *testing.B) {
+ type PaddedRWMutex struct {
+ RWMutex
+ pad [32]uint32
+ }
+ const CallsPerSched = 1000
+ procs := runtime.GOMAXPROCS(-1)
+ N := int32(b.N / CallsPerSched)
+ c := make(chan bool, procs)
+ for p := 0; p < procs; p++ {
+ go func() {
+ var rwm PaddedRWMutex
+ for atomic.AddInt32(&N, -1) >= 0 {
+ runtime.Gosched()
+ for g := 0; g < CallsPerSched; g++ {
+ rwm.RLock()
+ rwm.RLock()
+ rwm.RUnlock()
+ rwm.RUnlock()
+ rwm.Lock()
+ rwm.Unlock()
+ }
+ }
+ c <- true
+ }()
+ }
+ for p := 0; p < procs; p++ {
+ <-c
+ }
+}
+
+func benchmarkRWMutex(b *testing.B, localWork, writeRatio int) {
+ const CallsPerSched = 1000
+ procs := runtime.GOMAXPROCS(-1)
+ N := int32(b.N / CallsPerSched)
+ c := make(chan bool, procs)
+ var rwm RWMutex
+ for p := 0; p < procs; p++ {
+ go func() {
+ foo := 0
+ for atomic.AddInt32(&N, -1) >= 0 {
+ runtime.Gosched()
+ for g := 0; g < CallsPerSched; g++ {
+ foo++
+ if foo%writeRatio == 0 {
+ rwm.Lock()
+ rwm.Unlock()
+ } else {
+ rwm.RLock()
+ for i := 0; i != localWork; i += 1 {
+ foo *= 2
+ foo /= 2
+ }
+ rwm.RUnlock()
+ }
+ }
+ }
+ c <- foo == 42
+ }()
+ }
+ for p := 0; p < procs; p++ {
+ <-c
+ }
+}
+
+func BenchmarkRWMutexWrite100(b *testing.B) {
+ benchmarkRWMutex(b, 0, 100)
+}
+
+func BenchmarkRWMutexWrite10(b *testing.B) {
+ benchmarkRWMutex(b, 0, 10)
+}
+
+func BenchmarkRWMutexWorkWrite100(b *testing.B) {
+ benchmarkRWMutex(b, 100, 100)
+}
+
+func BenchmarkRWMutexWorkWrite10(b *testing.B) {
+ benchmarkRWMutex(b, 100, 10)
+}
diff --git a/libgo/go/sync/waitgroup.go b/libgo/go/sync/waitgroup.go
index 05478c6..a4c9b7e 100644
--- a/libgo/go/sync/waitgroup.go
+++ b/libgo/go/sync/waitgroup.go
@@ -4,7 +4,10 @@
package sync
-import "runtime"
+import (
+ "runtime"
+ "sync/atomic"
+)
// A WaitGroup waits for a collection of goroutines to finish.
// The main goroutine calls Add to set the number of
@@ -28,8 +31,8 @@ import "runtime"
//
type WaitGroup struct {
m Mutex
- counter int
- waiters int
+ counter int32
+ waiters int32
sema *uint32
}
@@ -48,19 +51,19 @@ type WaitGroup struct {
// Add adds delta, which may be negative, to the WaitGroup counter.
// If the counter becomes zero, all goroutines blocked on Wait() are released.
func (wg *WaitGroup) Add(delta int) {
- wg.m.Lock()
- if delta < -wg.counter {
- wg.m.Unlock()
+ v := atomic.AddInt32(&wg.counter, int32(delta))
+ if v < 0 {
panic("sync: negative WaitGroup count")
}
- wg.counter += delta
- if wg.counter == 0 && wg.waiters > 0 {
- for i := 0; i < wg.waiters; i++ {
- runtime.Semrelease(wg.sema)
- }
- wg.waiters = 0
- wg.sema = nil
+ if v > 0 || atomic.LoadInt32(&wg.waiters) == 0 {
+ return
}
+ wg.m.Lock()
+ for i := int32(0); i < wg.waiters; i++ {
+ runtime.Semrelease(wg.sema)
+ }
+ wg.waiters = 0
+ wg.sema = nil
wg.m.Unlock()
}
@@ -71,12 +74,20 @@ func (wg *WaitGroup) Done() {
// Wait blocks until the WaitGroup counter is zero.
func (wg *WaitGroup) Wait() {
+ if atomic.LoadInt32(&wg.counter) == 0 {
+ return
+ }
wg.m.Lock()
- if wg.counter == 0 {
+ atomic.AddInt32(&wg.waiters, 1)
+ // This code is racing with the unlocked path in Add above.
+ // The code above modifies counter and then reads waiters.
+ // We must modify waiters and then read counter (the opposite order)
+ // to avoid missing an Add.
+ if atomic.LoadInt32(&wg.counter) == 0 {
+ atomic.AddInt32(&wg.waiters, -1)
wg.m.Unlock()
return
}
- wg.waiters++
if wg.sema == nil {
wg.sema = new(uint32)
}
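
Add and Done now adjust the counter with a single atomic add, and Wait only takes the mutex when it actually has to block; the comment in Wait records the ordering argument (bump waiters before re-reading counter) that keeps the two lock-free paths from missing each other. The usage pattern is the usual one; a minimal sketch:

package main

import (
    "fmt"
    "sync"
)

func main() {
    var wg sync.WaitGroup
    results := make([]int, 3)

    for i := 0; i < 3; i++ {
        wg.Add(1) // raise the counter before the goroutine starts
        go func(i int) {
            defer wg.Done() // drop it when the work is finished
            results[i] = i * i
        }(i)
    }

    wg.Wait() // returns once the counter is back to zero
    fmt.Println(results) // [0 1 4]
}
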
diff --git a/libgo/go/sync/waitgroup_test.go b/libgo/go/sync/waitgroup_test.go
index fe35732..34430fc 100644
--- a/libgo/go/sync/waitgroup_test.go
+++ b/libgo/go/sync/waitgroup_test.go
@@ -5,7 +5,9 @@
package sync_test
import (
+ "runtime"
. "sync"
+ "sync/atomic"
"testing"
)
@@ -58,3 +60,106 @@ func TestWaitGroupMisuse(t *testing.T) {
wg.Done()
t.Fatal("Should panic")
}
+
+func BenchmarkWaitGroupUncontended(b *testing.B) {
+ type PaddedWaitGroup struct {
+ WaitGroup
+ pad [128]uint8
+ }
+ const CallsPerSched = 1000
+ procs := runtime.GOMAXPROCS(-1)
+ N := int32(b.N / CallsPerSched)
+ c := make(chan bool, procs)
+ for p := 0; p < procs; p++ {
+ go func() {
+ var wg PaddedWaitGroup
+ for atomic.AddInt32(&N, -1) >= 0 {
+ runtime.Gosched()
+ for g := 0; g < CallsPerSched; g++ {
+ wg.Add(1)
+ wg.Done()
+ wg.Wait()
+ }
+ }
+ c <- true
+ }()
+ }
+ for p := 0; p < procs; p++ {
+ <-c
+ }
+}
+
+func benchmarkWaitGroupAddDone(b *testing.B, localWork int) {
+ const CallsPerSched = 1000
+ procs := runtime.GOMAXPROCS(-1)
+ N := int32(b.N / CallsPerSched)
+ c := make(chan bool, procs)
+ var wg WaitGroup
+ for p := 0; p < procs; p++ {
+ go func() {
+ foo := 0
+ for atomic.AddInt32(&N, -1) >= 0 {
+ runtime.Gosched()
+ for g := 0; g < CallsPerSched; g++ {
+ wg.Add(1)
+ for i := 0; i < localWork; i++ {
+ foo *= 2
+ foo /= 2
+ }
+ wg.Done()
+ }
+ }
+ c <- foo == 42
+ }()
+ }
+ for p := 0; p < procs; p++ {
+ <-c
+ }
+}
+
+func BenchmarkWaitGroupAddDone(b *testing.B) {
+ benchmarkWaitGroupAddDone(b, 0)
+}
+
+func BenchmarkWaitGroupAddDoneWork(b *testing.B) {
+ benchmarkWaitGroupAddDone(b, 100)
+}
+
+func benchmarkWaitGroupWait(b *testing.B, localWork int) {
+ const CallsPerSched = 1000
+ procs := runtime.GOMAXPROCS(-1)
+ N := int32(b.N / CallsPerSched)
+ c := make(chan bool, procs)
+ var wg WaitGroup
+ wg.Add(procs)
+ for p := 0; p < procs; p++ {
+ go wg.Done()
+ }
+ for p := 0; p < procs; p++ {
+ go func() {
+ foo := 0
+ for atomic.AddInt32(&N, -1) >= 0 {
+ runtime.Gosched()
+ for g := 0; g < CallsPerSched; g++ {
+ wg.Wait()
+ for i := 0; i < localWork; i++ {
+ foo *= 2
+ foo /= 2
+ }
+ }
+ }
+ c <- foo == 42
+ }()
+ }
+ for p := 0; p < procs; p++ {
+ <-c
+ }
+}
+
+func BenchmarkWaitGroupWait(b *testing.B) {
+ benchmarkWaitGroupWait(b, 0)
+}
+
+func BenchmarkWaitGroupWaitWork(b *testing.B) {
+ benchmarkWaitGroupWait(b, 100)
+}