webit/message-bus-amqp

Composer install command:

composer require webit/message-bus-amqp

README

AMQP protocol infrastructure for Message Bus

Installation

composer require webit/message-bus-amqp=^1.0.0

Usage

Connection Pool

Use ConnectionPoolBuilder to create a connection pool:

use Webit\MessageBus\Infrastructure\Amqp\Connection\Pool\ConnectionPoolBuilder;
use Webit\MessageBus\Infrastructure\Amqp\Connection\ConnectionParams;


$builder = ConnectionPoolBuilder::create();

// optionally set connection factory (LazyConnectionFactory used by default)
$builder->setConnectionFactory(
    new \Webit\MessageBus\Infrastructure\Amqp\Connection\InstantConnectionFactory()
);

// optionally add logger (use a smarter one in real life)
$logger = new \Psr\Log\NullLogger();
$builder->setLogger($logger);

// register at least one connection
$builder->registerConnection(
    new ConnectionParams(
        'rabbitmq.host',
        '5672', // port
        'my-username',
        'my-password'
    ),
    'connection-1'
);

$connectionPool = $builder->build();
 

ConnectionPool gives you the current connection. If something is wrong with the current connection, you can dispose of it and ask the pool for the next one (if any are left).

try {
    $connection = $connectionPool->current();
    $channel = $connection->getChannel();
} catch (\Exception $e) {
    $connectionPool->disposeCurrent();
    $connection = $connectionPool->current();
    $channel = $connection->getChannel(); // retry on the next connection
}
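The dispose-and-retry pattern above can be generalized into a small failover loop. The following is a minimal sketch, not the library's own code: `StubConnection`, `StubPool`, and `openChannelWithFailover()` are hypothetical names, the stubs only mimic the `current()`, `disposeCurrent()`, and `getChannel()` calls shown above, and the assumption that an exhausted pool throws is ours (the real pool may behave differently).

```php
<?php
// Stand-ins so the sketch runs without the library. They only mimic the
// methods used in the README example; they are NOT the real classes.
final class StubConnection
{
    private $name;
    private $healthy;

    public function __construct(string $name, bool $healthy)
    {
        $this->name = $name;
        $this->healthy = $healthy;
    }

    public function getChannel(): string
    {
        if (!$this->healthy) {
            throw new \RuntimeException("Cannot open channel on {$this->name}");
        }
        return "channel@{$this->name}"; // the real method returns a channel object
    }
}

final class StubPool
{
    private $connections;

    public function __construct(array $connections)
    {
        $this->connections = array_values($connections);
    }

    public function current(): StubConnection
    {
        // Assumption: an exhausted pool throws. Check the real pool's behaviour.
        if (!$this->connections) {
            throw new \UnderflowException('No connections left in the pool');
        }
        return $this->connections[0];
    }

    public function disposeCurrent(): void
    {
        array_shift($this->connections);
    }
}

/**
 * Tries the current connection; on failure disposes it and moves to the next,
 * giving up after $maxAttempts connections or when the pool is exhausted.
 */
function openChannelWithFailover($pool, int $maxAttempts)
{
    $lastError = null;
    for ($attempt = 0; $attempt < $maxAttempts; $attempt++) {
        try {
            return $pool->current()->getChannel();
        } catch (\UnderflowException $e) {
            throw $e; // pool exhausted: nothing left to try
        } catch (\RuntimeException $e) {
            $lastError = $e;
            $pool->disposeCurrent(); // drop the broken connection, try the next
        }
    }
    throw new \RuntimeException('All attempts failed', 0, $lastError);
}

$pool = new StubPool([
    new StubConnection('connection-1', false), // simulated broken connection
    new StubConnection('connection-2', true),
]);
$channel = openChannelWithFailover($pool, 3);
```

Bounding the number of attempts keeps a misconfigured pool from looping indefinitely; with the real pool you would pass `$connectionPool` and catch whatever exception types it actually raises.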

Publisher integration

To publish a Message via AMQP, use AmqpPublisher:

use Webit\MessageBus\Infrastructure\Amqp\Connection\Channel\NewChannelConnectionAwareChannelFactory;
use Webit\MessageBus\Infrastructure\Amqp\Publisher\ExchangePublicationTarget;
use Webit\MessageBus\Infrastructure\Amqp\Publisher\QueuePublicationTarget;
use Webit\MessageBus\Infrastructure\Amqp\Publisher\AmqpPublisher;
use Webit\MessageBus\Infrastructure\Amqp\Publisher\Routing\FromMessageTypeRoutingKeyResolver;
use Webit\MessageBus\Message;

$channelFactory = new NewChannelConnectionAwareChannelFactory($connectionPool);

$publicationTarget = new ExchangePublicationTarget(
    $channelFactory,
    new FromMessageTypeRoutingKeyResolver(), // you can provide your implementation
    'exchange-name'
);

// or

$publicationTarget = new QueuePublicationTarget(
    $channelFactory,
    'queueName'
);

$publisher = new AmqpPublisher($publicationTarget);

$message = new Message('my-type', 'message_content');
$publisher->publish($message);

Message consumption

To listen for messages from AMQP and consume them:

  1. Implement your Consumer
use Webit\MessageBus\Consumer;
use Webit\MessageBus\Message;

class MyConsumer implements Consumer
{
    public function consume(Message $message)
    {
        // do your stuff here
    }
}
  2. Build AmqpConsumer
use Webit\MessageBus\Infrastructure\Amqp\Listener\Message\MessageFactory;
use Webit\MessageBus\Infrastructure\Amqp\Listener\AmqpConsumerBuilder;

$builder = AmqpConsumerBuilder::create();
$builder->setConsumer(new \MyConsumer());
$builder->setLogger(new \Psr\Log\NullLogger()); // optional
$builder->shouldSendFeedback(false); // if you don't want to acknowledge messages, set this to false (true by default)
$builder->setMessageFactory(new SimpleMessageFactory()); // optionally set your MessageFactory

$amqpConsumer = $builder->build();
  3. Start listening for AMQP messages
$listener = new SimpleAmqpListener(
  $channelFactory,
  $amqpConsumer,
  'queue-name'
);

// start listening (continuous process)
$listener->listen();
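As one way to fill in the "do your stuff here" part of step 1, a consumer can dispatch on the message type. This is a minimal sketch under stated assumptions: the `Message` class and `Consumer` interface below are local stand-ins for `Webit\MessageBus\Message` and `Webit\MessageBus\Consumer` so the example runs on its own, the `type()` and `content()` accessors are assumed rather than confirmed by this README, and `TypeRoutingConsumer` is a hypothetical name.

```php
<?php
// Stand-in for Webit\MessageBus\Message. The constructor matches the README
// usage; the type()/content() accessors are an assumption.
final class Message
{
    private $type;
    private $content;

    public function __construct(string $type, string $content)
    {
        $this->type = $type;
        $this->content = $content;
    }

    public function type(): string
    {
        return $this->type;
    }

    public function content(): string
    {
        return $this->content;
    }
}

// Stand-in for Webit\MessageBus\Consumer, with the signature from step 1.
interface Consumer
{
    public function consume(Message $message);
}

// Hypothetical consumer that routes each message to a handler by its type.
final class TypeRoutingConsumer implements Consumer
{
    private $handlers = [];

    public function on(string $type, callable $handler): void
    {
        $this->handlers[$type] = $handler;
    }

    public function consume(Message $message)
    {
        $type = $message->type();
        if (!isset($this->handlers[$type])) {
            // Failing loudly makes unrouted types visible during development.
            throw new \InvalidArgumentException("No handler for message type: $type");
        }
        ($this->handlers[$type])($message->content());
    }
}

$received = [];
$consumer = new TypeRoutingConsumer();
$consumer->on('my-type', function (string $content) use (&$received) {
    $received[] = $content; // real code would decode and process the payload
});
$consumer->consume(new Message('my-type', 'message_content'));
```

How an exception thrown from `consume()` affects acknowledgement is not described in this README, so check the library's behaviour before relying on throwing for unknown types.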

Running tests

Install dependencies with composer

docker-compose run --rm composer

Unit tests

docker-compose run --rm unit-tests

Integration tests

docker-compose run --rm integration-tests

Statistics

  • Total downloads: 24
  • Monthly downloads: 0
  • Daily downloads: 0
  • Favorites: 1
  • Views: 0
  • Dependents: 1
  • Suggesters: 1

GitHub

  • Stars: 1
  • Watchers: 2
  • Forks: 0
  • Language: PHP

Other information

  • License: MIT
  • Last updated: 2017-12-04
