消息中间件RabbitMQ(从入门到精通)

RabbitMQ概念_MQ

消息中间件RabbitMQ(从入门到精通)

消息队列

MQ全称Message Queue(消息队列),是在消息的传输过程中保存消息的容器。多用于系统之间的异步通信

同步通信相当于两个人当面对话,你一言我一语。必须及时回复

消息中间件RabbitMQ(从入门到精通)

异步通信相当于通过第三方转述对话,可能有消息的延迟,但不需要二人时刻保持联系。

消息中间件RabbitMQ(从入门到精通)

消息

两台计算机间传送的数据单位。消息可以非常简单,例如只包含文本字符串;也可以更复杂,可能包含嵌入对象。

队列

数据结构中概念。在队列中,数据先进先出,后进后出。

消息中间件RabbitMQ(从入门到精通)

实时效果反馈

1. 消息队列多用于系统间的

A 同步通信

B 异步通信

C 跨域通信

D 以上说法都不对

2. 在队列中,数据

A 先进先出,后进后出

B 先进后出

C 后进先出

D 以上说法都不对

答案

1=>B 2=>A

RabbitMQ概念_MQ的优势

消息中间件RabbitMQ(从入门到精通)

应用解耦

在电商平台中,用户下订单需要调用订单系统,此时订单系统还需要调用库存系统、支付系统、物流系统完成业务。此时会产生两个问题:

如果库存系统出现故障,会造成整个订单系统崩溃。如果需求修改,新增了一个X系统,此时必须修改订单系统的代码。

消息中间件RabbitMQ(从入门到精通)

如果在系统中引入MQ,即订单系统将消息先发送到MQ中,MQ再转发到其他系统,则会解决以下问题:

由于订单系统只发消息给MQ,不直接对接其他系统,如果库存系统出现故障,不影响整个订单。如果需求修改,新增了一个X系统,此时无需修改订单系统的代码,只需修改MQ将消息发送给X系统即可。

消息中间件RabbitMQ(从入门到精通)

异步提速

如果订单系统同步访问每个系统,则用户下单等待时长如下:

消息中间件RabbitMQ(从入门到精通)

如果引入MQ,则用户下单等待时长如下:

消息中间件RabbitMQ(从入门到精通)

削峰填谷

假设我们的系统每秒只能承载1000请求,如果请求瞬间增多到每秒5000,则会造成系统崩溃。此时引入mq即可解决该问题

消息中间件RabbitMQ(从入门到精通)

使用了MQ之后,限制消费消息的速度为1000,这样一来,高峰期产生的数据势必会被积压在MQ中,高峰就被“削”掉了,但是因为消息积压,在高峰期过后的一段时间内,消费消息的速度还是会维持在1000,直到消费完积压的消息,这就叫做“填谷”。

消息中间件RabbitMQ(从入门到精通)

实时效果反馈

1. MQ的优势是

A 应用解耦

B 异步提速

C 削峰填谷

D 以上说法都正确

答案

1=>D

RabbitMQ概念_MQ的劣势

消息中间件RabbitMQ(从入门到精通)

系统可用性降低 系统引入的外部依赖越多,系统稳定性越差。一旦MQ宕机,就会对业务造成影响。系统复杂度提高 MQ的加入大大增加了系统的复杂度,以前系统间是同步的远程调用,现在是通过MQ进行异步调用。一致性问题 A系统处理完业务,通过MQ给B、C、D三个系统发消息数据,如果B系统、C系统处理成功,D系统处理失败,则会造成数据处理的不一致。

实时效果反馈

1. 关于MQ的劣势,以下说法正确的是

A 会让系统可用性提高

B 会让系统可用性降低

C 会让系统复杂度降低

D 不会产生一致性问题

答案

1=>B

RabbitMQ概念_MQ应用场景

消息中间件RabbitMQ(从入门到精通)

抢红包、秒杀活动、抢火车票等

这些业务场景都是短时间内需要处理大量请求,如果直接连接系统处理业务,会耗费大量资源,有可能造成系统瘫痪。

消息中间件RabbitMQ(从入门到精通)

而使用MQ后,可以先让用户将请求发送到MQ中,MQ会先保存请求消息,不会占用系统资源,且MQ会进行消息排序,先请求的秒杀成功,后请求的秒杀失败。

消息中间件RabbitMQ(从入门到精通)

消息分发

如电商网站要推送促销信息,该业务耗费时间较多,但对时效性要求不高,可以使用MQ做消息分发。

消息中间件RabbitMQ(从入门到精通)

数据同步

假如我们需要将数据保存到数据库之外,还需要一段时间将数据同步到缓存(如Redis)、搜索引擎(如Elasticsearch)中。此时可以将数据库的数据作为消息发送到MQ中,并同步到缓存、搜索引擎中。

消息中间件RabbitMQ(从入门到精通)

异步处理

在电商系统中,订单完成后,需要及时的通知子系统(进销存系统发货,用户服务积分,发送短信)进行下一步操作。为了保证订单系统的高性能,应该直接返回订单结果,之后让MQ通知子系统做其他非实时的业务操作。这样能保证核心业务的高效及时。

消息中间件RabbitMQ(从入门到精通)

离线处理

在银行系统中,如果要查询近十年的历史账单,这是非常耗时的操作。如果发送同步请求,则会花费大量时间等待响应。此时使用MQ发送异步请求,等到查询出结果后获取结果即可。

消息中间件RabbitMQ(从入门到精通)

实时效果反馈

1. 以下业务场景中,可以使用MQ优化的是

A 抢红包

B 秒杀活动

C 抢火车票

D 以上场景都可以

答案

1=>D

RabbitMQ概念_AMQP

消息中间件RabbitMQ(从入门到精通)

RabbitMQ是由Erlang语言编写的基于AMQP的MQ产品。

AMQP

即Advanced Message Queuing Protocol(高级消息队列协议),是一个网络协议,专门为消息中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受不同中间件产品,不同开发语言等条件的限制。2006年AMQP规范发布,类比HTTP。

AMQP工作过程

生产者(Publisher)将消息发布到交换机(Exchange),交换机根据规则将消息分发给交换机绑定的队列(Queue),队列再将消息投递给订阅了此队列的消费者。

消息中间件RabbitMQ(从入门到精通)

实时效果反馈

1. AMQP是为设计的网络协议

A 缓存

B Web服务器

C 消息中间件

D 数据库

2. 在AMQP中,将消息分发给队列

A 虚拟机

B 交换机

C 管道

D 生产者

答案

1=>C 2=>B

RabbitMQ概念_RabbitMQ工作原理

消息中间件RabbitMQ(从入门到精通)

Producer

消息的生产者。也是一个向交换机发布消息的客户端应用程序。

Connection

连接。生产者/消费者和RabbitMQ服务器之间建立的TCP连接。

Channel

信道。是TCP里面的虚拟连接。例如:Connection相当于电缆,Channel相当于独立光纤束,一条TCP连接中可以创建多条信道,增加连接效率。无论是发布消息、接收消息、订阅队列都是通过信道完成的。

Broker

消息队列服务器实体。即RabbitMQ服务器

Virtual host

虚拟主机。出于多租户和安全因素设计的,把AMQP的基本组件划分到一个虚拟的分组中。每个vhost本质上就是一个mini版的RabbitMQ服务器,拥有自己的队列、交换机、绑定和权限机制。当多个不同的用户使用同一个RabbitMQ服务器时,可以划分出多个虚拟主机。RabbitMQ默认的虚拟主机路径是
/

Exchange

交换机。用来接收生产者发送的消息,并根据分发规则,将这些消息分发给服务器中的队列中。不同的交换机有不同的分发规则。

Queue

消息队列。用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。消息一直在队列里面,等待消费者链接到这个队列将其取走。

Binding

消息队列和交换机之间的虚拟连接,绑定中包含路由规则,绑定信息保存到交换机的路由表中,作为消息的分发依据。

Consumer

消息的消费者。表示一个从消息队列中取得消息的客户端应用程序。

RabbitMQ为什么使用信道而不直接使用TCP连接通信?

TCP连接的创建和销毁开销特别大。创建需要3次握手,销毁需要4次分手。高峰时每秒成千上万条TCP连接的创建会造成资源巨大的浪费。而且操作系统每秒处理TCP连接数也是有限制的,会造成性能瓶颈。而如果一条线程使用一条信道,一条TCP链接可以容纳无限的信道,即使每秒成千上万的请求也不会成为性能的瓶颈。

实时效果反馈

1. 在RabbitMQ中,是消息的生产者

A Publisher

B Consumer

C Connection

D Channel

2. 在RabbitMQ中,发布消息、接收消息是通过完成的

A Broker

B Connection

C Channel

D Exchange

答案

1=>A 2=>C

RabbitMQ安装_安装Erlang

RabbitMQ是使用Erlang语言编写的,所以在安装RabbitMQ前需要先安装Erlang环境

安装Erlang所需的依赖

yum install
-y epel-release

添加存储库条目

wget https://packages.erlang-solutions.com/erlang-solutions-1.0-1.noarch.rpm




rpm
-Uvh erlang-solutions-1.0-1.noarch.rpm

安装Erlang

yum install
-y erlang

查看Erlang是否安装成功

erl
-version

实时效果反馈

1. RabbitMQ是语言编写的

A Java

B C++

C Erlang

D JavaScript

答案

1=>C

RabbitMQ安装_安装RabbitMQ

消息中间件RabbitMQ(从入门到精通)

为了外部能够正常访问RabbitMQ服务,先关闭防火墙


# 关闭运行的防火墙
systemctl stop firewalld.service
# 禁止防火墙自启动
systemctl disable firewalld.service

RabbitMQ是通过主机名进行访问的,必须给服务器添加主机名

# 修改文件
vim /etc/sysconfig/network
# 添加如下内容
NETWORKING=yes
HOSTNAME=itbaizhan

# 修改文件
vim /etc/hosts
# 添加如下内容
服务器ip itbaizhan

使用rz命令上传RabbitMQ压缩文件

