aboutsummaryrefslogtreecommitdiff
path: root/libgo/go/rpc/server.go
diff options
context:
space:
mode:
Diffstat (limited to 'libgo/go/rpc/server.go')
-rw-r--r--libgo/go/rpc/server.go131
1 files changed, 80 insertions, 51 deletions
diff --git a/libgo/go/rpc/server.go b/libgo/go/rpc/server.go
index acadeec..7450744 100644
--- a/libgo/go/rpc/server.go
+++ b/libgo/go/rpc/server.go
@@ -174,7 +174,7 @@ type Response struct {
// Server represents an RPC Server.
type Server struct {
- sync.Mutex // protects the serviceMap
+ mu sync.Mutex // protects the serviceMap
serviceMap map[string]*service
reqLock sync.Mutex // protects freeReq
freeReq *Request
@@ -196,12 +196,14 @@ func isExported(name string) bool {
return unicode.IsUpper(rune)
}
-// Is this type exported or local to this package?
-func isExportedOrLocalType(t reflect.Type) bool {
+// Is this type exported or a builtin?
+func isExportedOrBuiltinType(t reflect.Type) bool {
for t.Kind() == reflect.Ptr {
t = t.Elem()
}
- return t.PkgPath() == "" || isExported(t.Name())
+ // PkgPath will be non-empty even for an exported type,
+ // so we need to check the type name as well.
+ return isExported(t.Name()) || t.PkgPath() == ""
}
// Register publishes in the server the set of methods of the
@@ -224,8 +226,8 @@ func (server *Server) RegisterName(name string, rcvr interface{}) os.Error {
}
func (server *Server) register(rcvr interface{}, name string, useName bool) os.Error {
- server.Lock()
- defer server.Unlock()
+ server.mu.Lock()
+ defer server.mu.Unlock()
if server.serviceMap == nil {
server.serviceMap = make(map[string]*service)
}
@@ -239,13 +241,13 @@ func (server *Server) register(rcvr interface{}, name string, useName bool) os.E
if sname == "" {
log.Fatal("rpc: no service name for type", s.typ.String())
}
- if s.typ.PkgPath() != "" && !isExported(sname) && !useName {
+ if !isExported(sname) && !useName {
s := "rpc Register: type " + sname + " is not exported"
log.Print(s)
- return os.ErrorString(s)
+ return os.NewError(s)
}
if _, present := server.serviceMap[sname]; present {
- return os.ErrorString("rpc: service already defined: " + sname)
+ return os.NewError("rpc: service already defined: " + sname)
}
s.name = sname
s.method = make(map[string]*methodType)
@@ -255,7 +257,7 @@ func (server *Server) register(rcvr interface{}, name string, useName bool) os.E
method := s.typ.Method(m)
mtype := method.Type
mname := method.Name
- if mtype.PkgPath() != "" || !isExported(mname) {
+ if method.PkgPath != "" {
continue
}
// Method needs three ins: receiver, *args, *reply.
@@ -265,7 +267,7 @@ func (server *Server) register(rcvr interface{}, name string, useName bool) os.E
}
// First arg need not be a pointer.
argType := mtype.In(1)
- if !isExportedOrLocalType(argType) {
+ if !isExportedOrBuiltinType(argType) {
log.Println(mname, "argument type not exported or local:", argType)
continue
}
@@ -275,7 +277,7 @@ func (server *Server) register(rcvr interface{}, name string, useName bool) os.E
log.Println("method", mname, "reply type not a pointer:", replyType)
continue
}
- if !isExportedOrLocalType(replyType) {
+ if !isExportedOrBuiltinType(replyType) {
log.Println("method", mname, "reply type not exported or local:", replyType)
continue
}
@@ -294,7 +296,7 @@ func (server *Server) register(rcvr interface{}, name string, useName bool) os.E
if len(s.method) == 0 {
s := "rpc Register: type " + sname + " has no exported methods of suitable type"
log.Print(s)
- return os.ErrorString(s)
+ return os.NewError(s)
}
server.serviceMap[s.name] = s
return nil
@@ -376,7 +378,6 @@ func (c *gobServerCodec) Close() os.Error {
return c.rwc.Close()
}
-
// ServeConn runs the server on a single connection.
// ServeConn blocks, serving the connection until the client hangs up.
// The caller typically invokes ServeConn in a go statement.
@@ -393,7 +394,7 @@ func (server *Server) ServeConn(conn io.ReadWriteCloser) {
func (server *Server) ServeCodec(codec ServerCodec) {
sending := new(sync.Mutex)
for {
- req, service, mtype, err := server.readRequest(codec)
+ service, mtype, req, argv, replyv, err := server.readRequest(codec)
if err != nil {
if err != os.EOF {
log.Println("rpc:", err)
@@ -401,9 +402,6 @@ func (server *Server) ServeCodec(codec ServerCodec) {
if err == os.EOF || err == io.ErrUnexpectedEOF {
break
}
- // discard body
- codec.ReadRequestBody(nil)
-
// send a response if we actually managed to read a header.
if req != nil {
server.sendResponse(sending, req, invalidRequest, codec, err.String())
@@ -411,35 +409,29 @@ func (server *Server) ServeCodec(codec ServerCodec) {
}
continue
}
+ go service.call(server, sending, mtype, req, argv, replyv, codec)
+ }
+ codec.Close()
+}
- // Decode the argument value.
- var argv reflect.Value
- argIsValue := false // if true, need to indirect before calling.
- if mtype.ArgType.Kind() == reflect.Ptr {
- argv = reflect.New(mtype.ArgType.Elem())
- } else {
- argv = reflect.New(mtype.ArgType)
- argIsValue = true
- }
- // argv guaranteed to be a pointer now.
- replyv := reflect.New(mtype.ReplyType.Elem())
- err = codec.ReadRequestBody(argv.Interface())
- if err != nil {
- if err == os.EOF || err == io.ErrUnexpectedEOF {
- if err == io.ErrUnexpectedEOF {
- log.Println("rpc:", err)
- }
- break
- }
- server.sendResponse(sending, req, replyv.Interface(), codec, err.String())
- continue
+// ServeRequest is like ServeCodec but synchronously serves a single request.
+// It does not close the codec upon completion.
+func (server *Server) ServeRequest(codec ServerCodec) os.Error {
+ sending := new(sync.Mutex)
+ service, mtype, req, argv, replyv, err := server.readRequest(codec)
+ if err != nil {
+ if err == os.EOF || err == io.ErrUnexpectedEOF {
+ return err
}
- if argIsValue {
- argv = argv.Elem()
+ // send a response if we actually managed to read a header.
+ if req != nil {
+ server.sendResponse(sending, req, invalidRequest, codec, err.String())
+ server.freeRequest(req)
}
- go service.call(server, sending, mtype, req, argv, replyv, codec)
+ return err
}
- codec.Close()
+ service.call(server, sending, mtype, req, argv, replyv, codec)
+ return nil
}
func (server *Server) getRequest() *Request {
@@ -482,7 +474,38 @@ func (server *Server) freeResponse(resp *Response) {
server.respLock.Unlock()
}
-func (server *Server) readRequest(codec ServerCodec) (req *Request, service *service, mtype *methodType, err os.Error) {
+func (server *Server) readRequest(codec ServerCodec) (service *service, mtype *methodType, req *Request, argv, replyv reflect.Value, err os.Error) {
+ service, mtype, req, err = server.readRequestHeader(codec)
+ if err != nil {
+ if err == os.EOF || err == io.ErrUnexpectedEOF {
+ return
+ }
+ // discard body
+ codec.ReadRequestBody(nil)
+ return
+ }
+
+ // Decode the argument value.
+ argIsValue := false // if true, need to indirect before calling.
+ if mtype.ArgType.Kind() == reflect.Ptr {
+ argv = reflect.New(mtype.ArgType.Elem())
+ } else {
+ argv = reflect.New(mtype.ArgType)
+ argIsValue = true
+ }
+ // argv guaranteed to be a pointer now.
+ if err = codec.ReadRequestBody(argv.Interface()); err != nil {
+ return
+ }
+ if argIsValue {
+ argv = argv.Elem()
+ }
+
+ replyv = reflect.New(mtype.ReplyType.Elem())
+ return
+}
+
+func (server *Server) readRequestHeader(codec ServerCodec) (service *service, mtype *methodType, req *Request, err os.Error) {
// Grab the request header.
req = server.getRequest()
err = codec.ReadRequestHeader(req)
@@ -491,26 +514,26 @@ func (server *Server) readRequest(codec ServerCodec) (req *Request, service *ser
if err == os.EOF || err == io.ErrUnexpectedEOF {
return
}
- err = os.ErrorString("rpc: server cannot decode request: " + err.String())
+ err = os.NewError("rpc: server cannot decode request: " + err.String())
return
}
- serviceMethod := strings.Split(req.ServiceMethod, ".", -1)
+ serviceMethod := strings.Split(req.ServiceMethod, ".")
if len(serviceMethod) != 2 {
- err = os.ErrorString("rpc: service/method request ill-formed: " + req.ServiceMethod)
+ err = os.NewError("rpc: service/method request ill-formed: " + req.ServiceMethod)
return
}
// Look up the request.
- server.Lock()
+ server.mu.Lock()
service = server.serviceMap[serviceMethod[0]]
- server.Unlock()
+ server.mu.Unlock()
if service == nil {
- err = os.ErrorString("rpc: can't find service " + req.ServiceMethod)
+ err = os.NewError("rpc: can't find service " + req.ServiceMethod)
return
}
mtype = service.method[serviceMethod[1]]
if mtype == nil {
- err = os.ErrorString("rpc: can't find method " + req.ServiceMethod)
+ err = os.NewError("rpc: can't find method " + req.ServiceMethod)
}
return
}
@@ -567,6 +590,12 @@ func ServeCodec(codec ServerCodec) {
DefaultServer.ServeCodec(codec)
}
+// ServeRequest is like ServeCodec but synchronously serves a single request.
+// It does not close the codec upon completion.
+func ServeRequest(codec ServerCodec) os.Error {
+ return DefaultServer.ServeRequest(codec)
+}
+
// Accept accepts connections on the listener and serves requests
// to DefaultServer for each incoming connection.
// Accept blocks; the caller typically invokes it in a go statement.