百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 热门文章 > 正文

Flink CDC | Mysql指定时间戳读取

bigegpt 2025-02-21 12:18 10 浏览

Flink CDC在配置mysql时,可以指定几种方式来选择位点: INITIAL、EARLIEST_OFFSET、LATEST_OFFSET、SPECIFIC_OFFSETS、TIMESTAMP、SNAPSHOT。

INITIAL: 全量与增量

EARLIEST_OFFSET:最早位点

LATEST_OFFSET:最近的位点

SPECIFIC_OFFSETS:指定位点

TIMESTAMP:指定时间点

SNAPSHOT:全量

源码分析

设置该类型的cdc同步任务,机制会检查当前存在的binlog文件列表,因为每个文件是按顺序排列,同时对应的时间也是有顺序的,最终是通过二分法进行查找。

public static void main(String[] args) {
    MySqlSource.builder()
    .startupOptions(StartupOptions.timestamp(System.currentTimeMillis()))
    .build();
}

当设置了cdc任务的类型为TIMESTAMP时,会通过以下的方法来获取对应的binlogfile,具体查看类 BinlogOffsetUtils.java

public static BinlogOffset initializeEffectiveOffset(
    BinlogOffset offset, MySqlConnection connection) {
    BinlogOffsetKind offsetKind = offset.getOffsetKind();
    switch (offsetKind) {
        case EARLIEST:
            return BinlogOffset.ofBinlogFilePosition("", 0);
        case TIMESTAMP:
            // 遍历当前所有存在的binlogfile文件,取每个文件的文件头来判断时间
            // 所以一定是当前整个文件的数据,也是按binlogfile文件名来读取数据的
            return DebeziumUtils.findBinlogOffset(offset.getTimestampSec() * 1000, connection);
        case LATEST:
            return DebeziumUtils.currentBinlogOffset(connection);
        default:
            return offset;
    }
}
public static BinlogOffset findBinlogOffset(long targetMs, MySqlConnection connection) {
        MySqlConnection.MySqlConnectionConfiguration config = connection.connectionConfig();
        BinaryLogClient client =
                new BinaryLogClient(
                        config.hostname(), config.port(), config.username(), config.password());

        List binlogFiles = new ArrayList<>();
        JdbcConnection.ResultSetConsumer rsc =
                rs -> {
                    while (rs.next()) {
                        String fileName = rs.getString(1);
                        long fileSize = rs.getLong(2);
                        if (fileSize > 0) {
                            binlogFiles.add(fileName);
                        }
                    }
                };

        try {
            // 获取mysql系统内存在的binlog
            connection.query("SHOW BINARY LOGS", rsc);
            LOG.info("Total search binlog: {}", binlogFiles);

            if (binlogFiles.isEmpty()) {
                return BinlogOffset.ofBinlogFilePosition("", 0);
            }
            // 搜索最接近的binlog文件
            String binlogName = searchBinlogName(client, targetMs, binlogFiles);
            return BinlogOffset.ofBinlogFilePosition(binlogName, 0);
        } catch (Exception e) {
            throw new FlinkRuntimeException(e);
        }
    }
    private static String searchBinlogName(
            BinaryLogClient client, long targetMs, List binlogFiles)
            throws IOException, InterruptedException {
        int startIdx = 0;
        int endIdx = binlogFiles.size() - 1;
        // 因为binlog文件名是递增的,同时时间也是递增的
        // 以二分法进行查找
        while (startIdx <= endIdx) {
            int mid = startIdx + (endIdx - startIdx) / 2;
            long midTs = getBinlogTimestamp(client, binlogFiles.get(mid));
            if (midTs < targetMs) {
                startIdx = mid + 1;
            } else if (targetMs < midTs) {
                endIdx = mid - 1;
            } else {
                return binlogFiles.get(mid);
            }
        }

        return endIdx < 0 ? binlogFiles.get(0) : binlogFiles.get(endIdx);
    }

从以上的逻辑可以看到,当指定了timestamp时,会从最接近的那个binlog文件开始从头开始读取数据,那会不会多读很多数据呢?答案是否定的,当从找到的binlog文件中读取数据后,真正在处理的时候,会再判断一次当前的事件是否在指定的时间范围内,代码在
MySqlBinlogSplitReadTask.java

protected void handleEvent(
            MySqlPartition partition, MySqlOffsetContext offsetContext, Event event) {
    // 当从binlog读取数据后,进行一次过滤  
    if (!eventFilter.test(event)) {
            return;
        }
        super.handleEvent(partition, offsetContext, event);
        // check do we need to stop for read binlog for snapshot split.
        if (isBoundedRead()) {
            final BinlogOffset currentBinlogOffset =
                    RecordUtils.getBinlogPosition(offsetContext.getOffset());
            // reach the high watermark, the binlog reader should finished
            if (currentBinlogOffset.isAtOrAfter(binlogSplit.getEndingOffset())) {
                // send binlog end event
                try {
                    signalEventDispatcher.dispatchWatermarkEvent(
                            binlogSplit,
                            currentBinlogOffset,
                            SignalEventDispatcher.WatermarkKind.BINLOG_END);
                } catch (InterruptedException e) {
                    LOG.error("Send signal event error.", e);
                    errorHandler.setProducerThrowable(
                            new DebeziumException("Error processing binlog signal event", e));
                }
                // tell reader the binlog task finished
                ((StoppableChangeEventSourceContext) context).stopChangeEventSource();
            }
        }
    }

eventFilter由BinlogSplitReader在创建MySqlBinlogSplitReadTask时处理。

    private Predicate createEventFilter(BinlogOffset startingOffset) {
        // 当是TIMESTAMP类型时,需要将小于指定时间的事件进行移除
        if (BinlogOffsetKind.TIMESTAMP.equals(startingOffset.getOffsetKind())) {
            long startTimestampSec = startingOffset.getTimestampSec();
            return event ->
                    EventType.HEARTBEAT.equals(event.getHeader().getEventType())
                            || event.getHeader().getTimestamp() >= startTimestampSec * 1000;
        }
        return event -> true;
    }

相关推荐

Linux 系统启动完整流程

一、启动系统流程简介如上图,简述系统启动的大概流程:1:硬件引导UEFi或BIOS初始化,运行POST开机自检2:grub2引导阶段系统固件会从MBR中读取启动加载器,然后将控制权交给启动加载器GRU...

超专业解析!10分钟带你搞懂Linux中直接I/O原理

我们先看一张图:这张图大体上描述了Linux系统上,应用程序对磁盘上的文件进行读写时,从上到下经历了哪些事情。这篇文章就以这张图为基础,介绍Linux在I/O上做了哪些事情。文件系统什么是...

linux入门系列12--磁盘管理之分区、格式化与挂载

前面系列文章讲解了VI编辑器、常用命令、防火墙及网络服务管理,本篇将讲解磁盘管理相关知识。本文将会介绍大量的Linux命令,其中有一部分在“linux入门系列5--新手必会的linux命令”一文中已经...

Linux环境下如何设置多个交叉编译工具链?

常见的Linux操作系统都可以通过包管理器安装交叉编译工具链,比如Ubuntu环境下使用如下命令安装gcc交叉编译器:sudoapt-getinstallgcc-arm-linux-gnueab...

可算是有文章,把Linux零拷贝技术讲透彻了

阅读本文大概需要6.0分钟。作者:卡巴拉的树链接:https://dwz.cn/BaQWWtmh本文探讨Linux中主要的几种零拷贝技术以及零拷贝技术适用的场景。为了迅速建立起零拷贝的概念...

linux软链接的创建、删除和更新

大家都知道,有的时候,我们为了省下空间,都会使用链接的方式来进行引用操作。同样的,在系统级别也有。在Windows系列中,我们称其为快捷方式,在Linux中我们称其为链接(基本上都差不多了,其中可能...

Linux 中最容易被黑客动手脚的关键目录

在Linux系统中,黑客攻击后常会针对关键目录和文件进行修改以实现持久化、提权或隐藏恶意活动。本文介绍下黑客最常修改的目录及其手法。一、/etc目录关键文件有:/etc/passwd和/et...

linux之间传文件命令之Rsync傻瓜式教程

1.前言linux之间传文件命令用什么命令?本文介绍一种最常用,也是功能强大的文件同步和传输工具Rsync,本文提供详细傻瓜式教程。在本教程中,我们将通过实际使用案例和最常见的rsync选项的详细说...

Linux下删除目录符号链接的方法

技术背景在Linux系统中,符号链接(symlink)是一种特殊的文件,它指向另一个文件或目录。有时候,我们可能需要删除符号链接,但保留其指向的目标目录。然而,在删除符号链接时可能会遇到一些问题,例如...

阿里云国际站注册教程:aa云服务器怎么远程链接?

在全球化的今天,互联网带给我们无以计数的便利,而云服务器则是其中的重要基础设施之一。这篇文章将围绕阿里云国际站注册、aa云服务器如何远程链接,以及服务器安全防护如Ddos防火墙、网站应用防护waf防火...

Linux 5.16 网络子系统大范围升级 多个新适配器驱动加入

Linux在数据中心中占主导地位,因此每个内核升级周期的网络子系统变化仍然相当活跃。Linux5.16也不例外,周一最新与网络相关的更新加入了大量的驱动和新规范的支持。一个较新硬件的驱动是Realt...

搭建局域网文件共享服务(Samba),手机电脑都能看喜欢的影视剧

作为一名影视爱好者,为了方便地观看自己喜欢的影视作品,在家里搞一个专门用来存放电影的服务器是有必要的。蚁哥选则用一台Ubuntu系统的电脑做为服务器,共享影音文件,其他同一个局域网内的电脑或手机可以...

分享一个实用脚本—centos7系统巡检

概述这周闲得慌,就根据需求写了差不多20个脚本(部分是之前分享过的做了一些改进),今天主要分享一个给平时运维人员用的centos7系统巡检的脚本,或者排查问题检查系统情况也可以用..实用脚本#!/bi...

Linux 中创建符号链接的方法

技术背景在Linux系统里,符号链接(SymbolicLink),也被叫做软链接(SoftLink),是一种特殊的文件,它指向另一个文件或者目录。符号链接为文件和目录的管理带来了极大的便利,比...

一文掌握 Linux 符号链接

符号链接(SymbolicLink),通常被称为“软链接”,是Linux文件系统中一种强大而灵活的工具。它允许用户创建指向文件或目录的“快捷方式”,不仅简化了文件管理,还在系统配置、软件开发和日...