Redis

内容分享4天前发布
1 0 0

简介


Redis
是一个基于内存的
key-value
结构的
NoSQL
数据库

基于内存存储,读写性能高适合存储热点数据,例如访问量大的数据


Redis
是纯内存操作,执行速度非常快采用单线程,避免不必要的上下文切换可竞争条件,多线程还要考虑线程安全问题使用
I/O
多路复用模型,非阻塞
IO


Redis
是纯内存操作,执行速度非常快,它的性能瓶颈是网络延迟而不是执行速度,
IO
多路复用模型主要就是实现了高效的网络请求


I/O
多路复用

用户空间和内核空间:
Linux
系统中一个进程使用的内存情况划分两部分:内核空间、用户空间。
用户空间只能执行受限的命令(
Ring3
),而且不能直接调用系统资源,必须通过内核提供的接口来访问内核空间可以执行特权命令(
Ring0
),调用一切系统资源

Redis


Linux
系统为了提高
IO
效率,会在用户空间和内核空间都加入缓冲区
写数据时,要把用户缓冲数据拷贝到内核缓冲区,然后写入设备读数据时,要从设备读取数据到内核缓冲区,然后拷贝到用户缓冲区
阻塞
IO
两个阶段都必须阻塞等待

Redis


- 阶段一:
    * 用户进程尝试读取数据(比如网卡数据)
    * 此时数据尚未到达,内核需要等待数据
    * 此时用户进程也处于阻塞状态
- 阶段二:
    * 数据到达并拷贝到内核缓冲区,代表已就绪
    * 将内核数据拷贝到用户缓冲区
    * 拷贝过程中,用户进程依然阻塞等待
    * 拷贝完成,用户进程解除阻塞,处理数据
- 在阻塞IO模型中,用户进程在两个阶段都是阻塞状态
Redis12345678910

非阻塞
IO
:**非阻塞
IO

recvfrom
操作会立即返回结果而不是阻塞用户进程

Redis


- 阶段一:
    * 用户进程尝试读取数据(比如网卡数据)此时数据尚未到达,内核需要等待数据
    * 返回异常给用户进程
    * 用户进程拿到error后,再次尝试读取
    * 循环往复,直到数据就绪
- 阶段二:
    * 将内核数据拷贝到用户缓冲区
    * 拷贝过程中,用户进程依然阻塞等待
    * 拷贝完成,用户进程解除阻塞,处理数据
- 非阻塞IO模型中,用户进程在第一个阶段是非阻塞,第二个阶段是阻塞状态。虽然是非阻塞,但性能并没有得到提高。而且忙等机制会导致CPU空转,CPU使用率暴增。
Redis12345678910


IO
多路复用:利用单个线程来同时监听多个
Socket
,并在某个
Socket
可读、可写时得到通知,从而避免无效的等待,充分利用
CPU
资源

Redis


- 阶段一:
    * 用户进程调用select,指定要监听的Socket集合内核,监听对应的多个socket
    * 任意一个或多个socket数据就绪则返回readable
    * 此过程中用户进程阻塞
- 阶段二:
    * 用户进程找到就绪的socket
    * 依次调用recvfrom读取数据
    * 内核将数据拷贝到用户空间
    * 用户进程处理数据
- IO多路复用是利用单个线程来同时监听多个Socket,并在某个Socket可读、可写时得到通知,从而避免无效的等待充分利用CPU资源。不过监听Socket的方式、通知的方式又有多种实现,常见的有:select、poll、epoll
    * select和poIl只会通知用户进程有Socket就绪,但不确定具体是哪个Socket,需要用户进程逐个遍历Socket来确认
    * epoll则会在通知用户进程Socket就绪的同时,把已就绪的Socket写入用户空间
Redis123456789101112


Redis
网络模型


Redis
通过
IO
多路复用来提高网络性能,并支持各种不同的多路复用实现,并将这些实现进行封装,提供了统一的高性能事件库

Redis

数据类型


Redis
是一个
key-value
的数据库,
key
一般是
String
类型,不过
value
的类型多种多样

Redis

字符串String

字符串(
String
):普通字符串,
Redis
中最简单的数据类型,采用最基本的
key-value
结构
不用
C
语言字符串的原因:
O(n)
复杂度获取长度、没有较好的扩容机制、特殊字符无法处理
Redis
通过结构体
SDS
(简单动态字符串)记录长度和类型

SDS
可存储更复杂的二进制数据,其使用
len
属性的值而非空字符来判断字符串是否结束
SDS
获取长度时间复杂度
O(1)
,减少字符串扩容引起的数据搬运次数
SDS

API
拼接字符串不会造成缓冲区溢出。其会检查
SDS
的空间,不够会自动扩容
存储
若字符串对象保存整数值,且可以用
long
类型标识,则将整数值保存在字符串对象结构的
ptr
属性里面,并将字符串的编码设置为
int
若字符串对象保存的是一个字符串,且长度小于等于
32
字节,则会使用
SDS
保存,且对象编码会设置为
embstr
若字符串对象保存的是一个字符串,且长度大于
32
字节,则会使用
SDS
保存,且对象编码会设置为
raw

常用命令


SET key value
设置指定
key
的值
可以用于在分布式的情况下存储
Session
,解决重复登录问题

GET key
获取指定
key
的值
SETEX key seconds value
设置指定
key
的值,并将
key
的过期时间设为
seconds

SETNX key value
只有在
key
不存在时设置
key
的值
可以用该命令实现分布式锁,若
key
不存在,则显示插入成功,可以用来表示加锁成功,若
key
存在,则会显示插入失败,可以用来表示加锁失败解锁要判断是否是持有锁的客户端进行操作,因此有两步操作(判断、解锁),通过
Lua
脚本保证原子性

MSET key1 value1 key2 value2
批量添加多个
String
类型的键值对
MGET key1 key2 key3
根据多个
key
获取多个
String
类型的
value

INCR key
让一个
key
对应的整型
value
自增
1

INCRBY key num
让一个
key
对应的整型
value
自增并指定步长
num

INCRBYFLOAT key num
让一个浮点类型的数字自增并指定步长


SET

GET
: 如果
key
不存在则是新增,如果存在则是修改

哈希(Hash)

哈希(
hash
):也叫散列,类似于
Java
中的
HashMap
结构,
value
保存的就是键值对

Redis
采用压缩列表或哈希表作为哈希表的实现
如果哈希类型的元素个数小于阈值(默认
512
),每个元素值小于阈值(默认
64
字节),则采用压缩列表实现否则采用哈希表


Redis 7.0
中,压缩列表数据结构已经废弃,交由
listpack
数据结构来实现

常用命令


HSET key field value
将哈希表
key
中的字段
field
的值设为
value

HGET key field
获取存储在哈希表中指定字段的值
HDEL key field
删除存储在哈希表中的指定字段
HKEYS key
获取哈希表中所有
field

HVALS key
获取哈希表中所有
value

HMSET key field1 value1 field2 value2
批量添加多个
hash
类型
key

field
的值
HMGET key field1 field2
批量获取多个
hash
类型
key

field
的值
HGETALL key
获取
key
中的
hash
的所有的
field

value

HINCRBY key field num
让一个
hash

filed
的字段值自增指定步长
num

HSETNX key field value
添加
hash
类型
field
值,前提这个
field
不存在

链表(List)

链表(
list
):按照插入顺序排序,可以有重复元素,类似于
Java
中的
LinkedList

底层实现是双向链表或压缩列表
如果列表的元素个数小于阈值(默认
512
),每个元素值小于阈值(默认
64
字节),则采用压缩列表实现否则采用双向链表


Redis 3.2
版本后,
List
数据类型底层数据结构只由
quicklist
实现了,替代了双向链表和压缩列表。

常用命令


LPUSH key value1 [value2]
将一个或多个值插入到列表头部
RPUSH key value1 [value2]
向列表右侧插入一个或多个元素
LRANGE key start stop
获取列表指定范围内的元素
RPOP key
移除并获取列表最后一个元素
LPOP key
移除并返回列表左侧的第一个元素,没有则返回
null

LLEN key
获取列表长度
BLPOP和BRPOP

LPOP

RPOP
类似,在没有元素时等待指定时间
可以用于实现消息队列,其满足以下几点
满足消息保序需求重复消息:通过设置消息
ID
,进行检查
ID
来处理重复消息可靠性:可以采用
BRPOPLPUSH
命令,让消费者程序从一个
List
中读取消息,同时,
Redis
会把这个消息再插入到另一个备份
List
,通过读取备份
List
保证消息的可靠性


