百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 热门文章 > 正文

0232-如何使用StreamSets实现MySQL中变化数据实时写入Kudu

bigegpt 2024-08-18 13:59 7 浏览

温馨提示:要看高清无码套图,请使用手机打开并单击图片放大查看。

Fayson的github:https://github.com/fayson/cdhproject

提示:代码块部分可以左右滑动查看噢

1.文档编写目的


在前面Fayson介绍了《

如何在CDH中安装和使用StreamSets

》和《

如何使用StreamSets从MySQL增量更新数据到Hive

》,通过StreamSets实现数据采集,在实际生产中需要实时捕获MySQL、Oracle等其他数据源的变化数据(简称CDC)将变化数据实时的写入大数据平台的Hive、HDFS、HBase、Solr、Elasticserach等。在《

如何使用StreamSets从MySQL增量更新数据到Hive

》中,使用受限于表需要主键或者更新字段,我们在本篇文章主要介绍如何将MySQL Binary Log作为StreamSets的源,来实时捕获MySQL变化数据并将变化数据存入Kudu。

StreamSets实现的流程如下:

  • 内容概述

1.环境准备

2.创建StreamSets的Pipeline流程

3.Pipeline流程测试

4.总结

  • 测试环境

1.StreamSets版本为3.1.2.0

2.CM和CDH版本为5.13.1

3.MariaDB版本为5.5.56

2.环境准备


1.开启MariaDB的Binlog日志

修改/etc/my.conf文件,在配置文件[mysqld]下增加如下配置

server-id=1
log-bin=mysql-bin
binlog_format=ROW

(可左右滑动)

注意:MySQL Binlog支持多种数据更新格式包括Row、Statement和mix(Row和Statement的混合),这里建议使用Row模式的Binlog格式,可以更加方便实时的反应行级别的数据变化。

修改完MariaDB的配置后重启服务。

[root@ip-172-31-16-68 ~]# systemctl restart mariadb
[root@ip-172-31-16-68 ~]# systemctl status mariadb

(可左右滑动)

登录MariaDB创建同步账号

GRANT ALL on maxwell.* to 'maxwell'@'%' identified by '123456';
GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE on *.* to 'maxwell'@'%';
FLUSH PRIVILEGES;

(可左右滑动)

2.StreamSets安装MySQL驱动

将MySQL的JDBC驱动拷贝至

/opt/cloudera/parcels/STREAMSETS_DATACOLLECTOR/streamsets-libs/streamsets-datacollector-mysql-binlog-lib/lib目录

3.在MariaDB数据库中创建测试表

create database test;
create table cdc_test (
 id int,
 name varchar(32)
);

(可左右滑动)

4.使用Hue创建Kudu表

create table cdc_test (
 id int,
 name String,
 primary key(id)
)
 PARTITION BY HASH PARTITIONS 16
STORED AS KUDU; 

(可左右滑动)

3.创建StreamSets的Pipline


1.登录StreamSets,创建一个新的Pipline

2.选择Origins类别,搜索MySQL Binary Log

配置MySQL Binary Log

配置MySQL信息

配置同步账号信息

高级配置,根据自己的需要进行配置

到此MySQL Binary Log的配置完成。

3.添加表过滤的Stream Selector

Stream Selector基本配置

配置分流条件

4.添加插入类型分流的Stream Selector

Stream Selector基本配置

配置分流条件

5.添加处理Delete类型日志的JavaScript Evaluator

该JavaScript Evaluator主要用于解析DELETE类型的Binary Log 日志

配置基本属性

配置JavaScript脚本,脚本如下:

for(var i = 0; i < records.length; i++) {
 try { 
 var newRecord = sdcFunctions.createRecord(true);
 newRecord.value = records[i].value['OldData'];
 newRecord.value.Type = records[i].value['Type'];
 newRecord.value.Database = records[i].value['Database'];
 newRecord.value.Table = records[i].value['Table'];
 log.info(records[i].value['Type'])
 output.write(newRecord);
 } catch (e) {
 // Send record to error
 error.write(records[i], e);
 }
}

(可左右滑动)

6.添加处理INSRET和UPDATE类型日志的JavaScript Evaluator

该JavaScript Evaluator主要用于解析INSERT和UPDATE类型的日志

配置基本属性

配置JavaScript脚本,脚本如下:

for(var i = 0; i < records.length; i++) {
 try { 
 var newRecord = sdcFunctions.createRecord(true);
 newRecord.value = records[i].value['Data'];
 newRecord.value.Type = records[i].value['Type'];
 newRecord.value.Database = records[i].value['Database'];
 newRecord.value.Table = records[i].value['Table'];
 log.info(records[i].value['Type'])
 output.write(newRecord);
 } catch (e) {
 // Send record to error
 error.write(records[i], e);
 }
}

(可左右滑动)

7.为JavaScript Evaluator-DELETE添加Kudu

配置Kudu基本属性

配置Kudu环境

Kudu的高级配置,Fayson这里使用的是默认配置

8.为JavaScript Evaluator-UPSERT添加Kudu

配置基础属性

配置Kudu环境

Kudu高级配置

9.流程创建完成后,启动该Pipelines

4.Pipeline流程测试


1.登录MariaDB数据库,向cdc_test表中插入数据

insert into cdc_test values(1, 'fayson');

(可左右滑动)

查看StreamSets的Pipeline实时状态

可以看到Kudu-Upsert成功的处理了一条数据

使用Hue查看Kudu表数据

数据成功的插入到Kudu的cdc_test表中。

2.登录MariaDB数据库修改cdc_test表中数据

update cdc_test set name='fayson-update' where id=1;

(可左右滑动)

查看StreamSets的Pipeline实时状态

可以看到Kudu-Upsert成功处理了两条数据,这两条数据分别是INSERT和UPDATE

