調優 | Apache Hudi應用調優指南

通過Spark作業將數據寫入Hudi時,Spark應用的調優技巧也適用於此。如果要提高性能或可靠性,請牢記以下幾點。

輸入并行性:Hudi對輸入進行分區默認併發度為1500,以確保每個Spark分區都在2GB的限制內(在Spark2.4.0版本之後去除了該限制),如果有更大的輸入,則相應地進行調整。我們建議設置shuffle的併發度,配置項為hoodie.[insert|upsert|bulkinsert].shuffle.parallelism,以使其至少達到input_data_size/500MB。

Off-heap(堆外)內存:Hudi寫入parquet文件,需要使用一定的堆外內存,如果遇到此類故障,請考慮設置類似spark.yarn.executor.memoryOverheadspark.yarn.driver.memoryOverhead的值。

Spark 內存:通常Hudi需要能夠將單個文件讀入內存以執行合併或壓縮操作,因此執行程序的內存應足以容納此文件。另外,Hudi會緩存輸入數據以便能夠智能地放置數據,因此預留一些spark.memory.storageFraction通常有助於提高性能。

調整文件大小:設置limitFileSize以平衡接收/寫入延遲與文件數量,並平衡與文件數據相關的元數據開銷。

時間序列/日誌數據:對於單條記錄較大的數據庫/ nosql變更日誌,可調整默認配置。另一類非常流行的數據是時間序列/事件/日誌數據,它往往更加龐大,每個分區的記錄更多。在這種情況下,請考慮通過.bloomFilterFPP()/bloomFilterNumEntries()來調整Bloom過濾器的精度,以加速目標索引查找時間,另外可考慮一個以事件時間為前綴的鍵,這將使用範圍修剪並顯着加快索引查找的速度。

GC調優:請確保遵循Spark調優指南中的垃圾收集調優技巧,以避免OutOfMemory錯誤。[必須]使用G1 / CMS收集器,其中添加到spark.executor.extraJavaOptions的示例如下:

-XX:NewSize=1g -XX:SurvivorRatio=2 -XX:+UseCompressedOops -XX:+UseConcMarkSweepGC -XX:+UseParNewGC -XX:CMSInitiatingOccupancyFraction=70 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintGCApplicationConcurrentTime -XX:+PrintTenuringDistribution -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/hoodie-heapdump.hprof

OutOfMemory錯誤:如果出現OOM錯誤,則可嘗試通過如下配置處理:spark.memory.fraction = 0.2,spark.memory.storageFraction = 0.2允許其溢出而不是OOM(速度變慢與間歇性崩潰相比)。

以下是完整的生產配置

spark.driver.extraClassPath /etc/hive/conf
spark.driver.extraJavaOptions -XX:+PrintTenuringDistribution -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintGCApplicationConcurrentTime -XX:+PrintGCTimeStamps -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/hoodie-heapdump.hprof
spark.driver.maxResultSize 2g
spark.driver.memory 4g
spark.executor.cores 1
spark.executor.extraJavaOptions -XX:+PrintFlagsFinal -XX:+PrintReferenceGC -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintAdaptiveSizePolicy -XX:+UnlockDiagnosticVMOptions -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/hoodie-heapdump.hprof
spark.executor.id driver
spark.executor.instances 300
spark.executor.memory 6g
spark.rdd.compress true
 
spark.kryoserializer.buffer.max 512m
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.shuffle.service.enabled true
spark.sql.hive.convertMetastoreParquet false
spark.submit.deployMode cluster
spark.task.cpus 1
spark.task.maxFailures 4
 
spark.yarn.driver.memoryOverhead 1024
spark.yarn.executor.memoryOverhead 3072
spark.yarn.max.executor.failures 100

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

※別再煩惱如何寫文案,掌握八大原則!

※教你寫出一流的銷售文案?

※超省錢租車方案

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

※產品缺大量曝光嗎?你需要的是一流包裝設計!

※回頭車貨運收費標準

CentOS8.1中使用通用二進制包安裝MySQL8.0

  寫在前的的話: 在IT技術日新月異的今天,老司機也可能在看似熟悉的道路上翻車,甚至是大型翻車現場!自己一個人開車過去翻個車不可怕,可怕的是帶着整個團隊甚至是整個公司一起翻車山崖下,解決辦法就是:新出現的道路自己先過一遍,留好坑位標記,將來帶隊過去時不再翻車!!!

 

  最近剛好在進行權限系統的微服務化改造,要重新搭一套開發服務器環境。今天搭的是MySQL數據庫服務器,MySQL 8.0出來也有些年月了,現在(2020)大多數公司還沒在生產上用上,於是乎就想嘗個鮮,選擇了在CentOS8.1上進行MySQL8.0服務器的搭建。當前來說CentOS8.1也算比較新了!

 

  老樣子,先裝一個全新的CentOS8.1虛擬機,選擇裝配基本的Server軟件包,網絡模式選擇【橋接模式(自動)】(主要為了讓宿主機和虛擬機的網絡處於等級),IP地址相關信息切記選擇手工配置,不能用DHCP進行動態分配(有DNS服務器輔助除外),為什麼呢?因為你是在配服務器,IP地址要固定下來,不然每次啟動后的IP都不同,那就很尷尬了!

 

  服務器操作系統準備好后該去下載MySQL8.0了,去哪裡下呢?當然是MySQL8.0社區版官網!可是跑到網官一看,估計有些同學就一臉懵逼,純英文的不說,安裝包還各樣種樣的!怎麼選呢?

  首先要選定操作系統平台,我這邊是CentOS8.1 x86_64位架構,那我要選Linux版本的,最好是CentOS專用的!結果找了一圈暫時還沒有CentOS專用的版本(RedHat專用版本到是有,其實我們知道CentOS是就源自RedHat)…為了保險起見,我們選Linux通用版(Linux-Generic),結果出來的列表也不少!這麼多都是些啥,見圖標解吧:

 

  我選擇【64位通用二進制最小安裝壓縮包(不含調試組件及調式符號)】,理由:

  1. 我的目標操作系統是64位的CentOS;
  2. 我不需要做MySQL的調試;
  3. CentOS8.1上帶有專用的解壓安裝工具tar;
  4. 小包從官網下載和上傳到服務器都快(MySQL數據庫功能也完整,別看小了那麼多!);

  點擊Download后出現的界面中會建議你登錄你的Oracle Web賬號,不用登錄(當然你有賬號也可以登錄),直接點下面的一小行字——“No thanks,just start my download.

 

  將目標包(mysql-8.0.20-linux-x86_64-minimal.tar.xz)下載好后,使用SecureCRT上傳到服務器 /opt 目錄下!(Linux眾多目錄的作用)

  接下來開始正式安裝MySQL8.0,記住我們的前提是:全新的、乾淨的CentOS8.1操作系統,之前沒有裝過MySQL的!如果之前裝過MySQL是要先把相關目錄和配置文件刪除乾淨才能再裝的!謹記!謹記!謹記!

  第一步:使用yum包管理器檢查並安裝異步IO依賴包 libaio ,如果沒裝這個包,數據目錄初始化和後續服務器的啟動都將失敗,如果檢查發現沒裝,要將它裝上:

# yum search libaio 
# yum -y install libaio 

  第二步:由於CentOS是源自RedHat的,而RedHat系列的操作系統中沒有MySQL通用二進制安裝包(不管有沒有壓縮)中的MySQL客戶端(bin/mysql)組件所需要的 /lib64/libtinfo.so.5 文件,為了解決這個問題,需要安裝一個含有該文件的 ncurses-compat-libs 包(截圖是已經裝過一次的結果):

# yum -y install ncurses-compat-libs

  第三步:轉到 mysql-8.0.20-linux-x86_64-minimal.tar.xz 壓縮包存放目錄 /opt ,使用 tar 命令進行解壓(直接解壓到當前目錄):

# cd /opt
# tar -xvf mysql-8.0.20-linux-x86_64-minimal.tar.xz

  解壓得到一個新目錄:/opt/mysql-8.0.20-linux-x86_64-minimal ,該目錄即為MySQL的真實安裝目錄

  第四步:創建用於運行MySQL的組和普通用戶(非操作系統用戶):

# groupadd mysql
# useradd -r -g mysql -s /bin/false mysql

  第五步:在用戶手動安裝軟件推薦安裝目錄 /usr/local 中創建MySQL的真實安裝目錄的軟鏈接目錄(軟鏈接目錄不能是已經存在的目錄,相當於Windows快捷方式):

# cd /usr/local
# ln -s /opt/mysql-8.0.20-linux-x86_64-minimal mysql

  第六步:在mysql軟鏈接目錄中創建導入導出操作安全目錄(該目錄用於使具有FILE權限的用戶可以安全地執行導入導出操作):

# cd /usr/local/mysql
# mkdir mysql-files
# chown mysql:mysql mysql-files
# chmod 750 mysql-files

  (其實,第六步做完后就已經有辦法可以臨時點亮MySQL服務器了,這裏不講,因為我們要配的是一個長期運行的MySQL數據庫服務器)

  第七步:在mysql軟鏈接目錄下創建數據目錄:

# cd /usr/local/mysql
# mkdir data
# chown mysql:mysql data
# chmod 750 data

  第八步:創建MySQL服務啟動需要用到的靜態配置文件(如果目錄下已經有了同名文件,則需要換一個名字,之前沒裝過MySQL一般是不會有的!):

# cd /etc
# touch my.cnf
# chown root:root my.cnf
# chmod 644 my.cnf

  第九步:使用vi或vim打開第八步創建的配置文件 /etc/my.cnf ,加入MySQL服務的配置信息

[mysqld]
datadir=/usr/local/mysql/data
socket=/tmp/mysql.sock
port=3306
log-error=/usr/local/mysql/data/mysqldb.xgclassroom.err
user=mysql
secure_file_priv=/usr/local/mysql/mysql-files
local_infile=OFF

  注意:配置中log-error的值一般中根據你安裝操作系統時設置的Host名稱相關,大家的不一樣,當然你也可以直接指定新名字!

  另外,如果要InnoDB的相關配置項,那麼只能在數據目錄初始化(第十步)之前在my.cnf中進行配置,主要有 innodb_data_home_dir,innodb_data_file_path,,innodb_log_file_size,innodb_log_group_home_dir 和 innodb_page_size 這幾項,未配置的情況下,它們都使用默認值。

  第十步:初始化第七步創建的數據目錄(因為要用到第八和第九步創建的配置文件)

# cd /usr/local/mysql
# bin/mysqld --defaults-file=/etc/my.cnf --initialize

  注意:數據目錄初始化成功后,會在第九步所設置的log-error日誌文件(我的是 /usr/local/mysql/data/mysqldb.xgclassroom.err)中生成 root@localhost 的初始密碼(賬號冒號後面的QdbB=9e!lT6=就是,要記住這個初始密碼,後面登錄root賬號是要它來修改初始密碼),類似下面的信息(可以使用cat命令查看):

...省略...
A temporary password is generated for root@localhost: yi5w%J*hws6E
...省略...

  現在我們還缺了最最最重要的一項配置——讓MySQL服務隨操作系統的啟動自動啟動!繼續配置ing!

  在Linux系統中目前系統服務主要以 systemd 服務單元的形式存在(類似windows平台的services.msc下管理的各個服務),Linux系統下一切皆文件,systemd 服務單元也是由一個個systemd 服務單元配置文件組成,systemd 服務單元配置文件 = systemd 服務單元!配置文件名就是服務單元名!所有服務單元的配置文件統一放在 /usr/lib/systemd/system 目錄下。

  第十一步:在系統服務單元配置文件存放目下創建MySQL的服務單元配置文件:

# cd /usr/lib/systemd/system
# touch mysqld.service
# chmod 644 mysqld.service

  第十二步:使用vi或vim打開第十一步創建的MySQL服務單元配置文件 /usr/lib/systemd/system/mysqld.service,並加入MySQL服務單元配置信息:

[Unit]
Description=MySQL Server
Documentation=man:mysqld(8)
Documentation=http://dev.mysql.com/doc/refman/en/using-systemd.html
After=network.target
After=syslog.target

[Install]
WantedBy=multi-user.target

[Service]
User=mysql
Group=mysql

# Have mysqld write its state to the systemd notify socket
Type=notify

# Disable service start and stop timeout logic of systemd for mysqld service.
TimeoutSec=0

# Start main service
ExecStart=/usr/local/mysql/bin/mysqld --defaults-file=/etc/my.cnf $MYSQLD_OPTS 

# Use this to switch malloc implementation
EnvironmentFile=-/etc/sysconfig/mysql

# Sets open_files_limit
LimitNOFILE = 10000

Restart=on-failure

RestartPreventExitStatus=1

# Set environment variable MYSQLD_PARENT_PID. This is required for restart.
Environment=MYSQLD_PARENT_PID=1

PrivateTmp=false

  第十三步:啟用MySQL服務單元配置

