百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 热门文章 > 正文

canal 基于Mysql数据库增量日志解析

bigegpt 2024-08-18 13:59 3 浏览

canal 基于Mysql数据库增量日志解析


?1.前言

?最近太多事情 工作的事情,以及终身大事等等 耽误更新,由于最近做项目需要同步监听 未来电视 mysql的变更了解到公司会用canal做增量监听,就尝试使用了一下 这里做个demo 简单的记录一下。


?2.canal简介

?canal:主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费的中间件?当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x


?3.MySQL 注备复制原理

??3.1 mysql主备复制工作原理

??1.MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)??2.MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)??3.MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据


??3.2 canal 工作原理

??1.canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议??2.MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )??3.canal 解析 binary log 对象(原始为 byte 流)


?4.准备

?对于自建MySQL ,需要先开启 Binlog写入功能,并且配置binlog-format 为Row模式 在my.cnf中配置

?授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant

    CREATE USER canal IDENTIFIED BY 'canal';  
    GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
    -- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
    FLUSH PRIVILEGES;

?5.canal 下载安装配置


??5.1 canal下载

??https://github.com/alibaba/canal/releases (下载速度可能很慢)

??下载 canal.deployer-xxx.tar.gz 如 canal.deployer-1.1.4.tar.gz

??解压后 可以看到如下结构


??5.2 canal 初始配置

??配置修改:

    vim conf/example/instance.properties

??如下:

    #################################################
    ## mysql serverId
    canal.instance.mysql.slaveId = 2020
    # position info 修改自己的数据库(canal要监听的数据库 地址 )
    canal.instance.master.address = 127.0.0.1:3306
    canal.instance.master.journal.name = 
    canal.instance.master.position = 
    canal.instance.master.timestamp = 
    #canal.instance.standby.address = 
    #canal.instance.standby.journal.name =
    #canal.instance.standby.position = 
    #canal.instance.standby.timestamp = 
    # username/password 修改成自己 数据库信息的账号 (单独开一个 准备阶段创建的账号)
    canal.instance.dbUsername = canal
    canal.instance.dbPassword = canal
    canal.instance.defaultDatabaseName =
    canal.instance.connectionCharset = UTF-8
    # table regex  表的监听规则 
    # canal.instance.filter.regex = blogs\.blog_info  
    canal.instance.filter.regex = .\*\\\\..\*
    # table black regex
    canal.instance.filter.black.regex = 

??启动canal

    sh bin/startup.sh

??查看server日志??看到 the canal server is running now 表示启动成功

    vi logs/canal/canal.log
    2020-01-08 15:25:33.361 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ##     start the canal server.
    2020-01-08 15:25:33.468 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[192.168.12.245:11111]
    2020-01-08 15:25:34.061 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......

??查看instance的日志

    vi logs/example/example.log
    2020-01-08 15:25:33.864 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-blogs 
    2020-01-08 15:25:33.998 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start successful....
    2020-01-08 15:25:33.999 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN  c.a.otter.canal.parse.inbound.mysql.MysqlEventParser - prepare to find start position just show master status

??5.3 扩展 destination 配置


    vi conf/canal.properties

??在canal.destinations 处可以配置当前server上部署的instance 列表 默认为 example ,我这里改成了 blogs最好对应数据库名称。一个instance 对应一个 数据库


?6.创建Java 客户端 监听canal 消费数据


??6.1 创建maven项目


??6.2 添加canal client POM 依赖


    <dependency>
      <groupId>com.alibaba.otter</groupId>
      <artifactId>canal.client</artifactId>
      <version>1.1.0</version>
    </dependency>

??6.3 创建 canal 的客户端监听

??CanalMessageListener.java

??该类实现InitializingBean 主要是在初始化的时候 执行 init 方法,在init()方法中 创建 CanalConnector对象,连接需要监听的canal,主要提供 canal的 host ,port ,destination ,以及username 和 password

