# CDC Configuration Guide for Various Databases

## MySQL

The MySQL CDC connector reads both snapshot data and incremental data from a MySQL database. This section describes how to prepare the MySQL side.

### Supported databases

| Connector | Database | Driver |
| --- | --- | --- |
| [mysql-cdc](https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc(ZH).html#) | [MySQL](https://dev.mysql.com/doc): 5.6, 5.7, 8.0.x<br>[RDS MySQL](https://www.aliyun.com/product/rds/mysql): 5.6, 5.7, 8.0.x<br>[PolarDB MySQL](https://www.aliyun.com/product/polardb): 5.6, 5.7, 8.0.x<br>[Aurora MySQL](https://aws.amazon.com/cn/rds/aurora): 5.6, 5.7, 8.0.x<br>[MariaDB](https://mariadb.org/): 10.x<br>[PolarDB X](https://github.com/ApsaraDB/galaxysql): 2.0.1 | JDBC Driver: 8.0.27 |

### Create a user

The CDC connector connects to MySQL as a MySQL user to extract data. This user needs the appropriate privileges on every database that CDC will capture.

1. Create the MySQL user:

   ```SQL
   mysql> CREATE USER 'user'@'localhost' IDENTIFIED BY 'password';
   ```

2. Grant privileges to the user:

   ```SQL
   mysql> GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'user' IDENTIFIED BY 'password';
   ```

   Note that `'user'` without a host part means `'user'@'%'`, which is a different account from `'user'@'localhost'`. Use the same account name in both statements, with a host part that matches where the connector connects from. On MySQL 8.0, `GRANT` no longer accepts `IDENTIFIED BY`, so omit that clause there.

3. Apply the privileges:

   ```SQL
   mysql> FLUSH PRIVILEGES;
   ```

Privilege reference:

| Keyword | Description |
| :------------------------- | :----------------------------------------------------------- |
| `SELECT` | Enables the connector to select rows from tables in databases. This is used only when performing a snapshot. |
| `RELOAD` | Enables the connector to use the `FLUSH` statement to clear or reload internal caches, flush tables, or acquire locks. This is used only when performing a snapshot. |
| `SHOW DATABASES` | Enables the connector to see database names by issuing the `SHOW DATABASES` statement. This is used only when performing a snapshot. |
| `REPLICATION SLAVE` | Enables the connector to connect to and read the MySQL server binlog. |
| `REPLICATION CLIENT` | Enables the connector to use the following statements: `SHOW MASTER STATUS`, `SHOW SLAVE STATUS`, `SHOW BINARY LOGS`. The connector always requires this. |
| `ON` | Identifies the database to which the permissions apply. |
| `TO 'user'` | Specifies the user to grant the permissions to. |
| `IDENTIFIED BY 'password'` | Specifies the user's MySQL password. |

### Enable the binlog

1. Check whether `log-bin` is already enabled:

   ```SQL
   mysql> SELECT variable_value as "BINARY LOGGING STATUS (log-bin) ::" FROM information_schema.global_variables WHERE variable_name='log_bin';
   ```

   If `information_schema.global_variables` is not available (it was removed in MySQL 8.0), `SHOW VARIABLES LIKE 'log_bin';` returns the same value.

2. If the result is `OFF`, set the following options in the MySQL configuration file, then restart MySQL:

   ```properties
   server-id         = 223344
   log_bin           = mysql-bin
   binlog_format     = ROW
   binlog_row_image  = FULL
   expire_logs_days  = 10
   ```

3. Check again whether the binlog is enabled:

   ```SQL
   mysql> SELECT variable_value as "BINARY LOGGING STATUS (log-bin) ::" FROM information_schema.global_variables WHERE variable_name='log_bin';
   ```

MySQL configuration options:

| Option | Description |
| :----------------- | :----------------------------------------------------------- |
| `server-id` | The value for the `server-id` must be unique for each server and replication client in the MySQL cluster. During MySQL connector setup, Debezium assigns a unique server ID to the connector. |
| `log_bin` | The value of `log_bin` is the base name of the sequence of binlog files. |
| `binlog_format` | The `binlog_format` must be set to `ROW` or `row`. |
| `binlog_row_image` | The `binlog_row_image` must be set to `FULL` or `full`. |
| `expire_logs_days` | This is the number of days for automatic binlog file removal. The default is `0`, which means no automatic removal. Set the value to match the needs of your environment. See [MySQL purges binlog files](https://debezium.io/documentation/reference/1.6/connectors/mysql.html#mysql-purges-binlog-files-used-by-debezium). |

## Oracle

The Oracle CDC connector reads both snapshot data and incremental data from an Oracle database. This section describes how to prepare the Oracle side.

### Supported databases

| Connector | Database | Driver |
| ------------------------------------------------------------ | ------------------ | ----------------------- |
| [oracle-cdc](https://ververica.github.io/flink-cdc-connectors/master/content/connectors/oracle-cdc.html#) | Oracle: 11, 12, 19 | Oracle Driver: 19.3.0.0 |

### Oracle preparation

First enable log archiving on the Oracle database, then prepare an Oracle user with the appropriate privileges on every database that CDC will capture.

1. Enable log archiving

   (1.1). Connect to Oracle as DBA:

   ```
   ORACLE_SID=SID
   export ORACLE_SID
   sqlplus /nolog
   CONNECT sys/password AS SYSDBA
   ```

   (1.2). 
Enable log archiving:

   ```
   alter system set db_recovery_file_dest_size = 10G;
   alter system set db_recovery_file_dest = '/opt/oracle/oradata/recovery_area' scope=spfile;
   shutdown immediate;
   startup mount;
   alter database archivelog;
   alter database open;
   ```

   **Note:**

   - Enabling log archiving requires a restart of the Oracle database.
   - Archived logs can take up a large amount of disk space. Clean up expired logs regularly.

   (1.3). Check the log archiving status:

   ```
   -- Should now show "Database log mode: Archive Mode"
   archive log list;
   ```

   **Note:** Supplemental logging must be enabled on the tables or the database to be captured, so that the *before* state of each changed row can be captured. The following statements show how to enable it.

   ```
   -- Enable supplemental logging for a specific table:
   ALTER TABLE inventory.customers ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
   ```

   ```
   -- Enable supplemental logging for database
   ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
   ```

2. Create an Oracle user with the required privileges

   (2.1). Create a tablespace:

   ```
   sqlplus sys/password@host:port/SID AS SYSDBA;
   CREATE TABLESPACE logminer_tbs DATAFILE '/opt/oracle/oradata/SID/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
   exit;
   ```

   (2.2). Create the user and grant privileges:

   ```
   sqlplus sys/password@host:port/SID AS SYSDBA;
   CREATE USER flinkuser IDENTIFIED BY flinkpw DEFAULT TABLESPACE LOGMINER_TBS QUOTA UNLIMITED ON LOGMINER_TBS;
   GRANT CREATE SESSION TO flinkuser;
   GRANT SET CONTAINER TO flinkuser;
   GRANT SELECT ON V_$DATABASE to flinkuser;
   GRANT FLASHBACK ANY TABLE TO flinkuser;
   GRANT SELECT ANY TABLE TO flinkuser;
   GRANT SELECT_CATALOG_ROLE TO flinkuser;
   GRANT EXECUTE_CATALOG_ROLE TO flinkuser;
   GRANT SELECT ANY TRANSACTION TO flinkuser;
   GRANT LOGMINING TO flinkuser;
   GRANT CREATE TABLE TO flinkuser;
   GRANT LOCK ANY TABLE TO flinkuser;
   GRANT ALTER ANY TABLE TO flinkuser;
   GRANT CREATE SEQUENCE TO flinkuser;
   GRANT EXECUTE ON DBMS_LOGMNR TO flinkuser;
   GRANT EXECUTE ON DBMS_LOGMNR_D TO flinkuser;
   GRANT SELECT ON V_$LOG TO flinkuser;
   GRANT SELECT ON V_$LOG_HISTORY TO flinkuser;
   GRANT SELECT ON V_$LOGMNR_LOGS TO flinkuser;
   GRANT SELECT ON V_$LOGMNR_CONTENTS TO flinkuser;
   GRANT SELECT ON V_$LOGMNR_PARAMETERS TO flinkuser;
   GRANT SELECT ON V_$LOGFILE TO flinkuser;
   GRANT SELECT ON V_$ARCHIVED_LOG TO flinkuser;
   GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO flinkuser;
   exit;
   ```

### For a CDB database

Overall, the procedure is similar to the non-CDB procedure above, but the specific commands differ.

1. Enable log archiving:

   ```
   ORACLE_SID=ORCLCDB
   export ORACLE_SID
   sqlplus /nolog
   CONNECT sys/password AS SYSDBA
   alter system set db_recovery_file_dest_size = 10G;
   -- should exist
   alter system set db_recovery_file_dest = '/opt/oracle/oradata/recovery_area' scope=spfile;
   shutdown immediate
   startup mount
   alter database archivelog;
   alter database open;
   -- Should show "Database log mode: Archive Mode"
   archive log list
   exit;
   ```

   **Note:** Supplemental logging must be enabled here as well:

   ```
   -- Enable supplemental logging for a specific table:
   ALTER TABLE inventory.customers ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
   -- Enable supplemental logging for database
   ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
   ```

2. Create an Oracle user with the required privileges:

   ```
   sqlplus sys/password@//localhost:1521/ORCLCDB as sysdba
   CREATE TABLESPACE logminer_tbs DATAFILE '/opt/oracle/oradata/ORCLCDB/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
   exit
   ```

   ```
   sqlplus sys/password@//localhost:1521/ORCLPDB1 as sysdba
   CREATE TABLESPACE logminer_tbs DATAFILE '/opt/oracle/oradata/ORCLCDB/ORCLPDB1/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
   exit
   ```

   ```
   sqlplus sys/password@//localhost:1521/ORCLCDB as sysdba
   CREATE USER flinkuser IDENTIFIED BY flinkpw DEFAULT TABLESPACE logminer_tbs QUOTA UNLIMITED ON logminer_tbs CONTAINER=ALL;
   GRANT CREATE SESSION TO flinkuser CONTAINER=ALL;
   GRANT SET CONTAINER TO flinkuser CONTAINER=ALL;
   GRANT SELECT ON V_$DATABASE to flinkuser CONTAINER=ALL;
   GRANT FLASHBACK ANY TABLE TO flinkuser CONTAINER=ALL;
   GRANT SELECT ANY TABLE TO flinkuser CONTAINER=ALL;
   GRANT SELECT_CATALOG_ROLE TO flinkuser CONTAINER=ALL;
   GRANT EXECUTE_CATALOG_ROLE TO flinkuser CONTAINER=ALL;
   GRANT SELECT ANY TRANSACTION TO flinkuser CONTAINER=ALL;
   GRANT LOGMINING TO flinkuser CONTAINER=ALL;
   GRANT CREATE TABLE TO flinkuser CONTAINER=ALL;
   -- need not to execute if set scan.incremental.snapshot.enabled=true(default)
   GRANT LOCK ANY TABLE TO flinkuser CONTAINER=ALL;
   GRANT CREATE SEQUENCE TO flinkuser CONTAINER=ALL;
   GRANT EXECUTE ON DBMS_LOGMNR TO flinkuser CONTAINER=ALL;
   GRANT EXECUTE ON DBMS_LOGMNR_D TO flinkuser CONTAINER=ALL;
   GRANT SELECT ON V_$LOG TO flinkuser CONTAINER=ALL;
   GRANT SELECT ON V_$LOG_HISTORY TO flinkuser CONTAINER=ALL;
   GRANT SELECT ON V_$LOGMNR_LOGS TO flinkuser CONTAINER=ALL;
   GRANT SELECT ON V_$LOGMNR_CONTENTS TO flinkuser CONTAINER=ALL;
   GRANT SELECT ON V_$LOGMNR_PARAMETERS TO flinkuser CONTAINER=ALL;
   GRANT SELECT ON V_$LOGFILE TO flinkuser CONTAINER=ALL;
   GRANT SELECT ON V_$ARCHIVED_LOG TO flinkuser CONTAINER=ALL;
   GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO flinkuser CONTAINER=ALL;
   exit
   ```

For more information, see [Setting up Oracle](https://debezium.io/documentation/reference/1.6/connectors/oracle.html#setting-up-oracle).

## DB2

The DB2 CDC connector reads both snapshot data and incremental data from a DB2 database. This section describes how to prepare the DB2 side.

### Supported databases

| Connector | Database | Driver |
| ------------------------------------------------------------ | --------------------------------------------- | -------------------- |
| [Db2-cdc](https://ververica.github.io/flink-cdc-connectors/master/content/connectors/db2-cdc.html) | [Db2](https://www.ibm.com/products/db2): 11.5 | Db2 Driver: 11.5.0.0 |

### DB2 preparation

Add the Debezium management UDFs to DB2, set up the ASN agent, and add the tables to be captured.

1. Log in to the DB2 server as the database user that the connector will use. Upload the management UDF package provided by Debezium (asncdctools) to the DB2 machine and place it in `$HOME/asncdctools`.

2. Compile the UDF provided by Debezium with the following commands:

   ```
   cd $HOME/asncdctools
   /opt/ibm/db2/V11.5/samples/c/bldrtn asncdc
   ```

3. Make sure JDBC can read the DB2 metadata catalog. `db2 bind` needs an active database connection, so if none is open yet, run the `db2 connect` command from step 4 first.

   ```
   cd $HOME/sqllib/bnd
   db2 bind db2schema.bnd blocking all grant public sqlerror continue
   ```

4. Connect to the database to install the Debezium management UDFs. This assumes you are logged in as the `db2inst1` user, so the UDFs are installed under that user. Replace `DB_NAME` with the name of your database.

   ```
   db2 connect to DB_NAME
   ```

5. Copy the Debezium management UDF and set its permissions:

   ```
   cp $HOME/asncdctools/asncdc $HOME/sqllib/function
   chmod 777 $HOME/sqllib/function
   ```

6. Enable the Debezium UDF that starts and stops the ASN capture agent:

   ```
   db2 -tvmf $HOME/asncdctools/asncdc_UDF.sql
   ```

7. Create the ASN control tables:

   ```
   db2 -tvmf $HOME/asncdctools/asncdctables.sql
   ```

8. Enable the Debezium UDFs that add tables to capture mode and remove tables from capture mode:

   ```
   db2 -tvmf $HOME/asncdctools/asncdcaddremove.sql
   ```

9. Open the db2 SQL client (replace `DB_NAME` with the name of your database) and start the ASN agent:

   ```
   db2 connect to DB_NAME
   VALUES ASNCDC.ASNCDCSERVICES('start','asncdc')
   ```

10. Put tables into capture mode. Run the following statement once for each table to capture. Replace `MYSCHEMA` with the name of the schema that contains the table, and `MYTABLE` with the name of the table.

    ```
    CALL ASNCDC.ADDTABLE('MYSCHEMA', 'MYTABLE')
    ```

11. Reinitialize the ASN service:

    ```
    VALUES ASNCDC.ASNCDCSERVICES('reinit','asncdc')
    -- After reinitialization, output similar to the following appears:
    2022-12-08-11.46.53.420967 ASN0600I "AsnCcmd" : "" : "Initial" : Program "capcmd 11.4.0 (Build 11.5.0.0 Level s1906101300, PTF DYN1906101300AMD64)" is starting.
    ```

For more information, see [Setting up DB2](https://debezium.io/documentation/reference/1.6/connectors/db2.html#setting-up-db2).
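
Because the `ASNCDC.ADDTABLE` call in the DB2 steps has to be repeated for every captured table, it can help to generate the statements from a table list. The following is a minimal sketch, not part of Debezium or Flink CDC: the helper names `sql_literal` and `build_capture_statements` and the sample schema and table names are hypothetical. It only builds statement text in the same form used in this guide and does not connect to DB2.

```python
from typing import List


def sql_literal(value: str) -> str:
    """Quote a value as a SQL string literal, doubling embedded single quotes."""
    return "'" + value.replace("'", "''") + "'"


def build_capture_statements(schema: str, tables: List[str]) -> List[str]:
    """Return one ADDTABLE call per table, followed by a single reinit call.

    Hypothetical helper for illustration only: it produces text and never
    talks to DB2. An empty table list yields no statements at all.
    """
    statements = [
        f"CALL ASNCDC.ADDTABLE({sql_literal(schema)}, {sql_literal(table)})"
        for table in tables
    ]
    if statements:
        # Reinitialize the ASN service once, after all tables were added.
        statements.append("VALUES ASNCDC.ASNCDCSERVICES('reinit','asncdc')")
    return statements


if __name__ == "__main__":
    # Sample names are assumptions; replace them with your own schema and tables.
    for statement in build_capture_statements("MYSCHEMA", ["ORDERS", "CUSTOMERS"]):
        print(statement)
```

The printed statements can be reviewed and then run in the db2 SQL client session that is already connected to the database.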