電動汽車租賃模式在中國興起

據悉,上海大眾交通目前正與特斯拉探討在租賃業務方面的合作。大眾交通正與特斯拉探討適應中國租賃客戶需求的租賃辦法,為與特斯拉合作積極做準備。

大眾交通汽車租賃客戶多為中高階客戶,對車型有較高要求,目前該公司正在進行客戶端調查,把客戶需求反饋至特斯拉廠家。另外,電動汽車租賃業務的關鍵在於充電樁問題,而特斯拉擬在北京、上海、廣州等大城市進行大規模投資建設,與中國政府共同建設電動車充電樁。

北汽與龐大集團的租賃業務

北汽集團目前也正與龐大集團合作新能源汽車的租賃業務,北汽集團和龐大集團合資成立的租賃公司將嘗試分時租賃等新能源汽車營銷服務新模式。

北汽集團銷售部人士表示,北汽的純電動汽車基本可以滿足市民短途的駕駛需求。同時,購買北汽新能源汽車的用戶可以憑一定數量的押金獲取龐大集團的租車券(例如3000元人民幣押金可以獲得價值1萬元的租車券),通過租賃燃油車滿足長途駕駛的需求。

易卡綠色的分時租賃方式

此前,隸屬於人民日報社的易卡綠色(北京)汽車租賃有限公司,也嘗試了採用分時租賃的方式推廣新能源汽車。目前,易卡綠色租賃的純電動汽車均為北汽E150EV。

易卡綠色方面表示,電動汽車採用分時共享租賃的方式,可以讓更多的客戶分攤租金,使單車獲得更好的收益。電動車分時租賃,既可以滿足個性化的城市短途交通出行,又可以提升車輛使用效率,減少資源佔用,有望使得電動汽車成為城市的公共交通工具。

本站聲明:網站內容來源於EnergyTrend https://www.energytrend.com.tw/ev/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※帶您來了解什麼是 USB CONNECTOR  ?

※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面

※如何讓商品強力曝光呢? 網頁設計公司幫您建置最吸引人的網站,提高曝光率!

※綠能、環保無空污,成為電動車最新代名詞,目前市場使用率逐漸普及化

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

※教你寫出一流的銷售文案?

史密斯電動獲得中聚電池4200萬美元承諾投資

美國純電動中型商用車生產商史密斯電動車公司(Smith Electric Vehicles)昨(12)日宣佈,該公司已經獲得了中聚電池有限公司的4200萬美元承諾投資。首輪200萬美元投資已經到位,剩餘資金將分兩輪進行投資,投資的前提條件是兩家公司需在未來幾個月取得具有里程碑意義的進展。

中聚電池是鋰離子電池及相關電動汽車產品製造商,此次進行4200萬美元投資讓中聚電池成為了史密斯電動的戰略性股東。

根據合作協定,中聚電池將成為史密斯電動的獨家電池供應商,而相關汽車應用領域的電池將會與史密斯電動的平臺和客戶要求實現相容。中聚電池還將成為某些電動汽車零部件的優先供應商,不過這些零部件要能在其下屬杭州工廠進行生產。

中聚電池公司主席兼執行董事曹忠先生說:「史密斯是一家國際知名的電動汽車供應商。中聚電池不久將改名為五龍電動車(集團),而史密斯和我們的專長會進行結合,再配以對替代能源汽車的宏觀補貼政策,這會讓我們獲得獨一無二的有利地位,能夠利用快速發展的中美電動汽車市場的機遇。」

本站聲明:網站內容來源於EnergyTrend https://www.energytrend.com.tw/ev/,如有侵權,請聯繫我們,我們將及時處理
【其他文章推薦】

USB CONNECTOR掌控什麼技術要點? 帶您認識其相關發展及效能

台北網頁設計公司這麼多該如何選擇?

※智慧手機時代的來臨,RWD網頁設計為架站首選

※評比南投搬家公司費用收費行情懶人包大公開

※幫你省時又省力,新北清潔一流服務好口碑

※回頭車貨運收費標準

[源碼解析] 從TimeoutException看Flink的心跳機制

[源碼解析] 從TimeoutException看Flink的心跳機制

目錄

  • [源碼解析] 從TimeoutException看Flink的心跳機制
    • 0x00 摘要
    • 0x01 緣由
    • 0x02 背景概念
      • 2.1 四大模塊
      • 2.2 Akka
      • 2.3 RPC機制
        • 2.3.1 RpcEndpoint:RPC的基類
        • RpcService:RPC服務提供者
        • RpcGateway:RPC調用的網關
      • 2.4 常見心跳機制
    • 0x03 Flink心跳機制
      • 3.1 代碼和機制
      • 3.2 靜態架構
        • 3.2.1 HeartbeatTarget :監控目標抽象
        • 3.2.2 HeartbeatMonitor : 管理heartbeat target的心跳狀態
        • 3.2.3 HeartbeatManager :心跳管理者
        • 3.2.4 HearbeatListener 處理心跳結果
      • 3.3 動態運行機制
        • 3.3.1 HearbeatManagerImpl : Receiver
        • 3.3.2 HeartbeatManagerSenderImpl : Sender
        • 3.3.3 HeartbeatMonitorImpl
        • 3.3.3 HeartbeatServices
    • 0x04 初始化
      • 4.1 心跳服務創建
    • 0x05 Flink中具體應用
      • 5.1 總述
        • 5.1.1 RM, JM, TM之間關係
        • 5.1.2 三者間心跳機制
      • 5.2 初始化過程
        • 5.2.1 TaskExecutor初始化
        • 5.2.2 JobMaster的初始化
        • 5.2.3 ResourceManager初始化
      • 5.3 註冊過程
        • 5.3.1 TM註冊到RM中
          • 5.3.1.1 TM的操作
          • 5.3.1.2 RM的操作
          • 5.3.1.3 返回到TM
        • 5.3.2 TM註冊到 JM
      • 5.4 心跳過程
        • 5.4.1 ResourceManager主動發起
          • 5.4.1.1 Sender遍歷所有監控的Monitor(Target)
          • 5.4.1.2 Target進行具體操作
          • 5.4.1.3 RPC調用
        • 5.4.2 RM通過RPC調用TM
        • 5.4.3 TM 通過RPC回到 RM
      • 5.5 超時處理
        • 5.5.1 TaskManager
        • 5.5.2 ResourceManager
    • 0x06 解決問題
    • 0x07 參考

0x00 摘要

本文從一個調試時候常見的異常 “TimeoutException: Heartbeat of TaskManager timed out”切入,為大家剖析Flink的心跳機制。文中代碼基於Flink 1.10。

0x01 緣由

大家如果經常調試Flink,當進入斷點看到了堆棧和變量內容之後,你容易陷入了沉思。當你發現了問題可能所在,高興的讓程序Resume的時候,你發現程序無法運行,有如下提示:

Caused by: java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id 93aa1740-cd2c-4032-b74a-5f256edb3217 timed out.

這實在是很鬱悶的事情。作為程序猿不能忍啊,既然異常提示中有 Heartbeat 字樣,於是我們就來一起看看Flink的心跳機制,看看有沒有可以修改的途徑。

0x02 背景概念

2.1 四大模塊

Flink有核心四大組件:Dispatcher,JobMaster,ResourceManager,TaskExecutor。

  • Dispatcher(Application Master)用於接收client提交的任務和啟動相應的JobManager。其提供REST接口來接收client的application提交,負責啟動JM和提交application,同時運行Web UI。
  • ResourceManager:主要用於資源的申請和分配。當TM有空閑的slot就會告訴JM,沒有足夠的slot也會啟動新的TM。kill掉長時間空閑的TM。
  • JobMaster :功能主要包括(舊版本中JobManager的功能在新版本中以JobMaster形式出現,可能本文中會混淆這兩個詞,請大家諒解):
    • 將JobGraph轉化為ExecutionGraph(physical dataflow graph,并行化)。
    • 向RM申請資源、schedule tasks、保存作業的元數據。
  • TaskManager:類似Spark的executor,會跑多個線程的task、數據緩存與交換。Flink 架構遵循 Master – Slave 架構設計原則,JobMaster 為 Master 節點,TaskManager 為Slave節點。

這四大組件彼此之間的通信需要依賴RPC實現

2.2 Akka

Flink底層RPC基於Akka實現。Akka是一個開發併發、容錯和可伸縮應用的框架。它是Actor Model的一個實現,和Erlang的併發模型很像。在Actor模型中,所有的實體被認為是獨立的actors。actors和其他actors通過發送異步消息通信。

Actor模型的強大來自於異步。它也可以顯式等待響應,這使得可以執行同步操作。但是強烈不建議同步消息,因為它們限制了系統的伸縮性。

2.3 RPC機制

RPC作用是:讓異步調用看起來像同步調用。

Flink基於Akka構建了其底層通信系統,引入了RPC調用,各節點通過GateWay方式回調,隱藏通信組件的細節,實現解耦。Flink整個通信框架的組件主要由RpcEndpoint、RpcService、RpcServer、AkkaInvocationHandler、AkkaRpcActor等構成。

RPC相關的主要接口如下:

  • RpcEndpoint
  • RpcService
  • RpcGateway

2.3.1 RpcEndpoint:RPC的基類

RpcEndpoint是Flink RPC終端的基類,所有提供遠程過程調用的分佈式組件必須擴展RpcEndpoint,其功能由RpcService支持。

RpcEndpoint的子類只有四類組件:Dispatcher,JobMaster,ResourceManager,TaskExecutor,即Flink中只有這四個組件有RPC的能力,換句話說只有這四個組件有RPC的這個需求。

每個RpcEndpoint對應了一個路徑(endpointId和actorSystem共同確定),每個路徑對應一個Actor,其實現了RpcGateway接口,

RpcService:RPC服務提供者

RpcServer是RpcEndpoint的成員變量,為RpcService提供RPC服務/連接遠程Server,其只有一個子類實現:AkkaRpcService(可見目前Flink的通信方式依然是Akka)。

RpcServer用於啟動和連接到RpcEndpoint, 連接到rpc服務器將返回一個RpcGateway,可用於調用遠程過程。

Flink四大組件Dispatcher,JobMaster,ResourceManager,TaskExecutor,都是RpcEndpoint的實現,所以構建四大組件時,同步需要初始化RpcServer。如JobManager的構造方式,第一個參數就是需要知道RpcService。

RpcGateway:RPC調用的網關

Flink的RPC協議通過RpcGateway來定義;由前面可知,若想與遠端Actor通信,則必須提供地址(ip和port),如在Flink-on-Yarn模式下,JobMaster會先啟動ActorSystem,此時TaskExecutor的Container還未分配,後面與TaskExecutor通信時,必須讓其提供對應地址。

Dispatcher,JobMaster,ResourceManager,TaskExecutor 這四大組件通過各種方式實現了Gateway。以JobMaster為例,JobMaster實現JobMasterGateway接口。各組件類的成員變量都有需要通信的其他組件的GateWay實現類,這樣可通過各自的Gateway實現RPC調用。

2.4 常見心跳機制

常見的心跳檢測有兩種:

  • socket 套接字SO_KEEPALIVE本身帶有的心跳機制,定期向對方發送心跳包,對方收到心跳包後會自動回復;
  • 應用自身實現心跳機制,同樣也是使用定期發送請求的方式;

Flink實現的是第二種方案。

0x03 Flink心跳機制

3.1 代碼和機制

Flink的心跳機制代碼在:

