简介
是一个基于内存的
Redis
结构的
key-value
数据库
NoSQL
基于内存存储,读写性能高适合存储热点数据,例如访问量大的数据
是纯内存操作,执行速度非常快采用单线程,避免不必要的上下文切换可竞争条件,多线程还要考虑线程安全问题使用
Redis
多路复用模型,非阻塞
I/O
IO
是纯内存操作,执行速度非常快,它的性能瓶颈是网络延迟而不是执行速度,
Redis
多路复用模型主要就是实现了高效的网络请求
IO
多路复用
I/O
用户空间和内核空间:
系统中一个进程使用的内存情况划分两部分:内核空间、用户空间。
Linux
用户空间只能执行受限的命令(
),而且不能直接调用系统资源,必须通过内核提供的接口来访问内核空间可以执行特权命令(
Ring3
),调用一切系统资源
Ring0
系统为了提高
Linux
效率,会在用户空间和内核空间都加入缓冲区
IO
写数据时,要把用户缓冲数据拷贝到内核缓冲区,然后写入设备读数据时,要从设备读取数据到内核缓冲区,然后拷贝到用户缓冲区
阻塞
:两个阶段都必须阻塞等待
IO
- 阶段一: * 用户进程尝试读取数据(比如网卡数据) * 此时数据尚未到达,内核需要等待数据 * 此时用户进程也处于阻塞状态 - 阶段二: * 数据到达并拷贝到内核缓冲区,代表已就绪 * 将内核数据拷贝到用户缓冲区 * 拷贝过程中,用户进程依然阻塞等待 * 拷贝完成,用户进程解除阻塞,处理数据 - 在阻塞IO模型中,用户进程在两个阶段都是阻塞状态
12345678910
非阻塞
:**非阻塞
IO
的
IO
操作会立即返回结果而不是阻塞用户进程
recvfrom
- 阶段一: * 用户进程尝试读取数据(比如网卡数据)此时数据尚未到达,内核需要等待数据 * 返回异常给用户进程 * 用户进程拿到error后,再次尝试读取 * 循环往复,直到数据就绪 - 阶段二: * 将内核数据拷贝到用户缓冲区 * 拷贝过程中,用户进程依然阻塞等待 * 拷贝完成,用户进程解除阻塞,处理数据 - 非阻塞IO模型中,用户进程在第一个阶段是非阻塞,第二个阶段是阻塞状态。虽然是非阻塞,但性能并没有得到提高。而且忙等机制会导致CPU空转,CPU使用率暴增。
12345678910
多路复用:利用单个线程来同时监听多个
IO
,并在某个
Socket
可读、可写时得到通知,从而避免无效的等待,充分利用
Socket
资源
CPU
- 阶段一: * 用户进程调用select,指定要监听的Socket集合内核,监听对应的多个socket * 任意一个或多个socket数据就绪则返回readable * 此过程中用户进程阻塞 - 阶段二: * 用户进程找到就绪的socket * 依次调用recvfrom读取数据 * 内核将数据拷贝到用户空间 * 用户进程处理数据 - IO多路复用是利用单个线程来同时监听多个Socket,并在某个Socket可读、可写时得到通知,从而避免无效的等待充分利用CPU资源。不过监听Socket的方式、通知的方式又有多种实现,常见的有:select、poll、epoll * select和poIl只会通知用户进程有Socket就绪,但不确定具体是哪个Socket,需要用户进程逐个遍历Socket来确认 * epoll则会在通知用户进程Socket就绪的同时,把已就绪的Socket写入用户空间
123456789101112
网络模型
Redis
通过
Redis
多路复用来提高网络性能,并支持各种不同的多路复用实现,并将这些实现进行封装,提供了统一的高性能事件库
IO
数据类型
是一个
Redis
的数据库,
key-value
一般是
key
类型,不过
String
的类型多种多样
value
字符串String
字符串(
):普通字符串,
String
中最简单的数据类型,采用最基本的
Redis
结构
key-value
不用
语言字符串的原因:
C
复杂度获取长度、没有较好的扩容机制、特殊字符无法处理
O(n)
通过结构体
Redis
(简单动态字符串)记录长度和类型
SDS
可存储更复杂的二进制数据,其使用
SDS
属性的值而非空字符来判断字符串是否结束
len
获取长度时间复杂度
SDS
,减少字符串扩容引起的数据搬运次数
O(1)
的
SDS
拼接字符串不会造成缓冲区溢出。其会检查
API
的空间,不够会自动扩容
SDS
存储
若字符串对象保存整数值,且可以用
类型标识,则将整数值保存在字符串对象结构的
long
属性里面,并将字符串的编码设置为
ptr
若字符串对象保存的是一个字符串,且长度小于等于
int
字节,则会使用
32
保存,且对象编码会设置为
SDS
若字符串对象保存的是一个字符串,且长度大于
embstr
字节,则会使用
32
保存,且对象编码会设置为
SDS
raw
常用命令
设置指定
SET key value
的值
key
可以用于在分布式的情况下存储
,解决重复登录问题
Session
获取指定
GET key
的值
key
设置指定
SETEX key seconds value
的值,并将
key
的过期时间设为
key
秒
seconds
只有在
SETNX key value
不存在时设置
key
的值
key
可以用该命令实现分布式锁,若
不存在,则显示插入成功,可以用来表示加锁成功,若
key
存在,则会显示插入失败,可以用来表示加锁失败解锁要判断是否是持有锁的客户端进行操作,因此有两步操作(判断、解锁),通过
key
脚本保证原子性
Lua
批量添加多个
MSET key1 value1 key2 value2
类型的键值对
String
根据多个
MGET key1 key2 key3
获取多个
key
类型的
String
value
让一个
INCR key
对应的整型
key
自增
value
1
让一个
INCRBY key num
对应的整型
key
自增并指定步长
value
num
让一个浮点类型的数字自增并指定步长
INCRBYFLOAT key num
和
SET: 如果
GET不存在则是新增,如果存在则是修改
key
哈希(Hash)
哈希(
):也叫散列,类似于
hash
中的
Java
结构,
HashMap
保存的就是键值对
value
采用压缩列表或哈希表作为哈希表的实现
Redis
如果哈希类型的元素个数小于阈值(默认
),每个元素值小于阈值(默认
512
字节),则采用压缩列表实现否则采用哈希表
64
在
中,压缩列表数据结构已经废弃,交由
Redis 7.0数据结构来实现
listpack
常用命令
将哈希表
HSET key field value
中的字段
key
的值设为
field
value
获取存储在哈希表中指定字段的值
HGET key field
删除存储在哈希表中的指定字段
HDEL key field
获取哈希表中所有
HKEYS key
field
获取哈希表中所有
HVALS key
value
批量添加多个
HMSET key field1 value1 field2 value2
类型
hash
的
key
的值
field
批量获取多个
HMGET key field1 field2
类型
hash
的
key
的值
field
获取
HGETALL key
中的
key
的所有的
hash
和
field
value
让一个
HINCRBY key field num
的
hash
的字段值自增指定步长
filed
num
添加
HSETNX key field value
类型
hash
值,前提这个
field
不存在
field
链表(List)
链表(
):按照插入顺序排序,可以有重复元素,类似于
list
中的
Java
LinkedList
底层实现是双向链表或压缩列表
如果列表的元素个数小于阈值(默认
),每个元素值小于阈值(默认
512
字节),则采用压缩列表实现否则采用双向链表
64
在
版本后,
Redis 3.2数据类型底层数据结构只由
List实现了,替代了双向链表和压缩列表。
quicklist
常用命令
将一个或多个值插入到列表头部
LPUSH key value1 [value2]
向列表右侧插入一个或多个元素
RPUSH key value1 [value2]
获取列表指定范围内的元素
LRANGE key start stop
移除并获取列表最后一个元素
RPOP key
移除并返回列表左侧的第一个元素,没有则返回
LPOP key
null
获取列表长度
LLEN key
与
BLPOP和BRPOP
和
LPOP
类似,在没有元素时等待指定时间
RPOP
可以用于实现消息队列,其满足以下几点
满足消息保序需求重复消息:通过设置消息
,进行检查
ID
来处理重复消息可靠性:可以采用
ID
命令,让消费者程序从一个
BRPOPLPUSH
中读取消息,同时,
List
会把这个消息再插入到另一个备份
Redis
,通过读取备份
List
保证消息的可靠性
List
不支持多个消费者消费同一条消息,一旦消费者拉取一条消息后,这条消息就从
List中删除
List
集合(Set)
集合(
):无序集合,没有重复元素,类似于
set
中的
Java
Hashset
如果集合中的元素都是整数且元素个数小于 阈值(默认
),
512
会使用整数集合作为
Redis
类型的底层数据结构否则使用哈希表作为
Set
类型的底层数据结构
Set
常用命令
向集合添加一个或多个成员
SADD key member1[member2]
可以用于点赞等唯一操作
返回集合中的所有成员
SMEMBERS key
获取集合的成员数
SCARD key
返回给定所有集合的交集
SINTER key1 [key2]
可以用于获取沟通关注等
返回所有给定集合的并集
SUNION key1 [key2]
求
SDIFF key1 [key2]
与
key1
的差集
key2
删除集合中一个或多个成员
SREM key member1 [member2]
判断一个元素是否存在于
SISMEMBER key member
中
set
有序集合(Zset)
有序集合(
):集合中每个元素关联一个分数(
sorted set/zset
),根据分数升序排序,并且没有重复元素
score
类型的底层数据结构是由压缩列表或跳表实现
set
如果有序集合的元素个数小于
个,并且每个元素的值小于
128
字节时,
64
会使用压缩列表作为
Redis
类型的底层数据结构否则采用跳表实现
Zset
在 Redis 7.0 中,压缩列表数据结构已经废弃了,交由
数据结构来实现
listpack
常用命令
向有序集合添加一个或多个成员
ZADD key score1 member1 [score2 member2]
移除有序集合中的一个或多个成员
ZREM key member [member ...]
通过索引区间返回有序集合中指定区间的成员
ZRANGE key start stop [WITHSCORES]
有序集合中对指定成员分数加上
ZINCRBY key increment member
increment
获取
ZSCORE key member
中的指定元素的
sorted set
值
score
获取
ZRANK key member
中的指定元素的排名
sorted set
获取
ZCARD key
中的元素个数
sorted set
统计
ZCOUNT key min max
值在给定范围内的所有元素的个数
score
求差集.交集.并集
ZDIFF.ZINTER.ZUNION
所有的排名默认都是升序,如果要降序则在命令的Z后面添加REV即可
位图(bitMap)
位图是利用数据中每一个bit位作为一种状态映射,可以利用极小的空间来实现大量数据的表示。
例如,可以用位图来记录用户一个月的签到情况。
中利用
Redis
类型结构实现
String
,其最大上限
BitMap
,转换为
512M
是
bit
个
2^32
位。
bit
类型是会保存为二进制的字节数组,所以,
String
就把字节数组的每个 bit 位利用起来,用来表示一个元素的二值状态,可以把
Redis
看作是一个
Bitmap
数组
bit
常用命令
向指定位置(
SETBIT key offset value
)存入一个
offset
或
0
1
获取指定位置(
GETBIT key offset
)的
offset
值
bit
统计
BITCOUNT key start end
中指定范围值为
BitMap
的
1
位的数量
bit
将多个
BITOP
的结果做位运算(与 、或、异或)
BitMap
查找
BITPOS
数组指定范围内第一个
bit
或
0
出现的位置
1
:操作(查询、修改、自增)
BITFIELD
中
BitMap
数组中的指定位置(
bit
)的值
offset
:获取
BITFIELD_RO
中
BitMap
数组,并以十进制形式返回
bit
GEO地理数据结构
本身没有设计新的底层数据结构,而直接使用
GEO
集合类型。
Sorted Set
类型使用
GEO
编码方法实现了经纬度到
GeoHash
中元素权重分数的转换,这其中的两个关键机制就是对二维地图做区间划分和对区间进行编码。一组经纬度落在某个区间后,就用区间的编码值来表示,并把编码值作为
Sorted Set
元素的权重分数。
Sorted Set
常用命令
:添加一个地理空间信息,包含:经度
GEOADD
、纬度
longitude
、值
latitude
member
:计算指定的两个点之间的距离并返回
GEODIST
:将指定
GEOHASH
的坐标转为
member
字符串形式并返回
hash
:返回指定
GEOPOS
的坐标
member
:指定圆心、半径,找到该圆内包含的所有
GEORADIUS
,并按照与圆心之间的距离排序后返回。
member
以后已废弃
6.
:在指定范围内搜索
GEOSEARCH
,并按照与指定点之间的距离排序后返回。范围可以是圆形或矩形。
member
新功能
6.2
:与
GEOSEARCHSTORE
功能一致,不过可以把结果存储到一个指定的
GEOSEARCH
。
key
Stream
专门为消息队列设计的数据类型,其支持消息的持久化、支持自动生成全局唯一
Redis
、支持
ID
确认消息的模式、支持消费组模式等,让消息队列更加的稳定和可靠
ack
常用命令
:插入消息,保证有序,可以自动生成全局唯一
XADD
;
ID
:查询消息长度;
XLEN
:用于读取消息,可以按
XREAD
读取数据;
ID
: 根据消息
XDEL
删除消息;
ID
:删除整个
DEL
;
Stream
:读取区间消息
XRANGE
:按消费组形式读取消息
XREADGROUP
和
XPENDING
XACK
命令可以用来查询每个消费组内所有消费者「已读取、但尚未确认」的消息;
XPENDING
命令用于向消息队列确认消息处理已完成
XACK
层级储存
没有类似
Redis
中的
MySQL
的概念,
Table
的
Redis
允许有多个单词形成层级结构,多个单词之间用
key
隔开,格式如: 项目名:业务名:类型:id
:
一旦我们向
采用这样的方式存储,那么在可视化界面中,
Redis
会以层级结构来进行存储,形成类似于这样的结构,更加方便
Redis
获取数据
Redis
Redis缓存
缓存模型与思想
标准的操作方式就是查询数据库之前先查询缓存,如果缓存数据存在,则直接从缓存中返回,如果缓存数据不存在,再查询数据库,然后将数据存入
。
Redis
缓存淘汰
当
中的内存不够用时,此时再向
Redis
中添加新的
Redis
,那么
key
就会按照某一种规则将内存中的数据删除掉,这种数据的删除规则被称之为内存的淘汰策略
Redis
:不淘汰任何
noeviction
,但是内存满时不允许写入新数据,默认是该策略
key
:对设置了
volatile-ttl
的
TTL
,比较
key
的剩余
key
,
TTL
越小越先被淘汰
TTL
:对全体
allkeys-random
,随机进行淘汰
key
:对设置了
volatile-random
的
TTL
,随机进行淘汰
key
:对全体
allkeys-lru
,基于
key
算法进行淘汰
LRU
:对设置了
volatile-lru
的
TTL
,基于
key
算法进行淘汰
LRU
:对全体
allkeys-lfu
,基于
key
算法进行淘汰
LFU
:对设置了
volatile-lfu
的
TTL
,基于
key
算法进行淘汰
LFU
优先使用
策略。充分利用
alkeys-lru算法的优势,把最近最常访问的数据留在缓存中。如果业务有明显的冷热数据区分,建议使用如果业务中数据访问频率差别不大,没有明显冷热数据区分,建议使用
LRU,随机选择淘汰如果业务中有置顶的需求,可以使用
alkeys-random策略,同时置顶数据不设置过期时间,这些数据就一直不被删除,会淘汰其他设置过期时间的数据。如果业务中有短时高频访问的数据,可以使用
volatile-lru或
allkeys-lfu策略
volatile-lfu
先进先出
:最新的数据加入时,最老的数据淘汰最近最少使用
FIFO
:双向链表+哈希表的形式存储,双向链表用于组织数据,数据采用链表节点进行组织,最经常访问的排在前面,哈希表用于存储键
LRU
和值链表节点。当某个
key
被访问,程序通过哈希表迅速定位节点,并将该节点调整至链表的最开始位置最不经常使用
key
:元素会绑定被访问次数,形成一个链表,优先淘汰访问次数少的元素
LFU
缓存更新策略(双写一致性)
由于缓存的数据源来自于数据库,而数据库的数据是会发生变化的,因此,如果当数据库中数据发生变化,而缓存却没有同步,此时就会有一致性问题存在
双写一致性:当修改了数据库的数据也要同时更新缓存的数据,缓存和数据库的数据要保持一致
删除缓存还是更新缓存?
更新数据库时让缓存失效,查询时再更新缓存
先操作缓存还是先操作数据库?
先操作缓存,再操作数据库,会有脏数据的情况,缓存和数据库不一致
- 先操作数据库,再删除缓存,也会有脏数据的情况,缓存和数据库不一致
1
- 因此采用延时双删策略,待数据库修改完成后,延时一段时间,再删除一次缓存
1
如何保证缓存与数据库的双写一致
采用分布式锁的方式,每一个线程获取锁来操作完整个缓存的更新,但性能低下。可以通过读写锁来提高性能,只适用于强一致的业务采用异步通知保证数据的最终一致性,但需要保证
的可靠性
MQ
缓存过期删除
惰性删除:设置该
过期时间后,当需要该
key
时,检查其是否过期,如果过期,就删除,否则返回该
key
key
优点:对
友好,只会在使用该
CPU时才会进行过期检查,对于很多用不到的
key不用浪费时间进行检查
key缺点:对内存不友好,如果一个
已经过期,但是一直没有使用,那么该
key就会一直存在内存中
key
定期删除:每隔一段时间,对一些
进行检查,删除其中过期的
key
(从一定数量的数据库中取出一定数量的随机
key
进行检查,并删除其中的过期
key
)
key
模式是定时任务,执行频率默认为
SLOW
,每次不超过
10hz
,以通过修改配置文件
25ms
的hz 选项来调整这个次数
redis.conf
模式执行频率不固定,但两次间隔不低于
FAST
,每次耗时不超过
2ms
1ms
优点:可以通过限制删除操作执行的时长和频率来减少删除操作对 CPU 的影响。另外定期删除,也能有效释放过期键占用的内存。
缺点:难以确定删除操作执行的时长和频率。
缓存穿透
缓存穿透是指客户端请求的数据在缓存中和数据库中都不存在,这样缓存永远不会生效,这些请求都会打到数据库。因此恶意请求可以通过请求不存在的数据,一直请求到数据库进行攻击。
缓存空对象
即使数据库返回空对象,仍将空对象缓存至
中,下次用户访问这个不存在的数据,那么在
Redis
中也能找到这个数据就不会请求数据库
Redis
- <font>优点:实现简单,维护方便</font>
- <font>缺点:</font>
* <font>额外的内存消耗</font>
* <font>可能造成短期的不一致</font>
1234
布隆过滤
布隆过滤器其实采用的是哈希思想来解决这个问题,通过一个庞大的二进制数组,走哈希思想去判断当前这个要查询的这个数据是否存在,若布隆过滤器判断存在,则放行,这个请求会去访问
,哪怕此时
Redis
中的数据过期了,但是数据库中一定存在这个数据,在数据库中查询出来这个数据后,再将其放入到
Redis
中。假设布隆过滤器判断这个数据不存在,则直接返回
Redis
(位图):相于一个以
bitmap
位为单位的数组,数组中每个单元只能存储二级制数
bit
或
0
。数据在存储时通过多个哈希计算得到在位图上的位置并置
1
。因此可以查询数据是否存在。预热时就是将位图进行填充。
1
数组越小误判率就越大,数组越大误判率就越小,但同时带来的更多的内存消耗
缓存击穿
缓存击穿问题也叫热点
问题,就是一个被高并发访问并且缓存重建业务较复杂的
Key
突然失效了,无数的请求访问会在瞬间给数据库带来巨大的冲击。
key
假设线程
在查询缓存之后,本来应该去查询数据库,然后把这个数据重新加载到缓存的,此时只要线程
1
走完这个逻辑,其他线程就都能从缓存中加载这些数据了,但是假设在线程
1
没有走完的时候,后续的线程
1
,线程
2
,线程
3
同时过来访问当前这个方法, 那么这些线程都不能从缓存中查询到数据,那么他们就会同一时刻来访问查询缓存,都没查到,接着同一时间去访问数据库,同时的去执行数据库代码,对数据库访问压力过大
4
互斥锁
因为锁能实现互斥性。假设线程过来,只能单一来访问数据库,从而避免对于数据库访问压力过大,但这也会影响查询的性能,因为此时会让查询性能从并行变成了串行,我们可以采用
方法 +
tryLock
来解决这样的问题。
double check
强一致性,但性能差
代码
利用
的
Redis
方法来表示锁。该方法含义是
setnx
中若没有这个
Redis
,则插入成功,返回
key
,在
1
中返回
stringRedisTemplate
;若有这个
true
则插入失败返回
key
,在
0
返回
stringRedisTemplate
。通过
false
和
true
来判断是否有线程在获取和释放锁。
false
private boolean tryLock(String key) {
Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 10, TimeUnit.SECONDS);
return BooleanUtil.isTrue(flag);
}
private void unlock(String key) {
stringRedisTemplate.delete(key);
}
java
运行12345678
// 获取锁 String lockKey = "lock:shop:" + id; Shop shop = null; try { boolean isLock = tryLock(lockKey); // 判断否获取成功 if(!isLock){ //4.3 失败,则休眠重试 Thread.sleep(50); return queryWithMutex(id); } // 成功,根据id查询数据库 shop = getById(id); // 不存在,返回错误 if(shop == null){ // 将空值写入redis stringRedisTemplate.opsForValue().set(key,"",CACHE_NULL_TTL,TimeUnit.MINUTES); // 返回错误信息 return null; } // 写入redis stringRedisTemplate.opsForValue().set(key,JSONUtil.toJsonStr(shop),CACHE_NULL_TTL,TimeUnit.MINUTES); }catch (Exception e){ throw new RuntimeException(e); } finally { //7.释放互斥锁 unlock(lockKey); }
java 运行123456789101112131415161718192021222324252627282930
逻辑过期
之所以会出现缓存击穿,原因是对
设置了过期时间,若不设置过期时间,就不会有缓存击穿问题。通过采用逻辑过期方案,将过期时间设置在
key
的
redis
中,在查询数据时通过
value
来判断当前数据是否过期,若过期,则获取互斥锁,并开启一个新线程去重构数据。
value
巧妙在于,异步的构建缓存,缺点在于在构建完缓存之前,返回的都是脏数据。
高可用,性能优,但会出现脏数据
缓存雪崩
缓存雪崩指在同一时段大量的缓存
同时失效或者
key
服务器宕机,导致大量请求到达数据库,带来巨大的压力。
Redis
给不同的
的
key
添加随机值利用
TTL
集群提高服务的可用性给缓存业务添加降级限流策略给业务添加多级缓存
Redis
Redis持久化
AOF持久化
AOF日志
全称为
AOF
(追加文件)。
Append Only File
处理的每一个写命令都会记录在
Redis
文件,可以看作是命令日志文件(只记录写操作命令,读操作命令不记录)。
AOF
先执行写操作命令,再将命令记录到
Redis
日志中
AOF
可以避免额外的检查开销,不用检查语法,因为命令执行成功后才会写入到
日志中不阻塞当前写操作命令,当写操作命令执行成功后,才会将命令记录到
AOF
日志但是因为两步操作,会有丢失数据的风险虽然不阻塞当前命令,但是有可能会阻塞下一个命令
AOF
写回策略

