OGG 实用案例(一)-oracle 同步kafka

2021/8/4 19:08:29

本文主要是介绍OGG 实用案例(一)-oracle 同步kafka,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

01、源库安装ogg

       见OGG 安装 博客

02、源库配置oracle环境

环境说明:

主机

角色

Ogg版本

192.168.192.26

源库

191004_fbo_ggs_Linux_x64_shiphome.zip

192.168.193.221

目标库中间件Kafka

OGG_BigData_Linux_x64_19.1.0.0.5.zip

     01)、创建ogg同步用户

SQL>create tablespace ogg datafile '/u02/oradata/ogg.dbf' size 5000M;

SQL>create user ogg identified by "ogg" default tablespace ogg temporary tablespace TEMP;

 

     02)、权限赋予

GRANT CONNECT TO ogg;

GRANT ALTER ANY TABLE TO ogg;

GRANT ALTER SESSION TO ogg;

GRANT CREATE SESSION TO ogg;

GRANT FLASHBACK ANY TABLE TO ogg;

GRANT SELECT ANY DICTIONARY TO ogg;

GRANT SELECT ANY TABLE TO ogg;

GRANT RESOURCE TO ogg;

GRANT SELECT ANY TRANSACTION TO ogg;

 

     03)、归档模式开启

       已开启则忽略

SQL>archive log list;

SQL>shutdown immediate;

SQL>startup mount;

SQL>alter database archivelog;

SQL>alter database open;

SQL>alter system switch logfile;

 

     04)、开启oracle最小附加日志

       Oracle 执行查询语句,确保显示结果为yes

Select force_logging,SUPPLEMENTAL_LOG_DATA_MIN,

 SUPPLEMENTAL_LOG_DATA_PK,

SUPPLEMENTAL_LOG_DATA_UI,

 SUPPLEMENTAL_LOG_DATA_FK,

 SUPPLEMENTAL_LOG_DATA_ALL from v$database;

 

       Min列不为YES则执行开启操作:

alter database add supplemental log data ;

alter system switch logfile;

 

       ALL 列如果开启则关闭

alter database drop supplemental log data (ALL) columns;

 

未成功则执行一下语句关闭:

alter database drop supplemental log data (primary key, unique,foreign key) columns;

alter database drop supplemental log data ;

alter system switch logfile;

 

     05)、开启强制日志模式

alter database force logging;

 

     06)、环境变量配置

Vi /home/oracle/.bash_profile

export OGG_HOME=/home/oracle/app/ogg/

export LD_LIBRARY_PATH=$OGG_HOME:$ORACLE_HOME/lib:/usr/bin

 

03、配置源库ogg环境

     01)、初始化ogg

当前ogg 默认需要安装在oracle 用户下操作。

Su – oracle

. ~/.bash_profile

Cd /home/oracle/app/ogg/

[oracle@hso32-db-test ogg]$ . ~/.bash_profile

[oracle@hso32-db-test ogg]$ ./ggsci

Oracle GoldenGate Command Interpreter for Oracle

Version 19.1.0.0.4 OGGCORE_19.1.0.0.0_PLATFORMS_191017.1054_FBO

Linux, x64, 64bit (optimized), Oracle 11g on Oct 17 2019 23:13:12

Operating system character set identified as UTF-8.

 

Copyright (C) 1995, 2019, Oracle and/or its affiliates. All rights reserved.

  

 

GGSCI (hso32-db-test) 1> create subdirs

 

Creating subdirectories under current directory /home/oracle/app/ogg

 

Parameter file                 /home/oracle/app/ogg/dirprm: already exists.

Report file                    /home/oracle/app/ogg/dirrpt: already exists.

Checkpoint file                /home/oracle/app/ogg/dirchk: already exists.

Process status files           /home/oracle/app/ogg/dirpcs: already exists.

SQL script files               /home/oracle/app/ogg/dirsql: already exists.