List
不支持多个消费者消费同一条消息,一旦消费者拉取一条消息后,这条消息就从
List
中删除

集合(Set)

集合(
set
):无序集合,没有重复元素,类似于
Java
中的
Hashset

如果集合中的元素都是整数且元素个数小于 阈值(默认
512
),
Redis
会使用整数集合作为
Set
类型的底层数据结构否则使用哈希表作为
Set
类型的底层数据结构

常用命令


SADD key member1[member2]
向集合添加一个或多个成员
可以用于点赞等唯一操作

SMEMBERS key
返回集合中的所有成员
SCARD key
获取集合的成员数
SINTER key1 [key2]
返回给定所有集合的交集
可以用于获取沟通关注等

SUNION key1 [key2]
返回所有给定集合的并集
SDIFF key1 [key2]

key1

key2
的差集
SREM key member1 [member2]
删除集合中一个或多个成员
SISMEMBER key member
判断一个元素是否存在于
set

有序集合(Zset)

有序集合(
sorted set/zset
):集合中每个元素关联一个分数(
score
),根据分数升序排序,并且没有重复元素

set
类型的底层数据结构是由压缩列表或跳表实现
如果有序集合的元素个数小于
128
个,并且每个元素的值小于
64
字节时,
Redis
会使用压缩列表作为
Zset
类型的底层数据结构否则采用跳表实现

在 Redis 7.0 中,压缩列表数据结构已经废弃了,交由
listpack
数据结构来实现

常用命令


ZADD key score1 member1 [score2 member2]
向有序集合添加一个或多个成员
ZREM key member [member ...]
移除有序集合中的一个或多个成员
ZRANGE key start stop [WITHSCORES]
通过索引区间返回有序集合中指定区间的成员
ZINCRBY key increment member
有序集合中对指定成员分数加上
increment

ZSCORE key member
获取
sorted set
中的指定元素的
score

ZRANK key member
获取
sorted set
中的指定元素的排名
ZCARD key
获取
sorted set
中的元素个数
ZCOUNT key min max
统计
score
值在给定范围内的所有元素的个数
ZDIFF.ZINTER.ZUNION
求差集.交集.并集

所有的排名默认都是升序,如果要降序则在命令的Z后面添加REV即可

位图(bitMap)

位图是利用数据中每一个bit位作为一种状态映射,可以利用极小的空间来实现大量数据的表示。

例如,可以用位图来记录用户一个月的签到情况。

Redis


Redis
中利用
String
类型结构实现
BitMap
,其最大上限
512M
,转换为
bit

2^32

bit
位。
String
类型是会保存为二进制的字节数组,所以,
Redis
就把字节数组的每个 bit 位利用起来,用来表示一个元素的二值状态,可以把
Bitmap
看作是一个
bit
数组

常用命令


SETBIT key offset value
向指定位置(
offset
)存入一个
0

1

GETBIT key offset
获取指定位置(
offset
)的
bit

BITCOUNT key start end
统计
BitMap
中指定范围值为
1

bit
位的数量
BITOP
将多个
BitMap
的结果做位运算(与 、或、异或)
BITPOS
查找
bit
数组指定范围内第一个
0

1
出现的位置
BITFIELD
:操作(查询、修改、自增)
BitMap

bit
数组中的指定位置(
offset
)的值
BITFIELD_RO
:获取
BitMap

bit
数组,并以十进制形式返回

GEO地理数据结构


GEO
本身没有设计新的底层数据结构,而直接使用
Sorted Set
集合类型。
GEO
类型使用
GeoHash
编码方法实现了经纬度到
Sorted Set
中元素权重分数的转换,这其中的两个关键机制就是对二维地图做区间划分和对区间进行编码。一组经纬度落在某个区间后,就用区间的编码值来表示,并把编码值作为
Sorted Set
元素的权重分数。

常用命令


GEOADD
:添加一个地理空间信息,包含:经度
longitude
、纬度
latitude
、值
member

GEODIST
:计算指定的两个点之间的距离并返回
GEOHASH
:将指定
member
的坐标转为
hash
字符串形式并返回
GEOPOS
:返回指定
member
的坐标
GEORADIUS
:指定圆心、半径,找到该圆内包含的所有
member
,并按照与圆心之间的距离排序后返回。
6.
以后已废弃
GEOSEARCH
:在指定范围内搜索
member
,并按照与指定点之间的距离排序后返回。范围可以是圆形或矩形。
6.2
新功能
GEOSEARCHSTORE
:与
GEOSEARCH
功能一致,不过可以把结果存储到一个指定的
key

Stream


Redis
专门为消息队列设计的数据类型,其支持消息的持久化、支持自动生成全局唯一
ID
、支持
ack
确认消息的模式、支持消费组模式等,让消息队列更加的稳定和可靠

常用命令


XADD
:插入消息,保证有序,可以自动生成全局唯一
ID

XLEN
:查询消息长度;
XREAD
:用于读取消息,可以按
ID
读取数据;
XDEL
: 根据消息
ID
删除消息;
DEL
:删除整个
Stream

XRANGE
:读取区间消息
XREADGROUP
:按消费组形式读取消息
XPENDING

XACK


XPENDING
命令可以用来查询每个消费组内所有消费者「已读取、但尚未确认」的消息;
XACK
命令用于向消息队列确认消息处理已完成

层级储存


Redis
没有类似
MySQL
中的
Table
的概念,
Redis

key
允许有多个单词形成层级结构,多个单词之间用
:
隔开,格式如: 项目名:业务名:类型:id

一旦我们向
Redis
采用这样的方式存储,那么在可视化界面中,
Redis
会以层级结构来进行存储,形成类似于这样的结构,更加方便
Redis
获取数据

Redis

Redis缓存

缓存模型与思想

标准的操作方式就是查询数据库之前先查询缓存,如果缓存数据存在,则直接从缓存中返回,如果缓存数据不存在,再查询数据库,然后将数据存入
Redis

Redis

缓存淘汰


Redis
中的内存不够用时,此时再向
Redis
中添加新的
key
,那么
Redis
就会按照某一种规则将内存中的数据删除掉,这种数据的删除规则被称之为内存的淘汰策略


noeviction
:不淘汰任何
key
,但是内存满时不允许写入新数据,默认是该策略
volatile-ttl
:对设置了
TTL

key
,比较
key
的剩余
TTL

TTL
越小越先被淘汰
allkeys-random
:对全体
key
,随机进行淘汰
volatile-random
:对设置了
TTL

key
,随机进行淘汰
allkeys-lru
:对全体
key
,基于
LRU
算法进行淘汰
volatile-lru
:对设置了
TTL

key
,基于
LRU
算法进行淘汰
allkeys-lfu
:对全体
key
,基于
LFU
算法进行淘汰
volatile-lfu
:对设置了
TTL

key
,基于
LFU
算法进行淘汰

优先使用
alkeys-lru
策略。充分利用
LRU
算法的优势,把最近最常访问的数据留在缓存中。如果业务有明显的冷热数据区分,建议使用如果业务中数据访问频率差别不大,没有明显冷热数据区分,建议使用
alkeys-random
,随机选择淘汰如果业务中有置顶的需求,可以使用
volatile-lru
策略,同时置顶数据不设置过期时间,这些数据就一直不被删除,会淘汰其他设置过期时间的数据。如果业务中有短时高频访问的数据,可以使用
allkeys-lfu

volatile-lfu
策略

先进先出
FIFO
:最新的数据加入时,最老的数据淘汰最近最少使用
LRU
:双向链表+哈希表的形式存储,双向链表用于组织数据,数据采用链表节点进行组织,最经常访问的排在前面,哈希表用于存储键
key
和值链表节点。当某个
key
被访问,程序通过哈希表迅速定位节点,并将该节点调整至链表的最开始位置最不经常使用
LFU
:元素会绑定被访问次数,形成一个链表,优先淘汰访问次数少的元素

Redis

缓存更新策略(双写一致性)

由于缓存的数据源来自于数据库,而数据库的数据是会发生变化的,因此,如果当数据库中数据发生变化,而缓存却没有同步,此时就会有一致性问题存在

