05-异步消息组件MQ高级

内容分享3小时前发布
0 0 0

学习目标

能够测试生产者重试机制能够测试生产者确认机制能够说出生产者确认机制的两种方式能够说出发送失败处理机制能够说出消息持久化机制能够测试消费者确认机制能够说出消费失败重试机制能够说出MQ消息幂等性方案能够说出延迟消息方案能够实现自动取消超时未支付订单功能能够说出保证消息的可靠性的完整方案

1 消息可靠性

1.1. 思路分析

在昨天的练习作业中,我们改造了余额支付功能,在支付成功后利用RabbitMQ通知交易服务,更新业务订单状态为已支付。但是大家思考一下:如果这里MQ通知失败,支付服务中支付流水显示支付成功,而交易服务中的订单状态却显示未支付,数据出现了不一致。

05-异步消息组件MQ高级

首先,我们一起分析一下消息丢失的可能性有哪些。

消息从发送者发送消息,到消费者处理消息,需要经过的流程是这样的:

05-异步消息组件MQ高级

消息从生产者到消费者的每一步都可能导致消息丢失:

发送消息时丢失:

生产者发送消息时连接MQ失败生产者发送消息到达MQ后未找到
Exchange
生产者发送消息到达MQ的
Exchange
后,未找到合适的
Queue
消息到达MQ后,处理消息的进程发生异常
MQ导致消息丢失:

消息到达MQ,保存到队列后,尚未消费就突然宕机
消费者处理消息时:

消息接收后尚未处理突然宕机消息接收后处理过程中抛出异常

综上,我们要解决消息丢失问题,保证MQ的可靠性,就必须从3个方面入手:

保证生产消息的可靠性确保MQ不会将消息弄丢保证消费消息的可靠性

注意:使用MQ并不是所有场景对消息的可靠性要求都很高,比如上图中,支付成功短信通知的流程对消息可靠性要求就不高,通常都可以保证消息正常到达消费者,即使个别没有成功通知用户也不影响主体业务流程,所以在设计技术方案时一定要根据业务需求具体分析。

1.2 生产消息可靠性

1.2.1. 生产者重试机制

1.2.1.1 配置

首先第一种情况,就是生产者发送消息时,出现了网络故障,导致与MQ的连接中断。

为了解决这个问题,SpringAMQP提供的消息发送时的重试机制。即:当
RabbitTemplate
与MQ连接超时后,多次重试。

从课程资料中找到
mq-demo-v2.zip
,并解压,使用IDEA打开mq-demo-v2工程。

修改
publisher
模块的
application.yaml
文件,添加下面的内容:



spring:
  rabbitmq:
    host: 192.168.101.68 # 你的虚拟机IP
    port: 5672 # 端口
    virtual-host: /hmall # 虚拟主机【注意这里可能不对】
    username: hmall # 用户名【注意这里可能不对】
    password: 123 # 密码【注意这里可能不对】
    connection-timeout: 1s # 设置MQ的连接超时时间
    template:
      retry:
        enabled: true # 开启超时重试机制
        initial-interval: 1000ms # 失败后的初始等待时间
        multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长 = 上次等待时长 * multiplier
        max-attempts: 3 # 总共尝试次数
  datasource:
    url: jdbc:mysql://192.168.101.68:3306/hmall?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&serverTimezone=Asia/Shanghai
    driver-class-name: com.mysql.cj.jdbc.Driver
    username: root
    password: mysql
mybatis-plus:
  configuration:
    default-enum-type-handler: com.baomidou.mybatisplus.core.handlers.MybatisEnumTypeHandler
  global-config:
    db-config:
      update-strategy: not_null
      id-type: auto
logging:
  pattern:
    dateformat: MM-dd HH:mm:ss:SSS
  level:
    com.itheima: debug
 
rabbit-mq:
  enable: true
  persistence:
    enable: true

配置参数解释


initial-interval
: 失败后的初始等待时间
multiplier
: 倍增器,每次重试的等待时间是前一次几倍。

失败后下次等待时长 =上次等待时长 * multiplier


max-attempts
: 最大重试次数(包括第一次尝试)

举例:

由于
multiplier
设置为1,这意味着每次重试之间的间隔是固定的,不会增加。

假设在t=0时刻首次尝试发送消息,如果发送失败,则会按照以下时间点进行重试:

第一次尝试(也是首次发送):t=0(假设即时失败)第一次重试:等待1秒后重试,t=1秒(首次失败后等待1秒)第二次重试:等待1秒*1=1秒 后重试,t=2秒(从第一次重试再等待1秒)

如果设置如下:



initial-interval:1000ms
multiplier:2
max-attempts: 5

由于
multiplier
设置为2,这意味着每次重试之间的间隔会翻倍。

假设在t=0时刻首次尝试发送消息,如果发送失败,则会按照以下时间点进行重试:

第一次尝试(也是首次发送):t=0(假设即时失败)第一次重试:等待1秒后重试,t=1秒(首次失败后等待1秒)第二次重试:等待1*2=2秒 后重试,t=3秒第三次重试:等待2*2=4秒 后重试,t=7秒第四次重试:等待4*2=8秒 后重试,t=15秒

1.2.1.2 测试

我们利用命令停掉RabbitMQ服务:


docker stop mq

然后测试发送一条消息,会发现会每隔1秒重试1次,总共重试了3次。消息发送超时重试机制配置成功!



01-15 17:37:58:252  INFO 18792 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [192.168.101.68:5672]
01-15 17:38:00:281  INFO 18792 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [192.168.101.68:5672]
01-15 17:38:02:306  INFO 18792 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [192.168.101.68:5672]
 
org.springframework.amqp.AmqpIOException: java.net.SocketTimeoutException: connect timed out

注意:当网络不稳定的时候,利用重试机制可以有效提高消息发送的成功率。不过SpringAMQP提供的重试机制是阻塞式的重试,也就是说多次重试等待的过程中,当前线程是被阻塞的。

如果对于业务性能有要求,建议禁用重试机制。如果一定要使用,请合理配置等待时长和重试次数,当然也可以考虑使用异步线程来执行发送消息的代码。

1.2.2. 生产者确认机制

1.2.2.1 两种机制介绍

一般情况下,只要生产者与MQ之间的网路连接顺畅,基本不会出现发送消息丢失的情况,因此大多数情况下我们无需考虑这种问题。

不过,在少数情况下,也会出现消息发送到MQ之后丢失的现象,比如:

MQ内部处理消息的进程发生了异常生产者发送消息到达MQ后未找到
Exchange
生产者发送消息到达MQ的
Exchange
后,未找到合适的
Queue
,因此无法路由

针对上述情况,RabbitMQ提供生产者确认机制,包括
Publisher Confirm

Publisher Return
两种。在开启确认机制的情况下,当生产者发送消息给MQ后,MQ会根据消息处理的情况返回不同的回执

具体如图所示:

05-异步消息组件MQ高级

生产者确认机制:

1.Publisher Return

消息投递成功但路由失败会调用Publisher Return回调方法返回异常信息。

2.Publisher Confirm

消息投递成功返回ack,投递失败返回nack。