# systemctl enable mysqld.service

  經過上面的一系列安裝和配置步驟后,此時我們已經可以通過systemctl工具手工管理MySQL服務了:

# systemctl {start|stop|restart|status} mysqld

  第十四步:啟動MySQL服務,並查看MySQL服務狀態:

# systemctl start mysqld
# systemctl status mysqld

  第十五步:重啟服務器,檢驗MySQL服務是否隨服務器一起啟動了:

# reboot

  …重啟系統中…

# systemctl status mysqld

  如果最終MySQL服務狀態正常,那麼CentOS8.1上MySQL8.0的安裝就算是完成了,但是不要高興的太早了!還有好多事要做:

  1、將服務器上的MySQL客戶端(bin/mysql)配置到系統環境變量PATH中:

  如果不將MySQL客戶端(bin/mysql)配到環境變量中,你會發現即使MySQL服務在正常運行,但直接在系統終端輸入mysql是找不到該命令的:

[root@mysqldb /]# mysql
-bash: mysql: command not found

  當然,你用完整MySQL客戶端(bin/mysql)命令路徑是可以運行該命令的(雖然報錯,但那表示命令可以使用了):

[root@mysqldb /]# /usr/local/mysql/bin/mysql
ERROR 1045 (28000): Access denied for user 'root'@'localhost' (using password: NO)

  系統環境變量PATH在環境變量配置文件 /etc/profile 中配置:

# vim /etc/profile

在文件的最後新起一行,插入:

export PATH=$PATH:/usr/local/mysql/bin

保存退出后,使用下面的命令手動使修改生效:

# source /etc/profile

  現在你可以直接在任意目錄下執行mysql命令了:

[root@mysqldb /]# mysql
ERROR 1045 (28000): Access denied for user 'root'@'localhost' (using password: NO)

  

  2、root賬號初始密碼修改:

  使用root初始密碼(記錄在第九步所設置的log-error日誌文件)登錄MySQL,並修改密碼:

[root@mysqldb /]# mysql -u root -p
Enter password: 
Welcome to the MySQL monitor.  Commands end with ; or \g.
Your MySQL connection id is 10
Server version: 8.0.20

Copyright (c) 2000, 2020, Oracle and/or its affiliates. All rights reserved.

Oracle is a registered trademark of Oracle Corporation and/or its
affiliates. Other names may be trademarks of their respective
owners.

Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.

mysql>alter user 'root'@'localhost' identified by '1qaz@WSX';

 Query OK, 0 rows affected (0.00 sec)

 

  3、進行root賬號的服務器本機登錄測試:

  修改完root的默認密碼后,退出MySQL並使用新密碼重新嘗試登錄:

mysql> exit
Bye
[root@mysqldb /]# mysql -u root -p
Enter password: 
Welcome to the MySQL monitor.  Commands end with ; or \g.
Your MySQL connection id is 11
Server version: 8.0.20 MySQL Community Server - GPL

Copyright (c) 2000, 2020, Oracle and/or its affiliates. All rights reserved.

Oracle is a registered trademark of Oracle Corporation and/or its
affiliates. Other names may be trademarks of their respective
owners.

Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.

mysql> 

 

  4、對MySQL使用的端口號進行防火牆例外設置:

  先用root賬號登錄MySQL,檢查一下當前正在使用的端口號:

mysql> show global variables like 'port';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| port          | 3306  |
+---------------+-------+
1 row in set (0.01 sec)

  將目標端口號(例中為3306)添加到防火牆例外列表,並重新載入防火牆

# firewall-cmd --zone=public --add-port=3306/tcp --permanent
success
# firewall-cmd --reload
success

  注意1:一定要帶上–permanent 參數才能永遠生效,否則系統重啟后丟失,另外 –zone 、–add-port 和 –permanent參數前面是兩個-;

  注意2:一定要重新載入防火牆,讓設置生效;

 

  5、創建遠程登錄和使用MySQL的普通用戶(因為安全起見,root賬號一般不要設置成可以遠程登錄,要設置也盡量設置成只可在固定的某個IP遠程登錄):

  以root賬號登錄MySQL,使用以下SQL命令創建一個可以任意網絡互通的點登錄的賬號xurm 密碼 1qaz@WSX

create user xurm IDENTIFIED with mysql_native_password by '1qaz@WSX' account unlock;
select host,user from user;
grant all on *.* to xurm WITH GRANT OPTION;
FLUSH PRIVILEGES;

 

  認認真真上邊的步驟和細節進行安裝和配置后,現在可以從開發機的客戶端,用普通賬號登錄遠程MySQL服務器就可以愉快的玩耍了:

 

  以下記錄一次手賤導致的翻車現場:

  使用 # systemctl stop mysqld.service 成功關閉mysql服務后,嘗試直接使用 /usr/local/mysql/bin/mysqld 嘗試啟動mysql造成的後果:bin/mysqld 嘗試啟動時覆蓋並破壞了第十步數據目錄初始生成的有關文件,導至MySQL無法啟動,這時嘗試使用 # systemctl start mysqld.service也無法再啟動MySQL服務~~~

....
2020-06-06T04:38:39.614356Z 1 [ERROR] [MY-012574] [InnoDB] Unable to lock ./ibdata1 error: 11
2020-06-06T04:38:40.618323Z 1 [ERROR] [MY-012574] [InnoDB] Unable to lock ./ibdata1 error: 11
2020-06-06T04:38:41.619650Z 1 [ERROR] [MY-012574] [InnoDB] Unable to lock ./ibdata1 error: 11
2020-06-06T04:38:41.620710Z 1 [ERROR] [MY-012592] [InnoDB] Operating system error number 11 in a file operation.
2020-06-06T04:38:41.621045Z 1 [ERROR] [MY-012596] [InnoDB] Error number 11 means 'Resource temporarily unavailable'
2020-06-06T04:38:41.621712Z 1 [ERROR] [MY-012215] [InnoDB] Cannot open datafile './ibdata1'
2020-06-06T04:38:41.622047Z 1 [ERROR] [MY-012959] [InnoDB] Could not open or create the system tablespace. 
If you tried to add new data files to the system tablespace, and it failed here, you should now edit innodb_data_file_path in my.cnf back to
what it was, and remove the new ibdata files InnoDB created in this failed attempt. InnoDB only wrote those files full of zeros, but did not
yet use them in any way. But be careful: do not remove old data files which contain your precious data!
2020-06-06T04:38:41.622372Z 1 [ERROR] [MY-012930] [InnoDB] Plugin initialization aborted with error Cannot open a file. 2020-06-06T04:38:42.119489Z 1 [ERROR] [MY-010334] [Server] Failed to initialize DD Storage Engine 2020-06-06T04:38:42.122437Z 0 [ERROR] [MY-010020] [Server] Data Dictionary initialization failed. 2020-06-06T04:38:42.127147Z 0 [ERROR] [MY-010119] [Server] Aborting 2020-06-06T04:38:42.127857Z 0 [System] [MY-010910] [Server] /usr/local/mysql/bin/mysqld: Shutdown complete (mysqld 8.0.20) MySQL Community Server - GPL.

  提示信息中讓我在/etc/my.cnf配置文件中將 innodb_data_file_path 配置項的值改回原來的去,實際上我並沒有在/etc/my.cnf配置文件配置該項,而是一直使用着默認的 innodb_data_file_path 配置;

  提示信息讓我刪除啟動失敗的嘗試中InnoDB生成的新ibdata相關文件,講真的,因為一開始沒有對比過data/目錄中的文件,現在我都不知道哪些是InnoDB生成的新ibdata相關文件;

  最後想了一下,我是剛剛安裝的數據庫系統,也還沒有重要數據在上邊,現在data/目錄遭到破壞,那最快的辦法就是清空data/目錄,並重新初始化它,應該就能解決問題了(已經運行了段時間的生產數據庫千萬不要這麼玩,要備份好所有數據文件,否則會死得很難看!!!最好另尋它法,最最最好就不要讓這樣的車禍出現!):

  • 清空data/目錄:
#  rm -rf /usr/local/mysql/data/*

 

  • 使用第十步中的命令重新初始化數據目錄:
# cd /usr/local/mysql
# bin/mysqld --defaults-file=/etc/my.cnf --initialize

注意:數據目錄重新初始化成功后,會在第九步所設置的log-error日誌文件(我的是 /usr/local/mysql/data/mysqldb.xgclassroom.err)中生成新的 root@localhost 的初始密碼

 

  • 重啟嘗試啟動MySQL服務:
# systemctl start mysqld
Job for mysqld.service failed because the control process exited with error code.
See "systemctl status mysqld.service" and "journalctl -xe" for details.

  哦呵…又雙叒叕翻車了,重啟失敗!!! 慌得一逼!繼續…

 

  • 查看一下日誌文件/usr/local/mysql/data/mysqldb.xgclassroom.err
# cat mysqldb.xgclassroom.err 
2020-06-06T05:31:31.249888Z 0 [System] [MY-013169] [Server] /opt/mysql-8.0.20-linux-x86_64-minimal/bin/mysqld (mysqld 8.0.20) initializing of server in progress as process 2662
2020-06-06T05:31:31.256260Z 1 [System] [MY-013576] [InnoDB] InnoDB initialization has started.
2020-06-06T05:31:31.509465Z 1 [System] [MY-013577] [InnoDB] InnoDB initialization has ended.
2020-06-06T05:31:31.995068Z 6 [Note] [MY-010454] [Server] A temporary password is generated for root@localhost: iqaaJCdnj3&e
2020-06-06T05:33:09.978908Z 0 [System] [MY-010116] [Server] /usr/local/mysql/bin/mysqld (mysqld 8.0.20) starting as process 2714
2020-06-06T05:33:09.987836Z 1 [System] [MY-013576] [InnoDB] InnoDB initialization has started.
2020-06-06T05:33:10.212686Z 1 [System] [MY-013577] [InnoDB] InnoDB initialization has ended.
2020-06-06T05:33:11.314035Z 0 [ERROR] [MY-011292] [Server] Plugin mysqlx reported: 'Preparation of I/O interfaces failed, X Protocol won't be accessible'
2020-06-06T05:33:11.314175Z 0 [ERROR] [MY-011300] [Server] Plugin mysqlx reported: 'Setup of bind-address: '*' port: 33060 failed, `bind()` 
failed with error: Address already in use (98). Do you already have another mysqld server running with Mysqlx ?' 2020-06-06T05:33:11.314342Z 0 [ERROR] [MY-011300] [Server] Plugin mysqlx reported: 'Setup of socket: '/tmp/mysqlx.sock' failed,
another process with PID 1734 is using UNIX socket file' 2020-06-06T05:33:11.413373Z 0 [Warning] [MY-010068] [Server] CA certificate ca.pem is self signed. 2020-06-06T05:33:11.414505Z 0 [ERROR] [MY-010262] [Server] Can't start server: Bind on TCP/IP port: Address already in use 2020-06-06T05:33:11.414625Z 0 [ERROR] [MY-010257] [Server] Do you already have another mysqld server running on port: 3306 ? 2020-06-06T05:33:11.414918Z 0 [ERROR] [MY-010119] [Server] Aborting 2020-06-06T05:33:12.939872Z 0 [System] [MY-010910] [Server] /usr/local/mysql/bin/mysqld: Shutdown complete (mysqld 8.0.20) MySQL Community Server - GPL.

  有個PID為1734的進程在佔用3306和33060端口?而且還可能是另一個mysqld server,我造,不會是真的吧!使用# systemctl status mysqld 都查過了,沒有服務了~

 

  • 查看一下3306端口的佔用情況先:
# netstat -anp | grep 3306
tcp6       0      0 :::33060                :::*                    LISTEN      1734/mysqld         
tcp6       0      0 :::3306                 :::*                    LISTEN      1734/mysqld 

  竟然真的有個mysqld進程(PID=1734)以tpc6的協議模式佔著3306和33060端口~

 

  • 查看一下這個mysqld進程的詳情,看看它是是哪裡來的:
# ps -aux | grep mysqld
mysql      1734  0.5 19.0 1773244 352616 ?      Sl   12:12   0:29 /usr/local/mysql/bin/mysqld
root       2786  0.0  0.0  12108  1080 pts/1    S+   13:45   0:00 grep --color=auto mysqld

  果然是手賤作的孽,直接通過/usr/local/mysql/bin/mysqld手動啟動的服務沒關徹底…

 

  • 幹掉佔用3306和33060端口的無效mysqld進程:
# kill -9 1734

 

  • 再次使用# systemctl start mysqld 命令啟動mysql:
# systemctl start mysqld
# systemctl status mysqld
● mysqld.service - MySQL Server
   Loaded: loaded (/usr/lib/systemd/system/mysqld.service; enabled; vendor preset: disabled)
   Active: active (running) since Sat 2020-06-06 13:47:15 CST; 20s ago
     Docs: man:mysqld(8)
           http://dev.mysql.com/doc/refman/en/using-systemd.html
 Main PID: 2795 (mysqld)
   Status: "Server is operational"
    Tasks: 39 (limit: 11337)
   Memory: 327.7M
   CGroup: /system.slice/mysqld.service
           └─2795 /usr/local/mysql/bin/mysqld --defaults-file=/etc/my.cnf

Jun 06 13:47:14 mysqldb.xgclassroom systemd[1]: Starting MySQL Server...
Jun 06 13:47:15 mysqldb.xgclassroom systemd[1]: Started MySQL Server.

 

  終於把車開回正路來了!!!使用日誌文件/usr/local/mysql/data/mysqldb.xgclassroom.err中記錄的新的初始密碼登錄mysql並修改初始化密碼就可以了!

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※別再煩惱如何寫文案,掌握八大原則!

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

※超省錢租車方案

※教你寫出一流的銷售文案?

網頁設計最專業,超強功能平台可客製化

※產品缺大量曝光嗎?你需要的是一流包裝設計!

台中搬家遵守搬運三大原則,讓您的家具不再被破壞!

HashMap解析(主要JDK1.8,附帶1.7出現的問題以及區別)

按問題的形式來吧,這些大多是我自己總結的,如有錯誤請及時指正謝謝

1.你了解HashMap么,可以說說么?

  首先,HashMap是一種數據結構,可以快速的幫我們存取數據。它的底層數據結構在1.7和1.8有了一些變化,1.7版本及以前他是數組+鏈表的形式,1.8及以後數組+鏈表+紅黑樹,如果鏈表長度大於等於8就會轉化為紅黑樹,如果長度降至6紅黑樹會轉化為鏈表紅黑樹的出現解決了因為鏈表過長導致查詢速度變慢的問題,因為鏈表的查詢時間複雜度是O(n),而紅黑樹的查詢時間複雜度是O(logn)。

2.它的數組+鏈表是怎麼實現的?

  

 

 

 這個代碼是1.8的(1.7是Entry,就是名字不一樣),其實我們每一個放進去的(key,value)到最後都會封裝成這樣的Node對象。Hashmap的數組就是以一系列這樣的Node對象構成的數組,鏈表就是把next指向下一個Node對象。

 

 

3.為什麼要有鏈表,紅黑樹?只有數組不可以么?

首先我們要知道什麼是Hash算法。

這裏放出一段官方的話:

 

Hash,一般翻譯做散列、雜湊,或音譯為哈希,是把任意長度的輸入(又叫做預映射pre-image)通過散列算法變換成固定長度的輸出,該輸出就是散列值。這種轉換是一種壓縮映射,也就是,散列值的空間通常遠小於輸入的空間,不同的輸入可能會散列成相同的輸出,所以不可能從散列值來確定唯一的輸入值。簡單的說就是一種將任意長度的消息壓縮到某一固定長度的消息摘要的函數。

 

簡單點來說:就是把一個大数字經過運算變為固定範圍的輸出,最簡單的算法就是對你的數組長度取模。

但是這樣就會出現一個問題,你這麼算難免會出現算出來的数字是一樣的:

比如數組長度為16,我們要放入数字1和17,那麼他們經過對數組長度取模后位置是一樣的,這樣就產生了Hash衝突。我們就可以在數組下拉出一個鏈表去存儲這個数字

4.知道哪些常見的解決hash衝突算法么?

1、開放定址法(就是往下找空餘地方)
     用開放定址法解決衝突的做法是:當衝突發生時,使用某種探查(亦稱探測)技術在散列表中形成一個探查(測)序列。沿此序列逐個單元地查找,直到找到給定 的關鍵字,或者碰到一個開放的地址(即該地址單元為空)為止(若要插入,在探查到開放的地址,則可將待插入的新結點存人該地址單元)。查找時探查到開放的 地址則表明表中無待查的關鍵字,即查找失敗。

2、 再哈希法(再進行hash直到無衝突)
再哈希法又叫雙哈希法,有多個不同的Hash函數,當發生衝突時,使用第二個,第三個,….,等哈希函數
計算地址,直到無衝突。雖然不易發生聚集,但是增加了計算時間。

3、拉鏈法(hashmap用的)

鏈地址法的基本思想是:每個哈希表節點都有一個next指針,多個哈希表節點可以用next指針構成一個單向鏈表,被分配到同一個索引上的多個結點用單向鏈表連接起來

4、建立公共溢出區: 
這種方法的基本思想是:將哈希表分為基本表和溢出表兩部分,凡是和基本表發生衝突的元素,一律填入溢出表

5.為什麼閾值就是8和6呢?中間的7是有什麼作用的呢?直接就是紅黑樹不可以么?

HashMap中有這樣一段註釋(主要看数字):

/* * Because TreeNodes are about twice the size of regular nodes, we * use them only when 鏈表s contain enough nodes to warrant use * (see TREEIFY_THRESHOLD). And when they become too small (due to * removal or resizing) they are converted back to plain 鏈表s. In * usages with well-distributed user hashCodes, tree 鏈表s are * rarely used. Ideally, under random hashCodes, the frequency of * nodes in 鏈表s follows a Poisson distribution * (http://en.wikipedia.org/wiki/Poisson_distribution) with a * parameter of about 0.5 on average for the default resizing * threshold of 0.75, although with a large variance because of * resizing granularity. Ignoring variance, the expected * occurrences of list size k are (exp(-0.5) * pow(0.5, k) / * factorial(k)). The first values are: * * 0: 0.60653066 * 1: 0.30326533 * 2: 0.07581633 * 3: 0.01263606 * 4: 0.00157952 * 5: 0.00015795 * 6: 0.00001316 * 7: 0.00000094 * 8: 0.00000006 * more: less than 1 in ten million */

TreeNodes佔用空間是普通Nodes的兩倍(相較於鏈表結構,鏈表只有指向下一個節點的指針,二叉樹則需要左右指針,分別指向左節點和右節點),所以只有當鏈表包含足夠多的節點時才會轉成TreeNodes(考慮到時間和空間的權衡),而是否足夠多就是由TREEIFY_THRESHOLD的值決定的。當紅黑樹中節點數變少時,又會轉成普通的鏈表。並且我們查看源碼的時候發現,鏈表長度達到8就轉成紅黑樹,當長度降到6就轉成普通鏈表。

這樣就解釋了為什麼不是一開始就將其轉換為TreeNodes,而是需要一定節點數才轉為TreeNodes,說白了就是trade-off,空間和時間的權衡。

當hashCode離散性很好的時候,樹型鏈表用到的概率非常小,因為數據均勻分佈在每個鏈表中,幾乎不會有鏈表中鏈表長度會達到閾值。但是在隨機hashCode下,離散性可能會變差,然而JDK又不能阻止用戶實現這種不好的hash算法,因此就可能導致不均勻的數據分佈。不過理想情況下隨機hashCode算法下所有鏈表中節點的分佈頻率會遵循泊松分佈,我們可以看到,一個鏈表中鏈表長度達到8個元素的概率為0.00000006,幾乎是不可能事件。這種不可能事件都發生了,說明鏈表中的節點數很多,查找起來效率不高。至於7,是為了作為緩衝,可以有效防止鏈表和樹頻繁轉換。

之所以選擇8,不是拍拍屁股決定的,而是根據概率統計決定的。由此可見,發展30年的Java每一項改動和優化都是非常嚴謹和科學的。

泊松分佈適合於描述單位時間(或空間)內隨機事件發生的次數。如某一服務設施在一定時間內到達的人數,電話交換機接到呼叫的次數,汽車站台的候客人數,機器出現的故障數,自然災害發生的次數,一塊產品上的缺陷數,顯微鏡下單位分區內的細菌分佈數等等。如果有興趣的,可以研究一下,概率是怎麼算出來的!

個人總結:

  1. 選擇8是因為空間和時間的權衡,再一個是因為鏈表中節點的分佈頻率會遵循泊松分佈,達到8的概率很小
  2. 選擇7是為了作為緩衝,可以有效防止鏈表和樹頻繁轉換
  3. 你的紅黑樹查詢時間複雜度低,但你的維持平衡的操作代價是大的,所以不會直接是紅黑樹(這一點是個人理解)

6.HashMap的初始容量,加載因子,擴容增量是多少?如果加載因子變大變小會怎麼樣?

HashMap的初始容量16,加載因子為0.75,擴容增量是原容量的1倍。如果HashMap的容量為16,一次擴容后容量為32。HashMap擴容是指元素個數(包括數組和鏈表+紅黑樹中)超過了16*0.75=12(容量×加載因子)之後開始擴容。

這個就是源碼里的聲明

//默認初始容量
static final int DEFAULT_INITIAL_CAPACITY = 1 << 4; // aka 16 //最大容量
static final int MAXIMUM_CAPACITY = 1 << 30; //加載因子
static final float DEFAULT_LOAD_FACTOR = 0.75f;

加載因子越大,填滿的元素越多,空間利用率越高,但衝突的機會加大了。
反之,加載因子越小,填滿的元素越少,衝突的機會減小,但空間浪費多了(因為需要經常擴容)。

所以這是一個時間和空間的均衡。

7. 如果我默認初始大小為100,那麼元素個數到達75會擴容么?

這個問題我以前見到過,所以拿出來說一下。

首先HashMap的構造方法有四個

    public HashMap(int initialCapacity, float loadFactor) { if (initialCapacity < 0) throw new IllegalArgumentException("Illegal initial capacity: " + initialCapacity); if (initialCapacity > MAXIMUM_CAPACITY) initialCapacity = MAXIMUM_CAPACITY; if (loadFactor <= 0 || Float.isNaN(loadFactor)) throw new IllegalArgumentException("Illegal load factor: " + loadFactor); this.loadFactor = loadFactor; this.threshold = tableSizeFor(initialCapacity); } public HashMap(int initialCapacity) { this(initialCapacity, DEFAULT_LOAD_FACTOR); } public HashMap() { this.loadFactor = DEFAULT_LOAD_FACTOR; // all other fields defaulted
    }
  
  public HashMap(Map<!--? extends K, ? extends V--> m) {       this.loadFactor = DEFAULT_LOAD_FACTOR;       putMapEntries(m, false);   }  

簡單點來說就是你可以自定義加載因子和初始容量。但是這個初始容量不是說你設置多少就是多少,他是會有個計算的,到最後Hashmap的容量一定是2的n次方

 

 

 簡單說一下putMapEntries

final void putMapEntries(Map<? extends K, ? extends V> m, boolean evict) { //獲取該map的實際長度
        int s = m.size(); if (s > 0) { //判斷table是否初始化,如果沒有初始化
            if (table == null) { // pre-size
                /**求出需要的容量,因為實際使用的長度=容量*0.75得來的,+1是因為小數相除,基本都不會是整數,容量大小不能為小數的,後面轉換為int,多餘的小數就要被丟掉,所以+1,例如,map實際長度22,22/0.75=29.3,所需要的容量肯定為30,有人會問如果剛剛好除得整數呢,除得整數的話,容量大小多1也沒什麼影響**/
                float ft = ((float)s / loadFactor) + 1.0F; //判斷該容量大小是否超出上限。
                int t = ((ft < (float)MAXIMUM_CAPACITY) ? (int)ft : MAXIMUM_CAPACITY); /**對臨界值進行初始化,tableSizeFor(t)這個方法會返回大於t值的,且離其最近的2次冪,例如t為29,則返回的值是32**/
                if (t > threshold) threshold = tableSizeFor(t); } //如果table已經初始化,則進行擴容操作,resize()就是擴容。
            else if (s > threshold) resize(); //遍歷,把map中的數據轉到hashMap中。
            for (Map.Entry<? extends K, ? extends V> e : m.entrySet()) { K key = e.getKey(); V value = e.getValue(); putVal(hash(key), key, value, false, evict); } } }

 

所以說這個答案就是不會擴容的,因為你初始它的容量是100,tableSizeFor也會自動變成128,128×0.75是93遠遠大於75.

8. HashMap中為什麼數組的長度為2的冪次方?

主要是為了計算hash值時散列性更好。

我們看一下HashMap的數組下標如何計算的

 

// 將(數組的長度-1)和hash值進行按位與操作:
i = (n - 1) & hash  // i為數組對應位置的索引 n為當前數組的大小

假定HashMap的長度為默認的16,則n – 1為15,也就是二進制的01111

可以說,Hash算法最終得到的index結果完全取決於hashCode的最後幾位。

那麼說為什麼別的数字不行呢?

假設,HashMap的長度為10,則n-1為9,也就是二進制的1001

我們來試一個hashCode:1110時,通過Hash算法得到的最終的index是8

 

再比如說:1000得到的index也是8。

也就是說,即使我們把倒數第二、三位的0、1變換,得到的index仍舊是8,說明有些index結果出現的幾率變大!

這樣,顯然不符合Hash算法均勻分佈的要求。

反觀,長度16或其他2的冪次方,Length – 1的值的二進制所有的位均為1,這種情況下,Index的結果等於hashCode的最後幾位。只要輸入的hashCode本身符合均勻分佈,Hash算法的結果就是均勻的。

一句話,HashMap的長度為2的冪次方的原因是為了減少Hash碰撞,盡量使Hash算法的結果均勻分佈。

9.put方法

在講解put方法之前,先看看hash方法,看怎麼計算哈希值的。

    static final int hash(Object key) { int h; /**先獲取到key的hashCode,然後進行移位再進行異或運算,為什麼這麼複雜,不用想肯定是為了減少hash衝突**/
        return (key == null) ? 0 : (h = key.hashCode()) ^ (h >>> 16); }

put方法實際調用了putVal方法

    public V put(K key, V value) { /**四個參數,第一個hash值,第四個參數表示如果該key存在值,如果為null的話,則插入新的value,最後一個參數,在hashMap中沒有用,可以不用管,使用默認的即可**/
        return putVal(hash(key), key, value, false, true); } final V putVal(int hash, K key, V value, boolean onlyIfAbsent, boolean evict) { //tab 哈希數組,p 該哈希桶的首節點,n hashMap的長度,i 計算出的數組下標
        Node<K,V>[] tab; Node<K,V> p; int n, i; //獲取長度並進行擴容,使用的是懶加載,table一開始是沒有加載的,等put后才開始加載
        if ((tab = table) == null || (n = tab.length) == 0) n = (tab = resize()).length; /**如果計算出的該哈希桶的位置沒有值,則把新插入的key-value放到此處,此處就算沒有插入成功,也就是發生哈希衝突時也會把哈希桶的首節點賦予p**/
        if ((p = tab[i = (n - 1) & hash]) == null) tab[i] = newNode(hash, key, value, null); //發生哈希衝突的幾種情況
        else { // e 臨時節點的作用, k 存放該當前節點的key 
            Node<K,V> e; K k; //第一種,插入的key-value的hash值,key都與當前節點的相等,e = p,則表示為首節點
            if (p.hash == hash && ((k = p.key) == key || (key != null && key.equals(k)))) e = p; //第二種,hash值不等於首節點,判斷該p是否屬於紅黑樹的節點
            else if (p instanceof TreeNode) /**為紅黑樹的節點,則在紅黑樹中進行添加,如果該節點已經存在,則返回該節點(不為null),該值很重要,用來判斷put操作是否成功,如果添加成功返回null**/ e = ((TreeNode<K,V>)p).putTreeVal(this, tab, hash, key, value); //第三種,hash值不等於首節點,不為紅黑樹的節點,則為鏈表的節點
            else { //遍歷該鏈表
                for (int binCount = 0; ; ++binCount) { //如果找到尾部,則表明添加的key-value沒有重複,在尾部進行添加
                    if ((e = p.next) == null) { p.next = newNode(hash, key, value, null); //判斷是否要轉換為紅黑樹結構
                        if (binCount >= TREEIFY_THRESHOLD - 1) treeifyBin(tab, hash); break; } //如果鏈表中有重複的key,e則為當前重複的節點,結束循環
                    if (e.hash == hash && ((k = e.key) == key || (key != null && key.equals(k)))) break; p = e; } } //有重複的key,則用待插入值進行覆蓋,返回舊值。
            if (e != null) { V oldValue = e.value; if (!onlyIfAbsent || oldValue == null) e.value = value; afterNodeAccess(e); return oldValue; } } //到了此步驟,則表明待插入的key-value是沒有key的重複,因為插入成功e節點的值為null //修改次數+1
        ++modCount; //實際長度+1,判斷是否大於臨界值,大於則擴容
        if (++size > threshold) resize(); afterNodeInsertion(evict); //添加成功
        return null; }

大概如下幾步:

①. 判斷鍵值對數組table[i]是否為空或為null,否則執行resize()進行擴容,初始容量是16;

②. 根據鍵值key計算hash值得到插入的數組索引i,如果table[i]==null,直接新建節點添加,轉向⑥,如果table[i]不為空,轉向③;

③. 判斷table[i]的首個元素是否和key一樣,如果相同直接覆蓋value,否則轉向④,這裏的相同指的是hashCode以及equals;

④. 判斷table[i] 是否為TreeNode,即table[i] 是否是紅黑樹,如果是紅黑樹,遍歷發現該key不存在  則直接在樹中插入鍵值對;遍歷發現key已經存在直接覆蓋value即可;

⑤. 如果table[i] 不是TreeNode則是鏈表節點,遍歷發現該key不存在,則先添加在鏈表結尾, 判斷鏈表長度是否大於8,大於8的話把鏈錶轉換為紅黑樹;遍歷發現key已經存在直接覆蓋value即可;

⑥. 插入成功后,判斷實際存在的鍵值對數量size是否超多了最大容量threshold,如果超過,進行擴容。

10.resize方法

何時進行擴容?

HashMap使用的是懶加載,構造完HashMap對象后,只要不進行put 方法插入元素之前,HashMap並不會去初始化或者擴容table。

當首次調用put方法時,HashMap會發現table為空然後調用resize方法進行初始化
,當添加完元素后,如果HashMap發現size(元素總數)大於threshold(閾值),則會調用resize方法進行擴容

    final Node<K,V>[] resize() {
        Node<K,V>[] oldTab = table; //old的長度
        int oldCap = (oldTab == null) ? 0 : oldTab.length; //old的臨界值
        int oldThr = threshold; //初始化new的長度和臨界值
        int newCap, newThr = 0; //oldCap > 0也就是說不是首次初始化,因為hashMap用的是懶加載
        if (oldCap > 0) { //大於最大值
            if (oldCap >= MAXIMUM_CAPACITY) { //臨界值為整數的最大值
                threshold = Integer.MAX_VALUE; return oldTab; } //標記##,其它情況,擴容兩倍,並且擴容后的長度要小於最大值,old長度也要大於16
            else if ((newCap = oldCap << 1) < MAXIMUM_CAPACITY && oldCap >= DEFAULT_INITIAL_CAPACITY) //臨界值也擴容為old的臨界值2倍
                newThr = oldThr << 1; } /**如果oldCap<0,但是已經初始化了,像把元素刪除完之後的情況,那麼它的臨界值肯定還存在, 如果是首次初始化,它的臨界值則為0 **/
        else if (oldThr > 0) newCap = oldThr; //首次初始化,給與默認的值
        else { newCap = DEFAULT_INITIAL_CAPACITY; //臨界值等於容量*加載因子
            newThr = (int)(DEFAULT_LOAD_FACTOR * DEFAULT_INITIAL_CAPACITY); } //此處的if為上面標記##的補充,也就是初始化時容量小於默認值16的,此時newThr沒有賦值
        if (newThr == 0) { //new的臨界值
            float ft = (float)newCap * loadFactor; //判斷是否new容量是否大於最大值,臨界值是否大於最大值
            newThr = (newCap < MAXIMUM_CAPACITY && ft < (float)MAXIMUM_CAPACITY ? (int)ft : Integer.MAX_VALUE); } //把上面各種情況分析出的臨界值,在此處真正進行改變,也就是容量和臨界值都改變了。
        threshold = newThr; //表示忽略該警告
        @SuppressWarnings({"rawtypes","unchecked"}) //初始化
            Node<K,V>[] newTab = (Node<K,V>[])new Node[newCap]; //賦予當前的table
        table = newTab; //此處自然是把old中的元素,遍歷到new中
        if (oldTab != null) { for (int j = 0; j < oldCap; ++j) { //臨時變量
                Node<K,V> e; //當前哈希桶的位置值不為null,也就是數組下標處有值,因為有值表示可能會發生衝突
                if ((e = oldTab[j]) != null) { //把已經賦值之後的變量置位null,當然是為了好回收,釋放內存
                    oldTab[j] = null; //如果下標處的節點沒有下一個元素
                    if (e.next == null) //把該變量的值存入newCap中,e.hash & (newCap - 1)並不等於j
                        newTab[e.hash & (newCap - 1)] = e; //該節點為紅黑樹結構,也就是存在哈希衝突,該哈希桶中有多個元素
                    else if (e instanceof TreeNode) //把此樹進行轉移到newCap中
                        ((TreeNode<K,V>)e).split(this, newTab, j, oldCap); else { /**此處表示為鏈表結構,同樣把鏈錶轉移到newCap中,就是把鏈表遍歷后,把值轉過去,在置位null**/ Node<K,V> loHead = null, loTail = null; Node<K,V> hiHead = null, hiTail = null; Node<K,V> next; do { next = e.next; if ((e.hash & oldCap) == 0) { if (loTail == null) loHead = e; else loTail.next = e; loTail = e; } else { if (hiTail == null) hiHead = e; else hiTail.next = e; hiTail = e; } } while ((e = next) != null); if (loTail != null) { loTail.next = null; newTab[j] = loHead; } if (hiTail != null) { hiTail.next = null; newTab[j + oldCap] = hiHead; } } } } } //返回擴容后的hashMap
        return newTab; }

其實主要就是兩步:1.創建新的數組 2.複製元素

但是在新的下標位置計算上1.8做了很大的優化,後面會說到。

11.get方法

    public V get(Object key) { Node<K,V> e; 9 //調用getNode方法來完成的
        return (e = getNode(hash(key), key)) == null ? null : e.value; } final Node<K,V> getNode(int hash, Object key) { //first 頭結點,e 臨時變量,n 長度,k key
        Node<K,V>[] tab; Node<K,V> first, e; int n; K k; //頭結點也就是數組下標的節點
        if ((tab = table) != null && (n = tab.length) > 0 && (first = tab[(n - 1) & hash]) != null) { //如果是頭結點,則直接返回頭結點
            if (first.hash == hash && ((k = first.key) == key || (key != null && key.equals(k)))) return first; //不是頭結點
            if ((e = first.next) != null) { //判斷是否是紅黑樹結構
                if (first instanceof TreeNode) //去紅黑樹中找,然後返回
                    return ((TreeNode<K,V>)first).getTreeNode(hash, key); do { //鏈表節點,一樣遍歷鏈表,找到該節點並返回
                    if (e.hash == hash && ((k = e.key) == key || (key != null && key.equals(k)))) return e; } while ((e = e.next) != null); } } //找不到,表示不存在該節點
        return null; }

主要就是利用equals和hashcode方法找到並返回

12.HashMap在JDK1.7和1.8除了數據結構的區別

(1)插入數據方式不同:

JDK1.7用的是頭插法,而JDK1.8及之後使用的都是尾插法,那麼他們為什麼要這樣做呢?因為JDK1.7認為最新插入的應該會先被用到,所以用了頭插法,但當採用頭插法時會容易出現逆序且環形鏈表死循環問題。但是在JDK1.8之後是因為加入了紅黑樹使用尾插法,能夠避免出現逆序且鏈表死循環的問題。

  說一下為什麼會產生死循環問題:

  問題出現在了這個移動元素的transfer方法里

  

 主要問題就出在了這行代碼上

Entry<K,V> next = e.next

如果兩個線程A,B都要對這個map進行擴容

A和B都已經創建了新的數組,假設線程A在執行到Entry < K,V > next = e.next之後,cpu時間片用完了,這時變量e指向節點a,變量next指向節點b。

此時A的狀態:e=a ,next=b

線程B繼續執行,很不巧,a、b、c節點rehash之後又是在同一個位置,開始移動節點, 因為頭插法,複製后順序是反的,結束后B的狀態:

 

 

 此時A開始執行,此時變量e指向節點a,變量next指向節點b,開始執行循環體的剩餘邏輯

if (rehash) { e.hash = null == e.key ? 0 : hash(e.key); } int i = indexFor(e.hash, newCapacity); e.next = newTable[i]; newTable[i] = e; e = next;

執行到

newTable[i] = e;

此時A的狀態

 

執行到

e = next;

 

此時e=b

再執行一波循環,Entry<K,V> next = e.next 但是此時b的next是a,就出現了死循環問題

 

 

(2)擴容后數據存儲位置的計算方式也不一樣:

在JDK1.7的時候是重新計算數組下標

而在JDK1.8的時候直接用了JDK1.7的時候計算的規律,也就是擴容前的原始位置+擴容的大小值=JDK1.8的計算方式,而不再是JDK1.7的那種異或的方法。但是這種方式就相當於只需要判斷Hash值的新增參与運算的位是0還是1就直接迅速計算出了擴容后的儲存方式。

就比如說:數組大小是4,hash算法是對長度取模

 

 擴容后是這樣的

我們可以把這三個數的二進制和擴容后的length-1進行按位與,可以看到只有数字5新增位為1

 

 

 

 因此,我們在擴充HashMap的時候,不需要像JDK1.7的實現那樣重新計算hash,只需要看看原來的hash值新增的那個bit是1還是0就好了,是0的話索引沒變,是1的話索引變成“原索引+oldCap”

(3)擴容的條件不同,1.7需要容量超過閾值且發生hash衝突,1.8超過閾值即會擴容

(4)JDK1.7的時候是先進行擴容後進行插入,而在JDK1.8的時候則是先插入後進行擴容

(5)1.8中沒有區分鍵為null的情況,而1.7版本中對於鍵為null的情況調用putForNullKey()方法。但是兩個版本中如果鍵為null,那麼調用hash()方法得到的都將是0,所以鍵為null的元素都始終位於哈希表table【0】中。

(6)jdk1.7中當哈希表為空時,會先調用inflateTable()初始化一個數組;而1.8則是直接調用resize()擴容

(7)jdk1.7中的hash函數對哈希值的計算直接使用key的hashCode值,而1.8中則是採用key的hashCode異或上key的hashCode進行無符號右移16位的結果,避免了只靠低位數據來計算哈希時導致的衝突,計算結果由高低位結合決定,使元素分佈更均勻

13、HashMap是線程安全的么?如果想線程安全怎麼辦?

不是線程安全的,多線程下會出現死循環和put操作時可能導致元素丟失

死循環原因:上邊已經分析過了

丟失原因:當多個線程同時執行addEntry(hash,key ,value,i)時,如果產生哈希碰撞,導致兩個線程得到同樣的bucketIndex去存儲,就可能會發生元素覆蓋丟失的情況

 

想實現線程安全的解決方法:

1.使用Hashtable 類,Hashtable 是線程安全的(不建議用,就是利用了synchronized進行加鎖);

2.使用併發包下的java.util.concurrent.ConcurrentHashMap,ConcurrentHashMap實現了更高級的線程安全;

3.或者使用synchronizedMap() 同步方法包裝 HashMap object,得到線程安全的Map,並在此Map上進行操作。

 

參考:

https://blog.csdn.net/m0_37914588/article/details/82287191

https://www.jianshu.com/p/7cf2d6f1096b

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※教你寫出一流的銷售文案?

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

※回頭車貨運收費標準

※別再煩惱如何寫文案,掌握八大原則!

※超省錢租車方案

※產品缺大量曝光嗎?你需要的是一流包裝設計!

※推薦台中搬家公司優質服務,可到府估價

卷積生成對抗網絡(DCGAN)—生成手寫数字

深度卷積生成對抗網絡(DCGAN)

—- 生成 MNIST 手寫圖片

1、基本原理

生成對抗網絡(GAN)由2個重要的部分構成:

  • 生成器(Generator):通過機器生成數據(大部分情況下是圖像),目的是“騙過”判別器
  • 判別器(Discriminator):判斷這張圖像是真實的還是機器生成的,目的是找出生成器做的“假數據”

訓練過程

  • 1、固定判別器,讓生成器不斷生成假數據,給判別器判別,開始生成器很弱,但是隨着不斷的訓練,生成器不斷提升,最終騙過判別器。此時判別器判斷假數據的概率為50%
  • 2、固定生成器,訓練判別器。判別器經過訓練,提高鑒別能力,最終能準確判斷雖有的假圖片
  • 3、循環上兩個階段,最終生成器和判別器都越來越強。然後就可以使用生成器來生成我們想要的圖片了

2、相關數學原理

  • 判別器在這裡是一種分類器,用於區分樣本的真偽,因此我們常常使用交叉熵(cross entropy)來進行判別分佈的相似性

\[H(p, q) := -\sum_i p_i \log q_i \]

公式中 \(p_i\)\(q_i\) 為真實的樣本分佈和生成器的生成分佈

假定 \(y_1\) 為正確樣本分佈,那麼對應的( \(1-y_1\) )就是生成樣本的分佈。\(D\) 表示判別器,則 \(D(x_1)\) 表示判別樣本為正確的概率, \(1-D(x_1)\) 則對應着判別為錯誤樣本的概率。則有如下式子(這裏僅僅是對當前情況下的交叉熵損失的具體化)。

\[H((x_i, y_i)_{i=1}^N, D) = – \sum_{i=1}^N y_i\log D(x_i) – \sum_{i=1}^N(1-y_i)\log (1 – D(x_i)) \]

對於GAN中的樣本點 \(x_i\) ,對應於兩個出處,要麼來自於真實樣本,要麼來自於生成器生成的樣本 $\tilde{x} – G(z) $ ( 這裏的 \(z\) 是服從於投到生成器中噪聲的分佈)。

對於來自於真實的樣本,我們要判別為正確的分佈 \(y_i\) 。來自於生成的樣本我們要判別其為錯誤分佈( \(1-y_i\) )。將上面式子進一步使用概率分佈的期望形式寫出(為了表達無限的樣本情況,相當於無限樣本求和情況),並且讓 \(y_i\) 為 1/2 且使用 \(G(z)\) 表示生成樣本可以得到如下公式:

\[H \left( (x_i, y_i)_{i=1}^\infty, D \right) = -\frac{1}{2}E_{x-p_{data}}\left[ \log D(x) \right] – \frac{1}{2}E_z\left[ \log (1-D(G(z))) \right] \\\ GAN損失函數期望形式 \]

對於論文中的公式

\[min_G max_D V(D, G) = E_{x-p_{data}(x)}\left[ \log D(x) \right] + E_{z-p_z(z)}\left[ \log (1-D(G(z))) \right] \\\ GAN損失函數的 min max表達 \]

其實是與上面公式一樣的,下面做解釋

  • 這裏的 \(V(D, G)\) 相當於表示真實樣本和生成樣本的差異程度。
  • \(max_D V(D, G)\) 的意思是固定生成器 \(G\), 盡可能地讓判別器能夠最大化地判別出樣本來自於真實數據還是生成的數據。
  • 再將後面的 $L = max_D V(D, G) $ 看成整體,對於 \(min_G L\)這裡是在固定判別器\(D\)的條件下得到生成器 \(G\),這個 \(G\) 要求能夠最小化真實樣本與生成樣本的差異。
  • 通過上述 \(min\) \(max\) 的博弈過程,理想情況下會收斂於生成分佈擬合於真實分佈。

3、卷積對抗生成網絡

卷積對抗生成網絡(DCGAN)是在GAN的基礎上加入了CNN,主要是改進了網絡結構,在訓練過程中狀態穩定,並且可以有效實現高質量圖片的生成以及相關的生成模型應用。DCGAN的生成器網絡結構如下圖:

DCGAN的改進:

  • 使用步長卷積代替上採樣層,卷積在提取圖像特徵上具有很好的作用,並且使用卷積代替全連接層
  • 生成器G和判別器D中幾乎每一層都使用batchnorm層,將特徵層的輸出歸一化到一起,加速了訓練,提升了訓練的穩定性。
  • 在判別器中使用leakrelu激活函數,而不是RELU,防止梯度稀疏,生成器中仍然採用relu,但是輸出層採用tanh。

4、DCGAN代碼實現

shenduimport numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import optimizers, losses, layers, Sequential, Model
class DCGAN():
    '''
    實現深度對抗神經網絡
    生成 MNIST 手寫数字圖片
    輸入的噪聲為服從正態分佈均值為 0 方差為 1 的分佈, shape:(None, 100)
    生成器(G)輸入 噪聲, 輸出為 (None, 28, 28, 1)的圖片
    分類器(D)輸入為 (None, 28, 28, 1)的圖片,輸出圖片的分類真假
    '''
    def __init__(self):
        self.img_rows = 28 
        self.img_cols = 28
        self.channels = 1
        self.img_shape = (self.img_rows, self.img_cols, self.channels)

        optimizer = optimizers.Adam(0.0002)

        # 構建編譯分類器
        self.discriminator = self.build_discriminator()
        self.discriminator.compile(loss='binary_crossentropy', 
            optimizer=optimizer,
            metrics=['accuracy'])

        # 構建編譯生成器
        self.generator = self.build_generator()
        self.generator.compile(loss='binary_crossentropy', optimizer=optimizer)

        # 生成器輸入為噪音,生成圖片
        z = layers.Input(shape=(100,))
        img = self.generator(z)

        # 對於整個對抗網絡模型只優化生成器的參數
        self.discriminator.trainable = False

        # 用生成的圖片輸入分類器判斷
        valid = self.discriminator(img)

        # 對於整個對抗網絡 輸入噪音 => 生成圖片 => 決定圖片是否有效
        self.combined = Model(z, valid)
        self.combined.compile(loss='binary_crossentropy', optimizer=optimizer)

        
    def build_generator(self):
        '''
        構建生成器
        '''
        noise_shape = (100,)
        
        model = tf.keras.Sequential()
        
        # 添加全連接層
        model.add(layers.Dense(7*7*256, use_bias=False, input_shape=noise_shape))
        # 添加 BatchNormalization 層,對數據進行歸一化
        model.add(layers.BatchNormalization())
        model.add(layers.LeakyReLU())

        model.add(layers.Reshape((7, 7, 256)))
        
        # 添加逆卷積層,卷積核大小為 5X5,數量 128, 步長為 1
        model.add(layers.Conv2DTranspose(128, (5, 5), strides=(1, 1), padding='same', use_bias=False))
        assert model.output_shape == (None, 7, 7, 128)
        model.add(layers.BatchNormalization())
        model.add(layers.LeakyReLU())
        
        # 添加逆卷積層,卷積核大小為 5X5,數量 64, 步長為 2
        model.add(layers.Conv2DTranspose(64, (5, 5), strides=(2, 2), padding='same', use_bias=False))
        assert model.output_shape == (None, 14, 14, 64)
        model.add(layers.BatchNormalization())
        model.add(layers.LeakyReLU())
        
        # 添加逆卷積層,卷積核大小為 5X5,數量 1, 步長為 2
        model.add(layers.Conv2DTranspose(1, (5, 5), strides=(2, 2), padding='same', use_bias=False, activation='tanh'))
        assert model.output_shape == (None, 28, 28, 1)
        
        model.summary()
        noise = layers.Input(shape=noise_shape)
        img = model(noise)
        
        # 返回 Model 對象,輸入為 噪聲, 輸出為 圖像
        return keras.Model(noise, img)

    
    def build_discriminator(self):
        '''
        構建分類器
        '''
        img_shape = (self.img_rows, self.img_cols, self.channels)
        
        model = tf.keras.Sequential()
        
        model.add(layers.Conv2D(64, (5, 5), strides=(2, 2), padding='same',
                                         input_shape=img_shape))
        model.add(layers.LeakyReLU())
        # 添加 Dropout 層,減少參數數量
        model.add(layers.Dropout(0.3))

        model.add(layers.Conv2D(128, (5, 5), strides=(2, 2), padding='same'))
        model.add(layers.LeakyReLU())
        model.add(layers.Dropout(0.3))
        # 把數據鋪平
        model.add(layers.Flatten())
        model.add(layers.Dense(1))
        
        model.summary()
        
        img = layers.Input(shape=img_shape)
        validity = model(img)
        
        return keras.Model(img, validity)

    
    def train(self, epochs, batch_size=128, save_interval=50):
        '''
        網絡訓練
        '''
        # 加載 數據集
        (X_train, _), (_, _) = keras.datasets.mnist.load_data()

        # 把數據縮放到 [-1, 1]
        X_train = (X_train.astype(np.float32) - 127.5) / 127.5
        # 添加通道維度
        X_train = np.expand_dims(X_train, axis=3)
        half_batch = int(batch_size / 2)

        for epoch in range(epochs):

            # ---------------------
            #  訓練分類器
            # ---------------------

            # 隨機的選擇一半的 batch 數量圖片
            idx = np.random.randint(0, X_train.shape[0], half_batch)
            imgs = X_train[idx]

            noise = np.random.normal(0, 1, (half_batch, 100))

            # 生成一半 batch 數量的 圖片
            gen_imgs = self.generator.predict(noise)

            # 分類器損失
            d_loss_real = self.discriminator.train_on_batch(imgs, np.ones((half_batch, 1)))
            d_loss_fake = self.discriminator.train_on_batch(gen_imgs, np.zeros((half_batch, 1)))
            d_loss = 0.5 * np.add(d_loss_real, d_loss_fake)


            # ---------------------
            #  訓練生成器
            # ---------------------

            noise = np.random.normal(0, 1, (batch_size, 100))

            # The generator wants the discriminator to label the generated samples
            # as valid (ones)
            # 對於生成器,希望分類器把更多的圖片判為 有效 (用 1 表示)
            valid_y = np.array([1] * batch_size)

            # 訓練生成器
            g_loss = self.combined.train_on_batch(noise, valid_y)

            # 打印訓練進度
            print ("%d [D loss: %f, acc.: %.2f%%] [G loss: %f]" % (epoch, d_loss[0], 100*d_loss[1], g_loss))

            # 每個 save_interval 周期保存一張圖片
            if epoch % save_interval == 0:
                self.save_imgs(epoch)

    def save_imgs(self, epoch):
        r, c = 5, 5
        noise = np.random.normal(0, 1, (r * c, 100))
        gen_imgs = self.generator.predict(noise)

        # 把圖片數據縮放到 0 - 1
        gen_imgs = 0.5 * gen_imgs + 0.5

        fig, axs = plt.subplots(r, c)
        cnt = 0
        for i in range(r):
            for j in range(c):
                axs[i,j].imshow(gen_imgs[cnt, :,:,0], cmap='gray')
                axs[i,j].axis('off')
                cnt += 1
        fig.savefig("dcgan/images/mnist_%d.png" % epoch)
        plt.close()

if __name__ == '__main__':
    dcgan = DCGAN()
    dcgan.train(epochs=10000, batch_size=32, save_interval=200)

網絡參數信息

5、訓練結果

下面是循環了 10000 次 epoch 后,從開始每隔 2000 個 epoch 生成器生成的圖片

  • 可以看到,剛開始全部都是噪聲,隨着訓練的進行,圖片逐漸清晰

  • 生成的圖片還是不太清晰,一方面的原因是我訓練的 epoch 周期太少,因為自己電腦性能問題,太耗時間,所以訓練的epoch 周期少,如果有條件后提高訓練周期應該會好很多。另一方面或許因為我構建的網絡還有不合理之,後期還需要改進。

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※超省錢租車方案

※別再煩惱如何寫文案,掌握八大原則!

※回頭車貨運收費標準

※教你寫出一流的銷售文案?

※產品缺大量曝光嗎?你需要的是一流包裝設計!

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

網頁設計最專業,超強功能平台可客製化

【asp.net core 系列】6 實戰之 一個項目的完整結構

0. 前言

在《asp.net core 系列》之前的幾篇文章中,我們簡單了解了路由、控制器以及視圖的關係以及靜態資源的引入,讓我們對於asp.net core mvc項目有了基本的認識。不過,這些並不是 asp.net core mvc項目的全部內容,剩下的內容我將結合實戰項目為大家講解其中的知識。現在,就讓我們開始吧。

1. 項目構建

拋開之前的項目,現在跟着我重新創建一個項目,第一步依舊是先創建一個解決方案:

dotnet new sln --name Template

我先介紹一下這個項目(指整個項目,不是單獨的asp.net core 應用),這是一個後台管理的模板應用,提供了常見後台系統(管理員端)的功能,包括員工管理、部門管理、角色管理等功能。

現在回到項目中,通常一個項目需要一個模型層,一個數據提供層以及web展示層。然後,我們依次創建 Data、Domain、Web 三個項目,其中Data和Domain 是 classlib,Web是mvc項目。

# 確保當前目錄與 Template.sln 處於相同的目錄
dotnet new classlib --name Data
dotnet new classlib --name Domain
dotnet new mvc --name Web

添加三個項目到解決方案中:

dotnet sln add Data
dotnet sln add Domain
dotnet sln add Web

因為Data 中存放着模型層,所以需要其他項目對它有一個引用:

cd Domain
dotnet add reference ../Data
cd ../Web
dotnet add reference ../Data

當然,實際開發中我們應當還有一個Service層,這一層用來存放業務代碼,減少控制器里不必要的業務代碼。那麼繼續:

# 回到項目的根目錄
cd ..
dotnet new classlib --name Service
dotnet sln add Service

然後添加Service的引用:

cd Service
dotnet add reference ../Data

將 Service的引用添加到Web里:

cd ../Web
dotnet add reference ../Service

現在一個大型工程基本都是面向接口編程,幾個關鍵層應當都是接口層,我們實際上還缺少Domain的實現層和Service的實現層。

cd ..
dotnet new classlib --name Domain.Implements
dotnet new classlib --name Service.Implements

在對應的實現層中,引入它們實現的接口層,並引入Data:

cd Domain.Implements
dotnet add reference ../Data
dotnet add reference ../Domain
cd ../Service.Implements
dotnet add reference ../Data
dotnet add reference ../Domain
dotnet add reference ../Service

這裡在Service的實現層添加Domain接口層的引用,而不是實現層的引用。這是因為面向接口編程,我們需要對Service實現層隱藏Domain的實現,所以對於Service的實現層來說,不需要關心Domain層的實現邏輯。

在Web中添加新建的兩個實現層的引用:

cd ../Web
dotnet add reference ../Domain.Implements
dotnet add reference ../Service.Implements

添加這兩個實現層到解決方案中:

cd ..
dotnet sln add Domain.Implements
dotnet sln add Service.Implements

下圖是到目前為止的項目結構圖:

整體而言,Data是各個層之間的數據流通依據,所以各個項目都依賴於此項目,各個接口層的實現層都只對Web可見,其他各層實際上並不清楚具體實現。

隱藏實現層有什麼好處呢?

  • 調用方不知道實現方的邏輯,避免調用方對特定實現的依賴
  • 有利於團隊協作,有的團隊是針對模塊劃分,有的是針對分層劃分,無論哪種,使用接口都是一個好的選擇
  • 有利於後期優化,可以很方便的切換實現層,而不用重新編譯過多的代碼

當然,並不只有這些好處,不過這樣有一個壞處,在web層調用service層時會更繁瑣,不過這也不是不可解決的,後續的內容中會為大家介紹如何解決這個煩惱。

2. 項目補充

通常情況下,一個完整的項目還會有一個工具類項目和一個測試項目。所以,繼續添加以下項目:

dotnet new classlib --name Utils

Utils 表示工具類,通常一個項目中工具類會比較多,所以就抽成了一個項目,單獨列出來。

添加測試項目:

dotnet new nunit --name Test

這裏使用的是nunit 3測試框架,當然還有另一個是xunit測試框架。

添加兩個項目到解決方案里:

dotnet sln add Utils
dotnet sln add Test

3. 總結

本章內容旨在通過創建項目,讓大家了解實際開發中項目的層級規劃思想,這並不代表我的就是最優的,只是這是我總結出來相對方便的層級關係。這裏並沒有講解如何通過Visual Studio或者Rider創建這樣的一個項目,我希望大夥能夠自己試試。

好了,希望大家能創建好項目,當然了後期我會給大家提供這個項目的源碼的,地址暫時保密哦。

更多內容煩請關注我的博客《高先生小屋》

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※產品缺大量曝光嗎?你需要的是一流包裝設計!

※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面

※回頭車貨運收費標準

※推薦評價好的iphone維修中心

※超省錢租車方案

台中搬家遵守搬運三大原則,讓您的家具不再被破壞!

※推薦台中搬家公司優質服務,可到府估價

Netty源碼學習系列之4-ServerBootstrap的bind方法

前言

    今天研究ServerBootstrap的bind方法,該方法可以說是netty的重中之重、核心中的核心。前兩節的NioEventLoopGroup和ServerBootstrap的初始化就是為bind做準備。照例粘貼一下這個三朝元老的demo,開始本文內容。

 1 public class NettyDemo1 {
 2     // netty服務端的一般性寫法
 3     public static void main(String[] args) {
 4         EventLoopGroup boss = new NioEventLoopGroup(1);
 5         EventLoopGroup worker = new NioEventLoopGroup();
 6         try {
 7             ServerBootstrap bootstrap = new ServerBootstrap();
 8             bootstrap.group(boss, worker).channel(NioServerSocketChannel.class)
 9                     .option(ChannelOption.SO_BACKLOG, 100)
10                     .handler(new NettyServerHandler())
11                     .childHandler(new ChannelInitializer<SocketChannel>() {
12                         @Override
13                         protected void initChannel(SocketChannel socketChannel) throws Exception {
14                             ChannelPipeline pipeline = socketChannel.pipeline();
15                             pipeline.addLast(new StringDecoder());
16                             pipeline.addLast(new StringEncoder());
17                             pipeline.addLast(new NettyServerHandler());
18                         }
19                     });
20             ChannelFuture channelFuture = bootstrap.bind(90);
21             channelFuture.channel().closeFuture().sync();
22         } catch (Exception e) {
23             e.printStackTrace();
24         } finally {
25             boss.shutdownGracefully();
26             worker.shutdownGracefully();
27         }
28     }
29 }

 

一、bind及doBind方法

1.ServerBootstrap.bind方法

    該方法有多個重載方法,但核心作用只有一個,就是將參數轉為InetSocketAddress對象傳給 —>

1 public ChannelFuture bind(int inetPort) {
2         return bind(new InetSocketAddress(inetPort));
3     }
1 public ChannelFuture bind(String inetHost, int inetPort) {
2         return bind(SocketUtils.socketAddress(inetHost, inetPort));
3     }
1 public ChannelFuture bind(InetAddress inetHost, int inetPort) {
2         return bind(new InetSocketAddress(inetHost, inetPort));
3     }

    下面這個bind方法,在該方法中調用了doBind方法。

1 public ChannelFuture bind(SocketAddress localAddress) {
2         validate();
3         return doBind(ObjectUtil.checkNotNull(localAddress, "localAddress"));
4     }

2、ServerBootstrap的doBind方法

    doBind方法位於父類AbstractBootstrap中,它有兩大功能,均在下面代碼中標識了出來,它們分別對應通過原生nio進行server端初始化時的兩個功能,第1步對應將channel註冊到selector上;第2步對應將server地址綁定到channel上。

 1 private ChannelFuture doBind(final SocketAddress localAddress) {
 2         final ChannelFuture regFuture = initAndRegister(); // 1)、初始化和註冊,重要***
 3         final Channel channel = regFuture.channel();
 4         if (regFuture.cause() != null) {
 5             return regFuture;
 6         }
 7 
 8         if (regFuture.isDone()) {
 9             // At this point we know that the registration was complete and successful.
10             ChannelPromise promise = channel.newPromise();
11             doBind0(regFuture, channel, localAddress, promise); // 2)、將SocketAddress和channel綁定起來,最終執行的是nio中的功能,重要**
12             return promise;
13         } else {
14             // 省略異常判斷、添加監聽器和異步調用doBind0方法
15         }
16     }

    為方便關聯對照,下面再粘貼一個簡單的原生NIO編程的服務端初始化方法,其實doBind方法的邏輯基本就是對下面這個方法的封裝,只是增加了很多附加功能。

    因為上述兩步都有些複雜,所以此處分兩部分進行追蹤。

二、AbstractBootstrap的initAndRegister方法

     該方法代碼如下所示,一共有三個核心方法,邏輯比較清晰,將channel new出來,初始化它,然後註冊到selector上。下面我們各個擊破。

 1 final ChannelFuture initAndRegister() {
 2         Channel channel = null;
 3         try { // 1)、實例化channel,作為服務端初始化的是NioServerSocketChannel
 4             channel = channelFactory.newChannel();
 5             init(channel); // 2)、初始化channel,即給channel中的屬性賦值
 6         } catch (Throwable t) {
 7             if (channel != null) {
 8                 channel.unsafe().closeForcibly();
 9                 return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
10             }
11             return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
12         }
13         // 3)、註冊,即最終是將channel 註冊到selector上
14         ChannelFuture regFuture = config().group().register(channel);
15         if (regFuture.cause() != null) {
16             if (channel.isRegistered()) {
17                 channel.close();
18             } else {
19                 channel.unsafe().closeForcibly();
20             }
21         }
22         return regFuture;
23     }

1、channelFactory.newChannel()方法

1 @Override
2     public T newChannel() {
3         try {
4             return constructor.newInstance();
5         } catch (Throwable t) {
6             throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);
7         }
8     }

    該方法完成了channel的實例化,channelFactory的賦值可參見上一篇博文【Netty源碼學習系列之3-ServerBootstrap的初始化】(地址 https://www.cnblogs.com/zzq6032010/p/13027161.html),對服務端來說,這裏channelFactory值為ReflectiveChannelFactory,且其內部的constructor是NioServerSocketChannel的無參構造器,下面追蹤NioServerSocketChannel的無參構造方法。

1.1)、new NioServerSocketChannel()

1 public NioServerSocketChannel() {
2         this(newSocket(DEFAULT_SELECTOR_PROVIDER));
3     }
 1 private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
 2 
 3 private static ServerSocketChannel newSocket(SelectorProvider provider) {
 4         try {
 5             return provider.openServerSocketChannel();
 6         } catch (IOException e) {
 7             throw new ChannelException(
 8                     "Failed to open a server socket.", e);
 9         }
10     }

    可見,它先通過newSocket方法獲取nio原生的ServerSocketChannel,然後傳給了重載構造器,如下,其中第三行是對NioServerSocketChannelConfig  config進行了賦值,邏輯比較簡單,下面主要看對父類構造方法的調用。

1 public NioServerSocketChannel(ServerSocketChannel channel) {
2         super(null, channel, SelectionKey.OP_ACCEPT);
3         config = new NioServerSocketChannelConfig(this, javaChannel().socket());
4     }

1.2)、對NioServerSocketChannel父類構造方法的調用

 1 protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
 2         super(parent);
 3         this.ch = ch;
 4         this.readInterestOp = readInterestOp;
 5         try {
 6             ch.configureBlocking(false);
 7         } catch (IOException e) {
 8             try {
 9                 ch.close();
10             } catch (IOException e2) {
11                 if (logger.isWarnEnabled()) {
12                     logger.warn(
13                             "Failed to close a partially initialized socket.", e2);
14                 }
15             }
16 
17             throw new ChannelException("Failed to enter non-blocking mode.", e);
18         }
19     }

    中間經過了AbstractNioMessageChannel,然後調到下面AbstractNioChannel的構造方法。此時parent為null,ch為上面獲取到的nio原生ServerSocketChannel,readInterestOp為SelectionKey的Accept事件(值為16)。可以看到,將原生渠道ch賦值、感興趣的事件readInterestOp賦值、設置非阻塞。然後重點看對父類構造器的調用。

1.3)、AbstractChannel構造器

1 protected AbstractChannel(Channel parent) {
2         this.parent = parent;
3         id = newId();
4         unsafe = newUnsafe();
5         pipeline = newChannelPipeline();
6     }

    可以看到,此構造方法只是給四個屬性進行了賦值,我們挨個看下這四個屬性。

    第一個屬性是this.parent,類型為io.netty.channel.Channel,但此時值為null;

    第二個屬性id類型為io.netty.channel.ChannelId,就是一個id生成器,值為new DefaultChannelId();

    第三個屬性unsafe類型為io.netty.channel.Channel.Unsafe,該屬性很重要,封裝了對事件的處理邏輯,最終調用的是AbstractNioMessageChannel中的newUnsafe方法,賦的值為new NioMessageUnsafe();

    第四個屬性pipeline類型為io.netty.channel.DefaultChannelPipeline,該屬性很重要,封裝了handler處理器的邏輯,賦的值為 new DefaultChannelPipeline(this)  this即當前的NioServerSocketChannel對象。

    其中DefaultChannelPipeline的構造器需要額外看一下,如下,將NioServerSocketChannel對象存入channel屬性,然後初始化了tail、head兩個成員變量,且對應的前後指針指向對方。TailContext和HeadContext都繼承了AbstractChannelHandlerContext,在這個父類裏面維護了next和prev兩個雙向指針,看到這裡有經驗的園友應該一下子就能看出來,DefaultChannelPipeline內部維護了一個雙向鏈表。

 1 protected DefaultChannelPipeline(Channel channel) {
 2         this.channel = ObjectUtil.checkNotNull(channel, "channel");
 3         succeededFuture = new SucceededChannelFuture(channel, null);
 4         voidPromise =  new VoidChannelPromise(channel, true);
 5 
 6         tail = new TailContext(this);
 7         head = new HeadContext(this);
 8 
 9         head.next = tail;
10         tail.prev = head;
11     }

 

     至此,完成了上面initAndRegister方法中的第一個功能:channel的實例化。此時NioServerSocketChannel的幾個父類屬性快照圖如下所示:

 

2、init(channel)方法

    init(channel)方法位於ServerBootstrap中(因為這裡是通過ServerBootstrap過來的,如果是通過Bootstrap進入的這裏則調用的就是Bootstrap中的init方法),主要功能如下註釋所示。本質都是針對channel進行初始化,初始化channel中的option、attr和pipeline。

 1 void init(Channel channel) throws Exception {
 2         // 1、獲取AbstractBootstrap中的options屬性,與channel進行關聯
 3         final Map<ChannelOption<?>, Object> options = options0();
 4         synchronized (options) {
 5             setChannelOptions(channel, options, logger);
 6         }
 7         // 2、獲取AbstractBootstrap中的attr屬性,與channel關聯起來
 8         final Map<AttributeKey<?>, Object> attrs = attrs0();
 9         synchronized (attrs) {
10             for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
11                 @SuppressWarnings("unchecked")
12                 AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
13                 channel.attr(key).set(e.getValue());
14             }
15         }
16         // 3、獲取pipeline,並將一個匿名handler對象添加進去,重要***
17         ChannelPipeline p = channel.pipeline();
18         final EventLoopGroup currentChildGroup = childGroup;
19         final ChannelHandler currentChildHandler = childHandler;
20         final Entry<ChannelOption<?>, Object>[] currentChildOptions;
21         final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
22         synchronized (childOptions) {
23             currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));
24         }
25         synchronized (childAttrs) {
26             currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
27         }
28         p.addLast(new ChannelInitializer<Channel>() {
29             @Override
30             public void initChannel(final Channel ch) throws Exception {
31                 final ChannelPipeline pipeline = ch.pipeline();
32                 ChannelHandler handler = config.handler();
33                 if (handler != null) {
34                     pipeline.addLast(handler);
35                 }
36 
37                 ch.eventLoop().execute(new Runnable() {
38                     @Override
39                     public void run() {
40                         pipeline.addLast(new ServerBootstrapAcceptor(
41                                 ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
42                     }
43                 });
44             }
45         });
46     }

    1跟2的功能都比較容易理解,功能3是init的核心,雖然代碼不少但很容易理解,它就是往channel中的pipeline里添加了一個匿名handler對象,其initChannel方法只有在有客戶端連接接入時才會調用,initChannel方法的功能是什麼呢?可以看到,它就是往入參channel中的eventLoop里添加了一個任務,這個任務的功能就是往pipeline中再添加一個handler,最後添加的這個handler就不是匿名的了,它是ServerBootstrapAcceptor對象。因為這裏的initChannel方法和後面的run方法都是有客戶端接入時才會調用的,所以這裏只是提一下,後面會詳述。至此完成init方法,下面進入register。

3、config().group().register(channel)方法

 3.1)、config().group()方法

    由前面可以知道,config().group().register(channel)這行代碼位於AbstractBootstrap類中的initAndRegister方法中,但由於當前對象是ServerBootstrap,故此處config()方法實際調用的都是ServerBootstrap中重寫的方法,得到了ServerBootstrapConfig。

    ServerBootstrapConfig的group方法如下,調用的是它的父類AbstractBootstrapConfig中的方法。通過類名就能知道,ServerBootstrapConfig中的方法是獲取ServerBootstrap中的屬性,而AbstractBootstrapConfig中的方法是獲取AbstractBootstrap中的屬性,兩兩對應。故此處獲取的EventLoopGroup就是AbstractBootstrap中存放的group,即文章開頭demo中的boss對象。

1 public final EventLoopGroup group() {
2         return bootstrap.group();
3     }

    獲取到了名叫boss的這個NioEventLoopGroup對象,下面追蹤NioEventLoopGroup.register(channel)方法

3.2)、 NioEventLoopGroup.register(channel)方法

    該方法是對之前初始化屬性的應用,需結合NioEventLoopGroup的初始化流程看,詳見【Netty源碼學習系列之2-NioEventLoopGroup的初始化】(鏈接【https://www.cnblogs.com/zzq6032010/p/12872989.html】)一文,此處就不贅述了,下面把該類的繼承類圖粘貼出來,以便有個整體認識。

 

3.2.1)、next()方法 

    下面的register方法位於MultithreadEventLoopGroup類中,是NioEventLoopGroup的直接父類,如下:

1 public ChannelFuture register(Channel channel) {
2         return next().register(channel);
3     }

    next方法如下,調用了父類的next方法,下面的就是父類MultithreadEventExecutorGroup中的next實現,可以看到調用的是chooser的next方法。通過初始化流程可知,此處boss的線程數是1,是2的n次方,所以chooser就是PowerOfTwoEventExecutorChooser,通過next方法從EventExecutor[]中選擇一個對象。需要注意的是chooser.next()通過輪詢的方式選擇的對象。

1 public EventLoop next() {
2         return (EventLoop) super.next();
3     }
1 public EventExecutor next() {
2         return chooser.next();
3     }

3.2.2)、NioEventLoop.register方法

    next之後是register方法,中間將NioServerSocketChannel和當前的NioEventLoop封裝成一個DefaultChannelPromise對象往下傳遞,在下面第二個register方法中可以看到,實際上調用的是NioServerSocketChannel中的unsafe屬性的register方法。

1 public ChannelFuture register(Channel channel) {
2         return register(new DefaultChannelPromise(channel, this));
3     }
1 public ChannelFuture register(final ChannelPromise promise) {
2         ObjectUtil.checkNotNull(promise, "promise");
3         promise.channel().unsafe().register(this, promise);
4         return promise;
5     }

3.2.3)、NioMessageUnsafe的register方法

    通過本文第一部分中第1步中的1.3)可以知道,NioServerSocketChannel中的unsafe是NioMessageUnsafe對象,下面繼續追蹤其register方法:

 1 public final void register(EventLoop eventLoop, final ChannelPromise promise) {
 2             if (eventLoop == null) {// 判斷非空
 3                 throw new NullPointerException("eventLoop");
 4             }
 5             if (isRegistered()) {// 判斷是否註冊
 6                 promise.setFailure(new IllegalStateException("registered to an event loop already"));
 7                 return;
 8             }
 9             if (!isCompatible(eventLoop)) {// 判斷eventLoop類型是否匹配
10                 promise.setFailure(
11                         new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
12                 return;
13             }
14        // 完成eventLoop屬性的賦值
15             AbstractChannel.this.eventLoop = eventLoop;
16             // 判斷eventLoop中的Reactor線程是不是當前線程 ***重要1
17             if (eventLoop.inEventLoop()) {
18                 register0(promise); // 進行註冊
19             } else {
20                 try {// 不是當前線程則將register0任務放入eventLoop隊列中讓Reactor線程執行(如果Reactor線程未初始化還要將其初始化) ***重要2
21                     eventLoop.execute(new Runnable() {
22                         @Override
23                         public void run() {
24                             register0(promise);// 註冊邏輯 ***重要3
25                         }
26                     });
27                 } catch (Throwable t) {
28                     // 省略異常處理
29                 }
30             }
31         }

    該方法位於io.netty.channel.AbstractChannel.AbstractUnsafe中(它是NioMessageUnsafe的父類),根據註釋能了解每一步做了什麼,但如果要理解代碼邏輯意圖則需要結合netty的串行無鎖化(串行無鎖化參見博主的netty系列第一篇文章https://www.cnblogs.com/zzq6032010/p/12872993.html)。它實際就是讓每一個NioEventLoop對象的thread屬性記錄一條線程,用來循環執行NioEventLoop的run方法,後續這個channel上的所有事件都由這一條線程來執行,如果當前線程不是Reactor線程,則會將任務放入隊列中,Reactor線程會不斷從隊列中獲取任務執行。這樣以來,所有事件都由一條線程順序處理,線程安全,也就不需要加鎖了。

    說完整體思路,再來結合代碼看看。上述代碼中標識【***重要1】的地方就是通過inEventLoop方法判斷eventLoop中的thread屬性記錄的線程是不是當前線程:

    先調到父類AbstractEventExecutor中,獲取了當前線程:

1 public boolean inEventLoop() {
2         return inEventLoop(Thread.currentThread());
3     }

    然後調到SingleThreadEventExecutor類中的方法,如下,比對thread與當前線程是否是同一個:

1 public boolean inEventLoop(Thread thread) {
2         return thread == this.thread;
3     }

    此時thread未初始化,所以肯定返回false,則進入【***重點2】的邏輯,將register放入run方法中封裝成一個Runnable任務,然後執行execute方法,如下,該方法位於SingleThreadEventExecutor中:

 1 public void execute(Runnable task) {
 2         if (task == null) {
 3             throw new NullPointerException("task");
 4         }
 5 
 6         boolean inEventLoop = inEventLoop();
 7         addTask(task); //將任務放入隊列中 ***重要a
 8         if (!inEventLoop) {
 9             startThread(); //判斷當前線程不是thread線程,則調用該方法 ***重要b
10             if (isShutdown()) {
11                 boolean reject = false;
12                 try {
13                     if (removeTask(task)) {
14                         reject = true;
15                     }
16                 } catch (UnsupportedOperationException e) {
17                     // 省略註釋
18                 }
19                 if (reject) {
20                     reject();
21                 }
22             }
23         }
24 
25         if (!addTaskWakesUp && wakesUpForTask(task)) {
26             wakeup(inEventLoop);
27         }
28     }

    有兩個重要的邏輯,已經在上面代碼中標出,先看看【***重要a】,如下,可見最終就是往SingleThreadEventExecutor的taskQueue隊列中添加了一個任務,如果添加失敗則調reject方法執行拒絕策略,通過前文分析可以知道,此處的拒絕策略就是直接拋錯。

1 protected void addTask(Runnable task) {
2         if (task == null) {
3             throw new NullPointerException("task");
4         }
5         if (!offerTask(task)) {
6             reject(task);
7         }
8     }
1 final boolean offerTask(Runnable task) {
2         if (isShutdown()) {
3             reject();
4         }
5         return taskQueue.offer(task);
6     }

    然後在看【***重要b】,如下,該方法雖然叫startThread,但內部有控制,不能無腦開啟線程,因為調這個方法的時候會有兩種情況:1).thread變量為空;2).thread不為空且不是當前線程。第一種情況需要開啟新的線程,但第二種情況就不能直接創建線程了。所以看下面代碼可以發現,它內部通過CAS+volatile(state屬性加了volatile修飾)實現的開啟線程的原子控制,保證多線程情況下也只會有一個線程進入doStartThread()方法。

 1 private void startThread() {
 2         if (state == ST_NOT_STARTED) {
 3             if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
 4                 boolean success = false;
 5                 try {
 6                     doStartThread();
 7                     success = true;
 8                 } finally {
 9                     if (!success) {
10                         STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
11                     }
12                 }
13             }
14         }
15     }

    繼續往下看一下doStartThread()的方法邏輯:

 1 private void doStartThread() {
 2         assert thread == null;
 3         executor.execute(new Runnable() { //此處的executor內部執行的就是ThreadPerTaskExecutor的execute邏輯,創建一個新線程運行下面的run方法
 4             @Override
 5             public void run() {
 6                 thread = Thread.currentThread(); //將Reactor線程記錄到thread變量中,保證一個NioEventLoop只有一個主線程在運行
 7                 if (interrupted) {
 8                     thread.interrupt();
 9                 }
10 
11                 boolean success = false;
12                 updateLastExecutionTime();
13                 try {
14                     SingleThreadEventExecutor.this.run(); //調用當前對象的run方法,該run方法就是Reactor線程的核心邏輯方法,後面會重點研究
15                     success = true;
16                 } catch (Throwable t) {
17                     logger.warn("Unexpected exception from an event executor: ", t);
18                 } finally {
19                    // 省略無關邏輯
20                 }
21             }
22         });
23     }

    可以看到,在上面的方法中完成了Reactor線程thread的賦值和核心邏輯NioEventLoop中run方法的啟動。這個run方法啟動后,第一步做的事情是什麼?讓我們往前回溯,回到3.2.3),當然是執行當初封裝了 register0方法的那個run方法的任務,即執行register0方法,下面填之前埋得坑,對【***重要3】進行追蹤:

 1 private void register0(ChannelPromise promise) {
 2             try {
 3                 // 省略判斷邏輯
 4                 boolean firstRegistration = neverRegistered;
 5                 doRegister();// 執行註冊邏輯
 6                 neverRegistered = false;
 7                 registered = true;
 8                 pipeline.invokeHandlerAddedIfNeeded();// 調用pipeline的邏輯
 9 
10                 safeSetSuccess(promise);
11                 pipeline.fireChannelRegistered();
12                 // 省略無關邏輯
13             } catch (Throwable t) {
14                 // 省略異常處理
15             }
16         }

    doRegister()方法的實現在AbstractNioChannel中,如下,就是完成了nio中的註冊,將nio的ServerSocketChannel註冊到selector上:

 1 protected void doRegister() throws Exception {
 2         boolean selected = false;
 3         for (;;) {
 4             try {
 5                 selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
 6                 return;
 7             } catch (CancelledKeyException e) {
 8                // 省略異常處理
 9             }
10         }
11     }

    再看pipeline.invokeHandlerAddedIfNeeded()方法,該方法調用鏈路比較長,此處就不詳細粘貼了,只是說一下流程。回顧下上面第二部分的第2步,在裏面最後addLast了一個匿名的內部對象,重寫了initChannel方法,此處通過pipeline.invokeHandlerAddedIfNeeded()方法就會調用到這個匿名對象的initChannel方法(只有第一次註冊時才會調),該方法往pipeline中又添加了一個ServerBootstrapAcceptor對象。執行完方法后,netty會在finally中將之前那個匿名內部對象給remove掉,這時pipeline中的handler如下所示:

 

     至此,算是基本完成了initAndRegister方法的邏輯,當然限於篇幅(本篇已經夠長了),其中還有很多細節性的處理未提及。

 

