Step 1: 下载Kafka
(官网地址:)
Kafka入门经典教程
php kafka:
php kafka 例子:
二、环境搭建
Step 1: 下载Kafka(官网地址:)
点击下载最新的版本并解压.- > tar -xzf kafka_2.9.2-0.8.1.1.tgz
- > cd kafka_2.9.2-0.8.1.1
Step 2: 启动服务Kafka用到了Zookeeper,所有首先启动Zookper,下面简单的启用一个单实例的Zookkeeper服务。可以在命令的结尾加个&符号,这样就可以启动后离开控制台。
- > bin/zookeeper-server-start.sh config/zookeeper.properties &
- [2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
- ...
现在启动Kafka:
- > bin/kafka-server-start.sh config/server.properties
- [2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)
- [2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
- ...
Step 3: 创建 topic创建一个叫做“test”的topic,它只有一个分区,一个副本。
- > bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
可以通过list命令查看创建的topic:
- > bin/kafka-topics.sh --list --zookeeper localhost:2181
- test
除了手动创建topic,还可以配置broker让它自动创建topic.Step 4:发送消息.Kafka 使用一个简单的命令行producer,从文件中或者从标准输入中读取消息并发送到服务端。默认的每条命令将发送一条消息。运行producer并在控制台中输一些消息,这些消息将被发送到服务端:
- > bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
- This is a messageThis is another message
ctrl+c可以退出发送。Step 5: 启动consumerKafka also has a command line consumer that will dump out messages to standard output.Kafka也有一个命令行consumer可以读取消息并输出到标准输出:
- > bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
- This is a message
- This is another message
你在一个终端中运行consumer命令行,另一个终端中运行producer命令行,就可以在一个终端输入消息,另一个终端读取消息。这两个命令都有自己的可选参数,可以在运行的时候不加任何参数可以看到帮助信息。
php kafka:
1、安装librdkafka
cd /usr/local/src #进入安装包存放目录
wget https://github.com/edenhill/librdkafka/archive/master.zip #下载
mv master.zip librdkafka-master.zip #修改包名
unzip librdkafka-master.zip #解压
cd librdkafka-master #进入安装文件夹
./configure #配置
make #编译
make install #安装
2、安装phpkafka
cd /usr/local/src #进入安装包存放目录
wget https://github.com/EVODelavega/phpkafka/archive/master.zip #下载
mv master.zip phpkafka-master.zip #修改包名
unzip phpkafka-master.zip #解压
cd phpkafka-master #进入安装文件夹
/usr/local/php/bin/phpize #加载php扩展模块
./configure --enable-kafka --with-php-config=/usr/local/php/bin/php-config #配置
make #编译
make install #安装
3、修改php配置文件
vi /usr/local/php/etc/php.ini #打开php配置文件,在最后一行添加下面的代码
extension="kafka.so"
:wq! #保存退出
Requirements:
Download and install . Run sudo ldconfig
to update shared libraries.
Installing PHP extension:
1 phpize2 ./configure --enable-kafka3 make4 sudo make install5 sudo sh -c 'echo "extension=kafka.so" >> /etc/php5/conf.d/kafka.ini'6 #For CLI mode:7 sudo sh -c 'echo "extension=kafka.so" >> /etc/php5/cli/conf.d/20-kafka.ini'
Examples:
1 // Produce a message 2 $kafka = new Kafka("localhost:9092"); 3 $kafka->produce("topic_name", "message content"); 4 //get all the available partitions 5 $partitions = $kafka->getPartitionsForTopic('topic_name'); 6 //use it to OPTIONALLY specify a partition to consume from 7 //if not, consuming IS slower. To set the partition: 8 $kafka->setPartition($partitions[0]);//set to first partition 9 //then consume, for example, starting with the first offset, consume 20 messages10 $msg = $kafka->consume("topic_name", Kafka::OFFSET_BEGIN, 20);11 var_dump($msg);//dumps array of messages
php kafka 例子:
1 $host,16 "client_ip" => $client_ip,17 "method" => $method,18 "product" => $product,19 "ua" => $ua,20 "product" => $product,21 );22 $json = json_encode($message);23 print_r($json);24 echo "\n";25 return $json;26 }27 while(1) {28 $json = create_message();29 $kafka->produce("test", $json);30 //get all the available partitions31 $partitions = $kafka->getPartitionsForTopic('test');32 //use it to OPTIONALLY specify a partition to consume from33 //if not, consuming IS slower. To set the partition:34 $kafka->setPartition($partitions[0]);//set to first partition35 //then consume, for example, starting with the first offset, consume 20 messages36 $msg = $kafka->consume("test", Kafka::OFFSET_BEGIN, 20);37 usleep(300);38 }39 //var_dump($msg);//dumps array of messages40 ?>