ETL的系统核心特征

ETL 系统核心特征

数据重跑及其优化

重跑的场景

场景 导致原因 影响
kafka consumer poll消息失败 1. 网络问题;2. kafka broker 磁盘坏道,拉取消息一直失败或其他 kafka 原因 导致一个或多个topic&partition的消息未消费完整
硬件故障,机器重启 磁盘满、硬件故障等 机器宕机、重启、yarn内部机制会重新在另外一个nodeManager节点重新分配宕机节点的mapper task,可能会造成数据重复
task killed 1. yarn 主动 killed task : ①, mapper 初始化某个逻辑卡住,导致 mappper 超时;如:加载调试设备信息时 mysql 阻塞、加载 lP 库一直Full GC 或者 mapper 获取不到执行资源一直等待等; ② yarn . nodemanager .local-dirs and yarn. nodemanager . log-dirs 配置的磁盘使用率超过 90 % ResourceManager 标记那台 NodeManger 为bad ,把机器上跑的 container 都 kill了,导致 ETL 一个 mapper 被 kill ,然后起一个新的 mapper task ,但是因为 kill mapper task attmept 未正常关闭,导致文件租约 Iease 未安全释放,后面起 Mapper 一直写失败; 2.S手工 killed task ①Hadoop 集群节点负载太高,一些 DataNode 响应慢或者进程值死无响应,导致HDFS 写入一直超时失败等;人工 killed 这个 mapper 分配执行的所有 topic partition 消息未消费或者消息不完整(影响多个 topic ) ;mapper 执行慢,数据一直写不进去(创建、读写 block 超时) :

当用户调用 application kill 时会经历三个阶段:

  1. kill SIGTERM(-15) pid;
  2. Sleep for 250ms;
  3. 3.kill SIGKILL(-9) pid 。

重跑的方式

重跑方式 优点 缺点
1.备份目标目录的数据
2.将 kafka topic &partition 的 offset 重新消费一遍清洗落地;
简单(由于每个 et l执行消费的offset 都是自己记录维护的.找到当前小时或者当天对应的最小以及最大offset 重新消费即可) 依赖于 kafka 消息过期时间,过期后无法重新消费;
因为大部分时间分区是是按天的.目前重跑一天的数需要 4 +小时.故障后恢复非常慢;
记录未消费的offset信息,补跑未消费的offset即可 速度快(只处理未消费完整的topic&partitioner offset) 1 )依赖于 kafka 消息过期时间,过期后无法重新消费
2 ) MapTask 被 yarn killed 或者人为 killed ,会导致写数据的 Writer 不正常关闭流,从而引发 hdfs 文件租约 Iease 未释放、 block recovery 等问题,导致后面数据 append 失败;
3 ) MapTask attempt 被 yarn killed 后会自动起另外一个 attemPt (目前来.无法避免,这是 yarn 的机制,需要改源码),可能导致数据重复写入:
4 ) killed 或者机器宕机是无法记录到。offset 消息到哪里,只能每段小段时间记录当前 offset .当 MapTask 被 killed . offset 可能不是最新的,导致补跑时重复写了部分数据;
1 ) ETL 小时任务,数据落地到临时目录.文件按小时划分.支持覆盖写(保存一段时间) ;
2 )当数据落地成功,检测每个 toPic partition 的消费情况进行处理;
3 )如果 topic partitlon 的消息消费不完整时,告警通知,手工重跑相关 toPic partition 的小时任务;
4 )如果消费完整,将小时数据文件合并到仓库的目标文件(每个 topic 、 partition 单独一个文件) ;
5 )如果合并过程失败.告警通知,手工触发重新将已经落地的小文件合并成一个目标仓库文件;
支持小时重跑、覆盖写
恢复速度较快
HDFS block 丢失、可以从小文件新恢复
1 )流程比较复杂,多出一个数据合并 Merge 的步骤;
2 )每小时的 ETL 会产生非常多的小文件间定期删除;需要保留一段时

如下图所示是第三种重跑方式的整体流程,ETL 是按照小时调度的,首先将数据按小时写到临时目录中,如果消费失败会告警通知并重跑消费当前小时。如果落地成功则合并到仓库目录的目标文件,合并失败同样会告警通知并人工重跑,将小文件合并成目标文件。

graph TB 开始 --> 小时ETL--write -->1{落地临时目录/etl/work/按小时区分}--落地成功-->2{到仓库目录的目标文件每个partition一个文件}--合并成功-->结束 1-->失败消费不完整-->告警通知-->重跑消费当前小时即可-->小时ETL 2-->合并失败-->告警通知-->根据每小时产生的小文件重新合并一个目标文件--合并成功-->结束

重跑的优化

