从围绕API到围绕数据-使用流式编程构建更简洁的架构 ## 背景 在服务刚刚搭建时,通常的思维就是根据API编写业务逻辑: ``` // SendStream ... func (d *Svc) SendStream(stream MyApi_data.ProxyDialOut_SendStreamServer) error { for { ... data, err := stream.Recv() if err != nil { logrus.Errorf("recv error:%v", err) return err } ... // 对data做相关的操作 } } ``` 在服务暴露出越来越多的API后,相似的操作会越来越多。此时会进行抽象和封装,提取公共操作,例如提取函数、建立工厂等。 比如,在已有的API中添加监控统计。虽然对统计器做了抽象(对象或者函数),但可能仍然需要侵入到所有不同的API实现中。 ``` // SendStream ... func (d *MyApiSvc) SendStream(stream MyApi_data.ProxyDialOut_SendStreamServer) error { for { ... data, err := stream.Recv() if err != nil { logrus.Errorf("recv error:%v", err) return err } ... // 对data做相关的操作 ... // 添加一个共享的监控统计器,调用上报业务,每个api都需要改动 counter.Add("MyApi", 1) } } ``` 这在简单项目中无可厚非,但长此以往,随着各种功能的加入,API的业务代码会迅速臃肿起来。 后续,会发现每个API都各不相同,却又有公共部分。所以不得不写出大量形容相似的代码。这在**部门大部分项目中都屡见不鲜**。 究其原因,这是**因为抽象层次不够**造成的。 ## 摒除以API为中心的编程模式 在网络编程中,一般会引入中间件(比如trpc的filter)来处理共有逻辑,比如鉴权,日志,panic处理等。 但中间件一般**太过于抽象**并不直观,使得编写调试不易。但它的思路值得借鉴。 在对业务进行思考后,突发奇想。虽然对客户端(用户)而言,**每个API都是服务**(消费者)。但对于具体处理而言,**每个API同时也是生产者**。 将**每个API看成data source**,生产数据(data),就是对api最底层的抽象。 在这里,引入一个简单的流式编程包`go-streams`(github.com/reugn/go-streams),方便快速建立流式编程的架构。 ### 建立抽象:每个API都是datasource 每个api,都实现Source的接口,将自己收到的数据,无脑封装往下一跳怼 ``` import "github.com/reugn/go-streams/extension" type Source interface{ GetSource() *extension.ChanSource } ``` ### 实现抽象:为每个API服务都创建`chan`,这是`数据源`的本质 ``` type MyApiSvc struct { name string ctx context.Context ch chan any // 就是它 protocol string } // GetSource 实现Source接口 func (t *MyApiSvc) GetSource() *extension.ChanSource { return extension.NewChanSource(t.ch) } type DataItem struct { data any session map[string]any } // SendStream ... func (d *MyApiSvc) SendStream(stream MyApi_data.ProxyDialOut_SendStreamServer) error { for { ... data, err := stream.Recv() if err != nil { logrus.Errorf("recv error:%v", err) return err } ... // 这里不对数据做任何处理,封装之后,直接丢到chan里 td := new(DataItem) td.session = make(map[string]any) td.session["ip"] = ip td.session["trace_id"] = grand.S(8) td.data = data d.ch <- td } } ``` 每个api的`chan`被`go-streams`封装为一个`数据源ChanSource`类型。 将各种API的原始数据封装为`DataItem`在流中统一处理,内置`session`是神来之笔。这个session会包含每条数据的个性化信息。可以由每个步骤增添并提供给下一步骤使用。 这样,在编写业务逻辑时就能站在更上层、数据的角度思考问题。 ## 流式处理 在上面,每个数据源都已经被封装为一个`ChanSource`(本质是`chan`),现在来统一规划业务逻辑。 使用`go-streams`,将整个业务逻辑抽象成`数据流`的多个步骤: 此编程模式的特色之处在于: 1. 每个步骤**接收上一个节点**的数据,处理之后,将**数据发往下一跳**。编写单一步骤的时候,**只需要考虑本步骤**处理的事情,思维量大大减少。 2. 在单个步骤,处理是并发的,但在不同的步骤,处理是顺序的。 3. 围绕数据编程,方便抽象施加统一的处理过程,比如`getParser`,`getSender`两个工厂函数。 4. 可以任意的在节点间**统一的**新增其它的处理,**不侵入**已经编写好的业务逻辑。每个节点都有**前驱和后继**,拥有无限可能。没错,这就是`面向切面编程`。 ```go source := getDataSource(ctx, cfg.Name) // cfg.Name == "MyApi",通过工厂函数载入配置,获得interface `Source` // 调用接口 source.GetSource().Via(flow.NewMap(func(i interface{}) interface{} { // 步骤1,创建日志 // 从用户发来的每条消息都被打散成为了数据源的一条数据 msg := i.(model.*DataItem) traceID := msg.GetSession()["trace_id"].(string) // 从数据的session中获取数据的附加信息 tags := map[string]interface{}{ "trace_id": traceID, "ip": msg.GetSession()["ip"], "name": c.Name, } log := logrus.WithFields(tags) // 这个步骤只是为了添加一个日志对象 return []any{msg, log} // 使用8个协程来执行这个步骤 }, 8)).Via(flow.NewMap(func(i interface{}) interface{} { // 步骤2,解析数据 arr := i.([]any) // 这里的i是上一步骤return的数据 msg := arr[0].(*DataItem) log := arr[1].(*logrus.Entry) parser := getParser(cfg.Name) // 这个工厂函数是每种数据源的个性化处理。根据配置获取一个解析器 // 解析数据 data, err := parser(ctx, msg, c.Name, msg.GetSession()["ip"]) if err != nil { log.Error(err) return err } return []any{data, log} }, 8)).Via(flow.NewMap(func(i interface{}) interface{} { // 步骤3,发送数据到下个服务 arr,ok := i.([]any) // 这里的i,就是上一步骤return的数据 if !ok{ return i // 如果上一步骤return的是error,则直接跳过不再解析 } data := arr[0].(*MyApiData) // 这里的data,已经是上一步骤解析出来的数据 log := arr[1].(*logrus.Entry) // 发数数据 sender := getSender(cfg.Name) // 这个工厂函数为不同的数据源分配一个发送器 sender.Send(qdata) return i }, 8)).Via(flow.NewMap(func(i interface{}) interface{} { // 步骤4,统计发送成功的数据量 arr, ok := i.([]any) // 这里的i,就是上一步骤return的数据 if ok{ msg := arr[0].(*DataItem) log := arr[1].(*logrus.Entry) // 内部统计 log.Info("send success") controller.TraceAfter(msg.GetSession()["ip"]) } return i }, 8)).To(extension.NewIgnoreSink()) ``` ## 为什么要使用`go-streams` 1. 库非常的简单,实际就是对go chan的封装。简单是一种美,简单的东西一般不容易出错。 2. 隐含了`流式编程`的主要思想,它并没有什么黑科技,但使用它会**强制**我们使用`面向数据`的,`抽象的`方式来思考问题。最终写出低耦合可调测的代码。这才是难能可贵的。 来自 大脸猪 写于 2022-09-26 14:37 -- 更新于2022-09-26 14:37 -- 0 条评论