sunmking/think8-queue

thinkphp8 queue lib

1.0 2023-12-22 01:41 UTC

This package is auto-updated.

Last update: 2025-09-15 09:18:09 UTC


README

基于现代设计模式重构的 ThinkPHP 8 队列库

PHP Version ThinkPHP License

✨ 特性

  • 🏗️ Builder模式:流畅的API设计,支持链式调用
  • 🎯 中间件支持:可插拔的中间件系统,灵活扩展
  • 📡 事件系统:完整的事件监听机制,支持任务生命周期
  • ⚙️ 配置管理:灵活的配置系统,支持动态配置
  • 🔧 类型安全:完整的类型声明,减少运行时错误
  • 🚀 高性能:优化的执行流程,支持批量任务
  • 🔄 向后兼容:保持与旧版本的兼容性

📦 安装

composer require sunmking/think8-queue

🚀 快速开始

1. 创建任务类

<?php

namespace App\Jobs;

use Sunmking\Think8Queue\Jobs\AbstractJob;
use Sunmking\Think8Queue\Middleware\LoggingMiddleware;
use think\queue\Job;

class SendEmailJob extends AbstractJob
{
    public function __construct()
    {
        parent::__construct();
        
        // 添加中间件
        $this->middleware(LoggingMiddleware::class);
    }

    protected function execute(Job $job, array $data): void
    {
        $email = $data['email'] ?? '';
        $subject = $data['subject'] ?? '';
        $content = $data['content'] ?? '';
        
        // 发送邮件逻辑
        echo "Sending email to: {$email}\n";
        echo "Subject: {$subject}\n";
        echo "Content: {$content}\n";
    }
}

2. 使用队列

方式一:使用Builder模式(推荐)

use Sunmking\Think8Queue\Facades\Queue;

// 基本使用
Queue::job(SendEmailJob::class)
    ->data([
        'email' => 'user@example.com',
        'subject' => 'Welcome',
        'content' => 'Welcome to our service!'
    ])
    ->dispatch();

// 高级配置
Queue::job(SendEmailJob::class)
    ->data(['email' => 'user@example.com'])
    ->delay(60)        // 60秒后执行
    ->attempts(5)      // 最多重试5次
    ->timeout(120)     // 超时时间120秒
    ->priority(10)     // 优先级
    ->queue('emails')  // 指定队列
    ->dispatch();

方式二:直接使用Manager

use Sunmking\Think8Queue\Manager\QueueManager;

$queue = QueueManager::instance();

// 推送任务
$queue->push(SendEmailJob::class, [
    'email' => 'user@example.com',
    'subject' => 'Test',
    'content' => 'Test content'
]);

// 延迟推送
$queue->later(300, SendEmailJob::class, [
    'email' => 'user@example.com'
]);

⚙️ 配置

use Sunmking\Think8Queue\Facades\Queue;

// 设置配置
Queue::config([
    'default_queue' => 'default',
    'default_attempts' => 3,
    'default_timeout' => 60,
    'default_priority' => 0,
    'prefix' => 'qn_',
    'retry_after' => 90,
]);

🎯 中间件系统

内置中间件

use Sunmking\Think8Queue\Middleware\LoggingMiddleware;
use Sunmking\Think8Queue\Middleware\RetryMiddleware;

class MyJob extends AbstractJob
{
    public function __construct()
    {
        parent::__construct();
        
        // 添加日志中间件
        $this->middleware(LoggingMiddleware::class);
        
        // 添加重试中间件
        $this->middleware(RetryMiddleware::class);
    }
}

自定义中间件

<?php

namespace App\Middleware;

use Sunmking\Think8Queue\Middleware\MiddlewareInterface;
use think\queue\Job;

class CustomMiddleware implements MiddlewareInterface
{
    public function handle(Job $job, array $data, callable $next): mixed
    {
        // 前置处理
        echo "Before job execution\n";
        
        $result = $next($job, $data);
        
        // 后置处理
        echo "After job execution\n";
        
        return $result;
    }
}

📡 事件系统

use Sunmking\Think8Queue\Events\JobEvent;

