ano / barbeq
Abstraction of some Message Queuing system using Adapter pattern
dev-master
2013-04-03 15:59 UTC
Requires
- php: >=5.3.3
- symfony/event-dispatcher: >=2.0
- symfony/options-resolver: >=2.0
Requires (Dev)
- alchemy/rabbitmq-management-client: dev-master
- phpunit/phpunit: 3.*
- videlalvaro/php-amqplib: 2.0.*
This package is not auto-updated.
Last update: 2024-11-09 14:14:27 UTC
README
Abstract some Message Queuing system using the Adapter pattern
Work In Progress
This is a work in progress and unfinished business!
Usage example
producer.php
<?php require_once 'autoload.php'; use Symfony\Component\EventDispatcher\EventDispatcher; use BarbeQ\BarbeQ; use BarbeQ\Adapter\AmqpAdapter; use BarbeQ\Model\Message; $messageDispatcher = new EventDispatcher(); $dispatcher = new EventDispatcher(); // AMQP $connection = array('host' => 'localhost'); $exchange = array('name' => 'test_direct'); $queues = array(array('name' => 'test')); $adapter = new AmqpAdapter($connection, $exchange, $queues); // PDO //$pdo = new \PDO('mysql:dbname=barbeq', 'root', ''); //$adapter = new \BarbeQ\Adapter\PdoAdapter($pdo, array( // 'table' => 'queuing', //)); $barbeQ = new BarbeQ($adapter, $messageDispatcher, $dispatcher); $barbeQ->cook('test', new Message(array( 'id' => 1, 'foo' => 'bar', ))); // or $barbeQ->publish(...), same action
consumer.php
<?php require_once 'autoload.php'; use Symfony\Component\EventDispatcher\EventDispatcher; use BarbeQ\Adapter\AmqpAdapter; use BarbeQ\BarbeQ; use Acme\PocBundle\Consumer\TestConsumer; $messageDispatcher = new EventDispatcher(); $dispatcher = new EventDispatcher(); // AMQP $connection = array('host' => 'localhost'); $exchange = array('name' => 'test_direct'); $queues = array(array('name' => 'test')); $adapter = new AmqpAdapter($connection, $exchange, $queues); // PDO //$pdo = new \PDO('mysql:dbname=barbeq', 'root', ''); //$adapter = new \BarbeQ\Adapter\PdoAdapter($pdo, array( // 'table' => 'queuing', //)); $barbeQ = new BarbeQ($adapter, $messageDispatcher, $dispatcher); $testConsumer = new TestConsumer(); $barbeQ->addConsumer('test', $testConsumer); // Trace what's happening $barbeQ->addListener('barbeq.pre_consume', function(ConsumeEvent $event) { echo sprintf("Start consuming message #%d\n", $event->getMessage()->getMetadataValue('index')); }); $barbeQ->addListener('barbeq.post_consume', function(ConsumeEvent $event) { echo sprintf("Memory: %s, Time: %0.04fs\n", $event->getMessage()->getMemory(), $event->getMessage()->getTime()); }); $barbeQ->eat('test', 5); // or $barbeQ->consume(...), same action
Testing
$ php composer.phar update --dev $ phpunit
License
This bundle is under the MIT license. See the complete license in the bundle:
LICENSE