执行完写操作命令后,会将命令追加到
Redis
缓冲区然后通过
server.aof_buf
系统调用,将
write()
缓冲区的数据写入到
aof_buf
文件,此时数据并没有写入到硬盘,而是拷贝到了内核缓冲区
AOF
,等待内核将数据写入硬盘而内核缓冲区何时写入硬盘,由内核决定(
page cache
中
redis.conf
配置项可以有三种策略)
appendfsync
:每次写操作命令执行完后,同步将
Always
日志数据写回硬盘
AOF
:每次写操作命令执行完后,先将命令写入到
Everysec
文件的内核缓冲区,然后每隔一秒将缓冲区里的内容写回到硬盘
AOF
:写操作命令执行完后,先将命令写入到
No
文件的内核缓冲区,再由操作系统决定何时将缓冲区内容写回硬盘
AOF
重写机制
日志文件随着执行的写操作命令越来越多,文件大小也会越来越大,而过大的
AOF
日志文件就会带来性能问题,因为重启
AOF
后,需要读
Redis
文件的内容以恢复数据,过大
AOF
日志文件会导致恢复过程很慢。因此
AOF
重写机制保证当
AOF
文件超过设定阈值,就会压缩
AOF
文件
AOF
重写机制重写时,读取当前数据库中的所有键值对,然后将每个键值对用一条命令记录到新的
AOF
文件,等到全部记录完后,就将新的
AOF
文件替换掉现有的
AOF
文件,此时重复写入的命令就会合并。尽管某个键值对被多条写命令反复修改,最终也只需要根据这个键值对当前的最新状态,然后用一条命令去记录键值对,代替之前记录这个键值对的多条命令。
AOF
后台重写
的重写
Redis
过程是由后台子进程
AOF
来完成
bgrewriteaof
子进程进行
重写期间,主进程可以继续处理命令请求,从而避免阻塞主进程子进程带有主进程的数据副本,不需要像多线程那样共享内存加锁。创建子进程时,父子进程是共享内存数据的,不过这个共享的内存只能以只读的方式,而当父子进程任意一方修改了该共享内存,就会发生写时复制,于是父子进程就有了独立的数据副本,不用加锁来保证数据安全
AOF
在子进程重写
的日志过程中,若主进程修改已经存在的
AOF
,此时
key-value
数据在子进程的内存数据就跟主进程的内存数据不一致了。
key-value
设置一个
Redis
重写缓冲区,该缓冲区在创建
AOF
子进程之后开始使用。在重写
bgrewriteaof
期间,当
AOF
执行完一个写命令后,会同时将这个写命令写入到
Redis
缓冲区和
AOF
重写缓冲区
AOF
子进程完成重写工作后,会向主进程发送一条信号,主进程收到该信号后,会调用一个信号处理函数
将
重写缓冲区中的所有内容追加到新的
AOF
的文件中新的
AOF
的文件进行改名,覆盖现有的
AOF
文件
AOF
RDB持久化
全称为
RDB
(
Redis Database Backup file
数据备份文件),也被叫做
Redis
数据快照。当
Redis
实例故障重启后,从磁盘读取快照文件,恢复数据。快照文件称为
Redis
文件,默认是保存在当前运行目录。
RDB
快照就是记录某一个瞬间的内存数据,记录的是实际数据
RDB
持久化在四种情况下会执行:
RDB
执行
命令:
save
命令会导致主进程执行
save
,这个过程中其它所有命令都会被阻塞。只有在数据迁移时可能用到执行
RDB
命令:这个命令执行后会开启独立进程完成
bgsave
,主进程可以持续处理用户请求,不受影响
RDB
开始时会
bgsave
主进程得到子进程,子进程共享主进程的内存数据。完成
fork
后读取内存数据并写入
fork
文件中
RDB
停机时:
Redis
停机时会执行一次
Redis
命令,实现
save
持久化触发
RDB
条件时:
RDB
内部有触发
Redis
的机制,可以在
RDB
文件中找到
redis.conf
写时复制(Copy-On-Write,COW)
在执行
过程中,由于是交给子进程来构建
bgsave
文件,主线程还是可以继续工作的,通过写时复制技术,可以让主线程也能进行修改数据。执行
RDB
命令的时候,会通过
bgsave
创建子进程,此时子进程和父进程是共享同一片内存数据的,因为创建子进程的时候,会复制父进程的页表,但是页表指向的物理内存还是一个,和
fork()
重写机制相似
AOF
当主线程(父进程)要修改共享数据里的某一块数据(比如键值对
)时,就会发生写时复制,于是这块数据的物理内存会被复制一份(键值对
A
),然后主线程在这个数据副本(键值对
A
)进行修改操作而
A
子进程可以继续把原来的数据(键值对
bgsave
)写入到
A
文件。因此发生写时复制后,
RDB
快照保存的是先前的内容
RDB
RBD和AOF合体使用
通过混合使用
Redis4.0
日志和
AOF
内存快照(混合持久化),使得既有恢复速度快,又有
RBD
丢失数据少的优点
AOF
在混合持久化下,在
重写日志时,
AOF
出来的重写子进程会先将与主线程共享的内存数据以
fork
方式写入到
RDB
文件主线程处理的操作命令会被记录在重写缓冲区里,重写缓冲区里的增量命令会以
AOF
方式写入到
AOF
文件写入完成后通知主进程将新的含有
AOF
格式和
RDB
格式的
AOF
文件替换旧的的
AOF
文件。
AOF
文件的前半部分是
AOF
格式的全量数据,后半部分是
RDB
格式的增量数据,是
AOF
后台子进程重写
Redis
期间主线程处理的操作命令
AOF
Redis分布式
主从复制
主从复制(高并发):单点
的并发能力是有上限的,要进一步提高
Redis
的并发能力,就需要搭建主从集群,实现读写分离。并且单台服务器下,若出现故障,则数据有可能全部丢失。
Redis
所有的数据修改只在主服务器上进行,然后将最新的数据同步给从服务器,以使得数据一致
第一次同步-主从全量同步
:简称
Replication ld
,是数据集的标记,
replid
一致则说明是同一数据集。其中每一个
id
都有唯一的
master
,
replid
则会继承
slave
节点的
master
replid
:偏移量,随着记录在
offset
中的数据增多而逐渐增大。
repl_baklog
完成同步时会记录当前同步的
slave
。若
offset
的
slave
小于
offset
的
master
,说明
offset
数据落后于
slave
,需要更新
master
1. 从节点执行replicaof命令,发生psync命令请求主节点同步数据(replication id、offset) 2. 主节点判断是否是第一次请求,第一次就与从节点同步版本信息(replication id和offset) 3. 主节点执行bgsave(不阻塞主线程),生成RDB文件后,发送从节点执行,从节点收到RDB文件后,清空当前数据,载入RDB文件 + 但是在生成RDB文件时新增的数据不会同步,因此主服务器在下面这三个时间间隙中将收到的写操作命令,写入到replication buffer缓冲区里 - 主服务器生成RDB文件期间 - 主服务器发送RDB文件给从服务器期间 - 从服务器加载RDB文件期间 4. 把生成之后的命令日志文件发送给从节点进行同步 5. 最后,主服务器将replication buffer缓冲区里所记录的写操作命令发送给从服务器,从服务器执行后,主从服务器数据同步成功
123456789
命令传播
主从服务器完成第一次同步后,双方就会维护一个
连接,该连接是长连接,避免频繁的
TCP
连接和断开带来的性能开销
TCP
主从增量同步(
重启或网络断开重连,命令传播失效阶段)
slave
1. 从节点请求主节点同步数据,主节点判断不是第一次请求,不是就获取从节点的offset值
2. 主节点从命令日志中获取</font>`<font>offset</font>`<font>值之后的数据,发送给从节点进行数据同步</font>
1. repl_backlog_buffer,是一个环形缓冲区,用于主从服务器断连后,从中找到差异的数据(如果 offset 已经“掉出窗口”(被环形缓冲区覆盖了),主库无法做部分重同步,此时就会做全量同步)
2. replication offset,标记上面那个缓冲区的同步进度,主从服务器都有各自的偏移量,主服务器使用master_repl_offset记录写到的位置,从服务器使用slave_repl_offset记录读到的位置。
1234
在主服务器进行命令传播时,不仅将写命令发送给从服务器,还将写命令写入
缓冲区里,因此这个缓冲区里会保存着最近传播的写命令
repl_backlog_buffer
哨兵
在主从架构中,由于主从模式是读写分离,若主节点(
)故障宕机,则将没有主节点来服务客户端的写操作请求,也没有主节点给从节点(
master
)进行数据同步
slave
提供了哨兵(
Redis
)机制来实现主从集群自动故障恢复,结构和作用如下
Sentinel
监控:
会不断检查
Sentinel
和
master
是否按预期工作,
slave
基于心跳机制监测服务状态,每隔
Sentinel
秒向集群的每个实例发送
1
命令
ping
主观下线:如果某
节点发现某实例未在规定时间响应,则认为该实例主观下线。客观下线:若超过指定数量(
sentinel
)的
quorum
都认为该实例主观下线,则该实例客观下线。
sentinel
值最好超过
quorum
实例数量的一半。
Sentinel
自动故障恢复:如果
故障,
master
会将一个
Sentinel
提升为
slave
。当故障实例恢复后也以新的
master
为主。
master
首先判断主与从节点断开时间长短,如超过指定值就排除该从节点(此节点不会作为主节点)然后判断从节点的
值,越小优先级越高若
slave-priority
一样,则判断
slave-prority
节点的
slave
值,越大优先级越高(和主节点数据越一致)最后是判断
offset
节点的运行
slave
大小,越小优先级越高
id
脑裂问题:主节点与从节点以及哨兵在不同的网络分区下,哨兵与主节点的网络连接断开,此时,哨兵就会在从节点中选取一个主节点。但此时客户端与之前的主节点还连接着,往内写入数据,却不会同步到从结点中。之后网络恢复,此时之前主节点和断开的从节点连接在一起,但此时有两个主节点,而之前的主节点会被降级回从节点,之前写入的数据会被刷掉,此时数据丢失
解决方案:
中有两个配置参数:,
redis
表示最少的
min-replicas-to-write 1
节点为
salve
个;
1
表示数据复制和同步的延迟不能超过
min-replicas-max-lag 5
秒。不满足上述条件,则不允许客户端数据写入。
5
通知:
充当
Sentinel
客户端的服务发现来源,当集群发生故障转移时,会将最新信息推送给
Redis
的客户端
Redis
集群
集群中有多个
,每个
master
保存不同的数据,每个
master
都可以有多个
master
节点,
slave
之间通过
master
监测彼此健康状态。客户端请求可以请求访问请求中的任意节点,最终都会被转发到正确节点
ping
数据读写:
分片集群引入了哈希槽的概念,
Redis
集群中有
Redis
个哈希槽,每个
16384
通过
key
校验后对
CRC16
取模来决定放置在哪个槽,集群的每个节点负责一部分
16384
槽。并且可以对
Hash
的一部分作为
key
计算的输入
hash
Redis 允许对 key 的一部分进行哈希计算(称为 hash tag)。
写法是把 key 的一部分放在大括号
中,比如:
{}
- user:{1001}:name
- user:{1001}:age
12
这样 CRC16 时,只会对
计算槽号。结果就是这些 key 一定落到同一个槽(同一个节点),方便在同节点上做事务或 Lua 脚本。
{1001}
redis使用
redis安装与启动
安装
安装后重点文件说明:
:
/usr/local/redis-4.0.0/src/redis-server
服务启动脚本
Redis
:
/usr/local/redis-4.0.0/src/redis-cli
客户端脚本
Redis
:
/usr/local/redis-4.0.0/redis.conf
配置文件
Redis
启动
**服务端启动命令**
redis-server.exe redis.windows.conf
shell
1
服务默认端口号为
Redis
,通过快捷键
6379
即可停止
Ctrl + C
服务,当
Redis
服务启动成功后,可通过客户端进行连接。
Redis
客户端连接命令
redis-cli.exe
shell
1
通过
命令默认连接的是本地的
redis-cli.exe
服务,并使用默认6379端口。也可以通过指定如下参数连接:
redis
-h ip地址
-p 端口号
(如果需要)
-a 密码
修改
配置文件
Redis
设置
服务密码,修改
Redis
redis.windows.conf
requirepass 123456
shell
1
修改密码后需要重启
服务才能生效
Redis
Jedis
导入依赖
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<version>5.10.2</version>
<scope>test</scope>
</dependency>
xml
123456
线程池建立连接
public class JedisConnectionFactory{ private static final JedisPool jedisPool; static { JedisPoolConfig jedisPoolconfig = new JedisPoolconfig(); // 最大连接 jedisPoolConfig.setMaxTotal(8); // 最大空闲连接 jedisPoolConfig.setMaxIdle(8); // 最小空闲连接 jedisPoolconfig.setMinIdle(0); // 设置最长等待时间,ms jedisPoolConfig.setMaxWaitMillis(200); jedisPool = new JedisPool(jedisPoolconfig,"192.168.150.101",6379,1000,“123321); } // 获取Jedis对象 public static Jedis getJedis() { return jedisPool.getResource(); } }
java 运行1234567891011121314151617181920
使用Jedis
String result = jedis.set("hello", "world");
String name = jedis.get("hello");
java
运行12
`SpringDataRedis`
配置
`maven`坐标
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
xml
1234
配置`Redis`数据源
spring: redis: host: 192.168.150.101 port: 6379 password: 123321 lettuce: pool: max-active: 8 #最大连接 max-idle: 8 #最大空闲连接 min-idle: 0 #最小空闲连接 max-wait: 100ms #连接等待时间
yaml1234567891011
编写配置类
@Configuration @slf4j public class RedisConfiguration{ @Bean public RedisTemplate redisTemplate(RedisconnectionFactory redisConnectionfactory) { // 创建RedisTemplate对象 RedisTemplate<String, Object> template = new RedisTemplate<>(); // 设置连接工厂 template.setConnectionFactory(connectionFactory); // 创建JSON序列化工具 GenericJackson2JsonRedisSerializer jsonRedisSerializer = new GenericJackson2JsonRedisSerializer(); // 设置Key的序列化 template.setKeySerializer(RedisSerializer.string()); template.setHashKeySerializer(RedisSerializer.string()); // 设置Value的序列化 template.setValueSerializer(jsonRedisSerializer); template.setHashValueSerializer(jsonRedisSerializer); // 返回 return template; } }
java 运行12345678910111213141516171819202122
`Java`中操作`Redis`
`String`类型
public void testString(){ //SET key value 设置指定key的值 redisTemplate.opsForValue().set("city","北京"); //GET key 获取指定key的值 String city =(String) redisTemplate.opsForValue().get("city"); //SETEX key seconds value 设置指定key的值,并将key的过期时间设为seconds秒或分钟 redisTemplate.opsForValue().set("code", "1234", 3, TimeUnit.MINUTES); //SETNX key value 只有在key不存在时设置key的值 redisTemplate.opsForValue().setIfAbsent("city","北京"); }
java 运行12345678910111213
`Hash`类型
public void testHash(){ HashOperations hashOperations = redisTemplate.opsForHash(); //HSET key field value 将哈希表 key 中的字段 field 的值设为 value hashOperations.put("100", "name","tom"); hashOperations.put("100", "age","20"); //HGET key field 获取存储在哈希表中指定字段的值 String name = (String) hashOperations.get("100", "name"); //HKEYS key 获取哈希表中所有字段 Set keys = hashOperations.keys("100"); //HVALS key 获取哈希表中所有值 List values = hashOperations.values("100"); //HDEL key field 删除存储在哈希表中的指定字段 hashOperations.delete("100","age"); }
java 运行1234567891011121314151617181920
`List`类型
public void testList(){ //lpush lrane rpop llen ListOperations listOperations =redisTemplate.opsForList(); //LPUSH key value1 [value2] 将一个或多个值插入到列表头部 listOperations.leftPushAll("mylist","a","b","c"); listOperations.leftPush("mylist","d"); //LRANGE key start stop 获取列表指定范围内的元素 List mylist= listOperations.range("mylist",0,-1); //RPOP key 移除并获取列表最后一个元素 listOperations.rightPop("mylist"); //LLEN key 获取列表长度 Long size = listOperations.size("mylist"); }
java 运行123456789101112131415161718
`Set`类型
public void testset(){ SetOperations setOperations = redisTemplate.opsForset(); //SADD key member1[member2] 向集合添加一个或多个成员 setOperations.add("set1","a","b","c","d"); setOperations.add("set2","a","b","x","y"); //SMEMBERS key 返回集合中的所有成员 Set members =setOperations.members("set1"); //SCARD key 获取集合的成员数 Long size = setOperations.size("set1"); //SINTER key1 [key2]. 返回给定所有集合的交集 Set intersect = setOperations.intersect("set1","set2"); //SUNION key1 [key2]。 返回所有给定集合的并集公 Set union = setOperations.union("set1","set2"); //SREM key member1 [member2] 删除集合中一个或多个成员 setOperations.remove("set1","a","b"); }
java 运行1234567891011121314151617181920212223
有序集类型
public void testzset(){ ZSetOperations zSetOperations =redisTemplate.opsForZset(); //ZADD key score1 member1 [score2 member2] 向有序集合添加一个或多个成员 zSetOperations.add("zset1","a",10); zSetOperations.add("zset1","b",12); zSetOperations.add("zset1","c",9); //ZRANGE key start stop [WITHSCORES] 通过索引区间返回有序集合中指定区间内的成员 Set zset1 = zSetOperations.range("zset1",0,-1); //ZINCRBY key increment member 有序集合中对指定成员分数加上增加increment zSetOperations.incrementscore("zset1","c",10); //ZREM key member [member ...] 移除有序集合中的一个或多个成员 zSetOperations.remove("zset1","a","b"); }
java 运行123456789101112131415161718
通用命令
public void testCommon(){ //KEYS pattern 查找所有符合给定模式( pattern)的 key Set keys = redisTemplate.keys("*");System.out.println(keys); //EXISTS key 检查给定 key 是否存在 Boolean name =redisTemplate.hasKey("name"); Boolean set1=redisTemplate.hasKey("set1"); //TYPE key 返回 key 所储存的值的类型 for(object key : keys){ forDataType type = redisTemplate.type(key); } //DEL key 该命令用于在 key 存在时删除 key redisTemplate.delete( key:"mylist"); }
java 运行1234567891011121314151617
通用功能
全局`ID`生成
一种在分布式系统下用来生成全局唯一
的工具,一般要满足下列特性
ID
通过
的自增功能,实现全局唯一
Redis
的生成
ID
为了增加
的安全性,我们可以不直接使用
ID
自增的数值,而是拼接一些其它信息
Redis
符号位:
,永远为
1bit
,表示正数时间戳:
0
,以秒为单位,可以使用
31bit
年序列号:
69
,秒内的计数器,支持每秒产生
32bit
个不同
2^32
ID
@Component public class RedisIdWorker { /** * 开始时间戳 */ private static final long BEGIN_TIMESTAMP = 1640995200L; /** * 序列号的位数 */ private static final int COUNT_BITS = 32; private StringRedisTemplate stringRedisTemplate; public RedisIdWorker(StringRedisTemplate stringRedisTemplate) { this.stringRedisTemplate = stringRedisTemplate; } public long nextId(String keyPrefix) { // 1.生成时间戳 LocalDateTime now = LocalDateTime.now(); long nowSecond = now.toEpochSecond(ZoneOffset.UTC); long timestamp = nowSecond - BEGIN_TIMESTAMP; // 2.生成序列号 // 2.1.获取当前日期,精确到天 String date = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd")); // 2.2.自增长 long count = stringRedisTemplate.opsForValue().increment("icr:" + keyPrefix + ":" + date); // 3.拼接并返回 return timestamp << COUNT_BITS | count; } }
java 运行123456789101112131415161718192021222324252627282930313233
线程锁
多线程安全问题,可以用两种锁来解决
悲观锁:
悲观锁可以实现对于数据的串行化执行,比如
,和
syn
都是悲观锁的代表,同时,悲观锁中又可以再细分为公平锁,非公平锁,可重入锁,等等
lock
乐观锁:
有一个版本号,每次操作数据会对版本号
,提交回数据时,会去校验是否比之前版本大
+1
,如果大
1
,则操作成功,否则说明有其他线程进行了修改,因此线程不安全。
1
也可以直接用库存量作为版本号
boolean success = seckillVoucherService.update()
.setSql("stock = stock -1") //set stock = stock -1
.eq("voucher_id", voucherId).eq("stock",voucher.getStock()).update();
//where id = ? and stock = ?
java
运行1234
分布式锁
Redis获取锁
互斥:确保只能有一个线程获取锁,利用
的互斥特性非阻塞:尝试一次,成功返回
SETNX
,失败返回
true
false
SET lock thread
expire lock 10 // 设置超时时间 避免未释放锁就宕机情况
//合为一条 NX是互斥 EX是设置超时时间
set lock thread NX EX 10
// 释放锁
DEL key
shell
12345678
代码实现
public boolean tryLock(long timeoutSec) {
// 获取线程标示
String threadId = ID_PREFIX + Thread.currentThread().getId();
// 获取锁
Boolean success = stringRedisTemplate.opsForValue()
.setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS);
return Boolean.TRUE.equals(success);
}
java
运行12345678
Redis释放锁
手动释放超时释放:获取锁时添加一个时间
DEL lock
shell
1
但是在释放锁的情况下,有可能出现误删情况
持有锁的线程在锁的内部出现了阻塞,导致他的锁自动释放,这时其他线程,线程
来尝试获得锁,就拿到了这把锁,然后线程
2
在持有锁执行过程中,线程
2
反应过来,继续执行,而线程
1
执行过程中,走到了删除锁逻辑,此时就会把本应该属于线程
1
的锁进行删除,这就是误删别人锁的情况说明
2
解决方法:在获取锁时存入线程标示(可以用UUID表示) 在释放锁时先获取锁中的线程标示,判断是否与当前线程标示一致
public void unlock() { // 获取线程标示 String threadId = ID_PREFIX + Thread.currentThread().getId(); // 获取锁中的标示 String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name); // 判断标示是否一致 if(threadId.equals(id)) { // 释放锁 stringRedisTemplate.delete(KEY_PREFIX + name); } }
java 运行1234567891011
但是由于线程的拿锁,比锁和删锁操作并不是原子性操作,仍然会有失败可能。
提供了
Redis
脚本功能,在一个脚本中编写多条
Lua
命令,确保多条命令执行时的原子性。
Redis
redis.call('命令名称', 'key', '其它参数', ...) # 执行 set name jack redis.call('set', 'name', 'jack') # 先执行 set name jack redis.call('set', 'name', 'Rose') # 再执行 get name local name = redis.call('get', 'name') # 返回 return name
lua 运行1234567891011
-- 这里的 KEYS[1] 就是锁的key,这里的ARGV[1] 就是当前线程标示
-- 获取锁中的标示,判断是否与当前线程标示一致
if (redis.call('GET', KEYS[1]) == ARGV[1]) then
-- 一致,则删除锁
return redis.call('DEL', KEYS[1])
end
-- 不一致,则直接返回
return 0
lua
运行12345678
private static final DefaultRedisScript<Long> UNLOCK_SCRIPT; static { UNLOCK_SCRIPT = new DefaultRedisScript<>(); UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua")); UNLOCK_SCRIPT.setResultType(Long.class); } public void unlock() { // 调用lua脚本 stringRedisTemplate.execute( UNLOCK_SCRIPT, Collections.singletonList(KEY_PREFIX + name), ID_PREFIX + Thread.currentThread().getId()); }
java 运行1234567891011121314
分布式锁-Redission
基于
实现的分布式锁存在下面的问题:
setnx
不可重入问题:重入问题是指获得锁的线程可以再次进入到获取相同的锁的代码块中,可重入锁的意义在于防止死锁,比如
这样的代码中,他的方法都是使用
HashTable
修饰的,假如他在一个方法内,调用另一个方法,那么此时如果锁是不可重入的,就会产生死锁。所以可重入锁的主要意义为了是防止死锁,我们的
synchronized
和
synchronized
锁都是可重入的。不可重试:是指目前分布式只能尝试一次,合理的情况是:当线程获得锁失败后,他应该能再次尝试获得锁主从一致性:如果
Lock
提供了主从集群,当向集群写数据时,主机需要异步的将数据同步给从机,而万一在同步过去之前,主机宕机了,就会出现死锁问题。
Redis
实现的分布式执行流程
Redission
的加锁,设置过期时间等操作都是基于
Redission
脚本完成,以保证原子性若业务时间较长,超过锁的时间,则需要对锁续期。
Lua
则设置了
Redission
线程完成锁的续期
Watch dog
在获取锁时,在设置时间内会不断循环尝试
Redission
实现的分布式锁是可重入的
Redission
利用
结构记录线程
Hash
和重入次数
id
为了保证主从一致性,可以采用
(红锁):不止在一个
RedLock
实例上创建锁,而是应该是要在多个
redis
实例上创建锁
redis
,避免在一个redis实例上加锁【但是维护复杂,实在要强一致,可以采用
(n/2+1)
】
Zookeeper
引入依赖
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.13.6</version>
</dependency>
xml
12345
配置Redisson客户端
@Configuration public class RedissonConfig { @Bean public RedissonClient redissonClient(){ // 配置 Config config = new Config(); // 添加redis地址,这里添加单点地址,也可以使用config.useClusterServers()添加集群地址 config.useSingleServer().setAddress("redis://192.168.150.101:6379") .setPassword("123321"); // 创建RedissonClient对象 return Redisson.create(config); } }
java 运行123456789101112131415
使用Redission的分布式锁
消息x@Resource private RedissionClient redissonClient; @Test void testRedisson() throws Exception{ //获取锁(可重入),指定锁的名称 RLock lock = redissonClient.getLock("anyLock"); //尝试获取锁,参数分别是:获取锁的最大等待时间(期间会重试),锁自动释放时间,时间单位 boolean isLock = lock.tryLock(10,10,TimeUnit.SECONDS); //判断获取锁成功 if(isLock){ try{ System.out.println("执行业务"); }finally{ //释放锁 lock.unlock(); } } }
java 运行12345678910111213141516171819
`Redis`消息队列
存放消息的队列。最简单的消息队列模型包括3个角色:
消息队列:存储和管理消息,也被称为消息代理(
)生产者:发送消息到消息队列消费者:从消息队列获取消息并处理消息
Message Broker
基于List实现消息队列
的
Redis
数据结构是一个双向链表,很容易模拟出队列效果。
list
通过
或者
BRPOP
来实现阻塞效果
BLPOP
优点:
利用
存储,不受限于
Redis
内存上限基于
JVM
的持久化机制,数据安全性有保证可以满足消息有序性
Redis
缺点:
无法避免消息丢失只支持单消费者
基于PubSub的消息队列
消费者可以订阅一个或多个`channel`,生产者向对应`channel`发送消息后,所有订阅者都能收到相关消息。
:订阅一个或多个频道
SUBSCRIBE channel [channel]
:向一个频道发送消息
PUBLISH channel msg
:订阅与
PSUBSCRIBE pattern[pattern]
格式匹配的所有频道
pattern
h?1lo** subscribes to **hello**, **hallo** and **hxllo**
**h*llo** subscribes to **hllo** and **heeeello**
**h[ae]llo** subscribes to **hello** and **hallo**, but not hillo
优点:
采用发布订阅模型,支持多生产、多消费
缺点:
不支持数据持久化无法避免消息丢失消息堆积有上限,超出时数据丢失
基于Stream的消息队列
`Stream`是`Redis 5.0`引入的一种新数据类型,可以实现一个功能非常完善的消息队列。
# 发送消息 XADD key [NOMKSTREAM][MAXLEN|MINID [=|~] threshold [LIMIT count]] *|ID field vlaue [field vlaue] # [NOMKSTREAM]: 如果队列不存在 是否自动创建队列 默认是自动创建 # [MAXLEN|MINID [=|~] threshold [LIMIT count]]: 设置消息队列的最大消息数量 # *|ID:消息的唯一id,*代表由Redis自动生成。格式是"时间-递增数字",例如"1644804662707-0" # field vlaue:发送到队列中的消息,称为Entry。格式就是多个key-value键值对 # 创建名为 users 的队列,并向其中发送一个消息,内容是:{name=jack,age=21},并且使用Redis自动生成ID XADD users * name jack age 21 # 读取消息 XREAD [COUNT count][BLOCK milliseconds] STREAMS key [key ...] ID [ID ...] # [COUNT count]: 每次读取消息的最大数量 # [BLOCK milliseconds]: 当没有消息的时候 是否阻塞 阻塞时长 # STREAMS key:要从哪个队列读取消息,key是队列名 # ID: 起始id,只返回大于该ID的消息 0:代表从第一个消息开始 $:代表从最新的消息开始 # 使用XREAD读取第一个消息 XREAD COUNT 1 STREAMS users 0 # XREAD阻塞方式,读取最新的消息 XREAD COUNT 1 BLOCK 1000 STREAMS users $
shell123456789101112131415161718192021
在业务开发中,可以循环的调用
阻塞方式来查询最新消息,从而实现持续监听队列的效果
XREAD
while(true){
//尝试读取队列中的消息,最多阳塞2秒
Object msg = redis.execute("XREAD COUNT I BLOCK 2000 STREAMS users $");
if(msg == null) {
continue;
}
// 处理消息
handleMessage(msg);
}
java
运行123456789
当指定起始
为
ID
时,代表读取最新的消息,如果处理一条消息的过程中,又有超过
$
条以上的消息到达队列,则下次获取时也只能获取到最新的一条,会出现漏读消息的问题
1
类型消息队列的
STREAM
命令特点:
XREAD
消息可回溯一个消息可以被多个消费者读取可以阻塞读取有消息漏读的风险
消费者组
消费者组(
):将多个消费者划分到一个组中,监听同一个队列。具备下列特点:
Consumer Group
消息分流:队列中的消息会分流给组内的不同消费者,而不是重复消费,从而加快消息处理的消息标示:消费者组会维护一个标示,记录最后一个被处理的消息,哪怕消费者宕机重启,还会从标示之后读取消息。确保每一个消息都会被消费消息确认:消费者获取消息后,消息处于
状态,并存入一个
pending
。当处理完成后需要通过
pending-list
来确认消息,标记消息为已处理,才会从
XACK
移除
pending-list
# 创建消费者组 XGROUP CREATE key groupName ID [MKSTREAM] # key: 队列名称 # groupName:消费者组名称 # ID:起始ID标示,$代表队列中最后一个消息,0则代表队列中第一个消息 # MKSTREAM:队列不存在时自动创建队列 # 删除指定的消费者组 XGROUP DESTORY key groupName # 给指定的消费者组添加消费者 XGROUP CREATECONSUMER key groupname consumername # 删除消费者组中的指定消费者 XGROUP DELCONSUMER key groupname consumername # 从消费者组读取消息 XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...] # group:消费组名称 # consumer:消费者名称,如果消费者不存在,会自动创建一个消费者 # count:本次查询的最大数量 # BLOCK milliseconds:当没有消息时最长等待时间 # NOACK:无需手动ACK,获取到消息后自动确认 # STREAMS key:指定队列名称 # ID:获取消息的起始ID: # ">":从下一个未消费的消息开始 # 其它:根据指定id从pending-list中获取已消费但未确认的消息,例如0,是从pending-list中的第一个消息开始 # 确认消息已处理 XACK key group ID [ID ...] # 查看pending-list XPENDING key group[[lDLE min-idie-timel start endcount [consumer]]
shell123456789101112131415161718192021222324252627282930313233
消费者监听消息的基本思路
while(true){ //尝试监听队列,使用阻塞模式,最长等待 2000毫秒 Object msg = redis.call("XREADGROUP GROUP gI CI COUNT 1 BLOCK 2000 STREAMS S1>"); if(msg == null){ // null说明没有消息,继续下一次 continue; } try { //处理消息,完成后一定要ACK handleMessage(msg); }catch(Exception e){ while(true){ // 在pending-listr中取数据来处理 Object msg = redis.calL("XREADGROUP GROUP gI C1 COUNT 1 STREAMS S1 @"); if(msg == null){ // null说明没有异常消息,所有消息都已确认,结束循环 break; } try { //说明有异常消息,再次处理 handleMessage(msg); }catch(Exception e){ //再次出现异常,记录日志,继续循环 continue; } } } }
java 运行1234567891011121314151617181920212223242526
类型消息队列的
STREAM
命令特点:
XREADGROUP
消息可回溯可以多消费者争抢消息,加快消费速度可以阻塞读取没有消息漏读的风险有消息确认机制,保证消息至少被消费一次
例子:
private class VoucherOrderHandler implements Runnable { @Override public void run() { while (true) { try { // 1.获取消息队列中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 > List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read( Consumer.from("g1", "c1"), StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)), StreamOffset.create("stream.orders", ReadOffset.lastConsumed()) ); // 2.判断订单信息是否为空 if (list == null || list.isEmpty()) { // 如果为null,说明没有消息,继续下一次循环 continue; } // 解析数据 MapRecord<String, Object, Object> record = list.get(0); Map<Object, Object> value = record.getValue(); VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true); // 3.创建订单 createVoucherOrder(voucherOrder); // 4.确认消息 XACK stringRedisTemplate.opsForStream().acknowledge("s1", "g1", record.getId()); } catch (Exception e) { log.error("处理订单异常", e); //处理异常消息 handlePendingList(); } } } private void handlePendingList() { while (true) { try { // 1.获取pending-list中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 0 List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read( Consumer.from("g1", "c1"), StreamReadOptions.empty().count(1), StreamOffset.create("stream.orders", ReadOffset.from("0")) ); // 2.判断订单信息是否为空 if (list == null || list.isEmpty()) { // 如果为null,说明没有异常消息,结束循环 break; } // 解析数据 MapRecord<String, Object, Object> record = list.get(0); Map<Object, Object> value = record.getValue(); VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true); // 3.创建订单 createVoucherOrder(voucherOrder); // 4.确认消息 XACK stringRedisTemplate.opsForStream().acknowledge("s1", "g1", record.getId()); } catch (Exception e) { log.error("处理pendding订单异常", e); try{ Thread.sleep(20); }catch(Exception e){ e.printStackTrace(); } } } } }
java 运行123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566
`Feed`流(`Redis`滚动分页)
流为用户持续的提供“沉浸式”的体验,通过无限下拉刷新获取新的信息。
Feed
不需要用户再去推送信息,而是系统分析用户到底想要什么,然后直接把内容推送给用户,从而使用户能够更加的节约时间,不用主动去寻找。
Feed
流有两种模式:
Feed
:不做内容筛选,简单的按照内容发布时间排序,常用于好友或关注。例如朋友圈
Timeline
优点:信息全面,不会有缺失。并且实现也相对简单缺点:信息噪音较多,用户不一定感兴趣,内容获取效率低
智能排序:利用智能算法屏蔽掉违规的、用户不感兴趣的内容。推送用户感兴趣信息来吸引用户
优点:投喂用户感兴趣信息,用户粘度很高,容易沉迷缺点:如果算法不精准,可能起到反作用
Feed流的实现模式
拉模式(读扩散)
当张三和李四和王五发了消息后,都会保存在自己的邮箱中,假设赵六要读取信息,那么他会从读取他自己的收件箱,此时系统会从他关注的人群中,把他关注人的信息全部都进行拉取,然后在进行排序
- 优点:比较节约空间,因为赵六在读信息时,并没有重复读取,而且读取完之后可以把他的收件箱进行清楚。
- 缺点:比较延迟,当用户读取数据时才去关注的人里边去读取数据,假设用户关注了大量的用户,那么此时就会拉取海量的内容,对服务器压力巨大。
12
推模式(写扩散)
推模式是没有写邮箱的,当张三写了一个内容,此时会主动的把张三写的内容发送到他的粉丝收件箱中去,假设此时李四再来读取,就不用再去临时拉取了
- 优点:时效快,不用临时拉取
- 缺点:内存压力大,假设一个大V写信息,很多人关注他, 就会写很多分数据到粉丝那边去
12
推拉结合模式(读写混合)
推拉模式是一个折中的方案,站在发件人这一段,如果是个普通的人,那么我们采用写扩散的方式,直接把数据写入到他的粉丝中去,因为普通的人他的粉丝关注量比较小,所以这样做没有压力,如果是大V,那么他是直接将数据先写入到一份到发件箱里边去,然后再直接写一份到活跃粉丝收件箱里边去,现在站在收件人这端来看,如果是活跃粉丝,那么大V和普通的人发的都会直接写入到自己收件箱里边来,而如果是普通的粉丝,由于他们上线不是很频繁,所以等他们上线时,再从发件箱里边去拉信息。
滚动分页
传统了分页在
流是不适用的,因为
Feed
流中的数据会不断更新,所以数据的脚标也在变化,因此不能采用传统的分页模式。
Feed
假设在
时刻,我们去读取第一页,此时
t1,
page = 1,那么我们拿到的就是
size = 5这几条记录,假设现在
10~6时候又发布了一条记录,此时
t2时刻,读取第二页,传入的参数是
t3,那么此时读取到的第二页实际上是从
page=2 ,size=5开始,然后是
6,那么我们就读取到了重复的数据,所以
6~2流的分页,不能采用原始方案来做。
feed
流的滚动分页:记录每次操作的最后一条,然后从这个位置开始去读取数据
Feed
我们从
时刻开始,拿第一页数据,拿到了
t1,然后记录下当前最后一次拿取的记录,就是
10~6,
6时刻发布了新的记录,此时这个
t2放到最顶上,但是不会影响我们之前记录的
11,此时
6时刻来拿第二页,第二页这个时候拿数据,还是从
t3后一点的
6去拿,就拿到了
5的记录。我们可以采用
5-1来做,可以进行范围查询,并且还可以记录当前获取数据时间戳最小值,就可以实现滚动分页了
sortedSet
滚动分页查询参数
:当前时间戳 | 上一次查询的最小时间戳
max
:时间戳最小值
min
0
:
offset
| 上一次查询的最小值的个数
0
:查询条数
count
@Override public Result queryBlogOfFollow(Long max, Integer offset) { // 1.获取当前用户 Long userId = UserHolder.getUser().getId(); // 2.查询收件箱 ZREVRANGEBYSCORE key Max Min LIMIT offset count String key = FEED_KEY + userId; Set<ZSetOperations.TypedTuple<String>> typedTuples = stringRedisTemplate.opsForZSet() .reverseRangeByScoreWithScores(key, 0, max, offset, 2); // 3.非空判断 if (typedTuples == null || typedTuples.isEmpty()) { return Result.ok(); } // 4.解析数据:blogId、minTime(时间戳)、offset List<Long> ids = new ArrayList<>(typedTuples.size()); long minTime = 0; // 2 int os = 1; // 最小值的个数 for (ZSetOperations.TypedTuple<String> tuple : typedTuples) { // 5 4 4 2 2 // 4.1.获取id ids.add(Long.valueOf(tuple.getValue())); // 4.2.获取分数(时间戳) long time = tuple.getScore().longValue(); if(time == minTime){ os++; }else{ minTime = time; os = 1; } } os = minTime == max ? os : os + offset; // 5.根据id查询blog String idStr = StrUtil.join(",", ids); List<Blog> blogs = query().in("id", ids).last("ORDER BY FIELD(id," + idStr + ")").list(); for (Blog blog : blogs) { // 5.1.查询blog有关的用户 queryBlogUser(blog); // 5.2.查询blog是否被点赞 isBlogLiked(blog); } // 6.封装并返回 ScrollResult r = new ScrollResult(); r.setList(blogs); r.setOffset(os); r.setMinTime(minTime); return Result.ok(r); }
java 运行123456789101112131415161718192021222324252627282930313233343536373839404142434445464748