[源碼解析]為什麼mapPartition比map更高效_貨運

2{icon} {views}

※評比南投搬家公司費用收費行情懶人包大公開

搬家價格與搬家費用透明合理,不亂收費。本公司提供下列三種搬家計費方案,由資深專業組長到府估價,替客戶量身規劃選擇最經濟節省的計費方式

[源碼解析]為什麼mapPartition比map更高效

目錄

  • [源碼解析]為什麼mapPartition比map更高效
    • 0x00 摘要
    • 0x01 map vs mapPartition
      • 1.1 map
      • 1.2 mapPartition
      • 1.3 異同
    • 0x02 代碼
    • 0x03 Flink的傳輸機制
      • 3.1 傳輸機制概述
      • 3.2 遠程通信
      • 3.3 TaskManager進程內傳輸
      • 3.4 源碼分析
    • 0x04 runtime
      • 4.1 Driver
      • 4.2 MapDriver
      • 4.3 MapPartitionDriver
      • 4.4 效率區別
    • 0x05 優化和ChainedMapDriver
    • 0x06 總結
    • 0x07 參考

0x00 摘要

自從函數式編程和響應式編程逐漸進入到程序員的生活之後,map函數作為其中一個重要算子也為大家所熟知,無論是前端web開發,手機開發還是後端服務器開發,都很難逃過它的手心。而在大數據領域中又往往可以見到另外一個算子mapPartition的身影。在性能調優中,經常會被建議盡量用 mappartition 操作去替代 map 操作。本文將從Flink源碼和示例入手,為大家解析為什麼mapPartition比map更高效。

0x01 map vs mapPartition

1.1 map

Map的作用是將數據流上每個元素轉換為另外的元素,比如data.map { x => x.toInt }。它把數組流中的每一個值,使用所提供的函數執行一遍,一一對應。得到與元素個數相同的數組流。然後返回這個新數據流。

1.2 mapPartition

MapPartition的作用是單個函數調用并行分區,比如data.mapPartition { in => in map { (_, 1) } }。該函數將分區作為“迭代器”,可以產生任意數量的結果。每個分區中的元素數量取決於并行度和以前的operations。

1.3 異同

其實,兩者完成的業務操作是一樣的,本質上都是將數據流上每個元素轉換為另外的元素。

區別主要在兩點。

從邏輯實現來講

  • map邏輯實現簡單,就是在函數中簡單一一轉換,map函數的輸入和輸入都是單個元素。
  • mapPartition相對複雜,函數的輸入有兩個,一般格式為 void mapPartition(Iterable<T> values, Collector<O> out) 。其中values是需要映射轉換的所有記錄,out是用來發送結果的collector。具體返回什麼,如何操作out來返回結果,則完全依賴於業務邏輯。

從調用次數來說

  • 數據有多少個元素,map就會被調用多少次。
  • 數據有多少分區,mapPartition就會被調用多少次。

為什麼MapPartition有這麼高效呢,下面我們將具體論證。

0x02 代碼

首先我們給出示例代碼,從下文中我們可以看出,map就是簡單的轉換,而mapPartition則不但要做轉換,程序員還需要手動操作如何返回結果:

public class IteratePi {

    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env=ExecutionEnvironment.getExecutionEnvironment();
        //迭代次數
        int iterativeNum=10;
        DataSet<Integer> wordList = env.fromElements(1, 2, 3);
      
        IterativeDataSet<Integer> iterativeDataSet=wordList.iterate(iterativeNum);
        DataSet<Integer> mapResult=iterativeDataSet
          			.map(new MapFunction<Integer, Integer>() {
            @Override
            public Integer map(Integer value) throws Exception {
                value += 1;
                return value;
            }
        });
        //迭代結束的條件
        DataSet<Integer> result=iterativeDataSet.closeWith(mapResult);
        result.print();

