爱奇艺基于 Hive 构建了传统的离线数据仓库,支持了公司运营决策、用户增长、视频推荐、会员、广告等业务需求。近几年,随着业务对数据实时性的更高要求。我们引入了基于 Iceberg 的数据湖技术,大幅提升数据查询性能及整体流通效率。从性能和成本角度考虑,将现有的Hive表迁移到数据湖是必要的。然而多年来,大数据平台上已经积累了数百 PB 的 Hive 数据,如何将 Hive 迁移到数据湖,成为我们面临的一大挑战。本文介绍了爱奇艺从 Hive 平滑迁移到 Iceberg 数据湖的技术方案,帮助业务加速数据流程,提效增收。
01
Hive 是一个基于 Hadoop 的数据仓库和分析平台,提供了类似 SQL 的语言,支持复杂的数据处理和分析。
Iceberg 是一个开源的数据表格式,旨在提供可扩展、稳定和高效的表格存储,以支持分析性工作负载。Iceberg 提供了类似传统数据库的事务性保证和数据一致性,并支持复杂的数据操作,如更新、删除等。
表 1-1 分别列出了 Hive 和 Iceberg 在时效性、查询性能等方面的比较情况:
表 1-1 Hive 和 Iceberg 的对比
切换到 Iceberg 可以提高数据处理的效率和可靠性,为复杂的数据操作提供更好的支持,目前已接入广告、会员、Venus 日志、审核等十几个业务。关于爱奇艺 Iceberg 实践的更多细节,可阅读之前的系列文章(见文末引用)。
02
Iceberg 相比 Hive 有诸多优势,可是业务的数据已经运行在 Hive 环境中,业务不希望投入大量的人力修改存量的任务。我们调研了业界常用切换方法 [1],在数据湖平台上提供了自助式 Hive 平滑切换 Iceberg 的能力,本节将阐述具体实现方案。
在实际切换前,我们验证了 Spark 对 Hive 和 Iceberg 的兼容性。
Spark 对 Hive 和 Iceberg 表的查询、写入语法基本是相同的,对 Hive 表的查询 SQL 语句无需修改即可查询 Iceberg 表。
但 Iceberg 与 Hive 在 DDL 方面存在较大差异,主要表现在对表结构进行修改的处理方式上,详细信息如表 2-1 所述。实际 schema 与数据文件的 schema 需要一一对应,否则会影响数据的查询,因此对于 DDL 语句的处理应该更为谨慎,不建议将这类 DDL 语句与任务绑定在一起。
表 2-1 Hive 和 Iceberg 语法兼容性对比
业务复制现有的pipeline,实现Hive、Iceberg双写。待新旧通路对数一致后,切换到 Iceberg 通路,并下线原有的通路。该方案需要业务投入人力进行开发、对数,耗时耗力。
如果业务允许停写一段时间进行切换,则可以用如下一些方式:
CALL catalog_name.system.migrate('db.sample');
该程序不会修改原始数据,仅会扫描原表的数据然后构建 Iceberg 元信息,引用原始的文件。因而 migrate 程序执行速度非常快,但存量数据无法利用文件索引等特性加速查询。如希望存量数据也加速,可以使用 Spark 的rewrite_data_files 方法重写历史数据。
migrate 程序并不会将 Hive 表删除,而是将其重命名为 sample__BACKUP__,此处 __BACKUP__ 后缀是硬编码的,如果需要回滚可将新建的 Iceberg 表 Drop 掉,将 Hive 表 rename 回去。
CREATE TABLE db.sample_iceberg
(id bigint, ..., dt string)
USING Iceberg
PARTITIONED BY dt
LOCATION 'qbfs://....'
TBLPROPERTIES('write.target-file-size-bytes' = '512m', ...)
AS SELECT * FROM db.sample;
在写入完成后进行对数,符合要求后,通过重命名完成切换。
ALTER TABLE db.sample RENAME TO db.sample_backup;
ALTER TABLE db.sample_iceberg RENAME TO db.sample;
CTAS 相比于 migrate 优势是存量数据重新写入,因而可以优化分区、列排序、文件格式和小文件等。缺点是如果存量数据较多,重写耗时耗资源。
以上两个方案,具有如下特性:
优点:
方案简单,执行已有的 SQL 即可
可回滚,原 Hive 表还在
缺点:
写入/读取程序未验证:切换到 Iceberg 表后,可能出现写入或查询异常
要求切换过程停写,对一些业务是不能接受的
考虑到上述方案的缺点,我们设计了原地双写 + 透明切换的方案,实现平滑迁移,如图 2-1 所示:
建表**:**创建与 Hive 相同 schema 的 Iceberg 表,同步 Hive 表的 TTL、权限等元信息到 Iceberg 表。
历史数据迁移到 Iceberg**:**Hive 历史数据通过 add_file procedure 添加到 Iceberg 中,该操作会根据 Hive 数据构建出 Iceberg 的元数据,实际上 Iceberg 的元数据中指向的是 Hive 的数据文件,减少了数据冗余及历史数据同步时间。
增量数据双写**:**通过爱奇艺自研的 Pilot SQL 网关探测 Hive 表的写入任务,自动复制写入 SQL,并将输出替换成 Iceberg 表,实现双写。
**数据一致性****校验:**当历史数据同步完成且增量双写到一定次数之后,后台会自动发起对数,校验 Hive 和 Iceberg 中的数据是否一致。对于历史数据与增量数据会选取一部分数据进行 count 以及字段 CRC 数值校验。
切换**:**数据一致性校验完成后,进行 Hive 和 Iceberg 的切换,用户不需要修改任务,直接使用原来的表名进行访问即可。正常切换过程耗时在几分钟之内。
图 2-1 Hive 切换到 Iceberg 大致流程
图 2-2 展示了 Hive to Iceberg 相关操作界面,点击创建转化任务即可开始进行切换流程,当任务创建成功会在下方展示任务的状态以及运行阶段等信息。
图 2-2 Hive to Iceberg 相关操作界面
03
Iceberg 自身提供了三层数据过滤策略,分别是 [2]:
分区剪裁:和 Hive 表类似,对于分区表,引擎端可以自动从 where 条件中根据分区键直接提取出需要访问的分区,从而避免扫描所有的分区。分区剪裁可以细分为静态分区剪裁和动态分区剪裁,其中静态分区剪裁发生在 SQL 语句编译阶段,而动态分区剪裁则发生在 SQL 语句执行阶段。
文件过滤:Iceberg 提供了文件级别的统计信息,例如 Min/Max 等,可以快速过滤无关数据和文件,可以用 where 语句中的过滤条件去判断目标数据是否存在于文件中。例如 SELECT * FROM table WHERE dt='2023-01-01' AND channel_id = '20',dt 是分区,channel_id 是字段,对于 channel_id = '20' 这样的过滤条件,元信息中存储了每个文件 channel_id 的 upper_bounds 和 lower_bounds,可以通过判断列值是否在范围内决定是否需要扫描当前文件。
但实际使用中,这种过滤发挥的作用比较小。因为数据写入是随机且无序的,导致 upper_bounds 和 lower_bounds 范围重合度非常高,这种情况下目标数据可能会分布在大部分文件甚至所有文件,扫描数据文件的范围也大大增加。因此在切换为 Iceberg 后,我们可以基于过滤条件中的高频列进行排序,降低文件级别的 upper_bounds 和 lower_bounds 的范围重合度。
除了 MinMax 外,Iceberg 还可以支持更多类型的索引进行文件级过滤,例如字典、布隆过滤器等。
文件内 RowGroup 过滤:对于 Parquet、ORC 这类列式存储文件格式,在文件内部也存在相应的统计信息,例如Min、Max、BloomFiter 等等,利用这些信息可以快速跳过无关的 RowGroup 或者 Stripe,减少文件内数据扫描的量。
基于 Iceberg 查询更快的基本原理,我们可以总结出如下技巧:
配置分区:使用分区剪裁的方式使查询只针对特定分区的数据执行,而不需要扫描整个数据集。
指定排序列:通过对数据分布进行合理的组织,最大限度的发挥文件级别的过滤效果,使得查询只集中在特定的文件。例如通过下面的方式使得写入 sample 表的数据按照 category, id 降序写入,注意由于多了一个排序的环节,这种方式会比非排序的写入耗时长。
ALTER TABLE db.sample WRITE ORDERED BY category, id DESC
write.parquet.bloom-filter-enabled.column.test = true -- parquet 文件给 test 列增加 bloom-filter
write.orc.bloom.filter.columns = test -- orc 文件给 test 列增加 bloom-filter
使用 Trino 代替 Spark:由于 Trino 自身 MPP 的架构,在查询上相较于 Spark 更有优势,并且 Trino 自身对 Iceberg 也有相应的优化,因此如果有秒级查询的需求,可将引擎由 Spark 切换到 Trino。
Alluxio 缓存:使用 Alluxio 作为数据缓存层,将数据缓存在内存中。在查询时可以直接从内存中获取数据,避免从磁盘读取数据的开销,可大大提高查询速度,也可防止 HDFS 抖动对任务的影响。
ORC 代替 Parquet:由于 Trino 对 ORC 格式有特定的优化,使得 ORC 的读取性能要优于 Parquet,可以将文件格式设置为 ORC 加速查询。
配置合并:写 Iceberg 的任务往往会出现写入文件较小但数量较多的情况,通过将小文件合并成一个或少量更大的文件,有利于减少读取的文件数,降低磁盘 I/O。
背景:数据集市是从 Hive 表切换为 Iceberg 表的场景之一,在切换到 Iceberg 后查询速度明显地变快。经过实验对比,确认性能是由文件内 RowGroup 过滤带来的。
图 3-1 Hive 和 Iceberg 查询对比
我们在另一个场景进一步探索排序对性能的影响。由于分区下仅一个文件,因而文件级过滤不起作用。我们分别比较了 Parquet 和 ORC 这两种文件格式,在排序和未排序下的查询性能,最终结论如下:
同样的文件格式,排序后文件内过滤效果更好,大致能快 40%;
ORC 查询性能优于 Parquet;
使用 Trino 查询,我们推荐 Iceberg 表 + ORC 文件格式 + 列排序;
图 3-2 Iceberg 分别在 Parquet、ORC 格式上文件内过滤性能对比
业务表特定的列可能会频繁用做过滤条件,默认情况下数据是乱序组织,此时列 MinMax 值过滤也难以发挥作用,因特定值在每个文件都被包含。如果在数据写入时,按照该列进行排序组织,则 MinMax 值就能过滤掉大部分无关文件,大幅减少读取的数据量,加速查询。
下面以 CDN 的一个表为例,它的查询频繁用到 isp 和 prov 两个列,一个典型查询如下:
SELECT
"date" / 300 * 300 as "date",
isp,
ip_type,
sum("traffic") as "traffic"
FROM
table
WHERE "date" >= 1698986100 AND "date" < 1698986400
AND isp IN ('TV', 'Mobile', 'Phone')
AND prov IN ('BeiJing')
GROUP BY
"date" / 300 * 300, isp, ip_type;
我们分别测试对应表,默认不排序,按照 isp 排序,按照 prov 排序 3 种情况,最终性能如下:
按照 prov 排序查询读取数据量是不排序的 25%,耗时是 66%;
按照 isp 排序提升不明显,这是因为 isp 数据量有明显的倾斜,条件中 isp 值占比高达 90%;
图 3-3 Iceberg 文件级过滤性能提升对比
在会员订单场景,业务既有基于订单 ID 检索的需求,又有查询某个用户 UserId 历史订单的需求。这两个列基数都非常大,无论用哪个列排序,另一个列的查询都会退化为全表检索。此类场景可以通过布隆过滤器满足。下图演示了开启布隆过滤器后,订单表的性能和 Impala + Kudu 接近,而未开启的情况下查询要接近 1000 秒。
图 3-4 Iceberg 使用布隆过滤器和 Impala + Kudu 的性能对比
Trino 社区早期版本仅支持 Iceberg 表 V1 的查询,而对 V2 表格式的支持有问题(查询结果不正确)。爱奇艺的方案是在 Pilot SQL 网关中基于 Iceberg 表格式进行路由,V1 表路由到 Trino 引擎,V2 表路由到 Spark 引擎。
我们在 Trino 434 版本,重新验证了 Trino 对 Iceberg V2 表的查询。实验过程如下,对于 TPC-DS 测试集,我们每次变更表 0.1% 的数据,累计变更 20 轮,表使用 Merge On Read 模式,通过 Spark 执行变更生成 Position Delete 文件。changeN 代表 N 次变更后,rewrite_position 代表执行了 Spark rewrite_position_delete_files 后,rewrite_data 代表rewrite_data_files 后。
图 3-5 Spark 和 Trino 对于 Iceberg 的查询性能对比
可以看到:
Trino 对于 V2 表查询结果与 Spark 一致,且在相同核数性能优于 Spark,耗时是 Spark 的 1/3 左右;
随着变更轮次的增加(Data File 和 Postition Delete File 数量增加),Trino 查询性能也会逐渐变慢,需要定期进行合并。
===
04
传统上大数据表对变更的支持较差,然而业务上有很多的变更需求:
ETL 计算:如广告计费,通过接入 Iceberg 实现变更,简化业务逻辑,实现了更长时间范围的转化回收;
数据修正:批量修正,如对某个数据的状态进行修改、批量删除等;
隐私相关:如播放记录、搜索记录,用户需要删除历史条目等;
CDC 同步:如订单业务,需要将 MySQL 中的数据进行大数据分析,通过 Flink CDC 技术很方便地将 MySQL 数据入湖,实时性可达到分钟级。
在 Hive 中实现变更,主要有如下两种方式:
分区覆写 例如修改某个 id 的相关内容,先筛选出要修改的目标行,更新后与历史数据进行合并,最后覆盖原表。这种方式对不需要修改的数据进行了重写,浪费计算资源;且覆写的粒度最小是分区级别,数据无法进一步细分,任务耗时相对较长。
标记删除 通常的做法是添加标志位,数据初始写入时标志位置 0,需要删除时,插入相同的数据,且标志位置 1,查询时过滤掉标志位为 1 的数据即可。这种方式在语义上未实现真正的删除,历史数据仍然保存在 Hive 中,浪费空间,而且查询语句较为复杂。
Iceberg 目前支持的变更类型如下:
DELETE FROM table_name WHERE channel_id= '2'
UPDATE table_name SET category='c2' WHERE id='2'
MERGE INTO db.target t -- a target table
USING (SELECT ...) s -- the source updates
ON t.id = s.id -- condition to find updates for target rows
WHEN MATCHED AND t.count IS NULL AND s.op = 'increment' THEN UPDATE SET t.count = 0
WHEN NOT MATCHED THEN INSERT *
Iceberg 支持多种变更策略,每个策略有各自的优劣和适用场景,下面简单介绍一下每种策略的原理 [3]。
Copy on Write(写时合并):当进行删除或更新特定行时,包含这些行的数据文件将被重写。写入耗时取决于重写的数据文件数量,频繁变更会面临写放大问题。如果更新数据分布在大量不同的文件,那么更新的执行速度比较慢。这种方式由于结果文件数较少,读取的速度会比较快,适合频繁读取、低频批次更新的场景。
Merge on Read(读时合并):文件不会被重写,而是将更改写入新文件,当读取数据时,将新文件合并到原始数据文件得到最终结果。这使得写入速度更快,但读取数据时必须完成更多工作。写入新文件有两种方式,分别是记录删除某个文件对应的行(position delete)、记录删除的数据(equality detete)。
Position Delete:当前 Spark 的实现方式,记录变更对应的文件及行位置。这种方式不需要重写整个数据文件,只需找到对应数据的文件位置并记录,减少了写入的延迟,读取时合并的代价较小。
Equality Delete:当前 Flink 的实现方式,记录了删除数据行的主键。这种方式要求表必须有唯一的主键,写入过程无需查询数据文件,延迟最低;然而它的读取代价最大,这是由于读取时需要将 equality delete 记录和所有的原始文件进行 JOIN。
表 4-1 总结了不同变更策略的特点及适用场景:
表 4-1 Iceberg 不同变更策略对比
Iceberg **配置变更策略:**Iceberg 中可以通过 write.delete.mode、write.merge.mode、write.update.mode 属性分别设置删除、合并、更新等写入模式,默认值均是 copy-on-write。当前只有 V2 表支持 Merge-on-read 模式。
表 4-2 Iceberg 变更属性配置方式
本节通过一些例子,说明 Iceberg 支持变更给业务带来的价值。
如图 4-1 所示,在效果广告场景中,客户有查询计费转化数和深度转化数据的需求(基于计费时间)。比如某垂直领域客户,希望把用户行为统一起来,1 号发生的 100 万曝光,产生了 40 万点击(仅为示例,非真实数据),进而在后续的第 N 天内发生了 5000 的用户付费行为,需将第 N 天的付费归因到 1 号的曝光。广告报表都是基于用户行为时间,即日志时间聚合而成,为支持将深度转换归因到广告计费的当日,由于 Hive 不支持变更做了如下复杂的设计:
每天触发一次计算,从行为表聚合出过去 7 天的“计费时间”数据。此处用 rt 字段代表计费时间
提供统一视图合并行为数据和计费时间数据,计费归因表 rt as dt 作为分区过滤查询条件,满足同时检索曝光和计费转化的需求
图 4-1 广告计费转换场景
而在 Iceberg 场景下,其支持变更因而无需使用多个不同的表,直接在原表通过如下 SQL 即可完成:
MERGE INTO iceberg_taget_table t
USING (
SELECT * FROM changes_table
WHERE dt='2023-12-12'
) s
ON t.id = s.id
...
AND t.dt = s.dt
WHEN MATCHED THEN
UPDATE SET
count = count + s.cnt,
deep_count = deep_count + s.deep_cnt,
...
通过 Iceberg 表 merge 可简化整个处理流程:
时效性提升:从天级缩短到小时级,客户更实时观察成本,有利于预算引入;
计算更长周期数据:原先为计算效率仅提供 7 日内转换,而真实场景转换周期可能超过 1 个月;
表语义清晰:多表联合变为单表查询。
举个例子,业务发现线上 ETL 任务逻辑有 BUG,导致某个列的值不准确。虽然线上 ETL 任务已经修复,但是错误的数据已经写入到下游的 Iceberg 表里。如果是 Hive 场景,需要重跑 ETL 任务,全量覆盖天分区进行修正。而在 Iceberg 表我们可以通过如下 SQL 进行修正:
UPDATE your_iceberg_table
SET strategy_code = 'correct_value'
WHERE dt = '2023-12-01' and strategy_code = 'wrong_value'
05
Iceberg 不仅提供了数据删除、更新等功能,有效满足数据保留政策和合规性要求,而且查询加速措施更为多样,可以利用列式存储、索引和元数据统计信息来优化查询计划,提高查询性能,可以帮助我们简化业务逻辑,提升时效性,加快数据产出。
通过以上平滑切换方案,从 Hive 到 Iceberg 不需要重新构建数据湖,在尽量保持语义兼容性的情况下,可以进行无缝迁移,减少了迁移的复杂性和风险。
后续我们将继续推进 Hive 到 Iceberg 的迁移,提升数据流通效率,促进业务提效增收。
06
也许你还想看