[源碼解析] 從TimeoutException看Flink的心跳機制

3{icon} {views}

[源碼解析] 從TimeoutException看Flink的心跳機制

目錄

  • [源碼解析] 從TimeoutException看Flink的心跳機制
    • 0x00 摘要
    • 0x01 緣由
    • 0x02 背景概念
      • 2.1 四大模塊
      • 2.2 Akka
      • 2.3 RPC機制
        • 2.3.1 RpcEndpoint:RPC的基類
        • RpcService:RPC服務提供者
        • RpcGateway:RPC調用的網關
      • 2.4 常見心跳機制
    • 0x03 Flink心跳機制
      • 3.1 代碼和機制
      • 3.2 靜態架構
        • 3.2.1 HeartbeatTarget :監控目標抽象
        • 3.2.2 HeartbeatMonitor : 管理heartbeat target的心跳狀態
        • 3.2.3 HeartbeatManager :心跳管理者
        • 3.2.4 HearbeatListener 處理心跳結果
      • 3.3 動態運行機制
        • 3.3.1 HearbeatManagerImpl : Receiver
        • 3.3.2 HeartbeatManagerSenderImpl : Sender
        • 3.3.3 HeartbeatMonitorImpl
        • 3.3.3 HeartbeatServices
    • 0x04 初始化
      • 4.1 心跳服務創建
    • 0x05 Flink中具體應用
      • 5.1 總述
        • 5.1.1 RM, JM, TM之間關係
        • 5.1.2 三者間心跳機制
      • 5.2 初始化過程
        • 5.2.1 TaskExecutor初始化
        • 5.2.2 JobMaster的初始化
        • 5.2.3 ResourceManager初始化
      • 5.3 註冊過程
        • 5.3.1 TM註冊到RM中
          • 5.3.1.1 TM的操作
          • 5.3.1.2 RM的操作
          • 5.3.1.3 返回到TM
        • 5.3.2 TM註冊到 JM
      • 5.4 心跳過程
        • 5.4.1 ResourceManager主動發起
          • 5.4.1.1 Sender遍歷所有監控的Monitor(Target)
          • 5.4.1.2 Target進行具體操作
          • 5.4.1.3 RPC調用
        • 5.4.2 RM通過RPC調用TM
        • 5.4.3 TM 通過RPC回到 RM
      • 5.5 超時處理
        • 5.5.1 TaskManager
        • 5.5.2 ResourceManager
    • 0x06 解決問題
    • 0x07 參考

0x00 摘要

本文從一個調試時候常見的異常 “TimeoutException: Heartbeat of TaskManager timed out”切入,為大家剖析Flink的心跳機制。文中代碼基於Flink 1.10。

0x01 緣由

大家如果經常調試Flink,當進入斷點看到了堆棧和變量內容之後,你容易陷入了沉思。當你發現了問題可能所在,高興的讓程序Resume的時候,你發現程序無法運行,有如下提示:

Caused by: java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id 93aa1740-cd2c-4032-b74a-5f256edb3217 timed out.

這實在是很鬱悶的事情。作為程序猿不能忍啊,既然異常提示中有 Heartbeat 字樣,於是我們就來一起看看Flink的心跳機制,看看有沒有可以修改的途徑。

0x02 背景概念

2.1 四大模塊

Flink有核心四大組件:Dispatcher,JobMaster,ResourceManager,TaskExecutor。

  • Dispatcher(Application Master)用於接收client提交的任務和啟動相應的JobManager。其提供REST接口來接收client的application提交,負責啟動JM和提交application,同時運行Web UI。
  • ResourceManager:主要用於資源的申請和分配。當TM有空閑的slot就會告訴JM,沒有足夠的slot也會啟動新的TM。kill掉長時間空閑的TM。
  • JobMaster :功能主要包括(舊版本中JobManager的功能在新版本中以JobMaster形式出現,可能本文中會混淆這兩個詞,請大家諒解):
    • 將JobGraph轉化為ExecutionGraph(physical dataflow graph,并行化)。
    • 向RM申請資源、schedule tasks、保存作業的元數據。
  • TaskManager:類似Spark的executor,會跑多個線程的task、數據緩存與交換。Flink 架構遵循 Master – Slave 架構設計原則,JobMaster 為 Master 節點,TaskManager 為Slave節點。

這四大組件彼此之間的通信需要依賴RPC實現

2.2 Akka

Flink底層RPC基於Akka實現。Akka是一個開發併發、容錯和可伸縮應用的框架。它是Actor Model的一個實現,和Erlang的併發模型很像。在Actor模型中,所有的實體被認為是獨立的actors。actors和其他actors通過發送異步消息通信。

Actor模型的強大來自於異步。它也可以顯式等待響應,這使得可以執行同步操作。但是強烈不建議同步消息,因為它們限制了系統的伸縮性。

2.3 RPC機制

RPC作用是:讓異步調用看起來像同步調用。

Flink基於Akka構建了其底層通信系統,引入了RPC調用,各節點通過GateWay方式回調,隱藏通信組件的細節,實現解耦。Flink整個通信框架的組件主要由RpcEndpoint、RpcService、RpcServer、AkkaInvocationHandler、AkkaRpcActor等構成。

RPC相關的主要接口如下:

  • RpcEndpoint
  • RpcService
  • RpcGateway

2.3.1 RpcEndpoint:RPC的基類

RpcEndpoint是Flink RPC終端的基類,所有提供遠程過程調用的分佈式組件必須擴展RpcEndpoint,其功能由RpcService支持。

RpcEndpoint的子類只有四類組件:Dispatcher,JobMaster,ResourceManager,TaskExecutor,即Flink中只有這四個組件有RPC的能力,換句話說只有這四個組件有RPC的這個需求。

每個RpcEndpoint對應了一個路徑(endpointId和actorSystem共同確定),每個路徑對應一個Actor,其實現了RpcGateway接口,

RpcService:RPC服務提供者

RpcServer是RpcEndpoint的成員變量,為RpcService提供RPC服務/連接遠程Server,其只有一個子類實現:AkkaRpcService(可見目前Flink的通信方式依然是Akka)。

RpcServer用於啟動和連接到RpcEndpoint, 連接到rpc服務器將返回一個RpcGateway,可用於調用遠程過程。

Flink四大組件Dispatcher,JobMaster,ResourceManager,TaskExecutor,都是RpcEndpoint的實現,所以構建四大組件時,同步需要初始化RpcServer。如JobManager的構造方式,第一個參數就是需要知道RpcService。

RpcGateway:RPC調用的網關

Flink的RPC協議通過RpcGateway來定義;由前面可知,若想與遠端Actor通信,則必須提供地址(ip和port),如在Flink-on-Yarn模式下,JobMaster會先啟動ActorSystem,此時TaskExecutor的Container還未分配,後面與TaskExecutor通信時,必須讓其提供對應地址。

Dispatcher,JobMaster,ResourceManager,TaskExecutor 這四大組件通過各種方式實現了Gateway。以JobMaster為例,JobMaster實現JobMasterGateway接口。各組件類的成員變量都有需要通信的其他組件的GateWay實現類,這樣可通過各自的Gateway實現RPC調用。

2.4 常見心跳機制

常見的心跳檢測有兩種:

  • socket 套接字SO_KEEPALIVE本身帶有的心跳機制,定期向對方發送心跳包,對方收到心跳包後會自動回復;
  • 應用自身實現心跳機制,同樣也是使用定期發送請求的方式;

Flink實現的是第二種方案。

0x03 Flink心跳機制

3.1 代碼和機制

Flink的心跳機制代碼在:

Flink-master/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat

四個接口:

HeartbeatListener.java          HeartbeatManager.java      HeartbeatTarget.java  HeartbeatMonitor.java

以及如下幾個類:

HeartbeatManagerImpl.java   HeartbeatManagerSenderImpl.java   HeartbeatMonitorImpl.java
HeartbeatServices.java    NoOpHeartbeatManager.java

Flink集群有多種業務流程,比如Resource Manager, Task Manager, Job Manager。每種業務流程都有自己的心跳機制。Flink的心跳機制只是提供接口和基本功能,具體業務功能由各業務流程自己實現

我們首先設定 心跳系統中有兩種節點:sender和receiver。心跳機制是sender和receivers彼此相互檢測。但是檢測動作是Sender主動發起,即Sender主動發送請求探測receiver是否存活,因為Sender已經發送過來了探測心跳請求,所以這樣receiver同時也知道Sender是存活的,然後Reciver給Sender回應一個心跳錶示自己也是活着的。

因為Flink的幾個名詞和我們常見概念有所差別,所以流程上需要大家仔細甄別,即:

  • Flink Sender 主動發送Request請求給Receiver,要求Receiver回應一個心跳;
  • Flink Receiver 收到Request之後,通過Receive函數回應一個心跳請求給Sender;

3.2 靜態架構

3.2.1 HeartbeatTarget :監控目標抽象

HeartbeatTarget是對監控目標的抽象。心跳機制在行為上而言有兩種動作:

  • 向某個節點發送請求。
  • 處理某個節點發來的請求。

HeartbeatTarget的函數就是這兩個動作:

  • receiveHeartbeat :向某個節點(Sender)發送心跳回應,其參數heartbeatOrigin 就是 Receiver。
  • requestHeartbeat :向某個節點(Receiver)要求其回應一個心跳,其參數requestOrigin 就是 Sender。requestHeartbeat這個函數是Sender的函數,其中Sender通過RPC直接調用到Receiver

這兩個函數的參數也很簡單:分別是請求的發送放和接收方,還有Payload載荷。對於一個確定節點而言,接收的和發送的載荷是同一類型的。

public interface HeartbeatTarget<I> {
	/**
	 * Sends a heartbeat response to the target.
	 * @param heartbeatOrigin Resource ID identifying the machine for which a heartbeat shall be reported.
	 */
  // heartbeatOrigin 就是 Receiver
	void receiveHeartbeat(ResourceID heartbeatOrigin, I heartbeatPayload);

	/**
	 * Requests a heartbeat from the target. 
	 * @param requestOrigin Resource ID identifying the machine issuing the heartbeat request.
	 */
  // requestOrigin 就是 Sender
	void requestHeartbeat(ResourceID requestOrigin, I heartbeatPayload);
}

3.2.2 HeartbeatMonitor : 管理heartbeat target的心跳狀態

對HeartbeatTarget的封裝,這樣Manager對Target的操作是通過對Monitor完成,後續會在其繼承類中詳細說明。

public interface HeartbeatMonitor<O> {
  // Gets heartbeat target.
	HeartbeatTarget<O> getHeartbeatTarget();
	// Gets heartbeat target id.
	ResourceID getHeartbeatTargetId();
	// Report heartbeat from the monitored target.
	void reportHeartbeat();
	//Cancel this monitor.
	void cancel();
  //Gets the last heartbeat.
	long getLastHeartbeat();
}

3.2.3 HeartbeatManager :心跳管理者

HeartbeatManager負責管理心跳機制,比如啟動/停止/報告一個HeartbeatTarget。此接口繼承HeartbeatTarget。

除了HeartbeatTarget的函數之外,這接口有4個函數:

  • monitorTarget,把和某資源對應的節點加入到心跳監控列表;
  • unmonitorTarget,從心跳監控列表刪除某資源對應的節點;
  • stop,停止心跳管理服務,釋放資源;
  • getLastHeartbeatFrom,獲取某節點的最後一次心跳數據。
public interface HeartbeatManager<I, O> extends HeartbeatTarget<I> {
	void monitorTarget(ResourceID resourceID, HeartbeatTarget<O> heartbeatTarget);
	void unmonitorTarget(ResourceID resourceID);
	void stop();
	long getLastHeartbeatFrom(ResourceID resourceId);
}

3.2.4 HearbeatListener 處理心跳結果

用戶業務邏輯需要繼承這個接口以處理心跳結果。其可以看做服務的輸出,實現了三個回調函數

  • notifyHeartbeatTimeout,處理節點心跳超時
  • reportPayload,處理節點發來的Payload載荷
  • retrievePayLoad。獲取對某節點發下一次心跳請求的Payload載荷
public interface HeartbeatListener<I, O> {
	void notifyHeartbeatTimeout(ResourceID resourceID);
	void reportPayload(ResourceID resourceID, I payload);
	O retrievePayload(ResourceID resourceID);
}

3.3 動態運行機制

之前提到Sender和Receiver,下面兩個類就對應上述概念。

  • HeartbeatManagerImpl :Receiver,存在於JobMaster與TaskExecutor中;
  • HeartbeatManagerSenderImpl :Sender,繼承 HeartbeatManagerImpl類,用於周期發送心跳要求,存在於JobMaster、ResourceManager中。

幾個關鍵問題:

  1. 如何判定心跳超時?

    心跳服務啟動后,Flink在Monitor中通過 ScheduledFuture 會啟動一個線程來處理心跳超時事件。在設定的心跳超時時間到達后才執行線程。

    如果在設定的心跳超時時間內接收到組件的心跳消息,會先將該線程取消而後重新開啟,重置心跳超時事件的觸發。

    如果在設定的心跳超時時間內沒有收到組件的心跳,則會通知組件:你超時了。

  2. 何時”調用雙方”發起心跳檢查?

    心跳檢查是雙向的,一方(Sender)會主動發起心跳請求,而另一方(Receiver)則是對心跳做出響應,兩者通過RPC相互調用,重置對方的 Monitor 超時線程。

    以JobMaster和TaskManager為例,JM在啟動時會開啟周期調度,向已經註冊到JM中的TM發起心跳檢查,通過RPC調用TM的requestHeartbeat方法,重置TM中對JM超時線程的調用,表示當前JM狀態正常。在TM的requestHeartbeat方法被調用后,通過RPC調用JM的receiveHeartbeat,重置 JM 中對TM超時線程的調用,表示TM狀態正常。

  3. 如何處理心跳超時?

    心跳服務依賴 HeartbeatListener,當在timeout時間範圍內未接收到心跳響應,則會觸發超時處理線程,該線程通過調用HeartbeatListener.notifyHeartbeatTimeout方法做後續重連操作或者直接斷開。

下面是一個概要(以RM & TM為例):

  • RM : 實現了ResourceManagerGateway (可以直接被RPC調用)

  • TM : 實現了TaskExecutorGateway (可以直接被RPC調用)

  • RM :有一個Sender HM : taskManagerHeartbeatManager,Sender HM 擁有用戶定義的 TaskManagerHeartbeatListener

  • TM :有一個Receiver HM :resourceManagerHeartbeatManager,Receiver HM 擁有用戶定義的ResourceManagerHeartbeatListener。

  • HeartbeatManager 有一個ConcurrentHashMap<ResourceID, HeartbeatMonitor > heartbeatTargets,這個Map是它監控的所有Target。

  • 對於RM的每一個需要監控的TM, 其生成一個HeartbeatTarget,進而被構造成一個HeartbeatMonitor,放置到ResourceManager.taskManagerHeartbeatManager中。

  • 每一個Target對應的Monitor中,有自己的異步任務ScheduledFuture,這個ScheduledFuture不停的被取消/重新生成。如果在某個期間內沒有被取消,則通知用戶定義的listener出現了timeout。

3.3.1 HearbeatManagerImpl : Receiver

HearbeatManagerImpl是receiver的具體實現。它由 心跳 被發起方(就是Receiver,例如TM) 創建,接收 發起方(就是Sender,例如 JM)的心跳發送請求。心跳超時 會觸發 heartbeatListener.notifyHeartbeatTimeout方法。

注意:被發起方監控線程(Monitor)的開啟是在接收到請求心跳(requestHeartbeat被調用后)以後才觸發的,屬於被動觸發。

HearbeatManagerImpl主要維護了

  • 一個心跳監控列表 map : <ResourceID, HeartbeatMonitor<O>> heartbeatTargets;。這是一個KV關聯。

    key代表要發送心跳組件(例如:TM)的ID,value則是為當前組件創建的觸發心跳超時的線程HeartbeatMonitor,兩者一一對應。

    當一個從所聯繫的machine發過來的心跳被收到時候,對應的monitor的狀態會被更新(重啟一個新ScheduledFuture)。當一個monitor發現了一個 heartbeat timed out,它會通知自己的HeartbeatListener。

  • 一個 ScheduledExecutor mainThreadExecutor 負責heartbeat timeout notifications。

  • heartbeatListener :處理心跳結果。

