百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 热门文章 > 正文

Flink CDC connect DB To Hive

bigegpt 2024-08-26 11:12 2 浏览

1.Flink CDC 是什么?

做数仓的伙伴们都知道第一步就是考虑如何"入仓"或者"入湖". 同步方式细分下来有四种,全量、增量、新增及变化、缓慢变化。实际生产中大部分情况其实也就考虑全量 和 新增及变化。

首先说CDC(Change Data Capture)技术主要是面向数据库的变更捕获,技术方案也是非常丰富,从实现原理上可以分为两类:一种是基于查询,一种是基于日志的CDC。查询的CDC就很简单了,通过batch的方式实现就好,缺点也很明显不能保证强一致性和实时性,而日志的CDC可以实时消费日志,streaming处理保证了一致性和实时性,实现复杂。

对于Flink CDC相比Debezium、DataX、Canal、sqoop以及闭源的OGG在功能的全面性及架构可扩展性上有一定的优势体现。其实它的底层数据捕获引擎用的就是Debezium。套用网上的总结好的一个方案对比图,大家对于查询类工具可能会更加熟悉一些,比如DataX、Sqoop、Kettle这些,对于传统离线数仓来说也是可以的,当然对于业务的发展来说一定会有其瓶颈,延迟高、扩展性差等。之后进入Lambda架构,全量和增量分为两条数据线路后,此时就需要引入Canal 或者OGG、Debezium + Kafka 这套实时方案定时回写HDFS/OSS/S3,最后合并全量和增量数据形成结果表,这一下链路是不是就拉长了?又是DataX 又是Canal 又是kafka 等等,组件也多了,维护成本一下就提升了许多,并且全量和增量同步之间依然是隔离的, 实时性仍不可保证。

开源CDC方案对比图

想到在生产中大家可能更关心的一些问题

1) 集成过程中对业务库的影响有多大?

无论是增量还是全量,压力主要在读取上,索引读、并发控制,主要还是以限流为目标,对于体量很大的增量数据还是通过kafka会更加稳定。具体方案还是要全方位考虑,比如source DB的性能、日增数据的大小、集群性能等

2)集成的时候是否会锁表?

2.0之后引入无锁算法,(我没看懂)

3)如果集成过程中断网了,是否能从断点继续?(断点续传)

支持checkpoint(全量+增量)

4)数据质量可以保证嘛?

天然的exactly once , source 和 target都能保证精准一次。

5)物理删除日志数据还能捕获到嘛?

只要在CDC消费的offset之前就可以,如果还没消费就删除了,那就无法捕获了。

2.Flink CDC connect Oracle / Mysql Sink To Hive

Flink CDC 的双重角色一个是connector ,另一个就是consumer了, 如下图当前主流的一些业务DB都在支持和持续优化中,而对于consumer则可以通过Flink sql 或者 Datastream API来处理,当然在Flink SQL的迭代速度以及对hive的功能集成更是突飞猛进,因为Flink 它旨在打造流批一体化的数据引擎。

截至当前supported connectors

1) 采用DataStream API 的方式

Oracle

package cn.source.oracle

import java.util.Properties

import com.ververica.cdc.connectors.oracle.OracleSource
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema
import org.apache.flink.streaming.api.datastream.DataStreamSource
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.functions.source.SourceFunction

class oracleSource {

