aboutsummaryrefslogtreecommitdiff
path: root/libgo/go/net/fd_plan9.go
diff options
context:
space:
mode:
Diffstat (limited to 'libgo/go/net/fd_plan9.go')
-rw-r--r--libgo/go/net/fd_plan9.go149
1 files changed, 132 insertions, 17 deletions
diff --git a/libgo/go/net/fd_plan9.go b/libgo/go/net/fd_plan9.go
index 7533232..300d8c4 100644
--- a/libgo/go/net/fd_plan9.go
+++ b/libgo/go/net/fd_plan9.go
@@ -7,21 +7,37 @@ package net
import (
"io"
"os"
+ "sync/atomic"
"syscall"
"time"
)
+type atomicBool int32
+
+func (b *atomicBool) isSet() bool { return atomic.LoadInt32((*int32)(b)) != 0 }
+func (b *atomicBool) setFalse() { atomic.StoreInt32((*int32)(b), 0) }
+func (b *atomicBool) setTrue() { atomic.StoreInt32((*int32)(b), 1) }
+
// Network file descriptor.
type netFD struct {
// locking/lifetime of sysfd + serialize access to Read and Write methods
fdmu fdMutex
// immutable until Close
- net string
- n string
- dir string
- ctl, data *os.File
- laddr, raddr Addr
+ net string
+ n string
+ dir string
+ listen, ctl, data *os.File
+ laddr, raddr Addr
+ isStream bool
+
+ // deadlines
+ raio *asyncIO
+ waio *asyncIO
+ rtimer *time.Timer
+ wtimer *time.Timer
+ rtimedout atomicBool // set true when read deadline has been reached
+ wtimedout atomicBool // set true when write deadline has been reached
}
var (
@@ -32,8 +48,16 @@ func sysInit() {
netdir = "/net"
}
-func newFD(net, name string, ctl, data *os.File, laddr, raddr Addr) (*netFD, error) {
- return &netFD{net: net, n: name, dir: netdir + "/" + net + "/" + name, ctl: ctl, data: data, laddr: laddr, raddr: raddr}, nil
+func newFD(net, name string, listen, ctl, data *os.File, laddr, raddr Addr) (*netFD, error) {
+ return &netFD{
+ net: net,
+ n: name,
+ dir: netdir + "/" + net + "/" + name,
+ listen: listen,
+ ctl: ctl, data: data,
+ laddr: laddr,
+ raddr: raddr,
+ }, nil
}
func (fd *netFD) init() error {
@@ -64,11 +88,20 @@ func (fd *netFD) destroy() {
err = err1
}
}
+ if fd.listen != nil {
+ if err1 := fd.listen.Close(); err1 != nil && err == nil {
+ err = err1
+ }
+ }
fd.ctl = nil
fd.data = nil
+ fd.listen = nil
}
func (fd *netFD) Read(b []byte) (n int, err error) {
+ if fd.rtimedout.isSet() {
+ return 0, errTimeout
+ }
if !fd.ok() || fd.data == nil {
return 0, syscall.EINVAL
}
@@ -79,10 +112,15 @@ func (fd *netFD) Read(b []byte) (n int, err error) {
if len(b) == 0 {
return 0, nil
}
- n, err = fd.data.Read(b)
+ fd.raio = newAsyncIO(fd.data.Read, b)
+ n, err = fd.raio.Wait()
+ fd.raio = nil
if isHangup(err) {
err = io.EOF
}
+ if isInterrupted(err) {
+ err = errTimeout
+ }
if fd.net == "udp" && err == io.EOF {
n = 0
err = nil
@@ -91,6 +129,9 @@ func (fd *netFD) Read(b []byte) (n int, err error) {
}
func (fd *netFD) Write(b []byte) (n int, err error) {
+ if fd.wtimedout.isSet() {
+ return 0, errTimeout
+ }
if !fd.ok() || fd.data == nil {
return 0, syscall.EINVAL
}
@@ -98,7 +139,13 @@ func (fd *netFD) Write(b []byte) (n int, err error) {
return 0, err
}
defer fd.writeUnlock()
- return fd.data.Write(b)
+ fd.waio = newAsyncIO(fd.data.Write, b)
+ n, err = fd.waio.Wait()
+ fd.waio = nil
+ if isInterrupted(err) {
+ err = errTimeout
+ }
+ return
}
func (fd *netFD) closeRead() error {
@@ -124,11 +171,10 @@ func (fd *netFD) Close() error {
}
if fd.net == "tcp" {
// The following line is required to unblock Reads.
- // For some reason, WriteString returns an error:
- // "write /net/tcp/39/listen: inappropriate use of fd"
- // But without it, Reads on dead conns hang forever.
- // See Issue 9554.
- fd.ctl.WriteString("hangup")
+ _, err := fd.ctl.WriteString("close")
+ if err != nil {
+ return err
+ }
}
err := fd.ctl.Close()
if fd.data != nil {
@@ -136,8 +182,14 @@ func (fd *netFD) Close() error {
err = err1
}
}
+ if fd.listen != nil {
+ if err1 := fd.listen.Close(); err1 != nil && err == nil {
+ err = err1
+ }
+ }
fd.ctl = nil
fd.data = nil
+ fd.listen = nil
return err
}
@@ -165,15 +217,74 @@ func (fd *netFD) file(f *os.File, s string) (*os.File, error) {
}
func (fd *netFD) setDeadline(t time.Time) error {
- return syscall.EPLAN9
+ return setDeadlineImpl(fd, t, 'r'+'w')
}
func (fd *netFD) setReadDeadline(t time.Time) error {
- return syscall.EPLAN9
+ return setDeadlineImpl(fd, t, 'r')
}
func (fd *netFD) setWriteDeadline(t time.Time) error {
- return syscall.EPLAN9
+ return setDeadlineImpl(fd, t, 'w')
+}
+
+func setDeadlineImpl(fd *netFD, t time.Time, mode int) error {
+ d := t.Sub(time.Now())
+ if mode == 'r' || mode == 'r'+'w' {
+ fd.rtimedout.setFalse()
+ }
+ if mode == 'w' || mode == 'r'+'w' {
+ fd.wtimedout.setFalse()
+ }
+ if t.IsZero() || d < 0 {
+ // Stop timer
+ if mode == 'r' || mode == 'r'+'w' {
+ if fd.rtimer != nil {
+ fd.rtimer.Stop()
+ }
+ fd.rtimer = nil
+ }
+ if mode == 'w' || mode == 'r'+'w' {
+ if fd.wtimer != nil {
+ fd.wtimer.Stop()
+ }
+ fd.wtimer = nil
+ }
+ } else {
+ // Interrupt I/O operation once timer has expired
+ if mode == 'r' || mode == 'r'+'w' {
+ fd.rtimer = time.AfterFunc(d, func() {
+ fd.rtimedout.setTrue()
+ if fd.raio != nil {
+ fd.raio.Cancel()
+ }
+ })
+ }
+ if mode == 'w' || mode == 'r'+'w' {
+ fd.wtimer = time.AfterFunc(d, func() {
+ fd.wtimedout.setTrue()
+ if fd.waio != nil {
+ fd.waio.Cancel()
+ }
+ })
+ }
+ }
+ if !t.IsZero() && d < 0 {
+ // Interrupt current I/O operation
+ if mode == 'r' || mode == 'r'+'w' {
+ fd.rtimedout.setTrue()
+ if fd.raio != nil {
+ fd.raio.Cancel()
+ }
+ }
+ if mode == 'w' || mode == 'r'+'w' {
+ fd.wtimedout.setTrue()
+ if fd.waio != nil {
+ fd.waio.Cancel()
+ }
+ }
+ }
+ return nil
}
func setReadBuffer(fd *netFD, bytes int) error {
@@ -187,3 +298,7 @@ func setWriteBuffer(fd *netFD, bytes int) error {
func isHangup(err error) bool {
return err != nil && stringsHasSuffix(err.Error(), "Hangup")
}
+
+func isInterrupted(err error) bool {
+ return err != nil && stringsHasSuffix(err.Error(), "interrupted")
+}