你知道和你不知道的冒泡排序

這篇文章包含了你一定知道的,和你不一定知道的冒泡排序。

gif看不了可以點擊【原文】查看gif。

1. 什麼是冒泡排序

可能對於大多數的人來說比如我,接觸的第一個算法就是冒泡排序。

我看過的很多的文章都把冒泡排序描述成我們喝的汽水,底部不停的有二氧化碳的氣泡往上冒,還有描述成魚吐泡泡,都特別的形象。

其實結合一杯水來對比很好理解,將我們的數組豎著放進杯子,數組中值小的元素密度相對較小,值大的元素密度相對較大。這樣一來,密度大的元素就會沉入杯底,而密度小的元素會慢慢的浮到杯子的最頂部,稍微專業一點描述如下。

冒泡算法會運行多輪,每一輪會依次比較數組中相鄰的兩個元素的大小,如果左邊的元素大於右邊的元素,則交換兩個元素的位置。最終經過多輪的排序,數組最終成為有序數組。

2. 排序過程展示

我們先不聊空間複雜度和時間複雜度的概念,我們先通過一張動圖來了解一下冒泡排序的過程。

這個圖形象的還原了密度不同的元素上浮和下沉的過程。

3. 算法V1

3.1 代碼實現

private void bubbleSort(int[] arr) {
  for (int i = 0; i < arr.length; i++) {
    for (int j = 0; j < arr.length - 1; j++) {
      if (arr[j] > arr[j + 1]) {
        exchange(arr, j, j + 1);
      }
    }
  }
}

private void exchange(int arr[], int i, int j) {
  int temp = arr[i];
  arr[i] = arr[j];
  arr[j] = temp;
}

int[] arr = new int[]{5, 1, 3, 7, 6, 2, 4};
bubbleSort(arr);
System.out.println(Arrays.toString(arr)); // [1, 2, 3, 4, 5, 6, 7]

3.2 實現分析

各位大佬看了上面的代碼之後先別激動,坐下坐下,日常操作。可能很多的第一個冒泡排序算法就是這麼寫的,比如我,同時還自我感覺良好,覺得算法也不過如此。

我們還是以數組[5, 1, 3, 7, 6, 2, 4]為例,我們通過動圖來看一下過程。

思路很簡單,我們用兩層循環來實現冒泡排序。

  • 第一層,控制冒泡排序總共執行的輪數,例如例子數組的長度是7,那麼總共需要執行6輪。如果長度是n,則需要執行n-1輪
  • 第二層,負責從左到右依次的兩兩比較相鄰元素,並且將大的元素交換到右側

這就是冒泡排序V1的思路。

下錶是通過對一個0-100000的亂序數組的標準樣本,使用V1算法進行排序所總共執行的次數,以及對同一個數組執行100次V1算法的所花的平均時間。

