CDC Configuration Guide for Various Databases

MySQL

The MySQL CDC connector reads both snapshot data and incremental data from a MySQL database. This section describes how to prepare the MySQL side.

Supported databases

| Connector | Database | Driver |
|---|---|---|
| mysql-cdc | MySQL 5.6, 5.7, 8.0.x | JDBC Driver 8.0.27 |
| | RDS MySQL 5.6, 5.7, 8.0.x | |
| | PolarDB MySQL 5.6, 5.7, 8.0.x | |
| | Aurora MySQL 5.6, 5.7, 8.0.x | |
| | MariaDB 10.x | |
| | PolarDB-X 2.0.1 | |

Create a user

The CDC connector connects to MySQL as a database user to extract data. This user needs privileges on every database to be captured.

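  1. Create the MySQL user (change the host part 'localhost' to the host the connector connects from, for example '%'):

    mysql> CREATE USER 'user'@'localhost' IDENTIFIED BY 'password';
    
  2. Grant privileges to the user. MySQL 8.0 no longer accepts IDENTIFIED BY in GRANT, so the password is set only by CREATE USER in step 1:

    mysql> GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'user'@'localhost';
    
  3. Reload the privilege tables (not strictly required after GRANT, which takes effect immediately, but harmless):

    mysql> FLUSH PRIVILEGES;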
    

Privilege descriptions:

| Keyword | Description |
|---|---|
| SELECT | Enables the connector to select rows from tables in databases. Used only when performing a snapshot. |
| RELOAD | Enables the connector to use the FLUSH statement to clear or reload internal caches, flush tables, or acquire locks. Used only when performing a snapshot. |
| SHOW DATABASES | Enables the connector to see database names by issuing the SHOW DATABASES statement. Used only when performing a snapshot. |
| REPLICATION SLAVE | Enables the connector to connect to and read the MySQL server binlog. |
| REPLICATION CLIENT | Enables the connector to use the SHOW MASTER STATUS, SHOW SLAVE STATUS, and SHOW BINARY LOGS statements. The connector always requires this. |
| ON | Identifies the databases and tables to which the privileges apply (*.* means all of them). |
| TO 'user' | Specifies the user to grant the privileges to. |
| IDENTIFIED BY 'password' | Specifies the user's MySQL password. MySQL 8.0 accepts this clause only in CREATE USER, not in GRANT. |
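After the user is created, `SHOW GRANTS FOR 'user'@'localhost';` lists its privileges. The minimal Python sketch below checks those output rows for the five global privileges in the table above. Some assumptions apply. The helper name `missing_privileges` is hypothetical, and the rows are passed in as plain strings. Only global grants (`ON *.*`) are considered, so schema-level grants, role grants and partial revokes are ignored.

```python
import re

# The global privileges this guide grants to the CDC user.
REQUIRED_PRIVILEGES = frozenset(
    {"SELECT", "RELOAD", "SHOW DATABASES", "REPLICATION SLAVE", "REPLICATION CLIENT"}
)

# Matches a global grant such as: GRANT SELECT, RELOAD ON *.* TO `user`@`localhost`
_GLOBAL_GRANT = re.compile(r"^GRANT\s+(.+?)\s+ON\s+\*\.\*\s+TO\s", re.IGNORECASE)


def missing_privileges(grant_rows):
    """Return required privileges absent from SHOW GRANTS rows (hypothetical helper)."""
    granted = set()
    for row in grant_rows:
        match = _GLOBAL_GRANT.match(row.strip())
        if match is None:
            continue  # not a global grant (schema-level grant, role grant, ...)
        for privilege in match.group(1).split(","):
            # Collapse internal whitespace and normalize case, e.g. "show  databases".
            granted.add(" ".join(privilege.split()).upper())
    if granted & {"ALL", "ALL PRIVILEGES"}:
        return set()
    return set(REQUIRED_PRIVILEGES - granted)


if __name__ == "__main__":
    rows = ["GRANT SELECT, RELOAD ON *.* TO `user`@`localhost`"]
    print(sorted(missing_privileges(rows)))
    # ['REPLICATION CLIENT', 'REPLICATION SLAVE', 'SHOW DATABASES']
```

Matching on the text of SHOW GRANTS keeps the sketch driver-independent. The trade-off is that it trusts the privilege spelling used in the GRANT statement above.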

Enable the binlog

  1. Check whether binary logging is enabled. This form also works on MySQL 8.0, where information_schema.global_variables no longer exists:

    mysql> SHOW GLOBAL VARIABLES LIKE 'log_bin';
    
  2. If the value is OFF, set the following options in the [mysqld] section of the MySQL configuration file (for example my.cnf) and restart MySQL, because log_bin cannot be changed at runtime:

    server-id         = 223344
    log_bin           = mysql-bin
    binlog_format     = ROW
    binlog_row_image  = FULL
    expire_logs_days  = 10
    
  3. After the restart, check again that binary logging is enabled:

    mysql> SHOW GLOBAL VARIABLES LIKE 'log_bin';
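Because a typo in the option file only shows up after the restart, the settings from step 2 can be sanity-checked beforehand. The Python sketch below is illustrative only. `check_mysql_cnf` is a hypothetical helper that parses the [mysqld] section with the standard `configparser` module and treats `-` and `_` in option names as equivalent, as MySQL does. It assumes a single option file without `!include`/`!includedir` directives. It checks only the options listed in step 2 and ignores values that MySQL would apply by default.

```python
import configparser

# Values step 2 asks for; compared case-insensitively.
REQUIRED_VALUES = {"binlog_format": "row", "binlog_row_image": "full"}


def check_mysql_cnf(text):
    """Return a list of problems in the [mysqld] section of an option file (hypothetical helper)."""
    parser = configparser.ConfigParser(
        allow_no_value=True,             # bare options such as "log-bin" are legal in MySQL
        strict=False,                    # MySQL lets a later duplicate override an earlier one
        interpolation=None,              # values may contain literal "%" characters
        inline_comment_prefixes=("#",),  # MySQL allows "# comment" after a value
    )
    parser.read_string(text)
    if not parser.has_section("mysqld"):
        return ["missing [mysqld] section"]

    # MySQL treats "-" and "_" in option names as the same option.
    options = {name.replace("-", "_"): value for name, value in parser.items("mysqld")}

    problems = []
    if "log_bin" not in options:
        problems.append("log_bin is not set")
    server_id = (options.get("server_id") or "").strip()
    if not server_id.isdigit() or int(server_id) == 0:
        problems.append("server_id must be a non-zero integer")
    for name, expected in REQUIRED_VALUES.items():
        actual = (options.get(name) or "").strip().lower()
        if actual != expected:
            problems.append(f"{name} should be {expected.upper()}, found {actual or 'nothing'}")
    return problems


if __name__ == "__main__":
    sample = "[mysqld]\nserver-id = 223344\nlog_bin = mysql-bin\nbinlog_format = ROW\nbinlog_row_image = FULL\n"
    print(check_mysql_cnf(sample))  # []
```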
    

MySQL configuration options

| Option | Description |
|---|---|
| server-id | Must be unique for each server and replication client in the MySQL cluster. During MySQL connector setup, Debezium assigns a unique server ID to the connector. |
| log_bin | The base name of the sequence of binlog files. |
| binlog_format | Must be set to ROW (case-insensitive). |
| binlog_row_image | Must be set to FULL (case-insensitive). |
| expire_logs_days | The number of days after which binlog files are removed automatically. In MySQL 5.6 and 5.7 the default is 0, which means no automatic removal. Set a value that matches the needs of your environment. In MySQL 8.0 this option is deprecated in favor of binlog_expire_logs_seconds. See the MySQL documentation on purging binlog files. |
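The requirements in this table can also be checked against the running server. The query used here is `SHOW GLOBAL VARIABLES WHERE Variable_name IN ('log_bin', 'server_id', 'binlog_format', 'binlog_row_image')`. The Python sketch below assumes the result rows have already been fetched, for example with `cursor.fetchall()` from any Python DB-API driver, as `(Variable_name, Value)` pairs. The name `check_binlog_variables` is hypothetical.

```python
# Expected server variable values for binlog-based CDC (see the table above).
EXPECTED = {"log_bin": "ON", "binlog_format": "ROW", "binlog_row_image": "FULL"}

QUERY = (
    "SHOW GLOBAL VARIABLES WHERE Variable_name IN "
    "('log_bin', 'server_id', 'binlog_format', 'binlog_row_image')"
)


def check_binlog_variables(rows):
    """Check (Variable_name, Value) rows returned by QUERY (hypothetical helper)."""
    values = {}
    for name, value in rows:
        if isinstance(value, bytes):  # some drivers return raw bytes
            value = value.decode()
        values[name.lower()] = str(value)

    problems = []
    for name, expected in EXPECTED.items():
        actual = values.get(name)
        if actual is None or actual.upper() != expected:
            problems.append(f"{name}: expected {expected}, got {actual}")
    if values.get("server_id", "0") == "0":
        problems.append("server_id: must be non-zero")
    return problems


if __name__ == "__main__":
    # With a DB-API connection `conn` (driver of your choice):
    #   cur = conn.cursor(); cur.execute(QUERY); print(check_binlog_variables(cur.fetchall()))
    sample = [("binlog_format", "ROW"), ("binlog_row_image", "FULL"),
              ("log_bin", "ON"), ("server_id", "223344")]
    print(check_binlog_variables(sample))  # []
```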

Oracle

The Oracle CDC connector reads both snapshot data and incremental data from an Oracle database. This section describes how to prepare the Oracle side.

Supported databases

| Connector | Database | Driver |
|---|---|---|
| oracle-cdc | Oracle 11, 12, 19 | Oracle Driver 19.3.0.0 |

Prepare Oracle

First, enable log archiving on the Oracle database. Then prepare an Oracle user with the appropriate privileges on every database to be captured.

  1. Enable log archiving
    (1.1). Connect to Oracle as DBA:

    ORACLE_SID=SID
    export ORACLE_SID
    sqlplus /nolog
      CONNECT sys/password AS SYSDBA
    

    (1.2). Enable log archiving:

    alter system set db_recovery_file_dest_size = 10G;
    alter system set db_recovery_file_dest = '/opt/oracle/oradata/recovery_area' scope=spfile;
    shutdown immediate;
    startup mount;
    alter database archivelog;
    alter database open;
    

    Notes:

    • Enabling log archiving requires restarting the Oracle database.

    • Archived logs can take up a lot of disk space, so purge expired archived logs regularly.

    (1.3). Check the log archiving status:

    -- Should now show "Database log mode: Archive Mode"
    archive log list;
    

    Note:

    Supplemental logging must be enabled on the tables or the database to be captured, so that the before state of each changed row is recorded. Enable it as follows:

    -- Enable supplemental logging for a specific table:
    ALTER TABLE inventory.customers ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
    
    -- Enable supplemental logging for database
    ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
    
  2. Create an Oracle user with the required privileges

    (2.1). Create a tablespace:

    sqlplus sys/password@host:port/SID AS SYSDBA;
      CREATE TABLESPACE logminer_tbs DATAFILE '/opt/oracle/oradata/SID/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
      exit;
    

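    (2.2). Create the user and grant privileges. On Oracle 11g, skip SET CONTAINER and LOGMINING, which were introduced in Oracle 12c: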

    sqlplus sys/password@host:port/SID AS SYSDBA;
      CREATE USER flinkuser IDENTIFIED BY flinkpw DEFAULT TABLESPACE LOGMINER_TBS QUOTA UNLIMITED ON LOGMINER_TBS;
      GRANT CREATE SESSION TO flinkuser;
      GRANT SET CONTAINER TO flinkuser;
      GRANT SELECT ON V_$DATABASE to flinkuser;
      GRANT FLASHBACK ANY TABLE TO flinkuser;
      GRANT SELECT ANY TABLE TO flinkuser;
      GRANT SELECT_CATALOG_ROLE TO flinkuser;
      GRANT EXECUTE_CATALOG_ROLE TO flinkuser;
      GRANT SELECT ANY TRANSACTION TO flinkuser;
      GRANT LOGMINING TO flinkuser;
    
      GRANT CREATE TABLE TO flinkuser;
      GRANT LOCK ANY TABLE TO flinkuser;
      GRANT ALTER ANY TABLE TO flinkuser;
      GRANT CREATE SEQUENCE TO flinkuser;
    
      GRANT EXECUTE ON DBMS_LOGMNR TO flinkuser;
      GRANT EXECUTE ON DBMS_LOGMNR_D TO flinkuser;
    
      GRANT SELECT ON V_$LOG TO flinkuser;
      GRANT SELECT ON V_$LOG_HISTORY TO flinkuser;
      GRANT SELECT ON V_$LOGMNR_LOGS TO flinkuser;
      GRANT SELECT ON V_$LOGMNR_CONTENTS TO flinkuser;
      GRANT SELECT ON V_$LOGMNR_PARAMETERS TO flinkuser;
      GRANT SELECT ON V_$LOGFILE TO flinkuser;
      GRANT SELECT ON V_$ARCHIVED_LOG TO flinkuser;
      GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO flinkuser;
      exit;
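When step (1.3) is part of a script, the SQL*Plus output of `archive log list` can be checked programmatically. This is a hedged sketch. `is_archive_mode` is a hypothetical helper. It assumes the usual layout, in which one line starts with "Database log mode" followed by "Archive Mode" or "No Archive Mode", so check it against the output of your Oracle version.

```python
def is_archive_mode(archive_log_list_output):
    """Return True if `archive log list` output reports Archive Mode (hypothetical helper)."""
    prefix = "database log mode"
    for line in archive_log_list_output.splitlines():
        line = line.strip()
        if line.lower().startswith(prefix):
            # Drop the label plus any padding or colon, keeping e.g. "Archive Mode".
            mode = line[len(prefix):].strip(" :\t")
            # "No Archive Mode" also contains "Archive Mode", so compare exactly.
            return mode.lower() == "archive mode"
    raise ValueError("no 'Database log mode' line found")


if __name__ == "__main__":
    output = """Database log mode              Archive Mode
Automatic archival             Enabled
Archive destination            USE_DB_RECOVERY_FILE_DEST"""
    print(is_archive_mode(output))  # True
```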
    

For a CDB database

The procedure is broadly the same as for the non-CDB database above, but some commands differ.

  1. Enable log archiving

    ORACLE_SID=ORCLCDB
    export ORACLE_SID
    sqlplus /nolog
      CONNECT sys/password AS SYSDBA
      alter system set db_recovery_file_dest_size = 10G;
      -- the recovery_area directory must already exist
      alter system set db_recovery_file_dest = '/opt/oracle/oradata/recovery_area' scope=spfile;
      shutdown immediate
      startup mount
      alter database archivelog;
      alter database open;
      -- Should show "Database log mode: Archive Mode"
      archive log list
      exit;
    

    Note: supplemental logging must be enabled here as well:

    -- Enable supplemental logging for a specific table:
    ALTER TABLE inventory.customers ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
    -- Enable supplemental logging for database
    ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
    
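  2. Create an Oracle user with the required privileges. In a CDB, a common user created from the root with CONTAINER=ALL must have a name that starts with the common user prefix (C## by default). You may therefore need a name such as c##flinkuser instead of flinkuser: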

    sqlplus sys/password@//localhost:1521/ORCLCDB as sysdba
      CREATE TABLESPACE logminer_tbs DATAFILE '/opt/oracle/oradata/ORCLCDB/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
      exit
    
    sqlplus sys/password@//localhost:1521/ORCLPDB1 as sysdba
      CREATE TABLESPACE logminer_tbs DATAFILE '/opt/oracle/oradata/ORCLCDB/ORCLPDB1/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
      exit
    
    sqlplus sys/password@//localhost:1521/ORCLCDB as sysdba
      CREATE USER flinkuser IDENTIFIED BY flinkpw DEFAULT TABLESPACE logminer_tbs QUOTA UNLIMITED ON logminer_tbs CONTAINER=ALL;
      GRANT CREATE SESSION TO flinkuser CONTAINER=ALL;
      GRANT SET CONTAINER TO flinkuser CONTAINER=ALL;
      GRANT SELECT ON V_$DATABASE to flinkuser CONTAINER=ALL;
      GRANT FLASHBACK ANY TABLE TO flinkuser CONTAINER=ALL;
      GRANT SELECT ANY TABLE TO flinkuser CONTAINER=ALL;
      GRANT SELECT_CATALOG_ROLE TO flinkuser CONTAINER=ALL;
      GRANT EXECUTE_CATALOG_ROLE TO flinkuser CONTAINER=ALL;
      GRANT SELECT ANY TRANSACTION TO flinkuser CONTAINER=ALL;
      GRANT LOGMINING TO flinkuser CONTAINER=ALL;
      GRANT CREATE TABLE TO flinkuser CONTAINER=ALL;
      -- not needed if scan.incremental.snapshot.enabled=true (the default)
      GRANT LOCK ANY TABLE TO flinkuser CONTAINER=ALL;
      GRANT CREATE SEQUENCE TO flinkuser CONTAINER=ALL;
    
      GRANT EXECUTE ON DBMS_LOGMNR TO flinkuser CONTAINER=ALL;
      GRANT EXECUTE ON DBMS_LOGMNR_D TO flinkuser CONTAINER=ALL;
    
      GRANT SELECT ON V_$LOG TO flinkuser CONTAINER=ALL;
      GRANT SELECT ON V_$LOG_HISTORY TO flinkuser CONTAINER=ALL;
      GRANT SELECT ON V_$LOGMNR_LOGS TO flinkuser CONTAINER=ALL;
      GRANT SELECT ON V_$LOGMNR_CONTENTS TO flinkuser CONTAINER=ALL;
      GRANT SELECT ON V_$LOGMNR_PARAMETERS TO flinkuser CONTAINER=ALL;
      GRANT SELECT ON V_$LOGFILE TO flinkuser CONTAINER=ALL;
      GRANT SELECT ON V_$ARCHIVED_LOG TO flinkuser CONTAINER=ALL;
      GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO flinkuser CONTAINER=ALL;
      exit
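The CDB grant script differs from the non-CDB one mainly in the trailing CONTAINER=ALL. If the privilege list is kept in one place, a small generator like the Python sketch below can emit either variant. Pass in the clauses from whichever script above applies. `grant_statements` and its `common_prefix` check are hypothetical. The check only mirrors Oracle's default common user prefix C##, so adjust it if your database sets a different COMMON_USER_PREFIX.

```python
def grant_statements(user, privileges, cdb=False, common_prefix="C##"):
    """Build one GRANT statement per privilege clause (hypothetical helper).

    privileges: clauses as written in the scripts above, e.g. "CREATE SESSION"
    or "SELECT ON V_$DATABASE".
    cdb: append CONTAINER=ALL so each grant covers the root and all PDBs.
    """
    if cdb and common_prefix and not user.upper().startswith(common_prefix.upper()):
        raise ValueError(f"common user {user!r} should start with {common_prefix!r}")
    suffix = " CONTAINER=ALL" if cdb else ""
    return [f"GRANT {clause} TO {user}{suffix};" for clause in privileges]


if __name__ == "__main__":
    clauses = ["CREATE SESSION", "SET CONTAINER", "SELECT ON V_$DATABASE", "LOGMINING"]
    # Paste the output into a SQL*Plus session connected to the CDB root as SYSDBA.
    print("\n".join(grant_statements("c##flinkuser", clauses, cdb=True)))
```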
    

For more information, see: Setting up Oracle

DB2

The DB2 CDC connector reads both snapshot data and incremental data from a DB2 database. This section describes how to prepare the DB2 side.

Supported databases

| Connector | Database | Driver |
|---|---|---|
| db2-cdc | Db2 11.5 | Db2 Driver 11.5.0.0 |

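Prepare DB2

Add the Debezium management UDFs to DB2, set up the ASN capture agent, and add the tables to capture.

  1. Log in to the DB2 server as the database user the connector will connect with. Copy the management UDF package provided by Debezium (asncdctools) to the DB2 machine and place it in $HOME/asncdctools.

  2. Compile the Debezium UDF with the following commands: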

    cd $HOME/asncdctools
    /opt/ibm/db2/V11.5/samples/c/bldrtn asncdc
    
  3. Make sure JDBC can read the DB2 metadata catalog:

    cd $HOME/sqllib/bnd
    db2 bind db2schema.bnd blocking all grant public sqlerror continue
    
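  4. Connect to the database in which the Debezium management UDFs will be installed. Assuming you are logged in as the db2inst1 user, the UDFs are installed for db2inst1. Replace DB_NAME with your database name: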

    db2 connect to DB_NAME
    
  5. Copy the Debezium management UDF and set its permissions:

    cp $HOME/asncdctools/asncdc $HOME/sqllib/function
    chmod 777 $HOME/sqllib/function
    
  6. Enable the Debezium UDF that starts and stops the ASN capture agent:

    db2 -tvmf $HOME/asncdctools/asncdc_UDF.sql
    
  7. Create the ASN control tables:

    db2 -tvmf $HOME/asncdctools/asncdctables.sql
    
  8. Enable the Debezium UDF that adds tables to and removes tables from capture mode:

    db2 -tvmf $HOME/asncdctools/asncdcaddremove.sql
    
  9. Open the db2 SQL client, connect to the database (replace DB_NAME), and start the ASN agent:

    db2
    connect to DB_NAME
    
    VALUES ASNCDC.ASNCDCSERVICES('start','asncdc')
    
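  10. Put tables into capture mode. Run the following statement for each table to capture. Replace MYSCHEMA with the name of the schema that contains the table, and MYTABLE with the name of the table:

    CALL ASNCDC.ADDTABLE('MYSCHEMA', 'MYTABLE')
    
  11. Reinitialize the ASN service:

    VALUES ASNCDC.ASNCDCSERVICES('reinit','asncdc')
    
    After reinitialization, output similar to the following should appear:

    2022-12-08-11.46.53.420967 ASN0600I  "AsnCcmd" : "" : "Initial" : Program "capcmd 11.4.0 (Build 11.5.0.0 Level s1906101300, PTF DYN1906101300AMD64)" is starting.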
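When there are many tables, the statements from steps 10 and 11 can be generated instead of typed. The Python sketch below uses a hypothetical helper, `capture_script`. It emits one ADDTABLE call per (schema, table) pair, followed by a single reinitialization, for pasting into the db2 client after `connect to DB_NAME`. It rejects names that are not simple identifiers, so no quoting is needed. Names are passed through unchanged. Because DB2 stores unquoted identifiers in upper case, use the spelling shown in the catalog.

```python
import re

# Simple identifiers only: letters, digits and underscores, not starting with a digit.
_IDENTIFIER = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")


def capture_script(tables):
    """Return db2 CLP statements that put (schema, table) pairs into capture mode (hypothetical helper)."""
    statements = []
    for schema, table in tables:
        for name in (schema, table):
            if not _IDENTIFIER.match(name):
                raise ValueError(f"unsupported identifier: {name!r}")
        statements.append(f"CALL ASNCDC.ADDTABLE('{schema}', '{table}')")
    # Reinitialize once, after all tables have been added.
    statements.append("VALUES ASNCDC.ASNCDCSERVICES('reinit','asncdc')")
    return statements


if __name__ == "__main__":
    for statement in capture_script([("MYSCHEMA", "ORDERS"), ("MYSCHEMA", "CUSTOMERS")]):
        print(statement)
```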

For more information, see: Setting up DB2