aboutsummaryrefslogtreecommitdiff
path: root/libgo/go/net/pipe.go
diff options
context:
space:
mode:
authorIan Lance Taylor <ian@gcc.gnu.org>2018-02-08 18:22:39 +0000
committerIan Lance Taylor <ian@gcc.gnu.org>2018-02-08 18:22:39 +0000
commit78cce47cff6efdfc96821cc3d94e789abca1c39a (patch)
treef26b729ad50f3c18a319ff63f115f7eeb5c01eb5 /libgo/go/net/pipe.go
parent42c1be422c610d844de315d7de6fd29c28afa1ae (diff)
parent74e6f14adb7057b29d361cc35c76f16663d1e649 (diff)
downloadgcc-78cce47cff6efdfc96821cc3d94e789abca1c39a.zip
gcc-78cce47cff6efdfc96821cc3d94e789abca1c39a.tar.gz
gcc-78cce47cff6efdfc96821cc3d94e789abca1c39a.tar.bz2
Merge from trunk revision 257495.
From-SVN: r257499
Diffstat (limited to 'libgo/go/net/pipe.go')
-rw-r--r--libgo/go/net/pipe.go232
1 files changed, 204 insertions, 28 deletions
diff --git a/libgo/go/net/pipe.go b/libgo/go/net/pipe.go
index 37e552f..9177fc4 100644
--- a/libgo/go/net/pipe.go
+++ b/libgo/go/net/pipe.go
@@ -5,63 +5,239 @@
package net
import (
- "errors"
"io"
+ "sync"
"time"
)
+// pipeDeadline is an abstraction for handling timeouts.
+type pipeDeadline struct {
+ mu sync.Mutex // Guards timer and cancel
+ timer *time.Timer
+ cancel chan struct{} // Must be non-nil
+}
+
+func makePipeDeadline() pipeDeadline {
+ return pipeDeadline{cancel: make(chan struct{})}
+}
+
+// set sets the point in time when the deadline will time out.
+// A timeout event is signaled by closing the channel returned by waiter.
+// Once a timeout has occurred, the deadline can be refreshed by specifying a
+// t value in the future.
+//
+// A zero value for t prevents timeout.
+func (d *pipeDeadline) set(t time.Time) {
+ d.mu.Lock()
+ defer d.mu.Unlock()
+
+ if d.timer != nil && !d.timer.Stop() {
+ <-d.cancel // Wait for the timer callback to finish and close cancel
+ }
+ d.timer = nil
+
+ // Time is zero, then there is no deadline.
+ closed := isClosedChan(d.cancel)
+ if t.IsZero() {
+ if closed {
+ d.cancel = make(chan struct{})
+ }
+ return
+ }
+
+ // Time in the future, setup a timer to cancel in the future.
+ if dur := time.Until(t); dur > 0 {
+ if closed {
+ d.cancel = make(chan struct{})
+ }
+ d.timer = time.AfterFunc(dur, func() {
+ close(d.cancel)
+ })
+ return
+ }
+
+ // Time in the past, so close immediately.
+ if !closed {
+ close(d.cancel)
+ }
+}
+
+// wait returns a channel that is closed when the deadline is exceeded.
+func (d *pipeDeadline) wait() chan struct{} {
+ d.mu.Lock()
+ defer d.mu.Unlock()
+ return d.cancel
+}
+
+func isClosedChan(c <-chan struct{}) bool {
+ select {
+ case <-c:
+ return true
+ default:
+ return false
+ }
+}
+
+type timeoutError struct{}
+
+func (timeoutError) Error() string { return "deadline exceeded" }
+func (timeoutError) Timeout() bool { return true }
+func (timeoutError) Temporary() bool { return true }
+
+type pipeAddr struct{}
+
+func (pipeAddr) Network() string { return "pipe" }
+func (pipeAddr) String() string { return "pipe" }
+
+type pipe struct {
+ wrMu sync.Mutex // Serialize Write operations
+
+ // Used by local Read to interact with remote Write.
+ // Successful receive on rdRx is always followed by send on rdTx.
+ rdRx <-chan []byte
+ rdTx chan<- int
+
+ // Used by local Write to interact with remote Read.
+ // Successful send on wrTx is always followed by receive on wrRx.
+ wrTx chan<- []byte
+ wrRx <-chan int
+
+ once sync.Once // Protects closing localDone
+ localDone chan struct{}
+ remoteDone <-chan struct{}
+
+ readDeadline pipeDeadline
+ writeDeadline pipeDeadline
+}
+
// Pipe creates a synchronous, in-memory, full duplex
// network connection; both ends implement the Conn interface.
// Reads on one end are matched with writes on the other,
// copying data directly between the two; there is no internal
// buffering.
func Pipe() (Conn, Conn) {
- r1, w1 := io.Pipe()
- r2, w2 := io.Pipe()
-
- return &pipe{r1, w2}, &pipe{r2, w1}
-}
+ cb1 := make(chan []byte)
+ cb2 := make(chan []byte)
+ cn1 := make(chan int)
+ cn2 := make(chan int)
+ done1 := make(chan struct{})
+ done2 := make(chan struct{})
-type pipe struct {
- *io.PipeReader
- *io.PipeWriter
+ p1 := &pipe{
+ rdRx: cb1, rdTx: cn1,
+ wrTx: cb2, wrRx: cn2,
+ localDone: done1, remoteDone: done2,
+ readDeadline: makePipeDeadline(),
+ writeDeadline: makePipeDeadline(),
+ }
+ p2 := &pipe{
+ rdRx: cb2, rdTx: cn2,
+ wrTx: cb1, wrRx: cn1,
+ localDone: done2, remoteDone: done1,
+ readDeadline: makePipeDeadline(),
+ writeDeadline: makePipeDeadline(),
+ }
+ return p1, p2
}
-type pipeAddr int
+func (*pipe) LocalAddr() Addr { return pipeAddr{} }
+func (*pipe) RemoteAddr() Addr { return pipeAddr{} }
-func (pipeAddr) Network() string {
- return "pipe"
+func (p *pipe) Read(b []byte) (int, error) {
+ n, err := p.read(b)
+ if err != nil && err != io.EOF && err != io.ErrClosedPipe {
+ err = &OpError{Op: "read", Net: "pipe", Err: err}
+ }
+ return n, err
}
-func (pipeAddr) String() string {
- return "pipe"
-}
+func (p *pipe) read(b []byte) (n int, err error) {
+ switch {
+ case isClosedChan(p.localDone):
+ return 0, io.ErrClosedPipe
+ case isClosedChan(p.remoteDone):
+ return 0, io.EOF
+ case isClosedChan(p.readDeadline.wait()):
+ return 0, timeoutError{}
+ }
-func (p *pipe) Close() error {
- err := p.PipeReader.Close()
- err1 := p.PipeWriter.Close()
- if err == nil {
- err = err1
+ select {
+ case bw := <-p.rdRx:
+ nr := copy(b, bw)
+ p.rdTx <- nr
+ return nr, nil
+ case <-p.localDone:
+ return 0, io.ErrClosedPipe
+ case <-p.remoteDone:
+ return 0, io.EOF
+ case <-p.readDeadline.wait():
+ return 0, timeoutError{}
}
- return err
}
-func (p *pipe) LocalAddr() Addr {
- return pipeAddr(0)
+func (p *pipe) Write(b []byte) (int, error) {
+ n, err := p.write(b)
+ if err != nil && err != io.ErrClosedPipe {
+ err = &OpError{Op: "write", Net: "pipe", Err: err}
+ }
+ return n, err
}
-func (p *pipe) RemoteAddr() Addr {
- return pipeAddr(0)
+func (p *pipe) write(b []byte) (n int, err error) {
+ switch {
+ case isClosedChan(p.localDone):
+ return 0, io.ErrClosedPipe
+ case isClosedChan(p.remoteDone):
+ return 0, io.ErrClosedPipe
+ case isClosedChan(p.writeDeadline.wait()):
+ return 0, timeoutError{}
+ }
+
+ p.wrMu.Lock() // Ensure entirety of b is written together
+ defer p.wrMu.Unlock()
+ for once := true; once || len(b) > 0; once = false {
+ select {
+ case p.wrTx <- b:
+ nw := <-p.wrRx
+ b = b[nw:]
+ n += nw
+ case <-p.localDone:
+ return n, io.ErrClosedPipe
+ case <-p.remoteDone:
+ return n, io.ErrClosedPipe
+ case <-p.writeDeadline.wait():
+ return n, timeoutError{}
+ }
+ }
+ return n, nil
}
func (p *pipe) SetDeadline(t time.Time) error {
- return &OpError{Op: "set", Net: "pipe", Source: nil, Addr: nil, Err: errors.New("deadline not supported")}
+ if isClosedChan(p.localDone) || isClosedChan(p.remoteDone) {
+ return io.ErrClosedPipe
+ }
+ p.readDeadline.set(t)
+ p.writeDeadline.set(t)
+ return nil
}
func (p *pipe) SetReadDeadline(t time.Time) error {
- return &OpError{Op: "set", Net: "pipe", Source: nil, Addr: nil, Err: errors.New("deadline not supported")}
+ if isClosedChan(p.localDone) || isClosedChan(p.remoteDone) {
+ return io.ErrClosedPipe
+ }
+ p.readDeadline.set(t)
+ return nil
}
func (p *pipe) SetWriteDeadline(t time.Time) error {
- return &OpError{Op: "set", Net: "pipe", Source: nil, Addr: nil, Err: errors.New("deadline not supported")}
+ if isClosedChan(p.localDone) || isClosedChan(p.remoteDone) {
+ return io.ErrClosedPipe
+ }
+ p.writeDeadline.set(t)
+ return nil
+}
+
+func (p *pipe) Close() error {
+ p.once.Do(func() { close(p.localDone) })
+ return nil
}