安装RabbitMQ


# 解压RabbitMQ
tar xf rabbitmq-server-generic-unix-3.9.13.tar.xz

# 重命名:
mv rabbitmq_server-3.9.13 rabbitmq

# 移动文件夹:
mv rabbitmq /usr/local/

配置环境变量

# 编辑/etc/profile文件
vim /etc/profile

#添加如下内容
export PATH=$PATH:/usr/local/rabbitmq/sbin

# 运行文件,让修改内容生效
source /etc/profile

开启管控台插件

rabbitmq-plugins enable rabbitmq_management

后台运行


#启动rabbitmq
rabbitmq-server -detached

#停止rabbitmq
rabbitmqctl stop


通过管控台访问RabbitMQ(15672是rabbitmq的管控台的port,5672才是他的port)

路径:
http://ip地址:15672
,用户名:
guest
,密码:
guest

此时会提示guest账户只允许本地使用,我们可以配置允许使用guest远程访问


# 创建配置文件夹
mkdir -p /usr/local/rabbitmq/etc/rabbitmq
# 创建配置文件
vim /usr/local/rabbitmq/etc/rabbitmq/rabbitmq.conf
# 添加如下内容
loopback_users=none

# 重启RabbitMQ
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl start_app


RabbitMQ安装_账户管理

消息中间件RabbitMQ(从入门到精通)

guest账户默认只允许本地使用,我们可以创建新账户远程访问RabbitMQ

创建账户

# 创建账户

rabbitmqctl add_user 用户名 密码

给用户授予管理员角色

rabbitmqctl set_user_tags 用户名 administrator

给用户授权

# “/”表示虚拟机

# itbaizhan表示用户名

# “.*” “.*” “.*” 表示完整权限

rabbitmqctl set_permissions -p “/” itbaizhan “.*” “.*” “.*”

通过管控台访问rabbitmq

实时效果反馈

1. RabbitMQ新增用户的命令为


rabbitmqctl user


rabbitmqctl add


rabbitmqctl add_user


rabbitmqctl add_group

2. RabbitMQ默认用户名是

A admin

B rabbit

C root

D guest

答案

1=>C 2=>D

RabbitMQ安装_管控台

消息中间件RabbitMQ(从入门到精通)

消息中间件RabbitMQ(从入门到精通)

实时效果反馈

1. RabbitMQ管控台默认端口是

A 15672

B 5672

C 3306

D 5601

答案

1=>A

RabbitMQ安装_Docker安装

消息中间件RabbitMQ(从入门到精通)

关闭RabbitMQ服务

rabbitmqctl
stop

在Centos7中安装docker

# 安装Docker

curl
-fsSL https://get.docker.com |
bash
-s docker
–mirror Aliyun

# 启动docker

systemctl
start docker

拉取镜像

docker pull rabbitmq

启动RabbitMQ容器

docker run
-d
–hostname itbaizhan
–name rabbit
-p
15672:15672
-p
5672:5672 rabbitmq

开启管控台插件

# 查询rabbitmq容器ID

docker ps 

# 进入容器

docker exec -it 容器ID /bin/bash 

# 开启管控台插件

rabbitmq-plugins enable rabbitmq_management

# 退出容器

ctrl+p+q

通过管控台访问rabbitmq

路径:
http://ip地址:15672
,用户名:
guest
,密码:
guest

关闭RabbitMQ容器

docker
stop rabbit

实时效果反馈

1. RabbitMQ默认端口是

A 15672

B 5672

C 3306

D 5601

答案

1=>B

RabbitMQ简单模式_概念

消息中间件RabbitMQ(从入门到精通)

RabbitMQ共有六种工作模式:简单模式(Simple)、工作队列模式(Work Queue)、发布订阅模式(Publish/Subscribe)、路由模式(Routing)、通配符模式(Topics)、远程调用模式(RPC,不常用,课程不对此模式进行讲解)

首先我们讲解简单模式:

消息中间件RabbitMQ(从入门到精通)

特点:

一个生产者对应一个消费者,通过队列进行消息传递。该模式使用direct交换机,direct交换机是RabbitMQ默认交换机。

实时效果反馈

1. RabbitMQ简单模式使用的交换机为


direct


fanout


routing


topic

2. RabbitMQ简单模式的特点为

A 一个生产者对应一个消费者

B 一个生产者对应多个消费者

C 多个生产者对应一个消费者

D 多个生产者对应多个消费者

答案

1=>A 2=>A

RabbitMQ简单模式_项目搭建

消息中间件RabbitMQ(从入门到精通)

接下来我们使用JAVA代码操作RabbitMQ,让其按照简单模式进行工作。

JMS

由于MQ产品很多,操作方式各有不同,于是JAVA提供了一套规则——JMS,用于操作消息中间件。JMS即Java消息服务(JavaMessage Service)应用程序接口,是一个Java平台中关于面向消息中间件的API。JMS是JavaEE规范中的一种,类比JDBC。很多MQ产品都实现了JMS规范,例如ActiveMQ。RabbitMQ官方并没有实现JMS规范,但是开源社区有JMS的实现包。

创建项目

启动RabbitMQ



# 开启管控台插件
rabbitmq-plugins enable rabbitmq_management
# 启动rabbitmq
rabbitmq-server -detached

创建普通maven项目,添加RabbitMQ依赖:



<dependencies>
  <dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.14.0</version>
  </dependency>
</dependencies>

实时效果反馈

1. JMS是JAVA为设计的规范

A 数据库

B 缓存

C 搜索引擎

D 消息中间件

2. 关于RabbitMQ,说法正确的是

A RabbitMQ官方并没有实现JMS规范

B RabbitMQ官方实现了JMS规范

C RabbitMQ开源社区没有JMS的实现包

D 以上说法都不对

答案

1=>D 2=>A

RabbitMQ简单模式_编写生产者

消息中间件RabbitMQ(从入门到精通)

接下来我们编写生产者代码创建消息:



// 生产者
public class Producer {
  public static void main(String[] args) throws IOException, TimeoutException {
    // 1.创建连接工厂
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("192.168.0.162");
    connectionFactory.setPort(5672);
    connectionFactory.setUsername("itbaizhan");
    connectionFactory.setPassword("itbaizhan");
    connectionFactory.setVirtualHost("/");
    // 2.创建连接
    Connection connection = connectionFactory.newConnection();
    // 3.建立信道
    Channel channel = connection.createChannel();
    // 4.创建队列,如果队列已存在,则使用该队列
    /**
     * 参数1:队列名
     * 参数2:是否持久化,true表示MQ重启后队列还在。
     * 参数3:是否私有化,false表示所有消费者都可以访问,true表示只有第一次拥有它的消费者才能访问
     * 参数4:是否自动删除,true表示不再使用队列时自动删除队列
     * 参数5:其他额外参数
     */
    channel.queueDeclare("simple_queue",false,false,false,null);
    // 5.发送消息
    String message = "hello!rabbitmq!";
    /**
     * 参数1:交换机名,""表示默认交换机
     * 参数2:路由键,简单模式就是队列名
     * 参数3:其他额外参数
     * 参数4:要传递的消息字节数组
     */
    channel.basicPublish("","simple_queue",null,message.getBytes());
    // 6.关闭信道和连接
    channel.close();
    connection.close();
    System.out.println("===发送成功===");
   }
}

运行生产者后,我们可以看到在RabbitMQ中创建了队列,队列中已经有了消息。

消息中间件RabbitMQ(从入门到精通)

实时效果反馈

1. JAVA操作RabbitMQ时,调用的方法发送消息


ConnectionFactory


Connection


Channel


Queue

答案

1=>C

RabbitMQ简单模式_编写消费者

消息中间件RabbitMQ(从入门到精通)

接下来我们编写消费者代码消费消息:



// 消费者
public class Consumer {
  public static void main(String[] args) throws IOException, TimeoutException {
    // 1.创建连接工厂
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("192.168.0.162");
    connectionFactory.setPort(5672);
    connectionFactory.setUsername("itbaizhan");
    connectionFactory.setPassword("itbaizhan");
    connectionFactory.setVirtualHost("/");
    // 2.创建连接
    Connection connection = connectionFactory.newConnection();
    // 3.建立信道
    Channel channel = connection.createChannel();
    // 4.监听队列
    /**
     * 参数1:监听的队列名
     * 参数2:是否自动签收,如果设置为false,则需要手动确认消息已收到,否则MQ会一直发送消息
     * 参数3:Consumer的实现类,重写该类方法表示接受到消息后如何消费
     */
    channel.basicConsume("simple_queue",true,new DefaultConsumer(channel){
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "UTF-8");
        System.out.println("接受消息,消息为:"+message);
       }
     });
   }
}

运行消费者后,我们可以看到在RabbitMQ中的消息已经被消费。

消息中间件RabbitMQ(从入门到精通)

实时效果反馈

1. JAVA操作RabbitMQ时,调用的方法监听队列


ConnectionFactory


Connection


Channel


Queue

答案

1=>C

RabbitMQ工作队列模式_概念

消息中间件RabbitMQ(从入门到精通)

与简单模式相比,工作队列模式(Work Queue)多了一些消费者,该模式也使用direct交换机,应用于处理消息较多的情况。特点如下:

一个队列对应多个消费者。一条消息只会被一个消费者消费。消息队列默认采用轮询的方式将消息平均发送给消费者。

实时效果反馈

1. RabbitMQ工作队列模式使用的交换机为


direct


fanout


routing


topic

2. RabbitMQ工作队列模式的特点为

A 一个生产者对应一个消费者

B 一个生产者对应多个消费者

C 多个生产者对应一个消费者

D 多个生产者对应多个消费者

答案

1=>A 2=>B

 

RabbitMQ工作队列模式_编写生产者

消息中间件RabbitMQ(从入门到精通)

接下来我们编写生产者代码创建大量消息:



public class Producer {
  public static void main(String[] args) throws IOException, TimeoutException {
    // 1.创建连接工厂
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("192.168.0.162");
    connectionFactory.setPort(5672);
    connectionFactory.setUsername("itbaizhan");
    connectionFactory.setPassword("itbaizhan");
    connectionFactory.setVirtualHost("/");
    // 2.创建连接
    Connection connection = connectionFactory.newConnection();
    // 3.建立信道
    Channel channel = connection.createChannel();
    // 4.创建队列,持久化队列
    channel.queueDeclare("work_queue",true,false,false,null);
    // 5.发送大量消息,参数3表示该消息为持久化消息,即除了保存到内存还会保存到磁盘中
    for (int i = 1; i <= 100; i++) {
      channel.basicPublish("","work_queue", MessageProperties.PERSISTENT_TEXT_PLAIN,
           ("你好,这是今天的第"+i+"条消息").getBytes());
     }
    // 6.关闭资源
    channel.close();
    connection.close();
   }
}

效果如下:

消息中间件RabbitMQ(从入门到精通)

实时效果反馈

1. 原生JAVA操作RabbitMQ发送持久化消息时,添加的参数为


MessageProperties.PERSISTENT


MessageProperties.PERSISTENT_TEXT_PLAIN


MessageProperties.PERSISTENT_TEXT


MessageProperties.PERSISTENT_PLAIN

答案

1=>B

RabbitMQ工作队列模式_编写消费者

消息中间件RabbitMQ(从入门到精通)

接下来我们编写三个消费者监听同一个队列:



// 消费者1
public class Consumer1 {
  public static void main(String[] args) throws IOException, TimeoutException {
    // 1.创建连接工厂
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("192.168.0.162");
    connectionFactory.setPort(5672);
    connectionFactory.setUsername("itbaizhan");
    connectionFactory.setPassword("itbaizhan");
    connectionFactory.setVirtualHost("/");
    // 2.创建连接
    Connection connection = connectionFactory.newConnection();
    // 3.建立信道
    Channel channel = connection.createChannel();
    // 4.监听队列,处理消息
    channel.basicConsume("work_queue",true,new DefaultConsumer(channel){
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "UTF-8");
        System.out.println("消费者1消费消息,消息为:"+message);
       }
     });
   }
}
 
 
// 消费者2
public class Consumer2 {
  public static void main(String[] args) throws IOException, TimeoutException {
    // 1.创建连接工厂
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("192.168.0.162");
    connectionFactory.setPort(5672);
    connectionFactory.setUsername("itbaizhan");
    connectionFactory.setPassword("itbaizhan");
    connectionFactory.setVirtualHost("/");
    // 2.创建连接
    Connection connection = connectionFactory.newConnection();
    // 3.建立信道
    Channel channel = connection.createChannel();
    // 4.监听队列,处理消息
    channel.basicConsume("work_queue",true,new DefaultConsumer(channel){
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "UTF-8");
        System.out.println("消费者2消费消息,消息为:"+message);
       }
     });
   }
}
 
 
// 消费者3
public class Consumer3 {
  public static void main(String[] args) throws IOException, TimeoutException {
    // 1.创建连接工厂
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("192.168.0.162");
    connectionFactory.setPort(5672);
    connectionFactory.setUsername("itbaizhan");
    connectionFactory.setPassword("itbaizhan");
    connectionFactory.setVirtualHost("/");
    // 2.创建连接
    Connection connection = connectionFactory.newConnection();
    // 3.建立信道
    Channel channel = connection.createChannel();
    // 4.监听队列,处理消息
    channel.basicConsume("work_queue",true,new DefaultConsumer(channel){
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "UTF-8");
        System.out.println("消费者3消费消息,消息为:"+message);
       }
     });
   }
}

效果如下:

消息中间件RabbitMQ(从入门到精通)

实时效果反馈

1. 关于RabbitMQ的工作队列模式,以下说法正确的是

A 一条消息只会被一个消费者消费。

B 消息队列默认将消息平均发送给消费者。

C 工作队列模式应用于处理消息较多的情况。

D 以上说法都正确。

答案

1=>D

RabbitMQ发布订阅模式_概念

消息中间件RabbitMQ(从入门到精通)

在开发过程中,有一些消息需要不同消费者进行不同的处理,如电商网站的同一条促销信息需要短信发送、邮件发送、站内信发送等。此时可以使用发布订阅模式(Publish/Subscribe)

特点:

生产者将消息发送给交换机,交换机将消息转发到绑定此交换机的每个队列中。工作队列模式的交换机只能将消息发送给一个队列,发布订阅模式的交换机能将消息发送给多个队列。发布订阅模式使用fanout交换机。

实时效果反馈

1. RabbitMQ发布订阅模式的特点为

A 能将消息发送给一条队列

B 能将消息发送给多条队列

C 能按照路由键将消息发送给指定队列

D 能按照通配符规则将消息发送给指定队列

答案

1=>B

RabbitMQ发布订阅模式_编写生产者

消息中间件RabbitMQ(从入门到精通)

接下来我们编写发布订阅模式的生产者:



// 生产者
public class Producer {
  public static void main(String[] args) throws IOException, TimeoutException {
    // 1.创建连接工厂
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("192.168.0.162");
    connectionFactory.setPort(5672);
    connectionFactory.setUsername("itbaizhan");
    connectionFactory.setPassword("itbaizhan");
    connectionFactory.setVirtualHost("/");
    // 2.创建连接
    Connection connection = connectionFactory.newConnection();
    // 3.建立信道
    Channel channel = connection.createChannel();
    // 4.创建交换机
    /**
     * 参数1:交换机名
     * 参数2:交换机类型
     * 参数3:交换机持久化
     */
    channel.exchangeDeclare("exchange_fanout", BuiltinExchangeType.FANOUT,true);
    // 5.创建队列
    channel.queueDeclare("SEND_MAIL",true,false,false,null);
    channel.queueDeclare("SEND_MESSAGE",true,false,false,null);
    channel.queueDeclare("SEND_STATION",true,false,false,null);
    // 6.交换机绑定队列
    /**
     * 参数1:队列名
     * 参数2:交换机名
     * 参数3:路由关键字,发布订阅模式写""即可
     */
    channel.queueBind("SEND_MAIL","exchange_fanout","");
    channel.queueBind("SEND_MESSAGE","exchange_fanout","");
    channel.queueBind("SEND_STATION","exchange_fanout","");
    // 7.发送消息
    for (int i = 1; i <= 10 ; i++) {
      channel.basicPublish("exchange_fanout","",null,
           ("你好,尊敬的用户,秒杀商品开抢了!"+i).getBytes(StandardCharsets.UTF_8));
     }
    // 8.关闭资源
    channel.close();
    connection.close();
   }
}

效果如下:

消息中间件RabbitMQ(从入门到精通)

实时效果反馈

1. RabbitMQ发布订阅模式使用的交换机为


direct


fanout


routing


topic

答案

1=>B

RabbitMQ发布订阅模式_编写消费者

接下来编写三个消费者,分别监听各自的队列。



// 站内信消费者
public class CustomerStation {
  public static void main(String[] args) throws IOException, TimeoutException {
    // 1.创建连接工厂
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("192.168.0.162");
    connectionFactory.setPort(5672);
    connectionFactory.setUsername("itbaizhan");
    connectionFactory.setPassword("itbaizhan");
    connectionFactory.setVirtualHost("/");// 默认虚拟机
    //2.创建连接
    Connection conn = connectionFactory.newConnection();
    //3.建立信道
    Channel channel = conn.createChannel();
    // 4.监听队列
    channel.basicConsume("SEND_STATION", true, new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "utf-8");
        System.out.println("发送站内信:"+message);
       }
     });
   }
}
 
 
// 邮件消费者
public class Customer_Mail {
  public static void main(String[] args) throws IOException, TimeoutException {
    // 1.创建连接工厂
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("192.168.0.162");
    connectionFactory.setPort(5672);
    connectionFactory.setUsername("itbaizhan");
    connectionFactory.setPassword("itbaizhan");
    connectionFactory.setVirtualHost("/");// 默认虚拟机
    //2.创建连接
    Connection conn = connectionFactory.newConnection();
    //3.建立信道
    Channel channel = conn.createChannel();
    // 4.监听队列
    channel.basicConsume("SEND_MAIL", true, new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "utf-8");
        System.out.println("发送邮件:"+message);
       }
     });
   }
}
 
 
// 短信消费者
public class Customer_Message {
  public static void main(String[] args) throws IOException, TimeoutException {
    // 1.创建连接工厂
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("192.168.0.162");
    connectionFactory.setPort(5672);
    connectionFactory.setUsername("itbaizhan");
    connectionFactory.setPassword("itbaizhan");
    connectionFactory.setVirtualHost("/");// 默认虚拟机
    //2.创建连接
    Connection conn = connectionFactory.newConnection();
    //3.建立信道
    Channel channel = conn.createChannel();
    // 4.监听队列
    channel.basicConsume("SEND_MESSAGE", true, new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "utf-8");
        System.out.println("发送短信:"+message);
       }
     });
   }
}

也可以使用工作队列+发布订阅模式同时使用,两个消费者同时监听一个队列:



// 短信消费者2
public class Customer_Message2 {
  public static void main(String[] args) throws IOException, TimeoutException {
    // 1.创建连接工厂
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("192.168.0.162");
    connectionFactory.setPort(5672);
    connectionFactory.setUsername("itbaizhan");
    connectionFactory.setPassword("itbaizhan");
    connectionFactory.setVirtualHost("/");// 默认虚拟机
    //2.创建连接
    Connection conn = connectionFactory.newConnection();
    //3.建立信道
    Channel channel = conn.createChannel();
    // 4.监听队列
    channel.basicConsume("SEND_MESSAGE", true, new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "utf-8");
        System.out.println("发送短信2:"+message);
       }
     });
   }
}

效果如下:

消息中间件RabbitMQ(从入门到精通)

RabbitMQ路由模式_概念

消息中间件RabbitMQ(从入门到精通)