双写一致性:当修改了数据库的数据也要同时更新缓存的数据,缓存和数据库的数据要保持一致

删除缓存还是更新缓存?
更新数据库时让缓存失效,查询时再更新缓存
先操作缓存还是先操作数据库?
先操作缓存,再操作数据库,会有脏数据的情况,缓存和数据库不一致

Redis


- 先操作数据库,再删除缓存,也会有脏数据的情况,缓存和数据库不一致


1

Redis


- 因此采用延时双删策略,待数据库修改完成后,延时一段时间,再删除一次缓存


1

Redis

如何保证缓存与数据库的双写一致
采用分布式锁的方式,每一个线程获取锁来操作完整个缓存的更新,但性能低下。可以通过读写锁来提高性能,只适用于强一致的业务采用异步通知保证数据的最终一致性,但需要保证
MQ
的可靠性

Redis

缓存过期删除

惰性删除:设置该
key
过期时间后,当需要该
key
时,检查其是否过期,如果过期,就删除,否则返回该
key

优点:对
CPU
友好,只会在使用该
key
时才会进行过期检查,对于很多用不到的
key
不用浪费时间进行检查

缺点:对内存不友好,如果一个
key
已经过期,但是一直没有使用,那么该
key
就会一直存在内存中

定期删除:每隔一段时间,对一些
key
进行检查,删除其中过期的
key
(从一定数量的数据库中取出一定数量的随机
key
进行检查,并删除其中的过期
key


SLOW
模式是定时任务,执行频率默认为
10hz
,每次不超过
25ms
,以通过修改配置文件
redis.conf
的hz 选项来调整这个次数
FAST
模式执行频率不固定,但两次间隔不低于
2ms
,每次耗时不超过
1ms

优点:可以通过限制删除操作执行的时长和频率来减少删除操作对 CPU 的影响。另外定期删除,也能有效释放过期键占用的内存。

缺点:难以确定删除操作执行的时长和频率。

缓存穿透

缓存穿透是指客户端请求的数据在缓存中和数据库中都不存在,这样缓存永远不会生效,这些请求都会打到数据库。因此恶意请求可以通过请求不存在的数据,一直请求到数据库进行攻击。

缓存空对象

即使数据库返回空对象,仍将空对象缓存至
Redis
中,下次用户访问这个不存在的数据,那么在
Redis
中也能找到这个数据就不会请求数据库

Redis


- <font>优点:实现简单,维护方便</font>
- <font>缺点:</font>
    * <font>额外的内存消耗</font>
    * <font>可能造成短期的不一致</font>


1234

布隆过滤

布隆过滤器其实采用的是哈希思想来解决这个问题,通过一个庞大的二进制数组,走哈希思想去判断当前这个要查询的这个数据是否存在,若布隆过滤器判断存在,则放行,这个请求会去访问
Redis
,哪怕此时
Redis
中的数据过期了,但是数据库中一定存在这个数据,在数据库中查询出来这个数据后,再将其放入到
Redis
中。假设布隆过滤器判断这个数据不存在,则直接返回

Redis


bitmap
(位图):相于一个以
bit
位为单位的数组,数组中每个单元只能存储二级制数
0

1
。数据在存储时通过多个哈希计算得到在位图上的位置并置
1
。因此可以查询数据是否存在。预热时就是将位图进行填充。

数组越小误判率就越大,数组越大误判率就越小,但同时带来的更多的内存消耗

缓存击穿

缓存击穿问题也叫热点
Key
问题,就是一个被高并发访问并且缓存重建业务较复杂的
key
突然失效了,无数的请求访问会在瞬间给数据库带来巨大的冲击。

Redis

假设线程
1
在查询缓存之后,本来应该去查询数据库,然后把这个数据重新加载到缓存的,此时只要线程
1
走完这个逻辑,其他线程就都能从缓存中加载这些数据了,但是假设在线程
1
没有走完的时候,后续的线程
2
,线程
3
,线程
4
同时过来访问当前这个方法, 那么这些线程都不能从缓存中查询到数据,那么他们就会同一时刻来访问查询缓存,都没查到,接着同一时间去访问数据库,同时的去执行数据库代码,对数据库访问压力过大

互斥锁

因为锁能实现互斥性。假设线程过来,只能单一来访问数据库,从而避免对于数据库访问压力过大,但这也会影响查询的性能,因为此时会让查询性能从并行变成了串行,我们可以采用
tryLock
方法 +
double check
来解决这样的问题。

Redis

强一致性,但性能差

代码

利用
Redis

setnx
方法来表示锁。该方法含义是
Redis
中若没有这个
key
,则插入成功,返回
1
,在
stringRedisTemplate
中返回
true
;若有这个
key
则插入失败返回
0
,在
stringRedisTemplate
返回
false
。通过
true

false
来判断是否有线程在获取和释放锁。


private boolean tryLock(String key) {
    Boolean flag = stringRedisTemplate.opsForValue().setIfAbsent(key, "1", 10, TimeUnit.SECONDS);
    return BooleanUtil.isTrue(flag);
}

private void unlock(String key) {
    stringRedisTemplate.delete(key);
}

java
运行12345678

// 获取锁
String lockKey = "lock:shop:" + id;
Shop shop = null;
try {
    boolean isLock = tryLock(lockKey);
    // 判断否获取成功
    if(!isLock){
        //4.3 失败,则休眠重试
        Thread.sleep(50);
        return queryWithMutex(id);
    }
    // 成功,根据id查询数据库
    shop = getById(id);
    // 不存在,返回错误
    if(shop == null){
        // 将空值写入redis
        stringRedisTemplate.opsForValue().set(key,"",CACHE_NULL_TTL,TimeUnit.MINUTES);
        // 返回错误信息
        return null;
    }
    // 写入redis
    stringRedisTemplate.opsForValue().set(key,JSONUtil.toJsonStr(shop),CACHE_NULL_TTL,TimeUnit.MINUTES);

}catch (Exception e){
    throw new RuntimeException(e);
}
finally {
    //7.释放互斥锁
    unlock(lockKey);
}

java
运行
Redis123456789101112131415161718192021222324252627282930

逻辑过期

之所以会出现缓存击穿,原因是对
key
设置了过期时间,若不设置过期时间,就不会有缓存击穿问题。通过采用逻辑过期方案,将过期时间设置在
redis

value
中,在查询数据时通过
value
来判断当前数据是否过期,若过期,则获取互斥锁,并开启一个新线程去重构数据。

巧妙在于,异步的构建缓存,缺点在于在构建完缓存之前,返回的都是脏数据。

Redis

高可用,性能优,但会出现脏数据

缓存雪崩

缓存雪崩指在同一时段大量的缓存
key
同时失效或者
Redis
服务器宕机,导致大量请求到达数据库,带来巨大的压力。

Redis

给不同的
key

TTL
添加随机值利用
Redis
集群提高服务的可用性给缓存业务添加降级限流策略给业务添加多级缓存

Redis持久化

AOF持久化

AOF日志


AOF
全称为
Append Only File
(追加文件)。
Redis
处理的每一个写命令都会记录在
AOF
文件,可以看作是命令日志文件(只记录写操作命令,读操作命令不记录)。
Redis
先执行写操作命令,再将命令记录到
AOF
日志中

可以避免额外的检查开销,不用检查语法,因为命令执行成功后才会写入到
AOF
日志中不阻塞当前写操作命令,当写操作命令执行成功后,才会将命令记录到
AOF
日志但是因为两步操作,会有丢失数据的风险虽然不阻塞当前命令,但是有可能会阻塞下一个命令

写回策略

![](https://i-blog.csdnimg.cn/img_convert/3a779d69b7097c0bba410fac3668d807.png)

Redis
执行完写操作命令后,会将命令追加到
server.aof_buf
缓冲区然后通过
write()
系统调用,将
aof_buf
缓冲区的数据写入到
AOF
文件,此时数据并没有写入到硬盘,而是拷贝到了内核缓冲区
page cache
,等待内核将数据写入硬盘而内核缓冲区何时写入硬盘,由内核决定(
redis.conf

appendfsync
配置项可以有三种策略)

Always
:每次写操作命令执行完后,同步将
AOF
日志数据写回硬盘
Everysec
:每次写操作命令执行完后,先将命令写入到
AOF
文件的内核缓冲区,然后每隔一秒将缓冲区里的内容写回到硬盘
No
:写操作命令执行完后,先将命令写入到
AOF
文件的内核缓冲区,再由操作系统决定何时将缓冲区内容写回硬盘

重写机制


AOF
日志文件随着执行的写操作命令越来越多,文件大小也会越来越大,而过大的
AOF
日志文件就会带来性能问题,因为重启
Redis
后,需要读
AOF
文件的内容以恢复数据,过大
AOF
日志文件会导致恢复过程很慢。因此
AOF
重写机制保证当
AOF
文件超过设定阈值,就会压缩
AOF
文件


AOF
重写机制重写时,读取当前数据库中的所有键值对,然后将每个键值对用一条命令记录到新的
AOF
文件,等到全部记录完后,就将新的
AOF
文件替换掉现有的
AOF
文件,此时重复写入的命令就会合并。尽管某个键值对被多条写命令反复修改,最终也只需要根据这个键值对当前的最新状态,然后用一条命令去记录键值对,代替之前记录这个键值对的多条命令。

后台重写


Redis
的重写
AOF
过程是由后台子进程
bgrewriteaof
来完成

子进程进行
AOF
重写期间,主进程可以继续处理命令请求,从而避免阻塞主进程子进程带有主进程的数据副本,不需要像多线程那样共享内存加锁。创建子进程时,父子进程是共享内存数据的,不过这个共享的内存只能以只读的方式,而当父子进程任意一方修改了该共享内存,就会发生写时复制,于是父子进程就有了独立的数据副本,不用加锁来保证数据安全

在子进程重写
AOF
的日志过程中,若主进程修改已经存在的
key-value
,此时
key-value
数据在子进程的内存数据就跟主进程的内存数据不一致了。
Redis
设置一个
AOF
重写缓冲区,该缓冲区在创建
bgrewriteaof
子进程之后开始使用。在重写
AOF
期间,当
Redis
执行完一个写命令后,会同时将这个写命令写入到
AOF
缓冲区和
AOF
重写缓冲区

Redis

子进程完成重写工作后,会向主进程发送一条信号,主进程收到该信号后,会调用一个信号处理函数


AOF
重写缓冲区中的所有内容追加到新的
AOF
的文件中新的
AOF
的文件进行改名,覆盖现有的
AOF
文件

RDB持久化


RDB
全称为
Redis Database Backup file

Redis
数据备份文件),也被叫做
Redis
数据快照。当
Redis
实例故障重启后,从磁盘读取快照文件,恢复数据。快照文件称为
RDB
文件,默认是保存在当前运行目录。
RDB
快照就是记录某一个瞬间的内存数据,记录的是实际数据


RDB
持久化在四种情况下会执行:

执行
save
命令:
save
命令会导致主进程执行
RDB
,这个过程中其它所有命令都会被阻塞。只有在数据迁移时可能用到执行
bgsave
命令:这个命令执行后会开启独立进程完成
RDB
,主进程可以持续处理用户请求,不受影响

bgsave
开始时会
fork
主进程得到子进程,子进程共享主进程的内存数据。完成
fork
后读取内存数据并写入
RDB
文件中

Redis
停机时:
Redis
停机时会执行一次
save
命令,实现
RDB
持久化触发
RDB
条件时
Redis
内部有触发
RDB
的机制,可以在
redis.conf
文件中找到

写时复制(Copy-On-Write,COW)

在执行
bgsave
过程中,由于是交给子进程来构建
RDB
文件,主线程还是可以继续工作的,通过写时复制技术,可以让主线程也能进行修改数据。执行
bgsave
命令的时候,会通过
fork()
创建子进程,此时子进程和父进程是共享同一片内存数据的,因为创建子进程的时候,会复制父进程的页表,但是页表指向的物理内存还是一个,和
AOF
重写机制相似

当主线程(父进程)要修改共享数据里的某一块数据(比如键值对
A
)时,就会发生写时复制,于是这块数据的物理内存会被复制一份(键值对
A
),然后主线程在这个数据副本(键值对
A
)进行修改操作而
bgsave
子进程可以继续把原来的数据(键值对
A
)写入到
RDB
文件。因此发生写时复制后,
RDB
快照保存的是先前的内容

RBD和AOF合体使用


Redis4.0
通过混合使用
AOF
日志和
RBD
内存快照(混合持久化),使得既有恢复速度快,又有
AOF
丢失数据少的优点

在混合持久化下,在
AOF
重写日志时,


fork
出来的重写子进程会先将与主线程共享的内存数据以
RDB
方式写入到
AOF
文件主线程处理的操作命令会被记录在重写缓冲区里,重写缓冲区里的增量命令会以
AOF
方式写入到
AOF
文件写入完成后通知主进程将新的含有
RDB
格式和
AOF
格式的
AOF
文件替换旧的的
AOF
文件。


AOF
文件的前半部分是
RDB
格式的全量数据,后半部分是
AOF
格式的增量数据,是
Redis
后台子进程重写
AOF
期间主线程处理的操作命令

Redis分布式

主从复制

主从复制(高并发):单点
Redis
的并发能力是有上限的,要进一步提高
Redis
的并发能力,就需要搭建主从集群,实现读写分离。并且单台服务器下,若出现故障,则数据有可能全部丢失。

Redis

所有的数据修改只在主服务器上进行,然后将最新的数据同步给从服务器,以使得数据一致

第一次同步-主从全量同步

Replication ld
:简称
replid
,是数据集的标记,
id
一致则说明是同一数据集。其中每一个
master
都有唯一的
replid

slave
则会继承
master
节点的
replid

offset
:偏移量,随着记录在
repl_baklog
中的数据增多而逐渐增大。
slave
完成同步时会记录当前同步的
offset
。若
slave

offset
小于
master

offset
,说明
slave
数据落后于
master
,需要更新

Redis


    1. 从节点执行replicaof命令,发生psync命令请求主节点同步数据(replication id、offset)
    2. 主节点判断是否是第一次请求,第一次就与从节点同步版本信息(replication id和offset)
    3. 主节点执行bgsave(不阻塞主线程),生成RDB文件后,发送从节点执行,从节点收到RDB文件后,清空当前数据,载入RDB文件
        + 但是在生成RDB文件时新增的数据不会同步,因此主服务器在下面这三个时间间隙中将收到的写操作命令,写入到replication buffer缓冲区里
            - 主服务器生成RDB文件期间
            - 主服务器发送RDB文件给从服务器期间
            - 从服务器加载RDB文件期间
    4. 把生成之后的命令日志文件发送给从节点进行同步
    5. 最后,主服务器将replication buffer缓冲区里所记录的写操作命令发送给从服务器,从服务器执行后,主从服务器数据同步成功
Redis123456789

命令传播

主从服务器完成第一次同步后,双方就会维护一个
TCP
连接,该连接是长连接,避免频繁的
TCP
连接和断开带来的性能开销

Redis

主从增量同步(
slave
重启或网络断开重连,命令传播失效阶段)

Redis


    1. 从节点请求主节点同步数据,主节点判断不是第一次请求,不是就获取从节点的offset值
    2. 主节点从命令日志中获取</font>`<font>offset</font>`<font>值之后的数据,发送给从节点进行数据同步</font>
        1. repl_backlog_buffer,是一个环形缓冲区,用于主从服务器断连后,从中找到差异的数据(如果 offset 已经“掉出窗口”(被环形缓冲区覆盖了),主库无法做部分重同步,此时就会做全量同步)
        2. replication offset,标记上面那个缓冲区的同步进度,主从服务器都有各自的偏移量,主服务器使用master_repl_offset记录写到的位置,从服务器使用slave_repl_offset记录读到的位置。


1234

在主服务器进行命令传播时,不仅将写命令发送给从服务器,还将写命令写入
repl_backlog_buffer
缓冲区里,因此这个缓冲区里会保存着最近传播的写命令

哨兵

在主从架构中,由于主从模式是读写分离,若主节点(
master
)故障宕机,则将没有主节点来服务客户端的写操作请求,也没有主节点给从节点(
slave
)进行数据同步


Redis
提供了哨兵(
Sentinel
)机制来实现主从集群自动故障恢复,结构和作用如下

Redis

监控:
Sentinel
会不断检查
master

slave
是否按预期工作,
Sentinel
基于心跳机制监测服务状态,每隔
1
秒向集群的每个实例发送
ping
命令
主观下线:如果某
sentinel
节点发现某实例未在规定时间响应,则认为该实例主观下线。客观下线:若超过指定数量(
quorum
)的
sentinel
都认为该实例主观下线,则该实例客观下线。
quorum
值最好超过
Sentinel
实例数量的一半。
自动故障恢复:如果
master
故障,
Sentinel
会将一个
slave
提升为
master
。当故障实例恢复后也以新的
master
为主。
首先判断主与从节点断开时间长短,如超过指定值就排除该从节点(此节点不会作为主节点)然后判断从节点的
slave-priority
值,越小优先级越高若
slave-prority
一样,则判断
slave
节点的
offset
值,越大优先级越高(和主节点数据越一致)最后是判断
slave
节点的运行
id
大小,越小优先级越高

脑裂问题:主节点与从节点以及哨兵在不同的网络分区下,哨兵与主节点的网络连接断开,此时,哨兵就会在从节点中选取一个主节点。但此时客户端与之前的主节点还连接着,往内写入数据,却不会同步到从结点中。之后网络恢复,此时之前主节点和断开的从节点连接在一起,但此时有两个主节点,而之前的主节点会被降级回从节点,之前写入的数据会被刷掉,此时数据丢失

解决方案:
redis
中有两个配置参数:,
min-replicas-to-write 1
表示最少的
salve
节点为
1
个;
min-replicas-max-lag 5
表示数据复制和同步的延迟不能超过
5
秒。不满足上述条件,则不允许客户端数据写入。

通知:
Sentinel
充当
Redis
客户端的服务发现来源,当集群发生故障转移时,会将最新信息推送给
Redis
的客户端

集群

集群中有多个
master
,每个
master
保存不同的数据,每个
master
都可以有多个
slave
节点,
master
之间通过
ping
监测彼此健康状态。客户端请求可以请求访问请求中的任意节点,最终都会被转发到正确节点

Redis

数据读写:
Redis
分片集群引入了哈希槽的概念,
Redis
集群中有
16384
个哈希槽,每个
key
通过
CRC16
校验后对
16384
取模来决定放置在哪个槽,集群的每个节点负责一部分
Hash
槽。并且可以对
key
的一部分作为
hash
计算的输入

Redis 允许对 key 的一部分进行哈希计算(称为 hash tag)。
写法是把 key 的一部分放在大括号
{}
中,比如:


- user:{1001}:name
- user:{1001}:age


12

这样 CRC16 时,只会对
{1001}
计算槽号。结果就是这些 key 一定落到同一个槽(同一个节点),方便在同节点上做事务或 Lua 脚本。

Redis

redis使用

redis安装与启动

安装

安装后重点文件说明:


/usr/local/redis-4.0.0/src/redis-server

Redis
服务启动脚本
/usr/local/redis-4.0.0/src/redis-cli

Redis
客户端脚本
/usr/local/redis-4.0.0/redis.conf

Redis
配置文件

启动

**服务端启动命令**


redis-server.exe redis.windows.conf

shell
1


Redis
服务默认端口号为
6379
,通过快捷键
Ctrl + C
即可停止
Redis
服务,当
Redis
服务启动成功后,可通过客户端进行连接。

客户端连接命令


redis-cli.exe

shell
1

通过
redis-cli.exe
命令默认连接的是本地的
redis
服务,并使用默认6379端口。也可以通过指定如下参数连接:


-h ip地址

-p 端口号

-a 密码
(如果需要)

修改
Redis
配置文件

设置
Redis
服务密码,修改
redis.windows.conf


requirepass 123456

shell
1

修改密码后需要重启
Redis
服务才能生效

Jedis

导入依赖


<dependency>
  <groupId>org.junit.jupiter</groupId>
  <artifactId>junit-jupiter-api</artifactId>
  <version>5.10.2</version>
  <scope>test</scope>
</dependency>

xml
123456

线程池建立连接


public class JedisConnectionFactory{
    private static final JedisPool jedisPool;
    static {
        JedisPoolConfig jedisPoolconfig = new JedisPoolconfig();
        // 最大连接
        jedisPoolConfig.setMaxTotal(8);
        // 最大空闲连接
        jedisPoolConfig.setMaxIdle(8);
        // 最小空闲连接
        jedisPoolconfig.setMinIdle(0);
        // 设置最长等待时间,ms
        jedisPoolConfig.setMaxWaitMillis(200);
        jedisPool = new JedisPool(jedisPoolconfig,"192.168.150.101",6379,1000,“123321);
    }
    
    // 获取Jedis对象
    public static Jedis getJedis() {
        return jedisPool.getResource();
    }
}                                 

java
运行
Redis1234567891011121314151617181920

使用Jedis


String result = jedis.set("hello", "world");
String name = jedis.get("hello");

java
运行12

`SpringDataRedis`

配置
`maven`坐标

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

xml
1234
配置`Redis`数据源

spring:
  redis:
    host: 192.168.150.101
    port: 6379
    password: 123321
    lettuce:
      pool:
        max-active: 8  #最大连接
        max-idle: 8   #最大空闲连接
        min-idle: 0   #最小空闲连接
        max-wait: 100ms #连接等待时间

yaml

Redis1234567891011
编写配置类

@Configuration
@slf4j
public class RedisConfiguration{
    @Bean
    public RedisTemplate redisTemplate(RedisconnectionFactory redisConnectionfactory) {
        // 创建RedisTemplate对象
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        // 设置连接工厂
        template.setConnectionFactory(connectionFactory);
        // 创建JSON序列化工具
        GenericJackson2JsonRedisSerializer jsonRedisSerializer = 
                                        new GenericJackson2JsonRedisSerializer();
        // 设置Key的序列化
        template.setKeySerializer(RedisSerializer.string());
        template.setHashKeySerializer(RedisSerializer.string());
        // 设置Value的序列化
        template.setValueSerializer(jsonRedisSerializer);
        template.setHashValueSerializer(jsonRedisSerializer);
        // 返回
        return template;
    }
}

java
运行
Redis12345678910111213141516171819202122
`Java`中操作`Redis`
`String`类型

public void testString(){
    //SET key value					设置指定key的值
    redisTemplate.opsForValue().set("city","北京");
    
    //GET key						获取指定key的值
    String city =(String) redisTemplate.opsForValue().get("city");
    
    //SETEX key seconds value		设置指定key的值,并将key的过期时间设为seconds秒或分钟
    redisTemplate.opsForValue().set("code", "1234", 3, TimeUnit.MINUTES);
    
    //SETNX key value				只有在key不存在时设置key的值
    redisTemplate.opsForValue().setIfAbsent("city","北京");
}

java
运行
Redis12345678910111213
`Hash`类型

public void testHash(){

    HashOperations hashOperations = redisTemplate.opsForHash();

    //HSET key field value		将哈希表 key 中的字段 field 的值设为 value
    hashOperations.put("100", "name","tom");
    hashOperations.put("100", "age","20");

    //HGET key field			获取存储在哈希表中指定字段的值
    String name = (String) hashOperations.get("100", "name");

    //HKEYS key					获取哈希表中所有字段
    Set keys = hashOperations.keys("100");

    //HVALS key					获取哈希表中所有值
    List values = hashOperations.values("100");

    //HDEL key field			删除存储在哈希表中的指定字段
    hashOperations.delete("100","age");
}

java
运行
Redis1234567891011121314151617181920
`List`类型

public void testList(){
    //lpush lrane rpop llen
    ListOperations listOperations =redisTemplate.opsForList();

    //LPUSH key value1 [value2]		将一个或多个值插入到列表头部
    listOperations.leftPushAll("mylist","a","b","c");
    listOperations.leftPush("mylist","d");

    //LRANGE key start stop			获取列表指定范围内的元素
    List mylist= listOperations.range("mylist",0,-1);

    //RPOP key						移除并获取列表最后一个元素
    listOperations.rightPop("mylist");

    //LLEN key						获取列表长度
    Long size = listOperations.size("mylist");
}


java
运行
Redis123456789101112131415161718
`Set`类型

public void testset(){
    
    SetOperations setOperations = redisTemplate.opsForset();

    //SADD key member1[member2]		向集合添加一个或多个成员
    setOperations.add("set1","a","b","c","d");
    setOperations.add("set2","a","b","x","y");

    //SMEMBERS key					返回集合中的所有成员
    Set members =setOperations.members("set1");

    //SCARD key						获取集合的成员数
    Long size = setOperations.size("set1");

    //SINTER key1 [key2].			返回给定所有集合的交集
    Set intersect = setOperations.intersect("set1","set2");

    //SUNION key1 [key2]。			返回所有给定集合的并集公
    Set union = setOperations.union("set1","set2");

    //SREM key member1 [member2]	删除集合中一个或多个成员
    setOperations.remove("set1","a","b");
}

java
运行
Redis1234567891011121314151617181920212223
有序集类型

public void testzset(){
    
    ZSetOperations zSetOperations =redisTemplate.opsForZset();

    //ZADD key score1 member1 [score2 member2]	向有序集合添加一个或多个成员
    zSetOperations.add("zset1","a",10);
    zSetOperations.add("zset1","b",12);
    zSetOperations.add("zset1","c",9);

    //ZRANGE key start stop [WITHSCORES]		通过索引区间返回有序集合中指定区间内的成员
    Set zset1 = zSetOperations.range("zset1",0,-1);

    //ZINCRBY key increment member				有序集合中对指定成员分数加上增加increment
    zSetOperations.incrementscore("zset1","c",10);

    //ZREM key member [member ...]					移除有序集合中的一个或多个成员
    zSetOperations.remove("zset1","a","b");
}

java
运行
Redis123456789101112131415161718
通用命令

public void testCommon(){
    
    //KEYS pattern		查找所有符合给定模式( pattern)的 key
    Set keys = redisTemplate.keys("*");System.out.println(keys);

    //EXISTS key		检查给定 key 是否存在
    Boolean name =redisTemplate.hasKey("name");
    Boolean set1=redisTemplate.hasKey("set1");

    //TYPE key			返回 key 所储存的值的类型
    for(object key : keys){
        forDataType type = redisTemplate.type(key);
    }

    //DEL key			该命令用于在 key 存在时删除 key
    redisTemplate.delete( key:"mylist");
}

java
运行
Redis1234567891011121314151617

通用功能

全局`ID`生成

一种在分布式系统下用来生成全局唯一
ID
的工具,一般要满足下列特性

Redis

通过
Redis
的自增功能,实现全局唯一
ID
的生成

为了增加
ID
的安全性,我们可以不直接使用
Redis
自增的数值,而是拼接一些其它信息

Redis

符号位:
1bit
,永远为
0
,表示正数时间戳:
31bit
,以秒为单位,可以使用
69
年序列号:
32bit
,秒内的计数器,支持每秒产生
2^32
个不同
ID


@Component
public class RedisIdWorker {
    /**
     * 开始时间戳
     */
    private static final long BEGIN_TIMESTAMP = 1640995200L;
    /**
     * 序列号的位数
     */
    private static final int COUNT_BITS = 32;

    private StringRedisTemplate stringRedisTemplate;

    public RedisIdWorker(StringRedisTemplate stringRedisTemplate) {
        this.stringRedisTemplate = stringRedisTemplate;
    }

    public long nextId(String keyPrefix) {
        // 1.生成时间戳
        LocalDateTime now = LocalDateTime.now();
        long nowSecond = now.toEpochSecond(ZoneOffset.UTC);
        long timestamp = nowSecond - BEGIN_TIMESTAMP;

        // 2.生成序列号
        // 2.1.获取当前日期,精确到天
        String date = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));
        // 2.2.自增长
        long count = stringRedisTemplate.opsForValue().increment("icr:" + keyPrefix + ":" + date);

        // 3.拼接并返回
        return timestamp << COUNT_BITS | count;
    }
}

