1.Flink CDC 是什么?
做数仓的伙伴们都知道第一步就是考虑如何"入仓"或者"入湖". 同步方式细分下来有四种,全量、增量、新增及变化、缓慢变化。实际生产中大部分情况其实也就考虑全量 和 新增及变化。
首先说CDC(Change Data Capture)技术主要是面向数据库的变更捕获,技术方案也是非常丰富,从实现原理上可以分为两类:一种是基于查询,一种是基于日志的CDC。查询的CDC就很简单了,通过batch的方式实现就好,缺点也很明显不能保证强一致性和实时性,而日志的CDC可以实时消费日志,streaming处理保证了一致性和实时性,实现复杂。
对于Flink CDC相比Debezium、DataX、Canal、sqoop以及闭源的OGG在功能的全面性及架构可扩展性上有一定的优势体现。其实它的底层数据捕获引擎用的就是Debezium。套用网上的总结好的一个方案对比图,大家对于查询类工具可能会更加熟悉一些,比如DataX、Sqoop、Kettle这些,对于传统离线数仓来说也是可以的,当然对于业务的发展来说一定会有其瓶颈,延迟高、扩展性差等。之后进入Lambda架构,全量和增量分为两条数据线路后,此时就需要引入Canal 或者OGG、Debezium + Kafka 这套实时方案定时回写HDFS/OSS/S3,最后合并全量和增量数据形成结果表,这一下链路是不是就拉长了?又是DataX 又是Canal 又是kafka 等等,组件也多了,维护成本一下就提升了许多,并且全量和增量同步之间依然是隔离的, 实时性仍不可保证。
开源CDC方案对比图
想到在生产中大家可能更关心的一些问题
1) 集成过程中对业务库的影响有多大?
无论是增量还是全量,压力主要在读取上,索引读、并发控制,主要还是以限流为目标,对于体量很大的增量数据还是通过kafka会更加稳定。具体方案还是要全方位考虑,比如source DB的性能、日增数据的大小、集群性能等
2)集成的时候是否会锁表?
2.0之后引入无锁算法,(我没看懂)
3)如果集成过程中断网了,是否能从断点继续?(断点续传)
支持checkpoint(全量+增量)
4)数据质量可以保证嘛?
天然的exactly once , source 和 target都能保证精准一次。
5)物理删除日志数据还能捕获到嘛?
只要在CDC消费的offset之前就可以,如果还没消费就删除了,那就无法捕获了。
2.Flink CDC connect Oracle / Mysql Sink To Hive
Flink CDC 的双重角色一个是connector ,另一个就是consumer了, 如下图当前主流的一些业务DB都在支持和持续优化中,而对于consumer则可以通过Flink sql 或者 Datastream API来处理,当然在Flink SQL的迭代速度以及对hive的功能集成更是突飞猛进,因为Flink 它旨在打造流批一体化的数据引擎。
截至当前supported connectors
1) 采用DataStream API 的方式
Oracle
package cn.source.oracle
import java.util.Properties
import com.ververica.cdc.connectors.oracle.OracleSource
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema
import org.apache.flink.streaming.api.datastream.DataStreamSource
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.functions.source.SourceFunction
class oracleSource {
def connectWithDataStream(getEnvironment:StreamExecutionEnvironment):DataStreamSource[String] = {
val properties:Properties = new Properties()
properties.setProperty("log.mining.strategy","online_catalog")
// properties.setProperty("log.mining.continuous.mine","true")
/**
* https://debezium.io/documentation/reference/1.6/connectors/oracle.html#oracle-connector-properties
* Notes:continuous.mine has de-support on oracle19C
* */
properties.setProperty("log.mining.batch.size.max","90000000")
properties.setProperty("log.mining.batch.size.default","10000")
properties.setProperty("debezium.database.tablename.case.insensitive","false")
/**
* only valid to oracle 11G and default is true, other version is invalid
* */
properties.setProperty("scan.startup.mode","latest-offset")
/**
* Get incremental data ,otherwise will get fully data first then incremental, Default is initial
* The mechanism of scan.startup.mode option relying on Debezium’s snapshot.mode configuration.
* So please do not use them together.
* If you specific both scan.startup.mode and debezium.snapshot.mode options in the table DDL,
* it may make scan.startup.mode doesn’t work.
* */
val sourceFunction :SourceFunction[String] = OracleSource.builder()
.url("jdbc:oracle:thin:@hostname:1521/serviceName")
.port(1521)
.database("serviceName") // service name
.schemaList("schema")
.tableList("TableName")
.username("userAccout")
.password("userPassword")
.debeziumProperties(properties)
.deserializer(new JsonDebeziumDeserializationSchema()) // convert source record to Json String
.build()
val oracleSourceStreaming = getEnvironment.addSource(sourceFunction)
oracleSourceStreaming
}
}
Mysql
package cn.source.mysql
import java.time.Duration
import java.util.Properties
import com.ververica.cdc.connectors.mysql.source.MySqlSource
import com.ververica.cdc.connectors.mysql.table.StartupOptions
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.streaming.api.datastream.DataStreamSource
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.functions.source.SourceFunction
import projectUtils.createEnv
class mysqlSource {
def connectWithDataStream(getEnvironment:StreamExecutionEnvironment)={
val properties = new Properties()
properties.setProperty("snapshot.locking.mode","none") // no locking mode
val heartBeater = Duration.ofSeconds(20)
val sourceFunction : MySqlSource[String] = MySqlSource.builder()
.hostname("hostname")
.port(3306)
.databaseList("flink_cdc") // if you want to sync all DB , set up ".*"
.tableList("flink_cdc.t_name")
.username("userAccount")
.password("userPassword")
.heartbeatInterval(heartBeater) // for the table with slow update that we need to monitor heart beat to ensure get the latest offset in binlog
.debeziumProperties(properties)
.deserializer(new JsonDebeziumDeserializationSchema()) // transfer SourceRecords to JSON String
.startupOptions(StartupOptions.initial()) // scan mode , default is initial,initial load snapshot then get the latest bin log
.build()
sourceFunction
val mySqlSourceStreaming=getEnvironment.fromSource(sourceFunction.asInstanceOf,WatermarkStrategy.noWatermarks(),"Mysql-Source")
mySqlSourceStreaming
}
}
2) Transformation
Flink CDC消费到数据后可以采用Flink sql 进行ETL中数据解析、清洗过滤、多streaming join 打宽等操作.上面已经转成了json string ,如下图展示里面有个"op"属性,c 代表插入 ,u 代表更新 ,d 代表删除。在解析json的时候需要注意根目录,delete 和 insert、update对应的路径是不一样的。
Streaming中消费到的日志数据
val sqlResult = streamTableEnv.sqlQuery(
"""
|select
|case when JSON_VALUE(f0,'$.op') = 'd' then JSON_VALUE(f0,'$.before.IN_STU') else JSON_VALUE(f0,'$.after.IN_STU') end as IN_STU,
|case when JSON_VALUE(f0,'$.op') = 'd' then JSON_VALUE(f0,'$.before.NAME2') else JSON_VALUE(f0,'$.after.NAME2') end as name2,
|JSON_VALUE(f0,'$.op') as op,
|cast(TO_TIMESTAMP(FROM_UNIXTIME(CAST(JSON_VALUE(f0,'$.ts_ms') AS BIGINT) /1000 ,'yyyy-MM-dd HH:mm:ss')) as string) as time_create
|from t_source
|""".stripMargin)
3) Sink To Target
Flink sql 完成ETL过程后可将数据sink 到下游的hive 、kafka、data lake (Hudi、Iceberg等)、OLAP(Doris)等平台再进行进一步的数据处理和分析.
以Sink to hive为例,首先创建catalog ,具体关于flink connect to hive 可以参考梳理#1,创建好catalog后就可以将数据流式写进Hive 表了。
package cn.target.hive.catalog
import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}
import org.apache.flink.table.catalog.hive.HiveCatalog
class hiveCatalog {
def create(tableEnv:TableEnvironment,name:String,defaultDB:String,hiveConfDir:String)={
val hive = new HiveCatalog(name,defaultDB,hiveConfDir)
tableEnv.registerCatalog(name,hive)
// set the HiveCatalog as the current catalog of the session
tableEnv.useCatalog(name)
// set dialect to hive language
println("After register to get the current catalog name :" + tableEnv.getCurrentCatalog)
}
}
4) Flink CDC Oracle 中遇到的一些问题
a. 数据延迟
这个问题在FAQ中做出了解答,使用在线模式,而不是去消费redo log ,那么为什么需要加入这两个参数呢 ?log.mining.strategy的默认值是redo_log_catalog,也就是去读archived logs ,这时一定会存在延迟的,因为需要写入redo,数据量越大耗时也会越长, 而online模式就不会存在这个问题, 但是无法捕获DDL,不过业务系统的操作还是以DML为主,所以切到online模式问题不大。而对于log.mining.continuous.mine默认是false,开启后Oracle会自动实时连续挖掘变化数据。
Flink SQL properties
'debezium.log.mining.strategy'='online_catalog', --//默认是redo_log_catalog
'debezium.log.mining.continuous.mine'='true' --// 这个参数19c之后Oracle宣布就不再支持了,default是false
Debezium properties
'log.mining.strategy'='online_catalog',
'log.mining.continuous.mine'='true'
b. 大小写问题
字段和表名都需要大写,字段类型一定要按照官网的mapping,否则会出现<Null>的情况,对于Oracle11G可以加上参数'debezium.database.tablename.case.insensitive'='false' ,其它version此参数无效
c. service_name 和 SID 识别问题
其实这个问题已经从源码上做了修复, 可以在connect时加上参数url , 如果不加的话,则需要自行修改源码如下<changed by jasonchenjc keep Oracle Service Name>, 原来代码中port 和 dbname 中间使用 “ : ” 分隔开,也就是必须指定SID , 而这样对于Oracle RAC部署则只能连接1个SID实例了,建议添加url或者修改源码指定为service_name .
public class OracleJdbcUrlUtils {
/**
* Gets the URL in SID format.
*
* @param properties
* @return jdbcUrl in SID format.
*/
public static String getConnectionUrlWithSid(Properties properties) {
String url;
if (properties.containsKey("database.url")) {
url = properties.getProperty("database.url");
} else {
String hostname = properties.getProperty("database.hostname");
String port = properties.getProperty("database.port");
String dbname = properties.getProperty("database.dbname");
url = "jdbc:oracle:thin:@" + hostname + ":" + port + "/" + dbname; // changed by jasonchenjc keep Oracle Service Name
}
return url;
}
}
3. Basic on Flink CDC 的流批一体
想法:
在流式处理上可以分成有界流和无界流,如下构建一个这样的思路,虽然Flink CDC 支持丰富的Flink 生态,但个人想法还是让擅长的组件处理擅长的事情,实时数仓直接进到OLAP平台,比如Doris ,利用物化视图、索引、联邦查询等特性直接构建实时框架,离线数仓可以构建到hive 或者 spark DataFrame 中,存储double ?
RT and Batch
明显上述架构中存储出现了问题,面对多场景的业务形式,我们既要考虑成本又要满足绝大多数需求,快、准、稳一般来说是业务的基础诉求,可以对于大多数IT来说这算是较高的要求了O(∩_∩)O~~~
优化:
简单聊一下Flink CDC + Hudi 这套流批一体的方案,为什么要引入Hudi , 相比另外两类湖产品,Hudi 是真正意义上做到流读、流写,批读、批写的产品,Hudi 可以存储到hdfs以及云上的对象存储oss 、S3等, 这样就可以统一存储了,再利用它ACID语义的加强,可以做到近实时的数据处理,这样对于RT部分大多数需求还是可以满足的(Doris On Hudi 生态还不支持,后续Doris version会支持)。