使用发布订阅模式时,所有消息都会发送到绑定的队列中,但很多时候,不是所有消息都无差别的发布到所有队列中。比如电商网站的促销活动,双十一大促可能会发布到所有队列;而一些小的促销活动为了节约成本,只发布到站内信队列。此时需要使用路由模式(Routing)完成这一需求。

特点:

每个队列绑定路由关键字RoutingKey生产者将带有RoutingKey的消息发送给交换机,交换机根据RoutingKey转发到指定队列。路由模式使用direct交换机(但是最好自己创建一个direct交换机 不用默认的)。

实时效果反馈

1. RabbitMQ路由模式的特点为

A 能将消息发送给一条队列

B 能将消息发送给多条队列

C 能按照路由键将消息发送给指定队列

D 能按照通配符规则将消息发送给指定队列

答案

1=>C

RabbitMQ路由模式_编写生产者

消息中间件RabbitMQ(从入门到精通)

接下来我们编写路由模式的生产者:



// 生产者
public class Producer {
  public static void main(String[] args) throws IOException, TimeoutException {
    // 1.创建连接工厂
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("192.168.0.162");
    connectionFactory.setPort(5672);
    connectionFactory.setUsername("itbaizhan");
    connectionFactory.setPassword("itbaizhan");
    connectionFactory.setVirtualHost("/");
    // 2.创建连接
    Connection connection = connectionFactory.newConnection();
    // 3.建立信道
    Channel channel = connection.createChannel();
    // 4.创建交换机
    channel.exchangeDeclare("exchange_routing", BuiltinExchangeType.DIRECT,true);
    // 5.创建队列
    channel.queueDeclare("SEND_MAIL2",true,false,false,null);
    channel.queueDeclare("SEND_MESSAGE2",true,false,false,null);
    channel.queueDeclare("SEND_STATION2",true,false,false,null);
    // 6.交换机绑定队列
    channel.queueBind("SEND_MAIL2","exchange_routing","import");
    channel.queueBind("SEND_MESSAGE2","exchange_routing","import");
    channel.queueBind("SEND_STATION2","exchange_routing","import");
    channel.queueBind("SEND_STATION2","exchange_routing","normal");
    // 7.发送消息
    channel.basicPublish("exchange_routing","import",null,
        "双十一大促活动".getBytes());
    channel.basicPublish("exchange_routing","normal",null,
        "小心促销活动".getBytes());
    // 8.关闭资源
    channel.close();
    connection.close();
   }
}

实时效果反馈

1. RabbitMQ路由模式使用的交换机为


direct


fanout


routing


topic

2. 关于RabbitMQ路由模式,说法正确的是

A 队列需要绑定路由关键字

B 一个队列可以绑定多个路由关键字

C 交换机根据路由关键字将消息转发到指定队列

D 以上说法都正确

答案

1=>A 2=>D

RabbitMQ路由模式_编写消费者

接下来我们编写路由模式的消费者:



// 站内信消费者
public class Customer_Station {
  public static void main(String[] args) throws IOException, TimeoutException {
    // 1.创建连接工厂
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("192.168.0.162");
    connectionFactory.setPort(5672);
    connectionFactory.setUsername("itbaizhan");
    connectionFactory.setPassword("itbaizhan");
    connectionFactory.setVirtualHost("/");// 默认虚拟机
    //2.创建连接
    Connection conn = connectionFactory.newConnection();
    //3.建立信道
    Channel channel = conn.createChannel();
    // 4.监听队列
    channel.basicConsume("SEND_STATION2", true, new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "utf-8");
        System.out.println("发送站内信:"+message);
       }
     });
   }
}
 
 
// 邮件消费者
public class Customer_Mail {
  public static void main(String[] args) throws IOException, TimeoutException {
    // 1.创建连接工厂
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("192.168.0.162");
    connectionFactory.setPort(5672);
    connectionFactory.setUsername("itbaizhan");
    connectionFactory.setPassword("itbaizhan");
    connectionFactory.setVirtualHost("/");
    //2.创建连接
    Connection conn = connectionFactory.newConnection();
    //3.建立信道
    Channel channel = conn.createChannel();
    // 4.监听队列
    channel.basicConsume("SEND_MAIL2", true, new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "utf-8");
        System.out.println("发送邮件:"+message);
       }
     });
   }
}
 
 
// 短信消费者
public class Customer_Message {
  public static void main(String[] args) throws IOException, TimeoutException {
    // 1.创建连接工厂
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("192.168.0.162");
    connectionFactory.setPort(5672);
    connectionFactory.setUsername("itbaizhan");
    connectionFactory.setPassword("itbaizhan");
    connectionFactory.setVirtualHost("/");// 默认虚拟机
    //2.创建连接
    Connection conn = connectionFactory.newConnection();
    //3.建立信道
    Channel channel = conn.createChannel();
    // 4.监听队列
    channel.basicConsume("SEND_MESSAGE2", true, new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "utf-8");
        System.out.println("发送短信:"+message);
       }
     });
   }
}

运行生产者和消费者,效果如下:

消息中间件RabbitMQ(从入门到精通)

RabbitMQ通配符模式_概念

消息中间件RabbitMQ(从入门到精通)

通配符模式(Topic)是在路由模式的基础上,给队列绑定带通配符的路由关键字,只要消息的RoutingKey能实现通配符匹配,就会将消息转发到该队列。通配符模式比路由模式更灵活,使用topic交换机。

通配符规则:

消息设置RoutingKey时,RoutingKey由多个单词构成,中间以
.
分割。队列设置RoutingKey时,
#
可以匹配任意多个单词,
*
可以匹配任意一个单词。

实时效果反馈

1. RabbitMQ通配符模式的特点为

A 能将消息发送给一条队列

B 能将消息发送给多条队列

C 能按照路由键将消息发送给指定队列

D 能按照通配符规则将消息发送给指定队列

2. 关于RabbitMQ通配符模式的匹配规则,说法正确的是


#
可以匹配任意多个单词,
*
可以匹配任意一个单词


#
可以匹配任意一个单词,
*
可以匹配任意多个单词


#

*
都可以匹配任意多个单词


#

*
都可以匹配任意一个单词

答案

1=>D 2=>A

RabbitMQ通配符模式_编写生产者

消息中间件RabbitMQ(从入门到精通)

接下来我们编写通配符模式的生产者:



// 生产者
public class Producer {
  public static void main(String[] args) throws IOException, TimeoutException {
    // 1.创建连接工厂
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("192.168.0.162");
    connectionFactory.setPort(5672);
    connectionFactory.setUsername("itbaizhan");
    connectionFactory.setPassword("itbaizhan");
    connectionFactory.setVirtualHost("/");
    // 2.创建连接
    Connection connection = connectionFactory.newConnection();
    // 3.建立信道
    Channel channel = connection.createChannel();
    // 4.创建交换机
    channel.exchangeDeclare("exchange_topic", BuiltinExchangeType.TOPIC,true);
    // 5.创建队列
    channel.queueDeclare("SEND_MAIL3",true,false,false,null);
    channel.queueDeclare("SEND_MESSAGE3",true,false,false,null);
    channel.queueDeclare("SEND_STATION3",true,false,false,null);
    // 6.交换机绑定队列
    channel.queueBind("SEND_MAIL3","exchange_topic","#.mail.#");
    channel.queueBind("SEND_MESSAGE3","exchange_topic","#.message.#");
    channel.queueBind("SEND_STATION3","exchange_topic","#.station.#");
    // 7.发送消息
    channel.basicPublish("exchange_topic","mail.message.station",null,
        "双十一大促活动".getBytes());
    channel.basicPublish("exchange_topic","station",null,
        "小型促销活动".getBytes());
    // 8.关闭资源
    channel.close();
    connection.close();
   }
}

实时效果反馈

1. RabbitMQ通配符模式发送消息时,消息的的特点为

A 由一个单词构成

B 由多个单词构成,中间以
.
分割

C 由多个单词构成,中间以
_
分割

D 由多个单词构成,中间以
,
分割

2. RabbitMQ通配符模式使用的交换机为


direct


fanout


routing


topic

答案

1=>B 2=>D

RabbitMQ通配符模式_编写消费者

接下来我们编写通配符模式的消费者:



// 站内信消费者
public class Customer_Station {
  public static void main(String[] args) throws IOException, TimeoutException {
    // 1.创建连接工厂
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("192.168.0.162");
    connectionFactory.setPort(5672);
    connectionFactory.setUsername("itbaizhan");
    connectionFactory.setPassword("itbaizhan");
    connectionFactory.setVirtualHost("/");// 默认虚拟机
 
 
    //2.创建连接
    Connection conn = connectionFactory.newConnection();
  
    //3.建立信道
    Channel channel = conn.createChannel();
  
    // 4.监听队列
    channel.basicConsume("SEND_STATION3", true, new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "utf-8");
        System.out.println("发送站内信:"+message);
       }
     });
   }
}
 
 
// 邮件消费者
public class Customer_Mail {
  public static void main(String[] args) throws IOException, TimeoutException {
    // 1.创建连接工厂
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("192.168.0.162");
    connectionFactory.setPort(5672);
    connectionFactory.setUsername("itbaizhan");
    connectionFactory.setPassword("itbaizhan");
    connectionFactory.setVirtualHost("/");// 默认虚拟机
 
 
    //2.创建连接
    Connection conn = connectionFactory.newConnection();
  
    //3.建立信道
    Channel channel = conn.createChannel();
  
    // 4.监听队列
    channel.basicConsume("SEND_MAIL3", true, new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "utf-8");
        System.out.println("发送邮件:"+message);
       }
     });
   }
 
 
}
 
 
// 短信消费者
public class Customer_Message {
  public static void main(String[] args) throws IOException, TimeoutException {
    // 1.创建连接工厂
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("192.168.0.162");
    connectionFactory.setPort(5672);
    connectionFactory.setUsername("itbaizhan");
    connectionFactory.setPassword("itbaizhan");
    connectionFactory.setVirtualHost("/");// 默认虚拟机
 
 
    //2.创建连接
    Connection conn = connectionFactory.newConnection();
  
    //3.建立信道
    Channel channel = conn.createChannel();
  
    // 4.监听队列
    channel.basicConsume("SEND_MESSAGE3", true, new DefaultConsumer(channel) {
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "utf-8");
        System.out.println("发送短信:"+message);
       }
     });
   }
}

