diff options
Diffstat (limited to 'libgo/go/old/netchan/common.go')
-rw-r--r-- | libgo/go/old/netchan/common.go | 338 |
1 files changed, 0 insertions, 338 deletions
diff --git a/libgo/go/old/netchan/common.go b/libgo/go/old/netchan/common.go deleted file mode 100644 index d0daf53..0000000 --- a/libgo/go/old/netchan/common.go +++ /dev/null @@ -1,338 +0,0 @@ -// Copyright 2010 The Go Authors. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. - -package netchan - -import ( - "encoding/gob" - "errors" - "io" - "reflect" - "sync" - "time" -) - -// The direction of a connection from the client's perspective. -type Dir int - -const ( - Recv Dir = iota - Send -) - -func (dir Dir) String() string { - switch dir { - case Recv: - return "Recv" - case Send: - return "Send" - } - return "???" -} - -// Payload types -const ( - payRequest = iota // request structure follows - payError // error structure follows - payData // user payload follows - payAck // acknowledgement; no payload - payClosed // channel is now closed - payAckSend // payload has been delivered. -) - -// A header is sent as a prefix to every transmission. It will be followed by -// a request structure, an error structure, or an arbitrary user payload structure. -type header struct { - Id int - PayloadType int - SeqNum int64 -} - -// Sent with a header once per channel from importer to exporter to report -// that it wants to bind to a channel with the specified direction for count -// messages, with space for size buffered values. If count is -1, it means unlimited. -type request struct { - Name string - Count int64 - Size int - Dir Dir -} - -// Sent with a header to report an error. -type error_ struct { - Error string -} - -// Used to unify management of acknowledgements for import and export. -type unackedCounter interface { - unackedCount() int64 - ack() int64 - seq() int64 -} - -// A channel and its direction. -type chanDir struct { - ch reflect.Value - dir Dir -} - -// clientSet contains the objects and methods needed for tracking -// clients of an exporter and draining outstanding messages. -type clientSet struct { - mu sync.Mutex // protects access to channel and client maps - names map[string]*chanDir - clients map[unackedCounter]bool -} - -// Mutex-protected encoder and decoder pair. -type encDec struct { - decLock sync.Mutex - dec *gob.Decoder - encLock sync.Mutex - enc *gob.Encoder -} - -func newEncDec(conn io.ReadWriter) *encDec { - return &encDec{ - dec: gob.NewDecoder(conn), - enc: gob.NewEncoder(conn), - } -} - -// Decode an item from the connection. -func (ed *encDec) decode(value reflect.Value) error { - ed.decLock.Lock() - err := ed.dec.DecodeValue(value) - if err != nil { - // TODO: tear down connection? - } - ed.decLock.Unlock() - return err -} - -// Encode a header and payload onto the connection. -func (ed *encDec) encode(hdr *header, payloadType int, payload interface{}) error { - ed.encLock.Lock() - hdr.PayloadType = payloadType - err := ed.enc.Encode(hdr) - if err == nil { - if payload != nil { - err = ed.enc.Encode(payload) - } - } - if err != nil { - // TODO: tear down connection if there is an error? - } - ed.encLock.Unlock() - return err -} - -// See the comment for Exporter.Drain. -func (cs *clientSet) drain(timeout time.Duration) error { - deadline := time.Now().Add(timeout) - for { - pending := false - cs.mu.Lock() - // Any messages waiting for a client? - for _, chDir := range cs.names { - if chDir.ch.Len() > 0 { - pending = true - } - } - // Any unacknowledged messages? - for client := range cs.clients { - n := client.unackedCount() - if n > 0 { // Check for > rather than != just to be safe. - pending = true - break - } - } - cs.mu.Unlock() - if !pending { - break - } - if timeout > 0 && time.Now().After(deadline) { - return errors.New("timeout") - } - time.Sleep(100 * time.Millisecond) - } - return nil -} - -// See the comment for Exporter.Sync. -func (cs *clientSet) sync(timeout time.Duration) error { - deadline := time.Now().Add(timeout) - // seq remembers the clients and their seqNum at point of entry. - seq := make(map[unackedCounter]int64) - cs.mu.Lock() - for client := range cs.clients { - seq[client] = client.seq() - } - cs.mu.Unlock() - for { - pending := false - cs.mu.Lock() - // Any unacknowledged messages? Look only at clients that existed - // when we started and are still in this client set. - for client := range seq { - if _, ok := cs.clients[client]; ok { - if client.ack() < seq[client] { - pending = true - break - } - } - } - cs.mu.Unlock() - if !pending { - break - } - if timeout > 0 && time.Now().After(deadline) { - return errors.New("timeout") - } - time.Sleep(100 * time.Millisecond) - } - return nil -} - -// A netChan represents a channel imported or exported -// on a single connection. Flow is controlled by the receiving -// side by sending payAckSend messages when values -// are delivered into the local channel. -type netChan struct { - *chanDir - name string - id int - size int // buffer size of channel. - closed bool - - // sender-specific state - ackCh chan bool // buffered with space for all the acks we need - space int // available space. - - // receiver-specific state - sendCh chan reflect.Value // buffered channel of values received from other end. - ed *encDec // so that we can send acks. - count int64 // number of values still to receive. -} - -// Create a new netChan with the given name (only used for -// messages), id, direction, buffer size, and count. -// The connection to the other side is represented by ed. -func newNetChan(name string, id int, ch *chanDir, ed *encDec, size int, count int64) *netChan { - c := &netChan{chanDir: ch, name: name, id: id, size: size, ed: ed, count: count} - if c.dir == Send { - c.ackCh = make(chan bool, size) - c.space = size - } - return c -} - -// Close the channel. -func (nch *netChan) close() { - if nch.closed { - return - } - if nch.dir == Recv { - if nch.sendCh != nil { - // If the sender goroutine is active, close the channel to it. - // It will close nch.ch when it can. - close(nch.sendCh) - } else { - nch.ch.Close() - } - } else { - nch.ch.Close() - close(nch.ackCh) - } - nch.closed = true -} - -// Send message from remote side to local receiver. -func (nch *netChan) send(val reflect.Value) { - if nch.dir != Recv { - panic("send on wrong direction of channel") - } - if nch.sendCh == nil { - // If possible, do local send directly and ack immediately. - if nch.ch.TrySend(val) { - nch.sendAck() - return - } - // Start sender goroutine to manage delayed delivery of values. - nch.sendCh = make(chan reflect.Value, nch.size) - go nch.sender() - } - select { - case nch.sendCh <- val: - // ok - default: - // TODO: should this be more resilient? - panic("netchan: remote sender sent more values than allowed") - } -} - -// sendAck sends an acknowledgment that a message has left -// the channel's buffer. If the messages remaining to be sent -// will fit in the channel's buffer, then we don't -// need to send an ack. -func (nch *netChan) sendAck() { - if nch.count < 0 || nch.count > int64(nch.size) { - nch.ed.encode(&header{Id: nch.id}, payAckSend, nil) - } - if nch.count > 0 { - nch.count-- - } -} - -// The sender process forwards items from the sending queue -// to the destination channel, acknowledging each item. -func (nch *netChan) sender() { - if nch.dir != Recv { - panic("sender on wrong direction of channel") - } - // When Exporter.Hangup is called, the underlying channel is closed, - // and so we may get a "too many operations on closed channel" error - // if there are outstanding messages in sendCh. - // Make sure that this doesn't panic the whole program. - defer func() { - if r := recover(); r != nil { - // TODO check that r is "too many operations", otherwise re-panic. - } - }() - for v := range nch.sendCh { - nch.ch.Send(v) - nch.sendAck() - } - nch.ch.Close() -} - -// Receive value from local side for sending to remote side. -func (nch *netChan) recv() (val reflect.Value, ok bool) { - if nch.dir != Send { - panic("recv on wrong direction of channel") - } - - if nch.space == 0 { - // Wait for buffer space. - <-nch.ackCh - nch.space++ - } - nch.space-- - return nch.ch.Recv() -} - -// acked is called when the remote side indicates that -// a value has been delivered. -func (nch *netChan) acked() { - if nch.dir != Send { - panic("recv on wrong direction of channel") - } - select { - case nch.ackCh <- true: - // ok - default: - // TODO: should this be more resilient? - panic("netchan: remote receiver sent too many acks") - } -} |