yng/phpqueue

A simple and powerful PHP queue package with Redis support

dev-master 2025-09-01 08:08 UTC

This package is auto-updated.

Last update: 2025-09-01 08:08:25 UTC


README

GitHub License PHP Version

一个轻量级、高性能的PHP队列处理包,支持Redis驱动,提供完整的任务队列解决方案。

特性

  • 🚀 高性能Redis队列驱动
  • 📦 开箱即用,零配置启动
  • 🔧 灵活的配置系统
  • ✨ 灵活的配置系统
  • 📊 完整的任务监控和状态管理
  • 🔄 任务重试和失败处理
  • 📝 详细的日志记录
  • 🎯 支持多队列和优先级
  • 🛡️ 内存和时间限制保护
  • 🔌 易于集成到现有项目

安装

使用 Composer 安装

composer require yng/phpqueue

手动安装

  1. 克隆仓库到你的项目目录:
git clone https://github.com/yng666/phpqueue.git
  1. 在你的项目中引入自动加载文件:
require_once 'vendor/autoload.php';
// 或者如果没有使用composer
require_once 'path/to/phpqueue/src/autoload.php';

快速开始

1. 基本初始化

use Yng\PhpQueue\QueuePackage;
use Yng\PhpQueue\Jobs\Job;

// 使用默认配置初始化
QueuePackage::initWithDefaults();

// 或者指定数据目录
QueuePackage::initWithDefaults('/path/to/your/data');

// 或者从环境变量初始化
QueuePackage::initFromEnv();

2. 创建任务类

use Yng\PhpQueue\Jobs\Job;

class SendEmailJob extends Job
{
    protected $email;
    protected $subject;
    protected $content;
    
    public function __construct($email, $subject, $content)
    {
        $this->email = $email;
        $this->subject = $subject;
        $this->content = $content;
    }
    
    public function handle()
    {
        // 发送邮件的逻辑
        mail($this->email, $this->subject, $this->content);
        
        $this->log('info', "Email sent to {$this->email}");
    }
    
    public function failed(\Exception $exception)
    {
        $this->log('error', "Failed to send email to {$this->email}: " . $exception->getMessage());
    }
}

3. 推送任务到队列

// 获取队列管理器
$queueManager = QueuePackage::getQueueManager();
$queue = $queueManager->connection();

// 创建并推送任务
$job = new SendEmailJob('user@example.com', 'Hello', 'This is a test email');
$queue->push($job);

// 推送到指定队列
$queue->push($job, 'emails');

// 延迟执行(60秒后)
$queue->later(60, $job);

4. 启动工作进程

// 获取工作进程
$worker = QueuePackage::getWorker();

// 处理默认队列的任务
$worker->work('default');

// 处理指定队列的任务
$worker->work('emails');

// 只处理一个任务后退出
$worker->work('default', ['once' => true]);

命令行工具

包提供了便捷的命令行工具:

启动工作进程

# 处理默认队列
php vendor/bin/phpqueue work
# 或者
php bin/phpqueue work

# 处理指定队列
php vendor/bin/phpqueue work --queue=emails

# 只处理一个任务
php vendor/bin/phpqueue work --once

# 设置内存限制
php vendor/bin/phpqueue work --memory=256

# 设置超时时间
php vendor/bin/phpqueue work --timeout=120

查看队列状态

php vendor/bin/phpqueue status
# 或者
php bin/phpqueue status

重启工作进程

php vendor/bin/phpqueue restart
# 或者
php bin/phpqueue restart

配置

环境变量配置

创建 .env 文件或设置环境变量:

# Redis配置
REDIS_HOST=127.0.0.1
REDIS_PORT=6379
REDIS_PASSWORD=
REDIS_QUEUE_DB=0
REDIS_QUEUE_PREFIX=queue:

# 队列配置
QUEUE_CONNECTION=redis
QUEUE_DATA_PATH=/path/to/data
QUEUE_LOG_PATH=/path/to/logs

# 工作进程配置
QUEUE_WORKER_SLEEP=3
QUEUE_WORKER_TIMEOUT=60
QUEUE_WORKER_MEMORY=128
QUEUE_WORKER_MAX_JOBS=1000

自定义配置文件

