[源碼解析] 從TimeoutException看Flink的心跳機制

[源碼解析] 從TimeoutException看Flink的心跳機制

目錄

  • [源碼解析] 從TimeoutException看Flink的心跳機制
    • 0x00 摘要
    • 0x01 緣由
    • 0x02 背景概念
      • 2.1 四大模塊
      • 2.2 Akka
      • 2.3 RPC機制
        • 2.3.1 RpcEndpoint:RPC的基類
        • RpcService:RPC服務提供者
        • RpcGateway:RPC調用的網關
      • 2.4 常見心跳機制
    • 0x03 Flink心跳機制
      • 3.1 代碼和機制
      • 3.2 靜態架構
        • 3.2.1 HeartbeatTarget :監控目標抽象
        • 3.2.2 HeartbeatMonitor : 管理heartbeat target的心跳狀態
        • 3.2.3 HeartbeatManager :心跳管理者
        • 3.2.4 HearbeatListener 處理心跳結果
      • 3.3 動態運行機制
        • 3.3.1 HearbeatManagerImpl : Receiver
        • 3.3.2 HeartbeatManagerSenderImpl : Sender
        • 3.3.3 HeartbeatMonitorImpl
        • 3.3.3 HeartbeatServices
    • 0x04 初始化
      • 4.1 心跳服務創建
    • 0x05 Flink中具體應用
      • 5.1 總述
        • 5.1.1 RM, JM, TM之間關係
        • 5.1.2 三者間心跳機制
      • 5.2 初始化過程
        • 5.2.1 TaskExecutor初始化
        • 5.2.2 JobMaster的初始化
        • 5.2.3 ResourceManager初始化
      • 5.3 註冊過程
        • 5.3.1 TM註冊到RM中
          • 5.3.1.1 TM的操作
          • 5.3.1.2 RM的操作
          • 5.3.1.3 返回到TM
        • 5.3.2 TM註冊到 JM
      • 5.4 心跳過程
        • 5.4.1 ResourceManager主動發起
          • 5.4.1.1 Sender遍歷所有監控的Monitor(Target)
          • 5.4.1.2 Target進行具體操作
          • 5.4.1.3 RPC調用
        • 5.4.2 RM通過RPC調用TM
        • 5.4.3 TM 通過RPC回到 RM
      • 5.5 超時處理
        • 5.5.1 TaskManager
        • 5.5.2 ResourceManager
    • 0x06 解決問題
    • 0x07 參考

0x00 摘要

本文從一個調試時候常見的異常 “TimeoutException: Heartbeat of TaskManager timed out”切入,為大家剖析Flink的心跳機制。文中代碼基於Flink 1.10。

0x01 緣由

大家如果經常調試Flink,當進入斷點看到了堆棧和變量內容之後,你容易陷入了沉思。當你發現了問題可能所在,高興的讓程序Resume的時候,你發現程序無法運行,有如下提示:

Caused by: java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id 93aa1740-cd2c-4032-b74a-5f256edb3217 timed out.

這實在是很鬱悶的事情。作為程序猿不能忍啊,既然異常提示中有 Heartbeat 字樣,於是我們就來一起看看Flink的心跳機制,看看有沒有可以修改的途徑。

0x02 背景概念

2.1 四大模塊

Flink有核心四大組件:Dispatcher,JobMaster,ResourceManager,TaskExecutor。

  • Dispatcher(Application Master)用於接收client提交的任務和啟動相應的JobManager。其提供REST接口來接收client的application提交,負責啟動JM和提交application,同時運行Web UI。
  • ResourceManager:主要用於資源的申請和分配。當TM有空閑的slot就會告訴JM,沒有足夠的slot也會啟動新的TM。kill掉長時間空閑的TM。
  • JobMaster :功能主要包括(舊版本中JobManager的功能在新版本中以JobMaster形式出現,可能本文中會混淆這兩個詞,請大家諒解):
    • 將JobGraph轉化為ExecutionGraph(physical dataflow graph,并行化)。
    • 向RM申請資源、schedule tasks、保存作業的元數據。
  • TaskManager:類似Spark的executor,會跑多個線程的task、數據緩存與交換。Flink 架構遵循 Master – Slave 架構設計原則,JobMaster 為 Master 節點,TaskManager 為Slave節點。

這四大組件彼此之間的通信需要依賴RPC實現

2.2 Akka

Flink底層RPC基於Akka實現。Akka是一個開發併發、容錯和可伸縮應用的框架。它是Actor Model的一個實現,和Erlang的併發模型很像。在Actor模型中,所有的實體被認為是獨立的actors。actors和其他actors通過發送異步消息通信。

Actor模型的強大來自於異步。它也可以顯式等待響應,這使得可以執行同步操作。但是強烈不建議同步消息,因為它們限制了系統的伸縮性。

2.3 RPC機制

RPC作用是:讓異步調用看起來像同步調用。

Flink基於Akka構建了其底層通信系統,引入了RPC調用,各節點通過GateWay方式回調,隱藏通信組件的細節,實現解耦。Flink整個通信框架的組件主要由RpcEndpoint、RpcService、RpcServer、AkkaInvocationHandler、AkkaRpcActor等構成。

RPC相關的主要接口如下:

  • RpcEndpoint
  • RpcService
  • RpcGateway

2.3.1 RpcEndpoint:RPC的基類

RpcEndpoint是Flink RPC終端的基類,所有提供遠程過程調用的分佈式組件必須擴展RpcEndpoint,其功能由RpcService支持。

RpcEndpoint的子類只有四類組件:Dispatcher,JobMaster,ResourceManager,TaskExecutor,即Flink中只有這四個組件有RPC的能力,換句話說只有這四個組件有RPC的這個需求。

每個RpcEndpoint對應了一個路徑(endpointId和actorSystem共同確定),每個路徑對應一個Actor,其實現了RpcGateway接口,

RpcService:RPC服務提供者

RpcServer是RpcEndpoint的成員變量,為RpcService提供RPC服務/連接遠程Server,其只有一個子類實現:AkkaRpcService(可見目前Flink的通信方式依然是Akka)。

RpcServer用於啟動和連接到RpcEndpoint, 連接到rpc服務器將返回一個RpcGateway,可用於調用遠程過程。

Flink四大組件Dispatcher,JobMaster,ResourceManager,TaskExecutor,都是RpcEndpoint的實現,所以構建四大組件時,同步需要初始化RpcServer。如JobManager的構造方式,第一個參數就是需要知道RpcService。

RpcGateway:RPC調用的網關

Flink的RPC協議通過RpcGateway來定義;由前面可知,若想與遠端Actor通信,則必須提供地址(ip和port),如在Flink-on-Yarn模式下,JobMaster會先啟動ActorSystem,此時TaskExecutor的Container還未分配,後面與TaskExecutor通信時,必須讓其提供對應地址。

Dispatcher,JobMaster,ResourceManager,TaskExecutor 這四大組件通過各種方式實現了Gateway。以JobMaster為例,JobMaster實現JobMasterGateway接口。各組件類的成員變量都有需要通信的其他組件的GateWay實現類,這樣可通過各自的Gateway實現RPC調用。

2.4 常見心跳機制

常見的心跳檢測有兩種:

  • socket 套接字SO_KEEPALIVE本身帶有的心跳機制,定期向對方發送心跳包,對方收到心跳包後會自動回復;
  • 應用自身實現心跳機制,同樣也是使用定期發送請求的方式;

Flink實現的是第二種方案。

0x03 Flink心跳機制

3.1 代碼和機制

Flink的心跳機制代碼在:

Flink-master/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat

四個接口:

HeartbeatListener.java          HeartbeatManager.java      HeartbeatTarget.java  HeartbeatMonitor.java

以及如下幾個類:

HeartbeatManagerImpl.java   HeartbeatManagerSenderImpl.java   HeartbeatMonitorImpl.java
HeartbeatServices.java    NoOpHeartbeatManager.java

Flink集群有多種業務流程,比如Resource Manager, Task Manager, Job Manager。每種業務流程都有自己的心跳機制。Flink的心跳機制只是提供接口和基本功能,具體業務功能由各業務流程自己實現

我們首先設定 心跳系統中有兩種節點:sender和receiver。心跳機制是sender和receivers彼此相互檢測。但是檢測動作是Sender主動發起,即Sender主動發送請求探測receiver是否存活,因為Sender已經發送過來了探測心跳請求,所以這樣receiver同時也知道Sender是存活的,然後Reciver給Sender回應一個心跳錶示自己也是活着的。

因為Flink的幾個名詞和我們常見概念有所差別,所以流程上需要大家仔細甄別,即:

  • Flink Sender 主動發送Request請求給Receiver,要求Receiver回應一個心跳;
  • Flink Receiver 收到Request之後,通過Receive函數回應一個心跳請求給Sender;

3.2 靜態架構

3.2.1 HeartbeatTarget :監控目標抽象

HeartbeatTarget是對監控目標的抽象。心跳機制在行為上而言有兩種動作:

  • 向某個節點發送請求。
  • 處理某個節點發來的請求。

HeartbeatTarget的函數就是這兩個動作:

  • receiveHeartbeat :向某個節點(Sender)發送心跳回應,其參數heartbeatOrigin 就是 Receiver。
  • requestHeartbeat :向某個節點(Receiver)要求其回應一個心跳,其參數requestOrigin 就是 Sender。requestHeartbeat這個函數是Sender的函數,其中Sender通過RPC直接調用到Receiver

這兩個函數的參數也很簡單:分別是請求的發送放和接收方,還有Payload載荷。對於一個確定節點而言,接收的和發送的載荷是同一類型的。

public interface HeartbeatTarget<I> {
	/**
	 * Sends a heartbeat response to the target.
	 * @param heartbeatOrigin Resource ID identifying the machine for which a heartbeat shall be reported.
	 */
  // heartbeatOrigin 就是 Receiver
	void receiveHeartbeat(ResourceID heartbeatOrigin, I heartbeatPayload);

	/**
	 * Requests a heartbeat from the target. 
	 * @param requestOrigin Resource ID identifying the machine issuing the heartbeat request.
	 */
  // requestOrigin 就是 Sender
	void requestHeartbeat(ResourceID requestOrigin, I heartbeatPayload);
}

3.2.2 HeartbeatMonitor : 管理heartbeat target的心跳狀態

對HeartbeatTarget的封裝,這樣Manager對Target的操作是通過對Monitor完成,後續會在其繼承類中詳細說明。

public interface HeartbeatMonitor<O> {
  // Gets heartbeat target.
	HeartbeatTarget<O> getHeartbeatTarget();
	// Gets heartbeat target id.
	ResourceID getHeartbeatTargetId();
	// Report heartbeat from the monitored target.
	void reportHeartbeat();
	//Cancel this monitor.
	void cancel();
  //Gets the last heartbeat.
	long getLastHeartbeat();
}

3.2.3 HeartbeatManager :心跳管理者

HeartbeatManager負責管理心跳機制,比如啟動/停止/報告一個HeartbeatTarget。此接口繼承HeartbeatTarget。

除了HeartbeatTarget的函數之外,這接口有4個函數:

  • monitorTarget,把和某資源對應的節點加入到心跳監控列表;
  • unmonitorTarget,從心跳監控列表刪除某資源對應的節點;
  • stop,停止心跳管理服務,釋放資源;
  • getLastHeartbeatFrom,獲取某節點的最後一次心跳數據。
public interface HeartbeatManager<I, O> extends HeartbeatTarget<I> {
	void monitorTarget(ResourceID resourceID, HeartbeatTarget<O> heartbeatTarget);
	void unmonitorTarget(ResourceID resourceID);
	void stop();
	long getLastHeartbeatFrom(ResourceID resourceId);
}

3.2.4 HearbeatListener 處理心跳結果

用戶業務邏輯需要繼承這個接口以處理心跳結果。其可以看做服務的輸出,實現了三個回調函數

  • notifyHeartbeatTimeout,處理節點心跳超時
  • reportPayload,處理節點發來的Payload載荷
  • retrievePayLoad。獲取對某節點發下一次心跳請求的Payload載荷
public interface HeartbeatListener<I, O> {
	void notifyHeartbeatTimeout(ResourceID resourceID);
	void reportPayload(ResourceID resourceID, I payload);
	O retrievePayload(ResourceID resourceID);
}

3.3 動態運行機制

之前提到Sender和Receiver,下面兩個類就對應上述概念。

  • HeartbeatManagerImpl :Receiver,存在於JobMaster與TaskExecutor中;
  • HeartbeatManagerSenderImpl :Sender,繼承 HeartbeatManagerImpl類,用於周期發送心跳要求,存在於JobMaster、ResourceManager中。

幾個關鍵問題:

  1. 如何判定心跳超時?

    心跳服務啟動后,Flink在Monitor中通過 ScheduledFuture 會啟動一個線程來處理心跳超時事件。在設定的心跳超時時間到達后才執行線程。

    如果在設定的心跳超時時間內接收到組件的心跳消息,會先將該線程取消而後重新開啟,重置心跳超時事件的觸發。

    如果在設定的心跳超時時間內沒有收到組件的心跳,則會通知組件:你超時了。

  2. 何時”調用雙方”發起心跳檢查?

    心跳檢查是雙向的,一方(Sender)會主動發起心跳請求,而另一方(Receiver)則是對心跳做出響應,兩者通過RPC相互調用,重置對方的 Monitor 超時線程。

    以JobMaster和TaskManager為例,JM在啟動時會開啟周期調度,向已經註冊到JM中的TM發起心跳檢查,通過RPC調用TM的requestHeartbeat方法,重置TM中對JM超時線程的調用,表示當前JM狀態正常。在TM的requestHeartbeat方法被調用后,通過RPC調用JM的receiveHeartbeat,重置 JM 中對TM超時線程的調用,表示TM狀態正常。

  3. 如何處理心跳超時?

    心跳服務依賴 HeartbeatListener,當在timeout時間範圍內未接收到心跳響應,則會觸發超時處理線程,該線程通過調用HeartbeatListener.notifyHeartbeatTimeout方法做後續重連操作或者直接斷開。

