yng / phpqueue
A simple and powerful PHP queue package with Redis support
dev-master
2025-09-01 08:08 UTC
Requires
- php: >=7.4
- predis/predis: ^1.1|^2.0
Requires (Dev)
- phpunit/phpunit: ^9.0|^10.0
This package is auto-updated.
Last update: 2025-09-01 08:08:25 UTC
README
一个轻量级、高性能的PHP队列处理包,支持Redis驱动,提供完整的任务队列解决方案。
特性
- 🚀 高性能Redis队列驱动
- 📦 开箱即用,零配置启动
- 🔧 灵活的配置系统
- ✨ 灵活的配置系统
- 📊 完整的任务监控和状态管理
- 🔄 任务重试和失败处理
- 📝 详细的日志记录
- 🎯 支持多队列和优先级
- 🛡️ 内存和时间限制保护
- 🔌 易于集成到现有项目
安装
使用 Composer 安装
composer require yng/phpqueue
手动安装
- 克隆仓库到你的项目目录:
git clone https://github.com/yng666/phpqueue.git
- 在你的项目中引入自动加载文件:
require_once 'vendor/autoload.php'; // 或者如果没有使用composer require_once 'path/to/phpqueue/src/autoload.php';
快速开始
1. 基本初始化
use Yng\PhpQueue\QueuePackage; use Yng\PhpQueue\Jobs\Job; // 使用默认配置初始化 QueuePackage::initWithDefaults(); // 或者指定数据目录 QueuePackage::initWithDefaults('/path/to/your/data'); // 或者从环境变量初始化 QueuePackage::initFromEnv();
2. 创建任务类
use Yng\PhpQueue\Jobs\Job; class SendEmailJob extends Job { protected $email; protected $subject; protected $content; public function __construct($email, $subject, $content) { $this->email = $email; $this->subject = $subject; $this->content = $content; } public function handle() { // 发送邮件的逻辑 mail($this->email, $this->subject, $this->content); $this->log('info', "Email sent to {$this->email}"); } public function failed(\Exception $exception) { $this->log('error', "Failed to send email to {$this->email}: " . $exception->getMessage()); } }
3. 推送任务到队列
// 获取队列管理器 $queueManager = QueuePackage::getQueueManager(); $queue = $queueManager->connection(); // 创建并推送任务 $job = new SendEmailJob('user@example.com', 'Hello', 'This is a test email'); $queue->push($job); // 推送到指定队列 $queue->push($job, 'emails'); // 延迟执行(60秒后) $queue->later(60, $job);
4. 启动工作进程
// 获取工作进程 $worker = QueuePackage::getWorker(); // 处理默认队列的任务 $worker->work('default'); // 处理指定队列的任务 $worker->work('emails'); // 只处理一个任务后退出 $worker->work('default', ['once' => true]);
命令行工具
包提供了便捷的命令行工具:
启动工作进程
# 处理默认队列 php vendor/bin/phpqueue work # 或者 php bin/phpqueue work # 处理指定队列 php vendor/bin/phpqueue work --queue=emails # 只处理一个任务 php vendor/bin/phpqueue work --once # 设置内存限制 php vendor/bin/phpqueue work --memory=256 # 设置超时时间 php vendor/bin/phpqueue work --timeout=120
查看队列状态
php vendor/bin/phpqueue status
# 或者
php bin/phpqueue status
重启工作进程
php vendor/bin/phpqueue restart
# 或者
php bin/phpqueue restart
配置
环境变量配置
创建 .env
文件或设置环境变量:
# Redis配置 REDIS_HOST=127.0.0.1 REDIS_PORT=6379 REDIS_PASSWORD= REDIS_QUEUE_DB=0 REDIS_QUEUE_PREFIX=queue: # 队列配置 QUEUE_CONNECTION=redis QUEUE_DATA_PATH=/path/to/data QUEUE_LOG_PATH=/path/to/logs # 工作进程配置 QUEUE_WORKER_SLEEP=3 QUEUE_WORKER_TIMEOUT=60 QUEUE_WORKER_MEMORY=128 QUEUE_WORKER_MAX_JOBS=1000
自定义配置文件
// config/queue.php return [ 'default' => 'redis', 'connections' => [ 'redis' => [ 'driver' => 'redis', 'host' => '127.0.0.1', 'port' => 6379, 'password' => null, 'database' => 0, 'prefix' => 'queue:', ], ], 'worker' => [ 'sleep' => 3, 'timeout' => 60, 'memory' => 128, 'max_jobs' => 1000, ], 'logging' => [ 'enabled' => true, 'level' => 'info', 'path' => '/path/to/logs/queue.log', 'max_files' => 30, // 最大保留日志文件数 'max_size' => '10M', // 单个日志文件最大大小 ], ]; // 使用自定义配置初始化 QueuePackage::init([], '/path/to/config/queue.php');
高级用法
任务重试
use Yng\PhpQueue\Jobs\Job; class RetryableJob extends Job { public $tries = 3; // 最大重试次数 public $timeout = 120; // 超时时间(秒) public function handle() { // 可能失败的操作 if (rand(1, 10) > 7) { throw new \Exception('Random failure'); } $this->log('info', 'Job completed successfully'); } public function retryUntil() { return now()->addMinutes(10); // 10分钟内可以重试 } }
批量任务
// 批量推送任务 $jobs = [ new SendEmailJob('user1@example.com', 'Hello', 'Message 1'), new SendEmailJob('user2@example.com', 'Hello', 'Message 2'), new SendEmailJob('user3@example.com', 'Hello', 'Message 3'), ]; foreach ($jobs as $job) { $queue->push($job, 'emails'); }
任务优先级
// 高优先级任务 $urgentJob = new SendEmailJob('admin@example.com', 'Urgent', 'Important message'); $queue->push($urgentJob, 'high-priority'); // 工作进程按优先级处理 $worker->work(['high-priority', 'default', 'low-priority']);
监控和调试
查看失败的任务
// 失败的任务会记录在日志文件中 // 默认位置:{data_path}/log/queue_failed.log // 可以通过命令行查看 php vendor/bin/phpqueue status --failed // 或者 php bin/phpqueue status --failed
自定义日志
use Yng\PhpQueue\Jobs\Job; class LoggingJob extends Job { public function handle() { $this->log('info', 'Job started'); // 执行任务逻辑 $this->log('info', 'Job completed'); } }
日志管理
日志轮转
PhpQueue 支持自动日志轮转功能,防止日志文件过大:
// config/queue.php 'logging' => [ 'enabled' => true, 'level' => 'info', 'path' => '/path/to/logs/queue.log', 'max_files' => 30, // 最大保留日志文件数(默认30个) 'max_size' => '10M', // 单个日志文件最大大小(默认10MB) ],
日志文件说明
queue_worker.log
- 工作进程运行日志queue_failed.log
- 失败任务日志queue_error.log
- 系统错误日志
日志轮转规则
当日志文件达到 max_size
设定的大小时,系统会自动进行轮转:
- 当前日志文件重命名为
.1.log
- 历史日志文件依次重命名(
.1.log
→.2.log
,.2.log
→.3.log
,以此类推) - 超过
max_files
数量的最旧日志文件会被删除 - 创建新的日志文件继续记录
支持的文件大小格式
K
或k
- KB(千字节)M
或m
- MB(兆字节)G
或g
- GB(吉字节)- 纯数字 - 字节数
示例:
1024
- 1024字节500K
- 500KB10M
- 10MB1G
- 1GB
手动清理日志
如果需要手动清理日志文件:
# 清理所有日志文件 rm -f /path/to/logs/*.log* # 只清理轮转的历史日志文件 rm -f /path/to/logs/*.*.log
系统要求
- PHP >= 7.4
- Redis 扩展
- JSON 扩展
- Redis 服务器 >= 3.0
许可证
MIT License
贡献
欢迎提交 Issue 和 Pull Request!
- GitHub 仓库:https://github.com/yng666/phpqueue
- 提交 Issue:https://github.com/yng666/phpqueue/issues
- 提交 Pull Request:https://github.com/yng666/phpqueue/pulls
更新日志
v1.1.1
- 新增日志轮转功能,防止日志文件过大
- 支持配置最大日志文件数量和单文件大小限制
- 自动清理超过限制的历史日志文件
- 支持多种文件大小格式(K/M/G)
- 优化日志记录性能
v1.1.0
- 统一命名空间为 Yng\PhpQueue
- 更新包名为 yng/phpqueue
- 优化代码结构和文档
- 完善 GitHub 链接和贡献指南
- 性能优化和 bug 修复
v1.0.0
- 初始版本
- 基本队列功能
- Redis 驱动支持
- 命令行工具
- 多语言支持(中文/英文)