各类数据库 CDC 配置指南
本页目录
各类数据库 CDC 配置指南#
MySQL#
MySQL CDC 连接器允许从 MySQL 数据库读取快照数据和增量数据。本文描述了如何准备好 MySQL 侧相关功能。
支持的数据库#
Connector |
Database |
Driver |
---|---|---|
MySQL: 5.6, 5.7, 8.0.x |
JDBC Driver: 8.0.27 |
创建用户#
CDC connector需要使用 MySQL 用户连接抽取数据。该用户需要拥有所有需要进行 CDC 抽取的数据库的权限。
创建 MySQL 用户:
mysql> CREATE USER 'user'@'localhost' IDENTIFIED BY 'password';
用户赋权:
mysql> GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'user' IDENTIFIED BY 'password';
提交权限
mysql> FLUSH PRIVILEGES;
权限说明:
权限关键字 |
说明 |
---|---|
|
Enables the connector to select rows from tables in databases. This is used only when performing a snapshot. |
|
Enables the connector the use of the |
|
Enables the connector to see database names by issuing the |
|
Enables the connector to connect to and read the MySQL server binlog. |
|
Enables the connector the use of the following statements: |
|
Identifies the database to which the permissions apply. |
|
Specifies the user to grant the permissions to. |
|
Specifies the user’s MySQL password. |
打开 binlog#
检查
log-bin
是否已经打开mysql> SELECT variable_value as "BINARY LOGGING STATUS (log-bin) ::" FROM information_schema.global_variables WHERE variable_name='log_bin';
如果是是
OFF
, 修改 MySQL 配置文件如下配置项:server-id = 223344 log_bin = mysql-bin binlog_format = ROW binlog_row_image = FULL expire_logs_days = 10
再次检查 binlog 是否打开
mysql> SELECT variable_value as "BINARY LOGGING STATUS (log-bin) ::" FROM information_schema.global_variables WHERE variable_name='log_bin';
MySQL 配置项说明
配置项 |
说明 |
---|---|
|
The value for the |
|
The value of |
|
The |
|
The |
|
This is the number of days for automatic binlog file removal. The default is |
Oracle#
Oracle CDC 连接器允许从 Oracle 数据库读取快照数据和增量数据。本文描述了如何准备好 Oracle 侧相关功能。
支持的数据库#
Connector |
Database |
Driver |
---|---|---|
Oracle: 11, 12, 19 |
Oracle Driver: 19.3.0.0 |
Oracle 准备#
首先需要打开 Oracle 数据库的 log archiving 功能,并且准备一个 Oracle 用户,其需要具有对于所有需要进行 CDC 抽取的数据库合适的权限。
打开 log archiving
(1.1). 以DBA身份连接OracleORACLE_SID=SID export ORACLE_SID sqlplus /nolog CONNECT sys/password AS SYSDBA
(1.2). 打开 log archiving
alter system set db_recovery_file_dest_size = 10G; alter system set db_recovery_file_dest = '/opt/oracle/oradata/recovery_area' scope=spfile; shutdown immediate; startup mount; alter database archivelog; alter database open;
注意:
打开 log archiving 需要重启 Oracle 数据库
archived logs 可能会占用大量磁盘空间,建议定期清理过期数据
(1.3). 检查 log archiving 状态
-- Should now "Database log mode: Archive Mode" archive log list;
注意:
需要在待抽取的表或数据库上开启 supplemental logging,这是为了能抓取 CDC 数据每行的before state 。如下说明了如何配置打开该功能。
-- Enable supplemental logging for a specific table: ALTER TABLE inventory.customers ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
-- Enable supplemental logging for database ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
创建有权限的 Oracle 用户
(2.1). 创建 Tablespace
sqlplus sys/password@host:port/SID AS SYSDBA; CREATE TABLESPACE logminer_tbs DATAFILE '/opt/oracle/oradata/SID/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED; exit;
(2.2). 创建用户并赋权
sqlplus sys/password@host:port/SID AS SYSDBA; CREATE USER flinkuser IDENTIFIED BY flinkpw DEFAULT TABLESPACE LOGMINER_TBS QUOTA UNLIMITED ON LOGMINER_TBS; GRANT CREATE SESSION TO flinkuser; GRANT SET CONTAINER TO flinkuser; GRANT SELECT ON V_$DATABASE to flinkuser; GRANT FLASHBACK ANY TABLE TO flinkuser; GRANT SELECT ANY TABLE TO flinkuser; GRANT SELECT_CATALOG_ROLE TO flinkuser; GRANT EXECUTE_CATALOG_ROLE TO flinkuser; GRANT SELECT ANY TRANSACTION TO flinkuser; GRANT LOGMINING TO flinkuser; GRANT CREATE TABLE TO flinkuser; GRANT LOCK ANY TABLE TO flinkuser; GRANT ALTER ANY TABLE TO flinkuser; GRANT CREATE SEQUENCE TO flinkuser; GRANT EXECUTE ON DBMS_LOGMNR TO flinkuser; GRANT EXECUTE ON DBMS_LOGMNR_D TO flinkuser; GRANT SELECT ON V_$LOG TO flinkuser; GRANT SELECT ON V_$LOG_HISTORY TO flinkuser; GRANT SELECT ON V_$LOGMNR_LOGS TO flinkuser; GRANT SELECT ON V_$LOGMNR_CONTENTS TO flinkuser; GRANT SELECT ON V_$LOGMNR_PARAMETERS TO flinkuser; GRANT SELECT ON V_$LOGFILE TO flinkuser; GRANT SELECT ON V_$ARCHIVED_LOG TO flinkuser; GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO flinkuser; exit;
对于 CDB database#
整体来说和以上非 CDB 数据库操作相似,但是具体命令不尽相同。
打开 log archiving
ORACLE_SID=ORCLCDB export ORACLE_SID sqlplus /nolog CONNECT sys/password AS SYSDBA alter system set db_recovery_file_dest_size = 10G; -- should exist alter system set db_recovery_file_dest = '/opt/oracle/oradata/recovery_area' scope=spfile; shutdown immediate startup mount alter database archivelog; alter database open; -- Should show "Database log mode: Archive Mode" archive log list exit;
Note: 这里也需要打开 supplemental logging:
-- Enable supplemental logging for a specific table: ALTER TABLE inventory.customers ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS; -- Enable supplemental logging for database ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
创建有权限的 Oracle 用户
sqlplus sys/password@//localhost:1521/ORCLCDB as sysdba CREATE TABLESPACE logminer_tbs DATAFILE '/opt/oracle/oradata/ORCLCDB/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED; exit
sqlplus sys/password@//localhost:1521/ORCLPDB1 as sysdba CREATE TABLESPACE logminer_tbs DATAFILE '/opt/oracle/oradata/ORCLCDB/ORCLPDB1/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED; exit
sqlplus sys/password@//localhost:1521/ORCLCDB as sysdba CREATE USER flinkuser IDENTIFIED BY flinkpw DEFAULT TABLESPACE logminer_tbs QUOTA UNLIMITED ON logminer_tbs CONTAINER=ALL; GRANT CREATE SESSION TO flinkuser CONTAINER=ALL; GRANT SET CONTAINER TO flinkuser CONTAINER=ALL; GRANT SELECT ON V_$DATABASE to flinkuser CONTAINER=ALL; GRANT FLASHBACK ANY TABLE TO flinkuser CONTAINER=ALL; GRANT SELECT ANY TABLE TO flinkuser CONTAINER=ALL; GRANT SELECT_CATALOG_ROLE TO flinkuser CONTAINER=ALL; GRANT EXECUTE_CATALOG_ROLE TO flinkuser CONTAINER=ALL; GRANT SELECT ANY TRANSACTION TO flinkuser CONTAINER=ALL; GRANT LOGMINING TO flinkuser CONTAINER=ALL; GRANT CREATE TABLE TO flinkuser CONTAINER=ALL; -- need not to execute if set scan.incremental.snapshot.enabled=true(default) GRANT LOCK ANY TABLE TO flinkuser CONTAINER=ALL; GRANT CREATE SEQUENCE TO flinkuser CONTAINER=ALL; GRANT EXECUTE ON DBMS_LOGMNR TO flinkuser CONTAINER=ALL; GRANT EXECUTE ON DBMS_LOGMNR_D TO flinkuser CONTAINER=ALL; GRANT SELECT ON V_$LOG TO flinkuser CONTAINER=ALL; GRANT SELECT ON V_$LOG_HISTORY TO flinkuser CONTAINER=ALL; GRANT SELECT ON V_$LOGMNR_LOGS TO flinkuser CONTAINER=ALL; GRANT SELECT ON V_$LOGMNR_CONTENTS TO flinkuser CONTAINER=ALL; GRANT SELECT ON V_$LOGMNR_PARAMETERS TO flinkuser CONTAINER=ALL; GRANT SELECT ON V_$LOGFILE TO flinkuser CONTAINER=ALL; GRANT SELECT ON V_$ARCHIVED_LOG TO flinkuser CONTAINER=ALL; GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO flinkuser CONTAINER=ALL; exit
更多信息可以参考: Setting up Oracle
DB2#
DB2 CDC 连接器允许从 DB2 数据库读取快照数据和增量数据。本文描述了如何准备好 DB2 侧相关功能。
支持的数据库#
Connector |
Database |
Driver |
---|---|---|
Db2: 11.5 |
Db2 Driver: 11.5.0.0 |
DB2 准备#
添加 debezium management udf 到 DB2, 设置 asn agent,添加所需的表
登录 DB2 服务器,以要链接的 DB 用户,将 debezium 提供的 mangement udf包(asncdctools)传到 DB2 机器上, 放到目录 $HOME/asncdctools
使用如下命编译 debezium 提供的 udf
cd $HOME/asncdctools /opt/ibm/db2/V11.5/samples/c/bldrtn asncdc
确保 JDBC 可以读取 DB2 元数据目录
cd $HOME/sqllib/bnd db2 bind db2schema.bnd blocking all grant public sqlerror continue
连接到数据库安装 Debezium 管理 UDF。假设您以 db2instl 用户身份登录,因此应在该 db2inst1 用户上安装 UDF。注意替换 DB_NAME
db2 connect to DB_NAME
复制 Debezium mangement UDF 并为其设置权限
cp $HOME/asncdctools/asncdc $HOME/sqllib/function chmod 777 $HOME/sqllib/function
启用启动和停止 ASN 捕获代理 的 Debezium UDF
db2 -tvmf $HOME/asncdctools/asncdc_UDF.sql
创建 ASN 控制表
db2 -tvmf $HOME/asncdctools/asncdctables.sql
启用将表添加到捕获模式并从捕获模式中删除表的 Debezium UDF
db2 -tvmf $HOME/asncdctools/asncdcaddremove.sql
进入 db2 sql client,注意替换 DB_NAME,启动 ASN 代理
db2 connect to DB_NAME VALUES ASNCDC.ASNCDCSERVICES('start','asncdc')
将表置于捕获模式。为要捕获的每个表调用以下语句。替换 MYSCHEMA 为包含要进入捕获模式的表的模式的名称。同样,替换 MYTABLE 为要进入捕获模式的表的名称
CALL ASNCDC.ADDTABLE('test', 'test_mq')
重新初始化 ASN 服务
VALUES ASNCDC.ASNCDCSERVICES('reinit','asncdc')
重新初始化后看见类似如下字样
2022-12-08-11.46.53.420967 ASN0600I "AsnCcmd" : "" : "Initial" : Program "capcmd 11.4.0 (Build 11.5.0.0 Level s1906101300, PTF DYN1906101300AMD64)" is starting.
更多信息可以参考: Setting up DB2