You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

32 KiB

消息队列


作者:行癫(盗版必究)

一:消息队列简介

1.消息队列概念

消息队列Message Queue是分布式系统中重要的组件其通用的使用场景可以简单地描述为当不需要立即获得结果但是并发量又需要进行控制的时候差不多就是需要使用消息队列的时候

消息队列主要解决了应用耦合、异步处理、流量削锋等问题

当前使用较多的消息队列有RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaMq等而部分数据库如Redis、Mysql以及phxsql也可实现消息队列的功能

2.消息队列使用场景

1.应用解耦:将应用进行解耦

具体场景:用户下单后,订单系统需要通知库存系统

传统做法(传统模式的缺点:假如库存系统无法访问,则订单减库存将失败,从而导致订单失败,订单系统与库存系统耦合)

image-20230306105904655

使用消息队列

image-20230306110107306

订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功

库存系统:订阅下单的消息,采用拉/推的方式,获取下单信息,库存系统根据下单信息,进行库存操作

假如:在下单时库存系统不能正常使用。也不影响正常下单,因为下单后,订单系统写入消息队列就不再关心其他的后续操作了。实现订单系统与库存系统的应用解耦

2.异步处理:多应用对消息队列中同一消息进行处理,应用间并发处理消息,相比串行处理,减少处理时间

具体场景:用户为了使用某个应用,进行注册,系统需要发送注册邮件并验证短信

1串行方式新注册信息生成后先发送注册邮件再发送验证短信

image-20230306220448414

2并行处理新注册信息写入后由发短信和发邮件并行处理

image-20230306220509834

3若使用消息队列在写入消息队列后立即返回成功给客户端则总的响应时间依赖于写入消息队列的时间而写入消息队列的时间本身是可以很快的基本可以忽略不计因此总的处理时间相比串行提高了2倍相比并行提高了一倍

image-20230306220604601

3.限流削峰:广泛应用于秒杀或抢购活动中,避免流量过大导致应用系统挂掉的情况

具体场景:秒杀活动,一般会因为流量过大,导致流量暴增,应用挂掉。为解决这个问题,一般需要在应用前端加入消息队列

image-20230306220642886

1作用

1可以控制活动的人数

2可以缓解短时间内高流量压垮应用

2用户的请求服务器接收后首先写入消息队列。假如消息队列长度超过最大数量则直接抛弃用户请求或跳转到错误页面

3秒杀业务根据消息队列中的请求信息再做后续处理

4.日志处理日志处理是指将消息队列用在日志处理中比如Kafka的应用解决大量日志传输的问题

image-20230306220749942

1日志采集客户端负责日志数据采集定时写受写入Kafka队列

2Kafka消息队列负责日志数据的接收存储和转发

3日志处理应用订阅并消费kafka队列中的日志数据

5.消息通讯:消息通讯是指,消息队列一般都内置了高效的通信机制,因此也可以用在纯的消息通讯

1点对点通讯客户端A和客户端B使用同一队列进行消息通讯

image-20230306220849654

2聊天时通讯客户端A客户端B客户端N订阅同一主题进行消息发布和接收。实现类似聊天室效果

image-20230306220906078

3.消息队列模式

点对点模式

每个消息只有一个接收者Consumer(即一旦被消费,消息就不再在消息队列中)

发送者和接收者间没有依赖性,发送者发送消息之后,不管有没有接收者在运行,都不会影响到发送者下次发送消息

接收者在成功接收消息之后需向队列应答成功,以便消息队列删除当前接收的消息

发布/订阅模式

每个消息可以有多个订阅者

发布者和订阅者之间有时间上的依赖性。针对某个主题Topic的订阅者它必须创建一个订阅者之后才能消费发布者的消息

为了消费消息,订阅者需要提前订阅该角色主题,并保持在线运行

4.常见的消息队列

RabbitMQ

RabbitMQ 2007年发布是一个在AMQP(高级消息队列协议)基础上完成的,可复用的企业消息系统,是当前最主流的消息中间件之一

主要特性

可靠性: 提供了多种技术可以让你在性能和可靠性之间进行权衡。这些技术包括持久性机制、投递确认、发布者证实和高可用性机制

灵活的路由: 消息在到达队列前是通过交换机进行路由的