class SendEmailJob extends AbstractJob
{
    public function __construct()
    {
        parent::__construct();
        
        // 监听任务开始事件
        $this->on(JobEvent\JobProcessing::class, function($job, $data) {
            echo "Job started: " . get_class($job) . "\n";
        });
        
        // 监听任务完成事件
        $this->on(JobEvent\JobProcessed::class, function($job, $data) {
            echo "Job completed: " . get_class($job) . "\n";
        });
        
        // 监听任务失败事件
        $this->on(JobEvent\JobFailed::class, function($job, $data, $exception) {
            echo "Job failed: " . $exception->getMessage() . "\n";
        });
    }
}

📦 批量任务

use Sunmking\Think8Queue\Manager\QueueManager;

$queue = QueueManager::instance();

// 批量推送
$jobs = [
    SendEmailJob::class,
    ProcessOrderJob::class,
    GenerateReportJob::class
];

$queue->bulk($jobs, [
    'batch_id' => 'batch_001'
]);

🔄 迁移指南

从旧版本迁移

1. 任务类迁移

旧版本:

use Sunmking\Think8Queue\basic\BaseJob;

class OldJob extends BaseJob
{
    public function doJob($id): bool
    {
        // 业务逻辑
        return true;
    }
}

新版本:

use Sunmking\Think8Queue\Jobs\AbstractJob;
use think\queue\Job;

class NewJob extends AbstractJob
{
    protected function execute(Job $job, array $data): void
    {
        $id = $data['id'] ?? $data[0] ?? null;
        // 业务逻辑
    }
}

2. 队列调用迁移

旧版本:

use Sunmking\Think8Queue\Queue;

Queue::instance()->job(TestJob::class)
    ->errorCount(3)
    ->delay(10)
    ->secs(10)
    ->data($id)
    ->push();

新版本:

use Sunmking\Think8Queue\Facades\Queue;

Queue::job(TestJob::class)
    ->data(['id' => $id])
    ->attempts(3)
    ->delay(10)
    ->timeout(120)
    ->dispatch();

🏗️ 架构设计

核心组件

  • QueueManager: 队列管理器,负责任务调度
  • JobBuilder: 任务构建器,提供流畅的API
  • AbstractJob: 抽象任务基类
  • MiddlewareManager: 中间件管理器
  • EventDispatcher: 事件分发器
  • QueueConfig: 配置管理器

设计模式

  • Builder模式: 用于构建复杂的任务配置
  • 策略模式: 用于不同的任务处理策略
  • 观察者模式: 用于事件系统
  • 责任链模式: 用于中间件系统

📁 目录结构

src/
├── Contracts/          # 接口定义
│   ├── JobInterface.php
│   ├── QueueInterface.php
│   └── JobBuilderInterface.php
├── Config/            # 配置管理
│   └── QueueConfig.php
├── Events/            # 事件系统
│   ├── JobEvent.php
│   └── EventDispatcher.php
├── Middleware/        # 中间件系统
│   ├── MiddlewareInterface.php
│   ├── MiddlewareManager.php
│   ├── LoggingMiddleware.php
│   └── RetryMiddleware.php
├── Jobs/              # 任务类
│   ├── AbstractJob.php
│   └── ExampleJob.php
├── Builder/           # 构建器
│   └── JobBuilder.php
├── Manager/           # 管理器
│   └── QueueManager.php
└── Facades/           # 门面类
    └── Queue.php

🚀 启动队列

# 启动消费队列(推荐使用 supervisor 守护进程)
php think queue:listen --memory 8 --queue default

# 指定队列
php think queue:listen --queue emails

# 指定进程数
php think queue:listen --queue default --tries 3 --timeout 60

📊 性能优化

  • 使用对象池减少内存分配
  • 中间件按需加载
  • 事件系统异步处理
  • 配置缓存机制
  • 批量任务处理

🧪 测试

# 运行测试
composer test

# 代码覆盖率
composer test-coverage

📝 更新日志

v2.0.0 (重构版)

  • 🎉 完全重构,采用现代设计模式
  • ✨ 新增Builder模式API
  • 🎯 新增中间件系统
  • 📡 新增事件系统
  • ⚙️ 新增配置管理
  • 🔧 增强类型安全
  • 🚀 性能优化

v1.x (旧版本)

  • 基础队列功能
  • 简单的任务处理
  • 基本的错误处理

🤝 贡献

欢迎提交 Issue 和 Pull Request!

📄 许可证

MIT License

📞 联系方式