adminmatrix / matrix_swoole
基于 think-swoole 的二次开发扩展,新增 Gateway 服务支持和事件监听机制
Requires
- php: ^8.0
- ext-json: *
- ext-swoole: ^4.0|^5.0|^6.0
- nette/php-generator: ^4.0
- open-smf/connection-pool: >=1.0
- stechstudio/backoff: ^1.2
- swoole/ide-helper: ^5.0
- symfony/finder: >=4.3
- symfony/process: >=4.2
- topthink/framework: ^6.0|^8.0
- workerman/gateway-worker: ^3.1
- workerman/gatewayclient: ^3.1
Requires (Dev)
- pestphp/pest: ^3.7
- phpstan/phpstan: ^2.0
- topthink/think-queue: ^3.0
- topthink/think-tracing: ^1.0
This package is auto-updated.
Last update: 2026-02-25 09:47:35 UTC
README
English | 中文
基于 think-swoole 二次开发的 Swoole 扩展,支持 HTTP、WebSocket、RPC 等功能,并新增了 Gateway 服务支持。
交流群
功能特性
核心功能
- HTTP 服务 - 高性能 HTTP Server,支持协程风格
- WebSocket 服务 - 支持路由调度方式的 WebSocket 实现
- RPC 远程调用 - 支持 JSON 格式的 RPC 协议
- Gateway 服务 - 基于 GatewayWorker 架构的 WebSocket Gateway
- 热更新 - 文件修改自动重载
- 队列支持 - 集成 think-queue
新增 Gateway 服务
基于 GatewayWorker 架构实现的 WebSocket Gateway 服务,支持多 Gateway 服务配置和事件监听机制。
特性
- 支持多个 Gateway 服务实例
- GatewayWorker 风格的静态方法
- 事件监听机制(onopen, onmessage, onclose)
- 独立进程启动
安装
composer require adminmatrix/matrix_swoole
需要先安装 swoole 扩展。
配置
在 config/swoole.php 中配置服务参数:
return [
'gateway' => [
'enable' => true,
'registerAddress' => '127.0.0.1',
'registerPort' => 1236,
'registerName' => 'websocket_register',
'registerType' => 'text',
'services' => [
[
'enable' => true,
'name' => '聊天服务',
'port' => 2346,
'address' => '0.0.0.0',
'handler' => \adminmatrix\swoole\gateway\Event::class,
'worker_num' => 2,
],
],
],
];
使用方法
启动服务
php think swoole
启动后默认在 0.0.0.0:8080 启动 HTTP Server。
Gateway 使用示例
use adminmatrix\swoole\gateway\Gateway;
// 注册事件监听器
Gateway::getInstance()->on('gateway.onopen', function ($clientId, $request) {
echo "客户端连接: {$clientId}\n";
});
Gateway::getInstance()->on('gateway.onmessage', function ($clientId, $message) {
echo "收到消息: {$clientId} -> {$message}\n";
// 广播消息
Gateway::sendToAll([
'type' => 'message',
$clientId,
'content' => $message
]);
});
Gateway 'from' =>::getInstance()->on('gateway.onclose', function ($clientId) {
echo "客户端断开: {$clientId}\n";
});
支持的静态方法
Gateway::sendToAll($send_data, $client_id_array, $exclude_client_id, $raw)- 向所有客户端发送消息Gateway::sendToClient($client_id, $send_data, $raw)- 向指定客户端发送消息Gateway::isOnline($client_id)- 检查客户端是否在线Gateway::getAllClientCount()- 获取在线客户端数量Gateway::getAllClientIdList()- 获取所有在线客户端 ID 列表
WebSocket 使用
use \think\swoole\Websocket;
use \think\swoole\websocket\Event;
use \Swoole\WebSocket\Frame;
Route::get('ws', 'websocket/index');
class IndexController extends Controller {
public function index() {
return \think\swoole\helper\websocket()
->onOpen(function(Websocket $websocket, Request $request) {
echo "WebSocket 连接打开\n";
})
->onMessage(function(Websocket $websocket, Frame $frame) {
$websocket->push(['type' => 'reply', 'data' => $frame->data]);
})
->onClose(function(Websocket $websocket) {
echo "WebSocket 连接关闭\n";
});
}
}
流式输出
class IndexController extends Controller {
public function stream() {
return \think\swoole\helper\iterator(value(function(){
foreach(range(1, 10) as $i) {
yield $i;
sleep(1);
}
}));
}
}
进程配置建议
Gateway 进程
- 每个 Gateway 进程可处理 5000-10000 连接
- 在线连接 < 10000:2 个 Gateway 进程
- 每增加 10000 连接:增加 1 个 Gateway 进程
BusinessWorker 进程
- 阻塞式 IO:CPU 核心数的 1-4 倍
- 非阻塞式 IO:CPU 核心数的 4-8 倍
项目结构
src/
├── command/ # 命令行相关
├── concerns/ # Traits 特性
├── config/ # 配置文件
├── contract/ # 接口定义
├── coroutine/ # 协程工具
├── exception/ # 异常类
├── gateway/ # Gateway 服务
├── helpers.php # 辅助函数
├── ipc/ # 进程间通信
├── listen/ # 事件监听
├── lock/ # 分布式锁
├── message/ # 消息类
├── middleware/ # 中间件
├── packet/ # 数据包处理
├── pool/ # 连接池
├── response/ # 响应类
├── resetters/ # 重置器
├── rpc/ # RPC 实现
├── support/ # 工具支持
├── watcher/ # 文件监控
└── websocket/ # WebSocket 实现
注意事项
- 4.0 版本开始协程风格服务端默认不支持静态文件访问,建议使用 nginx 处理静态文件
- 4.0 版本没有 task 进程,使用 think-queue 代替
License
使用方法
geteway
配置
config/swoole.php 配置文件
'gateway' => [
'enable'=> true, // 开启服务
'services' => [
[
'enable' => true,
'name' => '聊天服务',
'port' => 2346,
'address' => '0.0.0.0',
'handler' => \adminmatrix\swoole\gateway\Event::class,
'worker_num' => swoole_cpu_num(),
'options' => [],
"eventHandler" => '',
"pingInterval" => 30,
"pingNotResponseLimit" => 1,
"pingData" => '{"type":"ping"}',
],
... 更多服务
]
]
rpc 服务
RPC 服务核心
并发核心:PHP 的传统 FPM 模式不支持真正的并发,必须依赖 Swoole/Workerman 等协程 / 异步扩展,通过协程(Coroutine)实现 “同时发起多个 RPC 请求”。
耗时边界:总耗时≈单个调用的最长耗时(比如如果其中一个调用是 12 秒,另外两个是 10 秒,总耗时就是 12 秒),而非累加。
RPC 的作用:RPC 本身是 “跨服务调用的协议 / 方式”,解决的是不同服务之间的通信问题;而并发是 “调用方式”,二者结合才能实现你说的 “10 秒返回 3 个结果”。
串行执行(原本 30 秒的逻辑)
假设你要做三件事,每件都需要 10 分钟:
给快递员打电话(10 分钟):必须等打完这个电话,才能打下一个
给外卖员打电话(10 分钟):等快递电话打完,再打外卖电话
给维修师傅打电话(10 分钟):等外卖电话打完,再打维修电话
总耗时:10+10+10=30 分钟 → 对应你说的 “3 个 new 各 10 秒,总计 30 秒”。
RPC 并发调用(优化后 10 秒的逻辑)
还是这三件事,但你用了 “免提 + 同时拨号” 的方式:
先拨快递员电话(不挂,放免提),10 分钟后等回复
不等快递员回复,立刻拨外卖员电话(同样放免提),10 分钟后等回复
再立刻拨维修师傅电话(放免提),10 分钟后等回复
你只需要坐在旁边等,10 分钟后三个人几乎同时回复你,总耗时就是 10 分钟 → 对应 “RPC 并发调用,3 个操作同时执行,总耗时 10 秒”。
注意
如果需要使用RPC 必须要开启 swoole:run
php think swoole:run 运行后配置文件
config/swoole.php
'rpc' => [
'server' => [
'enable' => true,
'host' => '0.0.0.0',
'port' => 9000,
'worker_num' => swoole_cpu_num(),
'services' => [
\app\api\logic\v1\OrderLogic::class,
],
],
'client' => [
],
],
实现实例
比如 我们要加载一个用户的订单数据
RPC
// app/rpc/interface/OrderService.php
<?php
declare(strict_types=1);
namespace app\rpc\interface;
interface OrderService
{
public function getOrderList(int $page, int $pageSize): array;
}
业务逻辑 实际可以使用模型
// app/api/logic/OrderLogic.php
<?php
declare(strict_types=1);
namespace app\api\logic;
/**
* 订单服务 RPC 实现类
* 该类实现了 OrderService 接口,用于响应 RPC 调用
*/
class OrderLogic implements \app\rpc\interface\OrderService
{
/**
* 获取订单列表
* @param int $page 页码
* @param int $pageSize 每页数量
* @return array
*/
public function getOrderList(int $page, int $pageSize): array
{
// 这里是示例实现,实际应该查询数据库
return [
'code' => 200,
'msg' => 'success',
'data' => [
'list' => [
['id' => 1, 'order_no' => 'ORDER_001', 'amount' => 99.99],
['id' => 2, 'order_no' => 'ORDER_002', 'amount' => 199.99],
],
'page' => $page,
'pageSize' => $pageSize,
'total' => 2,
],
];
}
}
使用RPC 请求返回
<?
declare(strict_types=1);
namespace app\index\controller;
use app\rpc\interface\OrderService;
class Index{
public function index( OrderService $orderService)
{
halt($orderService->getOrderList(1, 10));
}
}
携程
锁
「锁」的作用就是:让同一时间只有一个进程能操作这个资源,其他进程必须等它用完,避免并发冲突。
- enable => false:锁的总开关
这个值是false时,整个锁机制都不生效,多进程可以随意竞争资源;设为true才会启用锁。 类比:办公室里的打印机(共享资源),如果没锁(enable=false),所有人都能同时去用,容易卡纸 / 打错;开了锁(enable=true),只有拿到钥匙的人能先用,其他人排队。
- type => 'table':本机进程的 “轻量锁”
table指的是Swoole Table,是 Swoole 内置的共享内存表,基于它实现的锁只适用于「同一台机器」的多进程。 特点:速度极快(内存级操作),但只能管本机的进程,跨机器无效。 类比:办公室内部的打印机钥匙,只有本办公室的人能用来排队,其他办公室的人管不到。
- redis配置:跨机器的 “分布式锁”
当你的 Swoole 服务部署在多台机器上(比如 2 台服务器都跑 Swoole),本机的table锁就没用了(A 机器的进程管不到 B 机器的进程),这时需要用 Redis 实现「分布式锁」—— 所有机器的进程都去 Redis 里 “抢锁”,谁抢到谁用资源。 配置里的max_active/max_wait_time:和之前 IPC 里的 Redis 配置逻辑一样,控制 Redis 连接池的大小和等待时间,避免 Redis 连接不够用导致锁机制卡壳。 类比:总公司的共享打印机,多个分公司(多台机器)的人都要用,就需要一个统一的 “锁管理中心”(Redis),所有人都去这抢钥匙,不管在哪办公都得排队。
协程
并发执行
协程说明
协程的核心思维(先理解本质,再看代码) 协程(Coroutine)是 Swoole 实现 “并发” 的核心,你可以把它理解为:
轻量级的 “微线程”,由 PHP 代码层面调度(而非操作系统),多个协程在同一个进程里 “交替执行”,遇到耗时操作(比如 sleep、网络请求)时,协程会 “让出 CPU” 给其他协程,从而实现 “看似同时执行” 的效果。
类比理解(和你之前的 “打电话” 例子呼应)
串行执行:你挨个打 3 个电话,每个打 1 分钟,总共 3 分钟(必须等上一个挂了才能打下一个); 协程并发:你用 “免提 + 多线拨号”,同时拨 3 个电话,每个电话都要等 1 分钟对方接,但你不用等,拨完就切到下一个,最终 1 分钟后 3 个电话都接通了,总耗时 1 分钟。
/**
* 示例1: 协程并发执行多个任务
* 使用 go() 创建协程,可以并发执行多个任务
*/
protected function asyncMultipleTasks()
{
$results = [];
$channel = new Channel(3);
// 并发执行3个任务
go(function () use ($channel) {
// 模拟耗时任务1
Coroutine::sleep(1); // 模拟1秒延迟
$channel->push(['task' => '任务1', 'result' => '完成任务1', 'time' => microtime(true)]);
});
go(function () use ($channel) {
// 模拟耗时任务2
Coroutine::sleep(1);
$channel->push(['task' => '任务2', 'result' => '完成任务2', 'time' => microtime(true)]);
});
go(function () use ($channel) {
// 模拟耗时任务3
Coroutine::sleep(1);
$channel->push(['task' => '任务3', 'result' => '完成任务3', 'time' => microtime(true)]);
});
// 接收所有任务的结果(总共需要约1秒,而不是3秒)
for ($i = 0; $i < 3; $i++) {
$results[] = $channel->pop();
}
$channel->close();
return [
'type' => '并发多任1务',
'total_time' => '约1秒(串行需要3秒)',
'results' => $results
];
}
go() 函数:
作用:创建一个新的协程,立即执行里面的匿名函数;
特点:调用 go () 后不会阻塞当前流程,3 个 go () 会在瞬间全部启动,相当于 “同时按下 3 个任务的启动键”。
Channel
作用:协程间的 “通信桥梁”,用于接收协程的执行结果(因为协程是异步的,不能直接 return 结果);
容量 3:表示最多能存 3 个结果,push () 会在通道满时阻塞,pop () 会在通道空时阻塞;
核心逻辑:每个协程完成后把结果 “推” 进通道,主流程通过 “取” 通道里的结果,等待所有协程完成。
Coroutine::sleep(1):
注意:这不是 PHP 原生的 sleep ()!原生 sleep () 会阻塞整个进程,而 Coroutine::sleep () 是 “协程休眠”,只会让当前协程暂停,CPU 会切给其他协程执行;
类比:你打第一个电话时,对方说 “等 1 分钟回你”,你不用干等,而是立刻打第二个电话,这就是协程 sleep 的作用
Coroutine\run():
作用:启动 Swoole 的协程调度器,所有协程必须在这个调度器里运行(相当于给协程提供 “执行环境”)。
注意点
1、协程的 “轻量性”:一个进程可以创建上万甚至十万个协程(远多于线程),因为协程是用户态调度,没有操作系统切换的开销; 2、Channel 的容量:如果创建 Channel 时容量设为 1,3 个协程的 push () 会排队,失去并发效果(所以容量要≥协程数); 3、协程安全:多个协程如果操作同一个变量(比如 $results 直接赋值),会出现 “竞态条件”(数据混乱),所以必须用 Channel 传递结果; 4、耗时操作必须是 “协程化” 的:只有 Swoole 提供的协程化函数(比如 Coroutine::sleep ()、Swoole\Coroutine\MySQL、Swoole\Coroutine\Redis)才会让出 CPU,原生的 file_get_contents ()、sleep () 依然会阻塞整个进程。
进阶
协程 可以与 RPC调用
可以尝试把示例里的 Coroutine::sleep (1) 换成实际的 RPC 调用 / 数据库查询
queue 消息列队
首先需要安装 thinkphp 的 topthink/think-queue
composer require topthink/think-queue
使用 命令行后; 可以不用再使用进程守护开启 php think queue:listen 开消息列队了
php think swoole:run
MIT License
测试数据结果
===========================================
=== 最终压测结果 ===
CPU核心数:物理8核 / 逻辑16核
压测配置:并发线程数10 | 总请求数10000
成功请求数:9999 | 失败请求数:1 | 成功率:99.99%
总耗时:51.21秒 | 平均响应时间:34.23毫秒
最终QPS:195.25
【系统资源峰值统计】
CPU占用:最高67.7% | 最低26.3% | 最终8.2%
内存占用:最高31.6% | 最低31.5% | 最终31.6%
=============================================================================