java
运行
Redis123456789101112131415161718192021222324252627282930313233

线程锁

多线程安全问题,可以用两种锁来解决

Redis

悲观锁:

悲观锁可以实现对于数据的串行化执行,比如
syn
,和
lock
都是悲观锁的代表,同时,悲观锁中又可以再细分为公平锁,非公平锁,可重入锁,等等

乐观锁:

有一个版本号,每次操作数据会对版本号
+1
,提交回数据时,会去校验是否比之前版本大
1
,如果大
1
,则操作成功,否则说明有其他线程进行了修改,因此线程不安全。

也可以直接用库存量作为版本号


boolean success = seckillVoucherService.update()
            .setSql("stock = stock -1") //set stock = stock -1
            .eq("voucher_id", voucherId).eq("stock",voucher.getStock()).update(); 
            //where id = ? and stock = ?

java
运行1234

分布式锁

Redis获取锁

互斥:确保只能有一个线程获取锁,利用
SETNX
的互斥特性非阻塞:尝试一次,成功返回
true
,失败返回
false


SET lock thread 
expire lock 10	// 设置超时时间 避免未释放锁就宕机情况

//合为一条 NX是互斥 EX是设置超时时间
set lock thread NX EX 10

// 释放锁
DEL key

shell
12345678

代码实现


