lwz/hyperf-rocketmq

hyperf rocketmq extend

v1.0.1 2022-05-25 12:18 UTC

This package is auto-updated.

Last update: 2025-03-26 07:53:56 UTC


README

基于 https://github.com/thefair-net/hyperf_rocketmq 进行封装

1、安装

composer require lwz/hyperf-rocketmq

2、配置

发布配置

php bin/hyperf.php vendor:publish lwz/hyperf-rocketmq

配置说明

配置类型默认值备注
hoststringHTTP协议客户端接入点
access_keystringAccessKey ID
secret_keystringAccessKey Secret
instance_idstring实例id
poolarray连接池配置
return [
    'default' => [ // 分组名,基于 host、port、scheme 进行区分
        'host' => env('ROCKETMQ_HTTP_HOST'),
        'access_key' => env('ROCKETMQ_HTTP_ACCESS_KEY_ID'),
        'secret_key' => env('ROCKET_MQ_HTTP_ACCESS_KEY_SECRET'),
        'instance_id' => env('ROCKET_MQ_HTTP_INSTANCE_ID'),
        'pool' => [
            'min_connections' => 50,
            'max_connections' => 300,
            'connect_timeout' => 3.0,
            'wait_timeout' => 30.0,
            'heartbeat' => -1,
            'max_idle_time' => 60.0,
        ],
    ],
];

3、创建相关数据表

如果不需要记录日志 或 消息不需要可靠投递,可以忽略这步

php bin/hyperf.php migrate --path=migrations/rocketmq

表说明:

mq_status_log:消息生产状态表

mq_produce_status_log:生成消息状态

mq_consume_log:消费日志

4、投递消息

Producer注解参数

字段名类型描述默认值
poolNamestring连接池名称。对应配置文件 rocketmq.php 中的keydefault
dbConnectionstring数据库连接名称(用于记录生产日志)default
topicstringtopic
messageKeystring消息key随机生成
messageTagstring消息标签

4.1 定义生产者相关信息

在 DemoProducer 文件中,我们可以修改 @Producer 注解对应的字段来替换对应的 poolNametopicmessageTag。就是最终投递到消息队列中的数据,所以我们可以随意改写 __construct 方法,只要最后赋值 payload 即可。

使用 @Producer 注解时需 use Lwz\HyperfRocketMQ\Annotation\Producer; 命名空间;

<?php
declare(strict_types=1);

namespace App\Test\Queue\Producer;

use Lwz\HyperfRocketMQ\Annotation\Producer;
use Lwz\HyperfRocketMQ\Message\ProducerMessage;

#[Producer(topic:"Topic_03_test", messageTag:"tMsgKey")]
class DemoProducer extends ProducerMessage
{
    public function __construct(array $data)
    {
        // 设置消息内容
        $this->setPayload($data);
        // 自定义messageKey(不定义,会自动生成)
        $this->setMessageKey('xxxxx');
    }
}

4.2 普通投递方式

通过Lwz\HyperfRocketMQ\Producer实例,即可投递消息。

<?php

declare(strict_types=1);

namespace App\Test\Controller;

use App\Test\Queue\Producer\DemoProducer;
use Core\Abstracts\AbstractController;
use Hyperf\Di\Annotation\Inject;
use Hyperf\HttpServer\Annotation\Controller;
use Hyperf\HttpServer\Annotation\GetMapping;
use Lwz\HyperfRocketMQ\Producer;

#[Controller(prefix: "test/bar")]
class BarController extends AbstractController
{
    /**
     * @Inject
     * @var Producer
     */
    protected Producer $producer;

    #[GetMapping("index")]
    public function index()
    {
        $demoProducer = new DemoProducer(['test' => 12345, 'name' => '张三']);
        $this->producer->produce($demoProducer);
        return $this->response->success();
    }
}

4.3 消息可靠投递方式

目前,消息投递时Rocketmq返回成功响应,就视为投递成功(暂不考虑Rocketmq缓存丢失的问题)。

实现原理:先将需要投递的消息入库处理,然后再进行发送操作

  1. 执行以下命令,生成相关的数据表

    php bin/hyperf.php migrate --path=migrations/rocketmq
    
  2. 使用示例

    $demoProducer = new BarProducer(['test' => 12345, 'name' => '张三1231']);
       
    Db::beginTransaction();
    try{
        // todo 业务逻辑
       
        // 记录消息状态
        $demoProducer->saveMessageStatus();
        Db::commit();
    } catch(\Throwable $ex){
        Db::rollBack();
    }
       
    // 推送消息
    $this->producer->produce($demoProducer);
    
  3. 投递失败的消息,可以通过守护进程监听 mq_status_log 数据表status不等于3的消息,进行重新投递(后面实现)

5、消息消费

Consumer注解属性说明

属性类型描述默认值
namestring消费名称
poolNamestring连接池名称。对应配置文件 rocketmq.php 中的keydefault
topicstringtopic
groupIdstring消费组id
messageTagstring消息标签
numOfMessageint每次拉取消息数3
waitSecondsint轮询等待时间3
processNumsint启动消费进程数1
enablebool是否初始化启动进程true

在 DemoConsumer文件中,我们可以修改 @Consumer 注解对应的字段来替换对应的 topicgroupIdmessageTag

使用 @Consumer 注解时需 use Lwz\HyperfRocketMQ\Annotation\Consumer; 命名空间;

use Lwz\HyperfRocketMQ\Annotation\Consumer;
use Lwz\HyperfRocketMQ\Library\Model\Message as RocketMQMessage;
use Lwz\HyperfRocketMQ\Message\ConsumerMessage;

#[Consumer(topic:"Topic_03_test",groupId: "test_test", messageTag:"tMsgKey")]
class DemoConsumer extends ConsumerMessage
{
    public function consumeMessage(RocketMQMessage $rocketMQMessage)
    {
        $msgTag = $rocketMQMessage->getMessageTag(); // 消息标签
        $msgKey = $rocketMQMessage->getMessageKey(); // 消息唯一标识
        $msgBody = $this->unserialize($rocketMQMessage->getMessageBody()); // 消息体
        $msgId = $rocketMQMessage->getMessageId();

        dump('消费端接收到消息:',$msgBody);
    }
}

6、事件说明

下面事件都在 Lwz\HyperfRocketMQ\Event 命名空间下

事件说明
AfterProduce消息生产成功触发的事件
BeforeConsume开启消费前触发的事件
AfterConsume成功消费后触发的事件
FailToConsume消费失败触发的事件