百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 热门文章 > 正文

flume部署安装以及案例运行

bigegpt 2024-11-22 11:04 3 浏览

基本认识:

大数据阶段数据的收集来源, flume的收集数据一般是日志,比如:网站日志

flume是一个分布式的,可靠的,可用的

flume可以做离线也可以做实时分析

collecting --》source --》数据采集来源

aggregating --》channel --》数据临时缓存(只要数据被move了,那就不在存储了)

moving --》sink --》数据的转移

1、agent :source、channel、sink

(1)source:用于采集数据,将产生的数据流传输到Channel

(2)channel:连接 sources 和 sinks ,临时缓存数据

(3)sink:从Channel收集数据,将数据写到目标源

2、Events:

(1)是Flume数据传输的基本单元

(2)由header和载有数据的一个byte array构成,byte array字节数组:存储真实的数据

(3)每一个事件的大小:deserializer.maxLineLength2048字节,编码格式:UTF-8

一个source,可以绑定多个channel

一个sink,只能绑定一个channel


flume安装:

准备安装包

apache-flume-1.7.0-bin.tar.gz

解压缩

tar -zxvf apache-flume-1.7.0-bin.tar.gz -C /opt/bigdata/

配置文件:flume-env.sh

mv flume-env.sh.template flume-env.sh

配置jdk

export JAVA_HOME=/opt/bigdata/jdk1.8

测试是否成功

bin/flume-ng version

flume的flume-ng命令

Usage: bin/flume-ng [options]...

例如一些提交任务的命令(熟悉下格式):

bin/flume-ng agent --conf conf --name agent --conf-file conf/test.properties

bin/flume-ng agent -c conf -n agent -f conf/test.properties

bin/flume-ng avro-client --conf conf --host host --port 8080


配置情况选择:

1、flume安装在hadoop集群中:

(1)配置JAVA_HOME:

export JAVA_HOME= /opt/bigdata/jdk1.8

2、flume安装在hadoop集群中,而且还配置了HA:

(1)HDFS访问入口变化

(2)配置JAVA_HOME:

export JAVA_HOME= /opt/bigdata/jdk1.8

(3)还需要添加hadoop的core-site.xml和hdfs-site.xml拷贝到flume的conf目录

3、flume不在hadoop集群里:

(1)配置JAVA_HOME:

export JAVA_HOME= /opt/bigdata/jdk1.8

(2)还需要添加hadoop的core-site.xml和hdfs-site.xml拷贝到flume的conf目录

(3)将hadoop的一些jar包添加到flume的lib目录下(用的是什么版本拷贝什么版本)


运行官网案例:

编辑配置文件flume-test.properties(创建一个)

准备配置信息

# Name the components on this agent

a1.sources = r1

a1.sinks = k1

a1.channels = c1


# Describe/configure the source

a1.sources.r1.type = netcat

a1.sources.r1.bind = masterhbase

a1.sources.r1.port = 44444


# Describe the sink

a1.sinks.k1.type = logger


# Use a channel which buffers events in memory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100


# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

查看系统有没有安装telnet:rpm -qa | grep telnet

没有的就安装: yum -y install telnet或yum -y install nc

文件log4j.properties显示的是日志信息配置

运行:

bin/flume-ng agent --conf conf --conf-file conf/flume-test.properties --name a1 -Dflume.root.logger=INFO,console

开启一个窗口:telnet连接端口号

telnet masterhbase 44444 (卡在那是正常的,你可以随意输入信息)

输入hello world

在flume中就可以看到数据

退出telnet:输入ctrl + ] 然后输入quit

运行实例一

需求:监控apache服务器的日志,利用flume监控某一个文件

安装httpd服务

yum -y install httpd

安装完成之后,会有个目录生成 /var/www/html

到/var/www/html这个目录下 vim index.html [随意输入内容]

启动服务: service httpd start

浏览网页:输入主机名[hostname]

日志产生的路径:/var/log/httpd/access_log

配置flume agent

source: exec

channel:memory

sink:hdfs

我们复制配置文件

编辑信息

# Name the components on this agent

a1.sources = r1

a1.sinks = k1

a1.channels = c1


# Describe/configure the source

a1.sources.r1.type = exec

a1.sources.r1.command = tail -f /var/log/httpd/access_log


# Describe the sink

a1.sinks.k1.type = hdfs

a1.sinks.k1.hdfs.path = hdfs://masterhbase:8082/flume/webdata/roll/%y%m%d/%H

a1.sinks.k1.hdfs.rollInterval = 600

a1.sinks.k1.hdfs.rollSize = 1048576

a1.sinks.k1.hdfs.rollCount = 0

a1.sinks.k1.hdfs.minBlockReplicas = 1

a1.sinks.k1.hdfs.useLocalTimeStamp = true

a1.sinks.k1.hdfs.round = true

