使用 Maxwell 订阅 MySQL binlog 同步至 Kafka
Maxwell 是一款开源的 MySQL binlog 数据同步工具,它可以实时解析 MySQL binlog,将数据变更事件转换为 JSON 格式,然后推送至 Kafka 等消息队列中。
它是由美国 Zendesk 开源,用 Java 编写的 MySQL 实时抓取软件。实时读取MySQL 二进制日志 Binlog,并生成 JSON 格式的消息,作为生产者发送给 Kafka,Kinesis、RabbitMQ、Redis、Google Cloud Pub/Sub、文件或其它平台的应用程序。
这篇文章尽量简单的描述关键步骤,具体的实践小例子我通过代码来给大家呈现了,我的代码用golang编写包括简单数据插入mysql到触发Maxwell生产信息并通过Kafka消费解析binlog数据代码中都有详细的说明哦
Mysql主从复制过程
- Master 主库将改变记录,写到二进制日志(binary log)中
- Slave 从库向 mysql master 发送 dump 协议,将 master 主库的 binary log events 拷贝到它的中继日志(relay log);
- Slave 从库读取并重做中继日志中的事件,将改变的数据同步到自己的数据库。
Mysql中的binlog
什么是binlog
MySQL 的二进制日志可以说 MySQL 最重要的日志了,它记录了所有的 DDL 和 DML(除了数据查询语句)语句,以事件形式记录,还包含语句所执行的消耗的时间,MySQL 的二进制日志是事务安全型的
一般来说开启二进制日志大概会有 1%的性能损耗。二进制有两个最重要的使用场景:
其一:MySQL Replication 在 Master 端开启 binlog,Master 把它的二进制日志传递给 slaves 来达到 master-slave 数据一致的目的。
其二:自然就是数据恢复了,通过使用 mysqlbinlog 工具来恢复数据。
二进制日志包括两类文件:
二进制日志索引文件(文件名后缀为.index)用于记录所有 的二进制文件
二进制日志文件(文件名后缀为.00000*)记录数据库所有的 DDL 和 DML(除了数据查询语句)语句事件。
binlog的开启
查看是否开启binlog
1 | mysql> show variables like 'log_bin'; |
如果开启了你将看到:
1 | log_bin ON |
OFF表示未开启
首先我们需要找到mysql的配置文件的位置
1 | /etc/my.cnf |
在 mysql 的配置文件下,修改配置
在[mysqld] 区块,设置/添加 log-bin=mysql-bin
这个表示 binlog 日志的前缀是 mysql-bin,以后生成的日志文件就是 mysql-bin.000001的文件后面的数字按顺序生成,每次 mysql 重启或者到达单个文件大小的阈值时,新生一个文件,按顺序编号。
binlog的分类设置
mysql binlog 的格式有三种,分别是 STATEMENT,MIXED,ROW。
三种格式的区别:
statement
语句级,binlog 会记录每次一直行写操作的语句。相对 row 模式节省空间,但是可能产生不一致性,比如 update test set create_date=now(); 如果用 binlog 日志进行恢复,由于执行时间不同可能产生的数据就不同。
优点: 节省空间
缺点: 有可能造成数据不一致
row
星级, binlog 会记录每次操作后每行记录的变化。
优点:保持数据的绝对一致性。因为不管 sql 是什么,引用了什么函数,他只记录执行后的效果。
缺点:占用较大的空间。
mixed
混合级别,statement 的升级版,一定程度上解决了 statement 模式因为一些情况而造成的数据不一致问题
默认还是 statement,在某些情况下,譬如: 当函数中包含 UUID() 时; 包含 AUTO_INCREMENT 字段的表被更新时; 执行 INSERT DELAYED 语句时; 用 UDF 时; 会按照 ROW 的方式进行处理
优点:节省空间,同时兼顾了一定的一致性。
缺点:还有些极个别情况依旧会造成不一致,另外 statement 和 mixed 对于需要对binlog 监控什么情况都不方便。
综合上面对比,Maxwell 想做监控分析,选择 row 格式比较合适
查看是否为ROW模式
1 | mysql> show variables like 'binlog_format'; |
命令行设置ROW
1 | mysql> set global binlog_format=ROW; |
注意:我用的 mysql8.0.32 默认就是对的所以没有改动,这些改动是否生效我也不晓得。。。
综上
binlog_format,binlog_row_image 也可以配置文件开启,总结下来你需要在 /etc/my.cnf 没有可以自己创建并添加以下
1 | [mysqld] |
maxwell工作原理
Maxwell 的工作原理很简单,就是把自己伪装成 MySQL 的一个 slave,然后以 slave的身份假装从 MySQL(master)复制数据
意义
通过使用 Maxwell,我们可以实现以下目标:
- 实时数据同步:Maxwell 能够实时捕获 MySQL 数据库的数据变更,帮助用户快速同步数据到其他系统。
- 数据解耦:Maxwell 将数据变更事件发布到 Kafka,降低多系统间的直接依赖,提高系统的可扩展性和可维护性。
- 数据审计:通过解析 binlog,可以记录数据的变更历史,便于数据审计和排查问题。
- 事件驱动架构:Maxwell 可以为事件驱动架构提供一种实现方式,通过监听 Kafka 上的数据变更事件,触发相应的业务逻辑。
应用场景
以下是使用 Maxwell 实现 MySQL binlog 同步至 Kafka 的一些典型应用场景:
- 数据实时同步:通过 Maxwell 实现 MySQL 到其他数据存储系统(如 Hadoop、Elasticsearch 等)的实时数据同步。
- 数据事件驱动:在微服务架构中,通过监听 Kafka 上的数据变更事件,实现业务逻辑的解耦和异步处理。
- 实时数据监控:基于 Maxwell 和 Kafka,可以方便地实现实时数据监控和告警系统。
- 数据迁移:使用 Maxwell 进行数据迁移,保证数据的一致性和实时性。
使用说明
接下来,我们将介绍如何使用 Maxwell 将 MySQL 的 binlog 同步至 Kafka。
环境准备
安装 MySQL,并启用 binlog 功能及bing。见上面的 Mysql 的 bingo 的说明,说的很清楚。
安装 Kafka。
安装 Maxwell。
配置 Maxwell
创建 Maxwell 用户和数据库:
1 | mysql> CREATE USER 'maxwell'@'%' IDENTIFIED BY 'password'; |
为 Maxwell 用户授权访问 binlog:
1 | mysql> GRANT ALL ON maxwell.* TO 'maxwell'@'%'; |
Maxwell 配置详解
上面的例子是我用命令行启动的,可以满足大多数的需求,但我在使用时遇到了大坑,所以我觉得有必要把Maxwell的配置字段弄清楚,然后使用配置文件的方式来启动。
输出格式相关的配置
输出JSON字符串的格式
1 | {"database":"JYDB","table":"LC_STIBOpTradInfoAtta","type":"update","ts":1680756712,"xid":102261559175,"commit":true,"data":{"ID":631388766750,"RID":631388766016,"ReportArea":13,"SerialNumber":1,"SalesDepartmentName":"国融证券股份有限公司湖南分公 司","BOCode":246118,"SecuCoBelongedCode":17059,"SecuCoBelonged":"国融证券","BuySum":null,"SaleSum":7362289.0700,"UpdateTime":"2020-01-03 17:51:11","InsertTime":"2020-01-03 17:46:07","JSID":631389071369},"old":{"SalesDepartmentName":"国融证券股份有限公司湖南分公司"}} |
- data 最新的数据,修改后的数据
- old 旧数据,修改前的数据
- type 操作类型,有insert, update, delete, database-create, database-alter, database-drop, table-create, table-alter, table-drop,bootstrap-insert,int(未知类型)
- xid 事务id
- commit 同一个xid代表同一个事务,事务的最后一条语句会有commit,可以利用这个重现事务
- server_id
- thread_id
- 运行程序时添加参数–output_ddl,可以捕捉到ddl语句
- datetime列会输出为”YYYY-MM-DD hh:mm:ss”,如果遇到”0000-00-00 00:00:00”会原样输出
- maxwell支持多种编码,但仅输出utf8编码
- maxwell的TIMESTAMP总是作为UTC处理,如果要调整为自己的时区,需要在后端逻辑上进行处理
与输出格式相关的配置如下
选项 | 参数值 | 描述 | 默认值 |
---|---|---|---|
output_binlog_position | BOOLEAN | 是否包含 binlog position | false |
output_gtid_position | BOOLEAN | 是否包含 gtid position | false |
output_commit_info | BOOLEAN | 是否包含 commit and xid | true |
output_xoffset | BOOLEAN | 是否包含 virtual tx-row offset | false |
output_nulls | BOOLEAN | 是否包含值为NULL的字段 | true |
output_server_id | BOOLEAN | 是否包含 server_id | false |
output_thread_id | BOOLEAN | 是否包含 thread_id | false |
output_schema_id | BOOLEAN | 是否包含 schema_id | false |
output_row_query | BOOLEAN | 是否包含 INSERT/UPDATE/DELETE 语句. Mysql需要开启 binlog_rows_query_log_events | false |
output_ddl | BOOLEAN | 是否包含 DDL (table-alter, table-create, etc) events | false |
output_null_zerodates | BOOLEAN | 是否将 ‘0000-00-00’ 转换为 null? | false |
进阶使用
基本的配置
选项 | 参数值 | 描述 | 默认值 |
---|---|---|---|
config | 配置文件 config.properties 的路径 | ||
log_level | `[debug | info | warn |
daemon | 指定Maxwell实例作为守护进程到后台运行 | ||
env_config_prefix | STRING | 匹配该前缀的环境变量将被视为配置值 |
可以把Maxwell的启动参数写到一个配置文件 config.properties
中,然后通过 config 选项指定, bin/maxwell--config config.properties
mysql 配置选项
Maxwell 根据用途将 MySQL 划分为3种角色:
host
:主机,建maxwell库表,存储捕获到的schema等信息
- 主要有六张表,bootstrap用于数据初始化,schemas记录所有的binlog文件信息,databases记录了所有的数据库信息,tables记录了所有的表信息,columns记录了所有的字段信息,positions记录了读取binlog的位移信息,heartbeats记录了心跳信息
replication_host
:复制主机,Event监听,读取该主机binlog
- 将
host
和replication_host
分开,可以避免replication_user
往生产库里写数据
schema_host
:schema主机,捕获表结构schema的主机
- binlog里面没有字段信息,所以maxwell需要从数据库查出schema,存起来。
schema_host
一般用不到,但在binlog-proxy
场景下就很实用。比如要将已经离线的binlog通过maxwell生成json流,于是自建一个mysql server里面没有结构,只用于发送binlog,此时表机构就可以制动从 schema_host 获取。
通常,这三个主机都是同一个, schema_host
只在有 replication_host
的时候使用。
与MySQL相关的有下列配置
选项 | 参数值 | 描述 | 默认值 |
---|---|---|---|
host | STRING | mysql 地址 | localhost |
user | STRING | mysql 用户名 | |
password | STRING | mysql 密码 | (no password) |
port | INT | mysql 端口 3306 | |
jdbc_options | STRING | mysql jdbc connection options | DEFAULT_JDBC_OPTS |
ssl | SSL_OPT | SSL behavior for mysql cx | DISABLED |
schema_database | STRING | Maxwell用于维护的schema和position将使用的数据库 | maxwell |
client_id | STRING | 用于标识Maxwell实例的唯一字符串 | maxwell |
replica_server_id | LONG | 用于标识Maxwell实例的唯一数字 | 6379 (see notes) |
master_recovery | BOOLEAN | enable experimental master recovery code | false |
gtid_mode | BOOLEAN | 是否开启基于GTID的复制 | false |
recapture_schema | BOOLEAN | 重新捕获最新的表结构(schema),不可在 config.properties中配置 | false |
replication_host | STRING | server to replicate from. See split server roles | schema-store host |
replication_password | STRING | password on replication server | (none) |
replication_port | INT | port on replication server | 3306 |
replication_user | STRING | user on replication server | |
replication_ssl | SSL_OPT | SSL behavior for replication cx cx | DISABLED |
schema_host | STRING | server to capture schema from. See split server roles | schema-store host |
schema_password | STRING | password on schema-capture server | (none) |
schema_port | INT | port on schema-capture server | 3306 |
schema_user | STRING | user on schema-capture server | |
schema_ssl | SSL_OPT | SSL behavior for schema-capture server | DISABLED |
生产者的配置
仅介绍kafka,其他的生产者的配置详见官方文档。
kafka是maxwell支持最完善的一个生产者,并且内置了多个版本的kafka客户端(0.8.2.2, 0.9.0.1, 0.10.0.1, 0.10.2.1 or 0.11.0.1, 1.0.0.),默认 kafka_version=1.0.0(当前Maxwell版本1.20.0)
Maxwell 会将消息投递到Kafka的Topic中,该Topic由 kafka_topic
选项指定,默认值为 maxwell
,除了指定为静态的Topic,还可以指定为动态的,譬如 namespace_%{database}_%{table}
, %{database}
和 %{table}
将被具体的消息的 database 和 table 替换。
Maxwell 读取配置时,如果配置项是以 kafka.
开头,那么该配置将设置到 Kafka Producer 客户端的连接参数中去,譬如
1 | kafka.acks = 1kafka.compression.type = snappykafka.retries=5 |
下面是Maxwell通用生产者和Kafka生产者的配置参数
选项 | 参数值 | 描述 | 默认值 |
---|---|---|---|
producer | PRODUCER_TYPE | 生产者类型 | stdout |
custom_producer.factory | CLASS_NAME | 自定义消费者的工厂类 | |
producer_ack_timeout | PRODUCER_ACK_TIMEOUT | 异步消费认为消息丢失的超时时间(毫秒ms) | |
producer_partition_by | PARTITION_BY | 输入到kafka/kinesis的分区函数 | database |
producer_partition_columns | STRING | 若按列分区,以逗号分隔的列名称 | |
producer_partition_by_fallback | PARTITION_BY_FALLBACK | producer_partition_by=column时需要,当列不存在是使用 | |
ignore_producer_error | BOOLEAN | 为false时,在kafka/kinesis发生错误时退出程序;为true时,仅记录日志 See also dead_letter_topic | true |
kafka.bootstrap.servers | STRING | kafka 集群列表, HOST:PORT[,HOST:PORT] | |
kafka_topic | STRING | kafka topic | maxwell |
dead_letter_topic | STRING | 详见官方文档 | |
kafka_version | KAFKA_VERSION | 指定maxwell的 kafka 生产者客户端版本,不可在config.properties中配置 | 0.11.0.1 |
kafka_partition_hash | `[default | murmur3]` | 选择kafka分区时使用的hash方法 |
kafka_key_format | `[array | hash]` | how maxwell outputs kafka keys, either a hash or an array of hashes |
ddl_kafka_topic | STRING | 当 output_ddl为true时, 所有DDL的消息都将投递到该topic | kafka_topic |
过滤器配置
Maxwell 可以通过 --filter
配置项来指定过滤规则,通过 exclude
排除,通过 include
包含,值可以为具体的数据库、数据表、数据列,甚至用 Javascript 来定义复杂的过滤规则;可以用正则表达式描述,有几个来自官网的例子
1 | # 仅匹配foodb数据库的tbl表和所有table_数字的表--filter='exclude: foodb.*, include: foodb.tbl, include: foodb./table_\d+/' |
Maxwell监控
Maxwell 提供了 baselogging mechanism,JMX,HTTPorbypush toDatadog
这四种监控方式,与监控相关的配置项有下列这些:
选项 | 参数值 | 描述 | 默认值 |
---|---|---|---|
metrics_prefix | STRING | 指标的前缀 | MaxwellMetrics |
metrics_type | `[slf4j | jmx | http |
metrics_jvm | BOOLEAN | 是否收集JVM信息 | false |
metrics_slf4j_interval | SECONDS | 将指标记录到日志的频率, metrics_type须配置为slf4j | 60 |
http_port | INT | metrics_type为http时,发布指标绑定的端口 | 8080 |
http_path_prefix | STRING | http的路径前缀 | / |
http_bind_address | STRING | http发布指标绑定的地址 | all addresses |
http_diagnostic | BOOLEAN | http是否开启diagnostic后缀 | false |
http_diagnostic_timeout | MILLISECONDS | http diagnostic 响应超时时间 | 10000 |
metrics_datadog_type | `[udp | http]` | metrics_type为datadog时发布指标的方式 |
metrics_datadog_tags | STRING | 提供给 datadog 的标签,如 tag1:value1,tag2:value2 | |
metrics_datadog_interval | INT | 推指标到datadog的频率,单位秒 | 60 |
metrics_datadog_apikey | STRING | 当 metrics_datadog_type=http 时datadog用的api key | |
metrics_datadog_host | STRING | 当 metrics_datadog_type=udp时推指标的目标地址 | localhost |
metrics_datadog_port | INT | 当 metrics_datadog_type=udp 时推指标的端口 | 8125 |
具体可以得到哪些监控指标呢?有如下,注意所有指标都预先配置了指标前缀 metrics_prefix
指标 | 类型 | 说明 |
---|---|---|
messages.succeeded | Counters | 成功发送到kafka的消息数量 |
messages.failed | Counters | 发送失败的消息数量 |
row.count | Counters | 已处理的binlog行数,注意并非所有binlog都发往kafka |
messages.succeeded.meter | Meters | 消息成功发送到Kafka的速率 |
messages.failed.meter | Meters | 消息发送失败到kafka的速率 |
row.meter | Meters | 行(row)从binlog连接器到达maxwell的速率 |
replication.lag | Gauges | 从数据库事务提交到Maxwell处理该事务之间所用的时间(毫秒) |
inflightmessages.count | Gauges | 当前正在处理的消息数(等待来自目的地的确认,或在消息之前) |
message.publish.time | Timers | 向kafka发送record所用的时间(毫秒) |
message.publish.age | Timers | 从数据库产生事件到发送到Kafka之间的时间(毫秒),精确度为+/-500ms |
replication.queue.time | Timers | 将一个binlog事件送到处理队列所用的时间(毫秒) |
上述有些指标为kafka特有的,并不支持所有的生产者。
实验一番,通过 http 方式获取监控指标
1 | docker run -p 8080:8080 -it --rm zendesk/maxwell bin/maxwell --user='maxwell' \ --password='123456' --host='10.100.97.246' --producer=kafka \ --kafka.bootstrap.servers='10.100.97.246:9092' --kafka_topic=maxwell --log_level=debug \ --metrics_type=http --metrics_jvm=true --http_port=8080 |
上面的配置大部分与前面的相同,不同的有 -p8080:8080
docker端口映射,以及 --metrics_type=http--metrics_jvm=true--http_port=8080
,配置了通过http方式发布指标,启用收集JVM信息,端口为8080,之后可以通过 http://10.100.97.246:8080/metrics
便可获取所有的指标
http 方式有四种后缀,分别对应四种不同的格式
1 | 所有指标以JSON格式返回 |
如果是通过 JMX 的方式收集Maxwell监控指标,可以 JAVA_OPTS
环境变量配置JMX访问权限
1 | export JAVA_OPTS="-Dcom.sun.management.jmxremote \-Dcom.sun.management.jmxremote.port=9010 \-Dcom.sun.management.jmxremote.local.only=false \-Dcom.sun.management.jmxremote.authenticate=false \-Dcom.sun.management.jmxremote.ssl=false \-Djava.rmi.server.hostname=10.100.97.246" |
GTID 支持
Maxwell 从1.8.0版本开始支持基于GTID的复制(GTID-based replication),在GTID模式下,Maxwell将在主机更改后透明地选择新的复制位置。
什么是GTID Replication?
GTID (Global Transaction ID) 是对于一个已提交事务的编号,并且是一个全局唯一的编号。
从 MySQL 5.6.5 开始新增了一种基于 GTID 的复制方式。通过 GTID 保证了每个在主库上提交的事务在集群中有一个唯一的ID。这种方式强化了数据库的主备一致性,故障恢复以及容错能力。
在原来基于二进制日志的复制中,从库需要告知主库要从哪个偏移量进行增量同步,如果指定错误会造成数据的遗漏,从而造成数据的不一致。借助GTID,在发生主备切换的情况下,MySQL的其它从库可以自动在新主库上找到正确的复制位置,这大大简化了复杂复制拓扑下集群的维护,也减少了人为设置复制位置发生误操作的风险。另外,基于GTID的复制可以忽略已经执行过的事务,减少了数据发生不一致的风险。
注意事项
timestamp column
maxwell对时间类型(datetime, timestamp, date)都是当做字符串处理的,这也是为了保证数据一致(比如 0000-00-0000:00:00
这样的时间在timestamp里是非法的,但mysql却认,解析成java或者python类型就是null/None)。
如果MySQL表上的字段是 timestamp 类型,是有时区的概念,binlog解析出来的是标准UTC时间,但用户看到的是本地时间。比如 f_create_time timestamp
创建时间是北京时间 2018-01-0521:01:01
,那么mysql实际存储的是 2018-01-0513:01:01
,binlog里面也是这个时间字符串。如果不做消费者不做时区转换,会少8个小时。
与其每个客户端都要考虑这个问题,我觉得更合理的做法是提供时区参数,然后maxwell自动处理时区问题,否则要么客户端先需要知道哪些列是timestamp类型,或者连接上原库缓存上这些类型。
binary column
maxwell可以处理binary类型的列,如blob、varbinary,它的做法就是对二进制列使用 base64_encode
,当做字符串输出到json。消费者拿到这个列数据后,不能直接拼装,需要 base64_decode
。
表结构不同步
如果是拿比较老的binlog,放到新的mysql server上去用maxwell拉去,有可能表结构已经发生了变化,比如binlog里面字段比 schema_host
里面的字段多一个。目前这种情况没有发现异常,比如阿里RDS默认会为 无主键无唯一索引的表,增加一个 __##alibaba_rds_rowid##__
,在 show create table
和 schema
里面都看不到这个隐藏主键,但binlog里面会有,同步到从库。
另外我们有通过git去管理结构版本,如果真有这种场景,也可以应对。
启动 Maxwell
命令行启动
刚刚步骤中解压安装了 Maxwell 之后进入 Maxwell 文件夹中使用命令行进行启动,我使用的是以下的命令行
使用以下命令启动 Maxwell:
1 | /home/ubuntu/maxwell-1.24.0/bin/maxwell --user="maxwell" --password="xxxxx" --host="xxxxx" --replica_server_id=12345 --client_id=12345 --producer=kafka --kafka.bootstrap.servers=localhost:9092 --kafka_topic=maxwell --filter 'exclude: *.*, include:test1.stocks' |
注意:这个命令是配合我的代码例子的
简单说明一下,这个producer是指定你的输出目标为什么,我这里使用的是kafka,你在调试时可以先使用 stdout,直接输出数据在终端。 –kafka.bootstrap.servers=localhost:9092 这个就指定我的 Kafka broker 的地址当然我用的本机的。–kafka_topic=maxwell 这个就是输出数据目标 topic。 –filter 后面是一个简单的过滤规则,我这个filter的意思是除了* 。* 外的数据都不要而且只要包含test1数据库stocks表对应的 binlog 数据
重要的注意事项!!!
你可能注意到了
–replica_server_id=12345
–client_id=12345
这两个配置项,在上面的 mysql 配置选项说明汇总有说明,这两个参数是标识从服务的唯一身份的,在maxwell的配置文件中如果没有配置,就会使用默认值。正式由于我没有配置,导致我后面碰到了 Slave Server_id 冲突的坑。
事情是这样的我在生产部署了一套binlog监听,在测试也部署了一套,因为我们的源数据库是同步别人的,只有一个。。。 所以我生产和测试订阅的是同一个库,这样问题就来了,我在配好生产环境后就去配测试环境,在我跑起测试环境的一刹那,生产环境就凉了。一直报从服务ID冲突:
1 | ERROR TaskManager - cause: |
我当时因为不懂原理,很懵,重新生成一个 mysql 用户,依然不行,后面去了解了整个原理才发现,原来这个货时伪装成了一个mysql副本,然后mysql以为这是自己的一个从库,就不断的同步binlog信息过来。
所以当我跑起来两个但没有设置replica_server_id,client_id这个时候必然出现Slave Server_id 冲突。希望大家不要碰壁了。
配置文件启动
可以把Maxwell的启动参数写到一个配置文件 config.properties
中,然后通过 config 选项指定, bin/maxwell --config config.properties
我的配置文件:
1 | # 日志级别 |
MySQL 数据变更监听
当 Maxwell 成功启动后,你可以在 MySQL 中进行数据的增删改查操作,Maxwell 会实时将数据变更事件推送至 Kafka。
Kafka 消费端监听
在 Kafka 的消费端,你可以监听 Maxwell 发布的数据变更事件,然后根据业务需求进行相应的处理。
我的代码中/LittleBeeMark/CoolGoPkg/apply_kfk/instance/maxwell_test.go 下有完整的示例可以直接跑:
1 | func TestMaxWell_Start(t *testing.T) { |
这个示例主要使用的函数mw.Start() 在同目录下的maxwell.go中,我简单说下:
1 | type MaxWell struct { |
就是启动一个监听叫做 maxwell 的kafka topic 的消费者,然后不断的每隔半秒插入一个股票数据,查看监听消费者的数据解析打印情况,跑出来的结果如下:
1 | Try to connect to MYSQL host: 127.0.0.1 , port: 3306 root:xxxxx@tcp(127.0.0.1:3306)/test1?charset=utf8&parseTime=True&allowNativePasswords=true |
其中 BinlogHandler HandleMessage obj:handler.BinLogItem{Database:”test1”, Table:”stocks”, Type:”insert”, Ts:1680433714, Xid:82272, Xoffset:0, Commit:true, Data:map[string]interface {}{“id”:26, “last”:3000, “prod_code”:”000001.SS”, “prod_name”:”上证指数”, “trade_status”:”Trade”}, Old:interface {}(nil)} 是我的消费者已经解析出的 binlog 数据可以看到数据库表操作等信息都有。
总结
Maxwell 是一个功能强大、实时性高的 MySQL binlog 数据同步工具,它可以将数据变更事件推送至 Kafka,帮助我们实现数据的实时同步、数据事件驱动等多种应用场景。通过使用 Maxwell,我们可以提高系统的可扩展性、可维护性和实时性,为业务发展提供强有力的支持。