Database definitions files     /home/oracle/app/ogg/dirdef: already exists.

Extract data files             /home/oracle/app/ogg/dirdat: already exists.

Temporary files                /home/oracle/app/ogg/dirtmp: already exists.

Credential store files         /home/oracle/app/ogg/dircrd: already exists.

Masterkey wallet files         /home/oracle/app/ogg/dirwlt: already exists.

Dump files                     /home/oracle/app/ogg/dirdmp: already exists.

 

 

GGSCI (hso32-db-test) 2> exit

 

当前环境配置成功

04、创建源库测试表,测试数据

     01) 、创建测试用户,表

SQL> create user test_ogg identified by oracle ;   

User created.

SQL> grant dba to test_ogg;

Grant succeeded.

SQL> conn test_ogg

Enter password:

Connected.

SQL> create table test_ogg( id int,name varchar(20),primary key(id));

Table created.

02) 、配置Oracle 数据支持脚本

配置squence支持

在/home/oracle/app/ogg/下执行

Sqlplus / as  sysdba

@seqence.sql

键入schemas :test_ogg

 

配置支持脚本

SQL> @marker_setup

SQL> @ddl_setup

SQL> @role_setup

SQL> grant GGS_GGSUSER_ROLE to goldengate;

SQL> @ddl_enable

SQL> @marker_status.sql

SQL> @?/rdbms/admin/dbmspool.sql

SQL> @ddl_pin.sql ogg

 

05、配置源库ogg 配置文件

     01)、配置mgr进程

登入ogg

Cd /home/oracle/app/ogg/

./ggsci

GGSCI (hso32-db-test) 1> dblogin userid ogg password ogg;

Successfully logged into database.

GGSCI (hso32-db-test as ogg@hso32) 9> edit param mgr

PORT 7809

DYNAMICPORTLIST  7840-7850

AUTORESTART EXTRACT *, RETRIES 5, WAITMINUTES 3

