本站提供互联网编程技术交流分享,部分技术教程不断更新中,请随时关注或联系我寻求帮助 ,同时也欢迎有兴趣的朋友进行投稿。

Kafka_2.11-0.10.0.1 详细使用指南【建议收藏】

大数据 熊哥club 8677℃ 0评论

提到消息系统,目前最火热的非 Kafka 莫属,公司也打算利用 Kafka 进行各业务日志统一收集,这里结合自己的实践来分享一下具体的配置及使用。Kafka 版本 0.10.0.1


 

介绍

作为云计算大数据的套件,Kafka 是一个分布式的、可分区的、可复制的消息系统。该有的功能基本都有,而且有自己的特色:

  • 以 topic 为单位进行消息归纳
  • 向 topic 发布消息的是 producer
  • 从 topic 获取消息的是 consumer
  • 集群方式运行,每个服务叫 broker
  • 客户端和服务器通过 TCP 进行通信

在Kafka集群中,没有“中心主节点”的概念,集群中所有的服务器都是对等的,因此,可以在不做任何配置的更改的情况下实现服务器的的添加与删除,同样的消息的生产者和消费者也能够做到随意重启和机器的上下线。

对每个 topic 来说,Kafka 会对其进行分区,每个分区都由一系列有序的、不可变的消息组成,这些消息被连续的追加到分区中。分区中的每个消息都有一个连续的序列号叫做 offset,用来在分区中唯一的标识这个消息。

发布消息通常有两种模式:队列模式(queuing)和发布-订阅模式(publish-subscribe)。队列模式中,consumers 可以同时从服务端读取消息,每个消息只被其中一个 consumer 读到;发布-订阅模式中消息被广播到所有的 consumer 中。更常见的是,每个 topic 都有若干数量的 consumer 组,每个组都是一个逻辑上的『订阅者』,为了容错和更好的稳定性,每个组由若干 consumer 组成。这其实就是一个发布-订阅模式,只不过订阅者是个组而不是单个 consumer。

通过分区的概念,Kafka可以在多个consumer组并发的情况下提供较好的有序性和负载均衡。将每个分区分只分发给一个consumer组,这样一个分区就只被这个组的一个consumer消费,就可以顺序的消费这个分区的消息。因为有多个分区,依然可以在多个consumer组之间进行负载均衡。注意consumer组的数量不能多于分区的数量,也就是有多少分区就允许多少并发消费。

Kafka 只能保证一个分区之内消息的有序性,在不同的分区之间是不可以的,这已经可以满足大部分应用的需求。如果需要 topic 中所有消息的有序性,那就只能让这个 topic 只有一个分区,当然也就只有一个 consumer 组消费它。

单机配置

按照下列步骤即可(来自官网教程)

1. 下载 Kafka

  • 下载 wget http://apache.01link.hk/kafka/0.10.0.0/kafka_2.11-0.10.0.0.tgz 或者 wget http://ftp.cuhk.edu.hk/pub/packages/apache.org/kafka/0.10.0.0/kafka_2.11-0.10.0.0.tgz(看哪个源比较快)
  • 解压 tar -xzf kafka_2.11-0.10.0.0.tgz
  • 进入文件夹 cd kafka_2.11-0.10.0.0/

2. 启动服务

  • 启动 ZooKeeper bin/zookeeper-server-start.sh config/zookeeper.properties &(利用 &放到后台方便继续操作)
  • 启动 Kafka bin/kafka-server-start.sh config/server.properties &

3. 创建一个叫做 dawang 的 topic,它只有一个分区,一个副本

  • 创建 bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic dawang
  • 查看 bin/kafka-topics.sh --list --zookeeper localhost:2181
  • 还可以配置 broker 让它自动创建 topic

4. 发送消息。Kafka 使用一个简单的命令行producer,从文件中或者从标准输入中读取消息并发送到服务端。默认的每条命令将发送一条消息。

  • 发送消息 bin/kafka-console-producer.sh --broker-list localhost:9092 --topic dawang(然后可以随意输入内容,回车可以发送,ctrl+c 退出)