注意:消息投递成功但可能路由失败了,此时会通过Publisher Confirm返回ack,通过Publisher Return回调方法返回异常信息。

默认两种机制都是关闭状态,需要通过配置文件来开启。

生产者确认机制:确保消息发送到MQ的一种机制,它通过2个函数来确认

(1)【异常】找不到交换机:publisher-confirm:nack

(2)【异常】找到交换机,匹配不到队列:publisher-return:ack

(3)【正常】找到交换机,匹配到队列:publisher-cofirm:ack

1.2.2.2 开启生产者确认

在publisher模块的
application.yaml
中添加配置:



spring:
  rabbitmq:
    publisher-confirm-type: correlated # 开启publisher confirm机制,并设置confirm类型
    publisher-returns: true # 开启publisher return机制

这里
publisher-confirm-type
有三种模式可选:


none
:关闭confirm机制
simple
:同步阻塞等待MQ的回执(回调方法)
correlated
:MQ异步回调返回回执

一般我们推荐使用
correlated
,回调机制。

1.2.2.3 实现方法

ReturnCallback

每个
RabbitTemplate
只能配置一个
ReturnCallback
,因此我们可以在配置类中统一设置。我们在publisher模块定义一个配置类:

05-异步消息组件MQ高级

内容如下:



package com.itheima.publisher.config;
 
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Configuration;
 
import javax.annotation.PostConstruct;
 
@Slf4j
@AllArgsConstructor
@Configuration
public class MqConfig {
    private final RabbitTemplate rabbitTemplate;
 
    @PostConstruct
    public void init(){
        rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
            @Override
            public void returnedMessage(ReturnedMessage returned) {
                log.error("触发return callback,");
                log.debug("exchange: {}", returned.getExchange());
                log.debug("routingKey: {}", returned.getRoutingKey());
                log.debug("message: {}", returned.getMessage());
                log.debug("replyCode: {}", returned.getReplyCode());
                log.debug("replyText: {}", returned.getReplyText());
            }
        });
    }
}
ConfirmCallback

由于每个消息发送时的处理逻辑不一定相同,因此ConfirmCallback需要在每次发消息时定义。具体来说,是在调用RabbitTemplate中的convertAndSend方法时,多传递一个参数:

05-异步消息组件MQ高级

这里的CorrelationData中包含两个核心的东西:


id
:消息的唯一标示,MQ对不同的消息的回执以此做判断,避免混淆
SettableListenableFuture
:回执结果的Future对象

将来MQ的回执就会通过这个
Future
来返回,我们可以提前给
CorrelationData
中的
Future
添加回调函数来处理消息回执:

05-异步消息组件MQ高级

我们测试下边的方法,向系统自带的交换机发送消息,并且添加
ConfirmCallback

注意:此代码不用编写直接测试即可,稍后我们会用工具类替代。



@Test
void testPublisherConfirm() {
    // 1.创建CorrelationData
    CorrelationData cd = new CorrelationData();
    // 2.给Future添加ConfirmCallback
    cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
        @Override
        public void onFailure(Throwable ex) {
            // 2.1.Future发生异常时的处理逻辑,基本不会触发
            log.error("send message fail", ex);
        }
        @Override
        public void onSuccess(CorrelationData.Confirm result) {
            // 2.2.Future接收到回执的处理逻辑,参数中的result就是回执内容
            if(result.isAck()){ // result.isAck(),boolean类型,true代表ack回执,false 代表 nack回执
                log.debug("发送消息成功,收到 ack!");
            }else{ // result.getReason(),String类型,返回nack时的异常描述
                log.error("发送消息失败,收到 nack, reason : {}", result.getReason());
            }
        }
    });
    // 3.发送消息,故意指定一个错误的rontingKey
    rabbitTemplate.convertAndSend("hmall.direct", "q", "hello", cd);
}
1.2.2.4 测试

测试报错:

05-异步消息组件MQ高级

原因:每个RabbitTemplate只支持一个ReturnCallback。

解决:

屏蔽common-rabbitmq依赖,在common-rabbitmq中对RabbitTemplate设置了ReturnCallback

05-异步消息组件MQ高级

测试步骤:

可以看到,由于传递的
RoutingKey
是错误的,路由失败后,触发了
return callback
,同时也收到了ack。

05-异步消息组件MQ高级

当我们修改为正确的
RoutingKey
以后,就不会触发
return callback
了,只收到ack。

当我们把交换机名称修改错误则只会收到nack。

1.2.3. 发送失败处理机制

1.2.3.1 失败处理机制

在ConfirmCallback中收到nack表示消息投递失败,
ReturnCallback
异常表示路由失败

高频面试题:消息投递失败怎么处理/你们怎么保证消费成功

可以将消息记录到失败消息表,由定时任务进行发布,每隔10秒钟(可设置)执行获取失败消息重新发送,发送一次则在失败次数字段加一,达到3次停止自动发送由人工处理(如钉钉告警)。

在commonn-rabbitmq模块中实现了发送消息的工具方法,此方法实现了发送失败处理机制。


ReturnCallback
回调逻辑在com.itheima.common.rabbitmq.config.RabbitMqConfiguration中,核心代码如下:



@Configuration
@ConditionalOnProperty(prefix = "rabbit-mq", name = "enable", havingValue = "true")
@Import({RabbitClient.class, FailMsgDaoImpl.class})
@Slf4j
public class RabbitMqConfiguration implements ApplicationContextAware {
 
    /**
     * 并发数量
     */
    public static final int DEFAULT_CONCURRENT = 10;
 
    @Autowired(required = false)
    private FailMsgDao failMsgDao;
 
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        // 获取RabbitTemplate
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
        //定义returnCallback回调方法
        rabbitTemplate.setReturnsCallback(
            new RabbitTemplate.ReturnsCallback() {
                @Override
                public void returnedMessage(ReturnedMessage returnedMessage) {
                    byte[] body = returnedMessage.getMessage().getBody();
                    //消息id
                    String messageId = returnedMessage.getMessage().getMessageProperties().getMessageId();
                    String content = new String(body, Charset.defaultCharset());
                    log.info("消息发送失败,应答码{},原因{},交换机{},路由键{},消息id{},消息内容{}",
                             returnedMessage.getReplyCode(),
                             returnedMessage.getReplyText(),
                             returnedMessage.getExchange(),
                             returnedMessage.getRoutingKey(),
                             messageId,
                             content);
                    if (failMsgDao != null) {
                        // 失败消息落库(后续定时任务重试,达最大次数人工干预)
                        failMsgDao.save(messageId, returnedMessage.getExchange(), returnedMessage.getRoutingKey(), content, 0, DateUtils.getCurrentTime()+10, "returnCallback");
                    }
                }
            }
        );
    }
 
}

ApplicationContextAware 的作用:如果 Bean 实现了 ApplicationContextAware 接口,Spring 容器会调用 setApplicationContext 方法,将 ApplicationContext 传递给该 Bean。

Bean 创建:Spring 容器首先创建 Bean 实例。

属性注入:Spring 容器对 Bean 的属性进行依赖注入。

