Rxjs浅浅滴学习

Rxjs浅浅滴学习

我们都知道 JS 是什么,那么什么是 Rx 呢?Rx 是 Reactive Extension(也叫 ReactiveX)的简称,指的是实践响应式编程的一套工具,Rx 官网首页的介绍是一套通过可监听流来做异步编程的 API(An API for asynchronous programming with observable streams)。

RxJS 是基于观察者模式和迭代器模式以函数式编程思维来实现的。

RxJS 的特点

  • 数据流抽象了很多现实问题
  • 擅长处理异步问题
  • 把复杂问题分解为简单问题的组合

前端中的 DOM 事件、WebSocket 推送消息、AJAX 请求资源、动画都可以看作是数据流。

RxJS 对数据采用“推”的方式,当一个数据产生时,会将其推送给对应的处理函数,这个处理函数不用关心数据时同步产生还是异步产生的,因此处理异步将会变得非常简单。

RxJS 中很多操作符,每个操作符都提供了一个小功能,学习 RxJS 最重要的就是学习如何组合操作符来解决复杂问题。

在 RxJS 中用来解决异步事件管理的的基本概念是:

  • Observable (可观察对象): 表示一个概念,这个概念是一个可调用的未来值或事件的集合。
  • Observer (观察者): 一个回调函数的集合,它知道如何去监听由 Observable 提供的值。
  • Subscription (订阅): 表示 Observable 的执行,主要用于取消 Observable 的执行。
  • Operators (操作符): 采用函数式编程风格的纯函数 (pure function),使用像 mapfilterconcatflatMap 等这样的操作符来处理集合。
  • Subject (主体): 相当于 EventEmitter,并且是将值或事件多路推送给多个 Observer 的唯一方式。
  • Schedulers (调度器): 用来控制并发并且是中央集权的调度员,允许我们在发生计算时进行协调,例如 setTimeoutrequestAnimationFrame 或其他。

Observable (可观察对象)

Observables 是多个值的惰性推送集合。它填补了下面表格中的空白:

单个值 多个值
拉取 Function Iterator
推送 Promise Observable

示例 - 当订阅下面代码中的 Observable 的时候会立即(同步地)推送值123,然后1秒后会推送值4,再然后是完成流:

1
2
3
4
5
6
7
8
9
var observable = Rx.Observable.create(function (observer) {
observer.next(1);
observer.next(2);
observer.next(3);
setTimeout(() => {
observer.next(4);
observer.complete();
}, 1000);
});

Observables 像是没有参数, 但可以泛化为多个值的函数。

Demo01:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
function foo() {
console.log('Hello');
return 42;
}

var x = foo.call(); // 等同于 foo()
console.log(x);
var y = foo.call(); // 等同于 foo()
console.log(y);


"Hello"
42
"Hello"
42

Observables 重写上面的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
var foo = Rx.Observable.create(function (observer) {
console.log('Hello');
observer.next(42);
});

foo.subscribe(function (x) {
console.log(x);
});
foo.subscribe(function (y) {
console.log(y);
});


"Hello"
42
"Hello"
42

这是因为函数和 Observables 都是惰性运算。如果你不调用函数,console.log('Hello') 就不会执行。Observables 也是如此,如果你不“调用”它(使用 subscribe),console.log('Hello') 也不会执行。此外,“调用”或“订阅”是独立的操作:两个函数调用会触发两个单独的副作用,两个 Observable 订阅同样也是触发两个单独的副作用。EventEmitters 共享副作用并且无论是否存在订阅者都会尽早执行,Observables 与之相反,不会共享副作用并且是延迟执行。

订阅 Observable 类似于调用函数。

Observables 传递值可以是同步的,也可以是异步的。

Observable剖析

Observables 是使用 Rx.Observable.create 或创建操作符创建的,并使用观察者来订阅它,然后执行它并发送 next / error / complete 通知给观察者,而且执行可能会被清理。这四个方面全部编码在 Observables 实例中,但某些方面是与其他类型相关的,像 Observer (观察者) 和 Subscription (订阅)。

