消息队列之 RabbitMQ-安装

RabbitMQ 安装

一般来说安装 RabbitMQ 之前要安装 Erlang ,可以去Erlang官网下载。接着去RabbitMQ官网下载安装包,之后解压缩即可。根据操作系统不同官网提供了相应的安装说明:WindowsDebian / UbuntuRPM-based LinuxMac

如果是Mac 用户,个人推荐使用 HomeBrew 来安装,安装前要先更新 brew:

brew update

接着安装 rabbitmq 服务器:

brew install rabbitmq

这样 RabbitMQ 就安装好了,安装过程中会自动其所依赖的 Erlang 。

RabbitMQ 运行和管理

  1. 启动
    启动很简单,找到安装后的 RabbitMQ 所在目录下的 sbin 目录,可以看到该目录下有6个以 rabbitmq 开头的可执行文件,直接执行 rabbitmq-server 即可,下面将 RabbitMQ 的安装位置以 . 代替,启动命令就是:
./sbin/rabbitmq-server

启动正常的话会看到一些启动过程信息和最后的 completed with 7 plugins,这也说明启动的时候默认加载了7个插件。


正常启动
  1. 后台启动
    如果想让 RabbitMQ 以守护程序的方式在后台运行,可以在启动的时候加上 -detached 参数:
./sbin/rabbitmq-server -detached
  1. 查询服务器状态
    sbin 目录下有个特别重要的文件叫 rabbitmqctl ,它提供了 RabbitMQ 管理需要的几乎一站式解决方案,绝大部分的运维命令它都可以提供。
    查询 RabbitMQ 服务器的状态信息可以用参数 status :
./sbin/rabbitmqctl status

该命令将输出服务器的很多信息,比如 RabbitMQ 和 Erlang 的版本、OS 名称、内存等等

  1. 关闭 RabbitMQ 节点
    我们知道 RabbitMQ 是用 Erlang 语言写的,在Erlang 中有两个概念:节点和应用程序。节点就是 Erlang 虚拟机的每个实例,而多个 Erlang 应用程序可以运行在同一个节点之上。节点之间可以进行本地通信(不管他们是不是运行在同一台服务器之上)。比如一个运行在节点A上的应用程序可以调用节点B上应用程序的方法,就好像调用本地函数一样。如果应用程序由于某些原因奔溃,Erlang 节点会自动尝试重启应用程序。
    如果要关闭整个 RabbitMQ 节点可以用参数 stop :
./sbin/rabbitmqctl stop

它会和本地节点通信并指示其干净的关闭,也可以指定关闭不同的节点,包括远程节点,只需要传入参数 -n :

./sbin/rabbitmqctl -n rabbit@server.example.com stop 

-n node 默认 node 名称是 rabbit@server ,如果你的主机名是 server.example.com ,那么 node 名称就是 rabbit@server.example.com

  1. 关闭 RabbitMQ 应用程序
    如果只想关闭应用程序,同时保持 Erlang 节点运行则可以用 stop_app:
./sbin/rabbitmqctl stop_app

这个命令在后面要讲的集群模式中将会很有用。

  1. 启动 RabbitMQ 应用程序
./sbin/rabbitmqctl start_app
  1. 重置 RabbitMQ 节点
./sbin/rabbitmqctl reset

该命令将清除所有的队列。

  1. 查看已声明的队列
./sbin/rabbitmqctl list_queues
  1. 查看交换器
./sbin/rabbitmqctl list_exchanges

该命令还可以附加参数,比如列出交换器的名称、类型、是否持久化、是否自动删除:

./sbin/rabbitmqctl list_exchanges name type durable auto_delete
  1. 查看绑定
./sbin/rabbitmqctl list_bindings

Java 客户端访问

RabbitMQ 支持多种语言访问,以 Java 为例看下一般使用 RabbitMQ 的步骤。

  1. maven工程的pom文件中添加依赖
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>4.1.0</version></dependency>
  1. 消息生产者