??parse 方法 主要用于将监听的对象 通过反射等转换成对应的实体类

    /**
     * @author johnny
     **/
    @Component
    @Slf4j
    @ConditionalOnProperty(name = "application.canal.accessor", havingValue = "canal")
    public class CanalMessageListener implements InitializingBean, ParseCanal {
    private CanalConnector connector;
    @Autowired
    private CanalConfig canalConfig;
    @Autowired
    private IParseDispatcher configParseDispatcher;
    private void init() {
        //创建canal 监听 传入host port destination等参数
        connector = CanalConnectors.newSingleConnector(new InetSocketAddress(canalConfig.getHost(), canalConfig.getPort()),
                canalConfig.getDestination(), canalConfig.getUsername(), canalConfig.getPassword());
        connector.connect();
        //  .*\..*
        connector.subscribe(".*\\..*");
        connector.rollback();
        new Thread(() -> {
            while (true) {
                Message message = connector.getWithoutAck(canalConfig.getBatchSize());
                long batchId = message.getId();
                long size = message.getEntries().size();
            //batchId == -1 表示没有数据变更
                if (batchId == -1 || size == 0) {
                    System.out.println("empty data ");
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                    }
                } else {
                //解析数据变更
                    resoleveEntry(message.getEntries());
                }
            }
        }).start();
    }
    //解析数据变更
    private void resoleveEntry(List<CanalEntry.Entry> entries) {
        CanalEntry.RowChange rowChange = null;
        for (CanalEntry.Entry row : entries) {
         //判断是否是 事物开始 和 事物结束 
            if (row.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || row.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }
            try {
                rowChange = CanalEntry.RowChange.parseFrom(row.getStoreValue());
            } catch (InvalidProtocolBufferException e) {
                e.printStackTrace();
            }
            List<CanalEntry.RowData> rowDataList = rowChange.getRowDatasList();
            String tableName = row.getHeader().getTableName();
            CanalEntry.EventType eventType = row.getHeader().getEventType();
            for (CanalEntry.RowData rowData : rowDataList) {
                if (eventType == CanalEntry.EventType.UPDATE) {
                    List<CanalEntry.Column> columns = rowData.getBeforeColumnsList();
                    Object object = parse(columns, tableName);
                    log.info("收到的 object:{}", JsonUtils.marshalToString(object));
                    //根据收到的对象 处理后续业务逻辑
                }
            }
        }
    }
    @Override
    public void afterPropertiesSet() throws Exception {
        init();
    }
    //解析 List<CanalEntry.Column>对象到对应的 实体类
    @Override
    public Object parse(List<CanalEntry.Column> canalDatas, String tableName) {
    //根据配置好的map 从中根据key 表名 获取对应的映射后的 实体类class
        String className = configParseDispatcher.dispatch(tableName);
        Object entity = null;
        Class c = null;
        try {
            c = Class.forName(className);
            entity = c.newInstance();
        } catch (ClassNotFoundException e) {
            log.error("【未找到对应 {} 的 实体类 】", className);
        } catch (Exception e) {
        }
        for (CanalEntry.Column canalDataColumn : canalDatas) {
            String columnName = canalDataColumn.getName();
            Field[] fields = c.getDeclaredFields();
            for (Field field : fields) {
                Object fieldValue = null;
                field.setAccessible(true);
                String fiedName = CaseFormat.LOWER_CAMEL.to(CaseFormat.LOWER_UNDERSCORE, field.getName());
                log.info("【filedName: {}】", fiedName);
                if (fiedName.equals(columnName)) {
                    try {
                        if (Long.class.equals(field.getType())) {
                            fieldValue = NumberUtils.toLong(canalDataColumn.getValue());
                        }else if(Integer.class.equals(field.getType())){
                            fieldValue = NumberUtils.toInt(canalDataColumn.getValue());
                        }else if(Double.class.equals(field.getType())){
                            fieldValue = NumberUtils.toDouble(canalDataColumn.getValue());
                        }else if(Date.class.equals(field.getType())){
                            try {
                                fieldValue = DateUtils.parseDate(canalDataColumn.getValue(), new String[]{"yyyy-MM-dd HH:mm:ss"});
                            } catch (ParseException e) {
                                e.printStackTrace();
                            }
                        }else{
                            fieldValue = canalDataColumn.getValue();
                        }
                        field.set(entity, fieldValue);
                        break;
                    } catch (IllegalAccessException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
        return entity;
    }
    }

??application.yml??配置canal 地址,以及表名和实体的映射规则


    server:
      port: 8881

    application:
        canal:
        accessor: canal
        host: 127.0.0.1
        port: 11111
        username:
        password:
        destination: blogs
        batchSize: 30
        parse:   规则,根据表名获取对应要映射的 实体class
          rule:
            mapping:
              blog_info: com.johnny.canal.canal_test.entity.BlogInfo

??IParseDispatcher.java??接口:用来根据表名key获取对应的 要映射的实体,这里写成接口是因为可以提供多种获取方式,比如我这里通过yml 配置去获取

    /**
     * @author johnny
     * @create 2020-01-17 上午11:09
     **/
    public interface IParseDispatcher {
     String dispatch(String key);
    }

??ConfigParseDispatcher.java??实现上面的接口,提供一种从 application.yml 获取初始源配置 根据 application.canal.parse.rule进行配置

    /**
     * @author johnny
     * @create 2020-01-17 上午11:07
     **/
    @Data
    @Configuration
    @ConfigurationProperties(prefix = "application.canal.parse.rule")
    public class ConfigParseDispatcher implements IParseDispatcher {
    private Map<String,String> mapping=new HashMap<>();
    @Override
    public String dispatch(String key) {
        return mapping.get(key);
    }
    }

??7.演示

??启动项目 此时控制台打印 empty data ,无数据变更

??通过执行 在 canal监听的mysql 上执行 更新语句

    update blog_info set blog_title = 'SpringBoot配置相关for canal test '  where id = 40

??debug 程序,当执行上面的update语句后 可以看到立即收到

??通过parse方法解析为对应的 实体对象,后续做自己的业务逻辑 即可


?8.总结

?本篇主要介绍了canal是什么,如何下载安装和配置 ,以及提供了自己写的一个简单demo 。后续有机会深入了解一下canal的其他功能,比如 如何同步到Kafka/RocketMQ等等。。


个人博客网站 https://www.askajohnny.com 欢迎访问!

本文由博客一文多发平台 https://openwrite.cn?from=article_bottom 发布!

相关推荐

悠悠万事,吃饭为大(悠悠万事吃饭为大,什么意思)

新媒体编辑:杜岷赵蕾初审:程秀娟审核:汤小俊审签:周星...

高铁扒门事件升级版!婚宴上‘冲喜’老人团:我们抢的是社会资源

凌晨两点改方案时,突然收到婚庆团队发来的视频——胶东某酒店宴会厅,三个穿大红棉袄的中年妇女跟敢死队似的往前冲,眼瞅着就要扑到新娘的高额钻石项链上。要不是门口小伙及时阻拦,这婚礼造型团队熬了三个月的方案...

微服务架构实战:商家管理后台与sso设计,SSO客户端设计

SSO客户端设计下面通过模块merchant-security对SSO客户端安全认证部分的实现进行封装,以便各个接入SSO的客户端应用进行引用。安全认证的项目管理配置SSO客户端安全认证的项目管理使...

还在为 Spring Boot 配置类加载机制困惑?一文为你彻底解惑

在当今微服务架构盛行、项目复杂度不断攀升的开发环境下,SpringBoot作为Java后端开发的主流框架,无疑是我们手中的得力武器。然而,当我们在享受其自动配置带来的便捷时,是否曾被配置类加载...

Seata源码—6.Seata AT模式的数据源代理二

大纲1.Seata的Resource资源接口源码2.Seata数据源连接池代理的实现源码3.Client向Server发起注册RM的源码4.Client向Server注册RM时的交互源码5.数据源连接...

30分钟了解K8S(30分钟了解微积分)

微服务演进方向o面向分布式设计(Distribution):容器、微服务、API驱动的开发;o面向配置设计(Configuration):一个镜像,多个环境配置;o面向韧性设计(Resista...

SpringBoot条件化配置(@Conditional)全面解析与实战指南

一、条件化配置基础概念1.1什么是条件化配置条件化配置是Spring框架提供的一种基于特定条件来决定是否注册Bean或加载配置的机制。在SpringBoot中,这一机制通过@Conditional...

一招解决所有依赖冲突(克服依赖)

背景介绍最近遇到了这样一个问题,我们有一个jar包common-tool,作为基础工具包,被各个项目在引用。突然某一天发现日志很多报错。一看是NoSuchMethodError,意思是Dis...

你读过Mybatis的源码?说说它用到了几种设计模式

学习设计模式时,很多人都有类似的困扰——明明概念背得滚瓜烂熟,一到写代码就完全想不起来怎么用。就像学了一堆游泳技巧,却从没下过水实践,很难真正掌握。其实理解一个知识点,就像看立体模型,单角度观察总...

golang对接阿里云私有Bucket上传图片、授权访问图片

1、为什么要设置私有bucket公共读写:互联网上任何用户都可以对该Bucket内的文件进行访问,并且向该Bucket写入数据。这有可能造成您数据的外泄以及费用激增,若被人恶意写入违法信息还可...

spring中的资源的加载(spring加载原理)

最近在网上看到有人问@ContextConfiguration("classpath:/bean.xml")中除了classpath这种还有其他的写法么,看他的意思是想从本地文件...

Android资源使用(android资源文件)

Android资源管理机制在Android的开发中,需要使用到各式各样的资源,这些资源往往是一些静态资源,比如位图,颜色,布局定义,用户界面使用到的字符串,动画等。这些资源统统放在项目的res/独立子...

如何深度理解mybatis?(如何深度理解康乐服务质量管理的5个维度)

深度自定义mybatis回顾mybatis的操作的核心步骤编写核心类SqlSessionFacotryBuild进行解析配置文件深度分析解析SqlSessionFacotryBuild干的核心工作编写...

@Autowired与@Resource原理知识点详解

springIOCAOP的不多做赘述了,说下IOC:SpringIOC解决的是对象管理和对象依赖的问题,IOC容器可以理解为一个对象工厂,我们都把该对象交给工厂,工厂管理这些对象的创建以及依赖关系...

java的redis连接工具篇(java redis client)

在Java里,有不少用于连接Redis的工具,下面为你介绍一些主流的工具及其特点:JedisJedis是Redis官方推荐的Java连接工具,它提供了全面的Redis命令支持,且...