数据管道工具 - mysql-dataflowsync(DFS)¶
qqAys/mysql-dataflowsync - GitHub | English
DFS 介绍¶
DFS是一个数据同步框架,通过流式读取mysql binlog实现数据捕获,目标数据库可以是mysql,也可以是其他数据库。这个框架由一个毫秒级的增量数据捕获模块(CDC)、基于sqlite数据库的持久化队列 persist-queue、自动关系处理模块(rel_mgr)以及符合业务逻辑的、复杂的数据转换模块(DPU)构成,使用 dfs-entrypoint.py 启动,可以自由组合调用。在一个日均7.5万事件负载的集群数据库中,增量数据同步能够在秒级延迟内完成(平均延迟1秒内),同时保持了数据的完整性和一致性。
DFS 工作流¶
开发背景¶
公司有个项目要实施数据迁移,我先来介绍一下当时的业务场景,我们有一个旧业务系统,这个系统是大范围使用的,产品设计了新一代业务系统,需要进行小范围生产试点,试点需要使用生产数据,并且产生的数据影响也是生产级别的。试点环境需直接使用生产数据,但旧系统数据高度耦合,无法通过类似"删档内测"模式实现数据隔离。
我首先评估了阿里云DTS,其在数据同步场景中展现出卓越性能。然而,新旧系统间存在显著差异:第一就是表结构重构,业务模型升级导致表间关系与字段定义发生根本性变化;第二就是字段值语义演变,业务字段的编码规则、数据类型及业务含义均发生变化或迁移。
这些差异使得直接的数据同步无法满足需求,传统ETL工具也因缺乏业务规则适配能力而受限,那不如就自己开发个ETL工具吧。
DFS 使用场景¶
-
用于数据同步 - 用于源数据库与目标数据库表结构一致情况下的数据同步。
-
用于数据管道 - 用于源数据库与目标数据库表结构不一致情况下的数据同步,经过DPU自定义表结构映射与数据加工,可实现数据管道功能。
-
用于数据备份 - 不设置目标数据库,使用内置的DFS日志数据库,实现增量数据事件备份功能。
已测试环境¶
- 源数据库: MySQL 5.7.x(打开binlog,且
binlog-format = ROW
) - 目标数据库: MySQL 8.0.x
- Python 版本: 3.12+
- Python 库: requirements.txt
安装¶
-
按需修改
config.example.yaml
并重命名为config.yaml
> 如需指定binlog文件与binlog位置,请在config.yaml
中设置binlog_file
与binlog_pos
。 -
构建并启动容器
Performance¶
Configuration: Using a 2022 M2 MacBook Pro (16GB), Python 3.12, source database is MySQL 5.7.x (PolarDB), target database is MySQL 8.0.x (PolarDB)
CDC (Change Data Capture) Unit¶
A change data capture unit implemented using the Python mysql-replication
and persist-queue
libraries.
CDC processing speed: Maximum
59 records per second (MAX 59rps
) on a single node, with an average of 17ms
per record.
DPU (Data Processing Unit)¶
Custom Data Processing Unit for unidirectional data synchronisation between two databases.
DPU processing speed: in the case of a single node, the maximum processing 18
records per second (MAX 18rps
), the average 55ms
processing a record.
License¶
This project is licensed under the MIT License.
References¶
- python-mysql-replication - Apache License, Version 2.0
- persist-queue - BSD-3-Clause license