下面是一個概要(以RM & TM為例):

  • RM : 實現了ResourceManagerGateway (可以直接被RPC調用)

  • TM : 實現了TaskExecutorGateway (可以直接被RPC調用)

  • RM :有一個Sender HM : taskManagerHeartbeatManager,Sender HM 擁有用戶定義的 TaskManagerHeartbeatListener

  • TM :有一個Receiver HM :resourceManagerHeartbeatManager,Receiver HM 擁有用戶定義的ResourceManagerHeartbeatListener。

  • HeartbeatManager 有一個ConcurrentHashMap<ResourceID, HeartbeatMonitor > heartbeatTargets,這個Map是它監控的所有Target。

  • 對於RM的每一個需要監控的TM, 其生成一個HeartbeatTarget,進而被構造成一個HeartbeatMonitor,放置到ResourceManager.taskManagerHeartbeatManager中。

  • 每一個Target對應的Monitor中,有自己的異步任務ScheduledFuture,這個ScheduledFuture不停的被取消/重新生成。如果在某個期間內沒有被取消,則通知用戶定義的listener出現了timeout。

3.3.1 HearbeatManagerImpl : Receiver

HearbeatManagerImpl是receiver的具體實現。它由 心跳 被發起方(就是Receiver,例如TM) 創建,接收 發起方(就是Sender,例如 JM)的心跳發送請求。心跳超時 會觸發 heartbeatListener.notifyHeartbeatTimeout方法。

注意:被發起方監控線程(Monitor)的開啟是在接收到請求心跳(requestHeartbeat被調用后)以後才觸發的,屬於被動觸發。

HearbeatManagerImpl主要維護了

  • 一個心跳監控列表 map : <ResourceID, HeartbeatMonitor<O>> heartbeatTargets;。這是一個KV關聯。

    key代表要發送心跳組件(例如:TM)的ID,value則是為當前組件創建的觸發心跳超時的線程HeartbeatMonitor,兩者一一對應。

    當一個從所聯繫的machine發過來的心跳被收到時候,對應的monitor的狀態會被更新(重啟一個新ScheduledFuture)。當一個monitor發現了一個 heartbeat timed out,它會通知自己的HeartbeatListener。

  • 一個 ScheduledExecutor mainThreadExecutor 負責heartbeat timeout notifications。

  • heartbeatListener :處理心跳結果。

HearbeatManagerImpl 數據結構如下:

@ThreadSafe
public class HeartbeatManagerImpl<I, O> implements HeartbeatManager<I, O> {

	/** Heartbeat timeout interval in milli seconds. */
	private final long heartbeatTimeoutIntervalMs;

	/** Resource ID which is used to mark one own's heartbeat signals. */
	private final ResourceID ownResourceID;

	/** Heartbeat listener with which the heartbeat manager has been associated. */
	private final HeartbeatListener<I, O> heartbeatListener;

	/** Executor service used to run heartbeat timeout notifications. */
	private final ScheduledExecutor mainThreadExecutor;

	/** Map containing the heartbeat monitors associated with the respective resource ID. */
	private final ConcurrentHashMap<ResourceID, HeartbeatMonitor<O>> heartbeatTargets;

	/** Running state of the heartbeat manager. */
	protected volatile boolean stopped;  
}

HearbeatManagerImpl實現的主要函數有:

  • monitorTarget :把一個節點加入到心跳監控列表。
    • 傳入參數有:ResourceId和HearbeatTarget,monitorTarget根據這兩個參數,生成一個HeartbeatMonitor對象,然後把這個對象跟ResrouceId做kv關聯,存入到heartbeatTargets。 一個節點可能參与多個業務流程,因此一個節點參与多個心跳流程,一個節點上運行多個不同類型的HearbeatTarget。所以一個ResourceID可能會跟不同類型的HearbeatTarget對象關聯,分別加入到多個HeartbeatManager,進行不同類型的心跳監控。也因此這個函數入參是兩個參數。
  • requestHeartbeat :Sender通過RPC異步調用到Receiver的這個函數 以要求receiver向requestOrigin節點(就是Sender)發起一次心跳響應,載荷是heartbeatPayLoad。其內部流程如下:
    • 首先會調用reportHeartbeat函數,作用是 通過Monitor 記錄發起請求的這個時間點,然後創建一個ScheduleFuture。如果到期后,requestOrigin沒有作出響應,那麼就將requestOrigin節點對應的HeartbeatMonitor的state設置成TIMEOUT狀態,如果到期內requestOrigin響應了,ScheduleFuture會被取消,HeartbeatMonitor的state仍然是RUNNING。
    • 其次調用reportPayload函數,把requestOrigin節點的最新的heartbeatPayload通知給heartbeatListener。heartbeatListener是外部傳入的,它根據所有節點的心跳記錄做監聽管理。
    • 最後調用receiveHearbeat函數,響應一個心跳給Sender。

3.3.2 HeartbeatManagerSenderImpl : Sender

繼承HearbeatManagerImpl,由心跳管理的一方(例如JM)創建,實現了run函數(即它可以作為一個單獨線程運行),創建后立即開啟周期調度線程,每次遍歷自己管理的heartbeatTarget,觸發heartbeatTarget.requestHeartbeat,要求 Target 返回一個心跳響應。屬於主動觸發心跳請求。

public class HeartbeatManagerSenderImpl<I, O> extends HeartbeatManagerImpl<I, O> implements Runnable {
	public void run() {
		if (!stopped) {
			for (HeartbeatMonitor<O> heartbeatMonitor : getHeartbeatTargets().values()) {
				requestHeartbeat(heartbeatMonitor);
			}
      // 周期調度
			getMainThreadExecutor().schedule(this, heartbeatPeriod, TimeUnit.MILLISECONDS);
		}
	}

  //  主動發起心跳檢查
	private void requestHeartbeat(HeartbeatMonitor<O> heartbeatMonitor) {
		O payload = getHeartbeatListener().retrievePayload(heartbeatMonitor.getHeartbeatTargetId());
		final HeartbeatTarget<O> heartbeatTarget = heartbeatMonitor.getHeartbeatTarget();
    // 調用 Target 的 requestHeartbeat 函數
		heartbeatTarget.requestHeartbeat(getOwnResourceID(), payload);
	}  
}

3.3.3 HeartbeatMonitorImpl

Heartbeat monitor管理心跳目標,它啟動一個ScheduledExecutor。

  • 如果在timeout時間內沒有接收到心跳信號,則判定心跳超時,通知給HeartbeatListener。
  • 如果在timeout時間內接收到心跳信號,則重置當前ScheduledExecutor。
public class HeartbeatMonitorImpl<O> implements HeartbeatMonitor<O>, Runnable {

	/** Resource ID of the monitored heartbeat target. */
	private final ResourceID resourceID;   // 被監控的resource ID

	/** Associated heartbeat target. */
	private final HeartbeatTarget<O> heartbeatTarget; //心跳目標

	private final ScheduledExecutor scheduledExecutor;

	/** Listener which is notified about heartbeat timeouts. */
	private final HeartbeatListener<?, ?> heartbeatListener; // 心跳監聽器

	/** Maximum heartbeat timeout interval. */
	private final long heartbeatTimeoutIntervalMs;

	private volatile ScheduledFuture<?> futureTimeout;
  //  AtomicReference  使用 
	private final AtomicReference<State> state = new AtomicReference<>(State.RUNNING);
   //  最近一次接收到心跳的時間
  private volatile long lastHeartbeat;
  
	// 報告心跳
	public void reportHeartbeat() {
    // 保留最近一次接收心跳時間
		lastHeartbeat = System.currentTimeMillis();
    // 接收心跳后,重置timeout線程
		resetHeartbeatTimeout(heartbeatTimeoutIntervalMs);
	}

  // 心跳超時,觸發lister的notifyHeartbeatTimeout
	public void run() {
		// The heartbeat has timed out if we're in state running
		if (state.compareAndSet(State.RUNNING, State.TIMEOUT)) {
			heartbeatListener.notifyHeartbeatTimeout(resourceID);
		}
	}

  //  重置TIMEOUT
	void resetHeartbeatTimeout(long heartbeatTimeout) {
		if (state.get() == State.RUNNING) {
      //先取消線程,在重新開啟
			cancelTimeout();
      // 啟動超時線程
			futureTimeout = scheduledExecutor.schedule(this, heartbeatTimeout, TimeUnit.MILLISECONDS);

			// Double check for concurrent accesses (e.g. a firing of the scheduled future)
			if (state.get() != State.RUNNING) {
				cancelTimeout();
			}
		}
	}

3.3.3 HeartbeatServices

建立heartbeat receivers and heartbeat senders,主要是對外提供服務。這裏我們可以看到:

  • HeartbeatManagerImpl就是receivers。
  • HeartbeatManagerSenderImpl就是senders。
public class HeartbeatServices {
	// Creates a heartbeat manager which does not actively send heartbeats.
	public <I, O> HeartbeatManager<I, O> createHeartbeatManager(...) {
		return new HeartbeatManagerImpl<>(...);
	}
	// Creates a heartbeat manager which actively sends heartbeats to monitoring targets.
	public <I, O> HeartbeatManager<I, O> createHeartbeatManagerSender(...) {
		return new HeartbeatManagerSenderImpl<>(...);
	}
}

0x04 初始化

4.1 心跳服務創建

心跳管理服務在Cluster入口創建。因為我們是調試,所以在MiniCluster.start調用。

public void start() throws Exception {
				......
				heartbeatServices = HeartbeatServices.fromConfiguration(configuration);
				......
}

HeartbeatServices.fromConfiguration會從Configuration中獲取配置信息:

  • 心跳間隔 heartbeat.interval
  • 心跳超時時間 heartbeat.timeout

個就是我們解決最開始問題的思路:從配置信息入手,擴大心跳間隔

public HeartbeatServices(long heartbeatInterval, long heartbeatTimeout) {
		this.heartbeatInterval = heartbeatInterval;
		this.heartbeatTimeout = heartbeatTimeout;
}

public static HeartbeatServices fromConfiguration(Configuration configuration) {
		long heartbeatInterval = configuration.getLong(HeartbeatManagerOptions.HEARTBEAT_INTERVAL);
		long heartbeatTimeout = configuration.getLong(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT);

		return new HeartbeatServices(heartbeatInterval, heartbeatTimeout);
}

0x05 Flink中具體應用

5.1 總述

5.1.1 RM, JM, TM之間關係

系統中有幾個ResourceManager?整個 Flink 集群中只有一個 ResourceManager。

系統中有幾個JobManager?JobManager 負責管理作業的執行。默認情況下,每個 Flink 集群只有一個 JobManager 實例。JobManager 相當於整個集群的 Master 節點,負責整個集群的任務管理和資源管理。

系統中有幾個TaskManager?這個由具體啟動方式決定。比如Flink on Yarn,Session模式能夠指定拉起多少個TaskManager。 Per job模式中TaskManager數量是在提交作業時根據併發度動態計算,即Number of TM = Parallelism/numberOfTaskSlots。比如:有一個作業,Parallelism為10,numberOfTaskSlots為1,則TaskManager為10。

5.1.2 三者間心跳機制

Flink中ResourceManager、JobMaster、TaskExecutor三者之間存在相互檢測的心跳機制:

  • ResourceManager會主動發送請求探測JobMaster、TaskExecutor是否存活。
  • JobMaster也會主動發送請求探測TaskExecutor是否存活,以便進行任務重啟或者失敗處理。

我們之前講過,HeartbeatManagerSenderImpl屬於Sender,HeartbeatManagerImpl屬於Receiver。

  1. HeartbeatManagerImpl所處位置可以理解為client,存在於JobMaster與TaskExecutor中;
  2. HeartbeatManagerSenderImpl類,繼承 HeartbeatManagerImpl類,用於周期發送心跳請求,所處位置可以理解為server, 存在於JobMaster、ResourceManager中。

ResourceManager 級別最高,所以兩個HM都是Sender,監控taskManager和jobManager

public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
		extends FencedRpcEndpoint<ResourceManagerId>
		implements ResourceManagerGateway, LeaderContender {
	taskManagerHeartbeatManager = heartbeatServices.createHeartbeatManagerSender
	jobManagerHeartbeatManager = heartbeatServices.createHeartbeatManagerSender
}

JobMaster級別中等,一個Sender, 一個Receiver,受到ResourceManager的監控,監控taskManager。

public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMasterGateway, JobMasterService {
	taskManagerHeartbeatManager = heartbeatServices.createHeartbeatManagerSender
	resourceManagerHeartbeatManager = heartbeatServices.createHeartbeatManager
}

TaskExecutor級別最低,兩個Receiver,分別被JM和RM疾控。

public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
	this.jobManagerHeartbeatManager = return heartbeatServices.createHeartbeatManager
	this.resourceManagerHeartbeatManager = return heartbeatServices.createHeartbeatManager
}

以JobManager和TaskManager為例。JM在啟動時會開啟周期調度,向已經註冊到JM中的TM發起心跳檢查,通過RPC調用TM的requestHeartbeat方法,重置對JM超時線程的調用,表示當前JM狀態正常。在TM的requestHeartbeat方法被調用后,通過RPC調用JM的receiveHeartbeat,重置對TM超時線程的調用,表示TM狀態正常。

5.2 初始化過程

5.2.1 TaskExecutor初始化

TM初始化生成了兩個Receiver HM。

public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
	/** The heartbeat manager for job manager in the task manager. */
	private final HeartbeatManager<AllocatedSlotReport, AccumulatorReport> jobManagerHeartbeatManager;

	/** The heartbeat manager for resource manager in the task manager. */
	private final HeartbeatManager<Void, TaskExecutorHeartbeatPayload> resourceManagerHeartbeatManager;
  
  //初始化函數
	this.jobManagerHeartbeatManager = createJobManagerHeartbeatManager(heartbeatServices, resourceId);
	this.resourceManagerHeartbeatManager = createResourceManagerHeartbeatManager(heartbeatServices, resourceId);
}

生成HeartbeatManager時,就註冊了ResourceManagerHeartbeatListener和JobManagerHeartbeatListener。

此時,兩個HeartbeatManagerImpl中已經創建好對應monitor線程,只有在JM或者RM執行requestHeartbeat后,才會觸發該線程的執行。

5.2.2 JobMaster的初始化

JM生成了一個Sender HM,一個Receiver HM。這裡會註冊 TaskManagerHeartbeatListener 和 ResourceManagerHeartbeatListener

public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMasterGateway, JobMasterService {
  private HeartbeatManager<AccumulatorReport, AllocatedSlotReport> taskManagerHeartbeatManager;
  private HeartbeatManager<Void, Void> resourceManagerHeartbeatManager;

  private void startHeartbeatServices() {
      taskManagerHeartbeatManager = heartbeatServices.createHeartbeatManagerSender(
        resourceId,
        new TaskManagerHeartbeatListener(),
        getMainThreadExecutor(),
        log);

      resourceManagerHeartbeatManager = heartbeatServices.createHeartbeatManager(
        resourceId,
        new ResourceManagerHeartbeatListener(),
        getMainThreadExecutor(),
        log);
  }
}

5.2.3 ResourceManager初始化

JobMaster在啟動時候,會在startHeartbeatServices函數中生成兩個Sender HeartbeatManager。

taskManagerHeartbeatManager :HeartbeatManagerSenderImpl對象,會反覆啟動一個定時器,定時掃描需要探測的對象並且發送心跳請求。

jobManagerHeartbeatManager :HeartbeatManagerSenderImpl,會反覆啟動一個定時器,定時掃描需要探測的對象並且發送心跳請求。

taskManagerHeartbeatManager = heartbeatServices.createHeartbeatManagerSender(
			resourceId,
			new TaskManagerHeartbeatListener(),
			getMainThreadExecutor(),
			log);

jobManagerHeartbeatManager = heartbeatServices.createHeartbeatManagerSender(
			resourceId,
			new JobManagerHeartbeatListener(),
			getMainThreadExecutor(),
			log);  

5.3 註冊過程

我們以TM與RM交互為例。TaskExecutor啟動之後,需要註冊到RM和JM中。

流程圖如下:

 * 1. Run in Task Manager
 *
 *    TaskExecutor.onStart //Life cycle
 *        |
 *        +----> startTaskExecutorServices@TaskExecutor
 *        |     //開始TM服務
 *        |     
 *        +----> resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
 *        |     // 開始連接到RM
 *        |     // start by connecting to the ResourceManager
 *        |    
 *        +----> notifyLeaderAddress@ResourceManagerLeaderListener  
 *        |     // 當RM狀態變化之後,將回調到這裏    
 *        |     // The listener for leader changes of the resource manager.
 *        |        
 *        +----> reconnectToResourceManager@TaskExecutor   
 *        |     // 以下三步調用是漸進的,就是與RM聯繫。
 *        |    
 *        +----> tryConnectToResourceManager@TaskExecutor 
 *        |     
 *        +----> connectToResourceManager()@TaskExecutor
 *        |     // 主要作用是生成了 TaskExecutorToResourceManagerConnection    
 *        |  
 *        +----> start@TaskExecutorToResourceManagerConnection
 *        |     // 開始RPC調用,將會調用到其基類RegisteredRpcConnection的start   
 *        |   
 *        +----> start@RegisteredRpcConnection
 *        |     // RegisteredRpcConnection實現了組件之間註冊聯繫的基本RPC
 *        |      
   
   
 * ~~~~~~~~ 這裡是 Akka RPC
   
 * 2. Run in Resource Manager   
 * 現在程序執行序列到達了RM, 主要是添加一個Target到RM 的 Sender HM;
 *
 *    registerTaskExecutor@ResourceManager
 *        |
 *        +----> taskExecutorGatewayFuture.handleAsync
 *        |     // 異步調用到這裏
 *        |     
 *        +----> registerTaskExecutorInternal@ResourceManager
 *        |     // RM的內部實現,將把TM註冊到RM自己這裏
 *        |      
 *        +----> taskManagerHeartbeatManager.monitorTarget
 *        |     // 生成HeartbeatMonitor,
 *        |      
 *        +---->  heartbeatTargets.put(resourceID,heartbeatMonitor);
 *        |     // 把Monitor放到 HM in TM之中,就是說TM開始監控了RM
 *        |        

 * ~~~~~~~~ 這裡是 Akka RPC
  
 * 3. Run in Task Manager
 * 現在程序回到了TM, 主要是添加一個Target到 TM 的 Receiver HM;
 *
 *    onRegistrationSuccess@TaskExecutorToResourceManagerConnection
 *        |   
 *        |   
 *        +---->  onRegistrationSuccess@ResourceManagerRegistrationListener 
 *        |       // 回調函數
 *        |  
 *        +---->  runAsync(establishResourceManagerConnection) 
 *        |       // 異步執行
 *        |  
 *        +---->  establishResourceManagerConnection@TaskExecutor 
 *        |       // 說明已經和RM建立了聯繫,所以可以開始監控RM了
 *        |  
 *        +---->  resourceManagerHeartbeatManager.monitorTarget 
 *        |     // 生成HeartbeatMonitor,
 *        |      
 *        +---->  heartbeatTargets.put(resourceID,heartbeatMonitor);  
 *        |       // 把 RM 也註冊到 TM了
 *        |       // monitor the resource manager as heartbeat target 

下面是具體文字描述。

5.3.1 TM註冊到RM中

5.3.1.1 TM的操作
  • TaskExecutor啟動之後,調用onStart,開始其生命周期。
  • onStart直接調用startTaskExecutorServices。
  • 啟動服務的第一步就是與ResourceManager取得聯繫,這裏註冊了一個ResourceManagerLeaderListener(),用來監聽RM Leader的變化。
private final LeaderRetrievalService resourceManagerLeaderRetriever;
// resourceManagerLeaderRetriever其實是EmbeddedLeaderService的實現,A simple leader election service, which selects a leader among contenders and notifies listeners.

resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
  • 當得到RM Leader的地址之後,會調用到回調函數notifyLeaderAddress@ResourceManagerLeaderListener,然後調用notifyOfNewResourceManagerLeader。
  • notifyOfNewResourceManagerLeader中獲取到RM地址后,就通過reconnectToResourceManager與RM聯繫。
  • reconnectToResourceManager中間接調用到TaskExecutorToResourceManagerConnection。其作用是建立TaskExecutor 和 ResourceManager之間的聯繫。因為知道 ResourceManagerGateway所以才能進行RPC操作。
  • 然後在 TaskExecutorToResourceManagerConnection中,就通過RPC與RM聯繫。
5.3.1.2 RM的操作
  • RPC調用后,程序就來到了RM中,RM做如下操作:
  • 會註冊一個新的TaskExecutor到自己的taskManagerHeartbeatManager中。
  • registerTaskExecutor@ResourceManager會通過異步調用到registerTaskExecutorInternal。
  • registerTaskExecutorInternal中首先看看是否這個TaskExecutor的ResourceID之前註冊過,如果註冊過就移除再添加一個新的TaskExecutor。
  • 通過 taskManagerHeartbeatManager.monitorTarget 開始進行心跳機制的註冊。
taskManagerHeartbeatManager.monitorTarget(taskExecutorResourceId, new HeartbeatTarget<Void>() {
				public void receiveHeartbeat(ResourceID resourceID, Void payload) {
					// the ResourceManager will always send heartbeat requests to the
					// TaskManager
				}
				public void requestHeartbeat(ResourceID resourceID, Void payload) {
					taskExecutorGateway.heartbeatFromResourceManager(resourceID);
				}
});

當註冊完成后,RM中的Sender HM內部結構如下,能看出來多了一個Target:

taskManagerHeartbeatManager = {HeartbeatManagerSenderImpl@8866} 
 heartbeatPeriod = 10000
 heartbeatTimeoutIntervalMs = 50000
 ownResourceID = {ResourceID@8871} "040709f36ebf38f309fed518a88946af"
 heartbeatListener = {ResourceManager$TaskManagerHeartbeatListener@8872} 
 mainThreadExecutor = {RpcEndpoint$MainThreadExecutor@8873} 
 heartbeatTargets = {ConcurrentHashMap@8875}  size = 1
  {ResourceID@8867} "630c15c9-4861-4b41-9c95-92504f458b71" -> {HeartbeatMonitorImpl@9448} 
   key = {ResourceID@8867} "630c15c9-4861-4b41-9c95-92504f458b71"
   value = {HeartbeatMonitorImpl@9448} 
    resourceID = {ResourceID@8867} "630c15c9-4861-4b41-9c95-92504f458b71"
    heartbeatTarget = {ResourceManager$2@8868} 
    scheduledExecutor = {RpcEndpoint$MainThreadExecutor@8873} 
    heartbeatListener = {ResourceManager$TaskManagerHeartbeatListener@8872} 
    heartbeatTimeoutIntervalMs = 50000
    futureTimeout = {ScheduledFutureAdapter@10140} 
    state = {AtomicReference@9786} "RUNNING"
    lastHeartbeat = 0
5.3.1.3 返回到TM

RM會通過RPC再次回到TaskExecutor,其新執行序列如下:

  • 首先RPC調用到了 onRegistrationSuccess@TaskExecutorToResourceManagerConnection。
  • 然後onRegistrationSuccess@ResourceManagerRegistrationListener中通過異步執行調用到了establishResourceManagerConnection。這說明TM已經和RM建立了聯繫,所以可以開始監控RM了。
  • 然後和RM操作類似,通過resourceManagerHeartbeatManager.monitorTarget 來把RM註冊到自己這裏。
HeartbeatMonitor<O> heartbeatMonitor = heartbeatMonitorFactory.createHeartbeatMonitor 
heartbeatTargets.put(resourceID, heartbeatMonitor);  

當註冊完成后,其Receiver HM結構如下:

resourceManagerHeartbeatManager = {HeartbeatManagerImpl@10163} 
 heartbeatTimeoutIntervalMs = 50000
 ownResourceID = {ResourceID@8882} "96a9b80c-dd97-4b63-9049-afb6662ea3e2"
 heartbeatListener = {TaskExecutor$ResourceManagerHeartbeatListener@10425} 
 mainThreadExecutor = {RpcEndpoint$MainThreadExecutor@10426} 
 heartbeatTargets = {ConcurrentHashMap@10427}  size = 1
  {ResourceID@8886} "122fa66685133b11ea26ee1b1a6cef75" -> {HeartbeatMonitorImpl@10666} 
   key = {ResourceID@8886} "122fa66685133b11ea26ee1b1a6cef75"
   value = {HeartbeatMonitorImpl@10666} 
    resourceID = {ResourceID@8886} "122fa66685133b11ea26ee1b1a6cef75"
    heartbeatTarget = {TaskExecutor$1@10668} 
    scheduledExecutor = {RpcEndpoint$MainThreadExecutor@10426} 
    heartbeatListener = {TaskExecutor$ResourceManagerHeartbeatListener@10425} 
    heartbeatTimeoutIntervalMs = 50000
    futureTimeout = {ScheduledFutureAdapter@10992} 
    state = {AtomicReference@10667} "RUNNING"
    lastHeartbeat = 0

5.3.2 TM註冊到 JM

其調用基本思路與之前相同,就是TM和JM之間互相註冊一個代表對方的monitor:

JobLeaderListenerImpl ----> establishJobManagerConnection

消息到了JM中,做如下操作。

registerTaskManager ----> taskManagerHeartbeatManager.monitorTarget
  // monitor the task manager as heartbeat target

5.4 心跳過程

在任務提交之後,我們就進入了正常的心跳監控流程。我們依然用 TM 和 RM進行演示。

