1. 背景
在当前全行业、全公司降本增效的背景下,哔哩哔哩也在积极推进实时、一流的业务资源整合,向云原生架构迁移,统一资源池和调度,提高资源利用效率。 但真正的问题是,不同业务场景下资源的规格和需求是不同的。
由于业务性质的原因,业务资源池一般只有很强的算力,基本不具备存储和IO能力。 Flink 虽然是一个计算引擎,但由于其有状态特性,在很多计算场景中对存储和 IO 都有很强的要求,因此实时资源池也具有很强的存储和计算能力。 考虑到大数据存算分离的整体发展趋势,我们尝试用存算分离来改造Flink,核心工作是statebackend的远程化。
二、痛点
flink 中的 statebackend 用于存储任务状态。 在使用上,分为operatorstatebackend和keyedstatebackend,operatorstatebackend一般存储一些与计算逻辑本身无关的数据,比如Kafka的偏移量,比较小,不会受到数据规模和计算逻辑的影响,而keyedstatebackend则相反,存储的是与计算逻辑强绑定的状态数据, 如agg双流连接的中间结果大小受数据规模和计算逻辑的影响。
哔哩哔哩上有4000+个流式计算任务,其中95%是SQL任务,50%有状态,数百个任务状态大小超过500GB。 目前默认的statebackend使用rocksdbstatebackend,在实现中支持增量快照,可以减少大量重复的状态数据上传到文件系统,而历史的flink集群机器配置了相对高性能的磁盘,可以支持将TaskManager中的大状态本地存储到rocksdb,当任务是checkpoint时, 增量状态文件上传到文件系统,无论 keyedstate 大小是 0 还是 TB,文件系统都可以支持所有流式计算任务。现有环境将存在以下两个痛点:
1.整体磁盘利用率较低
所有 Flink 机器都配备了高性能的大盘,可以支持超大状态任务的健康运行,并且我们默认配置了状态的 retentiontime,80% 的任务在 100 GB 状态内是小的或无键的,TaskManager 运行的机器整体利用率明显偏低, 而如果机器没有处于大型状态,那么当前机器的高性能大盘显然是浪费的。
2.大任务重新缩放很慢
在重新扩缩容超大状态时,任务会先从文件系统中移除状态数据** 当状态处于TB级别时,状态数据的重新分发成本比较大,恢复时间大约需要半小时,用户体验会很差,容易增加引擎端的duty成本。
3. 远程状态后端
解决上述痛点,一是状态数据需要实时存储在远程服务中,减少Flink集群对磁盘的强依赖,实现存储和计算分离,这也符合云原生架构的演进目标另一种是状态数据可以存储在密钥组单元中,以避免数据重新分发操作。
在与分布式存储团队沟通后,其自研的泰山(B站分布式KV存储[1])存储基本可以满足我们的需求,泰山存储基于RockSDB和SparrowDB的改造,采用Raft一致性协议来保证多副本数据的一致性,构建一个高可靠、高可用、高性能、高扩展的存储系统。 泰山存储提供了 j**a 的 Put Get del Scan 等 API,可以具备快照功能,在 API 层面与 RocksDB 的功能基本一致,并且可以支持横向扩展和升级,完全可以满足 Flink 的需求,经过多次沟通和功能支持,具备了构建 TaishanStatebackend 的条件。
1.状态切换保证
在 KeyedStateBackend 中,每条要计算的数据都进入子任务,并根据数据的键进行传递"mathutils.murmurhash(key's hashcode) %maxparallelism"计算一个id作为keygroupid,可以保证同一key的数据在同一个子任务中计算,不会造成计算引擎中乱序的现象,maxparallelism为最大并行度,任务启动后其值不会改变。 根据 keygroupid 将每条数据归属于一个键组分片,键组分片总数为 maxparallelism,每个分片是不能修改的,因为重启前后不允许修改 maxparallelism 的值,当任务重新扩容时,键组下的数据会为状态移动, 如下图所示。
泰山存储有分片的概念,类似于 Flink 的 keygroup,泰山的分片有 ** 和合并的能力,我们只需要同意 Flink 的泰山表禁用了这个功能。 Flink 的另一个关键点是 CheckPoint,RocksDBsDBSuptendEnd 的增量 checkpoint 是在同步过程中对 RocksDB 做一个快照,异步进程在任务重启时将修改后的 SST 文件上传到文件系统进行恢复,Taishan Storage 提供了在每个分片上创建和恢复快照的能力,而 Flink 在做 checkpoint 时可以做 checkpoint 在每个子任务中, 在 keygroupRange 中从头到尾进行快照,snapshotId 与 Flink 的 checkpointid 相同。
2.Statebackend 拓扑设计
Keyed StateBackend 负责核心状态数据管理,托管状态分为 keyed state 和 priority queue state。 键控状态有基本的 internalvaluestate 和 internalmapstate,以及复杂的 internalliststate、internalaggregatingstate、internalReducingState 和 internalfoldingState,不同类型的状态会应用于不同 SQL 场景下的算子优先级队列状态需要由 InternalPriorityQueue 实现。 Flink Flow Computation 的 checkpoint 机制是任务在过程中运行时可靠性的基石** 如果发生故障,可以将 checkpoint 的信息恢复到故障前的某个状态,然后从该状态恢复任务的运行,checkpoint 的执行策略根据 snapshotstrategy 的实现确定, 并且根据 restoreoperation 的实现确定 checkpoint 的恢复策略 statebackend 的整体拓扑如下图所示。
3.台山State后端架构设计
TaishanStateBackend是根据上述规则和泰山存储做出以下约定的:
taishan 表的 keygroupid 和 shardid 是一对一映射的。
taishan 表中的分片数量与 Flink 的 maxparallelism 相同,并且总是不可避免地被合并。
当任务首次启动时,每个具有 keyedstate 的算子只会创建一个 taishan 表。
一个运算符下可能有多个状态,发送的 KV 数据的 K 以 columnfamily 前缀为前缀。
当 flink 作为 checkpoint 时,会为 Taishan 中的每个分片创建一个快照,并且每个分片的 snapshotid 是相同的。
Flink 在做任务恢复时,本质是根据 snapshotid 恢复泰山的每个分片。
修改远程存储架构后,主要优点如下:
1)检查点更轻
RocksDBsNationalBackend 的状态存储在子任务的本地 RocksDB 中,只有在做 checkpoint 的时候,才会根据 snapshotstrategy 决定是以增量还是全量模式上传到文件系统,上传的内容分为元数据和 keyedstate 内容,修改为 taishanstatebackend 时, 只需要将元数据元数据文件上传到文件系统中,单个分片的快照过程就可以在毫秒级完成。
2)存储和计算分离
使用TaishanStateBackend后,有状态算子不需要节点机拥有高性能磁盘,状态数据存储在远程泰山系统中,减少了容器机对磁盘的强依赖性,从而达到存储和计算分离的效果。
3) 加速任务重扩
在 RocksDB StateBackend 中对任务进行扩容或扩缩容时,由于子任务下对应的 KeyGroupRange 的起始和结束发生变化,需要根据子任务之间的 keygroupid 前缀对本地 RocksDB 实例进行状态寻道和 batchput 操作,从而根据密钥组重新分配状态数据。使用 TaishanStateBackend 后,该任务无需迁移状态数据进行伸缩操作,因为每个密钥组都与分片一一对应,只需要修改密钥组范围的开始和结束。
四、优化
在 rocksdbstatebackend 的前期,我们已经对 read、write、delete、readnull、seek 等请求的状态的耗时、请求量、数据包长度等做了指标统计。 通过对在线运行的任务指标的分析可以看出,状态每秒读写请求数最高可达100万,每秒10万个读写请求任务有上百个,在group agg和window agg场景下,每个请求包的大小一般为几十字节, 而即使在 ZSTD 压缩之后,联接场景中单个状态的值仍将存在于数百 kb 中。
在功能测试中发现,使用TaishanStateBackend的任务在使用相同资源时CPU负载较高,并且发现当状态下的RPC请求数量较大时,网络消耗的CPU占比较大。 我们状态下的数据从本地存储转换为远程存储后,每个请求都是由网络 RPC 请求发出的,虽然 RocksDB 中没有堆外内存的消耗,但是对网络的依赖性会很大,当 RPC 请求达到一定阈值时,网络必然会成为当前场景下的瓶颈。 为了解决这个问题,我们选择在状态和远程KV存储之间增加一层缓存层,以减少对网络的请求读取。 整体结构如下,下面将逐步说明。
1.写入优化
1) 优化写入
而不是将缓存中的结果写入远端,或者在put remove时直接与远端交互,将put远程请求放入当前子任务的blockingqueue中,并使用put线程来消费put请求量达到一定阈值时,将请求量刷新到远程KV存储中, 或者当 Flink 作为检查点时触发刷新操作。添加指标和压测后发现,当最大批量大小设置为 800 时,对远端 KV 存储的 PUT 请求只会增加 2-4 倍,整体网络写入请求量会减少 100 倍,提高了状态的写入速率。
2.读取优化
1) 缓存加速读取
而不是将缓存中的结果写入远端,或者在put remove时直接与远端交互,将put远程请求放入当前子任务的blockingqueue中,并使用put线程来消费put请求量达到一定阈值时,将请求量刷新到远程KV存储中, 或者当 Flink 作为检查点时触发刷新操作。添加指标和压测后发现,当最大批量大小设置为 800 时,对远端 KV 存储的 PUT 请求只会增加 2-4 倍,整体网络写入请求量会减少 100 倍,提高了状态的写入速率。
2) Readnull 优化
在实践过程中,会发现以下两种场景都存在较大的 readnull 请求,即缓存获取的结果为 null,从远端获取的结果也是 null。
当任务的 key 稀疏时,通过指标会发现大量的 readnull 请求,尤其是在去重和窗口 agg hop 场景下。
当任务的 key 按天或按小时周期性变化时,会产生大量的 readnull 请求,尤其是在 group agg 场景下。
由于 readnull 请求较多,经过调查,我们最初尝试使用 Hadoop 的 bloomfilter 来过滤 readnull 请求,当 Flink 进行 checkpoint 时,将 bloomfilter 结果写入远程存储系统,在任务恢复时可以恢复任务。 但是,无论如何调整 BloomFilter 的系数和容量,随着时间的推移,BloomFilter 的过滤效果越来越低,误报率越来越高,无法解决 ReadNull 的根本问题。
我们内部的 flink bsql 任务目前对状态有默认的 TTL 设置为 24 小时,当超过 24 小时未访问状态的数据时,状态会过期,然后在 rocksdb 做 compaction 时删除,基本可以满足大部分用户的实际使用。
经过团队和用户的排查总结,发现在窗口 5 10 分钟和双流加入 1 2 小时的用户业务行为中存在很多场景,所以 24 小时 TTL 对于这类情况非常丰富,在这种情况下,我们可以假设大多数场景下数据都不会有延迟, 所以任务在5分钟或1小时内的实际有效密钥会比24小时内少很多。在这种情况下,可以选择为缓存分配一定比例的内存,以创建一个独立的 kv 缓存使用逐出策略删除数据时,可以自定义删除逻辑,删除逻辑是判断当前时间与失效的绝对时间的对比, 如果逐出数据不满足当前时间,小于过期时间,则会抛出OOM异常,并添加相关内存或调整KV缓存比例。
由于 GC 对堆内存的限制,以及研究使用堆外内存作为缓存的默认选择,只有在使用堆外内存时才允许使用这种独立的 KV 缓存,我们称之为 offheapbloomfilter,下面将解释堆内外的选择。 OffHeapBloomFilter 会将所有有效键缓存在内存中,充当 bloomfilter,从而过滤大量 readnull 请求。
当任务重启时,OffHeapBloomFilter 会在第一个 TTL 时间内过期,但它会记录状态键前后的数据,OffHeapBloomFilter 会在第一个 TTL 时间后开始正常工作,可以保证 Bloomfilter 的准确性和有效性。
由于窗口 agg 的键控状态使用优先级队列状态按照窗口的顺序进行清理,因此其键控状态默认没有 TTL。CTL 是 Assigner 的窗口时间加上允许的延迟时间,一般用于做任务重启、失败或累加的缓冲时间,避免因任务异常时间过长导致数据结果不满意的问题,导致状态为 null。同样,我们也分别对间隔加入和两种延迟双流加入的键控状态提供了相应的支持,TTL设置为左右流的最大时间加上允许的延迟时间。
3.内存模型优化
在缓存读优化中,缓存使用的内存是任务堆的堆内存,与用户JVM的堆内存共享,缓存使用高性能缓存库Caffeine。 当一个算子至少有一个状态时,创建一个泰山表,子任务会根据它有多少个键控状态来决定创建多少个咖啡因缓存,如下图所示:
这样的结构会遇到两个问题:
创建的咖啡因缓存数等于 n*m,其中 n 是当前 TaskManager 中的插槽数,m 是键控状态数。 由于流式计算前后可能存在AGG或filter,缓存前后缓存处理的状态数据量不同,导致缓存难以根据时间或数量配置逐出策略,且缓存对象之间不共享,频繁更新的缓存内存无法充分利用。
研究发现,经过G1参数[2]的多次调整后,GC时间仍然很高,并且不时出现GC高耗时抖动的现象,这使得任务不那么流畅,影响了用户体验。
在研究了 RocksDBSdBStateBackend [3] 的堆外内存机制和市面上的堆外内存框架后,我们选择使用 Off Heap Cache(OHC)框架 [4] 来制作缓存,并生成了如下结构。 Caffeine Cache 被 OHC Cache 取代,内存从 Task HEAP 转换为 Managed OffHeap,多个子任务共享当前插槽分配的 OHC Cache,并且分配的缓存对象唯一共享,以上两个主要问题在这样的替换后可以完全解决,执行效率基本符合预期。
由于早期泰山存储中没有使用 ColumnFamily 的概念,而 KeyGroupID 作为 API 级别的参数放在 API 中,所以原 RocksDB 的 K 中的 KeyGroupID 被 ColumnFamily 替换,并且由于 OHC 缓存在堆外共享,并且可能存在多个状态,具有相同的 columnfamily 和 key, 所以为了防止算子影响彼此的状态数据,将算子的前八位作为前缀添加到泰山状态的K中。
此外,对于 OHC 缓存,我们默认选择 OffHeapLinkedLrumap,其内存模型结构如下图所示,并进行了一些适配:
更改了 hashtablesize 并禁用了 map 的自动重新哈希,以防止键迭代器调用导致数据不准确。
修改了 LRU 的逐出逻辑,ohc 在 cacheserializer 中新增了 elementcouldremove 方法,在 OHC 逐出数据之前会调用该方法判断是否可以移除数据,而 flink 端 Serializer 上的值只需要实现判断当前时间是否大于过期时间, 即可以与异步延迟时间刷新对齐,确保无效数据已写入远程存储,另一个用于 offheaplumfilter 中数据失效的逐出策略逻辑。
虽然OHC的超时触发逻辑可以通过指标准确查看缓存中堆外内存的实际使用情况,但对任务的吞吐量有负面影响,因此默认关闭超时功能。
5. 现在和未来
目前,哔哩哔哩自 2022 年 11 月初开始,已逐步切换了 100+ 个具有 keystate 的在线任务,由于 RPC 网络开销的增加,整体资源使用量略有增加,但存储计算分离、加速任务扩容的目的已基本实现,整体符合预期。
当然也有一些特殊情况,需要优化的主要项目如下:
高QPS场景:在一些业务场景中,密钥非常稀疏,缓存命中率较低,当状态数据量足够大时,offheapbloomfilter 的内存要求过高,在降本增效的环境下难以实现,关闭 offheapbloomfilter 会导致大量 readnull 现象, 一般表现为单个子任务的grPC请求QPS过高,网络压力大。
大键值场景:如果双流加入场景下存在较大值,在多字段去重场景下存在较大密钥,运行一段时间后会出现写卡顿现象,影响任务的健康运行,这也是存储团队下一阶段需要做进一步联合调试和优化的地方。
状态分层存储:目前缓存的实现是使用堆外内存作为存储介质,在上述高QPS和大键值场景下,由于内存空间限制,缓存命中率会下降。 未来,我们计划参考 Flink Forward Asia 2022 中提到的分层状态后端的思想,同时使用机器上的磁盘和内存作为缓存加速资源,同时保持远程存储上的状态数据完整,形成一套分层状态存储架构,不仅可以解决单个缓存介质的容量限制, 而且在混合零件的情况下,也更有效地提高了机器的资源利用率。接下来,Flink 的实时任务将通过 K8S 平台和 ** 业务进行统一的混合和管理,并且随着哔哩哔哩的 ** 业务正在积极推进“无盘”的转型,本地闲置的 SSD 磁盘资源可以更好地被 Flink 任务用作分层状态存储的缓存资源, 进一步提高混合部件技术下机器资源的利用率。
在进行上述优化项的同时,将进一步提升taishanstatebackend的覆盖率,并选择合适的任务部署在混合部分集群中,最终达到默认激活的效果。
引用
作者丨Zhang**&Cao Jie**丨***哔哩哔哩科技(ID:Bilibili-TC) DBAPLUS社区欢迎技术人员投稿,投稿邮箱:editor@dbapluscn