kafka的一些调试手段 kafka调度
bigegpt 2024-10-19 02:49 8 浏览
简介
Kafka是一个使用Scala和Java编写的快速、可扩展、高吞吐、可容错的分布式发布订阅消息系统。Kafka具有高吞吐量、内置分区、支持数据副本和容错的特性,适合在大规模消息处理场景中使用。
在发布-订阅消息系统中,消息被持久化到一个topic中(即存放在磁盘中),消费者可以订阅一个或多个topic,消费者可以消费该topic中所有的数据,同一条数据可以被多个消费者消费,数据被消费后不会立马删除。在发布-订阅消息系统中,消息的生产者称为发布者,消费者称为订阅者
Kafka将消息持久化到磁盘中,并对消息创建了备份(replica)保证了数据的安全。Kafka在保证了较高的处理速度的同时,又能保证数据处理的低延迟和数据的零丢失。
Kafka的优势在于:
- 可靠性:Kafka是一个具有分区机制、副本机制和容错机制的分布式消息系统, 即消息被持久化到本地磁盘,并且支持数据备份防止数据丢失。如副本数量为5,则允许4(n-1)个节点失败)
- 可扩展性:Kafka消息系统支持集群规模的热扩展
- 高性能:Kafka在数据发布和订阅过程中都能保证数据的高吞吐量。即便在TB级数据存储的情况下,仍然能保证稳定的性能。Kafka每秒可以处理几十万条消息,它的延迟最低只有几毫秒
原理简述
Kafka架构
在我们的实现中,每一个服务器就是一个broker(或者叫kafka的实例), Kafka的broker是无状态的,broker使用Zookeeper维护集群的状态。Leader的选举也由Zookeeper负责。
Zookeeper负责维护和协调broker。当Kafka系统中新增了broker或者某个broker发生故障失效时,由ZooKeeper通知生产者和消费者。生产者和消费者依据Zookeeper的broker状态信息与broker协调数据的发布和订阅任务。
如,检查zookeeper配置
- cat /opt/zookeeper/conf/zoo.cfg server.1 = 0.0.0.0:2888:3888server.2 = 172.20.1.12:2888:3888server.3 = 172.20.1.13:2888:3888server.4 = 172.20.1.14:2888:3888server.5 = 172.20.1.15:2888:3888clientPort=2181dataDir=/home/zookeeper/data
配置说明:
- server.1: 表示第一号服务器(1也叫myid,用中主机在集群的指定标识)
- 172.20.1.12: 主机在集群中的IP地址
- 2888: Master与Slave之间的通讯端口
- 3888: Leader选举的端口(集群启动时或leader主机挂掉时进行新的leader选举)
- ClientPort: 客户端连接 Zookeeper 服务器的端口,Zookeeper 会监听这个端口,接受客户端的访问请求(如查询topic list的命令/opt/kafka/bin/kafka-topics.sh --zookeeper 172.20.1.11:2181 –list)
- dataDir: 快照日志的存储路径
可通/opt/zookeeper/bin/zkServer.sh status命令查看各个主机的服务状态
注: 5台主机的集群中仅有一个Leader,其余4台都是follower
Kafka的配置查看
cat /opt/kafka/config/server.properties
broker.id=1
#port=9092
zookeeper.connect=172.20.1.11:2181,172.20.1.12:2181,172.20.1.13:2181,172.20.1.14:2181,172.20.1.15:2181
log.dirs=/home/kafka-logs
default.replication.factor=5
auto.create.topics.enable=true
zookeeper.connection.timeout.ms=6000
log.retention.hours=1
log.dirs=/home/kafka-logs
配置说明:
- broker.id: 当前机器在集群中的唯一标识,和zookeeper的myid性质一样
- port=9092: 当前kafka对外提供服务的端口默认是9092
- num.io.threads:是borker进行I/O处理的线程数
- log.dir: 消息存放的目录,这个目录可以配置为“,”逗号分割的表达式,上面的num.io.threads要大于这个目录的个数。若配置多个目录,新创建的topic在消息持久化时会在放置在几个目录中分区最少的目录中。
- log.retention.hours=1 : 默认消息的最大持久化时间,我们选择一个小时
- default.replication.factor=5: #kafka保存消息的副本数,如果一个副本失效了,其余的还可以继续提供服务
基本概念
topic
Topic 被称为主题,在 kafka 中,使用一个类别属性来划分消息的所属类,划分消息的这个类称为 topic。topic 相当于消息的分配标签,是一个逻辑概念。主题好比是数据库的表,或者文件系统中的文件夹。
Partition
topic 中的消息被分割为一个或多个 partition,它是一个物理概念,对应到系统上的就是一个或若干个目录,一个分区就是一个 提交日志目录。消息以追加的形式写入分区,以先后顺序的方式读取。
Partition能水平扩展客户端的读写性能,是高吞吐量的保障(通过将分区分布在不同的服务器上,也就是说,一个主题可以跨越多个服务器,以此来提供比单个服务器更强大的性能)。简单来说,Partition就是一块保存具体数据的空间,本质就是磁盘上存放数据的文件夹,所以Partition是不能跨Broker存在(其他broker上存放的实际是Leader partition的副本),也不能在同一个Broker上跨磁盘。对于一个Topic,可以根据需要设定Partition的个数;Kafka默认的Partition个数num.partitions为1(/opt/kafka/config/server.properties),表示该Topic的所有数据均写入至一个文件夹下;用户也可以在通过指定–partitions来定义分区数(新建topic或通过—alter参数修改—注意是增加)。在数据持久化时,每条消息都是根据一定的分区规则路由到对应的Partition中,并append在log文件的尾部;在同一个Partition中消息是顺序写入的且始终保持有序性;但是不同Partition之间不能保证消息的有序性。
注意:由于一个主题包含无数个分区,因此无法保证在整个 topic 中有序,但是单个 Partition 分区可以保证有序。消息被迫加写入每个分区的尾部。Kafka 通过分区来实现数据冗余和伸缩性
Replica(副本)
Replica是Kafka架构中一个比较重要的概念,是系统高可用的一种保障。Replica逻辑上是作用于Topic的,但实际上是体现在每一个Partition上。例如:有一个Topic(如CMA_DATA),分区(partitions)数为2(分别为P0,P1),副本因子(replication-factor)数也为3;其本质就是该Topic一共有3个P0分区,3个P1分区。这样的设计在某种意义上就很大程度的提高了系统的容错率。
- Leader replica: 每一个Partition有且只有一个Replica可以作为Leader,其职责是负责处理所有Producer、Consumer的请求;与此同时,Leader还负责监管和维护ISR(In-Sync Replicas:副本同步队列)中所有follower的滞后状态。
- Follower replica: 每个Partition中除了Leader以外的所有Replica均为follower,其职责是通过Fetch Request拉取leader replica的数据进行同步(不处理任何来自客户端的请求)
为了尽可能的提升服务的可用性和容错率,Kafka遵循如下的分区分配原则:
- 所有的replica要尽可能的平均分配到集群中的每一台broker上
- 尽可能保证同一个partition的leader和follower分在不同的broker上
- 如果集群跨机架,尽可能的保证每个partition的replica分配到不同的机架上
消息有序性
Partition 中的每条记录都会被分配一个唯一的序号,称为 Offset(偏移量)。
Offset是一个递增的、不可变的数字,由 kafka 自动维护。
当一条记录写入Partition 的时候,它就被追加到 log 文件的末尾,并被分配一个序号作为Offset。向Topic 发送消息的时候,实际上是被写入某一个Partition,并赋予Offset。
一个Topic 如果有多个Partition 的话,那么从Topic 这个层面来看,消息是无序的。
但单独看Partition 的话,Partition 内部消息是有序的。
所以,一个Partition 内部消息有序,一个Topic 跨Partition 是无序的。
如果强制要求Topic 整体有序,就只能让Topic 只有一个Partition。
Segment
Partition是用来存储数据的,但并不是最小的数据存储单元。Partition下还可以细分成Segment,每个Partition是由一个或多个Segment组成。每个Segment分别对应两个文件:一个是以.index结尾的索引文件,另一个是以.log结尾的数据文件,且两个文件的文件名完全相同。所有的Segment均存在于所属Partition的目录下。
Segment的必要性:如果以partition作为数据存储的最小单元,那么partition将会是一个很大的数据文件,且数据量是持续递增的;当进行过期数据清理或消费指定offset数据时,操作如此的大文件将会是一个很严重的性能问题
broker
Kafka 集群包含一个或多个服务器,每个 Kafka 中服务器被称为 broker。broker 接收来自生产者的消息,为消息设置偏移量,并提交消息到磁盘保存。broker 为消费者提供服务,对读取分区的请求作出响应,返回已经提交到磁盘上的消息。
broker 是集群的组成部分,每个集群中都会有一个 broker 同时充当了 集群控制器(Leader)的角色,它是由集群中的活跃成员选举出来的。每个集群中的成员都有可能充当 Leader,Leader 负责管理工作,包括将分区分配给 broker 和监控 broker。集群中,一个分区从属于一个 Leader,但是一个分区可以分配给多个 broker(非Leader),这时候会发生分区复制。这种复制的机制为分区提供了消息冗余,如果一个 broker 失效,那么其他活跃用户会重新选举一个 Leader 接管。
Producer
生产者,即消息的发布者,其会将某 topic 的消息发布到相应的 partition 中。生产者在默认情况下把消息均衡地分布到主题的所有分区上,而并不关心特定消息会被写到哪个分区。不过,在某些情况下,生产者会把消息直接写到指定的分区。
Consumer
消费者,即消息的使用者,一个消费者可以消费多个 topic 的消息,对于某一个 topic 的消息,其只会消费同一个 partition 中的消息。
每个消费者组都有一个组id!同一个消费组者的消费者可以消费同一topic下不同分区的数据,但是不会组内多个消费者消费同一分区的数据!
即组内消费者小于Topic分区数量时,会存在某个消费者消费多个分区消息的情况;而组内消费者大于分区数量时,多出来的消费者不消费任何分区的消息,即尽量消费组的consumer数量与分区数量一致
调试
创建topic
如果kafka broker中的config/server.properties配置文件中配置了auto.create.topics.enable参数为true(默认值就是true),那么当生产者向一个尚未创建的topic发送消息时,会自动创建一个num.partitions(默认值为1)个分区和default.replication.factor(我们配置为5)个副本的对应topic。不过我们一般不建议将auto.create.topics.enable参数设置为true,因为这个参数会影响topic的管理与维护。
通过kafka提供的kafka-topics.sh脚本来创建,并且我们也建议通过这种方式(或者相关的变种方式)来创建topic。
如,通过kafka-topics.sh 脚本来创建一个名为PP8-LC1-LC2并且副本数为2、分区数为4的topic
/opt/kafka/bin/kafka-topics.sh --create --topic windtest --replication-factor 5 --partitions 4 --zookeeper localhost:2181
显示topic列表
/opt/kafka/bin/kafka-topics.sh --zookeeper 172.20.1.11:2181 --list
查看具体的某个topic信息
列出了某个topic的partition数量、replica因子以及每个partition的leader、replica信息,即创建的windtest topic4个分区每个分区的leader是哪个broker, replica情况等
/opt/kafka/bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic windtest
发布和消费消息
1. 在新创建的topic上,利用脚本创建一个发布者
/opt/kafka/bin/kafka-console-producer.sh --broker-list 172.20.1.11:9092,172.20.1.12:9092,172.20.1.13:9092,172.20.1.14:9092,172.20.1.15:9092 --topic windtest
2. 在每个node上创建一个订阅者
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server 172.20.1.11:9092,172.20.1.12:9092,172.20.1.13:9092,172.20.1.14:9092,172.20.1.15:9092 --topic windtest --from-beginning
3. 在发布者窗口发送消息(回车一次为一条消息),检查每个订阅者是否收到消息
前面4条消息时,5个订阅者均在,最后一条消息时,仅有三个订阅者存在
检查topic的offset
/opt/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 172.20.1.12:9092 --topic windtest
windtest:2:2
windtest:1:1
windtest:3:1
windtest:0:1
可见5条消息通过轮序的方式在每个partition中存放
查看消费组
/opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server 172.20.1.12:9092 --list
命令行consumer方式时,未指定消费组,自动生成了五个消费组(如上图所示)
注:5个client在5个不同的消费组中,因此,发布的消息被5个client分别消费(即消费了5次)
同组消费
1. 将5个消费者编入同一个消费组
在5个设备上分别创建一个user.properties文件,输入group.id=consoleGroup1
[root@node1 ~]# cat ./user.properties
group.id=consoleGroup1
在五台主机上启动消费者
/opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server 172.20.1.12:9092 --topic windtest --consumer.config ./user.properties
2. 启动消息发布者
启动前,记录每个partition的offset
发布一条消息
可见通一个组下的5个消费者,仅有一个client消费了消息
3. 检查消费组的消费情况
/opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server 172.20.1.15:9092 --describe --group consoleGroup1
在producer中发布多条消息,可见发布的消息会以round-robin方式放在各个partition中
发布10条消息后,由于我们只有4个partition, 而组内的消费者有5个,所以其中一个消费者始终没有消费任何消息,如下图所示的172.20.1.15这个消费者
注: LAG表示消息消费速度与生产速度之间的差异(若LAG值太大,证明消费能力不足)
4. 组内消费成员固定在一个partition上消费
通过比对partition offset与client的消费情况可见,每个client在启动后,分配在哪个partition就会一直在这个partition消费
消费分区策略
1. 消息生产时,若指定了分区信息,消息直接投递到该分区
2. 如果未指定消息生产分区,但是指定了key, 则基于该key的hash值分配分区
3. 若分区和key都未制定,消息按照round-robin(轮询)方式投递到各个分区
4. 同一条消息只能被消费组中一个消费者消费
5. 当分区数量大于组中消费者时,会出现组内某一个消费者负责多个分区的情况
6. 当分区数量小于组内消费者数目时,会出现多出来的消费者成员没有分区对应
查看partition消息内容
通过如下命令查看那个partition下有消息
/opt/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 172.20.1.12:9092 --topic CMA_DATA
然后通过如下命令查看这些消息的内容(/home/kafka-logs是日志存放位置,CMA_DATA是topic名字,23是其分区号)
/opt/kafka/bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /home/kafka-logs/CMA_DATA-23/00000000000000000000.log --deep-iteration --print-data-log | more
修改topic分区
/opt/kafka/bin/kafka-topics.sh --zookeeper 172.20.1.11:2181 --alter --topic windtest --partitions 5
注意: 只能增加partition, 若减少已有消息的partition时,该partition的消息如何处理是个问题,实际操作时,我看到如下错误信息
修改后,已经启动且未能分配到partition的消费者会自动分配到新的partition进行消费
删除topic
/opt/kafka/bin/kafka-topics.sh --delete --topic windtest --zookeeper 172.20.1.11:2181
删除前,在log.dirs=/home/kafka-logs目录下,存在topic windtest 4个partition的消息存放目录,当删除后,4个partition目录也同时删除(实际上是标记为删除状态)
Kafka运行日志查看
在/opt/kafka/config/log4j.properties文件可查看到kafka的运行日志存放路径(如,将日志存在ceph网络磁盘的目录下----每个主机一个目录)
kafka.logs.dir=/usr/admin/log/hosts/node1/kafka/
同时也可log配置文件中修改日志输出等级
相关推荐
- 当Frida来“敲”门(frida是什么)
-
0x1渗透测试瓶颈目前,碰到越来越多的大客户都会将核心资产业务集中在统一的APP上,或者对自己比较重要的APP,如自己的主业务,办公APP进行加壳,流量加密,投入了很多精力在移动端的防护上。而现在挖...
- 服务端性能测试实战3-性能测试脚本开发
-
前言在前面的两篇文章中,我们分别介绍了性能测试的理论知识以及性能测试计划制定,本篇文章将重点介绍性能测试脚本开发。脚本开发将分为两个阶段:阶段一:了解各个接口的入参、出参,使用Python代码模拟前端...
- Springboot整合Apache Ftpserver拓展功能及业务讲解(三)
-
今日分享每天分享技术实战干货,技术在于积累和收藏,希望可以帮助到您,同时也希望获得您的支持和关注。架构开源地址:https://gitee.com/msxyspringboot整合Ftpserver参...
- Linux和Windows下:Python Crypto模块安装方式区别
-
一、Linux环境下:fromCrypto.SignatureimportPKCS1_v1_5如果导包报错:ImportError:Nomodulenamed'Crypt...
- Python 3 加密简介(python des加密解密)
-
Python3的标准库中是没多少用来解决加密的,不过却有用于处理哈希的库。在这里我们会对其进行一个简单的介绍,但重点会放在两个第三方的软件包:PyCrypto和cryptography上,我...
- 怎样从零开始编译一个魔兽世界开源服务端Windows
-
第二章:编译和安装我是艾西,上期我们讲述到编译一个魔兽世界开源服务端环境准备,那么今天跟大家聊聊怎么编译和安装我们直接进入正题(上一章没有看到的小伙伴可以点我主页查看)编译服务端:在D盘新建一个文件夹...
- 附1-Conda部署安装及基本使用(conda安装教程)
-
Windows环境安装安装介质下载下载地址:https://www.anaconda.com/products/individual安装Anaconda安装时,选择自定义安装,选择自定义安装路径:配置...
- 如何配置全世界最小的 MySQL 服务器
-
配置全世界最小的MySQL服务器——如何在一块IntelEdison为控制板上安装一个MySQL服务器。介绍在我最近的一篇博文中,物联网,消息以及MySQL,我展示了如果Partic...
- 如何使用Github Action来自动化编译PolarDB-PG数据库
-
随着PolarDB在国产数据库领域荣膺桂冠并持续获得广泛认可,越来越多的学生和技术爱好者开始关注并涉足这款由阿里巴巴集团倾力打造且性能卓越的关系型云原生数据库。有很多同学想要上手尝试,却卡在了编译数据...
- 面向NDK开发者的Android 7.0变更(ndk android.mk)
-
订阅Google官方微信公众号:谷歌开发者。与谷歌一起创造未来!受Android平台其他改进的影响,为了方便加载本机代码,AndroidM和N中的动态链接器对编写整洁且跨平台兼容的本机...
- 信创改造--人大金仓(Kingbase)数据库安装、备份恢复的问题纪要
-
问题一:在安装KingbaseES时,安装用户对于安装路径需有“读”、“写”、“执行”的权限。在Linux系统中,需要以非root用户执行安装程序,且该用户要有标准的home目录,您可...
- OpenSSH 安全漏洞,修补操作一手掌握
-
1.漏洞概述近日,国家信息安全漏洞库(CNNVD)收到关于OpenSSH安全漏洞(CNNVD-202407-017、CVE-2024-6387)情况的报送。攻击者可以利用该漏洞在无需认证的情况下,通...
- Linux:lsof命令详解(linux lsof命令详解)
-
介绍欢迎来到这篇博客。在这篇博客中,我们将学习Unix/Linux系统上的lsof命令行工具。命令行工具是您使用CLI(命令行界面)而不是GUI(图形用户界面)运行的程序或工具。lsoflsof代表&...
- 幻隐说固态第一期:固态硬盘接口类别
-
前排声明所有信息来源于网络收集,如有错误请评论区指出更正。废话不多说,目前固态硬盘接口按速度由慢到快分有这几类:SATA、mSATA、SATAExpress、PCI-E、m.2、u.2。下面我们来...
- 新品轰炸 影驰SSD多款产品登Computex
-
分享泡泡网SSD固态硬盘频道6月6日台北电脑展作为全球第二、亚洲最大的3C/IT产业链专业展,吸引了众多IT厂商和全球各地媒体的热烈关注,全球存储新势力—影驰,也积极参与其中,为广大玩家朋友带来了...
- 一周热门
- 最近发表
-
- 当Frida来“敲”门(frida是什么)
- 服务端性能测试实战3-性能测试脚本开发
- Springboot整合Apache Ftpserver拓展功能及业务讲解(三)
- Linux和Windows下:Python Crypto模块安装方式区别
- Python 3 加密简介(python des加密解密)
- 怎样从零开始编译一个魔兽世界开源服务端Windows
- 附1-Conda部署安装及基本使用(conda安装教程)
- 如何配置全世界最小的 MySQL 服务器
- 如何使用Github Action来自动化编译PolarDB-PG数据库
- 面向NDK开发者的Android 7.0变更(ndk android.mk)
- 标签列表
-
- mybatiscollection (79)
- mqtt服务器 (88)
- keyerror (78)
- c#map (65)
- resize函数 (64)
- xftp6 (83)
- bt搜索 (75)
- c#var (76)
- mybatis大于等于 (64)
- xcode-select (66)
- mysql授权 (74)
- 下载测试 (70)
- linuxlink (65)
- pythonwget (67)
- androidinclude (65)
- libcrypto.so (74)
- logstashinput (65)
- hadoop端口 (65)
- vue阻止冒泡 (67)
- jquery跨域 (68)
- php写入文件 (73)
- kafkatools (66)
- mysql导出数据库 (66)
- jquery鼠标移入移出 (71)
- 取小数点后两位的函数 (73)