// config/queue.php
return [
    'default' => 'redis',
    
    'connections' => [
        'redis' => [
            'driver' => 'redis',
            'host' => '127.0.0.1',
            'port' => 6379,
            'password' => null,
            'database' => 0,
            'prefix' => 'queue:',
        ],
    ],
    
    'worker' => [
        'sleep' => 3,
        'timeout' => 60,
        'memory' => 128,
        'max_jobs' => 1000,
    ],
    
    'logging' => [
        'enabled' => true,
        'level' => 'info',
        'path' => '/path/to/logs/queue.log',
        'max_files' => 30,        // 最大保留日志文件数
        'max_size' => '10M',      // 单个日志文件最大大小
    ],
];

// 使用自定义配置初始化
QueuePackage::init([], '/path/to/config/queue.php');

高级用法

任务重试

use Yng\PhpQueue\Jobs\Job;

class RetryableJob extends Job
{
    public $tries = 3; // 最大重试次数
    public $timeout = 120; // 超时时间(秒)
    
    public function handle()
    {
        // 可能失败的操作
        if (rand(1, 10) > 7) {
            throw new \Exception('Random failure');
        }
        
        $this->log('info', 'Job completed successfully');
    }
    
    public function retryUntil()
    {
        return now()->addMinutes(10); // 10分钟内可以重试
    }
}

批量任务

// 批量推送任务
$jobs = [
    new SendEmailJob('user1@example.com', 'Hello', 'Message 1'),
    new SendEmailJob('user2@example.com', 'Hello', 'Message 2'),
    new SendEmailJob('user3@example.com', 'Hello', 'Message 3'),
];

foreach ($jobs as $job) {
    $queue->push($job, 'emails');
}

任务优先级

// 高优先级任务
$urgentJob = new SendEmailJob('admin@example.com', 'Urgent', 'Important message');
$queue->push($urgentJob, 'high-priority');

// 工作进程按优先级处理
$worker->work(['high-priority', 'default', 'low-priority']);

监控和调试

查看失败的任务

// 失败的任务会记录在日志文件中
// 默认位置:{data_path}/log/queue_failed.log

// 可以通过命令行查看
php vendor/bin/phpqueue status --failed
// 或者
php bin/phpqueue status --failed

自定义日志

use Yng\PhpQueue\Jobs\Job;

class LoggingJob extends Job
{
    public function handle()
    {
        $this->log('info', 'Job started');
        
        // 执行任务逻辑
        
        $this->log('info', 'Job completed');
    }
}

日志管理

日志轮转

PhpQueue 支持自动日志轮转功能,防止日志文件过大:

// config/queue.php
'logging' => [
    'enabled' => true,
    'level' => 'info',
    'path' => '/path/to/logs/queue.log',
    'max_files' => 30,        // 最大保留日志文件数(默认30个)
    'max_size' => '10M',      // 单个日志文件最大大小(默认10MB)
],

日志文件说明

  • queue_worker.log - 工作进程运行日志
  • queue_failed.log - 失败任务日志
  • queue_error.log - 系统错误日志

日志轮转规则

当日志文件达到 max_size 设定的大小时,系统会自动进行轮转:

  1. 当前日志文件重命名为 .1.log
  2. 历史日志文件依次重命名(.1.log.2.log.2.log.3.log,以此类推)
  3. 超过 max_files 数量的最旧日志文件会被删除
  4. 创建新的日志文件继续记录

支持的文件大小格式

  • Kk - KB(千字节)
  • Mm - MB(兆字节)
  • Gg - GB(吉字节)
  • 纯数字 - 字节数

示例:

  • 1024 - 1024字节
  • 500K - 500KB
  • 10M - 10MB
  • 1G - 1GB

手动清理日志

如果需要手动清理日志文件:

# 清理所有日志文件
rm -f /path/to/logs/*.log*

# 只清理轮转的历史日志文件
rm -f /path/to/logs/*.*.log

系统要求

  • PHP >= 7.4
  • Redis 扩展
  • JSON 扩展
  • Redis 服务器 >= 3.0

许可证

MIT License

贡献

欢迎提交 Issue 和 Pull Request!

更新日志

v1.1.1

  • 新增日志轮转功能,防止日志文件过大
  • 支持配置最大日志文件数量和单文件大小限制
  • 自动清理超过限制的历史日志文件
  • 支持多种文件大小格式(K/M/G)
  • 优化日志记录性能

v1.1.0

  • 统一命名空间为 Yng\PhpQueue
  • 更新包名为 yng/phpqueue
  • 优化代码结构和文档
  • 完善 GitHub 链接和贡献指南
  • 性能优化和 bug 修复

v1.0.0

  • 初始版本
  • 基本队列功能
  • Redis 驱动支持
  • 命令行工具
  • 多语言支持(中文/英文)