特斯拉新款可能推 Model S 超跑版本 3秒加速至 96 公里

美國電動車商特斯拉 (Tesla) 即將在美國時間 10 月 9 日召開媒體大會,發表與字母「D」有關的神秘商品。特斯拉執行長 Elon Musk 一週前透過 Twitter 揭露相關訊息後,現在又再度透露了一點線索。   Vanity Fair 報導,Musk 8 日表示,特斯拉即將發表的神秘商品其實早就現身,只是人們都還沒有意識到而已。他說,網友非常會猜,方向大致都正確,但大家都還沒有意識到規模有多大。   Musk 的說法實在太過含糊,但最新的網路謠言也許可以讓大家稍稍解惑。Electrek 9 日引述訊息人士報導,特斯拉會在 9 日的大會上發表「P85D」,這是「Model S」的超跑版本,只要花 3 秒鐘就可把時速從零加快到 60 英里 (大約 96 公里),速度非常驚人。   根據報導,如此的表現已超越布卡堤 (Bugattis)、藍寶堅尼 (Lamborghinis)、麥拿倫 (Mclarens)、法拉利 (Ferraris)、保時捷 (Porsches) 等市面上最高級的超跑車種。

本站聲明:網站內容來源於EnergyTrend https://www.energytrend.com.tw/ev/,如有侵權,請聯繫我們,我們將及時處理
【其他文章推薦】

USB CONNECTOR掌控什麼技術要點? 帶您認識其相關發展及效能

台北網頁設計公司這麼多該如何選擇?

※智慧手機時代的來臨,RWD網頁設計為架站首選

※評比南投搬家公司費用收費行情懶人包大公開

※幫你省時又省力,新北清潔一流服務好口碑

※回頭車貨運收費標準

F-英瑞搭電動車商機 下一步瞄準特斯拉

北美汽車水箱龍頭大廠英瑞國際 F-英瑞 9月受惠出貨成長,單月營收為 3.3 億元,累計前 9 月營收為 36.85 億元,年增 14.65%,英瑞看好電動車市場,攜手美國大廠合作開發新世代電動貨卡 (pick up),首年訂單銷售將達數萬台,下一步將接觸 TESLA,全面衝刺電動車市場。   F-英瑞第三季單季營收達到 11.13 億元,年增 1.64%,公司表示,北美車市持續改善,帶動 AM 市場成長,公司目前供應多家美國大型連鎖通路訂單,帶動今年成長,目前中國市場已經開始進入暖身階段,預計 2015 年中國市場也將成為另一成長引擎,公司看好明年在北美、中國兩大成長帶動下,營運將會水漲船高。   且為進軍電動車市場,F-英瑞將與美國大廠合作開發新世代電動貨卡,由於新款設計採用汽油發電來帶動運轉,整體續航力可以超過 600 公里,客戶對銷售相當樂觀,預計第 1 年銷售量就可以達到數萬台,由於電動車需要 4 套的冷卻系統,是一般車輛的 4 倍,因此業績將會倍數成長,下一階段將會將會接觸 Tesla,全面衝刺電動車市場。   除了中國、北美市場成長之外,F-英瑞也積極布局現在市佔率偏低的歐洲市場,由於全球汽車市場的成長,公司將積極規畫新產能,預計 2015 年將於東協設廠規劃 200 萬年產能的新廠。

本站聲明:網站內容來源於EnergyTrend https://www.energytrend.com.tw/ev/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

網頁設計公司推薦不同的風格,搶佔消費者視覺第一線

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面

南投搬家公司費用需注意的眉眉角角,別等搬了再說!

新北清潔公司,居家、辦公、裝潢細清專業服務

※教你寫出一流的銷售文案?

約定民生銀行 特斯拉將在中國20城市建400個充電樁

特斯拉與民生銀行達成合作,雙方約定將共同在中國20個城市的民生銀行自有營業廳及社區金融門店建至少400個充電樁。截至8月,特斯拉在中國建成的目的地充電裝置已超過370個。在此之前,特斯拉用了一年時間才在中國建了16個超級充電站。   此前,特斯拉已經與銀泰、SOHO和中國聯通簽署過類似的合作協定。其中,特斯拉與地產商銀泰集團宣佈合作啟動“目的地充電”項目,充電1個小時可行駛40公里;還計畫年內依託聯通營業廳在全國120個城市共同建設400個目的地充電站和20個城市超級充電站。

本站聲明:網站內容來源於EnergyTrend https://www.energytrend.com.tw/ev/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

※想知道最厲害的網頁設計公司"嚨底家"!

※幫你省時又省力,新北清潔一流服務好口碑

※別再煩惱如何寫文案,掌握八大原則!

電動機車 E-bike 上路 試營運前 30 分鐘免費

  除了有腳踏車 U-bike 可租借,現在連電動機車都能租借得到了。台灣城市動力公司開發自動化公共 E-bike 租借系統,試營運期間提供前 30 分鐘免費優惠,民眾前往租借還可參加多項大獎的抽獎活動。   台灣城市動力公司建構的E-bike租借系統,上周率先在新北市板橋區縣民大道與漢生東路交口的車站停車廣場供國人自由租借,民眾透過悠遊卡註冊後就可選車、取車、還車、付費,方便性與租借U-bike無異。開放試租首日在一個站點就吸引數百民眾註冊借車,該公司相信未來E-bike站點持續擴增後,勢必讓政府推動的機車電動化進度呈跳躍式進展。   台灣城市動力公司總經理洪國修表示,環保減碳是政府重要的施政項目,為提高民眾騎乘電動二輪車之意願,環保署除提供購買電動車 2 萬 4 千元的補助,台灣城市動力公司也配合這項政策,從便民方向建置全球首創的電池交換站系統(BES),民眾在 30 秒即可完成電池更換,免除電池充電過久或保固維護的不便。

本站聲明:網站內容來源於EnergyTrend https://www.energytrend.com.tw/ev/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

新北清潔公司,居家、辦公、裝潢細清專業服務

※別再煩惱如何寫文案,掌握八大原則!

※教你寫出一流的銷售文案?

※超省錢租車方案

其他動力電池車望塵莫及 鋁氣電池續航達 1,600 公里

不讓美國特斯拉電動車專美於前,美鋁公司(Alcoa)宣稱已完成由以色列飛能(Phinergy)研發成功的鋁氣電池(Al–air batteries),並且達成航距超過 1,600 公里的測試。這項研發讓相繼投入電動車領域的德國寶馬、日本豐田等相形失色。   美鋁和以色列飛能最近在美國蒙特利爾維倫紐夫賽車場,測試了以鋁空氣電池為能源的車輛,驚人的是續航里程竟上升到 994 英里。在金屬空氣電池中,鋁空氣電池就是其中之一,如今這 1,600 千公里的續航力已遠超過包括特斯拉 Model S 在內等現有各類動力電池車。其他插電式混合動力車、增程式電動車(續航在 500 公里左右)或者豐天、現代所謂的氫燃料電池車(續航超過 500-600 公里)等,也同樣望塵莫及。   飛能公司說,這一組電池重僅約 100 公斤,裝有 50 塊電池板,平均 1 塊鋁空氣電池板就可驅動車輛行駛約 32 公里。但鋁空氣電池的放電過程會導致陽極腐蝕產生氫,導致陽極材料過度消耗,並增加電池內部損耗,使其商業化進程放緩。   然而,鋁空氣電池維護非常方便。按現有技術方案,鋁空氣電池是作為鋰電池的補充電源,鋰電池能量耗盡後,鋁空氣電池才接手,所以使用者無需進行充電,每 1-2 個月注入自來水維持其化學反應,每年也只需讓技術人員進行一次保養即可,電池壽命甚至可達 20 至 30 年。

本站聲明:網站內容來源於EnergyTrend https://www.energytrend.com.tw/ev/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

新北清潔公司,居家、辦公、裝潢細清專業服務

※別再煩惱如何寫文案,掌握八大原則!

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

※超省錢租車方案

※教你寫出一流的銷售文案?

Day10-微信小程序實戰-交友小程序-創建friendList字段實現好友關係(添加好友功能)–內附代碼

回顧:之前我們進行了刪除的功能,以及對message消息的增刪,下面實現添加好友的功能

我們先在數據庫中,在message這個字段的list裏面,添加上測試號的id,就是模擬這個兩個測試號要加我主號的效果“

 

 

 

就可以達到這個效果了

 

下面我們正式開始實現

1、在removeList的wxml的昵稱結構處添加一個點擊事件handleAddFriend

2、在removeList.js中來實現這個點擊事件即可的,並且它也是要提示(讓用戶選擇確認的這種,所以就可以直接copy前面的刪除按鈕的代碼

 直接把hanleDelMessage函數裏面的:

   wx.showModal({
        title: '提示信息',
        content: '刪除消息',
        confirmText: "刪除",
        success: (res) => {
          if (res.confirm) {
            
          } else if (res.cancel) {
            console.log('用戶點擊取消')
          }
        }
      })
    }

也就是只用設計在點擊了確定之後的一些列操作即可了

