author    Ian Lance Taylor <ian@gcc.gnu.org>    2011-09-16 15:47:21 +0000
committer Ian Lance Taylor <ian@gcc.gnu.org>    2011-09-16 15:47:21 +0000
commit    adb0401dac41c81571722312d4586b2693f95aa6 (patch)
tree      ea2b52e3c258d6b6d9356977c683c7f72a4a5fd5 /libgo/go/sync
parent    5548ca3540bccbc908a45942896d635ea5f1c23f (diff)
Update Go library to r60.
From-SVN: r178910
Diffstat (limited to 'libgo/go/sync')
 libgo/go/sync/atomic/atomic.c       |  28
 libgo/go/sync/atomic/atomic_test.go | 102
 libgo/go/sync/atomic/doc.go         |   6
 libgo/go/sync/cond.go               |  69
 libgo/go/sync/cond_test.go          |  27
 libgo/go/sync/mutex.go              |  63
 libgo/go/sync/mutex_test.go         | 102
 libgo/go/sync/once.go               |  14
 libgo/go/sync/once_test.go          |  25
 libgo/go/sync/rwmutex.go            |  67
 libgo/go/sync/rwmutex_test.go       |  83
 libgo/go/sync/waitgroup.go          |  41
 libgo/go/sync/waitgroup_test.go     | 105
13 files changed, 628 insertions, 104 deletions
diff --git a/libgo/go/sync/atomic/atomic.c b/libgo/go/sync/atomic/atomic.c
index e2d9b24..6660a7d 100644
--- a/libgo/go/sync/atomic/atomic.c
+++ b/libgo/go/sync/atomic/atomic.c
@@ -95,3 +95,31 @@ AddUintptr (uintptr_t *val, uintptr_t delta)
 {
   return __sync_add_and_fetch (val, delta);
 }
+
+int32_t LoadInt32 (int32_t *addr)
+  asm ("libgo_sync.atomic.LoadInt32");
+
+int32_t
+LoadInt32 (int32_t *addr)
+{
+  int32_t v;
+
+  v = *addr;
+  while (! __sync_bool_compare_and_swap (addr, v, v))
+    v = *addr;
+  return v;
+}
+
+uint32_t LoadUint32 (uint32_t *addr)
+  asm ("libgo_sync.atomic.LoadUint32");
+
+uint32_t
+LoadUint32 (uint32_t *addr)
+{
+  uint32_t v;
+
+  v = *addr;
+  while (! __sync_bool_compare_and_swap (addr, v, v))
+    v = *addr;
+  return v;
+}
diff --git a/libgo/go/sync/atomic/atomic_test.go b/libgo/go/sync/atomic/atomic_test.go
index 119ad00..2229e58 100644
--- a/libgo/go/sync/atomic/atomic_test.go
+++ b/libgo/go/sync/atomic/atomic_test.go
@@ -308,6 +308,46 @@ func TestCompareAndSwapUintptr(t *testing.T) {
     }
 }
 
+func TestLoadInt32(t *testing.T) {
+    var x struct {
+        before int32
+        i      int32
+        after  int32
+    }
+    x.before = magic32
+    x.after = magic32
+    for delta := int32(1); delta+delta > delta; delta += delta {
+        k := LoadInt32(&x.i)
+        if k != x.i {
+            t.Fatalf("delta=%d i=%d k=%d", delta, x.i, k)
+        }
+        x.i += delta
+    }
+    if x.before != magic32 || x.after != magic32 {
+        t.Fatalf("wrong magic: %#x _ %#x != %#x _ %#x", x.before, x.after, magic32, magic32)
+    }
+}
+
+func TestLoadUint32(t *testing.T) {
+    var x struct {
+        before uint32
+        i      uint32
+        after  uint32
+    }
+    x.before = magic32
+    x.after = magic32
+    for delta := uint32(1); delta+delta > delta; delta += delta {
+        k := LoadUint32(&x.i)
+        if k != x.i {
+            t.Fatalf("delta=%d i=%d k=%d", delta, x.i, k)
+        }
+        x.i += delta
+    }
+    if x.before != magic32 || x.after != magic32 {
+        t.Fatalf("wrong magic: %#x _ %#x != %#x _ %#x", x.before, x.after, magic32, magic32)
+    }
+}
+
 // Tests of correct behavior, with contention.
 // (Is the function atomic?)
 //
@@ -537,3 +577,65 @@ func TestHammer64(t *testing.T) {
         }
     }
 }
+
+func hammerLoadInt32(t *testing.T, uval *uint32) {
+    val := (*int32)(unsafe.Pointer(uval))
+    for {
+        v := LoadInt32(val)
+        vlo := v & ((1 << 16) - 1)
+        vhi := v >> 16
+        if vlo != vhi {
+            t.Fatalf("LoadInt32: %#x != %#x", vlo, vhi)
+        }
+        new := v + 1 + 1<<16
+        if vlo == 1e4 {
+            new = 0
+        }
+        if CompareAndSwapInt32(val, v, new) {
+            break
+        }
+    }
+}
+
+func hammerLoadUint32(t *testing.T, val *uint32) {
+    for {
+        v := LoadUint32(val)
+        vlo := v & ((1 << 16) - 1)
+        vhi := v >> 16
+        if vlo != vhi {
+            t.Fatalf("LoadUint32: %#x != %#x", vlo, vhi)
+        }
+        new := v + 1 + 1<<16
+        if vlo == 1e4 {
+            new = 0
+        }
+        if CompareAndSwapUint32(val, v, new) {
+            break
+        }
+    }
+}
+
+func TestHammerLoad(t *testing.T) {
+    tests := [...]func(*testing.T, *uint32){hammerLoadInt32, hammerLoadUint32}
+    n := 100000
+    if testing.Short() {
+        n = 10000
+    }
+    const procs = 8
+    defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(procs))
+    for _, tt := range tests {
+        c := make(chan int)
+        var val uint32
+        for p := 0; p < procs; p++ {
+            go func() {
+                for i := 0; i < n; i++ {
+                    tt(t, &val)
+                }
+                c <- 1
+            }()
+        }
+        for p := 0; p < procs; p++ {
+            <-c
+        }
+    }
+}
diff --git a/libgo/go/sync/atomic/doc.go b/libgo/go/sync/atomic/doc.go
index ec5a0d3..b35eb53 100644
--- a/libgo/go/sync/atomic/doc.go
+++ b/libgo/go/sync/atomic/doc.go
@@ -56,6 +56,12 @@ func AddUint64(val *uint64, delta uint64) (new uint64)
 // AddUintptr atomically adds delta to *val and returns the new value.
 func AddUintptr(val *uintptr, delta uintptr) (new uintptr)
 
+// LoadInt32 atomically loads *addr.
+func LoadInt32(addr *int32) (val int32)
+
+// LoadUint32 atomically loads *addr.
+func LoadUint32(addr *uint32) (val uint32)
+
 // Helper for ARM.  Linker will discard on other systems
 func panic64() {
     panic("sync/atomic: broken 64-bit atomic operations (buggy QEMU)")
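The three hunks above wire atomic loads into sync/atomic: the C implementation (a compare-and-swap no-op loop, since GCC's __sync builtins offer no plain atomic load), the tests, and the Go declarations. A minimal usage sketch, not part of the patch (the busy-wait is only to keep the example short):

    package main

    import (
        "fmt"
        "sync/atomic"
    )

    func main() {
        var hits int32
        go func() {
            for i := 0; i < 1000; i++ {
                atomic.AddInt32(&hits, 1) // atomic writes, paired with atomic reads below
            }
        }()
        // LoadInt32 gives a race-free read of a value other goroutines
        // are updating; a plain read of hits here would be a data race.
        for atomic.LoadInt32(&hits) < 1000 {
            // spin; a real program would block on a channel instead
        }
        fmt.Println(atomic.LoadInt32(&hits)) // 1000
    }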
diff --git a/libgo/go/sync/cond.go b/libgo/go/sync/cond.go
index ea48f2e..75494b5 100644
--- a/libgo/go/sync/cond.go
+++ b/libgo/go/sync/cond.go
@@ -14,10 +14,26 @@ import "runtime"
 // which must be held when changing the condition and
 // when calling the Wait method.
 type Cond struct {
-    L       Locker // held while observing or changing the condition
-    m       Mutex  // held to avoid internal races
-    waiters int    // number of goroutines blocked on Wait
-    sema    *uint32
+    L Locker // held while observing or changing the condition
+    m Mutex  // held to avoid internal races
+
+    // We must be careful to make sure that when Signal
+    // releases a semaphore, the corresponding acquire is
+    // executed by a goroutine that was already waiting at
+    // the time of the call to Signal, not one that arrived later.
+    // To ensure this, we segment waiting goroutines into
+    // generations punctuated by calls to Signal.  Each call to
+    // Signal begins another generation if there are no goroutines
+    // left in older generations for it to wake.  Because of this
+    // optimization (only begin another generation if there
+    // are no older goroutines left), we only need to keep track
+    // of the two most recent generations, which we call old
+    // and new.
+    oldWaiters int     // number of waiters in old generation...
+    oldSema    *uint32 // ... waiting on this semaphore
+
+    newWaiters int     // number of waiters in new generation...
+    newSema    *uint32 // ... waiting on this semaphore
 }
 
 // NewCond returns a new Cond with Locker l.
@@ -42,11 +58,11 @@ func NewCond(l Locker) *Cond {
 //
 func (c *Cond) Wait() {
     c.m.Lock()
-    if c.sema == nil {
-        c.sema = new(uint32)
+    if c.newSema == nil {
+        c.newSema = new(uint32)
     }
-    s := c.sema
-    c.waiters++
+    s := c.newSema
+    c.newWaiters++
     c.m.Unlock()
     c.L.Unlock()
     runtime.Semacquire(s)
@@ -59,9 +75,16 @@ func (c *Cond) Wait() {
 // during the call.
 func (c *Cond) Signal() {
     c.m.Lock()
-    if c.waiters > 0 {
-        c.waiters--
-        runtime.Semrelease(c.sema)
+    if c.oldWaiters == 0 && c.newWaiters > 0 {
+        // Retire old generation; rename new to old.
+        c.oldWaiters = c.newWaiters
+        c.oldSema = c.newSema
+        c.newWaiters = 0
+        c.newSema = nil
+    }
+    if c.oldWaiters > 0 {
+        c.oldWaiters--
+        runtime.Semrelease(c.oldSema)
     }
     c.m.Unlock()
 }
@@ -72,19 +95,19 @@ func (c *Cond) Signal() {
 // during the call.
 func (c *Cond) Broadcast() {
     c.m.Lock()
-    if c.waiters > 0 {
-        s := c.sema
-        n := c.waiters
-        for i := 0; i < n; i++ {
-            runtime.Semrelease(s)
+    // Wake both generations.
+    if c.oldWaiters > 0 {
+        for i := 0; i < c.oldWaiters; i++ {
+            runtime.Semrelease(c.oldSema)
+        }
+        c.oldWaiters = 0
+    }
+    if c.newWaiters > 0 {
+        for i := 0; i < c.newWaiters; i++ {
+            runtime.Semrelease(c.newSema)
         }
-        // We just issued n wakeups via the semaphore s.
-        // To ensure that they wake up the existing waiters
-        // and not waiters that arrive after Broadcast returns,
-        // clear c.sema.  The next operation will allocate
-        // a new one.
-        c.sema = nil
-        c.waiters = 0
+        c.newWaiters = 0
+        c.newSema = nil
     }
     c.m.Unlock()
 }
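The rewritten Cond above segments waiters into two generations so that Signal only ever wakes a goroutine that was already blocked in Wait when Signal was called. A minimal sketch of the Wait pattern this guarantee supports, not from the patch (note that the condition is always re-checked in a loop):

    package main

    import (
        "fmt"
        "sync"
    )

    func main() {
        var mu sync.Mutex
        c := sync.NewCond(&mu)
        ready := false

        go func() {
            mu.Lock()
            ready = true
            mu.Unlock()
            c.Signal() // wakes a goroutine already blocked in Wait, if any
        }()

        mu.Lock()
        for !ready { // re-check after every wakeup
            c.Wait() // atomically unlocks mu, sleeps, then relocks mu
        }
        mu.Unlock()
        fmt.Println("condition observed")
    }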
diff --git a/libgo/go/sync/cond_test.go b/libgo/go/sync/cond_test.go
index 846f98b..cefacb1 100644
--- a/libgo/go/sync/cond_test.go
+++ b/libgo/go/sync/cond_test.go
@@ -46,6 +46,33 @@ func TestCondSignal(t *testing.T) {
     c.Signal()
 }
 
+func TestCondSignalGenerations(t *testing.T) {
+    var m Mutex
+    c := NewCond(&m)
+    n := 100
+    running := make(chan bool, n)
+    awake := make(chan int, n)
+    for i := 0; i < n; i++ {
+        go func(i int) {
+            m.Lock()
+            running <- true
+            c.Wait()
+            awake <- i
+            m.Unlock()
+        }(i)
+        if i > 0 {
+            a := <-awake
+            if a != i-1 {
+                t.Fatalf("wrong goroutine woke up: want %d, got %d", i-1, a)
+            }
+        }
+        <-running
+        m.Lock()
+        c.Signal()
+        m.Unlock()
+    }
+}
+
 func TestCondBroadcast(t *testing.T) {
     var m Mutex
     c := NewCond(&m)
diff --git a/libgo/go/sync/mutex.go b/libgo/go/sync/mutex.go
index 13f03ca..2d46c89 100644
--- a/libgo/go/sync/mutex.go
+++ b/libgo/go/sync/mutex.go
@@ -17,8 +17,8 @@ import (
 // Mutexes can be created as part of other structures;
 // the zero value for a Mutex is an unlocked mutex.
 type Mutex struct {
-    key  int32
-    sema uint32
+    state int32
+    sema  uint32
 }
@@ -27,15 +27,41 @@ type Locker interface {
     Unlock()
 }
 
+const (
+    mutexLocked = 1 << iota // mutex is locked
+    mutexWoken
+    mutexWaiterShift = iota
+)
+
 // Lock locks m.
 // If the lock is already in use, the calling goroutine
 // blocks until the mutex is available.
 func (m *Mutex) Lock() {
-    if atomic.AddInt32(&m.key, 1) == 1 {
-        // changed from 0 to 1; we hold lock
+    // Fast path: grab unlocked mutex.
+    if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
         return
     }
-    runtime.Semacquire(&m.sema)
+
+    awoke := false
+    for {
+        old := m.state
+        new := old | mutexLocked
+        if old&mutexLocked != 0 {
+            new = old + 1<<mutexWaiterShift
+        }
+        if awoke {
+            // The goroutine has been woken from sleep,
+            // so we need to reset the flag in either case.
+            new &^= mutexWoken
+        }
+        if atomic.CompareAndSwapInt32(&m.state, old, new) {
+            if old&mutexLocked == 0 {
+                break
+            }
+            runtime.Semacquire(&m.sema)
+            awoke = true
+        }
+    }
 }
@@ -45,14 +71,25 @@ func (m *Mutex) Lock() {
 // It is allowed for one goroutine to lock a Mutex and then
 // arrange for another goroutine to unlock it.
 func (m *Mutex) Unlock() {
-    switch v := atomic.AddInt32(&m.key, -1); {
-    case v == 0:
-        // changed from 1 to 0; no contention
-        return
-    case v == -1:
-        // changed from 0 to -1: wasn't locked
-        // (or there are 4 billion goroutines waiting)
+    // Fast path: drop lock bit.
+    new := atomic.AddInt32(&m.state, -mutexLocked)
+    if (new+mutexLocked)&mutexLocked == 0 {
         panic("sync: unlock of unlocked mutex")
     }
-    runtime.Semrelease(&m.sema)
+
+    old := new
+    for {
+        // If there are no waiters or a goroutine has already
+        // been woken or grabbed the lock, no need to wake anyone.
+        if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken) != 0 {
+            return
+        }
+        // Grab the right to wake someone.
+        new = (old - 1<<mutexWaiterShift) | mutexWoken
+        if atomic.CompareAndSwapInt32(&m.state, old, new) {
+            runtime.Semrelease(&m.sema)
+            return
+        }
+        old = m.state
+    }
 }
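The new Mutex above packs its entire state into one int32: bit 0 is the lock bit, bit 1 records that a woken goroutine exists, and the remaining bits count blocked waiters. A standalone decoding of that layout, mirroring the constants in the patch (illustration only, not part of the patch):

    package main

    import "fmt"

    const (
        mutexLocked      = 1 << iota // 1: mutex is locked
        mutexWoken                   // 2: a woken goroutine is active
        mutexWaiterShift = iota      // 2: waiter count lives in bits 2+
    )

    func describe(state int32) string {
        return fmt.Sprintf("locked=%v woken=%v waiters=%d",
            state&mutexLocked != 0,
            state&mutexWoken != 0,
            state>>mutexWaiterShift)
    }

    func main() {
        fmt.Println(describe(0))                                 // unlocked, idle
        fmt.Println(describe(mutexLocked))                       // locked, no waiters
        fmt.Println(describe(mutexLocked | 3<<mutexWaiterShift)) // locked, 3 waiters
    }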
"sync" + "sync/atomic" "testing" ) @@ -43,7 +44,7 @@ func BenchmarkContendedSemaphore(b *testing.B) { s := new(uint32) *s = 1 c := make(chan bool) - runtime.GOMAXPROCS(2) + defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(2)) b.StartTimer() go HammerSemaphore(s, b.N/2, c) @@ -52,7 +53,6 @@ func BenchmarkContendedSemaphore(b *testing.B) { <-c } - func HammerMutex(m *Mutex, loops int, cdone chan bool) { for i := 0; i < loops; i++ { m.Lock() @@ -72,24 +72,6 @@ func TestMutex(t *testing.T) { } } -func BenchmarkUncontendedMutex(b *testing.B) { - m := new(Mutex) - HammerMutex(m, b.N, make(chan bool, 2)) -} - -func BenchmarkContendedMutex(b *testing.B) { - b.StopTimer() - m := new(Mutex) - c := make(chan bool) - runtime.GOMAXPROCS(2) - b.StartTimer() - - go HammerMutex(m, b.N/2, c) - go HammerMutex(m, b.N/2, c) - <-c - <-c -} - func TestMutexPanic(t *testing.T) { defer func() { if recover() == nil { @@ -102,3 +84,83 @@ func TestMutexPanic(t *testing.T) { mu.Unlock() mu.Unlock() } + +func BenchmarkMutexUncontended(b *testing.B) { + type PaddedMutex struct { + Mutex + pad [128]uint8 + } + const CallsPerSched = 1000 + procs := runtime.GOMAXPROCS(-1) + N := int32(b.N / CallsPerSched) + c := make(chan bool, procs) + for p := 0; p < procs; p++ { + go func() { + var mu PaddedMutex + for atomic.AddInt32(&N, -1) >= 0 { + runtime.Gosched() + for g := 0; g < CallsPerSched; g++ { + mu.Lock() + mu.Unlock() + } + } + c <- true + }() + } + for p := 0; p < procs; p++ { + <-c + } +} + +func benchmarkMutex(b *testing.B, slack, work bool) { + const ( + CallsPerSched = 1000 + LocalWork = 100 + GoroutineSlack = 10 + ) + procs := runtime.GOMAXPROCS(-1) + if slack { + procs *= GoroutineSlack + } + N := int32(b.N / CallsPerSched) + c := make(chan bool, procs) + var mu Mutex + for p := 0; p < procs; p++ { + go func() { + foo := 0 + for atomic.AddInt32(&N, -1) >= 0 { + runtime.Gosched() + for g := 0; g < CallsPerSched; g++ { + mu.Lock() + mu.Unlock() + if work { + for i := 0; i < LocalWork; i++ { + foo *= 2 + foo /= 2 + } + } + } + } + c <- foo == 42 + }() + } + for p := 0; p < procs; p++ { + <-c + } +} + +func BenchmarkMutex(b *testing.B) { + benchmarkMutex(b, false, false) +} + +func BenchmarkMutexSlack(b *testing.B) { + benchmarkMutex(b, true, false) +} + +func BenchmarkMutexWork(b *testing.B) { + benchmarkMutex(b, false, true) +} + +func BenchmarkMutexWorkSlack(b *testing.B) { + benchmarkMutex(b, true, true) +} diff --git a/libgo/go/sync/once.go b/libgo/go/sync/once.go index b6f5f5a..04b714a 100644 --- a/libgo/go/sync/once.go +++ b/libgo/go/sync/once.go @@ -4,10 +4,14 @@ package sync +import ( + "sync/atomic" +) + // Once is an object that will perform exactly one action. type Once struct { m Mutex - done bool + done uint32 } // Do calls the function f if and only if the method is being called for the @@ -26,10 +30,14 @@ type Once struct { // Do to be called, it will deadlock. // func (o *Once) Do(f func()) { + if atomic.LoadUint32(&o.done) == 1 { + return + } + // Slow-path. o.m.Lock() defer o.m.Unlock() - if !o.done { - o.done = true + if o.done == 0 { f() + atomic.CompareAndSwapUint32(&o.done, 0, 1) } } diff --git a/libgo/go/sync/once_test.go b/libgo/go/sync/once_test.go index 155954a..157a366 100644 --- a/libgo/go/sync/once_test.go +++ b/libgo/go/sync/once_test.go @@ -6,6 +6,8 @@ package sync_test import ( . 
"sync" + "sync/atomic" + "runtime" "testing" ) @@ -35,3 +37,26 @@ func TestOnce(t *testing.T) { t.Errorf("once failed: %d is not 1", *o) } } + +func BenchmarkOnce(b *testing.B) { + const CallsPerSched = 1000 + procs := runtime.GOMAXPROCS(-1) + N := int32(b.N / CallsPerSched) + var once Once + f := func() {} + c := make(chan bool, procs) + for p := 0; p < procs; p++ { + go func() { + for atomic.AddInt32(&N, -1) >= 0 { + runtime.Gosched() + for g := 0; g < CallsPerSched; g++ { + once.Do(f) + } + } + c <- true + }() + } + for p := 0; p < procs; p++ { + <-c + } +} diff --git a/libgo/go/sync/rwmutex.go b/libgo/go/sync/rwmutex.go index 9248b4b..cb1a477 100644 --- a/libgo/go/sync/rwmutex.go +++ b/libgo/go/sync/rwmutex.go @@ -4,7 +4,10 @@ package sync -import "sync/atomic" +import ( + "runtime" + "sync/atomic" +) // An RWMutex is a reader/writer mutual exclusion lock. // The lock can be held by an arbitrary number of readers @@ -12,35 +15,22 @@ import "sync/atomic" // RWMutexes can be created as part of other // structures; the zero value for a RWMutex is // an unlocked mutex. -// -// Writers take priority over Readers: no new RLocks -// are granted while a blocked Lock call is waiting. type RWMutex struct { - w Mutex // held if there are pending readers or writers - r Mutex // held if the w is being rd - readerCount int32 // number of pending readers + w Mutex // held if there are pending writers + writerSem uint32 // semaphore for writers to wait for completing readers + readerSem uint32 // semaphore for readers to wait for completing writers + readerCount int32 // number of pending readers + readerWait int32 // number of departing readers } +const rwmutexMaxReaders = 1 << 30 + // RLock locks rw for reading. -// If the lock is already locked for writing or there is a writer already waiting -// to release the lock, RLock blocks until the writer has released the lock. func (rw *RWMutex) RLock() { - // Use rw.r.Lock() to block granting the RLock if a goroutine - // is waiting for its Lock. This is the prevent starvation of W in - // this situation: - // A: rw.RLock() // granted - // W: rw.Lock() // waiting for rw.w().Lock() - // B: rw.RLock() // granted - // C: rw.RLock() // granted - // B: rw.RUnlock() - // ... (new readers come and go indefinitely, W is starving) - rw.r.Lock() - if atomic.AddInt32(&rw.readerCount, 1) == 1 { - // The first reader locks rw.w, so writers will be blocked - // while the readers have the RLock. - rw.w.Lock() + if atomic.AddInt32(&rw.readerCount, 1) < 0 { + // A writer is pending, wait for it. + runtime.Semacquire(&rw.readerSem) } - rw.r.Unlock() } // RUnlock undoes a single RLock call; @@ -48,9 +38,12 @@ func (rw *RWMutex) RLock() { // It is a run-time error if rw is not locked for reading // on entry to RUnlock. func (rw *RWMutex) RUnlock() { - if atomic.AddInt32(&rw.readerCount, -1) == 0 { - // last reader finished, enable writers - rw.w.Unlock() + if atomic.AddInt32(&rw.readerCount, -1) < 0 { + // A writer is pending. + if atomic.AddInt32(&rw.readerWait, -1) == 0 { + // The last reader unblocks the writer. + runtime.Semrelease(&rw.writerSem) + } } } @@ -61,9 +54,14 @@ func (rw *RWMutex) RUnlock() { // a blocked Lock call excludes new readers from acquiring // the lock. func (rw *RWMutex) Lock() { - rw.r.Lock() + // First, resolve competition with other writers. rw.w.Lock() - rw.r.Unlock() + // Announce to readers there is a pending writer. + r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders + // Wait for active readers. 
diff --git a/libgo/go/sync/rwmutex.go b/libgo/go/sync/rwmutex.go
index 9248b4b..cb1a477 100644
--- a/libgo/go/sync/rwmutex.go
+++ b/libgo/go/sync/rwmutex.go
@@ -4,7 +4,10 @@
 
 package sync
 
-import "sync/atomic"
+import (
+    "runtime"
+    "sync/atomic"
+)
 
 // An RWMutex is a reader/writer mutual exclusion lock.
 // The lock can be held by an arbitrary number of readers
@@ -12,35 +15,22 @@ import "sync/atomic"
 // RWMutexes can be created as part of other
 // structures; the zero value for a RWMutex is
 // an unlocked mutex.
-//
-// Writers take priority over Readers: no new RLocks
-// are granted while a blocked Lock call is waiting.
 type RWMutex struct {
-    w           Mutex // held if there are pending readers or writers
-    r           Mutex // held if the w is being rd
-    readerCount int32 // number of pending readers
+    w           Mutex  // held if there are pending writers
+    writerSem   uint32 // semaphore for writers to wait for completing readers
+    readerSem   uint32 // semaphore for readers to wait for completing writers
+    readerCount int32  // number of pending readers
+    readerWait  int32  // number of departing readers
 }
 
+const rwmutexMaxReaders = 1 << 30
+
 // RLock locks rw for reading.
-// If the lock is already locked for writing or there is a writer already waiting
-// to release the lock, RLock blocks until the writer has released the lock.
 func (rw *RWMutex) RLock() {
-    // Use rw.r.Lock() to block granting the RLock if a goroutine
-    // is waiting for its Lock. This is the prevent starvation of W in
-    // this situation:
-    //   A: rw.RLock() // granted
-    //   W: rw.Lock()  // waiting for rw.w().Lock()
-    //   B: rw.RLock() // granted
-    //   C: rw.RLock() // granted
-    //   B: rw.RUnlock()
-    //   ... (new readers come and go indefinitely, W is starving)
-    rw.r.Lock()
-    if atomic.AddInt32(&rw.readerCount, 1) == 1 {
-        // The first reader locks rw.w, so writers will be blocked
-        // while the readers have the RLock.
-        rw.w.Lock()
+    if atomic.AddInt32(&rw.readerCount, 1) < 0 {
+        // A writer is pending, wait for it.
+        runtime.Semacquire(&rw.readerSem)
     }
-    rw.r.Unlock()
 }
 
 // RUnlock undoes a single RLock call;
@@ -48,9 +38,12 @@ func (rw *RWMutex) RLock() {
 // It is a run-time error if rw is not locked for reading
 // on entry to RUnlock.
 func (rw *RWMutex) RUnlock() {
-    if atomic.AddInt32(&rw.readerCount, -1) == 0 {
-        // last reader finished, enable writers
-        rw.w.Unlock()
+    if atomic.AddInt32(&rw.readerCount, -1) < 0 {
+        // A writer is pending.
+        if atomic.AddInt32(&rw.readerWait, -1) == 0 {
+            // The last reader unblocks the writer.
+            runtime.Semrelease(&rw.writerSem)
+        }
     }
 }
@@ -61,9 +54,14 @@ func (rw *RWMutex) RUnlock() {
 // a blocked Lock call excludes new readers from acquiring
 // the lock.
 func (rw *RWMutex) Lock() {
-    rw.r.Lock()
+    // First, resolve competition with other writers.
     rw.w.Lock()
-    rw.r.Unlock()
+    // Announce to readers there is a pending writer.
+    r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
+    // Wait for active readers.
+    if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
+        runtime.Semacquire(&rw.writerSem)
+    }
 }
 
 // Unlock unlocks rw for writing.  It is a run-time error if rw is
@@ -72,7 +70,16 @@ func (rw *RWMutex) Lock() {
 // As with Mutexes, a locked RWMutex is not associated with a particular
 // goroutine.  One goroutine may RLock (Lock) an RWMutex and then
 // arrange for another goroutine to RUnlock (Unlock) it.
-func (rw *RWMutex) Unlock() { rw.w.Unlock() }
+func (rw *RWMutex) Unlock() {
+    // Announce to readers there is no active writer.
+    r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
+    // Unblock blocked readers, if any.
+    for i := 0; i < int(r); i++ {
+        runtime.Semrelease(&rw.readerSem)
+    }
+    // Allow other writers to proceed.
+    rw.w.Unlock()
+}
 
 // RLocker returns a Locker interface that implements
 // the Lock and Unlock methods by calling rw.RLock and rw.RUnlock.
diff --git a/libgo/go/sync/rwmutex_test.go b/libgo/go/sync/rwmutex_test.go
index 9fb89f8..dc8ce96 100644
--- a/libgo/go/sync/rwmutex_test.go
+++ b/libgo/go/sync/rwmutex_test.go
@@ -45,6 +45,7 @@ func doTestParallelReaders(numReaders, gomaxprocs int) {
 }
 
 func TestParallelReaders(t *testing.T) {
+    defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(-1))
     doTestParallelReaders(1, 4)
     doTestParallelReaders(3, 4)
     doTestParallelReaders(4, 2)
@@ -102,6 +103,7 @@ func HammerRWMutex(gomaxprocs, numReaders, num_iterations int) {
 }
 
 func TestRWMutex(t *testing.T) {
+    defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(-1))
     n := 1000
     if testing.Short() {
         n = 5
@@ -152,3 +154,84 @@ func TestRLocker(t *testing.T) {
         wl.Unlock()
     }
 }
+
+func BenchmarkRWMutexUncontended(b *testing.B) {
+    type PaddedRWMutex struct {
+        RWMutex
+        pad [32]uint32
+    }
+    const CallsPerSched = 1000
+    procs := runtime.GOMAXPROCS(-1)
+    N := int32(b.N / CallsPerSched)
+    c := make(chan bool, procs)
+    for p := 0; p < procs; p++ {
+        go func() {
+            var rwm PaddedRWMutex
+            for atomic.AddInt32(&N, -1) >= 0 {
+                runtime.Gosched()
+                for g := 0; g < CallsPerSched; g++ {
+                    rwm.RLock()
+                    rwm.RLock()
+                    rwm.RUnlock()
+                    rwm.RUnlock()
+                    rwm.Lock()
+                    rwm.Unlock()
+                }
+            }
+            c <- true
+        }()
+    }
+    for p := 0; p < procs; p++ {
+        <-c
+    }
+}
+
+func benchmarkRWMutex(b *testing.B, localWork, writeRatio int) {
+    const CallsPerSched = 1000
+    procs := runtime.GOMAXPROCS(-1)
+    N := int32(b.N / CallsPerSched)
+    c := make(chan bool, procs)
+    var rwm RWMutex
+    for p := 0; p < procs; p++ {
+        go func() {
+            foo := 0
+            for atomic.AddInt32(&N, -1) >= 0 {
+                runtime.Gosched()
+                for g := 0; g < CallsPerSched; g++ {
+                    foo++
+                    if foo%writeRatio == 0 {
+                        rwm.Lock()
+                        rwm.Unlock()
+                    } else {
+                        rwm.RLock()
+                        for i := 0; i != localWork; i += 1 {
+                            foo *= 2
+                            foo /= 2
+                        }
+                        rwm.RUnlock()
+                    }
+                }
+            }
+            c <- foo == 42
+        }()
+    }
+    for p := 0; p < procs; p++ {
+        <-c
+    }
+}
+
+func BenchmarkRWMutexWrite100(b *testing.B) {
+    benchmarkRWMutex(b, 0, 100)
+}
+
+func BenchmarkRWMutexWrite10(b *testing.B) {
+    benchmarkRWMutex(b, 0, 10)
+}
+
+func BenchmarkRWMutexWorkWrite100(b *testing.B) {
+    benchmarkRWMutex(b, 100, 100)
+}
+
+func BenchmarkRWMutexWorkWrite10(b *testing.B) {
+    benchmarkRWMutex(b, 100, 10)
+}
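The rewritten RWMutex encodes writer intent in the sign of readerCount: Lock subtracts rwmutexMaxReaders, so the stored count stays negative while a writer is pending, and RLock detects the writer with a single sign test. A standalone illustration of the arithmetic, not from the patch:

    package main

    import "fmt"

    const rwmutexMaxReaders = 1 << 30

    func main() {
        readerCount := int32(3)          // three active readers, no writer
        readerCount -= rwmutexMaxReaders // Lock(): announce a pending writer

        // Lock() recovers how many readers it must wait out:
        r := readerCount + rwmutexMaxReaders
        fmt.Println(r) // 3

        // A new RLock() sees a negative count and blocks on readerSem:
        fmt.Println(readerCount+1 < 0) // true
    }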
diff --git a/libgo/go/sync/waitgroup.go b/libgo/go/sync/waitgroup.go
index 05478c6..a4c9b7e 100644
--- a/libgo/go/sync/waitgroup.go
+++ b/libgo/go/sync/waitgroup.go
@@ -4,7 +4,10 @@
 
 package sync
 
-import "runtime"
+import (
+    "runtime"
+    "sync/atomic"
+)
 
 // A WaitGroup waits for a collection of goroutines to finish.
 // The main goroutine calls Add to set the number of
@@ -28,8 +31,8 @@ import "runtime"
 //
 type WaitGroup struct {
     m       Mutex
-    counter int
-    waiters int
+    counter int32
+    waiters int32
     sema    *uint32
 }
@@ -48,19 +51,19 @@ type WaitGroup struct {
 // Add adds delta, which may be negative, to the WaitGroup counter.
 // If the counter becomes zero, all goroutines blocked on Wait() are released.
 func (wg *WaitGroup) Add(delta int) {
-    wg.m.Lock()
-    if delta < -wg.counter {
-        wg.m.Unlock()
+    v := atomic.AddInt32(&wg.counter, int32(delta))
+    if v < 0 {
        panic("sync: negative WaitGroup count")
     }
-    wg.counter += delta
-    if wg.counter == 0 && wg.waiters > 0 {
-        for i := 0; i < wg.waiters; i++ {
-            runtime.Semrelease(wg.sema)
-        }
-        wg.waiters = 0
-        wg.sema = nil
+    if v > 0 || atomic.LoadInt32(&wg.waiters) == 0 {
+        return
     }
+    wg.m.Lock()
+    for i := int32(0); i < wg.waiters; i++ {
+        runtime.Semrelease(wg.sema)
+    }
+    wg.waiters = 0
+    wg.sema = nil
     wg.m.Unlock()
 }
@@ -71,12 +74,20 @@ func (wg *WaitGroup) Done() {
 
 // Wait blocks until the WaitGroup counter is zero.
 func (wg *WaitGroup) Wait() {
+    if atomic.LoadInt32(&wg.counter) == 0 {
+        return
+    }
     wg.m.Lock()
-    if wg.counter == 0 {
+    atomic.AddInt32(&wg.waiters, 1)
+    // This code is racing with the unlocked path in Add above.
+    // The code above modifies counter and then reads waiters.
+    // We must modify waiters and then read counter (the opposite order)
+    // to avoid missing an Add.
+    if atomic.LoadInt32(&wg.counter) == 0 {
+        atomic.AddInt32(&wg.waiters, -1)
         wg.m.Unlock()
         return
     }
-    wg.waiters++
     if wg.sema == nil {
         wg.sema = new(uint32)
     }
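WaitGroup's counter and waiter count are now manipulated with atomics, the mutex only serializing the semaphore handoff; the comment added to Wait spells out the ordering that keeps the two lock-free paths from missing each other. A minimal usage sketch, not from the patch:

    package main

    import (
        "fmt"
        "sync"
    )

    func main() {
        var wg sync.WaitGroup
        results := make([]int, 4)
        for i := 0; i < 4; i++ {
            wg.Add(1) // always before the goroutine starts, never inside it
            go func(i int) {
                defer wg.Done()
                results[i] = i * i
            }(i)
        }
        wg.Wait() // with this patch: a single atomic load once the counter is zero
        fmt.Println(results)
    }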
"sync" + "sync/atomic" "testing" ) @@ -58,3 +60,106 @@ func TestWaitGroupMisuse(t *testing.T) { wg.Done() t.Fatal("Should panic") } + +func BenchmarkWaitGroupUncontended(b *testing.B) { + type PaddedWaitGroup struct { + WaitGroup + pad [128]uint8 + } + const CallsPerSched = 1000 + procs := runtime.GOMAXPROCS(-1) + N := int32(b.N / CallsPerSched) + c := make(chan bool, procs) + for p := 0; p < procs; p++ { + go func() { + var wg PaddedWaitGroup + for atomic.AddInt32(&N, -1) >= 0 { + runtime.Gosched() + for g := 0; g < CallsPerSched; g++ { + wg.Add(1) + wg.Done() + wg.Wait() + } + } + c <- true + }() + } + for p := 0; p < procs; p++ { + <-c + } +} + +func benchmarkWaitGroupAddDone(b *testing.B, localWork int) { + const CallsPerSched = 1000 + procs := runtime.GOMAXPROCS(-1) + N := int32(b.N / CallsPerSched) + c := make(chan bool, procs) + var wg WaitGroup + for p := 0; p < procs; p++ { + go func() { + foo := 0 + for atomic.AddInt32(&N, -1) >= 0 { + runtime.Gosched() + for g := 0; g < CallsPerSched; g++ { + wg.Add(1) + for i := 0; i < localWork; i++ { + foo *= 2 + foo /= 2 + } + wg.Done() + } + } + c <- foo == 42 + }() + } + for p := 0; p < procs; p++ { + <-c + } +} + +func BenchmarkWaitGroupAddDone(b *testing.B) { + benchmarkWaitGroupAddDone(b, 0) +} + +func BenchmarkWaitGroupAddDoneWork(b *testing.B) { + benchmarkWaitGroupAddDone(b, 100) +} + +func benchmarkWaitGroupWait(b *testing.B, localWork int) { + const CallsPerSched = 1000 + procs := runtime.GOMAXPROCS(-1) + N := int32(b.N / CallsPerSched) + c := make(chan bool, procs) + var wg WaitGroup + wg.Add(procs) + for p := 0; p < procs; p++ { + go wg.Done() + } + for p := 0; p < procs; p++ { + go func() { + foo := 0 + for atomic.AddInt32(&N, -1) >= 0 { + runtime.Gosched() + for g := 0; g < CallsPerSched; g++ { + wg.Wait() + for i := 0; i < localWork; i++ { + foo *= 2 + foo /= 2 + } + } + } + c <- foo == 42 + }() + } + for p := 0; p < procs; p++ { + <-c + } +} + +func BenchmarkWaitGroupWait(b *testing.B) { + benchmarkWaitGroupWait(b, 0) +} + +func BenchmarkWaitGroupWaitWork(b *testing.B) { + benchmarkWaitGroupWait(b, 100) +} |