# CDC Configuration Guide for Various Databases
## MySQL
The MySQL CDC connector reads both snapshot data and incremental data from a MySQL database. This section describes how to prepare the MySQL side.
### Supported databases
| Connector | Database | Driver |
| ------------------------------------------------------------ | ------------------------------------------------------------ | ------------------- |
| [mysql-cdc](https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc(ZH).html#) | [MySQL](https://dev.mysql.com/doc): 5.6, 5.7, 8.0.x<br>[RDS MySQL](https://www.aliyun.com/product/rds/mysql): 5.6, 5.7, 8.0.x<br>[PolarDB MySQL](https://www.aliyun.com/product/polardb): 5.6, 5.7, 8.0.x<br>[Aurora MySQL](https://aws.amazon.com/cn/rds/aurora): 5.6, 5.7, 8.0.x<br>[MariaDB](https://mariadb.org/): 10.x<br>[PolarDB X](https://github.com/ApsaraDB/galaxysql): 2.0.1 | JDBC Driver: 8.0.27 |
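### Create a user
The CDC connector connects to MySQL as a MySQL user to extract data. This user needs privileges on every database from which CDC data will be extracted.
1. Create the MySQL user: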
```SQL
mysql> CREATE USER 'user'@'localhost' IDENTIFIED BY 'password';
```
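2. Grant privileges to the user (MySQL 8.0 no longer accepts `IDENTIFIED BY` in a `GRANT` statement, so omit that clause there):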
```SQL
mysql> GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'user' IDENTIFIED BY 'password';
```
3. Apply the privilege changes:
```SQL
mysql> FLUSH PRIVILEGES;
```
Privilege descriptions:
| Keyword | Description |
| :------------------------- | :----------------------------------------------------------- |
| `SELECT` | Enables the connector to select rows from tables in databases. This is used only when performing a snapshot. |
| `RELOAD` | Enables the connector to use the `FLUSH` statement to clear or reload internal caches, flush tables, or acquire locks. This is used only when performing a snapshot. |
| `SHOW DATABASES` | Enables the connector to see database names by issuing the `SHOW DATABASES` statement. This is used only when performing a snapshot. |
| `REPLICATION SLAVE` | Enables the connector to connect to and read the MySQL server binlog. |
| `REPLICATION CLIENT` | Enables the connector to use the following statements: `SHOW MASTER STATUS`, `SHOW SLAVE STATUS`, `SHOW BINARY LOGS`. The connector always requires this. |
| `ON` | Identifies the database to which the permissions apply. |
| `TO 'user'` | Specifies the user to grant the permissions to. |
| `IDENTIFIED BY 'password'` | Specifies the user’s MySQL password. |
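The three statements above can also be generated with one consistent account name. The following is a minimal shell sketch under stated assumptions: `gen_cdc_user_sql` is a hypothetical helper (not part of MySQL or Flink CDC), it only prints SQL, and it sets the password in `CREATE USER` alone, which is the form MySQL 8.0 accepts.

```shell
#!/bin/sh
# Sketch: print the account setup statements with one consistent account name.
# gen_cdc_user_sql is a hypothetical helper; arguments are inserted verbatim,
# so do not pass values that contain single quotes.
gen_cdc_user_sql() {
    user=$1
    host=$2
    password=$3
    printf "CREATE USER '%s'@'%s' IDENTIFIED BY '%s';\n" "$user" "$host" "$password"
    # The password is set by CREATE USER, so GRANT carries no IDENTIFIED BY clause.
    printf "GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO '%s'@'%s';\n" "$user" "$host"
    printf "FLUSH PRIVILEGES;\n"
}
```

For example, `gen_cdc_user_sql user localhost password` prints three statements that can be pasted into the `mysql` client.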
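### Enable the binlog
1. Check whether `log-bin` is already enabled (on MySQL 8.0, where `information_schema.global_variables` no longer exists, query `performance_schema.global_variables` instead):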
```SQL
mysql> SELECT variable_value as "BINARY LOGGING STATUS (log-bin) ::"
FROM information_schema.global_variables WHERE variable_name='log_bin';
```
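2. If the result is `OFF`, set the following options in the MySQL configuration file, then restart the MySQL server so that they take effect: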
```properties
server-id = 223344
log_bin = mysql-bin
binlog_format = ROW
binlog_row_image = FULL
expire_logs_days = 10
```
3. Check again whether the binlog is enabled:
```SQL
mysql> SELECT variable_value as "BINARY LOGGING STATUS (log-bin) ::"
FROM information_schema.global_variables WHERE variable_name='log_bin';
```
MySQL configuration option descriptions:
| Option | Description |
| :----------------- | :----------------------------------------------------------- |
| `server-id` | The value for the `server-id` must be unique for each server and replication client in the MySQL cluster. During MySQL connector set up, Debezium assigns a unique server ID to the connector. |
| `log_bin` | The value of `log_bin` is the base name of the sequence of binlog files. |
| `binlog_format` | The `binlog_format` must be set to `ROW` or `row`. |
| `binlog_row_image` | The `binlog_row_image` must be set to `FULL` or `full`. |
| `expire_logs_days` | This is the number of days for automatic binlog file removal. The default is `0`, which means no automatic removal. Set the value to match the needs of your environment. See [MySQL purges binlog files](https://debezium.io/documentation/reference/1.6/connectors/mysql.html#mysql-purges-binlog-files-used-by-debezium). |
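Before restarting MySQL, it can help to confirm that the configuration file really contains the required settings. The following is a rough shell sketch, not an official tool: `check_binlog_conf` is a hypothetical helper, and it assumes simple `option = value` lines without quotes or trailing comments.

```shell
#!/bin/sh
# Sketch: verify that a MySQL option file contains the binlog settings listed above.
# check_binlog_conf is a hypothetical helper, not part of MySQL or Flink CDC.
check_binlog_conf() {
    conf_file=$1
    rc=0
    # Each entry is "option=required value"; an empty value means "any value".
    for entry in "server-id=" "log_bin=" "binlog_format=ROW" "binlog_row_image=FULL"; do
        key=${entry%%=*}
        want=${entry#*=}
        # MySQL treats '-' and '_' in option names as equivalent, so match both.
        pattern=$(printf '%s' "$key" | sed 's/[-_]/[-_]/g')
        # Take the last uncommented assignment of this option, with whitespace removed.
        got=$(grep -E "^[[:space:]]*${pattern}[[:space:]]*=" "$conf_file" | tail -n 1 | cut -d= -f2- | tr -d '[:space:]')
        if [ -z "$got" ]; then
            echo "missing: $key"
            rc=1
        elif [ -n "$want" ] && [ "$(printf '%s' "$got" | tr '[:lower:]' '[:upper:]')" != "$want" ]; then
            echo "wrong value: $key = $got (expected $want)"
            rc=1
        fi
    done
    return $rc
}
```

Calling `check_binlog_conf /etc/my.cnf` (the path is only an example) prints one line per problem and returns a non-zero status if any required option is missing or has the wrong value.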
## Oracle
The Oracle CDC connector reads both snapshot data and incremental data from an Oracle database. This section describes how to prepare the Oracle side.
### Supported databases
| Connector | Database | Driver |
| ------------------------------------------------------------ | ------------------ | ----------------------- |
| [oracle-cdc](https://ververica.github.io/flink-cdc-connectors/master/content/connectors/oracle-cdc.html#) | Oracle: 11, 12, 19 | Oracle Driver: 19.3.0.0 |
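### Preparing Oracle
First, enable log archiving on the Oracle database and prepare an Oracle user with appropriate privileges on every database from which CDC data will be extracted.
1. Enable log archiving
(1.1). Connect to Oracle as DBA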
```
ORACLE_SID=SID
export ORACLE_SID
sqlplus /nolog
CONNECT sys/password AS SYSDBA
```
(1.2). Enable log archiving
```
alter system set db_recovery_file_dest_size = 10G;
alter system set db_recovery_file_dest = '/opt/oracle/oradata/recovery_area' scope=spfile;
shutdown immediate;
startup mount;
alter database archivelog;
alter database open;
```
**Note:**
- Enabling log archiving requires restarting the Oracle database.
- Archived logs can take up a large amount of disk space; clean up expired logs regularly.
(1.3). Check the log archiving status
```
-- Should now show "Database log mode: Archive Mode"
archive log list;
```
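**Note:**
Supplemental logging must be enabled on the tables or database to be captured, so that the *before* state of each changed row is available in the CDC data. The following statements show how to enable it.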
```
-- Enable supplemental logging for a specific table:
ALTER TABLE inventory.customers ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
```
```
-- Enable supplemental logging for database
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
```
2. Create an Oracle user with the required privileges
(2.1). Create a tablespace
```
sqlplus sys/password@host:port/SID AS SYSDBA;
CREATE TABLESPACE logminer_tbs DATAFILE '/opt/oracle/oradata/SID/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
exit;
```
(2.2). Create the user and grant privileges
```
sqlplus sys/password@host:port/SID AS SYSDBA;
CREATE USER flinkuser IDENTIFIED BY flinkpw DEFAULT TABLESPACE LOGMINER_TBS QUOTA UNLIMITED ON LOGMINER_TBS;
GRANT CREATE SESSION TO flinkuser;
GRANT SET CONTAINER TO flinkuser;
GRANT SELECT ON V_$DATABASE to flinkuser;
GRANT FLASHBACK ANY TABLE TO flinkuser;
GRANT SELECT ANY TABLE TO flinkuser;
GRANT SELECT_CATALOG_ROLE TO flinkuser;
GRANT EXECUTE_CATALOG_ROLE TO flinkuser;
GRANT SELECT ANY TRANSACTION TO flinkuser;
GRANT LOGMINING TO flinkuser;
GRANT CREATE TABLE TO flinkuser;
GRANT LOCK ANY TABLE TO flinkuser;
GRANT ALTER ANY TABLE TO flinkuser;
GRANT CREATE SEQUENCE TO flinkuser;
GRANT EXECUTE ON DBMS_LOGMNR TO flinkuser;
GRANT EXECUTE ON DBMS_LOGMNR_D TO flinkuser;
GRANT SELECT ON V_$LOG TO flinkuser;
GRANT SELECT ON V_$LOG_HISTORY TO flinkuser;
GRANT SELECT ON V_$LOGMNR_LOGS TO flinkuser;
GRANT SELECT ON V_$LOGMNR_CONTENTS TO flinkuser;
GRANT SELECT ON V_$LOGMNR_PARAMETERS TO flinkuser;
GRANT SELECT ON V_$LOGFILE TO flinkuser;
GRANT SELECT ON V_$ARCHIVED_LOG TO flinkuser;
GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO flinkuser;
exit;
```
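Table-level supplemental logging, described in step 1, has to be enabled once per captured table. When there are many tables, the statements can be generated instead of typed. The following is a small shell sketch: `gen_supplemental_sql` is a hypothetical helper that only prints SQL, and the schema and table names are passed in unquoted, so it assumes plain identifiers.

```shell
#!/bin/sh
# Sketch: print one supplemental logging statement per table.
# gen_supplemental_sql is a hypothetical helper; it does not connect to Oracle.
gen_supplemental_sql() {
    schema=$1
    shift
    # Remaining arguments are table names in the given schema.
    for table in "$@"; do
        printf 'ALTER TABLE %s.%s ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;\n' "$schema" "$table"
    done
}
```

For example, `gen_supplemental_sql inventory customers orders` prints two statements that can be run in `sqlplus`; the `orders` table is only an illustration.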
### For a CDB database
Overall, the steps are similar to those for the non-CDB database above, but the specific commands differ.
1. Enable log archiving
```
ORACLE_SID=ORCLCDB
export ORACLE_SID
sqlplus /nolog
CONNECT sys/password AS SYSDBA
alter system set db_recovery_file_dest_size = 10G;
-- the directory below should already exist
alter system set db_recovery_file_dest = '/opt/oracle/oradata/recovery_area' scope=spfile;
shutdown immediate
startup mount
alter database archivelog;
alter database open;
-- Should show "Database log mode: Archive Mode"
archive log list
exit;
```
**Note:** Supplemental logging must be enabled here as well:
```
-- Enable supplemental logging for a specific table:
ALTER TABLE inventory.customers ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
-- Enable supplemental logging for database
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
```
2. Create an Oracle user with the required privileges
```
sqlplus sys/password@//localhost:1521/ORCLCDB as sysdba
CREATE TABLESPACE logminer_tbs DATAFILE '/opt/oracle/oradata/ORCLCDB/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
exit
```
```
sqlplus sys/password@//localhost:1521/ORCLPDB1 as sysdba
CREATE TABLESPACE logminer_tbs DATAFILE '/opt/oracle/oradata/ORCLCDB/ORCLPDB1/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
exit
```
```
sqlplus sys/password@//localhost:1521/ORCLCDB as sysdba
CREATE USER flinkuser IDENTIFIED BY flinkpw DEFAULT TABLESPACE logminer_tbs QUOTA UNLIMITED ON logminer_tbs CONTAINER=ALL;
GRANT CREATE SESSION TO flinkuser CONTAINER=ALL;
GRANT SET CONTAINER TO flinkuser CONTAINER=ALL;
GRANT SELECT ON V_$DATABASE to flinkuser CONTAINER=ALL;
GRANT FLASHBACK ANY TABLE TO flinkuser CONTAINER=ALL;
GRANT SELECT ANY TABLE TO flinkuser CONTAINER=ALL;
GRANT SELECT_CATALOG_ROLE TO flinkuser CONTAINER=ALL;
GRANT EXECUTE_CATALOG_ROLE TO flinkuser CONTAINER=ALL;
GRANT SELECT ANY TRANSACTION TO flinkuser CONTAINER=ALL;
GRANT LOGMINING TO flinkuser CONTAINER=ALL;
GRANT CREATE TABLE TO flinkuser CONTAINER=ALL;
-- Not required if scan.incremental.snapshot.enabled=true (the default)
GRANT LOCK ANY TABLE TO flinkuser CONTAINER=ALL;
GRANT CREATE SEQUENCE TO flinkuser CONTAINER=ALL;
GRANT EXECUTE ON DBMS_LOGMNR TO flinkuser CONTAINER=ALL;
GRANT EXECUTE ON DBMS_LOGMNR_D TO flinkuser CONTAINER=ALL;
GRANT SELECT ON V_$LOG TO flinkuser CONTAINER=ALL;
GRANT SELECT ON V_$LOG_HISTORY TO flinkuser CONTAINER=ALL;
GRANT SELECT ON V_$LOGMNR_LOGS TO flinkuser CONTAINER=ALL;
GRANT SELECT ON V_$LOGMNR_CONTENTS TO flinkuser CONTAINER=ALL;
GRANT SELECT ON V_$LOGMNR_PARAMETERS TO flinkuser CONTAINER=ALL;
GRANT SELECT ON V_$LOGFILE TO flinkuser CONTAINER=ALL;
GRANT SELECT ON V_$ARCHIVED_LOG TO flinkuser CONTAINER=ALL;
GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO flinkuser CONTAINER=ALL;
exit
```
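The `GRANT SELECT ON V_$...` statements appear in both the non-CDB and the CDB list, differing only in the `CONTAINER=ALL` suffix. The following is a minimal shell sketch that prints them for either case: `gen_view_grants` is a hypothetical helper, the view names are copied from the lists above, and the remaining grants still have to be issued as shown.

```shell
#!/bin/sh
# Sketch: print the "GRANT SELECT ON V_$..." statements from the lists above.
# gen_view_grants is a hypothetical helper; it only prints SQL text.
gen_view_grants() {
    grantee=$1
    suffix=${2:-}   # pass "CONTAINER=ALL" for a CDB, or omit it for a non-CDB
    for view in DATABASE LOG LOG_HISTORY LOGMNR_LOGS LOGMNR_CONTENTS \
                LOGMNR_PARAMETERS LOGFILE ARCHIVED_LOG ARCHIVE_DEST_STATUS; do
        # The single-quoted format keeps the shell from expanding "$" in the view name.
        printf 'GRANT SELECT ON V_$%s TO %s%s;\n' "$view" "$grantee" "${suffix:+ $suffix}"
    done
}
```

For example, `gen_view_grants flinkuser CONTAINER=ALL` prints the nine view grants for the CDB case, and `gen_view_grants flinkuser` prints the non-CDB form.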
For more information, see [Setting up Oracle](https://debezium.io/documentation/reference/1.6/connectors/oracle.html#setting-up-oracle)
## DB2
The DB2 CDC connector reads both snapshot data and incremental data from a DB2 database. This section describes how to prepare the DB2 side.
### Supported databases
| Connector | Database | Driver |
| ------------------------------------------------------------ | --------------------------------------------- | -------------------- |
| [Db2-cdc](https://ververica.github.io/flink-cdc-connectors/master/content/connectors/db2-cdc.html) | [Db2](https://www.ibm.com/products/db2): 11.5 | Db2 Driver: 11.5.0.0 |
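### Preparing DB2
Add the Debezium management UDFs to DB2, set up the ASN agent, and add the required tables.
1. Log in to the DB2 server as the DB user that will be used for the connection, upload the management UDF package provided by Debezium (asncdctools) to the DB2 machine, and place it in the directory `$HOME/asncdctools`
2. Compile the UDF provided by Debezium with the following commands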
```
cd $HOME/asncdctools
/opt/ibm/db2/V11.5/samples/c/bldrtn asncdc
```
3. Make sure JDBC can read the DB2 metadata catalog
```
cd $HOME/sqllib/bnd
db2 bind db2schema.bnd blocking all grant public sqlerror continue
```
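4. Connect to the database to install the Debezium management UDFs. This assumes you are logged in as the db2inst1 user, so the UDFs are installed for that user. Replace DB_NAME with your database name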
```
db2 connect to DB_NAME
```
5. Copy the Debezium management UDF and set permissions on it
```
cp $HOME/asncdctools/asncdc $HOME/sqllib/function
chmod 777 $HOME/sqllib/function
```
6. Enable the Debezium UDF that starts and stops the ASN capture agent
```
db2 -tvmf $HOME/asncdctools/asncdc_UDF.sql
```
7. Create the ASN control tables
```
db2 -tvmf $HOME/asncdctools/asncdctables.sql
```
8. Enable the Debezium UDF that adds tables to capture mode and removes tables from capture mode
```
db2 -tvmf $HOME/asncdctools/asncdcaddremove.sql
```
9. Enter the db2 SQL client, replacing DB_NAME with your database name, and start the ASN agent
```
db2
connect to DB_NAME
VALUES ASNCDC.ASNCDCSERVICES('start','asncdc')
```
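10. Put tables into capture mode. Invoke the following statement for each table to be captured. In the example, `test` is the schema name and `test_mq` is the table name; replace them with the schema and table you want to put into capture mode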
```
CALL ASNCDC.ADDTABLE('test', 'test_mq')
```
11. Reinitialize the ASN service
```
VALUES ASNCDC.ASNCDCSERVICES('reinit','asncdc')
-- After reinitialization, a message similar to the following is shown:
2022-12-08-11.46.53.420967 ASN0600I "AsnCcmd" : "" : "Initial" : Program "capcmd 11.4.0 (Build 11.5.0.0 Level s1906101300, PTF DYN1906101300AMD64)" is starting.
```
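Steps 10 and 11 repeat one `ADDTABLE` call per table and then reinitialize the ASN service once. The following is a minimal shell sketch that prints those statements for a list of tables: `gen_capture_sql` is a hypothetical helper, it does not connect to DB2, and the schema and table names are inserted verbatim.

```shell
#!/bin/sh
# Sketch: emit one ADDTABLE call per table, followed by a single reinit.
# gen_capture_sql is a hypothetical helper; it only prints statements.
gen_capture_sql() {
    schema=$1
    shift
    # Remaining arguments are the tables to put into capture mode.
    for table in "$@"; do
        printf "CALL ASNCDC.ADDTABLE('%s', '%s');\n" "$schema" "$table"
    done
    # Reinitialize once, after all tables have been added.
    printf "VALUES ASNCDC.ASNCDCSERVICES('reinit','asncdc');\n"
}
```

For example, `gen_capture_sql test test_mq > capture.sql` writes a script (the file name is arbitrary) that can be run with `db2 -tvmf capture.sql` after connecting to the database, in the same way as the scripts in steps 6 to 8.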
For more information, see [Setting up DB2](https://debezium.io/documentation/reference/1.6/connectors/db2.html#setting-up-db2)