SpringBoot整合RabbitMQ_项目搭建

消息中间件RabbitMQ(从入门到精通)

之前我们使用原生JAVA操作RabbitMQ较为繁琐,接下来我们使用SpringBoot整合RabbitMQ,简化代码编写。

创建SpringBoot项目,引入RabbitMQ起步依赖



<!-- RabbitMQ起步依赖 -->
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

编写配置文件



spring:
  rabbitmq:
   host: 192.168.0.162
   port: 5672
   username: itbaizhan
   password: itbaizhan
   virtual-host: /
 
 
#日志格式
logging:
  pattern:
   console: '%d{HH:mm:ss.SSS} %clr(%-5level) ---  [%-15thread] %cyan(%-50logger{50}):%msg%n'

实时效果反馈

1. RabbitMQ默认虚拟机路径为


/default


/rabbit


/guest


/

答案

1=>D

pringBoot整合RabbitMQ_创建对列和交换机

消息中间件RabbitMQ(从入门到精通)

SpringBoot整合RabbitMQ时,需要在配置类创建队列和交换机,写法如下:



@Configuration
public class RabbitConfig {
  private final String EXCHANGE_NAME = "boot_topic_exchange";
  private final String QUEUE_NAME = "boot_queue";
 
 
  // 创建交换机
  @Bean("bootExchange")
  public Exchange getExchange() {
    return ExchangeBuilder
         .topicExchange(EXCHANGE_NAME) // 交换机类型
         .durable(true) // 是否持久化
         .build();
   }
 
 
  // 创建队列
  @Bean("bootQueue")
  public Queue getMessageQueue() {
    return new Queue(QUEUE_NAME); // 队列名
   }
 
 
  // 交换机绑定队列
  @Bean
  public Binding bindMessageQueue(@Qualifier("bootExchange") Exchange exchange, @Qualifier("bootQueue") Queue queue) {
    return BindingBuilder
         .bind(queue)
         .to(exchange)
         .with("#.message.#")
         .noargs();
   }
}

实时效果反馈

1. SpringBoot整合RabbitMQ时,在创建交换机和队列

A 配置文件

B 配置类

C 测试类

D 实体类

答案

1=>B

SpringBoot整合RabbitMQ_编写生产者

消息中间件RabbitMQ(从入门到精通)

SpringBoot整合RabbitMQ时,提供了工具类RabbitTemplate发送消息,编写生产者时只需要注入RabbitTemplate即可发送消息。



@SpringBootTest
public class TestProducer {
  // 注入RabbitTemplate工具类
  @Autowired
  private RabbitTemplate rabbitTemplate;
 
 
  @Test
  public void testSendMessage(){
    /**
     * 发送消息
     * 参数1:交换机
     * 参数2:路由key
     * 参数3:要发送的消息
     */
    rabbitTemplate.convertAndSend("boot_topic_exchange","message","双十一开始了!");
   }
}

运行生产者代码,即可看到消息发送到了RabbitMQ中:

消息中间件RabbitMQ(从入门到精通)

实时效果反馈

1. SpringBoot整合RabbitMQ时,对象可以发送消息


RabbitRestTemplate


RabbitMqTemplate


RabbitTemplate


Template

答案

1=>C

SpringBoot整合RabbitMQ_编写消费者

消息中间件RabbitMQ(从入门到精通)

我们编写另一个SpringBoot项目作为RabbitMQ的消费者

创建SpringBoot项目,引入RabbitMQ起步依赖



<!-- rabbitmq起步依赖 -->
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

编写配置文件



spring:
  rabbitmq:
   host: 192.168.0.162
   port: 5672
   username: itbaizhan
   password: itbaizhan
   virtual-host: /
 
 
#日志格式
logging:
  pattern:
   console: '%d{HH:mm:ss.SSS} %clr(%-5level) ---  [%-15thread] %cyan(%-50logger{50}):%msg%n'

编写消费者,监听队列



@Component
public class Consumer {
  // 监听队列
  @RabbitListener(queues = "boot_queue")
  public void listen_message(String message){
    System.out.println("发送短信:"+message);
   }
}

启动项目,可以看到消费者会消费队列中的消息

实时效果反馈

1. SpringBoot整合RabbitMQ时,监听队列的注解为


@RabbitMqListener


@QueueListener


@MqListener


@RabbitListener

答案

1=>D

消息的可靠性投递_概念

消息中间件RabbitMQ(从入门到精通)

RabbitMQ消息投递的路径为:


生产者
—>
交换机
—>
队列
—>
消费者

在RabbitMQ工作的过程中,每个环节消息都可能传递失败,那么RabbitMQ是如何监听消息是否成功投递的呢?

确认模式(confirm)可以监听消息是否从生产者成功传递到交换机。退回模式(return)可以监听消息是否从交换机成功传递到队列。消费者消息确认(Consumer Ack)可以监听消费者是否成功处理消息。

首先我们准备两个SpringBoot项目,分别代表生产者和消费者,配置文件如下:



spring:
  rabbitmq:
   host: 192.168.0.162
   port: 5672
   username: itbaizhan
   password: itbaizhan
   virtual-host: /
  
#日志格式
logging:
  pattern:
   console: '%d{HH:mm:ss.SSS} %clr(%-5level) ---  [%-15thread] %cyan(%-50logger{50}):%msg%n'

在生产者的配置类创建交换机和队列



@Configuration
public class RabbitConfig {
  private final String EXCHANGE_NAME="my_topic_exchange";
  private final String QUEUE_NAME="my_queue";
 
 
  // 1.创建交换机
  @Bean("bootExchange")
  public Exchange getExchange(){
    return ExchangeBuilder
         .topicExchange(EXCHANGE_NAME) // 交换机类型
         .durable(true) // 是否持久化
           .build();
   }
 
 
  // 2.创建队列
  @Bean("bootQueue")
  public Queue getMessageQueue(){
    return QueueBuilder
         .durable(QUEUE_NAME) // 队列持久化
         .build();
   }
 
 
  // 3.将队列绑定到交换机
  @Bean
  public Binding bindMessageQueue(@Qualifier("bootExchange") Exchange exchange, @Qualifier("bootQueue") Queue queue){
    return BindingBuilder.bind(queue).to(exchange).with("my_routing").noargs();
   }
}

实时效果反馈

1. 在RabbitMQ中,可以监听消息是否从生产者成功传递到交换机


confirm


return


Ack

D 以上说法都不对

2. 在RabbitMQ中,可以监听消费者是否成功处理消息


confirm


return


Ack

D 以上说法都不对

答案

1=>A 2=>C

消息的可靠性投递_确认模式

消息中间件RabbitMQ(从入门到精通)

确认模式(confirm)可以监听消息是否从生产者成功传递到交换机,使用方法如下:

生产者配置文件开启确认模式



spring:
  rabbitmq:
   host: 192.168.0.162
   port: 5672
   username: itbaizhan
   password: itbaizhan
   virtual-host: /
  # 开启确认模式
   publisher-confirm-type: correlated

生产者定义确认模式的回调方法



@SpringBootTest
public class ProducerTest {
  @Autowired
  private RabbitTemplate rabbitTemplate;
 
 
  @Test
  public void testConfirm(){
    // 定义确认模式的回调方法,消息向交换机发送后会调用confirm方法
    rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
      /**
       * 被调用的回调方法
       * @param correlationData 相关配置信息
       * @param ack 交换机是否成功收到了消息
       * @param cause 失败原因
       */
      @Override
      public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack){
          System.out.println("confirm接受成功!");
         }else{
          System.out.println("confirm接受失败,原因为:"+cause);
          // 做一些处理。
         }
       }
     });
    rabbitTemplate.convertAndSend("my_topic_exchange","my_routing","send message...");
   }
}

实时效果反馈

1. 在RabbitMQ中,需要在中配置才能开启确认模式

A 生产者

B 消费者

C 生产者和消费者

D 不需要配置

2. 在RabbitMQ中,RabbitTemplate调用方法定义确认模式的回调方法


setReturnsCallback


setConfirmCallback


setReturns


setConfirm

答案

1=>A 2=>B

消息的可靠性投递_退回模式(失败的时候才会报错 正常时不会输出信息)

消息中间件RabbitMQ(从入门到精通)

退回模式(return)可以监听消息是否从交换机成功传递到队列,使用方法如下:

生产者配置文件开启退回模式



spring:
  rabbitmq:
   host: 192.168.0.162
   port: 5672
   username: itbaizhan
   password: itbaizhan
   virtual-host: /
  # 开启确认模式
   publisher-confirm-type: correlated
  # 开启回退模式
   publisher-returns: true

生产者定义退回模式的回调方法



@SpringBootTest
public class ProducerTest {
  @Autowired
  private RabbitTemplate rabbitTemplate;
 
 
  @Test
  public void testReturn(){
    // 定义退回模式的回调方法。交换机发送到队列失败后才会执行returnedMessage方法
    rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
      /**
       * @param returned 失败后将失败信息封装到参数中
       */
      @Override
      public void returnedMessage(ReturnedMessage returned) {
        System.out.println("消息对象:"+returned.getMessage());
        System.out.println("错误码:"+returned.getReplyCode());
        System.out.println("错误信息:"+returned.getReplyText());
        System.out.println("交换机:"+returned.getExchange());
        System.out.println("路由键:"+returned.getRoutingKey());
        // 处理消息...
       }
     });
    rabbitTemplate.convertAndSend("my_topic_exchange","my_routing1","send message...");
   }
}

实时效果反馈

1. 在RabbitMQ中,退回模式可以监听

A 生产者是否成功传递到交换机

B 交换机是否成功传递到队列

C 消费者是否成功消费消息

D 以上都可以监听

2. 在RabbitMQ中,RabbitTemplate调用方法定义退回模式的回调方法


