Spring Boot集成Redis Stream消息队列:从入门到实战

Spring Boot集成Redis Stream消息队列:从入门到实战

在现代分布式系统中,消息队列是实现系统解耦、异步处理和流量削峰的重要组件。Redis Stream作为Redis 5.0引入的新数据类型,提供了完整的消息队列功能,成为轻量级消息中间件的优秀选择。

前言

早期我使用 redis pubsub 的方式实现了消息订阅,但是在使用过程,发现如果部署多实例将会重复处理消息事件,导致业务重复处理的情况。

根本原因是 redis pubsub 都是直接广播,无法控制多实例重复订阅消息的情况。

为了解决这个问题,则需要改动 Redis Stream 的方式来构建消息队列。

以下我分别三个案例逐步实践,达到一个可以适应生产环境要求的成熟方案:

简单使用 StringRedisTemplate 构建一个消费队列使用 StringRedisTemplate 构建多个消费队列使用 RedissonClient 构建多个消费队列,包含消费队列的运维监控、消费队列查询/重置 等功能

一、Redis Stream简介与核心特性

Redis Stream是Redis 5.0版本专门为消息队列场景设计的数据结构,它借鉴了Kafka的设计理念,提供了消息持久化消费者组消息确认机制等核心功能。

Redis Stream核心优势

高性能:基于内存操作,吞吐量高持久化:消息可持久化保存,支持RDB和AOF消费者组:支持多消费者负载均衡,确保消息不被重复消费阻塞操作:支持类似Kafka的长轮询机制

Redis Stream基础操作示例

1. 添加消息到Stream

# 自动生成消息ID
XADD mystream * name "订单创建" order_id "1001" amount "299.99"

# 限制Stream最大长度(保留最新1000条消息)
XADD mystream MAXLEN 1000 * name "订单支付" order_id "1001"
2. 查询消息

# 查询所有消息
XRANGE mystream - +

# 分页查询,每次返回10条
XRANGE mystream - + COUNT 10

# 反向查询(从新到旧)
XREVRANGE mystream + - COUNT 5
3. 监控新消息

# 阻塞监听新消息(0表示不超时)
XREAD BLOCK 0 STREAMS mystream $

二、Spring Boot 简单集成案例

1. 添加依赖


<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.7.18</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-pool2</artifactId>
    </dependency>
</dependencies>

2. 配置文件


# redis 配置
spring:
  redis:
    host: localhost
    port: 6379
    database: 0
    lettuce:
      pool:
        max-active: 8
        max-wait: -1
        max-idle: 8
        min-idle: 0

# redis stream 消费组配置
app:
  stream:
    key: "order_stream"
    group: "order_group"

3. 消息生产者服务


package com.lijw.mp.event;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;

import java.util.Map;
import java.util.UUID;

@Service
@Slf4j
public class MessageProducerService {

    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    @Value("${app.stream.key}")
    private String streamKey;

    /**
     * 发送消息到Redis Stream
     */
    public String sendMessage(String messageType, Map<String, String> data) {
        try {
            // 添加消息类型和时间戳
            data.put("messageType", messageType);
            data.put("timestamp", String.valueOf(System.currentTimeMillis()));
            data.put("messageId", UUID.randomUUID().toString());

            RecordId messageId = stringRedisTemplate.opsForStream()
                    .add(streamKey, data);

            log.info("消息发送成功: {}", messageId);
            return messageId.toString();
        } catch (Exception e) {
            log.error("消息发送失败: {}", e.getMessage());
            throw new RuntimeException("消息发送失败", e);
        }
    }
}

4. 消息消费者服务(支持幂等性)


package com.lijw.mp.event;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.stereotype.Component;

import java.time.Duration;
import java.util.Map;

@Component
@Slf4j
public class MessageConsumerService implements StreamListener<String, MapRecord<String, String, String>> {

    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    @Value("${app.stream.group}")
    private String groupName;

    @Value("${app.stream.key}")
    private String streamKey;

    @Override
    public void onMessage(MapRecord<String, String, String> message) {
        String messageId = message.getId().toString();
        Map<String, String> messageBody = message.getValue();

        // 幂等性检查:防止重复处理[1](@ref)
        if (isMessageProcessed(messageId)) {
            log.info("消息已处理,跳过: {}", messageId);
            acknowledgeMessage(message);
            return;
        }

        try {
            log.info("消费者收到消息 - ID: {}, 内容: {}", messageId, messageBody);

            // 处理业务逻辑
            boolean processSuccess = processBusiness(messageBody);

            if (processSuccess) {
                // 标记消息已处理
                markMessageProcessed(messageId);
                // 清除重试次数
                clearRetryCount(messageId);
                // 手动确认消息
                acknowledgeMessage(message);
                log.info("消息处理完成: {}", messageId);
            } else {
                log.error("业务处理失败,消息将重试: {}", messageId);
                handleRetry(messageId, messageBody, message, "业务处理失败");
            }

        } catch (Exception e) {
            log.error("消息处理异常: {}", messageId, e);
            handleRetry(messageId, messageBody, message, "消息处理异常");
        }
    }

    /**
     * 幂等性检查
     */
    private boolean isMessageProcessed(String messageId) {
        // 使用Redis存储,判断redis是否已存在处理key,如果存在则说明消息已处理,确保多实例间幂等性
        return stringRedisTemplate.opsForValue().get("processed:" + messageId) != null;
    }

    /**
     * 标记消息已处理
     */
    private void markMessageProcessed(String messageId) {
        // 使用Redis存储事件处理ID,设置过期时间
        stringRedisTemplate.opsForValue().set("processed:" + messageId, "1",
                Duration.ofHours(24));
    }

    /**
     * 业务处理逻辑
     */
    private boolean processBusiness(Map<String, String> messageBody) {
        try {
            String messageType = messageBody.get("messageType");
            String orderId = messageBody.get("orderId");

            log.info("处理{}消息,订单ID: {}", messageType, orderId);

            // 模拟业务处理
            Thread.sleep(5000);

            return true;
        } catch (Exception e) {
            log.error("业务处理异常", e);
            return false;
        }
    }

    /**
     * 手动确认消息
     */
    private void acknowledgeMessage(MapRecord<String, String, String> message) {
        try {
            stringRedisTemplate.opsForStream()
                    .acknowledge(groupName, message);
        } catch (Exception e) {
            log.error("消息确认失败: {}", message.getId(), e);
        }
    }

    /**
     * 增加重试次数
     */
    private int incrementRetryCount(String messageId) {
        String retryKey = "retry:count:" + messageId;
        String countStr = stringRedisTemplate.opsForValue().get(retryKey);
        int retryCount = countStr == null ? 0 : Integer.parseInt(countStr);
        retryCount++;
        // 设置重试次数,过期时间为24小时
        stringRedisTemplate.opsForValue().set(retryKey, String.valueOf(retryCount), Duration.ofHours(24));
        return retryCount;
    }

    /**
     * 清除重试次数
     */
    private void clearRetryCount(String messageId) {
        String retryKey = "retry:count:" + messageId;
        stringRedisTemplate.delete(retryKey);
    }

    /**
     * 处理消息重试逻辑
     */
    private void handleRetry(String messageId, Map<String, String> messageBody, 
                             MapRecord<String, String, String> message, String errorDescription) {
        // 记录重试次数
        int retryCount = incrementRetryCount(messageId);
        // 检查是否超过最大重试次数
        int maxRetryCount = 3; // 最大重试次数,可根据业务需求调整
        if (retryCount >= maxRetryCount) {
            log.error("消息{}重试次数已达上限({}),将停止重试并记录: {}", errorDescription, maxRetryCount, messageId);
            // 记录失败消息到死信队列或告警(可根据业务需求实现)
            handleMaxRetryExceeded(messageId, messageBody, retryCount);
            // 确认消息,避免无限重试
            acknowledgeMessage(message);
        } else {
            log.warn("消息{},当前重试次数: {}/{}, 消息ID: {}", errorDescription, retryCount, maxRetryCount, messageId);
            // 不确认消息,等待重试
        }
    }

    /**
     * 处理超过最大重试次数的消息
     */
    private void handleMaxRetryExceeded(String messageId, Map<String, String> messageBody, int retryCount) {
        try {
            // 记录失败消息详情(可根据业务需求实现,如存储到数据库、发送告警等)
            String failedKey = "failed:message:" + messageId;
            String failedInfo = String.format("消息ID: %s, 重试次数: %d, 消息内容: %s, 失败时间: %s",
                    messageId, retryCount, messageBody, System.currentTimeMillis());
            stringRedisTemplate.opsForValue().set(failedKey, failedInfo, Duration.ofDays(7));
            log.error("失败消息已记录: {}", failedInfo);
            // TODO: 可根据业务需求添加其他处理逻辑,如:
            // 1. 发送告警通知
            // 2. 存储到数据库死信表
            // 3. 发送到死信队列
        } catch (Exception e) {
            log.error("处理超过最大重试次数消息异常: {}", messageId, e);
        }
    }
}

5. 消费者容器配置(支持多实例负载均衡)


package com.lijw.mp.config.redisstream;

import com.lijw.mp.event.MessageConsumerService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.EventListener;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.net.InetAddress;
import java.time.Duration;
import java.lang.management.ManagementFactory;
import java.util.UUID;
import java.util.concurrent.ExecutorService;

@Configuration
@Slf4j
public class RedisStreamConfig {

    @Autowired
    private RedisConnectionFactory redisConnectionFactory;

    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    @Value("${app.stream.key}")
    private String streamKey;

    @Value("${app.stream.group}")
    private String groupName;

    /**
     * 创建消费者组(如果不存在)
     */
    @EventListener(ApplicationReadyEvent.class)
    public void createConsumerGroup() {
        try {
            stringRedisTemplate.opsForStream()
                    .createGroup(streamKey, groupName);
            log.info("创建消费者组成功: {}", groupName);
        } catch (Exception e) {
            log.info("消费者组已存在: {}", e.getMessage());
        }
    }

    /**
     * 配置Stream消息监听容器
     */
    @Bean
    public StreamMessageListenerContainer<String, MapRecord<String, String, String>>
    streamMessageListenerContainer(MessageConsumerService messageConsumerService) {

        // 容器配置
        StreamMessageListenerContainer<String, MapRecord<String, String, String>> container =
                StreamMessageListenerContainer.create(redisConnectionFactory,
                        StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()
                                .pollTimeout(Duration.ofSeconds(2))
                                .batchSize(10) // 批量处理提高性能
                                .executor(createThreadPool()) // 自定义线程池
                                .build());

        // 为每个实例生成唯一消费者名称
        String consumerName = generateUniqueConsumerName();

        // 配置消费偏移量
        StreamOffset<String> offset = StreamOffset.create(streamKey, ReadOffset.lastConsumed());

        // 创建消费者
        Consumer consumer = Consumer.from(groupName, consumerName);

        // 构建读取请求(手动确认模式)
        StreamMessageListenerContainer.StreamReadRequest<String> request =
                StreamMessageListenerContainer.StreamReadRequest.builder(offset)
                        .consumer(consumer)
                        .autoAcknowledge(false) // 手动确认确保可靠性[2](@ref)
                        .cancelOnError(e -> false) // 错误时不停止消费
                        .build();

        container.register(request, messageConsumerService);
        container.start();

        log.info("Redis Stream消费者启动成功 - 消费者名称: {}", consumerName);
        return container;
    }

    /**
     * 生成唯一消费者名称(支持多实例部署的关键)
     * 使用IP+进程ID确保集群环境下唯一性
     */
    private String generateUniqueConsumerName() {
        try {
            String hostAddress = InetAddress.getLocalHost().getHostAddress();
            String processId = ManagementFactory.getRuntimeMXBean().getName().split("@")[0];
            return hostAddress + "_" + processId + "_" + System.currentTimeMillis();
        } catch (Exception e) {
            //  fallback:使用UUID
            return "consumer_" + UUID.randomUUID().toString().substring(0, 8);
        }
    }

    /**
     * 创建专用线程池
     */
    private ExecutorService createThreadPool() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(2);
        executor.setMaxPoolSize(5);
        executor.setQueueCapacity(100);
        executor.setThreadNamePrefix("redis-stream-");
        executor.setDaemon(true);
        executor.initialize();
        return executor.getThreadPoolExecutor();
    }
}

6. 待处理消息重试机制


package com.lijw.mp.config.redisstream;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.connection.stream.PendingMessage;
import org.springframework.data.redis.connection.stream.PendingMessages;
import org.springframework.data.redis.connection.stream.PendingMessagesSummary;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.time.Duration;

@Component
@Slf4j
public class PendingMessageRetryService {

    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    @Value("${app.stream.key}")
    private String streamKey;

    @Value("${app.stream.group}")
    private String groupName;

    /**
     * 定时处理未确认的消息
     * 执行时机:每30秒执行一次(通过@Scheduled注解配置)
     */
    @Scheduled(fixedDelay = 30000) // 每30秒执行一次
    public void retryPendingMessages() {
        try {
            // 获取待处理消息摘要
            PendingMessagesSummary pendingSummary = stringRedisTemplate.opsForStream()
                    .pending(streamKey, groupName);

            if (pendingSummary != null) {
                log.info("检查待处理消息,消费者组: {}", groupName);
                
                // TODO: 根据Spring Data Redis版本,使用正确的API获取详细的pending消息列表
                // 例如:使用pendingRange方法或其他方法获取PendingMessages
                // 获取到PendingMessages后,调用processPendingMessages方法进行处理
                // 
                // 示例调用(需要根据实际API调整):
                // PendingMessages pendingMessages = stringRedisTemplate.opsForStream()
                //         .pendingRange(streamKey, groupName, ...);
                // if (pendingMessages != null && pendingMessages.size() > 0) {
                //     processPendingMessages(pendingMessages);
                // }
            }
        } catch (Exception e) {
            log.error("处理待处理消息异常", e);
        }
    }