算法執行情況 結果
樣本 [0 – 100000] 的亂序數組
算法 V1 執行的總次數 99990000 次(9999萬次
算法 V1 運行 100 次的平均時間 181 ms

4. 算法V2

4.1 實現分析

仔細看動圖我們可以發現,每一輪的排序,都從數組的最左端再到最右。而每一輪的冒泡,都可以確定一個最大的數,固定在數組的最右邊,也就是密度最大的元素會冒泡到杯子的最上面。

還是拿上面的數組舉例子。下圖是第一輪冒泡之後數組的元素位置。

第二輪排序之後如下。

可以看到,每一輪排序都會確認一個最大元素,放在數組的最後面,當算法進行到後面,我們根本就沒有必要再去比較數組後面已經有序的片段,我們接下來針對這個點來優化一下。

4.2 代碼實現

這是優化之後的代碼。

private void bubbleSort(int[] arr) {
  for (int i = 0; i < arr.length - 1; i++) {
    for (int j = 0; j < arr.length - 1 - i; j++) {
      if (arr[j] > arr[j + 1]) {
        exchange(arr, j, j + 1);
      }
    }
  }
}

private void exchange(int arr[], int i, int j) {
  int temp = arr[i];
  arr[i] = arr[j];
  arr[j] = temp;
}

int[] arr = new int[]{5, 1, 3, 7, 6, 2, 4};
bubbleSort(arr);
System.out.println(Arrays.toString(arr)); // [1, 2, 3, 4, 5, 6, 7]

優化之後的實現,也就變成了我們動圖中所展示的過程。

每一步之後都會確定一個元素在數組中的位置,所以之後的每次冒泡的需要比較的元素個數就會相應的減1。這樣一來,避免了去比較已經有序的數組,從而減少了大量的時間。

算法執行情況 結果
樣本 [0 – 10000] 的亂序數組
算法 V2 執行的總次數 49995000 次(4999萬次
算法 V2 運行 100 次的平均時間 144 ms
運行時間與 V1 對比 V2 運行時間減少 20.44 %
執行次數與 V1 對比 V2 運行次數減少 50.00 %

可能會有人看到,時間大部分已經會覺得滿足了。從數據上看,執行的次數減少了50%,而運行的時間也減少了20%,在性能上已經是很大的提升了。而且已經減少了7億次的執行次數,已經很NB了。 那是不是到這就已經很完美了呢?

答案是No

4.3 哪裡可以優化

同理,我們還是拿上面長度為7的數組來舉例子,只不過元素的位置有所不同,假設數組的元素如下。

[7, 1, 2, 3, 4, 5, 6]

我們再來一步一步的執行V2算法, 看看會發生什麼。

第一步執行完畢后,數組的情況如下。

繼續推進,當第一輪執行完畢后,數組的元素位置如下。

這個時候,數組已經排序完畢,但是按照目前的V2邏輯,仍然有5輪排序需要繼續,而且程序會完整的執行完5輪的排序,如果是100000輪呢?這樣將會浪費大量的計算資源。

5. 算法V3

5.1 代碼實現

private void bubbleSort(int[] arr) {
  for (int i = 0; i < arr.length - 1; i++) {
    boolean flag = true;
    for (int j = 0; j < arr.length - 1 - i; j++) {
      if (arr[j] > arr[j + 1]) {
        flag = false;
        exchange(arr, j, j + 1);
      }
    }
    if (flag) {
      break;
    }
  }
}

private void exchange(int arr[], int i, int j) {
  int temp = arr[i];
  arr[i] = arr[j];
  arr[j] = temp;
}

int[] arr = new int[]{5, 1, 3, 7, 6, 2, 4};
bubbleSort(arr);
System.out.println(Arrays.toString(arr)); // [1, 2, 3, 4, 5, 6, 7]

5.2 實現分析

我們在V2代碼的基礎上,在第一層循環,也就是控制總冒泡輪數的循環中,加入了一個標誌為flag。用來標示該輪冒泡排序中,數組是否是有序的。每一輪的初始值都是true。

當第二層循環,也就是冒泡排序的元素兩兩比較完成之後,flag的值仍然是true,則說明在這輪比較中沒有任何元素被交換了位置。也就是說,數組此時已經是有序狀態了,沒有必要再執行後續的剩餘輪數的冒泡了。

所以,如果flag的值是true,就直接break了(沒有其他的操作return也沒毛病)。

算法執行情況 結果
樣本 [0 – 10000] 的亂序數組
算法 V3 執行的總次數 49993775
算法 V3 運行 100 次的平均時間 142 ms
運行時間與 V2 對比 V3 運行時間減少 00.00 %
執行次數與 V2 對比 V3 運行次數減少 00.00 %

5.3 數據分析

大家看到數據可能有點懵逼。

你這個優化之後,運行時間執行次數都沒有減少。你這優化的什麼東西?

其實,這就要說到算法的適用性了。V3的優化是針對原始數據中存在一部分或者大量的數據已經是有序的情況,V3的算法對於這樣的樣本數據才最適用。

其實是我們還沒有到優化這種情況的那一步,但是其實仍然有這樣的說法,面對不同的數據結構,幾乎沒有算法是萬能的

而目前的樣本數據仍然是隨機的亂序數組,所以並不能發揮優化之後的算法的威力。所謂對症下藥,同理並不是所有的算法都是萬能的。對於不同的數據我們需要選擇不同的算法。例如我們選擇[9999,1,2,…,9998]這行的數據做樣本來分析,我們來看一下V3算法的表現。

算法執行情況 結果
樣本 [0 – 10000] 的亂序數組
算法 V3 執行的總次數 19995
算法 V3 運行 100 次的平均時間 1 ms
運行時間與 V3 亂序樣例對比 V3 運行時間減少 99.96 %
執行次數與 V3 亂序樣例對比 V3 運行次數減少 99.29 %

可以看到,提升非常明顯。

5.4 適用情況

當冒泡算法運行到後半段的時候,如果此時數組已經有序了,需要提前結束冒泡排序。V3針對這樣的情況就特別有效。

6. 算法V4

嗯,什麼?為什麼不是結束語?那是因為還有一種沒有考慮到啊。

6.1 適用情況總結

我們總結一下前面的算法能夠處理的情況。

  • V1:正常亂序數組
  • V2:正常亂序數組,但對算法的執行次數做了優化
  • V3:大部分元素已經有序的數組,可以提前結束冒泡排序

還有一種情況是冒泡算法的輪數沒有執行完,甚至還沒有開始執行,後半段的數組就已經有序的數組,例如如下的情況。

這種情況,在數組完全有序之前都不會觸發V3中的提前停止算法,因為每一輪都有交換存在,flag的值會一直是true。而下標2之後的所有的數組都是有序的,算法會依次的冒泡完所有的已有序部分,造成資源的浪費。我們怎麼來處理這種情況呢?

6.2 實現分析

我們可以在V3的基礎之上來做。

當第一輪冒泡排序結束后,元素3會被移動到下標2的位置。在此之後沒有再進行過任意一輪的排序,但是如果我們不做處理,程序仍然會繼續的運行下去。

我們在V3的基礎上,加上一個標識endIndex來記錄這一輪最後的發生交換的位置。這樣一來,下一輪的冒泡就只冒到endIndex所記錄的位置即可。因為後面的數組沒有發生任何的交換,所以數組必定有序。

6.3 代碼實現

private void bubbleSort(int[] arr) {
  int endIndex = arr.length - 1;
  for (int i = 0; i < arr.length - 1; i++) {
    boolean flag = true;
    int endAt = 0;
    for (int j = 0; j < endIndex; j++) {
      if (arr[j] > arr[j + 1]) {
        flag = false;
        endAt = j;
        exchange(arr, j, j + 1);
      }
    }
    endIndex = endAt;
    if (flag) {
      break;
    }
  }
}

private void exchange(int arr[], int i, int j) {
  int temp = arr[i];
  arr[i] = arr[j];
  arr[j] = temp;
}

int[] arr = new int[]{5, 1, 3, 7, 6, 2, 4};
bubbleSort(arr);
System.out.println(Arrays.toString(arr)); // [1, 2, 3, 4, 5, 6, 7]

7. 算法V5

這一節仍然不是結束語…

7.1 算法優化

我們來看一下這種情況。

對於這種以上的算法都將不能發揮其應有的作用。每一輪算法都存在元素的交換,同時,直到算法完成以前,數組都不是有序的。但是如果我們能直接從右向左冒泡,只需要一輪就可以完成排序。這就是雞尾酒排序,冒泡排序的另一種優化,其適用情況就是上圖所展示的那種。

7.2 代碼實現

private void bubbleSort(int[] arr) {
  int leftBorder = 0;
  int rightBorder = arr.length - 1;

  int leftEndAt = 0;
  int rightEndAt = 0;

  for (int i = 0; i < arr.length / 2; i++) {
    boolean flag = true;
    for (int j = leftBorder; j < rightBorder; j++) {
      if (arr[j] > arr[j + 1]) {
        flag = false;
        exchange(arr, j, j + 1);
        rightEndAt = j;
      }
    }
    rightBorder = rightEndAt;
    if (flag) {
      break;
    }

    flag = true;
    for (int j = rightBorder; j > leftBorder; j--) {
      if (arr[j] < arr[j - 1]) {
        flag = false;
        exchange(arr, j, j - 1);
        leftEndAt = j;
      }
    }
    leftBorder = leftEndAt;
    if (flag) {
      break;
    }
  }
}

private void exchange(int arr[], int i, int j) {
  int temp = arr[i];
  arr[i] = arr[j];
  arr[j] = temp;
}

int[] arr = new int[]{2, 3, 4, 5, 6, 7, 1};
bubbleSort(arr);
System.out.println(Arrays.toString(arr)); // [1, 2, 3, 4, 5, 6, 7]

7.3 實現分析

第一層循環同樣用於控制總的循環輪數,由於每次需要從左到右再從右到左,所以總共的輪數是數組的長度 / 2。

內存循環則負責先實現從左到右的冒泡排序,再實現從右到左的冒泡,並且同時結合了V4的優化點。

我們來看一下V5與V4的對比。

算法執行情況 結果
樣本 [2,3,4…10000,1] 的數組
算法 V5 執行的總次數 19995
算法 V5 運行 100 次的平均時間 1 ms
運行時間與 V4 對比 V5 運行時間減少 99.97 %
執行次數與 V4 對比 V5 運行次數減少 99.34 %

8. 總結

以下是對同一個數組,使用每一種算法對其運行100次的平均時間和執行次數做的的對比。

[0 – 10000] 的亂序數組 V1 V2 V3 V4 V5
執行時間(ms) 184 142 143 140 103
執行次數(次) 99990000 49995000 49971129 49943952 16664191
大部分有序的情況 V1 V2 V3 V4 V5
執行時間(ms) 181 141 146 145 107
執行次數(次) 99990000 49995000 49993230 49923591 16675618

而冒泡排序的時間複雜度分為最好的情況和最快的情況。

  • 最好的情況為O($n$). 也就是我們在V5中提到的那種情況,數組2, 3, 4, 5, 6, 7, 1。使用雞尾酒算法,只需要進行一輪冒泡,即可完成對數組的排序。
  • 最壞的情況為O($n^2$).也就是V1,V2,V3和V4所遇到的情況,幾乎大部分數據都是無序的。

往期文章:

  • 聊聊微服務集群當中的自動化工具
  • go源碼解析-Println的故事
  • 用go-module作為包管理器搭建go的web服務器
  • WebAssembly完全入門——了解wasm的前世今身
  • 小強開飯店-從單體應用到微服務

相關:

  • 微信公眾號: SH的全棧筆記(或直接在添加公眾號界面搜索微信號LunhaoHu)

【精選推薦文章】

自行創業 缺乏曝光? 下一步"網站設計"幫您第一時間規劃公司的門面形象

網頁設計一頭霧水??該從何著手呢? 找到專業技術的網頁設計公司,幫您輕鬆架站!

評比前十大台北網頁設計台北網站設計公司知名案例作品心得分享

台北網頁設計公司這麼多,該如何挑選?? 網頁設計報價省錢懶人包"嚨底家"

redis的五種數據類型及應用場景

前言

redis是用鍵值對的形式來保存數據,鍵類型只能是String,但是值類型可以有String、List、Hash、Set、Sorted Set五種,來滿足不同場景的特定需求。

本博客中的示例不是將控制台作為redis的一個客戶端,而是將redis運用在java里進行測試

需要有java redis的驅動包,可以通過引入maven的依賴即可

        <dependency>
            <groupId>org.rarefiedredis.redis</groupId>
            <artifactId>redis-java</artifactId>
            <version>0.0.17</version>
        </dependency>

 

String

String類型是最基礎的一種key-value存儲形式,value其實不僅僅可以是String,也可以是數值類型。常常用來做計數器這類自增自減的功能,可用在粉絲數、微博數等。

示例

 1         //連接本地的 Redis 服務
 2         Jedis jedis = new Jedis("localhost");
 3         System.out.println("連接成功");
 4         //查看服務是否運行
 5         System.out.println("服務正在運行: "+jedis.ping());
 6         //String實例
 7         jedis.set("hello", String.valueOf(1));
 8         jedis.incr("hello");
 9         jedis.set("hello1","word1");
10         System.out.println(jedis.get("hello"));
11         System.out.println(jedis.mget("hello","hello1"));

常用命令

  • set
  • get
  • mget
  • incr
  • decr

 

List

list就是鏈表,在redis實現為一個雙向鏈表,可以從兩邊插入、刪除數據。應用場景可以有微博的關注列表、粉絲列表、消息列表等。

有一個lrange函數,可以從某個元素開始讀取多少個元素,可用來實現分頁功能。

示例

 1         /*list實例,雙向鏈表結構,適合做消息隊列,
 2         但其實真正應用中一般都會用專門做消息隊列的中間件例如RabbitMQ*/
 3         jedis.lpush("201宿舍","hlf");
 4         jedis.lpush("201宿舍","css");
 5         jedis.lpush("201宿舍","ty");
 6         jedis.lpush("201宿舍","jy");
 7         List<String> name = jedis.lrange("201宿舍",0,3);
 8         for (String person:name
 9              ) {
10             System.out.print(person+" ");
11         }

 

常用命令

  •  lpush
  • rpush
  • lpush
  • lpop
  • lrange

 

Hash

hash就是值類型存儲的是一個鍵值對形式,適合存儲對象類型信息,例如個人信息、商品信息等。

示例

 1         //hash實例,適合存儲對象
 2         HashMap<String,String> map = new HashMap<String, String>();
 3         map.put("name","hlf");
 4         map.put("sex","女");
 5         map.put("age","21");
 6         jedis.hmset("hlf",map);
 7         jedis.hset("hlf","major","software");
 8         Map<String,String> map1 = jedis.hgetAll("hlf");
 9         String age = jedis.hget("hlf","age");
10         System.out.println(map1);
11         System.out.println(age);

 

常用命令

  • hset
  • hmset
  • hget
  • hgetAll

 

Set

set表示存儲的一個元素不重合的集合,因為set集合支持查緝、並集操作,因此適合做共同好友等功能

示例

1         //set實例
2         jedis.sadd("set","hhh");
3         jedis.sadd("set","ff");
4         jedis.sadd("set","hhh");
5         System.out.println(jedis.smembers("set"));
6         jedis.sadd("set1","oo");
7         jedis.sadd("set1","ff");
8         System.out.println("交集:"+jedis.sinter("set","set1"));
9         System.out.println("合集:"+jedis.sunion("set","set1"));

 

常用命令

  • sadd
  • spop
  • smembers
  • sunion
  • sinter

 

Sorted Set

相對於Set,Sorted Set多了一個Score作為權重,使集合裏面的元素可以按照score排序,注意它是Set,所以它裏面的元素也不能重複

示例

        //sorted set實例
        jedis.zadd("set2",4,"redis");
        jedis.zadd("set2",3,"mysql");
        jedis.zadd("set2",2,"kk");
        jedis.zadd("set2",1,"redis");
        System.out.println(jedis.zrangeByScore("set2",0,4));

 

常用命令

  • zadd
  • zpop
  • zrangeByScore

 

【精選推薦文章】

智慧手機時代的來臨,RWD網頁設計已成為網頁設計推薦首選

想知道網站建置、網站改版該如何進行嗎?將由專業工程師為您規劃客製化網頁設計及後台網頁設計

帶您來看台北網站建置台北網頁設計,各種案例分享

廣告預算用在刀口上,網站設計公司幫您達到更多曝光效益

雲遊戲流媒體整體架構設計(雲遊戲流媒體技術前瞻,最近雲遊戲概念很火,加之對流媒體技術略有研究,簡單寫一些)

前言:

遙想當年阿法狗戰敗一眾圍棋國手,風氣一轉,似乎所有人都懂AI。這次谷歌又放出了stadia,國內鵝廠再次跑步進場,貴州某xx雲提前布局。

閑來無事,嘗試體驗了一下貴州某xx雲的雲遊戲(不打廣告),暫且不評論如何如何,剛好對流媒體技術略有研究,僅在這裏簡單聊一下這方面涉及的架構和技術。

架構設計:

總體架構自上而下大致分為四端:

1、雲遊戲主機端(雲遊戲運行端,或者叫雲遊戲畫面渲染端,需要接收控制指令並錄屏推流到流媒體服務)

主機端需要運行遊戲並讓通過錄屏推流程序把渲染好的遊戲畫面(其實就是錄屏)推流到流媒體服務進行實時視頻分發。

有人會想這個雲遊戲主機端可能會很複雜,其實也還好,只是包含了錄屏、推流、用戶控制指令接收和一些其他諸如計費此類的相關功能。

2、流媒體服務(用於轉發主機端推上來的遊戲實時視頻並分發出去,所有用戶都可以觀看這個視頻)

這個不需要多講了,只是用來轉發遊戲實時視頻,並不涉及雲遊戲主機的控制權。

3、控制指令轉發服務(用戶需要獲取控制指令服務的所有權才能控制雲遊戲主機)

這個是雲遊戲的控制核心,獲取某台雲遊戲主機的用戶就可以通過鍵盤或者鼠標進行雲遊戲的試玩(操作),理論上講能夠獲取該控制權的不是只有一個用戶,完全可以支持多個用戶同時控制一台雲遊戲主機。

4、客戶端(瀏覽器,pc客戶端,ios,安卓客戶端等)

客戶端需要從流媒體服務拉取實時遊戲視頻,用戶需要先獲取雲遊戲主機的控制權,才能夠發送控制指令來試玩(操作)雲遊戲(鼠標,鍵盤,手柄等)

靈魂畫師繪製結構圖:

架構示意圖-來自靈魂畫師的傾情手繪

難點或者叫待解決的點:

1、流媒體協議的選擇?高延遲才是最大殺手

從流媒體技術出身開始,實時視頻延遲一直是個比較棘手的問題,比如rtmp/http-flv等基於tcp的協議本身優化到極點也要幾百毫秒的延遲,hls這種超高延遲到幾秒的不提也罷。就目前看只有sip、rtsp以及基於udp的一些協議能夠滿足這種超低延遲的需求,但是這種協議就很難在瀏覽器上就很難實現了,除了webrtc,而webrtc協議是谷歌力推的下一代流媒體協議,不排除這次是谷歌webrtc技術的奠基之作,拭目以待。

2、雲遊戲主機控制指令的所有權?依然是延遲

這個所有權其實不算是難點,只是用戶獲取某台雲遊戲主機的控制權而已。難點在於控制指令的延遲,沒錯,就是網絡延遲。尤其是在拉取實時視頻時,在視頻已經佔用大量帶寬的情況下,在這種網絡負載或者網絡波動較大的情況下控制指令延遲或許值得重視。

跟很多朋友討論過雲遊戲這個話題,不約而同第一個想到的都是網絡延遲,當然這個延遲不僅包含控制指令的延遲也指實時遊戲視頻的延遲。

 

後言(啰嗦幾句):

其實這塊依然屬於共享經濟的後續,類似共享單車。

給大家舉個栗子:我有一百台性能強勁的遊戲主機,每台主機價值一萬,當二手貨賣掉可能還會虧點,好可惜。那麼我把他共享出來,假設現在有十萬個用戶想租我這一百台機器,然後每人只收10塊錢月租,不考慮電費等其他成本,請問我什麼時候能回本?

作者:eguid

說明:原CSDN相關博客文章已經全部轉移到博客園

【精選推薦文章】

自行創業 缺乏曝光? 下一步"網站設計"幫您第一時間規劃公司的門面形象

網頁設計一頭霧水??該從何著手呢? 找到專業技術的網頁設計公司,幫您輕鬆架站!

評比前十大台北網頁設計台北網站設計公司知名案例作品心得分享

台北網頁設計公司這麼多,該如何挑選?? 網頁設計報價省錢懶人包"嚨底家"

如何在C#中調試LINQ查詢

原文:How to Debug LINQ queries in C#
作者:Michael Shpilt
譯文:如何在C#中調試LINQ查詢
譯者:Lamond Lu

在C#中我最喜歡的特性就是LINQ。使用LINQ, 我們可以獲得一種易於編寫和理解的簡潔語法,而不是單調的foreach循環,它可以讓你的代碼更加美觀。

但是LINQ也有不好的地方,就是調試起來非常難。我們無法知道查詢中到底發生了什麼。我們可以看到輸入值和輸出值,但是僅此而已。當代碼出現問題的時候,我們只能盯着代碼看嗎?答案是否定的,這裡有幾種可以使用的LINQ的調試方法。

LINQ調試

儘管很困難,但是這裏還是有幾種可選的方式來調試LINQ的。

這裏首先,我們先創建一個測試場景。假設我們現在想要獲取一個列表,這個列表中包含了3個超過平均工資的男性員工的信息,並且按照年齡排序。這是一個非常普通的查詢,下面就是我針對這個場景編寫的查詢方法。

public IEnumerable<Employee> MyQuery(List<Employee> employees)
{
    var avgSalary = employees.Select(e=>e.Salary).Average();
 
    return employees
        .Where(e => e.Gender == "Male")
        .Take(3)
        .Where(e => e.Salary > avgSalary)
        .OrderBy(e => e.Age);
}

這裏我們使用的數據集如下:

Name Age Gender Salary
Peter Claus 40 “Male” 61000
Jose Mond 35 “male” 62000
Helen Gant 38 “Female” 38000
Jo Parker 42 “Male” 52000
Alex Mueller 22 “Male” 39000
Abbi Black 53 “female” 56000
Mike Mockson 51 “Male” 82000

當運行以上查詢之後, 我得到的結果是

Peter Claus, 61000, 40

這個結果看起來不太對…這裏應該查出3個員工。這裏我們計算出的平均工資應該是56400, 所以’Jose Mond’和’Mick Mockson’應該也是滿足條件的結果。

所以呢,這裡在我的LINQ查詢中有BUG, 那麼我們該怎麼做? 當然我可以一直盯着代碼來找出問題,在某些場景下這種方式可能是行的通的。或者呢我們可以來嘗試調試它。

下面讓我們看一下,我們有哪些可選的調試方法。

1. 使用Quickwatch

這裏比較容易的方法是使用QuickWatch窗口來查看查詢的不同部分的結果。你可以從第一個操作開始,一步一步的追加過濾條件。

例:

這裏我們可以看到,在經過第一個查詢之後,就出錯了。 ‘Jose Mond’應該是一個男性,但是在結果集中缺失了。那麼我們的BUG應該就是出在這裏了,我們可以只盯着這一小段代碼來查找問題。沒錯,這裏的BUG原因是數據集中將男性拼寫為了’male’, 而不是我們查詢的’Male’。

因此,現在我可以通過忽略大小寫來修復這個問題。

var res = employees
        .Where(e => e.Gender.Equals("Male", StringComparison.OrdinalIgnoreCase))
        .Take(3)
        .Where(e => e.Salary > avgSalary)
        .OrderBy(e => e.Age);
 

現在我們將得到如下結果集:

Jose Mond, 62000, 35
Peter Claus, 61000, 40

在結果集中’Jose’已經包含在內了,所以這裏第一個Bug已經被修復了。但是問題是’Mike Mockson’依然沒有出現在結果集裏面。我們將使用後面的調試方式來解決它。

Quickwatch看似很美好,其實是有一個很大的缺點。如果你要從一個很大的數據集中找到一個指定的數據項,你可以需要花非常多的時間。

而且需要注意有些查詢可能會改變應用的狀態。例如,你可能在lambda表達式中,通過調用某個方法來改變一些變量的值,例如var res = source.Select(x => x.Age++)。在Quickwatch中運行這段代碼,你的應用狀態會被修改,調試上下文會不一致。不過在Quickwatch你可以使用添加nse這個”無副作用”標記,來避免調試上下文的變更。你可以在你的LINQ表達式後面追加, nse的後綴來啟用“無副作用”標記。

例:

2. 在lambda表達式部分放置斷點

另外一種非常好用的調試方式是在lambda表達式內部放置斷點。這可以讓你查看每個獨立數據項的值。針對比較大的數據集,你可以使用條件斷點。

在我們的用例中,我們發現’Mike Mockson’不在第一個Where操作結果集中。這時候我們就可以在.Where(e => e.Gender == "Male")代碼部分添加一個條件斷點,斷點條件是e.Name=="Mike Mockson"

在我們的用例中,這個斷點永遠不會被觸發。而且在我們將查詢條件改為

.Where(e => e.Gender.Equals("Male", StringComparison.OrdinalIgnoreCase))

之後也不會觸發。你知道這是為什麼?

現在不要在盯着代碼了,這裏我們使用斷點的Actions功能,這個功能允許你在斷點觸發時,在Output窗口中輸出日誌。

再次調試之後,我們會在Output窗口中得到如下結果:

只有3個人名被打印出來了。這是因為在我們的查詢中使用了.Take(3), 它會讓數據集只返回前3個匹配的數據項。

這裏我們本來的意願是想列出超過平均工資的前三位男性,並且按照年齡排序。所以這裏我們應該把Take放到工資過濾代碼的後面。

var res = employees
        .Where(e => e.Gender.Equals("Male", StringComparison.OrdinalIgnoreCase))
        .Where(e => e.Salary > avgSalary)
        .Take(3)
        .OrderBy(e => e.Age);
 

再次運行之後,結果集正確显示了Jose Mond,Peter ClausMike Mockson

注: LINQ to SQL中,這個方式不起作用。

3. 為LINQ添加日誌擴展方法

現在讓我們把代碼還原到Bug還未修復的最初狀態.

下面我們來使用擴展方法來幫助調試Query。


public static IEnumerable<T> LogLINQ<T>(this IEnumerable<T> enumerable, string logName, Func<T, string> printMethod)
{
#if DEBUG
    int count = 0;
    foreach (var item in enumerable)
    {
        if (printMethod != null)
        {
            Debug.WriteLine($"{logName}|item {count} = {printMethod(item)}");
        }
        count++;
        yield return item;
    }
    Debug.WriteLine($"{logName}|count = {count}");
#else   
    return enumerable;
#endif
}
 

你可以像這樣使用你的調試方法。

var res = employees
        .LogLINQ("source", e=>e.Name)
        .Where(e => e.Gender == "Male")
        .LogLINQ("logWhere", e=>e.Name)
        .Take(3)
        .LogLINQ("logTake", e=>e.Name)
        .Where(e => e.Salary > avgSalary)
        .LogLINQ("logWhere2", e=>e.Name)
        .OrderBy(e => e.Age);
 

輸出結果如下:

說明和解釋:

  • LogLINQ方法需要放在你的每個查詢條件後面。它會輸出所有滿足條件的數據項及其總數
  • logName是一個輸出日誌的前綴,使用它可以很容易了解到當前運行的是哪一步查詢
  • Func<T, string> printMethod是一個委託,它可以幫助打印任何你指定的變量值,在上述例子中,我們打印了員工的名字
  • 為了優化代碼,這個代碼應該是只在調試模式使用。所以我們添加了#if DEBUG

下面我們來分析一下輸出窗口的結果,你會發現這幾個問題:

  • source中包含”Jose Mond”, 但是logWhere中不包含,這就是我們前面發現的大小寫問題
  • “Mike Mockson”沒有出現在任何結果中,原因是過早的使用Take, 過濾了許多正確的結果。

4. 使用OzCode的LINQ功能

如果你需要一個強力的工具來調試LINQ, 那麼你可以使用OzCode這個Visual Studio插件。

OzCode可以提供一個可視化的LINQ查詢界面來展示每一個數據項的行為。首先,它可以展示每次操作后,滿足條件的所有數據項的數量。

然後呢,當你點擊任何一個数字按鈕的時候,你可以查看所有滿足條件的數據項。

我們可以看到”Jo Parker”是源數據的第四個,經過第一個Where查詢時候,變成了數據源中的第三項。這裏可以看到在最後2步操作OrderByTake返回的結果集中沒有這一項了,因為他已經被過濾掉了。

就調試LINQ而言,OzCode基本上已經可以滿足你的所有需求了。

總結

LINQ的調試不是非常直觀,但是通過一些內置和第三方組件還是可以很好調試結果。

這裏我沒有提到LINQ查詢語法,因為它使用得並不多。只有方式#2 (lambda表達式部分放置斷點)和技術#4 (OzCode)可以使用查詢語法。

LINQ既適用於內存集合,也適用於數據源。直接數據源可以是SQL數據庫、XML模式和web服務。但是並非所有上述技術都適用於數據源。特別是,方式#2 (lambda表達式部分放置斷點)根本不起作用。方式#3(日誌中間件)可以用於調試,但最好避免使用它,因為它將集合從IQueryable更改為IEnumerable。不要讓LogLINQ方法用於生產數據源。方式#4 (OzCode)對於大多數LINQ提供程序都可以很好地工作,但是如果LINQ提供程序以非標準的方式工作,那麼可能會有一些細微的變化。

【精選推薦文章】

智慧手機時代的來臨,RWD網頁設計已成為網頁設計推薦首選

想知道網站建置、網站改版該如何進行嗎?將由專業工程師為您規劃客製化網頁設計及後台網頁設計

帶您來看台北網站建置台北網頁設計,各種案例分享

廣告預算用在刀口上,網站設計公司幫您達到更多曝光效益

你值得關注的幾種常見的js設計模式

前言

潜水了一段時間,今天空閑時間復盤下之前的知識點,聊聊 js 幾種常見的設計模式。

掌握 JavaScript 中常見的一些設計模式,對我們書寫規範性代碼,可維護性代碼有很大的幫助。

ps:最近在一些好友的鼓勵下,pubdreamcc 準備着手經營一個公眾號了,具體信息會在接下來的两天時間內發布,新手上路,歡迎大夥提供一些寶貴的建議,cc 在這裏先謝了~

內容主體

單例模式

所謂單例模式即為:保證一個類僅有一個實例,並提供一個訪問它的全局訪問點。

這裏其實利用的是 js閉包 來實現這樣的功能。

假如現在我們有這樣的需求,設置一個管理員,無論創建多次都只是設置一次。


function SetManager(name) {
  this.manager = name;
}

SetManager.prototype.getName = function() {
  console.log(this.manager);
};

var SingletonSetManager = (function() {
  var manager = null;

  return function(name) {
    if (!manager) {
        manager = new SetManager(name);
    }

    return manager;
  }
})();

SingletonSetManager('a').getName(); // a
SingletonSetManager('b').getName(); // a
SingletonSetManager('c').getName(); // a

這種方法有一個缺點就是:如果我們需要再次創建一個 HR,則需要將代碼再複製一遍,所以我們可以提取通用的單例。

function getSingleton(fn) {
  var instance = null;

  return function() {
    if (!instance) {
        instance = fn.apply(this, arguments);
    }

    return instance;
  }
}


// 設置管理員
var managerSingleton = getSingleton(function(name){
  var manager = new SetManager(name);
  return manager;
})

managerSingleton('a').getName(); // a
managerSingleton('b').getName(); // a
managerSingleton('c').getName(); // a

// 設置 HR

function SetHr(name) {
  this.hr = name;
}

SetHr.prototype.getName = function() {
  console.log(this.hr);
};

var hrSingleton = getSingleton(function(name) {
  var hr = new SetHr(name);
  return hr;
});

hrSingleton('aa').getName(); // aa
hrSingleton('bb').getName(); // aa
hrSingleton('cc').getName(); // aa

這樣我們的代碼可通用性就會變得更好,省去了一些重複性的代碼。

代理模式

所謂代理模式就是:我們不方便直接訪問某個對象時,可以為對象創建一個佔位符(代理),以便控制對它的訪問,我們實際上訪問的是代理對象。

這裏我們以一個過濾敏感字符來說明這種模式

// 主體,發送消息
function sendMsg(msg) {
  console.log(msg);
}

// 代理,對消息進行過濾
function proxySendMsg(msg) {
  // 無消息則直接返回
  if (typeof msg === 'undefined') {
    console.log(null);
    return;
  }
  
  // 有消息則進行過濾
  msg = ('' + msg).replace(/泥\s*煤/g, '');

  sendMsg(msg);
}


sendMsg('泥煤呀泥 煤呀'); // 泥煤呀泥 煤呀
proxySendMsg('泥煤呀泥 煤'); // 呀
proxySendMsg(); // null

這樣操作的意圖很明顯,當沒有消息的時候,控制對主體對象的訪問,代理直接返回一個 null ,有消息,則會過濾掉敏感字符,實現虛擬代理。

策略模式

策略模式就是內部封裝一些算法,它們之間可以互相替換,但是它們不隨客戶端變化而變化。

策略模式我們外部看不到算法的具體實現,我們也只關心算法實現的結果,不關注過程。

這裏以一個商品促銷的例子來說明下:在聖誕節,某些商品需要八折出售,有些商品需要九折出售,到了元旦節,普通客戶滿100減30,vip客戶滿100減50。可以看到商品出售的價格需要根據不同的條件來規定,分別採取不同的算法實現,所以我們採用策略模式。

// 價格策略對象
class PriceStrategy {
  constructor() {
      // 內部算法對象
    this.stragtegy = {
        // 100返30
      return30(price) {
          return +price + parseInt( price / 100) * 30;
      },
      // 100 返 50
      return50(price) {
          return +price + parseInt(price/ 100) * 50;
      },
      // 9 折
      percent90(price) {
          return price * 100 * 90 / 10000
      },
      percent80(price) {
          return price * 100 * 80 / 10000
      },
      percent50(price) {
          return price * 100 * 50 / 10000
      }
    }
  }
  // 策略算法調用接口
  getPrice(algorithm, price) {
    return this.stragtegy[algorithm] && this.stragtegy[algorithm](price);
  }
}
let priceStrategy = new PriceStrategy();
let price = priceStrategy.getPrice('return50', 314.67);
console.log(price);

這樣,我們可以採取不同的策略算法得到商品的不同價格。

觀察者模式

觀察者模式又稱為 “發布-訂閱模式”,通過定義一種依賴關係,當一個對象狀態發生改變時,訂閱者會得到通知。

其實,我們傳統的 DOM 事件綁定就是一種發布-訂閱模式。

// 訂閱
document.body.addEventListener('click', function() {
  console.log('click1');
}, false);

document.body.addEventListener('click', function() {
  console.log('click2');
}, false);

// 發布
document.body.click(); // click1  click2

裝飾者模式

裝飾者模式就是在不改變原對象基本功能的基礎上,通過增加功能使得原本對象滿足用戶的更為複雜的需求。

比如有這麼一個需求:

用戶點擊輸入框時,如果輸入框輸入的內容有限制,那麼在其後面显示用戶輸入內容的限制格式的提示文案

—————->>>>>>> 現在要改為:

多加一條需求,默認輸入框上邊显示一行文案,當用戶點擊輸入框的時候,文案消失。

這裡是以前的代碼:

// 輸入框元素
let telInput = document.getElementById('tel_input');
// 輸入框提示文案
let telWarnText = document.getElementById('tel_warn_text');
// 點擊輸入框显示輸入框輸入格式提示文案
input.onclick = function () {
  telWarnText.style.display = 'inline-block';
};

修改之後的代碼:

// 輸入框元素
let telInput = document.getElementById('tel_input');
// 輸入框輸入格式提示文案
let telWarnText = document.getElementById('tel_warn_text');
// 輸入框提示輸入文案
let telDemoText = document.getElementById('tel_demo_text');
// 點擊輸入框显示輸入框輸入格式提示文案
input.onclick = function () {
  telWarnText.style.display = 'inline-block';
  telDemoText.style.display = 'none';
};

但是緊接着悲劇就來了,修改了電話輸入框,還有姓名、地址輸入框等等;

裝飾已有的功能對象

原有的功能已經不滿足用戶的需求了,此時需要做的是對原有的功能添加,設置新的屬性和方法來滿足新的需求,但是有不影響原來已經有的部分。

let decorator = function (input, fn) {
  let getInput = document.getElementById(input);
  if(typeof getInput.onclick === 'function') {
    let oldClick = getInput.onclick;
    getInput.onclick = function() {
        // 原來的事件回調函數
        oldClick();
        // 新增的事件回調函數
        fn();
    }
  } else {
    getInput.onclick = fn;
  }
  // 其他事件
};

// 電話輸入框功能裝飾
decorator('tel_input', function() {
  document.getElementById('tel_demo_text').sytle.display = 'none'
});
// 姓名輸入框裝飾
decorator('name_input', function() {
  document.getElementById('name_demo_text').sytle.display = 'none'
});
// 地址輸入框裝飾
decorator('address_input', function() {
  document.getElementById('address_demo_text').sytle.display = 'none'
});

后語

本編文章出自於我的 github 倉庫 ,歡迎喜歡的夥伴 star ,謝謝 。

倉庫地址 前端學習

【精選推薦文章】

自行創業 缺乏曝光? 下一步"網站設計"幫您第一時間規劃公司的門面形象

網頁設計一頭霧水??該從何著手呢? 找到專業技術的網頁設計公司,幫您輕鬆架站!

評比前十大台北網頁設計台北網站設計公司知名案例作品心得分享

台北網頁設計公司這麼多,該如何挑選?? 網頁設計報價省錢懶人包"嚨底家"

大型網站如何防止崩潰,解決高併發帶來的問題

大型網站,比如門戶網站,在面對大量用戶訪問、高併發請求方面帶來的問題
1大併發:在同一個時間點,有大量的客戶來訪問我們的網站,如果訪問量過大,就可能造成網站癱瘓。
2大流量:當網站大后,有大量的圖片,視頻, 這樣就會對流量要求高,需要更多更大的帶寬。
3大存儲:你的數據量會成海量的數據,如果我們的數據放入一張表,是無法應對的。可能對數據保存和查詢出現問題。

基本的解決方案集中在這樣幾個環節:使用高性能的服務器、高性能的數據庫、高效率的編程語言、還有高性能的Web容器,(對架構分層+負載均衡+集群)這幾個解決思路在一定程度上意味着更大的投入。

解決方案:

   一、提高硬件能力、增加系統服務器。(當服務器增加到某個程度的時候系統所能提供的併發訪問量幾乎不變,所以不能根本解決問題)

   二、使用緩存(本地緩存:本地可以使用JDK自帶的 Map、Guava Cache.分佈式緩存:Redis、Memcache.本地緩存不適用於提高系統併發量,一般是用處用在程序中。比如Spring是如何實現單例的呢?大家如果看過源碼的話,應該知道,Spiring把已經初始過的變量放在一個Map中,下次再要使用這個變量的時候,先判斷Map中有沒有,這也就是系統中常見的單例模式的實現。)

 分佈式緩存利器Redis集群,Redis集群的搭建至少需要三主三從。

1. 所有的redis節點彼此互聯(PING-PONG機制),內部使用二進制協議優化傳輸速度和帶寬。

2. 節點的fail是通過集群中超過半數的節點檢測失效時才生效(所以一個集群中至少要有三個節點)。

3. 客戶端與redis節點直連,不需要中間proxy層.客戶端不需要連接集群所有節點,連接集群中任何一個可用節點即可。

4. 集群中每一個節點都存放不同的內容,每一個節點都應有備份機。

5. redis-cluster把所有的物理節點映射到[0-16383]slot上,cluster 負責維護node<->slot<->value

 

 

 

 

Redis 集群中內置了16384 個哈希槽,當需要在Redis 集群中放置一個key-value 時,redis先對 key 使用 crc16 算法算出一個結果,然後把結果對16384 求餘數,這樣每個key 都會對應一個編號在0-16383 之間的哈希槽,redis會根據節點數量大致均等的將哈希槽映射到不同的節點

   三 、消息隊列 (解耦+削峰+異步)通過異步處理提高系統性能,降低系統耦合性

在不使用消息隊列服務器的時候,用戶的請求數據直接寫入數據庫,在高併發的情況下數據庫壓力劇增,使得響應速度變慢。但是在使用消息隊列之後,用戶的請求數據發送給消息隊列之後立即 返回,再由消息隊列的消費者進程從消息隊列中獲取數據,異步寫入數據庫。由於消息隊列服務器處理速度快於數據庫(消息隊列也比數據庫有更好的伸縮性),因此響應速度得到大幅改善。

 

通過使用消息中間件對Dubbo服務間的調用進行解耦, 消息中間件可利用高效可靠的消息傳遞機制進行平台無關的數據交流,並基於數據通信來進行分佈式系統的集成。通過提供消息傳遞和消息排隊模型,可以在分佈式環境下擴展進程間的通信。通過消息中間件,應用程序或組件之間可以進行可靠的異步通訊,從而降低系統之間的耦合度,提高系統的可擴展性和可用性。

   四 、採用分佈式開發 (不同的服務部署在不同的機器節點上,並且一個服務也可以部署在多台機器上,然後利用 Nginx 負載均衡訪問。這樣就解決了單點部署(All In)的缺點,大大提高的系統併發量)

   五 、數據庫分庫(讀寫分離)、分表(水平分表、垂直分表)

PXC高可用集群與Replication集群結合方案

這種的集群在遇到單表數據量超過2000萬的時候,mysql性能會受損,所以一個集群還不夠,我們需要把數據分到另一個集群,這個稱為“切片”,就是把大量的數據拆分到不同的集群中,每個集群的數據都是不一樣的,通過MyCat這個阿里巴巴的開源中間件,可以把sql分到不同的集群裏面去。

PXC集群方案與Replication區別

PXC集群方案所有節點都是可讀可寫的,Replication從節點不能寫入,因為主從同步是單向的,無法從slave節點向master點同步。

PXC同步機制是同步進行的,這也是它能保證數據強一致性的根本原因,Replication同步機制是異步進行的,它如果從節點停止同步,依然可以向主節點插入數據,正確返回,造成數據主從數據的不一致性。

PXC是用犧牲性能保證數據的一致性,Replication在性能上是高於PXC的。所以兩者用途也不一致。PXC是用於重要信息的存儲,例如:訂單、用戶信息等。Replication用於一般信息的存儲,能夠容忍數據丟失,例如:購物車,用戶行為日誌等

   六、 採用集群 (多台機器提供相同的服務)系統架構方案

   七、CDN 加速 (將一些靜態資源比如圖片、視頻等等緩存到離用戶最近的網絡節點)

   八、瀏覽器緩存 頁面靜態化(使用php自己的ob緩存技術實現, 主流的mvc框架(tp,yii,laravel)模板引擎一般都自帶頁面靜態化 )      

   九、使用合適的連接池(數據庫連接池、線程池等等)

   十、適當使用多線程進行開發。

   十一、使用鏡像

鏡像是大型網站常採用的提高性能和數據安全性的方式,鏡像的技術可以解決不同網絡接入商和地域帶來的用戶訪問速度差異,比如ChinaNet和EduNet之間的差異就促使了很多網站在教育網內搭建鏡像站點,數據進行定時更新或者實時更新。有很多專業的現成的解決架構和產品可選。也有廉價的通過軟件實現的思路,比如Linux上的rsync等工具。

   十二、圖片服務器分離

大家知道,對於Web服務器來說,不管是Apache、IIS還是其他容器,圖片是最消耗資源的,於是我們有必要將圖片與頁面進行分離,這是基本上大型網站都會採用的策略,他們都有獨立的、甚至很多台的圖片服務器。這樣的架構可以降低提供頁面訪問請求的服務器系統壓力,並且可以保證系統不會因為圖片問題而崩潰。

在應用服務器和圖片服務器上,可以進行不同的配置優化,比如apache在配置ContentType的時候可以盡量少支持、盡可能少的LoadModule,保證更高的系統消耗和執行效率。

 

【精選推薦文章】

智慧手機時代的來臨,RWD網頁設計已成為網頁設計推薦首選

想知道網站建置、網站改版該如何進行嗎?將由專業工程師為您規劃客製化網頁設計及後台網頁設計

帶您來看台北網站建置台北網頁設計,各種案例分享

廣告預算用在刀口上,網站設計公司幫您達到更多曝光效益

kafka消費者客戶端

Kafka消費者

1.1 消費者與消費者組

消費者與消費者組之間的關係

​ 每一個消費者都隸屬於某一個消費者組,一個消費者組可以包含一個或多個消費者,每一條消息只會被消費者組中的某一個消費者所消費。不同消費者組之間消息的消費是互不干擾的。

為什麼會有消費者組的概念

​ 消費者組出現主要是出於兩個目的:

​ (1) 使整體的消費能力具備橫向的伸縮性。可以適當增加消費者組中消費者的數量,來提高整體的消費能力。但是每一個分區至多被消費者組的中一個消費者所消費,因此當消費者組中消費者數量超過分區數時,多出的消費者不會分配到任何一個分區。當然這是默認的分區分配策略,可通過partition.assignment.strategy進行配置。

​ (2) 實現消息消費的隔離。不同消費者組之間消息消費互不干擾,從而實現發布訂閱這種消息投遞模式。

注意:

​ 消費者隸屬的消費者組可以通過group.id進行配置。消費者組是一個邏輯上的概念,但消費者並不是一個邏輯上的概念,它可以是一個線程,也可以是一個進程。同一個消費者組內的消費者可以部署在同一台機器上,也可以部署在不同的機器上。

1.2 消費者客戶端開發

​ 一個正常的消費邏輯需要具備以下幾個步驟:

  • 配置消費者客戶端參數及創建相應的消費者實例。

  • 訂閱主題

  • 拉取消息並消費

  • 提交消費位移

  • 關閉消費者實例

  public class KafkaConsumerAnalysis {
  public static final String brokerList="node112:9092,node113:9092,node114:9092";
  public static final String topic = "topic-demo";
  public static final String groupId = "group.demo";
  public static final AtomicBoolean isRunning = new AtomicBoolean(true);

  public static Properties initConfig() {
      Properties prop = new Properties();
      prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
      prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
      prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
      prop.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
      prop.put(ConsumerConfig.CLIENT_DNS_LOOKUP_CONFIG, "consumer.client.di.demo");
      return prop;
  }


  public static void main(String[] args) {
      KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(initConfig());
       
      for (ConsumerRecord<String, String> record : records) {
          System.out.println("topic = " + record.topic() + ", partition =" +                                               record.partition() + ", offset = " + record.offset());
          System.out.println("key = " + record.key() + ", value = " + record.value());
              }
          }
      } catch (Exception e) {
          e.printStackTrace();
      }finally {
          consumer.close();
      }
  }
}

