kode/limiting

PHP 限流器,支持令牌桶、滑动窗口算法,可用于本地和分布式限流

Maintainers

Package info

github.com/kodephp/limiting

pkg:composer/kode/limiting

Statistics

Installs: 3

Dependents: 1

Suggesters: 0

Stars: 0

Open Issues: 0

1.7.0 2026-04-02 03:51 UTC

This package is auto-updated.

Last update: 2026-04-02 04:00:25 UTC


README

高性能 PHP 限流器,支持令牌桶、滑动窗口算法,可用于本地和分布式限流。

功能特性

  • 令牌桶算法:支持突发流量,按固定速率补充令牌
  • 滑动窗口算法:基于时间窗口的精确限流
  • 本地限流:使用内存存储,适用于单进程
  • 分布式限流:使用 Redis,支持单机/Sentinel/Cluster 模式
  • 并发控制:支持任务、进程、Fiber 协程的本地和分布式限流
  • 原子操作:Lua 脚本保证分布式环境下的原子性
  • 高可用:支持 Redis Sentinel 和 Cluster 自动故障转移
  • PHP 8.2+:使用 readonly、enum 等新特性优化性能

系统要求

  • PHP >= 8.2
  • 可选:Redis 扩展(用于分布式部署)

安装

composer require kode/limiting

统一入口(推荐)

使用 Limiter 类可以更简洁地创建各种限流器:

use Kode\Limiting\Limiter;

// 令牌桶限流器(默认)
$limiter = Limiter::tokenBucket(100, 10.0);
$limiter->allow('api:user:123');

// 滑动窗口限流器
$limiter = Limiter::slidingWindow(100, 60.0);

// 漏桶限流器
$limiter = Limiter::leakyBucket(100, 1.0);

// 计数器限流器
$limiter = Limiter::counter(1000, 60);

// Redis 分布式限流器
$limiter = Limiter::redis(
    \Kode\Limiting\Enum\LimiterType::TOKEN_BUCKET,
    100, 10.0,
    '127.0.0.1', 6379
);

// Memcached 分布式限流器
$limiter = Limiter::memcached(
    \Kode\Limiting\Enum\LimiterType::TOKEN_BUCKET,
    100, 10.0,
    '127.0.0.1', 11211
);

// PDO 分布式限流器
$limiter = Limiter::pdo(
    \Kode\Limiting\Enum\LimiterType::TOKEN_BUCKET,
    100, 10.0,
    'mysql:host=127.0.0.1;dbname=limiting', 'root', 'password'
);

// 并发控制器
$taskLimiter = Limiter::task(10);      // 任务并发控制
$processLimiter = Limiter::process(10); // 进程并发控制
$fiberLimiter = Limiter::fiber(10);     // Fiber 并发控制

// 中间件
$middleware = Limiter::middleware(\Kode\Limiting\Enum\LimiterType::TOKEN_BUCKET, 100, 10.0);

快速开始

本地限流(令牌桶)

use Kode\Limiting\Algorithm\TokenBucket;
use Kode\Limiting\Store\MemoryStore;

$store = new MemoryStore();
$bucket = new TokenBucket($store, 100, 10.0);

if ($bucket->allow('user:123', 1)) {
    echo "请求通过";
} else {
    echo "被限流";
}

echo "剩余令牌: " . $bucket->getRemaining('user:123');
echo "等待时间: " . $bucket->getWaitTime('user:123') . "";

滑动窗口限流

use Kode\Limiting\Algorithm\SlidingWindow;
use Kode\Limiting\Store\MemoryStore;

$window = new SlidingWindow($store, 100, 1.0);

if ($window->allow('api:v1', 1)) {
    echo "请求通过";
}

分布式限流(跨机器)

use Kode\Limiting\Distributed\DistributedLimiter;

// 单机模式
$limiter = DistributedLimiter::create(
    '127.0.0.1', 6379,
    1000,    // 容量
    100.0    // 每秒补充100个令牌
);

// Sentinel 高可用模式
$limiter = DistributedLimiter::createSentinel(
    ['192.168.1.1:26379', '192.168.1.2:26379'],
    'mymaster',
    1000,
    100.0
);

// Cluster 分片模式
$limiter = DistributedLimiter::createCluster(
    ['192.168.1.1:6379', '192.168.1.2:6379', '192.168.1.3:6379'],
    1000,
    100.0
);

if ($limiter->allow('global:api', 1)) {
    echo "请求通过";
}

并发任务控制

use Kode\Limiting\Concurrency\TaskLimiter;

$limiter = TaskLimiter::create(10, 10, 1.0);

