# RxJS Note

# 什么是RxJS?

RxJS全称 Reactive Extensions for JavaScript, RxJS是一个库,它通过使用 observable 序列来编写异步和基于事件的程序。

RxJS是结合了函数式编程、观察者模式、迭代器模式的产物

  1. 函数式编程, 可以参考阮一峰[函数式编程初探](函数式编程初探 - 阮一峰的网络日志)
  2. 观察者模式, 典型的有DOM EventListener
  3. 迭代器模式, 典型的有ES6 Iterater

# RxJS内容

# 2-1基本概念
  • Observable(被观察者)

  • Observer(观察者)

  • Operators(操作符)

  • Subscribtion(订阅)

  • Subject(主题)

  • Scheduler(调度器)

    另: 弹珠图

# 2-2:概念解析

Observable核心关注点:

  • 创建Observable : Observable.create等创建类操作符
  • 订阅Observable: observable.subscribe(observer)
  • 执行Observable: observer.next()、observer.error()、observer.complete()
  • 取消Observable: subscribtion.unsubscribe()

创建Observable的方法示例:

//公式表示Observable
Observable = Publisher + Iterator
// 01
var observable = Rx.Observable.create(function subscribe(observer) {
  var id = setInterval(() => {
    observer.next("hi")
  }, 1000);
});
// 02
var observable = Rx.Observable.of(...)
// 03 
var observable = Rx.Observable.from(...)
// 04
var observable = Rx.Observable.interval(1000)
// etc....

Observable注意事项:

  1. 延迟计算: 不订阅就不会执行

一个observable对象不订阅的话就不会执行

  1. 渐进式取值: 示例代码
var source = Rx.Observable.from([1,2,3]);
var example = souce
            .filter(x => x % 2 === 0)
            .map(x => x + 1)

example.subscribe(console.log);
// 执行过程
// step1: 1 ==> filter ==> map
// step2: 2 ==> filter ==> map
// step3: 3 ==> filter ==> map
// 一个元素会执行到底

Observer:

Observer即Observable产生数据后的消费者就是Observer,Observer是一组函数调用的集合,每个回调函数对应一种 Observable 发送的通知类型:nexterror 和 complete

典型的observer对象:

const observer = {
  next: x => console.log("Observer got a next value: " + x),
  error: err => console.error("Observer got an error: " + err),
  complete: () => console.log("Observer got a complete notification"),
};
//另外一种形式
observable.subscribe(console.log)
// 👆的代码当subscribe方法不传入, error, complete 的时候, 会把第一个参数默认为next方法

使用观察者的时候必须把observer提供给Observable的subscrib方法

Observable又分为两种:

  • Cold Observable =>类似电视台播放节目, cold observable可以理解成每次subscribe后都产生一个“生产者”, 然后这个生产者产生的数据通过next函数传递给订阅的Observer.

    const cold$ = new Observable(observer = > {
        const producer = new Producer()
        // observer 接受producer产生的数据并处理
    })
    
  • Hot Observable => 类似视频点播网站, youku,bilibili etc... hot observable概念上是有一个独立于Observable对象的“生产者”,这个“生产者”的创建和subscribe调用没有关系,subscribe调用只是让Observer连接上“生产者”而已

    const producer = new Producer()
    const hot$ = new Observable(observer => {
        // obsever 接受并处理producer
    })
    

# Subscription

Subscription表示的是一个可清理的资源对象, 即Observable订阅执行后的结果, Subscription基本上只有一个unsubscribe方法

var subscription = observable.subscribe(observer)

//基本上只有一个unsubscribe方法
var observable1 = Rx.Observable.interval(400);
var observable2 = Rx.Observable.interval(300);

var subscription = observable1.subscribe(x => console.log("first: " + x));
var childSubscription = observable2.subscribe(x => console.log("second: " + x));

subscription.add(childSubscription);

setTimeout(() => {
  // subscription 和 childSubscription 都会取消订阅
  subscription.unsubscribe();
}, 1000);

# Operators

