使用指南
本页目录
使用指南#
本章节介绍数据集成、加工工具 Wasp 的使用
前提#
Wasp 集群已经部署,详见:Wasp安装。
运行数据集成任务需要依赖 Flink 作为计算引擎,因此需要部署 Flink 集群,详见:Flink安装。
控制面板#
5700版本后wasp新增了控制面板模块,目前仅用于管理数据集成任务
整体分为下面6个模块
作业状态分布#
这里可以看到所有数据集成任务和对应的状态总揽。
Flink集群状态#
这里可以查看所有 flink 集群的状态
集群中存活的 taskManager 节点数和可用的 slot 数
所有 flink 节点机器的状态以及 cpu 和 内存使用情况
最近一段时间该 flink 集群的同步速率,时间区间为【最近两小时】-【最近两周】
正在运行的任务#
展示了当前正在运行态的任务,包括【启动中】/【提交中】/【运行中】/【停止中】状态
点击查看更多能展示更多的信息
已结束的的任务#
展示了当前正在结束态的任务,包括【成功】/【已停止】/【失败】状态。
点击查看更多能展示更多的信息,高级搜索可以进行快速检索。这里展示的是任务历史粒度的,即一个任务运行了多次,这里会展示多次的运行历史。
数据源同步链路#
这部分统计了数据源 source 端到数据目标 sink 端的整条链路信息。包括在这条链路中创建的数据集成任务,总共同步表数、列数,实时的速度等。
点击某条链路的详情,可以看到链路里每个任务的详细信息。
定时日历#
对于配置了定时设置的任务,可以通过这个日历模块观察某天是否会有定时任务运行。
数据集成#
进入 Skylab 云数据平台,在界面上方产品的开发套件,点击“数据工厂 Wasp” 即可进入“数据集成”菜单。
我们通过任务的形式去定义一个数据集成的流程。这里展示和管理了所有数据集成任务。
新建任务#
在创建任务之前,需要先在数据源服务添加将要作为 Wasp 数据集成的源/目标。比如要从 MySQL 抽取数据到 OushuDB,那么需要事先添加这两个数据源。 详见数据源。
引导式的任务创建需要经过四个步骤:1.选择数据源。2.选择处理对象。3.选择数据目标。4.配置列映射。下面详细介绍每个步骤。
选择数据源#
决定从哪个数据源抽取数据,这里只能选择事先在数据源服务添加好的数据源。
任务模式
不同的数据源类型支持的任务模式不同当源为数据库类型,支持 CDC 的实时同步和 JDBC 直连的批量同步。
CDC 模式的实时同步我们是基于 flink-cdc-connector 直接去读取数据库日志的,如果选择这种模式,需要实现在数据库配置好 CDC 相关配置。
当源为 Kafka,支持 upsert 同步和 CDC 的同步方式。 upsert 模式是消费 Kafka,对目标进行 upsert 操作,CDC 模式则会解析从 Kafka 消费出来的 CDC 格式数据,从而反应到目标。
高级配置
不同的数据源、任务模式支持的高级配置不同。常见的高级配置如下
更新频率:两次 checkpoint 之间的间隔,这个值越低,数据的实时性越高,但是频繁的 checkpoint 会让写数据的性能也降低。
并行度:该配置决定了启动 Flink 任务时配置的并行度是多少,通常情况下并行度越高,性能越高效。
超时时间:做完一次 checkpoint 的最大超时时间,如果是【DB类型批量同步】无主键的源表时,需要根据数据量和抽取速度酌情调大此参数。
出错重启策略:该配置决定flink任务失败后的行为,具体配置方式见下文【失败重启】章节。
任务出错报警:根据【平台基础】-【消息中心】的告警配置,进行任务告警通知。
Kafka任务高级配置
跳过脏数据:解析 CSV / JSON 数据出错时,是否忽略掉该脏数据继续运行任务,对应脏数据会在flink日志中打印。
忽略大小写:用于处理解析 JSON 数据时是否忽略键值的大小写,这可能会导致忽略大小写后相同的键会被覆盖。
忽略过期数据:仅用于处理 KafkaCDC 任务中,处理 topic-partition 里有数据乱序的情况,打开该选项后需要指定保留时间, 且在下一步选择对象时,乱序的表需要指定数据比较列。相同主键的数据会根据数据比较列进行对比,迟到的数据会被丢弃, 最新的数据会写入库且保留比较列值一定时间,用于和下次同主键数据比较。如果超出了最大保留时间,迟到的数据会被认为最新数据,所以需要考量数据最多迟到多久。
HDFS 任务高级配置
忽略首行数据:用于处理文件中有列信息这种head数据,避免解析失败。
编码格式:指定文件的编码,目前只允许默认的 UTF-8 编码格式。
在数据源配置完毕后,需要点击“测试连接”以验证数据源的连通性。
选择处理对象#
所谓处理对象,即选择数据源之后需要选择一些表 /topic 作为要抽取的对象。如下图是一个 Oracle 作为源、批量模式的对象选择页面。
在这里您可以选择想要抽取的表,每张表可以配置按照不同方式同步。对于不同的导入模式,界面右侧有详细的介绍。 不同数据源、任务模式在选择对象环节也可能不同,这里不一一赘述,详见此章节末的创建任务指南。
选择数据目标#
同选择数据源,在这里选择您想将数据导入的目标。当前数据目标只支持 OushuDB。
高级配置栏目中”预执行 Sql”配置项支持用户自定义 Sql 语句,其将在任务运行前被执行。
对于KafkaCDC任务而言还有其他sink高级配置:
错误行处理策略:决定sink写库失败时的处理逻辑,包括“出错立即停止”和“跳过错误仅告警”
写入模式:分为更新插入(upsert)和变化日志(changeLog)模式。更新插入会实时反映数据在目标表执行dml语句; 变化日志会只会执行 insert 语句,将数据的变化操作(insert/update/delete)都记录在目标表中, 其中 update 数据会插入两条,分别是 update_before 数据和 update_after 数据
配置列映射#
根据第二步选择的对象,这里会进行自动映射。
第一步:先匹配 schema
对于 Oracle 这类源存在 schema 信息的数据库,会先根据源表 schema 匹配,如果 schema 没匹配上映射状态为:未映射,需要手动调整映射。 而 MySQL,Kafka,HDFS 这类源没有 schema 信息的源,会默认映射 public schema对于没有映射成功的表,可以批量选择后点击右上角设置目标模式,指定 schema 重新映射
第二步:匹配表名
如果目标表存在,则映射为已有表,反之若目标表不存在,则映射为自动建表,自动建表会在任务启动时自动创建。也可以调整映射类型,指定去映射目标 schema 的某张已有表,重新进行映射
第三步:匹配列
会按照列名进行匹配,如果源表的转换列没有全部匹配上,映射状态为警告,如果全部未匹配上,映射状态为失败。对于警告和失败状态的映射,需要用户手动修改,修改后映射状态为成功。点击设置列映射,可以看到源表到目标表的列对应关系、名称、主键、类型等。
在目标表右上角齿轮按钮,可以修改目标表属性
对于自动建表,可以修改目标列名,类型,主键。
也可以在列映射里切换到已有表,如果是已有表不能修改目标列。
注意这里的上一步和完成是表示任务的步骤,如果像回到表映射状态的列表,需要点击左上角的后退按钮。
所有的映射对象状态为映射成功后,才能启动任务,否则任务会启动失败
任务创建指南#
上章统一介绍了任务的创建流程,由于不同数据源不同任务模式的配置可能不同,可以参考如下指南去创建任务。
数据源 |
数据目标 |
任务模式 |
---|---|---|
MySQL,Oracle,DB2 等关系型数据库 |
OushuDB |
|
Kafka |
OushuDB |
|
HDFS |
OushuDB |
任务管理#
Wasp 提供文件夹的形式去组织管理任务,下面介绍文件夹和任务两种组织元素。
文件夹#
新建文件夹
您可以选择在任意一级文件夹下创建新文件夹。修改文件夹
删除文件夹
删除文件夹会递归删除该文件夹下的所有子文件夹和任务。移动文件夹/任务 您可以在左侧任务树通过鼠标拖动的方式移动文件夹/任务。
任务#
任务列表
此处展示了当前文件夹下所有的子文件夹和任务,当您在左侧树点击某个文件夹或在任务列表点击某个文件夹后,则会展示对于文件夹下的内容 在这里,您可以对任务进行新建、删除、修改、启动、停止等操作。任务重置
任务重置主要是针对增量任务的增量位置和任务记录的checkpoint,当点击重置后,配置为全量(仅初次)的表可以再次执行,配置为增量的 表将会清除其增量位置,下次执行时又是从头开始;另外还会清空该任务的 checkpoint,这显著影响的则是上次运行失败或者是手动停止的任务, 清除 checkpoint 后下次运行将不会从上次的断点继续。定时调度
定时调度支持时间间隔和 crontab 表达式两种方式。可以根据需求灵活选择 1.时间间隔:2.cron 表达式:
Kafka 消费策略配置#
启动 Kafka 源任务时,可以指定不同的起始偏移量策略和偏移量重置策略,Kafka 每个 topic 会包含一个或者多个 partition, 我们下文称之为 topic-partition, 每个 partition 中的数据都会伴随着有偏移量和时间戳(在0.10.0版本之后),其中偏移量是用于标记数据在 partition 中的位置的。越新的数据偏移量越大。
Kafka source connector 会根据 Flink checkpoint 间隔,将每个 topic-partition 对应消费到的偏移量提交到 checkpoint 和 Kafka broker,这两者偏移量是一致的。
最早
每个 topic-partition 从头开始消费。
最新
每个 topic-partition 从最后消费,等待最新的数据。
已提交
默认是这种策略,起始偏移量从上次提交位置继续,偏移量重置策略默认为”最早”,指的是当没有找到之前的偏移量记录时,该 topic-partition 从头开始消费。
时间戳
读取此时间戳之后的数据。将时间戳大于/等于输入时间戳的第一条数据的偏移量作为起始偏移量;如果在源 Kafka 找不到该时间戳及之后的数据, 会从最新消费,等待满足该条件的数据。
时间戳的输入方式有两种,可以直接输入 long 类型的时间戳,也可以通过日历组件输入想要的时间,系统会自动转化其为时间戳。
自定义
自定义每个 topic-partition 的起始偏移量位置。对于指定偏移量的 topic-partition, 从指定的偏移量消费;对于未指定偏移量的 topic-partition, 则从已经提交的偏移量记录继续向后消费;若没有偏移量记录,则根据”偏移量重置策略”决定从最早消费,从最新消费或者是直接报错。
每次启动都可以配置消费策略,只在本次运行生效。
任务详情#
点击任务名称链接,即可跳转到任务详情。在这里您可以看到该任务的整体运行状况,也可以对任务做一些编辑操作。
任务监控
任务详情界面顶部展示了任务的进度条,包括自动建表的进度,全量导入的进度和增量导入的进度,这里不同的数据源/任务模式也会有不同的展示:对于数据库类型的数据源,一个处理对象(表)一定会经过自动建表、全量、增量三个阶段。这三个进度条会全部展示,若配置的是实时模式(CDC 模式), 那么增量的进度条会是无限的。
如果是 Kafka 这种流式的数据源,进度条则没有全量的概念,会展示自动建表进度和一个无限的同步进度。
监控会记录最多24小时内条数和速度,以柱状图的形式展示。右上角可以切换
处理对象
类似创建任务时的处理对象选择,在这里您可以看到每个处理对象的处理进度,也可以新增新的处理对象和删除不用的处理对象。 也可以设置映射。重建映射
如果在源表有列的改动或者元数据处有列信息的改动,那么 Wasp 需要重新识别源对象的列信息,重新创建映射。”重建映射”可以拉取源最新的列信息,再次自动映射, 此处是按照默认规则重新映射,不会保留之前修改过的映射信息,因此在重建映射后可能需要按需调整。同步历史
这里记录了该任务的每次运行。
错误恢复#
Wasp 基于 Flink 的 checkpoint 机制,实现了部分数据源的断点续传功能,每次任务运行都会记录 checkpoint 到 HDFS 上, 当任务失败或者手动停止后,再次启动会基于上一次 checkpoint 记录的位置继续运行,避免了任务出错后需要重新导数的问题。
失败重启#
Wasp 支持在任务上配置失败重启策略,预防网络超时或是其他需要重试的错误带来的影响,支持如下几种策略。
无
默认配置,任务失败不会重启任务。固定延时重启
指定失败重启次数,重启之间的延时秒数;默认重启次数是 int 最大值,延时秒数为3秒。适配集群配置的策略
失败重启策略退化到集群级别,也就是 Wasp 提交的 Flink job 不会携带出错重启相关配置,此时会根据 Flink 集群 配置去决定使用何种策略,默认是固定延时策略,重启次数为 int 最大值,延时秒数为3秒。
源表无主键CDC#
Wasp CDC 实时同步模式,推荐源表具有主键或者唯一键约束,可以实现高效实时的数据同步,写入目标表时会根据数据中的唯一字段进行 upsert 操作。 同时在 5.2.0.0 版本后支持 Oracle 和 MySQL 源表无唯一键的数据抽取,用户可以在列映射时,手动指定数据具有唯一性的目标列做主键来提升写入性能。
数据加工#
进入 Skylab 云数据平台,在界面上方产品的开发套件,点击“数据工厂 Wasp” 即可进入“数据集成”菜单。
这里展示了所有的数据加工任务
新建任务#
在创建任务之前,需要先在数据源服务添加将要作为 Wasp 数据加工的源/目标。比如要从 kafka 抽取数据到 OushuDB,那么需要事先添加这两个数据源。 详见数据源。
数据加工目前只支持kafka作为源,OushuDB作为目标
在任务列表中,单击左上角的创建任务按钮即可创建一个数据加工任务
编辑任务#
刚创建好的任务是空白的,分为三个区域
左侧的算子和数据源区域,在此处将要处理的数据源以及进行数据加工的算子拖入画布中,用于后续定义数据加工流程
中间的画布区域,在此处基于拖进来的数据源和算子来定义加工流程图
配置区域,在此处修改任务、算子的配置参数和基础信息
下面以从kafka数据源提取数据,经过衍生算子,最终写入oushudb中为场景,举例说明如何创建一个任务
第一步:在左侧源中选择kafka数据源,选择一个topic拖入画布中,此topic将作为数据加工任务处理数据的来源。 点击kafka算子,在右侧配置offset策略和消费参数,详见数据加工算子指南
offset策略只有首次运行时会生效,再次运行时会从上一次消费的数据之后继续消费,以防止出现数据的遗漏或者重复消费
第二步:在左侧算子中找到解析算子,拖动它到画布中,用鼠标从kafka算子的输出端拖出一条线,连接解析算子的输入端。 点击解析算子,在右侧的配置区配置数据格式和元数据。详见数据加工算子指南
第三步:在左侧算子中找到衍生算子,拖动它到画布中,将解析节点的输出端和衍生算子的输入端连接起来。 点击衍生算子,在右侧的配置区配置衍生规则与要保留的原始列。衍生规则可以配置多个,每个衍生规则可以生成一个新的列。详见数据加工算子指南
第四步:在左侧的算子中找到新建目标表,拖动它到画布中,连接衍生节点的输出端和新建目标表的输入端。 点击新建目标表,配置新建表的所属位置,任务运行时,会在所属位置出自动创建一个表。详见数据加工算子指南
配置任务#
鼠标点击画布空白处,可以在右侧配置区域编辑任务的基本信息与资源配置
资源配置中可以配置任务的并行度,默认是1
运行任务#
创建完成后,点击画布左上角的运行按钮运行任务,点击运行后画布进入运行态,不可编辑,可以点击画布左上角停止按钮停止任务运行。 画布右上角会显示当前任务运行状态,当任务运行完成或者停止后,可以点击返回编辑任务回到编辑态。当任务运行失败后可以通过点击 查看失败日志按钮查看失败原因。右上角的运行历史按钮可以查看运行历史