5. 启动 consumer。可以读取消息并输出到标准输出:

  • 接收消息 bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic dawang --from-beginning
  • 在一个终端中运行 consumer 命令行,另一个终端中运行 producer 命令行,就可以在一个终端输入消息,另一个终端读取消息。这两个命令都有自己的可选参数,可以在运行的时候不加任何参数可以看到帮助信息。

6. 搭建一个多个 broker 的集群,启动有 3 个 broker 组成的集群,这些 broker 节点也都在本机

首先复制一下配置文件:cp config/server.properties config/server-1.properties 和 cp config/server.properties config/server-2.properties

两个文件需要改动的内容为:

config/server-1.properties:
broker.id=1
listeners=PLAINTEXT://:9093
log.dir=/tmp/kafka-logs-1
config/server-2.properties:
broker.id=2
listeners=PLAINTEXT://:9094
log.dir=/tmp/kafka-logs-2

这里我们把 broker id, 端口号和日志地址配置成和之前不一样,然后我们启动这两个 broker:

bin/kafka-server-start.sh config/server-1.properties &
bin/kafka-server-start.sh config/server-2.properties &

然后创建一个复制因子为 3 的 topic

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic oh3topic

可以使用 describe 命令来显示 topic 详情

> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic oh3topic
Topic:oh3topic PartitionCount:1 ReplicationFactor:3 Configs:
Topic: oh3topic Partition: 0 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2

这里简单解释一下

  • Leader 是给定分区的节点编号,每个分区的部分数据会随机指定不同的节点
  • Replicas 是该日志会保存的复制
  • Isr 表示正在同步的复制

我们也可以来看看之前的另一个 topic 的情况

> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic dawang
Topic:dawang PartitionCount:1 ReplicationFactor:1 Configs:
Topic: dawang Partition: 0 Leader: 0 Replicas: 0 Isr: 0

最后我们可以按照同样的方法来生产和消费消息,例如

# 生产
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic oh3topic
# 消费
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic oh3topic

开俩终端就可以一边生产消息,一边消费消息了。

注意事项

如果要配置自定义端口,server.properties 中 listeners 一定要配置成为 IP 地址;如果配置为 localhost 或服务器的 hostname,在使用 java 发送数据时就会抛出异

# 创建 topic
bin/kafka-topics.sh --create --zookeeper bi03:2181 --replication-factor 1 --partitions 1 --topic logs
# 生产消息
bin/kafka-console-producer.sh --broker-list localhost:13647 --topic logs
# 消费消息
# bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic logs

如果 Zookeeper 出现 fsync-ing the write ahead log in SyncThread:1 took 2243ms which will adversely effect operation latency. See the ZooKeeper troubleshooting guide,是因为 FOLLOWER 在跟 LEADER 同步时,fsync 操作时间过长,导致超时。增加 tickTime 或者 initLimit 和 syncLimit 的值即可

集群配置

kafka 使用 ZooKeeper 用于管理、协调代理。每个 Kafka 代理通过 Zookeeper 协调其他 Kafka 代理。当 Kafka 系统中新增了代理或某个代理失效时,Zookeeper 服务将通知生产者和消费者。生产者与消费者据此开始与其他代理协调工作。

安装 Java

先给两台机子安装 Java

sudo add-apt-repository -y ppa:webupd8team/java
sudo apt-get update
sudo apt-get -y install oracle-java8-installer

更新 Hosts

这里用两台机器做例子(理论上最好是 3 台起步,偶数个不是不可以的,但是zookeeper集群是以宕机个数过半才会让整个集群宕机的,所以奇数个集群更佳),分别配置 /etc/hosts 文件为

127.0.0.1 localhost
10.1.1.164 bi03
10.1.1.44 bi02

修改 Zookeeper 配置文件

修改 config/zookeeper.properties 为

dataDir=/data/home/logger/kafka_2.11-0.10.0.0/zookeeper-logs/
clientPort=2181
# maxClientCnxns=0
tickTime=2000
initLimit=5
syncLimit=2
server.1=bi03:13645:13646
server.2=bi02:13645:13646

