go: 使用rxgo 实现责任链模式鉴权系统 ## 责任链模式 责任链模式将任务抽象为一个个的子任务依次执行,每个子任务都对任务做一定处理,如果处理成功,则中断后续的流程。 参考资料:https://refactoringguru.cn/design-patterns/chain-of-responsibility  例如常见的电话应答服务就是标准的责任链。 1. 收听语音,如果是查账等可自助的服务,直接按数字操作,链终止。否则转人工,链继续。 2. 普通客服处理成功,链终止。否则转专家,链继续。 3. 专家处理成功,链终止。专家不能马上处理,记工单,链终止。 这个模式可以将复杂的业务提取为一个个可复用的子业务,有助于优化代码的逻辑。 ## 鉴权 假设需要对api鉴权。有下述两条规则(实际业务会更复杂): 1. 检查是否是管理员,如果是,则直接通过。否则继续。 2. 检查要操作的任务是否是传入的用户创建的,是则通过。否则继续。 3. 返回不通过。 各项检查操作(函数)是可以在其它地方复用的。所以可以抽象为子任务节点。 实现责任链模式,可以使用rxgo。rxgo是reactivex的go实现。因为责任链标准的流式处理,天然和rxgo的思想不谋而合。  ### 定义任务与用户的抽象 ``` type Task struct { userID int } // GetUserID 获取这个task关联的用户ID func (t *Task) GetUserID() int { return t.userID } type User struct { id int role string } // GetID 获取用户的ID func (u *User) GetID() int { return u.id } // GetRole 获取用户的角色 func (u *User) GetRole() string { return u.role } ``` ### 定义鉴权系统的入参 ``` // checkInput 传入鉴权系统的入参,它是一个future,使用GetRet等待系统对其进行鉴权 type checkInput struct { t *Task u *User ch chan bool } // GetRet 阻塞并等待鉴权的结果 func (r *checkInput) GetRet() bool { select { case v := <-r.ch: return v case <-time.After(time.Second): return false } } // Error 这里利用rxgo return error提前结束流程的特性,这个参数可以作为error提前返回 func (checkInput) Error() string { return "" } ``` 这里要特别注意,这个参数实现error的接口,这是为了在rxgo中提前结束此次流程的讨巧设计。 ### 把各种鉴权操作抽象为函数 ``` func CheckTaskUser(task interface{ GetUserID() int }, user interface{ GetID() int }) bool { return task.GetUserID() == user.GetID() } func IsAdmin(user interface{ GetRole() string }) bool { return user.GetRole() == "admin" } ``` ### 定义鉴权流程,流程持续运行 ``` func Check(req checkInput) bool { checkCh <- req return req.GetRet() } var checkCh = make(chan checkInput, 1) func init() { // 这里只是演示所以硬编码,实际上每个节点是通过配置文件加载的,流程可以动态变更与配置 rxgo.Create([]rxgo.Producer{func(ctx context.Context, next chan<- rxgo.Item) { for req := range checkCh { next <- rxgo.Of(req) } }}). Map(func(c context.Context, i interface{}) (interface{}, error) { // 第1个节点,检查是否是管理员 req := i.(checkInput) if IsAdmin(req.u) { // 如果是管理员,使用return error提前终止流程 return nil, req } return req, nil }). Map(func(c context.Context, i interface{}) (interface{}, error) { // 第2个节点,检查是否是自己的任务 req := i.(checkInput) if CheckTaskUser(req.t, req.u) { // 如果是自己的任务,使用return error提前终止流程 return nil, req } return req, nil }). ForEach(func(i interface{}) { v := i.(checkInput) // 如果返回值仍然不为空,则说明所有流程都执行完毕,检查失败 if i != nil { v.ch <- false } }, func(e error) { req := checkInput{} // 如果错误类型是req类型,说明是提前返回的中断流程信号,对此次检查设置为成功 if errors.As(e, &req) { req.ch <- true } // 因为要持续运行,发现中断错误则继续运行 }, func() {}, rxgo.WithErrorStrategy(rxgo.ContinueOnError)) } ``` 在这里主要注意点: 1. 如果在某个节点鉴权成功,则直接`return nil, req`,因为`req`实现了`error`的接口。可以提前返回。 2. 配置rxgo策略为`rxgo.WithErrorStrategy(rxgo.ContinueOnError)`,在处理错误的时候不中断这个`observable` 3. 在最终的`ForEach`中统一处理各种状况,并对`req`的future设置值。 ### 开始鉴权 ``` func main() { task := &Task{ userID: 1, } task2 := &Task{ userID: 2, } adminUser := &User{ id: 1, role: "admin", } otherUser := &User{ id: 2, role: "normal", } // 如果是管理员,且是自己的任务,通过 t1 := Check(newReqStruct(adminUser, task)) logrus.Infof("t1:%v", t1) // true // 如果不是管理员,且不是自己的任务,不通过 t2 := Check(newReqStruct(otherUser, task)) // false logrus.Infof("t2:%v", t2) // 如果是管理员,且不是自己的任务,通过 t3 := Check(newReqStruct(adminUser, task2)) logrus.Infof("t3:%v", t3) // true } ``` 来自 大脸猪 写于 2021-12-10 15:07 -- 更新于2021-12-10 15:42 -- 0 条评论