消息集群在相同局域网中的多个RabbitMQ服务器可以聚合在一起作为一个独立的逻辑代理来使用

队列高可用:队列可以在集群中的机器上进行镜像,以确保在硬件问题下还保证消息安全

多种协议的支持:支持多种消息队列协议

服务器端用Erlang语言编写支持只要是你能想到的所有编程语言

管理界面: RabbitMQ有一个易用的用户界面使得用户可以监控和管理消息Broker的许多方面

跟踪机制如果消息异常RabbitMQ提供消息跟踪机制使用者可以找出发生了什么

插件机制:提供了许多插件,来从多方面进行扩展,也可以编写自己的插件

优点

由于erlang语言的特性mq 性能较好,高并发

健壮、稳定、易用、跨平台、支持多种语言、文档齐全

有消息确认机制和持久化机制,可靠性高

高度可定制的路由

管理界面较丰富,在互联网公司也有较大规模的应用

缺点

尽管结合erlang语言本身的并发优势性能较好但是不利于做二次开发和维护

实现了代理架构,意味着消息在发送到客户端之前可以在中央节点上排队;使得其运行速度较慢,消息封装后也比较大

需要学习比较复杂的接口和协议,学习和维护成本较高

ActiveMQ

ActiveMQ是由Apache出品ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现。它非常快速支持多种语言的客户端和协议而且可以非常容易的嵌入到企业的应用环境中并有许多高级功能

RocketMQ

RocketMQ出自阿里公司的开源产品用 Java 语言实现,在设计时参考了 Kafka并做出了自己的一些改进消息可靠性上比 Kafka 更好。RocketMQ在阿里集团被广泛应用在订单交易充值流计算消息推送日志流式处理binglog分发等场景

kafka

Apache Kafka是一个分布式消息发布订阅系统。它最初由LinkedIn公司基于独特的设计实现为一个分布式的提交日志系统( a distributed commit log)之后成为Apache项目的一部分。Kafka系统快速、可扩展并且可持久化。它的分区特性可复制和可容错都是其不错的特性

总结:

Kafka在于分布式架构RabbitMQ基于AMQP协议来实现RocketMQ/思路来源于kafka改成了主从结构在事务性可靠性方面做了优化。广泛来说电商、金融等对事务性要求很高的可以考虑RabbitMQ和RocketMQ对性能要求高的可考虑Kafka

安装RabbitMq

1.安装erllang

[root@xingdian ~]#  wget --content-disposition https://packagecloud.io/rabbitmq/erlang/packages/el/7/erlang-23.3.4.11-1.el7.x86_64.rpm/download.rpm
[root@xingdian ~]# yum install erlang-23.3.4.11-1.el7.x86_64.rpm -y

2.安装RabbitMq

[root@xingdian ~]#  wget --content-disposition https://packagecloud.io/rabbitmq/rabbitmq-server/packages/el/7/rabbitmq-server-3.10.0-1.el7.noarch.rpm/download.rpm
[root@xingdian ~]# yum -y install rabbitmq-server-3.10.0-1.el7.noarch.rpm

3.启动服务

[root@xingdian ~]# systemctl start rabbitmq-server

4.配置远程访问

注意:需要手动上传配置文件到指定目录下

下载地址:https://xingdian-file.oss-cn-hangzhou.aliyuncs.com/rabbitmq.conf

[root@xingdian ~]# vim /etc/rabbitmq/rabbitmq.conf
loopback_users.guest = false

5.开启WEB界面

[root@xingdian ~]# rabbitmq-plugins enable rabbitmq_management
[root@xingdian ~]# systemctl restart rabbitmq-server

6.浏览器访问

用户名guest 密码guest

image-20230307193304467

7.用户管理

image-20230307193506397

超级管理员(administrator)

可登陆管理控制台,可查看所有的信息,并且可以对用户,策略(policy)进行操作

监控者(monitoring)

可登陆管理控制台同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等)

策略制定者(policymaker)

可登陆管理控制台, 同时可以对policy进行管理。但无法查看节点的相关信息

普通管理者(management)

仅可登陆管理控制台,无法看到节点信息,也无法对策略进行管理

其他

无法登陆管理控制台,通常就是普通的生产者和消费者

