BloomFilter在Hudi中的應用

9{icon} {views}

Bloom Filter在Hudi中的應用

介紹

Bloom Filter可以用於檢索一個元素是否在一個集合中。它的優點是空間效率和查詢時間都遠遠超過一般的算法,主要缺點是存在一定的誤判率:當其判斷元素存在時,實際上元素可能並不存在。而當判定不存在時,則元素一定不存在,Bloom Filter在對精確度要求不太嚴格的大數據量場景下運用十分廣泛。

引入

為何要引入Bloom Filter?這是Hudi為加快數據upsert採用的一種解決方案,即判斷record是否已經在文件中存在,若存在,則更新,若不存在,則插入。對於upsert顯然無法容忍出現誤判,否則可能會出現應該插入和變成了更新的錯誤,那麼Hudi是如何解決誤判問題的呢?一種簡單辦法是當Bloom Filter判斷該元素存在時,再去文件里二次確認該元素是否真的存在;而當Bloom Filter判斷該元素不存在時,則無需讀文件,通過二次確認的方法來規避Bloom Filter的誤判問題,實際上這也是Hudi採取的方案,值得一提的是,現在Delta暫時還不支持Bloom Filter,其判斷一條記錄是否存在是直接通過一次全表join來實現,效率比較低下。接下來我們來分析Bloom Filter在Hudi中的應用。

流程

Hudi從上游系統(Kafka、DFS等)消費一批數據后,會根據用戶配置的寫入模式(insert、upsert、bulkinsert)寫入Hudi數據集。而當配置為upsert時,意味着需要將數據插入更新至Hudi數據集,而第一步是需要標記哪些記錄已經存在,哪些記錄不存在,然後,對於存在的記錄進行更新,不存在記錄進行插入。