参数的意义为:

  • initLimit: zookeeper集群中的包含多台 server,其中一台为 leader,集群中其余的 server 为 follower。initLimit 参数配置初始化连接时,follower 和 leader 之间的最长心跳时间。此时该参数设置为 5,说明时间限制为 5 倍 tickTime,即 5*2000=10000ms=10s
  • syncLimit: 该参数配置 leader 和 follower 之间发送消息,请求和应答的最大时间长度。此时该参数设置为 2,说明时间限制为 2 倍 tickTime,即 4000ms
  • server.X=A:B:C 其中 X 是一个数字, 表示这是第几号 server。A 是该 server 所在的 IP 地址。B 配置该 server 和集群中的 leader 交换消息所使用的端口。C 配置选举 leader 时所使用的端口。

给服务器编号

在 dataDir 目录下建立一个 myid 文件,分别为

# server.1
echo 1 > myid
# server.2
echo 2 > myid

启动 Zookeeper

然后在每台机子上启动 zookeeper 服务

bin/zookeeper-server-start.sh config/zookeeper.properties &

所有机子的 zookeeper 都启动之前会报错,这都是正常的

如果不想要任何输出

nohup bin/zookeeper-server-start.sh config/zookeeper.properties &

修改 Kafka 配置文件

修改 config/server.properties,几个要改的部分是

# 允许删除 topic
delete.topic.enable=true
broker.id=0 # 这里不能重复
listeners=PLAINTEXT://bi03:13647 # 这里要配置成本机的 host name
# 这里需要配置成外网能够访问的地址及端口
advertised.listeners=PLAINTEXT://external.ip:8080
log.dirs=/data/home/logger/kafka_2.11-0.10.0.0/kafka-logs
num.partitions=2
zookeeper.connect=bi03:2181,bi02:2181

启动 Kafka

在每个节点上执行

bin/kafka-server-start.sh config/server.properties &

如果不想要任何输出

nohup bin/kafka-server-start.sh config/server.properties &

验证安装

创建一个 topic

bin/kafka-topics.sh --create --zookeeper bi03:2181,bi02:2181 --replication-factor 2 --partitions 1 --topic test

查看集群状态

bin/kafka-topics.sh --describe --zookeeper bi03:2181,bi02:2181 --topic test

生产消息,这里注意要生产到前面设置的监听端口,而不是 zookeeper 的端口

bin/kafka-console-producer.sh --broker-list bi03:13647,bi02:13647 --topic test

消费消息,这里注意是 zookeeper 的端口,而不是 kafka 的端口

bin/kafka-console-consumer.sh --zookeeper bi03:2181,bi02:2181 --from-beginning --topic test

显示 topic 列表

bin/kafka-topics.sh --zookeeper bi03:2181,bi02:2181 --list

删除 topic

bin/kafka-topics.sh --zookeeper bi03:2181,bi02:2181 --delete --topic hello

其他配置

Kafka 使用键值对的属性文件格式来进行配置,比如 config/server.properties,具体的值可以从文件中读取,或者在代码中进行指定。最重要的三个属性是:

  • broker.id: broker 的编号,不能相同
  • log.dirs: 日志保存的文件夹,默认为 /tmp/kafka-logs
  • zookeeper.connect: zookeeper 的 host

其他一些我觉得比较有用的属性为

  • auto.create.topics.enable 是否允许自动创建 topic,boolean 值,默认为 true
  • auto.leader.rebalance.enable 是否允许 leader 进行自动平衡,boolean 值,默认为 true
  • background.threads 后台进程数目,int 值,默认为 10 个
  • compression.type 指定 topic 的压缩方式,string 值,可选有
    • gzipsnappylz4 压缩方法
    • uncompressed 不压缩
    • producer 跟随 producer 的压缩方式
  • delete.topic.enable 是否允许删除 topic,boolean 值,默认为 false(主要用于控制 admin 界面中的控制)
  • leader.imbalance.check.interval.seconds 检查是否平衡的时间间隔,long 值,默认为 300
  • leader.imbalance.per.broker.percentage 允许的不平衡的百分比,超出则会进行重平衡,int 值,默认为 10
  • log.flush.interval.messages 攒了多少条消息之后会把数据刷入磁盘,long 值,默认是 9223372036854775807
  • log.flush.interval.ms 每条消息在保存到磁盘中前会在内存中待多久,单位毫秒,long 值,如果不设定,默认使用 log.flush.scheduler.interval.ms,也就是 9223372036854775807

