aboutsummaryrefslogtreecommitdiff
path: root/libgo/go/rpc/server.go
diff options
context:
space:
mode:
Diffstat (limited to 'libgo/go/rpc/server.go')
-rw-r--r--libgo/go/rpc/server.go100
1 files changed, 61 insertions, 39 deletions
diff --git a/libgo/go/rpc/server.go b/libgo/go/rpc/server.go
index 5c50bcc..9dcda41 100644
--- a/libgo/go/rpc/server.go
+++ b/libgo/go/rpc/server.go
@@ -73,7 +73,7 @@
rpc.HandleHTTP()
l, e := net.Listen("tcp", ":1234")
if e != nil {
- log.Exit("listen error:", e)
+ log.Fatal("listen error:", e)
}
go http.Serve(l, nil)
@@ -82,7 +82,7 @@
client, err := rpc.DialHTTP("tcp", serverAddress + ":1234")
if err != nil {
- log.Exit("dialing:", err)
+ log.Fatal("dialing:", err)
}
Then it can make a remote call:
@@ -92,7 +92,7 @@
var reply int
err = client.Call("Arith.Multiply", args, &reply)
if err != nil {
- log.Exit("arith error:", err)
+ log.Fatal("arith error:", err)
}
fmt.Printf("Arith: %d*%d=%d", args.A, args.B, *reply)
@@ -225,7 +225,7 @@ func (server *Server) register(rcvr interface{}, name string, useName bool) os.E
sname = name
}
if sname == "" {
- log.Exit("rpc: no service name for type", s.typ.String())
+ log.Fatal("rpc: no service name for type", s.typ.String())
}
if s.typ.PkgPath() != "" && !isExported(sname) && !useName {
s := "rpc Register: type " + sname + " is not exported"
@@ -299,10 +299,10 @@ func (server *Server) register(rcvr interface{}, name string, useName bool) os.E
// A value sent as a placeholder for the response when the server receives an invalid request.
type InvalidRequest struct {
- marker int
+ Marker int
}
-var invalidRequest = InvalidRequest{1}
+var invalidRequest = InvalidRequest{}
func _new(t *reflect.PtrType) *reflect.PtrValue {
v := reflect.MakeZero(t).(*reflect.PtrValue)
@@ -316,6 +316,7 @@ func sendResponse(sending *sync.Mutex, req *Request, reply interface{}, codec Se
resp.ServiceMethod = req.ServiceMethod
if errmsg != "" {
resp.Error = errmsg
+ reply = invalidRequest
}
resp.Seq = req.Seq
sending.Lock()
@@ -389,54 +390,74 @@ func (server *Server) ServeConn(conn io.ReadWriteCloser) {
func (server *Server) ServeCodec(codec ServerCodec) {
sending := new(sync.Mutex)
for {
- // Grab the request header.
- req := new(Request)
- err := codec.ReadRequestHeader(req)
+ req, service, mtype, err := server.readRequest(codec)
if err != nil {
+ if err != os.EOF {
+ log.Println("rpc:", err)
+ }
if err == os.EOF || err == io.ErrUnexpectedEOF {
- if err == io.ErrUnexpectedEOF {
- log.Println("rpc:", err)
- }
break
}
- s := "rpc: server cannot decode request: " + err.String()
- sendResponse(sending, req, invalidRequest, codec, s)
- break
- }
- serviceMethod := strings.Split(req.ServiceMethod, ".", -1)
- if len(serviceMethod) != 2 {
- s := "rpc: service/method request ill-formed: " + req.ServiceMethod
- sendResponse(sending, req, invalidRequest, codec, s)
- continue
- }
- // Look up the request.
- server.Lock()
- service, ok := server.serviceMap[serviceMethod[0]]
- server.Unlock()
- if !ok {
- s := "rpc: can't find service " + req.ServiceMethod
- sendResponse(sending, req, invalidRequest, codec, s)
- continue
- }
- mtype, ok := service.method[serviceMethod[1]]
- if !ok {
- s := "rpc: can't find method " + req.ServiceMethod
- sendResponse(sending, req, invalidRequest, codec, s)
+ // discard body
+ codec.ReadRequestBody(nil)
+
+ // send a response if we actually managed to read a header.
+ if req != nil {
+ sendResponse(sending, req, invalidRequest, codec, err.String())
+ }
continue
}
+
// Decode the argument value.
argv := _new(mtype.ArgType)
replyv := _new(mtype.ReplyType)
err = codec.ReadRequestBody(argv.Interface())
if err != nil {
- log.Println("rpc: tearing down", serviceMethod[0], "connection:", err)
+ if err == os.EOF || err == io.ErrUnexpectedEOF {
+ if err == io.ErrUnexpectedEOF {
+ log.Println("rpc:", err)
+ }
+ break
+ }
sendResponse(sending, req, replyv.Interface(), codec, err.String())
- break
+ continue
}
go service.call(sending, mtype, req, argv, replyv, codec)
}
codec.Close()
}
+func (server *Server) readRequest(codec ServerCodec) (req *Request, service *service, mtype *methodType, err os.Error) {
+ // Grab the request header.
+ req = new(Request)
+ err = codec.ReadRequestHeader(req)
+ if err != nil {
+ req = nil
+ if err == os.EOF || err == io.ErrUnexpectedEOF {
+ return
+ }
+ err = os.ErrorString("rpc: server cannot decode request: " + err.String())
+ return
+ }
+
+ serviceMethod := strings.Split(req.ServiceMethod, ".", -1)
+ if len(serviceMethod) != 2 {
+ err = os.ErrorString("rpc: service/method request ill-formed: " + req.ServiceMethod)
+ return
+ }
+ // Look up the request.
+ server.Lock()
+ service = server.serviceMap[serviceMethod[0]]
+ server.Unlock()
+ if service == nil {
+ err = os.ErrorString("rpc: can't find service " + req.ServiceMethod)
+ return
+ }
+ mtype = service.method[serviceMethod[1]]
+ if mtype == nil {
+ err = os.ErrorString("rpc: can't find method " + req.ServiceMethod)
+ }
+ return
+}
// Accept accepts connections on the listener and serves requests
// for each incoming connection. Accept blocks; the caller typically
@@ -445,7 +466,7 @@ func (server *Server) Accept(lis net.Listener) {
for {
conn, err := lis.Accept()
if err != nil {
- log.Exit("rpc.Serve: accept:", err.String()) // TODO(r): exit?
+ log.Fatal("rpc.Serve: accept:", err.String()) // TODO(r): exit?
}
go server.ServeConn(conn)
}
@@ -465,7 +486,8 @@ func RegisterName(name string, rcvr interface{}) os.Error {
// The server calls ReadRequestHeader and ReadRequestBody in pairs
// to read requests from the connection, and it calls WriteResponse to
// write a response back. The server calls Close when finished with the
-// connection.
+// connection. ReadRequestBody may be called with a nil
+// argument to force the body of the request to be read and discarded.
type ServerCodec interface {
ReadRequestHeader(*Request) os.Error
ReadRequestBody(interface{}) os.Error