RxJava 操作符 From Just Interval
为什么会有这个
RxJava框架现在出现已经有些年头了,如果有人问你你会不会用,可能大多数人都会说会。但是我被人问过一个我没有考虑过的问题,你知道Rxjava是怎么实现的吗?我。。。。。
所以就有了这一篇文章。如果你想通过这篇文章学会Rxjava怎么用,这可能不会是一篇很好的文章,这里面有很多干扰你阅读的东西,和一些我现在还不懂的知识点。但是如果你想通过这篇文章找到我,然后对我说你这样理解不对,这将是一篇完美的文章,因为而且你还会得到一个陌生人的崇拜。
操作符 fromArray
String[] array = { "a","b","c"};Observable.fromArray(array) .subscribe(new Observer() { @Override public void onSubscribe(Disposable d) { OutputUtil.printLn("绑定"); } @Override public void onNext(String s) { OutputUtil.printLn(s); } @Override public void onError(Throwable e) { OutputUtil.printLn("错误"); } @Override public void onComplete() { OutputUtil.printLn("完成"); } });复制代码
运行结果
绑定abc完成复制代码
在开始之前我们先定义几个名字
- Observable (可被观察者)
- Emitter (发射器)
- Observer (观察者)
我们在上一篇文章中提到了 如果自己创建 可被观察者 同时通过自己调用 发射器 来进行数据操作的情况,同时RxJava框架为我们准备了多个定义好的操作符。这里我们来讨论一下 fromArray 操作符的内部实现
通过 fromArray 进入到 源码部分
public staticObservable fromArray(T... items) { ObjectHelper.requireNonNull(items, "items is null"); if (items.length == 0) { return empty(); } else if (items.length == 1) { return just(items[0]); } return RxJavaPlugins.onAssembly(new ObservableFromArray (items)); }复制代码
还记得之前 如果是通过 create 方法创建返回的是 RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
,这里还是在创建一个 可被观察者,同时将需要发射的数据传入
public final class ObservableFromArrayextends Observable { final T[] array; public ObservableFromArray(T[] array) { this.array = array; }复制代码
基本逻辑与create方法相通,不同的是如果是通过 create 方法,我们这里传入的是一个 发射器 的实现,因为我们需要自己来操控 发射器 .
回到我们的测试代码,进入下一步操作是 subscribe(new Observer<String>() {....}
操作,这里传入的观察者的内容,进入这个方法时 我们看到了似曾相识的代码
public final void subscribe(Observer observer) { ObjectHelper.requireNonNull(observer, "observer is null"); try { // hook 方法 observer = RxJavaPlugins.onSubscribe(this, observer); ObjectHelper.requireNonNull(observer, "Plugin returned null Observer"); // 这里要开始绑定了 subscribeActual(observer); } catch (NullPointerException e) { // NOPMD throw e; } catch (Throwable e) { Exceptions.throwIfFatal(e); // can't call onError because no way to know if a Disposable has been set or not // can't call onSubscribe because the call might have set a Subscription already RxJavaPlugins.onError(e); NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS"); npe.initCause(e); throw npe; } }复制代码
可被观察者 和 观察者 在绑定的时候,使用的是同样的方法,关键的方法还是绑定的过程 subscribeActual(observer);
因为我们创建的 可被观察者 是 ObservableFromArray
它继承自 Observable 所以在绑定的时候,执行的代码是 ObservableFromArray
内部的方法
final T[] array;@Override public void subscribeActual(Observer s) { FromArrayDisposabled = new FromArrayDisposable (s, array); // 将实现的 绑定传回,这样就能在 观察者 中控制发射 s.onSubscribe(d); if (d.fusionMode) { return; } // 开始发射 d.run(); }复制代码
这个 可被观察者 在初始化的时候已经传入了,我们添加的 array .
其中创建的static final class FromArrayDisposableextends BasicQueueDisposable { // 传入 观察者 这样就能向 观察者 传递数据 final Observer actual; // 数据集合 final T[] array; int index; boolean fusionMode; volatile boolean disposed; FromArrayDisposable(Observer actual, T[] array) { this.actual = actual; this.array = array; } @Override public int requestFusion(int mode) { if ((mode & SYNC) != 0) { fusionMode = true; return SYNC; } return NONE; } @Nullable @Override public T poll() { int i = index; T[] a = array; if (i != a.length) { index = i + 1; return ObjectHelper.requireNonNull(a[i], "The array element is null"); } return null; } @Override public boolean isEmpty() { return index == array.length; } @Override public void clear() { index = array.length; } @Override public void dispose() { disposed = true; } @Override public boolean isDisposed() { return disposed; } void run() { T[] a = array; int n = a.length; // run 代码 是循环从 array 中提取数据,当结束时调用结束 for (int i = 0; i < n && !isDisposed(); i++) { T value = a[i]; if (value == null) { actual.onError(new NullPointerException("The " + i + "th element is null")); return; } actual.onNext(value); } if (!isDisposed()) { actual.onComplete(); } } }复制代码
其实到这里,基本上已经完成了.但是你可能会发现这里面并没有对 onComplete() 和 onError() 方法进行再处理,也没有很复杂的 disposed 操作. 其实如果你我们如果是这样实现简单逻辑的话,到这里已经是结束了,代码已经运行完成.如果解除绑定,发射器还是会停止发射数据.
但如果你试一下的话,在 onComplete() 之后在调用 onNext() 观察者还是会接收到数据的.操作符 just
我们在使用 fromArray 操作符的时候,会发现如果我们的 array 长度为 1 ,就会直接转到 just
if (items.length == 1) { return just(items[0]);}复制代码
进入到 just 方法
public staticObservable just(T item) { ObjectHelper.requireNonNull(item, "The item is null"); // 创建 just 可被观察者 return RxJavaPlugins.onAssembly(new ObservableJust (item)); }java```javapublic final class ObservableJust extends Observable implements ScalarCallable { // 等待发送的数据 private final T value; public ObservableJust(final T value) { this.value = value; } @Override protected void subscribeActual(Observer s) { ScalarDisposable sd = new ScalarDisposable (s, value); s.onSubscribe(sd); sd.run(); } @Override public T call() { return value; }}复制代码
按照之前的阅读,我们下一步查看的内容就应该是 subscribeActual() 方法,这里创建的是 ScalarDisposable
public static final class ScalarDisposableextends AtomicInteger implements QueueDisposable , Runnable { private static final long serialVersionUID = 3880992722410194083L; final Observer observer; final T value; static final int START = 0; static final int FUSED = 1; static final int ON_NEXT = 2; static final int ON_COMPLETE = 3; public ScalarDisposable(Observer observer, T value) { this.observer = observer; this.value = value; } @Override public boolean offer(T value) { throw new UnsupportedOperationException("Should not be called!"); } @Override public boolean offer(T v1, T v2) { throw new UnsupportedOperationException("Should not be called!"); } @Nullable @Override public T poll() throws Exception { if (get() == FUSED) { lazySet(ON_COMPLETE); return value; } return null; } @Override public boolean isEmpty() { return get() != FUSED; } @Override public void clear() { lazySet(ON_COMPLETE); } @Override public void dispose() { set(ON_COMPLETE); } @Override public boolean isDisposed() { return get() == ON_COMPLETE; } @Override public int requestFusion(int mode) { if ((mode & SYNC) != 0) { lazySet(FUSED); return SYNC; } return NONE; } @Override public void run() { if (get() == START && compareAndSet(START, ON_NEXT)) { observer.onNext(value); if (get() == ON_NEXT) { lazySet(ON_COMPLETE); observer.onComplete(); } } } }复制代码
这里执行的 run() 方法 是想 观察者 发射内容之后
如果简单的去理解,这里的内容还是可以理解的,但是如果我们在分析一下 run()方法内部static final int START = 0;static final int FUSED = 1;static final int ON_NEXT = 2;static final int ON_COMPLETE = 3;public void run() { // 初始状态下 AtomicInteger 类下 get() 拿到的 value 为 0,然后将其切换为 2 // 这里的 compareAndSet() 方法 牵扯到了线程安全,通过原子操作将值改变 if (get() == START && compareAndSet(START, ON_NEXT)) { observer.onNext(value); if (get() == ON_NEXT) { // 这里的操作能够实现非堵塞的写入 lazySet(ON_COMPLETE); observer.onComplete(); } }}复制代码
总结
- fromArray 和 just 两个 操作符内部是相通的,根据传入的参数的数量不同可以进行相互的转化
- 我们之后会提到 RxJava 最厉害的线程之间切换的操作,通过前面的这些,就能够看出 RxJava 为线程的切换做了很多的功过.为了保护线程安全,真是煞费苦心.
##操作符 interval
System.out.println(Thread.currentThread().getId());Observable.interval(1,1, TimeUnit.SECONDS) .take(5) .subscribe(new Observer() { @Override public void onSubscribe(Disposable d) { System.out.println(Thread.currentThread().getId()); OutputUtil.printLn("绑定"); } @Override public void onNext(Long s) { System.out.println(Thread.currentThread().getId()); OutputUtil.printLn(System.currentTimeMillis()+""); OutputUtil.printLn(s+""); } @Override public void onError(Throwable e) { OutputUtil.printLn("错误"); } @Override public void onComplete() { System.out.println(Thread.currentThread().getId()); OutputUtil.printLn("完成"); } });复制代码
这个操作符 实现的是类似于计时器的功能,发射器等待一段相应的时间然后再向观察者发送数据。这里有牵扯到了RxJava的线程相关内容,请谨慎的往下看,我可能会有错
结果:I/System.out: 1I/System.out: 1D/内容: 绑定I/System.out: 5585D/内容: 1523349727345 0I/System.out: 5585D/内容: 1523349728345 1I/System.out: 5585D/内容: 1523349729345 2I/System.out: 5585D/内容: 1523349730345 3I/System.out: 5585D/内容: 1523349731344 4I/System.out: 5585D/内容: 完成 复制代码
System.out 输出的是当前程序运行所在的线程,
内容标识的是输出的时间和输出的数值.从这里我们可以看到,默认条件下可被观察者
的绑定是在我们运行的线程上面,但是等到观察者
接收到数据的时候,线程就已经切换了.我们可以认为在绑定的时候,这里创建了新的线程来运行.而且程序的运行是没有阻塞程序运行线程的.
以interval()
为入口,我们进入到可被观察者的创建过程
public static Observableinterval(long initialDelay, long period, TimeUnit unit) { return interval(initialDelay, period, unit, Schedulers.computation());}public static Observable interval(long initialDelay, long period, TimeUnit unit, Scheduler scheduler) { ObjectHelper.requireNonNull(unit, "unit is null"); ObjectHelper.requireNonNull(scheduler, "scheduler is null"); return RxJavaPlugins.onAssembly(new ObservableInterval(Math.max(0L, initialDelay), Math.max(0L, period), unit, scheduler));}复制代码
这里我们把 Schedulers 称为 调度器
它的主要任务是实现线程调度,可以等待,可以切换线程,可以指定线程.我们在不传入调度器的情况下,会给一个默认的调度器. 将数据传入之后就产生了一个 可被观察者 的对象,如果是普通条件下我们下一步就要开始订阅了.但是我们现在这个情况如果不做任何处理的话,消息会一直发送,所以我们还是会添加一个 take() 方法,来控制这个输出的上限.不过我们先详细讨论.到了订阅部分之后,执行的是 ObservableInterval 里的 订阅方法
@Override public void subscribeActual(Observer s) { // 创建特殊的观察者 IntervalObserver is = new IntervalObserver(s); s.onSubscribe(is); Scheduler sch = scheduler; if (sch instanceof TrampolineScheduler) { // 如果是 TrampolineScheduler 则会在订阅线程上面执行 观察者 代码 Worker worker = sch.createWorker(); is.setResource(worker); worker.schedulePeriodically(is, initialDelay, period, unit); } else { // 在指定的 线程 上面执行观察者代码 Disposable d = sch.schedulePeriodicallyDirect(is, initialDelay, period, unit); is.setResource(d); } }复制代码
我们先不讨论调度器为 TrampolineScheduler 的情况 相较于线程调度,这个要简单的多.
下面我们讨论 Disposable d = sch.schedulePeriodicallyDirect(is, initialDelay, period, unit);
这句话, sch
表示的是 我们添加的 调度器 ,因为我们前面没有设置 调度器,所以 默认为 Schedulers.computation()
所以 这里调用的 就会是 ComputationScheduler
中的 schedulePeriodicallyDirect() 方法!!! 并不是 Scheduler 里面的 别问我怎么知道的.
public Disposable schedulePeriodicallyDirect(@NonNull Runnable run, long initialDelay, long period, TimeUnit unit) { PoolWorker w = pool.get().getEventLoop(); return w.schedulePeriodicallyDirect(run, initialDelay, period, unit); }复制代码
然后 poolWorker 内部逻辑极其复杂,之后会在讨论.
进入到 w.schedulePeriodicallyDirect(run, initialDelay, period, unit);
方法内部
public Disposable schedulePeriodicallyDirect(Runnable run, long initialDelay, long period, TimeUnit unit) { // hook 相关 final Runnable decoratedRun = RxJavaPlugins.onSchedule(run); if (period <= 0L) { // 间隔 为 0 的情况不讨论 } // 这个 decorateRun 还记得吗? 这个是我们 IntervalObserver 啊,他调用了 观察者中的 onNext() // 然后这里来调用它, 社会社会 ScheduledDirectPeriodicTask task = new ScheduledDirectPeriodicTask(decoratedRun); try { // 通过 ExecutorService 来控制 run() 执行的次数和间隔时间 Future f = executor.scheduleAtFixedRate(task, initialDelay, period, unit); // 设定未来 task.setFuture(f); return task; } catch (RejectedExecutionException ex) { RxJavaPlugins.onError(ex); return EmptyDisposable.INSTANCE; } }复制代码
public final class ScheduledDirectPeriodicTask extends AbstractDirectTask implements Runnable { private static final long serialVersionUID = 1811839108042568751L; public ScheduledDirectPeriodicTask(Runnable runnable) { super(runnable); } @Override public void run() { // 线程还进行了切换,切换到了当前线程,这个当前线程可不是什么 订阅线程 而是 调度器的线程 runner = Thread.currentThread(); try { // 看! 是这里 runnable.run(); runner = null; } catch (Throwable ex) { runner = null; lazySet(FINISHED); RxJavaPlugins.onError(ex); } }}复制代码
总结
这一段内容比想象中还要复杂,相信你看完之后肯定是一脸懵逼。也没有办法,我是一边看一边写,有很多东西没有一个全局的观念,同时水平有限,不能够体会到作者其中的用意。如果你有什么问题,或者我写的有什么错误,请及时的告诉我。
- 在 interval 中 会给一个默认的 Scheduler 调度器,通过调度器来表示这个 观察者的内容应该在哪个线程上面被处理.
- 使用 Work 来标识任务,使用 ExecutorService 来进行任务的计时 和 重复操作
- 代码很复杂,功能很强大.会用就好,以后有机会了再细致研究