go sarama拾遗:有趣的超时

先说结论

这篇文章太细太长了。先说结论。

在sarama进行producer.SendMessage重试的时候,会重新创建brokerProducer,这会重新dial。

在dial的时候,会硬编码重试3次(总共4次)。如果远端不可用,等待返回错误的时间将是:
conf.Net.WriteTimeout + 4* conf.Net.DialTimeout

从需求出发

最近接到一个需求,大意是,使用kafka producer尝试写入目标kafka,如果3秒后还未响应(很可能目标kafka已经挂了),则退出流程告警,不再处理。

第一版配置

在sarama的配置中,很自然的想起了带Timeout和Retry的配置。

config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll
config.Producer.Retry.Max = 0                    // 重新发送的次数
config.Producer.Timeout = time.Millisecond * 10  // 等待 WaitForAck的时间
config.Producer.Return.Successes = true // syncproducer必须设置

config.Net.WriteTimeout = time.Second * 3

...
kafkaProducer, err := sarama.NewSyncProducer(addresses, config)

在本地用docker搭建一个kafka。
参考:https://juejin.im/entry/6844903829624848398

起动测试程序写消息,写一会后,使用iptables把9092端口封了。对,就是这么暴力:

iptables -A OUTPUT -p tcp --dport 9092 -j DROP
# 恢复 iptables -D OUTPUT -p tcp --dport 9092 -j DROP

在封锁端口之后,程序3秒后退出。符合预期。

Producer 和 Retry 的心路历程

本来需求做完,就应该开开心心的去一张一弛劳逸结合(简称摸鱼),但是,本人向来很有探索的精神。
我想,只试一次,就把对方的kafka 踢了,人生真是太残酷了,恻隐之心下,于是将Producer.Retry.Max配置改成了1。

config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll
config.Producer.Retry.Max = 1                    // 重新发送的次数,现在改成1
config.Producer.Timeout = time.Millisecond * 10  // 等待 WaitForAck的时间
config.Producer.Return.Successes = true // syncproducer必须设置

config.Net.WriteTimeout = time.Second * 3

启动测试。先正常写入消息(这里很重要),然后封锁端口。
本来预计6秒左右,程序就应该退出了。此时诡异的现象发生,程序直接卡在那里。直到将近两分多钟后。才开始报错。

消息是怎么发送的

事出反常必有妖,马上谷歌一下。不得不吐槽的是,现在中文的技术文档真的是鱼龙混杂,天下文章一大抄。没有找到啥有价值的资料。最后只能自己动手看源码了。

这里使用goland的分析工具。
首先右键寻找Retry.Max 的引用。一开始很奇怪,只在async_producer.go找到了这个字段的引用。

点进去一看,原来SyncProducer就是AsyncProducer的包装而已。所以只需要看AsyncProducer的实现了。

这里先提一句。AsyncProducer有一个Input channel,所有想发送的数据,往这个chan里扔就行了。通过查找Input的引用,很快找到了dispatcher这个函数。在这里,处理整个producer.Input()的消息。

通过罗永浩式的地狱般的流程,跟踪msg的路程。山路十八弯找到了partitionProducer.dispatch这里先敲黑板,记住这个地方

最终有如下代码:

pp.brokerProducer.input <- msg

这条消息最终到了brokerProducer中。

那这个brokerProducer是什么呢?跳转到代码newBrokerProducer中。发现这两行。这里有看头。

            request := set.buildRequest()
            response, err := broker.Produce(request)  // 天啊。绕到这终于发送了。
            responses <- &brokerProducerResponse{  // 如果失败,则把消息和错误丢到responses里
                set: set,
                err: err,
                res: response,
            }

Produce,终于要发送数据了。先看看发送,Produce一路点开,进入到write函数中。

func (b *Broker) write(buf []byte) (n int, err error) {
    if err := b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout)); err != nil {
        return 0, err
    }
    return b.conn.Write(buf)
}

