gdx/p-service-bus-laravel-package

Latest stable version: 2.7.0

Composer install command:

composer require gdx/p-service-bus-laravel-package

Package description

PServiceBus

README

Laravel integration for the PServiceBus library: https://gitlab.com/GDXbsv/pservicebus

Packagist: https://packagist.org/packages/gdx/p-service-bus-laravel-package

Installation

composer require gdx/p-service-bus-laravel-package

Configuration

Add your transports to the configuration. You can create as many transports as you want, under any names. The examples below show several transport options: SNS/SQS for AWS, Kafka for event streaming, and CompositeTransport for multi-transport scenarios.

    'transports' => [
        'external' => in_array(env('APP_ENV'), ['production', 'staging',]) ? 'app.service_bus.transport.external_composite' : InMemoryTransport::class,
        'main' => in_array(env('APP_ENV'), ['production', 'staging',]) ? 'app.service_bus.transport.main' : InMemoryTransport::class,
    ],
    'transport_groups' => [
        'example_group' => ['example_name_for_transport', 'example_name_for_transport2'],
    ],

Message Filtering by Transport

When defining external transports (like SNS/SQS or Kafka), you should filter the message maps to only include messages configured for that specific transport. This is done using the getFilteredMessageNameMapIn() and getFilteredMessageNameMapOut() methods on the build storage.

Messages are associated with transports using the #[ExternalIn('transport_name')] and #[ExternalOut('transport_name')] attributes on your message classes. The filtering ensures each transport only handles messages it's configured for.

Define those transports in your application provider:

<?php declare(strict_types=1);

namespace App\Providers;

use GDXbsv\PServiceBus\Transport\Sns\SnsSqsTransport;
use GDXbsv\PServiceBus\Transport\Sqs\SqsTransport;
use GDXbsv\PServiceBus\Transport\Kafka\KafkaExternalTransport;
use GDXbsv\PServiceBus\Transport\Kafka\KafkaFactory;
use GDXbsv\PServiceBus\Transport\CompositeTransport;
use Illuminate\Contracts\Foundation\Application;
use Illuminate\Support\ServiceProvider;

class PServiceBusProvider extends ServiceProvider
{
    public function register()
    {
        // SNS/SQS Transport for external communications
        $snsDsn = env('SNS_DSN', 'sns+http://key:secret@aws/000000000000?region=eu-central-1&topic=service_bus');
        $sqsDsn = env('SQS_DSN', 'sqs+http://key:secret@aws/000000000000?region=eu-central-1');

        $this->app->singleton(SnsSqsTransport::class,
            fn(Application $app) => SnsSqsTransport::ofDsn(
                $snsDsn,
                SqsTransport::ofDsn($sqsDsn . '&retries=3&queue=editing_ext'),
                // Filter messages for 'external' transport only
                $app->make('p-service-bus.build.storage')->getFilteredMessageNameMapIn('external'),
            )
        );

        // Kafka External Transport
        $kafkaExtDsn = env('SB_KAFKA_EXT_DSN', 'kafka://localhost:9093/service_ext?partitions.amount=10&replication.factor=1');

        $this->app->singleton(KafkaExternalTransport::class,
            fn(Application $app) => KafkaFactory::createExternalFromDsn(
                $kafkaExtDsn,
                // Filter messages for 'external' transport only
                $app->make('p-service-bus.build.storage')->getFilteredMessageNameMapOut('external'),
                $app->make('p-service-bus.build.storage')->getFilteredMessageNameMapIn('external'),
            )
        );

        // Internal SQS Transport
        $this->app->singleton('app.service_bus.transport.main',
            fn() => SqsTransport::ofDsn($sqsDsn . '&retries=3&queue=editing_main')
        );

        // Composite Transport combining SNS/SQS and Kafka
        $this->app->singleton('app.service_bus.transport.external_composite',
            fn(Application $app) => new CompositeTransport([
                $app->make(SnsSqsTransport::class),
                $app->make(KafkaExternalTransport::class),
            ])
        );
    }
}

Environment Configuration Example

For production with Kafka using AWS IAM authentication:

SNS_DSN=sns+http://key:secret@aws/000000000000?region=eu-central-1&topic=service_bus
SQS_DSN=sqs+http://key:secret@aws/000000000000?region=eu-central-1
SB_KAFKA_EXT_DSN=kafka+ssl://BROKERS_IAM/photo-service_ext?partitions.amount=10&replication.factor=3&ssl.ca.location=/etc/ssl/certs/ca-certificates.crt&aws.iam.auth=true&aws.region=eu-central-1

For local development:

SNS_DSN=sns+http://key:secret@localhost/000000000000?region=eu-central-1&topic=service_bus
SQS_DSN=sqs+http://key:secret@localhost:9324?region=eu-central-1
SB_KAFKA_EXT_DSN=kafka://kafka:9093/photo-service_ext?partitions.amount=10&replication.factor=1

Saga Eloquent

Create a saga:

The main difference is that you have to extend GDXbsv\PServiceBusLaravel\Saga\SagaEloquent and implement getEloquentModelClass.

<?php declare(strict_types=1);

namespace App\Saga;

use GDXbsv\PServiceBus\Bus\Handling\Handle;
use GDXbsv\PServiceBus\Id;
use GDXbsv\PServiceBus\Message\TimeSpan;
use GDXbsv\PServiceBus\Saga\MessageSagaContext;
use GDXbsv\PServiceBus\Saga\SagaContext;
use GDXbsv\PServiceBus\Saga\SagaCreateMapper;
use GDXbsv\PServiceBus\Saga\SagaPropertyMapper;
use GDXbsv\PServiceBusLaravel\Saga\SagaEloquent;


/**
 * @final
 */
final class TestSaga extends SagaEloquent
{
    private Id $id;
    public string $string;
    public ?string $value = null;


    public static function getEloquentModelClass(): string
    {
        return TestSagaModel::class;
    }

    /**
     * @param Id<static> $id
     */
    private function __construct(Id $id, string $string)
    {
        $this->id = $id;
        $this->string = $string;
    }

    public static function configureHowToCreateSaga(SagaCreateMapper $mapper): void
    {
        $mapper
            ->toMessage(
                // Do not forget to add a handler method for this message as well: it covers the case where the saga already exists and tells the saga to expect this message.
                function (TestSagaCreateCommand $command, MessageSagaContext $context) {
                    return new self(new Id($command->id), $command->string);
                }
            );
    }

    public static function configureHowToFindSaga(SagaPropertyMapper $mapper): void
    {
        $mapper
            ->mapSaga(new \ReflectionProperty(TestSaga::class, 'id'))
            ->toMessage(
                function (TestSagaCommand $command, MessageSagaContext $context) {
                    return new Id($command->id);
                }
            );
    }

    /** Tells the saga that it expects this message. */
    #[Handle('main', 3)]
    public function testSagaCreateCommand(TestSagaCreateCommand $command, SagaContext $context)
    {
        $this->string = $command->string;
        $context->publish(new TestsSagaOutputEvent('testHandlerFunction'));
    }
}

Then create the Eloquent model returned by getEloquentModelClass:

<?php declare(strict_types=1);

namespace App\Saga;

use Illuminate\Database\Eloquent\Model;

class TestSagaModel extends Model
{
    protected $table = 'saga';
    protected $primaryKey = 'id';
    public $incrementing = false;
    protected $guarded = [];
}

After this, use the saga as usual.

Commands for Laravel

p-service-bus:cache:clear clears the cache and scans for attributes again. Needed only when debug=false.

p-service-bus:saga:eloquent:outbox:recover-messages sends messages left in the outbox if something went wrong. Run it periodically.

p-service-bus:saga:eloquent:only-once:clean {days=30} cleans up old messages from the only-once control. Run it once a day.
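The two maintenance commands can be run from Laravel's task scheduler. The following is a minimal sketch, assuming a Laravel version that still defines the schedule in app/Console/Kernel.php (newer versions register it in routes/console.php instead). The five-minute interval and the withoutOverlapping() guard are illustrative choices, not recommendations from the package.

```php
<?php declare(strict_types=1);

namespace App\Console;

use Illuminate\Console\Scheduling\Schedule;
use Illuminate\Foundation\Console\Kernel as ConsoleKernel;

class Kernel extends ConsoleKernel
{
    protected function schedule(Schedule $schedule): void
    {
        // Periodically send messages left in the outbox.
        // The interval is an assumption; pick one that suits your workload.
        $schedule->command('p-service-bus:saga:eloquent:outbox:recover-messages')
            ->everyFiveMinutes()
            ->withoutOverlapping();

        // Once a day, clean up only-once records older than 30 days
        // (30 is also the command's default for the days argument).
        $schedule->command('p-service-bus:saga:eloquent:only-once:clean 30')
            ->daily();
    }
}
```

The scheduler itself still needs the usual cron entry that runs php artisan schedule:run every minute.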

Other information

  • License: MIT
  • Last updated: 2023-06-20
