首页 教程 服务器/数据库 FlinkCDC 实现 MySQL 数据变更实时同步

FlinkCDC 实现 MySQL 数据变更实时同步

文章目录

  • 1、基本介绍
  • 2、代码实战
    • 2.1、数据源准备
    • 2.2、代码实战
    • 2.3、数据格式

1、基本介绍

Flink CDC 是 Apache Flink 提供的一个功能强大的组件,用于实时捕获和处理数据库中的数据变更。可以实时地从各种数据库(如MySQL、PostgreSQL、Oracle、MongoDB等)中捕获数据变更并将其转换为流式数据,FlinkCDC 同步数据有两种方式:

  1. FlinkSQL
  2. Flink DataStream 和 Table API(本文使用该方式)
    FlinkCDC 实现 MySQL 数据变更实时同步
    对比其他的CDC开源方案,发现FlinkCDC是绝大多数场景最好的选择方式,别在傻傻的只关注Canal了,如下图所示:
    FlinkCDC 实现 MySQL 数据变更实时同步

2、代码实战

2.1、数据源准备

本次我是用MySQL 8.0版本,并且创建好数据库(库名为quick_chat),本次演示表结构如下:

CREATETABLE`quick_chat_msg`(`id`bigintNOTNULLCOMMENT'主键id',`from_id`varchar(20)CHARACTERSET utf8 COLLATE utf8_unicode_ci DEFAULTNULLCOMMENT'账户id(发送人)',`to_id`varchar(20)CHARACTERSET utf8 COLLATE utf8_unicode_ci DEFAULTNULLCOMMENT'账户id(接收人)',`relation_id`varchar(50)CHARACTERSET utf8 COLLATE utf8_unicode_ci DEFAULTNULLCOMMENT'发送关联',`content`varchar(500)DEFAULTNULLCOMMENT'消息内容',`msg_type`tinyint(1)DEFAULTNULLCOMMENT'消息类型(1:文字,2:语音,3:表情包,4:文件,5:语音通话,6:视频通话)',`extra_info`varchar(500)DEFAULTNULLCOMMENT'额外信息',`create_time`datetimeDEFAULTNULLCOMMENT'创建时间',`deleted`tinyint(1)DEFAULTNULLCOMMENT'删除标识',PRIMARYKEY(`id`)USINGBTREE)ENGINE=InnoDBDEFAULTCHARSET=utf8mb3;

需要保证MySQL的Binlog格式是ROW,不过MySQL 8.0版本格式默认就是ROW:
FlinkCDC 实现 MySQL 数据变更实时同步
最后,要把数据库时区配置好,否则会出现问题,命令如下:

SET persist time_zone ='+8:00';SET time_zone ='+8:00';SHOW VARIABLES LIKE'%time_zone%';

FlinkCDC 实现 MySQL 数据变更实时同步

2.2、代码实战

首先,引入Flink CDC相关依赖,内容如下:

<dependencies><!-- Flink connector连接器基础包 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-base</artifactId><version>1.14.0</version></dependency><!-- Flink CDC MySQL源 --><dependency><groupId>com.ververica</groupId><artifactId>flink-sql-connector-mysql-cdc</artifactId><version>2.3.0</version></dependency><!-- Flink DataStream数据流API --><dependency><groupId>com.ververica</groupId><artifactId>flink-connector-mysql-cdc</artifactId><version>2.2.0</version><scope>provided</scope></dependency><!-- Flink客户端--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>1.14.0</version></dependency><!--Flink WebUI,端口8081(默认没有开启)--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web_2.12</artifactId><version>1.14.0</version></dependency><!--Flink Table API&SQL程序可以连接到其他外部系统,用于读写批处理表和流式表。--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-runtime_2.12</artifactId><version>1.14.0</version></dependency></dependencies>

第二步,开发 Sink 监听类,用于监听 MySQL 数据变化:

importorg.apache.flink.configuration.Configuration;importorg.apache.flink.streaming.api.functions.sink.RichSinkFunction;publicclassMySinkHandlerextendsRichSinkFunction<String>{@Overridepublicvoidinvoke(String value,Context context)throwsException{System.out.println(value);}@Overridepublicvoidopen(Configuration parameters)throwsException{}@Overridepublicvoidclose()throwsException{}}

最后,配置好 Flink CDC 监听进程,随着项目启动运行:

