[源碼解析] Flink的groupBy和reduce究竟做了什麼

2{icon} {views}

[源碼解析] Flink的groupBy和reduce究竟做了什麼

目錄

  • [源碼解析] Flink的groupBy和reduce究竟做了什麼
    • 0x00 摘要
    • 0x01 問題和概括
      • 1.1 問題
      • 1.2 概括
    • 0x02 背景概念
      • 2.1 MapReduce細分
      • 2.2 MapReduce細分
      • 2.3 Combine
      • 2.4 Partition
      • 2.5 Shuffle
      • 2.6 Reducer
    • 0x03 代碼
    • 0x04 從Flink JAVA API入手挖掘
      • 4.1 GroupBy是個輔助概念
        • 4.1.1 Grouping
        • 4.1.2 UnsortedGrouping
      • 4.2 reduce才是算子
    • 0x05 批處理執行計劃(Plan)
    • 0x06 批處理優化計劃(Optimized Plan)
    • 0x07 JobGraph
    • 0x08 Runtime
      • 8.1 FlatMap
        • 8.1.1 Combine
        • 8.1.2 Partition
      • 8.2 UnilateralSortMerger
        • 8.2.1 三種線程
        • 8.2.2 MutableObjectIterator
      • 8.3 ReduceDriver
    • 0x09 參考

0x00 摘要

Groupby和reduce是大數據領域常見的算子,但是很多同學應該對其背後機制不甚了解。本文將從源碼入手,為大家解析Flink中Groupby和reduce的原理,看看他們在背後做了什麼。

0x01 問題和概括

1.1 問題

探究的原因是想到了幾個問題 :

  • groupby的算子會對數據進行排序嘛。
  • groupby和reduce過程中究竟有幾次排序。
  • 如果有多個groupby task,什麼機制保證所有這些grouby task的輸出中,同樣的key都分配給同一個reducer。
  • groupby和reduce時候,有沒有Rebalance 重新分配。
  • reduce算子會不會重新劃分task。
  • reduce算子有沒有可能和前後的其他算子組成Operator Chain。

1.2 概括

為了便於大家理解,我們先總結下,對於一個Groupby + Reduce的操作,Flink做了如下處理:

  • Group其實沒有真實對應的算子,它只是在在reduce過程之前的一个中間步驟或者輔助步驟。
  • 在Flink生成批處理執行計劃后,有意義的結果是Reduce算子。
  • 為了更好的reduce,Flink在reduce之前大量使用了Combine操作。Combine可以理解為是在map端的reduce的操作,對單個map任務的輸出結果數據進行合併的操作。
  • 在Flink生成批處理優化計劃(Optimized Plan)之後,會把reduce分割成兩段,一段是SORTED_PARTIAL_REDUCE,一段是SORTED_REDUCE。
  • SORTED_PARTIAL_REDUCE就是Combine。
  • Flink生成JobGraph之後,Flink形成了一個Operator Chain:Reduce(SORTED_PARTIAL_REDUCE)和其上游合併在一起。
  • Flink用Partitioner來保證多個 grouby task 的輸出中同樣的key都分配給同一個reducer。
  • groupby和reduce過程中至少有三次排序:
    • combine
    • sort + merge
    • reduce

這樣之前的疑問就基本得到了解釋。

0x02 背景概念

2.1 MapReduce細分

MapReduce是一種編程模型,用於大規模數據集的并行運算。概念 “Map(映射)”和”Reduce(歸約)” 是它們的主要思想,其是從函數式編程語言,矢量編程語言里借來的特性。

我們目前使用的Flink,Spark都出自於MapReduce,所以我們有必有追根溯源,看看MapReduce是如何區分各個階段的。

2.2 MapReduce細分

