SpringBoot 通过 JMS 集成 IBM MQ

内容分享1小时前发布
0 0 0

前言

IBM MQ 作为老牌的企业级消息中间件,虽然在当下云原生时代已不再是主流选择,但在金融、银行、保险等传统行业的核心业务系统中仍然占据着重要地位。这些行业对系统的稳定性、可靠性和事务一致性有着极高的要求,IBM MQ 正是在这些方面表现出色。

尽管近年来 RabbitMQ、Apache Kafka 等新一代消息中间件在新兴项目中更受欢迎,但许多金融公司和大型企业仍在使用 IBM MQ 作为核心业务系统的消息传递基础设施。本文将介绍如何在 Spring Boot 项目中通过 JMS 集成 IBM MQ,帮助开发者在需要维护或对接这些传统系统时能够快速上手。

环境准备

依赖配置

首先确保项目中包含必要的依赖:


<dependencies>
    <!-- Spring Boot Web Starter -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <!-- IBM MQ Dependencies -->
    <dependency>
        <groupId>com.ibm.mq</groupId>
        <artifactId>com.ibm.mq.jakarta.client</artifactId>
        <version>9.4.4.0</version>
    </dependency>

    <!-- Spring JMS -->
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-jms</artifactId>
    </dependency>

    <!-- JMS API -->
    <dependency>
        <groupId>jakarta.jms</groupId>
        <artifactId>jakarta.jms-api</artifactId>
    </dependency>

    <!-- Lombok -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>

    <!-- Spring Boot Configuration Processor -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-configuration-processor</artifactId>
        <optional>true</optional>
    </dependency>
</dependencies>

核心依赖说明:


com.ibm.mq.jakarta.client
: IBM MQ 的官方 JMS 客户端库
spring-jms
: Spring 的 JMS 支持,提供 JmsTemplate 和 JMS 监听器功能
jakarta.jms-api
: JMS API 接口定义
spring-boot-starter-web
: 提供 Spring Boot 的基础功能支持

配置文件


application.yml
中添加 IBM MQ 相关配置:


ibm:
  mq:
    connection-name-list: 192.168.1.60(1414)  # 多节点配置示例: 192.168.1.10(1414),192.168.1.11(1414),192.168.1.12(1414)
    queue-manager: QM1
    channel: DEV.APP.SVRCONN
    queue: DEV.QUEUE.1
    username: mqm
    password: password
    ccsid: 1208
    receive-timeout: 5000

核心配置类

配置属性类

首先定义配置属性类来绑定配置文件中的参数:


@Data
@ConfigurationProperties(prefix = "ibm.mq")
public class IBMMqProperties {
    private String connectionNameList;
    private String queueManager;
    private String channel;
    private String username;
    private String password;
    private String queue;
    private int receiveTimeout = 5000;
    private int ccsid = 1208;
}

MQ 连接工厂配置

这是整个集成的核心配置类:


import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.core.JmsTemplate;
import com.ibm.mq.jakarta.jms.MQConnectionFactory;
import com.ibm.msg.client.jakarta.wmq.WMQConstants;
import jakarta.jms.ConnectionFactory;
import org.springframework.jms.connection.CachingConnectionFactory;
import org.springframework.jms.connection.UserCredentialsConnectionFactoryAdapter;

@Slf4j
@Configuration
@EnableJms
@EnableConfigurationProperties(IBMMqProperties.class)
public class MqConfiguration {

    /**
     * IBM MQ配置属性
     */
    private final IBMMqProperties ibmMqProperties;

    /**
     * 构造函数
     *
     * @param ibmMqProperties IBM MQ配置属性
     */
    public MqConfiguration(IBMMqProperties ibmMqProperties) {
        this.ibmMqProperties = ibmMqProperties;
    }