        MapPartitionOperator<Integer, Integer> mapPartitionResult = iterativeDataSet
                .mapPartition(new MapPartitionFunction<Integer, Integer>() {
            @Override
            public void mapPartition(Iterable<Integer> values, Collector<Integer> out) {
                for (Integer value : values) {
                    // 這裏需要程序員自行決定如何返回,即調用collect操作。
                    out.collect(value + 2);
                }
            }                                                                                                                           					}
        );
        //迭代結束的條件
        DataSet<Integer> partitionResult=iterativeDataSet.closeWith(mapPartitionResult);
        partitionResult.print();
    }
}

0x03 Flink的傳輸機制

世界上很少有沒有來由的愛,也少見免費的午餐。mapPartition之所以高效,其所依賴的基礎就是Flink的傳輸機制。所以我們下面就講解下為什麼。

大家都知道,Spark是用微批處理來模擬流處理,就是說,spark還是一批一批的傳輸和處理數據,所以我們就能理解mapPartition的機制就是基於這一批數據做統一處理。這樣確實可以高效。

但是Flink號稱是純流,即Flink是每來一個輸入record,就進行一次業務處理,然後返回給下游算子。

有的兄弟就會產生疑問:每次都只是處理單個記錄,怎麼能夠讓mapPartition做到批次處理呢。其實這就是Flink的微妙之處:即Flink確實是每次都處理一個輸入record,但是在上下游傳輸時候,Flink還是把records累積起來做批量傳輸的。也可以這麼理解:從傳輸的角度講,Flink是微批處理的

3.1 傳輸機制概述

Flink 的網絡棧是組成 flink-runtime 模塊的核心組件之一,也是 Flink 作業的核心部分。所有來自 TaskManager 的工作單元(子任務)都通過它來互相連接。流式傳輸數據流都要經過網絡棧,所以它對 Flink 作業的性能表現(包括吞吐量和延遲指標)至關重要。與通過 Akka 使用 RPC 的 TaskManager 和 JobManager 之間的協調通道相比,TaskManager 之間的網絡棧依賴的是更底層的,基於 Netty 的 API。

3.2 遠程通信

一個運行的application的tasks在持續交換數據。TaskManager負責做數據傳輸。不同任務之間的每個(遠程)網絡連接將在 Flink 的網絡棧中獲得自己的 TCP 通道。但是如果同一任務的不同子任務被安排到了同一個 TaskManager,則它們與同一個 TaskManager 的網絡連接將被多路復用,並共享一個 TCP 信道以減少資源佔用。

每個TaskManager有一組網絡緩衝池(默認每個buffer是32KB),用於發送與接受數據。如發送端和接收端位於不同的TaskManager進程中,則它們需要通過操作系統的網絡棧進行交流。流應用需要以管道的模式進行數據交換,也就是說,每對TaskManager會維持一個永久的TCP連接用於做數據交換。在shuffle連接模式下(多個sender與多個receiver),每個sender task需要向每個receiver task發送數據,此時TaskManager需要為每個receiver task都分配一個緩衝區。

一個記錄被創建並傳遞之後(例如通過 Collector.collect()),它會被遞交到RecordWriter,其將來自 Java 對象的記錄序列化為一個字節序列,後者最終成為網絡緩存。RecordWriter 首先使用SpanningRecordSerializer將記錄序列化為一個靈活的堆上字節數組。然後它嘗試將這些字節寫入目標網絡通道的關聯網絡緩存。

因為如果逐個發送會降低每個記錄的開銷並帶來更高的吞吐量,所以為了取得高吞吐量,TaskManager的網絡組件首先從緩衝buffer中收集records,然後再發送。也就是說,records並不是一個接一個的發送,而是先放入緩衝,然後再以batch的形式發送。這個技術可以高效使用網絡資源,並達到高吞吐。類似於網絡或磁盤 I/O 協議中使用的緩衝技術。

接收方網絡棧(netty)將接收到的緩存寫入適當的輸入通道。最後(流式)任務的線程從這些隊列中讀取並嘗試在RecordReader的幫助下,通過Deserializer將積累的數據反序列化為 Java 對象。

3.3 TaskManager進程內傳輸

若sender與receiver任務都運行在同一個TaskManager進程,則sender任務會將發送的條目做序列化,並存入一個字節緩衝。然後將緩衝放入一個隊列,直到隊列被填滿。

