RxJS 一些有趣的 operate 操作符
前言
RxJS 一些有趣的 operate 操作符
之前我们写了关于 RxJS 中的几个概念,Observable, Subject, pipe, Scheduler 等
而 RxJS 之所以很方便,很大程度上取决于它封装好的一些 operate 操作符
前文的 pipe 管道也属于操作符的一种
正文
RxJS 中,个人理解其实就是两大类的操作符,一类是创建类操作符,一类就是管道类
不过文档中给它分类的更加详细
There are operators for different purposes, and they may be categorized as: creation, transformation, filtering, joining, multicasting, error handling, utility, etc. In the following list you will find all the operators organized in categories.
创建类操作符
在 RxJS 中,创建类的操作符其实不多,有几个比较常见的
fromEvent
fromEvent 使得我们能够进行 dom 元素事件的绑定
1 | import { fromEvent } from "rxjs"; |
这样子每次我们点击就可以输出一次值,该值为事件 event 对象

如果我们想要解绑该事件
我们可以调用返回的 Subscription 的 unsubscribe 方法
1 | import { fromEvent } from "rxjs"; |
interval
setInterval 的一个包装,使得我们可以启用一个定时器
1 | import { interval } from "rxjs"; |
每次会输出该次对应的索引(从 0 开始)

取消该定时器,同样调用 Subscription 的 unsubscribe 方法
1 | import { interval } from "rxjs"; |
of
把传入的每个值都经过 Observable 发送出来
1 | import { of } from "rxjs"; |
传入 1, 2, 3, 4 ,那么输出就是 1, 2, 3, 4 了

range
生成一个序列,然后经由 Observable 发送出来
1 | import { range } from "rxjs"; |
注意,这里的参数不是起始和结束,而是起始和个数
即输出不是 2, 3, 4, 5 而是 2, 3, 4, 5, 6

