刘阳,张扬扬,周号益
面向流式数据处理系统的高效故障恢复方法
刘阳1,2,3,张扬扬1,2,周号益1,2,4*
(1.北京航空航天大学 大数据科学与脑机智能高精尖创新中心,北京 100191; 2.北京航空航天大学 计算机学院,北京 100191; 3.北京航空航天大学 未来空天技术学院/高等理工学院,北京 100191; 4.北京航空航天大学 软件学院,北京 100191)(∗通信作者电子邮箱haoyi@buaa.edu.cn)
针对流式数据处理系统Flink无法高效处理单点故障的问题,提出了一种基于增量状态和备份的故障容错系统Flink+。首先,提前建立备份算子和数据通路;然后,对数据流图中的输出数据进行缓存,必要时使用磁盘;其次,在系统快照时进行任务状态同步;最后,在系统故障时使用备份任务和缓存的数据恢复计算。在系统实验测试中,Flink+在无故障运行时没有显著增加额外容错开销;而在单机和分布式环境下处理单点故障时,与Flink系统相比,所提系统在单机8任务并行度下故障恢复时间减少了96.98%,在分布式16任务并行度下故障恢复时间减少了88.75%。实验结果表明,增量状态和备份方法一起使用可以有效减少流式系统单点故障的恢复时间,增强系统的鲁棒性。
流式数据处理系统;故障恢复;分布式检查点;状态备份;Apache Flink
大数据时代,随着互联网、物联网等技术的快速发展,诸如工业监控、社交媒体、实时搜索引擎等应用场景产生了海量的数据,并对计算处理有了更严格的要求,需要数据处理系统提供低延迟的实时计算,而对实时计算的需求进一步促进了分布式数据处理系统从批处理(Batch Processing)模式逐步转变为流处理(Stream Processing)模式。
批处理系统通过对输入数据进行采样收集,当数据规模达到设定阈值后利用批处理引擎计算累积的数据,可以反映一段时间内数据的特征,同时还能够保证数据分析结果的正确性,也被广泛用于机器学习、图计算等领域,并采用分布式检查点[1]、反应式故障恢复[2]等技术进行容错。此外,批处理系统的框架相对简单且易于扩展。虽然批处理可以达到很高的吞吐,但在实时性方面难以满足当前大数据背景下各类实时应用的低延迟的需求。
相较于批处理模式,流式系统逐条地对输入数据进行实时处理,可以捕获动态实时数据的最新特征,更快地挖掘数据背后的价值。流式系统通常会对无界数据提供长期稳定且实时的计算处理,可以很好地满足商业公司对数据处理的实时性需求。流式处理分为有状态处理和无状态处理,前者语义丰富可以表达更真实的数据场景,后者的使用场景较为简单。在分布式系统中,有状态的流数据处理完全依赖于前序计算状态,如果在数据处理过程中发生故障,将导致前序状态的丢失,从而导致流计算必须整体重新开始,严重情况下如持续大规模数据输入场景,甚至无法实现状态恢复,故障后系统恢复代价很高。因此,针对有状态流数据处理的故障容错需要系统在计算资源和处理速度上进行一定妥协。但随着新兴应用场景的出现,流式系统对数据处理延迟和备份开销要求越来越严格,当前故障容错机制面临着新的挑战。
目前,主流开源流式系统[3-11]和商业流式系统[12]根据系统设计与能力支持的不同,采用的容错机制不尽相同,针对的应用场景和容错能力也有较大差异。如Storm[10-11]采用消息确认的机制实现容错,但仅支持至少一次语义支持;Spark Streaming[5-9]将流式数据视为一个个小的批数据,利用微批(Micro Batch)以支持流式计算,并复用Spark的血缘容错机制,支持精确一次语义,但对其他的流式特性支持较差。Flink[3-4]采用了一种简化的Chandy‑Lamport分布式快照算法[13-14],在保证精确一次语义的同时,实现轻量级的容错。当一次快照完成之后可视为数据成功处理,从而实现端到端的精确一次语义支持。当故障发生时,所有计算任务整体回滚到上一次快照状态,并重新消费之前的数据。然而这种方法存在一个弊端,即使故障规模较小,甚至是单节点故障,也不得不将所有计算节点进行回滚,故障恢复时间长,并且需要重新计算因回滚丢失的进度。
针对这一问题,本文提出一种基于增量式状态备份的高效故障恢复系统Flink+,通过增量式状态同步实现快速状态备份;利用上游输出缓存和备份数据通路实现故障任务的快速恢复。Flink+基于Flink已有的分布式快照机制,利用已有的计算任务进行增量式的互相备份,一个计算任务在进行自己主计算的同时,也负责备份其他节点的任务状态和计算逻辑。在创建计算图时,备份任务提前建立好与主任务上下游任务的网络连接,以降低故障恢复时任务切换的时间,该备份连接在无故障运行时并不进行传输;当快照时,备份任务增量式地同步主任务的状态,降低状态备份开销。在故障发生后,备份任务立即启动备份计算逻辑,并利用备份网络连接接管上下游数据,实现快速的故障任务切换与恢复。为了进一步细化故障恢复的粒度,采用上游输出缓存机制,在无故障运行时,上游任务的输出会被保留一段时间,以防止下游故障之后需要从数据源头进行重计算,只需要重新消费其上游的输出数据,进一步降低故障恢复时间。
本文的主要工作如下:
1)设计了一种基于增量式状态的快速备份方法,结合快照机制和增量状态备份,实现对系统状态的快速备份。
2)利用上游输出缓存和备份数据通路实现故障任务的快速切换和状态恢复,提高系统对单点故障的处理速度。
3)在开源流式系统Flink中进行了实现和实验验证,实验结果验证了本文方法的可行性和有效性,该方法在无故障运行时没有显著增加额外容错开销,同时实现了非常显著的故障恢复加速效果,加速比可达6~8。
在流式系统中,数据的最小单位是消息,对消息的处理次数保证被称为投递语义(Delivery Semantic),为了更好地理解流式计算,需要首先介绍一下流处理的三种语义:
1)最多一次:对于数据中的每条消息,至多只进行一次处理,如果发生故障,消息会丢失,并且系统不会进行故障恢复计算,而是继续处理后续到达的消息。这种语义放松了系统的计算保证,简化了系统设计,适合能够容忍数据丢失的应用。
2)至少一次:对于每条消息,至少会计算一次。考虑消息已被计算但未被确认的情况,若此时发生故障,系统重启,数据源重新发送未被确认的消息,则会导致对同一条消息的多次计算。这种语义能够保证数据的完整性,但需要上层应用处理数据重复的问题。
3)精确一次:不论是发生故障还是正常运行,对于每个消息,从系统整体的端到端来看都只会处理一次,即一份输入数据对应一份输出数据。这种语义提供最强的数据保证,可以满足对数据有强一致性要求的应用,但增加了流式系统的复杂性。
流式系统的容错机制[15-16]可分为三个类别:主动备份、被动备份以及混合备份[17-18]。
1)主动备份:系统中的计算节点都有一个独立的备份节点,备份节点和主节点拥有一样的资源和计算逻辑,系统正常运行时,二者处理一样的数据流,因此主节点和备份节点的状态可以实现同步,当然备份节点的输出会被丢弃或者缓存起来,这取决于不同的实现方式,只有主节点的输出会传向下游计算节点。当系统中发生节点故障时,自动切换到备份节点,因为主节点和备份节点状态是同步的,只有切换时间代价,没有状态恢复的时间代价。该策略可以实现最低延迟的故障处理,但仅考虑单一节点备份就需要消耗2倍的软硬件资源,对于大型系统来说负担较重。双集群备份是一种典型的主动备份方法。
2)被动备份:对于系统中的每一个节点,定期将节点的状态通过快照或者其他形式保存到备份节点上,备份节点可以获取主节点的计算逻辑,当系统发生故障时,备份节点从最近一次缓存的状态开始恢复,输入数据可以一起放入快照或者缓存在输出队列中。该策略的具体的实现方式较多,但是不可避免的是节点状态存在恢复过程,可以满足对故障恢复时间要求不严格的场景需求,但仍无法满足诸如“双十一”等关键业务的严格需求。被动备份策略往往采用节点冷启动的方案,因此资源需求相对较低。分布式检查点是一种典型的被动备份方法。
3)混合备份[19-22]:该策略综合考虑了上述不同策略的优缺点,通过部分热启动的备份节点实时进行状态同步。系统正常工作时,备份节点会对同步的状态进行恢复计算,发生故障时,主节点计算逻辑就可以直接切换到备份逻辑上。例如,相关工作FP4S(Fragment‑based Parallel State Recovery for Stateful Stream Applications)[23]借鉴链式复制的思想将任务组织成环形一致性哈希(Consistent Hashing)进行互备,建立路由和邻居表来选择状态的地理优先备份,减少网络延迟;同时使用纠错码将任务内存状态分块写入备份节点,在恢复时并行拉取状态块来提高恢复速度,其后续工作SR3(Customizable Recovery for Stateful Stream Processing Systems)[24]更进一步优化了状态备份的选择性。
混合策略可以有效结合各种容错机制的优点,充分利用资源降低开销,加速故障恢复。而开源流式系统目前还是使用单一容错策略居多,因此存在进一步优化的空间。Apache Storm作为最早的流式系统,是原生的流式系统,采用被动备份和消息确认机制来实现容错,但只能提供至少一次投递语义,容错能力薄弱,且消息确认机制导致系统吞吐量不高;Apache Spark的Spark streaming部分属于流处理框架,底层采用弹性分布式数据集(Resilient Distributed Dataset, RDD)来实现流计算,但本质上是“微批”的处理思想,在处理延迟上存在不足(尽管Spark Streaming的底层已经开始向流处理原生框架迁移,但是在适配方面还是略微逊色);Apache Flink目前是最火热的开源流处理框架之一,原生流处理框架使Flink可以实现毫秒级的事务,并且相比Storm又具有更高的吞吐量,但是Flink本身的容错机制也存在一些问题,因此,本文针对Flink系统,提出了一种混合备份方法,设计并实现了Flink+系统,通过主动状态同步与被动计算来实现高效的故障恢复。
Apache Flink属于原生的流处理架构,但Flink也同时支持批处理,它把批处理当作流处理中的一种特殊情况,用流处理来模拟批处理,本文在此只讨论Flink的流处理框架。
在Flink中,所有的数据都被看作流的一部分,这种抽象很接近于现实世界。互联网数据往往都是事件流,从一个数据库转移到另外一个数据库,进行一些操作,生成新的事件等,每个事件还往往伴随着其被处理的时间,而传统的批处理对时间信息不敏感,因此无法通过时间信息获取更多数据信息,这在电商经济中尤为凸显,用户点击和购买对于商家的推荐有明显的影响,并且推荐要实时,否则就可能错失用户。Flink作为最新一代的原生流处理框架,在事件处理时延上可以实现毫秒级别的延迟,其稳定性和可扩展性非常适合大规模集群的数据处理,本文的实验也基于Flink进行。
Flink流处理框架由两类运行时进程构成:JobManager和TaskManager。在后续讨论过程中将前者称之为作业管理器,后者称之为执行管理器。
作业管理器负责协调申请各种资源、对流处理任务建模、执行快照等。调度器(Scheduler)负责调度子任务(SubTask)的运行,快照协调器(Checkpoint Coordinator)负责分布式快照相关的逻辑。执行管理器是任务实际运行的地方,执行管理器中的任务槽(Task Slot)作为最小任务执行单位,由作业管理器申请使用。每个Task Slot同一时刻只能处理一个任务。不同的Task Slot通过本地消息队列或者网络传递数据。Flink采用Akka作为底层高并发的运行时,各组件通过Actor模型通信。
接下来详细介绍Flink目前的容错恢复机制[1]。Flink的容错机制属于被动备份策略,主要通过分布式快照实现,周期性地把系统算子状态备份到远端持久化存储,当系统发生故障后,利用上一次快照的状态重新计算并恢复状态。
Flink的分布式快照算法是基于Chandy‑Lamport算法[11]修改后实现的异步算法[12]。Chandy‑Lamport算法用来对分布式系统状态做快照,把分布式系统的全局状态和链路状态记录下来用以故障恢复或死锁检测等。Flink对其进行修改后,实现了一个轻量级的异步分布式快照算法。
Flink流计算模型中包含Source operator作为数据输入源算子、Transformation operator作为变换算子和Sink operator作为数据输出算子三种算子类型。
Flink系统由一个快照协调器协调全局状态快照,该协调器会周期性触发快照消息给Source operator,该算子在收到消息后,会首先把自身状态备份到快照中,然后生成一个barrier消息传递给下游全部算子。该barrier消息标有对应的序号并且和普通数据共享同一通路,当下游算子处理到barrier时,会触发自身的快照,如果算子有多个输入通道,那么当每个输入通道都接收到对应的barrier后才会触发算子的状态快照,否则输入通道会暂时阻塞该通道的输入数据直到快照开始。下游算子把状态备份到快照后同样生成新的barrier,并且传递给自己的全部下游算子;当Sink operator收到输入通道的全部对应一致序号的barrier后,本次快照完全结束,并且通知快照协调器。之后由协调器把备份的状态发送到持久化存储中。
Flink的这种异步快照方式可以在不影响其他算子正常计算的情况下完成整个执行图的状态备份,将快照和数据流处理完美地融合在一起。
针对上述提到的阻塞情况,如果因为系统负载不均,导致算子某个输入通道被阻塞较长时间会对算子计算造成影响,因此Flink对这种情况设计了非对齐的快照算法:即当第一个输入通道收到barrier后,就立即向下游算子广播该barrier;同时立即开始状态备份,还会把还没有接收到对应标号的输入通道的数据全部备份起来。这种情况下的快照算法需要算子记录输入流的部分信息,与原始Chandy‑ Lamport算法较为相似。这种非对齐的快照会导致备份状态的数据量变大,同时在状态恢复时需要对状态和保存的输入流数据重新计算,增加了恢复时间。
当发生单一节点故障时,Flink系统会计算故障区域,因为流计算算子联系比较紧密,下游算子的故障会导致上游算子停止计算,陷入等待,进而使得整个流计算全部停止。Flink系统首先会把远程存储的快照状态拉取到本地,释放之前计算任务的资源,把之前的计算任务全部取消,然后重新拉起全部计算任务,分配资源,并且把快照状态分配给对应的算子,将数据源消费偏移指向快照时刻的位置。因此,Flink在开启快照后的数据源必须具备重放的能力,以满足故障后重新处理部分输入数据的需求。从Flink目前的容错机制可以明显看出其存在的部分问题:单一节点故障导致整个计算图的重启;状态恢复过程后数据重新计算不可避免。
因为Flink以批量的数据进行快照,相比Storm而言更加轻量,但是不可避免的是在每一次恢复时数据会有回滚计算,而且对于某些场景,快照的周期不能过短,否则会给系统带来较大的负担,影响算子的计算性能,因此这种情况下故障恢复所需要的时间也比较长,可能无法满足场景需求。以研究的应用场景为例,Flink系统可以实现容错,但是无法满足故障后快速恢复,如果将快照周期缩短,会给系统带来较大的运行负担,影响正常的算子计算。Flink提供至少一次和精确一次的处理语义
本文设计的容错方案基于快照技术进行优化。一方面,针对大数据场景下全量状态备份数据量巨大从而导致备份开销大的问题,通过采用增量式的状态备份(将快照间隔内的状态改变量定义为状态的增量变化,即状态增量),减少快照数据传输量,加快快照过程,进而缩短快照周期,从而使得故障后回滚时间变短,加快故障恢复。之所以采用状态增量是因为大数据流计算场景下,任务的状态数据量通常很大,而快照时间间隔内的状态改变量通常相对较小。如果每次快照都把全部状态复制备份,那么相邻两次快照有很多冗余的备份以及网络传输。采用增量状态,就可以在每次快照时只对状态改变量进行备份传输,在远端存储进行全量状态的恢复,可以大幅减小快照时的网络带宽压力。另一方面,针对故障后系统在恢复过程中的时间开销问题,通过采用备份节点以及上游输出缓存技术,实现故障后任务的快速切换,并且直接消费上游缓存的输出数据进行状态恢复,降低故障恢复时间代价,避免单点故障向上游扩散导致系统全局回滚。
将上述容错方案应用到Flink系统,改进后的系统称为Flink+,具体的设计分为以下两个阶段:
1)快照阶段:采用RocksDB作为状态后端的Flink系统,在增量快照时会将每个任务的快照周期内状态改变量发送到持久化存储远端,基于此,可以利用持久化远端存储的增量状态,在备份任务上提前进行重放(Replay),保证备份任务和主任务的检查点状态一致性。因为传输的快照状态主要为周期内改变量,所以网络带宽占用会显著下降。
2)故障恢复阶段:为了加快故障恢复,在系统启动时,提前在备份任务和上下游任务建立数据通路,而不是故障后再新建任务并建立通路。但该备份数据通路只在故障时工作,正常情况下不用于数据交换。故障发生时,通过备份数据通路可以做到快速的任务切换,而备份任务已有上次快照时的状态,可直接利用上游缓存重新计算进行状态追赶,避免计算状态的全局回滚和恢复任务的启动时间。
整个系统架构如图1所示,Flink+将上层应用的流计算转换为对应的工作流图,同时选择部分算子构建备份流图。在每次快照时同步主算子和备份算子之间的状态,构建备份算子和上游算子的数据通路。当有备份的算子发生故障时,系统可以利用备份算子和提前建立的数据通路以及上游输出缓存来快速恢复丢失的状态。
图 1 Flink+系统架构
Flink系统的快照是基于Chandy‑Lamport算法的异步改进版,通过引入barrier消息,实现了在不停止系统正常工作的情况下,完成系统整体状态的备份。但是Flink系统在遇到节点故障后只能通过系统整体回滚到上一次快照状态的方式来恢复系统状态。因此可以将关键节点的状态和输入都备份起来,在故障发生时直接利用备份的信息恢复,这种做法可以控制回滚区域以及加快故障恢复的过程。具体介绍如下:
1)备份节点:本文基于Flink的快照技术,引入了备份节点的概念(在Flink中是备份任务),如图2所示,备份节点拥有主节点的静态资源,主要负责状态同步,不进行计算,运行开销较低。
图 2 基于备份的快照
把快照的状态增量同步给备份节点,备份节点通过合并增量状态与上次同步的状态,使主备份节点的状态在每一次检查点都是一致的;备份节点的数据通路在正常运行时处于关闭状态,当系统某个任务发生故障时,备份任务的数据通路可以立即打开,并基于同步的状态和上游任务缓存的输出进行恢复,从而避免系统整体重启。
2)链式备份:针对备份节点的组织形式,本文将流式处理系统中的有状态任务组织成多条链式结构[25-26],这里借鉴链式复制的思想。假设每个任务连成一条链(其中为用户指定的容错参数,一般取=3)。成链的方法有多种选择,比如考虑到故障问题,一条链的节点由不同机器上的节点组成,可以避免因机器故障导致链上全部节点故障;或者在考虑网络带宽的情况下,将链上的节点组织成为地理上直接连通的节点,减小网络延迟。具体来说,可以根据任务有向无环图(Directed Acyclic Graph,DAG)对成链方式进行选择,对不同的需求适配不同的成链方式,成链方式的选择作为本文的一个后续研究方向。
在无故障运行阶段,链上的每一个节点(即任务)都周期性地向其后继节点(即备份任务)同步任务状态和计算逻辑(仅需同步一次),后继节点作为前序节点的备份节点存在。考虑到Flink目前的三层执行图模式,可以将成链逻辑放在Job Graph或者Execution Graph。因为任务具体执行时分配单位是Execution,所以成链逻辑在Execution Graph可以更加有效地利用分配方式选择成链方式。在无故障运行阶段,可以利用Flink流式系统中流式快照的机制;不同的是Flink快照技术中任务在接收到快照消息后仅将自己的状态写入到持久化存储中,而Flink+系统链式地将状态增量同步到链上的后继节点直到链尾节点。备份任务拥有主任务的计算逻辑但无故障运行时不进行计算,只做状态同步,为了降低内存占用,备份任务采用RocksDB作为状态存储后端。
在无故障运行时,任务会在每一次快照点把状态信息发送给备份任务,备份任务基于主任务的状态信息维护和主任务一致检查点状态。备份任务同时向后继节点发送状态信息,使整个链的检查点状态一致。这样,链上的每个节点都会拥有前序节点的同步状态,方便故障时的恢复计算。
3)上游输出缓存:为了避免故障恢复的时候全局回滚,本方案采用了上游备份机制。如图3所示,在正常运行过程中,任务向下游发送自己的输出后,并不清理本地的这些输出数据buffer,而是缓存下来,使用空间超过内存限制则溢出到磁盘,并在收到快照消息时记录当前输出的offset,当下游节点完成状态同步后,会向上游发送清理输出的消息,此时上游任务清理掉offset之前缓存的输出数据以减少内存占用。当故障发生时,只将故障任务切换到备份任务,并重新消费上游备份的输出,其他任务仍执行之前的计算。
4)消息ID去重:考虑到故障恢复后,备份节点重新消费数据并向下游发送输出,而故障发生之前故障节点可能已经发送过部分相同的数据,此时下游节点则可能会处理相同的数据,从而不满足精确一次的投递语义。本文方案给每个消息都编上全局唯一的ID,并在每个任务中用RocksDB维护一个已处理消息ID的集合,当检测到消息ID在这个集合中时,则直接丢弃不进行处理,为了加快检测速度,可以采用布隆过滤器进行过滤,当布隆过滤器无法判断时再访问RocksDB进行确定。为了减少资源占用,系统会定期清理布隆过滤器。考虑到对于有精确一次投递语义需求的事务,消息ID去重是必须的,而对于较宽松的投递语义不是必要的,且对于事务型处理来说,精确一次的投递语义有多种实现方式,因此本文实验未对消息ID进行测试。
图 3 上游备份
总之,虽然本文方案仍然采用快照的基本机制,但是通过备份节点的方式,利用部分额外资源对主节点状态的同步备份,可以有效节省故障后系统重启计算的代价,缩短快照周期,防止故障发生时不必要的全局数据回滚。消息去重对精确一次语义才会起作用,对于至少一次语义来说是不必要的,因此该功能可以根据上层应用对一致性的需求开启或者关闭。
当单点故障发生时,直接将故障任务切换到备份任务,此时备份任务启动计算逻辑准备计算,同时上游任务将缓存的输出恢复到上一次快照的offset,并向下游发送备份的输出数据,备份任务基于同步的状态重新消费上游输出数据进行状态恢复并向下游输出。由于仅进行任务的切换,单点故障下流式处理可以被快速恢复。
通过提前建立备份任务和上游任务以及下游任务的数据通路,在故障恢复阶段,系统可以快速地将故障节点切换到其链上的后继节点,并重新消费计算图中上游节点的备份输出,恢复流式计算。同时在集群中重启故障节点任务,重启成功后的节点可以采用追赶备份任务状态的方式,在二者状态同步时再次切换;或者可以把重启后的节点作为新的备份节点添加到链的末尾。整个过程如图4所示,备份算子的存在使恢复过程可以在极短的时间内开始,而且不影响上游算子的正常工作。
图 4 故障恢复
在Flink系统中,由于采用RocksDB作为状态存储,备份任务可能同时备份多个节点的状态,进一步还受限于所在机器的内存、CPU等资源,计算可能比较慢,效率相比主节点可能会比较低,有可能触发系统的反压机制,降低系统整体性能。因此本文方案会同时原地重启主任务并接管原有RockDB状态,即和备份任务同步状态,若无法原地重启,则在其他机器上重启,此时该状态为空,可以将其挂到备份任务的链尾。然后主任务通过其前序节点进行状态追赶。当完成状态同步后,在备份任务接收到快照消息并完成状态同步后,将重启的主任务重新恢复成链头,并将计算切换到主任务。至此,整个故障恢复完成。
Flink原始容错过程如图5所示。主任务在计算过程中发生故障,任务管理器在感知到故障后会首先释放故障任务的资源并停止整个执行图的计算,然后开始推导需要恢复的算子区域;之后,重新拉起故障区域的任务,并基于快照存储的上一次状态和可重放数据源进行任务的状态恢复,系统重启后重新消费自上一个快照开始的数据。
可以看出Flink原方案存在的问题在于:单个算子故障导致系统整体重启,并且回滚后系统需整体重新处理自上一次快照的数据。对于简单的流处理任务,系统整体重启的代价相对较小,但是对于较大规模的流理系统,系统算子大规模重启的代价对于实时性来说是不可接受的。
图 5 Flink系统在单点故障发生后的流程
改进后Flink的容错过程如图6所示。本文设计的容错机制分为状态备份、主备份任务切换、上游输出缓存、备份任务状态恢复几个模块。
图 6 Flink+系统在单点故障发生后的流程
Flink的快照机制会把执行图算子状态保存到持久化存储中,以便在故障时拉取用于恢复,本方案基于此,在每次快照做持久化的同时把状态同步到备份任务,使备份任务维持和主任务一样的快照状态。为了方便实现,改进措施直接利用Flink原有的状态恢复过程,在快照时,备份节点进行状态“恢复”,即同步状态。在成链方式上,因为备份节点和主节点之间的联系属于备份层面的,并不是实际流处理的数据通路,因此成链作为在Flink三层图结构中Execution的一个单独抽象来对待,通过记录节点之间的成链关系,在快照时同步状态,在故障时切换,把上游节点的输出切换到备份节点的通路上。同时,因为使用了RocksDB作为状态后端,在快照时,可以利用RocksDB本身的changelog实现增量式的状态备份,即:在每次快照时只发送新增的或者压缩的状态文件,未改变的状态文件不再进行传输。增量形式使快照传输的网络流量大幅降低,后续实验结果也体现了这一点。
备份任务在正常情况下启动后就直接挂起,只在状态同步时工作,不触发任何计算逻辑,不占用CPU资源。当主任务发生故障时,系统检测到后会立即把主任务挂起,把数据通路切换到备份任务上,并且启动备份计算逻辑。这种情况下,备份节点时刻处于热启动状态,但是几乎不占用CPU资的计算资源。
在Flink系统中,正常的流处理执行图中的上下游任务会通过数据通路channel实现数据交换,且上下游任务之间的channel是一一对应关系;发生故障后,备份任务可以通过重新建立和上游任务以及下游任务的数据通路来代替主任务执行计算,但是这一过程需要花费一定的时间,且随着计算图规模变大,花费时间越多。因此,为了实现快速的任务切换,本文的方案会让备份任务和主任务一样,提前把数据通路建立好并通过标记来控制数据的交换。
Flink的数据通路在向下游发送数据时,可以通过添加标记来控制数据是否被实际发送,改进后的系统同样通过一个flag实现数据通路的控制,在正常情况下,该flag值为true,会使得上游算子的数据只发送给主任务,发生故障后,会将flag置为false,使数据可以被备份任务的数据通路接受。
Flink的每个任务会把输出缓存在buffer中,由管理器通知下游算子buffer数据可以被消费,本文基于Flink的buffer,将其缓存下来,在每个快照周期清理一次,仅保存上一次快照期间的输出数据,以满足故障恢复的需求。对于快照时的offset,Flink会一起保存到每一次快照中,因此在故障后恢复状态时,offset会自动指向上一次快照时的输出位置;改进措施则是在备份节点中同步记录该offset值,在故障时直接使用备份节点的offset。如果buffer使用的内存超过了限额,Flink会使用磁盘来缓存。有了上游输出备份,在故障后下游算子可以直接消费上游算子缓存的数据恢复状态,而不需要从头开始消费数据,很大程度上减弱了故障的影响。Flink系统本身提供了从持久化存储中拉取到本地的状态文件来恢复状态的API,同时,因为Flink的不同模块的通信由Akka提供,两个任务之间没有直接通信方式,均通过执行管理器来调度;因此为了方便实现状态同步,在系统设计中,基于上述API进行改进,结合快照协调器的消息,当快照结束后,备份任务收到执行管理器的消息便开始从持久化远端拉取对应的状态文件并在本地恢复。这种方式比主任务直接把快照状态文件传输给备份任务要花费更多的时间,但是实现更加简单。
故障后备份任务基于上游缓存的数据和之前同步的主任务状态来进行状态恢复,但是此时上游算子的计算并没有被停止,系统整体仍处于运行状态,故障算子则在恢复后从故障时间点开始重新处理上游缓存的数据,追赶备份任务的状态。考虑到这一点,系统可能会因为故障算子的重复处理产生反压问题,但考虑到故障能够被很快恢复,反压问题可能并不严重。
Flink+在面临算子子任务崩溃时的恢复过程整体变得较为复杂,但是避免了整个执行图的重启,只对故障的算子进行重启,同时备份算子可以立即切换过来执行计算任务,加快了恢复过程
为了验证本文提出的轻量级故障恢复方案,在单机模式和分布式环境下分别对Flink系统(1.13.0版本)和改进后的Flink+系统进行测试。实验采用WordCount任务来测试流式系统的计算、备份、故障恢复能力,数据源由英文版《哈利波特》构成。
1)实验目的。针对流式系统备份开销问题和故障恢复延迟问题,本文设计一种基于增量状态备份的快照容错方案,并在Apache Flink系统上实现了原型,本实验的目的是为了验证该方案的可行性和有效性,探究增量状态对快照速度的影响以及上游输出备份和状态备份对故障恢复速度的影响。
2)实验内容。利用WordCount流式计算任务,对Flink+系统的故障容错能力进行验证。实验对单机和分布式集群下的Flink系统和Flink+系统进行对比探究,基于不同任务并行度,测试了Flink和Flink+的故障恢复速度。为了验证改进部件对Flink系统本身的影响可以忽略不计,实验也对同一任务下两个系统的CPU占用率和内存占用率进行了测试。
3)单点故障。Flink中最小运行单位是Execution任务,会被分配给一个Java虚拟机(Java Virtual Machine, JVM)线程执行,本文将其在运算过程发生异常导致自身崩溃的问题定义为单点故障。
4)机器故障。单个机器上可能运行多个JVM,每个JVM可以运行多个执行管理器(TaskManager),每个执行管理器可以调度运行多个Execution任务。机器的故障会导致多个JVM的故障,进而导致多个单点故障。机器故障可以具化为多个单点故障,因此机器故障导致的多个单点故障如果相互无关,那么可以被视为多个单点故障的处理;如果多个单点故障有关系,可以把有联系的单点故障认为是一个统一的大的单点故障,其恢复流程和普通的单点故障基本一致。因此实验过程只针对单点故障。
实验用到的物理机均为16核32线程的Linux机器,CPU为Intel Xeon E5‑2650,主频为2.00 GHz,每台机器的内存为256 GB,操作系统均为Ubuntu 16.04.10。
Flink系统的部署主要分为Taskmanager和Jobmanager两个部件:Jobmanager负责整个流式任务的调度执行;Taskmanager负责最小粒度单位“并行任务”的执行,其内部有slot,是用户代码实际执行的地方。
单机实验模式下,实验环境由1台16核32线程的机器构成,Flink组件由1个Jobmanager和16个Taskmanager构成,每个Taskmanager仅包含一个slot执行单位。
分布式集群实验模式下,实验环境由8台16核32线程的机器构成,Flink组件由1个Jobmanager(部署在控制机器上1上)和16个Taskmanager(每台机器部署2个Taskmanager)构成,同样的,每个Taskmanager仅由一个slot构成。
这里每个Taskmanager仅分配一个slot是为了使任务之间通过网络通信而不是Taskmanager内部的内存空间通信,尽可能模拟真实大数据场景下的网络通信场景。
从表1的实验结果可以看出,基于本文设计改进后的Flink+系统在单机模式下的整体表现都明显优于原Flink系统,其在系统故障后的恢复时间可以达到数十毫秒到数百毫秒级,而Flink系统面对单点故障需要注销原有资源,重新拉起任务,恢复时间在秒级。
表 1 单机模式下Flink和Flink+系统恢复时间对比
为了探究并行度对故障恢复的影响,本文针对不同的并行度进行了实验测试。实验发现,对于同一任务的不同并行度,随着任务并行度的增大,恢复时间减小比例基本上在逐渐增大,说明改进后的系统在高并行度下表现更加良好,相较于原Flink系统,更能适应并行化任务。观察实验结果还能发现随着并行度的增加,两个系统的故障恢复时间都呈现先减小后增大的趋势。
造成这一现象的主要原因:单机下资源总数受限,一方面,提高并行度可以充分利用CPU的并行能力加快故障恢复,缩短恢复时间;而另一方面,并行任务增多意味着CPU负荷变大,一定程度上会延长恢复时间。当并行度逐渐接近CPU核数时,提高并行度带来的增益被多任务的负荷抵消,甚至产生负增益,导致故障恢复时间增加。因此,可以看出流式任务的并行度并不是越大越好,适中的并行度有利于故障后的恢复。
考虑到实际环境中流计算任务往往是分布式进行,本文对分布式环境下的Flink+系统和Flink系统的故障恢复表现进行了实验测试。从表2的实验结果可以看出,改进后的Flink+系统在分布式环境下同样表现优异,基本能够维持在200 ms以内的故障恢复时间。但是因为分布式环境下不可避免的网络通信延迟,系统在高并行度任务上并没有单机模式那样优异的表现,本研究认为这主要是网络延迟带来的影响。
表 2 分布式环境下Flink和Flink+系统恢复时间对比
因为单机模式和分布式集群的区别主要在于Taskmanager的部署位置不同,二者对于资源的使用大致相同,而实验中给每个Taskmanager仅配置了1个slot资源,因此在分布式环境下当并行度达到32时,并行任务之间需要共享slot资源,此时,并行任务带来的负增益大于并行计算带来的正增益,因此整体恢复时间变长,正如实验结果显示,在并行度为32时,Flink系统和改进后的Flink+系统表现均有所下降。
为了验证本文的容错机制对于流式系统本身并没有较大的影响,即可以在满足轻量性的同时加快系统在故障时的恢复,本文针对Flink系统和改进后的Flink+系统进行了CPU和内存测试实验。
实验由2台16核32线程的机器构成的集群进行,机器1负责kafka数据读入和Jobmanager执行,机器2负责Taskmanager任务执行。通过对机器2上运行的Taskmanager的检测,实验获取了执行同样任务处理同样数据情况下两个系统的CPU和内存占用率结果,为了方便获取数据,本实验在较小规模下进行,仅在机器2上开启了两个Taskmanager进行测试。
如图7所示,Flink和Flink+的整体执行过程几乎完美吻合,二者在任务过程中占用率几乎相等,CPU占用率维持在200%~300%,这说明改进后的Flink+并没有给系统带来较大的CPU负担。通过计算两个系统的平均CPU占用率可得:Flink系统的平均CPU占用率为264%,Flink+系统的平均占用率是258.8%,考虑到其他进程以及CPU占用率的波动,可以认为二者占用率基本一致,即本文的改进没有给Flink系统带来较大的负担。
在内存方面,二者的占用率也维持在相同的水平,实验结果显示并无差距。
综合实验结果,可以得出如下结论:本文提出的轻量级快照容错方案可以大幅减少故障恢复时间;流式任务的并行度太高会导致故障后恢复时间增加;本文提出的方案不会给流式系统本身带来显著CPU压力和内存压力。
图 7 Flink与Flink+的CPU占用率对比
本文针对流式系统分布式快照机制故障恢复慢的问题,提出了一种基于增量状态备份的故障恢复方法Flink+,通过增量式状态同步实现了快速状态备份,通过上游输出缓存与提前网络连接进一步细化了故障恢复粒度并实现了快速故障恢复。实现结果表明,Flink+能够在不显著增加额外CPU、内存开销的同时实现6~8倍的故障恢复加速。
[1] ZHANG Y Y, LI J X, ZHANG Y M, et al. FreeLauncher: lossless failure recovery of parameter servers with ultralight replication[C]// Proceedings of the IEEE 41st International Conference on Distributed Computing Systems. Piscataway: IEEE, 2021: 472-482.
[2] ZHANG Y Y, LI J X, SUN C G, et al. HotML: a DSM‑based machine learning system for social networks[J]. Journal of Computational Science, 2018, 26: 478-487.
[3] CARBONE P, KATSIFODIMOS A, EWEN S, et al. Apache Flink: stream and batch processing in a single engine[J]. Bulletin of the IEEE Computer Society Technical Committee on Data Engineering, 2015, 38(4): 28-38.
[4] CARBONE P, EWEN S, FÓRA G, et al. State management in Apache Flink: consistent stateful distributed stream processing[J]. Proceedings of the VLDB Endowment, 2017, 10(12): 1718-1729.
[5] GARCÍA‑GIL D, RAMÍREZ‑GALLEGO S, GARCÍA S, et al. A comparison on scalability for batch big data processing on Apache Spark and Apache Flink[J]. Big Data Analytics, 2017, 2: No.1.
[6] MENG X R, BRADLEY J, YAVUZ B, et al. MLlib: machine learning in Apache Spark[J]. Journal of Machine Learning Research, 2016, 17: 1-7.
[7] ZAHARIA M, DAS T, LI H Y, et al. Discretized streams: fault‑tolerant streaming computation at scale[C]// Proceedings of the 24th ACM Symposium on Operating Systems Principles. New York: ACM, 2013: 423-438.
[8] ZAHARIA M, CHOWDHURY M, DAS T, et al. Resilient distributed datasets: a fault‑tolerant abstraction for in‑memory cluster computing[C]// Proceedings of the 9th USENIX Symposium on Networked Systems Design and Implementation. Berkeley: USENIX Association, 2012: 15-28.
[9] ZAHARIA M, DAS T, LI H Y, et al. Discretized streams: an efficient and fault‑tolerant model for stream processing on large clusters[C]// Proceedings of the 4th USENIX Workshop on Hot Topics in Cloud Computing. Berkeley: USENIX Association, 2012: No.10.
[10] TOSHNIWAL A, TANEJA S, SHUKLA A, et al. Storm@Twitter[C]// Proceedings of the 2014 ACM SIGMOD international Conference on Management of Data. New York: ACM, 2014: 147-156.
[11] IQBAL M H, SOOMRO T R. Big data analysis: Apache Storm perspective[J]. International Journal of Computer Trends and Technology, 2015, 19(1): 9-14.
[12] NOGHABI S A, PARAMASIVAM K, PAN Y, et al. Samza: stateful scalable stream processing at LinkedIn[J]. Proceedings of the VLDB Endowment, 2017, 10(12): 1634-1645.
[13] CHANDY K M, LAMPORT L. Distributed snapshots: Determining global states of distributed systems[J]. ACM Transactions on Computer Systems, 1985, 3(1): 63-75.
[14] CARBONE P, FÓRA G, EWEN S, et al. Lightweight asynchronous snapshots for distributed dataflows[EB/OL]. (2015-06-29)[2021-12-15].https://arxiv.org/pdf/1506.08603.pdf.
[15] 段泽源. 大数据流式处理系统负载均衡与容错机制的研究[D]. 北京:华北电力大学, 2017: 28-30.(DUAN Z Y. Research on load balancing and fault tolerant mechanism of big data stream processing system[D]. Beijing: North China Electric Power University, 2017:28-30.)
[16] 孙大为,张广艳,郑纬民. 大数据流式计算:关键技术及系统实例[J]. 软件学报, 2014, 25(4):839-862.(SUN D W, ZHANG G Y, ZHENG W M. Big data stream computing: technologies and instances[J]. Journal of Software, 2014, 25(4): 839-862.)
[17] LI H L, WU J, JIANG Z, et al. Integrated recovery and task allocation for stream processing[C]// Proceedings of the IEEE 36th International Performance Computing and Communications Conference. Piscataway: IEEE, 2017: 1-8.
[18] LI H L, WU J, JIANG Z, et al. Task allocation for stream processing with recovery latency guarantee[C]// Proceedings of the 2017 IEEE International Conference on Cluster Computing. Piscataway: IEEE, 2017: 379-383.
[19] AKIDAU T, BALIKOV A, BEKIROĞLU K, et al. MillWheel: fault‑tolerant stream processing at Internet scale[J]. Proceedings of the VLDB Endowment, 2013, 6(11): 1033-1044.
[20] GUO J, AGRAWAL G. Smart Streaming: a high‑throughput fault‑ tolerant online processing system[C]// Proceedings of the 2020 IEEE International Parallel and Distributed Processing Symposium Workshops. Piscataway: IEEE, 2020: 396‑405.
[21] LIN C F, ZHAN J J, CHEN H H, et al. Ares: a high performance and fault‑tolerant distributed stream processing system[C]// Proceedings of the IEEE 26th International Conference on Network Protocols. Piscataway: IEEE, 2018: 176-186.
[22] VENKATARAMAN S, PANDA A, OUSTERHOUT K, et al. Drizzle: fast and adaptable stream processing at scale[C]// Proceedings of the 26th Symposium on Operating Systems Principles. New York: ACM, 2017: 374-389.
[23] LIU P C, XU H L, DA SILVA D, et al. FP4S: fragment‑based parallel state recovery for stateful stream applications[C]// Proceedings of the 2020 IEEE International Parallel and Distributed Processing Symposium. Piscataway: IEEE, 2020: 1102-1111.
[24] XU H L, LIU P C, CRUZ‑DIAZ S, et al. SR3: customizable recovery for stateful stream processing systems[C]// Proceedings of the 21st International Middleware Conference. New York: ACM, 2020: 251-264.
[25] RENESSE R van, SCHNEIDER F B. Chain replication for supporting high throughput and availability[C]// Proceedings of the 6th Symposium on Operating Systems Design and Implementation. Berkeley: USENIX Association, 2004: 91-104.
[26] TERRACE J, FREEDMAN M J. Object storage on CRAQ: high‑ throughput chain replication for read‑mostly workloads[C]// Proceedings of the 2009 USENIX Annual Technical Conference. Berkeley: USENIX Association, 2009: No.11.
Efficient failure recovery method for stream data processing system
LIU Yang1,2,3, ZHANG Yangyang1,2, ZHOU Haoyi1,2,4*
(1,,100191,;2,,100191,;3,,100191,;4,,100191,)
Focusing on the issue that the single point of failure cannot be efficiently handled by streaming data processing system Flink, a new fault‑tolerant system based on incremental state and backup, Flink+, was proposed. Firstly, backup operators and data paths were established in advance. Secondly, the output data in the data flow diagram was cached, and disks were used if necessary. Thirdly, task state synchronization was performed during system snapshots. Finally, backup tasks and cached data were used to recover calculation in case of system failure. In the system experiment and test, Flink+ dose not significantly increase the additional fault tolerance overhead during fault‑free operation; when dealing with the single point of failure in both single‑machine and distributed environments, compared with Flink system, the proposed system has the failure recovery time reduced by 96.98% in single‑machine 8‑task parallelism and by 88.75% in distributed 16‑task parallelism. Experimental results show that using incremental state and backup method together can effectively reduce the recovery time of the single point of failure of the stream system and enhance the robustness of the system.
stream data processing system; failure recovery; distributed checkpoint; state backup; Apache Flink
This work is partially supported by National Natural Science Foundation of China (U20B2053, 61872022), Open Project of State Key Laboratory of Software Development Environment (SKLSDE‑2020ZX‑12).
LIU Yang, born in 1999, Ph. D. candidate. His research interests include distributed systems, graph processing systems.
ZHANG Yangyang, born in 1991, Ph. D. candidate. His research interests include distributed systems, machine learning, graph processing.
ZHOU Haoyi, born in 1991, Ph. D., lecturer. His research interests include big data system, machine learning.
1001-9081(2022)11-3337-09
10.11772/j.issn.1001-9081.2021122108
2021⁃12⁃15;
2022⁃02⁃27;
2022⁃03⁃04。
国家自然科学基金资助项目(U20B2053, 61872022);软件开发环境国家重点实验室开放课题(SKLSDE‑2020ZX‑12)。
TP311.5
A
刘阳(1999—),男,山西大同人,博士研究生,CCF会员,主要研究方向:分布式系统、图计算系统;张扬扬(1991—),男,河北保定人,博士研究生,CCF会员,主要研究方向:分布式系统、机器学习、图计算;周号益(1991—),男,四川德阳人,讲师,博士,CCF会员,主要研究方向:大数据系统、机器学习。