3、因為要構建好友之間的關係,所以要在user裏面加一個friendlist字段才行了,並且這個字段的數據類型是數組的,因為好友肯定不只是一個

4、在數據庫中給每個人都添加一個friendList字段

 

 但是不要忘記了在原來的程序中 初始化 用戶的時候也要加上讀i這個字段的初始化才行的

打開user.js:

 

5、後面的事情大概的邏輯就是:同意了好友申請的話,這個用戶的id就會出現在這個用戶的friendList數組裡面了

所以就可以回到removeList.js文件中去了

6、通過在開頭的時候 設置 const _ = db.command 就可以讓這個文件有了運算的能力了

handleAddFriend(){
      wx.showModal({
        title: '提示信息',
        content: '申請好友',
        confirmText: "同意",
        success: (res) => {
          if (res.confirm) {
              db.collection('users').doc(app.userInfo._id).update({
                data:{
                    friendList : _.unshift(this.data.messageId)
                }
              }).then((res)=>{});
          } else if (res.cancel) {
            console.log('用戶點擊取消')
          }
        }
      })
    }
  }

 

 

在點擊消息之後,就可以選擇“確定”,點擊了之後,就可以在數據庫上面看到添加的用戶好友了

(以上就是第一步,把要申請的用戶的id給添加過來了)—-但是問題來了,我的主號裏面的friendList裏面有了這個申請人的id,但是這個申請人在我

同意了之後,它的friendList字段裏面也應該有我的主號的id才對的—也就是同時添加他們兩個的好友關係即可

 

 但是能不能通過上面的這種方式,把兩個變量之間的值作為交換就可以的,普通的數據庫裏面是可以這樣的。但是在小程序中式不可以這樣進行操作的

(因為在小程序裏面對數據庫的訪問式有權限的,在客戶端是組偶到這樣的操作

(也就是對其他的賬號進行更新操作的話在客戶端裏面是不允許的—同理也是要在服務端裏面來實現的))

–也就是要用雲函數來實現了

7、如果要像的客戶端中調用unshift一樣的話在服務端裏面進行調用的話,之前也搞過就是用一個模板字符串的寫法 用Esc下面的那個 “類單引號”的符號

來進行包裹就好了

 wx.cloud.callFunction({
            name : 'update',
            data : {
              collection : 'users',
              doc : this.data.messageId,
              data : {
                friendList: ` _.unshift('${app.userInfo._id}')`
              }
            }
          });

 

 出現的錯誤就是,我們把unshift也一起搞過來了,也就是我們傳過去的字符串沒有解析成功

 主要就是下面這裏寫錯了

 data : `{ friendList:  _.unshift('${app.userInfo._id}')}`

是把後面整個的串都用一個`   ….   `來圍住的(也就是後面整個的json對象都直接被這個符號扣住的)

 

修改了之後就成功了

 

8、新建一個removeMessage 函數,可以直接copy上面的handledelMessage方法裏面的:這個函數主要是為了點擊這個消息加好友的時候,可以選擇是

添加它為好友,就是在點擊了添加好友之後,就要把這個消息刪掉了,所以這兩個地方都用到了這個功能的話,就可以把這個功能封裝在removeMesage函數裏面,如何直接用this.removeMessage來調用即可了

效果就是:點擊了添加它為好友之後,這個申請為好友的消息就會被刪掉了

 

 

db.collection('message').where({
            userId : app.userInfo._id
          }).get().then((res)=>{
              // console.log(res);
              let list = res.data[0].list;
            console.log(list);
              list = list.filter((val , i)=>{
                  return val != this.data.messageId
              });
              // console.log(list);
              wx.cloud.callFunction({
                name : 'update',
                data : {
                  collection : 'message',
                  where : {
                    userId : app.userInfo._id
                  },
                  data : {
                    list : list
                  }
                }
              }).then((res)=>{
                this.triggerEvent('myevent',list) 
              });
          });

 

 整體邏輯:

1、在removeList.wxml 中的昵稱中添加一個點擊事件 

<movable-view bindtap="handleAddFriend" direction="horizontal" class="view">{{ userMessage.nickName }}</movable-view>

2、在removeList.js中隊這個點擊事件進行渲染

 handleAddFriend(){
      wx.showModal({
        title: '提示信息',
        content: '申請好友',
        confirmText: "同意",
        success: (res) => {
          if (res.confirm) {
              db.collection('users').doc(app.userInfo._id).update({
                data:{
                    friendList : _.unshift(this.data.messageId)
                }
              }).then((res)=>{});

          //其他用戶和我構成好友的關係,用到客戶端來實現(也就是用雲函數來實現)
          wx.cloud.callFunction({
            name : 'update',
            data : {
              collection : 'users',
              doc : this.data.messageId,
              data : `{ friendList:  _.unshift('${app.userInfo._id}')}`
            }
          }).then((res)=>{});
          this.removeMessage();
          } else if (res.cancel) {
            console.log('用戶點擊取消')

          }
        }
      })
    }