Flink-master/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat

四個接口:

HeartbeatListener.java          HeartbeatManager.java      HeartbeatTarget.java  HeartbeatMonitor.java

以及如下幾個類:

HeartbeatManagerImpl.java   HeartbeatManagerSenderImpl.java   HeartbeatMonitorImpl.java
HeartbeatServices.java    NoOpHeartbeatManager.java

Flink集群有多種業務流程,比如Resource Manager, Task Manager, Job Manager。每種業務流程都有自己的心跳機制。Flink的心跳機制只是提供接口和基本功能,具體業務功能由各業務流程自己實現

我們首先設定 心跳系統中有兩種節點:sender和receiver。心跳機制是sender和receivers彼此相互檢測。但是檢測動作是Sender主動發起,即Sender主動發送請求探測receiver是否存活,因為Sender已經發送過來了探測心跳請求,所以這樣receiver同時也知道Sender是存活的,然後Reciver給Sender回應一個心跳錶示自己也是活着的。

因為Flink的幾個名詞和我們常見概念有所差別,所以流程上需要大家仔細甄別,即:

  • Flink Sender 主動發送Request請求給Receiver,要求Receiver回應一個心跳;
  • Flink Receiver 收到Request之後,通過Receive函數回應一個心跳請求給Sender;

3.2 靜態架構

3.2.1 HeartbeatTarget :監控目標抽象

HeartbeatTarget是對監控目標的抽象。心跳機制在行為上而言有兩種動作:

  • 向某個節點發送請求。
  • 處理某個節點發來的請求。

HeartbeatTarget的函數就是這兩個動作:

  • receiveHeartbeat :向某個節點(Sender)發送心跳回應,其參數heartbeatOrigin 就是 Receiver。
  • requestHeartbeat :向某個節點(Receiver)要求其回應一個心跳,其參數requestOrigin 就是 Sender。requestHeartbeat這個函數是Sender的函數,其中Sender通過RPC直接調用到Receiver

這兩個函數的參數也很簡單:分別是請求的發送放和接收方,還有Payload載荷。對於一個確定節點而言,接收的和發送的載荷是同一類型的。

public interface HeartbeatTarget<I> {
	/**
	 * Sends a heartbeat response to the target.
	 * @param heartbeatOrigin Resource ID identifying the machine for which a heartbeat shall be reported.
	 */
  // heartbeatOrigin 就是 Receiver
	void receiveHeartbeat(ResourceID heartbeatOrigin, I heartbeatPayload);

	/**
	 * Requests a heartbeat from the target. 
	 * @param requestOrigin Resource ID identifying the machine issuing the heartbeat request.
	 */
  // requestOrigin 就是 Sender
	void requestHeartbeat(ResourceID requestOrigin, I heartbeatPayload);
}

3.2.2 HeartbeatMonitor : 管理heartbeat target的心跳狀態

對HeartbeatTarget的封裝,這樣Manager對Target的操作是通過對Monitor完成,後續會在其繼承類中詳細說明。

public interface HeartbeatMonitor<O> {
  // Gets heartbeat target.
	HeartbeatTarget<O> getHeartbeatTarget();
	// Gets heartbeat target id.
	ResourceID getHeartbeatTargetId();
	// Report heartbeat from the monitored target.
	void reportHeartbeat();
	//Cancel this monitor.
	void cancel();
  //Gets the last heartbeat.
	long getLastHeartbeat();
}

3.2.3 HeartbeatManager :心跳管理者

HeartbeatManager負責管理心跳機制,比如啟動/停止/報告一個HeartbeatTarget。此接口繼承HeartbeatTarget。

除了HeartbeatTarget的函數之外,這接口有4個函數:

  • monitorTarget,把和某資源對應的節點加入到心跳監控列表;
  • unmonitorTarget,從心跳監控列表刪除某資源對應的節點;
  • stop,停止心跳管理服務,釋放資源;
  • getLastHeartbeatFrom,獲取某節點的最後一次心跳數據。
public interface HeartbeatManager<I, O> extends HeartbeatTarget<I> {
	void monitorTarget(ResourceID resourceID, HeartbeatTarget<O> heartbeatTarget);
	void unmonitorTarget(ResourceID resourceID);
	void stop();
	long getLastHeartbeatFrom(ResourceID resourceId);
}

3.2.4 HearbeatListener 處理心跳結果

用戶業務邏輯需要繼承這個接口以處理心跳結果。其可以看做服務的輸出,實現了三個回調函數

  • notifyHeartbeatTimeout,處理節點心跳超時
  • reportPayload,處理節點發來的Payload載荷
  • retrievePayLoad。獲取對某節點發下一次心跳請求的Payload載荷
public interface HeartbeatListener<I, O> {
	void notifyHeartbeatTimeout(ResourceID resourceID);
	void reportPayload(ResourceID resourceID, I payload);
	O retrievePayload(ResourceID resourceID);
}

3.3 動態運行機制

之前提到Sender和Receiver,下面兩個類就對應上述概念。

  • HeartbeatManagerImpl :Receiver,存在於JobMaster與TaskExecutor中;
  • HeartbeatManagerSenderImpl :Sender,繼承 HeartbeatManagerImpl類,用於周期發送心跳要求,存在於JobMaster、ResourceManager中。

幾個關鍵問題:

  1. 如何判定心跳超時?

    心跳服務啟動后,Flink在Monitor中通過 ScheduledFuture 會啟動一個線程來處理心跳超時事件。在設定的心跳超時時間到達后才執行線程。

    如果在設定的心跳超時時間內接收到組件的心跳消息,會先將該線程取消而後重新開啟,重置心跳超時事件的觸發。

    如果在設定的心跳超時時間內沒有收到組件的心跳,則會通知組件:你超時了。

  2. 何時”調用雙方”發起心跳檢查?

    心跳檢查是雙向的,一方(Sender)會主動發起心跳請求,而另一方(Receiver)則是對心跳做出響應,兩者通過RPC相互調用,重置對方的 Monitor 超時線程。

    以JobMaster和TaskManager為例,JM在啟動時會開啟周期調度,向已經註冊到JM中的TM發起心跳檢查,通過RPC調用TM的requestHeartbeat方法,重置TM中對JM超時線程的調用,表示當前JM狀態正常。在TM的requestHeartbeat方法被調用后,通過RPC調用JM的receiveHeartbeat,重置 JM 中對TM超時線程的調用,表示TM狀態正常。

  3. 如何處理心跳超時?

    心跳服務依賴 HeartbeatListener,當在timeout時間範圍內未接收到心跳響應,則會觸發超時處理線程,該線程通過調用HeartbeatListener.notifyHeartbeatTimeout方法做後續重連操作或者直接斷開。

下面是一個概要(以RM & TM為例):

  • RM : 實現了ResourceManagerGateway (可以直接被RPC調用)

  • TM : 實現了TaskExecutorGateway (可以直接被RPC調用)

  • RM :有一個Sender HM : taskManagerHeartbeatManager,Sender HM 擁有用戶定義的 TaskManagerHeartbeatListener

  • TM :有一個Receiver HM :resourceManagerHeartbeatManager,Receiver HM 擁有用戶定義的ResourceManagerHeartbeatListener。

  • HeartbeatManager 有一個ConcurrentHashMap<ResourceID, HeartbeatMonitor > heartbeatTargets,這個Map是它監控的所有Target。

  • 對於RM的每一個需要監控的TM, 其生成一個HeartbeatTarget,進而被構造成一個HeartbeatMonitor,放置到ResourceManager.taskManagerHeartbeatManager中。

  • 每一個Target對應的Monitor中,有自己的異步任務ScheduledFuture,這個ScheduledFuture不停的被取消/重新生成。如果在某個期間內沒有被取消,則通知用戶定義的listener出現了timeout。

3.3.1 HearbeatManagerImpl : Receiver

HearbeatManagerImpl是receiver的具體實現。它由 心跳 被發起方(就是Receiver,例如TM) 創建,接收 發起方(就是Sender,例如 JM)的心跳發送請求。心跳超時 會觸發 heartbeatListener.notifyHeartbeatTimeout方法。

注意:被發起方監控線程(Monitor)的開啟是在接收到請求心跳(requestHeartbeat被調用后)以後才觸發的,屬於被動觸發。

HearbeatManagerImpl主要維護了

  • 一個心跳監控列表 map : <ResourceID, HeartbeatMonitor<O>> heartbeatTargets;。這是一個KV關聯。

    key代表要發送心跳組件(例如:TM)的ID,value則是為當前組件創建的觸發心跳超時的線程HeartbeatMonitor,兩者一一對應。

    當一個從所聯繫的machine發過來的心跳被收到時候,對應的monitor的狀態會被更新(重啟一個新ScheduledFuture)。當一個monitor發現了一個 heartbeat timed out,它會通知自己的HeartbeatListener。

  • 一個 ScheduledExecutor mainThreadExecutor 負責heartbeat timeout notifications。

  • heartbeatListener :處理心跳結果。

HearbeatManagerImpl 數據結構如下:

@ThreadSafe
public class HeartbeatManagerImpl<I, O> implements HeartbeatManager<I, O> {

	/** Heartbeat timeout interval in milli seconds. */
	private final long heartbeatTimeoutIntervalMs;

	/** Resource ID which is used to mark one own's heartbeat signals. */
	private final ResourceID ownResourceID;

	/** Heartbeat listener with which the heartbeat manager has been associated. */
	private final HeartbeatListener<I, O> heartbeatListener;

	/** Executor service used to run heartbeat timeout notifications. */
	private final ScheduledExecutor mainThreadExecutor;

	/** Map containing the heartbeat monitors associated with the respective resource ID. */
	private final ConcurrentHashMap<ResourceID, HeartbeatMonitor<O>> heartbeatTargets;

	/** Running state of the heartbeat manager. */
	protected volatile boolean stopped;  
}

HearbeatManagerImpl實現的主要函數有:

  • monitorTarget :把一個節點加入到心跳監控列表。
    • 傳入參數有:ResourceId和HearbeatTarget,monitorTarget根據這兩個參數,生成一個HeartbeatMonitor對象,然後把這個對象跟ResrouceId做kv關聯,存入到heartbeatTargets。 一個節點可能參与多個業務流程,因此一個節點參与多個心跳流程,一個節點上運行多個不同類型的HearbeatTarget。所以一個ResourceID可能會跟不同類型的HearbeatTarget對象關聯,分別加入到多個HeartbeatManager,進行不同類型的心跳監控。也因此這個函數入參是兩個參數。
  • requestHeartbeat :Sender通過RPC異步調用到Receiver的這個函數 以要求receiver向requestOrigin節點(就是Sender)發起一次心跳響應,載荷是heartbeatPayLoad。其內部流程如下:
    • 首先會調用reportHeartbeat函數,作用是 通過Monitor 記錄發起請求的這個時間點,然後創建一個ScheduleFuture。如果到期后,requestOrigin沒有作出響應,那麼就將requestOrigin節點對應的HeartbeatMonitor的state設置成TIMEOUT狀態,如果到期內requestOrigin響應了,ScheduleFuture會被取消,HeartbeatMonitor的state仍然是RUNNING。
    • 其次調用reportPayload函數,把requestOrigin節點的最新的heartbeatPayload通知給heartbeatListener。heartbeatListener是外部傳入的,它根據所有節點的心跳記錄做監聽管理。
    • 最後調用receiveHearbeat函數,響應一個心跳給Sender。

3.3.2 HeartbeatManagerSenderImpl : Sender

繼承HearbeatManagerImpl,由心跳管理的一方(例如JM)創建,實現了run函數(即它可以作為一個單獨線程運行),創建后立即開啟周期調度線程,每次遍歷自己管理的heartbeatTarget,觸發heartbeatTarget.requestHeartbeat,要求 Target 返回一個心跳響應。屬於主動觸發心跳請求。

public class HeartbeatManagerSenderImpl<I, O> extends HeartbeatManagerImpl<I, O> implements Runnable {
	public void run() {
		if (!stopped) {
			for (HeartbeatMonitor<O> heartbeatMonitor : getHeartbeatTargets().values()) {
				requestHeartbeat(heartbeatMonitor);
			}
      // 周期調度
			getMainThreadExecutor().schedule(this, heartbeatPeriod, TimeUnit.MILLISECONDS);
		}
	}

  //  主動發起心跳檢查
	private void requestHeartbeat(HeartbeatMonitor<O> heartbeatMonitor) {
		O payload = getHeartbeatListener().retrievePayload(heartbeatMonitor.getHeartbeatTargetId());
		final HeartbeatTarget<O> heartbeatTarget = heartbeatMonitor.getHeartbeatTarget();
    // 調用 Target 的 requestHeartbeat 函數
		heartbeatTarget.requestHeartbeat(getOwnResourceID(), payload);
	}  
}

3.3.3 HeartbeatMonitorImpl

Heartbeat monitor管理心跳目標,它啟動一個ScheduledExecutor。

  • 如果在timeout時間內沒有接收到心跳信號,則判定心跳超時,通知給HeartbeatListener。
  • 如果在timeout時間內接收到心跳信號,則重置當前ScheduledExecutor。
public class HeartbeatMonitorImpl<O> implements HeartbeatMonitor<O>, Runnable {

	/** Resource ID of the monitored heartbeat target. */
	private final ResourceID resourceID;   // 被監控的resource ID

	/** Associated heartbeat target. */
	private final HeartbeatTarget<O> heartbeatTarget; //心跳目標

	private final ScheduledExecutor scheduledExecutor;

	/** Listener which is notified about heartbeat timeouts. */
	private final HeartbeatListener<?, ?> heartbeatListener; // 心跳監聽器

	/** Maximum heartbeat timeout interval. */
	private final long heartbeatTimeoutIntervalMs;

	private volatile ScheduledFuture<?> futureTimeout;
  //  AtomicReference  使用 
	private final AtomicReference<State> state = new AtomicReference<>(State.RUNNING);
   //  最近一次接收到心跳的時間
  private volatile long lastHeartbeat;
  
	// 報告心跳
	public void reportHeartbeat() {
    // 保留最近一次接收心跳時間
		lastHeartbeat = System.currentTimeMillis();
    // 接收心跳后,重置timeout線程
		resetHeartbeatTimeout(heartbeatTimeoutIntervalMs);
	}

  // 心跳超時,觸發lister的notifyHeartbeatTimeout
	public void run() {
		// The heartbeat has timed out if we're in state running
		if (state.compareAndSet(State.RUNNING, State.TIMEOUT)) {
			heartbeatListener.notifyHeartbeatTimeout(resourceID);
		}
	}

  //  重置TIMEOUT
	void resetHeartbeatTimeout(long heartbeatTimeout) {
		if (state.get() == State.RUNNING) {
      //先取消線程,在重新開啟
			cancelTimeout();
      // 啟動超時線程
			futureTimeout = scheduledExecutor.schedule(this, heartbeatTimeout, TimeUnit.MILLISECONDS);

			// Double check for concurrent accesses (e.g. a firing of the scheduled future)
			if (state.get() != State.RUNNING) {
				cancelTimeout();
			}
		}
	}

3.3.3 HeartbeatServices

建立heartbeat receivers and heartbeat senders,主要是對外提供服務。這裏我們可以看到:

  • HeartbeatManagerImpl就是receivers。
  • HeartbeatManagerSenderImpl就是senders。
public class HeartbeatServices {
	// Creates a heartbeat manager which does not actively send heartbeats.
	public <I, O> HeartbeatManager<I, O> createHeartbeatManager(...) {
		return new HeartbeatManagerImpl<>(...);
	}
	// Creates a heartbeat manager which actively sends heartbeats to monitoring targets.
	public <I, O> HeartbeatManager<I, O> createHeartbeatManagerSender(...) {
		return new HeartbeatManagerSenderImpl<>(...);
	}
}

0x04 初始化

4.1 心跳服務創建

心跳管理服務在Cluster入口創建。因為我們是調試,所以在MiniCluster.start調用。

public void start() throws Exception {
				......
				heartbeatServices = HeartbeatServices.fromConfiguration(configuration);
				......
}

HeartbeatServices.fromConfiguration會從Configuration中獲取配置信息:

  • 心跳間隔 heartbeat.interval
  • 心跳超時時間 heartbeat.timeout

個就是我們解決最開始問題的思路:從配置信息入手,擴大心跳間隔

public HeartbeatServices(long heartbeatInterval, long heartbeatTimeout) {
		this.heartbeatInterval = heartbeatInterval;
		this.heartbeatTimeout = heartbeatTimeout;
}

public static HeartbeatServices fromConfiguration(Configuration configuration) {
		long heartbeatInterval = configuration.getLong(HeartbeatManagerOptions.HEARTBEAT_INTERVAL);
		long heartbeatTimeout = configuration.getLong(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT);

		return new HeartbeatServices(heartbeatInterval, heartbeatTimeout);
}

0x05 Flink中具體應用

5.1 總述

5.1.1 RM, JM, TM之間關係

系統中有幾個ResourceManager?整個 Flink 集群中只有一個 ResourceManager。

系統中有幾個JobManager?JobManager 負責管理作業的執行。默認情況下,每個 Flink 集群只有一個 JobManager 實例。JobManager 相當於整個集群的 Master 節點,負責整個集群的任務管理和資源管理。

系統中有幾個TaskManager?這個由具體啟動方式決定。比如Flink on Yarn,Session模式能夠指定拉起多少個TaskManager。 Per job模式中TaskManager數量是在提交作業時根據併發度動態計算,即Number of TM = Parallelism/numberOfTaskSlots。比如:有一個作業,Parallelism為10,numberOfTaskSlots為1,則TaskManager為10。

5.1.2 三者間心跳機制

Flink中ResourceManager、JobMaster、TaskExecutor三者之間存在相互檢測的心跳機制:

  • ResourceManager會主動發送請求探測JobMaster、TaskExecutor是否存活。
  • JobMaster也會主動發送請求探測TaskExecutor是否存活,以便進行任務重啟或者失敗處理。

我們之前講過,HeartbeatManagerSenderImpl屬於Sender,HeartbeatManagerImpl屬於Receiver。

  1. HeartbeatManagerImpl所處位置可以理解為client,存在於JobMaster與TaskExecutor中;
  2. HeartbeatManagerSenderImpl類,繼承 HeartbeatManagerImpl類,用於周期發送心跳請求,所處位置可以理解為server, 存在於JobMaster、ResourceManager中。

ResourceManager 級別最高,所以兩個HM都是Sender,監控taskManager和jobManager

public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
		extends FencedRpcEndpoint<ResourceManagerId>
		implements ResourceManagerGateway, LeaderContender {
	taskManagerHeartbeatManager = heartbeatServices.createHeartbeatManagerSender
	jobManagerHeartbeatManager = heartbeatServices.createHeartbeatManagerSender
}

JobMaster級別中等,一個Sender, 一個Receiver,受到ResourceManager的監控,監控taskManager。

public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMasterGateway, JobMasterService {
	taskManagerHeartbeatManager = heartbeatServices.createHeartbeatManagerSender
	resourceManagerHeartbeatManager = heartbeatServices.createHeartbeatManager
}

TaskExecutor級別最低,兩個Receiver,分別被JM和RM疾控。

public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
	this.jobManagerHeartbeatManager = return heartbeatServices.createHeartbeatManager
	this.resourceManagerHeartbeatManager = return heartbeatServices.createHeartbeatManager
}

以JobManager和TaskManager為例。JM在啟動時會開啟周期調度,向已經註冊到JM中的TM發起心跳檢查,通過RPC調用TM的requestHeartbeat方法,重置對JM超時線程的調用,表示當前JM狀態正常。在TM的requestHeartbeat方法被調用后,通過RPC調用JM的receiveHeartbeat,重置對TM超時線程的調用,表示TM狀態正常。

5.2 初始化過程

5.2.1 TaskExecutor初始化

TM初始化生成了兩個Receiver HM。

public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
	/** The heartbeat manager for job manager in the task manager. */
	private final HeartbeatManager<AllocatedSlotReport, AccumulatorReport> jobManagerHeartbeatManager;

	/** The heartbeat manager for resource manager in the task manager. */
	private final HeartbeatManager<Void, TaskExecutorHeartbeatPayload> resourceManagerHeartbeatManager;
  
  //初始化函數
	this.jobManagerHeartbeatManager = createJobManagerHeartbeatManager(heartbeatServices, resourceId);
	this.resourceManagerHeartbeatManager = createResourceManagerHeartbeatManager(heartbeatServices, resourceId);
}

生成HeartbeatManager時,就註冊了ResourceManagerHeartbeatListener和JobManagerHeartbeatListener。

此時,兩個HeartbeatManagerImpl中已經創建好對應monitor線程,只有在JM或者RM執行requestHeartbeat后,才會觸發該線程的執行。

5.2.2 JobMaster的初始化

JM生成了一個Sender HM,一個Receiver HM。這裡會註冊 TaskManagerHeartbeatListener 和 ResourceManagerHeartbeatListener

public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMasterGateway, JobMasterService {
  private HeartbeatManager<AccumulatorReport, AllocatedSlotReport> taskManagerHeartbeatManager;
  private HeartbeatManager<Void, Void> resourceManagerHeartbeatManager;

  private void startHeartbeatServices() {
      taskManagerHeartbeatManager = heartbeatServices.createHeartbeatManagerSender(
        resourceId,
        new TaskManagerHeartbeatListener(),
        getMainThreadExecutor(),
        log);

      resourceManagerHeartbeatManager = heartbeatServices.createHeartbeatManager(
        resourceId,
        new ResourceManagerHeartbeatListener(),
        getMainThreadExecutor(),
        log);
  }
}

5.2.3 ResourceManager初始化

JobMaster在啟動時候,會在startHeartbeatServices函數中生成兩個Sender HeartbeatManager。

taskManagerHeartbeatManager :HeartbeatManagerSenderImpl對象,會反覆啟動一個定時器,定時掃描需要探測的對象並且發送心跳請求。

jobManagerHeartbeatManager :HeartbeatManagerSenderImpl,會反覆啟動一個定時器,定時掃描需要探測的對象並且發送心跳請求。

taskManagerHeartbeatManager = heartbeatServices.createHeartbeatManagerSender(
			resourceId,
			new TaskManagerHeartbeatListener(),
			getMainThreadExecutor(),
			log);

jobManagerHeartbeatManager = heartbeatServices.createHeartbeatManagerSender(
			resourceId,
			new JobManagerHeartbeatListener(),
			getMainThreadExecutor(),
			log);  

5.3 註冊過程

我們以TM與RM交互為例。TaskExecutor啟動之後,需要註冊到RM和JM中。

流程圖如下:

 * 1. Run in Task Manager
 *
 *    TaskExecutor.onStart //Life cycle
 *        |
 *        +----> startTaskExecutorServices@TaskExecutor
 *        |     //開始TM服務
 *        |     
 *        +----> resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
 *        |     // 開始連接到RM
 *        |     // start by connecting to the ResourceManager
 *        |    
 *        +----> notifyLeaderAddress@ResourceManagerLeaderListener  
 *        |     // 當RM狀態變化之後,將回調到這裏    
 *        |     // The listener for leader changes of the resource manager.
 *        |        
 *        +----> reconnectToResourceManager@TaskExecutor   
 *        |     // 以下三步調用是漸進的,就是與RM聯繫。
 *        |    
 *        +----> tryConnectToResourceManager@TaskExecutor 
 *        |     
 *        +----> connectToResourceManager()@TaskExecutor
 *        |     // 主要作用是生成了 TaskExecutorToResourceManagerConnection    
 *        |  
 *        +----> start@TaskExecutorToResourceManagerConnection
 *        |     // 開始RPC調用,將會調用到其基類RegisteredRpcConnection的start   
 *        |   
 *        +----> start@RegisteredRpcConnection
 *        |     // RegisteredRpcConnection實現了組件之間註冊聯繫的基本RPC
 *        |      
   
   
 * ~~~~~~~~ 這裡是 Akka RPC
   
 * 2. Run in Resource Manager   
 * 現在程序執行序列到達了RM, 主要是添加一個Target到RM 的 Sender HM;
 *
 *    registerTaskExecutor@ResourceManager
 *        |
 *        +----> taskExecutorGatewayFuture.handleAsync
 *        |     // 異步調用到這裏
 *        |     
 *        +----> registerTaskExecutorInternal@ResourceManager
 *        |     // RM的內部實現,將把TM註冊到RM自己這裏
 *        |      
 *        +----> taskManagerHeartbeatManager.monitorTarget
 *        |     // 生成HeartbeatMonitor,
 *        |      
 *        +---->  heartbeatTargets.put(resourceID,heartbeatMonitor);
 *        |     // 把Monitor放到 HM in TM之中,就是說TM開始監控了RM
 *        |        

 * ~~~~~~~~ 這裡是 Akka RPC
  
 * 3. Run in Task Manager
 * 現在程序回到了TM, 主要是添加一個Target到 TM 的 Receiver HM;
 *
 *    onRegistrationSuccess@TaskExecutorToResourceManagerConnection
 *        |   
 *        |   
 *        +---->  onRegistrationSuccess@ResourceManagerRegistrationListener 
 *        |       // 回調函數
 *        |  
 *        +---->  runAsync(establishResourceManagerConnection) 
 *        |       // 異步執行
 *        |  
 *        +---->  establishResourceManagerConnection@TaskExecutor 
 *        |       // 說明已經和RM建立了聯繫,所以可以開始監控RM了
 *        |  
 *        +---->  resourceManagerHeartbeatManager.monitorTarget 
 *        |     // 生成HeartbeatMonitor,
 *        |      
 *        +---->  heartbeatTargets.put(resourceID,heartbeatMonitor);  
 *        |       // 把 RM 也註冊到 TM了
 *        |       // monitor the resource manager as heartbeat target 

下面是具體文字描述。

5.3.1 TM註冊到RM中

5.3.1.1 TM的操作
  • TaskExecutor啟動之後,調用onStart,開始其生命周期。
  • onStart直接調用startTaskExecutorServices。
  • 啟動服務的第一步就是與ResourceManager取得聯繫,這裏註冊了一個ResourceManagerLeaderListener(),用來監聽RM Leader的變化。
private final LeaderRetrievalService resourceManagerLeaderRetriever;
// resourceManagerLeaderRetriever其實是EmbeddedLeaderService的實現,A simple leader election service, which selects a leader among contenders and notifies listeners.

resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
  • 當得到RM Leader的地址之後,會調用到回調函數notifyLeaderAddress@ResourceManagerLeaderListener,然後調用notifyOfNewResourceManagerLeader。
  • notifyOfNewResourceManagerLeader中獲取到RM地址后,就通過reconnectToResourceManager與RM聯繫。
  • reconnectToResourceManager中間接調用到TaskExecutorToResourceManagerConnection。其作用是建立TaskExecutor 和 ResourceManager之間的聯繫。因為知道 ResourceManagerGateway所以才能進行RPC操作。
  • 然後在 TaskExecutorToResourceManagerConnection中,就通過RPC與RM聯繫。
5.3.1.2 RM的操作
  • RPC調用后,程序就來到了RM中,RM做如下操作:
  • 會註冊一個新的TaskExecutor到自己的taskManagerHeartbeatManager中。
  • registerTaskExecutor@ResourceManager會通過異步調用到registerTaskExecutorInternal。
  • registerTaskExecutorInternal中首先看看是否這個TaskExecutor的ResourceID之前註冊過,如果註冊過就移除再添加一個新的TaskExecutor。
  • 通過 taskManagerHeartbeatManager.monitorTarget 開始進行心跳機制的註冊。
taskManagerHeartbeatManager.monitorTarget(taskExecutorResourceId, new HeartbeatTarget<Void>() {
				public void receiveHeartbeat(ResourceID resourceID, Void payload) {
					// the ResourceManager will always send heartbeat requests to the
					// TaskManager
				}
				public void requestHeartbeat(ResourceID resourceID, Void payload) {
					taskExecutorGateway.heartbeatFromResourceManager(resourceID);
				}
});

當註冊完成后,RM中的Sender HM內部結構如下,能看出來多了一個Target:

taskManagerHeartbeatManager = {HeartbeatManagerSenderImpl@8866} 
 heartbeatPeriod = 10000
 heartbeatTimeoutIntervalMs = 50000
 ownResourceID = {ResourceID@8871} "040709f36ebf38f309fed518a88946af"
 heartbeatListener = {ResourceManager$TaskManagerHeartbeatListener@8872} 
 mainThreadExecutor = {RpcEndpoint$MainThreadExecutor@8873} 
 heartbeatTargets = {ConcurrentHashMap@8875}  size = 1
  {ResourceID@8867} "630c15c9-4861-4b41-9c95-92504f458b71" -> {HeartbeatMonitorImpl@9448} 
   key = {ResourceID@8867} "630c15c9-4861-4b41-9c95-92504f458b71"
   value = {HeartbeatMonitorImpl@9448} 
    resourceID = {ResourceID@8867} "630c15c9-4861-4b41-9c95-92504f458b71"
    heartbeatTarget = {ResourceManager$2@8868} 
    scheduledExecutor = {RpcEndpoint$MainThreadExecutor@8873} 
    heartbeatListener = {ResourceManager$TaskManagerHeartbeatListener@8872} 
    heartbeatTimeoutIntervalMs = 50000
    futureTimeout = {ScheduledFutureAdapter@10140} 
    state = {AtomicReference@9786} "RUNNING"
    lastHeartbeat = 0
5.3.1.3 返回到TM

RM會通過RPC再次回到TaskExecutor,其新執行序列如下:

  • 首先RPC調用到了 onRegistrationSuccess@TaskExecutorToResourceManagerConnection。
  • 然後onRegistrationSuccess@ResourceManagerRegistrationListener中通過異步執行調用到了establishResourceManagerConnection。這說明TM已經和RM建立了聯繫,所以可以開始監控RM了。
  • 然後和RM操作類似,通過resourceManagerHeartbeatManager.monitorTarget 來把RM註冊到自己這裏。
HeartbeatMonitor<O> heartbeatMonitor = heartbeatMonitorFactory.createHeartbeatMonitor 
heartbeatTargets.put(resourceID, heartbeatMonitor);  

當註冊完成后,其Receiver HM結構如下:

resourceManagerHeartbeatManager = {HeartbeatManagerImpl@10163} 
 heartbeatTimeoutIntervalMs = 50000
 ownResourceID = {ResourceID@8882} "96a9b80c-dd97-4b63-9049-afb6662ea3e2"
 heartbeatListener = {TaskExecutor$ResourceManagerHeartbeatListener@10425} 
 mainThreadExecutor = {RpcEndpoint$MainThreadExecutor@10426} 
 heartbeatTargets = {ConcurrentHashMap@10427}  size = 1
  {ResourceID@8886} "122fa66685133b11ea26ee1b1a6cef75" -> {HeartbeatMonitorImpl@10666} 
   key = {ResourceID@8886} "122fa66685133b11ea26ee1b1a6cef75"
   value = {HeartbeatMonitorImpl@10666} 
    resourceID = {ResourceID@8886} "122fa66685133b11ea26ee1b1a6cef75"
    heartbeatTarget = {TaskExecutor$1@10668} 
    scheduledExecutor = {RpcEndpoint$MainThreadExecutor@10426} 
    heartbeatListener = {TaskExecutor$ResourceManagerHeartbeatListener@10425} 
    heartbeatTimeoutIntervalMs = 50000
    futureTimeout = {ScheduledFutureAdapter@10992} 
    state = {AtomicReference@10667} "RUNNING"
    lastHeartbeat = 0

5.3.2 TM註冊到 JM

其調用基本思路與之前相同,就是TM和JM之間互相註冊一個代表對方的monitor:

JobLeaderListenerImpl ----> establishJobManagerConnection

消息到了JM中,做如下操作。

registerTaskManager ----> taskManagerHeartbeatManager.monitorTarget
  // monitor the task manager as heartbeat target

5.4 心跳過程

在任務提交之後,我們就進入了正常的心跳監控流程。我們依然用 TM 和 RM進行演示。

