
Small Swoole Rx Events

Reactive event bus for PHP powered by RxPHP and Swoole: Sarcastic Recap

This article presents an event bus that combines RxPHP and Swoole into a simple, flexible event-driven solution for PHP. It covers the main features of the library, including:

  • Event Bus: a simple event-driven bus that supports RxPHP operators, making it easy to publish, subscribe to, and process event data. It provides on(), onMany(), payloads(), once(), and request() for subscribing to events and handling them.
  • Swoole Scheduler: an AsyncSchedulerInterface implementation using Swoole\Timer (works with RxPHP time operators) that enables asynchronous execution of events.

Reactive event bus for PHP powered by RxPHP and Swoole.

It lets you publish/subscribe domain and infrastructure events, compose pipelines with Rx operators, and run time-based operators on Swoole’s event loop.

  • EventBus: simple Rx-backed bus with on(), onMany(), payloads(), once(), request()
  • SwooleScheduler: AsyncSchedulerInterface using Swoole\Timer (works with RxPHP time operators)
  • Event model: BasicEvent (name, payload, meta, rid) and EventInterface (correlation id)

Requirements

  • PHP 8.3+
  • ext-swoole 4.8+ / 5.x
  • reactivex/rxphp (2.x)

Installation

composer require small/swoole-rx-events

Quick start

use Small\SwooleRxEvents\EventBus;
use Small\SwooleRxEvents\SwooleScheduler;
use Small\SwooleRxEvents\Event\BasicEvent;

// Use the Swoole async scheduler
$bus = new EventBus(new SwooleScheduler());

// Subscribe by name
$bus->on('order.created')->subscribe(function ($e) {
    echo "order rid={$e->getRid()} payload=", json_encode($e->getPayload()), PHP_EOL;
});

// Emit an event
$bus->emitName('order.created', ['id' => 123]);

// If you’re in a plain CLI script, keep the loop alive briefly:
\Swoole\Timer::after(20, fn () => \Swoole\Event::exit());
\Swoole\Event::wait();

Concepts

Event

All events must implement EventInterface:

namespace Small\SwooleRxEvents\Contract;

interface EventInterface
{

    public function getName(): string;
    public function getRid(): string;
    public function setRid(string $rid): self;

}

BasicEvent carries:

  • name (string)
  • payload (array)
  • meta (array, e.g. tracing, user)
  • rid (string, auto‐generated correlation id)
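
As an illustration of the contract above, a custom event only needs the three EventInterface methods. The sketch below is hypothetical: the class name OrderShipped, its orderId property, the event name, and the random_bytes-based rid are assumptions made for the example, not part of the library.

```php
<?php
declare(strict_types=1);

require __DIR__ . '/vendor/autoload.php'; // assumes a Composer install

use Small\SwooleRxEvents\Contract\EventInterface;

// Hypothetical domain event; only the three interface methods are required.
final class OrderShipped implements EventInterface
{
    private string $rid;

    public function __construct(public readonly int $orderId)
    {
        // Assumption: any unique string is acceptable as a correlation id.
        $this->rid = bin2hex(random_bytes(8));
    }

    public function getName(): string
    {
        return 'order.shipped';
    }

    public function getRid(): string
    {
        return $this->rid;
    }

    public function setRid(string $rid): self
    {
        $this->rid = $rid;
        return $this; // fluent, so it can be chained as in the responder example
    }
}

$event = (new OrderShipped(123))->setRid('req-1');
```

Subscribers that call getPayload() or getMeta() would need those methods on the custom class as well, since the interface shown here does not declare them.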

Bus

  • stream() — all events
  • on($name) / onMany([...]) — filtered streams
  • payloads($name) — payload‐only stream
  • once($name, ?map, ?timeoutMs) — resolve first matching event (optionally mapped)
  • request($requestName, $responseName, $payload = [], $meta = [], ?$timeoutMs)
    Emits a request with a new rid, waits for the first response with the same rid.
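
To make the stream methods above more concrete, here is a minimal sketch. It assumes that onMany() and payloads() return ordinary RxPHP observables (as on() does in the quick start) and that payloads are arrays. The event names and the $seen array are invented for the example.

```php
<?php
require __DIR__ . '/vendor/autoload.php'; // assumes a Composer install

use Small\SwooleRxEvents\EventBus;
use Small\SwooleRxEvents\SwooleScheduler;

$bus  = new EventBus(new SwooleScheduler());
$seen = [];

// onMany(): one subscription covering several event names.
$bus->onMany(['user.created', 'user.deleted'])
    ->subscribe(function ($e) use (&$seen) {
        $seen[] = $e->getName();
    });

// payloads(): work on payload arrays directly and compose Rx operators.
$bus->payloads('order.created')
    ->filter(fn (array $p) => ($p['total'] ?? 0) >= 100) // keep large orders only
    ->map(fn (array $p) => $p['id'])
    ->subscribe(function ($id) use (&$seen) {
        $seen[] = "big-order:$id";
    });

$bus->emitName('user.created', ['id' => 1]);
$bus->emitName('order.created', ['id' => 7, 'total' => 250]);
$bus->emitName('order.created', ['id' => 8, 'total' => 20]); // filtered out
$bus->emitName('user.deleted', ['id' => 1]);

// Plain CLI script: keep the loop alive briefly, as in the quick start.
\Swoole\Timer::after(20, fn () => \Swoole\Event::exit());
\Swoole\Event::wait();

print_r($seen);
```

The filtering and mapping live in the pipeline rather than in the subscriber, so each subscriber only receives values it actually cares about.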

Timeouts require an async scheduler. This library provides SwooleScheduler which implements AsyncSchedulerInterface.
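
Because time-based operators depend on the scheduler, it can help to see one wired up explicitly. The sketch below passes the same SwooleScheduler instance to the bus and to RxPHP's timeout() operator. The event name job.done and the $log array are hypothetical, and the timings are arbitrary.

```php
<?php
require __DIR__ . '/vendor/autoload.php'; // assumes a Composer install

use Small\SwooleRxEvents\EventBus;
use Small\SwooleRxEvents\SwooleScheduler;

$scheduler = new SwooleScheduler();
$bus       = new EventBus($scheduler);
$log       = [];

$bus->on('job.done')                  // hypothetical event name
    ->timeout(100, null, $scheduler)  // error if nothing arrives within 100 ms
    ->take(1)                         // stop after the first match
    ->subscribe(
        function ($e) use (&$log) { $log[] = 'done rid=' . $e->getRid(); },
        function (\Throwable $err) use (&$log) { $log[] = 'timed out'; }
    );

// Emit from a timer 30 ms later, well inside the 100 ms window.
\Swoole\Timer::after(30, fn () => $bus->emitName('job.done', ['id' => 1]));

// Plain CLI script: stop the loop once everything has had time to run.
\Swoole\Timer::after(200, fn () => \Swoole\Event::exit());
\Swoole\Event::wait();

print_r($log);
```

Removing the emitting timer should route the subscription to the error callback instead, provided the scheduler fires timers the way RxPHP expects.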

API Examples

1) Listen & emit

$bus->on('user.created')->subscribe(fn($e) => audit($e->getMeta(), $e->getPayload()));
$bus->emitName('user.created', ['id' => 42], ['by' => 'admin']);

2) Request/Response with correlation id

// Responder: copies rid from incoming 'REQ' and emits 'RESP'
$bus->on('REQ')->subscribe(function ($e) use ($bus) {
    $bus->emit(
        (new BasicEvent('RESP', ['ok' => true], $e->getMeta()))
            ->setRid($e->getRid())   // correlate
    );
});

// Caller: request() subscribes FIRST, then emits; no race conditions
$bus->request('REQ', 'RESP', ['foo' => 'bar'], ['trace' => 'abc'], 100)
    ->subscribe(
        fn($resp) => var_dump($resp->getPayload()),          // ['ok' => true]
        fn($err)  => error_log($err->getMessage())
    );

3) once() with mapping & timeout

$bus->once('health.ok', fn($e) => $e->getMeta()['node'] ?? 'unknown', 50)
    ->subscribe(
        fn($node) => print("node=$node\n"),
        fn($err)  => print("timeout\n")
    );
$bus->emitName('health.ok', [], ['node' => 'api-1']);

4) Backpressure / batching (Rx composition)

$bus->on('order.created')
    ->bufferWithTimeOrCount(500, 100, $bus->scheduler()) // every 0.5s or 100 items
    ->filter(fn($batch) => !empty($batch))
    ->subscribe(fn(array $batch) => persist_batch($batch));

Swoole integration tips

  • HTTP server: in on('request'), emit an event with meta containing a respond callable or the Response object. Downstream subscribers can produce a ResponseEvent.
  • Coroutines per subscriber: use Swoole coroutines in your subscribers if you do IO; Rx operators will orchestrate sequencing.
  • Event loop in CLI: outside a Swoole Server, start/stop the reactor with Swoole\Event::wait() / Event::exit() for timers to fire.
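
The HTTP server tip above could look roughly like the following. This is only a sketch under several assumptions: the event name http.request, the host and port, and the choice to create the bus in workerStart are illustrative, and it places the Response object in meta rather than using a dedicated ResponseEvent.

```php
<?php
require __DIR__ . '/vendor/autoload.php'; // assumes a Composer install

use Small\SwooleRxEvents\EventBus;
use Small\SwooleRxEvents\SwooleScheduler;
use Swoole\Http\Request;
use Swoole\Http\Response;
use Swoole\Http\Server;

$server = new Server('127.0.0.1', 9501);
$bus    = null;

// Create the bus per worker so its timers run on that worker's event loop.
$server->on('workerStart', function () use (&$bus) {
    $bus = new EventBus(new SwooleScheduler());

    // Subscriber: reads the Response object from meta and answers the request.
    $bus->on('http.request')->subscribe(function ($e) {
        $response = $e->getMeta()['response'];
        $response->header('Content-Type', 'application/json');
        $response->end(json_encode(['path' => $e->getPayload()['path']]));
    });
});

// Each incoming request becomes an event; the handler itself stays thin.
$server->on('request', function (Request $request, Response $response) use (&$bus) {
    $bus->emitName(
        'http.request',
        ['path' => $request->server['request_uri']],
        ['response' => $response]
    );
});

$server->start();
```

With this shape, exactly one subscriber should end each response. Fanning the same event out to several responders would need an extra guard, which is omitted here.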