Apache Dubbo 有injvm方式的通信,能够避免网络带来的延迟,同时也不占用本地端口,对测试、本地验证而言,是一种比较方便的RPC通信方式。
最近看到 containerd 的代码,发现它也有类似的需求。 但使用ip端口通信,有可能会有端口冲突;使用unix socket,可能会有路径冲突。 考察了下gRPC有没有和injvm类似的,基于内存的通信方式。后来发现pipe非常好用,所以记录了下。
Golang/gRPC对网络的抽象 首先,我们先看一下gRPC一次调用的架构图。当然,这个架构图目前只关注了网络抽象分布。
我们重点关注网络部分。
操作系统系统抽象 首先,在网络包之上,系统抽象出来了socket ,代表一条虚拟连接,对于UDP,这个虚拟连接是不可靠的,对于TCP,这个链接是尽力可靠的。
对于网络编程而言,仅仅有连接是不够的,还需要告诉开发者如何创建、关闭连接。对于服务端 ,系统提供了accept方法 ,用来接收连接。对于客户端 ,系统提供了connect方法 ,用于和服务端建立连接。
Golang抽象 在Golang中,socket对等的概念叫net.Conn ,代表了一条虚拟连接。
接下来,对于服务端,accept这个行为被包装成了net.Listener接口 ;对于客户端,Golang则基于connect提供了net.Dial方法 。
1 2 3 4 5 6 type  Listener interface  {      Accept() (Conn, error )   Close() error    Addr() Addr }
 
gRPC使用 那么gRPC是怎么使用Listener和Dial的呢?
对于gRPC服务端,Serve方法 接收一个Listener,表示在这个Listener上提供服务。
对于gRPC客户端,网络本质上就是一个能够连接到某个地方的东西就可以,所以只需要一个dialer func(context.Context, string) (net.Conn, error)函数就行了。
什么是pipe 在操作系统层面,pipe 表示一个数据管道,而这个管道两端都在本程序中,可以很好的满足我们的要求:基于内存的网络通信。
Golang也基于pipe提供了net.Pipe()函数 创建了一个双向的、基于内存通信的管道,在能力上,能够很好的满足gRPC对底层通信的要求。
但是net.Pipe仅仅产生了两个net.Conn,即只产生两个网络连接,没有之前提到的Listner,也没有Dial方法。
于是结合Golang的channel,把net.Pipe包装成了Listner,也提供了Dial方法:
Listener.Accept(),只需要监听一个channel,客户端连接过来的时候,把连接通过channel传递过来即可 
Dial方法,调用Pipe,将一端通过channel给服务端(作为服务端连接),另一端作为客户端连接 
 
代码如下:
pipe_listener.go 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 package  mainimport  (   "context"    "errors"    "net"    "sync"    "sync/atomic"  )var  ErrPipeListenerClosed = errors.New(`pipe listener already closed` )type  PipeListener struct  {   ch    chan  net.Conn   close  chan  struct {}   done  uint32    m     sync.Mutex }func  ListenPipe ()   *PipeListener {   return  &PipeListener{     ch:    make (chan  net.Conn),     close : make (chan  struct {}),   } }func  (l *PipeListener)   Accept() (c net.Conn, e error ) {   select  {   case  c = <-l.ch:   case  <-l.close :     e = ErrPipeListenerClosed   }   return  }func  (l *PipeListener)   Close() (e error ) {   if  atomic.LoadUint32(&l.done) == 0  {     l.m.Lock()     defer  l.m.Unlock()     if  l.done == 0  {       defer  atomic.StoreUint32(&l.done, 1 )       close (l.close )       return      }   }   e = ErrPipeListenerClosed   return  }func  (l *PipeListener)   Addr() net.Addr {   return  pipeAddr(0 ) }func  (l *PipeListener)   Dial(network, addr string ) (net.Conn, error ) {   return  l.DialContext(context.Background(), network, addr) }func  (l *PipeListener)   DialContext(ctx context.Context, network, addr string ) (conn net.Conn, e error ) {      if  atomic.LoadUint32(&l.done) != 0  {     e = ErrPipeListenerClosed     return    }      c0, c1 := net.Pipe()      select  {   case  <-ctx.Done():     e = ctx.Err()   case  l.ch <- c0:     conn = c1   case  <-l.close :     c0.Close()     c1.Close()     e = ErrPipeListenerClosed   }   return  }type  pipeAddr int func  (pipeAddr)   Network() string  {   return  `pipe`  }func  (pipeAddr)   String() string  {   return  `pipe`  }
 