使用Hue查看Kudu的cdc_test表

3.登录MariaDB数据,删除cdc_test表中数据

delete from cdc_test where id=1;

(可左右滑动)

查看StreamSets的Pipeline实时状态

可以看到Kudu-Delete成功处理一条日志

使用Hue查看Kudu的cdc_test表,id为1的数据已不存在

5.总结


  • 实现MySQL CDC的前提是需要开启MySQL的Binary Log日志,并且需要创建复制账号,SreamSets中MySQL-Binary Log实际充当的为MySQL的一个Slave。
  • 向Kudu实时写入数据的前提是Kudu的表已存在,否则无法正常写入数据。
  • JavaScript脚本需要注意在解析每一条Record是需要使用其内置的Function,在示例中Fayson将MySQL Binary Log复杂的JSON数据解析重组为简单的Map对象,这里就省去了Kudu入库时“Field to Column Mapping”的映射,需要去确保组装的Map数据中Key与Kudu表中的column字段一致。
  • 在Kudu插入数据时指定Kudu表名需要注意,如果使用Impala创建的表,则需要加上impala的前缀格式impala:<database>:<table>。

提示:代码块部分可以左右滑动查看噢

为天地立心,为生民立命,为往圣继绝学,为万世开太平。

温馨提示:要看高清无码套图,请使用手机打开并单击图片放大查看。

推荐关注Hadoop实操,第一时间,分享更多Hadoop干货,欢迎转发和分享。

原创文章,欢迎转载,转载请注明:转载自微信公众号Hadoop实操

相关推荐

悠悠万事,吃饭为大(悠悠万事吃饭为大,什么意思)

新媒体编辑:杜岷赵蕾初审:程秀娟审核:汤小俊审签:周星...

高铁扒门事件升级版!婚宴上‘冲喜’老人团:我们抢的是社会资源

凌晨两点改方案时,突然收到婚庆团队发来的视频——胶东某酒店宴会厅,三个穿大红棉袄的中年妇女跟敢死队似的往前冲,眼瞅着就要扑到新娘的高额钻石项链上。要不是门口小伙及时阻拦,这婚礼造型团队熬了三个月的方案...

微服务架构实战:商家管理后台与sso设计,SSO客户端设计

SSO客户端设计下面通过模块merchant-security对SSO客户端安全认证部分的实现进行封装,以便各个接入SSO的客户端应用进行引用。安全认证的项目管理配置SSO客户端安全认证的项目管理使...

还在为 Spring Boot 配置类加载机制困惑?一文为你彻底解惑

在当今微服务架构盛行、项目复杂度不断攀升的开发环境下,SpringBoot作为Java后端开发的主流框架,无疑是我们手中的得力武器。然而,当我们在享受其自动配置带来的便捷时,是否曾被配置类加载...

Seata源码—6.Seata AT模式的数据源代理二

大纲1.Seata的Resource资源接口源码2.Seata数据源连接池代理的实现源码3.Client向Server发起注册RM的源码4.Client向Server注册RM时的交互源码5.数据源连接...

30分钟了解K8S(30分钟了解微积分)

微服务演进方向o面向分布式设计(Distribution):容器、微服务、API驱动的开发;o面向配置设计(Configuration):一个镜像,多个环境配置;o面向韧性设计(Resista...

SpringBoot条件化配置(@Conditional)全面解析与实战指南

一、条件化配置基础概念1.1什么是条件化配置条件化配置是Spring框架提供的一种基于特定条件来决定是否注册Bean或加载配置的机制。在SpringBoot中,这一机制通过@Conditional...

一招解决所有依赖冲突(克服依赖)

背景介绍最近遇到了这样一个问题,我们有一个jar包common-tool,作为基础工具包,被各个项目在引用。突然某一天发现日志很多报错。一看是NoSuchMethodError,意思是Dis...

你读过Mybatis的源码?说说它用到了几种设计模式

学习设计模式时,很多人都有类似的困扰——明明概念背得滚瓜烂熟,一到写代码就完全想不起来怎么用。就像学了一堆游泳技巧,却从没下过水实践,很难真正掌握。其实理解一个知识点,就像看立体模型,单角度观察总...

golang对接阿里云私有Bucket上传图片、授权访问图片

1、为什么要设置私有bucket公共读写:互联网上任何用户都可以对该Bucket内的文件进行访问,并且向该Bucket写入数据。这有可能造成您数据的外泄以及费用激增,若被人恶意写入违法信息还可...

spring中的资源的加载(spring加载原理)

最近在网上看到有人问@ContextConfiguration("classpath:/bean.xml")中除了classpath这种还有其他的写法么,看他的意思是想从本地文件...

Android资源使用(android资源文件)

Android资源管理机制在Android的开发中,需要使用到各式各样的资源,这些资源往往是一些静态资源,比如位图,颜色,布局定义,用户界面使用到的字符串,动画等。这些资源统统放在项目的res/独立子...

如何深度理解mybatis?(如何深度理解康乐服务质量管理的5个维度)

深度自定义mybatis回顾mybatis的操作的核心步骤编写核心类SqlSessionFacotryBuild进行解析配置文件深度分析解析SqlSessionFacotryBuild干的核心工作编写...

@Autowired与@Resource原理知识点详解

springIOCAOP的不多做赘述了,说下IOC:SpringIOC解决的是对象管理和对象依赖的问题,IOC容器可以理解为一个对象工厂,我们都把该对象交给工厂,工厂管理这些对象的创建以及依赖关系...

java的redis连接工具篇(java redis client)

在Java里,有不少用于连接Redis的工具,下面为你介绍一些主流的工具及其特点:JedisJedis是Redis官方推荐的Java连接工具,它提供了全面的Redis命令支持,且...