又重温了一遍RxJS的一些核心概念,做一个简单总结。

入门

RxJS 是一个库,它通过使用 observable 序列来编写异步和基于事件的程序。它提供了一个核心类型 Observable,附属类型 (Observer、 Schedulers、 Subjects) 和受 [Array#extras] 启发的操作符 (map、filter、reduce、every, 等等),这些数组操作符可以把异步事件作为集合来处理。

注:[Array#extras] 我的理解应该是Array的原生处理方法,类似lodash方法,所以,可以把 RxJS 当做是用来处理事件的 Lodash 。

在 RxJS 中用来解决异步事件管理的的基本概念是:

  • Observable (可观察对象): 表示一个概念,这个概念是一个可调用的未来值或事件的集合。
  • Observer (观察者): 一个回调函数的集合,它知道如何去监听由 Observable 提供的值。
  • Subscription (订阅): 表示 Observable 的执行,主要用于取消 Observable 的执行。
  • Operators (操作符): 采用函数式编程风格的纯函数 (pure function),使用像 map、filter、concat、flatMap 等这样的操作符来处理集合。
  • Subject (主体): 相当于 EventEmitter,并且是将值或事件多路推送给多个 Observer 的唯一方式。
  • Schedulers (调度器): 用来控制并发并且是中央集权的调度员,允许我们在发生计算时进行协调,例如 setTimeout 或 requestAnimationFrame 或其他。

Observable 作为函数的泛化

订阅 Observable 类似于调用函数。如果你不调用函数,函数就不会执行;如果你不订阅Observable,也同样不会执行。
Observable和函数的区别在于:Observable可以返回多个值,函数则无法做到。

1
2
3
4
5
function foo() {
console.log('Hello');
return 42;
return 100; // 死代码,永远不会执行
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
var foo = Rx.Observable.create(function (observer) {
console.log('Hello');
observer.next(42);
observer.next(100); // “返回”另外一个值
observer.next(200); // 还可以再“返回”值
});

console.log('before');
foo.subscribe(function (x) {
console.log(x);
});
console.log('after');

# 输出
"before"
"Hello"
42
100
200
"after"

Observable剖析

Observable 的核心关注点:

  • 创建 Observables
  • 订阅 Observables
  • 执行 Observables
  • 清理 Observables

Observer观察者

观察者是由 Observable 发送的值的消费者。观察者只是一组回调函数的集合,每个回调函数对应一种 Observable 发送的通知类型:nexterrorcomplete

1
2
3
4
5
var observer = {
next: x => console.log('Observer got a next value: ' + x),
error: err => console.error('Observer got an error: ' + err),
complete: () => console.log('Observer got a complete notification'),
};

要使用观察者,需要把它提供给 Observable 的 subscribe 方法:

1
observable.subscribe(observer);

观察者只是有三个回调函数的对象,每个回调函数对应一种 Observable 发送的通知类型。

RxJS 中的观察者也可能是部分的。如果你没有提供某个回调函数,Observable 的执行也会正常运行,只是某些通知类型会被忽略,因为观察者中没有相对应的回调函数。

当订阅 Observable 时,你可能只提供了一个回调函数作为参数,而并没有将其附加到观察者对象上,例如这样:

1
observable.subscribe(x => console.log('Observer got a next value: ' + x));

在 observable.subscribe 内部,它会创建一个观察者对象并使用第一个回调函数参数作为 next 的处理方法。三种类型的回调函数都可以直接作为参数来提供:

1
2
3
4
5
observable.subscribe(
x => console.log('Observer got a next value: ' + x),
err => console.error('Observer got an error: ' + err),
() => console.log('Observer got a complete notification')
);

Subject主体

Subject 是一种特殊类型的 Observable,它允许将值多播给多个观察者,所以 Subject 是多播的,而普通的 Observables 是单播的(每个已订阅的观察者都拥有 Observable 的独立执行)。

每个 Subject 都是 Observable 。
每个 Subject 都是观察者。

参考:https://cn.rx.js.org/manual/overview.html