public boolean tryLock(long timeoutSec) {
    // 获取线程标示
    String threadId = ID_PREFIX + Thread.currentThread().getId();
    // 获取锁
    Boolean success = stringRedisTemplate.opsForValue()
                .setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS);
    return Boolean.TRUE.equals(success);
}

java
运行12345678
Redis释放锁

手动释放超时释放:获取锁时添加一个时间


DEL lock

shell
1

但是在释放锁的情况下,有可能出现误删情况

Redis

持有锁的线程在锁的内部出现了阻塞,导致他的锁自动释放,这时其他线程,线程
2
来尝试获得锁,就拿到了这把锁,然后线程
2
在持有锁执行过程中,线程
1
反应过来,继续执行,而线程
1
执行过程中,走到了删除锁逻辑,此时就会把本应该属于线程
2
的锁进行删除,这就是误删别人锁的情况说明

解决方法:在获取锁时存入线程标示(可以用UUID表示) 在释放锁时先获取锁中的线程标示,判断是否与当前线程标示一致


public void unlock() {
    // 获取线程标示
    String threadId = ID_PREFIX + Thread.currentThread().getId();
    // 获取锁中的标示
    String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);
    // 判断标示是否一致
    if(threadId.equals(id)) {
        // 释放锁
        stringRedisTemplate.delete(KEY_PREFIX + name);
    }
}