HearbeatManagerImpl 數據結構如下:

@ThreadSafe
public class HeartbeatManagerImpl<I, O> implements HeartbeatManager<I, O> {

	/** Heartbeat timeout interval in milli seconds. */
	private final long heartbeatTimeoutIntervalMs;

	/** Resource ID which is used to mark one own's heartbeat signals. */
	private final ResourceID ownResourceID;

	/** Heartbeat listener with which the heartbeat manager has been associated. */
	private final HeartbeatListener<I, O> heartbeatListener;

	/** Executor service used to run heartbeat timeout notifications. */
	private final ScheduledExecutor mainThreadExecutor;

	/** Map containing the heartbeat monitors associated with the respective resource ID. */
	private final ConcurrentHashMap<ResourceID, HeartbeatMonitor<O>> heartbeatTargets;

	/** Running state of the heartbeat manager. */
	protected volatile boolean stopped;  
}

HearbeatManagerImpl實現的主要函數有:

  • monitorTarget :把一個節點加入到心跳監控列表。
    • 傳入參數有:ResourceId和HearbeatTarget,monitorTarget根據這兩個參數,生成一個HeartbeatMonitor對象,然後把這個對象跟ResrouceId做kv關聯,存入到heartbeatTargets。 一個節點可能參与多個業務流程,因此一個節點參与多個心跳流程,一個節點上運行多個不同類型的HearbeatTarget。所以一個ResourceID可能會跟不同類型的HearbeatTarget對象關聯,分別加入到多個HeartbeatManager,進行不同類型的心跳監控。也因此這個函數入參是兩個參數。
  • requestHeartbeat :Sender通過RPC異步調用到Receiver的這個函數 以要求receiver向requestOrigin節點(就是Sender)發起一次心跳響應,載荷是heartbeatPayLoad。其內部流程如下:
    • 首先會調用reportHeartbeat函數,作用是 通過Monitor 記錄發起請求的這個時間點,然後創建一個ScheduleFuture。如果到期后,requestOrigin沒有作出響應,那麼就將requestOrigin節點對應的HeartbeatMonitor的state設置成TIMEOUT狀態,如果到期內requestOrigin響應了,ScheduleFuture會被取消,HeartbeatMonitor的state仍然是RUNNING。
    • 其次調用reportPayload函數,把requestOrigin節點的最新的heartbeatPayload通知給heartbeatListener。heartbeatListener是外部傳入的,它根據所有節點的心跳記錄做監聽管理。
    • 最後調用receiveHearbeat函數,響應一個心跳給Sender。

3.3.2 HeartbeatManagerSenderImpl : Sender

繼承HearbeatManagerImpl,由心跳管理的一方(例如JM)創建,實現了run函數(即它可以作為一個單獨線程運行),創建后立即開啟周期調度線程,每次遍歷自己管理的heartbeatTarget,觸發heartbeatTarget.requestHeartbeat,要求 Target 返回一個心跳響應。屬於主動觸發心跳請求。

public class HeartbeatManagerSenderImpl<I, O> extends HeartbeatManagerImpl<I, O> implements Runnable {
	public void run() {
		if (!stopped) {
			for (HeartbeatMonitor<O> heartbeatMonitor : getHeartbeatTargets().values()) {
				requestHeartbeat(heartbeatMonitor);
			}
      // 周期調度
			getMainThreadExecutor().schedule(this, heartbeatPeriod, TimeUnit.MILLISECONDS);
		}
	}

  //  主動發起心跳檢查
	private void requestHeartbeat(HeartbeatMonitor<O> heartbeatMonitor) {
		O payload = getHeartbeatListener().retrievePayload(heartbeatMonitor.getHeartbeatTargetId());
		final HeartbeatTarget<O> heartbeatTarget = heartbeatMonitor.getHeartbeatTarget();
    // 調用 Target 的 requestHeartbeat 函數
		heartbeatTarget.requestHeartbeat(getOwnResourceID(), payload);
	}  
}

3.3.3 HeartbeatMonitorImpl

Heartbeat monitor管理心跳目標,它啟動一個ScheduledExecutor。

  • 如果在timeout時間內沒有接收到心跳信號,則判定心跳超時,通知給HeartbeatListener。
  • 如果在timeout時間內接收到心跳信號,則重置當前ScheduledExecutor。
public class HeartbeatMonitorImpl<O> implements HeartbeatMonitor<O>, Runnable {

	/** Resource ID of the monitored heartbeat target. */
	private final ResourceID resourceID;   // 被監控的resource ID

	/** Associated heartbeat target. */
	private final HeartbeatTarget<O> heartbeatTarget; //心跳目標

	private final ScheduledExecutor scheduledExecutor;

	/** Listener which is notified about heartbeat timeouts. */
	private final HeartbeatListener<?, ?> heartbeatListener; // 心跳監聽器

	/** Maximum heartbeat timeout interval. */
	private final long heartbeatTimeoutIntervalMs;

	private volatile ScheduledFuture<?> futureTimeout;
  //  AtomicReference  使用 
	private final AtomicReference<State> state = new AtomicReference<>(State.RUNNING);
   //  最近一次接收到心跳的時間
  private volatile long lastHeartbeat;
  
	// 報告心跳
	public void reportHeartbeat() {
    // 保留最近一次接收心跳時間
		lastHeartbeat = System.currentTimeMillis();
    // 接收心跳后,重置timeout線程
		resetHeartbeatTimeout(heartbeatTimeoutIntervalMs);
	}

  // 心跳超時,觸發lister的notifyHeartbeatTimeout
	public void run() {
		// The heartbeat has timed out if we're in state running
		if (state.compareAndSet(State.RUNNING, State.TIMEOUT)) {
			heartbeatListener.notifyHeartbeatTimeout(resourceID);
		}
	}

  //  重置TIMEOUT
	void resetHeartbeatTimeout(long heartbeatTimeout) {
		if (state.get() == State.RUNNING) {
      //先取消線程,在重新開啟
			cancelTimeout();
      // 啟動超時線程
			futureTimeout = scheduledExecutor.schedule(this, heartbeatTimeout, TimeUnit.MILLISECONDS);

			// Double check for concurrent accesses (e.g. a firing of the scheduled future)
			if (state.get() != State.RUNNING) {
				cancelTimeout();
			}
		}
	}

3.3.3 HeartbeatServices

建立heartbeat receivers and heartbeat senders,主要是對外提供服務。這裏我們可以看到:

  • HeartbeatManagerImpl就是receivers。
  • HeartbeatManagerSenderImpl就是senders。
public class HeartbeatServices {
	// Creates a heartbeat manager which does not actively send heartbeats.
	public <I, O> HeartbeatManager<I, O> createHeartbeatManager(...) {
		return new HeartbeatManagerImpl<>(...);
	}
	// Creates a heartbeat manager which actively sends heartbeats to monitoring targets.
	public <I, O> HeartbeatManager<I, O> createHeartbeatManagerSender(...) {
		return new HeartbeatManagerSenderImpl<>(...);
	}
}

0x04 初始化

4.1 心跳服務創建

心跳管理服務在Cluster入口創建。因為我們是調試,所以在MiniCluster.start調用。

public void start() throws Exception {
				......
				heartbeatServices = HeartbeatServices.fromConfiguration(configuration);
				......
}

HeartbeatServices.fromConfiguration會從Configuration中獲取配置信息:

  • 心跳間隔 heartbeat.interval
  • 心跳超時時間 heartbeat.timeout

個就是我們解決最開始問題的思路:從配置信息入手,擴大心跳間隔

public HeartbeatServices(long heartbeatInterval, long heartbeatTimeout) {
		this.heartbeatInterval = heartbeatInterval;
		this.heartbeatTimeout = heartbeatTimeout;
}

public static HeartbeatServices fromConfiguration(Configuration configuration) {
		long heartbeatInterval = configuration.getLong(HeartbeatManagerOptions.HEARTBEAT_INTERVAL);
		long heartbeatTimeout = configuration.getLong(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT);

		return new HeartbeatServices(heartbeatInterval, heartbeatTimeout);
}

0x05 Flink中具體應用

5.1 總述

5.1.1 RM, JM, TM之間關係

系統中有幾個ResourceManager?整個 Flink 集群中只有一個 ResourceManager。

系統中有幾個JobManager?JobManager 負責管理作業的執行。默認情況下,每個 Flink 集群只有一個 JobManager 實例。JobManager 相當於整個集群的 Master 節點,負責整個集群的任務管理和資源管理。

系統中有幾個TaskManager?這個由具體啟動方式決定。比如Flink on Yarn,Session模式能夠指定拉起多少個TaskManager。 Per job模式中TaskManager數量是在提交作業時根據併發度動態計算,即Number of TM = Parallelism/numberOfTaskSlots。比如:有一個作業,Parallelism為10,numberOfTaskSlots為1,則TaskManager為10。

5.1.2 三者間心跳機制

Flink中ResourceManager、JobMaster、TaskExecutor三者之間存在相互檢測的心跳機制:

  • ResourceManager會主動發送請求探測JobMaster、TaskExecutor是否存活。
  • JobMaster也會主動發送請求探測TaskExecutor是否存活,以便進行任務重啟或者失敗處理。

我們之前講過,HeartbeatManagerSenderImpl屬於Sender,HeartbeatManagerImpl屬於Receiver。

  1. HeartbeatManagerImpl所處位置可以理解為client,存在於JobMaster與TaskExecutor中;
  2. HeartbeatManagerSenderImpl類,繼承 HeartbeatManagerImpl類,用於周期發送心跳請求,所處位置可以理解為server, 存在於JobMaster、ResourceManager中。

ResourceManager 級別最高,所以兩個HM都是Sender,監控taskManager和jobManager

public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
		extends FencedRpcEndpoint<ResourceManagerId>
		implements ResourceManagerGateway, LeaderContender {
	taskManagerHeartbeatManager = heartbeatServices.createHeartbeatManagerSender
	jobManagerHeartbeatManager = heartbeatServices.createHeartbeatManagerSender
}

JobMaster級別中等,一個Sender, 一個Receiver,受到ResourceManager的監控,監控taskManager。

public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMasterGateway, JobMasterService {
	taskManagerHeartbeatManager = heartbeatServices.createHeartbeatManagerSender
	resourceManagerHeartbeatManager = heartbeatServices.createHeartbeatManager
}

TaskExecutor級別最低,兩個Receiver,分別被JM和RM疾控。

public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
	this.jobManagerHeartbeatManager = return heartbeatServices.createHeartbeatManager
	this.resourceManagerHeartbeatManager = return heartbeatServices.createHeartbeatManager
}

以JobManager和TaskManager為例。JM在啟動時會開啟周期調度,向已經註冊到JM中的TM發起心跳檢查,通過RPC調用TM的requestHeartbeat方法,重置對JM超時線程的調用,表示當前JM狀態正常。在TM的requestHeartbeat方法被調用后,通過RPC調用JM的receiveHeartbeat,重置對TM超時線程的調用,表示TM狀態正常。

5.2 初始化過程

5.2.1 TaskExecutor初始化

TM初始化生成了兩個Receiver HM。

public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
	/** The heartbeat manager for job manager in the task manager. */
	private final HeartbeatManager<AllocatedSlotReport, AccumulatorReport> jobManagerHeartbeatManager;

	/** The heartbeat manager for resource manager in the task manager. */
	private final HeartbeatManager<Void, TaskExecutorHeartbeatPayload> resourceManagerHeartbeatManager;
  
  //初始化函數
	this.jobManagerHeartbeatManager = createJobManagerHeartbeatManager(heartbeatServices, resourceId);
	this.resourceManagerHeartbeatManager = createResourceManagerHeartbeatManager(heartbeatServices, resourceId);
}

生成HeartbeatManager時,就註冊了ResourceManagerHeartbeatListener和JobManagerHeartbeatListener。

此時,兩個HeartbeatManagerImpl中已經創建好對應monitor線程,只有在JM或者RM執行requestHeartbeat后,才會觸發該線程的執行。

5.2.2 JobMaster的初始化

JM生成了一個Sender HM,一個Receiver HM。這裡會註冊 TaskManagerHeartbeatListener 和 ResourceManagerHeartbeatListener

public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMasterGateway, JobMasterService {
  private HeartbeatManager<AccumulatorReport, AllocatedSlotReport> taskManagerHeartbeatManager;
  private HeartbeatManager<Void, Void> resourceManagerHeartbeatManager;

  private void startHeartbeatServices() {
      taskManagerHeartbeatManager = heartbeatServices.createHeartbeatManagerSender(
        resourceId,
        new TaskManagerHeartbeatListener(),
        getMainThreadExecutor(),
        log);

      resourceManagerHeartbeatManager = heartbeatServices.createHeartbeatManager(
        resourceId,
        new ResourceManagerHeartbeatListener(),
        getMainThreadExecutor(),
        log);
  }
}

5.2.3 ResourceManager初始化

JobMaster在啟動時候,會在startHeartbeatServices函數中生成兩個Sender HeartbeatManager。

taskManagerHeartbeatManager :HeartbeatManagerSenderImpl對象,會反覆啟動一個定時器,定時掃描需要探測的對象並且發送心跳請求。

jobManagerHeartbeatManager :HeartbeatManagerSenderImpl,會反覆啟動一個定時器,定時掃描需要探測的對象並且發送心跳請求。

taskManagerHeartbeatManager = heartbeatServices.createHeartbeatManagerSender(
			resourceId,
			new TaskManagerHeartbeatListener(),
			getMainThreadExecutor(),
			log);

jobManagerHeartbeatManager = heartbeatServices.createHeartbeatManagerSender(
			resourceId,
			new JobManagerHeartbeatListener(),
			getMainThreadExecutor(),
			log);  

5.3 註冊過程

我們以TM與RM交互為例。TaskExecutor啟動之後,需要註冊到RM和JM中。

流程圖如下:

 * 1. Run in Task Manager
 *
 *    TaskExecutor.onStart //Life cycle
 *        |
 *        +----> startTaskExecutorServices@TaskExecutor
 *        |     //開始TM服務
 *        |     
 *        +----> resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
 *        |     // 開始連接到RM
 *        |     // start by connecting to the ResourceManager
 *        |    
 *        +----> notifyLeaderAddress@ResourceManagerLeaderListener  
 *        |     // 當RM狀態變化之後,將回調到這裏    
 *        |     // The listener for leader changes of the resource manager.
 *        |        
 *        +----> reconnectToResourceManager@TaskExecutor   
 *        |     // 以下三步調用是漸進的,就是與RM聯繫。
 *        |    
 *        +----> tryConnectToResourceManager@TaskExecutor 
 *        |     
 *        +----> connectToResourceManager()@TaskExecutor
 *        |     // 主要作用是生成了 TaskExecutorToResourceManagerConnection    
 *        |  
 *        +----> start@TaskExecutorToResourceManagerConnection
 *        |     // 開始RPC調用,將會調用到其基類RegisteredRpcConnection的start   
 *        |   
 *        +----> start@RegisteredRpcConnection
 *        |     // RegisteredRpcConnection實現了組件之間註冊聯繫的基本RPC
 *        |      
   
   
 * ~~~~~~~~ 這裡是 Akka RPC
   
 * 2. Run in Resource Manager   
 * 現在程序執行序列到達了RM, 主要是添加一個Target到RM 的 Sender HM;
 *
 *    registerTaskExecutor@ResourceManager
 *        |
 *        +----> taskExecutorGatewayFuture.handleAsync
 *        |     // 異步調用到這裏
 *        |     
 *        +----> registerTaskExecutorInternal@ResourceManager
 *        |     // RM的內部實現,將把TM註冊到RM自己這裏
 *        |      
 *        +----> taskManagerHeartbeatManager.monitorTarget
 *        |     // 生成HeartbeatMonitor,
 *        |      
 *        +---->  heartbeatTargets.put(resourceID,heartbeatMonitor);
 *        |     // 把Monitor放到 HM in TM之中,就是說TM開始監控了RM
 *        |        

 * ~~~~~~~~ 這裡是 Akka RPC
  
 * 3. Run in Task Manager
 * 現在程序回到了TM, 主要是添加一個Target到 TM 的 Receiver HM;
 *
 *    onRegistrationSuccess@TaskExecutorToResourceManagerConnection
 *        |   
 *        |   
 *        +---->  onRegistrationSuccess@ResourceManagerRegistrationListener 
 *        |       // 回調函數
 *        |  
 *        +---->  runAsync(establishResourceManagerConnection) 
 *        |       // 異步執行
 *        |  
 *        +---->  establishResourceManagerConnection@TaskExecutor 
 *        |       // 說明已經和RM建立了聯繫,所以可以開始監控RM了
 *        |  
 *        +---->  resourceManagerHeartbeatManager.monitorTarget 
 *        |     // 生成HeartbeatMonitor,
 *        |      
 *        +---->  heartbeatTargets.put(resourceID,heartbeatMonitor);  
 *        |       // 把 RM 也註冊到 TM了
 *        |       // monitor the resource manager as heartbeat target 

