SpringCloud Alibaba (三):Sentinel 流量控制組件

SpringCloud Alibaba (三):Sentinel 流量控制組件

Sentinel 是什麼

隨着微服務的流行,服務和服務之間的穩定性變得越來越重要。Sentinel 是面向分佈式服務架構的流量控制組件,主要以流量為切入點,從限流、流量整形、熔斷降級、系統負載保護、熱點防護等多個維度來幫助開發者保障微服務的穩定性。

Sentinel 基本概念

資源

資源是 Sentinel 的關鍵概念。它可以是 Java 應用程序中的任何內容,例如,由應用程序提供的服務,或由應用程序調用的其它應用提供的服務,甚至可以是一段代碼。

只要通過 Sentinel API 定義的代碼,就是資源,能夠被 Sentinel 保護起來。大部分情況下,可以使用方法簽名,URL,甚至服務名稱作為資源名來標示資源。

規則

圍繞資源的實時狀態設定的規則,可以包括流量控制規則、熔斷降級規則以及系統保護規則。所有規則可以動態實時調整。

 

規則的種類

Sentinel 的所有規則都可以在內存態中動態地查詢及修改,修改之後立即生效。同時 Sentinel 也提供相關 API,供您來定製自己的規則策略。

Sentinel 支持以下幾種規則:流量控制規則熔斷降級規則系統保護規則來源訪問控制規則熱點參數規則

流量控制規則 (FlowRule)

流量規則的定義0

重要屬性:

Field 說明 默認值
resource 資源名,資源名是限流規則的作用對象  
count 限流閾值  
grade 限流閾值類型,QPS 或線程數模式 QPS 模式
limitApp 流控針對的調用來源 default,代表不區分調用來源
strategy 調用關係限流策略:直接、鏈路、關聯 根據資源本身(直接)
controlBehavior 流控效果(直接拒絕 / 排隊等待 / 慢啟動模式),不支持按調用關係限流 直接拒絕

同一個資源可以同時有多個限流規則。

通過代碼定義流量控制規則

理解上面規則的定義之後,我們可以通過調用 FlowRuleManager.loadRules() 方法來用硬編碼的方式定義流量控制規則,比如:

private static void initFlowQpsRule() {
    List<FlowRule> rules = new ArrayList<>();
    FlowRule rule1 = new FlowRule();
    rule1.setResource(resource);
    // Set max qps to 20
    rule1.setCount(20);
    rule1.setGrade(RuleConstant.FLOW_GRADE_QPS);
    rule1.setLimitApp("default");
    rules.add(rule1);
    FlowRuleManager.loadRules(rules);
}

更多詳細內容可以參考 流量控制

熔斷降級規則 (DegradeRule)

熔斷降級規則包含下面幾個重要的屬性:

Field 說明 默認值
resource 資源名,即限流規則的作用對象  
count 閾值  
grade 熔斷策略,支持秒級 RT/秒級異常比例/分鐘級異常數 秒級平均 RT
timeWindow 降級的時間,單位為 s  

同一個資源可以同時有多個降級規則。

理解上面規則的定義之後,我們可以通過調用 DegradeRuleManager.loadRules() 方法來用硬編碼的方式定義流量控制規則。

 private static void initDegradeRule() {
        List<DegradeRule> rules = new ArrayList<>();
        DegradeRule rule = new DegradeRule();
        rule.setResource(KEY);
        // set threshold rt, 10 ms
        rule.setCount(10);
        rule.setGrade(RuleConstant.DEGRADE_GRADE_RT);
        rule.setTimeWindow(10);
        rules.add(rule);
        DegradeRuleManager.loadRules(rules);
    }

更多詳情可以參考 熔斷降級

系統保護規則 (SystemRule)

規則包含下面幾個重要的屬性:

Field 說明 默認值
highestSystemLoad load1 閾值,參考值 -1 (不生效)
avgRt 所有入口流量的平均響應時間 -1 (不生效)
maxThread 入口流量的最大併發數 -1 (不生效)
qps 所有入口資源的 QPS -1 (不生效)
highestCpuUsage 當前系統的 CPU 使用率(0.0-1.0) -1 (不生效)

理解上面規則的定義之後,我們可以通過調用 SystemRuleManager.loadRules() 方法來用硬編碼的方式定義流量控制規則。

private void initSystemProtectionRule() {
  List<SystemRule> rules = new ArrayList<>();
  SystemRule rule = new SystemRule();
  rule.setHighestSystemLoad(10);
  rules.add(rule);
  SystemRuleManager.loadRules(rules);
}

更多詳情可以參考 系統自適應保護

訪問控制規則 (AuthorityRule)

很多時候,我們需要根據調用方來限制資源是否通過,這時候可以使用 Sentinel 的訪問控制(黑白名單)的功能。黑白名單根據資源的請求來源(origin)限制資源是否通過,若配置白名單則只有請求來源位於白名單內時才可通過;若配置黑名單則請求來源位於黑名單時不通過,其餘的請求通過。

授權規則,即黑白名單規則(AuthorityRule)非常簡單,主要有以下配置項:

  • resource:資源名,即限流規則的作用對象

  • limitApp:對應的黑名單/白名單,不同 origin 用 , 分隔,如 appA,appB

  • strategy:限制模式,AUTHORITY_WHITE 為白名單模式,AUTHORITY_BLACK 為黑名單模式,默認為白名單模式

更多詳情可以參考 來源訪問控制

熱點規則 (ParamFlowRule)

詳情可以參考 熱點參數限流

 

Sentinel控制台

概述

Sentinel 提供一個輕量級的開源控制台,它提供機器發現以及健康情況管理、監控(單機和集群),規則管理和推送的功能。另外,鑒權在生產環境中也必不可少。這裏,我們將會詳細講述如何通過簡單的步驟就可以使用這些功能。

接下來,我們將會逐一介紹如何整合 Sentinel 核心庫和 Dashboard,讓它發揮最大的作用。同時我們也在阿里雲上提供企業級的控制台:AHAS Sentinel 控制台,您只需要幾個簡單的步驟,就能最直觀地看到控制台如何實現這些功能。

Sentinel 控制台包含如下功能:

  • 查看機器列表以及健康情況:收集 Sentinel 客戶端發送的心跳包,用於判斷機器是否在線。

  • 監控 (單機和集群聚合):通過 Sentinel 客戶端暴露的監控 API,定期拉取並且聚合應用監控信息,最終可以實現秒級的實時監控。

  • 規則管理和推送:統一管理推送規則。

  • 鑒權:生產環境中鑒權非常重要。這裏每個開發者需要根據自己的實際情況進行定製。

更詳細內容,訪問 https://github.com/alibaba/Sentinel/wiki

啟動Sentinel控制台

1.在Sentinel的github上下載 sentinel-dashboard.jar

https://github.com/alibaba/Sentinel/releases

2.在sentinel-dashboard.jar所在文件夾運行cmd,在cmd里運行以下啟動命令啟動sentinel-dashboard

java -Dserver.port=8081 -Dcsp.sentinel.dashboard.server=localhost:8081 -Dproject.name=sentinel-dashboard -jar sentinel-dashboard-1.7.2.jar

3.訪問 localhost:8081,進入Sentinel控制台,默認用戶和密碼都是 sentinel

 

SpringCloud Alibaba 整合 Sentinel

Sentinel 可以簡單的分為 Sentinel 核心庫和 Dashboard。核心庫不依賴 Dashboard,但是結合 Dashboard 可以取得最好的效果。

我們說的資源,可以是任何東西,服務,服務里的方法,甚至是一段代碼。使用 Sentinel 來進行資源保護,主要分為幾個步驟:

  1. 定義資源

  2. 定義規則

  3. 檢驗規則是否生效

先把可能需要保護的資源定義好,之後再配置規則。也可以理解為,只要有了資源,我們就可以在任何時候靈活地定義各種流量控制規則。在編碼的時候,只需要考慮這個代碼是否需要保護,如果需要保護,就將之定義為一個資源。

定義資源

1. 添加依賴

注意:在整合spring-cloud-starter-alibaba-sentinelspring-cloud-starter-openfeign時,feign-core的版本要在10.1.0以上,即導入2.1.0.RELEASE或以上版本的openfeign,否則會報feign.RequestTemplate.path()Ljava/lang/String;異常,因為低版本openfeign的RequestTemplate類里沒有path方法

<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
    <version>2.1.0.RELEASE</version>
</dependency>
2.添加配置
server:
  port: 8080
​
spring:
  application:
    name: consumer-8080
  cloud:
    nacos:
      discovery:
        server-addr: 192.168.11.132:8848
    sentinel:
      transport:
        dashboard: localhost:8081 # 將服務註冊到sentinel控制台
​
feign:
  sentinel:
    enabled: true #開啟feign的sentinel支持
​
management:
  endpoints:
    web:
      exposure:
        include: "*"
3.在controller需要保護的方法添加 @SentinelResource 註解,把方法定義為資源
@RestController
public class FeignController {
​
    @Autowired
    private EchoService echoService;
    
    @SentinelResource(value = "echo", blockHandler = "echoBlockHandler", blockHandlerClass = EchoServiceBlockHandler.class)
    @GetMapping("/feign/echo/{string}")
    public String echo(@PathVariable("string")String string){
        return echoService.echo(string);
    }
​
}