如何用pipe作为gRPC的connection 有了上面的包装,我们就可以基于此创建一个gRPC的服务器端和客户端,来进行基于内存的RPC通信了。
首先,我们简单的创建一个服务,包含了四种调用方式:
helloworld/helloworld.proto 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 syntax = "proto3" ;option  go_package = "google.golang.org/grpc/examples/helloworld/helloworld" ;option  java_multiple_files = true ;option  java_package = "io.grpc.examples.helloworld" ;option  java_outer_classname = "HelloWorldProto" ;package  helloworld;service  Greeter  {      rpc  SayHello(HelloRequest) returns  (HelloReply)  {}      rpc  SayHelloReplyStream(HelloRequest) returns  (stream HelloReply) ;      rpc  SayHelloRequestStream(stream HelloRequest) returns  (HelloReply) ;      rpc  SayHelloBiStream(stream HelloRequest) returns  (stream HelloReply) ; }message  HelloRequest  {   string  name = 1 ; }message  HelloReply  {   string  message = 1 ; }
 
然后生成相关的stub代码:
1 2 3 protoc --go_out=. --go_opt=paths=source_relative \   --go-grpc_out=. --go-grpc_opt=paths=source_relative \   helloworld/helloworld.proto
 
然后开始写服务端代码,基本逻辑就是实现一个demo版本的服务端就好:
server.go 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 package  mainimport  (   "context"    "log"    "github.com/robberphex/grpc-in-memory/helloworld"    pb "github.com/robberphex/grpc-in-memory/helloworld"  )type  server struct  {         pb.UnimplementedGreeterServer }func  (s *server)   SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error ) {   log.Printf("Received: %v" , in.GetName())   return  &pb.HelloReply{Message: "Hello "  + in.GetName()}, nil  }func  (s *server)   SayHelloRequestStream(streamServer pb.Greeter_SayHelloRequestStreamServer) error  {   req, err := streamServer.Recv()   if  err != nil  {     log.Printf("error receiving: %v" , err)     return  err   }   log.Printf("Received: %v" , req.GetName())   req, err = streamServer.Recv()   if  err != nil  {     log.Printf("error receiving: %v" , err)     return  err   }   log.Printf("Received: %v" , req.GetName())   streamServer.SendAndClose(&pb.HelloReply{Message: "Hello "  + req.GetName()})   return  nil  }func  (s *server)   SayHelloReplyStream(req *pb.HelloRequest, streamServer pb.Greeter_SayHelloReplyStreamServer) error  {   log.Printf("Received: %v" , req.GetName())   err := streamServer.Send(&pb.HelloReply{Message: "Hello "  + req.GetName()})   if  err != nil  {     log.Printf("error Send: %+v" , err)     return  err   }   err = streamServer.Send(&pb.HelloReply{Message: "Hello "  + req.GetName() + "_dup" })   if  err != nil  {     log.Printf("error Send: %+v" , err)     return  err   }   return  nil  }func  (s *server)   SayHelloBiStream(streamServer helloworld.Greeter_SayHelloBiStreamServer) error  {   req, err := streamServer.Recv()   if  err != nil  {     log.Printf("error receiving: %+v" , err)          return  err   }   log.Printf("Received: %v" , req.GetName())   err = streamServer.Send(&pb.HelloReply{Message: "Hello "  + req.GetName()})   if  err != nil  {     log.Printf("error Send: %+v" , err)     return  err   }      return  nil  }func  NewServerImpl ()   *server {   return  &server{} }
 
然后我们创建一个基于pipe连接的客户端来调用服务端。
包含如下几个步骤:
创建服务端实现 
基于pipe创建listener,然后基于它创建gRPC server 
基于pipe创建客户端连接,然后创建gRPC client,调用服务 
 