    /**
     * 创建IBM MQ连接工厂
     *
     * @return 配置好的连接工厂
     * @throws Exception 连接创建异常
     */
    @Bean
    public ConnectionFactory connectionFactory() throws Exception {
        log.info("初始化IBM MQ连接工厂: connectionNameList={}, queueManager={}, ccsid={}",
                ibmMqProperties.getConnectionNameList(),
                ibmMqProperties.getQueueManager(),
                ibmMqProperties.getCcsid());

        ConnectionFactory connectionFactory;
        MQConnectionFactory mqConnectionFactory = new MQConnectionFactory();
        mqConnectionFactory.setConnectionNameList(ibmMqProperties.getConnectionNameList());
        mqConnectionFactory.setQueueManager(ibmMqProperties.getQueueManager());
        mqConnectionFactory.setChannel(ibmMqProperties.getChannel());
        mqConnectionFactory.setTransportType(WMQConstants.WMQ_CM_CLIENT); // WMQ_CM_CLIENT
        mqConnectionFactory.setCCSID(ibmMqProperties.getCcsid());

        // 设置用户名和密码
        if (ibmMqProperties.getUsername() != null && !ibmMqProperties.getUsername().isEmpty()) {
            UserCredentialsConnectionFactoryAdapter userCredentialsFactory =
                    new UserCredentialsConnectionFactoryAdapter();
            userCredentialsFactory.setTargetConnectionFactory(mqConnectionFactory);
            userCredentialsFactory.setUsername(ibmMqProperties.getUsername());
            userCredentialsFactory.setPassword(ibmMqProperties.getPassword());
            connectionFactory = userCredentialsFactory;
        }else {
            connectionFactory = mqConnectionFactory;
        }
        // 使用CachingConnectionFactory包装原始工厂
        CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
        cachingConnectionFactory.setTargetConnectionFactory(connectionFactory);
        cachingConnectionFactory.setSessionCacheSize(10);
        return cachingConnectionFactory;
    }

    /**
     * 创建JMS模板
     *
     * @param connectionFactory 连接工厂
     * @return 配置好的JMS模板
     */
    @Bean
    public JmsTemplate jmsTemplate(ConnectionFactory connectionFactory) {
        JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory);
        jmsTemplate.setDefaultDestinationName(ibmMqProperties.getQueue());
        jmsTemplate.setReceiveTimeout(ibmMqProperties.getReceiveTimeout());
        jmsTemplate.setSessionTransacted(true);
        return jmsTemplate;
    }

    /**
     * 创建JMS监听器容器工厂
     *
     * @param connectionFactory 连接工厂
     * @return 配置好的监听器容器工厂
     */
    @Bean
    public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(ConnectionFactory connectionFactory) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setConcurrency("1-1"); // 这里需要设置合理的并发数,例如"1-1"表示单消费者模式
        factory.setSessionTransacted(true);
        factory.setReceiveTimeout((long) ibmMqProperties.getReceiveTimeout());
        factory.setAutoStartup(true); // 启用自动启动,让消费者能够接收消息
        return factory;
    }
}

特别说明:并发数配置说明


setConcurrency()
方法的参数格式为 “最小并发数-最大并发数”,例如:


"1-1"
:固定单个消费者线程
"1-5"
:最小1个,最大5个消费者线程
"5-10"
:最小5个,最大10个消费者线程

选择合适的并发数需要考虑以下因素:

业务场景:如果涉及文件操作、数据库写入等需要串行处理的场景,建议使用单消费者模式消息吞吐量:对于高吞吐量且无状态的消息处理,可以适当增加并发数资源消耗:每个消费者都会占用系统资源,需要根据服务器配置合理设置消息顺序:如果需要保证消息处理的顺序性,必须使用单消费者模式

特别说明:消息确认机制

在示例中使用了
message.acknowledge()
进行手动确认,这与连接工厂的确认模式设置密切相关:

手动确认模式(CLIENT_ACKNOWLEDGE):必须显式调用
acknowledge()
方法确认消息,如果没有确认,消息会重新投递自动确认模式(AUTO_ACKNOWLEDGE):消息处理完成后自动确认,无需手动调用
acknowledge()
事务模式(SESSION_TRANSACTED):在示例中启用了事务支持,消息确认与事务提交绑定


DefaultJmsListenerContainerFactory
中设置了
setSessionTransacted(true)
,表示使用事务模式。在这种模式下:

如果消息处理成功且事务提交,消息会自动确认如果消息处理失败或事务回滚,消息会重新投递即使在事务模式下,显式调用
acknowledge()
仍然是良好实践

关键配置解析