Receiver任務從隊列中獲取緩衝,並反序列化輸入的條目。所以,在同一個TaskManager內,任務之間的數據傳輸並不經過網絡交互。

在同一個TaskManager進程內,也是批量傳輸

3.4 源碼分析

我們基於Flink優化的結果進行分析驗證,看看Flink是不是把記錄寫入到buffer中,這種情況下運行的是CountingCollector和ChainedMapDriver。

copyFromSerializerToTargetChannel:153, RecordWriter (org.apache.flink.runtime.io.network.api.writer)
emit:116, RecordWriter (org.apache.flink.runtime.io.network.api.writer)
emit:60, ChannelSelectorRecordWriter (org.apache.flink.runtime.io.network.api.writer)
collect:65, OutputCollector (org.apache.flink.runtime.operators.shipping)
collect:35, CountingCollector (org.apache.flink.runtime.operators.util.metrics)
collect:79, ChainedMapDriver (org.apache.flink.runtime.operators.chaining)
collect:35, CountingCollector (org.apache.flink.runtime.operators.util.metrics)
invoke:196, DataSourceTask (org.apache.flink.runtime.operators)
doRun:707, Task (org.apache.flink.runtime.taskmanager)
run:532, Task (org.apache.flink.runtime.taskmanager)
run:748, Thread (java.lang)

當執行完用戶定義的map函數之後,系統運行在 ChainedMapDriver.collect 函數。

※智慧手機時代的來臨,RWD網頁設計為架站首選

網動結合了許多網際網路業界的菁英共同研發簡單易操作的架站工具,及時性的更新,為客戶創造出更多的網路商機。

public void collect(IT record) {
    this.outputCollector.collect(this.mapper.map(record));// mapper就是用戶代碼
}

然後調用到了CountingCollector.collect

public void collect(OUT record) {
		this.collector.collect(record);// record就是用戶轉換后的記錄
}

OutputCollector.collect函數會把記錄發送給所有的writers。

this.delegate.setInstance(record);// 先把record設置到SerializationDelegate中
for (RecordWriter<SerializationDelegate<T>> writer : writers) {  // 所有的writer
   writer.emit(this.delegate); // 發送record
}

RecordWriter負責把數據序列化,然後寫入到緩存中。它有兩個實現類:

  • BroadcastRecordWriter: 維護了多個下游channel,發送數據到下游所有的channel中。
  • ChannelSelectorRecordWriter: 通過channelSelector對象判斷數據需要發往下游的哪個channel。我們用的正是這個RecordWriter

這裏我們分析下ChannelSelectorRecordWriteremit方法:

public void emit(T record) throws IOException, InterruptedException {
   emit(record, channelSelector.selectChannel(record));
}

這裏使用了channelSelector.selectChannel方法。該方法為record尋找到對應下游channel id。

public class OutputEmitter<T> implements ChannelSelector<SerializationDelegate<T>> {
	public final int selectChannel(SerializationDelegate<T> record) {
		switch (strategy) {
		case FORWARD:
			return forward(); // 我們代碼用到了這種情況。這裏 return 0;
    ......
		}
	}
}

接下來我們又回到了父類RecordWriter.emit

protected void emit(T record, int targetChannel) throws IOException, InterruptedException {
   serializer.serializeRecord(record);
   // Make sure we don't hold onto the large intermediate serialization buffer for too long
   if (copyFromSerializerToTargetChannel(targetChannel)) {
      serializer.prune();
   }
}

關鍵的邏輯在於copyFromSerializerToTargetChannel此方法從序列化器中複製數據到目標channel,我們可以看出來,每條記錄都是寫入到buffer中