8.Channels信号

信道是生产消费者与rabbit通信的渠道生产者publish或者消费者消费一个队列都是需要通过信道来通信的

信道是建立在TCP上面的虚拟链接也就是rabbitMQ在一个TCP上面建立成百上千的信道来达到多个线程处理

注意:

为什么RabbitMQ 需要信道如果直接进行TCP通信呢

TCP的创建开销很大创建需要三次握手销毁需要四次握手

如果不使用信道那么引用程序就会使用TCP方式进行连接到RabbitMQ因为MQ可能每秒会进行成千上万的链接

总之就是TCP消耗资源

image-20230307200529779

使用rabbitmq时不管是消费还是生产都需要创建信道channel 和connection连接连接是连接到RabbitMQ的服务器

9.Exchanges交换机

介绍

生产者只能将消息发送到交换机(exchange),交换机工作的内容非常简单,一方面它接收来自生产者的消息,另一方面将它们推入队列;交换机必须确切知道如何处理收到的消息;是应该把这些消息放到特定队列还是说把他们到许多队列中还是说应该丢弃它们;这就的由交换机的类型来决定

类型

直接(direct)

主题(topic)

标题(headers)

扇出(fanout)

绑定bindings

binding 其实是 exchange 和 queue 之间的桥梁,它告诉我们 exchange 和那个队列进行了绑定关系( X 与 Q1 和 Q2 进行了绑定)

image-20230307201201420

扇出fanout

它是将接收到的所有消息广播到它知道的所有队列中

image-20230307201424965

直接direct

exchange在和queue进行binding时会设置routingkey

channel.QueueBind(queue: "create_pdf_queue",
                    exchange: "pdf_events",
                    routingKey: "pdf_create",
                    arguments: null);

将消息发送到exchange时会设置对应的routingkey

channel.BasicPublish(exchange: "pdf_events",
                        routingKey: "pdf_create",
                        basicProperties: properties,
                        body: body);

在direct类型的exchange中只有这两个routingkey完全相同exchange才会选择对应的binging进行消息路由

image-20230307201700721

主题topic

此类型exchange和上面的direct类型差不多但direct类型要求routingkey完全相等这里的routingkey可以有通配符',#.

其中’'表示匹配一个单词, '#'则表示匹配没有或者多个单词

image-20230307201805873

第一个binding

exchange: agreements
queue A:  berlin_agreements
binding routingkey: agreements.eu.berlin.#

第二个binding

exchange: agreements
queue B: all_agreements
binding routingkey: agreements.#

第三个binding

exchange: agreements
queue c: headstore_agreements
binding routingkey: agreements.eu.*.headstore

所以如果我们消息的routingkey为agreements.eu.berlin那么符合第一和第二个binding但最后一个不符合

标题(headers)

不处理路由键。而是根据发送的消息内容中的headers属性进行匹配在绑定Queue与Exchange时指定一组键值对当消息发送到RabbitMQ时会取到该消息的headers与Exchange绑定时指定的键值对进行匹配如果完全匹配则消息会路由到该队列否则不会路由到该队列

匹配规则x-match有下列两种类型

x-match = all :表示所有的键值对都匹配才能接受到消息

x-match = any :表示只要有键值对匹配就能接受到消息

10.Queues队列

image-20230307203729529

name: 队列名称

durable 队列是否持久化队列默认是存放到内存中的rabbitmq重启则丢失保存到Erlang自带的Mnesia数据库中可持久存储

exclusive是否排他的队列

当连接关闭时connection.close()该队列是否会自动删除

设置队列是否是私有的,如果非排他(false)的可以使用两个消费者都访问同一个队列没有任何问题如果是排他的会对当前队列加锁其他通道channel是不能访问的如果强制访问会报异常

com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=405, reply-text=RESOURCE_LOCKED - cannot obtain exclusive access to locked queue queue_name in vhost /, class-id=50, method-id=20

autoDelete是否自动删除当最后一个消费者断开连接之后队列是否自动被删除当consumers = 0时队列就会自动删除

arguments 队列中的消息什么时候会自动被删除

Message TTL(x-message-ttl):设置队列中的所有消息的生存周期;单位毫秒;生存时间到了,消息会被从队里中删除