如果把MapReduce細分,可以分為一下幾大過程:

  • Input-Split(輸入分片):此過程是將從HDFS上讀取的文件分片,然後送給Map端。有多少分片就有多少Mapper,一般分片的大小和HDFS中的塊大小一致。
  • Shuffle-Spill(溢寫):每個Map任務都有一個環形緩衝區。一旦緩衝區達到閾值80%,一個後台線程便開始把內容“溢寫”-“spill”到磁盤。在溢寫過程中,map將繼續輸出到剩餘的20%空間中,互不影響,如果緩衝區被填滿map會被堵塞直到寫磁盤完成。
  • Shuffle-Partition(分區):由於每個Map可能處理的數據量不同,所以到達reduce有可能會導致數據傾斜。分區可以幫助我們解決這一問題,在shuffle過程中會按照默認key的哈希碼對分區數量取余,reduce便根據分區號來拉取對應的數據,達到數據均衡。分區數量對應Reduce個數。
  • Shuffle-Sort(排序):在分區后,會對此分區的數據進行內排序,排序過程會穿插在整個MapReduce中,在很多地方都存在。
  • Shuffle-Group(分組):分組過程會把key相同的value分配到一個組中,wordcount程序就利用了分組這一過程。
  • Shuffle-Combiner(組合):這一過程我們可以理解為一個小的Reduce階段,當數據量大的時候可以在map過程中執行一次combine,這樣就相當於在map階段執行了一次reduce。由於reduce和map在不同的節點上運行,所以reduce需要遠程拉取數據,combine就可以有效降低reduce拉取數據的量,減少網絡負荷(這一過程默認是不開啟的,在如求平均值的mapreduce程序中不要使用combine,因為會影響結果)。
  • Compress(壓縮):在緩衝區溢寫磁盤的時候,可以對數據進行壓縮,節約磁盤空間,同樣減少給reducer傳遞的數據量。
  • Reduce-Merge(合併):reduce端會拉取各個map輸出結果對應的分區文件,這樣reduce端就會有很多文件,所以在此階段,reduce再次將它們合併/排序再送入reduce執行。
  • Output(輸出):在reduce階段,對已排序輸出中的每個鍵調用reduce函數。此階段的輸出直接寫到輸出文件系統,一般為HDFS。

2.3 Combine

Combine是我們需要特殊注意的。在mapreduce中,map多,reduce少。在reduce中由於數據量比較多,所以我們乾脆在map階段中先把自己map裏面的數據歸類,這樣到了reduce的時候就減輕了壓力。

Combine可以理解為是在map端的reduce的操作,對單個map任務的輸出結果數據進行合併的操作。combine是對一個map的,而reduce合併的對象是對於多個map

map函數操作所產生的鍵值對會作為combine函數的輸入,經combine函數處理后再送到reduce函數進行處理,減少了寫入磁盤的數據量,同時也減少了網絡中鍵值對的傳輸量。在Map端,用戶自定義實現的Combine優化機制類Combiner在執行Map端任務的節點本身運行,相當於對map函數的輸出做了一次reduce。

集群上的可用帶寬往往是有限的,產生的中間臨時數據量很大時就會出現性能瓶頸,因此應該盡量避免Map端任務和Reduce端任務之間大量的數據傳輸。使用Combine機制的意義就在於使Map端輸出更緊湊,使得寫到本地磁盤和傳給Reduce端的數據更少。

2.4 Partition

Partition是分割map每個節點的結果,按照key分別映射給不同的reduce,mapreduce使用哈希HashPartitioner幫我們歸類了。這個我們也可以自定義。

這裏其實可以理解歸類。我們對於錯綜複雜的數據歸類。比如在動物園裡有牛羊雞鴨鵝,他們都是混在一起的,但是到了晚上他們就各自牛回牛棚,羊回羊圈,雞回雞窩。partition的作用就是把這些數據歸類。只不過是在寫程序的時候,

在經過mapper的運行后,我們得知mapper的輸出是這樣一個key/value對: key是“aaa”, value是數值1。因為當前map端只做加1的操作,在reduce task里才去合併結果集。假如我們知道這個job有3個reduce task,到底當前的“aaa”應該交由哪個reduce task去做呢,是需要立刻決定的。

MapReduce提供Partitioner接口,它的作用就是根據key或value及reduce task的數量來決定當前的這對輸出數據最終應該交由哪個reduce task處理。默認對key hash后再以reduce task數量取模。默認的取模方式只是為了平均reduce的處理能力,如果用戶自己對Partitioner有需求,可以訂製並設置到job上。

在我們的例子中,假定 “aaa”經過Partitioner后返回0,也就是這對值應當交由第一個reducer來處理。

2.5 Shuffle

