Kafka Learning Notes: Quick Start

For work I've recently been going through the 尚硅谷 (Atguigu) Kafka tutorials; on the principle that the palest ink beats the best memory, this series of notes was born.

Keywords: Kafka

Installation and Deployment

Installing with Docker

This guide installs Kafka with Docker, using this image: https://hub.docker.com/r/bitnami/kafka/

Installing with Docker is very straightforward; here is the docker-compose.yml used:

version: "3"
services:
  zookeeper:
    image: 'bitnami/zookeeper:latest'
    ports:
      - '2181:2181'
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  kafka:
    image: 'bitnami/kafka:latest'
    ports:
      - '9092:9092'
    environment:
      - KAFKA_BROKER_ID=1
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181/kafka
      - ALLOW_PLAINTEXT_LISTENER=yes
    depends_on:
      - zookeeper

A few points to note:

  • KAFKA_BROKER_ID: sets broker.id; every broker must use a distinct value, otherwise it will fail to start;

  • KAFKA_CFG_LISTENERS: the network address the Kafka server listens on; here the default 0.0.0.0 is used;

  • KAFKA_CFG_ADVERTISED_LISTENERS: the network address the Kafka instance advertises to clients; normally the host machine's LAN or public address;

  • KAFKA_CFG_ZOOKEEPER_CONNECT: the ZooKeeper address; it's best to append the /kafka suffix, otherwise Kafka's nodes clutter ZooKeeper's top-level directory;
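
To sanity-check the stack, bring it up and list topics from inside the broker container (a minimal smoke test; it assumes the Bitnami image keeps its scripts on PATH, otherwise use the full path /opt/bitnami/kafka/bin/kafka-topics.sh):

# start ZooKeeper and Kafka in the background
docker-compose up -d
# list topics from inside the broker container; an empty result with no error means the broker is reachable
docker-compose exec kafka kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --list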

Configuration Files

A few key options can be set quickly through environment variables; everything else can be changed by mounting a configuration file:

  • Setting environment variables

The configuration can easily be setup with the Bitnami Apache Kafka Docker image using the following environment variables:

  • ALLOW_PLAINTEXT_LISTENER: Allow to use the PLAINTEXT listener. Default: no.
  • KAFKA_INTER_BROKER_USER: Apache Kafka inter broker communication user. Default: admin.
  • KAFKA_INTER_BROKER_PASSWORD: Apache Kafka inter broker communication password. Default: bitnami.
  • KAFKA_CERTIFICATE_PASSWORD: Password for certificates. No defaults.
  • KAFKA_HEAP_OPTS: Apache Kafka's Java Heap size. Default: -Xmx1024m -Xms1024m.
  • KAFKA_ZOOKEEPER_PROTOCOL: Authentication protocol for Zookeeper connections. Allowed protocols: PLAINTEXT, SASL, SSL, and SASL_SSL. Defaults: PLAINTEXT.
  • KAFKA_ZOOKEEPER_USER: Apache Kafka Zookeeper user for SASL authentication. No defaults.
  • KAFKA_ZOOKEEPER_PASSWORD: Apache Kafka Zookeeper user password for SASL authentication. No defaults.
  • KAFKA_ZOOKEEPER_TLS_KEYSTORE_PASSWORD: Apache Kafka Zookeeper keystore file password and key password. No defaults.
  • KAFKA_ZOOKEEPER_TLS_TRUSTSTORE_PASSWORD: Apache Kafka Zookeeper truststore file password. No defaults.
  • KAFKA_ZOOKEEPER_TLS_VERIFY_HOSTNAME: Verify Zookeeper hostname on TLS certificates. Defaults: true.
  • KAFKA_ZOOKEEPER_TLS_TYPE: Choose the TLS certificate format to use. Allowed values: JKS, PEM. Defaults: JKS.
  • KAFKA_CFG_SASL_ENABLED_MECHANISMS: Allowed mechanism when using SASL either for clients, inter broker, or zookeeper communications. Allowed values: PLAIN, SCRAM-SHA-256, SCRAM-SHA-512 or a comma separated combination of those values. Default: PLAIN,SCRAM-SHA-256,SCRAM-SHA-512
  • KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL: SASL mechanism to use for inter broker communications. No defaults.
  • KAFKA_TLS_CLIENT_AUTH: Configures kafka brokers to request client authentication. Allowed values: required, requested, none. Defaults: required.
  • KAFKA_TLS_TYPE: Choose the TLS certificate format to use. Allowed values: JKS, PEM. Defaults: JKS.
  • KAFKA_CLIENT_USERS: Users that will be created into Zookeeper when using SASL for client communications. Separated by commas. Default: user
  • KAFKA_CLIENT_PASSWORDS: Passwords for the users specified at KAFKA_CLIENT_USERS. Separated by commas. Default: bitnami
  • KAFKA_CFG_MAX_PARTITION_FETCH_BYTES: The maximum amount of data per-partition the server will return. Default: 1048576
  • KAFKA_CFG_MAX_REQUEST_SIZE: The maximum size of a request in bytes. Default: 1048576

Additionally, any environment variable beginning with KAFKA_CFG_ will be mapped to its corresponding Apache Kafka key. For example, use KAFKA_CFG_BACKGROUND_THREADS in order to set background.threads or KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE in order to configure auto.create.topics.enable.
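
For example, instead of mounting a full server.properties, some of the settings recommended below can be set directly in the Compose file via KAFKA_CFG_ variables (illustrative values):

    environment:
      # maps to log.retention.hours
      - KAFKA_CFG_LOG_RETENTION_HOURS=168
      # maps to num.partitions
      - KAFKA_CFG_NUM_PARTITIONS=1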

  • Mounting a configuration file
...
services:
  kafka:
    ...
    volumes:
      - 'kafka_data:/bitnami'
      - /path/to/server.properties:/bitnami/kafka/config/server.properties
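
To get a baseline file to edit, you can first copy the image's default configuration out of a running container (a sketch, assuming the Compose service is named kafka as above; the in-container path follows the Bitnami layout and may differ across image versions):

# copy the default server.properties from the broker container to the current directory
docker cp $(docker-compose ps -q kafka):/opt/bitnami/kafka/config/server.properties ./server.properties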

Some Recommended Settings

# Globally unique broker ID; must not repeat, and must be a number.
broker.id=0
# Number of threads handling network requests
num.network.threads=3
# Number of threads handling disk I/O
num.io.threads=8
# Send buffer size of the socket
socket.send.buffer.bytes=102400
# Receive buffer size of the socket
socket.receive.buffer.bytes=102400
# Maximum size of a socket request
socket.request.max.bytes=104857600
# Where Kafka stores its log (data) files; created automatically if missing; multiple disk paths can be configured, separated by ","
log.dirs=/opt/module/kafka/datas
# Default number of partitions for new topics on this broker
num.partitions=1
# Number of threads per data directory used to recover and clean up the data
num.recovery.threads.per.data.dir=1
# Replication factor of the internal __consumer_offsets topic (default: 1)
offsets.topic.replication.factor=1
# Maximum time a segment file is retained before deletion
log.retention.hours=168
# Maximum size of each segment file (default: 1 GB)
log.segment.bytes=1073741824
# How often to check for expired data (default: every 5 minutes)
log.retention.check.interval.ms=300000
# ZooKeeper cluster connection string (with a /kafka chroot under the zk root, for easier management)
zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka
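
Once the broker is running, the values actually in effect can be double-checked from the command line (a sketch; --describe --all requires a reasonably recent Kafka, and broker ID 0 matches the file above):

# show all configuration values currently in effect on broker 0, including defaults
bin/kafka-configs.sh --bootstrap-server hadoop102:9092 --entity-type brokers --entity-name 0 --describe --all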

Kafka Command-Line Operations

Topic command-line parameters

bin/kafka-topics.sh
Parameter                                             Description
--bootstrap-server <String: server to connect to>    Kafka broker host:port to connect to
--topic <String: topic>                               Name of the topic to operate on
--create                                              Create a topic
--alter                                               Modify a topic
--delete                                              Delete a topic
--list                                                List all topics
--describe                                            Show detailed topic information
--partitions <Integer: # of partitions>               Set the number of partitions
--replication-factor <Integer: replication factor>    Set the partition replication factor
--config <String: name=value>                         Override the system default configuration

Common topic operations

# List all topics on the server
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --list
# Create a topic, specifying its name, replication factor, and partition count
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --partitions 1 --replication-factor 3 --topic first
# Describe the topic "first"
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --describe --topic first
# Change the partition count (it can only be increased, never decreased)
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --alter --topic first --partitions 3
# Delete a topic (requires delete.topic.enable=true in server.properties; otherwise it is only marked for deletion)
bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --delete --topic first
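
For reference, the --describe output looks roughly like this (illustrative; leader and replica assignments depend on your cluster, and newer versions also print a TopicId):

Topic: first	PartitionCount: 3	ReplicationFactor: 3	Configs: segment.bytes=1073741824
	Topic: first	Partition: 0	Leader: 0	Replicas: 0,1,2	Isr: 0,1,2
	Topic: first	Partition: 1	Leader: 1	Replicas: 1,2,0	Isr: 1,2,0
	Topic: first	Partition: 2	Leader: 2	Replicas: 2,0,1	Isr: 2,0,1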

Producer Command-Line Operations

Producer command-line parameters

Parameter                                             Description
--bootstrap-server <String: server to connect to>    Kafka broker host:port to connect to
--topic <String: topic>                               Name of the topic to operate on

Sending messages

bin/kafka-console-producer.sh --bootstrap-server hadoop102:9092 --topic first
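
The console producer then reads from stdin and sends one message per line; a session looks like this (illustrative input):

> hello kafka
> this is another message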

Consumer Command-Line Operations

Consumer command-line parameters

Parameter                                             Description
--bootstrap-server <String: server to connect to>    Kafka broker host:port to connect to
--topic <String: topic>                               Name of the topic to operate on
--from-beginning                                      Consume from the beginning of the log
--group <String: consumer group id>                   Specify the consumer group name

Consuming messages

# Consume new messages from the topic "first"
bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
# Consume from the beginning of the topic
bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --from-beginning --topic first
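
The --group flag from the table above assigns the consumer to a named group, so offsets are committed under that group and partitions are split among its members (test-group is an arbitrary example name):

# Consume as part of the group "test-group"; a second instance with the same group shares the partitions
bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first --group test-group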