java
运行
Redis1234567891011

但是由于线程的拿锁,比锁和删锁操作并不是原子性操作,仍然会有失败可能。


Redis
提供了
Lua
脚本功能,在一个脚本中编写多条
Redis
命令,确保多条命令执行时的原子性。


redis.call('命令名称', 'key', '其它参数', ...)

# 执行 set name jack
redis.call('set', 'name', 'jack')

# 先执行 set name jack
redis.call('set', 'name', 'Rose')
# 再执行 get name
local name = redis.call('get', 'name')
# 返回
return name

lua
运行
Redis1234567891011

-- 这里的 KEYS[1] 就是锁的key,这里的ARGV[1] 就是当前线程标示
-- 获取锁中的标示,判断是否与当前线程标示一致
if (redis.call('GET', KEYS[1]) == ARGV[1]) then
  -- 一致,则删除锁
  return redis.call('DEL', KEYS[1])
end
-- 不一致,则直接返回
return 0

lua
运行12345678

private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;
static {
    UNLOCK_SCRIPT = new DefaultRedisScript<>();
    UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
    UNLOCK_SCRIPT.setResultType(Long.class);
}

public void unlock() {
    // 调用lua脚本
    stringRedisTemplate.execute(
        UNLOCK_SCRIPT,
        Collections.singletonList(KEY_PREFIX + name),
        ID_PREFIX + Thread.currentThread().getId());
}

java
运行
Redis1234567891011121314
分布式锁-Redission

基于
setnx
实现的分布式锁存在下面的问题:

不可重入问题:重入问题是指获得锁的线程可以再次进入到获取相同的锁的代码块中,可重入锁的意义在于防止死锁,比如
HashTable
这样的代码中,他的方法都是使用
synchronized
修饰的,假如他在一个方法内,调用另一个方法,那么此时如果锁是不可重入的,就会产生死锁。所以可重入锁的主要意义为了是防止死锁,我们的
synchronized

Lock
锁都是可重入的。不可重试:是指目前分布式只能尝试一次,合理的情况是:当线程获得锁失败后,他应该能再次尝试获得锁主从一致性:如果
Redis
提供了主从集群,当向集群写数据时,主机需要异步的将数据同步给从机,而万一在同步过去之前,主机宕机了,就会出现死锁问题。


