【28】RxJava模式与原理
2021/4/12 12:30:06
本文主要是介绍【28】RxJava模式与原理,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
(1)一个人只要自己不放弃自己,整个世界也不会放弃你.
(2)天生我才必有大用
(3)不能忍受学习之苦就一定要忍受生活之苦,这是多么痛苦而深刻的领悟.
(4)做难事必有所得
(5)精神乃真正的刀锋
(6)战胜对手有两次,第一次在内心中.
(7)好好活就是做有意义的事情.
(8)亡羊补牢,为时未晚
(9)科技领域,没有捷径与投机取巧。
(10)有实力,一年365天都是应聘的旺季,没实力,天天都是应聘的淡季。
(11)基础不牢,地动天摇
(12)编写实属不易,若喜欢或者对你有帮助记得点赞+关注或者收藏哦~
RxJava模式与原理
文章目录
- RxJava模式与原理
- 1.标准观察者与RxJava观察者
- 1.1标准的观察者设计模式
- 1.1.1生活中案例
- 1.1.2生活中案例代码实现
- 1.2扩展的RxJava观察者设计模式
- 1.2.1RxJava Hook点
- 1.2.2RxJava Hook机制
- 1.2.3RxJava观察者模式
- 1.2.3.1 Observer源码看看
- 1.2.3.2 Observable创建过程源码分析
- 1.2.3.3 subscribe订阅过程源码分析
- 1.2.3.3Observable创建过程时序图
- 1.2.3.4Observable与Observer订阅过程时序图
- 1.2.4.标准观察者设计模式与RxJava观察者设计模式对比
- 2.map变换操作符原理
- 3.装饰模型
- 4.背压
1.标准观察者与RxJava观察者
1.1标准的观察者设计模式
1.1.1生活中案例
(1)微信公众号与关注公众号的用户
(2)是一个被观察者有多个观察者的情况
1.1.2生活中案例代码实现
(1)抽象被观察者角色
public interface Observable { //关注 添加观察者 void addObServer(Observer observer); //取消关注 删除观察者 void removeObserver(Observer observer); //被观察者发出了改变通知观察者 void notifyObservers(); //被观察者发布一条消息的功能 void pushMessage(String message); }
(2)抽象观察者角色
public interface Observer { //被观察者改变了,收到改变通知,观察者做出相应响应 void update(String message); }
(3)具体被观察者角色
public class WechatServerObservable implements Observable{ /** * 容器管理观察者 */ private List<Observer> observerList = new ArrayList<>(); private String message; @Override public void addObServer(Observer observer) { observerList.add(observer); } @Override public void removeObserver(Observer observer) { if(null != observerList){ observerList.remove(observer); } } @Override public void notifyObservers() { for(Observer observer : observerList){ observer.update(message); } } @Override public void pushMessage(String message) { this.message = message; notifyObservers(); } }
(4)具体观察者角色
public class Person implements Observer{ private String name; private String message; public Person(String name) { this.name = name; } @Override public void update(String message) { this.message = message; readMessage(); } private void readMessage(){ System.out.println(String.format("%s收到了推送消息:%s",name,message)); } }
(5)客户端
public class ObserverModelActivity extends AppCompatActivity { @InjectView(R.id.btn_observer_test) private Button btn_observer_test; @Override protected void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentView(R.layout.activity_observer_model); InjectUtils.injectViewAndEvent(this); } @OnClick(R.id.btn_observer_test) public void onViewClick(View view){ testObserverModel(); } public void testObserverModel(){ String msg = "重大消息:国家改革委发布智慧农业转型号召"; //1.被观察者 Observable observable = new WechatServerObservable(); //2.观察者 Observer observer1 = new Person("张三"); Observer observer2 = new Person("张化"); Observer observer3 = new Person("张丽"); Observer observer4 = new Person("张雪"); observable.addObServer(observer1); observable.addObServer(observer2); observable.addObServer(observer3); observable.addObServer(observer4); observable.pushMessage(msg); } }
1.2扩展的RxJava观察者设计模式
1.2.1RxJava Hook点
(1)什么时候用到Hook?
- 整个项目都在使用RxJava,想对RxJava做监听,此时就会使用到Hook技术
@CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) public static <T> Observable<T> create(ObservableOnSubscribe<T> source) { ObjectHelper.requireNonNull(source, "source is null"); return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source)); } @CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) { ObjectHelper.requireNonNull(mapper, "mapper is null"); return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper)); }
- RxJavaPlugins.onAssembly为全局RxJava的Hook,create与map还有其他操作符都有这样一个方法。
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) { Function<? super Observable, ? extends Observable> f = onObservableAssembly; if (f != null) { return apply(f, source); } return source; }
-
可以让程序员插入自己的Hook,即先执行程序员自己写的Function,然后再执行RxJava自身的Hook.
-
如何让onObservableAssembly不为空,满足程序员Hook先执行的条件呢?可以查看此值唯一的赋值处.
public static void setOnObservableAssembly(@Nullable Function<? super Observable, ? extends Observable> onObservableAssembly) { if (lockdown) { throw new IllegalStateException("Plugins can't be changed anymore"); } RxJavaPlugins.onObservableAssembly = onObservableAssembly; }
- 即直接调用setOnObservableAssembly函数设置一个值就可以了。
@Override protected void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentView(R.layout.activity_source1); RxJavaPlugins.setOnObservableAssembly(new Function<Observable, Observable>() { @Override public Observable apply(Observable observable) throws Exception { Log.d(Flag.TAG, "apply: 整个项目 全局 监听 到底有多少地方使用 RxJava:" + observable); //不破坏人家的功能 return observable; } }); }
1.2.2RxJava Hook机制
(1)Hook即钩子
(2)程序在执行过程中,想个办法,先让程序执行自己写的一部分功能,然后再执行正常的程序。
1.2.3RxJava观察者模式
1.2.3.1 Observer源码看看
public interface Observer<T> { /** * Provides the Observer with the means of cancelling (disposing) the * connection (channel) with the Observable in both * synchronous (from within {@link #onNext(Object)}) and asynchronous manner. * @param d the Disposable instance whose {@link Disposable#dispose()} can * be called anytime to cancel the connection * @since 2.0 */ void onSubscribe(@NonNull Disposable d); /** * Provides the Observer with a new item to observe. * <p> * The {@link Observable} may call this method 0 or more times. * <p> * The {@code Observable} will not call this method again after it calls either {@link #onComplete} or * {@link #onError}. * * @param t * the item emitted by the Observable */ void onNext(@NonNull T t); /** * Notifies the Observer that the {@link Observable} has experienced an error condition. * <p> * If the {@link Observable} calls this method, it will not thereafter call {@link #onNext} or * {@link #onComplete}. * * @param e * the exception encountered by the Observable */ void one rror(@NonNull Throwable e); /** * Notifies the Observer that the {@link Observable} has finished sending push-based notifications. * <p> * The {@link Observable} will not call this method if it calls {@link #onError}. */ void onComplete(); }
(1)抽象观察者Observer为一个泛型,即在构建具体的观察者时传什么类型就是什么类型
(2)onSubscribe为订阅函数,即在subscribe执行的时候就立即得到执行
(3)onNext拿到上一个事件(卡片或功能)流下来的数据.
(4)onError拿到上一个事件(卡片或功能)流下来的错误数据.
(5)onComplete事件结束
1.2.3.2 Observable创建过程源码分析
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) { ObjectHelper.requireNonNull(source, "source is null"); return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source)); }
(1)ObservableOnSubscribe:自定义source
- io.reactivex.internal.operators.observable.ObservableCreate
public ObservableCreate(ObservableOnSubscribe<T> source) { this.source = source; }
(2)create创建的过程即将自定义的source赋给了io.reactivex.internal.operators.observable.ObservableCreate#source成员变量。
1.2.3.3 subscribe订阅过程源码分析
public final void subscribe(Observer<? super T> observer) { ObjectHelper.requireNonNull(observer, "observer is null"); try { observer = RxJavaPlugins.onSubscribe(this, observer); ObjectHelper.requireNonNull(observer, "Plugin returned null Observer"); subscribeActual(observer); } catch (NullPointerException e) { // NOPMD throw e; } catch (Throwable e) { Exceptions.throwIfFatal(e); // can't call one rror because no way to know if a Disposable has been set or not // can't call onSubscribe because the call might have set a Subscription already RxJavaPlugins.onError(e); NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS"); npe.initCause(e); throw npe; } }
(1)实际上是ObservableCreate.subscribe
(2)将自定义观察者传给了它。
(3)进入到被观察者的io.reactivex.Observable#subscribeActual
protected abstract void subscribeActual(Observer<? super T> observer);
(4)这个方法执行之后,会直接回调到
io.reactivex.internal.operators.observable.ObservableCreate#subscribeActual
@Override protected void subscribeActual(Observer<? super T> observer) { CreateEmitter<T> parent = new CreateEmitter<T>(observer); observer.onSubscribe(parent); try { source.subscribe(parent); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); parent.onError(ex); } }
(5)将自定义观察者丢进来,并创建发射器,并且传入自定义观察者。
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
(6)然后执行onSubscribe方法,这也就是为什么执行了subscribe方法之后,这个方法马上会得到执行的原因。
observer.onSubscribe(parent);
(7)自定义source将发射器传入进去
source.subscribe(parent);
(8)调发射器的onNext
new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> e) throws Exception { //2.2发射器.onNext e.onNext("A"); } }
(9)再由发射器调用自定义观察者onNext
(10)整体调用图,即为一个U型结构。
1.2.3.3Observable创建过程时序图
1.2.3.4Observable与Observer订阅过程时序图
1.2.4.标准观察者设计模式与RxJava观察者设计模式对比
(1)在标准的观察者设计模式中,是一个“被观察者”,多个“观察者”,并且需要“被观察者”发出改变通知后,所有的“观察者”才能观察者。
(2)在RxJava观察者设计模式中,是多个“被观察者”,一个“观察者”,并且需要事件起点与终点在“订阅”一次之后,才发出改变通知,终点(观察者)才能观察到。
- 为什么是多个“被观察者”?
因为可以有多个操作符如flatMap,map操作符,这就意味着有多个观察者
严格意义上来讲RxJava应用的是发布订阅模式。
(3)RxJava观察者设计模式,没有容器的概念。
2.map变换操作符原理
(1)对数据进行变换
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) { ObjectHelper.requireNonNull(mapper, "mapper is null"); return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper)); }
(2)订阅触发之后,调用终点,它是通过ObservableMap.subscribe
io.reactivex.internal.operators.observable.ObservableMap#subscribeActual
- source是上层事件
@Override public void subscribeActual(Observer<? super U> t) { source.subscribe(new MapObserver<T, U>(t, function)); }
(3)添加一个MapObserver(终点)包裹
(4)调用了subscribe后
@Override protected void subscribeActual(Observer<? super T> observer) { CreateEmitter<T> parent = new CreateEmitter<T>(observer); observer.onSubscribe(parent); try { source.subscribe(parent); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); parent.onError(ex); } }
(5)io.reactivex.internal.operators.observable.ObservableCreate.CreateEmitter#onNext
(6)io.reactivex.internal.operators.observable.ObservableMap.MapObserver#onNext
(7)io.reactivex.functions.Function#apply
(8)封装包裹拆包裹的过程
这即为卡片式思维
(13)map流程分析
3.装饰模型
Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> e) throws Exception { e.onNext("Derry"); e.onComplete(); } }) // ↓ObservableCreate.map 包裹模型中 最里面 .map(new Function<String, Integer>() { @Override public Integer apply(String s) throws Exception { return 45454; } }) // ObservableMap.map .map(new Function<Integer, Boolean>() { @Override public Boolean apply(Integer integer) throws Exception { return true; } }) // ↓包裹模型中最外面 往上走↑的时候在一层 层的剥开 // ObservableMap.subscribe .subscribe(new Observer<Boolean>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Boolean bool) { Log.d(Flag.TAG, "onNext bool:" + bool); } @Override public void one rror(Throwable e) { } @Override public void onComplete() { } });
4.背压
(1)起点到终点发射10000个事件
(2)终点处理不过来,这时候采取背压策略。
(3)即不停的生产产品,生产的速度远远的超过消费的速度。内存会不停的消耗增长。
(4)使用Flowable解决背压问题。
(5)map操作符只能发射一次事件,flapMap可以发射多次事件。
这篇关于【28】RxJava模式与原理的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-09-28AI给的和自己写的Python代码,都无法改变输入框的内容,替换也不行
- 2024-09-27Sentinel配置限流资料:新手入门教程
- 2024-09-27Sentinel配置限流资料详解
- 2024-09-27Sentinel限流资料:新手入门教程
- 2024-09-26Sentinel限流资料入门详解
- 2024-09-26Springboot框架资料:初学者入门教程
- 2024-09-26Springboot框架资料详解:新手入门教程
- 2024-09-26Springboot企业级开发资料:新手入门指南
- 2024-09-26SpringBoot企业级开发资料新手指南
- 2024-09-26Springboot微服务资料入门教程