workbunny/webman-rabbitmq

Webman plugin workbunny/webman-rabbitmq

Installs: 3 344

Dependents: 0

Suggesters: 0

Security: 0

Stars: 33

Watchers: 1

Forks: 6

Open Issues: 2

pkg:composer/workbunny/webman-rabbitmq

3.0.0-RC.1 2026-02-16 04:39 UTC

README

workbunny

workbunny/webman-rabbitmq

🐇 A PHP implementation of RabbitMQ Client for webman plugin. 🐇

A PHP implementation of RabbitMQ Client for webman plugin

Latest Stable Version Total Downloads License PHP Version Require

说明

简介

适配Workerman/webmanAMQP组件包

  • 支持基于AMQP协议工具实现AMQP-Server
  • 支持5种消费模式:简单队列、workQueue、routing、pub/sub、exchange;
  • 支持延迟队列(rabbitMQ须安装插件);
  • 支持连接池,支持通道池,Builder支持影子模式(并发补偿);
  • 3.0与之前版本相比,更符合AMQO协议约定,更合理的架构设计和使用逻辑
    • 使用ConnectionManagement多连接管理器管理ConnectionClient),合理复用机制及并发使用能力
    • 使用Channel-Pool管理Channel,合理的复用和并发机制
    • 提供AMQP协议包,可供开发者自定义实现AMQP-ClientAMQP-Server,并提供AMQP-Frame协议帧工具

