博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
部分转 php kafka
阅读量:6847 次
发布时间:2019-06-26

本文共 4263 字,大约阅读时间需要 14 分钟。

 

Step 1: 下载Kafka

(官网地址:)

 

Kafka入门经典教程

 

 

php kafka:

php kafka 例子:

 

二、环境搭建

Step 1: 下载Kafka

(官网地址:)

点击下载最新的版本并解压.

  1. > tar -xzf kafka_2.9.2-0.8.1.1.tgz
  2. > cd kafka_2.9.2-0.8.1.1
复制代码

Step 2: 启动服务
Kafka用到了Zookeeper,所有首先启动Zookper,下面简单的启用一个单实例的Zookkeeper服务。可以在命令的结尾加个&符号,这样就可以启动后离开控制台。

  1. > bin/zookeeper-server-start.sh config/zookeeper.properties &
  2. [2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
  3. ...
复制代码

现在启动Kafka:

  1. > bin/kafka-server-start.sh config/server.properties
  2. [2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)
  3. [2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
  4. ...
复制代码

Step 3: 创建 topic
创建一个叫做“test”的topic,它只有一个分区,一个副本。

  1. > bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
复制代码

可以通过list命令查看创建的topic:

  1. > bin/kafka-topics.sh --list --zookeeper localhost:2181
  2. test
复制代码

除了手动创建topic,还可以配置broker让它自动创建topic.
Step 4:发送消息.
Kafka 使用一个简单的命令行producer,从文件中或者从标准输入中读取消息并发送到服务端。默认的每条命令将发送一条消息。
运行producer并在控制台中输一些消息,这些消息将被发送到服务端:

  1. > bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test 
  2. This is a messageThis is another message
复制代码

ctrl+c可以退出发送。
Step 5: 启动consumer
Kafka also has a command line consumer that will dump out messages to standard output.
Kafka也有一个命令行consumer可以读取消息并输出到标准输出:

  1. > bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
  2. This is a message
  3. This is another message
复制代码

你在一个终端中运行consumer命令行,另一个终端中运行producer命令行,就可以在一个终端输入消息,另一个终端读取消息。
这两个命令都有自己的可选参数,可以在运行的时候不加任何参数可以看到帮助信息。

 

php kafka:

 

1、安装librdkafka

cd /usr/local/src #进入安装包存放目录

wget https://github.com/edenhill/librdkafka/archive/master.zip #下载

mv master.zip librdkafka-master.zip #修改包名

unzip librdkafka-master.zip #解压

cd librdkafka-master #进入安装文件夹

./configure #配置

make #编译

make install #安装

2、安装phpkafka

cd /usr/local/src #进入安装包存放目录

wget https://github.com/EVODelavega/phpkafka/archive/master.zip #下载

mv master.zip phpkafka-master.zip #修改包名

unzip phpkafka-master.zip #解压

cd phpkafka-master #进入安装文件夹

/usr/local/php/bin/phpize #加载php扩展模块

./configure --enable-kafka --with-php-config=/usr/local/php/bin/php-config #配置

make #编译

make install #安装

3、修改php配置文件

vi /usr/local/php/etc/php.ini #打开php配置文件,在最后一行添加下面的代码

extension="kafka.so"

:wq! #保存退出

 

 

Requirements:

Download and install . Run sudo ldconfig to update shared libraries.

Installing PHP extension:

 

1 phpize2 ./configure --enable-kafka3 make4 sudo make install5 sudo sh -c 'echo "extension=kafka.so" >> /etc/php5/conf.d/kafka.ini'6 #For CLI mode:7 sudo sh -c 'echo "extension=kafka.so" >> /etc/php5/cli/conf.d/20-kafka.ini'

 

Examples:

1 // Produce a message 2 $kafka = new Kafka("localhost:9092"); 3 $kafka->produce("topic_name", "message content"); 4 //get all the available partitions 5 $partitions = $kafka->getPartitionsForTopic('topic_name'); 6 //use it to OPTIONALLY specify a partition to consume from 7 //if not, consuming IS slower. To set the partition: 8 $kafka->setPartition($partitions[0]);//set to first partition 9 //then consume, for example, starting with the first offset, consume 20 messages10 $msg = $kafka->consume("topic_name", Kafka::OFFSET_BEGIN, 20);11 var_dump($msg);//dumps array of messages

 

php kafka 例子:

 

1 
$host,16 "client_ip" => $client_ip,17 "method" => $method,18 "product" => $product,19 "ua" => $ua,20 "product" => $product,21 );22 $json = json_encode($message);23 print_r($json);24 echo "\n";25 return $json;26 }27 while(1) {28 $json = create_message();29 $kafka->produce("test", $json);30 //get all the available partitions31 $partitions = $kafka->getPartitionsForTopic('test');32 //use it to OPTIONALLY specify a partition to consume from33 //if not, consuming IS slower. To set the partition:34 $kafka->setPartition($partitions[0]);//set to first partition35 //then consume, for example, starting with the first offset, consume 20 messages36 $msg = $kafka->consume("test", Kafka::OFFSET_BEGIN, 20);37 usleep(300);38 }39 //var_dump($msg);//dumps array of messages40 ?>

 

转载于:https://www.cnblogs.com/njczy2010/p/5711729.html

你可能感兴趣的文章
Docker存储驱动之AUFS简介
查看>>
Java中如何封装自己的类,建立并使用自己的类库?
查看>>
Java Http请求工具类
查看>>
iscsi集群搭建
查看>>
Spring IOC FactoryBean检测与获取Bean
查看>>
Server certificate verification failed: certificate issued for a different hostn
查看>>
Flutter Web - 目标全平台开发的Flutter再下一城!
查看>>
Nginx代理Tomcat
查看>>
Apache与Tomcat的区别
查看>>
mysql—Access denied for user 'root'@'localhost' (using password:NO)
查看>>
hibernate 懒加载异常
查看>>
python3的zip函数
查看>>
cp命令
查看>>
Paxos算法详细图解
查看>>
如何用Exchange Server 2003 构建多域名邮件系统
查看>>
configparser.ConfigParser.writer()
查看>>
httpd服务如何开机启动
查看>>
JAVA帮助文档全系列 JDK1.5 JDK1.6 JDK1.7 官方中英完整版下载
查看>>
Linux学习笔记之bash
查看>>
android 1.6中LinearLayout getBaseline函数的一个bug
查看>>