author | Ian Lance Taylor <ian@gcc.gnu.org> | 2015-10-31 00:59:47 +0000
---|---|---
committer | Ian Lance Taylor <ian@gcc.gnu.org> | 2015-10-31 00:59:47 +0000
commit | af146490bb04205107cb23e301ec7a8ff927b5fc (patch) |
tree | 13beeaed3698c61903fe93fb1ce70bd9b18d4e7f /libgo/go/sync |
parent | 725e1be3406315d9bcc8195d7eef0a7082b3c7cc (diff) |
runtime: Remove now unnecessary pad field from ParFor.
It is not needed due to the removal of the ctx field.
Reviewed-on: https://go-review.googlesource.com/16525
From-SVN: r229616
Diffstat (limited to 'libgo/go/sync')
-rw-r--r-- | libgo/go/sync/atomic/atomic_test.go | 71
-rw-r--r-- | libgo/go/sync/export_test.go | 2
-rw-r--r-- | libgo/go/sync/mutex.go | 17
-rw-r--r-- | libgo/go/sync/mutex_test.go | 55
-rw-r--r-- | libgo/go/sync/runtime.go | 7
-rw-r--r-- | libgo/go/sync/waitgroup.go | 134
-rw-r--r-- | libgo/go/sync/waitgroup_test.go | 123
7 files changed, 275 insertions, 134 deletions
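Before the diff itself, a note on the mutex change: the mutex.go hunk below teaches `Mutex.Lock` to spin briefly on contention (via the new `runtime_canSpin`/`runtime_doSpin` hooks declared in runtime.go) before falling back to sleeping on the semaphore. The sketch that follows is only an illustration of that spin-then-block pattern, not the patched code; `spinMutex`, `spinLimit`, and `doSpin` are invented names, and the real implementation asks the runtime scheduler whether spinning is worthwhile and parks on a semaphore rather than calling `runtime.Gosched`.

```go
// spinmutex_sketch.go — an illustration of the spin-then-block idea only;
// names and constants here are assumptions, not taken from the patch.
package main

import (
	"fmt"
	"runtime"
	"sync"
	"sync/atomic"
)

// spinLimit bounds active spinning; the real bound is decided by the
// runtime behind runtime_canSpin.
const spinLimit = 4

type spinMutex struct {
	state int32 // 0 = unlocked, 1 = locked
}

// doSpin stands in for runtime_doSpin (a short PAUSE/procyield-style loop).
func doSpin() {
	for i := 0; i < 64; i++ {
		_ = i
	}
}

// Lock first tries to grab the mutex, spins a few times while it stays
// contended, and only then yields the processor.
func (m *spinMutex) Lock() {
	for iter := 0; ; {
		if atomic.CompareAndSwapInt32(&m.state, 0, 1) {
			return
		}
		if iter < spinLimit {
			doSpin() // stay on the CPU, hoping the holder unlocks soon
			iter++
			continue
		}
		// Simplified fallback; the real Mutex parks on a semaphore
		// (runtime_Semacquire) instead of just yielding.
		runtime.Gosched()
	}
}

func (m *spinMutex) Unlock() {
	atomic.StoreInt32(&m.state, 0)
}

func main() {
	var mu spinMutex
	var wg sync.WaitGroup
	counter := 0
	for g := 0; g < 4; g++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := 0; i < 1000; i++ {
				mu.Lock()
				counter++
				mu.Unlock()
			}
		}()
	}
	wg.Wait()
	fmt.Println("counter =", counter) // always 4000: the lock serializes the increments
}
```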
diff --git a/libgo/go/sync/atomic/atomic_test.go b/libgo/go/sync/atomic/atomic_test.go
index eaa3b6b..6dae0fd 100644
--- a/libgo/go/sync/atomic/atomic_test.go
+++ b/libgo/go/sync/atomic/atomic_test.go
@@ -164,7 +164,7 @@ func TestSwapPointer(t *testing.T) {
 	x.before = magicptr
 	x.after = magicptr
 	var j uintptr
-	for delta := uintptr(1); delta+delta > delta; delta += delta {
+	for delta := uintptr(1 << 16); delta+delta > delta; delta += delta {
 		k := SwapPointer(&x.i, unsafe.Pointer(delta))
 		if uintptr(x.i) != delta || uintptr(k) != j {
 			t.Fatalf("delta=%d i=%d j=%d k=%d", delta, x.i, j, k)
@@ -456,7 +456,7 @@ func TestCompareAndSwapPointer(t *testing.T) {
 	magicptr := uintptr(m)
 	x.before = magicptr
 	x.after = magicptr
-	for val := uintptr(1); val+val > val; val += val {
+	for val := uintptr(1 << 16); val+val > val; val += val {
 		x.i = unsafe.Pointer(val)
 		if !CompareAndSwapPointer(&x.i, unsafe.Pointer(val), unsafe.Pointer(val+1)) {
 			t.Fatalf("should have swapped %#x %#x", val, val+1)
@@ -595,7 +595,7 @@ func TestLoadPointer(t *testing.T) {
 	magicptr := uintptr(m)
 	x.before = magicptr
 	x.after = magicptr
-	for delta := uintptr(1); delta+delta > delta; delta += delta {
+	for delta := uintptr(1 << 16); delta+delta > delta; delta += delta {
 		k := LoadPointer(&x.i)
 		if k != x.i {
 			t.Fatalf("delta=%d i=%d k=%d", delta, x.i, k)
@@ -731,7 +731,7 @@ func TestStorePointer(t *testing.T) {
 	x.before = magicptr
 	x.after = magicptr
 	v := unsafe.Pointer(uintptr(0))
-	for delta := uintptr(1); delta+delta > delta; delta += delta {
+	for delta := uintptr(1 << 16); delta+delta > delta; delta += delta {
 		StorePointer(&x.i, unsafe.Pointer(v))
 		if x.i != v {
 			t.Fatalf("delta=%d i=%d v=%d", delta, x.i, v)
@@ -759,14 +759,12 @@ var hammer32 = map[string]func(*uint32, int){
 	"SwapInt32":             hammerSwapInt32,
 	"SwapUint32":            hammerSwapUint32,
 	"SwapUintptr":           hammerSwapUintptr32,
-	"SwapPointer":           hammerSwapPointer32,
 	"AddInt32":              hammerAddInt32,
 	"AddUint32":             hammerAddUint32,
 	"AddUintptr":            hammerAddUintptr32,
 	"CompareAndSwapInt32":   hammerCompareAndSwapInt32,
 	"CompareAndSwapUint32":  hammerCompareAndSwapUint32,
 	"CompareAndSwapUintptr": hammerCompareAndSwapUintptr32,
-	"CompareAndSwapPointer": hammerCompareAndSwapPointer32,
 }
 
 func init() {
@@ -818,20 +816,6 @@ func hammerSwapUintptr32(uaddr *uint32, count int) {
 	}
 }
 
-func hammerSwapPointer32(uaddr *uint32, count int) {
-	// only safe when uintptr is 32-bit.
-	// not called on 64-bit systems.
-	addr := (*unsafe.Pointer)(unsafe.Pointer(uaddr))
-	seed := int(uintptr(unsafe.Pointer(&count)))
-	for i := 0; i < count; i++ {
-		new := uintptr(seed+i)<<16 | uintptr(seed+i)<<16>>16
-		old := uintptr(SwapPointer(addr, unsafe.Pointer(new)))
-		if old>>16 != old<<16>>16 {
-			panic(fmt.Sprintf("SwapPointer is not atomic: %#08x", old))
-		}
-	}
-}
-
 func hammerAddInt32(uaddr *uint32, count int) {
 	addr := (*int32)(unsafe.Pointer(uaddr))
 	for i := 0; i < count; i++ {
@@ -891,20 +875,6 @@ func hammerCompareAndSwapUintptr32(uaddr *uint32, count int) {
 	}
 }
 
-func hammerCompareAndSwapPointer32(uaddr *uint32, count int) {
-	// only safe when uintptr is 32-bit.
-	// not called on 64-bit systems.
-	addr := (*unsafe.Pointer)(unsafe.Pointer(uaddr))
-	for i := 0; i < count; i++ {
-		for {
-			v := LoadPointer(addr)
-			if CompareAndSwapPointer(addr, v, unsafe.Pointer(uintptr(v)+1)) {
-				break
-			}
-		}
-	}
-}
-
 func TestHammer32(t *testing.T) {
 	const p = 4
 	n := 100000
@@ -940,14 +910,12 @@ var hammer64 = map[string]func(*uint64, int){
 	"SwapInt64":             hammerSwapInt64,
 	"SwapUint64":            hammerSwapUint64,
 	"SwapUintptr":           hammerSwapUintptr64,
-	"SwapPointer":           hammerSwapPointer64,
 	"AddInt64":              hammerAddInt64,
 	"AddUint64":             hammerAddUint64,
 	"AddUintptr":            hammerAddUintptr64,
 	"CompareAndSwapInt64":   hammerCompareAndSwapInt64,
 	"CompareAndSwapUint64":  hammerCompareAndSwapUint64,
 	"CompareAndSwapUintptr": hammerCompareAndSwapUintptr64,
-	"CompareAndSwapPointer": hammerCompareAndSwapPointer64,
 }
 
 func init() {
@@ -999,20 +967,6 @@ func hammerSwapUintptr64(uaddr *uint64, count int) {
 	}
 }
 
-func hammerSwapPointer64(uaddr *uint64, count int) {
-	// only safe when uintptr is 64-bit.
-	// not called on 32-bit systems.
-	addr := (*unsafe.Pointer)(unsafe.Pointer(uaddr))
-	seed := int(uintptr(unsafe.Pointer(&count)))
-	for i := 0; i < count; i++ {
-		new := uintptr(seed+i)<<32 | uintptr(seed+i)<<32>>32
-		old := uintptr(SwapPointer(addr, unsafe.Pointer(new)))
-		if old>>32 != old<<32>>32 {
-			panic(fmt.Sprintf("SwapPointer is not atomic: %v", old))
-		}
-	}
-}
-
 func hammerAddInt64(uaddr *uint64, count int) {
 	addr := (*int64)(unsafe.Pointer(uaddr))
 	for i := 0; i < count; i++ {
@@ -1072,20 +1026,6 @@ func hammerCompareAndSwapUintptr64(uaddr *uint64, count int) {
 	}
 }
 
-func hammerCompareAndSwapPointer64(uaddr *uint64, count int) {
-	// only safe when uintptr is 64-bit.
-	// not called on 32-bit systems.
-	addr := (*unsafe.Pointer)(unsafe.Pointer(uaddr))
-	for i := 0; i < count; i++ {
-		for {
-			v := LoadPointer(addr)
-			if CompareAndSwapPointer(addr, v, unsafe.Pointer(uintptr(v)+1)) {
-				break
-			}
-		}
-	}
-}
-
 func TestHammer64(t *testing.T) {
 	if test64err != nil {
 		t.Skipf("Skipping 64-bit tests: %v", test64err)
@@ -1465,9 +1405,6 @@ func TestUnaligned64(t *testing.T) {
 }
 
 func TestNilDeref(t *testing.T) {
-	if p := runtime.GOOS + "/" + runtime.GOARCH; p == "freebsd/arm" || p == "netbsd/arm" {
-		t.Skipf("issue 7338: skipping test on %q", p)
-	}
 	funcs := [...]func(){
 		func() { CompareAndSwapInt32(nil, 0, 0) },
 		func() { CompareAndSwapInt64(nil, 0, 0) },
diff --git a/libgo/go/sync/export_test.go b/libgo/go/sync/export_test.go
index fa5983a..6f49b3b 100644
--- a/libgo/go/sync/export_test.go
+++ b/libgo/go/sync/export_test.go
@@ -7,3 +7,5 @@ package sync
 // Export for testing.
 var Runtime_Semacquire = runtime_Semacquire
 var Runtime_Semrelease = runtime_Semrelease
+
+const RaceEnabled = raceenabled
diff --git a/libgo/go/sync/mutex.go b/libgo/go/sync/mutex.go
index 73b3377..3f280ad 100644
--- a/libgo/go/sync/mutex.go
+++ b/libgo/go/sync/mutex.go
@@ -48,15 +48,31 @@ func (m *Mutex) Lock() {
 	}
 
 	awoke := false
+	iter := 0
 	for {
 		old := m.state
 		new := old | mutexLocked
 		if old&mutexLocked != 0 {
+			if runtime_canSpin(iter) {
+				// Active spinning makes sense.
+				// Try to set mutexWoken flag to inform Unlock
+				// to not wake other blocked goroutines.
+				if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
+					atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
+					awoke = true
+				}
+				runtime_doSpin()
+				iter++
+				continue
+			}
 			new = old + 1<<mutexWaiterShift
 		}
 		if awoke {
 			// The goroutine has been woken from sleep,
 			// so we need to reset the flag in either case.
+			if new&mutexWoken == 0 {
+				panic("sync: inconsistent mutex state")
+			}
 			new &^= mutexWoken
 		}
 		if atomic.CompareAndSwapInt32(&m.state, old, new) {
@@ -65,6 +81,7 @@
 			}
 			runtime_Semacquire(&m.sema)
 			awoke = true
+			iter = 0
 		}
 	}
 
diff --git a/libgo/go/sync/mutex_test.go b/libgo/go/sync/mutex_test.go
index 151b25c..91a4855 100644
--- a/libgo/go/sync/mutex_test.go
+++ b/libgo/go/sync/mutex_test.go
@@ -134,3 +134,58 @@ func BenchmarkMutexWork(b *testing.B) {
 func BenchmarkMutexWorkSlack(b *testing.B) {
 	benchmarkMutex(b, true, true)
 }
+
+func BenchmarkMutexNoSpin(b *testing.B) {
+	// This benchmark models a situation where spinning in the mutex should be
+	// non-profitable and allows to confirm that spinning does not do harm.
+	// To achieve this we create excess of goroutines most of which do local work.
+	// These goroutines yield during local work, so that switching from
+	// a blocked goroutine to other goroutines is profitable.
+	// As a matter of fact, this benchmark still triggers some spinning in the mutex.
+	var m Mutex
+	var acc0, acc1 uint64
+	b.SetParallelism(4)
+	b.RunParallel(func(pb *testing.PB) {
+		c := make(chan bool)
+		var data [4 << 10]uint64
+		for i := 0; pb.Next(); i++ {
+			if i%4 == 0 {
+				m.Lock()
+				acc0 -= 100
+				acc1 += 100
+				m.Unlock()
+			} else {
+				for i := 0; i < len(data); i += 4 {
+					data[i]++
+				}
+				// Elaborate way to say runtime.Gosched
+				// that does not put the goroutine onto global runq.
+				go func() {
+					c <- true
+				}()
+				<-c
+			}
+		}
+	})
+}
+
+func BenchmarkMutexSpin(b *testing.B) {
+	// This benchmark models a situation where spinning in the mutex should be
+	// profitable. To achieve this we create a goroutine per-proc.
+	// These goroutines access considerable amount of local data so that
+	// unnecessary rescheduling is penalized by cache misses.
+	var m Mutex
+	var acc0, acc1 uint64
+	b.RunParallel(func(pb *testing.PB) {
+		var data [16 << 10]uint64
+		for i := 0; pb.Next(); i++ {
+			m.Lock()
+			acc0 -= 100
+			acc1 += 100
+			m.Unlock()
+			for i := 0; i < len(data); i += 4 {
+				data[i]++
+			}
+		}
+	})
+}
diff --git a/libgo/go/sync/runtime.go b/libgo/go/sync/runtime.go
index 3b86630..c66d2de 100644
--- a/libgo/go/sync/runtime.go
+++ b/libgo/go/sync/runtime.go
@@ -38,3 +38,10 @@ func init() {
 	var s syncSema
 	runtime_Syncsemcheck(unsafe.Sizeof(s))
 }
+
+// Active spinning runtime support.
+// runtime_canSpin returns true is spinning makes sense at the moment.
+func runtime_canSpin(i int) bool
+
+// runtime_doSpin does active spinning.
+func runtime_doSpin()
diff --git a/libgo/go/sync/waitgroup.go b/libgo/go/sync/waitgroup.go
index 92cc57d..de399e6 100644
--- a/libgo/go/sync/waitgroup.go
+++ b/libgo/go/sync/waitgroup.go
@@ -15,23 +15,21 @@ import (
 // runs and calls Done when finished. At the same time,
 // Wait can be used to block until all goroutines have finished.
 type WaitGroup struct {
-	m       Mutex
-	counter int32
-	waiters int32
-	sema    *uint32
+	// 64-bit value: high 32 bits are counter, low 32 bits are waiter count.
+	// 64-bit atomic operations require 64-bit alignment, but 32-bit
+	// compilers do not ensure it. So we allocate 12 bytes and then use
+	// the aligned 8 bytes in them as state.
+	state1 [12]byte
+	sema   uint32
 }
 
-// WaitGroup creates a new semaphore each time the old semaphore
-// is released. This is to avoid the following race:
-//
-// G1: Add(1)
-// G1: go G2()
-// G1: Wait() // Context switch after Unlock() and before Semacquire().
-// G2: Done() // Release semaphore: sema == 1, waiters == 0. G1 doesn't run yet.
-// G3: Wait() // Finds counter == 0, waiters == 0, doesn't block.
-// G3: Add(1) // Makes counter == 1, waiters == 0.
-// G3: go G4()
-// G3: Wait() // G1 still hasn't run, G3 finds sema == 1, unblocked! Bug.
+func (wg *WaitGroup) state() *uint64 {
+	if uintptr(unsafe.Pointer(&wg.state1))%8 == 0 {
+		return (*uint64)(unsafe.Pointer(&wg.state1))
+	} else {
+		return (*uint64)(unsafe.Pointer(&wg.state1[4]))
+	}
+}
 
 // Add adds delta, which may be negative, to the WaitGroup counter.
 // If the counter becomes zero, all goroutines blocked on Wait are released.
@@ -43,10 +41,13 @@ type WaitGroup struct {
 // at any time.
 // Typically this means the calls to Add should execute before the statement
 // creating the goroutine or other event to be waited for.
+// If a WaitGroup is reused to wait for several independent sets of events,
+// new Add calls must happen after all previous Wait calls have returned.
 // See the WaitGroup example.
 func (wg *WaitGroup) Add(delta int) {
+	statep := wg.state()
 	if raceenabled {
-		_ = wg.m.state // trigger nil deref early
+		_ = *statep // trigger nil deref early
 		if delta < 0 {
 			// Synchronize decrements with Wait.
 			raceReleaseMerge(unsafe.Pointer(wg))
@@ -54,7 +55,9 @@ func (wg *WaitGroup) Add(delta int) {
 		raceDisable()
 		defer raceEnable()
 	}
-	v := atomic.AddInt32(&wg.counter, int32(delta))
+	state := atomic.AddUint64(statep, uint64(delta)<<32)
+	v := int32(state >> 32)
+	w := uint32(state)
 	if raceenabled {
 		if delta > 0 && v == int32(delta) {
 			// The first increment must be synchronized with Wait.
@@ -66,18 +69,25 @@
 	if v < 0 {
 		panic("sync: negative WaitGroup counter")
 	}
-	if v > 0 || atomic.LoadInt32(&wg.waiters) == 0 {
+	if w != 0 && delta > 0 && v == int32(delta) {
+		panic("sync: WaitGroup misuse: Add called concurrently with Wait")
+	}
+	if v > 0 || w == 0 {
 		return
 	}
-	wg.m.Lock()
-	if atomic.LoadInt32(&wg.counter) == 0 {
-		for i := int32(0); i < wg.waiters; i++ {
-			runtime_Semrelease(wg.sema)
-		}
-		wg.waiters = 0
-		wg.sema = nil
+	// This goroutine has set counter to 0 when waiters > 0.
+	// Now there can't be concurrent mutations of state:
+	// - Adds must not happen concurrently with Wait,
+	// - Wait does not increment waiters if it sees counter == 0.
+	// Still do a cheap sanity check to detect WaitGroup misuse.
+	if *statep != state {
+		panic("sync: WaitGroup misuse: Add called concurrently with Wait")
+	}
+	// Reset waiters count to 0.
+	*statep = 0
+	for ; w != 0; w-- {
+		runtime_Semrelease(&wg.sema)
 	}
-	wg.m.Unlock()
 }
 
 // Done decrements the WaitGroup counter.
@@ -87,51 +97,41 @@ func (wg *WaitGroup) Done() {
 
 // Wait blocks until the WaitGroup counter is zero.
 func (wg *WaitGroup) Wait() {
+	statep := wg.state()
 	if raceenabled {
-		_ = wg.m.state // trigger nil deref early
+		_ = *statep // trigger nil deref early
 		raceDisable()
 	}
-	if atomic.LoadInt32(&wg.counter) == 0 {
-		if raceenabled {
-			raceEnable()
-			raceAcquire(unsafe.Pointer(wg))
-		}
-		return
-	}
-	wg.m.Lock()
-	w := atomic.AddInt32(&wg.waiters, 1)
-	// This code is racing with the unlocked path in Add above.
-	// The code above modifies counter and then reads waiters.
-	// We must modify waiters and then read counter (the opposite order)
-	// to avoid missing an Add.
-	if atomic.LoadInt32(&wg.counter) == 0 {
-		atomic.AddInt32(&wg.waiters, -1)
-		if raceenabled {
-			raceEnable()
-			raceAcquire(unsafe.Pointer(wg))
-			raceDisable()
+	for {
+		state := atomic.LoadUint64(statep)
+		v := int32(state >> 32)
+		w := uint32(state)
+		if v == 0 {
+			// Counter is 0, no need to wait.
+			if raceenabled {
+				raceEnable()
+				raceAcquire(unsafe.Pointer(wg))
+			}
+			return
 		}
-		wg.m.Unlock()
-		if raceenabled {
-			raceEnable()
+		// Increment waiters count.
+		if atomic.CompareAndSwapUint64(statep, state, state+1) {
+			if raceenabled && w == 0 {
+				// Wait must be synchronized with the first Add.
+				// Need to model this is as a write to race with the read in Add.
+				// As a consequence, can do the write only for the first waiter,
+				// otherwise concurrent Waits will race with each other.
+				raceWrite(unsafe.Pointer(&wg.sema))
+			}
+			runtime_Semacquire(&wg.sema)
+			if *statep != 0 {
+				panic("sync: WaitGroup is reused before previous Wait has returned")
+			}
+			if raceenabled {
+				raceEnable()
+				raceAcquire(unsafe.Pointer(wg))
+			}
+			return
 		}
-		return
-	}
-	if raceenabled && w == 1 {
-		// Wait must be synchronized with the first Add.
-		// Need to model this is as a write to race with the read in Add.
-		// As a consequence, can do the write only for the first waiter,
-		// otherwise concurrent Waits will race with each other.
-		raceWrite(unsafe.Pointer(&wg.sema))
-	}
-	if wg.sema == nil {
-		wg.sema = new(uint32)
-	}
-	s := wg.sema
-	wg.m.Unlock()
-	runtime_Semacquire(s)
-	if raceenabled {
-		raceEnable()
-		raceAcquire(unsafe.Pointer(wg))
 	}
 }
diff --git a/libgo/go/sync/waitgroup_test.go b/libgo/go/sync/waitgroup_test.go
index 4c0a043..3e3e3bf 100644
--- a/libgo/go/sync/waitgroup_test.go
+++ b/libgo/go/sync/waitgroup_test.go
@@ -5,6 +5,7 @@
 package sync_test
 
 import (
+	"runtime"
 	. "sync"
 	"sync/atomic"
 	"testing"
@@ -46,6 +47,12 @@ func TestWaitGroup(t *testing.T) {
 	}
 }
 
+func knownRacy(t *testing.T) {
+	if RaceEnabled {
+		t.Skip("skipping known-racy test under the race detector")
+	}
+}
+
 func TestWaitGroupMisuse(t *testing.T) {
 	defer func() {
 		err := recover()
@@ -60,6 +67,95 @@ func TestWaitGroupMisuse(t *testing.T) {
 	t.Fatal("Should panic")
 }
 
+func TestWaitGroupMisuse2(t *testing.T) {
+	knownRacy(t)
+	if testing.Short() {
+		t.Skip("skipping flaky test in short mode; see issue 11443")
+	}
+	if runtime.NumCPU() <= 2 {
+		t.Skip("NumCPU<=2, skipping: this test requires parallelism")
+	}
+	defer func() {
+		err := recover()
+		if err != "sync: negative WaitGroup counter" &&
+			err != "sync: WaitGroup misuse: Add called concurrently with Wait" &&
+			err != "sync: WaitGroup is reused before previous Wait has returned" {
+			t.Fatalf("Unexpected panic: %#v", err)
+		}
+	}()
+	defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(4))
+	done := make(chan interface{}, 2)
+	// The detection is opportunistically, so we want it to panic
+	// at least in one run out of a million.
+	for i := 0; i < 1e6; i++ {
+		var wg WaitGroup
+		wg.Add(1)
+		go func() {
+			defer func() {
+				done <- recover()
+			}()
+			wg.Wait()
+		}()
+		go func() {
+			defer func() {
+				done <- recover()
+			}()
+			wg.Add(1) // This is the bad guy.
+			wg.Done()
+		}()
+		wg.Done()
+		for j := 0; j < 2; j++ {
+			if err := <-done; err != nil {
+				panic(err)
+			}
+		}
+	}
+	t.Fatal("Should panic")
+}
+
+func TestWaitGroupMisuse3(t *testing.T) {
+	knownRacy(t)
+	if runtime.NumCPU() <= 1 {
+		t.Skip("NumCPU==1, skipping: this test requires parallelism")
+	}
+	defer func() {
+		err := recover()
+		if err != "sync: negative WaitGroup counter" &&
+			err != "sync: WaitGroup misuse: Add called concurrently with Wait" &&
+			err != "sync: WaitGroup is reused before previous Wait has returned" {
+			t.Fatalf("Unexpected panic: %#v", err)
+		}
+	}()
+	defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(4))
+	done := make(chan interface{}, 1)
+	// The detection is opportunistically, so we want it to panic
+	// at least in one run out of a million.
+	for i := 0; i < 1e6; i++ {
+		var wg WaitGroup
+		wg.Add(1)
+		go func() {
+			wg.Done()
+		}()
+		go func() {
+			defer func() {
+				done <- recover()
+			}()
+			wg.Wait()
+			// Start reusing the wg before waiting for the Wait below to return.
+			wg.Add(1)
+			go func() {
+				wg.Done()
+			}()
+			wg.Wait()
+		}()
+		wg.Wait()
+		if err := <-done; err != nil {
+			panic(err)
+		}
+	}
+	t.Fatal("Should panic")
+}
+
 func TestWaitGroupRace(t *testing.T) {
 	// Run this test for about 1ms.
 	for i := 0; i < 1000; i++ {
@@ -85,6 +181,19 @@
 	}
 }
 
+func TestWaitGroupAlign(t *testing.T) {
+	type X struct {
+		x  byte
+		wg WaitGroup
+	}
+	var x X
+	x.wg.Add(1)
+	go func(x *X) {
+		x.wg.Done()
+	}(&x)
+	x.wg.Wait()
+}
+
 func BenchmarkWaitGroupUncontended(b *testing.B) {
 	type PaddedWaitGroup struct {
 		WaitGroup
@@ -146,3 +255,17 @@ func BenchmarkWaitGroupWait(b *testing.B) {
 func BenchmarkWaitGroupWaitWork(b *testing.B) {
 	benchmarkWaitGroupWait(b, 100)
 }
+
+func BenchmarkWaitGroupActuallyWait(b *testing.B) {
+	b.ReportAllocs()
+	b.RunParallel(func(pb *testing.PB) {
+		for pb.Next() {
+			var wg WaitGroup
+			wg.Add(1)
+			go func() {
+				wg.Done()
+			}()
+			wg.Wait()
+		}
+	})
+}
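For reference, the WaitGroup rewrite above replaces the old Mutex/counter/waiters/sema fields with a single 64-bit word: the counter lives in the high 32 bits and the waiter count in the low 32 bits, and the word is carved out of a 12-byte array so that 8-byte-aligned storage exists even on 32-bit targets. The standalone sketch below is an illustration only, not part of the commit; `wgState` and `statePtr` are invented names that mirror what the new `state()` helper does.

```go
// wgstate_sketch.go — illustrates the 64-bit state packing used by the new
// WaitGroup; wgState and statePtr are invented names for this sketch.
package main

import (
	"fmt"
	"sync/atomic"
	"unsafe"
)

type wgState struct {
	// Like WaitGroup.state1: 12 bytes so that 8 of them are always
	// 8-byte aligned, even when the struct itself is only 4-byte aligned.
	state1 [12]byte
}

// statePtr mirrors the alignment selection done by the new state() helper.
func (w *wgState) statePtr() *uint64 {
	if uintptr(unsafe.Pointer(&w.state1))%8 == 0 {
		return (*uint64)(unsafe.Pointer(&w.state1))
	}
	return (*uint64)(unsafe.Pointer(&w.state1[4]))
}

func main() {
	var w wgState
	statep := w.statePtr()

	// Add(3): one atomic add bumps the counter in the high 32 bits.
	state := atomic.AddUint64(statep, uint64(3)<<32)
	fmt.Printf("counter=%d waiters=%d\n", int32(state>>32), uint32(state))

	// Wait(): a waiter registers itself by incrementing the low 32 bits,
	// so counter and waiter count are always read and updated together.
	state = atomic.AddUint64(statep, 1)
	fmt.Printf("counter=%d waiters=%d\n", int32(state>>32), uint32(state))
}
```

Packing both halves into one word is what lets Add and Wait observe a single atomic snapshot of counter and waiters, which is the basis for the new "WaitGroup misuse" and "reused before previous Wait has returned" panics in the diff.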