消息确认机制与事务配置详解

1. 消息确认的三种模式

自动确认模式(AUTO_ACKNOWLEDGE)

消息处理完成后自动确认,无需手动操作适合简单的消息处理场景

手动确认模式(CLIENT_ACKNOWLEDGE)

必须显式调用
message.acknowledge()
方法确认消息如果没有确认,消息会重新投递提供更精确的控制能力

事务模式(SESSION_TRANSACTED)

消息确认与事务提交绑定消息处理成功且事务提交时,消息自动确认消息处理失败或事务回滚时,消息重新投递

2. Spring Boot 默认配置

重要:如果没有主动设置事务和确认模式,Spring Boot 使用什么?

答案是:自动确认模式(AUTO_ACKNOWLEDGE)


// 这是 Spring Boot 的默认行为
@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    // 默认情况下:
    // - setSessionTransacted(false)  // 不启用事务
    // - setSessionAcknowledgeMode(Session.AUTO_ACKNOWLEDGE)  // 自动确认
    return factory;
}

默认行为的特点

消息处理完成后自动确认,无需手动调用
acknowledge()
如果处理过程中抛出异常,消息不会被重新投递(因为已经自动确认了)适合简单的、幂等的消息处理场景风险:消息处理失败时可能丢失消息

默认配置示例


@JmsListener(...)
public void onMessage(Message message) {
    // 不需要手动确认
    String content = ((TextMessage) message).getText();
    processMessage(content);  // 处理完成后消息自动确认

    // 即使这里抛出异常,消息也不会重新投递(因为已经确认了)
}

什么时候需要手动配置?

需要消息可靠性:处理失败时需要重新投递 → 启用事务模式需要精确控制确认时机:根据业务逻辑决定何时确认 → 手动确认模式简单的可重复操作:消息处理失败也没关系 → 使用默认的自动确认模式

3. 事务配置的两种方式

Spring 便捷方式(推荐)


factory.setSessionTransacted(true);

Spring 原生 API 方式


factory.setSessionAcknowledgeMode(Session.SESSION_TRANSACTED);

这两种方式效果完全相同,Spring 内部会将第一种转换为第二种。

重要说明:这里指的事务是 JMS 事务,不是数据库事务!

JMS 事务范围:仅控制消息的发送、接收、确认操作数据库事务范围:控制数据库的增删改查操作两者独立:JMS 事务成功不等于数据库事务成功,反之亦然

4. JMS 事务的自动回滚机制

关键特性:异常时自动回滚


@JmsListener(...)
public void onMessage(Message message) {
    try {
        String content = ((TextMessage) message).getText();

        // 1. 消息处理逻辑
        processContent(content);

        // 2. 数据库操作(需要单独的事务注解)
        saveToDatabase(content);

        // 3. 方法正常结束 -> JMS 事务自动提交 -> 消息确认

    } catch (RuntimeException e) {
        // 4. 抛出 RuntimeException -> JMS 事务自动回滚 -> 消息重新投递
        // 这个回滚是 JMS 事务的回滚,不影响数据库事务
        throw e;
    }
}

自动回滚的触发点

方法执行抛出任何 RuntimeException 或 ErrorSpring 框架检测到异常后自动调用 session.rollback()消息从队列中删除失败,等待重新投递回滚发生在方法返回之前,是 Spring 容器自动处理的

注意事项


@Transactional  // 这是数据库事务
@JmsListener(...)  // 这是 JMS 事务
public void processMessage(Message message) {
    try {
        // 数据库操作:在数据库事务中
        dataService.save(data);

        // JMS 相关操作:在 JMS 事务中
        // 消息确认由 JMS 事务控制

    } catch (DataAccessException e) {
        // 在 try-catch 内部捕获异常后,如果不重新抛出:
        // - 数据库事务不会回滚(因为异常被捕获了)
        // - JMS 事务不会回滚(因为方法正常结束)

        // 如果要让两个事务都回滚,必须重新抛出异常:
        throw new RuntimeException("处理失败", e);
    }
}

重要理解:事务回滚的条件

两个注解都在同一个方法上:方法执行结果是事务回滚的唯一判断标准方法正常结束:两个事务都提交方法抛出未处理异常:两个事务都回滚异常被捕获且未重新抛出:两个事务都不会回滚(误以为正常的完成)

