RxJava
2021/4/18 12:28:23
本文主要是介绍RxJava,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
引言
本文将描述RxJava的设计原理,为了简化,本文并非完全参照RxJava的源码,也不讨论使用RxJava的作用,而从实现角度分析RxJava。本文不讨论RxJava的设计来源,具体请参考“函数式编程”的无副作用。
原理
RxJava使用简单示例
我们来看一个RxJava的一个简单使用示例:
Observable.just(123) .map(new Function<Integer, String>() { @Override public String apply(Integer i) { return "" + i; }}) .doOnNext(new Consumer<String>() { @Override public void accept(String s) { System.out.println("log:" + s + " Thread:" + Thread.currentThread().getName()); }}) .filter(new Filter<String>() { @Override public boolean filter(String s) { return s != null && s.length() > 0; }}) .subscribeOn(Schedules.ASYNC) .observeOn(Schedules.MAIN) .subscribe(new Consumer<String>() { @Override public void accept(String s) { System.out.println("result:" + s + " Thread:" + Thread.currentThread().getName()); }});
运行得到结果:
I/System.out: log:123 Thread:Thread-2
I/System.out: result:123 Thread:main
分析
上述RxJava并非使用官方源库,而是本文自定义的RxJava,也能达到官网RxJava一样的效果。
RxJava并非在调用map、doNext、filter、subscribeOn、ObserveOn等操作符时,立即调用内部的方法,基于函数式编程无副作用理论,我们对其进行包一层Observable,在subscribe的使用,也并非立即去消费对应Observer的内容,也调用上一层Observable对应的Observer。如图所示:
从上图可知,RxJava调用操作符时,并没有直接调用到其内部的方法。它每调用一次操作符就new了一个与之对应的Observable对象,调到最后开始subscribe的时候,就new了一个与之对应的Observer传参,最后调到最开始ObservableJust的时候,就开始进行onNext、onError、onComplete等操作,注意,现在在ObservableJust中最开始调用的是最外层的ObserveObserver的onNext,之后再层层往内调用,最后调用到我们传递的Observer。每次调用时,我们可以对其进行线程切换,如在ObservableSubscribeOn层subscribeOn(ASYN)时,就对后续的操作都放到了子线程中执行,再在ObservableObsereOn层中的onNext时,又可以将线程切换到Main线程。
源码
简单起见,我们简化一下上述:
Observable.just(123) .map(new Function<Integer, String>() { @Override public String apply(Integer i) { return "" + i; }}) .subscribeOn(Schedules.ASYNC) .observeOn(Schedules.MAIN) .subscribe(new Observer<String>() { @Override public void onNext(String s) { System.out.println(s); } @Override public void onComplete() { System.out.println("onComplete"); } @Override public void one rror(Throwable r) { System.out.println("onError"); } @Override public void onSubscribe() { System.out.println("onSubscribe"); } });
针对上述案例,我们抽象出两个接口,1、Observer,2、ObservableSource。通过Observable开始分发事件。
public interface ObservableSource<T> { void subscribe(Observer<T> observer); } public interface Observer<T> { void onNext(T t); void onComplete(); void one rror(Throwable r); void onSubscribe(); } public abstract class Observable<T> implements ObservableSource<T> { public static <T> Observable<T> just(T item) { return new ObservableJust<>(item); } public <R> Observable<R> map(Function<T, R> function) { return (Observable<R>) new ObservableMap<>(this, function); } public Observable<T> subscribeOn(Schedules schedules) { return new ObservableSubscribeOn<>(this, schedules); } public Observable<T> observeOn(Schedules schedules) { return new ObservableObserveOn<>(this, schedules); } public void subscribe(Consumer<T> consumer) { this.subscribe(new LambdaObserver<>(consumer, Functions.ERROR_CONSUMER, Functions.EMPTY_ACTION, Functions.EMPTY_ACTION)); } }
从ObservableJust出发,本源码简化了RxJava官方的源码,如下:
public class ObservableJust<T> extends Observable<T> { private final T value; public ObservableJust(T value) { this.value = value; } @Override public void subscribe(Observer<T> observer) { // 最后调用这里,才开始onNext等 observer.onSubscribe(); try { observer.onNext(value); observer.onComplete(); } catch (Throwable r) { observer.onError(r); } } }
之后就是map
// 主要用来保存srouce Observable,如ObservableJust.map之后,就new了一个ObservableMap,在该ObservableMap中保存了ObservableJust的引用,这就是装饰器模式,可以参考JVM的IOStream源码理解。 // 这样就能在sbscribe的时候,调用source.subscribe了,并进行功能增强,如线程切换等。 public abstract class AbstractObservableWithUpStream<T, R> extends Observable<T> { 是 protected final ObservableSource<T> source; protected AbstractObservableWithUpStream(ObservableSource<T> source) { this.source = source; } } public class ObservableMap<T, R> extends AbstractObservableWithUpStream<T, R>{ private final Function<T, R> function; public ObservableMap(ObservableSource<T> source, Function<T, R> function) { super(source); this.function = function; } @Override public void subscribe(Observer<T> observer) { source.subscribe(new MapObserver<T, R>((Observer<R>) observer, function)); } private static class MapObserver<T, R> extends BasicObserver<T, R> { final Function<T,R> mapper; public MapObserver(Observer<R> actual, Function<T,R> mapper) { super(actual); this.mapper = mapper; } @Override public void onNext(T t) { R r = mapper.apply(t); // 这里调用了Function中的apply方法,以进行业务能力扩展 actual.onNext(r); } } }
之后subscribeOn
public class ObservableSubscribeOn<T> extends AbstractObservableWithUpStream<T, T> { private final Schedules schedules; protected ObservableSubscribeOn(ObservableSource<T> source, Schedules schedules) { super(source); this.schedules = schedules; } @Override public void subscribe(Observer<T> observer) { // 在subscribe的时候进行线程切换 if (schedules == Schedules.MAIN) { source.subscribe(new SubscribeOnObserver<>(observer)); } else if(schedules == Schedules.ASYNC) { Schedules.executorService.submit(() -> source.subscribe(new SubscribeOnObserver<>(observer))); } } private static class SubscribeOnObserver<T> extends BasicObserver<T, T> { public SubscribeOnObserver(Observer<T> actual) { super(actual); } @Override public void onNext(T t) { actual.onNext(t); } } }
onServeOn
public class ObservableObserveOn<T> extends AbstractObservableWithUpStream<T, T> { private final Schedules schedules; protected ObservableObserveOn(ObservableSource<T> source, Schedules schedules) { super(source); this.schedules = schedules; } @Override public void subscribe(Observer<T> observer) { source.subscribe(new ObserveOnObserver<T>(observer, schedules)); } private static class ObserveOnObserver<T> extends BasicObserver<T, T> { private final Schedules schedules; public ObserveOnObserver(Observer<T> actual, Schedules schedules) { super(actual); this.schedules = schedules; } @Override public void onNext(T t) { // 在onNext的时候切换线程 if (schedules == Schedules.MAIN) { new Handler(Looper.getMainLooper()).post(() -> { actual.onNext(t); }); } else if (schedules == Schedules.ASYNC) { Schedules.executorService.submit(() -> actual.onNext(t)); } } } }
最后回调示例代码
总结
本文通过图解,源码,以及调用示例进行RxJava分析,同时,我们也可以如何自定义操作符,继承Observable,之后构建对应的操作符的Observable类和Observer类。
这篇关于RxJava的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-06-26结对编程到底难不难?答案在这里
- 2024-06-19《2023版Java工程师》课程升级公告
- 2024-06-15matplotlib作图不显示3D图,怎么办?
- 2024-06-1503-Loki 日志监控
- 2024-06-1504-让LLM理解知识 -Prompt
- 2024-06-05做软件测试需要懂代码吗?
- 2024-06-0514-ShardingSphere的分布式主键实现
- 2024-06-03为什么以及如何要进行架构设计权衡?
- 2024-05-31全网首发第二弹!软考2024年5月《软件设计师》真题+解析+答案!(11-20题)
- 2024-05-31全网首发!软考2024年5月《软件设计师》真题+解析+答案!(21-30题)