三、AbstractBootstrap的doBind0方法

     doBind0方法邏輯如下所示,new了一個Runnable任務交給Reactor線程執行,execute執行過程已經分析過了,此處不再贅述,集中下所剩無幾的精力看下run方法中的bind邏輯。

 1 private static void doBind0(
 2             final ChannelFuture regFuture, final Channel channel,
 3             final SocketAddress localAddress, final ChannelPromise promise) {
 4 
 5         channel.eventLoop().execute(new Runnable() {
 6             @Override
 7             public void run() {
 8                 if (regFuture.isSuccess()) {
 9                     channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
10                 } else {
11                     promise.setFailure(regFuture.cause());
12                 }
13             }
14         });
15     }

    channel.bind方法,如下:

1 public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
2         return pipeline.bind(localAddress, promise);
3     }

    調用了pipeline的bind方法:

1 public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
2         return tail.bind(localAddress, promise);
3     }

    tail.bind方法:

 1 public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
 2         if (localAddress == null) {
 3             throw new NullPointerException("localAddress");
 4         }
 5         if (isNotValidPromise(promise, false)) {
 6             // cancelled
 7             return promise;
 8         }
 9         // 從tail開始往前,找到第一個outbond的handler,這時只有head滿足要求,故此處next是head
10         final AbstractChannelHandlerContext next = findContextOutbound(MASK_BIND);
11         EventExecutor executor = next.executor();
12         if (executor.inEventLoop()) {// 因為當前線程就是executor中的Reactor線程,所以直接進入invokeBind方法
13             next.invokeBind(localAddress, promise);
14         } else {
15             safeExecute(executor, new Runnable() {
16                 @Override
17                 public void run() {
18                     next.invokeBind(localAddress, promise);
19                 }
20             }, promise, null);
21         }
22         return promise;
23     }

    下面進入head.invokeBind方法:

 1 private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
 2         if (invokeHandler()) {
 3             try {
 4                 ((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);
 5             } catch (Throwable t) {
 6                 notifyOutboundHandlerException(t, promise);
 7             }
 8         } else {
 9             bind(localAddress, promise);
10         }
11     }

    核心邏輯就是handler.bind方法,繼續追蹤:

1 public void bind(
2                 ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
3             unsafe.bind(localAddress, promise);
4         }

    此處的unsafe是NioMessageUnsafe,繼續追蹤會看到在bind方法中又調用了NioServerSocketChannel中的doBind方法,最終在這裏完成了nio原生ServerSocketChannel和address的綁定:

1 protected void doBind(SocketAddress localAddress) throws Exception {
2         if (PlatformDependent.javaVersion() >= 7) {
3             javaChannel().bind(localAddress, config.getBacklog());
4         } else {
5             javaChannel().socket().bind(localAddress, config.getBacklog());
6         }
7     }

    至此,ServerBootstrap的bind方法完成。

 

小結

    本文從頭到尾追溯了ServerBootstrap中bind方法的邏輯,將前面netty系列中的二、三兩篇初始化給串聯了起來,是承上啟下的一個位置。後面的netty系列將圍繞本文中啟動的NioEventLoop.run方法展開,可以這麼說,本文跟前面的三篇只是為run方法的出現做的一個鋪墊,run方法才是核心功能的邏輯所在。

    本文斷斷續續更新了一周,今天才完成,也沒想到會這麼長,就這樣吧,後面繼續netty run方法的學習。

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※回頭車貨運收費標準

※產品缺大量曝光嗎?你需要的是一流包裝設計!

※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面

※推薦評價好的iphone維修中心

※教你寫出一流的銷售文案?

台中搬家公司教你幾個打包小技巧,輕鬆整理裝箱!

台中搬家遵守搬運三大原則,讓您的家具不再被破壞!

警逮黑心環保蟑螂 大台北600萬戶飲用水險遭污染

摘錄自2020年4月7日自由時報報導

專責環保、食藥安全的保安警察第七總隊三大隊,今(7日)宣佈破獲林姓男子等九名環保蟑螂,藉以清運廢棄物名義,卻將廢溶劑及其他廢料物品,棄置在新北市新店區平廣段的特定水源保護區內,險些嚴重污染大台北地區600萬戶民眾飲用水安全。

保七總隊表示,警方除依廢棄物清理法將林嫌等人送辦外,還將從重依據新增修刑法第190條之1條「投棄、放流、排出、放逸或以他法使毒物或其他有害健康之物污染空氣、土壤、河川或其他水體」罪,求處5年以下有期徒刑、拘役或科或併科1000萬元以下罰金。

保七總隊三大隊是與環保署分別接獲通報,指稱新北市新店區平廣段遭人堆置大量廢棄物。警方經派員前往案發地執行調查,現場發現遭人棄置20公升廢液桶、廢油墨桶、廢油桶、廢布等事業廢棄物,且檢驗結果均為有害事業廢棄物。

【其他文章推薦】

※廢氣洗滌塔,叫得動, 找得到的專業廠商‎

※市面十大品牌封口機!該如何選購?

塑膠射出成型加工商品有哪些?

貨梯使用安全與保養

※掌握產品行銷策略,帶你認識商品包裝設計基本要素

臭氧機推薦

推動離岸風電 永冠集團斥資成立風電鑄件廠

摘錄自2020年8月18日自由財經報導

台灣風機鑄件龍頭「永冠集團」今(18)日在台中港廠區舉辦動土典禮,永冠設廠使台中港成為亞太風電供應基地。

行政院副院長沈榮津指出,除了離岸風電鑄件廠動土外,旁邊天力公司離岸風電葉片製造工廠、西門子歌美颯(SGRE)公司的機艙組裝廠也都在興建中,已建構出台中港離岸風電專區產業價值鏈。

台中市副市長令狐榮達表示,台中港是個深水港,距離風場運程近、後線腹地充足,可作為推動離岸風機重件施工碼頭最佳基地。永冠董事長張賢銘表示,台中港區海上運輸便利,也是擁有優質風能地理條件,加上政府規劃風電專區,讓永冠最終選擇扎根台中港。新廠完成後預計年產8至10噸,期望在最先進的設備方案規劃下,不只完整供應台灣市場,還能以台中廠作為亞太區大型風機的供應鏈樞紐,有能力供應歐美市場。

【其他文章推薦】

新北市探針選用參考標準?

※如何正確使用飲水機?

※攻戰消費者第一視覺,包裝設計很重要!

滑鼠墊適用各種文宣活動廣告曝光,專業客製服務

封口機購物網-不怕你比價,就怕你買貴!

日貨輪觸礁漏油毀生態 模里西斯擬求償9.9億

摘錄自2020年9月1日自由時報報導

日本大型貨輪「若潮號」(WAKASHIO)7月在模里西斯近海觸礁,超過千噸燃油外洩造成當地生態浩劫,模里西斯政府計畫求償13.4億模里西斯盧比(約新台幣9.95億元)。另外,協助清理漏油的拖船昨晚(31日)撞上駁船,造成2死2失蹤。

綜合外媒報導,日本商船三井集團營運的巴拿馬籍大型散貨輪「若潮號」7月25日觸礁時,載有約3800噸燃油,據悉,有1000多噸燃油外洩海上,被污染的沿海地區已禁止捕魚,模里西斯政府也在8月7日宣布進入環境緊急狀態,擔心瀕物種恐受到影響。

根據模里西斯政府文件顯示的擬議計畫,當局將向日方求償13.4億模里西斯盧比,這筆錢將用來支持受到漏油影響的當地漁民社區,其中12億盧比用於建造100艘漁船、970萬盧比用於為475名漁民和60名船長提供培訓。

【其他文章推薦】

無塵擦拭布各大品牌廠商販售比價網!

※如何正確使用飲水機?

※掌握產品行銷策略,帶你認識商品包裝設計基本要素

※精密CNC 自動車床設備介紹

空壓機這裡買最划算!

利用網路販售候鳥 保育人士籲檢舉不法

摘錄自2020年10月8日公視報導

最近正值候鳥過境期,網路社交平台也出現許多賣鳥的訊息,保育人士除了呼籲大家勇於檢舉,台南市農業局也強調,獵捕一般類的野生動物至少要罰6萬元。

拍鳥俱樂部會長黃蜀婷表示,「他們就在裡頭給了我一些畫面,他們抓的數量蠻多的,而且天天都在交易,我希望能夠發揮大眾的力量去檢舉他們,讓他們下架這個社團,沒有交易的平台。」

黃蜀婷強調,賣鳥社團被檢舉後也立即改名,隨後「封社」避免遭查 。雖然藍磯鶇並非保育類動物,不過台南市農業局表示,未經許可獵捕一般類野生動物一樣有罰緩,若是保育類動物則是罰則更重。

【其他文章推薦】

石墨與鑽石區別在哪?

※專業客製化禮物、贈品設計,辦公用品常見【L夾】搖身一變大受好評!!

※高價位跟低價位的示波器又有何差異?

無塵擦拭布各大品牌廠商販售比價網!

※想知道CNC 自動車床與CNC車床有何區別??

特殊螺絲工廠批發-精密零件專售

※餐飲器具首選, 多種功能側火烤爐設備,小本經營最佳生財器具