php-rdkafka-class

rdkafka的下php-rdkafka的php类库

目录

安装 使用 消费 生产 更多配置

安装

具体查看librdkafk和php-rdkafka

使用

消费

# setTopic('qkl01', 0, $offset) 不设置,从最后一次服务器记录一次消费开始消费 $offset = 86; //开始消费点 $consumer = new VendorsQueueMsgKafkaConsumer(['ip'=>'192.168.216.122']); $consumer->setConsumerGroup('test-110-sxx1') ->setBrokerServer('192.168.216.122') ->setConsumerTopic() ->setTopic('qkl01', 0, $offset) ->subscribe(['qkl01']) ->consumer(function($msg){ var_dump($msg); });

生产

$config = [ 'ip'=>'192.168.216.122', 'dr_msg_cb' => function($kafka, $message) { var_dump((array)$message); //todo //do biz something, don't exit() or die() } ]; $producer = new VendorsQueueMsgKafkaProducer($config); $rst = $producer->setBrokerServer() ->setProducerTopic('qkl01') ->producer('qkl037', 90); var_dump($rst);

更多配置

$defaultConfig = [ 'ip'=>'127.0.0.1', #默认服务器地址 'log_path'=> sys_get_temp_dir(), #日志默认地址 'dr_msg_cb' => [$this, 'defaultDrMsg'], #生产的dr回调 'error_cb' => [$this, 'defaultErrorCb'], #错误回调 'rebalance_cb' => [$this, 'defaultRebalance'] #负载回调,你可以用匿名方法自定义 ]; # broker相关配置,你可以参考Configuration.md $brokerConfig = [ 'request.required.acks'=> -1, 'auto.commit.enable'=> 1, 'auto.commit.interval.ms'=> 100, 'offset.store.method'=> 'broker', 'offset.store.path'=> sys_get_temp_dir(), 'auto.offset.reset'=> 'smallest', ];

defaultDrMsg

function defaultDrMsg($kafka, $message) { file_put_contents($this->config['log_path'] . "/dr_cb.log", var_export($message, true).PHP_EOL, FILE_APPEND); }

defaultErrorCb

function defaultErrorCb($kafka, $err, $reason) { file_put_contents($this->config['log_path'] . "/err_cb.log", sprintf("Kafka error: %s (reason: %s)", rd_kafka_err2str($err), $reason).PHP_EOL, FILE_APPEND); }

defaultRebalance

function defaultRebalance(RdKafkaKafkaConsumer $kafka, $err, array $partitions = null) { switch ($err) { case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS: echo "Assign: "; if (is_null($this->getCurrentTopic())) { $kafka->assign(); } else { $kafka->assign([ new RdKafkaTopicPartition( $this->getCurrentTopic(), $this->getPartition($this->getCurrentTopic()), $this->getOffset($this->getCurrentTopic()) ) ]); } break; case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS: echo "Revoke: "; var_dump($partitions); $kafka->assign(NULL); break; default: throw new Exception($err); } }

版权声明:

1、该文章(资料)来源于互联网公开信息,我方只是对该内容做点评,所分享的下载地址为原作者公开地址。
2、网站不提供资料下载,如需下载请到原作者页面进行下载。
3、本站所有内容均由合作方或网友上传,本站不对文档的完整性、权威性及其观点立场正确性做任何保证或承诺!文档内容仅供研究参考学习用!
4、如文档内容存在违规,或者侵犯商业秘密、侵犯著作权等,请点击“违规举报”。