    /**
     * 处理待处理消息列表
     * 
     * 执行时机:
     * 1. 由 retryPendingMessages() 定时任务调用(每30秒执行一次)
     * 2. 当 retryPendingMessages() 获取到 PendingMessages 列表后调用
     * 3. 用于处理Redis Stream中未被确认(ACK)的消息
     * 
     * 处理逻辑:
     * - 遍历每条pending消息
     * - 记录消息ID和消费者名称
     * - 检查消息空闲时间,如果超过阈值则重新分配
     */
    private void processPendingMessages(PendingMessages pendingMessages) {
        pendingMessages.forEach(message -> {
            String messageId = message.getId().toString();
            String consumerName = message.getConsumerName();
            
            // 注意:根据Spring Data Redis版本,PendingMessage的API可能不同
            // 获取空闲时间的方法名可能是getIdleTimeMs()、getElapsedTimeMs()等
            // 这里提供基础框架,需要根据实际API调整
            
            log.info("处理待处理消息: {}, 消费者: {}", messageId, consumerName);

            // 如果消息空闲时间超过阈值(如5分钟),重新分配
            // 示例逻辑(需要根据实际API调整):
            // Duration idleTime = Duration.ofMillis(message.getIdleTimeMs());
            // if (idleTime.toMinutes() > 5) {
            //     log.info("重新分配超时消息: {}, 原消费者: {}, 空闲时间: {}分钟",
            //             messageId, consumerName, idleTime.toMinutes());
            //     // 可以使用XCLAIM命令将消息重新分配给其他消费者
            //     // stringRedisTemplate.opsForStream().claim(...);
            // }
        });
    }
}

7. REST控制器


package com.lijw.mp.controller;

import com.lijw.mp.event.MessageProducerService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;

import java.util.HashMap;
import java.util.Map;

@RestController
@RequestMapping("/api/message")
@Slf4j
public class MessageController {

    @Autowired
    private MessageProducerService messageProducerService;

    @PostMapping("/send-order")
    public ResponseEntity<Map<String, Object>> sendOrderMessage(
            @RequestParam String orderId,
            @RequestParam String amount) {

        Map<String, String> message = new HashMap<>();
        message.put("orderId", orderId);
        message.put("amount", amount);
        message.put("messageType", "ORDER_CREATED");

        try {
            String messageId = messageProducerService.sendMessage("ORDER", message);

            Map<String, Object> result = new HashMap<>();
            result.put("success", true);
            result.put("messageId", messageId);
            result.put("timestamp", System.currentTimeMillis());

            return ResponseEntity.ok(result);
        } catch (Exception e) {
            log.error("发送消息失败", e);

            Map<String, Object> result = new HashMap<>();
            result.put("success", false);
            result.put("error", e.getMessage());

            return ResponseEntity.status(500).body(result);
        }
    }

    @PostMapping("/send-custom")
    public ResponseEntity<Map<String, Object>> sendCustomMessage(
            @RequestBody Map<String, String> messageData) {

        try {
            String messageId = messageProducerService.sendMessage("CUSTOM", messageData);

            Map<String, Object> result = new HashMap<>();
            result.put("success", true);
            result.put("messageId", messageId);

            return ResponseEntity.ok(result);
        } catch (Exception e) {
            log.error("发送自定义消息失败", e);

            Map<String, Object> result = new HashMap<>();
            result.put("success", false);
            result.put("error", e.getMessage());

            return ResponseEntity.status(500).body(result);
        }
    }
}
7.1 测试发送订单消息

Spring Boot集成Redis Stream消息队列:从入门到实战


POST http://localhost:8083/api/message/send-order

orderId=order123456
amount=500
7.2 测试发送自定义消息

Spring Boot集成Redis Stream消息队列:从入门到实战


POST http://localhost:8083/api/message/send-custom

{
    "key1": "value1",
    "key2": "valuie2"
}

8.启动多个SpringBoot实例,测试实例是否会重复处理事件

8.1 通过修改端口号,启动多个实例

实例1


server:
  port: 8083

实例2


server:
  port: 8084
8.2 发送自定义事件消息

Spring Boot集成Redis Stream消息队列:从入门到实战
发送多条事件消息

8.3 查看实例日志,确认事件未被重复消费

实例1
Spring Boot集成Redis Stream消息队列:从入门到实战

实例2
Spring Boot集成Redis Stream消息队列:从入门到实战
可以看到 1764271408044-0 只在实例2处理,并没有在实例1处理。说明多个实例并不会重复消费同一事件。

三、SpringBoot 使用 StringRedisTemplate 集成 Redis Stream 进阶:配置多个消费组

1.配置文件

配置多个消费组key


# redis 配置
spring:
  redis:
    host: localhost
    port: 6379
    database: 0
    lettuce:
      pool:
        max-active: 8
        max-wait: -1
        max-idle: 8
        min-idle: 0

# redis stream 配置多个消费组key
app:
  stream:
    groups:
      - key: "order_stream"
        group: "order_group"
        consumer-prefix: "consumer_"
      - key: "payment_stream"
        group: "payment_group"
        consumer-prefix: "consumer_"
      - key: "notification_stream"
        group: "notification_group"
        consumer-prefix: "consumer_"

2.Redis Stream消费组配置属性


package com.lijw.mp.config.redisstream;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

import java.util.List;

/**
 * Redis Stream消费组配置属性
 * - 读取 app.stream 配置前缀下的所有配置项
 *
 * @author Aron.li
 * @date 2025/11/30 11:21
 */
@Data
@Component
@ConfigurationProperties(prefix = "app.stream")
public class StreamGroupProperties {

    /**
     * 消费组列表
     */
    private List<StreamGroupConfig> groups;

    /**
     * 单个消费组配置
     */
    @Data
    public static class StreamGroupConfig {
        /**
         * Stream键名
         */
        private String key;
        
        /**
         * 消费组名称
         */
        private String group;
        
        /**
         * 消费者名称前缀
         */
        private String consumerPrefix;
    }
}

3. 消息生产者服务


package com.lijw.mp.event;

import com.lijw.mp.config.redisstream.StreamGroupProperties;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;

import java.util.Map;
import java.util.UUID;

/**
 * Redis Stream消息生产者服务
 * 负责向Redis Stream发送消息
 */
@Service
@Slf4j
public class MessageProducerService {

    /**
     * Redis字符串模板
     */
    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    /**
     * Stream消费组配置属性
     */
    @Autowired
    private StreamGroupProperties streamGroupProperties;

    /**
     * 发送消息到Redis Stream(根据消息类型自动选择stream)
     * 
     * @param messageType 消息类型
     * @param data 消息数据
     * @return 消息ID
     */
    public String sendMessage(String messageType, Map<String, String> data) {
        // 根据消息类型自动选择stream key
        String streamKey = getStreamKeyByMessageType(messageType);
        return sendMessage(streamKey, messageType, data);
    }

    /**
     * 发送消息到指定的Redis Stream
     * 
     * @param streamKey Stream键名
     * @param messageType 消息类型
     * @param data 消息数据
     * @return 消息ID
     */
    public String sendMessage(String streamKey, String messageType, Map<String, String> data) {
        try {
            // 添加消息类型和时间戳
            data.put("messageType", messageType);
            data.put("timestamp", String.valueOf(System.currentTimeMillis()));
            data.put("messageId", UUID.randomUUID().toString());

            RecordId messageId = stringRedisTemplate.opsForStream()
                    .add(streamKey, data);

            log.info("消息发送成功 - Stream: {}, MessageId: {}", streamKey, messageId);
            return messageId.toString();
        } catch (Exception e) {
            log.error("消息发送失败 - Stream: {}", streamKey, e);
            throw new RuntimeException("消息发送失败", e);
        }
    }

    /**
     * 根据消息类型获取对应的stream key
     * 
     * @param messageType 消息类型
     * @return Stream键名
     */
    private String getStreamKeyByMessageType(String messageType) {
        if (streamGroupProperties.getGroups() == null || streamGroupProperties.getGroups().isEmpty()) {
            throw new RuntimeException("未配置任何消费组");
        }

        // 根据消息类型前缀匹配stream key
        String upperMessageType = messageType.toUpperCase();
        if (upperMessageType.contains("ORDER")) {
            return findStreamKey("order");
        } else if (upperMessageType.contains("PAYMENT") || upperMessageType.contains("PAY")) {
            return findStreamKey("payment");
        } else if (upperMessageType.contains("NOTIFICATION") || upperMessageType.contains("NOTIFY")) {
            return findStreamKey("notification");
        } else {
            // 默认使用第一个stream
            log.warn("未匹配到消息类型对应的stream,使用默认stream: {}", messageType);
            return streamGroupProperties.getGroups().get(0).getKey();
        }
    }

    /**
     * 根据关键字查找stream key
     * 
     * @param keyword 关键字
     * @return Stream键名
     */
    private String findStreamKey(String keyword) {
        if (streamGroupProperties.getGroups() != null) {
            for (StreamGroupProperties.StreamGroupConfig groupConfig : streamGroupProperties.getGroups()) {
                if (groupConfig.getKey().toLowerCase().contains(keyword.toLowerCase())) {
                    return groupConfig.getKey();
                }
            }
        }
        // 如果找不到,返回第一个stream
        if (streamGroupProperties.getGroups() != null && !streamGroupProperties.getGroups().isEmpty()) {
            return streamGroupProperties.getGroups().get(0).getKey();
        }
        throw new RuntimeException("未找到匹配的stream key: " + keyword);
    }
}

4. 消息消费者服务(支持幂等性)


package com.lijw.mp.event;

import com.lijw.mp.config.redisstream.StreamGroupProperties;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.stereotype.Component;

import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Redis Stream消息消费者服务
 * 负责消费Redis Stream中的消息并进行业务处理
 */
@Component
@Slf4j
public class MessageConsumerService implements StreamListener<String, MapRecord<String, String, String>> {

    /**
     * Redis字符串模板
     */
    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    /**
     * Stream消费组配置属性
     */
    @Autowired
    private StreamGroupProperties streamGroupProperties;

    /**
     * 缓存stream key到group name的映射关系
     */
    private final Map<String, String> streamToGroupMap = new ConcurrentHashMap<>();

    /**
     * 消息消费处理方法
     * 当Redis Stream中有新消息时,会调用此方法
     * 
     * @param message Stream消息记录
     */
    @Override
    public void onMessage(MapRecord<String, String, String> message) {
        String messageId = message.getId().toString();
        String streamKey = message.getStream();
        Map<String, String> messageBody = message.getValue();

        // 获取对应的消费组名称
        String groupName = getGroupNameByStreamKey(streamKey);
        if (groupName == null) {
            log.error("未找到stream key对应的消费组: {}, 消息ID: {}", streamKey, messageId);
            return;
        }

        // 幂等性检查:防止重复处理
        if (isMessageProcessed(messageId)) {
            log.info("消息已处理,跳过: {}", messageId);
            acknowledgeMessage(message, groupName);
            return;
        }

        try {
            log.info("消费者收到消息 - Stream: {}, Group: {}, ID: {}, 内容: {}", streamKey, groupName, messageId, messageBody);

            // 处理业务逻辑(根据stream key路由到不同的业务处理)
            boolean processSuccess = processBusiness(streamKey, messageBody);

            if (processSuccess) {
                // 标记消息已处理
                markMessageProcessed(messageId);
                // 清除重试次数
                clearRetryCount(messageId);
                // 手动确认消息
                acknowledgeMessage(message, groupName);
                log.info("消息处理完成: {}", messageId);
            } else {
                log.error("业务处理失败,消息将重试: {}", messageId);
                handleRetry(messageId, messageBody, message, groupName, "业务处理失败");
            }

        } catch (Exception e) {
            log.error("消息处理异常: {}", messageId, e);
            handleRetry(messageId, messageBody, message, groupName, "消息处理异常");
        }
    }

    /**
     * 根据stream key获取对应的消费组名称
     * 
     * @param streamKey Stream键名
     * @return 消费组名称,如果未找到返回null
     */
    private String getGroupNameByStreamKey(String streamKey) {
        // 先从缓存中获取
        if (streamToGroupMap.containsKey(streamKey)) {
            return streamToGroupMap.get(streamKey);
        }
        
        // 从配置中查找
        if (streamGroupProperties.getGroups() != null) {
            for (StreamGroupProperties.StreamGroupConfig groupConfig : streamGroupProperties.getGroups()) {
                if (groupConfig.getKey().equals(streamKey)) {
                    streamToGroupMap.put(streamKey, groupConfig.getGroup());
                    return groupConfig.getGroup();
                }
            }
        }
        
        return null;
    }

    /**
     * 幂等性检查
     * 使用Redis存储,判断redis是否已存在处理key,如果存在则说明消息已处理,确保多实例间幂等性
     * 
     * @param messageId 消息ID
     * @return 如果消息已处理返回true,否则返回false
     */
    private boolean isMessageProcessed(String messageId) {
        // 使用Redis存储,判断redis是否已存在处理key,如果存在则说明消息已处理,确保多实例间幂等性
        return stringRedisTemplate.opsForValue().get("processed:" + messageId) != null;
    }

    /**
     * 标记消息已处理
     * 使用Redis存储事件处理ID,设置过期时间
     * 
     * @param messageId 消息ID
     */
    private void markMessageProcessed(String messageId) {
        // 使用Redis存储事件处理ID,设置过期时间
        stringRedisTemplate.opsForValue().set("processed:" + messageId, "1",
                Duration.ofHours(24));
    }

