diff options
Diffstat (limited to 'libgo/go/rpc/server.go')
-rw-r--r-- | libgo/go/rpc/server.go | 131 |
1 files changed, 80 insertions, 51 deletions
diff --git a/libgo/go/rpc/server.go b/libgo/go/rpc/server.go index acadeec..7450744 100644 --- a/libgo/go/rpc/server.go +++ b/libgo/go/rpc/server.go @@ -174,7 +174,7 @@ type Response struct { // Server represents an RPC Server. type Server struct { - sync.Mutex // protects the serviceMap + mu sync.Mutex // protects the serviceMap serviceMap map[string]*service reqLock sync.Mutex // protects freeReq freeReq *Request @@ -196,12 +196,14 @@ func isExported(name string) bool { return unicode.IsUpper(rune) } -// Is this type exported or local to this package? -func isExportedOrLocalType(t reflect.Type) bool { +// Is this type exported or a builtin? +func isExportedOrBuiltinType(t reflect.Type) bool { for t.Kind() == reflect.Ptr { t = t.Elem() } - return t.PkgPath() == "" || isExported(t.Name()) + // PkgPath will be non-empty even for an exported type, + // so we need to check the type name as well. + return isExported(t.Name()) || t.PkgPath() == "" } // Register publishes in the server the set of methods of the @@ -224,8 +226,8 @@ func (server *Server) RegisterName(name string, rcvr interface{}) os.Error { } func (server *Server) register(rcvr interface{}, name string, useName bool) os.Error { - server.Lock() - defer server.Unlock() + server.mu.Lock() + defer server.mu.Unlock() if server.serviceMap == nil { server.serviceMap = make(map[string]*service) } @@ -239,13 +241,13 @@ func (server *Server) register(rcvr interface{}, name string, useName bool) os.E if sname == "" { log.Fatal("rpc: no service name for type", s.typ.String()) } - if s.typ.PkgPath() != "" && !isExported(sname) && !useName { + if !isExported(sname) && !useName { s := "rpc Register: type " + sname + " is not exported" log.Print(s) - return os.ErrorString(s) + return os.NewError(s) } if _, present := server.serviceMap[sname]; present { - return os.ErrorString("rpc: service already defined: " + sname) + return os.NewError("rpc: service already defined: " + sname) } s.name = sname s.method = make(map[string]*methodType) @@ -255,7 +257,7 @@ func (server *Server) register(rcvr interface{}, name string, useName bool) os.E method := s.typ.Method(m) mtype := method.Type mname := method.Name - if mtype.PkgPath() != "" || !isExported(mname) { + if method.PkgPath != "" { continue } // Method needs three ins: receiver, *args, *reply. @@ -265,7 +267,7 @@ func (server *Server) register(rcvr interface{}, name string, useName bool) os.E } // First arg need not be a pointer. argType := mtype.In(1) - if !isExportedOrLocalType(argType) { + if !isExportedOrBuiltinType(argType) { log.Println(mname, "argument type not exported or local:", argType) continue } @@ -275,7 +277,7 @@ func (server *Server) register(rcvr interface{}, name string, useName bool) os.E log.Println("method", mname, "reply type not a pointer:", replyType) continue } - if !isExportedOrLocalType(replyType) { + if !isExportedOrBuiltinType(replyType) { log.Println("method", mname, "reply type not exported or local:", replyType) continue } @@ -294,7 +296,7 @@ func (server *Server) register(rcvr interface{}, name string, useName bool) os.E if len(s.method) == 0 { s := "rpc Register: type " + sname + " has no exported methods of suitable type" log.Print(s) - return os.ErrorString(s) + return os.NewError(s) } server.serviceMap[s.name] = s return nil @@ -376,7 +378,6 @@ func (c *gobServerCodec) Close() os.Error { return c.rwc.Close() } - // ServeConn runs the server on a single connection. // ServeConn blocks, serving the connection until the client hangs up. // The caller typically invokes ServeConn in a go statement. @@ -393,7 +394,7 @@ func (server *Server) ServeConn(conn io.ReadWriteCloser) { func (server *Server) ServeCodec(codec ServerCodec) { sending := new(sync.Mutex) for { - req, service, mtype, err := server.readRequest(codec) + service, mtype, req, argv, replyv, err := server.readRequest(codec) if err != nil { if err != os.EOF { log.Println("rpc:", err) @@ -401,9 +402,6 @@ func (server *Server) ServeCodec(codec ServerCodec) { if err == os.EOF || err == io.ErrUnexpectedEOF { break } - // discard body - codec.ReadRequestBody(nil) - // send a response if we actually managed to read a header. if req != nil { server.sendResponse(sending, req, invalidRequest, codec, err.String()) @@ -411,35 +409,29 @@ func (server *Server) ServeCodec(codec ServerCodec) { } continue } + go service.call(server, sending, mtype, req, argv, replyv, codec) + } + codec.Close() +} - // Decode the argument value. - var argv reflect.Value - argIsValue := false // if true, need to indirect before calling. - if mtype.ArgType.Kind() == reflect.Ptr { - argv = reflect.New(mtype.ArgType.Elem()) - } else { - argv = reflect.New(mtype.ArgType) - argIsValue = true - } - // argv guaranteed to be a pointer now. - replyv := reflect.New(mtype.ReplyType.Elem()) - err = codec.ReadRequestBody(argv.Interface()) - if err != nil { - if err == os.EOF || err == io.ErrUnexpectedEOF { - if err == io.ErrUnexpectedEOF { - log.Println("rpc:", err) - } - break - } - server.sendResponse(sending, req, replyv.Interface(), codec, err.String()) - continue +// ServeRequest is like ServeCodec but synchronously serves a single request. +// It does not close the codec upon completion. +func (server *Server) ServeRequest(codec ServerCodec) os.Error { + sending := new(sync.Mutex) + service, mtype, req, argv, replyv, err := server.readRequest(codec) + if err != nil { + if err == os.EOF || err == io.ErrUnexpectedEOF { + return err } - if argIsValue { - argv = argv.Elem() + // send a response if we actually managed to read a header. + if req != nil { + server.sendResponse(sending, req, invalidRequest, codec, err.String()) + server.freeRequest(req) } - go service.call(server, sending, mtype, req, argv, replyv, codec) + return err } - codec.Close() + service.call(server, sending, mtype, req, argv, replyv, codec) + return nil } func (server *Server) getRequest() *Request { @@ -482,7 +474,38 @@ func (server *Server) freeResponse(resp *Response) { server.respLock.Unlock() } -func (server *Server) readRequest(codec ServerCodec) (req *Request, service *service, mtype *methodType, err os.Error) { +func (server *Server) readRequest(codec ServerCodec) (service *service, mtype *methodType, req *Request, argv, replyv reflect.Value, err os.Error) { + service, mtype, req, err = server.readRequestHeader(codec) + if err != nil { + if err == os.EOF || err == io.ErrUnexpectedEOF { + return + } + // discard body + codec.ReadRequestBody(nil) + return + } + + // Decode the argument value. + argIsValue := false // if true, need to indirect before calling. + if mtype.ArgType.Kind() == reflect.Ptr { + argv = reflect.New(mtype.ArgType.Elem()) + } else { + argv = reflect.New(mtype.ArgType) + argIsValue = true + } + // argv guaranteed to be a pointer now. + if err = codec.ReadRequestBody(argv.Interface()); err != nil { + return + } + if argIsValue { + argv = argv.Elem() + } + + replyv = reflect.New(mtype.ReplyType.Elem()) + return +} + +func (server *Server) readRequestHeader(codec ServerCodec) (service *service, mtype *methodType, req *Request, err os.Error) { // Grab the request header. req = server.getRequest() err = codec.ReadRequestHeader(req) @@ -491,26 +514,26 @@ func (server *Server) readRequest(codec ServerCodec) (req *Request, service *ser if err == os.EOF || err == io.ErrUnexpectedEOF { return } - err = os.ErrorString("rpc: server cannot decode request: " + err.String()) + err = os.NewError("rpc: server cannot decode request: " + err.String()) return } - serviceMethod := strings.Split(req.ServiceMethod, ".", -1) + serviceMethod := strings.Split(req.ServiceMethod, ".") if len(serviceMethod) != 2 { - err = os.ErrorString("rpc: service/method request ill-formed: " + req.ServiceMethod) + err = os.NewError("rpc: service/method request ill-formed: " + req.ServiceMethod) return } // Look up the request. - server.Lock() + server.mu.Lock() service = server.serviceMap[serviceMethod[0]] - server.Unlock() + server.mu.Unlock() if service == nil { - err = os.ErrorString("rpc: can't find service " + req.ServiceMethod) + err = os.NewError("rpc: can't find service " + req.ServiceMethod) return } mtype = service.method[serviceMethod[1]] if mtype == nil { - err = os.ErrorString("rpc: can't find method " + req.ServiceMethod) + err = os.NewError("rpc: can't find method " + req.ServiceMethod) } return } @@ -567,6 +590,12 @@ func ServeCodec(codec ServerCodec) { DefaultServer.ServeCodec(codec) } +// ServeRequest is like ServeCodec but synchronously serves a single request. +// It does not close the codec upon completion. +func ServeRequest(codec ServerCodec) os.Error { + return DefaultServer.ServeRequest(codec) +} + // Accept accepts connections on the listener and serves requests // to DefaultServer for each incoming connection. // Accept blocks; the caller typically invokes it in a go statement. |