kafka的一些调试手段 kafka调度
bigegpt 2024-10-19 02:49 3 浏览
简介
Kafka是一个使用Scala和Java编写的快速、可扩展、高吞吐、可容错的分布式发布订阅消息系统。Kafka具有高吞吐量、内置分区、支持数据副本和容错的特性,适合在大规模消息处理场景中使用。
在发布-订阅消息系统中,消息被持久化到一个topic中(即存放在磁盘中),消费者可以订阅一个或多个topic,消费者可以消费该topic中所有的数据,同一条数据可以被多个消费者消费,数据被消费后不会立马删除。在发布-订阅消息系统中,消息的生产者称为发布者,消费者称为订阅者
Kafka将消息持久化到磁盘中,并对消息创建了备份(replica)保证了数据的安全。Kafka在保证了较高的处理速度的同时,又能保证数据处理的低延迟和数据的零丢失。
Kafka的优势在于:
- 可靠性:Kafka是一个具有分区机制、副本机制和容错机制的分布式消息系统, 即消息被持久化到本地磁盘,并且支持数据备份防止数据丢失。如副本数量为5,则允许4(n-1)个节点失败)
- 可扩展性:Kafka消息系统支持集群规模的热扩展
- 高性能:Kafka在数据发布和订阅过程中都能保证数据的高吞吐量。即便在TB级数据存储的情况下,仍然能保证稳定的性能。Kafka每秒可以处理几十万条消息,它的延迟最低只有几毫秒
原理简述
Kafka架构
在我们的实现中,每一个服务器就是一个broker(或者叫kafka的实例), Kafka的broker是无状态的,broker使用Zookeeper维护集群的状态。Leader的选举也由Zookeeper负责。
Zookeeper负责维护和协调broker。当Kafka系统中新增了broker或者某个broker发生故障失效时,由ZooKeeper通知生产者和消费者。生产者和消费者依据Zookeeper的broker状态信息与broker协调数据的发布和订阅任务。
如,检查zookeeper配置
- cat /opt/zookeeper/conf/zoo.cfg server.1 = 0.0.0.0:2888:3888server.2 = 172.20.1.12:2888:3888server.3 = 172.20.1.13:2888:3888server.4 = 172.20.1.14:2888:3888server.5 = 172.20.1.15:2888:3888clientPort=2181dataDir=/home/zookeeper/data
配置说明:
- server.1: 表示第一号服务器(1也叫myid,用中主机在集群的指定标识)
- 172.20.1.12: 主机在集群中的IP地址
- 2888: Master与Slave之间的通讯端口
- 3888: Leader选举的端口(集群启动时或leader主机挂掉时进行新的leader选举)
- ClientPort: 客户端连接 Zookeeper 服务器的端口,Zookeeper 会监听这个端口,接受客户端的访问请求(如查询topic list的命令/opt/kafka/bin/kafka-topics.sh --zookeeper 172.20.1.11:2181 –list)
- dataDir: 快照日志的存储路径
可通/opt/zookeeper/bin/zkServer.sh status命令查看各个主机的服务状态
注: 5台主机的集群中仅有一个Leader,其余4台都是follower
Kafka的配置查看
cat /opt/kafka/config/server.properties
broker.id=1
#port=9092
zookeeper.connect=172.20.1.11:2181,172.20.1.12:2181,172.20.1.13:2181,172.20.1.14:2181,172.20.1.15:2181
log.dirs=/home/kafka-logs
default.replication.factor=5
auto.create.topics.enable=true
zookeeper.connection.timeout.ms=6000
log.retention.hours=1
log.dirs=/home/kafka-logs
配置说明:
- broker.id: 当前机器在集群中的唯一标识,和zookeeper的myid性质一样
- port=9092: 当前kafka对外提供服务的端口默认是9092
- num.io.threads:是borker进行I/O处理的线程数
- log.dir: 消息存放的目录,这个目录可以配置为“,”逗号分割的表达式,上面的num.io.threads要大于这个目录的个数。若配置多个目录,新创建的topic在消息持久化时会在放置在几个目录中分区最少的目录中。
- log.retention.hours=1 : 默认消息的最大持久化时间,我们选择一个小时
- default.replication.factor=5: #kafka保存消息的副本数,如果一个副本失效了,其余的还可以继续提供服务
基本概念
topic
Topic 被称为主题,在 kafka 中,使用一个类别属性来划分消息的所属类,划分消息的这个类称为 topic。topic 相当于消息的分配标签,是一个逻辑概念。主题好比是数据库的表,或者文件系统中的文件夹。
Partition
topic 中的消息被分割为一个或多个 partition,它是一个物理概念,对应到系统上的就是一个或若干个目录,一个分区就是一个 提交日志目录。消息以追加的形式写入分区,以先后顺序的方式读取。
Partition能水平扩展客户端的读写性能,是高吞吐量的保障(通过将分区分布在不同的服务器上,也就是说,一个主题可以跨越多个服务器,以此来提供比单个服务器更强大的性能)。简单来说,Partition就是一块保存具体数据的空间,本质就是磁盘上存放数据的文件夹,所以Partition是不能跨Broker存在(其他broker上存放的实际是Leader partition的副本),也不能在同一个Broker上跨磁盘。对于一个Topic,可以根据需要设定Partition的个数;Kafka默认的Partition个数num.partitions为1(/opt/kafka/config/server.properties),表示该Topic的所有数据均写入至一个文件夹下;用户也可以在通过指定–partitions来定义分区数(新建topic或通过—alter参数修改—注意是增加)。在数据持久化时,每条消息都是根据一定的分区规则路由到对应的Partition中,并append在log文件的尾部;在同一个Partition中消息是顺序写入的且始终保持有序性;但是不同Partition之间不能保证消息的有序性。
注意:由于一个主题包含无数个分区,因此无法保证在整个 topic 中有序,但是单个 Partition 分区可以保证有序。消息被迫加写入每个分区的尾部。Kafka 通过分区来实现数据冗余和伸缩性
Replica(副本)
Replica是Kafka架构中一个比较重要的概念,是系统高可用的一种保障。Replica逻辑上是作用于Topic的,但实际上是体现在每一个Partition上。例如:有一个Topic(如CMA_DATA),分区(partitions)数为2(分别为P0,P1),副本因子(replication-factor)数也为3;其本质就是该Topic一共有3个P0分区,3个P1分区。这样的设计在某种意义上就很大程度的提高了系统的容错率。
- Leader replica: 每一个Partition有且只有一个Replica可以作为Leader,其职责是负责处理所有Producer、Consumer的请求;与此同时,Leader还负责监管和维护ISR(In-Sync Replicas:副本同步队列)中所有follower的滞后状态。
- Follower replica: 每个Partition中除了Leader以外的所有Replica均为follower,其职责是通过Fetch Request拉取leader replica的数据进行同步(不处理任何来自客户端的请求)
为了尽可能的提升服务的可用性和容错率,Kafka遵循如下的分区分配原则:
- 所有的replica要尽可能的平均分配到集群中的每一台broker上
- 尽可能保证同一个partition的leader和follower分在不同的broker上
- 如果集群跨机架,尽可能的保证每个partition的replica分配到不同的机架上
消息有序性
Partition 中的每条记录都会被分配一个唯一的序号,称为 Offset(偏移量)。
Offset是一个递增的、不可变的数字,由 kafka 自动维护。
当一条记录写入Partition 的时候,它就被追加到 log 文件的末尾,并被分配一个序号作为Offset。向Topic 发送消息的时候,实际上是被写入某一个Partition,并赋予Offset。
一个Topic 如果有多个Partition 的话,那么从Topic 这个层面来看,消息是无序的。
但单独看Partition 的话,Partition 内部消息是有序的。
所以,一个Partition 内部消息有序,一个Topic 跨Partition 是无序的。
如果强制要求Topic 整体有序,就只能让Topic 只有一个Partition。
Segment
Partition是用来存储数据的,但并不是最小的数据存储单元。Partition下还可以细分成Segment,每个Partition是由一个或多个Segment组成。每个Segment分别对应两个文件:一个是以.index结尾的索引文件,另一个是以.log结尾的数据文件,且两个文件的文件名完全相同。所有的Segment均存在于所属Partition的目录下。
Segment的必要性:如果以partition作为数据存储的最小单元,那么partition将会是一个很大的数据文件,且数据量是持续递增的;当进行过期数据清理或消费指定offset数据时,操作如此的大文件将会是一个很严重的性能问题
broker
Kafka 集群包含一个或多个服务器,每个 Kafka 中服务器被称为 broker。broker 接收来自生产者的消息,为消息设置偏移量,并提交消息到磁盘保存。broker 为消费者提供服务,对读取分区的请求作出响应,返回已经提交到磁盘上的消息。
broker 是集群的组成部分,每个集群中都会有一个 broker 同时充当了 集群控制器(Leader)的角色,它是由集群中的活跃成员选举出来的。每个集群中的成员都有可能充当 Leader,Leader 负责管理工作,包括将分区分配给 broker 和监控 broker。集群中,一个分区从属于一个 Leader,但是一个分区可以分配给多个 broker(非Leader),这时候会发生分区复制。这种复制的机制为分区提供了消息冗余,如果一个 broker 失效,那么其他活跃用户会重新选举一个 Leader 接管。
Producer
生产者,即消息的发布者,其会将某 topic 的消息发布到相应的 partition 中。生产者在默认情况下把消息均衡地分布到主题的所有分区上,而并不关心特定消息会被写到哪个分区。不过,在某些情况下,生产者会把消息直接写到指定的分区。
Consumer
消费者,即消息的使用者,一个消费者可以消费多个 topic 的消息,对于某一个 topic 的消息,其只会消费同一个 partition 中的消息。
每个消费者组都有一个组id!同一个消费组者的消费者可以消费同一topic下不同分区的数据,但是不会组内多个消费者消费同一分区的数据!
即组内消费者小于Topic分区数量时,会存在某个消费者消费多个分区消息的情况;而组内消费者大于分区数量时,多出来的消费者不消费任何分区的消息,即尽量消费组的consumer数量与分区数量一致
调试
创建topic
如果kafka broker中的config/server.properties配置文件中配置了auto.create.topics.enable参数为true(默认值就是true),那么当生产者向一个尚未创建的topic发送消息时,会自动创建一个num.partitions(默认值为1)个分区和default.replication.factor(我们配置为5)个副本的对应topic。不过我们一般不建议将auto.create.topics.enable参数设置为true,因为这个参数会影响topic的管理与维护。
通过kafka提供的kafka-topics.sh脚本来创建,并且我们也建议通过这种方式(或者相关的变种方式)来创建topic。
如,通过kafka-topics.sh 脚本来创建一个名为PP8-LC1-LC2并且副本数为2、分区数为4的topic
/opt/kafka/bin/kafka-topics.sh --create --topic windtest --replication-factor 5 --partitions 4 --zookeeper localhost:2181
显示topic列表
/opt/kafka/bin/kafka-topics.sh --zookeeper 172.20.1.11:2181 --list
查看具体的某个topic信息
列出了某个topic的partition数量、replica因子以及每个partition的leader、replica信息,即创建的windtest topic4个分区每个分区的leader是哪个broker, replica情况等
/opt/kafka/bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic windtest
发布和消费消息
1. 在新创建的topic上,利用脚本创建一个发布者
/opt/kafka/bin/kafka-console-producer.sh --broker-list 172.20.1.11:9092,172.20.1.12:9092,172.20.1.13:9092,172.20.1.14:9092,172.20.1.15:9092 --topic windtest
2. 在每个node上创建一个订阅者
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server 172.20.1.11:9092,172.20.1.12:9092,172.20.1.13:9092,172.20.1.14:9092,172.20.1.15:9092 --topic windtest --from-beginning
3. 在发布者窗口发送消息(回车一次为一条消息),检查每个订阅者是否收到消息
前面4条消息时,5个订阅者均在,最后一条消息时,仅有三个订阅者存在
检查topic的offset
/opt/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 172.20.1.12:9092 --topic windtest
windtest:2:2
windtest:1:1
windtest:3:1
windtest:0:1
可见5条消息通过轮序的方式在每个partition中存放
查看消费组
/opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server 172.20.1.12:9092 --list
命令行consumer方式时,未指定消费组,自动生成了五个消费组(如上图所示)
注:5个client在5个不同的消费组中,因此,发布的消息被5个client分别消费(即消费了5次)
同组消费
1. 将5个消费者编入同一个消费组
在5个设备上分别创建一个user.properties文件,输入group.id=consoleGroup1
[root@node1 ~]# cat ./user.properties
group.id=consoleGroup1
在五台主机上启动消费者
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server 172.20.1.12:9092 --topic windtest --consumer.config ./user.properties
2. 启动消息发布者
启动前,记录每个partition的offset
发布一条消息
可见通一个组下的5个消费者,仅有一个client消费了消息
3. 检查消费组的消费情况
/opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server 172.20.1.15:9092 --describe --group consoleGroup1
在producer中发布多条消息,可见发布的消息会以round-robin方式放在各个partition中
发布10条消息后,由于我们只有4个partition, 而组内的消费者有5个,所以其中一个消费者始终没有消费任何消息,如下图所示的172.20.1.15这个消费者
注: LAG表示消息消费速度与生产速度之间的差异(若LAG值太大,证明消费能力不足)
4. 组内消费成员固定在一个partition上消费
通过比对partition offset与client的消费情况可见,每个client在启动后,分配在哪个partition就会一直在这个partition消费
消费分区策略
1. 消息生产时,若指定了分区信息,消息直接投递到该分区
2. 如果未指定消息生产分区,但是指定了key, 则基于该key的hash值分配分区
3. 若分区和key都未制定,消息按照round-robin(轮询)方式投递到各个分区
4. 同一条消息只能被消费组中一个消费者消费
5. 当分区数量大于组中消费者时,会出现组内某一个消费者负责多个分区的情况
6. 当分区数量小于组内消费者数目时,会出现多出来的消费者成员没有分区对应
查看partition消息内容
通过如下命令查看那个partition下有消息
/opt/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 172.20.1.12:9092 --topic CMA_DATA
然后通过如下命令查看这些消息的内容(/home/kafka-logs是日志存放位置,CMA_DATA是topic名字,23是其分区号)
/opt/kafka/bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /home/kafka-logs/CMA_DATA-23/00000000000000000000.log --deep-iteration --print-data-log | more
修改topic分区
/opt/kafka/bin/kafka-topics.sh --zookeeper 172.20.1.11:2181 --alter --topic windtest --partitions 5
注意: 只能增加partition, 若减少已有消息的partition时,该partition的消息如何处理是个问题,实际操作时,我看到如下错误信息
修改后,已经启动且未能分配到partition的消费者会自动分配到新的partition进行消费
删除topic
/opt/kafka/bin/kafka-topics.sh --delete --topic windtest --zookeeper 172.20.1.11:2181
删除前,在log.dirs=/home/kafka-logs目录下,存在topic windtest 4个partition的消息存放目录,当删除后,4个partition目录也同时删除(实际上是标记为删除状态)
Kafka运行日志查看
在/opt/kafka/config/log4j.properties文件可查看到kafka的运行日志存放路径(如,将日志存在ceph网络磁盘的目录下----每个主机一个目录)
kafka.logs.dir=/usr/admin/log/hosts/node1/kafka/
同时也可log配置文件中修改日志输出等级
相关推荐
- 得物可观测平台架构升级:基于GreptimeDB的全新监控体系实践
-
一、摘要在前端可观测分析场景中,需要实时观测并处理多地、多环境的运行情况,以保障Web应用和移动端的可用性与性能。传统方案往往依赖代理Agent→消息队列→流计算引擎→OLAP存储...
- warm-flow新春版:网关直连和流程图重构
-
本期主要解决了网关直连和流程图重构,可以自此之后可支持各种复杂的网关混合、多网关直连使用。-新增Ruoyi-Vue-Plus优秀开源集成案例更新日志[feat]导入、导出和保存等新增json格式支持...
- 扣子空间体验报告
-
在数字化时代,智能工具的应用正不断拓展到我们工作和生活的各个角落。从任务规划到项目执行,再到任务管理,作者深入探讨了这款工具在不同场景下的表现和潜力。通过具体的应用实例,文章展示了扣子空间如何帮助用户...
- spider-flow:开源的可视化方式定义爬虫方案
-
spider-flow简介spider-flow是一个爬虫平台,以可视化推拽方式定义爬取流程,无需代码即可实现一个爬虫服务。spider-flow特性支持css选择器、正则提取支持JSON/XML格式...
- solon-flow 你好世界!
-
solon-flow是一个基础级的流处理引擎(可用于业务规则、决策处理、计算编排、流程审批等......)。提供有“开放式”驱动定制支持,像jdbc有mysql或pgsql等驱动,可...
- 新一代开源爬虫平台:SpiderFlow
-
SpiderFlow:新一代爬虫平台,以图形化方式定义爬虫流程,不写代码即可完成爬虫。-精选真开源,释放新价值。概览Spider-Flow是一个开源的、面向所有用户的Web端爬虫构建平台,它使用Ja...
- 通过 SQL 训练机器学习模型的引擎
-
关注薪资待遇的同学应该知道,机器学习相关的岗位工资普遍偏高啊。同时随着各种通用机器学习框架的出现,机器学习的门槛也在逐渐降低,训练一个简单的机器学习模型变得不那么难。但是不得不承认对于一些数据相关的工...
- 鼠须管输入法rime for Mac
-
鼠须管输入法forMac是一款十分新颖的跨平台输入法软件,全名是中州韵输入法引擎,鼠须管输入法mac版不仅仅是一个输入法,而是一个输入法算法框架。Rime的基础架构十分精良,一套算法支持了拼音、...
- Go语言 1.20 版本正式发布:新版详细介绍
-
Go1.20简介最新的Go版本1.20在Go1.19发布六个月后发布。它的大部分更改都在工具链、运行时和库的实现中。一如既往,该版本保持了Go1的兼容性承诺。我们期望几乎所...
- iOS 10平台SpriteKit新特性之Tile Maps(上)
-
简介苹果公司在WWDC2016大会上向人们展示了一大批新的好东西。其中之一就是SpriteKitTileEditor。这款工具易于上手,而且看起来速度特别快。在本教程中,你将了解关于TileE...
- 程序员简历例句—范例Java、Python、C++模板
-
个人简介通用简介:有良好的代码风格,通过添加注释提高代码可读性,注重代码质量,研读过XXX,XXX等多个开源项目源码从而学习增强代码的健壮性与扩展性。具备良好的代码编程习惯及文档编写能力,参与多个高...
- Telerik UI for iOS Q3 2015正式发布
-
近日,TelerikUIforiOS正式发布了Q32015。新版本新增对XCode7、Swift2.0和iOS9的支持,同时还新增了对数轴、不连续的日期时间轴等;改进TKDataPoin...
- ios使用ijkplayer+nginx进行视频直播
-
上两节,我们讲到使用nginx和ngixn的rtmp模块搭建直播的服务器,接着我们讲解了在Android使用ijkplayer来作为我们的视频直播播放器,整个过程中,需要注意的就是ijlplayer编...
- IOS技术分享|iOS快速生成开发文档(一)
-
前言对于开发人员而言,文档的作用不言而喻。文档不仅可以提高软件开发效率,还能便于以后的软件开发、使用和维护。本文主要讲述Objective-C快速生成开发文档工具appledoc。简介apple...
- macOS下配置VS Code C++开发环境
-
本文介绍在苹果macOS操作系统下,配置VisualStudioCode的C/C++开发环境的过程,本环境使用Clang/LLVM编译器和调试器。一、前置条件本文默认前置条件是,您的开发设备已...
- 一周热门
- 最近发表
- 标签列表
-
- mybatiscollection (79)
- mqtt服务器 (88)
- keyerror (78)
- c#map (65)
- resize函数 (64)
- xftp6 (83)
- bt搜索 (75)
- c#var (76)
- mybatis大于等于 (64)
- xcode-select (66)
- httperror403.14-forbidden (63)
- logstashinput (65)
- hadoop端口 (65)
- dockernetworkconnect (63)
- esxi7 (63)
- vue阻止冒泡 (67)
- c#for循环 (63)
- oracle时间戳转换日期 (64)
- jquery跨域 (68)
- php写入文件 (73)
- java大写转小写 (63)
- kafkatools (66)
- mysql导出数据库 (66)
- jquery鼠标移入移出 (71)
- 取小数点后两位的函数 (73)