1.2.1 訂閱主題和分區

​ 先來說一下消費者訂閱消息的粒度:一個消費者可以訂閱一個主題、多個主題、或者多個主題的特定分區。主要通過subsribe和assign兩個方法實現訂閱。

(1)訂閱一個主題:

​ public void subscribe(Collection<String> topics),當集合中有一個主題時。

(2)訂閱多個主題:

​ public void subscribe(Collection<String> topics),當集合中有多個主題時。

​ public void subscribe(Pattern pattern),通過正則表達式實現消費者主題的匹配。通過這種方式,如果在消息消費的過程中,又添加了新的能夠匹配到正則的主題,那麼消費者就可以消費到新添加的主題。 consumer.subscribe(Pattern.compile(“topic-.*”));

(3)多個主題的特定分區

​ public void assign(Collection<TopicPartition> partitions),可以實現訂閱某些特定的主題分區。TopicPartition包括兩個屬性:topic(String)和partition(int)。

​ 如果事先不知道有多少分區該如何處理,KafkaConsumer中的partitionFor方法可以獲得指定主題分區的元數據信息:

​ public List<PartitionInfo> partitionsFor(String topic)

​ PartitionInfo的屬性如下:

  
public class PartitionInfo {
  private final String topic;//主題
  private final int partition;//分區
  private final Node leader;//分區leader
  private final Node[] replicas;//分區的AR
  private final Node[] inSyncReplicas;//分區的ISR
  private final Node[] offlineReplicas;//分區的OSR
}

​ 因此也可以通過這個方法實現某個主題的全部訂閱。

​ 需要指出的是,subscribe(Collection)、subscirbe(Pattern)、assign(Collection)方法分別代表了三種不同的訂閱狀態:AUTO_TOPICS、AUTO_PATTREN和USER_ASSIGN,這三種方式是互斥的,消費者只能使用其中一種,否則會報出IllegalStateException。

​ subscirbe方法可以實現消費者自動再平衡的功能。多個消費者的情況下,可以根據分區分配策略自動分配消費者和分區的關係,當消費者增加或減少時,也能實現負載均衡和故障轉移。

​ 如何實現取消訂閱:

​ consumer.unsubscribe()

1.2.2 反序列化

​ KafkaProducer端生產消息進行序列化,同樣消費者就要進行相應的反序列化。相當於根據定義的序列化格式的一個逆序提取數據的過程。

  
import com.gdy.kafka.producer.Company;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.Map;

public class CompanyDeserializer implements Deserializer<Company> {
  @Override
  public void configure(Map<String, ?> configs, boolean isKey) {

  }

  @Override
  public Company deserialize(String topic, byte[] data) {
      if(data == null) {
          return null;
      }

      if(data.length < 8) {
          throw new SerializationException("size of data received by Deserializer is shorter than expected");
      }

      ByteBuffer buffer = ByteBuffer.wrap(data);
      int nameLength = buffer.getInt();
      byte[] nameBytes = new byte[nameLength];
      buffer.get(nameBytes);
      int addressLen = buffer.getInt();
      byte[] addressBytes = new byte[addressLen];
      buffer.get(addressBytes);
      String name,address;
      try {
          name = new String(nameBytes,"UTF-8");
          address = new String(addressBytes,"UTF-8");
      }catch (UnsupportedEncodingException e) {
          throw new SerializationException("Error accur when deserializing");
      }

      return new Company(name, address);
  }

