部署三台服务器的高可用 kafka 环境。
Server1:192.168.1.87
Server2:192.168.1.88
Server3:192.168.1.89
https://kafka.apache.org/downloads
1、各节点部署
节点1
root@kafka01:~# cd /usr/local/src/
root@kafka01:/usr/local/src# wget https://archive.apache.org/dist/kafka/2.3.1/kafka_2.12-2.3.1.tgz
root@kafka01:/usr/local/src# tar -xzf kafka_2.12-2.3.1.tgz
root@kafka01:/usr/local/src# ln -sv /usr/local/src/kafka_2.12-2.3.1 /usr/local/kafka
'/usr/local/kafka' -> '/usr/local/src/kafka_2.12-2.3.1'
root@kafka01:/usr/local/src# cd /usr/local/kafka/config/
root@kafka01:/usr/local/kafka/config# grep ^[a-Z] server.properties
broker.id=1
listeners=PLAINTEXT://192.168.1.87:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/usr/local/kafka/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.1.84:2181,192.168.1.85:2181,192.168.1.86:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
节点2
root@kafka02:~# cd /usr/local/src/
root@kafka02:/usr/local/src# wget https://archive.apache.org/dist/kafka/2.3.1/kafka_2.12-2.3.1.tgz
root@kafka02:/usr/local/src# tar -xzf kafka_2.12-2.3.1.tgz
root@kafka02:/usr/local/src# ln -sv /usr/local/src/kafka_2.12-2.3.1 /usr/local/kafka
'/usr/local/kafka' -> '/usr/local/src/kafka_2.12-2.3.1'
root@kafka02:/usr/local/src# cd /usr/local/kafka/config/
root@kafka02:/usr/local/kafka/config# grep ^[a-Z] server.properties
broker.id=2
listeners=PLAINTEXT://192.168.1.88:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/usr/local/kafka/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.1.84:2181,192.168.1.85:2181,192.168.1.86:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
节点3
root@kafka03:~# cd /usr/local/src/
root@kafka03:/usr/local/src# wget https://archive.apache.org/dist/kafka/2.3.1/kafka_2.12-2.3.1.tgz
root@kafka03:/usr/local/src# tar -xzf kafka_2.12-2.3.1.tgz
root@kafka03:/usr/local/src# ln -sv /usr/local/src/kafka_2.12-2.3.1 /usr/local/kafka
'/usr/local/kafka' -> '/usr/local/src/kafka_2.12-2.3.1'
root@kafka03:/usr/local/src# cd /usr/local/kafka/config/
root@kafka03:/usr/local/kafka/config# grep ^[a-Z] server.properties
broker.id=3
listeners=PLAINTEXT://192.168.1.89:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/usr/local/kafka/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.1.84:2181,192.168.1.85:2181,192.168.1.86:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
配置说明
21 broker.id=1 每个 broker 在集群中的唯一标识,正整数。
31 listeners=PLAINTEXT://172.18.0.101:9092 #监听地址
60 log.dirs=/usr/local/kafka/kafka-logs #kakfa 用于保存数据的目录,所有的消息
都会存储在该目录当中
65 num.partitions=1 #设置创新新的 topic 默认分区数量
103 log.retention.hours=168 #设置 kafka 中消息保留时间,默认为 168 小时即 7 天
# zookeeper.connect 指定连接的 zk 的地址,zk 中存储了 broker 的元数据信息,格式如下:
123 zookeeper.connect=192.168.1.84:2181,192.168.1.85:2181,192.168.1.86:2181
126 zookeeper.connection.timeout.ms=6000 #设置连接 zookeeper 的超时时间,默认 6 秒钟
2、各节点启动
# /usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties
&
# 此方式 zookeeper 会在 shell 断开后关
节点 1
# 创建数据目录
root@kafka01:~# mkdir /usr/local/kafka/kafka-logs
# 以守护进程启动
root@kafka01:~# /usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
![图片[1]-kafka 部署-李佳程的个人主页](http://www.lijiach.com/wp-content/uploads/2022/12/image-290.png)
节点 2
root@kafka02:~# mkdir /usr/local/kafka/kafka-logs
root@kafka02:~# /usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
![图片[2]-kafka 部署-李佳程的个人主页](http://www.lijiach.com/wp-content/uploads/2022/12/image-291.png)
节点 3
root@kafka03:~# mkdir /usr/local/kafka/kafka-logs
root@kafka03:~# /usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
![图片[3]-kafka 部署-李佳程的个人主页](http://www.lijiach.com/wp-content/uploads/2022/12/image-292.png)
3、验证 zookeeper 中 kafka 元数据
![图片[4]-kafka 部署-李佳程的个人主页](http://www.lijiach.com/wp-content/uploads/2022/12/image-293.png)
- Broker 依赖于 Zookeeper,每个 Broker 的 id 和 Topic、Partition 这些元数据信息都会写入 Zookeeper 的 ZNode 节点中;
- Consumer 依赖于 Zookeeper,Consumer 在消费消息时,每消费完一条消息,会将产生的 offset 保存到 Zookeeper 中,下次消费在当前 offset 往后继续消费;ps:kafka0.9 之前 Consumer 的 offset 存储在 Zookeeper 中,kafka0,9 以后 offset存储在本地。
- Partition 依赖于 Zookeeper,Partition 完成 Replication 备份后,选举出一个 Leader,这个是依托于Zookeeper 的选举机制实现的
4、测试 kafka 读写数据
创建 topic
创建名为 logstashtest,partitions(分区)为 3,replication(每个分区的副本数/每个分区的分区因子)为 3 的 topic(主题):
在任意 kafaka 服务器操作:
root@kafka01:~# /usr/local/kafka/bin/kafka-topics.sh --create --zookeeper 192.168.1.84:2181,192.168.1.85:2181,192.168.1.86:2181 --partitions 3 --replication-factor 3 --topic test1
Created topic test1.
验证 topic
状态说明:logstashtest 有三个分区分别为 0、1、2,分区 0 的 leader 是 3(broker.id),分区 0 有三个副本,并且状态都为 lsr(ln-sync,表示可以参加选举成为 leader)。
root@kafka01:~# /usr/local/kafka/bin/kafka-topics.sh --describe --zookeeper 192.168.1.84:2181,192.168.1.85:2181,192.168.1.86:2181 --topic test1
Topic:test1 PartitionCount:3 ReplicationFactor:3 Configs:
Topic: test1 Partition: 0 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
Topic: test1 Partition: 1 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: test1 Partition: 2 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
获取所有 topic
root@kafka03:~# /usr/local/kafka/bin/kafka-topics.sh --list --zookeeper 192.168.1.84:2181,192.168.1.85:2181,192.168.1.86:2181
test1
测试发送消息
root@kafka01:~# /usr/local/kafka/bin/kafka-console-producer.sh --broker-list 192.168.1.87:9092,192.168.1.88:9092,192.168.1.89:9092 --topic test1
>msg1
>msg2
>msg3
测试获取消息
root@kafka03:~# /usr/local/kafka/bin/kafka-console-consumer.sh --topic test1 --bootstrap-server 192.168.1.88:9092 --from-beginning
msg1
msg2
msg3
删除 topic
root@kafka03:~# # /usr/local/kafka/bin/kafka-topics.sh --delete --zookeeper 192.168.1.84:2181,192.168.1.85:2181,192.168.1.86:2181 --topic test1
© 版权声明
文章版权归作者所有,未经允许请勿转载。
THE END