@SentinelResource 用於定義資源,並提供可選的異常處理和 fallback 配置項。 @SentinelResource 註解包含以下屬性:

  • value:資源名稱,必需項(不能為空)

  • entryType:entry 類型,可選項(默認為 EntryType.OUT

  • blockHandler / blockHandlerClass: blockHandler 對應處理 BlockException 的函數名稱,可選項。blockHandler 函數訪問範圍需要是 public,返回類型需要與原方法相匹配,參數類型需要和原方法相匹配並且最後加一個額外的參數,類型為 BlockException。blockHandler 函數默認需要和原方法在同一個類中。若希望使用其他類的函數,則可以指定 blockHandlerClass 為對應的類的 Class 對象,注意對應的函數必需為 static 函數,否則無法解析。

  • fallback:fallback 函數名稱,可選項,用於在拋出異常的時候提供 fallback 處理邏輯。fallback 函數可以針對所有類型的異常(除了 exceptionsToIgnore裏面排除掉的異常類型)進行處理。fallback 函數簽名和位置要求:

    • 返回值類型必須與原函數返回值類型一致;

    • 方法參數列表需要和原函數一致,或者可以額外多一個 Throwable 類型的參數用於接收對應的異常。

    • fallback 函數默認需要和原方法在同一個類中。若希望使用其他類的函數,則可以指定 fallbackClass 為對應的類的 Class 對象,注意對應的函數必需為 static 函數,否則無法解析。

  • defaultFallback(since 1.6.0):默認的 fallback 函數名稱,可選項,通常用於通用的 fallback 邏輯(即可以用於很多服務或方法)。默認 fallback 函數可以針對所以類型的異常(除了 exceptionsToIgnore 裏面排除掉的異常類型)進行處理。若同時配置了 fallback 和 defaultFallback,則只有 fallback 會生效。defaultFallback 函數簽名要求:

    • 返回值類型必須與原函數返回值類型一致。

    • 方法參數列表需要為空,或者可以額外多一個 Throwable 類型的參數用於接收對應的異常。

    • defaultFallback 函數默認需要和原方法在同一個類中。若希望使用其他類的函數,則可以指定 fallbackClass 為對應的類的 Class 對象,注意對應的函數必需為 static 函數,否則無法解析。

  • exceptionsToIgnore(since 1.6.0):用於指定哪些異常被排除掉,不會計入異常統計中,也不會進入 fallback 邏輯中,而是會原樣拋出。

blockHandler 和 fallback區別

blockHandler 函數會在原方法被限流/降級/系統保護的時候調用,而 fallback 函數會針對所有類型的異常。

如果一個資源同時對 blockHandler 和 fallback 都進行了配置,除了流量控制規則觸發時拋出的 BlockException 會進入 blockHandler 處理邏輯,其他規則觸發時都會進入fallback處理邏輯

4.創建EchoServiceBlockHandler類,建立blockHandler 處理邏輯
public class EchoServiceBlockHandler {
​
    private final static Logger logger = LoggerFactory.getLogger(EchoServiceBlockHandler.class);
​
    public static String echoBlockHandler(String string, BlockException e){
        logger.error("error: "+e);
        return "方法請求降級中";
    }
​
}

在應用程序里定義規則

1.在啟動類中定義規則,並加入容器中
@SpringBootApplication
@EnableDiscoveryClient
@EnableFeignClients
public class Consumer8080 {
    public static void main(String[] args) {
        SpringApplication.run(Consumer8080.class, args);
    }
​
    @Bean
    public SentinelResourceAspect sentinelResourceAspect(){
        return new SentinelResourceAspect();
    }
​
    //流量控制規則
    @Bean
    public static void initFlowRule(){
        List<FlowRule> rules = new ArrayList<FlowRule>();
        FlowRule flowRule = new FlowRule();
        flowRule.setResource("echo"); //設置資源名,即流量控制規則的作用對象
        flowRule.setCount(2); //設置限流閾值,此為QPS,即每秒最高訪問量
        flowRule.setGrade(RuleConstant.FLOW_GRADE_QPS); //限流閾值類型,QPS或線程數
        rules.add(flowRule);
        FlowRuleManager.loadRules(rules);
    }
​
}
2.訪問測試

訪問 http://localhost:8080/feign/echo/hi ,快速刷新,觸發流量控制規則,調用降級邏輯

在Sentinel控制台定義規則

在Sentinel控制台上我們可以簡單、靈活地管理規則以及推送規則,但是要注意的是,在Sentinel控制台上定義的規則是不會被持久化的,僅在內存中生存,當你重啟應用時,在Sentinel控制台上定義的規則將會丟失(應用程序里定義的規則在sentinel控制台上刪除,應用重啟後會重新生效)

 

 

我的個人博客站

 

 

自動判斷 中文 中文(簡體) 中文(香港) 中文(繁體) 英語 日語 朝鮮語 德語 法語 俄語 泰語 南非語 阿拉伯語 阿塞拜疆語 比利時語 保加利亞語 加泰隆語 捷克語 威爾士語 丹麥語 第維埃語 希臘語 世界語 西班牙語 愛沙尼亞語 巴士克語 法斯語 芬蘭語 法羅語 加里西亞語 古吉拉特語 希伯來語 印地語 克羅地亞語 匈牙利語 亞美尼亞語 印度尼西亞語 冰島語 意大利語 格魯吉亞語 哈薩克語 卡納拉語 孔卡尼語 吉爾吉斯語 立陶宛語 拉脫維亞語 毛利語 馬其頓語 蒙古語 馬拉地語 馬來語 馬耳他語 挪威語(伯克梅爾) 荷蘭語 北梭托語 旁遮普語 波蘭語 葡萄牙語 克丘亞語 羅馬尼亞語 梵文 北薩摩斯語 斯洛伐克語 斯洛文尼亞語 阿爾巴尼亞語 瑞典語 斯瓦希里語 敘利亞語 泰米爾語 泰盧固語 塔加路語 茨瓦納語 土耳其語 宗加語 韃靼語 烏克蘭語 烏都語 烏茲別克語 越南語 班圖語 祖魯語 自動選擇 中文 中文(簡體) 中文(香港) 中文(繁體) 英語 日語 朝鮮語 德語 法語 俄語 泰語 南非語 阿拉伯語 阿塞拜疆語 比利時語 保加利亞語 加泰隆語 捷克語 威爾士語 丹麥語 第維埃語 希臘語 世界語 西班牙語 愛沙尼亞語 巴士克語 法斯語 芬蘭語 法羅語 加里西亞語 古吉拉特語 希伯來語 印地語 克羅地亞語 匈牙利語 亞美尼亞語 印度尼西亞語 冰島語 意大利語 格魯吉亞語 哈薩克語 卡納拉語 孔卡尼語 吉爾吉斯語 立陶宛語 拉脫維亞語 毛利語 馬其頓語 蒙古語 馬拉地語 馬來語 馬耳他語 挪威語(伯克梅爾) 荷蘭語 北梭托語 旁遮普語 波蘭語 葡萄牙語 克丘亞語 羅馬尼亞語 梵文 北薩摩斯語 斯洛伐克語 斯洛文尼亞語 阿爾巴尼亞語 瑞典語 斯瓦希里語 敘利亞語 泰米爾語 泰盧固語 塔加路語 茨瓦納語 土耳其語 宗加語 韃靼語 烏克蘭語 烏都語 烏茲別克語 越南語 班圖語 祖魯語 有道翻譯 百度翻譯 谷歌翻譯 谷歌翻譯(國內)

翻譯 朗讀 複製 正在查詢,請稍候…… 重試 朗讀 複製 複製 朗讀 複製 via 谷歌翻譯(國內) 譯

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※帶您來了解什麼是 USB CONNECTOR  ?

※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面

※如何讓商品強力曝光呢? 網頁設計公司幫您建置最吸引人的網站,提高曝光率!

※綠能、環保無空污,成為電動車最新代名詞,目前市場使用率逐漸普及化

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

※教你寫出一流的銷售文案?

【你來報報】「膠」備競賽 Plastic or Planet?

文:朱漢強(綠惜地球環境倡議總監)

本站聲明:網站內容來源環境資訊中心https://e-info.org.tw/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※帶您來了解什麼是 USB CONNECTOR  ?

※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面

※如何讓商品強力曝光呢? 網頁設計公司幫您建置最吸引人的網站,提高曝光率!

※綠能、環保無空污,成為電動車最新代名詞,目前市場使用率逐漸普及化

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

※教你寫出一流的銷售文案?

傳比亞迪計畫投資1億美元在巴西建電動車廠

繼在美國洛杉磯建立汽車製造廠後,有消息稱比亞迪將在巴西投資1億美元,成立年產量4,000輛電動巴士的汽車廠。比亞迪希望2015年落實巴西工廠的全部生產能力,該工廠生產的電動大巴將銷售到拉丁美洲市場。

如果這項計畫能付諸實施,將是比亞迪在海外的最大的投資案。2013年11月,比亞迪啟動在美國加州的電動大巴士生產製造。

據華爾街日報報導,比亞迪副總裁李珂近日透露,該公司目前正針對投資細節,與當地政府進行協商,同時,建廠土地的購買問題也在洽談中。

李珂還表示,比亞迪希望在正式宣布巴西建廠工程之前與關鍵客戶簽訂合約,比亞迪與南美地區數個潛在客戶,正持續進行有關的商業談判。

本站聲明:網站內容來源於EnergyTrend https://www.energytrend.com.tw/ev/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※帶您來了解什麼是 USB CONNECTOR  ?

※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面

※如何讓商品強力曝光呢? 網頁設計公司幫您建置最吸引人的網站,提高曝光率!

※綠能、環保無空污,成為電動車最新代名詞,目前市場使用率逐漸普及化

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

※教你寫出一流的銷售文案?

不只有Gogoro 共享機車遍及30個城市 遊歐新選擇

環境資訊中心記者 陳文姿報導

本站聲明:網站內容來源環境資訊中心https://e-info.org.tw/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※帶您來了解什麼是 USB CONNECTOR  ?

※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面

※如何讓商品強力曝光呢? 網頁設計公司幫您建置最吸引人的網站,提高曝光率!

※綠能、環保無空污,成為電動車最新代名詞,目前市場使用率逐漸普及化

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

※教你寫出一流的銷售文案?

豐田與本田向歐洲市場推出迷你電動車

據日經新聞昨(24)日報導,汽車大廠豐田與本田等已鎖定歐洲迷你電動車市場,競相推出新一代超迷你電動車。豐田將與法國一家國營電力公司策略聯盟,推出i-Road、COMS電動車,本田則準備推出迷你短程代步車MC-beta。日本車廠希望先成功打入歐洲市場後,接著再前進包括東南亞等其他地區。

報導指出,豐田汽車將於今年底於格勒諾布爾市推出一項實驗性的汽車共享計畫,豐田總共將提供約70輛的超迷你電動車,包括3輪2人座的i-Road與集團旗下子公司豐田車體株式會社生產的COMS電動車。這項為期3年的領航計畫可讓通勤族租用電動車,消費者可透過智慧手機查詢有無可租車輛與預約租車等便利服務。

與此同時,本田汽車也準備在歐洲推出迷你短程代步車MC-beta。本田已在日本多個城市完成此款微型電動車的實測,並計畫以摩托車的名義向歐洲提出上路申請。

日本法規禁止超迷你電動車行駛高速公路與收費道路,地方政府也設有部份不安全路段禁駛超迷你電動車的限制。此外,與傳統迷你汽車不一樣的是,由於審核權在地方政府,車廠也不能將超迷你電動車直接販售給消費者。

而歐洲法規規範就比較寬鬆,且超迷你電動車在歐洲可能被歸類為摩托車,在義大利與其他部份國家開類似的車子甚至不需駕駛執照。歐洲國家同時也推動相關的基礎建設。

相比之下,根據日本運輸省的資料,目前日本國內已上路的超迷你電動車僅約數千輛,但歐洲地區包括非電動車在內,已登記的超迷你代步車已逾3萬輛。

MC-beta

i-Road

本站聲明:網站內容來源於EnergyTrend https://www.energytrend.com.tw/ev/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※帶您來了解什麼是 USB CONNECTOR  ?

※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面

※如何讓商品強力曝光呢? 網頁設計公司幫您建置最吸引人的網站,提高曝光率!

※綠能、環保無空污,成為電動車最新代名詞,目前市場使用率逐漸普及化

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

※教你寫出一流的銷售文案?

落漆!聯合國要求1.5℃氣候行動計畫 大國繳白卷

環境資訊中心綜合外電;姜唯 編譯;彭瑞祥 審校

本站聲明:網站內容來源環境資訊中心https://e-info.org.tw/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※為什麼 USB CONNECTOR 是電子產業重要的元件?

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

※台北網頁設計公司全省服務真心推薦

※想知道最厲害的網頁設計公司"嚨底家"!

新北清潔公司,居家、辦公、裝潢細清專業服務

※推薦評價好的iphone維修中心

報告:核能發展太慢太貴 難以解決氣候問題

環境資訊中心綜合外電;姜唯 編譯;彭瑞祥 審校

本站聲明:網站內容來源環境資訊中心https://e-info.org.tw/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※帶您來了解什麼是 USB CONNECTOR  ?

※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面

※如何讓商品強力曝光呢? 網頁設計公司幫您建置最吸引人的網站,提高曝光率!

※綠能、環保無空污,成為電動車最新代名詞,目前市場使用率逐漸普及化

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

※教你寫出一流的銷售文案?

電動汽車租賃模式在中國興起

據悉,上海大眾交通目前正與特斯拉探討在租賃業務方面的合作。大眾交通正與特斯拉探討適應中國租賃客戶需求的租賃辦法,為與特斯拉合作積極做準備。

大眾交通汽車租賃客戶多為中高階客戶,對車型有較高要求,目前該公司正在進行客戶端調查,把客戶需求反饋至特斯拉廠家。另外,電動汽車租賃業務的關鍵在於充電樁問題,而特斯拉擬在北京、上海、廣州等大城市進行大規模投資建設,與中國政府共同建設電動車充電樁。

北汽與龐大集團的租賃業務

北汽集團目前也正與龐大集團合作新能源汽車的租賃業務,北汽集團和龐大集團合資成立的租賃公司將嘗試分時租賃等新能源汽車營銷服務新模式。

北汽集團銷售部人士表示,北汽的純電動汽車基本可以滿足市民短途的駕駛需求。同時,購買北汽新能源汽車的用戶可以憑一定數量的押金獲取龐大集團的租車券(例如3000元人民幣押金可以獲得價值1萬元的租車券),通過租賃燃油車滿足長途駕駛的需求。

易卡綠色的分時租賃方式

此前,隸屬於人民日報社的易卡綠色(北京)汽車租賃有限公司,也嘗試了採用分時租賃的方式推廣新能源汽車。目前,易卡綠色租賃的純電動汽車均為北汽E150EV。

易卡綠色方面表示,電動汽車採用分時共享租賃的方式,可以讓更多的客戶分攤租金,使單車獲得更好的收益。電動車分時租賃,既可以滿足個性化的城市短途交通出行,又可以提升車輛使用效率,減少資源佔用,有望使得電動汽車成為城市的公共交通工具。

本站聲明:網站內容來源於EnergyTrend https://www.energytrend.com.tw/ev/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※帶您來了解什麼是 USB CONNECTOR  ?

※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面

※如何讓商品強力曝光呢? 網頁設計公司幫您建置最吸引人的網站,提高曝光率!

※綠能、環保無空污,成為電動車最新代名詞,目前市場使用率逐漸普及化

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

※教你寫出一流的銷售文案?

[源碼解析] 從TimeoutException看Flink的心跳機制

[源碼解析] 從TimeoutException看Flink的心跳機制

目錄

  • [源碼解析] 從TimeoutException看Flink的心跳機制
    • 0x00 摘要
    • 0x01 緣由
    • 0x02 背景概念
      • 2.1 四大模塊
      • 2.2 Akka
      • 2.3 RPC機制
        • 2.3.1 RpcEndpoint:RPC的基類
        • RpcService:RPC服務提供者
        • RpcGateway:RPC調用的網關
      • 2.4 常見心跳機制
    • 0x03 Flink心跳機制
      • 3.1 代碼和機制
      • 3.2 靜態架構
        • 3.2.1 HeartbeatTarget :監控目標抽象
        • 3.2.2 HeartbeatMonitor : 管理heartbeat target的心跳狀態
        • 3.2.3 HeartbeatManager :心跳管理者
        • 3.2.4 HearbeatListener 處理心跳結果
      • 3.3 動態運行機制
        • 3.3.1 HearbeatManagerImpl : Receiver
        • 3.3.2 HeartbeatManagerSenderImpl : Sender
        • 3.3.3 HeartbeatMonitorImpl
        • 3.3.3 HeartbeatServices
    • 0x04 初始化
      • 4.1 心跳服務創建
    • 0x05 Flink中具體應用
      • 5.1 總述
        • 5.1.1 RM, JM, TM之間關係
        • 5.1.2 三者間心跳機制
      • 5.2 初始化過程
        • 5.2.1 TaskExecutor初始化
        • 5.2.2 JobMaster的初始化
        • 5.2.3 ResourceManager初始化
      • 5.3 註冊過程
        • 5.3.1 TM註冊到RM中
          • 5.3.1.1 TM的操作
          • 5.3.1.2 RM的操作
          • 5.3.1.3 返回到TM
        • 5.3.2 TM註冊到 JM
      • 5.4 心跳過程
        • 5.4.1 ResourceManager主動發起
          • 5.4.1.1 Sender遍歷所有監控的Monitor(Target)
          • 5.4.1.2 Target進行具體操作
          • 5.4.1.3 RPC調用
        • 5.4.2 RM通過RPC調用TM
        • 5.4.3 TM 通過RPC回到 RM
      • 5.5 超時處理
        • 5.5.1 TaskManager
        • 5.5.2 ResourceManager
    • 0x06 解決問題
    • 0x07 參考

0x00 摘要

本文從一個調試時候常見的異常 “TimeoutException: Heartbeat of TaskManager timed out”切入,為大家剖析Flink的心跳機制。文中代碼基於Flink 1.10。

0x01 緣由

大家如果經常調試Flink,當進入斷點看到了堆棧和變量內容之後,你容易陷入了沉思。當你發現了問題可能所在,高興的讓程序Resume的時候,你發現程序無法運行,有如下提示:

Caused by: java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id 93aa1740-cd2c-4032-b74a-5f256edb3217 timed out.

這實在是很鬱悶的事情。作為程序猿不能忍啊,既然異常提示中有 Heartbeat 字樣,於是我們就來一起看看Flink的心跳機制,看看有沒有可以修改的途徑。

0x02 背景概念

2.1 四大模塊

Flink有核心四大組件:Dispatcher,JobMaster,ResourceManager,TaskExecutor。

  • Dispatcher(Application Master)用於接收client提交的任務和啟動相應的JobManager。其提供REST接口來接收client的application提交,負責啟動JM和提交application,同時運行Web UI。
  • ResourceManager:主要用於資源的申請和分配。當TM有空閑的slot就會告訴JM,沒有足夠的slot也會啟動新的TM。kill掉長時間空閑的TM。
  • JobMaster :功能主要包括(舊版本中JobManager的功能在新版本中以JobMaster形式出現,可能本文中會混淆這兩個詞,請大家諒解):
    • 將JobGraph轉化為ExecutionGraph(physical dataflow graph,并行化)。
    • 向RM申請資源、schedule tasks、保存作業的元數據。
  • TaskManager:類似Spark的executor,會跑多個線程的task、數據緩存與交換。Flink 架構遵循 Master – Slave 架構設計原則,JobMaster 為 Master 節點,TaskManager 為Slave節點。

這四大組件彼此之間的通信需要依賴RPC實現

2.2 Akka

Flink底層RPC基於Akka實現。Akka是一個開發併發、容錯和可伸縮應用的框架。它是Actor Model的一個實現,和Erlang的併發模型很像。在Actor模型中,所有的實體被認為是獨立的actors。actors和其他actors通過發送異步消息通信。

Actor模型的強大來自於異步。它也可以顯式等待響應,這使得可以執行同步操作。但是強烈不建議同步消息,因為它們限制了系統的伸縮性。

2.3 RPC機制

RPC作用是:讓異步調用看起來像同步調用。

Flink基於Akka構建了其底層通信系統,引入了RPC調用,各節點通過GateWay方式回調,隱藏通信組件的細節,實現解耦。Flink整個通信框架的組件主要由RpcEndpoint、RpcService、RpcServer、AkkaInvocationHandler、AkkaRpcActor等構成。

RPC相關的主要接口如下:

  • RpcEndpoint
  • RpcService
  • RpcGateway

2.3.1 RpcEndpoint:RPC的基類

RpcEndpoint是Flink RPC終端的基類,所有提供遠程過程調用的分佈式組件必須擴展RpcEndpoint,其功能由RpcService支持。

RpcEndpoint的子類只有四類組件:Dispatcher,JobMaster,ResourceManager,TaskExecutor,即Flink中只有這四個組件有RPC的能力,換句話說只有這四個組件有RPC的這個需求。

每個RpcEndpoint對應了一個路徑(endpointId和actorSystem共同確定),每個路徑對應一個Actor,其實現了RpcGateway接口,

RpcService:RPC服務提供者

RpcServer是RpcEndpoint的成員變量,為RpcService提供RPC服務/連接遠程Server,其只有一個子類實現:AkkaRpcService(可見目前Flink的通信方式依然是Akka)。

RpcServer用於啟動和連接到RpcEndpoint, 連接到rpc服務器將返回一個RpcGateway,可用於調用遠程過程。

Flink四大組件Dispatcher,JobMaster,ResourceManager,TaskExecutor,都是RpcEndpoint的實現,所以構建四大組件時,同步需要初始化RpcServer。如JobManager的構造方式,第一個參數就是需要知道RpcService。

RpcGateway:RPC調用的網關

Flink的RPC協議通過RpcGateway來定義;由前面可知,若想與遠端Actor通信,則必須提供地址(ip和port),如在Flink-on-Yarn模式下,JobMaster會先啟動ActorSystem,此時TaskExecutor的Container還未分配,後面與TaskExecutor通信時,必須讓其提供對應地址。

Dispatcher,JobMaster,ResourceManager,TaskExecutor 這四大組件通過各種方式實現了Gateway。以JobMaster為例,JobMaster實現JobMasterGateway接口。各組件類的成員變量都有需要通信的其他組件的GateWay實現類,這樣可通過各自的Gateway實現RPC調用。

2.4 常見心跳機制

常見的心跳檢測有兩種:

  • socket 套接字SO_KEEPALIVE本身帶有的心跳機制,定期向對方發送心跳包,對方收到心跳包後會自動回復;
  • 應用自身實現心跳機制,同樣也是使用定期發送請求的方式;

Flink實現的是第二種方案。

0x03 Flink心跳機制

3.1 代碼和機制

Flink的心跳機制代碼在:

Flink-master/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat

四個接口:

HeartbeatListener.java          HeartbeatManager.java      HeartbeatTarget.java  HeartbeatMonitor.java

以及如下幾個類:

HeartbeatManagerImpl.java   HeartbeatManagerSenderImpl.java   HeartbeatMonitorImpl.java
HeartbeatServices.java    NoOpHeartbeatManager.java

Flink集群有多種業務流程,比如Resource Manager, Task Manager, Job Manager。每種業務流程都有自己的心跳機制。Flink的心跳機制只是提供接口和基本功能,具體業務功能由各業務流程自己實現

我們首先設定 心跳系統中有兩種節點:sender和receiver。心跳機制是sender和receivers彼此相互檢測。但是檢測動作是Sender主動發起,即Sender主動發送請求探測receiver是否存活,因為Sender已經發送過來了探測心跳請求,所以這樣receiver同時也知道Sender是存活的,然後Reciver給Sender回應一個心跳錶示自己也是活着的。

因為Flink的幾個名詞和我們常見概念有所差別,所以流程上需要大家仔細甄別,即:

  • Flink Sender 主動發送Request請求給Receiver,要求Receiver回應一個心跳;
  • Flink Receiver 收到Request之後,通過Receive函數回應一個心跳請求給Sender;

3.2 靜態架構

3.2.1 HeartbeatTarget :監控目標抽象

HeartbeatTarget是對監控目標的抽象。心跳機制在行為上而言有兩種動作:

  • 向某個節點發送請求。
  • 處理某個節點發來的請求。

HeartbeatTarget的函數就是這兩個動作:

  • receiveHeartbeat :向某個節點(Sender)發送心跳回應,其參數heartbeatOrigin 就是 Receiver。
  • requestHeartbeat :向某個節點(Receiver)要求其回應一個心跳,其參數requestOrigin 就是 Sender。requestHeartbeat這個函數是Sender的函數,其中Sender通過RPC直接調用到Receiver

這兩個函數的參數也很簡單:分別是請求的發送放和接收方,還有Payload載荷。對於一個確定節點而言,接收的和發送的載荷是同一類型的。

public interface HeartbeatTarget<I> {
	/**
	 * Sends a heartbeat response to the target.
	 * @param heartbeatOrigin Resource ID identifying the machine for which a heartbeat shall be reported.
	 */
  // heartbeatOrigin 就是 Receiver
	void receiveHeartbeat(ResourceID heartbeatOrigin, I heartbeatPayload);

	/**
	 * Requests a heartbeat from the target. 
	 * @param requestOrigin Resource ID identifying the machine issuing the heartbeat request.
	 */
  // requestOrigin 就是 Sender
	void requestHeartbeat(ResourceID requestOrigin, I heartbeatPayload);
}

3.2.2 HeartbeatMonitor : 管理heartbeat target的心跳狀態

對HeartbeatTarget的封裝,這樣Manager對Target的操作是通過對Monitor完成,後續會在其繼承類中詳細說明。

public interface HeartbeatMonitor<O> {
  // Gets heartbeat target.
	HeartbeatTarget<O> getHeartbeatTarget();
	// Gets heartbeat target id.
	ResourceID getHeartbeatTargetId();
	// Report heartbeat from the monitored target.
	void reportHeartbeat();
	//Cancel this monitor.
	void cancel();
  //Gets the last heartbeat.
	long getLastHeartbeat();
}

3.2.3 HeartbeatManager :心跳管理者

HeartbeatManager負責管理心跳機制,比如啟動/停止/報告一個HeartbeatTarget。此接口繼承HeartbeatTarget。

除了HeartbeatTarget的函數之外,這接口有4個函數:

  • monitorTarget,把和某資源對應的節點加入到心跳監控列表;
  • unmonitorTarget,從心跳監控列表刪除某資源對應的節點;
  • stop,停止心跳管理服務,釋放資源;
  • getLastHeartbeatFrom,獲取某節點的最後一次心跳數據。
public interface HeartbeatManager<I, O> extends HeartbeatTarget<I> {
	void monitorTarget(ResourceID resourceID, HeartbeatTarget<O> heartbeatTarget);
	void unmonitorTarget(ResourceID resourceID);
	void stop();
	long getLastHeartbeatFrom(ResourceID resourceId);
}

3.2.4 HearbeatListener 處理心跳結果

用戶業務邏輯需要繼承這個接口以處理心跳結果。其可以看做服務的輸出,實現了三個回調函數

  • notifyHeartbeatTimeout,處理節點心跳超時
  • reportPayload,處理節點發來的Payload載荷
  • retrievePayLoad。獲取對某節點發下一次心跳請求的Payload載荷
public interface HeartbeatListener<I, O> {
	void notifyHeartbeatTimeout(ResourceID resourceID);
	void reportPayload(ResourceID resourceID, I payload);
	O retrievePayload(ResourceID resourceID);
}

3.3 動態運行機制

之前提到Sender和Receiver,下面兩個類就對應上述概念。

  • HeartbeatManagerImpl :Receiver,存在於JobMaster與TaskExecutor中;
  • HeartbeatManagerSenderImpl :Sender,繼承 HeartbeatManagerImpl類,用於周期發送心跳要求,存在於JobMaster、ResourceManager中。

幾個關鍵問題:

  1. 如何判定心跳超時?

    心跳服務啟動后,Flink在Monitor中通過 ScheduledFuture 會啟動一個線程來處理心跳超時事件。在設定的心跳超時時間到達后才執行線程。

    如果在設定的心跳超時時間內接收到組件的心跳消息,會先將該線程取消而後重新開啟,重置心跳超時事件的觸發。

    如果在設定的心跳超時時間內沒有收到組件的心跳,則會通知組件:你超時了。

  2. 何時”調用雙方”發起心跳檢查?

    心跳檢查是雙向的,一方(Sender)會主動發起心跳請求,而另一方(Receiver)則是對心跳做出響應,兩者通過RPC相互調用,重置對方的 Monitor 超時線程。

    以JobMaster和TaskManager為例,JM在啟動時會開啟周期調度,向已經註冊到JM中的TM發起心跳檢查,通過RPC調用TM的requestHeartbeat方法,重置TM中對JM超時線程的調用,表示當前JM狀態正常。在TM的requestHeartbeat方法被調用后,通過RPC調用JM的receiveHeartbeat,重置 JM 中對TM超時線程的調用,表示TM狀態正常。

  3. 如何處理心跳超時?

    心跳服務依賴 HeartbeatListener,當在timeout時間範圍內未接收到心跳響應,則會觸發超時處理線程,該線程通過調用HeartbeatListener.notifyHeartbeatTimeout方法做後續重連操作或者直接斷開。

下面是一個概要(以RM & TM為例):

  • RM : 實現了ResourceManagerGateway (可以直接被RPC調用)

  • TM : 實現了TaskExecutorGateway (可以直接被RPC調用)

  • RM :有一個Sender HM : taskManagerHeartbeatManager,Sender HM 擁有用戶定義的 TaskManagerHeartbeatListener

  • TM :有一個Receiver HM :resourceManagerHeartbeatManager,Receiver HM 擁有用戶定義的ResourceManagerHeartbeatListener。

  • HeartbeatManager 有一個ConcurrentHashMap<ResourceID, HeartbeatMonitor > heartbeatTargets,這個Map是它監控的所有Target。

  • 對於RM的每一個需要監控的TM, 其生成一個HeartbeatTarget,進而被構造成一個HeartbeatMonitor,放置到ResourceManager.taskManagerHeartbeatManager中。

  • 每一個Target對應的Monitor中,有自己的異步任務ScheduledFuture,這個ScheduledFuture不停的被取消/重新生成。如果在某個期間內沒有被取消,則通知用戶定義的listener出現了timeout。

3.3.1 HearbeatManagerImpl : Receiver

HearbeatManagerImpl是receiver的具體實現。它由 心跳 被發起方(就是Receiver,例如TM) 創建,接收 發起方(就是Sender,例如 JM)的心跳發送請求。心跳超時 會觸發 heartbeatListener.notifyHeartbeatTimeout方法。

注意:被發起方監控線程(Monitor)的開啟是在接收到請求心跳(requestHeartbeat被調用后)以後才觸發的,屬於被動觸發。

HearbeatManagerImpl主要維護了

  • 一個心跳監控列表 map : <ResourceID, HeartbeatMonitor<O>> heartbeatTargets;。這是一個KV關聯。

    key代表要發送心跳組件(例如:TM)的ID,value則是為當前組件創建的觸發心跳超時的線程HeartbeatMonitor,兩者一一對應。

    當一個從所聯繫的machine發過來的心跳被收到時候,對應的monitor的狀態會被更新(重啟一個新ScheduledFuture)。當一個monitor發現了一個 heartbeat timed out,它會通知自己的HeartbeatListener。

  • 一個 ScheduledExecutor mainThreadExecutor 負責heartbeat timeout notifications。

  • heartbeatListener :處理心跳結果。

HearbeatManagerImpl 數據結構如下:

@ThreadSafe
public class HeartbeatManagerImpl<I, O> implements HeartbeatManager<I, O> {

	/** Heartbeat timeout interval in milli seconds. */
	private final long heartbeatTimeoutIntervalMs;

	/** Resource ID which is used to mark one own's heartbeat signals. */
	private final ResourceID ownResourceID;

	/** Heartbeat listener with which the heartbeat manager has been associated. */
	private final HeartbeatListener<I, O> heartbeatListener;

	/** Executor service used to run heartbeat timeout notifications. */
	private final ScheduledExecutor mainThreadExecutor;

	/** Map containing the heartbeat monitors associated with the respective resource ID. */
	private final ConcurrentHashMap<ResourceID, HeartbeatMonitor<O>> heartbeatTargets;

	/** Running state of the heartbeat manager. */
	protected volatile boolean stopped;  
}

HearbeatManagerImpl實現的主要函數有:

  • monitorTarget :把一個節點加入到心跳監控列表。
    • 傳入參數有:ResourceId和HearbeatTarget,monitorTarget根據這兩個參數,生成一個HeartbeatMonitor對象,然後把這個對象跟ResrouceId做kv關聯,存入到heartbeatTargets。 一個節點可能參与多個業務流程,因此一個節點參与多個心跳流程,一個節點上運行多個不同類型的HearbeatTarget。所以一個ResourceID可能會跟不同類型的HearbeatTarget對象關聯,分別加入到多個HeartbeatManager,進行不同類型的心跳監控。也因此這個函數入參是兩個參數。
  • requestHeartbeat :Sender通過RPC異步調用到Receiver的這個函數 以要求receiver向requestOrigin節點(就是Sender)發起一次心跳響應,載荷是heartbeatPayLoad。其內部流程如下:
    • 首先會調用reportHeartbeat函數,作用是 通過Monitor 記錄發起請求的這個時間點,然後創建一個ScheduleFuture。如果到期后,requestOrigin沒有作出響應,那麼就將requestOrigin節點對應的HeartbeatMonitor的state設置成TIMEOUT狀態,如果到期內requestOrigin響應了,ScheduleFuture會被取消,HeartbeatMonitor的state仍然是RUNNING。
    • 其次調用reportPayload函數,把requestOrigin節點的最新的heartbeatPayload通知給heartbeatListener。heartbeatListener是外部傳入的,它根據所有節點的心跳記錄做監聽管理。
    • 最後調用receiveHearbeat函數,響應一個心跳給Sender。

3.3.2 HeartbeatManagerSenderImpl : Sender

繼承HearbeatManagerImpl,由心跳管理的一方(例如JM)創建,實現了run函數(即它可以作為一個單獨線程運行),創建后立即開啟周期調度線程,每次遍歷自己管理的heartbeatTarget,觸發heartbeatTarget.requestHeartbeat,要求 Target 返回一個心跳響應。屬於主動觸發心跳請求。

public class HeartbeatManagerSenderImpl<I, O> extends HeartbeatManagerImpl<I, O> implements Runnable {
	public void run() {
		if (!stopped) {
			for (HeartbeatMonitor<O> heartbeatMonitor : getHeartbeatTargets().values()) {
				requestHeartbeat(heartbeatMonitor);
			}
      // 周期調度
			getMainThreadExecutor().schedule(this, heartbeatPeriod, TimeUnit.MILLISECONDS);
		}
	}

  //  主動發起心跳檢查
	private void requestHeartbeat(HeartbeatMonitor<O> heartbeatMonitor) {
		O payload = getHeartbeatListener().retrievePayload(heartbeatMonitor.getHeartbeatTargetId());
		final HeartbeatTarget<O> heartbeatTarget = heartbeatMonitor.getHeartbeatTarget();
    // 調用 Target 的 requestHeartbeat 函數
		heartbeatTarget.requestHeartbeat(getOwnResourceID(), payload);
	}  
}

3.3.3 HeartbeatMonitorImpl

Heartbeat monitor管理心跳目標,它啟動一個ScheduledExecutor。

  • 如果在timeout時間內沒有接收到心跳信號,則判定心跳超時,通知給HeartbeatListener。
  • 如果在timeout時間內接收到心跳信號,則重置當前ScheduledExecutor。
public class HeartbeatMonitorImpl<O> implements HeartbeatMonitor<O>, Runnable {

	/** Resource ID of the monitored heartbeat target. */
	private final ResourceID resourceID;   // 被監控的resource ID

	/** Associated heartbeat target. */
	private final HeartbeatTarget<O> heartbeatTarget; //心跳目標

	private final ScheduledExecutor scheduledExecutor;

	/** Listener which is notified about heartbeat timeouts. */
	private final HeartbeatListener<?, ?> heartbeatListener; // 心跳監聽器

	/** Maximum heartbeat timeout interval. */
	private final long heartbeatTimeoutIntervalMs;

	private volatile ScheduledFuture<?> futureTimeout;
  //  AtomicReference  使用 
	private final AtomicReference<State> state = new AtomicReference<>(State.RUNNING);
   //  最近一次接收到心跳的時間
  private volatile long lastHeartbeat;
  
	// 報告心跳
	public void reportHeartbeat() {
    // 保留最近一次接收心跳時間
		lastHeartbeat = System.currentTimeMillis();
    // 接收心跳后,重置timeout線程
		resetHeartbeatTimeout(heartbeatTimeoutIntervalMs);
	}

  // 心跳超時,觸發lister的notifyHeartbeatTimeout
	public void run() {
		// The heartbeat has timed out if we're in state running
		if (state.compareAndSet(State.RUNNING, State.TIMEOUT)) {
			heartbeatListener.notifyHeartbeatTimeout(resourceID);
		}
	}

  //  重置TIMEOUT
	void resetHeartbeatTimeout(long heartbeatTimeout) {
		if (state.get() == State.RUNNING) {
      //先取消線程,在重新開啟
			cancelTimeout();
      // 啟動超時線程
			futureTimeout = scheduledExecutor.schedule(this, heartbeatTimeout, TimeUnit.MILLISECONDS);

			// Double check for concurrent accesses (e.g. a firing of the scheduled future)
			if (state.get() != State.RUNNING) {
				cancelTimeout();
			}
		}
	}

3.3.3 HeartbeatServices

建立heartbeat receivers and heartbeat senders,主要是對外提供服務。這裏我們可以看到:

  • HeartbeatManagerImpl就是receivers。
  • HeartbeatManagerSenderImpl就是senders。
public class HeartbeatServices {
	// Creates a heartbeat manager which does not actively send heartbeats.
	public <I, O> HeartbeatManager<I, O> createHeartbeatManager(...) {
		return new HeartbeatManagerImpl<>(...);
	}
	// Creates a heartbeat manager which actively sends heartbeats to monitoring targets.
	public <I, O> HeartbeatManager<I, O> createHeartbeatManagerSender(...) {
		return new HeartbeatManagerSenderImpl<>(...);
	}
}

0x04 初始化

4.1 心跳服務創建

心跳管理服務在Cluster入口創建。因為我們是調試,所以在MiniCluster.start調用。

public void start() throws Exception {
				......
				heartbeatServices = HeartbeatServices.fromConfiguration(configuration);
				......
}

HeartbeatServices.fromConfiguration會從Configuration中獲取配置信息:

  • 心跳間隔 heartbeat.interval
  • 心跳超時時間 heartbeat.timeout

個就是我們解決最開始問題的思路:從配置信息入手,擴大心跳間隔

public HeartbeatServices(long heartbeatInterval, long heartbeatTimeout) {
		this.heartbeatInterval = heartbeatInterval;
		this.heartbeatTimeout = heartbeatTimeout;
}

public static HeartbeatServices fromConfiguration(Configuration configuration) {
		long heartbeatInterval = configuration.getLong(HeartbeatManagerOptions.HEARTBEAT_INTERVAL);
		long heartbeatTimeout = configuration.getLong(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT);

		return new HeartbeatServices(heartbeatInterval, heartbeatTimeout);
}

0x05 Flink中具體應用

5.1 總述

5.1.1 RM, JM, TM之間關係

系統中有幾個ResourceManager?整個 Flink 集群中只有一個 ResourceManager。

系統中有幾個JobManager?JobManager 負責管理作業的執行。默認情況下,每個 Flink 集群只有一個 JobManager 實例。JobManager 相當於整個集群的 Master 節點,負責整個集群的任務管理和資源管理。

系統中有幾個TaskManager?這個由具體啟動方式決定。比如Flink on Yarn,Session模式能夠指定拉起多少個TaskManager。 Per job模式中TaskManager數量是在提交作業時根據併發度動態計算,即Number of TM = Parallelism/numberOfTaskSlots。比如:有一個作業,Parallelism為10,numberOfTaskSlots為1,則TaskManager為10。

5.1.2 三者間心跳機制

Flink中ResourceManager、JobMaster、TaskExecutor三者之間存在相互檢測的心跳機制:

  • ResourceManager會主動發送請求探測JobMaster、TaskExecutor是否存活。
  • JobMaster也會主動發送請求探測TaskExecutor是否存活,以便進行任務重啟或者失敗處理。

我們之前講過,HeartbeatManagerSenderImpl屬於Sender,HeartbeatManagerImpl屬於Receiver。

  1. HeartbeatManagerImpl所處位置可以理解為client,存在於JobMaster與TaskExecutor中;
  2. HeartbeatManagerSenderImpl類,繼承 HeartbeatManagerImpl類,用於周期發送心跳請求,所處位置可以理解為server, 存在於JobMaster、ResourceManager中。

ResourceManager 級別最高,所以兩個HM都是Sender,監控taskManager和jobManager

public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
		extends FencedRpcEndpoint<ResourceManagerId>
		implements ResourceManagerGateway, LeaderContender {
	taskManagerHeartbeatManager = heartbeatServices.createHeartbeatManagerSender
	jobManagerHeartbeatManager = heartbeatServices.createHeartbeatManagerSender
}

JobMaster級別中等,一個Sender, 一個Receiver,受到ResourceManager的監控,監控taskManager。

public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMasterGateway, JobMasterService {
	taskManagerHeartbeatManager = heartbeatServices.createHeartbeatManagerSender
	resourceManagerHeartbeatManager = heartbeatServices.createHeartbeatManager
}

TaskExecutor級別最低,兩個Receiver,分別被JM和RM疾控。

public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
	this.jobManagerHeartbeatManager = return heartbeatServices.createHeartbeatManager
	this.resourceManagerHeartbeatManager = return heartbeatServices.createHeartbeatManager
}

以JobManager和TaskManager為例。JM在啟動時會開啟周期調度,向已經註冊到JM中的TM發起心跳檢查,通過RPC調用TM的requestHeartbeat方法,重置對JM超時線程的調用,表示當前JM狀態正常。在TM的requestHeartbeat方法被調用后,通過RPC調用JM的receiveHeartbeat,重置對TM超時線程的調用,表示TM狀態正常。

5.2 初始化過程

5.2.1 TaskExecutor初始化

TM初始化生成了兩個Receiver HM。

public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
	/** The heartbeat manager for job manager in the task manager. */
	private final HeartbeatManager<AllocatedSlotReport, AccumulatorReport> jobManagerHeartbeatManager;

	/** The heartbeat manager for resource manager in the task manager. */
	private final HeartbeatManager<Void, TaskExecutorHeartbeatPayload> resourceManagerHeartbeatManager;
  
  //初始化函數
	this.jobManagerHeartbeatManager = createJobManagerHeartbeatManager(heartbeatServices, resourceId);
	this.resourceManagerHeartbeatManager = createResourceManagerHeartbeatManager(heartbeatServices, resourceId);
}

