消息队列

------ **作者:行癫(盗版必究)** ## 一:消息队列简介 #### 1.消息队列概念 ​ 消息队列(Message Queue),是分布式系统中重要的组件,其通用的使用场景可以简单地描述为:当不需要立即获得结果,但是并发量又需要进行控制的时候,差不多就是需要使用消息队列的时候 ​ 消息队列主要解决了应用耦合、异步处理、流量削锋等问题 ​ 当前使用较多的消息队列有RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaMq等,而部分数据库如Redis、Mysql以及phxsql也可实现消息队列的功能 #### 2.消息队列使用场景 1.应用解耦:将应用进行解耦 ​ 具体场景:用户下单后,订单系统需要通知库存系统 ​ 传统做法(传统模式的缺点:假如库存系统无法访问,则订单减库存将失败,从而导致订单失败,订单系统与库存系统耦合) image-20230306105904655 ​ 使用消息队列 image-20230306110107306 ​ 订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功 ​ 库存系统:订阅下单的消息,采用拉/推的方式,获取下单信息,库存系统根据下单信息,进行库存操作 ​ 假如:在下单时库存系统不能正常使用。也不影响正常下单,因为下单后,订单系统写入消息队列就不再关心其他的后续操作了。实现订单系统与库存系统的应用解耦 2.异步处理:多应用对消息队列中同一消息进行处理,应用间并发处理消息,相比串行处理,减少处理时间 ​ 具体场景:用户为了使用某个应用,进行注册,系统需要发送注册邮件并验证短信 1)串行方式:新注册信息生成后,先发送注册邮件,再发送验证短信 image-20230306220448414 2)并行处理:新注册信息写入后,由发短信和发邮件并行处理 image-20230306220509834 3)若使用消息队列:在写入消息队列后立即返回成功给客户端,则总的响应时间依赖于写入消息队列的时间,而写入消息队列的时间本身是可以很快的,基本可以忽略不计,因此总的处理时间相比串行提高了2倍,相比并行提高了一倍 image-20230306220604601 3.限流削峰:广泛应用于秒杀或抢购活动中,避免流量过大导致应用系统挂掉的情况 ​ 具体场景:秒杀活动,一般会因为流量过大,导致流量暴增,应用挂掉。为解决这个问题,一般需要在应用前端加入消息队列 image-20230306220642886 1)作用 ​ (1)可以控制活动的人数 ​ (2)可以缓解短时间内高流量压垮应用 2)用户的请求,服务器接收后,首先写入消息队列。假如消息队列长度超过最大数量,则直接抛弃用户请求或跳转到错误页面 3)秒杀业务根据消息队列中的请求信息,再做后续处理 4.日志处理:日志处理是指将消息队列用在日志处理中,比如Kafka的应用,解决大量日志传输的问题 image-20230306220749942 ​ 1)日志采集客户端,负责日志数据采集,定时写受写入Kafka队列 ​ 2)Kafka消息队列,负责日志数据的接收,存储和转发 ​ 3)日志处理应用:订阅并消费kafka队列中的日志数据 5.消息通讯:消息通讯是指,消息队列一般都内置了高效的通信机制,因此也可以用在纯的消息通讯 ​ 1)点对点通讯(客户端A和客户端B使用同一队列,进行消息通讯) image-20230306220849654 ​ 2)聊天时通讯(客户端A,客户端B,客户端N订阅同一主题,进行消息发布和接收。实现类似聊天室效果) image-20230306220906078 #### 3.消息队列模式 **点对点模式** ​ 每个消息只有一个接收者(Consumer)(即一旦被消费,消息就不再在消息队列中) ​ 发送者和接收者间没有依赖性,发送者发送消息之后,不管有没有接收者在运行,都不会影响到发送者下次发送消息 ​ 接收者在成功接收消息之后需向队列应答成功,以便消息队列删除当前接收的消息 **发布/订阅模式** ​ 每个消息可以有多个订阅者 ​ 发布者和订阅者之间有时间上的依赖性。针对某个主题(Topic)的订阅者,它必须创建一个订阅者之后,才能消费发布者的消息 ​ 为了消费消息,订阅者需要提前订阅该角色主题,并保持在线运行 #### 4.常见的消息队列 RabbitMQ: ​ RabbitMQ 2007年发布,是一个在AMQP(高级消息队列协议)基础上完成的,可复用的企业消息系统,是当前最主流的消息中间件之一 ​ 主要特性 ​ 可靠性: 提供了多种技术可以让你在性能和可靠性之间进行权衡。这些技术包括持久性机制、投递确认、发布者证实和高可用性机制 ​ 灵活的路由: 消息在到达队列前是通过交换机进行路由的 ​ 消息集群:在相同局域网中的多个RabbitMQ服务器可以聚合在一起,作为一个独立的逻辑代理来使用 ​ 队列高可用:队列可以在集群中的机器上进行镜像,以确保在硬件问题下还保证消息安全 ​ 多种协议的支持:支持多种消息队列协议 ​ 服务器端用Erlang语言编写,支持只要是你能想到的所有编程语言 ​ 管理界面: RabbitMQ有一个易用的用户界面,使得用户可以监控和管理消息Broker的许多方面 ​ 跟踪机制:如果消息异常,RabbitMQ提供消息跟踪机制,使用者可以找出发生了什么 ​ 插件机制:提供了许多插件,来从多方面进行扩展,也可以编写自己的插件 ​ 优点 ​ 由于erlang语言的特性,mq 性能较好,高并发 ​ 健壮、稳定、易用、跨平台、支持多种语言、文档齐全 ​ 有消息确认机制和持久化机制,可靠性高 ​ 高度可定制的路由 ​ 管理界面较丰富,在互联网公司也有较大规模的应用 ​ 缺点 ​ 尽管结合erlang语言本身的并发优势,性能较好,但是不利于做二次开发和维护 ​ 实现了代理架构,意味着消息在发送到客户端之前可以在中央节点上排队;使得其运行速度较慢,消息封装后也比较大 ​ 需要学习比较复杂的接口和协议,学习和维护成本较高 ActiveMQ ​ ActiveMQ是由Apache出品,ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现。它非常快速,支持多种语言的客户端和协议,而且可以非常容易的嵌入到企业的应用环境中,并有许多高级功能 RocketMQ ​ RocketMQ出自阿里公司的开源产品,用 Java 语言实现,在设计时参考了 Kafka,并做出了自己的一些改进,消息可靠性上比 Kafka 更好。RocketMQ在阿里集团被广泛应用在订单,交易,充值,流计算,消息推送,日志流式处理,binglog分发等场景 kafka ​ Apache Kafka是一个分布式消息发布订阅系统。它最初由LinkedIn公司基于独特的设计实现为一个分布式的提交日志系统( a distributed commit log),之后成为Apache项目的一部分。Kafka系统快速、可扩展并且可持久化。它的分区特性,可复制和可容错都是其不错的特性 总结: ​ Kafka在于分布式架构,RabbitMQ基于AMQP协议来实现,RocketMQ/思路来源于kafka,改成了主从结构,在事务性可靠性方面做了优化。广泛来说,电商、金融等对事务性要求很高的,可以考虑RabbitMQ和RocketMQ,对性能要求高的可考虑Kafka ## 二:安装RabbitMq #### 1.安装erllang ```shell [root@xingdian ~]# wget --content-disposition https://packagecloud.io/rabbitmq/erlang/packages/el/7/erlang-23.3.4.11-1.el7.x86_64.rpm/download.rpm [root@xingdian ~]# yum install erlang-23.3.4.11-1.el7.x86_64.rpm -y ``` 2.安装RabbitMq ```shell [root@xingdian ~]# wget --content-disposition https://packagecloud.io/rabbitmq/rabbitmq-server/packages/el/7/rabbitmq-server-3.10.0-1.el7.noarch.rpm/download.rpm [root@xingdian ~]# yum -y install rabbitmq-server-3.10.0-1.el7.noarch.rpm ``` #### 3.启动服务 ```shell [root@xingdian ~]# systemctl start rabbitmq-server ``` #### 4.配置远程访问 ​ 注意:需要手动上传配置文件到指定目录下 ​ 下载地址:https://xingdian-file.oss-cn-hangzhou.aliyuncs.com/rabbitmq.conf ```shell [root@xingdian ~]# vim /etc/rabbitmq/rabbitmq.conf loopback_users.guest = false ``` #### 5.开启WEB界面 ```shell [root@xingdian ~]# rabbitmq-plugins enable rabbitmq_management [root@xingdian ~]# systemctl restart rabbitmq-server ``` #### 6.浏览器访问 ​ 用户名:guest 密码:guest ![image-20230307193304467](https://xingdian-image.oss-cn-beijing.aliyuncs.com/xingdian-image/image-20230307193304467.png) #### 7.用户管理 ![image-20230307193506397](https://xingdian-image.oss-cn-beijing.aliyuncs.com/xingdian-image/image-20230307193506397.png) 超级管理员(administrator) ​ 可登陆管理控制台,可查看所有的信息,并且可以对用户,策略(policy)进行操作 监控者(monitoring) ​ 可登陆管理控制台,同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等) 策略制定者(policymaker) ​ 可登陆管理控制台, 同时可以对policy进行管理。但无法查看节点的相关信息 普通管理者(management) ​ 仅可登陆管理控制台,无法看到节点信息,也无法对策略进行管理 其他 ​ 无法登陆管理控制台,通常就是普通的生产者和消费者 #### 8.Channels信号 ​ 信道是生产消费者与rabbit通信的渠道,生产者publish或者消费者消费一个队列都是需要通过信道来通信的 ​ 信道是建立在TCP上面的虚拟链接,也就是rabbitMQ在一个TCP上面建立成百上千的信道来达到多个线程处理 注意: ​ 为什么RabbitMQ 需要信道,如果直接进行TCP通信呢? ​ TCP的创建开销很大,创建需要三次握手,销毁需要四次握手 ​ 如果不使用信道,那么引用程序就会使用TCP方式进行连接到RabbitMQ,因为MQ可能每秒会进行成千上万的链接 ​ 总之就是TCP消耗资源 ![image-20230307200529779](https://xingdian-image.oss-cn-beijing.aliyuncs.com/xingdian-image/image-20230307200529779.png) ​ 使用rabbitmq时不管是消费还是生产都需要创建信道(channel) 和connection(连接);连接是连接到RabbitMQ的服务器 #### 9.Exchanges交换机 介绍 ​ 生产者只能将消息发送到交换机(exchange),交换机工作的内容非常简单,一方面它接收来自生产者的消息,另一方面将它们推入队列;交换机必须确切知道如何处理收到的消息;是应该把这些消息放到特定队列还是说把他们到许多队列中还是说应该丢弃它们;这就的由交换机的类型来决定 类型 ​ 直接(direct) ​ 主题(topic) ​ 标题(headers) ​ 扇出(fanout) 绑定(bindings) ​ binding 其实是 exchange 和 queue 之间的桥梁,它告诉我们 exchange 和那个队列进行了绑定关系( X 与 Q1 和 Q2 进行了绑定) image-20230307201201420 扇出(fanout) ​ 它是将接收到的所有消息广播到它知道的所有队列中 image-20230307201424965 直接(direct) ​ exchange在和queue进行binding时会设置routingkey ```erlang channel.QueueBind(queue: "create_pdf_queue", exchange: "pdf_events", routingKey: "pdf_create", arguments: null); ``` ​ 将消息发送到exchange时会设置对应的routingkey ```erlang channel.BasicPublish(exchange: "pdf_events", routingKey: "pdf_create", basicProperties: properties, body: body); ``` ​ 在direct类型的exchange中,只有这两个routingkey完全相同,exchange才会选择对应的binging进行消息路由 image-20230307201700721 主题(topic) ​ 此类型exchange和上面的direct类型差不多,但direct类型要求routingkey完全相等,这里的routingkey可以有通配符:'‘,’#‘. ​ 其中’'表示匹配一个单词, '#'则表示匹配没有或者多个单词 image-20230307201805873 ​ 第一个binding ```erlang exchange: agreements queue A: berlin_agreements binding routingkey: agreements.eu.berlin.# ``` ​ 第二个binding ```erlang exchange: agreements queue B: all_agreements binding routingkey: agreements.# ``` ​ 第三个binding ```erlang exchange: agreements queue c: headstore_agreements binding routingkey: agreements.eu.*.headstore ``` ​ 所以如果我们消息的routingkey为agreements.eu.berlin那么符合第一和第二个binding,但最后一个不符合 标题(headers) ​ 不处理路由键。而是根据发送的消息内容中的headers属性进行匹配;在绑定Queue与Exchange时指定一组键值对;当消息发送到RabbitMQ时会取到该消息的headers与Exchange绑定时指定的键值对进行匹配;如果完全匹配则消息会路由到该队列,否则不会路由到该队列 ​ 匹配规则x-match有下列两种类型 ​ x-match = all :表示所有的键值对都匹配才能接受到消息 ​ x-match = any :表示只要有键值对匹配就能接受到消息 #### 10.Queues队列 image-20230307203729529 ​ name: 队列名称 ​ durable: 队列是否持久化;队列默认是存放到内存中的,rabbitmq重启则丢失,保存到Erlang自带的Mnesia数据库中可持久存储 ​ exclusive:是否排他的队列 ​ 当连接关闭时connection.close()该队列是否会自动删除 ​ 设置队列是否是私有的,如果非排他(false)的,可以使用两个消费者都访问同一个队列,没有任何问题,如果是排他的,会对当前队列加锁,其他通道channel是不能访问的,如果强制访问会报异常 ```shell com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method(reply-code=405, reply-text=RESOURCE_LOCKED - cannot obtain exclusive access to locked queue ‘queue_name’ in vhost ‘/’, class-id=50, method-id=20 ``` ​ autoDelete:是否自动删除,当最后一个消费者断开连接之后队列是否自动被删除;当consumers = 0时队列就会自动删除 ​ arguments: 队列中的消息什么时候会自动被删除 ​ Message TTL(x-message-ttl):设置队列中的所有消息的生存周期;单位毫秒;生存时间到了,消息会被从队里中删除 ​ Auto Expire(x-expires): 当队列在指定的时间没有被访问(consume, basicGet, queueDeclare…)就会被删除,Features=Exp ​ Max Length(x-max-length): 限定队列的消息的最大值长度,超过指定长度将会把最早的几条删除掉,Feature=Lim ​ Max Length Bytes(x-max-length-bytes): 限定队列最大占用的空间大小, 一般受限于内存、磁盘的大小, Features=Lim B ​ Dead letter exchange(x-dead-letter-exchange): 当队列消息长度大于最大长度、或者过期的等,将从队列中删除的消息推送到指定的交换机中去而不是丢弃掉,Features=DLX ​ Dead letter routing key(x-dead-letter-routing-key):将删除的消息推送到指定交换机的指定路由键的队列中去, Feature=DLK ​ Maximum priority(x-max-priority):优先级队列,声明队列时先定义最大优先级值(定义最大值一般不要太大),在发布消息的时候指定该消息的优先级, 优先级更高(数值更大的)的消息先被消费 ​ Lazy mode(x-queue-mode=lazy): Lazy Queues: 先将消息保存到磁盘上,不放在内存中,当消费者开始消费的时候才加载到内存中,Master locator(x-queue-master-locator) ## 三:RabbitMq案例 #### 1.部署新服务器 ​ 准备python3环境 ​ 准备数据库 ​ 密码:123456 ​ 库名:pydb ​ 用户:root ​ 库名:user ​ 字符集:utf8 ​ 表名:name,age,height #### 2.项目地址 ​ https://xingdian-file.oss-cn-hangzhou.aliyuncs.com/py-rabbitmq/receive.py ​ https://xingdian-file.oss-cn-hangzhou.aliyuncs.com/py-rabbitmq/send.py ​ 注意修改程序信息,先运行send.py在运行receive.py ## 四:RabbitMq集群 #### 1.原理介绍 ​ RabbitMQ是依据erlang的分布式特性(RabbitMQ底层是通过Erlang架构来实现的,所以rabbitmqctl会启动Erlang节点,并基于Erlang节点来使用Erlang系统连接RabbitMQ节点,在连接过程中需要正确的Erlang Cookie和节点名称,Erlang节点通过交换Erlang Cookie以获得认证)来实现的,所以部署Rabbitmq分布式集群时要先安装Erlang,并把其中一个服务的cookie复制到另外的节点 ​ RabbitMQ集群中,各个RabbitMQ为对等节点,即每个节点均提供给客户端连接,进行消息的接收和发送。节点分为内存节点和磁盘节点,一般的,均应建立为磁盘节点,为了防止机器重启后的消息消失 ​ RabbitMQ的Cluster集群模式一般分为两种,普通模式和镜像模式。消息队列通过RabbitMQ HA镜像队列进行消息队列实体复制 ​ 普通模式下,以两个节点(rabbit01、rabbit02)为例来进行说明。对于Queue来说,消息实体只存在于其中一个节点rabbit01(或者rabbit02),rabbit01和rabbit02两个节点仅有相同的元数据,即队列的结构。当消息进入rabbit01节点的Queue后,consumer从rabbit02节点消费时,RabbitMQ会临时在rabbit01、rabbit02间进行消息传输,把A中的消息实体取出并经过B发送给consumer。所以consumer应尽量连接每一个节点,从中取消息。即对于同一个逻辑队列,要在多个节点建立物理Queue。否则无论consumer连rabbit01或rabbit02,出口总在rabbit01,会产生瓶颈 ​ 镜像模式下,将需要消费的队列变为镜像队列,存在于多个节点,这样就可以实现RabbitMQ的HA高可用性。作用就是消息实体会主动在镜像节点之间实现同步,而不是像普通模式那样,在consumer消费数据时临时读取。缺点就是,集群内部的同步通讯会占用大量的网络带宽 #### 2.集群部署 ​ 单一模式:即单机情况不做集群,就单独运行一个 rabbitmq 而已 ​ 普通模式:默认模式,以两个节点(rabbit01、rabbit02)为例来进行说明。对于 Queue 来说,消息实体只存在于其中一个节点 rabbit01(或者 rabbit02),rabbit01 和 rabbit02 两个节点仅有相同的元数据,即队列的结构。当消息进入 rabbit01 节点的 Queue 后,consumer 从 rabbit02 节点消费时,RabbitMQ 会临时在 rabbit01、rabbit02 间进行消息传输,把 A 中的消息实体取出并经过 B 发送给 consumer。所以 consumer 应尽量连接每一个节点,从中取消息。即对于同一个逻辑队列,要在多个节点建立物理 Queue。否则无论 consumer 连 rabbit01 或 rabbit02,出口总在 rabbit01,会产生瓶颈。当 rabbit01 节点故障后,rabbit02 节点无法取到 rabbit01 节点中还未消费的消息实体。如果做了消息持久化,那么得等 rabbit01 节点恢复,然后才可被消费;如果没有持久化的话,就会产生消息丢失的现象 ​ 镜像模式: 把需要的队列做成镜像队列,存在与多个节点属于 RabbitMQ 的 HA 方案。该模式解决了普通模式中的问题,其实质和普通模式不同之处在于,消息实体会主动在镜像节点间同步,而不是在客户端取数据时临时拉取。该模式带来的副作用也很明显,除了降低系统性能外,如果镜像队列数量过多,加之大量的消息进入,集群内部的网络带宽将会被这种同步通讯大大消耗掉。所以在对可靠性要求较高的场合中适用 环境准备 ```shell 3台centos7操作系统,ip分别为: 192.168.120.138 192.168.120.139 192.168.120.140 ``` 本地解析 ```shell [root@rabbitmq1 ~]# vim /etc/hosts 192.168.120.138 rabbitmq1 192.168.120.139 rabbitmq2 192.168.120.140 rabbitmq3 ``` 保证 3 台能 ping 通 ```shell [root@rabbitmq1 ~]# ping rabbitmq1 PING 192.168.31.154 (192.168.31.154) 56(84) bytes of data. 64 bytes from 192.168.31.154: icmp_seq=1 ttl=64 time=0.025 ms 64 bytes from 192.168.31.154: icmp_seq=2 ttl=64 time=0.043 ms 64 bytes from 192.168.31.154: icmp_seq=3 ttl=64 time=0.042 ms ^C --- 192.168.31.154 ping statistics --- 3 packets transmitted, 3 received, 0% packet loss, time 1999ms rtt min/avg/max/mdev = 0.025/0.036/0.043/0.010 ms [root@rabbitmq1 ~]# ping rabbitmq2 PING 192.168.31.155 (192.168.31.155) 56(84) bytes of data. 64 bytes from 192.168.31.155: icmp_seq=1 ttl=64 time=0.742 ms 64 bytes from 192.168.31.155: icmp_seq=2 ttl=64 time=0.343 ms ^C --- 192.168.31.155 ping statistics --- 2 packets transmitted, 2 received, 0% packet loss, time 1000ms rtt min/avg/max/mdev = 0.343/0.542/0.742/0.200 ms [root@rabbitmq1 ~]# ping rabbitmq3 PING 192.168.31.156 (192.168.31.156) 56(84) bytes of data. 64 bytes from 192.168.31.156: icmp_seq=1 ttl=64 time=1.15 ms 64 bytes from 192.168.31.156: icmp_seq=2 ttl=64 time=0.380 ms ^C --- 192.168.31.156 ping statistics --- 2 packets transmitted, 2 received, 0% packet loss, time 1001ms rtt min/avg/max/mdev = 0.380/0.768/1.157/0.389 ms ``` 安装erlang和socat ```shell [root@xingdian-server-1 /]# wget --content-disposition https://packagecloud.io/rabbitmq/erlang/packages/el/7/erlang-23.3.4.11-1.el7.x86_64.rpm/download.rpm [root@xingdian-server-1 /]# yum install erlang-23.3.4.11-1.el7.x86_64.rpm -y [root@rabbitmq1 ~]# yum install -y socat ``` 安装rabbitmq ```shell [root@xingdian-server-1 /]# wget --content-disposition https://packagecloud.io/rabbitmq/rabbitmq-server/packages/el/7/rabbitmq-server-3.10.0-1.el7.noarch.rpm/download.rpm [root@xingdian-server-1 /]# yum -y install rabbitmq-server-3.10.0-1.el7.noarch.rpm ``` 开启用户远程登录 ```shell [root@rabbitmq-1 rabbitmq]# vim rabbitmq.conf loopback_users.guest = false ``` rabbitmq 常用命令 ```shell [root@rabbitmq1 ~]# systemctl start rabbitmq-server.service 每台都操作开启rabbitmq的web访问界面: [root@rabbitmq-1 ~]# rabbitmq-plugins enable rabbitmq_management ``` 账号配置 ​ 安装启动后其实还不能在其它机器访问,rabbitmq 默认的 guest 账号只能在本地机器访问, 如果想在其它机器访问需配置其它账号 ```shell 4.创建用户: 注意:在一台机器操作 添加用户和密码 [root@rabbitmq-1 ~]# rabbitmqctl add_user soho soso Creating user "soho" ... ...done. 这是为管理员 [root@rabbitmq-1 ~]# rabbitmqctl set_user_tags soho administrator Setting tags for user "soho" to [administrator] ... ...done. 查看用户 [root@rabbitmq-1 ~]# rabbitmqctl list_users Listing users ... guest [administrator] soho [administrator] ...done. ``` 设置权限 ​ 此处设置权限时注意'.*'之间需要有空格 三个'.*'分别代表了conf权限,read权限与write权限 例如:当没有给soho设置这三个权限前是没有权限查询队列,在ui界面也看不见 ```shell [root@rabbitmq-1 ~]# rabbitmqctl set_permissions -p "/" soho ".*" ".*" ".*" Setting permissions for user "soho" in vhost "/" ... ...done. ``` 查看端口 ​ 4369 -- erlang发现口 ​ 5672 --程序连接端口 ​ 15672 -- 管理界面ui端口 ​ 25672 -- server间内部通信口 集群部署 ```shell 1.首先创建好数据存放目录和日志存放目录: [root@rabbitmq-1 ~]# mkdir -p /data/rabbitmq/data [root@rabbitmq-1 ~]# mkdir -p /data/rabbitmq/logs [root@rabbitmq-1 ~]# chmod 777 -R /data/rabbitmq [root@rabbitmq-1 ~]# chown rabbitmq.rabbitmq /data/ -R 创建配置文件: [root@rabbitmq-1 ~]# vim /etc/rabbitmq/rabbitmq-env.conf [root@rabbitmq-1 ~]# cat /etc/rabbitmq/rabbitmq-env.conf RABBITMQ_MNESIA_BASE=/data/rabbitmq/data RABBITMQ_LOG_BASE=/data/rabbitmq/logs 重启服务 [root@rabbitmq-1 ~]# systemctl restart rabbitmq-server ``` 拷⻉erlang.cookie ​ Rabbitmq的集群是依附于erlang的集群来⼯作的,所以必须先构建起erlang的集群景象。Erlang的集群中各节点是经由⼀个cookie来实现的,这个cookie存放在/var/lib/rabbitmq/.erlang.cookie中,⽂件是400的权限。所以必须保证各节点cookie⼀致,不然节点之间就⽆法通信 ```shell #如果执行# rabbitmqctl stop_app 这条命令报错:需要执行 #chmod 400 .erlang.cookie #chown rabbitmq.rabbitmq .erlang.cookie ``` ​ 官方在介绍集群的文档中提到过.erlang.cookie 一般会存在这两个地址:第一个是home/.erlang.cookie;第二个地方就是/var/lib/rabbitmq/.erlang.cookie。如果我们使用解压缩方式安装部署的rabbitmq,那么这个文件会在{home}目录下,也就是$home/.erlang.cookie。如果我们使用rpm等安装包方式进行安装的,那么这个文件会在/var/lib/rabbitmq目录下 ```shell [root@rabbitmq-1 ~]# cat /var/lib/rabbitmq/.erlang.cookie HOUCUGJDZYTFZDSWXTHJ ⽤scp的⽅式将rabbitmq-1节点的.erlang.cookie的值复制到其他两个节点中。 [root@rabbitmq-1 ~]# scp /var/lib/rabbitmq/.erlang.cookie root@192.168.50.139:/var/lib/rabbitmq/ [root@rabbitmq-1 ~]# scp /var/lib/rabbitmq/.erlang.cookie root@192.168.50.140:/var/lib/rabbitmq/ ``` 将mq-2、mq-3作为内存节点加⼊mq-1节点集群中 ```shell [root@rabbitmq2 ~]# systemctl restart rabbitmq-server [root@rabbitmq-2 ~]# rabbitmqctl stop_app #停止节点,切记不是停止服务 [root@rabbitmq-2 ~]# rabbitmqctl reset #如果有数据需要重置,没有则不用 [root@rabbitmq-2 ~]# rabbitmqctl join_cluster --ram rabbit@rabbitmq-1 Clustering node 'rabbit@rabbitmq-2' with 'rabbit@rabbitmq-1' ... [root@rabbitmq-2 ~]# rabbitmqctl start_app #启动节点 Starting node 'rabbit@rabbitmq-2' ... ``` ​ 默认rabbitmq启动后是磁盘节点,在这个cluster命令下,mq-2和mq-3是内存节点,mq-1是磁盘节点 ​ 如果要使mq-2、mq-3都是磁盘节点,去掉--ram参数即可 ​ 如果想要更改节点类型,可以使⽤命令rabbitmqctl change_cluster_node_type,disc(ram),前提是必须停掉rabbit应⽤ 如果有需要使用磁盘节点加入集群 ```shell [root@rabbitmq-2 ~]# rabbitmqctl join_cluster rabbit@rabbitmq-1 [root@rabbitmq-3 ~]# rabbitmqctl join_cluster rabbit@rabbitmq-1 ``` 查看集群状态 ​ 在 RabbitMQ 集群任意节点上执行 rabbitmqctl cluster_status来查看是否集群配置成功 ​ 在mq-1磁盘节点上面查看 ```shell [root@rabbitmq-1 ~]# rabbitmqctl cluster_status ``` ![image-20230307205356851](https://xingdian-image.oss-cn-beijing.aliyuncs.com/xingdian-image/image-20230307205356851.png) 登录rabbitmq web管理控制台 ![image-20230307205441863](https://xingdian-image.oss-cn-beijing.aliyuncs.com/xingdian-image/image-20230307205441863.png) 根据界⾯提示创建⼀条队列 ![image-20230307205509008](https://xingdian-image.oss-cn-beijing.aliyuncs.com/xingdian-image/image-20230307205509008.png) ![image-20230307205523905](https://xingdian-image.oss-cn-beijing.aliyuncs.com/xingdian-image/image-20230307205523905.png) 注意: ​ 在RabbitMQ集群中,必须⾄少有⼀个磁盘节点,否则队列元数据⽆法写⼊到集群中,当磁盘节点宕掉时,集群将⽆法写⼊新的队列元数据信息 部署RabbitMQ镜像配置 ​ 上面已经完成RabbitMQ默认集群模式,但并不保证队列的高可用性,队列内容不会复制。如果队列节点宕机直接导致该队列无法应用,只能等待重启,所以要想在队列节点宕机或故障也能正常应用,就要复制队列内容到集群里的每个节点,必须要创建镜像队列;镜像队列是基于普通的集群模式的 创建镜像集群:三台机器相同操作 ```shell rabbitmq set_policy :设置策略 [root@rabbitmq-1 ~]#rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}' Setting policy "ha-all" for pattern "^" to "{"ha-mode":"all"}" with priority "0" for vhost "/" ... [root@rabbitmq-2 ~]# rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}' Setting policy "ha-all" for pattern "^" to "{"ha-mode":"all"}" with priority "0" for vhost "/" ... [root@rabbitmq-3 ~]# rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}' Setting policy "ha-all" for pattern "^" to "{"ha-mode":"all"}" with priority "0" for vhost "/" ... ``` ![image-20230307205623465](https://xingdian-image.oss-cn-beijing.aliyuncs.com/xingdian-image/image-20230307205623465.png) 注意 ​ "^"匹配所有的队列, ha-all 策略名称为ha-all, '{"ha-mode":"all"}' 策略模式为 all 即复制到所有节点,包含新增节点 设置策略介绍 ```shell rabbitmqctl set_policy [-p Vhost] Name Pattern Definition -p Vhost: 可选参数,针对指定vhost下的queue进行设置 Name: policy(策略)的名称 Pattern: queue的匹配模式(正则表达式),也就是说会匹配一组。 Definition:镜像定义,包括三个部分ha-mode, ha-params, ha-sync-mode ha-mode:指明镜像队列的模式,有效值为 all/exactly/nodes all:表示在集群中所有的节点上进行镜像 exactly:表示在指定个数的节点上进行镜像,节点的个数由ha-params指定 nodes:表示在指定的节点上进行镜像,节点名称通过ha-params指定 ha-params:ha-mode模式需要用到的参数 ha-sync-mode:进行队列中消息的同步方式,有效值为automatic(自动)和manual(手动) 案例: 例如,对队列名称以hello开头的所有队列进行镜像,并在集群的两个节点上完成镜像,policy的设置命令为: rabbitmqctl set_policy hello-ha "^hello" '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}' rabbitmqctl set_policy hello-ha "^hello" '{"ha-mode":"nodes","ha-params":["rabbit@rabbitmq-2"],"ha-sync-mode":"automatic"}' ``` 则此时镜像队列设置成功 已经部署完成 将所有队列设置为镜像队列,即队列会被复制到各个节点,各个节点状态保持一致