我們先給出一個流程圖。

 * 1. Run in Resouce Manager
 *
 *    HeartbeatManagerSender in RM
 *        |
 *        +----> run@HeartbeatManagerSenderImpl
 *        |     //遍歷所有監控的Monitor(Target),逐一在Target上調用requestHeartbeat
 *        |     
 *        +----> requestHeartbeat@HeartbeatManagerSenderImpl 
 *        |     // 將調用具體監控對象的自定義函數
 *        |     // heartbeatTarget.requestHeartbeat(getOwnResourceID(), payload);
 *        |      
 *        +----> getHeartbeatListener().retrievePayload   
 *        |     // 調用到TaskManagerHeartbeatListener@ResourceManager    
 *        |     // 這裡是return null;,因為RM不會是任何人的Receiver
 *        |        
 *        +----> requestHeartbeat@HeartbeatTarget   
 *        |     // 調用到Target這裏,代碼在ResourceManager這裏,就是生成Target時候賦值的
 *        |    
 *        +----> taskExecutorGateway.heartbeatFromResourceManager
 *        |     // 會通過gateway RPC 調用到TM,這就是主動對TM發起了心跳請求
 *        |      

 * ~~~~~~~~ 這裡是 Akka RPC
   
 * 2. Run in Task Manager   
 * 現在程序執行序列到達了TM, 主要是 1. 重置TM的Monitor線程; 2.返回一些負載信息;
 *
 *    heartbeatFromResourceManager@TaskExecutor
 *        |
 *        +----> resourceManagerHeartbeatManager.requestHeartbeat(resourceID, null);
 *        |     //開始要調用到 Receiver HM in Task Manager
 *        |     
 *        +----> requestHeartbeat@HeartbeatManager in TM 
 *        |     // 在Receiver HM in Task Manager 這裏運行
 *        |      
 *        +----> reportHeartbeat@HeartbeatMonitor   
 *        |     //reportHeartbeat : 記錄發起請求的這個時間點,然後resetHeartbeatTimeout
 *        |      
 *        +----> resetHeartbeatTimeout@HeartbeatMonitor
 *        |     // 如果Monitor狀態依然是RUNNING,則取消之前設置的ScheduledFuture。
 *        |     // 重新創建一個ScheduleFuture。因為如果不取消,則之前那個ScheduleFuture運行時
 *        |     // 會調用HeartbeatMonitorImpl.run函數,run直接compareAndSet后,通知目標函數
 *        |     // 目前已經超時,即調用heartbeatListener.notifyHeartbeatTimeout。
 *        |     // 這裏代表 JM 狀態正常。   
 *        |      
 *        +---->  heartbeatListener.reportPayload
 *        |     // 把Target節點的最新的heartbeatPayload通知給heartbeatListener。 
 *        |     // heartbeatListerner是外部傳入的,它根據所擁有的節點的心跳記錄做監聽管理。
 *        |        
 *        +---->  heartbeatTarget.receiveHeartbeat(getOwnResourceID(), heartbeatListener.retrievePayload(requestOrigin));
 *        |      
 *        |     
 *        +---->  retrievePayload@ResourceManagerHeartbeatListener in TM   
 *        |       // heartbeatTarget.receiveHeartbeat參數調用的
 *        |     
 *        +---->  return new TaskExecutorHeartbeatPayload
 *        |   
 *        |     
 *        +---->  receiveHeartbeat in TM 
 *        |       // 回到 heartbeatTarget.receiveHeartbeat,這就是TM生成Target的時候的自定義函數
 *        |       // 就是響應一個心跳消息回給RM
 *        |     
 *        +----> resourceManagerGateway.heartbeatFromTaskManager  
 *        |     // 會通過gateway RPC 調用到 ResourcManager
 *        |      

 * ~~~~~~~~ 這裡是 Akka RPC
  
 * 3. Run in Resouce Manager
 * 現在程序回到了RM, 主要是 1.重置RM的Monitor線程;2. 上報收到TaskExecutor的負載信息  
 *
 *    heartbeatFromTaskManager in RM
 *        |   
 *        |   
 *        +---->  taskManagerHeartbeatManager.receiveHeartbeat 
 *        |       // 這是個Sender HM
 *        |  
 *        +---->  HeartbeatManagerImpl.receiveHeartbeat  
 *        |  
 *        |     
 *        +---->  HeartbeatManagerImpl.reportHeartbeat(heartbeatOrigin);
 *        | 
 *        |    
 *        +---->  heartbeatMonitor.reportHeartbeat(); 
 *        |      // 這裏就是重置RM 這裏對應的Monitor。在reportHeartbeat重置 JM monitor線程的觸發,即cancelTimeout取消註冊時候的超時定時任務,並且註冊下一個超時檢測futureTimeout;這代表TM正常執行。
 *        |     
 *        +----> heartbeatListener.reportPayload    
 *        |      //把Target節點的最新的heartbeatPayload通知給 TaskManagerHeartbeatListener。heartbeatListerner是外部傳入的,它根據所擁有的節點的心跳記錄做監聽管理。 
 *        |      
 *        +----> slotManager.reportSlotStatus(instanceId, payload.getSlotReport());
 *        |     // TaskManagerHeartbeatListener中調用,上報收到TaskExecutor的負載信息
 *        |        

下面是具體文字描述。

5.4.1 ResourceManager主動發起

5.4.1.1 Sender遍歷所有監控的Monitor(Target)

心跳機制是由Sender主動發起的。這裏就是 ResourceManager 的HeartbeatManagerSenderImpl中定時schedual調用,這裡會遍歷所有監控的Monitor(Target),逐一在Target上調用requestHeartbeat。

// HeartbeatManagerSenderImpl中的代碼

	@Override
	public void run() {
		if (!stopped) {
			for (HeartbeatMonitor<O> heartbeatMonitor : getHeartbeatTargets().values()) {
        // 這裏向被監控對象節點發起一次心跳請求,載荷是heartbeatPayLoad,要求被監控對象回應心跳
				requestHeartbeat(heartbeatMonitor);
			}
			getMainThreadExecutor().schedule(this, heartbeatPeriod, TimeUnit.MILLISECONDS);
		}
	}
}

// 運行時候的變量
this = {HeartbeatManagerSenderImpl@9037} 
 heartbeatPeriod = 10000
 heartbeatTimeoutIntervalMs = 50000
 ownResourceID = {ResourceID@8788} "d349506cae32cadbe99b9f9c49a01c95"
 heartbeatListener = {ResourceManager$TaskManagerHeartbeatListener@8789} 
 mainThreadExecutor = {RpcEndpoint$MainThreadExecutor@8790} 

// 調用棧如下
requestHeartbeat:711, ResourceManager$2 (org.apache.flink.runtime.resourcemanager)
requestHeartbeat:702, ResourceManager$2 (org.apache.flink.runtime.resourcemanager)
requestHeartbeat:92, HeartbeatManagerSenderImpl (org.apache.flink.runtime.heartbeat)
run:81, HeartbeatManagerSenderImpl (org.apache.flink.runtime.heartbeat)
call:511, Executors$RunnableAdapter (java.util.concurrent)
run$$$capture:266, FutureTask (java.util.concurrent)
run:-1, FutureTask (java.util.concurrent)
5.4.1.2 Target進行具體操作

具體監控對象 Target 會調用自定義的requestHeartbeat。

HeartbeatManagerSenderImpl

	private void requestHeartbeat(HeartbeatMonitor<O> heartbeatMonitor) {
		O payload = getHeartbeatListener().retrievePayload(heartbeatMonitor.getHeartbeatTargetId());
		final HeartbeatTarget<O> heartbeatTarget = heartbeatMonitor.getHeartbeatTarget();

    // 這裏就是具體監控對象
		heartbeatTarget.requestHeartbeat(getOwnResourceID(), payload);
	}

heartbeatTarget = {ResourceManager$2@10688} 
 taskExecutorGateway = {$Proxy42@9459} "org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler@6d0c8334"
 this$0 = {StandaloneResourceManager@9458} 

請注意,每一個Target都是由ResourceManager生成的。ResourceManager之前註冊成為Monitor時候就註冊了這個HeartbeatTarget。

這個HeartbeatTarget的定義如下,兩個函數是:

  • receiveHeartbeat :這個是空,因為RM沒有自己的Sender。

  • requestHeartbeat :這個針對TM,就是調用TM的heartbeatFromResourceManager,當然是通過RPC調用。

5.4.1.3 RPC調用

會調用到ResourceManager定義的函數requestHeartbeat,而requestHeartbeat會通過gateway調用到TM,這就是主動對TM發起了心跳請求。

taskManagerHeartbeatManager.monitorTarget(taskExecutorResourceId, new HeartbeatTarget<Void>() {
   @Override
   public void receiveHeartbeat(ResourceID resourceID, Void payload) {
      // the ResourceManager will always send heartbeat requests to the TaskManager
   }

   @Override
   public void requestHeartbeat(ResourceID resourceID, Void payload) {
      //就是調用到這裏
      taskExecutorGateway.heartbeatFromResourceManager(resourceID); 
   }
});

5.4.2 RM通過RPC調用TM

通過taskExecutorGateway。心跳程序執行就通過RPC從RM跳躍到了TM。

taskExecutorGateway.heartbeatFromResourceManager 的意義就是:通過RPC調用回到TaskExecutor。這個是在TaskExecutorGateway就定義好的。

// TaskExecutor RPC gateway interface.
public interface TaskExecutorGateway extends RpcGateway

TaskExecutor實現了TaskExecutorGateway,所以具體在TaskExecutor內部實現了接口函數。

@Override
public void heartbeatFromResourceManager(ResourceID resourceID) {
    //調用到了這裏 ...........
		resourceManagerHeartbeatManager.requestHeartbeat(resourceID, null);
}

TM中,resourceManagerHeartbeatManager 定義如下。

/** The heartbeat manager for resource manager in the task manager. */
private final HeartbeatManager<Void, TaskExecutorHeartbeatPayload> resourceManagerHeartbeatManager;

所以下面就是執行TM中的Receiver HM。在這個過程中有兩個處理步驟:

  1. 調用對應HeartbeatMonitor的reportHeartbeat方法,cancelTimeout取消註冊時候的超時定時任務,並且註冊下一個超時檢測futureTimeout;
  2. 調用monitorTarget的receiveHeartbeat方法,也就是會通過rpc調用JobMaster的heartbeatFromTaskManager方法返回一些負載信息;

具體是調用 requestHeartbeat@HeartbeatManager。在其中會

  • 調用reportHeartbeat@HeartbeatMonitor,記錄發起請求的這個時間點,然後resetHeartbeatTimeout。
  • 在resetHeartbeatTimeout@HeartbeatMonitor之中,如果Monitor狀態依然是RUNNING,則取消之前設置的ScheduledFuture。重新創建一個ScheduleFuture。因為如果不取消,則之前那個ScheduleFuture運行時會調用HeartbeatMonitorImpl.run函數,run直接compareAndSet后,通知目標函數目前已經超時,即調用heartbeatListener.notifyHeartbeatTimeout。
  • 調用 heartbeatListener.reportPayload,把Target節點的最新的heartbeatPayload通知給heartbeatListener。
  • 調用 heartbeatTarget.receiveHeartbeat(getOwnResourceID(), heartbeatListener.retrievePayload(requestOrigin)); 就是響應一個心跳消息回給RM。
	@Override
	public void requestHeartbeat(final ResourceID requestOrigin, I heartbeatPayload) {
		if (!stopped) {
			log.debug("Received heartbeat request from {}.", requestOrigin);

			final HeartbeatTarget<O> heartbeatTarget = reportHeartbeat(requestOrigin);

			if (heartbeatTarget != null) {
				if (heartbeatPayload != null) {
					heartbeatListener.reportPayload(requestOrigin, heartbeatPayload);
				}

				heartbeatTarget.receiveHeartbeat(getOwnResourceID(), heartbeatListener.retrievePayload(requestOrigin));
			}
		}
	}

最後會通過resourceManagerGateway.heartbeatFromTaskManager 調用到 ResourcManager。

5.4.3 TM 通過RPC回到 RM

JobMaster在接收到rpc請求后調用其heartbeatFromTaskManager方法,會調用taskManagerHeartbeatManager的receiveHeartbeat方法,在這個過程中同樣有兩個處理步驟:

  1. 調用對應HeartbeatMonitor的reportHeartbeat方法,cancelTimeout取消註冊時候的超時定時任務,並且註冊下一個超時檢測futureTimeout;
  2. 調用TaskManagerHeartbeatListener的reportPayload方法,上報收到TaskExecutor的負載信息

至此一次完成心跳過程已經完成,會根據heartbeatInterval執行下一次心跳。

5.5 超時處理

5.5.1 TaskManager

首先,在HeartbeatMonitorImpl中,如果超時,會調用Listener。

public void run() {
   // The heartbeat has timed out if we're in state running
   if (state.compareAndSet(State.RUNNING, State.TIMEOUT)) {
      heartbeatListener.notifyHeartbeatTimeout(resourceID);
   }
}

這就來到了ResourceManagerHeartbeatListener,會嘗試再次連接RM。

private class ResourceManagerHeartbeatListener implements HeartbeatListener<Void, TaskExecutorHeartbeatPayload> {

   @Override
   public void notifyHeartbeatTimeout(final ResourceID resourceId) {
      validateRunsInMainThread();
      // first check whether the timeout is still valid
      if (establishedResourceManagerConnection != null && establishedResourceManagerConnection.getResourceManagerResourceId().equals(resourceId)) {
         reconnectToResourceManager(new TaskManagerException(
            String.format("The heartbeat of ResourceManager with id %s timed out.", resourceId)));
      } else {
         .....
      }
   }

5.5.2 ResourceManager

RM就直接簡單粗暴,關閉連接。

private class TaskManagerHeartbeatListener implements HeartbeatListener<TaskExecutorHeartbeatPayload, Void> {