5. 配置优先级规则

重要原则:当设置冲突时,事务配置优先级最高!


// 冲突配置示例
factory.setSessionTransacted(true);                           // 最高优先级
factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE); // 会被忽略

实际生效逻辑

如果
setSessionTransacted(true)
,确认模式强制为
Session.SESSION_TRANSACTED
任何手动设置的确认模式都会被忽略只有在非事务模式下,确认模式设置才会生效

6. 不同配置组合的实际效果
事务设置 确认模式设置 最终生效模式 说明

setSessionTransacted(true)
任意值
SESSION_TRANSACTED
事务模式优先

setSessionTransacted(false)

AUTO_ACKNOWLEDGE

AUTO_ACKNOWLEDGE
自动确认

setSessionTransacted(false)

CLIENT_ACKNOWLEDGE

CLIENT_ACKNOWLEDGE
手动确认
7. 实际编码建议

事务模式推荐写法


@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    factory.setSessionTransacted(true);  // 只需要设置这个
    // 不需要设置确认模式,避免混淆
    return factory;
}

@JmsListener(...)
public void onMessage(Message message) {
    try {
        String content = ((TextMessage) message).getText();
        processMessage(content);

        // 不需要调用 message.acknowledge()
        // 事务会在方法正常结束时自动提交并确认消息

    } catch (Exception e) {
        // 抛出异常触发事务回滚,消息重新投递
        throw new RuntimeException("处理失败", e);
    }
}

手动确认模式写法


@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    factory.setSessionTransacted(false);                    // 关闭事务
    factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
    return factory;
}

@JmsListener(...)
public void onMessage(Message message) {
    try {
        String content = ((TextMessage) message).getText();
        processMessage(content);

        // 必须手动确认
        message.acknowledge();

    } catch (Exception e) {
        // 不确认,消息会重新投递
        log.error("处理失败", e);
    }
}
8. 重要注意事项

避免概念混淆:不要同时设置事务和手动确认,明确选择一种模式JMS事务 ≠ 数据库事务:JMS事务只控制消息操作,数据库事务需要单独的
@Transactional
性能考虑:事务模式有额外开销,简单场景可考虑自动确认模式错误处理:事务模式下通过异常控制回滚,手动确认模式下通过确认行为控制

认证配置的重要性

在集成 IBM MQ 时,一个常见的问题是认证失败,会出现类似这样的错误:


JMSWMQ2013: 为队列管理器"QM1"提供的安全性认证无效,连接方式为"Client",主机名为"192.168.1.60(1414)"
com.ibm.mq.MQException: JMSCMQ0001: IBM MQ 调用失败,完成代码为"2"("MQCC_FAILED"),原因码为"2035"("MQRC_NOT_AUTHORIZED")

错误原因分析:

Spring JMS 在创建连接时默认调用无参的
createConnection()
方法直接在连接工厂上设置的用户名密码属性可能不会被正确传递IBM MQ 需要在连接创建时明确传递认证信息

解决方案:
使用
UserCredentialsConnectionFactoryAdapter
包装原始连接工厂。这个适配器会在 Spring JMS 调用
createConnection()
时,自动调用带用户名密码的
createConnection(username, password)
方法。


// 设置用户名和密码
if (ibmMqProperties.getUsername() != null && !ibmMqProperties.getUsername().isEmpty()) {
    UserCredentialsConnectionFactoryAdapter userCredentialsFactory =
            new UserCredentialsConnectionFactoryAdapter();
    userCredentialsFactory.setTargetConnectionFactory(mqConnectionFactory);
    userCredentialsFactory.setUsername(ibmMqProperties.getUsername());
    userCredentialsFactory.setPassword(ibmMqProperties.getPassword());
    connectionFactory = userCredentialsFactory;
}

注意事项:

确保 MQ 服务器上配置的用户名密码正确检查用户是否有访问指定队列管理器的权限确认通道(Channel)配置正确且允许连接
UserCredentialsConnectionFactoryAdapter
是解决认证问题的关键组件使用
CachingConnectionFactory
包装以提高性能

CachingConnectionFactory 的作用

