jerome/matrix

Latest stable version: 3.4.0

Composer install command:

composer require jerome/matrix

Package description

An unparalleled PHP asynchronous experience, offering genuine concurrency and fiber-based task management.

README

Matrix

Matrix is a PHP library that brings event-driven, asynchronous programming to PHP, inspired by JavaScript's async/await syntax. Built on top of ReactPHP's event loop, Matrix makes it easier to write non-blocking I/O operations and manage concurrency with a simple, intuitive API.

Understanding Async in PHP

Important: PHP runs in a single-threaded environment. Matrix doesn't create true parallelism but enables event-driven, non-blocking I/O operations through ReactPHP's event loop. This means:

  • Non-blocking I/O: Network requests, file operations, and timers don't block execution
  • Concurrent operations: Multiple I/O operations can run simultaneously
  • CPU-bound tasks: Heavy computations still block the event loop
  • No true parallelism: Matrix does not use multiple threads or processes

Matrix shines when dealing with I/O-heavy applications like API clients, web scrapers, or microservices.
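The difference between concurrency and parallelism described above can be shown in plain PHP. The following is a minimal sketch that assumes nothing about Matrix's or ReactPHP's internals: two generator-based tasks take turns on a single thread, and each `yield` marks a point where a task hands control back. The names runRoundRobin() and steps() are hypothetical and exist only for this illustration.

```php
<?php
/**
 * Toy round-robin scheduler (illustrative only, not part of Matrix or ReactPHP).
 * Each task is a generator; every yield is a point where the task gives up control.
 *
 * @param array<string, Generator> $tasks
 * @return string[] the order in which the individual steps ran
 */
function runRoundRobin(array $tasks): array
{
    $log = [];
    while ($tasks !== []) {
        foreach ($tasks as $name => $task) {
            if (!$task->valid()) {
                // The task has finished, so drop it from the schedule
                unset($tasks[$name]);
                continue;
            }
            $log[] = $name . ':' . $task->current();
            $task->next(); // Resume the task until its next yield
        }
    }
    return $log;
}

// A task that performs $count small steps, yielding after each one
function steps(int $count): Generator
{
    for ($i = 1; $i <= $count; $i++) {
        yield $i;
    }
}

$log = runRoundRobin(['a' => steps(2), 'b' => steps(2)]);
echo implode(' ', $log), "\n"; // a:1 b:1 a:2 b:2
```

The steps interleave, yet only one of them runs at any moment. A task that never yields (a long computation or a blocking call) would keep every other task waiting, which is why CPU-bound work still blocks the loop.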

Why Choose Matrix?

Matrix simplifies ReactPHP development by providing a familiar async/await syntax while maintaining full compatibility with ReactPHP's ecosystem. It handles the complexity of promise management and event loop integration behind a clean, intuitive API.

Key Features

  • JavaScript-like API: Use async() and await() for straightforward asynchronous programming
  • Powered by ReactPHP: Built on ReactPHP's battle-tested event loop for true non-blocking I/O
  • Robust Error Handling: Catch and handle exceptions with ->catch() or try-catch
  • Automatic Loop Management: The event loop runs automatically to handle promise resolution
  • Concurrent Operations: Run multiple I/O operations simultaneously
  • Rate Limiting: Control the frequency of asynchronous operations
  • Promise Cancellation: Cancel pending operations when they're no longer needed
  • Retry Mechanism: Automatically retry failed operations with configurable backoff strategies
  • Batch Processing: Process items in batches for improved performance
  • Enhanced Error Handling: Add context to errors for better debugging

Installation

Install Matrix via Composer:

composer require jerome/matrix

Requirements

  • PHP 8.0 or higher
  • sockets extension enabled

ReactPHP promises and the event loop will be installed automatically via Composer.

API Overview

Core Functions

async(callable $callable): PromiseInterface

Runs a callable asynchronously and returns a promise for its result.

    $func = async(fn () => 'Success');

    // echo is a statement, not an expression, so the arrow functions use print
    $func->then(fn ($value) => print $value) // Outputs: Success
        ->catch(fn ($e) => print 'Error: ' . $e->getMessage());

await(PromiseInterface $promise, ?float $timeout = null): mixed

Awaits the resolution of a promise and returns its value. Optionally accepts a timeout in seconds.

    try {
        $result = await(async(fn () => 'Success'));
        echo $result; // Outputs: Success

        // With timeout
        $result = await(async(function () {
            sleep(2); // Stand-in for slow work
            return 'Delayed Success';
        }), 3.0);
        echo $result; // Outputs: Delayed Success (or throws TimeoutException if it takes too long)
    } catch (\Throwable $e) {
        echo 'Error: ' . $e->getMessage();
    }

Promise Combination

all(array $promises): PromiseInterface

Runs multiple promises concurrently and returns a promise that resolves with an array of all results.

    $promises = [
        async(fn () => 'Result 1'),
        async(fn () => 'Result 2'),
        async(fn () => 'Result 3'),
    ];

    $results = await(all($promises));
    // $results = ['Result 1', 'Result 2', 'Result 3']

race(array $promises): PromiseInterface

Returns a promise that resolves with the value of the first resolved promise in the array.

    $promises = [
        async(function () { sleep(2); return 'Slow'; }),
        async(function () { sleep(1); return 'Medium'; }),
        async(function () { return 'Fast'; }),
    ];

    $result = await(race($promises));
    // $result = 'Fast'

any(array $promises): PromiseInterface

Returns a promise that resolves when any promise resolves, or rejects when all promises reject.

    $promises = [
        async(function () { throw new \Exception('Error 1'); }),
        async(function () { return 'Success'; }),
        async(function () { throw new \Exception('Error 2'); }),
    ];

    $result = await(any($promises));
    // $result = 'Success'

Concurrency Control

map(array $items, callable $callback, int $concurrency = 0, ?callable $onProgress = null): PromiseInterface

Maps an array of items through an async function with optional concurrency control and progress tracking.

    use React\Http\Browser;

    $urls = ['https://example.com', 'https://example.org', 'https://example.net'];
    $browser = new Browser();

    $results = await(map(
        $urls,
        function ($url) use ($browser) {
            // Non-blocking HTTP request
            return $browser->get($url)->then(function ($response) use ($url) {
                return [
                    'url' => $url,
                    'status' => $response->getStatusCode(),
                    'size' => strlen($response->getBody()),
                ];
            });
        },
        2, // Process 2 URLs at a time
        function ($done, $total) {
            echo "Processed $done of $total URLs\n";
        }
    ));

    print_r($results); // Array of response data

batch(array $items, callable $batchCallback, int $batchSize = 10, int $concurrency = 1): PromiseInterface

Processes items in batches rather than one at a time for improved performance.

    $items = range(1, 100); // 100 items to process

    $results = await(batch(
        $items,
        function ($batch) {
            return async(function () use ($batch) {
                // Process the entire batch at once
                return array_map(fn ($item) => $item * 2, $batch);
            });
        },
        25, // 25 items per batch
        2   // Process 2 batches concurrently
    ));

    print_r($results); // Array of processed items
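To make the batch sizes in the example above concrete, here is a small synchronous sketch of the grouping step on its own. It assumes that items are split into consecutive chunks of the given size, which is the usual meaning of a batch size but is not confirmed by the README. splitIntoBatches() is a hypothetical helper, not a Matrix function.

```php
<?php
// Hypothetical helper, not Matrix's implementation: shows how 100 items
// fall into batches of 25 before each batch is handed to a callback.
function splitIntoBatches(array $items, int $batchSize): array
{
    if ($batchSize < 1) {
        throw new InvalidArgumentException('Batch size must be at least 1');
    }
    return array_chunk($items, $batchSize);
}

$batches = splitIntoBatches(range(1, 100), 25);

// Apply the same per-batch work as the example (double every item),
// then flatten the per-batch results back into one list.
$doubled = array_merge(...array_map(
    fn (array $batch) => array_map(fn (int $item) => $item * 2, $batch),
    $batches
));

echo count($batches), " batches, ", count($doubled), " results\n"; // 4 batches, 100 results
```

With a concurrency of 2, as in the example, two of these four batches would be in flight at a time.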

pool(array $callables, int $concurrency = 5, ?callable $onProgress = null): PromiseInterface

Executes an array of callables with limited concurrency.

    $tasks = [
        fn () => async(fn () => performTask(1)),
        fn () => async(fn () => performTask(2)),
        fn () => async(fn () => performTask(3)),
        // ...more tasks
    ];

    $results = await(pool(
        $tasks,
        3, // Run 3 tasks concurrently
        function ($done, $total) {
            echo "Completed $done of $total tasks\n";
        }
    ));

    print_r($results); // Array of task results

Error Handling and Control

timeout(PromiseInterface $promise, float $seconds, string $message = 'Operation timed out'): PromiseInterface

Wraps a promise so that it rejects with a TimeoutException if it has not settled within the given number of seconds.

    try {
        $result = await(timeout(
            async(function () {
                sleep(5); // Long operation
                return 'Done';
            }),
            2.0, // 2 second timeout
            'The operation took too long'
        ));
    } catch (\Matrix\Exceptions\TimeoutException $e) {
        echo $e->getMessage(); // "The operation took too long"
        echo "Duration: " . $e->getDuration() . " seconds"; // "Duration: 2 seconds"
    }

retry(callable $factory, int $maxAttempts = 3, ?callable $backoffStrategy = null): PromiseInterface

Retries a promise-returning function until it succeeds or the maximum number of attempts is reached.

    use React\Http\Browser;

    $browser = new Browser();

    try {
        $result = await(retry(
            function () use ($browser) {
                // Non-blocking HTTP request with potential for failure
                return $browser->get('https://unreliable-api.com/data')
                    ->then(function ($response) {
                        if ($response->getStatusCode() !== 200) {
                            throw new \RuntimeException('API returned ' . $response->getStatusCode());
                        }
                        return $response->getBody()->getContents();
                    });
            },
            5, // Try up to 5 times
            function ($attempt, $error) {
                // Exponential backoff with jitter
                $delay = min(pow(2, $attempt - 1) * 0.1, 5.0) * (0.8 + 0.4 * mt_rand() / mt_getrandmax());
                echo "Attempt $attempt failed: {$error->getMessage()}, retrying in {$delay}s...\n";
                return $delay; // Return null to stop retrying
            }
        ));

        echo "Finally succeeded: $result\n";
    } catch (\Matrix\Exceptions\RetryException $e) {
        echo "All {$e->getAttempts()} attempts failed\n";
        foreach ($e->getFailures() as $index => $failure) {
            echo "Failure " . ($index + 1) . ": " . $failure->getMessage() . "\n";
        }
    }
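The backoff callback in the example above combines an exponential term, a cap, and random jitter in a single expression. As a rough sketch, the deterministic part can be pulled out to see which delays it produces. backoffDelay() and its parameter names are illustrative assumptions. In the example, each value is further multiplied by a random factor between 0.8 and 1.2.

```php
<?php
/**
 * Deterministic part of the backoff used in the retry example: the delay
 * doubles with each attempt, starting at $base seconds and capped at $cap.
 * Jitter is left out so the values are reproducible.
 * backoffDelay() is an illustrative name, not a Matrix function.
 */
function backoffDelay(int $attempt, float $base = 0.1, float $cap = 5.0): float
{
    return min(2 ** ($attempt - 1) * $base, $cap);
}

foreach (range(1, 8) as $attempt) {
    printf("attempt %d -> %.1fs\n", $attempt, backoffDelay($attempt));
}
// attempt 1 -> 0.1s, 2 -> 0.2s, 3 -> 0.4s, 4 -> 0.8s,
// attempt 5 -> 1.6s, 6 -> 3.2s, 7 -> 5.0s (capped), 8 -> 5.0s (capped)
```

With the five attempts configured in the example the cap is never reached. It only matters once a seventh attempt is allowed.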

cancellable(PromiseInterface $promise, callable $onCancel): CancellablePromise

Creates a cancellable promise with a cleanup function.

    // Flag checked by the operation at safe points (illustrative, not part of the Matrix API)
    $cancelled = false;

    // Start a long operation
    $operation = async(function () use (&$cancelled) {
        // Simulate long computation
        for ($i = 0; $i < 10; $i++) {
            // Check for cancellation at safe points
            if ($cancelled) {
                throw new \RuntimeException('Operation cancelled');
            }
            sleep(1);
        }
        return 'Completed';
    });

    // Create a clean-up function for when the operation is cancelled
    $cleanup = function () use (&$cancelled) {
        $cancelled = true;
        echo "Cleaning up resources...\n";
        // Release any held resources
    };

    // Create a cancellable promise
    $cancellable = cancellable($operation, $cleanup);

    // Later, when you need to cancel the operation
    $cancellable->cancel();

    // Check if it was cancelled
    if ($cancellable->isCancelled()) {
        echo "Operation was cancelled.\n";
    }

withErrorContext(PromiseInterface $promise, string $context): PromiseInterface

Enhances a promise with additional error context.

    try {
        await(withErrorContext(
            async(function () {
                throw new \RuntimeException('Database connection failed');
            }),
            'While initializing user service'
        ));
    } catch (\Matrix\Exceptions\AsyncException $e) {
        // Outputs: "While initializing user service: Database connection failed"
        echo $e->getMessage() . "\n";

        // Original exception is preserved as the previous exception
        echo "Original error: " . $e->getPrevious()->getMessage() . "\n";
    }
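The wrapping behaviour shown in the example above (a prefixed message, with the original error kept as the previous exception) follows PHP's standard exception-chaining pattern. The sketch below reproduces that pattern with a plain RuntimeException. wrapWithContext() is a hypothetical helper, and Matrix itself throws \Matrix\Exceptions\AsyncException, whose internals are not shown in this README.

```php
<?php
// Illustrative stand-in for the wrapping described above. A plain
// RuntimeException is used here instead of Matrix's AsyncException.
function wrapWithContext(Throwable $error, string $context): RuntimeException
{
    // The third constructor argument links the original error as "previous"
    return new RuntimeException($context . ': ' . $error->getMessage(), 0, $error);
}

$original = new RuntimeException('Database connection failed');
$wrapped = wrapWithContext($original, 'While initializing user service');

echo $wrapped->getMessage(), "\n";
// While initializing user service: Database connection failed
echo $wrapped->getPrevious()->getMessage(), "\n";
// Database connection failed
```

Keeping the original as the previous exception means its class, message, and stack trace stay available to log handlers even after several layers of context have been added.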

Timing and Flow Control

delay(float $seconds, mixed $value = null): PromiseInterface

Creates a promise that resolves after a specified delay.

    $result = await(delay(2.0, 'Delayed result'));
    echo $result; // Outputs: Delayed result (after 2 seconds)

    // Can be used in promise chains
    async(fn () => 'Step 1')
        ->then(function ($result) {
            echo "$result\n";
            return delay(1.0, $result . ' -> Step 2');
        })
        ->then(function ($result) {
            echo "$result\n";
        });

waterfall(array $callables, mixed $initialValue = null): PromiseInterface

Executes callables in sequence, passing the resolved result of each one to the next.

    $result = await(waterfall(
        [
            function ($value) {
                return async(fn () => $value . ' -> Step 1');
            },
            function ($value) {
                return async(fn () => $value . ' -> Step 2');
            },
            function ($value) {
                return async(fn () => $value . ' -> Step 3');
            },
        ],
        'Initial value'
    ));

    echo $result; // Outputs: Initial value -> Step 1 -> Step 2 -> Step 3
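For readers new to the pattern, the data flow of the example above has a close synchronous analogue in array_reduce(): each step receives whatever the previous step returned. The sketch below is only that analogy, with plain values in place of promises. It says nothing about how Matrix implements waterfall().

```php
<?php
// Synchronous analogy only: each step receives the previous step's return value.
$steps = [
    fn (string $value) => $value . ' -> Step 1',
    fn (string $value) => $value . ' -> Step 2',
    fn (string $value) => $value . ' -> Step 3',
];

$result = array_reduce(
    $steps,
    fn ($carry, callable $step) => $step($carry),
    'Initial value'
);

echo $result, "\n"; // Initial value -> Step 1 -> Step 2 -> Step 3
```

In the asynchronous version a step may start a non-blocking operation, and the next step runs only once that operation's promise has resolved.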

rateLimit(callable $fn, int $maxCalls, float $period): callable

Creates a rate-limited version of an async function.

    use React\Http\Browser;

    $browser = new Browser();

    // Create a function that's limited to 2 calls per second
    $limitedFetch = rateLimit(
        function ($url) use ($browser) {
            // Non-blocking HTTP request
            return $browser->get($url);
        },
        2,  // Maximum 2 calls
        1.0 // Per 1 second
    );

    // Make multiple calls
    $urls = [
        'https://example.com/api/1',
        'https://example.com/api/2',
        'https://example.com/api/3',
        'https://example.com/api/4',
        'https://example.com/api/5',
    ];

    // These will automatically be rate-limited
    foreach ($urls as $url) {
        $limitedFetch($url)->then(function ($response) use ($url) {
            echo "Fetched $url: HTTP " . $response->getStatusCode() . "\n";
        });
    }

    // Wait for all to complete
    await(delay(10)); // Give time for requests to complete
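To get a feel for what a limit of 2 calls per second means for five queued requests, the start times can be worked out by hand. The sketch below assumes a simple fixed-window policy in which calls are released in groups of $maxCalls, one group per period. The README does not say which policy Matrix uses, so this is an approximation. startOffsets() is a hypothetical helper.

```php
<?php
/**
 * Earliest start offsets (in seconds) for $calls queued at t = 0 when at most
 * $maxCalls may start per $period. Assumes a fixed-window policy.
 * Matrix's actual scheduling may differ.
 *
 * @return float[]
 */
function startOffsets(int $calls, int $maxCalls, float $period): array
{
    $offsets = [];
    for ($i = 0; $i < $calls; $i++) {
        // Calls 0..maxCalls-1 go in window 0, the next group in window 1, and so on
        $offsets[] = intdiv($i, $maxCalls) * $period;
    }
    return $offsets;
}

print_r(startOffsets(5, 2, 1.0)); // offsets 0, 0, 1, 1, 2
```

Under this assumption the last of the five requests starts about two seconds in, so the 10-second delay at the end of the example leaves ample time for the responses to arrive.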

Event System & Observability

Matrix v3.4.0 introduces a comprehensive event system for monitoring and debugging async operations.

Event Listening

Listen to promise lifecycle events:

    use function Matrix\Support\listen;
    use function Matrix\Support\async;
    use function Matrix\Support\await;

    // Listen to promise creation events
    listen('promise.created', function ($event) {
        echo "Promise {$event->getPromiseId()} created (type: {$event->getType()})\n";
    });

    // Listen to promise resolution events
    listen('promise.resolved', function ($event) {
        echo "Promise {$event->getPromiseId()} resolved in {$event->getDuration()}s\n";
    });

    // Listen to promise rejection events
    listen('promise.rejected', function ($event) {
        echo "Promise {$event->getPromiseId()} rejected: {$event->getErrorMessage()}\n";
    });

    // Listen to timeout events
    listen('promise.timeout', function ($event) {
        echo "Promise {$event->getPromiseId()} timed out after {$event->getTimeoutDuration()}s\n";
    });

    // Your async operations will now fire events
    $result = await(async(function () {
        return 'Hello, World!';
    }));

Available Events

  • promise.created: Fired when a promise is created
  • promise.resolved: Fired when a promise resolves successfully
  • promise.rejected: Fired when a promise is rejected
  • promise.timeout: Fired when a promise times out

Metrics Collection

Monitor performance with built-in metrics:

    use function Matrix\Support\getMetrics;
    use function Matrix\Support\metricsCollector;

    // Execute some async operations
    await(async(fn () => 'task 1'));
    await(async(fn () => 'task 2'));

    // Get comprehensive metrics
    $metrics = getMetrics();

    echo "Active promises: {$metrics['active_promises']}\n";
    echo "Completed promises: {$metrics['completed_promises']}\n";
    echo "Success rate: {$metrics['success_rate']}%\n";
    echo "Average resolution time: {$metrics['average_resolution_time']}s\n";

    // Access detailed counters
    echo "Total created: {$metrics['counters']['promises.created']}\n";
    echo "Total resolved: {$metrics['counters']['promises.resolved']}\n";
    echo "Total rejected: {$metrics['counters']['promises.rejected']}\n";

Advanced Event Usage

Create custom monitoring and debugging tools:

    use function Matrix\Support\listen;
    use function Matrix\Support\eventDispatcher;

    // Custom performance monitoring
    $slowOperations = [];
    listen('promise.resolved', function ($event) use (&$slowOperations) {
        if ($event->getDuration() > 1.0) { // Slower than 1 second
            $slowOperations[] = [
                'id' => $event->getPromiseId(),
                'duration' => $event->getDuration(),
            ];
        }
    });

    // Custom error tracking
    $errorCounts = [];
    listen('promise.rejected', function ($event) use (&$errorCounts) {
        $errorClass = $event->getErrorClass();
        $errorCounts[$errorClass] = ($errorCounts[$errorClass] ?? 0) + 1;
    });

    // Disable events temporarily for performance-critical sections
    eventDispatcher()->setEnabled(false);
    // ... critical operations
    eventDispatcher()->setEnabled(true);

Development and Debugging

Use events for enhanced debugging:

    // Debug mode - log all promise activity
    if ($_ENV['DEBUG_PROMISES'] ?? false) {
        listen('promise.created', fn($e) => error_log("Created: {$e->getPromiseId()}"));
        listen('promise.resolved', fn($e) => error_log("Resolved: {$e->getPromiseId()} in {$e->getDuration()}s"));
        listen('promise.rejected', fn($e) => error_log("Rejected: {$e->getPromiseId()} - {$e->getErrorMessage()}"));
    }

    // Performance monitoring
    listen('promise.resolved', function ($event) {
        if ($event->getDuration() > 0.5) {
            echo "⚠️ Slow operation detected: {$event->getDuration()}s\n";
        }
    });

Examples

Running Asynchronous Tasks

    $promise = async(fn () => 'Task Completed');

    // echo is a statement, not an expression, so the arrow functions use print
    $promise->then(fn ($value) => print $value) // Outputs: Task Completed
        ->catch(fn ($e) => print 'Error: ' . $e->getMessage());

Using the Await Syntax

    try {
        $result = await(async(fn () => 'Finished Task'));
        echo $result; // Outputs: Finished Task
    } catch (\Throwable $e) {
        echo 'Error: ' . $e->getMessage();
    }

Handling Errors

    $promise = async(fn () => throw new \RuntimeException('Task Failed'));

    // echo is a statement, not an expression, so the arrow functions use print
    $promise->then(fn ($value) => print $value)
        ->catch(fn ($e) => print 'Caught Error: ' . $e->getMessage()); // Outputs: Caught Error: Task Failed

Chaining Promises

    $promise = async(fn () => 'First Operation')
        ->then(function ($result) {
            echo $result . "\n"; // Outputs: First Operation
            return async(fn () => $result . ' -> Second Operation');
        })
        ->then(function ($result) {
            echo $result; // Outputs: First Operation -> Second Operation
            return $result;
        });

    await($promise); // Wait for all operations to complete

Non-blocking HTTP Requests Example

    use React\Http\Browser;

    // Fetch multiple URLs concurrently using non-blocking requests
    $browser = new Browser();
    $urls = [
        'https://example.com',
        'https://example.org',
        'https://example.net',
    ];

    $results = await(map(
        $urls,
        function ($url) use ($browser) {
            $start = microtime(true);

            // Non-blocking HTTP request
            return $browser->get($url)->then(function ($response) use ($url, $start) {
                $duration = microtime(true) - $start;
                return [
                    'url' => $url,
                    'status' => $response->getStatusCode(),
                    'size' => strlen($response->getBody()),
                    'time' => round($duration, 2) . 's',
                ];
            });
        },
        2 // Process 2 at a time
    ));

    // Display results
    foreach ($results as $result) {
        echo "URL: {$result['url']}\n";
        echo "Status: {$result['status']}\n";
        echo "Size: {$result['size']} bytes\n";
        echo "Time: {$result['time']}\n\n";
    }

Database Operations Example (using ReactPHP MySQL)

    use React\MySQL\Factory;
    use React\MySQL\QueryResult;

    // Define database operations as non-blocking tasks
    $factory = new Factory();
    $connection = $factory->createLazyConnection('user:pass@localhost/test');

    $tasks = [
        function () use ($connection) {
            return $connection->query('SELECT * FROM users LIMIT 10')
                ->then(fn (QueryResult $result) => $result->resultRows);
        },
        function () use ($connection) {
            return $connection->query('SELECT * FROM products LIMIT 10')
                ->then(fn (QueryResult $result) => $result->resultRows);
        },
        function () use ($connection) {
            return $connection->query('SELECT * FROM orders LIMIT 10')
                ->then(fn (QueryResult $result) => $result->resultRows);
        },
    ];

    // Execute all database queries concurrently
    [$users, $products, $orders] = await(pool($tasks));

    // Process the data
    echo "Found " . count($users) . " users\n";
    echo "Found " . count($products) . " products\n";
    echo "Found " . count($orders) . " orders\n";

    $connection->quit();

API Integration Example with Retry

    use React\Http\Browser;

    // Fetch API data with retry support and batch processing
    function fetchApiData($endpoint, $apiToken, $batchSize = 50, $maxBatches = 10)
    {
        $browser = new Browser();
        $baseUrl = "https://api.example.com";

        // Create batches of requests
        $batches = [];
        for ($i = 0; $i < $maxBatches; $i++) {
            $batches[] = [
                'url' => "{$baseUrl}/{$endpoint}",
                'params' => [
                    'offset' => $i * $batchSize,
                    'limit' => $batchSize,
                ],
            ];
        }

        return await(batch(
            $batches,
            function ($batchItems) use ($browser, $apiToken) {
                return retry(
                    function () use ($batchItems, $browser, $apiToken) {
                        $promises = [];
                        foreach ($batchItems as $item) {
                            $url = $item['url'] . '?' . http_build_query($item['params']);
                            $promises[] = $browser->get($url, [
                                'Authorization' => "Bearer {$apiToken}",
                                'Content-Type' => 'application/json',
                            ])->then(function ($response) {
                                if ($response->getStatusCode() !== 200) {
                                    throw new \RuntimeException("API request failed with status " . $response->getStatusCode());
                                }
                                $data = json_decode($response->getBody(), true);
                                if (!isset($data['results'])) {
                                    throw new \RuntimeException("Unexpected response format");
                                }
                                return $data['results'];
                            });
                        }
                        return all($promises)->then(function ($results) {
                            return array_merge(...$results);
                        });
                    },
                    3, // 3 retry attempts
                    function ($attempt, $error) {
                        echo "API request failed (attempt {$attempt}): {$error->getMessage()}\n";
                        return $attempt * 1.5; // Increasing delay between retries
                    }
                );
            },
            2, // 2 items per batch
            3  // 3 concurrent batches
        ));
    }

    // Usage
    try {
        $data = fetchApiData('users', 'your-api-token');
        echo "Fetched " . count($data) . " records\n";
        foreach ($data as $record) {
            echo "- {$record['id']}: {$record['name']}\n";
        }
    } catch (\Throwable $e) {
        echo "Error fetching API data: " . $e->getMessage() . "\n";
    }

Advanced Testing

Matrix includes a comprehensive test suite to ensure reliability, including:

Unit Tests

Test individual components in isolation:

    public function test_async_returns_promise(): void
    {
        $result = async(fn () => 'test');
        $this->assertInstanceOf(PromiseInterface::class, $result);
    }

Integration Tests

Test how components work together in real-world scenarios:

    public function test_concurrent_operations(): void
    {
        $tasks = [
            fn () => async(fn () => 'task1'),
            fn () => async(fn () => 'task2'),
            fn () => async(fn () => 'task3'),
        ];

        $results = await(pool($tasks, 2));
        $this->assertEquals(['task1', 'task2', 'task3'], $results);
    }

Error Handling Tests

Test that errors are properly handled and propagated:

    public function test_error_context_enhancement(): void
    {
        try {
            await(withErrorContext(
                async(fn () => throw new \RuntimeException('Original error')),
                'Error context'
            ));
            $this->fail('Should have thrown an exception');
        } catch (AsyncException $e) {
            $this->assertStringContainsString('Error context', $e->getMessage());
            $this->assertInstanceOf(\RuntimeException::class, $e->getPrevious());
        }
    }

Concurrency Pattern Tests

Test various concurrency patterns:

    public function test_rate_limiting(): void
    {
        $startTime = microtime(true);
        $results = [];

        $limitedFn = rateLimit(
            function ($i) use (&$results) {
                return async(function () use ($i, &$results) {
                    $results[] = $i;
                    return $i;
                });
            },
            2,  // Max 2 calls
            0.5 // Per 0.5 seconds
        );

        $promises = [];
        for ($i = 1; $i <= 5; $i++) {
            $promises[] = $limitedFn($i);
        }

        await(all($promises));
        $duration = microtime(true) - $startTime;

        $this->assertGreaterThanOrEqual(1.0, $duration);
        $this->assertEquals([1, 2, 3, 4, 5], $results);
    }

Performance Considerations

  • Event Loop: Matrix uses ReactPHP's event loop, which should be run only once in your application
  • Blocking Operations: Avoid CPU-intensive tasks and blocking I/O operations (like file_get_contents(), sleep(), or database queries without ReactPHP adapters) in async functions as they will block the entire event loop
  • Memory Management: Be mindful of memory usage when creating many promises, as they remain in memory until resolved
  • Error Handling: Always handle promise rejections to prevent unhandled promise rejection warnings
  • Concurrency Limits: Use the concurrency parameters in map(), batch(), and pool() to control resource usage and prevent overwhelming external services
  • Rate Limiting: Use rateLimit() when working with APIs that have rate limits to avoid being throttled

Common Pitfalls to Avoid

❌ Using Blocking Operations

    // DON'T - This blocks the entire event loop
    $result = await(async(function () {
        return file_get_contents('https://api.example.com'); // Blocking!
    }));

✅ Use Non-blocking Alternatives

    // DO - Use ReactPHP's non-blocking HTTP client
    use React\Http\Browser;

    $browser = new Browser();
    $result = await($browser->get('https://api.example.com'));

❌ CPU-Intensive Operations

    // DON'T - Heavy computation blocks the event loop
    $result = await(async(function () {
        return array_sum(range(1, 10000000)); // Blocks event loop!
    }));

✅ Break Up Heavy Operations

    // DO - Break into smaller chunks or use separate processes
    $result = await(async(function () {
        // Process in smaller batches with yields to event loop
        $sum = 0;
        for ($i = 1; $i <= 10000000; $i += 1000) {
            $sum += array_sum(range($i, min($i + 999, 10000000)));

            // $i takes the values 1, 1001, 2001, ..., so test ($i - 1) to yield once every 10 chunks
            if (($i - 1) % 10000 === 0) {
                // Yield control back to event loop periodically
                await(delay(0.001));
            }
        }
        return $sum;
    }));

How It Works

Matrix provides an intuitive async/await interface on top of ReactPHP's powerful event loop system:

  1. Event Loop Management: The async() function schedules work on ReactPHP's event loop, enabling non-blocking execution of I/O operations
  2. Promise Interface: All async operations return ReactPHP promises with then() and catch() methods for handling success and error cases
  3. Synchronous Await: The await() function runs the event loop until the promise resolves, providing a synchronous-looking interface
  4. Concurrency Control: Functions like map(), batch(), and pool() limit concurrent operations to prevent resource exhaustion
  5. Error Handling: Custom exception classes provide detailed information about failures, timeouts, and retry attempts

Key Point: Matrix doesn't change PHP's single-threaded nature but makes it much easier to write efficient, non-blocking I/O code that can handle thousands of concurrent operations.
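Item 3 in the list above, an await that keeps running the loop until a result is available, can be pictured with a toy timer loop. This is a deliberately simplified sketch and not how ReactPHP or Matrix are implemented: a real event loop also watches sockets, streams, and signals. ToyLoop, addTimer(), and runUntil() are hypothetical names.

```php
<?php
// Toy timer loop (illustrative only). Real loops also watch streams and signals.
final class ToyLoop
{
    /** @var array<int, array{0: float, 1: callable}> pairs of [due time, callback] */
    private array $timers = [];

    public function addTimer(float $seconds, callable $callback): void
    {
        $this->timers[] = [microtime(true) + $seconds, $callback];
    }

    /** Run timers in due order until $done() returns true or none remain. */
    public function runUntil(callable $done): void
    {
        while (!$done() && $this->timers !== []) {
            // Pick the timer that is due first
            usort($this->timers, fn ($a, $b) => $a[0] <=> $b[0]);
            [$due, $callback] = array_shift($this->timers);

            // Sleep only while nothing is due; this is the loop's idle time
            $wait = $due - microtime(true);
            if ($wait > 0) {
                usleep((int) ($wait * 1_000_000));
            }
            $callback();
        }
    }
}

$loop = new ToyLoop();
$order = [];
$result = null;

$loop->addTimer(0.02, function () use (&$order, &$result) {
    $order[] = 'slow';
    $result = 'done'; // Plays the role of a promise resolving
});
$loop->addTimer(0.01, function () use (&$order) {
    $order[] = 'fast';
});

// "await": keep turning the loop until the result has arrived
$loop->runUntil(fn () => $result !== null);

echo implode(', ', $order), " => ", $result, "\n"; // fast, slow => done
```

The caller looks synchronous because runUntil() does not return until the result is set, yet other timers still get their turn in the meantime.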

Testing

Run the test suite to ensure everything is working as expected:

composer test

Contributing

We welcome contributions! To get started:

  1. Fork the repository
  2. Create a feature branch (git checkout -b feature/amazing-feature)
  3. Commit your changes (git commit -m 'Add some amazing feature')
  4. Push to the branch (git push origin feature/amazing-feature)
  5. Open a Pull Request

Please make sure your code follows the project's coding standards and includes appropriate tests.

License

Matrix is open-source software licensed under the MIT License.

Statistics

  • Total downloads: 8.26k
  • Monthly downloads: 0
  • Daily downloads: 0
  • Favorites: 56
  • Views: 1
  • Dependents: 2
  • Suggesters: 0

GitHub Information

  • Stars: 56
  • Watchers: 4
  • Forks: 2
  • Language: PHP

Other Information

  • License: MIT
  • Last updated: 2026-01-04
