【golang高级开发】如何使用golang实现自定义RPC框架
RPC (Remote Procedure Call)是一种远程调用协议,通过网络传输,使得程序能够像本地调用一样调用远程服务。在现代微服务架构中,RPC协议被广泛使用。golang通过标准库的net/rpc包提供了一套RPC框架,但是这个框架无法满足一些特定的业务需求,本文就来介绍如何使用golang自己实现一个RPC框架。
1. 基本概念
在实现自定义RPC框架之前,需要先了解以下几个基本概念:
- Service:RPC调用的服务,即提供RPC服务的函数集合。
- Method:Service中的方法,即具体的RPC调用方法。
- Codec:序列化和反序列化的方法,将调用的参数和返回值序列化成二进制数据,以便通过网络传输。
- Transport:网络传输协议,用于将序列化后的二进制数据通过网络传输到远程服务。
2. 实现步骤
接下来我们就来实现一个简单的自定义RPC框架,步骤如下:
- 定义Service和Method
- 实现Codec
- 实现Transport
- 完成框架
2.1 定义Service和Method
我们以一个简单的计算器服务为例,在服务端提供两个方法Add和Multiply,客户端可以通过RPC调用这两个方法。
定义服务:
```go
// 定义CalculatorService接口
type CalculatorService interface {
Add(int, int) int
Multiply(int, int) int
}
// 实现具体的CalculatorService
type CalculatorServiceImpl struct {}
func (c *CalculatorServiceImpl) Add(a, b int) int {
return a + b
}
func (c *CalculatorServiceImpl) Multiply(a, b int) int {
return a * b
}
```
定义Service和Method之后,接下来需要定义一个struct来存储Service和其对应的Method。同时,定义一个Register方法,用于注册新的Service和Method。
```go
type Server struct {
services map[string]*service
}
type service struct {
typ reflect.Type
method map[string]*methodType
}
type methodType struct {
method reflect.Method
ArgType reflect.Type
ReplyType reflect.Type
}
func (s *Server) Register(receiver interface{}) error {
service := new(service)
service.typ = reflect.TypeOf(receiver).Elem()
service.method = make(map[string]*methodType)
for i := 0; i < service.typ.NumMethod(); i++ {
method := service.typ.Method(i)
mType := method.Type
if mType.NumIn() != 3 || mType.NumOut() != 1 {
continue
}
argType := mType.In(1)
replyType := mType.In(2)
if !isExportedOrBuiltinType(argType) || !isExportedOrBuiltinType(replyType) {
continue
}
service.method[method.Name] = &methodType{
method: method,
ArgType: argType,
ReplyType: replyType,
}
}
s.services[reflect.Indirect(reflect.ValueOf(receiver)).Type().Name()] = service
return nil
}
func isExportedOrBuiltinType(t reflect.Type) bool {
pkgPath := t.PkgPath()
return pkgPath == "" || pkgPath == "builtin"
}
```
在Register方法中,循环遍历service.typ中的所有方法,将满足条件的方法添加到service.method中。最后将service添加到Server.services中。
2.2 实现Codec
Codec用于将调用的参数和返回值序列化成二进制数据,以便通过网络传输。
在这里,我们使用golang的标准库encoding/gob实现Codec。Gob是golang标准库中的编解码库,支持任意类型的编解码和传输,比JSON和XML更高效。在实现Codec之前,需要先定义一个request结构体和response结构体,用于存储调用信息和返回信息。
```go
type request struct {
ServiceMethod string // 形如"Service.Method"
Seq uint64 // 请求序列号
Args []byte // 客户端传递的参数
}
type response struct {
Seq uint64 // 请求序列号
ServiceMethod string // 形如"Service.Method"
Error string // 存储错误信息
Reply []byte // 存储响应参数
}
```
接下来实现Codec,具体实现代码如下:
```go
type Codec struct {
conn io.ReadWriteCloser
dec *gob.Decoder
enc *gob.Encoder
mutex sync.Mutex
ids uint64
pending map[uint64]*call
}
type call struct {
req *request
resp *response
done chan *call
}
func (c *Codec) WriteRequest(method string, args interface{}) (uint64, error) {
c.mutex.Lock()
defer c.mutex.Unlock()
id := c.ids
c.ids++
req := &request{
ServiceMethod: method,
Seq: id,
}
buf := bytes.NewBuffer(nil)
enc := gob.NewEncoder(buf)
if err := enc.Encode(args); err != nil {
return 0, err
}
req.Args = buf.Bytes()
call := &call{
req: req,
resp: new(response),
done: make(chan *call),
}
c.pending[id] = call
if err := c.enc.Encode(req); err != nil {
delete(c.pending, id)
return 0, err
}
return id, nil
}
func (c *Codec) ReadResponseHeader() (*rpc.Response, error) {
c.mutex.Lock()
defer c.mutex.Unlock()
var resp response
if err := c.dec.Decode(&resp); err != nil {
return nil, err
}
call := c.pending[resp.Seq]
delete(c.pending, resp.Seq)
call.resp = &resp
call.done <- call
return &rpc.Response{
ServiceMethod: resp.ServiceMethod,
Seq: resp.Seq,
Error: errors.New(resp.Error),
}, nil
}
func (c *Codec) ReadResponseBody(x interface{}) error {
c.mutex.Lock()
defer c.mutex.Unlock()
call := <-c.pending[resp.Seq].done
if call.resp.Error != "" {
return errors.New(call.resp.Error)
}
dec := gob.NewDecoder(bytes.NewBuffer(call.resp.Reply))
return dec.Decode(x)
}
```
在上面的代码中,我们使用了一个pending map来存储请求的序列号和请求的返回值。在WriteRequest方法中,我们将请求信息编码成二进制数据,然后将请求信息和该请求的channel存储到pending中。在ReadResponseHeader和ReadResponseBody方法中,我们根据pending中的请求序列号获取该请求对应的call,然后将call.resp进行解码后返回。
2.3 实现Transport
Transport用于将序列化后的二进制数据通过网络传输到远程服务。
在golang中,可以使用net包来实现简单的Socket编程。在这里,我们通过net.Dial建立连接后,将Codec中序列化后的数据通过Socket发送到远程服务端。
```go
type Transport struct {
conn io.ReadWriteCloser
}
func (t *Transport) Dial(network, address string) error {
conn, err := net.Dial(network, address)
if err != nil {
return err
}
t.conn = conn
return nil
}
func (t *Transport) Close() error {
return t.conn.Close()
}
func (t *Transport) Codec() rpc.ClientCodec {
return &Codec{
conn: t.conn,
dec: gob.NewDecoder(t.conn),
enc: gob.NewEncoder(t.conn),
pending: make(map[uint64]*call),
}
}
```
2.4 完成框架
最后,我们完成自定义RPC框架的实现。具体代码如下:
```go
type Server struct {
services map[string]*service
}
type service struct {
typ reflect.Type
method map[string]*methodType
}
type methodType struct {
method reflect.Method
ArgType reflect.Type
ReplyType reflect.Type
}
func (s *Server) Register(receiver interface{}) error {
service := new(service)
service.typ = reflect.TypeOf(receiver).Elem()
service.method = make(map[string]*methodType)
for i := 0; i < service.typ.NumMethod(); i++ {
method := service.typ.Method(i)
mType := method.Type
if mType.NumIn() != 3 || mType.NumOut() != 1 {
continue
}
argType := mType.In(1)
replyType := mType.In(2)
if !isExportedOrBuiltinType(argType) || !isExportedOrBuiltinType(replyType) {
continue
}
service.method[method.Name] = &methodType{
method: method,
ArgType: argType,
ReplyType: replyType,
}
}
s.services[reflect.Indirect(reflect.ValueOf(receiver)).Type().Name()] = service
return nil
}
func (s *Server) ServeCodec(codec rpc.ServerCodec) error {
for {
req, err := codec.ReadRequestHeader()
if err != nil {
if err != io.EOF && err != io.ErrUnexpectedEOF {
log.Println("rpc server:", err)
}
return err
}
serviceMethod := req.ServiceMethod
dot := strings.LastIndex(serviceMethod, ".")
if dot < 0 {
err := errors.New("rpc server: service/method request ill-formed: " + serviceMethod)
log.Println(err.Error())
resp := &rpc.Response{
ServiceMethod: serviceMethod,
Seq: req.Seq,
Error: err.Error(),
}
codec.WriteResponse(resp, nil)
continue
}
serviceName, methodName := serviceMethod[:dot], serviceMethod[dot+1:]
service, ok := s.services[serviceName]
if !ok {
err := errors.New("rpc server: can't find service " + serviceName)
log.Println(err.Error())
resp := &rpc.Response{
ServiceMethod: serviceMethod,
Seq: req.Seq,
Error: err.Error(),
}
codec.WriteResponse(resp, nil)
continue
}
mtype, ok := service.method[methodName]
if !ok {
err := errors.New("rpc server: can't find method " + methodName)
log.Println(err.Error())
resp := &rpc.Response{
ServiceMethod: serviceMethod,
Seq: req.Seq,
Error: err.Error(),
}
codec.WriteResponse(resp, nil)
continue
}
argv := reflect.New(mtype.ArgType)
replyv := reflect.New(mtype.ReplyType).Elem()
if err = codec.ReadRequestBody(argv.Interface()); err != nil {
log.Println("rpc server: ", err)
resp := &rpc.Response{
ServiceMethod: serviceMethod,
Seq: req.Seq,
Error: err.Error(),
}
codec.WriteResponse(resp, nil)
continue
}
// Call the service method.
returnValues := mtype.method.Func.Call([]reflect.Value{
reflect.ValueOf(service),
reflect.ValueOf(argv.Interface()),
replyv,
})
// The return value for the method is an error.
errInter := returnValues[0].Interface()
if errInter != nil {
err := errInter.(error)
log.Println("rpc server: ", err)
resp := &rpc.Response{
ServiceMethod: serviceMethod,
Seq: req.Seq,
Error: err.Error(),
}
codec.WriteResponse(resp, nil)
continue
}
resp := &rpc.Response{
ServiceMethod: serviceMethod,
Seq: req.Seq,
Error: "",
}
if err = codec.WriteResponse(resp, replyv.Interface()); err != nil {
log.Println("rpc server: ", err)
}
}
}
func (s *Server) ServeTransport(transport *Transport) error {
codec := transport.Codec()
defer transport.Close()
return s.ServeCodec(codec)
}
func isExportedOrBuiltinType(t reflect.Type) bool {
pkgPath := t.PkgPath()
return pkgPath == "" || pkgPath == "builtin"
}
```
在上面的代码中,我们定义了一个Server结构体,用于注册Service和Method,同时实现ServeCodec和ServeTransport方法,用于在服务端处理RPC请求。
3. 测试
完成自定义RPC框架的实现之后,我们需要对其进行测试。下面我们将分别在服务端和客户端使用该RPC框架。
服务端代码:
```go
func main() {
server := new(Server)
server.services = make(map[string]*service)
_ = server.Register(&CalculatorServiceImpl{})
transport := new(Transport)
_ = transport.Dial("tcp", "localhost:8080")
defer transport.Close()
log.Fatal(server.ServeTransport(transport))
}
```
在服务端,我们首先通过Server.Register方法注册了一个CalculatorServiceImpl服务,然后使用Transport.Dial方法连接到特定的地址。
客户端代码:
```go
func main() {
transport := new(Transport)
_ = transport.Dial("tcp", "localhost:8080")
defer transport.Close()
client := rpc.NewClientWithCodec(transport.Codec())
var res int
err := client.Call("CalculatorService.Add", []int{10, 20}, &res)
if err != nil {
log.Fatal(err)
}
log.Printf("Add(10, 20) = %d", res)
var mul int
err = client.Call("CalculatorService.Multiply", []int{10, 20}, &mul)
if err != nil {
log.Fatal(err)
}
log.Printf("Multiply(10, 20) = %d", mul)
}
```
在客户端,我们首先通过Transport.Dial方法连接到服务端,然后通过rpc.NewClientWithCodec方法创建一个客户端,并使用client.Call方法调用服务端的方法。
最后,我们启动服务端和客户端,可以看到客户端成功调用了服务端提供的Add和Multiply方法。
4. 总结
本文介绍了如何使用golang实现自定义RPC框架,包括定义Service和Method,实现Codec和Transport,完成框架等步骤,并通过一个简单的计算器服务对该RPC框架进行了测试。该自定义RPC框架适用于一些特定的业务需求,可以满足不同的RPC调用场景。