jerome / matrix
An unparalleled PHP asynchronous experience, offering genuine concurrency and fiber-based task management.
Fund package maintenance!
thavarshan
Buy Me A Coffee
Installs: 4 587
Dependents: 2
Suggesters: 0
Security: 0
Stars: 50
Watchers: 4
Forks: 2
Open Issues: 1
Requires
- php: ^8.2
- ext-pcntl: *
- ext-posix: *
- react/event-loop: ^1.5
- react/promise: ^3.2
Requires (Dev)
- friendsofphp/php-cs-fixer: ^3.65
- guzzlehttp/guzzle: ^7.0
- laravel/pint: ^1.18
- mockery/mockery: ^1.6
- php-mock/php-mock: ^2.5
- php-mock/php-mock-mockery: ^1.4
- phpstan/phpstan: ^1.11.5
- phpunit/phpunit: ^11.5
- squizlabs/php_codesniffer: ^3.11
- symfony/var-dumper: ^7.2
- tightenco/duster: ^3.2
README
Matrix
Matrix is a PHP library that brings event-driven, asynchronous programming to PHP, inspired by JavaScript's async
/await
syntax. Built on top of ReactPHP's event loop, Matrix makes it easier to write non-blocking I/O operations and manage concurrency with a simple, intuitive API.
Understanding Async in PHP
Important: PHP runs in a single-threaded environment. Matrix doesn't create true parallelism but enables event-driven, non-blocking I/O operations through ReactPHP's event loop. This means:
- ✅ Non-blocking I/O: Network requests, file operations, and timers don't block execution
- ✅ Concurrent operations: Multiple I/O operations can run simultaneously
- ❌ CPU-bound tasks: Heavy computations will still block the event loop
- ❌ True parallelism: No multiple threads or processes
Matrix shines when dealing with I/O-heavy applications like API clients, web scrapers, or microservices.
Why Choose Matrix?
Matrix simplifies ReactPHP development by providing a familiar async/await syntax while maintaining full compatibility with ReactPHP's ecosystem. It handles the complexity of promise management and event loop integration behind a clean, intuitive API.
Key Features
- JavaScript-like API: Use
async()
andawait()
for straightforward asynchronous programming - Powered by ReactPHP: Built on ReactPHP's battle-tested event loop for true non-blocking I/O
- Robust Error Handling: Catch and handle exceptions with
.catch()
ortry-catch
- Automatic Loop Management: The event loop runs automatically to handle promise resolution
- Concurrent Operations: Run multiple I/O operations simultaneously
- Rate Limiting: Control the frequency of asynchronous operations
- Promise Cancellation: Cancel pending operations when they're no longer needed
- Retry Mechanism: Automatically retry failed operations with configurable backoff strategies
- Batch Processing: Process items in batches for improved performance
- Enhanced Error Handling: Add context to errors for better debugging
Installation
Install Matrix via Composer:
composer require jerome/matrix
Requirements
- PHP 8.0 or higher
sockets
extension enabled
ReactPHP promises and the event loop will be installed automatically via Composer.
API Overview
Core Functions
async(callable $callable): PromiseInterface
Wraps a callable into an asynchronous function that returns a promise.
$func = async(fn () => 'Success'); $func->then(fn ($value) => echo $value) // Outputs: Success ->catch(fn ($e) => echo 'Error: ' . $e->getMessage());
await(PromiseInterface $promise, ?float $timeout = null): mixed
Awaits the resolution of a promise and returns its value. Optionally accepts a timeout in seconds.
try { $result = await(async(fn () => 'Success')); echo $result; // Outputs: Success // With timeout $result = await(async(fn () => sleep(2) && 'Delayed Success'), 3.0); echo $result; // Outputs: Delayed Success (or throws TimeoutException if it takes too long) } catch (\Throwable $e) { echo 'Error: ' . $e->getMessage(); }
Promise Combination
all(array $promises): PromiseInterface
Runs multiple promises concurrently and returns a promise that resolves with an array of all results.
$promises = [ async(fn () => 'Result 1'), async(fn () => 'Result 2'), async(fn () => 'Result 3'), ]; $results = await(all($promises)); // $results = ['Result 1', 'Result 2', 'Result 3']
race(array $promises): PromiseInterface
Returns a promise that resolves with the value of the first resolved promise in the array.
$promises = [ async(function () { sleep(2); return 'Slow'; }), async(function () { sleep(1); return 'Medium'; }), async(function () { return 'Fast'; }), ]; $result = await(race($promises)); // $result = 'Fast'
any(array $promises): PromiseInterface
Returns a promise that resolves when any promise resolves, or rejects when all promises reject.
$promises = [ async(function () { throw new \Exception('Error 1'); }), async(function () { return 'Success'; }), async(function () { throw new \Exception('Error 2'); }), ]; $result = await(any($promises)); // $result = 'Success'
Concurrency Control
map(array $items, callable $callback, int $concurrency = 0, ?callable $onProgress = null): PromiseInterface
Maps an array of items through an async function with optional concurrency control and progress tracking.
use React\Http\Browser; $urls = ['https://example.com', 'https://example.org', 'https://example.net']; $browser = new Browser(); $results = await(map( $urls, function ($url) use ($browser) { // Non-blocking HTTP request return $browser->get($url)->then(function ($response) use ($url) { return [ 'url' => $url, 'status' => $response->getStatusCode(), 'size' => strlen($response->getBody()) ]; }); }, 2, // Process 2 URLs at a time function ($done, $total) { echo "Processed $done of $total URLs\n"; } )); print_r($results); // Array of response data
batch(array $items, callable $batchCallback, int $batchSize = 10, int $concurrency = 1): PromiseInterface
Processes items in batches rather than one at a time for improved performance.
$items = range(1, 100); // 100 items to process $results = await(batch( $items, function ($batch) { return async(function () use ($batch) { // Process the entire batch at once return array_map(fn ($item) => $item * 2, $batch); }); }, 25, // 25 items per batch 2 // Process 2 batches concurrently )); print_r($results); // Array of processed items
pool(array $callables, int $concurrency = 5, ?callable $onProgress = null): PromiseInterface
Executes an array of callables with limited concurrency.
$tasks = [ fn () => async(fn () => performTask(1)), fn () => async(fn () => performTask(2)), fn () => async(fn () => performTask(3)), // ...more tasks ]; $results = await(pool( $tasks, 3, // Run 3 tasks concurrently function ($done, $total) { echo "Completed $done of $total tasks\n"; } )); print_r($results); // Array of task results
Error Handling and Control
timeout(PromiseInterface $promise, float $seconds, string $message = 'Operation timed out'): PromiseInterface
Creates a promise that times out after a specified period.
try { $result = await(timeout( async(function () { sleep(5); // Long operation return 'Done'; }), 2.0, // 2 second timeout 'The operation took too long' )); } catch (\Matrix\Exceptions\TimeoutException $e) { echo $e->getMessage(); // "The operation took too long" echo "Duration: " . $e->getDuration() . " seconds"; // "Duration: 2 seconds" }
retry(callable $factory, int $maxAttempts = 3, ?callable $backoffStrategy = null): PromiseInterface
Retries a promise-returning function multiple times until success or max attempts reached.
use React\Http\Browser; $browser = new Browser(); try { $result = await(retry( function () use ($browser) { // Non-blocking HTTP request with potential for failure return $browser->get('https://unreliable-api.com/data') ->then(function ($response) { if ($response->getStatusCode() !== 200) { throw new \RuntimeException('API returned ' . $response->getStatusCode()); } return $response->getBody()->getContents(); }); }, 5, // Try up to 5 times function ($attempt, $error) { // Exponential backoff with jitter $delay = min(pow(2, $attempt - 1) * 0.1, 5.0) * (0.8 + 0.4 * mt_rand() / mt_getrandmax()); echo "Attempt $attempt failed: {$error->getMessage()}, retrying in {$delay}s...\n"; return $delay; // Return null to stop retrying } )); echo "Finally succeeded: $result\n"; } catch (\Matrix\Exceptions\RetryException $e) { echo "All {$e->getAttempts()} attempts failed\n"; foreach ($e->getFailures() as $index => $failure) { echo "Failure " . ($index + 1) . ": " . $failure->getMessage() . "\n"; } }
cancellable(PromiseInterface $promise, callable $onCancel): CancellablePromise
Creates a cancellable promise with a cleanup function.
// Start a long operation $operation = async(function () { // Simulate long computation for ($i = 0; $i < 10; $i++) { // Check for cancellation at safe points if (/* cancelled check */) { throw new \RuntimeException('Operation cancelled'); } sleep(1); } return 'Completed'; }); // Create a clean-up function for when the operation is cancelled $cleanup = function () { echo "Cleaning up resources...\n"; // Release any held resources }; // Create a cancellable promise $cancellable = cancellable($operation, $cleanup); // Later, when you need to cancel the operation $cancellable->cancel(); // Check if it was cancelled if ($cancellable->isCancelled()) { echo "Operation was cancelled.\n"; }
withErrorContext(PromiseInterface $promise, string $context): PromiseInterface
Enhances a promise with additional error context.
try { await(withErrorContext( async(function () { throw new \RuntimeException('Database connection failed'); }), 'While initializing user service' )); } catch (\Matrix\Exceptions\AsyncException $e) { // Outputs: "While initializing user service: Database connection failed" echo $e->getMessage() . "\n"; // Original exception is preserved as the previous exception echo "Original error: " . $e->getPrevious()->getMessage() . "\n"; }
Timing and Flow Control
delay(float $seconds, mixed $value = null): PromiseInterface
Creates a promise that resolves after a specified delay.
$result = await(delay(2.0, 'Delayed result')); echo $result; // Outputs: Delayed result (after 2 seconds) // Can be used in promise chains async(fn () => 'Step 1') ->then(function ($result) { echo "$result\n"; return delay(1.0, $result . ' -> Step 2'); }) ->then(function ($result) { echo "$result\n"; });
waterfall(array $callables, mixed $initialValue = null): PromiseInterface
Executes promises in sequence, passing the result of each to the next.
$result = await(waterfall( [ function ($value) { return async(fn () => $value . ' -> Step 1'); }, function ($value) { return async(fn () => $value . ' -> Step 2'); }, function ($value) { return async(fn () => $value . ' -> Step 3'); } ], 'Initial value' )); echo $result; // Outputs: Initial value -> Step 1 -> Step 2 -> Step 3
rateLimit(callable $fn, int $maxCalls, float $period): callable
Creates a rate-limited version of an async function.
use React\Http\Browser; $browser = new Browser(); // Create a function that's limited to 2 calls per second $limitedFetch = rateLimit( function ($url) use ($browser) { // Non-blocking HTTP request return $browser->get($url); }, 2, // Maximum 2 calls 1.0 // Per 1 second ); // Make multiple calls $urls = [ 'https://example.com/api/1', 'https://example.com/api/2', 'https://example.com/api/3', 'https://example.com/api/4', 'https://example.com/api/5', ]; // These will automatically be rate-limited foreach ($urls as $url) { $limitedFetch($url)->then(function ($response) use ($url) { echo "Fetched $url: HTTP " . $response->getStatusCode() . "\n"; }); } // Wait for all to complete await(delay(10)); // Give time for requests to complete
Examples
Running Asynchronous Tasks
$promise = async(fn () => 'Task Completed'); $promise->then(fn ($value) => echo $value) // Outputs: Task Completed ->catch(fn ($e) => echo 'Error: ' . $e->getMessage());
Using the Await Syntax
try { $result = await(async(fn () => 'Finished Task')); echo $result; // Outputs: Finished Task } catch (\Throwable $e) { echo 'Error: ' . $e->getMessage(); }
Handling Errors
$promise = async(fn () => throw new \RuntimeException('Task Failed')); $promise->then(fn ($value) => echo $value) ->catch(fn ($e) => echo 'Caught Error: ' . $e->getMessage()); // Outputs: Caught Error: Task Failed
Chaining Promises
$promise = async(fn () => 'First Operation') ->then(function ($result) { echo $result . "\n"; // Outputs: First Operation return async(fn () => $result . ' -> Second Operation'); }) ->then(function ($result) { echo $result; // Outputs: First Operation -> Second Operation return $result; }); await($promise); // Wait for all operations to complete
Non-blocking HTTP Requests Example
use React\Http\Browser; // Fetch multiple URLs concurrently using non-blocking requests $browser = new Browser(); $urls = [ 'https://example.com', 'https://example.org', 'https://example.net' ]; $results = await(map( $urls, function ($url) use ($browser) { $start = microtime(true); // Non-blocking HTTP request return $browser->get($url)->then(function ($response) use ($url, $start) { $duration = microtime(true) - $start; return [ 'url' => $url, 'status' => $response->getStatusCode(), 'size' => strlen($response->getBody()), 'time' => round($duration, 2) . 's' ]; }); }, 2 // Process 2 at a time )); // Display results foreach ($results as $result) { echo "URL: {$result['url']}\n"; echo "Status: {$result['status']}\n"; echo "Size: {$result['size']} bytes\n"; echo "Time: {$result['time']}\n\n"; }
Database Operations Example (using ReactPHP MySQL)
use React\MySQL\Factory; use React\MySQL\QueryResult; // Define database operations as non-blocking tasks $factory = new Factory(); $connection = $factory->createLazyConnection('user:pass@localhost/test'); $tasks = [ function () use ($connection) { return $connection->query('SELECT * FROM users LIMIT 10') ->then(fn (QueryResult $result) => $result->resultRows); }, function () use ($connection) { return $connection->query('SELECT * FROM products LIMIT 10') ->then(fn (QueryResult $result) => $result->resultRows); }, function () use ($connection) { return $connection->query('SELECT * FROM orders LIMIT 10') ->then(fn (QueryResult $result) => $result->resultRows); } ]; // Execute all database queries concurrently [$users, $products, $orders] = await(pool($tasks)); // Process the data echo "Found " . count($users) . " users\n"; echo "Found " . count($products) . " products\n"; echo "Found " . count($orders) . " orders\n"; $connection->quit();
API Integration Example with Retry
use React\Http\Browser; // Fetch API data with retry support and batch processing function fetchApiData($endpoint, $apiToken, $batchSize = 50, $maxBatches = 10) { $browser = new Browser(); $baseUrl = "https://api.example.com"; // Create batches of requests $batches = []; for ($i = 0; $i < $maxBatches; $i++) { $batches[] = [ 'url' => "{$baseUrl}/{$endpoint}", 'params' => [ 'offset' => $i * $batchSize, 'limit' => $batchSize ] ]; } return await(batch( $batches, function ($batchItems) use ($browser, $apiToken) { return retry( function () use ($batchItems, $browser, $apiToken) { $promises = []; foreach ($batchItems as $item) { $url = $item['url'] . '?' . http_build_query($item['params']); $promises[] = $browser->get($url, [ 'Authorization' => "Bearer {$apiToken}", 'Content-Type' => 'application/json' ])->then(function ($response) { if ($response->getStatusCode() !== 200) { throw new \RuntimeException("API request failed with status " . $response->getStatusCode()); } $data = json_decode($response->getBody(), true); if (!isset($data['results'])) { throw new \RuntimeException("Unexpected response format"); } return $data['results']; }); } return all($promises)->then(function ($results) { return array_merge(...$results); }); }, 3, // 3 retry attempts function ($attempt, $error) { echo "API request failed (attempt {$attempt}): {$error->getMessage()}\n"; return $attempt * 1.5; // Increasing delay between retries } ); }, 2, // 2 items per batch 3 // 3 concurrent batches )); } // Usage try { $data = fetchApiData('users', 'your-api-token'); echo "Fetched " . count($data) . " records\n"; foreach ($data as $record) { echo "- {$record['id']}: {$record['name']}\n"; } } catch (\Throwable $e) { echo "Error fetching API data: " . $e->getMessage() . "\n"; }
Advanced Testing
Matrix includes a comprehensive test suite to ensure reliability, including:
Unit Tests
Test individual components in isolation:
public function test_async_returns_promise(): void { $result = async(fn () => 'test'); $this->assertInstanceOf(PromiseInterface::class, $result); }
Integration Tests
Test how components work together in real-world scenarios:
public function test_concurrent_operations(): void { $tasks = [ fn () => async(fn () => 'task1'), fn () => async(fn () => 'task2'), fn () => async(fn () => 'task3'), ]; $results = await(pool($tasks, 2)); $this->assertEquals(['task1', 'task2', 'task3'], $results); }
Error Handling Tests
Test that errors are properly handled and propagated:
public function test_error_context_enhancement(): void { try { await(withErrorContext( async(fn () => throw new \RuntimeException('Original error')), 'Error context' )); $this->fail('Should have thrown an exception'); } catch (AsyncException $e) { $this->assertStringContainsString('Error context', $e->getMessage()); $this->assertInstanceOf(\RuntimeException::class, $e->getPrevious()); } }
Concurrency Pattern Tests
Test various concurrency patterns:
public function test_rate_limiting(): void { $startTime = microtime(true); $results = []; $limitedFn = rateLimit( function ($i) use (&$results) { return async(function () use ($i, &$results) { $results[] = $i; return $i; }); }, 2, // Max 2 calls 0.5 // Per 0.5 seconds ); $promises = []; for ($i = 1; $i <= 5; $i++) { $promises[] = $limitedFn($i); } await(all($promises)); $duration = microtime(true) - $startTime; $this->assertGreaterThanOrEqual(1.0, $duration); $this->assertEquals([1, 2, 3, 4, 5], $results); }
Performance Considerations
- Event Loop: Matrix uses ReactPHP's event loop, which should be run only once in your application
- Blocking Operations: Avoid CPU-intensive tasks and blocking I/O operations (like
file_get_contents()
,sleep()
, or database queries without ReactPHP adapters) in async functions as they will block the entire event loop - Memory Management: Be mindful of memory usage when creating many promises, as they remain in memory until resolved
- Error Handling: Always handle promise rejections to prevent unhandled promise rejection warnings
- Concurrency Limits: Use the concurrency parameters in
map()
,batch()
, andpool()
to control resource usage and prevent overwhelming external services - Rate Limiting: Use
rateLimit()
when working with APIs that have rate limits to avoid being throttled
Common Pitfalls to Avoid
❌ Using Blocking Operations
// DON'T - This blocks the entire event loop $result = await(async(function () { return file_get_contents('https://api.example.com'); // Blocking! }));
✅ Use Non-blocking Alternatives
// DO - Use ReactPHP's non-blocking HTTP client use React\Http\Browser; $browser = new Browser(); $result = await($browser->get('https://api.example.com'));
❌ CPU-Intensive Operations
// DON'T - Heavy computation blocks the event loop $result = await(async(function () { return array_sum(range(1, 10000000)); // Blocks event loop! }));
✅ Break Up Heavy Operations
// DO - Break into smaller chunks or use separate processes $result = await(async(function () { // Process in smaller batches with yields to event loop $sum = 0; for ($i = 1; $i <= 10000000; $i += 1000) { $sum += array_sum(range($i, min($i + 999, 10000000))); if ($i % 10000 === 0) { // Yield control back to event loop periodically await(delay(0.001)); } } return $sum; }));
How It Works
Matrix provides an intuitive async/await interface on top of ReactPHP's powerful event loop system:
- Event Loop Management: The
async()
function schedules work on ReactPHP's event loop, enabling non-blocking execution of I/O operations - Promise Interface: All async operations return ReactPHP promises with
then()
andcatch()
methods for handling success and error cases - Synchronous Await: The
await()
function runs the event loop until the promise resolves, providing a synchronous-looking interface - Concurrency Control: Functions like
map()
,batch()
, andpool()
limit concurrent operations to prevent resource exhaustion - Error Handling: Custom exception classes provide detailed information about failures, timeouts, and retry attempts
Key Point: Matrix doesn't change PHP's single-threaded nature but makes it much easier to write efficient, non-blocking I/O code that can handle thousands of concurrent operations.
Testing
Run the test suite to ensure everything is working as expected:
composer test
Contributing
We welcome contributions! To get started:
- Fork the repository
- Create a feature branch (
git checkout -b feature/amazing-feature
) - Commit your changes (
git commit -m 'Add some amazing feature'
) - Push to the branch (
git push origin feature/amazing-feature
) - Open a Pull Request
Please make sure your code follows the project's coding standards and includes appropriate tests.
License
Matrix is open-source software licensed under the MIT License.