分布式技术(下)-Redis&FastDFS&RabbitMQ
2021/10/1 2:11:09
本文主要是介绍分布式技术(下)-Redis&FastDFS&RabbitMQ,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
第七阶段模块二
Redis
1. 概述
1.1 互联网架构的演变历程
第1阶段:数据访问量不大,简单的架构即可搞定!
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-4ntkevuE-1633005745103)(E:\MarkDown\拉勾笔记\redis 架构第一阶段)]
第2阶段:数据访问量大,使用缓存技术来缓解数据库的压力。
不同的业务访问不同的数据库
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-OL4DRruk-1633005745106)(E:\MarkDown\拉勾笔记\redis 架构第二阶段)]
第3阶段:
主从读写分离。
之前的缓存确实能够缓解数据库的压力,但是写和读都集中在一个数据库上,压力又来了。
一个数据库负责写,一个数据库负责读。分工合作。愉快!
让master(主数据库)来响应事务性(增删改)操作,让slave(从数据库)来响应非事务性 (查询)操作,然后再采用主从复制来把master上的事务性操作同步到slave数据库中
mysql的master/slave就是网站的标配!
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-LUUi1SLa-1633005745109)(E:\MarkDown\拉勾笔记\redis 架构第三阶段)]
第4阶段:
mysql的主从复制,读写分离的基础上,mysql的主库开始出现瓶颈
由于MyISAM使用表锁,所以并发性能特别差
分库分表开始流行,mysql也提出了表分区,虽然不稳定,但我们看到了希望
开始吧,mysql集群
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-EeV79TLc-1633005745114)(E:\MarkDown\拉勾笔记\redis 架构第四阶段)]
1.2 Redis入门介绍
1.互联网需求的3高
高并发,高可扩,高性能
2.Redis 是一种运行速度很快,并发性能很强,并且运行在内存上的NoSql(not only sql)数据库
3.NoSQL数据库 和 传统数据库 相比的优势
NoSQL数据库无需事先为要存储的数据建立字段,随时可以存储自定义的数据格式。
而在关系数据库里,增删字段是一件非常麻烦的事情。若是非常大数据量的表,增加字段简直就是一个噩梦
4.Redis的常用使用场景
缓存,毫无疑问这是Redis当今最为人熟知的使用场景。在提升服务器性能方面非常有效;一些频繁被访问的数据,经常被访问的数据如果放在关系型数据库,每次查询的开销都会很 大,而放在redis中,因为redis 是放在内存中的可以很高效的访问
排行榜,在使用传统的关系型数据库(mysql oracle 等)来做这个事儿,非常的麻烦,而利 用Redis的SortSet(有序集合)数据结构能够简单的搞定;
计算器/限速器,利用Redis中原子性的自增操作,我们可以统计类似用户点赞数、用户访问数等,这类操作如果用MySQL,频繁的读写会带来相当大的压力;限速器比较典型的使用场景是限制某个用户访问某个API的频率,常用的有抢购时,防止用户疯狂点击带来不必要的压力;
好友关系,利用集合的一些命令,比如求交集、并集、差集等。可以方便搞定一些共同好友、共同爱好之类的功能;
简单消息队列,除了Redis自身的发布/订阅模式,我们也可以利用List来实现一个队列机制, 比如:到货通知、邮件发送之类的需求,不需要高可靠,但是会带来非常大的DB压力,完全可以用List来完成异步解耦;
Session共享,以jsp为例,默认Session是保存在服务器的文件中,如果是集群服务,同一个用户过来可能落在不同机器上,这就会导致用户频繁登陆;采用Redis保存Session后,无论用户落在那台机器上都能够获取到对应的Session信息。
1.3 Redis/Memcache/MongoDB对比
1.3.1 Redis和Memcache
Redis和Memcache都是内存数据库。不过memcache还可用于缓存其他东西,例如图片、视频等等。
memcache 数据结构单一kv,redis 更丰富一些,还提供 list,set, hash 等数据结构的存储,有效的减少网络 IO 的次数
虚拟内存–Redis当物理内存用完时,可以将一些很久没用到的value交换到磁盘
存储数据安全–memcache挂掉后,数据没了(没有持久化机制);redis可以定期保存到磁盘(持 久化)
灾难恢复–memcache挂掉后,数据不可恢复; redis数据丢失后可以通过RBD或AOF恢复
1.3.2 Redis和MongoDB
redis和mongodb并不是竞争关系,更多的是一种协作共存的关系。
mongodb本质上还是硬盘数据库,在复杂查询时仍然会有大量的资源消耗,而且在处理复杂逻辑时仍然要不可避免地进行多次查询。
这时就需要redis或Memcache这样的内存数据库来作为中间层进行缓存和加速。
比如在某些复杂页面的场景中,整个页面的内容如果都从mongodb中查询,可能要几十个查询语句,耗时很长。如果需求允许,则可以把整个页面的对象缓存至redis中,定期更新。这样 mongodb和redis就能很好地协作起来
1.4 分布式数据库CAP原理
1.4.1 CAP简介
1.传统的关系型数据库事务具备ACID:
A:原子性
C:一致性
I:独立性
D:持久性
2.分布式数据库的CAP:
C(Consistency):强一致性
“all nodes see the same data at the same time”,即更新操作成功并返回客户端后,所有节点在同一时间的数据完全一致,这就是分布式的一致性。一致性的问题在并发系统中不可避免,对于客户端来说,一致性指的是并发访问时更新过的数据如何获取的问题。从服务端来看,则是更新如何复制分布到整个系统,以保证数据最终一致。
A(Availability):高可用性
可用性指“Reads and writes always succeed”,即服务一直可用,而且要是正常的响应时间。好的可用性主要是指系统能够很好的为用户服务,不出现用户操作失败或者访问超时等用户体验不好的情况。
P(Partition tolerance):分区容错性
即分布式系统在遇到某节点或网络分区故障时,仍然能够对外提供满足一致性或可用性的服务。
分区容错性要求能够使应用虽然是一个分布式系统,而看上去却好像是在一个可以运转正常的整体。比如现在的分布式系统中有某一个或者几个机器宕掉了,其他剩下的机器还能够正常运转满足系统需求,对于用户而言并没有什么体验上的影响。
1.4.2 CAP理论
CAP理论提出就是针对分布式数据库环境的,所以,P这个属性必须容忍它的存在,而且是必须具备的。
因为P是必须的,那么我们需要选择的就是A和C。
大家知道,在分布式环境下,为了保证系统可用性,通常都采取了复制的方式,避免一个节点损坏,导致系统不可用。那么就出现了每个节点上的数据出现了很多个副本的情况,而数据从一个节点复制到另外的节点时需要时间和要求网络畅通的,所以,当P发生时,也就是无法向某个节点复制数据时,这时候你有两个选择:
选择可用性 A,此时,那个失去联系的节点依然可以向系统提供服务,不过它的数据就不能保证是同步的了(失去了C属性)。
选择一致性C,为了保证数据库的一致性,我们必须等待失去联系的节点恢复过来,在这个过程中,那个节点是不允许对外提供服务的,这时候系统处于不可用状态(失去了A属性)。
最常见的例子是读写分离,某个节点负责写入数据,然后将数据同步到其它节点,其它节点提供读取的服务,当两个节点出现通信问题时,你就面临着选择A(继续提供服务,但是数据不保证准确),C(用户处于等待状态,一直等到数据同步完成)。
1.4.3 CAP总结
分区是常态,不可避免,三者不可共存
可用性和一致性是一对冤家
一致性高,可用性低
一致性低,可用性高
因此,根据 CAP 原理将 NoSQL 数据库分成了满足 CA 原则、满足 CP 原则和满足 AP 原则三大类:
CA - 单点集群,满足一致性,可用性的系统,通常在可扩展性上不太强大。
CP - 满足一致性,分区容忍性的系统,通常性能不是特别高。
AP - 满足可用性,分区容忍性的系统,通常可能对一致性要求低一些。
2. 下载与安装
2.1 下载
redis:http://www.redis.net.cn/
图形工具:https://redisdesktop.com/download
2.2 安装
虽然可以在安装在windows操作系统,但是官方不推荐,所以我们一如既往的安装在linux上
1.上传tar.gz包,并解压
tar -zxvf redis-5.0.4.tar.gz
2.安装gcc(必须有网络)
yum -y install gcc
忘记是否安装过,可以使用 gcc -v 命令查看gcc版本,如果没有安装过,会提示命令不存在
3.进入redis目录,进行编译
make
4.编译之后,开始安装
make install
2.3 安装后的操作
2.3.1 后台运行方式
1.redis默认不会使用后台运行,如果你需要,修改配置文件daemonize=yes,当你后台服务启动的时候,会写成一个进程文件运行。
vim /opt/redis-5.0.4/redis.conf
daemonize yes
2.以配置文件的方式启动
cd /usr/local/bin redis-server /opt/redis-5.0.4/redis.conf
2.3.2 关闭数据库
1.单实例关闭
redis-cli shutdown
2.多实例关闭
redis-cli -p 6379 shutdown
2.3.3 常用操作
1.检测6379端口是否在监听
netstat -lntp | grep 6379
端口为什么是6379?
6379在是手机按键上MERZ对应的号码, 而MERZ取自意大利歌女Alessia Merz的名字。 MERZ长期以来被antirez(redis作者)及其朋友当作愚蠢的代名词。
2.检测后台进程是否存在
ps -ef|grep redis
2.3.4 连接redis并测试
redis-cli ping
2.3.5 HelloWorld
set k1 china # 保存数据 get kl # 获取数据
2.3.6 测试性能
1.先 ctrl+c,退出redis客户端
redis-benchmark
2.执行命令后,命令不会自动停止,需要我们手动ctrl+c停止测试
[root@localhost bin]# redis-benchmark ====== PING_INLINE ====== 100000 requests completed in 1.80 seconds # 1.8秒处理了10万个请求,性能要看笔记本的配置高低 50 parallel clients 3 bytes payload keep alive: 1 87.69% <= 1 milliseconds 99.15% <= 2 milliseconds 99.65% <= 3 milliseconds 99.86% <= 4 milliseconds 99.92% <= 5 milliseconds 99.94% <= 6 milliseconds 99.97% <= 7 milliseconds 100.00% <= 7 milliseconds 55524.71 requests per second # 每秒处理的请求数量
2.3.7 默认16个数据库
vim /opt/redis-5.0.4/redis.conf 127.0.0.1:6379> get k1 # 查询k1 "china" 127.0.0.1:6379> select 16 # 切换16号数据库 (error) ERR DB index is out of range # 数据库的下标超出了范围 127.0.0.1:6379> select 15 # 切换15号数据库 OK 127.0.0.1:6379[15]> get k1 # 查询k1 (nil) 127.0.0.1:6379[15]> select 0 # 切换0号数据库 OK 127.0.0.1:6379> get k1 # 查询k1 "china"
2.3.8 数据库键的数量
dbsize
redis在linux支持命令补全(tab)
2.3.9 清空数据库
1.清空当前库
flushdb
2.清空所有(16个)库,慎用!!
flushall
2.3.10 模糊查询(key)
模糊查询keys命令,有三个通配符:
1.*:通配任意多个字符
查询所有的键
keys *
模糊查询k开头,后面随便多少个字符
keys k*
模糊查询e为最后一位,前面随便多少个字符
keys *e
双 * 模式,匹配任意多个字符:查询包含k的键
keys *k*
2.?:通配单个字符
模糊查询k字头,并且匹配一个字符
keys k?
你只记得第一个字母是k,他的长度是3
keys k??
3.[]:通配括号内的某一个字符
记得其他字母,就第二个字母可能是a或e
keys r[ae]dis
2.3.11 键(key)
1.exists key:判断某个key是否存在
127.0.0.1:6379> exists k1 (integer) 1 # 存在 127.0.0.1:6379> exists y1 (integer) 0 # 不存在
2.move key db:移动(剪切,粘贴)键到几号库
127.0.0.1:6379> move x1 8 # 将x1移动到8号库 (integer) 1 # 移动成功 127.0.0.1:6379> exists x1 # 查看当前库中是否存在x1 (integer) 0 # 不存在(因为已经移走了) 127.0.0.1:6379> select 8 # 切换8号库 OK 127.0.0.1:6379[8]> keys * # 查看当前库中的所有键 1) "x1"
3.ttl key:查看键还有多久过期(-1永不过期,-2已过期)
time to live 还能活多久
127.0.0.1:6379[8]> ttl x1 (integer) -1 # 永不过期
4.expire key 秒:为键设置过期时间(生命倒计时)
127.0.0.1:6379[8]> set k1 v1 # 保存k1 OK 127.0.0.1:6379[8]> ttl k1 # 查看k1的过期时间 (integer) -1 # 永不过期 127.0.0.1:6379[8]> expire k1 10 # 设置k1的过期时间为10秒(10秒后自动销毁) (integer) 1 # 设置成功 127.0.0.1:6379[8]> get k1 # 获取k1 "v1" 127.0.0.1:6379[8]> ttl k1 # 查看k1的过期时间 (integer) 2 # 还有2秒过期 127.0.0.1:6379[8]> get k1 (nil) 127.0.0.1:6379[8]> keys * # 从内存中销毁了 (empty list or set)
5.type key:查看键的数据类型
127.0.0.1:6379[8]> type k1 string # k1的数据类型是会string字符串
3. 使用Redis
3.1 五大数据类型
操作文档:http://redisdoc.com/
3.1.1 字符串String
1.set/get/del/append/strlen
127.0.0.1:6379> set k1 v1 # 保存数据 OK 127.0.0.1:6379> set k2 v2 # 保存数据 OK 127.0.0.1:6379> keys * 1) "k1" 2) "k2" 127.0.0.1:6379> del k2 # 删除数据k2 (integer) 1 127.0.0.1:6379> keys * 1) "k1" 127.0.0.1:6379> get k1 # 获取数据k1 "v1" 127.0.0.1:6379> append k1 abc # 往k1的值追加数据abc (integer) 5 # 返回值的长度(字符数量) 127.0.0.1:6379> get k1 "v1abc" 127.0.0.1:6379> strlen k1 # 返回k1值的长度(字符数量) (integer) 5
2.incr/decr/incrby/decrby:加减操作,操作的必须是数字类型
incr:意思是increment,增加
decr:意思是decrement,减少
127.0.0.1:6379> set k1 1 # 初始化k1的值为1 OK 127.0.0.1:6379> incr k1 # k1自增1(相当于++) (integer) 2 127.0.0.1:6379> incr k1 (integer) 3 127.0.0.1:6379> get k1 "3" 127.0.0.1:6379> decr k1 # k1自减1(相当于--) (integer) 2 127.0.0.1:6379> decr k1 (integer) 1 127.0.0.1:6379> get k1 "1" 127.0.0.1:6379> incrby k1 3 # k1自增3(相当于+=3) (integer) 4 127.0.0.1:6379> get k1 "4" 127.0.0.1:6379> decrby k1 2 # k1自减2(相当于-=2) (integer) 2 127.0.0.1:6379> get k1 "2"
3.getrange/setrange:类似between…and…
range:范围
127.0.0.1:6379> set k1 abcdef # 初始化k1的值为abcdef OK 127.0.0.1:6379> get k1 "abcdef" 127.0.0.1:6379> getrange k1 0 -1 # 查询k1全部的值 "abcdef" 127.0.0.1:6379> getrange k1 0 3 # 查询k1的值,范围是下标0~下标3(包含0和3,共返回4个字符) "abcd" 127.0.0.1:6379> setrange k1 1 xxx # 替换k1的值,从下标1开始替换为xxx (integer) 6 127.0.0.1:6379> get k1 "axxxef"
4.setex/setnx
set with expir:添加数据的同时设置生命周期
127.0.0.1:6379> setex k1 5 v1 # 添加k1 v1数据的同时,设置5秒的声明周期 OK 127.0.0.1:6379> get k1 "v1" 127.0.0.1:6379> get k1 (nil) # 已过期,k1的值v1自动销毁
set if not exist:添加数据的时候判断是否已经存在,防止已存在的数据被覆盖掉
127.0.0.1:6379> setnx k1 wei (integer) 0 # 添加失败,因为k1已经存在 127.0.0.1:6379> get k1 "weiwei" 127.0.0.1:6379> setnx k2 wei (integer) 1 # k2不存在,所以添加成功
5.mset/mget/msetnx
m:more更多
127.0.0.1:6379> set k1 v1 k2 v2 # set不支持一次添加多条数据 (error) ERR syntax error 127.0.0.1:6379> mset k1 v1 k2 v2 k3 v3 # mset可以一次添加多条数据 OK 127.0.0.1:6379> keys * 1) "k1" 2) "k2" 3) "k3" 127.0.0.1:6379> mget k2 k3 # 一次获取多条数据 1) "v2" 2) "v3" 127.0.0.1:6379> msetnx k3 v3 k4 v4 # 一次添加多条数据时,如果添加的数据中有已经存在的,则失败 (integer) 0 127.0.0.1:6379> msetnx k4 v4 k5 v5 # 一次添加多条数据时,如果添加的数据中都不存在的,则成功 (integer) 1
6.getset:先get后set
127.0.0.1:6379> getset k6 v6 (nil) # 因为没有k6,所以get为null,然后将k6 v6的值添加到数据库 127.0.0.1:6379> keys * 1) "k4" 2) "k1" 3) "k2" 4) "k3" 5) "k5" 6) "k6" 127.0.0.1:6379> get k6 "v6" 127.0.0.1:6379> getset k6 vv6 # 先获取k6的值,然后修改k6的值为vv6 "v6" 127.0.0.1:6379> get k6 "vv6"
3.1.2 列表List
push和pop,类似机枪AK47:push,压子弹,pop,射击出子弹
1.lpush/rpush/lrange
l:left 自左向右→添加 (从上往下添加)
r:right 自右向左←添加(从下往上添加)
127.0.0.1:6379> lpush list01 1 2 3 4 5 # 从上往下添加 (integer) 5 127.0.0.1:6379> keys * 1) "list01" 127.0.0.1:6379> lrange list01 0 -1 # 查询list01中的全部数据0表示开始,-1表示结尾 1) "5" 2) "4" 3) "3" 4) "2" 5) "1" 127.0.0.1:6379> rpush list02 1 2 3 4 5 # 从下往上添加 (integer) 5 127.0.0.1:6379> lrange list02 0 -1 1) "1" 2) "2" 3) "3" 4) "4" 5) "5"
2.lpop/rpop:移除第一个元素(上左下右)
127.0.0.1:6379> lpop list02 # 从左(上)边移除第一个元素 "1" 127.0.0.1:6379> rpop list02 # 从右(下)边移除第一个元素 "5"
3.lindex:根据下标查询元素(从左向右,自上而下)
127.0.0.1:6379> lrange list01 0 -1 1) "5" 2) "4" 3) "3" 4) "2" 5) "1" 127.0.0.1:6379> lindex list01 2 # 从上到下数,下标为2的值 "3" 127.0.0.1:6379> lindex list01 1 # 从上到下数,下标为1的值 "4"
4.llen:返回集合长度
127.0.0.1:6379> llen list01 (integer) 5
5.lrem:删除n个value
127.0.0.1:6379> lpush list01 1 2 2 3 3 3 4 4 4 4 (integer) 10 127.0.0.1:6379> lrem list01 2 3 # 从list01中移除2个3 (integer) 2 127.0.0.1:6379> lrange list01 0 -1 1) "4" 2) "4" 3) "4" 4) "4" 5) "3" 6) "2" 7) "2" 8) "1"
6.ltrim:截取指定范围的值,别的全扔掉
ltrim key begindex endindex
127.0.0.1:6379> lpush list01 1 2 3 4 5 6 7 8 9 (integer) 9 127.0.0.1:6379> lrange list01 0 -1 1) "9" # 下标0 2) "8" # 下标1 3) "7" # 下标2 4) "6" # 下标3 5) "5" # 下标4 6) "4" # 下标5 7) "3" # 下标6 8) "2" # 下标7 9) "1" # 下标8 127.0.0.1:6379> ltrim list01 3 6 # 截取下标3~6的值,别的全扔掉 OK 127.0.0.1:6379> lrange list01 0 -1 1) "6" 2) "5" 3) "4" 4) "3"
7.rpoplpush:从一个集合搞一个元素到另一个集合中(右出一个,左进一个)
127.0.0.1:6379> rpush list01 1 2 3 4 5 (integer) 5 127.0.0.1:6379> lrange list01 0 -1 1) "1" 2) "2" 3) "3" 4) "4" 5) "5" 127.0.0.1:6379> rpush list02 1 2 3 4 5 (integer) 5 127.0.0.1:6379> lrange list02 0 -1 1) "1" 2) "2" 3) "3" 4) "4" 5) "5" 127.0.0.1:6379> rpoplpush list01 list02 # list01右边出一个,从左进入到list02的第一个位置 "5" 127.0.0.1:6379> lrange list01 0 -1 1) "1" 2) "2" 3) "3" 4) "4" 127.0.0.1:6379> lrange list02 0 -1 1) "5" 2) "1" 3) "2" 4) "3" 5) "4" 6) "5"
8.lset:改变某个下标的某个值
lset key index value
127.0.0.1:6379> lrange list02 0 -1 1) "5" 2) "1" 3) "2" 4) "3" 5) "4" 6) "5" 127.0.0.1:6379> lset list02 0 x # 将list02中下标为0的元素修改成x OK 127.0.0.1:6379> lrange list02 0 -1 1) "x" 2) "1" 3) "2" 4) "3" 5) "4" 6) "5"
9.linsert:插入元素(指定某个元素之前/之后)
linsert key before/after oldvalue newvalue
127.0.0.1:6379> lrange list02 0 -1 1) "x" 2) "1" 3) "2" 4) "3" 5) "4" 6) "5" 127.0.0.1:6379> linsert list02 before 2 java # 从左边进入,在list02中的2元素之前插入java (integer) 7 127.0.0.1:6379> lrange list02 0 -1 1) "x" 2) "1" 3) "java" 4) "2" 5) "3" 6) "4" 7) "5" 127.0.0.1:6379> linsert list02 after 2 redis # 从左边进入,在list02中的2元素之后插入redis (integer) 8 127.0.0.1:6379> lrange list02 0 -1 1) "x" 2) "1" 3) "java" 4) "2" 5) "redis" 6) "3" 7) "4" 8) "5"
10.性能总结:类似添加火车皮一样,头尾操作效率高,中间操作效率惨;
3.1.3 集合Set
和java中的set特点类似,不允许重复
1.sadd/smembers/sismember:添加/查看/判断是否存在
127.0.0.1:6379> sadd set01 1 2 2 3 3 3 # 添加元素(自动排除重复元素) (integer) 3 127.0.0.1:6379> smembers set01 # 查询set01集合 1) "1" 2) "2" 3) "3" 127.0.0.1:6379> sismember set01 2 (integer) 1 # 存在 127.0.0.1:6379> sismember set01 5 (integer) 0 # 不存在
注意:1和0可不是下标,而是布尔。1:true存在,0:false不存在
2.scard:获得集合中的元素个数
127.0.0.1:6379> scard set01 (integer) 3 # 集合中有3个元素
3.srem:删除集合中的元素
srem key value
127.0.0.1:6379> srem set01 2 # 移除set01中的元素2 (integer) 1 # 1表示移除成功
4.srandmember:从集合中随机获取几个元素
srandmember 整数(个数)
127.0.0.1:6379> sadd set01 1 2 3 4 5 6 7 8 9 (integer) 9 127.0.0.1:6379> smembers set01 1) "1" 2) "2" 3) "3" 4) "4" 5) "5" 6) "6" 7) "7" 8) "8" 9) "9" 127.0.0.1:6379> srandmember set01 3 # 从set01中随机获取3个元素 1) "8" 2) "2" 3) "3" 127.0.0.1:6379> srandmember set01 5 # 从set01中随机获取5个元素 1) "5" 2) "8" 3) "7" 4) "4" 5) "6"
5.spop:随机出栈(移除)
127.0.0.1:6379> smembers set01 1) "1" 2) "2" 3) "3" 4) "4" 5) "5" 6) "6" 7) "7" 8) "8" 9) "9" 127.0.0.1:6379> spop set01 # 随机移除一个元素 "8" 127.0.0.1:6379> spop set01 # 随机移除一个元素 "7"
6.smove:移动元素:将key1某个值赋值给key2
127.0.0.1:6379> sadd set01 1 2 3 4 5 (integer) 5 127.0.0.1:6379> sadd set02 x y z (integer) 3 127.0.0.1:6379> smove set01 set02 3 # 将set01中的元素3移动到set02中 (integer) 1 # 移动成功
7.数学集合类
交集:sinter
并集:sunion
差集:sdiff
127.0.0.1:6379> sadd set01 1 2 3 4 5 (integer) 5 127.0.0.1:6379> sadd set02 2 a 1 b 3 (integer) 5 127.0.0.1:6379> sinter set01 set02 # set01和set02共同存在的元素 1) "1" 2) "2" 3) "3" 127.0.0.1:6379> sunion set01 set02 # 将set01和set02中所有元素合并起来(排除重复的) 1) "5" 2) "4" 3) "3" 4) "2" 5) "b" 6) "a" 7) "1" 127.0.0.1:6379> sdiff set01 set02 # 在set01中存在,在set02中不存在 1) "4" 2) "5" 127.0.0.1:6379> sdiff set02 set01 # 在set02中存在,在set01中不存在 1) "b" 2) "a"
3.1.4 哈希Hash
类似java里面的Map
KV模式不变,但V是一个键值对
1.hset/hget/hmset/hmget/hgetall/hdel:添加/得到/多添加/多得到/得到全部/删除属性
127.0.0.1:6379> hset user id 1001 # 添加user,值为id=1001 (integer) 1 127.0.0.1:6379> hget user (error) ERR wrong number of arguments for 'hget' command 127.0.0.1:6379> hget user id # 查询user,必须指明具体的字段 "1001" 127.0.0.1:6379> hmset student id 101 name tom age 22 # 添加学生student,属性一堆 OK 127.0.0.1:6379> hget student name # 获取学生名字 "tom" 127.0.0.1:6379> hmget student name age # 获取学生姓名、年龄 1) "tom" 2) "22" 127.0.0.1:6379> hgetall student # 获取学生全部信息 1) "id" 2) "101" 3) "name" 4) "tom" 5) "age" 6) "22" 127.0.0.1:6379> hdel student age # 删除学生年龄属性 (integer) 1 # 删除成功 127.0.0.1:6379> hgetall student 1) "id" 2) "101" 3) "name" 4) "tom"
2.hlen:返回元素的属性个数
127.0.0.1:6379> hgetall student 1) "id" 2) "101" 3) "name" 4) "tom" 127.0.0.1:6379> hlen student (integer) 2 # student属性的数量,id和name,共两个属性
3.hexists:判断元素是否存在某个属性
127.0.0.1:6379> hexists student name # student中是否存在name属性 (integer) 1 # 存在 127.0.0.1:6379> hexists student age # student中是否存在age属性 (integer) 0 # 不存在
4.hkeys/hvals:获得属性的所有key/获得属性的所有value
127.0.0.1:6379> hkeys student # 获取student所有的属性名 1) "id" 2) "name" 127.0.0.1:6379> hvals student # 获取student所有属性的值(内容) 1) "101" 2) "tom"
5.hincrby/hincrbyfloat:自增(整数)/自增(小数)
127.0.0.1:6379> hmset student id 101 name tom age 22 OK 127.0.0.1:6379> hincrby student age 2 # 自增整数2 (integer) 24 127.0.0.1:6379> hget student age "24" 127.0.0.1:6379> hmset user id 1001 money 1000 OK 127.0.0.1:6379> hincrbyfloat user money 5.5 # 自增小数5.5 "1005.5" 127.0.0.1:6379> hget user money "1005.5"
6.hsetnx:添加的时候,先判断是否存在
127.0.0.1:6379> hsetnx student age 18 # 添加时,判断age是否存在 (integer) 0 # 添加失败,因为age已存在 127.0.0.1:6379> hsetnx student sex 男 # 添加时,判断sex是否存在 (integer) 1 # 添加成功,因为sex不存在 127.0.0.1:6379> hgetall student 1) "id" 2) "101" 3) "name" 4) "tom" 5) "age" 6) "24" 7) "sex" 8) "\xe7\x94\xb7" # 可以添加中文,但是显示为乱码(后期解决)
3.1.5 有序集合Zset
真实需求:
充10元可享vip1;
充20元可享vip2;
充30元可享vip3;
以此类推…
1.zadd/zrange (withscores):添加/查询
127.0.0.1:6379> zadd zset01 10 vip1 20 vip2 30 vip3 40 vip4 50 vip5 (integer) 5 127.0.0.1:6379> zrange zset01 0 -1 # 查询数据 1) "vip1" 2) "vip2" 3) "vip3" 4) "vip4" 5) "vip5" 127.0.0.1:6379> zrange zset01 0 -1 withscores # 带着分数查询数据 1) "vip1" 2) "10" 3) "vip2" 4) "20" 5) "vip3" 6) "30" 7) "vip4" 8) "40" 9) "vip5" 10) "50"
2.zrangebyscore:模糊查询
( : 不包含
limit:跳过几个截取几个
127.0.0.1:6379> zrangebyscore zset01 20 40 # 20 <= score <= 40 1) "vip2" 2) "vip3" 3) "vip4" 127.0.0.1:6379> zrangebyscore zset01 20 (40 # 20 <= score < 40 1) "vip2" 2) "vip3" 127.0.0.1:6379> zrangebyscore zset01 (20 (40 # 20 < score < 40 1) "vip3" 127.0.0.1:6379> zrangebyscore zset01 10 40 limit 2 2 # 10 <= score <=40,共返回四个,跳过前2个,取2个 1) "vip3" 2) "vip4" 127.0.0.1:6379> zrangebyscore zset01 10 40 limit 2 1 # 20 <= score <=40,共返回四个,跳过前2个,取1个 1) "vip3"
3.zrem:删除元素
127.0.0.1:6379> zrem zset01 vip5 # 移除vip5 (integer) 1
4.zcard/zcount/zrank/zscore:集合长度/范围内元素个数/得元素下标/通过值得分数
127.0.0.1:6379> zcard zset01 # 集合中元素的个数 (integer) 4 127.0.0.1:6379> zcount zset01 20 30 # 分数在20~30之间,共有几个元素 (integer) 2 127.0.0.1:6379> zrank zset01 vip3 # vip3在集合中的下标(从上向下) (integer) 2 127.0.0.1:6379> zscore zset01 vip2 # 通过元素获得对应的分数 "20"
5.zrevrank:逆序找下标(从下向上)
127.0.0.1:6379> zrevrank zset01 vip3 (integer) 1
6.zrevrange:逆序查询
127.0.0.1:6379> zrange zset01 0 -1 # 顺序查询 1) "vip1" 2) "vip2" 3) "vip3" 4) "vip4" 127.0.0.1:6379> zrevrange zset01 0 -1 # 逆序查询 1) "vip4" 2) "vip3" 3) "vip2" 4) "vip1"
7.zrevrangebyscore:逆序范围查找
127.0.0.1:6379> zrevrangebyscore zset01 30 20 # 逆序查询分数在30~20之间的(注意,先写大值,再写小值) 1) "vip3" 2) "vip2" 127.0.0.1:6379> zrevrangebyscore zset01 20 30 # 如果小值在前,则结果为null (empty list or set)
3.2 持久化
3.2.1 RDB
Redis DataBase
在指定的时间间隔内,将内存中的数据集的快照写入磁盘;
默认保存在/usr/local/bin中,文件名dump.rdb;
3.2.1.1 自动备份
redis是内存数据库,当我们每次用完redis,关闭linux时,按道理来说,内存释放,redis中的数据也会随之消失
为什么我们再次启动redis的时候,昨天的数据还在,并没有消失呢?
正是因为,每次关机时,redis会自动将数据备份到一个文件中 :/usr/local/bin/dump.rdb
接下来我们就来全方位的认识 自动备份机制
1.默认的自动备份策略不利于我们测试,所以修改redis.conf文件中的自动备份策略
vim redis.conf /SNAP # 搜索 save 900 1 # 900秒内,至少变更1次,才会自动备份 save 120 10 # 120秒内,至少变更10次,才会自动备份 save 60 10000 # 60秒内,至少变更10000次,才会自动备份
当然如果你只是用Redis的缓存功能,不需要持久化,那么你可以注释掉所有的 save 行来停用保存功能。可以直接一个空字符串来实现停用:save “”
2.使用shutdown模拟关机 ,关机之前和关机之后,对比dump.rdb文件的更新时间
注意:当我们使用shutdown命令,redis会自动将数据库备份,所以,dump.rdb文件创建时间更新了
3.开机启动redis,我们要在120秒内保存10条数据,再查看dump.rdb文件的更新时间(开两个终端窗口,方便查看)
4.120秒内保存10条数据这一动作触发了备份指令,目前,dump.rdb文件中保存了10条数据,将 dump.rdb拷贝一份dump10.rdb,此时两个文件中都保存10条数据
5.既然有数据已经备份了,那我们就肆无忌惮的将数据全部删除flushall,再次shutdown关机
6.再次启动redis,发现数据真的消失了,并没有按照我们所想的将dump.rdb文件中的内容恢复到 redis中。为什么?
因为,当我们保存10条以上的数据时,数据备份起来了;
然后删除数据库,备份文件中的数据,也没问题;
但是,问题出在shutdown上,这个命令一旦执行,就会立刻备份,将删除之后的空数据库生成备份文件,将之前装10条数据的备份文件覆盖掉了。所以,就出现了上图的结果。自动恢复失败。
怎么解决这个问题呢?要将备份文件再备份
7.将dump.rdb文件删除,将dump10.rdb重命名为dump.rdb
8.启动redis服务,登录redis,数据10条,全部恢复!
3.2.1.2 手动备份
之前自动备份,必须更改好多数据,例如上边,我们改变了十多条数据,才会自动备份;
现在,我只保存一条数据,就想立刻备份,应该怎么做?
每次操作完成,执行命令 save 就会立刻备份
3.2.1.3 与RDB相关的配置
1.stop-writes-on-bgsave-error:进水口和出水口,出水口发生故障与否
yes:当后台备份时候反生错误,前台停止写入
no:不管死活,就是往里怼
2.rdbcompression:对于存储到磁盘中的快照,是否启动LZF压缩算法,一般都会启动,因为这点性能,多买一台电脑,完全搞定N个来回了。
yes:启动
no:不启动(不想消耗CPU资源,可关闭)
3.rdbchecksum:在存储快照后,是否启动CRC64算法进行数据校验;
开启后,大约增加10%左右的CPU消耗;
如果希望获得最大的性能提升,可以选择关闭;
4.dbfilename:快照备份文件名字
5.dir:快照备份文件保存的目录,默认为当前目录
优势and劣势
优:适合大规模数据恢复,对数据完整性和一致行要求不高;
劣:一定间隔备份一次,意外down掉,就失去最后一次快照的所有修改
3.2.2 AOF
Append Only File 以日志的形式记录每个写操作;
将redis执行过的写指令全部记录下来(读操作不记录);
只许追加文件,不可以改写文件;
redis在启动之初会读取该文件从头到尾执行一遍,这样来重新构建数据;
3.2.2.1 开启AOF
1.为了避免失误,最好将redis.conf总配置文件备份一下,然后再修改内容如下:
appendonly yes appendfilename appendonly.aof
2.重新启动redis,以新配置文件启动
redis-server /opt/redis5.0.4/redis.conf
3.连接redis,加数据,删库,退出
4.查看当前文件夹多一个aof文件,看看文件中的内容,保存的都是写操作
文件中最后一句要删除,否则数据恢复不了
编辑这个文件,最后要 :wq! 强制执行
5.只需要重新连接,数据恢复成功
3.2.2.2 共存?谁优先?
我们查看redis.conf文件,AOF和RDB两种备份策略可以同时开启,那系统会怎样选择?
1.动手试试,编辑appendonly.aof,胡搞乱码,保存退出
2.启动redis 失败,所以是AOF优先载入来恢复原始数据!因为AOF比RDB数据保存的完整性更高!
3.修复AOF文件,杀光不符合redis语法规范的代码
reids-check-aof --fix appendonly.aof
3.2.2.3 与AOF相关的配置
1.appendonly:开启aof模式
2.appendfilename:aof的文件名字,最好别改!
3.appendfsync:追写策略
always:每次数据变更,就会立即记录到磁盘,性能较差,但数据完整性好
everysec:默认设置,异步操作,每秒记录,如果一秒内宕机,会有数据丢失
no:不追写
4.no-appendfsync-on-rewrite:重写时是否运用Appendfsync追写策略;用默认no即可,保证数据安全性。
AOF采用文件追加的方式,文件会越来越大,为了解决这个问题,增加了重写机制,redis会自动记录上一次AOF文件的大小,当AOF文件大小达到预先设定的大小时,redis就会启动 AOF文件进行内容压缩,只保留可以恢复数据的最小指令集合
5.auto-aof-rewrite-percentage:如果AOF文件大小已经超过原来的100%,也就是一倍,才重写压缩
6.auto-aof-rewrite-min-size:如果AOF文件已经超过了64mb,才重写压缩
3.2.3 总结(如何选择?)
RDB:只用作后备用途,建议15分钟备份一次就好
AOF:
在最恶劣的情况下,也只丢失不超过2秒的数据,数据完整性比较高,但代价太大,会带来持续的IO
对硬盘的大小要求也高,默认64mb太小了,企业级最少都是5G以上;
后面要学习的master/slave才是新浪微博的选择!!
3.3 事务
可以一次执行多个命令,是一个命令组,一个事务中,所有命令都会序列化(排队),不会被插队;
一个队列中,一次性,顺序性,排他性的执行一系列命令
三特性
隔离性:所有命令都会按照顺序执行,事务在执行的过程中,不会被其他客户端送来的命令打断
没有隔离级别:队列中的命令没有提交之前都不会被实际的执行,不存在“事务中查询要看到事务里的更新,事务外查询不能看到”这个头疼的问题
不保证原子性:冤有头债有主,如果一个命令失败,但是别的命令可能会执行成功,没有回滚
三步走
开启multi
入队queued
执行exec
关系型数据库事务相比
multi:可以理解成关系型事务中的 begin
exec :可以理解成关系型事务中的 commit
discard :可以理解成关系型事务中的 rollback
3.3.1 一起生
开启事务,加入队列,一起执行,并成功
127.0.0.1:6379> multi # 开启事务 OK 127.0.0.1:6379> set k1 v1 QUEUED # 加入队列 127.0.0.1:6379> set k2 v2 QUEUED # 加入队列 127.0.0.1:6379> get k2 QUEUED # 加入队列 127.0.0.1:6379> set k3 v3 QUEUED # 加入队列 127.0.0.1:6379> exec # 执行,一起成功! 1) OK 2) OK 3) "v2" 4) OK
3.3.2 一起死
放弃之前的操作,恢复到原来的值
127.0.0.1:6379> multi # 开启事务 OK 127.0.0.1:6379> set k1 v1111 QUEUED 127.0.0.1:6379> set k2 v2222 QUEUED 127.0.0.1:6379> discard # 放弃操作 OK 127.0.0.1:6379> get k1 "v1" # 还是原来的值
3.3.3 一粒老鼠屎坏一锅汤
一句报错,全部取消,恢复到原来的值
127.0.0.1:6379> multi OK 127.0.0.1:6379> set k4 v4 QUEUED 127.0.0.1:6379> setlalala # 一句报错 (error) ERR unknown command `setlalala`, with args beginning with: 127.0.0.1:6379> set k5 v5 QUEUED 127.0.0.1:6379> exec # 队列中命令全部取消 (error) EXECABORT Transaction discarded because of previous errors. 127.0.0.1:6379> keys * # 还是原来的值 1) "k2" 2) "k3" 3) "k1"
3.3.4 冤有头债有主
追究责任,谁的错,找谁去
127.0.0.1:6379> multi OK 127.0.0.1:6379> incr k1 # 虽然v1不能++,但是加入队列并没有报错,类似java中的通过编译 QUEUED 127.0.0.1:6379> set k4 v4 QUEUED 127.0.0.1:6379> set k5 v5 QUEUED 127.0.0.1:6379> exec 1) (error) ERR value is not an integer or out of range # 真正执行的时候,报错 2) OK # 成功 3) OK # 成功 127.0.0.1:6379> keys * 1) "k5" 2) "k1" 3) "k3" 4) "k2" 5) "k4"
3.3.5 watch监控
测试:模拟收入与支出
正常情况下:
127.0.0.1:6379> set in 100 # 收入100元 OK 127.0.0.1:6379> set out 0 # 支出0元 OK 127.0.0.1:6379> multi OK 127.0.0.1:6379> decrby in 20 # 收入-20 QUEUED 127.0.0.1:6379> incrby out 20 # 支出+20 QUEUED 127.0.0.1:6379> exec 1) (integer) 80 2) (integer) 20 # 结果,没问题!
特殊情况下:
127.0.0.1:6379> watch in # 监控收入in OK 127.0.0.1:6379> multi OK 127.0.0.1:6379> decrby in 20 QUEUED 127.0.0.1:6379> incrby out 20 QUEUED 127.0.0.1:6379> exec (nil) # 在exec之前,我开启了另一个窗口(线程),对监控的in做了修改,所以本次的事务将被打断(失效),类似于“乐观锁”
unwatch:取消watch命令对所有key的操作
一旦执行了exec命令,那么之前加的所有监控自动失效!
3.4 Redis的发布订阅
进程间的一种消息通信模式:发送者(pub)发送消息,订阅者(sub)接收消息。例如:微信订阅号
订阅一个或多个频道
127.0.0.1:6379> subscribe cctv1 cctv5 cctv6 # 1.订阅三个频道 Reading messages... (press Ctrl-C to quit) 1) "subscribe" 2) "cctv1" 3) (integer) 1 1) "subscribe" 2) "cctv5" 3) (integer) 2 1) "subscribe" 2) "cctv6" 3) (integer) 3 1) "message" # 3.cctv5接收到推送过来的信息 2) "cctv5" 3) "NBA"
127.0.0.1:6379> publish cctv5 NBA # 2.发送消息给cctv5 (integer) 1
3.5 主从复制
就是 redis集群的策略
配从(库)不配主(库):小弟可以选择谁是大哥,但大哥没有权利去选择小弟
读写分离:主机写,从机读
3.5.1 一主二仆
1.准备三台服务器,并修改redis.conf
bind 0.0.0.0
2.启动三台redis,并查看每台机器的角色,都是master
info replication
3.测试开始
3.1) 首先,将三个机器全都清空,第一台添加值
mset k1 v1 k2 v2
3.2) 其余两台机器,复制(找大哥)
slaveof 192.168.44.129 6379
3.3) 第一台再添加值
set k3 v3
思考1:slave之前的k1和k2是否能拿到?
可以获得,只要跟了大哥,之前的数据也会立刻同步
思考2:slave之后的k3是否能拿到?
可以获得,只要跟了大哥,数据会立刻同步
思考3:同时添加k4,结果如何?
主机(129master)可以添加成功,从机(130和131是slave)失败,从机只负责读取数据,无权写入数据,这就是“读写分离”
思考4:主机shutdown,从机如何?
130和131仍然是slave,并显示他们的master已离线
思考5:主机重启,从机又如何?
130和131仍然是slave,并显示他们的master已上线
思考6:从机死了,主机如何?
从机归来身份是否变化? 主机没有变化,只是显示少了一个slave
主机和从机没有变化,而重启归来的从机自立门户成为了master,不和原来的集群在一起了
3.5.2 血脉相传
一个主机理论上可以多个从机,但是这样的话,这个主机会很累
我们可以使用java面向对象继承中的传递性来解决这个问题,减轻主机的负担
形成祖孙三代:
127.0.0.1:6379> slaveof 192.168.44.129 6379 # 130跟随129 OK 127.0.0.1:6379> slaveof 192.168444.130 6379 # 131跟随130 OK
3.5.3 谋权篡位
1个主机,2个从机,当1个主机挂掉了,只能从2个从机中再次选1个主机
国不可一日无君,军不可一日无帅
手动选老大
模拟测试:1为master,2和3为slave,当1挂掉后,2篡权为master,3跟2
slaveof no one # 2上执行,没有人能让我臣服,那我就是老大
slaveof 192.168.44.130 6379 # 3跟随2号
思考:当1再次回归,会怎样?
2和3已经形成新的集群,和1没有任何的关系了。所以1成为了光杆司令
3.5.4 复制原理
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-x48CmhGl-1633005745120)(E:\MarkDown\拉勾笔记\redis 主从复制原理)]
完成上面几个步骤后就完成了从服务器数据初始化的所有操作,从服务器此时可以接收来自用户的读请求
全量复制:Slave初始化阶段,这时Slave需要将Master上的所有数据都复制一份slave接收到数据文件后,存盘,并加载到内存中;(步骤1234)
增量复制:Slave初始化后,开始正常工作时主服务器发生的写操作同步到从服务器的过程;(步骤56)
但,只要是重新连接master,一次性(全量复制)同步将自动执行;
Redis主从同步策略:
主从刚刚连接的时候,进行全量同步;
全同步结束后,进行增量同步。当然,如果有需要,slave 在任何时候都可以发起全量同步。
redis 策略是,无论如何,首先会尝试进行增量同步,如不成功,要求从机进行全量同步。
3.5.5 哨兵模式
自动版的谋权篡位!
有个哨兵一直在巡逻,突然发现!!!!!老大挂了,小弟们会自动投票,从众小弟中选出新的老大
Sentinel是Redis的高可用性解决方案:
由一个或多个Sentinel实例组成的Sentinel系统可以监视任意多个主服务器,以及所有从服务器,并在被监视的主服务器进入下线状态时,自动将下线主服务器属下的某个从服务器升级为新的主服务器,然后由新的主服务器代替已下线的主服务器继续处理命令请求
模拟测试
- 1主,2和3从
- 每一台服务器中创建一个配置文件sentinel.conf,名字绝不能错,并编辑sentinel.conf
# sentinel monitor 被监控主机名(自定义) ip port 票数 sentinel monitor redis129 192.168.44.129 6379 1
- 启动服务的顺序:主Redis --> 从Redis --> Sentinel1/2/3
redis-sentinel sentinel.conf
- 将1号老大挂掉,后台自动发起激烈的投票,选出新的老大
127.0.0.1:6379> shutdown not connected> exit
-
查看最后权利的分配
3成为了新的老大,2还是小弟
-
如果之前的老大再次归来呢?
1号再次归来,自己成为了master,和3平起平坐
过了几秒之后,被哨兵检测到了1号机的归来,1号你别自己玩了,进入集体吧,但是新的老大已经产生了,你只能作为小弟再次进入集体!
3.5.6 缺点
由于所有的写操作都是在master上完成的;
然后再同步到slave上,所以两台机器之间通信会有延迟;
当系统很繁忙的时候,延迟问题会加重;
slave机器数量增加,问题也会加重
3.6 总配置redis.conf 详解
# Redis 配置文件示例 # 注意单位: 当需要配置内存大小时, 可能需要指定像1k,5GB,4M等常见格式 # # 1k => 1000 bytes # 1kb => 1024 bytes # 1m => 1000000 bytes # 1mb => 1024*1024 bytes # 1g => 1000000000 bytes # 1gb => 1024*1024*1024 bytes # # 单位是对大小写不敏感的 1GB 1Gb 1gB 是相同的。 ################################## INCLUDES 包含文件相关################################### # 可以在这里包含一个或多个其他的配置文件。如果你有一个适用于所有Redis服务器的标准配置模板 # 但也需要一些每个服务器自定义的设置,这个功能将很有用。被包含的配置文件也可以包含其他配置文 件, # 所以需要谨慎的使用这个功能。 # # 注意“inclue”选项不能被admin或Redis哨兵的"CONFIG REWRITE"命令重写。 # 因为Redis总是使用最后解析的配置行最为配置指令的值, 你最好在这个文件的开头配置includes来 # 避免它在运行时重写配置。 # 如果相反你想用includes的配置覆盖原来的配置,你最好在该文件的最后使用include # # include /path/to/local.conf # include /path/to/other.conf ################################ GENERAL 综合配置##################################### # 默认Rdis不会作为守护进程运行。如果需要的话配置成'yes' # 注意配置成守护进程后Redis会将进程号写入文件/var/run/redis.pid daemonize no # 当以守护进程方式运行时,默认Redis会把进程ID写到 /var/run/redis.pid。你可以在这里修改路径。 pidfile /var/run/redis.pid # 接受连接的特定端口,默认是6379 # 如果端口设置为0,Redis就不会监听TCP套接字。 port 6379 # TCP listen() backlog. # server在与客户端建立tcp连接的过程中,SYN队列的大小 # 在高并发环境下你需要一个高backlog值来避免慢客户端连接问题。注意Linux内核默默地将这个值减小 # 到/proc/sys/net/core/somaxconn的值,所以需要确认增大somaxconn和tcp_max_syn_backlog # 两个值来达到想要的效果。 tcp-backlog 511 # 默认Redis监听服务器上所有可用网络接口的连接。可以用"bind"配置指令跟一个或多个ip地址来实现 # 监听一个或多个网络接口 # # 示例: # # bind 192.168.1.100 10.0.0.1 # bind 127.0.0.1 # 指定用来监听Unix套套接字的路径。没有默认值, 所以在没有指定的情况下Redis不会监听Unix套接字 # # unixsocket /tmp/redis.sock # unixsocketperm 755 # 一个客户端空闲多少秒后关闭连接。(0代表禁用,永不关闭) timeout 0 # TCP keepalive. # # 如果非零,则设置SO_KEEPALIVE选项来向空闲连接的客户端发送ACK,由于以下两个原因这是很有用的: # # 1)能够检测无响应的对端 # 2)让该连接中间的网络设备知道这个连接还存活 # # 在Linux上,这个指定的值(单位:秒)就是发送ACK的时间间隔。 # 注意:要关闭这个连接需要两倍的这个时间值。 # 在其他内核上这个时间间隔由内核配置决定 # # 这个选项的一个合理值是60秒 tcp-keepalive 0 # 指定服务器调试等级 # 可能值: # debug (大量信息,对开发/测试有用) # verbose (很多精简的有用信息,但是不像debug等级那么多) # notice (适量的信息,基本上是你生产环境中需要的) # warning (只有很重要/严重的信息会记录下来) loglevel notice # 指明日志文件名。也可以使用"stdout"来强制让Redis把日志信息写到标准输出上。 # 注意:如果Redis以守护进程方式运行,而设置日志显示到标准输出的话,日志会发送到/dev/null logfile "" # 要使用系统日志记录器,只要设置 "syslog-enabled" 为 "yes" 就可以了。 # 然后根据需要设置其他一些syslog参数就可以了。 # syslog-enabled no # 指明syslog身份 # syslog-ident redis # 指明syslog的设备。必须是user或LOCAL0 ~ LOCAL7之一。 # syslog-facility local0 # 设置数据库个数。默认数据库是 DB 0, # 可以通过select <dbid> (0 <= dbid <= 'databases' - 1 )来为每个连接使用不同的数据库。 databases 16 ############################# SNAPSHOTTING 快照,持久化操作配置 ############################# # # 把数据库存到磁盘上: # # save <seconds> <changes> # # 会在指定秒数和数据变化次数之后把数据库写到磁盘上。 # # 下面的例子将会进行把数据写入磁盘的操作: # 900秒(15分钟)之后,且至少1次变更 # 300秒(5分钟)之后,且至少10次变更 # 60秒之后,且至少10000次变更 # # 注意:你要想不写磁盘的话就把所有 "save" 设置注释掉就行了。 # # 通过添加一条带空字符串参数的save指令也能移除之前所有配置的save指令 # 像下面的例子: # save "" save 900 1 save 300 10 save 60 10000 # 默认如果开启RDB快照(至少一条save指令)并且最新的后台保存失败,Redis将会停止接受写操作 # 这将使用户知道数据没有正确的持久化到硬盘,否则可能没人注意到并且造成一些灾难。 # # 如果后台保存进程能重新开始工作,Redis将自动允许写操作 # # 然而如果你已经部署了适当的Redis服务器和持久化的监控,你可能想关掉这个功能以便于即使是 # 硬盘,权限等出问题了Redis也能够像平时一样正常工作, stop-writes-on-bgsave-error yes # 当导出到 .rdb 数据库时是否用LZF压缩字符串对象? # 默认设置为 "yes",因为几乎在任何情况下它都是不错的。 # 如果你想节省CPU的话你可以把这个设置为 "no",但是如果你有可压缩的key和value的话, # 那数据文件就会更大了。 rdbcompression yes # 因为版本5的RDB有一个CRC64算法的校验和放在了文件的最后。这将使文件格式更加可靠但在 # 生产和加载RDB文件时,这有一个性能消耗(大约10%),所以你可以关掉它来获取最好的性能。 # # 生成的关闭校验的RDB文件有一个0的校验和,它将告诉加载代码跳过检查 rdbchecksum yes # 持久化数据库的文件名 dbfilename dump.rdb # 工作目录 # # 数据库会写到这个目录下,文件名就是上面的 "dbfilename" 的值。 # # 累加文件也放这里。 # # 注意你这里指定的必须是目录,不是文件名。 dir ./ ############################### REPLICATION 主从复制的配置 ############################### # 主从同步。通过 slaveof 指令来实现Redis实例的备份。 # 注意,这里是本地从远端复制数据。也就是说,本地可以有不同的数据库文件、绑定不同的IP、监听 # 不同的端口。 # # slaveof <masterip> <masterport> # 如果master设置了密码保护(通过 "requirepass" 选项来配置),那么slave在开始同步之前必须 # 进行身份验证,否则它的同步请求会被拒绝。 # # masterauth <master-password> # 当一个slave失去和master的连接,或者同步正在进行中,slave的行为有两种可能: # # 1) 如果 slave-serve-stale-data 设置为 "yes" (默认值),slave会继续响应客户端请求, # 可能是正常数据,也可能是还没获得值的空数据。 # 2) 如果 slave-serve-stale-data 设置为 "no",slave会回复"正在从master同步 # (SYNC with master in progress)"来处理各种请求,除了 INFO 和 SLAVEOF 命令。 # slave-serve-stale-data yes # 你可以配置salve实例是否接受写操作。可写的slave实例可能对存储临时数据比较有用(因为写入salve # 的数据在同master同步之后将很容被删除),但是如果客户端由于配置错误在写入时也可能产生一些问题。 # # 从Redis2.6默认所有的slave为只读 # # 注意:只读的slave不是为了暴露给互联网上不可信的客户端而设计的。它只是一个防止实例误用的保护层。 # 一个只读的slave支持所有的管理命令比如config,debug等。为了限制你可以用'renamecommand'来 # 隐藏所有的管理和危险命令来增强只读slave的安全性 slave-read-only yes # slave根据指定的时间间隔向master发送ping请求。 # 时间间隔可以通过 repl_ping_slave_period 来设置。 # 默认10秒。 # # repl-ping-slave-period 10 # 以下选项设置同步的超时时间 # # 1)slave在与master SYNC期间有大量数据传输,造成超时 # 2)在slave角度,master超时,包括数据、ping等 # 3)在master角度,slave超时,当master发送REPLCONF ACK pings # # 确保这个值大于指定的repl-ping-slave-period,否则在主从间流量不高时每次都会检测到超时 # # repl-timeout 60 # 是否在slave套接字发送SYNC之后禁用 TCP_NODELAY ? # # 如果你选择“yes”Redis将使用更少的TCP包和带宽来向slaves发送数据。但是这将使数据传输到slave # 上有延迟,Linux内核的默认配置会达到40毫秒 # # 如果你选择了 "no" 数据传输到salve的延迟将会减少但要使用更多的带宽 # # 默认我们会为低延迟做优化,但高流量情况或主从之间的跳数过多时,把这个选项设置为“yes” # 是个不错的选择。 repl-disable-tcp-nodelay no # 设置数据备份的backlog大小。backlog是一个slave在一段时间内断开连接时记录salve数据的缓冲, # 所以一个slave在重新连接时,不必要全量的同步,而是一个增量同步就足够了,将在断开连接的这段 # 时间内slave丢失的部分数据传送给它。 # # 同步的backlog越大,slave能够进行增量同步并且允许断开连接的时间就越长。 # # backlog只分配一次并且至少需要一个slave连接 # # repl-backlog-size 1mb # 当master在一段时间内不再与任何slave连接,backlog将会释放。以下选项配置了从最后一个 # slave断开开始计时多少秒后,backlog缓冲将会释放。 # # 0表示永不释放backlog # # repl-backlog-ttl 3600 # slave的优先级是一个整数展示在Redis的Info输出中。如果master不再正常工作了,哨兵将用它来 # 选择一个slave提升=升为master。 # # 优先级数字小的salve会优先考虑提升为master,所以例如有三个slave优先级分别为10,100,25, # 哨兵将挑选优先级最小数字为10的slave。 # # 0作为一个特殊的优先级,标识这个slave不能作为master,所以一个优先级为0的slave永远不会被 # 哨兵挑选提升为master # # 默认优先级为100 slave-priority 100 # 如果master少于N个延时小于等于M秒的已连接slave,就可以停止接收写操作。 # # N个slave需要是“oneline”状态 # # 延时是以秒为单位,并且必须小于等于指定值,是从最后一个从slave接收到的ping(通常每秒发送) # 开始计数。 # # This option does not GUARANTEES that N replicas will accept the write, but # will limit the window of exposure for lost writes in case not enough slaves # are available, to the specified number of seconds. # # 例如至少需要3个延时小于等于10秒的slave用下面的指令: # # min-slaves-to-write 3 # min-slaves-max-lag 10 # # 两者之一设置为0将禁用这个功能。 # # 默认 min-slaves-to-write 值是0(该功能禁用)并且 min-slaves-max-lag 值是10。 ################################# SECURITY 安全相关配置 ################################## # 要求客户端在处理任何命令时都要验证身份和密码。 # 这个功能在有你不信任的其它客户端能够访问redis服务器的环境里非常有用。 # # 为了向后兼容的话这段应该注释掉。而且大多数人不需要身份验证(例如:它们运行在自己的服务器上) # # 警告:因为Redis太快了,所以外面的人可以尝试每秒150k的密码来试图破解密码。这意味着你需要 # 一个高强度的密码,否则破解太容易了。 # # requirepass foobared # 命令重命名 # # 在共享环境下,可以为危险命令改变名字。比如,你可以为 CONFIG 改个其他不太容易猜到的名字, # 这样内部的工具仍然可以使用,而普通的客户端将不行。 # # 例如: # # rename-command CONFIG b840fc02d524045429941cc15f59e41cb7be6c52 # # 也可以通过改名为空字符串来完全禁用一个命令 # # rename-command CONFIG "" # # 请注意:改变命令名字被记录到AOF文件或被传送到从服务器可能产生问题。 ################################## LIMITS 范围配置 ################################### # 设置最多同时连接的客户端数量。默认这个限制是10000个客户端,然而如果Redis服务器不能配置 # 处理文件的限制数来满足指定的值,那么最大的客户端连接数就被设置成当前文件限制数减32(因 # 为Redis服务器保留了一些文件描述符作为内部使用) # # 一旦达到这个限制,Redis会关闭所有新连接并发送错误'max number of clients reached' # # maxclients 10000 # 不要用比设置的上限更多的内存。一旦内存使用达到上限,Redis会根据选定的回收策略(参见: # maxmemmory-policy)删除key # # 如果因为删除策略Redis无法删除key,或者策略设置为 "noeviction",Redis会回复需要更 # 多内存的错误信息给命令。例如,SET,LPUSH等等,但是会继续响应像Get这样的只读命令。 # # 在使用Redis作为LRU缓存,或者为实例设置了硬性内存限制的时候(使用 "noeviction" 策略) # 的时候,这个选项通常事很有用的。 # # 警告:当有多个slave连上达到内存上限的实例时,master为同步slave的输出缓冲区所需 # 内存不计算在使用内存中。这样当驱逐key时,就不会因网络问题 / 重新同步事件触发驱逐key # 的循环,反过来slaves的输出缓冲区充满了key被驱逐的DEL命令,这将触发删除更多的key, # 直到这个数据库完全被清空为止 # # 总之...如果你需要附加多个slave,建议你设置一个稍小maxmemory限制,这样系统就会有空闲 # 的内存作为slave的输出缓存区(但是如果最大内存策略设置为"noeviction"的话就没必要了) # # maxmemory <bytes> # 最大内存策略:如果达到内存限制了,Redis如何选择删除key。你可以在下面五个行为里选: # # volatile-lru -> 根据LRU算法生成的过期时间来删除。 # allkeys-lru -> 根据LRU算法删除任何key。 # volatile-random -> 根据过期设置来随机删除key。 # allkeys->random -> 无差别随机删。 # volatile-ttl -> 根据最近过期时间来删除(辅以TTL) # noeviction -> 谁也不删,直接在写操作时返回错误。 # # 注意:对所有策略来说,如果Redis找不到合适的可以删除的key都会在写操作时返回一个错误。 # # 目前为止涉及的命令:set setnx setex append # incr decr rpush lpush rpushx lpushx linsert lset rpoplpush sadd # sinter sinterstore sunion sunionstore sdiff sdiffstore zadd zincrby # zunionstore zinterstore hset hsetnx hmset hincrby incrby decrby # getset mset msetnx exec sort # # 默认值如下: # # maxmemory-policy volatile-lru # LRU和最小TTL算法的实现都不是很精确,但是很接近(为了省内存),所以你可以用样本量做检测。 # 例如:默认Redis会检查3个key然后取最旧的那个,你可以通过下面的配置指令来设置样本的个数。 # # maxmemory-samples 3 ############################## APPEND ONLY MODE AOF模式配置 ############################### # 默认情况下,Redis是异步的把数据导出到磁盘上。这种模式在很多应用里已经足够好,但Redis进程 # 出问题或断电时可能造成一段时间的写操作丢失(这取决于配置的save指令)。 # # AOF是一种提供了更可靠的替代持久化模式,例如使用默认的数据写入文件策略(参见后面的配置) # 在遇到像服务器断电或单写情况下Redis自身进程出问题但操作系统仍正常运行等突发事件时,Redis # 能只丢失1秒的写操作。 # # AOF和RDB持久化能同时启动并且不会有问题。 # 如果AOF开启,那么在启动时Redis将加载AOF文件,它更能保证数据的可靠性。 # # 请查看 http://redis.io/topics/persistence 来获取更多信息. appendonly no # 纯累加文件名字(默认:"appendonly.aof") appendfilename "appendonly.aof" # fsync() 系统调用告诉操作系统把数据写到磁盘上,而不是等更多的数据进入输出缓冲区。 # 有些操作系统会真的把数据马上刷到磁盘上;有些则会尽快去尝试这么做。 # # Redis支持三种不同的模式: # # no:不要立刻刷,只有在操作系统需要刷的时候再刷。比较快。 # always:每次写操作都立刻写入到aof文件。慢,但是最安全。 # everysec:每秒写一次。折中方案。 # # 默认的 "everysec" 通常来说能在速度和数据安全性之间取得比较好的平衡。根据你的理解来 # 决定,如果你能放宽该配置为"no" 来获取更好的性能(但如果你能忍受一些数据丢失,可以考虑使用 # 默认的快照持久化模式),或者相反,用“always”会比较慢但比everysec要更安全。 # # 请查看下面的文章来获取更多的细节 # http://antirez.com/post/redis-persistence-demystified.html # # 如果不能确定,就用 "everysec" # appendfsync always appendfsync everysec # appendfsync no # 如果AOF的同步策略设置成 "always" 或者 "everysec",并且后台的存储进程(后台存储或写入AOF # 日志)会产生很多磁盘I/O开销。某些Linux的配置下会使Redis因为 fsync()系统调用而阻塞很久。 # 注意,目前对这个情况还没有完美修正,甚至不同线程的 fsync() 会阻塞我们同步的write(2)调用。 # # 为了缓解这个问题,可以用下面这个选项。它可以在 BGSAVE 或 BGREWRITEAOF 处理时阻止fsync()。 # # 这就意味着如果有子进程在进行保存操作,那么Redis就处于"不可同步"的状态。 # 这实际上是说,在最差的情况下可能会丢掉30秒钟的日志数据。(默认Linux设定) # # 如果把这个设置成"yes"带来了延迟问题,就保持"no",这是保存持久数据的最安全的方式。 no-appendfsync-on-rewrite no # 自动重写AOF文件 # 如果AOF日志文件增大到指定百分比,Redis能够通过 BGREWRITEAOF 自动重写AOF日志文件。 # # 工作原理:Redis记住上次重写时AOF文件的大小(如果重启后还没有写操作,就直接用启动时的AOF大小) # # 这个基准大小和当前大小做比较。如果当前大小超过指定比例,就会触发重写操作。你还需要指定被重写 # 日志的最小尺寸,这样避免了达到指定百分比但尺寸仍然很小的情况还要重写。 # # 指定百分比为0会禁用AOF自动重写特性。 auto-aof-rewrite-percentage 100 auto-aof-rewrite-min-size 64mb ################################ LUA SCRIPTING ############################### # 设置lua脚本的最大运行时间,单位为毫秒,redis会记个log,然后返回error。当一个脚本超过了最大时限。只有SCRIPT KILL和SHUTDOWN NOSAVE可以用。第一个可以杀没有调write命令的东西。要是已经调用了write,只能用第二个命令杀。 lua-time-limit 5000 ################################## SLOW LOG ################################### # 是redis用于记录慢查询执行时间的日志系统。由于slowlog只保存在内存中,因此slowlog的效率很高,完全不用担心会影响到redis的性能。 # 只有query执行时间大于slowlog-log-slower-than的才会定义成慢查询,才会被slowlog进行记录。 # 单位是微妙 slowlog-log-slower-than 10000 # slowlog-max-len表示慢查询最大的条数 slowlog-max-len 128 ############################ EVENT NOTIFICATION ############################## # 这个功能可以让客户端通过订阅给定的频道或者模式,来获知数据库中键的变化,以及数据库中命令的执行情况,所以在默认配置下,该功能处于关闭状态。 # notify-keyspace-events 的参数可以是以下字符的任意组合,它指定了服务器该发送哪些类型的通知: # K 键空间通知,所有通知以 __keyspace@__ 为前缀 # E 键事件通知,所有通知以 __keyevent@__ 为前缀 # g DEL 、 EXPIRE 、 RENAME 等类型无关的通用命令的通知 # $ 字符串命令的通知 # l 列表命令的通知 # s 集合命令的通知 # h 哈希命令的通知 # z 有序集合命令的通知 # x 过期事件:每当有过期键被删除时发送 # e 驱逐(evict)事件:每当有键因为 maxmemory 政策而被删除时发送 # A 参数 g$lshzxe 的别名 # 输入的参数中至少要有一个 K 或者 E,否则的话,不管其余的参数是什么,都不会有任何 通知被分发。详细使用可以参考http://redis.io/topics/notifications notify-keyspace-events "" ############################### ADVANCED CONFIG ############################### # 单位字节:数据量小于等于hash-max-ziplist-entries的用ziplist,大于hash-max-ziplistentries用hash hash-max-ziplist-entries 512 # value大小小于等于hash-max-ziplist-value的用ziplist,大于hash-max-ziplist-value用hash。 hash-max-ziplist-value 64 # 数据量小于等于list-max-ziplist-entries用ziplist(压缩列表),大于list-max-ziplistentries用list。 list-max-ziplist-entries 512 # value大小小于等于list-max-ziplist-value的用ziplist,大于list-max-ziplist-value用list。 list-max-ziplist-value 64 # 数据量小于等于set-max-intset-entries用iniset,大于set-max-intset-entries用set。 set-max-intset-entries 512 # 数据量小于等于zset-max-ziplist-entries用ziplist,大于zset-max-ziplist-entries用zset。 zset-max-ziplist-entries 128 # value大小小于等于zset-max-ziplist-value用ziplist,大于zset-max-ziplist-value用zset。 zset-max-ziplist-value 64 # 基数统计的算法 HyperLogLog 键只需要花费 12 KB 内存,就可以计算接近 2^64 个不同元素的基数 # 设置HyeperLogLog的字节数限制,这个值通常在0~15000之间,默认为3000,基本不超过16000。value大小小于等于hll-sparse-max-bytes使用稀疏数据结构(sparse),大于hll-sparse-maxbytes使用稠密的数据结构(dense)。一个比16000大的value是几乎没用的,建议的value大概为3000。如果对CPU要求不高,对空间要求较高的,建议设置到10000左右。 hll-sparse-max-bytes 3000 # 重置hash。 Redis将在每100毫秒时使用1毫秒的CPU时间来对redis的hash表进行重新hash,可以降低内存的使用。当你的使用场景中,有非常严格的实时性需要,不能够接受Redis时不时的对请求有2毫秒的延迟的话,把这项配置为no。如果没有这么严格的实时性要求,可以设置为yes,以便能够尽可能快的释放内存。 activerehashing yes # 对于Redis服务器的输出(也就是命令的返回值)来说,其大小通常是不可控制的。有可能一个简单的命令,能够产生体积庞大的返回数据。另外也有可能因为执行了太多命令,导致产生返回数据的速率超过了往客户端发送的速率,这是也会导致服务器堆积大量消息,从而导致输出缓冲区越来越大,占用过多内存,甚至导致系统崩溃。 # 用于强制断开出于某些原因而无法以足够快的速度从服务器读取数据的客户端的连接。 #对于normal client,包括monitor。第一个0表示取消hard limit,第二个0和第三个0表示取消soft limit,normal client默认取消限制,因为如果没有寻问,他们是不会接收数据的。 client-output-buffer-limit normal 0 0 0 #对于slave client和MONITER client,如果client-output-buffer一旦超过256mb,又或者超过64mb持续60秒,那么服务器就会立即断开客户端连接。 client-output-buffer-limit slave 256mb 64mb 60 #对于pubsub client,如果client-output-buffer一旦超过32mb,又或者超过8mb持续60秒,那么服务器就会立即断开客户端连接。 client-output-buffer-limit pubsub 32mb 8mb 60 # redis执行任务的频率 hz 10 # aof rewrite过程中,是否采取增量"文件同步"策略,默认为"yes",而且必须为yes. # rewrite过程中,每32M数据进行一次文件同步,这样可以减少"aof大文件"写入对磁盘的操作次数. aof-rewrite-incremental-fsync yes
通常情况下,默认的配置足够你解决问题!
没有极特殊的要求,不要乱改配置!
3.7 Jedis
java和redis打交道的API客户端
<dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> <version>3.1.0</version> </dependency>
3.7.1 连接redis
/** * @auther wei * @date 2021/9/24 13:48 * @description 测试连接redis */ public class Test1 { public static void main(String[] args) { Jedis jedis = new Jedis("192.168.44.129",6379); String pong = jedis.ping(); System.out.println("pong = " + pong); } } // 运行前: // 1.关闭防火墙 systemctl stop firewalld.service // 2.修改redis.conf [ bind 0.0.0.0 ] 允许任何ip访问,以这个redis.conf启动redis服务(重启redis) // redis-server /opt/redis5.0.4/redis.conf
3.7.2 常用API
/** * @auther wei * @date 2021/9/24 13:59 * @description 常用API */ public class Test2_API { private void testString(){ Jedis jedis = new Jedis("192.168.44.129",6379); // String jedis.set("k1","v1"); jedis.set("k2","v2"); jedis.set("k3","v3"); Set<String> set = jedis.keys("*"); Iterator<String> iterator = set.iterator(); for (set.iterator();iterator.hasNext();){ String k = iterator.next(); System.out.println(k+"->"+jedis.get(k)); } Boolean k2Exists = jedis.exists("k2"); // 查看k2是否存在 System.out.println("k2Exists = " + k2Exists); System.out.println(jedis.ttl("k1")); // 查看k1的过期时间 //jedis.mset("k4","v4","k5","v5"); System.out.println(jedis.mget("k1","k2","k3","k4","k5")); System.out.println("-------------------------------------------------------"); } private void testList(){ Jedis jedis = new Jedis("192.168.44.129",6379); // list //jedis.lpush("list01","l1","l2","l3","l4","l5"); List<String> list01 = jedis.lrange("list01", 0, -1); for (String s : list01) { System.out.println(s); } System.out.println("-------------------------------------------------------"); } private void testSet(){ Jedis jedis = new Jedis("192.168.44.129",6379); // set jedis.sadd("order","jd001"); jedis.sadd("order","jd002"); jedis.sadd("order","jd003"); Set<String> order = jedis.smembers("order"); Iterator<String> order_iterator = order.iterator(); while(order_iterator.hasNext()){ String s = order_iterator.next(); System.out.println(s); } jedis.srem("order","jd002"); System.out.println(jedis.smembers("order").size()); } private void testHash(){ Jedis jedis = new Jedis("192.168.44.129",6379); jedis.hset("user1","username","james"); System.out.println(jedis.hget("user1","username")); HashMap<String, String> map = new HashMap<>(); map.put("username","tom"); map.put("gender","boy"); map.put("address","beijing"); map.put("phone","15152037019"); jedis.hmset("user2",map); List<String> list = jedis.hmget("user2", "username", "phone"); for (String s : list) { System.out.println(s); } } private void testZset(){ Jedis jedis = new Jedis("192.168.44.129",6379); jedis.zadd("zset01",60d,"zs1"); jedis.zadd("zset01",70d,"zs2"); jedis.zadd("zset01",80d,"zs3"); jedis.zadd("zset01",90d,"zs4"); Set<String> zset01 = jedis.zrange("zset01", 0, -1); Iterator<String> iterator = zset01.iterator(); while(iterator.hasNext()){ String s = iterator.next(); System.out.println(s); } } public static void main(String[] args) { Test2_API api = new Test2_API(); //api.testString(); // 测试String //api.testList(); // 测试list //api.testSet(); // 测试list //api.testHash(); // 测试hash api.testZset(); // 测试zset } }
3.7.3 事务
初始化余额和支出
set yue 100 set zhichu 0
/** * @auther wei * @date 2021/9/24 14:42 * @description 测试事务 */ public class TestTransaction { public static void main(String[] args) throws InterruptedException { Jedis jedis = new Jedis("192.168.44.129", 6379); int yue = Integer.parseInt(jedis.get("yue")); int zhichu = 10; jedis.watch("yue"); // 监控余额 Thread.sleep(5000); // 模拟网络延迟 if (yue < zhichu){ jedis.unwatch(); // 解除监控 System.out.println("余额不足!"); }else { Transaction transaction = jedis.multi(); // 开启事务 transaction.decrBy("yue",zhichu); // 余额减少 transaction.incrBy("zhichu",zhichu); // 累计消费增加 transaction.exec(); System.out.println("余额:" + jedis.get("yue")); System.out.println("累计支出:" + jedis.get("zhichu")); } } }
模拟网络延迟:,10秒内,进入linux修改余额为5,这样,余额<支出,就会进入if
3.7.4 JedisPool
redis的连接池技术
详情:https://help.aliyun.com/document_detail/98726.html
<dependency> <groupId>commons-pool</groupId> <artifactId>commons-pool</artifactId> <version>1.6</version> </dependency>
使用单例模式进行优化
/** * @auther wei * @date 2021/9/24 14:55 * @description 单例模式优化jedis连接池 */ public class JdeisPoolUtil { private JdeisPoolUtil(){} private volatile static JedisPool jedisPool = null; private volatile static Jedis jedis = null; // 返回一个连接池 private static JedisPool getInstance(){ // 双层检测锁(企业中用的非常频繁) if (jedisPool == null){ // 第一层:检测体温 synchronized (JdeisPoolUtil.class){ // 排队进站 if (jedisPool == null){ // 第二层:查看健康码 JedisPoolConfig config = new JedisPoolConfig(); config.setMaxTotal(1000); // 资源池中的最大连接数 config.setMaxIdle(30); // 资源池允许的最大空闲连接数 config.setMaxWaitMillis(60*1000); // 当资源池连接用尽后,调用者的最大等待时间(单位为毫秒) config.setTestOnBorrow(true); // 向资源池借用连接时是否做连接有效性检测(业务量很大时候建议设置为false,减少一次ping的开销) jedisPool = new JedisPool(config, "192.168.44.129", 6379); } } } return jedisPool; } // 返回jedis对象 public static Jedis getJedis(){ if (jedis == null){ jedis = getInstance().getResource(); } return jedis; } }
测试类
/** * @auther wei * @date 2021/9/24 15:05 * @description 测试jedis连接池 */ public class Test_JedisPool { public static void main(String[] args) { Jedis jedis1 = JdeisPoolUtil.getJedis(); Jedis jedis2 = JdeisPoolUtil.getJedis(); System.out.println(jedis1 == jedis2); } }
3.8 高并发下的分布式锁
经典案例:秒杀,抢购优惠券等
3.8.1 搭建工程并测试单线程
pom.xml
<packaging>war</packaging> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <maven.compiler.source>11</maven.compiler.source> <maven.compiler.target>11</maven.compiler.target> </properties> <dependencies> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-webmvc</artifactId> <version>5.2.7.RELEASE</version> </dependency> <!--实现分布式锁的工具类--> <dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.6.1</version> </dependency> <!--spring操作redis的工具类--> <dependency> <groupId>org.springframework.data</groupId> <artifactId>spring-data-redis</artifactId> <version>2.3.2.RELEASE</version> </dependency> <!--redis客户端--> <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> <version>3.1.0</version> </dependency> <!--json解析工具--> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> <version>2.9.8</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.tomcat.maven</groupId> <artifactId>tomcat7-maven-plugin</artifactId> <configuration> <port>8001</port> <path>/</path> </configuration> <executions> <execution> <!-- 打包完成后,运行服务 --> <phase>package</phase> <goals> <goal>run</goal> </goals> </execution> </executions> </plugin> </plugins> </build>
web.xml
<?xml version="1.0" encoding="UTF-8"?> <web-app xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://xmlns.jcp.org/xml/ns/javaee" xsi:schemaLocation="http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/web-app_3_1.xsd" id="WebApp_ID" version="3.1"> <servlet> <servlet-name>springmvc</servlet-name> <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class> <init-param> <param-name>contextConfigLocation</param-name> <param-value>classpath:spring/spring.xml</param-value> </init-param> </servlet> <servlet-mapping> <servlet-name>springmvc</servlet-name> <url-pattern>/</url-pattern> </servlet-mapping> </web-app>
spring-dao.xml
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd"> <context:component-scan base-package="controller"/> <!--spring为连接redis,提供的模板工具类--> <bean id="stringRedisTemplate" class="org.springframework.data.redis.core.StringRedisTemplate"> <property name="connectionFactory" ref="connectionFactory"></property> </bean> <bean id="connectionFactory" class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory"> <property name="hostName" value="192.168.44.129"></property> <property name="port" value="6379"/> </bean> </beans>
测试类
/** * @auther wei * @date 2021/9/24 15:15 * @description 测试秒杀 */ @Controller public class TestKill { @Autowired private StringRedisTemplate stringRedisTemplate; @RequestMapping("/kill") // 只能解决一个Tomcat的并发问题:synchronized锁的是一个进程下的线程并发,如果分布式环境,多个进程并发,这种方案就失效了! public @ResponseBody synchronized String kill(){ // 1.从redis中获取手机库存数量 int phoneCount = Integer.parseInt(stringRedisTemplate.opsForValue().get("phone")); // 2.判断手机的数量是否够秒杀 if (phoneCount > 0){ phoneCount--; // 库存减少之后,再将库存的值返回redis stringRedisTemplate.opsForValue().set("phone",phoneCount+""); System.out.println("库存-1,剩余:" + phoneCount); }else { System.out.println("库存不足!"); } return "over"; } }
3.8.2 高并发测试
1.启动两次工程,端口号分别8001和8002
2.使用nginx做负载均衡
upstream wei{ server 192.168.44.1:8001; server 192.168.44.1:8002; } server { listen 80; server_name localhost; #charset koi8-r; #access_log logs/host.access.log main; location / { proxy_pass http://wei; root html; index index.html index.htm; }
/usr/local/nginx/sbin/nginx -c /usr/local/nginx/conf/nginx.conf
3.使用 JMeter 模拟1秒内发出100个http请求,会发现同一个商品会被两台服务器同时抢购!
3.8.3 实现分布式锁的思路
1.因为redis是单线程的,所以命令也就具备原子性,使用setnx命令实现锁,保存k-v
如果k不存在,保存(当前线程加锁),执行完成后,删除k表示释放锁
如果k已存在,阻塞线程执行,表示有锁
2.如果加锁成功,在执行业务代码的过程中出现异常,导致没有删除k(释放锁失败),那么就会造成死锁(后面的所有线程都无法执行)!
设置过期时间,例如10秒后,redis自动删除
3.高并发下,由于时间段等因素导致服务器压力过大或过小,每个线程执行的时间不同
第一个线程,执行需要13秒,执行到第10秒时,redis自动过期了k(释放锁)
第二个线程,执行需要7秒,加锁,执行第3秒(锁 被释放了,为什么,是被第一个线程的 finally主动deleteKey释放掉了)
。。。连锁反应,当前线程刚加的锁,就被其他线程释放掉了,周而复始,导致锁会永久失效
4.给每个线程加上唯一的标识UUID随机生成,释放的时候判断是否是当前的标识即可
5.问题又来了,过期时间如果设定?
如果10秒太短不够用怎么办?
设置60秒,太长又浪费时间
可以开启一个定时器线程,当过期时间小于总过期时间的1/3时,增长总过期时间(吃仙丹续命!)
自己实现分布式锁,太难了!
3.8.4 Redisson
Redis 是最流行的 NoSQL 数据库解决方案之一,而 Java 是世界上最流行的编程语言之一。
虽然两者看起来很自然地在一起“工作”,但是要知道,Redis 其实并没有对 Java 提供原生支持。
相反,作为 Java 开发人员,我们若想在程序中集成 Redis,必须使用 Redis 的第三方库。
而 Redisson 就是用于在 Java 程序中操作 Redis 的库,它使得我们可以在程序中轻松地使用 Redis。
Redisson 在 java.util 中常用接口的基础上,为我们提供了一系列具有分布式特性的工具类。
/** * @auther wei * @date 2021/9/24 15:15 * @description 测试秒杀 */ @Controller public class TestKill { @Autowired private Redisson redisson; @Autowired private StringRedisTemplate stringRedisTemplate; @RequestMapping("/kill") // 只能解决一个Tomcat的并发问题:synchronized锁的是一个进程下的线程并发,如果分布式环境,多个进程并发,这种方案就失效了! public @ResponseBody synchronized String kill(){ // 定义商品id String productKey = "HUAWEI-P40"; // 通过锁=redisson获取锁 RLock rLock = redisson.getLock(productKey); // 底层源码就是集成了setnx,过期时间操作 // 上锁(过期时间为30秒) rLock.lock(30, TimeUnit.SECONDS); try { // 1.从redis中获取手机库存数量 int phoneCount = Integer.parseInt(stringRedisTemplate.opsForValue().get("phone")); // 2.判断手机的数量是否够秒杀 if (phoneCount > 0){ phoneCount--; // 库存减少之后,再将库存的值返回redis stringRedisTemplate.opsForValue().set("phone",phoneCount+""); System.out.println("库存-1,剩余:" + phoneCount); }else { System.out.println("库存不足!"); } } catch (Exception e) { e.printStackTrace(); } finally { rLock.unlock(); } return "over"; } @Bean public Redisson redisson(){ Config config = new Config(); // 使用单个redis服务器 config.useSingleServer().setAddress("redis://192.168.44.129:6379").setDatabase(0); // 使用集群redis服务器 //config.useClusterServers().setScanInterval(2000).addNodeAddress("redis://192.168.44.129:6379","redis://192.168.44.130:6379","redis://192.168.44.131:6379"); return (Redisson) Redisson.create(config); } }
实现分布式锁的方案其实有很多,我们之前用过的zookeeper的特点就是高可靠性,现在我们用的 redis特点就是高性能。
目前分布式锁,应用最多的仍然是“Redis”
分布式文件系统—FastDFS
1. 场景概述
天猫,淘宝等购物网站,大量的图片和视频,文件太多,如何存储?
用户访问量大又如何保证下载速度?分布式文件系统就是解决这些问题的!
1.1 什么是文件系统
文件数据是如何存储的??
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-XPy8Ledh-1633005745123)(E:\MarkDown\拉勾笔记\FastDFS 什么是文件系统)]
1.2 分布式文件系统
一台电脑存储量有限,并且并发吞吐量也有限,如何提高性能?
一吨货物,我要运送到吐鲁番:
1个人运,不敢想象
50个人运,太难了;
500个人运,每个人都很轻松;
这就是分布式吗?
答:这里面有集群的概念,也有分布式的概念,二者不要混淆,面试常问的经典题目
分布式:不同的业务模块部署在不同的服务器上或者同一个业务模块分拆多个子业务,部署不同的服务器上。解决高并发的问题;
集群:同一个业务部署在多台服务器上,提高系统的高可用
例如:
小饭馆原来只有一个厨师,切菜洗菜备料一手抓。客人越来越多,一个厨师忙不过来,只能再请一个厨师,两个厨师都能炒菜,也就是两个厨师的作用是一样的,这样,两个厨师的关系就是“集群”;
为了让厨师专心炒菜,把菜炒到极致,又请了配菜师负责切菜,备料等工作。厨师和备菜师的关系是“分布式”;
一个备菜师忙不过来,要提供两份食材给两个厨师,又请了一个备菜师,两个备菜师的关系又是“集群”。
1.3 主流的分布式文件系统
1.3.1 HDFS
(Hadoop Distributed File System)Hadoop 分布式文件系统;
高容错的系统,适合部署到廉价的机器上;
能提供高吞吐量的数据访问,非常适合大规模数据应用;
HDFS采用主从结构,一个HDFS是由一个name节点和N个data节点组成;
name节点储存元数据,一个文件分割成N份存储在不同的data节点上。
1.3.2 GFS
Google File System
可扩展的分布式文件系统,用于大型的,分布式的,对大量数据进行访问的应用;
运行于廉价的普通硬件上,可以提供容错功能;
它可以给大量的用户提供总体性能较高的服务;
GFS采用主从结构,一个GFS集群由一个master和大量的chunkserver(分块服务器)组成;
一个文件被分割若干块,分散储存到多个分块server中
1.3.3.FastDFS
由淘宝资深架构师余庆编写并开源;
专为互联网量身定制,充分考虑了冗余备份、负载均衡、线性扩容等机制,并注重高可用、高性能等指标,使用FastDFS很容易搭建一套高性能的文件服务器集群提供文件上传、下载等服务;
HDFS,GFS等都是通用的文件系统,他们的优点是开发体验好,但是系统的复杂度较高,性能也一般;
相比之下,专用的分布式文件系统体验差,但是复杂度低,性能也高,尤其fastDFS特别适合图片,小视频等小文件,因为fastDFS对文件是不分割的,所以没有文件合并的开销;
网络通信用socket,速度快。
1.4 工作原理
fastDFS包含Tracker Server和Storage Server;
客户端请求Tracker Server进行文件的上传与下载;
Tracker Server调度Storage Server最终完成上传与下载
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-QCz2Vp8K-1633005745125)(E:\MarkDown\拉勾笔记\fastDFS 工作原理)]
Tracker (译:追踪者)
作用是负载均衡和调度,它管理着存储服务(Storage Server),可以理解为:“大管家,追踪者,调度员”;
Tracker Server可以集群,实现高可用,策略为“轮询”。
Storage (译:仓库; 贮存器)
作用是文件存储,客户端上传的文件最终存储到storage服务器上;
storage集群采用分组的方式,同组内的每台服务器是平等关系,数据同步,目的是实现数据备份,从而高可用,而不同组的服务器之间是不通信的;
同组内的每台服务器的存储量不一致的情况下,会选取容量最小的那个,所以同组内的服务器之间软硬件最好保持一致。
Storage Server会连接集群中的所有Tracker Server,定时向他们汇报自己的状态,例如:剩余空间,文件同步情况,文件上传下载次数等信息。
1.5 上传/下载 原理
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-e52xnRG9-1633005745126)(E:\MarkDown\拉勾笔记\FastDFS 上传原理)]
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-lB6Jn0PH-1633005745129)(E:\MarkDown\拉勾笔记\FastDFS 下载原理)]
客户端上传文件后,storage会将文件id返回给客户端
group1/M00/02/11/aJxAeF21O5wAAAAAAAAGaEIOA12345.sh
组名:文件上传后,在storage组的名称,文件上传成功后,由storage返回,需要客户端自行保存。
虚拟磁盘路径:
storage配置的虚拟路径,在磁盘选项storage_path对应。
storage_path0对应M00,
storage_path1对应M01,
数据两级目录: storage在虚拟磁盘下自行创建的目录。
文件名: 与上传时不同,是用storage根据特定信息生成的,里面包含:storage服务器的ip,创建时间戳,大小,后缀名等信息
2. FastDFS的上传与下载
2.1 安装
2.1.1 安装gcc(编译时需要)
yum install -y gcc gcc-c++
2.1.2 安装libevent(运行时需求)
yum -y install libevent
2.1.3 安装 libfastcommon
libfastcommon是FastDFS官方提供的,libfastcommon包含了FastDFS运行所需要的一些基础库。
1.上传 libfastcommon-master.zip 到 /opt
安装解压zip包的命令: yum install -y unzip 解压包: unzip libfastcommon.zip 进入目录: cd libfastcommon-master
2.编译
./make.sh
如果:make.sh的权限不够,则需要授权(可执行的权利)
chmod 777 make.sh
3.安装
./make.sh install
libfastcommon安装好后会在/usr/lib64 目录下生成 libfastcommon.so 库文件
4.拷贝库文件
cd /usr/lib64 cp libfastcommon.so /usr/lib
2.1.4 安装Tracker
1.下载 FastDFS_v5.05.tar.gz,并上传到 /opt
tar -zxvf FastDFS_v5.05.tar.gz cd FastDFS ./make.sh ./make.sh install
2.安装成功将安装目录下的conf下的文件拷贝到/etc/fdfs/下
cp /opt/FastDFS/conf/* /etc/fdfs/
2.2 配置
1.Tracker配置
vim /etc/fdfs/tracker.conf
#端口号 port=22122 #基础目录(Tracker运行时会向此目录存储storage的管理数据)(基础目录不存在的话,需要自行创建 mkdir /home/fastdfs) base_path=/home/fastdfs
2.Storage配置
vim /etc/fdfs/storage.conf
#配置组名 group_name=group1 #端口 port=23000 #向tracker心跳间隔(秒) heart_beat_interval=30 #storage基础目录 #目录不存在,需要自行创建 base_path=/home/fastdfs #store存放文件的位置(store_path) #可以理解一个磁盘一个path,多个磁盘,多个store_path #fdfs_storage目录不存在,需要自行创建 #mkdir /home/fastdfs/fdfs_storage store_path0=/home/fastdfs/fdfs_storage #如果有多个挂载磁盘则定义多个store_path,如下 #store_path1=..... (M01) #store_path2=..... (M02) #配置tracker服务器:IP tracker_server=192.168.44.129:22122 #如果有多个则配置多个tracker #tracker_server=192.168.44.x:22122
2.3 启动服务
1.启动tracker
/usr/bin/fdfs_trackerd /etc/fdfs/tracker.conf restart
2.启动storage
/usr/bin/fdfs_storaged /etc/fdfs/storage.conf restart
3.查看所有运行的端口:
netstat -ntlp
2.4 搭建 Java工程
使用IDEA创建maven工程
2.4.1 pom.xml
<!--fastdfs的java客户端--> <dependency> <groupId>net.oschina.zcx7878</groupId> <artifactId>fastdfs-client-java</artifactId> <version>1.27.0.0</version> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-io</artifactId> <version>1.3.2</version> </dependency>
2.4.2 创建配置文件
在resources下创建config目录,在config目录下创建 fastdfs-client.properties,内容如下:
##fastdfs-client.properties fastdfs.connect_timeout_in_seconds = 5 fastdfs.network_timeout_in_seconds = 30 fastdfs.charset = UTF-8 fastdfs.http_anti_steal_token = false fastdfs.http_secret_key = FastDFS1234567890 fastdfs.http_tracker_http_port = 80 fastdfs.tracker_servers = 192.168.44.129:22122
2.4.3 文件上传
/** * @auther wei * @date 2021/9/25 15:54 * @description 文件上传 */ public class TestUpload { public static void main(String[] args) { try { // 1.加载配置文件 ClientGlobal.initByProperties("config/fastdfs-client.properties"); // 2.创建tracker客户端 TrackerClient trackerClient = new TrackerClient(); // 3.通过tracker客户端获取tracker的连接服务并返回 TrackerServer trackerServer = trackerClient.getConnection(); // 4.声明storage服务 StorageServer storageServer = null; // 5.定义storage客户端 StorageClient1 client = new StorageClient1(trackerServer, storageServer); // 6.定义文件元信息 NameValuePair[] list = new NameValuePair[1]; list[0] = new NameValuePair("fileName","1.jpg"); // 7.上传 String fileID = client.upload_file1("G:\\1.jpg", "jpg", list); System.out.println("fileID = " + fileID); // group1/M00/00/00/wKgsgWFO2OmAEE5XAExtg1rxSVE472.jpg /** * group1:一台服务器,就是一个组 * M00:storage_path0 ----> /home/fastdfs/fdfs_storage/data * 00/00:两级数据目录 */ // 关闭服务 trackerServer.close(); } catch (Exception e) { e.printStackTrace(); } } }
2.4.4 文件查询
/** * @auther wei * @date 2021/9/25 16:12 * @description 文件查询 */ public class TestQuery { public static void main(String[] args) { try { // 1.加载配置文件 ClientGlobal.initByProperties("config/fastdfs-client.properties"); // 2.创建tracker客户端 TrackerClient trackerClient = new TrackerClient(); // 3.通过tracker客户端获取tracker的连接服务并返回 TrackerServer trackerServer = trackerClient.getConnection(); // 4.声明storage服务 StorageServer storageServer = null; // 5.定义storage客户端 StorageClient1 client = new StorageClient1(trackerServer, storageServer); // 6.查询 FileInfo fileInfo = client.query_file_info1("group1/M00/00/00/wKgsgWFO2OmAEE5XAExtg1rxSVE472.jpg"); //System.out.println(fileInfo); if (fileInfo != null){ System.out.println("fileInfo = " + fileInfo); }else { System.out.println("查无此文件!"); } trackerServer.close(); } catch (Exception e) { e.printStackTrace(); } } }
2.4.5 文件下载
/** * @auther wei * @date 2021/9/25 16:17 * @description 文件下载 */ public class TestDownload { public static void main(String[] args) { try { // 1.加载配置文件 ClientGlobal.initByProperties("config/fastdfs-client.properties"); // 2.创建tracker客户端 TrackerClient trackerClient = new TrackerClient(); // 3.通过tracker客户端获取tracker的连接服务并返回 TrackerServer trackerServer = trackerClient.getConnection(); // 4.声明storage服务 StorageServer storageServer = null; // 5.定义storage客户端 StorageClient1 client = new StorageClient1(trackerServer, storageServer); // 6.下载 byte[] bytes = client.download_file1("group1/M00/00/00/wKgsgWFO2OmAEE5XAExtg1rxSVE472.jpg"); // 7.通过IO流将字节数组,转换成文件 FileOutputStream fileOutputStream = new FileOutputStream(new File("G:/xxxxx.jpg")); fileOutputStream.write(bytes); fileOutputStream.close(); trackerServer.close(); System.out.println("下载完毕!"); } catch (Exception e) { e.printStackTrace(); } } }
3. 项目实战
掌握fastDFS在真实项目中的使用方法;
掌握fastDFS实现图片服务器;
3.1 搭建图片服务器
3.1.1 Nginx模块安装 (Storage)
- 上传 fastdfs-nginx-module_v1.16.tar.gz 到 /opt
- 解压nginx模块
tar -zxvf fastdfs-nginx-module_v1.16.tar.gz
- 修改 config 文件,将文件中的 /usr/local/ 路径改为 /usr/
cd /opt/fastdfs-nginx-module/src vim config
- 将 fastdfs-nginx-module/src下的 mod_fastdfs.conf 拷贝至 /etc/fdfs 下
cp mod_fastdfs.conf /etc/fdfs/
- 修改 /etc/fdfs/mod_fastdfs.conf
vim /etc/fdfs/mod_fastdfs.conf
base_path=/home/fastdfs tracker_server=192.168.44.129:22122 #(n个tracker配置n行) #tracker_server=192.168.44.x:22122 #url中包含group名称 url_have_group_name=true #指定文件存储路径(上面配置的store路径) store_path0=/home/fastdfs/fdfs_storage
- 将 libfdfsclient.so 拷贝至 /usr/lib 下
cp /usr/lib64/libfdfsclient.so /usr/lib/
- 创建nginx/client目录
mkdir -p /var/temp/nginx/client
3.1.2 Nginx安装 (Tracker)
- 将 nginx-1.14.0.tar.gz上传到/opt(安装过nginx,此步省略)
- 解压:tar -zxvf nginx-1.14.0.tar.gz(安装过nginx,此步省略)
- 安装依赖库(安装过nginx,此步省略)
yum install pcre yum install pcre-devel yum install zlib yum install zlib-devel yum install openssl yum install openssl-devel
- 进入nginx解压的目录下 cd /opt/nginx-1.14.0
- 安装
./configure \ --prefix=/usr/local/nginx \ --pid-path=/var/run/nginx/nginx.pid \ --lock-path=/var/lock/nginx.lock \ --error-log-path=/var/log/nginx/error.log \ --http-log-path=/var/log/nginx/access.log \ --with-http_gzip_static_module \ --http-client-body-temp-path=/var/temp/nginx/client \ --http-proxy-temp-path=/var/temp/nginx/proxy \ --http-fastcgi-temp-path=/var/temp/nginx/fastcgi \ --http-uwsgi-temp-path=/var/temp/nginx/uwsgi \ --http-scgi-temp-path=/var/temp/nginx/scgi \ --add-module=/opt/fastdfs-nginx-module/src
**注意:**上边将临时文件目录指定为 /var/temp/nginx,需要在 /var 下创建 temp 及 nginx 目录:mkdir /var/temp/nginx
- 编译:make
- 安装:make install
- 拷贝配置文件
cd /opt/FastDFS/conf cp http.conf mime.types /etc/fdfs/ 是否覆盖:yes
- 修改nginx配置文件
cd /usr/local/nginx/conf/ vim nginx.conf
server { listen 80; server_name 192.168.44.129; #charset koi8-r; #access_log logs/host.access.log main; location /group1/M00 { root /home/fastdfs/fdfs_storage/data; ngx_fastdfs_module; } }
- 关闭nginx,并启动nginx
pkill -9 nginx /usr/local/nginx/sbin/nginx -c /usr/local/nginx/conf/nginx.conf
- 访问nginx并查看图片
http://192.168.44.129
http://192.168.44.129/group1/M00/00/00/wKgsgWFO2OmAEE5XAExtg1rxSVE472.jpg
3.2 创建前端页面
<%@ page contentType="text/html;charset=UTF-8" language="java" %> <html> <head> <title>上传图片</title> </head> <body> <%--上传文件,文件与文字比较起来,属于内容较大,必须使用post的提交方式--%> <%--上传文件,和普通文本有区别,action接收参数也会区别对待,所以声明带文件提交的表单为“多部件表单”--%> <form action="upload" method="post" enctype="multipart/form-data"> <input type="file" name="fname"><br> <button>提交</button> </form> </body> </html>
3.3 搭建web服务
3.3.1 pom.xml
<packaging>war</packaging> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <maven.compiler.source>11</maven.compiler.source> <maven.compiler.target>11</maven.compiler.target> </properties> <dependencies> <!-- 因为有jsp页面,所以引用servlet依赖--> <dependency> <groupId>javax.servlet</groupId> <artifactId>servlet-api</artifactId> <scope>provided</scope> <version>2.5</version> </dependency> <!-- 页面提交过来的请求,使用springmvc来处理--> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-webmvc</artifactId> <version>5.2.7.RELEASE</version> </dependency> <!-- java连接fastDFS的客户端工具--> <dependency> <groupId>net.oschina.zcx7878</groupId> <artifactId>fastdfs-client-java</artifactId> <version>1.27.0.0</version> </dependency> <!-- 图片上传到FastDFS需要用的到IO工具--> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-io</artifactId> <version>1.3.2</version> </dependency> <!-- 图片保存到web服务器需要用到的IO工具--> <dependency> <groupId>commons-fileupload</groupId> <artifactId>commons-fileupload</artifactId> <version>1.3.1</version> </dependency> <!--用来转换java对象和json字符串,注意,2.7以上版本必须搭配spring5.0以上--> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> <version>2.9.8</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.tomcat.maven</groupId> <artifactId>tomcat7-maven-plugin</artifactId> <configuration> <port>8001</port> <path>/</path> </configuration> <executions> <execution> <phase>package</phase> <goals> <goal>run</goal> </goals> </execution> </executions> </plugin> </plugins> </build>
3.3.2 web.xml
<?xml version="1.0" encoding="UTF-8"?> <web-app xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://xmlns.jcp.org/xml/ns/javaee" xsi:schemaLocation=" http://xmlns.jcp.org/xml/ns/javaee http://xmlns.jcp.org/xml/ns/javaee/web-app_3_1.xsd" id="WebApp_ID" version="3.1"> <servlet> <servlet-name>springMVC</servlet-name> <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class> <init-param> <param-name>contextConfigLocation</param-name> <param-value>classpath:spring/spring-mvc.xml</param-value> </init-param> </servlet> <servlet-mapping> <servlet-name>springMVC</servlet-name> <url-pattern>/</url-pattern> </servlet-mapping> </web-app>
3.3.3 spring-mvc.xml
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:mvc="http://www.springframework.org/schema/mvc" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc.xsd"> <!--扫描注解包--> <context:component-scan base-package="controller"/> <!--扫描控制器中的注解 : @Response--> <mvc:annotation-driven/> <!--上传文件的解析器(规定上传文件的大小限制)--> <bean id="multipartResolver" class="org.springframework.web.multipart.commons.CommonsMultipartResolver"> <!--限制文件最大:2GB--> <property name="maxUploadSize" value="2048000000"/> </bean> </beans>
3.3.4 文件实体类
public class FileSystem implements Serializable { private String fileId; private String filePath; private String fileName; @Override public String toString(){} setter... getter... }
3.3.5 控制层
/** * @auther wei * @date 2021/9/25 17:24 * @description 处理上传文件的控制器 */ @Controller public class FileAction { /** * @param request 多部件表单的请求对象 * @return 上传文件对象的json对象 * @throws Exception * * 上传流程: * 1.先把文件保存到web服务器上 * 2.在从web服务器上将文件 上传 到 FastDFS上 */ @RequestMapping("/upload") public @ResponseBody FileSystem upload(MultipartHttpServletRequest request) throws Exception{ // MultipartHttpServletRequest:是httpServletRequest的强化版本,不仅可以装文本信息,还可以装图片信息 FileSystem fileSystem = new FileSystem(); /* 1.把文件保存到web服务器上 */ // 从页面请求中,获取上传的文件对象 MultipartFile file = request.getFile("fname"); // 从文件对象中获取文件的原始名称 String oldFileName = file.getOriginalFilename(); // 通过字符串截取的方式,从文件原始名中获取文件的后缀名 String hou = oldFileName.substring(oldFileName.lastIndexOf(".") + 1); // 为了避免文件因为同名而覆盖,生成全新的文件名 String newFileName = UUID.randomUUID().toString() + "." + hou; // 创建web服务器保存文件的目录(预先创建好G:/upload目录,否则系统找不到路径,会抛异常) File toSaveFile = new File("G:/upload" + newFileName); // 将路径转换成文件 file.transferTo(toSaveFile); // 服务器的绝对路径 String newFilePath = toSaveFile.getAbsolutePath(); /* 2.把文件从web服务器上传到 FastDFS */ // 加载配置文件 ClientGlobal.initByProperties("config/fastdfs-client.properties"); // 创建tracker客户端 TrackerClient trackerClient = new TrackerClient(); //通过tracker客户端获取tracker的连接服务并返回 TrackerServer trackerServer = trackerClient.getConnection(); // 声明storage服务 StorageServer storageServer = null; // 定义storage客户端 StorageClient1 client = new StorageClient1(trackerServer,storageServer); // 定义文件元信息 NameValuePair[] list = new NameValuePair[1]; list[0] = new NameValuePair("fileName",oldFileName); // 上传 String fileId = client.upload_file1(newFilePath, hou, list); //System.out.println(fileId); trackerServer.close(); // 封装fileSystem对象 fileSystem.setFileId(fileId); fileSystem.setFileName(oldFileName); fileSystem.setFilePath(fileId); // 以及上传到FastDFS上,通过fileId来访问图片,所有fileId即为文件路径 return fileSystem; } }
3.3.6 添加fastDFS的配置文件
在resources下创建config目录,在config目录下创建 fastdfs-client.properties
参考:2.4.2
3.3.7 启动fastDFS服务,测试开始
[root@localhost /]# /usr/bin/fdfs_trackerd /etc/fdfs/tracker.conf restart [root@localhost /]# /usr/bin/fdfs_storaged /etc/fdfs/storage.conf restart [root@localhost /]# /usr/local/nginx/sbin/nginx -c /usr/local/nginx/conf/nginx.conf [root@localhost /]# netstat -ntlp [root@localhost /]# systemctl stop firewalld.service [root@localhost /]# cd /home/fastdfs/fdfs_storage/data/ [root@localhost /]# ls
3.4 典型错误
重启linux服务器,可能会到nginx启动失败:
[root@localhost logs]# /usr/local/nginx/sbin/nginx -c /usr/local/nginx/conf/nginx.conf [root@localhost /]# nginx: [emerg] open() "/var/run/nginx/nginx.pid" failed (2:No such file or directory)
导致本次错误的原因,是没有修改pid文件的路径,编辑nginx的配置文件:
vim /usr/local/nginx/conf/nginx.conf
pid /usr/local/nginx/logs/nginx.pid;
再次启动nginx,搞定!
RabbitMQ
1.什么是RabbitMQ
1.1 MQ(Message Queue)消息队列
消息队列中间件,是分布式系统中的重要组件
主要解决,异步处理,应用解耦,流量削峰等问题
从而实现高性能,高可用,可伸缩和最终一致性的架构
使用较多的消息队列产品:RabbitMQ,RocketMQ,ActiveMQ,ZeroMQ,Kafka等
1.1.1 异步处理
用户注册后,需要发送验证邮箱和手机验证码;
将注册信息写入数据库,发送验证邮件,发送手机,三个步骤全部完成后,返回给客户端
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-l8GdbnuN-1633005745131)(E:\MarkDown\拉勾笔记\RabbitMQ 消息队列-异步处理)]
1.1.2 应用解耦
场景:订单系统需要通知库存系统
如果库存系统异常,则订单调用库存失败,导致下单失败
原因:订单系统和库存系统耦合度太高
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-IhRcUNK5-1633005745134)(E:\MarkDown\拉勾笔记\RabbitMQ 消息队列-应用解耦)]
订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户,下单成功;
库存系统:订阅下单的消息,获取下单信息,库存系统根据下单信息,再进行库存操作;
假如:下单的时候,库存系统不能正常运行,也不会影响下单,因为下单后,订单系统写入消息队列就不再关心其他的后续操作了,实现了订单系统和库存系统的应用解耦;
所以说,消息队列是典型的:生产者消费者模型
生产者不断的向消息队列中生产消息,消费者不断的从队列中获取消息
因为消息的生产和消费都是异步的,而且只关心消息的发送和接收,没有业务逻辑的入侵,这样就实现了生产者和消费者的解耦
1.1.3 流量削峰
抢购,秒杀等业务,针对高并发的场景
因为流量过大,暴增会导致应用挂掉,为解决这个问题,在前端加入消息队列
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-RQtEZmzW-1633005745136)(E:\MarkDown\拉勾笔记\RabbitMQ 消息队列-流量削峰)]
用户的请求,服务器接收后,首先写入消息队列,如果超过队列的长度,就抛弃,甩一个秒杀结束的页面!
说白了,秒杀成功的就是进入队列的用户;
1.2 背景知识介绍
1.2.1 AMQP高级消息队列协议
即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议
协议:数据在传输的过程中必须要遵守的规则
基于此协议的客户端可以与消息中间件传递消息
并不受产品、开发语言等条件的限制
1.2.2 JMS
Java Message Server,Java消息服务应用程序接口, 一种规范,和JDBC担任的角色类似
是一个Java平台中关于面向消息中间件的API,用于两个应用程序之间,或分布式系统中发送消息,进行异步通信
1.2.3 二者的联系
JMS是定义了统一接口,统一消息操作;AMQP通过协议统一数据交互格式
JMS必须是java语言;AMQP只是协议,与语言无关
1.2.4 Erlang语言
Erlang(['ə:læŋ])是一种通用的面向并发的编程语言,它由瑞典电信设备制造商爱立信所辖的CSLab开发,目的是创造一种可以应对大规模并发活动的编程语言和运行环境
最初是由爱立信专门为通信应用设计的,比如控制交换机或者变换协议等,因此非常适合构建分布式,实时软并行计算系统
Erlang运行时环境是一个虚拟机,有点像Java的虚拟机,这样代码一经编译,同样可以随处运行
1.3 为什么选择RabbitMQ
我们开篇说消息队列产品那么多,为什么偏偏选择RabbitMQ呢?
先看命名:兔子行动非常迅速而且繁殖起来也非常疯狂,所以就把Rabbit用作这个分布式软件的命名
Erlang开发,AMQP的最佳搭档,安装部署简单,上手门槛低
企业级消息队列,经过大量实践考验的高可靠,大量成功的应用案例,例如阿里、网易等一线大厂都有使用
有强大的WEB管理页面
强大的社区支持,为技术进步提供动力
支持消息持久化、支持消息确认机制、灵活的任务分发机制等,支持功能非常丰富
集群扩展很容易,并且可以通过增加节点实现成倍的性能提升
总结:如果你希望使用一个可靠性高、功能强大、易于管理的消息队列系统那么就选择 RabbitMQ,如果你想用一个性能高,但偶尔丢点数据不是很在乎可以使用kafka或者zeroMQ
kafka和zeroMQ的性能爆表,绝对可以压RabbitMQ一头!
1.4 RabbitMQ各组件功能
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-GWYFcHAm-1633005745137)(E:\MarkDown\拉勾笔记\RabbitMQ各组件功能)]
Broker:消息队列服务器实体
Virtual Host:虚拟主机
标识一批交换机、消息队列和相关对象,形成的整体
虚拟主机是共享相同的身份认证和加密环境的独立服务器域
每个vhost本质上就是一个mini版的RabbitMQ服务器,拥有自己的队列、交换器、绑定和权限机制
vhost是AMQP概念的基础,RabbitMQ默认的vhost是 /,必须在链接时指定
Exchange:交换器(路由)
用来接收生产者发送的消息并将这些消息路由给服务器中的队列
Queue:消息队列
用来保存消息直到发送给消费者。
它是消息的容器,也是消息的终点。
一个消息可投入一个或多个队列。
消息一直在队列里面,等待消费者连接到这个队列将其取走。
Banding:绑定,用于消息队列和交换机之间的关联。
Channel:通道(信道)
多路复用连接中的一条独立的双向数据流通道。
信道是建立在真实的TCP连接内的虚拟链接
AMQP命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,都是通过信道完成的
因为对操作系统来说,建立和销毁TCP连接都是非常昂贵的开销,所以引入信道的概念,用来复用TCP连接。
Connection:网络连接,比如一个TCP连接。
Publisher:消息的生产者,也是一个向交换器发布消息的客户端应用程序。
Consumer:消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。
Message:消息
消息是不具名的,它是由消息头和消息体组成。
消息体是不透明的,而消息头则是由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(优先级)、delivery-mode(消息可能需要持久性存储[消息的路由模式])等。
2.怎么用RabbitMQ
想要安装RabbitMQ,必须先安装erlang语言环境,类似安装tomcat,必须先安装JDK
查看匹配的版本:https://www.rabbitmq.com/which-erlang.html
2.1 RabbitMQ安装启动
erlang下载:https://dl.bintray.com/rabbitmq-erlang/rpm/erlang
socat下载:http://repo.iotti.biz/CentOS/7/x86_64/socat-1.7.3.2-5.el7.lux.x86_64.rpm
RabbitMQ下载:https://www.rabbitmq.com/install-rpm.html#downloads
2.1.1 安装
[root@localhost opt]# rpm -ivh erlang-21.3.8.16-1.el7.x86_64.rpm
[root@localhost opt]# rpm -ivh socat-1.7.3.2-5.el7.lux.x86_64.rpm
[root@localhost opt]# rpm -ivh rabbitmq-server-3.8.6-1.el7.noarch.rpm
2.1.2 启动后台管理插件
[root@localhost opt]# rabbitmq-plugins enable rabbitmq_management
2.1.3 启动RabbitMQ
[root@localhost opt]# systemctl start rabbitmq-server.service [root@localhost opt]# systemctl status rabbitmq-server.service [root@localhost opt]# systemctl restart rabbitmq-server.service [root@localhost opt]# systemctl stop rabbitmq-server.service
2.1.4 查看进程
[root@localhost opt]# ps -ef | grep rabbitmq
2.1.5 测试
1.关闭防火墙: systemctl stop firewalld
2.浏览器输入:http://ip:15672
3.默认帐号密码:guest,guest用户默认不允许远程连接
- 创建账号
[root@localhost opt]# rabbitmqctl add_user wei 123456
- 设置用户角色
[root@localhost opt]# rabbitmqctl set_user_tags wei administrator
- 设置用户权限
[root@localhost opt]# rabbitmqctl set_permissions -p "/" wei ".*" ".*" ".*"
- 查看当前用户和角色
[root@localhost opt]# rabbitmqctl list_users
- 修改当前用户密码
[root@localhost opt]# rabbitmqctl change_password wei 123123
4.管理界面介绍
overview:概览
connections:查看链接情况
channels:信道(通道)情况
Exchanges:交换机(路由)情况,默认4类7个
Queues:消息队列情况
Admin:管理员列表
端口:
5672:RabbitMQ提供给编程语言客户端链接的端口
15672:RabbitMQ管理界面的端口
25672:RabbitMQ集群的端口
2.2 RabbitMQ快速入门
2.2.1 依赖
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.wei</groupId> <artifactId>lagou-rabbitmq</artifactId> <version>1.0-SNAPSHOT</version> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <maven.compiler.source>11</maven.compiler.source> <maven.compiler.target>11</maven.compiler.target> </properties> <dependencies> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.7.3</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.7.25</version> <scope>compile</scope> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> <version>3.9</version> </dependency> </dependencies> </project>
2.2.2 日志依赖log4j(可选项)
log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.Target=System.out log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %m%n log4j.appender.file=org.apache.log4j.FileAppender log4j.appender.file.File=rebbitmq.log log4j.appender.file.layout=org.apache.log4j.PatternLayout log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %l %m%n log4j.rootLogger=debug, stdout,file
2.2.2 创建连接
先创建好虚拟主机
import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * @auther wei * @date 2021/9/26 20:45 * @description 专门与RabbitMQ获得连接 */ public class ConnectionUtil { public static Connection getConnection() throws Exception{ // 1.创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); // 2.工厂对象中设置MQ的连接信息(ip,port,vhost,username,password) factory.setHost("192.168.44.129"); factory.setPort(5672); factory.setVirtualHost("/lagou"); factory.setUsername("wei"); factory.setPassword("123123"); // 3.通过工厂获得与MQ的连接 Connection connection = factory.newConnection(); return connection; } public static void main(String[] args) throws Exception { Connection connection = getConnection(); System.out.println("connection = " + connection); connection.close(); } }
2.3 RabbitMQ模式
RabbitMQ提供了6种消息模型,但是第6种其实是RPC,并不是MQ,因此我们只学习前5种
在线手册:https://www.rabbitmq.com/getstarted.html
5种消息模型,大体分为两类:
1和2属于点对点
3、4、5属于发布订阅模式(一对多)
点对点模式:P2P(point to point)模式包含三个角色:
消息队列(queue),发送者(sender),接收者(receiver)
每个消息发送到一个特定的队列中,接收者从中获得消息
队列中保留这些消息,直到他们被消费或超时
特点:
1.每个消息只有一个消费者,一旦消费,消息就不在队列中了
2.发送者和接收者之间没有依赖性,发送者发送完成,不管接收者是否运行,都不会影响消息发送到队列中(我给你发微信,不管你看不看手机,反正我发完了)
3.接收者成功接收消息之后需向对象应答成功(确认)
如果希望发送的每个消息都会被成功处理,那需要P2P
发布订阅模式:publish(Pub)/subscribe(Sub)
pub/sub模式包含三个角色:交换机(exchange),发布者(publisher),订阅者 (subcriber)
多个发布者将消息发送交换机,系统将这些消息传递给多个订阅者
特点:
1.每个消息可以有多个订阅者
2.发布者和订阅者之间在时间上有依赖,对于某个交换机的订阅者,必须创建一个订阅后,才能消费发布者的消息
3.为了消费消息,订阅者必须保持运行状态;类似于,看电视直播。
如果希望发送的消息被多个消费者处理,可采用本模式
2.3.1 简单模式
下面引用官网的一段介绍:
RabbitMQ is a message broker: it accepts and forwards messages. You can think about it as a post office: when you put the mail that you want posting in a post box, you can be sure that Mr. or Ms. Mailperson will eventually deliver the mail to your recipient. In this analogy, RabbitMQ is a post box, a post office and a postman.
译文:RabbitMQ是一个消息代理:它接收和转发消息。你可以把它想象成一个邮局:当你把你想要 寄的邮件放到一个邮箱里,你可以确定邮递员先生或女士最终会把邮件送到你的收件人那里。在 这个类比中,RabbitMQ是一个邮箱、一个邮局和一个邮递员。
RabbitMQ本身只是接收,存储和转发消息,并不会对信息进行处理!
类似邮局,处理信件的应该是收件人而不是邮局!
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Ao0yesDG-1633005745139)(E:\MarkDown\拉勾笔记\RabbitMQ模式-简单模式)]
2.3.1.1 生产者P
/** * @auther wei * @date 2021/9/27 13:50 * @description 消息生产者 */ public class Sender { public static void main(String[] args) throws Exception { String msg = "wei:Hello,RabbitMQ!"; // 1.获得连接 Connection connection = ConnectionUtil.getConnection(); // 2.从连接轴创建通道(信道) Channel channel = connection.createChannel(); // 3.创建消息队列(1,2,3,4,5) /** * 参数1:队列名称 * 参数2:队列中的数据是否持久化 * 参数3:是否排外(是否支持扩展,当前队列只能自己用,不能给别人用) * 参数4:是否自动删除(当队列的连接数为0时,队列会销毁,不管队列是否还保存数据) * 参数5:队列参数(没有参数为空) */ channel.queueDeclare("queue1",false,false,false,null); // 4.向指定的队列发送消息(1,2,3,4) /** * 参数1:交换机名称(当前是简单模式,也就是P2P模式,没有交换机,所有名称为"") * 参数2:目标队列的名称 * 参数3:设置消息的属性(没有属性则为空) * 参数4:消息内容(只接收字节数组) */ channel.basicPublish("","queue1",null,msg.getBytes()); System.out.println("发送:" + msg); // 5.释放资源 channel.close(); connection.close(); } }
启动生产者,即可前往管理端查看队列中的信息,会有一条信息没有处理和确认
2.3.1.2 消费者C
/** * @auther wei * @date 2021/9/27 14:01 * @description 消息接收者 */ public class Recer { public static void main(String[] args) throws Exception { // 1.获得连接 Connection connection = ConnectionUtil.getConnection(); // 2.获得通道(信道) Channel channel = connection.createChannel(); // 3.从信道中获得消息 DefaultConsumer consumer = new DefaultConsumer(channel){ @Override // 交付处理(收件人信息,包裹上的快递标签,协议的配置,消息) public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // body就是从队列中获取的消息 String s = new String(body); System.out.println("接收 = " + s); } }; // 4.监听队列 true:自动消息确认 channel.basicConsume("queue1",true,consumer); } }
启动消费者,前往管理端查看队列中的信息,所有信息都已经处理和确认,显示0
2.3.1.3 消息确认机制ACK
1.通过刚才的案例可以看出,消息一旦被消费,消息就会立刻从队列中移除
2.RabbitMQ如何得知消息被消费者接收?
如果消费者接收消息后,还没执行操作就抛异常宕机导致消费失败,但是RabbitMQ无从得知,这样消息就丢失了
因此,RabbitMQ有一个ACK机制,当消费者获取消息后,会向RabbitMQ发送回执ACK,告知消息已经被接收
ACK:(Acknowledge character)即是确认字符,在数据通信中,接收站发给发送站的一种传输类控制字符。表示发来的数据已确认接收无误我们在使用http请求时,http的状态码200 就是告诉我们服务器执行成功
整个过程就想快递员将包裹送到你手里,并且需要你的签字,并拍照回执
不过这种回执ACK分为两种情况:
自动ACK:消息接收后,消费者立刻自动发送ACK(快递放在快递柜)
手动ACK:消息接收后,不会发送ACK,需要手动调用(快递必须本人签收)
两种情况如何选择,需要看消息的重要性:
如果消息不太重要,丢失也没有影响,自动ACK会比较方便
如果消息非常重要,最好消费完成手动ACK,如果自动ACK消费后,RabbitMQ就会把消息从队列中删除,如果此时消费者抛异常宕机,那么消息就永久丢失了
3.修改手动消息确认
// false:手动消息确认 channel.basicConsume("queue1", false, consumer);
结果如下:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-1wiTI6T3-1633005745140)(E:\MarkDown\拉勾笔记\RabbitMQ 消息确认机制ACK)]
解决问题
/** * @auther wei * @date 2021/9/27 14:01 * @description 消息接收者 */ public class RecerByACK { public static void main(String[] args) throws Exception { // 1.获得连接 Connection connection = ConnectionUtil.getConnection(); // 2.获得通道(信道) final Channel channel = connection.createChannel(); // 3.从信道中获得消息 DefaultConsumer consumer = new DefaultConsumer(channel){ @Override // 交付处理(收件人信息,包裹上的快递标签,协议的配置,消息) public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // body就是从队列中获取的消息 String s = new String(body); System.out.println("接收 = " + s); // 手动确认(收件人信息,是否同时确认多个消息) channel.basicAck(envelope.getDeliveryTag(),false); } }; // 4.监听队列 false:手动消息确认 channel.basicConsume("queue1",false,consumer); } }
2.3.2 工作队列模式
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-VJQV6UWh-1633005745142)(E:\MarkDown\拉勾笔记\RabbitMQ模式-工作队列模式)]
之前我们学习的简单模式,一个消费者来处理消息,如果生产者生产消息过快过多,而消费者的能力有限,就会产生消息在队列中堆积(生活中的滞销)
一个烧烤师傅,一次烤50支羊肉串,就一个人吃的话,烤好的肉串会越来越多,怎么处理?
多招揽客人进行消费即可。当我们运行许多消费者程序时,消息队列中的任务会被众多消费者共享,但其中某一个消息只会被一个消费者获取(100支肉串20个人吃,但是其中的某支肉串只能被一个人吃)
2.3.2.1 生产者P
/** * @auther wei * @date 2021/9/27 14:23 * @description 消息生产者 */ public class Sender { public static void main(String[] args) throws Exception { // 1.获得连接 Connection connection = ConnectionUtil.getConnection(); // 2.从连接轴创建通道(信道) Channel channel = connection.createChannel(); // 3.创建消息队列(1,2,3,4,5) channel.queueDeclare("test_work_queue",false,false,false,null); // 4.向指定的队列发送消息(1,2,3,4) for (int i = 1; i <= 100; i++) { String msg = "羊肉串 --> " + i; channel.basicPublish("","test_work_queue",null,msg.getBytes()); System.out.println("新鲜出炉:" + msg); } // 5.释放资源 channel.close(); connection.close(); } }
2.3.2.2 消费者1
/** * @auther wei * @date 2021/9/27 14:26 * @description 消费者1 */ public class Recer1 { static int i = 1; // 统计吃掉羊肉串的数量 public static void main(String[] args) throws Exception { // 1.获得连接 Connection connection = ConnectionUtil.getConnection(); // 2.获得通道(信道) final Channel channel = connection.createChannel(); // queueDeclare() 此方法有双重作用,如果对垒不存在,就创建;如果队列存在,则获取 channel.queueDeclare("test_work_queue",false,false,false,null); // 3.从信道中获得消息 DefaultConsumer consumer = new DefaultConsumer(channel){ @Override // 交付处理(收件人信息,包裹上的快递标签,协议的配置,消息) public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // body就是从队列中获取的消息 String s = new String(body); System.out.println("【顾客1】吃掉 " + s + "!总共吃【" + i++ + "串!】"); try { // 模拟网络延迟 Thread.sleep(200); } catch (Exception e) { e.printStackTrace(); } // 手动确认(收件人信息,是否同时确认多个消息) channel.basicAck(envelope.getDeliveryTag(),false); } }; // 4.监听队列 false:手动消息确认 channel.basicConsume("test_work_queue",false,consumer); } }
2.3.2.3 消费者2
/** * @auther wei * @date 2021/9/27 14:26 * @description 消费者2 */ public class Recer2 { static int i = 1; // 统计吃掉羊肉串的数量 public static void main(String[] args) throws Exception { // 1.获得连接 Connection connection = ConnectionUtil.getConnection(); // 2.获得通道(信道) final Channel channel = connection.createChannel(); // queueDeclare() 此方法有双重作用,如果对垒不存在,就创建;如果队列存在,则获取 channel.queueDeclare("test_work_queue",false,false,false,null); // 3.从信道中获得消息 DefaultConsumer consumer = new DefaultConsumer(channel){ @Override // 交付处理(收件人信息,包裹上的快递标签,协议的配置,消息) public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // body就是从队列中获取的消息 String s = new String(body); System.out.println("【顾客2】吃掉 " + s + "!总共吃【" + i++ + "串!】"); try { // 模拟网络延迟 Thread.sleep(900); } catch (Exception e) { e.printStackTrace(); } // 手动确认(收件人信息,是否同时确认多个消息) channel.basicAck(envelope.getDeliveryTag(),false); } }; // 4.监听队列 false:手动消息确认 channel.basicConsume("test_work_queue",false,consumer); } }
先运行2个消费者,排队等候消费(取餐),再运行生产者开始生产消息(烤肉串)
虽然两个消费者的消费速度不一致(线程休眠时间),但是消费的数量却是一致的,各消费50个消息
例如:工作中,A同学编码速率高,B同学编码速率低,两个人同时开发一个项目,A10天完 成,B30天完成,A完成自己的编码部分,就无所事事了,等着B完成就可以了,这样是不可 以的,应该遵循“能者多劳”
效率高的多干点,效率低的少干点
看下面官网是如何给出解决思路的:
公平的分配
您可能已经注意到分派仍然不能完全按照我们的要求工作。例如,如果有两个员工,当所有奇怪的消息都很重,甚至消息都很轻时,一个员工会一直很忙,而另一个人几乎什么工作都 不做。好吧,RabbitMQ对此一无所知,它仍然会均匀地分派消息。
这是因为RabbitMQ只在消息进入队列时发送消息。它不查看用户未确认消息的数量。它只是盲目地将每条第n个消息分派给第n个消费者。
为了克服这个问题,我们可以使用设置为prefetchCount = 1的basicQos方法。这告诉RabbitMQ一次不要给一个worker发送一条以上的消息。或者,换句话说,在worker处理并确认前一个消息之前,不要向它发送新消息。相反,它将把它分派到下一个不繁忙的worker。
// 声明队列(此处为消费者,不是声明创建队列,而且获取,二者代码相同)出餐口排队 channel.queueDeclare("test_work_queue",false,false,false,null); // 可以理解为:快递一个一个送,送完一个再送下一个,速度快的送件就多 channel.basicQos(1);
能者多劳必须要配合手动的ACK机制才生效
2.3.2.4 面试题:避免消息堆积?
- workqueue,多个消费者监听同一个队列
- 接收到消息后,通过线程池,异步消费
2.3.3 发布订阅模式
看官网:
Publish/Subscribe
In the previous tutorial we created a work queue. The assumption behind a work queue is that each task is delivered to exactly one worker. In this part we’ll do something completely different – we’ll deliver a message to multiple consumers. This pattern is known as “publish/subscribe”.
To illustrate the pattern, we’re going to build a simple logging system. It will consist of two programs – the first will emit log messages and the second will receive and print them.
In our logging system every running copy of the receiver program will get the messages. That way we’ll be able to run one receiver and direct the logs to disk; and at the same time we’ll be able to run another receiver and see the logs on the screen.
Essentially, published log messages are going to be broadcast to all the receivers.
发布-订阅
在上一篇教程中,我们创建了一个工作队列。工作队列背后的假设是,每个任务都被准确地交付给一个工作者。在这一部分中,我们将做一些完全不同的事情——将消息传递给多个消费者。 此模式称为“发布/订阅”。
为了演示这个模式,我们将构建一个简单的日志记录系统。它将由两个程序组成——第一个将发送日志消息,第二个将接收和打印它们。
在我们的日志系统中,接收程序的每一个正在运行的副本都将获得消息。这样我们就可以运行一个接收器并将日志指向磁盘;与此同时,我们可以运行另一个接收器并在屏幕上看到日志。
基本上,发布的日志消息将广播到所有接收方。
生活中的案例:就是玩抖音快手,众多粉丝关注一个视频主,视频主发布视频,所有粉丝都可以得到视频通知
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ZUSK9wSZ-1633005745143)(E:\MarkDown\拉勾笔记\RabbitMQ模式-发布订阅模式1)]
上图中,X就是视频主,红色的队列就是粉丝。binding是绑定的意思(关注)
P生产者发送信息给X路由,X将信息转发给绑定X的队列
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-1Y5yYG9e-1633005745145)(E:\MarkDown\拉勾笔记\RabbitMQ模式-发布订阅模式2)]
X队列将信息通过信道发送给消费者,从而进行消费
整个过程,必须先创建路由
路由在生产者程序中创建
因为路由没有存储消息的能力,当生产者将信息发送给路由后,消费者还没有运行,所以没有队列,路由并不知道将信息发送给谁
运行程序的顺序:
1.MessageSender
2.MessageReceiver1和MessageReceiver2
3.MessageSender
2.3.3.1 生产者
/** * @auther wei * @date 2021/9/27 14:23 * @description 消息生产者 */ public class Sender { public static void main(String[] args) throws Exception { // 1.获得连接 Connection connection = ConnectionUtil.getConnection(); // 2.从连接轴创建通道(信道) Channel channel = connection.createChannel(); // 3.声明路由(路由名,路由类型) // fanout:不处理路由键(只需要将队列绑定到路由上,发送到路由上的消息都会被转发到与该路由绑定的所有对列上) channel.exchangeDeclare("test_exchange_fanout","fanout"); // 4.向指定的队列发送消息(1,2,3,4) String msg = "hello,大家好!"; channel.basicPublish("test_exchange_fanout","",null,msg.getBytes()); System.out.println("生产者:" + msg); // 5.释放资源 channel.close(); connection.close(); } }
2.3.3.2 消费者1
/** * @auther wei * @date 2021/9/27 14:01 * @description 消息消费者1 */ public class Recer1 { public static void main(String[] args) throws Exception { // 1.获得连接 Connection connection = ConnectionUtil.getConnection(); // 2.获得通道(信道) Channel channel = connection.createChannel(); // 3.声明队列 channel.queueDeclare("test_exchange_fanout_queue_1",false,false,false,null); // 4.绑定路由 channel.queueBind("test_exchange_fanout_queue_1","test_exchange_fanout",""); // 5.从信道中获得消息 DefaultConsumer consumer = new DefaultConsumer(channel){ @Override // 交付处理(收件人信息,包裹上的快递标签,协议的配置,消息) public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // body就是从队列中获取的消息 String s = new String(body); System.out.println("【消费者1】 = " + s); } }; // 6.监听队列 true:自动消息确认 channel.basicConsume("test_exchange_fanout_queue_1",true,consumer); } }
2.3.3.3 消费者2
/** * @auther wei * @date 2021/9/27 14:01 * @description 消息消费者2 */ public class Recer2 { public static void main(String[] args) throws Exception { // 1.获得连接 Connection connection = ConnectionUtil.getConnection(); // 2.获得通道(信道) Channel channel = connection.createChannel(); // 3.声明队列 channel.queueDeclare("test_exchange_fanout_queue_2",false,false,false,null); // 4.绑定路由 channel.queueBind("test_exchange_fanout_queue_2","test_exchange_fanout",""); // 5.从信道中获得消息 DefaultConsumer consumer = new DefaultConsumer(channel){ @Override // 交付处理(收件人信息,包裹上的快递标签,协议的配置,消息) public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // body就是从队列中获取的消息 String s = new String(body); System.out.println("【消费者2】 = " + s); } }; // 6.监听队列 true:自动消息确认 channel.basicConsume("test_exchange_fanout_queue_2",true,consumer); } }
2.3.4 路由模式
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ps5Dtmwk-1633005745147)(E:\MarkDown\拉勾笔记\RabbitMQ模式-路由模式)]
路由会根据类型进行定向分发消息给不同的队列,如图所示
可以理解为是快递公司的分拣中心,整个小区,东面的楼小张送货,西面的楼小王送货
2.3.4.1 生产者
/** * @auther wei * @date 2021/9/27 14:23 * @description 消息生产者 */ public class Sender { public static void main(String[] args) throws Exception { // 1.获得连接 Connection connection = ConnectionUtil.getConnection(); // 2.从连接轴创建通道(信道) Channel channel = connection.createChannel(); // 3.声明路由(路由名,路由类型) // direct:根据路由键进行定向分发消息 channel.exchangeDeclare("test_exchange_direct","direct"); // 4.向指定的队列发送消息(1,2,3,4) String msg = "用户注册,【userid=S101】"; channel.basicPublish("test_exchange_direct","insert",null,msg.getBytes()); System.out.println("[用户系统]:" + msg); // 5.释放资源 channel.close(); connection.close(); } }
2.3.4.2 消费者1
/** * @auther wei * @date 2021/9/27 14:01 * @description 消息消费者1 */ public class Recer1 { public static void main(String[] args) throws Exception { // 1.获得连接 Connection connection = ConnectionUtil.getConnection(); // 2.获得通道(信道) Channel channel = connection.createChannel(); // 3.声明队列 channel.queueDeclare("test_exchange_direct_queue_1",false,false,false,null); // 4.绑定路由(如果路由键的类型是 添加,删除,修改的话,绑定到这个队列1上) channel.queueBind("test_exchange_direct_queue_1","test_exchange_direct","insert"); channel.queueBind("test_exchange_direct_queue_1","test_exchange_direct","update"); channel.queueBind("test_exchange_direct_queue_1","test_exchange_direct","delete"); // 5.从信道中获得消息 DefaultConsumer consumer = new DefaultConsumer(channel){ @Override // 交付处理(收件人信息,包裹上的快递标签,协议的配置,消息) public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // body就是从队列中获取的消息 String s = new String(body); System.out.println("【消费者1】 = " + s); } }; // 6.监听队列 true:自动消息确认 channel.basicConsume("test_exchange_direct_queue_1",true,consumer); } }
2.3.4.3 消费者2
/** * @auther wei * @date 2021/9/27 14:01 * @description 消息消费者2 */ public class Recer2 { public static void main(String[] args) throws Exception { // 1.获得连接 Connection connection = ConnectionUtil.getConnection(); // 2.获得通道(信道) Channel channel = connection.createChannel(); // 3.声明队列 channel.queueDeclare("test_exchange_direct_queue_2",false,false,false,null); // 4.绑定路由(如果路由键的类型是 查询的话,绑定到这个队列2上) channel.queueBind("test_exchange_direct_queue_2","test_exchange_direct","select"); // 5.从信道中获得消息 DefaultConsumer consumer = new DefaultConsumer(channel){ @Override // 交付处理(收件人信息,包裹上的快递标签,协议的配置,消息) public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // body就是从队列中获取的消息 String s = new String(body); System.out.println("【消费者2】 = " + s); } }; // 6.监听队列 true:自动消息确认 channel.basicConsume("test_exchange_direct_queue_2",true,consumer); } }
- 记住运行程序的顺序,先运行一次sender(创建路由器)
- 有了路由器之后,在创建两个Recer1和Recer2,进行队列绑定
- 再次运行sender,发出消息
2.3.5 通配符模式
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-BXEGGqDq-1633005745148)(E:\MarkDown\拉勾笔记\RabbitMQ模式-通配符模式)]
和路由模式90%是一样的。
唯独的区别就是路由键支持模糊匹配
匹配符号
*:只能匹配一个词(正好一个词,多一个不行,少一个也不行)
#:匹配0个或更多个词
看一下官网案例:
Q1绑定了路由键 * .orange.* Q2绑定了路由键 * .*.rabbit 和 lazy.#
下面生产者的消息会被发送给哪个队列?
quick.orange.rabbit # Q1 Q2 lazy.orange.elephant # Q1 Q2 quick.orange.fox # Q1 lazy.brown.fox # Q2 lazy.pink.rabbit # Q2 quick.brown.fox # 无 orange # 无 quick.orange.male.rabbit # 无
2.3.5.1 生产者
/** * @auther wei * @date 2021/9/27 14:23 * @description 消息生产者 */ public class Sender { public static void main(String[] args) throws Exception { // 1.获得连接 Connection connection = ConnectionUtil.getConnection(); // 2.从连接轴创建通道(信道) Channel channel = connection.createChannel(); // 3.声明路由(路由名,路由类型) // topic:模糊匹配的定向分发 channel.exchangeDeclare("test_exchange_topic","topic"); // 4.向指定的队列发送消息(1,2,3,4) String msg = "用户注册,【userid=S101】"; channel.basicPublish("test_exchange_topic","user.register",null,msg.getBytes()); System.out.println("[用户系统]:" + msg); // 5.释放资源 channel.close(); connection.close(); } }
2.3.5.2 消费者1
/** * @auther wei * @date 2021/9/27 14:01 * @description 消息消费者1 */ public class Recer1 { public static void main(String[] args) throws Exception { // 1.获得连接 Connection connection = ConnectionUtil.getConnection(); // 2.获得通道(信道) Channel channel = connection.createChannel(); // 3.声明队列 channel.queueDeclare("test_exchange_topic_queue_1",false,false,false,null); // 4.绑定路由(绑定 用户相关 的消息) channel.queueBind("test_exchange_topic_queue_1","test_exchange_topic","user.#"); // 5.从信道中获得消息 DefaultConsumer consumer = new DefaultConsumer(channel){ @Override // 交付处理(收件人信息,包裹上的快递标签,协议的配置,消息) public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // body就是从队列中获取的消息 String s = new String(body); System.out.println("【消费者1】 = " + s); } }; // 6.监听队列 true:自动消息确认 channel.basicConsume("test_exchange_topic_queue_1",true,consumer); } }
2.3.5.3 消费者2
/** * @auther wei * @date 2021/9/27 14:01 * @description 消息消费者2 */ public class Recer2 { public static void main(String[] args) throws Exception { // 1.获得连接 Connection connection = ConnectionUtil.getConnection(); // 2.获得通道(信道) Channel channel = connection.createChannel(); // 3.声明队列 channel.queueDeclare("test_exchange_topic_queue_2",false,false,false,null); // 4.绑定路由(绑定 商品和订单相关 的消息) channel.queueBind("test_exchange_topic_queue_2","test_exchange_topic","product.#"); channel.queueBind("test_exchange_topic_queue_2","test_exchange_topic","order.#"); // 5.从信道中获得消息 DefaultConsumer consumer = new DefaultConsumer(channel){ @Override // 交付处理(收件人信息,包裹上的快递标签,协议的配置,消息) public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // body就是从队列中获取的消息 String s = new String(body); System.out.println("【消费者2】 = " + s); } }; // 6.监听队列 true:自动消息确认 channel.basicConsume("test_exchange_topic_queue_2",true,consumer); } }
2.4 持久化
消息的可靠性是RabbitMQ的一大特色,那么RabbitMQ是如何避免消息丢失?
消费者的ACK确认机制,可以防止消费者丢失消息
万一在消费者消费之前,RabbitMQ服务器宕机了,那消息也会丢失
想要将消息持久化,那么路由和队列都要持久化才可以
2.4.1 生产者
/** * @auther wei * @date 2021/9/27 14:23 * @description 消息生产者 */ public class Sender { public static void main(String[] args) throws Exception { // 1.获得连接 Connection connection = ConnectionUtil.getConnection(); // 2.从连接轴创建通道(信道) Channel channel = connection.createChannel(); // 3.声明路由(路由名,路由类型,持久化) // topic:模糊匹配的定向分发 channel.exchangeDeclare("test_exchange_topic","topic",true); // 4.向指定的队列发送消息(1,2,3,4) String msg = "商品降价"; channel.basicPublish("test_exchange_topic","product.price", MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes()); System.out.println("[用户系统]:" + msg); // 5.释放资源 channel.close(); connection.close(); } }
2.4.2 消费者
/** * @auther wei * @date 2021/9/27 14:01 * @description 消息消费者1 */ public class Recer1 { public static void main(String[] args) throws Exception { // 1.获得连接 Connection connection = ConnectionUtil.getConnection(); // 2.获得通道(信道) Channel channel = connection.createChannel(); // 3.声明队列(第二个参数为true:表示支持持久化) channel.queueDeclare("test_exchange_topic_queue_1",true,false,false,null); // 4.绑定路由(绑定 用户相关 的消息) channel.queueBind("test_exchange_topic_queue_1","test_exchange_topic","user.#"); // 5.从信道中获得消息 DefaultConsumer consumer = new DefaultConsumer(channel){ @Override // 交付处理(收件人信息,包裹上的快递标签,协议的配置,消息) public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // body就是从队列中获取的消息 String s = new String(body); System.out.println("【消费者1】 = " + s); } }; // 6.监听队列 true:自动消息确认 channel.basicConsume("test_exchange_topic_queue_1",true,consumer); } }
2.5 Spring整合RabbitMQ
五种消息模型,在企业中应用最广泛的就是最后一种:定向匹配topic
Spring AMQP 是基于 Spring 框架的AMQP消息解决方案,提供模板化的发送和接收消息的抽象层,提供基于消息驱动的 POJO的消息监听等,简化了我们对于RabbitMQ相关程序的开发。
2.5.1 生产端工程
依赖
<dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>2.0.1.RELEASE</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.7.25</version> <scope>compile</scope> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> <version>3.9</version> </dependency>
spring-rabbitmq-producer.xml
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd"> <!--1.配置连接工厂--> <rabbit:connection-factory id="connectionFactory" host="192.168.44.129" port="5672" username="wei" password="123123" virtual-host="/lagou"/> <!--2.配置队列--> <rabbit:queue name="test_spring_queue_1"/> <!--3.配置rabbitAdmin:主要用于在java代码中对队列的管理,用来创建,绑定,删除队列与交换机,发送消息等--> <rabbit:admin connection-factory="connectionFactory"/> <!--4.配置交换机,topic类型--> <rabbit:topic-exchange name="spring_topic_exchange"> <rabbit:bindings> <!--绑定队列--> <rabbit:binding pattern="msg.#" queue="test_spring_queue_1"></rabbit:binding> </rabbit:bindings> </rabbit:topic-exchange> <!--5.配置json转换的工具--> <bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter"/> <!--6.配置rabbitmq的模板--> <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory" exchange="spring_topic_exchange" message-converter="jsonMessageConverter"/> </beans>
发消息
/** * @auther wei * @date 2021/9/27 20:34 * @description 消息生产者 */ public class Sender { public static void main(String[] args) { // 1.创建spring容器 ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("spring/spring-rabbitmq-producer.xml"); // 2.从容器中获取 rabbit模板对象 RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class); // 3.发送消息 Map<String, String> map = new HashMap<>(); map.put("name","小微"); map.put("email","15952037019@163.com"); rabbitTemplate.convertAndSend("msg.user",map); context.close(); } }
2.5.2 消费端工程
依赖与生产者一致
spring-rabbitmq-consumer.xml
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd"> <!-- 1. 配置连接 --> <rabbit:connection-factory id="connectionFactory" host="192.168.204.141" port="5672" username="laosun" password="123123" virtual-host="/lagou" /> <!-- 2. 配置队列 --> <rabbit:queue name="test_spring_queue_1"/> <!-- 3.配置rabbitAdmin --> <rabbit:admin connection-factory="connectionFactory"/> <!-- 4.springIOC注解扫描包--> <context:component-scan base-package="listener"/> <!-- 5.配置监听 --> <rabbit:listener-container connection-factory="connectionFactory"> <rabbit:listener ref="consumerListener" queuenames="test_spring_queue_1" /> </rabbit:listener-container> </beans>
消费者
MessageListener接口用于spring容器接收到消息后处理消息
如果需要使用自己定义的类型来实现 处理消息时,必须实现该接口,并重写onMessage()方法
当spring容器接收消息后,会自动交由onMessage进行处理
/** * @auther wei * @date 2021/9/28 11:47 * @description 消费者监听队列 */ @Component public class ConsumerListener implements MessageListener { // Jackson提供序列化和反序列化中使用最多的类,用来转换json的 private static final ObjectMapper MEPPER = new ObjectMapper(); @Override public void onMessage(Message message) { try { // 将message对象转换成json JsonNode jsonNode = MEPPER.readTree(message.getBody()); String name = jsonNode.get("name").asText(); JsonNode email = jsonNode.get("email"); System.out.println("从队列中获取:【" + name +"的邮箱是:" + email + "】"); } catch (Exception e) { e.printStackTrace(); } } }
启动项目
/** * @auther wei * @date 2021/9/28 12:10 * @description 运行项目 */ public class TestRunner { public static void main(String[] args) throws Exception { // 获得容器 ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("spring/spring-rabbitmq-consumer.xml"); // 让程序一致运行,别终止 System.in.read(); } }
2.6 消息成功确认机制
在实际场景下,有的生产者发送的消息是必须保证成功发送到消息队列中,那么如何保证成功投递呢?
事务机制
发布确认机制
2.6.1 事务机制
AMQP协议提供的一种保证消息成功投递的方式,通过信道开启 transactional 模式
并利用信道 的三个方法来实现以事务方式 发送消息,若发送失败,通过异常处理回滚事务,确保消息成功投递
channel.txSelect(): 开启事务
channel.txCommit() :提交事务
channel.txRollback() :回滚事务
Spring已经对上面三个方法进行了封装,所以我们只能使用原始的代码演示
2.6.1.1 生产者
/** * @auther wei * @date 2021/9/27 14:23 * @description 消息生产者 */ public class Sender { public static void main(String[] args) throws Exception { // 1.获得连接 Connection connection = ConnectionUtil.getConnection(); // 2.从连接轴创建通道(信道) Channel channel = connection.createChannel(); // 3.声明路由(路由名,路由类型,持久化) // topic:模糊匹配的定向分发 channel.exchangeDeclare("test_transaction","topic"); // 4.开启事务 channel.txSelect(); try { // 5.向指定的队列发送消息(1,2,3,4) channel.basicPublish("test_transaction", "product.price", null, "商品1-降价".getBytes()); System.out.println(1 / 0); channel.basicPublish("test_transaction", "product.price", null, "商品2-降价".getBytes()); // 6.提交事务(一起成功) channel.txCommit(); System.out.println("[ 生产者 ]: 消息已全部发送!"); }catch (Exception e){ System.out.println("消息全部撤销!"); channel.txRollback(); // 事务回滚(一起失败) e.printStackTrace(); }finally { // 7.释放资源 channel.close(); connection.close(); } } }
2.6.1.2 消费者
/** * @auther wei * @date 2021/9/27 14:01 * @description 消息消费者 */ public class Recer { public static void main(String[] args) throws Exception { // 1.获得连接 Connection connection = ConnectionUtil.getConnection(); // 2.获得通道(信道) Channel channel = connection.createChannel(); // 3.声明队列(第二个参数为true:表示支持持久化) channel.queueDeclare("test_transaction_queue",false,false,false,null); // 4.绑定路由(绑定 用户相关 的消息) channel.queueBind("test_transaction_queue","test_transaction","product.#"); // 5.从信道中获得消息 DefaultConsumer consumer = new DefaultConsumer(channel){ @Override // 交付处理(收件人信息,包裹上的快递标签,协议的配置,消息) public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // body就是从队列中获取的消息 String s = new String(body); System.out.println("【消费者】 = " + s); } }; // 6.监听队列 true:自动消息确认 channel.basicConsume("test_transaction_queue",true,consumer); } }
2.6.2 Confirm发布确认机制
RabbitMQ为了保证消息的成功投递,采用通过AMQP协议层面为我们提供事务机制的方案,但是采用事务会大大降低消息的吞吐量
老孙我本机SSD硬盘测试结果10w条消息未开启事务,大约8s发送完毕;而开启了事务后,需要将近310s,差了30多倍。
接着老孙翻阅官网,发现官网中已标注
Using standard AMQP 0-9-1, the only way to guarantee that a message isn’t lost is by using transactions – make the channel transactional then for each message or set of messages publish, commit. In this case, transactions are unnecessarily heavyweight and decrease throughput by a factor of 250. To remedy this, a confirmation mechanism was introduced. It mimics the consumer acknowledgements mechanism already present in the protocol.
关键性译文:开启事务性能最大损失超过250倍
那么有没有更加高效的解决方式呢?答案就是采用Confirm模式。
事务效率为什么会这么低呢?试想一下:10条消息,前9条成功,如果第10条失败,那么9条消息要全部撤销回滚。太太太浪费
而confirm模式则采用补发第10条的措施来完成10条消息的送达
2.6.2.1 在spring中应用
spring-rabbitmq-producer.xml
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd"> <!--1.配置连接工厂--> <rabbit:connection-factory id="connectionFactory" host="192.168.44.129" port="5672" username="wei" password="123123" virtual-host="/lagou" publisher-confirms="true"/> <!--2.配置队列--> <rabbit:queue name="test_spring_queue_1"/> <!--3.配置rabbitAdmin:主要用于在java代码中对队列的管理,用来创建,绑定,删除队列与交换机,发送消息等--> <rabbit:admin connection-factory="connectionFactory"/> <!--4.配置交换机,topic类型--> <rabbit:topic-exchange name="spring_topic_exchange"> <rabbit:bindings> <!--绑定队列--> <rabbit:binding pattern="msg.#" queue="test_spring_queue_1"></rabbit:binding> </rabbit:bindings> </rabbit:topic-exchange> <!--5.配置json转换的工具--> <bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter"/> <!--6.配置rabbitmq的模板--> <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory" exchange="spring_topic_exchange" message-converter="jsonMessageConverter" confirm-callback="messageConfirm"/> <!--7.配置确认机制处理类--> <bean id="messageConfirm" class="confirm.MessageConfirm"/> </beans>
消息确认处理类
/** * @auther wei * @date 2021/9/28 16:04 * @description 消息确认处理 */ public class MessageConfirm implements RabbitTemplate.ConfirmCallback { /** * @param correlationData 消息相关的数据对象(封装了消息的唯一id) * @param b 消息是否确认成功 * @param s 异常信息 */ @Override public void confirm(CorrelationData correlationData, boolean b, String s) { if (b == true){ System.out.println("消息确认成功!"); }else { System.out.println("xxxxx 消息确认失败 xxxxx"); //System.out.println(s); // 如果本条消息一定要发送到队列中,例如下订单消息,我们可以采用消息补发 // 1.采用递归(限定递归次数) // 2.redis+定时任务(jdk的timer,或者定时任务框架Quartz) } } }
log4j.properties
log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.Target=System.out log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %m%n log4j.appender.file=org.apache.log4j.FileAppender log4j.appender.file.File=rabbitmq.log log4j.appender.file.layout=org.apache.log4j.PatternLayout log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %l%m%n log4j.rootLogger=debug, stdout,file
发送消息
/** * @auther wei * @date 2021/9/27 20:34 * @description 消息生产者 */ public class Sender { public static void main(String[] args) { // 1.创建spring容器 ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("spring/spring-rabbitmq-producer.xml"); // 2.从容器中获取 rabbit模板对象 RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class); // 3.发送消息 Map<String, String> map = new HashMap<>(); map.put("name","吕布"); map.put("email","6666@163.com"); rabbitTemplate.convertAndSend("lalala","msg.user",map); System.out.println("消息已发送..."); context.close(); } }
2.7 消费端限流
在沙漠中行走,3天不喝水,突然喝水,如果使劲喝,容易猝死,要一口一口慢慢喝
我们 Rabbitmq 服务器积压了成千上万条未处理的消息,然后随便打开一个消费者客户端,就会出现这样的情况: 巨量的消息瞬间全部喷涌推送过来,但是单个客户端无法同时处理这么多数据, 就会被压垮崩溃
所以,当数据量特别大的时候,我们对生产端限流肯定是不科学的,因为有时候并发量就是特别大,有时候并发量又特别少,这是用户的行为,我们是无法约束的
所以我们应该对消费端限流,用于保持消费端的稳定
例如:汽车企业不停的生产汽车,4S店有好多库存车卖不出去,但是也不会降价处理,就是要保证市值的稳定,如果生产多少台,就卖多少台,不管价格的话,市场就乱了,所以我们要用不变的价格来稳住消费者购车,才能平稳发展
RabbitMQ 提供了一种 Qos (Quality of Service,服务质量)服务质量保证功能
即在非自动确认消息的前提下,如果一定数目的消息未被确认前,不再进行消费新的消息
生产者使用循环发出多条消息
public class Sender { public static void main(String[] args) { // 1.创建spring容器 ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("spring/spring-rabbitmq-producer.xml"); // 2.从容器中获取 rabbit模板对象 RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class); // 3.发送消息 Map<String, String> map = new HashMap<>(); map.put("name","吕布"); map.put("email","6666@163.com"); for (int i = 1; i <= 10; i++) { rabbitTemplate.convertAndSend("msg.user",map); System.out.println("消息已发送..."); } context.close(); } }
生产10条堆积未处理的消息
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-DezMAGnH-1633005745150)(E:\MarkDown\拉勾笔记\RabbitMQ 消费端限流)]
消费者进行限流处理
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd"> <!--1.配置连接工厂--> <rabbit:connection-factory id="connectionFactory" host="192.168.44.129" port="5672" username="wei" password="123123" virtual-host="/lagou"/> <!--2.配置队列--> <rabbit:queue name="test_spring_queue_1"/> <!--3.配置rabbitAdmin:主要用于在java代码中对队列的管理,用来创建,绑定,删除队列与交换机,发送消息等--> <rabbit:admin connection-factory="connectionFactory"/> <!--4..springIOC注解扫描包--> <context:component-scan base-package="listener"/> <!--5.配置监听--> <!-- prefetch="3" 一次性消费的消息数量。会告诉 RabbitMQ 不要同时给一个消费者推送多于N 个消息,一旦有 N 个消息还没有ack,则该 consumer 将阻塞,直到消息被ack--> <!-- acknowledge-mode: manual 手动确认--> <rabbit:listener-container connection-factory="connectionFactory" prefetch="3" acknowledge="manual"> <rabbit:listener ref="consumerListener" queue-names="test_spring_queue_1"/> </rabbit:listener-container> </beans>
/** * @auther wei * @date 2021/9/28 11:47 * @description 消费者监听队列 * AbstractAdaptableMessageListener用于在spring容器接收到消息后用于处理消息的抽象基类 */ @Component public class ConsumerListener extends AbstractAdaptableMessageListener { // Jackson提供序列化和反序列化中使用最多的类,用来转换json的 private static final ObjectMapper MEPPER = new ObjectMapper(); @Override public void onMessage(Message message, Channel channel) throws Exception { try { // 将message对象转换成json JsonNode jsonNode = MEPPER.readTree(message.getBody()); String name = jsonNode.get("name").asText(); JsonNode email = jsonNode.get("email"); System.out.println("从队列中获取:【" + name +"的邮箱是:" + email + "】"); // 手动确认消息(参数1,参数2) /** * 参数1:RabbitMQ向该channel投递的这条消息的唯一标识ID,此ID是一个单调递增的正整数 * 参数2:为了减少网络的流量,手动确认可以被批量处理,当该参数为true时,则可以一次性确认小于等于msgId值的所有消息 */ long msgId = message.getMessageProperties().getDeliveryTag(); channel.basicAck(msgId,true); Thread.sleep(3000); System.out.println("休息3秒后,在继续接收消息!"); } catch (Exception e) { e.printStackTrace(); } } }
每次确认接收3条消息
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-zddw71OK-1633005745151)(E:\MarkDown\拉勾笔记\RabbitMQ 消费端限流02)]
2.8 过期时间TTL
Time To Live:生存时间、还能活多久,单位毫秒
在这个周期内,消息可以被消费者正常消费,超过这个时间,则自动删除(其实是被称为dead message并投入到死信队列,无法消费该消息)
RabbitMQ可以对消息和队列设置TTL
通过队列设置,队列中所有消息都有相同的过期时间
对消息单独设置,每条消息的TTL可以不同(更颗粒化)
2.8.1 设置队列TTL
spring-rabbitmq-producer.xml
<!--2.重新配置一个队列,同时,对队列中的消息设置过期时间--> <rabbit:queue name="test_spring_queue_ttl" auto-declare="true"> <rabbit:queue-arguments> <entry key="x-message-ttl" value-type="long" value="5000"/> </rabbit:queue-arguments> </rabbit:queue>
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-CO9Lb5jT-1633005745153)(E:\MarkDown\拉勾笔记\RabbitMQ 过期时间TTL-设置队列TTL)]
5秒之后,消息自动删除
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Eu0GA1DD-1633005745154)(E:\MarkDown\拉勾笔记\RabbitMQ 过期时间TTL-设置队列TTL02)]
2.8.2 设置消息TTL
设置某条消息的ttl,只需要在创建发送消息时指定即可
<!--2.配置队列--> <rabbit:queue name="test_spring_queue_ttl_2">
public class Sender2 { public static void main(String[] args) { // 1.创建spring容器 ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("spring/spring-rabbitmq-producer.xml"); // 2.从容器中获取 rabbit模板对象 RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class); // 3.创建消息的配置对象 MessageProperties properties = new MessageProperties(); // 4.设置过期时间3秒 properties.setExpiration("3000"); // 5.创建消息 Message message = new Message("测试过期时间".getBytes(),properties); // 6.发送消息 rabbitTemplate.convertAndSend("msg.user",message); System.out.println("消息已发送..."); context.close(); } }
如果同时设置了queue和message的TTL值,则二者中较小的才会起作用
2.9 死信队列
DLX(Dead Letter Exchanges)死信交换机/死信邮箱,当消息在队列中由于某些原因没有被及时消费而变成死信(dead message)后,这些消息就会被分发到DLX交换机中,而绑定DLX交换机的队列,称之为:“死信队列”
消息没有被及时消费的原因:
消息被拒绝(basic.reject/ basic.nack)并且不再重新投递 requeue=false
消息超时未消费
达到最大队列长度
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-PZdJ56aK-1633005745156)(E:\MarkDown\拉勾笔记\RabbitMQ 死信队列)]
spring-rabbitmq-producer-dlx.xml
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd"> <!--1.配置连接工厂--> <rabbit:connection-factory id="connectionFactory" host="192.168.44.129" port="5672" username="wei" password="123123" virtual-host="/lagou" publisher-confirms="true"/> <!--3.配置rabbitAdmin:主要用于在java代码中对队列的管理,用来创建,绑定,删除队列与交换机,发送消息等--> <rabbit:admin connection-factory="connectionFactory"/> <!--6.配置rabbitmq的模板--> <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory" exchange="my_exchange"/> <!--####################################################################################--> <!--声明死信队列--> <rabbit:queue name="dlx_queue"/> <!--声明定向的死信交换机--> <rabbit:direct-exchange name="dlx_exchange"> <rabbit:bindings> <rabbit:binding key="dlx_ttl" queue="dlx_queue"/> <rabbit:binding key="dlx_max" queue="dlx_queue"/> </rabbit:bindings> </rabbit:direct-exchange> <!--声明定向的测试消息的交换机--> <rabbit:direct-exchange name="my_exchange"> <rabbit:bindings> <rabbit:binding key="dlx_ttl" queue="test_ttl_queue"/> <rabbit:binding key="dlx_max" queue="test_max_queue"/> </rabbit:bindings> </rabbit:direct-exchange> <!--声明测试过期的消息队列--> <rabbit:queue name="test_ttl_queue"> <rabbit:queue-arguments> <!--1.设置队列的过期时间TTL--> <entry key="x-message-ttl" value-type="long" value="6000"/> <!--2.消息如果超时,将消息投递给 死信交换机--> <entry key="x-dead-letter-exchange" value="dlx_exchange"/> </rabbit:queue-arguments> </rabbit:queue> <!--声明测试超出长度的消息队列--> <rabbit:queue name="test_max_queue"> <rabbit:queue-arguments> <!--1.设置队列额定长度(本队列最多装2个消息)--> <entry key="x-max-length" value-type="long" value="2"/> <!--2.消息如果超除长度,将消息投递给 死信交换机--> <entry key="x-dead-letter-exchange" value="dlx_exchange"/> </rabbit:queue-arguments> </rabbit:queue> </beans>
发消息进行测试
public class SenderDLX { public static void main(String[] args) { // 1.创建spring容器 ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("spring/spring-rabbitmq-producer-dlx.xml"); // 2.从容器中获取 rabbit模板对象 RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class); // 3.发送消息 //rabbitTemplate.convertAndSend("dlx_ttl","测试超时".getBytes()); rabbitTemplate.convertAndSend("dlx_max","测试长度1".getBytes()); rabbitTemplate.convertAndSend("dlx_max","测试长度2".getBytes()); rabbitTemplate.convertAndSend("dlx_max","测试长度3".getBytes()); System.out.println("消息已发送..."); context.close(); } }
2.10 延迟队列
延迟队列:TTL + 死信队列的合体
死信队列只是一种特殊的队列,里面的消息仍然可以消费
在电商开发部分中,都会涉及到延时关闭订单,此时延迟队列正好可以解决这个问题
2.10.1 生产者
沿用上面死信队列案例的超时测试,超时时间改为订单关闭时间即可
public class SenderDLX { public static void main(String[] args) { // 1.创建spring容器 ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("spring/spring-rabbitmq-producer-dlx.xml"); // 2.从容器中获取 rabbit模板对象 RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class); // 3.发送消息 rabbitTemplate.convertAndSend("dlx_ttl","超时,关闭订单".getBytes()); System.out.println("消息已发送..."); context.close(); } }
2.10.2 消费者
spring-rabbitmq-consumer.xml
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd"> <!--1.配置连接工厂--> <rabbit:connection-factory id="connectionFactory" host="192.168.44.129" port="5672" username="wei" password="123123" virtual-host="/lagou"/> <!--3.配置rabbitAdmin:主要用于在java代码中对队列的管理,用来创建,绑定,删除队列与交换机,发送消息等--> <rabbit:admin connection-factory="connectionFactory"/> <!--4..springIOC注解扫描包--> <context:component-scan base-package="listener"/> <!--5.监听死信队列--> <rabbit:listener-container connection-factory="connectionFactory" prefetch="3" acknowledge="manual"> <rabbit:listener ref="consumerListener" queue-names="dlx_queue" /> </rabbit:listener-container> </beans>
@Component public class ConsumerListener extends AbstractAdaptableMessageListener { // Jackson提供序列化和反序列化中使用最多的类,用来转换json的 private static final ObjectMapper MEPPER = new ObjectMapper(); @Override public void onMessage(Message message, Channel channel) throws Exception { try { String str = new String(message.getBody()); System.out.println("str = " + str); // 手动确认消息(参数1,参数2) /** * 参数1:RabbitMQ向该channel投递的这条消息的唯一标识ID,此ID是一个单调递增的正整数 * 参数2:为了减少网络的流量,手动确认可以被批量处理,当该参数为true时,则可以一次性确认小于等于msgId值的所有消息 */ /* long msgId = message.getMessageProperties().getDeliveryTag(); channel.basicAck(msgId,true); Thread.sleep(3000); System.out.println("休息3秒后,在继续接收消息!"); */ } catch (Exception e) { e.printStackTrace(); } } }
3.RabbitMQ集群
rabbitmq有3种模式,但集群模式是2种。详细如下:
单一模式:即单机情况不做集群,就单独运行一个rabbitmq而已。之前我们一直在用
普通模式:默认模式,以两个节点(A、B)为例来进行说明
当消息进入A节点的Queue后,consumer从B节点消费时,RabbitMQ会在A和B之间创建临时通道进行消息传输,把A中的消息实体取出并经过通道交给B发送给consumer
当A故障后,B就无法取到A节点中未消费的消息实体
如果做了消息持久化,那么得等A节点恢复,然后才可被消费
如果没有持久化的话,就会产生消息丢失的现象
镜像模式:非常经典的 mirror 镜像模式,保证 100% 数据不丢失。
高可靠性解决方案,主要就是实现数据的同步,一般来讲是 2 - 3 个节点实现数据同步
对于 100% 数据可靠性解决方案,一般是采用 3 个节点。
在实际工作中也是用得最多的,并且实现非常的简单,一般互联网大厂都会构建这种镜像集群模式
还有主备模式,远程模式,多活模式等,本次课程不作为重点,可自行查阅资料了解
3.1 集群搭建
前置条件:准备两台linux,并安装好rabbitmq
集群步骤如下:
1.修改 /etc/hosts 映射文件
1号服务器:
127.0.0.1 A localhost localhost.localdomain localhost4 localhost4.localdomain4 ::1 A localhost localhost.localdomain localhost6 localhost6.localdomain6 192.168.44.129 A 192.168.44.130 B
2号服务器:
127.0.0.1 B localhost localhost.localdomain localhost4 localhost4.localdomain4 ::1 B localhost localhost.localdomain localhost6 localhost6.localdomain6 192.168.44.129 A 192.168.44.130 B
修改hosts文件,要重启服务器,reboot
2.相互通信,cookie必须保持一致,同步 rabbitmq的cookie 文件:跨服务器拷贝 .erlang.cookie (隐藏文件,使用 ls -all 显示)
[root@A opt]# scp /var/lib/rabbitmq/.erlang.cookie 192.168.44.130:/var/lib/rabbitmq
修改cookie文件,要重启服务器,reboot
3.停止防火墙,启动rabbitmq服务
[root@A ~]# systemctl stop firewalld [root@A ~]# systemctl start rabbitmq-server
4.加入集群节点
[root@B ~]# rabbitmqctl stop_app [root@B ~]# rabbitmqctl join_cluster rabbit@A [root@B ~]# rabbitmqctl start_app
5.查看节点状态
[root@B ~]# rabbitmqctl cluster_status
6.查看管理端
搭建集群结构之后,之前创建的交换机、队列、用户都属于单一结构,在新的集群环境中是不能用的
所以在新的集群中重新手动添加用户即可(任意节点添加,所有节点共享)
[root@A ~]# rabbitmqctl add_user laosun 123123 [root@A ~]# rabbitmqctl set_user_tags laosun administrator [root@A ~]# rabbitmqctl set_permissions -p "/" laosun ".*" ".*" ".*"
注意:当节点脱离集群还原成单一结构后,交换机,队列和用户等数据都会重新回来
此时,集群搭建完毕,但是默认采用的模式“普通模式”,可靠性不高
3.2 镜像模式
将所有队列设置为镜像队列,即队列会被复制到各个节点,各个节点状态一致
语法:set_policy {name} {pattern} {definition}
name:策略名,可自定义
pattern:队列的匹配模式(正则表达式)
“^” 可以使用正则表达式,比如"^queue_" 表示对队列名称以“queue_”开头的所有队列进行镜像,而"^"表示匹配所有的队列
definition:镜像定义,包括三个部分ha-mode, ha-params, ha-sync-mode
ha-mode:(High Available,高可用)模式,指明镜像队列的模式,有效值为 all/exactly/nodes,当前策略模式为 all,即复制到所有节点,包含新增节点
all:表示在集群中所有的节点上进行镜像
exactly:表示在指定个数的节点上进行镜像,节点的个数由ha-params指定
nodes:表示在指定的节点上进行镜像,节点名称通过ha-params指定
ha-params:ha-mode模式需要用到的参数
ha-sync-mode:进行队列中消息的同步方式,有效值为automatic和manual
[root@A ~]# rabbitmqctl set_policy xall "^" '{"ha-mode":"all"}'
通过管理端设置镜像策略
3.3 HAProxy实现镜像队列的负载均衡
虽然我们在程序中访问A服务器,可以实现消息的同步,虽然在同步,但都是A服务器在接收消息,A太累
是否可以像Nginx一样,做负载均衡,A和B轮流接收消息,再镜像同步
3.3.1 HAProxy简介
HA(High Available,高可用),Proxy(代理)
HAProxy是一款提供高可用性,负载均衡,并且基于TCP和HTTP应用的代理软件
HAProxy完全免费
HAProxy可以支持数以万计的并发连接
HAProxy可以简单又安全的整合进架构中,同时还保护web服务器不被暴露到网络上
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Yw0rbCxi-1633005745158)(E:\MarkDown\拉勾笔记\RabbitMQ HAProxy简介)]
3.3.2 HAProxy与Nginx
OSI:(Open System Interconnection:开放式系统互联 是把网络通信的工作分为7层,分别是物理层,数据链路层,网络层,传输层,会话层,表示层和应用层)
Nginx的优点:
工作在OSI第7层,可以针对http应用做一些分流的策略
Nginx对网络的依赖非常小,理论上能ping通就就能进行负载功能,屹立至今的绝对优势
Nginx安装和配置比较简单,测试起来比较方便;
Nginx不仅仅是一款优秀的负载均衡器/反向代理软件,它同时也是功能强大的Web应用服务器
HAProxy的优点:
工作在网络4层和7层,支持TCP与Http协议
它仅仅就只是一款负载均衡软件;单纯从效率上来讲HAProxy更会比Nginx有更出色的负载均衡速度,在并发处理上也是优于Nginx的
支持8种负载均衡策略 ,支持心跳检测
性能上HA胜,功能性和便利性上Nginx胜
对于Http协议,Haproxy处理效率比Nginx高。所以,没有特殊要求的时候或者一般场景,建议使用Haproxy来做Http协议负载
但如果是Web应用,那么建议使用Nginx!
总之,大家可以结合各自使用场景的特点来进行合理地选择
3.3.3 安装和配置
HAProxy下载:http://www.haproxy.org/download/1.8/src/haproxy-1.8.12.tar.gz
解压
[root@localhost opt]# tar -zxvf haproxy-1.8.12.tar.gz
make时需要使用 TARGET 指定内核及版本
[root@localhost opt]# uname -r 3.10.0-229.el7.x86_64
根据内核版本选择编译参数:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-b2kCZhAX-1633005745159)(E:\MarkDown\拉勾笔记\RabbitMQ HAProxy安装)]
进入目录,编译和安装
[root@localhost opt]# cd haproxy-1.8.12 [root@localhost haproxy-1.8.12]# make TARGET=linux2628 PREFIX=/usr/local/haproxy [root@localhost haproxy-1.8.12]# make install PREFIX=/usr/local/haproxy
安装成功后,查看版本
[root@localhost haproxy-1.8.12]# /usr/local/haproxy/sbin/haproxy -v
配置启动文件,复制haproxy文件到/usr/sbin下 ,复制haproxy脚本,到/etc/init.d下
[root@localhost haproxy-1.8.12]# cp /usr/local/haproxy/sbin/haproxy /usr/sbin/ [root@localhost haproxy-1.8.12]# cp ./examples/haproxy.init /etc/init.d/haproxy [root@localhost haproxy-1.8.12]# chmod 755 /etc/init.d/haproxy
创建系统账号
[root@localhost haproxy-1.8.12]# useradd -r haproxy
haproxy.cfg 配置文件需要自行创建
[root@localhost haproxy-1.8.12]# mkdir /etc/haproxy [root@localhost haproxy-1.8.12]# vim /etc/haproxy/haproxy.cfg
添加配置信息到haproxy.cfg
#全局配置 global #设置日志 log 127.0.0.1 local0 info #当前工作目录 chroot /usr/local/haproxy #用户与用户组 user haproxy group haproxy #运行进程ID uid 99 gid 99 #守护进程启动 daemon #最大连接数 maxconn 4096 #默认配置 defaults #应用全局的日志配置 log global #默认的模式mode {tcp|http|health},TCP是4层,HTTP是7层,health只返回OK mode tcp #日志类别tcplog option tcplog #不记录健康检查日志信息 option dontlognull #3次失败则认为服务不可用 retries 3 #每个进程可用的最大连接数 maxconn 2000 #连接超时 timeout connect 5s #客户端超时30秒,ha就会发起重新连接 timeout client 30s #服务端超时15秒,ha就会发起重新连接 timeout server 15s #绑定配置 listen rabbitmq_cluster bind 192.168.44.131:5672 #配置TCP模式 mode tcp #简单的轮询 balance roundrobin #RabbitMQ集群节点配置,每隔5秒对mq集群做检查,2次正确证明服务可用,3次失败证明服务不可用 server A 192.168.44.129:5672 check inter 5000 rise 2 fall 3 server B 192.168.44.130:5672 check inter 5000 rise 2 fall 3 #haproxy监控页面地址 listen monitor bind 192.168.44.131:8100 mode http option httplog stats enable # 监控页面地址 http://192.168.44.131:8100/monitor stats uri /monitor stats refresh 5s
启动HAProxy
[root@localhost haproxy]# service haproxy start
访问监控中心:http://192.168.44.131:8100/monitor
记得关闭防火墙: systemctl stop firewalld
项目发消息,只需要将服务器地址修改为131即可,其余不变
所有的请求都会交给HAProxy,其负载均衡给每个rabbitmq服务器
3.4 KeepAlived搭建高可用的HAProxy集群
现在的最后一个问题暴露出来了,如果HAProxy服务器宕机,rabbitmq服务器就不可用了。所以我们需要对HAProxy也要做高可用的集群
3.4.1 概述
Keepalived是Linux下一个轻量级别的高可用热备解决方案
Keepalived的作用是检测服务器的状态,它根据TCP/IP参考模型的第三、第四层、第五层交换机制检测每个服务节点的状态,如果有一台web服务器宕机,或工作出现故障,Keepalived将检测到,并将有故障的服务器从系统中剔除,同时使用其他服务器代替该服务器的工作,当服务器工作正常后Keepalived自动将服务器加入到服务器群中,这些工作全部自动完成,不需要人工干涉, 需要人工做的只是修复故障的服务器。
keepalived基于vrrp(Virtual Router Redundancy Protocol,虚拟路由冗余协议)协议,vrrp它是一种主备(主机和备用机)模式的协议,通过VRRP可以在网络发生故障时透明的进行设备切换而不影响主机之间的数据通信
两台主机之间生成一个虚拟的ip,我们称漂移ip,漂移ip由主服务器承担,一但主服务器宕机,备份服务器就会抢夺漂移ip,继续工作,有效的解决了群集中的单点故障
说白了,将多台路由器设备虚拟成一个设备,对外提供统一ip(VIP)
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-E3TbasNI-1633005745161)(E:\MarkDown\拉勾笔记\RabbitMQ KeepAlived搭建高可用的HAProxy集群)]
3.4.2 安装KeepAlived
修改hosts文件的地址映射
ip | 用途 | 主机名 |
---|---|---|
192.168.44.131 | KeepAlived HAProxy | C |
192.168.44.132 | KeepAlived HAProxy | D |
安装 keepalived
[root@C ~]# yum install -y keepalived
修改配置文件(内容大改,不如删掉,重新创建)
[root@C ~]# rm -rf /etc/keepalived/keepalived.conf [root@C ~]# vim /etc/keepalived/keepalived.conf
! Configuration File for keepalived global_defs { router_id C ## 非常重要,标识本机的hostname } vrrp_script chk_haproxy{ script "/etc/keepalived/haproxy_check.sh" ## 执行的脚本位置 interval 2 ## 检测时间间隔 weight -20 ## 如果条件成立则权重减20 } vrrp_instance VI_1 { state MASTER ## 非常重要,标识主机,备用机132改为 BACKUP interface eno16777736 ## 非常重要,网卡名(ifconfig查看) virtual_router_id 66 ## 非常重要,自定义,虚拟路由ID号(主备节点要相同) priority 100 ## 优先级(0-254),一般主机的大于备机 advert_int 1 ## 主备信息发送间隔,两个节点必须一致,默认1秒 authentication { ## 认证匹配,设置认证类型和密码,MASTER和BACKUP必须使用相同的密码才能正常通信 auth_type PASS auth_pass 1111 } track_script { chk_haproxy ## 检查haproxy健康状况的脚本 } virtual_ipaddress { ## 简称“VIP” 192.168.44.66/24 ## 非常重要,虚拟ip,可以指定多个,以后连接mq就用这个虚拟ip } } virtual_server 192.168.44.66 5672 { ## 虚拟ip的详细配置 delay_loop 6 # 健康检查间隔,单位为秒 lb_algo rr # lvs调度算法rr|wrr|lc|wlc|lblc|sh|dh lb_kind NAT # 负载均衡转发规则。一般包括DR,NAT,TUN 3种 protocol TCP # 转发协议,有TCP和UDP两种,一般用TCP real_server 192.168.44.131 5672 { ## 本机的真实ip weight 1 # 默认为1,0为失效 } }
创建执行脚本 /etc/keepalived/haproxy_check.sh
#!/bin/bash COUNT=`ps -C haproxy --no-header |wc -l` if [ $COUNT -eq 0 ];then /usr/local/haproxy/sbin/haproxy -f /etc/haproxy/haproxy.cfg sleep 2 if [ `ps -C haproxy --no-header |wc -l` -eq 0 ];then killall keepalived fi fi
Keepalived 组之间的心跳检查并不能察觉到 HAproxy 负载是否正常,所以需要使用此脚本。
在 Keepalived 主机上,开启此脚本检测 HAproxy 是否正常工作,如正常工作,记录日志。
如进程不存在,则尝试重启 HAproxy ,2秒后检测,如果还没有,则关掉主 Keepalived ,此时备 Keepalived 检测到主 Keepalived 挂掉,接管VIP,继续服务
授权,否则不能执行
[root@C etc]# chmod +x /etc/keepalived/haproxy_check.sh
启动keepalived(两台都启动)
[root@C etc]# systemctl stop firewalld [root@C etc]# service keepalived start | stop | status | restart
查看状态
[root@C etc]# ps -ef | grep haproxy [root@C etc]# ps -ef | grep keepalived
查看ip情况 ip addr 或 ip a
[root@C etc]# ip a
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-wVwK8Vhl-1633005745162)(E:\MarkDown\拉勾笔记\RabbitMQ KeepAlived搭建高可用的HAProxy集群02)]
此时,安装完毕,按照上面的步骤就可以安装第二台了(服务器hostname和ip注意要修改)
常见的网络错误:子网掩码、网关等信息要一致
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-sUxVez4G-1633005745163)(E:\MarkDown\拉勾笔记\RabbitMQ KeepAlived搭建高可用的HAProxy集群03)]
3.4.3 测试ip漂移的规则
查看虚拟ip ip addr 或 ip a
目前,C节点是主机,所以虚拟ip在C节点
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-0vAv81QQ-1633005745166)(E:\MarkDown\拉勾笔记\RabbitMQ KeepAlived搭建高可用的HAProxy集群-测试ip漂移的规则)]
停止C的keepalived,虚拟ip漂移到D节点
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-2bZDwZxk-1633005745167)(E:\MarkDown\拉勾笔记\RabbitMQ KeepAlived搭建高可用的HAProxy集群-测试ip漂移的规则02)]
重新启动C节点keepalived,虚拟ip依旧在D节点,并不会由于C的回归而回归
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-fzgcE2wd-1633005745168)(E:\MarkDown\拉勾笔记\RabbitMQ KeepAlived搭建高可用的HAProxy集群-测试ip漂移的规则03)]
停止D的keepalived,虚拟ip再漂移回C节点
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-8NOq7L4O-1633005745170)(E:\MarkDown\拉勾笔记\RabbitMQ KeepAlived搭建高可用的HAProxy集群-测试ip漂移的规则04)]
这篇关于分布式技术(下)-Redis&FastDFS&RabbitMQ的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-12-27阿里云Redis学习入门指南
- 2024-12-27阿里云Redis入门详解:轻松搭建与管理
- 2024-12-27阿里云Redis学习:新手入门指南
- 2024-12-24Redis资料:新手入门快速指南
- 2024-12-24Redis资料:新手入门教程与实践指南
- 2024-12-24Redis资料:新手入门教程与实践指南
- 2024-12-07Redis高并发入门详解
- 2024-12-07Redis缓存入门:新手必读指南
- 2024-12-07Redis缓存入门:新手必读教程
- 2024-12-07Redis入门:新手必备的简单教程