使用 Maxwell 订阅 MySQL binlog 同步至 Kafka

Maxwell 是一款开源的 MySQL binlog 数据同步工具,它可以实时解析 MySQL binlog,将数据变更事件转换为 JSON 格式,然后推送至 Kafka 等消息队列中。

它是由美国 Zendesk 开源,用 Java 编写的 MySQL 实时抓取软件。实时读取MySQL 二进制日志 Binlog,并生成 JSON 格式的消息,作为生产者发送给 Kafka,Kinesis、RabbitMQ、Redis、Google Cloud Pub/Sub、文件或其它平台的应用程序。

这篇文章尽量简单的描述关键步骤,具体的实践小例子我通过代码来给大家呈现了,我的代码用golang编写包括简单数据插入mysql到触发Maxwell生产信息并通过Kafka消费解析binlog数据代码中都有详细的说明哦

我的代码在这里

Mysql主从复制过程

  1. Master 主库将改变记录,写到二进制日志(binary log)中
  2. Slave 从库向 mysql master 发送 dump 协议,将 master 主库的 binary log events 拷贝到它的中继日志(relay log);
  3. Slave 从库读取并重做中继日志中的事件,将改变的数据同步到自己的数据库。

Mysql中的binlog

什么是binlog

MySQL 的二进制日志可以说 MySQL 最重要的日志了,它记录了所有的 DDL 和 DML(除了数据查询语句)语句,以事件形式记录,还包含语句所执行的消耗的时间,MySQL 的二进制日志是事务安全型的

一般来说开启二进制日志大概会有 1%的性能损耗。二进制有两个最重要的使用场景:

其一:MySQL Replication 在 Master 端开启 binlog,Master 把它的二进制日志传递给 slaves 来达到 master-slave 数据一致的目的。
其二:自然就是数据恢复了,通过使用 mysqlbinlog 工具来恢复数据。

二进制日志包括两类文件:

二进制日志索引文件(文件名后缀为.index)用于记录所有 的二进制文件

二进制日志文件(文件名后缀为.00000*)记录数据库所有的 DDL 和 DML(除了数据查询语句)语句事件。

binlog的开启

查看是否开启binlog

1
mysql> show variables like 'log_bin';

如果开启了你将看到:

1
log_bin  ON

OFF表示未开启

首先我们需要找到mysql的配置文件的位置

1
/etc/my.cnf

在 mysql 的配置文件下,修改配置

在[mysqld] 区块,设置/添加 log-bin=mysql-bin

这个表示 binlog 日志的前缀是 mysql-bin,以后生成的日志文件就是 mysql-bin.000001的文件后面的数字按顺序生成,每次 mysql 重启或者到达单个文件大小的阈值时,新生一个文件,按顺序编号。

binlog的分类设置

mysql binlog 的格式有三种,分别是 STATEMENT,MIXED,ROW。

三种格式的区别:

statement

语句级,binlog 会记录每次一直行写操作的语句。相对 row 模式节省空间,但是可能产生不一致性,比如 update test set create_date=now(); 如果用 binlog 日志进行恢复,由于执行时间不同可能产生的数据就不同。
优点: 节省空间
缺点: 有可能造成数据不一致

row

星级, binlog 会记录每次操作后每行记录的变化。
优点:保持数据的绝对一致性。因为不管 sql 是什么,引用了什么函数,他只记录执行后的效果。
缺点:占用较大的空间。

mixed

混合级别,statement 的升级版,一定程度上解决了 statement 模式因为一些情况而造成的数据不一致问题
默认还是 statement,在某些情况下,譬如: 当函数中包含 UUID() 时; 包含 AUTO_INCREMENT 字段的表被更新时; 执行 INSERT DELAYED 语句时; 用 UDF 时; 会按照 ROW 的方式进行处理
优点:节省空间,同时兼顾了一定的一致性。
缺点:还有些极个别情况依旧会造成不一致,另外 statement 和 mixed 对于需要对binlog 监控什么情况都不方便。