   @Override
   public void notifyHeartbeatTimeout(final ResourceID resourceID) {
      validateRunsInMainThread();
      closeTaskManagerConnection(
         resourceID,
         new TimeoutException("The heartbeat of TaskManager with id " + resourceID + "  timed out."));
   }
}  

0x06 解決問題

心跳機制我們講解完了,但是我們最初提到的異常應該如何解決呢?在程序最開始生成環境變量時候,通過設置環境變量的配置即可搞定:

Configuration conf = new Configuration();
conf.setString("heartbeat.timeout", "18000000");
final LocalEnvironment env = ExecutionEnvironment.createLocalEnvironment(conf);

0x07 參考

[flink-001]flink的心跳機制

Flink中心跳機制

flink1.8 心跳服務

你有必要了解一下Flink底層RPC使用的框架和原理

flink RPC(akka)

弄清Flink1.8的遠程過程調用(RPC)

Apache Flink源碼解析 (七)Flink RPC的底層實現

flink源碼閱讀第一篇—入口

flink-on-yarn 基礎架構和啟動流程

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※帶您來了解什麼是 USB CONNECTOR  ?

※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面

※如何讓商品強力曝光呢? 網頁設計公司幫您建置最吸引人的網站,提高曝光率!

※綠能、環保無空污,成為電動車最新代名詞,目前市場使用率逐漸普及化

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

※教你寫出一流的銷售文案?

【asp.net core 系列】13 Identity 身份驗證入門

0. 前言

通過前兩篇我們實現了如何在Service層如何訪問數據,以及如何運用簡單的加密算法對數據加密。這一篇我們將探索如何實現asp.net core的身份驗證。

1. 身份驗證

asp.net core的身份驗證有 JwtBearer和Cookie兩種常見的模式,在這一篇我們將啟用Cookie作為身份信息的保存。那麼,我們如何啟用呢?

在Startup.cs 的ConfigureServices(IServiceCollection services) 方法里添加如下:

services.AddAuthentication(CookieAuthenticationDefaults.AuthenticationScheme)
                .AddCookie(CookieAuthenticationDefaults.AuthenticationScheme, options =>
                {
                    Configuration.Bind("CookieSettings",options);
                });

此時可以啟動一個權限驗證,當用戶訪問需要驗證的頁面或接口時,如果沒有登錄,則會自動跳轉到:

https://localhost:5001/Account/Login?ReturnUrl=XXXX

其中ReturnUrl指向來源頁。

1.1 設置驗證

當我們在Startup類里設置啟用了身份驗證后,並不是訪問所有接口都會被跳轉到登錄頁面。那麼如何設置訪問的路徑需要身份驗證呢?asp.net core為我們提供了一個特性類:

[AttributeUsage(AttributeTargets.Class | AttributeTargets.Method, AllowMultiple = true, Inherited = true)]
public class AuthorizeAttribute : Attribute, IAuthorizeData
{
    public string Policy { get; set; }
    public string Roles { get; set; }
    public string AuthenticationSchemes { get; set; }
}

可以看的出,這個特性類允許設置在類、方法上,可以設置多個,允許子類繼承父類的特性。所以可以在控制器上設置[Authorize],當在控制器上設置以後訪問控制器里所有的Action都會要求驗證身份;也可以單獨設置在Action上,表示該Action需要驗證身份,控制器里的其他方法不需要驗證。

1.2 設置忽略

我們在開發過程中,會遇到這樣的一組鏈接或者頁面:請求地址同屬於一個控制器下,但其中某個地址可以不用用戶登錄就可以訪問。通常我們為了減少重複代碼以及復用性等方面的考慮,會直接在控制器上設置身份驗證要求,而不是在控制器里所有的Action上添加驗證要求。

那麼,我們如何放開其中的某個請求,可以允許它不用身份驗證。

[AttributeUsage(AttributeTargets.Class | AttributeTargets.Method, AllowMultiple = false, Inherited = true)]
public class AllowAnonymousAttribute : Attribute, IAllowAnonymous
{
}

仔細觀察,可以看得出這個特性可以設置在類、方法上,不允許多次設置,允許子類繼承父類的特性。

這個特性的使用沒啥可說的,不過需要注意的是,不要與AuthorizeAttribute一起使用。雖然編譯上沒啥問題,但實際上會對程序員的邏輯照成一定程度的誤導。

2.保存身份

有身份驗證,就必然需要保存身份。當我們從數據庫中或者其他的三方服務中獲取到用戶信息后,我們需要將用戶信息保存起來,而不是每次都向用戶或者服務提供方索求信息。

在asp.net core中,Controller類里有一個屬性:

public HttpContext HttpContext { get; }

HttpContext 提供了一個擴展方法,可以用來保存用戶信息:

public static Task SignInAsync(this HttpContext context, ClaimsPrincipal principal);

暫時忽略這個方法的返回類型,它接受了一個ClaimsPrincipal類型的參數。我們來看下這個類的基本情況吧:

public class ClaimsPrincipal : IPrincipal
{

    public ClaimsPrincipal();
    public ClaimsPrincipal(IEnumerable<ClaimsIdentity> identities);
    public ClaimsPrincipal(BinaryReader reader);
    public ClaimsPrincipal(IIdentity identity);
    public ClaimsPrincipal(IPrincipal principal);
    
    public static ClaimsPrincipal Current { get; }
    public static Func<ClaimsPrincipal> ClaimsPrincipalSelector { get; set; }
    public static Func<IEnumerable<ClaimsIdentity>, ClaimsIdentity> PrimaryIdentitySelector { get; set; }
    public virtual IIdentity Identity { get; }
    public virtual IEnumerable<ClaimsIdentity> Identities { get; }
    public virtual IEnumerable<Claim> Claims { get; }
    public virtual void AddIdentities(IEnumerable<ClaimsIdentity> identities);
    public virtual void AddIdentity(ClaimsIdentity identity);
    public virtual ClaimsPrincipal Clone();
    public virtual IEnumerable<Claim> FindAll(Predicate<Claim> match);
    public virtual IEnumerable<Claim> FindAll(string type);
    public virtual Claim FindFirst(string type);
    public virtual Claim FindFirst(Predicate<Claim> match);
    public virtual bool HasClaim(Predicate<Claim> match);
    public virtual bool HasClaim(string type, string value);
    public virtual bool IsInRole(string role);
    public virtual void WriteTo(BinaryWriter writer);
}

方法和屬性有點多,那麼我們重點關注一下構造函數以及可以AddXXX開頭的方法。

這裡有一個竅門,對於一個陌生的類來說,構造函數對於類本身是個很重要的特徵,我們可以通過構造函數分析出這個類需要哪些基礎數據。

所以,通過簡單的分析,我們需要繼續了解這兩個類:

public class ClaimsIdentity : IIdentity
{
    public ClaimsIdentity();
    public ClaimsIdentity(string authenticationType);
    public ClaimsIdentity(IIdentity identity);
    public ClaimsIdentity(IEnumerable<Claim> claims);
    public ClaimsIdentity(IEnumerable<Claim> claims, string authenticationType);
    public ClaimsIdentity(IIdentity identity, IEnumerable<Claim> claims);
    public ClaimsIdentity(string authenticationType, string nameType, string roleType);
    public ClaimsIdentity(IEnumerable<Claim> claims, string authenticationType, string nameType, string roleType);
    public ClaimsIdentity(IIdentity identity, IEnumerable<Claim> claims, string authenticationType, string nameType, string roleType);
    
}

public class Claim
{
    public Claim(BinaryReader reader);
    public Claim(BinaryReader reader, ClaimsIdentity subject);

    public Claim(string type, string value);
    public Claim(string type, string value, string valueType);
    public Claim(string type, string value, string valueType, string issuer);

    public Claim(string type, string value, string valueType, string issuer, string originalIssuer);
    public Claim(string type, string value, string valueType, string issuer, string originalIssuer, ClaimsIdentity subject);
    protected Claim(Claim other);
    protected Claim(Claim other, ClaimsIdentity subject);
    public string Type { get; }
    public ClaimsIdentity Subject { get; }
    public IDictionary<string, string> Properties { get; }
    public string OriginalIssuer { get; }
    public string Issuer { get; }
    public string ValueType { get; }
    public string Value { get; }
    protected virtual byte[] CustomSerializationData { get; }
    public virtual Claim Clone();
    public virtual Claim Clone(ClaimsIdentity identity);
    public override string ToString();
    public virtual void WriteTo(BinaryWriter writer);
    protected virtual void WriteTo(BinaryWriter writer, byte[] userData);
}

所以,看到這裏就會發現,我們可以通過以下方式保存信息:

List<Claim> claims = null;
var identity = new ClaimsIdentity(claims, CookieAuthenticationDefaults.AuthenticationScheme);

HttpContext.SignInAsync( CookieAuthenticationDefaults.AuthenticationScheme,new ClaimsPrincipal(identity));

這時候,數據就可以保存在Cookie里了,那麼如何在控制器中獲取到數據呢:

public ClaimsPrincipal User { get; }

在控制器中,提供了這樣一個屬性,當然如果想要正確獲取到值的話,需要在 Startup.cs類中的添加如下配置:

public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
{
    // ……省略其他配置
    app.UseAuthorization();
    app.UseAuthentication();
    // ……省略其他配置
}

3. 總結

在這一篇中,簡單介紹了asp.net core的identity,下一篇將從實際上帶領大家設置不一樣的identity以及Authorize驗證。

更多內容煩請關注我的博客《高先生小屋》

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※為什麼 USB CONNECTOR 是電子產業重要的元件?

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

※台北網頁設計公司全省服務真心推薦

※想知道最厲害的網頁設計公司"嚨底家"!

新北清潔公司,居家、辦公、裝潢細清專業服務

※推薦評價好的iphone維修中心

Mysql和Redis數據同步策略

目錄

  • 為什麼對緩存只刪除不更新
  • 先更新數據庫還是先刪除緩存?
  • Cache Aside Pattern
  • Double-Delete
  • Read/Write Through Pattern
  • Write Behind
  • 設置緩存過期時間
  • 總結

為什麼對緩存只刪除不更新

不更新緩存是防止併發更新導致的數據不一致。
所以為了降低數據不一致的概率,不應該更新緩存,而是直接將其刪除,
然後等待下次發生cache miss時再把數據庫中的數據同步到緩存。

先更新數據庫還是先刪除緩存?

有兩個選擇:
1. 先刪除緩存,再更新數據庫
2. 先更新數據庫,再刪除緩存

如果先刪除緩存,有一個明顯的邏輯錯誤:考慮兩個併發操作,線程A刪除緩存后,線程B讀該數據時會發生Cache Miss,然後從數據庫中讀出該數據並同步到緩存中,此時線程A更新了數據庫。
結果導致,緩存中是老數據,數據庫中是新數據,並且之後的讀操作都會直接讀取緩存中的臟數據。(直到key過期被刪除或者被LRU策略踢出)
如果數據庫更新成功后,再刪除緩存,就不會有上面這個問題。
可能是由於數據庫優先,第二種方式也被稱為Cache Aside Pattern。

Cache Aside Pattern

cache aside在絕大多數情況下能做到數據一致性,但是在極端情況仍然存在問題。

  • 首先更新數據庫(A)和刪除緩存(B)不是原子操作,任何在A之後B之前的讀操作,都會讀到redis中的舊數據。
    正常情況下操作緩存的速度會很快,通常是毫秒級,臟數據存在的時間極端。
    但是,對超高併發的應用可能會在意這幾毫秒。
  • 更新完數據庫后,線程意外被kill掉(真的很不幸),由於沒有刪除緩存,緩存中的臟數據會一直存在。
  • 線程A讀數據時cache miss,從Mysql中查詢到數據,還沒來得及同步到redis中,
    此時線程B更新了數據庫並把Redis中的舊值刪除。隨後,線程A把之前查到的數據同步到了Redis。
    顯然,此時redis中的是臟數據。
    通常數據庫讀操作比寫操作快很多,所以除非線程A在同步redis前意外卡住了,否則發生上述情況的概率極低。

雖然以上情況都有可能發生,但是發生的概率相比“先刪除緩存再更新數據庫”會低很多。

Double-Delete

前面我們講到先刪除緩存(A)、后更新數據庫(B)的方案有明顯的錯誤,任何發生在A操作和B操作之間的併發讀都會造成數據的最終不一致。
Double-Delete是一種比較笨拙的修補方案,執行過程如下:

1.delete redis cache
2.update database
3.sleep(500ms)
3.delete redis cache

也很好理解,在睡眠500ms后嘗試再次刪除緩存中的臟數據,它通過兩次刪除來盡可能做到數據的最終一致。
其實,Double-Delete在數據一致性上比Cache Aside更靠譜,但是它的代價是昂貴的,
即使,把睡眠時間縮短到100ms,對耗時敏感的應用也不會考慮這種方案。

Read/Write Through Pattern

cache aside是我們自己的應用程序維護兩個數據存儲系統,而Read/Write Through Pattern是把同步數據的問題交給緩存系統了,應用程序不需要關心。
Read Through是指發生cache miss時,緩存系統自動去數據庫加載數據。
Write Through是指如果cache miss,直接更新數據庫,然後返回,如果cache hit,則更新緩存后,由緩存系統自動同步到數據庫。
以Redis為例,通常我們不會把數據庫的數據全部緩存到redis,而是採用一定的數據精簡或壓縮策略,以節省緩存空間。
就是說,讓緩存系統設計出通用的緩存方案不太現實,不過根據自己的業務定製一個在項目內部通用的中間件是可行的。

Write Behind

Write Behind方案在更新數據時,只更新緩存,不更新數據庫。而是由另外一個服務異步的把數據更新到數據庫。
邏輯上,和Linux中的write back很類似。這個設計的好處是,I/O操作很快,因為是純內存操作。
但是由於異步寫庫,可能要犧牲一些數據一致性,譬如突然宕機會丟失所有未寫入數據庫的內存數據。

阿里巴巴的Canal中間件是一種相反的設計,它先更新mysql,然後通過binlog把數據自動同步到redis。
這種方案會全量同步數據到redis,不適合只緩存熱點數據的應用。

設置緩存過期時間

無論採用哪種策略都應該設置緩存的過期時間。
過期時間設置太長,臟數據存在的時間就越長;
過期時間設置太短,大量的cache miss會讓查詢直接進入數據庫。
所以,需要根據業務場景考慮過期時間。

總結

以上沒有哪種方案是完美的,都無法做到強一致性。
我們總要在性能和數據準確性之間做出妥協。
Cache Aside Pattern適用於絕大多數的場景。

https://www.pixelstech.net/article/1562504974-Consistency-between-Redis-Cache-and-SQL-Database
https://coolshell.cn/articles/17416.html
為什麼不更新緩存,而是直接刪除

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理
【其他文章推薦】

USB CONNECTOR掌控什麼技術要點? 帶您認識其相關發展及效能

台北網頁設計公司這麼多該如何選擇?

※智慧手機時代的來臨,RWD網頁設計為架站首選

※評比南投搬家公司費用收費行情懶人包大公開

※幫你省時又省力,新北清潔一流服務好口碑

※回頭車貨運收費標準

一起玩轉微服務(12)——揭密starter

介紹

Spring Boot的starter主要用來簡化依賴用的,對於企業級開發中的與第三方的集成,可以通過一段簡單的配置來完成,這樣開發人員無需再對包依賴的問題頭疼。Spring Boot為我們提供了簡化企業級開發的絕大多數場景的starter pom,只需要指定需要配置的starter,Spring Boot會自動為我們提供配置好的bean。

通常流程

通常我們要搭建一個基於Spring的Web應用,我們需要做以下一些工作:

pom文件中引入相關jar包,包括spring、springmvc、redis、mybaits、log4j、mysql-connector-java 等等相關jar …

配置web.xml,Listener配置、Filter配置、Servlet配置、log4j配置、error配置 …

配置數據庫連接、配置spring事務

配置視圖解析器

開啟註解、自動掃描功能

配置完成後部署tomcat、啟動調試

……

花在搭建一個初始項目,可能一個小時就過去了或者半天就過了,但是用了SpringBoot之後一切都會變得非常便捷。

常用的starter

Spring Boot常用的starter(啟動器)包括:

  • Spring-boot-starter-logging :使用 Spring Boot 默認的日誌框架 Logback。
  • Spring-boot-starter-log4j :添加 Log4j 的支持。
  • Spring-boot-starter-web :支持 Web 應用開發,包含 Tomcat 和 Spring-mvc。
  • Spring-boot-starter-tomcat :使用 Spring Boot 默認的 Tomcat 作為應用服務器。
  • Spring-boot-starter-jetty :使用 Jetty 而不是默認的 Tomcat 作為應用服務器。
  • Spring-boot-starter-test :包含常用的測試所需的依賴,如 Junit、Hamcrest、Mockito 和 Spring-test 等。
  • Spring-boot-starter-AOP :包含 Spring-AOP 和 AspectJ 來支持面向切面編程(AOP)。
  • Spring-boot-starter-security :包含 Spring-security。
  • Spring-boot-starter-jdbc :支持使用 JDBC 訪問數據庫。
  • Spring-boot-starter-redis :支持使用 Redis。
  • Spring-boot-starter-data-mongodb :包含 Spring-data-mongodb 來支持 MongoDB。
  • Spring-boot-starter-data-jpa :包含 Spring-data-jpa、Spring-orm 和 Hibernate 來支持 JPA。
  • Spring-boot-starter-amqp :通過 Spring-rabbit 支持 AMQP。
  • Spring-boot-starter-actuator : 添加適用於生產環境的功能,如性能指標和監測等功能。

當然,如果有必要,也可以定製自己的starter。

起步依賴

在我們的pom文件裏面引入以下jar:

spring-boot-starter-web包自動幫我們引入了web模塊開發需要的相關jar包。

 

mybatis-spring-boot-starter幫我們引入了dao開發相關的jar包。 spring-boot-starter-xxx是官方提供的starter,xxx-spring-boot-starter是第三方提供的starter。

截圖看一下我們的mybatis-spring-boot-starter

可以看出mybatis-spring-boot-starter並沒有任何源碼,只有一個pom文件,它的作用就是幫我們引入其它jar。

 

得益於starter的作用,使用SpringBoot確實方便,但對剛剛上手SpringBoot的人來說,可能只知道配置屬性是在application.xml或application.yml中添加,但他們各自的屬性都有哪些,具體怎麼配置,卻無從下手。這裏先解決SpringBoot-starter中各屬性的配置問題。

Mybatis的配置是怎麼生效的?查看示例工程的pom依賴:

注意到mybatis-spring-boot-starter幫我們自動依賴了Mybatis所需jar包,其中有一個負責自動配置的mybatis-spring-boot-autoconfigure.jar,緊接着打開此jar,如下:

META-INF/spring-configuration-metadata.json中便是Mybatis在SpringBoot中的所有配置屬性和介紹。

SpringBoot-starter自動配置bean

現在已得知jar包是怎麼樣自動依賴進來,以及他們的配置屬性,那麼接下來該考慮Mybatis所需的bean(如必需的sqlSessionFactory、sqlSessionTemplate等)是如何被自動加載的?

理所應當地,我們繼續去查看mybatis-spring-boot-autoconfigure.jar,注意到裏面有一個自動配置的類MybatisAutoConfiguration:

(1)@Configuration:被掛上@Configuration註解,表明它是一個配置類,作用等同於xml配置,裏面有被@Bean註解的方法,也等同於xml配置的各種。

(2)@ConditionalOnClass/@ConditionalOnBean:自動配置條件註解,用於在某一部分配置中,將另一模塊部分的配置自動加載進來,因為隨着系統越來越大,配置內容越來越多,我們應當將Mybatis的配置放在一處,將log4j的配置放在一處,將SpringBoot自身的配置放在一處,當他們需要互相依賴時,可通過這類註解進行自動配置,如下:

@ConditionalOnClass @ConditionalOnMissingClass
@ConditionalOnBean @ConditionalOnMissingBean
@ConditionalOnProperty
@ConditionalOnResource
@ConditionalOnWebApplication @ConditionalOnNotWebApplication
@ConditionalOnExpression
 
@AutoConfigureAfter @AutoConfigureBefore @AutoConfigureOrder(指定順序)
 

(3)@EnableConfigurationProperties:啟用對@ConfigurationProperties註解的bean的支持,這裏對應了配置屬性類MybatisProperties,它裏面定義了Mybatis的所有配置。

(4)@AutoConfigureAfter:應在其他指定的自動配置類之後應用自動配置。即org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration被自動配置后,才會接着自動配置MybatisAutoConfiguration。這裏也解釋了為什麼我們在application.xml中只配置了數據源,而沒有配置Mybatis,但是Mybatis可以正常查庫的原因,就是因為它們配置之間的依賴關係。

到這裏,差不多明白了starter自動配置bean的方式,但是如若再去深究,各種starter的bean是如何被自動加載的,猜想會不會是項目啟動后,SpringBoot自動掃描裏面所有的jar包,再去掃描所有的類,從而將各個bean放置IOC容器中。從結果來看,肯定是SpringBoot在啟動時確確實實地自動加載了數據源和Mybatis相關的bean,不然他們無法正常工作。

回想在我們啟動示例工程時,SpringBoot會自動掃描啟動類所在包下的所有類,而如果還去掃描所有的jar包的話,又是具體怎麼做到的?不妨從入口類調試一把,在SpringApplication.run(DemoApplication.class, args)打斷點,一直追蹤到getSpringFactoriesInstances這塊:

查看SpringFactoriesLoader.loadFactoryNames的方法註釋:

 

使用給定的類加載器從META-INF / spring.factories加載給定類型的工廠實現的完全限定類名。

這裏的spring.factories剛好也存在於mybatis-spring-boot-autoconfigure.jar中,

繼續調試,進入SpringFactoriesLoader.loadFactoryNames,

這裏用類加載器得到工程中所有jar包中的META-INF/spring.factories文件資源,進而通過此文件得到了一些包括自動配置相關的類的集合,有各種工廠類、監聽器、處理器、過濾器、初始化器等等,如下:

最後的org.springframework.boot.autoconfigure.EnableAutoConfiguration集合中當然包括了org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration和org.mybatis.spring.boot.autoconfigure.MybatisAutoConfiguration。接着必然是將實例化的各個bean放進IOC容器中。

至此我們便明白了SpringBoot是如何自動配置starter裏面的bean的。

 

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※帶您來了解什麼是 USB CONNECTOR  ?

※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面

※如何讓商品強力曝光呢? 網頁設計公司幫您建置最吸引人的網站,提高曝光率!

※綠能、環保無空污,成為電動車最新代名詞,目前市場使用率逐漸普及化

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

※教你寫出一流的銷售文案?

那些年,我們用過的服務器軟件

引言

看過這部電影的人都老了。。。

很多人都知道我是一名後端開發底層小碼農,平時打交道最多的就是服務器,而關於服務器,又有一堆名詞,看起來好像是一個意思,仔細想想又好像不對。

不信?

先放三個名詞「Web 服務器」、「HTTP 服務器」、「應用服務器」。這三種服務器有誰能現在立刻馬上區分開么。

反正我是區分不開。這個問題要是扔到一堆程序員中間,指不定還能引發一場菜雞爭奪戰。

雖然我不會,不會可以度娘啊,希望經常看我文章同學可以有我這種敢於承認自己菜的精神,沒啥好丟人的,不會可以學,學完了下次就會了,總比不會裝會死鴨子嘴硬,下次還不會要強。

經過我一翻度娘 + 整理后,基本上這三個名詞解釋有了:

「Web 服務器」它一般指的是網站服務器,可以向瀏覽器( PC 端或者移動端)等 Web 客戶端提供服務,供請求數據或者下載數據。

而由於 Web 服務器主要支持的協議就是 HTTP 或者 HTTPS ,所以通常情況下 Web 服務器和 HTTP 服務器是等同的,這兩種服務器之間是可以畫上等號的。

而應用服務器是一個很大的概念,微軟對它的定義是「我們把應用程序服務器定義為:作為服務器執行共享業務應用程序的底層的系統軟件。 就像文件服務器為很多用戶提供文件一樣,應用程序服務器讓多個用戶可以同時使用應用程序(通常是客戶創建的應用程序)」。

講的通俗一點就是一種特定應用的承載容器,一般來講,它需要有運行時環境的支持,比如說在 Java 領域,比較常用的應用服務器 Tomcat ,它就必須要 Java 的環境支持。

站在我的角度上,一名 Java 底層碼農從業人員的角度上來看,「Web 服務器」、「HTTP 服務器」、「應用服務器」之間的界限是非常模糊的,因為同樣一個 Tomcat 服務器,我叫它「Web 服務器」或者「HTTP 服務器」沒有問題,叫它「應用服務器」也沒有半毛錢問題。

常見的「Web 服務器」有 Nginx 、 Apache 、 IIS (這個做 .Net 的同學應該不陌生),常見的「應用服務器」軟件包括 WebLogic、JBoss,前者更輕量級,後者更重量級。

靜態服務和動態服務

接下來科普另一個概念:「靜態服務」和「動態服務」。

「靜態服務」返回的是一些靜態資源,比如圖片、HTML 、 CSS 、 JavaScript 等資源,這些靜態資源有一個顯著的特點是在我們的電腦上,只要路徑寫對了,可以在瀏覽器裏面直接訪問。

比如我在電腦上新建一個文件,把後綴改成 html ,裏面使用 html 隨便寫點內容:

<html>
    <h1>Hello World!</h1>
</html>

靜態服務就是每個人訪問,得到的內容都是一樣的,而動態服務就比較牛逼了,能做到每個人訪問,得到的內容都是不一樣的。

最直接的例子就是經常用的淘寶京東這些網站,登錄以後,訪問自己賬號的個人中心,肯定每個人得到的結果都是不一樣的結果。

還有就是我的博客,最開始我的博客是使用 Hexo 搭建的靜態博客,託管在騰訊雲的文件服務上,後來開了一台雲服務器,就換成了使用 WordPress 構建的動態博客。

Nginx

Nginx 是一款輕量級的 Web 服務器 / 反向代理服務器及电子郵件( IMAP / POP3 )代理服務器。

不查還真不知道,原來 Nginx 還提供了 IMAP / POP3 / SMTP 服務,設置過郵箱客戶端的同學對這三個名詞應該不陌生。

關於 Nginx ,比較令人遺憾的一件事是,它的作者伊戈爾·賽索耶夫進了監獄。

Nginx 的特點是佔有內存少,併發能力強,在同類型的網頁服務器中表現較好,國內比較有名的公司,比如說百度、京東、新浪、網易、騰訊、淘寶等都在使用。

Nginx 現在用途最多的應該是作為反向代理服務器在使用,因為它的特性穩定、佔用系統資源少、併發能力強,一般都直接放在直面用戶的最外層應對用戶流量。

用戶的訪問請求先落到 Nginx 上,由 Nginx 進行代理轉發,負載均衡到後續的 Tomcat 應用服務器上,盡可能的提升系統的穩定性。

至於 Nginx 如何複雜均衡到後面的應用服務器上,這就是另一個問題了, Nginx 有很多種的負載均衡方案,這裏我就不展開介紹了。

Nginx 是一個典型的靜態服務,把圖片等內容放在 Nginx 上,可以通過固定的鏈接直接訪問。

不過現在通過 Lua 的加持,我們也可以在 Nginx 做一些動態服務才能做的事情,這就是大名鼎鼎的 OpenResty 。

至於 Nginx 安裝或者是 OpenResty 的安裝以及簡單的使用,大家可以訪問各自的官網查看,我就不演示介紹了(反正都不難)。

Tomcat

在我的碼農生涯中,使用最多的莫過於 Tomcat ,沒有之一。

Tomcat 啟動成功的話,訪問它的首頁,正常情況下是能看到一隻貓的,雖然這隻貓長得實在是有點抽象,但人家確實是只貓。

Tomcat 是 Apache 軟件基金會( Apache Software Foundation )的 Jakarta 項目中的一個核心項目,由 Apache 、 Sun 和其他一些公司及個人共同開發而成。

Tomcat 服務器是一個免費的開放源代碼的 Web 應用服務器,這也是為什麼它可以風靡全球的重要原因。

由於這個項目有了 Sun 公司的參与和支持,所以 Tomcat 一般都會支持最新版的 Servlet 和 JSP 規範。這也是為什麼 Java 初學者接觸到的第一個 Web 服務總會是 Tomcat 。

但是 Tomcat 並未支持 EJB 和 JMS ,所以說 Tomcat 是一款輕量級的 Web 容器。

GlassFish

Sun 公司為 Java 提供了商業兼容的 Web 容器: Glassfish ,不過說實話,我沒怎麼用過這個容器,這個 Web 容器僅存在於我上大學的時候的課本上以及課後大作業上。

GlassFish 達到產品級質量,可免費用於開發、部署和重新分發。開發者可以免費獲得源代碼,還可以對代碼進行更改。

Glassfish 既是 EJB 容器也是 WEB 容器,它支持最新版的 Java EE 標準。

而剛才前面介紹的 Tomcat 則僅僅只是一個 Web 容器。

Jboss

Jboss 是一個基於 Java EE 的開放源代碼的應用服務器。 JBoss 代碼遵循 LGPL 許可,可以在任何商業應用中免費使用。

Jboss 和上面的 Glassfish 一樣,同樣是企業級的 Web 容器,並且在 2004 年 6 月, JBoss 公司宣布, JBoss 應用服務器通過了 Sun 公司的 J2EE 認證后,一直在緊跟最新的 J2EE 規範,而且在某些技術領域引領 J2EE 規範的開發。

因此,無論在商業領域,還是在開源社區, JBoss 成為了第一個通過 J2EE 1.4 認證的主流應用服務器。 JBoss 應用服務器已經真正發展成企業級應用服務器。

之後好景不長,在 2006 年, JBoss 被 Red Hat 以三億五千萬美金併購。

之後在 2019 年,也就是去年, Red Hat 為 JBoss Application Server 換了一個新的名字,即: WildFly 。

因為 JBoss 本身是開源免費的,而 Red Hat 的企業產品 JBoss EAP 是一個收費產品,Red Hat 為了使這兩個產品差異化,避免用戶混淆,而進行更名。

JBoss 版本:

  • 社區版:JBoss AS(Application Server) -> WildFly
  • 企業版:JBoss EAP(Enterprise Application Server)

JBoss 核心服務不包括支持 servlet / JSP 的 WEB 容器,一般與 Tomcat 綁定使用, JBoss 的 Web 容器使用的是 Tomcat 。

Apache

如果不是 IT 行業,如果說起來 Apache ,是不是大多數人想到的是這個東西:

不過顯然我要說的不是這個,而是這個小羽毛:

說起來慚愧,我在剛入門的很長一段時間中,一直以為 Apache 就是 Tomcat , 傻傻分不清楚。

後來接觸到 PHP 以後,才知道他們倆完全不同, Logo 就不同(這不是廢話)。

Apache 一般是指 Apache HTTP Server,是 Apache 軟件基金會(和 Tomcat 同屬一家基金會,並且 Apache 服務和 Apache 基金會名字都一樣,新人能分清才見鬼了)下的一個網頁服務器軟件。

由於其跨平台和安全性,被廣泛使用,是最流行的 Web 服務器軟件之一。它快速、可靠並且可通過簡單的 API 擴展。

我現在的博客站使用的就是 Apache 的服務,當時搞 WordPress 的時候着實坑了我一把,完全沒想到一個 PHP 環境這麼難搞,後來在網上找問題的搜索的時候才知道有 LAMP 這麼個東西。

  • Linux,操作系統。
  • Apache,網頁服務器。
  • MySQL,數據庫管理系統(或者數據庫服務器)。
  • PHP 、 Perl 或 Python,腳本語言。

不過還可以使用 Nginx 替換 Apache ,這個新的組合叫 LNMP 。

Jetty

Jetty 和 Tomcat 有很多相似之處,比如說可以為 JSP 和 Servlet 提供運行時環境。Jetty 是 Java 語言編寫的,它的 API 以一組 JAR 包的形式發布。

相比較 Tomcat 而言, Jetty 更加的輕量級,因為 Tomcat 除了遵循 Servlet 規範以外,自身還擴展了大量 Java EE 特性以滿足企業級應用的需求。

但對於大量普通的互聯網應用而言,並不需要用到 Tomcat 其他高級特性,所以在這種情況下,使用 Tomcat 是很浪費資源的。

而這時換成 Jetty ,每個應用服務器省下那幾 MB 內存,對於大的分佈式環境則是節省大量資源。

Jetty 可以同時處理大量鏈接並且長時間的保持這些鏈接,例如,一些 Web 聊天應用非常適合用 Jetty 服務器。

Jetty 的架構比較簡單,它有一個基本數據模型,這個數據模型就是 Handler,所有可以被擴展的組件都可以作為一個 Handler,添加到 Server 中,Jetty 就是幫我們管理這些 Handler 的。

Resin

最後一個放出來的是 Resin ,不知道有多少人聽說過這個 Web 容器。

Resin 是 CAUCHO 公司的產品,也是一個 Web 容器,對 Servlet / JSP 提供了良好的支持,性能也比較優良, Resin 自身採用 JAVA 語言開發。

基於百度百科的介紹是說 Resin 是一個非常流行的 Web 容器。

請恕我直言,這個容器真的這麼流行么,如果我不是因為維護一個公司的老系統,還真不知道還有這麼個 Web 容器。

可能 Resin 流行的年代比較久遠了,從我入行以後就不流行了。

其他

再說幾個只聽過沒接觸過的容器:

  • Undertow: JFinal 框架的默認容器切換成了 Undertow 。
  • WebLogic: 甲骨文出品,這個我沒接觸過,不過聽朋友講用這個大多數都是銀行,據說買一買挺貴的。
  • WebSphere: IBM 出品,這個和上面那個 WebLogic 一樣,只聽說過銀行在用。

JFinal 是基於 Java 語言的極速 WEB + ORM 框架,其核心設計目標是開發迅速、代碼量少、學習簡單、功能強大、輕量級、易擴展、Restful。用於一些小項目還是很合適的。

WebLogic 和 WebSphere 一看出品方都挺 NB 的,據說還有後台界面可以操作使用,發布程序都是點鼠標完成的。

不過現在發布程序大多數都開始用 Jenkins 了,其實也方便了很多,包括很多公司可能都上線了 DevOps 系統,程序發布只會越來越簡單。

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※為什麼 USB CONNECTOR 是電子產業重要的元件?

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

※台北網頁設計公司全省服務真心推薦

※想知道最厲害的網頁設計公司"嚨底家"!

新北清潔公司,居家、辦公、裝潢細清專業服務

※推薦評價好的iphone維修中心

重學 Java 設計模式:實戰迭代器模式「模擬公司組織架構樹結構關係,深度迭代遍歷人員信息輸出場景」

作者:小傅哥
博客:https://bugstack.cn – 原創系列專題文章

沉澱、分享、成長,讓自己和他人都能有所收穫!

一、前言

相信相信的力量!

從懵懂的少年,到拿起鍵盤,可以寫一個HelloWorld。多數人在這並不會感覺有多難,也不會認為做不出來。因為這樣的例子,有老師的指導、有書本的例子、有前人的經驗。但隨着你的開發時間越來越長,要解決更複雜的問題或者技術創新,因此在網上搜了幾天幾夜都沒有答案,這個時候是否想過放棄,還是一直堅持不斷的嘗試一點點完成自己心裏要的結果。往往這種沒有前車之鑒需要自己解決問題的時候,可能真的會折磨到要崩潰,但你要願意執着、願意倔強,願意選擇相信相信的力量,就一定能解決。哪怕解決不了,也可以在這條路上摸索出其他更多的收穫,為後續前進的道路填充好墊腳石。

時間緊是寫垃圾代碼的理由?

擰螺絲?Ctrl+C、Ctrl+V?貼膏藥一樣寫代碼?沒有辦法,沒有時間,往往真的是借口,胸中沒用筆墨,才只能湊合。難道一定是好好寫代碼就浪費時間,拼湊CRUD就快嗎,根本不可能的。因為不會,沒用實操過,很少架構出全場景的設計,才很難寫出優良的代碼。多增強自身的編碼(武術)修為,在各種編碼場景中讓自己變得老練,才好應對緊急情況下的需求開發和人員安排。就像韓信一樣有謀有略,才能執掌百萬雄兵。

不要只是做個工具人!

因為日常的編寫簡單業務需求,導致自己像個工具人一樣,日久天長的也就很少去深入學習更多技術棧。看見有工具、有組件、有框架,拿來就用用,反正沒什麼體量也不會出什麼問題。但如果你想要更多的收入,哪怕是重複的造輪子,你也要去嘗試造一個,就算不用到生產,自己玩玩總可以吧。有些事情只有自己經歷過,才能有最深的感觸,參与過實踐過,才好總結點評學習。

二、開發環境

  1. JDK 1.8
  2. Idea + Maven
  3. 涉及工程一個,可以通過關注公眾號bugstack蟲洞棧,回復源碼下載獲取(打開獲取的鏈接,找到序號18)
工程 描述
itstack-demo-design-15-00 開發樹形組織架構關係迭代器

三、迭代器模式介紹

迭代器模式,常見的就是我們日常使用的iterator遍歷。雖然這個設計模式在我們的實際業務開發中的場景並不多,但卻幾乎每天都要使用jdk為我們提供的list集合遍歷。另外增強的for循環雖然是循環輸出數據,但是他不是迭代器模式。迭代器模式的特點是實現Iterable接口,通過next的方式獲取集合元素,同時具備對元素的刪除等操作。而增強的for循環是不可以的。

這種設計模式的優點是可以讓我們以相同的方式,遍歷不同的數據結構元素,這些數據結構包括;數組鏈表等,而用戶在使用遍歷的時候並不需要去關心每一種數據結構的遍歷處理邏輯,從讓使用變得統一易用。

四、案例場景模擬

在本案例中我們模擬迭代遍歷輸出公司中樹形結構的組織架構關係中僱員列表

大部分公司的組織架構都是金字塔結構,也就這種樹形結構,分為一級、二級、三級等部門,每個組織部門由僱員填充,最終體現出一個整體的樹形組織架構關係。

一般我們常用的遍歷就是jdk默認提供的方法,對list集合遍歷。但是對於這樣的偏業務特性較大的樹形結構,如果需要使用到遍歷,那麼就可以自己來實現。接下來我們會把這個組織層次關係通過樹形數據結構來實現,並完成迭代器功能。

五、迭代器模式遍歷組織結構

在實現迭代器模式之前可以先閱讀下javalist方法關於iterator的實現部分,幾乎所有的迭代器開發都會按照這個模式來實現,這個模式主要分為以下幾塊;

  1. Collection,集合方法部分用於對自定義的數據結構添加通用方法;addremoveiterator等核心方法。
  2. Iterable,提供獲取迭代器,這個接口類會被Collection繼承。
  3. Iterator,提供了兩個方法的定義;hasNextnext,會在具體的數據結構中寫實現方式。

除了這樣通用的迭代器實現方式外,我們的組織關係結構樹,是由節點和節點間的關係鏈構成,所以會比上述的內容多一些入參。

1. 工程結構

itstack-demo-design-15-02
└── src
    ├── main
    │   └── java
    │       └── org.itstack.demo.design
    │           ├── group
    │           │	├── Employee.java
    │           │	├── GroupStructure.java
    │           │	└── Link.java
    │           └──  lang
    │            	├── Collection.java
    │            	├── Iterable.java
    │            	└── Iterator.java
    └── test
        └── java
            └── org.itstack.demo.design.test
                └── ApiTest.java

迭代器模式模型結構

  • 以上是我們工程類圖的模型結構,左側是對迭代器的定義,右側是在數據結構中實現迭代器功能。
  • 關於左側部分的實現與jdk中的方式是一樣的,所以在學習的過程中可以互相參考,也可以自己擴展學習。
  • 另外這個遍歷方式一個樹形結構的深度遍歷,為了可以更加讓學習的小夥伴容易理解,這裏我實現了一種比較簡單的樹形結構深度遍歷方式。後續讀者也可以把遍歷擴展為橫向遍歷也就是寬度遍歷。

2. 代碼實現

2.1 僱員實體類

/**
 * 僱員
 */
public class Employee {

    private String uId;   // ID
    private String name;  // 姓名
    private String desc;  // 備註
    
    // ...get/set
}
  • 這是一個簡單的僱員類,也就是公司員工的信息類,包括必要的信息;id、姓名、備註。

2.2 樹節點鏈路

/**
 * 樹節點鏈路
 */
public class Link {

    private String fromId; // 僱員ID
    private String toId;   // 僱員ID    
    
    // ...get/set
}
  • 這個類用於描述結構樹中的各個節點之間的關係鏈,也就是A to BB to CB to D,以此描述出一套完整的樹組織結構。

2.3 迭代器定義

public interface Iterator<E> {

    boolean hasNext();

    E next();
    
}
  • 這裏的這個類和javajdk中提供的是一樣的,這樣也方面後續讀者可以對照listIterator進行源碼學習。
  • 方法描述;hasNext,判斷是否有下一個元素、next,獲取下一個元素。這個在list的遍歷中是經常用到的。

2.4 可迭代接口定義

public interface Iterable<E> {

    Iterator<E> iterator();

}
  • 這個接口中提供了上面迭代器的實現Iterator的獲取,也就是後續在自己的數據結構中需要實現迭代器的功能並交給Iterable,由此讓外部調用方進行獲取使用。

2.5 集合功能接口定義

public interface Collection<E, L> extends Iterable<E> {

    boolean add(E e);

    boolean remove(E e);

    boolean addLink(String key, L l);

    boolean removeLink(String key);

    Iterator<E> iterator();

}
  • 這裏我們定義集合操作接口;Collection,同時繼承了另外一個接口Iterable的方法iterator()。這樣後續誰來實現這個接口,就需要實現上述定義的一些基本功能;添加元素刪除元素遍歷
  • 同時你可能注意到這裏定義了兩個泛型<E, L>,因為我們的數據結構一個是用於添加元素,另外一個是用於添加樹節點的鏈路關係。

2.6 (核心)迭代器功能實現

public class GroupStructure implements Collection<Employee, Link> {

    private String groupId;                                                 // 組織ID,也是一個組織鏈的頭部ID
    private String groupName;                                               // 組織名稱
    private Map<String, Employee> employeeMap = new ConcurrentHashMap<String, Employee>();  // 僱員列表
    private Map<String, List<Link>> linkMap = new ConcurrentHashMap<String, List<Link>>();  // 組織架構關係;id->list
    private Map<String, String> invertedMap = new ConcurrentHashMap<String, String>();       // 反向關係鏈

    public GroupStructure(String groupId, String groupName) {
        this.groupId = groupId;
        this.groupName = groupName;
    }

    public boolean add(Employee employee) {
        return null != employeeMap.put(employee.getuId(), employee);
    }

    public boolean remove(Employee o) {
        return null != employeeMap.remove(o.getuId());
    }

    public boolean addLink(String key, Link link) {
        invertedMap.put(link.getToId(), link.getFromId());
        if (linkMap.containsKey(key)) {
            return linkMap.get(key).add(link);
        } else {
            List<Link> links = new LinkedList<Link>();
            links.add(link);
            linkMap.put(key, links);
            return true;
        }
    }

    public boolean removeLink(String key) {
        return null != linkMap.remove(key);
    }

    public Iterator<Employee> iterator() {

        return new Iterator<Employee>() {

            HashMap<String, Integer> keyMap = new HashMap<String, Integer>();

            int totalIdx = 0;
            private String fromId = groupId;  // 僱員ID,From
            private String toId = groupId;   // 僱員ID,To

            public boolean hasNext() {
                return totalIdx < employeeMap.size();
            }

            public Employee next() {
                List<Link> links = linkMap.get(toId);
                int cursorIdx = getCursorIdx(toId);

                // 同級節點掃描
                if (null == links) {
                    cursorIdx = getCursorIdx(fromId);
                    links = linkMap.get(fromId);
                }

                // 上級節點掃描
                while (cursorIdx > links.size() - 1) {
                    fromId = invertedMap.get(fromId);
                    cursorIdx = getCursorIdx(fromId);
                    links = linkMap.get(fromId);
                }

                // 獲取節點
                Link link = links.get(cursorIdx);
                toId = link.getToId();
                fromId = link.getFromId();
                totalIdx++;

                // 返回結果
                return employeeMap.get(link.getToId());
            }
             
            // 給每個層級定義寬度遍歷進度
            public int getCursorIdx(String key) {
                int idx = 0;
                if (keyMap.containsKey(key)) {
                    idx = keyMap.get(key);
                    keyMap.put(key, ++idx);
                } else {
                    keyMap.put(key, idx);
                }
                return idx;
            }
        };
    }

}
  • 以上的這部分代碼稍微有點長,主要包括了對元素的添加和刪除。另外最重要的是對遍歷的實現 new Iterator<Employee>
  • 添加和刪除元素相對來說比較簡單,使用了兩個map數組結構進行定義;僱員列表組織架構關係;id->list。當元素添加元素的時候,會分別在不同的方法中向map結構中進行填充指向關係(A->B),也就構建出了我們的樹形組織關係。

迭代器實現思路

  1. 這裏的樹形結構我們需要做的是深度遍歷,也就是左側的一直遍歷到最深節點。
  2. 當遍歷到最深節點后,開始遍歷最深節點的橫向節點。
  3. 當橫向節點遍歷完成后則向上尋找橫向節點,直至樹結構全部遍歷完成。

3. 測試驗證

3.1 編寫測試類

@Test
public void test_iterator() { 
    // 數據填充
    GroupStructure groupStructure = new GroupStructure("1", "小傅哥");  
    
    // 僱員信息
    groupStructure.add(new Employee("2", "花花", "二級部門"));
    groupStructure.add(new Employee("3", "豆包", "二級部門"));
    groupStructure.add(new Employee("4", "蹦蹦", "三級部門"));
    groupStructure.add(new Employee("5", "大燒", "三級部門"));
    groupStructure.add(new Employee("6", "虎哥", "四級部門"));
    groupStructure.add(new Employee("7", "玲姐", "四級部門"));
    groupStructure.add(new Employee("8", "秋雅", "四級部門"));   
    
    // 節點關係 1->(1,2) 2->(4,5)
    groupStructure.addLink("1", new Link("1", "2"));
    groupStructure.addLink("1", new Link("1", "3"));
    groupStructure.addLink("2", new Link("2", "4"));
    groupStructure.addLink("2", new Link("2", "5"));
    groupStructure.addLink("5", new Link("5", "6"));
    groupStructure.addLink("5", new Link("5", "7"));
    groupStructure.addLink("5", new Link("5", "8"));       

    Iterator<Employee> iterator = groupStructure.iterator();
    while (iterator.hasNext()) {
        Employee employee = iterator.next();
        logger.info("{},僱員 Id:{} Name:{}", employee.getDesc(), employee.getuId(), employee.getName());
    }
}

3.2 測試結果

22:23:37.166 [main] INFO  org.itstack.demo.design.test.ApiTest - 二級部門,僱員 Id:2 Name:花花
22:23:37.168 [main] INFO  org.itstack.demo.design.test.ApiTest - 三級部門,僱員 Id:4 Name:蹦蹦
22:23:37.169 [main] INFO  org.itstack.demo.design.test.ApiTest - 三級部門,僱員 Id:5 Name:大燒
22:23:37.169 [main] INFO  org.itstack.demo.design.test.ApiTest - 四級部門,僱員 Id:6 Name:虎哥
22:23:37.169 [main] INFO  org.itstack.demo.design.test.ApiTest - 四級部門,僱員 Id:7 Name:玲姐
22:23:37.169 [main] INFO  org.itstack.demo.design.test.ApiTest - 四級部門,僱員 Id:8 Name:秋雅
22:23:37.169 [main] INFO  org.itstack.demo.design.test.ApiTest - 二級部門,僱員 Id:3 Name:豆包

Process finished with exit code 0
  • 從遍歷的結果可以看到,我們是順着樹形結構的深度開始遍歷,一直到右側的節點3僱員 Id:2、僱員 Id:4...僱員 Id:3

六、總結

  • 迭代器的設計模式從以上的功能實現可以看到,滿足了單一職責和開閉原則,外界的調用方也不需要知道任何一個不同的數據結構在使用上的遍歷差異。可以非常方便的擴展,也讓整個遍歷變得更加乾淨整潔。
  • 但從結構的實現上可以看到,迭代器模式的實現過程相對來說是比較負責的,類的實現上也擴增了需要外部定義的類,使得遍歷與原數據結構分開。雖然這是比較麻煩的,但可以看到在使用java的jdk時候,迭代器的模式還是很好用的,可以非常方便擴展和升級。
  • 以上的設計模式場景實現過程可能對新人有一些不好理解點,包括;迭代器三個和接口的定義、樹形結構的數據關係、樹結構深度遍歷思路。這些都需要反覆實現練習才能深入的理解,事必躬親,親歷親為,才能讓自己掌握這些知識。

七、推薦閱讀

  • 1. 重學 Java 設計模式:實戰工廠方法模式「多種類型商品不同接口,統一發獎服務搭建場景」
  • 2. 重學 Java 設計模式:實戰原型模式「上機考試多套試,每人題目和答案亂序排列場景」
  • 3. 重學 Java 設計模式:實戰橋接模式「多支付渠道(微信、支付寶)與多支付模式(刷臉、指紋)場景」
  • 4. 重學 Java 設計模式:實戰組合模式「營銷差異化人群發券,決策樹引擎搭建場景」
  • 5. 重學 Java 設計模式:實戰外觀模式「基於SpringBoot開發門面模式中間件,統一控制接口白名單場景」
  • 6. 重學 Java 設計模式:實戰享元模式「基於Redis秒殺,提供活動與庫存信息查詢場景」

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理
【其他文章推薦】

USB CONNECTOR掌控什麼技術要點? 帶您認識其相關發展及效能

台北網頁設計公司這麼多該如何選擇?

※智慧手機時代的來臨,RWD網頁設計為架站首選

※評比南投搬家公司費用收費行情懶人包大公開

※幫你省時又省力,新北清潔一流服務好口碑

※回頭車貨運收費標準

奧迪看衰純電動車市場 新款車型延後推出

 

隨著寶馬等公司逐漸擴大投入純電動車市場,奧迪並未選擇加速緊追,相反的,奧迪並不看好純電動車市場的發展。

近日歐洲媒體 Worldcarfuns 報導,奧迪公司並不想花費大量的時間和金錢來推出出色的電動車型,來與寶馬、特斯拉等品牌的電動車競爭。

根據奧迪的純電動車產品規劃,其曾計劃推出的 R8 電動版、Q6 電動版等 4 款新純電動車,量產時間都將推遲延後。對此,奧迪董事會成員兼銷售總監 Luca de Meo 表示,奧迪對純電動車的前景並不看好,因此暫時沒有生產純電動車的計劃。

 

(圖片來源:)

本站聲明:網站內容來源於EnergyTrend https://www.energytrend.com.tw/ev/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※帶您來了解什麼是 USB CONNECTOR  ?

※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面

※如何讓商品強力曝光呢? 網頁設計公司幫您建置最吸引人的網站,提高曝光率!

※綠能、環保無空污,成為電動車最新代名詞,目前市場使用率逐漸普及化

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

※教你寫出一流的銷售文案?

澳大學生研發電動車 續航力超越 Tesla

    電動車廠特斯拉遇到了新對手,但這並不是寶馬也不是通用汽車,而是來自澳洲大學的一群學生。這些學生研發 eVe 電動車,單次充電後以每小時 100 公里的時速行駛了 500 公里,這也打破了塵封已久的電動車續航世界紀錄。此前,這項記錄為單次充電後,以 72 公里每小時速度跑滿 500 公里。   一直以來,續航能力是電動車推廣普及的最大難題,而 eVe 打破記錄外也表明,電動車可以在合理高速下行駛數百公里。eVe 由澳洲新南威爾士大學 Sunswift 團隊學生研發,到如今已是第五代版本。Sunswift 團隊早前曾以打造太陽能汽車名聲大噪,其推出的 IVy 太陽能動力車在 2011 年跑出了時速達 88 公里/小時,創造了該領域車輛的最快時速記錄。   eVe 電動車配有傳統電池,可以採用使用常規充電樁進行電力補充,其也可以通過覆蓋車身的太陽能電池板充電,車身重量為 317公斤(700磅),使用重量 59 公斤的松下電池,用常規家用插座可在 8 小時內充滿電,若接入工業用電插口,可在 5 小時內充滿。Sunswift 團隊表示,如果 eVe 停在太陽下 8 個小時,搭載的 800 瓦太陽能電池組可以提供 2 小時行駛里程,且太陽能面板還可在車輛行駛過程中收集能量。   Sunswift 團隊負責人Hayden Smith 表示,eVe 證明太陽能電動車是傳統石化燃料汽車可行性替代方案。該團隊希望以此激勵商業公司進入這一技術領域,並促使 eVe 成為澳洲首個合法上路的太陽能電動車。     (圖片來源:)

本站聲明:網站內容來源於EnergyTrend https://www.energytrend.com.tw/ev/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※為什麼 USB CONNECTOR 是電子產業重要的元件?

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

※台北網頁設計公司全省服務真心推薦

※想知道最厲害的網頁設計公司"嚨底家"!

新北清潔公司,居家、辦公、裝潢細清專業服務

※推薦評價好的iphone維修中心

Tesla 解決大陸商標爭議、Model X 行情看俏

電動車製造商特斯拉 (Tesla ) 6 日宣佈,該公司已和在大陸搶先註冊商標的商人占寶生 (Zhan Baosheng) 敲定協議,雙方化解了商標爭議。占寶生也會轉交他在大陸註冊的網站名稱,當中包括「tesla.cn」、「teslamotors.cn」。   特斯拉表示,占寶生已同意讓大陸主管機關註銷他先前註冊或申請的商標,而且完全不向特斯拉收費。   MarketWatch 則指出,特斯拉計畫推出的電動休旅車 (SUV)「Model X」,很有機會比電動轎車「Model S」更受歡迎。特斯拉目前首度暫停旗下唯一一座組裝廠,以重整產品線加快 Model S 的出貨速度、同時也為生產次世代電動休旅車「Model X」預做準備。  

本站聲明:網站內容來源於EnergyTrend https://www.energytrend.com.tw/ev/,如有侵權,請聯繫我們,我們將及時處理
【其他文章推薦】

USB CONNECTOR掌控什麼技術要點? 帶您認識其相關發展及效能

台北網頁設計公司這麼多該如何選擇?

※智慧手機時代的來臨,RWD網頁設計為架站首選

※評比南投搬家公司費用收費行情懶人包大公開

※幫你省時又省力,新北清潔一流服務好口碑

※回頭車貨運收費標準

東電紀錄指出 日本輻射污染魚 放射性鍶含量再破紀錄

文:宋瑞文(媽媽監督核電廠聯盟特約撰述)

本站聲明:網站內容來源環境資訊中心https://e-info.org.tw/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

※想知道最厲害的網頁設計公司"嚨底家"!

※幫你省時又省力,新北清潔一流服務好口碑

※別再煩惱如何寫文案,掌握八大原則!