easyswoole / nsq
An efficient swoole framework
1.0.1
2020-06-25 01:33 UTC
Requires
- php: >=5.3.0
- ext-json: *
- easyswoole/easyswoole: 3.x
- easyswoole/http-client: ^1.2.5
- easyswoole/pool: ^1.0
- easyswoole/spl: ^1.1
- monolog/monolog: ~1.0
- react/react: >=0.2.1
Requires (Dev)
- easyswoole/phpunit: ^1.0
- easyswoole/swoole-ide-helper: ^1.1
This package is auto-updated.
Last update: 2024-10-25 10:47:20 UTC
README
NSQ客户端
安装
composer required easyswoole/nsq
注册Nsq服务
namespace EasySwoole\EasySwoole; use App\Producer\Process as ProducerProcess; use App\Consumer\Process as ConsumerProcess; use EasySwoole\EasySwoole\Swoole\EventRegister; use EasySwoole\EasySwoole\AbstractInterface\Event; use EasySwoole\Http\Request; use EasySwoole\Http\Response; class EasySwooleEvent implements Event { public static function initialize() { // TODO: Implement initialize() method. date_default_timezone_set('Asia/Shanghai'); } public static function mainServerCreate(EventRegister $register) { // TODO: Implement mainServerCreate() method. // 生产者 ServerManager::getInstance()->getSwooleServer()->addProcess((new ProducerProcess())->getProcess()); // 消费者 ServerManager::getInstance()->getSwooleServer()->addProcess((new ConsumerProcess())->getProcess()); } ...... }
生产者
namespace App\Producer; use EasySwoole\Component\Process\AbstractProcess; class Process extends AbstractProcess { protected function run($arg) { go(function () { $config = new \EasySwoole\Nsq\Config(); $topic = "topic.test"; $hosts = $config->getNsqdUrl(); foreach ($hosts as $host) { $nsq = new \EasySwoole\Nsq\Nsq(); for ($i = 0; $i < 10; $i++) { $msg = new \EasySwoole\Nsq\Message\Message(); $msg->setPayload("test$i"); $nsq->push( new \EasySwoole\Nsq\Connection\Producer($host, $config), $topic, $msg ); } } }); } }
消费者
namespace App\Consumer; use EasySwoole\Component\Process\AbstractProcess; class Process extends AbstractProcess { protected function run($arg) { go(function () { $topic = "topic.test"; $channel = "test.consuming"; $config = new \EasySwoole\Nsq\Config(); $nsqlookup = new \EasySwoole\Nsq\Lookup\Nsqlookupd($config->getNsqlookupUrl()); $hosts = $nsqlookup->lookupHosts($topic); foreach ($hosts as $host) { $nsq = new \EasySwoole\Nsq\Nsq(); $nsq->subscribe( new \EasySwoole\Nsq\Connection\Consumer($host, $config, $topic, $channel), function ($item) { var_dump($item['message']); } ); } }); } }
附赠
- Nsq 集群部署 docker-compose.yml 一份,使用方式如下
- 保证4150,4151,4160,4161,4171端口未被占用(占用后可以修改compose文件中的端口号)
- 根目录下,docker-compose up -d
- 访问localhost:4171,可以查看Web版 nsqadmin 状态。
Any Question
Nsq使用问题及bug,欢迎到提issue或者在官方群反馈