setReturnsCallback


setConfirmCallback


setReturns


setConfirm

答案

1=>B 2=A

消息的可靠性投递_Ack

消息中间件RabbitMQ(从入门到精通)

在RabbitMQ中,消费者接收到消息后会向队列发送确认签收的消息,只有确认签收的消息才会被移除队列。这种机制称为消费者消息确认(Consumer Acknowledge,简称Ack)。类似快递员派送快递也需要我们签收,否则一直存在于快递公司的系统中。

消息分为自动确认和手动确认。自动确认指消息只要被消费者接收到,无论是否成功处理消息,则自动签收,并将消息从队列中移除。但是在实际开发中,收到消息后可能业务处理出现异常,那么消息就会丢失。此时需要设置手动签收,即在业务处理成功再通知签收消息,如果出现异常,则拒签消息,让消息依然保留在队列当中。

自动确认:spring.rabbitmq.listener.simple.acknowledge=”none”手动确认:spring.rabbitmq.listener.simple.acknowledge=”manual”

消费者配置开启手动签收



spring:
  rabbitmq:
   host: 192.168.0.162
   port: 5672
   username: itbaizhan
   password: itbaizhan
   virtual-host: /
  # 开启手动签收
   listener:
    simple:
     acknowledge-mode: manual

消费者处理消息时定义手动签收和拒绝签收的情况



@Component
public class AckConsumer {
  @RabbitListener(queues = "my_queue")
  public void listenMessage(Message message, Channel channel) throws IOException, InterruptedException {
    // 消息投递序号,消息每次投递该值都会+1
    long deliveryTag = message.getMessageProperties().getDeliveryTag();
    try {
      int i = 1/0; //模拟处理消息出现bug
      System.out.println("成功接受到消息:"+message);
      // 签收消息
      /**
       * 参数1:消息投递序号
       * 参数2:是否一次可以签收多条消息
       */
      channel.basicAck(deliveryTag,true);
     }catch (Exception e){
      System.out.println("消息消费失败!");
      Thread.sleep(2000);
      // 拒签消息
      /**
       * 参数1:消息投递序号
       * 参数2:是否一次可以拒签多条消息
       * 参数3:拒签后消息是否重回队列
       */
      channel.basicNack(deliveryTag,true,true);
     }
   }
}

实时效果反馈

1. 在RabbitMQ中,Ack可以监听

A 生产者是否成功传递到交换机

B 交换机是否成功传递到队列

C 消费者是否成功消费消息

D 以上都可以监听

2. 在RabbitMQ中,使用Ack手动签收一般在代码中的表现形式为

A 在
try
中签收消息

B 在
catch
中拒签消息

C 在
try
中签收消息,在
catch
中拒签消息

D 在
try
中拒签消息,在
catch
中签收消息

答案

1=>C 2=>C

消息的可靠性投递_Ack

消息中间件RabbitMQ(从入门到精通)

在RabbitMQ中,消费者接收到消息后会向队列发送确认签收的消息,只有确认签收的消息才会被移除队列。这种机制称为消费者消息确认(Consumer Acknowledge,简称Ack)。类似快递员派送快递也需要我们签收,否则一直存在于快递公司的系统中。

消息分为自动确认和手动确认。自动确认指消息只要被消费者接收到,无论是否成功处理消息,则自动签收,并将消息从队列中移除。但是在实际开发中,收到消息后可能业务处理出现异常,那么消息就会丢失。此时需要设置手动签收,即在业务处理成功再通知签收消息,如果出现异常,则拒签消息,让消息依然保留在队列当中。

自动确认:spring.rabbitmq.listener.simple.acknowledge=”none”手动确认:spring.rabbitmq.listener.simple.acknowledge=”manual”

消费者配置开启手动签收



spring:
  rabbitmq:
   host: 192.168.0.162
   port: 5672
   username: itbaizhan
   password: itbaizhan
   virtual-host: /
  # 开启手动签收
   listener:
    simple:
     acknowledge-mode: manual

消费者处理消息时定义手动签收和拒绝签收的情况



@Component
public class AckConsumer {
  @RabbitListener(queues = "my_queue")
  public void listenMessage(Message message, Channel channel) throws IOException, InterruptedException {
    // 消息投递序号,消息每次投递该值都会+1
    long deliveryTag = message.getMessageProperties().getDeliveryTag();
    try {
      int i = 1/0; //模拟处理消息出现bug
      System.out.println("成功接受到消息:"+message);
      // 签收消息
      /**
       * 参数1:消息投递序号
       * 参数2:是否一次可以签收多条消息
       */
      channel.basicAck(deliveryTag,true);
     }catch (Exception e){
      System.out.println("消息消费失败!");
      Thread.sleep(2000);
      // 拒签消息
      /**
       * 参数1:消息投递序号
       * 参数2:是否一次可以拒签多条消息
       * 参数3:拒签后消息是否重回队列
       */
      channel.basicNack(deliveryTag,true,true);
     }
   }
}

实时效果反馈

1. 在RabbitMQ中,Ack可以监听

A 生产者是否成功传递到交换机

B 交换机是否成功传递到队列

C 消费者是否成功消费消息

D 以上都可以监听

2. 在RabbitMQ中,使用Ack手动签收一般在代码中的表现形式为

A 在
try
中签收消息

B 在
catch
中拒签消息

C 在
try
中签收消息,在
catch
中拒签消息

D 在
try
中拒签消息,在
catch
中签收消息

答案

1=>C 2=>C

RabbitMQ高级特性_消费端限流

消息中间件RabbitMQ(从入门到精通)

之前我们讲过MQ可以对请求进行“削峰填谷”,即通过消费端限流的方式限制消息的拉取速度,达到保护消费端的目的。

消费端限流的写法如下:

生产者批量发送消息



@Test
public void testSendBatch() {
  // 发送十条消息
  for (int i = 0; i < 10; i++) {
    rabbitTemplate.convertAndSend("my_topic_exchange", "my_routing", "send message..."+i);
   }
}

消费端配置限流机制



spring:
  rabbitmq:
   host: 192.168.0.162
   port: 5672
   username: itbaizhan
   password: itbaizhan
   virtual-host: /
   listener:
    simple:
    # 限流机制必须开启手动签收
     acknowledge-mode: manual
    # 消费端最多拉取5条消息消费,签收后不满5条才会继续拉取消息。
     prefetch: 5

消费者监听队列



@Component
public class QosConsumer{
  @RabbitListener(queues = "my_queue")
  public void listenMessage(Message message, Channel channel) throws IOException, InterruptedException {
    // 1.获取消息
    System.out.println(new String(message.getBody()));
    // 2.模拟业务处理
    Thread.sleep(3000);
    // 3.签收消息
    channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
   }
}

实时效果反馈

1. 在RabbitMQ中,使用消费端限流必须开启

A 确认模式

B 退回模式

C 手动签收消息

D 什么都不需要开启

答案

1=>C

RabbitMQ高级特性_利用限流实现不公平分发

消息中间件RabbitMQ(从入门到精通)

在RabbitMQ中,多个消费者监听同一条队列,则队列默认采用的轮询分发。但是在某种场景下这种策略并不是很好,例如消费者1处理任务的速度非常快,而其他消费者处理速度却很慢。此时如果采用公平分发,则消费者1有很大一部分时间处于空闲状态。此时可以采用不公平分发,即谁处理的快,谁处理的消息多。

使用方法如下:

生产者批量发送消息



@Test
public void testSendBatch() {
  // 发送十条消息
  for (int i = 0; i < 10; i++) {
    rabbitTemplate.convertAndSend("my_topic_exchange", "my_routing", "send message..."+i);
   }
}

消费端配置不公平分发



spring:
  rabbitmq:
   host: 192.168.0.162
   port: 5672
   username: itbaizhan
   password: itbaizhan
   virtual-host: /
   listener:
    simple:
    # 限流机制必须开启手动签收
     acknowledge-mode: manual
    # 消费端最多拉取1条消息消费,这样谁处理的快谁拉取下一条消息,实现了不公平分发
     prefetch: 1

编写两个消费者



@Component
public class UnfairConsumer {
  // 消费者1
  @RabbitListener(queues = "my_queue")
  public void listenMessage1(Message message, Channel channel) throws Exception {
    //1.获取消息
    System.out.println("消费者1:"+new String(message.getBody(),"UTF-8"));
    //2. 处理业务逻辑
    Thread.sleep(500); // 消费者1处理快
    //3. 手动签收
    channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
   }
  
  // 消费者2
  @RabbitListener(queues = "my_queue")
  public void listenMessage2(Message message, Channel channel) throws Exception {
    //1.获取消息
    System.out.println("消费者2:"+new String(message.getBody(),"UTF-8"));
    //2. 处理业务逻辑
    Thread.sleep(3000);// 消费者2处理慢
    //3. 手动签收
    channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
   }
}

实时效果反馈

1. 在RabbitMQ中,实现不公平分发需要

A 将消费端限流为0

B 将消费端限流为1

C 将消费端限流为2

D 将消费端限流为3

答案

1=>B

RabbitMQ高级特性_消息存活时间

消息中间件RabbitMQ(从入门到精通)

RabbitMQ可以设置消息的存活时间(Time To Live,简称TTL),当消息到达存活时间后还没有被消费,会被移出队列。RabbitMQ可以对队列的所有消息设置存活时间,也可以对某条消息设置存活时间。

设置队列所有消息存活时间(就是在创建队列的时候设置存活时间就行)

在创建队列时设置其存活时间:



@Configuration
public class RabbitConfig2 {
  private final String EXCHANGE_NAME="my_topic_exchange2";
  private final String QUEUE_NAME="my_queue2";
 
 
  // 1.创建交换机
  @Bean("bootExchange2")
  public Exchange getExchange2(){
    return ExchangeBuilder
         .topicExchange(EXCHANGE_NAME)
         .durable(true).
        build();
   }
 
 
  // 2.创建队列
  @Bean("bootQueue2")
  public Queue getMessageQueue2(){
    return QueueBuilder
         .durable(QUEUE_NAME)
         .ttl(10000) //队列的每条消息存活10s
         .build();
   }
 
 
  // 3.将队列绑定到交换机
  @Bean
  public Binding bindMessageQueue2(@Qualifier("bootExchange2") Exchange exchange, @Qualifier("bootQueue2") Queue queue){
    return BindingBuilder.bind(queue).to(exchange).with("my_routing").noargs();
   }
}

