侧边栏壁纸
  • 累计撰写 16 篇文章
  • 累计创建 17 个标签
  • 累计收到 1 条评论
PHP

RabbitMQ-PHP

xiuxiubiu
2018-09-27 / 0 评论 / 0 点赞 / 624 阅读 / 15,077 字 / 正在检测是否收录...

Centos安装RabbitMQ(Yum)

  • 官方安装文档

    Installing on RPM-based Linux (RHEL, CentOS, Fedora, openSUSE)

  • 安装Erlang

  • 安装RabbitMQ Server

    • 添加公钥

      rpm --import https://dl.bintray.com/rabbitmq/Keys/rabbitmq-release-signing-key.asc
      
    • 添加RabbitMQ Server源

      • centos 7:

        # In /etc/yum.repos.d/rabbitmq-server.repo
        [bintray-rabbitmq-server]
        name=bintray-rabbitmq-rpm
        baseurl=https://dl.bintray.com/rabbitmq/rpm/rabbitmq-server/v3.7.x/el/7/
        gpgcheck=0
        repo_gpgcheck=0
        enabled=1
        
      • centos 6:

        # In /etc/yum.repos.d/rabbitmq-server.repo
        [bintray-rabbitmq-server]
        name=bintray-rabbitmq-rpm
        baseurl=https://dl.bintray.com/rabbitmq/rpm/rabbitmq-server/v3.7.x/el/6/
        gpgcheck=0
        repo_gpgcheck=0
        enabled=1
        
    • 安装RabbitMQ Server

      yum -y install rabbitmq-server
      
  • centos开机自启动

    # 启动
    systemctl start rabbitmq-server
    
    # 停止
    systemctl stop rabbitmq-server
    
    # 重启
    systemctl restart rabbitmq-server
    
    # 运行状态
    systemctl status rabbitmq-server
    
    # 开机自启动
    systemctl enable rabbitmq-server
    
    # 关闭开机自启动
    systemctl disable rabbitmq-server
    
    # 自启动状态
    systemctl list-unit-files | grep rabbitmq-server
    