protected boolean copyFromSerializerToTargetChannel(int targetChannel) throws IOException, InterruptedException {
   // We should reset the initial position of the intermediate serialization buffer before
   // copying, so the serialization results can be copied to multiple target buffers.
   // 此處Serializer為SpanningRecordSerializer
   // reset方法將serializer內部的databuffer position重置為0
   serializer.reset();

   boolean pruneTriggered = false;
    // 獲取目標channel的bufferBuilder
    // bufferBuilder內維護了MemorySegment,即內存片段
    // Flink的內存管理依賴MemorySegment,可實現堆內堆外內存的管理
    // RecordWriter內有一個bufferBuilder數組,長度和下游channel數目相同
    // 該數組以channel ID為下標,存儲和channel對應的bufferBuilder
    // 如果對應channel的bufferBuilder尚未創建,調用requestNewBufferBuilder申請一個新的bufferBuilder  
   BufferBuilder bufferBuilder = getBufferBuilder(targetChannel);
    // 複製serializer的數據到bufferBuilder中
   SerializationResult result = serializer.copyToBufferBuilder(bufferBuilder);
    // 循環直到result完全被寫入到buffer
    // 一條數據可能會被寫入到多個緩存中
    // 如果緩存不夠用,會申請新的緩存
    // 數據完全寫入完畢之時,當前正在操作的緩存是沒有寫滿的
    // 因此返回true,表明需要壓縮該buffer的空間  
   while (result.isFullBuffer()) {
      finishBufferBuilder(bufferBuilder);

      // If this was a full record, we are done. Not breaking out of the loop at this point
      // will lead to another buffer request before breaking out (that would not be a
      // problem per se, but it can lead to stalls in the pipeline).
      if (result.isFullRecord()) {
         pruneTriggered = true;
         emptyCurrentBufferBuilder(targetChannel);
         break;
      }

      bufferBuilder = requestNewBufferBuilder(targetChannel);
      result = serializer.copyToBufferBuilder(bufferBuilder);
   }
   checkState(!serializer.hasSerializedData(), "All data should be written at once");

   // 如果buffer超時時間為0,需要flush目標channel的數據
   if (flushAlways) {
      flushTargetPartition(targetChannel);
   }
   return pruneTriggered;
}

0x04 runtime

4.1 Driver

Driver是Flink runtime的一個重要概念,是在一個task中運行的用戶業務邏輯組件,具體實現了批量操作代碼。其內部API包括初始化,清除,運行,取消等邏輯。

public interface Driver<S extends Function, OT> {
   ......
   void setup(TaskContext<S, OT> context);
   void run() throws Exception;
   void cleanup() throws Exception;
   void cancel() throws Exception;
}

具體在 org.apache.flink.runtime.operators 目錄下,我們能夠看到各種Driver的實現,基本的算子都有自己的Driver。

......
CoGroupDriver.java
FlatMapDriver.java
FullOuterJoinDriver.java
GroupReduceCombineDriver.java
GroupReduceDriver.java
JoinDriver.java
LeftOuterJoinDriver.java
MapDriver.java
MapPartitionDriver.java
......

4.2 MapDriver

map算子對應的就是MapDriver。

結合上節我們知道,上游數據是通過batch方式批量傳入的。所以,在run函數會遍歷輸入,每次取出一個record,然後調用用戶自定義函數function.map對這個record做map操作。

public class MapDriver<IT, OT> implements Driver<MapFunction<IT, OT>, OT> {

   @Override
   public void run() throws Exception {
      final MutableObjectIterator<IT> input = this.taskContext.getInput(0);
      .....
      else {
         IT record = null;
        
         // runtime主動進行循環,這樣導致大量函數調用
         while (this.running && ((record = input.next()) != null)) {
            numRecordsIn.inc();
            output.collect(function.map(record)); // function是用戶函數
         }
      }
   }
}

4.3 MapPartitionDriver

MapPartitionDriver是mapPartition的具體組件。系統會把得到的批量數據inIter一次性的都傳給用戶自定義函數,由用戶代碼來進行遍歷操作

public class MapPartitionDriver<IT, OT> implements Driver<MapPartitionFunction<IT, OT>, OT> {
   @Override
   public void run() throws Exception {
     
		final MutableObjectIterator<IT> input = new CountingMutableObjectIterator<>(this.taskContext.<IT>getInput(0), numRecordsIn);     
      ......
      } else {
         final NonReusingMutableToRegularIteratorWrapper<IT> inIter = new NonReusingMutableToRegularIteratorWrapper<IT>(input, this.taskContext.<IT>getInputSerializer(0).getSerializer());

         // runtime不參与循環,這樣可以減少函數調用
         function.mapPartition(inIter, output);
      }
   }
}