综合上面对比,Maxwell 想做监控分析,选择 row 格式比较合适

查看是否为ROW模式

1
mysql> show variables like 'binlog_format';

命令行设置ROW

1
2
mysql> set global binlog_format=ROW;
mysql> set global binlog_row_image=FULL;

注意:我用的 mysql8.0.32 默认就是对的所以没有改动,这些改动是否生效我也不晓得。。。

综上

binlog_format,binlog_row_image 也可以配置文件开启,总结下来你需要在 /etc/my.cnf 没有可以自己创建并添加以下

1
2
3
4
5
6
[mysqld]
log-bin=mysql-bin # 开启Binlog 一般只需要修改这一行即可
binlog-format=ROW # 设置格式 此行可以不加 命令设置即可 详见下方拓展
binlog_row_image=FULL # 记录前后镜像所有内容
server_id=1 # 配置serverID 这一行本来就存在
```·

maxwell工作原理

Maxwell 的工作原理很简单,就是把自己伪装成 MySQL 的一个 slave,然后以 slave的身份假装从 MySQL(master)复制数据

意义

通过使用 Maxwell,我们可以实现以下目标:

  1. 实时数据同步:Maxwell 能够实时捕获 MySQL 数据库的数据变更,帮助用户快速同步数据到其他系统。
  2. 数据解耦:Maxwell 将数据变更事件发布到 Kafka,降低多系统间的直接依赖,提高系统的可扩展性和可维护性。
  3. 数据审计:通过解析 binlog,可以记录数据的变更历史,便于数据审计和排查问题。
  4. 事件驱动架构:Maxwell 可以为事件驱动架构提供一种实现方式,通过监听 Kafka 上的数据变更事件,触发相应的业务逻辑。

应用场景

以下是使用 Maxwell 实现 MySQL binlog 同步至 Kafka 的一些典型应用场景:

  1. 数据实时同步:通过 Maxwell 实现 MySQL 到其他数据存储系统(如 Hadoop、Elasticsearch 等)的实时数据同步。
  2. 数据事件驱动:在微服务架构中,通过监听 Kafka 上的数据变更事件,实现业务逻辑的解耦和异步处理。
  3. 实时数据监控:基于 Maxwell 和 Kafka,可以方便地实现实时数据监控和告警系统。
  4. 数据迁移:使用 Maxwell 进行数据迁移,保证数据的一致性和实时性。

使用说明

接下来,我们将介绍如何使用 Maxwell 将 MySQL 的 binlog 同步至 Kafka。

环境准备

安装 MySQL,并启用 binlog 功能及bing。见上面的 Mysql 的 bingo 的说明,说的很清楚。

安装 Kafka

安装 Maxwell

配置 Maxwell

创建 Maxwell 用户和数据库:

1
2
mysql> CREATE USER 'maxwell'@'%' IDENTIFIED BY 'password';
mysql> CREATE USER 'maxwell'@'localhost' IDENTIFIED BY 'password'

为 Maxwell 用户授权访问 binlog:

1
2
3
4
5
mysql> GRANT ALL ON maxwell.* TO 'maxwell'@'%';
mysql> GRANT ALL ON maxwell.* TO 'maxwell'@'localhost';

mysql> GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell'@'%';
mysql> GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell'@'localhost';

Maxwell 配置详解

上面的例子是我用命令行启动的,可以满足大多数的需求,但我在使用时遇到了大坑,所以我觉得有必要把Maxwell的配置字段弄清楚,然后使用配置文件的方式来启动。

输出格式相关的配置

输出JSON字符串的格式

1
{"database":"JYDB","table":"LC_STIBOpTradInfoAtta","type":"update","ts":1680756712,"xid":102261559175,"commit":true,"data":{"ID":631388766750,"RID":631388766016,"ReportArea":13,"SerialNumber":1,"SalesDepartmentName":"国融证券股份有限公司湖南分公 司","BOCode":246118,"SecuCoBelongedCode":17059,"SecuCoBelonged":"国融证券","BuySum":null,"SaleSum":7362289.0700,"UpdateTime":"2020-01-03 17:51:11","InsertTime":"2020-01-03 17:46:07","JSID":631389071369},"old":{"SalesDepartmentName":"国融证券股份有限公司湖南分公司"}}
  • data 最新的数据,修改后的数据
  • old 旧数据,修改前的数据
  • type 操作类型,有insert, update, delete, database-create, database-alter, database-drop, table-create, table-alter, table-drop,bootstrap-insert,int(未知类型)
  • xid 事务id
  • commit 同一个xid代表同一个事务,事务的最后一条语句会有commit,可以利用这个重现事务
  • server_id
  • thread_id
  • 运行程序时添加参数–output_ddl,可以捕捉到ddl语句
  • datetime列会输出为”YYYY-MM-DD hh:mm:ss”,如果遇到”0000-00-00 00:00:00”会原样输出
  • maxwell支持多种编码,但仅输出utf8编码
  • maxwell的TIMESTAMP总是作为UTC处理,如果要调整为自己的时区,需要在后端逻辑上进行处理

与输出格式相关的配置如下

选项 参数值 描述 默认值
output_binlog_position BOOLEAN 是否包含 binlog position false
output_gtid_position BOOLEAN 是否包含 gtid position false
output_commit_info BOOLEAN 是否包含 commit and xid true
output_xoffset BOOLEAN 是否包含 virtual tx-row offset false
output_nulls BOOLEAN 是否包含值为NULL的字段 true
output_server_id BOOLEAN 是否包含 server_id false
output_thread_id BOOLEAN 是否包含 thread_id false
output_schema_id BOOLEAN 是否包含 schema_id false
output_row_query BOOLEAN 是否包含 INSERT/UPDATE/DELETE 语句. Mysql需要开启 binlog_rows_query_log_events false
output_ddl BOOLEAN 是否包含 DDL (table-alter, table-create, etc) events false
output_null_zerodates BOOLEAN 是否将 ‘0000-00-00’ 转换为 null? false

进阶使用

基本的配置

选项 参数值 描述 默认值
config 配置文件 config.properties 的路径
log_level `[debug info warn
daemon 指定Maxwell实例作为守护进程到后台运行
env_config_prefix STRING 匹配该前缀的环境变量将被视为配置值

可以把Maxwell的启动参数写到一个配置文件 config.properties 中,然后通过 config 选项指定, bin/maxwell--config config.properties

mysql 配置选项

Maxwell 根据用途将 MySQL 划分为3种角色:

host:主机,建maxwell库表,存储捕获到的schema等信息

  • 主要有六张表,bootstrap用于数据初始化,schemas记录所有的binlog文件信息,databases记录了所有的数据库信息,tables记录了所有的表信息,columns记录了所有的字段信息,positions记录了读取binlog的位移信息,heartbeats记录了心跳信息

replication_host:复制主机,Event监听,读取该主机binlog

  • hostreplication_host分开,可以避免 replication_user 往生产库里写数据

schema_host:schema主机,捕获表结构schema的主机

  • binlog里面没有字段信息,所以maxwell需要从数据库查出schema,存起来。
  • schema_host一般用不到,但在binlog-proxy场景下就很实用。比如要将已经离线的binlog通过maxwell生成json流,于是自建一个mysql server里面没有结构,只用于发送binlog,此时表机构就可以制动从 schema_host 获取。

通常,这三个主机都是同一个, schema_host 只在有 replication_host 的时候使用。

与MySQL相关的有下列配置

选项 参数值 描述 默认值
host STRING mysql 地址 localhost
user STRING mysql 用户名
password STRING mysql 密码 (no password)
port INT mysql 端口 3306
jdbc_options STRING mysql jdbc connection options DEFAULT_JDBC_OPTS
ssl SSL_OPT SSL behavior for mysql cx DISABLED
schema_database STRING Maxwell用于维护的schema和position将使用的数据库 maxwell
client_id STRING 用于标识Maxwell实例的唯一字符串 maxwell
replica_server_id LONG 用于标识Maxwell实例的唯一数字 6379 (see notes)
master_recovery BOOLEAN enable experimental master recovery code false
gtid_mode BOOLEAN 是否开启基于GTID的复制 false
recapture_schema BOOLEAN 重新捕获最新的表结构(schema),不可在 config.properties中配置 false
replication_host STRING server to replicate from. See split server roles schema-store host
replication_password STRING password on replication server (none)
replication_port INT port on replication server 3306
replication_user STRING user on replication server
replication_ssl SSL_OPT SSL behavior for replication cx cx DISABLED
schema_host STRING server to capture schema from. See split server roles schema-store host
schema_password STRING password on schema-capture server (none)
schema_port INT port on schema-capture server 3306
schema_user STRING user on schema-capture server
schema_ssl SSL_OPT SSL behavior for schema-capture server DISABLED

生产者的配置

仅介绍kafka,其他的生产者的配置详见官方文档。

kafka是maxwell支持最完善的一个生产者,并且内置了多个版本的kafka客户端(0.8.2.2, 0.9.0.1, 0.10.0.1, 0.10.2.1 or 0.11.0.1, 1.0.0.),默认 kafka_version=1.0.0(当前Maxwell版本1.20.0)

Maxwell 会将消息投递到Kafka的Topic中,该Topic由 kafka_topic 选项指定,默认值为 maxwell,除了指定为静态的Topic,还可以指定为动态的,譬如 namespace_%{database}_%{table}%{database}%{table} 将被具体的消息的 database 和 table 替换。

Maxwell 读取配置时,如果配置项是以 kafka. 开头,那么该配置将设置到 Kafka Producer 客户端的连接参数中去,譬如

1
kafka.acks = 1kafka.compression.type = snappykafka.retries=5

下面是Maxwell通用生产者和Kafka生产者的配置参数

选项 参数值 描述 默认值
producer PRODUCER_TYPE 生产者类型 stdout
custom_producer.factory CLASS_NAME 自定义消费者的工厂类
producer_ack_timeout PRODUCER_ACK_TIMEOUT 异步消费认为消息丢失的超时时间(毫秒ms)
producer_partition_by PARTITION_BY 输入到kafka/kinesis的分区函数 database
producer_partition_columns STRING 若按列分区,以逗号分隔的列名称
producer_partition_by_fallback PARTITION_BY_FALLBACK producer_partition_by=column时需要,当列不存在是使用
ignore_producer_error BOOLEAN 为false时,在kafka/kinesis发生错误时退出程序;为true时,仅记录日志 See also dead_letter_topic true
kafka.bootstrap.servers STRING kafka 集群列表, HOST:PORT[,HOST:PORT]
kafka_topic STRING kafka topic maxwell
dead_letter_topic STRING 详见官方文档
kafka_version KAFKA_VERSION 指定maxwell的 kafka 生产者客户端版本,不可在config.properties中配置 0.11.0.1
kafka_partition_hash `[default murmur3]` 选择kafka分区时使用的hash方法
kafka_key_format `[array hash]` how maxwell outputs kafka keys, either a hash or an array of hashes
ddl_kafka_topic STRING 当 output_ddl为true时, 所有DDL的消息都将投递到该topic kafka_topic

过滤器配置

Maxwell 可以通过 --filter 配置项来指定过滤规则,通过 exclude 排除,通过 include 包含,值可以为具体的数据库、数据表、数据列,甚至用 Javascript 来定义复杂的过滤规则;可以用正则表达式描述,有几个来自官网的例子

1
2
3
4
5
# 仅匹配foodb数据库的tbl表和所有table_数字的表--filter='exclude: foodb.*, include: foodb.tbl, include: foodb./table_\d+/'
# 排除所有库所有表,仅匹配db1数据库--filter = 'exclude: *.*, include: db1.*'
# 排除含db.tbl.col列值为reject的所有更新--filter = 'exclude: db.tbl.col = reject'
# 排除任何包含col_a列的更新--filter = 'exclude: *.*.col_a = *'
# blacklist 黑名单,完全排除bad_db数据库,若要恢复,必须删除maxwell库--filter = 'blacklist: bad_db.*'

Maxwell监控

Maxwell 提供了 baselogging mechanism,JMX,HTTPorbypush toDatadog 这四种监控方式,与监控相关的配置项有下列这些:

选项 参数值 描述 默认值
metrics_prefix STRING 指标的前缀 MaxwellMetrics
metrics_type `[slf4j jmx http
metrics_jvm BOOLEAN 是否收集JVM信息 false
metrics_slf4j_interval SECONDS 将指标记录到日志的频率, metrics_type须配置为slf4j 60
http_port INT metrics_type为http时,发布指标绑定的端口 8080
http_path_prefix STRING http的路径前缀 /
http_bind_address STRING http发布指标绑定的地址 all addresses
http_diagnostic BOOLEAN http是否开启diagnostic后缀 false
http_diagnostic_timeout MILLISECONDS http diagnostic 响应超时时间 10000
metrics_datadog_type `[udp http]` metrics_type为datadog时发布指标的方式
metrics_datadog_tags STRING 提供给 datadog 的标签,如 tag1:value1,tag2:value2
metrics_datadog_interval INT 推指标到datadog的频率,单位秒 60
metrics_datadog_apikey STRING 当 metrics_datadog_type=http 时datadog用的api key
metrics_datadog_host STRING 当 metrics_datadog_type=udp时推指标的目标地址 localhost
metrics_datadog_port INT 当 metrics_datadog_type=udp 时推指标的端口 8125

具体可以得到哪些监控指标呢?有如下,注意所有指标都预先配置了指标前缀 metrics_prefix

指标 类型 说明
messages.succeeded Counters 成功发送到kafka的消息数量
messages.failed Counters 发送失败的消息数量
row.count Counters 已处理的binlog行数,注意并非所有binlog都发往kafka
messages.succeeded.meter Meters 消息成功发送到Kafka的速率
messages.failed.meter Meters 消息发送失败到kafka的速率
row.meter Meters 行(row)从binlog连接器到达maxwell的速率
replication.lag Gauges 从数据库事务提交到Maxwell处理该事务之间所用的时间(毫秒)
inflightmessages.count Gauges 当前正在处理的消息数(等待来自目的地的确认,或在消息之前)
message.publish.time Timers 向kafka发送record所用的时间(毫秒)
message.publish.age Timers 从数据库产生事件到发送到Kafka之间的时间(毫秒),精确度为+/-500ms
replication.queue.time Timers 将一个binlog事件送到处理队列所用的时间(毫秒)

上述有些指标为kafka特有的,并不支持所有的生产者。

实验一番,通过 http 方式获取监控指标

1
docker run -p 8080:8080 -it --rm zendesk/maxwell bin/maxwell --user='maxwell' \    --password='123456' --host='10.100.97.246' --producer=kafka \    --kafka.bootstrap.servers='10.100.97.246:9092' --kafka_topic=maxwell --log_level=debug \    --metrics_type=http  --metrics_jvm=true --http_port=8080

上面的配置大部分与前面的相同,不同的有 -p8080:8080 docker端口映射,以及 --metrics_type=http--metrics_jvm=true--http_port=8080,配置了通过http方式发布指标,启用收集JVM信息,端口为8080,之后可以通过 http://10.100.97.246:8080/metrics 便可获取所有的指标

image-20230406150757776

http 方式有四种后缀,分别对应四种不同的格式

1
所有指标以JSON格式返回

如果是通过 JMX 的方式收集Maxwell监控指标,可以 JAVA_OPTS 环境变量配置JMX访问权限

1
export JAVA_OPTS="-Dcom.sun.management.jmxremote \-Dcom.sun.management.jmxremote.port=9010 \-Dcom.sun.management.jmxremote.local.only=false \-Dcom.sun.management.jmxremote.authenticate=false \-Dcom.sun.management.jmxremote.ssl=false \-Djava.rmi.server.hostname=10.100.97.246"

GTID 支持

Maxwell 从1.8.0版本开始支持基于GTID的复制(GTID-based replication),在GTID模式下,Maxwell将在主机更改后透明地选择新的复制位置。

什么是GTID Replication?

GTID (Global Transaction ID) 是对于一个已提交事务的编号,并且是一个全局唯一的编号。

从 MySQL 5.6.5 开始新增了一种基于 GTID 的复制方式。通过 GTID 保证了每个在主库上提交的事务在集群中有一个唯一的ID。这种方式强化了数据库的主备一致性,故障恢复以及容错能力。

在原来基于二进制日志的复制中,从库需要告知主库要从哪个偏移量进行增量同步,如果指定错误会造成数据的遗漏,从而造成数据的不一致。借助GTID,在发生主备切换的情况下,MySQL的其它从库可以自动在新主库上找到正确的复制位置,这大大简化了复杂复制拓扑下集群的维护,也减少了人为设置复制位置发生误操作的风险。另外,基于GTID的复制可以忽略已经执行过的事务,减少了数据发生不一致的风险。

注意事项

timestamp column

maxwell对时间类型(datetime, timestamp, date)都是当做字符串处理的,这也是为了保证数据一致(比如 0000-00-0000:00:00这样的时间在timestamp里是非法的,但mysql却认,解析成java或者python类型就是null/None)。

如果MySQL表上的字段是 timestamp 类型,是有时区的概念,binlog解析出来的是标准UTC时间,但用户看到的是本地时间。比如 f_create_time timestamp 创建时间是北京时间 2018-01-0521:01:01,那么mysql实际存储的是 2018-01-0513:01:01,binlog里面也是这个时间字符串。如果不做消费者不做时区转换,会少8个小时。

与其每个客户端都要考虑这个问题,我觉得更合理的做法是提供时区参数,然后maxwell自动处理时区问题,否则要么客户端先需要知道哪些列是timestamp类型,或者连接上原库缓存上这些类型。

binary column

maxwell可以处理binary类型的列,如blob、varbinary,它的做法就是对二进制列使用 base64_encode,当做字符串输出到json。消费者拿到这个列数据后,不能直接拼装,需要 base64_decode

表结构不同步

如果是拿比较老的binlog,放到新的mysql server上去用maxwell拉去,有可能表结构已经发生了变化,比如binlog里面字段比 schema_host 里面的字段多一个。目前这种情况没有发现异常,比如阿里RDS默认会为 无主键无唯一索引的表,增加一个 __##alibaba_rds_rowid##__,在 show create tableschema 里面都看不到这个隐藏主键,但binlog里面会有,同步到从库。

另外我们有通过git去管理结构版本,如果真有这种场景,也可以应对。

启动 Maxwell

命令行启动

刚刚步骤中解压安装了 Maxwell 之后进入 Maxwell 文件夹中使用命令行进行启动,我使用的是以下的命令行

使用以下命令启动 Maxwell:

1
2
/home/ubuntu/maxwell-1.24.0/bin/maxwell --user="maxwell" --password="xxxxx" --host="xxxxx" --replica_server_id=12345 --client_id=12345 --producer=kafka --kafka.bootstrap.servers=localhost:9092 --kafka_topic=maxwell --filter 'exclude: *.*, include:test1.stocks'

注意:这个命令是配合我的代码例子的

简单说明一下,这个producer是指定你的输出目标为什么,我这里使用的是kafka,你在调试时可以先使用 stdout,直接输出数据在终端。 –kafka.bootstrap.servers=localhost:9092 这个就指定我的 Kafka broker 的地址当然我用的本机的。–kafka_topic=maxwell 这个就是输出数据目标 topic。 –filter 后面是一个简单的过滤规则,我这个filter的意思是除了* 。* 外的数据都不要而且只要包含test1数据库stocks表对应的 binlog 数据

重要的注意事项!!!

你可能注意到了

–replica_server_id=12345

–client_id=12345

这两个配置项,在上面的 mysql 配置选项说明汇总有说明,这两个参数是标识从服务的唯一身份的,在maxwell的配置文件中如果没有配置,就会使用默认值。正式由于我没有配置,导致我后面碰到了 Slave Server_id 冲突的坑。

事情是这样的我在生产部署了一套binlog监听,在测试也部署了一套,因为我们的源数据库是同步别人的,只有一个。。。 所以我生产和测试订阅的是同一个库,这样问题就来了,我在配好生产环境后就去配测试环境,在我跑起测试环境的一刹那,生产环境就凉了。一直报从服务ID冲突:

1
2
3
ERROR TaskManager - cause:
com.github.shyiko.mysql.binlog.network.ServerException: A slave with the same server_uuid/server_id as this slave has connected to the master; the first event 'mysql-bin.006738' at 147606315, the last event read from '/data1/mysql_root/log/20120/mysql-bin.006738' at 147609022, the last byte read from '/data1/mysql_root/log/20120/mysql-bin.006738' at 147609022.

我当时因为不懂原理,很懵,重新生成一个 mysql 用户,依然不行,后面去了解了整个原理才发现,原来这个货时伪装成了一个mysql副本,然后mysql以为这是自己的一个从库,就不断的同步binlog信息过来。

所以当我跑起来两个但没有设置replica_server_id,client_id这个时候必然出现Slave Server_id 冲突。希望大家不要碰壁了。

配置文件启动

可以把Maxwell的启动参数写到一个配置文件 config.properties 中,然后通过 config 选项指定, bin/maxwell --config config.properties

我的配置文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# 日志级别
log_level=info
client_id=12345
replica_server_id=12345

# Maxwell 元数据数据库信息
host=localhost
port=3306
user=maxwell
password=xxxx

#producer kafka的配置信息
producer=kafka
kafka.bootstrap.servers=localhost:9092
kafka_topic=GMALL_WD_DATA_INFO
kafka_partition_hash=murmur3
producer_partition_by=primary_key

#被同步的数据库的信息
replication_host=localhost
replication_user=maxwell
replication_password=xxxxx
replication_port=3306
jdbc_options=useSSL=false&serverTimezone=Asia/Shanghai

# 配置过滤器,可以配置只同步指定的数据库,指定的表的数据变动。
filter=exclude: *.*, include:test1.stocks

MySQL 数据变更监听

当 Maxwell 成功启动后,你可以在 MySQL 中进行数据的增删改查操作,Maxwell 会实时将数据变更事件推送至 Kafka。

Kafka 消费端监听

在 Kafka 的消费端,你可以监听 Maxwell 发布的数据变更事件,然后根据业务需求进行相应的处理。

我的代码中/LittleBeeMark/CoolGoPkg/apply_kfk/instance/maxwell_test.go 下有完整的示例可以直接跑:

1
2
3
4
5
6
7
8
9
10
11
12
func TestMaxWell_Start(t *testing.T) {
// 初始化配置
conf.InitConfig("../conf/config.yaml")
db.InitDB()

mw := &MaxWell{}
// 初始化kafka消费者
mw.KafkaConsumer = consumer.NewKfkConsumerGroup(&conf.Config.KafkaGroupConf)
mw.KafkaConsumer.AddHandler(&handler.BinlogHandler{})
mw.Start()
}

这个示例主要使用的函数mw.Start() 在同目录下的maxwell.go中,我简单说下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
type MaxWell struct {
KafkaConsumer *consumer.KfkConsumerGroup
}

func (mw *MaxWell) Start() {
// 启动kafka消费者
mw.KafkaConsumer.StartASync()

// 模拟插入数据
mw.KafkaConsumer.Wg.Add(1)
go func() {
defer mw.KafkaConsumer.Wg.Done()
for {
if mw.KafkaConsumer.StopCtx.Err() != nil {
fmt.Println("insert db stop ctx err : ", mw.KafkaConsumer.StopCtx.Err())
break
}
time.Sleep(500 * time.Millisecond)
err := db.InsertStockInfo(&model.Stock{
ProdCode: "000001.SS",
ProdName: "上证指数",
TradeStatus: "Trade",
Last: 3000.00,
})
if err != nil {
fmt.Println("insert db err : ", err)
}
}
}()

// 5秒后优雅退出
time.Sleep(5 * time.Second)
mw.KafkaConsumer.Quit()
}

就是启动一个监听叫做 maxwell 的kafka topic 的消费者,然后不断的每隔半秒插入一个股票数据,查看监听消费者的数据解析打印情况,跑出来的结果如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
Try to connect to MYSQL host:  127.0.0.1 , port:  3306 root:xxxxx@tcp(127.0.0.1:3306)/test1?charset=utf8&parseTime=True&allowNativePasswords=true
Connected to MYSQL: 127.0.0.1 , port: 3306

2023/04/02 19:08:29 /Users/zhankunyang/go/src/LittleBeeMark/CoolGoPkg/apply_kfk/db/client.go:15
[0.952ms] [rows:-] SELECT column_name, column_default, is_nullable = 'YES', data_type, character_maximum_length, column_type, column_key, extra, column_comment, numeric_precision, numeric_scale , datetime_precision FROM information_schema.columns WHERE table_schema = 'test1' AND table_name = 'fiances' ORDER BY ORDINAL_POSITION
2023/04/02 19:08:29 Starting a new Sarama consumer
parse version : 3.4.0
Consumer Group Setup ..... [maxwell]
Sarama consumer up and running!...

2023/04/02 19:08:34 /Users/zhankunyang/go/src/LittleBeeMark/CoolGoPkg/apply_kfk/db/stock.go:6
[2.537ms] [rows:1] INSERT INTO `stocks` (`prod_code`,`prod_name`,`trade_status`,`last`) VALUES ('000001.SS','上证指数','Trade',3000.000000)
BinlogHandler HandleMessage obj:handler.BinLogItem{Database:"test1", Table:"stocks", Type:"insert", Ts:1680433714, Xid:82272, Xoffset:0, Commit:true, Data:map[string]interface {}{"id":26, "last":3000, "prod_code":"000001.SS", "prod_name":"上证指数", "trade_status":"Trade"}, Old:interface {}(nil)}

......

2023/04/02 19:08:35 /Users/zhankunyang/go/src/LittleBeeMark/CoolGoPkg/apply_kfk/db/stock.go:6
[3.089ms] [rows:1] INSERT INTO `stocks` (`prod_code`,`prod_name`,`trade_status`,`last`) VALUES ('000001.SS','上证指数','Trade',3000.000000)
insert db stop ctx err : context canceled
Consumer Group Cleanup ..... [maxwell]
2023/04/02 19:08:35 ctx error context canceled
kfk topic:[maxwell] consumer has quit .....
--- PASS: TestMaxWell_Start (5.21s)
PASS


其中 BinlogHandler HandleMessage obj:handler.BinLogItem{Database:”test1”, Table:”stocks”, Type:”insert”, Ts:1680433714, Xid:82272, Xoffset:0, Commit:true, Data:map[string]interface {}{“id”:26, “last”:3000, “prod_code”:”000001.SS”, “prod_name”:”上证指数”, “trade_status”:”Trade”}, Old:interface {}(nil)} 是我的消费者已经解析出的 binlog 数据可以看到数据库表操作等信息都有。

总结

Maxwell 是一个功能强大、实时性高的 MySQL binlog 数据同步工具,它可以将数据变更事件推送至 Kafka,帮助我们实现数据的实时同步、数据事件驱动等多种应用场景。通过使用 Maxwell,我们可以提高系统的可扩展性、可维护性和实时性,为业务发展提供强有力的支持。