Kafka在php中的使用----生产者与消费者

安装扩展

安装教程 kafka和php的rdkafka扩展教程网上有很多,大家可以自行查询,例如:Kafka-php-使用 PHP 编写的 Kafka 客户端

Kafka文档推荐

不清楚里面的api的可以在文档中查询 kafka中文文档

composer 依赖

创建 composer.json填写内容

{
  "require": {
        "nmred/kafka-php": "v0.2.0.8"
  }
}

异步调用生产者

<?php
require_once __DIR__ . '/Vendor/autoload.php';

use KafkaProducerConfig;
use KafkaProducer;

//异步方式调用
$config = ProducerConfig::getInstance();
//设置数据刷新时间(毫秒)
$config->setMetadataRefreshIntervalMs(10);
//地址
$config->setMetadataBrokerList('localhost:9092');
//设置代理版本
$config->setBrokerVersion('0.9.0.1');
//开启消息确认
$config->setRequiredAck(1);
$config->setIsAsyn(false);
//设置生产间隔
$config->setProduceInterval(500);
//生产者
$producer = new Producer(function () {
    return array([
        'topic' => 'test',//主题
        'value' => 'test message',
        'key' => 'testKey',//key
    ]);
});
$producer->success(function ($result) {
    echo '投递成功' . json_encode($result, 256) . PHP_EOL;
});
$producer->error(function ($result) {
    echo '投递失败' . json_encode($result, 256) . PHP_EOL;
});
$producer->send(true);

同步调用生产者

<?php
require_once __DIR__ . '/Vendor/autoload.php';

use KafkaProducerConfig;
use KafkaProducer;

$config = ProducerConfig::getInstance();
//这是元组数据刷新间隔毫秒
$config->setMetadataRefreshIntervalMs(10);
//代理地址
$config->setMetadataBrokerList('localhost:9092');
//设置代理版本
$config->setBrokerVersion('0.9.0.1');
//开启消息确认
$config->setRequiredAck(1);
$config->setIsAsyn(false);
//生产间隔
$config->setProduceInterval(10);
$producer = new Producer();
for ($i = 0; $i < 100; $i++) {
    $result = $producer->send(array(
        [
            'topic' => 'test',//主题
            'value' => 'test message',
            'key' => '',//key
        ]
    ));
    echo '投递成功' . json_encode($result, 256) . PHP_EOL;
}

消费者

<?php
require_once __DIR__ . '/Vendor/autoload.php';
use KafkaConsumerConfig;
use KafkaConsumer;
$config = ConsumerConfig::getInstance();
$config->setMetadataRefreshIntervalMs(10);
$config->setMetadataBrokerList('localhost:9092');
//设置分组分区
$config->setGroupId('test');
$config->setBrokerVersion('0.9.0.1');
$config->setTopics(['test']);
//设置偏移量
$config->setOffsetReset('earliest');
$consumer = new Consumer();
$consumer->start(function ($topic, $part, $message) {
    var_dump($message);
});

zhaohao

大家好,欢迎来到赵豪博客!赵豪,94年生人,PHP程序员一枚,因为对PHP开发有着相对比较浓厚的兴趣,所以现在从事着PHP程序员的工作。 今天再次开通这个博客,这里将记录我的职业生涯的点点滴滴,感谢来访与关注!如果我的博客能给您带来一些帮助那真是一件非常荣幸的事情~

相关推荐

发表评论

您的电子邮箱地址不会被公开。 必填项已用*标注

微信扫一扫

微信扫一扫

微信扫一扫,分享到朋友圈

Kafka在php中的使用----生产者与消费者
返回顶部

显示

忘记密码?

显示

显示

获取验证码

Close