生成HeartbeatManager時,就註冊了ResourceManagerHeartbeatListener和JobManagerHeartbeatListener。

此時,兩個HeartbeatManagerImpl中已經創建好對應monitor線程,只有在JM或者RM執行requestHeartbeat后,才會觸發該線程的執行。

5.2.2 JobMaster的初始化

JM生成了一個Sender HM,一個Receiver HM。這裡會註冊 TaskManagerHeartbeatListener 和 ResourceManagerHeartbeatListener

public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMasterGateway, JobMasterService {
  private HeartbeatManager<AccumulatorReport, AllocatedSlotReport> taskManagerHeartbeatManager;
  private HeartbeatManager<Void, Void> resourceManagerHeartbeatManager;

  private void startHeartbeatServices() {
      taskManagerHeartbeatManager = heartbeatServices.createHeartbeatManagerSender(
        resourceId,
        new TaskManagerHeartbeatListener(),
        getMainThreadExecutor(),
        log);

      resourceManagerHeartbeatManager = heartbeatServices.createHeartbeatManager(
        resourceId,
        new ResourceManagerHeartbeatListener(),
        getMainThreadExecutor(),
        log);
  }
}

5.2.3 ResourceManager初始化

JobMaster在啟動時候,會在startHeartbeatServices函數中生成兩個Sender HeartbeatManager。

taskManagerHeartbeatManager :HeartbeatManagerSenderImpl對象,會反覆啟動一個定時器,定時掃描需要探測的對象並且發送心跳請求。

jobManagerHeartbeatManager :HeartbeatManagerSenderImpl,會反覆啟動一個定時器,定時掃描需要探測的對象並且發送心跳請求。

taskManagerHeartbeatManager = heartbeatServices.createHeartbeatManagerSender(
			resourceId,
			new TaskManagerHeartbeatListener(),
			getMainThreadExecutor(),
			log);

jobManagerHeartbeatManager = heartbeatServices.createHeartbeatManagerSender(
			resourceId,
			new JobManagerHeartbeatListener(),
			getMainThreadExecutor(),
			log);  

5.3 註冊過程

我們以TM與RM交互為例。TaskExecutor啟動之後,需要註冊到RM和JM中。

流程圖如下:

 * 1. Run in Task Manager
 *
 *    TaskExecutor.onStart //Life cycle
 *        |
 *        +----> startTaskExecutorServices@TaskExecutor
 *        |     //開始TM服務
 *        |     
 *        +----> resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
 *        |     // 開始連接到RM
 *        |     // start by connecting to the ResourceManager
 *        |    
 *        +----> notifyLeaderAddress@ResourceManagerLeaderListener  
 *        |     // 當RM狀態變化之後,將回調到這裏    
 *        |     // The listener for leader changes of the resource manager.
 *        |        
 *        +----> reconnectToResourceManager@TaskExecutor   
 *        |     // 以下三步調用是漸進的,就是與RM聯繫。
 *        |    
 *        +----> tryConnectToResourceManager@TaskExecutor 
 *        |     
 *        +----> connectToResourceManager()@TaskExecutor
 *        |     // 主要作用是生成了 TaskExecutorToResourceManagerConnection    
 *        |  
 *        +----> start@TaskExecutorToResourceManagerConnection
 *        |     // 開始RPC調用,將會調用到其基類RegisteredRpcConnection的start   
 *        |   
 *        +----> start@RegisteredRpcConnection
 *        |     // RegisteredRpcConnection實現了組件之間註冊聯繫的基本RPC
 *        |      
   
   
 * ~~~~~~~~ 這裡是 Akka RPC
   
 * 2. Run in Resource Manager   
 * 現在程序執行序列到達了RM, 主要是添加一個Target到RM 的 Sender HM;
 *
 *    registerTaskExecutor@ResourceManager
 *        |
 *        +----> taskExecutorGatewayFuture.handleAsync
 *        |     // 異步調用到這裏
 *        |     
 *        +----> registerTaskExecutorInternal@ResourceManager
 *        |     // RM的內部實現,將把TM註冊到RM自己這裏
 *        |      
 *        +----> taskManagerHeartbeatManager.monitorTarget
 *        |     // 生成HeartbeatMonitor,
 *        |      
 *        +---->  heartbeatTargets.put(resourceID,heartbeatMonitor);
 *        |     // 把Monitor放到 HM in TM之中,就是說TM開始監控了RM
 *        |        

 * ~~~~~~~~ 這裡是 Akka RPC
  
 * 3. Run in Task Manager
 * 現在程序回到了TM, 主要是添加一個Target到 TM 的 Receiver HM;
 *
 *    onRegistrationSuccess@TaskExecutorToResourceManagerConnection
 *        |   
 *        |   
 *        +---->  onRegistrationSuccess@ResourceManagerRegistrationListener 
 *        |       // 回調函數
 *        |  
 *        +---->  runAsync(establishResourceManagerConnection) 
 *        |       // 異步執行
 *        |  
 *        +---->  establishResourceManagerConnection@TaskExecutor 
 *        |       // 說明已經和RM建立了聯繫,所以可以開始監控RM了
 *        |  
 *        +---->  resourceManagerHeartbeatManager.monitorTarget 
 *        |     // 生成HeartbeatMonitor,
 *        |      
 *        +---->  heartbeatTargets.put(resourceID,heartbeatMonitor);  
 *        |       // 把 RM 也註冊到 TM了
 *        |       // monitor the resource manager as heartbeat target 