Auto Expire(x-expires): 当队列在指定的时间没有被访问(consume, basicGet, queueDeclare…)就会被删除,Features=Exp

Max Length(x-max-length): 限定队列的消息的最大值长度超过指定长度将会把最早的几条删除掉Feature=Lim

Max Length Bytes(x-max-length-bytes): 限定队列最大占用的空间大小, 一般受限于内存、磁盘的大小, Features=Lim B

Dead letter exchange(x-dead-letter-exchange) 当队列消息长度大于最大长度、或者过期的等,将从队列中删除的消息推送到指定的交换机中去而不是丢弃掉,Features=DLX

Dead letter routing key(x-dead-letter-routing-key):将删除的消息推送到指定交换机的指定路由键的队列中去, Feature=DLK

Maximum priority(x-max-priority):优先级队列,声明队列时先定义最大优先级值(定义最大值一般不要太大),在发布消息的时候指定该消息的优先级, 优先级更高(数值更大的)的消息先被消费

Lazy mode(x-queue-mode=lazy) Lazy Queues: 先将消息保存到磁盘上不放在内存中当消费者开始消费的时候才加载到内存中Master locator(x-queue-master-locator)

RabbitMq案例

1.部署新服务器

准备python3环境

准备数据库

密码123456

库名pydb

用户root

库名user

字符集utf8

表名nameageheight

2.项目地址

https://xingdian-file.oss-cn-hangzhou.aliyuncs.com/py-rabbitmq/receive.py

https://xingdian-file.oss-cn-hangzhou.aliyuncs.com/py-rabbitmq/send.py

注意修改程序信息先运行send.py在运行receive.py

RabbitMq集群

1.原理介绍

RabbitMQ是依据erlang的分布式特性RabbitMQ底层是通过Erlang架构来实现的所以rabbitmqctl会启动Erlang节点并基于Erlang节点来使用Erlang系统连接RabbitMQ节点在连接过程中需要正确的Erlang Cookie和节点名称Erlang节点通过交换Erlang Cookie以获得认证来实现的所以部署Rabbitmq分布式集群时要先安装Erlang并把其中一个服务的cookie复制到另外的节点

RabbitMQ集群中各个RabbitMQ为对等节点即每个节点均提供给客户端连接进行消息的接收和发送。节点分为内存节点和磁盘节点一般的均应建立为磁盘节点为了防止机器重启后的消息消失

RabbitMQ的Cluster集群模式一般分为两种普通模式和镜像模式。消息队列通过RabbitMQ HA镜像队列进行消息队列实体复制

普通模式下以两个节点rabbit01、rabbit02为例来进行说明。对于Queue来说消息实体只存在于其中一个节点rabbit01或者rabbit02rabbit01和rabbit02两个节点仅有相同的元数据即队列的结构。当消息进入rabbit01节点的Queue后consumer从rabbit02节点消费时RabbitMQ会临时在rabbit01、rabbit02间进行消息传输把A中的消息实体取出并经过B发送给consumer。所以consumer应尽量连接每一个节点从中取消息。即对于同一个逻辑队列要在多个节点建立物理Queue。否则无论consumer连rabbit01或rabbit02出口总在rabbit01会产生瓶颈

镜像模式下将需要消费的队列变为镜像队列存在于多个节点这样就可以实现RabbitMQ的HA高可用性。作用就是消息实体会主动在镜像节点之间实现同步而不是像普通模式那样在consumer消费数据时临时读取。缺点就是集群内部的同步通讯会占用大量的网络带宽

2.集群部署

单一模式:即单机情况不做集群,就单独运行一个 rabbitmq 而已

普通模式默认模式以两个节点rabbit01、rabbit02为例来进行说明。对于 Queue 来说,消息实体只存在于其中一个节点 rabbit01或者 rabbit02rabbit01 和 rabbit02 两个节点仅有相同的元数据,即队列的结构。当消息进入 rabbit01 节点的 Queue 后consumer 从 rabbit02 节点消费时RabbitMQ 会临时在 rabbit01、rabbit02 间进行消息传输,把 A 中的消息实体取出并经过 B 发送给 consumer。所以 consumer 应尽量连接每一个节点,从中取消息。即对于同一个逻辑队列,要在多个节点建立物理 Queue。否则无论 consumer 连 rabbit01 或 rabbit02出口总在 rabbit01会产生瓶颈。当 rabbit01 节点故障后rabbit02 节点无法取到 rabbit01 节点中还未消费的消息实体。如果做了消息持久化,那么得等 rabbit01 节点恢复,然后才可被消费;如果没有持久化的话,就会产生消息丢失的现象