4.4 效率區別

我們能夠看到map和mapPartition的input都是MutableObjectIterator input類型, 說明兩者的輸入一致。只不過map是在Driver代碼中進行循環,mapPartition在用戶代碼中進行循環。具體mapPartition的 效率提高體現在如下方面 :

  1. 假設一共有60個數據需要轉換,map會在runtime中調用用戶函數60次。
  2. runtime把數據分成6個partition操作,則mapPartition在runtime中會調用用戶函數6次,在每個用戶函數中分別循環10次。對於runtime來說,map操作會多出54次用戶函數調用。
  3. 如果用戶業務中需要頻繁創建額外的對象或者外部資源操作,mapPartition的優勢更可以體現。 例如將數據寫入Mysql, 那麼map需要為每個元素創建一個數據庫連接,而mapPartition為每個partition創建一個鏈接。

假設有上億個數據需要map,這資源佔用和運行速度效率差別會相當大。

0x05 優化和ChainedMapDriver

之前提到了優化,這裏我們再詳細深入下如何優化map算子。

Flink有一個關鍵的優化技術稱為任務鏈,用於(在某些情況下)減少本地通信的過載。為了滿足任務鏈的條件,至少兩個以上的operator必須配置為同一併行度,並且使用本地向前的(local forwad)方式連接。任務鏈可以被認為是一種管道。

當管道以任務鏈的方式執行時候,Operators的函數被融合成單個任務,並由一個單獨的線程執行。一個function產生的records,通過使用一個簡單的方法調用,被遞交給下一個function。所以這裡在方法之間的records傳遞中,基本沒有序列化以及通信消耗

針對優化后的Operator Chain,runtime對應的Driver則是ChainedMapDriver。這是通過 MAP(MapDriver.class, ChainedMapDriver.class, PIPELINED, 0), 映射得到的。

我們可以看到,因為是任務鏈,所以每個record是直接在管道中流淌 ,ChainedMapDriver連循環都省略了,直接map轉換后丟給下游去也

public class ChainedMapDriver<IT, OT> extends ChainedDriver<IT, OT> {

   private MapFunction<IT, OT> mapper; // 用戶函數

   @Override
   public void collect(IT record) {
      try {
         this.numRecordsIn.inc();
         this.outputCollector.collect(this.mapper.map(record));
      } catch (Exception ex) {
         throw new ExceptionInChainedStubException(this.taskName, ex);
      }
   }
}

// 這時的調用棧如下
map:23, UserFunc$1 (com.alibaba.alink)
collect:79, ChainedMapDriver (org.apache.flink.runtime.operators.chaining)
collect:35, CountingCollector (org.apache.flink.runtime.operators.util.metrics)
invoke:196, DataSourceTask (org.apache.flink.runtime.operators)
doRun:707, Task (org.apache.flink.runtime.taskmanager)
run:532, Task (org.apache.flink.runtime.taskmanager)
run:748, Thread (java.lang)

0x06 總結

map和mapPartition實現的基礎是Flink的數據傳輸機制 :Flink確實是每次都處理一個輸入record,但是在上下游之間傳輸時候,Flink還是把records累積起來做批量傳輸。即可以認為從數據傳輸模型角度講,Flink是微批次的。

對於數據流轉換,因為是批量傳輸,所以對於積累的records,map是在runtime Driver代碼中進行循環,mapPartition在用戶代碼中進行循環。

map的函數調用次數要遠高於mapPartition。如果在用戶函數中涉及到頻繁創建額外的對象或者外部資源操作,則mapPartition性能遠遠高出。

如果沒有connection之類的操作,則通常性能差別並不大,通常不會成為瓶頸,也沒有想象的那麼嚴重。

0x07 參考

深入了解 Flink 網絡棧 ——A Deep-Dive into Flink’s Network Stack

Flink架構(二)- Flink中的數據傳輸

Flink 源碼之節點間通信

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

※回頭車貨運收費標準

宇安交通關係企業,自成立迄今,即秉持著「以誠待人」、「以實處事」的企業信念