下面是具體文字描述。

5.3.1 TM註冊到RM中

5.3.1.1 TM的操作
  • TaskExecutor啟動之後,調用onStart,開始其生命周期。
  • onStart直接調用startTaskExecutorServices。
  • 啟動服務的第一步就是與ResourceManager取得聯繫,這裏註冊了一個ResourceManagerLeaderListener(),用來監聽RM Leader的變化。
private final LeaderRetrievalService resourceManagerLeaderRetriever;
// resourceManagerLeaderRetriever其實是EmbeddedLeaderService的實現,A simple leader election service, which selects a leader among contenders and notifies listeners.

resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
  • 當得到RM Leader的地址之後,會調用到回調函數notifyLeaderAddress@ResourceManagerLeaderListener,然後調用notifyOfNewResourceManagerLeader。
  • notifyOfNewResourceManagerLeader中獲取到RM地址后,就通過reconnectToResourceManager與RM聯繫。
  • reconnectToResourceManager中間接調用到TaskExecutorToResourceManagerConnection。其作用是建立TaskExecutor 和 ResourceManager之間的聯繫。因為知道 ResourceManagerGateway所以才能進行RPC操作。
  • 然後在 TaskExecutorToResourceManagerConnection中,就通過RPC與RM聯繫。
5.3.1.2 RM的操作
  • RPC調用后,程序就來到了RM中,RM做如下操作:
  • 會註冊一個新的TaskExecutor到自己的taskManagerHeartbeatManager中。
  • registerTaskExecutor@ResourceManager會通過異步調用到registerTaskExecutorInternal。
  • registerTaskExecutorInternal中首先看看是否這個TaskExecutor的ResourceID之前註冊過,如果註冊過就移除再添加一個新的TaskExecutor。
  • 通過 taskManagerHeartbeatManager.monitorTarget 開始進行心跳機制的註冊。
taskManagerHeartbeatManager.monitorTarget(taskExecutorResourceId, new HeartbeatTarget<Void>() {
				public void receiveHeartbeat(ResourceID resourceID, Void payload) {
					// the ResourceManager will always send heartbeat requests to the
					// TaskManager
				}
				public void requestHeartbeat(ResourceID resourceID, Void payload) {
					taskExecutorGateway.heartbeatFromResourceManager(resourceID);
				}
});

當註冊完成后,RM中的Sender HM內部結構如下,能看出來多了一個Target:

taskManagerHeartbeatManager = {HeartbeatManagerSenderImpl@8866} 
 heartbeatPeriod = 10000
 heartbeatTimeoutIntervalMs = 50000
 ownResourceID = {ResourceID@8871} "040709f36ebf38f309fed518a88946af"
 heartbeatListener = {ResourceManager$TaskManagerHeartbeatListener@8872} 
 mainThreadExecutor = {RpcEndpoint$MainThreadExecutor@8873} 
 heartbeatTargets = {ConcurrentHashMap@8875}  size = 1
  {ResourceID@8867} "630c15c9-4861-4b41-9c95-92504f458b71" -> {HeartbeatMonitorImpl@9448} 
   key = {ResourceID@8867} "630c15c9-4861-4b41-9c95-92504f458b71"
   value = {HeartbeatMonitorImpl@9448} 
    resourceID = {ResourceID@8867} "630c15c9-4861-4b41-9c95-92504f458b71"
    heartbeatTarget = {ResourceManager$2@8868} 
    scheduledExecutor = {RpcEndpoint$MainThreadExecutor@8873} 
    heartbeatListener = {ResourceManager$TaskManagerHeartbeatListener@8872} 
    heartbeatTimeoutIntervalMs = 50000
    futureTimeout = {ScheduledFutureAdapter@10140} 
    state = {AtomicReference@9786} "RUNNING"
    lastHeartbeat = 0
5.3.1.3 返回到TM

RM會通過RPC再次回到TaskExecutor,其新執行序列如下:

  • 首先RPC調用到了 onRegistrationSuccess@TaskExecutorToResourceManagerConnection。
  • 然後onRegistrationSuccess@ResourceManagerRegistrationListener中通過異步執行調用到了establishResourceManagerConnection。這說明TM已經和RM建立了聯繫,所以可以開始監控RM了。
  • 然後和RM操作類似,通過resourceManagerHeartbeatManager.monitorTarget 來把RM註冊到自己這裏。
HeartbeatMonitor<O> heartbeatMonitor = heartbeatMonitorFactory.createHeartbeatMonitor 
heartbeatTargets.put(resourceID, heartbeatMonitor);  

當註冊完成后,其Receiver HM結構如下:

resourceManagerHeartbeatManager = {HeartbeatManagerImpl@10163} 
 heartbeatTimeoutIntervalMs = 50000
 ownResourceID = {ResourceID@8882} "96a9b80c-dd97-4b63-9049-afb6662ea3e2"
 heartbeatListener = {TaskExecutor$ResourceManagerHeartbeatListener@10425} 
 mainThreadExecutor = {RpcEndpoint$MainThreadExecutor@10426} 
 heartbeatTargets = {ConcurrentHashMap@10427}  size = 1
  {ResourceID@8886} "122fa66685133b11ea26ee1b1a6cef75" -> {HeartbeatMonitorImpl@10666} 
   key = {ResourceID@8886} "122fa66685133b11ea26ee1b1a6cef75"
   value = {HeartbeatMonitorImpl@10666} 
    resourceID = {ResourceID@8886} "122fa66685133b11ea26ee1b1a6cef75"
    heartbeatTarget = {TaskExecutor$1@10668} 
    scheduledExecutor = {RpcEndpoint$MainThreadExecutor@10426} 
    heartbeatListener = {TaskExecutor$ResourceManagerHeartbeatListener@10425} 
    heartbeatTimeoutIntervalMs = 50000
    futureTimeout = {ScheduledFutureAdapter@10992} 
    state = {AtomicReference@10667} "RUNNING"
    lastHeartbeat = 0

5.3.2 TM註冊到 JM

其調用基本思路與之前相同,就是TM和JM之間互相註冊一個代表對方的monitor:

JobLeaderListenerImpl ----> establishJobManagerConnection

消息到了JM中,做如下操作。

registerTaskManager ----> taskManagerHeartbeatManager.monitorTarget
  // monitor the task manager as heartbeat target

5.4 心跳過程

在任務提交之後,我們就進入了正常的心跳監控流程。我們依然用 TM 和 RM進行演示。

我們先給出一個流程圖。

 * 1. Run in Resouce Manager
 *
 *    HeartbeatManagerSender in RM
 *        |
 *        +----> run@HeartbeatManagerSenderImpl
 *        |     //遍歷所有監控的Monitor(Target),逐一在Target上調用requestHeartbeat
 *        |     
 *        +----> requestHeartbeat@HeartbeatManagerSenderImpl 
 *        |     // 將調用具體監控對象的自定義函數
 *        |     // heartbeatTarget.requestHeartbeat(getOwnResourceID(), payload);
 *        |      
 *        +----> getHeartbeatListener().retrievePayload   
 *        |     // 調用到TaskManagerHeartbeatListener@ResourceManager    
 *        |     // 這裡是return null;,因為RM不會是任何人的Receiver
 *        |        
 *        +----> requestHeartbeat@HeartbeatTarget   
 *        |     // 調用到Target這裏,代碼在ResourceManager這裏,就是生成Target時候賦值的
 *        |    
 *        +----> taskExecutorGateway.heartbeatFromResourceManager
 *        |     // 會通過gateway RPC 調用到TM,這就是主動對TM發起了心跳請求
 *        |      

 * ~~~~~~~~ 這裡是 Akka RPC
   
 * 2. Run in Task Manager   
 * 現在程序執行序列到達了TM, 主要是 1. 重置TM的Monitor線程; 2.返回一些負載信息;
 *
 *    heartbeatFromResourceManager@TaskExecutor
 *        |
 *        +----> resourceManagerHeartbeatManager.requestHeartbeat(resourceID, null);
 *        |     //開始要調用到 Receiver HM in Task Manager
 *        |     
 *        +----> requestHeartbeat@HeartbeatManager in TM 
 *        |     // 在Receiver HM in Task Manager 這裏運行
 *        |      
 *        +----> reportHeartbeat@HeartbeatMonitor   
 *        |     //reportHeartbeat : 記錄發起請求的這個時間點,然後resetHeartbeatTimeout
 *        |      
 *        +----> resetHeartbeatTimeout@HeartbeatMonitor
 *        |     // 如果Monitor狀態依然是RUNNING,則取消之前設置的ScheduledFuture。
 *        |     // 重新創建一個ScheduleFuture。因為如果不取消,則之前那個ScheduleFuture運行時
 *        |     // 會調用HeartbeatMonitorImpl.run函數,run直接compareAndSet后,通知目標函數
 *        |     // 目前已經超時,即調用heartbeatListener.notifyHeartbeatTimeout。
 *        |     // 這裏代表 JM 狀態正常。   
 *        |      
 *        +---->  heartbeatListener.reportPayload
 *        |     // 把Target節點的最新的heartbeatPayload通知給heartbeatListener。 
 *        |     // heartbeatListerner是外部傳入的,它根據所擁有的節點的心跳記錄做監聽管理。
 *        |        
 *        +---->  heartbeatTarget.receiveHeartbeat(getOwnResourceID(), heartbeatListener.retrievePayload(requestOrigin));
 *        |      
 *        |     
 *        +---->  retrievePayload@ResourceManagerHeartbeatListener in TM   
 *        |       // heartbeatTarget.receiveHeartbeat參數調用的
 *        |     
 *        +---->  return new TaskExecutorHeartbeatPayload
 *        |   
 *        |     
 *        +---->  receiveHeartbeat in TM 
 *        |       // 回到 heartbeatTarget.receiveHeartbeat,這就是TM生成Target的時候的自定義函數
 *        |       // 就是響應一個心跳消息回給RM
 *        |     
 *        +----> resourceManagerGateway.heartbeatFromTaskManager  
 *        |     // 會通過gateway RPC 調用到 ResourcManager
 *        |      

 * ~~~~~~~~ 這裡是 Akka RPC
  
 * 3. Run in Resouce Manager
 * 現在程序回到了RM, 主要是 1.重置RM的Monitor線程;2. 上報收到TaskExecutor的負載信息  
 *
 *    heartbeatFromTaskManager in RM
 *        |   
 *        |   
 *        +---->  taskManagerHeartbeatManager.receiveHeartbeat 
 *        |       // 這是個Sender HM
 *        |  
 *        +---->  HeartbeatManagerImpl.receiveHeartbeat  
 *        |  
 *        |     
 *        +---->  HeartbeatManagerImpl.reportHeartbeat(heartbeatOrigin);
 *        | 
 *        |    
 *        +---->  heartbeatMonitor.reportHeartbeat(); 
 *        |      // 這裏就是重置RM 這裏對應的Monitor。在reportHeartbeat重置 JM monitor線程的觸發,即cancelTimeout取消註冊時候的超時定時任務,並且註冊下一個超時檢測futureTimeout;這代表TM正常執行。
 *        |     
 *        +----> heartbeatListener.reportPayload    
 *        |      //把Target節點的最新的heartbeatPayload通知給 TaskManagerHeartbeatListener。heartbeatListerner是外部傳入的,它根據所擁有的節點的心跳記錄做監聽管理。 
 *        |      
 *        +----> slotManager.reportSlotStatus(instanceId, payload.getSlotReport());
 *        |     // TaskManagerHeartbeatListener中調用,上報收到TaskExecutor的負載信息
 *        |        

下面是具體文字描述。

5.4.1 ResourceManager主動發起

5.4.1.1 Sender遍歷所有監控的Monitor(Target)

心跳機制是由Sender主動發起的。這裏就是 ResourceManager 的HeartbeatManagerSenderImpl中定時schedual調用,這裡會遍歷所有監控的Monitor(Target),逐一在Target上調用requestHeartbeat。

// HeartbeatManagerSenderImpl中的代碼

	@Override
	public void run() {
		if (!stopped) {
			for (HeartbeatMonitor<O> heartbeatMonitor : getHeartbeatTargets().values()) {
        // 這裏向被監控對象節點發起一次心跳請求,載荷是heartbeatPayLoad,要求被監控對象回應心跳
				requestHeartbeat(heartbeatMonitor);
			}
			getMainThreadExecutor().schedule(this, heartbeatPeriod, TimeUnit.MILLISECONDS);
		}
	}
}

// 運行時候的變量
this = {HeartbeatManagerSenderImpl@9037} 
 heartbeatPeriod = 10000
 heartbeatTimeoutIntervalMs = 50000
 ownResourceID = {ResourceID@8788} "d349506cae32cadbe99b9f9c49a01c95"
 heartbeatListener = {ResourceManager$TaskManagerHeartbeatListener@8789} 
 mainThreadExecutor = {RpcEndpoint$MainThreadExecutor@8790} 

// 調用棧如下
requestHeartbeat:711, ResourceManager$2 (org.apache.flink.runtime.resourcemanager)
requestHeartbeat:702, ResourceManager$2 (org.apache.flink.runtime.resourcemanager)
requestHeartbeat:92, HeartbeatManagerSenderImpl (org.apache.flink.runtime.heartbeat)
run:81, HeartbeatManagerSenderImpl (org.apache.flink.runtime.heartbeat)
call:511, Executors$RunnableAdapter (java.util.concurrent)
run$$$capture:266, FutureTask (java.util.concurrent)
run:-1, FutureTask (java.util.concurrent)
5.4.1.2 Target進行具體操作

具體監控對象 Target 會調用自定義的requestHeartbeat。

HeartbeatManagerSenderImpl

	private void requestHeartbeat(HeartbeatMonitor<O> heartbeatMonitor) {
		O payload = getHeartbeatListener().retrievePayload(heartbeatMonitor.getHeartbeatTargetId());
		final HeartbeatTarget<O> heartbeatTarget = heartbeatMonitor.getHeartbeatTarget();

    // 這裏就是具體監控對象
		heartbeatTarget.requestHeartbeat(getOwnResourceID(), payload);
	}

heartbeatTarget = {ResourceManager$2@10688} 
 taskExecutorGateway = {$Proxy42@9459} "org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler@6d0c8334"
 this$0 = {StandaloneResourceManager@9458} 

請注意,每一個Target都是由ResourceManager生成的。ResourceManager之前註冊成為Monitor時候就註冊了這個HeartbeatTarget。

這個HeartbeatTarget的定義如下,兩個函數是:

  • receiveHeartbeat :這個是空,因為RM沒有自己的Sender。

  • requestHeartbeat :這個針對TM,就是調用TM的heartbeatFromResourceManager,當然是通過RPC調用。

5.4.1.3 RPC調用

