stream(2)自定义接口生产者、消费者

2021/7/28 13:05:50

本文主要是介绍stream(2)自定义接口生产者、消费者,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

生产者:

新定义一个接口MySource

package com.itmuch.content.rocketmq;

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

public interface MySource {
    String MY_OUTPUT = "my-output";
    @Output(MY_OUTPUT)
    MessageChannel output();
}

在启动类上添加到EnableBinding上

@EnableBinding({Source.class, MySource.class})

在配置文件中

stream:
  rocketmq:
    binder:
      name-server: 127.0.0.1:9876 #找到borker
  bindings:
    output:
      destination: stream-test-topic #用来指定topic
    my-output:
      destination: stream-my-topic #用来指定topic

注意:配置文件中的“my-output”是接口MySource中的MY_OUTPUT的引用,否则注册不上消息队列;

功能代码

@Autowired
private MySource mySource;

@GetMapping("/test-stream-2")
public String testStream2(){
    this.mySource.output().send(
            MessageBuilder.withPayload("消息体").build()
    );
    return "success";
}

消费者:

新建接口MySink

package com.itmuch.usercenter.rocketmq;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

public interface MySink {
    String MY_INPUT = "my-input";

    @Input(MY_INPUT)
    SubscribableChannel input();
}

在启动类上添加到EnableBinding上

@EnableBinding({Sink.class, MySink.class})

在配置文件中

stream:
  rocketmq:
    binder:
      name-server: 127.0.0.1:9876 #找到borker
  bindings:
    input:
      destination: stream-test-topic #用来指定topic
      group: test-group #rocketMQ:虽然这个group可以随便写但是要设置,不然无法启动  其他MQ:可留空
   my-input:
     destination: stream-my-topic #用来指定topic
     group: my-group #rocketMQ:虽然这个group可以随便写但是要设置,不然无法启动  其他MQ:可留空

注意:配置文件中的“my-input”是接口MySink中的MY_INPUT的引用,否则获取不到消息队列上的信息;

package com.itmuch.usercenter.rocketmq;

import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.stereotype.Service;

@Slf4j
@Service
public class MyTestStreamConsumer {
    @StreamListener(MySink.MY_INPUT)
    public void recevice(String mess){
      log.info("自定义接口消费:通过stream收到消息{}",mess);
    }
}


当成功访问http://localhost:8010/test-stream-2 时,消费者控制台就会打印日志(意味着消费者已经从消息队列中获取信息并消费了)

http://img4.sycdn.imooc.com/60ff81e10001704b18730460.jpg



消息过滤:https://www.imooc.com/article/290424

stream异常处理手记:https://www.imooc.com/article/290435

eg:

在消费者重定义一个监听器,监听发生的所有异常

package com.itmuch.usercenter.rocketmq;

import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.ErrorMessage;
import org.springframework.stereotype.Service;

@Slf4j
@Service
public class MyTestStreamConsumer {
    @StreamListener(MySink.MY_INPUT)
    public void recevice(String mess){
      log.info("自定义接口消费:通过stream收到消息{}",mess);
        throw new IllegalArgumentException("抛异常");
    }

    /**
     * 全局异常处理
     * @param message
     */
    @StreamListener("errorChannel")
    public void error(Message<?> message) {
        ErrorMessage errorMessage = (ErrorMessage) message;
//        System.out.println("Handling ERROR: " + errorMessage);
        log.warn("反生异常:{}",errorMessage);
    }


}







这篇关于stream(2)自定义接口生产者、消费者的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!


扫一扫关注最新编程教程