Observable 的核心关注点:

  • 创建 Observables
  • 订阅 Observables
  • 执行 Observables
  • 清理 Observables

创建 Observables

Rx.Observable.createObservable 构造函数的别名,它接收一个参数:subscribe 函数。

下面的示例创建了一个 Observable,它每隔一秒会向观察者发送字符串 'hi'

1
2
3
4
5
var observable = Rx.Observable.create(function subscribe(observer) {
var id = setInterval(() => {
observer.next('hi')
}, 1000);
});

Observables 可以使用 create 来创建, 但通常我们使用所谓的创建操作符, 像 offrominterval、等等。

在上面的示例中,subscribe 函数是用来描述 Observable 最重要的一块。我们来看下订阅是什么意思。

订阅 Observables

示例中的 Observable 对象 observable 可以订阅,像这样:

1
observable.subscribe(x => console.log(x));

observable.subscribeObservable.create(function subscribe(observer) {...}) 中的 subscribe 有着同样的名字,这并不是一个巧合。在库中,它们是不同的,但从实际出发,可以认为在概念上它们是等同的。

订阅 Observable 像是调用函数, 并提供接收数据的回调函数。

subscribe 调用是启动 “Observable 执行”的一种简单方式, 并将值或事件传递给本次执行的观察者。

执行 Observables

Observable.create(function subscribe(observer) {...})...的代码表示 “Observable 执行”,它是惰性运算,只有在每个观察者订阅后才会执行。随着时间的推移,执行会以同步或异步的方式产生多个值。

Observable 执行可以传递三种类型的值:

  • “Next” 通知: 发送一个值,比如数字、字符串、对象,等等。
  • “Error” 通知: 发送一个 JavaScript 错误 或 异常。
  • “Complete” 通知: 不再发送任何值。

“Next” 通知是最重要,也是最常见的类型:它们表示传递给观察者的实际数据。”Error” 和 “Complete” 通知可能只会在 Observable 执行期间发生一次,并且只会执行其中的一个。

在 Observable 执行中, 可能会发送零个到无穷多个 “Next” 通知。如果发送的是 “Error” 或 “Complete” 通知的话,那么之后不会再发送任何通知了。

1
2
3
4
5
6
var observable = Rx.Observable.create(function subscribe(observer) {
observer.next(1);
observer.next(2);
observer.next(3);
observer.complete();
});
1
2
3
4
5
6
7
var observable = Rx.Observable.create(function subscribe(observer) {
observer.next(1);
observer.next(2);
observer.next(3);
observer.complete();
observer.next(4); // 因为违反规约,所以不会发送
});
1
2
3
4
5
6
7
8
9
10
var observable = Rx.Observable.create(function subscribe(observer) {
try {
observer.next(1);
observer.next(2);
observer.next(3);
observer.complete();
} catch (err) {
observer.error(err); // 如果捕获到异常会发送一个错误
}
});

清理 Observable 执行

因为 Observable 执行可能会是无限的,并且观察者通常希望能在有限的时间内中止执行,所以我们需要一个 API 来取消执行。因为每个执行都是其对应观察者专属的,一旦观察者完成接收值,它必须要一种方法来停止执行,以避免浪费计算能力或内存资源。

1
2
3
4
var observable = Rx.Observable.from([10, 20, 30]);
var subscription = observable.subscribe(x => console.log(x));
// 稍后:
subscription.unsubscribe();

当你订阅了 Observable,你会得到一个 Subscription ,它表示进行中的执行。只要调用 unsubscribe() 方法就可以取消执行。

Observer (观察者)

什么是观察者? - 观察者是由 Observable 发送的值的消费者。观察者只是一组回调函数的集合,每个回调函数对应一种 Observable 发送的通知类型:nexterrorcomplete 。下面的示例是一个典型的观察者对象:

1
2
3
4
5
var observer = {
next: x => console.log('Observer got a next value: ' + x),
error: err => console.error('Observer got an error: ' + err),
complete: () => console.log('Observer got a complete notification'),
};