  @Override
  public void close() {

  }
}

​ 實際生產中需要自定義序列化器和反序列化器時,推薦使用Avro、JSON、Thrift、ProtoBuf或者Protostuff等通用的序列化工具來包裝。

1.2.3 消息消費

​ Kafka中消息的消費是基於拉模式的,kafka消息的消費是一個不斷輪旋的過程,消費者需要做的就是重複的調用poll方法。

  
public ConsumerRecords<K, V> poll(final Duration timeout)

​ 這個方法需要注意的是,如果消費者的緩衝區中有可用的數據,則會立即返回,否則會阻塞至timeout。如果在阻塞時間內緩衝區仍沒有數據,則返回一個空的消息集。timeout的設置取決於應用程序對效應速度的要求。如果應用線程的位移工作是從Kafka中拉取數據並進行消費可以將這個參數設置為Long.MAX_VALUE。

​ 每次poll都會返回一個ConsumerRecords對象,它是ConsumerRecord的集合。對於ConsumerRecord相比於ProducerRecord多了一些屬性:

  
private final String topic;//主題
  private final int partition;//分區
  private final long offset;//偏移量
  private final long timestamp;//時間戳
  private final TimestampType timestampType;//時間戳類型
  private final int serializedKeySize;//序列化key的大小
  private final int serializedValueSize;//序列化value的大小
  private final Headers headers;//headers
  private final K key;//key
  private final V value;//value
  private volatile Long checksum;//CRC32校驗和

​ 另外我們可以按照分區維度對消息進行消費,通過ConsumerRecords.records(TopicPartiton)方法實現。

  
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    Set<TopicPartition> partitions = records.partitions();
    for (TopicPartition tp : partitions) {
        for (ConsumerRecord<String, String> record : records.records(tp)) {
              System.out.println(record.partition() + " ," + record.value());
        }
    }

​ 另外還可以按照主題維度對消息進行消費,通過ConsumerRecords.records(Topic)實現。

  
for (String topic : topicList) {
      for (ConsumerRecord<String, String> record : records.records(topic)) {
              System.out.println(record.partition() + " ," + record.value());
      }
}

1.2.4 消費者位移提交

​ 首先要 明白一點,消費者位移是要做持久化處理的,否則當發生消費者崩潰或者消費者重平衡時,消費者消費位移無法獲得。舊消費者客戶端是將位移提交到zookeeper上,新消費者客戶端將位移存儲在Kafka內部主題_consumer_offsets中。

​ KafkaConsumer提供了兩個方法position(TopicPatition)和commited(TopicPartition)。

​ public long position(TopicPartition partition)—–獲得下一次拉取數據的偏移量

​ public OffsetAndMetadata committed(TopicPartition partition)—–給定分區的最後一次提交的偏移量。

還有一個概念稱之為lastConsumedOffset,這個指的是最後一次消費的偏移量。

​ 在kafka提交方式有兩種:自動提交和手動提交。

(1)自動位移提交

​ kafka默認情況下採用自動提交,enable.auto.commit的默認值為true。當然自動提交並不是沒消費一次消息就進行提交,而是定期提交,這個定期的周期時間由auto.commit.intervals.ms參數進行配置,默認值為5s,當然這個參數生效的前提就是開啟自動提交。

​ 自動提交會造成重複消費和消息丟失的情況。重複消費很容易理解,因為自動提交實際是延遲提交,因此很容易造成重複消費,然後消息丟失是怎麼產生的?

(2)手動位移提交

​ 開始手動提交的需要配置enable.auto.commit=false。手動提交消費者偏移量,又可分為同步提交和異步提交。

​ 同步提交:

​ 同步提交很簡單,調用commitSync() 方法:

  
while (isRunning.get()) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
        for (ConsumerRecord<String, String> record : records) {
            //consume message
            consumer.commitSync();
        }
}

​ 這樣,每消費一條消息,提交一個偏移量。當然可用過緩存消息的方式,實現批量處理+批量提交:

  
while (isRunning.get()) {
      ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
      for (ConsumerRecord<String, String> record : records) {
            buffer.add(record);
      }
      if (buffer.size() >= minBaches) {
          for (ConsumerRecord<String, String> record : records) {
              //consume message
          }
          consumer.commitSync();
          buffer.clear();
      }
}

​ 還可以通過public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets)這個方法實現按照分區粒度進行同步提交。

  
while (isRunning.get()) {
  ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
  for (TopicPartition tp : records.partitions()) {
      List<ConsumerRecord<String, String>> partitionRecords = records.records(tp);
      for (ConsumerRecord record : partitionRecords) {
          //consume message
      }
      long lastConsumerOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
      consumer.commitSync(Collections.singletonMap(tp,new                                                               OffsetAndMetadata(lastConsumerOffset+1)));
  }
}

​ 異步提交:

​ commitAsync異步提交的時候消費者線程不會被阻塞,即可能在提交偏移量的結果還未返回之前,就開始了新一次的拉取數據操作。異步提交可以提升消費者的性能。commitAsync有三個重載:

​ public void commitAsync()

​ public void commitAsync(OffsetCommitCallback callback)

​ public void commitAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback )

​ 對照同步提交的方法參數,多了一個Callback回調參數,它提供了一個異步提交的回調方法,當消費者位移提交完成后回調OffsetCommitCallback的onComplement方法。以第二個方法為例:

  
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
  for (ConsumerRecord<String, String> record : records) {
      //consume message
  }
  consumer.commitAsync(new OffsetCommitCallback() {
      @Override
      public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception e) {
          if (e == null) {
              System.out.println(offsets);
          }else {
                e.printStackTrace();
          }
      }
});

1.2.5 控制和關閉消費

​ kafkaConsumer提供了pause()和resume() 方法分別實現暫停某些分區在拉取操作時返回數據給客戶端和恢復某些分區向客戶端返回數據的操作:

​ public void pause(Collection<TopicPartition> partitions)

​ public void resume(Collection<TopicPartition> partitions)

​ 優雅停止KafkaConsumer退出消費者循環的方式:

​ (1)不要使用while(true),而是使用while(isRunning.get()),isRunning是一個AtomicBoolean類型,可以在其他地方調用isRunning.set(false)方法退出循環。

​ (2)調用consumer.wakup()方法,wakeup方法是KafkaConsumer中唯一一個可以從其他線程里安全調用的方法,會拋出WakeupException,我們不需要處理這個異常。

​ 跳出循環后一定要显示的執行關閉動作和釋放資源。

1.2.6 指定位移消費

KafkaConsumer可通過兩種方式實現實現不同粒度的指定位移消費。第一種是通過auto.offset.reset參數,另一種通過一個重要的方法seek。

(1)auto.offset.reset

auto.offset.reset這個參數總共有三種可配置的值:latest、earliest、none。如果配置不在這三個值當中,就會拋出ConfigException。

latest:當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset或位移越界時,消費新產生的該分區下的數據

earliest:當各分區下有已提交的offset時,從提交的offset開始消費;無提交的offset或位移越界時,從頭開始消費

none:topic各分區都存在已提交的offset時,從offset后開始消費;只要有一個分區不存在已提交的offset或位移越界,則拋出NoOffsetForPartitionException異常

消息的消費是通過poll方法進行的,poll方法對於開發者來說就是一個黑盒,無法精確的掌控消費的起始位置。即使通過auto.offsets.reset參數也只能在找不到位移或者位移越界的情況下粗粒度的從頭開始或者從末尾開始。因此,Kafka提供了另一種更細粒度的消費掌控:seek。

(2)seek

seek可以實現追前消費和回溯消費:

  
public void seek(TopicPartition partition, long offset)

可以通過seek方法實現指定分區的消費位移的控制。需要注意的一點是,seek方法只能重置消費者分配到的分區的偏移量,而分區的分配是在poll方法中實現的。因此在執行seek方法之前需要先執行一次poll方法獲取消費者分配到的分區,但是並不是每次poll方法都能獲得數據,所以可以採用如下的方法。

  
consumer.subscribe(topicList);
  Set<TopicPartition> assignment = new HashSet<>();
  while(assignment.size() == 0) {
      consumer.poll(Duration.ofMillis(100));
      assignment = consumer.assignment();//獲取消費者分配到的分區,沒有獲取返回一個空集合
  }

  for (TopicPartition tp : assignment) {
      consumer.seek(tp, 10); //重置指定分區的位移
  }
  while (true) {
      ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
      //consume record
    }

如果對未分配到的分區執行了seek方法,那麼會報出IllegalStateException異常。

在前面我們已經提到,使用auto.offsets.reset參數時,只有當消費者分配到的分區沒有提交的位移或者位移越界時,才能從earliest消費或者從latest消費。seek方法可以彌補這一中情況,實現任意情況的從頭或從尾部消費。

   Set<TopicPartition> assignment = new HashSet<>();
  while(assignment.size() == 0) {
      consumer.poll(Duration.ofMillis(100));
      assignment = consumer.assignment();
  }
  Map<TopicPartition, Long> offsets = consumer.endOffsets(assignment);//獲取指定分區的末尾位置
  for (TopicPartition tp : assignment) {
      consumer.seek;
  }

與endOffset對應的方法是beginningOffset方法,可以獲取指定分區的起始位置。其實kafka已經提供了一個從頭和從尾消費的方法。

  
public void seekToBeginning(Collection<TopicPartition> partitions)
public void seekToEnd(Collection<TopicPartition> partitions)

還有一種場景是這樣的,我們並不知道特定的消費位置,卻知道一個相關的時間點。為解決這種場景遇到的問題,kafka提供了一個offsetsForTimes()方法,通過時間戳來查詢分區消費的位移。

      Map<TopicPartition, Long> timestampToSearch = new HashMap<>();
  for (TopicPartition tp : assignment) {
      timestampToSearch.put(tp, System.currentTimeMillis() - 24 * 3600 * 1000);
  }
//獲得指定分區指定時間點的消費位移
  Map<TopicPartition, OffsetAndTimestamp> offsets =                                                                                   consumer.offsetsForTimes(timestampToSearch);
  for (TopicPartition tp : assignment) {
      OffsetAndTimestamp offsetAndTimestamp = offsets.get(tp);
      if (offsetAndTimestamp != null) {
              consumer.seek(tp, offsetAndTimestamp.offset());
      }
  }

由於seek方法的存在,使得消費者的消費位移可以存儲在任意的存儲介質中,包括DB、文件系統等。

1.2.7 消費者的再均衡

再均衡是指分區的所屬權從一個消費者轉移到另一消費者的行為,它為消費者組具備高可用伸縮性提高保障。不過需要注意的地方有兩點,第一是消費者發生再均衡期間,消費者組中的消費者是無法讀取消息的。第二點就是消費者發生再均衡可能會引起重複消費問題,所以一般情況下要盡量避免不必要的再均衡。

KafkaConsumer的subscribe方法中有一個參數為ConsumerRebalanceListener,我們稱之為再均衡監聽器,它可以用來在設置發生再均衡動作前後的一些準備和收尾動作。

  public interface ConsumerRebalanceListener {
  void onPartitionsRevoked(Collection<TopicPartition> partitions);
  void onPartitionsAssigned(Collection<TopicPartition> partitions);
}

onPartitionsRevoked方法會在再均衡之前和消費者停止讀取消息之後被調用。可以通過這個回調函數來處理消費位移的提交,以避免重複消費。參數partitions表示再均衡前分配到的分區。

onPartitionsAssigned方法會在再均衡之後和消費者消費之間進行調用。參數partitons表示再均衡之後所分配到的分區。

  consumer.subscribe(topicList);
  Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
  consumer.subscribe(topicList, new ConsumerRebalanceListener() {
      @Override
      public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
          consumer.commitSync(currentOffsets);//提交偏移量
      }

      @Override
      public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
          //do something
      }
  });

  try {
      while (isRunning.get()) {
          ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
          for (ConsumerRecord<String, String> record : records) {
              //process records
              //記錄當前的偏移量
              currentOffsets.put(new TopicPartition(record.topic(), record.partition()),new                               OffsetAndMetadata( record.offset() + 1));
          }
          consumer.commitAsync(currentOffsets, null);
      }

      } catch (Exception e) {
          e.printStackTrace();
      }finally {
          consumer.close();
      }

1.2.8 消費者攔截器

消費者攔截器主要是在消費到消息或者提交消費位移時進行一些定製化的操作。消費者攔截器需要自定義實現org.apache.kafka.clients.consumer.ConsumerInterceptor接口。

  public interface ConsumerInterceptor<K, V> extends Configurable {    
  public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records);
  public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);
  public void close();
}

onConsume方法是在poll()方法返回之前被調用,比如修改消息的內容、過濾消息等。如果onConsume方法發生異常,異常會被捕獲並記錄到日誌中,但是不會向上傳遞。

Kafka會在提交位移之後調用攔截器的onCommit方法,可以使用這個方法來記錄和跟蹤消費的位移信息。

  
public class ConsumerInterceptorTTL implements ConsumerInterceptor<String,String> {
  private static final long EXPIRE_INTERVAL = 10 * 1000; //10秒過期
  @Override
  public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
      long now = System.currentTimeMillis();
      Map<TopicPartition, List<ConsumerRecord<String, String>>> newRecords = new HashMap<>();

      for (TopicPartition tp : records.partitions()) {
          List<ConsumerRecord<String, String>> tpRecords = records.records(tp);
          List<ConsumerRecord<String, String>> newTpRecords = records.records(tp);
          for (ConsumerRecord<String, String> record : tpRecords) {
              if (now - record.timestamp() < EXPIRE_INTERVAL) {//判斷是否超時
                  newTpRecords.add(record);
              }
          }
          if (!newRecords.isEmpty()) {
              newRecords.put(tp, newTpRecords);
          }


      }
      return new ConsumerRecords<>(newRecords);
  }

  @Override
  public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
      offsets.forEach((tp,offset) -> {
          System.out.println(tp + ":" + offset.offset());
      });
  }

  @Override
  public void close() {}

  @Override
  public void configure(Map<String, ?> configs) {}
}

使用這種TTL需要注意的是如果採用帶參數的位移提交方式,有可能提交了錯誤的位移,可能poll拉取的最大位移已經被攔截器過濾掉。

1.2.9 消費者的多線程實現

KafkaProducer是線程安全的,然而KafkaConsumer是非線程安全的。KafkaConsumer中的acquire方法用於檢測當前是否只有一個線程在操作,如果有就會拋出ConcurrentModifiedException。acuqire方法和我們通常所說的鎖是不同的,它不會阻塞線程,我們可以把它看做是一個輕量級的鎖,它通過線程操作計數標記的方式來檢測是否發生了併發操作。acquire方法和release方法成對出現,分表表示加鎖和解鎖。

  //標記當前正在操作consumer的線程
private final AtomicLong currentThread = new AtomicLong(NO_CURRENT_THREAD);
//refcount is used to allow reentrant access by the thread who has acquired currentThread,
//大概可以理解我加鎖的次數
private final AtomicInteger refcount = new AtomicInteger(0);
private void acquire() {
long threadId = Thread.currentThread().getId();
if (threadId != currentThread.get()&&!currentThread.compareAndSet(NO_CURRENT_THREAD, threadId))
      throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access");
      refcount.incrementAndGet();
}

private void release() {
  if (refcount.decrementAndGet() == 0)
      currentThread.set(NO_CURRENT_THREAD);
}

kafkaConsumer中的每個共有方法在調用之前都會執行aquire方法,只有wakeup方法是個意外。

KafkaConsumer的非線程安全並不意味着消費消息的時候只能以單線程的方式執行。可以通過多種方式實現多線程消費。

(1)Kafka多線程消費第一種實現方式——–線程封鎖

所謂線程封鎖,就是為每個線程實例化一個KafkaConsumer對象。這種方式一個線程對應一個KafkaConsumer,一個線程(可就是一個consumer)可以消費一個或多個分區的消息。這種消費方式的併發度受限於分區的實際數量。當線程數量超過分分區數量時,就會出現線程限制額的情況。

  import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class FirstMutiConsumerDemo {
  public static final String brokerList="node112:9092,node113:9092,node114:9092";
  public static final String topic = "topic-demo";
  public static final String groupId = "group.demo";

  public static Properties initConfig() {
      Properties prop = new Properties();
      prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
      prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
      prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
      prop.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
      prop.put(ConsumerConfig.CLIENT_DNS_LOOKUP_CONFIG, "consumer.client.di.demo");
      prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
      return prop;
  }

  public static void main(String[] args) {
      Properties prop = initConfig();
      int consumerThreadNum = 4;
      for (int i = 0; i < 4; i++) {
          new KafkaCoosumerThread(prop, topic).run();
      }
  }

  public static class KafkaCoosumerThread extends Thread {
  //每個消費者線程包含一個KakfaConsumer對象。
      private KafkaConsumer<String, String> kafkaConsumer;
      public KafkaCoosumerThread(Properties prop, String topic) {
          this.kafkaConsumer = new KafkaConsumer<String, String>(prop);
          this.kafkaConsumer.subscribe(Arrays.asList(topic));
      }

      @Override
      public void run() {
          try {
              while (true) {
                  ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
                  for (ConsumerRecord<String, String> record : records) {
                      //處理消息模塊
                  }
              }
          } catch (Exception e) {
              e.printStackTrace();
          }finally {
              kafkaConsumer.close();
          }
      }
  }
}

這種實現方式和開啟多個消費進程的方式沒有本質的區別,優點是每個線程可以按照順序消費消費各個分區的消息。缺點是每個消費線程都要維護一個獨立的TCP連接,如果分區數和線程數都很多,那麼會造成不小的系統開銷。

(2)Kafka多線程消費第二種實現方式——–多個消費線程同時消費同一分區

多個線程同時消費同一分區,通過assign方法和seek方法實現。這樣就可以打破原有消費線程個數不能超過分區數的限制,進一步提高了消費的能力,但是這種方式對於位移提交和順序控制的處理就會變得非常複雜。實際生產中很少使用。

(3)第三種實現方式——-創建一個消費者,records的處理使用多線程實現

一般而言,消費者通過poll拉取數據的速度相當快,而整體消費能力的瓶頸也正式在消息處理這一塊。基於此

考慮第三種實現方式。

  import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThirdMutiConsumerThreadDemo {
  public static final String brokerList="node112:9092,node113:9092,node114:9092";
  public static final String topic = "topic-demo";
  public static final String groupId = "group.demo";

  public static Properties initConfig() {
      Properties prop = new Properties();
      prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
      prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
      prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
      prop.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
      prop.put(ConsumerConfig.CLIENT_DNS_LOOKUP_CONFIG, "consumer.client.di.demo");
      prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
      return prop;
  }

  public static void main(String[] args) {
      Properties prop = initConfig();
      KafkaConsumerThread consumerThread = new KafkaConsumerThread(prop, topic, Runtime.getRuntime().availableProcessors());
      consumerThread.start();
  }


  public static class KafkaConsumerThread extends Thread {
      private KafkaConsumer<String, String> kafkaConsumer;
      private ExecutorService executorService;
      private int threadNum;

      public KafkaConsumerThread(Properties prop, String topic, int threadNum) {
          this.kafkaConsumer = new KafkaConsumer<String, String>(prop);
          kafkaConsumer.subscribe(Arrays.asList(topic));
          this.threadNum = threadNum;
          executorService = new ThreadPoolExecutor(threadNum, threadNum, 0L, TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<>(1000), new ThreadPoolExecutor.CallerRunsPolicy());
      }

      @Override
      public void run() {
          try {
              while (true) {
                  ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
                  if (!records.isEmpty()) {
                      executorService.submit(new RecordHandler(records));
                  }
              }
          } catch (Exception e) {
              e.printStackTrace();
          }finally {
              kafkaConsumer.close();
          }
      }
  }

  public static class RecordHandler implements Runnable {
      public final ConsumerRecords<String,String> records;
      public RecordHandler(ConsumerRecords<String, String> records) {
          this.records = records;
      }
       
      @Override
      public void run() {
          //處理records
      }
  }
}

KafkaConsumerThread類對應一個消費者線程,裏面通過線程池的方式調用RecordHandler處理一批批的消息。其中線程池採用的拒絕策略為CallerRunsPolicy,當阻塞隊列填滿時,由調用線程處理該任務,以防止總體的消費能力跟不上poll拉取的速度。這種方式還可以進行橫向擴展,通過創建多個KafkaConsumerThread實例來進一步提升整體的消費能力。

這種方式還可以減少TCP連接的數量,但是對於消息的順序處理就變得困難了。這種方式需要引入一個共享變量Map<TopicPartition,OffsetAndMetadata> offsets參與消費者的偏移量提交。每一個RecordHandler類在處理完消息后都將對應的消費位移保存到共享變量offsets中,KafkaConsumerThread在每一次poll()方法之後都要進讀取offsets中的內容並對其進行提交。對於offsets的讀寫要採用加鎖處理,防止出現併發問題。並且在寫入offsets的時候需要注意位移覆蓋的問題。針對這個問題,可以將RecordHandler的run方法做如下改變:

  public void run() {
          for (TopicPartition tp : records.partitions()) {
              List<ConsumerRecord<String, String>> tpRecords = this.records.records(tp);
              //處理tpRecords
              long lastConsumedOffset = tpRecords.get(tpRecords.size() - 1).offset();
              synchronized (offsets) {
                  if (offsets.containsKey(tp)) {
                      offsets.put(tp, new OffsetAndMetadata(lastConsumedOffset + 1));
                  }else {
                      long positioin = offsets.get(tp).offset();
                      if(positioin < lastConsumedOffset + 1) {
                      offsets.put(tp, new OffsetAndMetadata(lastConsumedOffset + 1));
                      }
                  }
              }
          }
      }

對應的位移提交代碼也應該在KafkaConsumerThread的run方法中進行體現

  public void run() {
  try {
      while (true) {
          ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
          if (!records.isEmpty()) {
              executorService.submit(new RecordHandler(records));
              synchronized (offsets) {
                  if (!offsets.isEmpty()) {
                      kafkaConsumer.commitSync(offsets);
                      offsets.clear();
                    }
              }
          }
      }
  } catch (Exception e) {
      e.printStackTrace();
    }finally {
        kafkaConsumer.close();
      }
    }
}

其實這種方式並不完美,可能造成數據丟失。可以通過更為複雜的滑動窗口的方式進行改進。

1.2.10 消費者重要參數

  • fetch.min.bytes

    kafkaConsumer一次拉拉取請求的最小數據量。適當增加,會提高吞吐量,但會造成額外延遲。

  • fetch.max.bytes

    kafkaConsumer一次拉拉取請求的最大數據量,如果kafka一條消息的大小超過這個值,仍然是可以拉取的。

  • fetch.max.wait.ms

    一次拉取的最長等待時間,配合fetch.min.bytes使用

  • max.partiton.fetch.bytes

    每個分區里返回consumer的最大數據量。

  • max.poll.records

    一次拉取的最大消息數

  • connection.max.idle.ms

    多久之後關閉限制的連接

  • exclude.internal.topics

    這個參數用於設置kafka中的兩個內部主題能否被公開:consumer_offsets和transaction_state。如果設為true,可以使用Pattren訂閱內部主題,如果是false,則沒有這種限制。

  • receive.buffer.bytes

    socket接收緩衝區的大小

  • send.buffer.bytes

    socket發送緩衝區的大小

  • request.timeout.ms

    consumer等待請求響應的最長時間。

  • reconnect.backoff.ms

    重試連接指定主機的等待時間。

  • max.poll.interval.ms

    配置消費者等待拉取時間的最大值,如果超過這個期限,消費者組將剔除該消費者,進行再平衡。

  • auto.offset.reset

    自動偏移量重置

  • enable.auto.commit

    是否允許偏移量的自動提交

  • auto.commit.interval.ms

    自動偏移量提交的時間間隔

【精選推薦文章】

自行創業 缺乏曝光? 下一步"網站設計"幫您第一時間規劃公司的門面形象

網頁設計一頭霧水??該從何著手呢? 找到專業技術的網頁設計公司,幫您輕鬆架站!

評比前十大台北網頁設計台北網站設計公司知名案例作品心得分享

台北網頁設計公司這麼多,該如何挑選?? 網頁設計報價省錢懶人包"嚨底家"

Android-Handler消息機制實現原理

一、消息機制流程簡介

在應用啟動的時候,會執行程序的入口函數main(),main()裏面會創建一個Looper對象,然後通過這個Looper對象開啟一個死循環,這個循環的工作是,不斷的從消息隊列MessageQueue裏面取出消息即Message對象,並處理。然後看下面兩個問題:
循環拿到一個消息之後,如何處理?
是通過在Looper的循環里調用Handler的dispatchMessage()方法去處理的,而dispatchMessage()方法裏面會調用handleMessage()方法,handleMessage()就是平時使用Handler時重寫的方法,所以最終如何處理消息由使用Handler的開發者決定。
MessageQueue里的消息從哪來?
使用Handler的開發者通過調用sendMessage()方法將消息加入到MessageQueue裏面。

上面就是Android中消息機制的一個整體流程,也是 “Android中Handler,Looper,MessageQueue,Message有什麼關係?” 的答案。通過上面的流程可以發現Handler在消息機制中的地位,是作為輔助類或者工具類存在的,用來供開發者使用。

對於這個流程有兩個疑問:

  • Looper中是如何能調用到Handler的方法的?
  • Handler是如何能往MessageQueue中插入消息的?

這兩個問題會在後面給出答案,下面先來通過源碼,分析一下這個過程的具體細節:

二、消息機制的源碼分析

首先main()方法位於ActivityThread.java類裏面,這是一個隱藏類,源碼位置:frameworks/base/core/java/android/app/ActivityThread.java

public static void main(String[] args) {
    ......
    Looper.prepareMainLooper();

    ActivityThread thread = new ActivityThread();
    thread.attach(false);

    if (sMainThreadHandler == null) {
        sMainThreadHandler = thread.getHandler();
    }

    Looper.loop();

    throw new RuntimeException("Main thread loop unexpectedly exited");
}

Looper的創建可以通過Looper.prepare()來完成,上面的代碼中prepareMainLooper()是給主線程創建Looper使用的,本質也是調用的prepare()方法。創建Looper以後就可以調用Looper.loop()開啟循環了。main方法很簡單,不多說了,下面看看Looper被創建的時候做了什麼,下面是Looper的prepare()方法和變量sThreadLocal:

static final ThreadLocal<Looper> sThreadLocal = new ThreadLocal<Looper>();

private static void prepare(boolean quitAllowed) {
    if (sThreadLocal.get() != null) {
        throw new RuntimeException("Only one Looper may be created per thread");
    }
    sThreadLocal.set(new Looper(quitAllowed));
}

