RocketMQ 实战手册:解锁分布式消息队列核心技能
引言:为什么选择 RocketMQ?
在分布式系统架构中,消息队列扮演着至关重要的角色,它不仅能实现系统解耦、流量削峰,还能保障数据最终一致性。Apache RocketMQ 作为阿里开源的分布式消息中间件,凭借其高吞吐、低延迟、高可靠等特性,已成为金融、电商等核心业务场景的首选。
本文将带你从零开始,深入 RocketMQ 实战,涵盖所有核心配置类与工具类,通过可直接运行的实例,让你快速掌握 RocketMQ 的实战技能。无论你是刚接触消息队列的开发者,还是需要优化现有 RocketMQ 应用的工程师,本文都能为你提供全面且实用的指导。
RocketMQ 核心架构与原理
基本架构
RocketMQ 的核心架构由四部分组成:NameServer、Broker、Producer 和 Consumer。

NameServer:轻量级路由注册中心,管理 Broker 节点信息,提供路由发现服务Broker:消息存储和转发服务器,负责消息的存储、投递和查询等功能Producer:消息生产者,负责创建和发送消息Consumer:消息消费者,负责订阅和消费消息
消息传递流程

生产者启动时向 NameServer 注册,并获取 Topic 对应的 Broker 路由信息生产者根据路由信息将消息发送到对应的 BrokerBroker 将消息存储在磁盘中,并同步到从节点(主从架构时)消费者启动时向 NameServer 注册,并订阅指定 Topic消费者通过拉取或推送方式从 Broker 获取消息消费者处理消息后向 Broker 返回消费结果(ACK)Broker 根据消费结果更新消息状态
环境准备与项目初始化
安装 RocketMQ
下载最新稳定版 RocketMQ(本文使用 5.2.0 版本)
wget https://archive.apache.org/dist/rocketmq/5.2.0/rocketmq-all-5.2.0-bin-release.zip
unzip rocketmq-all-5.2.0-bin-release.zip
cd rocketmq-all-5.2.0-bin-release
启动 NameServer
nohup sh bin/mqnamesrv &
启动 Broker
nohup sh bin/mqbroker -n localhost:9876 &
验证启动是否成功
jps
# 应看到NamesrvStartup和BrokerStartup进程
创建 Maven 项目
创建 Spring Boot 项目,配置如下:
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.2.0</version>
<relativePath/>
</parent>
<groupId>com.ken.rocketmq</groupId>
<artifactId>rocketmq-demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>rocketmq-demo</name>
<description>RocketMQ实战示例项目</description>
<properties>
<java.version>17</java.version>
<rocketmq.version>5.2.0</rocketmq.version>
<fastjson2.version>2.0.45</fastjson2.version>
<mybatis-plus.version>3.5.5</mybatis-plus.version>
<lombok.version>1.18.30</lombok.version>
<springdoc.version>2.1.0</springdoc.version>
<guava.version>33.1.0-jre</guava.version>
</properties>
<dependencies>
<!-- Spring Boot核心 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- RocketMQ -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>${rocketmq.version}</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>${rocketmq.version}</version>
</dependency>
<!-- Lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
<scope>provided</scope>
</dependency>
<!-- Fastjson2 -->
<dependency>
<groupId>com.alibaba.fastjson2</groupId>
<artifactId>fastjson2</artifactId>
<version>${fastjson2.version}</version>
</dependency>
<!-- MyBatis-Plus -->
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>${mybatis-plus.version}</version>
</dependency>
<!-- MySQL驱动 -->
<dependency>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
<scope>runtime</scope>
</dependency>
<!-- Swagger3 -->
<dependency>
<groupId>org.springdoc</groupId>
<artifactId>springdoc-openapi-starter-webmvc-ui</artifactId>
<version>${springdoc.version}</version>
</dependency>
<!-- Guava -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>
<!-- 测试 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>
配置文件
配置:
application.yml
spring:
application:
name: rocketmq-demo
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://localhost:3306/rocketmq_demo?useUnicode=true&characterEncoding=utf8&serverTimezone=Asia/Shanghai
username: root
password: root
rocketmq:
name-server: 127.0.0.1:9876
producer:
group: demo-producer-group
send-message-timeout: 3000
retry-times-when-send-failed: 2
retry-times-when-send-async-failed: 2
consumer:
group: demo-consumer-group
consume-thread-min: 20
consume-thread-max: 64
consume-message-batch-max-size: 1
mybatis-plus:
mapper-locations: classpath*:mapper/**/*.xml
type-aliases-package: com.ken.rocketmq.entity
configuration:
map-underscore-to-camel-case: true
log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
springdoc:
api-docs:
path: /api-docs
swagger-ui:
path: /swagger-ui.html
operationsSorter: method
server:
port: 8080
核心配置类
生产者配置类
package com.ken.rocketmq.config;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.StringUtils;
/**
* RocketMQ生产者配置类
* 配置并初始化DefaultMQProducer实例
*
* @author ken
*/
@Configuration
@Slf4j
public class RocketMQProducerConfig {
@Value("${rocketmq.name-server}")
private String nameServer;
@Value("${rocketmq.producer.group}")
private String producerGroup;
@Value("${rocketmq.producer.send-message-timeout}")
private int sendMessageTimeout;
@Value("${rocketmq.producer.retry-times-when-send-failed}")
private int retryTimesWhenSendFailed;
/**
* 创建默认的消息生产者
*
* @return DefaultMQProducer实例
* @throws MQClientException 初始化生产者异常
*/
@Bean
public DefaultMQProducer defaultMQProducer() throws MQClientException {
// 验证生产者组配置
StringUtils.hasText(producerGroup, "生产者组不能为空");
StringUtils.hasText(nameServer, "NameServer地址不能为空");
// 创建生产者实例
DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
// 配置NameServer地址
producer.setNamesrvAddr(nameServer);
// 设置发送超时时间
producer.setSendMsgTimeout(sendMessageTimeout);
// 设置同步发送失败重试次数
producer.setRetryTimesWhenSendFailed(retryTimesWhenSendFailed);
// 启动生产者
producer.start();
log.info("DefaultMQProducer初始化成功,生产者组:{},NameServer:{}", producerGroup, nameServer);
return producer;
}
}
消费者配置类
package com.ken.rocketmq.config;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.StringUtils;
/**
* RocketMQ消费者配置类
* 配置并初始化DefaultMQPushConsumer实例
*
* @author ken
*/
@Configuration
@Slf4j
public class RocketMQConsumerConfig {
@Value("${rocketmq.name-server}")
private String nameServer;
@Value("${rocketmq.consumer.group}")
private String consumerGroup;
@Value("${rocketmq.consumer.consume-thread-min}")
private int consumeThreadMin;
@Value("${rocketmq.consumer.consume-thread-max}")
private int consumeThreadMax;
@Value("${rocketmq.consumer.consume-message-batch-max-size}")
private int consumeMessageBatchMaxSize;
/**
* 创建默认的推模式消费者
*
* @return DefaultMQPushConsumer实例
* @throws MQClientException 初始化消费者异常
*/
@Bean
public DefaultMQPushConsumer defaultMQPushConsumer() throws MQClientException {
// 验证消费者组配置
StringUtils.hasText(consumerGroup, "消费者组不能为空");
StringUtils.hasText(nameServer, "NameServer地址不能为空");
// 创建消费者实例
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
// 配置NameServer地址
consumer.setNamesrvAddr(nameServer);
// 设置消费线程池最小线程数
consumer.setConsumeThreadMin(consumeThreadMin);
// 设置消费线程池最大线程数
consumer.setConsumeThreadMax(consumeThreadMax);
// 设置每次拉取的最大消息数
consumer.setConsumeMessageBatchMaxSize(consumeMessageBatchMaxSize);
// 注册消息监听器,实际使用时在具体业务类中实现
// consumer.registerMessageListener(new MessageListenerConcurrently() {...});
// 启动消费者
consumer.start();
log.info("DefaultMQPushConsumer初始化成功,消费者组:{},NameServer:{}", consumerGroup, nameServer);
return consumer;
}
}
事务消息配置类
package com.ken.rocketmq.config;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.StringUtils;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* RocketMQ事务消息配置类
* 配置并初始化TransactionMQProducer实例
*
* @author ken
*/
@Configuration
@Slf4j
public class RocketMQTransactionConfig {
@Value("${rocketmq.name-server}")
private String nameServer;
@Value("${rocketmq.producer.group}")
private String producerGroup;
/**
* 事务消息生产者线程池核心线程数
*/
private static final int CORE_POOL_SIZE = 5;
/**
* 事务消息生产者线程池最大线程数
*/
private static final int MAX_POOL_SIZE = 10;
/**
* 事务消息生产者线程池队列大小
*/
private static final int QUEUE_CAPACITY = 100;
/**
* 事务消息生产者线程池空闲线程存活时间
*/
private static final int KEEP_ALIVE_TIME = 100;
/**
* 创建事务消息生产者
*
* @param transactionListener 事务监听器
* @return TransactionMQProducer实例
* @throws MQClientException 初始化事务生产者异常
*/
@Bean
public TransactionMQProducer transactionMQProducer(TransactionListener transactionListener) throws MQClientException {
// 验证配置
StringUtils.hasText(producerGroup, "生产者组不能为空");
StringUtils.hasText(nameServer, "NameServer地址不能为空");
// 创建事务消息生产者实例
TransactionMQProducer producer = new TransactionMQProducer(producerGroup + "-transaction");
// 配置NameServer地址
producer.setNamesrvAddr(nameServer);
// 创建线程池用于处理事务消息的本地事务和回查
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
CORE_POOL_SIZE,
MAX_POOL_SIZE,
KEEP_ALIVE_TIME,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(QUEUE_CAPACITY),
new ThreadPoolExecutor.CallerRunsPolicy()
);
// 设置线程池
producer.setExecutorService(threadPoolExecutor);
// 设置事务监听器
producer.setTransactionListener(transactionListener);
// 启动事务消息生产者
producer.start();
log.info("TransactionMQProducer初始化成功,生产者组:{},NameServer:{}",
producerGroup + "-transaction", nameServer);
return producer;
}
}
消息追踪配置类
package com.ken.rocketmq.config;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.ObjectUtils;
/**
* RocketMQ消息追踪配置类
* 配置消息发送和消费的追踪功能
*
* @author ken
*/
@Configuration
@AutoConfigureAfter(RocketMQAutoConfiguration.class)
@Slf4j
public class RocketMQTraceConfig {
/**
* 配置消息追踪增强器
*
* @param rocketMQTemplate RocketMQ模板
* @return 消息追踪增强器
*/
@Bean
public RocketMQTraceEnhancer rocketMQTraceEnhancer(RocketMQTemplate rocketMQTemplate) {
if (ObjectUtils.isEmpty(rocketMQTemplate)) {
log.error("RocketMQTemplate为空,无法初始化消息追踪增强器");
throw new IllegalArgumentException("RocketMQTemplate不能为空");
}
RocketMQTraceEnhancer enhancer = new RocketMQTraceEnhancer(rocketMQTemplate);
log.info("RocketMQ消息追踪增强器初始化成功");
return enhancer;
}
/**
* RocketMQ消息追踪增强器
* 用于增强消息发送和消费的追踪能力
*/
public static class RocketMQTraceEnhancer {
private final RocketMQTemplate rocketMQTemplate;
public RocketMQTraceEnhancer(RocketMQTemplate rocketMQTemplate) {
this.rocketMQTemplate = rocketMQTemplate;
// 初始化追踪配置
initTraceConfig();
}
/**
* 初始化追踪配置
*/
private void initTraceConfig() {
// 开启消息发送追踪
rocketMQTemplate.getProducer().setEnableMsgTrace(true);
// 设置消息追踪的Topic
rocketMQTemplate.getProducer().setCustomizedTraceTopic("RMQ_SYS_TRACE_TOPIC");
log.info("RocketMQ消息追踪配置初始化完成");
}
}
}
核心工具类
消息实体类
首先定义通用的消息实体类,作为消息传递的载体:
package com.ken.rocketmq.entity;
import com.alibaba.fastjson2.annotation.JSONField;
import lombok.Data;
import java.io.Serializable;
import java.time.LocalDateTime;
/**
* 通用消息实体类
* 封装RocketMQ消息的基本属性
*
* @author ken
*/
@Data
public class RocketMQMessage<T> implements Serializable {
private static final long serialVersionUID = 1L;
/**
* 消息ID,唯一标识
*/
private String messageId;
/**
* 业务ID,用于业务追踪
*/
private String businessId;
/**
* 消息主题
*/
private String topic;
/**
* 消息标签
*/
private String tag;
/**
* 消息内容
*/
private T content;
/**
* 消息发送时间
*/
@JSONField(format = "yyyy-MM-dd HH:mm:ss")
private LocalDateTime sendTime;
/**
* 消息过期时间(毫秒),0表示不过期
*/
private long expireTime = 0;
/**
* 延迟级别,0表示不延迟
* RocketMQ延迟级别:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
*/
private int delayLevel = 0;
}
消息发送工具类
package com.ken.rocketmq.util;
import com.alibaba.fastjson2.JSON;
import com.ken.rocketmq.entity.RocketMQMessage;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
/**
* RocketMQ消息发送工具类
* 封装各种类型消息的发送方法
*
* @author ken
*/
@Component
@Slf4j
public class RocketMQSendUtil {
@Autowired
private DefaultMQProducer defaultMQProducer;
@Autowired
private TransactionMQProducer transactionMQProducer;
/**
* 发送同步消息
*
* @param topic 消息主题
* @param tag 消息标签
* @param content 消息内容
* @param <T> 消息内容类型
* @return 发送结果
* @throws MQClientException 客户端异常
* @throws MQBrokerException Broker异常
* @throws RemotingException 远程调用异常
* @throws InterruptedException 线程中断异常
*/
public <T> SendResult sendSyncMessage(String topic, String tag, T content)
throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
return sendSyncMessage(buildMessage(topic, tag, content, null, 0, 0));
}
/**
* 发送同步消息
*
* @param topic 消息主题
* @param tag 消息标签
* @param content 消息内容
* @param businessId 业务ID
* @param expireTime 过期时间(毫秒)
* @param delayLevel 延迟级别
* @param <T> 消息内容类型
* @return 发送结果
* @throws MQClientException 客户端异常
* @throws MQBrokerException Broker异常
* @throws RemotingException 远程调用异常
* @throws InterruptedException 线程中断异常
*/
public <T> SendResult sendSyncMessage(String topic, String tag, T content, String businessId,
long expireTime, int delayLevel)
throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
return sendSyncMessage(buildMessage(topic, tag, content, businessId, expireTime, delayLevel));
}
/**
* 发送同步消息
*
* @param message 消息对象
* @param <T> 消息内容类型
* @return 发送结果
* @throws MQClientException 客户端异常
* @throws MQBrokerException Broker异常
* @throws RemotingException 远程调用异常
* @throws InterruptedException 线程中断异常
*/
public <T> SendResult sendSyncMessage(RocketMQMessage<T> message)
throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
validateMessage(message);
Message rocketMsg = convertToRocketMessage(message);
log.info("发送同步消息,messageId: {}", message.getMessageId());
SendResult sendResult = defaultMQProducer.send(rocketMsg);
log.info("同步消息发送完成,messageId: {},发送结果: {}", message.getMessageId(), sendResult.getSendStatus());
return sendResult;
}
/**
* 发送异步消息
*
* @param topic 消息主题
* @param tag 消息标签
* @param content 消息内容
* @param callback 回调函数
* @param <T> 消息内容类型
* @throws MQClientException 客户端异常
* @throws RemotingException 远程调用异常
* @throws InterruptedException 线程中断异常
*/
public <T> void sendAsyncMessage(String topic, String tag, T content, SendCallback callback)
throws MQClientException, RemotingException, InterruptedException {
sendAsyncMessage(buildMessage(topic, tag, content, null, 0, 0), callback);
}
/**
* 发送异步消息
*
* @param message 消息对象
* @param callback 回调函数
* @param <T> 消息内容类型
* @throws MQClientException 客户端异常
* @throws RemotingException 远程调用异常
* @throws InterruptedException 线程中断异常
*/
public <T> void sendAsyncMessage(RocketMQMessage<T> message, SendCallback callback)
throws MQClientException, RemotingException, InterruptedException {
validateMessage(message);
Message rocketMsg = convertToRocketMessage(message);
log.info("发送异步消息,messageId: {}", message.getMessageId());
defaultMQProducer.send(rocketMsg, callback);
log.info("异步消息发送请求已提交,messageId: {}", message.getMessageId());
}
/**
* 发送单向消息(只负责发送消息,不等待服务器回应,没有回调函数)
*
* @param topic 消息主题
* @param tag 消息标签
* @param content 消息内容
* @param <T> 消息内容类型
* @throws MQClientException 客户端异常
* @throws RemotingException 远程调用异常
* @throws InterruptedException 线程中断异常
*/
public <T> void sendOnewayMessage(String topic, String tag, T content)
throws MQClientException, RemotingException, InterruptedException {
sendOnewayMessage(buildMessage(topic, tag, content, null, 0, 0));
}
/**
* 发送单向消息
*
* @param message 消息对象
* @param <T> 消息内容类型
* @throws MQClientException 客户端异常
* @throws RemotingException 远程调用异常
* @throws InterruptedException 线程中断异常
*/
public <T> void sendOnewayMessage(RocketMQMessage<T> message)
throws MQClientException, RemotingException, InterruptedException {
validateMessage(message);
Message rocketMsg = convertToRocketMessage(message);
log.info("发送单向消息,messageId: {}", message.getMessageId());
defaultMQProducer.sendOneway(rocketMsg);
log.info("单向消息发送完成,messageId: {}", message.getMessageId());
}
/**
* 发送顺序消息
*
* @param topic 消息主题
* @param tag 消息标签
* @param content 消息内容
* @param shardingKey 用于确定消息队列的分片键,相同键的消息会被发送到同一个队列
* @param <T> 消息内容类型
* @return 发送结果
* @throws MQClientException 客户端异常
* @throws MQBrokerException Broker异常
* @throws RemotingException 远程调用异常
* @throws InterruptedException 线程中断异常
*/
public <T> SendResult sendOrderlyMessage(String topic, String tag, T content, String shardingKey)
throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
RocketMQMessage<T> message = buildMessage(topic, tag, content, null, 0, 0);
return sendOrderlyMessage(message, shardingKey);
}
/**
* 发送顺序消息
*
* @param message 消息对象
* @param shardingKey 用于确定消息队列的分片键
* @param <T> 消息内容类型
* @return 发送结果
* @throws MQClientException 客户端异常
* @throws MQBrokerException Broker异常
* @throws RemotingException 远程调用异常
* @throws InterruptedException 线程中断异常
*/
public <T> SendResult sendOrderlyMessage(RocketMQMessage<T> message, String shardingKey)
throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
validateMessage(message);
StringUtils.hasText(shardingKey, "分片键不能为空");
Message rocketMsg = convertToRocketMessage(message);
log.info("发送顺序消息,messageId: {},shardingKey: {}", message.getMessageId(), shardingKey);
// 根据shardingKey选择消息队列
SendResult sendResult = defaultMQProducer.send(rocketMsg, (mqs, msg, arg) -> {
String key = (String) arg;
int index = key.hashCode() % mqs.size();
return mqs.get(index);
}, shardingKey);
log.info("顺序消息发送完成,messageId: {},发送结果: {}", message.getMessageId(), sendResult.getSendStatus());
return sendResult;
}
/**
* 发送事务消息
*
* @param topic 消息主题
* @param tag 消息标签
* @param content 消息内容
* @param arg 事务参数,将传递给TransactionListener
* @param <T> 消息内容类型
* @return 发送结果
* @throws MQClientException 客户端异常
* @throws RemotingException 远程调用异常
* @throws InterruptedException 线程中断异常
* @throws ExecutionException 执行异常
*/
public <T> SendResult sendTransactionMessage(String topic, String tag, T content, Object arg)
throws MQClientException, RemotingException, InterruptedException, ExecutionException {
RocketMQMessage<T> message = buildMessage(topic, tag, content, null, 0, 0);
return sendTransactionMessage(message, arg);
}
/**
* 发送事务消息
*
* @param message 消息对象
* @param arg 事务参数
* @param <T> 消息内容类型
* @return 发送结果
* @throws MQClientException 客户端异常
* @throws RemotingException 远程调用异常
* @throws InterruptedException 线程中断异常
* @throws ExecutionException 执行异常
*/
public <T> SendResult sendTransactionMessage(RocketMQMessage<T> message, Object arg)
throws MQClientException, RemotingException, InterruptedException, ExecutionException {
validateMessage(message);
Message rocketMsg = convertToRocketMessage(message);
log.info("发送事务消息,messageId: {}", message.getMessageId());
SendResult sendResult = transactionMQProducer.sendMessageInTransaction(rocketMsg, arg);
log.info("事务消息发送完成,messageId: {},发送结果: {}", message.getMessageId(), sendResult.getSendStatus());
return sendResult;
}
/**
* 构建消息对象
*
* @param topic 消息主题
* @param tag 消息标签
* @param content 消息内容
* @param businessId 业务ID
* @param expireTime 过期时间
* @param delayLevel 延迟级别
* @param <T> 消息内容类型
* @return 消息对象
*/
private <T> RocketMQMessage<T> buildMessage(String topic, String tag, T content,
String businessId, long expireTime, int delayLevel) {
StringUtils.hasText(topic, "消息主题不能为空");
RocketMQMessage<T> message = new RocketMQMessage<>();
message.setMessageId(UUID.randomUUID().toString().replaceAll("-", ""));
message.setBusinessId(StringUtils.hasText(businessId) ? businessId : UUID.randomUUID().toString());
message.setTopic(topic);
message.setTag(tag);
message.setContent(content);
message.setSendTime(LocalDateTime.now());
message.setExpireTime(expireTime);
message.setDelayLevel(delayLevel);
return message;
}
/**
* 验证消息对象
*
* @param message 消息对象
* @param <T> 消息内容类型
*/
private <T> void validateMessage(RocketMQMessage<T> message) {
if (ObjectUtils.isEmpty(message)) {
throw new IllegalArgumentException("消息对象不能为空");
}
StringUtils.hasText(message.getTopic(), "消息主题不能为空");
if (ObjectUtils.isEmpty(message.getContent())) {
throw new IllegalArgumentException("消息内容不能为空");
}
StringUtils.hasText(message.getMessageId(), "消息ID不能为空");
}
/**
* 将自定义消息对象转换为RocketMQ的Message对象
*
* @param message 自定义消息对象
* @param <T> 消息内容类型
* @return RocketMQ的Message对象
*/
private <T> Message convertToRocketMessage(RocketMQMessage<T> message) {
// 构建主题和标签
String topic = message.getTopic();
String tag = message.getTag();
String fullTopic = StringUtils.hasText(tag) ? topic + ":" + tag : topic;
// 序列化消息内容
byte[] body = JSON.toJSONBytes(message);
// 创建RocketMQ消息对象
Message rocketMsg = new Message(fullTopic, body);
// 设置消息ID
rocketMsg.setKeys(message.getMessageId());
// 设置业务ID
rocketMsg.putUserProperty("businessId", message.getBusinessId());
// 设置过期时间
if (message.getExpireTime() > 0) {
rocketMsg.setDelayTimeLevel(0);
rocketMsg.setBornTimestamp(System.currentTimeMillis() + message.getExpireTime());
}
// 设置延迟级别
if (message.getDelayLevel() > 0) {
rocketMsg.setDelayTimeLevel(message.getDelayLevel());
}
return rocketMsg;
}
}
消息消费工具类
package com.ken.rocketmq.util;
import com.alibaba.fastjson2.JSON;
import com.ken.rocketmq.entity.RocketMQMessage;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;
/**
* RocketMQ消息消费工具类
* 封装消息订阅和消费的相关方法
*
* @author ken
*/
@Component
@Slf4j
public class RocketMQConsumeUtil {
@Autowired
private DefaultMQPushConsumer defaultMQPushConsumer;
/**
* 订阅主题并设置并发消息监听器
*
* @param topic 消息主题
* @param tag 消息标签,*表示所有标签
* @param messageClass 消息内容类型
* @param consumer 消息处理函数
* @param <T> 消息内容类型
* @throws MQClientException 客户端异常
*/
public <T> void subscribe(String topic, String tag, Class<T> messageClass, Consumer<RocketMQMessage<T>> consumer)
throws MQClientException {
subscribe(topic, tag, messageClass, consumer, null);
}
/**
* 订阅主题并设置并发消息监听器
*
* @param topic 消息主题
* @param tag 消息标签
* @param messageClass 消息内容类型
* @param consumer 消息处理函数
* @param properties 订阅属性
* @param <T> 消息内容类型
* @throws MQClientException 客户端异常
*/
public <T> void subscribe(String topic, String tag, Class<T> messageClass,
Consumer<RocketMQMessage<T>> consumer, Map<String, String> properties)
throws MQClientException {
StringUtils.hasText(topic, "消息主题不能为空");
StringUtils.hasText(tag, "消息标签不能为空");
if (ObjectUtils.isEmpty(messageClass)) {
throw new IllegalArgumentException("消息内容类型不能为空");
}
if (ObjectUtils.isEmpty(consumer)) {
throw new IllegalArgumentException("消息处理函数不能为空");
}
// 设置消费起始位置
defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
// 注册消息监听器
defaultMQPushConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
if (CollectionUtils.isEmpty(msgs)) {
log.warn("接收到空的消息列表");
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
for (MessageExt msg : msgs) {
try {
log.info("接收到消息,messageId: {},topic: {},tag: {},queueId: {},reconsumeTimes: {}",
msg.getMsgId(), msg.getTopic(), msg.getTags(), msg.getQueueId(), msg.getReconsumeTimes());
// 解析消息
RocketMQMessage<T> message = parseMessage(msg, messageClass);
if (ObjectUtils.isEmpty(message)) {
log.error("消息解析失败,messageId: {}", msg.getMsgId());
continue;
}
// 处理消息
consumer.accept(message);
} catch (Exception e) {
log.error("消息处理异常,messageId: {}", msg.getMsgId(), e);
// 如果重试次数超过3次,记录到死信队列或进行其他处理
if (msg.getReconsumeTimes() >= 3) {
log.error("消息重试次数已达上限,将不再重试,messageId: {}", msg.getMsgId());
// TODO: 死信队列处理逻辑
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
// 消息处理失败,返回重试
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
// 消息处理成功
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
// 订阅主题
String subExpression = StringUtils.hasText(tag) ? tag : "*";
if (ObjectUtils.isEmpty(properties)) {
defaultMQPushConsumer.subscribe(topic, subExpression);
} else {
defaultMQPushConsumer.subscribe(topic, subExpression, properties);
}
log.info("已订阅主题,topic: {},tag: {}", topic, subExpression);
}
/**
* 订阅主题并设置顺序消息监听器
*
* @param topic 消息主题
* @param tag 消息标签
* @param messageClass 消息内容类型
* @param consumer 消息处理函数
* @param <T> 消息内容类型
* @throws MQClientException 客户端异常
*/
public <T> void subscribeOrderly(String topic, String tag, Class<T> messageClass,
Consumer<RocketMQMessage<T>> consumer) throws MQClientException {
StringUtils.hasText(topic, "消息主题不能为空");
StringUtils.hasText(tag, "消息标签不能为空");
if (ObjectUtils.isEmpty(messageClass)) {
throw new IllegalArgumentException("消息内容类型不能为空");
}
if (ObjectUtils.isEmpty(consumer)) {
throw new IllegalArgumentException("消息处理函数不能为空");
}
// 设置消费起始位置
defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
// 注册顺序消息监听器
defaultMQPushConsumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> {
context.setAutoCommit(true);
if (CollectionUtils.isEmpty(msgs)) {
log.warn("接收到空的消息列表");
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
for (MessageExt msg : msgs) {
try {
log.info("接收到顺序消息,messageId: {},topic: {},tag: {},queueId: {},reconsumeTimes: {}",
msg.getMsgId(), msg.getTopic(), msg.getTags(), msg.getQueueId(), msg.getReconsumeTimes());
// 解析消息
RocketMQMessage<T> message = parseMessage(msg, messageClass);
if (ObjectUtils.isEmpty(message)) {
log.error("消息解析失败,messageId: {}", msg.getMsgId());
continue;
}
// 处理消息
consumer.accept(message);
} catch (Exception e) {
log.error("顺序消息处理异常,messageId: {}", msg.getMsgId(), e);
// 顺序消息处理失败,返回重试
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
// 消息处理成功
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
// 订阅主题
String subExpression = StringUtils.hasText(tag) ? tag : "*";
defaultMQPushConsumer.subscribe(topic, subExpression);
log.info("已订阅顺序消息主题,topic: {},tag: {}", topic, subExpression);
}
/**
* 解析消息
*
* @param msg RocketMQ消息对象
* @param messageClass 消息内容类型
* @param <T> 消息内容类型
* @return 解析后的消息对象
*/
private <T> RocketMQMessage<T> parseMessage(MessageExt msg, Class<T> messageClass) {
try {
// 从消息体中解析RocketMQMessage对象
String body = new String(msg.getBody(), StandardCharsets.UTF_8);
RocketMQMessage<T> message = JSON.parseObject(body, RocketMQMessage.class);
if (ObjectUtils.isEmpty(message)) {
log.error("消息体解析为空,messageId: {}", msg.getMsgId());
return null;
}
// 解析消息内容
if (!ObjectUtils.isEmpty(message.getContent()) && message.getContent() instanceof String) {
String contentStr = (String) message.getContent();
T content = JSON.parseObject(contentStr, messageClass);
message.setContent(content);
}
return message;
} catch (Exception e) {
log.error("消息解析异常,messageId: {}", msg.getMsgId(), e);
return null;
}
}
/**
* 取消订阅
*
* @param topic 消息主题
* @throws MQClientException 客户端异常
*/
public void unsubscribe(String topic) throws MQClientException {
StringUtils.hasText(topic, "消息主题不能为空");
defaultMQPushConsumer.unsubscribe(topic);
log.info("已取消订阅主题,topic: {}", topic);
}
}
消息查询工具类
package com.ken.rocketmq.util;
import com.alibaba.fastjson2.JSON;
import com.ken.rocketmq.entity.RocketMQMessage;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
import java.nio.charset.StandardCharsets;
import java.util.List;
/**
* RocketMQ消息查询工具类
* 提供消息查询相关功能
*
* @author ken
*/
@Component
@Slf4j
public class RocketMQQueryUtil {
@Autowired
private DefaultMQProducer defaultMQProducer;
/**
* 根据消息ID查询消息
*
* @param topic 消息主题
* @param messageId 消息ID
* @return 消息对象
* @throws MQClientException 客户端异常
*/
public MessageExt queryMessageByMessageId(String topic, String messageId) throws MQClientException {
StringUtils.hasText(topic, "消息主题不能为空");
StringUtils.hasText(messageId, "消息ID不能为空");
log.info("根据消息ID查询消息,topic: {},messageId: {}", topic, messageId);
// 查询消息
MessageExt messageExt = defaultMQProducer.viewMessage(topic, messageId);
if (ObjectUtils.isEmpty(messageExt)) {
log.warn("未找到消息,topic: {},messageId: {}", topic, messageId);
return null;
}
log.info("消息查询成功,topic: {},messageId: {},queueId: {},storeTime: {}",
topic, messageId, messageExt.getQueueId(), messageExt.getStoreTimestamp());
return messageExt;
}
/**
* 根据消息ID查询消息并转换为自定义消息对象
*
* @param topic 消息主题
* @param messageId 消息ID
* @param messageClass 消息内容类型
* @param <T> 消息内容类型
* @return 自定义消息对象
* @throws MQClientException 客户端异常
*/
public <T> RocketMQMessage<T> queryRocketMessageByMessageId(String topic, String messageId, Class<T> messageClass)
throws MQClientException {
MessageExt messageExt = queryMessageByMessageId(topic, messageId);
if (ObjectUtils.isEmpty(messageExt)) {
return null;
}
return parseMessage(messageExt, messageClass);
}
/**
* 根据业务ID查询消息
*
* @param topic 消息主题
* @param businessId 业务ID
* @param maxNum 最大查询数量
* @return 消息列表
* @throws MQClientException 客户端异常
*/
public List<MessageExt> queryMessageByBusinessId(String topic, String businessId, int maxNum)
throws MQClientException {
StringUtils.hasText(topic, "消息主题不能为空");
StringUtils.hasText(businessId, "业务ID不能为空");
if (maxNum <= 0 || maxNum > 32) {
throw new IllegalArgumentException("最大查询数量必须在1到32之间");
}
log.info("根据业务ID查询消息,topic: {},businessId: {},maxNum: {}", topic, businessId, maxNum);
// 根据业务ID查询消息,业务ID存储在消息的用户属性中
// 注意:RocketMQ的queryMessage方法主要用于根据key查询,这里需要遍历队列查询
// 实际应用中,建议将businessId设置为消息的key,以便更高效查询
List<MessageExt> messageExts = defaultMQProducer.queryMessage(topic, businessId, maxNum, 0, System.currentTimeMillis());
if (ObjectUtils.isEmpty(messageExts)) {
log.warn("未找到消息,topic: {},businessId: {}", topic, businessId);
return null;
}
log.info("消息查询成功,topic: {},businessId: {},找到{}条消息", topic, businessId, messageExts.size());
return messageExts;
}
/**
* 解析消息为自定义消息对象
*
* @param messageExt RocketMQ消息对象
* @param messageClass 消息内容类型
* @param <T> 消息内容类型
* @return 自定义消息对象
*/
private <T> RocketMQMessage<T> parseMessage(MessageExt messageExt, Class<T> messageClass) {
try {
String body = new String(messageExt.getBody(), StandardCharsets.UTF_8);
RocketMQMessage<T> message = JSON.parseObject(body, RocketMQMessage.class);
if (ObjectUtils.isEmpty(message)) {
log.error("消息体解析为空,messageId: {}", messageExt.getMsgId());
return null;
}
// 解析消息内容
if (!ObjectUtils.isEmpty(message.getContent()) && message.getContent() instanceof String) {
String contentStr = (String) message.getContent();
T content = JSON.parseObject(contentStr, messageClass);
message.setContent(content);
}
return message;
} catch (Exception e) {
log.error("消息解析异常,messageId: {}", messageExt.getMsgId(), e);
return null;
}
}
}
事务消息监听器
package com.ken.rocketmq.listener;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.stereotype.Component;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/**
* 事务消息监听器
* 处理本地事务执行和事务状态回查
*
* @author ken
*/
@Component
@Slf4j
public class RocketMQTransactionListener implements TransactionListener {
/**
* 存储本地事务状态,实际应用中应持久化到数据库
*/
private final ConcurrentMap<String, LocalTransactionState> transactionStateMap = new ConcurrentHashMap<>();
/**
* 执行本地事务
*
* @param msg 半事务消息
* @param arg 事务参数
* @return 本地事务状态:COMMIT_MESSAGE提交,ROLLBACK_MESSAGE回滚,UNKNOW未知状态
*/
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
String transactionId = msg.getTransactionId();
log.info("开始执行本地事务,transactionId: {},消息内容: {}", transactionId, new String(msg.getBody()));
try {
// 这里执行实际的本地事务逻辑
// 例如:数据库操作、业务逻辑处理等
// 模拟本地事务处理
if (arg != null && arg instanceof String) {
String businessKey = (String) arg;
log.info("执行本地事务,businessKey: {}", businessKey);
// 实际业务中根据处理结果返回不同状态
if ("success".equals(businessKey)) {
// 本地事务执行成功,提交消息
transactionStateMap.put(transactionId, LocalTransactionState.COMMIT_MESSAGE);
return LocalTransactionState.COMMIT_MESSAGE;
} else if ("fail".equals(businessKey)) {
// 本地事务执行失败,回滚消息
transactionStateMap.put(transactionId, LocalTransactionState.ROLLBACK_MESSAGE);
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
// 未知状态,需要后续回查
transactionStateMap.put(transactionId, LocalTransactionState.UNKNOW);
return LocalTransactionState.UNKNOW;
} catch (Exception e) {
log.error("本地事务执行异常,transactionId: {}", transactionId, e);
// 本地事务执行异常,回滚消息
transactionStateMap.put(transactionId, LocalTransactionState.ROLLBACK_MESSAGE);
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
/**
* 回查本地事务状态
* 当executeLocalTransaction返回UNKNOW时,Broker会定期调用此方法回查事务状态
*
* @param msg 半事务消息
* @return 本地事务状态
*/
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
String transactionId = msg.getTransactionId();
log.info("回查本地事务状态,transactionId: {},消息内容: {}", transactionId, new String(msg.getBody()));
try {
// 从存储中获取事务状态
LocalTransactionState state = transactionStateMap.get(transactionId);
if (state == null) {
// 未找到事务状态,可能是本地事务还未执行完成,返回UNKNOW继续回查
log.warn("未找到本地事务状态,transactionId: {}", transactionId);
return LocalTransactionState.UNKNOW;
}
log.info("本地事务状态回查结果,transactionId: {},状态: {}", transactionId, state);
// 如果状态是COMMIT或ROLLBACK,移除缓存的状态
if (state != LocalTransactionState.UNKNOW) {
transactionStateMap.remove(transactionId);
}
return state;
} catch (Exception e) {
log.error("本地事务状态回查异常,transactionId: {}", transactionId, e);
return LocalTransactionState.UNKNOW;
}
}
}
实战案例
1. 普通消息发送与消费
消息发送服务
package com.ken.rocketmq.service;
import com.ken.rocketmq.entity.Order;
import com.ken.rocketmq.entity.RocketMQMessage;
import com.ken.rocketmq.util.RocketMQSendUtil;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* 普通消息发送服务
*
* @author ken
*/
@RestController
@RequestMapping("/message")
@Tag(name = "普通消息接口", description = "提供同步、异步、单向消息的发送功能")
@Slf4j
public class MessageSendService {
@Autowired
private RocketMQSendUtil rocketMQSendUtil;
/**
* 消息主题
*/
private static final String ORDER_TOPIC = "order_topic";
/**
* 消息标签
*/
private static final String CREATE_ORDER_TAG = "create_order";
/**
* 发送同步消息
*
* @param order 订单信息
* @return 发送结果
*/
@PostMapping("/sync")
@Operation(summary = "发送同步消息", description = "发送创建订单的同步消息")
public String sendSyncMessage(
@Parameter(description = "订单信息") @RequestBody Order order) {
try {
SendResult sendResult = rocketMQSendUtil.sendSyncMessage(
ORDER_TOPIC, CREATE_ORDER_TAG, order);
return "同步消息发送成功,messageId: " + sendResult.getMsgId() +
",状态: " + sendResult.getSendStatus();
} catch (MQClientException | MQBrokerException | RemotingException | InterruptedException e) {
log.error("同步消息发送失败", e);
return "同步消息发送失败:" + e.getMessage();
}
}
/**
* 发送异步消息
*
* @param order 订单信息
* @return 发送结果
*/
@PostMapping("/async")
@Operation(summary = "发送异步消息", description = "发送创建订单的异步消息")
public String sendAsyncMessage(
@Parameter(description = "订单信息") @RequestBody Order order) {
try {
rocketMQSendUtil.sendAsyncMessage(ORDER_TOPIC, CREATE_ORDER_TAG, order, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("异步消息发送成功,messageId: {},状态: {}",
sendResult.getMsgId(), sendResult.getSendStatus());
// 异步发送成功后的处理逻辑
}
@Override
public void onException(Throwable e) {
log.error("异步消息发送失败", e);
// 异步发送失败后的处理逻辑
}
});
return "异步消息发送请求已提交";
} catch (MQClientException | RemotingException | InterruptedException e) {
log.error("异步消息发送请求提交失败", e);
return "异步消息发送请求提交失败:" + e.getMessage();
}
}
/**
* 发送单向消息
*
* @param order 订单信息
* @return 发送结果
*/
@PostMapping("/oneway")
@Operation(summary = "发送单向消息", description = "发送创建订单的单向消息")
public String sendOnewayMessage(
@Parameter(description = "订单信息") @RequestBody Order order) {
try {
rocketMQSendUtil.sendOnewayMessage(ORDER_TOPIC, CREATE_ORDER_TAG, order);
return "单向消息发送成功";
} catch (MQClientException | RemotingException | InterruptedException e) {
log.error("单向消息发送失败", e);
return "单向消息发送失败:" + e.getMessage();
}
}
}
消息消费服务
package com.ken.rocketmq.service;
import com.ken.rocketmq.entity.Order;
import com.ken.rocketmq.entity.RocketMQMessage;
import com.ken.rocketmq.util.RocketMQConsumeUtil;
import jakarta.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.exception.MQClientException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* 普通消息消费服务
*
* @author ken
*/
@Service
@Slf4j
public class MessageConsumeService {
@Autowired
private RocketMQConsumeUtil rocketMQConsumeUtil;
/**
* 消息主题
*/
private static final String ORDER_TOPIC = "order_topic";
/**
* 消息标签
*/
private static final String CREATE_ORDER_TAG = "create_order";
/**
* 初始化消息订阅
*/
@PostConstruct
public void init() {
try {
// 订阅订单创建消息
subscribeCreateOrderMessage();
} catch (MQClientException e) {
log.error("消息订阅初始化失败", e);
throw new RuntimeException("消息订阅初始化失败", e);
}
}
/**
* 订阅订单创建消息
*
* @throws MQClientException 客户端异常
*/
private void subscribeCreateOrderMessage() throws MQClientException {
rocketMQConsumeUtil.subscribe(ORDER_TOPIC, CREATE_ORDER_TAG, Order.class, this::handleCreateOrderMessage);
}
/**
* 处理订单创建消息
*
* @param message 订单消息
*/
private void handleCreateOrderMessage(RocketMQMessage<Order> message) {
log.info("开始处理订单创建消息,messageId: {},businessId: {}",
message.getMessageId(), message.getBusinessId());
Order order = message.getContent();
if (order == null) {
log.error("订单信息为空,messageId: {}", message.getMessageId());
return;
}
try {
// 处理订单逻辑,例如:更新订单状态、通知用户等
log.info("处理订单,订单号: {},用户ID: {},金额: {}",
order.getOrderNo(), order.getUserId(), order.getAmount());
// TODO: 实际业务处理逻辑
log.info("订单消息处理完成,messageId: {},订单号: {}",
message.getMessageId(), order.getOrderNo());
} catch (Exception e) {
log.error("订单消息处理异常,messageId: {},订单号: {}",
message.getMessageId(), order.getOrderNo(), e);
// 抛出异常会触发消息重试
throw new RuntimeException("订单消息处理异常", e);
}
}
}
2. 顺序消息实现
package com.ken.rocketmq.service;
import com.ken.rocketmq.entity.Order;
import com.ken.rocketmq.util.RocketMQConsumeUtil;
import com.ken.rocketmq.util.RocketMQSendUtil;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.tags.Tag;
import jakarta.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* 顺序消息服务
* 处理订单状态变更的顺序消息
*
* @author ken
*/
@RestController
@RequestMapping("/order/sequence")
@Tag(name = "顺序消息接口", description = "提供订单状态变更的顺序消息发送和消费功能")
@Slf4j
public class OrderSequenceMessageService {
@Autowired
private RocketMQSendUtil rocketMQSendUtil;
@Autowired
private RocketMQConsumeUtil rocketMQConsumeUtil;
/**
* 消息主题
*/
private static final String ORDER_STATUS_TOPIC = "order_status_topic";
/**
* 消息标签
*/
private static final String STATUS_CHANGE_TAG = "status_change";
/**
* 初始化顺序消息订阅
*/
@PostConstruct
public void init() {
try {
// 订阅订单状态变更的顺序消息
subscribeOrderStatusMessage();
} catch (MQClientException e) {
log.error("顺序消息订阅初始化失败", e);
throw new RuntimeException("顺序消息订阅初始化失败", e);
}
}
/**
* 发送订单状态变更的顺序消息
*
* @param order 订单信息
* @return 发送结果
*/
@PostMapping("/send")
@Operation(summary = "发送顺序消息", description = "发送订单状态变更的顺序消息")
public String sendOrderStatusMessage(
@Parameter(description = "订单信息") @RequestBody Order order) {
try {
// 使用订单号作为分片键,确保同一个订单的消息发送到同一个队列
String shardingKey = order.getOrderNo();
SendResult sendResult = rocketMQSendUtil.sendOrderlyMessage(
ORDER_STATUS_TOPIC, STATUS_CHANGE_TAG, order, shardingKey);
return "顺序消息发送成功,messageId: " + sendResult.getMsgId() +
",队列ID: " + sendResult.getMessageQueue().getQueueId();
} catch (MQClientException | MQBrokerException | RemotingException | InterruptedException e) {
log.error("顺序消息发送失败", e);
return "顺序消息发送失败:" + e.getMessage();
}
}
/**
* 订阅订单状态变更消息
*
* @throws MQClientException 客户端异常
*/
private void subscribeOrderStatusMessage() throws MQClientException {
rocketMQConsumeUtil.subscribeOrderly(ORDER_STATUS_TOPIC, STATUS_CHANGE_TAG, Order.class,
this::handleOrderStatusMessage);
}
/**
* 处理订单状态变更消息
*
* @param message 订单消息
*/
private void handleOrderStatusMessage(RocketMQMessage<Order> message) {
log.info("开始处理订单状态消息,messageId: {},businessId: {}",
message.getMessageId(), message.getBusinessId());
Order order = message.getContent();
if (order == null) {
log.error("订单信息为空,messageId: {}", message.getMessageId());
return;
}
try {
// 处理订单状态变更逻辑
log.info("处理订单状态变更,订单号: {},当前状态: {},用户ID: {},金额: {}",
order.getOrderNo(), order.getStatus(), order.getUserId(), order.getAmount());
// TODO: 实际业务处理逻辑,如更新订单状态、记录状态变更日志等
log.info("订单状态消息处理完成,messageId: {},订单号: {}",
message.getMessageId(), order.getOrderNo());
} catch (Exception e) {
log.error("订单状态消息处理异常,messageId: {},订单号: {}",
message.getMessageId(), order.getOrderNo(), e);
// 抛出异常会触发消息重试
throw new RuntimeException("订单状态消息处理异常", e);
}
}
}
3. 事务消息处理
package com.ken.rocketmq.service;
import com.ken.rocketmq.entity.Order;
import com.ken.rocketmq.util.RocketMQSendUtil;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.concurrent.ExecutionException;
/**
* 事务消息服务
* 处理创建订单的事务消息
*
* @author ken
*/
@RestController
@RequestMapping("/order/transaction")
@Tag(name = "事务消息接口", description = "提供创建订单的事务消息发送功能")
@Slf4j
public class OrderTransactionMessageService {
@Autowired
private RocketMQSendUtil rocketMQSendUtil;
/**
* 消息主题
*/
private static final String ORDER_TRANSACTION_TOPIC = "order_transaction_topic";
/**
* 消息标签
*/
private static final String CREATE_ORDER_TAG = "create_order";
/**
* 发送创建订单的事务消息
*
* @param order 订单信息
* @return 发送结果
*/
@PostMapping("/send")
@Operation(summary = "发送事务消息", description = "发送创建订单的事务消息")
public String sendCreateOrderTransactionMessage(
@Parameter(description = "订单信息") @RequestBody Order order) {
try {
// 事务参数,用于在TransactionListener中判断事务执行结果
// 实际应用中可以传递业务关键信息
String transactionArg = "success"; // success表示成功,fail表示失败
SendResult sendResult = rocketMQSendUtil.sendTransactionMessage(
ORDER_TRANSACTION_TOPIC, CREATE_ORDER_TAG, order, transactionArg);
return "事务消息发送成功,messageId: " + sendResult.getMsgId() +
",状态: " + sendResult.getSendStatus();
} catch (MQClientException | RemotingException | InterruptedException | ExecutionException e) {
log.error("事务消息发送失败", e);
return "事务消息发送失败:" + e.getMessage();
}
}
}
4. 延迟消息应用
package com.ken.rocketmq.service;
import com.ken.rocketmq.entity.Order;
import com.ken.rocketmq.util.RocketMQConsumeUtil;
import com.ken.rocketmq.util.RocketMQSendUtil;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.tags.Tag;
import jakarta.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
/**
* 延迟消息服务
* 处理订单超时取消的延迟消息
*
* @author ken
*/
@RestController
@RequestMapping("/order/delay")
@Tag(name = "延迟消息接口", description = "提供订单超时取消的延迟消息发送和消费功能")
@Slf4j
public class OrderDelayMessageService {
@Autowired
private RocketMQSendUtil rocketMQSendUtil;
@Autowired
private RocketMQConsumeUtil rocketMQConsumeUtil;
/**
* 消息主题
*/
private static final String ORDER_DELAY_TOPIC = "order_delay_topic";
/**
* 消息标签
*/
private static final String CANCEL_ORDER_TAG = "cancel_order";
/**
* RocketMQ延迟级别:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
*/
private static final int[] DELAY_LEVELS = {1, 5, 10, 30, 60, 120, 180, 240, 300, 360, 420, 480, 540, 600, 1200, 1800, 3600, 7200};
/**
* 初始化延迟消息订阅
*/
@PostConstruct
public void init() {
try {
// 订阅订单超时取消的延迟消息
subscribeOrderCancelMessage();
} catch (MQClientException e) {
log.error("延迟消息订阅初始化失败", e);
throw new RuntimeException("延迟消息订阅初始化失败", e);
}
}
/**
* 发送订单超时取消的延迟消息
*
* @param order 订单信息
* @param delayLevel 延迟级别(1-18)
* @return 发送结果
*/
@PostMapping("/send")
@Operation(summary = "发送延迟消息", description = "发送订单超时取消的延迟消息")
public String sendOrderCancelDelayMessage(
@Parameter(description = "订单信息") @RequestBody Order order,
@Parameter(description = "延迟级别(1-18),对应延迟时间:1s 5s 10s 30s 1m ... 2h")
@RequestParam int delayLevel) {
if (delayLevel < 1 || delayLevel > 18) {
return "延迟级别必须在1-18之间";
}
try {
SendResult sendResult = rocketMQSendUtil.sendSyncMessage(
ORDER_DELAY_TOPIC, CANCEL_ORDER_TAG, order, order.getOrderNo(), 0, delayLevel);
int delayTime = DELAY_LEVELS[delayLevel - 1];
String delayUnit = delayTime < 60 ? "秒" : "分钟";
int displayTime = delayTime < 60 ? delayTime : delayTime / 60;
return String.format("延迟消息发送成功,messageId: %s,订单号: %s,将在%d%s后处理",
sendResult.getMsgId(), order.getOrderNo(), displayTime, delayUnit);
} catch (MQClientException | MQBrokerException | RemotingException | InterruptedException e) {
log.error("延迟消息发送失败", e);
return "延迟消息发送失败:" + e.getMessage();
}
}
/**
* 订阅订单超时取消消息
*
* @throws MQClientException 客户端异常
*/
private void subscribeOrderCancelMessage() throws MQClientException {
rocketMQConsumeUtil.subscribe(ORDER_DELAY_TOPIC, CANCEL_ORDER_TAG, Order.class,
this::handleOrderCancelMessage);
}
/**
* 处理订单超时取消消息
*
* @param message 订单消息
*/
private void handleOrderCancelMessage(RocketMQMessage<Order> message) {
log.info("开始处理订单超时取消消息,messageId: {},businessId: {}",
message.getMessageId(), message.getBusinessId());
Order order = message.getContent();
if (order == null) {
log.error("订单信息为空,messageId: {}", message.getMessageId());
return;
}
try {
// 处理订单超时取消逻辑
log.info("处理订单超时取消,订单号: {},用户ID: {},当前状态: {}",
order.getOrderNo(), order.getUserId(), order.getStatus());
// TODO: 实际业务处理逻辑,如检查订单状态,如果仍未支付则取消订单
log.info("订单超时取消消息处理完成,messageId: {},订单号: {}",
message.getMessageId(), order.getOrderNo());
} catch (Exception e) {
log.error("订单超时取消消息处理异常,messageId: {},订单号: {}",
message.getMessageId(), order.getOrderNo(), e);
// 抛出异常会触发消息重试
throw new RuntimeException("订单超时取消消息处理异常", e);
}
}
}
数据库表设计
为了配合上面的实战案例,我们需要设计相关的数据库表:
-- 创建数据库
CREATE DATABASE IF NOT EXISTS rocketmq_demo CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
-- 使用数据库
USE rocketmq_demo;
-- 订单表
CREATE TABLE IF NOT EXISTS `order` (
`id` bigint NOT NULL AUTO_INCREMENT COMMENT '主键ID',
`order_no` varchar(64) NOT NULL COMMENT '订单编号',
`user_id` bigint NOT NULL COMMENT '用户ID',
`amount` decimal(10,2) NOT NULL COMMENT '订单金额',
`status` tinyint NOT NULL COMMENT '订单状态:0-创建中,1-已创建,2-已支付,3-已取消,4-已完成',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (`id`),
UNIQUE KEY `uk_order_no` (`order_no`),
KEY `idx_user_id` (`user_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='订单表';
-- 订单状态变更日志表
CREATE TABLE IF NOT EXISTS `order_status_log` (
`id` bigint NOT NULL AUTO_INCREMENT COMMENT '主键ID',
`order_no` varchar(64) NOT NULL COMMENT '订单编号',
`before_status` tinyint NOT NULL COMMENT '变更前状态',
`after_status` tinyint NOT NULL COMMENT '变更后状态',
`operate_user` varchar(64) NOT NULL COMMENT '操作人',
`operate_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '操作时间',
`remark` varchar(255) DEFAULT NULL COMMENT '备注',
PRIMARY KEY (`id`),
KEY `idx_order_no` (`order_no`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='订单状态变更日志表';
-- 事务消息状态表
CREATE TABLE IF NOT EXISTS `transaction_message` (
`id` bigint NOT NULL AUTO_INCREMENT COMMENT '主键ID',
`transaction_id` varchar(64) NOT NULL COMMENT '事务ID',
`message_id` varchar(64) NOT NULL COMMENT '消息ID',
`business_id` varchar(64) NOT NULL COMMENT '业务ID',
`topic` varchar(128) NOT NULL COMMENT '消息主题',
`tag` varchar(64) DEFAULT NULL COMMENT '消息标签',
`status` tinyint NOT NULL COMMENT '状态:0-处理中,1-已提交,2-已回滚',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
`update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (`id`),
UNIQUE KEY `uk_transaction_id` (`transaction_id`),
KEY `idx_business_id` (`business_id`),
KEY `idx_status` (`status`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='事务消息状态表';
项目启动类
package com.ken.rocketmq;
import io.swagger.v3.oas.annotations.OpenAPIDefinition;
import io.swagger.v3.oas.annotations.info.Info;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* RocketMQ实战示例项目启动类
*
* @author ken
*/
@SpringBootApplication
@MapperScan("com.ken.rocketmq.mapper")
@OpenAPIDefinition(info = @Info(title = "RocketMQ实战API", version = "1.0", description = "RocketMQ实战示例接口文档"))
public class RocketmqDemoApplication {
public static void main(String[] args) {
SpringApplication.run(RocketmqDemoApplication.class, args);
}
}
总结
本文详细介绍了 RocketMQ 的实战应用,从核心架构原理到具体的配置类和工具类实现,再到各类消息的实战案例,全面覆盖了 RocketMQ 的主要使用场景。