我們先給出一個流程圖。

 * 1. Run in Resouce Manager
 *
 *    HeartbeatManagerSender in RM
 *        |
 *        +----> run@HeartbeatManagerSenderImpl
 *        |     //遍歷所有監控的Monitor(Target),逐一在Target上調用requestHeartbeat
 *        |     
 *        +----> requestHeartbeat@HeartbeatManagerSenderImpl 
 *        |     // 將調用具體監控對象的自定義函數
 *        |     // heartbeatTarget.requestHeartbeat(getOwnResourceID(), payload);
 *        |      
 *        +----> getHeartbeatListener().retrievePayload   
 *        |     // 調用到TaskManagerHeartbeatListener@ResourceManager    
 *        |     // 這裡是return null;,因為RM不會是任何人的Receiver
 *        |        
 *        +----> requestHeartbeat@HeartbeatTarget   
 *        |     // 調用到Target這裏,代碼在ResourceManager這裏,就是生成Target時候賦值的
 *        |    
 *        +----> taskExecutorGateway.heartbeatFromResourceManager
 *        |     // 會通過gateway RPC 調用到TM,這就是主動對TM發起了心跳請求
 *        |      

 * ~~~~~~~~ 這裡是 Akka RPC
   
 * 2. Run in Task Manager   
 * 現在程序執行序列到達了TM, 主要是 1. 重置TM的Monitor線程; 2.返回一些負載信息;
 *
 *    heartbeatFromResourceManager@TaskExecutor
 *        |
 *        +----> resourceManagerHeartbeatManager.requestHeartbeat(resourceID, null);
 *        |     //開始要調用到 Receiver HM in Task Manager
 *        |     
 *        +----> requestHeartbeat@HeartbeatManager in TM 
 *        |     // 在Receiver HM in Task Manager 這裏運行
 *        |      
 *        +----> reportHeartbeat@HeartbeatMonitor   
 *        |     //reportHeartbeat : 記錄發起請求的這個時間點,然後resetHeartbeatTimeout
 *        |      
 *        +----> resetHeartbeatTimeout@HeartbeatMonitor
 *        |     // 如果Monitor狀態依然是RUNNING,則取消之前設置的ScheduledFuture。
 *        |     // 重新創建一個ScheduleFuture。因為如果不取消,則之前那個ScheduleFuture運行時
 *        |     // 會調用HeartbeatMonitorImpl.run函數,run直接compareAndSet后,通知目標函數
 *        |     // 目前已經超時,即調用heartbeatListener.notifyHeartbeatTimeout。
 *        |     // 這裏代表 JM 狀態正常。   
 *        |      
 *        +---->  heartbeatListener.reportPayload
 *        |     // 把Target節點的最新的heartbeatPayload通知給heartbeatListener。 
 *        |     // heartbeatListerner是外部傳入的,它根據所擁有的節點的心跳記錄做監聽管理。
 *        |        
 *        +---->  heartbeatTarget.receiveHeartbeat(getOwnResourceID(), heartbeatListener.retrievePayload(requestOrigin));
 *        |      
 *        |     
 *        +---->  retrievePayload@ResourceManagerHeartbeatListener in TM   
 *        |       // heartbeatTarget.receiveHeartbeat參數調用的
 *        |     
 *        +---->  return new TaskExecutorHeartbeatPayload
 *        |   
 *        |     
 *        +---->  receiveHeartbeat in TM 
 *        |       // 回到 heartbeatTarget.receiveHeartbeat,這就是TM生成Target的時候的自定義函數
 *        |       // 就是響應一個心跳消息回給RM
 *        |     
 *        +----> resourceManagerGateway.heartbeatFromTaskManager  
 *        |     // 會通過gateway RPC 調用到 ResourcManager
 *        |      

 * ~~~~~~~~ 這裡是 Akka RPC
  
 * 3. Run in Resouce Manager
 * 現在程序回到了RM, 主要是 1.重置RM的Monitor線程;2. 上報收到TaskExecutor的負載信息  
 *
 *    heartbeatFromTaskManager in RM
 *        |   
 *        |   
 *        +---->  taskManagerHeartbeatManager.receiveHeartbeat 
 *        |       // 這是個Sender HM
 *        |  
 *        +---->  HeartbeatManagerImpl.receiveHeartbeat  
 *        |  
 *        |     
 *        +---->  HeartbeatManagerImpl.reportHeartbeat(heartbeatOrigin);
 *        | 
 *        |    
 *        +---->  heartbeatMonitor.reportHeartbeat(); 
 *        |      // 這裏就是重置RM 這裏對應的Monitor。在reportHeartbeat重置 JM monitor線程的觸發,即cancelTimeout取消註冊時候的超時定時任務,並且註冊下一個超時檢測futureTimeout;這代表TM正常執行。
 *        |     
 *        +----> heartbeatListener.reportPayload    
 *        |      //把Target節點的最新的heartbeatPayload通知給 TaskManagerHeartbeatListener。heartbeatListerner是外部傳入的,它根據所擁有的節點的心跳記錄做監聽管理。 
 *        |      
 *        +----> slotManager.reportSlotStatus(instanceId, payload.getSlotReport());
 *        |     // TaskManagerHeartbeatListener中調用,上報收到TaskExecutor的負載信息
 *        |        

下面是具體文字描述。

5.4.1 ResourceManager主動發起

5.4.1.1 Sender遍歷所有監控的Monitor(Target)

心跳機制是由Sender主動發起的。這裏就是 ResourceManager 的HeartbeatManagerSenderImpl中定時schedual調用,這裡會遍歷所有監控的Monitor(Target),逐一在Target上調用requestHeartbeat。

// HeartbeatManagerSenderImpl中的代碼

	@Override
	public void run() {
		if (!stopped) {
			for (HeartbeatMonitor<O> heartbeatMonitor : getHeartbeatTargets().values()) {
        // 這裏向被監控對象節點發起一次心跳請求,載荷是heartbeatPayLoad,要求被監控對象回應心跳
				requestHeartbeat(heartbeatMonitor);
			}
			getMainThreadExecutor().schedule(this, heartbeatPeriod, TimeUnit.MILLISECONDS);
		}
	}
}

// 運行時候的變量
this = {HeartbeatManagerSenderImpl@9037} 
 heartbeatPeriod = 10000
 heartbeatTimeoutIntervalMs = 50000
 ownResourceID = {ResourceID@8788} "d349506cae32cadbe99b9f9c49a01c95"
 heartbeatListener = {ResourceManager$TaskManagerHeartbeatListener@8789} 
 mainThreadExecutor = {RpcEndpoint$MainThreadExecutor@8790} 

// 調用棧如下
requestHeartbeat:711, ResourceManager$2 (org.apache.flink.runtime.resourcemanager)
requestHeartbeat:702, ResourceManager$2 (org.apache.flink.runtime.resourcemanager)
requestHeartbeat:92, HeartbeatManagerSenderImpl (org.apache.flink.runtime.heartbeat)
run:81, HeartbeatManagerSenderImpl (org.apache.flink.runtime.heartbeat)
call:511, Executors$RunnableAdapter (java.util.concurrent)
run$$$capture:266, FutureTask (java.util.concurrent)
run:-1, FutureTask (java.util.concurrent)
5.4.1.2 Target進行具體操作

具體監控對象 Target 會調用自定義的requestHeartbeat。

HeartbeatManagerSenderImpl

	private void requestHeartbeat(HeartbeatMonitor<O> heartbeatMonitor) {
		O payload = getHeartbeatListener().retrievePayload(heartbeatMonitor.getHeartbeatTargetId());
		final HeartbeatTarget<O> heartbeatTarget = heartbeatMonitor.getHeartbeatTarget();

    // 這裏就是具體監控對象
		heartbeatTarget.requestHeartbeat(getOwnResourceID(), payload);
	}

heartbeatTarget = {ResourceManager$2@10688} 
 taskExecutorGateway = {$Proxy42@9459} "org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler@6d0c8334"
 this$0 = {StandaloneResourceManager@9458} 

請注意,每一個Target都是由ResourceManager生成的。ResourceManager之前註冊成為Monitor時候就註冊了這個HeartbeatTarget。

這個HeartbeatTarget的定義如下,兩個函數是:

  • receiveHeartbeat :這個是空,因為RM沒有自己的Sender。

  • requestHeartbeat :這個針對TM,就是調用TM的heartbeatFromResourceManager,當然是通過RPC調用。

5.4.1.3 RPC調用

會調用到ResourceManager定義的函數requestHeartbeat,而requestHeartbeat會通過gateway調用到TM,這就是主動對TM發起了心跳請求。

taskManagerHeartbeatManager.monitorTarget(taskExecutorResourceId, new HeartbeatTarget<Void>() {
   @Override
   public void receiveHeartbeat(ResourceID resourceID, Void payload) {
      // the ResourceManager will always send heartbeat requests to the TaskManager
   }

   @Override
   public void requestHeartbeat(ResourceID resourceID, Void payload) {
      //就是調用到這裏
      taskExecutorGateway.heartbeatFromResourceManager(resourceID); 
   }
});

5.4.2 RM通過RPC調用TM

通過taskExecutorGateway。心跳程序執行就通過RPC從RM跳躍到了TM。

taskExecutorGateway.heartbeatFromResourceManager 的意義就是:通過RPC調用回到TaskExecutor。這個是在TaskExecutorGateway就定義好的。

// TaskExecutor RPC gateway interface.
public interface TaskExecutorGateway extends RpcGateway

TaskExecutor實現了TaskExecutorGateway,所以具體在TaskExecutor內部實現了接口函數。

@Override
public void heartbeatFromResourceManager(ResourceID resourceID) {
    //調用到了這裏 ...........
		resourceManagerHeartbeatManager.requestHeartbeat(resourceID, null);
}

TM中,resourceManagerHeartbeatManager 定義如下。

/** The heartbeat manager for resource manager in the task manager. */
private final HeartbeatManager<Void, TaskExecutorHeartbeatPayload> resourceManagerHeartbeatManager;

所以下面就是執行TM中的Receiver HM。在這個過程中有兩個處理步驟:

  1. 調用對應HeartbeatMonitor的reportHeartbeat方法,cancelTimeout取消註冊時候的超時定時任務,並且註冊下一個超時檢測futureTimeout;
  2. 調用monitorTarget的receiveHeartbeat方法,也就是會通過rpc調用JobMaster的heartbeatFromTaskManager方法返回一些負載信息;

具體是調用 requestHeartbeat@HeartbeatManager。在其中會

  • 調用reportHeartbeat@HeartbeatMonitor,記錄發起請求的這個時間點,然後resetHeartbeatTimeout。
  • 在resetHeartbeatTimeout@HeartbeatMonitor之中,如果Monitor狀態依然是RUNNING,則取消之前設置的ScheduledFuture。重新創建一個ScheduleFuture。因為如果不取消,則之前那個ScheduleFuture運行時會調用HeartbeatMonitorImpl.run函數,run直接compareAndSet后,通知目標函數目前已經超時,即調用heartbeatListener.notifyHeartbeatTimeout。
  • 調用 heartbeatListener.reportPayload,把Target節點的最新的heartbeatPayload通知給heartbeatListener。
  • 調用 heartbeatTarget.receiveHeartbeat(getOwnResourceID(), heartbeatListener.retrievePayload(requestOrigin)); 就是響應一個心跳消息回給RM。
	@Override
	public void requestHeartbeat(final ResourceID requestOrigin, I heartbeatPayload) {
		if (!stopped) {
			log.debug("Received heartbeat request from {}.", requestOrigin);

			final HeartbeatTarget<O> heartbeatTarget = reportHeartbeat(requestOrigin);

			if (heartbeatTarget != null) {
				if (heartbeatPayload != null) {
					heartbeatListener.reportPayload(requestOrigin, heartbeatPayload);
				}

				heartbeatTarget.receiveHeartbeat(getOwnResourceID(), heartbeatListener.retrievePayload(requestOrigin));
			}
		}
	}

最後會通過resourceManagerGateway.heartbeatFromTaskManager 調用到 ResourcManager。

5.4.3 TM 通過RPC回到 RM

JobMaster在接收到rpc請求后調用其heartbeatFromTaskManager方法,會調用taskManagerHeartbeatManager的receiveHeartbeat方法,在這個過程中同樣有兩個處理步驟:

  1. 調用對應HeartbeatMonitor的reportHeartbeat方法,cancelTimeout取消註冊時候的超時定時任務,並且註冊下一個超時檢測futureTimeout;
  2. 調用TaskManagerHeartbeatListener的reportPayload方法,上報收到TaskExecutor的負載信息

至此一次完成心跳過程已經完成,會根據heartbeatInterval執行下一次心跳。

5.5 超時處理

5.5.1 TaskManager

首先,在HeartbeatMonitorImpl中,如果超時,會調用Listener。

public void run() {
   // The heartbeat has timed out if we're in state running
   if (state.compareAndSet(State.RUNNING, State.TIMEOUT)) {
      heartbeatListener.notifyHeartbeatTimeout(resourceID);
   }
}

這就來到了ResourceManagerHeartbeatListener,會嘗試再次連接RM。

private class ResourceManagerHeartbeatListener implements HeartbeatListener<Void, TaskExecutorHeartbeatPayload> {

   @Override
   public void notifyHeartbeatTimeout(final ResourceID resourceId) {
      validateRunsInMainThread();
      // first check whether the timeout is still valid
      if (establishedResourceManagerConnection != null && establishedResourceManagerConnection.getResourceManagerResourceId().equals(resourceId)) {
         reconnectToResourceManager(new TaskManagerException(
            String.format("The heartbeat of ResourceManager with id %s timed out.", resourceId)));
      } else {
         .....
      }
   }

5.5.2 ResourceManager

RM就直接簡單粗暴,關閉連接。

private class TaskManagerHeartbeatListener implements HeartbeatListener<TaskExecutorHeartbeatPayload, Void> {

   @Override
   public void notifyHeartbeatTimeout(final ResourceID resourceID) {
      validateRunsInMainThread();
      closeTaskManagerConnection(
         resourceID,
         new TimeoutException("The heartbeat of TaskManager with id " + resourceID + "  timed out."));
   }
}  

0x06 解決問題

心跳機制我們講解完了,但是我們最初提到的異常應該如何解決呢?在程序最開始生成環境變量時候,通過設置環境變量的配置即可搞定:

Configuration conf = new Configuration();
conf.setString("heartbeat.timeout", "18000000");
final LocalEnvironment env = ExecutionEnvironment.createLocalEnvironment(conf);

0x07 參考

[flink-001]flink的心跳機制

Flink中心跳機制

flink1.8 心跳服務

你有必要了解一下Flink底層RPC使用的框架和原理

flink RPC(akka)

弄清Flink1.8的遠程過程調用(RPC)

Apache Flink源碼解析 (七)Flink RPC的底層實現

flink源碼閱讀第一篇—入口

flink-on-yarn 基礎架構和啟動流程

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※帶您來了解什麼是 USB CONNECTOR  ?

※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面

※如何讓商品強力曝光呢? 網頁設計公司幫您建置最吸引人的網站,提高曝光率!

※綠能、環保無空污,成為電動車最新代名詞,目前市場使用率逐漸普及化

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

※教你寫出一流的銷售文案?

Mysql和Redis數據同步策略

目錄

  • 為什麼對緩存只刪除不更新
  • 先更新數據庫還是先刪除緩存?
  • Cache Aside Pattern
  • Double-Delete
  • Read/Write Through Pattern
  • Write Behind
  • 設置緩存過期時間
  • 總結

為什麼對緩存只刪除不更新

不更新緩存是防止併發更新導致的數據不一致。
所以為了降低數據不一致的概率,不應該更新緩存,而是直接將其刪除,
然後等待下次發生cache miss時再把數據庫中的數據同步到緩存。

先更新數據庫還是先刪除緩存?

有兩個選擇:
1. 先刪除緩存,再更新數據庫
2. 先更新數據庫,再刪除緩存

如果先刪除緩存,有一個明顯的邏輯錯誤:考慮兩個併發操作,線程A刪除緩存后,線程B讀該數據時會發生Cache Miss,然後從數據庫中讀出該數據並同步到緩存中,此時線程A更新了數據庫。
結果導致,緩存中是老數據,數據庫中是新數據,並且之後的讀操作都會直接讀取緩存中的臟數據。(直到key過期被刪除或者被LRU策略踢出)
如果數據庫更新成功后,再刪除緩存,就不會有上面這個問題。
可能是由於數據庫優先,第二種方式也被稱為Cache Aside Pattern。

Cache Aside Pattern

cache aside在絕大多數情況下能做到數據一致性,但是在極端情況仍然存在問題。

  • 首先更新數據庫(A)和刪除緩存(B)不是原子操作,任何在A之後B之前的讀操作,都會讀到redis中的舊數據。
    正常情況下操作緩存的速度會很快,通常是毫秒級,臟數據存在的時間極端。
    但是,對超高併發的應用可能會在意這幾毫秒。
  • 更新完數據庫后,線程意外被kill掉(真的很不幸),由於沒有刪除緩存,緩存中的臟數據會一直存在。
  • 線程A讀數據時cache miss,從Mysql中查詢到數據,還沒來得及同步到redis中,
    此時線程B更新了數據庫並把Redis中的舊值刪除。隨後,線程A把之前查到的數據同步到了Redis。
    顯然,此時redis中的是臟數據。
    通常數據庫讀操作比寫操作快很多,所以除非線程A在同步redis前意外卡住了,否則發生上述情況的概率極低。

雖然以上情況都有可能發生,但是發生的概率相比“先刪除緩存再更新數據庫”會低很多。

Double-Delete

前面我們講到先刪除緩存(A)、后更新數據庫(B)的方案有明顯的錯誤,任何發生在A操作和B操作之間的併發讀都會造成數據的最終不一致。
Double-Delete是一種比較笨拙的修補方案,執行過程如下:

1.delete redis cache
2.update database
3.sleep(500ms)
3.delete redis cache

也很好理解,在睡眠500ms后嘗試再次刪除緩存中的臟數據,它通過兩次刪除來盡可能做到數據的最終一致。
其實,Double-Delete在數據一致性上比Cache Aside更靠譜,但是它的代價是昂貴的,
即使,把睡眠時間縮短到100ms,對耗時敏感的應用也不會考慮這種方案。

Read/Write Through Pattern

cache aside是我們自己的應用程序維護兩個數據存儲系統,而Read/Write Through Pattern是把同步數據的問題交給緩存系統了,應用程序不需要關心。
Read Through是指發生cache miss時,緩存系統自動去數據庫加載數據。
Write Through是指如果cache miss,直接更新數據庫,然後返回,如果cache hit,則更新緩存后,由緩存系統自動同步到數據庫。
以Redis為例,通常我們不會把數據庫的數據全部緩存到redis,而是採用一定的數據精簡或壓縮策略,以節省緩存空間。
就是說,讓緩存系統設計出通用的緩存方案不太現實,不過根據自己的業務定製一個在項目內部通用的中間件是可行的。

Write Behind

Write Behind方案在更新數據時,只更新緩存,不更新數據庫。而是由另外一個服務異步的把數據更新到數據庫。
邏輯上,和Linux中的write back很類似。這個設計的好處是,I/O操作很快,因為是純內存操作。
但是由於異步寫庫,可能要犧牲一些數據一致性,譬如突然宕機會丟失所有未寫入數據庫的內存數據。

阿里巴巴的Canal中間件是一種相反的設計,它先更新mysql,然後通過binlog把數據自動同步到redis。
這種方案會全量同步數據到redis,不適合只緩存熱點數據的應用。

設置緩存過期時間

無論採用哪種策略都應該設置緩存的過期時間。
過期時間設置太長,臟數據存在的時間就越長;
過期時間設置太短,大量的cache miss會讓查詢直接進入數據庫。
所以,需要根據業務場景考慮過期時間。

總結

以上沒有哪種方案是完美的,都無法做到強一致性。
我們總要在性能和數據準確性之間做出妥協。
Cache Aside Pattern適用於絕大多數的場景。

https://www.pixelstech.net/article/1562504974-Consistency-between-Redis-Cache-and-SQL-Database
https://coolshell.cn/articles/17416.html
為什麼不更新緩存,而是直接刪除

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理
【其他文章推薦】

USB CONNECTOR掌控什麼技術要點? 帶您認識其相關發展及效能

台北網頁設計公司這麼多該如何選擇?

※智慧手機時代的來臨,RWD網頁設計為架站首選

※評比南投搬家公司費用收費行情懶人包大公開

※幫你省時又省力,新北清潔一流服務好口碑

※回頭車貨運收費標準

一起玩轉微服務(12)——揭密starter

介紹

Spring Boot的starter主要用來簡化依賴用的,對於企業級開發中的與第三方的集成,可以通過一段簡單的配置來完成,這樣開發人員無需再對包依賴的問題頭疼。Spring Boot為我們提供了簡化企業級開發的絕大多數場景的starter pom,只需要指定需要配置的starter,Spring Boot會自動為我們提供配置好的bean。

通常流程

通常我們要搭建一個基於Spring的Web應用,我們需要做以下一些工作:

pom文件中引入相關jar包,包括spring、springmvc、redis、mybaits、log4j、mysql-connector-java 等等相關jar …

配置web.xml,Listener配置、Filter配置、Servlet配置、log4j配置、error配置 …

配置數據庫連接、配置spring事務

配置視圖解析器

開啟註解、自動掃描功能

配置完成後部署tomcat、啟動調試

……

花在搭建一個初始項目,可能一個小時就過去了或者半天就過了,但是用了SpringBoot之後一切都會變得非常便捷。

常用的starter

Spring Boot常用的starter(啟動器)包括:

  • Spring-boot-starter-logging :使用 Spring Boot 默認的日誌框架 Logback。
  • Spring-boot-starter-log4j :添加 Log4j 的支持。
  • Spring-boot-starter-web :支持 Web 應用開發,包含 Tomcat 和 Spring-mvc。
  • Spring-boot-starter-tomcat :使用 Spring Boot 默認的 Tomcat 作為應用服務器。
  • Spring-boot-starter-jetty :使用 Jetty 而不是默認的 Tomcat 作為應用服務器。
  • Spring-boot-starter-test :包含常用的測試所需的依賴,如 Junit、Hamcrest、Mockito 和 Spring-test 等。
  • Spring-boot-starter-AOP :包含 Spring-AOP 和 AspectJ 來支持面向切面編程(AOP)。
  • Spring-boot-starter-security :包含 Spring-security。
  • Spring-boot-starter-jdbc :支持使用 JDBC 訪問數據庫。
  • Spring-boot-starter-redis :支持使用 Redis。
  • Spring-boot-starter-data-mongodb :包含 Spring-data-mongodb 來支持 MongoDB。
  • Spring-boot-starter-data-jpa :包含 Spring-data-jpa、Spring-orm 和 Hibernate 來支持 JPA。
  • Spring-boot-starter-amqp :通過 Spring-rabbit 支持 AMQP。
  • Spring-boot-starter-actuator : 添加適用於生產環境的功能,如性能指標和監測等功能。

當然,如果有必要,也可以定製自己的starter。

起步依賴

在我們的pom文件裏面引入以下jar:

spring-boot-starter-web包自動幫我們引入了web模塊開發需要的相關jar包。

 

mybatis-spring-boot-starter幫我們引入了dao開發相關的jar包。 spring-boot-starter-xxx是官方提供的starter,xxx-spring-boot-starter是第三方提供的starter。

截圖看一下我們的mybatis-spring-boot-starter

可以看出mybatis-spring-boot-starter並沒有任何源碼,只有一個pom文件,它的作用就是幫我們引入其它jar。

 

得益於starter的作用,使用SpringBoot確實方便,但對剛剛上手SpringBoot的人來說,可能只知道配置屬性是在application.xml或application.yml中添加,但他們各自的屬性都有哪些,具體怎麼配置,卻無從下手。這裏先解決SpringBoot-starter中各屬性的配置問題。

Mybatis的配置是怎麼生效的?查看示例工程的pom依賴:

注意到mybatis-spring-boot-starter幫我們自動依賴了Mybatis所需jar包,其中有一個負責自動配置的mybatis-spring-boot-autoconfigure.jar,緊接着打開此jar,如下:

META-INF/spring-configuration-metadata.json中便是Mybatis在SpringBoot中的所有配置屬性和介紹。

SpringBoot-starter自動配置bean

現在已得知jar包是怎麼樣自動依賴進來,以及他們的配置屬性,那麼接下來該考慮Mybatis所需的bean(如必需的sqlSessionFactory、sqlSessionTemplate等)是如何被自動加載的?

理所應當地,我們繼續去查看mybatis-spring-boot-autoconfigure.jar,注意到裏面有一個自動配置的類MybatisAutoConfiguration:

(1)@Configuration:被掛上@Configuration註解,表明它是一個配置類,作用等同於xml配置,裏面有被@Bean註解的方法,也等同於xml配置的各種。

(2)@ConditionalOnClass/@ConditionalOnBean:自動配置條件註解,用於在某一部分配置中,將另一模塊部分的配置自動加載進來,因為隨着系統越來越大,配置內容越來越多,我們應當將Mybatis的配置放在一處,將log4j的配置放在一處,將SpringBoot自身的配置放在一處,當他們需要互相依賴時,可通過這類註解進行自動配置,如下:

@ConditionalOnClass @ConditionalOnMissingClass
@ConditionalOnBean @ConditionalOnMissingBean
@ConditionalOnProperty
@ConditionalOnResource
@ConditionalOnWebApplication @ConditionalOnNotWebApplication
@ConditionalOnExpression
 
@AutoConfigureAfter @AutoConfigureBefore @AutoConfigureOrder(指定順序)
 

(3)@EnableConfigurationProperties:啟用對@ConfigurationProperties註解的bean的支持,這裏對應了配置屬性類MybatisProperties,它裏面定義了Mybatis的所有配置。

(4)@AutoConfigureAfter:應在其他指定的自動配置類之後應用自動配置。即org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration被自動配置后,才會接着自動配置MybatisAutoConfiguration。這裏也解釋了為什麼我們在application.xml中只配置了數據源,而沒有配置Mybatis,但是Mybatis可以正常查庫的原因,就是因為它們配置之間的依賴關係。

到這裏,差不多明白了starter自動配置bean的方式,但是如若再去深究,各種starter的bean是如何被自動加載的,猜想會不會是項目啟動后,SpringBoot自動掃描裏面所有的jar包,再去掃描所有的類,從而將各個bean放置IOC容器中。從結果來看,肯定是SpringBoot在啟動時確確實實地自動加載了數據源和Mybatis相關的bean,不然他們無法正常工作。

回想在我們啟動示例工程時,SpringBoot會自動掃描啟動類所在包下的所有類,而如果還去掃描所有的jar包的話,又是具體怎麼做到的?不妨從入口類調試一把,在SpringApplication.run(DemoApplication.class, args)打斷點,一直追蹤到getSpringFactoriesInstances這塊:

查看SpringFactoriesLoader.loadFactoryNames的方法註釋:

 

使用給定的類加載器從META-INF / spring.factories加載給定類型的工廠實現的完全限定類名。

這裏的spring.factories剛好也存在於mybatis-spring-boot-autoconfigure.jar中,

繼續調試,進入SpringFactoriesLoader.loadFactoryNames,

這裏用類加載器得到工程中所有jar包中的META-INF/spring.factories文件資源,進而通過此文件得到了一些包括自動配置相關的類的集合,有各種工廠類、監聽器、處理器、過濾器、初始化器等等,如下:

最後的org.springframework.boot.autoconfigure.EnableAutoConfiguration集合中當然包括了org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration和org.mybatis.spring.boot.autoconfigure.MybatisAutoConfiguration。接着必然是將實例化的各個bean放進IOC容器中。

至此我們便明白了SpringBoot是如何自動配置starter裏面的bean的。

 

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※帶您來了解什麼是 USB CONNECTOR  ?

※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面

※如何讓商品強力曝光呢? 網頁設計公司幫您建置最吸引人的網站,提高曝光率!

※綠能、環保無空污,成為電動車最新代名詞,目前市場使用率逐漸普及化

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

※教你寫出一流的銷售文案?

重學 Java 設計模式:實戰迭代器模式「模擬公司組織架構樹結構關係,深度迭代遍歷人員信息輸出場景」

作者:小傅哥
博客:https://bugstack.cn – 原創系列專題文章

沉澱、分享、成長,讓自己和他人都能有所收穫!

一、前言

相信相信的力量!

從懵懂的少年,到拿起鍵盤,可以寫一個HelloWorld。多數人在這並不會感覺有多難,也不會認為做不出來。因為這樣的例子,有老師的指導、有書本的例子、有前人的經驗。但隨着你的開發時間越來越長,要解決更複雜的問題或者技術創新,因此在網上搜了幾天幾夜都沒有答案,這個時候是否想過放棄,還是一直堅持不斷的嘗試一點點完成自己心裏要的結果。往往這種沒有前車之鑒需要自己解決問題的時候,可能真的會折磨到要崩潰,但你要願意執着、願意倔強,願意選擇相信相信的力量,就一定能解決。哪怕解決不了,也可以在這條路上摸索出其他更多的收穫,為後續前進的道路填充好墊腳石。

時間緊是寫垃圾代碼的理由?

擰螺絲?Ctrl+C、Ctrl+V?貼膏藥一樣寫代碼?沒有辦法,沒有時間,往往真的是借口,胸中沒用筆墨,才只能湊合。難道一定是好好寫代碼就浪費時間,拼湊CRUD就快嗎,根本不可能的。因為不會,沒用實操過,很少架構出全場景的設計,才很難寫出優良的代碼。多增強自身的編碼(武術)修為,在各種編碼場景中讓自己變得老練,才好應對緊急情況下的需求開發和人員安排。就像韓信一樣有謀有略,才能執掌百萬雄兵。

不要只是做個工具人!

因為日常的編寫簡單業務需求,導致自己像個工具人一樣,日久天長的也就很少去深入學習更多技術棧。看見有工具、有組件、有框架,拿來就用用,反正沒什麼體量也不會出什麼問題。但如果你想要更多的收入,哪怕是重複的造輪子,你也要去嘗試造一個,就算不用到生產,自己玩玩總可以吧。有些事情只有自己經歷過,才能有最深的感觸,參与過實踐過,才好總結點評學習。

二、開發環境

  1. JDK 1.8
  2. Idea + Maven
  3. 涉及工程一個,可以通過關注公眾號bugstack蟲洞棧,回復源碼下載獲取(打開獲取的鏈接,找到序號18)
工程 描述
itstack-demo-design-15-00 開發樹形組織架構關係迭代器

三、迭代器模式介紹

迭代器模式,常見的就是我們日常使用的iterator遍歷。雖然這個設計模式在我們的實際業務開發中的場景並不多,但卻幾乎每天都要使用jdk為我們提供的list集合遍歷。另外增強的for循環雖然是循環輸出數據,但是他不是迭代器模式。迭代器模式的特點是實現Iterable接口,通過next的方式獲取集合元素,同時具備對元素的刪除等操作。而增強的for循環是不可以的。

這種設計模式的優點是可以讓我們以相同的方式,遍歷不同的數據結構元素,這些數據結構包括;數組鏈表等,而用戶在使用遍歷的時候並不需要去關心每一種數據結構的遍歷處理邏輯,從讓使用變得統一易用。

四、案例場景模擬

在本案例中我們模擬迭代遍歷輸出公司中樹形結構的組織架構關係中僱員列表

大部分公司的組織架構都是金字塔結構,也就這種樹形結構,分為一級、二級、三級等部門,每個組織部門由僱員填充,最終體現出一個整體的樹形組織架構關係。

一般我們常用的遍歷就是jdk默認提供的方法,對list集合遍歷。但是對於這樣的偏業務特性較大的樹形結構,如果需要使用到遍歷,那麼就可以自己來實現。接下來我們會把這個組織層次關係通過樹形數據結構來實現,並完成迭代器功能。

五、迭代器模式遍歷組織結構

在實現迭代器模式之前可以先閱讀下javalist方法關於iterator的實現部分,幾乎所有的迭代器開發都會按照這個模式來實現,這個模式主要分為以下幾塊;

  1. Collection,集合方法部分用於對自定義的數據結構添加通用方法;addremoveiterator等核心方法。
  2. Iterable,提供獲取迭代器,這個接口類會被Collection繼承。
  3. Iterator,提供了兩個方法的定義;hasNextnext,會在具體的數據結構中寫實現方式。

除了這樣通用的迭代器實現方式外,我們的組織關係結構樹,是由節點和節點間的關係鏈構成,所以會比上述的內容多一些入參。

1. 工程結構

itstack-demo-design-15-02
└── src
    ├── main
    │   └── java
    │       └── org.itstack.demo.design
    │           ├── group
    │           │	├── Employee.java
    │           │	├── GroupStructure.java
    │           │	└── Link.java
    │           └──  lang
    │            	├── Collection.java
    │            	├── Iterable.java
    │            	└── Iterator.java
    └── test
        └── java
            └── org.itstack.demo.design.test
                └── ApiTest.java

迭代器模式模型結構

  • 以上是我們工程類圖的模型結構,左側是對迭代器的定義,右側是在數據結構中實現迭代器功能。
  • 關於左側部分的實現與jdk中的方式是一樣的,所以在學習的過程中可以互相參考,也可以自己擴展學習。
  • 另外這個遍歷方式一個樹形結構的深度遍歷,為了可以更加讓學習的小夥伴容易理解,這裏我實現了一種比較簡單的樹形結構深度遍歷方式。後續讀者也可以把遍歷擴展為橫向遍歷也就是寬度遍歷。

2. 代碼實現

2.1 僱員實體類

/**
 * 僱員
 */
public class Employee {

    private String uId;   // ID
    private String name;  // 姓名
    private String desc;  // 備註
    
    // ...get/set
}
  • 這是一個簡單的僱員類,也就是公司員工的信息類,包括必要的信息;id、姓名、備註。

2.2 樹節點鏈路

/**
 * 樹節點鏈路
 */
public class Link {

    private String fromId; // 僱員ID
    private String toId;   // 僱員ID    
    
    // ...get/set
}
  • 這個類用於描述結構樹中的各個節點之間的關係鏈,也就是A to BB to CB to D,以此描述出一套完整的樹組織結構。

2.3 迭代器定義

public interface Iterator<E> {

    boolean hasNext();

    E next();
    
}
  • 這裏的這個類和javajdk中提供的是一樣的,這樣也方面後續讀者可以對照listIterator進行源碼學習。
  • 方法描述;hasNext,判斷是否有下一個元素、next,獲取下一個元素。這個在list的遍歷中是經常用到的。

2.4 可迭代接口定義

public interface Iterable<E> {

    Iterator<E> iterator();

}
  • 這個接口中提供了上面迭代器的實現Iterator的獲取,也就是後續在自己的數據結構中需要實現迭代器的功能並交給Iterable,由此讓外部調用方進行獲取使用。

2.5 集合功能接口定義

public interface Collection<E, L> extends Iterable<E> {

    boolean add(E e);

    boolean remove(E e);

    boolean addLink(String key, L l);

    boolean removeLink(String key);

    Iterator<E> iterator();

}
  • 這裏我們定義集合操作接口;Collection,同時繼承了另外一個接口Iterable的方法iterator()。這樣後續誰來實現這個接口,就需要實現上述定義的一些基本功能;添加元素刪除元素遍歷
  • 同時你可能注意到這裏定義了兩個泛型<E, L>,因為我們的數據結構一個是用於添加元素,另外一個是用於添加樹節點的鏈路關係。

2.6 (核心)迭代器功能實現

public class GroupStructure implements Collection<Employee, Link> {