  def connectWithDataStream(getEnvironment:StreamExecutionEnvironment):DataStreamSource[String] = {

    val properties:Properties = new Properties()
    properties.setProperty("log.mining.strategy","online_catalog")
   // properties.setProperty("log.mining.continuous.mine","true")
    /**
     * https://debezium.io/documentation/reference/1.6/connectors/oracle.html#oracle-connector-properties
     * Notes:continuous.mine has de-support on oracle19C
     * */
    properties.setProperty("log.mining.batch.size.max","90000000")
    properties.setProperty("log.mining.batch.size.default","10000")

    properties.setProperty("debezium.database.tablename.case.insensitive","false")
    /**
     * only valid to oracle 11G and default is true, other version is invalid
     * */
    properties.setProperty("scan.startup.mode","latest-offset")
    /**
     * Get incremental data ,otherwise will get fully data first then incremental, Default is initial
     * The mechanism of scan.startup.mode option relying on Debezium’s snapshot.mode configuration.
     * So please do not use them together.
     * If you specific both scan.startup.mode and debezium.snapshot.mode options in the table DDL,
     * it may make scan.startup.mode doesn’t work.
     * */

    val sourceFunction :SourceFunction[String] = OracleSource.builder()
      .url("jdbc:oracle:thin:@hostname:1521/serviceName")
      .port(1521)
      .database("serviceName") // service name
      .schemaList("schema")
      .tableList("TableName")
      .username("userAccout")
      .password("userPassword")
      .debeziumProperties(properties)
      .deserializer(new JsonDebeziumDeserializationSchema()) // convert source record to Json String
      .build()
    val oracleSourceStreaming = getEnvironment.addSource(sourceFunction)

    oracleSourceStreaming

  }

}

Mysql

package cn.source.mysql

import java.time.Duration
import java.util.Properties
import com.ververica.cdc.connectors.mysql.source.MySqlSource
import com.ververica.cdc.connectors.mysql.table.StartupOptions
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.streaming.api.datastream.DataStreamSource
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.functions.source.SourceFunction
import projectUtils.createEnv

class mysqlSource {
  def connectWithDataStream(getEnvironment:StreamExecutionEnvironment)={

    val properties = new Properties()
    properties.setProperty("snapshot.locking.mode","none") // no locking mode

    val heartBeater = Duration.ofSeconds(20)
    val sourceFunction : MySqlSource[String] = MySqlSource.builder()
      .hostname("hostname")
      .port(3306)
      .databaseList("flink_cdc") // if you want to sync all DB , set up ".*"
      .tableList("flink_cdc.t_name")
      .username("userAccount")
      .password("userPassword")
      .heartbeatInterval(heartBeater) // for the table with slow update that we need to monitor heart beat to ensure get the latest offset in binlog
      .debeziumProperties(properties)
      .deserializer(new JsonDebeziumDeserializationSchema()) // transfer SourceRecords to JSON String
      .startupOptions(StartupOptions.initial()) // scan mode , default is initial,initial load snapshot then get the latest bin log
      .build()
    sourceFunction

    val mySqlSourceStreaming=getEnvironment.fromSource(sourceFunction.asInstanceOf,WatermarkStrategy.noWatermarks(),"Mysql-Source")
    mySqlSourceStreaming
  }

}

2) Transformation

Flink CDC消费到数据后可以采用Flink sql 进行ETL中数据解析、清洗过滤、多streaming join 打宽等操作.上面已经转成了json string ,如下图展示里面有个"op"属性,c 代表插入 ,u 代表更新 ,d 代表删除。在解析json的时候需要注意根目录,delete 和 insert、update对应的路径是不一样的

Streaming中消费到的日志数据

      val sqlResult = streamTableEnv.sqlQuery(
        """
          |select
          |case when JSON_VALUE(f0,'$.op') = 'd' then JSON_VALUE(f0,'$.before.IN_STU') else JSON_VALUE(f0,'$.after.IN_STU') end as IN_STU,
          |case when JSON_VALUE(f0,'$.op') = 'd' then JSON_VALUE(f0,'$.before.NAME2') else JSON_VALUE(f0,'$.after.NAME2') end as  name2,
          |JSON_VALUE(f0,'$.op') as op,
          |cast(TO_TIMESTAMP(FROM_UNIXTIME(CAST(JSON_VALUE(f0,'$.ts_ms') AS BIGINT) /1000 ,'yyyy-MM-dd HH:mm:ss')) as string) as time_create
          |from t_source
          |""".stripMargin)

3) Sink To Target

Flink sql 完成ETL过程后可将数据sink 到下游的hive 、kafka、data lake (Hudi、Iceberg等)、OLAP(Doris)等平台再进行进一步的数据处理和分析.

以Sink to hive为例,首先创建catalog ,具体关于flink connect to hive 可以参考梳理#1,创建好catalog后就可以将数据流式写进Hive 表了。

package cn.target.hive.catalog