    /**
     * 业务处理逻辑(根据stream key路由到不同的业务处理)
     */
    private boolean processBusiness(String streamKey, Map<String, String> messageBody) {
        try {
            // 根据stream key路由到不同的业务处理
            if (streamKey.contains("order")) {
                return processOrderBusiness(messageBody);
            } else if (streamKey.contains("payment")) {
                return processPaymentBusiness(messageBody);
            } else if (streamKey.contains("notification")) {
                return processNotificationBusiness(messageBody);
            } else {
                log.warn("未识别的stream key: {}, 使用默认处理逻辑", streamKey);
                return processDefaultBusiness(messageBody);
            }
        } catch (Exception e) {
            log.error("业务处理异常", e);
            return false;
        }
    }

    /**
     * 处理订单业务
     */
    private boolean processOrderBusiness(Map<String, String> messageBody) {
        String messageType = messageBody.get("messageType");
        String orderId = messageBody.get("orderId");
        log.info("处理订单{}消息,订单ID: {}", messageType, orderId);
        // TODO: 实现订单业务处理逻辑
        try {
            // 模拟10秒处理业务
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return true;
    }

    /**
     * 处理支付业务
     */
    private boolean processPaymentBusiness(Map<String, String> messageBody) {
        String messageType = messageBody.get("messageType");
        String paymentId = messageBody.get("paymentId");
        log.info("处理支付{}消息,支付ID: {}", messageType, paymentId);
        // TODO: 实现支付业务处理逻辑
        return true;
    }

    /**
     * 处理通知业务
     */
    private boolean processNotificationBusiness(Map<String, String> messageBody) {
        String messageType = messageBody.get("messageType");
        String userId = messageBody.get("userId");
        log.info("处理通知{}消息,用户ID: {}", messageType, userId);
        // TODO: 实现通知业务处理逻辑
        return true;
    }

    /**
     * 默认业务处理
     */
    private boolean processDefaultBusiness(Map<String, String> messageBody) {
        String messageType = messageBody.get("messageType");
        log.info("处理默认业务消息: {}", messageType);
        // TODO: 实现默认业务处理逻辑
        return true;
    }

    /**
     * 手动确认消息
     * 
     * @param message 消息记录
     * @param groupName 消费组名称
     */
    private void acknowledgeMessage(MapRecord<String, String, String> message, String groupName) {
        try {
            stringRedisTemplate.opsForStream()
                    .acknowledge(groupName, message);
        } catch (Exception e) {
            log.error("消息确认失败: {}", message.getId(), e);
        }
    }

    /**
     * 增加重试次数
     * 
     * @param messageId 消息ID
     * @return 当前重试次数
     */
    private int incrementRetryCount(String messageId) {
        String retryKey = "retry:count:" + messageId;
        String countStr = stringRedisTemplate.opsForValue().get(retryKey);
        int retryCount = countStr == null ? 0 : Integer.parseInt(countStr);
        retryCount++;
        // 设置重试次数,过期时间为24小时
        stringRedisTemplate.opsForValue().set(retryKey, String.valueOf(retryCount), Duration.ofHours(24));
        return retryCount;
    }

    /**
     * 清除重试次数
     * 
     * @param messageId 消息ID
     */
    private void clearRetryCount(String messageId) {
        String retryKey = "retry:count:" + messageId;
        stringRedisTemplate.delete(retryKey);
    }

    /**
     * 处理消息重试逻辑
     * 
     * @param messageId 消息ID
     * @param messageBody 消息内容
     * @param message 消息记录
     * @param groupName 消费组名称
     * @param errorDescription 错误描述
     */
    private void handleRetry(String messageId, Map<String, String> messageBody, 
                             MapRecord<String, String, String> message, String groupName, String errorDescription) {
        // 记录重试次数
        int retryCount = incrementRetryCount(messageId);
        // 检查是否超过最大重试次数
        int maxRetryCount = 3; // 最大重试次数,可根据业务需求调整
        if (retryCount >= maxRetryCount) {
            log.error("消息{}重试次数已达上限({}),将停止重试并记录: {}", errorDescription, maxRetryCount, messageId);
            // 记录失败消息到死信队列或告警(可根据业务需求实现)
            handleMaxRetryExceeded(messageId, messageBody, retryCount);
            // 确认消息,避免无限重试
            acknowledgeMessage(message, groupName);
        } else {
            log.warn("消息{},当前重试次数: {}/{}, 消息ID: {}", errorDescription, retryCount, maxRetryCount, messageId);
            // 不确认消息,等待重试
        }
    }

    /**
     * 处理超过最大重试次数的消息
     * 记录失败消息详情,可根据业务需求扩展(如存储到数据库、发送告警等)
     * 
     * @param messageId 消息ID
     * @param messageBody 消息内容
     * @param retryCount 重试次数
     */
    private void handleMaxRetryExceeded(String messageId, Map<String, String> messageBody, int retryCount) {
        try {
            // 记录失败消息详情(可根据业务需求实现,如存储到数据库、发送告警等)
            String failedKey = "failed:message:" + messageId;
            String failedInfo = String.format("消息ID: %s, 重试次数: %d, 消息内容: %s, 失败时间: %s",
                    messageId, retryCount, messageBody, System.currentTimeMillis());
            stringRedisTemplate.opsForValue().set(failedKey, failedInfo, Duration.ofDays(7));
            log.error("失败消息已记录: {}", failedInfo);
            // TODO: 可根据业务需求添加其他处理逻辑,如:
            // 1. 发送告警通知
            // 2. 存储到数据库死信表
            // 3. 发送到死信队列
        } catch (Exception e) {
            log.error("处理超过最大重试次数消息异常: {}", messageId, e);
        }
    }
}

5.消费者容器配置(支持多实例负载均衡)


package com.lijw.mp.config.redisstream;

import com.lijw.mp.event.MessageConsumerService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.EventListener;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.net.InetAddress;
import java.time.Duration;
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutorService;

/**
 * Redis Stream配置类
 * 负责创建消费组、配置消息监听容器等
 */
@Configuration
@Slf4j
public class RedisStreamConfig {

    /**
     * Redis连接工厂
     */
    @Autowired
    private RedisConnectionFactory redisConnectionFactory;

    /**
     * Redis字符串模板
     */
    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    /**
     * Stream消费组配置属性
     */
    @Autowired
    private StreamGroupProperties streamGroupProperties;

    /**
     * 应用启动完成后,验证所有消费组是否已创建
     * 主要用于日志记录和验证
     */
    @EventListener(ApplicationReadyEvent.class)
    public void verifyConsumerGroups() {
        if (streamGroupProperties.getGroups() == null || streamGroupProperties.getGroups().isEmpty()) {
            log.warn("未配置任何消费组");
            return;
        }
        
        log.info("验证所有消费组状态...");
        for (StreamGroupProperties.StreamGroupConfig groupConfig : streamGroupProperties.getGroups()) {
            try {
                // 验证消费组是否存在
                stringRedisTemplate.opsForStream().pending(groupConfig.getKey(), groupConfig.getGroup());
                log.info("消费组验证成功: stream={}, group={}", groupConfig.getKey(), groupConfig.getGroup());
            } catch (Exception e) {
                log.warn("消费组验证失败,尝试重新创建: stream={}, group={}", 
                        groupConfig.getKey(), groupConfig.getGroup());
                // 尝试重新创建
                try {
                    ensureStreamExists(groupConfig.getKey());
                    stringRedisTemplate.opsForStream()
                            .createGroup(groupConfig.getKey(), groupConfig.getGroup());
                    log.info("重新创建消费者组成功: stream={}, group={}", 
                            groupConfig.getKey(), groupConfig.getGroup());
                } catch (Exception ex) {
                    log.error("重新创建消费者组失败: stream={}, group={}, error={}", 
                            groupConfig.getKey(), groupConfig.getGroup(), ex.getMessage());
                }
            }
        }
    }

    /**
     * 确保stream存在,如果不存在则创建
     * 
     * @param streamKey Stream键名
     */
    private void ensureStreamExists(String streamKey) {
        try {
            // 检查stream是否存在
            Long size = stringRedisTemplate.opsForStream().size(streamKey);
            if (size == null || size == 0) {
                // Stream不存在,创建一个空消息然后立即删除,以创建stream
                // 或者直接使用XGROUP CREATE的MKSTREAM选项
                // 由于Spring Data Redis可能不支持MKSTREAM,我们通过添加一条临时消息来创建stream
                Map<String, String> tempData = new HashMap<>();
                tempData.put("_init", "true");
                tempData.put("_timestamp", String.valueOf(System.currentTimeMillis()));
                
                try {
                    stringRedisTemplate.opsForStream().add(streamKey, tempData);
                    log.debug("创建stream成功: {}", streamKey);
                } catch (Exception ex) {
                    log.debug("Stream可能已存在或创建失败: {}, {}", streamKey, ex.getMessage());
                }
            }
        } catch (Exception e) {
            // Stream不存在,创建它
            try {
                Map<String, String> tempData = new HashMap<>();
                tempData.put("_init", "true");
                tempData.put("_timestamp", String.valueOf(System.currentTimeMillis()));
                stringRedisTemplate.opsForStream().add(streamKey, tempData);
                log.info("创建stream成功: {}", streamKey);
            } catch (Exception ex) {
                log.warn("创建stream失败: {}, {}", streamKey, ex.getMessage());
            }
        }
    }

    /**
     * 为每个消费组配置Stream消息监听容器
     * 在创建监听容器之前,先确保所有消费组都已创建
     */
    @Bean
    public List<StreamMessageListenerContainer<String, MapRecord<String, String, String>>>
    streamMessageListenerContainers(MessageConsumerService messageConsumerService) {
        
        List<StreamMessageListenerContainer<String, MapRecord<String, String, String>>> containers = new ArrayList<>();
        
        if (streamGroupProperties.getGroups() == null || streamGroupProperties.getGroups().isEmpty()) {
            log.warn("未配置任何消费组,无法创建监听容器");
            return containers;
        }

        // 在创建监听容器之前,先确保所有消费组都已创建
        createAllConsumerGroups();

        for (StreamGroupProperties.StreamGroupConfig groupConfig : streamGroupProperties.getGroups()) {
            StreamMessageListenerContainer<String, MapRecord<String, String, String>> container = 
                    createListenerContainer(groupConfig, messageConsumerService);
            containers.add(container);
        }

        return containers;
    }

    /**
     * 创建所有消费者组(如果不存在)
     * 如果stream不存在,会先创建stream
     */
    private void createAllConsumerGroups() {
        if (streamGroupProperties.getGroups() == null || streamGroupProperties.getGroups().isEmpty()) {
            log.warn("未配置任何消费组");
            return;
        }
        
        for (StreamGroupProperties.StreamGroupConfig groupConfig : streamGroupProperties.getGroups()) {
            try {
                // 先检查stream是否存在,如果不存在则创建
                ensureStreamExists(groupConfig.getKey());
                
                // 创建消费组
                stringRedisTemplate.opsForStream()
                        .createGroup(groupConfig.getKey(), groupConfig.getGroup());
                log.info("创建消费者组成功: stream={}, group={}", groupConfig.getKey(), groupConfig.getGroup());
            } catch (Exception e) {
                // 如果消费组已存在,这是正常情况
                String errorMsg = e.getMessage() != null ? e.getMessage() : "";
                if (errorMsg.contains("BUSYGROUP") || errorMsg.contains("already exists")) {
                    log.info("消费者组已存在: stream={}, group={}", groupConfig.getKey(), groupConfig.getGroup());
                } else {
                    log.warn("创建消费者组失败: stream={}, group={}, error={}", 
                            groupConfig.getKey(), groupConfig.getGroup(), errorMsg);
                    // 即使创建失败,也继续处理其他消费组
                }
            }
        }
    }

    /**
     * 为单个消费组创建监听容器
     * 
     * @param groupConfig 消费组配置
     * @param messageConsumerService 消息消费者服务
     * @return Stream消息监听容器
     */
    private StreamMessageListenerContainer<String, MapRecord<String, String, String>>
    createListenerContainer(StreamGroupProperties.StreamGroupConfig groupConfig, 
                           MessageConsumerService messageConsumerService) {
        
        // 容器配置
        StreamMessageListenerContainer<String, MapRecord<String, String, String>> container =
                StreamMessageListenerContainer.create(redisConnectionFactory,
                        StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()
                                .pollTimeout(Duration.ofSeconds(2)) // 轮询超时时间
                                .batchSize(10) // 批量处理提高性能
                                .executor(createThreadPool()) // 自定义线程池
                                .build());

        // 为每个实例生成唯一消费者名称
        String consumerName = generateUniqueConsumerName(groupConfig.getConsumerPrefix());

        // 配置消费偏移量(从最后消费的位置开始)
        StreamOffset<String> offset = StreamOffset.create(groupConfig.getKey(), ReadOffset.lastConsumed());

        // 创建消费者
        Consumer consumer = Consumer.from(groupConfig.getGroup(), consumerName);

        // 构建读取请求(手动确认模式)
        StreamMessageListenerContainer.StreamReadRequest<String> request =
                StreamMessageListenerContainer.StreamReadRequest.builder(offset)
                        .consumer(consumer)
                        .autoAcknowledge(false) // 手动确认确保可靠性
                        .cancelOnError(e -> false) // 错误时不停止消费
                        .build();

        // 注册监听器并启动容器
        container.register(request, messageConsumerService);
        container.start();

        log.info("Redis Stream消费者启动成功 - Stream: {}, Group: {}, Consumer: {}", 
                groupConfig.getKey(), groupConfig.getGroup(), consumerName);
        return container;
    }

