workbunny / webman-rabbitmq
Webman plugin workbunny/webman-rabbitmq
Installs: 3 344
Dependents: 0
Suggesters: 0
Security: 0
Stars: 33
Watchers: 1
Forks: 6
Open Issues: 2
pkg:composer/workbunny/webman-rabbitmq
Requires
- php: ^8.1
- bunny/bunny: ^0.5
- psr/log: ^3.0
Requires (Dev)
- friendsofphp/php-cs-fixer: ^3.0
- phpunit/phpunit: ^10.0
- revolt/event-loop: ^1.0
- swow/swow-stub: ^1.0
- symfony/var-dumper: ^6.0 | ^7.0
- webman/console: ^2.0
- workerman/webman-framework: ^2.0
Suggests
- ext-swoole: supported Event/Swoole
- ext-swow: supported Event/Swow
- revolt/event-loop: supported Event/Fiber
- webman/console: supported builder commands
- dev-master / 3.x-dev
- 3.0.0-RC.1
- 3.0.0-alpha.3
- 3.0.0-alpha.2
- 3.0.0-alpha.1
- 2.x-dev
- 2.4.1
- 2.4.0
- 2.3.5
- 2.3.4
- 2.3.3
- 2.3.2
- 2.3.1
- 2.3.0
- 2.2.1
- 2.2.0
- 2.1.7
- 2.1.6
- 2.1.5
- 2.1.4
- 2.1.3
- 2.1.2
- 2.1.1
- 2.1.0
- 2.0.2
- 2.0.1
- 2.0.0
- 2.0.0-beta.4
- 2.0.0-beta.3
- 2.0.0-beta.2
- 2.0.0-beta-1
- 1.x-dev
- 1.0.11
- 1.0.10
- 1.0.9
- 1.0.8
- 1.0.7
- 1.0.6
- 1.0.5
- 1.0.4
- 1.0.3
- 1.0.2
- 1.0.1
- 1.0.0
This package is auto-updated.
Last update: 2026-02-16 04:42:57 UTC
README
workbunny/webman-rabbitmq
🐇 A PHP implementation of RabbitMQ Client for webman plugin. 🐇
A PHP implementation of RabbitMQ Client for webman plugin
说明
- 此文档为3.0
- 2.0 LTS 2.0文档
简介
适配Workerman/webman的AMQP组件包
- 支持基于
AMQP协议工具实现AMQP-Server - 支持5种消费模式:简单队列、workQueue、routing、pub/sub、exchange;
- 支持延迟队列(rabbitMQ须安装插件);
- 支持连接池,支持通道池,
Builder支持影子模式(并发补偿); - 3.0与之前版本相比,更符合
AMQO协议约定,更合理的架构设计和使用逻辑- 使用
ConnectionManagement多连接管理器管理Connection(Client),合理复用机制及并发使用能力 - 使用
Channel-Pool管理Channel,合理的复用和并发机制 - 提供
AMQP协议包,可供开发者自定义实现AMQP-Client或AMQP-Server,并提供AMQP-Frame协议帧工具
- 使用
概念
┌───────────┐
| Builder A | ──┐
└───────────┘ | | ┌───────────┐
| | | Channel 1 |
| | └───────────┘
┌───────────┐ └─> ┌──────────────────┐ | ┌───────────┐
| Builder A | ────> | Connections Pool | ── connection ──> | | Channel 2 |
└───────────┘ ┌─> └──────────────────┘ min ... MAX | └───────────┘
| <static> <context> | ┌───────────┐
| | | Channel 3 |
┌───────────┐ | | └───────────┘
| Builder C | ──┘ ...
└───────────┘ channel-max
Builder:队列消费者、生产者的抽象结构,类似ORM的ModelBuilderConfig: 队列配置结构Builder可以指定不同的connection配置进行连接,以区分业务/服务Builder的publish/consume使用了影子模式(当前Connection的Channel耗尽时,会自动从Connection Pool获取新的连接创建Channel)- 影子模式下请尽量将
Connection Pool和Channels Pool的配置wait_timeout改小,避免过长时间的等待(等待中会出让控制权,不会阻塞)
- 影子模式下请尽量将
Connection:基于AsyncTcpConnection封装的AMQP-clientConnection由ConnectionManagement管理,连接池为静态,不会因为Builder的释放而释放Connection Pool中通过get拿取Connection后需要手动调用release归还,或者使用action通过传入回调函数来执行并自动归还- 配置信息:
min_connections: 最小连接数max_connections: 最大连接数idel_timeout: 空闲回收时间 [s]wait_timeout: 等待连接超时时间 [s]
Channel:抽象的通道对象- 每一个
Connection都具备一个Channel池- 多协程时,自动创建新的
Channel消费,并在协程结束后自动归还/释放 - 单协程时,复用
Channel消费
- 多协程时,自动创建新的
- 配置信息:
idel_timeout: 空闲回收时间 [s]wait_timeout: 等待连接超时时间 [s]
- 每一个
AMQP:workerman支持的协议封装
使用
要求
- php >= 8.1
- webman-framework >= 2.0 或 workerman >= 5.1
- rabbitmq-server >= 3.10
安装
composer require workbunny/webman-rabbitmq
配置
基础配置 app.php
<?php declare(strict_types=1); return [ 'enable' => true, // 日志 LoggerInterface | LoggerInterface::class 'logger' => null, ];
连接配置 connections.php
<?php declare(strict_types=1); use Workbunny\WebmanRabbitMQ\Clients\AbstractClient; use Workbunny\WebmanRabbitMQ\Connections\Connection; return [ 'default' => [ 'connection' => Connection::class, // 连接池 'connections_pool' => [ 'min_connections' => 1, 'max_connections' => 20, 'idle_timeout' => 60, 'wait_timeout' => 10 ], 'config' => [ 'host' => 'rabbitmq', 'vhost' => '/', 'port' => 5672, 'username' => 'guest', 'password' => 'guest', 'mechanism' => 'AMQPLAIN', 'timeout' => 10, // 重启间隔 'restart_interval' => 5, // 通道池 'channels_pool' => [ 'idle_timeout' => 60, 'wait_timeout' => 10 ], 'client_properties' => [ 'name' => 'workbunny/webman-rabbitmq', 'version' => \Composer\InstalledVersions::getVersion('workbunny/webman-rabbitmq') ], // 心跳回调 callable 'heartbeat_callback' => null, ] ] ];
命令行
- 构建:
php webman workbunny:rabbitmq-builder -h - 移除/关闭:
php webman workbunny:rabbitmq-remove -h - 列表:
php webman workbunny:rabbitmq-list -h
延迟队列
延迟队列需要为 rabbitMQ 安装 rabbitmq_delayed_message_exchange 插件
- 进入 rabbitMQ 的 plugins 目录下执行命令下载插件(以rabbitMQ 3.10.2举例):
wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.10.2/rabbitmq_delayed_message_exchange-3.10.2.ez
- 执行安装命令
rabbitmq-plugins enable rabbitmq_delayed_message_exchange - 生产
publish(new TestBuilder(), 'abc', headers: [ 'x-delay' => 10000, # 延迟10秒 ]); # return bool
注:向延迟队列发布普通消息会抛出一个 WebmanRabbitMQException 异常
注意
- 不少第三方厂商不支持安装延迟队列插件
- 当不支持安装延迟队列时,可以通过优先级队列 +
REQUEUE实现Builder支持通过REQUEUE标记进行消息重入队尾- 通过自定义
header中的时间标记,和逻辑判断,当满足时间条件时则执行,不满足条件则通过REQUEUE将数据自动推回队尾 - 为了减少数据延迟问题,使用优先级标识将时间较近的消息优先级定义高一些,而时间较长的数据优先级定义低一些
- 队列通常支持
0-9的优先级,合理分配时间段和优先级的匹配关系
- 队列通常支持
生产
注:向延迟队列发布普通消息会抛出一个 WebmanRabbitMQPublishException 异常
-
快捷发送
use function Workbunny\WebmanRabbitMQ\publish; use process\workbunny\rabbitmq\TestBuilder; publish(new TestBuilder(), 'abc'); # return bool
-
Builder发送use process\workbunny\rabbitmq\TestBuilder; use Workbunny\WebmanRabbitMQ\Connection\ConnectionInterface; $builder = new TestBuilder(); $body = 'abc'; return $builder->action(function (ConnectionInterface $connection) use ($builder, $body) { $config = new BuilderConfig($builder->getBuilderConfig()()); $config->setBody($body); $connection->publish($connection, $config) });
-
原生发送,需要自行指定
exchange等参数use Workbunny\WebmanRabbitMQ\BuilderConfig; use Workbunny\WebmanRabbitMQ\Connection\ConnectionInterface; use Workbunny\WebmanRabbitMQ\ConnectionsManagement; $config = new \Workbunny\WebmanRabbitMQ\BuilderConfig(); $config->setExchange('your_exchange'); $config->setRoutingKey('your_routing_key'); $config->setQueue('your_queue'); $config->setBody('abc'); // 其他设置参数 ... // 使用 your_connection 配置连接发送 return ConnectionsManagement::connection(function (ConnectionInterface $connection) use ($config) { $connection->publish($connection, $config) }, 'your_connection');