# 数据集成任务创建指南 ## 数据库-CDC 任务创建指南 本章节介绍如何创建以 CDC 模式抽取源数据库的任务,以及注意事项。 我们以 Oracle 作为源,OushuDB 作为目标举例。其他数据库作为源配置 CDC 任务时,也可参考此文章,只是源库侧 CDC 的配置有些不同。 ### 前提 1. 源侧开启 CDC 相关配置,详见 [CDC配置指南](wasp-cdc-config-guide.md) 2. 已添加 Oracle 数据源,保证数据源的连通性,访问用户有必要权限。 3. 已添加 OushuDB 数据源,保证数据源的连通性,访问用户有必要权限。 4. Wasp 侧需要保证,已创建好 Flink 集群并且状态是运行中。 ### 任务创建流程 1. 选择数据源 * 选择一个 Oracle 数据源。 ![image](images/selected_oracle_source.png) * 选择任务模式为“实时模式”。 ![image](images/db-cdc/realtime-mode.png) * 高级配置选择同步历史。 并行度建议根据具体场景配置,数据量、TPS 要求、Flink 集群可用 slot 数等。 更新频率其实是 Flink checkpoint 的间隔,每次 checkpoint,我们会向目标数据库提交一次。当然,OushuDB 作为目标时,我们做了一些优化,满足一定 batch 时就会提交,所以通常这个配置项可配置成您能接收的最大延迟。 ![image](images/db-cdc/db-cdc-config.png) * 测试连接成功后,进行下一步。 2. 选择处理对象 根据您的业务需要,您可以选择多个 schema 下的多张表进行 CDC 同步。 ![image](images/db-cdc/selected_oracle_objects.png) 3. 选择数据目标 选择一个 OushuDB 数据目标。 ![image](images/selected_oushu_sink.png) 4. 配置列映射 在这里,您可以指定某张源表要导入到某张目标表,是否是已有表/自动建表等,点击“列映射”可以配置源表到目标表的列对应关系、名称、主键、类型等。 详情请见[使用指南](wasp-user-guide.md)中的“创建任务-列映射”章节。 至此,一个从 Oracle 通过实时 CDC 的方式抽取并导入到 OushuDB 的数据集成任务就创建完成了。 ## 数据库-JDBC 任务创建指南 本章节介绍如何创建以 JDBC 模式抽取源数据库的任务,以及注意事项。 我们以 Oracle 作为源,OushuDB 作为目标举例。其他关系型数据库 JDBC 抽取也可以参照此文章。 ### 前提 1. 已添加 Oracle 数据源,保证数据源的连通性,访问用户有必要权限。 2. 已添加 OushuDB 数据源,保证数据源的连通性,访问用户有必要权限。 3. 已创建好 Flink 集群并且状态是运行中。 ### 任务创建流程 1. 选择数据源 * 选择一个 Oracle 数据源。 ![image](images/selected_oracle_source.png) * 选择任务模式为“批量模式”。 ![image](images/db-jdbc/batch-mode.png) * 定时配置 通常 JDBC 批量抽取的场景,我们往往需要结合定时调度完成场景。Wasp 支持时间间隔和 cron 表达式两种定时方式,这里按需配置就好。 ![image](images/db-jdbc/timer-config.png) * 高级配置 目前批量模式中只有并行度的配置,建议根据具体场景配置,数据量、吞吐量、Flink 集群可用 slot 数等。 * 测试连接成功后,进行下一步。 2. 选择处理对象 根据您的业务需要,您可以选择多个 schema 下的多张表进行 JDBC 同步。 ![image](images/db-jdbc/selected_oracle_objects.png) 对于不同的表,您可以选择不同的同步方式,全量(每次同步),全量(仅初次同步),全量(根据 where 条件同步,支持时间宏语法,详见 [时间宏配置](time-marco.md)),增量(根据时间戳同步), 增量(根据增量列同步),增量(根据时间戳和增量列同步),具体介绍可以查看产品界面右侧的文档。 3. 选择数据目标 选择一个 OushuDB 数据目标。 ![image](images/selected_oushu_sink.png) 4. 配置列映射 在这里,您可以指定某张源表要导入到某张目标表,是否是已有表/自动建表等,点击“列映射”可以配置源表到目标表的列对应关系、名称、主键、类型等。 详情请见[使用指南](wasp-user-guide.md)中的“创建任务-列映射”章节。 至此,一个从 Oracle 通过 JDBC 批量的方式抽取并导入到 OushuDB 的数据集成任务就创建完成了。 ## Kafka-CDC 任务创建指南 本章节介绍如何创建 Kafka CDC 模式抽取源数据库的任务,以及注意事项。 ### 任务创建流程 1. 首先选择 Kafka 数据源 接着选择模式,这里我们选择 CDC 模式 ![kafka_mode](images/integration/task-mode-kafka.png) 选中 CDC 模式的话,在高级配置中需要填写CDC相关的配置信息,这里默认提供了CDC的常用模板,如Ogg,Canal,腾讯数据订阅,也可以自定义配置模板 ![kafka_cdc_config](images/kafka-cdc/kafka_cdc_config.png) 这里我们以下面这条Canal的 CDC JSON 数据举例 ``` { "data": [ { "id": "111", "name": "scooter", "description": "Big 2-wheel scooter", "weight": "5.18" } ], "database": "inventory", "es": 1589373560000, "id": 9, "isDdl": false, "mysqlType": { "id": "INTEGER", "name": "VARCHAR(255)", "description": "VARCHAR(512)", "weight": "FLOAT" }, "old": [ { "weight": "5.15" } ], "pkNames": [ "id" ], "sql": "", "sqlType": { "id": 4, "name": 12, "description": 12, "weight": 7 }, "table": "products", "ts": 1589373560798, "type": "UPDATE" } ``` * 操作类型路径是描述 JSON 数据中,操作类型的 JsonPath,如样例中格式,则填写 `/type` * 操作类型格式是描述这段 CDC 数据的操作和对应的值,根据'type'字段中的值对应 INSERT:INSERT UPDATE:UPDATE DELETE:DELETE * 源表信息格式,是用来描述这条 CDC 数据来自哪张表,这里填写对应的 JsonPath,表名需要是全限定表名称,比如db1.schema2.table3,如样例中的 Canal CDC 数据,全限定表名是由字段 'table' 和 'database' 两个字段组成,源表信息格式处需要填写上 Database 和 Table 的 JsonPath,这样在 CDC 数据从 Kafka 中消费出来时,才能解析出它是来自哪一张源表,根据样例数据对应字段值,我们能知道这条 CDC 数据是来自 inventory.products 表。 测试连接通过后进入到下一步选择处理对象 2. 选择处理对象 这里分为两个步骤 1. 选择 topic,表示从哪些 topic 中获取数据 2. 添加元数据映射(元数据的创建和管理,详情请见[数据源使用手册](../../../../dataend/zh/dataend-user-guide.md)中的“元数据管理”章节)元数据是用来描述表的结构,即我们用哪种表结构来解析这条数据,数据源使用手册中已经介绍过元数据的定义和使用,由于 CDC 元数据定义会有特殊,这里再额外补充上。 ![kafka_cdc_metadata](images/kafka-cdc/kafka_cdc_metadata.png) 对于Kafka Upsert 模式和 无Before/After 的 CDC 数据,元数据定义 JsonPath 时我们需要传递完整的 Path 路径。 但对于有 Before 和 After 的CDC数据来说,如上面 Canal 的样例数据,在 /data 和 /old 中都会出现源表的列,这样我们在定义元数据时,只需要定义 After 和 Before 字段下的子 JsonPath ,这样再加上配置模版时 Before 和 After 的数据路径我们能得到完整的 JsonPath。 对于 TENCENT_DATA 这种格式,只需要定义列名和类型,无需定义 JsonPath,Wasp 内部处理时会根据 Json 中的 columns 字段自动识别 JsonPath。 此外,对于 CDC 元数据创建时,名称建议创建为全限定表名,方便快速添加。 3. 先选择需要消费的 Topic ![topic](images/kafka-cdc/select_kafka_cdc_topic.png) 接着选择元数据后并填写对应的全限定表名,表示这个元数据和哪张源表关联上 ![metadata](images/kafka-cdc/select_kafka_cdc_metadata.png) 3. 选择数据目标 选择一个 OushuDB 数据目标。 ![image](images/selected_oushu_sink.png) 4. 配置列映射 在这里,您可以指定某个元数据对应的表要导入到某张目标表,是否是已有表/自动建表等,如果是自动建表需要手动配置主键。 点击“列映射”可以配置源表到目标表的列对应关系、名称、主键、类型等。 详情请见[使用指南](wasp-user-guide.md)中的“创建任务-列映射”章节。 至此,一个从 Kafka CDC 的方式抽取并导入到 OushuDB 的数据集成任务就创建完成了。 ## Kafka-upsert 任务创建指南 本章节介绍如何创建抽取 Kafka 数据,按照 CSV/JSON 格式解析后 upsert 到 OushuDB 的任务。 ### 前提 1. 已添加 Kafka 数据源,保证数据源的连通性。 2. 已添加 OushuDB 数据源,保证数据源的连通性,访问用户有必要权限。 ### 任务创建流程 1. 创建元数据 在**数据源**模块编辑已添加的 Kafka 数据源,添加元数据。元数据的创建和管理,详情请见[数据源使用手册](../../../../dataend/zh/dataend-user-guide.md)中的“元数据管理”章节 ![image](images/kafka-upsert/metadata.png) 2. 选择数据源 接着选择模式,这里我们选择 upsert 模式 测试连接通过后进入到下一步选择处理对象 ![image](images/integration/task-mode-kafka.png) 3. 选择处理对象 这里每个 topic 对应一个元数据,选中 topic 后需要选择不同类型的元数据,这里支持 JSON 和 CSV 类型的元数据。 ![img.png](images/kafka-upsert/select_upsert_objects.png) 值得注意的是:对于 CSV 元数据,需要填写分隔符 4. 选择数据目标 选择一个 OushuDB 数据目标。 ![image](images/selected_oushu_sink.png) 5. 配置列映射 在这里,您可以指定某个元数据对应的表要导入到某张目标表,是否是已有表/自动建表等,如果是自动建表需要手动配置主键。 点击“列映射”可以配置源表到目标表的列对应关系、名称、主键、类型等。 详情请见[使用指南](wasp-user-guide.md)中的“创建任务-列映射”章节。 至此,一个从 Kafka 抽取,按照 CSV/JSON 格式解析后,upsert 导入到 OushuDB 的数据集成任务就创建完成了。 ## 定量文件同步任务创建指南 本章节介绍如何创建抽取定量文件,按照 CSV/JSON 格式解析后 upsert 到 OushuDB 的任务,下面用 HDFS->OushuDB 举例 ### 前提 1. 已添加 HDFS 数据源,保证数据源的连通性,注意 HDFS 数据源中"工作目录"的指定,这将指定数据集成时可选的文件根目录。 2. 已添加 OushuDB 数据源,保证数据源的连通性,访问用户有必要权限。 ### 任务创建流程 1. 创建元数据 在**数据源**模块编辑已添加的 HDFS 数据源,添加元数据。元数据的创建和管理,详情请见[数据源使用手册](../../../../dataend/zh/dataend-user-guide.md)中的“元数据管理”章节 ![image](images/file-filecollection/metadata.png) 2. 创建数据集成任务 选择数据源,接着选择模式,这里我们选择定量文件同步模式 测试连接通过后进入到下一步选择处理对象 ![image](images/integration/task-mode-file.png) 高级配置: * 更新频率,影响写入目标的频率和记录 checkpoint 的频率,适当增加该值可以增加吞吐量。 * 并行度,即并发读取/写入。 * 忽略首行数据,勾选后读取每个文件时都会跳过首行。 * 编码格式,当前仅开放 UTF-8 ![image](images/file-filecollection/config.png) 3. 选择处理对象 这里每个文件/目录对应一个元数据,勾选文件/目录后需要选择不同类型的元数据,这里支持 JSON 和 CSV 类型的元数据。 ![img.png](images/file-filecollection/select_objects.png) 值得注意的是:对于 CSV 元数据,需要填写分隔符 4. 选择数据目标 选择一个 OushuDB 数据目标。 ![image](images/selected_oushu_sink.png) 5. 配置列映射 在这里,您可以指定某个元数据对应的表要导入到某张目标表,是否是已有表/自动建表等,如果是自动建表需要手动配置主键。 点击“列映射”可以配置源表到目标表的列对应关系、名称、主键、类型等。 详情请见[使用指南](wasp-user-guide.md)中的“创建任务-列映射”章节。 至此,一个从 HDFS 抽取,按照 CSV/JSON 格式解析后,upsert 导入到 OushuDB 的数据集成任务就创建完成了。 ## 目录监听同步任务创建指南 本章节介绍如何创建监听文件目录抽取文件,按照 CSV/JSON 格式解析后 upsert 到 OushuDB 的任务,下面用 HDFS->OushuDB 举例 该模式不仅会抽取当前选中的文件/目录下的内容,还会定时扫描目录捕获新增文件并抽取。 ### 前提 1. 已添加 HDFS 数据源,保证数据源的连通性,注意 HDFS 数据源中"工作目录"的指定,这将指定数据集成时可选的文件根目录。 2. 已添加 OushuDB 数据源,保证数据源的连通性,访问用户有必要权限。 ### 任务创建流程 1. 创建元数据 在**数据源**模块编辑已添加的 HDFS 数据源,添加元数据。元数据的创建和管理,详情请见[数据源使用手册](../../../../dataend/zh/dataend-user-guide.md)中的“元数据管理”章节 ![image](images/file-foldermonitor/metadata.png) 2. 创建数据集成任务 选择数据源,接着选择模式,这里我们选择目录监听同步模式,该模式将会监听选中的目录,如果有文件新增,则会抽取该文件。在此模式下依然 可以选择文件或者目录,只是会监听目录捕获新增文件,而不能抓到文件中新增的内容。测试连接通过后进入到下一步选择处理对象 ![image](images/integration/task-mode-foldermonitor.png) 高级配置: * 更新频率,影响写入目标的频率和记录 checkpoint 的频率,适当增加该值可以增加吞吐量。 * 并行度,即并发读取/写入。 * 忽略首行数据,勾选后读取每个文件时都会跳过首行。 * 编码格式,当前仅开放 UTF-8 * 目录监听间隔,配置定时监听目录的时间间隔,单位:秒。 ![image](images/file-foldermonitor/config.png) 3. 选择处理对象 这里每个文件/目录对应一个元数据,勾选文件/目录后需要选择不同类型的元数据,这里支持 JSON 和 CSV 类型的元数据。 ![img.png](images/file-foldermonitor/select_objects.png) 值得注意的是:对于 CSV 元数据,需要填写分隔符 4. 选择数据目标 选择一个 OushuDB 数据目标。 ![image](images/selected_oushu_sink.png) 5. 配置列映射 在这里,您可以指定某个元数据对应的表要导入到某张目标表,是否是已有表/自动建表等,如果是自动建表需要手动配置主键。 点击“列映射”可以配置源表到目标表的列对应关系、名称、主键、类型等。 详情请见[使用指南](wasp-user-guide.md)中的“创建任务-列映射”章节。 至此,一个从 HDFS 抽取,并且监听目录新增文件,按照 CSV/JSON 格式解析后,upsert 导入到 OushuDB 的数据集成任务就创建完成了。