flume部署安装以及案例运行
bigegpt 2024-11-22 11:04 3 浏览
基本认识:
大数据阶段数据的收集来源, flume的收集数据一般是日志,比如:网站日志
flume是一个分布式的,可靠的,可用的
flume可以做离线也可以做实时分析
collecting --》source --》数据采集来源
aggregating --》channel --》数据临时缓存(只要数据被move了,那就不在存储了)
moving --》sink --》数据的转移
1、agent :source、channel、sink
(1)source:用于采集数据,将产生的数据流传输到Channel
(2)channel:连接 sources 和 sinks ,临时缓存数据
(3)sink:从Channel收集数据,将数据写到目标源
2、Events:
(1)是Flume数据传输的基本单元
(2)由header和载有数据的一个byte array构成,byte array字节数组:存储真实的数据
(3)每一个事件的大小:deserializer.maxLineLength2048字节,编码格式:UTF-8
一个source,可以绑定多个channel
一个sink,只能绑定一个channel
flume安装:
准备安装包
apache-flume-1.7.0-bin.tar.gz
解压缩
tar -zxvf apache-flume-1.7.0-bin.tar.gz -C /opt/bigdata/
配置文件:flume-env.sh
mv flume-env.sh.template flume-env.sh
配置jdk
export JAVA_HOME=/opt/bigdata/jdk1.8
测试是否成功
bin/flume-ng version
flume的flume-ng命令
Usage: bin/flume-ng [options]...
例如一些提交任务的命令(熟悉下格式):
bin/flume-ng agent --conf conf --name agent --conf-file conf/test.properties
bin/flume-ng agent -c conf -n agent -f conf/test.properties
bin/flume-ng avro-client --conf conf --host host --port 8080
配置情况选择:
1、flume安装在hadoop集群中:
(1)配置JAVA_HOME:
export JAVA_HOME= /opt/bigdata/jdk1.8
2、flume安装在hadoop集群中,而且还配置了HA:
(1)HDFS访问入口变化
(2)配置JAVA_HOME:
export JAVA_HOME= /opt/bigdata/jdk1.8
(3)还需要添加hadoop的core-site.xml和hdfs-site.xml拷贝到flume的conf目录
3、flume不在hadoop集群里:
(1)配置JAVA_HOME:
export JAVA_HOME= /opt/bigdata/jdk1.8
(2)还需要添加hadoop的core-site.xml和hdfs-site.xml拷贝到flume的conf目录
(3)将hadoop的一些jar包添加到flume的lib目录下(用的是什么版本拷贝什么版本)
运行官网案例:
编辑配置文件flume-test.properties(创建一个)
准备配置信息
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = masterhbase
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
查看系统有没有安装telnet:rpm -qa | grep telnet
没有的就安装: yum -y install telnet或yum -y install nc
文件log4j.properties显示的是日志信息配置
运行:
bin/flume-ng agent --conf conf --conf-file conf/flume-test.properties --name a1 -Dflume.root.logger=INFO,console
开启一个窗口:telnet连接端口号
telnet masterhbase 44444 (卡在那是正常的,你可以随意输入信息)
输入hello world
在flume中就可以看到数据
退出telnet:输入ctrl + ] 然后输入quit
运行实例一
需求:监控apache服务器的日志,利用flume监控某一个文件
安装httpd服务
yum -y install httpd
安装完成之后,会有个目录生成 /var/www/html
到/var/www/html这个目录下 vim index.html [随意输入内容]
启动服务: service httpd start
浏览网页:输入主机名[hostname]
日志产生的路径:/var/log/httpd/access_log
配置flume agent
source: exec
channel:memory
sink:hdfs
我们复制配置文件
编辑信息
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -f /var/log/httpd/access_log
# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://masterhbase:8082/flume/webdata/roll/%y%m%d/%H
a1.sinks.k1.hdfs.rollInterval = 600
a1.sinks.k1.hdfs.rollSize = 1048576
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.minBlockReplicas = 1
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue= 1
a1.sinks.k1.hdfs.roundUnit = hour
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
一些配置说明
问题:hdfs上的文件一般数据文件大小要大,而且文件数量是要少
hdfs.rollInterval = 600 (这个地方最好还是设置一个时间)
hdfs.rollSize = 1048576 (1M,134217728-》128M)
hdfs.rollCount = 0
hdfs.minBlockReplicas = 1 (这个不设置的话,上面的参数有可能不会生效)
在hdfs文件上设置时间格式分层 年月日/时 每小时生成一个文件
hdfs.useLocalTimeStamp = true
hdfs.round = true
hdfs.roundValue= 1
hdfs.roundUnit = hour
将准备好的jar上传到flume/lib中
运行
查看hdfs上,不断刷新会有新的文件
查看下进程
运行实例二
利用flume监控某一个文件目录,将目录下滚动好的文件实时抽取到HDFS上
类型选择
source:spooldir
channel:file
sink:hdfs
创建配置文件flume-spooldir.properties
编写信息
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /data/logs
a1.sources.r1.recursiveDirectorySearch = true
# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://masterhbase:8082/flume/webdata/spooldir/%y%m%d/%H
a1.sinks.k1.hdfs.rollInterval = 600
a1.sinks.k1.hdfs.rollSize = 1048576
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.minBlockReplicas = 1
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue= 1
a1.sinks.k1.hdfs.roundUnit = hour
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
# Describe the channel
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/bigdata/apache-flume-1.7.0-bin/checkpointDir
a1.channels.c1.dataDirs = /opt/bigdata/apache-flume-1.7.0-bin/dataDirs
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
配置信息概念补充
1.source:spooldir(已经生成好的最终的数据文件)
(1)recursiveDirectorySearch 是否监视子目录以查找要读取的新文件
(2)includePattern 正则表达式,指定要包含的文件 (只.csv数据文件,是正则匹配)
(3)ignorePattern 正则表达式,指定要忽略的文件 (不抽取.csv数据文件,是正则匹配)
(4)缺点:不能对目录文件进行修改,如果有追加内容的文本文件,是不允许的(有可能不会被抽取,有可能会有错误)
2.flume监控目录,支持文件修改,并记录文件状态
(1)source:taildir (类似exec + spooldir的组合)
(2)filegroups :设置source组 可设置多个 filegroups = f1
(3)filegroups.:设置组员的监控目录和监控文件类型,使用正则表示,只能监控文件
(4)positionFile:设置定位文件的位置,以JSON格式写入给定位置文件上每个文件的最后读取位置
3.Memory Channel是一个不稳定的channel,它在内存中存储所有事件,
如果进程异常停止,内存中的数据将不能让恢复,而且受内存大小的限制。
4.flie channel:是一个持久化的channel,数据安全并且只要磁盘空间足够,它就可以将数据存储到磁盘上
5.checkpointDir:检查数据完整性,存放检查点目录,可以检测出哪些数据已被抽取,哪些还没有
6.dataDirs:存放数据的目录,dataDirs可以是多个目录,以逗号隔开,用独立的多个磁盘上的多个目录可以提高file channel的性能。
7.hdfs上数据默认是二进制的文件类型:bin/hdfs dfs -text /
8.可以修改hdfs.fileType 改为DataStream(数据流)hdfs.writeFormat = Text 改为文本格式
9.当使用DataStream时候,文件不会被压缩,不需要设置hdfs.codeC;当使用CompressedStream时候,必须设置一个正确的hdfs.codeC值;hdfs.codeC压缩编码解码器 --》snappy压缩
10.batchSize默认值:100 每个批次刷新到HDFS上的events数量;
创建目录
mkdir –p /data/logs
模拟数据
cp -r /opt/bigdata/hadoop-2.7.3/logs/* /data/logs/
查看数据
运行
bin/flume-ng agent --conf conf --name a1 --conf-file conf/flume-spooldir.properties
查看下HDFS
运行实例三
将hive的一些jar拷贝过来 flume的lib目录下
配置flume agent
source:netcat
channel:Memory
sink:hive
启动hive的元数据服务:
/opt/bigdata/apache-hive-1.2.1-bin/bin/hive --service metastore &
创建库和表 (表必须是CLUSTERED BY ,INTO BUCKETS)
create database flume_test;
use flume_test;
create table flume_user(
user_id int,
user_name string,
user_age int
)CLUSTERED BY (user_id) INTO 2 BUCKETS
row format delimited fields terminated by '\t'
stored as orc;
准备配置文件flume-sink-hive.properties
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = masterhbase
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = hive
a1.sinks.k1.hive.metastore = thrift://masterhbase:9083
a1.sinks.k1.hive.database = flume_test
a1.sinks.k1.hive.table = flume_user
a1.sinks.k1.serializer = DELIMITED
a1.sinks.k1.serializer.delimiter = "\t"
a1.sinks.k1.serializer.fieldnames = user_id,user_name,user_age
a1.sinks.k1.serializer.serdeSeparator = '\t'
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
配置概念补充
1.serializer: 负责解析事件中的字段并将它们映射到hive表中的列
(2)DELIMITED 普通文本
(2)json json文件 (不需要配置,JSON中的对象名称直接映射到Hive表中具有相同名称的列, 内部使用
org.apache.hive.hcatalog.data.JsonSerDe)
2.DELIMITED:
serializer.delimiter:传入数据中的字段分隔符,用双引号括起来,例如"\t"
serializer.fieldnames:从输入字段到hive表中的列的映射,指定为hive表列名称的逗号分隔列表
serializer.serdeSeparator :输出字段分隔符,单引号括起来,例如'\t'
hive参数设置vim hive-site.xml:
<property>
??? <name>hive.metastore.uris</name>
??? <value>thrift://masterhbase:9083</value>
</property>
<property>
??? <name>hive.txn.manager</name>
??? <value>org.apache.hadoop.hive.ql.lockmgr.DbTxnManager</value>
</property>
<property>
??? <name>hive.compactor.initiator.on</name>
??? <value>true</value>
</property>
<property>
??? <name>hive.compactor.worker.threads</name>
??? <value>1</value>
</property>
<property>
??? <name>hive.support.concurrency</name>
??? <value>true</value>
</property>
<property>
??? <name>hive.enforce.bucketing</name>
??? <value>true</value>
</property>
<property>
??? <name> hive.exec.dynamic.partition.mode</name>
??? <value>nonstrict</value>
</property>
<property>
??? <name>hive.in.test</name>
??? <value>true</value>
</property>
解决报错问题
(1)报错:
Caused by: org.apache.thrift.TApplicationException: Internal error processing open_txns
-》hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
打开一部分事务支持
-》协同配置
hive.compactor.initiator.on=true; -》运行启动程序和清除线程,用于打开所需参数的完整列表事务
hive.compactor.worker.threads=1; -》增加工作线程的数量将减少花费的时间
hive.support.concurrency=true; -》是否支持并发,默认是false
hive.enforce.bucketing=true; -》是否启用bucketing,写入table数据时会启动分桶
hive.exec.dynamic.partition.mode=nonstrict; -》设置非严格模式
(2)启动metastore时报错:
Table 'metastore.COMPACTION_QUEUE' doesn't exist
配置以下属性:这个是用来创建COMPACTION_QUEUE这张表的
hive.in.test
true
(3)再启动metastore时报错:
Error rolling back: Can't call rollback when autocommit=true
去掉以下属性:
hive.in.test
true
之前没有安装,先安装
启动flume agent
bin/flume-ng agent --conf conf --name a1 --conf-file conf/flume-sink-hive.properties
使用nc去连接,然后输入数据,数据以制表符分割
Hive中可以看到数据
运行实例四(hive)
创建表
create table emp(
empno int,
ename string,
job string,
mgr int,
hiredate string,
sal double,
comm double,
deptno int
)CLUSTERED BY (empno) INTO 2 BUCKETS
row format delimited fields terminated by '\t'
stored as orc;
准备配置信息flume-sink-hive2.properties
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = cat /data/emp.txt
# Describe the sink
a1.sinks.k1.type = hive
a1.sinks.k1.hive.metastore = thrift://masterhbase:9083
a1.sinks.k1.hive.database = flume_test
a1.sinks.k1.hive.table = emp
a1.sinks.k1.serializer = DELIMITED
a1.sinks.k1.serializer.delimiter = "\t"
a1.sinks.k1.serializer.fieldnames = empno,ename,job,mgr,hiredate,sal,comm,deptno
a1.sinks.k1.serializer.serdeSeparator = '\t'
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
运行flume
bin/flume-ng agent --conf conf --name a1 --conf-file conf/flume-sink-hive2.properties
查看数据
相关推荐
- 悠悠万事,吃饭为大(悠悠万事吃饭为大,什么意思)
-
新媒体编辑:杜岷赵蕾初审:程秀娟审核:汤小俊审签:周星...
- 高铁扒门事件升级版!婚宴上‘冲喜’老人团:我们抢的是社会资源
-
凌晨两点改方案时,突然收到婚庆团队发来的视频——胶东某酒店宴会厅,三个穿大红棉袄的中年妇女跟敢死队似的往前冲,眼瞅着就要扑到新娘的高额钻石项链上。要不是门口小伙及时阻拦,这婚礼造型团队熬了三个月的方案...
- 微服务架构实战:商家管理后台与sso设计,SSO客户端设计
-
SSO客户端设计下面通过模块merchant-security对SSO客户端安全认证部分的实现进行封装,以便各个接入SSO的客户端应用进行引用。安全认证的项目管理配置SSO客户端安全认证的项目管理使...
- 还在为 Spring Boot 配置类加载机制困惑?一文为你彻底解惑
-
在当今微服务架构盛行、项目复杂度不断攀升的开发环境下,SpringBoot作为Java后端开发的主流框架,无疑是我们手中的得力武器。然而,当我们在享受其自动配置带来的便捷时,是否曾被配置类加载...
- Seata源码—6.Seata AT模式的数据源代理二
-
大纲1.Seata的Resource资源接口源码2.Seata数据源连接池代理的实现源码3.Client向Server发起注册RM的源码4.Client向Server注册RM时的交互源码5.数据源连接...
- 30分钟了解K8S(30分钟了解微积分)
-
微服务演进方向o面向分布式设计(Distribution):容器、微服务、API驱动的开发;o面向配置设计(Configuration):一个镜像,多个环境配置;o面向韧性设计(Resista...
- SpringBoot条件化配置(@Conditional)全面解析与实战指南
-
一、条件化配置基础概念1.1什么是条件化配置条件化配置是Spring框架提供的一种基于特定条件来决定是否注册Bean或加载配置的机制。在SpringBoot中,这一机制通过@Conditional...
- 一招解决所有依赖冲突(克服依赖)
-
背景介绍最近遇到了这样一个问题,我们有一个jar包common-tool,作为基础工具包,被各个项目在引用。突然某一天发现日志很多报错。一看是NoSuchMethodError,意思是Dis...
- 你读过Mybatis的源码?说说它用到了几种设计模式
-
学习设计模式时,很多人都有类似的困扰——明明概念背得滚瓜烂熟,一到写代码就完全想不起来怎么用。就像学了一堆游泳技巧,却从没下过水实践,很难真正掌握。其实理解一个知识点,就像看立体模型,单角度观察总...
- golang对接阿里云私有Bucket上传图片、授权访问图片
-
1、为什么要设置私有bucket公共读写:互联网上任何用户都可以对该Bucket内的文件进行访问,并且向该Bucket写入数据。这有可能造成您数据的外泄以及费用激增,若被人恶意写入违法信息还可...
- spring中的资源的加载(spring加载原理)
-
最近在网上看到有人问@ContextConfiguration("classpath:/bean.xml")中除了classpath这种还有其他的写法么,看他的意思是想从本地文件...
- Android资源使用(android资源文件)
-
Android资源管理机制在Android的开发中,需要使用到各式各样的资源,这些资源往往是一些静态资源,比如位图,颜色,布局定义,用户界面使用到的字符串,动画等。这些资源统统放在项目的res/独立子...
- 如何深度理解mybatis?(如何深度理解康乐服务质量管理的5个维度)
-
深度自定义mybatis回顾mybatis的操作的核心步骤编写核心类SqlSessionFacotryBuild进行解析配置文件深度分析解析SqlSessionFacotryBuild干的核心工作编写...
- @Autowired与@Resource原理知识点详解
-
springIOCAOP的不多做赘述了,说下IOC:SpringIOC解决的是对象管理和对象依赖的问题,IOC容器可以理解为一个对象工厂,我们都把该对象交给工厂,工厂管理这些对象的创建以及依赖关系...
- java的redis连接工具篇(java redis client)
-
在Java里,有不少用于连接Redis的工具,下面为你介绍一些主流的工具及其特点:JedisJedis是Redis官方推荐的Java连接工具,它提供了全面的Redis命令支持,且...
- 一周热门
- 最近发表
- 标签列表
-
- mybatiscollection (79)
- mqtt服务器 (88)
- keyerror (78)
- c#map (65)
- resize函数 (64)
- xftp6 (83)
- bt搜索 (75)
- c#var (76)
- mybatis大于等于 (64)
- xcode-select (66)
- mysql授权 (74)
- 下载测试 (70)
- linuxlink (65)
- pythonwget (67)
- androidinclude (65)
- logstashinput (65)
- hadoop端口 (65)
- vue阻止冒泡 (67)
- oracle时间戳转换日期 (64)
- jquery跨域 (68)
- php写入文件 (73)
- kafkatools (66)
- mysql导出数据库 (66)
- jquery鼠标移入移出 (71)
- 取小数点后两位的函数 (73)