下面是具體文字描述。

5.3.1 TM註冊到RM中

5.3.1.1 TM的操作
  • TaskExecutor啟動之後,調用onStart,開始其生命周期。
  • onStart直接調用startTaskExecutorServices。
  • 啟動服務的第一步就是與ResourceManager取得聯繫,這裏註冊了一個ResourceManagerLeaderListener(),用來監聽RM Leader的變化。
private final LeaderRetrievalService resourceManagerLeaderRetriever;
// resourceManagerLeaderRetriever其實是EmbeddedLeaderService的實現,A simple leader election service, which selects a leader among contenders and notifies listeners.

resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
  • 當得到RM Leader的地址之後,會調用到回調函數notifyLeaderAddress@ResourceManagerLeaderListener,然後調用notifyOfNewResourceManagerLeader。
  • notifyOfNewResourceManagerLeader中獲取到RM地址后,就通過reconnectToResourceManager與RM聯繫。
  • reconnectToResourceManager中間接調用到TaskExecutorToResourceManagerConnection。其作用是建立TaskExecutor 和 ResourceManager之間的聯繫。因為知道 ResourceManagerGateway所以才能進行RPC操作。
  • 然後在 TaskExecutorToResourceManagerConnection中,就通過RPC與RM聯繫。
5.3.1.2 RM的操作
  • RPC調用后,程序就來到了RM中,RM做如下操作:
  • 會註冊一個新的TaskExecutor到自己的taskManagerHeartbeatManager中。
  • registerTaskExecutor@ResourceManager會通過異步調用到registerTaskExecutorInternal。
  • registerTaskExecutorInternal中首先看看是否這個TaskExecutor的ResourceID之前註冊過,如果註冊過就移除再添加一個新的TaskExecutor。
  • 通過 taskManagerHeartbeatManager.monitorTarget 開始進行心跳機制的註冊。