    /**
     * 生成唯一消费者名称(支持多实例部署的关键)
     * 使用IP+进程ID确保集群环境下唯一性
     * 
     * @param prefix 消费者名称前缀
     * @return 唯一的消费者名称
     */
    private String generateUniqueConsumerName(String prefix) {
        try {
            String hostAddress = InetAddress.getLocalHost().getHostAddress();
            String processId = ManagementFactory.getRuntimeMXBean().getName().split("@")[0];
            return (prefix != null ? prefix : "consumer_") + hostAddress + "_" + processId + "_" + System.currentTimeMillis();
        } catch (Exception e) {
            //  fallback:使用UUID
            return (prefix != null ? prefix : "consumer_") + UUID.randomUUID().toString().substring(0, 8);
        }
    }

    /**
     * 创建专用线程池
     * 用于处理Redis Stream消息消费
     * 
     * @return 线程池执行器
     */
    private ExecutorService createThreadPool() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(2); // 核心线程数
        executor.setMaxPoolSize(5); // 最大线程数
        executor.setQueueCapacity(100); // 队列容量
        executor.setThreadNamePrefix("redis-stream-"); // 线程名前缀
        executor.setDaemon(true); // 守护线程
        executor.initialize();
        return executor.getThreadPoolExecutor();
    }
}

6.处理消息告警机制

当消息堵塞达到一定,则出发告警,提醒需要扩容实例服务


package com.lijw.mp.config.redisstream;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.stream.PendingMessagesSummary;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

/**
 * 待处理消息告警服务
 * 定时检查Redis Stream中未被确认(ACK)的消息,仅用于监控和告警
 * 注意:不进行实际的消息处理,通过增加消费实例来提升处理能力
 */
@Component
@Slf4j
public class PendingMessageAlertService {

    /**
     * Redis字符串模板
     */
    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    /**
     * Stream消费组配置属性
     */
    @Autowired
    private StreamGroupProperties streamGroupProperties;


    /**
     * 定时监控未确认的消息
     * 执行时机:每30秒执行一次(通过@Scheduled注解配置)
     * 仅用于监控和告警,不进行实际的消息处理
     * 建议通过增加消费实例来提升处理能力
     */
    @Scheduled(fixedDelay = 30000) // 每30秒执行一次
    public void monitorPendingMessages() {
        if (streamGroupProperties.getGroups() == null || streamGroupProperties.getGroups().isEmpty()) {
            return;
        }
        
        // 遍历所有配置的消费组
        for (StreamGroupProperties.StreamGroupConfig groupConfig : streamGroupProperties.getGroups()) {
            try {
                // 获取待处理消息摘要
                PendingMessagesSummary pendingSummary = stringRedisTemplate.opsForStream()
                        .pending(groupConfig.getKey(), groupConfig.getGroup());

                if (pendingSummary != null && pendingSummary.getTotalPendingMessages() > 0) {
                    // 记录监控信息
                    logPendingMessageInfo(pendingSummary, groupConfig);
                }
            } catch (Exception e) {
                log.error("监控待处理消息异常 - Stream: {}, Group: {}", 
                        groupConfig.getKey(), groupConfig.getGroup(), e);
            }
        }
    }

    /**
     * 记录pending消息监控信息
     * 根据pending消息数量进行不同级别的告警
     * 
     * @param pendingSummary 待处理消息摘要
     * @param groupConfig 消费组配置
     */
    private void logPendingMessageInfo(PendingMessagesSummary pendingSummary, 
                                      StreamGroupProperties.StreamGroupConfig groupConfig) {
        long totalPending = pendingSummary.getTotalPendingMessages();
        
        // 根据pending消息数量进行分级告警
        if (totalPending > 2) {
            // 严重告警:pending消息数量超过1000
            log.error("【严重告警】待处理消息数量过多 - Stream: {}, Group: {}, 数量: {}, " +
                    "建议:1.检查消费者是否正常运行 2.增加消费实例 3.检查消息处理逻辑", 
                    groupConfig.getKey(), groupConfig.getGroup(), totalPending);
        } else if (totalPending > 1) {
            // 警告:pending消息数量超过100
            log.warn("【警告】待处理消息数量较多 - Stream: {}, Group: {}, 数量: {}, " +
                    "建议:考虑增加消费实例或检查消息处理性能", 
                    groupConfig.getKey(), groupConfig.getGroup(), totalPending);
        } else {
            // 信息:pending消息数量正常
            log.info("监控待处理消息 - Stream: {}, Group: {}, 数量: {}", 
                    groupConfig.getKey(), groupConfig.getGroup(), totalPending);
        }
    }

}

7.REST控制器


package com.lijw.mp.controller;

import com.lijw.mp.event.MessageProducerService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;

import java.util.HashMap;
import java.util.Map;

/**
 * 测试发送redis stream事件消息
 */
@RestController
@RequestMapping("/api/message")
@Slf4j
public class MessageController {

    @Autowired
    private MessageProducerService messageProducerService;

    /**
     * 发送订单消息
     */
    @PostMapping("/send-order")
    public ResponseEntity<Map<String, Object>> sendOrderMessage(
            @RequestParam String orderId,
            @RequestParam String amount,
            @RequestParam(required = false) String streamKey) {

        Map<String, String> message = new HashMap<>();
        message.put("orderId", orderId);
        message.put("amount", amount);
        message.put("messageType", "ORDER_CREATED");

        try {
            String messageId;
            if (streamKey != null && !streamKey.isEmpty()) {
                // 指定stream key发送
                messageId = messageProducerService.sendMessage(streamKey, "ORDER", message);
            } else {
                // 自动根据消息类型路由
                messageId = messageProducerService.sendMessage("ORDER", message);
            }

            Map<String, Object> result = new HashMap<>();
            result.put("success", true);
            result.put("messageId", messageId);
            result.put("streamKey", streamKey != null ? streamKey : "auto-routed");
            result.put("timestamp", System.currentTimeMillis());

            return ResponseEntity.ok(result);
        } catch (Exception e) {
            log.error("发送订单消息失败", e);

            Map<String, Object> result = new HashMap<>();
            result.put("success", false);
            result.put("error", e.getMessage());

            return ResponseEntity.status(500).body(result);
        }
    }

    /**
     * 发送支付消息
     */
    @PostMapping("/send-payment")
    public ResponseEntity<Map<String, Object>> sendPaymentMessage(
            @RequestParam String paymentId,
            @RequestParam String amount,
            @RequestParam String status,
            @RequestParam(required = false) String streamKey) {

        Map<String, String> message = new HashMap<>();
        message.put("paymentId", paymentId);
        message.put("amount", amount);
        message.put("status", status);
        message.put("messageType", "PAYMENT_" + status.toUpperCase());

        try {
            String messageId;
            if (streamKey != null && !streamKey.isEmpty()) {
                messageId = messageProducerService.sendMessage(streamKey, "PAYMENT", message);
            } else {
                messageId = messageProducerService.sendMessage("PAYMENT", message);
            }

            Map<String, Object> result = new HashMap<>();
            result.put("success", true);
            result.put("messageId", messageId);
            result.put("streamKey", streamKey != null ? streamKey : "auto-routed");
            result.put("timestamp", System.currentTimeMillis());

            return ResponseEntity.ok(result);
        } catch (Exception e) {
            log.error("发送支付消息失败", e);

            Map<String, Object> result = new HashMap<>();
            result.put("success", false);
            result.put("error", e.getMessage());

            return ResponseEntity.status(500).body(result);
        }
    }

    /**
     * 发送通知消息
     */
    @PostMapping("/send-notification")
    public ResponseEntity<Map<String, Object>> sendNotificationMessage(
            @RequestParam String userId,
            @RequestParam String title,
            @RequestParam String content,
            @RequestParam(required = false) String streamKey) {

        Map<String, String> message = new HashMap<>();
        message.put("userId", userId);
        message.put("title", title);
        message.put("content", content);
        message.put("messageType", "NOTIFICATION");

        try {
            String messageId;
            if (streamKey != null && !streamKey.isEmpty()) {
                messageId = messageProducerService.sendMessage(streamKey, "NOTIFICATION", message);
            } else {
                messageId = messageProducerService.sendMessage("NOTIFICATION", message);
            }

            Map<String, Object> result = new HashMap<>();
            result.put("success", true);
            result.put("messageId", messageId);
            result.put("streamKey", streamKey != null ? streamKey : "auto-routed");
            result.put("timestamp", System.currentTimeMillis());

            return ResponseEntity.ok(result);
        } catch (Exception e) {
            log.error("发送通知消息失败", e);

            Map<String, Object> result = new HashMap<>();
            result.put("success", false);
            result.put("error", e.getMessage());

            return ResponseEntity.status(500).body(result);
        }
    }

    /**
     * 发送自定义消息(支持指定stream key和消息类型)
     */
    @PostMapping("/send-custom")
    public ResponseEntity<Map<String, Object>> sendCustomMessage(
            @RequestBody Map<String, String> messageData,
            @RequestParam(required = false) String streamKey,
            @RequestParam(required = false) String messageType) {

        try {
            // 如果未指定消息类型,从消息数据中获取或使用默认值
            String msgType = messageType != null ? messageType : 
                    messageData.getOrDefault("messageType", "CUSTOM");

            String messageId;
            if (streamKey != null && !streamKey.isEmpty()) {
                // 指定stream key发送
                messageId = messageProducerService.sendMessage(streamKey, msgType, messageData);
            } else {
                // 自动根据消息类型路由
                messageId = messageProducerService.sendMessage(msgType, messageData);
            }

            Map<String, Object> result = new HashMap<>();
            result.put("success", true);
            result.put("messageId", messageId);
            result.put("streamKey", streamKey != null ? streamKey : "auto-routed");
            result.put("messageType", msgType);
            result.put("timestamp", System.currentTimeMillis());

            return ResponseEntity.ok(result);
        } catch (Exception e) {
            log.error("发送自定义消息失败", e);

            Map<String, Object> result = new HashMap<>();
            result.put("success", false);
            result.put("error", e.getMessage());

            return ResponseEntity.status(500).body(result);
        }
    }
}
7.1 测试发送订单消息

Spring Boot集成Redis Stream消息队列:从入门到实战


POST http://localhost:8083/api/message/send-order

# 请求参数
streamKey:order_stream
orderId:order123456
amount:500
status:ok
7.2 测试发送支付消息

Spring Boot集成Redis Stream消息队列:从入门到实战


POST http://localhost:8083/api/message/send-payment

# 请求参数
streamKey:payment_stream
paymentId:payId123456
amount:500
status:ok
7.3 测试发送通知消息

Spring Boot集成Redis Stream消息队列:从入门到实战


POST http://localhost:8083/api/message/send-notification

# 请求参数
streamKey:notification_stream
userId:userId123456
title:通知消息
content:通知内容
7.4 测试发送通知消息

Spring Boot集成Redis Stream消息队列:从入门到实战


POST http://localhost:8083/api/message/send-custom

# 请求参数
streamKey:custom_stream

# 请求体
{
    "key1": "value1",
    "key2": "valuie2"
}

8.启动多个SpringBoot实例,测试实例是否会重复处理事件

跟案例1的操作一致,以发送订单消息的事件作为验证示例

8.1 通过修改端口号,启动多个实例

实例1


server:
  port: 8083

实例2


server:
  port: 8084
8.2 发送订单事件消息,查看实例日志,确认事件未被重复消费

实例1
Spring Boot集成Redis Stream消息队列:从入门到实战

实例2
Spring Boot集成Redis Stream消息队列:从入门到实战
可以看到只有一个实例处理
1764865303698-0
事件,并没有被重复消费

9.触发多个消息,造成堵塞,检查告警日志

Spring Boot集成Redis Stream消息队列:从入门到实战
模拟大量出现订阅消息,导致事件堵塞的情况。

10. Redis 查看队列消息

10.1 查看当前redis的所有keys

# 查看当前redis的所有keys
172.17.0.6:6379> KEYS *
1) "payment_stream"
2) "mystream"
3) "notification_stream"
4) "custom_stream"
5) "order_stream"
172.17.0.6:6379> 
10.2 查询所有order队列消息

XRANGE order_stream - +

# 查询消息如下:
140) 1) "1764898453817-0"
     2)  1) "amount"
         2) "500"
         3) "messageType"
         4) "ORDER"
         5) "orderId"
         6) "order123456"
         7) "messageId"
         8) "98e48bde-88df-46c1-a0a1-ac917ac14fa6"
         9) "timestamp"
        10) "1764898453753"
141) 1) "1764898468568-0"
     2)  1) "amount"
         2) "500"
         3) "messageType"
         4) "ORDER"
         5) "orderId"
         6) "order123456"
         7) "messageId"
         8) "93978d1c-d18e-4760-8657-8cf0042906a2"
         9) "timestamp"
        10) "1764898468507"
172.17.0.6:6379> 
10.3 分页查询,每次返回3条

# 分页查询,每次返回3条
XRANGE order_stream - + COUNT 3

# 执行如下:
172.17.0.6:6379> XRANGE order_stream - + COUNT 3
1) 1) "1764268213474-0"
   2) 1) "source"
      2) "TEST_CONTROLLER"
      3) "type"
      4) "TEST_MESSAGE"
      5) "content"
      6) "test msg"
      7) "timestamp"
      8) "1764268202906"
2) 1) "1764268564758-0"
   2)  1) "amount"
       2) "500"
       3) "messageType"
       4) "ORDER"
       5) "orderId"
       6) "order123456"
       7) "messageId"
       8) "eb1a1fc1-12a9-4a80-84f4-f219a05094b5"
       9) "timestamp"
      10) "1764268564695"
3) 1) "1764268699963-0"
   2)  1) "amount"
       2) "500"
       3) "messageType"
       4) "ORDER"
       5) "orderId"
       6) "order123456"
       7) "messageId"
       8) "419c1c9a-7bcb-4f4e-b08c-70e86c5e7387"
       9) "timestamp"
      10) "1764268693593"
