Rxjs简介

RxJS是ReactiveX编程理念的JavaScript版本。ReactiveX来自微软,它是一种针对异步数据流的编程。简单来说,它将一切数据,包括HTTP请求,DOM事件或者普通数据等包装成流的形式,然后用强大丰富的操作符对流进行处理,使你能以同步编程的方式处理异步数据,并组合不同的操作符来轻松优雅的实现你所需要的功能。

Reactivex简介

ReactiveX 是一个基于一系列可观察的异步和基础事件编程组成的一个库。

它继承观察者模式,支持序列数据或者事件。更高级的用法允许你将如下的一些抽象概念操作一起联合使用,比如低线程,同步,线程安全,数据并发,非阻塞I/O流。

它通常被称为“函数响应式编程”,这是用词不当的。ReactiveX 可以是函数式的,可以是响应式的,但是和“函数响应式编程”是不同的概览。一个主要的不同点是“函数响应式编程”是对随着时间不停变化的值进行操作的,而ReactiveX是对超时提交产生的离散值上。

Why Rxjs

我们可能会问自己,为什么使用RxJS?不是有Promise吗?没错,Promise对于只期望单值返回的异步请求(比如:XMLHttpRequest)是一个好的解决方案。但Reactive Extensions for JavaScript对Promises, callbacks , Web Workers, Web Sockets进行了统一优化. 一旦我们统一优化这一些概念后,我们将能更好的进行开发。

为了更好的理解,请看下面的这个例子:
搜索是前端开发中很常见的功能,一般是监听<input />的keyup事件,然后将内容发送到后台,并展示后台返回的数据。

<input id="text"></input>
<script>
    var text = document.querySelector('#text');
    text.addEventListener('keyup', (e) =>{
        var searchText = e.target.value;
        // 发送输入内容到后台
        $.ajax({
            url: `/search/${searchText}`,
            success: data => {
              // 拿到后台返回数据,并展示搜索结果
              render(data);
            }
        });
    });
</script>

上面代码实现我们要的功能,但存在两个较大的问题:

  • 多余的请求

当想搜索“爱迪生”时,输入框可能会存在三种情况,“爱”、“爱迪”、“爱迪生”。而这三种情况将会发起 3 次请求,存在 2 次多余的请求。

  • 已无用的请求仍然执行

一开始搜了“爱迪生”,然后马上改搜索“达尔文”。结果后台返回了“爱迪生”的搜索结果,执行渲染逻辑后结果框展示了“爱迪生”的结果,而不是当前正在搜索的“达尔文”,这是不正确的。

减少多余请求数,可以用 setTimeout 函数节流的方式来处理,核心代码如下:

<input id="text"></input>
<script>
    var text = document.querySelector('#text'),
        timer = null;
    text.addEventListener('keyup', (e) =>{
        // 在 250 毫秒内进行其他输入,则清除上一个定时器
        clearTimeout(timer);
        // 定时器,在 250 毫秒后触发
        timer = setTimeout(() => {
            console.log('发起请求..');
        },250)
    })
</script>

已无用的请求仍然执行 的解决方式,可以在发起请求前声明一个当前搜索的状态变量,后台将搜索的内容及结果一起返回,前端判断返回数据与当前搜索是否一致,一致才走到渲染逻辑。最终代码为:

<input id="text"></input>
<script>
    var text = document.querySelector('#text'),
        timer = null,
        currentSearch = '';

    text.addEventListener('keyup', (e) =>{
        clearTimeout(timer)
        timer = setTimeout(() => {
            // 声明一个当前所搜的状态变量
            currentSearch = '书'; 

            var searchText = e.target.value;
            $.ajax({
                url: `/search/${searchText}`,
                success: data => {
                    // 判断后台返回的标志与我们存的当前搜索变量是否一致
                    if (data.search === currentSearch) {
                        // 渲染展示
                        render(data);
                    } else {
                        // ..
                    }
                }           
            });
        },250)
    })
</script>

上面代码基本满足需求,但代码开始显得乱糟糟。我们来使用RxJS实现上面代码功能,如下:

var text = document.querySelector('#text');
var inputStream = Rx.Observable.fromEvent(text, 'keyup')
                    .debounceTime(250)
                    .pluck('target', 'value')
                    .switchMap(url => Http.get(url))
                    .subscribe(data => render(data));