性能优化:缓存连接和会话,避免频繁创建和销毁资源管理:自动管理连接池,提高资源利用效率事务支持:提供更好的事务管理

消息监听器实现


@Slf4j
@Component
public class MqMessageConsumer implements MessageListener {

    @Override
    @JmsListener(destination = "${ibm.mq.queue}", containerFactory = "jmsListenerContainerFactory")
    public void onMessage(Message message) {
        try {
            if (message instanceof TextMessage textMessage) {
                String messageContent = textMessage.getText();
                log.info("接收到消息: messageId={}, content={}",
                    textMessage.getJMSMessageID(), messageContent);

                // 获取消息时间戳
                long jmsTimestamp = textMessage.getJMSTimestamp();
                long receiveTime = System.currentTimeMillis();

                // 处理消息逻辑
                processMessage(messageContent, jmsTimestamp, receiveTime);

                // 手动确认消息
                message.acknowledge();
                log.debug("消息确认完成");
            }
        } catch (JMSException e) {
            log.error("消息处理失败", e);
            // 根据业务需要决定是否重新投递消息
        }
    }

    private void processMessage(String content, long jmsTimestamp, long receiveTime) {
        // 实现具体的消息处理逻辑
        log.info("处理消息内容: {}, 消息时间戳: {}, 接收时间: {}, 延迟: {}ms",
            content, jmsTimestamp, receiveTime, (receiveTime - jmsTimestamp));
    }
}

消息发送实现


@Slf4j
@Service
@RequiredArgsConstructor
public class MqMessageProducer {

    private final JmsTemplate jmsTemplate;
    private final IBMMqProperties ibmMqProperties;

    /**
     * 发送文本消息
     *
     * @param messageContent 消息内容
     * @return 消息ID
     */
    public String sendMessage(String messageContent) {
        log.info("开始发送消息到队列: {}, 内容长度: {}",
            ibmMqProperties.getQueue(), messageContent.length());

        try {
            jmsTemplate.send(ibmMqProperties.getQueue(), session -> {
                TextMessage message = session.createTextMessage(messageContent);
                message.setJMSTimestamp(System.currentTimeMillis());
                return message;
            });

            log.info("消息发送成功");
            return "MSG_" + System.currentTimeMillis();

        } catch (Exception e) {
            log.error("消息发送失败: {}", e.getMessage(), e);
            return null;
        }
    }

    /**
     * 异步发送消息
     *
     * @param messageContent 消息内容
     */
    @Async  // 注意:需要在配置类上使用 @EnableAsync 注解启用异步支持
    public void sendMessageAsync(String messageContent) {
        sendMessage(messageContent);
    }
}

常见问题与解决方案

1. 认证失败问题

问题表现:
MQRC_NOT_AUTHORIZED
错误

解决方案:

确保
UserCredentialsConnectionFactoryAdapter
正确配置检查用户名密码是否正确确认 MQ 用户有相应的权限

2. 连接超时问题

问题表现: 连接建立超时

解决方案:

检查网络连接和防火墙设置确认 MQ 服务器端口是否开放调整连接超时配置

3. 字符编码问题

问题表现: 中文字符乱码

解决方案:

设置正确的 CCSID(如 1208 表示 UTF-8)确保消息内容使用正确的字符编码

最佳实践

连接池配置:合理设置
sessionCacheSize
,避免资源浪费事务管理:启用事务支持,确保消息传递的可靠性错误处理:实现完善的异常处理和重试机制日志记录:记录关键操作和错误信息,便于问题排查性能监控:监控消息处理性能,及时优化配置

总结

通过本文的介绍,我们了解了如何在 Spring Boot 中集成 IBM MQ,重点是解决了认证传递这个关键问题。使用
UserCredentialsConnectionFactoryAdapter
是确保用户名密码正确传递的关键,避免了
MQRC_NOT_AUTHORIZED
错误的发生。

完整的配置包括连接工厂设置、消息监听器和消息发送器,这些组件共同构成了一个完整的 IBM MQ 集成解决方案。在实际项目中,还需要根据具体业务需求进行相应的调整和优化。


(END)

© 版权声明

相关文章

暂无评论

none
暂无评论...