Rxjs浅浅滴学习
我们都知道 JS 是什么,那么什么是 Rx 呢?Rx 是 Reactive Extension(也叫 ReactiveX)的简称,指的是实践响应式编程的一套工具,Rx 官网首页的介绍是一套通过可监听流来做异步编程的 API(An API for asynchronous programming with observable streams)。
RxJS 是基于观察者模式和迭代器模式以函数式编程思维来实现的。
RxJS 的特点
- 数据流抽象了很多现实问题
- 擅长处理异步问题
- 把复杂问题分解为简单问题的组合
前端中的 DOM 事件、WebSocket 推送消息、AJAX 请求资源、动画都可以看作是数据流。
RxJS 对数据采用“推”的方式,当一个数据产生时,会将其推送给对应的处理函数,这个处理函数不用关心数据时同步产生还是异步产生的,因此处理异步将会变得非常简单。
RxJS 中很多操作符,每个操作符都提供了一个小功能,学习 RxJS 最重要的就是学习如何组合操作符来解决复杂问题。
在 RxJS 中用来解决异步事件管理的的基本概念是:
- Observable (可观察对象): 表示一个概念,这个概念是一个可调用的未来值或事件的集合。
- Observer (观察者): 一个回调函数的集合,它知道如何去监听由 Observable 提供的值。
- Subscription (订阅): 表示 Observable 的执行,主要用于取消 Observable 的执行。
- Operators (操作符): 采用函数式编程风格的纯函数 (pure function),使用像
map
、filter
、concat
、flatMap
等这样的操作符来处理集合。 - Subject (主体): 相当于 EventEmitter,并且是将值或事件多路推送给多个 Observer 的唯一方式。
- Schedulers (调度器): 用来控制并发并且是中央集权的调度员,允许我们在发生计算时进行协调,例如
setTimeout
或requestAnimationFrame
或其他。
Observable (可观察对象)
Observables 是多个值的惰性推送集合。它填补了下面表格中的空白:
单个值 | 多个值 | |
---|---|---|
拉取 | Function |
Iterator |
推送 | Promise |
Observable |
示例 - 当订阅下面代码中的 Observable 的时候会立即(同步地)推送值1
、2
、3
,然后1秒后会推送值4
,再然后是完成流:
1 | var observable = Rx.Observable.create(function (observer) { |
Observables 像是没有参数, 但可以泛化为多个值的函数。
Demo01:
1 | function foo() { |
Observables 重写上面的代码:
1 | var foo = Rx.Observable.create(function (observer) { |
这是因为函数和 Observables 都是惰性运算。如果你不调用函数,console.log('Hello')
就不会执行。Observables 也是如此,如果你不“调用”它(使用 subscribe
),console.log('Hello')
也不会执行。此外,“调用”或“订阅”是独立的操作:两个函数调用会触发两个单独的副作用,两个 Observable 订阅同样也是触发两个单独的副作用。EventEmitters 共享副作用并且无论是否存在订阅者都会尽早执行,Observables 与之相反,不会共享副作用并且是延迟执行。
订阅 Observable 类似于调用函数。
Observables 传递值可以是同步的,也可以是异步的。
Observable剖析
Observables 是使用 Rx.Observable.create
或创建操作符创建的,并使用观察者来订阅它,然后执行它并发送 next
/ error
/ complete
通知给观察者,而且执行可能会被清理。这四个方面全部编码在 Observables 实例中,但某些方面是与其他类型相关的,像 Observer (观察者) 和 Subscription (订阅)。
Observable 的核心关注点:
- 创建 Observables
- 订阅 Observables
- 执行 Observables
- 清理 Observables
创建 Observables
Rx.Observable.create
是 Observable
构造函数的别名,它接收一个参数:subscribe
函数。
下面的示例创建了一个 Observable,它每隔一秒会向观察者发送字符串 'hi'
。
1 | var observable = Rx.Observable.create(function subscribe(observer) { |
Observables 可以使用 create
来创建, 但通常我们使用所谓的创建操作符, 像 of
、from
、interval
、等等。
在上面的示例中,subscribe
函数是用来描述 Observable 最重要的一块。我们来看下订阅是什么意思。
订阅 Observables
示例中的 Observable 对象 observable
可以订阅,像这样:
1 | observable.subscribe(x => console.log(x)); |
observable.subscribe
和 Observable.create(function subscribe(observer) {...})
中的 subscribe
有着同样的名字,这并不是一个巧合。在库中,它们是不同的,但从实际出发,可以认为在概念上它们是等同的。
订阅 Observable 像是调用函数, 并提供接收数据的回调函数。
subscribe
调用是启动 “Observable 执行”的一种简单方式, 并将值或事件传递给本次执行的观察者。
执行 Observables
Observable.create(function subscribe(observer) {...})
中...
的代码表示 “Observable 执行”,它是惰性运算,只有在每个观察者订阅后才会执行。随着时间的推移,执行会以同步或异步的方式产生多个值。
Observable 执行可以传递三种类型的值:
- “Next” 通知: 发送一个值,比如数字、字符串、对象,等等。
- “Error” 通知: 发送一个 JavaScript 错误 或 异常。
- “Complete” 通知: 不再发送任何值。
“Next” 通知是最重要,也是最常见的类型:它们表示传递给观察者的实际数据。”Error” 和 “Complete” 通知可能只会在 Observable 执行期间发生一次,并且只会执行其中的一个。
在 Observable 执行中, 可能会发送零个到无穷多个 “Next” 通知。如果发送的是 “Error” 或 “Complete” 通知的话,那么之后不会再发送任何通知了。
1 | var observable = Rx.Observable.create(function subscribe(observer) { |
1 | var observable = Rx.Observable.create(function subscribe(observer) { |
1 | var observable = Rx.Observable.create(function subscribe(observer) { |
清理 Observable 执行
因为 Observable 执行可能会是无限的,并且观察者通常希望能在有限的时间内中止执行,所以我们需要一个 API 来取消执行。因为每个执行都是其对应观察者专属的,一旦观察者完成接收值,它必须要一种方法来停止执行,以避免浪费计算能力或内存资源。
1 | var observable = Rx.Observable.from([10, 20, 30]); |
当你订阅了 Observable,你会得到一个 Subscription ,它表示进行中的执行。只要调用
unsubscribe()
方法就可以取消执行。
Observer (观察者)
什么是观察者? - 观察者是由 Observable 发送的值的消费者。观察者只是一组回调函数的集合,每个回调函数对应一种 Observable 发送的通知类型:next
、error
和 complete
。下面的示例是一个典型的观察者对象:
1 | var observer = { |
要使用观察者,需要把它提供给 Observable 的 subscribe
方法:
1 | observable.subscribe(observer); |
观察者只是有三个回调函数的对象,每个回调函数对应一种 Observable 发送的通知类型。
Subscription(订阅)
什么是 Subscription ? - Subscription 是表示可清理资源的对象,通常是 Observable 的执行。Subscription 有一个重要的方法,即 unsubscribe
,它不需要任何参数,只是用来清理由 Subscription 占用的资源。
1 | var observable = Rx.Observable.interval(1000); |
Subscription 基本上只有一个
unsubscribe()
函数,这个函数用来释放资源或去取消 Observable 执行。
1 | var observable1 = Rx.Observable.interval(400); |
simple Demo1
1 | var button = document.querySelector('button'); |
1 | var button = document.querySelector('button'); |
Rx.Observable.fromEvent()
相当于创建了一个可观察对象Observable
,也就是监听的代理对象,subscribe是这个对象的一个方法,该方法返回这个监听的事件,subscribe
方法的参数是对观察对象返回值做出下一步操作(回调函数).
Demo2(控制流动)
1 | // 输入 "hello world" |
Demo3(产生值)
1 | // 输入 "hello world" |
###