RabbitMQ学习
2021/9/22 23:17:26
本文主要是介绍RabbitMQ学习,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
RabbitMQ学习
使用场景
-
消息队列解决什么问题?
- 异步处理
- 应用解耦
- 流量削锋
- 日志处理
安装与配置
用户及vhost配置
添加用户
virtual host管理
开发指南
Simple简单队列
模型
P:消息生产者
红色:阶列
C:消息消费者
不足
耦合性高,生产者—消费者一一对应。队列名变更都得变理
工作队列
模型
为什么会出现工作队列
Simple队列是一一对应的,而且我们实际开发,生产者发送消息是不费力的,而消费者一般是跟业务相结合的。,消息者接收到消息之后就需要处理。需要花费时间。队列就需要更多的消费者。
现象:
消费者1和2处理的消息是一样的
消费者1:奇数
消费者2:偶数
其实是一个轮询分发(roun-robin)
公平分发(Fail dispatch)
使用公平发分要关闭autoACK,改成手动。
公共的消费类
public class BHFailCustomConsumer extends DefaultConsumer { private static final Logger LOGGER = LoggerFactory.getLogger(BHFailCustomConsumer.class); private String consumerName = "defaultConsumer"; private long delayTime = 100L; public BHFailCustomConsumer(Channel channel,long delayTime, String ...name) { super(channel); try { //接收消息,在没有应答前只接收1条消息 channel.basicQos(1); } catch (IOException e) { e.printStackTrace(); } this.delayTime = delayTime; if(name!=null) { this.consumerName = name[0]; }else{ this.consumerName = "defaultConsumer "+ UUID.randomUUID().toString(); } } @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body,"utf-8"); LOGGER.debug(this.consumerName +"_"+msg); try { Thread.sleep(delayTime); } catch (InterruptedException e) { e.printStackTrace(); } //手动应答 getChannel().basicAck(envelope.getDeliveryTag(),false); } }
C1,C2,使用不是的delayTime延迟
Connection connection = messageBrokerHelper.getConnection(); Channel channel = messageBrokerHelper.getChannel(connection,QUEUE_NAME); Consumer consumer = new BHFailCustomConsumer(channel,delayTime,name); try { LOGGER.info("Consumer {} waiting consume,delay time {}",name,delayTime); //关掉自动应答,由消费者手动处理应答 channel.basicConsume(QUEUE_NAME,false,consumer); } catch (IOException e) { LOGGER.debug("consumer listener failure",e); }
W 生产者
public void sender(int senderCount){ Connection connection = messageBrokerHelper.getConnection(); Channel channel = messageBrokerHelper.getChannel(connection,QUEUE_NAME); try { int senderTime = senderCount < 1 ? 1 : senderCount; for (int i=0;i<senderTime;i++) { String body = "hello,rabbit mq" + i; channel.basicPublish("", QUEUE_NAME, null, body.getBytes()); } } catch (IOException e) { e.printStackTrace(); }finally { messageBrokerHelper.closeChannel(channel); messageBrokerHelper.closeConnection(connection); } }
消息应答与持久化
订阅模式
模型
- 一个生产者,多个消费者,每个消费者有自己报价列
- 生产者没有直接把消息发送到队列,而是发到了交换机,转发器exchange
- 每个队列都要绑定到交换机上
- 生产者发送的消息,经过交换机,到达队列。实现一个消息被多个消费者所消费。
场景:
注册->邮件->短信
生产者
消费者
exchange(交换机 转发器)
接收生产者消息,并接收到的消转发给队列
fanout:不处理路由键
Direct:处理路由键
路由模式
Topic模式
‘# 匹配一个或者多个
*匹配一个
RPC模式
消息确认机制(事务+confirm)
两种方式:
AMQP实现了事务机制
Confirm模式
事务机制
-
txselect
用户将当前channel设置成transation模式
-
txCommit
用于提交事务
- txRollback
回滚事务
Confirm模式
生产者的实现原理
开启confirm模式
这篇关于RabbitMQ学习的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-10-27[开源] 一款轻量级的kafka可视化管理平台
- 2024-10-23Kafka消息丢失资料详解:初学者必看教程
- 2024-10-23Kafka资料新手入门指南
- 2024-10-23Kafka解耦入门:新手必读教程
- 2024-10-23Kafka入门:新手必读的简单教程
- 2024-10-23Kafka入门:新手必读的简单教程
- 2024-10-23Kafka消息丢失入门:新手必读指南
- 2024-10-23Kafka消息队列入门:新手必看的简单教程
- 2024-10-23Kafka消息队列入门与应用
- 2024-10-23Kafka重复消费入门:轻松掌握Kafka重复消息处理技巧