shuffle就是map和reduce之間的過程,包含了兩端的combine和partition。它比較難以理解,因為我們摸不着,看不到它。它屬於mapreduce的框架,編程的時候,我們用不到它。

Shuffle的大致範圍就是:怎樣把map task的輸出結果有效地傳送到reduce端。也可以這樣理解, Shuffle描述着數據從map task輸出到reduce task輸入的這段過程。

2.6 Reducer

簡單地說,reduce task在執行之前的工作就是不斷地拉取當前job里每個map task的最終結果,然後對從不同地方拉取過來的數據不斷地做merge,最終形成一個文件作為reduce task的輸入文件。

0x03 代碼

我們以Flink的KMeans算法作為樣例,具體摘要如下:

public class WordCountExampleReduce {

    DataStream ds;

    public static void main(String[] args) throws Exception {
        //構建環境
        final ExecutionEnvironment env =
                ExecutionEnvironment.getExecutionEnvironment();
        //通過字符串構建數據集
        DataSet<String> text = env.fromElements(
                "Who‘s there?",
                "I think I hear them. Stand, ho! Who‘s there?");
        //分割字符串、按照key進行分組、統計相同的key個數
        DataSet<Tuple2<String, Integer>> wordCounts = text
                .flatMap(new LineSplitter())
                .groupBy(0)
                .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1,
                                          Tuple2<String, Integer> value2) throws Exception {
                        return new Tuple2(value1.f0, value1.f1 + value2.f1);
                    }
                });
        //打印
        wordCounts.print();
    }
    //分割字符串的方法
    public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
            for (String word : line.split(" ")) {
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }
}

輸出是:

(hear,1)
(ho!,1)
(them.,1)
(I,2)
(Stand,,1)
(Who‘s,2)
(there?,2)
(think,1)

0x04 從Flink JAVA API入手挖掘

首先,我們從Flink基本JAVA API來入手開始挖掘。

4.1 GroupBy是個輔助概念

4.1.1 Grouping

我們需要留意的是:GroupBy並沒有對應的Operator。GroupBy只是生成DataSet轉換的一个中間步驟或者輔助步驟

GroupBy功能的基類是Grouping,其只是DataSet轉換的一个中間步驟。其幾個主要成員是:

  • 對應的輸入數據DataSet
  • 分組所基於的keys
  • 用戶自定義的Partitioner
// Grouping is an intermediate step for a transformation on a grouped DataSet.
public abstract class Grouping<T> {
   protected final DataSet<T> inputDataSet;
   protected final Keys<T> keys;
   protected Partitioner<?> customPartitioner;
}

Grouping並沒有任何業務相關的API,具體API都是在其派生類中,比如UnsortedGrouping。

4.1.2 UnsortedGrouping

我們代碼中對應的就是UnsortedGrouping類。我們看到它提供了很多業務API,比如:sum,max,min,reduce,aggregate,reduceGroup,combineGroup…..

回到我們的示例,groupBy做了如下操作

  • 首先,groupBy返回的就是一個UnsortedGrouping,這個UnsortedGrouping是用來轉換DataSet。
  • 其次,.groupBy(0).reduce(new CentroidAccumulator()) 返回的是ReduceOperator。這就對應了前面我們提到的,groupBy只是中間步驟,reduce才能返回一個Operator
public class UnsortedGrouping<T> extends Grouping<T> {
  
    // groupBy返回一個UnsortedGrouping
    public UnsortedGrouping<T> groupBy(int... fields) {
       return new UnsortedGrouping<>(this, new Keys.ExpressionKeys<>(fields, getType()));
    }
  
    // reduce返回一個ReduceOperator
 		public ReduceOperator<T> reduce(ReduceFunction<T> reducer) {
      return new ReduceOperator<T>(this, inputDataSet.clean(reducer), Utils.getCallLocationName());
    } 
}

4.2 reduce才是算子

對於業務來說,reduce才是真正有意義的邏輯算子。

從前文的函數調用和ReduceOperator定義可以看出,.groupBy(0).reduce() 的調用結果是生成一個ReduceOperator,而 UnsortedGrouping 被設置為 ReduceOperator 的 grouper 成員變量,作為輔助操作