a1.sinks.k1.hdfs.roundValue= 1

a1.sinks.k1.hdfs.roundUnit = hour


# Use a channel which buffers events in memory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100


# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

一些配置说明


问题:hdfs上的文件一般数据文件大小要大,而且文件数量是要少


hdfs.rollInterval = 600 (这个地方最好还是设置一个时间)

hdfs.rollSize = 1048576 (1M,134217728-》128M)

hdfs.rollCount = 0

hdfs.minBlockReplicas = 1 (这个不设置的话,上面的参数有可能不会生效)


在hdfs文件上设置时间格式分层 年月日/时 每小时生成一个文件

hdfs.useLocalTimeStamp = true

hdfs.round = true

hdfs.roundValue= 1

hdfs.roundUnit = hour

将准备好的jar上传到flume/lib中

运行

查看hdfs上,不断刷新会有新的文件

查看下进程

运行实例二

利用flume监控某一个文件目录,将目录下滚动好的文件实时抽取到HDFS上

类型选择

source:spooldir

channel:file

sink:hdfs

创建配置文件flume-spooldir.properties

编写信息

# Name the components on this agent

a1.sources = r1

a1.sinks = k1

a1.channels = c1


# Describe/configure the source

a1.sources.r1.type = spooldir

a1.sources.r1.spoolDir = /data/logs

a1.sources.r1.recursiveDirectorySearch = true


# Describe the sink

a1.sinks.k1.type = hdfs

a1.sinks.k1.hdfs.path = hdfs://masterhbase:8082/flume/webdata/spooldir/%y%m%d/%H

a1.sinks.k1.hdfs.rollInterval = 600

a1.sinks.k1.hdfs.rollSize = 1048576

a1.sinks.k1.hdfs.rollCount = 0

a1.sinks.k1.hdfs.minBlockReplicas = 1

a1.sinks.k1.hdfs.useLocalTimeStamp = true

a1.sinks.k1.hdfs.round = true

a1.sinks.k1.hdfs.roundValue= 1

a1.sinks.k1.hdfs.roundUnit = hour

a1.sinks.k1.hdfs.fileType = DataStream

a1.sinks.k1.hdfs.writeFormat = Text


# Describe the channel

a1.channels.c1.type = file

a1.channels.c1.checkpointDir = /opt/bigdata/apache-flume-1.7.0-bin/checkpointDir

a1.channels.c1.dataDirs = /opt/bigdata/apache-flume-1.7.0-bin/dataDirs


# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

配置信息概念补充

1.source:spooldir(已经生成好的最终的数据文件)

(1)recursiveDirectorySearch 是否监视子目录以查找要读取的新文件

(2)includePattern 正则表达式,指定要包含的文件 (只.csv数据文件,是正则匹配)

(3)ignorePattern 正则表达式,指定要忽略的文件 (不抽取.csv数据文件,是正则匹配)

(4)缺点:不能对目录文件进行修改,如果有追加内容的文本文件,是不允许的(有可能不会被抽取,有可能会有错误)


2.flume监控目录,支持文件修改,并记录文件状态

(1)source:taildir (类似exec + spooldir的组合)

(2)filegroups :设置source组 可设置多个 filegroups = f1

(3)filegroups.:设置组员的监控目录和监控文件类型,使用正则表示,只能监控文件

(4)positionFile:设置定位文件的位置,以JSON格式写入给定位置文件上每个文件的最后读取位置

3.Memory Channel是一个不稳定的channel,它在内存中存储所有事件,

如果进程异常停止,内存中的数据将不能让恢复,而且受内存大小的限制。

4.flie channel:是一个持久化的channel,数据安全并且只要磁盘空间足够,它就可以将数据存储到磁盘上

5.checkpointDir:检查数据完整性,存放检查点目录,可以检测出哪些数据已被抽取,哪些还没有

6.dataDirs:存放数据的目录,dataDirs可以是多个目录,以逗号隔开,用独立的多个磁盘上的多个目录可以提高file channel的性能。

7.hdfs上数据默认是二进制的文件类型:bin/hdfs dfs -text /

8.可以修改hdfs.fileType 改为DataStream(数据流)hdfs.writeFormat = Text 改为文本格式

9.当使用DataStream时候,文件不会被压缩,不需要设置hdfs.codeC;当使用CompressedStream时候,必须设置一个正确的hdfs.codeC值;hdfs.codeC压缩编码解码器 --》snappy压缩

10.batchSize默认值:100 每个批次刷新到HDFS上的events数量;


创建目录

mkdir –p /data/logs

模拟数据