概念

    ┌───────────┐                              
    | Builder A | ──┐          
    └───────────┘   |                                          | ┌───────────┐
                    |                                          | | Channel 1 |
                    |                                          | └───────────┘
    ┌───────────┐   └─> ┌──────────────────┐                   | ┌───────────┐
    | Builder A | ────> | Connections Pool | ── connection ──> | | Channel 2 |
    └───────────┘   ┌─> └──────────────────┘   min ... MAX     | └───────────┘
                    |         <static>          <context>      | ┌───────────┐
                    |                                          | | Channel 3 |
    ┌───────────┐   |                                          | └───────────┘
    | Builder C | ──┘                                                 ...
    └───────────┘                                                 channel-max

  • Builder:队列消费者、生产者的抽象结构,类似ORMModel
    • BuilderConfig: 队列配置结构
    • Builder可以指定不同的connection配置进行连接,以区分业务/服务
    • Builderpublish/consume使用了影子模式(当前ConnectionChannel耗尽时,会自动从Connection Pool获取新的连接创建Channel
      • 影子模式下请尽量将Connection PoolChannels Pool的配置wait_timeout改小,避免过长时间的等待(等待中会出让控制权,不会阻塞)
  • Connection:基于AsyncTcpConnection封装的AMQP-client
    • ConnectionConnectionManagement管理,连接池为静态,不会因为Builder的释放而释放
    • Connection Pool中通过get拿取Connection后需要手动调用release归还,或者使用action通过传入回调函数来执行并自动归还
    • 配置信息:
      • min_connections: 最小连接数
      • max_connections: 最大连接数
      • idel_timeout: 空闲回收时间 [s]
      • wait_timeout: 等待连接超时时间 [s]
  • Channel:抽象的通道对象
    • 每一个Connection都具备一个Channel
      • 多协程时,自动创建新的Channel消费,并在协程结束后自动归还/释放
      • 单协程时,复用Channel消费
    • 配置信息:
      • idel_timeout: 空闲回收时间 [s]
      • wait_timeout: 等待连接超时时间 [s]
  • AMQP: workerman支持的协议封装

详细文档

使用

要求

  • php >= 8.1
  • webman-framework >= 2.0 或 workerman >= 5.1
  • rabbitmq-server >= 3.10

安装

composer require workbunny/webman-rabbitmq

配置

基础配置 app.php

<?php declare(strict_types=1);

return [
    'enable' => true,
    // 日志 LoggerInterface | LoggerInterface::class
    'logger'   => null,
];

连接配置 connections.php

<?php declare(strict_types=1);

use Workbunny\WebmanRabbitMQ\Clients\AbstractClient;
use Workbunny\WebmanRabbitMQ\Connections\Connection;

return [
    'default' => [
        'connection'       => Connection::class,
        // 连接池
        'connections_pool' => [
            'min_connections'       => 1,
            'max_connections'       => 20,
            'idle_timeout'          => 60,
            'wait_timeout'          => 10
        ],
        'config' => [
            'host'               => 'rabbitmq',
            'vhost'              => '/',
            'port'               => 5672,
            'username'           => 'guest',
            'password'           => 'guest',
            'mechanism'          => 'AMQPLAIN',
            'timeout'            => 10,
            // 重启间隔
            'restart_interval'   => 5,
            // 通道池
            'channels_pool'      => [
                'idle_timeout'     => 60,
                'wait_timeout'     => 10
            ],
            'client_properties' => [
                'name'     => 'workbunny/webman-rabbitmq',
                'version'  => \Composer\InstalledVersions::getVersion('workbunny/webman-rabbitmq')
            ],
            // 心跳回调 callable
            'heartbeat_callback' => null,
        ]
    ]
];

命令行

  • 构建:php webman workbunny:rabbitmq-builder -h
  • 移除/关闭:php webman workbunny:rabbitmq-remove -h
  • 列表:php webman workbunny:rabbitmq-list -h

延迟队列

延迟队列需要为 rabbitMQ 安装 rabbitmq_delayed_message_exchange 插件

  1. 进入 rabbitMQ 的 plugins 目录下执行命令下载插件(以rabbitMQ 3.10.2举例):
    wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.10.2/rabbitmq_delayed_message_exchange-3.10.2.ez
  2. 执行安装命令
    rabbitmq-plugins enable rabbitmq_delayed_message_exchange
  3. 生产
    publish(new TestBuilder(), 'abc', headers: [
        'x-delay' => 10000, # 延迟10秒
    ]); # return bool
    注:向延迟队列发布普通消息会抛出一个 WebmanRabbitMQException 异常

注意

  • 不少第三方厂商不支持安装延迟队列插件
  • 当不支持安装延迟队列时,可以通过优先级队列 + REQUEUE实现
    • Builder支持通过REQUEUE标记进行消息重入队尾
    • 通过自定义header中的时间标记,和逻辑判断,当满足时间条件时则执行,不满足条件则通过REQUEUE将数据自动推回队尾
    • 为了减少数据延迟问题,使用优先级标识将时间较近的消息优先级定义高一些,而时间较长的数据优先级定义低一些
      • 队列通常支持0-9的优先级,合理分配时间段和优先级的匹配关系

生产

注:向延迟队列发布普通消息会抛出一个 WebmanRabbitMQPublishException 异常

  • 快捷发送

    use function Workbunny\WebmanRabbitMQ\publish;
    use process\workbunny\rabbitmq\TestBuilder;
    
    publish(new TestBuilder(), 'abc'); # return bool
  • Builder发送

    use process\workbunny\rabbitmq\TestBuilder;
    use Workbunny\WebmanRabbitMQ\Connection\ConnectionInterface;
    $builder = new TestBuilder();
    $body = 'abc';
    return $builder->action(function (ConnectionInterface $connection) use ($builder, $body) {
        $config = new BuilderConfig($builder->getBuilderConfig()());
        $config->setBody($body);
        $connection->publish($connection, $config)
    });
  • 原生发送,需要自行指定exchange等参数

    use Workbunny\WebmanRabbitMQ\BuilderConfig;
    use Workbunny\WebmanRabbitMQ\Connection\ConnectionInterface;
    use Workbunny\WebmanRabbitMQ\ConnectionsManagement;
    $config = new \Workbunny\WebmanRabbitMQ\BuilderConfig();
    $config->setExchange('your_exchange');
    $config->setRoutingKey('your_routing_key');
    $config->setQueue('your_queue');
    $config->setBody('abc');
    // 其他设置参数 ...
    
    // 使用 your_connection 配置连接发送
    return ConnectionsManagement::connection(function (ConnectionInterface $connection) use ($config) {
        $connection->publish($connection, $config)
    }, 'your_connection');