Diffstat (limited to 'libgo/go/runtime/chan.go')
-rw-r--r--  libgo/go/runtime/chan.go | 90
1 file changed, 69 insertions(+), 21 deletions(-)
diff --git a/libgo/go/runtime/chan.go b/libgo/go/runtime/chan.go
index 291fe00..549e566 100644
--- a/libgo/go/runtime/chan.go
+++ b/libgo/go/runtime/chan.go
@@ -133,6 +133,21 @@ func chanbuf(c *hchan, i uint) unsafe.Pointer {
 	return add(c.buf, uintptr(i)*uintptr(c.elemsize))
 }
 
+// full reports whether a send on c would block (that is, the channel is full).
+// It uses a single word-sized read of mutable state, so although
+// the answer is instantaneously true, the correct answer may have changed
+// by the time the calling function receives the return value.
+func full(c *hchan) bool {
+	// c.dataqsiz is immutable (never written after the channel is created)
+	// so it is safe to read at any time during channel operation.
+	if c.dataqsiz == 0 {
+		// Assumes that a pointer read is relaxed-atomic.
+		return c.recvq.first == nil
+	}
+	// Assumes that a uint read is relaxed-atomic.
+	return c.qcount == c.dataqsiz
+}
+
 // entry point for c <- x from compiled code
 //go:nosplit
 func chansend1(c *hchan, elem unsafe.Pointer) {
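
The two cases of full() map onto observable channel behavior. A small user-level sketch for intuition (the channels b and u below are illustrative, not part of the patch):

package main

import "fmt"

func main() {
	// Buffered case: full once qcount == dataqsiz.
	b := make(chan int, 1)
	b <- 1
	fmt.Println(len(b) == cap(b)) // true: another send would block

	// Unbuffered case: "full" unless a receiver is already parked in recvq.
	u := make(chan int)
	go func() { <-u }()
	u <- 2 // blocks until the receiver above is parked in recvq
	fmt.Println("handed off directly")
}
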
@@ -177,7 +192,7 @@ func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
 	//
 	// After observing that the channel is not closed, we observe that the channel is
 	// not ready for sending. Each of these observations is a single word-sized read
-	// (first c.closed and second c.recvq.first or c.qcount depending on kind of channel).
+	// (first c.closed and second full()).
 	// Because a closed channel cannot transition from 'ready for sending' to
 	// 'not ready for sending', even if the channel is closed between the two observations,
 	// they imply a moment between the two when the channel was both not yet closed
@@ -186,9 +201,10 @@ func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
 	//
 	// It is okay if the reads are reordered here: if we observe that the channel is not
 	// ready for sending and then observe that it is not closed, that implies that the
-	// channel wasn't closed during the first observation.
-	if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
-		(c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
+	// channel wasn't closed during the first observation. However, nothing here
+	// guarantees forward progress. We rely on the side effects of lock release in
+	// chanrecv() and closechan() to update this thread's view of c.closed and full().
+	if !block && c.closed == 0 && full(c) {
 		return false
 	}
 
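
This block=false fast path is what a compiler-generated non-blocking send reaches (through the selectnbsend entry point whose doc comment appears at the end of this diff). A user-level illustration:

package main

import "fmt"

func main() {
	ch := make(chan int) // unbuffered and no receiver waiting: full(ch) holds
	select {
	case ch <- 1:
		fmt.Println("sent")
	default:
		// Reached through the lock-free fast path: c.closed == 0 && full(c).
		fmt.Println("would block")
	}
}
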
@@ -250,7 +266,7 @@ func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
 	gp.waiting = mysg
 	gp.param = nil
 	c.sendq.enqueue(mysg)
-	goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend, 3)
+	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
 	// Ensure the value being sent is kept alive until the
 	// receiver copies it out. The sudog has a pointer to the
 	// stack object, but sudogs aren't considered as roots of the
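
The truncated comment above continues in the full source, where it is followed by a KeepAlive(ep) call. runtime.KeepAlive is the public form of the same idiom; a self-contained sketch (the resource type and finalizer are illustrative, not from the patch):

package main

import (
	"fmt"
	"runtime"
)

type resource struct{ fd int }

func use(fd int) { fmt.Println("using fd", fd) }

func main() {
	r := &resource{fd: 3}
	runtime.SetFinalizer(r, func(r *resource) { fmt.Println("finalized", r.fd) })
	use(r.fd)
	// Without this, r may be finalized while use still runs: only the plain
	// int was passed along, so nothing the GC treats as a root keeps r alive,
	// the same hazard as the sudog's untraced pointer to the stack object.
	runtime.KeepAlive(r)
}
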
@@ -262,6 +278,7 @@ func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
 		throw("G waiting list is corrupted")
 	}
 	gp.waiting = nil
+	gp.activeStackChans = false
 	if gp.param == nil {
 		if c.closed == 0 {
 			throw("chansend: spurious wakeup")
@@ -417,6 +434,16 @@ func closechan(c *hchan) {
 	}
 }
 
+// empty reports whether a read from c would block (that is, the channel is
+// empty). It uses a single atomic read of mutable state.
+func empty(c *hchan) bool {
+	// c.dataqsiz is immutable.
+	if c.dataqsiz == 0 {
+		return atomic.Loadp(unsafe.Pointer(&c.sendq.first)) == nil
+	}
+	return atomic.Loaduint(&c.qcount) == 0
+}
+
 // entry points for <- c from compiled code
 //go:nosplit
 func chanrecv1(c *hchan, elem unsafe.Pointer) {
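
empty() is the receive-side mirror of full(), with explicitly atomic loads because its result feeds the closed-channel reasoning below. At the language level it backs the non-blocking receive; an illustrative sketch:

package main

import "fmt"

func main() {
	ch := make(chan int, 1) // nothing buffered yet: empty(ch) holds
	select {
	case v := <-ch:
		fmt.Println("got", v)
	default:
		// Reached through the lock-free fast path when empty(ch) holds
		// and the channel is observed as not closed.
		fmt.Println("would block")
	}
}
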
@@ -457,21 +484,33 @@ func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool)
 	}
 
 	// Fast path: check for failed non-blocking operation without acquiring the lock.
-	//
-	// After observing that the channel is not ready for receiving, we observe that the
-	// channel is not closed. Each of these observations is a single word-sized read
-	// (first c.sendq.first or c.qcount, and second c.closed).
-	// Because a channel cannot be reopened, the later observation of the channel
-	// being not closed implies that it was also not closed at the moment of the
-	// first observation. We behave as if we observed the channel at that moment
-	// and report that the receive cannot proceed.
-	//
-	// The order of operations is important here: reversing the operations can lead to
-	// incorrect behavior when racing with a close.
-	if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
-		c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
-		atomic.Load(&c.closed) == 0 {
-		return
+	if !block && empty(c) {
+		// After observing that the channel is not ready for receiving, we observe whether the
+		// channel is closed.
+		//
+		// Reordering of these checks could lead to incorrect behavior when racing with a close.
+		// For example, if the channel was open and not empty, was closed, and then drained,
+		// reordered reads could incorrectly indicate "open and empty". To prevent reordering,
+		// we use atomic loads for both checks, and rely on emptying and closing to happen in
+		// separate critical sections under the same lock. This assumption fails when closing
+		// an unbuffered channel with a blocked send, but that is an error condition anyway.
+		if atomic.Load(&c.closed) == 0 {
+			// Because a channel cannot be reopened, the later observation of the channel
+			// being not closed implies that it was also not closed at the moment of the
+			// first observation. We behave as if we observed the channel at that moment
+			// and report that the receive cannot proceed.
+			return
+		}
+		// The channel is irreversibly closed. Re-check whether the channel has any pending data
+		// to receive, which could have arrived between the empty and closed checks above.
+		// Sequential consistency is also required here, when racing with such a send.
+		if empty(c) {
+			// The channel is irreversibly closed and empty.
+			if ep != nil {
+				typedmemclr(c.elemtype, ep)
+			}
+			return true, false
+		}
 	}
 
 	var t0 int64
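
The new closed-and-empty branch preserves, on the lock-free path, the language rule that a receive from a closed and drained channel succeeds immediately with the zero value (hence return true, false for selected and received). The rule itself, illustrated with plain blocking receives:

package main

import "fmt"

func main() {
	ch := make(chan int, 1)
	ch <- 42
	close(ch)
	v, ok := <-ch
	fmt.Println(v, ok) // 42 true: buffered values survive close
	v, ok = <-ch
	fmt.Println(v, ok) // 0 false: closed and empty yields the zero value
}
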
@@ -543,13 +582,14 @@ func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool)
 	mysg.c = c
 	gp.param = nil
 	c.recvq.enqueue(mysg)
-	goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)
+	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
 
 	// someone woke us up
 	if mysg != gp.waiting {
 		throw("G waiting list is corrupted")
 	}
 	gp.waiting = nil
+	gp.activeStackChans = false
 	if mysg.releasetime > 0 {
 		blockevent(mysg.releasetime-t0, 2)
 	}
@@ -616,6 +656,14 @@ func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
 	goready(gp, skip+1)
 }
 
+func chanparkcommit(gp *g, chanLock unsafe.Pointer) bool {
+	// There are unlocked sudogs that point into gp's stack. Stack
+	// copying must lock the channels of those sudogs.
+	gp.activeStackChans = true
+	unlock((*mutex)(chanLock))
+	return true
+}
+
 // compiler implements
 //
 //	select {
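
chanparkcommit is the gopark commit callback that replaces goparkunlock above: it sets gp.activeStackChans before dropping the channel lock, so a concurrent stack copy knows it must lock the channels whose sudogs point into this stack. The enqueue-under-lock, release-on-commit discipline is the classic lost-wakeup pattern; a user-level analogue with sync.Cond (an analogy only, not the runtime mechanism):

package main

import "sync"

func main() {
	var mu sync.Mutex
	cond := sync.NewCond(&mu)
	ready := false

	go func() {
		mu.Lock()
		ready = true
		cond.Signal() // cannot slip between the waiter's enqueue and its sleep
		mu.Unlock()
	}()

	mu.Lock()
	for !ready {
		// Wait registers this goroutine on the notify list and only then
		// releases mu, just as chanparkcommit unlocks c.lock only once the
		// G is committed to sleep on the sudog queue.
		cond.Wait()
	}
	mu.Unlock()
}
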