汽车之家基于 Apache Flink 的跨数据库实时物化视图探索

汽车之家基于 Apache Flink 的跨数据库实时物化视图探索
作者:
本文介绍了汽车之家在基于 Flink 的实时物化视图的一些实践经验与探索,并尝试让用户直接以批处理 SQL 的思路开发 Flink Streaming SQL 任务。主要内容为:物化视图这一功能想必大家都不陌生,我们可以通过使用物化视图,将预先设定好的复杂 SQL 逻辑,以增量迭代的形式实时 (按照事务地) 更新结果集,从而通过查询结果集来避免每次查询复杂的开销,从而节省时间与计算资源。事实上,很多数据库系统和 OLAP 引擎都不同程度地支持了物化视图。另一方面,Streaming SQL 本身就和物化视图有着很深的联系,那么基于 Apche Flink (下称 Flink) SQL 去做一套实时物化视图系统是一件十分自然而然的事情了。
#行业实践#生活服务

汽车之家基于 Apache Flink 的跨数据库实时物化视图探索

本文介绍了汽车之家在基于 Flink 的实时物化视图的一些实践经验与探索,并尝试让用户直接以批处理 SQL 的思路开发 Flink Streaming SQL 任务。主要内容为:物化视图这一功能想必大家都不陌生,我们可以通过使用物化视图,将预先设定好的复杂 SQL 逻辑,以增量迭代的形式实时 (按照事务地) 更新结果集,从而通过查询结果集来避免每次查询复杂的开销,从而节省时间与计算资源。事实上,很多数据库系统和 OLAP 引擎都不同程度地支持了物化视图。另一方面,Streaming SQL 本身就和物化视图有着很深的联系,那么基于 Apche Flink (下称 Flink) SQL 去做一套实时物化视图系统是一件十分自然而然的事情了。

本文介绍了汽车之家在基于 Flink 的实时物化视图的一些实践经验与探索,并尝试让用户直接以批处理 SQL 的思路开发 Flink Streaming SQL 任务。主要内容为:

  1. 系统分析与问题拆解
  2. 问题解决与系统实现
  3. 实时物化视图实践
  4. 限制与不足
  5. 总结与展望

null

前言

物化视图这一功能想必大家都不陌生,我们可以通过使用物化视图,将预先设定好的复杂 SQL 逻辑,以增量迭代的形式实时 (按照事务地) 更新结果集,从而通过查询结果集来避免每次查询复杂的开销,从而节省时间与计算资源。事实上,很多数据库系统和 OLAP 引擎都不同程度地支持了物化视图。另一方面,Streaming SQL 本身就和物化视图有着很深的联系,那么基于 Apche Flink (下称 Flink) SQL 去做一套实时物化视图系统是一件十分自然而然的事情了。

本文介绍了汽车之家 (下称之家) 在基于 Flink 的实时物化视图的一些实践经验与探索,并尝试让用户直接以批处理 SQL 的思路开发 Flink Streaming SQL 任务。希望能给大家带来一些启发,共同探索这一领域。

一、系统分析与问题拆解

Flink 在 Table & SQL 模块做了大量的工作,Flink SQL 已经实现了一套成熟与相对完备的 SQL 系统,同时,我们也在 Flink SQL 上有着比较多的技术和产品积累,直接基于 Flink SQL 本身就已经解决了构建实时物化系统的大部分问题,而唯一一个需要我们解决的问题是如何不重不漏地生成数据源表对应的语义完备的 Changelog DataStream,包括增量和全量历史两部分。

虽然规约到只剩一个问题,但是这个问题解决起来还是比较困难的,那我们将这个问题继续拆解为以下几个子问题:

  1. 加载全量数据;
  2. 加载增量数据;
  3. 增量数据与全量数据整合。

二、问题解决与系统实现

问题一:基于数据传输平台的增量数据读取

增量数据加载还是相对比较好解决的,我们直接复用实时数据传输平台的基础建设。数据传输平台[1] 已经将 Mysql / SqlServer / TiDB 等增量数据以统一的数据格式写入到特定的 Kafka Topic 中,我们只要获取到对应的 Kafka Topic 就可以进行读取即可。

问题二:支持 checkpoint 的全量数据加载

对于全量数据载入,我们先后写了两个版本。

第一版我们用 Legacy Source 写了一套 BulkLoadSourceFunction,这一版的思路比较朴素,就是全量从数据源表进行查询。这个版本确实能完成全量数据的加载,但是问题也是比较明显的。如果在 bulk load 阶段作业发生了重启,我们就不得不重新进行全量数据加载。对于数据量大的表,这个问题带来的后果还是比较严重的。

对于第一版的固有问题,我们一直都没有特别好的对策,直到 Flink-CDC[2] 2.0 的发布。我们参考了 Flink-CDC 的全量数据加载阶段支持 Checkpoint 的思路,基于 FLIP-27 开发了新的 BulkLoadSource。第二版不论在性能上还是可用性上,对比第一版都有了大幅提升。

问题三:基于全局版本的轻量 CDC 数据整合算法

这三个子问题中,问题三的难度是远大于前面两个子问题的。这个问题的朴素思路或许很简单,我们只要按照 Key 缓存全部数据,然后根据增量数据流来触发 Changelog DataStream 更新即可。

事实上我们也曾按照这个思路开发了一版整合逻辑的算子。这版算子对于小表还是比较 work 的,但是对于大表,这种思路固有的 overhead 开始变得不可接受。我们曾用一张数据量在 12 亿,大小约 120G 的 SqlServer 表进行测试,本身就巨大的数据再加上 JVM 上不可避免的膨胀,状态大小变得比较夸张。经过这次测试,我们一致认为这样粗放的策略似乎不适合作为生产版本发布,于是我们不得不开始重新思考数据整合的算法与策略。

在谈论我们的算法设计思路之前,我不得不提到 DBLog[3] 的算法设计, 这个算法的核心思路利用 watermark 对历史数据进行标识,并和对应的增量数据进行合并,达到不使用锁即可完成整个增量数据和历史数据的整合,Flink-CDC 也是基于这个思路进行的实现与改进。在相关资料搜集和分析的过程中,我们发现我们的算法思路与 DBLog 的算法的核心思路非常相似, 但是是基于我们的场景和情况进行了设计与特化。

首先分析我们的情况:

  • 增量数据需要来自于数据传输平台的 Kafka Topic;
  • 增量数据的是 at least once 的;
  • 增量数据是存在全序版本号的。

结合上述情况进行分析,我们来规约一下这个算法必须要达成的目标:

  • 保证数据的 Changelog Stream,数据完整,Event (RowKind) 语义完备
  • 保证该算法的 overhead 是可控的;
  • 保证算法实现的处理性能是足够高效;
  • 保证算法实现不依赖任何来自于 Flink 外部的系统或者功能。

经过大家的分析与讨论后,我们设计出了一套数据整合的算法,命名为 Global Version Based Pause-free Change-Data-Capture Algorithm

3.1 算法原理

我们同时读入 BulkLoadSource 的全量数据与 RealtimeChangelogSource 增量数据,并根据主键进行 KeyBy 与 Connect,而算法的核心逻辑主要由之后的 KeyedCoProcess 阶段完成。下面交待几个关键的字段值:

  • SearchTs:全量数据从数据源查询出来的时间戳;
  • Watermark:基于增量数据在数据库里产生的时间戳生成;
  • Version:全序版本号,全量数据是 0,即一定最小版本。

KeyedCoProcess 收到全量数据后,不会直接发送,而是先缓存起来,等到 Watermark 的值大于该 SearchTs 后发送并清除对应 version0 版本数据的缓存。在等待的期间,如果有对应的 Changlog Data,就将被缓存的 Version0 全量数据丢弃,然后处理 Changelog Data 并发送。在整个数据处理的流程中,全量数据和增量数据都是同时进行消费与处理的,完全不需要引入暂停阶段来进行数据的整合。

null

             增量数据在全量数据发送 watermark 之前到来,只发送增量数据即可,全量数据直接丢弃        

null

             全量数据发送 watermark 到达后,仍未有对应的增量数据,直接发送全量数据

3.2 算法实现

我们决定以 Flink Connector 的形式开展算法的实现,我们以接入 SDK 的名字 Estuary 为该 Connector 命名。通过使用 DataStreamScanProvider,来完成 Source 内部算子间的串联,Source 的算子组织如下图 (chain 到一起的算子已拆开展示)。

null

  • BulkLoadSource / ChangelogSource 主要负责数据的读入和统一格式处理;
  • BulkNormalize / ChangelogNormalize 主要是负责处理数据运行时信息的添加与覆盖,主键语义处理等工作;
  • WatermarkGenerator 是针对算法工作需求定制的 Watermark 生成逻辑的算子;
  • VersionBasedKeyedCoProcess 就是核心的处理合并逻辑和 RowKind 语义完备性的算子。

算法实现的过程中还是有很多需要优化或者进行权衡的点。全量数据进入 CoProcess 数据后,会首先检查当前是否处理过更大版本的数据,如果没有的话才进行处理,数据首先会被存入 State 中并根据 SearchTs + T (T 是我们设置的固有时延) 注册 EventTimeTimer。如果没有高版本的数据到来,定时器触发发送 Version 0 的数据,否则直接抛弃改为发送 RowKind 语义处理好的高版本增量数据。

另一方面,避免状态的无限增长,当系统判定 BulkLoad 阶段结束后,会结束对相关 Flink State 的使用,存在的 State 只要等待 TTL 过期即可。

另外,我们针对在数据同步且下游 Sink 支持 Upsert 能力的场景下,开发了特别优化的超轻量模式,可以以超低的 overhead 完成全量+增量的数据同步

开发完成后,我们的反复测试修改与验证,完成 MVP 版本的开发。

三、实时物化视图实践

MVP 版本发布后,我们与用户同学一起,进行了基于 Flink 的物化视图试点。

1. 基于多数据源复杂逻辑的 Data Pipeline 实时化

下面是用户的一个真实生产需求:有三张表,分别来自于 TiDB /。SqlServer / Mysql,数据行数分别为千万级 / 亿级 / 千万级,计算逻辑相对复杂,涉及到去重,多表 Join。原有通过离线批处理产生 T+1 的结果表。而用户希望尽可能降低该 Pipeline 的延迟。

由于我们使用的 TiCDC Update 数据尚不包含 -U 部分,故 TiDB 表的整合算法还是采取 Legacy Mode 进行加载。

我们与用户沟通,建议他们以批处理的思路去编写 Flink SQL,把结果的明细数据的数据输出到 StarRocks 中。用户也在我们的协助下,较为快速地完成了 SQL 的开发,任务的计算拓补图如下:

null

结果是相当让人惊喜的!我们成功地在保证了数据准确性的情况下,将原来天级延迟的 Pipeline 降低至 10s 左右的延迟。数据也从原来查询 Hive 变为查询 StarRocks,不论从数据接入,数据预计算,还是数据计算与查询,实现了全面的实时化。另一方面,三张表每秒的增量最大不超过 300 条,且该任务不存在更新放大的问题,所以资源使用相当的少。根据监控反馈的信息,初始化阶段完成后,整个任务 TM 部分只需要使用 1 个 Cpu (on YARN),且 Cpu 使用常态不超过 20%。对比原来批处理的资源使用,无疑也是巨大提升。

2. 数据湖场景优化

正如上文提到的,对于数据同步,我们做了专门的优化。只需要使用专用的 Source 表,就可以一键开启历史数据 + 增量数据数据同步,大大简化了数据同步的流程。我们目前尝试使用该功能将数据同步至基于 Iceberg 的数据湖中,从数据同步层面大幅提升数据新鲜度。

null

四、限制与不足

虽然我们在这个方向的探索取得了一定成果,但是仍有一定的限制和不足。

1. 服务器时钟的隐式依赖

仔细阅读上面算法原理,我们会发现,不论是 SearchTs 的生成还是 Watermark 的生成,实际上最后都依赖了服务器系统的时钟,而非依赖类似 Time Oracle 机制。我们虽然算法实现上引入固有延迟去规避这个问题,但是如果服务器出现非常严重时钟不一致,超过固有延迟的话,此时 watermark 是不可靠的,有可能会造成处理逻辑的错误。

经确认,之家服务器时钟会进行校准操作。

2. 一致性与事务

事实上我们目前这套实现没有任何事务相关的保证机制,仅能承诺结果的最终一致性,最终一致性其实是一种相当弱的保证。就拿上文提到的例子来说,如果其中一张表存在 2 个小时的消费延迟,另一张表基本不存在延迟,这个时候两表 Join 产生的结果其实是一种中间状态,或者说对于外部系统应该是不可见的。

为了完成更高的一致性保证,避免上面问题的产生,我们自然会想到引入事务提交机制。然而目前我们暂时没有找到比较好的实现思路,但是可以探讨下我们目前的思考。

2.1 如何定义事务

事务这个概念想必大家或多或少都有认识,在此不多赘述。如何数据库系统内部定义事务是一件特别自然且必要的事情,但是如何在这种跨数据源场景下定义事务,其实是一件非常困难的事情。还是以上文的例子来展开,我们能看到数据源来自各种不同数据库,我们其实对于单表记录了对应的事务信息,但是确实没有办法定义来自不同数据源的统一事务。我们目前的朴素思路是根据数据产生的时间为基准,结合 checkpoint 统一划定 Epoch,实现类似 Epoch-based Commit 的提交机制。但是这样做又回到前面提到的问题,需要对服务器时间产生依赖,无法从根源保证正确性。

2.2 跨表事务

对于 Flink 物化视图一致性提交这个问题,TiFlink[4] 已经做了很多相关工作。但是我们的 Source 来自不同数据源,且读取自 Kafka,所以问题变得更为复杂,还是上面提到的例子,两张表 Join 过后,如果想保证一致性,不只是 Source 和 Sink 算子,整个关系代数算子体系都需要考虑引入事务提交的概念和机制,从而避免中间状态的对外部系统的发布。

3. 更新放大

这个问题其实比较好理解。现在有两张表 join,对于左表的每一行数据,对应右表都有 n (n > 100) 条数据与之对应。那么现在更新左表的任意一行,都会有 2n 的更新放大。

4. 状态大小

目前整套算法在全量同步阶段的 Overhead 虽然可控,但是仍有优化空间。我们目前实测,对于一张数据量在 1 亿左右的表,在全量数据阶段,需要峰值最大为 1.5G 左右的 State。我们打算在下个版本继续优化状态大小,最直接的思路就是 BulkSource 通知 KeyedCoProcess 哪些主键集合是已经处理完毕的,这样可以使对应的 Key 提早进入全量阶段完成模式,从而进一步优化状态大小。

五、总结与展望

本文分析了基于 Flink 物化视图实现的问题与挑战,着重介绍了处理生成完整的 Changelog DataStream 的算法与实现和在业务上的收益,也充分阐述了目前的限制与不足。

虽然这次实践的结果称不上完备,存在一些问题亟待解决,但是我们仍看到了巨大的突破与进步,不论是从技术还是业务使用上。我们充分相信未来这项技术会越来越成熟,越来越被更多人认可和使用,也通过此次探索充分验证了流处理和批处理的统一性。

我们目前的实现还处在早期版本,仍有着工程优化和 bug fix 的空间与工作 (比如前文提到的两表的推进的 skew 太大问题,可以尝试引入 Coordinator 进行调节与对齐),但是相信随着不断的迭代与发展,这项工作会变得越来越稳固,从而支撑更多业务场景,充分提升数据处理的质量与效率!

特别鸣谢张茄子和云邪老师的帮助与勘误。

引用

[1] http://mp.weixin.qq.com/s/KQH-relbrZ2GUqdmaTWx6Q

[2] http://github.com/ververica/flink-cdc-connectors

[3] http://arxiv.org/pdf/2010.12597.pdf

[4] http://zhuanlan.zhihu.com/p/422931694


更多 Flink 相关技术问题,可扫码加入社区钉钉交流群;

第一时间获取最新技术文章和社区动态,请关注公众号~

null

Flink CDC 系列 - 构建 MySQL 和 Postgres 上的 Streaming ETL
  • 关注
    • qr_code

      微信公众号

      最新前沿最热资讯

    • qr_code

      技术支持钉钉群

      时时刻刻得到帮助

  • TOP