taskManagerHeartbeatManager.monitorTarget(taskExecutorResourceId, new HeartbeatTarget<Void>() {
				public void receiveHeartbeat(ResourceID resourceID, Void payload) {
					// the ResourceManager will always send heartbeat requests to the
					// TaskManager
				}
				public void requestHeartbeat(ResourceID resourceID, Void payload) {
					taskExecutorGateway.heartbeatFromResourceManager(resourceID);
				}
});

當註冊完成后,RM中的Sender HM內部結構如下,能看出來多了一個Target:

taskManagerHeartbeatManager = {HeartbeatManagerSenderImpl@8866} 
 heartbeatPeriod = 10000
 heartbeatTimeoutIntervalMs = 50000
 ownResourceID = {ResourceID@8871} "040709f36ebf38f309fed518a88946af"
 heartbeatListener = {ResourceManager$TaskManagerHeartbeatListener@8872} 
 mainThreadExecutor = {RpcEndpoint$MainThreadExecutor@8873} 
 heartbeatTargets = {ConcurrentHashMap@8875}  size = 1
  {ResourceID@8867} "630c15c9-4861-4b41-9c95-92504f458b71" -> {HeartbeatMonitorImpl@9448} 
   key = {ResourceID@8867} "630c15c9-4861-4b41-9c95-92504f458b71"
   value = {HeartbeatMonitorImpl@9448} 
    resourceID = {ResourceID@8867} "630c15c9-4861-4b41-9c95-92504f458b71"
    heartbeatTarget = {ResourceManager$2@8868} 
    scheduledExecutor = {RpcEndpoint$MainThreadExecutor@8873} 
    heartbeatListener = {ResourceManager$TaskManagerHeartbeatListener@8872} 
    heartbeatTimeoutIntervalMs = 50000
    futureTimeout = {ScheduledFutureAdapter@10140} 
    state = {AtomicReference@9786} "RUNNING"
    lastHeartbeat = 0
5.3.1.3 返回到TM

RM會通過RPC再次回到TaskExecutor,其新執行序列如下:

  • 首先RPC調用到了 onRegistrationSuccess@TaskExecutorToResourceManagerConnection。
  • 然後onRegistrationSuccess@ResourceManagerRegistrationListener中通過異步執行調用到了establishResourceManagerConnection。這說明TM已經和RM建立了聯繫,所以可以開始監控RM了。
  • 然後和RM操作類似,通過resourceManagerHeartbeatManager.monitorTarget 來把RM註冊到自己這裏。
HeartbeatMonitor<O> heartbeatMonitor = heartbeatMonitorFactory.createHeartbeatMonitor 
heartbeatTargets.put(resourceID, heartbeatMonitor);  

當註冊完成后,其Receiver HM結構如下:

resourceManagerHeartbeatManager = {HeartbeatManagerImpl@10163} 
 heartbeatTimeoutIntervalMs = 50000
 ownResourceID = {ResourceID@8882} "96a9b80c-dd97-4b63-9049-afb6662ea3e2"
 heartbeatListener = {TaskExecutor$ResourceManagerHeartbeatListener@10425} 
 mainThreadExecutor = {RpcEndpoint$MainThreadExecutor@10426} 
 heartbeatTargets = {ConcurrentHashMap@10427}  size = 1
  {ResourceID@8886} "122fa66685133b11ea26ee1b1a6cef75" -> {HeartbeatMonitorImpl@10666} 
   key = {ResourceID@8886} "122fa66685133b11ea26ee1b1a6cef75"
   value = {HeartbeatMonitorImpl@10666} 
    resourceID = {ResourceID@8886} "122fa66685133b11ea26ee1b1a6cef75"
    heartbeatTarget = {TaskExecutor$1@10668} 
    scheduledExecutor = {RpcEndpoint$MainThreadExecutor@10426} 
    heartbeatListener = {TaskExecutor$ResourceManagerHeartbeatListener@10425} 
    heartbeatTimeoutIntervalMs = 50000
    futureTimeout = {ScheduledFutureAdapter@10992} 
    state = {AtomicReference@10667} "RUNNING"
    lastHeartbeat = 0

5.3.2 TM註冊到 JM

其調用基本思路與之前相同,就是TM和JM之間互相註冊一個代表對方的monitor:

JobLeaderListenerImpl ----> establishJobManagerConnection

消息到了JM中,做如下操作。

registerTaskManager ----> taskManagerHeartbeatManager.monitorTarget
  // monitor the task manager as heartbeat target

5.4 心跳過程

在任務提交之後,我們就進入了正常的心跳監控流程。我們依然用 TM 和 RM進行演示。