很簡單,new了一個Looper,並把new出來的Looper保存到ThreadLocal裏面。ThreadLocal是什麼?它是一個用來存儲數據的類,類似HashMap、ArrayList等集合類。它的特點是可以在指定的線程中存儲數據,然後取數據只能取到當前線程的數據,比如下面的代碼:

ThreadLocal<Integer> mThreadLocal = new ThreadLocal<>();
private void testMethod() {

    mThreadLocal.set(0);
    Log.d(TAG, "main  mThreadLocal=" + mThreadLocal.get());

    new Thread("Thread1") {
        @Override
        public void run() {
            mThreadLocal.set(1);
            Log.d(TAG, "Thread1  mThreadLocal=" + mThreadLocal.get());
        }
    }.start();

    new Thread("Thread2") {
        @Override
        public void run() {
            mThreadLocal.set(2);
            Log.d(TAG, "Thread1  mThreadLocal=" + mThreadLocal.get());
        }
    }.start();

    Log.d(TAG, "main  mThreadLocal=" + mThreadLocal.get());
}

輸出的log是

main  mThreadLocal=0
Thread1  mThreadLocal=1
Thread2  mThreadLocal=2
main  mThreadLocal=0

通過上面的例子可以清晰的看到ThreadLocal存取數據的特點,只能取到當前所在線程存的數據,如果所在線程沒存數據,取出來的就是null。其實這個效果可以通過HashMap<Thread, Object>來實現,考慮線程安全的話使用ConcurrentMap<Thread, Object>,不過使用Map會有一些麻煩的事要處理,比如當一個線程結束的時候我們如何刪除這個線程的對象副本呢?如果使用ThreadLocal就不用有這個擔心了,ThreadLocal保證每個線程都保持對其線程局部變量副本的隱式引用,只要線程是活動的並且 ThreadLocal 實例是可訪問的;在線程消失之後,其線程局部實例的所有副本都會被垃圾回收(除非存在對這些副本的其他引用)。更多ThreadLocal的講解參考:Android線程管理之ThreadLocal理解及應用場景

好了回到正題,prepare()創建Looper的時候同時把創建的Looper存儲到了ThreadLocal中,通過對ThreadLocal的介紹,獲取Looper對象就很簡單了,sThreadLocal.get()即可,源碼提供了一個public的靜態方法可以在主線程的任何地方獲取這個主線程的Looper(注意一下方法名myLooper(),多個地方會用到):

public static @Nullable Looper myLooper() {
    return sThreadLocal.get();
}

Looper創建完了,接下來開啟循環,loop方法的關鍵代碼如下:

public static void loop() {
    final Looper me = myLooper();
    if (me == null) {
        throw new RuntimeException("No Looper; Looper.prepare() wasn't called on this thread.");
    }
    final MessageQueue queue = me.mQueue;

    for (;;) {
        Message msg = queue.next(); // might block
        if (msg == null) {
            // No message indicates that the message queue is quitting.
            return;
        }

        try {
            msg.target.dispatchMessage(msg);
        } finally {
            if (traceTag != 0) {
                Trace.traceEnd(traceTag);
            }
        }

        msg.recycleUnchecked();
    }
}

上面的代碼,首先獲取主線程的Looper對象,然後取得Looper中的消息隊列final MessageQueue queue = me.mQueue;,然後下面是一個死循環,不斷的從消息隊列里取消息Message msg = queue.next();,可以看到取出的消息是一個Message對象,如果消息隊列里沒有消息,就會阻塞在這行代碼,等到有消息來的時候會被喚醒。取到消息以後,通過msg.target.dispatchMessage(msg);來處理消息,msg.target 是一個Handler對象,所以這個時候就調用到我們重寫的Hander的handleMessage()方法了。
msg.target 是在什麼時候被賦值的呢?要找到這個答案很容易,msg.target是被封裝在消息裏面的,肯定要從發送消息那裡開始找,看看Message是如何封裝的。那麼就從Handler的sendMessage(msg)方法開始,過程如下:

public final boolean sendMessage(Message msg) {
    return sendMessageDelayed(msg, 0);
}

public final boolean sendMessageDelayed(Message msg, long delayMillis) {
    if (delayMillis < 0) {
        delayMillis = 0;
    }
    return sendMessageAtTime(msg, SystemClock.uptimeMillis() + delayMillis);
}

public boolean sendMessageAtTime(Message msg, long uptimeMillis) {
    MessageQueue queue = mQueue;
    if (queue == null) {
        RuntimeException e = new RuntimeException(
                this + " sendMessageAtTime() called with no mQueue");
        Log.w("Looper", e.getMessage(), e);
        return false;
    }
    return enqueueMessage(queue, msg, uptimeMillis);
}

private boolean enqueueMessage(MessageQueue queue, Message msg, long uptimeMillis) {
    msg.target = this;
    if (mAsynchronous) {
        msg.setAsynchronous(true);
    }
    return queue.enqueueMessage(msg, uptimeMillis);
}

可以看到最後的enqueueMessage()方法中msg.target = this;,這裏就把發送消息的handler封裝到了消息中。同時可以看到,發送消息其實就是往MessageQueue裏面插入了一條消息,然後Looper裏面的循環就可以處理消息了。Handler裏面的消息隊列是怎麼來的呢?從上面的代碼可以看到enqueueMessage()裏面的queue是從sendMessageAtTime傳來的,也就是mQueue。然後看mQueue是在哪初始化的,看Handler的構造方法如下:

public Handler(Callback callback, boolean async) {
    if (FIND_POTENTIAL_LEAKS) {
        final Class<? extends Handler> klass = getClass();
        if ((klass.isAnonymousClass() || klass.isMemberClass() || klass.isLocalClass()) &&
                (klass.getModifiers() & Modifier.STATIC) == 0) {
            Log.w(TAG, "The following Handler class should be static or leaks might occur: " +
                klass.getCanonicalName());
        }
    }

    mLooper = Looper.myLooper();
    if (mLooper == null) {
        throw new RuntimeException(
            "Can't create handler inside thread that has not called Looper.prepare()");
    }
    mQueue = mLooper.mQueue;
    mCallback = callback;
    mAsynchronous = async;
}

mQueue的初始化很簡單,首先取得Handler所在線程的Looper,然後取出Looper中的mQueue。這也是Handler為什麼必須在有Looper的線程中才能使用的原因,拿到mQueue就可以很容易的往Looper的消息隊列里插入消息了(配合Looper的循環+阻塞就實現了發送接收消息的效果)。

以上就是主線程中消息機制的原理。

那麼,在任何線程下使用handler的如下做法的原因、原理、內部流程等就非常清晰了:

new Thread() {
    @Override
    public void run() {
        Looper.prepare();
        Handler handler = new Handler();
        Looper.loop();
    }
}.start();
  1. 首先Looper.prepare()創建Looper並初始化Looper持有的消息隊列MessageQueue,創建好后將Looper保存到ThreadLocal中方便Handler直接獲取。
  2. 然後Looper.loop()開啟循環,從MessageQueue裏面取消息並調用handler的 dispatchMessage(msg) 方法處理消息。如果MessageQueue里沒有消息,循環就會阻塞進入休眠狀態,等有消息的時候被喚醒處理消息。
  3. 再然後我們new Handler()的時候,Handler構造方法中獲取Looper並且拿到Looper的MessageQueue對象。然後Handler內部就可以直接往MessageQueue裏面插入消息了,插入消息即發送消息,這時候有消息了就會喚醒Looper循環去處理消息。處理消息就是調用dispatchMessage(msg) 方法,最終調用到我們重寫的Handler的handleMessage()方法。

三、通過一些問題的研究加強對消息機制的理解

源碼分析完了,下面看一下文章開頭的兩個問題:

  • Looper中是如何能調用到Handler的方法的?
  • Handler是如何能往MessageQueue中插入消息的?

這兩個問題源碼分析中已經給出答案,這裏做一下總結,首先搞清楚以下對象在消息機制中的關係:

Looper,MessageQueue,Message,ThreadLocal,Handler
  1. Looper對象有一個成員MessageQueue,MessageQueue是一個消息隊列,用來存儲消息Message
  2. Message消息中帶有一個handler對象,所以Looper取出消息后,可以很方便的調用到Handler的方法(問題1解決)
  3. Message是如何帶有handler對象的?是handler在發送消息的時候把自己封裝到消息里的。
  4. Handler是如何發送消息的?是通過獲取Looper對象從而取得Looper裏面的MessageQueue,然後Handler就可以直接往MessageQueue裏面插入消息了。(問題2解決)
  5. Handler是如何獲取Looper對象的?Looper在創建的時候同時把自己保存到ThreadLocal中,並提供一個public的靜態方法可以從ThreadLocal中取出Looper,所以Handler的構造方法里可以直接調用靜態方法取得Looper對象。

帶着上面的一系列問題看源碼就很清晰了,下面是知乎上的一個問答:

Android中為什麼主線程不會因為Looper.loop()里的死循環卡死?

原因很簡單,循環里有阻塞,所以死循環並不會一直執行,相反的,大部分時間是沒有消息的,所以主線程大多數時候都是處於休眠狀態,也就不會消耗太多的CPU資源導致卡死。

  1. 阻塞的原理是使用Linux的管道機制實現的
  2. 主線程沒有消息處理時阻塞在管道的讀端
  3. binder線程會往主線程消息隊列里添加消息,然後往管道寫端寫一個字節,這樣就能喚醒主線程從管道讀端返回,也就是說looper循環里queue.next()會調用返回…

這裏說到binder線程,具體的實現細節不必深究,考慮下面的問題:
主線程的死循環如何處理其它事務?
首先需要看懂這個問題,主線程進入Looper死循環后,如何處理其他事務,比如activity的各個生命周期的回調函數是如何被執行到的(注意這裡是在同一個線程下,代碼是按順序執行的,如果在死循環這阻塞了,那麼進入死循環后循環以外的代碼是如何執行的)。
首先再看main函數的源碼

Looper.prepareMainLooper();

ActivityThread thread = new ActivityThread();
thread.attach(false);

if (sMainThreadHandler == null) {
    sMainThreadHandler = thread.getHandler();
}

Looper.loop();

在Looper.prepare和Looper.loop之間new了一個ActivityThread並調用了它的attach方法,這個方法就是開啟binder線程的,另外new ActivityThread()的時候同時會初始化它的一個H類型的成員,H是一個繼承了Handler的類。此時的結果就是:在主線程開啟loop死循環之前,已經啟動binder線程,並且準備好了一個名為H的Handler,那麼接下來在主線程死循環之外做一些事務處理就很簡單了,只需要通過binder線程向H發送消息即可,比如發送 H.LAUNCH_ACTIVITY 消息就是通知主線程調用Activity.onCreate() ,當然不是直接調用,H收到消息後會進行一系列複雜的函數調用最終調用到Activity.onCreate()。
至於誰來控制binder線程來向H發消息就不深入研究了,下面是《Android開發藝術探索》裏面的一段話:

ActivityThread 通過 ApplicationThread 和 AMS 進行進程間通訊,AMS 以進程間通信的方式完成 ActivityThread 的請求後會回調 ApplicationThread 中的 Binder 方法,然後 ApplicationThread 會向 H 發送消息,H 收到消息後會將 ApplicationThread 中的邏輯切換到 ActivityThread 中去執行,即切換到主線程中去執行,這個過程就是主線程的消息循環模型。

這個問題就到這裏,更多內容看知乎原文

最後

和其他系統相同,Android應用程序也是依靠消息驅動來工作的。網上的這句話還是很有道理的。

文章參考:

《Android開發藝術探索》
Android中為什麼主線程不會因為Looper.loop()里的死循環卡死?
Android線程管理之ThreadLocal理解及應用場景
Android 消息機制——你真的了解Handler
Android Handler到底是什麼

【精選推薦文章】

智慧手機時代的來臨,RWD網頁設計已成為網頁設計推薦首選

想知道網站建置、網站改版該如何進行嗎?將由專業工程師為您規劃客製化網頁設計及後台網頁設計

帶您來看台北網站建置台北網頁設計,各種案例分享

廣告預算用在刀口上,網站設計公司幫您達到更多曝光效益

C#中多線程中變量研究

今天在知乎上看到一個問題【為什麼在同一進程中創建不同線程,但線程各自的變量無法在線程間互相訪問?】。在多線程中,每個線程都是獨立運行的,不同的線程有可能是同一段代碼,但不會是同一作用域,所以不會共享。而共享內存,並沒有作用域之分,同一進程內,不管什麼線程都可以通過同一虛擬內存地址來訪問,不同進程也可以通過ipc等方式共享內存數據。全局變量:任何線程都可以訪問;局部變量(棧變量):任何線程執行到該函數時均可訪問,函數外不可訪問;線程變量:每個線程只能訪問自己的那個拷貝,其他線程不可見。今天就用C#來實現同一段代碼的不同線程,全局變量、局部變量、線程變量。

了解進程與線程

什麼是多任務,簡單來說就是操作系統同時可以運行多個任務。例如:一遍聽歌,一遍寫文檔等。多核CPU可以執行多任務,但是單核CPU也可以執行多任務,CPU是順序執行的,操作系統讓任務輪流執行,例如:聽歌執行一次,停頓0.01s,寫文檔執行一次,停頓0.01s等等。由於CPU的執行速度很快,我們感覺就像所有的任務都是同時執行。對操作系統來說,一個任務就是一個進程,一個進程至少有一個線程。進程是資源分配的最小單位,線程是CPU調度的最小單位。