generate
能够像 for 那样生成值
1 | import { generate } from "rxjs"; |
上面的代码表示初始值为 1, 结束条件为值小于 5 , 然后每次都会自增 1
和下面的代码等价
1 | new Observable((subscriber) => { |
这样子我们就能从 1 输出到 4

merge
合并多个 Observable 为单个 Observable ,然后一一输出对应的值
1 | import { fromEvent, interval, map, merge } from "rxjs"; |
经过 merge 之后,新的 Observable 会在每次点击和间隔 1s 输出一次对应的值

forkJoin
合并多个 Observable 为单个 Observable ,收集每个 Observable 完成前的一个值,然后输出
1 | import { forkJoin, fromEvent, interval, map, take } from "rxjs"; |
在点击两次且等待超过三秒之后,会输出一个数组,这个数组包含二者的值

concat
合并多个 Observable 为单个 Observable ,和 merge 不同的是,concat 会在第一个 Observable 完成之后再订阅第二个 Observable ,以此类推
1 | import { concat, fromEvent, interval, map, take } from "rxjs"; |
这里我们必须点击两次之后,定时器才会启动,如果点击事件不使用 take(2) 限定次数的话,那么定时器永远不会开始
因为点击事件永远无法完成

race
合并多个 Observable 为单个 Observable ,只会输出一个发出值的 Observable
1 | import { fromEvent, interval, map, race, take } from "rxjs"; |
此时如果立马进行点击,那么定时器就不会输出,如果等到定时器输出第一个值,那么点击事件就不会生效
定时器“赢”

点击事件“赢”

zip
合并多个 Observable 为单个 Observable ,每个 Observable 发出的每个值会组成一个数组作为新的值发出
1 | import { fromEvent, interval, map, take, zip } from "rxjs"; |
每个点击和每次定时器执行作为值发送出去,新值数量由这些 Observable 发出的值的最少的个数来确定
在这里点击事件只会发生两次,所以新的 Observable 只会发出两个值
定时器的第三个值无法被匹配,故不会发送到新的 Observable 对象中
一旦某个传入的 Observable 完成,其他传入的 Observable 会被 unsubscribe ,所以不用担心资源释放问题

combineLatest
合并多个 Observable 为单个 Observable ,与 zip 类似,但是 combineLatest 会取最近的一次值组成一个新值然后发出
1 | import { combineLatest, fromEvent, interval, map, take, zip } from "rxjs"; |
这里假如我们等三秒之后在点击,那么输出的值是 [event, '定时器:2']
而 zip 这里会输出 [event, '定时器:0']

除了上面这些常用的, RxJS 还封装了 ajax ,不过前端一般都是用 axios 来进行 http 请求
所以感觉用处不大
还有一些比较少见的创建操作符,比如 defer, timer, iif 等,都不难,看一下文档一下子就懂了
管道操作符
在之前的文章 RxJS 使用之 pipe 管道 中,我们已经列举了一些常见的管道操作符了
比如 map, filter, first, last 等
这些一般都比较容易理解
现在我们来列举一些高级一点的管道
takeUntil
传入一个 Observable ,在这个 Observable 完成时,源 Observable 则完成
1 | import { fromEvent, interval, map, take, takeUntil } from "rxjs"; |
这里如果我们等 3s 之后再点击,那么点击事件不会生效,因为定时器已经完成了
在 3s 内的点击事件都是可以发出的
之前我们在 Subject 那一章讲过这个管道,在 antd( angular 版本) 中非常常见
在需要对全局可观察对象订阅的地方,加上 pipe(takeUntil(destroy$)) ,然后在 ngOnDestroy 钩子中完成 destroy$ 即可比较优雅的管理这些订阅过程
distinct
对于源 Observable 发送的一系列的值,会经过去重之后再发送出来
对于基本类型来说,使用起来非常简单
1 | import { distinct, of} from "rxjs"; |
这里源 Observable 发出 1, 2, 3, 3, 4, 4
经过去重之后会输出 1, 2, 3, 4

对于引用类型,可以通过一个函数来提取唯一的部分进行比较,一般都是以对象的某个属性来去重,比如唯一 id 等
当然如果不传,那么就是按照引用的地址进行比较,即 obj1 === obj2 这种形式
1 | import { distinct, of } from "rxjs"; |
由于 s2 和 s3 的 id 是一样的,所以最终只会输出 s1 和 s2

distinctUntilChanged
这个就比较有意思了,从名字上看,也是去重,但是去重的模式不一样
这个管道的去重指和前一个值进行比较,如果相同,那么该值就不会被输出
1 | import { distinctUntilChanged, of } from "rxjs"; |
如果我们使用 distinct ,上面的代码输出为 1, 2
而 distinctUntilChanged 会输出 1, 2, 1, 2
当然这个管道也支持传入函数来确定唯一的值
比如,现在我们希望在窗口水平 resize 的时候进行一些操作,可以这么写
1 | import { distinctUntilChanged, fromEvent, map } from "rxjs"; |
效果如下:

当然,如果你想换成垂直 resize ,只要更改为 distinctUntilChanged((pre, cur) => pre.height === cur.height) 即可
可以想象一下如果我们不使用 RxJS 的话,这段逻辑写起来还是挺复杂的,而且免不得得产生一些局部变量(记录 pre 值)
retry
在 Observable 发生错误的时候可以重新订阅
1 | import { Observable, retry } from "rxjs"; |
在这里我们使用 retry(2) 表示如果源 Observable 发生了错误,那么重新订阅,最大次数为 2 次
也就是两次后,如果成功了,那无事发生,如果还是报错,那么错误就会通过 error 回调抛出

可以看到前两次的 error 并没有被抛出,只有最后一次的 error 被抛出
这个管道对于 http 请求还是挺有用的,直接就可以设置重拾次数
elementAt
直接取某个索引的元素,很像 js 中通过 [] 取值
1 | import { elementAt, range } from "rxjs"; |
取索引 2 的值,对于 1, 2, 3, 4, 5 ,也就是数字 3
效果如下:

single
只有在匹配到一个值的时候才会输出,其他情况下,没有值匹配或者匹配了多个值,则会报错
1 | import { of, single } from "rxjs"; |
效果如下:

combineLatestAll, concatAll, mergeAll
和 combineLatest 的效果一样,不过 combineLatestAll 作为管道主要用于高阶 Observable
1 | import { fromEvent, map, interval, take, combineLatestAll } from "rxjs"; |
这里每次点击产生一个定时器,每个定时器只输出两个值,只响应两次点击事件
意味着会有两个定时器,然后通过 combineLatestAll 结合两个定时器,完全可以看成如下代码
1 | import { combineLatest } from "rxjs"; |
效果如下:

除了 combineLatestAll ,其他的一些创建类操作符也有其对应的管道操作符
concat对应concatAllmerge对应mergeAll
exhaustAll
用于高阶 Observable ,当订阅第一个 Observable ,接下来在这个 Observable 完成前就订阅的 Observable 被抛弃
感觉很绕,我们可以写个例子
1 | import { fromEvent, interval, take, map, exhaustAll } from "rxjs"; |
这里我们使用 map 让生成的 Observable 成为高阶的,然后使用 exhaustAll
当我们点击的时候,会输出 0, 1, 2 ,如果在 0, 1, 2 输出还没完成的时候,继续点击,那么这个 Observable 是会被抛弃的
也就是只有等到 0, 1, 2 输出完成,我们再次点击,那么才会继续输出 0, 1, 2
效果如下:

switchAll
用于高阶 Observable ,这个和 exhaustAll 相反,如果在前一个 Observable 还未完成时,得到了下一个 Observable
那么当前订阅的 Observable 会被完成,然后去订阅这个新的 Observable
1 | import { fromEvent, interval, take, map, tap, switchAll } from "rxjs"; |
这个例子和 exhaustAll 的一样,只不过我们使用 switchAll 来合并 Observable
在输出 0, 1, 2 输出未完成之前,继续点击的话,那么会重新输出 0, 1, 2
效果如下:

后记
RxJS 的系列基本就结束了,接下来学习 Angular ,在 RxJS 方面应该就问题不大了