生产者批量生产消息,测试存活时间



@Test
public void testSendBatch2() throws InterruptedException {
  // 发送十条消息
  for (int i = 0; i < 10; i++) {
    rabbitTemplate.convertAndSend("my_topic_exchange2", "my_routing", "send message..."+i);
   }
}

实时效果反馈

1. 在RabbitMQ中,当消息到达存活时间后还没有被消费,则

A 会被自动消费

B 会被自动清除

C 会被持久化

D 没有任何效果

2. 以下关于RabbitMQ的说法,正确的是

A 只能对队列的所有消息设置存活时间

B 只能对某条消息设置存活时间

C 可以对队列的所有消息设置存活时间,也可以对某条消息设置存活时间

D 不能设置存活时间

答案

1=>B 2=>C

设置单条消息存活时间



@Test
public void testSendMessage() {
  //设置消息属性
  MessageProperties messageProperties = new MessageProperties();
  //设置存活时间
  messageProperties.setExpiration("10000");
  // 创建消息对象 里面的参数1是发送的信息体 参数2是消息属性)
  Message message = new Message("send message...".getBytes(StandardCharsets.UTF_8), messageProperties);
  // 发送消息
  rabbitTemplate.convertAndSend("my_topic_exchange", "my_routing", message);
}

注意

如果设置了单条消息的存活时间,也设置了队列的存活时间,以时间短的为准

消息过期后,并不会马上移除消息,只有消息消费到队列顶端时,才会移除该消息。



@Test
public void testSendMessage2() {
  for (int i = 0; i < 10; i++) {
    if (i == 5) {
      // 1.创建消息属性
      MessageProperties messageProperties = new MessageProperties();
      // 2.设置存活时间
      messageProperties.setExpiration("10000");
      // 3.创建消息对象
      Message message = new Message(("send message..." + i).getBytes(), messageProperties);
      // 4.发送消息
      rabbitTemplate.convertAndSend("my_topic_exchange", "my_routing", message);
     } else {
      rabbitTemplate.convertAndSend("my_topic_exchange", "my_routing", "send message..." + i);
     }
   }
}

在以上案例中,i=5的消息才有过期时间,10s后消息并没有马上被移除,但该消息已经不会被消费了,当它到达队列顶端时会被移除。

实时效果反馈

1. 在RabbitMQ中,如果设置了单条消息的存活时间,也设置了队列的存活时间,

A 以单条消息的存活时间为准

B 以队列的存活时间为准

C 以时间短的为准

D 以时间长的为准

2. RabbitMQ消息过期后,

A 消息会被立即移除

B 只有消息在队列顶端时,才会被立即移除。

C 只有消息不在队列顶端时,才会被立即移除。

D 以上说法都不对

答案

1=>C 2=>B

RabbitMQ高级特性_优先级队列

消息中间件RabbitMQ(从入门到精通)

假设在电商系统中有一个订单催付的场景,即客户在一段时间内未付款会给用户推送一条短信提醒,但是系统中分为大型商家和小型商家。比如像苹果,小米这样大商家一年能给我们创造很大的利润,所以在订单量大时,他们的订单必须得到优先处理,此时就需要为不同的消息设置不同的优先级,此时我们要使用优先级队列。

优先级队列用法如下:

创建队列和交换机



@Configuration
public class RabbitConfig3 {
  private final String EXCHANGE_NAME="priority_exchange";
  private final String QUEUE_NAME="priority_queue";
 
 
  // 1.创建交换机
  @Bean(EXCHANGE_NAME)
  public Exchange priorityExchange(){
    return ExchangeBuilder
         .topicExchange(EXCHANGE_NAME)
         .durable(true).
        build();
   }
 
 
  // 2.创建队列
  @Bean(QUEUE_NAME)
  public Queue priorityQueue(){
    return QueueBuilder
         .durable(QUEUE_NAME)
        //设置队列的最大优先级,最大可以设置到255,官网推荐不要超过10,,如果设置太高比较浪费资源
         .maxPriority(10)
         .build();
   }
 
 
  // 3.将队列绑定到交换机
  @Bean
  public Binding bindPriority(@Qualifier(EXCHANGE_NAME) Exchange exchange, @Qualifier(QUEUE_NAME) Queue queue){
    return BindingBuilder.bind(queue).to(exchange).with("my_routing").noargs();
   }
}

编写生产者



@Test
public void testPriority() {
  for (int i = 0; i < 10; i++) {
    if (i == 5) {
      // i为5时消息的优先级较高
      MessageProperties messageProperties = new MessageProperties();
      messageProperties.setPriority(9);
      Message message = new Message(("send message..." + i).getBytes(StandardCharsets.UTF_8), messageProperties);
      rabbitTemplate.convertAndSend("priority_exchange", "my_routing", message);
     } else {
      rabbitTemplate.convertAndSend("priority_exchange", "my_routing", "send message..." + i);
     }
   }
}

编写消费者



@Component
public class PriorityConsumer {
  @RabbitListener(queues = "priority_queue")
  public void listenMessage(Message message, Channel channel) throws Exception {
    //获取消息
    System.out.println(new String(message.getBody()));
    //手动签收
    channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
   }
}

实时效果反馈

1. 在RabbitMQ中,消息的优先级数值越大,

A 越先被消费

B 越后被消费

C 随机被消费

D 以上说法都不对

答案

1=>A

RabbitMQ死信队列_概念

消息中间件RabbitMQ(从入门到精通)

在MQ中,当消息成为死信(Dead message)后,消息中间件可以将其从当前队列发送到另一个队列中,这个队列就是死信队列。而在RabbitMQ中,由于有交换机的概念,实际是将死信发送给了死信交换机(Dead Letter Exchange,简称DLX)。死信交换机和死信队列和普通的没有区别。

消息中间件RabbitMQ(从入门到精通)

消息成为死信的情况:

队列消息长度到达限制。消费者拒签消息,并且不把消息重新放入原队列。消息到达存活时间未被消费。

实时效果反馈

1. 在RabbitMQ中,什么情况下消息会成为死信

A 队列消息长度到达限制。

B 消费者拒签消息,并且不把消息重新放入原队列。

C 消息到达存活时间未被消费。

D 以上说法都正确

2. 在RabbitMQ中,消息成为死信后,会

A 什么都不做

B 持久化消息

C 发送到死信交换机

D 被消费者消费

答案

1=>D 2=>C

RabbitMQ死信队列_代码实现

消息中间件RabbitMQ(从入门到精通)

创建死信队列(创建和普通的一样,只不过普通的在创建队列的时候需要绑定死信交换机和死信的路由键)



@Configuration
public class RabbitConfig4 {
  private final String DEAD_EXCHANGE = "dead_exchange";
  private final String DEAD_QUEUE = "dead_queue";
 
 
  private final String NORMAL_EXCHANGE = "normal_exchange";
  private final String NORMAL_QUEUE = "normal_queue";
 
 
 
 
  // 死信交换机
  @Bean(DEAD_EXCHANGE)
  public Exchange deadExchange(){
    return ExchangeBuilder
         .topicExchange(DEAD_EXCHANGE)
         .durable(true)
         .build();
   }
 
 
  // 死信队列
  @Bean(DEAD_QUEUE)
  public Queue deadQueue(){
    return QueueBuilder
         .durable(DEAD_QUEUE)
         .build();
   }
 
 
  // 死信交换机绑定死信队列
  @Bean
  public Binding bindDeadQueue(@Qualifier(DEAD_EXCHANGE) Exchange exchange,@Qualifier(DEAD_QUEUE)Queue queue){
    return BindingBuilder
         .bind(queue)
         .to(exchange)
         .with("dead_routing")
         .noargs();
   }
 
 
  // 普通交换机
  @Bean(NORMAL_EXCHANGE)
  public Exchange normalExchange(){
    return ExchangeBuilder
         .topicExchange(NORMAL_EXCHANGE)
         .durable(true)
         .build();
   }
 
 
  // 普通队列
  @Bean(NORMAL_QUEUE)
  public Queue normalQueue(){
    return QueueBuilder
         .durable(NORMAL_QUEUE)
         .deadLetterExchange(DEAD_EXCHANGE) // 绑定死信交换机
         .deadLetterRoutingKey("dead_routing") // 死信队列路由关键字
         .ttl(10000) // 消息存活10s
         .maxLength(10) // 队列最大长度为10
         .build();
   }
 
 
  // 普通交换机绑定普通队列
  @Bean
  public Binding bindNormalQueue(@Qualifier(NORMAL_EXCHANGE) Exchange exchange,@Qualifier(NORMAL_QUEUE)Queue queue){
    return BindingBuilder
         .bind(queue)
         .to(exchange)
         .with("my_routing")
         .noargs();
   }
}

实时效果反馈

1. 在RabbitMQ中,普通队列绑定死信队列时,需要绑定

A 死信交换机

B 死信队列的路由关键字

C 死信交换机和死信队列的路由关键字

D 以上说法都不正确

答案

1=>C

RabbitMQ延迟队列_概念

消息中间件RabbitMQ(从入门到精通)

延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。

例如:用户下单后,30分钟后查询订单状态,未支付则会取消订单。

消息中间件RabbitMQ(从入门到精通)

但RabbitMQ中并未提供延迟队列功能,我们可以使用死信队列实现延迟队列的效果。

消息中间件RabbitMQ(从入门到精通)

实时效果反馈

1. 延迟队列的特点是

A 消息持久化后才会被消费。

B 只有到达指定时间后,才会被消费。

C 消息会被消费多次

D 以上说法都不正确

2. 在RabbitMQ中,我们可以用实现延迟队列

