kafka安装
下载安装
到官网http://kafka.apache.org/downloads.html下载想要的版本;我这里下载的最新稳定版2.1.0
注:由于Kafka控制台脚本对于基于Unix和Windows的平台是不同的,因此在Windows平台上使用bin\windows\ 而不是bin/ 将脚本扩展名更改为.bat。
1 2 3 | [root@along ~]
[root@along ~]
[root@along ~]
|
配置启动zookeeper
kafka正常运行,必须配置zookeeper,否则无论是kafka集群还是客户端的生存者和消费者都无法正常的工作的;所以需要配置启动zookeeper服务。
(1)zookeeper需要java环境
(2)这里kafka下载包已经包括zookeeper服务,所以只需修改配置文件,启动即可。
如果需要下载指定zookeeper版本;可以单独去zookeeper官网http://mirrors.shu.edu.cn/apache/zookeeper/下载指定版本。
1 2 3 4 5 | [root@along ~]
[root@along kafka_2.11-2.1.0]
dataDir= /tmp/zookeeper
clientPort=2181
maxClientCnxns=0
|
注:可自行添加修改zookeeper配置
配置kafka
(1)修改配置文件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 | [root@along kafka_2.11-2.1.0]
broker. id =0
listeners=PLAINTEXT: //localhost :9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log. dirs = /tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data. dir =1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
|
注:可根据自己需求修改配置文件
(2)配置环境变量
1 2 3 4 | [root@along ~]
export KAFKA_HOME= "/data/kafka_2.11-2.1.0"
export PATH= "${KAFKA_HOME}/bin:$PATH"
[root@along ~]
|
(3)配置服务启动脚本
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 | [root@along ~]
#!/bin/sh
source /etc/rc .d /init .d /functions
KAFKA_HOME= /data/kafka_2 .11-2.1.0
KAFKA_USER=root
export LOG_DIR= /tmp/kafka-logs
[ -e /etc/sysconfig/kafka ] && . /etc/sysconfig/kafka
case "$1" in
start)
echo -n "Starting Kafka:"
/sbin/runuser -s /bin/sh $KAFKA_USER -c "nohup $KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties > $LOG_DIR/server.out 2> $LOG_DIR/server.err &"
echo " done."
exit 0
;;
stop)
echo -n "Stopping Kafka: "
/sbin/runuser -s /bin/sh $KAFKA_USER -c "ps -ef | grep kafka.Kafka | grep -v grep | awk '{print \$2}' | xargs kill \-9"
echo " done."
exit 0
;;
hardstop)
echo -n "Stopping (hard) Kafka: "
/sbin/runuser -s /bin/sh $KAFKA_USER -c "ps -ef | grep kafka.Kafka | grep -v grep | awk '{print \$2}' | xargs kill -9"
echo " done."
exit 0
;;
status)
c_pid=` ps -ef | grep kafka.Kafka | grep - v grep | awk '{print $2}' `
if [ "$c_pid" = "" ] ; then
echo "Stopped"
exit 3
else
echo "Running $c_pid"
exit 0
fi
;;
restart)
stop
start
;;
*)
echo "Usage: kafka {start|stop|hardstop|status|restart}"
exit 1
;;
esac
|
启动kafka服务
(1)后台启动zookeeper服务
(2)启动kafka服务
1 2 3 4 5 6 7 8 | [root@along ~]
Starting kafka (via systemctl): [ OK ]
[root@along ~]
Running 86018
[root@along ~]
Netid State Recv-Q Send-Q Local Address:Port Peer Address:Port
tcp LISTEN 0 50 :::9092 :::*
tcp LISTEN 0 50 :::2181 :::*
|
kafka使用简单入门
创建主题topics
创建一个名为“along”的主题,它只包含一个分区,只有一个副本:
1 2 | [root@along ~]
Created topic "along" .
|
如果我们运行list topic命令,我们现在可以看到该主题:
发送一些消息
Kafka附带一个命令行客户端,它将从文件或标准输入中获取输入,并将其作为消息发送到Kafka集群。默认情况下,每行将作为单独的消息发送。
运行生产者,然后在控制台中键入一些消息以发送到服务器。
1 2 3 | [root@along ~]
>This is a message
>This is another message
|
启动消费者
Kafka还有一个命令行使用者,它会将消息转储到标准输出。
1 2 3 | [root@along ~]
This is a message
This is another message
|
所有命令行工具都有其他选项; 运行不带参数的命令将显示更详细地记录它们的使用信息。
以上资料均来源自互联网