普通的程序寫法

private static List<int> data = Enumerable.Range(1, 1000).ToList();

public static void SimpleTest()
{
    for (int i = 0; i < 10; i++)
    {
        List<int> tempData = new List<int>();
        foreach (var d in data)
        {
            tempData.Add(d);
        }
        Console.WriteLine($"i:{i},合計:{data.Sum()},是否相等:{data.Sum() == tempData.Sum()}");
    }

    Console.WriteLine("單線程運行結束");
}

多線程寫法

private static List<int> data = Enumerable.Range(1, 1000).ToList();

public static async Task MoreTaskTestAsync()
{
    List<Task> tasks = new List<Task>();
    for (int i = 0; i < 10; i++)
    {
        var tempi = i;
        var t = Task.Run(() =>
        {
            List<int> tempData = new List<int>();
            foreach (var d in data)
            {
                tempData.Add(d);
            }
            Console.WriteLine($"i:{tempi},合計:{data.Sum()},是否相等:{data.Sum() == tempData.Sum()}");
        });
        tasks.Add(t);
    }

    await Task.WhenAll(tasks); //或者Task.WaitAll(tasks.ToArray());
    Console.WriteLine("多線程運行結束");
}

不同的線程同一段代碼,但不會是同一作用域,所以tempData數據沒有互相影響。

全局變量:data,多個線程都可以訪問,list只讀的時候是線性安全
局部變量:i就是局部變量,訪問的線程可以訪問,去掉【var tempi = i;】,運行結果打印出來,值都是一樣的,增加的都是每個線程都訪問單獨的tempi變量

i:10,合計:500500,是否相等:True
i:10,合計:500500,是否相等:True
i:10,合計:500500,是否相等:True
i:10,合計:500500,是否相等:True
i:10,合計:500500,是否相等:True
i:10,合計:500500,是否相等:True
i:10,合計:500500,是否相等:True
i:10,合計:500500,是否相等:True
i:10,合計:500500,是否相等:True
i:10,合計:500500,是否相等:True

線程變量:tempData,每個線程只訪問自己的,互不影響,運行結果

i:3,合計:500500,是否相等:True
i:6,合計:500500,是否相等:True
i:0,合計:500500,是否相等:True
i:1,合計:500500,是否相等:True
i:4,合計:500500,是否相等:True
i:2,合計:500500,是否相等:True
i:7,合計:500500,是否相等:True
i:5,合計:500500,是否相等:True
i:8,合計:500500,是否相等:True
i:9,合計:500500,是否相等:True

寫多線程的時候需要注意,變量的作用域,否則程序運行出來的結果將不會是想要的結果,注意,注意變量作用域。

【精選推薦文章】

自行創業 缺乏曝光? 下一步"網站設計"幫您第一時間規劃公司的門面形象

網頁設計一頭霧水??該從何著手呢? 找到專業技術的網頁設計公司,幫您輕鬆架站!

評比前十大台北網頁設計台北網站設計公司知名案例作品心得分享

台北網頁設計公司這麼多,該如何挑選?? 網頁設計報價省錢懶人包"嚨底家"

MySQL數據庫詳解(三)MySQL的事務隔離剖析

提到事務,你肯定不陌生,和數據庫打交道的時候,我們總是會用到事務。最經典的例子就是轉賬,你要給朋友小王轉 100 塊錢,而此時你的銀行卡只有 100 塊錢。

轉賬過程具體到程序里會有一系列的操作,比如查詢餘額、做加減法、更新餘額等,這些操作必須保證是一體的,不然等程序查完之後,還沒做減法之前,你這 100 塊錢,完全可以藉著這個時間差再查一次,然後再給另外一個朋友轉賬,如果銀行這麼整,不就亂了么?這時就要用到“事務”這個概念了。

簡單來說,事務就是要保證一組數據庫操作,要麼全部成功,要麼全部失敗。在 MySQL 中,事務支持是在引擎層實現的。你現在知道,MySQL 是一個支持多引擎的系統,但並不是所有的引擎都支持事務。比如 MySQL 原生的 MyISAM 引擎就不支持事務,這也是 MyISAM 被InnoDB 取代的重要原因之一。

今天的文章里,我將會以 InnoDB 為例,剖析 MySQL 在事務支持方面的特定實現,並基於原理給出相應的實踐建議,希望這些案例能加深你對 MySQL 事務原理的理解。

隔離性與隔離級別

提到事務,你肯定會想到 ACID(Atomicity、Consistency、Isolation、Durability,即原子性、一致性、隔離性、持久性),今天我們就來說說其中 I,也就是“隔離性”。

當數據庫上有多個事務同時執行的時候,就可能出現臟讀(dirty read)、不可重複讀(non reapeatable read)、幻讀(phantom read)的問題,為了解決這些問題,就有了“隔離級別”的概念。

在談隔離級別之前,你首先要知道,你隔離得越嚴實,效率就會越低。因此很多時候,我們都要在二者之間尋找一個平衡點。SQL 標準的事務隔離級別包括:讀未提交(read uncommitted)、讀提交(read committed)、可重複讀(repeatable read)和串行化serializable )。

下面我逐一為你解釋:

  • 讀未提交是指,一個事務還沒提交時,它做的變更就能被別的事務看到。
  • 讀提交是指,一個事務提交之後,它做的變更才會被其他事務看到。
  • 可重複讀是指,一個事務執行過程中看到的數據,總是跟這個事務在啟動時看到的數據是一致的。當然在可重複讀隔離級別下,未提交變更對其他事務也是不可見的。
  • 串行化,顧名思義是對於同一行記錄,“寫”會加“寫鎖”,“讀”會加“讀鎖”。當出現讀寫鎖衝突的時候,后訪問的事務必須等前一個事務執行完成,才能繼續執行。
  • 其中“讀提交”和“可重複讀”比較難理解,所以我用一個例子說明這幾種隔離級別。假設數據表 T 中只有一列,其中一行的值為 1,下面是按照時間順序執行兩個事務的行為。
mysql> create table T(c int) engine=InnoDB;
insert into T(c) values(1);

我們來看看在不同的隔離級別下,事務 A 會有哪些不同的返回結果,也就是圖裡面 V1、V2、V3 的返回值分別是什麼。

若隔離級別是“讀未提交”, 則 V1 的值就是 2。這時候事務 B 雖然還沒有提交,但是結果已經被 A 看到了。因此,V2、V3 也都是 2。

若隔離級別是“讀提交”,則 V1 是 1,V2 的值是 2。事務 B 的更新在提交后才能被 A 看到。所以, V3 的值也是 2。

若隔離級別是“可重複讀”,則 V1、V2 是 1,V3 是 2。之所以 V2 還是 1,遵循的就是這個要求:事務在執行期間看到的數據前後必須是一致的。

若隔離級別是“串行化”,則在事務 B 執行“將 1 改成 2”的時候,會被鎖住。直到事務 A提交后,事務 B 才可以繼續執行。所以從 A 的角度看, V1、V2 值是 1,V3 的值是 2。

在實現上,數據庫裏面會創建一個視圖,訪問的時候以視圖的邏輯結果為準。在“可重複讀”隔離級別下,這個視圖是在事務啟動時創建的,整個事務存在期間都用這個視圖。在“讀提交”隔離級別下,這個視圖是在每個 SQL 語句開始執行的時候創建的。這裏需要注意的是,“讀未提交”隔離級別下直接返回記錄上的最新值,沒有視圖概念;而“串行化”隔離級別下直接用加鎖的方式來避免并行訪問。

我們可以看到在不同的隔離級別下,數據庫行為是有所不同的。Oracle 數據庫的默認隔離級別其實就是“讀提交”,因此對於一些從 Oracle 遷移到 MySQL 的應用,為保證數據庫隔離級別的一致,你一定要記得將 MySQL 的隔離級別設置為“讀提交”。

配置的方式是,將啟動參數 transaction-isolation 的值設置成 READ-COMMITTED。你可以用show variables 來查看當前的值。

mysql> show variables like 'transaction_isolation';
+-----------------------+----------------+
| Variable_name | Value |
+-----------------------+----------------+
| transaction_isolation | READ-COMMITTED |
+-----------------------+----------------+

總結來說,存在即合理,哪個隔離級別都有它自己的使用場景,你要根據自己的業務情況來定。我想你可能會問那什麼時候需要“可重複讀”的場景呢?我們來看一個數據校對邏輯的案例。

假設你在管理一個個人銀行賬戶表。一個表存了每個月月底的餘額,一個表存了賬單明細。這時候你要做數據校對,也就是判斷上個月的餘額和當前餘額的差額,是否與本月的賬單明細一致。

你一定希望在校對過程中,即使有用戶發生了一筆新的交易,也不影響你的校對結果。這時候使用“可重複讀”隔離級別就很方便。事務啟動時的視圖可以認為是靜態的,不受其他事務更新的影響。

事務隔離的實現

理解了事務的隔離級別,我們再來看看事務隔離具體是怎麼實現的。這裏我們展開說明“可重複讀”。

在 MySQL 中,實際上每條記錄在更新的時候都會同時記錄一條回滾操作。記錄上的最新值,通過回滾操作,都可以得到前一個狀態的值。

假設一個值從 1 被按順序改成了 2、3、4,在回滾日誌裏面就會有類似下面的記錄。

當前值是 4,但是在查詢這條記錄的時候,不同時刻啟動的事務會有不同的 read-view。如圖中看到的,在視圖 A、B、C 裏面,這一個記錄的值分別是 1、2、4,同一條記錄在系統中可以存在多個版本,就是數據庫的多版本併發控制(MVCC)。對於 read-view A,要得到 1,就必須將當前值依次執行圖中所有的回滾操作得到。

同時你會發現,即使現在有另外一個事務正在將 4 改成 5,這個事務跟 read-view A、B、C 對應的事務是不會衝突的。

你一定會問,回滾日誌總不能一直保留吧,什麼時候刪除呢?答案是,在不需要的時候才刪除。也就是說,系統會判斷,當沒有事務再需要用到這些回滾日誌時,回滾日誌會被刪除。

什麼時候才不需要了呢?就是當系統里沒有比這個回滾日誌更早的 read-view 的時候。基於上面的說明,我們來討論一下為什麼建議你盡量不要使用長事務。

長事務意味着系統裏面會存在很老的事務視圖。由於這些事務隨時可能訪問數據庫裏面的任何數據,所以這個事務提交之前,數據庫裏面它可能用到的回滾記錄都必須保留,這就會導致大量佔用存儲空間。

在 MySQL 5.5 及以前的版本,回滾日誌是跟數據字典一起放在 ibdata 文件里的,即使長事務最終提交,回滾段被清理,文件也不會變小。我見過數據只有 20GB,而回滾段有 200GB 的庫。最終只好為了清理回滾段,重建整個庫。

除了對回滾段的影響,長事務還佔用鎖資源,也可能拖垮整個庫,這個我們會在後面講鎖的時候展開。

事務的啟動方式

如前面所述,長事務有這些潛在風險,我當然是建議你盡量避免。其實很多時候業務開發同學並不是有意使用長事務,通常是由於誤用所致。MySQL 的事務啟動方式有以下幾種:

  1. 顯式啟動事務語句, begin 或 start transaction。配套的提交語句是 commit,回滾語句
    是 rollback。
  2. set autocommit=0,這個命令會將這個線程的自動提交關掉。意味着如果你只執行一個select 語句,這個事務就啟動了,而且並不會自動提交。這個事務持續存在直到你主動執行commit 或 rollback 語句,或者斷開連接。

有些客戶端連接框架會默認連接成功后先執行一個 set autocommit=0 的命令。這就導致接下來的查詢都在事務中,如果是長連接,就導致了意外的長事務。

因此,我會建議你總是使用 set autocommit=1, 通過顯式語句的方式來啟動事務。

但是有的開發同學會糾結“多一次交互”的問題。對於一個需要頻繁使用事務的業務,第二種方式每個事務在開始時都不需要主動執行一次 “begin”,減少了語句的交互次數。如果你也有這個顧慮,我建議你使用 commit work and chain 語法。

在 autocommit 為 1 的情況下,用 begin 顯式啟動的事務,如果執行 commit 則提交事務。如果執行 commit work and chain,則是提交事務並自動啟動下一個事務,這樣也省去了再次執行 begin 語句的開銷。同時帶來的好處是從程序開發的角度明確地知道每個語句是否處於事務中。

你可以在 information_schema 庫的 innodb_trx 這個表中查詢長事務,比如下面這個語句,用於查找持續時間超過 60s 的事務。

select * from information_schema.innodb_trx where TIME_TO_SEC(timediff(now(),trx_started))>60

小結

這篇文章裏面,我介紹了 MySQL 的事務隔離級別的現象和實現,根據實現原理分析了長事務存在的風險,以及如何用正確的方式避免長事務。希望我舉的例子能夠幫助你理解事務,並更好地使用 MySQL 的事務特性。

我給你留一個問題吧。你現在知道了系統裏面應該避免長事務,如果你是業務開發負責人同時也是數據庫負責人,你會有什麼方案來避免出現或者處理這種情況呢?

【精選推薦文章】

智慧手機時代的來臨,RWD網頁設計已成為網頁設計推薦首選

想知道網站建置、網站改版該如何進行嗎?將由專業工程師為您規劃客製化網頁設計及後台網頁設計

帶您來看台北網站建置台北網頁設計,各種案例分享

廣告預算用在刀口上,網站設計公司幫您達到更多曝光效益