canal同步mysql数据到es中

2022/4/14 19:13:22

本文主要是介绍canal同步mysql数据到es中,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

cannl同步mysql数据到es中

canal组件介绍

canal-admin
(非必须但推荐使用):为canal提供整体配置管理、节点运维等面向运维的功能,提供相对友好的WebUI操作界面,方便更多用户快速和安全的操作。

canal-server:
服务端,从mysql读取binlog日志获取增量日志,可以通过tcp、kafka、RocketMQ等方式与客户端通信;通过zookeeper搭建集群。

canal-adapter
客户端,根据canal-server获取的增量日志执行适配到其他诸如elasticsearch、redis、mysql等端,实现数据同步。


本次实验环境

首先我们需要下载canal的各个组件canal-servercanal-adaptercanal-admin,下载地址:https://github.com/alibaba/canal/releases

es及相关工具的部署可参考Elasticsearch详解及部署

1、开启mysql的binlog

使用canal-server需要先准备mysql,对于自建 MySQL , 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式

vi /etc/my.cnf

[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

配置完成后重启mysql,并查询是否配置生效:ON就是开启

systemctl restart mysqld

可以进入数据库再次查看下

mysql> show variables like 'log_bin';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin | ON |
+---------------+-------+
1 row in set (0.01 sec)

mysql> show variables like 'binlog_format%';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| binlog_format | ROW |
+---------------+-------+
1 row in set (0.00 sec)

2、mysql用户数据准备

 创建一个canal用户并对其进行授权

CREATE USER canal IDENTIFIED BY 'Cjz123456.';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;

创建一个测试用户数据库及表

create database canal;
USE canal;
CREATE TABLE product  (
  id bigint(20) NOT NULL AUTO_INCREMENT,
  title varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  sub_title varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  price decimal(10, 2) NULL DEFAULT NULL,
  pic varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  PRIMARY KEY (id) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 2 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

3、创建存放canal的目录  

mkdir /usr/local/canal-server

mkdir /usr/local/canal-adpter  

4、部署canal-server

tar -zxvf canal.deployer-1.1.6-SNAPSHOT.tar.gz -C /usr/local/canal-server

vim /usr/local/canal-server/conf/example/instance.properties

# 需要同步数据的MySQL地址
canal.instance.master.address=127.0.0.1:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=
# 用于同步数据的数据库账号
canal.instance.dbUsername=canal
# 用于同步数据的数据库密码
canal.instance.dbPassword=Cjz123456.
# 数据库连接编码
canal.instance.connectionCharset = UTF-8
# 需要订阅binlog的表过滤正则表达式
canal.instance.filter.regex=.*\\..*

cd /usr/local/canal-server/bin

./startup.sh

查看日志查看是否正常启动

cd /usr/local/canal-server/logs/

tail -f example/example.log

tail -f canal/canal.log

5、部署canal-adapter

tar -zxvf canal.adapter-1.1.6-SNAPSHOT.tar.gz -C /usr/local/canal-adpter/

cd /usr/local/canal-adpter/conf

vim application.yml

server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  mode: tcp # 客户端的模式,可选tcp kafka rocketMQ
  flatMessage: true # 扁平message开关, 是否以json字符串形式投递数据, 仅在kafka/rocketMQ模式下有效
  zookeeperHosts:    # 对应集群模式下的zk地址
  syncBatchSize: 1000 # 每次同步的批数量
  retries: 0 # 重试次数, -1为无限重试
  timeout: # 同步超时时间, 单位毫秒
  accessKey:
  secretKey:
  consumerProperties:
    canal.tcp.server.host: 127.0.0.1:11111 #设置canal-server的地址
    canal.tcp.zookeeper.hosts:
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:
  srcDataSources:
    defaultDS:
      url: jdbc:mysql://192.168.111.129:3306/canal?useUnicode=true&charaterEncoding=utf-8&useSSL=false
      username: canal
      password: Cjz123456.
  canalAdapters:
  - instance: example # canal instance Name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
      - name: logger
      - name: es7
        hosts: http://192.168.111.129:9200 # 127.0.0.1:9200 for rest mode
        properties:
          mode: rest # or rest
          security.auth: elastic:elastic #  only used for rest mode
          cluster.name: my-es

修改 canal-adapter/conf/es7/mytest_user.yml 文件,用于配置MySQL中的表与Elasticsearch中索引的映射关系

vim es7/mytest_user.yml

dataSourceKey: defaultDS # 源数据源的key, 对应上面配置的srcDataSources中的值
destination: example  # canal的instance或者MQ的topic
groupId: g1 # 对应MQ模式下的groupId, 只会同步对应groupId的数据
esMapping:
  _index: canal_product # es 的索引名称
  _id: _id  # 将Mysql表里的id对应上es上的_id, 如果不配置该项必须配置下面的pk项_id则会由es自动分配
  sql: "SELECT
         p.id AS _id,
         p.title,
         p.sub_title,
         p.price,
         p.pic
        FROM
         product p"        # sql映射
  etlCondition: "where p.id>={}"   #etl的条件参数
  commitBatch: 3000   # 提交批大小

cd /usr/local/canal-adpter/bin/

./startup.sh

查看日志看是否正常启动 

cd /usr/local/canal-adpter/logs/adapter/

tail -f adapter.log

6、创建ES索引

可以直接通过Kibana页面进行创建,不难看出,索引的属性对应着就是我们表的属性(Mysql表id与es的_id做了对应,所以这里不需要设置)

PUT canal_product
{
  "mappings": {
    "properties": {
      "title": {
        "type": "text"
      },
      "sub_title": {
        "type": "text"
      },
      "pic": {
        "type": "text"
      },
      "price": {
        "type": "double"
      }
    }
  }
}

7、验证

canal初始化是不会全量同步数据的,所以我们需要登录mysql,插入一条数据

INSERT INTO product ( id, title, sub_title, price, pic ) VALUES ( 15, '小米8', ' 全面屏游戏智能手机 6GB+64GB', 1999.00, NULL );

创建完成后在Kibana上查看即可发现数据已经同步

GET canal_product/_search

在elasticsearch-head上查看也是,可以看出,我们的表id变成了es的_id

在Mysql上修改数据看看

UPDATE product SET title='小米10' WHERE id=15;

 可以看到我们修改的数据也进行了同步

 


该文章参考于:https://blog.csdn.net/zh1998wx/article/details/123101442

 



这篇关于canal同步mysql数据到es中的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!


扫一扫关注最新编程教程