RxJS能简化你的代码,它将与流有关的内部状态封装在流中,而不需要在流外定义各种变量来以一种上帝视角控制流程。Rx的编程方式使你的业务逻辑流程清晰,易维护,并显著减少出bug的概率。

安装方法

  • 通过 npm 安装 ES6 版本

npm install rxjs

导入整个核心功能集:

import Rx from 'rxjs/Rx';
Rx.Observable.of(1,2,3)

通过打补丁的方式只导入所需要的(这对于减少 bundling 的体积是十分有用的):

import { Observable} from 'rxjs/Observable';
import 'rxjs/add/observable/of';
import 'rxjs/add/operator/map';
Observable.of(1,2,3).map(x => x + '!!!'); // 等等

只导入需要的并且使用被提议的绑定操作符:
注意:这个额外的预发需要编译器支持并且此语法可能会在没有任何通知的情况下完全从 TC39 撤回!要使用的话需要你自己来承担风险。

import { Observable } from 'rxjs/Observable';
import { of } from 'rxjs/observable/of';
import { map } from 'rxjs/operator/map';
Observable::of(1,2,3)::map(x => x + '!!!'); // 等等
  • 通过 npm 安装 CommonJS 版本

npm install rxjs

导入所有核心功能:

var Rx = require('rxjs/Rx');
Rx.Observable.of(1,2,3); // 等等

通过打补丁的方式只导入所需要的(这对于减少 bundling 的体积是十分有用的):

var Observable = require('rxjs/Observable').Observable;
// 使用适合的方法在 Observable 上打补丁
require('rxjs/add/observable/of');
require('rxjs/add/operator/map');
Observable.of(1,2,3).map(function (x) { return x + '!!!'; }); // 等等

导入操作符并手动地使用它们(这对于减少 bundling 的体积也十分有用):

var of = require('rxjs/observable/of').of;
var map = require('rxjs/operator/map').map;
map.call(of(1,2,3), function (x) { return x + '!!!'; });

还可以使用上面的方法来构建你自己的 Observable 并将其从你自己的模块中导出。

  • 使用 TypeScript 的 CommonJS 模式

当使用 RxJS 时收到了像 error TS2304: Cannot find name 'Promise' 或 error TS2304: Cannot find name 'Iterable' 这样的报错信息,那么你可能需要安装额外的 typings 。
对于使用 typings 的用户:

typings install es6-shim --ambient

如果没有使用 typings 的话,可以从 /es6-shim/es6-shim.d.ts 拷贝定义好的接口。
tsconfig.json 或 CLI 参数中添加类型定义文件。

  • 通过 npm 所有全模块类型 (CJS/ES6/AMD/TypeScript)

要安装这个库需要 npm 3及以上版本,使用下面的命令行:

npm install @reactivex/rxjs

如果你使用的还是 npm 2的话,那么在这个库升级至稳定版之前,需要明确地指定库的版本号:

npm install @reactivex/rxjs@5.0.0-beta.1
  • CDN

对于 CDN,可以使用 BootCDN。只需要用当前的版本号来替换下面链接中的 version:

https://cdn.bootcss.com/rxjs/6.0.0-beta.3/rxjs.umd.min.js

核心概念

  • Stream

流是在时间流逝的过程中产生的一系列事件。它具有时间与事件响应的概念。
我们可以把一切输入都当做数据流来处理,比如说:
- 用户操作
- 网络响应
- 定时器
- Worker

  • Observable

可观察序列,简单来说数据就在Observable中流动,你可以使用各种operator对流进行处理

  • Observer

observer(观察者),表示的是对序列结果的处理方式

  • Operator

操作符(纯函数)

  • Subject

Subject是一种能够发射数据给多个observer的Observable, 这让Subject看起来就好像是EventEmitter。

  • Scheduers

调度器, 控制并发行为

  • Subscription

订阅器,Observable动作执行者

模式

RxJS是基于观察者模式,迭代器模式和函数式编程:

  • 观察者模式

window.addEventListener('click', function(){
  console.log('click!');
})

JS的事件监听就是天生的观察者模式。给window的click事件(被观察者)绑定了一个listener(观察者),当事件发生,回调函数就会被触发

  • 迭代器模式

迭代器模式,提供一种方法顺序访问一个聚合对象中的各种元素,而又不暴露该对象的内部表示。
ES6里的Iterator即可实现:

let arr = ['a', 'b', 'c'];
let iter = arr[Symbol.iterator]();
iter.next() // { value: 'a', done: false }
iter.next() // { value: 'b', done: false }
iter.next() // { value: 'c', done: false }
iter.next() // { value: undefined, done: true }
  • 函数式编程

提到函数式编程,就要提到声明式编程和命令式编程
函数式编程是声明式编程的体现
问题:将数组[1, 2, 3]的每个元素乘以2,然后计算总和。
- 命令式编程

const arr = [1, 2, 3];
let total = 0;
for(let i = 0; i < arr.length; i++) {
  total += arr[i] * 2;
}
- 声明式编程
const arr = [1, 2, 3];
let total = arr.map(x => x * 2).reduce((total, value) => total + value)

声明式的特点是专注于描述结果本身,不关注到底怎么到达结果。而命令式就是真正实现结果的步骤

声明式编程把原始数据经过一系列转换(map, reduce),最后得到想要的数据

现在前端流行的MVC框架(Vue,React,Angular),也都是提倡:编写UI结构时使用声明式编程,在编写业务逻辑时使用命令式编程

开始使用

  • 创建可观察序列

创建一个序列有很多种方式,我们仅列举常用的几种:

Observable.of(...args)

将普通JavaScript数据转为可观察序列。

Observable.fromPromise(promise)

将Promise转化为Observable。

Observable.fromEvent(elment, eventName)

从DOM事件创建序列,例如Observable.fromEvent($input, 'click')。

Observable.ajax(url | AjaxRequest)

发送http请求,AjaxRequest

Observable.create(subscribe)

这个属于万能的创建方法,很少会用到,在你用这个方法之前先想想能不能用RxJS上的类方法来创建你所需要的序列。

  • 合并序列

合并序列也属于创建序列的一种,例如有这样的需求:进入某个页面后拿到了一个列表,然后需要对列表每一项发出一个http请求来获取对应的详细信息,这里我们把每个http请求作为一个序列,然后我们希望合并它们。
合并有很多种方式,例如N个请求按顺序串行发出(前一个结束再发下一个);N个请求同时发出并且要求全部到达后合并为数组,触发一次回调;N个请求同时发出,对于每一个到达就触发一次回调。
如果不用RxJS,我们会比较难处理这么多情形,不仅实现麻烦,维护更麻烦,下面是使用RxJS对上述需求的解决方案:

const ob1 = Observable.ajax('api/detail/1');
const ob2 = Observable.ajax('api/detail/2');
...
const obs = [ob1, ob2...];
// 分别创建对应的HTTP请求。
  • N个请求按顺序串行发出(前一个结束再发下一个)
Observable.concat(...obs).subscribe(detail => console.log('每个请求都触发回调'));
  • N个请求同时并行发出,对于每一个到达就触发一次回调
Observable.merge(...obs).subscribe(detail => console.log('每个请求都触发回调'));
  • N个请求同时发出并且要求全部到达后合并为数组,触发一次回调
Observable.forkJoin(...obs).subscribe(detailArray => console.log('触发一次回调'));
  • 创建多路推送序列

Subject是一种可以多路推送的可观察对象。与EventEmitter类似,Subject维护着自己的Observer。

每一个Subject都是一个Observable(可观察对象) 对于一个Subject,你可以订阅(subscribe)它,Observer会和往常一样接收到数据。从Observer的视角看,它并不能区分自己的执行环境是普通Observable的单路推送还是基于Subject的多路推送。

Subject的内部实现中,并不会在被订阅(subscribe)后创建新的执行环境。它仅仅会把新的Observer注册在由它本身维护的Observer列表中,这和其他语言、库中的addListener机制类似。

每一个Subject也可以作为Observer(观察者) Subject同样也是一个由next(v),error(e),和 complete()这些方法组成的对象。调用next(theValue)方法后,Subject会向所有已经在其上注册的Observer多路推送theValue。

下面的例子中,我们在Subject上注册了两个Observer,并且多路推送了一些数值:

var subject = new Rx.Subject();
subject.subscribe({
  next: (v) => console.log('observerA: ' + v)
});
subject.subscribe({
  next: (v) => console.log('observerB: ' + v)
});
subject.next(1);
subject.next(2);

控制台输出结果如下:

observerA: 1
observerB: 1
observerA: 2
observerB: 2
  • 扁平化流

有时候,我们的Observable送出的是一个新的Observable:

var click = Rx.Observable.fromEvent(document.body, 'click');
var source = click.map(e => Rx.Observable.of(1, 2, 3));
source.subscribe(value => {
  console.log(value)
});

这里,console打印出来的是对象,而不是我们想要的1,2,3,这是因为map返回的Rx.Observable.of(1, 2, 3)本身也是个Observable

用图表示如下:

click  : ------c------------c--------
      map(e => Rx.Observable.of(1,2,3))
source : ------o------------o--------
              \            \
               (123)|       (123)|

因此,我们订阅到的value值就是一个Observable对象,而不是普通数据1,2,3

我想要的其实不是Observable本身,而是属于这个Observable里面的那些东西,现在这个情形就是Observable里面又有Observable,有两层,可是我想要让它变成一层就好,该怎么办呢?

这就需要把Observable扁平化

const arr = [1, [2, 3], 4];
// 扁平化后:
const flatArr = [1, 2, 3, 4];

concatAll这个操作符就可以把Observable扁平化

var click = Rx.Observable.fromEvent(document.body, 'click');
var source = click.map(e => Rx.Observable.of(1, 2, 3));
var example = source.concatAll();
example.subscribe(value => {
  console.log(value)
})
click  : ------c------------c--------
      map(e => Rx.Observable.of(1,2,3))
source : ------o------------o--------
              \            \
               (123)|       (123)|
                 concatAll()
example: ------(123)--------(123)------------

flatMap操作符也可以实现同样的作用,就是写法有些不同:

var click = Rx.Observable.fromEvent(document.body, 'click');
var source = click.flatMap(e => Rx.Observable.of(1, 2, 3));
source.subscribe(value => {
  console.log(value)
})
click  : ------c------------c--------
      flatMap(e => Rx.Observable.of(1,2,3))
source: ------(123)--------(123)------------

常用操作符

  • repeat

重复 count 次,源 Observable 发出的值。

  • pip

  • map

对 Observable 对象发出的每个值,使用指定的 project 函数,进行映射处理。

  • mapTo

对 Observable 对象发出的每个值,映射成固定的值。

  • pluck

提取属性值并输出

  • do

不做数据格式化,可用于调试

  • filter

对 Observable 对象发出的每个值,作为参数调用指定的 predicate 函数,若该函数的返回值为 true,则表示保留该项,若返回值为 false,则舍弃该值。

  • take

于获取 Observable 对象发出的前 n 项值,取完后就结束。

  • takeWhile

满足什么条件时开始取数据

  • skip

表示跳过多少条数据后开始取

  • distinctUntilChanged

过滤源 Observable 发出的值,若当前发出的值与前一次值不一致,则发出该值。

  • scan

功能有点类似于Array#reduce这个方法,可用于累加数据同时可以使用startWith的数据用途scan的初始值,最后返回累加的数据

  • delay

表示Observable延时多久开始处理订阅数据

  • toArray

把输出值格式化成数据形式

  • toMap

给当前的输出取个名字或标签

  • expand

实现递归

  • forkJoin

类似于Promise.all,只有数据全部返回且状态为complete时,表示成功处理了请求,否则失败

  • let

这个操作符可以获取完整的输入Observable对象,做相应的处理后返回新的Observable对象

  • catch

用于Observable处理数据异常的处理

  • combineLatest

用于组且各个输入的Observable,并获取和返回各个Observable最新的数据

  • buffer

缓冲源 Observable 对象已发出的值,直到 closingNotifier 触发后,才统一输出缓存的元素。

  • bufferTime

设定源 Observable 对象已发出的值的缓冲时间。

  • bufferCount

缓冲源 Observable对象已发出的值,直到大小达到给定的最大 bufferSize 。

  • concatMap

对每个 Observable 对象发出的值,进行映射处理,并进行合并。该操作符也会先处理前一个 Observable 对象,在处理下一个 Observable 对象。

  • switchMap

对源 Observable 对象发出的值,做映射处理。若有新的 Observable 对象出现,会在新的 Observable 对象发出新值后,退订前一个未处理完的 Observable 对象。

  • first

用于获取 Observable 对象发出的第一个元素,取完后就结束。

  • takeUntil

当 takeUntil 传入的 notifier 发出值时,源 Observable 对象就会直接进入完成状态。

  • takeLast

获取源 Observable 对象发出的,后面 count 项的值。

  • last

获取源 Observable 对象发出的最后一项的值。

  • debounceTime

在设定的时间跨度内,若源 Observable 对象没有再发出新值,则返回最近一次发出的值。

  • throttleTime

从源 Observable 对象发出第一个值开始,忽略等待时间内发出的值,等待时间过后再发出新值。与 debounceTime 不同的是,throttleTime 一开始就会发出值,在等待时间内不会发出任何值,等待时间过后又会发出新的值。

  • distinct

过滤源 Observable 发出的值,确保不会发出重复出现的值。

  • concat

把多个 Observable 对象合并为一个 Observable 对象,Observable 对象会依次执行,即需等前一个 Observable 对象完成后,才会继续订阅下一个。

  • concatAll

合并多个 Observable 对象,并在上一个 Observable 对象完成后订阅下一个 Observable 对象。

  • startWith

在开始发出源 Observable 数据之前发出已设置的参数值,并返回新的 Observable 对象。

  • merge

合并 Observable 对象,并按给定的时序发出对应值。

  • mergeAll

将高阶 Observable 对象转换为一阶Observable 对象,并同时处理所有的 Observable 对象。

  • combineLatest

用于合并输入的 Observable 对象,当源 Observable 对象和 other Observable 对象都发出值后,才会调用 project 函数。

  • zip

根据每个输入 Observable 对象的输出顺序,产生一个新的 Observable 对象。

  • withLatestFrom

当源 Observable 发出新值的时候,根据 project 函数,合并 other Observable 对象此前发出的最新值。

  • switch

切换为最新的 Observable 数据源,并退订前一个 Observable 数据源。

  • delayWhen

delayWhen 的作用跟 delay 操作符类似,最大的区别是 delayWhen 会影响每个元素,而且调用的时候需要设置 delayDurationSelector 函数,该函数的返回值是 Observable 对象。

  • multicast

用于挂载 Subject 对象,并返回一个可链接 (connectable) 的 Observable 对象。

  • publish

用于挂载 Subject 对象,并返回一个可链接 (connectable) 的 Observable 对象。即 publish 操作符与 multicast(new Rx.Subject()) 是等价的。

  • share

share 操作符是 publish + refCount 的简写。

  • retry

发生错误后,重试 count 次数

  • retryWhen

捕获异常 Observable 对象,进行异常处理后,可重新订阅源 Observable 对象。

简单拖拽实例

<style type="text/css">
html, body {
  height: 100%;
  background-color: tomato;
  position: relative;
}
#drag {
  position: absolute;
  display: inline-block;
  width: 100px;
  height: 100px;
  background-color: #fff;
  cursor: all-scroll;
}
</style>
<div id="drag"></div>
const mouseDown = Rx.Observable.fromEvent(dragDOM, 'mousedown');
const mouseUp = Rx.Observable.fromEvent(body, 'mouseup');
const mouseMove = Rx.Observable.fromEvent(body, 'mousemove');

首先给出3个Observable,分别代表3种事件,我们希望mousedown的时候监听mousemove,然后mouseup时停止监听,于是RxJS可以这么写:

const source = mouseDown
.map(event => mouseMove.takeUntil(mouseUp))

takeUntil操作符可以在某个条件符合时,发送complete事件

source: -------e--------------e-----
                \              \
                  --m-m-m-m|     -m--m-m--m-m|

从图上可以看出,我们还需要把source扁平化,才能获取所需数据。

完整代码:

const dragDOM = document.getElementById('drag');
const body = document.body;
const mouseDown = Rx.Observable.fromEvent(dragDOM, 'mousedown');
const mouseUp = Rx.Observable.fromEvent(body, 'mouseup');
const mouseMove = Rx.Observable.fromEvent(body, 'mousemove');
mouseDown
    .flatMap(event => mouseMove.takeUntil(mouseUp))
    .map(event => ({ x: event.clientX, y: event.clientY }))
    .subscribe(pos => {
        dragDOM.style.left = pos.x + 'px';
        dragDOM.style.top = pos.y + 'px';
    })

参考资料