Redission
实现的分布式执行流程

Redis


Redission
的加锁,设置过期时间等操作都是基于
Lua
脚本完成,以保证原子性若业务时间较长,超过锁的时间,则需要对锁续期。
Redission
则设置了
Watch dog
线程完成锁的续期
Redission
在获取锁时,在设置时间内会不断循环尝试
Redission
实现的分布式锁是可重入的
利用
Hash
结构记录线程
id
和重入次数
为了保证主从一致性,可以采用
RedLock
(红锁):不止在一个
redis
实例上创建锁,而是应该是要在多个
redis
实例上创建锁
(n/2+1)
,避免在一个redis实例上加锁【但是维护复杂,实在要强一致,可以采用
Zookeeper

引入依赖


<dependency>
  <groupId>org.redisson</groupId>
  <artifactId>redisson</artifactId>
  <version>3.13.6</version>
</dependency>

xml
12345

配置Redisson客户端


@Configuration
public class RedissonConfig {

    @Bean
    public RedissonClient redissonClient(){
        // 配置
        Config config = new Config();
        // 添加redis地址,这里添加单点地址,也可以使用config.useClusterServers()添加集群地址
        config.useSingleServer().setAddress("redis://192.168.150.101:6379")
                .setPassword("123321");
        
        // 创建RedissonClient对象
        return Redisson.create(config);
    }
}

java
运行
Redis123456789101112131415

使用Redission的分布式锁


消息x@Resource
private RedissionClient redissonClient;

@Test
void testRedisson() throws Exception{
    //获取锁(可重入),指定锁的名称
    RLock lock = redissonClient.getLock("anyLock");
    //尝试获取锁,参数分别是:获取锁的最大等待时间(期间会重试),锁自动释放时间,时间单位
    boolean isLock = lock.tryLock(10,10,TimeUnit.SECONDS);
    //判断获取锁成功
    if(isLock){
        try{
            System.out.println("执行业务");          
        }finally{
            //释放锁
            lock.unlock();
        }
    }
}

java
运行
Redis12345678910111213141516171819

`Redis`消息队列

存放消息的队列。最简单的消息队列模型包括3个角色:

消息队列:存储和管理消息,也被称为消息代理(
Message Broker
)生产者:发送消息到消息队列消费者:从消息队列获取消息并处理消息

Redis

基于List实现消息队列


Redis

list
数据结构是一个双向链表,很容易模拟出队列效果。

通过
BRPOP
或者
BLPOP
来实现阻塞效果

Redis

优点:

利用
Redis
存储,不受限于
JVM
内存上限基于
Redis
的持久化机制,数据安全性有保证可以满足消息有序性

缺点:

无法避免消息丢失只支持单消费者

基于PubSub的消息队列

消费者可以订阅一个或多个`channel`,生产者向对应`channel`发送消息后,所有订阅者都能收到相关消息。

SUBSCRIBE channel [channel]
:订阅一个或多个频道
PUBLISH channel msg
:向一个频道发送消息
PSUBSCRIBE pattern[pattern]
:订阅与
pattern
格式匹配的所有频道

h?1lo** subscribes to **hello**, **hallo** and **hxllo**

**h*llo** subscribes to **hllo** and **heeeello**

**h[ae]llo** subscribes to **hello** and **hallo**, but not hillo

Redis

优点:

采用发布订阅模型,支持多生产、多消费

缺点:

不支持数据持久化无法避免消息丢失消息堆积有上限,超出时数据丢失

基于Stream的消息队列

`Stream`是`Redis 5.0`引入的一种新数据类型,可以实现一个功能非常完善的消息队列。


# 发送消息
XADD key [NOMKSTREAM][MAXLEN|MINID [=|~] threshold [LIMIT count]] *|ID field vlaue [field vlaue]
# [NOMKSTREAM]: 如果队列不存在 是否自动创建队列 默认是自动创建
# [MAXLEN|MINID [=|~] threshold [LIMIT count]]: 设置消息队列的最大消息数量
# *|ID:消息的唯一id,*代表由Redis自动生成。格式是"时间-递增数字",例如"1644804662707-0"
# field vlaue:发送到队列中的消息,称为Entry。格式就是多个key-value键值对

# 创建名为 users 的队列,并向其中发送一个消息,内容是:{name=jack,age=21},并且使用Redis自动生成ID
XADD users * name jack age 21

# 读取消息
XREAD [COUNT count][BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
# [COUNT count]: 每次读取消息的最大数量
# [BLOCK milliseconds]: 当没有消息的时候 是否阻塞 阻塞时长
# STREAMS key:要从哪个队列读取消息,key是队列名
# ID: 起始id,只返回大于该ID的消息 0:代表从第一个消息开始 $:代表从最新的消息开始

# 使用XREAD读取第一个消息
XREAD COUNT 1 STREAMS users 0
# XREAD阻塞方式,读取最新的消息
XREAD COUNT 1 BLOCK 1000 STREAMS users $

shell

Redis123456789101112131415161718192021

在业务开发中,可以循环的调用
XREAD
阻塞方式来查询最新消息,从而实现持续监听队列的效果


while(true){
    //尝试读取队列中的消息,最多阳塞2秒
    Object msg = redis.execute("XREAD COUNT I BLOCK 2000 STREAMS users $");
    if(msg == null) {
        continue;
    }
    // 处理消息
    handleMessage(msg);
}

java
运行123456789

当指定起始
ID

$
时,代表读取最新的消息,如果处理一条消息的过程中,又有超过
1
条以上的消息到达队列,则下次获取时也只能获取到最新的一条,会出现漏读消息的问题


STREAM
类型消息队列的
XREAD
命令特点:

消息可回溯一个消息可以被多个消费者读取可以阻塞读取有消息漏读的风险

消费者组

消费者组(
Consumer Group
):将多个消费者划分到一个组中,监听同一个队列。具备下列特点:

消息分流:队列中的消息会分流给组内的不同消费者,而不是重复消费,从而加快消息处理的消息标示:消费者组会维护一个标示,记录最后一个被处理的消息,哪怕消费者宕机重启,还会从标示之后读取消息。确保每一个消息都会被消费消息确认:消费者获取消息后,消息处于
pending
状态,并存入一个
pending-list
。当处理完成后需要通过
XACK
来确认消息,标记消息为已处理,才会从
pending-list
移除


# 创建消费者组
XGROUP CREATE key groupName ID [MKSTREAM]
# key: 队列名称
# groupName:消费者组名称
# ID:起始ID标示,$代表队列中最后一个消息,0则代表队列中第一个消息
# MKSTREAM:队列不存在时自动创建队列

# 删除指定的消费者组
XGROUP DESTORY key groupName

# 给指定的消费者组添加消费者
XGROUP CREATECONSUMER key groupname consumername

# 删除消费者组中的指定消费者
XGROUP DELCONSUMER key groupname consumername

# 从消费者组读取消息
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
# group:消费组名称
# consumer:消费者名称,如果消费者不存在,会自动创建一个消费者
# count:本次查询的最大数量
# BLOCK milliseconds:当没有消息时最长等待时间
# NOACK:无需手动ACK,获取到消息后自动确认
# STREAMS key:指定队列名称
# ID:获取消息的起始ID:  
#							">":从下一个未消费的消息开始
#							其它:根据指定id从pending-list中获取已消费但未确认的消息,例如0,是从pending-list中的第一个消息开始

# 确认消息已处理
XACK key group ID [ID ...]

# 查看pending-list
XPENDING key group[[lDLE min-idie-timel start endcount [consumer]]

shell

Redis123456789101112131415161718192021222324252627282930313233

消费者监听消息的基本思路


while(true){
    //尝试监听队列,使用阻塞模式,最长等待 2000毫秒
    Object msg = redis.call("XREADGROUP GROUP gI CI COUNT 1 BLOCK 2000 STREAMS S1>");
    if(msg == null){ 	// null说明没有消息,继续下一次
        continue;
    }
    try {
        //处理消息,完成后一定要ACK
        handleMessage(msg);
    }catch(Exception e){
        while(true){
            // 在pending-listr中取数据来处理
            Object msg = redis.calL("XREADGROUP GROUP gI C1 COUNT 1 STREAMS S1 @");
            if(msg == null){ // null说明没有异常消息,所有消息都已确认,结束循环
                break;
            }
            try {
                //说明有异常消息,再次处理
                handleMessage(msg);
            }catch(Exception e){
                //再次出现异常,记录日志,继续循环
                continue;
            }
        }
    }
}

java
运行
Redis1234567891011121314151617181920212223242526


STREAM
类型消息队列的
XREADGROUP
命令特点:

消息可回溯可以多消费者争抢消息,加快消费速度可以阻塞读取没有消息漏读的风险有消息确认机制,保证消息至少被消费一次

例子:


private class VoucherOrderHandler implements Runnable {

    @Override
    public void run() {
        while (true) {
            try {
                // 1.获取消息队列中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 >
                List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
                    Consumer.from("g1", "c1"),
                    StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
                    StreamOffset.create("stream.orders", ReadOffset.lastConsumed())
                );
                // 2.判断订单信息是否为空
                if (list == null || list.isEmpty()) {
                    // 如果为null,说明没有消息,继续下一次循环
                    continue;
                }
                // 解析数据
                MapRecord<String, Object, Object> record = list.get(0);
                Map<Object, Object> value = record.getValue();
                VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
                // 3.创建订单
                createVoucherOrder(voucherOrder);
                // 4.确认消息 XACK
                stringRedisTemplate.opsForStream().acknowledge("s1", "g1", record.getId());
            } catch (Exception e) {
                log.error("处理订单异常", e);
                //处理异常消息
                handlePendingList();
            }
        }
    }

    private void handlePendingList() {
        while (true) {
            try {
                // 1.获取pending-list中的订单信息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 0
                List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
                    Consumer.from("g1", "c1"),
                    StreamReadOptions.empty().count(1),
                    StreamOffset.create("stream.orders", ReadOffset.from("0"))
                );
                // 2.判断订单信息是否为空
                if (list == null || list.isEmpty()) {
                    // 如果为null,说明没有异常消息,结束循环
                    break;
                }
                // 解析数据
                MapRecord<String, Object, Object> record = list.get(0);
                Map<Object, Object> value = record.getValue();
                VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
                // 3.创建订单
                createVoucherOrder(voucherOrder);
                // 4.确认消息 XACK
                stringRedisTemplate.opsForStream().acknowledge("s1", "g1", record.getId());
            } catch (Exception e) {
                log.error("处理pendding订单异常", e);
                try{
                    Thread.sleep(20);
                }catch(Exception e){
                    e.printStackTrace();
                }
            }
        }
    }
}

