# Observable
Observable 是多个值的惰性 Push 集合。他填补了下表中的缺失点:
SINGLE | MULTIPLEXED | |
|---|---|---|
Pull | Function | Iterator |
Push | Promise | Observable |
如,下面是一个 Observable,它在订阅时立即(同步)推送值 1、2、3,并且从 subscribe 调用开始后过 1 s 再推送值 4,然后结束。
import { Observable } from 'rxjs';
const observable = new Observable(subscriber => {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
setTimeout(() => {
subscriber.next(4);
subscriber.complete();
}, 1000);
});
要调用 Observable 并查看这些值,我们需要订阅它:
import { Observable } from 'rxjs';
const observable = new Observable(subscriber => {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
setTimeout(() => {
subscriber.next(4);
subscriber.complete();
}, 1000);
});
console.log('Before subscribe');
observable.subscribe({
next(x) { console.log('Next: ' x); },
error(err) { console.error('Error: ' err); },
complete() { console.log('Complete'); }
});
console.log('After subscribe');
// Before subscribe
// Next: 1
// Next: 2
// Next: 3
// After subscribe
// Next: 4
// Complete
# Pull vs Push
Pull 和 Push 是两种不同的协议,描述了数据生产者和数据消费者如何进通信。
什么是 Pull? 在 Pull 系统中,消费者决定什么时候从数据生产者中接收数据。数据生产者自己对什么时候数据被传递到消费者没有感知。
每个 JavaScript 函数都是一个 Pull 系统。函数是数据的生产者,调用函数的代码通过从其调用中 pull 出单个返回值来使用它。
ES 2015 中介绍了生成器函数和迭代器 (opens new window)(function *),也属于 Pull 系统。调用 iterator.next() 的代码是消费者,从迭代器(生产者)中拉出多个值。
PRODUCER | CONSUMER | |
|---|---|---|
Pull | Passive:produces data when requested | Active:decides when data is requested |
Push | Active:produces data at its own pace | Passive:reacts to received data |
什么是 Push ? 在 Push 系统中,生产者决定什么时候推送数据给消费者。数据消费者自己对什么时候数据被接收到没有感知。
Promise 是目前 JavaScript 中最常见的 Push 系统类型。Promise (生产者)传递一个 resolved 的值给注册的回调(消费者),不过和函数不一样,Promise 自己负责精准确定该值何时 push 到回调。
RxJS 引入了 Observable,一个新的 JavaScript Push 系统。Observable 是一个多值生产者,推送数据给 Observer(消费者)。
- 函数是一种惰性求值计算,在调用时同步返回单个的值。
- 生成器是一种惰性求值计算,在迭代时同步返回 0 个或到可能无限多个值。
- Promise是一种可能(或可能不会)最终返回单个值的计算。
- Observable是一种惰性求值计算,从调用时起可以同步或异步地返回 0 个或到可能无限多个值。
# Observables as generalizations of functions
Observable 不像 EventEmitter 也不像 Promise 用于多个值。在一些情况下 Observable 会表现地像 EventEmitter,如当使用 RxJS 的 Subject 进行多播时,但通常它们的行为不像 EventEmitter。
代码语言:javascript复制Observable 类似于零参数的函数,但将它们泛化为允许多个值。
function foo () {
console.log('Hello');
return 42;
}
const x = foo.call(); // same as foo()
console.log(x);
// Hello
// 42
const y = foo.call(); // same as foo()
console.log(y);
// Hello
// 42
使用 Observable 改写上面的代码:
import { Observable } from 'rxjs';
const foo = new Observable(subscriber => {
console.log('Hello');
subscriber.next(42);
});
foo.subscribe(x => {
console.log(x);
});
// Hello
// 42
foo.subscribe(y => {
console.log(y);
});
// Hello
// 42因为 函数 和 Observable 都是惰性计算。如果你不调用函数,console.log('Hello') 就不会被执行。同样对于 Observable,如果你不“调用”它(使用 subscribe), console.log('Hello') 也不会被执行。另外,“调用”和“订阅”是一个孤立的操作:两个函数调用触发两个单独的副作用,两个 Observable 订阅触发两个单独的副作用。和 EventEmitter 共享副作用并且无论订阅者是否存在都立即触发相反,Observable 没有共享执行并且是惰性计算。
订阅一个 Observable 就是调用一个函数。
部分人觉得 Observable 是异步的,这并不是真的。
console.log('before');
console.log(foo.call());
console.log('after');
// before
// Hello
// 42
// after
使用 Observable 会观察到和函数一样的输出:
console.log('before');
foo.subscribe(x => {
console.log(x);
});
console.log('after');
// before
// Hello
// 42
// after
这说明,对 foo 的订阅完全是同步的,就像一个函数一样。
Observable既能同步也可以异步地传递值。
那 Observable 和函数之间的区别是什么?Observable 可以随着时间推移“返回”多个值,这是函数无法做到的。
function foo () {
console.log('Hello');
return 42;
return 100; // dead code, will never happen
}
函数只能返回一个值,而 Observable 可以返回多个值:
import { Observable } from 'rxjs';
const foo = new Observable(subscriber => {
console.log('Hello');
subscriber.next(42);
subscriber.next(100);
subscriber.next(200);
});
console.log('Before');
foo.subscribe(x => {
console.log(x);
});
console.log('After');
// Before
// Hello
// 42
// 100
// 200
// After
也可以异步地返回值:
代码语言:javascript复制import { Observable } from 'rxjs';
const foo = new Observable(subscriber => {
console.log('Hello');
subscriber.next(42);
subscriber.next(100);
subscriber.next(200);
setTimeout(() => {
subscriber.next(300);
}, 1000);
});
console.log('Before');
foo.subscribe(x => {
console.log(x);
});
console.log('After');
// Before
// Hello
// 42
// 100
// 200
// After
// 300
结论:
func.call()表示同步地返回一个值observable.subscribe()表示同步或异步地返回 0 或多个值
# Anatomy of an Observable
Observable 使用 new Observable 或一个创建操作符来 created,会被 Observer subscribed,execute 来向 Observer 传递 next / error / complete 通知,并且他们的执行可能会被 disposed。这四个方面都编码字在 Observable 实例中,当其中一些与其他类型相关,如 Observer 和 Subscription。
Observable 核心关注点:
- Creating Observables
- Subscribing to Observables
- Executing the Observable
- Disposing Observables
# Creating Observables
Observable 构造函数接受一个参数:subscribe 函数
import {Observable} from 'rxjs';
const observable = new Observable(function subscribe(subscriber) {
const id = setInterval(() => {
subscriber.next('hi');
}, 1000);
});
Observable 可以使用
new Observable来创建。通常,Observable使用创建函数如of、from、interval等来创建。
# Subscribing to Observables
代码语言:javascript复制observable.subscribe(x => {
console.log(x);
});
这不是个巧合,observable.subscribe 和 new Observable(function subscribe(subscriber) {}) 的 subscribe 有相同的名字。在库中,它们是不一样的,不过在实践中可以认为它们在概念上是一样的。
这表示订阅调用不会在同一个 Observable 的多个 Observer 之间共享。当使用 Observer 调用 observable.subscribe 时,new Observable(function subscribe(subscriber) {}) 中的 subscribe 函数为给定的 subscriber 运行。对 observable.subscribe 的每次调用都会为给定的 subscriber 触发其对应的设置。
对于
Observable的订阅就像调用一个函数,提供了可以传递数据的回调。
这和 addEventListener / removeEventListener 等事件处理程序 API 完全不同。使用 observable.subscribe,给定的 Observer 不会在 Observable 中注册为监听器。Observable 甚至不维护一个 Observer 列表。
订阅调用只是一种启动 Observable 执行并将值或时间传递给该执行的 Observer 的方法。
# Executing Observables
new Observable(function subscribe(subscriber) {}) 里面的代码表示 Observable 的执行,只发生在每个订阅的 Observer 上的惰性计算。执行会随着时间的推移,同步或异步地产生多个值。
Observable 执行可以传递的值类型:
Next通知:发送一个值,如Number、String、Object等Error通知:发送一个错误,如ErrorComplete通知:不发送值
Next 通知时最重要也是最常见的类型:它表示发送给订阅者的实际数据。Error 和 Complete 通知在 Observable 执行过程中只可能执行一次,并且只能有一个发生。
import {Observable} from 'rxjs';
const observable = new Observable(function subscribe(subscriber) {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
subscriber.complete();
});
Observable 严格遵守协议,在 Complete 通知之后的 Next 通知将不会被发送:
import {Observable} from 'rxjs';
const observable = new Observable(function subscribe(subscriber) {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
subscriber.complete();
subscriber.next(4); // Is not delivered to subscribers
});
可以在 subscribe 代码外包一层 try/catch 块,以捕获错误:
import {Observable} from 'rxjs';
const observable = new Observable(function subscribe(subscriber) {
try {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
subscriber.complete();
} catch (err) {
subscriber.error(err);
}
});
# Disposing Observables Executions
因为 Observable 执行可能是无限的,但是对于 Observer 来说在有限时间内结束执行时常见的需求,因此需要有取消执行的 API。因为每次执行只针对一个 Observer,一旦 Observer 接收到数据,它需要有方法去停止执行,不然会造成计算资源和内存的浪费。
当 observable.subscribe 被调用时,Observer 被附加到新创建的 Observable 执行中,该调用还会返回 Subscription 对象。
const subscription = observable.subscribe(x => console.log(x));
Subscription (opens new window) 表示正在进行的执行,它具有允许你取消该执行的最小 API。
import { from } from 'rxjs';
const observable = from([1, 2, 3]);
const subscription = observable.subscribe(x => console.log(x));
// Later
subscription.unsubscribe();当我们使用 create() 创建 Observable 时,每个 Observable 都必须定义如何处理该执行的资源,如可以在函数 subscribe() 中返回自定义取消订阅函数来实现。
const observable = new Observable(function subscribe (subscriber) {
const intervalId = setInterval(() => {
subscriber.next(Math.random());
}, 1000);
return function unsubscribe () {
clearInterval(intervalId);
};
});就像 observable.subscribe 类似于 new Observable(function subscribe (subscriber) {}), 我们从 subscribe 返回的 unsubscribe 在概念上等同于 subscription.unsubscribe。如果移除围绕在这些概念周围的 ReactiveX 类型,留下的就是原生的 JavaScript。
function subscribe (subscriber) {
const intervalId = setInterval(() => {
subscriber.next(Math.random());
}, 1000);
return function unsubscribe () {
clearInterval(intervalId);
};
}
const unsubscribe = subscribe({
next: x => console.log(x),
error: err => console.error(err),
complete: () => console.log('completed')
});
// Later
unsubscribe();之所以使用像 Observable、Observer 和 Subscription 的 Rx 类型,是为了安全考虑和 Operator 的可组合性。
# Observer
什么是 Observer? Observer 作为消费者消费 Observable 派发的值。Observer 只是一组回调,用于 Observable 派发的每种类型的通知:next, error 和 complete。
const observer = {
next: value => console.log(`Observer got a next value: ${value}`),
error: error => console.error(`Observer got an error: ${error}`),
complete: () => console.log('Observer got a complete notification')
};
// 通过将 observer 对象传递给 `subscribe`,来订阅 observable
observable.subscribe(observer);
Observer只是有三个回调的对象,用于Observable可能派发每种类型的通知。
RxJS 中的 Observer 也可能是部分的。如果没有提供某种回调,Observable 也会正常执行,只不过一些类型的通知会被忽略,因为他们在 Observer 中找不到对应的回调。
const observer = {
next: value => console.log(`Observer got a next value: ${value}`),
error: error => console.error(`Observer got an error: ${error}`)
};
在订阅 Observable 时,也可以不用将回调放在一个 Observer 对象中,只传一个 next 回调函数作为参数就可以。
observable.subscribe(value => console.log(`Observer got a next value: ${value}`));
在 observable.subscribe 内部,将使用参数中的回调函数作为下一个处理程序创建一个 Observer 对象。


