百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 热门文章 > 正文

MYSQL数据同步

bigegpt 2025-05-21 12:11 18 浏览

java开发工程师在实际的开发经常会需要实现两台不同机器上的MySQL数据库的数据同步,要解决这个问题不难,无非就是mysql数据库的数据同步问题。但要看你是一次性的数据同步需求,还是定时数据同步,亦或是持续性实时数据同步。

其中一次性的数据同步需求比较简单,这里主要介绍一次性的数据同步需求后的增量数据同步方案:

方案一:canal

github

简介

canal [k'nael],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费

工作原理

  • canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送 dump 协议
  • MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
  • canal 解析 binary log 对象(原始为 byte 流)

下载

canal.deployer-1.1.6.tar.gz

canal.adapter-1.1.6.tar.gz

canal.admin-1.1.6.tar.gz

deployer:读取binlog,读取SQL,默认将数据放在缓存中,也可以将数据同步到MQ中

adapter:连接deployer,读取sql,同步数据到目标存储中(支持elasticsearch,hbase,kudu,rdb.tablestore)

admin:可视化页面

准备

  • 对于自建 MySQL , 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf 中配置如下
  • [mysqld]
    log-bin=mysql-bin # 开启 binlog
    binlog-format=ROW # 选择 ROW 模式
    server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
    • 注意:针对阿里云 RDS for MySQL , 默认打开了 binlog , 并且账号默认具有 binlog dump 权限 , 不需要任何权限或者 binlog 设置,可以直接跳过这一步
  • 授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant
  • CREATE USER canal IDENTIFIED BY 'canal';
    GRANT
    SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
    -- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
    FLUSH PRIVILEGES;

配置deployer

解压deployer,修改conf/example目录下的instance.properties

  • 为了方便我用的是root账号
#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0

# enable gtid use true/false
canal.instance.gtidon=false

# position info
canal.instance.master.address=192.168.2.4:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=

# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=

# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal

#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=

# username/password
canal.instance.dbUsername=root
canal.instance.dbPassword=123456
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==

# table regex
canal.instance.filter.regex=.*\\..*
# table black regex
canal.instance.filter.black.regex=mysql\\.slave_.*
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch

# mq config
canal.mq.topic=example
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,topic2:mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.enableDynamicQueuePartition=false
#canal.mq.partitionsNum=3
#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#################################################
  • 启动deployer
sh bin/startup.sh
  • 查看log下的日志文件,查看是否启动成功

配置adapter

解压adapter,进入到conf目录

  • 修改bootstrap.yml
  • canal:
    manager:
    jdbc:
    url: jdbc:mysql://192.168.2.4:3306/canal_manager?useUnicode=true&characterEncoding=UTF-8
    username: root
    password: 123456
  • 创建canal_manager的schama
  • 执行sql语句
  • canal_manager.sql
  • 修改application.yml
  • server:
    port: 8081
    spring:
    jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

    canal.conf:
    mode: tcp #tcp kafka rocketMQ rabbitMQ
    flatMessage: true
    zookeeperHosts:
    syncBatchSize: 1000
    retries: -1
    timeout:
    accessKey:
    secretKey:
    consumerProperties:
    # canal tcp consumer
    canal.tcp.server.host: 127.0.0.1:11111
    canal.tcp.zookeeper.hosts:
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:
    # kafka consumer
    kafka.bootstrap.servers: 127.0.0.1:9092
    kafka.enable.auto.commit: false
    kafka.auto.commit.interval.ms: 1000
    kafka.auto.offset.reset: latest
    kafka.request.timeout.ms: 40000
    kafka.session.timeout.ms: 30000
    kafka.isolation.level: read_committed
    kafka.max.poll.records: 1000
    # rocketMQ consumer
    rocketmq.namespace:
    rocketmq.namesrv.addr: 127.0.0.1:9876
    rocketmq.batch.size: 1000
    rocketmq.enable.message.trace: false
    rocketmq.customized.trace.topic:
    rocketmq.access.channel:
    rocketmq.subscribe.filter:
    # rabbitMQ consumer
    rabbitmq.host:
    rabbitmq.virtual.host:
    rabbitmq.username:
    rabbitmq.password:
    rabbitmq.resource.ownerId:

    srcDataSources:
    defaultDS:
    url: jdbc:mysql://192.168.2.4:3307/test2?useUnicode=true
    username: root
    password: 123456
    canalAdapters:
    - instance: example # canal instance Name or mq topic name
    groups:
    - groupId: g1
    outerAdapters:
    # - name: logger
    - name: rdb
    key: mysql1
    properties:
    jdbc.driverClassName: com.mysql.jdbc.Driver
    jdbc.url: jdbc:mysql://192.168.2.4:3307/test1?useUnicode=true
    jdbc.username: root
    jdbc.password: 123456
    druid.stat.enable: false
    druid.stat.slowSqlMillis: 1000
    - name: rdb
    key: mysql2
    properties:
    jdbc.driverClassName: com.mysql.jdbc.Driver
    jdbc.url: jdbc:mysql://192.168.2.4:3307/test3?useUnicode=true
    jdbc.username: root
    jdbc.password: 123456
    druid.stat.enable: false
    druid.stat.slowSqlMillis: 1000
    # - name: rdb
    # key: oracle1
    # properties:
    # jdbc.driverClassName: oracle.jdbc.OracleDriver
    # jdbc.url: jdbc:oracle:thin:@localhost:49161:XE
    # jdbc.username: mytest
    # jdbc.password: m121212
    # - name: rdb
    # key: postgres1
    # properties:
    # jdbc.driverClassName: org.postgresql.Driver
    # jdbc.url: jdbc:postgresql://localhost:5432/postgres
    # jdbc.username: postgres
    # jdbc.password: 121212
    # threads: 1
    # commitSize: 3000
    # - name: hbase
    # properties:
    # hbase.zookeeper.quorum: 127.0.0.1
    # hbase.zookeeper.property.clientPort: 2181
    # zookeeper.znode.parent: /hbase
    # - name: es
    # hosts: 127.0.0.1:9300 # 127.0.0.1:9200 for rest mode
    # properties:
    # mode: transport # or rest
    # # security.auth: test:123456 # only used for rest mode
    # cluster.name: elasticsearch
    # - name: kudu
    # key: kudu
    # properties:
    # kudu.master.address: 127.0.0.1 # ',' split multi address
    # - name: phoenix
    # key: phoenix
    # properties:
    # jdbc.driverClassName: org.apache.phoenix.jdbc.PhoenixDriver
    # jdbc.url: jdbc:phoenix:127.0.0.1:2181:/hbase/db
    # jdbc.username:
    # jdbc.password:
    • 在目标库创建号需要同步的schama
  • 继续进入到conf/rdb目录,创建适配器(以test1,test3db为例,创建test1.yml,test3.yml)
  • test1.yml
    # dataSourceKey: defaultDS
    # destination: example
    # groupId: g1
    # outerAdapterKey: mysql1
    # concurrent: true
    # dbMapping:
    # database: test1
    # table: user
    # targetTable: mytest2.user
    # targetPk:
    # id: id
    # # mapAll: true
    # targetColumns:
    # id:
    # name:
    # role_id:
    # c_time:
    # test1:
    # etlCondition: "where c_time>={}"
    # commitBatch: 3000 # 批量提交的大小


    ## Mirror schema synchronize config
    dataSourceKey: defaultDS
    destination: example
    groupId: g1
    outerAdapterKey: mysql1
    concurrent: true
    dbMapping:
    mirrorDb: true
    database: test1

    ##############################
    test3.yml
    ## Mirror schema synchronize config
    dataSourceKey: defaultDS
    destination: example
    groupId: g1
    outerAdapterKey: mysql2
    concurrent: true
    dbMapping:
    mirrorDb: true
    database: test3
  • 启动
  • bin/startup.sh
  • 查看log下的日志

验证

在源数据库创建表,新增,更新,删除等操作,查看目标数据库是否更新

方案二:datax

github

简介

DataX 是阿里云 DataWorks数据集成 的开源版本,在阿里巴巴集团内被广泛使用的离线数据同步工具/平台。DataX 实现了包括 MySQL、Oracle、OceanBase、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、Hologres、DRDS, databend 等各种异构数据源之间高效的数据同步功能。

下载

Source code(tar.gz)

配置

解压后,修改在job下创建mysql_2_mysql.json

{
    "job": {
        "setting": {
            "speed": {
                "channel": 1
            }
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "root",
                        "password": "111111",
                        "column": [ "id", "name","content" ,"createdate"],
                        "splitPk": "id",
                        "connection": [
                            {
                                "table": [
                                    "t_user_info"
                                ],
                                "jdbcUrl": [
                                    "jdbc:mysql://192.168.2.4:3306/sourcedb"
                                ]
                            }
                        ]
                    }
                },
                "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                        "writeMode": "insert",
                        "username": "root",
                        "password": "111111",
                        "column": [ "id", "name","content","createdate"],
                        "session": [
                            "set session sql_mode='ANSI'"
                        ],
                        "preSql": [
                            "delete from t_user_info"
                        ],
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://192.168.2.4:3307/targetdb",
                                "table": [
                                    "t_user_info"
                                ]
                            }
                        ]
                    }
                }
            }
        ]
    }
}
  • 需要在目标库创建对应的表