cp -r /opt/bigdata/hadoop-2.7.3/logs/* /data/logs/

查看数据

运行

bin/flume-ng agent --conf conf --name a1 --conf-file conf/flume-spooldir.properties

查看下HDFS

运行实例三

将hive的一些jar拷贝过来 flume的lib目录下

配置flume agent

source:netcat

channel:Memory

sink:hive

启动hive的元数据服务:

/opt/bigdata/apache-hive-1.2.1-bin/bin/hive --service metastore &

创建库和表 (表必须是CLUSTERED BY ,INTO BUCKETS)

create database flume_test;

use flume_test;

create table flume_user(

user_id int,

user_name string,

user_age int

)CLUSTERED BY (user_id) INTO 2 BUCKETS

row format delimited fields terminated by '\t'

stored as orc;

准备配置文件flume-sink-hive.properties

# Name the components on this agent

a1.sources = r1

a1.sinks = k1

a1.channels = c1


# Describe/configure the source

a1.sources.r1.type = netcat

a1.sources.r1.bind = masterhbase

a1.sources.r1.port = 44444


# Describe the sink

a1.sinks.k1.type = hive

a1.sinks.k1.hive.metastore = thrift://masterhbase:9083

a1.sinks.k1.hive.database = flume_test

a1.sinks.k1.hive.table = flume_user

a1.sinks.k1.serializer = DELIMITED

a1.sinks.k1.serializer.delimiter = "\t"

a1.sinks.k1.serializer.fieldnames = user_id,user_name,user_age

a1.sinks.k1.serializer.serdeSeparator = '\t'


# Use a channel which buffers events in memory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100


# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

配置概念补充

1.serializer: 负责解析事件中的字段并将它们映射到hive表中的列

(2)DELIMITED 普通文本

(2)json json文件 (不需要配置,JSON中的对象名称直接映射到Hive表中具有相同名称的列, 内部使用

org.apache.hive.hcatalog.data.JsonSerDe)


2.DELIMITED:

serializer.delimiter:传入数据中的字段分隔符,用双引号括起来,例如"\t"

serializer.fieldnames:从输入字段到hive表中的列的映射,指定为hive表列名称的逗号分隔列表

serializer.serdeSeparator :输出字段分隔符,单引号括起来,例如'\t'

hive参数设置vim hive-site.xml:


<property>
??? <name>hive.metastore.uris</name>
??? <value>thrift://masterhbase:9083</value>
</property>
<property>
??? <name>hive.txn.manager</name>
??? <value>org.apache.hadoop.hive.ql.lockmgr.DbTxnManager</value>
</property>
<property>
??? <name>hive.compactor.initiator.on</name>
??? <value>true</value>
</property>
<property>
??? <name>hive.compactor.worker.threads</name>
??? <value>1</value>
</property>
<property>
??? <name>hive.support.concurrency</name>
??? <value>true</value>
</property>
<property>
??? <name>hive.enforce.bucketing</name>
??? <value>true</value>
</property>
<property>
??? <name> hive.exec.dynamic.partition.mode</name>
??? <value>nonstrict</value>
</property>
<property>
??? <name>hive.in.test</name>
??? <value>true</value>
</property>


解决报错问题

(1)报错:

Caused by: org.apache.thrift.TApplicationException: Internal error processing open_txns

-》hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;

打开一部分事务支持

-》协同配置

hive.compactor.initiator.on=true; -》运行启动程序和清除线程,用于打开所需参数的完整列表事务

hive.compactor.worker.threads=1; -》增加工作线程的数量将减少花费的时间

hive.support.concurrency=true; -》是否支持并发,默认是false

hive.enforce.bucketing=true; -》是否启用bucketing,写入table数据时会启动分桶

hive.exec.dynamic.partition.mode=nonstrict; -》设置非严格模式

(2)启动metastore时报错:

Table 'metastore.COMPACTION_QUEUE' doesn't exist

配置以下属性:这个是用来创建COMPACTION_QUEUE这张表的


hive.in.test

true


(3)再启动metastore时报错:

Error rolling back: Can't call rollback when autocommit=true

去掉以下属性:


hive.in.test

true

之前没有安装,先安装

启动flume agent

bin/flume-ng agent --conf conf --name a1 --conf-file conf/flume-sink-hive.properties

使用nc去连接,然后输入数据,数据以制表符分割

Hive中可以看到数据

运行实例四(hive)

创建表

create table emp(

empno int,

ename string,

job string,

mgr int,

hiredate string,

sal double,

comm double,

deptno int

)CLUSTERED BY (empno) INTO 2 BUCKETS

row format delimited fields terminated by '\t'

stored as orc;

准备配置信息flume-sink-hive2.properties

# Name the components on this agent

a1.sources = r1

a1.sinks = k1

a1.channels = c1


# Describe/configure the source

a1.sources.r1.type = exec

a1.sources.r1.command = cat /data/emp.txt


# Describe the sink

a1.sinks.k1.type = hive

a1.sinks.k1.hive.metastore = thrift://masterhbase:9083

a1.sinks.k1.hive.database = flume_test

a1.sinks.k1.hive.table = emp

a1.sinks.k1.serializer = DELIMITED

a1.sinks.k1.serializer.delimiter = "\t"

a1.sinks.k1.serializer.fieldnames = empno,ename,job,mgr,hiredate,sal,comm,deptno

a1.sinks.k1.serializer.serdeSeparator = '\t'


# Use a channel which buffers events in memory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100


# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

运行flume

bin/flume-ng agent --conf conf --name a1 --conf-file conf/flume-sink-hive2.properties

查看数据


相关推荐

悠悠万事,吃饭为大(悠悠万事吃饭为大,什么意思)

新媒体编辑:杜岷赵蕾初审:程秀娟审核:汤小俊审签:周星...

高铁扒门事件升级版!婚宴上‘冲喜’老人团:我们抢的是社会资源

凌晨两点改方案时,突然收到婚庆团队发来的视频——胶东某酒店宴会厅,三个穿大红棉袄的中年妇女跟敢死队似的往前冲,眼瞅着就要扑到新娘的高额钻石项链上。要不是门口小伙及时阻拦,这婚礼造型团队熬了三个月的方案...

微服务架构实战:商家管理后台与sso设计,SSO客户端设计

SSO客户端设计下面通过模块merchant-security对SSO客户端安全认证部分的实现进行封装,以便各个接入SSO的客户端应用进行引用。安全认证的项目管理配置SSO客户端安全认证的项目管理使...

还在为 Spring Boot 配置类加载机制困惑?一文为你彻底解惑

在当今微服务架构盛行、项目复杂度不断攀升的开发环境下,SpringBoot作为Java后端开发的主流框架,无疑是我们手中的得力武器。然而,当我们在享受其自动配置带来的便捷时,是否曾被配置类加载...

Seata源码—6.Seata AT模式的数据源代理二

大纲1.Seata的Resource资源接口源码2.Seata数据源连接池代理的实现源码3.Client向Server发起注册RM的源码4.Client向Server注册RM时的交互源码5.数据源连接...

30分钟了解K8S(30分钟了解微积分)

微服务演进方向o面向分布式设计(Distribution):容器、微服务、API驱动的开发;o面向配置设计(Configuration):一个镜像,多个环境配置;o面向韧性设计(Resista...

SpringBoot条件化配置(@Conditional)全面解析与实战指南

一、条件化配置基础概念1.1什么是条件化配置条件化配置是Spring框架提供的一种基于特定条件来决定是否注册Bean或加载配置的机制。在SpringBoot中,这一机制通过@Conditional...

一招解决所有依赖冲突(克服依赖)

背景介绍最近遇到了这样一个问题,我们有一个jar包common-tool,作为基础工具包,被各个项目在引用。突然某一天发现日志很多报错。一看是NoSuchMethodError,意思是Dis...

你读过Mybatis的源码?说说它用到了几种设计模式

学习设计模式时,很多人都有类似的困扰——明明概念背得滚瓜烂熟,一到写代码就完全想不起来怎么用。就像学了一堆游泳技巧,却从没下过水实践,很难真正掌握。其实理解一个知识点,就像看立体模型,单角度观察总...

golang对接阿里云私有Bucket上传图片、授权访问图片

1、为什么要设置私有bucket公共读写:互联网上任何用户都可以对该Bucket内的文件进行访问,并且向该Bucket写入数据。这有可能造成您数据的外泄以及费用激增,若被人恶意写入违法信息还可...

spring中的资源的加载(spring加载原理)

最近在网上看到有人问@ContextConfiguration("classpath:/bean.xml")中除了classpath这种还有其他的写法么,看他的意思是想从本地文件...

Android资源使用(android资源文件)

Android资源管理机制在Android的开发中,需要使用到各式各样的资源,这些资源往往是一些静态资源,比如位图,颜色,布局定义,用户界面使用到的字符串,动画等。这些资源统统放在项目的res/独立子...

如何深度理解mybatis?(如何深度理解康乐服务质量管理的5个维度)

深度自定义mybatis回顾mybatis的操作的核心步骤编写核心类SqlSessionFacotryBuild进行解析配置文件深度分析解析SqlSessionFacotryBuild干的核心工作编写...

@Autowired与@Resource原理知识点详解

springIOCAOP的不多做赘述了,说下IOC:SpringIOC解决的是对象管理和对象依赖的问题,IOC容器可以理解为一个对象工厂,我们都把该对象交给工厂,工厂管理这些对象的创建以及依赖关系...

java的redis连接工具篇(java redis client)

在Java里,有不少用于连接Redis的工具,下面为你介绍一些主流的工具及其特点:JedisJedis是Redis官方推荐的Java连接工具,它提供了全面的Redis命令支持,且...