// 手动获取和释放
if ($limiter->tryAcquire('task:1')) {
    try {
        do_something();
    } finally {
        $limiter->release('task:1');
    }
}

// 自动释放
$result = $limiter->run('task:2', fn() => do_something());

// 阻塞等待
$limiter->acquire('task:3', 30.0);

架构设计

src/
├── Limiter.php                    # 统一入口类
├── DTO/                          # 数据传输对象
│   ├── LimiterConfig.php        # 限流器配置(readonly)
│   └── LimiterResult.php        # 限流结果(readonly)
├── Enum/                         # 枚举类型
│   ├── LimiterType.php          # 限流器类型
│   ├── StoreType.php            # 存储类型
│   └── RedisMode.php           # Redis 模式
├── Store/                        # 存储层
│   ├── StoreInterface.php       # 存储接口
│   ├── MemoryStore.php          # 内存存储
│   ├── RedisStore.php           # Redis 存储(支持 Sentinel/Cluster)
│   ├── MemcachedStore.php       # Memcached 存储
│   └── PdoStore.php            # PDO 存储(MySQL/SQLite/PostgreSQL)
├── Algorithm/                    # 限流算法
│   ├── RateLimiterInterface.php # 限流器接口
│   ├── TokenBucket.php          # 令牌桶
│   ├── SlidingWindow.php        # 滑动窗口
│   ├── LeakyBucket.php          # 漏桶
│   └── Counter.php              # 计数器
├── Distributed/                   # 分布式限流
│   ├── DistributedLimiter.php           # 分布式令牌桶
│   ├── DistributedTaskLimiter.php       # 分布式任务限流
│   ├── DistributedProcessLimiter.php    # 分布式进程限流
│   └── DistributedFiberLimiter.php      # 分布式 Fiber 限流
├── Concurrency/                   # 本地并发控制
│   ├── TaskLimiter.php           # 任务限流
│   ├── ProcessLimiter.php        # 进程限流
│   └── FiberLimiter.php         # Fiber 限流
└── Middleware/                    # 中间件
    ├── LimiterMiddleware.php          # 限流中间件
    └── LimiterMiddlewareInterface.php # 中间件接口

API 文档

StoreInterface

存储接口,所有存储实现必须实现此接口。

public function get(string $key): ?string;              // 获取值
public function set(string $key, string $value, int $ttl = 0): void;  // 设置值
public function delete(string $key): void;             // 删除键
public function incr(string $key, int $step = 1): int; // 原子递增
public function ttl(string $key): int;                 // 获取 TTL

RateLimiterInterface

限流器接口。

public function allow(string $key, int $tokens = 1): bool;  // 检查是否允许
public function getRemaining(string $key): float;            // 获取剩余数量
public function getWaitTime(string $key): float;             // 获取等待时间
public function reset(string $key): void;                    // 重置

Limiter(统一入口)

统一入口类,提供简洁的 API 创建限流器。

// 工厂方法
Limiter::tokenBucket(int $capacity, float $refillRate, $store);      // 令牌桶
Limiter::slidingWindow(int $capacity, float $windowSize, $store);    // 滑动窗口
Limiter::leakyBucket(int $capacity, float $leakRate, $store);       // 漏桶
Limiter::counter(int $maxRequests, int $windowSeconds, $store);      // 计数器
Limiter::redis(LimiterType, int $capacity, float $refillRate, ...); // Redis 限流
Limiter::memcached(LimiterType, int $capacity, float $refillRate, ...); // Memcached 限流
Limiter::pdo(LimiterType, int $capacity, float $refillRate, ...);    // PDO 限流
Limiter::task(int $maxConcurrency);    // 任务并发控制器
Limiter::process(int $maxConcurrency); // 进程并发控制器
Limiter::fiber(int $maxConcurrency);  // Fiber 并发控制器
Limiter::middleware(LimiterType, int $capacity, float $refillRate);  // 中间件

// 实例方法
$limiter->allow(string $key, int $tokens);   // 检查是否允许
$limiter->getRemaining(string $key);         // 获取剩余数量
$limiter->getWaitTime(string $key);          // 获取等待时间
$limiter->reset(string $key);                // 重置
$limiter->build();                           // 获取底层限流器
$limiter->getStore();                        // 获取存储
$limiter->getConfig();                       // 获取配置

TokenBucket

令牌桶限流算法。

$bucket = new TokenBucket($store, $capacity, $refillRate, $ttl = 3600, $prefix = 'bucket:');