更多的配置可以参考这里,以上的配置均针对 broker,因为目前我只用 broker 的部分

基本操作

所有的工具都可以在 bin/ 文件夹下查看,如果不带任何参数,就会给出所有命令的列表说明,这里只简要说明一些常用的命令

创建和移除 topic

可以手动创建 topic,或在数据进来时自动创建不存在的 topic,如果是自动创建的话,可能需要根据这里来进行对应调整。

创建 topic

bin/kafka-topics.sh --zookeeper zk_host:port/chroot --create --topic my_topic_name --partitions 20 --replication-factor 3 --config x=y

replication-factor 控制复制的份数,建议 2-3 份来兼顾容错和效率。partitions 控制该 topic 将被分区的数目,partitions 的数目最好不要超过服务器的个数(因为分区的意义是增加并行效率,而服务器数量决定了并行的数量,假设只有 2 台服务器,分 4 个区和 2 个区其实差别不大)。另外,topic 的名称不能超过 249 个字符

修改 topic

bin/kafka-topics.sh --zookeeper zk_host:port/chroot --alter --topic my_topic_name --partitions 40

这里需要注意,即使修改了分区的个数,已有的数据也不会进行变动,Kafka 不会做任何自动重分布

增加配置

bin/kafka-topics.sh --zookeeper zk_host:port/chroot --alter --topic my_topic_name --config x=y

移除配置

bin/kafka-topics.sh --zookeeper zk_host:port/chroot --alter --topic my_topic_name --delete-config x

删除 topic

bin/kafka-topics.sh --zookeeper zk_host:port/chroot --delete --topic my_topic_name

这个需要 delete.topic.enable=true,目前 Kafka 不支持减少 topic 的分区数目

优雅关闭

Kafka 会自动检测 broker 的状态并根据机器状态选举出新的 leader。但是如果需要进行配置更改停机的时候,我们就需要使用优雅关闭了,好处在于:

  1. 会把所有的日志同步到磁盘上,避免重启之后的日志恢复,减少重启时间
  2. 会在关闭前把以这台机为 leader 的分区数据迁移到其他节点,会减少不可用的时间

但是这个需要开启 controlled.shutdown.enable=true

刚重启之后的节点不是任何分区的 leader,所以这时候需要进行重新分配:

bin/kafka-preferred-replica-election.sh --zookeeper zk_host:port/chroot

这里需要开启 auto.leader.rebalance.enable=true

然后可以使用脚本 bin/kafka-server-stop.sh

注意,如果配置文件中没有 auto.leader.rebalance.enable=true,就还需要重新平衡

深入理解

这里只是一部分摘录,更多内容可查阅参考链接(尤其是美团技术博客的那篇)

文件系统

Kafka 大量依赖文件系统去存储和缓存消息。而文件系统最终会放在硬盘上,不过不用担心,很多时候硬盘的快慢完全取决于使用它的方式。设计良好的硬盘架构可以和内存一样快。

所以与传统的将数据缓存在内存中然后刷到硬盘的设计不同,Kafka直接将数据写到了文件系统的日志中,因此也避开了 JVM 的劣势——Java 对象占用空间巨大,数据量增大后垃圾回收有困难。使用文件系统,即使系统重启了,也不需要刷新数据,也简化了维护数据一致性的逻辑。

对于主要用于日志处理的消息系统,数据的持久化可以简单的通过将数据追加到文件中实现,读的时候从文件中读就好了。这样做的好处是读和写都是 O(1) 的,并且读操作不会阻塞写操作和其他操作。这样带来的性能优势是很明显的,因为性能和数据的大小没有关系了。