镜像模式: 把需要的队列做成镜像队列,存在与多个节点属于 RabbitMQ 的 HA 方案。该模式解决了普通模式中的问题,其实质和普通模式不同之处在于,消息实体会主动在镜像节点间同步,而不是在客户端取数据时临时拉取。该模式带来的副作用也很明显,除了降低系统性能外,如果镜像队列数量过多,加之大量的消息进入,集群内部的网络带宽将会被这种同步通讯大大消耗掉。所以在对可靠性要求较高的场合中适用

环境准备

3台centos7操作系统ip分别为
192.168.120.138
192.168.120.139
192.168.120.140

本地解析

[root@rabbitmq1 ~]# vim /etc/hosts
192.168.120.138 rabbitmq1
192.168.120.139 rabbitmq2
192.168.120.140 rabbitmq3

保证 3 台能 ping 通

[root@rabbitmq1 ~]# ping  rabbitmq1
PING 192.168.31.154 (192.168.31.154) 56(84) bytes of data.
64 bytes from 192.168.31.154: icmp_seq=1 ttl=64 time=0.025 ms
64 bytes from 192.168.31.154: icmp_seq=2 ttl=64 time=0.043 ms
64 bytes from 192.168.31.154: icmp_seq=3 ttl=64 time=0.042 ms
^C
--- 192.168.31.154 ping statistics ---
3 packets transmitted, 3 received, 0% packet loss, time 1999ms
rtt min/avg/max/mdev = 0.025/0.036/0.043/0.010 ms
[root@rabbitmq1 ~]# ping  rabbitmq2
PING 192.168.31.155 (192.168.31.155) 56(84) bytes of data.
64 bytes from 192.168.31.155: icmp_seq=1 ttl=64 time=0.742 ms
64 bytes from 192.168.31.155: icmp_seq=2 ttl=64 time=0.343 ms
^C
--- 192.168.31.155 ping statistics ---
2 packets transmitted, 2 received, 0% packet loss, time 1000ms
rtt min/avg/max/mdev = 0.343/0.542/0.742/0.200 ms
[root@rabbitmq1 ~]# ping rabbitmq3
PING 192.168.31.156 (192.168.31.156) 56(84) bytes of data.
64 bytes from 192.168.31.156: icmp_seq=1 ttl=64 time=1.15 ms
64 bytes from 192.168.31.156: icmp_seq=2 ttl=64 time=0.380 ms
^C
--- 192.168.31.156 ping statistics ---
2 packets transmitted, 2 received, 0% packet loss, time 1001ms
rtt min/avg/max/mdev = 0.380/0.768/1.157/0.389 ms

安装erlang和socat

[root@xingdian-server-1 /]# wget --content-disposition https://packagecloud.io/rabbitmq/erlang/packages/el/7/erlang-23.3.4.11-1.el7.x86_64.rpm/download.rpm
[root@xingdian-server-1 /]#  yum install erlang-23.3.4.11-1.el7.x86_64.rpm -y
[root@rabbitmq1 ~]# yum install -y socat

安装rabbitmq

[root@xingdian-server-1 /]#  wget --content-disposition https://packagecloud.io/rabbitmq/rabbitmq-server/packages/el/7/rabbitmq-server-3.10.0-1.el7.noarch.rpm/download.rpm
[root@xingdian-server-1 /]#  yum -y install rabbitmq-server-3.10.0-1.el7.noarch.rpm

开启用户远程登录

[root@rabbitmq-1 rabbitmq]# vim rabbitmq.conf
loopback_users.guest = false

rabbitmq 常用命令

[root@rabbitmq1 ~]# systemctl start rabbitmq-server.service 
每台都操作开启rabbitmq的web访问界面
[root@rabbitmq-1 ~]# rabbitmq-plugins enable rabbitmq_management

账号配置