$bucket->allow('key', 1);           // 检查是否允许
$bucket->getRemaining('key');        // 获取剩余令牌数
$bucket->getWaitTime('key');         // 获取等待时间
$bucket->reset('key');               // 重置
$bucket->getCapacity();              // 获取容量
$bucket->getRefillRate();           // 获取补充速率
$bucket->getTtl();                  // 获取 TTL
$bucket->check('key', 1);           // 检查并返回 LimiterResult

SlidingWindow

滑动窗口限流算法。

$window = new SlidingWindow($store, $capacity, $windowSize = 1.0, $ttl = 3600, $prefix = 'sw:');

$window->allow('key', 1);           // 检查是否允许
$window->getRemaining('key');        // 获取剩余请求数
$window->getWaitTime('key');         // 获取等待时间
$window->reset('key');               // 重置
$window->getCapacity();               // 获取容量
$window->getWindowSize();            // 获取窗口大小
$window->check('key', 1);           // 检查并返回 LimiterResult

DistributedLimiter

分布式限流器(基于 Redis)。

// 单机模式
$limiter = DistributedLimiter::create($host, $port, $capacity, $refillRate, $password, $database);

// Sentinel 模式
$limiter = DistributedLimiter::createSentinel($sentinels, $masterName, $capacity, $refillRate);

// Cluster 模式
$limiter = DistributedLimiter::createCluster($nodes, $capacity, $refillRate);

// 方法
$limiter->allow('key', 1);                    // 检查是否允许(原子操作)
$limiter->tryAcquire('key', 1);               // 尝试获取(别名)
$limiter->getRemaining('key');                 // 获取剩余令牌数
$limiter->getWaitTime('key');                  // 获取等待时间
$limiter->reset('key');                        // 重置
$limiter->allowBatch(['key1', 'key2'], 1);     // 批量检查
$limiter->check('key', 1);                     // 检查并返回 LimiterResult

分布式并发控制器

use Kode\Limiting\Distributed\DistributedTaskLimiter;
use Kode\Limiting\Distributed\DistributedProcessLimiter;
use Kode\Limiting\Distributed\DistributedFiberLimiter;

$taskLimiter = DistributedTaskLimiter::create('127.0.0.1', 6379, 10);
$processLimiter = DistributedProcessLimiter::create('127.0.0.1', 6379, 10);
$fiberLimiter = DistributedFiberLimiter::create('127.0.0.1', 6379, 100);

$taskLimiter->tryAcquire('task:1');           // 非阻塞获取
$taskLimiter->acquire('task:1', 30.0);        // 阻塞等待
$taskLimiter->run('task:1', fn() => null);   // 执行任务
$taskLimiter->release('task:1');               // 释放
$taskLimiter->getActiveCount();                // 当前活跃数

本地并发控制器

use Kode\Limiting\Concurrency\TaskLimiter;
use Kode\Limiting\Concurrency\ProcessLimiter;
use Kode\Limiting\Concurrency\FiberLimiter;

$taskLimiter = TaskLimiter::create($maxConcurrency, $capacity, $refillRate);
$processLimiter = ProcessLimiter::getInstance($maxProcesses, $capacity, $refillRate);
$fiberLimiter = new FiberLimiter($maxFibers, $capacity, $refillRate, $store);

$taskLimiter->tryAcquire('task:1');
$taskLimiter->acquire('task:1', 30.0);
$taskLimiter->run('task:1', fn() => null);
$taskLimiter->release('task:1');
$taskLimiter->getMaxConcurrency();

DTO 对象

LimiterConfig

限流器配置(不可变对象)。

$config = new LimiterConfig($capacity, $refillRate, $ttl = 3600, $prefix = 'limiter:');

$config->capacity;      // 容量
$config->refillRate;    // 补充速率
$config->ttl;           // TTL
$config->prefix;        // 前缀

// 链式调用创建新实例
$newConfig = $config->withCapacity(200)->withRefillRate(20.0);

LimiterResult

限流结果(不可变对象)。

$result = $bucket->check('key', 1);

$result->allowed;       // 是否允许
$result->remaining;    // 剩余数量
$result->waitTime;     // 等待时间
$result->timestamp;    // 时间戳

$result->isAllowed();   // 是否允许(布尔值)
$result->toArray();     // 转换为数组

枚举类型

use Kode\Limiting\Enum\LimiterType;
use Kode\Limiting\Enum\StoreType;
use Kode\Limiting\Enum\RedisMode;

LimiterType::TOKEN_BUCKET->label();      // 令牌桶
LimiterType::SLIDING_WINDOW->label();    // 滑动窗口

StoreType::MEMORY->label();              // 内存存储
StoreType::REDIS->label();               // Redis 存储