3、點擊了接受邀請之後,把這個申請好友的消息刪除(和之前實現刪除功能一樣,就可以直接把這個功能封裝到removeMessage這個函數裏面

removeMessage(){
      //也就是點擊了確定的話,就不提示這個了,而是刪除信息
      //  目前沒有直接更新的,智能是這個過程就變成了先查詢然後再更新
      db.collection('message').where({
        userId: app.userInfo._id
      }).get().then((res) => {
        // console.log(res);
        let list = res.data[0].list;
        console.log(list);
        list = list.filter((val, i) => {
          return val != this.data.messageId
        });
        // console.log(list);
        wx.cloud.callFunction({
          name: 'update',
          data: {
            collection: 'message',
            where: {
              userId: app.userInfo._id
            },
            data: {
              list: list
            }
          }
        }).then((res) => {
          this.triggerEvent('myevent', list)
        });
      });

注意:因為我們開通了一個friendList 給每一個用戶數據庫字段,所以在user.js初始化用戶數據庫的時候也要加上初始化這個friendList數組才行

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※為什麼 USB CONNECTOR 是電子產業重要的元件?

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

※台北網頁設計公司全省服務真心推薦

※想知道最厲害的網頁設計公司"嚨底家"!

新北清潔公司,居家、辦公、裝潢細清專業服務

※推薦評價好的iphone維修中心

關於對Entity Framework Core3.1的理解與總結

Entity Framework Core 是一個ORM,所謂ORM也是ef的一個框架之一吧,簡單的說就是把C#一個類,映射到數據庫的一個表,把類裏面的屬性映射到表中的字段。然後Entity Framework Core3.1 是一個長期支持的版本。本人非常樂意對.NET社區繁榮奉獻自己的青春。希望國內以後能夠有越來越多的.NET程序員。

創建了一個.NET Standard類庫。

可以編輯文件查看netStandard版本號,我這裡是一個2.0版本。

順便簡單的說一下netStandard吧,實際上它應該就是一個開源庫,不管是.NET Core還是EntityFramework都可以引用它。就是一個底層類庫,並且該類庫可以在不同的平台運行,ios、mac、linux等。一處編寫多處運行,而且它還是開源的。當然這裏你可以把它改成2.1的版本。

 右鍵項目屬性

這樣就該好了。。。

然後又創建了一個類庫,操作和上面一樣,另一個是創建一個控制台應用,這個控制台應用是.NET Core應用的,版本應該是netcoreapp3.1,可以看看

以上準備工作完成后,就開始進行第一步操作,生成數據庫!

在Demo.Main類庫中創建所需要的類

這裏我就簡單的舉個栗子。一個聯賽類,一個是俱樂部類,一個是球員類,分別是一對多的關係。

namespace Demo.Main
{
    /// <summary>
    /// 聯賽
    /// </summary>
    public class League
    {
        public int  Id { get; set; }
        public string Name { get; set; }
        public string Country { get; set; }
    }
}
using System;
using System.Collections.Generic;

namespace Demo.Main
{
    /// <summary>
    /// 俱樂部
    /// </summary>
    public class Club
    {
        public Club()
        {
            Players=new List<Player>();//以防出現空指針引用
        }
        public int Id { get; set; }
        public string Name { get; set; }
        public string City { get; set; }
        /// <summary>
        /// 俱樂部成立日期
        /// </summary>
        public DateTime Establishment { get; set; }
        public string History { get; set; }
        /// <summary>
        /// 聯賽導航屬性
        /// </summary>
        public League League { get; set; }
        /// <summary>
        /// 一對多,一個俱樂部有多個球員
        /// </summary>
        public List<Player> Players { get; set; }
    }
}
using System;

namespace Demo.Main
{
    /// <summary>
    /// 球員
    /// </summary>
    public class Player
    {
        public int Id { get; set; }
        public string Name { get; set; }
        /// <summary>
        /// 球員出生日期
        /// </summary>
        public DateTime DateOfBirth { get; set; }
    }
}

然後就是在Data中引用Main的項目庫

對Data類庫安裝對數據庫操作的相關依賴,也就是方便對Model映射到數據庫。顯而易見,Data類庫就是操作對Model映射到數據的Code first數據庫遷移操作的。

所以必然是少不了上下文類的編寫和操作了。

接下來就是準備遷移操作了,但是對於Demo.Data類庫來說它是一個底層的類庫,所以我們就得通過Demo.App一個控制台應用來去生成數據庫,然後就要通過NuGet包進行一個引用了。

需要對數據庫操作進行可執行文件的操作需要引用一下依賴吧可以說是

這個就裝在Demo.Data項目中

這裏就可以看到它為什麼對數據庫遷移起到作用了

點擊下載即可,而後才能執行數據庫遷移操作

記得在遷移時一定要選擇上下文存在的類的項目也就是Data

然後就是用命令來操作包管理工具了,來具體總結一下吧!

你可以通過給的提示輸入命令也就是get-help NuGet查看具體的一些幫助命令

主要命令應該就是這些,具體解釋可以再去看看

或者輸入get-help entityframework,結果如下,我們用到的就是Add-Migration(添加遷移)、Update-Database(更新數據庫)這兩個應該就是比較常用的了。

輸入第一個命令 Add-Migration 這裏需要給個參數(實際上有很多參數)這個參數就是會在生成的時間戳後面定義的Name參數

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理
【其他文章推薦】

USB CONNECTOR掌控什麼技術要點? 帶您認識其相關發展及效能

台北網頁設計公司這麼多該如何選擇?

※智慧手機時代的來臨,RWD網頁設計為架站首選

※評比南投搬家公司費用收費行情懶人包大公開

※幫你省時又省力,新北清潔一流服務好口碑

※回頭車貨運收費標準

Spring7——開發基於註解形式的spring

開發基於註解形式的spring
SpringIOC容器的2種形式:
(1)xml配置文件:applicationContext.xml; 存bean:<bean> 取bean:

ApplicationContext context=new ClassPathXmlApplicationContext("applicationContext.xml");

(2)註解:帶有@Configuration註解的類(配置類)

存bean:@Bean+方法的返回值

//配置類,相當於applicationContext.xml
@Configuration
public class MyConfig {


    @Bean //id=方法名(myStudent)
    public Student myStudent(){
        Student student=new Student(2,"fg",34);
        return student;
    }
}

取bean: 

ApplicationContext context=new AnnotationConfigApplicationContext(MyConfig.class);
Student myStudent = (Student) context.getBean("myStudent");

注意:兩種形式獲取的IOC是獨立的  
註解形式向IOC容器存放bean詳解:
1.必須有@Configuration
2.形式:
2.1 三層組件(Controller、Service、Dao):             (1)將三層組件分別加註解@Controller、@Service、@Repository等價於@Commponent              (2)納入掃描器 a.xml配置

<context:component-scan base-package="org.ghl.controller"></context:component-scan>

b.註解形式      component-scan只對三層組件負責。    
給掃描器指定規則:                 過濾類型:FilterType(ANNOTATION, ASSIGNABLE_TYPE, CUSTOM)                 ANNOTATION:三層註解類型@Controller、@Service、@Repository等價於@Commponent

排除:
@ComponentScan(value = "org.ghl",excludeFilters = {@ComponentScan.Filter(type = FilterType.ANNOTATION ,value = {Service.class,Repository.class})})
包含:(有默認行為,可以通過useDefaultFilters禁止)
@ComponentScan(value = "org.ghl",includeFilters = {@ComponentScan.Filter(type = FilterType.ANNOTATION ,value = {Service.class,Repository.class})},useDefaultFilters = false)

  ASSIGNABLE_TYPE:指具體的類。

@ComponentScan(value = "org.ghl",includeFilters = {@ComponentScan.Filter(type = FilterType.ASSIGNABLE_TYPE ,value = {StudentController.class})},useDefaultFilters = false)

區分:ANNOTATION:Service.class指標有@Service的所有類;           ASSIGNABLE_TYPE:指具體的類。                 CUSTOM:自定義:自己定義規則

@ComponentScan(value = "org.ghl",includeFilters = {@ComponentScan.Filter(type = FilterType.CUSTOM ,value = {MyFilter.class})},useDefaultFilters = false)

 

public class MyFilter implements TypeFilter {
    @Override
    public boolean match(MetadataReader metadataReader, MetadataReaderFactory metadataReaderFactory) throws IOException {
        ClassMetadata classMetadata = metadataReader.getClassMetadata();
        //拿到掃描器value = "org.ghl"包中的所有標有三層組件註解類的名字
        String className = classMetadata.getClassName();
        //只過濾出和student相關的三層組件
        if (className.contains("Student")){
            return true;  //表示包含
        }
        return false; //表示排除
    }
}

  

2.2 非三層組件(Student.clss/轉換器等): (1)@Bean+方法的返回值,id的默認值為方法名,也可以通過@Bean(“stu”)修改。 (2)import/FactoryBean  
bean的作用域

(@Scope(“singleton”))scope=”singleton”:單例 scope=”prototype”:原型、多實例。
執行的時機(產生bean的時機):         singleton:容器在初始化時,就創建對象,且只創建一次; 也支持延遲加載:在第一次使用時,創建對象。在config中加入@Lazy。         prototype:容器在初始化時,不創建對象,在每次使用時(每次從容器獲取對象時),再創建對象。         
條件註解 可以讓某一個Bean在某些條件下加入IOC容器。 (1)準備bean; (2)增加條件bean:給每個bean設置條件,必須實現Condition接口。 (3)根據條件加入IOC容器
  回顧給IOC加入Bean的方法:        註解:全部在@Configuration配置中設置:                 三層組件:掃描器+三層註解                 非三層組件:(1)@Bean+返回值                                      (2)@import                                      (3)FactoryBean(工廠Bean)
 
@import使用:         (1)直接編寫到@Import中; @Import({Apple.class,Banana.class})         (2)自定義ImportSelector接口的實現類,通過selectimports方法實現(方法的返回值就是要納入IOC容器的Bean)。並告知程序自己編寫的實現類。

public class MyImportSelector implements ImportSelector {
    @Override
    public String[] selectImports(AnnotationMetadata annotationMetadata) {
        return new String[]{"org.ghl.entity.Apple","org.ghl.entity.Banana"}; //方法的返回值就是要納入IOC容器的Bean
    }
}
@Import({MyImportSelector.class})

    (3)編寫ImporBeanDefinitionRegistrar接口的實現類並重寫方法。 

public class MyImporBeanDefinitionRegistrar implements ImportBeanDefinitionRegistrar {
    @Override
    public void registerBeanDefinitions(AnnotationMetadata annotationMetadata, BeanDefinitionRegistry beanDefinitionRegistry) {
        //BeanDefinition beanDefinition=new RootBeanDefinition(Orange.class);
        BeanDefinition beanDefinition=new RootBeanDefinition("org.ghl.entity.Orange");
        beanDefinitionRegistry.registerBeanDefinition("myorange",beanDefinition);


    }
}  
@Import({MyImporBeanDefinitionRegistrar.class})

  

FactoryBean(工廠Bean)         1.寫實現類和重寫方法;  

public class MyFactoryBean implements FactoryBean{
    @Override
    public Object getObject() throws Exception {
        return new Apple();
    }

    @Override
    public Class<?> getObjectType() {
        return Apple.class;
    }

    @Override
    public boolean isSingleton() {
        return true;
    }
}

  2.註冊到@Bean中  

@Bean
public FactoryBean<Apple>  myFactoryBean(){
    return new MyFactoryBean();
}

注意:需要通過&區分獲取的對象是哪一個。不加&,獲取的是最內部真實的apple,如果加&,獲取的是FactoryBean。  

  

  

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

網頁設計公司推薦不同的風格,搶佔消費者視覺第一線

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面

南投搬家公司費用需注意的眉眉角角,別等搬了再說!

新北清潔公司,居家、辦公、裝潢細清專業服務

※教你寫出一流的銷售文案?

Spring Boot2+Resilience4j實現容錯之Bulkhead

Resilience4j是一個輕量級、易於使用的容錯庫,其靈感來自Netflix Hystrix,但專為Java 8和函數式編程設計。輕量級,因為庫只使用Vavr,它沒有任何其他外部庫依賴項。相比之下,Netflix Hystrix對Archaius有一個編譯依賴關係,Archaius有更多的外部庫依賴關係,如Guava和Apache Commons。

Resilience4j提供高階函數(decorators)來增強任何功能接口、lambda表達式或方法引用,包括斷路器、速率限制器、重試或艙壁。可以在任何函數接口、lambda表達式或方法引用上使用多個裝飾器。優點是您可以選擇所需的裝飾器,而無需其他任何東西。

有了Resilience4j,你不必全力以赴,你可以選擇你需要的。

https://resilience4j.readme.io/docs/getting-started

概覽

Resilience4j提供了兩種艙壁模式(Bulkhead),可用於限制併發執行的次數:

  • SemaphoreBulkhead(信號量艙壁,默認),基於Java併發庫中的Semaphore實現。
  • FixedThreadPoolBulkhead(固定線程池艙壁),它使用一個有界隊列和一個固定線程池。

本文將演示在Spring Boot2中集成Resilience4j庫,以及在多併發情況下實現如上兩種艙壁模式。

引入依賴

在Spring Boot2項目中引入Resilience4j相關依賴

<dependency>
    <groupId>io.github.resilience4j</groupId>
    <artifactId>resilience4j-spring-boot2</artifactId>
    <version>1.4.0</version>
</dependency>
<dependency>
    <groupId>io.github.resilience4j</groupId>
    <artifactId>resilience4j-bulkhead</artifactId>
    <version>1.4.0</version>
</dependency>

由於Resilience4j的Bulkhead依賴於Spring AOP,所以我們需要引入Spring Boot AOP相關依賴

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-aop</artifactId>
</dependency>

我們可能還希望了解Resilience4j在程序中的運行時狀態,所以需要通過Spring Boot Actuator將其暴露出來

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>

實現SemaphoreBulkhead(信號量艙壁)

resilience4j-spring-boot2實現了對resilience4j的自動配置,因此我們僅需在項目中的yml/properties文件中編寫配置即可。

SemaphoreBulkhead的配置項如下:

屬性配置 默認值 含義
maxConcurrentCalls 25 艙壁允許的最大并行執行量
maxWaitDuration 0 嘗試進入飽和艙壁時,應阻塞線程的最長時間。

添加配置

示例(使用yml):

resilience4j.bulkhead:
  configs:
    default:
      maxConcurrentCalls: 5
      maxWaitDuration: 20ms
  instances:
    backendA:
      baseConfig: default
    backendB:
      maxWaitDuration: 10ms
      maxConcurrentCalls: 20

如上,我們配置了SemaphoreBulkhead的默認配置為maxConcurrentCalls: 5,maxWaitDuration: 20ms。並在backendA實例上應用了默認配置,而在backendB實例上使用自定義的配置。這裏的實例可以理解為一個方法/lambda表達式等等的可執行單元。

編寫Bulkhead邏輯

定義一個受SemaphoreBulkhead管理的Service類:

@Service
public class BulkheadService {
    private final Logger logger = LoggerFactory.getLogger(this.getClass());
    @Autowired
    private BulkheadRegistry bulkheadRegistry;

    @Bulkhead(name = "backendA")
    public JsonNode getJsonObject() throws InterruptedException {
        io.github.resilience4j.bulkhead.Bulkhead.Metrics metrics = bulkheadRegistry.bulkhead("backendA").getMetrics();
        logger.info("now i enter the method!!!,{}<<<<<<{}", metrics.getAvailableConcurrentCalls(), metrics.getMaxAllowedConcurrentCalls());
        Thread.sleep(1000L);
        logger.info("now i exist the method!!!");
        return new ObjectMapper().createObjectNode().put("file", System.currentTimeMillis());
    }
}

如上,我們將@Bulkhead註解放到需要管理的方法上面。並且通過name屬性指定該方法對應的Bulkhead實例名字(這裏我們指定的實例名字為backendA,所以該方法將會利用默認的配置)。

定義接口類:

@RestController
public class BulkheadResource {
    @Autowired
    private BulkheadService bulkheadService;

    @GetMapping("/json-object")
    public ResponseEntity<JsonNode> getJsonObject() throws InterruptedException {
        return ResponseEntity.ok(bulkheadService.getJsonObject());
    }
}

編寫測試:

首先添加測試相關依賴

<dependency>
    <groupId>io.rest-assured</groupId>
    <artifactId>rest-assured</artifactId>
    <version>3.0.5</version>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.awaitility</groupId>
    <artifactId>awaitility</artifactId>
    <version>4.0.2</version>
    <scope>test</scope>
</dependency>

這裏我們使用rest-assured和awaitility編寫多併發情況下的API測試

public class SemaphoreBulkheadTests extends Resilience4jDemoApplicationTests {
    @LocalServerPort
    private int port;
    @BeforeEach
    public void init() {
        RestAssured.baseURI = "http://localhost";
        RestAssured.port = port;
    }

    @Test
    public void 多併發訪問情況下的SemaphoreBulkhead測試() {
        CopyOnWriteArrayList<Integer> statusList = new CopyOnWriteArrayList<>();
        IntStream.range(0, 8).forEach(i -> CompletableFuture.runAsync(() -> {
                statusList.add(given().get("/json-object").statusCode());
            }
        ));
        await().atMost(1, TimeUnit.MINUTES).until(() -> statusList.size() == 8);
        System.out.println(statusList);
        assertThat(statusList.stream().filter(i -> i == 200).count()).isEqualTo(5);
        assertThat(statusList.stream().filter(i -> i == 500).count()).isEqualTo(3);
    }
}

可以看到所有請求中只有前五個順利通過了,其餘三個都因為超時而導致接口報500異常。我們可能並不希望這種不友好的提示,因此Resilience4j提供了自定義的失敗回退方法。當請求併發量過大時,無法正常執行的請求將進入回退方法。

首先我們定義一個回退方法

private JsonNode fallback(BulkheadFullException exception) {
        return new ObjectMapper().createObjectNode().put("errorFile", System.currentTimeMillis());
    }

注意:回退方法應該和調用方法放置在同一類中,並且必須具有相同的方法簽名,並且僅帶有一個額外的目標異常參數。

然後在@Bulkhead註解中指定回退方法:@Bulkhead(name = "backendA", fallbackMethod = "fallback")

最後修改API測試代碼:

@Test
public void 多併發訪問情況下的SemaphoreBulkhead測試使用回退方法() {
    CopyOnWriteArrayList<Integer> statusList = new CopyOnWriteArrayList<>();
    IntStream.range(0, 8).forEach(i -> CompletableFuture.runAsync(() -> {
            statusList.add(given().get("/json-object").statusCode());
        }
    ));
    await().atMost(1, TimeUnit.MINUTES).until(() -> statusList.size() == 8);
    System.out.println(statusList);
    assertThat(statusList.stream().filter(i -> i == 200).count()).isEqualTo(8);
}

運行單元測試,成功!可以看到,我們定義的回退方法,在請求過量時起作用了。

實現FixedThreadPoolBulkhead(固定線程池艙壁)

FixedThreadPoolBulkhead的配置項如下:

配置名稱 默認值 含義
maxThreadPoolSize Runtime.getRuntime().availableProcessors() 配置最大線程池大小
coreThreadPoolSize Runtime.getRuntime().availableProcessors() - 1 配置核心線程池大小
queueCapacity 100 配置隊列的容量
keepAliveDuration 20ms 當線程數大於核心時,這是多餘空閑線程在終止前等待新任務的最長時間

添加配置

示例(使用yml):

resilience4j.thread-pool-bulkhead:
  configs:
    default:
      maxThreadPoolSize: 4
      coreThreadPoolSize: 2
      queueCapacity: 2
  instances:
    backendA:
      baseConfig: default
    backendB:
      maxThreadPoolSize: 1
      coreThreadPoolSize: 1
      queueCapacity: 1

如上,我們定義了一段簡單的FixedThreadPoolBulkhead配置,我們指定的默認配置為:maxThreadPoolSize: 4,coreThreadPoolSize: 2,queueCapacity: 2,並且指定了兩個實例,其中backendA使用了默認配置而backendB使用了自定義的配置。

編寫Bulkhead邏輯

定義一個受FixedThreadPoolBulkhead管理的方法:

@Bulkhead(name = "backendA", type = Bulkhead.Type.THREADPOOL)
public CompletableFuture<JsonNode> getJsonObjectByThreadPool() throws InterruptedException {
    io.github.resilience4j.bulkhead.ThreadPoolBulkhead.Metrics metrics = threadPoolBulkheadRegistry.bulkhead("backendA").getMetrics();
    logger.info("now i enter the method!!!,{}", metrics);
    Thread.sleep(1000L);
    logger.info("now i exist the method!!!");
    return CompletableFuture.supplyAsync(() -> new ObjectMapper().createObjectNode().put("file", System.currentTimeMillis()));
}

如上定義和SemaphoreBulkhead的方法大同小異,其中@Bulkhead显示指定了type的屬性為Bulkhead.Type.THREADPOOL,表明其方法受FixedThreadPoolBulkhead管理。由於@Bulkhead默認的BulkheadSemaphoreBulkhead,所以在未指定type的情況下為SemaphoreBulkhead。另外,FixedThreadPoolBulkhead只對CompletableFuture方法有效,所以我們必創建返回CompletableFuture類型的方法。

定義接口類方法

@GetMapping("/json-object-with-threadpool")
public ResponseEntity<JsonNode> getJsonObjectWithThreadPool() throws InterruptedException, ExecutionException {
    return ResponseEntity.ok(bulkheadService.getJsonObjectByThreadPool().get());
}

編寫測試代碼

@Test
public void 多併發訪問情況下的ThreadPoolBulkhead測試() {
    CopyOnWriteArrayList<Integer> statusList = new CopyOnWriteArrayList<>();
    IntStream.range(0, 8).forEach(i -> CompletableFuture.runAsync(() -> {
            statusList.add(given().get("/json-object-with-threadpool").statusCode());
        }
    ));
    await().atMost(1, TimeUnit.MINUTES).until(() -> statusList.size() == 8);
    System.out.println(statusList);
    assertThat(statusList.stream().filter(i -> i == 200).count()).isEqualTo(6);
    assertThat(statusList.stream().filter(i -> i == 500).count()).isEqualTo(2);
}

測試中我們并行請求了8次,其中6次請求成功,2次失敗。根據FixedThreadPoolBulkhead的默認配置,最多能容納maxThreadPoolSize+queueCapacity次請求(根據我們上面的配置為6次)。

同樣,我們可能並不希望這種不友好的提示,那麼我們可以指定回退方法,在請求無法正常執行時使用回退方法。

private CompletableFuture<JsonNode> fallbackByThreadPool(BulkheadFullException exception) {
    return CompletableFuture.supplyAsync(() -> new ObjectMapper().createObjectNode().put("errorFile", System.currentTimeMillis()));
}
@Bulkhead(name = "backendA", type = Bulkhead.Type.THREADPOOL, fallbackMethod = "fallbackByThreadPool")
public CompletableFuture<JsonNode> getJsonObjectByThreadPoolWithFallback() throws InterruptedException {
    io.github.resilience4j.bulkhead.ThreadPoolBulkhead.Metrics metrics = threadPoolBulkheadRegistry.bulkhead("backendA").getMetrics();
    logger.info("now i enter the method!!!,{}", metrics);
    Thread.sleep(1000L);
    logger.info("now i exist the method!!!");
    return CompletableFuture.supplyAsync(() -> new ObjectMapper().createObjectNode().put("file", System.currentTimeMillis()));
}

編寫測試代碼

@Test
public void 多併發訪問情況下的ThreadPoolBulkhead測試使用回退方法() {
    CopyOnWriteArrayList<Integer> statusList = new CopyOnWriteArrayList<>();
    IntStream.range(0, 8).forEach(i -> CompletableFuture.runAsync(() -> {
            statusList.add(given().get("/json-object-by-threadpool-with-fallback").statusCode());
        }
    ));
    await().atMost(1, TimeUnit.MINUTES).until(() -> statusList.size() == 8);
    System.out.println(statusList);
    assertThat(statusList.stream().filter(i -> i == 200).count()).isEqualTo(8);
}

由於指定了回退方法,所有請求的響應狀態都為正常了。

總結

本文首先簡單介紹了Resilience4j的功能及使用場景,然後具體介紹了Resilience4j中的Bulkhead。演示了如何在Spring Boot2項目中引入Resilience4j庫,使用代碼示例演示了如何在Spring Boot2項目中實現Resilience4j中的兩種Bulkhead(SemaphoreBulkhead和FixedThreadPoolBulkhead),並編寫API測試驗證我們的示例。

本文示例代碼地址:https://github.com/cg837718548/resilience4j-demo

歡迎訪問筆者博客:blog.dongxishaonian.tech

關注筆者公眾號,推送各類原創/優質技術文章 ⬇️

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

※想知道最厲害的網頁設計公司"嚨底家"!

※幫你省時又省力,新北清潔一流服務好口碑

※別再煩惱如何寫文案,掌握八大原則!

多線程高併發編程(12) — 阻塞算法實現ArrayBlockingQueue源碼分析

一.前言

  前文探究了非阻塞算法的實現ConcurrentLinkedQueue安全隊列,也說明了阻塞算法實現的兩種方式,使用一把鎖(出隊和入隊同一把鎖ArrayBlockingQueue)和兩把鎖(出隊和入隊各一把鎖LinkedBlockingQueue)來實現,今天來探究下ArrayBlockingQueue。

  ArrayBlockingQueue是一個阻塞隊列,底層使用數組結構實現,按照先進先出(FIFO)的原則對元素進行排序。

  ArrayBlockingQueue是一個線程安全的集合,通過ReentrantLock鎖來實現,在併發情況下可以保證數據的一致性。

  此外,ArrayBlockingQueue的容量是有限的,數組的大小在初始化時就固定了,不會隨着隊列元素的增加而出現擴容的情況,也就是說ArrayBlockingQueue是一個“有界緩存區”。

  從下圖可以看出,ArrayBlockingQueue是使用一個數組存儲元素的,當向隊列插入元素時,首先會插入到數組下標索引為6的位置,再有新元素進來時插入到索引為7的位置,依次類推,如果滿了就不會再插入。

  當元素出隊時,先移除索引為2的元素3,與入隊一樣,依次類推,移除索引3、4、5…上的元素。這也形成了“先進先出”。

 

二.源碼解析

  1. 構造方法

    public class ArrayBlockingQueue<E> extends AbstractQueue<E>
            implements BlockingQueue<E>, java.io.Serializable {
    
        //隊列實現:數組
        final Object[] items;
    
        //當讀取元素時數組的下標(下一個被取出元素的索引)
        int takeIndex;
    
        //添加元素時數組的下標 (下一個被添加元素的索引)
        int putIndex;
    
        //隊列中元素個數:
        int count;
    
        //可重入鎖:
        final ReentrantLock lock;
    
        //入隊操作時是否讓線程等待
        private final Condition notEmpty;
    
        //出隊操作時是否讓線程等待
        private final Condition notFull;
    
        /**
         * 初始化隊列容量構造:由於公平鎖會降低隊列的性能,因而使用非公平鎖(默認)。
         */
        public ArrayBlockingQueue(int capacity) {
            this(capacity, false);
        }
    
        //帶初始容量大小和公平鎖隊列(公平鎖通過ReentrantLock實現):
        public ArrayBlockingQueue(int capacity, boolean fair) {
            if (capacity <= 0)
                throw new IllegalArgumentException();
            this.items = new Object[capacity];
            lock = new ReentrantLock(fair);
            notEmpty = lock.newCondition();
            notFull =  lock.newCondition();
        }
    }
    •  在多線程中,默認不保證線程公平的訪問隊列;

    •  在ArrayBlockingQueue中為了保證數據的安全,使用了ReentrantLock鎖。由於鎖的引入,導致了線程之間的競爭。當有一個線程獲取到鎖時,其餘線程處於等待狀態。當鎖被釋放時,所有等待線程為奪鎖而競爭;

    • 鎖有公平鎖和非公平鎖:

      •  公平鎖:等待的線程在獲取鎖而競爭時,按照等待的先後順序FIFO進行獲取操作;公平鎖可以應用在比如併發下的日誌輸出隊列中,保證了日誌輸出的順序完整性;
        •  優點:等待鎖的線程不會餓死,和非公平鎖相比,在獲得鎖和保證鎖分配的均衡性差異較小;
        • 缺點:使用公平鎖的程序在多線程訪問時表現為很低的吞吐量(即速度很慢),等待隊列中除第一個線程以外的所有線程都會阻塞,CPU喚醒阻塞線程的開銷比非公平鎖的大;公平鎖不能保證線程調度的公平性,因此,使用公平鎖的眾多線程中的一員可能獲得多倍的成功機會,這種情況發生在其他活動線程沒有被處理並且目前並未持有鎖時【ReentrantLock源碼對公平鎖的定義】;
           Note however, that fairness of locks does not guarantee
           fairness of thread scheduling. Thus, one of many threads using a
           fair lock may obtain it multiple times in succession while other
           active threads are not progressing and not currently holding the
           lock.
          •  上面這句話有重入鎖的概念,一個線程可以在已經獲取鎖的情況下再次進入獲取到鎖,不需要競爭;同時,如果一個線程獲取到了鎖,然後釋放,在其他線程來獲取之前再次是可以獲取到鎖的。
            A: Request Lock -> Release Lock -> Request Lock Again (Succeeds) 
                                                   B: Request Lock (Denied)... 
            -----------------------   Time   --------------------------------->
      •  非公平鎖:在獲取鎖時,無論是先等待還是后等待的線程,均有可能獲取到鎖。即根據搶佔機制,是隨機獲取鎖的,和公平鎖不一樣的是先來的不一定能獲取到鎖,有可能一直拿不到鎖,這樣會造成“飢餓”現象;
        • 優點:非公平鎖性能高於公平鎖性能。首先,在恢復一個被掛起的線程與該線程真正運行之間存在着嚴重的延遲,而且,非公平鎖更能充分的利用CPU的時間片,盡量減少CPU空閑的狀態時間;即可以減少喚起線程的開銷,整體的吞吐效率高,因為線程有幾率不阻塞直接獲取到鎖,CPU不必喚醒其他所有線程;
        • 缺點:處於等待隊列中的線程可能會餓死或者等很久才會獲得鎖;
      • 產生“飢餓”的原因:
        • 高優先級吞噬所有低優先級的CPU時間片,優先級越高,就會獲得越高的CPU執行機會; —> 使用默認的優先級;
        • 線程被永久阻塞在一個等待進入同步塊synchronized的狀態(長時間執行) ,同時synchronized並不保障等待線程的順序(鎖釋放后,隨機競爭,由OS調度),這會存在一個可能是某個線程總是搶鎖搶不到導致一直等待狀態 —> 避免持有鎖的線程長時間執行、使用显示lock來代替synchronized;
          synchronized(obj) {
                  while (true) {
               // .... infinite loop
               }
        •  等待的線程永遠不被喚醒:如果多個線程處在wait方法執行上,而對其調用notify方法不會保證哪一個線程會獲得喚醒,喚醒是無序的,跟VM/OS調度有關,甚至底層是隨機選取一個或是隊列中的第一個,任何線程都有可能處於繼續等待的狀態,因此存在這樣一個風險,即一個等待線程從來得不到喚醒,因為其他等待線程總是能被獲得喚醒 —> 使用显示lock來代替synchronized;
      •  比如ReentrantLock:
        •  在公平鎖中,如果有另一個線程持有鎖或者有其他線程在等待隊列中等待這個鎖,那麼新發出的請求的線程將被放入到隊列中;
        • 非公平鎖中, 根據搶佔機制,擁有鎖的線程在釋放鎖資源的時候, 新發出請求的線程可以和等待隊列中的第一個線程競爭鎖資源, 新線程競爭失敗才放入隊列中,但是已經進入等待隊列的線程, 依然是按照先進先出的順序獲取鎖資源;
  2. 入隊:有阻塞式和非阻塞式

    1. 阻塞式:當隊列中的元素已滿時,則會將此線程停止,讓其處於等待狀態,直到隊列中有空餘位置產生

      public void put(E e) throws InterruptedException {
              checkNotNull(e);
              final ReentrantLock lock = this.lock;
              lock.lockInterruptibly();//獲取鎖
              try {
                  //隊列中元素 == 數組長度(隊列滿了),則線程等待
                  while (count == items.length)
                      notFull.await();
                  enqueue(e);//元素加入隊列
              } finally {
                  lock.unlock();//釋放鎖
              }
          }
      • lockInterruptibly:
        • 如果當前線程未被中斷,則獲取鎖。
        • 如果該鎖沒有被另一個線程保持,則獲取該鎖並立即返回,將鎖的保持計數設置為 1。
        • 如果當前線程已經保持此鎖,則將保持計數加 1,並且該方法立即返回。
        • 如果鎖被另一個線程保持,則出於線程調度目的,禁用當前線程,並且在發生以下兩種情況之一以前,該線程將一直處於休眠狀態:1)鎖由當前線程獲得;2)其他某個線程中斷當前線程
    2. 非阻塞式:當隊列中的元素已滿時,並不會阻塞此線程的操作,而是讓其返回又或者是拋出異常

      public boolean add(E e) {
              return super.add(e);// AbstractQueue.add
          }
          public boolean add(E e) {
              if (offer(e))//調用實現接口
                  return true;
              else
                  throw new IllegalStateException("Queue full");
          }
          public boolean offer(E e) {
              checkNotNull(e);//檢測是否有空指針異常
              final ReentrantLock lock = this.lock;//獲得鎖對象
              lock.lock();//加鎖
              try {
                  //如果隊列滿了,返回false
                  if (count == items.length)
                      return false;
                  else {
                      //元素加入隊列
                      enqueue(e);
                      return true;
                  }
              } finally {
                  lock.unlock();//釋放鎖
              }
          }
          private void enqueue(E x) {
              // assert lock.getHoldCount() == 1;
              // assert items[putIndex] == null;
              //獲得數組
              final Object[] items = this.items;
              //槽位填充元素
              items[putIndex] = x;
              //獲得下一個被添加元素的索引,如果值等於數組長度,表示到達尾部了,需要從頭開始填充
              if (++putIndex == items.length)
                  putIndex = 0;
              count++;//數量+1
              notEmpty.signal();//喚醒出隊上的等待線程,表示有元素可以消費了
          }
      • enqueue中++putIndex == items.length,putIndex=0:這是因為當前隊列執行元素出隊時總是從隊列頭部獲取,而添加元素的索引從隊列尾部獲取所以當隊列索引(從0開始)與數組長度相等時,下次我們就需要從數組頭部開始添加了
    3. 阻塞式和非阻塞式的結合:offer(E e, long timeout, TimeUnit unit),向隊列尾部添加元素,可以設置線程等待時間,如果超過指定時間隊列還是滿的,則返回false;

      public boolean offer(E e, long timeout, TimeUnit unit)
              throws InterruptedException {
      
              checkNotNull(e);//檢測是否為空
              long nanos = unit.toNanos(timeout);//轉換成超時時間閥值
              final ReentrantLock lock = this.lock;
              lock.lockInterruptibly();//加鎖
              try {
                  //隊列是否滿了的判斷
                  while (count == items.length) {
                      if (nanos <= 0)//等待超時結束返回false
                          return false;
                      nanos = notFull.awaitNanos(nanos);//隊列滿了,等待出隊有空位填充
                  }
                  enqueue(e);//加入隊列中
                  return true;
              } finally {
                  lock.unlock();//釋放鎖
              }
          }
  3. 出隊:同樣有阻塞式和非阻塞式

    1. 阻塞式:當隊列中的元素已空時,則會將此線程停止,讓其處於等待狀態,直到隊列中有元素插入

      public E take() throws InterruptedException {
              final ReentrantLock lock = this.lock;
              lock.lockInterruptibly();
              try {
                  //隊列為空,進行等待
                  while (count == 0)
                      notEmpty.await();
                  return dequeue();//返回出隊元素
              } finally {
                  lock.unlock();
              }
          }
    2. 非阻塞式:當隊列中的元素已滿時,並不會阻塞此線程的操作,而是讓其返回null或元素【裏面的迭代器比較複雜,留待下文探究】

      public E poll() {
              final ReentrantLock lock = this.lock;
              lock.lock();
              try {
                  //隊列為空,返回null,否則返回元素
                  return (count == 0) ? null : dequeue();
              } finally {
                  lock.unlock();
              }
          }
          private E dequeue() {
              // assert lock.getHoldCount() == 1;
              // assert items[takeIndex] != null;
              final Object[] items = this.items;//獲得隊列
              @SuppressWarnings("unchecked")
              E x = (E) items[takeIndex];//獲得出隊元素
              items[takeIndex] = null;//出隊槽位元素置為null
              //下一個被取出元素的索引+1,如果值等於長度,表示後面沒有元素了,需要從頭開始取出
              if (++takeIndex == items.length)
                  takeIndex = 0;
              count--;//數量-1
              if (itrs != null)//迭代器不為空
                  itrs.elementDequeued();//同時更新迭代器中的元素數據
              notFull.signal();//喚醒入隊線程
              return x;//返回出隊元素
          }
    3. 阻塞式和非阻塞式的結合:poll(long timeout, TimeUnit unit),出隊獲取元素,可以設置線程等待時間,如果超過指定時間隊列還是空的,則返回null;

      public E poll(long timeout, TimeUnit unit) throws InterruptedException {
              long nanos = unit.toNanos(timeout);//轉換成超時時間閥值
              final ReentrantLock lock = this.lock;
              lock.lockInterruptibly();//加鎖
              try {
                  while (count == 0) {//隊列空了,等待
                      if (nanos <= 0)//超時了返回null
                          return null;
                      nanos = notEmpty.awaitNanos(nanos);//等待入隊填充元素
                  }
                  return dequeue();//返回出隊元素
              } finally {
                  lock.unlock();//釋放鎖
              }
          }
  4. 移除元素remove:

    public boolean remove(Object o) {
            //要移除的元素為空返回false
            if (o == null) return false;
            //獲得隊列數組
            final Object[] items = this.items;
            final ReentrantLock lock = this.lock;
            lock.lock();//加鎖
            try {
                //隊列有元素
                if (count > 0) {
                    final int putIndex = this.putIndex;//獲得下一個被添加元素的索引
                    int i = takeIndex;//下一個被取出元素的索引
                    do {
                        if (o.equals(items[i])) {//從takeIndex下標開始,找到要被刪除的元素
                            removeAt(i);//移除
                            return true;
                        }
                        if (++i == items.length)//下一個被取出元素的索引+1並判斷是否等於隊列長度,如果是,表示需要從頭開始遍歷
                            i = 0;
                    } while (i != putIndex);//繼續查找,直到找到最後一個元素
                }
                return false;
            } finally {
                lock.unlock();//解鎖
            }
        }
    
      /**
       * 根據下標移除元素,那麼會分成兩種情況一個是移除的是隊首元素,一個是移除的是非隊首元素,移除隊首元素,就相當於出隊操作,
       * 移除非隊首元素那麼中間就有空位了,後面元素需要依次補上,然後如果是隊尾元素,那麼putIndex也就是插入操作的下標也就需要跟着移動。
       */
        void removeAt(final int removeIndex) {
            // assert lock.getHoldCount() == 1;
            // assert items[removeIndex] != null;
            // assert removeIndex >= 0 && removeIndex < items.length;
            final Object[] items = this.items;//獲得隊列
            if (removeIndex == takeIndex) {//移除的是隊首元素
                // removing front item; just advance
                items[takeIndex] = null;//隊首置為null
                if (++takeIndex == items.length)//下一個被取出元素的索引+1並判斷是否等於隊列長度
                    takeIndex = 0;
                count--;//數量-1
                if (itrs != null)//迭代器不為空
                    itrs.elementDequeued();//更新迭代器元素
            } else {//移除的不是隊首元素,而是中間元素
                // an "interior" remove
    
                // slide over all others up through putIndex.
                final int putIndex = this.putIndex;//下一個被添加元素的索引
                for (int i = removeIndex;;) {//對隊列進行遍歷,因為是隊列中間的值被移除了,所有後面的元素都要挨個遷移
                    int next = i + 1;//獲取移除元素的下一個坐標
                    if (next == items.length)//判斷是否等於隊列長度
                        next = 0;
                    if (next != putIndex) {//獲取移除元素的下一個坐標!=下一個被添加元素的索引,表示移除元素的索引後面有值
                        items[i] = items[next];//當前要移除的元素置為後面的元素,即對後面的元素往前遷移,覆蓋要移除的元素
                        i = next;//下一個遷移的索引
                    } else {//移除的元素是最後一個,後面沒有值了
                        items[i] = null;//移除元素,直接置為null
                        this.putIndex = i;//更新下一個被添加元素的索引
                        break;//結束
                    }
                }
                count--;//數量-1
                if (itrs != null)//迭代器不為空
                    itrs.removedAt(removeIndex);//更新迭代器元素
            }
            notFull.signal();//喚醒入隊線程,可以添加元素了
        }
  5. 清空元素clear:用於清空ArrayBlockingQueue,並且會釋放所有等待notFull條件的線程(存放元素的線程)

    public void clear() {
            final Object[] items = this.items;//獲得隊列
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                int k = count;//獲取元素數量
                if (k > 0) {//有元素,表示隊列不為空
                    final int putIndex = this.putIndex;//下一個被添加元素的索引
                    int i = takeIndex;//下一個被取出元素的索引
                    do {
                        items[i] = null;//對每個有元素的槽位置為null
                        if (++i == items.length)
                            i = 0;
                    } while (i != putIndex);//從有元素的第一個槽位開始遍歷,直到槽位元素為null
                    takeIndex = putIndex;//更新取出和添加的索引
                    count = 0;//數量更新為0
                    if (itrs != null)//迭代器不為空
                        itrs.queueIsEmpty();//更新迭代器為空
                    //若有等待notFull條件的線程,則逐一喚醒
                    for (; k > 0 && lock.hasWaiters(notFull); k--)
                        notFull.signal();//喚醒入隊線程,可以添加元素了
                }
            } finally {
                lock.unlock();
            }
        }
  6. offer(E e, long timeout, TimeUnit unit)和poll(long timeout, TimeUnit unit)裏面有awaitNanos,下面探討該功能實現:對當前線程或等待的入/出隊線程進行掛起,如果有入/出隊操作進行了喚醒出/入隊操作,則acquireQueued自旋獲取到鎖,然後出/入隊中的ReentrantLock是重入鎖,可以重入獲取到鎖進行出/入隊操作

        AbstractQueuedSynchronizer:
        //進行超時控制
        public final long awaitNanos(long nanosTimeout)
                throws InterruptedException {
            //如果當前線程中斷了拋出中斷異常
            if (Thread.interrupted())
                throw new InterruptedException();
            //當前線程加入到Condition隊列中
            Node node = addConditionWaiter();
            //鎖釋放是否成功:釋放當前線程的lock,從AQS的隊列中移出
            int savedState = fullyRelease(node);
            //到達等待時間點
            final long deadline = System.nanoTime() + nanosTimeout;
            //中斷標識
            int interruptMode = 0;
            //當前節點是否在同步隊列中,否表示不在,進入掛起判斷操作,如果已經在Sync隊列中,則退出循環
            //那什麼時候會把當前線程又加入到Sync隊列中呢?當然是調用signal方法的時候,因為這裏需要喚醒之前調用await方法的線程,喚醒之後進行下面的獲取鎖等操作
            while (!isOnSyncQueue(node)) {
                //如果超時了,將線程掛起,然後停止遍歷
                if (nanosTimeout <= 0L) {
                    transferAfterCancelledWait(node);
                    break;
                }
                //如果等待時間間隔超過了1000,繼續掛起
                if (nanosTimeout >= spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanosTimeout);
                //線程中斷了停止遍歷
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
                //獲得剩餘的等待時間間隔
                nanosTimeout = deadline - System.nanoTime();
            }
            //結束掛起,acquireQueued自旋對當前線程的隊列出隊進行獲取鎖並返回線程是否中斷
            //如果線程被中斷,並且中斷的方式不是拋出異常,則設置中斷後續的處理方式設置為REINTERRUPT
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;//中斷標識更新為退出等待時重新中斷
            if (node.nextWaiter != null)//當前節點後面還有節點,多併發操作了
                unlinkCancelledWaiters();//從頭到尾遍歷Condition隊列,移除被cancel的節點
            //如果線程已經被中斷,則根據之前獲取的interruptMode的值來判斷是繼續中斷還是拋出異常
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
            return deadline - System.nanoTime();//返回剩餘等待時間
        }
  7. drainTo可以一次性獲取隊列中所有的元素,它減少了鎖定隊列的次數,使用得當在某些場景下對性能有不錯的提升

    //最多從此隊列中移除給定數量的可用元素,並將這些元素添加到給定collection中
        public int drainTo(Collection<? super E> c) {
            return drainTo(c, Integer.MAX_VALUE);
        }
        public int drainTo(Collection<? super E> c, int maxElements) {
            checkNotNull(c);//檢查是否為空
            if (c == this)//如果集合類型相同拋出參數異常
                throw new IllegalArgumentException();
            if (maxElements <= 0)//如果給定移除數量小於0,返回0,表示不做移除操作
                return 0;
            final Object[] items = this.items;//獲得隊列
            final ReentrantLock lock = this.lock;
            lock.lock();//加鎖
            try {
                int n = Math.min(maxElements, count);//獲得元素的最小數量
                int take = takeIndex;//下一個被取出元素的索引
                int i = 0;
                try {
                    while (i < n) {//遍歷移除和添加
                        @SuppressWarnings("unchecked")
                        E x = (E) items[take];//獲得移除元素
                        c.add(x);//元素添加到直到集合中
                        items[take] = null;//元素原先隊列位置置為null
                        if (++take == items.length)//如果取出索引到達尾部,從頭開始遍歷取出
                            take = 0;
                        i++;//移除的數量+1,如果達到了移除的最小數量,結束遍歷
                    }
                    return n;//返回一共移除並添加了多少個元素
                } finally {
                    // Restore invariants even if c.add() threw
                    if (i > 0) {//如果有移除操作
                        count -= i;//隊列元素數量-i
                        takeIndex = take;//重置下一個被取出元素的索引
                        if (itrs != null) {//迭代器不為空
                            if (count == 0)//隊列空了
                                itrs.queueIsEmpty();//迭代器清空
                            else if (i > take)//說明take中間變成0了,通知itr
                                itrs.takeIndexWrapped();
                        }
                        //喚醒在因為隊列滿而等待的入隊線程,最多喚醒i個,避免線程被喚醒了因為隊列又滿了而阻塞
                        for (; i > 0 && lock.hasWaiters(notFull); i--)
                            notFull.signal();
                    }
                }
            } finally {
                lock.unlock();
            }
        }

 