基本的消息发布和消费操作(Hello World

  • 向队列发送消息

    • 引入autoload、创建连接、创建channel。

      require_once __DIR__ . '/vendor/autoload.php';
      
      use PhpAmqpLib\Connection\AMQPStreamConnection;
      use PhpAmqpLib\Message\AMQPMessage;
      
      $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
      $channel = $connection->channel();
      

      channel提供了大多数的API操作方法。

    • 声明消息发布队列。

      $channel->queue_declare('hello', false, false, false);
      

      queue_declare操作是幂等性的,声明多次和一次都只会在队列不存在时才创建队列。

    • 发布消息

      $message = new AMQPMessage('Hello World!');
      $channel->basic_publish($message, '', 'hello');
      
    • 关闭通道和连接

      $channel->close();
      $connection->close();
      
    • send.php 完整代码

  • 从接收消息

    • 引入autoload、创建连接、创建channel。

      require_once __DIR__ . '/vendor/autoload.php';
      
      use PhpAmqpLib\Connection\AMQPStreamConnection;
      
      $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
      $channel = $connection->channel();
      
    • 声明要消费的队列

      $channel->queue_declare('hello', false, false, false);
      

      这里声明消费队列是正确的,因为不能保证消费操作一定在发送操作后才执行,所以声明消费队列保证消费时队列存在。

    • 处理接收到的消息

      $callback = function ($message) {
          echo ' [x] Received ', $message->body, "\n";
      };
      
      $channel->basic_consume('hello', '', false, true, false, false, $callback);
      
    • receive.php 完整代码

  • 总结

    • 发布消息和消费消息时都应该声明队列。

    • 因为queue_declare是幂等性的,并且发布和消费都要保证队列存在。

    • 注意同一个发布和消费,声明队列时参数保持一致。

工作队列(Work Queues

  • 准备工作

    我们没有真实的工作任务,因此用sleep()函数模拟耗时任务操作,接收到的消息中,每个“.”代表一秒任务执行耗时。例如:Hello...表示完成任务操作需要3秒。

    • 发布任务消息

      send.php的基础上做修改,新文件命名为new_task.php。

      $data = implode(' ', array_slice($argv, 1));
      if (empty($data)) {
          $data = "Hello World!";
      }
      $msg = new AMQPMessage($data);
      
      $channel->basic_publish($msg, '', 'hello');
      
      echo ' [x] Sent ', $data, "\n";
      
    • 消费任务消息

      receive.php的基础上模拟耗时任务,命名为worker.php。

      $callback = function ($message) {
          echo ' [x] Received ', $message->body, "\n";
          sleep(substr_count($message->body, '.'));
          echo " [x] Done\n";
      };
      
      $channel->basic_consume('hello', '', false, true, false, false, $callback);
      
  • 轮询调度(Round-robin dispatching)

    开启两个控制台,运行worker.php,消费队列任务。

    # shell 1
    php worker.php
    # => [*] Waiting for messages. To exit press CTRL+C
    
    # shell 2
    php worker.php
    # => [*] Waiting for messages. To exit press CTRL+C
    

    再开启一个控制台,运行new_task.php,发布任务消息。

    # shell 3
    php new_task.php First message.
    php new_task.php Second message..
    php new_task.php Third message...
    php new_task.php Fourth message....
    php new_task.php Fifth message.....
    

    运行结果

    # shell 1
    php worker.php
    # => [*] Waiting for messages. To exit press CTRL+C
    # => [x] Received 'First message.'
    # => [x] Received 'Third message...'
    # => [x] Received 'Fifth message.....'
    
    # shell 2
    php worker.php
    # => [*] Waiting for messages. To exit press CTRL+C
    # => [x] Received 'Second message..'
    # => [x] Received 'Fourth message....'
    
    默认RabbitMQ会一次性地平均地将消息发送到下个消费者,每个消费者都会得到相同数量的消息。
  • 消息确认(Message acknowledgment)

    当前代码,RabbitMQ发送消息给消费者后会立马删除消息,如果杀死正在执行任务的消费者,消息就会丢失。

    为了保证消息不会丢失,RabbitMQ支持Message acknowledgment,消费者发送确认消息给RabbitMQ,告诉它消息已经被接收、处理,可以删除消息了。如果消费者被杀死没有发送确认消息,RabbitMQ会将消息重新转发给别的消费者。

    Message acknowledgment没有超时限制,所以执行非常耗时的操作是没问题的。

    Message acknowledgment默认是关闭的,开启只需要设置basic_consume的第四个参数no_ack为false。

    $callback = function ($message) {
        echo ' [x] Received ', $message->body, "\n";
        sleep(substr_count($message->body, '.'));
        echo " [x] Done\n";
        $message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);
    };
    
    $channel->basic_consume('hello', '', false, false, false, false, $callback);
    
  • 消息持久化(Message durability)

    当RabbitMQ退出,在我们不设置持久化的情况下,队列和消息都会丢失。为了保证消息不会丢失,我们需要做两个操作:

    • 队列持久化

      queue_declare声明队列时设置第三个参数durable为true

      $channel->queue_declare('hello', false, true, false);
      
    • 消息持久化

      设置AMQPMessage的第二个参数properties中的delivery_mode为2。

      $properties = [
          'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
      ];
      $message = new AMQPMessage($data, $properties);
      
  • 公平调度(Fair dispatch)

    假如在轮询调度(Round-robin dispatching)机制下,有两个消费者,基数任务特别耗时,偶数任务很轻松,RabbitMQ不在乎消费者任务是否积压,还是会固定将奇数任务分配给一个消费者,偶数任务分配给另一个消费者。

    为了解决这个问题,可以在消费者代码中使用basic_qos方法,设置第二个参数prefetch_count的值为1,此参数的意思是,如果有prefetch_count个消息没有ack,则不会收到新的消息。设置为1告诉RabbitMQ不要同一时间分配超过一个消息给消费者,这样只有在消费者空闲时才会分配下一条消息给消费者。(还记得轮询调度的介绍吗?

    basic_qos只有在需要消息确认,即设置basic_consume的第4个参数no_ack为false时生效。

    $channel->basic_qos(null, 1, null);
    
  • 完整代码

发布和订阅(Publish/Subscribe)

我们构建一个日志系统,广播一条消息给多个消费者,多个消费者收到消息后做不同的处理。比如,一个保存日志到硬盘,另一个打印输出到屏幕。

  • 交换机(Exchanges)

    在RabbitMQ中生产者(producer)不会直接发送消息给队列(queue),甚至生产者都不知道消息是否会发送给哪个队列。

    生产者只会将消息发送给交换机(exchange),交换机一边接收生产者发送的消息,另一边将消息放到队列里。

    在前边的代码里没有指定exchange为什么队列还能收到消息呢?那是因为我们使用了默认的空字符串''作为exchange,如果有第三个参数routing_key,则routing_key决定将消息放到哪个队列。

    $channel->basic_publish($message, '', 'hello');
    

    交换机(exchange)必须知道收到消息后如何处理,是放到指定的队列,还是放到多个队列,还是丢弃。这个规则由路由类型(exchange type)定义,例如:direct、topic、headers、fanout。

    首先创建一个叫做logs的fanout类型的exchange,fanout的意思是将消息广播给已知的队列。

    $channel->exchange_declare('logs', 'fanout', false, false, false);
    $channel->basic_publish($msg, 'logs');
    
  • 临时队列(Temporary queues)

    前面说到了,RabbitMQ只会将消息发送给交换机,然后交换机放到队列,所以我们需要将队列绑定到交换机,然后交换机才会发消息给队列。

    生产者发布消息时,因为发送到交换机,所以并没有声明队列,在消费者代码李我们需要一个新的,空队列,这个队列在消费者断开连接时会自动删除。

    php-amqplib的queue_declare方法中,当第一个参数queue为空字符串时,RabbitMQ会生成类似amq.gen-JzTY20BRgKO-HjmUJj0wLg格式的随机队列名称。queue_declare第四个参数exclusive会设置队列为排他性,当连接断开时会自动删除队列。

    然后将队列绑定到交换机,然后交换机才会发消息给队列。

    list($queue_name, , ) = $channel->queue_declare('', false, false, true, false);
    
    $channel->queue_bind($queue_name, 'logs');
    
  • 完整代码

  • rabbitmqctl命令

    • exchange列表

      rabbitmqctl list_exchanges
      
    • exchange绑定列表

      rabbitmqctl list_bindings
      

路由(Routing)

RabbitMQ可以将消息广播给许多接收者,如何只发送给指定类型或者一部分接收者呢?比如:日志系统中将所有日志打印输出,而重要的错误日志存放磁盘。

  • 绑定(Binding)

    在前边的代码里,我们已经绑定过队列,回想下代码:

    $channel->queue_bind($queue_name, 'logs');
    

    绑定是exchange和queue之间的一种关系,可以理解为,queue关注或者对exchange的消息感兴趣,想要接收到exchange的消息。

    queue_bind方法可以设置第三个参数routing_key,为了和$channel::basic_publish的第三个参数routing_key做区分,我们称其为binding_key。

    $binding_key = 'black';
    $channel->queue_bind($queue_name, $exchange_name, $binding_key);
    

    binding_key的含义根据exchange的不同而改变。当exchange为fanout时,binding_key被忽略。

  • Direct exchange

    之前我们的日志系统会将所有的消息广播给所有的消费者,但是我们希望程序可以根据队列处理的错误级别去过滤消息,比如:我们只希望将收到的error信息写到磁盘,而不希望info和warning也占用磁盘空间。

    我们使用的fanout exchange没有给我们自由操作的空间,它只能无脑地将消息广播给所有绑定的队列。

    这里我们使用direct exchange,其背后的路由算法也非常简单,exchange会将消息发送给binding key完全和routing key匹配的queue。

  • Multiple binding

    多个queue的binding key允许相同

    $binding_key = 'black';
    $channel->queue_bind($queue_one, $exchange_name, $binding_key);
    $channel->queue_bind($queue_two, $exchange_name, $binding_key);
    
  • 发送日志(Emitting logs)

    我们使用direct exchage代替日志系统中的fanout,将日志的严重级别作为binding key,这样接收脚本可以根据严重界别接收消息。

    首先我们需要创建exchange:

    $channel->exchange_declare('direct_logs', 'direct', false, false, false);
    

    接下来发送消息:

    $channel->exchange_declare('direct_logs', 'direct', false, false, false);
    $channel->basic_publish($msg, 'direct_logs', $severity);
    
  • 订阅消息(Subscribing)

    根据队列接收日志的级别创建绑定。

    foreach ($severities as $severity) {
        $channel->queue_bind($queue_name, 'direct_logs', $severity);
    }
    
  • 整合代码

    emit_log_direct.php代码:

    <?php
    
    require_once dirname(__DIR__).'/vendor/autoload.php';
    
    use PhpAmqpLib\Connection\AMQPStreamConnection;
    use PhpAmqpLib\Message\AMQPMessage;
    
    $connection = new AMQPStreamConnection('127.0.0.1', 5672, 'guest', 'guest');
    $channel = $connection->channel();
    
    $channel->exchange_declare('direct_logs', 'direct', false, false, false);
    
    $serverity = isset($argv[1]) && !empty($argv[1]) ? $argv[1] : 'info';
    
    $data = implode(' ', array_slice($argv, 1));
    if (empty($data)) {
        $data = 'Hello World!';
    }
    
    $messaga = new AMQPMessage($data);
    
    $channel->basic_publish($messaga, 'direct_logs', $serverity);
    
    echo ' [x] Sent ', $severity, ':', $data, "\n";
    
    $channel->close();
    $connection->close();
    

    receive_log_direct.php代码:

    <?php
    
    require_once dirname(__DIR__).'/vendor/autoload.php';
    
    use PhpAmqpLib\Connection\AMQPStreamConnection;
    
    $connection = new AMQPStreamConnection('127.0.0.1', 5672, 'guest', 'guest');
    $channel = $connection->channel();
    
    $channel->exchange_declare('direct_logs', 'direct', false, false, false);
    
    list($queue_name, , ) = $channel->queue_declare('', false, false, true, false);
    
    $serverities = array_slice($argv, 1);
    if (empty($serverities)) {
        file_put_contents('php://stderr', "Usage: $argv[0] [info] [warning] [error]\n");
        exit(1);
    }
    
    foreach ($serverities as $serverity) {
        $channel->queue_bind($queue_name, 'direct_logs', $serverity);
    }
    
    
    $callback = function ($msg) {
        echo ' [x] ', $msg->delivery_info['routing_key'], ':', $msg->body, "\n";
    };
    
    $channel->basic_consume($queue_name, '', false, true, false, false, $callback);
    
    while (count($channel->callbacks)) {
        $channel->wait();
    }
    
    $channel->close();
    $connection->close();
    

    保存warning和error日志信息:

    php receive_logs_direct.php warning error > logs_from_rabbit.log
    

    输出所有的日志信息:

    php receive_logs_direct.php info warning error
    

    发送error日志:

    php emit_log_direct.php error "Run. Run. Or it will explode."
    

Topic Exchange

在日志系统中,我们希望不仅仅根据日志级别订阅消息,还可以根据日志来源。比如unix中的syslog会根据日志级别(info/warn/crit...)和类型(auth/cron/kern...)来处理日志。

我们可能想即监听来自cron的crit日志,也想监听kern的所有日志。为了完成这个功能我们需要用到topic exchange。

  • Topic exchange

    消息发送到topic exchange不能使用随意的routing key,必须是一个由.分隔的单词列表。单词列表可以使用任何单词,但是通常它们会说明消息的特征。一些有效的routing key示例:"stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit"。routing key可以由很多单词组成,最多不超过255字节。

    binding key必须是相同的形式。topic exchange背后的逻辑和direct exchange很相似,同样是将指定了routing key的消息发送给routing key和binding key匹配的queue。但是topic exchange有两种特殊情况:

    * 只能匹配一个单词

    # 可以匹配0个或多个单词

    如果发送消息时的routing key在消费端没有binding key和其匹配,那么消息将被抛弃。

    topic exchange时非常强大的,可以实现和其他exchange一样的功能。

    • 当biding key为#时,queue将会收到所有的消息,和fanout exchange一样。

    • 当binding key不使用#和*时,会和direct exchange一样发送消息到binding key和routing key完全匹配的queue。

  • 整合代码

    假设日志系统中的routing key有两个关键字:"<类型>.<级别>"。

    emit_log_topic.php代码:

    <?php
    
    require_once __DIR__ . '/vendor/autoload.php';
    use PhpAmqpLib\Connection\AMQPStreamConnection;
    use PhpAmqpLib\Message\AMQPMessage;
    
    $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
    $channel = $connection->channel();
    
    $channel->exchange_declare('topic_logs', 'topic', false, false, false);
    
    $routing_key = isset($argv[1]) && !empty($argv[1]) ? $argv[1] : 'anonymous.info';
    $data = implode(' ', array_slice($argv, 2));
    if (empty($data)) {
        $data = "Hello World!";
    }
    
    $msg = new AMQPMessage($data);
    
    $channel->basic_publish($msg, 'topic_logs', $routing_key);
    
    echo ' [x] Sent ', $routing_key, ':', $data, "\n";
    
    $channel->close();
    $connection->close();
    

    receive_log_topic.php代码:

    <?php
    
    require_once __DIR__ . '/vendor/autoload.php';
    use PhpAmqpLib\Connection\AMQPStreamConnection;
    
    $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
    $channel = $connection->channel();
    
    $channel->exchange_declare('topic_logs', 'topic', false, false, false);
    
    list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
    
    $binding_keys = array_slice($argv, 1);
    if (empty($binding_keys)) {
        file_put_contents('php://stderr', "Usage: $argv[0] [binding_key]\n");
        exit(1);
    }
    
    foreach ($binding_keys as $binding_key) {
        $channel->queue_bind($queue_name, 'topic_logs', $binding_key);
    }
    
    echo " [*] Waiting for logs. To exit press CTRL+C\n";
    
    $callback = function ($msg) {
        echo ' [x] ', $msg->delivery_info['routing_key'], ':', $msg->body, "\n";
    };
    
    $channel->basic_consume($queue_name, '', false, true, false, false, $callback);
    
    while (count($channel->callbacks)) {
        $channel->wait();
    }
    
    $channel->close();
    $connection->close();
    

    接收所有日志:

    php receive_logs_topic.php "#"
    

    接收kern类型的所有日志:

    php receive_logs_topic.php "kern.*"
    

    接收所有critical级别的日志:

    php receive_logs_topic.php "*.critical"
    

    多个binding key绑定

    php receive_logs_topic.php "kern.*" "*.critical"
    

    发送kern.critical日志:

    php emit_log_topic.php "kern.critical" "A critical kernel error"
    

延迟队列

RabbitMQ 3.5.8以后提供了rabbitmq_delayed_message_exchange插件来实现延迟队列。

  • 插件列表命令

    查看RabbitMQ是否已安装rabbitmq_delayed_message_exchange:

    rabbitmq-plugins list | grep rabbitmq_delayed_message_exchange
    

    插件已安装:

    [  ] rabbitmq_delayed_message_exchange 20171201-3.7.x
    

    插件已启动:

    [E*] rabbitmq_delayed_message_exchange 20171201-3.7.x
    
  • 插件安装

    首先,Community Plugins找到rabbitmq_delayed_message_exchange下载插件。解压后将插件放到RabbitMQ安装目录下的plugins文件夹下。

    MacOS使用brew的安装目录:

    /usr/local/Cellar/rabbitmq/3.7.7_1/
    

    CentOS使用yum的安装目录:

    /usr/lib/rabbimtq/lib/rabbitmq_server_3.7.7/
    

    实际目录根据安装版本确定。

    确认插件安装成功:

    rabbitmq-plugins list | grep rabbitmq_delayed_message_exchange
    [  ] rabbitmq_delayed_message_exchange 20171201-3.7.x
    

    启动插件:

    rabbitmq-plugins enable rabbitmq_delayed_message_exchange
    

    插件启动成功:

    rabbitmq-plugins list | grep rabbitmq_delayed_message_exchange
    [E*] rabbitmq_delayed_message_exchange 20171201-3.7.x
    
  • 延迟消息

    使用延迟队列,需要声明x-delayed-message类型的exchange:

    $args = new AMQPTable(['x-delayed-type' => 'direct']);
    $channel->exchange_declare('my-delayed', 'x-delayed-message', false, true, false, false, false, $args);
    

    这里添加了一个叫做x-delayed-type的header信息,更多关于x-delayed-type的介绍查看Routing部分

    声明exchange后,就可以在发送消息时设置消息的延迟时间了:

    $message5000 = new AMQPMessage('延迟5000毫秒', ['application_headers' => new AMQPTable(['x-delay'=>5000])]);
    $channel->basic_publish($message5000, 'my-delayed', '', false, false);
    
    $message1000 = new AMQPMessage('延迟1000毫秒', ['application_headers' => new AMQPTable(['x-delay'=>1000])]);
    $channel->basic_publish($message1000, 'my-delayed', '', false, false);
    

    设置AMQMessage的properties属性中的application_headers信息里的x-delay参数指定延迟时间。如果此参数没有设置,则消息不延迟。

  • Routing

    插件可以通过x-delayed-type参数提供灵活的路由行为。比如:设置x-delayed-type为direct,表示插件使用和direct exchange一样的路由行为,当然也可以设置为topic等等exchange类型,或者是别的插件提供的exchange。

    注意:x-delayed-type是必须的,并且指定的exchange必须要存在

  • 完整代码

    delayed_message_send.php

    delayed_message_receive.php

优先级队列

RabbitMQ 3.5.0以后任何queue都可以声明优先级。

  • 声明优先级队列

    queue_declare声明队列时,设置最后一个参数arguments的x-max-priority属性来声明队列的最大优先级,此参数为1~255之间的正整数,推荐使用1~10之间的证书。

    $args = new AMQPTable(['x-max-priority'=>10]);
    $channel->queue_declare('priority_queue', false, false, false, false, false, $args);
    
  • 发送消息的优先级

    发送消息时通过设置AMQPMessage第二个参数properties的priority属性设置消息的优先级:

    $properties = ['priority'=>5];
    $message = new AMQPMessage($data, $properties);
    

    当priority参数为空时当作0处理,当超过队列的x-max-priority值时,按最大优先级处理。比如:x-max-priority为10,priority设置为20,实际按10处理。

  • 完整代码

    priority_send.php

    我们执行程序priority_send.php时接收一个参数作为消息的优先级。

    $properties = isset($argv[1]) && !empty($argv[1]) ? ['priority'=>(int)$argv[1]] : [];
    $message = new AMQPMessage($data, $properties);
    

    priority_receive.php

    消费者收到消息后sleep(5)模拟耗时操作,好让消息有时间进行优先级排序。

    $callback = function ($message) {
        echo ' [x] Received ', $message->body, "\n";
        sleep(5);
        $message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);
        echo " [x] Done\n";
    };
    

    发送消息:

    php priority_queue/priority_send.php
    
    php priority_queue/priority_send.php
    
    php priority_queue/priority_send.php 6
    
    php priority_queue/priority_send.php 10
    
    php priority_queue/priority_send.php 200
    

    接收消息:

    php priority_queue/priority_receive.php
    
0

评论