2025年5月19日 10:38:37 星期一

grpc 检测客户端连接是否存在

默认情况下,服务端是没有检测客户端连接是否存活的。
如果因为网络抖动,客户端退出,此时客户端会向服务端发送一个Fin_wait2的消息。但这个消息如果丢失,服务端将长期认为客户端“仍然存在”,即使此时客户端已经退出。
为了解决这个问题,grpc服务端在启动的时候,可以传入keepalive参数,原理是:每隔N秒ping客户端,当客户端无法ping通的时候,服务端会主动断开连接。代码如下:

  1. var kasp = keepalive.ServerParameters{
  2. Time: 5 * time.Second, // Ping the client if it is idle for 5 seconds to ensure the connection is still active
  3. Timeout: 1 * time.Second, // Wait 1 second for the ping ack before assuming the connection is dead
  4. }
  5. lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
  6. if err != nil {
  7. log.Fatalf("failed to listen: %v", err)
  8. }
  9. s := grpc.NewServer(grpc.KeepaliveParams(kasp))
  10. pb.RegisterHelloServer(s, &svc{})
  11. if err := s.Serve(lis); err != nil {
  12. log.Fatalf("failed to serve: %v", err)
  13. }

上面的代码表明,每隔5s ping一次客户端,并且回包必须在1s内返回。否则连接将被回收。
服务端的处理代码可以写成这样:

  1. func (s *svc) HelloStream(stream pb.Hello_HelloStreamServer) error {
  2. now := time.Now().Unix()
  3. log.Println("enter stream", now)
  4. req, err := stream.Recv()
  5. if err != nil {
  6. return err
  7. }
  8. name := req.Greetings
  9. loop:
  10. for {
  11. out := new(pb.HelloRsp)
  12. out.Resp = fmt.Sprintf("hello,%v", name)
  13. sendctx, sendcancel := context.WithCancel(context.Background())
  14. go func() {
  15. err := stream.Send(out) // 在协程中send,使得context.Done响应能被处理
  16. if err != nil {
  17. log.Println(err)
  18. }
  19. sendcancel()
  20. }()
  21. select {
  22. case <-sendctx.Done():
  23. case <-stream.Context().Done()://当keepalive连接超时,这里的逻辑被执行,服务端退出
  24. log.Println("client closed")
  25. break loop
  26. }
  27. time.Sleep(time.Second)
  28. }
  29. log.Println("exit stream")
  30. return nil
  31. }

真的隐蔽啊。真的坑啊。

来自 大脸猪 写于 2019-11-26 11:15 -- 更新于2020-10-19 13:06 -- 0 条评论

0条评论

字体
字号


评论: