RocketMQ 实战手册:解锁分布式消息队列核心技能

引言:为什么选择 RocketMQ?

在分布式系统架构中,消息队列扮演着至关重要的角色,它不仅能实现系统解耦、流量削峰,还能保障数据最终一致性。Apache RocketMQ 作为阿里开源的分布式消息中间件,凭借其高吞吐、低延迟、高可靠等特性,已成为金融、电商等核心业务场景的首选。

本文将带你从零开始,深入 RocketMQ 实战,涵盖所有核心配置类与工具类,通过可直接运行的实例,让你快速掌握 RocketMQ 的实战技能。无论你是刚接触消息队列的开发者,还是需要优化现有 RocketMQ 应用的工程师,本文都能为你提供全面且实用的指导。

RocketMQ 核心架构与原理

基本架构

RocketMQ 的核心架构由四部分组成:NameServer、Broker、Producer 和 Consumer。

RocketMQ 实战手册:解锁分布式消息队列核心技能

NameServer:轻量级路由注册中心,管理 Broker 节点信息,提供路由发现服务Broker:消息存储和转发服务器,负责消息的存储、投递和查询等功能Producer:消息生产者,负责创建和发送消息Consumer:消息消费者,负责订阅和消费消息

消息传递流程

RocketMQ 实战手册:解锁分布式消息队列核心技能

生产者启动时向 NameServer 注册,并获取 Topic 对应的 Broker 路由信息生产者根据路由信息将消息发送到对应的 BrokerBroker 将消息存储在磁盘中,并同步到从节点(主从架构时)消费者启动时向 NameServer 注册,并订阅指定 Topic消费者通过拉取或推送方式从 Broker 获取消息消费者处理消息后向 Broker 返回消费结果(ACK)Broker 根据消费结果更新消息状态

环境准备与项目初始化

安装 RocketMQ

下载最新稳定版 RocketMQ(本文使用 5.2.0 版本)



wget https://archive.apache.org/dist/rocketmq/5.2.0/rocketmq-all-5.2.0-bin-release.zip
unzip rocketmq-all-5.2.0-bin-release.zip
cd rocketmq-all-5.2.0-bin-release

启动 NameServer


nohup sh bin/mqnamesrv &

启动 Broker


nohup sh bin/mqbroker -n localhost:9876 &

验证启动是否成功



jps
# 应看到NamesrvStartup和BrokerStartup进程

创建 Maven 项目

创建 Spring Boot 项目,
pom.xml
配置如下:



<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.2.0</version>
        <relativePath/>
    </parent>
    <groupId>com.ken.rocketmq</groupId>
    <artifactId>rocketmq-demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>rocketmq-demo</name>
    <description>RocketMQ实战示例项目</description>
    
    <properties>
        <java.version>17</java.version>
        <rocketmq.version>5.2.0</rocketmq.version>
        <fastjson2.version>2.0.45</fastjson2.version>
        <mybatis-plus.version>3.5.5</mybatis-plus.version>
        <lombok.version>1.18.30</lombok.version>
        <springdoc.version>2.1.0</springdoc.version>
        <guava.version>33.1.0-jre</guava.version>
    </properties>
    
    <dependencies>
        <!-- Spring Boot核心 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        
        <!-- RocketMQ -->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>${rocketmq.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>${rocketmq.version}</version>
        </dependency>
        
        <!-- Lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>${lombok.version}</version>
            <scope>provided</scope>
        </dependency>
        
        <!-- Fastjson2 -->
        <dependency>
            <groupId>com.alibaba.fastjson2</groupId>
            <artifactId>fastjson2</artifactId>
            <version>${fastjson2.version}</version>
        </dependency>
        
        <!-- MyBatis-Plus -->
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
            <version>${mybatis-plus.version}</version>
        </dependency>
        
        <!-- MySQL驱动 -->
        <dependency>
            <groupId>com.mysql</groupId>
            <artifactId>mysql-connector-j</artifactId>
            <scope>runtime</scope>
        </dependency>
        
        <!-- Swagger3 -->
        <dependency>
            <groupId>org.springdoc</groupId>
            <artifactId>springdoc-openapi-starter-webmvc-ui</artifactId>
            <version>${springdoc.version}</version>
        </dependency>
        
        <!-- Guava -->
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>${guava.version}</version>
        </dependency>
        
        <!-- 测试 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

配置文件


application.yml
配置:



spring:
  application:
    name: rocketmq-demo
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://localhost:3306/rocketmq_demo?useUnicode=true&characterEncoding=utf8&serverTimezone=Asia/Shanghai
    username: root
    password: root
 
rocketmq:
  name-server: 127.0.0.1:9876
  producer:
    group: demo-producer-group
    send-message-timeout: 3000
    retry-times-when-send-failed: 2
    retry-times-when-send-async-failed: 2
  consumer:
    group: demo-consumer-group
    consume-thread-min: 20
    consume-thread-max: 64
    consume-message-batch-max-size: 1
 
mybatis-plus:
  mapper-locations: classpath*:mapper/**/*.xml
  type-aliases-package: com.ken.rocketmq.entity
  configuration:
    map-underscore-to-camel-case: true
    log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
 
springdoc:
  api-docs:
    path: /api-docs
  swagger-ui:
    path: /swagger-ui.html
    operationsSorter: method
 
server:
  port: 8080

核心配置类

生产者配置类



package com.ken.rocketmq.config;
 
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.StringUtils;
 
/**
 * RocketMQ生产者配置类
 * 配置并初始化DefaultMQProducer实例
 *
 * @author ken
 */
@Configuration
@Slf4j
public class RocketMQProducerConfig {
 
    @Value("${rocketmq.name-server}")
    private String nameServer;
 
    @Value("${rocketmq.producer.group}")
    private String producerGroup;
 
    @Value("${rocketmq.producer.send-message-timeout}")
    private int sendMessageTimeout;
 
    @Value("${rocketmq.producer.retry-times-when-send-failed}")
    private int retryTimesWhenSendFailed;
 
    /**
     * 创建默认的消息生产者
     *
     * @return DefaultMQProducer实例
     * @throws MQClientException 初始化生产者异常
     */
    @Bean
    public DefaultMQProducer defaultMQProducer() throws MQClientException {
        // 验证生产者组配置
        StringUtils.hasText(producerGroup, "生产者组不能为空");
        StringUtils.hasText(nameServer, "NameServer地址不能为空");
        
        // 创建生产者实例
        DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
        
        // 配置NameServer地址
        producer.setNamesrvAddr(nameServer);
        
        // 设置发送超时时间
        producer.setSendMsgTimeout(sendMessageTimeout);
        
        // 设置同步发送失败重试次数
        producer.setRetryTimesWhenSendFailed(retryTimesWhenSendFailed);
        
        // 启动生产者
        producer.start();
        
        log.info("DefaultMQProducer初始化成功,生产者组:{},NameServer:{}", producerGroup, nameServer);
        return producer;
    }
}

消费者配置类



package com.ken.rocketmq.config;
 
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.StringUtils;
 
/**
 * RocketMQ消费者配置类
 * 配置并初始化DefaultMQPushConsumer实例
 *
 * @author ken
 */
@Configuration
@Slf4j
public class RocketMQConsumerConfig {
 
    @Value("${rocketmq.name-server}")
    private String nameServer;
 
    @Value("${rocketmq.consumer.group}")
    private String consumerGroup;
 
    @Value("${rocketmq.consumer.consume-thread-min}")
    private int consumeThreadMin;
 
    @Value("${rocketmq.consumer.consume-thread-max}")
    private int consumeThreadMax;
 
    @Value("${rocketmq.consumer.consume-message-batch-max-size}")
    private int consumeMessageBatchMaxSize;
 
    /**
     * 创建默认的推模式消费者
     *
     * @return DefaultMQPushConsumer实例
     * @throws MQClientException 初始化消费者异常
     */
    @Bean
    public DefaultMQPushConsumer defaultMQPushConsumer() throws MQClientException {
        // 验证消费者组配置
        StringUtils.hasText(consumerGroup, "消费者组不能为空");
        StringUtils.hasText(nameServer, "NameServer地址不能为空");
        
        // 创建消费者实例
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
        
        // 配置NameServer地址
        consumer.setNamesrvAddr(nameServer);
        
        // 设置消费线程池最小线程数
        consumer.setConsumeThreadMin(consumeThreadMin);
        
        // 设置消费线程池最大线程数
        consumer.setConsumeThreadMax(consumeThreadMax);
        
        // 设置每次拉取的最大消息数
        consumer.setConsumeMessageBatchMaxSize(consumeMessageBatchMaxSize);
        
        // 注册消息监听器,实际使用时在具体业务类中实现
        // consumer.registerMessageListener(new MessageListenerConcurrently() {...});
        
        // 启动消费者
        consumer.start();
        
        log.info("DefaultMQPushConsumer初始化成功,消费者组:{},NameServer:{}", consumerGroup, nameServer);
        return consumer;
    }
}

事务消息配置类



package com.ken.rocketmq.config;
 
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.StringUtils;
 
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
 
/**
 * RocketMQ事务消息配置类
 * 配置并初始化TransactionMQProducer实例
 *
 * @author ken
 */
@Configuration
@Slf4j
public class RocketMQTransactionConfig {
 
    @Value("${rocketmq.name-server}")
    private String nameServer;
 
    @Value("${rocketmq.producer.group}")
    private String producerGroup;
 
    /**
     * 事务消息生产者线程池核心线程数
     */
    private static final int CORE_POOL_SIZE = 5;
 
    /**
     * 事务消息生产者线程池最大线程数
     */
    private static final int MAX_POOL_SIZE = 10;
 
    /**
     * 事务消息生产者线程池队列大小
     */
    private static final int QUEUE_CAPACITY = 100;
 
    /**
     * 事务消息生产者线程池空闲线程存活时间
     */
    private static final int KEEP_ALIVE_TIME = 100;
 
    /**
     * 创建事务消息生产者
     *
     * @param transactionListener 事务监听器
     * @return TransactionMQProducer实例
     * @throws MQClientException 初始化事务生产者异常
     */
    @Bean
    public TransactionMQProducer transactionMQProducer(TransactionListener transactionListener) throws MQClientException {
        // 验证配置
        StringUtils.hasText(producerGroup, "生产者组不能为空");
        StringUtils.hasText(nameServer, "NameServer地址不能为空");
        
        // 创建事务消息生产者实例
        TransactionMQProducer producer = new TransactionMQProducer(producerGroup + "-transaction");
        
        // 配置NameServer地址
        producer.setNamesrvAddr(nameServer);
        
        // 创建线程池用于处理事务消息的本地事务和回查
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                CORE_POOL_SIZE,
                MAX_POOL_SIZE,
                KEEP_ALIVE_TIME,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(QUEUE_CAPACITY),
                new ThreadPoolExecutor.CallerRunsPolicy()
        );
        
        // 设置线程池
        producer.setExecutorService(threadPoolExecutor);
        
        // 设置事务监听器
        producer.setTransactionListener(transactionListener);
        
        // 启动事务消息生产者
        producer.start();
        
        log.info("TransactionMQProducer初始化成功,生产者组:{},NameServer:{}", 
                producerGroup + "-transaction", nameServer);
        return producer;
    }
}

消息追踪配置类



package com.ken.rocketmq.config;
 
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.ObjectUtils;
 
/**
 * RocketMQ消息追踪配置类
 * 配置消息发送和消费的追踪功能
 *
 * @author ken
 */
@Configuration
@AutoConfigureAfter(RocketMQAutoConfiguration.class)
@Slf4j
public class RocketMQTraceConfig {
 
    /**
     * 配置消息追踪增强器
     *
     * @param rocketMQTemplate RocketMQ模板
     * @return 消息追踪增强器
     */
    @Bean
    public RocketMQTraceEnhancer rocketMQTraceEnhancer(RocketMQTemplate rocketMQTemplate) {
        if (ObjectUtils.isEmpty(rocketMQTemplate)) {
            log.error("RocketMQTemplate为空,无法初始化消息追踪增强器");
            throw new IllegalArgumentException("RocketMQTemplate不能为空");
        }
        
        RocketMQTraceEnhancer enhancer = new RocketMQTraceEnhancer(rocketMQTemplate);
        log.info("RocketMQ消息追踪增强器初始化成功");
        return enhancer;
    }
 
    /**
     * RocketMQ消息追踪增强器
     * 用于增强消息发送和消费的追踪能力
     */
    public static class RocketMQTraceEnhancer {
        private final RocketMQTemplate rocketMQTemplate;
 
        public RocketMQTraceEnhancer(RocketMQTemplate rocketMQTemplate) {
            this.rocketMQTemplate = rocketMQTemplate;
            // 初始化追踪配置
            initTraceConfig();
        }
 
        /**
         * 初始化追踪配置
         */
        private void initTraceConfig() {
            // 开启消息发送追踪
            rocketMQTemplate.getProducer().setEnableMsgTrace(true);
            // 设置消息追踪的Topic
            rocketMQTemplate.getProducer().setCustomizedTraceTopic("RMQ_SYS_TRACE_TOPIC");
            
            log.info("RocketMQ消息追踪配置初始化完成");
        }
    }
}

核心工具类

消息实体类

首先定义通用的消息实体类,作为消息传递的载体:



package com.ken.rocketmq.entity;
 
import com.alibaba.fastjson2.annotation.JSONField;
import lombok.Data;
 
import java.io.Serializable;
import java.time.LocalDateTime;
 
/**
 * 通用消息实体类
 * 封装RocketMQ消息的基本属性
 *
 * @author ken
 */
@Data
public class RocketMQMessage<T> implements Serializable {
    private static final long serialVersionUID = 1L;
 
    /**
     * 消息ID,唯一标识
     */
    private String messageId;
 
    /**
     * 业务ID,用于业务追踪
     */
    private String businessId;
 
    /**
     * 消息主题
     */
    private String topic;
 
    /**
     * 消息标签
     */
    private String tag;
 
    /**
     * 消息内容
     */
    private T content;
 
    /**
     * 消息发送时间
     */
    @JSONField(format = "yyyy-MM-dd HH:mm:ss")
    private LocalDateTime sendTime;
 
    /**
     * 消息过期时间(毫秒),0表示不过期
     */
    private long expireTime = 0;
 
    /**
     * 延迟级别,0表示不延迟
     * RocketMQ延迟级别:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
     */
    private int delayLevel = 0;
}

消息发送工具类



package com.ken.rocketmq.util;
 
import com.alibaba.fastjson2.JSON;
import com.ken.rocketmq.entity.RocketMQMessage;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
 
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
 
/**
 * RocketMQ消息发送工具类
 * 封装各种类型消息的发送方法
 *
 * @author ken
 */
@Component
@Slf4j
public class RocketMQSendUtil {
 
    @Autowired
    private DefaultMQProducer defaultMQProducer;
 
    @Autowired
    private TransactionMQProducer transactionMQProducer;
 
    /**
     * 发送同步消息
     *
     * @param topic   消息主题
     * @param tag     消息标签
     * @param content 消息内容
     * @param <T>     消息内容类型
     * @return 发送结果
     * @throws MQClientException    客户端异常
     * @throws MQBrokerException    Broker异常
     * @throws RemotingException    远程调用异常
     * @throws InterruptedException 线程中断异常
     */
    public <T> SendResult sendSyncMessage(String topic, String tag, T content)
            throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
        return sendSyncMessage(buildMessage(topic, tag, content, null, 0, 0));
    }
 
    /**
     * 发送同步消息
     *
     * @param topic       消息主题
     * @param tag         消息标签
     * @param content     消息内容
     * @param businessId  业务ID
     * @param expireTime  过期时间(毫秒)
     * @param delayLevel  延迟级别
     * @param <T>         消息内容类型
     * @return 发送结果
     * @throws MQClientException    客户端异常
     * @throws MQBrokerException    Broker异常
     * @throws RemotingException    远程调用异常
     * @throws InterruptedException 线程中断异常
     */
    public <T> SendResult sendSyncMessage(String topic, String tag, T content, String businessId,
                                          long expireTime, int delayLevel)
            throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
        return sendSyncMessage(buildMessage(topic, tag, content, businessId, expireTime, delayLevel));
    }
 
    /**
     * 发送同步消息
     *
     * @param message 消息对象
     * @param <T>     消息内容类型
     * @return 发送结果
     * @throws MQClientException    客户端异常
     * @throws MQBrokerException    Broker异常
     * @throws RemotingException    远程调用异常
     * @throws InterruptedException 线程中断异常
     */
    public <T> SendResult sendSyncMessage(RocketMQMessage<T> message)
            throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
        validateMessage(message);
        Message rocketMsg = convertToRocketMessage(message);
        
        log.info("发送同步消息,messageId: {}", message.getMessageId());
        
        SendResult sendResult = defaultMQProducer.send(rocketMsg);
        log.info("同步消息发送完成,messageId: {},发送结果: {}", message.getMessageId(), sendResult.getSendStatus());
        
        return sendResult;
    }
 
    /**
     * 发送异步消息
     *
     * @param topic    消息主题
     * @param tag      消息标签
     * @param content  消息内容
     * @param callback 回调函数
     * @param <T>      消息内容类型
     * @throws MQClientException    客户端异常
     * @throws RemotingException    远程调用异常
     * @throws InterruptedException 线程中断异常
     */
    public <T> void sendAsyncMessage(String topic, String tag, T content, SendCallback callback)
            throws MQClientException, RemotingException, InterruptedException {
        sendAsyncMessage(buildMessage(topic, tag, content, null, 0, 0), callback);
    }
 
    /**
     * 发送异步消息
     *
     * @param message  消息对象
     * @param callback 回调函数
     * @param <T>      消息内容类型
     * @throws MQClientException    客户端异常
     * @throws RemotingException    远程调用异常
     * @throws InterruptedException 线程中断异常
     */
    public <T> void sendAsyncMessage(RocketMQMessage<T> message, SendCallback callback)
            throws MQClientException, RemotingException, InterruptedException {
        validateMessage(message);
        Message rocketMsg = convertToRocketMessage(message);
        
        log.info("发送异步消息,messageId: {}", message.getMessageId());
        
        defaultMQProducer.send(rocketMsg, callback);
        log.info("异步消息发送请求已提交,messageId: {}", message.getMessageId());
    }
 
    /**
     * 发送单向消息(只负责发送消息,不等待服务器回应,没有回调函数)
     *
     * @param topic   消息主题
     * @param tag     消息标签
     * @param content 消息内容
     * @param <T>     消息内容类型
     * @throws MQClientException    客户端异常
     * @throws RemotingException    远程调用异常
     * @throws InterruptedException 线程中断异常
     */
    public <T> void sendOnewayMessage(String topic, String tag, T content)
            throws MQClientException, RemotingException, InterruptedException {
        sendOnewayMessage(buildMessage(topic, tag, content, null, 0, 0));
    }
 
    /**
     * 发送单向消息
     *
     * @param message 消息对象
     * @param <T>     消息内容类型
     * @throws MQClientException    客户端异常
     * @throws RemotingException    远程调用异常
     * @throws InterruptedException 线程中断异常
     */
    public <T> void sendOnewayMessage(RocketMQMessage<T> message)
            throws MQClientException, RemotingException, InterruptedException {
        validateMessage(message);
        Message rocketMsg = convertToRocketMessage(message);
        
        log.info("发送单向消息,messageId: {}", message.getMessageId());
        
        defaultMQProducer.sendOneway(rocketMsg);
        log.info("单向消息发送完成,messageId: {}", message.getMessageId());
    }
 
    /**
     * 发送顺序消息
     *
     * @param topic      消息主题
     * @param tag        消息标签
     * @param content    消息内容
     * @param shardingKey 用于确定消息队列的分片键,相同键的消息会被发送到同一个队列
     * @param <T>        消息内容类型
     * @return 发送结果
     * @throws MQClientException    客户端异常
     * @throws MQBrokerException    Broker异常
     * @throws RemotingException    远程调用异常
     * @throws InterruptedException 线程中断异常
     */
    public <T> SendResult sendOrderlyMessage(String topic, String tag, T content, String shardingKey)
            throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
        RocketMQMessage<T> message = buildMessage(topic, tag, content, null, 0, 0);
        return sendOrderlyMessage(message, shardingKey);
    }
 
    /**
     * 发送顺序消息
     *
     * @param message    消息对象
     * @param shardingKey 用于确定消息队列的分片键
     * @param <T>        消息内容类型
     * @return 发送结果
     * @throws MQClientException    客户端异常
     * @throws MQBrokerException    Broker异常
     * @throws RemotingException    远程调用异常
     * @throws InterruptedException 线程中断异常
     */
    public <T> SendResult sendOrderlyMessage(RocketMQMessage<T> message, String shardingKey)
            throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
        validateMessage(message);
        StringUtils.hasText(shardingKey, "分片键不能为空");
        
        Message rocketMsg = convertToRocketMessage(message);
        
        log.info("发送顺序消息,messageId: {},shardingKey: {}", message.getMessageId(), shardingKey);
        
        // 根据shardingKey选择消息队列
        SendResult sendResult = defaultMQProducer.send(rocketMsg, (mqs, msg, arg) -> {
            String key = (String) arg;
            int index = key.hashCode() % mqs.size();
            return mqs.get(index);
        }, shardingKey);
        
        log.info("顺序消息发送完成,messageId: {},发送结果: {}", message.getMessageId(), sendResult.getSendStatus());
        
        return sendResult;
    }
 
    /**
     * 发送事务消息
     *
     * @param topic   消息主题
     * @param tag     消息标签
     * @param content 消息内容
     * @param arg     事务参数,将传递给TransactionListener
     * @param <T>     消息内容类型
     * @return 发送结果
     * @throws MQClientException    客户端异常
     * @throws RemotingException    远程调用异常
     * @throws InterruptedException 线程中断异常
     * @throws ExecutionException   执行异常
     */
    public <T> SendResult sendTransactionMessage(String topic, String tag, T content, Object arg)
            throws MQClientException, RemotingException, InterruptedException, ExecutionException {
        RocketMQMessage<T> message = buildMessage(topic, tag, content, null, 0, 0);
        return sendTransactionMessage(message, arg);
    }
 
    /**
     * 发送事务消息
     *
     * @param message 消息对象
     * @param arg     事务参数
     * @param <T>     消息内容类型
     * @return 发送结果
     * @throws MQClientException    客户端异常
     * @throws RemotingException    远程调用异常
     * @throws InterruptedException 线程中断异常
     * @throws ExecutionException   执行异常
     */
    public <T> SendResult sendTransactionMessage(RocketMQMessage<T> message, Object arg)
            throws MQClientException, RemotingException, InterruptedException, ExecutionException {
        validateMessage(message);
        
        Message rocketMsg = convertToRocketMessage(message);
        
        log.info("发送事务消息,messageId: {}", message.getMessageId());
        
        SendResult sendResult = transactionMQProducer.sendMessageInTransaction(rocketMsg, arg);
        
        log.info("事务消息发送完成,messageId: {},发送结果: {}", message.getMessageId(), sendResult.getSendStatus());
        
        return sendResult;
    }
 
    /**
     * 构建消息对象
     *
     * @param topic       消息主题
     * @param tag         消息标签
     * @param content     消息内容
     * @param businessId  业务ID
     * @param expireTime  过期时间
     * @param delayLevel  延迟级别
     * @param <T>         消息内容类型
     * @return 消息对象
     */
    private <T> RocketMQMessage<T> buildMessage(String topic, String tag, T content, 
                                              String businessId, long expireTime, int delayLevel) {
        StringUtils.hasText(topic, "消息主题不能为空");
        
        RocketMQMessage<T> message = new RocketMQMessage<>();
        message.setMessageId(UUID.randomUUID().toString().replaceAll("-", ""));
        message.setBusinessId(StringUtils.hasText(businessId) ? businessId : UUID.randomUUID().toString());
        message.setTopic(topic);
        message.setTag(tag);
        message.setContent(content);
        message.setSendTime(LocalDateTime.now());
        message.setExpireTime(expireTime);
        message.setDelayLevel(delayLevel);
        
        return message;
    }
 
    /**
     * 验证消息对象
     *
     * @param message 消息对象
     * @param <T>     消息内容类型
     */
    private <T> void validateMessage(RocketMQMessage<T> message) {
        if (ObjectUtils.isEmpty(message)) {
            throw new IllegalArgumentException("消息对象不能为空");
        }
        StringUtils.hasText(message.getTopic(), "消息主题不能为空");
        if (ObjectUtils.isEmpty(message.getContent())) {
            throw new IllegalArgumentException("消息内容不能为空");
        }
        StringUtils.hasText(message.getMessageId(), "消息ID不能为空");
    }
 
    /**
     * 将自定义消息对象转换为RocketMQ的Message对象
     *
     * @param message 自定义消息对象
     * @param <T>     消息内容类型
     * @return RocketMQ的Message对象
     */
    private <T> Message convertToRocketMessage(RocketMQMessage<T> message) {
        // 构建主题和标签
        String topic = message.getTopic();
        String tag = message.getTag();
        String fullTopic = StringUtils.hasText(tag) ? topic + ":" + tag : topic;
        
        // 序列化消息内容
        byte[] body = JSON.toJSONBytes(message);
        
        // 创建RocketMQ消息对象
        Message rocketMsg = new Message(fullTopic, body);
        
        // 设置消息ID
        rocketMsg.setKeys(message.getMessageId());
        
        // 设置业务ID
        rocketMsg.putUserProperty("businessId", message.getBusinessId());
        
        // 设置过期时间
        if (message.getExpireTime() > 0) {
            rocketMsg.setDelayTimeLevel(0);
            rocketMsg.setBornTimestamp(System.currentTimeMillis() + message.getExpireTime());
        }
        
        // 设置延迟级别
        if (message.getDelayLevel() > 0) {
            rocketMsg.setDelayTimeLevel(message.getDelayLevel());
        }
        
        return rocketMsg;
    }
}

消息消费工具类



package com.ken.rocketmq.util;
 
import com.alibaba.fastjson2.JSON;
import com.ken.rocketmq.entity.RocketMQMessage;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
 
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;
 
/**
 * RocketMQ消息消费工具类
 * 封装消息订阅和消费的相关方法
 *
 * @author ken
 */
@Component
@Slf4j
public class RocketMQConsumeUtil {
 
    @Autowired
    private DefaultMQPushConsumer defaultMQPushConsumer;
 
    /**
     * 订阅主题并设置并发消息监听器
     *
     * @param topic        消息主题
     * @param tag          消息标签,*表示所有标签
     * @param messageClass 消息内容类型
     * @param consumer     消息处理函数
     * @param <T>          消息内容类型
     * @throws MQClientException 客户端异常
     */
    public <T> void subscribe(String topic, String tag, Class<T> messageClass, Consumer<RocketMQMessage<T>> consumer)
            throws MQClientException {
        subscribe(topic, tag, messageClass, consumer, null);
    }
 
    /**
     * 订阅主题并设置并发消息监听器
     *
     * @param topic        消息主题
     * @param tag          消息标签
     * @param messageClass 消息内容类型
     * @param consumer     消息处理函数
     * @param properties   订阅属性
     * @param <T>          消息内容类型
     * @throws MQClientException 客户端异常
     */
    public <T> void subscribe(String topic, String tag, Class<T> messageClass, 
                             Consumer<RocketMQMessage<T>> consumer, Map<String, String> properties)
            throws MQClientException {
        StringUtils.hasText(topic, "消息主题不能为空");
        StringUtils.hasText(tag, "消息标签不能为空");
        if (ObjectUtils.isEmpty(messageClass)) {
            throw new IllegalArgumentException("消息内容类型不能为空");
        }
        if (ObjectUtils.isEmpty(consumer)) {
            throw new IllegalArgumentException("消息处理函数不能为空");
        }
        
        // 设置消费起始位置
        defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        
        // 注册消息监听器
        defaultMQPushConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
            if (CollectionUtils.isEmpty(msgs)) {
                log.warn("接收到空的消息列表");
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
            
            for (MessageExt msg : msgs) {
                try {
                    log.info("接收到消息,messageId: {},topic: {},tag: {},queueId: {},reconsumeTimes: {}",
                            msg.getMsgId(), msg.getTopic(), msg.getTags(), msg.getQueueId(), msg.getReconsumeTimes());
                    
                    // 解析消息
                    RocketMQMessage<T> message = parseMessage(msg, messageClass);
                    if (ObjectUtils.isEmpty(message)) {
                        log.error("消息解析失败,messageId: {}", msg.getMsgId());
                        continue;
                    }
                    
                    // 处理消息
                    consumer.accept(message);
                    
                } catch (Exception e) {
                    log.error("消息处理异常,messageId: {}", msg.getMsgId(), e);
                    // 如果重试次数超过3次,记录到死信队列或进行其他处理
                    if (msg.getReconsumeTimes() >= 3) {
                        log.error("消息重试次数已达上限,将不再重试,messageId: {}", msg.getMsgId());
                        // TODO: 死信队列处理逻辑
                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                    }
                    // 消息处理失败,返回重试
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
            }
            
            // 消息处理成功
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        
        // 订阅主题
        String subExpression = StringUtils.hasText(tag) ? tag : "*";
        if (ObjectUtils.isEmpty(properties)) {
            defaultMQPushConsumer.subscribe(topic, subExpression);
        } else {
            defaultMQPushConsumer.subscribe(topic, subExpression, properties);
        }
        
        log.info("已订阅主题,topic: {},tag: {}", topic, subExpression);
    }
 
    /**
     * 订阅主题并设置顺序消息监听器
     *
     * @param topic        消息主题
     * @param tag          消息标签
     * @param messageClass 消息内容类型
     * @param consumer     消息处理函数
     * @param <T>          消息内容类型
     * @throws MQClientException 客户端异常
     */
    public <T> void subscribeOrderly(String topic, String tag, Class<T> messageClass, 
                                    Consumer<RocketMQMessage<T>> consumer) throws MQClientException {
        StringUtils.hasText(topic, "消息主题不能为空");
        StringUtils.hasText(tag, "消息标签不能为空");
        if (ObjectUtils.isEmpty(messageClass)) {
            throw new IllegalArgumentException("消息内容类型不能为空");
        }
        if (ObjectUtils.isEmpty(consumer)) {
            throw new IllegalArgumentException("消息处理函数不能为空");
        }
        
        // 设置消费起始位置
        defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        
        // 注册顺序消息监听器
        defaultMQPushConsumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> {
            context.setAutoCommit(true);
            
            if (CollectionUtils.isEmpty(msgs)) {
                log.warn("接收到空的消息列表");
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
            
            for (MessageExt msg : msgs) {
                try {
                    log.info("接收到顺序消息,messageId: {},topic: {},tag: {},queueId: {},reconsumeTimes: {}",
                            msg.getMsgId(), msg.getTopic(), msg.getTags(), msg.getQueueId(), msg.getReconsumeTimes());
                    
                    // 解析消息
                    RocketMQMessage<T> message = parseMessage(msg, messageClass);
                    if (ObjectUtils.isEmpty(message)) {
                        log.error("消息解析失败,messageId: {}", msg.getMsgId());
                        continue;
                    }
                    
                    // 处理消息
                    consumer.accept(message);
                    
                } catch (Exception e) {
                    log.error("顺序消息处理异常,messageId: {}", msg.getMsgId(), e);
                    // 顺序消息处理失败,返回重试
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
            }
            
            // 消息处理成功
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        
        // 订阅主题
        String subExpression = StringUtils.hasText(tag) ? tag : "*";
        defaultMQPushConsumer.subscribe(topic, subExpression);
        
        log.info("已订阅顺序消息主题,topic: {},tag: {}", topic, subExpression);
    }
 
    /**
     * 解析消息
     *
     * @param msg          RocketMQ消息对象
     * @param messageClass 消息内容类型
     * @param <T>          消息内容类型
     * @return 解析后的消息对象
     */
    private <T> RocketMQMessage<T> parseMessage(MessageExt msg, Class<T> messageClass) {
        try {
            // 从消息体中解析RocketMQMessage对象
            String body = new String(msg.getBody(), StandardCharsets.UTF_8);
            RocketMQMessage<T> message = JSON.parseObject(body, RocketMQMessage.class);
            
            if (ObjectUtils.isEmpty(message)) {
                log.error("消息体解析为空,messageId: {}", msg.getMsgId());
                return null;
            }
            
            // 解析消息内容
            if (!ObjectUtils.isEmpty(message.getContent()) && message.getContent() instanceof String) {
                String contentStr = (String) message.getContent();
                T content = JSON.parseObject(contentStr, messageClass);
                message.setContent(content);
            }
            
            return message;
        } catch (Exception e) {
            log.error("消息解析异常,messageId: {}", msg.getMsgId(), e);
            return null;
        }
    }
 
    /**
     * 取消订阅
     *
     * @param topic 消息主题
     * @throws MQClientException 客户端异常
     */
    public void unsubscribe(String topic) throws MQClientException {
        StringUtils.hasText(topic, "消息主题不能为空");
        
        defaultMQPushConsumer.unsubscribe(topic);
        log.info("已取消订阅主题,topic: {}", topic);
    }
}

消息查询工具类



package com.ken.rocketmq.util;
 
import com.alibaba.fastjson2.JSON;
import com.ken.rocketmq.entity.RocketMQMessage;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
 
import java.nio.charset.StandardCharsets;
import java.util.List;
 
/**
 * RocketMQ消息查询工具类
 * 提供消息查询相关功能
 *
 * @author ken
 */
@Component
@Slf4j
public class RocketMQQueryUtil {
 
    @Autowired
    private DefaultMQProducer defaultMQProducer;
 
    /**
     * 根据消息ID查询消息
     *
     * @param topic     消息主题
     * @param messageId 消息ID
     * @return 消息对象
     * @throws MQClientException 客户端异常
     */
    public MessageExt queryMessageByMessageId(String topic, String messageId) throws MQClientException {
        StringUtils.hasText(topic, "消息主题不能为空");
        StringUtils.hasText(messageId, "消息ID不能为空");
        
        log.info("根据消息ID查询消息,topic: {},messageId: {}", topic, messageId);
        
        // 查询消息
        MessageExt messageExt = defaultMQProducer.viewMessage(topic, messageId);
        
        if (ObjectUtils.isEmpty(messageExt)) {
            log.warn("未找到消息,topic: {},messageId: {}", topic, messageId);
            return null;
        }
        
        log.info("消息查询成功,topic: {},messageId: {},queueId: {},storeTime: {}",
                topic, messageId, messageExt.getQueueId(), messageExt.getStoreTimestamp());
        
        return messageExt;
    }
 
    /**
     * 根据消息ID查询消息并转换为自定义消息对象
     *
     * @param topic        消息主题
     * @param messageId    消息ID
     * @param messageClass 消息内容类型
     * @param <T>          消息内容类型
     * @return 自定义消息对象
     * @throws MQClientException 客户端异常
     */
    public <T> RocketMQMessage<T> queryRocketMessageByMessageId(String topic, String messageId, Class<T> messageClass)
            throws MQClientException {
        MessageExt messageExt = queryMessageByMessageId(topic, messageId);
        if (ObjectUtils.isEmpty(messageExt)) {
            return null;
        }
        
        return parseMessage(messageExt, messageClass);
    }
 
    /**
     * 根据业务ID查询消息
     *
     * @param topic      消息主题
     * @param businessId 业务ID
     * @param maxNum     最大查询数量
     * @return 消息列表
     * @throws MQClientException 客户端异常
     */
    public List<MessageExt> queryMessageByBusinessId(String topic, String businessId, int maxNum)
            throws MQClientException {
        StringUtils.hasText(topic, "消息主题不能为空");
        StringUtils.hasText(businessId, "业务ID不能为空");
        if (maxNum <= 0 || maxNum > 32) {
            throw new IllegalArgumentException("最大查询数量必须在1到32之间");
        }
        
        log.info("根据业务ID查询消息,topic: {},businessId: {},maxNum: {}", topic, businessId, maxNum);
        
        // 根据业务ID查询消息,业务ID存储在消息的用户属性中
        // 注意:RocketMQ的queryMessage方法主要用于根据key查询,这里需要遍历队列查询
        // 实际应用中,建议将businessId设置为消息的key,以便更高效查询
        List<MessageExt> messageExts = defaultMQProducer.queryMessage(topic, businessId, maxNum, 0, System.currentTimeMillis());
        
        if (ObjectUtils.isEmpty(messageExts)) {
            log.warn("未找到消息,topic: {},businessId: {}", topic, businessId);
            return null;
        }
        
        log.info("消息查询成功,topic: {},businessId: {},找到{}条消息", topic, businessId, messageExts.size());
        
        return messageExts;
    }
 
    /**
     * 解析消息为自定义消息对象
     *
     * @param messageExt   RocketMQ消息对象
     * @param messageClass 消息内容类型
     * @param <T>          消息内容类型
     * @return 自定义消息对象
     */
    private <T> RocketMQMessage<T> parseMessage(MessageExt messageExt, Class<T> messageClass) {
        try {
            String body = new String(messageExt.getBody(), StandardCharsets.UTF_8);
            RocketMQMessage<T> message = JSON.parseObject(body, RocketMQMessage.class);
            
            if (ObjectUtils.isEmpty(message)) {
                log.error("消息体解析为空,messageId: {}", messageExt.getMsgId());
                return null;
            }
            
            // 解析消息内容
            if (!ObjectUtils.isEmpty(message.getContent()) && message.getContent() instanceof String) {
                String contentStr = (String) message.getContent();
                T content = JSON.parseObject(contentStr, messageClass);
                message.setContent(content);
            }
            
            return message;
        } catch (Exception e) {
            log.error("消息解析异常,messageId: {}", messageExt.getMsgId(), e);
            return null;
        }
    }
}

事务消息监听器



package com.ken.rocketmq.listener;
 
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.stereotype.Component;
 
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
 
/**
 * 事务消息监听器
 * 处理本地事务执行和事务状态回查
 *
 * @author ken
 */
@Component
@Slf4j
public class RocketMQTransactionListener implements TransactionListener {
 
    /**
     * 存储本地事务状态,实际应用中应持久化到数据库
     */
    private final ConcurrentMap<String, LocalTransactionState> transactionStateMap = new ConcurrentHashMap<>();
 
    /**
     * 执行本地事务
     *
     * @param msg 半事务消息
     * @param arg 事务参数
     * @return 本地事务状态:COMMIT_MESSAGE提交,ROLLBACK_MESSAGE回滚,UNKNOW未知状态
     */
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        String transactionId = msg.getTransactionId();
        log.info("开始执行本地事务,transactionId: {},消息内容: {}", transactionId, new String(msg.getBody()));
        
        try {
            // 这里执行实际的本地事务逻辑
            // 例如:数据库操作、业务逻辑处理等
            
            // 模拟本地事务处理
            if (arg != null && arg instanceof String) {
                String businessKey = (String) arg;
                log.info("执行本地事务,businessKey: {}", businessKey);
                
                // 实际业务中根据处理结果返回不同状态
                if ("success".equals(businessKey)) {
                    // 本地事务执行成功,提交消息
                    transactionStateMap.put(transactionId, LocalTransactionState.COMMIT_MESSAGE);
                    return LocalTransactionState.COMMIT_MESSAGE;
                } else if ("fail".equals(businessKey)) {
                    // 本地事务执行失败,回滚消息
                    transactionStateMap.put(transactionId, LocalTransactionState.ROLLBACK_MESSAGE);
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                }
            }
            
            // 未知状态,需要后续回查
            transactionStateMap.put(transactionId, LocalTransactionState.UNKNOW);
            return LocalTransactionState.UNKNOW;
            
        } catch (Exception e) {
            log.error("本地事务执行异常,transactionId: {}", transactionId, e);
            // 本地事务执行异常,回滚消息
            transactionStateMap.put(transactionId, LocalTransactionState.ROLLBACK_MESSAGE);
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
    }
 
    /**
     * 回查本地事务状态
     * 当executeLocalTransaction返回UNKNOW时,Broker会定期调用此方法回查事务状态
     *
     * @param msg 半事务消息
     * @return 本地事务状态
     */
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        String transactionId = msg.getTransactionId();
        log.info("回查本地事务状态,transactionId: {},消息内容: {}", transactionId, new String(msg.getBody()));
        
        try {
            // 从存储中获取事务状态
            LocalTransactionState state = transactionStateMap.get(transactionId);
            
            if (state == null) {
                // 未找到事务状态,可能是本地事务还未执行完成,返回UNKNOW继续回查
                log.warn("未找到本地事务状态,transactionId: {}", transactionId);
                return LocalTransactionState.UNKNOW;
            }
            
            log.info("本地事务状态回查结果,transactionId: {},状态: {}", transactionId, state);
            
            // 如果状态是COMMIT或ROLLBACK,移除缓存的状态
            if (state != LocalTransactionState.UNKNOW) {
                transactionStateMap.remove(transactionId);
            }
            
            return state;
            
        } catch (Exception e) {
            log.error("本地事务状态回查异常,transactionId: {}", transactionId, e);
            return LocalTransactionState.UNKNOW;
        }
    }
}

实战案例

1. 普通消息发送与消费

消息发送服务


package com.ken.rocketmq.service;
 
import com.ken.rocketmq.entity.Order;
import com.ken.rocketmq.entity.RocketMQMessage;
import com.ken.rocketmq.util.RocketMQSendUtil;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
 
/**
 * 普通消息发送服务
 *
 * @author ken
 */
@RestController
@RequestMapping("/message")
@Tag(name = "普通消息接口", description = "提供同步、异步、单向消息的发送功能")
@Slf4j
public class MessageSendService {
 
    @Autowired
    private RocketMQSendUtil rocketMQSendUtil;
 
    /**
     * 消息主题
     */
    private static final String ORDER_TOPIC = "order_topic";
 
    /**
     * 消息标签
     */
    private static final String CREATE_ORDER_TAG = "create_order";
 
    /**
     * 发送同步消息
     *
     * @param order 订单信息
     * @return 发送结果
     */
    @PostMapping("/sync")
    @Operation(summary = "发送同步消息", description = "发送创建订单的同步消息")
    public String sendSyncMessage(
            @Parameter(description = "订单信息") @RequestBody Order order) {
        try {
            SendResult sendResult = rocketMQSendUtil.sendSyncMessage(
                    ORDER_TOPIC, CREATE_ORDER_TAG, order);
            return "同步消息发送成功,messageId: " + sendResult.getMsgId() + 
                    ",状态: " + sendResult.getSendStatus();
        } catch (MQClientException | MQBrokerException | RemotingException | InterruptedException e) {
            log.error("同步消息发送失败", e);
            return "同步消息发送失败:" + e.getMessage();
        }
    }
 
    /**
     * 发送异步消息
     *
     * @param order 订单信息
     * @return 发送结果
     */
    @PostMapping("/async")
    @Operation(summary = "发送异步消息", description = "发送创建订单的异步消息")
    public String sendAsyncMessage(
            @Parameter(description = "订单信息") @RequestBody Order order) {
        try {
            rocketMQSendUtil.sendAsyncMessage(ORDER_TOPIC, CREATE_ORDER_TAG, order, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    log.info("异步消息发送成功,messageId: {},状态: {}", 
                            sendResult.getMsgId(), sendResult.getSendStatus());
                    // 异步发送成功后的处理逻辑
                }
 
                @Override
                public void onException(Throwable e) {
                    log.error("异步消息发送失败", e);
                    // 异步发送失败后的处理逻辑
                }
            });
            return "异步消息发送请求已提交";
        } catch (MQClientException | RemotingException | InterruptedException e) {
            log.error("异步消息发送请求提交失败", e);
            return "异步消息发送请求提交失败:" + e.getMessage();
        }
    }
 
    /**
     * 发送单向消息
     *
     * @param order 订单信息
     * @return 发送结果
     */
    @PostMapping("/oneway")
    @Operation(summary = "发送单向消息", description = "发送创建订单的单向消息")
    public String sendOnewayMessage(
            @Parameter(description = "订单信息") @RequestBody Order order) {
        try {
            rocketMQSendUtil.sendOnewayMessage(ORDER_TOPIC, CREATE_ORDER_TAG, order);
            return "单向消息发送成功";
        } catch (MQClientException | RemotingException | InterruptedException e) {
            log.error("单向消息发送失败", e);
            return "单向消息发送失败:" + e.getMessage();
        }
    }
}
消息消费服务


package com.ken.rocketmq.service;
 
import com.ken.rocketmq.entity.Order;
import com.ken.rocketmq.entity.RocketMQMessage;
import com.ken.rocketmq.util.RocketMQConsumeUtil;
import jakarta.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.exception.MQClientException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
 
/**
 * 普通消息消费服务
 *
 * @author ken
 */
@Service
@Slf4j
public class MessageConsumeService {
 
    @Autowired
    private RocketMQConsumeUtil rocketMQConsumeUtil;
 
    /**
     * 消息主题
     */
    private static final String ORDER_TOPIC = "order_topic";
 
    /**
     * 消息标签
     */
    private static final String CREATE_ORDER_TAG = "create_order";
 
    /**
     * 初始化消息订阅
     */
    @PostConstruct
    public void init() {
        try {
            // 订阅订单创建消息
            subscribeCreateOrderMessage();
        } catch (MQClientException e) {
            log.error("消息订阅初始化失败", e);
            throw new RuntimeException("消息订阅初始化失败", e);
        }
    }
 
    /**
     * 订阅订单创建消息
     *
     * @throws MQClientException 客户端异常
     */
    private void subscribeCreateOrderMessage() throws MQClientException {
        rocketMQConsumeUtil.subscribe(ORDER_TOPIC, CREATE_ORDER_TAG, Order.class, this::handleCreateOrderMessage);
    }
 
    /**
     * 处理订单创建消息
     *
     * @param message 订单消息
     */
    private void handleCreateOrderMessage(RocketMQMessage<Order> message) {
        log.info("开始处理订单创建消息,messageId: {},businessId: {}", 
                message.getMessageId(), message.getBusinessId());
        
        Order order = message.getContent();
        if (order == null) {
            log.error("订单信息为空,messageId: {}", message.getMessageId());
            return;
        }
        
        try {
            // 处理订单逻辑,例如:更新订单状态、通知用户等
            log.info("处理订单,订单号: {},用户ID: {},金额: {}", 
                    order.getOrderNo(), order.getUserId(), order.getAmount());
            
            // TODO: 实际业务处理逻辑
            
            log.info("订单消息处理完成,messageId: {},订单号: {}", 
                    message.getMessageId(), order.getOrderNo());
        } catch (Exception e) {
            log.error("订单消息处理异常,messageId: {},订单号: {}", 
                    message.getMessageId(), order.getOrderNo(), e);
            // 抛出异常会触发消息重试
            throw new RuntimeException("订单消息处理异常", e);
        }
    }
}

2. 顺序消息实现



package com.ken.rocketmq.service;
 
import com.ken.rocketmq.entity.Order;
import com.ken.rocketmq.util.RocketMQConsumeUtil;
import com.ken.rocketmq.util.RocketMQSendUtil;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.tags.Tag;
import jakarta.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
 
/**
 * 顺序消息服务
 * 处理订单状态变更的顺序消息
 *
 * @author ken
 */
@RestController
@RequestMapping("/order/sequence")
@Tag(name = "顺序消息接口", description = "提供订单状态变更的顺序消息发送和消费功能")
@Slf4j
public class OrderSequenceMessageService {
 
    @Autowired
    private RocketMQSendUtil rocketMQSendUtil;
 
    @Autowired
    private RocketMQConsumeUtil rocketMQConsumeUtil;
 
    /**
     * 消息主题
     */
    private static final String ORDER_STATUS_TOPIC = "order_status_topic";
 
    /**
     * 消息标签
     */
    private static final String STATUS_CHANGE_TAG = "status_change";
 
    /**
     * 初始化顺序消息订阅
     */
    @PostConstruct
    public void init() {
        try {
            // 订阅订单状态变更的顺序消息
            subscribeOrderStatusMessage();
        } catch (MQClientException e) {
            log.error("顺序消息订阅初始化失败", e);
            throw new RuntimeException("顺序消息订阅初始化失败", e);
        }
    }
 
    /**
     * 发送订单状态变更的顺序消息
     *
     * @param order 订单信息
     * @return 发送结果
     */
    @PostMapping("/send")
    @Operation(summary = "发送顺序消息", description = "发送订单状态变更的顺序消息")
    public String sendOrderStatusMessage(
            @Parameter(description = "订单信息") @RequestBody Order order) {
        try {
            // 使用订单号作为分片键,确保同一个订单的消息发送到同一个队列
            String shardingKey = order.getOrderNo();
            
            SendResult sendResult = rocketMQSendUtil.sendOrderlyMessage(
                    ORDER_STATUS_TOPIC, STATUS_CHANGE_TAG, order, shardingKey);
            
            return "顺序消息发送成功,messageId: " + sendResult.getMsgId() + 
                    ",队列ID: " + sendResult.getMessageQueue().getQueueId();
        } catch (MQClientException | MQBrokerException | RemotingException | InterruptedException e) {
            log.error("顺序消息发送失败", e);
            return "顺序消息发送失败:" + e.getMessage();
        }
    }
 
    /**
     * 订阅订单状态变更消息
     *
     * @throws MQClientException 客户端异常
     */
    private void subscribeOrderStatusMessage() throws MQClientException {
        rocketMQConsumeUtil.subscribeOrderly(ORDER_STATUS_TOPIC, STATUS_CHANGE_TAG, Order.class, 
                this::handleOrderStatusMessage);
    }
 
    /**
     * 处理订单状态变更消息
     *
     * @param message 订单消息
     */
    private void handleOrderStatusMessage(RocketMQMessage<Order> message) {
        log.info("开始处理订单状态消息,messageId: {},businessId: {}", 
                message.getMessageId(), message.getBusinessId());
        
        Order order = message.getContent();
        if (order == null) {
            log.error("订单信息为空,messageId: {}", message.getMessageId());
            return;
        }
        
        try {
            // 处理订单状态变更逻辑
            log.info("处理订单状态变更,订单号: {},当前状态: {},用户ID: {},金额: {}", 
                    order.getOrderNo(), order.getStatus(), order.getUserId(), order.getAmount());
            
            // TODO: 实际业务处理逻辑,如更新订单状态、记录状态变更日志等
            
            log.info("订单状态消息处理完成,messageId: {},订单号: {}", 
                    message.getMessageId(), order.getOrderNo());
        } catch (Exception e) {
            log.error("订单状态消息处理异常,messageId: {},订单号: {}", 
                    message.getMessageId(), order.getOrderNo(), e);
            // 抛出异常会触发消息重试
            throw new RuntimeException("订单状态消息处理异常", e);
        }
    }
}

3. 事务消息处理



package com.ken.rocketmq.service;
 
import com.ken.rocketmq.entity.Order;
import com.ken.rocketmq.util.RocketMQSendUtil;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
 
import java.util.concurrent.ExecutionException;
 
/**
 * 事务消息服务
 * 处理创建订单的事务消息
 *
 * @author ken
 */
@RestController
@RequestMapping("/order/transaction")
@Tag(name = "事务消息接口", description = "提供创建订单的事务消息发送功能")
@Slf4j
public class OrderTransactionMessageService {
 
    @Autowired
    private RocketMQSendUtil rocketMQSendUtil;
 
    /**
     * 消息主题
     */
    private static final String ORDER_TRANSACTION_TOPIC = "order_transaction_topic";
 
    /**
     * 消息标签
     */
    private static final String CREATE_ORDER_TAG = "create_order";
 
    /**
     * 发送创建订单的事务消息
     *
     * @param order 订单信息
     * @return 发送结果
     */
    @PostMapping("/send")
    @Operation(summary = "发送事务消息", description = "发送创建订单的事务消息")
    public String sendCreateOrderTransactionMessage(
            @Parameter(description = "订单信息") @RequestBody Order order) {
        try {
            // 事务参数,用于在TransactionListener中判断事务执行结果
            // 实际应用中可以传递业务关键信息
            String transactionArg = "success"; // success表示成功,fail表示失败
            
            SendResult sendResult = rocketMQSendUtil.sendTransactionMessage(
                    ORDER_TRANSACTION_TOPIC, CREATE_ORDER_TAG, order, transactionArg);
            
            return "事务消息发送成功,messageId: " + sendResult.getMsgId() + 
                    ",状态: " + sendResult.getSendStatus();
        } catch (MQClientException | RemotingException | InterruptedException | ExecutionException e) {
            log.error("事务消息发送失败", e);
            return "事务消息发送失败:" + e.getMessage();
        }
    }
}

4. 延迟消息应用



package com.ken.rocketmq.service;
 
import com.ken.rocketmq.entity.Order;
import com.ken.rocketmq.util.RocketMQConsumeUtil;
import com.ken.rocketmq.util.RocketMQSendUtil;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.tags.Tag;
import jakarta.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
 
/**
 * 延迟消息服务
 * 处理订单超时取消的延迟消息
 *
 * @author ken
 */
@RestController
@RequestMapping("/order/delay")
@Tag(name = "延迟消息接口", description = "提供订单超时取消的延迟消息发送和消费功能")
@Slf4j
public class OrderDelayMessageService {
 
    @Autowired
    private RocketMQSendUtil rocketMQSendUtil;
 
    @Autowired
    private RocketMQConsumeUtil rocketMQConsumeUtil;
 
    /**
     * 消息主题
     */
    private static final String ORDER_DELAY_TOPIC = "order_delay_topic";
 
    /**
     * 消息标签
     */
    private static final String CANCEL_ORDER_TAG = "cancel_order";
 
    /**
     * RocketMQ延迟级别:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
     */
    private static final int[] DELAY_LEVELS = {1, 5, 10, 30, 60, 120, 180, 240, 300, 360, 420, 480, 540, 600, 1200, 1800, 3600, 7200};
 
    /**
     * 初始化延迟消息订阅
     */
    @PostConstruct
    public void init() {
        try {
            // 订阅订单超时取消的延迟消息
            subscribeOrderCancelMessage();
        } catch (MQClientException e) {
            log.error("延迟消息订阅初始化失败", e);
            throw new RuntimeException("延迟消息订阅初始化失败", e);
        }
    }
 
    /**
     * 发送订单超时取消的延迟消息
     *
     * @param order      订单信息
     * @param delayLevel 延迟级别(1-18)
     * @return 发送结果
     */
    @PostMapping("/send")
    @Operation(summary = "发送延迟消息", description = "发送订单超时取消的延迟消息")
    public String sendOrderCancelDelayMessage(
            @Parameter(description = "订单信息") @RequestBody Order order,
            @Parameter(description = "延迟级别(1-18),对应延迟时间:1s 5s 10s 30s 1m ... 2h") 
            @RequestParam int delayLevel) {
        if (delayLevel < 1 || delayLevel > 18) {
            return "延迟级别必须在1-18之间";
        }
        
        try {
            SendResult sendResult = rocketMQSendUtil.sendSyncMessage(
                    ORDER_DELAY_TOPIC, CANCEL_ORDER_TAG, order, order.getOrderNo(), 0, delayLevel);
            
            int delayTime = DELAY_LEVELS[delayLevel - 1];
            String delayUnit = delayTime < 60 ? "秒" : "分钟";
            int displayTime = delayTime < 60 ? delayTime : delayTime / 60;
            
            return String.format("延迟消息发送成功,messageId: %s,订单号: %s,将在%d%s后处理",
                    sendResult.getMsgId(), order.getOrderNo(), displayTime, delayUnit);
        } catch (MQClientException | MQBrokerException | RemotingException | InterruptedException e) {
            log.error("延迟消息发送失败", e);
            return "延迟消息发送失败:" + e.getMessage();
        }
    }
 
    /**
     * 订阅订单超时取消消息
     *
     * @throws MQClientException 客户端异常
     */
    private void subscribeOrderCancelMessage() throws MQClientException {
        rocketMQConsumeUtil.subscribe(ORDER_DELAY_TOPIC, CANCEL_ORDER_TAG, Order.class, 
                this::handleOrderCancelMessage);
    }
 
    /**
     * 处理订单超时取消消息
     *
     * @param message 订单消息
     */
    private void handleOrderCancelMessage(RocketMQMessage<Order> message) {
        log.info("开始处理订单超时取消消息,messageId: {},businessId: {}", 
                message.getMessageId(), message.getBusinessId());
        
        Order order = message.getContent();
        if (order == null) {
            log.error("订单信息为空,messageId: {}", message.getMessageId());
            return;
        }
        
        try {
            // 处理订单超时取消逻辑
            log.info("处理订单超时取消,订单号: {},用户ID: {},当前状态: {}", 
                    order.getOrderNo(), order.getUserId(), order.getStatus());
            
            // TODO: 实际业务处理逻辑,如检查订单状态,如果仍未支付则取消订单
            
            log.info("订单超时取消消息处理完成,messageId: {},订单号: {}", 
                    message.getMessageId(), order.getOrderNo());
        } catch (Exception e) {
            log.error("订单超时取消消息处理异常,messageId: {},订单号: {}", 
                    message.getMessageId(), order.getOrderNo(), e);
            // 抛出异常会触发消息重试
            throw new RuntimeException("订单超时取消消息处理异常", e);
        }
    }
}

数据库表设计

为了配合上面的实战案例,我们需要设计相关的数据库表:



-- 创建数据库
CREATE DATABASE IF NOT EXISTS rocketmq_demo CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
 
-- 使用数据库
USE rocketmq_demo;
 
-- 订单表
CREATE TABLE IF NOT EXISTS `order` (
  `id` bigint NOT NULL AUTO_INCREMENT COMMENT '主键ID',
  `order_no` varchar(64) NOT NULL COMMENT '订单编号',
  `user_id` bigint NOT NULL COMMENT '用户ID',
  `amount` decimal(10,2) NOT NULL COMMENT '订单金额',
  `status` tinyint NOT NULL COMMENT '订单状态:0-创建中,1-已创建,2-已支付,3-已取消,4-已完成',
  `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
  PRIMARY KEY (`id`),
  UNIQUE KEY `uk_order_no` (`order_no`),
  KEY `idx_user_id` (`user_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='订单表';
 
-- 订单状态变更日志表
CREATE TABLE IF NOT EXISTS `order_status_log` (
  `id` bigint NOT NULL AUTO_INCREMENT COMMENT '主键ID',
  `order_no` varchar(64) NOT NULL COMMENT '订单编号',
  `before_status` tinyint NOT NULL COMMENT '变更前状态',
  `after_status` tinyint NOT NULL COMMENT '变更后状态',
  `operate_user` varchar(64) NOT NULL COMMENT '操作人',
  `operate_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '操作时间',
  `remark` varchar(255) DEFAULT NULL COMMENT '备注',
  PRIMARY KEY (`id`),
  KEY `idx_order_no` (`order_no`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='订单状态变更日志表';
 
-- 事务消息状态表
CREATE TABLE IF NOT EXISTS `transaction_message` (
  `id` bigint NOT NULL AUTO_INCREMENT COMMENT '主键ID',
  `transaction_id` varchar(64) NOT NULL COMMENT '事务ID',
  `message_id` varchar(64) NOT NULL COMMENT '消息ID',
  `business_id` varchar(64) NOT NULL COMMENT '业务ID',
  `topic` varchar(128) NOT NULL COMMENT '消息主题',
  `tag` varchar(64) DEFAULT NULL COMMENT '消息标签',
  `status` tinyint NOT NULL COMMENT '状态:0-处理中,1-已提交,2-已回滚',
  `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
  PRIMARY KEY (`id`),
  UNIQUE KEY `uk_transaction_id` (`transaction_id`),
  KEY `idx_business_id` (`business_id`),
  KEY `idx_status` (`status`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='事务消息状态表';

项目启动类



package com.ken.rocketmq;
 
import io.swagger.v3.oas.annotations.OpenAPIDefinition;
import io.swagger.v3.oas.annotations.info.Info;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
 
/**
 * RocketMQ实战示例项目启动类
 *
 * @author ken
 */
@SpringBootApplication
@MapperScan("com.ken.rocketmq.mapper")
@OpenAPIDefinition(info = @Info(title = "RocketMQ实战API", version = "1.0", description = "RocketMQ实战示例接口文档"))
public class RocketmqDemoApplication {
 
    public static void main(String[] args) {
        SpringApplication.run(RocketmqDemoApplication.class, args);
    }
}

总结

本文详细介绍了 RocketMQ 的实战应用,从核心架构原理到具体的配置类和工具类实现,再到各类消息的实战案例,全面覆盖了 RocketMQ 的主要使用场景。

© 版权声明

相关文章

暂无评论

none
暂无评论...