启动

python .\bin\datax.py .\job\mysql-2-mysql.json

问题

  • 同步需要写sql或者配置好字段全量更新
  • 增量更新需要表具有create_time,update_time字段

方案三:存储SQL

python 爬虫,将sql写到中间件存储(s3,kafka,redis,es)等中,然后写一个程序读取存储,将数据写到目标数据库中

对比

对比项

canal

datax

存储SQL

对源数据库的影响

需要开启bin_log,占用磁盘,有会影响数据库性能

查询源数据库(select),数据量越大对数据库的影响越大

没有影响

是否需要在目标数据库创建schama

是否需要在目标数据库创建表

增量更新

启动适配器就会增量更新

需要表字段有create_time,update_time这种时间戳标记

启动同步程序就行

怎么操作

需要启动,deployer和adapter程序

需要启动datax程序

需要改造爬虫程序,还需要创建一个同步程序



相关推荐

Linux gron 命令使用详解(linux gminer)

简介gron是一个独特的命令行工具,用于将JSON数据转换为离散的、易于grep处理的赋值语句格式。它的名字来源于"grepableon"或"grepable...

【Linux】——从0到1的学习,让你熟练掌握,带你玩转Linu

学习Linux并掌握Java环境配置及SpringBoot项目部署是一个系统化的过程,以下是从零开始的详细指南,帮助你逐步掌握这些技能。一、Linux基础入门1.安装Linux系统选择发行版:推荐...

