kafka简介 kafka使用入门
bigegpt 2024-10-19 02:48 7 浏览
kafka应用场景
kafka是分布式消息系统,具有高吞吐量,可容错的发布-订阅消息系统。
应用场景:
- 用户活动追踪
- 日志聚合
- 限流削峰
高吞吐率实现:
- 顺序读写
- 零拷贝
- 批量发送
- 消息压缩
kafka基本概念
- Topic,相当于消息的一个主题,标签
- Partition,一个topic可以有多个partition,一个partition对应系统上的一个到多个目录。一个topic的partition数量应该是broker的整数倍。
- segment,一个partition有多个segment组成,每个segment文件大小相等文件由.log 和 .index文件组成,.index是存放.log文件中消息的索引查看log文件:bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /tmp/kafka-logs-3/test-0/00000000000000000000.log --print-data-log
- broker,kafka集群中的每个节点称为一个broker
- producer,消息的生产者
- consumer,消息的消费者,一个消费者可以消费多个topic的消息,一个消费者可以消费一个topic的多个partition的消息一个partition允许多个消费者同时消费
- consumer group,消费者组,kafka保证一个消息只会被一个组中的某一个kafka消费。
- replicas of partition, 分区副本,为了防止消息丢失而创建的分区的备份。
- partition leader,每个partition有多个副本,而读写操作只能发生在leader上
- partition follower,所有follower都需要从leader同步消息,Leader与follower是主备关系,而非主从关系。
- ISR, In-Sync-Replicas,是指副本同步列表AR,Assigned Replicas,在最初没有leader时,ISR=AROSR,Outof-Sync-ReplicasAR = ISR + OSR + Leader,ISR是存放在zk中的
- offset,每条消息都有一个当前Partition下唯一的64字节的offset
- broker controller, kafka集群中有一个broker会被选举出来,作为controller,负责管理整个集群的partition和replicas的状态只有broker controller会向zookeeper中注册watcher
- 脑裂:(Brain Split),由于某种原因导致高可用集群中出现了两个master。zk的watcher机制及分布式锁会引发master的假死,从而导致脑裂。
- HW(High Water-Mark)与 LEO(Log End Offset)
HW 是kafka消费者可以消费到的最高partition的偏移量,HW保证了kafka集群中消息的一致性。LEO 是日志消息最后的偏移量对于partition leader中新写入的消息,是不能立即被消费者消费的,只有当ISR中所有的partition follower消费之后,更新HW,写入ISR,此时消息才能被消费者消费。HW的更新速度取决于那个性能最差的broker
- zookeeper
zookeeper负则broker controller的选举
partition leader是由 broker controller负责选举的
- Coordinator
coordinator是用来管理消费者组的,是运行在每个broker上的group coordinator进程,主要负则offset的位移管理和rebalance,一个coordinator可以管理多个消费者组
- rebalance当消费者组中的消费者数量发生变化,或者topic中的partition数量发生变化,会导致partition的重新分配,这个过程叫做Rebalance.rebalance可以给系统带来高可用性和伸缩性,但是在Rebalance期间,消费者是无法读取消息的,因此要避免不必要的Rebalance
- 引发Rebalance的情形: 消费者组中添加消费者消费者取消订阅, 关闭或崩溃向一个topic中添加新的partition当有broker挂了
- offset commit消费者从partition中取出一批消息放入buffer中进行消费,在规定的时间内(seession.timeout.ms)消费完消息后,会自动将其消费的commit提交给broker,broker可以判断哪些消息有被消费过,若在规定时间内没有消费完毕,其是不会提交offset的, 可以避免在Rebalance时重复消费。
注: 从kafka0.9开始,offset保存在brokers中,__consumers-offsets
kafka工作原理与流程
- 消息路由(即写入的消息放入到哪个partition)若指定了partition,则写入指定的partition若未指定partition,但指定了key,则对key取hash然后对partition个数取余partition和key均为指定,则根据轮询算法选出一个partition
- 消息写入算法(即消息写入的过程)producer从zookeeper中获取partition的leaderproducer将消息发送给leaderleader将消息写入到本地logISR中的follower从leader中pull消息,写入本地log后向leader发送ackleader收到所有follower的ack后,增加HW并向producer发送ACK
- HW截断机制HW截断机制保证了partition的leader宕机之后,leader与follower之间的数据不一致。两种情况:当leader宕机之后,选举出一个新的leader,为了防止leader和follower的数据不一致,此时所有的FOLLOWER都要将数据截断到HW位置, 然后再同步新leader中的数据当leader从宕机中恢复后,发现新的leader中和自己的数据不一致,此时宕机的leader会将数据截断到宕机之前的HW位置,然后同步新的leader中的数据
- 消息发送的可靠性机制 producer向kafka发送消息时,可以选择需要的可靠性级别,通过request.required.acks参数的值进行设置0值(异步发送)不需要kafka反馈成功ack,效率最高,可靠性最低,因为消息可能会丢失。消息丢失的情况:在传输途中丢失,网络原因在broker中丢失,消息发到broker时是先放入到buffer,当broker的buffer满足将消息写入到partition时(容量到,时间到,或数量到)在buffer正要写入到partition但还未写入时,新的消息又来了,可能丢失。顺序与生产顺序不一致(网络原因)1值(同步发送)消息发送成功后,立即向生产者返回ack(未等待ISR中的follower同步消息)当leader收到新的消息后还未同步,leader宕机,新选举出的leader是不知道该信息存在的,造成消息的丢失。-1值(同步发送)leader收到消息,并向ISR列表中的所有FOLLOWER都同步了消息之后再向producer返回ack.该模式消息几乎不会丢失,但有可能出现消息重复接收的情况。
- 消费者消费过程解析消费者消费订阅的topic, broker controller会为消费者指定消息的partition,并将partition的offset发送给消费者当有生产者向该partition中生产消息时,broker会将消息推送给消费者消费者收到推送,消费该消息消费者消费完该消息,向broker发送消费成功反馈broker收到消费者反馈,更新partition中的offset
- partition的leader选举范围partition的leader宕机后,broker controller从ISR中选举一个FOLLOWER成为新的leader,但若ISR中所有的FOLLOWER都宕机了, 则可以通过unclean.leader.election.enable的取值来设置leader的选举范围
- unclean.leader.election.enablefalse必须等到副本中有FOLLOWER活过来再进行新的选举,可靠性有保证,但可用性低。true选择任何一个没有宕机的FOLLOWER,但该FOLLOWER可能不在ISR中(OSR)。
- 重复消费及解决方案同一个consumer重复消费有一个消费的超时时间,auto.commit.interval.ms,在该时间内没有消费完消息,此时consumer会向broker提交一个异常,但是由于没有消费完,所以没有向partition提交offset,所以再次消费时还是消费的同样的消息。不同的consumer重复消费当consumer消费了某条消息后,提交了offset,但是由于网络等原因,没有在session.timeout.ms中将该offset发送给broker,broker认为该consumer宕机,然后rebalnce,这个partition又被分配给了其他消费者,由于该partition的offset没有被修改,所以会再次被消费
解决方案
增加auto.commit.interval.ms
设置enable.auto.commit为false,将kafka自动提交offset该为手动提交
手动提交分为:同步提交,异步提交,同异步联合提交
public SyncAsyncManualConsumer() {
super("KafkaConsumerTest", false);
Properties properties = new Properties();
String brokers = "kafkaOS1:9092,kafkaOS2:9092,kafkaOS3:9092";
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "cityGro11");
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
// properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
// 设置一次提交的offset个数
properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10);
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.IntegerDeserializer");
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
this.consumer = new KafkaConsumer<Integer, String>(properties);
}
@Override
public void doWork() {
// 指定要消费的主题
consumer.subscribe(Collections.singletonList("cities"));
ConsumerRecords<Integer, String> records = consumer.poll(1000);
for(ConsumerRecord record : records) {
System.out.print("topic = " + record.topic());
System.out.print(" partition = " + record.partition());
System.out.print(" key = " + record.key());
System.out.println(" value = " + record.value());
try {
// 带回调功能的手动异步提交
consumer.commitAsync((offsets, e) -> {
if (e != null) {
System.out.print("提交失败,offsets = " + offsets);
System.out.println(",exception = " + e);
}
});
}catch (Exception e) {
e.printStackTrace();
// 同步提交
consumer.commitSync();
}
}
}
kafka如何保证数据不丢失
- 生产者数据的不丢失同步模式 request.required.acks = 1(follower 未同步数据)/-1(follower同步完数据,但效率低)异步模式通过设置时间阈值和消息数量阈值, 并且设置为阻塞模式producer.type=async request.required.acks=1 queue.buffering.max.ms=5000 queue.buffering.max.messages=10000 queue.enqueue.timeout.ms = -1 batch.num.messages=200
- 消费者的数据不丢失通过offset commit 来保证数据的不丢失,kafka自己记录了每次消费的offset数值,下次继续消费的时候,接着上次的offset进行消费即可
kafka性能到底好在哪儿
- 采用BIO, 虽AIO性能更好, 但是编程难度较大
- 高性能的网络设计
- 顺序写客户端写数据----> 操作系统缓存 ----> 写入磁盘(顺序写), 如果磁盘的个数和转数跟得上的话, 都快赶上写内存的速度了.
- 跳表, 稀松索引, 零拷贝
相关推荐
- 当Frida来“敲”门(frida是什么)
-
0x1渗透测试瓶颈目前,碰到越来越多的大客户都会将核心资产业务集中在统一的APP上,或者对自己比较重要的APP,如自己的主业务,办公APP进行加壳,流量加密,投入了很多精力在移动端的防护上。而现在挖...
- 服务端性能测试实战3-性能测试脚本开发
-
前言在前面的两篇文章中,我们分别介绍了性能测试的理论知识以及性能测试计划制定,本篇文章将重点介绍性能测试脚本开发。脚本开发将分为两个阶段:阶段一:了解各个接口的入参、出参,使用Python代码模拟前端...
- Springboot整合Apache Ftpserver拓展功能及业务讲解(三)
-
今日分享每天分享技术实战干货,技术在于积累和收藏,希望可以帮助到您,同时也希望获得您的支持和关注。架构开源地址:https://gitee.com/msxyspringboot整合Ftpserver参...
- Linux和Windows下:Python Crypto模块安装方式区别
-
一、Linux环境下:fromCrypto.SignatureimportPKCS1_v1_5如果导包报错:ImportError:Nomodulenamed'Crypt...
- Python 3 加密简介(python des加密解密)
-
Python3的标准库中是没多少用来解决加密的,不过却有用于处理哈希的库。在这里我们会对其进行一个简单的介绍,但重点会放在两个第三方的软件包:PyCrypto和cryptography上,我...
- 怎样从零开始编译一个魔兽世界开源服务端Windows
-
第二章:编译和安装我是艾西,上期我们讲述到编译一个魔兽世界开源服务端环境准备,那么今天跟大家聊聊怎么编译和安装我们直接进入正题(上一章没有看到的小伙伴可以点我主页查看)编译服务端:在D盘新建一个文件夹...
- 附1-Conda部署安装及基本使用(conda安装教程)
-
Windows环境安装安装介质下载下载地址:https://www.anaconda.com/products/individual安装Anaconda安装时,选择自定义安装,选择自定义安装路径:配置...
- 如何配置全世界最小的 MySQL 服务器
-
配置全世界最小的MySQL服务器——如何在一块IntelEdison为控制板上安装一个MySQL服务器。介绍在我最近的一篇博文中,物联网,消息以及MySQL,我展示了如果Partic...
- 如何使用Github Action来自动化编译PolarDB-PG数据库
-
随着PolarDB在国产数据库领域荣膺桂冠并持续获得广泛认可,越来越多的学生和技术爱好者开始关注并涉足这款由阿里巴巴集团倾力打造且性能卓越的关系型云原生数据库。有很多同学想要上手尝试,却卡在了编译数据...
- 面向NDK开发者的Android 7.0变更(ndk android.mk)
-
订阅Google官方微信公众号:谷歌开发者。与谷歌一起创造未来!受Android平台其他改进的影响,为了方便加载本机代码,AndroidM和N中的动态链接器对编写整洁且跨平台兼容的本机...
- 信创改造--人大金仓(Kingbase)数据库安装、备份恢复的问题纪要
-
问题一:在安装KingbaseES时,安装用户对于安装路径需有“读”、“写”、“执行”的权限。在Linux系统中,需要以非root用户执行安装程序,且该用户要有标准的home目录,您可...
- OpenSSH 安全漏洞,修补操作一手掌握
-
1.漏洞概述近日,国家信息安全漏洞库(CNNVD)收到关于OpenSSH安全漏洞(CNNVD-202407-017、CVE-2024-6387)情况的报送。攻击者可以利用该漏洞在无需认证的情况下,通...
- Linux:lsof命令详解(linux lsof命令详解)
-
介绍欢迎来到这篇博客。在这篇博客中,我们将学习Unix/Linux系统上的lsof命令行工具。命令行工具是您使用CLI(命令行界面)而不是GUI(图形用户界面)运行的程序或工具。lsoflsof代表&...
- 幻隐说固态第一期:固态硬盘接口类别
-
前排声明所有信息来源于网络收集,如有错误请评论区指出更正。废话不多说,目前固态硬盘接口按速度由慢到快分有这几类:SATA、mSATA、SATAExpress、PCI-E、m.2、u.2。下面我们来...
- 新品轰炸 影驰SSD多款产品登Computex
-
分享泡泡网SSD固态硬盘频道6月6日台北电脑展作为全球第二、亚洲最大的3C/IT产业链专业展,吸引了众多IT厂商和全球各地媒体的热烈关注,全球存储新势力—影驰,也积极参与其中,为广大玩家朋友带来了...
- 一周热门
- 最近发表
-
- 当Frida来“敲”门(frida是什么)
- 服务端性能测试实战3-性能测试脚本开发
- Springboot整合Apache Ftpserver拓展功能及业务讲解(三)
- Linux和Windows下:Python Crypto模块安装方式区别
- Python 3 加密简介(python des加密解密)
- 怎样从零开始编译一个魔兽世界开源服务端Windows
- 附1-Conda部署安装及基本使用(conda安装教程)
- 如何配置全世界最小的 MySQL 服务器
- 如何使用Github Action来自动化编译PolarDB-PG数据库
- 面向NDK开发者的Android 7.0变更(ndk android.mk)
- 标签列表
-
- mybatiscollection (79)
- mqtt服务器 (88)
- keyerror (78)
- c#map (65)
- resize函数 (64)
- xftp6 (83)
- bt搜索 (75)
- c#var (76)
- mybatis大于等于 (64)
- xcode-select (66)
- mysql授权 (74)
- 下载测试 (70)
- linuxlink (65)
- pythonwget (67)
- androidinclude (65)
- libcrypto.so (74)
- logstashinput (65)
- hadoop端口 (65)
- vue阻止冒泡 (67)
- jquery跨域 (68)
- php写入文件 (73)
- kafkatools (66)
- mysql导出数据库 (66)
- jquery鼠标移入移出 (71)
- 取小数点后两位的函数 (73)