diff options
Diffstat (limited to 'libgo/go/net/fd.go')
-rw-r--r-- | libgo/go/net/fd.go | 533 |
1 files changed, 533 insertions, 0 deletions
diff --git a/libgo/go/net/fd.go b/libgo/go/net/fd.go new file mode 100644 index 0000000..d300e4b --- /dev/null +++ b/libgo/go/net/fd.go @@ -0,0 +1,533 @@ +// Copyright 2009 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// TODO(rsc): All the prints in this file should go to standard error. + +package net + +import ( + "io" + "os" + "sync" + "syscall" + "time" +) + +// Network file descriptor. +type netFD struct { + // locking/lifetime of sysfd + sysmu sync.Mutex + sysref int + closing bool + + // immutable until Close + sysfd int + family int + proto int + sysfile *os.File + cr chan bool + cw chan bool + net string + laddr Addr + raddr Addr + + // owned by client + rdeadline_delta int64 + rdeadline int64 + rio sync.Mutex + wdeadline_delta int64 + wdeadline int64 + wio sync.Mutex + + // owned by fd wait server + ncr, ncw int +} + +type InvalidConnError struct{} + +func (e *InvalidConnError) String() string { return "invalid net.Conn" } +func (e *InvalidConnError) Temporary() bool { return false } +func (e *InvalidConnError) Timeout() bool { return false } + +// A pollServer helps FDs determine when to retry a non-blocking +// read or write after they get EAGAIN. When an FD needs to wait, +// send the fd on s.cr (for a read) or s.cw (for a write) to pass the +// request to the poll server. Then receive on fd.cr/fd.cw. +// When the pollServer finds that i/o on FD should be possible +// again, it will send fd on fd.cr/fd.cw to wake any waiting processes. +// This protocol is implemented as s.WaitRead() and s.WaitWrite(). +// +// There is one subtlety: when sending on s.cr/s.cw, the +// poll server is probably in a system call, waiting for an fd +// to become ready. It's not looking at the request channels. +// To resolve this, the poll server waits not just on the FDs it has +// been given but also its own pipe. After sending on the +// buffered channel s.cr/s.cw, WaitRead/WaitWrite writes a +// byte to the pipe, causing the pollServer's poll system call to +// return. In response to the pipe being readable, the pollServer +// re-polls its request channels. +// +// Note that the ordering is "send request" and then "wake up server". +// If the operations were reversed, there would be a race: the poll +// server might wake up and look at the request channel, see that it +// was empty, and go back to sleep, all before the requester managed +// to send the request. Because the send must complete before the wakeup, +// the request channel must be buffered. A buffer of size 1 is sufficient +// for any request load. If many processes are trying to submit requests, +// one will succeed, the pollServer will read the request, and then the +// channel will be empty for the next process's request. A larger buffer +// might help batch requests. +// +// To avoid races in closing, all fd operations are locked and +// refcounted. when netFD.Close() is called, it calls syscall.Shutdown +// and sets a closing flag. Only when the last reference is removed +// will the fd be closed. + +type pollServer struct { + cr, cw chan *netFD // buffered >= 1 + pr, pw *os.File + pending map[int]*netFD + poll *pollster // low-level OS hooks + deadline int64 // next deadline (nsec since 1970) +} + +func (s *pollServer) AddFD(fd *netFD, mode int) { + intfd := fd.sysfd + if intfd < 0 { + // fd closed underfoot + if mode == 'r' { + fd.cr <- true + } else { + fd.cw <- true + } + return + } + if err := s.poll.AddFD(intfd, mode, false); err != nil { + panic("pollServer AddFD " + err.String()) + return + } + + var t int64 + key := intfd << 1 + if mode == 'r' { + fd.ncr++ + t = fd.rdeadline + } else { + fd.ncw++ + key++ + t = fd.wdeadline + } + s.pending[key] = fd + if t > 0 && (s.deadline == 0 || t < s.deadline) { + s.deadline = t + } +} + +func (s *pollServer) LookupFD(fd int, mode int) *netFD { + key := fd << 1 + if mode == 'w' { + key++ + } + netfd, ok := s.pending[key] + if !ok { + return nil + } + s.pending[key] = nil, false + return netfd +} + +func (s *pollServer) WakeFD(fd *netFD, mode int) { + if mode == 'r' { + for fd.ncr > 0 { + fd.ncr-- + fd.cr <- true + } + } else { + for fd.ncw > 0 { + fd.ncw-- + fd.cw <- true + } + } +} + +func (s *pollServer) Now() int64 { + return time.Nanoseconds() +} + +func (s *pollServer) CheckDeadlines() { + now := s.Now() + // TODO(rsc): This will need to be handled more efficiently, + // probably with a heap indexed by wakeup time. + + var next_deadline int64 + for key, fd := range s.pending { + var t int64 + var mode int + if key&1 == 0 { + mode = 'r' + } else { + mode = 'w' + } + if mode == 'r' { + t = fd.rdeadline + } else { + t = fd.wdeadline + } + if t > 0 { + if t <= now { + s.pending[key] = nil, false + if mode == 'r' { + s.poll.DelFD(fd.sysfd, mode) + fd.rdeadline = -1 + } else { + s.poll.DelFD(fd.sysfd, mode) + fd.wdeadline = -1 + } + s.WakeFD(fd, mode) + } else if next_deadline == 0 || t < next_deadline { + next_deadline = t + } + } + } + s.deadline = next_deadline +} + +func (s *pollServer) Run() { + var scratch [100]byte + for { + var t = s.deadline + if t > 0 { + t = t - s.Now() + if t <= 0 { + s.CheckDeadlines() + continue + } + } + fd, mode, err := s.poll.WaitFD(t) + if err != nil { + print("pollServer WaitFD: ", err.String(), "\n") + return + } + if fd < 0 { + // Timeout happened. + s.CheckDeadlines() + continue + } + if fd == s.pr.Fd() { + // Drain our wakeup pipe. + for nn, _ := s.pr.Read(scratch[0:]); nn > 0; { + nn, _ = s.pr.Read(scratch[0:]) + } + // Read from channels + for fd, ok := <-s.cr; ok; fd, ok = <-s.cr { + s.AddFD(fd, 'r') + } + for fd, ok := <-s.cw; ok; fd, ok = <-s.cw { + s.AddFD(fd, 'w') + } + } else { + netfd := s.LookupFD(fd, mode) + if netfd == nil { + print("pollServer: unexpected wakeup for fd=", fd, " mode=", string(mode), "\n") + continue + } + s.WakeFD(netfd, mode) + } + } +} + +var wakeupbuf [1]byte + +func (s *pollServer) Wakeup() { s.pw.Write(wakeupbuf[0:]) } + +func (s *pollServer) WaitRead(fd *netFD) { + s.cr <- fd + s.Wakeup() + <-fd.cr +} + +func (s *pollServer) WaitWrite(fd *netFD) { + s.cw <- fd + s.Wakeup() + <-fd.cw +} + +// Network FD methods. +// All the network FDs use a single pollServer. + +var pollserver *pollServer +var onceStartServer sync.Once + +func startServer() { + p, err := newPollServer() + if err != nil { + print("Start pollServer: ", err.String(), "\n") + } + pollserver = p +} + +func newFD(fd, family, proto int, net string, laddr, raddr Addr) (f *netFD, err os.Error) { + onceStartServer.Do(startServer) + if e := syscall.SetNonblock(fd, true); e != 0 { + return nil, &OpError{"setnonblock", net, laddr, os.Errno(e)} + } + f = &netFD{ + sysfd: fd, + family: family, + proto: proto, + net: net, + laddr: laddr, + raddr: raddr, + } + var ls, rs string + if laddr != nil { + ls = laddr.String() + } + if raddr != nil { + rs = raddr.String() + } + f.sysfile = os.NewFile(fd, net+":"+ls+"->"+rs) + f.cr = make(chan bool, 1) + f.cw = make(chan bool, 1) + return f, nil +} + +// Add a reference to this fd. +func (fd *netFD) incref() { + fd.sysmu.Lock() + fd.sysref++ + fd.sysmu.Unlock() +} + +// Remove a reference to this FD and close if we've been asked to do so (and +// there are no references left. +func (fd *netFD) decref() { + fd.sysmu.Lock() + fd.sysref-- + if fd.closing && fd.sysref == 0 && fd.sysfd >= 0 { + // In case the user has set linger, switch to blocking mode so + // the close blocks. As long as this doesn't happen often, we + // can handle the extra OS processes. Otherwise we'll need to + // use the pollserver for Close too. Sigh. + syscall.SetNonblock(fd.sysfd, false) + fd.sysfile.Close() + fd.sysfile = nil + fd.sysfd = -1 + } + fd.sysmu.Unlock() +} + +func (fd *netFD) Close() os.Error { + if fd == nil || fd.sysfile == nil { + return os.EINVAL + } + + fd.incref() + syscall.Shutdown(fd.sysfd, syscall.SHUT_RDWR) + fd.closing = true + fd.decref() + return nil +} + +func (fd *netFD) Read(p []byte) (n int, err os.Error) { + if fd == nil { + return 0, os.EINVAL + } + fd.rio.Lock() + defer fd.rio.Unlock() + fd.incref() + defer fd.decref() + if fd.sysfile == nil { + return 0, os.EINVAL + } + if fd.rdeadline_delta > 0 { + fd.rdeadline = pollserver.Now() + fd.rdeadline_delta + } else { + fd.rdeadline = 0 + } + var oserr os.Error + for { + var errno int + n, errno = syscall.Read(fd.sysfile.Fd(), p) + if errno == syscall.EAGAIN && fd.rdeadline >= 0 { + pollserver.WaitRead(fd) + continue + } + if errno != 0 { + n = 0 + oserr = os.Errno(errno) + } else if n == 0 && errno == 0 && fd.proto != syscall.SOCK_DGRAM { + err = os.EOF + } + break + } + if oserr != nil { + err = &OpError{"read", fd.net, fd.raddr, oserr} + } + return +} + +func (fd *netFD) ReadFrom(p []byte) (n int, sa syscall.Sockaddr, err os.Error) { + if fd == nil || fd.sysfile == nil { + return 0, nil, os.EINVAL + } + fd.rio.Lock() + defer fd.rio.Unlock() + fd.incref() + defer fd.decref() + if fd.rdeadline_delta > 0 { + fd.rdeadline = pollserver.Now() + fd.rdeadline_delta + } else { + fd.rdeadline = 0 + } + var oserr os.Error + for { + var errno int + n, sa, errno = syscall.Recvfrom(fd.sysfd, p, 0) + if errno == syscall.EAGAIN && fd.rdeadline >= 0 { + pollserver.WaitRead(fd) + continue + } + if errno != 0 { + n = 0 + oserr = os.Errno(errno) + } + break + } + if oserr != nil { + err = &OpError{"read", fd.net, fd.laddr, oserr} + } + return +} + +func (fd *netFD) Write(p []byte) (n int, err os.Error) { + if fd == nil { + return 0, os.EINVAL + } + fd.wio.Lock() + defer fd.wio.Unlock() + fd.incref() + defer fd.decref() + if fd.sysfile == nil { + return 0, os.EINVAL + } + if fd.wdeadline_delta > 0 { + fd.wdeadline = pollserver.Now() + fd.wdeadline_delta + } else { + fd.wdeadline = 0 + } + nn := 0 + var oserr os.Error + + for { + n, errno := syscall.Write(fd.sysfile.Fd(), p[nn:]) + if n > 0 { + nn += n + } + if nn == len(p) { + break + } + if errno == syscall.EAGAIN && fd.wdeadline >= 0 { + pollserver.WaitWrite(fd) + continue + } + if errno != 0 { + n = 0 + oserr = os.Errno(errno) + break + } + if n == 0 { + oserr = io.ErrUnexpectedEOF + break + } + } + if oserr != nil { + err = &OpError{"write", fd.net, fd.raddr, oserr} + } + return nn, err +} + +func (fd *netFD) WriteTo(p []byte, sa syscall.Sockaddr) (n int, err os.Error) { + if fd == nil || fd.sysfile == nil { + return 0, os.EINVAL + } + fd.wio.Lock() + defer fd.wio.Unlock() + fd.incref() + defer fd.decref() + if fd.wdeadline_delta > 0 { + fd.wdeadline = pollserver.Now() + fd.wdeadline_delta + } else { + fd.wdeadline = 0 + } + var oserr os.Error + for { + errno := syscall.Sendto(fd.sysfd, p, 0, sa) + if errno == syscall.EAGAIN && fd.wdeadline >= 0 { + pollserver.WaitWrite(fd) + continue + } + if errno != 0 { + oserr = os.Errno(errno) + } + break + } + if oserr == nil { + n = len(p) + } else { + err = &OpError{"write", fd.net, fd.raddr, oserr} + } + return +} + +func (fd *netFD) accept(toAddr func(syscall.Sockaddr) Addr) (nfd *netFD, err os.Error) { + if fd == nil || fd.sysfile == nil { + return nil, os.EINVAL + } + + fd.incref() + defer fd.decref() + + // See ../syscall/exec.go for description of ForkLock. + // It is okay to hold the lock across syscall.Accept + // because we have put fd.sysfd into non-blocking mode. + syscall.ForkLock.RLock() + var s, e int + var sa syscall.Sockaddr + for { + s, sa, e = syscall.Accept(fd.sysfd) + if e != syscall.EAGAIN { + break + } + syscall.ForkLock.RUnlock() + pollserver.WaitRead(fd) + syscall.ForkLock.RLock() + } + if e != 0 { + syscall.ForkLock.RUnlock() + return nil, &OpError{"accept", fd.net, fd.laddr, os.Errno(e)} + } + syscall.CloseOnExec(s) + syscall.ForkLock.RUnlock() + + if nfd, err = newFD(s, fd.family, fd.proto, fd.net, fd.laddr, toAddr(sa)); err != nil { + syscall.Close(s) + return nil, err + } + return nfd, nil +} + +func (fd *netFD) dup() (f *os.File, err os.Error) { + ns, e := syscall.Dup(fd.sysfd) + if e != 0 { + return nil, &OpError{"dup", fd.net, fd.laddr, os.Errno(e)} + } + + // We want blocking mode for the new fd, hence the double negative. + if e = syscall.SetNonblock(ns, false); e != 0 { + return nil, &OpError{"setnonblock", fd.net, fd.laddr, os.Errno(e)} + } + + return os.NewFile(ns, fd.sysfile.Name()), nil +} |