LEN

PHP Kafka 初体验
kafka 一个陌生的东西! 简单百度一下:  Kafka是一种高吞吐量的分布式发布订阅消息...
扫描右侧二维码阅读全文
16
2017/08

PHP Kafka 初体验

kafka 一个陌生的东西! 简单百度一下:

  Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据.

介绍下我使用的场景. 公司流量日益增长, 原有打点统计已经不足以承受, 当前的流量.
原有打点服务使用redis队列收集数据. 每天上亿的打点数据决定将这部分打点数据交由大数据工程师处理.

我们还是将打点数据打入 codis集群队列. 使用php脚本RPOP 简单数据验证后打入kafka.

开发流程:

1. 找轮子 到composer 找到 [nmred/kafka-php][1] 下载量最高还有中文文档和qq群(缺点也很明显, 需要依赖Psr及Amp类库!! 我框架中有日志类!!!)

2. 百度 kafka 到底是个什么鬼. 看看使用场景及如何调用安装. (理解为消息队列)

3. 查看 类库 demo 生产者部分代码及对照手册使用 坑很多呀, 生产者同步异步在不同tag包中均有问题, 我选择的是同步回调, 代码使用CGI运行. 

4. 封装一个生产者Module类, CGI运行方式必须使用单例模式(今天又踩坑了).

分享下我写的Module 类 :

<?php
/**
 * Created by PhpStorm.
 * User: Len
 * Date: 2017/8/14
 * Time: 20:22
 * 生产者 Kafka
 */

namespace DB;

use Kafka\ProducerConfig;
use Kafka\Producer;

class KafkaModule
{
    public $topic;

    private $last_result;

    /**
     * @var Producer
     */
    static $producer = null;

    const PRODUCER = 1;

    /**
     * KafkaModule constructor.
     * @param $config_info
     * @param int $mod
     */
    private function __construct($config_info, $mod = self::PRODUCER)
    {
        if (is_null(self::$producer)) {
            include_once(VENDOR_DIR . 'Amp/functions.php'); // autoload 不支持
            switch ($mod) {
                case self::PRODUCER:
                    $this->producerConfig($config_info);
                    self::$producer = new Producer();
                    break;
                default:
                    throw new \Exception('No such operation', 100);
            }
        }
    }

    static $connet_pool = [];

    /**
     * KafkaModule constructor.
     * @param $config_info
     * @param int $mod
     * @throws \Exception
     */
    public static function instanceKafka($config_info, $mod = self::PRODUCER)
    {
        $tag = md5(json_encode($config_info));
        if (!isset(self::$connet_pool[$tag])) {
            self::$connet_pool[$tag] = new self($config_info, $mod);
        }

        return self::$connet_pool[$tag];
    }

    /**
     * 构建生产者配置
     * @param $config_info
     * @return \Kafka\Consumer
     */
    public function producerConfig($config_info)
    {
        if (empty($config_info['addr'])) {
            throw new \Exception('addr is null');
        }
        $config = ProducerConfig::getInstance();
        $config->setMetadataBrokerList($config_info['addr']);
        !empty($config_info['ms']) && $config->setMetadataRefreshIntervalMs($config_info['ms']);
        !empty($config_info['ack']) && $config->setRequiredAck($config_info['ack']);
        !empty($config_info['is_asyn']) && $config->setIsAsyn($config_info['is_asyn']);
        !empty($config_info['version']) && $config->setBrokerVersion($config_info['version']);
        !empty($config_info['interval']) && $config->setProduceInterval($config_info['interval']);
        !empty($config_info['topic']) && $this->topic = $config_info['topic'];

        return $config;
    }

    /**
     * 同步方式
     * @param $data
     * @param null $topic
     * @return bool|void
     * @internal param $msg
     * @internal param string $key
     * @internal param $data
     */
    public function send($data, $topic = null)
    {
        if ($this->topic) {
            $topic = $this->topic;
        }
        if (is_array($data)) {
            $send_array = array_map(function ($value) use ($topic) {
                return array(
                    'topic' => $topic,
                    'value' => $value
                );
            }, $data);
        } else if (is_string($data)) {
            $send_array = array([
                'topic' => $topic,
                'value' => $data
            ]);
        } else {
            return $this->last_result = false;
        }
        $this->last_result = self::$producer->send($send_array);

        return $this->last_result;
    }

    /**
     * 返回最近一次send 是否成功
     * @return bool
     */
    public function isTrue()
    {
        $bool = true;
        if (empty($this->last_result)) {
            return false;
        }
        array_map(function ($result) use (&$bool) {
            if ($bool && (isset($result['data']['partitions']['errorCode']) || $result['data']['partitions']['errorCode'] != 0)) {
                $bool = false;
            }
        }, $this->last_result);

        return $bool;
    }

    /**
     * @param mixed $topic
     */
    public function setTopic($topic)
    {
        $this->topic = $topic;
    }
}

Module 现在仅能使用producer 消费者昨天没有时间封装. 代码日后补齐(没准就忘了)

今天踩得坑很多. Module没单例! composer包版本! 类库依赖! autoload不兼容! ...

后续说一下:

今天同事告诉我运行2天的1/4量脚本没问题, 但是今天放全量有一个脚本队列堵了70多W... 
启了8个脚本 什么都没干pop出来数据就send 但队列还是堵了. 考虑可能是kafka性能问题. 问了群主 **linux@/ty**

我: 您好 kafka 类库 支持长连接吗? 不考虑网络原因 多久会断?
linux@/ty: 生产还是消费
我: 生产
linux@/ty: 生产同步模式是短连接 异步模式是长连接

我用的是脚本启动 使用同步模式在但想要使用的长连接;

 我: new \Kafka\Producer 每次实例化都相当于 建立一次连接吗?
    linux@/ty: 分开用  同步和异步的参数稍微有些区别
    我: 尴尬了 ,  那我想在守护进程的脚本中 仅建立一次连接 使用异步方法每次实例化 都会重新建立连接
 异步实例化后不能重用对吗?
    linux@/ty: 实例化一次就可以了, 为啥要重复实例化了
    我: ......(不要意思写了 , 我还是个菜鸟对于回调函数的使用还是个彩笔)
    linux@/ty: 这块可以通过闭包扔数据进去, 说白了就是一个回调函数, 你要明白实例化的时候传入的是一个回调函数,不是写入的数据, 具体的回调函数怎么实现你自行组织代码就行
    我: 噢噢噢哦哦 明白了谢谢......

真心感谢, linux@/ty群主 又问必答再次感谢.

学到一些东西, 我真的基础很差........ 后续更新下代码.

Last modification:August 21st, 2017 at 06:53 pm
If you think my article is useful to you, please feel free to appreciate

Leave a Comment

One comment

  1. LEN

    think you!