承接 daalvand/kafka 相关项目开发

从需求分析到上线部署,全程专人跟进,保证项目质量与交付效率

邮箱:yvsm@zunyunkeji.com | QQ:316430983 | 微信:yvsm316

daalvand/kafka

最新稳定版本:1.0.0

Composer 安装命令:

composer require daalvand/kafka

包简介

This package is for produce and consume from kafka

README 文档

README

  • This package is for kafka consume and produce in laravel

installation

Install Kafka

Apache Kafka is need for many sections of our ecosystem

  1. install librdkafka from The Apache Kafka C/C++ client library

    for the ubuntu: apt install librdkafka-dev.

    for the centos: yum install librdkafka-devel

  2. Then build php extension from Manually Installing the extension

    git clone https://github.com/arnaud-lb/php-rdkafka.git
    cd php-rdkafka
    phpize
    ./configure
    make all -j 5
    sudo make install
  3. Then extension to php.ini

    extension=rdkafka.so

  4. Then restart php-fpm service service php-fpm restart

install package

  1. run composer require daalvand/kafka

2 . publish provider:

Laravel

  • php artisan vendor:publish --provider="Daalvand\Kafka\KafkaServiceProvider"

Lumen

  • Add the service provider to bootstrap/app.php file:
<?php
 $app->register(Daalvand\Kafka\KafkaServiceProvider::class);
  • Copy the config file from /vendor/daalvand/kafka/src/config to /config directory. Then configure it in /bootstrap/app.php file:
<?php

$app->configure("kafka");

Usage

Producer

<?php
use Daalvand\Kafka\Message\ProducerMessage;
use Daalvand\Kafka\Facades\Producer;

$producer = Producer::withAdditionalBroker('localhost:9092')
    ->build();

$message = (new ProducerMessage('topic-name', 0))
            ->withKey('test-key')
            ->withBody('some test message payload')
            ->withHeaders(['header' => 'value']);

$producer->produce($message);
$producer->flush(-1);

Consumer

<?php

use Daalvand\Kafka\Facades\Consumer;
use Daalvand\Kafka\Exceptions\ConsumerConsumeException;
use Daalvand\Kafka\Exceptions\ConsumerEndOfPartitionException;
use Daalvand\Kafka\Exceptions\ConsumerTimeoutException;

$consumer = Consumer::withAdditionalConfig([
            'compression.codec'       => 'lz4',
            'auto.commit.interval.ms' => 500
    ])
    ->withAdditionalBroker('kafka:9092')
    ->withConsumerGroup('testGroup')
    ->withAdditionalSubscription('test-topic')
    ->build();

$consumer->subscribe();

while (true) {
    try {
        $message = $consumer->consume();
        // your business logic
        $consumer->commit($message);
    } catch (ConsumerTimeoutException $e) {
        //no messages were read in a given time
    } catch (ConsumerEndOfPartitionException $e) {
        //only occurs if enable.partition.eof is true (default: false)
    } catch (ConsumerConsumeException $e) {
        // Failed
    }
}
  • auto.offset.reset option is (largest, smallest) valid values for kafka by older version than 0.9 and for after .9 is (earliest, latest)

统计信息

  • 总下载量: 3.99k
  • 月度下载量: 0
  • 日度下载量: 0
  • 收藏数: 0
  • 点击次数: 0
  • 依赖项目数: 2
  • 推荐数: 0

GitHub 信息

  • Stars: 0
  • Watchers: 1
  • Forks: 0
  • 开发语言: PHP

其他信息

  • 授权协议: MIT
  • 更新时间: 2021-09-24

承接程序开发

PHP开发

VUE

Vue开发

前端开发

小程序开发

公众号开发

系统定制

数据库设计

云部署

网站建设

安全加固