redis之mq实现发布订阅模式
2021/8/29 19:06:21
本文主要是介绍redis之mq实现发布订阅模式,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
概述
Redis不仅可作为缓存服务器,还可用作消息队列,本示例演示如何使用redis实现发布/订阅消息队列。
- 在Redis中,发布者没有将消息发送给特定订阅者的程序。相反,发布的消息被描述为通道,而不知道(如果有的话)可能有哪些订阅者。
- 订阅者表示对一个或多个主题感兴趣,只接收感兴趣的消息,而不知道(如果有的话)发布者是什么。
- 发布者和订阅者的这种解耦可以实现更大的可伸缩性和更动态的网络拓扑。
代码实现
redis实现mq的存储方式很多,可以使用list,zset及stream,这些数据的存储结构决定了怎么消费问题(消息是一次使用、允许多次使用、允许多端消息等),比如使用list,我们可以使用leftPush插入消息,使用rightPop消费消息,实现一条消息一次消息,可以参考与以示例代码:
@Test public void testMq() { for (int i = 0; i < 10; i++) { redisTemplate.opsForList().leftPush("task-queue", "data" + i); log.info("插入了一个新的任务==>{}", "data" + i); } String taskId = redisTemplate.opsForList().rightPop("task-queue").toString(); log.info("处理成功,清除任务==>{}", taskId); }
1.配置代码RedisConfig.java
package demo.data.mqRedis.config; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cache.annotation.CachingConfigurerSupport; import org.springframework.cache.annotation.EnableCaching; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.data.redis.listener.ChannelTopic; import org.springframework.data.redis.listener.RedisMessageListenerContainer; import org.springframework.data.redis.listener.adapter.MessageListenerAdapter; import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer; import org.springframework.data.redis.serializer.StringRedisSerializer; @Configuration @EnableCaching public class RedisConfig extends CachingConfigurerSupport { @Autowired private RedisTemplate redisTemplate; /** * redisTemplate 序列化使用的jdkSerializeable, 存储二进制字节码, 所以自定义序列化类,方便调试redis * * @param redisConnectionFactory * @return */ @Bean public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) { RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>(); //使用Jackson2JsonRedisSerializer来序列化和反序列化redis的value值 redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer()); redisTemplate.setHashValueSerializer(new GenericJackson2JsonRedisSerializer()); //使用StringRedisSerializer来序列化和反序列化redis的ke redisTemplate.setKeySerializer(new StringRedisSerializer()); redisTemplate.setHashKeySerializer(new StringRedisSerializer()); //开启事务 redisTemplate.setEnableTransactionSupport(true); redisTemplate.setConnectionFactory(redisConnectionFactory); return redisTemplate; } @Bean MessageListenerAdapter messageListener() { return new MessageListenerAdapter(new RedisMessageSubscriber()); } @Bean RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(connectionFactory); container.addMessageListener(listenerAdapter, topic()); return container; } @Bean MessagePublisher redisPublisher() { return new RedisMessagePublisher(redisTemplate, topic()); } @Bean ChannelTopic topic() { return new ChannelTopic("messageQueue"); } }View Code
2.定义消息发布接口MessagePublisher.java
package demo.data.mqRedis.config; public interface MessagePublisher { void publish(String message); }
3.发布方实现RedisMessagePublisher.java
package demo.data.mqRedis.config; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.listener.ChannelTopic; /** * 消息发布方 */ public class RedisMessagePublisher implements MessagePublisher { @Autowired private RedisTemplate<String, Object> redisTemplate; @Autowired private ChannelTopic topic; public RedisMessagePublisher( RedisTemplate<String, Object> redisTemplate, ChannelTopic topic) { this.redisTemplate = redisTemplate; this.topic = topic; } public void publish(String message) { redisTemplate.convertAndSend(topic.getTopic(), message); } }
4.消息接收方RedisMessageSubscriber.java
package demo.data.mqRedis.config; import lombok.extern.slf4j.Slf4j; import org.springframework.data.redis.connection.Message; import org.springframework.data.redis.connection.MessageListener; import org.springframework.stereotype.Service; import java.util.ArrayList; import java.util.List; /** * 消息订阅方 */ @Service @Slf4j public class RedisMessageSubscriber implements MessageListener { public static List<String> messageList = new ArrayList<>(); public void onMessage(Message message, byte[] pattern) { messageList.add(message.toString()); log.info("订阅方接收到了消息==>{}", message.toString()); } }
5.最后贴上application.yml配置
spring: redis: host: 127.0.0.1 port: 6379 password:
查看运行结果
1.编写测试用例试发布消息TestRedisMQ.java
package demo.data.mqRedis; import demo.data.mqRedis.config.RedisMessagePublisher; import lombok.extern.slf4j.Slf4j; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; import java.util.UUID; @RunWith(SpringRunner.class) @SpringBootTest @Slf4j public class TestRedisMQ { @Autowired RedisMessagePublisher redisMessagePublisher; @Test public void testMq() { String message = "Message " + UUID.randomUUID(); redisMessagePublisher.publish(message); } }
https://www.cnblogs.com/tqlin/p/11468257.html
这篇关于redis之mq实现发布订阅模式的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-12-24Redis资料:新手入门快速指南
- 2024-12-24Redis资料:新手入门教程与实践指南
- 2024-12-24Redis资料:新手入门教程与实践指南
- 2024-12-07Redis高并发入门详解
- 2024-12-07Redis缓存入门:新手必读指南
- 2024-12-07Redis缓存入门:新手必读教程
- 2024-12-07Redis入门:新手必备的简单教程
- 2024-12-07Redis入门:新手必读的简单教程
- 2024-12-06Redis入门教程:从安装到基本操作
- 2024-12-06Redis缓存入门教程:轻松掌握缓存技巧