package org.study.rabbitmq;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;import java.util.concurrent.TimeoutException;public class Producer {    public static void main(String[] args) throws IOException, TimeoutException {        //创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUsername("guest");
        factory.setPassword("guest");        //设置 RabbitMQ 地址
        factory.setHost("localhost");        //建立到代理服务器到连接
        Connection conn = factory.newConnection();        //获得信道
        Channel channel = conn.createChannel();        //声明交换器
        String exchangeName = "hello-exchange";
        channel.exchangeDeclare(exchangeName, "direct", true);

        String routingKey = "hola";        //发布消息
        byte[] messageBodyBytes = "quit".getBytes();
        channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);

        channel.close();
        conn.close();
    }
}
  1. 消息消费者
package org.study.rabbitmq;import com.rabbitmq.client.*;import java.io.IOException;import java.util.concurrent.TimeoutException;public class Consumer {    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setHost("localhost");        //建立到代理服务器到连接
        Connection conn = factory.newConnection();        //获得信道
        final Channel channel = conn.createChannel();        //声明交换器
        String exchangeName = "hello-exchange";
        channel.exchangeDeclare(exchangeName, "direct", true);        //声明队列
        String queueName = channel.queueDeclare().getQueue();
        String routingKey = "hola";        //绑定队列,通过键 hola 将队列和交换器绑定起来
        channel.queueBind(queueName, exchangeName, routingKey);        while(true) {            //消费消息
            boolean autoAck = false;
            String consumerTag = "";
            channel.basicConsume(queueName, autoAck, consumerTag, new DefaultConsumer(channel) {                @Override
                public void handleDelivery(String consumerTag,
                                           Envelope envelope,
                                           AMQP.BasicProperties properties,                                           byte[] body) throws IOException {
                    String routingKey = envelope.getRoutingKey();
                    String contentType = properties.getContentType();
                    System.out.println("消费的路由键:" + routingKey);
                    System.out.println("消费的内容类型:" + contentType);                    long deliveryTag = envelope.getDeliveryTag();                    //确认消息
                    channel.basicAck(deliveryTag, false);
                    System.out.println("消费的消息体内容:");
                    String bodyStr = new String(body, "UTF-8");
                    System.out.println(bodyStr);

                }
            });
        }
    }
}
  1. 启动 RabbitMQ 服务器
./sbin/rabbitmq-server
  1. 运行 Consumer
    先运行 Consumer ,这样当生产者发送消息的时候能在消费者后端看到消息记录。
  2. 运行 Producer
    接着运行 Producer ,发布一条消息,在 Consumer 的控制台能看到接收的消息:

    Consumer 控制台

RabbitMQ 集群

RabbitMQ 最优秀的功能之一就是内建集群,这个功能设计的目的是允许消费者和生产者在节点崩溃的情况下继续运行,以及通过添加更多的节点来线性扩展消息通信吞吐量。RabbitMQ 内部利用 Erlang 提供的分布式通信框架 OTP 来满足上述需求,使客户端在失去一个 RabbitMQ 节点连接的情况下,还是能够重新连接到集群中的任何其他节点继续生产、消费消息。

RabbitMQ 集群中的一些概念

RabbitMQ 会始终记录以下四种类型的内部元数据:

  1. 队列元数据
    包括队列名称和它们的属性,比如是否可持久化,是否自动删除
  2. 交换器元数据
    交换器名称、类型、属性
  3. 绑定元数据
    内部是一张表格记录如何将消息路由到队列
  4. vhost 元数据
    为 vhost 内部的队列、交换器、绑定提供命名空间和安全属性

在单一节点中,RabbitMQ 会将所有这些信息存储在内存中,同时将标记为可持久化的队列、交换器、绑定存储到硬盘上。存到硬盘上可以确保队列和交换器在节点重启后能够重建。而在集群模式下同样也提供两种选择:存到硬盘上(独立节点的默认设置),存在内存中。

如果在集群中创建队列,集群只会在单个节点而不是所有节点上创建完整的队列信息(元数据、状态、内容)。结果是只有队列的所有者节点知道有关队列的所有信息,因此当集群节点崩溃时,该节点的队列和绑定就消失了,并且任何匹配该队列的绑定的新消息也丢失了。还好RabbitMQ 2.6.0之后提供了镜像队列以避免集群节点故障导致的队列内容不可用。

RabbitMQ 集群中可以共享 user、vhost、exchange等,所有的数据和状态都是必须在所有节点上复制的,例外就是上面所说的消息队列。RabbitMQ 节点可以动态的加入到集群中。