172.17.0.6:6379> 
10.4 反向查询(从新到旧)

# 反向查询(从新到旧)
XREVRANGE order_stream + - COUNT 3
10.5 查询队列总数

172.17.0.6:6379> XLEN order_stream
(integer) 141
172.17.0.6:6379> 
11. 总结

上面的项目是通过
StringRedisTemplate
做队列消息的读取,有些时候项目不能直接使用。


@Autowired
private StringRedisTemplate stringRedisTemplate;

所以下一章节,我将会使用
RedissonClient
来做替代实现的方法,增加符合生产环境使用,并且还会增加消息队列总数的监控、查询消息队列总数接口、清除消息队列的运维接口。

四、SpringBoot 使用 RedissonClient 集成 Redis Stream 进阶:配置多个消费组

1. 添加依赖


<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.7.18</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>

<!-- Spring Boot 2.7.18 兼容的 redisson 版本 -->
<!-- 注意:Redisson 3.20.1需要Spring Boot 3.x,Spring Boot 2.7使用3.17.x版本 -->
<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson-spring-boot-starter</artifactId>
    <version>3.17.7</version>
</dependency>

2.配置文件

配置多个消费组key,跟上一个案例的配置一致


# redis stream 配置多个消费组key
app:
  stream:
    groups:
      - key: "order_stream"
        group: "order_group"
        consumer-prefix: "consumer_"
      - key: "payment_stream"
        group: "payment_group"
        consumer-prefix: "consumer_"
      - key: "notification_stream"
        group: "notification_group"
        consumer-prefix: "consumer_"
        
spring:
  # redisson配置
  redisson:
    config: |
      singleServerConfig:
        address: "redis://127.0.0.1:6379"
        database: 0
        # 连接超时时间(毫秒),默认3000
        connectionTimeout: 10000
        # 命令执行超时时间(毫秒),默认3000,增加超时时间避免PING超时
        timeout: 10000
        # 空闲连接超时时间(毫秒),默认10000
        idleConnectionTimeout: 10000
        # PING连接间隔(毫秒),默认0(禁用),设置为0可以禁用PING避免超时
        # 如果Redis服务器稳定,可以设置为0禁用PING
        pingConnectionInterval: 0
        # 重试次数
        retryAttempts: 3
        # 重试间隔(毫秒)
        retryInterval: 1500
        # 保持连接活跃
        keepAlive: true
        # TCP无延迟
        tcpNoDelay: true
        # 连接池配置
        connectionPoolSize: 64
        connectionMinimumIdleSize: 24
        # 如果Redis设置了密码,取消下面的注释并填写密码
        # password: your_password

3.Redis Stream消费组配置属性


package com.lijw.mp.config.redisstream;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

import java.util.List;

/**
 * Redis Stream消费组配置属性
 * - 读取 app.stream 配置前缀下的所有配置项
 *
 * @author Aron.li
 * @date 2025/11/30 11:21
 */
@Data
@Component
@ConfigurationProperties(prefix = "app.stream")
public class StreamGroupProperties {

    /**
     * 消费组列表
     */
    private List<StreamGroupConfig> groups;

    /**
     * 单个消费组配置
     */
    @Data
    public static class StreamGroupConfig {
        /**
         * Stream键名
         */
        private String key;
        
        /**
         * 消费组名称
         */
        private String group;
        
        /**
         * 消费者名称前缀
         */
        private String consumerPrefix;
    }
}

4. 消息生产者服务


package com.lijw.mp.event;

import com.lijw.mp.config.redisstream.StreamGroupProperties;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RStream;
import org.redisson.api.RedissonClient;
import org.redisson.api.StreamMessageId;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.Map;
import java.util.UUID;

/**
 * Redis Stream消息生产者服务
 * 负责向Redis Stream发送消息
 */
@Service
@Slf4j
public class MessageProducerService {

    /**
     * Redisson客户端
     */
    @Autowired
    private RedissonClient redissonClient;

    /**
     * Stream消费组配置属性
     */
    @Autowired
    private StreamGroupProperties streamGroupProperties;

    /**
     * 发送消息到Redis Stream(根据消息类型自动选择stream)
     * 
     * @param messageType 消息类型
     * @param data 消息数据
     * @return 消息ID
     */
    public String sendMessage(String messageType, Map<String, String> data) {
        // 根据消息类型自动选择stream key
        String streamKey = getStreamKeyByMessageType(messageType);
        return sendMessage(streamKey, messageType, data);
    }

    /**
     * 发送消息到指定的Redis Stream
     * 
     * @param streamKey Stream键名
     * @param messageType 消息类型
     * @param data 消息数据
     * @return 消息ID
     */
    /**
     * 发送消息到指定的Redis Stream
     * 
     * @param streamKey Stream键名
     * @param messageType 消息类型
     * @param data 消息数据
     * @return 消息ID
     */
    public String sendMessage(String streamKey, String messageType, Map<String, String> data) {
        try {
            // 添加消息类型和时间戳
            data.put("messageType", messageType);
            data.put("timestamp", String.valueOf(System.currentTimeMillis()));
            data.put("messageId", UUID.randomUUID().toString());

            // 使用Redisson发送消息
            RStream<String, String> stream = redissonClient.getStream(streamKey);
            // Redisson 3.17.7中,add方法直接接受Map参数
            StreamMessageId messageId = stream.addAll(data);

            log.info("消息发送成功 - Stream: {}, MessageId: {}", streamKey, messageId);
            return messageId.toString();
        } catch (Exception e) {
            log.error("消息发送失败 - Stream: {}", streamKey, e);
            throw new RuntimeException("消息发送失败", e);
        }
    }

    /**
     * 根据消息类型获取对应的stream key
     * 
     * @param messageType 消息类型
     * @return Stream键名
     */
    private String getStreamKeyByMessageType(String messageType) {
        if (streamGroupProperties.getGroups() == null || streamGroupProperties.getGroups().isEmpty()) {
            throw new RuntimeException("未配置任何消费组");
        }

        // 根据消息类型前缀匹配stream key
        String upperMessageType = messageType.toUpperCase();
        if (upperMessageType.contains("ORDER")) {
            return findStreamKey("order");
        } else if (upperMessageType.contains("PAYMENT") || upperMessageType.contains("PAY")) {
            return findStreamKey("payment");
        } else if (upperMessageType.contains("NOTIFICATION") || upperMessageType.contains("NOTIFY")) {
            return findStreamKey("notification");
        } else {
            // 默认使用第一个stream
            log.warn("未匹配到消息类型对应的stream,使用默认stream: {}", messageType);
            return streamGroupProperties.getGroups().get(0).getKey();
        }
    }

    /**
     * 根据关键字查找stream key
     * 
     * @param keyword 关键字
     * @return Stream键名
     */
    private String findStreamKey(String keyword) {
        if (streamGroupProperties.getGroups() != null) {
            for (StreamGroupProperties.StreamGroupConfig groupConfig : streamGroupProperties.getGroups()) {
                if (groupConfig.getKey().toLowerCase().contains(keyword.toLowerCase())) {
                    return groupConfig.getKey();
                }
            }
        }
        // 如果找不到,返回第一个stream
        if (streamGroupProperties.getGroups() != null && !streamGroupProperties.getGroups().isEmpty()) {
            return streamGroupProperties.getGroups().get(0).getKey();
        }
        throw new RuntimeException("未找到匹配的stream key: " + keyword);
    }
}

5. 消息消费者服务(支持幂等性)


package com.lijw.mp.event;

import com.lijw.mp.config.redisstream.StreamGroupProperties;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RBucket;
import org.redisson.api.RStream;
import org.redisson.api.RedissonClient;
import org.redisson.api.StreamMessageId;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

/**
 * Redis Stream消息消费者服务
 * 负责消费Redis Stream中的消息并进行业务处理
 */
@Component
@Slf4j
public class MessageConsumerService {

    /**
     * Redisson客户端
     */
    @Autowired
    private RedissonClient redissonClient;

    /**
     * Stream消费组配置属性
     */
    @Autowired
    private StreamGroupProperties streamGroupProperties;

    /**
     * 缓存stream key到group name的映射关系
     */
    private final Map<String, String> streamToGroupMap = new ConcurrentHashMap<>();

    /**
     * 消息消费处理方法
     * 当Redis Stream中有新消息时,会调用此方法
     * 
     * 多实例部署安全性:
     * 1. Redis Stream Consumer Group 保证每条消息只会被一个消费者读取
     * 2. 分布式锁防止多实例同时处理同一条消息
     * 3. 幂等性检查作为双重保障
     * 
     * @param streamKey Stream键名
     * @param messageId 消息ID
     * @param messageBody 消息内容
     * @param groupName 消费组名称
     */
    public void onMessage(String streamKey, String messageId, Map<String, String> messageBody, String groupName) {

        if (groupName == null) {
            log.error("未找到stream key对应的消费组: {}, 消息ID: {}", streamKey, messageId);
            return;
        }

        // 幂等性检查:防止重复处理(第一道防线)
        if (isMessageProcessed(messageId)) {
            log.info("消息已处理,跳过: {}", messageId);
            acknowledgeMessage(streamKey, messageId, groupName);
            return;
        }

        // 分布式锁:防止多实例同时处理同一条消息(第二道防线)
        String lockKey = "message:process:lock:" + messageId;
        RBucket<String> lockBucket = redissonClient.getBucket(lockKey);
        boolean lockAcquired = lockBucket.trySet("1", 5, TimeUnit.MINUTES);

        if (!lockAcquired) {
            log.info("消息正在被其他实例处理,跳过: {}", messageId);
            return;
        }

        try {
            log.info("消费者收到消息 - Stream: {}, Group: {}, ID: {}, 内容: {}", streamKey, groupName, messageId, messageBody);

            // 再次幂等性检查(双重检查,防止在获取锁的过程中消息已被处理)
            if (isMessageProcessed(messageId)) {
                log.info("消息在处理过程中已被其他实例处理,跳过: {}", messageId);
                acknowledgeMessage(streamKey, messageId, groupName);
                return;
            }

            // 处理业务逻辑(根据stream key路由到不同的业务处理)
            boolean processSuccess = processBusiness(streamKey, messageBody);

            if (processSuccess) {
                // 标记消息已处理
                markMessageProcessed(messageId);
                // 清除重试次数
                clearRetryCount(messageId);
                // 手动确认消息
                acknowledgeMessage(streamKey, messageId, groupName);
                log.info("消息处理完成: {}", messageId);
            } else {
                log.error("业务处理失败,消息将重试: {}", messageId);
                handleRetry(streamKey, messageId, messageBody, groupName, "业务处理失败");
            }

        } catch (Exception e) {
            log.error("消息处理异常: {}", messageId, e);
            handleRetry(streamKey, messageId, messageBody, groupName, "消息处理异常");
        } finally {
            // 释放分布式锁
            try {
                lockBucket.delete();
            } catch (Exception e) {
                log.warn("释放分布式锁失败: {}", messageId, e);
            }
        }
    }

    /**
     * 根据stream key获取对应的消费组名称
     * 
     * @param streamKey Stream键名
     * @return 消费组名称,如果未找到返回null
     */
    private String getGroupNameByStreamKey(String streamKey) {
        // 先从缓存中获取
        if (streamToGroupMap.containsKey(streamKey)) {
            return streamToGroupMap.get(streamKey);
        }
        
        // 从配置中查找
        if (streamGroupProperties.getGroups() != null) {
            for (StreamGroupProperties.StreamGroupConfig groupConfig : streamGroupProperties.getGroups()) {
                if (groupConfig.getKey().equals(streamKey)) {
                    streamToGroupMap.put(streamKey, groupConfig.getGroup());
                    return groupConfig.getGroup();
                }
            }
        }
        
        return null;
    }

    /**
     * 幂等性检查
     * 使用Redis存储,判断redis是否已存在处理key,如果存在则说明消息已处理,确保多实例间幂等性
     * 
     * @param messageId 消息ID
     * @return 如果消息已处理返回true,否则返回false
     */
    private boolean isMessageProcessed(String messageId) {
        RBucket<String> bucket = redissonClient.getBucket("processed:" + messageId);
        return bucket.isExists();
    }

    /**
     * 标记消息已处理
     * 使用Redis存储事件处理ID,设置过期时间
     * 
     * @param messageId 消息ID
     */
    private void markMessageProcessed(String messageId) {
        RBucket<String> bucket = redissonClient.getBucket("processed:" + messageId);
        bucket.set("1", 24, TimeUnit.HOURS);
    }

    /**
     * 业务处理逻辑(根据stream key路由到不同的业务处理)
     */
    private boolean processBusiness(String streamKey, Map<String, String> messageBody) {
        try {
            // 根据stream key路由到不同的业务处理
            if (streamKey.contains("order")) {
                return processOrderBusiness(messageBody);
            } else if (streamKey.contains("payment")) {
                return processPaymentBusiness(messageBody);
            } else if (streamKey.contains("notification")) {
                return processNotificationBusiness(messageBody);
            } else {
                log.warn("未识别的stream key: {}, 使用默认处理逻辑", streamKey);
                return processDefaultBusiness(messageBody);
            }
        } catch (Exception e) {
            log.error("业务处理异常", e);
            return false;
        }
    }

    /**
     * 处理订单业务
     */
    private boolean processOrderBusiness(Map<String, String> messageBody) {
        String messageType = messageBody.get("messageType");
        String orderId = messageBody.get("orderId");
        log.info("处理订单{}消息,订单ID: {}", messageType, orderId);
        // TODO: 实现订单业务处理逻辑
//        try {
//            // 模拟10秒处理业务
//            Thread.sleep(100000);
//        } catch (InterruptedException e) {
//            throw new RuntimeException(e);
//        }
//        throw new RuntimeException("订单业务处理异常");

        return true;
    }

