rxgo笔记

## 使用chan做subject

```go
package main

import (
	"fmt"
	"log"

	"github.com/reactivex/rxgo/handlers"
	"github.com/reactivex/rxgo/iterable"
	"github.com/reactivex/rxgo/observable"
)

// main uses a plain channel as a subject: values pushed into itChan are
// delivered to the subscriber of the observable built on top of it.
func main() {
	itChan := make(chan interface{})

	it, err := iterable.New(itChan)
	if err != nil {
		log.Fatalf("creating iterable: %v", err)
	}

	// Subscribe runs asynchronously and hands back a channel that yields
	// once the stream has ended, so no extra goroutine is needed here.
	done := observable.From(it).
		Subscribe(handlers.NextFunc(func(v interface{}) {
			if num, ok := v.(int); ok {
				fmt.Println(num)
			}
		}))

	itChan <- 1

	// Closing the source ends the stream; waiting on done guarantees the
	// item has been handled before main returns (instead of sleeping).
	close(itChan)
	<-done
}
```

## 扩充原来的operators

包装包装即可,实现了简单的sum, flatmap :

```go
package main

import (
	"fmt"
	"log"
	"sync"

	"github.com/reactivex/rxgo/fx"
	"github.com/reactivex/rxgo/handlers"
	"github.com/reactivex/rxgo/iterable"
	"github.com/reactivex/rxgo/observable"
	"github.com/reactivex/rxgo/observer"
)

// MyObservable wraps observable.Observable so extra operators can be added
// while the embedded type's own operators stay available.
type MyObservable struct {
	observable.Observable
}

// Sum passes every item through apply and emits a single float64 total once
// the source is exhausted. Only int and float64 results are added; results
// of any other type are ignored.
func (o MyObservable) Sum(apply fx.MappableFunc) MyObservable {
	out := make(chan interface{})
	go func() {
		defer close(out)
		sum := 0.0
		for item := range o.Observable {
			switch v := apply(item).(type) {
			case int:
				sum += float64(v)
			case float64:
				sum += v
			}
		}
		out <- sum
	}()
	return MyObservable{observable.Observable(out)}
}

// FlatMappableFunc maps one source item to an inner observable.
type FlatMappableFunc func(interface{}) observable.Observable

// FlatMap subscribes to the inner observable produced by apply for every
// source item and forwards all inner items (and errors) to the result.
// Inner observables are consumed concurrently, so output order is not
// guaranteed. See https://github.com/ReactiveX/RxGo/issues/49.
func (o MyObservable) FlatMap(apply FlatMappableFunc) MyObservable {
	out := make(chan interface{})
	go func() {
		// out may only be closed after every inner subscription has
		// finished; closing it earlier would make a late `out <- i`
		// panic with "send on closed channel".
		var wg sync.WaitGroup
		for item := range o.Observable {
			wg.Add(1)
			go func(sub observable.Observable) {
				defer wg.Done()
				handler := observer.Observer{
					NextHandler: func(i interface{}) {
						out <- i
					},
					ErrHandler: func(err error) {
						out <- err
					},
				}
				// Block until this inner subscription is over.
				<-sub.Subscribe(handler)
			}(apply(item))
		}
		wg.Wait()
		close(out)
	}()
	return MyObservable{observable.Observable(out)}
}

// ToMyObv wraps an observable.Observable or an iterable.Iterable in a
// MyObservable. Any other input yields the zero MyObservable, whose embedded
// channel is nil, so callers must pass one of the two supported types.
func ToMyObv(obv interface{}) MyObservable {
	switch v := obv.(type) {
	case observable.Observable:
		return MyObservable{v}
	case iterable.Iterable:
		return MyObservable{observable.From(v)}
	}
	return MyObservable{}
}

func main() {
	it, err := iterable.New([]interface{}{1, 2, 3, 4, 5})
	if err != nil {
		log.Fatalf("creating iterable: %v", err)
	}

	// Sum emits a float64, hence the float64 assertion in Map.
	<-ToMyObv(it).
		Sum(func(v interface{}) interface{} { return v }).
		Map(func(v interface{}) interface{} { return v.(float64) + 1 }).
		Subscribe(handlers.NextFunc(func(v interface{}) {
			fmt.Println(v)
		}))
}
```

来自 大脸猫 写于 2018-09-23 00:12 -- 更新于2020-10-19 13:06 -- 0 条评论