既然可以使用几乎没有容量限制(相对于内存来说)的硬盘空间建立消息系统,就可以在没有性能损失的情况下提供一些一般消息系统不具备的特性。比如,一般的消息系统都是在消息被消费后立即删除,Kafka却可以将消息保存一段时间(比如一星期),这给consumer提供了很好的机动性和灵活性。

事务定义

数据传输的事务定义通常有以下三种级别:

  • 最多一次: 消息不会被重复发送,最多被传输一次,但也有可能一次不传输。
  • 最少一次: 消息不会被漏发送,最少被传输一次,但也有可能被重复传输.
  • 精确的一次(Exactly once): 不会漏传输也不会重复传输,每个消息都传输被一次而且仅仅被传输一次,这是大家所期望的。

Kafka 的机制和 git 有点类似,有一个 commit 的概念,一旦提交且 broker 在工作,那么数据就不会丢失。如果 producer 发布消息时发生了网络错误,但又不确定实在提交之前发生的还是提交之后发生的,这种情况虽然不常见,但是必须考虑进去,现在Kafka版本还没有解决这个问题,将来的版本正在努力尝试解决。

并不是所有的情况都需要“精确的一次”这样高的级别,Kafka 允许 producer 灵活的指定级别。比如 producer 可以指定必须等待消息被提交的通知,或者完全的异步发送消息而不等待任何通知,或者仅仅等待 leader 声明它拿到了消息(followers没有必要)。

现在从 consumer 的方面考虑这个问题,所有的副本都有相同的日志文件和相同的offset,consumer 维护自己消费的消息的 offset。如果 consumer 崩溃了,会有另外一个 consumer 接着消费消息,它需要从一个合适的 offset 继续处理。这种情况下可以有以下选择:

  • consumer 可以先读取消息,然后将 offset 写入日志文件中,然后再处理消息。这存在一种可能就是在存储 offset 后还没处理消息就 crash 了,新的 consumer 继续从这个 offset 处理,那么就会有些消息永远不会被处理,这就是上面说的『最多一次』
  • consumer 可以先读取消息,处理消息,最后记录o ffset,当然如果在记录 offset 之前就 crash 了,新的 consumer 会重复的消费一些消息,这就是上面说的『最少一次』
  • 『精确一次』可以通过将提交分为两个阶段来解决:保存了 offset 后提交一次,消息处理成功之后再提交一次。但是还有个更简单的做法:将消息的 offset 和消息被处理后的结果保存在一起。比如用 Hadoop ETL 处理消息时,将处理后的结果和 offset 同时保存在 HDFS 中,这样就能保证消息和 offser 同时被处理了

性能优化

Kafka 在提高效率方面做了很大努力。Kafka 的一个主要使用场景是处理网站活动日志,吞吐量是非常大的,每个页面都会产生好多次写操作。读方面,假设每个消息只被消费一次,读的量的也是很大的,Kafka 也尽量使读的操作更轻量化。

线性读写的情况下影响磁盘性能问题大约有两个方面:太多的琐碎的 I/O 操作和太多的字节拷贝。I/O 问题发生在客户端和服务端之间,也发生在服务端内部的持久化的操作中。

消息集(message set)

为了避免这些问题,Kafka 建立了消息集(message set)的概念,将消息组织到一起,作为处理的单位。以消息集为单位处理消息,比以单个的消息为单位处理,会提升不少性能。Producer 把消息集一块发送给服务端,而不是一条条的发送;服务端把消息集一次性的追加到日志文件中,这样减少了琐碎的 I/O 操作。consumer 也可以一次性的请求一个消息集。

另外一个性能优化是在字节拷贝方面。在低负载的情况下这不是问题,但是在高负载的情况下它的影响还是很大的。为了避免这个问题,Kafka 使用了标准的二进制消息格式,这个格式可以在 producer, broker 和 producer 之间共享而无需做任何改动。

zero copy