public class ReduceOperator<IN> extends SingleInputUdfOperator<IN, IN, ReduceOperator<IN>> {
  
	private final ReduceFunction<IN> function;
	private final Grouping<IN> grouper; // UnsortedGrouping被設置在這裏,後續reduce操作中會用到。

	public ReduceOperator(Grouping<IN> input, ReduceFunction<IN> function, 
                        String defaultName) {
		this.function = function;
		this.grouper = input; // UnsortedGrouping被設置在這裏,後續reduce操作中會用到。
    this.hint = CombineHint.OPTIMIZER_CHOOSES; // 優化時候會用到。
	}
}

讓我們順着Flink程序執行階段繼續看看系統都做了些什麼。

0x05 批處理執行計劃(Plan)

程序執行的第一步是:當程序運行時候,首先會根據java API的結果來生成執行plan。

public JobClient executeAsync(String jobName) throws Exception {
   final Plan plan = createProgramPlan(jobName);
} 

其中重要的函數是translateToDataFlow,因為在translateToDataFlow方法中,會從批處理Java API模塊中operators包往核心模塊中operators包的轉換

對於我們的示例程序,在生成 Graph時,translateToDataFlow會生成一個 SingleInputOperator,為後續runtime使用。下面是代碼縮減版。

protected org.apache.flink.api.common.operators.SingleInputOperator<?, IN, ?> translateToDataFlow(Operator<IN> input) {
    
    ......
      
    // UnsortedGrouping中的keys被取出,  
		else if (grouper.getKeys() instanceof Keys.ExpressionKeys) {

			// reduce with field positions
			ReduceOperatorBase<IN, ReduceFunction<IN>> po =
					new ReduceOperatorBase<>(function, operatorInfo, logicalKeyPositions, name);

			po.setCustomPartitioner(grouper.getCustomPartitioner());
			po.setInput(input);
			po.setParallelism(getParallelism()); // 沒有并行度的變化

			return po;//translateToDataFlow會生成一個 SingleInputOperator,為後續runtime使用
		}	    
  }  
}

我們代碼最終生成的執行計劃如下,我們可以看出來,執行計劃基本符合我們的估計:簡單的從輸入到輸出。中間有意義的算子其實只有Reduce

GenericDataSourceBase ——> FlatMapOperatorBase ——> ReduceOperatorBase ——> GenericDataSinkBase

具體在代碼中體現如下是:

plan = {Plan@1296} 
 sinks = {ArrayList@1309}  size = 1
  0 = {GenericDataSinkBase@1313} "collect()"
   formatWrapper = {UserCodeObjectWrapper@1315} 
   input = {ReduceOperatorBase@1316} "ReduceOperatorBase - Reduce at main(WordCountExampleReduceCsv.java:25)"
    hint = {ReduceOperatorBase$CombineHint@1325} "OPTIMIZER_CHOOSES"
    customPartitioner = null
    input = {FlatMapOperatorBase@1326} "FlatMapOperatorBase - FlatMap at main(WordCountExampleReduceCsv.java:23)"
     input = {GenericDataSourceBase@1339} "at main(WordCountExampleReduceCsv.java:20) (org.apache.flink.api.java.io.TextInputFormat)"

0x06 批處理優化計劃(Optimized Plan)

程序執行的第二步是:Flink對於Plan會繼續優化,生成Optimized Plan。其核心代碼位於PlanTranslator.compilePlan 函數,這裏得到了Optimized Plan。

這個編譯的過程不作任何決策與假設,也就是說作業最終如何被執行早已被優化器確定,而編譯也是在此基礎上做確定性的映射。所以我們將集中精力看如何優化plan。

private JobGraph compilePlan(Plan plan, Configuration optimizerConfiguration) {
   Optimizer optimizer = new Optimizer(new DataStatistics(), optimizerConfiguration);
   OptimizedPlan optimizedPlan = optimizer.compile(plan);

   JobGraphGenerator jobGraphGenerator = new JobGraphGenerator(optimizerConfiguration);
   return jobGraphGenerator.compileJobGraph(optimizedPlan, plan.getJobId());
}

在內部調用plan的accept方法遍歷它。accept會挨個在每個sink上調用accept。對於每個sink會先preVisit,然後 postVisit。

