adminmatrix/matrix_swoole

基于 think-swoole 的二次开发扩展,新增 Gateway 服务支持和事件监听机制

Installs: 19

Dependents: 0

Suggesters: 0

Security: 0

pkg:composer/adminmatrix/matrix_swoole

1.0.1 2026-02-25 03:27 UTC

This package is auto-updated.

Last update: 2026-02-25 09:47:35 UTC


README

English | 中文

基于 think-swoole 二次开发的 Swoole 扩展,支持 HTTP、WebSocket、RPC 等功能,并新增了 Gateway 服务支持。

交流群

点击加群

功能特性

核心功能

  • HTTP 服务 - 高性能 HTTP Server,支持协程风格
  • WebSocket 服务 - 支持路由调度方式的 WebSocket 实现
  • RPC 远程调用 - 支持 JSON 格式的 RPC 协议
  • Gateway 服务 - 基于 GatewayWorker 架构的 WebSocket Gateway
  • 热更新 - 文件修改自动重载
  • 队列支持 - 集成 think-queue

新增 Gateway 服务

基于 GatewayWorker 架构实现的 WebSocket Gateway 服务,支持多 Gateway 服务配置和事件监听机制。

特性

  • 支持多个 Gateway 服务实例
  • GatewayWorker 风格的静态方法
  • 事件监听机制(onopen, onmessage, onclose)
  • 独立进程启动

安装

composer require adminmatrix/matrix_swoole

需要先安装 swoole 扩展。

配置

config/swoole.php 中配置服务参数:

return [
    'gateway' => [
        'enable' => true,
        'registerAddress' => '127.0.0.1',
        'registerPort' => 1236,
        'registerName' => 'websocket_register',
        'registerType' => 'text',
        'services' => [
            [
                'enable' => true,
                'name' => '聊天服务',
                'port' => 2346,
                'address' => '0.0.0.0',
                'handler' => \adminmatrix\swoole\gateway\Event::class,
                'worker_num' => 2,
            ],
        ],
    ],
];

使用方法

启动服务

php think swoole

启动后默认在 0.0.0.0:8080 启动 HTTP Server。

Gateway 使用示例

use adminmatrix\swoole\gateway\Gateway;

// 注册事件监听器
Gateway::getInstance()->on('gateway.onopen', function ($clientId, $request) {
    echo "客户端连接: {$clientId}\n";
});

Gateway::getInstance()->on('gateway.onmessage', function ($clientId, $message) {
    echo "收到消息: {$clientId} -> {$message}\n";
    // 广播消息
    Gateway::sendToAll([
        'type' => 'message',
        $clientId,
        'content' => $message
    ]);
});

Gateway 'from' =>::getInstance()->on('gateway.onclose', function ($clientId) {
    echo "客户端断开: {$clientId}\n";
});

支持的静态方法

  • Gateway::sendToAll($send_data, $client_id_array, $exclude_client_id, $raw) - 向所有客户端发送消息
  • Gateway::sendToClient($client_id, $send_data, $raw) - 向指定客户端发送消息
  • Gateway::isOnline($client_id) - 检查客户端是否在线
  • Gateway::getAllClientCount() - 获取在线客户端数量
  • Gateway::getAllClientIdList() - 获取所有在线客户端 ID 列表

WebSocket 使用

use \think\swoole\Websocket;
use \think\swoole\websocket\Event;
use \Swoole\WebSocket\Frame;

Route::get('ws', 'websocket/index');

class IndexController extends Controller {
    public function index() {
        return \think\swoole\helper\websocket()
            ->onOpen(function(Websocket $websocket, Request $request) {
                echo "WebSocket 连接打开\n";
            })
            ->onMessage(function(Websocket $websocket, Frame $frame) {
                $websocket->push(['type' => 'reply', 'data' => $frame->data]);
            })
            ->onClose(function(Websocket $websocket) {
                echo "WebSocket 连接关闭\n";
            });
    }
}

流式输出

class IndexController extends Controller {
    public function stream() {
        return \think\swoole\helper\iterator(value(function(){
            foreach(range(1, 10) as $i) {
                yield $i;
                sleep(1);
            }
        }));
    }
}