这里,终于看到了第一个配置conf.Net.WriteTimeout。我感觉我马上离真相很近了。

这里的b.conn,通过反查得知:

        dialer := conf.getDialer()
        b.conn, b.connErr = dialer.Dial("tcp", b.addr)

conn是一个tcp连接。dialer的代码在这,从这里开始,用的就是go的标准库:

        return &net.Dialer{
            Timeout:   c.Net.DialTimeout,
            KeepAlive: c.Net.KeepAlive,
            LocalAddr: c.Net.LocalAddr,
        }

看看重试逻辑

发送的代码就先看到这里,这里说下错误处理,也就是retry的逻辑。

当write如果返回了错误,则写到responses中。反查responses, 一路点开,在brokerProducer.run里面,发现了response的处理逻辑。

最终调用asyncProducer.retryMessage

它会判定是否重试,如果重试的话,就会把msg放到retries这个channel中,继续查找,找到了retryHandler,在这个函数中,消息又被扔回了原来的 AsyncProducer.input

然后,上面的地狱般的流程,又会重新被执行一次。

然而,回到之前的问题。为什么在retry的时候,会卡死。似乎还是没有解答。当看代码没有思路的时候,加点打印,就成了最后的法宝。

再战,永不言败

通过我聪明的大脑袋一思考,发现,绝对是之前看漏了什么东西。我决定把sarama的代码clone下来,加点日志。看看它的执行流程。

git clone https://github.com/Shopify/sarama.git

然后,在go.mod中,把引用的module解析到本地的代码。
go.mod

...
require(
....
)
replace github.com/Shopify/sarama => /home/honoryin/workspace/personal/sarama

大功告成。

出于直觉,我觉得应该先在getDialer中加点日志,很可能和 config.Net.DialTimeout 这个配置有关。
打开getDialer函数,打一下调用栈。

func (c *Config) getDialer() proxy.Dialer {
    fmt.Println("getDialer run")
    stack := make([]byte, 1024*8)
    var stackStr = string(stack[:runtime.Stack(stack, false)])
    fmt.Println(stackStr)
    .........
}

令人惊喜的事情发生了。
令人惊喜的事情发生了。
令人惊喜的事情发生了。

在发生失败后,进入重试流程时。getDialer函数重复的被执行

getDialer调用栈:

而此时,因为config.Net.DialTimeout使用的是默认的配置,所以,在重试的时候把卡在这。

事情到了这里,不妨继续深挖下,为什么会重试getDialer。因为,我是在已经建立连接之后,中途将端口封锁的。它的重试机制在哪里呢。

定位到调用栈的最后一行broker.go:161,发现调用了匿名函数,所以如法炮制,找到这个匿名函数的调用处Open,再打下调用栈。

调用栈结果:

找到async_producer.go:517,发现居然是上面的dispatch函数。

顺着调用栈从下往上看,

因为此时是重试,msg.retries被设为1,大于pp.highWatermark(默认0)
newHighWatermark函数中,brokerProducer被设置为了nil。

原来,这就是之前看漏的地方。

因为brokerProducer被设置为了nil,下面需要进行brokerProducer的重新配置。

updateLeader函数:

看到pp.breaker.Run,这里的breaker就是一个重试器

反查它的定义:

        breaker:    breaker.New(3, 1, 10*time.Second),

这里,设定了3次重试,也就是说必须调4次getDialer失败,才会收到错误。而getDialer,又会因为被config.Net.DialTimeout控制,默认时间是30秒。这会造成长期的等待。

窗外,夜色正美。
凉风拂面,虽然已经到了深夜,但一种奇妙的乐趣让人心旷神怡。站在大厦之上,仰望深沉的天空。

-- 公昔登临,想诗境满怀,酒杯在手
-- 我来依旧,见青山对面,明月当楼

来自 大脸猪 写于 2020-11-18 10:57 -- 更新于2020-11-18 18:10 -- 0 条评论

0条评论

评论: