01
02
===
图 1 Lambda 架构
爱奇艺采用数据集成、数据仓库、数据开发的典型数据架构,将埋点、后端日志、数据库、指标等来源的数据应用到推荐、搜索、营销、报表等场景中。整个数据架构的建设分为两条链路,如图 1 所示:
**离线通路:**使用 HDFS、Hive、Spark 等工具,按 ODS、DWD、DWS 分层的数仓架构构建了离线数仓。通过 Venus 日志采集、MySQLIO CDC 同步、Hubble 监控平台等工具将不同数据源集成到离线数仓,同时提供魔镜离线分析、Babel 离线计算两个开发平台支持数据应用开发。数仓中的 Hive 表按小时分区,从数据产生到业务场景应用通常有小时级的延时。
**实时通路:**数据价值随生命周期增长而迅速衰减,离线通路的延迟导致业务无法快速利用数据的价值,因此又构建了实时通路。参考离线数仓,使用 Kafka 和 Flink 搭建实时流式数仓。Venus、MySQLIO、Hubble 等数据集成工具支持不同数据源集成到实时流式数仓。在数据开发层面,新增 RCP 实时计算平台、RAP 实时分析平台,用于支持实时数据应用开发。实时数据通路可以达到秒级延时。
上述 Lambda 数据架构虽然可以满足业务场景需求,但带来了如下问题:
**系统复杂度高:**服务提供方需要提供多种组件,业务需要学习多个组件,并开发维护离线、实时两套程序。
**数据一致性难保障:**实时、离线两套代码,要做到数据处理逻辑统一、数据一致,存在较大挑战。
**资源成本高:**数据链路多个环节存在两套重复的存储和计算,成本较高。
**数据延时大:**实时数据虽然延时低,但是存储时间短,数据查询分析需求仍然依靠离线数据,具有 T+1 的延时。
为了解决上述这些问题,我们基于数据湖技术对数据生产链路进行了流批一体化改造,并构建了实时湖仓。
03
数据湖简介
在介绍实时湖仓之前,先简单介绍下数据湖(更多背景介绍可以参看之前发表的文章《爱奇艺数据湖实战》)。爱奇艺基于 Iceberg 构建了数据湖,使用 HDFS 作为底层存储、Alluxio 作为缓存层,Iceberg 作为表格式工具,表的元数据通过 Hive metastore 管理,如图 2 所示。业务在不同的开发平台上,使用 Spark、Flink、Trino 等计算引擎查询、处理湖数据。
图 2 爱奇艺数据湖
Iceberg 是读写数据湖的入口,也是实时湖仓架构最重要的组件,其架构如下:
图 3 Iceberg 架构
Iceberg 分为数据层、元数据层和 Catalog 层。Catalog 层是表的统一入口,数据层保存实际数据,在我们的数据湖架构中,Catalog 层为 Hive metastore,数据层数据文件存在 HDFS 上。元数据层主要存放表的 schema、分区、及每列的最大值、最小值等统计信息。Iceberg 通过 snapshot 进行读写隔离,每个写入周期对应一个 snapshot,新数据先写入到新的数据文件,再生成元数据文件,通过 commit 操作生成一个 snapshot,原子性的更新到 Catalog,commit 成功的 snapshot 才能读取。
Iceberg 具备如下特性:
**可控的数据延时:**snapshot 的生成速度决定了数据生成的延时,snapshot 生成越快,数据延时越低,最快可以到 1 分钟以内。
**流批一体:**Iceberg 既支持 Spark、Flink 引擎以批模式读写数据,也支持以流模式读写数据,能做到存储层面的流批一体。
**资源成本低:**底层存储在 HDFS 上,支持列存储及数据压缩,理论上与 Hive 的存储成本相当,远小于 Kafka 的存储成本。Iceberg 在元数据层加入了统计信息,可用于查询优化,查询成本比 Hive 低。
**支持数据变更:**Iceberg V2 格式的表支持行级数据变更,可以更好的集成数据库数据。
基于如上特性,Iceberg 可以完全替代 Hive,并在分钟级延时的场景中替代 Kafka。
实时湖仓一体化架构
图 4 实时湖仓一体架构
基于 Iceberg 和 Flink,我们构造了如图 4 所示的实时湖仓一体架构。ODS、DWD、DWS 每个层面的数据都存储在 Iceberg,每层的数据通过 Flink 流式消费上游数据实时生产。数仓的每张表既可以被 RAP 实时分析平台、RCP 实时计算平台流式实时处理,也可以被魔镜离线分析平台、Babel 离线计算平台以批模式处理。在数据集成方面,Venus 日志采集、MySQLIO CDC 同步、Hubble 监控支持将日志或指标类数据写入 Iceberg。依赖 Iceberg 支持行级数据变更的特性,借助 Flink CDC 可以实现数据库的全量数据实时同步。实时湖仓的数据架构具备如下特点:
**湖仓一体:**既具备数据湖的灵活性,也具备数仓的结构化数据管理能力,可以统一存储结构化、半结构化、非结构化的数据,形成统一的数据底座,消除数据孤岛。
**流批一体:**将 Lambda 架构下的实时、离线两条数仓生产通路合并成一条,数仓的每个层次生产一份数据,既能用于按小时、天读数据的批计算,也能用于流式消费的实时计算。一套数仓避免了开发两套代码、计算逻辑对齐的问题,能大幅提高数据开发效率
**资源成本低:**相比于实时、离线两个数仓生产通路,流批一体的实时湖仓的数据生产、存储成本更低。相比Hive,Iceberg 元数据层更丰富的统计信息,也有助于查询性能提升,降低查询成本。
**延时低:**Iceberg 支持分钟级的数据可见性,通过实时写入 Iceberg,实时湖仓可以达到分钟级的延时。数据的使用方不需要在应用侧采用流、批数据融合的方式支持最新的全量数据,简化业务侧处理逻辑。
实时湖仓架构虽然有较大收益,但结合爱奇艺的数据体系,在生产环境应用,需要应对如下挑战:
**单任务生产多表问题:**在爱奇艺的埋点、容器日志等数据源中,不同业务的日志混合在一起,需要按日志特征拆分到不同业务的 ODS 层表。单个任务消费一个数据源可能拆分出 500 多张表,最大的任务需要拆分到3000 多张表,总共拆分到上万张表。当前的 Flink Iceberg Sink 仅支持写入到一张表,而一个 Flink 任务也无法添加如此多的 Iceberg Sink。因此,如何解决单任务生产多表的问题是我们面临的首要难题。
**数据生产进度评估:**需要在实时生产数据的过程中评估数据生产进度,以便数据的使用方了解湖仓中数据的完整度,触发下游批任务。
**流式消费的积压监控:**对于下游的流式消费任务,需要像消费 Kafka 一样给出消费积压监控。
**数据完备性保障:**在 Lambda 架构下,在数据通路故障时,可以通过重新调度批计算修正数据,保证数据的最终完备性。实时湖仓架构也需要有保证数据完备性的机制。
接下来介绍下我们应对上述挑战的方案。
===
===
04
图 5 原生 Flink Iceberg Sink
原生的 Flink Iceberg Sink,包含一个 Writer 算子,一个 Committer 算子,如图 5 所示。Writer 算子接收的 DStream 中的数据全都属于一张表,每个 Writer 算子的 Task 将数据写入 Iceberg 数据文件后,在 Checkpoint 时,将文件信息封装在 WriterResult 发送给 Committer 算子。Committer 算子并行度为 1,只有一个 Task,收集到 Writer 算子全部 Task 发送的 WriterResult 后,提交文件信息给 Iceberg 表生成一个新的 Snapshot。原生的 Flink Iceberg Sink 仅支持写入一张表,如果写入多个表,需要添加多个 Sink 算子,如果太多,Flink Job 就会无法运行。因此,为了支持一个任务同时写大量表的场景,我们自研了支持多表写入的 Flink Iceberg Sink,如图 6 所示。
图 6 支持多表写入的 Flink Iceberg Sink
相比原生 Flink Iceberg Sink,主要改造点如下:
定义 MultiTableRow 类型。相比 Row,增加了所属的 Table 名称。
为避免每个 Writer 算子的 Task 写入过多表,出现小文件过多、内存使用过大及性能问题,在 Writer 算子之前加入 Partitioner 算子。通过 Partitioner 算子将相同表的数据路由到固定几个 Writer Task,一个 Writer Task 只处理部分表的写入。
Writer 算子基于 MultiTableRow 中的表名字加载表,将数据写入到对应表的文件。在 Checkpoint 时,首先按表名汇总各表新写入的文件构建 MultiTableWriteResult 对象,MultiTableWriteResult 对象相比 WriterResult 增加了表名信息。然后按表名 Shuffle 后发送给 Committer 算子。
Committer 算子的并行度不为 1,为默认并行度。在 Checkpoint 时,每个 Committer 算子的 Task 基于收到的MultiTableWriteResult 汇总各表的写入文件,提交到对应的表生成新的 Snapshot。
图 7 使用 MultiTable Flink Iceberg Sink 生产多表并自动负载均衡
在使用多表写入的 Flink Iceberg Sink 时,设计合适的 Partitioner 非常关键,如果实现不当,可能导致 Writer 算子的各 Task 负载不均衡。如图 7 所示,在我们的 Partitioner 实现中,基于实时统计的各表新写入数据的大小,及 Flink 任务分配的 Slot 数量动态生成 Shuffle 策略。大表的数据路由到多个 Writer Task,多个小表数据路由到一个 Writer Task,保证每个 Writer Task 处理的数据量基本均衡,达到最佳写入性能,也能降低小文件生成数量。使用多表写入的 Flink Iceberg Sink,我们的一个线上任务生产了 3000 多张表,多个任务生产了 500 多张表。
批任务通常基于消费表的分区级数据完整度触发计算,如一个计算表每小时数据的批任务,需要在每小时的数据完备后触发。因此,Iceberg 也要像 Hive 表一样提供分区完整度。我们通过改造 Flink Iceberg Sink 实现,针对原生的单表 Sink,我们的改造方案如图 8 所示,我们自研的多表 Sink 的改造方案类似。
图 8 Iceberg 表生产进度评估
Flink Iceberg Sink 负责数据的写入,内部包含两个算子:Writer 和Committer。Writer 算子的 Task 负责将数据写入Iceberg 的数据文件,在写入数据的同时,基于指定时间列的值,记录遇到的最大时间 max(ts)。在 Checkpoint 时,每个 Writer Task 把 max(ts) 及新写入的文件,一起传给 Committer,Committer 提交文件生成新 Snapshot 的同时,将接收的 max(ts) 中的最小值减去允许的延时时间作为 Snapshot 对应的 Watermark,记录到 Snapshot 的Summary 中。数据湖平台周期性的读取表的 Snapshot Summary 中 Wartermark,判断如果属于新的一小时,表示上一小时分区的数据已经完备,将完备信息提交给进度管理服务,进而触发离线计算平台上的对应批任务。
在流计算场景中,数据生产延时是一个重要的数据质量指标。流式消费 Kafka,Flink Kafka Souce 通过提交消费Offset,借助专门的服务(如 Kafka Manager、Burrow)计算生产的 offset 与消费提交的 offset 的差值评估消费积压,进而评判数据生产是否有延时。流式消费 Iceberg 也需要有类似的机制,社区版 Flink Iceberg Source 没有类似功能,需要改造实现。
Flink Iceberg Source 基于 Flink FLIP-27 中的 Source Interface 实现,如图 9 所示。该接口中有两个重要的组件:Split Enumerator 和 Reader,功能如下。
Split Enumerator:发现 Iceberg 表的新 Snapshot,读取 Snapshot 中的文件,并切分成 Split 块。
Reader:接收 Split Enumerator 分配的 Split 块,读取 Split 块中的数据,发送到下游算子。
图 9 Flink Source Interface 核心组件
Iceberg 的 Snapshot 是数据写入及消费获取新文件的最小粒度,我们的方案是在任务内,计算当前消费 Snapshot 的延时。具体实现是,在 Split Enumerator 组件内,用当前时间减去新获得的 Snapshot 的生成时间作为延时指标集成到 Flink 的 Metrics 体系。Flink 的 Metrics 已经对接到基于 Prometheus+Grafana 的监控报警平台,可以方便地配置延时监控及报警。
实时湖仓的数据通过 Flink 流任务实时生产,在组件服务故障,或者代码 bug 导致数据丢失时,如何修正数据,保证数据的完备性?流任务虽然可以回溯重新计算数据,但会造成新数据处理延时,也很难确定回溯的准确位置导致数据仍可能有重复或丢失,同时还可能遇到上游数据已过期删除的问题。流任务都在 RCP 实时计算平台上开发,我们的方案是拓展 RCP 实时计算平台,支持一个 Flink 应用既可以以流模式运行,也可以以批模式运行,使用批任务修正流任务的结果,如图 10 所示。
图 10 数据修正
正常情况下,应用只以流模式运行持续生产数据。当数据有问题时,保持流模式作业运行的情况下,新调度一个批模式运行的任务,按表分区修正下游表的数据。RCP 也支持定时调度批任务,可以用每天的批计算结果覆盖流计算结果。批任务与流任务的运行时参数和配置会有差别,RCP 支持对 SQL 开发应用配置流、批运行模式下使用不同的数据源及运行配置。同时,在启动任务时,通过 JVM 参数传递当前的运行模式,以便任务基于不同的模式调整执行逻辑。基于 RCP 流批一体功能,可以在不影响流任务的情况下实现批任务使用同构、异构数据源修正流任务生产的结果表。
===
05
实时湖仓一体化架构推出后,已在如下场景落地:
**Venus:**Venus 是爱奇艺的日志平台,负责后端日志采集、存储、分析。原来日志统一存储在 Elasticsearch,与大数据体系割裂,现在大部分日志都统一入湖,对应实时湖仓的 ODS 层表。容器类日志原来先采集到统一的Kafka topic,然后按业务分割到业务 topic,再写入 Elasticsearch。迁移到实时湖仓架构后,从统一 Kafka topic 按业务拆分后直接写入 Iceberg,省掉了业务 topic 环节。Venus 总共写入超过 1 万张 Iceberg 表,每日写入 PB 级日志,延时 5 分钟,每年节省上千万成本。Venus 入湖改造的详细介绍见之前发布的文章《爱奇艺数据湖实战 - 基于数据湖的日志平台架构演进》
**Pingback:**Pingback 是爱奇艺埋点数据的统称,大部分埋点数据需要经过数仓架构 ODS、DWD、DWS 分层处理。我们按照数仓层次从前到后、业务重要程度从低到高的顺序使用实时湖仓 Iceberg 表替代 Hive 表。同时,也推动了部分能接受分钟级延时的业务从 Kafka 数据迁移到 Iceberg 表。目前已上线 1300 多张 Iceberg 表,每日新增数百 TB 数据,每一层的 Iceberg 表最低延时 1 分钟。
**数据库数据:**Flink CDC 支持全量、增量数据的透明切换并实时写入 Iceberg,数据库数据入湖统一切换成了 Flink CDC。目前已同步广告、会员、用户增长等业务的近百张表。
06
实时湖仓一体化架构已经在实践中得到了充分验证,在节省成本、提高数据时效性、降低数据复杂性等方面都取得了较大收益,之后规划如下:
继续上线更多业务场景,完全替代 Hive,并替代接受分钟级延时的 Kafka 数据。
将 Flink CDC 从 2.x 升级到 3.x,支持 Schema 自动变更,降低数据库数据入湖的维护代价。
Iceberg 不支持更新部分列、基于变更数据继续构建 Pipeline 等功能,限制了一些应用场景,正在引入新兴的数据湖技术 Paimon 作为这类场景的替补。
也许你还想看