流程 按天分区的topic
每小时05分调度的ETL正常跑完 1.包含之前的数据.如:昨天漏采集的日志.直接在对应日期分区目录下创建一个新的数据文件; 2.今天的日志,直接在今天的日期分区下创建一个新的数据文件; 3.将当前 ETL 产生的小文件直接 append 到对应日期分区以及相同 partition 下的文件;
一个小时内执行多次ETL 1.因为目前落地的临时文件都是按 partition +执行时间(yyyyMMddHH )生成的,如: all _ 1 _ 2018031511 . avro ; 2.如果一个小时执行多次 ETL ,每次都会消费新的 offset ( etl _ 0ffsets 会存多条记录),但是临时目录的数据文件的名称相同,这时候通过 append 方式追加数据; 3.如果小时重跑时.会找到当前小时内 topic--partjtion 的最小的 beginoffset 、最大的 endoffset 重新从 kafka 消费,这样保证了数据的完整性;
小时ETL重跑(前提未进行merge) 1 )小时三跑,直接找到对应小时 topic--partition 的最小的 beginoffset 、最大的 endoffset 重新从 kafka 消费: 2 )之前已经创建的临时目录的数据文件会以 create overwrite 形式重新写入数据; 3 )最后在将文件 append 到对应日期分区下相同 partition 的文件;
merge失败(小时ETL产生的临时数据文件已经是完整、可靠的) 重新根据当天每小时 ETL 产生的临时数据文件合并一个目标仓库文件即可:
ETL消费消息不完整 1.消费不完整的 toPic 、 partition 信息会记录.告警短信通知: 2 )人工重跑对应 topic--partition 小时 ETL 即可(支持细化到 partition 的小时数据重跑,因为是 overwrite )
MapTask attempt killed by yarn 1 ) MapTask attempt 被 yarn 的机制 killed 后.另启的 attempt 会 overwrite 之前的文件(排除一个小时执行多次 ETL 的情况.这时候需要人工重跑).比较好的兼容 hadoop 集群的故障;
NodeManager carsh by hardware fault 由于机器硬件故障导致运行 ETL 的 MapTask 的 NodeManager 直接 crash :1 )数据正在写入临时目录,手工重跑对应 topic 一 partition 小时的 ETL 即可: 2 )数据正在 merge ,手工重新 merge 对应 topic date 的数据;
人为killed整个ETL job 人工处理未执行的topic :1.数据正在写入临时目录,手工重跑对于topic-partition小时的ETL即可 2.数据正在 merge , 手工重新merge对应 topic--date 的数据:

自动水平扩展

现在离线 Kafka-ETL 是每小时 05 分调度,每次调度的 ETL 都会获取每个 topic&partition 当前最新、最大的 latest offset,同时与上个小时消费的截止 offset 组合成本地要消费的 kafkaEvent。由于每次获取的 latest offset 是不可控的,有些情况下某些 topic&partition 的消息 offset 增长非常快,同时 kafka topic 的 partition 数量来不及调整,导致 ETL 消费处理延迟,影响下游的业务处理流程:

  • 由于扩容、故障等原因需要补采集漏采集的数据或者历史数据,这种情况下 topic&&partition 的消息 offset 增长非常快,仅仅依赖 kafka topic partiton 扩容是不靠谱的,补采集完后面还得删除扩容的 partition;
  • 周末高峰、节假日、6.18、双十一等用户流量高峰期,收集的用户行为数据会比平时翻几倍、几十倍,但是同样遇到来不及扩容 topic partition 个数、扩容后需要缩容的情况;

Kafka ETL 是否能自动水平扩展不强依赖于 kafka topic partition 的个数。如果某个 topic kafkaEvent 需要处理的数据过大,评估在合理时间范围单个 mapper 能消费的最大的条数,再将 kafkaEvent 水平拆分成多个子 kafkaEvent,并分配到各个 mapper 中处理,这样就避免单个 mapper 单次需要处理过大 kafkaEvent 而导致延迟,提高水平扩展能力。拆分的逻辑如下图所示:

graph TB 开始 --> 根据ETLSchema选择SplitStrategy-->IsRemaining("SplitStrategy是否配置") IsRemaining -->配置--> 生成对应的SplitStrategy-->根据SplitStrategy拆分KafkaEvent-->oneChosse("消息是否超过阈值") oneChosse -->no["否"]-->KafkaEvent不拆分-->结束 oneChosse -->yes["是"]-->twoChosse("消息是否超过阈值2倍")-->yes2["是"]-->根据倍数进行合理拆分-->结束 twoChosse-->no2["否"]-->KafkaEvent拆分为两个-->结束 IsRemaining -->未配置-->KafkaEvent不拆分-->结束

后续我们将针对以下两点进行自动水平扩展的优化:

  • 如果单个 mapper 处理的总消息数据比较大,将考虑扩容 mapper 个数并生成分片 split 进行负载均衡。
  • 每种格式的消息处理速度不一样,分配时可能出现一些 mapper 负担比较重,将给每个格式配置一定的权重,根据消息条数、权重等结合一起分配 kafkaEvent。

参考链接

https://blog.csdn.net/javastart/article/details/113838240

美图离线ETL实践 - 掘金 (juejin.cn)

热门相关:最强狂兵   网游之逆天飞扬   寂静王冠   情生意动   豪门闪婚:帝少的神秘冷妻