Aware 接口回调:如果 Bean 实现了 ApplicationContextAware 接口,Spring 容器会调用 setApplicationContext 方法,将 ApplicationContext 传递给该 Bean。

初始化方法:如果 Bean 配置了初始化方法(例如通过 @PostConstruct 注解或 init-method 属性),Spring 容器会调用这些初始化方法。

以上可以详见:Spring源码解析

ConfirmCallback的逻辑在RabbitClient 工具类的
sendMsg
方法中,通过sendMsg方法发送消息,在发送消息时指定
correlationData
对象,在correlationData对象中指定了ConfirmCallback回调方法的逻辑,当返回nack会将消息写入失败表,如果消息重发成功会将该记录从失败表删除。

核心代码如下:



@Slf4j
@Service
public class RabbitClient {
    ....
 
    @Resource
    private RabbitTemplate rabbitTemplate;
    @Autowired(required = false)
    private FailMsgDao failMsgDao;
 
    /**
     * 发送消息 重试3次
     *
     * @param exchange   交换机
     * @param routingKey 路由key
     * @param msg        消息对象,会将对象序列化成json字符串发出
     * @param delay      延迟时间 秒
     * @param msgId      消息id
     * @param isFailMsg  是否是失败消息
     * @return 是否发送成功
     */
    @Retryable(value = MqException.class, maxAttempts = 3, backoff = @Backoff(value = 3000, multiplier = 1.5), recover = "saveFailMag")
    public void sendMsg(String exchange, 
                        String routingKey, 
                        Object msg, 
                        Integer delay, 
                        String msgId, 
                        boolean isFailMsg) {
        // 1.发送消息前准备
        // 1.1获取消息内容,如果非字符串将其序列化
        String jsonMsg = JsonUtils.toJsonStr(msg);
        // 1.2.全局唯一消息id,如果调用者设置了消息id,使用调用者消息id,如果为配置,默认雪花算法生成消息id
        if(StrUtil.isBlank(msgId)){
            msgId = IdUtil.getSnowflakeNextIdStr();
        }
        // 1.3.设置默认延迟时间,默认立即发送
        delay = NumberUtils.null2Default(delay, -1);
        log.debug("消息发送!exchange = {}, routingKey = {}, msg = {}, msgId = {}", exchange, routingKey, jsonMsg, msgId);
 
        // 1.4.构建回调
        RabbitMqListenableFutureCallback futureCallback = RabbitMqListenableFutureCallback.builder()
        .exchange(exchange)
        .routingKey(routingKey)
        .msg(jsonMsg)
        .msgId(msgId)
        .delay(delay)
        .isFailMsg(isFailMsg)
        .failMsgDao(failMsgDao)
        .build();
        // 1.5.CorrelationData设置
        CorrelationData correlationData = new CorrelationData(msgId.toString());
        correlationData.getFuture().addCallback(futureCallback);
 
        // 1.6.构造消息对象
        Message message = MessageBuilder.withBody(StrUtil.bytes(jsonMsg, CharsetUtil.CHARSET_UTF_8))
        //持久化
        .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
        //消息id
        .setMessageId(msgId.toString())
        .build();
 
        try {
            // 2.发送消息
            this.rabbitTemplate.convertAndSend(exchange, routingKey, message, new DelayMessagePostProcessor(delay), correlationData);
        } catch (Exception e) {
            log.error("send error:" + e);
            // 3.构建异常回调,并抛出异常
            MqException mqException = new MqException();
            mqException.setMsg(ExceptionUtil.getMessage(e));
            mqException.setMqId(msgId);
            throw mqException;
        }
    }
 
    public class RabbitMqListenableFutureCallback implements ListenableFutureCallback<CorrelationData.Confirm> {
 
        //记录失败消息service
        private FailMsgDao failMsgDao;
 
        private String exchange;
        private String routingKey;
        private String msg;
        private String msgId;
        private Integer delay;
 
        //是否是失败消息
        private boolean isFailMsg=false;
 
        @Override
        public void onFailure(Throwable ex) {
            if(failMsgDao == null) {
                return;
            }
            // 执行失败保存失败信息
            failMsgDao.save(msgId, exchange, routingKey, msg, delay, DateUtils.getCurrentTime() + 10, ExceptionUtil.getMessage(ex));
        }
 
        @Override
        public void onSuccess(CorrelationData.Confirm result) {
        if(failMsgDao == null){
            return;
        }
        if(!result.isAck()){
            // 执行失败保存失败信息,如果已经存在保存信息,如果不在信息信息
            failMsgDao.save(msgId, exchange, routingKey, msg, delay,DateUtils.getCurrentTime() + 10, "MQ回复nack");
        }else if(isFailMsg && msgId != null){
            // 如果发送的是失败消息,当收到ack需要从fail_msg删除该消息
            failMsgDao.removeById(msgId);
        }
    }
}

当发送消息失败会入库到失败消息表。

我们可以启动定时任务去扫描失败消息表的记录,重新发送,当达到最大失败次数后由人工处理。

1.2.3.4 测试

下边测试发送失败入库功能:

首先在publisher模块添加common-rabbitmq依赖



<dependency>
  <groupId>com.itheima</groupId>
  <artifactId>common-rabbitmq</artifactId>
  <version>0.0.1-SNAPSHOT</version>
</dependency>

屏蔽MqConfig类中设置ReturnCallback的代码

由于RabbitTemplate只能设置一次ReturnCallback,而在common-rabbitmq中设置了ReturnCallback,所以屏蔽MqConfig类中设置ReturnCallback的代码,如下图:

05-异步消息组件MQ高级

在hmall中创建失败消息表

查看虚拟机对应hmall数据库,如果已存在fail_msg表需要删除原表再通过下边的语句新建表。



create table fail_msg
(
  id                     varchar(255)  not null comment '消息id'
  primary key,
  exchange               varchar(255)  not null comment '交换机',
  routing_key            varchar(255)  not null comment '路由key',
  msg                    text          not null comment '消息',
  reason                 varchar(255)  not null comment '原因',
  delay_msg_execute_time int           null comment '延迟消息执行时间',
  next_fetch_time        int           null comment '下次拉取时间',
  create_time            datetime      null comment '创建时间',
  update_time            datetime      null comment '更新时间',
  fail_count             int default 0 null comment '失败次数'
)
    comment '失败消息记录表' charset = utf8mb4;

在publisher模块进行配置

在启动类中添加@MapperScan(“com.itheima.common.rabbitmq.dao.mapper”)

在application.yaml添加:直接复制粘贴为一级配置

05-异步消息组件MQ高级



rabbit-mq:
  enable: true
  persistence:
    enable: true

使用RabbitClient工具类发送消息



@Resource
private RabbitClient rabbitClient;
@Test
void testPublisherReturn() {
    rabbitClient.sendMsg("hmall.directa", "q", "hello");
}

分别测试ReturnCallback和ConfirmCallback不同情况下失败消息入库。

测试ReturnCallback时注意:单元测试方法运行完就完毕了数据库连接池,而ReturnCallback是回调方法,是在单元测试方法执行完再执行,在ReturnCallback中操作数据库时报没有可用的数据库连接的错误,需要在单元测试方法最后添加休眠代码,保证ReturnCallback执行完成再结束整个单元测试方法。



@Test
void testPublisherReturn() throws InterruptedException {
    rabbitClient.sendMsg("hmall.directa", "q", "hello");
    // 增加一点时间,确保ReturnCallback执行完成
    Thread.sleep(5000);
}

1.2.4 失败消息定时任务重发(作业)

我们完成了失败消息的存储,请使用任意一个你熟悉的定时任务完成消息的重发

05-异步消息组件MQ高级

思路大致如下:

选定一个定时任务框架,如:SpringTask、Quartz、xxljob等编写实现代码

扫描表:fail_msg扫描条件:where fail_count < 3调用rabbitClient重发消息发送完成后,删除表:fail_msg中的数据
参考实现思路:首先问AI,然后就可以得到详尽的步骤

05-异步消息组件MQ高级

大致代码路径如下:这里我就不再写查找数据库那部分代码了,较为简单

05-异步消息组件MQ高级

1.2.5 小结

如何保证生产消息可靠性?

首先在发送消息时可以开启重试机制,避免因为短暂的网络问题导致发送消息失败。

RabbitMQ还提供生产者确认机制保证发送消息到MQ的可靠性。

生产者确认机制包括两种:

1.Publisher Return

消息投递成功但路由失败会调用Publisher Return回调方法返回异常信息。

2.Publisher Confirm

消息投递成功返回ack,投递失败返回nack。

注意:消息投递成功但可能路由失败了,此时会通过Publisher Confirm返回ack,通过Publisher Return回调方法返回异常信息。

我们在发送消息时给每个消息指定一个唯一ID,设置回调方法,如果Publisher Return失败或Publisher Confirm返回nack,我们在回调方法中解析失败消息,并记录到失败表由定时任务去异步重新发送。

1.3.消息持久化

为了提升性能,默认情况下MQ的数据都是在内存存储的临时数据,重启后就会消失。为了保证数据的可靠性,必须配置持久化,包括:

交换机持久化队列持久化消息持久化

我们以控制台界面为例来说明。

1.3.1 交换机持久化

交换机持久化是指将交换机的定义信息(元数据)持久化到RabbitMQ的数据库(mnesia)中,RabbitMQ重启后交换机定义仍然存在。

若交换机不设置持久化,在rabbitmq服务重启之后,相关的交换机元数据会丢失,但消息不会丢失,只是不能将消息发送到这个交换机中,所以通常要设置交换机持久化。

在控制台中声明交换机是默认设置持久化的。

设置方法:

在控制台的
Exchanges
页面,添加交换机时可以配置交换机的
Durability
参数:

05-异步消息组件MQ高级

设置为
Durable
就是持久化模式,
Transient
就是临时模式。

设置持久化后在交换机列表会有一个”D”标识

05-异步消息组件MQ高级

1.3.2 队列持久化

队列持久化也是将队列的定义信息(元数据)持久化到RabbitMQ的数据库中。

如果队列不设置持久化,在RabbitMQ重启后队列的元数据丢失。

设置队列持久化可以保证队列本身的元数据不会因异常情况而丢失,队列中存储的是消息的在队列中的位置、消息的ID、存储位置等,消息会存储在独立的rdq数据文件中,队列持久化不能保证消息数据不会丢失。

在控制台的Queues页面,添加队列时,同样可以配置队列的
Durability
参数:

05-异步消息组件MQ高级

除了持久化以外,你可以看到队列还有很多其它参数,有一些我们会在后期学习。

设置持久化后在队列列表会有一个”D”标识

05-异步消息组件MQ高级

1.3.3 消息持久化

要确保消息不会丢失需要将消息设置为持久化

在控制台发送消息的时候,可以添加很多参数,而消息的持久化是要配置一个
properties

05-异步消息组件MQ高级

查看持久消息的内容:

delivery_mode=2 表示持久化

05-异步消息组件MQ高级

1.4 消费消息可靠性

当RabbitMQ向消费者投递消息以后,需要知道消费者的处理状态如何。因为消息投递给消费者并不代表就一定被正确消费了,可能出现的故障有很多,比如:

消息投递的过程中出现了网络故障消费者接收到消息后突然宕机消费者接收到消息后,因处理不当导致异常…

一旦发生上述情况,消息也会丢失。因此,RabbitMQ必须知道消费者的处理状态,一旦消息处理失败能重新投递消息。

但问题来了:RabbitMQ如何得知消费者的处理状态呢?

本章我们就一起研究一下消费者处理消息时的可靠性解决方案。

1.4.1. 消费者确认机制

1.4.1.1 介绍

为了确认消费者是否成功处理消息,RabbitMQ提供了消费者确认机制(Consumer Acknowledgement)。即:当消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知RabbitMQ消息处理状态。回执有三种可选值:

ack:成功处理消息,RabbitMQ从队列中删除该消息nack:消息处理失败,RabbitMQ需要再次投递消息reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息

一般reject方式用的较少,除非是消息格式有问题,那就是开发问题了。因此大多数情况下我们需要将消息处理的代码通过
try catch
机制捕获,消息处理成功时返回ack,处理失败时返回nack.

SpringAMQP帮我们实现了消息确认,并可以通过配置文件设置消息确认的处理方式,有三种模式:


none
:不处理。即消息投递给消费者后消息会立刻从MQ删除。非常不安全,不建议使用
manual
:手动模式。需要自己在业务代码中调用api,发送
ack

reject
,存在业务入侵,但更灵活
auto
:自动模式。当业务正常执行时则自动返回
ack
. 当业务出现异常时,根据异常判断返回不同结果:

如果是业务异常,会自动返回
nack
;如果是消息处理或校验异常,自动返回
reject
,返回的异常包括:MessageConversionException、MethodArgumentTypeMismatchException等

1.4.1.2 auto模式测试

通过下面的配置可以修改消息确认的处理方式为auto:



spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: auto # 自动ack

修改consumer服务的SpringRabbitListener类中的方法,模拟一个消息处理的异常:



@RabbitListener(queues = "simple.queue")
public void listenSimpleQueueMessage2(String msg) throws InterruptedException {
    log.info("spring 消费者接收到消息:【" + msg + "】");
    if (true) {
        throw new RuntimeException("故意的");
    }
    log.info("消息处理完成");
}

在此方法内第一句打断点,我们向队列“simple.queue”发一条消息,此时进入断点

05-异步消息组件MQ高级

可以发现此时有一条消息状态为
unacked
(未确定状态):

05-异步消息组件MQ高级

放行以后,由于抛出的是业务异常,消息处理失败后,会回到RabbitMQ,并重新投递到消费者。

1.4.1.3
manual
模式测试(自学)

通常在应用程序中会使用
auto
自动模式,手动模式在生产中不常用可以自行学习。

通过 api指定是否重新入队,具体返回ack或nack通过手动编程实现。

由于手动模式需要通过api编程,需要在监听方法添加Channel、Message类型的参数,如下:

Message:是spring AMQP封装的底层消息对象。

Channel:是消费端与MQ基于通道的操作对象。



@RabbitListener(queues = "simple.queue")
public void listenSimpleQueueMessage(String msg, Channel channel, Message message) throws InterruptedException, IOException {
    log.info("spring 消费者接收到消息:【" + msg + "】");
    //返回nack
    //每个参数的意义:1.消息的标记 2.是否确认之前所有未确认的消息 3.是否重新入队
    channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
    //        log.info("消息处理完成");
    //        //返回ack,每个参数的意义:1.消息的标记 2.是否确认之前所有消息
    //        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
1.4.1.4 小结

消息者确认机制怎么实现?

消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知RabbitMQ消息处理状态:

ack:成功处理消息,RabbitMQ从队列中删除该消息nack:消息处理失败,RabbitMQ需要再次投递消息reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息

具体实现方法:

SpringAMQP提供消息确认配置,有三种模式:


none
:不处理。即消息投递给消费者后消息会立刻从MQ删除。非常不安全,不建议使用。
manual
:手动模式。需要自己在业务代码中调用api,发送
ack

reject

auto
:自动模式。当业务正常执行时则自动返回
ack
. 当业务出现异常时会自动返回
nack
.

我们通常使用auto自动模式,业务处理完成没有异常则自动返回ack,如果存在业务异常则自动返回nack,如果在消息处理或消息校验异常时自动返回reject。

除了自动确认还要了解手动确认,手动确认是将返回ack或nack在程序中编码实现,举例如下:

返回ack:



//返回ack,每个参数的意义:1.消息的标记 2.是否确认之前所有消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

返回nack:



//每个参数的意义:1.消息的标记 2.是否确认之前所有消息 3.是否重新入队
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);

1.4.2. 失败重试机制

1.4.2.1 本地重试机制

当消费者出现异常后,消息会不断requeue(重入队)到队列,再重新发送给消费者。如果消费者再次执行依然出错,消息会再次返回到队列,再次投递,直到消息处理成功为止。

极端情况就是消费者一直无法执行成功,那么消息投递就会无限循环,导致mq的消息处理飙升,带来不必要的压力:

05-异步消息组件MQ高级

当然,上述极端情况发生的概率还是非常低的,不过不怕一万就怕万一。为了应对上述情况Spring又提供了消费者失败重试机制:在消费者出现异常时利用本地重试,而不是无限制的投递到mq队列。

修改consumer服务的application.yml文件,添加内容:



spring:
  rabbitmq:
    listener:
      simple:
        retry:
          enabled: true # 开启消费者失败重试
          initial-interval: 1000ms # 初识的失败等待时长为1秒
          multiplier: 1 # 失败的等待时长倍数,下次等待时长 = 上次等待时长 * multiplier
          max-attempts: 3 # 最大重试次数

重启consumer服务,重复之前的测试。可以发现:

消费者在失败后消息没有重新回到MQ无限重新投递,而是在本地重试了3次本地重试3次后查看RabbitMQ控制台,发现消息被删除,说明最后SpringAMQP返回
reject

结论:

开启本地重试时,消息处理过程中抛出异常,不会请求到队列,而是在消费者本地重试重试达到最大次数后,Spring会返回reject,消息会被丢弃

1.4.2.2 失败消息入队

本地测试达到最大重试次数后,消息会被丢弃。这在某些对于消息可靠性要求较高的业务场景下,显然不太合适了。

因此Spring允许我们自定义重试次数耗尽后的消息处理策略,这个策略是由
MessageRecovery
接口来定义的,它有3个不同实现:


RejectAndDontRequeueRecoverer
:重试耗尽后,直接
reject
,丢弃消息。默认就是这种方式
ImmediateRequeueMessageRecoverer
:重试耗尽后,返回
nack
,消息重新入队
RepublishMessageRecoverer
:重试耗尽后,将失败消息投递到指定的交换机

比较优雅的一种处理方案是
RepublishMessageRecoverer
,失败后将消息投递到一个固定交换机,通过交换机将消息转发到失败消息队列,程序监听失败消息队列,接收到失败消息,将失败消息存入失败消息表,通过定时任务进行处理。

1)在consumer服务中定义处理失败消息的交换机和队列



@Bean
public DirectExchange errorMessageExchange(){
    return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue(){
    return new Queue("error.queue", true);
}
@Bean
public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
    return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
}

2)定义一个RepublishMessageRecoverer,指定失败消息投递交换机的名称及routingkey



@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
    return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
}

完整代码如下:



package com.itheima.consumer.config;
 
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.context.annotation.Bean;
 
@Configuration
@ConditionalOnProperty(name = "spring.rabbitmq.listener.simple.retry.enabled", havingValue = "true")
public class ErrorMessageConfig {
    @Bean
    public DirectExchange errorMessageExchange(){
        return new DirectExchange("error.direct");
    }
    @Bean
    public Queue errorQueue(){
        return new Queue("error.queue", true);
    }
    @Bean
    public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
        return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
    }
 
    @Bean
    public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
        return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
    }
}
1.4.2.3 测试失败消息入队

测试流程:

启动消费端程序:



@RabbitListener(queues = "simple.queue")
public void listenSimpleQueueMessage(String msg) throws InterruptedException {
    log.info("spring 消费者接收到消息:【" + msg + "】");
    if (true) {
        throw new RuntimeException("故意的");
    }
    log.info("消息处理完成");
}

达到最大重试次数将会投递到失败消息队列。

05-异步消息组件MQ高级

监听失败消息队列将失败消息写入数据库中,由人工定期处理。



@RabbitListener(queues = "error.queue")
public void listenErrorQueue(String msg) throws InterruptedException {
    System.out.println("接收失败消息:【" + msg + "】" + LocalTime.now());
    //存入数据库失败消息表...
}
1.4.2.4 小结

消费消息失败重试机制是什么?

提供三种策略:


RejectAndDontRequeueRecoverer
:重试耗尽后,直接
reject
,丢弃消息。默认就是这种方式
ImmediateRequeueMessageRecoverer
:重试耗尽后,返回
nack
,消息重新入队
RepublishMessageRecoverer
:重试耗尽后,将失败消息投递到指定的交换机

推荐使用
RepublishMessageRecoverer
,将失败消息投递到固定的交换机,通过交换机将消息转发到失败消息队列,程序监听失败消息队列,接收到失败消息,将失败消息存入失败消息表,通过定时任务进行处理。

1.4.3. MQ消息幂等性

1.4.3.1 什么是幂等性

何为幂等性?

在程序开发中,是指同一个业务,执行一次或多次对业务状态的影响是一致的。例如:

根据id删除数据查询数据

但数据的更新往往不是幂等的,如果重复执行可能造成不一样的后果。比如:

取消订单,恢复库存的业务。如果多次恢复就会出现库存重复增加的情况退款业务。重复退款对商家而言会有经济损失。

所以,我们要尽可能避免业务被重复执行,然而在实际业务场景中,由于意外经常会出现业务被重复执行的情况。例如:

页面卡顿时频繁刷新导致表单重复提交服务间调用的重试MQ消息的重复投递

因此,我们必须想办法保证消息处理的幂等性。这里给出两种方案:

唯一消息ID业务状态判断

幂等专题PPT+工程代码:提取码: aj6p

https://pan.baidu.com/s/1GHoA05nTTacO6av9qHjw1w?pwd=aj6p

1.4.4.2 唯一消息ID

这个思路非常简单:

每一条消息都生成一个唯一的id,与消息一起投递给消费者。消费者接收到消息后处理自己的业务,业务处理成功后将消息ID保存到数据库或Redis如果下次又收到相同消息,去数据库或Redis查询判断是否存在,存在则为重复消息放弃处理。

我们该如何给消息添加唯一ID呢?

其实很简单,SpringAMQP的MessageConverter自带了MessageID的功能,我们只要开启这个功能即可。

以Jackson的消息转换器为例:



@Bean
public MessageConverter messageConverter(){
    // 1.定义消息转换器
    Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();
    // 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
    jjmc.setCreateMessageIds(true);
    return jjmc;
}

接收消息的方法中添加org.springframework.amqp.core.Message类型的参数,整体的处理逻辑是:

接收消息解析出消息id根据消息id查询数据库已处理消息表如果存在说明已处理该消息,如果查询不到说明未处理该消息根据判断结果去处理消息处理消息完毕将消息id写入已处理消息表。

代码如下:



@RabbitListener(queues = "simple.queue")
public void listenSimpleQueueMessage(String msg,Channel channel,Message message) throws InterruptedException {
    log.info("spring 消费者接收到消息:【" + msg + "】");
    //消息id
    String messageId = message.getMessageProperties().getMessageId();
    //先从数据库查询是否已处理该消息,否则已处理直接return
    if (true) {
        throw new RuntimeException("故意的");
    }
    //将已处理的消息id存入数据库中...
    log.info("消息处理完成");
}
1.4.4.3 业务判断

业务判断就是基于业务本身的逻辑或状态来判断是否是重复的请求,不同的业务场景判断的思路也不一样。

例如在支付通知案例中,处理消息的业务逻辑是把订单状态从未支付修改为已支付。因此我们就可以在执行更新时判断订单状态是否是未支付,如果不是则证明订单已经被处理过,无需重复处理。

这里的SQL就类似于:where ** and status='1',这种思想就是CAS(compare and swap)乐观锁思想

乐观锁、悲观锁的思想很重要,大家务必学习,概述如下

乐观锁:认为不一定发生,常见方案:CAS

悲观锁:认为一定会发生,常见方案:单机synchronzed、ReetrantLock,分布式Redisson、Zookeeper等

相比较而言,使用唯一消息ID的方案需要操作数据库或Redis保存消息ID,所以更推荐使用业务判断的方案。

以支付修改订单的业务为例,我们需要修改
OrderServiceImpl
中的
markOrderPaySuccess
方法:



@Override
public void markOrderPaySuccess(Long orderId) {
    // 1.查询订单
    Order old = getById(orderId);
    // 2.判断订单状态
    if (old == null || old.getStatus() != 1) {
        // 订单不存在或者订单状态不是1,放弃处理
        return;
    }
    // 3.尝试更新订单
    Order order = new Order();
    order.setId(orderId);
    order.setStatus(2);
    order.setPayTime(LocalDateTime.now());
    updateById(order);
}

直接update语句中添加条件也可以,如下:



@Override
public void markOrderPaySuccess(Long orderId) {
    // UPDATE `order` SET status = ? , pay_time = ? WHERE id = ? AND status = 1
    lambdaUpdate()
    .set(Order::getStatus, 2)
    .set(Order::getPayTime, LocalDateTime.now())
    .eq(Order::getId, orderId)
    // 这个是判断的关键
    .eq(Order::getStatus, 1)
    .update();
}

注意看,上述代码等同于这样的SQL语句:


UPDATE `order` SET status = ? , pay_time = ? WHERE id = ? AND status = 1

我们在where条件中除了判断id以外,还加上了status必须为1的条件。如果条件不符(说明订单已支付),则SQL匹配不到数据,根本不会执行。

1.4.4.4 面试题

如何保证MQ幂等性?或 如何防止消息重复消费?

1.4.4. 业务补偿

虽然我们利用各种机制尽可能增加了消息的可靠性,但也不好说能保证消息100%的可靠。万一真的MQ通知失败该怎么办呢?有没有其它补偿方案,能够确保订单的支付状态一致呢?

其实思想很简单:既然MQ通知不一定发送到交易服务,那么交易服务就必须自己主动去查询支付状态。这样即便支付服务的MQ通知失败,我们依然能通过主动查询来保证订单状态的一致。流程如下:

05-异步消息组件MQ高级

支付宝、微信等支付也是这种思路:异步回调(VX告诉你是否成功) + 主动查询(你主动查是否成功)

图中黄色线圈起来的部分就是MQ通知失败后的补偿处理方案,由交易服务自己主动去查询支付状态。

不过需要注意的是,交易服务并不知道用户会在什么时候支付,如果查询的时机不正确(比如查询的时候用户正在支付中),可能查询到的支付状态也不正确。

那么问题来了,我们到底该在什么时间主动查询支付状态呢?

由用户在页面中主动点击“已完成支付”按钮时去查询最新的支付结果。

05-异步消息组件MQ高级

通过定时任务去定义查询支付结果,比如:对于30分钟内的订单状态为未支付状态时去主动查询支付结果,超过30分钟用户未支付将自动取消订单。

可以每隔一段时间查询一次,并判断支付状态。如果发现订单已经支付,则立刻更新订单状态为已支付即可。定时任务大家之前学习过,具体的实现这里就不再赘述了。

至此,消息可靠性的问题已经解决了。

综上,支付服务与交易服务之间的订单状态一致性是如何保证的?

首先,支付服务在用户支付成功以后利用MQ消息通知交易服务,完成订单状态同步。其次,为了保证MQ消息的可靠性,我们采用了生产者确认机制、消费者确认、消费者失败重试等策略,确保消息投递的可靠性最后,我们提供了用户主动查询支付结果的入口 ,并且还在交易服务设置了定时任务,定期查询订单支付状态。这样即便MQ通知失败,还可以利用定时任务作为兜底方案,确保订单支付状态的最终一致性。

1.4.5. 面试题

如何保证消息的可靠性?可以百分百保证MQ的消息可靠性吗?

如何保证MQ幂等性?或 如何防止消息重复消费?

2.延迟消息

2.1. 技术方案

在电商的支付业务中,对于一些库存有限的商品,为了更好的用户体验,通常都会在用户下单时立刻扣减商品库存。例如电影院购票、高铁购票,下单后就会锁定座位资源,其他人无法重复购买。

但是这样就存在一个问题,假如用户下单后一直不付款,就会一直占有库存资源,导致其他客户无法正常交易,最终导致商户利益受损!

面试官真的会问:你们什么时候扣减库存?

加入购物车:不合理创建订单:有可能,注意超时未支付的自动取消订单+释放库存实际支付:有可能,直接扣减库存即可,注意分布式事务

因此,电商中通常的做法就是:对于超过一定时间未支付的订单会自动取消订单并释放占用的库存

