diff options
author | Ian Lance Taylor <ian@gcc.gnu.org> | 2018-02-08 18:22:39 +0000 |
---|---|---|
committer | Ian Lance Taylor <ian@gcc.gnu.org> | 2018-02-08 18:22:39 +0000 |
commit | 78cce47cff6efdfc96821cc3d94e789abca1c39a (patch) | |
tree | f26b729ad50f3c18a319ff63f115f7eeb5c01eb5 /libgo/go/net/pipe.go | |
parent | 42c1be422c610d844de315d7de6fd29c28afa1ae (diff) | |
parent | 74e6f14adb7057b29d361cc35c76f16663d1e649 (diff) | |
download | gcc-78cce47cff6efdfc96821cc3d94e789abca1c39a.zip gcc-78cce47cff6efdfc96821cc3d94e789abca1c39a.tar.gz gcc-78cce47cff6efdfc96821cc3d94e789abca1c39a.tar.bz2 |
Merge from trunk revision 257495.
From-SVN: r257499
Diffstat (limited to 'libgo/go/net/pipe.go')
-rw-r--r-- | libgo/go/net/pipe.go | 232 |
1 files changed, 204 insertions, 28 deletions
diff --git a/libgo/go/net/pipe.go b/libgo/go/net/pipe.go index 37e552f..9177fc4 100644 --- a/libgo/go/net/pipe.go +++ b/libgo/go/net/pipe.go @@ -5,63 +5,239 @@ package net import ( - "errors" "io" + "sync" "time" ) +// pipeDeadline is an abstraction for handling timeouts. +type pipeDeadline struct { + mu sync.Mutex // Guards timer and cancel + timer *time.Timer + cancel chan struct{} // Must be non-nil +} + +func makePipeDeadline() pipeDeadline { + return pipeDeadline{cancel: make(chan struct{})} +} + +// set sets the point in time when the deadline will time out. +// A timeout event is signaled by closing the channel returned by waiter. +// Once a timeout has occurred, the deadline can be refreshed by specifying a +// t value in the future. +// +// A zero value for t prevents timeout. +func (d *pipeDeadline) set(t time.Time) { + d.mu.Lock() + defer d.mu.Unlock() + + if d.timer != nil && !d.timer.Stop() { + <-d.cancel // Wait for the timer callback to finish and close cancel + } + d.timer = nil + + // Time is zero, then there is no deadline. + closed := isClosedChan(d.cancel) + if t.IsZero() { + if closed { + d.cancel = make(chan struct{}) + } + return + } + + // Time in the future, setup a timer to cancel in the future. + if dur := time.Until(t); dur > 0 { + if closed { + d.cancel = make(chan struct{}) + } + d.timer = time.AfterFunc(dur, func() { + close(d.cancel) + }) + return + } + + // Time in the past, so close immediately. + if !closed { + close(d.cancel) + } +} + +// wait returns a channel that is closed when the deadline is exceeded. +func (d *pipeDeadline) wait() chan struct{} { + d.mu.Lock() + defer d.mu.Unlock() + return d.cancel +} + +func isClosedChan(c <-chan struct{}) bool { + select { + case <-c: + return true + default: + return false + } +} + +type timeoutError struct{} + +func (timeoutError) Error() string { return "deadline exceeded" } +func (timeoutError) Timeout() bool { return true } +func (timeoutError) Temporary() bool { return true } + +type pipeAddr struct{} + +func (pipeAddr) Network() string { return "pipe" } +func (pipeAddr) String() string { return "pipe" } + +type pipe struct { + wrMu sync.Mutex // Serialize Write operations + + // Used by local Read to interact with remote Write. + // Successful receive on rdRx is always followed by send on rdTx. + rdRx <-chan []byte + rdTx chan<- int + + // Used by local Write to interact with remote Read. + // Successful send on wrTx is always followed by receive on wrRx. + wrTx chan<- []byte + wrRx <-chan int + + once sync.Once // Protects closing localDone + localDone chan struct{} + remoteDone <-chan struct{} + + readDeadline pipeDeadline + writeDeadline pipeDeadline +} + // Pipe creates a synchronous, in-memory, full duplex // network connection; both ends implement the Conn interface. // Reads on one end are matched with writes on the other, // copying data directly between the two; there is no internal // buffering. func Pipe() (Conn, Conn) { - r1, w1 := io.Pipe() - r2, w2 := io.Pipe() - - return &pipe{r1, w2}, &pipe{r2, w1} -} + cb1 := make(chan []byte) + cb2 := make(chan []byte) + cn1 := make(chan int) + cn2 := make(chan int) + done1 := make(chan struct{}) + done2 := make(chan struct{}) -type pipe struct { - *io.PipeReader - *io.PipeWriter + p1 := &pipe{ + rdRx: cb1, rdTx: cn1, + wrTx: cb2, wrRx: cn2, + localDone: done1, remoteDone: done2, + readDeadline: makePipeDeadline(), + writeDeadline: makePipeDeadline(), + } + p2 := &pipe{ + rdRx: cb2, rdTx: cn2, + wrTx: cb1, wrRx: cn1, + localDone: done2, remoteDone: done1, + readDeadline: makePipeDeadline(), + writeDeadline: makePipeDeadline(), + } + return p1, p2 } -type pipeAddr int +func (*pipe) LocalAddr() Addr { return pipeAddr{} } +func (*pipe) RemoteAddr() Addr { return pipeAddr{} } -func (pipeAddr) Network() string { - return "pipe" +func (p *pipe) Read(b []byte) (int, error) { + n, err := p.read(b) + if err != nil && err != io.EOF && err != io.ErrClosedPipe { + err = &OpError{Op: "read", Net: "pipe", Err: err} + } + return n, err } -func (pipeAddr) String() string { - return "pipe" -} +func (p *pipe) read(b []byte) (n int, err error) { + switch { + case isClosedChan(p.localDone): + return 0, io.ErrClosedPipe + case isClosedChan(p.remoteDone): + return 0, io.EOF + case isClosedChan(p.readDeadline.wait()): + return 0, timeoutError{} + } -func (p *pipe) Close() error { - err := p.PipeReader.Close() - err1 := p.PipeWriter.Close() - if err == nil { - err = err1 + select { + case bw := <-p.rdRx: + nr := copy(b, bw) + p.rdTx <- nr + return nr, nil + case <-p.localDone: + return 0, io.ErrClosedPipe + case <-p.remoteDone: + return 0, io.EOF + case <-p.readDeadline.wait(): + return 0, timeoutError{} } - return err } -func (p *pipe) LocalAddr() Addr { - return pipeAddr(0) +func (p *pipe) Write(b []byte) (int, error) { + n, err := p.write(b) + if err != nil && err != io.ErrClosedPipe { + err = &OpError{Op: "write", Net: "pipe", Err: err} + } + return n, err } -func (p *pipe) RemoteAddr() Addr { - return pipeAddr(0) +func (p *pipe) write(b []byte) (n int, err error) { + switch { + case isClosedChan(p.localDone): + return 0, io.ErrClosedPipe + case isClosedChan(p.remoteDone): + return 0, io.ErrClosedPipe + case isClosedChan(p.writeDeadline.wait()): + return 0, timeoutError{} + } + + p.wrMu.Lock() // Ensure entirety of b is written together + defer p.wrMu.Unlock() + for once := true; once || len(b) > 0; once = false { + select { + case p.wrTx <- b: + nw := <-p.wrRx + b = b[nw:] + n += nw + case <-p.localDone: + return n, io.ErrClosedPipe + case <-p.remoteDone: + return n, io.ErrClosedPipe + case <-p.writeDeadline.wait(): + return n, timeoutError{} + } + } + return n, nil } func (p *pipe) SetDeadline(t time.Time) error { - return &OpError{Op: "set", Net: "pipe", Source: nil, Addr: nil, Err: errors.New("deadline not supported")} + if isClosedChan(p.localDone) || isClosedChan(p.remoteDone) { + return io.ErrClosedPipe + } + p.readDeadline.set(t) + p.writeDeadline.set(t) + return nil } func (p *pipe) SetReadDeadline(t time.Time) error { - return &OpError{Op: "set", Net: "pipe", Source: nil, Addr: nil, Err: errors.New("deadline not supported")} + if isClosedChan(p.localDone) || isClosedChan(p.remoteDone) { + return io.ErrClosedPipe + } + p.readDeadline.set(t) + return nil } func (p *pipe) SetWriteDeadline(t time.Time) error { - return &OpError{Op: "set", Net: "pipe", Source: nil, Addr: nil, Err: errors.New("deadline not supported")} + if isClosedChan(p.localDone) || isClosedChan(p.remoteDone) { + return io.ErrClosedPipe + } + p.writeDeadline.set(t) + return nil +} + +func (p *pipe) Close() error { + p.once.Do(func() { close(p.localDone) }) + return nil } |