我們先給出一個流程圖。

 * 1. Run in Resouce Manager
 *
 *    HeartbeatManagerSender in RM
 *        |
 *        +----> run@HeartbeatManagerSenderImpl
 *        |     //遍歷所有監控的Monitor(Target),逐一在Target上調用requestHeartbeat
 *        |     
 *        +----> requestHeartbeat@HeartbeatManagerSenderImpl 
 *        |     // 將調用具體監控對象的自定義函數
 *        |     // heartbeatTarget.requestHeartbeat(getOwnResourceID(), payload);
 *        |      
 *        +----> getHeartbeatListener().retrievePayload   
 *        |     // 調用到TaskManagerHeartbeatListener@ResourceManager    
 *        |     // 這裡是return null;,因為RM不會是任何人的Receiver
 *        |        
 *        +----> requestHeartbeat@HeartbeatTarget   
 *        |     // 調用到Target這裏,代碼在ResourceManager這裏,就是生成Target時候賦值的
 *        |    
 *        +----> taskExecutorGateway.heartbeatFromResourceManager
 *        |     // 會通過gateway RPC 調用到TM,這就是主動對TM發起了心跳請求
 *        |      

 * ~~~~~~~~ 這裡是 Akka RPC
   
 * 2. Run in Task Manager   
 * 現在程序執行序列到達了TM, 主要是 1. 重置TM的Monitor線程; 2.返回一些負載信息;
 *
 *    heartbeatFromResourceManager@TaskExecutor
 *        |
 *        +----> resourceManagerHeartbeatManager.requestHeartbeat(resourceID, null);
 *        |     //開始要調用到 Receiver HM in Task Manager
 *        |     
 *        +----> requestHeartbeat@HeartbeatManager in TM 
 *        |     // 在Receiver HM in Task Manager 這裏運行
 *        |      
 *        +----> reportHeartbeat@HeartbeatMonitor   
 *        |     //reportHeartbeat : 記錄發起請求的這個時間點,然後resetHeartbeatTimeout
 *        |      
 *        +----> resetHeartbeatTimeout@HeartbeatMonitor
 *        |     // 如果Monitor狀態依然是RUNNING,則取消之前設置的ScheduledFuture。
 *        |     // 重新創建一個ScheduleFuture。因為如果不取消,則之前那個ScheduleFuture運行時
 *        |     // 會調用HeartbeatMonitorImpl.run函數,run直接compareAndSet后,通知目標函數
 *        |     // 目前已經超時,即調用heartbeatListener.notifyHeartbeatTimeout。
 *        |     // 這裏代表 JM 狀態正常。   
 *        |      
 *        +---->  heartbeatListener.reportPayload
 *        |     // 把Target節點的最新的heartbeatPayload通知給heartbeatListener。 
 *        |     // heartbeatListerner是外部傳入的,它根據所擁有的節點的心跳記錄做監聽管理。
 *        |        
 *        +---->  heartbeatTarget.receiveHeartbeat(getOwnResourceID(), heartbeatListener.retrievePayload(requestOrigin));
 *        |      
 *        |     
 *        +---->  retrievePayload@ResourceManagerHeartbeatListener in TM   
 *        |       // heartbeatTarget.receiveHeartbeat參數調用的
 *        |     
 *        +---->  return new TaskExecutorHeartbeatPayload
 *        |   
 *        |     
 *        +---->  receiveHeartbeat in TM 
 *        |       // 回到 heartbeatTarget.receiveHeartbeat,這就是TM生成Target的時候的自定義函數
 *        |       // 就是響應一個心跳消息回給RM
 *        |     
 *        +----> resourceManagerGateway.heartbeatFromTaskManager  
 *        |     // 會通過gateway RPC 調用到 ResourcManager
 *        |      

 * ~~~~~~~~ 這裡是 Akka RPC
  
 * 3. Run in Resouce Manager
 * 現在程序回到了RM, 主要是 1.重置RM的Monitor線程;2. 上報收到TaskExecutor的負載信息  
 *
 *    heartbeatFromTaskManager in RM
 *        |   
 *        |   
 *        +---->  taskManagerHeartbeatManager.receiveHeartbeat 
 *        |       // 這是個Sender HM
 *        |  
 *        +---->  HeartbeatManagerImpl.receiveHeartbeat  
 *        |  
 *        |     
 *        +---->  HeartbeatManagerImpl.reportHeartbeat(heartbeatOrigin);
 *        | 
 *        |    
 *        +---->  heartbeatMonitor.reportHeartbeat(); 
 *        |      // 這裏就是重置RM 這裏對應的Monitor。在reportHeartbeat重置 JM monitor線程的觸發,即cancelTimeout取消註冊時候的超時定時任務,並且註冊下一個超時檢測futureTimeout;這代表TM正常執行。
 *        |     
 *        +----> heartbeatListener.reportPayload    
 *        |      //把Target節點的最新的heartbeatPayload通知給 TaskManagerHeartbeatListener。heartbeatListerner是外部傳入的,它根據所擁有的節點的心跳記錄做監聽管理。 
 *        |      
 *        +----> slotManager.reportSlotStatus(instanceId, payload.getSlotReport());
 *        |     // TaskManagerHeartbeatListener中調用,上報收到TaskExecutor的負載信息
 *        |        

下面是具體文字描述。

5.4.1 ResourceManager主動發起

5.4.1.1 Sender遍歷所有監控的Monitor(Target)

心跳機制是由Sender主動發起的。這裏就是 ResourceManager 的HeartbeatManagerSenderImpl中定時schedual調用,這裡會遍歷所有監控的Monitor(Target),逐一在Target上調用requestHeartbeat。

// HeartbeatManagerSenderImpl中的代碼

	@Override
	public void run() {
		if (!stopped) {
			for (HeartbeatMonitor<O> heartbeatMonitor : getHeartbeatTargets().values()) {
        // 這裏向被監控對象節點發起一次心跳請求,載荷是heartbeatPayLoad,要求被監控對象回應心跳
				requestHeartbeat(heartbeatMonitor);
			}
			getMainThreadExecutor().schedule(this, heartbeatPeriod, TimeUnit.MILLISECONDS);
		}
	}
}

// 運行時候的變量
this = {HeartbeatManagerSenderImpl@9037} 
 heartbeatPeriod = 10000
 heartbeatTimeoutIntervalMs = 50000
 ownResourceID = {ResourceID@8788} "d349506cae32cadbe99b9f9c49a01c95"
 heartbeatListener = {ResourceManager$TaskManagerHeartbeatListener@8789} 
 mainThreadExecutor = {RpcEndpoint$MainThreadExecutor@8790} 

// 調用棧如下
requestHeartbeat:711, ResourceManager$2 (org.apache.flink.runtime.resourcemanager)
requestHeartbeat:702, ResourceManager$2 (org.apache.flink.runtime.resourcemanager)
requestHeartbeat:92, HeartbeatManagerSenderImpl (org.apache.flink.runtime.heartbeat)
run:81, HeartbeatManagerSenderImpl (org.apache.flink.runtime.heartbeat)
call:511, Executors$RunnableAdapter (java.util.concurrent)
run$$$capture:266, FutureTask (java.util.concurrent)
run:-1, FutureTask (java.util.concurrent)
5.4.1.2 Target進行具體操作

具體監控對象 Target 會調用自定義的requestHeartbeat。

HeartbeatManagerSenderImpl

	private void requestHeartbeat(HeartbeatMonitor<O> heartbeatMonitor) {
		O payload = getHeartbeatListener().retrievePayload(heartbeatMonitor.getHeartbeatTargetId());
		final HeartbeatTarget<O> heartbeatTarget = heartbeatMonitor.getHeartbeatTarget();

    // 這裏就是具體監控對象
		heartbeatTarget.requestHeartbeat(getOwnResourceID(), payload);
	}

heartbeatTarget = {ResourceManager$2@10688} 
 taskExecutorGateway = {$Proxy42@9459} "org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler@6d0c8334"
 this$0 = {StandaloneResourceManager@9458} 

請注意,每一個Target都是由ResourceManager生成的。ResourceManager之前註冊成為Monitor時候就註冊了這個HeartbeatTarget。

這個HeartbeatTarget的定義如下,兩個函數是:

  • receiveHeartbeat :這個是空,因為RM沒有自己的Sender。

  • requestHeartbeat :這個針對TM,就是調用TM的heartbeatFromResourceManager,當然是通過RPC調用。

5.4.1.3 RPC調用

會調用到ResourceManager定義的函數requestHeartbeat,而requestHeartbeat會通過gateway調用到TM,這就是主動對TM發起了心跳請求。

taskManagerHeartbeatManager.monitorTarget(taskExecutorResourceId, new HeartbeatTarget<Void>() {
   @Override
   public void receiveHeartbeat(ResourceID resourceID, Void payload) {
      // the ResourceManager will always send heartbeat requests to the TaskManager
   }

   @Override
   public void requestHeartbeat(ResourceID resourceID, Void payload) {
      //就是調用到這裏
      taskExecutorGateway.heartbeatFromResourceManager(resourceID); 
   }
});

5.4.2 RM通過RPC調用TM

通過taskExecutorGateway。心跳程序執行就通過RPC從RM跳躍到了TM。

taskExecutorGateway.heartbeatFromResourceManager 的意義就是:通過RPC調用回到TaskExecutor。這個是在TaskExecutorGateway就定義好的。

// TaskExecutor RPC gateway interface.
public interface TaskExecutorGateway extends RpcGateway

TaskExecutor實現了TaskExecutorGateway,所以具體在TaskExecutor內部實現了接口函數。

@Override
public void heartbeatFromResourceManager(ResourceID resourceID) {
    //調用到了這裏 ...........
		resourceManagerHeartbeatManager.requestHeartbeat(resourceID, null);
}

TM中,resourceManagerHeartbeatManager 定義如下。

/** The heartbeat manager for resource manager in the task manager. */
private final HeartbeatManager<Void, TaskExecutorHeartbeatPayload> resourceManagerHeartbeatManager;

所以下面就是執行TM中的Receiver HM。在這個過程中有兩個處理步驟:

  1. 調用對應HeartbeatMonitor的reportHeartbeat方法,cancelTimeout取消註冊時候的超時定時任務,並且註冊下一個超時檢測futureTimeout;
  2. 調用monitorTarget的receiveHeartbeat方法,也就是會通過rpc調用JobMaster的heartbeatFromTaskManager方法返回一些負載信息;

具體是調用 requestHeartbeat@HeartbeatManager。在其中會

  • 調用reportHeartbeat@HeartbeatMonitor,記錄發起請求的這個時間點,然後resetHeartbeatTimeout。
  • 在resetHeartbeatTimeout@HeartbeatMonitor之中,如果Monitor狀態依然是RUNNING,則取消之前設置的ScheduledFuture。重新創建一個ScheduleFuture。因為如果不取消,則之前那個ScheduleFuture運行時會調用HeartbeatMonitorImpl.run函數,run直接compareAndSet后,通知目標函數目前已經超時,即調用heartbeatListener.notifyHeartbeatTimeout。
  • 調用 heartbeatListener.reportPayload,把Target節點的最新的heartbeatPayload通知給heartbeatListener。
  • 調用 heartbeatTarget.receiveHeartbeat(getOwnResourceID(), heartbeatListener.retrievePayload(requestOrigin)); 就是響應一個心跳消息回給RM。
	@Override
	public void requestHeartbeat(final ResourceID requestOrigin, I heartbeatPayload) {
		if (!stopped) {
			log.debug("Received heartbeat request from {}.", requestOrigin);

			final HeartbeatTarget<O> heartbeatTarget = reportHeartbeat(requestOrigin);

			if (heartbeatTarget != null) {
				if (heartbeatPayload != null) {
					heartbeatListener.reportPayload(requestOrigin, heartbeatPayload);
				}

				heartbeatTarget.receiveHeartbeat(getOwnResourceID(), heartbeatListener.retrievePayload(requestOrigin));
			}
		}
	}

最後會通過resourceManagerGateway.heartbeatFromTaskManager 調用到 ResourcManager。

