author     Ian Lance Taylor <iant@golang.org>   2019-09-06 18:12:46 +0000
committer  Ian Lance Taylor <ian@gcc.gnu.org>   2019-09-06 18:12:46 +0000
commit     aa8901e9bb0399d2c16f988ba2fe46eb0c0c5d13 (patch)
tree       7e63b06d1eec92beec6997c9d3ab47a5d6a835be /libgo/go/sync
parent     920ea3b8ba3164b61ac9490dfdfceb6936eda6dd (diff)
libgo: update to Go 1.13beta1 release
Reviewed-on: https://go-review.googlesource.com/c/gofrontend/+/193497
From-SVN: r275473
Diffstat (limited to 'libgo/go/sync')
-rw-r--r--  libgo/go/sync/export_test.go          42
-rw-r--r--  libgo/go/sync/mutex.go                18
-rw-r--r--  libgo/go/sync/mutex_test.go            2
-rw-r--r--  libgo/go/sync/once.go                 16
-rw-r--r--  libgo/go/sync/pool.go                136
-rw-r--r--  libgo/go/sync/pool_test.go           180
-rw-r--r--  libgo/go/sync/poolqueue.go           309
-rw-r--r--  libgo/go/sync/runtime.go               8
-rw-r--r--  libgo/go/sync/runtime_sema_test.go     6
-rw-r--r--  libgo/go/sync/rwmutex.go              29
-rw-r--r--  libgo/go/sync/waitgroup.go             2
11 files changed, 664 insertions, 84 deletions
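
The full diff follows. As a reader aid for the pool.go and pool_test.go hunks (not part of the patch itself), here is a minimal standalone sketch of the new victim-cache behaviour that the updated TestPool below exercises: values put into a sync.Pool now survive one garbage collection and are only discarded after a second one. The package main wrapper, the GC-percent toggle, and the printed expectations are illustrative assumptions, not code from libgo.

```go
package main

import (
	"fmt"
	"runtime"
	"runtime/debug"
	"sync"
)

func main() {
	// Disable automatic GC so the explicit runtime.GC() calls below are the
	// only collections that run; restore the previous setting on exit.
	defer debug.SetGCPercent(debug.SetGCPercent(-1))

	var p sync.Pool
	// Put in enough objects that some spill into stealable shared space,
	// as the updated TestPool does.
	for i := 0; i < 100; i++ {
		p.Put("c")
	}

	runtime.GC()
	fmt.Println(p.Get()) // "c": the victim cache keeps pooled objects alive across one GC

	runtime.GC()
	fmt.Println(p.Get()) // <nil>: the second GC drops the victim cache
}
```

With the pre-1.13 sync.Pool the first Get after runtime.GC() would already return nil, which is exactly the behaviour change encoded by the TestPool hunk in pool_test.go below.
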
diff --git a/libgo/go/sync/export_test.go b/libgo/go/sync/export_test.go index 669076e..10d3599 100644 --- a/libgo/go/sync/export_test.go +++ b/libgo/go/sync/export_test.go @@ -9,3 +9,45 @@ var Runtime_Semacquire = runtime_Semacquire var Runtime_Semrelease = runtime_Semrelease var Runtime_procPin = runtime_procPin var Runtime_procUnpin = runtime_procUnpin + +// poolDequeue testing. +type PoolDequeue interface { + PushHead(val interface{}) bool + PopHead() (interface{}, bool) + PopTail() (interface{}, bool) +} + +func NewPoolDequeue(n int) PoolDequeue { + return &poolDequeue{ + vals: make([]eface, n), + } +} + +func (d *poolDequeue) PushHead(val interface{}) bool { + return d.pushHead(val) +} + +func (d *poolDequeue) PopHead() (interface{}, bool) { + return d.popHead() +} + +func (d *poolDequeue) PopTail() (interface{}, bool) { + return d.popTail() +} + +func NewPoolChain() PoolDequeue { + return new(poolChain) +} + +func (c *poolChain) PushHead(val interface{}) bool { + c.pushHead(val) + return true +} + +func (c *poolChain) PopHead() (interface{}, bool) { + return c.popHead() +} + +func (c *poolChain) PopTail() (interface{}, bool) { + return c.popTail() +} diff --git a/libgo/go/sync/mutex.go b/libgo/go/sync/mutex.go index 4c5582c..11ad20c 100644 --- a/libgo/go/sync/mutex.go +++ b/libgo/go/sync/mutex.go @@ -77,7 +77,11 @@ func (m *Mutex) Lock() { } return } + // Slow path (outlined so that the fast path can be inlined) + m.lockSlow() +} +func (m *Mutex) lockSlow() { var waitStartTime int64 starving := false awoke := false @@ -131,7 +135,7 @@ func (m *Mutex) Lock() { if waitStartTime == 0 { waitStartTime = runtime_nanotime() } - runtime_SemacquireMutex(&m.sema, queueLifo) + runtime_SemacquireMutex(&m.sema, queueLifo, 1) starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs old = m.state if old&mutexStarving != 0 { @@ -180,6 +184,14 @@ func (m *Mutex) Unlock() { // Fast path: drop lock bit. new := atomic.AddInt32(&m.state, -mutexLocked) + if new != 0 { + // Outlined slow path to allow inlining the fast path. + // To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock. + m.unlockSlow(new) + } +} + +func (m *Mutex) unlockSlow(new int32) { if (new+mutexLocked)&mutexLocked == 0 { throw("sync: unlock of unlocked mutex") } @@ -198,7 +210,7 @@ func (m *Mutex) Unlock() { // Grab the right to wake someone. new = (old - 1<<mutexWaiterShift) | mutexWoken if atomic.CompareAndSwapInt32(&m.state, old, new) { - runtime_Semrelease(&m.sema, false) + runtime_Semrelease(&m.sema, false, 1) return } old = m.state @@ -208,6 +220,6 @@ func (m *Mutex) Unlock() { // Note: mutexLocked is not set, the waiter will set it after wakeup. // But mutex is still considered locked if mutexStarving is set, // so new coming goroutines won't acquire it. - runtime_Semrelease(&m.sema, true) + runtime_Semrelease(&m.sema, true, 1) } } diff --git a/libgo/go/sync/mutex_test.go b/libgo/go/sync/mutex_test.go index 5214684..e61a853 100644 --- a/libgo/go/sync/mutex_test.go +++ b/libgo/go/sync/mutex_test.go @@ -21,7 +21,7 @@ import ( func HammerSemaphore(s *uint32, loops int, cdone chan bool) { for i := 0; i < loops; i++ { Runtime_Semacquire(s) - Runtime_Semrelease(s, false) + Runtime_Semrelease(s, false, 0) } cdone <- true } diff --git a/libgo/go/sync/once.go b/libgo/go/sync/once.go index d8ef952..8476197 100644 --- a/libgo/go/sync/once.go +++ b/libgo/go/sync/once.go @@ -10,8 +10,13 @@ import ( // Once is an object that will perform exactly one action. 
type Once struct { - m Mutex + // done indicates whether the action has been performed. + // It is first in the struct because it is used in the hot path. + // The hot path is inlined at every call site. + // Placing done first allows more compact instructions on some architectures (amd64/x86), + // and fewer instructions (to calculate offset) on other architectures. done uint32 + m Mutex } // Do calls the function f if and only if Do is being called for the @@ -33,10 +38,13 @@ type Once struct { // without calling f. // func (o *Once) Do(f func()) { - if atomic.LoadUint32(&o.done) == 1 { - return + if atomic.LoadUint32(&o.done) == 0 { + // Outlined slow-path to allow inlining of the fast-path. + o.doSlow(f) } - // Slow-path. +} + +func (o *Once) doSlow(f func()) { o.m.Lock() defer o.m.Unlock() if o.done == 0 { diff --git a/libgo/go/sync/pool.go b/libgo/go/sync/pool.go index e54f917..ca7afdb 100644 --- a/libgo/go/sync/pool.go +++ b/libgo/go/sync/pool.go @@ -47,6 +47,9 @@ type Pool struct { local unsafe.Pointer // local fixed-size per-P pool, actual type is [P]poolLocal localSize uintptr // size of the local array + victim unsafe.Pointer // local from previous cycle + victimSize uintptr // size of victims array + // New optionally specifies a function to generate // a value when Get would otherwise return nil. // It may not be changed concurrently with calls to Get. @@ -55,9 +58,8 @@ type Pool struct { // Local per-P Pool appendix. type poolLocalInternal struct { - private interface{} // Can be used only by the respective P. - shared []interface{} // Can be used by any P. - Mutex // Protects shared. + private interface{} // Can be used only by the respective P. + shared poolChain // Local P can pushHead/popHead; any P can popTail. } type poolLocal struct { @@ -97,17 +99,15 @@ func (p *Pool) Put(x interface{}) { race.ReleaseMerge(poolRaceAddr(x)) race.Disable() } - l := p.pin() + l, _ := p.pin() if l.private == nil { l.private = x x = nil } - runtime_procUnpin() if x != nil { - l.Lock() - l.shared = append(l.shared, x) - l.Unlock() + l.shared.pushHead(x) } + runtime_procUnpin() if race.Enabled { race.Enable() } @@ -125,22 +125,19 @@ func (p *Pool) Get() interface{} { if race.Enabled { race.Disable() } - l := p.pin() + l, pid := p.pin() x := l.private l.private = nil - runtime_procUnpin() if x == nil { - l.Lock() - last := len(l.shared) - 1 - if last >= 0 { - x = l.shared[last] - l.shared = l.shared[:last] - } - l.Unlock() + // Try to pop the head of the local shard. We prefer + // the head over the tail for temporal locality of + // reuse. + x, _ = l.shared.popHead() if x == nil { - x = p.getSlow() + x = p.getSlow(pid) } } + runtime_procUnpin() if race.Enabled { race.Enable() if x != nil { @@ -153,45 +150,63 @@ func (p *Pool) Get() interface{} { return x } -func (p *Pool) getSlow() (x interface{}) { +func (p *Pool) getSlow(pid int) interface{} { // See the comment in pin regarding ordering of the loads. size := atomic.LoadUintptr(&p.localSize) // load-acquire - local := p.local // load-consume + locals := p.local // load-consume // Try to steal one element from other procs. - pid := runtime_procPin() - runtime_procUnpin() for i := 0; i < int(size); i++ { - l := indexLocal(local, (pid+i+1)%int(size)) - l.Lock() - last := len(l.shared) - 1 - if last >= 0 { - x = l.shared[last] - l.shared = l.shared[:last] - l.Unlock() - break + l := indexLocal(locals, (pid+i+1)%int(size)) + if x, _ := l.shared.popTail(); x != nil { + return x } - l.Unlock() } - return x + + // Try the victim cache. 
We do this after attempting to steal + // from all primary caches because we want objects in the + // victim cache to age out if at all possible. + size = atomic.LoadUintptr(&p.victimSize) + if uintptr(pid) >= size { + return nil + } + locals = p.victim + l := indexLocal(locals, pid) + if x := l.private; x != nil { + l.private = nil + return x + } + for i := 0; i < int(size); i++ { + l := indexLocal(locals, (pid+i)%int(size)) + if x, _ := l.shared.popTail(); x != nil { + return x + } + } + + // Mark the victim cache as empty for future gets don't bother + // with it. + atomic.StoreUintptr(&p.victimSize, 0) + + return nil } -// pin pins the current goroutine to P, disables preemption and returns poolLocal pool for the P. +// pin pins the current goroutine to P, disables preemption and +// returns poolLocal pool for the P and the P's id. // Caller must call runtime_procUnpin() when done with the pool. -func (p *Pool) pin() *poolLocal { +func (p *Pool) pin() (*poolLocal, int) { pid := runtime_procPin() - // In pinSlow we store to localSize and then to local, here we load in opposite order. + // In pinSlow we store to local and then to localSize, here we load in opposite order. // Since we've disabled preemption, GC cannot happen in between. // Thus here we must observe local at least as large localSize. // We can observe a newer/larger local, it is fine (we must observe its zero-initialized-ness). s := atomic.LoadUintptr(&p.localSize) // load-acquire l := p.local // load-consume if uintptr(pid) < s { - return indexLocal(l, pid) + return indexLocal(l, pid), pid } return p.pinSlow() } -func (p *Pool) pinSlow() *poolLocal { +func (p *Pool) pinSlow() (*poolLocal, int) { // Retry under the mutex. // Can not lock the mutex while pinned. runtime_procUnpin() @@ -202,7 +217,7 @@ func (p *Pool) pinSlow() *poolLocal { s := p.localSize l := p.local if uintptr(pid) < s { - return indexLocal(l, pid) + return indexLocal(l, pid), pid } if p.local == nil { allPools = append(allPools, p) @@ -212,35 +227,46 @@ func (p *Pool) pinSlow() *poolLocal { local := make([]poolLocal, size) atomic.StorePointer(&p.local, unsafe.Pointer(&local[0])) // store-release atomic.StoreUintptr(&p.localSize, uintptr(size)) // store-release - return &local[pid] + return &local[pid], pid } func poolCleanup() { // This function is called with the world stopped, at the beginning of a garbage collection. // It must not allocate and probably should not call any runtime functions. - // Defensively zero out everything, 2 reasons: - // 1. To prevent false retention of whole Pools. - // 2. If GC happens while a goroutine works with l.shared in Put/Get, - // it will retain whole Pool. So next cycle memory consumption would be doubled. - for i, p := range allPools { - allPools[i] = nil - for i := 0; i < int(p.localSize); i++ { - l := indexLocal(p.local, i) - l.private = nil - for j := range l.shared { - l.shared[j] = nil - } - l.shared = nil - } + + // Because the world is stopped, no pool user can be in a + // pinned section (in effect, this has all Ps pinned). + + // Drop victim caches from all pools. + for _, p := range oldPools { + p.victim = nil + p.victimSize = 0 + } + + // Move primary cache to victim cache. + for _, p := range allPools { + p.victim = p.local + p.victimSize = p.localSize p.local = nil p.localSize = 0 } - allPools = []*Pool{} + + // The pools with non-empty primary caches now have non-empty + // victim caches and no pools have primary caches. 
+ oldPools, allPools = allPools, nil } var ( allPoolsMu Mutex - allPools []*Pool + + // allPools is the set of pools that have non-empty primary + // caches. Protected by either 1) allPoolsMu and pinning or 2) + // STW. + allPools []*Pool + + // oldPools is the set of pools that may have non-empty victim + // caches. Protected by STW. + oldPools []*Pool ) func init() { diff --git a/libgo/go/sync/pool_test.go b/libgo/go/sync/pool_test.go index dad2f99..7e175a9 100644 --- a/libgo/go/sync/pool_test.go +++ b/libgo/go/sync/pool_test.go @@ -10,6 +10,7 @@ package sync_test import ( "runtime" "runtime/debug" + "sort" . "sync" "sync/atomic" "testing" @@ -40,11 +41,20 @@ func TestPool(t *testing.T) { } Runtime_procUnpin() - p.Put("c") - debug.SetGCPercent(100) // to allow following GC to actually run + // Put in a large number of objects so they spill into + // stealable space. + for i := 0; i < 100; i++ { + p.Put("c") + } + // After one GC, the victim cache should keep them alive. + runtime.GC() + if g := p.Get(); g != "c" { + t.Fatalf("got %#v; want c after GC", g) + } + // A second GC should drop the victim cache. runtime.GC() if g := p.Get(); g != nil { - t.Fatalf("got %#v; want nil after GC", g) + t.Fatalf("got %#v; want nil after second GC", g) } } @@ -96,6 +106,9 @@ func testPool(t *testing.T, drain bool) { const N = 100 loop: for try := 0; try < 3; try++ { + if try == 1 && testing.Short() { + break + } var fin, fin1 uint32 for i := 0; i < N; i++ { v := new(string) @@ -151,6 +164,86 @@ func TestPoolStress(t *testing.T) { } } +func TestPoolDequeue(t *testing.T) { + testPoolDequeue(t, NewPoolDequeue(16)) +} + +func TestPoolChain(t *testing.T) { + testPoolDequeue(t, NewPoolChain()) +} + +func testPoolDequeue(t *testing.T, d PoolDequeue) { + const P = 10 + // In long mode, do enough pushes to wrap around the 21-bit + // indexes. + N := 1<<21 + 1000 + if testing.Short() { + N = 1e3 + } + have := make([]int32, N) + var stop int32 + var wg WaitGroup + + // Start P-1 consumers. + for i := 1; i < P; i++ { + wg.Add(1) + go func() { + fail := 0 + for atomic.LoadInt32(&stop) == 0 { + val, ok := d.PopTail() + if ok { + fail = 0 + atomic.AddInt32(&have[val.(int)], 1) + if val.(int) == N-1 { + atomic.StoreInt32(&stop, 1) + } + } else { + // Speed up the test by + // allowing the pusher to run. + if fail++; fail%100 == 0 { + runtime.Gosched() + } + } + } + wg.Done() + }() + } + + // Start 1 producer. + nPopHead := 0 + wg.Add(1) + go func() { + for j := 0; j < N; j++ { + for !d.PushHead(j) { + // Allow a popper to run. + runtime.Gosched() + } + if j%10 == 0 { + val, ok := d.PopHead() + if ok { + nPopHead++ + atomic.AddInt32(&have[val.(int)], 1) + } + } + } + wg.Done() + }() + wg.Wait() + + // Check results. + for i, count := range have { + if count != 1 { + t.Errorf("expected have[%d] = 1, got %d", i, count) + } + } + if nPopHead == 0 { + // In theory it's possible in a valid schedule for + // popHead to never succeed, but in practice it almost + // always succeeds, so this is unlikely to flake. + t.Errorf("popHead never succeeded") + } +} + func BenchmarkPool(b *testing.B) { var p Pool b.RunParallel(func(pb *testing.PB) { @@ -174,3 +267,84 @@ func BenchmarkPoolOverflow(b *testing.B) { } }) } + +var globalSink interface{} + +func BenchmarkPoolSTW(b *testing.B) { + // Take control of GC. + defer debug.SetGCPercent(debug.SetGCPercent(-1)) + + var mstats runtime.MemStats + var pauses []uint64 + + var p Pool + for i := 0; i < b.N; i++ { + // Put a large number of items into a pool. 
+ const N = 100000 + var item interface{} = 42 + for i := 0; i < N; i++ { + p.Put(item) + } + // Do a GC. + runtime.GC() + // Record pause time. + runtime.ReadMemStats(&mstats) + pauses = append(pauses, mstats.PauseNs[(mstats.NumGC+255)%256]) + } + + // Get pause time stats. + sort.Slice(pauses, func(i, j int) bool { return pauses[i] < pauses[j] }) + var total uint64 + for _, ns := range pauses { + total += ns + } + // ns/op for this benchmark is average STW time. + b.ReportMetric(float64(total)/float64(b.N), "ns/op") + b.ReportMetric(float64(pauses[len(pauses)*95/100]), "p95-ns/STW") + b.ReportMetric(float64(pauses[len(pauses)*50/100]), "p50-ns/STW") +} + +func BenchmarkPoolExpensiveNew(b *testing.B) { + // Populate a pool with items that are expensive to construct + // to stress pool cleanup and subsequent reconstruction. + + // Create a ballast so the GC has a non-zero heap size and + // runs at reasonable times. + globalSink = make([]byte, 8<<20) + defer func() { globalSink = nil }() + + // Create a pool that's "expensive" to fill. + var p Pool + var nNew uint64 + p.New = func() interface{} { + atomic.AddUint64(&nNew, 1) + time.Sleep(time.Millisecond) + return 42 + } + var mstats1, mstats2 runtime.MemStats + runtime.ReadMemStats(&mstats1) + b.RunParallel(func(pb *testing.PB) { + // Simulate 100X the number of goroutines having items + // checked out from the Pool simultaneously. + items := make([]interface{}, 100) + var sink []byte + for pb.Next() { + // Stress the pool. + for i := range items { + items[i] = p.Get() + // Simulate doing some work with this + // item checked out. + sink = make([]byte, 32<<10) + } + for i, v := range items { + p.Put(v) + items[i] = nil + } + } + _ = sink + }) + runtime.ReadMemStats(&mstats2) + + b.ReportMetric(float64(mstats2.NumGC-mstats1.NumGC)/float64(b.N), "GCs/op") + b.ReportMetric(float64(nNew)/float64(b.N), "New/op") +} diff --git a/libgo/go/sync/poolqueue.go b/libgo/go/sync/poolqueue.go new file mode 100644 index 0000000..22f7496 --- /dev/null +++ b/libgo/go/sync/poolqueue.go @@ -0,0 +1,309 @@ +// Copyright 2019 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package sync + +import ( + "sync/atomic" + "unsafe" +) + +// poolDequeue is a lock-free fixed-size single-producer, +// multi-consumer queue. The single producer can both push and pop +// from the head, and consumers can pop from the tail. +// +// It has the added feature that it nils out unused slots to avoid +// unnecessary retention of objects. This is important for sync.Pool, +// but not typically a property considered in the literature. +type poolDequeue struct { + // headTail packs together a 32-bit head index and a 32-bit + // tail index. Both are indexes into vals modulo len(vals)-1. + // + // tail = index of oldest data in queue + // head = index of next slot to fill + // + // Slots in the range [tail, head) are owned by consumers. + // A consumer continues to own a slot outside this range until + // it nils the slot, at which point ownership passes to the + // producer. + // + // The head index is stored in the most-significant bits so + // that we can atomically add to it and the overflow is + // harmless. + headTail uint64 + + // vals is a ring buffer of interface{} values stored in this + // dequeue. The size of this must be a power of 2. + // + // vals[i].typ is nil if the slot is empty and non-nil + // otherwise. 
A slot is still in use until *both* the tail + // index has moved beyond it and typ has been set to nil. This + // is set to nil atomically by the consumer and read + // atomically by the producer. + vals []eface +} + +type eface struct { + typ, val unsafe.Pointer +} + +const dequeueBits = 32 + +// dequeueLimit is the maximum size of a poolDequeue. +// +// This must be at most (1<<dequeueBits)/2 because detecting fullness +// depends on wrapping around the ring buffer without wrapping around +// the index. We divide by 4 so this fits in an int on 32-bit. +const dequeueLimit = (1 << dequeueBits) / 4 + +// dequeueNil is used in poolDeqeue to represent interface{}(nil). +// Since we use nil to represent empty slots, we need a sentinel value +// to represent nil. +type dequeueNil *struct{} + +func (d *poolDequeue) unpack(ptrs uint64) (head, tail uint32) { + const mask = 1<<dequeueBits - 1 + head = uint32((ptrs >> dequeueBits) & mask) + tail = uint32(ptrs & mask) + return +} + +func (d *poolDequeue) pack(head, tail uint32) uint64 { + const mask = 1<<dequeueBits - 1 + return (uint64(head) << dequeueBits) | + uint64(tail&mask) +} + +// pushHead adds val at the head of the queue. It returns false if the +// queue is full. It must only be called by a single producer. +func (d *poolDequeue) pushHead(val interface{}) bool { + ptrs := atomic.LoadUint64(&d.headTail) + head, tail := d.unpack(ptrs) + if (tail+uint32(len(d.vals)))&(1<<dequeueBits-1) == head { + // Queue is full. + return false + } + slot := &d.vals[head&uint32(len(d.vals)-1)] + + // Check if the head slot has been released by popTail. + typ := atomic.LoadPointer(&slot.typ) + if typ != nil { + // Another goroutine is still cleaning up the tail, so + // the queue is actually still full. + return false + } + + // The head slot is free, so we own it. + if val == nil { + val = dequeueNil(nil) + } + *(*interface{})(unsafe.Pointer(slot)) = val + + // Increment head. This passes ownership of slot to popTail + // and acts as a store barrier for writing the slot. + atomic.AddUint64(&d.headTail, 1<<dequeueBits) + return true +} + +// popHead removes and returns the element at the head of the queue. +// It returns false if the queue is empty. It must only be called by a +// single producer. +func (d *poolDequeue) popHead() (interface{}, bool) { + var slot *eface + for { + ptrs := atomic.LoadUint64(&d.headTail) + head, tail := d.unpack(ptrs) + if tail == head { + // Queue is empty. + return nil, false + } + + // Confirm tail and decrement head. We do this before + // reading the value to take back ownership of this + // slot. + head-- + ptrs2 := d.pack(head, tail) + if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) { + // We successfully took back slot. + slot = &d.vals[head&uint32(len(d.vals)-1)] + break + } + } + + val := *(*interface{})(unsafe.Pointer(slot)) + if val == dequeueNil(nil) { + val = nil + } + // Zero the slot. Unlike popTail, this isn't racing with + // pushHead, so we don't need to be careful here. + *slot = eface{} + return val, true +} + +// popTail removes and returns the element at the tail of the queue. +// It returns false if the queue is empty. It may be called by any +// number of consumers. +func (d *poolDequeue) popTail() (interface{}, bool) { + var slot *eface + for { + ptrs := atomic.LoadUint64(&d.headTail) + head, tail := d.unpack(ptrs) + if tail == head { + // Queue is empty. + return nil, false + } + + // Confirm head and tail (for our speculative check + // above) and increment tail. 
If this succeeds, then + // we own the slot at tail. + ptrs2 := d.pack(head, tail+1) + if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) { + // Success. + slot = &d.vals[tail&uint32(len(d.vals)-1)] + break + } + } + + // We now own slot. + val := *(*interface{})(unsafe.Pointer(slot)) + if val == dequeueNil(nil) { + val = nil + } + + // Tell pushHead that we're done with this slot. Zeroing the + // slot is also important so we don't leave behind references + // that could keep this object live longer than necessary. + // + // We write to val first and then publish that we're done with + // this slot by atomically writing to typ. + slot.val = nil + atomic.StorePointer(&slot.typ, nil) + // At this point pushHead owns the slot. + + return val, true +} + +// poolChain is a dynamically-sized version of poolDequeue. +// +// This is implemented as a doubly-linked list queue of poolDequeues +// where each dequeue is double the size of the previous one. Once a +// dequeue fills up, this allocates a new one and only ever pushes to +// the latest dequeue. Pops happen from the other end of the list and +// once a dequeue is exhausted, it gets removed from the list. +type poolChain struct { + // head is the poolDequeue to push to. This is only accessed + // by the producer, so doesn't need to be synchronized. + head *poolChainElt + + // tail is the poolDequeue to popTail from. This is accessed + // by consumers, so reads and writes must be atomic. + tail *poolChainElt +} + +type poolChainElt struct { + poolDequeue + + // next and prev link to the adjacent poolChainElts in this + // poolChain. + // + // next is written atomically by the producer and read + // atomically by the consumer. It only transitions from nil to + // non-nil. + // + // prev is written atomically by the consumer and read + // atomically by the producer. It only transitions from + // non-nil to nil. + next, prev *poolChainElt +} + +func storePoolChainElt(pp **poolChainElt, v *poolChainElt) { + atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(pp)), unsafe.Pointer(v)) +} + +func loadPoolChainElt(pp **poolChainElt) *poolChainElt { + return (*poolChainElt)(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(pp)))) +} + +func (c *poolChain) pushHead(val interface{}) { + d := c.head + if d == nil { + // Initialize the chain. + const initSize = 8 // Must be a power of 2 + d = new(poolChainElt) + d.vals = make([]eface, initSize) + c.head = d + storePoolChainElt(&c.tail, d) + } + + if d.pushHead(val) { + return + } + + // The current dequeue is full. Allocate a new one of twice + // the size. + newSize := len(d.vals) * 2 + if newSize >= dequeueLimit { + // Can't make it any bigger. + newSize = dequeueLimit + } + + d2 := &poolChainElt{prev: d} + d2.vals = make([]eface, newSize) + c.head = d2 + storePoolChainElt(&d.next, d2) + d2.pushHead(val) +} + +func (c *poolChain) popHead() (interface{}, bool) { + d := c.head + for d != nil { + if val, ok := d.popHead(); ok { + return val, ok + } + // There may still be unconsumed elements in the + // previous dequeue, so try backing up. + d = loadPoolChainElt(&d.prev) + } + return nil, false +} + +func (c *poolChain) popTail() (interface{}, bool) { + d := loadPoolChainElt(&c.tail) + if d == nil { + return nil, false + } + + for { + // It's important that we load the next pointer + // *before* popping the tail. 
In general, d may be + // transiently empty, but if next is non-nil before + // the pop and the pop fails, then d is permanently + // empty, which is the only condition under which it's + // safe to drop d from the chain. + d2 := loadPoolChainElt(&d.next) + + if val, ok := d.popTail(); ok { + return val, ok + } + + if d2 == nil { + // This is the only dequeue. It's empty right + // now, but could be pushed to in the future. + return nil, false + } + + // The tail of the chain has been drained, so move on + // to the next dequeue. Try to drop it from the chain + // so the next pop doesn't have to look at the empty + // dequeue again. + if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&c.tail)), unsafe.Pointer(d), unsafe.Pointer(d2)) { + // We won the race. Clear the prev pointer so + // the garbage collector can collect the empty + // dequeue and so popHead doesn't back up + // further than necessary. + storePoolChainElt(&d2.prev, nil) + } + d = d2 + } +} diff --git a/libgo/go/sync/runtime.go b/libgo/go/sync/runtime.go index b6b9e48..3ad44e7 100644 --- a/libgo/go/sync/runtime.go +++ b/libgo/go/sync/runtime.go @@ -15,14 +15,18 @@ func runtime_Semacquire(s *uint32) // SemacquireMutex is like Semacquire, but for profiling contended Mutexes. // If lifo is true, queue waiter at the head of wait queue. -func runtime_SemacquireMutex(s *uint32, lifo bool) +// skipframes is the number of frames to omit during tracing, counting from +// runtime_SemacquireMutex's caller. +func runtime_SemacquireMutex(s *uint32, lifo bool, skipframes int) // Semrelease atomically increments *s and notifies a waiting goroutine // if one is blocked in Semacquire. // It is intended as a simple wakeup primitive for use by the synchronization // library and should not be used directly. // If handoff is true, pass count directly to the first waiter. -func runtime_Semrelease(s *uint32, handoff bool) +// skipframes is the number of frames to omit during tracing, counting from +// runtime_Semrelease's caller. +func runtime_Semrelease(s *uint32, handoff bool, skipframes int) // Approximation of notifyList in runtime/sema.go. Size and alignment must // agree. diff --git a/libgo/go/sync/runtime_sema_test.go b/libgo/go/sync/runtime_sema_test.go index a680847..152cf0e 100644 --- a/libgo/go/sync/runtime_sema_test.go +++ b/libgo/go/sync/runtime_sema_test.go @@ -18,7 +18,7 @@ func BenchmarkSemaUncontended(b *testing.B) { b.RunParallel(func(pb *testing.PB) { sem := new(PaddedSem) for pb.Next() { - Runtime_Semrelease(&sem.sem, false) + Runtime_Semrelease(&sem.sem, false, 0) Runtime_Semacquire(&sem.sem) } }) @@ -44,7 +44,7 @@ func benchmarkSema(b *testing.B, block, work bool) { b.RunParallel(func(pb *testing.PB) { foo := 0 for pb.Next() { - Runtime_Semrelease(&sem, false) + Runtime_Semrelease(&sem, false, 0) if work { for i := 0; i < 100; i++ { foo *= 2 @@ -54,7 +54,7 @@ func benchmarkSema(b *testing.B, block, work bool) { Runtime_Semacquire(&sem) } _ = foo - Runtime_Semrelease(&sem, false) + Runtime_Semrelease(&sem, false, 0) }) } diff --git a/libgo/go/sync/rwmutex.go b/libgo/go/sync/rwmutex.go index 16a2f92..dc0faf6 100644 --- a/libgo/go/sync/rwmutex.go +++ b/libgo/go/sync/rwmutex.go @@ -47,7 +47,7 @@ func (rw *RWMutex) RLock() { } if atomic.AddInt32(&rw.readerCount, 1) < 0 { // A writer is pending, wait for it. 
- runtime_SemacquireMutex(&rw.readerSem, false) + runtime_SemacquireMutex(&rw.readerSem, false, 0) } if race.Enabled { race.Enable() @@ -66,21 +66,26 @@ func (rw *RWMutex) RUnlock() { race.Disable() } if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 { - if r+1 == 0 || r+1 == -rwmutexMaxReaders { - race.Enable() - throw("sync: RUnlock of unlocked RWMutex") - } - // A writer is pending. - if atomic.AddInt32(&rw.readerWait, -1) == 0 { - // The last reader unblocks the writer. - runtime_Semrelease(&rw.writerSem, false) - } + // Outlined slow-path to allow the fast-path to be inlined + rw.rUnlockSlow(r) } if race.Enabled { race.Enable() } } +func (rw *RWMutex) rUnlockSlow(r int32) { + if r+1 == 0 || r+1 == -rwmutexMaxReaders { + race.Enable() + throw("sync: RUnlock of unlocked RWMutex") + } + // A writer is pending. + if atomic.AddInt32(&rw.readerWait, -1) == 0 { + // The last reader unblocks the writer. + runtime_Semrelease(&rw.writerSem, false, 1) + } +} + // Lock locks rw for writing. // If the lock is already locked for reading or writing, // Lock blocks until the lock is available. @@ -95,7 +100,7 @@ func (rw *RWMutex) Lock() { r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders // Wait for active readers. if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 { - runtime_SemacquireMutex(&rw.writerSem, false) + runtime_SemacquireMutex(&rw.writerSem, false, 0) } if race.Enabled { race.Enable() @@ -125,7 +130,7 @@ func (rw *RWMutex) Unlock() { } // Unblock blocked readers, if any. for i := 0; i < int(r); i++ { - runtime_Semrelease(&rw.readerSem, false) + runtime_Semrelease(&rw.readerSem, false, 0) } // Allow other writers to proceed. rw.w.Unlock() diff --git a/libgo/go/sync/waitgroup.go b/libgo/go/sync/waitgroup.go index 99dd400..e81a493 100644 --- a/libgo/go/sync/waitgroup.go +++ b/libgo/go/sync/waitgroup.go @@ -90,7 +90,7 @@ func (wg *WaitGroup) Add(delta int) { // Reset waiters count to 0. *statep = 0 for ; w != 0; w-- { - runtime_Semrelease(semap, false) + runtime_Semrelease(semap, false, 0) } } |
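
As a closing illustration of the new poolqueue.go, the sketch below lifts its pack/unpack index helpers into a standalone program (the package main wrapper and the demo values are mine, not part of the patch) to show how the 32-bit head and tail indexes share a single uint64, so that one atomic compare-and-swap, or a plain add of 1<<dequeueBits, updates them together.

```go
package main

import "fmt"

const dequeueBits = 32

// pack and unpack mirror the helpers added in poolqueue.go: the 32-bit head
// index lives in the upper half of a uint64 and the tail in the lower half,
// so both can be read or CASed together atomically.
func pack(head, tail uint32) uint64 {
	const mask = 1<<dequeueBits - 1
	return (uint64(head) << dequeueBits) | uint64(tail&mask)
}

func unpack(ptrs uint64) (head, tail uint32) {
	const mask = 1<<dequeueBits - 1
	head = uint32((ptrs >> dequeueBits) & mask)
	tail = uint32(ptrs & mask)
	return
}

func main() {
	ptrs := pack(7, 3)
	head, tail := unpack(ptrs)
	fmt.Println(head, tail) // 7 3

	// pushHead bumps only the head by adding 1<<dequeueBits; overflow of the
	// upper half simply wraps around and stays harmless.
	head, tail = unpack(ptrs + 1<<dequeueBits)
	fmt.Println(head, tail) // 8 3
}
```

Packing both indexes into one word is what lets popTail claim a slot with a single CompareAndSwapUint64 while pushHead advances the head with a single atomic add, which is the core of the lock-free dequeue that replaces the old mutex-protected shared slice in sync.Pool.
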