RedisMode::STANDALONE->label();          // 单机模式
RedisMode::SENTINEL->label();            // Sentinel 高可用
RedisMode::CLUSTER->label();             // Cluster 分片

单元测试

测试覆盖所有核心功能,共计 35 个测试用例。

测试用例列表

Token Bucket (令牌桶)
 ✔ 允许首次请求
 ✔ 允许多次请求
 ✔ 超出容量拒绝
 ✔ 获取剩余令牌数
 ✔ 新键返回完整容量
 ✔ 获取等待时间
 ✔ 重置限流器
 ✔ 时间推移后补充令牌

Sliding Window (滑动窗口)
 ✔ 容量内允许通过
 ✔ 超出容量拒绝
 ✔ 获取剩余请求数
 ✔ 请求后剩余数正确
 ✔ 重置限流器
 ✔ 可用时等待时间为0
 ✔ 耗尽时等待时间大于0

Memory Store (内存存储)
 ✔ 设置和获取
 ✔ 获取不存在的键
 ✔ 删除键
 ✔ 递增操作
 ✔ 新键递增
 ✔ TTL 计算
 ✔ 清空存储

Task Limiter (任务限流)
 ✔ 尝试获取许可
 ✔ 多次获取
 ✔ 释放许可
 ✔ 执行任务
 ✔ 获取最大并发数

Process Limiter (进程限流)
 ✔ 尝试获取许可
 ✔ 释放许可
 ✔ 获取最大进程数
 ✔ 单例模式

Fiber Limiter (Fiber 限流)
 ✔ 支持检查
 ✔ 尝试获取许可
 ✔ 释放许可
 ✔ 获取最大 Fiber 数

运行测试

# 运行所有测试
./vendor/bin/phpunit

# 显示详细输出
./vendor/bin/phpunit --testdox

# 运行指定测试类
./vendor/bin/phpunit tests/TokenBucketTest.php

# 生成覆盖率报告
./vendor/bin/phpunit --coverage-html coverage

自定义存储

实现 StoreInterface 接口即可:

use Kode\Limiting\Store\StoreInterface;

class MyStore implements StoreInterface
{
    public function get(string $key): ?string { /* ... */ }
    public function set(string $key, string $value, int $ttl = 0): void { /* ... */ }
    public function delete(string $key): void { /* ... */ }
    public function incr(string $key, int $step = 1): int { /* ... */ }
    public function ttl(string $key): int { /* ... */ }
}

Redis 配置

单机模式

$limiter = DistributedLimiter::create(
    '127.0.0.1',    // 主机
    6379,           // 端口
    1000,           // 容量
    100.0,          // 补充速率
    'password',     // 密码(可选)
    0               // 数据库编号
);

Sentinel 模式

$limiter = DistributedLimiter::createSentinel(
    ['192.168.1.1:26379', '192.168.1.2:26379', '192.168.1.3:26379'],
    'mymaster',     // 主节点名称
    1000,
    100.0,
    'sentinel_pass'
);

Cluster 模式

$limiter = DistributedLimiter::createCluster(
    ['192.168.1.1:6379', '192.168.1.2:6379', '192.168.1.3:6379'],
    1000,
    100.0,
    'cluster_pass'
);

使用场景

API 限流

$limiter = DistributedLimiter::create('127.0.0.1', 6379, 100, 10.0);

$response = $limiter->allow('api:user:' . $userId, 1)
    ? processRequest()
    : new Response('Rate limited', 429);

秒杀活动

$limiter = DistributedLimiter::create('127.0.0.1', 6379, 1000, 0);

if (!$limiter->allow('seckill:' . $productId, 1)) {
    throw new Exception('活动已结束');
}

并发控制

$limiter = DistributedTaskLimiter::create('127.0.0.1', 6379, 10);

foreach ($tasks as $task) {
    $limiter->run('task:' . $task['id'], function () use ($task) {
        processTask($task);
    });
}

开发

# 安装依赖
composer install

# 运行测试
./vendor/bin/phpunit

# 语法检查
find src -name "*.php" -exec php -l {} \;

目录结构

kode/limiting/
├── src/
│   ├── DTO/                       # 数据传输对象
│   ├── Enum/                      # 枚举类型
│   ├── Store/                     # 存储层
│   ├── Algorithm/                 # 限流算法
│   ├── Distributed/              # 分布式限流
│   └── Concurrency/               # 并发控制
├── tests/                         # 单元测试
├── composer.json
├── phpunit.xml
├── LICENSE                        # Apache 2.0
└── README.md

许可证

Apache License 2.0 - 参见 LICENSE 文件