importcom.ververica.cdc.connectors.mysql.source.MySqlSource;importcom.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;importorg.apache.flink.api.common.eventtime.WatermarkStrategy;importorg.apache.flink.configuration.Configuration;importorg.apache.flink.configuration.RestOptions;importorg.apache.flink.streaming.api.datastream.DataStreamSink;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.springframework.stereotype.Component;importjavax.annotation.PostConstruct;@ComponentpublicclassMySqlSourceExample{@PostConstructpublicvoidinit()throwsException{// 配置监听数据源MySqlSource<String> source =MySqlSource.<String>builder().hostname("8.141.28.132").port(3306)// 数据库集合,可以配置多个.databaseList("quick_chat")// 表集合,可以配置多个.tableList("quick_chat.quick_chat_msg").username("root").password("root").deserializer(newJsonDebeziumDeserializationSchema()).includeSchemaChanges(true).build();// 配置 Flink WebUIConfiguration configuration =newConfiguration(); configuration.setInteger(RestOptions.PORT,8081);StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment(configuration);// 检查点间隔时间// checkpoint的侧重点是“容错”,即Flink作业意外失败并重启之后,能够直接从早先打下的checkpoint恢复运行,且不影响作业逻辑的准确性。 env.enableCheckpointing(5000);DataStreamSink<String> sink = env.fromSource(source,WatermarkStrategy.noWatermarks(),"MySQL Source").addSink(newMySinkHandler()); env.execute();}}

项目启动完毕后,可以通过8081端口访问Flink UI页面:
FlinkCDC 实现 MySQL 数据变更实时同步

2.3、数据格式

上述操作完毕后,我对表数据进行了新增、修改、删除操作,控制台可以看到MySQL变更监听日志输出信息:

# 新增{"before": null, "after":{"id":3, "from_id":"dog", "to_id":"cat", "relation_id":"dog:cat", "content":"你好啊", "msg_type":1, "extra_info": null, "create_time":1729164075000, "deleted":0}, "source":{"version":"1.6.4.Final", "connector":"mysql", "name":"mysql_binlog_source", "ts_ms":1729135279000, "snapshot":"false", "db":"quick_chat", "sequence": null, "table":"quick_chat_msg", "server_id":1, "gtid": null, "file":"binlog.000002", "pos":2452, "row":0, "thread": null, "query": null }, "op":"c", "ts_ms":1729135278633, "transaction": null }

# 修改{"before":{"id":3, "from_id":"dog", "to_id":"cat", "relation_id":"dog:cat", "content":"你好啊", "msg_type":1, "extra_info": null, "create_time":1729164075000, "deleted":0}, "after":{"id":3, "from_id":"dog", "to_id":"cat", "relation_id":"dog:cat", "content":"你好啊,小猫咪", "msg_type":1, "extra_info": null, "create_time":1729164075000, "deleted":0}, "source":{"version":"1.6.4.Final", "connector":"mysql", "name":"mysql_binlog_source", "ts_ms":1729135289000, "snapshot":"false", "db":"quick_chat", "sequence": null, "table":"quick_chat_msg", "server_id":1, "gtid": null, "file":"binlog.000002", "pos":2825, "row":0, "thread": null, "query": null }, "op":"u", "ts_ms":1729135288473, "transaction": null }

# 删除{"before":{"id":3, "from_id":"dog", "to_id":"cat", "relation_id":"dog:cat", "content":"你好啊,小猫咪", "msg_type":1, "extra_info": null, "create_time":1729164075000, "deleted":0}, "after": null, "source":{"version":"1.6.4.Final", "connector":"mysql", "name":"mysql_binlog_source", "ts_ms":1729135301000, "snapshot":"false", "db":"quick_chat", "sequence": null, "table":"quick_chat_msg", "server_id":1, "gtid": null, "file":"binlog.000002", "pos":3247, "row":0, "thread": null, "query": null }, "op":"d", "ts_ms":1729135300692, "transaction": null }

评论(0)条

提示:请勿发布广告垃圾评论,否则封号处理!!

    猜你喜欢
    【MySQL】用户管理

    【MySQL】用户管理

     服务器/数据库  2个月前  2.18k

    我们推荐使用普通用户对数据的访问。而root作为管理员可以对普通用户对应的权限进行设置和管理。如给张三和李四这样的普通用户权限设定后。就只能操作给你权限的库了。

    Cursor Rules 让开发效率变成10倍速

    Cursor Rules 让开发效率变成10倍速

     服务器/数据库  2个月前  1.24k

    在AI与编程的交汇点上,awesome-cursorrules项目犹如一座灯塔,指引着开发者们驶向更高效、更智能的编程未来。无论你是经验丰富的老手,还是刚入行的新人,这个项目都能为你的编程之旅增添一抹亮色。这些规则文件就像是你私人定制的AI助手,能够根据你的项目需求和个人偏好,精确地调教AI的行为。突然间,你会发现AI不仅能理解Next.js的最佳实践,还能自动应用TypeScript的类型检查,甚至主动提供Tailwind CSS的类名建议。探索新的应用场景,推动AI辅助编程的边界。

    探索Django 5: 从零开始,打造你的第一个Web应用

    探索Django 5: 从零开始,打造你的第一个Web应用

     服务器/数据库  2个月前  1.16k

    Django 是一个开放源代码的 Web 应用程序框架,由 Python 写成。它遵循 MVT(Model-View-Template)的设计模式,旨在帮助开发者高效地构建复杂且功能丰富的 Web 应用程序。随着每个版本的升级,Django 不断演变,提供更多功能和改进,让开发变得更加便捷。《Django 5 Web应用开发实战》集Django架站基础、项目实践、开发经验于一体,是一本从零基础到精通Django Web企业级开发技术的实战指南《Django 5 Web应用开发实战》内容以。

    MySQL 的mysql_secure_installation安全脚本执行过程介绍

    MySQL 的mysql_secure_installation安全脚本执行过程介绍

     服务器/数据库  2个月前  1.09k

    mysql_secure_installation 是 MySQL 提供的一个安全脚本,用于提高数据库服务器的安全性

    【MySQL基础篇】概述及SQL指令:DDL及DML

    【MySQL基础篇】概述及SQL指令:DDL及DML

     服务器/数据库  2个月前  491

    数据库是长期存储在计算机内的、有组织的、可共享的、统一管理的大量数据的集合。数据库不仅仅是数据的简单堆积,而是遵循一定的规则和模式进行组织和管理的。数据库中的数据可以包括文本、数字、图像、音频等各种类型的信息。

    Redis中的哨兵(Sentinel)

    Redis中的哨兵(Sentinel)

     服务器/数据库  2个月前  316

    ​ 上篇文章我们讲述了Redis中的主从复制(Redis分布式系统中的主从复制-CSDN博客),本篇文章针对主从复制中的问题引出Redis中的哨兵,希望本篇文章会对你有所帮助。