跳转至

数据管道工具 - mysql-dataflowsync(DFS)

qqAys/mysql-dataflowsync - GitHub | English

GitHub License

DFS 介绍

DFS是一个数据同步框架,通过流式读取mysql binlog实现数据捕获,目标数据库可以是mysql,也可以是其他数据库。这个框架由一个毫秒级的增量数据捕获模块(CDC)、基于sqlite数据库的持久化队列 persist-queue、自动关系处理模块(rel_mgr)以及符合业务逻辑的、复杂的数据转换模块(DPU)构成,使用 dfs-entrypoint.py 启动,可以自由组合调用。在一个日均7.5万事件负载的集群数据库中,增量数据同步能够在秒级延迟内完成(平均延迟1秒内),同时保持了数据的完整性和一致性。

DFS 工作流

开发背景

公司有个项目要实施数据迁移,我先来介绍一下当时的业务场景,我们有一个旧业务系统,这个系统是大范围使用的,产品设计了新一代业务系统,需要进行小范围生产试点,试点需要使用生产数据,并且产生的数据影响也是生产级别的。试点环境需直接使用生产数据,但旧系统数据高度耦合,无法通过类似"删档内测"模式实现数据隔离。

我首先评估了阿里云DTS,其在数据同步场景中展现出卓越性能。然而,新旧系统间存在显著差异:第一就是表结构重构,业务模型升级导致表间关系与字段定义发生根本性变化;第二就是字段值语义演变,业务字段的编码规则、数据类型及业务含义均发生变化或迁移。

这些差异使得直接的数据同步无法满足需求,传统ETL工具也因缺乏业务规则适配能力而受限,那不如就自己开发个ETL工具吧。

DFS 使用场景

  • 用于数据同步 - 用于源数据库与目标数据库表结构一致情况下的数据同步。

  • 用于数据管道 - 用于源数据库与目标数据库表结构不一致情况下的数据同步,经过DPU自定义表结构映射与数据加工,可实现数据管道功能。

  • 用于数据备份 - 不设置目标数据库,使用内置的DFS日志数据库,实现增量数据事件备份功能。

已测试环境

  • 源数据库: MySQL 5.7.x(打开binlog,且binlog-format = ROW
  • 目标数据库: MySQL 8.0.x
  • Python 版本: 3.12+
  • Python 库: requirements.txt

安装

  1. 按需修改 config.example.yaml 并重命名为 config.yaml > 如需指定binlog文件与binlog位置,请在 config.yaml 中设置 binlog_filebinlog_pos

  2. 构建并启动容器

    Bash
    docker build -t mysql-dataflowsync:latest .
    docker compose up -d
    

Performance

Configuration: Using a 2022 M2 MacBook Pro (16GB), Python 3.12, source database is MySQL 5.7.x (PolarDB), target database is MySQL 8.0.x (PolarDB)

CDC CDC (Change Data Capture) Unit

A change data capture unit implemented using the Python mysql-replication and persist-queue libraries.

CDC processing speed: Maximum 59 records per second (MAX 59rps) on a single node, with an average of 17ms per record.

DPU DPU (Data Processing Unit)

Custom Data Processing Unit for unidirectional data synchronisation between two databases.

DPU processing speed: in the case of a single node, the maximum processing 18 records per second (MAX 18rps), the average 55ms processing a record.

License

This project is licensed under the MIT License.

References

评论