diff options
Diffstat (limited to 'libgo/go/old')
-rw-r--r-- | libgo/go/old/netchan/common.go | 338 | ||||
-rw-r--r-- | libgo/go/old/netchan/export.go | 400 | ||||
-rw-r--r-- | libgo/go/old/netchan/import.go | 287 | ||||
-rw-r--r-- | libgo/go/old/netchan/netchan_test.go | 447 |
4 files changed, 0 insertions, 1472 deletions
diff --git a/libgo/go/old/netchan/common.go b/libgo/go/old/netchan/common.go deleted file mode 100644 index d0daf53..0000000 --- a/libgo/go/old/netchan/common.go +++ /dev/null @@ -1,338 +0,0 @@ -// Copyright 2010 The Go Authors. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. - -package netchan - -import ( - "encoding/gob" - "errors" - "io" - "reflect" - "sync" - "time" -) - -// The direction of a connection from the client's perspective. -type Dir int - -const ( - Recv Dir = iota - Send -) - -func (dir Dir) String() string { - switch dir { - case Recv: - return "Recv" - case Send: - return "Send" - } - return "???" -} - -// Payload types -const ( - payRequest = iota // request structure follows - payError // error structure follows - payData // user payload follows - payAck // acknowledgement; no payload - payClosed // channel is now closed - payAckSend // payload has been delivered. -) - -// A header is sent as a prefix to every transmission. It will be followed by -// a request structure, an error structure, or an arbitrary user payload structure. -type header struct { - Id int - PayloadType int - SeqNum int64 -} - -// Sent with a header once per channel from importer to exporter to report -// that it wants to bind to a channel with the specified direction for count -// messages, with space for size buffered values. If count is -1, it means unlimited. -type request struct { - Name string - Count int64 - Size int - Dir Dir -} - -// Sent with a header to report an error. -type error_ struct { - Error string -} - -// Used to unify management of acknowledgements for import and export. -type unackedCounter interface { - unackedCount() int64 - ack() int64 - seq() int64 -} - -// A channel and its direction. -type chanDir struct { - ch reflect.Value - dir Dir -} - -// clientSet contains the objects and methods needed for tracking -// clients of an exporter and draining outstanding messages. -type clientSet struct { - mu sync.Mutex // protects access to channel and client maps - names map[string]*chanDir - clients map[unackedCounter]bool -} - -// Mutex-protected encoder and decoder pair. -type encDec struct { - decLock sync.Mutex - dec *gob.Decoder - encLock sync.Mutex - enc *gob.Encoder -} - -func newEncDec(conn io.ReadWriter) *encDec { - return &encDec{ - dec: gob.NewDecoder(conn), - enc: gob.NewEncoder(conn), - } -} - -// Decode an item from the connection. -func (ed *encDec) decode(value reflect.Value) error { - ed.decLock.Lock() - err := ed.dec.DecodeValue(value) - if err != nil { - // TODO: tear down connection? - } - ed.decLock.Unlock() - return err -} - -// Encode a header and payload onto the connection. -func (ed *encDec) encode(hdr *header, payloadType int, payload interface{}) error { - ed.encLock.Lock() - hdr.PayloadType = payloadType - err := ed.enc.Encode(hdr) - if err == nil { - if payload != nil { - err = ed.enc.Encode(payload) - } - } - if err != nil { - // TODO: tear down connection if there is an error? - } - ed.encLock.Unlock() - return err -} - -// See the comment for Exporter.Drain. -func (cs *clientSet) drain(timeout time.Duration) error { - deadline := time.Now().Add(timeout) - for { - pending := false - cs.mu.Lock() - // Any messages waiting for a client? - for _, chDir := range cs.names { - if chDir.ch.Len() > 0 { - pending = true - } - } - // Any unacknowledged messages? - for client := range cs.clients { - n := client.unackedCount() - if n > 0 { // Check for > rather than != just to be safe. - pending = true - break - } - } - cs.mu.Unlock() - if !pending { - break - } - if timeout > 0 && time.Now().After(deadline) { - return errors.New("timeout") - } - time.Sleep(100 * time.Millisecond) - } - return nil -} - -// See the comment for Exporter.Sync. -func (cs *clientSet) sync(timeout time.Duration) error { - deadline := time.Now().Add(timeout) - // seq remembers the clients and their seqNum at point of entry. - seq := make(map[unackedCounter]int64) - cs.mu.Lock() - for client := range cs.clients { - seq[client] = client.seq() - } - cs.mu.Unlock() - for { - pending := false - cs.mu.Lock() - // Any unacknowledged messages? Look only at clients that existed - // when we started and are still in this client set. - for client := range seq { - if _, ok := cs.clients[client]; ok { - if client.ack() < seq[client] { - pending = true - break - } - } - } - cs.mu.Unlock() - if !pending { - break - } - if timeout > 0 && time.Now().After(deadline) { - return errors.New("timeout") - } - time.Sleep(100 * time.Millisecond) - } - return nil -} - -// A netChan represents a channel imported or exported -// on a single connection. Flow is controlled by the receiving -// side by sending payAckSend messages when values -// are delivered into the local channel. -type netChan struct { - *chanDir - name string - id int - size int // buffer size of channel. - closed bool - - // sender-specific state - ackCh chan bool // buffered with space for all the acks we need - space int // available space. - - // receiver-specific state - sendCh chan reflect.Value // buffered channel of values received from other end. - ed *encDec // so that we can send acks. - count int64 // number of values still to receive. -} - -// Create a new netChan with the given name (only used for -// messages), id, direction, buffer size, and count. -// The connection to the other side is represented by ed. -func newNetChan(name string, id int, ch *chanDir, ed *encDec, size int, count int64) *netChan { - c := &netChan{chanDir: ch, name: name, id: id, size: size, ed: ed, count: count} - if c.dir == Send { - c.ackCh = make(chan bool, size) - c.space = size - } - return c -} - -// Close the channel. -func (nch *netChan) close() { - if nch.closed { - return - } - if nch.dir == Recv { - if nch.sendCh != nil { - // If the sender goroutine is active, close the channel to it. - // It will close nch.ch when it can. - close(nch.sendCh) - } else { - nch.ch.Close() - } - } else { - nch.ch.Close() - close(nch.ackCh) - } - nch.closed = true -} - -// Send message from remote side to local receiver. -func (nch *netChan) send(val reflect.Value) { - if nch.dir != Recv { - panic("send on wrong direction of channel") - } - if nch.sendCh == nil { - // If possible, do local send directly and ack immediately. - if nch.ch.TrySend(val) { - nch.sendAck() - return - } - // Start sender goroutine to manage delayed delivery of values. - nch.sendCh = make(chan reflect.Value, nch.size) - go nch.sender() - } - select { - case nch.sendCh <- val: - // ok - default: - // TODO: should this be more resilient? - panic("netchan: remote sender sent more values than allowed") - } -} - -// sendAck sends an acknowledgment that a message has left -// the channel's buffer. If the messages remaining to be sent -// will fit in the channel's buffer, then we don't -// need to send an ack. -func (nch *netChan) sendAck() { - if nch.count < 0 || nch.count > int64(nch.size) { - nch.ed.encode(&header{Id: nch.id}, payAckSend, nil) - } - if nch.count > 0 { - nch.count-- - } -} - -// The sender process forwards items from the sending queue -// to the destination channel, acknowledging each item. -func (nch *netChan) sender() { - if nch.dir != Recv { - panic("sender on wrong direction of channel") - } - // When Exporter.Hangup is called, the underlying channel is closed, - // and so we may get a "too many operations on closed channel" error - // if there are outstanding messages in sendCh. - // Make sure that this doesn't panic the whole program. - defer func() { - if r := recover(); r != nil { - // TODO check that r is "too many operations", otherwise re-panic. - } - }() - for v := range nch.sendCh { - nch.ch.Send(v) - nch.sendAck() - } - nch.ch.Close() -} - -// Receive value from local side for sending to remote side. -func (nch *netChan) recv() (val reflect.Value, ok bool) { - if nch.dir != Send { - panic("recv on wrong direction of channel") - } - - if nch.space == 0 { - // Wait for buffer space. - <-nch.ackCh - nch.space++ - } - nch.space-- - return nch.ch.Recv() -} - -// acked is called when the remote side indicates that -// a value has been delivered. -func (nch *netChan) acked() { - if nch.dir != Send { - panic("recv on wrong direction of channel") - } - select { - case nch.ackCh <- true: - // ok - default: - // TODO: should this be more resilient? - panic("netchan: remote receiver sent too many acks") - } -} diff --git a/libgo/go/old/netchan/export.go b/libgo/go/old/netchan/export.go deleted file mode 100644 index a65b260..0000000 --- a/libgo/go/old/netchan/export.go +++ /dev/null @@ -1,400 +0,0 @@ -// Copyright 2010 The Go Authors. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. - -/* - Package netchan implements type-safe networked channels: - it allows the two ends of a channel to appear on different - computers connected by a network. It does this by transporting - data sent to a channel on one machine so it can be recovered - by a receive of a channel of the same type on the other. - - An exporter publishes a set of channels by name. An importer - connects to the exporting machine and imports the channels - by name. After importing the channels, the two machines can - use the channels in the usual way. - - Networked channels are not synchronized; they always behave - as if they are buffered channels of at least one element. -*/ -package netchan - -// BUG: can't use range clause to receive when using ImportNValues to limit the count. - -import ( - "errors" - "io" - "log" - "net" - "reflect" - "strconv" - "sync" - "time" -) - -// Export - -// expLog is a logging convenience function. The first argument must be a string. -func expLog(args ...interface{}) { - args[0] = "netchan export: " + args[0].(string) - log.Print(args...) -} - -// An Exporter allows a set of channels to be published on a single -// network port. A single machine may have multiple Exporters -// but they must use different ports. -type Exporter struct { - *clientSet -} - -type expClient struct { - *encDec - exp *Exporter - chans map[int]*netChan // channels in use by client - mu sync.Mutex // protects remaining fields - errored bool // client has been sent an error - seqNum int64 // sequences messages sent to client; has value of highest sent - ackNum int64 // highest sequence number acknowledged - seqLock sync.Mutex // guarantees messages are in sequence, only locked under mu -} - -func newClient(exp *Exporter, conn io.ReadWriter) *expClient { - client := new(expClient) - client.exp = exp - client.encDec = newEncDec(conn) - client.seqNum = 0 - client.ackNum = 0 - client.chans = make(map[int]*netChan) - return client -} - -func (client *expClient) sendError(hdr *header, err string) { - error := &error_{err} - expLog("sending error to client:", error.Error) - client.encode(hdr, payError, error) // ignore any encode error, hope client gets it - client.mu.Lock() - client.errored = true - client.mu.Unlock() -} - -func (client *expClient) newChan(hdr *header, dir Dir, name string, size int, count int64) *netChan { - exp := client.exp - exp.mu.Lock() - ech, ok := exp.names[name] - exp.mu.Unlock() - if !ok { - client.sendError(hdr, "no such channel: "+name) - return nil - } - if ech.dir != dir { - client.sendError(hdr, "wrong direction for channel: "+name) - return nil - } - nch := newNetChan(name, hdr.Id, ech, client.encDec, size, count) - client.chans[hdr.Id] = nch - return nch -} - -func (client *expClient) getChan(hdr *header, dir Dir) *netChan { - nch := client.chans[hdr.Id] - if nch == nil { - return nil - } - if nch.dir != dir { - client.sendError(hdr, "wrong direction for channel: "+nch.name) - } - return nch -} - -// The function run manages sends and receives for a single client. For each -// (client Recv) request, this will launch a serveRecv goroutine to deliver -// the data for that channel, while (client Send) requests are handled as -// data arrives from the client. -func (client *expClient) run() { - hdr := new(header) - hdrValue := reflect.ValueOf(hdr) - req := new(request) - reqValue := reflect.ValueOf(req) - error := new(error_) - for { - *hdr = header{} - if err := client.decode(hdrValue); err != nil { - if err != io.EOF { - expLog("error decoding client header:", err) - } - break - } - switch hdr.PayloadType { - case payRequest: - *req = request{} - if err := client.decode(reqValue); err != nil { - expLog("error decoding client request:", err) - break - } - if req.Size < 1 { - panic("netchan: remote requested " + strconv.Itoa(req.Size) + " values") - } - switch req.Dir { - case Recv: - // look up channel before calling serveRecv to - // avoid a lock around client.chans. - if nch := client.newChan(hdr, Send, req.Name, req.Size, req.Count); nch != nil { - go client.serveRecv(nch, *hdr, req.Count) - } - case Send: - client.newChan(hdr, Recv, req.Name, req.Size, req.Count) - // The actual sends will have payload type payData. - // TODO: manage the count? - default: - error.Error = "request: can't handle channel direction" - expLog(error.Error, req.Dir) - client.encode(hdr, payError, error) - } - case payData: - client.serveSend(*hdr) - case payClosed: - client.serveClosed(*hdr) - case payAck: - client.mu.Lock() - if client.ackNum != hdr.SeqNum-1 { - // Since the sequence number is incremented and the message is sent - // in a single instance of locking client.mu, the messages are guaranteed - // to be sent in order. Therefore receipt of acknowledgement N means - // all messages <=N have been seen by the recipient. We check anyway. - expLog("sequence out of order:", client.ackNum, hdr.SeqNum) - } - if client.ackNum < hdr.SeqNum { // If there has been an error, don't back up the count. - client.ackNum = hdr.SeqNum - } - client.mu.Unlock() - case payAckSend: - if nch := client.getChan(hdr, Send); nch != nil { - nch.acked() - } - default: - log.Fatal("netchan export: unknown payload type", hdr.PayloadType) - } - } - client.exp.delClient(client) -} - -// Send all the data on a single channel to a client asking for a Recv. -// The header is passed by value to avoid issues of overwriting. -func (client *expClient) serveRecv(nch *netChan, hdr header, count int64) { - for { - val, ok := nch.recv() - if !ok { - if err := client.encode(&hdr, payClosed, nil); err != nil { - expLog("error encoding server closed message:", err) - } - break - } - // We hold the lock during transmission to guarantee messages are - // sent in sequence number order. Also, we increment first so the - // value of client.SeqNum is the value of the highest used sequence - // number, not one beyond. - client.mu.Lock() - client.seqNum++ - hdr.SeqNum = client.seqNum - client.seqLock.Lock() // guarantee ordering of messages - client.mu.Unlock() - err := client.encode(&hdr, payData, val.Interface()) - client.seqLock.Unlock() - if err != nil { - expLog("error encoding client response:", err) - client.sendError(&hdr, err.Error()) - break - } - // Negative count means run forever. - if count >= 0 { - if count--; count <= 0 { - break - } - } - } -} - -// Receive and deliver locally one item from a client asking for a Send -// The header is passed by value to avoid issues of overwriting. -func (client *expClient) serveSend(hdr header) { - nch := client.getChan(&hdr, Recv) - if nch == nil { - return - } - // Create a new value for each received item. - val := reflect.New(nch.ch.Type().Elem()).Elem() - if err := client.decode(val); err != nil { - expLog("value decode:", err, "; type ", nch.ch.Type()) - return - } - nch.send(val) -} - -// Report that client has closed the channel that is sending to us. -// The header is passed by value to avoid issues of overwriting. -func (client *expClient) serveClosed(hdr header) { - nch := client.getChan(&hdr, Recv) - if nch == nil { - return - } - nch.close() -} - -func (client *expClient) unackedCount() int64 { - client.mu.Lock() - n := client.seqNum - client.ackNum - client.mu.Unlock() - return n -} - -func (client *expClient) seq() int64 { - client.mu.Lock() - n := client.seqNum - client.mu.Unlock() - return n -} - -func (client *expClient) ack() int64 { - client.mu.Lock() - n := client.seqNum - client.mu.Unlock() - return n -} - -// Serve waits for incoming connections on the listener -// and serves the Exporter's channels on each. -// It blocks until the listener is closed. -func (exp *Exporter) Serve(listener net.Listener) { - for { - conn, err := listener.Accept() - if err != nil { - expLog("listen:", err) - break - } - go exp.ServeConn(conn) - } -} - -// ServeConn exports the Exporter's channels on conn. -// It blocks until the connection is terminated. -func (exp *Exporter) ServeConn(conn io.ReadWriter) { - exp.addClient(conn).run() -} - -// NewExporter creates a new Exporter that exports a set of channels. -func NewExporter() *Exporter { - e := &Exporter{ - clientSet: &clientSet{ - names: make(map[string]*chanDir), - clients: make(map[unackedCounter]bool), - }, - } - return e -} - -// ListenAndServe exports the exporter's channels through the -// given network and local address defined as in net.Listen. -func (exp *Exporter) ListenAndServe(network, localaddr string) error { - listener, err := net.Listen(network, localaddr) - if err != nil { - return err - } - go exp.Serve(listener) - return nil -} - -// addClient creates a new expClient and records its existence -func (exp *Exporter) addClient(conn io.ReadWriter) *expClient { - client := newClient(exp, conn) - exp.mu.Lock() - exp.clients[client] = true - exp.mu.Unlock() - return client -} - -// delClient forgets the client existed -func (exp *Exporter) delClient(client *expClient) { - exp.mu.Lock() - delete(exp.clients, client) - exp.mu.Unlock() -} - -// Drain waits until all messages sent from this exporter/importer, including -// those not yet sent to any client and possibly including those sent while -// Drain was executing, have been received by the importer. In short, it -// waits until all the exporter's messages have been received by a client. -// If the timeout is positive and Drain takes longer than that to complete, -// an error is returned. -func (exp *Exporter) Drain(timeout time.Duration) error { - // This wrapper function is here so the method's comment will appear in godoc. - return exp.clientSet.drain(timeout) -} - -// Sync waits until all clients of the exporter have received the messages -// that were sent at the time Sync was invoked. Unlike Drain, it does not -// wait for messages sent while it is running or messages that have not been -// dispatched to any client. If the timeout is positive and Sync takes longer -// than that to complete, an error is returned. -func (exp *Exporter) Sync(timeout time.Duration) error { - // This wrapper function is here so the method's comment will appear in godoc. - return exp.clientSet.sync(timeout) -} - -func checkChan(chT interface{}, dir Dir) (reflect.Value, error) { - chanType := reflect.TypeOf(chT) - if chanType.Kind() != reflect.Chan { - return reflect.Value{}, errors.New("not a channel") - } - if dir != Send && dir != Recv { - return reflect.Value{}, errors.New("unknown channel direction") - } - switch chanType.ChanDir() { - case reflect.BothDir: - case reflect.SendDir: - if dir != Recv { - return reflect.Value{}, errors.New("to import/export with Send, must provide <-chan") - } - case reflect.RecvDir: - if dir != Send { - return reflect.Value{}, errors.New("to import/export with Recv, must provide chan<-") - } - } - return reflect.ValueOf(chT), nil -} - -// Export exports a channel of a given type and specified direction. The -// channel to be exported is provided in the call and may be of arbitrary -// channel type. -// Despite the literal signature, the effective signature is -// Export(name string, chT chan T, dir Dir) -func (exp *Exporter) Export(name string, chT interface{}, dir Dir) error { - ch, err := checkChan(chT, dir) - if err != nil { - return err - } - exp.mu.Lock() - defer exp.mu.Unlock() - _, present := exp.names[name] - if present { - return errors.New("channel name already being exported:" + name) - } - exp.names[name] = &chanDir{ch, dir} - return nil -} - -// Hangup disassociates the named channel from the Exporter and closes -// the channel. Messages in flight for the channel may be dropped. -func (exp *Exporter) Hangup(name string) error { - exp.mu.Lock() - chDir, ok := exp.names[name] - if ok { - delete(exp.names, name) - } - // TODO drop all instances of channel from client sets - exp.mu.Unlock() - if !ok { - return errors.New("netchan export: hangup: no such channel: " + name) - } - chDir.ch.Close() - return nil -} diff --git a/libgo/go/old/netchan/import.go b/libgo/go/old/netchan/import.go deleted file mode 100644 index 50abaa9..0000000 --- a/libgo/go/old/netchan/import.go +++ /dev/null @@ -1,287 +0,0 @@ -// Copyright 2010 The Go Authors. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. - -package netchan - -import ( - "errors" - "io" - "log" - "net" - "reflect" - "sync" - "time" -) - -// Import - -// impLog is a logging convenience function. The first argument must be a string. -func impLog(args ...interface{}) { - args[0] = "netchan import: " + args[0].(string) - log.Print(args...) -} - -// An Importer allows a set of channels to be imported from a single -// remote machine/network port. A machine may have multiple -// importers, even from the same machine/network port. -type Importer struct { - *encDec - chanLock sync.Mutex // protects access to channel map - names map[string]*netChan - chans map[int]*netChan - errors chan error - maxId int - mu sync.Mutex // protects remaining fields - unacked int64 // number of unacknowledged sends. - seqLock sync.Mutex // guarantees messages are in sequence, only locked under mu -} - -// NewImporter creates a new Importer object to import a set of channels -// from the given connection. The Exporter must be available and serving when -// the Importer is created. -func NewImporter(conn io.ReadWriter) *Importer { - imp := new(Importer) - imp.encDec = newEncDec(conn) - imp.chans = make(map[int]*netChan) - imp.names = make(map[string]*netChan) - imp.errors = make(chan error, 10) - imp.unacked = 0 - go imp.run() - return imp -} - -// Import imports a set of channels from the given network and address. -func Import(network, remoteaddr string) (*Importer, error) { - conn, err := net.Dial(network, remoteaddr) - if err != nil { - return nil, err - } - return NewImporter(conn), nil -} - -// shutdown closes all channels for which we are receiving data from the remote side. -func (imp *Importer) shutdown() { - imp.chanLock.Lock() - for _, ich := range imp.chans { - if ich.dir == Recv { - ich.close() - } - } - imp.chanLock.Unlock() -} - -// Handle the data from a single imported data stream, which will -// have the form -// (response, data)* -// The response identifies by name which channel is transmitting data. -func (imp *Importer) run() { - // Loop on responses; requests are sent by ImportNValues() - hdr := new(header) - hdrValue := reflect.ValueOf(hdr) - ackHdr := new(header) - err := new(error_) - errValue := reflect.ValueOf(err) - for { - *hdr = header{} - if e := imp.decode(hdrValue); e != nil { - if e != io.EOF { - impLog("header:", e) - imp.shutdown() - } - return - } - switch hdr.PayloadType { - case payData: - // done lower in loop - case payError: - if e := imp.decode(errValue); e != nil { - impLog("error:", e) - return - } - if err.Error != "" { - impLog("response error:", err.Error) - select { - case imp.errors <- errors.New(err.Error): - continue // errors are not acknowledged - default: - imp.shutdown() - return - } - } - case payClosed: - nch := imp.getChan(hdr.Id, false) - if nch != nil { - nch.close() - } - continue // closes are not acknowledged. - case payAckSend: - // we can receive spurious acks if the channel is - // hung up, so we ask getChan to ignore any errors. - nch := imp.getChan(hdr.Id, true) - if nch != nil { - nch.acked() - imp.mu.Lock() - imp.unacked-- - imp.mu.Unlock() - } - continue - default: - impLog("unexpected payload type:", hdr.PayloadType) - return - } - nch := imp.getChan(hdr.Id, false) - if nch == nil { - continue - } - if nch.dir != Recv { - impLog("cannot happen: receive from non-Recv channel") - return - } - // Acknowledge receipt - ackHdr.Id = hdr.Id - ackHdr.SeqNum = hdr.SeqNum - imp.encode(ackHdr, payAck, nil) - // Create a new value for each received item. - value := reflect.New(nch.ch.Type().Elem()).Elem() - if e := imp.decode(value); e != nil { - impLog("importer value decode:", e) - return - } - nch.send(value) - } -} - -func (imp *Importer) getChan(id int, errOk bool) *netChan { - imp.chanLock.Lock() - ich := imp.chans[id] - imp.chanLock.Unlock() - if ich == nil { - if !errOk { - impLog("unknown id in netchan request: ", id) - } - return nil - } - return ich -} - -// Errors returns a channel from which transmission and protocol errors -// can be read. Clients of the importer are not required to read the error -// channel for correct execution. However, if too many errors occur -// without being read from the error channel, the importer will shut down. -func (imp *Importer) Errors() chan error { - return imp.errors -} - -// Import imports a channel of the given type, size and specified direction. -// It is equivalent to ImportNValues with a count of -1, meaning unbounded. -func (imp *Importer) Import(name string, chT interface{}, dir Dir, size int) error { - return imp.ImportNValues(name, chT, dir, size, -1) -} - -// ImportNValues imports a channel of the given type and specified -// direction and then receives or transmits up to n values on that -// channel. A value of n==-1 implies an unbounded number of values. The -// channel will have buffer space for size values, or 1 value if size < 1. -// The channel to be bound to the remote site's channel is provided -// in the call and may be of arbitrary channel type. -// Despite the literal signature, the effective signature is -// ImportNValues(name string, chT chan T, dir Dir, size, n int) error -// Example usage: -// imp, err := NewImporter("tcp", "netchanserver.mydomain.com:1234") -// if err != nil { log.Fatal(err) } -// ch := make(chan myType) -// err = imp.ImportNValues("name", ch, Recv, 1, 1) -// if err != nil { log.Fatal(err) } -// fmt.Printf("%+v\n", <-ch) -func (imp *Importer) ImportNValues(name string, chT interface{}, dir Dir, size, n int) error { - ch, err := checkChan(chT, dir) - if err != nil { - return err - } - imp.chanLock.Lock() - defer imp.chanLock.Unlock() - _, present := imp.names[name] - if present { - return errors.New("channel name already being imported:" + name) - } - if size < 1 { - size = 1 - } - id := imp.maxId - imp.maxId++ - nch := newNetChan(name, id, &chanDir{ch, dir}, imp.encDec, size, int64(n)) - imp.names[name] = nch - imp.chans[id] = nch - // Tell the other side about this channel. - hdr := &header{Id: id} - req := &request{Name: name, Count: int64(n), Dir: dir, Size: size} - if err = imp.encode(hdr, payRequest, req); err != nil { - impLog("request encode:", err) - return err - } - if dir == Send { - go func() { - for i := 0; n == -1 || i < n; i++ { - val, ok := nch.recv() - if !ok { - if err = imp.encode(hdr, payClosed, nil); err != nil { - impLog("error encoding client closed message:", err) - } - return - } - // We hold the lock during transmission to guarantee messages are - // sent in order. - imp.mu.Lock() - imp.unacked++ - imp.seqLock.Lock() - imp.mu.Unlock() - if err = imp.encode(hdr, payData, val.Interface()); err != nil { - impLog("error encoding client send:", err) - return - } - imp.seqLock.Unlock() - } - }() - } - return nil -} - -// Hangup disassociates the named channel from the Importer and closes -// the channel. Messages in flight for the channel may be dropped. -func (imp *Importer) Hangup(name string) error { - imp.chanLock.Lock() - defer imp.chanLock.Unlock() - nc := imp.names[name] - if nc == nil { - return errors.New("netchan import: hangup: no such channel: " + name) - } - delete(imp.names, name) - delete(imp.chans, nc.id) - nc.close() - return nil -} - -func (imp *Importer) unackedCount() int64 { - imp.mu.Lock() - n := imp.unacked - imp.mu.Unlock() - return n -} - -// Drain waits until all messages sent from this exporter/importer, including -// those not yet sent to any server and possibly including those sent while -// Drain was executing, have been received by the exporter. In short, it -// waits until all the importer's messages have been received. -// If the timeout (measured in nanoseconds) is positive and Drain takes -// longer than that to complete, an error is returned. -func (imp *Importer) Drain(timeout int64) error { - deadline := time.Now().Add(time.Duration(timeout)) - for imp.unackedCount() > 0 { - if timeout > 0 && time.Now().After(deadline) { - return errors.New("timeout") - } - time.Sleep(100 * time.Millisecond) - } - return nil -} diff --git a/libgo/go/old/netchan/netchan_test.go b/libgo/go/old/netchan/netchan_test.go deleted file mode 100644 index 9a7c076..0000000 --- a/libgo/go/old/netchan/netchan_test.go +++ /dev/null @@ -1,447 +0,0 @@ -// Copyright 2010 The Go Authors. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. - -package netchan - -import ( - "net" - "strings" - "testing" - "time" -) - -const count = 10 // number of items in most tests -const closeCount = 5 // number of items when sender closes early - -const base = 23 - -func exportSend(exp *Exporter, n int, t *testing.T, done chan bool) { - ch := make(chan int) - err := exp.Export("exportedSend", ch, Send) - if err != nil { - t.Fatal("exportSend:", err) - } - go func() { - for i := 0; i < n; i++ { - ch <- base + i - } - close(ch) - if done != nil { - done <- true - } - }() -} - -func exportReceive(exp *Exporter, t *testing.T, expDone chan bool) { - ch := make(chan int) - err := exp.Export("exportedRecv", ch, Recv) - expDone <- true - if err != nil { - t.Fatal("exportReceive:", err) - } - for i := 0; i < count; i++ { - v, ok := <-ch - if !ok { - if i != closeCount { - t.Errorf("exportReceive expected close at %d; got one at %d", closeCount, i) - } - break - } - if v != base+i { - t.Errorf("export Receive: bad value: expected %d+%d=%d; got %d", base, i, base+i, v) - } - } -} - -func importSend(imp *Importer, n int, t *testing.T, done chan bool) { - ch := make(chan int) - err := imp.ImportNValues("exportedRecv", ch, Send, 3, -1) - if err != nil { - t.Fatal("importSend:", err) - } - go func() { - for i := 0; i < n; i++ { - ch <- base + i - } - close(ch) - if done != nil { - done <- true - } - }() -} - -func importReceive(imp *Importer, t *testing.T, done chan bool) { - ch := make(chan int) - err := imp.ImportNValues("exportedSend", ch, Recv, 3, count) - if err != nil { - t.Fatal("importReceive:", err) - } - for i := 0; i < count; i++ { - v, ok := <-ch - if !ok { - if i != closeCount { - t.Errorf("importReceive expected close at %d; got one at %d", closeCount, i) - } - break - } - if v != base+i { - t.Errorf("importReceive: bad value: expected %d+%d=%d; got %+d", base, i, base+i, v) - } - } - if done != nil { - done <- true - } -} - -func TestExportSendImportReceive(t *testing.T) { - exp, imp := pair(t) - exportSend(exp, count, t, nil) - importReceive(imp, t, nil) -} - -func TestExportReceiveImportSend(t *testing.T) { - exp, imp := pair(t) - expDone := make(chan bool) - done := make(chan bool) - go func() { - exportReceive(exp, t, expDone) - done <- true - }() - <-expDone - importSend(imp, count, t, nil) - <-done -} - -func TestClosingExportSendImportReceive(t *testing.T) { - exp, imp := pair(t) - exportSend(exp, closeCount, t, nil) - importReceive(imp, t, nil) -} - -func TestClosingImportSendExportReceive(t *testing.T) { - exp, imp := pair(t) - expDone := make(chan bool) - done := make(chan bool) - go func() { - exportReceive(exp, t, expDone) - done <- true - }() - <-expDone - importSend(imp, closeCount, t, nil) - <-done -} - -func TestErrorForIllegalChannel(t *testing.T) { - exp, imp := pair(t) - // Now export a channel. - ch := make(chan int, 1) - err := exp.Export("aChannel", ch, Send) - if err != nil { - t.Fatal("export:", err) - } - ch <- 1234 - close(ch) - // Now try to import a different channel. - ch = make(chan int) - err = imp.Import("notAChannel", ch, Recv, 1) - if err != nil { - t.Fatal("import:", err) - } - // Expect an error now. Start a timeout. - timeout := make(chan bool, 1) // buffered so closure will not hang around. - go func() { - time.Sleep(10 * time.Second) // very long, to give even really slow machines a chance. - timeout <- true - }() - select { - case err = <-imp.Errors(): - if strings.Index(err.Error(), "no such channel") < 0 { - t.Error("wrong error for nonexistent channel:", err) - } - case <-timeout: - t.Error("import of nonexistent channel did not receive an error") - } -} - -// Not a great test but it does at least invoke Drain. -func TestExportDrain(t *testing.T) { - exp, imp := pair(t) - done := make(chan bool) - go func() { - exportSend(exp, closeCount, t, nil) - done <- true - }() - <-done - go importReceive(imp, t, done) - exp.Drain(0) - <-done -} - -// Not a great test but it does at least invoke Drain. -func TestImportDrain(t *testing.T) { - exp, imp := pair(t) - expDone := make(chan bool) - go exportReceive(exp, t, expDone) - <-expDone - importSend(imp, closeCount, t, nil) - imp.Drain(0) -} - -// Not a great test but it does at least invoke Sync. -func TestExportSync(t *testing.T) { - exp, imp := pair(t) - done := make(chan bool) - exportSend(exp, closeCount, t, nil) - go importReceive(imp, t, done) - exp.Sync(0) - <-done -} - -// Test hanging up the send side of an export. -// TODO: test hanging up the receive side of an export. -func TestExportHangup(t *testing.T) { - exp, imp := pair(t) - ech := make(chan int) - err := exp.Export("exportedSend", ech, Send) - if err != nil { - t.Fatal("export:", err) - } - // Prepare to receive two values. We'll actually deliver only one. - ich := make(chan int) - err = imp.ImportNValues("exportedSend", ich, Recv, 1, 2) - if err != nil { - t.Fatal("import exportedSend:", err) - } - // Send one value, receive it. - const Value = 1234 - ech <- Value - v := <-ich - if v != Value { - t.Fatal("expected", Value, "got", v) - } - // Now hang up the channel. Importer should see it close. - exp.Hangup("exportedSend") - v, ok := <-ich - if ok { - t.Fatal("expected channel to be closed; got value", v) - } -} - -// Test hanging up the send side of an import. -// TODO: test hanging up the receive side of an import. -func TestImportHangup(t *testing.T) { - exp, imp := pair(t) - ech := make(chan int) - err := exp.Export("exportedRecv", ech, Recv) - if err != nil { - t.Fatal("export:", err) - } - // Prepare to Send two values. We'll actually deliver only one. - ich := make(chan int) - err = imp.ImportNValues("exportedRecv", ich, Send, 1, 2) - if err != nil { - t.Fatal("import exportedRecv:", err) - } - // Send one value, receive it. - const Value = 1234 - ich <- Value - v := <-ech - if v != Value { - t.Fatal("expected", Value, "got", v) - } - // Now hang up the channel. Exporter should see it close. - imp.Hangup("exportedRecv") - v, ok := <-ech - if ok { - t.Fatal("expected channel to be closed; got value", v) - } -} - -// loop back exportedRecv to exportedSend, -// but receive a value from ctlch before starting the loop. -func exportLoopback(exp *Exporter, t *testing.T) { - inch := make(chan int) - if err := exp.Export("exportedRecv", inch, Recv); err != nil { - t.Fatal("exportRecv") - } - - outch := make(chan int) - if err := exp.Export("exportedSend", outch, Send); err != nil { - t.Fatal("exportSend") - } - - ctlch := make(chan int) - if err := exp.Export("exportedCtl", ctlch, Recv); err != nil { - t.Fatal("exportRecv") - } - - go func() { - <-ctlch - for i := 0; i < count; i++ { - x := <-inch - if x != base+i { - t.Errorf("exportLoopback expected %d; got %d", i, x) - } - outch <- x - } - }() -} - -// This test checks that channel operations can proceed -// even when other concurrent operations are blocked. -func TestIndependentSends(t *testing.T) { - if testing.Short() { - t.Logf("disabled test during -short") - return - } - exp, imp := pair(t) - - exportLoopback(exp, t) - - importSend(imp, count, t, nil) - done := make(chan bool) - go importReceive(imp, t, done) - - // wait for export side to try to deliver some values. - time.Sleep(250 * time.Millisecond) - - ctlch := make(chan int) - if err := imp.ImportNValues("exportedCtl", ctlch, Send, 1, 1); err != nil { - t.Fatal("importSend:", err) - } - ctlch <- 0 - - <-done -} - -// This test cross-connects a pair of exporter/importer pairs. -type value struct { - I int - Source string -} - -func TestCrossConnect(t *testing.T) { - e1, i1 := pair(t) - e2, i2 := pair(t) - - crossExport(e1, e2, t) - crossImport(i1, i2, t) -} - -// Export side of cross-traffic. -func crossExport(e1, e2 *Exporter, t *testing.T) { - s := make(chan value) - err := e1.Export("exportedSend", s, Send) - if err != nil { - t.Fatal("exportSend:", err) - } - - r := make(chan value) - err = e2.Export("exportedReceive", r, Recv) - if err != nil { - t.Fatal("exportReceive:", err) - } - - go crossLoop("export", s, r, t) -} - -// Import side of cross-traffic. -func crossImport(i1, i2 *Importer, t *testing.T) { - s := make(chan value) - err := i2.Import("exportedReceive", s, Send, 2) - if err != nil { - t.Fatal("import of exportedReceive:", err) - } - - r := make(chan value) - err = i1.Import("exportedSend", r, Recv, 2) - if err != nil { - t.Fatal("import of exported Send:", err) - } - - crossLoop("import", s, r, t) -} - -// Cross-traffic: send and receive 'count' numbers. -func crossLoop(name string, s, r chan value, t *testing.T) { - for si, ri := 0, 0; si < count && ri < count; { - select { - case s <- value{si, name}: - si++ - case v := <-r: - if v.I != ri { - t.Errorf("loop: bad value: expected %d, hello; got %+v", ri, v) - } - ri++ - } - } -} - -const flowCount = 100 - -// test flow control from exporter to importer. -func TestExportFlowControl(t *testing.T) { - if testing.Short() { - t.Logf("disabled test during -short") - return - } - exp, imp := pair(t) - - sendDone := make(chan bool, 1) - exportSend(exp, flowCount, t, sendDone) - - ch := make(chan int) - err := imp.ImportNValues("exportedSend", ch, Recv, 20, -1) - if err != nil { - t.Fatal("importReceive:", err) - } - - testFlow(sendDone, ch, flowCount, t) -} - -// test flow control from importer to exporter. -func TestImportFlowControl(t *testing.T) { - if testing.Short() { - t.Logf("disabled test during -short") - return - } - exp, imp := pair(t) - - ch := make(chan int) - err := exp.Export("exportedRecv", ch, Recv) - if err != nil { - t.Fatal("importReceive:", err) - } - - sendDone := make(chan bool, 1) - importSend(imp, flowCount, t, sendDone) - testFlow(sendDone, ch, flowCount, t) -} - -func testFlow(sendDone chan bool, ch <-chan int, N int, t *testing.T) { - go func() { - time.Sleep(500 * time.Millisecond) - sendDone <- false - }() - - if <-sendDone { - t.Fatal("send did not block") - } - n := 0 - for i := range ch { - t.Log("after blocking, got value ", i) - n++ - } - if n != N { - t.Fatalf("expected %d values; got %d", N, n) - } -} - -func pair(t *testing.T) (*Exporter, *Importer) { - c0, c1 := net.Pipe() - exp := NewExporter() - go exp.ServeConn(c0) - imp := NewImporter(c1) - return exp, imp -} |