kafka消息存储机制和原理 kafka消息保留机制
bigegpt 2024-10-19 02:48 8 浏览
消息的保存路径
消息发送端发送消息到 broker 上以后,消息是如何持久化的?
数据分片
kafka 使用日志文件的方式来保存生产者和发送者的消息,每条消息都有一个 offset 值来表示它在分区中的偏移量。Kafka 中存储的一般都是海量的消息数据,为了避免日志文件过大,一个分片 并不是直接对应在一个磁盘上的日志文件,而是对应磁盘上的一个目录,这个目录的命名规则是<topic_name>_<partition_id>。
比如创建一个名为firstTopic的topic,其中有3个partition,那么在 kafka 的数据目录(/tmp/kafka-log)中就有 3 个目录,firstTopic-0~3
多个分区在集群中多个broker上的分配方法
1.将所有 N Broker 和待分配的 i 个 Partition 排序
2.将第 i 个 Partition 分配到第(i mod n)个 Broker 上
log分段:
每个分片目录中,kafka 通过分段的方式将 数据 分为多个 LogSegment,一个 LogSegment 对应磁盘上的一个日志文件(00000000000000000000.log)和一个索引文件(如上:00000000000000000000.index),其中日志文件是用来记录消息的。索引文件是用来保存消息的索引。每个LogSegment 的大小可以在server.properties 中log.segment.bytes=107370 (设置分段大小,默认是1gb)选项进行设置。
segment 的 index file 和 data file 2 个文件一一对应,成对出现,后缀".index"和“.log”分别表示为 segment 索引文件、数据文件.命名规则:partion 全局的第一个 segment从 0 开始,后续每个 segment 文件名为上一个 segment文件最后一条消息的 offset 值进行递增。数值最大为 64 位long 大小,20 位数字字符长度,没有数字用 0 填充
第一个 log 文件的最后一个 offset 为:5376,所以下一个segment 的文件命名为: 0000000000000005376.log。对应的 index 为 00000000000000005376.index
kafka 这种分片和分段策略,避免了数据量过大时,数据文件文件无限扩张带来的隐患,更有助于消息文件的维护以及被消费的消息的清理。
日志和索引文件内容分析:
通过下面这条命令可以看到 kafka 消息日志的内容
sh kafka-run-class.sh kafka.tools.DumpLogSegments --files /tmp/kafka-logs/test-0/00000000000000000000.log --print-data-log
输出结果为:
offset: 5376 position: 102124 CreateTime: 1531477349287isvalid: true keysize: -1 valuesize: 12 magic: 2compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: []payload: message_5376
可以看到一条消息,会包含很多的字段,如下:
offset: 5371 position: 102124 CreateTime: 1531477349286isvalid: true keysize: -1 valuesize: 12 magic: 2compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: []payload: message_5371
各字段的意义:
offset:记录号 ;
position:偏移量;
createTime 创建时间、
keysize 和 valuesize 表示 key 和 value 的大小
compresscodec 表示压缩编码
payload:表示消息的具体内容
为了提高查找消息的性能,kafka为每一个日志文件添加 了2 个索引文件:OffsetIndex 和 TimeIndex,分别对应*.index以及*.timeindex, *.TimeIndex 是映射时间戳和相对 offset的文件
查 看 索 引 内 容 命令:
sh kafka-run-class.shkafka.tools.DumpLogSegments --files /tmp/kafka-logs/test-0/00000000000000000000.index --print-data-log
索引文件和日志文件内容关系如下
如上图所示,index 文件中存储了索引以及物理偏移量。 log 文件存储了消息的内容。索引文件中保存了部分offset和偏移量position的对应关系。比如 index文件中 [4053,80899],表示在 log 文件中,对应的是第 4053 条记录,物理偏移量(position)为 80899.
在 partition 中通过 offset 查找 message过程
- 根据 offset 的值,查找 segment 段中的 index 索引文件。由于索引文件命名是以上一个文件的最后一个offset 进行命名的,所以,使用二分查找算法能够根据offset 快速定位到指定的索引文件
- 找到索引文件后,根据 offset 进行定位,找到索引文件中的匹配范围的偏移量position。(kafka 采用稀疏索引的方式来提高查找性能)
- 得到 position 以后,再到对应的 log 文件中,从 position处开始查找 offset 对应的消息,将每条消息的 offset 与目标 offset 进行比较,直到找到消息
比如说,我们要查找 offset=2490 这条消息,那么先找到00000000000000000000.index, 然后找到[2487,49111]这个索引,再到 log 文件中,根据 49111 这个 position 开始查找,比较每条消息的 offset 是否大于等于 2490。最后查找到对应的消息以后返回
日志的清除策略以及压缩策略
日志的清理策略有两个
- 根据消息的保留时间,当消息在 kafka 中保存的时间超过了指定的时间,就会触发清理过程
- 根据 topic 存储的数据大小,当 topic 所占的日志文件大小大于一定的阀值,则可以开始删除最旧的消息。
- 通过 log.retention.bytes 和 log.retention.hours 这两个参数来设置,当其中任意一个达到要求,都会执行删除。默认的保留时间是:7 天
kafka会启动一个后台线程,定期检查是否存在可以删除的消息。
日志压缩策略
Kafka 还提供了“日志压缩(Log Compaction)”功能,通过这个功能可以有效的减少日志文件的大小,缓解磁盘紧张的情况,在很多实际场景中,消息的 key 和 value 的值之间的对应关系是不断变化的,就像数据库中的数据会不断被修改一样,消费者只关心 key 对应的最新的 value。因此,我们可以开启 kafka 的日志压缩功能,服务端会在后台启动Cleaner线程池,定期将相同的key进行合并,只保留最新的 value 值。日志的压缩原理如下图:
消息写入的性能
顺序写
我们现在大部分企业仍然用的是机械结构的磁盘,如果把消息以随机的方式写入到磁盘,那么磁盘首先要做的就是寻址,也就是定位到数据所在的物理地址,在磁盘上就要找到对应的柱面、磁头以及对应的扇区;这个过程相对内存来说会消耗大量时间,为了规避随机读写带来的时间消耗,kafka 采用顺序写的方式存储数据。
零拷贝
即使采用顺序写,但是频繁的 I/O 操作仍然会造成磁盘的性能瓶颈,所以 kafka还有一个性能策略:零拷贝
消息从发送到落地保存,broker 维护的消息日志本身就是文件目录,每个文件都是二进制保存,生产者和消费者使用相同的格式来处理。在消费者获取消息时,服务器先从硬盘读取数据到内存,然后把内存中的数据原封不动的通过 socket 发送给消费者。虽然这个操作描述起来很简单,但实际上经历了很多步骤。如下:
? 操作系统将数据从磁盘读入到内核空间的页缓存
? 应用程序将数据从内核空间读入到用户空间缓存中
? 应用程序将数据写回到内核空间到 socket 缓存中
? 操作系统将数据从 socket 缓冲区复制到网卡缓冲区,以便将数据经网络发出
这个过程涉及到 4 次上下文切换以及 4 次数据复制,并且有两次复制操作是由 CPU 完成。但是这个过程中,数据完全没有
进行变化,仅仅是从磁盘复制到网卡缓冲区。通过“零拷贝”技术,可以去掉这些没必要的数据复制操作,同时也会减少上下文切换次数。现代的 unix 操作系统提供一个优化的代码路径,用于将数据从页缓存传输到 socket;
在 Linux 中,是通过 sendfile 系统调用来完成的。Java 提供了访问这个系统调用的方法:FileChannel.transferTo API。使用 sendfile,只需要一次拷贝就行,允许操作系统将数据直接从页缓存发送到网络上。所以在这个优化的路径中,只有最后一步将数据拷贝到网卡缓存中是需要的
相关推荐
- 当Frida来“敲”门(frida是什么)
-
0x1渗透测试瓶颈目前,碰到越来越多的大客户都会将核心资产业务集中在统一的APP上,或者对自己比较重要的APP,如自己的主业务,办公APP进行加壳,流量加密,投入了很多精力在移动端的防护上。而现在挖...
- 服务端性能测试实战3-性能测试脚本开发
-
前言在前面的两篇文章中,我们分别介绍了性能测试的理论知识以及性能测试计划制定,本篇文章将重点介绍性能测试脚本开发。脚本开发将分为两个阶段:阶段一:了解各个接口的入参、出参,使用Python代码模拟前端...
- Springboot整合Apache Ftpserver拓展功能及业务讲解(三)
-
今日分享每天分享技术实战干货,技术在于积累和收藏,希望可以帮助到您,同时也希望获得您的支持和关注。架构开源地址:https://gitee.com/msxyspringboot整合Ftpserver参...
- Linux和Windows下:Python Crypto模块安装方式区别
-
一、Linux环境下:fromCrypto.SignatureimportPKCS1_v1_5如果导包报错:ImportError:Nomodulenamed'Crypt...
- Python 3 加密简介(python des加密解密)
-
Python3的标准库中是没多少用来解决加密的,不过却有用于处理哈希的库。在这里我们会对其进行一个简单的介绍,但重点会放在两个第三方的软件包:PyCrypto和cryptography上,我...
- 怎样从零开始编译一个魔兽世界开源服务端Windows
-
第二章:编译和安装我是艾西,上期我们讲述到编译一个魔兽世界开源服务端环境准备,那么今天跟大家聊聊怎么编译和安装我们直接进入正题(上一章没有看到的小伙伴可以点我主页查看)编译服务端:在D盘新建一个文件夹...
- 附1-Conda部署安装及基本使用(conda安装教程)
-
Windows环境安装安装介质下载下载地址:https://www.anaconda.com/products/individual安装Anaconda安装时,选择自定义安装,选择自定义安装路径:配置...
- 如何配置全世界最小的 MySQL 服务器
-
配置全世界最小的MySQL服务器——如何在一块IntelEdison为控制板上安装一个MySQL服务器。介绍在我最近的一篇博文中,物联网,消息以及MySQL,我展示了如果Partic...
- 如何使用Github Action来自动化编译PolarDB-PG数据库
-
随着PolarDB在国产数据库领域荣膺桂冠并持续获得广泛认可,越来越多的学生和技术爱好者开始关注并涉足这款由阿里巴巴集团倾力打造且性能卓越的关系型云原生数据库。有很多同学想要上手尝试,却卡在了编译数据...
- 面向NDK开发者的Android 7.0变更(ndk android.mk)
-
订阅Google官方微信公众号:谷歌开发者。与谷歌一起创造未来!受Android平台其他改进的影响,为了方便加载本机代码,AndroidM和N中的动态链接器对编写整洁且跨平台兼容的本机...
- 信创改造--人大金仓(Kingbase)数据库安装、备份恢复的问题纪要
-
问题一:在安装KingbaseES时,安装用户对于安装路径需有“读”、“写”、“执行”的权限。在Linux系统中,需要以非root用户执行安装程序,且该用户要有标准的home目录,您可...
- OpenSSH 安全漏洞,修补操作一手掌握
-
1.漏洞概述近日,国家信息安全漏洞库(CNNVD)收到关于OpenSSH安全漏洞(CNNVD-202407-017、CVE-2024-6387)情况的报送。攻击者可以利用该漏洞在无需认证的情况下,通...
- Linux:lsof命令详解(linux lsof命令详解)
-
介绍欢迎来到这篇博客。在这篇博客中,我们将学习Unix/Linux系统上的lsof命令行工具。命令行工具是您使用CLI(命令行界面)而不是GUI(图形用户界面)运行的程序或工具。lsoflsof代表&...
- 幻隐说固态第一期:固态硬盘接口类别
-
前排声明所有信息来源于网络收集,如有错误请评论区指出更正。废话不多说,目前固态硬盘接口按速度由慢到快分有这几类:SATA、mSATA、SATAExpress、PCI-E、m.2、u.2。下面我们来...
- 新品轰炸 影驰SSD多款产品登Computex
-
分享泡泡网SSD固态硬盘频道6月6日台北电脑展作为全球第二、亚洲最大的3C/IT产业链专业展,吸引了众多IT厂商和全球各地媒体的热烈关注,全球存储新势力—影驰,也积极参与其中,为广大玩家朋友带来了...
- 一周热门
- 最近发表
-
- 当Frida来“敲”门(frida是什么)
- 服务端性能测试实战3-性能测试脚本开发
- Springboot整合Apache Ftpserver拓展功能及业务讲解(三)
- Linux和Windows下:Python Crypto模块安装方式区别
- Python 3 加密简介(python des加密解密)
- 怎样从零开始编译一个魔兽世界开源服务端Windows
- 附1-Conda部署安装及基本使用(conda安装教程)
- 如何配置全世界最小的 MySQL 服务器
- 如何使用Github Action来自动化编译PolarDB-PG数据库
- 面向NDK开发者的Android 7.0变更(ndk android.mk)
- 标签列表
-
- mybatiscollection (79)
- mqtt服务器 (88)
- keyerror (78)
- c#map (65)
- resize函数 (64)
- xftp6 (83)
- bt搜索 (75)
- c#var (76)
- mybatis大于等于 (64)
- xcode-select (66)
- mysql授权 (74)
- 下载测试 (70)
- linuxlink (65)
- pythonwget (67)
- androidinclude (65)
- libcrypto.so (74)
- logstashinput (65)
- hadoop端口 (65)
- vue阻止冒泡 (67)
- jquery跨域 (68)
- php写入文件 (73)
- kafkatools (66)
- mysql导出数据库 (66)
- jquery鼠标移入移出 (71)
- 取小数点后两位的函数 (73)