Canal学习笔记
2022/1/13 6:07:09
本文主要是介绍Canal学习笔记,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
Cancle
1.目的
阿里解决异地机房数据同步(基于cannal的Otter)问题。直接读取数据库进行同步会造成数据库服务器压力过大,所以通过读取binlog增量日志(增删改)来进行增量数据的获取,由此衍生了数据订阅和消费的业务。
2.原理及作用
基于数据库日志解析完成数据库增量数据同步。
3.MySql的binglog
3.1作用
以事件的形式记录了DDL和DML操作,且是事务安全型的。开启binglog会有大概1%的性能损耗。
3.2应用
数据实时收仓,数据恢复,缓存刷新,es同步,Otter组件(原本的目的收到数据后otter同步到不同机房数据库)
3.3binlog的分类
a statement 语句级别
节省空间但是可能造成数据不一致 如now()函数
b row 行级别
保持数据一致性,只记录sql执行后的结果,占用空间。数据分析一般用这个。
c mixed
二者的结合,不常用
4.工作原理
把自己伪装成一个slave,假装从Master中复制数据。
5.环境准备
a Mysql 8
(Mysql8 依赖)https://www.microsoft.com/zh-cn/download/details.aspx?id=42642
b 不能远程连接问题
use mysql; update user set host = '%' where user ='root';
GRANT ALL PRIVILEGES ON *.* TO 'root'@'%' WITH GRANT OPTION; //赋予任何主机访问数据的权限
FLUSH PRIVILEGES;
b 创建数据库和表
c 修改配置文件开启Binlog
win需要打开可查看隐藏文件,C:\ProgramData\MySQL\MySQL Server 8.0可以查看到my.ini
server-id=服务id
log-bin=二进制日志文件的名称
binlog_format=日志类型
binlog-do-db=数据库
INSERT INTO fruit VALUES (1,'香蕉',20.22),(2,'苹果',40.22); INSERT INTO fruit VALUES (3,'菠萝',20.22),(4,'榴莲',40.22); INSERT INTO fruit VALUES (4,'西瓜',20.22),(5,'葡萄',40.22);
binlog日志变大
重启服务会新产生一个binlog文件
6.下载和安装
a 下载链接 https://github.com/alibaba/canal/releases
b 目录结构
c 修改 canal.properties
#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml # canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL; #这后面不需要分号!!! canal.instance.tsdb.url = jdbc:mysql://192.168.67.222:13306/goods_mark canal.instance.tsdb.dbUsername = root canal.instance.tsdb.dbPassword = root
7启动canal
POM
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.6.2</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.example</groupId> <artifactId>cannalDemo</artifactId> <version>0.0.1-SNAPSHOT</version> <name>cannalDemo</name> <description>Demo project for Spring Boot</description> <properties> <java.version>11</java.version> </properties> <dependencies> <dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.client</artifactId> <version>1.1.2</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.4.1</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
测试代码
package com.example.cannaldemo.config; import com.alibaba.fastjson.JSONObject; import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.client.CanalConnectors; import com.alibaba.otter.canal.protocol.CanalEntry; import com.alibaba.otter.canal.protocol.Message; import com.google.protobuf.ByteString; import com.google.protobuf.InvalidProtocolBufferException; import java.net.InetSocketAddress; import java.util.List; public class CanalClient { public static void main(String[] args) throws InvalidProtocolBufferException { //1.获取 canal 连接对象 user pwd 为cannal的密码可以不写 CanalConnector canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress("localhost", 11111), "example", "", ""); //2.获取连接 只需要获取一次即可 canalConnector.connect(); while (true) { //3.指定要监控的数据库 * 表示数据库下面所有表 canalConnector.subscribe("goods_mark.*"); //4.获取 Message Message message = canalConnector.get(100); List<CanalEntry.Entry> entries = message.getEntries(); if (entries.size() <= 0) { System.out.println("没有数据,休息一会"); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } else { for (CanalEntry.Entry entry : entries) { //TODO 获取表名 String tableName = entry.getHeader().getTableName(); // TODO Entry 类型 CanalEntry.EntryType entryType = entry.getEntryType(); // TODO 判断 entryType 是否为 ROWDATA if (CanalEntry.EntryType.ROWDATA.equals(entryType)) { // TODO 序列化数据 ByteString storeValue = entry.getStoreValue(); // TODO 反序列化 CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue); //TODO 获取事件类型 CanalEntry.EventType eventType = rowChange.getEventType(); //TODO 获取具体的数据 List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList(); //TODO 遍历并打印数据 for (CanalEntry.RowData rowData : rowDatasList) { List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList(); JSONObject beforeData = new JSONObject(); for (CanalEntry.Column column : beforeColumnsList) { beforeData.put(column.getName(), column.getValue()); } JSONObject afterData = new JSONObject(); List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList(); for (CanalEntry.Column column : afterColumnsList) { afterData.put(column.getName(), column.getValue()); } System.out.println("TableName:" + tableName + ",EventType:" + eventType + ",After:" + beforeData + ",After:" + afterData); } } } } } } }
这篇关于Canal学习笔记的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-11-20获取apk的md5值有哪些方法?-icode9专业技术文章分享
- 2024-11-20xml报文没有传 IdentCode ,为什么正常解析没报错呢?-icode9专业技术文章分享
- 2024-11-20如何知道代码有没有进行 Schema 验证?-icode9专业技术文章分享
- 2024-11-20Mycat教程:新手快速入门指南
- 2024-11-20WebSocket入门:轻松掌握WebSocket基础
- 2024-11-19WebSocket入门指南:轻松搭建实时通信应用
- 2024-11-19Nacos安装资料详解:新手入门教程
- 2024-11-19Nacos安装资料:新手入门教程
- 2024-11-19升级 Gerrit 时有哪些注意事项?-icode9专业技术文章分享
- 2024-11-19pnpm是什么?-icode9专业技术文章分享