安装启动后其实还不能在其它机器访问rabbitmq 默认的 guest 账号只能在本地机器访问, 如果想在其它机器访问需配置其它账号

4.创建用户:
注意:在一台机器操作
添加用户和密码
[root@rabbitmq-1 ~]# rabbitmqctl add_user soho soso
Creating user "soho" ...
...done.
这是为管理员
[root@rabbitmq-1 ~]# rabbitmqctl set_user_tags soho administrator
Setting tags for user "soho" to [administrator] ...
...done.
查看用户
[root@rabbitmq-1 ~]# rabbitmqctl list_users
Listing users ...
guest	[administrator]
soho	[administrator]
...done.

设置权限

此处设置权限时注意'.'之间需要有空格 三个'.'分别代表了conf权限read权限与write权限 例如当没有给soho设置这三个权限前是没有权限查询队列在ui界面也看不见

[root@rabbitmq-1 ~]# rabbitmqctl set_permissions -p "/" soho ".*" ".*" ".*"
Setting permissions for user "soho" in vhost "/" ...
...done.   

查看端口

4369 -- erlang发现口

5672 --程序连接端口

15672 -- 管理界面ui端口

25672 -- server间内部通信口

集群部署

1.首先创建好数据存放目录和日志存放目录:
[root@rabbitmq-1 ~]# mkdir -p /data/rabbitmq/data
[root@rabbitmq-1 ~]# mkdir -p /data/rabbitmq/logs
[root@rabbitmq-1 ~]# chmod 777 -R /data/rabbitmq
[root@rabbitmq-1 ~]# chown rabbitmq.rabbitmq /data/ -R
创建配置文件:
[root@rabbitmq-1 ~]# vim /etc/rabbitmq/rabbitmq-env.conf
[root@rabbitmq-1 ~]# cat /etc/rabbitmq/rabbitmq-env.conf
RABBITMQ_MNESIA_BASE=/data/rabbitmq/data
RABBITMQ_LOG_BASE=/data/rabbitmq/logs
重启服务
[root@rabbitmq-1 ~]# systemctl restart rabbitmq-server

拷⻉erlang.cookie

Rabbitmq的集群是依附于erlang的集群来⼯作的,所以必须先构建起erlang的集群景象。Erlang的集群中各节点是经由⼀个cookie来实现的,这个cookie存放在/var/lib/rabbitmq/.erlang.cookie中⽂件是400的权限。所以必须保证各节点cookie⼀致,不然节点之间就⽆法通信

#如果执行# rabbitmqctl stop_app 这条命令报错:需要执行
#chmod 400 .erlang.cookie
#chown rabbitmq.rabbitmq .erlang.cookie

官方在介绍集群的文档中提到过.erlang.cookie 一般会存在这两个地址第一个是home/.erlang.cookie第二个地方就是/var/lib/rabbitmq/.erlang.cookie。如果我们使用解压缩方式安装部署的rabbitmq那么这个文件会在{home}目录下,也就是$home/.erlang.cookie。如果我们使用rpm等安装包方式进行安装的那么这个文件会在/var/lib/rabbitmq目录下

[root@rabbitmq-1 ~]# cat /var/lib/rabbitmq/.erlang.cookie
HOUCUGJDZYTFZDSWXTHJ
⽤scp的⽅式将rabbitmq-1节点的.erlang.cookie的值复制到其他两个节点中。
[root@rabbitmq-1 ~]# scp /var/lib/rabbitmq/.erlang.cookie root@192.168.50.139:/var/lib/rabbitmq/
[root@rabbitmq-1 ~]# scp /var/lib/rabbitmq/.erlang.cookie root@192.168.50.140:/var/lib/rabbitmq/

将mq-2、mq-3作为内存节点加⼊mq-1节点集群中

[root@rabbitmq2 ~]# systemctl restart rabbitmq-server
[root@rabbitmq-2 ~]# rabbitmqctl stop_app  #停止节点,切记不是停止服务
[root@rabbitmq-2 ~]# rabbitmqctl reset   #如果有数据需要重置,没有则不用
[root@rabbitmq-2 ~]# rabbitmqctl join_cluster --ram rabbit@rabbitmq-1  
Clustering node 'rabbit@rabbitmq-2' with 'rabbit@rabbitmq-1' ...
[root@rabbitmq-2 ~]# rabbitmqctl start_app  #启动节点
Starting node 'rabbit@rabbitmq-2' ...