    /**
     * 处理支付业务
     */
    private boolean processPaymentBusiness(Map<String, String> messageBody) {
        String messageType = messageBody.get("messageType");
        String paymentId = messageBody.get("paymentId");
        log.info("处理支付{}消息,支付ID: {}", messageType, paymentId);
        // TODO: 实现支付业务处理逻辑
        return true;
    }

    /**
     * 处理通知业务
     */
    private boolean processNotificationBusiness(Map<String, String> messageBody) {
        String messageType = messageBody.get("messageType");
        String userId = messageBody.get("userId");
        log.info("处理通知{}消息,用户ID: {}", messageType, userId);
        // TODO: 实现通知业务处理逻辑
        return true;
    }

    /**
     * 默认业务处理
     */
    private boolean processDefaultBusiness(Map<String, String> messageBody) {
        String messageType = messageBody.get("messageType");
        log.info("处理默认业务消息: {}", messageType);
        // TODO: 实现默认业务处理逻辑
        return true;
    }

    /**
     * 手动确认消息
     * 
     * @param streamKey Stream键名
     * @param messageId 消息ID
     * @param groupName 消费组名称
     */
    private void acknowledgeMessage(String streamKey, String messageId, String groupName) {
        try {
            RStream<String, String> stream = redissonClient.getStream(streamKey);
            // 解析消息ID字符串为StreamMessageId
            StreamMessageId streamMessageId = parseStreamMessageId(messageId);
            stream.ack(groupName, streamMessageId);
        } catch (Exception e) {
            log.error("消息确认失败: {}", messageId, e);
        }
    }

    /**
     * 解析消息ID字符串为StreamMessageId对象
     * 
     * @param messageId 消息ID字符串,格式如 "1234567890-0"
     * @return StreamMessageId对象
     */
    private org.redisson.api.StreamMessageId parseStreamMessageId(String messageId) {
        // StreamMessageId格式为 "timestamp-sequence"
        String[] parts = messageId.split("-");
        if (parts.length == 2) {
            return new org.redisson.api.StreamMessageId(Long.parseLong(parts[0]), Long.parseLong(parts[1]));
        }
        // 如果格式不正确,尝试直接解析
        return new org.redisson.api.StreamMessageId(Long.parseLong(messageId), 0);
    }

    /**
     * 增加重试次数
     * 
     * @param messageId 消息ID
     * @return 当前重试次数
     */
    private int incrementRetryCount(String messageId) {
        String retryKey = "retry:count:" + messageId;
        RBucket<String> bucket = redissonClient.getBucket(retryKey);
        String countStr = bucket.get();
        int retryCount = countStr == null ? 0 : Integer.parseInt(countStr);
        retryCount++;
        // 设置重试次数,过期时间为24小时
        bucket.set(String.valueOf(retryCount), 24, TimeUnit.HOURS);
        return retryCount;
    }

    /**
     * 清除重试次数
     * 
     * @param messageId 消息ID
     */
    private void clearRetryCount(String messageId) {
        String retryKey = "retry:count:" + messageId;
        RBucket<String> bucket = redissonClient.getBucket(retryKey);
        bucket.delete();
    }

    /**
     * 处理消息重试逻辑
     * 
     * @param streamKey Stream键名
     * @param messageId 消息ID
     * @param messageBody 消息内容
     * @param groupName 消费组名称
     * @param errorDescription 错误描述
     */
    private void handleRetry(String streamKey, String messageId, Map<String, String> messageBody, 
                             String groupName, String errorDescription) {
        // 记录重试次数
        int retryCount = incrementRetryCount(messageId);
        // 检查是否超过最大重试次数
        int maxRetryCount = 3; // 最大重试次数,可根据业务需求调整
        if (retryCount >= maxRetryCount) {
            log.error("消息{}重试次数已达上限({}),将停止重试并记录: {}", errorDescription, maxRetryCount, messageId);
            // 记录失败消息到死信队列或告警(可根据业务需求实现)
            handleMaxRetryExceeded(messageId, messageBody, retryCount);
            // 确认消息,避免无限重试
            acknowledgeMessage(streamKey, messageId, groupName);
            return;
        }
        
        log.warn("消息{},当前重试次数: {}/{}, 消息ID: {}", errorDescription, retryCount, maxRetryCount, messageId);
        // 异步延迟重新处理消息
        scheduleRetry(streamKey, messageId, messageBody, groupName, retryCount);
    }

    /**
     * 调度消息重试
     * 使用异步方式延迟重新处理消息,避免立即重试
     * 
     * @param streamKey Stream键名
     * @param messageId 消息ID
     * @param messageBody 消息内容
     * @param groupName 消费组名称
     * @param retryCount 当前重试次数
     */
    private void scheduleRetry(String streamKey, String messageId, Map<String, String> messageBody, 
                               String groupName, int retryCount) {
        // 计算延迟时间:重试次数越多,延迟时间越长(指数退避策略)
        long delaySeconds = (long) Math.pow(2, retryCount - 1);
        log.info("消息将在{}秒后重新处理,消息ID: {}", delaySeconds, messageId);
        
        // 异步延迟重新处理消息
        CompletableFuture.runAsync(() -> {
            try {
                Thread.sleep(delaySeconds * 1000);
                log.info("开始重新处理消息,消息ID: {}", messageId);
                // 重新调用onMessage处理消息
                onMessage(streamKey, messageId, messageBody, groupName);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                log.warn("消息重试被中断,消息ID: {}", messageId);
            } catch (Exception e) {
                log.error("消息重试处理异常,消息ID: {}", messageId, e);
            }
        });
    }

    /**
     * 处理超过最大重试次数的消息
     * 记录失败消息详情,可根据业务需求扩展(如存储到数据库、发送告警等)
     * 
     * @param messageId 消息ID
     * @param messageBody 消息内容
     * @param retryCount 重试次数
     */
    private void handleMaxRetryExceeded(String messageId, Map<String, String> messageBody, int retryCount) {
        try {
            // 记录失败消息详情(可根据业务需求实现,如存储到数据库、发送告警等)
            String failedKey = "failed:message:" + messageId;
            String failedInfo = String.format("消息ID: %s, 重试次数: %d, 消息内容: %s, 失败时间: %s",
                    messageId, retryCount, messageBody, System.currentTimeMillis());
            RBucket<String> bucket = redissonClient.getBucket(failedKey);
            bucket.set(failedInfo, 7, TimeUnit.DAYS);
            log.error("失败消息已记录: {}", failedInfo);
            // TODO: 可根据业务需求添加其他处理逻辑,如:
            // 1. 发送告警通知
            // 2. 存储到数据库死信表
            // 3. 发送到死信队列
        } catch (Exception e) {
            log.error("处理超过最大重试次数消息异常: {}", messageId, e);
        }
    }
}

6. 消费者容器配置(支持多实例负载均衡)


package com.lijw.mp.config.redisstream;

import com.lijw.mp.event.MessageConsumerService;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RStream;
import org.redisson.api.RedissonClient;
import org.redisson.api.StreamMessageId;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.net.InetAddress;
import java.lang.management.ManagementFactory;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * Redis Stream配置类
 * 负责创建消费组、配置消息监听容器等
 * 使用Redisson实现消息消费
 */
@Configuration
@Slf4j
public class RedisStreamConfig {

    /**
     * Redisson客户端
     */
    @Autowired
    private RedissonClient redissonClient;

    /**
     * Stream消费组配置属性
     */
    @Autowired
    private StreamGroupProperties streamGroupProperties;

    /**
     * 消息消费者服务
     */
    @Autowired
    private MessageConsumerService messageConsumerService;

    /**
     * 消费者线程池
     */
    private ExecutorService consumerExecutor;

    /**
     * 应用启动完成后,初始化消费组并启动消费者
     */
    @EventListener(ApplicationReadyEvent.class)
    public void initStreamConsumers() {
        if (streamGroupProperties.getGroups() == null || streamGroupProperties.getGroups().isEmpty()) {
            log.warn("未配置任何消费组");
            return;
        }

        // 创建消费者线程池
        consumerExecutor = createConsumerThreadPool();

        // 创建所有消费组
        createAllConsumerGroups();

        // 启动所有消费者
        startAllConsumers();
    }

    /**
     * 确保stream存在,如果不存在则创建
     * 
     * @param streamKey Stream键名
     */
    private void ensureStreamExists(String streamKey) {
        try {
            RStream<String, String> stream = redissonClient.getStream(streamKey);
            // 检查stream是否存在
            Long size = stream.size();
            if (size == null || size == 0) {
                // Stream不存在,创建一条临时消息来初始化stream
                Map<String, String> tempData = new HashMap<>();
                tempData.put("_init", "true");
                tempData.put("_timestamp", String.valueOf(System.currentTimeMillis()));
                stream.addAll(tempData);
                log.debug("创建stream成功: {}", streamKey);
            }
        } catch (Exception e) {
            // Stream不存在,创建它
            try {
                RStream<String, String> stream = redissonClient.getStream(streamKey);
                Map<String, String> tempData = new HashMap<>();
                tempData.put("_init", "true");
                tempData.put("_timestamp", String.valueOf(System.currentTimeMillis()));
                stream.addAll(tempData);
                log.info("创建stream成功: {}", streamKey);
            } catch (Exception ex) {
                log.warn("创建stream失败: {}, {}", streamKey, ex.getMessage());
            }
        }
    }

    /**
     * 启动所有消费者
     */
    private void startAllConsumers() {
        if (streamGroupProperties.getGroups() == null || streamGroupProperties.getGroups().isEmpty()) {
            log.warn("未配置任何消费组,无法启动消费者");
            return;
        }

        for (StreamGroupProperties.StreamGroupConfig groupConfig : streamGroupProperties.getGroups()) {
            startConsumer(groupConfig);
        }
    }

    /**
     * 创建所有消费者组(如果不存在)
     * 如果stream不存在,会先创建stream
     */
    private void createAllConsumerGroups() {
        if (streamGroupProperties.getGroups() == null || streamGroupProperties.getGroups().isEmpty()) {
            log.warn("未配置任何消费组");
            return;
        }
        
        for (StreamGroupProperties.StreamGroupConfig groupConfig : streamGroupProperties.getGroups()) {
            try {
                // 先检查stream是否存在,如果不存在则创建
                ensureStreamExists(groupConfig.getKey());
                
                // 创建消费组
                RStream<String, String> stream = redissonClient.getStream(groupConfig.getKey());
                stream.createGroup(groupConfig.getGroup(), StreamMessageId.ALL);
                log.info("创建消费者组成功: stream={}, group={}", groupConfig.getKey(), groupConfig.getGroup());
            } catch (Exception e) {
                // 如果消费组已存在,这是正常情况
                String errorMsg = e.getMessage() != null ? e.getMessage() : "";
                if (errorMsg.contains("BUSYGROUP") || errorMsg.contains("already exists")) {
                    log.info("消费者组已存在: stream={}, group={}", groupConfig.getKey(), groupConfig.getGroup());
                } else {
                    log.warn("创建消费者组失败: stream={}, group={}, error={}", 
                            groupConfig.getKey(), groupConfig.getGroup(), errorMsg);
                    // 即使创建失败,也继续处理其他消费组
                }
            }
        }
    }

    /**
     * 启动单个消费组的消费者
     * 
     * @param groupConfig 消费组配置
     */
    private void startConsumer(StreamGroupProperties.StreamGroupConfig groupConfig) {
        // 为每个实例生成唯一消费者名称
        String consumerName = generateUniqueConsumerName(groupConfig.getConsumerPrefix());

        // 启动消费者线程
        consumerExecutor.submit(() -> {
            consumeMessages(groupConfig, consumerName);
        });

        log.info("Redis Stream消费者启动成功 - Stream: {}, Group: {}, Consumer: {}", 
                groupConfig.getKey(), groupConfig.getGroup(), consumerName);
    }

    /**
     * 消费消息的循环方法
     * 
     * @param groupConfig 消费组配置
     * @param consumerName 消费者名称
     */
    private void consumeMessages(StreamGroupProperties.StreamGroupConfig groupConfig, String consumerName) {
        String streamKey = groupConfig.getKey();
        String groupName = groupConfig.getGroup();
        RStream<String, String> stream = redissonClient.getStream(streamKey);

        log.info("开始消费消息 - Stream: {}, Group: {}, Consumer: {}", streamKey, groupName, consumerName);

        while (!Thread.currentThread().isInterrupted()) {
            try {
                // 读取消息(从未确认的消息开始,读取最多10条)
                Map<StreamMessageId, Map<String, String>> messages = stream.readGroup(
                        groupName, 
                        consumerName,
                        10
                );

                if (messages != null && !messages.isEmpty()) {
                    // 处理每条消息
                    for (Map.Entry<StreamMessageId, Map<String, String>> entry : messages.entrySet()) {
                        StreamMessageId messageId = entry.getKey();
                        Map<String, String> messageBody = entry.getValue();
                        
                        // 调用消费者服务处理消息
                        messageConsumerService.onMessage(streamKey, messageId.toString(), messageBody, groupName);
                    }
                } else {
                    // 没有消息时,短暂休眠避免CPU空转
                    Thread.sleep(100);
                }
            } catch (InterruptedException e) {
                log.info("消费者线程被中断 - Stream: {}, Group: {}, Consumer: {}", streamKey, groupName, consumerName);
                Thread.currentThread().interrupt();
                break;
            } catch (Exception e) {
                log.error("消费消息异常 - Stream: {}, Group: {}, Consumer: {}", 
                        streamKey, groupName, consumerName, e);
                // 发生异常时短暂休眠,避免快速重试导致CPU占用过高
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }

        log.info("消费者线程结束 - Stream: {}, Group: {}, Consumer: {}", streamKey, groupName, consumerName);
    }

    /**
     * 生成唯一消费者名称(支持多实例部署的关键)
     * 使用IP+进程ID确保集群环境下唯一性
     * 
     * @param prefix 消费者名称前缀
     * @return 唯一的消费者名称
     */
    private String generateUniqueConsumerName(String prefix) {
        try {
            String hostAddress = InetAddress.getLocalHost().getHostAddress();
            String processId = ManagementFactory.getRuntimeMXBean().getName().split("@")[0];
            return (prefix != null ? prefix : "consumer_") + hostAddress + "_" + processId + "_" + System.currentTimeMillis();
        } catch (Exception e) {
            //  fallback:使用UUID
            return (prefix != null ? prefix : "consumer_") + UUID.randomUUID().toString().substring(0, 8);
        }
    }

    /**
     * 创建专用线程池
     * 用于处理Redis Stream消息消费
     * 
     * @return 线程池执行器
     */
    private ExecutorService createConsumerThreadPool() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(2); // 核心线程数
        executor.setMaxPoolSize(10); // 最大线程数
        executor.setQueueCapacity(100); // 队列容量
        executor.setThreadNamePrefix("redis-stream-consumer-"); // 线程名前缀
        executor.setDaemon(false); // 非守护线程,确保应用关闭时能正常结束
        executor.initialize();
        return executor.getThreadPoolExecutor();
    }
}