进程配置建议

Gateway 进程

  • 每个 Gateway 进程可处理 5000-10000 连接
  • 在线连接 < 10000:2 个 Gateway 进程
  • 每增加 10000 连接:增加 1 个 Gateway 进程

BusinessWorker 进程

  • 阻塞式 IO:CPU 核心数的 1-4 倍
  • 非阻塞式 IO:CPU 核心数的 4-8 倍

项目结构

src/
├── command/          # 命令行相关
├── concerns/         # Traits 特性
├── config/           # 配置文件
├── contract/         # 接口定义
├── coroutine/        # 协程工具
├── exception/        # 异常类
├── gateway/          # Gateway 服务
├── helpers.php       # 辅助函数
├── ipc/              # 进程间通信
├── listen/           # 事件监听
├── lock/             # 分布式锁
├── message/          # 消息类
├── middleware/       # 中间件
├── packet/           # 数据包处理
├── pool/             # 连接池
├── response/         # 响应类
├── resetters/        # 重置器
├── rpc/              # RPC 实现
├── support/          # 工具支持
├── watcher/          # 文件监控
└── websocket/        # WebSocket 实现

注意事项

  • 4.0 版本开始协程风格服务端默认不支持静态文件访问,建议使用 nginx 处理静态文件
  • 4.0 版本没有 task 进程,使用 think-queue 代替

License

使用方法

geteway

配置

config/swoole.php 配置文件

'gateway' => [
    'enable'=> true,  // 开启服务
    'services'  => [
        [
                'enable' => true,
                'name' => '聊天服务',
                'port' => 2346,
                'address' => '0.0.0.0',
                'handler' => \adminmatrix\swoole\gateway\Event::class,
                'worker_num' => swoole_cpu_num(),
                'options' => [],
                "eventHandler" => '',
                "pingInterval" => 30,
                "pingNotResponseLimit" => 1,
                "pingData" => '{"type":"ping"}',
            ],
       ... 更多服务
    ]
]

rpc 服务

RPC 服务核心

并发核心:PHP 的传统 FPM 模式不支持真正的并发,必须依赖 Swoole/Workerman 等协程 / 异步扩展,通过协程(Coroutine)实现 “同时发起多个 RPC 请求”。

耗时边界:总耗时≈单个调用的最长耗时(比如如果其中一个调用是 12 秒,另外两个是 10 秒,总耗时就是 12 秒),而非累加。

RPC 的作用:RPC 本身是 “跨服务调用的协议 / 方式”,解决的是不同服务之间的通信问题;而并发是 “调用方式”,二者结合才能实现你说的 “10 秒返回 3 个结果”。

串行执行(原本 30 秒的逻辑)

假设你要做三件事,每件都需要 10 分钟:

给快递员打电话(10 分钟):必须等打完这个电话,才能打下一个

给外卖员打电话(10 分钟):等快递电话打完,再打外卖电话

给维修师傅打电话(10 分钟):等外卖电话打完,再打维修电话

总耗时:10+10+10=30 分钟 → 对应你说的 “3 个 new 各 10 秒,总计 30 秒”。

RPC 并发调用(优化后 10 秒的逻辑)

还是这三件事,但你用了 “免提 + 同时拨号” 的方式:

先拨快递员电话(不挂,放免提),10 分钟后等回复

不等快递员回复,立刻拨外卖员电话(同样放免提),10 分钟后等回复

再立刻拨维修师傅电话(放免提),10 分钟后等回复

你只需要坐在旁边等,10 分钟后三个人几乎同时回复你,总耗时就是 10 分钟 → 对应 “RPC 并发调用,3 个操作同时执行,总耗时 10 秒”。

注意

如果需要使用RPC 必须要开启 swoole:run

php think swoole:run 运行后配置文件

config/swoole.php

'rpc'        => [
        'server' => [
            'enable'     => true,
            'host'       => '0.0.0.0',
            'port'       => 9000,
            'worker_num' => swoole_cpu_num(),
            'services'   => [
                \app\api\logic\v1\OrderLogic::class,
            ],
        ],
        'client' => [
        ],
    ],