這裏優化時候有幾個注意點:

  1. 在 GraphCreatingVisitor.preVisit 中,當發現Operator是 ReduceOperatorBase 類型的時候,會建立ReduceNode。

    else if (c instanceof ReduceOperatorBase) {
       n = new ReduceNode((ReduceOperatorBase<?, ?>) c);
    }
    
  2. ReduceNode是Reducer Operator的Optimizer表示。

    public class ReduceNode extends SingleInputNode {
    	private final List<OperatorDescriptorSingle> possibleProperties;	
    	private ReduceNode preReduceUtilityNode;
    }
    
  3. 生成ReduceNode時候,會根據之前提到的 hint 來決定 combinerStrategy = DriverStrategy.SORTED_PARTIAL_REDUCE;

    public ReduceNode(ReduceOperatorBase<?, ?> operator) {
    			DriverStrategy combinerStrategy;
    			switch(operator.getCombineHint()) {
    				case OPTIMIZER_CHOOSES:
    					combinerStrategy = DriverStrategy.SORTED_PARTIAL_REDUCE;
    					break;
          }  
    }
    

生成的優化執行計劃如下,我們可以看到,這時候設置了并行度,也把reduce分割成兩段,一段是SORTED_PARTIAL_REDUCE,一段是SORTED_REDUCE

Data Source  ——> FlatMap ——> Reduce(SORTED_PARTIAL_REDUCE)   ——> Reduce(SORTED_REDUCE)  ——> Data Sink

具體在代碼中體現如下是:

optimizedPlan = {OptimizedPlan@1506} 
 
 allNodes = {HashSet@1510}  size = 5
   
  0 = {SourcePlanNode@1512} "Data Source "at main(WordCountExampleReduceCsv.java:20) (org.apache.flink.api.java.io.TextInputFormat)" : NONE [[ GlobalProperties [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties [ordering=null, grouped=null, unique=null] ]]"
   parallelism = 4

  1 = {SingleInputPlanNode@1513} "FlatMap "FlatMap at main(WordCountExampleReduceCsv.java:23)" : FLAT_MAP [[ GlobalProperties [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties [ordering=null, grouped=null, unique=null] ]]"
   parallelism = 4

  2 = {SingleInputPlanNode@1514} "Reduce "Reduce at main(WordCountExampleReduceCsv.java:25)" : SORTED_REDUCE [[ GlobalProperties [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties [ordering=null, grouped=null, unique=null] ]]"
   parallelism = 4

  3 = {SinkPlanNode@1515} "Data Sink "collect()" : NONE [[ GlobalProperties [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties [ordering=null, grouped=null, unique=null] ]]"
   parallelism = 4

  4 = {SingleInputPlanNode@1516} "Reduce "Reduce at main(WordCountExampleReduceCsv.java:25)" : SORTED_PARTIAL_REDUCE [[ GlobalProperties [partitioning=RANDOM_PARTITIONED] ]] [[ LocalProperties [ordering=null, grouped=null, unique=null] ]]"
   parallelism = 4

0x07 JobGraph

程序執行的第三步是:建立JobGraph。LocalExecutor.execute中會生成JobGraph。Optimized Plan 經過優化後生成了 JobGraph, JobGraph是提交給 JobManager 的數據結構。

主要的優化為,將多個符合條件的節點 chain 在一起作為一個節點,這樣可以減少數據在節點之間流動所需要的序列化/反序列化/傳輸消耗。

JobGraph是唯一被Flink的數據流引擎所識別的表述作業的數據結構,也正是這一共同的抽象體現了流處理和批處理在運行時的統一

public CompletableFuture<JobClient> execute(Pipeline pipeline, Configuration configuration) throws Exception {
   final JobGraph jobGraph = getJobGraph(pipeline, configuration);
}

我們可以看出來,這一步形成了一個Operator Chain:

CHAIN DataSource -> FlatMap -> Combine (Reduce) 

於是我們看到,Reduce(SORTED_PARTIAL_REDUCE)和其上游合併在一起

具體在程序中打印出來:

jobGraph = {JobGraph@1739} "JobGraph(jobId: 30421d78d7eedee6be2c5de39d416eb7)"
 taskVertices = {LinkedHashMap@1742}  size = 3
  
  {JobVertexID@1762} "e2c43ec0df647ea6735b2421fb7330fb" -> {InputOutputFormatVertex@1763} "CHAIN DataSource (at main(WordCountExampleReduceCsv.java:20) (org.apache.flink.api.java.io.TextInputFormat)) -> FlatMap (FlatMap at main(WordCountExampleReduceCsv.java:23)) -> Combine (Reduce at main(WordCountExampleReduceCsv.java:25)) (org.apache.flink.runtime.operators.DataSourceTask)"
  
  {JobVertexID@1764} "2de11f497e827e48dda1d63b458dead7" -> {JobVertex@1765} "Reduce (Reduce at main(WordCountExampleReduceCsv.java:25)) (org.apache.flink.runtime.operators.BatchTask)"
  
  {JobVertexID@1766} "2bee17f2c86aa1e9439e3dedea58007b" -> {InputOutputFormatVertex@1767} "DataSink (collect()) (org.apache.flink.runtime.operators.DataSinkTask)"

0x08 Runtime

Job提交之後,就是程序正式運行了。這裏實際上涉及到了三次排序,

  • 一次是在FlatMap發送時候調用到了ChainedReduceCombineDriver.sortAndCombine。這部分對應了我們之前提到的MapReduce中的Combine和Partition。
  • 一次是在 ReduceDriver 所在的 BatchTask中,由UnilateralSortMerger完成了sort & merge操作。
  • 一次是在ReduceDriver,這裏做了最後的reducer排序。

8.1 FlatMap

這裡是第一次排序

當一批數據處理完成之後,在ChainedFlatMapDriver中調用到close函數進行發送數據給下游。

public void close() {
   this.outputCollector.close();
}

Operator Chain會調用到ChainedReduceCombineDriver.close

public void close() {
   // send the final batch
   try {
      switch (strategy) {
         case SORTED_PARTIAL_REDUCE:
            sortAndCombine(); // 我們是在這裏
            break;
         case HASHED_PARTIAL_REDUCE:
            reduceFacade.emit();
            break;
      }
   } catch (Exception ex2) {
      throw new ExceptionInChainedStubException(taskName, ex2);
   }

   outputCollector.close();
   dispose(false);
}

8.1.1 Combine

sortAndCombine中先排序,然後做combine,最後會不斷髮送數據

private void sortAndCombine() throws Exception {
   final InMemorySorter<T> sorter = this.sorter;

   if (!sorter.isEmpty()) {
      sortAlgo.sort(sorter); // 這裡會先排序

      final TypeSerializer<T> serializer = this.serializer;
      final TypeComparator<T> comparator = this.comparator;
      final ReduceFunction<T> function = this.reducer;
      final Collector<T> output = this.outputCollector;
      final MutableObjectIterator<T> input = sorter.getIterator();

      if (objectReuseEnabled) {
        ......
      } else {
         T value = input.next();

         // 這裏就是combine
         // iterate over key groups
         while (running && value != null) {
            comparator.setReference(value);
            T res = value;

            // iterate within a key group
            while ((value = input.next()) != null) {
               if (comparator.equalToReference(value)) {
                  // same group, reduce
                  res = function.reduce(res, value);
               } else {
                  // new key group
                  break;
               }
            }

            output.collect(res); //發送數據
         }
      }
   }
}

8.1.2 Partition

最後發送給哪個下游,是由OutputEmitter.selectChannel決定的。有如下幾種決定方式:

hash-partitioning, broadcasting, round-robin, custom partition functions。這裏採用的是PARTITION_HASH。

每個task都會把同樣字符串統計結果發送給同樣的下游ReduceDriver。這就保證了下游Reducer一定不會出現統計出錯。

public final int selectChannel(SerializationDelegate<T> record) {
   switch (strategy) {
   ...
   case PARTITION_HASH:
      return hashPartitionDefault(record.getInstance(), numberOfChannels);
   ...
   }
}

private int hashPartitionDefault(T record, int numberOfChannels) {
	int hash = this.comparator.hash(record);
	return MathUtils.murmurHash(hash) % numberOfChannels;
}

具體調用棧:

hash:50, TupleComparator (org.apache.flink.api.java.typeutils.runtime)
hash:30, TupleComparator (org.apache.flink.api.java.typeutils.runtime)
hashPartitionDefault:187, OutputEmitter (org.apache.flink.runtime.operators.shipping)
selectChannel:147, OutputEmitter (org.apache.flink.runtime.operators.shipping)
selectChannel:36, OutputEmitter (org.apache.flink.runtime.operators.shipping)
emit:60, ChannelSelectorRecordWriter (org.apache.flink.runtime.io.network.api.writer)
collect:65, OutputCollector (org.apache.flink.runtime.operators.shipping)
collect:35, CountingCollector (org.apache.flink.runtime.operators.util.metrics)
sortAndCombine:254, ChainedReduceCombineDriver (org.apache.flink.runtime.operators.chaining)
close:266, ChainedReduceCombineDriver (org.apache.flink.runtime.operators.chaining)
close:40, CountingCollector (org.apache.flink.runtime.operators.util.metrics)
close:88, ChainedFlatMapDriver (org.apache.flink.runtime.operators.chaining)
invoke:215, DataSourceTask (org.apache.flink.runtime.operators)
doRun:707, Task (org.apache.flink.runtime.taskmanager)
run:532, Task (org.apache.flink.runtime.taskmanager)
run:748, Thread (java.lang)

8.2 UnilateralSortMerger

這裡是第二次排序

在 BatchTask中,會先Sort, Merge輸入,然後才會交由Reduce來具體完成過。sort & merge操作具體是在UnilateralSortMerger類中完成的。

getIterator:646, UnilateralSortMerger (org.apache.flink.runtime.operators.sort)
getInput:1110, BatchTask (org.apache.flink.runtime.operators)
prepare:95, ReduceDriver (org.apache.flink.runtime.operators)
run:474, BatchTask (org.apache.flink.runtime.operators)
invoke:369, BatchTask (org.apache.flink.runtime.operators)
doRun:707, Task (org.apache.flink.runtime.taskmanager)
run:532, Task (org.apache.flink.runtime.taskmanager)
run:748, Thread (java.lang)

UnilateralSortMerger是一個full fledged sorter,它實現了一個多路merge sort。其內部的邏輯被劃分到三個線程上(read, sort, spill),這三個線程彼此之間通過一系列blocking queues來構成了一個閉環。

其內存通過MemoryManager分配,所以這個組件不會超過給其分配的內存。

該類主要變量摘錄如下:

public class UnilateralSortMerger<E> implements Sorter<E> {
	// ------------------------------------------------------------------------
	//                                  Threads
	// ------------------------------------------------------------------------

	/** The thread that reads the input channels into buffers and passes them on to the merger. */
	private final ThreadBase<E> readThread;

	/** The thread that merges the buffer handed from the reading thread. */
	private final ThreadBase<E> sortThread;

	/** The thread that handles spilling to secondary storage. */
	private final ThreadBase<E> spillThread;
	
	// ------------------------------------------------------------------------
	//                                   Memory
	// ------------------------------------------------------------------------
	
	/** The memory segments used first for sorting and later for reading/pre-fetching
	 * during the external merge. */
	protected final List<MemorySegment> sortReadMemory;
	
	/** The memory segments used to stage data to be written. */
	protected final List<MemorySegment> writeMemory;
	
	/** The memory manager through which memory is allocated and released. */
	protected final MemoryManager memoryManager;
	
	// ------------------------------------------------------------------------
	//                            Miscellaneous Fields
	// ------------------------------------------------------------------------
	/**
	 * Collection of all currently open channels, to be closed and deleted during cleanup.
	 */
	private final HashSet<FileIOChannel> openChannels;
	
	/**
	 * The monitor which guards the iterator field.
	 */
	protected final Object iteratorLock = new Object();
	
	/**
	 * The iterator to be returned by the sort-merger. This variable is null, while receiving and merging is still in
	 * progress and it will be set once we have &lt; merge factor sorted sub-streams that will then be streamed sorted.
	 */
	protected volatile MutableObjectIterator<E> iterator; 	// 如果大家經常調試,就會發現driver中的input都是這個兄弟。

	private final Collection<InMemorySorter<?>> inMemorySorters;
}

8.2.1 三種線程