大概有100多个操作符,可分为一下几大类

  1. 创建类: create,of,range,from,fromEvent,fromEventPattern,ajax,repeat....
  2. 合并类: concat,merge,zip,race,startWith,forkJoin,switch...
  3. 辅助类: count,max,Reduce,every,find,findIndex,isEmpty,defaultIfEmpty
  4. 过滤类: filter,first,take,last,takeUtil,skip...
  5. 转化类: map,mapTo,scan,concatMap,mergeMap,switchMap,exhaustMap...

所有的操作符处理过的Observable都是一个全新的Observable对象,实现了数据的纯净性(Purity),即数据是不可变的Immutable

# Subject

什么是 Subject?

RxJS Subject 是一种特殊类型的 Observable,它允许将值多播给多个观察者,所以 Subject 是多播的,而普通的 Observables 是单播的(每个已订阅的观察者都拥有 Observable 的独立执行)

Subject既可以当Observable又可以当Observer

下面代码的subject可以看作是一个observable

var subject = new Rx.Subject();

subject.subscribe({
  next: (v) => console.log("observerA: " + v)
});
subject.subscribe({
  next: (v) => console.log("observerB: " + v)
});

subject.next(1);
subject.next(2);
// output:👇
observerA: 1
observerB: 1
observerA: 2
observerB: 2

subject也可看作observable

var subject = new Rx.Subject();

subject.subscribe({
  next: (v) => console.log("observerA: " + v)
});
subject.subscribe({
  next: (v) => console.log("observerB: " + v)
});

var observable = Rx.Observable.from([1, 2, 3]);

observable.subscribe(subject); // 你可以提供一个 Subject 进行订阅

//output: 👇
observerA: 1
observerB: 1
observerA: 2
observerB: 2
observerA: 3
observerB: 3

Subjects 是将任意 Observable 执行共享给多个观察者的唯一方式,也就是多播,多播的定义就是一个Observable可以被多个observer订阅

# Scheduler

Scheduler(时间调度器)

什么是调度器? - 调度器控制着何时启动 subscription 和何时发送通知。它由三部分组成:

  • 调度器是一种数据结构。 它知道如何根据优先级或其他标准来存储任务和将任务进行排序。
  • 调度器是执行上下文。 它表示在何时何地执行任务(举例来说,立即的,或另一种回调函数机制(比如 setTimeout 或 process.nextTick),或动画帧)。
  • 调度器有一个(虚拟的)时钟。 调度器功能通过它的 getter 方法 now() 提供了“时间”的概念。在具体调度器上安排的任务将严格遵循该时钟所表示的时间。

在下面的示例中,我们采用普通的 Observable ,它同步地发出值123,并使用操作符 observeOn 来指定 async 调度器发送这些值。

var observable = Rx.Observable.create(function (observer) {
  observer.next(1);
  observer.next(2);
  observer.next(3);
  observer.complete();
})
.observeOn(Rx.Scheduler.async);

console.log("just before subscribe");
observable.subscribe({
  next: x => console.log("got value " + x),
  error: err => console.error("something wrong occurred: " + err),
  complete: () => console.log("done"),
});
console.log("just after subscribe");

//output:
just before subscribe
just after subscribe
got value 1
got value 2
got value 3
done

调度器类型

  • null: 不传递任何调度器的话,会以同步递归的方式发送通知。用于定时操作或尾递归操作。
  • quene: 当前事件帧中的队列调度(蹦床调度器)。用于迭代操作。
  • asap: 微任务的队列调度,它使用可用的最快速的传输机制,比如 Node.js 的 process.nextTick() 或 Web Worker 的 MessageChannel 或 setTimeout 或其他。用于异步转换
  • async: 使用 setInterval 的调度。用于基于时间的操作符。
  • animationFrame: 用于动画场景

Scheduler处理Observable数据有两个操作符: subscribeOn、observeOn

先说subscribeOn,它的作用:

  • 改变源数据(source observables)的执行时机
  • 只能使用一次

observeOn,它的作用:

  • 改变Notifications的执行时机,即Observer中next, error, complete函数
  • 能够用于每个操作符前面, 即可以多次使用

Scheduler参考资料

弹珠图:

Image for danzhu

# 参考(学习)资料

# 框架相关

  • Angular: 内置RxJS,其中典型的httpClient

# 哪些项目使用了RxJS

Last Updated: 2020年11月14日星期六晚上9点51分