实现实例

比如 我们要加载一个用户的订单数据

RPC

// app/rpc/interface/OrderService.php
<?php
declare(strict_types=1);
namespace app\rpc\interface;

interface OrderService
{
    public function getOrderList(int $page, int $pageSize): array;
}

业务逻辑 实际可以使用模型

//  app/api/logic/OrderLogic.php
<?php
declare(strict_types=1);

namespace app\api\logic;

/**
 * 订单服务 RPC 实现类
 * 该类实现了 OrderService 接口,用于响应 RPC 调用
 */
class OrderLogic implements \app\rpc\interface\OrderService
{
    /**
     * 获取订单列表
     * @param int $page 页码
     * @param int $pageSize 每页数量
     * @return array
     */
    public function getOrderList(int $page, int $pageSize): array
    {
        // 这里是示例实现,实际应该查询数据库
        return [
            'code' => 200,
            'msg'  => 'success',
            'data' => [
                'list' => [
                    ['id' => 1, 'order_no' => 'ORDER_001', 'amount' => 99.99],
                    ['id' => 2, 'order_no' => 'ORDER_002', 'amount' => 199.99],
                ],
                'page'      => $page,
                'pageSize'  => $pageSize,
                'total'     => 2,
            ],
        ];
    }
}

使用RPC 请求返回

<?

declare(strict_types=1);

namespace app\index\controller;
use app\rpc\interface\OrderService;
class Index{

    public function index(  OrderService $orderService)
    {

        halt($orderService->getOrderList(1, 10));
    }
}

携程

「锁」的作用就是:让同一时间只有一个进程能操作这个资源,其他进程必须等它用完,避免并发冲突。

  1. enable => false:锁的总开关

这个值是false时,整个锁机制都不生效,多进程可以随意竞争资源;设为true才会启用锁。 类比:办公室里的打印机(共享资源),如果没锁(enable=false),所有人都能同时去用,容易卡纸 / 打错;开了锁(enable=true),只有拿到钥匙的人能先用,其他人排队。

  1. type => 'table':本机进程的 “轻量锁”

table指的是Swoole Table,是 Swoole 内置的共享内存表,基于它实现的锁只适用于「同一台机器」的多进程。 特点:速度极快(内存级操作),但只能管本机的进程,跨机器无效。 类比:办公室内部的打印机钥匙,只有本办公室的人能用来排队,其他办公室的人管不到。

  1. redis配置:跨机器的 “分布式锁”

当你的 Swoole 服务部署在多台机器上(比如 2 台服务器都跑 Swoole),本机的table锁就没用了(A 机器的进程管不到 B 机器的进程),这时需要用 Redis 实现「分布式锁」—— 所有机器的进程都去 Redis 里 “抢锁”,谁抢到谁用资源。 配置里的max_active/max_wait_time:和之前 IPC 里的 Redis 配置逻辑一样,控制 Redis 连接池的大小和等待时间,避免 Redis 连接不够用导致锁机制卡壳。 类比:总公司的共享打印机,多个分公司(多台机器)的人都要用,就需要一个统一的 “锁管理中心”(Redis),所有人都去这抢钥匙,不管在哪办公都得排队。

协程

并发执行

协程说明

协程的核心思维(先理解本质,再看代码) 协程(Coroutine)是 Swoole 实现 “并发” 的核心,你可以把它理解为:

轻量级的 “微线程”,由 PHP 代码层面调度(而非操作系统),多个协程在同一个进程里 “交替执行”,遇到耗时操作(比如 sleep、网络请求)时,协程会 “让出 CPU” 给其他协程,从而实现 “看似同时执行” 的效果。

类比理解(和你之前的 “打电话” 例子呼应)

串行执行:你挨个打 3 个电话,每个打 1 分钟,总共 3 分钟(必须等上一个挂了才能打下一个); 协程并发:你用 “免提 + 多线拨号”,同时拨 3 个电话,每个电话都要等 1 分钟对方接,但你不用等,拨完就切到下一个,最终 1 分钟后 3 个电话都接通了,总耗时 1 分钟。