Broker 维护的消息日志仅仅是一些目录文件,消息集以固定队的格式写入到日志文件中,这个格式 producer 和 consumer 是共享的,这使得 Kafka 可以一个很重要的点进行优化:消息在网络上的传递。现代的 unix 操作系统提供了高性能的将数据从页面缓存发送到 socket 的系统函数,在 linux 中,这个函数是 sendfile

为了更好的理解 sendfile 的好处,我们先来看下一般将数据从文件发送到 socket 的数据流向:

  • 操作系统把数据从文件拷贝内核中的页缓存中
  • 应用程序从页缓存从把数据拷贝自己的内存缓存中
  • 应用程序将数据写入到内核中 socket 缓存中
  • 操作系统把数据从 socket 缓存中拷贝到网卡接口缓存,从这里发送到网络上。

这显然是低效率的,有 4 次拷贝和 2 次系统调用。sendfile 通过直接将数据从页面缓存发送网卡接口缓存,避免了重复拷贝,大大的优化了性能。

在一个多consumers的场景里,数据仅仅被拷贝到页面缓存一次而不是每次消费消息的时候都重复的进行拷贝。这使得消息以近乎网络带宽的速率发送出去。这样在磁盘层面你几乎看不到任何的读操作,因为数据都是从页面缓存中直接发送到网络上去了。

数据压缩

很多时候,性能的瓶颈并非CPU或者硬盘而是网络带宽,对于需要在数据中心之间传送大量数据的应用更是如此。当然用户可以在没有 Kafka 支持的情况下各自压缩自己的消息,但是这将导致较低的压缩率,因为相比于将消息单独压缩,将大量文件压缩在一起才能起到最好的压缩效果。

Kafka 采用了端到端的压缩:因为有『消息集』的概念,客户端的消息可以一起被压缩后送到服务端,并以压缩后的格式写入日志文件,以压缩的格式发送到 consumer,消息从 producer 发出到 consumer 拿到都被是压缩的,只有在 consumer 使用的时候才被解压缩,所以叫做『端到端的压缩』。Kafka支持GZIP和Snappy压缩协议。

每个kafka broker中配置文件server.properties默认必须配置的属性如下:

  1. broker.id=0
  2. num.network.threads=2
  3. num.io.threads=8
  4. socket.send.buffer.bytes=1048576
  5. socket.receive.buffer.bytes=1048576
  6. socket.request.max.bytes=104857600
  7. log.dirs=/tmp/kafka-logs
  8. num.partitions=2
  9. log.retention.hours=168
  10. log.segment.bytes=536870912
  11. log.retention.check.interval.ms=60000
  12. log.cleaner.enable=false
  13. zookeeper.connect=localhost:2181
  14. zookeeper.connection.timeout.ms=1000000

server.properties中所有配置参数说明(解释)如下列表:

参数 说明(解释)
broker.id =0 每一个broker在集群中的唯一表示,要求是正数。当该服务器的IP地址发生改变时,broker.id没有变化,则不会影响consumers的消息情况
log.dirs=/data/kafka-logs kafka数据的存放地址,多个地址的话用逗号分割,多个目录分布在不同磁盘上可以提高读写性能  /data/kafka-logs-1,/data/kafka-logs-2
port =9092 broker server服务端口
message.max.bytes =6525000 表示消息体的最大大小,单位是字节
num.network.threads =4 broker处理消息的最大线程数,一般情况下数量为cpu核数
num.io.threads =8 broker处理磁盘IO的线程数,数值为cpu核数2倍
background.threads =4 一些后台任务处理的线程数,例如过期消息文件的删除等,一般情况下不需要去做修改
queued.max.requests =500 等待IO线程处理的请求队列最大数,若是等待IO的请求超过这个数值,那么会停止接受外部消息,应该是一种自我保护机制。
host.name broker的主机地址,若是设置了,那么会绑定到这个地址上,若是没有,会绑定到所有的接口上,并将其中之一发送到ZK,一般不设置
socket.send.buffer.bytes=100*1024 socket的发送缓冲区,socket的调优参数SO_SNDBUFF
socket.receive.buffer.bytes =100*1024 socket的接受缓冲区,socket的调优参数SO_RCVBUFF
socket.request.max.bytes =100*1024*1024 socket请求的最大数值,防止serverOOM,message.max.bytes必然要小于socket.request.max.bytes,会被topic创建时的指定参数覆盖
log.segment.bytes =1024*1024*1024 topic的分区是以一堆segment文件存储的,这个控制每个segment的大小,会被topic创建时的指定参数覆盖
log.roll.hours =24*7 这个参数会在日志segment没有达到log.segment.bytes设置的大小,也会强制新建一个segment会被 topic创建时的指定参数覆盖
log.cleanup.policy = delete 日志清理策略选择有:delete和compact主要针对过期数据的处理,或是日志文件达到限制的额度,会被 topic创建时的指定参数覆盖
log.retention.minutes=300

