go sarama拾遗:有趣的超时 ## 先说结论 这篇文章太细太长了。先说结论。 在sarama进行`producer.SendMessage`重试的时候,会重新创建`brokerProducer`,这会重新dial。 在dial的时候,会硬编码重试3次(总共4次)。如果远端不可用,等待返回错误的时间将是: conf.Net.WriteTimeout + 4* conf.Net.DialTimeout ## 从需求出发 最近接到一个需求,大意是,使用kafka producer尝试写入目标kafka,如果3秒后还未响应(很可能目标kafka已经挂了),则退出流程告警,不再处理。 ## 第一版配置 在sarama的配置中,很自然的想起了带Timeout和Retry的配置。 ``` config := sarama.NewConfig() config.Producer.RequiredAcks = sarama.WaitForAll config.Producer.Retry.Max = 0 // 重新发送的次数 config.Producer.Timeout = time.Millisecond * 10 // 等待 WaitForAck的时间 config.Producer.Return.Successes = true // syncproducer必须设置 config.Net.WriteTimeout = time.Second * 3 ... kafkaProducer, err := sarama.NewSyncProducer(addresses, config) ``` 在本地用docker搭建一个kafka。 参考:https://juejin.im/entry/6844903829624848398 起动测试程序写消息,写一会后,使用iptables把9092端口封了。对,就是这么暴力: ``` iptables -A OUTPUT -p tcp --dport 9092 -j DROP # 恢复 iptables -D OUTPUT -p tcp --dport 9092 -j DROP ``` 在封锁端口之后,程序3秒后退出。符合预期。  ## Producer 和 Retry 的心路历程 本来需求做完,就应该开开心心的去一张一弛劳逸结合(简称摸鱼),但是,本人向来很有探索的精神。 我想,只试一次,就把对方的kafka 踢了,人生真是太残酷了,恻隐之心下,于是将`Producer.Retry.Max`配置改成了1。 ``` config := sarama.NewConfig() config.Producer.RequiredAcks = sarama.WaitForAll config.Producer.Retry.Max = 1 // 重新发送的次数,现在改成1 config.Producer.Timeout = time.Millisecond * 10 // 等待 WaitForAck的时间 config.Producer.Return.Successes = true // syncproducer必须设置 config.Net.WriteTimeout = time.Second * 3 ``` 启动测试。先正常写入消息(这里很重要),然后封锁端口。 本来预计6秒左右,程序就应该退出了。此时诡异的现象发生,**程序直接卡在那里**。直到将近两分多钟后。才开始报错。 ## 消息是怎么发送的 事出反常必有妖,马上谷歌一下。不得不吐槽的是,现在中文的技术文档真的是鱼龙混杂,天下文章一大抄。没有找到啥有价值的资料。最后只能自己动手看源码了。 这里使用goland的分析工具。 首先右键寻找Retry.Max 的引用。一开始很奇怪,只在async_producer.go找到了这个字段的引用。  点进去一看,原来SyncProducer就是AsyncProducer的包装而已。所以只需要看AsyncProducer的实现了。 这里先提一句。AsyncProducer有一个Input channel,所有想发送的数据,往这个chan里扔就行了。通过查找Input的引用,很快找到了`dispatcher`这个函数。在这里,处理整个producer.Input()的消息。  通过罗永浩式的地狱般的流程,跟踪msg的路程。山路十八弯找到了`partitionProducer.dispatch`。**这里先敲黑板,记住这个地方**。 最终有如下代码: ``` pp.brokerProducer.input <- msg ``` 这条消息最终到了brokerProducer中。 那这个brokerProducer是什么呢?跳转到代码`newBrokerProducer`中。发现这两行。这里有看头。 ``` request := set.buildRequest() response, err := broker.Produce(request) // 天啊。绕到这终于发送了。 responses <- &brokerProducerResponse{ // 如果失败,则把消息和错误丢到responses里 set: set, err: err, res: response, } ``` 在`Produce`,终于要发送数据了。先看看发送,`Produce`一路点开,进入到`write`函数中。 ``` func (b *Broker) write(buf []byte) (n int, err error) { if err := b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout)); err != nil { return 0, err } return b.conn.Write(buf) } ``` 这里,终于看到了第一个配置`conf.Net.WriteTimeout`。我感觉我马上离真相很近了。 这里的b.conn,通过反查得知: ``` dialer := conf.getDialer() b.conn, b.connErr = dialer.Dial("tcp", b.addr) ``` conn是一个tcp连接。dialer的代码在这,从这里开始,用的就是go的标准库: ``` return &net.Dialer{ Timeout: c.Net.DialTimeout, KeepAlive: c.Net.KeepAlive, LocalAddr: c.Net.LocalAddr, } ``` ## 看看重试逻辑 发送的代码就先看到这里,这里说下错误处理,也就是retry的逻辑。 当write如果返回了错误,则写到`responses`中。反查`responses`, 一路点开,在`brokerProducer.run`里面,发现了response的处理逻辑。  最终调用`asyncProducer.retryMessage`:  **它会判定是否重试**,如果重试的话,就会把msg放到retries这个channel中,继续查找,找到了`retryHandler`,在这个函数中,消息又被扔回了原来的 `AsyncProducer.input`:  然后,上面的地狱般的流程,又会重新被执行一次。 然而,回到之前的问题。为什么在retry的时候,会卡死。似乎还是没有解答。当看代码没有思路的时候,加点打印,就成了最后的法宝。 ## 再战,永不言败 通过我聪明的大脑袋一思考,发现,绝对是之前看漏了什么东西。我决定把sarama的代码clone下来,加点日志。看看它的执行流程。 ``` git clone https://github.com/Shopify/sarama.git ``` 然后,在go.mod中,把引用的module解析到本地的代码。 go.mod ``` ... require( .... ) replace github.com/Shopify/sarama => /home/honoryin/workspace/personal/sarama ``` 大功告成。 出于直觉,我觉得应该先在`getDialer`中加点日志,很可能和 config.Net.DialTimeout 这个配置有关。 打开`getDialer`函数,打一下调用栈。 ```go func (c *Config) getDialer() proxy.Dialer { fmt.Println("getDialer run") stack := make([]byte, 1024*8) var stackStr = string(stack[:runtime.Stack(stack, false)]) fmt.Println(stackStr) ......... } ``` 令人惊喜的事情发生了。 令人惊喜的事情发生了。 令人惊喜的事情发生了。 在发生失败后,进入重试流程时。`getDialer`**函数重复的被执行**。 getDialer调用栈:  而此时,因为`config.Net.DialTimeout`使用的是默认的配置,所以,在重试的时候把卡在这。 事情到了这里,不妨继续深挖下,为什么会重试getDialer。因为,我是在已经建立连接之后,中途将端口封锁的。它的重试机制在哪里呢。 定位到调用栈的最后一行`broker.go:161`,发现调用了匿名函数,所以如法炮制,找到这个匿名函数的调用处`Open`,再打下调用栈。  调用栈结果:  找到`async_producer.go:517`,发现居然是上面的`dispatch`函数。  顺着调用栈从下往上看,  因为此时是重试,`msg.retries`被设为1,大于`pp.highWatermark`(默认0) `newHighWatermark`函数中,brokerProducer被设置为了nil。 原来,这就是之前看漏的地方。  因为brokerProducer被设置为了nil,下面需要进行brokerProducer的重新配置。  在`updateLeader`函数:  看到`pp.breaker.Run`,这里的`breaker`**就是一个重试器**。 反查它的定义: ``` breaker: breaker.New(3, 1, 10*time.Second), ``` 这里,设定了3次重试,也就是说必须调4次`getDialer`失败,才会收到错误。而`getDialer`,又会因为被`config.Net.DialTimeout`控制,默认时间是30秒。这会造成长期的等待。 窗外,夜色正美。 凉风拂面,虽然已经到了深夜,但一种奇妙的乐趣让人心旷神怡。站在大厦之上,仰望深沉的天空。 -- 公昔登临,想诗境满怀,酒杯在手 -- 我来依旧,见青山对面,明月当楼 来自 大脸猪 写于 2020-11-18 10:57 -- 更新于2020-11-18 18:10 -- 1 条评论