gdx / p-service-bus-laravel-package
PServiceBus
Type:laravel-package
pkg:composer/gdx/p-service-bus-laravel-package
Requires
- php: >=8
- friendsofphp/proxy-manager-lts: ^1.0
- gdx/p-service-bus: ^2.13.0
- haydenpierce/class-finder: ^0.5.3
- illuminate/support: ^9.0|^10.13|^v11.25.0
- laminas/laminas-hydrator: ^4.16.0
Requires (Dev)
- doctrine/doctrine-bundle: ^2.4
- doctrine/orm: ^2.10
- laravel/framework: ^9.0|^10.0|^v11.25.0
- phpunit/phpunit: ^10.2
- roave/security-advisories: dev-latest
- vimeo/psalm: ^6.13.1
README
Laravel integration for the PServiceBus library: https://gitlab.com/GDXbsv/pservicebus
Packagist: https://packagist.org/packages/gdx/p-service-bus-laravel-package
Installation
composer require gdx/p-service-bus-laravel-package
Configuration
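Add your transports to the configuration. You can create as many transports as you want, under any names. The example refers to several transport options: SNS/SQS for AWS, Kafka for event streaming, and CompositeTransport for multi-transport scenarios. In production and staging it points to the real transports, and in every other environment it falls back to InMemoryTransport.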
'transports' => [
    'external' => in_array(env('APP_ENV'), ['production', 'staging']) ? 'app.service_bus.transport.external_composite' : InMemoryTransport::class,
    'main' => in_array(env('APP_ENV'), ['production', 'staging']) ? 'app.service_bus.transport.main' : InMemoryTransport::class,
],
'transport_groups' => [
    'example_group' => ['example_name_for_transport', 'example_name_for_transport2'],
],
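The environment switch in the configuration can be read as a small function. The sketch below is only an illustration: transportFor() and the IN_MEMORY constant are hypothetical names that are not part of the package, and IN_MEMORY stands in for InMemoryTransport::class so that the snippet runs without the library.

```php
<?php declare(strict_types=1);

/**
 * Hypothetical helper (not part of the package): returns the container
 * service id of the real transport in deployed environments and the
 * fallback transport everywhere else.
 */
function transportFor(string $env, string $serviceId, string $fallback): string
{
    return in_array($env, ['production', 'staging'], true) ? $serviceId : $fallback;
}

// Stand-in for InMemoryTransport::class so the sketch runs without the library.
const IN_MEMORY = 'InMemoryTransport';

$transports = [
    // A local environment gets the in-memory fallback.
    'external' => transportFor('local', 'app.service_bus.transport.external_composite', IN_MEMORY),
    // A production environment gets the real transport service.
    'main' => transportFor('production', 'app.service_bus.transport.main', IN_MEMORY),
];
```

The values are container service ids or class names, so the real transports still have to be registered in a service provider.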
Message Filtering by Transport
When defining external transports (like SNS/SQS or Kafka), you should filter the message maps to only include messages configured for that specific transport. This is done using the getFilteredMessageNameMapIn() and getFilteredMessageNameMapOut() methods on the build storage.
Messages are associated with transports using the #[ExternalIn('transport_name')] and #[ExternalOut('transport_name')] attributes on your message classes. The filtering ensures each transport only handles messages it's configured for.
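To make the idea of filtering concrete, here is a minimal sketch. It does not use the library: the ExternalIn and ExternalOut classes below are stand-ins declared only for this example (the real attributes ship with gdx/p-service-bus and may accept more arguments than the single transport name assumed here), and classesForTransport() and the three message classes are hypothetical. The sketch mimics the idea behind the filtered message maps, not the actual implementation.

```php
<?php declare(strict_types=1);

// Stand-in attributes so the sketch runs without the library installed.
#[Attribute(Attribute::TARGET_CLASS)]
final class ExternalIn
{
    public function __construct(public string $transportName) {}
}

#[Attribute(Attribute::TARGET_CLASS)]
final class ExternalOut
{
    public function __construct(public string $transportName) {}
}

// Hypothetical messages bound to transports by name.
#[ExternalOut('external')]
final class PhotoUploadedEvent {}

#[ExternalIn('external')]
final class UserDeletedEvent {}

#[ExternalOut('analytics')]
final class PageViewedEvent {}

/**
 * Hypothetical helper: keeps only the classes that carry $attribute
 * with the given transport name.
 *
 * @param list<class-string> $classes
 * @param class-string $attribute
 * @return list<class-string>
 */
function classesForTransport(array $classes, string $attribute, string $transport): array
{
    $matches = [];
    foreach ($classes as $class) {
        foreach ((new ReflectionClass($class))->getAttributes($attribute) as $attr) {
            if ($attr->newInstance()->transportName === $transport) {
                $matches[] = $class;
            }
        }
    }

    return $matches;
}

$all = [PhotoUploadedEvent::class, UserDeletedEvent::class, PageViewedEvent::class];
$out = classesForTransport($all, ExternalOut::class, 'external'); // outgoing messages of 'external'
$in = classesForTransport($all, ExternalIn::class, 'external');   // incoming messages of 'external'
```

PageViewedEvent is bound to a different transport name, so it ends up in neither list. This is the effect the filtered maps are meant to have for each transport.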
Define those transports in your application's service provider:
<?php declare(strict_types=1);

namespace App\Providers;

use GDXbsv\PServiceBus\Transport\Sns\SnsSqsTransport;
use GDXbsv\PServiceBus\Transport\Sqs\SqsTransport;
use GDXbsv\PServiceBus\Transport\Kafka\KafkaExternalTransport;
use GDXbsv\PServiceBus\Transport\Kafka\KafkaFactory;
use GDXbsv\PServiceBus\Transport\CompositeTransport;
use Illuminate\Contracts\Foundation\Application;
use Illuminate\Support\ServiceProvider;

class PServiceBusProvider extends ServiceProvider
{
    public function register()
    {
        // SNS/SQS Transport for external communications
        $snsDsn = env('SNS_DSN', 'sns+http://key:secret@aws/000000000000?region=eu-central-1&topic=service_bus');
        $sqsDsn = env('SQS_DSN', 'sqs+http://key:secret@aws/000000000000?region=eu-central-1');
        $this->app->singleton(SnsSqsTransport::class,
            fn(Application $app) => SnsSqsTransport::ofDsn(
                $snsDsn,
                SqsTransport::ofDsn($sqsDsn . '&retries=3&queue=editing_ext'),
                // Filter messages for 'external' transport only
                $app->make('p-service-bus.build.storage')->getFilteredMessageNameMapIn('external'),
            )
        );

        // Kafka External Transport
        $kafkaExtDsn = env('SB_KAFKA_EXT_DSN', 'kafka://localhost:9093/service_ext?partitions.amount=10&replication.factor=1');
        $this->app->singleton(KafkaExternalTransport::class,
            fn(Application $app) => KafkaFactory::createExternalFromDsn(
                $kafkaExtDsn,
                // Filter messages for 'external' transport only
                $app->make('p-service-bus.build.storage')->getFilteredMessageNameMapOut('external'),
                $app->make('p-service-bus.build.storage')->getFilteredMessageNameMapIn('external'),
            )
        );

        // Internal SQS Transport
        $this->app->singleton('app.service_bus.transport.main',
            fn() => SqsTransport::ofDsn($sqsDsn . '&retries=3&queue=editing_main')
        );

        // Composite Transport combining SNS/SQS and Kafka
        $this->app->singleton('app.service_bus.transport.external_composite',
            fn(Application $app) => new CompositeTransport([
                $app->make(SnsSqsTransport::class),
                $app->make(KafkaExternalTransport::class),
            ])
        );
    }
}
Environment Configuration Example
For production with Kafka using AWS IAM authentication:
SNS_DSN=sns+http://key:secret@aws/000000000000?region=eu-central-1&topic=service_bus
SQS_DSN=sqs+http://key:secret@aws/000000000000?region=eu-central-1
SB_KAFKA_EXT_DSN=kafka+ssl://BROKERS_IAM/photo-service_ext?partitions.amount=10&replication.factor=3&ssl.ca.location=/etc/ssl/certs/ca-certificates.crt&aws.iam.auth=true&aws.region=eu-central-1
For local development:
SNS_DSN=sns+http://key:secret@localhost/000000000000?region=eu-central-1&topic=service_bus
SQS_DSN=sqs+http://key:secret@localhost:9324?region=eu-central-1
SB_KAFKA_EXT_DSN=kafka://kafka:9093/photo-service_ext?partitions.amount=10&replication.factor=1
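Note that the provider appends '&retries=3&queue=...' to SQS_DSN, which only produces a well-formed DSN when the configured value already contains a query string (as in both examples above). A minimal sketch of a safer way to append parameters follows. withDsnParams() is a hypothetical helper, not part of the package, and the parse_url()/parse_str() calls only show the URL parts: how the library itself interprets a DSN is not covered here.

```php
<?php declare(strict_types=1);

/**
 * Hypothetical helper (not part of the package): appends query parameters
 * to a DSN, using "?" or "&" depending on whether a query string exists.
 *
 * @param array<string, scalar> $params
 */
function withDsnParams(string $dsn, array $params): string
{
    $separator = str_contains($dsn, '?') ? '&' : '?';

    return $dsn . $separator . http_build_query($params, '', '&');
}

$dsn = withDsnParams(
    'sqs+http://key:secret@aws/000000000000?region=eu-central-1',
    ['retries' => 3, 'queue' => 'editing_ext'],
);

// Split the DSN into its URL parts for inspection.
$parts = parse_url($dsn);
parse_str($parts['query'], $query);
```

Be careful when inspecting the Kafka DSNs this way: parse_str() replaces dots in parameter names with underscores, so partitions.amount would show up as partitions_amount.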
Saga Eloquent
Create a saga.
The main difference is that the saga has to extend GDXbsv\PServiceBusLaravel\Saga\SagaEloquent and implement getEloquentModelClass():
<?php declare(strict_types=1);

namespace App\Saga;

use GDXbsv\PServiceBus\Bus\Handling\Handle;
use GDXbsv\PServiceBus\Id;
use GDXbsv\PServiceBus\Message\TimeSpan;
use GDXbsv\PServiceBus\Saga\MessageSagaContext;
use GDXbsv\PServiceBus\Saga\SagaContext;
use GDXbsv\PServiceBus\Saga\SagaCreateMapper;
use GDXbsv\PServiceBus\Saga\SagaPropertyMapper;
use GDXbsv\PServiceBusLaravel\Saga\SagaEloquent;

/**
 * @final
 */
final class TestSaga extends SagaEloquent
{
    private Id $id;
    public string $string;
    public ?string $value = null;

    // Tells the package which Eloquent model stores this saga.
    public static function getEloquentModelClass(): string
    {
        return TestSagaModel::class;
    }

    /**
     * @param Id<static> $id
     */
    private function __construct(Id $id, string $string)
    {
        $this->id = $id;
        $this->string = $string;
    }

    public static function configureHowToCreateSaga(SagaCreateMapper $mapper): void
    {
        $mapper
            ->toMessage(
                // Do not forget to also add a handler method for this message:
                // it is used when the saga already exists and tells the saga
                // that it expects this message.
                function (TestSagaCreateCommand $command, MessageSagaContext $context) {
                    return new self(new Id($command->id), $command->string);
                }
            );
    }

    public static function configureHowToFindSaga(SagaPropertyMapper $mapper): void
    {
        $mapper
            ->mapSaga(new \ReflectionProperty(TestSaga::class, 'id'))
            ->toMessage(
                function (TestSagaCommand $command, MessageSagaContext $context) {
                    return new Id($command->id);
                }
            );
    }

    /** We have to tell the saga that it expects this message. */
    #[Handle('main', 3)]
    public function testSagaCreateCommand(TestSagaCreateCommand $command, SagaContext $context)
    {
        $this->string = $command->string;
        $context->publish(new TestsSagaOutputEvent('testHandlerFunction'));
    }
}
Then create the Eloquent model returned by getEloquentModelClass():
<?php declare(strict_types=1);

namespace App\Saga;

use Illuminate\Database\Eloquent\Model;

class TestSagaModel extends Model
{
    protected $table = 'saga';
    protected $primaryKey = 'id';
    public $incrementing = false;
    protected $guarded = [];
}
After this, use the saga as usual.
Commands for Laravel
p-service-bus:cache:clear - clears the cache and searches for attributes again. Only needed when debug=false.
p-service-bus:saga:eloquent:outbox:recover-messages - sends messages left in the outbox if something went wrong. Run it periodically.
p-service-bus:saga:eloquent:only-once:clean {days=30} - cleans up old messages from the only-once control. Run it once a day.
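One possible way to run the two recurring commands is the Laravel scheduler. The fragment below is a sketch for the app/Console/Kernel.php layout used by Laravel 9 and 10. The five-minute interval and the withoutOverlapping() guard are assumptions, since this README only says "periodically", and the value 30 simply repeats the default of the days argument.

```php
<?php declare(strict_types=1);

namespace App\Console;

use Illuminate\Console\Scheduling\Schedule;
use Illuminate\Foundation\Console\Kernel as ConsoleKernel;

class Kernel extends ConsoleKernel
{
    protected function schedule(Schedule $schedule): void
    {
        // Assumed interval: resend messages that are stuck in the outbox.
        $schedule->command('p-service-bus:saga:eloquent:outbox:recover-messages')
            ->everyFiveMinutes()
            ->withoutOverlapping();

        // Once a day: clean up only-once records older than 30 days.
        $schedule->command('p-service-bus:saga:eloquent:only-once:clean 30')
            ->daily();
    }
}
```

In a Laravel 11 application without a console kernel, the same two commands can be registered in routes/console.php through the Schedule facade.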