log.retention.hours=24

数据文件保留多长时间, 存储的最大时间超过这个时间会根据log.cleanup.policy设置数据清除策略

log.retention.bytes和log.retention.minutes或log.retention.hours任意一个达到要求,都会执行删除

有2删除数据文件方式:

按照文件大小删除:log.retention.bytes

按照2中不同时间粒度删除:分别为分钟,小时

log.retention.bytes=-1 topic每个分区的最大文件大小,一个topic的大小限制 = 分区数*log.retention.bytes。-1没有大小限log.retention.bytes和log.retention.minutes任意一个达到要求,都会执行删除,会被topic创建时的指定参数覆盖
log.retention.check.interval.ms=5minutes 文件大小检查的周期时间,是否处罚 log.cleanup.policy中设置的策略
log.cleaner.enable=false 是否开启日志清理
log.cleaner.threads = 2 日志清理运行的线程数
log.cleaner.io.max.bytes.per.second=None 日志清理时候处理的最大大小
log.cleaner.dedupe.buffer.size=500*1024*1024 日志清理去重时候的缓存空间,在空间允许的情况下,越大越好
log.cleaner.io.buffer.size=512*1024 日志清理时候用到的IO块大小一般不需要修改
log.cleaner.io.buffer.load.factor =0.9 日志清理中hash表的扩大因子一般不需要修改
log.cleaner.backoff.ms =15000 检查是否处罚日志清理的间隔
log.cleaner.min.cleanable.ratio=0.5 日志清理的频率控制,越大意味着更高效的清理,同时会存在一些空间上的浪费,会被topic创建时的指定参数覆盖
log.cleaner.delete.retention.ms =1day 对于压缩的日志保留的最长时间,也是客户端消费消息的最长时间,同log.retention.minutes的区别在于一个控制未压缩数据,一个控制压缩后的数据。会被topic创建时的指定参数覆盖
log.index.size.max.bytes =10*1024*1024 对于segment日志的索引文件大小限制,会被topic创建时的指定参数覆盖
log.index.interval.bytes =4096 当执行一个fetch操作后,需要一定的空间来扫描最近的offset大小,设置越大,代表扫描速度越快,但是也更好内存,一般情况下不需要搭理这个参数
log.flush.interval.messages=None

例如log.flush.interval.messages=1000

表示每当消息记录数达到1000时flush一次数据到磁盘

log文件”sync”到磁盘之前累积的消息条数,因为磁盘IO操作是一个慢操作,但又是一个”数据可靠性"的必要手段,所以此参数的设置,需要在"数据可靠性"与"性能"之间做必要的权衡.如果此值过大,将会导致每次"fsync"的时间较长(IO阻塞),如果此值过小,将会导致"fsync"的次数较多,这也意味着整体的client请求有一定的延迟.物理server故障,将会导致没有fsync的消息丢失.
log.flush.scheduler.interval.ms =3000 检查是否需要固化到硬盘的时间间隔
log.flush.interval.ms = None

例如:log.flush.interval.ms=1000

表示每间隔1000毫秒flush一次数据到磁盘