默认rabbitmq启动后是磁盘节点在这个cluster命令下mq-2和mq-3是内存节点mq-1是磁盘节点

如果要使mq-2、mq-3都是磁盘节点去掉--ram参数即可

如果想要更改节点类型可以使⽤命令rabbitmqctl change_cluster_node_type,disc(ram),前提是必须停掉rabbit应⽤

如果有需要使用磁盘节点加入集群

 [root@rabbitmq-2 ~]# rabbitmqctl join_cluster  rabbit@rabbitmq-1
 [root@rabbitmq-3 ~]# rabbitmqctl join_cluster  rabbit@rabbitmq-1

查看集群状态

在 RabbitMQ 集群任意节点上执行 rabbitmqctl cluster_status来查看是否集群配置成功

在mq-1磁盘节点上面查看

[root@rabbitmq-1 ~]# rabbitmqctl cluster_status

image-20230307205356851

登录rabbitmq web管理控制台

image-20230307205441863

根据界⾯提示创建⼀条队列

image-20230307205509008

image-20230307205523905

注意:

在RabbitMQ集群中必须⾄少有⼀个磁盘节点否则队列元数据⽆法写⼊到集群中当磁盘节点宕掉时集群将⽆法写⼊新的队列元数据信息

部署RabbitMQ镜像配置

上面已经完成RabbitMQ默认集群模式但并不保证队列的高可用性队列内容不会复制。如果队列节点宕机直接导致该队列无法应用只能等待重启所以要想在队列节点宕机或故障也能正常应用就要复制队列内容到集群里的每个节点必须要创建镜像队列镜像队列是基于普通的集群模式的

创建镜像集群:三台机器相同操作

rabbitmq set_policy :设置策略
[root@rabbitmq-1 ~]#rabbitmqctl set_policy  ha-all "^" '{"ha-mode":"all"}'
Setting policy "ha-all" for pattern "^" to "{"ha-mode":"all"}" with priority "0" for vhost "/" ...

[root@rabbitmq-2 ~]# rabbitmqctl set_policy  ha-all "^" '{"ha-mode":"all"}'
Setting policy "ha-all" for pattern "^" to "{"ha-mode":"all"}" with priority "0" for vhost "/" ...

[root@rabbitmq-3 ~]# rabbitmqctl set_policy  ha-all "^" '{"ha-mode":"all"}'
Setting policy "ha-all" for pattern "^" to "{"ha-mode":"all"}" with priority "0" for vhost "/" ...

image-20230307205623465

注意

"^"匹配所有的队列, ha-all 策略名称为ha-all, '{"ha-mode":"all"}' 策略模式为 all 即复制到所有节点,包含新增节点

设置策略介绍

rabbitmqctl set_policy [-p Vhost] Name Pattern Definition
-p Vhost 可选参数针对指定vhost下的queue进行设置
Name: policy(策略)的名称
Pattern: queue的匹配模式(正则表达式),也就是说会匹配一组。
Definition镜像定义包括三个部分ha-mode, ha-params, ha-sync-mode
    ha-mode:指明镜像队列的模式,有效值为 all/exactly/nodes
        all表示在集群中所有的节点上进行镜像
        exactly表示在指定个数的节点上进行镜像节点的个数由ha-params指定
        nodes表示在指定的节点上进行镜像节点名称通过ha-params指定
    ha-paramsha-mode模式需要用到的参数
    ha-sync-mode进行队列中消息的同步方式有效值为automatic(自动)和manual(手动)
案例:
例如对队列名称以hello开头的所有队列进行镜像并在集群的两个节点上完成镜像policy的设置命令为
rabbitmqctl set_policy hello-ha "^hello" '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'
rabbitmqctl set_policy hello-ha "^hello" '{"ha-mode":"nodes","ha-params":["rabbit@rabbitmq-2"],"ha-sync-mode":"automatic"}'

则此时镜像队列设置成功 已经部署完成 将所有队列设置为镜像队列,即队列会被复制到各个节点,各个节点状态保持一致