imiphp / imi-amqp
支持在 imi 框架中使用 支持 AMQP 协议的消息队列,如:RabbitMQ
Installs: 376
Dependents: 0
Suggesters: 0
Security: 0
Stars: 4
Watchers: 2
Forks: 2
pkg:composer/imiphp/imi-amqp
Requires
- imiphp/imi-queue: ~2.1.0
- php-amqplib/php-amqplib: ^2.12.0|^3.0.0
Requires (Dev)
- phpunit/phpunit: ~9.6
- swoole/ide-helper: ~4.8
- yurunsoft/ide-helper: ~1.0
- 3.0.x-dev
- 2.1.x-dev
- v2.1.34
- v2.1.33
- v2.1.32
- v2.1.31
- v2.1.30
- v2.1.29
- v2.1.28
- v2.1.27
- v2.1.26
- v2.1.25
- v2.1.24
- v2.1.23
- v2.1.22
- v2.1.21
- v2.1.20
- v2.1.19
- v2.1.18
- v2.1.17
- v2.1.16
- v2.1.15
- v2.1.14
- v2.1.13
- v2.1.12
- v2.1.11
- v2.1.10
- v2.1.9
- v2.1.8
- v2.1.7
- v2.1.6
- v2.1.5
- v2.1.4
- v2.1.3
- v2.1.2
- v2.1.1
- v2.1.0
- 2.0.x-dev
- v2.0.27
- v2.0.26
- v2.0.25
- v2.0.24
- v2.0.23
- v2.0.22
- v2.0.21
- v2.0.20
- v2.0.19
- v2.0.18
- v2.0.17
- v2.0.16
- v2.0.15
- v2.0.14
- v2.0.13
- v2.0.12
- v2.0.11
- v2.0.10
- v2.0.9
- v2.0.8
- v2.0.7
- v2.0.6
- v2.0.5
- v2.0.4
- v2.0.3
- v2.0.2
- v2.0.1
- v2.0.0
- 1.x-dev
- v1.2.8
- v1.2.7
- v1.2.6
- v1.2.5
- v1.2.4
- v1.2.3
- v1.2.2
- v1.2.1
- v1.2.0
- v1.1.0
- v1.0.3
- v1.0.2
- v1.0.1
- v1.0.0
This package is auto-updated.
Last update: 2025-09-29 03:05:28 UTC
README
介绍
支持在 imi 框架中使用 支持 AMQP 协议的消息队列,如:RabbitMQ
支持消息发布和消费
本仓库仅用于浏览,不接受 issue 和 Pull Requests,请前往:https://github.com/imiphp/imi
Composer
本项目可以使用composer安装,遵循psr-4自动加载规则,在你的 composer.json 中加入下面的内容:
{
"require": {
"imiphp/imi-amqp": "~2.0.0"
}
}
然后执行 composer update 安装。
使用说明
可以参考 example 目录示例,包括完整的消息发布和消费功能。
在项目 config/config.php 中配置:
[
'components' => [
// 引入组件
'AMQP' => 'Imi\AMQP',
],
]
连接池配置:
[
'pools' => [
'rabbit' => [
'sync' => [
'pool' => [
'class' => \Imi\AMQP\Pool\AMQPSyncPool::class,
'config' => [
'maxResources' => 10,
'minResources' => 0,
],
],
'resource' => [
'host' => '127.0.0.1',
'port' => 5672,
'user' => 'guest',
'password' => 'guest',
]
],
'async' => [
'pool' => [
'class' => \Imi\AMQP\Pool\AMQPCoroutinePool::class,
'config' => [
'maxResources' => 10,
'minResources' => 1,
],
],
'resource' => [
'host' => '127.0.0.1',
'port' => 5672,
'user' => 'guest',
'password' => 'guest',
]
],
],
]
]
默认连接池:
[
'beans' => [
'AMQP' => [
'defaultPoolName' => 'rabbit',
],
],
]
连接配置项
| 属性名称 | 说明 |
|---|---|
| host | 主机 |
| port | 端口 |
| user | 用户名 |
| vhost | vhost,默认/ |
| insist | insist |
| loginMethod | 默认AMQPLAIN |
| loginResponse | loginResponse |
| locale | 默认en_US |
| connectionTimeout | 连接超时 |
| readWriteTimeout | 读写超时 |
| keepalive | keepalive,默认false |
| heartbeat | 心跳时间,默认0 |
| channelRpcTimeout | 频道 RPC 超时时间,默认0.0 |
| sslProtocol | ssl 协议,默认null |
消息定义
继承 Imi\AMQP\Message 类,可在构造方法中对属性修改。
根据需要可以覆盖实现setBodyData、getBodyData方法,实现自定义的消息结构。
<?php namespace AMQPApp\AMQP\Test2; use Imi\AMQP\Message; class TestMessage2 extends Message { /** * 用户ID * * @var int */ private $memberId; /** * 内容 * * @var string */ private $content; public function __construct() { parent::__construct(); $this->routingKey = 'imi-2'; $this->format = \Imi\Util\Format\Json::class; } /** * 设置主体数据 * * @param mixed $data * @return self */ public function setBodyData($data) { foreach($data as $k => $v) { $this->$k = $v; } } /** * 获取主体数据 * * @return mixed */ public function getBodyData() { return [ 'memberId' => $this->memberId, 'content' => $this->content, ]; } /** * Get 用户ID * * @return int */ public function getMemberId() { return $this->memberId; } /** * Set 用户ID * * @param int $memberId 用户ID * * @return self */ public function setMemberId(int $memberId) { $this->memberId = $memberId; return $this; } /** * Get 内容 * * @return string */ public function getContent() { return $this->content; } /** * Set 内容 * * @param string $content 内容 * * @return self */ public function setContent(string $content) { $this->content = $content; return $this; } }
属性列表:
| 名称 | 说明 | 默认值 |
|---|---|---|
| bodyData | 消息主体内容,非字符串 | null |
| properties | 属性 | ['content_type' => 'text/plain', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,] |
| routingKey | 路由键 | 空字符串 |
| format | 如果设置了,发布的消息是编码后的bodyData,同理读取时也会解码。实现了Imi\Util\Format\IFormat的格式化类。支持Json、PhpSerialize |
null |
| mandatory | mandatory标志位 | false |
| immediate | immediate标志位 | false |
| ticket | ticket | null |
发布者
必选注解:@Publisher
可选注解:@Queue、@Exchange、@Connection
不配置 @Connection 注解,可以从连接池中获取连接
<?php namespace AMQPApp\AMQP\Test; use Imi\Bean\Annotation\Bean; use Imi\AMQP\Annotation\Queue; use Imi\AMQP\Base\BasePublisher; use Imi\AMQP\Annotation\Consumer; use Imi\AMQP\Annotation\Exchange; use Imi\AMQP\Annotation\Publisher; use Imi\AMQP\Annotation\Connection; /** * @Bean("TestPublisher") * @Connection(host="127.0.0.1", port=5672, user="guest", password="guest") * @Publisher(tag="tag-imi", queue="queue-imi-1", exchange="exchange-imi", routingKey="imi-1") * @Queue(name="queue-imi-1", routingKey="imi-1") * @Exchange(name="exchange-imi") */ class TestPublisher extends BasePublisher { }
消费者
必选注解:@Consumer
可选注解:@Queue、@Exchange、@Connection
不配置 @Connection 注解,可以从连接池中获取连接
<?php namespace AMQPApp\AMQP\Test; use Imi\Redis\Redis; use Imi\Bean\Annotation\Bean; use Imi\AMQP\Annotation\Queue; use Imi\AMQP\Base\BaseConsumer; use Imi\AMQP\Contract\IMessage; use Imi\AMQP\Annotation\Consumer; use Imi\AMQP\Annotation\Exchange; use Imi\AMQP\Enum\ConsumerResult; use Imi\AMQP\Annotation\Connection; /** * 启动一个新连接消费 * * @Bean("TestConsumer") * @Connection(host="127.0.0.1", port=5672, user="guest", password="guest") * @Consumer(tag="tag-imi", queue="queue-imi-1", message=\AMQPApp\AMQP\Test\TestMessage::class) */ class TestConsumer extends BaseConsumer { /** * 消费任务 * * @param \AMQPApp\AMQP\Test\TestMessage $message */ protected function consume(IMessage $message): int { var_dump(__CLASS__, $message->getBody(), get_class($message)); Redis::set('imi-amqp:consume:1:' . $message->getMemberId(), $message->getBody()); return ConsumerResult::ACK; } }
注解说明
@Publisher
发布者注解
| 属性名称 | 说明 |
|---|---|
| queue | 队列名称 |
| exchange | 交换机名称 |
| routingKey | 路由键 |
@Consumer
消费者注解
| 属性名称 | 说明 |
|---|---|
| tag | 消费者标签 |
| queue | 队列名称 |
| exchange | 交换机名称 |
| routingKey | 路由键 |
| message | 消息类名,默认:Imi\AMQP\Message |
| mandatory | mandatory标志位 |
| immediate | immediate标志位 |
| ticket | ticket |
@Queue
队列注解
| 属性名称 | 说明 |
|---|---|
| name | 队列名称 |
| routingKey | 路由键 |
| passive | 被动模式,默认false |
| durable | 消息队列持久化,默认true |
| exclusive | 独占,默认false |
| autoDelete | 自动删除,默认false |
| nowait | 是否非阻塞,默认false |
| arguments | 参数 |
| ticket | ticket |
@Exchange
交换机注解
| 属性名称 | 说明 |
|---|---|
| name | 交换机名称 |
| type | 类型可选:direct、fanout、topic、headers |
| passive | 被动模式,默认false |
| durable | 消息队列持久化,默认true |
| autoDelete | 自动删除,默认false |
| internal | 设置是否为rabbitmq内部使用, true表示是内部使用, false表示不是内部使用 |
| nowait | 是否非阻塞,默认false |
| arguments | 参数 |
| ticket | ticket |
@Connection
连接注解
| 属性名称 | 说明 |
|---|---|
| poolName | 不为 null 时,无视其他属性,直接用该连接池配置。默认为null,如果host、port、user、password都未设置,则获取默认的连接池。 |
| host | 主机 |
| port | 端口 |
| user | 用户名 |
| vhost | vhost,默认/ |
| insist | insist |
| loginMethod | 默认AMQPLAIN |
| loginResponse | loginResponse |
| locale | 默认en_US |
| connectionTimeout | 连接超时 |
| readWriteTimeout | 读写超时 |
| keepalive | keepalive,默认false |
| heartbeat | 心跳时间,默认0 |
| channelRpcTimeout | 频道 RPC 超时时间,默认0.0 |
| sslProtocol | ssl 协议,默认null |
队列组件支持
本组件额外实现了 imiphp/imi-queue 的接口,可以用 Queue 组件的 API 进行调用。
只需要将队列驱动配置为:AMQPQueueDriver
配置示例:
[
'components' => [
'AMQP' => 'Imi\AMQP',
],
'beans' => [
'AutoRunProcessManager' => [
'processes' => [
// 加入队列消费进程,非必须,你也可以自己写进程消费
'QueueConsumer',
],
],
'imiQueue' => [
// 默认队列
'default' => 'test1',
// 队列列表
'list' => [
// 队列名称
'test1' => [
// 使用的队列驱动
'driver' => 'AMQPQueueDriver',
// 消费协程数量
'co' => 1,
// 消费进程数量;可能会受进程分组影响,以同一组中配置的最多进程数量为准
'process' => 1,
// 消费循环尝试 pop 的时间间隔,单位:秒(仅使用消费者类时有效)
'timespan' => 0.1,
// 进程分组名称
'processGroup' => 'a',
// 自动消费
'autoConsumer' => true,
// 消费者类
'consumer' => 'AConsumer',
// 驱动类所需要的参数数组
'config' => [
// AMQP 连接池名称
'poolName' => 'amqp',
// Redis 连接池名称
'redisPoolName'=> 'redis',
// Redis 键名前缀
'redisPrefix' => 'test1:',
// 可选配置:
// 支持消息删除功能,依赖 Redis
'supportDelete' => true,
// 支持消费超时队列功能,依赖 Redis,并且自动增加一个队列
'supportTimeout' => true,
// 支持消费失败队列功能,自动增加一个队列
'supportFail' => true,
// 循环尝试 pop 的时间间隔,单位:秒
'timespan' => 0.03,
// 本地缓存的队列长度。由于 AMQP 不支持主动pop,而是主动推送,所以本地会有缓存队列,这个队列不宜过大。
'queueLength' => 16,
// 消息类名
'message' => \Imi\AMQP\Queue\JsonAMQPMessage::class,
]
],
],
],
]
]
消费者类写法,与imi-queue组件用法一致。
具体可以参考:<example/AMQP/QueueTest>、<example/ApiServer/Controller/IndexController.php>
免费技术支持
运行环境
版权信息
imi-amqp 遵循 MIT 开源协议发布,并提供免费使用。
捐赠
开源不求盈利,多少都是心意,生活不易,随缘随缘……