例如,订单支付超时时间为30分钟,则我们应该在用户下单后的第30分钟检查订单支付状态,如果发现未支付,应该立刻取消订单,释放库存。

但问题来了:如何才能准确的实现在下单后第30分钟去检查支付状态呢?

像这种在一段时间以后才执行的任务,我们称之为延迟任务,而要实现延迟任务,最简单的方案就是利用MQ的延迟消息了。

在RabbitMQ中实现延迟消息也有两种方案:

2.1.1 死信交换机+TTL

该方案利用死信交换机实现,死信交换机(Dead Letter Exchange, DLX)是一种处理消息队列中无法被消费的消息的方式。当消息因为某些原因(如消费者拒绝消费消息、消息过期等)而无法被正常消费时,这些消息就会成为“死信”。

结合 TTL(Time To Live),你可以设定消息在队列中的存活时间,当消息在队列中停留的时间超过了设定的 TTL 后,消息就会成为死信。当消息变成死信时,会被发送到死信交换机中去,通过死信交换机转发到指定的队列,由应用程序去消费,进一步处理这些消息。

假设需求是下单后30分钟对未支付订单自动取消订单,通过下图举例说明:

05-异步消息组件MQ高级

流程如下:

创建订单成功向“ttl.fanout”交换机发送消息,消息内容记录下单的订单信息,消息的TTL为30分钟通过”ttl.fanout”将消息转发到”ttl.queue”队列由于该队列并没有消费程序去监听,当到达30分钟时该消息变为死信,自动发给死信交换机“hmall.direct”,由死信交换机将消息转发到”direct.queue1″队列。应用监听“direct.queue1”队列,收到未支付超时订单,执行取消订单并回滚库存。

此方案的问题是:

当一个消息的TTL到期以后不一定会被移除或投递到死信交换机,而是在消息恰好处于队首时才会被处理。

当队列中消息堆积很多的时候,过期消息可能不会被按时处理,因此你设置的TTL时间不一定准确。

2.1.2 延迟消息插件

RabbitMQ社区提供了一个延迟消息插件来实现相同的效果。

此方案实现过程非常简单,安装延迟消息插件,发送消息时指定延迟时间,到达延迟时间消息被消费。

延迟消息插件内部会维护一个本地数据库表,同时使用Elang Timers功能实现计时。如果消息的延迟时间设置较长,可能会导致堆积的延迟消息非常多,会带来较大的CPU开销,同时延迟消息的时间会存在误差。

因此,不建议设置延迟时间过长的延迟消息

对上述两个方案比较,实现延迟消息推荐使用延迟消息插件实现。

2.2.使用延迟消息插件

官方文档说明:https://blog.rabbitmq.com/posts/2015/04/scheduling-messages-with-rabbitmq

2.2.1下载

插件下载地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange

由于我们安装的MQ是
3.8
版本,因此这里下载
3.8.17
版本:

05-异步消息组件MQ高级

当然,也可以直接使用课前资料提供好的插件:

05-异步消息组件MQ高级

2.2.2 安装

因为我们是基于Docker安装,所以需要先查看RabbitMQ的插件目录对应的数据卷。


docker volume inspect mq-plugins

结果如下:



[
    {
        "CreatedAt": "2024-06-19T09:22:59+08:00",
        "Driver": "local",
        "Labels": null,
        "Mountpoint": "/var/lib/docker/volumes/mq-plugins/_data",
        "Name": "mq-plugins",
        "Options": null,
        "Scope": "local"
    }
]

插件目录被挂载到了
/var/lib/docker/volumes/mq-plugins/_data
这个目录,我们上传插件到该目录下。接下来执行命令,安装插件【就算本地插件已有,这里也要执行,否则不生效】:


docker exec -it mq rabbitmq-plugins enable rabbitmq_delayed_message_exchange

运行结果如下:

05-异步消息组件MQ高级

2.2.3 声明延迟交换机

控制台方式(二选一):

05-异步消息组件MQ高级

创建delay.queue队列,并绑定到延迟交换机

05-异步消息组件MQ高级

基于注解方式(二选一):



@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "delay.queue", durable = "true"),
        exchange = @Exchange(name = "delay.direct", delayed = "true",type = ExchangeTypes.DIRECT, durable = "true"),
        key = "delay"
))
public void listenDelayMessage(String msg){
    log.info("接收到delay.queue的延迟消息:{}", msg);
}

2.2.4 发送延迟消息

发送消息时,必须通过x-delay属性设定延迟时间:



@Test
void testPublisherDelayMessage() {
    // 1.创建消息
    String message = "hello, delayed message";
    // 2.发送消息,利用消息后置处理器添加消息头
    rabbitTemplate.convertAndSend("delay.direct", "delay", message, new MessagePostProcessor() {
        @Override
        public Message postProcessMessage(Message message) throws AmqpException {
            // 添加延迟消息属性
            message.getMessageProperties().setDelay(5000);
            log.info("发送消息"+new String(message.getBody(), StandardCharsets.UTF_8)+ LocalDateTime.now());
            return message;
        }
    });
}

2.2.5 测试

测试流程:

启动消费端监听程序。

在consumer模块添加监听方法。



@RabbitListener(queues = "delay.queue")
public void listenDelayQueue(String msg) throws InterruptedException {
    System.out.println("接收延迟消息:【" + msg + "】" + LocalTime.now());
}

执行发送程序观察发送程序日志中发送的消息的时间,观察监听程序的日志中接收消息时间的,判断是否是按延迟时间发送消息。

注意:发送延迟消息会触发ReturnCallback,这是因为延迟消息并没有发送到队列,而是在延迟时间到达才发送到队列,所以会触发ReturnCallback,需要在ReturnCallback方法中判断,如果是延迟消息不要向失败消息表插入记录。

2.3.自动取消超时未支付订单(作业)

2.3.1. 需求分析

接下来,我们就在交易服务中利用延迟消息实现订单超时取消功能,假设订单未支付超时时间为30分钟,下单后30内不支付将自动取消订单,其大概思路如下:

05-异步消息组件MQ高级

理论上我们应该在下单时发送一条延迟消息,延迟时间为30分钟。这样就可以在接收到消息时检验订单支付状态,关闭未支付订单。

不过大家设想,极端情况如果用户在下单后正好30分钟去支付了,此时延迟消息发送到交易服务,交易服务收到延迟消息后取消订单,用户支付成功,问题出现。

我们可以把延迟消息的时间定为35分钟,当下单30分钟后用户无法支付,35分钟后自动取消订单避免取消订单后用户支付成功的问题。交互流程如下:

05-异步消息组件MQ高级

2.3.2 基础配置

我们在常量类中配置交换机、队列、RoutingKey等常量,内容如下:



package com.hmall.common.constants;
 
public interface MqConstants{
    String DELAY_EXCHANGE_NAME = "trade.delay.direct";
    String DELAY_ORDER_QUEUE_NAME = "trade.delay.order.queue";
    String DELAY_ORDER_KEY = "delay.order.query";
}


trade-service
模块的
pom.xml
中引入amqp的依赖:



<!--amqp-->
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>


