Spring 响应式编程 随记 -- C2 Spring 响应式编程基本概念
2021/8/13 8:05:56
本文主要是介绍Spring 响应式编程 随记 -- C2 Spring 响应式编程基本概念,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
【 好书分享:《Spring 响应式编程》-- 京东】
C2 Spring 响应式编程基本概念
RxJava 库,Java 第一个响应式库
2.1 早期方案
- 方法1: 可以用 回调 (callback) 来实现跨组件通信。
- 方法2: 用
Future
(java.util.concurrent.Future) - 方法3: 更好的
CompletionStage
和CompletableFuture
。 - 方法4: Spring 4 里 的
ListenableFuture
和AsyncRestTemplate
观察者模式:
主题(Subject)包含一个依赖者(观察者)列表,主题通过调用自身的一个方法通知观察者有状态变化
观察者模式在运行时注册对象之间的一对多依赖,单向通信,解耦实现,高效分配事件。
public interface Subject<T> { void registerObserver(Observer<T> observer); void unregisterObserver(Observer<T> observer); void notifyObservers(T event); } public interface Observer<T> { void observe(T event); }
依赖注入 (Dependency Injection) 容器可以负责查找所有 Subject 实例和注册程序。(@EventListener)
接下来我们做一点简单实现:
public class ConcreteObserverA implements Observer<String> { @Override public void observe(String event){ System.out.println("ConcreteObserverA:" + event); } } public class ConcreteObserverB implements Observer<String> { @Override public void observe(String event){ System.out.println("ConcreteObserverB:" + event); } } //String event public class ConcreteSubject implements Subject<String> { private final Set<Observer<String>> observers = new CopyOnWriteArraySet<>(); public void registerObserver(Observer<String> observer){ observers.add(observer); } public void unregisterObserver(Observer<String> observer){ observers.remove(observer); } public void notifyObservers(String event){ observers.forEach( ob -> ob.observe(event)); } }
CopyOnWriteArraySet
是一个线程安全的实现。
通信务必要注意线程安全问题。
针对 Funtional interfaces,比如 Observer
只有一个抽象方法接口,所以可以直接用 lambda 实现。
@Test public void test(){ Subject<String> subOne = new ConcreteSubject(); Observer<String> obsA1 = Mockito.spy(new ConcreteObserverA()); Observer<String> obsB1 = Mockito.spy(new ConcreteObserverB()); subOne.registerObserver(obsA1); subOne.registerObserver(obsB1); //lambda way Subject<String> subTwo = new ConcreteSubject(); subTwo.registerObserver(event -> System.out.println("ConcreteObserverA:" + event)); subTwo.registerObserver(event -> System.out.println("ConcreteObserverB:" + event)); // same as override observe(String event) }
并行传播消息:
private final ExecutorService executorService = Executors.newCachedThreadPool(); public void notifyObservers(String event){ observers.forEach(ob -> executorService.submit( () -> ob.observe(event) )); }
多线程要小心。为了防止资源滥用,我们可以限制线程池大小,并且活跃度(liveness)属性设置为 violate 。
基于上述观察者模式的思想,可以变体的变成另一种发布订阅模式。为了实现事件分发,Spring 提供了 @EventListener 注解。
- 观察者模式: 主题 <==> 观察者 (–>触发 订阅<–)
- 发布订阅模式:
发布者 -发布-> 事件通道 <==> 订阅者(–>触发 订阅<–)
事件通道 event channel,又称为消息代理或事件总线。
实现发布订阅模式的库:
- MBassador
- Guava 提供的 EventBus 库
Demo练习:显示房间温度
领域模型
@Data @AllArgsConstructor @NoArgsConstructor public class Temperature{ private final double value; }
模拟传感器
@Component public class TemperatureSensor{ private final ApplicationEventPublisher publisher; private final Random random = new Random(); private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); public TemperatureSensor(ApplicationEventPublisher publisher){ this.publisher = publisher; } @PostConstruct public void startProcessing(){ executor.schedule(this::probe, 1, SECONDS); } private void probe(){ var temp = random.nextGaussian() * 10 + 8; publisher.publishEvent(new TemperatureSensor(temp)); executor.schedule(this::probe, random.nextInt(3000), MILLISECONDS); } }
@Component 将其注册为一个 bean ,在 bean 准备就绪时,@PostConstruct注解的非静态函数会被 Spring 框架调用并且触发,开始执行随机温度序列的发布。事件的生成发生在单独的 ScheduledExecutorService executor 里面。
Spring Web MVC 中,不仅可以返回 @Controller 定义的泛型,还可以返回:
Callable<T>
非容器线程内阻塞调用DeferredResult<T>
可以调用setResult(T res)
在非容器线程内生成异步响应。
4.2版本之后,可以返回 ResponseBodyEmitter
用于发送多个对象,每个对象和消息转换器解耦。SseEmitter
进一步拓展,可以一个传入请求发送多个传出消息。
StreamimgResponseBody 异步发送原始数据,更方便流式传输大数据而不阻塞 Servlet 线程。
下面继续构建组件,编写 Controller 用于 HTTP 通信以实现 demo。
// todo:
这篇关于Spring 响应式编程 随记 -- C2 Spring 响应式编程基本概念的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-10-06小米11i印度快充版ROM合集:极致体验,超越期待
- 2024-10-06【ROM下载】小米11i 5G 印度版系统, 疾速跃迁,定义新速度
- 2024-10-06【ROM下载】小米 11 青春活力版,青春无极限,活力全开
- 2024-10-05小米13T Pro系统合集:性能与摄影的极致融合,值得你升级的系统ROM
- 2024-10-01基于Python+Vue开发的医院门诊预约挂号系统
- 2024-10-01基于Python+Vue开发的旅游景区管理系统
- 2024-10-01RestfulAPI入门指南:打造简单易懂的API接口
- 2024-10-01初学者指南:了解和使用Server Action
- 2024-10-01Server Component入门指南:搭建与配置详解
- 2024-10-01React 中使用 useRequest 实现数据请求