Reactive event bus for PHP powered by RxPHP and Swoole: Sarcastic Recap
Reactive event bus for PHP powered by RxPHP and Swoole.
It lets you publish/subscribe domain and infrastructure events, compose pipelines with Rx operators, and run time-based operators on Swoole’s event loop.
- EventBus: simple Rx-backed bus with on(), onMany(), payloads(), once(), request()
- SwooleScheduler: AsyncSchedulerInterface using Swoole\Timer (works with RxPHP time operators)
- Event model: BasicEvent (name, payload, meta, rid) and EventInterface (correlation id)
Requirements
- PHP 8.3+
- ext-swoole 4.8+ / 5.x
- reactivex/rxphp (2.x)
Installation
composer require small/swoole-rx-events
Quick start
use Small\SwooleRxEvents\EventBus;
use Small\SwooleRxEvents\SwooleScheduler;
use Small\SwooleRxEvents\Event\BasicEvent;
// Use the Swoole async scheduler
$bus = new EventBus(new SwooleScheduler());
// Subscribe by name
$bus->on('order.created')->subscribe(function ($e) {
    echo "order rid={$e->getRid()} payload=", json_encode($e->getPayload()), PHP_EOL;
});
// Emit an event
$bus->emitName('order.created', ['id' => 123]);
// If you’re in a plain CLI script, keep the loop alive briefly:
\Swoole\Timer::after(20, fn () => \Swoole\Event::exit());
\Swoole\Event::wait();
Concepts
Event
All events must implement EventInterface:
namespace Small\SwooleRxEvents\Contract;
interface EventInterface
{
    public function getName(): string;
    public function getRid(): string;
    public function setRid(string $rid): self;
}
BasicEvent carries:
- name (string)
- payload (array)
- meta (array, e.g. tracing, user)
- rid (string, auto-generated correlation id)
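As a rough sketch of what implementing the contract could look like for a custom event: the interface is redeclared locally (without its namespace) so the snippet runs on its own, whereas a real project would import Small\SwooleRxEvents\Contract\EventInterface. The OrderShipped class, its orderId field, and the random_bytes-based rid are hypothetical and not part of the library.

```php
<?php
// Local copy of the contract shown above, so this sketch runs without the library.
interface EventInterface
{
    public function getName(): string;
    public function getRid(): string;
    public function setRid(string $rid): self;
}

// Hypothetical domain event; the name and the orderId field are illustrative only.
final class OrderShipped implements EventInterface
{
    private string $rid;

    public function __construct(public readonly int $orderId)
    {
        // Assumption: any unique string is an acceptable correlation id.
        $this->rid = bin2hex(random_bytes(8));
    }

    public function getName(): string { return 'order.shipped'; }

    public function getRid(): string { return $this->rid; }

    public function setRid(string $rid): self
    {
        $this->rid = $rid;
        return $this; // fluent, so a responder can chain ->setRid(...) when replying
    }
}

$event = new OrderShipped(123);
$generatedRid = $event->getRid();

// A responder would copy the rid of the event it is answering.
$event->setRid('rid-from-request');

echo $event->getName(), ' rid=', $event->getRid(), PHP_EOL;
```

Returning $this from setRid() keeps the chained style used by the responder in the request/response example.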
Bus
- stream(): all events
- on($name) / onMany([...]): filtered streams
- payloads($name): payload-only stream
- once($name, ?map, ?timeoutMs): resolve first matching event (optionally mapped)
- request($requestName, $responseName, $payload = [], $meta = [], ?$timeoutMs): emits a request with a new rid, waits for the first response with the same rid.
Timeouts require an async scheduler. This library provides SwooleScheduler, which implements AsyncSchedulerInterface.
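The correlation idea behind request() can be illustrated with a toy bus in plain PHP. This is not the library's EventBus and it uses neither RxPHP nor timeouts; ToyBus and its method signatures are made up for the illustration. It only shows why the caller subscribes to the response before emitting, and how a reply with a different rid is ignored.

```php
<?php
// Toy in-memory bus: an illustration of rid matching, not the library's implementation.
final class ToyBus
{
    /** @var array<string, list<callable>> handlers keyed by event name */
    private array $handlers = [];

    public function on(string $name, callable $handler): void
    {
        $this->handlers[$name][] = $handler;
    }

    public function emit(string $name, array $payload, string $rid): void
    {
        foreach ($this->handlers[$name] ?? [] as $handler) {
            $handler($payload, $rid);
        }
    }

    // Subscribe to the response FIRST, then emit the request under a fresh rid.
    public function request(string $req, string $resp, array $payload, callable $onReply): string
    {
        $rid = bin2hex(random_bytes(8));
        $done = false;
        $this->on($resp, function (array $p, string $r) use ($rid, &$done, $onReply): void {
            if (!$done && $r === $rid) { // only the first reply with our rid counts
                $done = true;
                $onReply($p);
            }
        });
        $this->emit($req, $payload, $rid);
        return $rid;
    }
}

$bus = new ToyBus();

// Responder: answers every REQ and copies the incoming rid onto RESP.
$bus->on('REQ', function (array $payload, string $rid) use ($bus): void {
    $bus->emit('RESP', ['echo' => $payload['foo']], $rid);
});

$replies = [];
$bus->request('REQ', 'RESP', ['foo' => 'bar'], function (array $p) use (&$replies): void {
    $replies[] = $p;
});

// A stray response carrying an unrelated rid does not reach the caller.
$bus->emit('RESP', ['echo' => 'stray'], 'some-other-rid');

var_dump($replies);
```

Because the toy bus delivers synchronously, the reply arrives while request() is still running, which is exactly the case where subscribing after emitting would miss it.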
API Examples
1) Listen & emit
$bus->on('user.created')->subscribe(fn($e) => audit($e->getMeta(), $e->getPayload()));
$bus->emitName('user.created', ['id' => 42], ['by' => 'admin']);
2) Request/Response with correlation id
// Responder: copies rid from incoming 'REQ' and emits 'RESP'
$bus->on('REQ')->subscribe(function ($e) use ($bus) {
    $bus->emit(
        (new BasicEvent('RESP', ['ok' => true], $e->getMeta()))
            ->setRid($e->getRid()) // correlate
    );
});
// Caller: request() subscribes FIRST, then emits; no race conditions
$bus->request('REQ', 'RESP', ['foo' => 'bar'], ['trace' => 'abc'], 100)
    ->subscribe(
        fn($resp) => var_dump($resp->getPayload()), // ['ok' => true]
        fn($err) => error_log($err->getMessage())
    );
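3) once() with mapping & timeout
// echo is a statement, not an expression, so it cannot be the body of an arrow function
$bus->once('health.ok', fn($e) => $e->getMeta()['node'] ?? 'unknown', 50)
    ->subscribe(
        function ($node) { echo "node=$node\n"; },
        function ($err) { echo "timeout\n"; }
    );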
$bus->emitName('health.ok', [], ['node' => 'api-1']);
4) Backpressure / batching (Rx composition)
$bus->on('order.created')
    ->bufferWithTimeOrCount(500, 100, $bus->scheduler()) // every 0.5s or 100 items
    ->filter(fn($batch) => !empty($batch))
    ->subscribe(fn(array $batch) => persist_batch($batch));
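To make the "every 0.5s or 100 items" rule concrete, here is a plain-PHP illustration of a buffer that flushes on whichever limit is hit first. It is not the operator used above and it uses no timers: the age check only runs when an item is pushed, whereas a scheduler-driven operator would also flush an idle buffer. TimeOrCountBuffer and the injected fake clock are hypothetical names chosen for the sketch.

```php
<?php
// Illustration only: "flush after N items or once the window is T ms old".
final class TimeOrCountBuffer
{
    private array $items = [];
    private ?float $openedAt = null; // time the current window received its first item

    public function __construct(
        private int $maxItems,
        private float $maxAgeMs,
        private \Closure $flush,  // receives each non-empty batch
        private \Closure $nowMs,  // clock in milliseconds, injected for determinism
    ) {}

    public function push(mixed $item): void
    {
        $now = ($this->nowMs)();
        $this->openedAt ??= $now;
        $this->items[] = $item;

        $full = count($this->items) >= $this->maxItems;
        $old = $now - $this->openedAt >= $this->maxAgeMs;
        if ($full || $old) {
            $this->flushNow();
        }
    }

    public function flushNow(): void
    {
        if ($this->items === []) {
            return; // skip empty batches, like the filter() step above
        }
        $batch = $this->items;
        $this->items = [];
        $this->openedAt = null;
        ($this->flush)($batch);
    }
}

$clock = 0.0; // fake clock so the run is deterministic
$batches = [];
$buffer = new TimeOrCountBuffer(
    3,
    500.0,
    function (array $batch) use (&$batches): void { $batches[] = $batch; },
    function () use (&$clock): float { return $clock; },
);

$buffer->push('a');
$buffer->push('b');
$buffer->push('c');   // count limit reached: flushes [a, b, c]
$buffer->push('d');   // opens a new window at t=0
$clock = 600.0;
$buffer->push('e');   // window is 600 ms old: flushes [d, e]
$buffer->flushNow();  // nothing buffered, so no empty batch is emitted

print_r($batches);
```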
Swoole integration tips
- HTTP server: in on('request'), emit an event with meta containing a respond callable or the Response object. Downstream subscribers can produce a ResponseEvent.
- Coroutines per subscriber: use Swoole coroutines in your subscribers if you do IO; Rx operators will orchestrate sequencing.
- Event loop in CLI: outside a Swoole Server, start/stop the reactor with Swoole\Event::wait() / Event::exit() for timers to fire.