會調用到ResourceManager定義的函數requestHeartbeat,而requestHeartbeat會通過gateway調用到TM,這就是主動對TM發起了心跳請求。

taskManagerHeartbeatManager.monitorTarget(taskExecutorResourceId, new HeartbeatTarget<Void>() {
   @Override
   public void receiveHeartbeat(ResourceID resourceID, Void payload) {
      // the ResourceManager will always send heartbeat requests to the TaskManager
   }

   @Override
   public void requestHeartbeat(ResourceID resourceID, Void payload) {
      //就是調用到這裏
      taskExecutorGateway.heartbeatFromResourceManager(resourceID); 
   }
});

5.4.2 RM通過RPC調用TM

通過taskExecutorGateway。心跳程序執行就通過RPC從RM跳躍到了TM。

taskExecutorGateway.heartbeatFromResourceManager 的意義就是:通過RPC調用回到TaskExecutor。這個是在TaskExecutorGateway就定義好的。

// TaskExecutor RPC gateway interface.
public interface TaskExecutorGateway extends RpcGateway

TaskExecutor實現了TaskExecutorGateway,所以具體在TaskExecutor內部實現了接口函數。

@Override
public void heartbeatFromResourceManager(ResourceID resourceID) {
    //調用到了這裏 ...........
		resourceManagerHeartbeatManager.requestHeartbeat(resourceID, null);
}

TM中,resourceManagerHeartbeatManager 定義如下。

/** The heartbeat manager for resource manager in the task manager. */
private final HeartbeatManager<Void, TaskExecutorHeartbeatPayload> resourceManagerHeartbeatManager;

所以下面就是執行TM中的Receiver HM。在這個過程中有兩個處理步驟:

  1. 調用對應HeartbeatMonitor的reportHeartbeat方法,cancelTimeout取消註冊時候的超時定時任務,並且註冊下一個超時檢測futureTimeout;
  2. 調用monitorTarget的receiveHeartbeat方法,也就是會通過rpc調用JobMaster的heartbeatFromTaskManager方法返回一些負載信息;

具體是調用 requestHeartbeat@HeartbeatManager。在其中會

  • 調用reportHeartbeat@HeartbeatMonitor,記錄發起請求的這個時間點,然後resetHeartbeatTimeout。
  • 在resetHeartbeatTimeout@HeartbeatMonitor之中,如果Monitor狀態依然是RUNNING,則取消之前設置的ScheduledFuture。重新創建一個ScheduleFuture。因為如果不取消,則之前那個ScheduleFuture運行時會調用HeartbeatMonitorImpl.run函數,run直接compareAndSet后,通知目標函數目前已經超時,即調用heartbeatListener.notifyHeartbeatTimeout。
  • 調用 heartbeatListener.reportPayload,把Target節點的最新的heartbeatPayload通知給heartbeatListener。
  • 調用 heartbeatTarget.receiveHeartbeat(getOwnResourceID(), heartbeatListener.retrievePayload(requestOrigin)); 就是響應一個心跳消息回給RM。
	@Override
	public void requestHeartbeat(final ResourceID requestOrigin, I heartbeatPayload) {
		if (!stopped) {
			log.debug("Received heartbeat request from {}.", requestOrigin);

			final HeartbeatTarget<O> heartbeatTarget = reportHeartbeat(requestOrigin);

			if (heartbeatTarget != null) {
				if (heartbeatPayload != null) {
					heartbeatListener.reportPayload(requestOrigin, heartbeatPayload);
				}

				heartbeatTarget.receiveHeartbeat(getOwnResourceID(), heartbeatListener.retrievePayload(requestOrigin));
			}
		}
	}

最後會通過resourceManagerGateway.heartbeatFromTaskManager 調用到 ResourcManager。

5.4.3 TM 通過RPC回到 RM

JobMaster在接收到rpc請求后調用其heartbeatFromTaskManager方法,會調用taskManagerHeartbeatManager的receiveHeartbeat方法,在這個過程中同樣有兩個處理步驟:

  1. 調用對應HeartbeatMonitor的reportHeartbeat方法,cancelTimeout取消註冊時候的超時定時任務,並且註冊下一個超時檢測futureTimeout;
  2. 調用TaskManagerHeartbeatListener的reportPayload方法,上報收到TaskExecutor的負載信息

至此一次完成心跳過程已經完成,會根據heartbeatInterval執行下一次心跳。

5.5 超時處理

5.5.1 TaskManager

首先,在HeartbeatMonitorImpl中,如果超時,會調用Listener。

public void run() {
   // The heartbeat has timed out if we're in state running
   if (state.compareAndSet(State.RUNNING, State.TIMEOUT)) {
      heartbeatListener.notifyHeartbeatTimeout(resourceID);
   }
}

這就來到了ResourceManagerHeartbeatListener,會嘗試再次連接RM。

private class ResourceManagerHeartbeatListener implements HeartbeatListener<Void, TaskExecutorHeartbeatPayload> {

   @Override
   public void notifyHeartbeatTimeout(final ResourceID resourceId) {
      validateRunsInMainThread();
      // first check whether the timeout is still valid
      if (establishedResourceManagerConnection != null && establishedResourceManagerConnection.getResourceManagerResourceId().equals(resourceId)) {
         reconnectToResourceManager(new TaskManagerException(
            String.format("The heartbeat of ResourceManager with id %s timed out.", resourceId)));
      } else {
         .....
      }
   }

5.5.2 ResourceManager

RM就直接簡單粗暴,關閉連接。

private class TaskManagerHeartbeatListener implements HeartbeatListener<TaskExecutorHeartbeatPayload, Void> {

   @Override
   public void notifyHeartbeatTimeout(final ResourceID resourceID) {
      validateRunsInMainThread();
      closeTaskManagerConnection(
         resourceID,
         new TimeoutException("The heartbeat of TaskManager with id " + resourceID + "  timed out."));
   }
}  

0x06 解決問題

心跳機制我們講解完了,但是我們最初提到的異常應該如何解決呢?在程序最開始生成環境變量時候,通過設置環境變量的配置即可搞定:

Configuration conf = new Configuration();
conf.setString("heartbeat.timeout", "18000000");
final LocalEnvironment env = ExecutionEnvironment.createLocalEnvironment(conf);

0x07 參考

[flink-001]flink的心跳機制

Flink中心跳機制

flink1.8 心跳服務

你有必要了解一下Flink底層RPC使用的框架和原理

flink RPC(akka)

弄清Flink1.8的遠程過程調用(RPC)

Apache Flink源碼解析 (七)Flink RPC的底層實現

flink源碼閱讀第一篇—入口

flink-on-yarn 基礎架構和啟動流程

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※帶您來了解什麼是 USB CONNECTOR  ?

※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面

※如何讓商品強力曝光呢? 網頁設計公司幫您建置最吸引人的網站,提高曝光率!

※綠能、環保無空污,成為電動車最新代名詞,目前市場使用率逐漸普及化

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

※教你寫出一流的銷售文案?

【asp.net core 系列】13 Identity 身份驗證入門

2{icon} {views}

0. 前言

通過前兩篇我們實現了如何在Service層如何訪問數據,以及如何運用簡單的加密算法對數據加密。這一篇我們將探索如何實現asp.net core的身份驗證。

1. 身份驗證

asp.net core的身份驗證有 JwtBearer和Cookie兩種常見的模式,在這一篇我們將啟用Cookie作為身份信息的保存。那麼,我們如何啟用呢?

在Startup.cs 的ConfigureServices(IServiceCollection services) 方法里添加如下:

services.AddAuthentication(CookieAuthenticationDefaults.AuthenticationScheme)
                .AddCookie(CookieAuthenticationDefaults.AuthenticationScheme, options =>
                {
                    Configuration.Bind("CookieSettings",options);
                });

此時可以啟動一個權限驗證,當用戶訪問需要驗證的頁面或接口時,如果沒有登錄,則會自動跳轉到:

https://localhost:5001/Account/Login?ReturnUrl=XXXX

其中ReturnUrl指向來源頁。

1.1 設置驗證

當我們在Startup類里設置啟用了身份驗證后,並不是訪問所有接口都會被跳轉到登錄頁面。那麼如何設置訪問的路徑需要身份驗證呢?asp.net core為我們提供了一個特性類:

[AttributeUsage(AttributeTargets.Class | AttributeTargets.Method, AllowMultiple = true, Inherited = true)]
public class AuthorizeAttribute : Attribute, IAuthorizeData
{
    public string Policy { get; set; }
    public string Roles { get; set; }
    public string AuthenticationSchemes { get; set; }
}

可以看的出,這個特性類允許設置在類、方法上,可以設置多個,允許子類繼承父類的特性。所以可以在控制器上設置[Authorize],當在控制器上設置以後訪問控制器里所有的Action都會要求驗證身份;也可以單獨設置在Action上,表示該Action需要驗證身份,控制器里的其他方法不需要驗證。

1.2 設置忽略

我們在開發過程中,會遇到這樣的一組鏈接或者頁面:請求地址同屬於一個控制器下,但其中某個地址可以不用用戶登錄就可以訪問。通常我們為了減少重複代碼以及復用性等方面的考慮,會直接在控制器上設置身份驗證要求,而不是在控制器里所有的Action上添加驗證要求。

那麼,我們如何放開其中的某個請求,可以允許它不用身份驗證。

[AttributeUsage(AttributeTargets.Class | AttributeTargets.Method, AllowMultiple = false, Inherited = true)]
public class AllowAnonymousAttribute : Attribute, IAllowAnonymous
{
}

仔細觀察,可以看得出這個特性可以設置在類、方法上,不允許多次設置,允許子類繼承父類的特性。

這個特性的使用沒啥可說的,不過需要注意的是,不要與AuthorizeAttribute一起使用。雖然編譯上沒啥問題,但實際上會對程序員的邏輯照成一定程度的誤導。

2.保存身份

有身份驗證,就必然需要保存身份。當我們從數據庫中或者其他的三方服務中獲取到用戶信息后,我們需要將用戶信息保存起來,而不是每次都向用戶或者服務提供方索求信息。

在asp.net core中,Controller類里有一個屬性:

public HttpContext HttpContext { get; }

HttpContext 提供了一個擴展方法,可以用來保存用戶信息:

public static Task SignInAsync(this HttpContext context, ClaimsPrincipal principal);

暫時忽略這個方法的返回類型,它接受了一個ClaimsPrincipal類型的參數。我們來看下這個類的基本情況吧:

public class ClaimsPrincipal : IPrincipal
{

    public ClaimsPrincipal();
    public ClaimsPrincipal(IEnumerable<ClaimsIdentity> identities);
    public ClaimsPrincipal(BinaryReader reader);
    public ClaimsPrincipal(IIdentity identity);
    public ClaimsPrincipal(IPrincipal principal);
    
    public static ClaimsPrincipal Current { get; }
    public static Func<ClaimsPrincipal> ClaimsPrincipalSelector { get; set; }
    public static Func<IEnumerable<ClaimsIdentity>, ClaimsIdentity> PrimaryIdentitySelector { get; set; }
    public virtual IIdentity Identity { get; }
    public virtual IEnumerable<ClaimsIdentity> Identities { get; }
    public virtual IEnumerable<Claim> Claims { get; }
    public virtual void AddIdentities(IEnumerable<ClaimsIdentity> identities);
    public virtual void AddIdentity(ClaimsIdentity identity);
    public virtual ClaimsPrincipal Clone();
    public virtual IEnumerable<Claim> FindAll(Predicate<Claim> match);
    public virtual IEnumerable<Claim> FindAll(string type);
    public virtual Claim FindFirst(string type);
    public virtual Claim FindFirst(Predicate<Claim> match);
    public virtual bool HasClaim(Predicate<Claim> match);
    public virtual bool HasClaim(string type, string value);
    public virtual bool IsInRole(string role);
    public virtual void WriteTo(BinaryWriter writer);
}

方法和屬性有點多,那麼我們重點關注一下構造函數以及可以AddXXX開頭的方法。

這裡有一個竅門,對於一個陌生的類來說,構造函數對於類本身是個很重要的特徵,我們可以通過構造函數分析出這個類需要哪些基礎數據。

所以,通過簡單的分析,我們需要繼續了解這兩個類:

public class ClaimsIdentity : IIdentity
{
    public ClaimsIdentity();
    public ClaimsIdentity(string authenticationType);
    public ClaimsIdentity(IIdentity identity);
    public ClaimsIdentity(IEnumerable<Claim> claims);
    public ClaimsIdentity(IEnumerable<Claim> claims, string authenticationType);
    public ClaimsIdentity(IIdentity identity, IEnumerable<Claim> claims);
    public ClaimsIdentity(string authenticationType, string nameType, string roleType);
    public ClaimsIdentity(IEnumerable<Claim> claims, string authenticationType, string nameType, string roleType);
    public ClaimsIdentity(IIdentity identity, IEnumerable<Claim> claims, string authenticationType, string nameType, string roleType);
    
}

public class Claim
{
    public Claim(BinaryReader reader);
    public Claim(BinaryReader reader, ClaimsIdentity subject);

    public Claim(string type, string value);
    public Claim(string type, string value, string valueType);
    public Claim(string type, string value, string valueType, string issuer);

    public Claim(string type, string value, string valueType, string issuer, string originalIssuer);
    public Claim(string type, string value, string valueType, string issuer, string originalIssuer, ClaimsIdentity subject);
    protected Claim(Claim other);
    protected Claim(Claim other, ClaimsIdentity subject);
    public string Type { get; }
    public ClaimsIdentity Subject { get; }
    public IDictionary<string, string> Properties { get; }
    public string OriginalIssuer { get; }
    public string Issuer { get; }
    public string ValueType { get; }
    public string Value { get; }
    protected virtual byte[] CustomSerializationData { get; }
    public virtual Claim Clone();
    public virtual Claim Clone(ClaimsIdentity identity);
    public override string ToString();
    public virtual void WriteTo(BinaryWriter writer);
    protected virtual void WriteTo(BinaryWriter writer, byte[] userData);
}

所以,看到這裏就會發現,我們可以通過以下方式保存信息:

List<Claim> claims = null;
var identity = new ClaimsIdentity(claims, CookieAuthenticationDefaults.AuthenticationScheme);

HttpContext.SignInAsync( CookieAuthenticationDefaults.AuthenticationScheme,new ClaimsPrincipal(identity));

這時候,數據就可以保存在Cookie里了,那麼如何在控制器中獲取到數據呢:

public ClaimsPrincipal User { get; }

在控制器中,提供了這樣一個屬性,當然如果想要正確獲取到值的話,需要在 Startup.cs類中的添加如下配置:

public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
{
    // ……省略其他配置
    app.UseAuthorization();
    app.UseAuthentication();
    // ……省略其他配置
}

3. 總結

在這一篇中,簡單介紹了asp.net core的identity,下一篇將從實際上帶領大家設置不一樣的identity以及Authorize驗證。

更多內容煩請關注我的博客《高先生小屋》

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※為什麼 USB CONNECTOR 是電子產業重要的元件?

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

※台北網頁設計公司全省服務真心推薦

※想知道最厲害的網頁設計公司"嚨底家"!

新北清潔公司,居家、辦公、裝潢細清專業服務

※推薦評價好的iphone維修中心

Mysql和Redis數據同步策略

3{icon} {views}

目錄

  • 為什麼對緩存只刪除不更新
  • 先更新數據庫還是先刪除緩存?
  • Cache Aside Pattern
  • Double-Delete
  • Read/Write Through Pattern
  • Write Behind
  • 設置緩存過期時間
  • 總結

為什麼對緩存只刪除不更新

不更新緩存是防止併發更新導致的數據不一致。
所以為了降低數據不一致的概率,不應該更新緩存,而是直接將其刪除,
然後等待下次發生cache miss時再把數據庫中的數據同步到緩存。

先更新數據庫還是先刪除緩存?

有兩個選擇:
1. 先刪除緩存,再更新數據庫
2. 先更新數據庫,再刪除緩存

如果先刪除緩存,有一個明顯的邏輯錯誤:考慮兩個併發操作,線程A刪除緩存后,線程B讀該數據時會發生Cache Miss,然後從數據庫中讀出該數據並同步到緩存中,此時線程A更新了數據庫。
結果導致,緩存中是老數據,數據庫中是新數據,並且之後的讀操作都會直接讀取緩存中的臟數據。(直到key過期被刪除或者被LRU策略踢出)
如果數據庫更新成功后,再刪除緩存,就不會有上面這個問題。
可能是由於數據庫優先,第二種方式也被稱為Cache Aside Pattern。

Cache Aside Pattern

cache aside在絕大多數情況下能做到數據一致性,但是在極端情況仍然存在問題。

  • 首先更新數據庫(A)和刪除緩存(B)不是原子操作,任何在A之後B之前的讀操作,都會讀到redis中的舊數據。
    正常情況下操作緩存的速度會很快,通常是毫秒級,臟數據存在的時間極端。
    但是,對超高併發的應用可能會在意這幾毫秒。
  • 更新完數據庫后,線程意外被kill掉(真的很不幸),由於沒有刪除緩存,緩存中的臟數據會一直存在。
  • 線程A讀數據時cache miss,從Mysql中查詢到數據,還沒來得及同步到redis中,
    此時線程B更新了數據庫並把Redis中的舊值刪除。隨後,線程A把之前查到的數據同步到了Redis。
    顯然,此時redis中的是臟數據。
    通常數據庫讀操作比寫操作快很多,所以除非線程A在同步redis前意外卡住了,否則發生上述情況的概率極低。

雖然以上情況都有可能發生,但是發生的概率相比“先刪除緩存再更新數據庫”會低很多。

Double-Delete

前面我們講到先刪除緩存(A)、后更新數據庫(B)的方案有明顯的錯誤,任何發生在A操作和B操作之間的併發讀都會造成數據的最終不一致。
Double-Delete是一種比較笨拙的修補方案,執行過程如下:

1.delete redis cache
2.update database
3.sleep(500ms)
3.delete redis cache

也很好理解,在睡眠500ms后嘗試再次刪除緩存中的臟數據,它通過兩次刪除來盡可能做到數據的最終一致。
其實,Double-Delete在數據一致性上比Cache Aside更靠譜,但是它的代價是昂貴的,
即使,把睡眠時間縮短到100ms,對耗時敏感的應用也不會考慮這種方案。

Read/Write Through Pattern

cache aside是我們自己的應用程序維護兩個數據存儲系統,而Read/Write Through Pattern是把同步數據的問題交給緩存系統了,應用程序不需要關心。
Read Through是指發生cache miss時,緩存系統自動去數據庫加載數據。
Write Through是指如果cache miss,直接更新數據庫,然後返回,如果cache hit,則更新緩存后,由緩存系統自動同步到數據庫。
以Redis為例,通常我們不會把數據庫的數據全部緩存到redis,而是採用一定的數據精簡或壓縮策略,以節省緩存空間。
就是說,讓緩存系統設計出通用的緩存方案不太現實,不過根據自己的業務定製一個在項目內部通用的中間件是可行的。

Write Behind

Write Behind方案在更新數據時,只更新緩存,不更新數據庫。而是由另外一個服務異步的把數據更新到數據庫。
邏輯上,和Linux中的write back很類似。這個設計的好處是,I/O操作很快,因為是純內存操作。
但是由於異步寫庫,可能要犧牲一些數據一致性,譬如突然宕機會丟失所有未寫入數據庫的內存數據。

阿里巴巴的Canal中間件是一種相反的設計,它先更新mysql,然後通過binlog把數據自動同步到redis。
這種方案會全量同步數據到redis,不適合只緩存熱點數據的應用。

設置緩存過期時間

無論採用哪種策略都應該設置緩存的過期時間。
過期時間設置太長,臟數據存在的時間就越長;
過期時間設置太短,大量的cache miss會讓查詢直接進入數據庫。
所以,需要根據業務場景考慮過期時間。

總結

以上沒有哪種方案是完美的,都無法做到強一致性。
我們總要在性能和數據準確性之間做出妥協。
Cache Aside Pattern適用於絕大多數的場景。

https://www.pixelstech.net/article/1562504974-Consistency-between-Redis-Cache-and-SQL-Database
https://coolshell.cn/articles/17416.html
為什麼不更新緩存,而是直接刪除

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理
【其他文章推薦】

USB CONNECTOR掌控什麼技術要點? 帶您認識其相關發展及效能

台北網頁設計公司這麼多該如何選擇?

※智慧手機時代的來臨,RWD網頁設計為架站首選

※評比南投搬家公司費用收費行情懶人包大公開

※幫你省時又省力,新北清潔一流服務好口碑

※回頭車貨運收費標準

最詳細教學–win10 + frp + rdpwrap + 阿里雲服務器 –實現win10 多用戶同時遠程登錄內網機

2{icon} {views}

概述:

  使用win10 專業版 + frp + RDPwrap + 阿里雲服務器 的組合實現win10 多用戶同時遠程登錄內網機。使用frp 做內網穿透,將內網機的指定端口暴露在外網,通過ip+port 來實現遠程登錄。再使用rdpwrap 來破解win10 不能同時多用戶登錄的問題。

 

 

設想一下場景

  我是一個建築工程師。經常出差,需要經常畫3D圖和展示建築圖紙,所以買了一台性能非常強的筆記本工作站。筆記本重量大概3.9kg,充電器0.5kg,一個本子,一個書包,全部加起來接近10斤的重量。每天背着10斤重的東西跑來跑去出差,想想都累!!!

  筆記本工作站不僅重,價格也很貴,非常不方便用於出差,那簡直是折磨。。。。。。

   被折磨幾個月後,他開始向他的一個朋友訴苦,這真的是太t*d痛苦了,我能不能背着一台輕薄本筆記本出差啊,可是性能又要很好才行,怎麼辦???

然後就有了這篇文章。

 

以↑純屬扯淡……

—————————————————我是完美分隔符—————————————————

一、先實現單用戶遠程登錄內網機

 1.為什麼要實現內網穿透

    繼續對話:

    你怎麼在互聯網裡面找到你家裡面的電腦,是不是要把你的電腦與互聯網對接上。

    是啊,對接上啦,我的電腦不是連着網線嗎……

    (我語文水平有問題……。)兩個條件:一是你的電腦與互聯網對接上,二是讓互聯網知道你家在哪裡,不,你的電腦在哪裡。

    明白,好的。那什麼是內網穿透啊?

    額,(心想:md還要給你解釋內網外網……還要幫你弄,還免費的,我還有一堆事要忙啊,si建築的)。

    enenen……我幫你弄好就行了,你看百度吧,給個鏈接你:https://baike.baidu.com/item/%E5%86%85%E7%BD%91%E7%A9%BF%E9%80%8F

    (這不是本文的重點)

 2.要準備什麼呢

電腦若干台……
雲服務器(比如阿里雲服務器)
frp 反向代理工具(免費簡單高效)

 資源下載路徑>>>

   window端frp下載:https://github.com/fatedier/frp/releases/download/v0.30.0/frp_0.30.0_windows_amd64.zip

   linux端frp 下載:https://github.com/fatedier/frp/releases/download/v0.30.0/frp_0.30.0_linux_amd64.tar.gz

   資源解壓后的模樣>>>

      windows 解壓后                                                               linux  解壓后

                               

  

先上草圖過過目: 

 

 3.配置過程

      單用戶遠程登錄內網機需要配置的東東:

  1. windows下的frp客戶端:frpc
  2. linux的frp服務端:frps;
  3. 阿里雲服務器端口開放;
  4. windows的遠程登錄配置;
  5. 啟動window中的frpc 客戶端的命令:客戶端連接服務;
  6. 開啟另外一台windows電腦,準備遠程連接測試

 

   1) win 配置frpc.ini 文件(是 frpc ,是客戶端,別配錯了)

[common]
server_addr = 11.11.11.11
server_port = 7000

# trace, debug, info, warn, error
log_level = trace

#遠程桌面
[ssh]
type = tcp
local_ip = 127.0.0.1
local_port = 3389
remote_port = 6000

 

  2)Linux 配置frps.ini 文件 (服務端)

[common]
bind_port = 7000
vhost_http_port=8080

  啟動linux中的frps 服務的命令:啟動服務

當前窗口啟動,關閉窗口失效: ./frps -c frps.ini
後台啟動,關閉窗口依然有效:nohup ./frps -c frps.ini &

  啟動成功的效果分別是>>>

   

   

          第二張圖片里的 [1] 2374 是什麼???   

           輸入命令:kill 2374   就知道了……

 

  3)打開阿里雲服務器端口

    進入:安全組配置

 

   進入:安全組列表>配置規則

 

    開放端口:6000和7000

 

   阿里雲實例:重啟

   

  4)配置windows 遠程登錄用戶

     右鍵“我的電腦”進入這個界面    選擇“遠程設置”

     

 

      允許遠程連接

      

   

      點“添加”進入, >>>  “高級”  >>> “立即查找”  >>> 選擇用戶,用於遠程登錄

     

 

    PS: 對於win10 家庭版的用戶,遠程設置的界面是這樣子的

    

     對於win家庭版的用戶,將破解多用戶遠程登錄提前先做: 跳轉

 

  5)啟動window中的frpc 客戶端的命令:客戶端連接服務

打開cmd窗口:
先cd 到:frpc.exe 執行程序的目錄
再執行: frpc.exe -c frpc.ini

  啟動成功的效果圖>>> 簡單的描述一下

       

 

   6)開啟另外一台windows電腦,準備遠程連接測試

     快速打開遠程連接窗口:win + R   >>>  輸入:mstsc      再確定

     

 

     最激動人心的時刻到了

      

      遠程登錄輸入用戶密碼

      

 

     有這個界面說明成功了       點進去

      

 

 

     到目前為止單用戶遠程登錄已完成!!!

      

比如一台高性能的電腦只能同時給一個人用,那太浪費了;

又比如另一個人要用你賬號登錄時還要問你:親,你在在用XXX電腦嗎;

再比如多個人同時用一個賬號遠程登錄時:哪個親在用,不用的人不出聲回復你,正在用的人可能沒聽到,你就尷尬吧/(ㄒoㄒ)/~~

最後比如有個人正在使用,你一聲不吭登錄了,咔嚓,m蛋那個gou兒子登錄不說…………………………

繼續學習,解決問題>>> 

二、多用戶同時遠程登錄內網機

       多用戶同時遠程登錄內網機需要做的那些事:

  1. 單用戶遠程登錄成功;
  2. 新建一個windows登錄用戶;
  3. 配置windows遠程登錄用戶;
  4. 編輯本地組策略:配置關於遠程登錄的東東;
  5. 解決windows多用戶同時登錄的問題;
  6. 測試遠程登錄。

   

1)單用戶遠程登錄成功

      前提:在你 單用戶遠程登錄成功后再做多用戶同時遠程登錄。

 

2)新建一個windows登錄用戶

    直接搜索:“用戶”,進入“創建標準用戶賬戶”

       

 

 

 

 輸入新用戶信息:ccccc  隨便輸入你喜歡的

 

 

   ps:當你點擊創建時,輸入框數據會被清空,其實已經創建好了,只是win10 沒有自動幫你關窗口,也許是方便創建多個用戶吧,個人覺得體驗感很差。

   雙擊打開:用戶  >>> 可以看到有啦

  

 

  

3)配置windows遠程登錄用戶

    將剛才新建的用戶添加到遠程登錄(上面已經講過了)

   

    添加成功:

   

 

4)編輯本地組策略

     win + R  >>>  輸入 ”gpedit.msc“ 

     

       打開本地組策略

      

 

      進入到遠程登錄配置

     

 

       配置連接數和同時遠程登錄的信息  

     

        PS: windows 雖然允許設置多用戶同時遠程登錄,但不允許你這麼遠程連接……

        配置完這個后的效果:先遠程登錄一個用戶,再遠程登錄另外一個用戶時會提示等待前一個用戶退出。

         可自行驗證,在這裏就不演示了。

        

 

 5)解決windows多用戶同時登錄的問題

     先下載:RDPWrap-v1.6   https://github.com/stascorp/rdpwrap/releases/tag/v1.6.2

   

 

      解壓后:

     

    1. 先再cmd 下 執行“install.bat” 安裝RDPWrap ;安裝成功后,在“C:\Program Files\RDP Wrapper” 目錄下有

     

 

    2. 再嘗試執行下update.bat 。更新不了配置信息,需要手動來配置

      在配置rdpwrap.ini 之前先看下電腦版本:win + r   ,接着輸入:ver (細心的小夥伴會看到,打開cmd其實就已經看到了版本:10.0.18362.53 )

     

 

     接着打開 C:\Program Files\RDP Wrapper\rdpwrap.ini ,在文本末尾添加面的配置信息(不同版本的配置不一樣)     配置之間有空行,最後的空行也不要漏了

[10.0.18362.53]
LocalOnlyPatch.x86=1
LocalOnlyOffset.x86=B7D06
LocalOnlyCode.x86=jmpshort
LocalOnlyPatch.x64=1
LocalOnlyOffset.x64=82FB5
LocalOnlyCode.x64=jmpshort
SingleUserPatch.x86=1
SingleUserOffset.x86=50535
SingleUserCode.x86=nop
SingleUserPatch.x64=1
SingleUserOffset.x64=DBFC
SingleUserCode.x64=Zero
DefPolicyPatch.x86=1
DefPolicyOffset.x86=50269
DefPolicyCode.x86=CDefPolicy_Query_eax_ecx
DefPolicyPatch.x64=1
DefPolicyOffset.x64=1FE15
DefPolicyCode.x64=CDefPolicy_Query_eax_rcx
SLInitHook.x86=1
SLInitOffset.x86=5A77A
SLInitFunc.x86=New_CSLQuery_Initialize
SLInitHook.x64=1
SLInitOffset.x64=22DDC
SLInitFunc.x64=New_CSLQuery_Initialize

[10.0.18362.53-SLInit]
bInitialized.x86      =D577C
bServerSku.x86        =D5780
lMaxUserSessions.x86  =D5784
bAppServerAllowed.x86 =D578C
bRemoteConnAllowed.x86=D5790
bMultimonAllowed.x86  =D5794
ulMaxDebugSessions.x86=D5798
bFUSEnabled.x86       =D579C

bInitialized.x64      =F6A8C
bServerSku.x64        =F6A90
lMaxUserSessions.x64  =F6A94
bAppServerAllowed.x64 =F6A9C
bRemoteConnAllowed.x64=F6AA0
bMultimonAllowed.x64  =F6AA4
ulMaxDebugSessions.x64=F6AA8
bFUSEnabled.x64       =F6AAC

 

    3. 管理員啟動RDPWrap.exe

    

   ps:不配置第2步,或者版本不對的效果

   

 

  4. 執行:RDPCheck.exe 檢測是否破解成功

   

  對於已經全部綠色了,但提示“訪問拒絕”的,應該是用了當前的用戶(相同用戶)登錄了。

 

    PS:啟動RDPConf.exe 前要 重啟 “遠程桌面服務”:Remote Desktop Services 如圖

    

 

6)測試遠程登錄

    快速打開遠程登錄窗口:win+r  >>> 輸入:mstsc   

    多用戶遠程同時登錄內網機測試效果:  

    

  最後附上不同版本的rdpwrap.ini的配置信息(先看裏面有沒有合適的版本,沒有就沒必要下載了):https://gitee.com/RDNGL/rdpwrap

 

後面純屬扯淡,可以不看。

三、拓展學習

  到目前為止,多用戶同時遠程登錄內網機已完成。

       擴展其應用:內網穿透后,外網可以訪問內網,內網的服務可以被互聯網訪問,也即是可以在內網發布web服務,ftp服務等等–內網穿透的應用。

    外網可以遠程連接內網機器,多個人可以同時使用一台電腦。

    拿着一台可以聯網的筆記本便可以擁有巨大的計算資源。

       延伸為雲計算:

    趨勢:現在很多東西都開始“雲”化,這也是未來的發展方向。未來的趨勢:不需要再買固定配置的電腦,升級不僅麻煩,而且還不方便攜帶。未來只需要購買連接器設備,再開個雲計算機服務,隨時隨地升級配置,便可擁有巨大的計算資源。

         “雲”趨勢帶來的影響:硬件配置將面臨企業集中式採購,而零售數量下降。好像扯遠了……

 

 

    到目前為止全部講完了,歡迎來評論區打唾沫戰。

 

學習會讓人視野開闊,站在頂端才能仰望未來。

轉載請指明出處:https://www.cnblogs.com/dennyLee2025/p/13168408.html

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

網頁設計公司推薦不同的風格,搶佔消費者視覺第一線

※想知道購買電動車哪裡補助最多?台中電動車補助資訊懶人包彙整

南投搬家公司費用,距離,噸數怎麼算?達人教你簡易估價知識!

※教你寫出一流的銷售文案?

※超省錢租車方案

【Spring】AOP的代理默認是Jdk還是Cglib?,【DP-動態代理】JDK&Cglib

2{icon} {views}

菜瓜:你覺得AOP是啥

水稻:我覺得吧,AOP是對OOP的補充。通常情況下,OOP代碼專註功能的實現,所謂面向切面編程,大多數時候是對某一類對象的方法或者功能進行增強或者抽象

菜瓜:我看你這個理解就挺抽象的

水稻:舉個栗子!我要在滿足開閉原則的基礎下對已有功能進行擴展

  • 我現在想對很多個功能增加日誌功能,但是代碼已經打好包了,不想改。又或者有時候方法調用很慢,想定位問題
  • low一點的方法就是每個方法調用之前記錄調用開始,之後記錄調用結束

菜瓜:你說的這個low一點的方法怎麼好像是在說我???

水稻:建議看一下動態代理設計模式【DP-動態代理】JDK&Cglib,我當然知道你不會看,所以我還準備了自定義註解的栗子

  • package com.hb.merchant.config.aop;
    
    import org.springframework.context.annotation.Configuration;
    import org.springframework.context.annotation.EnableAspectJAutoProxy;
    
    /**
     * @author QuCheng on 2020/6/23.
     */
    @Configuration
    @EnableAspectJAutoProxy
    public class AopConfig {
    }
    
    package com.hb.merchant.config.aop;
    
    import java.lang.annotation.ElementType;
    import java.lang.annotation.Retention;
    import java.lang.annotation.RetentionPolicy;
    import java.lang.annotation.Target;
    
    /**
     * @author QuCheng on 2020/6/23.
     */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    public @interface OperatorLog {
    }
    
    
    package com.hb.merchant.config.aop;
    
    import lombok.extern.slf4j.Slf4j;
    import org.aspectj.lang.ProceedingJoinPoint;
    import org.aspectj.lang.annotation.Around;
    import org.aspectj.lang.annotation.Aspect;
    import org.aspectj.lang.reflect.MethodSignature;
    import org.springframework.stereotype.Component;
    
    /**
     *
     * @author QuCheng on 2020/6/23.
     */
    @Aspect
    @Component
    @Slf4j
    public class OperatorAspect {
    
        @Around("@annotation(OperatorLog)")
        public Object around(ProceedingJoinPoint joinPoint) throws Throwable {
            //獲取要執行的方法
            MethodSignature methodSignature = (MethodSignature) joinPoint.getSignature();
            //記錄方法執行前日誌
            log.info("startLog: {} 開始了。。。" , methodSignature.getName());
            //獲取方法信息
            String[] argNames = methodSignature.getParameterNames();
            // 參數值:
            final Object[] argValues = joinPoint.getArgs();
            StringBuilder sb = new StringBuilder();
            for (int i = 0; i < argNames.length; i++) {
                String value = argValues[i] == null ? "null" : argValues[i].toString();
                sb.append(argNames[i]).append("=").append(value).append(",");
            }
            String paramStr = sb.length() > 0 ? sb.toString().substring(0, sb.length() - 1) + "]" : "";
            log.info("參數信息為:[{}", paramStr);
    
            //執行方法
            Object result;
            try {
                result = joinPoint.proceed();
            } catch (Exception e) {
                log.error("errorLog", e);
                return null;
            }
    
            //記錄方法執行後日志
            log.info("endLog: {} 結束了。。。" , methodSignature.getName());
            return result;
        }
    
    }
    
    
    
    package com.hb.merchant.controller.icbc.item.oc;
    
    import com.hb.merchant.config.aop.OperatorLog;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.util.Assert;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    /**
     * @author QuCheng on 2020-06-23.
     */
    @RestController
    @RequestMapping("/item")
    @Slf4j
    public class ItemOcController {
    
        @OperatorLog
        @GetMapping("/delete")
        public String delete(Long itemId) {
            Assert.notNull(itemId,"itemId不能為空");
            return "delete finished ...";
        }
    }

    // 後台打印
    startLog: delete 開始了。。。
    參數信息為:[itemId=1]
    endLog: delete 結束了。。。

菜瓜:這個自定義註解又是怎麼實現的呢?

水稻:不愧是你,沒有源碼看來是滿足不了你的好奇心了!!不知道你是否還記得我們之前有聊到過bean創建完畢後會調用一些PostProcessor對其進一步操作

菜瓜:有印象,@PostConstruct註解就是InitDestroyAnnotationBeanPostProcessor在這裏調用的,還自定義過BeanPostProcessorT對象打印輸出過bean信息

水稻:你猜Spring是怎麼操作的

菜瓜:let me try try。結合剛剛的栗子和提示,大膽猜測應該是用PostProcessor在bean創建完成之後生成代理對象。實際調用代理的invoke方法實現對被代理bean的增強

水稻:思路正確。看脈絡

  • 入口在AbstractAdvisorAutoProxyCreator#initializeBean
  • protected Object initializeBean(final String beanName, final Object bean, @Nullable RootBeanDefinition mbd) {
            。。。
                // BeanNameAware BeanFactoryAware ...
                invokeAwareMethods(beanName, bean);
        。。。    
                // BeanPostProcessorBefore  @PostConstruct
                wrappedBean = applyBeanPostProcessorsBeforeInitialization(wrappedBean, beanName);
        。。。
                // initMethod InitializingBean接口
                invokeInitMethods(beanName, wrappedBean, mbd);
                。。。
            if (mbd == null || !mbd.isSynthetic()) {
                // aop
                wrappedBean = applyBeanPostProcessorsAfterInitialization(wrappedBean, beanName);
            }
            return wrappedBean;
        }
  • 從aop入口跟下去
  • protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
            。。。
            // 收集切面信息匹配被代理對象
            Object[] specificInterceptors = getAdvicesAndAdvisorsForBean(bean.getClass(), beanName, null);
            if (specificInterceptors != DO_NOT_PROXY) {
                this.advisedBeans.put(cacheKey, Boolean.TRUE);
          // 如果符合切面 創建代理,被代理對象被代理引用
                Object proxy = createProxy(
                        bean.getClass(), beanName, specificInterceptors, new SingletonTargetSource(bean));
                this.proxyTypes.put(cacheKey, proxy.getClass());
                return proxy;
            }
    
            this.advisedBeans.put(cacheKey, Boolean.FALSE);
            return bean;
        }
  • 跟createProxy方法 -> DefaultAopProxyFactory#createAopProxy
  • @Override
    public AopProxy createAopProxy(AdvisedSupport config) throws AopConfigException {
       if (config.isOptimize() || config.isProxyTargetClass() || hasNoUserSuppliedProxyInterfaces(config)) {
          Class<?> targetClass = config.getTargetClass();
          if (targetClass == null) {
             throw new AopConfigException("TargetSource cannot determine target class: " +
                   "Either an interface or a target is required for proxy creation.");
          }
          if (targetClass.isInterface() || Proxy.isProxyClass(targetClass)) {
            // jdk動態代理類
             return new JdkDynamicAopProxy(config);
          }
          // cglib
          return new ObjenesisCglibAopProxy(config);
       }
       else {
          return new JdkDynamicAopProxy(config);
       }
    }
  • 此處省略了切面類搜集和匹配的過程。可以簡單理解成搜集到所有的切面類信息獲取pointcut的目錄或者註解信息,匹配當前bean是否屬於pointcut目標範圍
  • 另外我們可以看到最後返回的bean已經不是原始bean了,而是代理對象。也就是說getBean(“xxx”)返回的對象實際是代理對象,被代理對象被其成員變量直接引用

菜瓜:然後代理類中都有invoke方法,那些advice(@Around,@Before…)在invoke中找到適當時機調用對吧

水稻:是的,這裏我想結合@Transactional註解會更容易理解,你肯定用過這個註解吧,它其實。。。

菜瓜:停。。。今天獲取的知識量已經夠了,我下去自己斷點走一趟再熟悉熟悉。下次請結合Transactional註解再敲打我吧

水稻:也好,我下去再給你準備幾個栗子

 

總結:

  • AOP提供了在不侵入代碼的前提下動態增強目標對象的途徑,讓OOP更加專註於實現自己的邏輯
  • 而Spring的實現還是老套路,利用PostProcessor在類初始化完成之後替需要的bean創建代理對象
  • 這裏還有一些細節沒有照顧到,譬如說AOP解析類是什麼時候註冊到IOC容器的(偷偷告訴你從@EnableAspectJAutoProxy註解下手)

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

網頁設計公司推薦不同的風格,搶佔消費者視覺第一線

※Google地圖已可更新顯示潭子電動車充電站設置地點!!

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

※別再煩惱如何寫文案,掌握八大原則!

現代化編程模式(1)-快

5{icon} {views}

感謝大家購買我的書

首先十分感謝大家購買我的書,在Tommy和Julisa購買我的書的時候,我承諾會開Sharing給大家。我開Sharing的目標就是讓像Tommy和Julisa這樣的基礎比較低的人都能夠學會編程。所以我就以Tommy和Julisa為目標制定sharing課程,不再按照書籍章節順序一章一章的講。

現代化編程模式核心 – 快

我這本書最值得學習的地方不是.NET,所以這節sharing我故意不用.NET來演示,而是採用JavaScript來演示。

我這本書最值得學習的地方也不是併發。

我這本書最值得學習的地方是“現代化的編程模式”。也就是如下標黃處

 

 

 

現代化編程模式核心只有一個字,就是 – 快!

為了显示出快,我故意挑選了最慢的JavaScript來做這次sharing的演示語言!

