Installing Kafka and ZooKeeper on Ubuntu and Enabling Auto-Start on Boot

Kafka is a distributed message queue offering high performance, persistence, multi-replica backup, and horizontal scalability. Producers write messages to the queue and consumers read them to drive business logic; in architecture design Kafka typically serves to decouple systems, absorb traffic spikes, and enable asynchronous processing. Kafka exposes the concept of a topic: producers write messages to a topic and consumers read from it. For horizontal scaling, a topic is actually composed of multiple partitions; when you hit a bottleneck, you can scale out by increasing the number of partitions. Message ordering is guaranteed only within a single partition. Every new message is appended to the end of the corresponding partition's file, which is why write performance is so high.

This article shows how to install Kafka on Ubuntu and configure it to start automatically on boot.

Keywords: kafka, ubuntu, linux

Installing Kafka

Download

Download page: https://kafka.apache.org/downloads

This article uses kafka_2.12-3.0.0.tgz as an example.

Extract

After downloading, upload the archive to the server and extract it:

tar -xvzf kafka_2.12-3.0.0.tgz
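
The systemd startup scripts later in this article assume the Kafka directory lives at /home/wf09/kafka (wf09 being the example user on this machine). If you follow that layout, move and rename the extracted directory accordingly, e.g.:

mv kafka_2.12-3.0.0 /home/wf09/kafka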

Edit the configuration

In config/server.properties, change the default

advertised.listeners=PLAINTEXT://your.host.name:9092

to

advertised.listeners=PLAINTEXT://<ip>:9092

where <ip> is the server's IP address. The advertised hostname and port are what the broker announces to producers and consumers. If advertised.listeners is not set, the value of listeners is used instead; if listeners is not set either, the broker falls back to java.net.InetAddress.getCanonicalHostName(), which usually resolves to localhost.

"PLAINTEXT" is the security protocol; the common options are PLAINTEXT and SSL. The hostname may be an IP address, but it must not be "0.0.0.0"; normally you use the machine's public IP or its LAN IP. If the hostname is left empty, the setting applies only to the default network interface (localhost). Producers and consumers learn the advertised addresses when they fetch the cluster metadata.
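
A minimal sketch of the resulting lines, assuming the broker's LAN IP is 192.168.1.100 (substitute your own address):

# config/server.properties
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://192.168.1.100:9092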

Other settings

See the end of this article.

Running Kafka

Install a JDK

Kafka requires a JDK. For installation details, see my earlier article:

Configuring a JDK 12 Environment on Ubuntu

Run ZooKeeper

ZooKeeper must be started before Kafka; otherwise Kafka will fail with an error.

You can change where ZooKeeper stores its data in config/zookeeper.properties; the default is /tmp/zookeeper.
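
For example, to keep the data out of /tmp (which may be cleared on reboot), you could set something like the following; the directory is just an illustration and must be writable by the user running ZooKeeper:

# config/zookeeper.properties
dataDir=/var/lib/zookeeper
clientPort=2181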

cd kafka_2.12-3.0.0
./bin/zookeeper-server-start.sh config/zookeeper.properties

If it starts successfully, the log should end with ZooKeeper binding to its client port, e.g. a line like binding to port 0.0.0.0/0.0.0.0:2181.
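
Both start scripts also accept a -daemon flag if you prefer to run them in the background rather than holding a terminal open:

./bin/zookeeper-server-start.sh -daemon config/zookeeper.properties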

Run Kafka

cd kafka_2.12-3.0.0
./bin/kafka-server-start.sh config/server.properties

If it starts successfully, the log should contain a line like [KafkaServer id=10] started (the id matches broker.id in server.properties).
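
To verify the broker is reachable, you can create and list a test topic; web_log is the topic name used in the consumer example below:

./bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --create --topic web_log
./bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --list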

Reading (consuming) data from the terminal

Windows:

bin\windows\kafka-console-consumer.bat --bootstrap-server=127.0.0.1:9092 --topic=web_log --from-beginning
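
The equivalent on Ubuntu, together with a console producer you can use to send test messages, looks like this:

./bin/kafka-console-producer.sh --bootstrap-server 127.0.0.1:9092 --topic web_log
./bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic web_log --from-beginning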

Setting Up Auto-Start

Write the startup scripts

Wrap the two start commands above in shell scripts:

  • init-zookeeper.sh:
#!/bin/sh
# Exit cleanly when systemd stops the service (SIGTERM) or on Ctrl-C (SIGINT)
trap "echo 'Zookeeper Stopped.'; exit 0;" INT TERM
export JAVA_HOME=/usr/local/jdk-12.0.2
export PATH=$JAVA_HOME/bin:$PATH
echo "Init Zookeeper Service"
cd /home/wf09/kafka
./bin/zookeeper-server-start.sh config/zookeeper.properties
# Only reached if the server exits on its own; report an abnormal exit
exit 255
  • init-kafka.sh:
#!/bin/sh
# Exit cleanly when systemd stops the service (SIGTERM) or on Ctrl-C (SIGINT)
trap "echo 'Kafka Stopped.'; exit 0;" INT TERM
export JAVA_HOME=/usr/local/jdk-12.0.2
export PATH=$JAVA_HOME/bin:$PATH
echo "Init Kafka Service"
cd /home/wf09/kafka
./bin/kafka-server-start.sh config/server.properties
# Only reached if the server exits on its own; report an abnormal exit
exit 255
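
Make both scripts executable (not strictly required here, since the service files below invoke them via /bin/bash, but convenient for manual testing):

chmod +x /home/wf09/init-zookeeper.sh /home/wf09/init-kafka.sh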

Write the systemd service files

  • zookeeper.service (the file name must match the Requires= line in kafka.service below):
[Unit]
Description=zookeeper-service
After=network.target

[Service]
#KillMode=mixed
User=wf09
Type=simple
ExecStart=/bin/bash /home/wf09/init-zookeeper.sh
ExecStop=/bin/kill -s SIGTERM $MAINPID

[Install]
WantedBy=multi-user.target
  • kafka.service:
[Unit]
Description=Kafka-service
After=network.target zookeeper.service
Requires=zookeeper.service

[Service]
#KillMode=mixed
User=wf09
Type=simple
ExecStart=/bin/bash /home/wf09/init-kafka.sh
ExecStop=/bin/kill -s SIGTERM $MAINPID

[Install]
WantedBy=multi-user.target

Enable auto-start

For enabling the services at boot, see my article:

Ubuntu Service Management
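
In short, the steps look roughly like this (assuming the two unit files above are saved to /etc/systemd/system/):

sudo cp zookeeper.service kafka.service /etc/systemd/system/
sudo systemctl daemon-reload
sudo systemctl enable --now zookeeper.service kafka.service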

Other Kafka configuration

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# see kafka.server.KafkaConfig for additional details and defaults

############################# Server Basics #############################

##################################################################################
# A broker is a single deployed Kafka instance. Every broker in a Kafka cluster
# must have a broker.id, and that id must be a unique integer.
##################################################################################
broker.id=10

############################# Socket Server Settings #############################

# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
# FORMAT:
# listeners = security_protocol://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
#listeners=PLAINTEXT://:9092

# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092

##################################################################################
# The number of threads handling network requests (default: 3)
##################################################################################
num.network.threads=3
##################################################################################
# The number of threads doing disk I/O (default: 8)
##################################################################################
num.io.threads=8

##################################################################################
# The send buffer (SO_SNDBUF) used by the socket server (default: 100 KB)
##################################################################################
socket.send.buffer.bytes=102400

##################################################################################
# The receive buffer (SO_RCVBUF) used by the socket server (default: 100 KB)
##################################################################################
socket.receive.buffer.bytes=102400

##################################################################################
# The maximum size of a single request the socket server will accept
# (protection against OOM, i.e. out-of-memory errors; default: 100 MB)
##################################################################################
socket.request.max.bytes=104857600

############################# Log Basics (Kafka's stored data is called the "log") #############################

##################################################################################
# A comma-separated list of directories under which to store log files
# (i.e., the data Kafka receives)
##################################################################################
log.dirs=/home/uplooking/data/kafka

##################################################################################
# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
# (A partition splits a topic's data into several pieces stored separately,
# which is what enables distributed storage and scaling.)
##################################################################################
num.partitions=1

##################################################################################
# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
##################################################################################
num.recovery.threads.per.data.dir=1

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
# 1. Durability: Unflushed data may be lost if you are not using replication.
# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.
# Note that Kafka's flush policy is based only on message count and time interval; there is
# no size-based option. You may configure either setting or both.

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

############################# Log Retention Policy(数据保留策略) #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria is met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion
# (time-based policy; the default below keeps data for 7 days)
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.bytes (1 GB here).
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies (5 minutes here)
log.retention.check.interval.ms=300000

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
# (For the single-machine setup in this article, this would simply be localhost:2181.)
zookeeper.connect=uplooking01:2181,uplooking02:2181,uplooking03:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000

References

  • https://www.cnblogs.com/toutou/p/linux_install_kafka.html