small / swoole-rx-events
This project provides implementations of reactive events for Swoole.
0.2.0
2025-09-30 14:28 UTC
Requires
- php: >=8.4
- reactivex/rxphp: 2.*
- small/collection: 3.*
- small/swoole-db: 1.3.*
- symfony/uid: *
README

Reactive event bus for PHP powered by RxPHP and Swoole.
It lets you publish/subscribe domain and infrastructure events, compose pipelines with Rx operators, and run time-based operators on Swoole’s event loop.
- EventBus: simple Rx-backed bus with on(), onMany(), payloads(), once(), request()
- SwooleScheduler: AsyncSchedulerInterface using Swoole\Timer (works with RxPHP time operators)
- Event model: BasicEvent (name, payload, meta, rid) and EventInterface (correlation id)
Requirements
- PHP 8.4+
- ext-swoole 4.8+ / 5.x
- reactivex/rxphp (2.x)
Installation
composer require small/swoole-rx-events
Quick start
use Small\SwooleRxEvents\EventBus;
use Small\SwooleRxEvents\SwooleScheduler;
use Small\SwooleRxEvents\Event\BasicEvent;
// Use the Swoole async scheduler
$bus = new EventBus(new SwooleScheduler());
// Subscribe by name
$bus->on('order.created')->subscribe(function ($e) {
    echo "order rid={$e->getRid()} payload=", json_encode($e->getPayload()), PHP_EOL;
});
// Emit an event
$bus->emitName('order.created', ['id' => 123]);
// If you’re in a plain CLI script, keep the loop alive briefly:
\Swoole\Timer::after(20, fn () => \Swoole\Event::exit());
\Swoole\Event::wait();
Concepts
Event
All events must implement EventInterface:
namespace Small\SwooleRxEvents\Contract;
interface EventInterface
{
    public function getName(): string;
    public function getRid(): string;
    public function setRid(string $rid): self;
}
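As a minimal sketch of a custom event built on the interface above: the class name OrderShipped, the App\Event namespace, and the random_bytes()-based default rid are assumptions for illustration only, not part of the library.

```php
<?php
declare(strict_types=1);

namespace App\Event; // hypothetical application namespace

use Small\SwooleRxEvents\Contract\EventInterface;

// Hypothetical domain event; only the three interface methods are required.
final class OrderShipped implements EventInterface
{
    private string $rid;

    public function __construct(public readonly int $orderId)
    {
        // Assumption: any unique string is acceptable as a correlation id.
        $this->rid = bin2hex(random_bytes(8));
    }

    public function getName(): string
    {
        return 'order.shipped';
    }

    public function getRid(): string
    {
        return $this->rid;
    }

    public function setRid(string $rid): self
    {
        $this->rid = $rid;

        return $this;
    }
}
```

An event like this exposes neither getPayload() nor getMeta(), so subscribers written against BasicEvent would need to check the event type before calling those accessors.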
BasicEvent carries:
- name (string)
- payload (array)
- meta (array, e.g. tracing, user)
- rid (string, auto-generated correlation id)
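The fields can be read back through the accessors used elsewhere in this README. The sketch below assumes the constructor argument order (name, payload, meta) shown in the request/response example; the values themselves are made up.

```php
<?php
declare(strict_types=1);

use Small\SwooleRxEvents\Event\BasicEvent;

require __DIR__ . '/vendor/autoload.php';

// Assumed argument order: name, payload, meta.
$event = new BasicEvent(
    'order.created',
    ['id' => 123],
    ['user' => 'admin', 'trace' => 'abc']
);

echo $event->getName(), PHP_EOL;
echo json_encode($event->getPayload()), PHP_EOL;
echo json_encode($event->getMeta()), PHP_EOL;

// Overwrite the correlation id, e.g. to tie a response to its request.
$event->setRid('req-42');
echo $event->getRid(), PHP_EOL;
```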
Bus
- stream(): all events
- on($name) / onMany([...]): filtered streams
- payloads($name): payload-only stream
- once($name, ?map, ?timeoutMs): resolve first matching event (optionally mapped)
- request($requestName, $responseName, $payload = [], $meta = [], ?$timeoutMs): emits a request with a new rid, waits for the first response with the same rid.
Timeouts require an async scheduler. This library provides SwooleScheduler, which implements AsyncSchedulerInterface.
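A short sketch of the filtered streams, assuming onMany() takes an array of event names and payloads() emits the payload array itself; the event names and the $names / $ids collectors are illustrative.

```php
<?php
declare(strict_types=1);

use Small\SwooleRxEvents\EventBus;
use Small\SwooleRxEvents\SwooleScheduler;

require __DIR__ . '/vendor/autoload.php';

$bus = new EventBus(new SwooleScheduler());

$names = []; // event names seen on the multi-name stream
$ids = [];   // order ids seen on the payload-only stream

// onMany(): one subscription covering several event names.
$bus->onMany(['order.created', 'order.cancelled'])
    ->subscribe(function ($e) use (&$names) {
        $names[] = $e->getName();
    });

// payloads(): subscribers receive the payload, not the event object.
$bus->payloads('order.created')
    ->subscribe(function (array $payload) use (&$ids) {
        $ids[] = $payload['id'];
    });

$bus->emitName('order.created', ['id' => 1]);
$bus->emitName('order.cancelled', ['id' => 1]);
$bus->emitName('order.created', ['id' => 2]);

// Keep the loop alive briefly, as in the quick start, then print what arrived.
\Swoole\Timer::after(20, function () use (&$names, &$ids) {
    var_dump($names, $ids);
    \Swoole\Event::exit();
});
\Swoole\Event::wait();
```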
API Examples
1) Listen & emit
$bus->on('user.created')->subscribe(fn($e) => audit($e->getMeta(), $e->getPayload()));
$bus->emitName('user.created', ['id' => 42], ['by' => 'admin']);
2) Request/Response with correlation id
// Responder: copies rid from incoming 'REQ' and emits 'RESP'
$bus->on('REQ')->subscribe(function ($e) use ($bus) {
    $bus->emit(
        (new BasicEvent('RESP', ['ok' => true], $e->getMeta()))
            ->setRid($e->getRid()) // correlate
    );
});
// Caller: request() subscribes FIRST, then emits; no race conditions
$bus->request('REQ', 'RESP', ['foo' => 'bar'], ['trace' => 'abc'], 100)
    ->subscribe(
        fn($resp) => var_dump($resp->getPayload()), // ['ok' => true]
        fn($err) => error_log($err->getMessage())
    );
3) once() with mapping & timeout
$bus->once('health.ok', fn($e) => $e->getMeta()['node'] ?? 'unknown', 50)
    ->subscribe(
        // echo is a statement and cannot be an arrow-function body; print is an expression
        fn($node) => print("node=$node\n"),
        fn($err) => print("timeout\n")
    );
$bus->emitName('health.ok', [], ['node' => 'api-1']);
4) Backpressure / batching (Rx composition)
$bus->on('order.created')
    ->bufferWithTimeOrCount(500, 100, $bus->scheduler()) // every 0.5s or 100 items
    ->filter(fn($batch) => !empty($batch))
    ->subscribe(fn(array $batch) => persist_batch($batch));
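For purely count-based batching, a sketch using RxPHP's bufferWithCount() operator may be enough. The batch size of 3, the total field, and the $batches collector are assumptions for illustration.

```php
<?php
declare(strict_types=1);

use Small\SwooleRxEvents\EventBus;
use Small\SwooleRxEvents\SwooleScheduler;

require __DIR__ . '/vendor/autoload.php';

$bus = new EventBus(new SwooleScheduler());

$batches = []; // each entry is a list of order ids

$bus->payloads('order.created')
    ->filter(fn(array $p) => ($p['total'] ?? 0) > 0) // drop empty orders
    ->bufferWithCount(3)                              // emit once 3 payloads are collected
    ->subscribe(function (array $batch) use (&$batches) {
        $batches[] = array_column($batch, 'id');
    });

foreach (range(1, 7) as $id) {
    $bus->emitName('order.created', ['id' => $id, 'total' => 10 * $id]);
}

// Keep the loop alive briefly, as in the quick start.
\Swoole\Timer::after(20, function () use (&$batches) {
    echo json_encode($batches), PHP_EOL;
    \Swoole\Event::exit();
});
\Swoole\Event::wait();
```

bufferWithCount() only flushes a partial batch when the source completes, so on a long-lived bus the trailing items wait until the next full batch. That is the gap a time-or-count buffer closes.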
Swoole integration tips
- HTTP server: in on('request'), emit an event with meta containing a respond callable or the Response object. Downstream subscribers can produce a ResponseEvent.
- Coroutines per subscriber: use Swoole coroutines in your subscribers if you do IO; Rx operators will orchestrate sequencing.
- Event loop in CLI: outside a Swoole Server, start/stop the reactor with Swoole\Event::wait() / Swoole\Event::exit() for timers to fire.
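The HTTP server tip could look roughly like the sketch below. The event name http.request, the meta key response, and the choice to create the bus in workerStart are assumptions; only the Swoole\Http\Server calls are standard Swoole API.

```php
<?php
declare(strict_types=1);

use Small\SwooleRxEvents\EventBus;
use Small\SwooleRxEvents\SwooleScheduler;
use Swoole\Http\Request;
use Swoole\Http\Response;
use Swoole\Http\Server;

require __DIR__ . '/vendor/autoload.php';

$server = new Server('127.0.0.1', 9501);
$bus = null;

// Assumption: one bus per worker, created once the worker's event loop exists.
$server->on('workerStart', function () use (&$bus) {
    $bus = new EventBus(new SwooleScheduler());

    // Subscriber answers the request through the Response object carried in meta.
    $bus->on('http.request')->subscribe(function ($e) {
        $response = $e->getMeta()['response'];
        $response->header('Content-Type', 'application/json');
        $response->end(json_encode(['path' => $e->getPayload()['path']]));
    });
});

// Each incoming request becomes an event; the handler itself stays thin.
$server->on('request', function (Request $request, Response $response) use (&$bus) {
    $bus->emitName(
        'http.request',
        ['path' => $request->server['request_uri']],
        ['response' => $response]
    );
});

$server->start();
```

Passing the Response object in meta keeps the payload serializable while still letting a subscriber finish the request.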
License
GPL-3.0-only — see LICENSE.