diff options
author | Ian Lance Taylor <ian@gcc.gnu.org> | 2011-03-16 23:05:44 +0000 |
---|---|---|
committer | Ian Lance Taylor <ian@gcc.gnu.org> | 2011-03-16 23:05:44 +0000 |
commit | 5133f00ef8baab894d92de1e8b8baae59815a8b6 (patch) | |
tree | 44176975832a3faf1626836e70c97d5edd674122 /libgo/go/rpc | |
parent | f617201f55938fc89b532f2240bdf77bea946471 (diff) | |
download | gcc-5133f00ef8baab894d92de1e8b8baae59815a8b6.zip gcc-5133f00ef8baab894d92de1e8b8baae59815a8b6.tar.gz gcc-5133f00ef8baab894d92de1e8b8baae59815a8b6.tar.bz2 |
Update to current version of Go library (revision 94d654be2064).
From-SVN: r171076
Diffstat (limited to 'libgo/go/rpc')
-rw-r--r-- | libgo/go/rpc/client.go | 78 | ||||
-rw-r--r-- | libgo/go/rpc/debug.go | 2 | ||||
-rw-r--r-- | libgo/go/rpc/jsonrpc/client.go | 3 | ||||
-rw-r--r-- | libgo/go/rpc/jsonrpc/server.go | 3 | ||||
-rw-r--r-- | libgo/go/rpc/server.go | 100 | ||||
-rw-r--r-- | libgo/go/rpc/server_test.go | 117 |
6 files changed, 164 insertions, 139 deletions
diff --git a/libgo/go/rpc/client.go b/libgo/go/rpc/client.go index 601c497..6de6d13 100644 --- a/libgo/go/rpc/client.go +++ b/libgo/go/rpc/client.go @@ -15,6 +15,16 @@ import ( "sync" ) +// ServerError represents an error that has been returned from +// the remote side of the RPC connection. +type ServerError string + +func (e ServerError) String() string { + return string(e) +} + +const ErrShutdown = os.ErrorString("connection is shut down") + // Call represents an active RPC. type Call struct { ServiceMethod string // The name of the service and method to call. @@ -30,12 +40,12 @@ type Call struct { // with a single Client. type Client struct { mutex sync.Mutex // protects pending, seq - shutdown os.Error // non-nil if the client is shut down sending sync.Mutex seq uint64 codec ClientCodec pending map[uint64]*Call closing bool + shutdown bool } // A ClientCodec implements writing of RPC requests and @@ -43,7 +53,9 @@ type Client struct { // The client calls WriteRequest to write a request to the connection // and calls ReadResponseHeader and ReadResponseBody in pairs // to read responses. The client calls Close when finished with the -// connection. +// connection. ReadResponseBody may be called with a nil +// argument to force the body of the response to be read and then +// discarded. type ClientCodec interface { WriteRequest(*Request, interface{}) os.Error ReadResponseHeader(*Response) os.Error @@ -55,10 +67,10 @@ type ClientCodec interface { func (client *Client) send(c *Call) { // Register this call. client.mutex.Lock() - if client.shutdown != nil { - c.Error = client.shutdown + if client.shutdown { + c.Error = ErrShutdown client.mutex.Unlock() - _ = c.Done <- c // do not block + c.done() return } c.seq = client.seq @@ -93,25 +105,30 @@ func (client *Client) input() { c := client.pending[seq] client.pending[seq] = c, false client.mutex.Unlock() - err = client.codec.ReadResponseBody(c.Reply) - if response.Error != "" { - c.Error = os.ErrorString(response.Error) - } else if err != nil { - c.Error = err + + if response.Error == "" { + err = client.codec.ReadResponseBody(c.Reply) + if err != nil { + c.Error = os.ErrorString("reading body " + err.String()) + } } else { - // Empty strings should turn into nil os.Errors - c.Error = nil + // We've got an error response. Give this to the request; + // any subsequent requests will get the ReadResponseBody + // error if there is one. + c.Error = ServerError(response.Error) + err = client.codec.ReadResponseBody(nil) + if err != nil { + err = os.ErrorString("reading error body: " + err.String()) + } } - // We don't want to block here. It is the caller's responsibility to make - // sure the channel has enough buffer space. See comment in Go(). - _ = c.Done <- c // do not block + c.done() } // Terminate pending calls. client.mutex.Lock() - client.shutdown = err + client.shutdown = true for _, call := range client.pending { call.Error = err - _ = call.Done <- call // do not block + call.done() } client.mutex.Unlock() if err != os.EOF || !client.closing { @@ -119,6 +136,16 @@ func (client *Client) input() { } } +func (call *Call) done() { + select { + case call.Done <- call: + // ok + default: + // We don't want to block here. It is the caller's responsibility to make + // sure the channel has enough buffer space. See comment in Go(). + } +} + // NewClient returns a new Client to handle requests to the // set of services at the other end of the connection. func NewClient(conn io.ReadWriteCloser) *Client { @@ -201,10 +228,11 @@ func Dial(network, address string) (*Client, os.Error) { } func (client *Client) Close() os.Error { - if client.shutdown != nil || client.closing { - return os.ErrorString("rpc: already closed") - } client.mutex.Lock() + if client.shutdown || client.closing { + client.mutex.Unlock() + return ErrShutdown + } client.closing = true client.mutex.Unlock() return client.codec.Close() @@ -231,9 +259,9 @@ func (client *Client) Go(serviceMethod string, args interface{}, reply interface } } c.Done = done - if client.shutdown != nil { - c.Error = client.shutdown - _ = c.Done <- c // do not block + if client.shutdown { + c.Error = ErrShutdown + c.done() return c } client.send(c) @@ -242,8 +270,8 @@ func (client *Client) Go(serviceMethod string, args interface{}, reply interface // Call invokes the named function, waits for it to complete, and returns its error status. func (client *Client) Call(serviceMethod string, args interface{}, reply interface{}) os.Error { - if client.shutdown != nil { - return client.shutdown + if client.shutdown { + return ErrShutdown } call := <-client.Go(serviceMethod, args, reply, nil).Done return call.Error diff --git a/libgo/go/rpc/debug.go b/libgo/go/rpc/debug.go index 44b32e0..32dc8a1 100644 --- a/libgo/go/rpc/debug.go +++ b/libgo/go/rpc/debug.go @@ -83,7 +83,7 @@ func (server debugHTTP) ServeHTTP(w http.ResponseWriter, req *http.Request) { } server.Unlock() sort.Sort(services) - err := debug.Execute(services, w) + err := debug.Execute(w, services) if err != nil { fmt.Fprintln(w, "rpc: error executing template:", err.String()) } diff --git a/libgo/go/rpc/jsonrpc/client.go b/libgo/go/rpc/jsonrpc/client.go index dcaa69f..5b806bd 100644 --- a/libgo/go/rpc/jsonrpc/client.go +++ b/libgo/go/rpc/jsonrpc/client.go @@ -98,6 +98,9 @@ func (c *clientCodec) ReadResponseHeader(r *rpc.Response) os.Error { } func (c *clientCodec) ReadResponseBody(x interface{}) os.Error { + if x == nil { + return nil + } return json.Unmarshal(*c.resp.Result, x) } diff --git a/libgo/go/rpc/jsonrpc/server.go b/libgo/go/rpc/jsonrpc/server.go index bf53bda..9c6b8b4 100644 --- a/libgo/go/rpc/jsonrpc/server.go +++ b/libgo/go/rpc/jsonrpc/server.go @@ -85,6 +85,9 @@ func (c *serverCodec) ReadRequestHeader(r *rpc.Request) os.Error { } func (c *serverCodec) ReadRequestBody(x interface{}) os.Error { + if x == nil { + return nil + } // JSON params is array value. // RPC params is struct. // Unmarshal into array containing struct for now. diff --git a/libgo/go/rpc/server.go b/libgo/go/rpc/server.go index 5c50bcc..9dcda41 100644 --- a/libgo/go/rpc/server.go +++ b/libgo/go/rpc/server.go @@ -73,7 +73,7 @@ rpc.HandleHTTP() l, e := net.Listen("tcp", ":1234") if e != nil { - log.Exit("listen error:", e) + log.Fatal("listen error:", e) } go http.Serve(l, nil) @@ -82,7 +82,7 @@ client, err := rpc.DialHTTP("tcp", serverAddress + ":1234") if err != nil { - log.Exit("dialing:", err) + log.Fatal("dialing:", err) } Then it can make a remote call: @@ -92,7 +92,7 @@ var reply int err = client.Call("Arith.Multiply", args, &reply) if err != nil { - log.Exit("arith error:", err) + log.Fatal("arith error:", err) } fmt.Printf("Arith: %d*%d=%d", args.A, args.B, *reply) @@ -225,7 +225,7 @@ func (server *Server) register(rcvr interface{}, name string, useName bool) os.E sname = name } if sname == "" { - log.Exit("rpc: no service name for type", s.typ.String()) + log.Fatal("rpc: no service name for type", s.typ.String()) } if s.typ.PkgPath() != "" && !isExported(sname) && !useName { s := "rpc Register: type " + sname + " is not exported" @@ -299,10 +299,10 @@ func (server *Server) register(rcvr interface{}, name string, useName bool) os.E // A value sent as a placeholder for the response when the server receives an invalid request. type InvalidRequest struct { - marker int + Marker int } -var invalidRequest = InvalidRequest{1} +var invalidRequest = InvalidRequest{} func _new(t *reflect.PtrType) *reflect.PtrValue { v := reflect.MakeZero(t).(*reflect.PtrValue) @@ -316,6 +316,7 @@ func sendResponse(sending *sync.Mutex, req *Request, reply interface{}, codec Se resp.ServiceMethod = req.ServiceMethod if errmsg != "" { resp.Error = errmsg + reply = invalidRequest } resp.Seq = req.Seq sending.Lock() @@ -389,54 +390,74 @@ func (server *Server) ServeConn(conn io.ReadWriteCloser) { func (server *Server) ServeCodec(codec ServerCodec) { sending := new(sync.Mutex) for { - // Grab the request header. - req := new(Request) - err := codec.ReadRequestHeader(req) + req, service, mtype, err := server.readRequest(codec) if err != nil { + if err != os.EOF { + log.Println("rpc:", err) + } if err == os.EOF || err == io.ErrUnexpectedEOF { - if err == io.ErrUnexpectedEOF { - log.Println("rpc:", err) - } break } - s := "rpc: server cannot decode request: " + err.String() - sendResponse(sending, req, invalidRequest, codec, s) - break - } - serviceMethod := strings.Split(req.ServiceMethod, ".", -1) - if len(serviceMethod) != 2 { - s := "rpc: service/method request ill-formed: " + req.ServiceMethod - sendResponse(sending, req, invalidRequest, codec, s) - continue - } - // Look up the request. - server.Lock() - service, ok := server.serviceMap[serviceMethod[0]] - server.Unlock() - if !ok { - s := "rpc: can't find service " + req.ServiceMethod - sendResponse(sending, req, invalidRequest, codec, s) - continue - } - mtype, ok := service.method[serviceMethod[1]] - if !ok { - s := "rpc: can't find method " + req.ServiceMethod - sendResponse(sending, req, invalidRequest, codec, s) + // discard body + codec.ReadRequestBody(nil) + + // send a response if we actually managed to read a header. + if req != nil { + sendResponse(sending, req, invalidRequest, codec, err.String()) + } continue } + // Decode the argument value. argv := _new(mtype.ArgType) replyv := _new(mtype.ReplyType) err = codec.ReadRequestBody(argv.Interface()) if err != nil { - log.Println("rpc: tearing down", serviceMethod[0], "connection:", err) + if err == os.EOF || err == io.ErrUnexpectedEOF { + if err == io.ErrUnexpectedEOF { + log.Println("rpc:", err) + } + break + } sendResponse(sending, req, replyv.Interface(), codec, err.String()) - break + continue } go service.call(sending, mtype, req, argv, replyv, codec) } codec.Close() } +func (server *Server) readRequest(codec ServerCodec) (req *Request, service *service, mtype *methodType, err os.Error) { + // Grab the request header. + req = new(Request) + err = codec.ReadRequestHeader(req) + if err != nil { + req = nil + if err == os.EOF || err == io.ErrUnexpectedEOF { + return + } + err = os.ErrorString("rpc: server cannot decode request: " + err.String()) + return + } + + serviceMethod := strings.Split(req.ServiceMethod, ".", -1) + if len(serviceMethod) != 2 { + err = os.ErrorString("rpc: service/method request ill-formed: " + req.ServiceMethod) + return + } + // Look up the request. + server.Lock() + service = server.serviceMap[serviceMethod[0]] + server.Unlock() + if service == nil { + err = os.ErrorString("rpc: can't find service " + req.ServiceMethod) + return + } + mtype = service.method[serviceMethod[1]] + if mtype == nil { + err = os.ErrorString("rpc: can't find method " + req.ServiceMethod) + } + return +} // Accept accepts connections on the listener and serves requests // for each incoming connection. Accept blocks; the caller typically @@ -445,7 +466,7 @@ func (server *Server) Accept(lis net.Listener) { for { conn, err := lis.Accept() if err != nil { - log.Exit("rpc.Serve: accept:", err.String()) // TODO(r): exit? + log.Fatal("rpc.Serve: accept:", err.String()) // TODO(r): exit? } go server.ServeConn(conn) } @@ -465,7 +486,8 @@ func RegisterName(name string, rcvr interface{}) os.Error { // The server calls ReadRequestHeader and ReadRequestBody in pairs // to read requests from the connection, and it calls WriteResponse to // write a response back. The server calls Close when finished with the -// connection. +// connection. ReadRequestBody may be called with a nil +// argument to force the body of the request to be read and discarded. type ServerCodec interface { ReadRequestHeader(*Request) os.Error ReadRequestBody(interface{}) os.Error diff --git a/libgo/go/rpc/server_test.go b/libgo/go/rpc/server_test.go index 355d51c..05aaebc 100644 --- a/libgo/go/rpc/server_test.go +++ b/libgo/go/rpc/server_test.go @@ -72,7 +72,7 @@ func (t *Arith) Error(args *Args, reply *Reply) os.Error { func listenTCP() (net.Listener, string) { l, e := net.Listen("tcp", "127.0.0.1:0") // any available address if e != nil { - log.Exitf("net.Listen tcp :0: %v", e) + log.Fatalf("net.Listen tcp :0: %v", e) } return l, l.Addr().String() } @@ -134,14 +134,25 @@ func testRPC(t *testing.T, addr string) { t.Errorf("Add: expected %d got %d", reply.C, args.A+args.B) } - args = &Args{7, 8} + // Nonexistent method + args = &Args{7, 0} reply = new(Reply) - err = client.Call("Arith.Mul", args, reply) - if err != nil { - t.Errorf("Mul: expected no error but got string %q", err.String()) + err = client.Call("Arith.BadOperation", args, reply) + // expect an error + if err == nil { + t.Error("BadOperation: expected error") + } else if !strings.HasPrefix(err.String(), "rpc: can't find method ") { + t.Errorf("BadOperation: expected can't find method error; got %q", err) } - if reply.C != args.A*args.B { - t.Errorf("Mul: expected %d got %d", reply.C, args.A*args.B) + + // Unknown service + args = &Args{7, 8} + reply = new(Reply) + err = client.Call("Arith.Unknown", args, reply) + if err == nil { + t.Error("expected error calling unknown service") + } else if strings.Index(err.String(), "method") < 0 { + t.Error("expected error about method; got", err) } // Out of order. @@ -178,6 +189,15 @@ func testRPC(t *testing.T, addr string) { t.Error("Div: expected divide by zero error; got", err) } + // Bad type. + reply = new(Reply) + err = client.Call("Arith.Add", reply, reply) // args, reply would be the correct thing to use + if err == nil { + t.Error("expected error calling Arith.Add with wrong arg type") + } else if strings.Index(err.String(), "type") < 0 { + t.Error("expected error about type; got", err) + } + // Non-struct argument const Val = 12345 str := fmt.Sprint(Val) @@ -200,9 +220,19 @@ func testRPC(t *testing.T, addr string) { if str != expect { t.Errorf("String: expected %s got %s", expect, str) } + + args = &Args{7, 8} + reply = new(Reply) + err = client.Call("Arith.Mul", args, reply) + if err != nil { + t.Errorf("Mul: expected no error but got string %q", err.String()) + } + if reply.C != args.A*args.B { + t.Errorf("Mul: expected %d got %d", reply.C, args.A*args.B) + } } -func TestHTTPRPC(t *testing.T) { +func TestHTTP(t *testing.T) { once.Do(startServer) testHTTPRPC(t, "") newOnce.Do(startNewServer) @@ -233,65 +263,6 @@ func testHTTPRPC(t *testing.T, path string) { } } -func TestCheckUnknownService(t *testing.T) { - once.Do(startServer) - - conn, err := net.Dial("tcp", "", serverAddr) - if err != nil { - t.Fatal("dialing:", err) - } - - client := NewClient(conn) - - args := &Args{7, 8} - reply := new(Reply) - err = client.Call("Unknown.Add", args, reply) - if err == nil { - t.Error("expected error calling unknown service") - } else if strings.Index(err.String(), "service") < 0 { - t.Error("expected error about service; got", err) - } -} - -func TestCheckUnknownMethod(t *testing.T) { - once.Do(startServer) - - conn, err := net.Dial("tcp", "", serverAddr) - if err != nil { - t.Fatal("dialing:", err) - } - - client := NewClient(conn) - - args := &Args{7, 8} - reply := new(Reply) - err = client.Call("Arith.Unknown", args, reply) - if err == nil { - t.Error("expected error calling unknown service") - } else if strings.Index(err.String(), "method") < 0 { - t.Error("expected error about method; got", err) - } -} - -func TestCheckBadType(t *testing.T) { - once.Do(startServer) - - conn, err := net.Dial("tcp", "", serverAddr) - if err != nil { - t.Fatal("dialing:", err) - } - - client := NewClient(conn) - - reply := new(Reply) - err = client.Call("Arith.Add", reply, reply) // args, reply would be the correct thing to use - if err == nil { - t.Error("expected error calling Arith.Add with wrong arg type") - } else if strings.Index(err.String(), "type") < 0 { - t.Error("expected error about type; got", err) - } -} - type ArgNotPointer int type ReplyNotPointer int type ArgNotPublic int @@ -364,14 +335,12 @@ func TestSendDeadlock(t *testing.T) { testSendDeadlock(client) done <- true }() - for i := 0; i < 50; i++ { - time.Sleep(100 * 1e6) - _, ok := <-done - if ok { - return - } + select { + case <-done: + return + case <-time.After(5e9): + t.Fatal("deadlock") } - t.Fatal("deadlock") } func testSendDeadlock(client *Client) { |