仅仅通过interval来控制消息的磁盘写入时机,是不足的.此参数用于控制"fsync"的时间间隔,如果消息量始终没有达到阀值,但是离上一次磁盘同步的时间间隔达到阀值,也将触发.
log.delete.delay.ms =60000 文件在索引中清除后保留的时间一般不需要去修改
log.flush.offset.checkpoint.interval.ms =60000 控制上次固化硬盘的时间点,以便于数据恢复一般不需要去修改
auto.create.topics.enable =true 是否允许自动创建topic,若是false,就需要通过命令创建topic
default.replication.factor =1 是否允许自动创建topic,若是false,就需要通过命令创建topic
num.partitions =1 每个topic的分区个数,若是在topic创建时候没有指定的话会被topic创建时的指定参数覆盖
以下是kafka中Leader,replicas配置参数
controller.socket.timeout.ms =30000 partition leader与replicas之间通讯时,socket的超时时间
controller.message.queue.size=10 partition leader与replicas数据同步时,消息的队列尺寸
replica.lag.time.max.ms =10000 replicas响应partition leader的最长等待时间,若是超过这个时间,就将replicas列入ISR(in-sync replicas),并认为它是死的,不会再加入管理中
replica.lag.max.messages =4000 如果follower落后与leader太多,将会认为此follower[或者说partition relicas]已经失效

##通常,在follower与leader通讯时,因为网络延迟或者链接断开,总会导致replicas中消息同步滞后

##如果消息之后太多,leader将认为此follower网络延迟较大或者消息吞吐能力有限,将会把此replicas迁移

##到其他follower中.

##在broker数量较少,或者网络不足的环境中,建议提高此值.

replica.socket.timeout.ms=30*1000 follower与leader之间的socket超时时间
replica.socket.receive.buffer.bytes=64*1024 leader复制时候的socket缓存大小
replica.fetch.max.bytes =1024*1024 replicas每次获取数据的最大大小
replica.fetch.wait.max.ms =500 replicas同leader之间通信的最大等待时间,失败了会重试
replica.fetch.min.bytes =1 fetch的最小数据尺寸,如果leader中尚未同步的数据不足此值,将会阻塞,直到满足条件
num.replica.fetchers=1 leader进行复制的线程数,增大这个数值会增加follower的IO
replica.high.watermark.checkpoint.interval.ms =5000 每个replica检查是否将最高水位进行固化的频率
controlled.shutdown.enable =false 是否允许控制器关闭broker ,若是设置为true,会关闭所有在这个broker上的leader,并转移到其他broker
controlled.shutdown.max.retries =3 控制器关闭的尝试次数
controlled.shutdown.retry.backoff.ms =5000 每次关闭尝试的时间间隔
leader.imbalance.per.broker.percentage =10 leader的不平衡比例,若是超过这个数值,会对分区进行重新的平衡
leader.imbalance.check.interval.seconds =300 检查leader是否不平衡的时间间隔
offset.metadata.max.bytes 客户端保留offset信息的最大空间大小
kafka中zookeeper参数配置
zookeeper.connect = localhost:2181 zookeeper集群的地址,可以是多个,多个之间用逗号分割 hostname1:port1,hostname2:port2,hostname3:port3
zookeeper.session.timeout.ms=6000 ZooKeeper的最大超时时间,就是心跳的间隔,若是没有反映,那么认为已经死了,不易过大
zookeeper.connection.timeout.ms =6000 ZooKeeper的连接超时时间
zookeeper.sync.time.ms =2000 ZooKeeper集群中leader和follower之间的同步实际那

参考链接

本文地址: https://www.xiongge.club/biancheng/elk/1341.html

转载请注明:熊哥clubKafka_2.11-0.10.0.1 详细使用指南【建议收藏】

©熊哥club,本站推荐使用的主机:阿里云,CDN建议使用七牛云


关注微信公众号『熊哥club』

免费提供IT技术指导交流
  关注博主不迷路~

喜欢 (5)
[您的支持是我最大的动力]
分享 (0)
发表我的评论
取消评论
表情

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址
×
订阅图标按钮