現代化編程模式適用於所有現代化的編程語言,包括.NET/(C#和F#),Java,Python,JavaScript,Scala,Clojure,Swift。所以無論是我書中的C#和F#,還是我這次sharing用的JavaScript,都是通用的。最典型的就是本書第6章所講的Rx,它就有多個編程語言的版本:Java,JavaScript,.NET,Scala,Clojure,Swift。以下截圖取自Rx的官網 http://reactivex.io/

 

 

 

快主要來自兩個方面:

  • 程序運行速度快。
  • 編程速度快。

程序運行速度快

從大家的反饋來看,大家不太關注這點,所以我這次sharing先不講,以後再講。

編程速度快

所有技術的進步趨勢都是越來越容易,門檻越來越低,編程也同樣如此!

現在我回首十九年前(2001年)我用C語言寫出第一個程序的時候,當時的編程門檻是相當的高。當時的Turbo C 2.0還是跑在Dos上的,與今天相比真是天壤之別。編程門檻越來越低,編程工具環境越來越完善。這些基礎設施的完善決定了我們編程速度會越來越快。所以大家不需要再對編程帶有恐懼感,覺得編程難!現在編程的難度已經是你讀書時代的幾十分之一了!如果你掌握的是現代化的編程模式而不再是傳統古老的編程模式的話。

在這次sharing我只講大家最容易理解的幾點:

  • 調試診斷快
  • 不用動腦跟着感覺走導致行動快
  • 不用等待他人協作導致自身行動快

調試診斷快

在傳統的編程模式中,我們是這樣編程的:

  1. 啪啪啪敲一堆鍵盤之後,程序寫完了。
  2. 跑起來試一下,oh! No! 出錯了!(耗時幾分到十幾分鐘)
  3. 這時候就加斷點調試,先進一點的就不加斷點,而是看log來診斷。(至少耗時一分鐘起)

大家可以看到,第二步和第三步耗時都是以分鐘為單位的。如果unlucky,第三步可能要按小時甚至天為單位。為了加深大家的印象,sharing的時候我會視時間多少而現場演示一遍這個痛苦的傳統編程模式。

但是現代化的編程模式是以秒為單位的,比以分為單位的傳統編程模式快了一個數量級!!!

Sharing的時候我以JavaScript為編程語言,Visual Studio Code為編程工具,框架選karma + jasmine + AngularJS(我故意沒選最近的Angular而是選AngularJS,也是為了證明現代化編程模式是通用的)來演示現代化編程模式是如何以秒為單位進行調試診斷的。

現代化編程模式以秒為單位進行調試診斷的要點在於:

  1. 每Save一次就會把所有BDD Test cases在一兩秒內跑完!
  2. 基於上一點,也就是說,與傳統的編程模式相比,我馬上就能知道結果,而不是要等幾分鐘。
  3. 基於上一點,就可以每改一點代碼,就馬上Save,同時就馬上知道結果。如果此時發現BDD Test Case變紅了,馬上就知道剛才改的哪一處代碼有錯誤。馬上就能知道原因,馬上就能夠Fix!這裏的每改一點代碼可以理解成:每改一行代碼,每改一個字符,等等。
  4. 因為每次修改代碼的粒度是如此之小,小到每一行和每一個字符,可謂:一步一個腳印。並且每次改動都能保證程序是能BDD Test Case跑通過的,自然也就不需要加斷點調試和看log來診斷了。
  5. 然而人總是會犯錯的,程序出錯時,現代化編程模式不需要通過加斷點調試和看log診斷這種這麼古老的方式,而是通過不斷的加BDD Test Case和縮小Scope來定位問題發生的地方以及去解決。

這部分實操內容也是這次Sharing的大頭。

不用動腦跟着感覺走導致行動快

傳統的編程模式需要程序員去動腦,去思考程序代碼的流程,所以需要寫if/else/switch/for/foreach等語句。然而動腦是很容易累的,同時又是很耗時間的,所以傳統的編程模式自然是快不起來的!

“動腦是不可能動腦的啦,做生意又不會,只能跟着感覺走寫寫代碼才能生活這樣子”

竊-格瓦拉

現代化編程模式改變了思路,不再去思考程序代碼的流程:

 

傳統的編程模式

現代化編程模式

程序員所要做的

需要動腦思考如何做,需要明確的指出每一步該怎麼做。

只需要告訴計算機你的目標即可!

相關編程語句和庫

if/else/switch/for/foreach

LINQ,Reactive Extensions, Promises (不應該再出現if/else/switch/for/foreach)

專業術語

命令式編程

聲明式編程

 

 

 

補充多一句:聲明式編程語言通常用作解決人工智能和約束滿足問題。哈哈,看到這裏大家都懂的啦。

從上面的表格對比中可以看出,“只需要告訴計算機你的目標”的現代化編程模式明顯就比“還需要明確的指出每一步該怎麼做。”的傳統編程模式節省腦子很多!!!

大家可以看看 https://baike.baidu.com/item/%E5%A3%B0%E6%98%8E%E5%BC%8F%E7%BC%96%E7%A8%8B/9939512 來預習一下

 

 

 

不用等待他人協作導致自身行動快

自從進入移動互聯網時代之後,因為客戶端設備的多樣化,前後端分離已經成了大趨勢。同時也帶來了一個大問題:前後端程序員協作的問題!

接着前面“調試診斷快”一節中所share的內容,傳統的編程模式對前後端程序員聯調的依賴性比較高。而現代化編程模式因為可以通過mock的方式來模擬絕大部分後端的響應,前端程序員不再需要等待後端程序員的工作了,從而節約了這部分時間,導致自身在項目進度中更快更有保障。

在這次sharing中我所使用的Karma(BDD框架)和Jasmine(包含了httpbackend mock庫的unit testing框架)在其他編程語言中都有對應的框架和庫,再次體現了現代化編程模式是在各種編程語言中是通用的!!!

總結和動手口訣

在這次sharing之前,我請了兩位老夥計参觀了一下,都得到了好評,希望參加的有經驗的程序員也同樣會一聲驚嘆!

當然,非程序員還是不可能只通過這一次sharing就馬上學會編程的,所以對於你們來說,這次sharing的目的應該是:你能夠識別和分辨出哪些才是編程里正確的道路,從而找到先進的編程資料和避免誤讀落後的編程資料。關於這點,我總結了口訣如下:

  1. 快是王道!
    凡是要一分鐘以上才能看到編程運行結果的都是落後的編程模式,現代化模式是一兩秒就能看到運行結果的。凡是要通過debug和看log這麼耗時的行為,我們都要思考是否通過BDD來減少時間。
  2. 節省腦力是王道!
    現代化編程模式已經可以實現了只需要告訴計算機你的目標即可!如果還需要費腦子去想if/else/switch/for/foreach。那你就落後了。
  3. 能夠獨立完成任務,不依賴別人是王道!
    Mock+BDD已經可以讓你不需要過多等待和依賴其他程序員的工作了。

大家下周sharing見!

風險提示:

我的blog文章和我所翻譯和所寫的書籍不一樣:

    • 沒有像書籍一樣經過三審三校。所以不像書籍一樣嚴謹和全面。
    • 都有當時特定的閱讀對象,然而每篇blog的閱讀對象都不一樣,這點和書籍十分不一樣。所以如果你讀起來覺得怪怪的,那很有可能是你與該篇blog閱讀對象差異很大。
    • 所有文章全部不構成任何投資建議。如因採納這些文章而進行投資所造成的虧損,我不負任何責任。

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

網頁設計公司推薦不同的風格,搶佔消費者視覺第一線

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面

南投搬家公司費用需注意的眉眉角角,別等搬了再說!

新北清潔公司,居家、辦公、裝潢細清專業服務

※教你寫出一流的銷售文案?

【原創】Linux中斷子系統(四)-Workqueue

5{icon} {views}

背景

  • Read the fucking source code! –By 魯迅
  • A picture is worth a thousand words. –By 高爾基

說明:

  1. Kernel版本:4.14
  2. ARM64處理器,Contex-A53,雙核
  3. 使用工具:Source Insight 3.5, Visio

1. 概述

  • Workqueue工作隊列是利用內核線程來異步執行工作任務的通用機制;
  • Workqueue工作隊列可以用作中斷處理的Bottom-half機制,利用進程上下文來執行中斷處理中耗時的任務,因此它允許睡眠,而SoftirqTasklet在處理任務時不能睡眠;

來一張概述圖:

  • 在中斷處理過程中,或者其他子系統中,調用workqueue的調度或入隊接口后,通過建立好的鏈接關係圖逐級找到合適的worker,最終完成工作任務的執行;

2. 數據結構

2.1 總覽

此處應有圖:

  • 先看看關鍵的數據結構:
    1. work_struct:工作隊列調度的最小單位,work item
    2. workqueue_struct:工作隊列,work item都掛入到工作隊列中;
    3. workerwork item的處理者,每個worker對應一個內核線程;
    4. worker_poolworker池(內核線程池),是一個共享資源池,提供不同的worker來對work item進行處理;
    5. pool_workqueue:充當橋樑紐帶的作用,用於連接workqueueworker_pool,建立鏈接關係;

下邊看看細節吧:

2.2 work

struct work_struct用來描述work,初始化一個work並添加到工作隊列后,將會將其傳遞到合適的內核線程來進行處理,它是用於調度的最小單位。

關鍵字段描述如下:

struct work_struct {
	atomic_long_t data;     //低比特存放狀態位,高比特存放worker_pool的ID或者pool_workqueue的指針
	struct list_head entry; //用於添加到其他隊列上
	work_func_t func;       //工作任務的處理函數,在內核線程中回調
#ifdef CONFIG_LOCKDEP
	struct lockdep_map lockdep_map;
#endif
};

圖片說明下data字段:

2.3 workqueue

  • 內核中工作隊列分為兩種:

    1. bound:綁定處理器的工作隊列,每個worker創建的內核線程綁定到特定的CPU上運行;
    2. unbound:不綁定處理器的工作隊列,創建的時候需要指定WQ_UNBOUND標誌,內核線程可以在處理器間遷移;
  • 內核默認創建了一些工作隊列(用戶也可以創建):

    1. system_mq:如果work item執行時間較短,使用本隊列,調用schedule[_delayed]_work[_on]()接口就是添加到本隊列中;
    2. system_highpri_mq:高優先級工作隊列,以nice值-20來運行;
    3. system_long_wq:如果work item執行時間較長,使用本隊列;
    4. system_unbound_wq:該工作隊列的內核線程不綁定到特定的處理器上;
    5. system_freezable_wq:該工作隊列用於在Suspend時可凍結的work item
    6. system_power_efficient_wq:該工作隊列用於節能目的而選擇犧牲性能的work item
    7. system_freezable_power_efficient_wq:該工作隊列用於節能或Suspend時可凍結目的的work item

struct workqueue_struct關鍵字段介紹如下:

struct workqueue_struct {
	struct list_head	pwqs;		/* WR: all pwqs of this wq */   //所有的pool_workqueue都添加到本鏈表中
	struct list_head	list;		/* PR: list of all workqueues */    //用於將工作隊列添加到全局鏈表workqueues中

	struct list_head	maydays;	/* MD: pwqs requesting rescue */    //rescue狀態下的pool_workqueue添加到本鏈表中
	struct worker		*rescuer;	/* I: rescue worker */  //rescuer內核線程,用於處理內存緊張時創建工作線程失敗的情況

	struct pool_workqueue	*dfl_pwq;	/* PW: only for unbound wqs */

	char			name[WQ_NAME_LEN]; /* I: workqueue name */

	/* hot fields used during command issue, aligned to cacheline */
	unsigned int		flags ____cacheline_aligned; /* WQ: WQ_* flags */
	struct pool_workqueue __percpu *cpu_pwqs; /* I: per-cpu pwqs */     //Per-CPU都創建pool_workqueue
	struct pool_workqueue __rcu *numa_pwq_tbl[]; /* PWR: unbound pwqs indexed by node */    //Per-Node創建pool_workqueue
    ...
};

2.4 worker

  • 每個worker對應一個內核線程,用於對work item的處理;
  • worker根據工作狀態,可以添加到worker_pool的空閑鏈表或忙碌列表中;
  • worker處於空閑狀態時並接收到工作處理請求,將喚醒內核線程來處理;
  • 內核線程是在每個worker_pool中由一個初始的空閑工作線程創建的,並根據需要動態創建和銷毀;

關鍵字段描述如下:

struct worker {
	/* on idle list while idle, on busy hash table while busy */
	union {
		struct list_head	entry;	/* L: while idle */     //用於添加到worker_pool的空閑鏈表中
		struct hlist_node	hentry;	/* L: while busy */ //用於添加到worker_pool的忙碌列表中
	};

	struct work_struct	*current_work;	/* L: work being processed */   //當前正在處理的work
	work_func_t		current_func;	/* L: current_work's fn */                  //當前正在執行的work回調函數
	struct pool_workqueue	*current_pwq; /* L: current_work's pwq */   //指向當前work所屬的pool_workqueue

	struct list_head	scheduled;	/* L: scheduled works */    //所有被調度執行的work都將添加到該鏈表中

	/* 64 bytes boundary on 64bit, 32 on 32bit */

	struct task_struct	*task;		/* I: worker task */    //指向內核線程
	struct worker_pool	*pool;		/* I: the associated pool */    //該worker所屬的worker_pool
						/* L: for rescuers */
	struct list_head	node;		/* A: anchored at pool->workers */  //添加到worker_pool->workers鏈表中
						/* A: runs through worker->node */
    ...
};

2.5 worker_pool

  • worker_pool是一個資源池,管理多個worker,也就是管理多個內核線程;
  • 針對綁定類型的工作隊列,worker_pool是Per-CPU創建,每個CPU都有兩個worker_pool,對應不同的優先級,nice值分別為0和-20;
  • 針對非綁定類型的工作隊列,worker_pool創建後會添加到unbound_pool_hash哈希表中;
  • worker_pool管理一個空閑鏈表和一個忙碌列表,其中忙碌列表由哈希管理;

關鍵字段描述如下:

struct worker_pool {
	spinlock_t		lock;		/* the pool lock */
	int			cpu;		/* I: the associated cpu */     //綁定到CPU的workqueue,代表CPU ID
	int			node;		/* I: the associated node ID */ //非綁定類型的workqueue,代表內存Node ID
	int			id;		/* I: pool ID */
	unsigned int		flags;		/* X: flags */

	unsigned long		watchdog_ts;	/* L: watchdog timestamp */

	struct list_head	worklist;	/* L: list of pending works */  //pending狀態的work添加到本鏈表
	int			nr_workers;	/* L: total number of workers */    //worker的數量

	/* nr_idle includes the ones off idle_list for rebinding */
	int			nr_idle;	/* L: currently idle ones */

	struct list_head	idle_list;	/* X: list of idle workers */   //處於IDLE狀態的worker添加到本鏈表
	struct timer_list	idle_timer;	/* L: worker idle timeout */
	struct timer_list	mayday_timer;	/* L: SOS timer for workers */

	/* a workers is either on busy_hash or idle_list, or the manager */
	DECLARE_HASHTABLE(busy_hash, BUSY_WORKER_HASH_ORDER);   //工作狀態的worker添加到本哈希表中
						/* L: hash of busy workers */

	/* see manage_workers() for details on the two manager mutexes */
	struct worker		*manager;	/* L: purely informational */
	struct mutex		attach_mutex;	/* attach/detach exclusion */
	struct list_head	workers;	/* A: attached workers */   //worker_pool管理的worker添加到本鏈表中
	struct completion	*detach_completion; /* all workers detached */

	struct ida		worker_ida;	/* worker IDs for task name */

	struct workqueue_attrs	*attrs;		/* I: worker attributes */
	struct hlist_node	hash_node;	/* PL: unbound_pool_hash node */    //用於添加到unbound_pool_hash中
    ...
} ____cacheline_aligned_in_smp;

2.6 pool_workqueue

  • pool_workqueue充當紐帶的作用,用於將workqueueworker_pool關聯起來;

關鍵字段描述如下:

struct pool_workqueue {
	struct worker_pool	*pool;		/* I: the associated pool */    //指向worker_pool
	struct workqueue_struct *wq;		/* I: the owning workqueue */   //指向所屬的workqueue

	int			nr_active;	/* L: nr of active works */     //活躍的work數量
	int			max_active;	/* L: max active works */   //活躍的最大work數量
	struct list_head	delayed_works;	/* L: delayed works */      //延遲執行的work掛入本鏈表
	struct list_head	pwqs_node;	/* WR: node on wq->pwqs */      //用於添加到workqueue鏈表中
	struct list_head	mayday_node;	/* MD: node on wq->maydays */   //用於添加到workqueue鏈表中
    ...
} __aligned(1 << WORK_STRUCT_FLAG_BITS);

2.7 小結

再來張圖,首尾呼應一下:

3. 流程分析

3.1 workqueue子系統初始化

  • workqueue子系統的初始化分成兩步來完成的:workqueue_init_earlyworkqueue_init

3.1.1 workqueue_init_early

  • workqueue子系統早期初始化函數完成的主要工作包括:
    1. 創建pool_workqueue的SLAB緩存,用於動態分配struct pool_workqueue結構;
    2. 為每個CPU都分配兩個worker_pool,其中的nice值分別為0和HIGHPRI_NICE_LEVEL,並且為每個worker_poolworker_pool_idr中分配一個ID號;
    3. 為unbound工作隊列創建默認屬性,struct workqueue_attrs屬性,主要描述內核線程的nice值,以及cpumask值,分別針對優先級以及允許在哪些CPU上執行;
    4. 為系統默認創建幾個工作隊列,這幾個工作隊列的描述在上文的數據結構部分提及過,不再贅述;

從圖中可以看出創建工作隊列的接口為:alloc_workqueue,如下圖:

  • alloc_workqueue完成的主要工作包括:
    1. 首先當然是要分配一個struct workqueue_struct的數據結構,並且對該結構中的字段進行初始化操作;
    2. 前文提到過workqueue最終需要和worker_pool關聯起來,而這個紐帶就是pool_workqueuealloc_and_link_pwqs函數就是完成這個功能:1)如果工作隊列是綁定到CPU上的,則為每個CPU都分配pool_workqueue並且初始化,通過link_pwq將工作隊列與pool_workqueue建立連接;2)如果工作隊列不綁定到CPU上,則按內存節點(NUMA,參考之前內存管理的文章)來分配pool_workqueue,調用get_unbound_pool來實現,它會根據wq屬性先去查找,如果沒有找到相同的就創建一個新的pool_workqueue,並且添加到unbound_pool_hash哈希表中,最後也會調用link_pwq來建立連接;
    3. 創建工作隊列時,如果設置了WQ_MEM_RECLAIM標誌,則會新建rescuer worker,對應rescuer_thread內核線程。當內存緊張時,新創建worker可能會失敗,這時候由rescuer來處理這種情況;
    4. 最終將新建好的工作隊列添加到全局鏈表workqueues中;

3.1.2 workqueue_init

workqueue子系統第二階段的初始化:

  • 主要完成的工作是給之前創建好的worker_pool,添加一個初始的worker
  • create_worker函數中,創建的內核線程名字為kworker/XX:YY或者kworker/uXX:YY,其中XX表示worker_pool的編號,YY表示worker的編號,u表示unbound

workqueue子系統初始化完成后,基本就已經將數據結構的關聯建立好了,當有work來進行調度的時候,就可以進行處理了。

3.2 work調度

3.2.1 schedule_work

schedule_work接口為例進行分析:

  • schedule_work默認是將work添加到系統的system_work工作隊列中;

  • queue_work_on接口中的操作判斷要添加work的標誌位,如果已經置位了WORK_STRUCT_PENDING_BIT,表明已經添加到了隊列中等待執行了,否則,需要調用__queue_work來進行添加。注意了,這個操作是在關中斷的情況下進行的,因為工作隊列使用WORK_STRUCT_PENDING_BIT位來同步work的插入和刪除操作,設置了這個比特后,然後才能執行work,這個過程可能被中斷或搶佔打斷;

  • workqueue的標誌位設置了__WQ_DRAINING,表明工作隊列正在銷毀,所有的work都要處理完,此時不允許再將work添加到隊列中,有一種特殊情況:銷毀過程中,執行work時又觸發了新的work,也就是所謂的chained work

  • 判斷workqueue的類型,如果是bound類型,根據CPU來獲取pool_workqueue,如果是unbound類型,通過node號來獲取pool_workqueue

  • get_work_pool獲取上一次執行workworker_pool,如果本次執行的worker_pool與上次執行的worker_pool不一致,且通過find_worker_executing_work判斷work正在某個worker_pool中的worker中執行,考慮到緩存熱度,放到該worker執行是更合理的選擇,進而根據該worker獲取到pool_workqueue

  • 判斷pool_workqueue活躍的work數量,少於最大限值則將work加入到pool->worklist中,否則加入到pwq->delayed_works鏈表中,如果__need_more_worker判斷沒有worker在執行,則喚醒worker內核線程執行;

  • 總結:

    1. schedule_work完成的工作是將work添加到對應的鏈表中,而在添加的過程中,首先是需要確定pool_workqueue
    2. pool_workqueue對應一個worker_pool,因此確定了pool_workqueue也就確定了worker_pool,進而可以將work添加到工作鏈表中;
    3. pool_workqueue的確定分為三種情況:1)bound類型的工作隊列,直接根據CPU號獲取;2)unbound類型的工作隊列,根據node號獲取,針對unbound類型工作隊列,pool_workqueue的釋放是異步執行的,需要判斷refcnt的計數值,因此在獲取pool_workqueue時可能要多次retry;3)根據緩存熱度,優先選擇正在被執行的worker_pool

3.2.2 worker_thread

work添加到工作隊列后,最終的執行在worker_thread函數中:

  • 在創建worker時,創建內核線程,執行函數為worker_thread

  • worker_thread在開始執行時,設置標誌位PF_WQ_WORKER,調度器在進行調度處理時會對task進行判斷,針對workerqueue worker有特殊處理;

  • worker對應的內核線程,在沒有處理work的時候是睡眠狀態,當被喚醒的時候,跳轉到woke_up開始執行;

  • woke_up之後,如果此時worker是需要銷毀的,那就進行清理工作並返回。否則,離開IDLE狀態,並進入recheck模塊執行;

  • recheck部分,首先判斷是否需要更多的worker來處理,如果沒有任務處理,跳轉到sleep地方進行睡眠。有任務需要處理時,會判斷是否有空閑內核線程以及是否需要動態創建,再清除掉worker的標誌位,然後遍歷工作鏈表,對鏈表中的每個節點調用process_one_worker來處理;

  • sleep部分比較好理解,沒有任務處理時,worker進入空閑狀態,並將當前的內核線程設置成睡眠狀態,讓出CPU;

  • 總結:

    1. 管理worker_pool的內核線程池時,如果有PENDING狀態的work,並且發現沒有正在運行的工作線程(worker_pool->nr_running == 0),喚醒空閑狀態的內核線程,或者動態創建內核線程;
    2. 如果work已經在同一個worker_pool的其他worker中執行,不再對該work進行處理;

work的執行函數為process_one_worker

  • work可能在同一個CPU上不同的worker中運行,直接退出;
  • 調用worker->current_func(),完成最終work的回調函數執行;

3.3 worker動態管理

3.3.1 worker狀態機變換

  • worker_pool通過nr_running字段來在不同的狀態機之間進行切換;
  • worker_pool中有work需要處理時,需要至少保證有一個運行狀態的worker,當nr_running大於1時,將多餘的worker進入IDLE狀態,沒有work需要處理時,所有的worker都會進入IDLE狀態;
  • 執行work時,如果回調函數阻塞運行,那麼會讓worker進入睡眠狀態,此時調度器會進行判斷是否需要喚醒另一個worker
  • IDLE狀態的worker都存放在idle_list鏈表中,如果空閑時間超過了300秒,則會將其進行銷毀;
  1. Running->Suspend
  • worker進入睡眠狀態時,如果該worker_pool沒有其他的worker處於運行狀態,那麼是需要喚醒一個空閑的worker來維持併發處理的能力;
  1. Suspend->Running
  • 睡眠狀態可以通過wake_up_worker來進行喚醒處理,最終判斷如果該worker不在運行狀態,則增加worker_poolnr_running值;

3.3.2 worker的動態添加和刪除

  1. 動態刪除
  • worker_pool初始化時,註冊了timer的回調函數,用於定時對空閑鏈表上的worker進行處理,如果worker太多,且空閑時間太長,超過了5分鐘,那麼就直接進行銷毀處理了;
  1. 動態添加
  • 內核線程執行worker_thread函數時,如果沒有空閑的worker,會調用manage_workers接口來創建更多的worker來處理工作;

參考

Documentation/core-api/workqueue.rst
http://kernel.meizu.com/linux-workqueue.html

洗洗睡了,收工!

歡迎關注公眾號,不定期分享Linux內核機制文章

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

※想知道最厲害的網頁設計公司"嚨底家"!

※幫你省時又省力,新北清潔一流服務好口碑

※別再煩惱如何寫文案,掌握八大原則!

【String註解驅動開發】你了解@PostConstruct註解和@PreDestroy註解嗎?

5{icon} {views}

寫在前面

在之前的文章中,我們介紹了如何使用@Bean註解指定初始化和銷毀的方法,小夥伴們可以參見《【Spring註解驅動開發】如何使用@Bean註解指定初始化和銷毀的方法?看這一篇就夠了!!》,也介紹了使用InitializingBean和DisposableBean來處理bean的初始化和銷毀,小夥伴們可以參見《【Spring註解驅動開發】Spring中的InitializingBean和DisposableBean,你真的了解嗎?》。除此之外,在JDK中也提供了兩個註解能夠在bean加載到Spring容器之後執行和在bean銷毀之前執行,今天,我們就一起來看看這兩個註解的用法。

項目工程源碼已經提交到GitHub:https://github.com/sunshinelyz/spring-annotation

@PostConstruct註解

@PostConstruct註解好多人以為是Spring提供的。其實是Java自己的註解。我們來看下@PostConstruct註解的源碼,如下所示。

package javax.annotation;
import java.lang.annotation.*;
import static java.lang.annotation.ElementType.*;
import static java.lang.annotation.RetentionPolicy.*;
@Documented
@Retention (RUNTIME)
@Target(METHOD)
public @interface PostConstruct {
}

從源碼可以看出,@PostConstruct註解是Java中的註解,並不是Spring提供的註解。

@PostConstruct註解被用來修飾一個非靜態的void()方法。被@PostConstruct修飾的方法會在服務器加載Servlet的時候運行,並且只會被服務器執行一次。PostConstruct在構造函數之後執行,init()方法之前執行。

通常我們會是在Spring框架中使用到@PostConstruct註解,該註解的方法在整個Bean初始化中的執行順序:

Constructor(構造方法) -> @Autowired(依賴注入) -> @PostConstruct(註釋的方法)。

@PreDestroy註解

@PreDestroy註解同樣是Java提供的,看下源碼,如下所示。

package javax.annotation;
import java.lang.annotation.*;
import static java.lang.annotation.ElementType.*;
import static java.lang.annotation.RetentionPolicy.*;
@Documented
@Retention (RUNTIME)
@Target(METHOD)
public @interface PreDestroy {
}

被@PreDestroy修飾的方法會在服務器卸載Servlet的時候運行,並且只會被服務器調用一次,類似於Servlet的destroy()方法。被@PreDestroy修飾的方法會在destroy()方法之後運行,在Servlet被徹底卸載之前。執行順序如下所示。

調用destroy()方法->@PreDestroy->destroy()方法->bean銷毀。

總結:@PostConstruct,@PreDestroy是Java規範JSR-250引入的註解,定義了對象的創建和銷毀工作,同一期規範中還有註解@Resource,Spring也支持了這些註解。

案例程序

對@PostConstruct註解和@PreDestroy註解有了簡單的了解之後,接下來,我們就寫一個簡單的程序來加深對這兩個註解的理解。

我們創建一個Cat類,如下所示。

package io.mykit.spring.plugins.register.bean;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

/**
 * @author binghe
 * @version 1.0.0
 * @description 測試@PostConstruct註解和@PreDestroy註解
 */
public class Cat {

    public Cat(){
        System.out.println("Cat類的構造方法...");
    }

    public void init(){
        System.out.println("Cat的init()方法...");
    }

    @PostConstruct
    public void postConstruct(){
        System.out.println("Cat的postConstruct()方法...");
    }

    @PreDestroy
    public void preDestroy(){
        System.out.println("Cat的preDestroy()方法...");
    }

    public void destroy(){
        System.out.println("Cat的destroy()方法...");
    }
}

可以看到,在Cat類中,我們提供了構造方法,init()方法、destroy()方法,使用 @PostConstruct註解標註的postConstruct()方法和只用@PreDestroy註解標註的preDestroy()方法。接下來,我們在AnimalConfig類中使用@Bean註解將Cat類註冊到Spring容器中,如下所示。

@Bean(initMethod = "init", destroyMethod = "destroy")
public Cat cat(){
    return new Cat();
}

接下來,在BeanLifeCircleTest類中新建testBeanLifeCircle04()方法進行測試,如下所示。

@Test
public void testBeanLifeCircle04(){
    //創建IOC容器
    AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(AnimalConfig.class);
    //關閉IOC容器
    context.close();
}

運行BeanLifeCircleTest類中的testBeanLifeCircle04()方法,輸出的結果信息如下所示。

Cat類的構造方法...
Cat的postConstruct()方法...
Cat的init()方法...
Cat的preDestroy()方法...
Cat的destroy()方法...

從輸出的結果信息中,可以看出執行的順序是: 構造方法 -> @PostConstruct -> init()方法 -> @PreDestroy -> destroy()方法。

好了,咱們今天就聊到這兒吧!別忘了給個在看和轉發,讓更多的人看到,一起學習一起進步!!

項目工程源碼已經提交到GitHub:https://github.com/sunshinelyz/spring-annotation

寫在最後

如果覺得文章對你有點幫助,請微信搜索並關注「 冰河技術 」微信公眾號,跟冰河學習Spring註解驅動開發。公眾號回復“spring註解”關鍵字,領取Spring註解驅動開發核心知識圖,讓Spring註解驅動開發不再迷茫。

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

新北清潔公司,居家、辦公、裝潢細清專業服務

※別再煩惱如何寫文案,掌握八大原則!

※教你寫出一流的銷售文案?

※超省錢租車方案

Redis SDS 深入一點,看到更多!

1{icon} {views}

1、什麼是SDS?

Redis 自定的字符串存儲結構,關於redis,你需要了解的幾點!中我們對此有過簡要說明。

Redis 底層是用C語言編寫的,可是在字符存儲上,並未使用C原生的String類型,而是定義了自己的字符串結構 Simple Dynamic Stirng,簡稱SDS。

SDS基本結構如下:

struct sdshdr {
int len; // 記錄buf數組中已使用字節的數量,等於SDS所保存字符串的長度
int free; // 記錄buf數組中未使用字節的數量
char buf[];// 字節數組,用於保存字符串
};

例如,字符串“Redis”存儲示意圖為:

圖1

當前存儲字符串長度為5,未使用長度為0,字節數組存儲的字符為“Redis\0”。

這裏需要注意的是:內部數據數組存儲字符串形式符合C語言要求,以‘\0’結尾。且len字符串長度不包含結尾標識符‘\0’。

buf[]的這種遵循C語言形式的存儲,使得Redis可以直接使用C語言的相關字符串函數進行SDS對象的操作。

二、SDS的優勢

1、O(1)時間複雜度獲取字符串長度

SDS內部維護着一個字符串長度的len變量,可以直接讀取,時間複雜度為O(1)。

對於傳統的C字符串:字符+“\0”,想要獲取字符長度,則需要遍歷整個字符串,直到遇到結束字符,時間複雜度為O(n)。

2、緩衝區溢出規避

所謂緩衝區溢出即所需要的內存超出了實際的內存。因此對於C字符串來說,要特別注意內存分配,回收使用問題。

比如,向一個現有字符串內添加特定字符時,需要保證當前已經分配了這足夠的內存。 

圖2

與C不同的是,SDS的空間預分配策略可以避免緩衝區溢出發生,

當需要對SDS進行操作時,首先會檢查當前空間是否滿足需求,不足則擴展當前分配空間。內存檢查相對於C變成了內部預置操作。

3、減少內存重分配次數

上面我們講到過,C字符操作前都需要進行內存的分配操作,同時,操作完成后,也需要進行相應的內存回收操作。一次操作至少涉及一次內存分配操作。

大家都知道內存的重分配是一個比較複雜且需精細控制的過程,耗時耗資源。針對此弊端,Redis 在SDS內存配置策略上採用了空間預分配+惰性刪除相結合的策略。

a)空間預分配:

空間預分配用於優化SDS字符擴展操作。

所謂預分配,也即是說在一次擴展操作中,擴展的空間大小會大於實際需要的空間大小。
如下,圖1執行圖2操作后SDS變更為:

圖3 

預分配空間的大小基於以下規則計算:

SDS len<1M:分配len長度空間作為預分配空間;

SDS len>=1M:分配1M空間作為預分配空間;

這樣,在下次進行字符操作的時候,如果所需要的空間小於當前SDS free空間,則可以直接行操作,而不需要再執行內存擴展,重分配操作。

SDS的預分配機制使得一次擴展操作所需的內存重分配次數變為<=1。

b)惰性刪除機制

所謂惰性刪除,即調整刪除SDS中部分數據時,不會立刻執行內存重分配,而是會保留空出來內存,並更新內部free屬性。以備將來有字符擴展需求,可以直接使用。

當然,Redis也提供了主動釋放未使用內存的方法。

如下,刪除“ent”之後的SDS結構:

 圖4

SDS的內存分配機制,尤其對於以寫為主的應用場景,能夠提供更加優異的性能表現。

3、二進制安全

C字符串由於特殊的編碼要求只能保存文本數據。

SDS相關的功能方法會以二進制的形式來操作SDS存儲的數據,沒有任何中間操作,存儲最原始的數據,因此不會有字符層面的因素影響。

SDS可以保存任何源的二進制數據,字符、圖片、文件或者序列化的對象等等。

 

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

新北清潔公司,居家、辦公、裝潢細清專業服務

※別再煩惱如何寫文案,掌握八大原則!

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

※超省錢租車方案

※教你寫出一流的銷售文案?

正則匹配中的非貪婪匹配不是最短匹配

1{icon} {views}

最近在工作中遇到一個需求,就是找出html中所有錨文字包含 聯繫方式 的超鏈接。剛開始我寫了一個很簡單的正則來解決這個問題<a.*?聯繫方式.*?</a。但是在測試的時候卻發現這個正則表達式並不像我想象的那樣工作。

圖中給出了一個正則表達式匹配的例子,可以看出在這段文字中有兩個匹配,但是第一個匹配所包含的結果已經超出了實際需要的範圍,包含了太多的超鏈接標籤,而我需要的是最短的匹配也就是圖中橫線畫出的範圍,這是怎麼回事?

正則匹配的原理

這要從正則匹配的原理說起,簡單的來說正則匹配是一種貪心的算法。它總是先找到第一個匹配的位置,然後向後繼續匹配其他的表達式符號。對於本文給出的正則表達式,會現在html中找到一個<a標籤,然後之後是.*?,直到找到一個聯繫方式,在這個過程中如果找到了另一個<a,會被當作.*?匹配的部分,而忘記了要匹配表達式的開頭就是<a。也就是說,除非發生失配,正則表達式不會主動地回溯。儘管使用了來表達非貪婪匹配,也只能限制向後匹配時盡可能地短,而不能縮短已匹配部分的長度。也就是說,非貪婪匹配向後是最短匹配,但是向前不是最短匹配

對於這個任務,我後來使用了其他效率更高的方法實現了,但是有沒有可能使用正則表達式來完成這個任務呢?

零寬斷言

零寬斷言是一種零寬度的匹配,它匹配到的內容不會保存到匹配結果中去,最終匹配結果只是一個位置而已。
作用是給指定位置添加一個限定條件,用來規定此位置之前或者之後的字符必須滿足限定條件才能使正則中的字表達式匹配成功。

零寬斷言總共有四種

對於這個需求,實際上應該找到離聯繫方式最近的一個<a,也就是說,在<a聯繫方式之前不能再有其他的<a了。而最開始的正則匹配表達式<a.*?聯繫方式.*?</a中的.*?可以通過.來匹配任意一個字符,在這裏可以使用零寬度負先行斷言來限制.匹配的任何一個字符的右側不能夠再有<a。也就是.(?!<a),再將這個整體重複多次(.(?!<a))*?。這裏引入了一個額外的括號,為了不產生多餘的匹配,可以使用非捕獲組來去除不需要的匹配,最終可以將整個表達式寫成<a(:?.(?!<a))*?聯繫方式.*?</a

可以看到匹配的範圍已經縮小到最後一個出現的超鏈接。

總結

因為正則表達式實現原理的限制,儘管選擇非貪婪匹配,匹配到的結果也不一定是最短的匹配。
通常正則表達式總是表明了“要匹配什麼”,而通過零寬度負斷言,則可以表明“不匹配什麼”,這比字符集中使用^來取反更加強大。

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※教你寫出一流的銷售文案?

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

※回頭車貨運收費標準

※別再煩惱如何寫文案,掌握八大原則!

※超省錢租車方案