go: 如何正确使用grpc stream tag: grpc stream client server 本文将使用go,编写通用的服务端、客户端的代码。包含单向流和双向流。 ## 基本原理 1. 从stream来看,分为接收端(调用`Recv`)发送端(`调用Send`)。流程是类似的。 2. **接收端必须由发送端来控制结束**,当客户端调用`CloseSend`,则服务端`Recv EOF`。当服务端`return nil`,则客户端`Recv EOF`。这样才能保证数据的传输安全。其它情况下终止客户端或服务端,都可能造成数据丢失。 3. `Recv/Send`会阻塞,需要在独立协程中处理。 4. 在`GracefulStop`的时候,新的流不能进来,但服务端没有能力终止stream,客户端也不会收到通知。可以`return nil`强行终止stream,此时客户端收到`EOF`,客户端如果在发送数据,则数据可能丢失。stream应该有生命周期,在`GracefulStop`后,等待这个周期再进行强行`Stop`。一种不好的实现是,发送端是无穷循环永远不`CloseSend`,使得stream的生命周期无限长,`GracefulStop`机制失效。 ## 示例proto proto中包含简单的pb字段,并且包含双向流和单向流的api。 ``` syntax = "proto3"; package hello; option go_package = "grpcexample/proto/hello"; message HelloReq { string Req = 1; } message HelloRsp { string Reply = 1; } service helloSvc { // https://grpc.io/docs/languages/go/basics/ 阅读Bidirectional streaming RPC // 双向流 rpc SendHello(stream HelloReq) returns (stream HelloRsp); // 单向流 rpc SendHello2(stream HelloReq) returns (HelloRsp); } ``` ## 单向流 ### 客户端 1. 这个客户端是数据上报业务,会源源不断地将数据向服务端发送。 2. **keepalive非常重要**,可以避免客户端因服务器重启等问题造成连接泄露。 3. 调用`stream.Send`时,如果服务器此时已经return,将收到eof。 ```go func TestSendHello2(t *testing.T) { keepAliveArgs := keepalive.ClientParameters{ Time: 10 * time.Second, // 至少10S,如果10S内没有ping或者数据发送/接收,则触发连接回收 // 每次ping进行等待的最长时间,keepalive维持一个倒计时器,当触发连接回收并timeout后,连接断开 Timeout: 20 * time.Second, } conn, err := grpc.Dial( "127.0.0.1:6688", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithKeepaliveParams(keepAliveArgs), ) if err != nil { t.Fatal(err) } client := hello.NewHelloSvcClient(conn) ctx, cancel := context.WithCancel(context.Background()) stream, err := client.SendHello2(ctx) if err != nil { t.Fatal(err) } for i := 0; i < 5; i++ { err := stream.Send(&hello.HelloReq{ Req: fmt.Sprintf("hello:%v", i), }) if err != nil { if errors.Is(err, io.EOF) { log.Println("Send EOF") cancel() break } t.Errorf("send error:%v", err) cancel() break } time.Sleep(time.Second) } // 关闭流并等待服务器的响应 res, err := stream.CloseAndRecv() if errors.Is(err, io.EOF) { // 正常退出 log.Printf("CloseSend EOF") } else if err != nil { log.Fatalf("CloseSend error:%v", err) } log.Println("res:", res) // 接收到服务器的返回数据 } ``` 如果客户端已经发送了所有的数据,且不继续使用这个stream,可以调用CloseAndRecv。 此时客户端会收到`EOF`和服务器返回的最后一条消息。 1. 当客户端`CloseAndRecv`,服务端会收到`EOF`,**代表数据传输正常完结**。 2. 当客户端强行退出,服务端收到`codes.Canceled`错误,数据可能会丢失。 3. 一种不太好的实现是客户端stream为无穷循环永远不`CloseSend`。这会使得stream被hold住永远触发不了服务端的`GracefulStop`。这在服务端重启的时候会有数据丢失。 ### 服务端 ```go func (s *HelloServer) SendHello2(svc pb.HelloSvc_SendHello2Server) error { errChan := make(chan error, 1) go func() { loop: for { // 在Recv的时候,没办法传入ctx,服务端无法强行终止stream // https://github.com/grpc/grpc-go/issues/3909 req, err := svc.Recv() if err != nil { s, ok := status.FromError(err) if ok && s.Code() == codes.Canceled { log.Println("收到取消请求信号,流正在关闭") errChan <- nil break loop } else if errors.Is(err, io.EOF) { log.Println("收到EOF正常退出流") errChan <- nil return } log.Printf("recv error:%v", err) errChan <- err break loop } // 使用req的数据 fmt.Printf("req:%v\n", req) } fmt.Printf("goroutine exit\n") }() select { case <-s.GlobalCtx.Done(): // 当服务重启或退出,此Ctx done,直接终止这个stream return nil case v := <-errChan: // 当出现任何错误或正常退出,此stream退出 return v } } ``` 启动服务: ```go func normalGrpcSvr() { var keepAliveArgs = keepalive.ServerParameters{ Time: 10 * time.Second, Timeout: 20 * time.Second, MaxConnectionAge: 30 * time.Second, } listener, err := net.Listen("tcp", fmt.Sprintf(":%d", 6688)) if err != nil { log.Fatalf("failed to listen: %v", err) } s := grpc.NewServer( grpc.KeepaliveParams(keepAliveArgs), grpc.MaxSendMsgSize(1024*1024*4), grpc.MaxRecvMsgSize(1024*1024*4), ) ctx, cancel := context.WithCancel(context.Background()) pb.RegisterHelloSvcServer(s, &services.HelloServer{ GlobalCtx: ctx, }) reflection.Register(s) fmt.Printf("run:%v\n", 6688) sigs := make(chan os.Signal, 1) signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) go func() { sig := <-sigs cancel() fmt.Printf("GraceFullyExit has exited, sig:%v\n", sig) s.GracefulStop() }() if err := s.Serve(listener); err != nil { logrus.Errorf("failed to serve: %v", err) } } ``` 由于**服务端不能终止stream**,服务端在执行Recv和Send的时候有hold的可能。https://github.com/grpc/grpc-go/issues/3909 至少在grpc v2,都无法解决此问题。 当服务端调用`GracefulStop`时,新的客户端将不能再连接进来,但已连接的客户端流**并不会收到任何消息或断开**,此时需要return所有现存的流。 此时客户端`Send`数据,数据可能丢失。 在这个设计中: 1. 服务端结构中包含`GlobalCtx`,在启动服务时,监听信号`syscall.SIGINT, syscall.SIGTERM`。当收到这两个信号时,cancel `GlobalCtx`,并调用`GracefulStop`。 2. 为防止服务端`Recv`被卡住,独立使用的协程。服务端可以在外面`return nil`,客户端收到`EOF`。但这个行为会丢失数据。 ## 双向流 ### 客户端 双向流要独立地处理`Send`和`Recv`两个流程。 ```go func TestSendHello(t *testing.T) { keepAliveArgs := keepalive.ClientParameters{ Time: 10 * time.Second, // 至少10S,如果10S内没有ping或者数据发送/接收,则触发连接回收 // 每次ping进行等待的最长时间,keepalive维持一个倒计时器,当触发连接回收并timeout后,连接断开 Timeout: 20 * time.Second, } conn, err := grpc.Dial( "127.0.0.1:6688", grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithKeepaliveParams(keepAliveArgs), ) if err != nil { t.Fatal(err) } errChan := make(chan error, 1) client := hello.NewHelloSvcClient(conn) ctx, cancel := context.WithCancel(context.Background()) stream, err := client.SendHello(ctx) if err != nil { t.Fatal(err) } // 即使服务端不会回数据给客户端,仍然要在客户端等待EOF信号 go func() { for { reply, err := stream.Recv() s, ok := status.FromError(err) if errors.Is(err, io.EOF) { log.Println("Recv EOF") cancel() close(errChan) return } else if err != nil { t.Errorf("Recv error:%v", err) cancel() errChan <- err return } fmt.Printf("reply:%v\n", reply) } }() // 向服务端发数据 for i := 0; i < 10000; i++ { err := stream.Send(&hello.HelloReq{ Req: fmt.Sprintf("hello:%v", i), }) // send正常情况下不应该收到任何错误,发现错误则应该终止并进入错误处理 if errors.Is(err, io.EOF) { log.Println("Send EOF") cancel() return } else if err != nil { t.Errorf("send error:%v", err) cancel() break } log.Printf("send %v", i) // time.Sleep(time.Second) } // 当所有的消息被发送完毕,则调用closeSend,此时应该在Recv处等待Eof错误,代表服务端已经正常全部接收完所有消息 stream.CloseSend() err = <-errChan if err != nil { t.Errorf("CloseSend error:%v", err) return } // 正常退出 } ``` 双向流的客户端要独立的处理`Send`和`Recv`。分场景来讨论,总结起来只有三类错误场景:EOF、`codes.Canceled`、`transport is closing`,分3类场景讨论: 1. 客户端发送、接收数据的时候,服务端`return nil`。 1. 客户端的发送和接收两端都收到`EOF `信号 。**此场景下,可能存在数据丢失。**需要进行复杂的协议设计来避免数据丢失。 2. 服务端的`Send`出现`rpc error: code = Unavailable desc = transport is closing`错误。因为流已经关闭,无法再发送数据。这里要进行妥善的处理,**return后不能再调用Send**。 1. 服务端正常,客户端强行终止(不调用CloseSend)时: 1. 服务端在`Recv`的地方将收到`codes.Canceled`。 2. 在服务端的`Send`端将出现`rpc error: code = Unavailable desc = transport is closing`错误。因为流已经关闭,无法再发送数据。**此场景下,可能存在数据丢失。 这说明,无论是服务端还是客户端**非正常的流程都会有数据丢失风险**。 3. 正常场景: 1. 客户端发送数据完毕,调用`stream.CloseSend()`: 2. 在服务端的`Recv`,将收到`EOF`。这代表客户端已经将数据发送完。 3. 此时,如果服务端还有数据发送给客户端,不用着急`return nil`,而应该等待数据`Send`完毕再`return`。return相当于客户端的CloseSend。 4. 在客户端的`Recv`,将收到`EOF`,此时双向流正常退出。 ### 服务端 服务端也必须独立地合适地处理`Send`和`Recv`两个流程。再次指出,**接收端必须由发送端来控制结束**。客户端要显式地调用`CloseSend`,然后不再发送数据。服务端return 后不再发送数据。 这段代码除了独立地处理Send和Recv后,还引入了`willReturn`这个信号,在客户端终止发送数据时,提醒服务端不要再发送数据,终止stream。实际情况会更复杂,这只是一个参考。 ```go func (s *HelloServer) SendHello(svc pb.HelloSvc_SendHelloServer) error { errChan := make(chan error, 1) willReturn := make(chan int, 1) // 接收数据 go func() { loop: for { req, err := svc.Recv() if err != nil { s, ok := status.FromError(err) if ok && s.Code() == codes.Canceled { log.Println("Recv canceled") errChan <- nil break loop } else if errors.Is(err, io.EOF) { log.Println("Recv EOF") // 这里是正常退出,处理服务端尚未发送完的数据,先不return nil close(willReturn) break loop } log.Printf("Recv error:%v", err) errChan <- err break loop } fmt.Printf("req:%v\n", req) } }() // 发送数据 go func() { i := 0 loop: for { i += 1 err := svc.Send(&pb.HelloRsp{Reply: fmt.Sprintf("OK:%v", i)}) if err != nil { s, ok := status.FromError(err) if ok && s.Code() == codes.Canceled { log.Println("Send canceled") break loop } log.Printf("Send error:%v", err) break loop } select { case <-willReturn: log.Printf("will return") // 终止stream errChan <- nil break loop default: time.Sleep(time.Second) } } }() wait := func() error { // 等待强制结束 select { case <-time.After(time.Second * 15): return status.Error(codes.DeadlineExceeded, "timeout") case v := <-errChan: return v } } select { case <-s.GlobalCtx.Done(): log.Printf("global ctx done") return wait() case v := <-errChan: return v case <-willReturn: return wait() } } ``` 来自 大脸猪 写于 2022-06-06 14:50 -- 更新于2024-09-14 01:17 -- 0 条评论