三.Logback 框架中異步日誌打印中ArrayBlockingQueue的使用

  1. 在高併發並且響應時間要求比較小的系統中同步打日誌已經滿足不了需求了,這是因為打日誌本身是需要同步寫磁盤的,會造成 響應時間 增加,如下圖同步日誌打印模型為:

  2. 異步模型是業務線程把要打印的日誌任務寫入一個隊列后直接返回,然後使用一個線程專門負責從隊列中獲取日誌任務寫入磁盤,其模型具體如下圖:

    • 如圖可知其實 logback 的異步日誌模型是一個多生產者單消費者模型,通過使用隊列把同步日誌打印轉換為了異步,業務線程調用異步 appender 只需要把日誌任務放入日誌隊列,日誌線程則負責使用同步的 appender 進行具體的日誌打印到磁盤;
  3. 接下來看看異步日誌打印具體實現,要把同步日誌打印改為異步需要修改 logback 的 xml 配置文件:

    <appender name="PROJECT" class="ch.qos.logback.core.FileAppender">
            <file>project.log</file>
            <encoding>UTF-8</encoding>
            <append>true</append>
    
            <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
                <!-- daily rollover -->
                <fileNamePattern>project.log.%d{yyyy-MM-dd}</fileNamePattern>
                <!-- keep 7 days' worth of history -->
                <maxHistory>7</maxHistory>
            </rollingPolicy>
            <layout class="ch.qos.logback.classic.PatternLayout">
                <pattern>
                    <![CDATA[%n%-4r [%d{yyyy-MM-dd HH:mm:ss}] %X{productionMode} - %X{method} %X{requestURIWithQueryString} [ip=%X{remoteAddr}, ref=%X{referrer},
                    ua=%X{userAgent}, sid=%X{cookie.JSESSIONID}]%n  %-5level %logger{35} - %m%n]]>
                </pattern>
            </layout>
        </appender>
    
        <appender name="asyncProject" class="ch.qos.logback.classic.AsyncAppender">
            <discardingThreshold>0</discardingThreshold>
            <queueSize>1024</queueSize>
            <neverBlock>true</neverBlock>
            <appender-ref ref="PROJECT" />
        </appender>
         <logger name="PROJECT_LOGGER" additivity="false">
            <level value="WARN" />
            <appender-ref ref="asyncProject" />
        </logger>
  4. 從上面可知 AsyncAppender 是實現異步日誌的關鍵,下面探究它的原理:

    1. 如上圖可知 AsyncAppender 繼承自 AsyncAppenderBase,其中後者具體實現了異步日誌模型的主要功能,前者只是重寫了其中的一些方法。另外從類圖可知 logback 中的異步日誌隊列是一個阻塞隊列, 後面會知道其實是一個有界阻塞隊列 ArrayBlockingQueue, 其中 queueSize 是有界隊列的元素個數默認為 256;
    2. worker則是工作線程,也就是異步打印日誌的消費者線程,aai則是一個appender的裝飾器,裡邊存放的同步日誌的appender,其中appenderCount記錄aai裡邊附加的同步appender的個數(這個和配置文件相對應,一個異步的appender對應一個同步的appender),neverBlock用來指示當同步隊列已滿時是否阻塞打印日誌線程(如果配置neverBlock=true,當隊列滿了之後,後面阻塞的線程想要輸出的消息就直接被丟棄,從而線程不會阻塞),discardingThreshold是一個閾值,當日誌隊列裡邊的空閑元素個數小於該值時,新來的某些級別的日誌就會直接被丟棄。
  5.  接下來看下何時創建的日誌隊列以及何時啟動的消費線程,這需要看下 AsyncAppenderBase 的 start 方法,該方法是在解析完畢配置 AsyncAppenderBase 的 xml 的節點元素后被調用 :

    public void start() {
            if (isStarted())
                return;
            if (appenderCount == 0) {
                addError("No attached appenders found.");
                return;
            }
            if (queueSize < 1) {
                addError("Invalid queue size [" + queueSize + "]");
                return;
            }
            // 創建一個ArrayBlockingQueue阻塞隊列,queueSize默認為256,創建阻塞隊列的原因是:防止生產者過多,造成隊列中元素過多,產生OOM異常
            blockingQueue = new ArrayBlockingQueue<E>(queueSize);
            // 如果discardingThreshold未定義的話,默認為queueSize的1/5
            if (discardingThreshold == UNDEFINED)
                discardingThreshold = queueSize / 5;
            addInfo("Setting discardingThreshold to " + discardingThreshold);
            // 將工作線程設置為守護線程,即當jvm停止時,即使隊列中有未處理的元素,也不會在進行處理
            worker.setDaemon(true);
            // 為線程設置name便於調試
            worker.setName("AsyncAppender-Worker-" + getName());
            // make sure this instance is marked as "started" before staring the worker Thread
            // 啟動線程
            super.start();
            worker.start();
        }
    1. logback 使用的隊列是有界隊列 ArrayBlockingQueue,之所以使用有界隊列是考慮到內存溢出問題,在高併發下寫日誌的 qps 會很高如果設置為無界隊列隊列本身會佔用很大內存,很可能會造成 內存溢出。
    2. 這裏消費日誌隊列的 worker 線程被設置為了守護線程,意味着當主線程運行結束並且當前沒有用戶線程時候該 worker 線程會隨着 JVM 的退出而終止,而不管日誌隊列裏面是否還有日誌任務未被處理。另外這裏設置了線程的名稱是個很好的習慣,因為這在查找問題的時候很有幫助,根據線程名字就可以定位到是哪個線程。
  6. 既然是有界隊列那麼肯定需要考慮如果隊列滿了,該如何處置,是丟棄老的日誌任務,還是阻塞日誌打印線程直到隊列有空餘元素那?下面看append 方法:

    protected void append(E eventObject) {
            // 判斷隊列中的元素數量是否小於discardingThreshold,如果小於的話,並且日誌等級小於info的話,則直接丟棄這些日誌任務
            if (isQueueBelowDiscardingThreshold() && isDiscardable(eventObject)) {
                return;
            }
            preprocess(eventObject);
            // 日誌入隊
            put(eventObject);
        }
        private boolean isQueueBelowDiscardingThreshold() {
            return (blockingQueue.remainingCapacity() < discardingThreshold);
        }
    
       // 子類重寫的方法   判斷日誌等級
        protected boolean isDiscardable(ILoggingEvent event) {
            Level level = event.getLevel();
            return level.toInt() <= Level.INFO_INT;
        }    
    • 日誌入隊put:從下面可知如果 neverBlock 設置為 false(默認為 false)則會調用阻塞隊列的 put 方法,而 put 是阻塞的,也就是說如果當前隊列滿了,如果再企圖調用 put 方法向隊列放入一個元素則調用線程會被阻塞直到隊列有空餘空間。這裡有必要提下其中blockingQueue.put(eventObject)當日誌隊列滿了的時候 put 方法會調用 await() 方法阻塞當前線程,如果其它線程中斷了該線程,那麼該線程會拋出 InterruptedException 異常,那麼當前的日誌任務就會被丟棄了。如果 neverBlock 設置為了 true 則會調用阻塞隊列的 offer 方法,而該方法是非阻塞的,如果當前隊列滿了,則會直接返回,也就是丟棄當前日誌任務。
      private void put(E eventObject) {
              // 判斷是否阻塞(默認為false),則會調用阻塞隊列的put方法
              if (neverBlock) {
                  blockingQueue.offer(eventObject);
              } else {
                  putUninterruptibly(eventObject);
              }
      }
          // 可中斷的阻塞put方法
          private void putUninterruptibly(E eventObject) {
              boolean interrupted = false;
              try {
                  while (true) {
                      try {
                          blockingQueue.put(eventObject);
                          break;
                      } catch (InterruptedException e) {
                          interrupted = true;
                      }
                  }
              } finally {
                  if (interrupted) {
                      Thread.currentThread().interrupt();
                  }
              }
          }
  7. 最後看下 addAppender 方法,可以看出,一個異步的appender只能綁定一個同步appender,這個appender會被放入AppenderAttachableImpl的appenderList列表裡邊

    public void addAppender(Appender<E> newAppender) {
            if (appenderCount == 0) {
                appenderCount++;
                addInfo("Attaching appender named [" + newAppender.getName() + "] to AsyncAppender.");
                aai.addAppender(newAppender);
            } else {
                addWarn("One and only one appender may be attached to AsyncAppender.");
                addWarn("Ignoring additional appender named [" + newAppender.getName() + "]");
            }
    }
  8. 通過上面我們已經分析完了日誌生產線程放入日誌任務到日誌隊列的實現,下面一起來看下消費線程是如何從隊列裏面消費日誌任務並寫入磁盤的,由於消費線程是一個線程,那就從 worker 的 run 方法看起(消費者,將日誌寫入磁盤的線程方法):

    class Worker extends Thread {
    
            public void run() {
                AsyncAppenderBase<E> parent = AsyncAppenderBase.this;
                AppenderAttachableImpl<E> aai = parent.aai;
    
                // loop while the parent is started 一直循環知道線程被中斷
                while (parent.isStarted()) {
                    try {// 從阻塞隊列中獲取元素,交由給同步的appender將日誌打印到磁盤
                        E e = parent.blockingQueue.take();
                        aai.appendLoopOnAppenders(e);
                    } catch (InterruptedException ie) {
                        break;
                    }
                }
    
                addInfo("Worker thread will flush remaining events before exiting. ");
                //執行到這裏說明該線程被中斷,則把隊列裡邊的剩餘日誌任務刷新到磁盤
                for (E e : parent.blockingQueue) {
                    aai.appendLoopOnAppenders(e);
                    parent.blockingQueue.remove(e);
                }
    
                aai.detachAndStopAllAppenders();
            }
        }
    • try邏輯中從日誌隊列使用 take 方法獲取一個日誌任務,如果當前隊列為空則當前線程會阻塞到 take 方法直到隊列不為空才返回,獲取到日誌任務後會調用 AppenderAttachableImpl 的 aai.appendLoopOnAppenders 方法,該方法會循環調用通過 addAppender 注入的同步日誌 appener 具體實現日誌打印到磁盤的任務。

四.參考:

  1. 公平鎖的使用場景:https://stackoverflow.com/questions/26455578/when-to-use-fairness-mode-in-java-concurrency
  2. 公平鎖和非公平鎖的區別的提問:https://segmentfault.com/q/1010000006439146
  3. 公平鎖不能保證線程調度的公平性:https://stackoverflow.com/questions/60903107/understanding-fair-reentrantlock-in-java
  4. logback異步日誌打印中的ArrayBlockingQueue的使用:https://my.oschina.net/u/4410397/blog/3428573

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

新北清潔公司,居家、辦公、裝潢細清專業服務

※別再煩惱如何寫文案,掌握八大原則!

※教你寫出一流的銷售文案?

※超省錢租車方案