go: 优雅处理kafka消费退出 在业务中,kafka的消费者服务非常常见。主要流程是从kafka中取出消息,处理消息。 本文使用kafka-go(github.com/segmentio/kafka-go),调研kafka优雅退出的方式和注意事项。 在这之前,先准备一个多 partitions的 kafka作为实验环境。 ``` kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 3 --topic testkafka # 创建kafka topic kafka-topics.sh --delete --bootstrap-server localhost:9092 --topic testkafka # 删除topic ``` ## 写入端 ``` func WriteMsg2Kafka() { kafkawriter := &kafka.Writer{ Addr: kafka.TCP("127.0.0.1:9092"), Topic: "testkafka", Balancer: &kafka.RoundRobin{}, Async: true, Compression: kafka.Snappy, RequiredAcks: kafka.RequireOne, Completion: func(messages []kafka.Message, err error) { if err != nil { logrus.Errorf("write kafka error:%v", err) } }, ErrorLogger: logrus.StandardLogger(), } i := 0 for { i += 1 kafkawriter.WriteMessages(context.Background(), kafka.Message{ Value: []byte(fmt.Sprint(i)), }) logrus.Infof("send msg") time.Sleep(time.Millisecond * 10) } } ``` 为了方便观察现象,写入端会向kafka中顺序写入 1、2、3...。这样在消费者就能知道是否丢失了消息。 ## 消费者 ``` func ReadKafkaWithKafkago(ctx context.Context, task string) { reader := kafka.NewReader(kafka.ReaderConfig{ Brokers: []string{"127.0.0.1:9092"}, Topic: "testkafka", CommitInterval: time.Second, // 重要的配置,如果不配置将严重影响写入性能 GroupID: "test1", // Partition: partition.ID, // MaxWait: time.Millisecond * 500, // Logger: logrus.StandardLogger(), QueueCapacity: 50, }) loop: for { msg, err := reader.FetchMessage(ctx) if err != nil { if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) { logrus.Infof("kafka get context canceled:%v", err) break loop } if errors.Is(err, io.EOF) { // 当reader.Close后,进入这个分支 logrus.Infof("kafka get eof") break loop } logrus.Errorf("kafka get msg error:%v", err) continue } // logrus.Infof("%s", msg.Value) t := &Testkafka{} id, _ := datautils.ToInt(string(msg.Value)) t.Id = id t.Task = task t.Insert() err = reader.CommitMessages(context.TODO(), msg) if err != nil { logrus.Errorf("commit error:%v", err) } } reader.Close() } ``` 1. 在这个函数中,传入了`ctx`用来控制消费者的生命周期。 2. 当`ctx.Done`的时候,触发`kafka get context canceled`调用。循环被终止。 3. 在循环跳出后,调用`reader.Close()` 4. 为了验证是否丢数据,会将每条消息(id)写入到mysql的表中。 在main中,监听退出信号: ``` ctx, cancel := context.WithCancel(context.Background()) sigs := make(chan os.Signal, 1) signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) go func() { sig := <-sigs logrus.Infof("GraceFullyExit has exited, sig:%v", sig) cancel() // 给kafka消费者发信号让它退出 }() task := os.Args[2] kafkatest.ReadKafkaWithKafkago(ctx, task) time.Sleep(time.Second) ``` ## 结论 1. 使用`SELECT id FROM testkafka t1 WHERE NOT EXISTS (SELECT * FROM testkafka t2 WHERE t2.id = t1.id + 1)`可以观察mysql的表是否ID连续。用以判定是否有数据丢失。 2. 多次kill 1-N个消费者并重启消费者,不影响kafka数据消费的完整性。这个示例**满足数据不丢失**这一要求。 3. 如果**只有一个消费者**,在kill掉并拉起时,不会有数据重消费的问题。 4. 如果有多个消费者,kill掉其中一个**会偶尔出现少量已入库的消息被重消费**。具体原因不明,猜测原因和rebalance机制有关。 来自 大脸猪 写于 2023-11-07 14:33 -- 更新于2023-11-25 22:11 -- 0 条评论