import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}
import org.apache.flink.table.catalog.hive.HiveCatalog

class hiveCatalog {

  def create(tableEnv:TableEnvironment,name:String,defaultDB:String,hiveConfDir:String)={

    val hive = new HiveCatalog(name,defaultDB,hiveConfDir)
    tableEnv.registerCatalog(name,hive)
    // set the HiveCatalog as the current catalog of the session
    tableEnv.useCatalog(name)
    // set dialect to hive language
    println("After register to get the current catalog name :" + tableEnv.getCurrentCatalog)
  }

}

4) Flink CDC Oracle 中遇到的一些问题

a. 数据延迟

这个问题在FAQ中做出了解答,使用在线模式,而不是去消费redo log ,那么为什么需要加入这两个参数呢 ?log.mining.strategy的默认值是redo_log_catalog,也就是去读archived logs ,这时一定会存在延迟的,因为需要写入redo,数据量越大耗时也会越长, 而online模式就不会存在这个问题, 但是无法捕获DDL,不过业务系统的操作还是以DML为主,所以切到online模式问题不大。而对于log.mining.continuous.mine默认是false,开启后Oracle会自动实时连续挖掘变化数据。

Flink SQL properties

'debezium.log.mining.strategy'='online_catalog', --//默认是redo_log_catalog
'debezium.log.mining.continuous.mine'='true' --// 这个参数19c之后Oracle宣布就不再支持了,default是false

Debezium properties

'log.mining.strategy'='online_catalog',
'log.mining.continuous.mine'='true'

b. 大小写问题

字段和表名都需要大写,字段类型一定要按照官网的mapping,否则会出现<Null>的情况,对于Oracle11G可以加上参数'debezium.database.tablename.case.insensitive'='false' ,其它version此参数无效

c. service_name 和 SID 识别问题

其实这个问题已经从源码上做了修复, 可以在connect时加上参数url , 如果不加的话,则需要自行修改源码如下<changed by jasonchenjc keep Oracle Service Name>, 原来代码中port 和 dbname 中间使用 “ : ” 分隔开,也就是必须指定SID , 而这样对于Oracle RAC部署则只能连接1个SID实例了,建议添加url或者修改源码指定为service_name .

public class OracleJdbcUrlUtils {

    /**
     * Gets the URL in SID format.
     *
     * @param properties
     * @return jdbcUrl in SID format.
     */
    public static String getConnectionUrlWithSid(Properties properties) {
        String url;
        if (properties.containsKey("database.url")) {
            url = properties.getProperty("database.url");
        } else {
            String hostname = properties.getProperty("database.hostname");
            String port = properties.getProperty("database.port");
            String dbname = properties.getProperty("database.dbname");
            url = "jdbc:oracle:thin:@" + hostname + ":" + port + "/" + dbname;  // changed by jasonchenjc keep Oracle Service Name
        }
        return url;
    }
}

3. Basic on Flink CDC 的流批一体

想法:

在流式处理上可以分成有界流和无界流,如下构建一个这样的思路,虽然Flink CDC 支持丰富的Flink 生态,但个人想法还是让擅长的组件处理擅长的事情,实时数仓直接进到OLAP平台,比如Doris ,利用物化视图、索引、联邦查询等特性直接构建实时框架,离线数仓可以构建到hive 或者 spark DataFrame 中,存储double ?

RT and Batch

明显上述架构中存储出现了问题,面对多场景的业务形式,我们既要考虑成本又要满足绝大多数需求,快、准、稳一般来说是业务的基础诉求,可以对于大多数IT来说这算是较高的要求了O(∩_∩)O~~~

优化:

简单聊一下Flink CDC + Hudi 这套流批一体的方案,为什么要引入Hudi , 相比另外两类湖产品,Hudi 是真正意义上做到流读、流写,批读、批写的产品,Hudi 可以存储到hdfs以及云上的对象存储oss 、S3等, 这样就可以统一存储了,再利用它ACID语义的加强,可以做到近实时的数据处理,这样对于RT部分大多数需求还是可以满足的(Doris On Hudi 生态还不支持,后续Doris version会支持)。

相关推荐

Docker篇(二):Docker实战,命令解析

大家好,我是杰哥上周我们通过几个问题,让大家对于Docker有了一个全局的认识。然而,说跟练往往是两个概念。从学习的角度来说,理论知识的学习,往往只是第一步,只有经过实战,才能真正掌握一门技术所以,本...

docker学习笔记——安装和基本操作

今天学习了docker的基本知识,记录一下docker的安装步骤和基本命令(以CentOS7.x为例)一、安装docker的步骤:1.yuminstall-yyum-utils2.yum-con...

不可错过的Docker完整笔记(dockerhib)

简介一、Docker简介Docker是一个开源的应用容器引擎,基于Go语言并遵从Apache2.0协议开源。Docker可以让开发者打包他们的应用以及依赖包到一个轻量级、可移植的容器中,...

扔掉运营商的 IPTV 机顶盒,全屋全设备畅看 IPTV!

其实现在看电视节目的需求确实大大降低了,折腾也只是为了单纯的让它实现,享受这个过程带来的快乐而已,哈哈!预期构想家里所有设备直接接入网络随时接收并播放IPTV直播(电信点播的节目不是太多,但好在非常稳...

第五节 Docker 入门实践:从 Hello World 到容器操作

一、Docker容器基础运行(一)单次命令执行通过dockerrun命令可以直接在容器中执行指定命令,这是体验Docker最快捷的方式:#在ubuntu:15.10容器中执行ech...

替代Docker build的Buildah简单介绍

Buildah是用于通过较低级别的coreutils接口构建OCI兼容镜像的工具。与Podman相似,Buildah不依赖于Docker或CRI-O之类的守护程序,并且不需要root特权。Builda...

Docker 命令大全(docker命令大全记录表)

容器生命周期管理run-创建并启动一个新的容器。start/stop/restart-这些命令主要用于启动、停止和重启容器。kill-立即终止一个或多个正在运行的容器rm-于删除一个或...

docker常用指令及安装rabbitMQ(docker安装rabbitmq配置环境)

一、docker常用指令启动docker:systemctlstartdocker停止docker:systemctlstopdocker重启docker:systemctlrestart...

使用Docker快速部署Storm环境(docker部署confluence)

Storm的部署虽然不是特别麻烦,但是在生产环境中,为了提高部署效率,方便管理维护,使用Docker来统一管理部署是一个不错的选择。下面是我开源的一个新的项目,一个配置好了storm与mono环境的D...

Docker Desktop安装使用指南:零基础教程

在之前的文章中,我多次提到使用Docker来安装各类软件,尤其是开源软件应用。鉴于不少读者对此有需求,我决定专门制作一期关于Docker安装与使用的详细教程。我主要以Macbook(Mac平台)为例进...

Linux如何成功地离线安装docker(linux离线安装httpd)

系统环境:Redhat7.2和Centos7.4实测成功近期因项目需要用docker,所以记录一些相关知识,由于生产环境是不能直接连接互联网,尝试在linux中离线安装docker。步骤1.下载...

Docker 类面试题(常见问题)(docker面试题目)

Docker常见问题汇总镜像相关1、如何批量清理临时镜像文件?可以使用sudodockerrmi$(sudodockerimages-q-fdanging=true)命令2、如何查看...

面试官:你知道Dubbo怎么优雅上下线的吗?你:优雅上下线是啥?

最近无论是校招还是社招,都进行的如火如荼,我也承担了很多的面试工作,在一次面试过程中,和候选人聊了一些关于Dubbo的知识。Dubbo是一个比较著名的RPC框架,很多人对于他的一些网络通信、通信协议、...

【Docker 新手入门指南】第五章:Hello Word

适合人群:完全零基础新手|学习目标:30分钟掌握Docker核心操作一、准备工作:先确认是否安装成功打开终端(Windows用户用PowerShell或GitBash),输入:docker--...

松勤软件测试:详解Docker,如何用portainer管理Docker容器

镜像管理搜索镜像dockersearch镜像名称拉取镜像dockerpullname[:tag]列出镜像dockerimages删除镜像dockerrmiimage名称或id删除...