thesis/pgmq

A non-blocking PHP client for Postgres Message Queue (PGMQ).

0.1.4 2025-12-21 14:40 UTC

Non-blocking PHP client for pgmq. See the extension installation guide.

Installation

composer require thesis/pgmq

Why is almost the entire API functional?

A database-backed queue is usually expected to provide exactly-once semantics, so every request, whether it sends a message or runs business logic and then acknowledges a message, must be transactional. A transaction object is short-lived: it cannot be used after rollback() or commit(), so it cannot be injected as a long-lived dependency. For that reason the API is built on functions that take Amp\Postgres\PostgresLink as their first parameter, which can be either a transaction object or a plain connection. Only the consumer accepts Amp\Postgres\PostgresConnection, because it opens its own transactions to read and acknowledge messages.
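
The idea can be sketched as follows: a business write and a queue write share one transaction, so either both are committed or neither is. This is an illustration under assumptions, not part of the library: the `orders` table, the `placeOrder` function and the payload shape are hypothetical, and it assumes that a queue created from a transaction link sends through that same link.

```php
<?php

declare(strict_types=1);

use Amp\Postgres;
use Thesis\Pgmq;

/**
 * Stores an order and publishes an event about it in one transaction.
 * The `orders` table and the payload shape are hypothetical.
 */
function placeOrder(Postgres\PostgresConnectionPool $pg, int $orderId): void
{
    $tx = $pg->beginTransaction();

    try {
        // The business write goes through the transaction...
        $tx->execute('insert into orders (id) values (?)', [$orderId]);

        // ...and so does the queue write. Assumption: the queue object
        // created from $tx keeps using $tx for its requests.
        $queue = Pgmq\createQueue($tx, 'events');
        $queue->send(new Pgmq\SendMessage(
            json_encode(['order_id' => $orderId], JSON_THROW_ON_ERROR),
            '{"x-header": "x-value"}',
        ));

        $tx->commit();
    } catch (\Throwable $e) {
        // Roll back only if the transaction has not been finished yet.
        if ($tx->isActive()) {
            $tx->rollback();
        }

        throw $e;
    }
}
```

After commit() or rollback() the $tx object is no longer usable, which is why it is created inside the function rather than passed in as a dependency.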

Create queue

use Thesis\Pgmq;
use Amp\Postgres;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

$queue = Pgmq\createQueue($pg, 'events');

Create unlogged queue

use Thesis\Pgmq;
use Amp\Postgres;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

$queue = Pgmq\createUnloggedQueue($pg, 'events');

Create partitioned queue

use Thesis\Pgmq;
use Amp\Postgres;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

$queue = Pgmq\createPartitionedQueue(
    pg: $pg,
    queue: 'events',
    partitionInterval: 10000,
    retentionInterval: 100000,
);

List queues

use Thesis\Pgmq;
use Amp\Postgres;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

foreach (Pgmq\listQueues($pg) as $queue) {
    $md = $queue->metadata();
    var_dump($md);
}

List queue metrics

use Thesis\Pgmq;
use Amp\Postgres;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

foreach (Pgmq\metrics($pg) as $metrics) {
    var_dump($metrics);
}

List queue metadata

use Thesis\Pgmq;
use Amp\Postgres;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

foreach (Pgmq\listQueueMetadata($pg) as $md) {
    var_dump($md);
}

Drop queue

use Thesis\Pgmq;
use Amp\Postgres;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

$queue = Pgmq\createQueue($pg, 'events');
$queue->drop();

Purge queue

use Thesis\Pgmq;
use Amp\Postgres;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

$queue = Pgmq\createQueue($pg, 'events');
var_dump($queue->purge());

Send message

use Thesis\Pgmq;
use Amp\Postgres;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

$queue = Pgmq\createQueue($pg, 'events');
$messageId = $queue->send(new Pgmq\SendMessage('{"id": 1}', '{"x-header": "x-value"}'));

Send message with relative delay

use Thesis\Pgmq;
use Amp\Postgres;
use Thesis\Time\TimeSpan;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

$queue = Pgmq\createQueue($pg, 'events');
$messageId = $queue->send(
    new Pgmq\SendMessage('{"id": 1}', '{"x-header": "x-value"}'),
    TimeSpan::fromSeconds(5),
);

Send message with absolute delay

use Thesis\Pgmq;
use Amp\Postgres;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

$queue = Pgmq\createQueue($pg, 'events');
$messageId = $queue->send(
    new Pgmq\SendMessage('{"id": 1}', '{"x-header": "x-value"}'),
    new \DateTimeImmutable('+5 seconds'),
);

Send batch

use Thesis\Pgmq;
use Amp\Postgres;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

$queue = Pgmq\createQueue($pg, 'events');
$messageIds = $queue->sendBatch([
    new Pgmq\SendMessage('{"id": 1}', '{"x-header": "x-value"}'),
    new Pgmq\SendMessage('{"id": 2}', '{"x-header": "x-value"}'),
]);

Send batch with relative delay

use Thesis\Pgmq;
use Amp\Postgres;
use Thesis\Time\TimeSpan;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

$queue = Pgmq\createQueue($pg, 'events');
$messageIds = $queue->sendBatch(
    [
        new Pgmq\SendMessage('{"id": 1}', '{"x-header": "x-value"}'),
        new Pgmq\SendMessage('{"id": 2}', '{"x-header": "x-value"}'),
    ],
    TimeSpan::fromSeconds(5),
);

Send batch with absolute delay

use Thesis\Pgmq;
use Amp\Postgres;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

$queue = Pgmq\createQueue($pg, 'events');
$messageIds = $queue->sendBatch(
    [
        new Pgmq\SendMessage('{"id": 1}', '{"x-header": "x-value"}'),
        new Pgmq\SendMessage('{"id": 2}', '{"x-header": "x-value"}'),
    ],
    new \DateTimeImmutable('+5 seconds'),
);

Read message

use Thesis\Pgmq;
use Amp\Postgres;
use Thesis\Time\TimeSpan;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

$queue = Pgmq\createQueue($pg, 'events');
$message = $queue->read(TimeSpan::fromSeconds(20));

Read batch

use Thesis\Pgmq;
use Amp\Postgres;
use Thesis\Time\TimeSpan;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

$queue = Pgmq\createQueue($pg, 'events');
$messages = $queue->readBatch(10, TimeSpan::fromSeconds(20));

Pop message

use Thesis\Pgmq;
use Amp\Postgres;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

$queue = Pgmq\createQueue($pg, 'events');
$message = $queue->pop();

Read batch with poll

use Thesis\Pgmq;
use Amp\Postgres;
use Thesis\Time\TimeSpan;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

$queue = Pgmq\createQueue($pg, 'events');
$messages = $queue->readPoll(
    batch: 10,
    maxPoll: TimeSpan::fromSeconds(5),
    pollInterval: TimeSpan::fromMilliseconds(250),
);

Set visibility timeout

use Thesis\Pgmq;
use Amp\Postgres;
use Thesis\Time\TimeSpan;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

$queue = Pgmq\createQueue($pg, 'events');
$message = $queue->read();

if ($message !== null) {
    // handle the message

    $queue->setVisibilityTimeout($message->id, TimeSpan::fromSeconds(10));
}

Archive message

use Thesis\Pgmq;
use Amp\Postgres;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

$queue = Pgmq\createQueue($pg, 'events');
$message = $queue->read();

if ($message !== null) {
    $queue->archive($message->id);
}

Archive batch

use Thesis\Pgmq;
use Amp\Postgres;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

$queue = Pgmq\createQueue($pg, 'events');
$messages = [...$queue->readBatch(5)];

if ($messages !== []) {
    $queue->archiveBatch(array_map(
        static fn(Pgmq\Message $message): int => $message->id,
        $messages,
    ));
}

Delete message

use Thesis\Pgmq;
use Amp\Postgres;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

$queue = Pgmq\createQueue($pg, 'events');
$message = $queue->read();

if ($message !== null) {
    $queue->delete($message->id);
}

Delete batch

use Thesis\Pgmq;
use Amp\Postgres;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

$queue = Pgmq\createQueue($pg, 'events');
$messages = [...$queue->readBatch(5)];

if ($messages !== []) {
    $queue->deleteBatch(array_map(
        static fn(Pgmq\Message $message): int => $message->id,
        $messages,
    ));
}

Enable notify insert

use Thesis\Pgmq;
use Amp\Postgres;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

$queue = Pgmq\createQueue($pg, 'events');
$channel = $queue->enableNotifyInsert(); // postgres channel to listen is returned
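
The returned channel can then be passed to the LISTEN support of amphp/postgres. The sketch below reads a batch each time an insert notification arrives. It assumes that enableNotifyInsert() returns the channel name as a string; the `readOnInsert` function name, the batch size and the visibility timeout are illustrative choices.

```php
<?php

declare(strict_types=1);

use Amp\Postgres;
use Thesis\Pgmq;
use Thesis\Time\TimeSpan;

/**
 * Reads and deletes messages whenever Postgres reports an insert.
 */
function readOnInsert(Postgres\PostgresConnectionPool $pg, string $queueName): void
{
    $queue = Pgmq\createQueue($pg, $queueName);

    // Assumption: the channel is returned as a plain string.
    $channel = $queue->enableNotifyInsert();

    // listen() yields one notification per NOTIFY on the channel.
    foreach ($pg->listen($channel) as $notification) {
        foreach ($queue->readBatch(10, TimeSpan::fromSeconds(20)) as $message) {
            // handle the message, then remove it from the queue
            $queue->delete($message->id);
        }
    }
}
```

This loop reads and deletes outside a transaction, so it does not give the exactly-once behavior of the consumer described under "Consume messages".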

Disable notify insert

use Thesis\Pgmq;
use Amp\Postgres;

$pg = new Postgres\PostgresConnectionPool(Postgres\PostgresConfig::fromString(''));

$queue = Pgmq\createQueue($pg, 'events');
$queue->disableNotifyInsert();

Consume messages

This functionality is not a standard feature of the pgmq extension. The library provides it as an add-on for reliable processing of message batches, with the ability to ack, nack (with a delay) and archive (term) messages.

  1. First, create the extension if it doesn't exist yet:
use Thesis\Pgmq;

Pgmq\createExtension($pg);
  2. Then create a queue:
use Thesis\Pgmq;

Pgmq\createExtension($pg);
Pgmq\createQueue($pg, 'events');
  3. Next, create the consumer object:
use Thesis\Pgmq;

Pgmq\createExtension($pg);
Pgmq\createQueue($pg, 'events');

$consumer = Pgmq\createConsumer($pg);
  4. Now configure the queue consumer handler:
use Thesis\Pgmq;

Pgmq\createExtension($pg);
Pgmq\createQueue($pg, 'events');

$consumer = Pgmq\createConsumer($pg);

$context = $consumer->consume(
    static function (array $messages, Pgmq\ConsumeController $ctrl): void {
        var_dump($messages);
        $ctrl->ack($messages);
    },
    new Pgmq\ConsumeConfig(
        queue: 'events',
    ),
);

Through Pgmq\ConsumeConfig you can configure:

  • the batch size of received messages;
  • the message visibility timeout;
  • monitoring of queue inserts via the LISTEN/NOTIFY mechanism;
  • the polling interval.

At least one of the two settings, listenForInserts or pollTimeout, must be specified.
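
A config that sets both of these could look like the sketch below. Only the parameter names queue, listenForInserts and pollTimeout come from this README. The value types (a bool and a TimeSpan) and the `eventsConsumeConfig` helper are assumptions, so check the Pgmq\ConsumeConfig constructor before relying on them.

```php
<?php

declare(strict_types=1);

use Thesis\Pgmq;
use Thesis\Time\TimeSpan;

/**
 * Builds a consume config that reacts to inserts and also polls.
 */
function eventsConsumeConfig(string $queue): Pgmq\ConsumeConfig
{
    return new Pgmq\ConsumeConfig(
        queue: $queue,
        // Assumed to be a bool: wake up on LISTEN/NOTIFY inserts.
        listenForInserts: true,
        // Assumed to be a TimeSpan: poll even when no notification arrives.
        pollTimeout: TimeSpan::fromSeconds(1),
    );
}
```

The result would be passed as the second argument of $consumer->consume(), in place of the inline new Pgmq\ConsumeConfig(queue: 'events').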

Through the Pgmq\ConsumeController, you can:

  • ack messages, causing them to be deleted from the queue;
  • nack messages with a delay, setting a visibility timeout for them;
  • terminate processing (when a message can no longer be retried), resulting in them being archived;
  • stop the consumer.

Since receiving messages and acking/nacking them occur within the same transaction, for your own database queries you must use the ConsumeController::$tx object to ensure exactly-once semantics for message processing.

use Thesis\Pgmq;

Pgmq\createExtension($pg);
Pgmq\createQueue($pg, 'events');

$consumer = Pgmq\createConsumer($pg);

$context = $consumer->consume(
    static function (array $messages, Pgmq\ConsumeController $ctrl): void {
        $ctrl->tx->execute('...some business logic');
        $ctrl->ack($messages);
    },
    new Pgmq\ConsumeConfig(
        queue: 'events',
    ),
);

Using ConsumeContext, you can gracefully stop the consumer, waiting for the current batch to finish processing.

use Thesis\Pgmq;
use function Amp\trapSignal;

Pgmq\createExtension($pg);
Pgmq\createQueue($pg, 'events');

$consumer = Pgmq\createConsumer($pg);

$context = $consumer->consume(
    static function (array $messages, Pgmq\ConsumeController $ctrl): void {
        $ctrl->tx->execute('...some business logic');
        $ctrl->ack($messages);
    },
    new Pgmq\ConsumeConfig(
        queue: 'events',
    ),
);

trapSignal([\SIGINT, \SIGTERM]); // suspends until one of the signals arrives

$context->stop();
$context->awaitCompletion();

Or stop all current consumers using $consumer->stop():

use Thesis\Pgmq;
use function Amp\trapSignal;

Pgmq\createExtension($pg);
Pgmq\createQueue($pg, 'events');

$consumer = Pgmq\createConsumer($pg);

$context = $consumer->consume(...);

trapSignal([\SIGINT, \SIGTERM]); // suspends until one of the signals arrives

$consumer->stop();
$context->awaitCompletion();

License

The MIT License (MIT). Please see License File for more information.