webit/message-bus-amqp
Composer 安装命令:
composer require webit/message-bus-amqp
包简介
README 文档
README
AMQP protocol infrastructure for Message Bus
Installation
composer require webit/message-bus-amqp=^1.0.0
Usage
Connection Pool
Use ConnectionPoolBuilder to create one
use Webit\MessageBus\Infrastructure\Amqp\Connection\Pool\ConnectionPoolBuilder; use Webit\MessageBus\Infrastructure\Amqp\Connection\ConnectionParams; $builder = ConnectionPoolBuilder::create(); // optionally set connection factory (LazyConnectionFactory used by default) $builder->setConnectionFactory( new \Webit\MessageBus\Infrastructure\Amqp\Connection\InstantConnectionFactory() ); // optionally add logger (use a smarter one in real life) $logger = new \Psr\Log\NullLogger(); $builder->setLogger($logger); // register at least one connection $builder->registerConnection( new ConnectionParams( 'rabbitmq.host', '5672', // port, 'my-username', 'my-password' ), 'connection-1' ); $connectionPool = $builder->build();
ConnectionPool gives you a current connection. If you find something is wrong with the current connection, you can dispose it and ask pool to give you a next one (if has any more).
try { $connection = $connectionPool->current(); $channel = $connection->getChannel(); } catch (\Exception $e) { $connectionPool->disposeCurrent(); $connection = $connectionPool->current(); }
Publisher integration
To publish Message via AMQP use AmqpPublisher
use Webit\MessageBus\Infrastructure\Amqp\Connection\Channel\NewChannelConnectionAwareChannelFactory; use Webit\MessageBus\Infrastructure\Amqp\Publisher\ExchangePublicationTarget; use Webit\MessageBus\Infrastructure\Amqp\Publisher\QueuePublicationTarget; use Webit\MessageBus\Infrastructure\Amqp\Publisher\AmqpPublisher; use Webit\MessageBus\Infrastructure\Amqp\Publisher\Routing\FromMessageTypeRoutingKeyResolver; $channelFactory = new NewChannelConnectionAwareChannelFactory($connectionPool); $publicationTarget = new ExchangePublicationTarget( $channelFactory, new FromMessageTypeRoutingKeyResolver(), // you can provide your implementation 'exchange-name' ); // or $publicationTarget = new QueuePublicationTarget( $channelFactory, 'queueName' ); $publisher = new AmqpPublisher($publicationTarget); $message = new Message('my-type', 'message_content'); $publisher->publish($message);
Message consumption
To listen for messages from AMQP and consume them:
- Implement your Consumer
use Webit\MessageBus\Consumer; use Webit\MessageBus\Message; class \MyConsumer implements Consumer { public function consume(Message $message) { // do your stuff here } }
- Build AmqpConsumer
use Webit\MessageBus\Infrastructure\Amqp\Listener\Message\MessageFactory; use Webit\MessageBus\Infrastructure\Amqp\Listener\AmqpConsumerBuilder; $builder = AmqpConsumerBuilder::create(); $builder->setConsumer(new \MyConsumer()); $builder->setLogger(new NullLogger()); // optional $builder->shouldSendFeedback(false); // if you don't want to acknowledge messages, set this to false (true by default) $builder->setMessageFactory(new SimpleMessageFactory()); // optionally set your MessageFactory $amqpConsumer = $builder->build();
- Start listening for AMQPMessages
$listener = new SimpleAmqpListener( $channelFactory, $amqpConsumer, 'queue-name' ); // start listening (continuous process) $listener->listen();
Running tests
Install dependencies with composer
docker-compose run --rm composer
Unit tests
docker-compose run --rm unit-tests
Integration tests
docker-compose run --rm integration-tests
统计信息
- 总下载量: 24
- 月度下载量: 0
- 日度下载量: 0
- 收藏数: 1
- 点击次数: 0
- 依赖项目数: 1
- 推荐数: 1
其他信息
- 授权协议: MIT
- 更新时间: 2017-12-04