調優 | Apache Hudi應用調優指南

2{icon} {views}

通過Spark作業將數據寫入Hudi時,Spark應用的調優技巧也適用於此。如果要提高性能或可靠性,請牢記以下幾點。

輸入并行性:Hudi對輸入進行分區默認併發度為1500,以確保每個Spark分區都在2GB的限制內(在Spark2.4.0版本之後去除了該限制),如果有更大的輸入,則相應地進行調整。我們建議設置shuffle的併發度,配置項為hoodie.[insert|upsert|bulkinsert].shuffle.parallelism,以使其至少達到input_data_size/500MB。

Off-heap(堆外)內存:Hudi寫入parquet文件,需要使用一定的堆外內存,如果遇到此類故障,請考慮設置類似spark.yarn.executor.memoryOverheadspark.yarn.driver.memoryOverhead的值。

Spark 內存:通常Hudi需要能夠將單個文件讀入內存以執行合併或壓縮操作,因此執行程序的內存應足以容納此文件。另外,Hudi會緩存輸入數據以便能夠智能地放置數據,因此預留一些spark.memory.storageFraction通常有助於提高性能。

調整文件大小:設置limitFileSize以平衡接收/寫入延遲與文件數量,並平衡與文件數據相關的元數據開銷。

時間序列/日誌數據:對於單條記錄較大的數據庫/ nosql變更日誌,可調整默認配置。另一類非常流行的數據是時間序列/事件/日誌數據,它往往更加龐大,每個分區的記錄更多。在這種情況下,請考慮通過.bloomFilterFPP()/bloomFilterNumEntries()來調整Bloom過濾器的精度,以加速目標索引查找時間,另外可考慮一個以事件時間為前綴的鍵,這將使用範圍修剪並顯着加快索引查找的速度。

GC調優:請確保遵循Spark調優指南中的垃圾收集調優技巧,以避免OutOfMemory錯誤。[必須]使用G1 / CMS收集器,其中添加到spark.executor.extraJavaOptions的示例如下:

-XX:NewSize=1g -XX:SurvivorRatio=2 -XX:+UseCompressedOops -XX:+UseConcMarkSweepGC -XX:+UseParNewGC -XX:CMSInitiatingOccupancyFraction=70 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintGCApplicationConcurrentTime -XX:+PrintTenuringDistribution -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/hoodie-heapdump.hprof

OutOfMemory錯誤:如果出現OOM錯誤,則可嘗試通過如下配置處理:spark.memory.fraction = 0.2,spark.memory.storageFraction = 0.2允許其溢出而不是OOM(速度變慢與間歇性崩潰相比)。

以下是完整的生產配置

spark.driver.extraClassPath /etc/hive/conf
spark.driver.extraJavaOptions -XX:+PrintTenuringDistribution -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintGCApplicationConcurrentTime -XX:+PrintGCTimeStamps -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/hoodie-heapdump.hprof
spark.driver.maxResultSize 2g
spark.driver.memory 4g
spark.executor.cores 1
spark.executor.extraJavaOptions -XX:+PrintFlagsFinal -XX:+PrintReferenceGC -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintAdaptiveSizePolicy -XX:+UnlockDiagnosticVMOptions -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/hoodie-heapdump.hprof
spark.executor.id driver
spark.executor.instances 300
spark.executor.memory 6g
spark.rdd.compress true
 
spark.kryoserializer.buffer.max 512m
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.shuffle.service.enabled true
spark.sql.hive.convertMetastoreParquet false
spark.submit.deployMode cluster
spark.task.cpus 1
spark.task.maxFailures 4
 
spark.yarn.driver.memoryOverhead 1024
spark.yarn.executor.memoryOverhead 3072
spark.yarn.max.executor.failures 100

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

※別再煩惱如何寫文案,掌握八大原則!

※教你寫出一流的銷售文案?

※超省錢租車方案

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

※產品缺大量曝光嗎?你需要的是一流包裝設計!

※回頭車貨運收費標準

HashMap解析(主要JDK1.8,附帶1.7出現的問題以及區別)

2{icon} {views}

按問題的形式來吧,這些大多是我自己總結的,如有錯誤請及時指正謝謝

1.你了解HashMap么,可以說說么?

  首先,HashMap是一種數據結構,可以快速的幫我們存取數據。它的底層數據結構在1.7和1.8有了一些變化,1.7版本及以前他是數組+鏈表的形式,1.8及以後數組+鏈表+紅黑樹,如果鏈表長度大於等於8就會轉化為紅黑樹,如果長度降至6紅黑樹會轉化為鏈表紅黑樹的出現解決了因為鏈表過長導致查詢速度變慢的問題,因為鏈表的查詢時間複雜度是O(n),而紅黑樹的查詢時間複雜度是O(logn)。

2.它的數組+鏈表是怎麼實現的?

  

 

 

 這個代碼是1.8的(1.7是Entry,就是名字不一樣),其實我們每一個放進去的(key,value)到最後都會封裝成這樣的Node對象。Hashmap的數組就是以一系列這樣的Node對象構成的數組,鏈表就是把next指向下一個Node對象。

 

 

3.為什麼要有鏈表,紅黑樹?只有數組不可以么?

首先我們要知道什麼是Hash算法。

這裏放出一段官方的話:

 

Hash,一般翻譯做散列、雜湊,或音譯為哈希,是把任意長度的輸入(又叫做預映射pre-image)通過散列算法變換成固定長度的輸出,該輸出就是散列值。這種轉換是一種壓縮映射,也就是,散列值的空間通常遠小於輸入的空間,不同的輸入可能會散列成相同的輸出,所以不可能從散列值來確定唯一的輸入值。簡單的說就是一種將任意長度的消息壓縮到某一固定長度的消息摘要的函數。

 

簡單點來說:就是把一個大数字經過運算變為固定範圍的輸出,最簡單的算法就是對你的數組長度取模。

但是這樣就會出現一個問題,你這麼算難免會出現算出來的数字是一樣的:

比如數組長度為16,我們要放入数字1和17,那麼他們經過對數組長度取模后位置是一樣的,這樣就產生了Hash衝突。我們就可以在數組下拉出一個鏈表去存儲這個数字

4.知道哪些常見的解決hash衝突算法么?

1、開放定址法(就是往下找空餘地方)
     用開放定址法解決衝突的做法是:當衝突發生時,使用某種探查(亦稱探測)技術在散列表中形成一個探查(測)序列。沿此序列逐個單元地查找,直到找到給定 的關鍵字,或者碰到一個開放的地址(即該地址單元為空)為止(若要插入,在探查到開放的地址,則可將待插入的新結點存人該地址單元)。查找時探查到開放的 地址則表明表中無待查的關鍵字,即查找失敗。

2、 再哈希法(再進行hash直到無衝突)
再哈希法又叫雙哈希法,有多個不同的Hash函數,當發生衝突時,使用第二個,第三個,….,等哈希函數
計算地址,直到無衝突。雖然不易發生聚集,但是增加了計算時間。

3、拉鏈法(hashmap用的)

鏈地址法的基本思想是:每個哈希表節點都有一個next指針,多個哈希表節點可以用next指針構成一個單向鏈表,被分配到同一個索引上的多個結點用單向鏈表連接起來

4、建立公共溢出區: 
這種方法的基本思想是:將哈希表分為基本表和溢出表兩部分,凡是和基本表發生衝突的元素,一律填入溢出表

5.為什麼閾值就是8和6呢?中間的7是有什麼作用的呢?直接就是紅黑樹不可以么?

HashMap中有這樣一段註釋(主要看数字):

/* * Because TreeNodes are about twice the size of regular nodes, we * use them only when 鏈表s contain enough nodes to warrant use * (see TREEIFY_THRESHOLD). And when they become too small (due to * removal or resizing) they are converted back to plain 鏈表s. In * usages with well-distributed user hashCodes, tree 鏈表s are * rarely used. Ideally, under random hashCodes, the frequency of * nodes in 鏈表s follows a Poisson distribution * (http://en.wikipedia.org/wiki/Poisson_distribution) with a * parameter of about 0.5 on average for the default resizing * threshold of 0.75, although with a large variance because of * resizing granularity. Ignoring variance, the expected * occurrences of list size k are (exp(-0.5) * pow(0.5, k) / * factorial(k)). The first values are: * * 0: 0.60653066 * 1: 0.30326533 * 2: 0.07581633 * 3: 0.01263606 * 4: 0.00157952 * 5: 0.00015795 * 6: 0.00001316 * 7: 0.00000094 * 8: 0.00000006 * more: less than 1 in ten million */

TreeNodes佔用空間是普通Nodes的兩倍(相較於鏈表結構,鏈表只有指向下一個節點的指針,二叉樹則需要左右指針,分別指向左節點和右節點),所以只有當鏈表包含足夠多的節點時才會轉成TreeNodes(考慮到時間和空間的權衡),而是否足夠多就是由TREEIFY_THRESHOLD的值決定的。當紅黑樹中節點數變少時,又會轉成普通的鏈表。並且我們查看源碼的時候發現,鏈表長度達到8就轉成紅黑樹,當長度降到6就轉成普通鏈表。

這樣就解釋了為什麼不是一開始就將其轉換為TreeNodes,而是需要一定節點數才轉為TreeNodes,說白了就是trade-off,空間和時間的權衡。

當hashCode離散性很好的時候,樹型鏈表用到的概率非常小,因為數據均勻分佈在每個鏈表中,幾乎不會有鏈表中鏈表長度會達到閾值。但是在隨機hashCode下,離散性可能會變差,然而JDK又不能阻止用戶實現這種不好的hash算法,因此就可能導致不均勻的數據分佈。不過理想情況下隨機hashCode算法下所有鏈表中節點的分佈頻率會遵循泊松分佈,我們可以看到,一個鏈表中鏈表長度達到8個元素的概率為0.00000006,幾乎是不可能事件。這種不可能事件都發生了,說明鏈表中的節點數很多,查找起來效率不高。至於7,是為了作為緩衝,可以有效防止鏈表和樹頻繁轉換。

之所以選擇8,不是拍拍屁股決定的,而是根據概率統計決定的。由此可見,發展30年的Java每一項改動和優化都是非常嚴謹和科學的。

泊松分佈適合於描述單位時間(或空間)內隨機事件發生的次數。如某一服務設施在一定時間內到達的人數,電話交換機接到呼叫的次數,汽車站台的候客人數,機器出現的故障數,自然災害發生的次數,一塊產品上的缺陷數,顯微鏡下單位分區內的細菌分佈數等等。如果有興趣的,可以研究一下,概率是怎麼算出來的!

個人總結:

  1. 選擇8是因為空間和時間的權衡,再一個是因為鏈表中節點的分佈頻率會遵循泊松分佈,達到8的概率很小
  2. 選擇7是為了作為緩衝,可以有效防止鏈表和樹頻繁轉換
  3. 你的紅黑樹查詢時間複雜度低,但你的維持平衡的操作代價是大的,所以不會直接是紅黑樹(這一點是個人理解)

6.HashMap的初始容量,加載因子,擴容增量是多少?如果加載因子變大變小會怎麼樣?

HashMap的初始容量16,加載因子為0.75,擴容增量是原容量的1倍。如果HashMap的容量為16,一次擴容后容量為32。HashMap擴容是指元素個數(包括數組和鏈表+紅黑樹中)超過了16*0.75=12(容量×加載因子)之後開始擴容。

這個就是源碼里的聲明

//默認初始容量
static final int DEFAULT_INITIAL_CAPACITY = 1 << 4; // aka 16 //最大容量
static final int MAXIMUM_CAPACITY = 1 << 30; //加載因子
static final float DEFAULT_LOAD_FACTOR = 0.75f;

加載因子越大,填滿的元素越多,空間利用率越高,但衝突的機會加大了。
反之,加載因子越小,填滿的元素越少,衝突的機會減小,但空間浪費多了(因為需要經常擴容)。

所以這是一個時間和空間的均衡。

7. 如果我默認初始大小為100,那麼元素個數到達75會擴容么?

這個問題我以前見到過,所以拿出來說一下。

首先HashMap的構造方法有四個

    public HashMap(int initialCapacity, float loadFactor) { if (initialCapacity < 0) throw new IllegalArgumentException("Illegal initial capacity: " + initialCapacity); if (initialCapacity > MAXIMUM_CAPACITY) initialCapacity = MAXIMUM_CAPACITY; if (loadFactor <= 0 || Float.isNaN(loadFactor)) throw new IllegalArgumentException("Illegal load factor: " + loadFactor); this.loadFactor = loadFactor; this.threshold = tableSizeFor(initialCapacity); } public HashMap(int initialCapacity) { this(initialCapacity, DEFAULT_LOAD_FACTOR); } public HashMap() { this.loadFactor = DEFAULT_LOAD_FACTOR; // all other fields defaulted
    }
  
  public HashMap(Map<!--? extends K, ? extends V--> m) {       this.loadFactor = DEFAULT_LOAD_FACTOR;       putMapEntries(m, false);   }  

簡單點來說就是你可以自定義加載因子和初始容量。但是這個初始容量不是說你設置多少就是多少,他是會有個計算的,到最後Hashmap的容量一定是2的n次方

 

 

 簡單說一下putMapEntries

final void putMapEntries(Map<? extends K, ? extends V> m, boolean evict) { //獲取該map的實際長度
        int s = m.size(); if (s > 0) { //判斷table是否初始化,如果沒有初始化
            if (table == null) { // pre-size
                /**求出需要的容量,因為實際使用的長度=容量*0.75得來的,+1是因為小數相除,基本都不會是整數,容量大小不能為小數的,後面轉換為int,多餘的小數就要被丟掉,所以+1,例如,map實際長度22,22/0.75=29.3,所需要的容量肯定為30,有人會問如果剛剛好除得整數呢,除得整數的話,容量大小多1也沒什麼影響**/
                float ft = ((float)s / loadFactor) + 1.0F; //判斷該容量大小是否超出上限。
                int t = ((ft < (float)MAXIMUM_CAPACITY) ? (int)ft : MAXIMUM_CAPACITY); /**對臨界值進行初始化,tableSizeFor(t)這個方法會返回大於t值的,且離其最近的2次冪,例如t為29,則返回的值是32**/
                if (t > threshold) threshold = tableSizeFor(t); } //如果table已經初始化,則進行擴容操作,resize()就是擴容。
            else if (s > threshold) resize(); //遍歷,把map中的數據轉到hashMap中。
            for (Map.Entry<? extends K, ? extends V> e : m.entrySet()) { K key = e.getKey(); V value = e.getValue(); putVal(hash(key), key, value, false, evict); } } }

 

所以說這個答案就是不會擴容的,因為你初始它的容量是100,tableSizeFor也會自動變成128,128×0.75是93遠遠大於75.

8. HashMap中為什麼數組的長度為2的冪次方?

主要是為了計算hash值時散列性更好。

我們看一下HashMap的數組下標如何計算的

 

// 將(數組的長度-1)和hash值進行按位與操作:
i = (n - 1) & hash  // i為數組對應位置的索引 n為當前數組的大小

假定HashMap的長度為默認的16,則n – 1為15,也就是二進制的01111

可以說,Hash算法最終得到的index結果完全取決於hashCode的最後幾位。

那麼說為什麼別的数字不行呢?

假設,HashMap的長度為10,則n-1為9,也就是二進制的1001

我們來試一個hashCode:1110時,通過Hash算法得到的最終的index是8

 

再比如說:1000得到的index也是8。

也就是說,即使我們把倒數第二、三位的0、1變換,得到的index仍舊是8,說明有些index結果出現的幾率變大!

這樣,顯然不符合Hash算法均勻分佈的要求。

反觀,長度16或其他2的冪次方,Length – 1的值的二進制所有的位均為1,這種情況下,Index的結果等於hashCode的最後幾位。只要輸入的hashCode本身符合均勻分佈,Hash算法的結果就是均勻的。

一句話,HashMap的長度為2的冪次方的原因是為了減少Hash碰撞,盡量使Hash算法的結果均勻分佈。

9.put方法

在講解put方法之前,先看看hash方法,看怎麼計算哈希值的。

    static final int hash(Object key) { int h; /**先獲取到key的hashCode,然後進行移位再進行異或運算,為什麼這麼複雜,不用想肯定是為了減少hash衝突**/
        return (key == null) ? 0 : (h = key.hashCode()) ^ (h >>> 16); }

put方法實際調用了putVal方法

    public V put(K key, V value) { /**四個參數,第一個hash值,第四個參數表示如果該key存在值,如果為null的話,則插入新的value,最後一個參數,在hashMap中沒有用,可以不用管,使用默認的即可**/
        return putVal(hash(key), key, value, false, true); } final V putVal(int hash, K key, V value, boolean onlyIfAbsent, boolean evict) { //tab 哈希數組,p 該哈希桶的首節點,n hashMap的長度,i 計算出的數組下標
        Node<K,V>[] tab; Node<K,V> p; int n, i; //獲取長度並進行擴容,使用的是懶加載,table一開始是沒有加載的,等put后才開始加載
        if ((tab = table) == null || (n = tab.length) == 0) n = (tab = resize()).length; /**如果計算出的該哈希桶的位置沒有值,則把新插入的key-value放到此處,此處就算沒有插入成功,也就是發生哈希衝突時也會把哈希桶的首節點賦予p**/
        if ((p = tab[i = (n - 1) & hash]) == null) tab[i] = newNode(hash, key, value, null); //發生哈希衝突的幾種情況
        else { // e 臨時節點的作用, k 存放該當前節點的key 
            Node<K,V> e; K k; //第一種,插入的key-value的hash值,key都與當前節點的相等,e = p,則表示為首節點
            if (p.hash == hash && ((k = p.key) == key || (key != null && key.equals(k)))) e = p; //第二種,hash值不等於首節點,判斷該p是否屬於紅黑樹的節點
            else if (p instanceof TreeNode) /**為紅黑樹的節點,則在紅黑樹中進行添加,如果該節點已經存在,則返回該節點(不為null),該值很重要,用來判斷put操作是否成功,如果添加成功返回null**/ e = ((TreeNode<K,V>)p).putTreeVal(this, tab, hash, key, value); //第三種,hash值不等於首節點,不為紅黑樹的節點,則為鏈表的節點
            else { //遍歷該鏈表
                for (int binCount = 0; ; ++binCount) { //如果找到尾部,則表明添加的key-value沒有重複,在尾部進行添加
                    if ((e = p.next) == null) { p.next = newNode(hash, key, value, null); //判斷是否要轉換為紅黑樹結構
                        if (binCount >= TREEIFY_THRESHOLD - 1) treeifyBin(tab, hash); break; } //如果鏈表中有重複的key,e則為當前重複的節點,結束循環
                    if (e.hash == hash && ((k = e.key) == key || (key != null && key.equals(k)))) break; p = e; } } //有重複的key,則用待插入值進行覆蓋,返回舊值。
            if (e != null) { V oldValue = e.value; if (!onlyIfAbsent || oldValue == null) e.value = value; afterNodeAccess(e); return oldValue; } } //到了此步驟,則表明待插入的key-value是沒有key的重複,因為插入成功e節點的值為null //修改次數+1
        ++modCount; //實際長度+1,判斷是否大於臨界值,大於則擴容
        if (++size > threshold) resize(); afterNodeInsertion(evict); //添加成功
        return null; }

大概如下幾步:

①. 判斷鍵值對數組table[i]是否為空或為null,否則執行resize()進行擴容,初始容量是16;

②. 根據鍵值key計算hash值得到插入的數組索引i,如果table[i]==null,直接新建節點添加,轉向⑥,如果table[i]不為空,轉向③;

③. 判斷table[i]的首個元素是否和key一樣,如果相同直接覆蓋value,否則轉向④,這裏的相同指的是hashCode以及equals;

④. 判斷table[i] 是否為TreeNode,即table[i] 是否是紅黑樹,如果是紅黑樹,遍歷發現該key不存在  則直接在樹中插入鍵值對;遍歷發現key已經存在直接覆蓋value即可;

⑤. 如果table[i] 不是TreeNode則是鏈表節點,遍歷發現該key不存在,則先添加在鏈表結尾, 判斷鏈表長度是否大於8,大於8的話把鏈錶轉換為紅黑樹;遍歷發現key已經存在直接覆蓋value即可;

⑥. 插入成功后,判斷實際存在的鍵值對數量size是否超多了最大容量threshold,如果超過,進行擴容。

10.resize方法

何時進行擴容?

HashMap使用的是懶加載,構造完HashMap對象后,只要不進行put 方法插入元素之前,HashMap並不會去初始化或者擴容table。

當首次調用put方法時,HashMap會發現table為空然後調用resize方法進行初始化
,當添加完元素后,如果HashMap發現size(元素總數)大於threshold(閾值),則會調用resize方法進行擴容

    final Node<K,V>[] resize() {
        Node<K,V>[] oldTab = table; //old的長度
        int oldCap = (oldTab == null) ? 0 : oldTab.length; //old的臨界值
        int oldThr = threshold; //初始化new的長度和臨界值
        int newCap, newThr = 0; //oldCap > 0也就是說不是首次初始化,因為hashMap用的是懶加載
        if (oldCap > 0) { //大於最大值
            if (oldCap >= MAXIMUM_CAPACITY) { //臨界值為整數的最大值
                threshold = Integer.MAX_VALUE; return oldTab; } //標記##,其它情況,擴容兩倍,並且擴容后的長度要小於最大值,old長度也要大於16
            else if ((newCap = oldCap << 1) < MAXIMUM_CAPACITY && oldCap >= DEFAULT_INITIAL_CAPACITY) //臨界值也擴容為old的臨界值2倍
                newThr = oldThr << 1; } /**如果oldCap<0,但是已經初始化了,像把元素刪除完之後的情況,那麼它的臨界值肯定還存在, 如果是首次初始化,它的臨界值則為0 **/
        else if (oldThr > 0) newCap = oldThr; //首次初始化,給與默認的值
        else { newCap = DEFAULT_INITIAL_CAPACITY; //臨界值等於容量*加載因子
            newThr = (int)(DEFAULT_LOAD_FACTOR * DEFAULT_INITIAL_CAPACITY); } //此處的if為上面標記##的補充,也就是初始化時容量小於默認值16的,此時newThr沒有賦值
        if (newThr == 0) { //new的臨界值
            float ft = (float)newCap * loadFactor; //判斷是否new容量是否大於最大值,臨界值是否大於最大值
            newThr = (newCap < MAXIMUM_CAPACITY && ft < (float)MAXIMUM_CAPACITY ? (int)ft : Integer.MAX_VALUE); } //把上面各種情況分析出的臨界值,在此處真正進行改變,也就是容量和臨界值都改變了。
        threshold = newThr; //表示忽略該警告
        @SuppressWarnings({"rawtypes","unchecked"}) //初始化
            Node<K,V>[] newTab = (Node<K,V>[])new Node[newCap]; //賦予當前的table
        table = newTab; //此處自然是把old中的元素,遍歷到new中
        if (oldTab != null) { for (int j = 0; j < oldCap; ++j) { //臨時變量
                Node<K,V> e; //當前哈希桶的位置值不為null,也就是數組下標處有值,因為有值表示可能會發生衝突
                if ((e = oldTab[j]) != null) { //把已經賦值之後的變量置位null,當然是為了好回收,釋放內存
                    oldTab[j] = null; //如果下標處的節點沒有下一個元素
                    if (e.next == null) //把該變量的值存入newCap中,e.hash & (newCap - 1)並不等於j
                        newTab[e.hash & (newCap - 1)] = e; //該節點為紅黑樹結構,也就是存在哈希衝突,該哈希桶中有多個元素
                    else if (e instanceof TreeNode) //把此樹進行轉移到newCap中
                        ((TreeNode<K,V>)e).split(this, newTab, j, oldCap); else { /**此處表示為鏈表結構,同樣把鏈錶轉移到newCap中,就是把鏈表遍歷后,把值轉過去,在置位null**/ Node<K,V> loHead = null, loTail = null; Node<K,V> hiHead = null, hiTail = null; Node<K,V> next; do { next = e.next; if ((e.hash & oldCap) == 0) { if (loTail == null) loHead = e; else loTail.next = e; loTail = e; } else { if (hiTail == null) hiHead = e; else hiTail.next = e; hiTail = e; } } while ((e = next) != null); if (loTail != null) { loTail.next = null; newTab[j] = loHead; } if (hiTail != null) { hiTail.next = null; newTab[j + oldCap] = hiHead; } } } } } //返回擴容后的hashMap
        return newTab; }

其實主要就是兩步:1.創建新的數組 2.複製元素

但是在新的下標位置計算上1.8做了很大的優化,後面會說到。

11.get方法

    public V get(Object key) { Node<K,V> e; 9 //調用getNode方法來完成的
        return (e = getNode(hash(key), key)) == null ? null : e.value; } final Node<K,V> getNode(int hash, Object key) { //first 頭結點,e 臨時變量,n 長度,k key
        Node<K,V>[] tab; Node<K,V> first, e; int n; K k; //頭結點也就是數組下標的節點
        if ((tab = table) != null && (n = tab.length) > 0 && (first = tab[(n - 1) & hash]) != null) { //如果是頭結點,則直接返回頭結點
            if (first.hash == hash && ((k = first.key) == key || (key != null && key.equals(k)))) return first; //不是頭結點
            if ((e = first.next) != null) { //判斷是否是紅黑樹結構
                if (first instanceof TreeNode) //去紅黑樹中找,然後返回
                    return ((TreeNode<K,V>)first).getTreeNode(hash, key); do { //鏈表節點,一樣遍歷鏈表,找到該節點並返回
                    if (e.hash == hash && ((k = e.key) == key || (key != null && key.equals(k)))) return e; } while ((e = e.next) != null); } } //找不到,表示不存在該節點
        return null; }

主要就是利用equals和hashcode方法找到並返回

12.HashMap在JDK1.7和1.8除了數據結構的區別

(1)插入數據方式不同:

JDK1.7用的是頭插法,而JDK1.8及之後使用的都是尾插法,那麼他們為什麼要這樣做呢?因為JDK1.7認為最新插入的應該會先被用到,所以用了頭插法,但當採用頭插法時會容易出現逆序且環形鏈表死循環問題。但是在JDK1.8之後是因為加入了紅黑樹使用尾插法,能夠避免出現逆序且鏈表死循環的問題。

  說一下為什麼會產生死循環問題:

  問題出現在了這個移動元素的transfer方法里

  

 主要問題就出在了這行代碼上

Entry<K,V> next = e.next

如果兩個線程A,B都要對這個map進行擴容

A和B都已經創建了新的數組,假設線程A在執行到Entry < K,V > next = e.next之後,cpu時間片用完了,這時變量e指向節點a,變量next指向節點b。

此時A的狀態:e=a ,next=b

線程B繼續執行,很不巧,a、b、c節點rehash之後又是在同一個位置,開始移動節點, 因為頭插法,複製后順序是反的,結束后B的狀態:

 

 

 此時A開始執行,此時變量e指向節點a,變量next指向節點b,開始執行循環體的剩餘邏輯

if (rehash) { e.hash = null == e.key ? 0 : hash(e.key); } int i = indexFor(e.hash, newCapacity); e.next = newTable[i]; newTable[i] = e; e = next;

執行到

newTable[i] = e;

此時A的狀態

 

執行到

e = next;

 

此時e=b

再執行一波循環,Entry<K,V> next = e.next 但是此時b的next是a,就出現了死循環問題

 

 

(2)擴容后數據存儲位置的計算方式也不一樣:

在JDK1.7的時候是重新計算數組下標

而在JDK1.8的時候直接用了JDK1.7的時候計算的規律,也就是擴容前的原始位置+擴容的大小值=JDK1.8的計算方式,而不再是JDK1.7的那種異或的方法。但是這種方式就相當於只需要判斷Hash值的新增參与運算的位是0還是1就直接迅速計算出了擴容后的儲存方式。

就比如說:數組大小是4,hash算法是對長度取模

 

 擴容后是這樣的

我們可以把這三個數的二進制和擴容后的length-1進行按位與,可以看到只有数字5新增位為1

 

 

 

 因此,我們在擴充HashMap的時候,不需要像JDK1.7的實現那樣重新計算hash,只需要看看原來的hash值新增的那個bit是1還是0就好了,是0的話索引沒變,是1的話索引變成“原索引+oldCap”

(3)擴容的條件不同,1.7需要容量超過閾值且發生hash衝突,1.8超過閾值即會擴容

(4)JDK1.7的時候是先進行擴容後進行插入,而在JDK1.8的時候則是先插入後進行擴容

(5)1.8中沒有區分鍵為null的情況,而1.7版本中對於鍵為null的情況調用putForNullKey()方法。但是兩個版本中如果鍵為null,那麼調用hash()方法得到的都將是0,所以鍵為null的元素都始終位於哈希表table【0】中。

(6)jdk1.7中當哈希表為空時,會先調用inflateTable()初始化一個數組;而1.8則是直接調用resize()擴容

(7)jdk1.7中的hash函數對哈希值的計算直接使用key的hashCode值,而1.8中則是採用key的hashCode異或上key的hashCode進行無符號右移16位的結果,避免了只靠低位數據來計算哈希時導致的衝突,計算結果由高低位結合決定,使元素分佈更均勻

13、HashMap是線程安全的么?如果想線程安全怎麼辦?

不是線程安全的,多線程下會出現死循環和put操作時可能導致元素丟失

死循環原因:上邊已經分析過了

丟失原因:當多個線程同時執行addEntry(hash,key ,value,i)時,如果產生哈希碰撞,導致兩個線程得到同樣的bucketIndex去存儲,就可能會發生元素覆蓋丟失的情況

 

想實現線程安全的解決方法:

1.使用Hashtable 類,Hashtable 是線程安全的(不建議用,就是利用了synchronized進行加鎖);

2.使用併發包下的java.util.concurrent.ConcurrentHashMap,ConcurrentHashMap實現了更高級的線程安全;

3.或者使用synchronizedMap() 同步方法包裝 HashMap object,得到線程安全的Map,並在此Map上進行操作。

 

參考:

https://blog.csdn.net/m0_37914588/article/details/82287191

https://www.jianshu.com/p/7cf2d6f1096b

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※教你寫出一流的銷售文案?

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

※回頭車貨運收費標準

※別再煩惱如何寫文案,掌握八大原則!

※超省錢租車方案

※產品缺大量曝光嗎?你需要的是一流包裝設計!

※推薦台中搬家公司優質服務,可到府估價

卷積生成對抗網絡(DCGAN)—生成手寫数字

2{icon} {views}

深度卷積生成對抗網絡(DCGAN)

—- 生成 MNIST 手寫圖片

1、基本原理

生成對抗網絡(GAN)由2個重要的部分構成:

  • 生成器(Generator):通過機器生成數據(大部分情況下是圖像),目的是“騙過”判別器
  • 判別器(Discriminator):判斷這張圖像是真實的還是機器生成的,目的是找出生成器做的“假數據”

訓練過程

  • 1、固定判別器,讓生成器不斷生成假數據,給判別器判別,開始生成器很弱,但是隨着不斷的訓練,生成器不斷提升,最終騙過判別器。此時判別器判斷假數據的概率為50%
  • 2、固定生成器,訓練判別器。判別器經過訓練,提高鑒別能力,最終能準確判斷雖有的假圖片
  • 3、循環上兩個階段,最終生成器和判別器都越來越強。然後就可以使用生成器來生成我們想要的圖片了

2、相關數學原理

  • 判別器在這裡是一種分類器,用於區分樣本的真偽,因此我們常常使用交叉熵(cross entropy)來進行判別分佈的相似性

\[H(p, q) := -\sum_i p_i \log q_i \]

公式中 \(p_i\)\(q_i\) 為真實的樣本分佈和生成器的生成分佈

假定 \(y_1\) 為正確樣本分佈,那麼對應的( \(1-y_1\) )就是生成樣本的分佈。\(D\) 表示判別器,則 \(D(x_1)\) 表示判別樣本為正確的概率, \(1-D(x_1)\) 則對應着判別為錯誤樣本的概率。則有如下式子(這裏僅僅是對當前情況下的交叉熵損失的具體化)。

\[H((x_i, y_i)_{i=1}^N, D) = – \sum_{i=1}^N y_i\log D(x_i) – \sum_{i=1}^N(1-y_i)\log (1 – D(x_i)) \]

對於GAN中的樣本點 \(x_i\) ,對應於兩個出處,要麼來自於真實樣本,要麼來自於生成器生成的樣本 $\tilde{x} – G(z) $ ( 這裏的 \(z\) 是服從於投到生成器中噪聲的分佈)。

對於來自於真實的樣本,我們要判別為正確的分佈 \(y_i\) 。來自於生成的樣本我們要判別其為錯誤分佈( \(1-y_i\) )。將上面式子進一步使用概率分佈的期望形式寫出(為了表達無限的樣本情況,相當於無限樣本求和情況),並且讓 \(y_i\) 為 1/2 且使用 \(G(z)\) 表示生成樣本可以得到如下公式:

\[H \left( (x_i, y_i)_{i=1}^\infty, D \right) = -\frac{1}{2}E_{x-p_{data}}\left[ \log D(x) \right] – \frac{1}{2}E_z\left[ \log (1-D(G(z))) \right] \\\ GAN損失函數期望形式 \]

對於論文中的公式

\[min_G max_D V(D, G) = E_{x-p_{data}(x)}\left[ \log D(x) \right] + E_{z-p_z(z)}\left[ \log (1-D(G(z))) \right] \\\ GAN損失函數的 min max表達 \]

其實是與上面公式一樣的,下面做解釋

  • 這裏的 \(V(D, G)\) 相當於表示真實樣本和生成樣本的差異程度。
  • \(max_D V(D, G)\) 的意思是固定生成器 \(G\), 盡可能地讓判別器能夠最大化地判別出樣本來自於真實數據還是生成的數據。
  • 再將後面的 $L = max_D V(D, G) $ 看成整體,對於 \(min_G L\)這裡是在固定判別器\(D\)的條件下得到生成器 \(G\),這個 \(G\) 要求能夠最小化真實樣本與生成樣本的差異。
  • 通過上述 \(min\) \(max\) 的博弈過程,理想情況下會收斂於生成分佈擬合於真實分佈。

3、卷積對抗生成網絡

卷積對抗生成網絡(DCGAN)是在GAN的基礎上加入了CNN,主要是改進了網絡結構,在訓練過程中狀態穩定,並且可以有效實現高質量圖片的生成以及相關的生成模型應用。DCGAN的生成器網絡結構如下圖:

DCGAN的改進:

  • 使用步長卷積代替上採樣層,卷積在提取圖像特徵上具有很好的作用,並且使用卷積代替全連接層
  • 生成器G和判別器D中幾乎每一層都使用batchnorm層,將特徵層的輸出歸一化到一起,加速了訓練,提升了訓練的穩定性。
  • 在判別器中使用leakrelu激活函數,而不是RELU,防止梯度稀疏,生成器中仍然採用relu,但是輸出層採用tanh。

4、DCGAN代碼實現

shenduimport numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import optimizers, losses, layers, Sequential, Model
class DCGAN():
    '''
    實現深度對抗神經網絡
    生成 MNIST 手寫数字圖片
    輸入的噪聲為服從正態分佈均值為 0 方差為 1 的分佈, shape:(None, 100)
    生成器(G)輸入 噪聲, 輸出為 (None, 28, 28, 1)的圖片
    分類器(D)輸入為 (None, 28, 28, 1)的圖片,輸出圖片的分類真假
    '''
    def __init__(self):
        self.img_rows = 28 
        self.img_cols = 28
        self.channels = 1
        self.img_shape = (self.img_rows, self.img_cols, self.channels)

        optimizer = optimizers.Adam(0.0002)

        # 構建編譯分類器
        self.discriminator = self.build_discriminator()
        self.discriminator.compile(loss='binary_crossentropy', 
            optimizer=optimizer,
            metrics=['accuracy'])

        # 構建編譯生成器
        self.generator = self.build_generator()
        self.generator.compile(loss='binary_crossentropy', optimizer=optimizer)

        # 生成器輸入為噪音,生成圖片
        z = layers.Input(shape=(100,))
        img = self.generator(z)

        # 對於整個對抗網絡模型只優化生成器的參數
        self.discriminator.trainable = False

        # 用生成的圖片輸入分類器判斷
        valid = self.discriminator(img)

        # 對於整個對抗網絡 輸入噪音 => 生成圖片 => 決定圖片是否有效
        self.combined = Model(z, valid)
        self.combined.compile(loss='binary_crossentropy', optimizer=optimizer)

        
    def build_generator(self):
        '''
        構建生成器
        '''
        noise_shape = (100,)
        
        model = tf.keras.Sequential()
        
        # 添加全連接層
        model.add(layers.Dense(7*7*256, use_bias=False, input_shape=noise_shape))
        # 添加 BatchNormalization 層,對數據進行歸一化
        model.add(layers.BatchNormalization())
        model.add(layers.LeakyReLU())

        model.add(layers.Reshape((7, 7, 256)))
        
        # 添加逆卷積層,卷積核大小為 5X5,數量 128, 步長為 1
        model.add(layers.Conv2DTranspose(128, (5, 5), strides=(1, 1), padding='same', use_bias=False))
        assert model.output_shape == (None, 7, 7, 128)
        model.add(layers.BatchNormalization())
        model.add(layers.LeakyReLU())
        
        # 添加逆卷積層,卷積核大小為 5X5,數量 64, 步長為 2
        model.add(layers.Conv2DTranspose(64, (5, 5), strides=(2, 2), padding='same', use_bias=False))
        assert model.output_shape == (None, 14, 14, 64)
        model.add(layers.BatchNormalization())
        model.add(layers.LeakyReLU())
        
        # 添加逆卷積層,卷積核大小為 5X5,數量 1, 步長為 2
        model.add(layers.Conv2DTranspose(1, (5, 5), strides=(2, 2), padding='same', use_bias=False, activation='tanh'))
        assert model.output_shape == (None, 28, 28, 1)
        
        model.summary()
        noise = layers.Input(shape=noise_shape)
        img = model(noise)
        
        # 返回 Model 對象,輸入為 噪聲, 輸出為 圖像
        return keras.Model(noise, img)

    
    def build_discriminator(self):
        '''
        構建分類器
        '''
        img_shape = (self.img_rows, self.img_cols, self.channels)
        
        model = tf.keras.Sequential()
        
        model.add(layers.Conv2D(64, (5, 5), strides=(2, 2), padding='same',
                                         input_shape=img_shape))
        model.add(layers.LeakyReLU())
        # 添加 Dropout 層,減少參數數量
        model.add(layers.Dropout(0.3))

        model.add(layers.Conv2D(128, (5, 5), strides=(2, 2), padding='same'))
        model.add(layers.LeakyReLU())
        model.add(layers.Dropout(0.3))
        # 把數據鋪平
        model.add(layers.Flatten())
        model.add(layers.Dense(1))
        
        model.summary()
        
        img = layers.Input(shape=img_shape)
        validity = model(img)
        
        return keras.Model(img, validity)

    
    def train(self, epochs, batch_size=128, save_interval=50):
        '''
        網絡訓練
        '''
        # 加載 數據集
        (X_train, _), (_, _) = keras.datasets.mnist.load_data()

        # 把數據縮放到 [-1, 1]
        X_train = (X_train.astype(np.float32) - 127.5) / 127.5
        # 添加通道維度
        X_train = np.expand_dims(X_train, axis=3)
        half_batch = int(batch_size / 2)

        for epoch in range(epochs):

            # ---------------------
            #  訓練分類器
            # ---------------------

            # 隨機的選擇一半的 batch 數量圖片
            idx = np.random.randint(0, X_train.shape[0], half_batch)
            imgs = X_train[idx]

            noise = np.random.normal(0, 1, (half_batch, 100))

            # 生成一半 batch 數量的 圖片
            gen_imgs = self.generator.predict(noise)

            # 分類器損失
            d_loss_real = self.discriminator.train_on_batch(imgs, np.ones((half_batch, 1)))
            d_loss_fake = self.discriminator.train_on_batch(gen_imgs, np.zeros((half_batch, 1)))
            d_loss = 0.5 * np.add(d_loss_real, d_loss_fake)


            # ---------------------
            #  訓練生成器
            # ---------------------

            noise = np.random.normal(0, 1, (batch_size, 100))

            # The generator wants the discriminator to label the generated samples
            # as valid (ones)
            # 對於生成器,希望分類器把更多的圖片判為 有效 (用 1 表示)
            valid_y = np.array([1] * batch_size)

            # 訓練生成器
            g_loss = self.combined.train_on_batch(noise, valid_y)

            # 打印訓練進度
            print ("%d [D loss: %f, acc.: %.2f%%] [G loss: %f]" % (epoch, d_loss[0], 100*d_loss[1], g_loss))

            # 每個 save_interval 周期保存一張圖片
            if epoch % save_interval == 0:
                self.save_imgs(epoch)

    def save_imgs(self, epoch):
        r, c = 5, 5
        noise = np.random.normal(0, 1, (r * c, 100))
        gen_imgs = self.generator.predict(noise)

        # 把圖片數據縮放到 0 - 1
        gen_imgs = 0.5 * gen_imgs + 0.5

        fig, axs = plt.subplots(r, c)
        cnt = 0
        for i in range(r):
            for j in range(c):
                axs[i,j].imshow(gen_imgs[cnt, :,:,0], cmap='gray')
                axs[i,j].axis('off')
                cnt += 1
        fig.savefig("dcgan/images/mnist_%d.png" % epoch)
        plt.close()

if __name__ == '__main__':
    dcgan = DCGAN()
    dcgan.train(epochs=10000, batch_size=32, save_interval=200)

網絡參數信息

5、訓練結果

下面是循環了 10000 次 epoch 后,從開始每隔 2000 個 epoch 生成器生成的圖片

  • 可以看到,剛開始全部都是噪聲,隨着訓練的進行,圖片逐漸清晰

  • 生成的圖片還是不太清晰,一方面的原因是我訓練的 epoch 周期太少,因為自己電腦性能問題,太耗時間,所以訓練的epoch 周期少,如果有條件后提高訓練周期應該會好很多。另一方面或許因為我構建的網絡還有不合理之,後期還需要改進。

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※超省錢租車方案

※別再煩惱如何寫文案,掌握八大原則!

※回頭車貨運收費標準

※教你寫出一流的銷售文案?

※產品缺大量曝光嗎?你需要的是一流包裝設計!

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

網頁設計最專業,超強功能平台可客製化

程序員需要了解的硬核知識之二進制

1{icon} {views}

我們都知道,計算機的底層都是使用二進制數據進行數據流傳輸的,那麼為什麼會使用二進製表示計算機呢?或者說,什麼是二進制數呢?在拓展一步,如何使用二進制進行加減乘除?二進制數如何表示負數呢?本文將一一為你揭曉。

為什麼用二進製表示

我們大家知道,計算機內部是由IC电子元件組成的,其中 CPU內存 也是 IC 电子元件的一種,CPU和內存圖如下

CPU 和 內存使用IC电子元件作為基本單元,IC电子元件有不同種形狀,但是其內部的組成單元稱為一個個的引腳。有人說CPU 和 內存內部都是超大規模集成電路,其實IC 就是集成電路(Integrated Circuit)。

IC元件兩側排列的四方形塊就是引腳,IC的所有引腳,只有兩種電壓: 0V5V,IC的這種特性,也就決定了計算機的信息處理只能用 0 和 1 表示,也就是二進制來處理。一個引腳可以表示一個 0 或 1 ,所以二進制的表示方式就變成 0、1、10、11、100、101等,雖然二進制數並不是專門為 引腳 來設計的,但是和 IC引腳的特性非常吻合。

計算機的最小集成單位為 ,也就是 比特(bit),二進制數的位數一般為 8位、16位、32位、64位,也就是 8 的倍數,為什麼要跟 8 扯上關係呢? 因為在計算機中,把 8 位二進制數稱為 一個字節, 一個字節有 8 位,也就是由 8個bit構成。

為什麼1個字節等於8位呢?因為 8 位能夠涵蓋所有的字符編碼,這個記住就可以了。

字節是最基本的計量單位,位是最小單位。

用字節處理數據時,如果数字小於存儲數據的字節數 ( = 二進制的位數),那麼高位就用 0 填補,高位和數學的數字錶示是一樣的,左側表示高位,右側表示低位。比如 這個六位數用二進制數來表示就是 100111,只有6位,高位需要用 0 填充,填充完后是 00100111,佔一個字節,如果用 16 位表示 就是 0000 0000 0010 0111佔用兩個字節。

我們一般口述的 32 位和 64位的計算機一般就指的是處理位數,32 位一次可以表示 4個字節,64位一次可以表示8個字節的二進制數。

我們一般在軟件開發中用十進制數表示的邏輯運算等,也會被計算機轉換為二進制數處理。對於二進制數,計算機不會區分他是 圖片、音頻文件還是数字,這些都是一些數據的結合體。

什麼是二進制數

那麼什麼是二進制數呢?為了說明這個問題,我們先把 00100111 這個數轉換為十進制數看一下,二進制數轉換為十進制數,直接將各位置上的值 * 位權即可,那麼我們將上面的數值進行轉換

也就是說,二進制數代表的 00100111 轉換成十進制就是 39,這個 39 並不是 3 和 9 兩個数字連着寫,而是 3 * 10 + 9 * 1,這裏面的 10 , 1 就是位權,以此類推,上述例子中的位權從高位到低位依次就是 7 6 5 4 3 2 1 0 。這個位權也叫做次冪,那麼最高位就是2的7次冪,2的6次冪 等等。二進制數的運算每次都會以2為底,這個2 指得就是基數,那麼十進制數的基數也就是 10 。在任何情況下位權的值都是 數的位數 – 1,那麼第一位的位權就是 1 – 1 = 0, 第二位的位權就睡 2 – 1 = 1,以此類推。

那麼我們所說的二進制數其實就是 用0和1兩個数字來表示的數,它的基數為2,它的數值就是每個數的位數 * 位權再求和得到的結果,我們一般來說數值指的就是十進制數,那麼它的數值就是 3 * 10 + 9 * 1 = 39。

移位運算和乘除的關係

在了解過二進制之後,下面我們來看一下二進制的運算,和十進制數一樣,加減乘除也適用於二進制數,只要注意逢 2 進位即可。二進制數的運算,也是計算機程序所特有的運算,因此了解二進制的運算是必須要掌握的。

首先我們來介紹移位 運算,移位運算是指將二進制的數值的各個位置上的元素坐左移和右移操作,見下圖

上述例子中還是以 39 為例,我們先把十進制的39 轉換為二進制的 0010 0111,然後向左移位 << 一個字節,也就變成了 0100 1110,那麼再把此二進制數轉換為十進制數就是上面的78, 十進制的78 竟然是 十進制39 的2倍關係。我們在讓 0010 0111 左移兩位,也就是 1001 1100,得出來的值是 156,相當於擴大了四倍!

因此你可以得出來此結論,左移相當於是數值擴大的操作,那麼右移 >> 呢?按理說右移應該是縮小 1/2,1/4 倍,但是39 縮小二倍和四倍不就變成小數了嗎?這個怎麼表示呢?請看下一節

便於計算機處理的補數

剛才我們沒有介紹右移的情況,是因為右移之後空出來的高位數值,有 0 和 1 兩種形式。要想區分什麼時候補0什麼時候補1,首先就需要掌握二進制數表示負數的方法。

二進制數中表示負數值時,一般會把最高位作為符號來使用,因此我們把這個最高位當作符號位。 符號位是 0 時表示正數,是 1 時表示 負數。那麼 -1 用二進制數該如何表示呢?可能很多人會這麼認為: 因為 1 的二進制數是 0000 0001,最高位是符號位,所以正確的表示 -1 應該是 1000 0001,但是這個答案真的對嗎?

計算機世界中是沒有減法的,計算機在做減法的時候其實就是在做加法,也就是用加法來實現的減法運算。比如 100 – 50 ,其實計算機來看的時候應該是 100 + (-50),為此,在表示負數的時候就要用到二進制補數,補數就是用正數來表示的負數。

為了獲得補數,我們需要將二進制的各數位的數值全部取反,然後再將結果 + 1 即可,先記住這個結論,下面我們來演示一下。

具體來說,就是需要先獲取某個數值的二進制數,然後對二進制數的每一位做取反操作(0 —> 1 , 1 —> 0),最後再對取反后的數 +1 ,這樣就完成了補數的獲取。

補數的獲取,雖然直觀上不易理解,但是邏輯上卻非常嚴謹,比如我們來看一下 1 – 1 的這個過程,我們先用上面的這個 1000 0001(它是1的補數,不知道的請看上文,正確性先不管,只是用來做一下計算)來表示一下

奇怪,1 – 1 會變成 130 ,而不是0,所以可以得出結論 1000 0001 表示 -1 是完全錯誤的。

那麼正確的該如何表示呢?其實我們上面已經給出結果了,那就是 1111 1111,來論證一下它的正確性

我們可以看到 1 – 1 其實實際上就是 1 + (-1),對 -1 進行上面的取反 + 1 后變為 1111 1111, 然後與 1 進行加法運算,得到的結果是九位的 1 0000 0000,結果發生了溢出,計算機會直接忽略掉溢出位,也就是直接拋掉 最高位 1 ,變為 0000 0000。也就是 0,結果正確,所以 1111 1111 表示的就是 -1 。

所以負數的二進製表示就是先求其補數,補數的求解過程就是對原始數值的二進制數各位取反,然後將結果 + 1

當然,結果不為 0 的運算同樣也可以通過補數求得正確的結果。不過,有一點需要注意,當運算結果為負的時候,計算結果的值也是以補數的形式出現的,比如 3 – 5 這個運算,來看一下解析過程

3 – 5 的運算,我們按着上面的思路來過一遍,計算出來的結果是 1111 1110,我們知道,這個數值肯定表示負數,但是負數無法直接用十進製表示,需要對其取反+ 1,算出來的結果是 2,因為 1111 1110的高位是 1,所以最終的結果是 -2。

編程語言的數據類型中,有的可以處理負數,有的不可以。比如 C語言中不能處理負數的 unsigned short類型,也有能處理負數的short類型 ,都是兩個字節的變量,它們都有 2 的十六次冪種值,但是取值範圍不一樣,short 類型的取值範圍是 -32768 – 32767 , unsigned short 的取值範圍是 0 – 65536。

仔細思考一下補數的機制,就能明白 -32768 比 32767 多一個數的原因了,最高位是 0 的正數有 0 ~ 32767 共 32768 個,其中包括0。最高位是 1 的負數,有 -1 ~ -32768 共 32768 個,其中不包含0。0 雖然既不是正數也不是負數,但是考慮到其符號位,就將其歸為了正數。

算數右移和邏輯右移的區別

在了解完補數后,我們重新考慮一下右移這個議題,右移在移位后空出來的最高位有兩種情況 0 和 1。當二進制數的值表示圖形模式而非數值時,移位后需要在最高位補0,類似於霓虹燈向右平移的效果,這就被稱為邏輯右移

將二進制數作為帶符號的數值進行右移運算時,移位后需要在最高位填充移位前符號位的值( 0 或 1)。這就被稱為算數右移。如果數值使用補數表示的負數值,那麼右移后在空出來的最高位補 1,就可以正確的表示 1/2,1/4,1/8等的數值運算。如果是正數,那麼直接在空出來的位置補 0 即可。

下面來看一個右移的例子。將 -4 右移兩位,來各自看一下移位示意圖

如上圖所示,在邏輯右移的情況下, -4 右移兩位會變成 63, 顯然不是它的 1/4,所以不能使用邏輯右移,那麼算數右移的情況下,右移兩位會變為 -1,顯然是它的 1/4,故而採用算數右移。

那麼我們可以得出來一個結論:左移時,無論是圖形還是數值,移位后,只需要將低位補 0 即可;右移時,需要根據情況判斷是邏輯右移還是算數右移。

下面介紹一下符號擴展:將數據進行符號擴展是為了產生一個位數加倍、但數值大小不變的結果,以滿足有些指令對操作數位數的要求,例如倍長於除數的被除數,再如將數據位數加長以減少計算過程中的誤差。

以8位二進製為例,符號擴展就是指在保持值不變的前提下將其轉換成為16位和32位的二進制數。將0111 1111這個正的 8位二進制數轉換成為 16位二進制數時,很容易就能夠得出0000 0000 0111 1111這個正確的結果,但是像 1111 1111這樣的補數來表示的數值,該如何處理?直接將其表示成為1111 1111 1111 1111就可以了。也就是說,不管正數還是補數表示的負數,只需要將 0 和 1 填充高位即可。

邏輯運算的竅門

掌握邏輯和運算的區別是:將二進制數表示的信息作為四則運算的數值來處理就是算數,像圖形那樣,將數值處理為單純的 01 的羅列就是邏輯

計算機能夠處理的運算,大體可分為邏輯運算和算數運算,算數運算指的是加減乘除四則運算;邏輯運算指的是對二進制各個數位的 0 和 1分別進行處理的運算,包括邏輯非(NOT運算)、邏輯與(AND運算)、邏輯或(OR運算)和邏輯異或(XOR運算)四種。

  • 邏輯非 指的是將 0 變成 1,1 變成 0 的取反操作
  • 邏輯與 指的是”兩個都是 1 時,運算結果才是 1,其他情況下是 0″
  • 邏輯或 指的是”至少有一方是 1 時,運算結果為 1,其他情況下運算結果都是 0″
  • 邏輯異或 指的是 “其中一方是 1,另一方是 0時運算結果才是 1,其他情況下是 0”

掌握邏輯運算的竅門,就是要摒棄二進制數表示數值這一個想法。大家不要把二進制數表示的值當作數值,應該把它看成是 開關上的 ON/OFF

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

網頁設計最專業,超強功能平台可客製化

※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面

※回頭車貨運收費標準

※推薦評價好的iphone維修中心

※教你寫出一流的銷售文案?

台中搬家公司教你幾個打包小技巧,輕鬆整理裝箱!

台中搬家公司費用怎麼算?

【asp.net core 系列】6 實戰之 一個項目的完整結構

1{icon} {views}

0. 前言

在《asp.net core 系列》之前的幾篇文章中,我們簡單了解了路由、控制器以及視圖的關係以及靜態資源的引入,讓我們對於asp.net core mvc項目有了基本的認識。不過,這些並不是 asp.net core mvc項目的全部內容,剩下的內容我將結合實戰項目為大家講解其中的知識。現在,就讓我們開始吧。

1. 項目構建

拋開之前的項目,現在跟着我重新創建一個項目,第一步依舊是先創建一個解決方案:

dotnet new sln --name Template

我先介紹一下這個項目(指整個項目,不是單獨的asp.net core 應用),這是一個後台管理的模板應用,提供了常見後台系統(管理員端)的功能,包括員工管理、部門管理、角色管理等功能。

現在回到項目中,通常一個項目需要一個模型層,一個數據提供層以及web展示層。然後,我們依次創建 Data、Domain、Web 三個項目,其中Data和Domain 是 classlib,Web是mvc項目。

# 確保當前目錄與 Template.sln 處於相同的目錄
dotnet new classlib --name Data
dotnet new classlib --name Domain
dotnet new mvc --name Web

添加三個項目到解決方案中:

dotnet sln add Data
dotnet sln add Domain
dotnet sln add Web

因為Data 中存放着模型層,所以需要其他項目對它有一個引用:

cd Domain
dotnet add reference ../Data
cd ../Web
dotnet add reference ../Data

當然,實際開發中我們應當還有一個Service層,這一層用來存放業務代碼,減少控制器里不必要的業務代碼。那麼繼續:

# 回到項目的根目錄
cd ..
dotnet new classlib --name Service
dotnet sln add Service

然後添加Service的引用:

cd Service
dotnet add reference ../Data

將 Service的引用添加到Web里:

cd ../Web
dotnet add reference ../Service

現在一個大型工程基本都是面向接口編程,幾個關鍵層應當都是接口層,我們實際上還缺少Domain的實現層和Service的實現層。

cd ..
dotnet new classlib --name Domain.Implements
dotnet new classlib --name Service.Implements

在對應的實現層中,引入它們實現的接口層,並引入Data:

cd Domain.Implements
dotnet add reference ../Data
dotnet add reference ../Domain
cd ../Service.Implements
dotnet add reference ../Data
dotnet add reference ../Domain
dotnet add reference ../Service

這裡在Service的實現層添加Domain接口層的引用,而不是實現層的引用。這是因為面向接口編程,我們需要對Service實現層隱藏Domain的實現,所以對於Service的實現層來說,不需要關心Domain層的實現邏輯。

在Web中添加新建的兩個實現層的引用:

cd ../Web
dotnet add reference ../Domain.Implements
dotnet add reference ../Service.Implements

添加這兩個實現層到解決方案中:

cd ..
dotnet sln add Domain.Implements
dotnet sln add Service.Implements

下圖是到目前為止的項目結構圖:

整體而言,Data是各個層之間的數據流通依據,所以各個項目都依賴於此項目,各個接口層的實現層都只對Web可見,其他各層實際上並不清楚具體實現。

隱藏實現層有什麼好處呢?

  • 調用方不知道實現方的邏輯,避免調用方對特定實現的依賴
  • 有利於團隊協作,有的團隊是針對模塊劃分,有的是針對分層劃分,無論哪種,使用接口都是一個好的選擇
  • 有利於後期優化,可以很方便的切換實現層,而不用重新編譯過多的代碼

當然,並不只有這些好處,不過這樣有一個壞處,在web層調用service層時會更繁瑣,不過這也不是不可解決的,後續的內容中會為大家介紹如何解決這個煩惱。

2. 項目補充

通常情況下,一個完整的項目還會有一個工具類項目和一個測試項目。所以,繼續添加以下項目:

dotnet new classlib --name Utils

Utils 表示工具類,通常一個項目中工具類會比較多,所以就抽成了一個項目,單獨列出來。

添加測試項目:

dotnet new nunit --name Test

這裏使用的是nunit 3測試框架,當然還有另一個是xunit測試框架。

添加兩個項目到解決方案里:

dotnet sln add Utils
dotnet sln add Test

3. 總結

本章內容旨在通過創建項目,讓大家了解實際開發中項目的層級規劃思想,這並不代表我的就是最優的,只是這是我總結出來相對方便的層級關係。這裏並沒有講解如何通過Visual Studio或者Rider創建這樣的一個項目,我希望大夥能夠自己試試。

好了,希望大家能創建好項目,當然了後期我會給大家提供這個項目的源碼的,地址暫時保密哦。

更多內容煩請關注我的博客《高先生小屋》

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※產品缺大量曝光嗎?你需要的是一流包裝設計!

※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面

※回頭車貨運收費標準

※推薦評價好的iphone維修中心

※超省錢租車方案

台中搬家遵守搬運三大原則,讓您的家具不再被破壞!

※推薦台中搬家公司優質服務,可到府估價

Netty源碼學習系列之4-ServerBootstrap的bind方法

1{icon} {views}

前言

    今天研究ServerBootstrap的bind方法,該方法可以說是netty的重中之重、核心中的核心。前兩節的NioEventLoopGroup和ServerBootstrap的初始化就是為bind做準備。照例粘貼一下這個三朝元老的demo,開始本文內容。

 1 public class NettyDemo1 {
 2     // netty服務端的一般性寫法
 3     public static void main(String[] args) {
 4         EventLoopGroup boss = new NioEventLoopGroup(1);
 5         EventLoopGroup worker = new NioEventLoopGroup();
 6         try {
 7             ServerBootstrap bootstrap = new ServerBootstrap();
 8             bootstrap.group(boss, worker).channel(NioServerSocketChannel.class)
 9                     .option(ChannelOption.SO_BACKLOG, 100)
10                     .handler(new NettyServerHandler())
11                     .childHandler(new ChannelInitializer<SocketChannel>() {
12                         @Override
13                         protected void initChannel(SocketChannel socketChannel) throws Exception {
14                             ChannelPipeline pipeline = socketChannel.pipeline();
15                             pipeline.addLast(new StringDecoder());
16                             pipeline.addLast(new StringEncoder());
17                             pipeline.addLast(new NettyServerHandler());
18                         }
19                     });
20             ChannelFuture channelFuture = bootstrap.bind(90);
21             channelFuture.channel().closeFuture().sync();
22         } catch (Exception e) {
23             e.printStackTrace();
24         } finally {
25             boss.shutdownGracefully();
26             worker.shutdownGracefully();
27         }
28     }
29 }

 

一、bind及doBind方法

1.ServerBootstrap.bind方法

    該方法有多個重載方法,但核心作用只有一個,就是將參數轉為InetSocketAddress對象傳給 —>

1 public ChannelFuture bind(int inetPort) {
2         return bind(new InetSocketAddress(inetPort));
3     }
1 public ChannelFuture bind(String inetHost, int inetPort) {
2         return bind(SocketUtils.socketAddress(inetHost, inetPort));
3     }
1 public ChannelFuture bind(InetAddress inetHost, int inetPort) {
2         return bind(new InetSocketAddress(inetHost, inetPort));
3     }

    下面這個bind方法,在該方法中調用了doBind方法。

1 public ChannelFuture bind(SocketAddress localAddress) {
2         validate();
3         return doBind(ObjectUtil.checkNotNull(localAddress, "localAddress"));
4     }

2、ServerBootstrap的doBind方法

    doBind方法位於父類AbstractBootstrap中,它有兩大功能,均在下面代碼中標識了出來,它們分別對應通過原生nio進行server端初始化時的兩個功能,第1步對應將channel註冊到selector上;第2步對應將server地址綁定到channel上。

 1 private ChannelFuture doBind(final SocketAddress localAddress) {
 2         final ChannelFuture regFuture = initAndRegister(); // 1)、初始化和註冊,重要***
 3         final Channel channel = regFuture.channel();
 4         if (regFuture.cause() != null) {
 5             return regFuture;
 6         }
 7 
 8         if (regFuture.isDone()) {
 9             // At this point we know that the registration was complete and successful.
10             ChannelPromise promise = channel.newPromise();
11             doBind0(regFuture, channel, localAddress, promise); // 2)、將SocketAddress和channel綁定起來,最終執行的是nio中的功能,重要**
12             return promise;
13         } else {
14             // 省略異常判斷、添加監聽器和異步調用doBind0方法
15         }
16     }

    為方便關聯對照,下面再粘貼一個簡單的原生NIO編程的服務端初始化方法,其實doBind方法的邏輯基本就是對下面這個方法的封裝,只是增加了很多附加功能。

    因為上述兩步都有些複雜,所以此處分兩部分進行追蹤。

二、AbstractBootstrap的initAndRegister方法

     該方法代碼如下所示,一共有三個核心方法,邏輯比較清晰,將channel new出來,初始化它,然後註冊到selector上。下面我們各個擊破。

 1 final ChannelFuture initAndRegister() {
 2         Channel channel = null;
 3         try { // 1)、實例化channel,作為服務端初始化的是NioServerSocketChannel
 4             channel = channelFactory.newChannel();
 5             init(channel); // 2)、初始化channel,即給channel中的屬性賦值
 6         } catch (Throwable t) {
 7             if (channel != null) {
 8                 channel.unsafe().closeForcibly();
 9                 return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
10             }
11             return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
12         }
13         // 3)、註冊,即最終是將channel 註冊到selector上
14         ChannelFuture regFuture = config().group().register(channel);
15         if (regFuture.cause() != null) {
16             if (channel.isRegistered()) {
17                 channel.close();
18             } else {
19                 channel.unsafe().closeForcibly();
20             }
21         }
22         return regFuture;
23     }

1、channelFactory.newChannel()方法

1 @Override
2     public T newChannel() {
3         try {
4             return constructor.newInstance();
5         } catch (Throwable t) {
6             throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);
7         }
8     }

    該方法完成了channel的實例化,channelFactory的賦值可參見上一篇博文【Netty源碼學習系列之3-ServerBootstrap的初始化】(地址 https://www.cnblogs.com/zzq6032010/p/13027161.html),對服務端來說,這裏channelFactory值為ReflectiveChannelFactory,且其內部的constructor是NioServerSocketChannel的無參構造器,下面追蹤NioServerSocketChannel的無參構造方法。

1.1)、new NioServerSocketChannel()

1 public NioServerSocketChannel() {
2         this(newSocket(DEFAULT_SELECTOR_PROVIDER));
3     }
 1 private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
 2 
 3 private static ServerSocketChannel newSocket(SelectorProvider provider) {
 4         try {
 5             return provider.openServerSocketChannel();
 6         } catch (IOException e) {
 7             throw new ChannelException(
 8                     "Failed to open a server socket.", e);
 9         }
10     }

    可見,它先通過newSocket方法獲取nio原生的ServerSocketChannel,然後傳給了重載構造器,如下,其中第三行是對NioServerSocketChannelConfig  config進行了賦值,邏輯比較簡單,下面主要看對父類構造方法的調用。

1 public NioServerSocketChannel(ServerSocketChannel channel) {
2         super(null, channel, SelectionKey.OP_ACCEPT);
3         config = new NioServerSocketChannelConfig(this, javaChannel().socket());
4     }

1.2)、對NioServerSocketChannel父類構造方法的調用

 1 protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
 2         super(parent);
 3         this.ch = ch;
 4         this.readInterestOp = readInterestOp;
 5         try {
 6             ch.configureBlocking(false);
 7         } catch (IOException e) {
 8             try {
 9                 ch.close();
10             } catch (IOException e2) {
11                 if (logger.isWarnEnabled()) {
12                     logger.warn(
13                             "Failed to close a partially initialized socket.", e2);
14                 }
15             }
16 
17             throw new ChannelException("Failed to enter non-blocking mode.", e);
18         }
19     }

    中間經過了AbstractNioMessageChannel,然後調到下面AbstractNioChannel的構造方法。此時parent為null,ch為上面獲取到的nio原生ServerSocketChannel,readInterestOp為SelectionKey的Accept事件(值為16)。可以看到,將原生渠道ch賦值、感興趣的事件readInterestOp賦值、設置非阻塞。然後重點看對父類構造器的調用。

1.3)、AbstractChannel構造器

1 protected AbstractChannel(Channel parent) {
2         this.parent = parent;
3         id = newId();
4         unsafe = newUnsafe();
5         pipeline = newChannelPipeline();
6     }

    可以看到,此構造方法只是給四個屬性進行了賦值,我們挨個看下這四個屬性。

    第一個屬性是this.parent,類型為io.netty.channel.Channel,但此時值為null;

    第二個屬性id類型為io.netty.channel.ChannelId,就是一個id生成器,值為new DefaultChannelId();

    第三個屬性unsafe類型為io.netty.channel.Channel.Unsafe,該屬性很重要,封裝了對事件的處理邏輯,最終調用的是AbstractNioMessageChannel中的newUnsafe方法,賦的值為new NioMessageUnsafe();

    第四個屬性pipeline類型為io.netty.channel.DefaultChannelPipeline,該屬性很重要,封裝了handler處理器的邏輯,賦的值為 new DefaultChannelPipeline(this)  this即當前的NioServerSocketChannel對象。

    其中DefaultChannelPipeline的構造器需要額外看一下,如下,將NioServerSocketChannel對象存入channel屬性,然後初始化了tail、head兩個成員變量,且對應的前後指針指向對方。TailContext和HeadContext都繼承了AbstractChannelHandlerContext,在這個父類裏面維護了next和prev兩個雙向指針,看到這裡有經驗的園友應該一下子就能看出來,DefaultChannelPipeline內部維護了一個雙向鏈表。

 1 protected DefaultChannelPipeline(Channel channel) {
 2         this.channel = ObjectUtil.checkNotNull(channel, "channel");
 3         succeededFuture = new SucceededChannelFuture(channel, null);
 4         voidPromise =  new VoidChannelPromise(channel, true);
 5 
 6         tail = new TailContext(this);
 7         head = new HeadContext(this);
 8 
 9         head.next = tail;
10         tail.prev = head;
11     }

 

     至此,完成了上面initAndRegister方法中的第一個功能:channel的實例化。此時NioServerSocketChannel的幾個父類屬性快照圖如下所示:

 

2、init(channel)方法

    init(channel)方法位於ServerBootstrap中(因為這裡是通過ServerBootstrap過來的,如果是通過Bootstrap進入的這裏則調用的就是Bootstrap中的init方法),主要功能如下註釋所示。本質都是針對channel進行初始化,初始化channel中的option、attr和pipeline。

 1 void init(Channel channel) throws Exception {
 2         // 1、獲取AbstractBootstrap中的options屬性,與channel進行關聯
 3         final Map<ChannelOption<?>, Object> options = options0();
 4         synchronized (options) {
 5             setChannelOptions(channel, options, logger);
 6         }
 7         // 2、獲取AbstractBootstrap中的attr屬性,與channel關聯起來
 8         final Map<AttributeKey<?>, Object> attrs = attrs0();
 9         synchronized (attrs) {
10             for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
11                 @SuppressWarnings("unchecked")
12                 AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
13                 channel.attr(key).set(e.getValue());
14             }
15         }
16         // 3、獲取pipeline,並將一個匿名handler對象添加進去,重要***
17         ChannelPipeline p = channel.pipeline();
18         final EventLoopGroup currentChildGroup = childGroup;
19         final ChannelHandler currentChildHandler = childHandler;
20         final Entry<ChannelOption<?>, Object>[] currentChildOptions;
21         final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
22         synchronized (childOptions) {
23             currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));
24         }
25         synchronized (childAttrs) {
26             currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
27         }
28         p.addLast(new ChannelInitializer<Channel>() {
29             @Override
30             public void initChannel(final Channel ch) throws Exception {
31                 final ChannelPipeline pipeline = ch.pipeline();
32                 ChannelHandler handler = config.handler();
33                 if (handler != null) {
34                     pipeline.addLast(handler);
35                 }
36 
37                 ch.eventLoop().execute(new Runnable() {
38                     @Override
39                     public void run() {
40                         pipeline.addLast(new ServerBootstrapAcceptor(
41                                 ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
42                     }
43                 });
44             }
45         });
46     }

    1跟2的功能都比較容易理解,功能3是init的核心,雖然代碼不少但很容易理解,它就是往channel中的pipeline里添加了一個匿名handler對象,其initChannel方法只有在有客戶端連接接入時才會調用,initChannel方法的功能是什麼呢?可以看到,它就是往入參channel中的eventLoop里添加了一個任務,這個任務的功能就是往pipeline中再添加一個handler,最後添加的這個handler就不是匿名的了,它是ServerBootstrapAcceptor對象。因為這裏的initChannel方法和後面的run方法都是有客戶端接入時才會調用的,所以這裏只是提一下,後面會詳述。至此完成init方法,下面進入register。

3、config().group().register(channel)方法

 3.1)、config().group()方法

    由前面可以知道,config().group().register(channel)這行代碼位於AbstractBootstrap類中的initAndRegister方法中,但由於當前對象是ServerBootstrap,故此處config()方法實際調用的都是ServerBootstrap中重寫的方法,得到了ServerBootstrapConfig。

    ServerBootstrapConfig的group方法如下,調用的是它的父類AbstractBootstrapConfig中的方法。通過類名就能知道,ServerBootstrapConfig中的方法是獲取ServerBootstrap中的屬性,而AbstractBootstrapConfig中的方法是獲取AbstractBootstrap中的屬性,兩兩對應。故此處獲取的EventLoopGroup就是AbstractBootstrap中存放的group,即文章開頭demo中的boss對象。

1 public final EventLoopGroup group() {
2         return bootstrap.group();
3     }

    獲取到了名叫boss的這個NioEventLoopGroup對象,下面追蹤NioEventLoopGroup.register(channel)方法

3.2)、 NioEventLoopGroup.register(channel)方法

    該方法是對之前初始化屬性的應用,需結合NioEventLoopGroup的初始化流程看,詳見【Netty源碼學習系列之2-NioEventLoopGroup的初始化】(鏈接【https://www.cnblogs.com/zzq6032010/p/12872989.html】)一文,此處就不贅述了,下面把該類的繼承類圖粘貼出來,以便有個整體認識。

 

3.2.1)、next()方法 

    下面的register方法位於MultithreadEventLoopGroup類中,是NioEventLoopGroup的直接父類,如下:

1 public ChannelFuture register(Channel channel) {
2         return next().register(channel);
3     }

    next方法如下,調用了父類的next方法,下面的就是父類MultithreadEventExecutorGroup中的next實現,可以看到調用的是chooser的next方法。通過初始化流程可知,此處boss的線程數是1,是2的n次方,所以chooser就是PowerOfTwoEventExecutorChooser,通過next方法從EventExecutor[]中選擇一個對象。需要注意的是chooser.next()通過輪詢的方式選擇的對象。

1 public EventLoop next() {
2         return (EventLoop) super.next();
3     }
1 public EventExecutor next() {
2         return chooser.next();
3     }

3.2.2)、NioEventLoop.register方法

    next之後是register方法,中間將NioServerSocketChannel和當前的NioEventLoop封裝成一個DefaultChannelPromise對象往下傳遞,在下面第二個register方法中可以看到,實際上調用的是NioServerSocketChannel中的unsafe屬性的register方法。

1 public ChannelFuture register(Channel channel) {
2         return register(new DefaultChannelPromise(channel, this));
3     }
1 public ChannelFuture register(final ChannelPromise promise) {
2         ObjectUtil.checkNotNull(promise, "promise");
3         promise.channel().unsafe().register(this, promise);
4         return promise;
5     }

3.2.3)、NioMessageUnsafe的register方法

    通過本文第一部分中第1步中的1.3)可以知道,NioServerSocketChannel中的unsafe是NioMessageUnsafe對象,下面繼續追蹤其register方法:

 1 public final void register(EventLoop eventLoop, final ChannelPromise promise) {
 2             if (eventLoop == null) {// 判斷非空
 3                 throw new NullPointerException("eventLoop");
 4             }
 5             if (isRegistered()) {// 判斷是否註冊
 6                 promise.setFailure(new IllegalStateException("registered to an event loop already"));
 7                 return;
 8             }
 9             if (!isCompatible(eventLoop)) {// 判斷eventLoop類型是否匹配
10                 promise.setFailure(
11                         new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
12                 return;
13             }
14        // 完成eventLoop屬性的賦值
15             AbstractChannel.this.eventLoop = eventLoop;
16             // 判斷eventLoop中的Reactor線程是不是當前線程 ***重要1
17             if (eventLoop.inEventLoop()) {
18                 register0(promise); // 進行註冊
19             } else {
20                 try {// 不是當前線程則將register0任務放入eventLoop隊列中讓Reactor線程執行(如果Reactor線程未初始化還要將其初始化) ***重要2
21                     eventLoop.execute(new Runnable() {
22                         @Override
23                         public void run() {
24                             register0(promise);// 註冊邏輯 ***重要3
25                         }
26                     });
27                 } catch (Throwable t) {
28                     // 省略異常處理
29                 }
30             }
31         }

    該方法位於io.netty.channel.AbstractChannel.AbstractUnsafe中(它是NioMessageUnsafe的父類),根據註釋能了解每一步做了什麼,但如果要理解代碼邏輯意圖則需要結合netty的串行無鎖化(串行無鎖化參見博主的netty系列第一篇文章https://www.cnblogs.com/zzq6032010/p/12872993.html)。它實際就是讓每一個NioEventLoop對象的thread屬性記錄一條線程,用來循環執行NioEventLoop的run方法,後續這個channel上的所有事件都由這一條線程來執行,如果當前線程不是Reactor線程,則會將任務放入隊列中,Reactor線程會不斷從隊列中獲取任務執行。這樣以來,所有事件都由一條線程順序處理,線程安全,也就不需要加鎖了。

    說完整體思路,再來結合代碼看看。上述代碼中標識【***重要1】的地方就是通過inEventLoop方法判斷eventLoop中的thread屬性記錄的線程是不是當前線程:

    先調到父類AbstractEventExecutor中,獲取了當前線程:

1 public boolean inEventLoop() {
2         return inEventLoop(Thread.currentThread());
3     }

    然後調到SingleThreadEventExecutor類中的方法,如下,比對thread與當前線程是否是同一個:

1 public boolean inEventLoop(Thread thread) {
2         return thread == this.thread;
3     }

    此時thread未初始化,所以肯定返回false,則進入【***重點2】的邏輯,將register放入run方法中封裝成一個Runnable任務,然後執行execute方法,如下,該方法位於SingleThreadEventExecutor中:

 1 public void execute(Runnable task) {
 2         if (task == null) {
 3             throw new NullPointerException("task");
 4         }
 5 
 6         boolean inEventLoop = inEventLoop();
 7         addTask(task); //將任務放入隊列中 ***重要a
 8         if (!inEventLoop) {
 9             startThread(); //判斷當前線程不是thread線程,則調用該方法 ***重要b
10             if (isShutdown()) {
11                 boolean reject = false;
12                 try {
13                     if (removeTask(task)) {
14                         reject = true;
15                     }
16                 } catch (UnsupportedOperationException e) {
17                     // 省略註釋
18                 }
19                 if (reject) {
20                     reject();
21                 }
22             }
23         }
24 
25         if (!addTaskWakesUp && wakesUpForTask(task)) {
26             wakeup(inEventLoop);
27         }
28     }

    有兩個重要的邏輯,已經在上面代碼中標出,先看看【***重要a】,如下,可見最終就是往SingleThreadEventExecutor的taskQueue隊列中添加了一個任務,如果添加失敗則調reject方法執行拒絕策略,通過前文分析可以知道,此處的拒絕策略就是直接拋錯。

1 protected void addTask(Runnable task) {
2         if (task == null) {
3             throw new NullPointerException("task");
4         }
5         if (!offerTask(task)) {
6             reject(task);
7         }
8     }
1 final boolean offerTask(Runnable task) {
2         if (isShutdown()) {
3             reject();
4         }
5         return taskQueue.offer(task);
6     }

    然後在看【***重要b】,如下,該方法雖然叫startThread,但內部有控制,不能無腦開啟線程,因為調這個方法的時候會有兩種情況:1).thread變量為空;2).thread不為空且不是當前線程。第一種情況需要開啟新的線程,但第二種情況就不能直接創建線程了。所以看下面代碼可以發現,它內部通過CAS+volatile(state屬性加了volatile修飾)實現的開啟線程的原子控制,保證多線程情況下也只會有一個線程進入doStartThread()方法。

 1 private void startThread() {
 2         if (state == ST_NOT_STARTED) {
 3             if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
 4                 boolean success = false;
 5                 try {
 6                     doStartThread();
 7                     success = true;
 8                 } finally {
 9                     if (!success) {
10                         STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
11                     }
12                 }
13             }
14         }
15     }

    繼續往下看一下doStartThread()的方法邏輯:

 1 private void doStartThread() {
 2         assert thread == null;
 3         executor.execute(new Runnable() { //此處的executor內部執行的就是ThreadPerTaskExecutor的execute邏輯,創建一個新線程運行下面的run方法
 4             @Override
 5             public void run() {
 6                 thread = Thread.currentThread(); //將Reactor線程記錄到thread變量中,保證一個NioEventLoop只有一個主線程在運行
 7                 if (interrupted) {
 8                     thread.interrupt();
 9                 }
10 
11                 boolean success = false;
12                 updateLastExecutionTime();
13                 try {
14                     SingleThreadEventExecutor.this.run(); //調用當前對象的run方法,該run方法就是Reactor線程的核心邏輯方法,後面會重點研究
15                     success = true;
16                 } catch (Throwable t) {
17                     logger.warn("Unexpected exception from an event executor: ", t);
18                 } finally {
19                    // 省略無關邏輯
20                 }
21             }
22         });
23     }

    可以看到,在上面的方法中完成了Reactor線程thread的賦值和核心邏輯NioEventLoop中run方法的啟動。這個run方法啟動后,第一步做的事情是什麼?讓我們往前回溯,回到3.2.3),當然是執行當初封裝了 register0方法的那個run方法的任務,即執行register0方法,下面填之前埋得坑,對【***重要3】進行追蹤:

 1 private void register0(ChannelPromise promise) {
 2             try {
 3                 // 省略判斷邏輯
 4                 boolean firstRegistration = neverRegistered;
 5                 doRegister();// 執行註冊邏輯
 6                 neverRegistered = false;
 7                 registered = true;
 8                 pipeline.invokeHandlerAddedIfNeeded();// 調用pipeline的邏輯
 9 
10                 safeSetSuccess(promise);
11                 pipeline.fireChannelRegistered();
12                 // 省略無關邏輯
13             } catch (Throwable t) {
14                 // 省略異常處理
15             }
16         }

    doRegister()方法的實現在AbstractNioChannel中,如下,就是完成了nio中的註冊,將nio的ServerSocketChannel註冊到selector上:

 1 protected void doRegister() throws Exception {
 2         boolean selected = false;
 3         for (;;) {
 4             try {
 5                 selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
 6                 return;
 7             } catch (CancelledKeyException e) {
 8                // 省略異常處理
 9             }
10         }
11     }

    再看pipeline.invokeHandlerAddedIfNeeded()方法,該方法調用鏈路比較長,此處就不詳細粘貼了,只是說一下流程。回顧下上面第二部分的第2步,在裏面最後addLast了一個匿名的內部對象,重寫了initChannel方法,此處通過pipeline.invokeHandlerAddedIfNeeded()方法就會調用到這個匿名對象的initChannel方法(只有第一次註冊時才會調),該方法往pipeline中又添加了一個ServerBootstrapAcceptor對象。執行完方法后,netty會在finally中將之前那個匿名內部對象給remove掉,這時pipeline中的handler如下所示:

 

     至此,算是基本完成了initAndRegister方法的邏輯,當然限於篇幅(本篇已經夠長了),其中還有很多細節性的處理未提及。

 

三、AbstractBootstrap的doBind0方法

     doBind0方法邏輯如下所示,new了一個Runnable任務交給Reactor線程執行,execute執行過程已經分析過了,此處不再贅述,集中下所剩無幾的精力看下run方法中的bind邏輯。

 1 private static void doBind0(
 2             final ChannelFuture regFuture, final Channel channel,
 3             final SocketAddress localAddress, final ChannelPromise promise) {
 4 
 5         channel.eventLoop().execute(new Runnable() {
 6             @Override
 7             public void run() {
 8                 if (regFuture.isSuccess()) {
 9                     channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
10                 } else {
11                     promise.setFailure(regFuture.cause());
12                 }
13             }
14         });
15     }

    channel.bind方法,如下:

1 public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
2         return pipeline.bind(localAddress, promise);
3     }

    調用了pipeline的bind方法:

1 public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
2         return tail.bind(localAddress, promise);
3     }

    tail.bind方法:

 1 public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
 2         if (localAddress == null) {
 3             throw new NullPointerException("localAddress");
 4         }
 5         if (isNotValidPromise(promise, false)) {
 6             // cancelled
 7             return promise;
 8         }
 9         // 從tail開始往前,找到第一個outbond的handler,這時只有head滿足要求,故此處next是head
10         final AbstractChannelHandlerContext next = findContextOutbound(MASK_BIND);
11         EventExecutor executor = next.executor();
12         if (executor.inEventLoop()) {// 因為當前線程就是executor中的Reactor線程,所以直接進入invokeBind方法
13             next.invokeBind(localAddress, promise);
14         } else {
15             safeExecute(executor, new Runnable() {
16                 @Override
17                 public void run() {
18                     next.invokeBind(localAddress, promise);
19                 }
20             }, promise, null);
21         }
22         return promise;
23     }

    下面進入head.invokeBind方法:

 1 private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
 2         if (invokeHandler()) {
 3             try {
 4                 ((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);
 5             } catch (Throwable t) {
 6                 notifyOutboundHandlerException(t, promise);
 7             }
 8         } else {
 9             bind(localAddress, promise);
10         }
11     }

    核心邏輯就是handler.bind方法,繼續追蹤:

1 public void bind(
2                 ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
3             unsafe.bind(localAddress, promise);
4         }

    此處的unsafe是NioMessageUnsafe,繼續追蹤會看到在bind方法中又調用了NioServerSocketChannel中的doBind方法,最終在這裏完成了nio原生ServerSocketChannel和address的綁定:

1 protected void doBind(SocketAddress localAddress) throws Exception {
2         if (PlatformDependent.javaVersion() >= 7) {
3             javaChannel().bind(localAddress, config.getBacklog());
4         } else {
5             javaChannel().socket().bind(localAddress, config.getBacklog());
6         }
7     }

    至此,ServerBootstrap的bind方法完成。

 

小結

    本文從頭到尾追溯了ServerBootstrap中bind方法的邏輯,將前面netty系列中的二、三兩篇初始化給串聯了起來,是承上啟下的一個位置。後面的netty系列將圍繞本文中啟動的NioEventLoop.run方法展開,可以這麼說,本文跟前面的三篇只是為run方法的出現做的一個鋪墊,run方法才是核心功能的邏輯所在。

    本文斷斷續續更新了一周,今天才完成,也沒想到會這麼長,就這樣吧,後面繼續netty run方法的學習。

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※回頭車貨運收費標準

※產品缺大量曝光嗎?你需要的是一流包裝設計!

※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面

※推薦評價好的iphone維修中心

※教你寫出一流的銷售文案?

台中搬家公司教你幾個打包小技巧,輕鬆整理裝箱!

台中搬家遵守搬運三大原則,讓您的家具不再被破壞!

新發現 澳科學家在2009年南極冰核中發現微塑膠 附近磷蝦恐吃下肚

4{icon} {views}

環境資訊中心綜合外電;姜唯 編譯;林大利 審校

本站聲明:網站內容來源環境資訊中心https://e-info.org.tw/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

網頁設計公司推薦不同的風格,搶佔消費者視覺第一線

※Google地圖已可更新顯示潭子電動車充電站設置地點!!

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

※別再煩惱如何寫文案,掌握八大原則!

網頁設計最專業,超強功能平台可客製化

※回頭車貨運收費標準

「福島套餐」:奧運選手的被曝風險 民團仍訴求停辦東奧(下)

3{icon} {views}

文:宋瑞文(媽媽監督核電廠聯盟特約撰述)

本站聲明:網站內容來源環境資訊中心https://e-info.org.tw/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

網頁設計公司推薦不同的風格,搶佔消費者視覺第一線

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面

南投搬家公司費用需注意的眉眉角角,別等搬了再說!

※教你寫出一流的銷售文案?

※回頭車貨運收費標準

※別再煩惱如何寫文案,掌握八大原則!

「疫」外減碳!全球碳排放今年估降6% 二戰以來最大降幅

3{icon} {views}

摘錄自2020年4月22日自由時報報導

世界氣象組織(World Meteorological Organization, WMO)負責人今(22)日表示,預計今年武漢肺炎(COVID-19)疫情將讓二氧化碳排放量減少6%,是自第二次世界大戰以來最大降幅。

然而,聯合國機構表示,下降幅度仍不足以阻止氣候變化,並敦促各國政府將氣候行動納入復甦計畫。WMO警告,過去經濟復甦帶來的排放量增長甚至比危機爆發前更高。WMO週三(22日)還發布一份全球氣候報告,內容指出2015-2019年是有記錄以來最溫暖的5年。

生活環境
全球變遷
溫室氣體
氣候變遷
國際新聞
二氧化碳排放
疫情看氣候與能源
武漢肺炎

本站聲明:網站內容來源環境資訊中心https://e-info.org.tw/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※教你寫出一流的銷售文案?

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

※回頭車貨運收費標準

※別再煩惱如何寫文案,掌握八大原則!

※超省錢租車方案

※產品缺大量曝光嗎?你需要的是一流包裝設計!

※推薦台中搬家公司優質服務,可到府估價

荷蘭養殖場水貂感染病毒 當局稱傳人可能性極小

2{icon} {views}

摘錄自2020年04月26日自由時報報導

荷蘭農業部今(26日)宣布,境內有水貂感染武漢肺炎病毒,目前發現病例的兩座水貂養殖場已被隔離,此為該國首次傳出動物感染。

綜合外媒報導,荷蘭衛生部表示,由於養殖場內有水貂出現呼吸困難的跡象,檢測後確定染上病毒,據信是經由身上帶有病毒的員工傳染,不過,病毒在養殖場進一步傳播給其他人或動物的可能性極小。

衛生部表示,目前相關人員正在進行研究,呼籲人們不要經過養殖場的400公尺範圍內。

生活環境
國際新聞
荷蘭
武漢肺炎
養殖場

動物與大環境變遷

本站聲明:網站內容來源環境資訊中心https://e-info.org.tw/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

網頁設計最專業,超強功能平台可客製化

※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面

※回頭車貨運收費標準

※推薦評價好的iphone維修中心

※教你寫出一流的銷售文案?

台中搬家公司教你幾個打包小技巧,輕鬆整理裝箱!

台中搬家公司費用怎麼算?