java
运行
Redis123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566

`Feed`流(`Redis`滚动分页)


Feed
流为用户持续的提供“沉浸式”的体验,通过无限下拉刷新获取新的信息。
Feed
不需要用户再去推送信息,而是系统分析用户到底想要什么,然后直接把内容推送给用户,从而使用户能够更加的节约时间,不用主动去寻找。


Feed
流有两种模式:


Timeline
:不做内容筛选,简单的按照内容发布时间排序,常用于好友或关注。例如朋友圈
优点:信息全面,不会有缺失。并且实现也相对简单缺点:信息噪音较多,用户不一定感兴趣,内容获取效率低
智能排序:利用智能算法屏蔽掉违规的、用户不感兴趣的内容。推送用户感兴趣信息来吸引用户
优点:投喂用户感兴趣信息,用户粘度很高,容易沉迷缺点:如果算法不精准,可能起到反作用

Feed流的实现模式

拉模式(读扩散)

当张三和李四和王五发了消息后,都会保存在自己的邮箱中,假设赵六要读取信息,那么他会从读取他自己的收件箱,此时系统会从他关注的人群中,把他关注人的信息全部都进行拉取,然后在进行排序


- 优点:比较节约空间,因为赵六在读信息时,并没有重复读取,而且读取完之后可以把他的收件箱进行清楚。
- 缺点:比较延迟,当用户读取数据时才去关注的人里边去读取数据,假设用户关注了大量的用户,那么此时就会拉取海量的内容,对服务器压力巨大。


12

Redis

推模式(写扩散)

推模式是没有写邮箱的,当张三写了一个内容,此时会主动的把张三写的内容发送到他的粉丝收件箱中去,假设此时李四再来读取,就不用再去临时拉取了


- 优点:时效快,不用临时拉取
- 缺点:内存压力大,假设一个大V写信息,很多人关注他, 就会写很多分数据到粉丝那边去


12

Redis

推拉结合模式(读写混合)

推拉模式是一个折中的方案,站在发件人这一段,如果是个普通的人,那么我们采用写扩散的方式,直接把数据写入到他的粉丝中去,因为普通的人他的粉丝关注量比较小,所以这样做没有压力,如果是大V,那么他是直接将数据先写入到一份到发件箱里边去,然后再直接写一份到活跃粉丝收件箱里边去,现在站在收件人这端来看,如果是活跃粉丝,那么大V和普通的人发的都会直接写入到自己收件箱里边来,而如果是普通的粉丝,由于他们上线不是很频繁,所以等他们上线时,再从发件箱里边去拉信息。

Redis

滚动分页

传统了分页在
Feed
流是不适用的,因为
Feed
流中的数据会不断更新,所以数据的脚标也在变化,因此不能采用传统的分页模式。

假设在
t1
时刻,我们去读取第一页,此时
page = 1

size = 5
,那么我们拿到的就是
10~6
这几条记录,假设现在
t2
时候又发布了一条记录,此时
t3
时刻,读取第二页,传入的参数是
page=2 ,size=5
,那么此时读取到的第二页实际上是从
6
开始,然后是
6~2
,那么我们就读取到了重复的数据,所以
feed
流的分页,不能采用原始方案来做。

Redis


Feed
流的滚动分页:记录每次操作的最后一条,然后从这个位置开始去读取数据

我们从
t1
时刻开始,拿第一页数据,拿到了
10~6
,然后记录下当前最后一次拿取的记录,就是
6

t2
时刻发布了新的记录,此时这个
11
放到最顶上,但是不会影响我们之前记录的
6
,此时
t3
时刻来拿第二页,第二页这个时候拿数据,还是从
6
后一点的
5
去拿,就拿到了
5-1
的记录。我们可以采用
sortedSet
来做,可以进行范围查询,并且还可以记录当前获取数据时间戳最小值,就可以实现滚动分页了

Redis

滚动分页查询参数


max
:当前时间戳 | 上一次查询的最小时间戳
min
:时间戳最小值
0

offset

0
| 上一次查询的最小值的个数
count
:查询条数


@Override
public Result queryBlogOfFollow(Long max, Integer offset) {
    // 1.获取当前用户
    Long userId = UserHolder.getUser().getId();
    // 2.查询收件箱 ZREVRANGEBYSCORE key Max Min LIMIT offset count
    String key = FEED_KEY + userId;
    Set<ZSetOperations.TypedTuple<String>> typedTuples = stringRedisTemplate.opsForZSet()
        .reverseRangeByScoreWithScores(key, 0, max, offset, 2);
    // 3.非空判断
    if (typedTuples == null || typedTuples.isEmpty()) {
        return Result.ok();
    }
    // 4.解析数据:blogId、minTime(时间戳)、offset
    List<Long> ids = new ArrayList<>(typedTuples.size());
    long minTime = 0; // 2
    int os = 1; // 最小值的个数
    for (ZSetOperations.TypedTuple<String> tuple : typedTuples) { // 5 4 4 2 2
        // 4.1.获取id
        ids.add(Long.valueOf(tuple.getValue()));
        // 4.2.获取分数(时间戳)
        long time = tuple.getScore().longValue();
        if(time == minTime){
            os++;
        }else{
            minTime = time;
            os = 1;
        }
    }
    os = minTime == max ? os : os + offset;
    // 5.根据id查询blog
    String idStr = StrUtil.join(",", ids);
    List<Blog> blogs = query().in("id", ids).last("ORDER BY FIELD(id," + idStr + ")").list();

    for (Blog blog : blogs) {
        // 5.1.查询blog有关的用户
        queryBlogUser(blog);
        // 5.2.查询blog是否被点赞
        isBlogLiked(blog);
    }

    // 6.封装并返回
    ScrollResult r = new ScrollResult();
    r.setList(blogs);
    r.setOffset(os);
    r.setMinTime(minTime);

    return Result.ok(r);
}

java
运行
Redis123456789101112131415161718192021222324252627282930313233343536373839404142434445464748
© 版权声明

相关文章

暂无评论

none
暂无评论...