流計(jì)算又稱實(shí)時(shí)計(jì)算,是以Map-Reduce為代表的批處理之后的另一種重要計(jì)算模型。隨著互聯(lián)網(wǎng)業(yè)務(wù)的發(fā)展和數(shù)據(jù)規(guī)模的不斷擴(kuò)大,傳統(tǒng)的批量計(jì)算很難快速有效地處理數(shù)據(jù),并且延遲和返回結(jié)果較低。由于數(shù)據(jù)幾乎處于增長狀態(tài),及時(shí)處理和計(jì)算大量數(shù)據(jù)成為批量計(jì)算的一大難題。在此背景下,流量計(jì)算應(yīng)運(yùn)而生。與傳統(tǒng)的批處理計(jì)算相比,流計(jì)算具有低延遲、高響應(yīng)和連續(xù)處理的特點(diǎn)。在數(shù)據(jù)生成的同時(shí),我們可以計(jì)算并得到結(jié)果。更有甚者,Lambda架構(gòu)可以將即時(shí)的流量計(jì)算結(jié)果和延遲的批量計(jì)算結(jié)果結(jié)合起來,更好地滿足低延遲、高精度的業(yè)務(wù)需求。
Twitter由于自身的業(yè)務(wù)特點(diǎn),對實(shí)時(shí)性有很強(qiáng)的需求。因此,在流計(jì)算方面投入了大量的資源。Storm作為第一代流處理系統(tǒng),自發(fā)布以來得到了廣泛的關(guān)注和應(yīng)用。根據(jù)Storm在實(shí)踐中遇到的性能、規(guī)模、可用性等問題,Twitter開發(fā)了第二代流處理系統(tǒng)——Heron[1],并于2016年開放。
重要概念的定義
在開始了解Heron的具體架構(gòu)和設(shè)計(jì)之前,我們先定義一些流量計(jì)算和Heron設(shè)計(jì)的基本概念:
元組:流計(jì)算任務(wù)中處理的最小單位數(shù)據(jù)的抽象。
流:由無限元組組成的連續(xù)序列。
Spout:從外部數(shù)據(jù)源獲取數(shù)據(jù)并生成Tuple的計(jì)算任務(wù)。
Bolt:處理上游噴口或Bolt生成Tuple的計(jì)算任務(wù)。
拓?fù)?通過流連接噴口和螺栓來處理元組的邏輯計(jì)算任務(wù)。
分組:流計(jì)算中的元組分布策略。在通過流向下游螺栓傳遞元組的過程中,分組策略決定如何將元組路由到特定的螺栓實(shí)例。典型的分組策略有:隨機(jī)分布、基于元組內(nèi)容的分布等。
物理計(jì)劃:基于拓?fù)涠x的邏輯計(jì)算任務(wù)和擁有的計(jì)算資源生成的實(shí)際運(yùn)行時(shí)信息的集合。
基于上述流處理的基本概念,我們可以構(gòu)造流處理的三種不同的處理語義:
頂多一次:盡可能多的處理數(shù)據(jù),但是不能保證數(shù)據(jù)會(huì)被處理。吞吐量大,計(jì)算速度快,但計(jì)算結(jié)果存在一定誤差。
至少一次:當(dāng)外部數(shù)據(jù)源允許重放時(shí),確保數(shù)據(jù)至少被處理一次。如果出現(xiàn)錯(cuò)誤,數(shù)據(jù)將被重新處理,同一數(shù)據(jù)可能會(huì)被重新處理多次。數(shù)據(jù)處理得到保證,但延遲會(huì)增加。
恰好一次:每個(gè)數(shù)據(jù)保證只處理一次。結(jié)果是準(zhǔn)確的,但是增加了計(jì)算資源并且降低了計(jì)算效率。
從上面可以看出,三種不同的處理模式各有優(yōu)缺點(diǎn),因此在選擇處理模式時(shí),需要綜合考慮一個(gè)拓?fù)鋵ν掏铝?、延遲、結(jié)果誤差和計(jì)算資源的要求,從而做出最佳選擇。目前,Heron已經(jīng)實(shí)現(xiàn)了最多支持一次和至少一次語義,并且正在向只支持一次語義發(fā)展。
Heron系統(tǒng)概述
保持與風(fēng)暴界面(API)的兼容性是鷺的設(shè)計(jì)目標(biāo)之一。所以Heron的數(shù)據(jù)模型和Storm的數(shù)據(jù)模型基本一致。提交給Heron的每一個(gè)拓?fù)涠际且粋€(gè)有向無環(huán)圖,由噴口和螺栓兩種頂點(diǎn)組成,以流為邊。其中,Spout節(jié)點(diǎn)是Topology的數(shù)據(jù)源,它從外部讀取Topology需要處理的數(shù)據(jù),比如kafka-spout,然后發(fā)送給后續(xù)的Bolt節(jié)點(diǎn)進(jìn)行處理。螺栓節(jié)點(diǎn)執(zhí)行實(shí)際數(shù)據(jù)計(jì)算,如過濾器、地圖和平面地圖。
我們可以把Heron的拓?fù)鋵W(xué)比作數(shù)據(jù)庫的邏輯查詢計(jì)劃。這種邏輯計(jì)劃只有變成實(shí)際的加工計(jì)劃才能實(shí)施。用戶在編寫拓?fù)鋵W(xué)時(shí)指定每個(gè)Spout和Bolt任務(wù)的并行性以及拓?fù)渲泄?jié)點(diǎn)間元組的分組策略。用戶提供的所有信息都是通過打包來計(jì)算的,這些噴口和螺栓任務(wù)被分配給一批抽象容器。最后,通過將這些抽象容器映射到真實(shí)容器,可以生成物理計(jì)劃,它是所有邏輯信息(拓?fù)鋱D、并行性、計(jì)算任務(wù))和運(yùn)行時(shí)信息(計(jì)算任務(wù)和容器之間的對應(yīng)關(guān)系、實(shí)際運(yùn)行地址)的集合。
整體結(jié)構(gòu)
總體來說,Heron的整體架構(gòu)如圖1所示。用戶通過命令行工具向海倫調(diào)度器提交拓?fù)?。然后,調(diào)度程序分配資源并調(diào)度提交的拓?fù)?。同時(shí),多個(gè)獨(dú)立拓?fù)淇梢栽谕毁Y源平臺(tái)上運(yùn)行。
圖1鷺建筑
與風(fēng)暴的服務(wù)架構(gòu)不同,赫倫是一個(gè)圖書館架構(gòu)。Storm在架構(gòu)設(shè)計(jì)上是基于服務(wù)的,所以需要設(shè)置一個(gè)專門的Storm集群來運(yùn)行用戶提交的Topology。在開發(fā)、運(yùn)維、成本等方面都有很多不足。Heron是基于庫的,可以在任何共享資源調(diào)度平臺(tái)上運(yùn)行。最大限度地降低了運(yùn)行維護(hù)負(fù)擔(dān)和成本。
目前Heron支持Aurora、shate、Mesos和EC2,Kubernetes和Docker目前正在開發(fā)中。通過可擴(kuò)展插件Heron Scheduler,用戶可以根據(jù)不同的需求和實(shí)際情況選擇相應(yīng)的運(yùn)行平臺(tái),從而實(shí)現(xiàn)多平臺(tái)資源管理器的支持[2]。
提交運(yùn)行的拓?fù)鋬?nèi)部結(jié)構(gòu)如圖2所示,不同的計(jì)算任務(wù)封裝在多個(gè)容器中運(yùn)行。由調(diào)度程序調(diào)度的這些容器可以在同一個(gè)物理主機(jī)上,也可以分布在多個(gè)主機(jī)上。每個(gè)拓?fù)涞牡谝粋€(gè)容器(容器0)負(fù)責(zé)整個(gè)拓?fù)涞墓芾恚饕\(yùn)行一個(gè)拓?fù)渲鬟M(jìn)程;其他容器負(fù)責(zé)實(shí)現(xiàn)用戶提交的計(jì)算邏輯,每個(gè)容器主要運(yùn)行一個(gè)流管理器進(jìn)程、一個(gè)度量管理器進(jìn)程和多個(gè)實(shí)例進(jìn)程。每個(gè)實(shí)例負(fù)責(zé)運(yùn)行噴口或螺栓任務(wù)。我們將在本文后面的章節(jié)中詳細(xì)分析拓?fù)涔芾砥?、流管理器和?shí)例進(jìn)程的結(jié)構(gòu)和重要功能。
圖2拓?fù)浣Y(jié)構(gòu)
狀態(tài)存儲(chǔ)和監(jiān)控
Heron的State Manager是一個(gè)抽象模塊,具體實(shí)現(xiàn)可以是ZooKeeper,也可以是文件系統(tǒng)。它的主要功能是保存每個(gè)拓?fù)涞母鞣N元信息:拓?fù)涞奶峤徽摺⑻峤粫r(shí)間、運(yùn)行時(shí)生成的物理規(guī)劃、拓?fù)渲鞯牡刂返?。,為拓?fù)涞淖曰謴?fù)提供幫助。
每個(gè)容器中的度量管理器負(fù)責(zé)收集容器的運(yùn)行時(shí)狀態(tài)度量,并將它們上傳到監(jiān)控系統(tǒng)。在當(dāng)前的Heron版本中,簡化的監(jiān)控系統(tǒng)集成在拓?fù)涔芾砥髦?。將來,該監(jiān)控模塊將成為容器0中的獨(dú)立進(jìn)程。蒼鷺還提供了兩個(gè)工具,蒼鷺跟蹤器和蒼鷺用戶界面,以查看和監(jiān)控?cái)?shù)據(jù)中心運(yùn)行的所有拓?fù)洹?/p>
起動(dòng)程序
在拓?fù)渲?,拓?fù)渲鞴?jié)點(diǎn)是整個(gè)拓?fù)涞脑畔⒐芾砥?,它維護(hù)拓?fù)涞耐暾畔ⅰ6鞴芾砥魇敲總€(gè)容器的網(wǎng)關(guān),負(fù)責(zé)實(shí)例間的數(shù)據(jù)通信和實(shí)例與拓?fù)渲骺仄鏖g的控制信令。
用戶提交拓?fù)浜?,調(diào)度程序?qū)㈤_始分配資源并運(yùn)行容器。每個(gè)容器中啟動(dòng)一個(gè)Heron執(zhí)行器進(jìn)程,它將容器0與其他容器區(qū)分開來,并分別啟動(dòng)拓?fù)渲鬟M(jìn)程或流管理器進(jìn)程。在普通容器中,當(dāng)實(shí)例進(jìn)程啟動(dòng)時(shí),它將主動(dòng)向本地容器的流管理器注冊。在收到實(shí)例的所有注冊請求后,流管理器會(huì)將自己負(fù)責(zé)的實(shí)例的注冊信息發(fā)送給拓?fù)渲鳈C(jī)。拓?fù)渲鳈C(jī)在收到所有流管理器的注冊信息后,將生成每個(gè)實(shí)例的物理計(jì)劃和流管理器的實(shí)際運(yùn)行地址,并廣播和分發(fā)它。每個(gè)流管理器收到物理計(jì)劃后,可以根據(jù)物理計(jì)劃建立相互連接,形成一個(gè)完整的圖形,然后開始處理數(shù)據(jù)。
實(shí)例執(zhí)行特定的元組數(shù)據(jù)計(jì)算和處理。流管理器不執(zhí)行特定的計(jì)算和處理任務(wù),只負(fù)責(zé)中繼和轉(zhuǎn)發(fā)元組。從數(shù)據(jù)流網(wǎng)絡(luò)的角度來看,Stream Manager可以理解為每個(gè)容器的路由器。所有實(shí)例之間的元組傳輸通過流管理器進(jìn)行中繼。因此,容器中實(shí)例之間的通信是一個(gè)星型網(wǎng)絡(luò)。所有流管理器都連接在一起形成網(wǎng)狀網(wǎng)絡(luò)。容器之間的通信也是由Stream Manager中繼的,由兩跳中繼完成。
核心成分分析
TMaster
TMaster是拓?fù)渲鳈C(jī)的縮寫。作為主角色,TMaster提供了一個(gè)全局接口來了解拓?fù)涞倪\(yùn)行狀態(tài),這與許多主從分布式系統(tǒng)中的主單點(diǎn)處理控制邏輯具有相同的功能。同時(shí),通過將重要的狀態(tài)信息(物理計(jì)劃)記錄到ZooKeeper中,確保TMaster在崩潰恢復(fù)后可以繼續(xù)運(yùn)行。
當(dāng)實(shí)際產(chǎn)品中的TMaster啟動(dòng)時(shí),它會(huì)在ZooKeeper的指定目錄中創(chuàng)建一個(gè)臨時(shí)節(jié)點(diǎn)來存儲(chǔ)自己的IP地址和端口,以便流管理器可以發(fā)現(xiàn)自己。蒼鷺使用短命節(jié)點(diǎn)的原因包括:
在一個(gè)拓?fù)渲斜苊舛鄠€(gè)主器件。這樣,該拓?fù)涞乃羞M(jìn)程都可以識(shí)別同一個(gè)TMaster;
同一個(gè)拓?fù)渲械倪M(jìn)程可以通過ZooKeeper找到TMaster的位置,然后與之建立連接。
TMaster主要有以下三個(gè)功能:
建立、分發(fā)和維護(hù)物理計(jì)劃;拓?fù)鋵W(xué);
收集每個(gè)流管理器的心跳,確認(rèn)流管理器的存活;
收集和分發(fā)拓?fù)涞闹匾\(yùn)行時(shí)狀態(tài)指標(biāo)。
由于拓?fù)涞奈锢硪?guī)劃只能在運(yùn)行時(shí)確定,因此TMaster是構(gòu)建、分發(fā)和維護(hù)物理規(guī)劃的最佳選擇。在TMaster完成啟動(dòng)并向ZooKeeper注冊后,它將等待所有流管理器與它們自己建立連接。在流管理器和主服務(wù)器之間的連接建立后,流管理器將報(bào)告它自己的實(shí)際IP地址和端口,以及它負(fù)責(zé)的實(shí)例地址和端口。天貓可以建立物理計(jì)劃,并在收到流管理器報(bào)告的所有地址信息后進(jìn)行廣播。所有的流管理器將接收到由TMaster構(gòu)建的物理計(jì)劃,并根據(jù)其中的信息與其他流管理器建立成對的連接。只有當(dāng)所有的連接都建立后,拓?fù)洳艜?huì)真正開始運(yùn)行和處理數(shù)據(jù)。流管理器丟失并重新連接后,TMaster會(huì)檢測其運(yùn)行地址和端口是否發(fā)生了變化;如果發(fā)生變化,物理計(jì)劃將及時(shí)更新并廣播和分發(fā),以便流管理器可以建立正確的連接,從而確保整個(gè)拓?fù)涞恼_運(yùn)行。
TMaster將接收流管理器定期發(fā)送的心跳信息,并維護(hù)每個(gè)流管理器的最新心跳時(shí)間戳。心跳可以首先幫助TMaster確認(rèn)流管理器的存在,然后幫助它決定是否更新流管理器連接和更新物理計(jì)劃。
天貓還接受由指標(biāo)管理器發(fā)送的一些重要指標(biāo),并將這些指標(biāo)提供給海倫追蹤器。蒼鷺跟蹤器可以通過這些度量來確定拓?fù)涞倪\(yùn)行狀態(tài),并使蒼鷺用戶界面能夠基于這些重要的度量來監(jiān)控和檢測。典型的度量是:分配元組的次數(shù)、計(jì)算元組的次數(shù)以及處于背壓狀態(tài)的時(shí)間。
值得注意的是,TMaster本身并不涉及任何實(shí)際的數(shù)據(jù)處理。因此,它不會(huì)接受和分發(fā)任何元組。有了這種設(shè)計(jì),TMaster本身邏輯清晰,重量輕,也為以后的功能擴(kuò)展留下了巨大的空空間。
流管理器和背壓機(jī)制
Stmgr是流管理器的縮寫。Stmgr管理元組的路由,并負(fù)責(zé)轉(zhuǎn)發(fā)元組。STGR獲得物理規(guī)劃后,可以從信息中知道它與其他STGR連接形成網(wǎng)狀網(wǎng)絡(luò),從而進(jìn)行數(shù)據(jù)中繼和背壓控制。元組的傳輸路徑可以由圖3來說明。在圖3中,容器1的實(shí)例D(1D)向容器4的實(shí)例C(4C)發(fā)送元組。這個(gè)元組的路徑是容器1的1D、容器1的Stmgr、容器4的Stmgr和容器4的4C。例如,元組從3A到3B的路徑是容器3的3A、Stmgr和3B。與互聯(lián)網(wǎng)的路由機(jī)制相比,Heron的路由非常簡單,這是由于Stmgr成對連接,使得所有實(shí)例之間的距離小于2跳。
圖3元組發(fā)送路徑的示例
打包
除了路由中繼元組之外,Stmgr還負(fù)責(zé)打包已處理的元組。Acking的概念在Heron的前身Storm就有了。打包機(jī)制的目的是實(shí)現(xiàn)至少一次的語義。原則上,在螺栓實(shí)例處理元組之后,螺栓實(shí)例向螺栓的上游螺栓實(shí)例或出口實(shí)例發(fā)送特殊的打包元組,并向上游節(jié)點(diǎn)確認(rèn)元組已經(jīng)被處理。在這個(gè)過程中,層前進(jìn)到上游節(jié)點(diǎn),直到噴口節(jié)點(diǎn)。實(shí)現(xiàn)上,Acking Tuple通過Stmgr時(shí),Tuple用xor運(yùn)算標(biāo)記,從xor運(yùn)算的特性可知處理是否完成。當(dāng)Spout的一個(gè)實(shí)例在一定時(shí)間內(nèi)沒有收集到打包元組時(shí),它將重新發(fā)送相應(yīng)的數(shù)據(jù)元組。Heron的Acking機(jī)制的實(shí)現(xiàn)與其前身Storm是一致的。
背壓
Heron引入了一種背壓機(jī)制,動(dòng)態(tài)調(diào)整Tuple的處理速度,避免系統(tǒng)過載。一般來說,解決系統(tǒng)過載問題有三種策略:1。放手;2.丟棄過載數(shù)據(jù);3.請求減輕負(fù)荷。Heron采用第三種策略,利用背壓機(jī)制從過載中恢復(fù),以保證系統(tǒng)在過載下不會(huì)崩潰。
背壓機(jī)制的觸發(fā)過程如下:當(dāng)一個(gè)Bolt實(shí)例的處理速度跟不上元組的輸入速度時(shí),負(fù)責(zé)向該實(shí)例轉(zhuǎn)發(fā)元組的Stmgr緩存會(huì)不斷累加。當(dāng)緩存大小超過高水位線時(shí),Stmgr將停止從本地噴口讀取元組,并向拓?fù)渲械乃衅渌鸖tmgr發(fā)送“啟動(dòng)背壓”消息。而剩下的Stmgr將在收到這個(gè)消息時(shí)停止從它們負(fù)責(zé)的噴口實(shí)例讀取和轉(zhuǎn)發(fā)元組。此時(shí),整個(gè)拓?fù)洳辉購耐獠孔x取元組,而只處理內(nèi)部累積的未處理元組。處理速度由最慢的實(shí)例決定。經(jīng)過一段時(shí)間的處理后,當(dāng)高速緩存的大小減小到低水位標(biāo)記時(shí),開始發(fā)送“開始背壓”的Stmgr將再次發(fā)送“停止背壓”消息,以便所有Stmgr將再次開始從噴口實(shí)例讀取分布式數(shù)據(jù)。由于Spout通常從具有重放權(quán)限的消息隊(duì)列中讀取數(shù)據(jù),即使它被凍結(jié),也不會(huì)導(dǎo)致數(shù)據(jù)丟失。
背壓過程中要注意兩個(gè)重要的值:高水位線和低水位線。背壓僅在緩沖區(qū)的大小超過上限時(shí)觸發(fā),然后持續(xù)到緩沖區(qū)的大小降低到下限。這種設(shè)計(jì)有效地避免了拓?fù)湓诒硥籂顟B(tài)和正常狀態(tài)之間不斷振蕩的發(fā)展,并在一定程度上保證了拓?fù)涞姆€(wěn)定性。
例子
實(shí)例是整個(gè)Heron處理引擎的核心部分之一。拓?fù)渲械膰娍陬愋凸?jié)點(diǎn)或螺栓類型節(jié)點(diǎn)由實(shí)例實(shí)現(xiàn)。不同于Storm的Worker設(shè)計(jì),當(dāng)前Heron中的每個(gè)Instance都是一個(gè)獨(dú)立的JVM進(jìn)程,通過Stmgr分發(fā)和接受數(shù)據(jù),完成用戶定義的計(jì)算任務(wù)。獨(dú)立進(jìn)程的設(shè)計(jì)帶來了一系列的優(yōu)勢,如易于調(diào)試、調(diào)優(yōu)、資源隔離、容錯(cuò)恢復(fù)等。同時(shí),由于數(shù)據(jù)分發(fā)和傳輸?shù)娜蝿?wù)已經(jīng)由Stmgr處理,實(shí)例可以用任何編程語言實(shí)現(xiàn),從而支持各種語言平臺(tái)。
實(shí)例設(shè)計(jì)有兩個(gè)線程,如圖4所示。實(shí)例的進(jìn)程包括兩個(gè)線程:網(wǎng)關(guān)和任務(wù)執(zhí)行。網(wǎng)關(guān)線程主要控制實(shí)例與本地Stmgr和Metrics Manager之間的數(shù)據(jù)交換。通過TCP連接,網(wǎng)關(guān)線程:1。接受待定元組;由Stmgr分發(fā);2.將任務(wù)執(zhí)行處理后的元組發(fā)送到Stmgr;3.將任務(wù)執(zhí)行線程生成的指標(biāo)轉(zhuǎn)發(fā)給指標(biāo)管理器。無論是噴口還是螺栓,網(wǎng)關(guān)線程執(zhí)行相同的任務(wù)。
任務(wù)執(zhí)行線程負(fù)責(zé)執(zhí)行用戶定義的計(jì)算任務(wù)。對于噴口和螺栓,任務(wù)執(zhí)行線程將執(zhí)行open()和prepare()方法,以相應(yīng)地初始化它們的狀態(tài)。如果運(yùn)行的實(shí)例是螺栓實(shí)例,任務(wù)執(zhí)行線程將執(zhí)行execute()方法來處理接收到的元組;;如果是Spout,將重復(fù)執(zhí)行nextTuple()方法,從外部數(shù)據(jù)源獲取數(shù)據(jù),生成一個(gè)Tuple,并發(fā)送給下游的Instance進(jìn)行處理。處理后的元組將被發(fā)送到網(wǎng)關(guān)線程進(jìn)行進(jìn)一步分發(fā)。同時(shí),在執(zhí)行過程中,任務(wù)執(zhí)行線程會(huì)生成各種度量(元組處理量、元組處理延遲等)。)并將它們發(fā)送給指標(biāo)管理器進(jìn)行狀態(tài)監(jiān)控。
圖4實(shí)例結(jié)構(gòu)
網(wǎng)關(guān)線程和任務(wù)執(zhí)行線程通過三個(gè)單向隊(duì)列進(jìn)行通信,即數(shù)據(jù)輸入隊(duì)列、數(shù)據(jù)發(fā)送隊(duì)列和度量發(fā)送隊(duì)列。網(wǎng)關(guān)線程通過Tuple通過數(shù)據(jù)輸入隊(duì)列到任務(wù)執(zhí)行線程;任務(wù)執(zhí)行通過數(shù)據(jù)發(fā)送隊(duì)列將處理后的元組發(fā)送給網(wǎng)關(guān)線程;任務(wù)執(zhí)行線程通過度量發(fā)送隊(duì)列將收集的度量發(fā)送到網(wǎng)關(guān)線程。
總結(jié)
本文介紹了流媒體計(jì)算的背景和重要概念,詳細(xì)分析了當(dāng)前Twitter的流媒體計(jì)算引擎Heron的結(jié)構(gòu)和重要組件。希望能為您提供一些設(shè)計(jì)和構(gòu)建流媒體計(jì)算系統(tǒng)的經(jīng)驗(yàn),歡迎您給我們建議和幫助。如果你對Heron的開發(fā)和改進(jìn)感興趣,可以在Github上查看。
【1】Kulkarni,Sanjeev,Nikunj Bhagat,Maosong Fu,Vikas Kedigehalli,Christopher Kellogg,Sailesh Mittal,Jignesh M. Patel,Karthik Ramasamy,Siddarth Taneja。"推特蒼鷺:大規(guī)模流處理." alt="Heron 深度揭秘Twitter的新一代流處理引擎Heron">