7. 定时处理消息告警、重试pending消息


package com.lijw.mp.config.redisstream;

import com.lijw.mp.event.MessageConsumerService;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RBucket;
import org.redisson.api.RStream;
import org.redisson.api.RedissonClient;
import org.redisson.api.StreamMessageId;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

/**
 * 待处理消息重试服务
 * 定时检查并处理Redis Stream中未被确认(ACK)的消息
 * 
 * 生产环境特性:
 * 1. 定时扫描pending消息
 * 2. 读取pending消息并重新处理
 * 3. 复用MessageConsumerService进行业务处理
 * 4. 支持消息空闲时间检查和告警
 */
@Component
@Slf4j
public class PendingMessageRetryService {

    /**
     * Redisson客户端
     */
    @Autowired
    private RedissonClient redissonClient;

    /**
     * Stream消费组配置属性
     */
    @Autowired
    private StreamGroupProperties streamGroupProperties;

    /**
     * 消息消费者服务(用于重新处理pending消息)
     */
    @Autowired
    private MessageConsumerService messageConsumerService;


    /**
     * 定时监控各个消费队列的数量
     * 执行时机:每10秒执行一次
     */
    @Scheduled(fixedDelay = 10000)
    public void monitorQueueStats() {
        if (streamGroupProperties.getGroups() == null || streamGroupProperties.getGroups().isEmpty()) {
            return;
        }

        long totalMessages = 0;
        long totalPendingMessages = 0;
        StringBuilder statsBuilder = new StringBuilder("消息队列监控统计 | ");

        // 遍历所有配置的消费组
        for (StreamGroupProperties.StreamGroupConfig groupConfig : streamGroupProperties.getGroups()) {
            try {
                RStream<String, String> stream = redissonClient.getStream(groupConfig.getKey());
                
                // 获取总消息数
                Long messageCount = stream.size();
                messageCount = messageCount != null ? messageCount : 0L;
                
                // 获取pending消息数
                long pendingCount = getPendingMessageCount(groupConfig);
                
                // 计算已处理消息数
                long processedCount = messageCount - pendingCount;
                
                totalMessages += messageCount;
                totalPendingMessages += pendingCount;
                
                // 构建单条日志信息
                statsBuilder.append(String.format("[%s/%s: 总数=%d, Pending=%d, 已处理=%d] ", 
                        groupConfig.getKey(), 
                        groupConfig.getGroup(), 
                        messageCount, 
                        pendingCount, 
                        Math.max(0, processedCount)));
                
            } catch (Exception e) {
                log.error("监控Stream统计信息失败 - Stream: {}, Group: {}", 
                        groupConfig.getKey(), groupConfig.getGroup(), e);
                statsBuilder.append(String.format("[%s/%s: 统计失败] ", 
                        groupConfig.getKey(), groupConfig.getGroup()));
            }
        }
        
        // 添加总计信息
        statsBuilder.append(String.format("| 总计: 总数=%d, Pending=%d, 已处理=%d", 
                totalMessages, 
                totalPendingMessages, 
                Math.max(0, totalMessages - totalPendingMessages)));
        
        // 打印单条日志
        log.info(statsBuilder.toString());
    }

    /**
     * 定时处理未确认的消息
     * 执行时机:每30秒(30000)执行一次(通过@Scheduled注解配置)
     */
    @Scheduled(fixedDelay = 30000)
    public void retryPendingMessages() {
        if (streamGroupProperties.getGroups() == null || streamGroupProperties.getGroups().isEmpty()) {
            return;
        }

        // 遍历所有配置的消费组
        for (StreamGroupProperties.StreamGroupConfig groupConfig : streamGroupProperties.getGroups()) {
            try {
                // 获取待处理消息数量(通过读取pending消息来统计)
                long totalPending = getPendingMessageCount(groupConfig);
                
                if (totalPending > 0) {
                    log.info("发现待处理消息 - Stream: {}, Group: {}, 待处理数量: {}",
                            groupConfig.getKey(), groupConfig.getGroup(), totalPending);

                    // 根据pending消息数量进行告警
                    if (totalPending > 1000) {
                        log.error("【严重告警】待处理消息数量过多 - Stream: {}, Group: {}, 数量: {}, " +
                                "建议:1.检查消费者是否正常运行 2.增加消费实例 3.检查消息处理逻辑",
                                groupConfig.getKey(), groupConfig.getGroup(), totalPending);
                    } else if (totalPending > 100) {
                        log.warn("【警告】待处理消息数量较多 - Stream: {}, Group: {}, 数量: {}, " +
                                "建议:考虑增加消费实例或检查消息处理性能",
                                groupConfig.getKey(), groupConfig.getGroup(), totalPending);
                    }

                    // 读取并处理pending消息
                    readAndProcessPendingMessages(groupConfig);
                }
            } catch (Exception e) {
                log.error("处理待处理消息异常 - Stream: {}, Group: {}",
                        groupConfig.getKey(), groupConfig.getGroup(), e);
            }
        }
    }

    /**
     * 获取待处理消息数量
     * 通过读取pending消息来统计数量
     * 
     * @param groupConfig 消费组配置
     * @return 待处理消息数量
     */
    private long getPendingMessageCount(StreamGroupProperties.StreamGroupConfig groupConfig) {
        try {
            RStream<String, String> stream = redissonClient.getStream(groupConfig.getKey());
            String consumerName = "count-consumer";
            
            // 尝试读取pending消息(使用0作为起始ID,读取最多1000条来统计)
            Map<StreamMessageId, Map<String, String>> messages = stream.readGroup(
                    groupConfig.getGroup(), 
                    consumerName,
                    1000,
                    new StreamMessageId(0, 0)
            );
            
            return messages != null ? messages.size() : 0;
        } catch (Exception e) {
            log.warn("获取pending消息数量失败 - Stream: {}, Group: {}", 
                    groupConfig.getKey(), groupConfig.getGroup(), e);
            return 0;
        }
    }

    /**
     * 读取并处理pending消息
     * 使用XREADGROUP命令读取pending消息(使用0作为起始ID)
     * 
     * 在Redis Stream中,使用XREADGROUP GROUP group consumer STREAMS key 0
     * 可以读取该消费者组中所有pending的消息
     * 
     * @param groupConfig 消费组配置
     */
    private void readAndProcessPendingMessages(StreamGroupProperties.StreamGroupConfig groupConfig) {
        try {
            // 使用固定的消费者名称来读取pending消息
            String retryConsumerName = "retry-consumer";
            RStream<String, String> stream = redissonClient.getStream(groupConfig.getKey());
            
            log.info("尝试读取pending消息 - Stream: {}, Group: {}, Consumer: {}", 
                    groupConfig.getKey(), groupConfig.getGroup(), retryConsumerName);
            
            // 读取pending消息(使用0作为起始ID读取所有pending消息,最多100条)
            // 在Redis Stream中,0表示读取所有pending消息
            Map<StreamMessageId, Map<String, String>> messages = stream.readGroup(
                    groupConfig.getGroup(), 
                    retryConsumerName,
                    100,
                    new StreamMessageId(0, 0)
            );
            
            log.info("读取结果 - Stream: {}, Group: {}, Consumer: {}, 记录数量: {}", 
                    groupConfig.getKey(), groupConfig.getGroup(), retryConsumerName,
                    messages != null ? messages.size() : 0);
            
            if (messages != null && !messages.isEmpty()) {
                log.info("读取到{}条pending消息,开始重新处理 - Stream: {}, Group: {}", 
                        messages.size(), groupConfig.getKey(), groupConfig.getGroup());
                
                int successCount = 0;
                int failCount = 0;
                
                // 复用MessageConsumerService的业务处理方法
                for (Map.Entry<StreamMessageId, Map<String, String>> entry : messages.entrySet()) {
                    StreamMessageId streamMessageId = entry.getKey();
                    String messageId = streamMessageId.toString();
                    Map<String, String> messageBody = entry.getValue();
                    String lockKey = "pending:retry:lock:" + messageId;
                    
                    try {
                        // 使用分布式锁防止多实例重复处理
                        RBucket<String> lockBucket = redissonClient.getBucket(lockKey);
                        boolean lockAcquired = lockBucket.trySet("1", 5, TimeUnit.MINUTES);
                        
                        if (!lockAcquired) {
                            // 锁已被其他实例获取,跳过处理
                            log.info("pending消息正在被其他实例处理,跳过 - Stream: {}, Group: {}, MessageId: {}", 
                                    groupConfig.getKey(), groupConfig.getGroup(), messageId);
                            continue;
                        }
                        
                        // 幂等性检查:检查消息是否已被处理
                        if (isMessageProcessed(messageId)) {
                            log.info("pending消息已处理,跳过 - Stream: {}, Group: {}, MessageId: {}", 
                                    groupConfig.getKey(), groupConfig.getGroup(), messageId);
                            // 释放锁
                            lockBucket.delete();
                            continue;
                        }
                        
                        log.info("开始处理pending消息 - Stream: {}, Group: {}, MessageId: {}", 
                                groupConfig.getKey(), groupConfig.getGroup(), messageId);
                        log.debug("消息内容: {}", messageBody);
                        
                        // 复用MessageConsumerService的onMessage方法进行业务处理
                        messageConsumerService.onMessage(groupConfig.getKey(), messageId, messageBody, groupConfig.getGroup());
                        successCount++;
                        
                        log.info("pending消息重新处理成功 - Stream: {}, Group: {}, MessageId: {}", 
                                groupConfig.getKey(), groupConfig.getGroup(), messageId);
                        
                        // 处理成功后释放锁
                        lockBucket.delete();
                    } catch (Exception e) {
                        failCount++;
                        log.error("pending消息重新处理失败 - Stream: {}, Group: {}, MessageId: {}, 错误: {}", 
                                groupConfig.getKey(), groupConfig.getGroup(), messageId, 
                                e.getMessage(), e);
                        // 处理失败后释放锁,允许重试
                        try {
                            RBucket<String> lockBucket = redissonClient.getBucket(lockKey);
                            lockBucket.delete();
                        } catch (Exception ex) {
                            log.warn("释放锁失败: {}", ex.getMessage());
                        }
                    }
                }
                
                log.info("pending消息处理完成 - Stream: {}, Group: {}, 成功: {}, 失败: {}", 
                        groupConfig.getKey(), groupConfig.getGroup(), successCount, failCount);
            } else {
                log.warn("未读取到pending消息 - Stream: {}, Group: {}, 可能原因:1.消息已被其他消费者处理 2.消费者名称不匹配 3.消息已被ACK", 
                        groupConfig.getKey(), groupConfig.getGroup());
            }
        } catch (Exception e) {
            log.error("读取pending消息失败 - Stream: {}, Group: {}, 错误类型: {}, 错误消息: {}", 
                    groupConfig.getKey(), groupConfig.getGroup(), 
                    e.getClass().getSimpleName(), e.getMessage(), e);
        }
    }

    /**
     * 幂等性检查:检查消息是否已被处理
     * 与MessageConsumerService使用相同的幂等性检查机制
     * 
     * @param messageId 消息ID
     * @return 如果消息已处理返回true,否则返回false
     */
    private boolean isMessageProcessed(String messageId) {
        RBucket<String> bucket = redissonClient.getBucket("processed:" + messageId);
        return bucket.isExists();
    }
}

8.REST控制器


package com.lijw.mp.controller;

import com.lijw.mp.config.redisstream.StreamGroupProperties;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RStream;
import org.redisson.api.RedissonClient;
import org.redisson.api.StreamMessageId;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;

import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@RestController
@RequestMapping("/api/test/stream")
@Slf4j
public class StreamTestController {

    @Autowired
    private RedissonClient redissonClient;

    @Autowired
    private StreamGroupProperties streamGroupProperties;

    /**
     * 获取默认的stream key(订单stream)
     */
    private String getDefaultStreamKey() {
        if (streamGroupProperties.getGroups() != null && !streamGroupProperties.getGroups().isEmpty()) {
            // 优先查找order_stream,否则返回第一个
            for (StreamGroupProperties.StreamGroupConfig groupConfig : streamGroupProperties.getGroups()) {
                if (groupConfig.getKey().contains("order")) {
                    return groupConfig.getKey();
                }
            }
            return streamGroupProperties.getGroups().get(0).getKey();
        }
        throw new RuntimeException("未配置任何消费组");
    }