5.4.3 TM 通過RPC回到 RM

JobMaster在接收到rpc請求后調用其heartbeatFromTaskManager方法,會調用taskManagerHeartbeatManager的receiveHeartbeat方法,在這個過程中同樣有兩個處理步驟:

  1. 調用對應HeartbeatMonitor的reportHeartbeat方法,cancelTimeout取消註冊時候的超時定時任務,並且註冊下一個超時檢測futureTimeout;
  2. 調用TaskManagerHeartbeatListener的reportPayload方法,上報收到TaskExecutor的負載信息

至此一次完成心跳過程已經完成,會根據heartbeatInterval執行下一次心跳。

5.5 超時處理

5.5.1 TaskManager

首先,在HeartbeatMonitorImpl中,如果超時,會調用Listener。

public void run() {
   // The heartbeat has timed out if we're in state running
   if (state.compareAndSet(State.RUNNING, State.TIMEOUT)) {
      heartbeatListener.notifyHeartbeatTimeout(resourceID);
   }
}

這就來到了ResourceManagerHeartbeatListener,會嘗試再次連接RM。

private class ResourceManagerHeartbeatListener implements HeartbeatListener<Void, TaskExecutorHeartbeatPayload> {

   @Override
   public void notifyHeartbeatTimeout(final ResourceID resourceId) {
      validateRunsInMainThread();
      // first check whether the timeout is still valid
      if (establishedResourceManagerConnection != null && establishedResourceManagerConnection.getResourceManagerResourceId().equals(resourceId)) {
         reconnectToResourceManager(new TaskManagerException(
            String.format("The heartbeat of ResourceManager with id %s timed out.", resourceId)));
      } else {
         .....
      }
   }

5.5.2 ResourceManager

RM就直接簡單粗暴,關閉連接。

private class TaskManagerHeartbeatListener implements HeartbeatListener<TaskExecutorHeartbeatPayload, Void> {

   @Override
   public void notifyHeartbeatTimeout(final ResourceID resourceID) {
      validateRunsInMainThread();
      closeTaskManagerConnection(
         resourceID,
         new TimeoutException("The heartbeat of TaskManager with id " + resourceID + "  timed out."));
   }
}  

0x06 解決問題

心跳機制我們講解完了,但是我們最初提到的異常應該如何解決呢?在程序最開始生成環境變量時候,通過設置環境變量的配置即可搞定:

Configuration conf = new Configuration();
conf.setString("heartbeat.timeout", "18000000");
final LocalEnvironment env = ExecutionEnvironment.createLocalEnvironment(conf);

0x07 參考

[flink-001]flink的心跳機制

Flink中心跳機制

flink1.8 心跳服務

你有必要了解一下Flink底層RPC使用的框架和原理

flink RPC(akka)

弄清Flink1.8的遠程過程調用(RPC)

Apache Flink源碼解析 (七)Flink RPC的底層實現

flink源碼閱讀第一篇—入口

flink-on-yarn 基礎架構和啟動流程

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※帶您來了解什麼是 USB CONNECTOR  ?

※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面

※如何讓商品強力曝光呢? 網頁設計公司幫您建置最吸引人的網站,提高曝光率!

※綠能、環保無空污,成為電動車最新代名詞,目前市場使用率逐漸普及化

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

※教你寫出一流的銷售文案?

一起玩轉微服務(12)——揭密starter

介紹

Spring Boot的starter主要用來簡化依賴用的,對於企業級開發中的與第三方的集成,可以通過一段簡單的配置來完成,這樣開發人員無需再對包依賴的問題頭疼。Spring Boot為我們提供了簡化企業級開發的絕大多數場景的starter pom,只需要指定需要配置的starter,Spring Boot會自動為我們提供配置好的bean。

通常流程

通常我們要搭建一個基於Spring的Web應用,我們需要做以下一些工作:

pom文件中引入相關jar包,包括spring、springmvc、redis、mybaits、log4j、mysql-connector-java 等等相關jar …

配置web.xml,Listener配置、Filter配置、Servlet配置、log4j配置、error配置 …

配置數據庫連接、配置spring事務

配置視圖解析器

開啟註解、自動掃描功能

配置完成後部署tomcat、啟動調試

……

花在搭建一個初始項目,可能一個小時就過去了或者半天就過了,但是用了SpringBoot之後一切都會變得非常便捷。

常用的starter

Spring Boot常用的starter(啟動器)包括:

  • Spring-boot-starter-logging :使用 Spring Boot 默認的日誌框架 Logback。
  • Spring-boot-starter-log4j :添加 Log4j 的支持。
  • Spring-boot-starter-web :支持 Web 應用開發,包含 Tomcat 和 Spring-mvc。
  • Spring-boot-starter-tomcat :使用 Spring Boot 默認的 Tomcat 作為應用服務器。
  • Spring-boot-starter-jetty :使用 Jetty 而不是默認的 Tomcat 作為應用服務器。
  • Spring-boot-starter-test :包含常用的測試所需的依賴,如 Junit、Hamcrest、Mockito 和 Spring-test 等。
  • Spring-boot-starter-AOP :包含 Spring-AOP 和 AspectJ 來支持面向切面編程(AOP)。
  • Spring-boot-starter-security :包含 Spring-security。
  • Spring-boot-starter-jdbc :支持使用 JDBC 訪問數據庫。
  • Spring-boot-starter-redis :支持使用 Redis。
  • Spring-boot-starter-data-mongodb :包含 Spring-data-mongodb 來支持 MongoDB。
  • Spring-boot-starter-data-jpa :包含 Spring-data-jpa、Spring-orm 和 Hibernate 來支持 JPA。
  • Spring-boot-starter-amqp :通過 Spring-rabbit 支持 AMQP。
  • Spring-boot-starter-actuator : 添加適用於生產環境的功能,如性能指標和監測等功能。

當然,如果有必要,也可以定製自己的starter。

起步依賴

在我們的pom文件裏面引入以下jar:

spring-boot-starter-web包自動幫我們引入了web模塊開發需要的相關jar包。

 

mybatis-spring-boot-starter幫我們引入了dao開發相關的jar包。 spring-boot-starter-xxx是官方提供的starter,xxx-spring-boot-starter是第三方提供的starter。

截圖看一下我們的mybatis-spring-boot-starter

可以看出mybatis-spring-boot-starter並沒有任何源碼,只有一個pom文件,它的作用就是幫我們引入其它jar。

 

得益於starter的作用,使用SpringBoot確實方便,但對剛剛上手SpringBoot的人來說,可能只知道配置屬性是在application.xml或application.yml中添加,但他們各自的屬性都有哪些,具體怎麼配置,卻無從下手。這裏先解決SpringBoot-starter中各屬性的配置問題。

Mybatis的配置是怎麼生效的?查看示例工程的pom依賴:

注意到mybatis-spring-boot-starter幫我們自動依賴了Mybatis所需jar包,其中有一個負責自動配置的mybatis-spring-boot-autoconfigure.jar,緊接着打開此jar,如下:

META-INF/spring-configuration-metadata.json中便是Mybatis在SpringBoot中的所有配置屬性和介紹。

SpringBoot-starter自動配置bean

現在已得知jar包是怎麼樣自動依賴進來,以及他們的配置屬性,那麼接下來該考慮Mybatis所需的bean(如必需的sqlSessionFactory、sqlSessionTemplate等)是如何被自動加載的?

理所應當地,我們繼續去查看mybatis-spring-boot-autoconfigure.jar,注意到裏面有一個自動配置的類MybatisAutoConfiguration:

(1)@Configuration:被掛上@Configuration註解,表明它是一個配置類,作用等同於xml配置,裏面有被@Bean註解的方法,也等同於xml配置的各種。

(2)@ConditionalOnClass/@ConditionalOnBean:自動配置條件註解,用於在某一部分配置中,將另一模塊部分的配置自動加載進來,因為隨着系統越來越大,配置內容越來越多,我們應當將Mybatis的配置放在一處,將log4j的配置放在一處,將SpringBoot自身的配置放在一處,當他們需要互相依賴時,可通過這類註解進行自動配置,如下:

@ConditionalOnClass @ConditionalOnMissingClass
@ConditionalOnBean @ConditionalOnMissingBean
@ConditionalOnProperty
@ConditionalOnResource
@ConditionalOnWebApplication @ConditionalOnNotWebApplication
@ConditionalOnExpression
 
@AutoConfigureAfter @AutoConfigureBefore @AutoConfigureOrder(指定順序)
 

(3)@EnableConfigurationProperties:啟用對@ConfigurationProperties註解的bean的支持,這裏對應了配置屬性類MybatisProperties,它裏面定義了Mybatis的所有配置。

(4)@AutoConfigureAfter:應在其他指定的自動配置類之後應用自動配置。即org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration被自動配置后,才會接着自動配置MybatisAutoConfiguration。這裏也解釋了為什麼我們在application.xml中只配置了數據源,而沒有配置Mybatis,但是Mybatis可以正常查庫的原因,就是因為它們配置之間的依賴關係。

到這裏,差不多明白了starter自動配置bean的方式,但是如若再去深究,各種starter的bean是如何被自動加載的,猜想會不會是項目啟動后,SpringBoot自動掃描裏面所有的jar包,再去掃描所有的類,從而將各個bean放置IOC容器中。從結果來看,肯定是SpringBoot在啟動時確確實實地自動加載了數據源和Mybatis相關的bean,不然他們無法正常工作。

回想在我們啟動示例工程時,SpringBoot會自動掃描啟動類所在包下的所有類,而如果還去掃描所有的jar包的話,又是具體怎麼做到的?不妨從入口類調試一把,在SpringApplication.run(DemoApplication.class, args)打斷點,一直追蹤到getSpringFactoriesInstances這塊:

查看SpringFactoriesLoader.loadFactoryNames的方法註釋:

 

使用給定的類加載器從META-INF / spring.factories加載給定類型的工廠實現的完全限定類名。

這裏的spring.factories剛好也存在於mybatis-spring-boot-autoconfigure.jar中,

繼續調試,進入SpringFactoriesLoader.loadFactoryNames,

這裏用類加載器得到工程中所有jar包中的META-INF/spring.factories文件資源,進而通過此文件得到了一些包括自動配置相關的類的集合,有各種工廠類、監聽器、處理器、過濾器、初始化器等等,如下:

最後的org.springframework.boot.autoconfigure.EnableAutoConfiguration集合中當然包括了org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration和org.mybatis.spring.boot.autoconfigure.MybatisAutoConfiguration。接着必然是將實例化的各個bean放進IOC容器中。

至此我們便明白了SpringBoot是如何自動配置starter裏面的bean的。

 

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※帶您來了解什麼是 USB CONNECTOR  ?

※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面

※如何讓商品強力曝光呢? 網頁設計公司幫您建置最吸引人的網站,提高曝光率!

※綠能、環保無空污,成為電動車最新代名詞,目前市場使用率逐漸普及化

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

※教你寫出一流的銷售文案?