HoodieWriteClient中提供了對應三種寫入模式的方法(#insert、#upsert、#bulkinsert),對於使用了Bloom Filter的#upsert方法而言,其核心源代碼如下

public JavaRDD<WriteStatus> upsert(JavaRDD<HoodieRecord<T>> records, final String commitTime) {
    ...
    // perform index loop up to get existing location of records
    JavaRDD<HoodieRecord<T>> taggedRecords = index.tagLocation(dedupedRecords, jsc, table);
    ...
    return upsertRecordsInternal(taggedRecords, commitTime, table, true);
}

可以看到首先利用索引給記錄打標籤,然後再進行更新,下面主要分析打標籤的過程。

對於索引,Hudi提供了四種索引方式的實現:HBaseIndexHoodieBloomIndexHoodieGlobalBloomIndexInMemoryHashIndex,默認使用HoodieBloomIndex。其中HoodieGlobalBloomIndex與HoodieBloomIndex的區別是前者會讀取所有分區文件,而後者只讀取記錄所存在的分區下的文件。下面以HoodieBloomIndex為例進行分析。

HoodieBloomIndex#tagLocation核心代碼如下

public JavaRDD<HoodieRecord<T>> tagLocation(JavaRDD<HoodieRecord<T>> recordRDD, JavaSparkContext jsc,
      HoodieTable<T> hoodieTable) {

    // Step 0: cache the input record RDD
    if (config.getBloomIndexUseCaching()) {
      recordRDD.persist(config.getBloomIndexInputStorageLevel());
    }

    // Step 1: Extract out thinner JavaPairRDD of (partitionPath, recordKey)
    JavaPairRDD<String, String> partitionRecordKeyPairRDD =
        recordRDD.mapToPair(record -> new Tuple2<>(record.getPartitionPath(), record.getRecordKey()));

    // Lookup indexes for all the partition/recordkey pair
    JavaPairRDD<HoodieKey, HoodieRecordLocation> keyFilenamePairRDD =
        lookupIndex(partitionRecordKeyPairRDD, jsc, hoodieTable);

    // Cache the result, for subsequent stages.
    if (config.getBloomIndexUseCaching()) {
      keyFilenamePairRDD.persist(StorageLevel.MEMORY_AND_DISK_SER());
    }

    // Step 4: Tag the incoming records, as inserts or updates, by joining with existing record keys
    // Cost: 4 sec.
    JavaRDD<HoodieRecord<T>> taggedRecordRDD = tagLocationBacktoRecords(keyFilenamePairRDD, recordRDD);

    if (config.getBloomIndexUseCaching()) {
      recordRDD.unpersist(); // unpersist the input Record RDD
      keyFilenamePairRDD.unpersist();
    }

    return taggedRecordRDD;
  }

該過程會緩存記錄以便優化數據的加載。首先從記錄中解析出對應的分區路徑 -> key,接着查看索引,然後將位置信息(存在於哪個文件)回推到記錄中。

HoodieBloomIndex#lookup核心代碼如下

private JavaPairRDD<HoodieKey, HoodieRecordLocation> lookupIndex(
      JavaPairRDD<String, String> partitionRecordKeyPairRDD, final JavaSparkContext jsc,
      final HoodieTable hoodieTable) {
    // Obtain records per partition, in the incoming records
    Map<String, Long> recordsPerPartition = partitionRecordKeyPairRDD.countByKey();
    List<String> affectedPartitionPathList = new ArrayList<>(recordsPerPartition.keySet());

    // Step 2: Load all involved files as <Partition, filename> pairs
    List<Tuple2<String, BloomIndexFileInfo>> fileInfoList =
        loadInvolvedFiles(affectedPartitionPathList, jsc, hoodieTable);
    final Map<String, List<BloomIndexFileInfo>> partitionToFileInfo =
        fileInfoList.stream().collect(groupingBy(Tuple2::_1, mapping(Tuple2::_2, toList())));

    // Step 3: Obtain a RDD, for each incoming record, that already exists, with the file id,
    // that contains it.
    Map<String, Long> comparisonsPerFileGroup =
        computeComparisonsPerFileGroup(recordsPerPartition, partitionToFileInfo, partitionRecordKeyPairRDD);
    int safeParallelism = computeSafeParallelism(recordsPerPartition, comparisonsPerFileGroup);
    int joinParallelism = determineParallelism(partitionRecordKeyPairRDD.partitions().size(), safeParallelism);
    return findMatchingFilesForRecordKeys(partitionToFileInfo, partitionRecordKeyPairRDD, joinParallelism, hoodieTable,
        comparisonsPerFileGroup);
  }

該方法首先會計算出每個分區有多少條記錄和影響的分區有哪些,然後加載影響的分區的文件,最後計算并行度后,開始找記錄真正存在的文件。

對於#loadInvolvedFiles方法而言,其會查詢指定分區分區下所有的數據文件(parquet格式),並且如果開啟了hoodie.bloom.index.prune.by.ranges,還會讀取文件中的最小key和最大key(為加速後續的查找)。

HoodieBloomIndex#findMatchingFilesForRecordKeys核心代碼如下

JavaPairRDD<HoodieKey, HoodieRecordLocation> findMatchingFilesForRecordKeys(
      final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo,
      JavaPairRDD<String, String> partitionRecordKeyPairRDD, int shuffleParallelism, HoodieTable hoodieTable,
      Map<String, Long> fileGroupToComparisons) {
    JavaRDD<Tuple2<String, HoodieKey>> fileComparisonsRDD =
        explodeRecordRDDWithFileComparisons(partitionToFileIndexInfo, partitionRecordKeyPairRDD);

    if (config.useBloomIndexBucketizedChecking()) {
      Partitioner partitioner = new BucketizedBloomCheckPartitioner(shuffleParallelism, fileGroupToComparisons,
          config.getBloomIndexKeysPerBucket());

      fileComparisonsRDD = fileComparisonsRDD.mapToPair(t -> new Tuple2<>(Pair.of(t._1, t._2.getRecordKey()), t))
          .repartitionAndSortWithinPartitions(partitioner).map(Tuple2::_2);
    } else {
      fileComparisonsRDD = fileComparisonsRDD.sortBy(Tuple2::_1, true, shuffleParallelism);
    }

    return fileComparisonsRDD.mapPartitionsWithIndex(new HoodieBloomIndexCheckFunction(hoodieTable, config), true)
        .flatMap(List::iterator).filter(lr -> lr.getMatchingRecordKeys().size() > 0)
        .flatMapToPair(lookupResult -> lookupResult.getMatchingRecordKeys().stream()
            .map(recordKey -> new Tuple2<>(new HoodieKey(recordKey, lookupResult.getPartitionPath()),
                new HoodieRecordLocation(lookupResult.getBaseInstantTime(), lookupResult.getFileId())))
            .collect(Collectors.toList()).iterator());
  }

該方法首先會查找記錄需要進行比對的文件,然後再查詢的記錄的位置信息。

其中,對於#explodeRecordRDDWithFileComparisons方法而言,其會藉助樹/鏈表結構構造的文件過濾器來加速記錄對應文件的查找(每個record可能會對應多個文件)。

而使用Bloom Filter的核心邏輯承載在HoodieBloomIndexCheckFunction,HoodieBloomIndexCheckFunction$LazyKeyCheckIterator該迭代器完成了記錄對應文件的實際查找過程,查詢的核心邏輯在computeNext`中,其核心代碼如下

protected List<HoodieKeyLookupHandle.KeyLookupResult> computeNext() {

      List<HoodieKeyLookupHandle.KeyLookupResult> ret = new ArrayList<>();
      try {
        // process one file in each go.
        while (inputItr.hasNext()) {
          Tuple2<String, HoodieKey> currentTuple = inputItr.next();
          String fileId = currentTuple._1;
          String partitionPath = currentTuple._2.getPartitionPath();
          String recordKey = currentTuple._2.getRecordKey();
          Pair<String, String> partitionPathFilePair = Pair.of(partitionPath, fileId);

          // lazily init state
          if (keyLookupHandle == null) {
            keyLookupHandle = new HoodieKeyLookupHandle(config, hoodieTable, partitionPathFilePair);
          }

          // if continue on current file
          if (keyLookupHandle.getPartitionPathFilePair().equals(partitionPathFilePair)) {
            keyLookupHandle.addKey(recordKey);
          } else {
            // do the actual checking of file & break out
            ret.add(keyLookupHandle.getLookupResult());
            keyLookupHandle = new HoodieKeyLookupHandle(config, hoodieTable, partitionPathFilePair);
            keyLookupHandle.addKey(recordKey);
            break;
          }
        }

        // handle case, where we ran out of input, close pending work, update return val
        if (!inputItr.hasNext()) {
          ret.add(keyLookupHandle.getLookupResult());
        }
      } catch (Throwable e) {
        if (e instanceof HoodieException) {
          throw e;
        }
        throw new HoodieIndexException("Error checking bloom filter index. ", e);
      }

      return ret;
    }

該方法每次迭代只會處理一個文件,每次處理時都會生成HoodieKeyLookupHandle,然後會添加recordKey,處理完后再獲取查詢結果。

其中HoodieKeyLookupHandle#addKey方法核心代碼如下

public void addKey(String recordKey) {
    // check record key against bloom filter of current file & add to possible keys if needed
    if (bloomFilter.mightContain(recordKey)) {
      ...
      candidateRecordKeys.add(recordKey);
    }
    totalKeysChecked++;
  }

可以看到,這裏使用到了Bloom Filter來判斷該記錄是否存在,如果存在,則加入到候選隊列中,等待進一步判斷;若不存在,則無需額外處理,其中Bloom Filter會在創建HoodieKeyLookupHandle實例時初始化(從指定文件中讀取Bloom Filter)。

HoodieKeyLookupHandle#getLookupResult方法核心代碼如下

public KeyLookupResult getLookupResult() {
    ...
    HoodieDataFile dataFile = getLatestDataFile();
    List<String> matchingKeys =
        checkCandidatesAgainstFile(hoodieTable.getHadoopConf(), candidateRecordKeys, new Path(dataFile.getPath()));
    ...
    return new KeyLookupResult(partitionPathFilePair.getRight(), partitionPathFilePair.getLeft(),
        dataFile.getCommitTime(), matchingKeys);
  }

該方法首先獲取指定分區下的最新數據文件,然後判斷數據文件存在哪些recordKey,並將其封裝進KeyLookupResult后返回。其中#checkCandidatesAgainstFile會讀取文件中所有的recordKey,判斷是否存在於candidateRecordKeys,這便完成了進一步確認。

到這裏即完成了record存在於哪些文件的所有查找,查找完後會進行進一步處理,後續再給出分析。

總結

Hudi引入Bloom Filter是為了加速upsert過程,並將其存入parquet數據文件中的Footer中,在讀取文件時會從Footer中讀取該BloomFilter。在利用Bloom Filter來判斷記錄是否存在時,會採用二次確認的方式規避Bloom Filter的誤判問題。

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理【其他文章推薦】

※如何讓商品強力曝光呢? 網頁設計公司幫您建置最吸引人的網站,提高曝光率!!

網頁設計一頭霧水??該從何著手呢? 找到專業技術的網頁設計公司,幫您輕鬆架站!

※想知道最厲害的台北網頁設計公司推薦台中網頁設計公司推薦專業設計師”嚨底家”!!