/**
     * 示例1: 协程并发执行多个任务
     * 使用 go() 创建协程,可以并发执行多个任务
     */
    protected function asyncMultipleTasks()
    {
        $results = [];
        $channel = new Channel(3);

        // 并发执行3个任务
        go(function () use ($channel) {
            // 模拟耗时任务1
            Coroutine::sleep(1); // 模拟1秒延迟
            $channel->push(['task' => '任务1', 'result' => '完成任务1', 'time' => microtime(true)]);
        });

        go(function () use ($channel) {
            // 模拟耗时任务2
            Coroutine::sleep(1);
            $channel->push(['task' => '任务2', 'result' => '完成任务2', 'time' => microtime(true)]);
        });

        go(function () use ($channel) {
            // 模拟耗时任务3
            Coroutine::sleep(1);
            $channel->push(['task' => '任务3', 'result' => '完成任务3', 'time' => microtime(true)]);
        });

        // 接收所有任务的结果(总共需要约1秒,而不是3秒)
        for ($i = 0; $i < 3; $i++) {
            $results[] = $channel->pop();
        }
        $channel->close();

        return [
            'type' => '并发多任1务',
            'total_time' => '约1秒(串行需要3秒)',
            'results' => $results
        ];
    }

go() 函数:

作用:创建一个新的协程,立即执行里面的匿名函数;

特点:调用 go () 后不会阻塞当前流程,3 个 go () 会在瞬间全部启动,相当于 “同时按下 3 个任务的启动键”。

Channel

作用:协程间的 “通信桥梁”,用于接收协程的执行结果(因为协程是异步的,不能直接 return 结果);

容量 3:表示最多能存 3 个结果,push () 会在通道满时阻塞,pop () 会在通道空时阻塞;

核心逻辑:每个协程完成后把结果 “推” 进通道,主流程通过 “取” 通道里的结果,等待所有协程完成。

Coroutine::sleep(1):

注意:这不是 PHP 原生的 sleep ()!原生 sleep () 会阻塞整个进程,而 Coroutine::sleep () 是 “协程休眠”,只会让当前协程暂停,CPU 会切给其他协程执行;

类比:你打第一个电话时,对方说 “等 1 分钟回你”,你不用干等,而是立刻打第二个电话,这就是协程 sleep 的作用

Coroutine\run():

作用:启动 Swoole 的协程调度器,所有协程必须在这个调度器里运行(相当于给协程提供 “执行环境”)。

注意点

1、协程的 “轻量性”:一个进程可以创建上万甚至十万个协程(远多于线程),因为协程是用户态调度,没有操作系统切换的开销; 2、Channel 的容量:如果创建 Channel 时容量设为 1,3 个协程的 push () 会排队,失去并发效果(所以容量要≥协程数); 3、协程安全:多个协程如果操作同一个变量(比如 $results 直接赋值),会出现 “竞态条件”(数据混乱),所以必须用 Channel 传递结果; 4、耗时操作必须是 “协程化” 的:只有 Swoole 提供的协程化函数(比如 Coroutine::sleep ()、Swoole\Coroutine\MySQL、Swoole\Coroutine\Redis)才会让出 CPU,原生的 file_get_contents ()、sleep () 依然会阻塞整个进程。

进阶

协程 可以与 RPC调用

可以尝试把示例里的 Coroutine::sleep (1) 换成实际的 RPC 调用 / 数据库查询

queue 消息列队

首先需要安装 thinkphp 的 topthink/think-queue

composer require topthink/think-queue

使用 命令行后; 可以不用再使用进程守护开启 php think queue:listen 开消息列队了

php think swoole:run

MIT License

测试数据结果

===========================================

=== 最终压测结果 ===

CPU核心数:物理8核 / 逻辑16核

压测配置:并发线程数10 | 总请求数10000

成功请求数:9999 | 失败请求数:1 | 成功率:99.99%

总耗时:51.21秒 | 平均响应时间:34.23毫秒

最终QPS:195.25

【系统资源峰值统计】

CPU占用:最高67.7% | 最低26.3% | 最终8.2%

内存占用:最高31.6% | 最低31.5% | 最终31.6%

=============================================================================