aboutsummaryrefslogtreecommitdiff
path: root/libgo/go/old
diff options
context:
space:
mode:
Diffstat (limited to 'libgo/go/old')
-rw-r--r--libgo/go/old/netchan/common.go338
-rw-r--r--libgo/go/old/netchan/export.go400
-rw-r--r--libgo/go/old/netchan/import.go287
-rw-r--r--libgo/go/old/netchan/netchan_test.go447
4 files changed, 0 insertions, 1472 deletions
diff --git a/libgo/go/old/netchan/common.go b/libgo/go/old/netchan/common.go
deleted file mode 100644
index d0daf53..0000000
--- a/libgo/go/old/netchan/common.go
+++ /dev/null
@@ -1,338 +0,0 @@
-// Copyright 2010 The Go Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style
-// license that can be found in the LICENSE file.
-
-package netchan
-
-import (
- "encoding/gob"
- "errors"
- "io"
- "reflect"
- "sync"
- "time"
-)
-
-// The direction of a connection from the client's perspective.
-type Dir int
-
-const (
- Recv Dir = iota
- Send
-)
-
-func (dir Dir) String() string {
- switch dir {
- case Recv:
- return "Recv"
- case Send:
- return "Send"
- }
- return "???"
-}
-
-// Payload types
-const (
- payRequest = iota // request structure follows
- payError // error structure follows
- payData // user payload follows
- payAck // acknowledgement; no payload
- payClosed // channel is now closed
- payAckSend // payload has been delivered.
-)
-
-// A header is sent as a prefix to every transmission. It will be followed by
-// a request structure, an error structure, or an arbitrary user payload structure.
-type header struct {
- Id int
- PayloadType int
- SeqNum int64
-}
-
-// Sent with a header once per channel from importer to exporter to report
-// that it wants to bind to a channel with the specified direction for count
-// messages, with space for size buffered values. If count is -1, it means unlimited.
-type request struct {
- Name string
- Count int64
- Size int
- Dir Dir
-}
-
-// Sent with a header to report an error.
-type error_ struct {
- Error string
-}
-
-// Used to unify management of acknowledgements for import and export.
-type unackedCounter interface {
- unackedCount() int64
- ack() int64
- seq() int64
-}
-
-// A channel and its direction.
-type chanDir struct {
- ch reflect.Value
- dir Dir
-}
-
-// clientSet contains the objects and methods needed for tracking
-// clients of an exporter and draining outstanding messages.
-type clientSet struct {
- mu sync.Mutex // protects access to channel and client maps
- names map[string]*chanDir
- clients map[unackedCounter]bool
-}
-
-// Mutex-protected encoder and decoder pair.
-type encDec struct {
- decLock sync.Mutex
- dec *gob.Decoder
- encLock sync.Mutex
- enc *gob.Encoder
-}
-
-func newEncDec(conn io.ReadWriter) *encDec {
- return &encDec{
- dec: gob.NewDecoder(conn),
- enc: gob.NewEncoder(conn),
- }
-}
-
-// Decode an item from the connection.
-func (ed *encDec) decode(value reflect.Value) error {
- ed.decLock.Lock()
- err := ed.dec.DecodeValue(value)
- if err != nil {
- // TODO: tear down connection?
- }
- ed.decLock.Unlock()
- return err
-}
-
-// Encode a header and payload onto the connection.
-func (ed *encDec) encode(hdr *header, payloadType int, payload interface{}) error {
- ed.encLock.Lock()
- hdr.PayloadType = payloadType
- err := ed.enc.Encode(hdr)
- if err == nil {
- if payload != nil {
- err = ed.enc.Encode(payload)
- }
- }
- if err != nil {
- // TODO: tear down connection if there is an error?
- }
- ed.encLock.Unlock()
- return err
-}
-
-// See the comment for Exporter.Drain.
-func (cs *clientSet) drain(timeout time.Duration) error {
- deadline := time.Now().Add(timeout)
- for {
- pending := false
- cs.mu.Lock()
- // Any messages waiting for a client?
- for _, chDir := range cs.names {
- if chDir.ch.Len() > 0 {
- pending = true
- }
- }
- // Any unacknowledged messages?
- for client := range cs.clients {
- n := client.unackedCount()
- if n > 0 { // Check for > rather than != just to be safe.
- pending = true
- break
- }
- }
- cs.mu.Unlock()
- if !pending {
- break
- }
- if timeout > 0 && time.Now().After(deadline) {
- return errors.New("timeout")
- }
- time.Sleep(100 * time.Millisecond)
- }
- return nil
-}
-
-// See the comment for Exporter.Sync.
-func (cs *clientSet) sync(timeout time.Duration) error {
- deadline := time.Now().Add(timeout)
- // seq remembers the clients and their seqNum at point of entry.
- seq := make(map[unackedCounter]int64)
- cs.mu.Lock()
- for client := range cs.clients {
- seq[client] = client.seq()
- }
- cs.mu.Unlock()
- for {
- pending := false
- cs.mu.Lock()
- // Any unacknowledged messages? Look only at clients that existed
- // when we started and are still in this client set.
- for client := range seq {
- if _, ok := cs.clients[client]; ok {
- if client.ack() < seq[client] {
- pending = true
- break
- }
- }
- }
- cs.mu.Unlock()
- if !pending {
- break
- }
- if timeout > 0 && time.Now().After(deadline) {
- return errors.New("timeout")
- }
- time.Sleep(100 * time.Millisecond)
- }
- return nil
-}
-
-// A netChan represents a channel imported or exported
-// on a single connection. Flow is controlled by the receiving
-// side by sending payAckSend messages when values
-// are delivered into the local channel.
-type netChan struct {
- *chanDir
- name string
- id int
- size int // buffer size of channel.
- closed bool
-
- // sender-specific state
- ackCh chan bool // buffered with space for all the acks we need
- space int // available space.
-
- // receiver-specific state
- sendCh chan reflect.Value // buffered channel of values received from other end.
- ed *encDec // so that we can send acks.
- count int64 // number of values still to receive.
-}
-
-// Create a new netChan with the given name (only used for
-// messages), id, direction, buffer size, and count.
-// The connection to the other side is represented by ed.
-func newNetChan(name string, id int, ch *chanDir, ed *encDec, size int, count int64) *netChan {
- c := &netChan{chanDir: ch, name: name, id: id, size: size, ed: ed, count: count}
- if c.dir == Send {
- c.ackCh = make(chan bool, size)
- c.space = size
- }
- return c
-}
-
-// Close the channel.
-func (nch *netChan) close() {
- if nch.closed {
- return
- }
- if nch.dir == Recv {
- if nch.sendCh != nil {
- // If the sender goroutine is active, close the channel to it.
- // It will close nch.ch when it can.
- close(nch.sendCh)
- } else {
- nch.ch.Close()
- }
- } else {
- nch.ch.Close()
- close(nch.ackCh)
- }
- nch.closed = true
-}
-
-// Send message from remote side to local receiver.
-func (nch *netChan) send(val reflect.Value) {
- if nch.dir != Recv {
- panic("send on wrong direction of channel")
- }
- if nch.sendCh == nil {
- // If possible, do local send directly and ack immediately.
- if nch.ch.TrySend(val) {
- nch.sendAck()
- return
- }
- // Start sender goroutine to manage delayed delivery of values.
- nch.sendCh = make(chan reflect.Value, nch.size)
- go nch.sender()
- }
- select {
- case nch.sendCh <- val:
- // ok
- default:
- // TODO: should this be more resilient?
- panic("netchan: remote sender sent more values than allowed")
- }
-}
-
-// sendAck sends an acknowledgment that a message has left
-// the channel's buffer. If the messages remaining to be sent
-// will fit in the channel's buffer, then we don't
-// need to send an ack.
-func (nch *netChan) sendAck() {
- if nch.count < 0 || nch.count > int64(nch.size) {
- nch.ed.encode(&header{Id: nch.id}, payAckSend, nil)
- }
- if nch.count > 0 {
- nch.count--
- }
-}
-
-// The sender process forwards items from the sending queue
-// to the destination channel, acknowledging each item.
-func (nch *netChan) sender() {
- if nch.dir != Recv {
- panic("sender on wrong direction of channel")
- }
- // When Exporter.Hangup is called, the underlying channel is closed,
- // and so we may get a "too many operations on closed channel" error
- // if there are outstanding messages in sendCh.
- // Make sure that this doesn't panic the whole program.
- defer func() {
- if r := recover(); r != nil {
- // TODO check that r is "too many operations", otherwise re-panic.
- }
- }()
- for v := range nch.sendCh {
- nch.ch.Send(v)
- nch.sendAck()
- }
- nch.ch.Close()
-}
-
-// Receive value from local side for sending to remote side.
-func (nch *netChan) recv() (val reflect.Value, ok bool) {
- if nch.dir != Send {
- panic("recv on wrong direction of channel")
- }
-
- if nch.space == 0 {
- // Wait for buffer space.
- <-nch.ackCh
- nch.space++
- }
- nch.space--
- return nch.ch.Recv()
-}
-
-// acked is called when the remote side indicates that
-// a value has been delivered.
-func (nch *netChan) acked() {
- if nch.dir != Send {
- panic("recv on wrong direction of channel")
- }
- select {
- case nch.ackCh <- true:
- // ok
- default:
- // TODO: should this be more resilient?
- panic("netchan: remote receiver sent too many acks")
- }
-}
diff --git a/libgo/go/old/netchan/export.go b/libgo/go/old/netchan/export.go
deleted file mode 100644
index a65b260..0000000
--- a/libgo/go/old/netchan/export.go
+++ /dev/null
@@ -1,400 +0,0 @@
-// Copyright 2010 The Go Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style
-// license that can be found in the LICENSE file.
-
-/*
- Package netchan implements type-safe networked channels:
- it allows the two ends of a channel to appear on different
- computers connected by a network. It does this by transporting
- data sent to a channel on one machine so it can be recovered
- by a receive of a channel of the same type on the other.
-
- An exporter publishes a set of channels by name. An importer
- connects to the exporting machine and imports the channels
- by name. After importing the channels, the two machines can
- use the channels in the usual way.
-
- Networked channels are not synchronized; they always behave
- as if they are buffered channels of at least one element.
-*/
-package netchan
-
-// BUG: can't use range clause to receive when using ImportNValues to limit the count.
-
-import (
- "errors"
- "io"
- "log"
- "net"
- "reflect"
- "strconv"
- "sync"
- "time"
-)
-
-// Export
-
-// expLog is a logging convenience function. The first argument must be a string.
-func expLog(args ...interface{}) {
- args[0] = "netchan export: " + args[0].(string)
- log.Print(args...)
-}
-
-// An Exporter allows a set of channels to be published on a single
-// network port. A single machine may have multiple Exporters
-// but they must use different ports.
-type Exporter struct {
- *clientSet
-}
-
-type expClient struct {
- *encDec
- exp *Exporter
- chans map[int]*netChan // channels in use by client
- mu sync.Mutex // protects remaining fields
- errored bool // client has been sent an error
- seqNum int64 // sequences messages sent to client; has value of highest sent
- ackNum int64 // highest sequence number acknowledged
- seqLock sync.Mutex // guarantees messages are in sequence, only locked under mu
-}
-
-func newClient(exp *Exporter, conn io.ReadWriter) *expClient {
- client := new(expClient)
- client.exp = exp
- client.encDec = newEncDec(conn)
- client.seqNum = 0
- client.ackNum = 0
- client.chans = make(map[int]*netChan)
- return client
-}
-
-func (client *expClient) sendError(hdr *header, err string) {
- error := &error_{err}
- expLog("sending error to client:", error.Error)
- client.encode(hdr, payError, error) // ignore any encode error, hope client gets it
- client.mu.Lock()
- client.errored = true
- client.mu.Unlock()
-}
-
-func (client *expClient) newChan(hdr *header, dir Dir, name string, size int, count int64) *netChan {
- exp := client.exp
- exp.mu.Lock()
- ech, ok := exp.names[name]
- exp.mu.Unlock()
- if !ok {
- client.sendError(hdr, "no such channel: "+name)
- return nil
- }
- if ech.dir != dir {
- client.sendError(hdr, "wrong direction for channel: "+name)
- return nil
- }
- nch := newNetChan(name, hdr.Id, ech, client.encDec, size, count)
- client.chans[hdr.Id] = nch
- return nch
-}
-
-func (client *expClient) getChan(hdr *header, dir Dir) *netChan {
- nch := client.chans[hdr.Id]
- if nch == nil {
- return nil
- }
- if nch.dir != dir {
- client.sendError(hdr, "wrong direction for channel: "+nch.name)
- }
- return nch
-}
-
-// The function run manages sends and receives for a single client. For each
-// (client Recv) request, this will launch a serveRecv goroutine to deliver
-// the data for that channel, while (client Send) requests are handled as
-// data arrives from the client.
-func (client *expClient) run() {
- hdr := new(header)
- hdrValue := reflect.ValueOf(hdr)
- req := new(request)
- reqValue := reflect.ValueOf(req)
- error := new(error_)
- for {
- *hdr = header{}
- if err := client.decode(hdrValue); err != nil {
- if err != io.EOF {
- expLog("error decoding client header:", err)
- }
- break
- }
- switch hdr.PayloadType {
- case payRequest:
- *req = request{}
- if err := client.decode(reqValue); err != nil {
- expLog("error decoding client request:", err)
- break
- }
- if req.Size < 1 {
- panic("netchan: remote requested " + strconv.Itoa(req.Size) + " values")
- }
- switch req.Dir {
- case Recv:
- // look up channel before calling serveRecv to
- // avoid a lock around client.chans.
- if nch := client.newChan(hdr, Send, req.Name, req.Size, req.Count); nch != nil {
- go client.serveRecv(nch, *hdr, req.Count)
- }
- case Send:
- client.newChan(hdr, Recv, req.Name, req.Size, req.Count)
- // The actual sends will have payload type payData.
- // TODO: manage the count?
- default:
- error.Error = "request: can't handle channel direction"
- expLog(error.Error, req.Dir)
- client.encode(hdr, payError, error)
- }
- case payData:
- client.serveSend(*hdr)
- case payClosed:
- client.serveClosed(*hdr)
- case payAck:
- client.mu.Lock()
- if client.ackNum != hdr.SeqNum-1 {
- // Since the sequence number is incremented and the message is sent
- // in a single instance of locking client.mu, the messages are guaranteed
- // to be sent in order. Therefore receipt of acknowledgement N means
- // all messages <=N have been seen by the recipient. We check anyway.
- expLog("sequence out of order:", client.ackNum, hdr.SeqNum)
- }
- if client.ackNum < hdr.SeqNum { // If there has been an error, don't back up the count.
- client.ackNum = hdr.SeqNum
- }
- client.mu.Unlock()
- case payAckSend:
- if nch := client.getChan(hdr, Send); nch != nil {
- nch.acked()
- }
- default:
- log.Fatal("netchan export: unknown payload type", hdr.PayloadType)
- }
- }
- client.exp.delClient(client)
-}
-
-// Send all the data on a single channel to a client asking for a Recv.
-// The header is passed by value to avoid issues of overwriting.
-func (client *expClient) serveRecv(nch *netChan, hdr header, count int64) {
- for {
- val, ok := nch.recv()
- if !ok {
- if err := client.encode(&hdr, payClosed, nil); err != nil {
- expLog("error encoding server closed message:", err)
- }
- break
- }
- // We hold the lock during transmission to guarantee messages are
- // sent in sequence number order. Also, we increment first so the
- // value of client.SeqNum is the value of the highest used sequence
- // number, not one beyond.
- client.mu.Lock()
- client.seqNum++
- hdr.SeqNum = client.seqNum
- client.seqLock.Lock() // guarantee ordering of messages
- client.mu.Unlock()
- err := client.encode(&hdr, payData, val.Interface())
- client.seqLock.Unlock()
- if err != nil {
- expLog("error encoding client response:", err)
- client.sendError(&hdr, err.Error())
- break
- }
- // Negative count means run forever.
- if count >= 0 {
- if count--; count <= 0 {
- break
- }
- }
- }
-}
-
-// Receive and deliver locally one item from a client asking for a Send
-// The header is passed by value to avoid issues of overwriting.
-func (client *expClient) serveSend(hdr header) {
- nch := client.getChan(&hdr, Recv)
- if nch == nil {
- return
- }
- // Create a new value for each received item.
- val := reflect.New(nch.ch.Type().Elem()).Elem()
- if err := client.decode(val); err != nil {
- expLog("value decode:", err, "; type ", nch.ch.Type())
- return
- }
- nch.send(val)
-}
-
-// Report that client has closed the channel that is sending to us.
-// The header is passed by value to avoid issues of overwriting.
-func (client *expClient) serveClosed(hdr header) {
- nch := client.getChan(&hdr, Recv)
- if nch == nil {
- return
- }
- nch.close()
-}
-
-func (client *expClient) unackedCount() int64 {
- client.mu.Lock()
- n := client.seqNum - client.ackNum
- client.mu.Unlock()
- return n
-}
-
-func (client *expClient) seq() int64 {
- client.mu.Lock()
- n := client.seqNum
- client.mu.Unlock()
- return n
-}
-
-func (client *expClient) ack() int64 {
- client.mu.Lock()
- n := client.seqNum
- client.mu.Unlock()
- return n
-}
-
-// Serve waits for incoming connections on the listener
-// and serves the Exporter's channels on each.
-// It blocks until the listener is closed.
-func (exp *Exporter) Serve(listener net.Listener) {
- for {
- conn, err := listener.Accept()
- if err != nil {
- expLog("listen:", err)
- break
- }
- go exp.ServeConn(conn)
- }
-}
-
-// ServeConn exports the Exporter's channels on conn.
-// It blocks until the connection is terminated.
-func (exp *Exporter) ServeConn(conn io.ReadWriter) {
- exp.addClient(conn).run()
-}
-
-// NewExporter creates a new Exporter that exports a set of channels.
-func NewExporter() *Exporter {
- e := &Exporter{
- clientSet: &clientSet{
- names: make(map[string]*chanDir),
- clients: make(map[unackedCounter]bool),
- },
- }
- return e
-}
-
-// ListenAndServe exports the exporter's channels through the
-// given network and local address defined as in net.Listen.
-func (exp *Exporter) ListenAndServe(network, localaddr string) error {
- listener, err := net.Listen(network, localaddr)
- if err != nil {
- return err
- }
- go exp.Serve(listener)
- return nil
-}
-
-// addClient creates a new expClient and records its existence
-func (exp *Exporter) addClient(conn io.ReadWriter) *expClient {
- client := newClient(exp, conn)
- exp.mu.Lock()
- exp.clients[client] = true
- exp.mu.Unlock()
- return client
-}
-
-// delClient forgets the client existed
-func (exp *Exporter) delClient(client *expClient) {
- exp.mu.Lock()
- delete(exp.clients, client)
- exp.mu.Unlock()
-}
-
-// Drain waits until all messages sent from this exporter/importer, including
-// those not yet sent to any client and possibly including those sent while
-// Drain was executing, have been received by the importer. In short, it
-// waits until all the exporter's messages have been received by a client.
-// If the timeout is positive and Drain takes longer than that to complete,
-// an error is returned.
-func (exp *Exporter) Drain(timeout time.Duration) error {
- // This wrapper function is here so the method's comment will appear in godoc.
- return exp.clientSet.drain(timeout)
-}
-
-// Sync waits until all clients of the exporter have received the messages
-// that were sent at the time Sync was invoked. Unlike Drain, it does not
-// wait for messages sent while it is running or messages that have not been
-// dispatched to any client. If the timeout is positive and Sync takes longer
-// than that to complete, an error is returned.
-func (exp *Exporter) Sync(timeout time.Duration) error {
- // This wrapper function is here so the method's comment will appear in godoc.
- return exp.clientSet.sync(timeout)
-}
-
-func checkChan(chT interface{}, dir Dir) (reflect.Value, error) {
- chanType := reflect.TypeOf(chT)
- if chanType.Kind() != reflect.Chan {
- return reflect.Value{}, errors.New("not a channel")
- }
- if dir != Send && dir != Recv {
- return reflect.Value{}, errors.New("unknown channel direction")
- }
- switch chanType.ChanDir() {
- case reflect.BothDir:
- case reflect.SendDir:
- if dir != Recv {
- return reflect.Value{}, errors.New("to import/export with Send, must provide <-chan")
- }
- case reflect.RecvDir:
- if dir != Send {
- return reflect.Value{}, errors.New("to import/export with Recv, must provide chan<-")
- }
- }
- return reflect.ValueOf(chT), nil
-}
-
-// Export exports a channel of a given type and specified direction. The
-// channel to be exported is provided in the call and may be of arbitrary
-// channel type.
-// Despite the literal signature, the effective signature is
-// Export(name string, chT chan T, dir Dir)
-func (exp *Exporter) Export(name string, chT interface{}, dir Dir) error {
- ch, err := checkChan(chT, dir)
- if err != nil {
- return err
- }
- exp.mu.Lock()
- defer exp.mu.Unlock()
- _, present := exp.names[name]
- if present {
- return errors.New("channel name already being exported:" + name)
- }
- exp.names[name] = &chanDir{ch, dir}
- return nil
-}
-
-// Hangup disassociates the named channel from the Exporter and closes
-// the channel. Messages in flight for the channel may be dropped.
-func (exp *Exporter) Hangup(name string) error {
- exp.mu.Lock()
- chDir, ok := exp.names[name]
- if ok {
- delete(exp.names, name)
- }
- // TODO drop all instances of channel from client sets
- exp.mu.Unlock()
- if !ok {
- return errors.New("netchan export: hangup: no such channel: " + name)
- }
- chDir.ch.Close()
- return nil
-}
diff --git a/libgo/go/old/netchan/import.go b/libgo/go/old/netchan/import.go
deleted file mode 100644
index 50abaa9..0000000
--- a/libgo/go/old/netchan/import.go
+++ /dev/null
@@ -1,287 +0,0 @@
-// Copyright 2010 The Go Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style
-// license that can be found in the LICENSE file.
-
-package netchan
-
-import (
- "errors"
- "io"
- "log"
- "net"
- "reflect"
- "sync"
- "time"
-)
-
-// Import
-
-// impLog is a logging convenience function. The first argument must be a string.
-func impLog(args ...interface{}) {
- args[0] = "netchan import: " + args[0].(string)
- log.Print(args...)
-}
-
-// An Importer allows a set of channels to be imported from a single
-// remote machine/network port. A machine may have multiple
-// importers, even from the same machine/network port.
-type Importer struct {
- *encDec
- chanLock sync.Mutex // protects access to channel map
- names map[string]*netChan
- chans map[int]*netChan
- errors chan error
- maxId int
- mu sync.Mutex // protects remaining fields
- unacked int64 // number of unacknowledged sends.
- seqLock sync.Mutex // guarantees messages are in sequence, only locked under mu
-}
-
-// NewImporter creates a new Importer object to import a set of channels
-// from the given connection. The Exporter must be available and serving when
-// the Importer is created.
-func NewImporter(conn io.ReadWriter) *Importer {
- imp := new(Importer)
- imp.encDec = newEncDec(conn)
- imp.chans = make(map[int]*netChan)
- imp.names = make(map[string]*netChan)
- imp.errors = make(chan error, 10)
- imp.unacked = 0
- go imp.run()
- return imp
-}
-
-// Import imports a set of channels from the given network and address.
-func Import(network, remoteaddr string) (*Importer, error) {
- conn, err := net.Dial(network, remoteaddr)
- if err != nil {
- return nil, err
- }
- return NewImporter(conn), nil
-}
-
-// shutdown closes all channels for which we are receiving data from the remote side.
-func (imp *Importer) shutdown() {
- imp.chanLock.Lock()
- for _, ich := range imp.chans {
- if ich.dir == Recv {
- ich.close()
- }
- }
- imp.chanLock.Unlock()
-}
-
-// Handle the data from a single imported data stream, which will
-// have the form
-// (response, data)*
-// The response identifies by name which channel is transmitting data.
-func (imp *Importer) run() {
- // Loop on responses; requests are sent by ImportNValues()
- hdr := new(header)
- hdrValue := reflect.ValueOf(hdr)
- ackHdr := new(header)
- err := new(error_)
- errValue := reflect.ValueOf(err)
- for {
- *hdr = header{}
- if e := imp.decode(hdrValue); e != nil {
- if e != io.EOF {
- impLog("header:", e)
- imp.shutdown()
- }
- return
- }
- switch hdr.PayloadType {
- case payData:
- // done lower in loop
- case payError:
- if e := imp.decode(errValue); e != nil {
- impLog("error:", e)
- return
- }
- if err.Error != "" {
- impLog("response error:", err.Error)
- select {
- case imp.errors <- errors.New(err.Error):
- continue // errors are not acknowledged
- default:
- imp.shutdown()
- return
- }
- }
- case payClosed:
- nch := imp.getChan(hdr.Id, false)
- if nch != nil {
- nch.close()
- }
- continue // closes are not acknowledged.
- case payAckSend:
- // we can receive spurious acks if the channel is
- // hung up, so we ask getChan to ignore any errors.
- nch := imp.getChan(hdr.Id, true)
- if nch != nil {
- nch.acked()
- imp.mu.Lock()
- imp.unacked--
- imp.mu.Unlock()
- }
- continue
- default:
- impLog("unexpected payload type:", hdr.PayloadType)
- return
- }
- nch := imp.getChan(hdr.Id, false)
- if nch == nil {
- continue
- }
- if nch.dir != Recv {
- impLog("cannot happen: receive from non-Recv channel")
- return
- }
- // Acknowledge receipt
- ackHdr.Id = hdr.Id
- ackHdr.SeqNum = hdr.SeqNum
- imp.encode(ackHdr, payAck, nil)
- // Create a new value for each received item.
- value := reflect.New(nch.ch.Type().Elem()).Elem()
- if e := imp.decode(value); e != nil {
- impLog("importer value decode:", e)
- return
- }
- nch.send(value)
- }
-}
-
-func (imp *Importer) getChan(id int, errOk bool) *netChan {
- imp.chanLock.Lock()
- ich := imp.chans[id]
- imp.chanLock.Unlock()
- if ich == nil {
- if !errOk {
- impLog("unknown id in netchan request: ", id)
- }
- return nil
- }
- return ich
-}
-
-// Errors returns a channel from which transmission and protocol errors
-// can be read. Clients of the importer are not required to read the error
-// channel for correct execution. However, if too many errors occur
-// without being read from the error channel, the importer will shut down.
-func (imp *Importer) Errors() chan error {
- return imp.errors
-}
-
-// Import imports a channel of the given type, size and specified direction.
-// It is equivalent to ImportNValues with a count of -1, meaning unbounded.
-func (imp *Importer) Import(name string, chT interface{}, dir Dir, size int) error {
- return imp.ImportNValues(name, chT, dir, size, -1)
-}
-
-// ImportNValues imports a channel of the given type and specified
-// direction and then receives or transmits up to n values on that
-// channel. A value of n==-1 implies an unbounded number of values. The
-// channel will have buffer space for size values, or 1 value if size < 1.
-// The channel to be bound to the remote site's channel is provided
-// in the call and may be of arbitrary channel type.
-// Despite the literal signature, the effective signature is
-// ImportNValues(name string, chT chan T, dir Dir, size, n int) error
-// Example usage:
-// imp, err := NewImporter("tcp", "netchanserver.mydomain.com:1234")
-// if err != nil { log.Fatal(err) }
-// ch := make(chan myType)
-// err = imp.ImportNValues("name", ch, Recv, 1, 1)
-// if err != nil { log.Fatal(err) }
-// fmt.Printf("%+v\n", <-ch)
-func (imp *Importer) ImportNValues(name string, chT interface{}, dir Dir, size, n int) error {
- ch, err := checkChan(chT, dir)
- if err != nil {
- return err
- }
- imp.chanLock.Lock()
- defer imp.chanLock.Unlock()
- _, present := imp.names[name]
- if present {
- return errors.New("channel name already being imported:" + name)
- }
- if size < 1 {
- size = 1
- }
- id := imp.maxId
- imp.maxId++
- nch := newNetChan(name, id, &chanDir{ch, dir}, imp.encDec, size, int64(n))
- imp.names[name] = nch
- imp.chans[id] = nch
- // Tell the other side about this channel.
- hdr := &header{Id: id}
- req := &request{Name: name, Count: int64(n), Dir: dir, Size: size}
- if err = imp.encode(hdr, payRequest, req); err != nil {
- impLog("request encode:", err)
- return err
- }
- if dir == Send {
- go func() {
- for i := 0; n == -1 || i < n; i++ {
- val, ok := nch.recv()
- if !ok {
- if err = imp.encode(hdr, payClosed, nil); err != nil {
- impLog("error encoding client closed message:", err)
- }
- return
- }
- // We hold the lock during transmission to guarantee messages are
- // sent in order.
- imp.mu.Lock()
- imp.unacked++
- imp.seqLock.Lock()
- imp.mu.Unlock()
- if err = imp.encode(hdr, payData, val.Interface()); err != nil {
- impLog("error encoding client send:", err)
- return
- }
- imp.seqLock.Unlock()
- }
- }()
- }
- return nil
-}
-
-// Hangup disassociates the named channel from the Importer and closes
-// the channel. Messages in flight for the channel may be dropped.
-func (imp *Importer) Hangup(name string) error {
- imp.chanLock.Lock()
- defer imp.chanLock.Unlock()
- nc := imp.names[name]
- if nc == nil {
- return errors.New("netchan import: hangup: no such channel: " + name)
- }
- delete(imp.names, name)
- delete(imp.chans, nc.id)
- nc.close()
- return nil
-}
-
-func (imp *Importer) unackedCount() int64 {
- imp.mu.Lock()
- n := imp.unacked
- imp.mu.Unlock()
- return n
-}
-
-// Drain waits until all messages sent from this exporter/importer, including
-// those not yet sent to any server and possibly including those sent while
-// Drain was executing, have been received by the exporter. In short, it
-// waits until all the importer's messages have been received.
-// If the timeout (measured in nanoseconds) is positive and Drain takes
-// longer than that to complete, an error is returned.
-func (imp *Importer) Drain(timeout int64) error {
- deadline := time.Now().Add(time.Duration(timeout))
- for imp.unackedCount() > 0 {
- if timeout > 0 && time.Now().After(deadline) {
- return errors.New("timeout")
- }
- time.Sleep(100 * time.Millisecond)
- }
- return nil
-}
diff --git a/libgo/go/old/netchan/netchan_test.go b/libgo/go/old/netchan/netchan_test.go
deleted file mode 100644
index 9a7c076..0000000
--- a/libgo/go/old/netchan/netchan_test.go
+++ /dev/null
@@ -1,447 +0,0 @@
-// Copyright 2010 The Go Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style
-// license that can be found in the LICENSE file.
-
-package netchan
-
-import (
- "net"
- "strings"
- "testing"
- "time"
-)
-
-const count = 10 // number of items in most tests
-const closeCount = 5 // number of items when sender closes early
-
-const base = 23
-
-func exportSend(exp *Exporter, n int, t *testing.T, done chan bool) {
- ch := make(chan int)
- err := exp.Export("exportedSend", ch, Send)
- if err != nil {
- t.Fatal("exportSend:", err)
- }
- go func() {
- for i := 0; i < n; i++ {
- ch <- base + i
- }
- close(ch)
- if done != nil {
- done <- true
- }
- }()
-}
-
-func exportReceive(exp *Exporter, t *testing.T, expDone chan bool) {
- ch := make(chan int)
- err := exp.Export("exportedRecv", ch, Recv)
- expDone <- true
- if err != nil {
- t.Fatal("exportReceive:", err)
- }
- for i := 0; i < count; i++ {
- v, ok := <-ch
- if !ok {
- if i != closeCount {
- t.Errorf("exportReceive expected close at %d; got one at %d", closeCount, i)
- }
- break
- }
- if v != base+i {
- t.Errorf("export Receive: bad value: expected %d+%d=%d; got %d", base, i, base+i, v)
- }
- }
-}
-
-func importSend(imp *Importer, n int, t *testing.T, done chan bool) {
- ch := make(chan int)
- err := imp.ImportNValues("exportedRecv", ch, Send, 3, -1)
- if err != nil {
- t.Fatal("importSend:", err)
- }
- go func() {
- for i := 0; i < n; i++ {
- ch <- base + i
- }
- close(ch)
- if done != nil {
- done <- true
- }
- }()
-}
-
-func importReceive(imp *Importer, t *testing.T, done chan bool) {
- ch := make(chan int)
- err := imp.ImportNValues("exportedSend", ch, Recv, 3, count)
- if err != nil {
- t.Fatal("importReceive:", err)
- }
- for i := 0; i < count; i++ {
- v, ok := <-ch
- if !ok {
- if i != closeCount {
- t.Errorf("importReceive expected close at %d; got one at %d", closeCount, i)
- }
- break
- }
- if v != base+i {
- t.Errorf("importReceive: bad value: expected %d+%d=%d; got %+d", base, i, base+i, v)
- }
- }
- if done != nil {
- done <- true
- }
-}
-
-func TestExportSendImportReceive(t *testing.T) {
- exp, imp := pair(t)
- exportSend(exp, count, t, nil)
- importReceive(imp, t, nil)
-}
-
-func TestExportReceiveImportSend(t *testing.T) {
- exp, imp := pair(t)
- expDone := make(chan bool)
- done := make(chan bool)
- go func() {
- exportReceive(exp, t, expDone)
- done <- true
- }()
- <-expDone
- importSend(imp, count, t, nil)
- <-done
-}
-
-func TestClosingExportSendImportReceive(t *testing.T) {
- exp, imp := pair(t)
- exportSend(exp, closeCount, t, nil)
- importReceive(imp, t, nil)
-}
-
-func TestClosingImportSendExportReceive(t *testing.T) {
- exp, imp := pair(t)
- expDone := make(chan bool)
- done := make(chan bool)
- go func() {
- exportReceive(exp, t, expDone)
- done <- true
- }()
- <-expDone
- importSend(imp, closeCount, t, nil)
- <-done
-}
-
-func TestErrorForIllegalChannel(t *testing.T) {
- exp, imp := pair(t)
- // Now export a channel.
- ch := make(chan int, 1)
- err := exp.Export("aChannel", ch, Send)
- if err != nil {
- t.Fatal("export:", err)
- }
- ch <- 1234
- close(ch)
- // Now try to import a different channel.
- ch = make(chan int)
- err = imp.Import("notAChannel", ch, Recv, 1)
- if err != nil {
- t.Fatal("import:", err)
- }
- // Expect an error now. Start a timeout.
- timeout := make(chan bool, 1) // buffered so closure will not hang around.
- go func() {
- time.Sleep(10 * time.Second) // very long, to give even really slow machines a chance.
- timeout <- true
- }()
- select {
- case err = <-imp.Errors():
- if strings.Index(err.Error(), "no such channel") < 0 {
- t.Error("wrong error for nonexistent channel:", err)
- }
- case <-timeout:
- t.Error("import of nonexistent channel did not receive an error")
- }
-}
-
-// Not a great test but it does at least invoke Drain.
-func TestExportDrain(t *testing.T) {
- exp, imp := pair(t)
- done := make(chan bool)
- go func() {
- exportSend(exp, closeCount, t, nil)
- done <- true
- }()
- <-done
- go importReceive(imp, t, done)
- exp.Drain(0)
- <-done
-}
-
-// Not a great test but it does at least invoke Drain.
-func TestImportDrain(t *testing.T) {
- exp, imp := pair(t)
- expDone := make(chan bool)
- go exportReceive(exp, t, expDone)
- <-expDone
- importSend(imp, closeCount, t, nil)
- imp.Drain(0)
-}
-
-// Not a great test but it does at least invoke Sync.
-func TestExportSync(t *testing.T) {
- exp, imp := pair(t)
- done := make(chan bool)
- exportSend(exp, closeCount, t, nil)
- go importReceive(imp, t, done)
- exp.Sync(0)
- <-done
-}
-
-// Test hanging up the send side of an export.
-// TODO: test hanging up the receive side of an export.
-func TestExportHangup(t *testing.T) {
- exp, imp := pair(t)
- ech := make(chan int)
- err := exp.Export("exportedSend", ech, Send)
- if err != nil {
- t.Fatal("export:", err)
- }
- // Prepare to receive two values. We'll actually deliver only one.
- ich := make(chan int)
- err = imp.ImportNValues("exportedSend", ich, Recv, 1, 2)
- if err != nil {
- t.Fatal("import exportedSend:", err)
- }
- // Send one value, receive it.
- const Value = 1234
- ech <- Value
- v := <-ich
- if v != Value {
- t.Fatal("expected", Value, "got", v)
- }
- // Now hang up the channel. Importer should see it close.
- exp.Hangup("exportedSend")
- v, ok := <-ich
- if ok {
- t.Fatal("expected channel to be closed; got value", v)
- }
-}
-
-// Test hanging up the send side of an import.
-// TODO: test hanging up the receive side of an import.
-func TestImportHangup(t *testing.T) {
- exp, imp := pair(t)
- ech := make(chan int)
- err := exp.Export("exportedRecv", ech, Recv)
- if err != nil {
- t.Fatal("export:", err)
- }
- // Prepare to Send two values. We'll actually deliver only one.
- ich := make(chan int)
- err = imp.ImportNValues("exportedRecv", ich, Send, 1, 2)
- if err != nil {
- t.Fatal("import exportedRecv:", err)
- }
- // Send one value, receive it.
- const Value = 1234
- ich <- Value
- v := <-ech
- if v != Value {
- t.Fatal("expected", Value, "got", v)
- }
- // Now hang up the channel. Exporter should see it close.
- imp.Hangup("exportedRecv")
- v, ok := <-ech
- if ok {
- t.Fatal("expected channel to be closed; got value", v)
- }
-}
-
-// loop back exportedRecv to exportedSend,
-// but receive a value from ctlch before starting the loop.
-func exportLoopback(exp *Exporter, t *testing.T) {
- inch := make(chan int)
- if err := exp.Export("exportedRecv", inch, Recv); err != nil {
- t.Fatal("exportRecv")
- }
-
- outch := make(chan int)
- if err := exp.Export("exportedSend", outch, Send); err != nil {
- t.Fatal("exportSend")
- }
-
- ctlch := make(chan int)
- if err := exp.Export("exportedCtl", ctlch, Recv); err != nil {
- t.Fatal("exportRecv")
- }
-
- go func() {
- <-ctlch
- for i := 0; i < count; i++ {
- x := <-inch
- if x != base+i {
- t.Errorf("exportLoopback expected %d; got %d", i, x)
- }
- outch <- x
- }
- }()
-}
-
-// This test checks that channel operations can proceed
-// even when other concurrent operations are blocked.
-func TestIndependentSends(t *testing.T) {
- if testing.Short() {
- t.Logf("disabled test during -short")
- return
- }
- exp, imp := pair(t)
-
- exportLoopback(exp, t)
-
- importSend(imp, count, t, nil)
- done := make(chan bool)
- go importReceive(imp, t, done)
-
- // wait for export side to try to deliver some values.
- time.Sleep(250 * time.Millisecond)
-
- ctlch := make(chan int)
- if err := imp.ImportNValues("exportedCtl", ctlch, Send, 1, 1); err != nil {
- t.Fatal("importSend:", err)
- }
- ctlch <- 0
-
- <-done
-}
-
-// This test cross-connects a pair of exporter/importer pairs.
-type value struct {
- I int
- Source string
-}
-
-func TestCrossConnect(t *testing.T) {
- e1, i1 := pair(t)
- e2, i2 := pair(t)
-
- crossExport(e1, e2, t)
- crossImport(i1, i2, t)
-}
-
-// Export side of cross-traffic.
-func crossExport(e1, e2 *Exporter, t *testing.T) {
- s := make(chan value)
- err := e1.Export("exportedSend", s, Send)
- if err != nil {
- t.Fatal("exportSend:", err)
- }
-
- r := make(chan value)
- err = e2.Export("exportedReceive", r, Recv)
- if err != nil {
- t.Fatal("exportReceive:", err)
- }
-
- go crossLoop("export", s, r, t)
-}
-
-// Import side of cross-traffic.
-func crossImport(i1, i2 *Importer, t *testing.T) {
- s := make(chan value)
- err := i2.Import("exportedReceive", s, Send, 2)
- if err != nil {
- t.Fatal("import of exportedReceive:", err)
- }
-
- r := make(chan value)
- err = i1.Import("exportedSend", r, Recv, 2)
- if err != nil {
- t.Fatal("import of exported Send:", err)
- }
-
- crossLoop("import", s, r, t)
-}
-
-// Cross-traffic: send and receive 'count' numbers.
-func crossLoop(name string, s, r chan value, t *testing.T) {
- for si, ri := 0, 0; si < count && ri < count; {
- select {
- case s <- value{si, name}:
- si++
- case v := <-r:
- if v.I != ri {
- t.Errorf("loop: bad value: expected %d, hello; got %+v", ri, v)
- }
- ri++
- }
- }
-}
-
-const flowCount = 100
-
-// test flow control from exporter to importer.
-func TestExportFlowControl(t *testing.T) {
- if testing.Short() {
- t.Logf("disabled test during -short")
- return
- }
- exp, imp := pair(t)
-
- sendDone := make(chan bool, 1)
- exportSend(exp, flowCount, t, sendDone)
-
- ch := make(chan int)
- err := imp.ImportNValues("exportedSend", ch, Recv, 20, -1)
- if err != nil {
- t.Fatal("importReceive:", err)
- }
-
- testFlow(sendDone, ch, flowCount, t)
-}
-
-// test flow control from importer to exporter.
-func TestImportFlowControl(t *testing.T) {
- if testing.Short() {
- t.Logf("disabled test during -short")
- return
- }
- exp, imp := pair(t)
-
- ch := make(chan int)
- err := exp.Export("exportedRecv", ch, Recv)
- if err != nil {
- t.Fatal("importReceive:", err)
- }
-
- sendDone := make(chan bool, 1)
- importSend(imp, flowCount, t, sendDone)
- testFlow(sendDone, ch, flowCount, t)
-}
-
-func testFlow(sendDone chan bool, ch <-chan int, N int, t *testing.T) {
- go func() {
- time.Sleep(500 * time.Millisecond)
- sendDone <- false
- }()
-
- if <-sendDone {
- t.Fatal("send did not block")
- }
- n := 0
- for i := range ch {
- t.Log("after blocking, got value ", i)
- n++
- }
- if n != N {
- t.Fatalf("expected %d values; got %d", N, n)
- }
-}
-
-func pair(t *testing.T) (*Exporter, *Importer) {
- c0, c1 := net.Pipe()
- exp := NewExporter()
- go exp.ServeConn(c0)
- imp := NewImporter(c1)
- return exp, imp
-}