[ING] 攀岩观火之RxJava

从源码角度来分析RxJava的倒U型结构

Posted by 阿呆 on 2018-12-28

前言

对于学习RxJava的一些笔记,加上一些自己的理解,以及一些目前仍存在的疑问

主要参考以下部分(按比例排序):
1.扔物线 /给Android开发者的RxJava详解 https://gank.io/post/560e15be2dca930e00da1083#toc_1
2.CSDN大头鬼译文/深入浅出RxJava系列:https://blog.csdn.net/lzyzsd/article/details/41833541
3.CSDN下一个五年/理解RxJava的lift :https://blog.csdn.net/a910626/article/details/79316121
4.简书野生安卓兽/lift原理解析:https://www.jianshu.com/p/b15d9b3e194e
以及附上Rx官方的文档的中文版:https://mcxiaoke.gitbooks.io/rxdocs/content/topics/Getting-Started.html

定义

RxJava在Github的介绍:

RxJava : a library for composing asynchronous and event-based programs using observable sequences for the Java VM
RxJava是一个基于事件流、实现异步操作的库

作用

实现异步操作,类似于 Android 中的 AsyncTask、Handler的作用

特点

简洁优雅,程序越复杂,这点体现的越明显。它会把一系列复杂的或者嵌套的调用转换成一个链式调用

原理

扩展的观察者模式

RxJava的异步实现,是通过一种扩展的观察者模式来实现的,那么,我们先来了解一下经典的观察者模式

观察者模式

观察者模式面向的需求是:A对象(观察者)对B对象(被观察者)的某种变化高度敏感,需要在B变化的一瞬间作出反应。
观察者模式
把这张图片中的概念抽象出来(Button->被观察者 、 OnClickListener->观察者 、 setOnClickListener()->订阅 、 onClick()->事件 ),这样就由专用的观察者模式(只用于监听控件点击)转变成了通用的观察者模式
观察者模式

而RxJava作为一个工具库,使用的就是通用形式的观察者模式(TODO1:不是说扩展的观察者模式么)

RxJava的观察者模式

四个基本概念

  • Observable
  • Observer
  • subscribe
  • event

Observable和Observer通过subscribe()实现订阅关系,从而Observable可以在需要的时候发出事件来通知Observer

与传统的观察者模式不同,RxJava的事件回调方法除了普通事件 onNext()(相当于 onClick()/onEvent())之外,还定义了两个特殊的事件: onCompleted()和onError()

基本实现

基于以上的概念,RxJava的基本实现主要有三点:

创建Observer

Observer即观察者,它决定了事件触发的时候将有怎样的行为(而Observable则决定了什么时候调用以及调用顺序),RxJava中的Observer接口的实现方式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Observer<String> observer = new Observer<String>(){
@Override
public void onNext(String s){
Log.d(tag,"Item: "+s);
}

@Override
public void onCompleted(){
Log.d(tag,"Completed!");
}

@Overridepublic void onError(Throwable e){
Log.d(tag,"Error!");
}
}

除了 Observer 接口之外,RxJava还内置了一个实现了Observer的抽象类:Subscribe。Subcriber对Observer接口进行了一些扩展,但它们的基本使用方式是完全一样的,代码我就不贴了,也是实现这三个方法。

1
2
3
Subscriber<String> subscriber = new Subscriber<String>(){
````onNext/onCompleted/onError
}

不仅在使用方式上一样,实质上,在RxJava的subscribe过程中,Observer也总是会被转换成一个Subscriber再使用。所以,如果你只想使用基本功能,选择 Observer 和 Subscriber 是完全一样的。
它们的区别主要在于两点:

  • onStart():这是Subscriber增加的方法;它的调用时机是,在subscribe刚开始,而事件还未发送之前被调用,可以用于做一些准备工作,如数据的清零或重置。这是一个可选方法,它的默认实现为空。需要注意的是,如果对准备工作的线程有要求(例如弹出一个显示进度的对话框,这必须在主线程执行),onStart就不适用了,因为它总在subscribe所发生的线程(TODO2 注册或者订阅动作发生的线程,不一定是主线程)被调用,而不能指定线程。要在指定的线程来做准备工作,可以使用 doOnSubcribe()方法,具体可以在后面的文章中看到。
  • unsubscribe():这是 Subscriber所实现的另一个接口Subscription的方法
创建 Observable

Observable即被观察者,它决定什么时候触发事件以及触发怎样的事件(onNext还是onComplete/onError)。
RxJava使用create()方法来创建一个 Observable。并为他定义事件触发规则:

1
2
3
4
5
6
7
8
9
10
11
12
13
Observable observable = Observable.create(new Observable.OnSubscribe<String>{
@Override
public void call(Subscriber<? super String> subscriber){
// 这里是立即发送数据,因为 这几个字符串是我们呢现在就有的,也可以先请求数据,当数据请求到了,再发送数据
// 例如加上这么一句
// str = getStr(); //getStr是一个耗时操作,所以代码需要等待 str获取到了才发送,getStr 可以是网络请求也可以是IO请求
subscriber.onNext("Hello");
subscriber.onNext("Hi");
subscriber.onNext("Aloha");
subscriber.onCompleted();
}
});
// TODO3 对于这段代码我有一点疑问,call方法应该是在事件被触发的时候被Observabler调用,而学习资料上都说,当观察者Observable被订阅的时候,OnSubcribe的call会自动被调用

下面解释了为什么会立即发送这些事件,而不需要触发条件

这个例子很简单:事件的内容是字符串,而不是一些复杂的对象;事件的内容是已经定好了的,而不像有的观察者模式一样是待确定的(例如网络请求的结果在请求返回之前是未知的);所有事件在一瞬间被全部发送出去,而不是夹杂一些确定或不确定的时间间隔或者经过某种触发器来触发的。总之,这个例子看起来毫无实用价值。但这是为了便于说明,实质上只要你想,各种各样的事件发送规则你都可以自己来写。至于具体怎么做,后面都会讲到,但现在不行。只有把基础原理先说明白了,上层的运用才能更容易说清楚。

事件队列(即发送事件的规则)。create()方法是 RxJava最基本的创造事件序列的方法,基于这个方法,RxJava还提供了一些方法用来创建事件序列,例如:

1.just(T…) // 直接传入对象

1
2
3
Observable observable = Observable.just("Hello","Hi","Aloha")
equals:
onNext("Hello")->onNext("hi")->onNext("Aloha")->onCompleted()

2.from(T[]) / from(Iterable<?extends T>) 将传入的 Iterable ,传入数组或者链表拆分成具体对象后,依次发送出来,再拆分成一个个的对象

1
2
3
String[] words = {"Hello","Hi","Aloha"}
Observable observable = Observable.from(words)
同上一样的事件序列。这些事件序列也就是调用规则,由被观察者Observable决定
Subscribe(.vt 订阅)

创建了Observable和Observer之后,再用subscribe()方法将他们关联起来(让Observable持有Observer的引用)

1
2
observable.subscribe(observer) **or**
observable.subscribe(subscriber)

有人可能注意到,subscribe() 这个方法有点怪,它看起来像是顺序搞反了,成了[observable]订阅了[observer],读起来像是杂志订阅了读者,让人别扭。这种设计单纯是为了API的流式设计,牺牲了这点

我们来看一下 Observable.subscribe(Subscriber)的内部实现(仅核心代码)

1
2
3
4
5
public Subscription subscribe(Subscriber subscriber){
subscriber.onStart(); //先执行观察者的一些初始化方法
OnSubscribe.call(subscriber); //一个接口,内部定义了当subcribe动作执行的时候的一系列动作,也就是发送事件
return subscriber;
}

可以看到,subscribe()做了三件事:
1.调用 SUbscriber.onStart() // 一个可选的准备方法
2.调用Observable中的OnSubscribe.call(Subscriber)。在这里,事件的发送逻辑开始运行(onNext(事件))
3.将传入的SUbscriber作为Subscription返回,这是为了方便unsubscribe()
如图:

除了 subscribe(Observer)和subscribe(Subscriber),subscribe()还支持不完整定义的回调,RxJava会自动根据定义创建出Subscriber,形式如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
Action1<String> onNextAction = new Action1<String>(){
@Override
public void call(String s){
Log.d(tag,s);
}
}

Action1<String> onErrorAction = new Action1<Throwable>(){
@Override
public void call(Throwable throwable){
// Error handling
}
}

Action0 onCompletedAction = new Action0(){
@Override
public void call(){
//.....
}
}

可以看到 Action后面的数字代表所需要的参数个数,error和next都是1,而completed是0

小结:

要弄清楚什么是事件,什么是发送事件,事件就是上面屡屡用到的字符串,也就是 onNext方法中的参数,将事件传入onNext方法实际上就是发送事件,注意,是传入的这个动作就是发送事件,而onNext则是对事件的处理,不能把笼统地说onNext称之为发送事件,这是我个人的理解;这就好比说,onNext是发送事件,那什么是执行事件呢?所以懂了吧哈哈.

举个例子:

1.打印字符串数组

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
String[] names = {"Hello","yudan and ","zhuzi"};
Observable.from(names)
.subscribe(new Observer<String>{
....
public void onNext(String s){
log.d(TAg,"onNext: "+s);
}
....
});

Output:
onNext: Hello
onNext: yudan and
onNext: zhuzi
onCompleted

注意事件与事件流

2.由id取得图片并显示
由指定的一个 drawable 文件 id drawableRes 取得图片,并显示在 ImageView 中,并在出现异常的时候打印 Toast 报错

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
iv = findViewById(R.id.img_view);
Observable.create(new ObservableOnSubscribe<Drawable>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Drawable> emitter) throws Exception{
Drawable drawable = getResources().getDrawable(R.mipmap.ic_launcher);
Drawable drawable1 = getResources().getDrawable(R.mipmap.source_leifeng);
emitter.onNext(drawable);
emitter.onNext(drawable1);
emitter.onComplete();
}
}).subscribe(new Observer<Drawable>() {
@Override
public void onSubscribe(Disposable d) {

}

@Override
public void onNext(Drawable drawable) {
Log.d(TAG, "onNext: ");
iv.setImageDrawable(drawable);
}

@Override
public void onError(Throwable e) {
Toast.makeText(SetImgActivity.this, "onError: ",Toast.LENGTH_SHORT)
.show();
}

@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
});
//上面是基于RxJava2.0

在RxJava的默认规则里,事件的发出和消费都是在同一线程的,也就是说,如果只用上面的方法,实现出来的只是一个同步的观察者模式,观察者模式本身的目的就是后台处理,前台回调,因此异步对于RxJava是至关重要的,而要实现异步,则需要用到 RxJava 的另一个概念:Scheduler

线程控制–Scheduler/01 调度器

1.Schedule的API
在RxJava中,通过Schedule来制定每一段代码应该运行在什么样的线程

  • Schedulers.immediate():直接运行在当前线程,相当于不指定线程,这是默认的Scheduler
  • Schedulers.newThread():总是启用新线程,并在新线程执行操作
  • Schedulers.io():I/O操作所使用的Scheduler。行为模式和newThread()差不多,区别在于 io()的内部实现是一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下io()比newThread()更有效率,不要把计算工作放在io()中,可以避免创建不必要的线程
  • Schedulers.computation():计算所使用的Scheduler。这个计算指的是CPU密集型计算,即不会被I/O等操作限制性能的操作,例如图形的计算,它使用固定的线程池(大小为CPU核数)
  • Android还有一个专用的AndroidSchedulers.mainThread() //需要添加上 rxandroid依赖
    有了这几个Scheduler,就可以使用 subscribeOn()和observerOn()两个方法来对线程进行控制了
1
2
3
4
5
6
7
8
9
Observable.just(1,2,3,4)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<Integer>{
@Override
public void call(Integer number){
Log.d(TAG,"number:"+number);
}
})

Scheduler的远离我们呢会放在下一章来讲,它是以变换的原理为基础的

变换

RxJava提供了对事件序列进行变换的支持,这是它的核心功能之一,也是大多数人说"RxJava太好用了"的原因;所谓变换,就是将事件序列中的对象或整个序列进行加工处理,转换成不同的事件或者事件序列,来看API
map

1
2
3
4
5
6
7
8
9
10
11
12
13
Observable.just("imags/logo.png")
.map(new Func1<String,Bitmap>(){
@Override
public Bitmap call(String filePath){
return getBitmapFromPath(filePath);
}
})
.subscribe(new Action1<Bitmap>(){
@Override
public void call(Bitmap bitmap){
showBitmap(bitmap);
}
});

RxJava不仅可以针对事件对象,还可以针对整个事件序列,下面列举一些常见的变化以及分析图

1.map
map

2.flatMap
flatMap

这是一个非常有用但是难以理解的变化,flat:扁平化的意思
感觉 flatMap和from有点类似,区别在于from是对数据源处理,而flatmap是对Observable处理

看一段Sample吧

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
Student[] students = ...
Subscriber<Course> subscriber = new Subcriber<Course>(){
@Override
public void onNext(Course course){
Log.d(tag,course.getName());
}
....
}

Observable.from(students)
.flatMap(new Func1<Student,Observable<Course>>(){
@Override
public Observable<Course> call(Student student){ //传进来的 student已经是单个的student了
return Observable.from(students.getCourses());
//这里也可以不对student变换,返回一个 Observable<Student>,就相当于只实现了扁平化,但是你要知道 flatMap = 扁平化+变化(可以返回一样的就相当于没用变换)
}
})

扩展:由于可以在嵌套的 Observable中添加异步代码,flatMap也常用于嵌套的异步操作,例如嵌套的网络请求

3.throttleFirst():在每次事件触发后的一定时间间隔内丢弃新的事件。常用作抖动过滤
RxView.clickEvents(button)
.throttleFirst(500,TimeUnit.MILLISECONDS)

此外,RxJava还有很多便捷的方法来实现事件序列的变换,这里就不一一列举了

变换的原理:lift()

这些变换虽然功能各有不同,但实质上都是针对事件序列的处理和再发送,而在RxJava内部,它们是基于同一个基础的变换方法:‘lift(Operator)’

首先看一下 ‘lift()’ 的内部实现(仅核心代码)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public <R> Observable<R> lift(Operator<? entends R,? super T> operator){
return Observable.create(new OnSubscribe<R>(){ //创建一个新的Observable
@Override
public void call(Subscriber<R> subscriber){ //这里的subscriber是与上层的Observable对应的
Subscriber newSubscriber<R,T> = operator.call(Subscriber<R>); // new Subscriber是代理接收者,就是在这里建立起原始Subscriber与新的 Subscriber之间的关系(转换关系)
//newSubscriber<R,T>.onStart(); 这句先忽略,看关键的两句
onSubscribe<T>.call(newSubscriber<R,T>); //这个中转的 newSubscriber 订阅上层的Observable(通过调用 onSubscribe.call),直到遇到最顶级的Observable(事件发射源),这里就是实现了订阅的向上级传递(或者叫做通知上层可以发送消息了)
// 我们可以看到,这个新构造的Observable<R>,当它被订阅的时候(它本身的onSubscribe<R>执行),它的内部逻辑是,通知原来的Observable<T>(通过调用T层的onSubscribe<T>.call,完成通知),这是向上传递通知的过程
// 向下发送消息的过程实际上就是不断地调用每一个操作符的中间层 newSubscriber<R,T>的事件函数(onSubscribe.call 的内部就是调用它的事件发送函数),并经过operator的事件转换,传递给下层需要的事件 R;这是事件向下发送的过程
// 以上的 OnSubscribe.call 方法的逻辑基于这是一个lift操作
// 理解什么是订阅,订阅就是将一个subscriber对象传递到上层的 OnSubscribe接口的call方法中
// 上面的构造是在 onSubscribe里面调用onSubscribe,是一个向上传递的过程,也能对应到下图所示,但是原来的 subscriber不具备向上订阅的能力,类型不同,所以需要一个转换,这个转换既完成了向上订阅的可能,又完成了向下传递事件的可能,核心就是对这个subscriber的转换,将Subscriber<R>(接受R事件)转换成 Subscriber<T,R>(接收T事件,并转换成R事件)
}
})
}

我们来看一个具体的 Operator 的例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
Integer-->String
observable.lift(new Observable.Operator<String, Integer>() { //这个Operator就是返回 NewSubscriber,它具备订阅 Integer 事件源的能力
@Override
public Subscriber<? super Integer> call(final Subscriber<? super String> subscriber) {
// 将事件序列中的 Integer 对象转换为 String 对象
return new Subscriber<Integer>() {
@Override
public void onNext(Integer integer) {
subscriber.onNext("" + integer); //integer转换成string发送给Subscriber<R(String)>
}

@Override
public void onCompleted() {
subscriber.onCompleted();
}

@Override
public void onError(Throwable e) {
subscriber.onError(e);
}
};
}
});

流程如下(注意看 OnSubscribe 和 SUbscriber 的颜色是不一样的,代表它们不是一个层级的)

1.单层lift
单层lift

2.多层lift
多层lift

lift流程梳理
1.从流的角度来理解。
2.Observable转化为Observable
Operator生成了Subscriber<T,R>。
Observable向Subscriber<T,R>发送事件,发送的数据类型是T,通过转换器转化为R,然后发送给Subscriber
3.流的路线是:
Observable -> Subscriber<T,R> -> Observable -> Subscriber.
4.即,在observable执行了lift操作后,会返回一个新的observable,这个新的observable像代理一样,负责接收原始的observable发出的数据,处理后发送给Subscriber。

中间层返回的虽然是 Observable,但它也是一个 Subscriber(就是因为它内部构造了一个newSubscriber,可以向上订阅,否则无法订阅(即便它持有下层subscriber的引用),因为下层事件和上层不同)

compose:对Observable整体的变换

除了lift之外,Observable还有一个变换方法叫做compose(Transformer)
它和lift()的主要区别在于,lif()是针对事件项和事件序列的,而compose()是针对Observable本身进行变换的,其实就是组合一些固定的lift,封装起来,看Sample:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
Observable1
.lift1()
.lift2()
.lift3()
.lift4()
.subscribe(subscriber1);

Observable2
.lift1()
.lift2()
.lift3()
.lift4()
.subscribe(subscriber2);

Observable2
.lift1()
.lift2()
.lift3()
.lift4()
.subscribe(subscriber3)

你觉得这样太不软件工程了,于是你改成这样:

1
2
3
4
5
6
7
8
9
10
11
private Observable liftAll(Observable observable) {
return observable
.lift1()
.lift2()
.lift3()
.lift4();
}
...
liftAll(observable1).subscribe(subscriber1);
liftAll(observable2).subscribe(subscriber2);
liftAll(observable3).subscribe(subscriber3);

这是一种常见的处理方法,类似于工厂方法,将外部方法定义成内部的,方法便自动拥有了对象的实例,而不需要在外部接收对象参数

总结就是:lift是对事件序列的变换,而compose是对Observable的变换(其实也是lift变化的组合,但是事件序列对于开发者而言已经不可见)

线程控制:Scheduler②

ObserveOn()指定的是Subscriber的线程,而这个Subscriber并不是(严格的说[不一定是],但这里不妨理解为[不是])subscribe()参数中的Subscriber,而是observeOn()执行的当前Observable所对应的Subscriber,即它的直接下级Subscriber

API

1
2
3
4
5
6
7
8
Observable.just(1, 2, 3, 4) // IO 线程,由 subscribeOn() 指定
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.newThread())
.map(mapOperator) // 新线程,由 observeOn() 指定
.observeOn(Schedulers.io())
.map(mapOperator2) // IO 线程,由 observeOn() 指定
.observeOn(AndroidSchedulers.mainThread)
.subscribe(subscriber); // Android 主线程,由 observeOn() 指定
Scheduler的原理

线程切换的知识,要结合lift的源码理解比较好理解

1
2
3
4
5
6
7
8
9
10
public <R> Observable<R> lift(Operator<? extends R, ? super T> operator) {
return Observable.create(new OnSubscribe<R>() {
@Override
public void call(Subscriber subscriber) {
Subscriber newSubscriber = operator.call(subscriber); //这里可以切换发送的线程 Observer的线程
newSubscriber.onStart();
onSubscribe.call(newSubscriber); //这里可以切换 Observable的线程,并且它决定了上层的整个call的所处线程(包括向上订阅和向下发送两个过程),在没有使用 observeOn的情况下,整个流程都跑在subscribeOn指定的线程,当然,observeOn则可以改变这一现状,将发送的逻辑指定到新的线程
}
});
}

形象化:

subscribeOn():

subscribeOn

observeOn():

observeOn

注意一个问题、
subscribeOn只有最上层的subscribeOn起效果,且它会影响除此

1
2
3
4
5
6
7
8
Observable.create()
.lift() 2
.observeOn(Thread 1)
.lift() 1
.subscribeOn(Thread 2)
.observeOn(Thread 3)
.subscribe(subscriber) 3
//这段逻辑分析的正确性有待验证
延伸:doOnSubscribe()

前面讲 Subscriber的时候,提到过Subscriber的’onStart()'可以用作流程开始前的初始化,然而由于’onStart()'在subscribe()发生时就被调用了,因此不能指定线程,而是只能执行在 subscribe()被调用时的线程。这就导致如果onStart()中含有对线程有要求的代码(例如UI操作),将会有线程非法的风险,因为有时你无法预测subscribe()将会在什么线程执行

其实按照前面的思路,来理解 doOnSubscribe()将会变得非常简单,好比有两个 SubscribeOn(),分别为1,2,并且对应 Thread1,Thread2 一旦subscribe执行,调用链向上回调,也就是说调用链上剩余的的所有代码(剩余回调部分+事件发送部分)都执行在Thread2,直到遇到新的subscribeOn,新的线程1接管了这个回调,并且由于回调与发送的倒U字型结构,剩余部分的回调和事件发送都将执行在 Thread1()。但是,注意,在Thread1和Thread2之间,可以做一些操作,而且这些操作是处于Thread2,这就是多余的(非第一个subscribeOn)sunscribeOn和doOnSubscribeOn的用处!精彩

TODO/补充一张流程图

TODO/分析subscribeOn与observeOn的源码 (先看下 Schedule和Worker源码,这是基础,不然不理解)

subscribeOn 源码分析

预备知识点:
Scheduler类与Schedulers类分析

1
2
3
4
//a Scheduler is an object that schedules units of work
public abstract class Scheduler{

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
public final Observable<T> subscribeOn(Scheduler scheduler) {
return subscribeOn(scheduler, !(this.onSubscribe instanceof OnSubscribeCreate));
}

// requestOn应该是表示
public final Observable<T> subscribeOn(Scheduler scheduler, boolean requestOn) {
if (this instanceof ScalarSynchronousObservable) {
return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
}
return unsafeCreate(new OperatorSubscribeOn<T>(this, scheduler, requestOn));
}

/**
* Subscribes Observers on the specified {@code Scheduler}.
* <p>
* <img width="640" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/subscribeOn.png" alt="">
*
* @param <T> the value type of the actual source
*/
public final class OperatorSubscribeOn<T> implements OnSubscribe<T> {

final Scheduler scheduler;
final Observable<T> source;
final boolean requestOn;

public OperatorSubscribeOn(Observable<T> source, Scheduler scheduler, boolean requestOn) {
this.scheduler = scheduler; // 将传进来的调度器保存在本地
this.source = source; // sourse就是parent observable
this.requestOn = requestOn; //暂时不知道这是个什么东西
}

@Override
public void call(final Subscriber<? super T> subscriber) { // 订阅时候的回调,这里应该算是核心地方了
final Worker inner = scheduler.createWorker(); // worker实现了 Subscription 接口,这里的inner由具体的 scheduler决定
// 结合lift的源码来理解;这里的SubscribeOnSubscriber<T>就是那个代理 newSubscriber,但是注意,这里的代码
SubscribeOnSubscriber<T> parent = new SubscribeOnSubscriber<T>(subscriber, requestOn, inner, source); //这里确定了对应关系
// subscriber.add(Subscription) ,subscriber都是实现了Subscription接口的
subscriber.add(parent);
subscriber.add(inner);

inner.schedule(parent);
// worker.schedule(Action0)就是将Action0(0参数的一个任务)运行在worker指定的线程
// 所以接下来我们看这个 action0做了些什么,应该是看它的call方法(在call方法里它实现了向上级传递消息(订阅))
//TODO 为什么没有 onSubscribe.call(newSubscriber)呢?,其实在它内部实现了的哈,action(parent实现了action0)的call方法
}

static final class SubscribeOnSubscriber<T> extends Subscriber<T> implements Action0 {

final Subscriber<? super T> actual;

final boolean requestOn;

final Worker worker;

Observable<T> source;

Thread t;

SubscribeOnSubscriber(Subscriber<? super T> actual, boolean requestOn, Worker worker, Observable<T> source) {
this.actual = actual; // actual是下级subscriber
this.requestOn = requestOn;
this.worker = worker;
this.source = source;
}

@Override
public void onNext(T t) {
actual.onNext(t);
}

@Override
public void onError(Throwable e) {
try {
actual.onError(e);
} finally {
worker.unsubscribe();
}
}

@Override
public void onCompleted() {
try {
actual.onCompleted();
} finally {
worker.unsubscribe();
}
}

@Override
public void call() {
Observable<T> src = source;
source = null;
t = Thread.currentThread();
src.unsafeSubscribe(this);
// this是这个newSubscriber(代理subscriber)
// 这里就是向上订阅的过程
}

@Override
public void setProducer(final Producer p) {
actual.setProducer(new Producer() {
@Override
public void request(final long n) {
if (t == Thread.currentThread() || !requestOn) {
p.request(n);
} else {
worker.schedule(new Action0() {
@Override
public void call() {
p.request(n);
}
});
}
}
});
}
}
}

回到数据的产生与接收 | 生产者观察者模式

在这样是一个事件发送链中,我们应当先问问自己这几个问题:

  • RxJava是如何准备数据的,有哪些方式,它们的区别与联系又是什么
  • 数据的发送时机又是什么?
  • RxJava的异步指的到底是什么

下面我们来剖析这几个问题

Observable.from(T[]) VS Observable.create(OnSubscribe)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
public static <T> Observable<T> from(T[] array) {
int n = array.length;
if (n == 0) {
return empty();
} else
if (n == 1) {
return just(array[0]);
}

//** 回到了 create 方式
return unsafeCreate(new OnSubscribeFromArray<T>(array));
}

继续看这个带着数据Array的OnSubscribe是如何构造的

public final class OnSubscribeFromArray<T> implements OnSubscribe<T> {
final T[] array;
public OnSubscribeFromArray(T[] array) {
//** 本地化传进来的数据源
this.array = array;
}

@Override
public void call(Subscriber<? super T> child) {
//** 将数据放进构造出来的生产者里面,现在我们知道了,Subscriber(Producer设置完毕)
//** 推测即便是使用create方式构造的Observable,依然会使用 setProducer 方式来将数据 push //进缓存队列(Producer里面的缓存队列),但是目前没有发现是如何实现的,这里是个很大的疑问

//** 第二个疑问是,subscriber理论上是每次执行完onNext再回头请求数据的,这个又是如何通知生产者 Observable的呢?
child.setProducer(new FromArrayProducer<T>(child, array));
}

static final class FromArrayProducer<T> extends AtomicLong implements Producer {
/** */
private static final long serialVersionUID = 3534218984725836979L;

final Subscriber<? super T> child;
final T[] array;

int index;

public FromArrayProducer(Subscriber<? super T> child, T[] array) {
this.child = child;
this.array = array;
}

@Override
public void request(long n) {
if (n < 0) {
throw new IllegalArgumentException("n >= 0 required but it was " + n);
}
if (n == Long.MAX_VALUE) {
if (BackpressureUtils.getAndAddRequest(this, n) == 0) {
fastPath();
}
} else
if (n != 0) {
if (BackpressureUtils.getAndAddRequest(this, n) == 0) {
slowPath(n);
}
}
}

void fastPath() {
final Subscriber<? super T> child = this.child;

for (T t : array) {
if (child.isUnsubscribed()) {
return;
}

child.onNext(t);
}

if (child.isUnsubscribed()) {
return;
}
child.onCompleted();
}

void slowPath(long r) {
final Subscriber<? super T> child = this.child;
final T[] array = this.array;
final int n = array.length;

long e = 0L;
int i = index;

for (;;) {

while (r != 0L && i != n) {
if (child.isUnsubscribed()) {
return;
}

child.onNext(array[i]);

i++;

if (i == n) {
if (!child.isUnsubscribed()) {
child.onCompleted();
}
return;
}

r--;
e--;
}

r = get() + e;

if (r == 0L) {
index = i;
r = addAndGet(e);
if (r == 0L) {
return;
}
e = 0L;
}
}
}
}
}

这里我们先来看一下 just(array[0]),最终会调用到下面这个方法

1
2
3
4
5
6
7
8
9
10
11
12
static final class JustOnSubscribe<T> implements OnSubscribe<T> {
final T value;

JustOnSubscribe(T value) {
this.value = value;
}

@Override
public void call(Subscriber<? super T> s) {
s.setProducer(createProducer(s, value));
}
}

每一种Observable的创建,调用构造函数将数据源存放在本地,OnSubscribe是联系观察者与被观察者的纽带,每个OnSubscriber的call方法又会有下面的调用

1
2
3
4
5
6
7
8
public final class OnSubscribeFromArray<T> implements OnSubscribe<T> {
....
@Override
public void call(Subscriber<? super T> child) {
child.setProducer(new FromArrayProducer<T>(child, array)); // array是构造函数里传进来的数据源
}
....
}

Producer是什么?我们知道每个Subscriber内部都有一个Producer(具体请查看源码)

1
2
3
4
5
6
7
public abstract class Subscriber<T> implements Observer<T>, Subscription { //observer里面三个函数,onNext,onError,onCompleted
private final SubscriptionList subscriptions;
private final Subscriber<?> subscriber;

/* protected by `this` */
private Producer producer;
}

来看

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
public interface Producer{
void request(long n);
// subscriber请求n条数据,我们需要关注的是,当subscriber调用producer.request(n)的时候,具体的Producer是如何处理的
}

看一个具体的 FroomArrayProducer的实现,关注一下 request的内部逻辑
static final class FromArrayProducer<T>
extends AtomicLong
implements Producer {
/** */
private static final long serialVersionUID = 3534218984725836979L;

final Subscriber<? super T> child;
final T[] array;

int index;

public FromArrayProducer(Subscriber<? super T> child, T[] array) {
this.child = child;
this.array = array;
}

@Override
public void request(long n) {
if (n < 0) {
throw new IllegalArgumentException("n >= 0 required but it was " + n); //请求负数条数据是非法的
}
if (n == Long.MAX_VALUE) { // 最大值策略
if (BackpressureUtils.getAndAddRequest(this, n) == 0) {
fastPath();
}
} else
if (n != 0) {
if (BackpressureUtils.getAndAddRequest(this, n) == 0) {
slowPath(n);
}
}
}

void fastPath() {
final Subscriber<? super T> child = this.child;

for (T t : array) {
if (child.isUnsubscribed()) {
return;
}

child.onNext(t);
}

if (child.isUnsubscribed()) {
return;
}
child.onCompleted();
}

void slowPath(long r) {
final Subscriber<? super T> child = this.child;
final T[] array = this.array;
final int n = array.length;

long e = 0L;
int i = index;

for (;;) {

while (r != 0L && i != n) {
if (child.isUnsubscribed()) {
return;
}

child.onNext(array[i]);

i++;

if (i == n) {
if (!child.isUnsubscribed()) {
child.onCompleted();
}
return;
}

r--;
e--;
}

r = get() + e;

if (r == 0L) {
index = i;
r = addAndGet(e);
if (r == 0L) {
return;
}
e = 0L;
}
}
}
}
}

它们无一例外最终都是走到了 onSubscribe()函数中,用它来构建最终的Observable
它们都会将数据保存下来。然后根据设定的规则(要搞懂这个),来发送数据,好像这个规则也会保存下来,在发送的时候调用这个规则来处理事件

RxJava的使用小结

参考凯哥的文章
与大头鬼的文章 : https://blog.csdn.net/lzyzsd/article/details/50120801
[Pause]