Linux常用的shell命令汇总(linux中shell的作用)

本文介绍Linux系统下常用的系统级命令,包括软硬件查看、修改命令,有CPU、内存、硬盘、网络、系统管理等命令。说明命令是在Centos6.464位的虚拟机系统进行测试的。本文介绍的命令都会在此C...

零成本搭建个人加密文件保险柜(适用于 Win11 和 Linux)

不依赖收费软件操作简单,小白也能跟着做支持双系统,跨平台使用实现数据加密、防删除、防泄露内容通俗无技术门槛,秒懂秒用使用工具简介我们将使用两个核心工具:工具名用途系统支持Veracrypt创建加密虚...

如何在 Linux 中使用 Gzip 命令?(linux怎么用gzip命令)

gzip(GNUzip)是Linux系统中一个开源的压缩工具,用于压缩和解压缩文件。它基于DEFLATE算法,广泛应用于文件压缩、备份和数据传输。gzip生成的文件通常带有.gz后缀,压缩效率...

Linux 必备的20个核心知识点(linux内核知识点)

学习和使用Linux所必备的20个核心知识点。这些知识点涵盖了从基础操作到系统管理和网络概念,是构建扎实Linux技能的基础。Linux必备的20个知识点1.Linux文件系统层级标...

谷歌 ChromeOS 已支持 7z、iso、tar 文件格式

IT之家6月21日消息,谷歌ChromeOS在管理文件方面进行了改进,新增了对7z、iso和tar等格式的支持。从5月的ChromeOS101更新开始,ChromeOS...

如何在 Linux 中提取 Tar Bz2 文件?

在深入解压方法之前,我们先来了解.tar.bz2文件的本质。.tar.bz2是一种组合文件格式,包含两个步骤:Tar(TapeArchive):tar是一种归档工具,用于将多个文件或目录打包...

如何在 CentOS 7/8 上安装 Kitematic Docker 管理器

Kitematic是一款流行的Docker图形界面管理平台,适用于Ubuntu、macOS和Windows操作系统。然而,其他发行版(如CentOS、OpenSUSE、Fedora、R...

Nacos3.0重磅来袭!全面拥抱AI,单机及集群模式安装详细教程!

之前和大家分享过JDK17的多版本管理及详细安装过程,然后在项目升级完jdk17后又发现之前的注册和配置中心nacos又用不了,原因是之前的nacos1.3版本的,版本太老了,已经无法适配当前新的JD...

爬虫搞崩网站后,程序员自制“Zip炸弹”反击,6刀服务器成功扛住4.6万请求

在这个爬虫横行的时代,越来越多开发者深受其害:有人怒斥OpenAI的爬虫疯狂“偷”数据,7人团队十年心血的网站一夜崩溃;也有人被爬虫逼到极限,最后只好封掉整个巴西的访问才勉强止血。但本文作者却走...

Ubuntu 操作系统常用命令详解(ubuntu必学的60个命令)

UbuntuLinux是一款流行的开源操作系统,广泛应用于服务器、开发、学习等场景。命令行是Ubuntu的灵魂,也是高效、稳定管理系统的利器。本文按照各大常用领域,详细总结Ubuntu必学...

Linux面板8.0.54 测试版-已上线(linux主机面板)

Linux面板8.0.54测试版【增加】[网站]Java项目新增刷新列表按钮【增加】[网站]PHP项目-Apache-服务新增守护进程功能【增加】[网站]Python项目创建/删除网站时新增同时创建...

开源三剑客——构建私有云世界的基石

公共云原生的浪潮正在席卷这个世界,亚马逊AWS、谷歌GCP和微软的Azure年收入增长超过了30%,越来越多的公司和个人开始将自己的服务部署到云环境中,大型数据中心的规模经济带来了成本的降低,可以在保...

2.2k star,一款业界领先的私有云+在线文档管理系统

简介kodbox可道云(原KodExplorer)是业内领先的企业私有云和在线文档管理系统,为个人网站、企业私有云部署、网络存储、在线文档管理、在线办公等提供安全可控,简便易用、可高度定制的私有云产品...