A 死信队列

B 优先级队列

C 确认模式

D 消费端限流

答案

1=>B 2=>A

RabbitMQ延迟队列_死信队列实现

消息中间件RabbitMQ(从入门到精通)

接下来我们使用死信队列实现延迟队列

创建SpringBoot订单模块,添加SpringMVC、RabbitMQ、lombok依赖。



<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
  <groupId>org.projectlombok</groupId>
  <artifactId>lombok</artifactId>
</dependency>

编写配置文件



spring:
  rabbitmq:
   host: 192.168.0.162
   port: 5672
   username: itbaizhan
   password: itbaizhan
   virtual-host: /
  
# 日志格式
logging:
  pattern:
   console: '%d{HH:mm:ss.SSS} %clr(%-5level) ---  [%-15thread] %cyan(%-50logger{50}):%msg%n'

RabbitMQ延迟队列_插件实现

消息中间件RabbitMQ(从入门到精通)

在使用死信队列实现延迟队列时,会遇到一个问题:RabbitMQ只会移除队列顶端的过期消息,如果第一个消息的存活时长较长,而第二个消息的存活时长较短,则第二个消息并不会及时执行。

消息中间件RabbitMQ(从入门到精通)

RabbitMQ虽然本身不能使用延迟队列,但官方提供了延迟队列插件,安装后可直接使用延迟队列。

消息中间件RabbitMQ(从入门到精通)

安装延迟队列插件

使用rz将插件上传至虚拟机

安装插件

# 将插件放入RabbitMQ插件目录中
mv rabbitmq_delayed_message_exchange-3.9.0.ez /usr/local/rabbitmq/plugins/

# 启用插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

重启RabbitMQ服务

#停止rabbitmq
rabbitmqctl stop

#启动rabbitmq
rabbitmq-server restart -detached

此时登录管控台可以看到交换机类型多了延迟消息

消息中间件RabbitMQ(从入门到精通)

使用延迟队列(只用创建延迟队列和交换机,以及在发送消息的时候设置延迟时间就行)

创建延迟交换机和延迟队列



@Configuration
public class RabbitConfig2 {
  public final String DELAYED_EXCHANGE = "delayed_exchange";
  public final String DELAYED_QUEUE = "delayed_queue";
 
 
  //1.延迟交换机
  @Bean(DELAYED_EXCHANGE)
  public Exchange delayedExchange() {
    // 创建自定义交换机
    Map<String, Object> args = new HashMap<>();
    args.put("x-delayed-type", "topic"); // topic类型的延迟交换机
    return new CustomExchange(DELAYED_EXCHANGE, "x-delayed-message", true, false, args);
   }
 
 
  //2.延迟队列
  @Bean(DELAYED_QUEUE)
  public Queue delayedQueue() {
    return QueueBuilder
         .durable(DELAYED_QUEUE)
         .build();
   }
 
 
  // 3.绑定
  @Bean
  public Binding bindingDelayedQueue(@Qualifier(DELAYED_QUEUE) Queue queue, @Qualifier(DELAYED_EXCHANGE) Exchange exchange) {
    return BindingBuilder.bind(queue).to(exchange).with("order_routing").noargs();
   }
}

编写下单的控制器方法



@GetMapping("/place2/{orderId}")
public String placeOrder2(@PathVariable String orderId) {
  System.out.println("处理订单数据...");
 
 
  // 设置消息延迟时间为10秒
  MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
    @Override
    public Message postProcessMessage(Message message) throws AmqpException {
      message.getMessageProperties().setDelay(10000);
      return message;
     }
   };
  // 将订单id发送到订单队列
  rabbitTemplate.convertAndSend("delayed_exchange", "order_routing", orderId, messagePostProcessor);
  return "下单成功,修改库存";
}

编写延迟队列的消费者



@RabbitListener(queues = "delayed_queue")
public void listenMessage(String orderId){
  System.out.println("查询"+orderId+"号订单的状态,如果已支付则无需处理,如果未支付则需要回退库存");
}

下单测试:访问http://localhost:8080/place/10002

实时效果反馈

1. 在RabbitMQ中,使用延迟插件后,延迟交换机的类型是


direct


fanout


topic


x-delayed-message

答案

1=>D

RabbitMQ集群_集群搭建

消息中间件RabbitMQ(从入门到精通)

在生产环境中,当单台RabbitMQ服务器无法满足消息的吞吐量及安全性要求时,需要搭建RabbitMQ集群。

设置两个RabbitMQ服务



# 关闭RabbitMQ服务
rabbitmqctl stop
 
 
# 设置服务一
RABBITMQ_NODE_PORT=5673 RABBITMQ_NODENAME=rabbit1 rabbitmq-server start -detached
 
 
# 设置服务二
RABBITMQ_NODE_PORT=5674 RABBITMQ_SERVER_START_ARGS="-rabbitmq_management listener [{port,15674}]" RABBITMQ_NODENAME=rabbit2 rabbitmq-server start -detached

将两个服务设置到同一集群中



# 关闭服务2
rabbitmqctl -n rabbit2 stop_app
# 重新设置服务2
rabbitmqctl -n rabbit2 reset
# 将服务2加入服务1中
rabbitmqctl -n rabbit2 join_cluster rabbit1@localhost
# 启动服务2
rabbitmqctl -n rabbit2 start_app

实时效果反馈

1. 单机版的RabbitMQ服务无法满足真实应用的要求时,应当

A 搭建RabbitMQ集群

B 增强主机性能

C 更换MQ产品

D 以上说法都不对

答案

1=>A

RabbitMQ集群_集群搭建

消息中间件RabbitMQ(从入门到精通)

在生产环境中,当单台RabbitMQ服务器无法满足消息的吞吐量及安全性要求时,需要搭建RabbitMQ集群。

设置两个RabbitMQ服务



# 关闭RabbitMQ服务
rabbitmqctl stop
 
 
# 设置服务一
RABBITMQ_NODE_PORT=5673 RABBITMQ_NODENAME=rabbit1 rabbitmq-server start -detached
 
 
# 设置服务二
RABBITMQ_NODE_PORT=5674 RABBITMQ_SERVER_START_ARGS="-rabbitmq_management listener [{port,15674}]" RABBITMQ_NODENAME=rabbit2 rabbitmq-server start -detached

将两个服务设置到同一集群中



# 关闭服务2
rabbitmqctl -n rabbit2 stop_app
# 重新设置服务2
rabbitmqctl -n rabbit2 reset
# 将服务2加入服务1中
rabbitmqctl -n rabbit2 join_cluster rabbit1@localhost
# 启动服务2
rabbitmqctl -n rabbit2 start_app

实时效果反馈

1. 单机版的RabbitMQ服务无法满足真实应用的要求时,应当

A 搭建RabbitMQ集群

B 增强主机性能

C 更换MQ产品

D 以上说法都不对

答案

1=>A

RabbitMQ集群_镜像队列

消息中间件RabbitMQ(从入门到精通)

搭建了集群后,虽然多个节点可以互相通信,但队列只保存在了一个节点中,如果该节点故障,则整个集群都将丢失消息。

# 关闭服务1

rabbitmqctl -n rabbit1 stop_app

消息中间件RabbitMQ(从入门到精通)

此时我们需要引入镜像队列机制,它可以将队列消息复制到集群中的其他节点上。如果一个节点失效了,另一个节点上的镜像可以保证服务的可用性。

在管控台点击
Admin
—>
Policies
设置镜像队列

消息中间件RabbitMQ(从入门到精通)

此时某个节点故障则不会影响整个集群。

实时效果反馈

1. 在RabbitMQ集群中,通过设置可以为队列创建副本

A 副本队列

B 镜像队列

C 负载均衡

D 备用队列

答案

1=>B

RabbitMQ集群_负载均衡

消息中间件RabbitMQ(从入门到精通)

无论是生产者还是消费者,只能连接一个RabbitMQ节点,而在我们使用RabbitMQ集群时,如果只连接一个RabbitMQ节点,会造成该节点的压力过大。我们需要平均的向每个RabbitMQ节点发送请求,此时需要一个负载均衡工具帮助我们分发请求,接下来使用Haproxy做负载均衡:

安装Haproxy

yum
-y install haproxy

配置Haproxy

vim /etc/haproxy/haproxy.cfg

添加如下内容:



# 以下为修改内容
defaults
        # 修改为tcp
        mode tcp
 
 
# 以下为添加内容
listen rabbitmq_cluster
        # 对外暴露端口
     bind 0.0.0.0:5672
     mode tcp
     balance roundrobin
     # 代理RabbitMQ的端口
     server node1 127.0.0.1:5673 check inter 5000 rise 2 fall 2
     server node2 127.0.0.1:5674 check inter 5000 rise 2 fall 2
 
 
listen stats
        # Haproxy控制台路径
     bind 192.168.0.162:8100
     mode http
     option httplog
     stats enable
     stats uri /rabbitmq-stats
     stats refresh 5s

启动Haproxy

haproxy
-f /etc/haproxy/haproxy.cfg

访问Haproxy控制台:http://192.168.0.162:8100/rabbitmq-stats

生产者连接Haproxy发送消息



// 生产者
public class Producer {
  public static void main(String[] args) throws IOException, TimeoutException {
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("192.168.0.162");
    connectionFactory.setPort(5672);
    connectionFactory.setUsername("guest");
    connectionFactory.setPassword("guest");
    connectionFactory.setVirtualHost("/");
 
 
    Connection conn = connectionFactory.newConnection();
    Channel channel = conn.createChannel();
    channel.queueDeclare("simple_queue", false, false, false, null);
 
 
    channel.basicPublish("", "simple_queue", null, "hello!rabbitmq!".getBytes());
 
 
    channel.close();
    conn.close();
   }
}

实时效果反馈

1. 在RabbitMQ集群中,通过设置可以帮助我们给不同节点分发请求

A 副本队列

B 镜像队列

C 负载均衡

D 备用队列

答案

1=>C

© 版权声明

相关文章

暂无评论

none
暂无评论...