ReadingThread:這種線程持續讀取輸入,然後把數據放入到一個待排序的buffer中。The thread that consumes the input data and puts it into a buffer that will be sorted.

SortingThread : 這種線程對於上游填充好的buffer進行排序。The thread that sorts filled buffers.

SpillingThread:這種線程進行歸併操作。The thread that handles the spilling of intermediate results and sets up the merging. It also merges the channels until sufficiently few channels remain to perform the final streamed merge.

8.2.2 MutableObjectIterator

UnilateralSortMerger有一個特殊變量:

protected volatile MutableObjectIterator<E> iterator;

這個變量就是最終sort-merger的輸出。如果大家調試過算子,就會發現這個變量就是具體算子的輸入input類型。最終算子的輸入就是來自於此。

8.3 ReduceDriver

這裡是第三次排序,我們可以看出來reduce是怎麼和groupby一起運作的。

  1. 針對 .groupBy(0),ReduceDriver就是單純獲取輸入的第一個數值 T value = input.next();
  2. 後續代碼中有嵌套的兩個while,分別是 :遍歷各種key,以及某一key中reduce。
  3. 遍歷 group keys的時候,把value賦於比較算子comparator(這個算子概念不是Flink算子,就是為了說明邏輯概念) comparator.setReference(value); 因為groubBy只是指定按照第一個位置比較,沒有指定具體key數值,所以這個value就是key了。此處記為while (1) ,代碼中有註解。
  4. 從輸入中讀取後續的數值value,如果下一個數值是同一個key,就reduce;如果下一個數值不是同一個key,就跳出循環。放棄比較,把reduce結果輸出。此處記為 while (2)
  5. 跳出 while (2) 之後,代碼依然在 while (1) ,此時value是新值,所以繼續在 while (1)中運行 。把value繼續賦於比較算子 comparator.setReference(value);,於是進行新的key比較
public class ReduceDriver<T> implements Driver<ReduceFunction<T>, T> {
	@Override
	public void run() throws Exception {

		final Counter numRecordsIn = this.taskContext.getMetricGroup().getIOMetricGroup().getNumRecordsInCounter();
		final Counter numRecordsOut = this.taskContext.getMetricGroup().getIOMetricGroup().getNumRecordsOutCounter();

		// cache references on the stack
		final MutableObjectIterator<T> input = this.input;
		final TypeSerializer<T> serializer = this.serializer;
		final TypeComparator<T> comparator = this.comparator;		
		final ReduceFunction<T> function = this.taskContext.getStub();		
		final Collector<T> output = new CountingCollector<>(this.taskContext.getOutputCollector(), numRecordsOut);

		if (objectReuseEnabled) {
      ......
		} else {
      // 針對 `.groupBy(0)`,ReduceDriver就是單純獲取輸入的第一個數值 `T value = input.next();`
			T value = input.next();

      // while (1)
			// iterate over key groups
			while (this.running && value != null) {
				numRecordsIn.inc();
        // 把value賦於比較算子,這個value就是key了。
				comparator.setReference(value);
				T res = value;

        // while (2)
				// iterate within a key group,循環比較這個key
				while ((value = input.next()) != null) {
					numRecordsIn.inc();
					if (comparator.equalToReference(value)) {
						// same group, reduce,如果下一個數值是同一個key,就reduce
						res = function.reduce(res, value);
					} else {
						// new key group,如果下一個數值不是同一個key,就跳出循環,放棄比較。
						break;
					}
				}
        // 把reduce結果輸出
				output.collect(res);
			}
		}
	}  
}

0x09 參考

mapreduce里的shuffle 里的 sort merge 和combine

實戰錄 | Hadoop Mapreduce shuffle之Combine探討

Hadoop中MapReduce中combine、partition、shuffle的作用是什麼?在程序中怎麼運用?

Flink運行時之生成作業圖

mapreduce過程

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※回頭車貨運收費標準

※產品缺大量曝光嗎?你需要的是一流包裝設計!

※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面

※推薦評價好的iphone維修中心

※教你寫出一流的銷售文案?

台中搬家公司教你幾個打包小技巧,輕鬆整理裝箱!

台中搬家遵守搬運三大原則,讓您的家具不再被破壞!