要使用观察者,需要把它提供给 Observable 的 subscribe 方法:

1
observable.subscribe(observer);

观察者只是有三个回调函数的对象,每个回调函数对应一种 Observable 发送的通知类型。

Subscription(订阅)

什么是 Subscription ? - Subscription 是表示可清理资源的对象,通常是 Observable 的执行。Subscription 有一个重要的方法,即 unsubscribe,它不需要任何参数,只是用来清理由 Subscription 占用的资源。

1
2
3
4
5
6
var observable = Rx.Observable.interval(1000);
var subscription = observable.subscribe(x => console.log(x));
// 稍后:
// 这会取消正在进行中的 Observable 执行
// Observable 执行是通过使用观察者调用 subscribe 方法启动的
subscription.unsubscribe();

Subscription 基本上只有一个 unsubscribe() 函数,这个函数用来释放资源或去取消 Observable 执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
var observable1 = Rx.Observable.interval(400);
var observable2 = Rx.Observable.interval(300);

var subscription = observable1.subscribe(x => console.log('first: ' + x));
var childSubscription = observable2.subscribe(x => console.log('second: ' + x));

subscription.add(childSubscription);

setTimeout(() => {
// subscription 和 childSubscription 都会取消订阅
subscription.unsubscribe();
}, 1000);

output:
second: 0
first: 0
second: 1
first: 1
second: 2

simple Demo1

1
2
3
4
var button = document.querySelector('button');
button.addEventListener('click', () =>
console.log('Clicked!')
);
1
2
3
var button = document.querySelector('button');
Rx.Observable.fromEvent(button, 'click')
.subscribe(() => console.log('Clicked!'));

Rx.Observable.fromEvent()相当于创建了一个可观察对象Observable,也就是监听的代理对象,subscribe是这个对象的一个方法,该方法返回这个监听的事件,subscribe方法的参数是对观察对象返回值做出下一步操作(回调函数).

Demo2(控制流动)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// 输入 "hello world"
var input = Rx.Observable.fromEvent(document.querySelector('input'), 'input');

// 过滤掉小于3个字符长度的目标值
input.filter(event => event.target.value.length > 2)
.map(event => event.target.value)
.subscribe(value => console.log(value)); // "hel"

// 延迟事件
input.delay(200)
.map(event => event.target.value)
.subscribe(value => console.log(value)); // "h" -200ms-> "e" -200ms-> "l" ...

// 每200ms只能通过一个事件
input.throttleTime(200)
.map(event => event.target.value)
.subscribe(value => console.log(value)); // "h" -200ms-> "w"

// 停止输入后200ms方能通过最新的那个事件
input.debounceTime(200)
.map(event => event.target.value)
.subscribe(value => console.log(value)); // "o" -200ms-> "d"

// 在3次事件后停止事件流
input.take(3)
.map(event => event.target.value)
.subscribe(value => console.log(value)); // "hel"

// 直到其他 observable 触发事件才停止事件流
var stopStream = Rx.Observable.fromEvent(document.querySelector('button'), 'click');
input.takeUntil(stopStream)
.map(event => event.target.value)
.subscribe(value => console.log(value)); // "hello" (点击才能看到)

Demo3(产生值)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// 输入 "hello world"
var input = Rx.Observable.fromEvent(document.querySelector('input'), 'input');

// 传递一个新的值
input.map(event => event.target.value)
.subscribe(value => console.log(value)); // "h"

// 通过提取属性传递一个新的值
input.pluck('target', 'value')
.subscribe(value => console.log(value)); // "h"

// 传递之前的两个值
input.pluck('target', 'value').pairwise()
.subscribe(value => console.log(value)); // ["h", "he"]

// 只会通过唯一的值
input.pluck('data').distinct()
.subscribe(value => console.log(value)); // "helo wrd"

// 不会传递重复的值
input.pluck('data').distinctUntilChanged()
.subscribe(value => console.log(value)); // "helo world"

###

感谢你的打赏哦!