    /**
     * 发送JSON格式复杂消息
     */
    @PostMapping("/send-json")
    public ResponseEntity<Map<String, Object>> sendJsonMessage(
            @RequestBody Map<String, Object> messageData,
            @RequestParam(required = false) String streamKey) {

        try {
            // 如果未指定stream key,使用默认的
            if (streamKey == null || streamKey.isEmpty()) {
                streamKey = getDefaultStreamKey();
            }

            // 添加元数据
            messageData.put("timestamp", System.currentTimeMillis());
            messageData.put("messageType", "CUSTOM_EVENT");

            // 转换Map类型以适应Redis Template
            Map<String, String> stringMessage = new HashMap<>();
            messageData.forEach((k, v) -> stringMessage.put(k, v.toString()));

            RStream<String, String> stream = redissonClient.getStream(streamKey);
            StreamMessageId messageId = stream.addAll(stringMessage);

            Map<String, Object> result = new HashMap<>();
            result.put("success", true);
            result.put("messageId", messageId.toString());
            result.put("data", messageData);

            log.info("JSON消息发送成功: {}", messageId);
            return ResponseEntity.ok(result);

        } catch (Exception e) {
            log.error("发送JSON消息失败", e);

            Map<String, Object> result = new HashMap<>();
            result.put("success", false);
            result.put("error", e.getMessage());
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(result);
        }
    }

    /**
     * 发送订单事件测试消息
     */
    @PostMapping("/send-order")
    public ResponseEntity<Map<String, Object>> sendOrderMessage(
            @RequestParam String orderId,
            @RequestParam Double amount,
            @RequestParam String status,
            @RequestParam(required = false) String streamKey) {

        try {
            // 如果未指定stream key,使用默认的订单stream
            if (streamKey == null || streamKey.isEmpty()) {
                streamKey = getDefaultStreamKey();
            }

            Map<String, String> message = new HashMap<>();
            message.put("eventType", "ORDER_" + status.toUpperCase());
            message.put("orderId", orderId);
            message.put("amount", amount.toString());
            message.put("status", status);
            message.put("eventTime", LocalDateTime.now().toString());

            RStream<String, String> stream = redissonClient.getStream(streamKey);
            StreamMessageId messageId = stream.addAll(message);

            Map<String, Object> result = new HashMap<>();
            result.put("success", true);
            result.put("messageId", messageId.toString());
            result.put("eventType", message.get("eventType"));
            result.put("orderId", orderId);

            log.info("订单事件发送成功: {} - {}", message.get("eventType"), orderId);
            return ResponseEntity.ok(result);

        } catch (Exception e) {
            log.error("发送订单事件失败", e);

            Map<String, Object> result = new HashMap<>();
            result.put("success", false);
            result.put("error", e.getMessage());
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(result);
        }
    }

    /**
     * 查看所有配置的Stream信息
     */
    @GetMapping("/info/all")
    public ResponseEntity<Map<String, Object>> getAllStreamInfo() {
        try {
            Map<String, Object> result = new HashMap<>();
            Map<String, Long> streams = new HashMap<>();

            if (streamGroupProperties.getGroups() != null) {
                for (StreamGroupProperties.StreamGroupConfig groupConfig : streamGroupProperties.getGroups()) {
                    try {
                        RStream<String, String> stream = redissonClient.getStream(groupConfig.getKey());
                        Long streamSize = stream.size();
                        streams.put(groupConfig.getKey(), streamSize);
                    } catch (Exception e) {
                        log.warn("获取Stream信息失败: {}", groupConfig.getKey(), e);
                        streams.put(groupConfig.getKey(), -1L);
                    }
                }
            }

            result.put("streams", streams);
            result.put("timestamp", System.currentTimeMillis());
            return ResponseEntity.ok(result);

        } catch (Exception e) {
            log.error("获取所有Stream信息失败", e);
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).build();
        }
    }

    /**
     * 查询消息队列总数情况(监控接口)
     * 返回所有Stream的详细信息,包括总消息数、pending消息数、消费组信息等
     */
    @GetMapping("/stats")
    public ResponseEntity<Map<String, Object>> getStreamStats() {
        try {
            Map<String, Object> result = new HashMap<>();
            List<Map<String, Object>> streamStats = new ArrayList<>();
            long totalMessages = 0;
            long totalPendingMessages = 0;

            if (streamGroupProperties.getGroups() != null) {
                for (StreamGroupProperties.StreamGroupConfig groupConfig : streamGroupProperties.getGroups()) {
                    try {
                        Map<String, Object> stats = getStreamStatistics(groupConfig);
                        streamStats.add(stats);
                        
                        // 累计统计
                        totalMessages += (Long) stats.getOrDefault("messageCount", 0L);
                        totalPendingMessages += (Long) stats.getOrDefault("pendingCount", 0L);
                    } catch (Exception e) {
                        log.warn("获取Stream统计信息失败: {}", groupConfig.getKey(), e);
                        Map<String, Object> errorStats = new HashMap<>();
                        errorStats.put("streamKey", groupConfig.getKey());
                        errorStats.put("groupName", groupConfig.getGroup());
                        errorStats.put("error", e.getMessage());
                        streamStats.add(errorStats);
                    }
                }
            }

            result.put("streams", streamStats);
            
            // 创建汇总信息
            Map<String, Object> summary = new HashMap<>();
            summary.put("totalStreams", streamStats.size());
            summary.put("totalMessages", totalMessages);
            summary.put("totalPendingMessages", totalPendingMessages);
            result.put("summary", summary);
            result.put("timestamp", System.currentTimeMillis());
            
            return ResponseEntity.ok(result);
        } catch (Exception e) {
            log.error("获取Stream统计信息失败", e);
            Map<String, Object> result = new HashMap<>();
            result.put("success", false);
            result.put("error", e.getMessage());
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(result);
        }
    }

    /**
     * 获取单个Stream的统计信息
     * 
     * @param groupConfig 消费组配置
     * @return Stream统计信息
     */
    private Map<String, Object> getStreamStatistics(StreamGroupProperties.StreamGroupConfig groupConfig) {
        Map<String, Object> stats = new HashMap<>();
        RStream<String, String> stream = redissonClient.getStream(groupConfig.getKey());
        
        // 总消息数
        Long messageCount = stream.size();
        stats.put("streamKey", groupConfig.getKey());
        stats.put("groupName", groupConfig.getGroup());
        stats.put("messageCount", messageCount != null ? messageCount : 0L);
        
        // Pending消息数(通过读取pending消息来统计)
        long pendingCount = getPendingMessageCount(groupConfig);
        stats.put("pendingCount", pendingCount);
        
        // 已处理消息数(总消息数 - pending消息数)
        long processedCount = (messageCount != null ? messageCount : 0L) - pendingCount;
        stats.put("processedCount", Math.max(0, processedCount));
        
        return stats;
    }

    /**
     * 获取待处理消息数量
     * 
     * @param groupConfig 消费组配置
     * @return 待处理消息数量
     */
    private long getPendingMessageCount(StreamGroupProperties.StreamGroupConfig groupConfig) {
        try {
            RStream<String, String> stream = redissonClient.getStream(groupConfig.getKey());
            String consumerName = "stats-consumer";
            
            // 尝试读取pending消息(使用0作为起始ID,读取最多1000条来统计)
            Map<StreamMessageId, Map<String, String>> messages = stream.readGroup(
                    groupConfig.getGroup(), 
                    consumerName,
                    1000,
                    new StreamMessageId(0, 0)
            );
            
            return messages != null ? messages.size() : 0;
        } catch (Exception e) {
            log.warn("获取pending消息数量失败 - Stream: {}, Group: {}", 
                    groupConfig.getKey(), groupConfig.getGroup(), e);
            return 0;
        }
    }

    /**
     * 清除消息队列(运维接口)
     * 注意:此操作会删除Stream中的所有消息,请谨慎使用
     * 
     * @param streamKey Stream键名(必填,防止误操作)
     * @param confirm 确认参数,必须为"DELETE"才会执行删除操作
     * @return 操作结果
     */
    @DeleteMapping("/clear")
    public ResponseEntity<Map<String, Object>> clearStream(
            @RequestParam String streamKey,
            @RequestParam(required = false, defaultValue = "") String confirm) {
        
        try {
            Map<String, Object> result = new HashMap<>();
            
            // 安全检查:必须提供确认参数
            if (!"DELETE".equals(confirm)) {
                result.put("success", false);
                result.put("error", "清除操作需要确认,请在confirm参数中传入'DELETE'");
                result.put("message", "此操作会删除Stream中的所有消息,请谨慎使用");
                return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(result);
            }
            
            // 检查stream是否存在
            RStream<String, String> stream = redissonClient.getStream(streamKey);
            Long sizeBefore = stream.size();
            
            if (sizeBefore == null || sizeBefore == 0) {
                result.put("success", true);
                result.put("message", "Stream为空,无需清除");
                result.put("streamKey", streamKey);
                result.put("deletedCount", 0);
                return ResponseEntity.ok(result);
            }
            
            // 删除Stream(删除整个Stream会清除所有消息)
            // 注意:Redisson中删除Stream需要使用Redis命令,这里使用trim到0的方式
            // 或者直接删除key
            redissonClient.getKeys().delete(streamKey);
            
            result.put("success", true);
            result.put("message", "Stream清除成功");
            result.put("streamKey", streamKey);
            result.put("deletedCount", sizeBefore);
            result.put("timestamp", System.currentTimeMillis());
            
            log.warn("Stream已清除 - Stream: {}, 删除消息数: {}", streamKey, sizeBefore);
            return ResponseEntity.ok(result);
            
        } catch (Exception e) {
            log.error("清除Stream失败 - Stream: {}", streamKey, e);
            Map<String, Object> result = new HashMap<>();
            result.put("success", false);
            result.put("error", e.getMessage());
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(result);
        }
    }

    /**
     * 清除指定Stream的pending消息(运维接口)
     * 只清除pending消息,保留已处理的消息
     * 
     * @param streamKey Stream键名
     * @param groupName 消费组名称
     * @param confirm 确认参数,必须为"DELETE_PENDING"才会执行删除操作
     * @return 操作结果
     */
    @DeleteMapping("/clear-pending")
    public ResponseEntity<Map<String, Object>> clearPendingMessages(
            @RequestParam String streamKey,
            @RequestParam String groupName,
            @RequestParam(required = false, defaultValue = "") String confirm) {
        
        try {
            Map<String, Object> result = new HashMap<>();
            
            // 安全检查:必须提供确认参数
            if (!"DELETE_PENDING".equals(confirm)) {
                result.put("success", false);
                result.put("error", "清除pending消息操作需要确认,请在confirm参数中传入'DELETE_PENDING'");
                result.put("message", "此操作会删除Stream中所有pending消息,请谨慎使用");
                return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(result);
            }
            
            RStream<String, String> stream = redissonClient.getStream(streamKey);
            
            // 读取所有pending消息并确认它们(通过ACK来清除pending状态)
            String consumerName = "clear-pending-consumer";
            int clearedCount = 0;
            int maxIterations = 100; // 防止无限循环
            int iteration = 0;
            
            while (iteration < maxIterations) {
                Map<StreamMessageId, Map<String, String>> messages = stream.readGroup(
                        groupName, 
                        consumerName,
                        100,
                        new StreamMessageId(0, 0)
                );
                
                if (messages == null || messages.isEmpty()) {
                    break;
                }
                
                // 确认所有pending消息
                for (StreamMessageId messageId : messages.keySet()) {
                    stream.ack(groupName, messageId);
                    clearedCount++;
                }
                
                iteration++;
            }
            
            result.put("success", true);
            result.put("message", "Pending消息清除成功");
            result.put("streamKey", streamKey);
            result.put("groupName", groupName);
            result.put("clearedCount", clearedCount);
            result.put("timestamp", System.currentTimeMillis());
            
            log.warn("Pending消息已清除 - Stream: {}, Group: {}, 清除数量: {}", 
                    streamKey, groupName, clearedCount);
            return ResponseEntity.ok(result);
            
        } catch (Exception e) {
            log.error("清除Pending消息失败 - Stream: {}, Group: {}", streamKey, groupName, e);
            Map<String, Object> result = new HashMap<>();
            result.put("success", false);
            result.put("error", e.getMessage());
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(result);
        }
    }
}
8.1 查询消息队列总数情况(监控接口)

GET http://localhost:8083/api/test/stream/stats

Spring Boot集成Redis Stream消息队列:从入门到实战

8.2 查看所有配置的Stream信息

GET http://localhost:8083/api/test/stream/info/all

Spring Boot集成Redis Stream消息队列:从入门到实战

8.3 发送JSON格式复杂消息

POST http://localhost:8083/api/test/stream/send-json

# PARAM
streamKey=order_stream

# Body
{
    "orderId": "orderId123345123",
    "amount": 500,
    "messageType": "ORDER"
}

Spring Boot集成Redis Stream消息队列:从入门到实战

8.4 发送订单事件测试消息

POST http://localhost:8083/api/test/stream/send-order

# PARAM
streamKey:order_stream
orderId:orderId2025120701
amount:500
status:ok
messageType:order

Spring Boot集成Redis Stream消息队列:从入门到实战

9.总结

上面的几个接口已经说明了使用方式,下面截图展示一下相应的执行日志:

消息的发送以及处理
Spring Boot集成Redis Stream消息队列:从入门到实战

消息的定时监控
Spring Boot集成Redis Stream消息队列:从入门到实战


消息队列监控统计 | 
[order_stream/order_group: 总数=212, Pending=0, 已处理=212] 
[payment_stream/payment_group: 总数=2, Pending=0, 已处理=2] [notification_stream/notification_group: 总数=2, Pending=0, 已处理=2] 
| 总计: 总数=216, Pending=0, 已处理=216
© 版权声明

相关文章

暂无评论

none
暂无评论...