Spring 响应式编程 随记 -- C2 Spring 响应式编程基本概念

2021/8/13 8:05:56

本文主要是介绍Spring 响应式编程 随记 -- C2 Spring 响应式编程基本概念,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

【 好书分享:《Spring 响应式编程》-- 京东】

C2 Spring 响应式编程基本概念

RxJava 库,Java 第一个响应式库

2.1 早期方案

  • 方法1: 可以用 回调 (callback) 来实现跨组件通信。
  • 方法2: 用 Future (java.util.concurrent.Future)
  • 方法3: 更好的 CompletionStageCompletableFuture
  • 方法4: Spring 4 里 的 ListenableFutureAsyncRestTemplate

观察者模式:
主题(Subject)包含一个依赖者(观察者)列表,主题通过调用自身的一个方法通知观察者有状态变化

观察者模式在运行时注册对象之间的一对多依赖,单向通信,解耦实现,高效分配事件。

public interface Subject<T> {
    void registerObserver(Observer<T> observer); 
    void unregisterObserver(Observer<T> observer); 
	  void notifyObservers(T event); 
}

public interface Observer<T> {
    void observe(T event);
}

依赖注入 (Dependency Injection) 容器可以负责查找所有 Subject 实例和注册程序。(@EventListener)

接下来我们做一点简单实现:

public class ConcreteObserverA implements Observer<String> {
    @Override
    public void observe(String event){
        System.out.println("ConcreteObserverA:" + event);
    }
}

public class ConcreteObserverB implements Observer<String> {
    @Override
    public void observe(String event){
        System.out.println("ConcreteObserverB:" + event);
    }
}

//String event
public class ConcreteSubject implements Subject<String> {
    private final Set<Observer<String>> observers = new CopyOnWriteArraySet<>();

    public void registerObserver(Observer<String> observer){
        observers.add(observer);
    }
    
    public void unregisterObserver(Observer<String> observer){
        observers.remove(observer);
    }

	  public void notifyObservers(String event){
        observers.forEach( ob -> ob.observe(event));
    }
}

CopyOnWriteArraySet 是一个线程安全的实现。

通信务必要注意线程安全问题。

针对 Funtional interfaces,比如 Observer 只有一个抽象方法接口,所以可以直接用 lambda 实现。

@Test
public void test(){
    Subject<String> subOne = new ConcreteSubject();
    Observer<String> obsA1 = Mockito.spy(new ConcreteObserverA());
    Observer<String> obsB1 = Mockito.spy(new ConcreteObserverB());
    subOne.registerObserver(obsA1);
    subOne.registerObserver(obsB1);

    //lambda way
    Subject<String> subTwo = new ConcreteSubject();
    subTwo.registerObserver(event -> System.out.println("ConcreteObserverA:" + event));
    subTwo.registerObserver(event -> System.out.println("ConcreteObserverB:" + event));
    // same as override observe(String event)
}

并行传播消息:

private final ExecutorService executorService = Executors.newCachedThreadPool();

public void notifyObservers(String event){
    observers.forEach(ob -> executorService.submit(
        () -> ob.observe(event)
    ));
}

多线程要小心。为了防止资源滥用,我们可以限制线程池大小,并且活跃度(liveness)属性设置为 violate 。

基于上述观察者模式的思想,可以变体的变成另一种发布订阅模式。为了实现事件分发,Spring 提供了 @EventListener 注解。

  • 观察者模式: 主题 <==> 观察者 (–>触发 订阅<–)
  • 发布订阅模式:
    发布者 -发布-> 事件通道 <==> 订阅者(–>触发 订阅<–)

事件通道 event channel,又称为消息代理或事件总线。

实现发布订阅模式的库:

  • MBassador
  • Guava 提供的 EventBus 库

Demo练习:显示房间温度

领域模型

@Data
@AllArgsConstructor
@NoArgsConstructor
public class Temperature{
    private final double value;
}

模拟传感器

@Component
public class TemperatureSensor{
    private final ApplicationEventPublisher publisher;
    private final Random random = new Random();
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();

    public TemperatureSensor(ApplicationEventPublisher publisher){
        this.publisher = publisher;
    }

    @PostConstruct
    public void startProcessing(){
        executor.schedule(this::probe, 1, SECONDS);
    }

    private void probe(){
        var temp = random.nextGaussian() * 10 + 8;
        publisher.publishEvent(new TemperatureSensor(temp));
        executor.schedule(this::probe, random.nextInt(3000), MILLISECONDS);
    }
}

@Component 将其注册为一个 bean ,在 bean 准备就绪时,@PostConstruct注解的非静态函数会被 Spring 框架调用并且触发,开始执行随机温度序列的发布。事件的生成发生在单独的 ScheduledExecutorService executor 里面。

Spring Web MVC 中,不仅可以返回 @Controller 定义的泛型,还可以返回:

  • Callable<T> 非容器线程内阻塞调用
  • DeferredResult<T> 可以调用 setResult(T res) 在非容器线程内生成异步响应。

4.2版本之后,可以返回 ResponseBodyEmitter 用于发送多个对象,每个对象和消息转换器解耦。SseEmitter进一步拓展,可以一个传入请求发送多个传出消息。

StreamimgResponseBody 异步发送原始数据,更方便流式传输大数据而不阻塞 Servlet 线程。

下面继续构建组件,编写 Controller 用于 HTTP 通信以实现 demo。

// todo:


这篇关于Spring 响应式编程 随记 -- C2 Spring 响应式编程基本概念的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!


扫一扫关注最新编程教程