trade-service

application.yaml
中添加MQ的配置:



spring:
  rabbitmq:
    host: 192.168.101.68
    port: 5672
    virtual-host: /hmall
    username: hmall
    password: 123

2.3.3. 发送延迟消息

接下来,我们改造下单业务,在下单完成后,发送延迟消息。

修改
trade-service
模块的
com.hmall.trade.service.impl.OrderServiceImpl
类的
createOrder
方法,添加消息发送的代码:



@Override
//    @Transactional
@GlobalTransactional
public Long createOrder(OrderFormDTO orderFormDTO) {
    ....
    ....
    //todo 发送延迟消息
    
    return order.getId();
}

这里延迟消息的时间应该是35分钟,不过我们为了测试方便暂时设置为10秒。

2.3.4. 接收延迟消息

接下来,我们在
trader-service
编写一个监听器,监听延迟消息,查询订单支付状态:

05-异步消息组件MQ高级

代码如下:



package com.hmall.trade.listener;
 
import com.hmall.api.client.PayClient;
import com.hmall.api.dto.PayOrderDTO;
import com.hmall.common.constants.MqConstants;
import com.hmall.trade.domain.po.Order;
import com.hmall.trade.service.IOrderService;
import lombok.RequiredArgsConstructor;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
 
@Component
@RequiredArgsConstructor
public class OrderDelayMessageListener {
 
    private final IOrderService orderService;
 
    @RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = MqConstants.DELAY_ORDER_QUEUE_NAME),
        exchange = @Exchange(name = MqConstants.DELAY_EXCHANGE_NAME, delayed = "true"),
        key = MqConstants.DELAY_ORDER_KEY
    ))
    public void listenOrderDelayMessage(Long orderId) {
        // 1.查询订单
        Order order = orderService.getById(orderId);
        // 2.检测订单状态,判断是否已支付
        if (order == null || order.getStatus() != 1) {
            // 订单不存在或者已经支付
            return;
        }
        // 3.todo 未支付,需要查询支付流水状态
 
        // 4.判断是否支付
 
        // 4.1.已支付,标记订单状态为已支付
        orderService.markOrderPaySuccess(orderId);
 
        // TODO 4.2.未支付,取消订单,回滚库存
 
    }
}

逻辑如下:

1、收到消息表示支付成功,查询支付系统最新的支付状态

2、如果已支付则标记订单为已支付

3、如果未支付则取消订单

取消订单需要完成两件事情:

将订单状态修改为已关闭将订单中已经扣除的库存重新加回来

大家在
IOrderService
接口中定义
cancelOrder
方法,此方法为取消订单的service方法。


void cancelOrder(Long orderId);

并且在
OrderServiceImpl
中实现该方法,实现过程中要注意业务幂等性判断。

2.3.5. 编写查询支付状态接口

由于MQ消息处理时需要查询当前最新的支付状态,因此我们要在
pay-service
模块定义查询支付状态的接口


pay-service
模块的
PayController
中实现该接口:



@ApiOperation("根据id查询支付单")
@GetMapping("/biz/{id}")
public PayOrderDTO queryPayOrderByBizOrderNo(@PathVariable("id") Long id){
    PayOrder payOrder = payOrderService.lambdaQuery().eq(PayOrder::getBizOrderNo, id).one();
    return BeanUtils.copyBean(payOrder, PayOrderDTO.class);
}

此接口会在api工程定义,所以将PayOrderDTO 在api工程定义,PayOrderDTO 的结果同PayOrder ,为防止返回数据有变这里定义为DTO类型,编写时可直接将PayOrder 拷贝至PayOrderDTO 中并进行微改。


PayOrderDTO
代码如下:



package com.hmall.api.pay.dto;
 
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
 
import java.time.LocalDateTime;
 
/**
 * <p>
 * 支付订单
 * </p>
 */
@Data
@ApiModel(description = "支付单数据传输实体")
public class PayOrderDTO {
    @ApiModelProperty("id")
    private Long id;
    @ApiModelProperty("业务订单号")
    private Long bizOrderNo;
    @ApiModelProperty("支付单号")
    private Long payOrderNo;
    @ApiModelProperty("支付用户id")
    private Long bizUserId;
    @ApiModelProperty("支付渠道编码")
    private String payChannelCode;
    @ApiModelProperty("支付金额,单位分")
    private Integer amount;
    @ApiModelProperty("付类型,1:h5,2:小程序,3:公众号,4:扫码,5:余额支付")
    private Integer payType;
    @ApiModelProperty("付状态,0:待提交,1:待支付,2:支付超时或取消,3:支付成功")
    private Integer status;
    @ApiModelProperty("拓展字段,用于传递不同渠道单独处理的字段")
    private String expandJson;
    @ApiModelProperty("第三方返回业务码")
    private String resultCode;
    @ApiModelProperty("第三方返回提示信息")
    private String resultMsg;
    @ApiModelProperty("支付成功时间")
    private LocalDateTime paySuccessTime;
    @ApiModelProperty("支付超时时间")
    private LocalDateTime payOverTime;
    @ApiModelProperty("支付二维码链接")
    private String qrCodeUrl;
    @ApiModelProperty("创建时间")
    private LocalDateTime createTime;
    @ApiModelProperty("更新时间")
    private LocalDateTime updateTime;
}

接下来定义接口对应的
FeignClient
.


PayClient
代码如下:



package com.hmall.api.client;
 
import com.hmall.api.client.fallback.PayClientFallback;
import com.hmall.api.dto.PayOrderDTO;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
 
@FeignClient(value = "pay-service", fallbackFactory = PayClientFallbackFactory .class)
public interface PayClient {
    /**
     * 根据交易订单id查询支付单
     * @param id 业务订单id
     * @return 支付单信息
     */
    @GetMapping("/pay-orders/biz/{id}")
    PayOrderDTO queryPayOrderByBizOrderNo(@PathVariable("id") Long id);
}


PayClientFallbackFactory
代码如下:



package com.hmall.api.client.fallback;
 
import com.hmall.api.client.PayClient;
import com.hmall.api.dto.PayOrderDTO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.openfeign.FallbackFactory;
 
@Slf4j
public class PayClientFallbackFactory implements FallbackFactory<PayClient> {
    @Override
    public PayClient create(Throwable cause) {
        return new PayClient() {
            @Override
            public PayOrderDTO queryPayOrderByBizOrderNo(Long id) {
                return null;
            }
        };
    }
}

最终在
hm-api
模块定义三个类:

05-异步消息组件MQ高级

说明:

PayOrderDTO:支付单的数据传输实体PayClient:支付系统的Feign客户端PayClientFallback:支付系统的fallback逻辑

3.MQ集群(拓展)

生产环境通常会搭建MQ集群,通常运维人员负责,如果工作中让Java工程师负责我们只需要参考部署文档进行部署即可。参考:RabbitMQ集群部署

作业

失败消息定时任务重发

参考 1.2.4 小节中描述实现

自动取消超时未支付订单

参考 2.3描述完成“自动取消超时未支付订单”功能。

© 版权声明

相关文章

暂无评论

none
暂无评论...