使用ogg实现oracle到kafka的增量数据实时同步
2022/4/12 2:12:31
本文主要是介绍使用ogg实现oracle到kafka的增量数据实时同步,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
使用ogg实现oracle到kafka的增量数据实时同步
彬彬
2022.04.07
一、OGG概述
OGG全称为Oracle GoldenGate,是由Oracle官方提供的用于解决异构数据环境中数据复制的一个商业工具。相比于其它迁移工具OGG的优势在于可以直接解析源端Oracle的redolog,因此能够实现在不需要对原表结构做太多调整的前提下完成数据增量部分的同步。基于Oracle OGG,介绍一种将Oracle数据库的数据实时同步到Kafka消息队列的方法。
1、OGG逻辑架构
2、 OGG概念
Manager进程:需要源端跟目标端同时运行,主要作用是监控管理其它进程,报告错误,分配及清理数据存储空间,发布阈值报告等。 Extract进程:运行在数据库源端,主要用于捕获数据的变化,负责全量、增量数据的抽取。 Data Pump进程:运行在数据库源端,属于Extract进程的一个辅助进程,,从本地Trail文件中读取数据,并通过网络将数据发送到目标端OGG。 Collector进程:数据接收程序运行在目标端机器,用于接收Data Pump发送过来的Trail日志,并将数据写入到本地Trail文件。 Replicat进程:数据复制(Replicat):数据复制运行在目标端机器,从Trail文件读取数据变更,并将变更数据应用到目标端数据存储系统。本案例中,数据复制将数据推送到kafka消息队列。*** Trails文件:临时存放在磁盘上的数据文件。
3、OGG检查点
作为一个复制软件,首要是考察是它的可靠性,确保事务的完整性,在复制的过程中,源端和目标端的一致性。在日常运维可能会发生各种故障:进程故障、trail文件故障、网络故障、服务器故障等等。然后OGG各种故障的解决办法:一是靠进程的自动重启机制,二是靠checkpoint机制,保证在各种故障情况下不丢数据。
OGG检查点:记录进程的读、写的位置,在恢复时需要使用,保证事务的完整性。
OGG两种存储方式: 1)存放在dirchk下的文件中 2)存放在指定的checkpoint table
对比: 1)nodbcheckpoint:性能较高 2)checkpointtable:检查点信息存储在数据库表中,和实际事务作为一个事务提交,可以从数据表中找到更多的信息
检查点分为Startup检查点信息、Recovery检查点、Current检查点 1)Startup检查点信息:进程启动时,会创建startup检查点 2)Recovery检查点:进程恢复时,需要从哪个点开始恢复 3)Current检查点:进程当前(最近的)检查点信息 查看指令:info ext1, showch
3.1 检查点-extract进程
1、读检查点:读到哪个日志文件及相对位移值 1)有startup、recovery、current checkpoint 2)一般是修改current checkpoint来调整日志文件读的位置alter extract ext1, [thread n,] extseqno , extrba 0
2、写检查点:正在写到的trail文件编号及相对位移值 1)有current checkpoint 2)修改写检查点:重启进程或者etrollover alter extract ext1, etrollover (执行完成之后注意需要手工设置pmp进程的读检查点位置: info ext1, showch确认新的写检查点的trail文件编号为N,然后 alter pmp1, extseqno N, extrba 0)
3.2 检查点-pump进程
1、读检查点:读到的trail文件的编号和具体字节位置 1)有startup、current checkpoint 2)通过alter pmp1, extseqno N, extrba 0来修改
2、写检查点:正在写的remote trail文件编号及相对位移值 1)有current checkpoint 2)通过alter pmp1, etrollover来修改但是修改后同样需要手工调整replicate进程读的位置:alter rep1, extseqno N, extrba 0 (N为新的pmp1进程写检查点remote trial文件编号)
3.3 检查点-replicat进程
1、读检查点:读到的trail文件的编号和具体字节位置 1)有startup、current checkpoint 2)修改当前读检查点位置 alter rep1, extseqno N, extrba 0
2、写检查点:无
二、OGG配置
1、环境信息
**组件 ** | 版本 | **IP地址 ** | 描述 |
---|---|---|---|
源端Oracle | 11.2.0.4.0 | 192.168.152.101 | 源端Oracle数据库 |
源端OGG | 12.3.2.1.1 | 192.168.152.101 | 源端OGG,用于抽取源端Oracle数据变更,并将变更日志发送到目标端 |
目标端OGG | 12.3.2.1.1 | 192.168.152.101 | 目标端OGG,接受源端发送的Oracle事务变更日志,并将变更推送到kafka消息队列 |
目标端Kafka | 2.11-0.11.0.0 | 192.168.152.101、102、103 | 消息队列,接收目标端OGG推送过来的数据 |
2、源端OGG配置
源端OGG 管理进程(mgr)配置: PORT 7809 DYNAMICPORTLIST 7810-7909 AUTORESTART EXTRACT *,RETRIES 5,WAITMINUTES 3 PURGEOLDEXTRACTS ./dirdat/*,usecheckpoints, minkeepdays 3 说明 PORT 即mgr的默认监听端口; DYNAMICPORTLIST 动态端口列表,当指定的mgr端口不可用时,会在这个端口列表中选择一个,最大指定范围为256个; AUTORESTART 重启参数设置表示重启所有EXTRACT进程,最多5次,每次间隔3分钟; PURGEOLDEXTRACTS 即TRAIL文件的定期清理
源端OGG 抽取进程(extract)配置 extract extkafka dynamicresolution SETENV (ORACLE_SID = "orcl") SETENV (NLS_LANG = "american_america.AL32UTF8") userid ogg,password ogg exttrail /opt/ogg/dirdat/to table test_ogg.test_ogg; 说明 第一行指定extract进程名称; dynamicresolution 动态解析; SETENV 设置环境变量,这里分别设置了Oracle数据库以及字符集; userid ogg,password ogg 即OGG连接Oracle数据库的帐号密码 exttrail 定义trail文件的保存位置以及文件名,注意这里文件名只能是2个字母,其余部分OGG会补齐; table 即复制表的表名,支持*通配,必须以;结尾
源端OGG 传输进程(pump)配置 extract pukafka passthru dynamicresolution userid ogg,password ogg rmthost 192.168.152.131 mgrport 7909 rmttrail /home/gg/ogg/dirdat/go table test_ogg.test_ogg; 说明 第一行指定extract进程名称; Passthru 即禁止OGG与Oracle交互,我们这里使用pump逻辑传输,故禁止即可; Dynamicresolution 动态解析; userid ogg,password ogg 即OGG连接Oracle数据库的帐号密码 rmthost和mgrhost 即目标端(kafka)OGG的mgr服务的地址以及监听端口; rmttrail 即目标端trail文件存储位置以及名称。
3、目标端OGG配置
目标端OGG管理进程(mgr)配置: PORT 7909 DYNAMICPORTLIST 7810-7909 AUTORESTART EXTRACT *,RETRIES 5,WAITMINUTES 3 PURGEOLDEXTRACTS ./dirdat/*,usecheckpoints, minkeepdays 3 说明 PORT 即mgr的默认监听端口; DYNAMICPORTLIST 动态端口列表,当指定的mgr端口不可用时,会在这个端口列表中选择一个,最大指定范围为256个; AUTORESTART 重启参数设置表示重启所有EXTRACT进程,最多5次,每次间隔3分钟; PURGEOLDEXTRACTS 即TRAIL文件的定期清理
目标端OGG复制进程(replicat)配置: REPLICAT rekafka sourcedefs /home/gg/ogg/dirdef/test_ogg.test_ogg TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka.props REPORTCOUNT EVERY 1 MINUTES, RATE GROUPTRANSOPS 10000 MAP test_ogg.test_ogg, TARGET test_ogg.test_ogg; 说明 REPLICATE rekafka 定义rep进程名称; Sourcedefs 即在源服务器上做的表映射文件; TARGETDB LIBFILE 即定义kafka一些适配性的库文件以及配置文件,配置文件位于OGG主目录下的dirprm/kafka.props; REPORTCOUNT 即复制任务的报告生成频率; GROUPTRANSOPS 为以事务传输时,事务合并的单位,减少IO操作; MAP 即源端与目标端的映射关系
三、数据测试
启动所有进程
在源端和目标端的OGG命令行下使用start [进程名]的形式启动所有进程。
启动顺序按照源mgr——目标mgr——源extract——源pump——目标replicate来完成。
全部需要在ogg目录下执行ggsci目录进入ogg命令行。
源端依次是 start mgr start extkafka start pukafka
目标端依次是 start mgr start rekafka
可以通过info all 或者info [进程名] 查看状态,所有的进程都为RUNNING才算成功
如果有不是RUNNING可通过查看日志的方法检查解决问题
ogg命令行,以rekafka进程为例
view report rekafka
源端执行sql语句 insert into test_ogg(id,name) values('1','test'); commit; update test_ogg set name=‘zhangsan ' where id='1'; commit; delete test_ogg where id='1'; commit; 查看源端trail文件状态 ls -l /opt/ogg/dirdat/to* 查看目标端trail文件状态 ls -l /home/gg/ogg/dirdat/go* 通过消费者看是否有同步消息 bin/kafka-console-consumer.sh --bootstrap-server bigdata02:9092 --topic test_ogg
本文由博客一文多发平台 OpenWrite 发布!
这篇关于使用ogg实现oracle到kafka的增量数据实时同步的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-11-23增量更新怎么做?-icode9专业技术文章分享
- 2024-11-23压缩包加密方案有哪些?-icode9专业技术文章分享
- 2024-11-23用shell怎么写一个开机时自动同步远程仓库的代码?-icode9专业技术文章分享
- 2024-11-23webman可以同步自己的仓库吗?-icode9专业技术文章分享
- 2024-11-23在 Webman 中怎么判断是否有某命令进程正在运行?-icode9专业技术文章分享
- 2024-11-23如何重置new Swiper?-icode9专业技术文章分享
- 2024-11-23oss直传有什么好处?-icode9专业技术文章分享
- 2024-11-23如何将oss直传封装成一个组件在其他页面调用时都可以使用?-icode9专业技术文章分享
- 2024-11-23怎么使用laravel 11在代码里获取路由列表?-icode9专业技术文章分享
- 2024-11-22怎么实现ansible playbook 备份代码中命名包含时间戳功能?-icode9专业技术文章分享