当在集群中声明队列、交换器、绑定的时候,这些操作会直到所有集群节点都成功提交元数据变更后才返回。集群中有内存节点和磁盘节点两种类型,内存节点虽然不写入磁盘,但是它的执行比磁盘节点要好。内存节点可以提供出色的性能,磁盘节点能保障配置信息在节点重启后仍然可用,那集群中如何平衡这两者呢?

RabbitMQ 只要求集群中至少有一个磁盘节点,所有其他节点可以是内存节点,当节点加入火离开集群时,它们必须要将该变更通知到至少一个磁盘节点。如果只有一个磁盘节点,刚好又是该节点崩溃了,那么集群可以继续路由消息,但不能创建队列、创建交换器、创建绑定、添加用户、更改权限、添加或删除集群节点。换句话说集群中的唯一磁盘节点崩溃的话,集群仍然可以运行,但知道该节点恢复,否则无法更改任何东西。

RabbitMQ 集群配置和启动

如果是在一台机器上同时启动多个 RabbitMQ 节点来组建集群的话,只用上面介绍的方式启动第二、第三个节点将会因为节点名称和端口冲突导致启动失败。所以在每次调用 rabbitmq-server 命令前,设置环境变量 RABBITMQ_NODENAME 和 RABBITMQ_NODE_PORT 来明确指定唯一的节点名称和端口。下面的例子端口号从5672开始,每个新启动的节点都加1,节点也分别命名为test_rabbit_1、test_rabbit_2、test_rabbit_3。

启动第1个节点:

RABBITMQ_NODENAME=test_rabbit_1 RABBITMQ_NODE_PORT=5672 ./sbin/rabbitmq-server -detached

启动第2个节点:

RABBITMQ_NODENAME=test_rabbit_2 RABBITMQ_NODE_PORT=5673 ./sbin/rabbitmq-server -detached

启动第2个节点前建议将 RabbitMQ 默认激活的插件关掉,否则会存在使用了某个插件的端口号冲突,导致节点启动不成功。

现在第2个节点和第1个节点都是独立节点,它们并不知道其他节点的存在。集群中除第一个节点外后加入的节点需要获取集群中的元数据,所以要先停止 Erlang 节点上运行的 RabbitMQ 应用程序,并重置该节点元数据,再加入并且获取集群的元数据,最后重新启动 RabbitMQ 应用程序。

停止第2个节点的应用程序:

./sbin/rabbitmqctl -n test_rabbit_2 stop_app

重置第2个节点元数据:

./sbin/rabbitmqctl -n test_rabbit_2 reset

第2节点加入第1个节点组成的集群:

./sbin/rabbitmqctl -n test_rabbit_2 join_cluster test_rabbit_1@localhost

启动第2个节点的应用程序

./sbin/rabbitmqctl -n test_rabbit_2 start_app

第3个节点的配置过程和第2个节点类似:

RABBITMQ_NODENAME=test_rabbit_3 RABBITMQ_NODE_PORT=5674 ./sbin/rabbitmq-server -detached

./sbin/rabbitmqctl -n test_rabbit_3 stop_app

./sbin/rabbitmqctl -n test_rabbit_3 reset

./sbin/rabbitmqctl -n test_rabbit_3 join_cluster test_rabbit_1@localhost./sbin/rabbitmqctl -n test_rabbit_3 start_app
RabbitMQ 集群运维

停止某个指定的节点,比如停止第2个节点:

RABBITMQ_NODENAME=test_rabbit_2 ./sbin/rabbitmqctl stop

查看节点3的集群状态:

./sbin/rabbitmqctl -n test_rabbit_3 cluster_status


版权及转载说明

本站原创、转载文章欢迎任何形式的转载,但请务必注明出处,尊重他人劳动共创开源社区

本站转载文章版权归原作者所有,如发现本站文章涉嫌侵权请点击「联系我们」反馈,本站将立即给予删除

转载请注明:文章转载自:全分享社区 「http://www.aweb.cc

本文转载自:https://www.jianshu.com/p/79ca08116d57

本文标题:消息队列之 RabbitMQ-安装

本文地址:http://www.aweb.cc/article/detail/id/682.html

消息队列之 RabbitMQ-概念介绍 <<上一篇 下一篇>>我为什么要选择RabbitMQ ,Rab

200g_v3.jpg