rxnet / eventstore-client
EventStore asynchronous PHP client with reactiveX flavours
Requires
- php: ^7.2
- ext-json: *
- google/protobuf: ^3.2
- ramsey/uuid: ^3.5
- reactivex/rxphp: ^2.0
- rxnet/socket: ^0.2.0
- trafficcophp/bytebuffer: ^0.3
- voryx/event-loop: ^3.0 || ^2.0
- zendframework/zend-stdlib: ^3.2
Requires (Dev)
- friendsofphp/php-cs-fixer: ^2.14
- phpstan/phpstan: ^0.11.1
- protobuf-php/protobuf-plugin: ^0.1.3
README
Asynchronous client for the EventStore TCP API
Usage
Connect
<?php
$eventStore = new \Rxnet\EventStore\EventStore();
// Default value
$eventStore->connect('tcp://admin:changeit@localhost:1113');

$eventStore = new \Rxnet\EventStore\EventStore();
// Lazy way to connect
$eventStore->connect()
    ->subscribe(function () {
        echo "connected";
    });
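The default connection string follows the usual URL layout (scheme, credentials, host, port). As an illustration only, the sketch below splits it with PHP's built-in `parse_url()`; this is not the client's own parsing code, which the README does not show.

```php
<?php
// Break the default connection string into its parts with parse_url().
// This only illustrates the URL layout; it is not how the client parses it.
$dsn = 'tcp://admin:changeit@localhost:1113';
$parts = parse_url($dsn);

printf(
    "scheme=%s host=%s port=%d user=%s pass=%s\n",
    $parts['scheme'],
    $parts['host'],
    $parts['port'],
    $parts['user'],
    $parts['pass']
);
```

To target another server or account, change the matching part of the string before passing it to `connect()`.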
Write
You can write as many events as you want (max 2000)
<?php
$eventA = new \Rxnet\EventStore\NewEvent\JsonEvent('event_type1', ['data' => 'a'], ['worker' => 'metadata']);
$eventB = new \Rxnet\EventStore\RawEvent('event_type2', 'raw data', 'raw metadata');

$eventStore->write('category-test_stream_id', [$eventA, $eventB])
    ->subscribe(function (\Rxnet\EventStore\Data\WriteEventsCompleted $eventsCompleted) {
        echo "Last event number {$eventsCompleted->getLastEventNumber()} on commit position {$eventsCompleted->getCommitPosition()} \n";
    });
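When there are more events than the stated limit of 2000, one option is to split them into batches before writing. The sketch below uses a hypothetical helper, `chunkEvents()`, which is not part of the client, and plain integers as stand-ins for event objects.

```php
<?php
// Hypothetical helper (not part of the client): split events into batches
// that respect the 2000-events-per-write limit mentioned in the README.
function chunkEvents(array $events, int $maxPerWrite = 2000): array
{
    if ($maxPerWrite < 1) {
        throw new InvalidArgumentException('Batch size must be at least 1');
    }
    return array_chunk($events, $maxPerWrite);
}

// Stand-in payloads; in real code these would be JsonEvent / RawEvent objects.
$events = range(1, 4500);
$batches = chunkEvents($events);

foreach ($batches as $i => $batch) {
    printf("batch %d: %d events\n", $i, count($batch));
    // Each batch could then be passed to $eventStore->write($streamId, $batch).
}
```

Each batch would be its own write, so this sketch does not make the whole set atomic.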
Transaction
<?php
use Rxnet\EventStore\NewEvent\JsonEvent;
// Assumed namespace, alongside WriteEventsCompleted
use Rxnet\EventStore\Data\TransactionCommitCompleted;

$eventStore->startTransaction('category-test_stream')
    ->subscribe(
        function (\Rxnet\EventStore\Transaction $transaction) {
            $eventA = new JsonEvent('event_type', ['i' => "data"]);
            $eventB = new JsonEvent('event_type', ['i' => "data"]);
            // You can write as many as you want
            return $transaction->write([$eventA, $eventB])
                // Commit to make it work
                ->flatMap([$transaction, 'commit'])
                ->subscribe(
                    function (TransactionCommitCompleted $commitCompleted) {
                        echo "Transaction {$commitCompleted->getTransactionId()} commit completed : events from {$commitCompleted->getFirstEventNumber()} to {$commitCompleted->getLastEventNumber()} \n";
                    }
                );
        }
    );
Subscription
Connect to a persistent subscription (for example on the $ce-category projection stream) as group my-group, then acknowledge (ack) or reject (nack) each event
<?php
$eventStore->persistentSubscription('projection-name', 'my-group')
    ->subscribe(function (\Rxnet\EventStore\AcknowledgeableEventRecord $event) {
        echo "received {$event->getId()} event {$event->getType()} ({$event->getNumber()}) with id {$event->getId()} on {$event->getStreamId()} \n";
        if ($event->getNumber() % 2) {
            $event->ack();
        } else {
            $event->nack($event::NACK_ACTION_RETRY, 'Explain why');
        }
    });
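In practice the ack/nack decision usually depends on whether processing succeeded rather than on the event number. The sketch below shows one possible pattern: ack on success, nack with a retry when the handler throws. `FakeAckEvent` and `handleWithAck()` are hypothetical stand-ins so the example runs without a server; only `ack()`, `nack()` and the `NACK_ACTION_RETRY` name come from the example above, and the constant's value here is a placeholder.

```php
<?php
// Hypothetical stand-in for AcknowledgeableEventRecord so the sketch runs
// without a server; only ack(), nack() and NACK_ACTION_RETRY mirror the README.
final class FakeAckEvent
{
    const NACK_ACTION_RETRY = 'retry'; // placeholder value, not the client's real constant
    public $outcome = null;
    private $number;

    public function __construct(int $number) { $this->number = $number; }
    public function getNumber(): int { return $this->number; }
    public function ack(): void { $this->outcome = 'ack'; }
    public function nack($action, string $reason): void { $this->outcome = "nack:$action:$reason"; }
}

// Hypothetical helper: run the handler, ack on success,
// and ask for a retry when the handler throws.
function handleWithAck($event, callable $handler): void
{
    try {
        $handler($event);
        $event->ack();
    } catch (Throwable $e) {
        $event->nack($event::NACK_ACTION_RETRY, $e->getMessage());
    }
}

$ok = new FakeAckEvent(1);
handleWithAck($ok, function ($event) { /* processing succeeded */ });

$failed = new FakeAckEvent(2);
handleWithAck($failed, function ($event) { throw new RuntimeException('projection unavailable'); });

echo $ok->outcome, "\n", $failed->outcome, "\n";
```

With the real client, the same helper could be called from inside the `subscribe` callback, passing the received event and the processing function.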
Watch a given stream for new events.
The subscribe callback is called each time a new event appears
<?php
$eventStore->volatileSubscription('category-test_stream_id')
    ->subscribe(function (\Rxnet\EventStore\EventRecord $event) {
        echo "received {$event->getId()} event {$event->getType()} ({$event->getNumber()}) with id {$event->getId()} on {$event->getStreamId()} \n";
    });
Read all events from position 100; once everything has been read, watch for new events (like a volatile subscription)
<?php
$eventStore->catchUpSubscription('category-test_stream_id', 100)
    ->subscribe(function (\Rxnet\EventStore\EventRecord $event) {
        echo "received {$event->getId()} event {$event->getType()} ({$event->getNumber()}) with id {$event->getId()} on {$event->getStreamId()} \n";
    });
Read
Read forward from event 0 on stream category-test_stream_id, up to 100 events, then complete
<?php
$eventStore->readEventsForward('category-test_stream_id', 0, 100)
    ->subscribe(function (\Rxnet\EventStore\EventRecord $event) {
        echo "received {$event->getId()} event {$event->getType()} ({$event->getNumber()}) with id {$event->getId()} on {$event->getStreamId()} \n";
    });
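A longer stream can be read in several forward calls. The sketch below computes the (start, count) pairs for such paging, assuming the second argument of `readEventsForward` is the starting event number and the third is a count (as the backward example with 100 and 10 suggests). `forwardPages()` is a hypothetical helper, not part of the client, and it assumes the total number of events is known in advance.

```php
<?php
// Hypothetical helper (not part of the client): yield [start, count] pairs
// that cover $total events in pages of at most $pageSize.
function forwardPages(int $total, int $pageSize): Generator
{
    if ($pageSize < 1) {
        throw new InvalidArgumentException('Page size must be at least 1');
    }
    for ($start = 0; $start < $total; $start += $pageSize) {
        yield [$start, min($pageSize, $total - $start)];
    }
}

$pages = iterator_to_array(forwardPages(250, 100), false);
foreach ($pages as [$start, $count]) {
    printf("read %d events starting at %d\n", $count, $start);
    // e.g. $eventStore->readEventsForward('category-test_stream_id', $start, $count)
}
```

When the stream length is unknown, a catch-up subscription as shown earlier may be a simpler fit than manual paging.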
Read backward (latest to oldest) from event 100 to event 90 on stream category-test_stream_id then end
<?php
$eventStore->readEventsBackWard('category-test_stream_id', 100, 10)
    ->subscribe(function (\Rxnet\EventStore\EventRecord $event) {
        echo "received {$event->getId()} event {$event->getType()} ({$event->getNumber()}) with id {$event->getId()} on {$event->getStreamId()} \n";
    });
Read the details of the first event (number 0) from category-test_stream_id
<?php
$eventStore->readEvent('category-test_stream_id', 0)
    ->subscribe(function (\Rxnet\EventStore\EventRecord $event) {
        echo "received {$event->getId()} event {$event->getType()} ({$event->getNumber()}) with id {$event->getId()} on {$event->getStreamId()} \n";
    });
Contribute
TODO
- Append event to stream
- Read given stream
- Subscribe to given stream
- Read a huge stream
- Persistent subscription
- Connect to cluster
- Auto re-connect to master if needed
- Reconnect when disconnected from remote
- Transactions
- TLS connect
- Write some specs
- create / update / delete persistent subscription
- create / update / delete projection
- delete stream
Protocol buffer
If ClientMessageDtos.proto is modified, you must regenerate the Data PHP classes
./vendor/bin/protobuf --include-descriptors -i . -o ./src ./ClientMessageDtos.proto