代码如下:
client.go 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 package  mainimport  (   "context"    "fmt"    "log"    "net"    pb "github.com/robberphex/grpc-in-memory/helloworld"    "google.golang.org/grpc"  )func  serverToClient (svc *server)   pb.GreeterClient {      pipe := ListenPipe()   s := grpc.NewServer()      pb.RegisterGreeterServer(s, svc)   if  err := s.Serve(pipe); err != nil  {     log.Fatalf("failed to serve: %v" , err)   }      clientConn, err := grpc.Dial(`pipe` ,     grpc.WithInsecure(),     grpc.WithContextDialer(func (c context.Context, s string )   (net.Conn, error ) {       return  pipe.DialContext(c, `pipe` , s)     }),   )   if  err != nil  {     log.Fatalf("did not connect: %v" , err)   }      c := pb.NewGreeterClient(clientConn)   return  c }func  main ()   {   svc := NewServerImpl()   c := serverToClient(svc)   ctx := context.Background()      for  i := 0 ; i < 5 ; i++ {     r, err := c.SayHello(ctx, &pb.HelloRequest{Name: fmt.Sprintf("world_unary_%d" , i)})     if  err != nil  {       log.Fatalf("could not greet: %v" , err)     }     log.Printf("Greeting: %s" , r.GetMessage())   }      for  i := 0 ; i < 5 ; i++ {     streamClient, err := c.SayHelloRequestStream(ctx)     if  err != nil  {       log.Fatalf("could not SayHelloRequestStream: %v" , err)     }     err = streamClient.Send(&pb.HelloRequest{Name: fmt.Sprintf("SayHelloRequestStream_%d" , i)})     if  err != nil  {       log.Fatalf("could not Send: %v" , err)     }     err = streamClient.Send(&pb.HelloRequest{Name: fmt.Sprintf("SayHelloRequestStream_%d_dup" , i)})     if  err != nil  {       log.Fatalf("could not Send: %v" , err)     }     reply, err := streamClient.CloseAndRecv()     if  err != nil  {       log.Fatalf("could not Recv: %v" , err)     }     log.Println(reply.GetMessage())   }      for  i := 0 ; i < 5 ; i++ {     streamClient, err := c.SayHelloReplyStream(ctx, &pb.HelloRequest{Name: fmt.Sprintf("SayHelloReplyStream_%d" , i)})     if  err != nil  {       log.Fatalf("could not SayHelloReplyStream: %v" , err)     }     reply, err := streamClient.Recv()     if  err != nil  {       log.Fatalf("could not Recv: %v" , err)     }     log.Println(reply.GetMessage())     reply, err = streamClient.Recv()     if  err != nil  {       log.Fatalf("could not Recv: %v" , err)     }     log.Println(reply.GetMessage())   }      for  i := 0 ; i < 5 ; i++ {     streamClient, err := c.SayHelloBiStream(ctx)     if  err != nil  {       log.Fatalf("could not SayHelloStream: %v" , err)     }     err = streamClient.Send(&pb.HelloRequest{Name: fmt.Sprintf("world_stream_%d" , i)})     if  err != nil  {       log.Fatalf("could not Send: %v" , err)     }     reply, err := streamClient.Recv()     if  err != nil  {       log.Fatalf("could not Recv: %v" , err)     }     log.Println(reply.GetMessage())   } }
 
总结 当然,作为基于内存的RPC调用,还可以有更好的方式,比如直接将对象传递到服务端,直接通过本地调用方式来通信。 但这种方式破坏了很多约定,比如对象地址、比如gRPC连接参数不生效等等。
本文介绍的,基于Pipe的通信方式,除了网络层走了内存传递之外,其他都和正常RPC通信行为一致,比如同样经历了序列化、经历了HTTP/2的流控制等。当然,性能上比原生调用也会差一点,但是好在对于测试、验证场景,行为上的一致比较重要些。
本文代码已经托管到了GitHub https://github.com/robberphex/grpc-in-memory 。