PURGEOLDEXTRACTS ./dirdat/*, USECHECKPOINTS, MINKEEPDAYS 3

 

 

Ps:

  PORT即mgr的默认监听端口;

  DYNAMICPORTLIST动态端口列表,当指定的mgr端口不可用时,会在这个端口列表中选择一个,最大指定范围为256个;

  AUTORESTART重启参数设置表示重启所有EXTRACT进程,最多5次,每次间隔3分钟;PURGEOLDEXTRACTS即TRAIL文件的定期清理

     02)、配置ext抽取数据进程

GGSCI (hso32-db-test as ogg@hso32) 11> exit param extkafka

EXTRACT extkafka

dynamicresolution

SETENV (ORACLE_SID="hso32")

SETENV (NLS_LANG="AMERICAN_AMERICA.AL32UTF8")

userid ogg,password ogg

EXTTRAIL ./dirdat/to

table test_kingle.oggest;

table test_ogg.test_ogg;

 

 

ps:

  第一行指定extract进程名称;

  dynamicresolution动态解析;

  SETENV设置环境变量,这里分别设置了Oracle数据库以及字符集;

  userid ggs,password ggs即OGG连接Oracle数据库的帐号密码,使用上面创建的账号;

  exttrail定义trail文件的保存位置以及文件名,注意这里文件名只能是2个字母,其余部分OGG会补齐;

  table即复制表的表名,支持*通配,必须以;结尾

     03)、配置push 发送进程

GGSCI (hso32-db-test as ogg@hso32) 11> exit param pukafka

EXTRACT pukafka

passthru

dynamicresolution

userid ogg,password ogg

RMTHOST 10.118.193.223, MGRPORT 7809

RMTTRAIL ./dirdat/to

table test_kingle.oggest;

table test_ogg.test_ogg; 

       ps,

第一行指定extract进程名称;

passthru即禁止OGG与Oracle交互,我们这里使用pump逻辑传输,故禁止即可;dynamicresolution动态解析;

userid ogg,password ogg即OGG连接Oracle数据库的帐号密码rmthost和mgrhost即目标端(kafka)OGG的mgr服务的地址以及监听端口;

rmttrail即目标端trail文件存储位置以及名称。

     04)、加入需要监控的表(下面的所有操作都是基于本次操作进行)

下面包含 linux 同步到windwos 都是基于这个完成,添加监控表至关重要。

GGSCI (hso32-db-test as ogg@hso32) 21> add trandata test_ogg.test_ogg

2021-07-29 09:03:01  INFO    OGG-15132  Logging of supplemental redo data enabled for table TEST_OGG.TEST_OGG.

2021-07-29 09:03:01  INFO    OGG-15133  TRANDATA for scheduling columns has been added on table TEST_OGG.TEST_OGG.

2021-07-29 09:03:01  INFO    OGG-15135  TRANDATA for instantiation CSN has been added on table TEST_OGG.TEST_OGG.

GGSCI (hso32-db-test as ogg@hso32) 22> info trandata test_ogg.test_ogg

Logging of supplemental redo log data is enabled for table TEST_OGG.TEST_OGG.

Columns supplementally logged for table TEST_OGG.TEST_OGG: "ID".

Prepared CSN for table TEST_OGG.TEST_OGG: 5982609614988

 

     05)、配置defile 文件

Oracle 到其他数据属于异构操作,需要定义映射关系

GGSCI (hso32-db-test as ogg@hso32) 21>edit param test_ogg

defsfile ./dirdef/ test_ogg.test_ogg

userid ogg,password ogg

table test_ogg. test_ogg

 

生成完成后,选择在ogg主目录下,并且oracle用户下执行

./defgen paramfile dirprm/test_ogg.prm

 

执行成功后没有报错的话,会在./dirdef下面生成一个文件,我们需要把这个文件拷贝到目标库ogg主目录下的diedef目录下即可

06、目标库221配置

     01)、安装 java

       当前环境为绿色版,我直接解压到/usr/local/java 目录下即可,环境根据自己的配置

配置环境变量

Vi ~/.bash_porofile

export ZOO_HOME=/root/ogg/zookeeper/

export KAFKA_HOME=/root/ogg/kafka/

export OGG_HOME=/root/ogg/ogg/

 

export RUN_AS_USER=root

export JAVA_HOME=/usr/local/java/

export CLASSPATH=.:$JAVA_HOME/jre/lib/rt.jar:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar

export PATH=$PATH:$JAVA_HOME/bin:$ZOO_HOME/bin/:$JAVA_HOME/jre/bin/:$KAFKA_HOME/bin

export LD_LIBRARY_PATH=$OGG_HOME/lib/:/usr/bin:$OGG_HOME/:$JAVA_HOME/jre/lib/amd64:$JAVA_HOME/jre/lib/amd64/server:$JAVA_HOMEjre/lib/amd64/libjsig.so:$JAVA_HOME/jre/lib/amd64/server/libjvm.so:/usr/lib

     02)、安装 配置zookeeper

       Tar xf apache-zookeeper-3.6.3.tar.gz –C /root/ogg/ zookeeper

       Cd /root/ogg/ zookeeper

       编辑配置文件

  

     [root@RHEL44223 zookeeper]# cat conf/zoo.cfg

tickTime=2000

initLimit=10

syncLimit=5

dataDir=/root/ogg/zoo/data

dataLogDir=/root/ogg/zoo/log

clientPort=2181

 

       启动 zoo

Cd /root/ogg/zookeeper/bin

./zkServer.sh start

       启动成功即可

     03)、安装配置kafka

解压kafka

Tar xf kafka_2.12-2.8.0.tgz –c /roo/ogg/kafka

配置kafka配置文件

Cd kafka

Vi config/server.properties

broker.id=0

listeners=PLAINTEXT://10.118.193.223:9092

host.name=10.118.193.223

num.network.threads=3

num.io.threads=8

socket.send.buffer.bytes=102400

socket.receive.buffer.bytes=102400

socket.request.max.bytes=104857600

log.dirs=/root/ogg/zoo/kafka-logs

num.partitions=1

num.recovery.threads.per.data.dir=1

offsets.topic.replication.factor=1

transaction.state.log.replication.factor=1

transaction.state.log.min.isr=1

log.retention.hours=168

log.segment.bytes=1073741824

log.retention.check.interval.ms=300000

zookeeper.connect=localhost:2181

zookeeper.connection.timeout.ms=18000

group.initial.rebalance.delay.ms=0

 

启动 kafka

kafka-server-start.sh -daemon ./config/server.properties

     04)、安装ogg(root用户即可)

解压ogg

Tar xf OGG_BigData_Linux_x64_19.1.0.0.5.tar –C /root/ogg/ogg/

初始化ogg

Cd /root/ogg/ogg/

./ggsci

[oracle@RHEL44223 ogg]$ ./ggsci

 

Oracle GoldenGate Command Interpreter for Oracle

Version 19.1.0.0.4 OGGCORE_19.1.0.0.0_PLATFORMS_191017.1054_FBO

Linux, x64, 64bit (optimized), Oracle 11g on Oct 17 2019 23:13:12

Operating system character set identified as UTF-8.

 

Copyright (C) 1995, 2019, Oracle and/or its affiliates. All rights reserved.

 

 

 

GGSCI RHEL44223) 1> create subdirs

 

Creating subdirectories under current directory /home/oracle/app/ogg

 

Parameter file                 /home/oracle/app/ogg/dirprm: already exists.

Report file                    /home/oracle/app/ogg/dirrpt: already exists.

Checkpoint file                /home/oracle/app/ogg/dirchk: already exists.

Process status files           /home/oracle/app/ogg/dirpcs: already exists.

SQL script files               /home/oracle/app/ogg/dirsql: already exists.

Database definitions files     /home/oracle/app/ogg/dirdef: already exists.

Extract data files             /home/oracle/app/ogg/dirdat: already exists.

Temporary files                /home/oracle/app/ogg/dirtmp: already exists.

Credential store files         /home/oracle/app/ogg/dircrd: already exists.

Masterkey wallet files         /home/oracle/app/ogg/dirwlt: already exists.

Dump files                     /home/oracle/app/ogg/dirdmp: already exists.

 

 

GGSCI (RHEL44223) 2> exit

05) 配置ogg 配置文件

<01>、配置mgr

GGSCI (RHEL44223) 4> edit param mgr

PORT 7809

DYNAMICPORTLIST  7840-7850

AUTORESTART EXTRACT *, RETRIES 5, WAITMINUTES 3

PURGEOLDEXTRACTS /root/ogg/ogg/dirdat/*, USECHECKPOINTS, MINKEEPDAYS 3

<02>、配置 应用配置文件

GGSCI (RHEL44223) 5> edit param REKAFKA

REPLICAT rekafka

sourcedefs /root/ogg/ogg/dirdef/test_kingle.oggtest

TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka.props

REPORTCOUNT EVERY 1 MINUTES,RATE

GROUPTRANSOPS 10000

MAP test_kingle.oggtest,TARGET test_kingle.oggtest;

MAP test_ogg.test_ogg,TARGET test_ogg.test_ogg;

REPLICATE rekafka定义rep进程名称;

sourcedefs即在4.6中在源服务器上做的表映射文件;

TARGETDB LIBFILE即定义kafka一些适配性的库文件以及配置文件,配置文件位于OGG主目录下的dirprm/kafka.props;

REPORTCOUNT即复制任务的报告生成频率;

GROUPTRANSOPS为以事务传输时,事务合并的单位,减少IO操作;MAP即源端与目标端的映射关系

<03>、配置checkpoint

GGSCI (RHEL44223) 5>Edit param ./GLOBALS

checkpointtable test_kingle.checkpoint

<04>、配置kafka 控制文件

Cd /root/ogg/ogg/dirprm

[root@RHEL44223 dirprm]# cat rekafka.prm

REPLICAT rekafka

sourcedefs /root/ogg/ogg/dirdef/test_kingle.oggtest

TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka.props

REPORTCOUNT EVERY 1 MINUTES,RATE

GROUPTRANSOPS 10000

MAP test_kingle.oggtest,TARGET test_kingle.oggtest;

MAP test_ogg.test_ogg,TARGET test_ogg.test_ogg;

[root@RHEL44223 dirprm]# cat custom_kafka_producer.properties

bootstrap.servers=10.118.193.223:9092

acks=1

compression.type=gzip

reconnect.backoff.ms=1000

value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer

key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer

batch.size=102400

linger.ms=10000

[root@RHEL44223 dirprm]#、

06) 、创建kafka主题

查看主题有哪些

kafka-topics.sh --list --zookeeper localhost:2181

创建主题testogg

kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testogg

再次查看是否存在

kafka-topics.sh --list --zookeeper localhost:2181

07、26同步221进程启动配置

     01)、源库26 ogg 操作

       通过上面编辑的配置文件添加进程,配置操作都是是使用的相对路径,注意自己的环境是不是一直。

Cd /home/oracle/app/ogg/

GGSCI (hso32-db-test) 1> dblogin userid ogg password ogg;

Successfully logged into database.

GSCI (hso32-db-test as ogg@hso32) 41> add extract extkafka,tranlog,begin now

EXTRACT added.

GGSCI (hso32-db-test as ogg@hso32) 42> add exttrail ./dirdat/to,extract extkafka

EXTTRAIL added.

GGSCI (hso32-db-test as ogg@hso32) 43> edit param pukafka

GGSCI (hso32-db-test as ogg@hso32) 44> add extract pukafka,exttrailsource ./dirdat/to

EXTRACT added.

GGSCI (hso32-db-test as ogg@hso32) 45> add rmttrail ./dirdat/to,extract pukafka

RMTTRAIL added.

     02)、目标库221 添加进程

       通过刚刚编辑的配置文件配置进程

GGSCI (RHEL44223) 5> add replicat rekafka exttrail ./dirdat/to,checkpointtable test_kingle.checkpoint

       所有配置完成后,进程启动

     03)、启动同步进程

       进程启动注意事项,一定要遵守启动顺序,方式获取不到数据

启动1:源库mgr 启动

 Start mgr

启动2:目标库mgr 启动

Start mgr

启动3:源库ext和pu 进程

Start extkafka

Start pukafka

启动4:目标库

Start REKAFKA
Info all

查看启动进程状态

等待启动成功后查看kafka消息队列

[root@RHEL44223 ogg]# kafka-console-consumer.sh --bootstrap-server 10.118.193.223:9092  --topic testogg --from-beginning

{"table":"TEST_OGG.TEST_OGG","op_type":"I","op_ts":"2021-07-29 09:15:58.595966","current_ts":"2021-07-29T10:18:15.963000","pos":"00000000020000002057","after":{"ID":1,"NAME":"1"}}

{"table":"TEST_OGG.TEST_OGG","op_type":"I","op_ts":"2021-07-29 09:15:58.595966","current_ts":"2021-07-29T10:18:16.295000","pos":"00000000020000002191","after":{"ID":2,"NAME":"1"}}

 

 

源库模拟测试插入的时候看是否会有数据出来,如果没有出现可以查看相应日志文件

Ogg 日志文件 在ogg目录下ggserrot.log

或者命令行查看

View report rekafka

后面跟随为进程名称 可以通过info all 打印出来 group 列就是了。

 



这篇关于OGG 实用案例(一)-oracle 同步kafka的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!


扫一扫关注最新编程教程