大数据实时技术,KAFKA使用与版本新特性
bigegpt 2024-10-19 02:48 6 浏览
简易版 JAVA 开发
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
/**
* Created by 30869 on 2017/6/1.
*/
public class MyProducer {
//0.9版本过后,使用新的API KAFKA PRODUCER构建
private static KafkaProducer producer;
public static void main(String[] args) {
Properties properties=new Properties();
// properties.setProperty("metadata.broker.list","192.168.133.134:19092,192.168.133.133:19092," +
// "192.168.133.130:19092");
//必要参数 3个
//设置消息发送到broker 集群
properties.setProperty("bootstrap.servers","192.168.133.134:19092,192.168.133.133:19092," +
"192.168.133.130:19092");
//指定我们的消息序列化类
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//构建producer的时候需要指定producer发送的参数
producer=new KafkaProducer(properties);
//如果发送的主题不存在 ,会自动进行创建
producer.send(new ProducerRecord("changjinlu","Hello changjinglu"));
producer.close();
}
}
KAFKA 最终优化
1, 写负载均衡
2, 批量提交,使用producer.send(list)
3, 大批量提交过后 关闭producer,然后new Producer()
4, 使用异步发送
KAFKA 并发读:
涉及到的概念,分区与CONSUMER的关系
理论上 ,一个分区使用一个CONSUMER线程进行处理
前提:
一个PRODUCER , 消息并发写,然后使用的是负载均衡
例如: 分区数量为 3 , producer 同时写三个分区
最终我们要实现: consumer并发的读取 三个分区的数据,并且 一个线程只能取一个分区的数据。
最终展现如下:
Message1 from partion 0 , thread 1
Message2 from partion 1 , thread 2
Message3 from partion 2 , thread 3
新版本特性:
KafkaProducer 的发送消息都是异步方式发送
发送端可配置参数及其含义
Batch.size 针对partition级别的缓存,当发送到partition上的消息到达一定量过后,才进行发送。
linger.ms 针对PRODUCER级别,如果没有到达batch.size ,那么就会以linger.ms为准。
例如: batch.size 设置为 16384
Linger.ms 设置为 3000
消息发送过后(1S的延时):没有到达batch.size, 首先触发 linger.ms
buffer.memory 针对PRODUCER级别的缓存,当所有partition的缓存加起来超过producer缓存时,触发 消息的批量发送。
Acks 配置选项 :0 1 all
分别代表 0 没有ack确认消息
1 发送一条确认,LEAD写入成功即成功,其他FOLLOWER不管
All 发送所有确认,所有集群机器确认
Retries 消息发送重试 ,如果发送失败会重新发送,会出现 消息重复发送
enable.auto.commit 可选项: ture false
当为true的时候 kafka 自动提交我们的消费纪录
为false的时候,关闭自动提交功能
如果关闭会出现: 消息虽然读取了,但是 再开consumer又会重新读取一次,因为当为false的时候,消费的offset没有提交到 kafka。
解决上面的情况:
通过 consumer.commitSync() 进行手动提交当前消费的offset
底层存储多了一个 timeindex
主要保存两个内容,一个 producer 发送这条消息的时候,一个是 broker写入消息成功的时间
主要用于消息的合并,还有kStream的实时消费。
SEEK 跳转到固定的OFFSET进行消费?
要实现 from beginning
1, 去掉 consumer.subscribe() ,添加consumer.assign()
两者 二选其一
2, 因为consumer.seek()需要传入 topicPartition
所以呢需要自己定义 topicPartition(topic,parition)
3,设置consumer.seekToBeginning(parts);
例如代码:
TopicPartition topicPartition=new TopicPartition(topic,0);
// TopicPartition topicPartition1=new TopicPartition(topic,1);
// TopicPartition topicPartition2=new TopicPartition(topic,2);
List<TopicPartition> parts=new ArrayList<TopicPartition>();
parts.add(topicPartition);
// parts.add(topicPartition1);
// parts.add(topicPartition2);
consumer.assign(parts);
// consumer.subscribe(topis);
consumer.seekToBeginning(parts);
需要定位到 指定的 offset 偏移量
使用函数: consumer.seek(topPartition,offset)
针对单分区的偏移量记录
KAFKA SHELL 高阶
[hadoop@master bin]$ ./kafka-consumer-offset-checker.sh --zookeeper master:12181 --topic pengyong --group guoxu
#PS : 以上命令,通过 kafka工具类定位 guoxu 这一个group组消费到了哪一个offset
OFFSET 是针对PARTITION级别的。
[2017-06-01 06:01:43,203] WARN WARNING: ConsumerOffsetChecker is deprecated and will be dropped in releases following 0.9.0. Use ConsumerGroupCommand instead. (kafka.tools.ConsumerOffsetChecker$)
Group Topic Pid Offset logSize Lag
guoxu pengyong 0 502595 502595 0
guoxu pengyong 1 567011 567011 0
guoxu pengyong 2 540550 540550 0
group 代表 consumer group的名称
topic 主题ID
PID 代表 topic的分区
Offset 当前分区,当前group组,消费topic到那一条消息
LAG 还剩下多少没读
LOGSIZE 这一个PARTITION总共有多少条消息
最终确定 :
KAFKA纪录消费者的 四种属性 为
Group Topic Pid Offset
@来自科多大数据
相关推荐
- Java 泛型大揭秘:类型参数、通配符与最佳实践
-
引言在编程世界中,代码的可重用性和可维护性是至关重要的。为了实现这些目标,Java5引入了一种名为泛型(Generics)的强大功能。本文将详细介绍Java泛型的概念、优势和局限性,以及如何在...
- K8s 的标签与选择器:流畅运维的秘诀
-
在Kubernetes的世界里,**标签(Label)和选择器(Selector)**并不是最炫酷的技术,但却是贯穿整个集群管理与运维流程的核心机制。正是它们让复杂的资源调度、查询、自动化运维变得...
- 哈希Hash算法:原理、应用(哈希算法 知乎)
-
原作者:Linux教程,原文地址:「链接」什么是哈希算法?哈希算法(HashAlgorithm),又称为散列算法或杂凑算法,是一种将任意长度的数据输入转换为固定长度输出值的数学函数。其输出结果通常被...
- C#学习:基于LLM的简历评估程序(c# 简历)
-
前言在pocketflow的例子中看到了一个基于LLM的简历评估程序的例子,感觉还挺好玩的,为了练习一下C#,我最近使用C#重写了一个。准备不同的简历:image-20250528183949844查...
- 55顺位,砍41+14+3!季后赛也成得分王,难道他也是一名球星?
-
雷霆队最不可思议的新星:一个55号秀的疯狂逆袭!你是不是也觉得NBA最底层的55号秀,就只能当饮水机管理员?今年的55号秀阿龙·威金斯恐怕要打破你的认知了!常规赛阶段,这位二轮秀就像开了窍的天才,直接...
- 5分钟读懂C#字典对象(c# 字典获取值)
-
什么是字典对象在C#中,使用Dictionary类来管理由键值对组成的集合,这类集合被称为字典。字典最大的特点就是能够根据键来快速查找集合中的值,其键的定义不能重复,具有唯一性,相当于数组索引值,字典...
- c#窗体传值(c# 跨窗体传递数据)
-
在WinForm编程中我们经常需要进行俩个窗体间的传值。下面我给出了两种方法,来实现传值一、在输入数据的界面中定义一个属性,供接受数据的窗体使用1、子窗体usingSystem;usingSyst...
- C#入门篇章—委托(c#委托的理解)
-
C#委托1.委托的定义和使用委托的作用:如果要把方法作为函数来进行传递的话,就要用到委托。委托是一个类型,这个类型可以赋值一个方法的引用。C#的委托通过delegate关键字来声明。声明委托的...
- C#.NET in、out、ref详解(c#.net framework)
-
简介在C#中,in、ref和out是用于修改方法参数传递方式的关键字,它们决定了参数是按值传递还是按引用传递,以及参数是否必须在传递前初始化。基本语义对比修饰符传递方式可读写性必须初始化调用...
- C#广义表(广义表headtail)
-
在C#中,广义表(GeneralizedList)是一种特殊的数据结构,它是线性表的推广。广义表可以包含单个元素(称为原子),也可以包含另一个广义表(称为子表)。以下是一个简单的C#广义表示例代...
- 「C#.NET 拾遗补漏」04:你必须知道的反射
-
阅读本文大概需要3分钟。通常,反射用于动态获取对象的类型、属性和方法等信息。今天带你玩转反射,来汇总一下反射的各种常见操作,捡漏看看有没有你不知道的。获取类型的成员Type类的GetMembe...
- C#启动外部程序的问题(c#怎么启动)
-
IT&OT的深度融合是智能制造的基石。本公众号将聚焦于PLC编程与上位机开发。除理论知识外,也会结合我们团队在开发过程中遇到的具体问题介绍一些项目经验。在使用C#开发上位机时,有时会需要启动外部的一些...
- 全网最狠C#面试拷问:这20道题没答出来,别说你懂.NET!
-
在竞争激烈的C#开发岗位求职过程中,面试是必经的一道关卡。而一场高质量的面试,不仅能筛选出真正掌握C#和.NET技术精髓的人才,也能让求职者对自身技术水平有更清晰的认知。今天,就为大家精心准备了20道...
- C#匿名方法(c#匿名方法与匿名类)
-
C#中的匿名方法是一种没有名称只有主体的方法,它提供了一种传递代码块作为委托参数的技术。以下是关于C#匿名方法的一些重要特点和用法:特点省略参数列表:使用匿名方法可省略参数列表,这意味着匿名方法...
- C# Windows窗体(.Net Framework)知识总结
-
Windows窗体可大致分为Form窗体和MDI窗体,Form窗体没什么好细说的,知识点总结都在思维导图里面了,下文将围绕MDI窗体来讲述。MDI(MultipleDocumentInterfac...
- 一周热门
- 最近发表
- 标签列表
-
- mybatiscollection (79)
- mqtt服务器 (88)
- keyerror (78)
- c#map (65)
- resize函数 (64)
- xftp6 (83)
- bt搜索 (75)
- c#var (76)
- mybatis大于等于 (64)
- xcode-select (66)
- mysql授权 (74)
- 下载测试 (70)
- linuxlink (65)
- pythonwget (67)
- androidinclude (65)
- logstashinput (65)
- hadoop端口 (65)
- vue阻止冒泡 (67)
- oracle时间戳转换日期 (64)
- jquery跨域 (68)
- php写入文件 (73)
- kafkatools (66)
- mysql导出数据库 (66)
- jquery鼠标移入移出 (71)
- 取小数点后两位的函数 (73)