path: root/libgo/go/sync
author     Ian Lance Taylor <iant@golang.org>    2019-09-06 18:12:46 +0000
committer  Ian Lance Taylor <ian@gcc.gnu.org>    2019-09-06 18:12:46 +0000
commit     aa8901e9bb0399d2c16f988ba2fe46eb0c0c5d13 (patch)
tree       7e63b06d1eec92beec6997c9d3ab47a5d6a835be /libgo/go/sync
parent     920ea3b8ba3164b61ac9490dfdfceb6936eda6dd (diff)
libgo: update to Go 1.13beta1 release
Reviewed-on: https://go-review.googlesource.com/c/gofrontend/+/193497
From-SVN: r275473
Diffstat (limited to 'libgo/go/sync')
-rw-r--r--  libgo/go/sync/export_test.go        |  42
-rw-r--r--  libgo/go/sync/mutex.go              |  18
-rw-r--r--  libgo/go/sync/mutex_test.go         |   2
-rw-r--r--  libgo/go/sync/once.go               |  16
-rw-r--r--  libgo/go/sync/pool.go               | 136
-rw-r--r--  libgo/go/sync/pool_test.go          | 180
-rw-r--r--  libgo/go/sync/poolqueue.go          | 309
-rw-r--r--  libgo/go/sync/runtime.go            |   8
-rw-r--r--  libgo/go/sync/runtime_sema_test.go  |   6
-rw-r--r--  libgo/go/sync/rwmutex.go            |  29
-rw-r--r--  libgo/go/sync/waitgroup.go          |   2
11 files changed, 664 insertions, 84 deletions
diff --git a/libgo/go/sync/export_test.go b/libgo/go/sync/export_test.go
index 669076e..10d3599 100644
--- a/libgo/go/sync/export_test.go
+++ b/libgo/go/sync/export_test.go
@@ -9,3 +9,45 @@ var Runtime_Semacquire = runtime_Semacquire
var Runtime_Semrelease = runtime_Semrelease
var Runtime_procPin = runtime_procPin
var Runtime_procUnpin = runtime_procUnpin
+
+// poolDequeue testing.
+type PoolDequeue interface {
+ PushHead(val interface{}) bool
+ PopHead() (interface{}, bool)
+ PopTail() (interface{}, bool)
+}
+
+func NewPoolDequeue(n int) PoolDequeue {
+ return &poolDequeue{
+ vals: make([]eface, n),
+ }
+}
+
+func (d *poolDequeue) PushHead(val interface{}) bool {
+ return d.pushHead(val)
+}
+
+func (d *poolDequeue) PopHead() (interface{}, bool) {
+ return d.popHead()
+}
+
+func (d *poolDequeue) PopTail() (interface{}, bool) {
+ return d.popTail()
+}
+
+func NewPoolChain() PoolDequeue {
+ return new(poolChain)
+}
+
+func (c *poolChain) PushHead(val interface{}) bool {
+ c.pushHead(val)
+ return true
+}
+
+func (c *poolChain) PopHead() (interface{}, bool) {
+ return c.popHead()
+}
+
+func (c *poolChain) PopTail() (interface{}, bool) {
+ return c.popTail()
+}
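
The wrappers above exist only so the package's tests can drive the unexported poolDequeue and poolChain implementations through the single PoolDequeue interface. A minimal sketch of the intended round trip (the helper name is hypothetical; it assumes code compiled into this package's tests, where the exported names are visible):

    func exercisePoolDequeue() {
        d := NewPoolDequeue(16) // fixed-size ring; the size must be a power of 2
        if !d.PushHead("x") {   // only the single producer may push
            panic("push into an empty dequeue should succeed")
        }
        if v, ok := d.PopTail(); !ok || v.(string) != "x" { // any consumer may pop the tail
            panic("pop should return the value just pushed")
        }
    }
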
diff --git a/libgo/go/sync/mutex.go b/libgo/go/sync/mutex.go
index 4c5582c..11ad20c 100644
--- a/libgo/go/sync/mutex.go
+++ b/libgo/go/sync/mutex.go
@@ -77,7 +77,11 @@ func (m *Mutex) Lock() {
}
return
}
+ // Slow path (outlined so that the fast path can be inlined)
+ m.lockSlow()
+}
+func (m *Mutex) lockSlow() {
var waitStartTime int64
starving := false
awoke := false
@@ -131,7 +135,7 @@ func (m *Mutex) Lock() {
if waitStartTime == 0 {
waitStartTime = runtime_nanotime()
}
- runtime_SemacquireMutex(&m.sema, queueLifo)
+ runtime_SemacquireMutex(&m.sema, queueLifo, 1)
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
old = m.state
if old&mutexStarving != 0 {
@@ -180,6 +184,14 @@ func (m *Mutex) Unlock() {
// Fast path: drop lock bit.
new := atomic.AddInt32(&m.state, -mutexLocked)
+ if new != 0 {
+ // Outlined slow path to allow inlining the fast path.
+ // To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.
+ m.unlockSlow(new)
+ }
+}
+
+func (m *Mutex) unlockSlow(new int32) {
if (new+mutexLocked)&mutexLocked == 0 {
throw("sync: unlock of unlocked mutex")
}
@@ -198,7 +210,7 @@ func (m *Mutex) Unlock() {
// Grab the right to wake someone.
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
- runtime_Semrelease(&m.sema, false)
+ runtime_Semrelease(&m.sema, false, 1)
return
}
old = m.state
@@ -208,6 +220,6 @@ func (m *Mutex) Unlock() {
// Note: mutexLocked is not set, the waiter will set it after wakeup.
// But mutex is still considered locked if mutexStarving is set,
// so new coming goroutines won't acquire it.
- runtime_Semrelease(&m.sema, true)
+ runtime_Semrelease(&m.sema, true, 1)
}
}
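
Both Lock and Unlock now follow the same pattern: keep the fast path under the compiler's inlining budget and outline the contended path into lockSlow/unlockSlow. A minimal sketch of that pattern on a toy spinlock (the gate type is illustrative, not part of this patch):

    import "sync/atomic"

    type gate struct{ state int32 }

    func (g *gate) Enter() {
        // Fast path: one CAS and a call, small enough to be inlined
        // at every call site.
        if atomic.CompareAndSwapInt32(&g.state, 0, 1) {
            return
        }
        g.enterSlow() // outlined so Enter itself stays inlinable
    }

    func (g *gate) enterSlow() {
        // Slow path: may be arbitrarily large; a real lock would
        // park the goroutine instead of spinning.
        for !atomic.CompareAndSwapInt32(&g.state, 0, 1) {
        }
    }
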
diff --git a/libgo/go/sync/mutex_test.go b/libgo/go/sync/mutex_test.go
index 5214684..e61a853 100644
--- a/libgo/go/sync/mutex_test.go
+++ b/libgo/go/sync/mutex_test.go
@@ -21,7 +21,7 @@ import (
func HammerSemaphore(s *uint32, loops int, cdone chan bool) {
for i := 0; i < loops; i++ {
Runtime_Semacquire(s)
- Runtime_Semrelease(s, false)
+ Runtime_Semrelease(s, false, 0)
}
cdone <- true
}
diff --git a/libgo/go/sync/once.go b/libgo/go/sync/once.go
index d8ef952..8476197 100644
--- a/libgo/go/sync/once.go
+++ b/libgo/go/sync/once.go
@@ -10,8 +10,13 @@ import (
// Once is an object that will perform exactly one action.
type Once struct {
- m Mutex
+ // done indicates whether the action has been performed.
+ // It is first in the struct because it is used in the hot path.
+ // The hot path is inlined at every call site.
+ // Placing done first allows more compact instructions on some architectures (amd64/x86),
+ // and fewer instructions (to calculate offset) on other architectures.
done uint32
+ m Mutex
}
// Do calls the function f if and only if Do is being called for the
@@ -33,10 +38,13 @@ type Once struct {
// without calling f.
//
func (o *Once) Do(f func()) {
- if atomic.LoadUint32(&o.done) == 1 {
- return
+ if atomic.LoadUint32(&o.done) == 0 {
+ // Outlined slow-path to allow inlining of the fast-path.
+ o.doSlow(f)
}
- // Slow-path.
+}
+
+func (o *Once) doSlow(f func()) {
o.m.Lock()
defer o.m.Unlock()
if o.done == 0 {
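
The Once change applies the same outlining trick: the inlinable fast path is a single atomic load of done, and doSlow takes the mutex and runs f at most once, marking done only after f returns. A usage sketch of the guarantee callers rely on (Do returns only once f has completed):

    package main

    import (
        "fmt"
        "sync"
    )

    var (
        once sync.Once
        conf string
    )

    func loadConfig() { conf = "loaded" } // executed exactly once

    func main() {
        var wg sync.WaitGroup
        for i := 0; i < 4; i++ {
            wg.Add(1)
            go func() {
                defer wg.Done()
                once.Do(loadConfig) // returns only after loadConfig completes
            }()
        }
        wg.Wait()
        fmt.Println(conf) // always prints "loaded"
    }
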
diff --git a/libgo/go/sync/pool.go b/libgo/go/sync/pool.go
index e54f917..ca7afdb 100644
--- a/libgo/go/sync/pool.go
+++ b/libgo/go/sync/pool.go
@@ -47,6 +47,9 @@ type Pool struct {
local unsafe.Pointer // local fixed-size per-P pool, actual type is [P]poolLocal
localSize uintptr // size of the local array
+ victim unsafe.Pointer // local from previous cycle
+ victimSize uintptr // size of victims array
+
// New optionally specifies a function to generate
// a value when Get would otherwise return nil.
// It may not be changed concurrently with calls to Get.
@@ -55,9 +58,8 @@ type Pool struct {
// Local per-P Pool appendix.
type poolLocalInternal struct {
- private interface{} // Can be used only by the respective P.
- shared []interface{} // Can be used by any P.
- Mutex // Protects shared.
+ private interface{} // Can be used only by the respective P.
+ shared poolChain // Local P can pushHead/popHead; any P can popTail.
}
type poolLocal struct {
@@ -97,17 +99,15 @@ func (p *Pool) Put(x interface{}) {
race.ReleaseMerge(poolRaceAddr(x))
race.Disable()
}
- l := p.pin()
+ l, _ := p.pin()
if l.private == nil {
l.private = x
x = nil
}
- runtime_procUnpin()
if x != nil {
- l.Lock()
- l.shared = append(l.shared, x)
- l.Unlock()
+ l.shared.pushHead(x)
}
+ runtime_procUnpin()
if race.Enabled {
race.Enable()
}
@@ -125,22 +125,19 @@ func (p *Pool) Get() interface{} {
if race.Enabled {
race.Disable()
}
- l := p.pin()
+ l, pid := p.pin()
x := l.private
l.private = nil
- runtime_procUnpin()
if x == nil {
- l.Lock()
- last := len(l.shared) - 1
- if last >= 0 {
- x = l.shared[last]
- l.shared = l.shared[:last]
- }
- l.Unlock()
+ // Try to pop the head of the local shard. We prefer
+ // the head over the tail for temporal locality of
+ // reuse.
+ x, _ = l.shared.popHead()
if x == nil {
- x = p.getSlow()
+ x = p.getSlow(pid)
}
}
+ runtime_procUnpin()
if race.Enabled {
race.Enable()
if x != nil {
@@ -153,45 +150,63 @@ func (p *Pool) Get() interface{} {
return x
}
-func (p *Pool) getSlow() (x interface{}) {
+func (p *Pool) getSlow(pid int) interface{} {
// See the comment in pin regarding ordering of the loads.
size := atomic.LoadUintptr(&p.localSize) // load-acquire
- local := p.local // load-consume
+ locals := p.local // load-consume
// Try to steal one element from other procs.
- pid := runtime_procPin()
- runtime_procUnpin()
for i := 0; i < int(size); i++ {
- l := indexLocal(local, (pid+i+1)%int(size))
- l.Lock()
- last := len(l.shared) - 1
- if last >= 0 {
- x = l.shared[last]
- l.shared = l.shared[:last]
- l.Unlock()
- break
+ l := indexLocal(locals, (pid+i+1)%int(size))
+ if x, _ := l.shared.popTail(); x != nil {
+ return x
}
- l.Unlock()
}
- return x
+
+ // Try the victim cache. We do this after attempting to steal
+ // from all primary caches because we want objects in the
+ // victim cache to age out if at all possible.
+ size = atomic.LoadUintptr(&p.victimSize)
+ if uintptr(pid) >= size {
+ return nil
+ }
+ locals = p.victim
+ l := indexLocal(locals, pid)
+ if x := l.private; x != nil {
+ l.private = nil
+ return x
+ }
+ for i := 0; i < int(size); i++ {
+ l := indexLocal(locals, (pid+i)%int(size))
+ if x, _ := l.shared.popTail(); x != nil {
+ return x
+ }
+ }
+
+ // Mark the victim cache as empty so future gets don't bother
+ // with it.
+ atomic.StoreUintptr(&p.victimSize, 0)
+
+ return nil
}
-// pin pins the current goroutine to P, disables preemption and returns poolLocal pool for the P.
+// pin pins the current goroutine to P, disables preemption and
+// returns the poolLocal for the P and the P's id.
// Caller must call runtime_procUnpin() when done with the pool.
-func (p *Pool) pin() *poolLocal {
+func (p *Pool) pin() (*poolLocal, int) {
pid := runtime_procPin()
- // In pinSlow we store to localSize and then to local, here we load in opposite order.
+ // In pinSlow we store to local and then to localSize, here we load in opposite order.
// Since we've disabled preemption, GC cannot happen in between.
// Thus here we must observe local at least as large as localSize.
// We can observe a newer/larger local, it is fine (we must observe its zero-initialized-ness).
s := atomic.LoadUintptr(&p.localSize) // load-acquire
l := p.local // load-consume
if uintptr(pid) < s {
- return indexLocal(l, pid)
+ return indexLocal(l, pid), pid
}
return p.pinSlow()
}
-func (p *Pool) pinSlow() *poolLocal {
+func (p *Pool) pinSlow() (*poolLocal, int) {
// Retry under the mutex.
// Can not lock the mutex while pinned.
runtime_procUnpin()
@@ -202,7 +217,7 @@ func (p *Pool) pinSlow() *poolLocal {
s := p.localSize
l := p.local
if uintptr(pid) < s {
- return indexLocal(l, pid)
+ return indexLocal(l, pid), pid
}
if p.local == nil {
allPools = append(allPools, p)
@@ -212,35 +227,46 @@ func (p *Pool) pinSlow() *poolLocal {
local := make([]poolLocal, size)
atomic.StorePointer(&p.local, unsafe.Pointer(&local[0])) // store-release
atomic.StoreUintptr(&p.localSize, uintptr(size)) // store-release
- return &local[pid]
+ return &local[pid], pid
}
func poolCleanup() {
// This function is called with the world stopped, at the beginning of a garbage collection.
// It must not allocate and probably should not call any runtime functions.
- // Defensively zero out everything, 2 reasons:
- // 1. To prevent false retention of whole Pools.
- // 2. If GC happens while a goroutine works with l.shared in Put/Get,
- // it will retain whole Pool. So next cycle memory consumption would be doubled.
- for i, p := range allPools {
- allPools[i] = nil
- for i := 0; i < int(p.localSize); i++ {
- l := indexLocal(p.local, i)
- l.private = nil
- for j := range l.shared {
- l.shared[j] = nil
- }
- l.shared = nil
- }
+
+ // Because the world is stopped, no pool user can be in a
+ // pinned section (in effect, this has all Ps pinned).
+
+ // Drop victim caches from all pools.
+ for _, p := range oldPools {
+ p.victim = nil
+ p.victimSize = 0
+ }
+
+ // Move primary cache to victim cache.
+ for _, p := range allPools {
+ p.victim = p.local
+ p.victimSize = p.localSize
p.local = nil
p.localSize = 0
}
- allPools = []*Pool{}
+
+ // The pools with non-empty primary caches now have non-empty
+ // victim caches and no pools have primary caches.
+ oldPools, allPools = allPools, nil
}
var (
allPoolsMu Mutex
- allPools []*Pool
+
+ // allPools is the set of pools that have non-empty primary
+ // caches. Protected by either 1) allPoolsMu and pinning or 2)
+ // STW.
+ allPools []*Pool
+
+ // oldPools is the set of pools that may have non-empty victim
+ // caches. Protected by STW.
+ oldPools []*Pool
)
func init() {
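
Taken together, poolCleanup and the new victim fields implement a two-generation cache: each GC demotes every pool's primary cache to its victim cache and drops the previous victims, so a pooled object is freed only after roughly two idle GC cycles instead of one. A single-threaded toy sketch of the same idea (illustrative only; the real Pool is per-P and mostly lock-free):

    // twoGen mimics the Pool's generational cleanup in miniature.
    type twoGen struct {
        primary, victim []interface{}
    }

    func (c *twoGen) Put(x interface{}) { c.primary = append(c.primary, x) }

    func (c *twoGen) Get() interface{} {
        if n := len(c.primary); n > 0 {
            x := c.primary[n-1]
            c.primary = c.primary[:n-1]
            return x
        }
        if n := len(c.victim); n > 0 { // fall back to the older generation
            x := c.victim[n-1]
            c.victim = c.victim[:n-1]
            return x
        }
        return nil // a real Pool would call New here
    }

    // collect plays the role of poolCleanup at each GC: discard the
    // old victims, demote the primaries.
    func (c *twoGen) collect() {
        c.victim, c.primary = c.primary, nil
    }
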
diff --git a/libgo/go/sync/pool_test.go b/libgo/go/sync/pool_test.go
index dad2f99..7e175a9 100644
--- a/libgo/go/sync/pool_test.go
+++ b/libgo/go/sync/pool_test.go
@@ -10,6 +10,7 @@ package sync_test
import (
"runtime"
"runtime/debug"
+ "sort"
. "sync"
"sync/atomic"
"testing"
@@ -40,11 +41,20 @@ func TestPool(t *testing.T) {
}
Runtime_procUnpin()
- p.Put("c")
- debug.SetGCPercent(100) // to allow following GC to actually run
+ // Put in a large number of objects so they spill into
+ // stealable space.
+ for i := 0; i < 100; i++ {
+ p.Put("c")
+ }
+ // After one GC, the victim cache should keep them alive.
+ runtime.GC()
+ if g := p.Get(); g != "c" {
+ t.Fatalf("got %#v; want c after GC", g)
+ }
+ // A second GC should drop the victim cache.
runtime.GC()
if g := p.Get(); g != nil {
- t.Fatalf("got %#v; want nil after GC", g)
+ t.Fatalf("got %#v; want nil after second GC", g)
}
}
@@ -96,6 +106,9 @@ func testPool(t *testing.T, drain bool) {
const N = 100
loop:
for try := 0; try < 3; try++ {
+ if try == 1 && testing.Short() {
+ break
+ }
var fin, fin1 uint32
for i := 0; i < N; i++ {
v := new(string)
@@ -151,6 +164,86 @@ func TestPoolStress(t *testing.T) {
}
}
+func TestPoolDequeue(t *testing.T) {
+ testPoolDequeue(t, NewPoolDequeue(16))
+}
+
+func TestPoolChain(t *testing.T) {
+ testPoolDequeue(t, NewPoolChain())
+}
+
+func testPoolDequeue(t *testing.T, d PoolDequeue) {
+ const P = 10
+ // In long mode, do enough pushes to wrap around the 21-bit
+ // indexes.
+ N := 1<<21 + 1000
+ if testing.Short() {
+ N = 1e3
+ }
+ have := make([]int32, N)
+ var stop int32
+ var wg WaitGroup
+
+ // Start P-1 consumers.
+ for i := 1; i < P; i++ {
+ wg.Add(1)
+ go func() {
+ fail := 0
+ for atomic.LoadInt32(&stop) == 0 {
+ val, ok := d.PopTail()
+ if ok {
+ fail = 0
+ atomic.AddInt32(&have[val.(int)], 1)
+ if val.(int) == N-1 {
+ atomic.StoreInt32(&stop, 1)
+ }
+ } else {
+ // Speed up the test by
+ // allowing the pusher to run.
+ if fail++; fail%100 == 0 {
+ runtime.Gosched()
+ }
+ }
+ }
+ wg.Done()
+ }()
+ }
+
+ // Start 1 producer.
+ nPopHead := 0
+ wg.Add(1)
+ go func() {
+ for j := 0; j < N; j++ {
+ for !d.PushHead(j) {
+ // Allow a popper to run.
+ runtime.Gosched()
+ }
+ if j%10 == 0 {
+ val, ok := d.PopHead()
+ if ok {
+ nPopHead++
+ atomic.AddInt32(&have[val.(int)], 1)
+ }
+ }
+ }
+ wg.Done()
+ }()
+ wg.Wait()
+
+ // Check results.
+ for i, count := range have {
+ if count != 1 {
+ t.Errorf("expected have[%d] = 1, got %d", i, count)
+ }
+ }
+ if nPopHead == 0 {
+ // In theory it's possible in a valid schedule for
+ // popHead to never succeed, but in practice it almost
+ // always succeeds, so this is unlikely to flake.
+ t.Errorf("popHead never succeeded")
+ }
+}
+
func BenchmarkPool(b *testing.B) {
var p Pool
b.RunParallel(func(pb *testing.PB) {
@@ -174,3 +267,84 @@ func BenchmarkPoolOverflow(b *testing.B) {
}
})
}
+
+var globalSink interface{}
+
+func BenchmarkPoolSTW(b *testing.B) {
+ // Take control of GC.
+ defer debug.SetGCPercent(debug.SetGCPercent(-1))
+
+ var mstats runtime.MemStats
+ var pauses []uint64
+
+ var p Pool
+ for i := 0; i < b.N; i++ {
+ // Put a large number of items into a pool.
+ const N = 100000
+ var item interface{} = 42
+ for i := 0; i < N; i++ {
+ p.Put(item)
+ }
+ // Do a GC.
+ runtime.GC()
+ // Record pause time.
+ runtime.ReadMemStats(&mstats)
+ pauses = append(pauses, mstats.PauseNs[(mstats.NumGC+255)%256])
+ }
+
+ // Get pause time stats.
+ sort.Slice(pauses, func(i, j int) bool { return pauses[i] < pauses[j] })
+ var total uint64
+ for _, ns := range pauses {
+ total += ns
+ }
+ // ns/op for this benchmark is average STW time.
+ b.ReportMetric(float64(total)/float64(b.N), "ns/op")
+ b.ReportMetric(float64(pauses[len(pauses)*95/100]), "p95-ns/STW")
+ b.ReportMetric(float64(pauses[len(pauses)*50/100]), "p50-ns/STW")
+}
+
+func BenchmarkPoolExpensiveNew(b *testing.B) {
+ // Populate a pool with items that are expensive to construct
+ // to stress pool cleanup and subsequent reconstruction.
+
+ // Create a ballast so the GC has a non-zero heap size and
+ // runs at reasonable times.
+ globalSink = make([]byte, 8<<20)
+ defer func() { globalSink = nil }()
+
+ // Create a pool that's "expensive" to fill.
+ var p Pool
+ var nNew uint64
+ p.New = func() interface{} {
+ atomic.AddUint64(&nNew, 1)
+ time.Sleep(time.Millisecond)
+ return 42
+ }
+ var mstats1, mstats2 runtime.MemStats
+ runtime.ReadMemStats(&mstats1)
+ b.RunParallel(func(pb *testing.PB) {
+ // Simulate 100X the number of goroutines having items
+ // checked out from the Pool simultaneously.
+ items := make([]interface{}, 100)
+ var sink []byte
+ for pb.Next() {
+ // Stress the pool.
+ for i := range items {
+ items[i] = p.Get()
+ // Simulate doing some work with this
+ // item checked out.
+ sink = make([]byte, 32<<10)
+ }
+ for i, v := range items {
+ p.Put(v)
+ items[i] = nil
+ }
+ }
+ _ = sink
+ })
+ runtime.ReadMemStats(&mstats2)
+
+ b.ReportMetric(float64(mstats2.NumGC-mstats1.NumGC)/float64(b.N), "GCs/op")
+ b.ReportMetric(float64(nNew)/float64(b.N), "New/op")
+}
diff --git a/libgo/go/sync/poolqueue.go b/libgo/go/sync/poolqueue.go
new file mode 100644
index 0000000..22f7496
--- /dev/null
+++ b/libgo/go/sync/poolqueue.go
@@ -0,0 +1,309 @@
+// Copyright 2019 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package sync
+
+import (
+ "sync/atomic"
+ "unsafe"
+)
+
+// poolDequeue is a lock-free fixed-size single-producer,
+// multi-consumer queue. The single producer can both push and pop
+// from the head, and consumers can pop from the tail.
+//
+// It has the added feature that it nils out unused slots to avoid
+// unnecessary retention of objects. This is important for sync.Pool,
+// but not typically a property considered in the literature.
+type poolDequeue struct {
+ // headTail packs together a 32-bit head index and a 32-bit
+ // tail index. Both are indexes into vals modulo len(vals).
+ //
+ // tail = index of oldest data in queue
+ // head = index of next slot to fill
+ //
+ // Slots in the range [tail, head) are owned by consumers.
+ // A consumer continues to own a slot outside this range until
+ // it nils the slot, at which point ownership passes to the
+ // producer.
+ //
+ // The head index is stored in the most-significant bits so
+ // that we can atomically add to it and the overflow is
+ // harmless.
+ headTail uint64
+
+ // vals is a ring buffer of interface{} values stored in this
+ // dequeue. The size of this must be a power of 2.
+ //
+ // vals[i].typ is nil if the slot is empty and non-nil
+ // otherwise. A slot is still in use until *both* the tail
+ // index has moved beyond it and typ has been set to nil. This
+ // is set to nil atomically by the consumer and read
+ // atomically by the producer.
+ vals []eface
+}
+
+type eface struct {
+ typ, val unsafe.Pointer
+}
+
+const dequeueBits = 32
+
+// dequeueLimit is the maximum size of a poolDequeue.
+//
+// This must be at most (1<<dequeueBits)/2 because detecting fullness
+// depends on wrapping around the ring buffer without wrapping around
+// the index. We divide by 4 so this fits in an int on 32-bit.
+const dequeueLimit = (1 << dequeueBits) / 4
+
+// dequeueNil is used in poolDequeue to represent interface{}(nil).
+// Since we use nil to represent empty slots, we need a sentinel value
+// to represent nil.
+type dequeueNil *struct{}
+
+func (d *poolDequeue) unpack(ptrs uint64) (head, tail uint32) {
+ const mask = 1<<dequeueBits - 1
+ head = uint32((ptrs >> dequeueBits) & mask)
+ tail = uint32(ptrs & mask)
+ return
+}
+
+func (d *poolDequeue) pack(head, tail uint32) uint64 {
+ const mask = 1<<dequeueBits - 1
+ return (uint64(head) << dequeueBits) |
+ uint64(tail&mask)
+}
+
+// pushHead adds val at the head of the queue. It returns false if the
+// queue is full. It must only be called by a single producer.
+func (d *poolDequeue) pushHead(val interface{}) bool {
+ ptrs := atomic.LoadUint64(&d.headTail)
+ head, tail := d.unpack(ptrs)
+ if (tail+uint32(len(d.vals)))&(1<<dequeueBits-1) == head {
+ // Queue is full.
+ return false
+ }
+ slot := &d.vals[head&uint32(len(d.vals)-1)]
+
+ // Check if the head slot has been released by popTail.
+ typ := atomic.LoadPointer(&slot.typ)
+ if typ != nil {
+ // Another goroutine is still cleaning up the tail, so
+ // the queue is actually still full.
+ return false
+ }
+
+ // The head slot is free, so we own it.
+ if val == nil {
+ val = dequeueNil(nil)
+ }
+ *(*interface{})(unsafe.Pointer(slot)) = val
+
+ // Increment head. This passes ownership of slot to popTail
+ // and acts as a store barrier for writing the slot.
+ atomic.AddUint64(&d.headTail, 1<<dequeueBits)
+ return true
+}
+
+// popHead removes and returns the element at the head of the queue.
+// It returns false if the queue is empty. It must only be called by a
+// single producer.
+func (d *poolDequeue) popHead() (interface{}, bool) {
+ var slot *eface
+ for {
+ ptrs := atomic.LoadUint64(&d.headTail)
+ head, tail := d.unpack(ptrs)
+ if tail == head {
+ // Queue is empty.
+ return nil, false
+ }
+
+ // Confirm tail and decrement head. We do this before
+ // reading the value to take back ownership of this
+ // slot.
+ head--
+ ptrs2 := d.pack(head, tail)
+ if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {
+ // We successfully took back slot.
+ slot = &d.vals[head&uint32(len(d.vals)-1)]
+ break
+ }
+ }
+
+ val := *(*interface{})(unsafe.Pointer(slot))
+ if val == dequeueNil(nil) {
+ val = nil
+ }
+ // Zero the slot. Unlike popTail, this isn't racing with
+ // pushHead, so we don't need to be careful here.
+ *slot = eface{}
+ return val, true
+}
+
+// popTail removes and returns the element at the tail of the queue.
+// It returns false if the queue is empty. It may be called by any
+// number of consumers.
+func (d *poolDequeue) popTail() (interface{}, bool) {
+ var slot *eface
+ for {
+ ptrs := atomic.LoadUint64(&d.headTail)
+ head, tail := d.unpack(ptrs)
+ if tail == head {
+ // Queue is empty.
+ return nil, false
+ }
+
+ // Confirm head and tail (for our speculative check
+ // above) and increment tail. If this succeeds, then
+ // we own the slot at tail.
+ ptrs2 := d.pack(head, tail+1)
+ if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {
+ // Success.
+ slot = &d.vals[tail&uint32(len(d.vals)-1)]
+ break
+ }
+ }
+
+ // We now own slot.
+ val := *(*interface{})(unsafe.Pointer(slot))
+ if val == dequeueNil(nil) {
+ val = nil
+ }
+
+ // Tell pushHead that we're done with this slot. Zeroing the
+ // slot is also important so we don't leave behind references
+ // that could keep this object live longer than necessary.
+ //
+ // We write to val first and then publish that we're done with
+ // this slot by atomically writing to typ.
+ slot.val = nil
+ atomic.StorePointer(&slot.typ, nil)
+ // At this point pushHead owns the slot.
+
+ return val, true
+}
+
+// poolChain is a dynamically-sized version of poolDequeue.
+//
+// This is implemented as a doubly-linked list queue of poolDequeues
+// where each dequeue is double the size of the previous one. Once a
+// dequeue fills up, this allocates a new one and only ever pushes to
+// the latest dequeue. Pops happen from the other end of the list and
+// once a dequeue is exhausted, it gets removed from the list.
+type poolChain struct {
+ // head is the poolDequeue to push to. This is only accessed
+ // by the producer, so doesn't need to be synchronized.
+ head *poolChainElt
+
+ // tail is the poolDequeue to popTail from. This is accessed
+ // by consumers, so reads and writes must be atomic.
+ tail *poolChainElt
+}
+
+type poolChainElt struct {
+ poolDequeue
+
+ // next and prev link to the adjacent poolChainElts in this
+ // poolChain.
+ //
+ // next is written atomically by the producer and read
+ // atomically by the consumer. It only transitions from nil to
+ // non-nil.
+ //
+ // prev is written atomically by the consumer and read
+ // atomically by the producer. It only transitions from
+ // non-nil to nil.
+ next, prev *poolChainElt
+}
+
+func storePoolChainElt(pp **poolChainElt, v *poolChainElt) {
+ atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(pp)), unsafe.Pointer(v))
+}
+
+func loadPoolChainElt(pp **poolChainElt) *poolChainElt {
+ return (*poolChainElt)(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(pp))))
+}
+
+func (c *poolChain) pushHead(val interface{}) {
+ d := c.head
+ if d == nil {
+ // Initialize the chain.
+ const initSize = 8 // Must be a power of 2
+ d = new(poolChainElt)
+ d.vals = make([]eface, initSize)
+ c.head = d
+ storePoolChainElt(&c.tail, d)
+ }
+
+ if d.pushHead(val) {
+ return
+ }
+
+ // The current dequeue is full. Allocate a new one of twice
+ // the size.
+ newSize := len(d.vals) * 2
+ if newSize >= dequeueLimit {
+ // Can't make it any bigger.
+ newSize = dequeueLimit
+ }
+
+ d2 := &poolChainElt{prev: d}
+ d2.vals = make([]eface, newSize)
+ c.head = d2
+ storePoolChainElt(&d.next, d2)
+ d2.pushHead(val)
+}
+
+func (c *poolChain) popHead() (interface{}, bool) {
+ d := c.head
+ for d != nil {
+ if val, ok := d.popHead(); ok {
+ return val, ok
+ }
+ // There may still be unconsumed elements in the
+ // previous dequeue, so try backing up.
+ d = loadPoolChainElt(&d.prev)
+ }
+ return nil, false
+}
+
+func (c *poolChain) popTail() (interface{}, bool) {
+ d := loadPoolChainElt(&c.tail)
+ if d == nil {
+ return nil, false
+ }
+
+ for {
+ // It's important that we load the next pointer
+ // *before* popping the tail. In general, d may be
+ // transiently empty, but if next is non-nil before
+ // the pop and the pop fails, then d is permanently
+ // empty, which is the only condition under which it's
+ // safe to drop d from the chain.
+ d2 := loadPoolChainElt(&d.next)
+
+ if val, ok := d.popTail(); ok {
+ return val, ok
+ }
+
+ if d2 == nil {
+ // This is the only dequeue. It's empty right
+ // now, but could be pushed to in the future.
+ return nil, false
+ }
+
+ // The tail of the chain has been drained, so move on
+ // to the next dequeue. Try to drop it from the chain
+ // so the next pop doesn't have to look at the empty
+ // dequeue again.
+ if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&c.tail)), unsafe.Pointer(d), unsafe.Pointer(d2)) {
+ // We won the race. Clear the prev pointer so
+ // the garbage collector can collect the empty
+ // dequeue and so popHead doesn't back up
+ // further than necessary.
+ storePoolChainElt(&d2.prev, nil)
+ }
+ d = d2
+ }
+}
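
Keeping head in the most-significant half of headTail is what makes pushHead's atomic add safe: when head passes its 32-bit maximum, the carry falls off the top of the uint64, so head wraps to zero without disturbing tail. A standalone sketch of the index arithmetic (same shape as pack/unpack above, wrapped in a hypothetical main package):

    package main

    import "fmt"

    const dequeueBits = 32

    func pack(head, tail uint32) uint64 {
        const mask = 1<<dequeueBits - 1
        return uint64(head)<<dequeueBits | uint64(tail&mask)
    }

    func unpack(ptrs uint64) (head, tail uint32) {
        const mask = 1<<dequeueBits - 1
        return uint32((ptrs >> dequeueBits) & mask), uint32(ptrs & mask)
    }

    func main() {
        ptrs := pack(1<<32-1, 42) // head at its maximum value
        ptrs += 1 << dequeueBits  // what pushHead's atomic add does
        head, tail := unpack(ptrs)
        fmt.Println(head, tail) // 0 42: head wrapped, tail untouched
    }
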
diff --git a/libgo/go/sync/runtime.go b/libgo/go/sync/runtime.go
index b6b9e48..3ad44e7 100644
--- a/libgo/go/sync/runtime.go
+++ b/libgo/go/sync/runtime.go
@@ -15,14 +15,18 @@ func runtime_Semacquire(s *uint32)
// SemacquireMutex is like Semacquire, but for profiling contended Mutexes.
// If lifo is true, queue waiter at the head of wait queue.
-func runtime_SemacquireMutex(s *uint32, lifo bool)
+// skipframes is the number of frames to omit during tracing, counting from
+// runtime_SemacquireMutex's caller.
+func runtime_SemacquireMutex(s *uint32, lifo bool, skipframes int)
// Semrelease atomically increments *s and notifies a waiting goroutine
// if one is blocked in Semacquire.
// It is intended as a simple wakeup primitive for use by the synchronization
// library and should not be used directly.
// If handoff is true, pass count directly to the first waiter.
-func runtime_Semrelease(s *uint32, handoff bool)
+// skipframes is the number of frames to omit during tracing, counting from
+// runtime_Semrelease's caller.
+func runtime_Semrelease(s *uint32, handoff bool, skipframes int)
// Approximation of notifyList in runtime/sema.go. Size and alignment must
// agree.
diff --git a/libgo/go/sync/runtime_sema_test.go b/libgo/go/sync/runtime_sema_test.go
index a680847..152cf0e 100644
--- a/libgo/go/sync/runtime_sema_test.go
+++ b/libgo/go/sync/runtime_sema_test.go
@@ -18,7 +18,7 @@ func BenchmarkSemaUncontended(b *testing.B) {
b.RunParallel(func(pb *testing.PB) {
sem := new(PaddedSem)
for pb.Next() {
- Runtime_Semrelease(&sem.sem, false)
+ Runtime_Semrelease(&sem.sem, false, 0)
Runtime_Semacquire(&sem.sem)
}
})
@@ -44,7 +44,7 @@ func benchmarkSema(b *testing.B, block, work bool) {
b.RunParallel(func(pb *testing.PB) {
foo := 0
for pb.Next() {
- Runtime_Semrelease(&sem, false)
+ Runtime_Semrelease(&sem, false, 0)
if work {
for i := 0; i < 100; i++ {
foo *= 2
@@ -54,7 +54,7 @@ func benchmarkSema(b *testing.B, block, work bool) {
Runtime_Semacquire(&sem)
}
_ = foo
- Runtime_Semrelease(&sem, false)
+ Runtime_Semrelease(&sem, false, 0)
})
}
diff --git a/libgo/go/sync/rwmutex.go b/libgo/go/sync/rwmutex.go
index 16a2f92..dc0faf6 100644
--- a/libgo/go/sync/rwmutex.go
+++ b/libgo/go/sync/rwmutex.go
@@ -47,7 +47,7 @@ func (rw *RWMutex) RLock() {
}
if atomic.AddInt32(&rw.readerCount, 1) < 0 {
// A writer is pending, wait for it.
- runtime_SemacquireMutex(&rw.readerSem, false)
+ runtime_SemacquireMutex(&rw.readerSem, false, 0)
}
if race.Enabled {
race.Enable()
@@ -66,21 +66,26 @@ func (rw *RWMutex) RUnlock() {
race.Disable()
}
if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
- if r+1 == 0 || r+1 == -rwmutexMaxReaders {
- race.Enable()
- throw("sync: RUnlock of unlocked RWMutex")
- }
- // A writer is pending.
- if atomic.AddInt32(&rw.readerWait, -1) == 0 {
- // The last reader unblocks the writer.
- runtime_Semrelease(&rw.writerSem, false)
- }
+ // Outlined slow-path to allow the fast-path to be inlined
+ rw.rUnlockSlow(r)
}
if race.Enabled {
race.Enable()
}
}
+func (rw *RWMutex) rUnlockSlow(r int32) {
+ if r+1 == 0 || r+1 == -rwmutexMaxReaders {
+ race.Enable()
+ throw("sync: RUnlock of unlocked RWMutex")
+ }
+ // A writer is pending.
+ if atomic.AddInt32(&rw.readerWait, -1) == 0 {
+ // The last reader unblocks the writer.
+ runtime_Semrelease(&rw.writerSem, false, 1)
+ }
+}
+
// Lock locks rw for writing.
// If the lock is already locked for reading or writing,
// Lock blocks until the lock is available.
@@ -95,7 +100,7 @@ func (rw *RWMutex) Lock() {
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
// Wait for active readers.
if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
- runtime_SemacquireMutex(&rw.writerSem, false)
+ runtime_SemacquireMutex(&rw.writerSem, false, 0)
}
if race.Enabled {
race.Enable()
@@ -125,7 +130,7 @@ func (rw *RWMutex) Unlock() {
}
// Unblock blocked readers, if any.
for i := 0; i < int(r); i++ {
- runtime_Semrelease(&rw.readerSem, false)
+ runtime_Semrelease(&rw.readerSem, false, 0)
}
// Allow other writers to proceed.
rw.w.Unlock()
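
The RWMutex changes are again fast-path outlining, but the surrounding context shows the trick the semaphore calls hang off of: a writer subtracts rwmutexMaxReaders from readerCount, driving it strongly negative, which both tells arriving readers that a writer is pending and lets the writer recover how many active readers it must wait out. A small sketch of just that arithmetic (the constant mirrors rwmutex.go; no actual blocking here):

    package main

    import (
        "fmt"
        "sync/atomic"
    )

    const rwmutexMaxReaders = 1 << 30

    func main() {
        var readerCount int32

        atomic.AddInt32(&readerCount, 1) // two readers hold RLock
        atomic.AddInt32(&readerCount, 1)

        // A writer announces itself; r is the number of readers it
        // must wait for before taking the lock.
        r := atomic.AddInt32(&readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
        fmt.Println(r) // 2

        // A reader arriving now sees a negative count, so RLock
        // would block on readerSem rather than proceed.
        fmt.Println(atomic.AddInt32(&readerCount, 1) < 0) // true
    }
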
diff --git a/libgo/go/sync/waitgroup.go b/libgo/go/sync/waitgroup.go
index 99dd400..e81a493 100644
--- a/libgo/go/sync/waitgroup.go
+++ b/libgo/go/sync/waitgroup.go
@@ -90,7 +90,7 @@ func (wg *WaitGroup) Add(delta int) {
// Reset waiters count to 0.
*statep = 0
for ; w != 0; w-- {
- runtime_Semrelease(semap, false)
+ runtime_Semrelease(semap, false, 0)
}
}