    private String groupId;                                                 // 組織ID,也是一個組織鏈的頭部ID
    private String groupName;                                               // 組織名稱
    private Map<String, Employee> employeeMap = new ConcurrentHashMap<String, Employee>();  // 僱員列表
    private Map<String, List<Link>> linkMap = new ConcurrentHashMap<String, List<Link>>();  // 組織架構關係;id->list
    private Map<String, String> invertedMap = new ConcurrentHashMap<String, String>();       // 反向關係鏈

    public GroupStructure(String groupId, String groupName) {
        this.groupId = groupId;
        this.groupName = groupName;
    }

    public boolean add(Employee employee) {
        return null != employeeMap.put(employee.getuId(), employee);
    }

    public boolean remove(Employee o) {
        return null != employeeMap.remove(o.getuId());
    }

    public boolean addLink(String key, Link link) {
        invertedMap.put(link.getToId(), link.getFromId());
        if (linkMap.containsKey(key)) {
            return linkMap.get(key).add(link);
        } else {
            List<Link> links = new LinkedList<Link>();
            links.add(link);
            linkMap.put(key, links);
            return true;
        }
    }

    public boolean removeLink(String key) {
        return null != linkMap.remove(key);
    }

    public Iterator<Employee> iterator() {

        return new Iterator<Employee>() {

            HashMap<String, Integer> keyMap = new HashMap<String, Integer>();

            int totalIdx = 0;
            private String fromId = groupId;  // 僱員ID,From
            private String toId = groupId;   // 僱員ID,To

            public boolean hasNext() {
                return totalIdx < employeeMap.size();
            }

            public Employee next() {
                List<Link> links = linkMap.get(toId);
                int cursorIdx = getCursorIdx(toId);

                // 同級節點掃描
                if (null == links) {
                    cursorIdx = getCursorIdx(fromId);
                    links = linkMap.get(fromId);
                }

                // 上級節點掃描
                while (cursorIdx > links.size() - 1) {
                    fromId = invertedMap.get(fromId);
                    cursorIdx = getCursorIdx(fromId);
                    links = linkMap.get(fromId);
                }

                // 獲取節點
                Link link = links.get(cursorIdx);
                toId = link.getToId();
                fromId = link.getFromId();
                totalIdx++;

                // 返回結果
                return employeeMap.get(link.getToId());
            }
             
            // 給每個層級定義寬度遍歷進度
            public int getCursorIdx(String key) {
                int idx = 0;
                if (keyMap.containsKey(key)) {
                    idx = keyMap.get(key);
                    keyMap.put(key, ++idx);
                } else {
                    keyMap.put(key, idx);
                }
                return idx;
            }
        };
    }

}
  • 以上的這部分代碼稍微有點長,主要包括了對元素的添加和刪除。另外最重要的是對遍歷的實現 new Iterator<Employee>
  • 添加和刪除元素相對來說比較簡單,使用了兩個map數組結構進行定義;僱員列表組織架構關係;id->list。當元素添加元素的時候,會分別在不同的方法中向map結構中進行填充指向關係(A->B),也就構建出了我們的樹形組織關係。

迭代器實現思路

  1. 這裏的樹形結構我們需要做的是深度遍歷,也就是左側的一直遍歷到最深節點。
  2. 當遍歷到最深節點后,開始遍歷最深節點的橫向節點。
  3. 當橫向節點遍歷完成后則向上尋找橫向節點,直至樹結構全部遍歷完成。

3. 測試驗證

3.1 編寫測試類

@Test
public void test_iterator() { 
    // 數據填充
    GroupStructure groupStructure = new GroupStructure("1", "小傅哥");  
    
    // 僱員信息
    groupStructure.add(new Employee("2", "花花", "二級部門"));
    groupStructure.add(new Employee("3", "豆包", "二級部門"));
    groupStructure.add(new Employee("4", "蹦蹦", "三級部門"));
    groupStructure.add(new Employee("5", "大燒", "三級部門"));
    groupStructure.add(new Employee("6", "虎哥", "四級部門"));
    groupStructure.add(new Employee("7", "玲姐", "四級部門"));
    groupStructure.add(new Employee("8", "秋雅", "四級部門"));   
    
    // 節點關係 1->(1,2) 2->(4,5)
    groupStructure.addLink("1", new Link("1", "2"));
    groupStructure.addLink("1", new Link("1", "3"));
    groupStructure.addLink("2", new Link("2", "4"));
    groupStructure.addLink("2", new Link("2", "5"));
    groupStructure.addLink("5", new Link("5", "6"));
    groupStructure.addLink("5", new Link("5", "7"));
    groupStructure.addLink("5", new Link("5", "8"));       

    Iterator<Employee> iterator = groupStructure.iterator();
    while (iterator.hasNext()) {
        Employee employee = iterator.next();
        logger.info("{},僱員 Id:{} Name:{}", employee.getDesc(), employee.getuId(), employee.getName());
    }
}

3.2 測試結果

22:23:37.166 [main] INFO  org.itstack.demo.design.test.ApiTest - 二級部門,僱員 Id:2 Name:花花
22:23:37.168 [main] INFO  org.itstack.demo.design.test.ApiTest - 三級部門,僱員 Id:4 Name:蹦蹦
22:23:37.169 [main] INFO  org.itstack.demo.design.test.ApiTest - 三級部門,僱員 Id:5 Name:大燒
22:23:37.169 [main] INFO  org.itstack.demo.design.test.ApiTest - 四級部門,僱員 Id:6 Name:虎哥
22:23:37.169 [main] INFO  org.itstack.demo.design.test.ApiTest - 四級部門,僱員 Id:7 Name:玲姐
22:23:37.169 [main] INFO  org.itstack.demo.design.test.ApiTest - 四級部門,僱員 Id:8 Name:秋雅
22:23:37.169 [main] INFO  org.itstack.demo.design.test.ApiTest - 二級部門,僱員 Id:3 Name:豆包

Process finished with exit code 0
  • 從遍歷的結果可以看到,我們是順着樹形結構的深度開始遍歷,一直到右側的節點3僱員 Id:2、僱員 Id:4...僱員 Id:3

六、總結

  • 迭代器的設計模式從以上的功能實現可以看到,滿足了單一職責和開閉原則,外界的調用方也不需要知道任何一個不同的數據結構在使用上的遍歷差異。可以非常方便的擴展,也讓整個遍歷變得更加乾淨整潔。
  • 但從結構的實現上可以看到,迭代器模式的實現過程相對來說是比較負責的,類的實現上也擴增了需要外部定義的類,使得遍歷與原數據結構分開。雖然這是比較麻煩的,但可以看到在使用java的jdk時候,迭代器的模式還是很好用的,可以非常方便擴展和升級。
  • 以上的設計模式場景實現過程可能對新人有一些不好理解點,包括;迭代器三個和接口的定義、樹形結構的數據關係、樹結構深度遍歷思路。這些都需要反覆實現練習才能深入的理解,事必躬親,親歷親為,才能讓自己掌握這些知識。

七、推薦閱讀

  • 1. 重學 Java 設計模式:實戰工廠方法模式「多種類型商品不同接口,統一發獎服務搭建場景」
  • 2. 重學 Java 設計模式:實戰原型模式「上機考試多套試,每人題目和答案亂序排列場景」
  • 3. 重學 Java 設計模式:實戰橋接模式「多支付渠道(微信、支付寶)與多支付模式(刷臉、指紋)場景」
  • 4. 重學 Java 設計模式:實戰組合模式「營銷差異化人群發券,決策樹引擎搭建場景」
  • 5. 重學 Java 設計模式:實戰外觀模式「基於SpringBoot開發門面模式中間件,統一控制接口白名單場景」
  • 6. 重學 Java 設計模式:實戰享元模式「基於Redis秒殺,提供活動與庫存信息查詢場景」

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理
【其他文章推薦】

USB CONNECTOR掌控什麼技術要點? 帶您認識其相關發展及效能

台北網頁設計公司這麼多該如何選擇?

※智慧手機時代的來臨,RWD網頁設計為架站首選

※評比南投搬家公司費用收費行情懶人包大公開

※幫你省時又省力,新北清潔一流服務好口碑

※回頭車貨運收費標準

奧迪看衰純電動車市場 新款車型延後推出

 

隨著寶馬等公司逐漸擴大投入純電動車市場,奧迪並未選擇加速緊追,相反的,奧迪並不看好純電動車市場的發展。

近日歐洲媒體 Worldcarfuns 報導,奧迪公司並不想花費大量的時間和金錢來推出出色的電動車型,來與寶馬、特斯拉等品牌的電動車競爭。

根據奧迪的純電動車產品規劃,其曾計劃推出的 R8 電動版、Q6 電動版等 4 款新純電動車,量產時間都將推遲延後。對此,奧迪董事會成員兼銷售總監 Luca de Meo 表示,奧迪對純電動車的前景並不看好,因此暫時沒有生產純電動車的計劃。

 

(圖片來源:)

本站聲明:網站內容來源於EnergyTrend https://www.energytrend.com.tw/ev/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※帶您來了解什麼是 USB CONNECTOR  ?

※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面

※如何讓商品強力曝光呢? 網頁設計公司幫您建置最吸引人的網站,提高曝光率!

※綠能、環保無空污,成為電動車最新代名詞,目前市場使用率逐漸普及化

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

※教你寫出一流的銷售文案?

Tesla 解決大陸商標爭議、Model X 行情看俏

電動車製造商特斯拉 (Tesla ) 6 日宣佈,該公司已和在大陸搶先註冊商標的商人占寶生 (Zhan Baosheng) 敲定協議,雙方化解了商標爭議。占寶生也會轉交他在大陸註冊的網站名稱,當中包括「tesla.cn」、「teslamotors.cn」。   特斯拉表示,占寶生已同意讓大陸主管機關註銷他先前註冊或申請的商標,而且完全不向特斯拉收費。   MarketWatch 則指出,特斯拉計畫推出的電動休旅車 (SUV)「Model X」,很有機會比電動轎車「Model S」更受歡迎。特斯拉目前首度暫停旗下唯一一座組裝廠,以重整產品線加快 Model S 的出貨速度、同時也為生產次世代電動休旅車「Model X」預做準備。  

本站聲明:網站內容來源於EnergyTrend https://www.energytrend.com.tw/ev/,如有侵權,請聯繫我們,我們將及時處理
【其他文章推薦】

USB CONNECTOR掌控什麼技術要點? 帶您認識其相關發展及效能

台北網頁設計公司這麼多該如何選擇?

※智慧手機時代的來臨,RWD網頁設計為架站首選

※評比南投搬家公司費用收費行情懶人包大公開

※幫你省時又省力,新北清潔一流服務好口碑

※回頭車貨運收費標準

東電紀錄指出 日本輻射污染魚 放射性鍶含量再破紀錄

文:宋瑞文(媽媽監督核電廠聯盟特約撰述)

本站聲明:網站內容來源環境資訊中心https://e-info.org.tw/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

※想知道最厲害的網頁設計公司"嚨底家"!

※幫你省時又省力,新北清潔一流服務好口碑

※別再煩惱如何寫文案,掌握八大原則!

人糞變肥料 混雜8萬種化學物質 美國掀起再利用論戰

環境資訊中心綜合外電;姜唯 編譯;林大利 審校

本站聲明:網站內容來源環境資訊中心https://e-info.org.tw/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※教你寫出一流的銷售文案?

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

※回頭車貨運收費標準

※別再煩惱如何寫文案,掌握八大原則!

※超省錢租車方案