Kotlin Coroutines不複雜, 我來幫你理一理

2{icon} {views}

Coroutines 協程

最近在總結Kotlin的一些東西, 發現協程這塊確實不容易說清楚. 之前的那篇就寫得不好, 所以決定重寫.
反覆研究了官網文檔和各種教程博客, 本篇內容是最基礎也最主要的內容, 力求小白也能看懂並理解.

Coroutines概念

Coroutines(協程), 計算機程序組件, 通過允許任務掛起和恢復執行, 來支持非搶佔式的多任務. (見).

協程主要是為了異步, 非阻塞的代碼. 這個概念並不是Kotlin特有的, Go, Python等多個語言中都有支持.

Kotlin Coroutines

Kotlin中用協程來做異步和非阻塞任務, 主要優點是代碼可讀性好, 不用回調函數. (用協程寫的異步代碼乍一看很像同步代碼.)

Kotlin對協程的支持是在語言級別的, 在標準庫中只提供了最低程度的APIs, 然後把很多功能都代理到庫中.

Kotlin中只加了suspend作為關鍵字.
asyncawait不是Kotlin的關鍵字, 也不是標準庫的一部分.

比起futures和promises, kotlin中suspending function的概念為異步操作提供了一種更安全和不易出錯的抽象.

kotlinx.coroutines是協程的庫, 為了使用它的核心功能, 項目需要增加kotlinx-coroutines-core的依賴.

Coroutines Basics: 協程到底是什麼?

先上一段官方的demo:

import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch


fun main() {
    GlobalScope.launch { // launch a new coroutine in background and continue
        delay(1000L) // non-blocking delay for 1 second (default time unit is ms)
        println("World!") // print after delay
    }
    println("Hello,") // main thread continues while coroutine is delayed
    Thread.sleep(2000L) // block main thread for 2 seconds to keep JVM alive
}

這段代碼的輸出:
先打印Hello, 延遲1s之後, 打印World.

對這段代碼的解釋:

launch開始了一個計算, 這個計算是可掛起的(suspendable), 它在計算過程中, 釋放了底層的線程, 當協程執行完成, 就會恢復(resume).

這種可掛起的計算就叫做一個協程(coroutine). 所以我們可以簡單地說launch開始了一個新的協程.

注意, 主線程需要等待協程結束, 如果註釋掉最後一行的Thread.sleep(2000L), 則只打印Hello, 沒有World.

協程和線程的關係

coroutine(協程)可以理解為輕量級的線程. 多個協程可以并行運行, 互相等待, 互相通信. 協程和線程的最大區別就是協程非常輕量(cheap), 我們可以創建成千上萬個協程而不必考慮性能.

協程是運行在線程上可以被掛起的運算. 可以被掛起, 意味着運算可以被暫停, 從線程移除, 存儲在內存里. 此時, 線程就可以自由做其他事情. 當計算準備好繼續進行時, 它會返回線程(但不一定要是同一個線程).

默認情況下, 協程運行在一個共享的線程池裡, 線程還是存在的, 只是一個線程可以運行多個協程, 所以線程沒必要太多.

調試

在上面的代碼中加上線程的名字:

fun main() {
    GlobalScope.launch {
        // launch a new coroutine in background and continue
        delay(1000L) // non-blocking delay for 1 second (default time unit is ms)
        println("World! + ${Thread.currentThread().name}") // print after delay
    }
    println("Hello, + ${Thread.currentThread().name}") // main thread continues while coroutine is delayed
    Thread.sleep(2000L) // block main thread for 2 seconds to keep JVM alive
}

可以在IDE的Edit Configurations中設置VM options: -Dkotlinx.coroutines.debug, 運行程序, 會在log中打印出代碼運行的協程信息:

Hello, + main
World! + DefaultDispatcher-worker-1 @coroutine#1

suspend function

上面例子中的delay方法是一個suspend function.
delay()Thread.sleep()的區別是: delay()方法可以在不阻塞線程的情況下延遲協程. (It doesn’t block a thread, but only suspends the coroutine itself). 而Thread.sleep()則阻塞了當前線程.

所以, suspend的意思就是協程作用域被掛起了, 但是當前線程中協程作用域之外的代碼不被阻塞.

如果把GlobalScope.launch替換為thread, delay方法下面會出現紅線報錯:

Suspend functions are only allowed to be called from a coroutine or another suspend function

suspend方法只能在協程或者另一個suspend方法中被調用.

在協程等待的過程中, 線程會返回線程池, 當協程等待結束, 協程會在線程池中一個空閑的線程上恢復. (The thread is returned to the pool while the coroutine is waiting, and when the waiting is done, the coroutine resumes on a free thread in the pool.)

啟動協程

啟動一個新的協程, 常用的主要有以下幾種方式:

  • launch
  • async
  • runBlocking

它們被稱為coroutine builders. 不同的庫可以定義其他更多的構建方式.

runBlocking: 連接blocking和non-blocking的世界

runBlocking用來連接阻塞和非阻塞的世界.

runBlocking可以建立一個阻塞當前線程的協程. 所以它主要被用來在main函數中或者測試中使用, 作為連接函數.

比如前面的例子可以改寫成:

fun main() = runBlocking<Unit> {
    // start main coroutine
    GlobalScope.launch {
        // launch a new coroutine in background and continue
        delay(1000L)
        println("World! + ${Thread.currentThread().name}")
    }
    println("Hello, + ${Thread.currentThread().name}") // main coroutine continues here immediately
    delay(2000L) // delaying for 2 seconds to keep JVM alive
}

最後不再使用Thread.sleep(), 使用delay()就可以了.
程序輸出:

Hello, + main @coroutine#1
World! + DefaultDispatcher-worker-1 @coroutine#2

launch: 返回Job

上面的例子delay了一段時間來等待一個協程結束, 不是一個好的方法.

launch返回Job, 代表一個協程, 我們可以用Jobjoin()方法來顯式地等待這個協程結束:

fun main() = runBlocking {
    val job = GlobalScope.launch {
        // launch a new coroutine and keep a reference to its Job
        delay(1000L)
        println("World! + ${Thread.currentThread().name}")
    }
    println("Hello, + ${Thread.currentThread().name}")
    job.join() // wait until child coroutine completes
}

輸出結果和上面是一樣的.

Job還有一個重要的用途是cancel(), 用於取消不再需要的協程任務.

async: 從協程返回值

async開啟線程, 返回Deferred<T>, Deferred<T>Job的子類, 有一個await()函數, 可以返回協程的結果.

await()也是suspend函數, 只能在協程之內調用.

fun main() = runBlocking {
    // @coroutine#1
    println(Thread.currentThread().name)
    val deferred: Deferred<Int> = async {
        // @coroutine#2
        loadData()
    }
    println("waiting..." + Thread.currentThread().name)
    println(deferred.await()) // suspend @coroutine#1
}

suspend fun loadData(): Int {
    println("loading..." + Thread.currentThread().name)
    delay(1000L) // suspend @coroutine#2
    println("loaded!" + Thread.currentThread().name)
    return 42
}

運行結果:

main @coroutine#1
waiting...main @coroutine#1
loading...main @coroutine#2
loaded!main @coroutine#2
42

Context, Dispatcher和Scope

看一下launch方法的聲明:

public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {
...
}

其中有幾個相關概念我們要了解一下.

協程總是在一個context下運行, 類型是接口CoroutineContext. 協程的context是一個索引集合, 其中包含各種元素, 重要元素就有Job和dispatcher. Job代表了這個協程, 那麼dispatcher是做什麼的呢?

構建協程的coroutine builder: launch, async, 都是CoroutineScope類型的擴展方法. 查看CoroutineScope接口, 其中含有CoroutineContext的引用. scope是什麼? 有什麼作用呢?

下面我們就來回答這些問題.

Dispatchers和線程

Context中的CoroutineDispatcher可以指定協程運行在什麼線程上. 可以是一個指定的線程, 線程池, 或者不限.

看一個例子:

fun main() = runBlocking<Unit> {
    launch {
        // context of the parent, main runBlocking coroutine
        println("main runBlocking      : I'm working in thread ${Thread.currentThread().name}")
    }
    launch(Dispatchers.Unconfined) {
        // not confined -- will work with main thread
        println("Unconfined            : I'm working in thread ${Thread.currentThread().name}")
    }
    launch(Dispatchers.Default) {
        // will get dispatched to DefaultDispatcher
        println("Default               : I'm working in thread ${Thread.currentThread().name}")
    }
    launch(newSingleThreadContext("MyOwnThread")) {
        // will get its own new thread
        println("newSingleThreadContext: I'm working in thread ${Thread.currentThread().name}")
    }
}

運行后打印出:

Unconfined            : I'm working in thread main
Default               : I'm working in thread DefaultDispatcher-worker-1
newSingleThreadContext: I'm working in thread MyOwnThread
main runBlocking      : I'm working in thread main

API提供了幾種選項:

  • Dispatchers.Default代表使用JVM上的共享線程池, 其大小由CPU核數決定, 不過即便是單核也有兩個線程. 通常用來做CPU密集型工作, 比如排序或複雜計算等.
  • Dispatchers.Main指定主線程, 用來做UI更新相關的事情. (需要添加依賴, 比如kotlinx-coroutines-android.) 如果我們在主線程上啟動一個新的協程時, 主線程忙碌, 這個協程也會被掛起, 僅當線程有空時會被恢復執行.
  • Dispatchers.IO: 採用on-demand創建的線程池, 用於網絡或者是讀寫文件的工作.
  • Dispatchers.Unconfined: 不指定特定線程, 這是一個特殊的dispatcher.

如果不明確指定dispatcher, 協程將會繼承它被啟動的那個scope的context(其中包含了dispatcher).

在實踐中, 更推薦使用外部scope的dispatcher, 由調用方決定上下文. 這樣也方便測試.

newSingleThreadContext創建了一個線程來跑協程, 一個專註的線程算是一種昂貴的資源, 在實際的應用中需要被釋放或者存儲復用.

切換線程還可以用withContext, 可以在指定的協程context下運行代碼, 掛起直到它結束, 返回結果.
另一種方式是新啟一個協程, 然後用join明確地掛起等待.

在Android這種UI應用中, 比較常見的做法是, 頂部協程用CoroutineDispatchers.Main, 當需要在別的線程上做一些事情的時候, 再明確指定一個不同的dispatcher.

Scope是什麼?

launch, asyncrunBlocking開啟新協程的時候, 它們自動創建相應的scope. 所有的這些方法都有一個帶receiver的lambda參數, 默認的receiver類型是CoroutineScope.

IDE會提示this: CoroutineScope:

launch { /* this: CoroutineScope */
}

當我們在runBlocking, launch, 或async的大括號裏面再創建一個新的協程的時候, 自動就在這個scope里創建:

fun main() = runBlocking {
    /* this: CoroutineScope */
    launch { /* ... */ }
    // the same as:
    this.launch { /* ... */ }
}

因為launch是一個擴展方法, 所以上面例子中默認的receiver是this.
這個例子中launch所啟動的協程被稱作外部協程(runBlocking啟動的協程)的child. 這種”parent-child”的關係通過scope傳遞: child在parent的scope中啟動.

協程的父子關係:

  • 當一個協程在另一個協程的scope中被啟動時, 自動繼承其context, 並且新協程的Job會作為父協程Job的child.

所以, 關於scope目前有兩個關鍵知識點:

  • 我們開啟一個協程的時候, 總是在一個CoroutineScope里.
  • Scope用來管理不同協程之間的父子關係和結構.

協程的父子關係有以下兩個特性:

  • 父協程被取消時, 所有的子協程都被取消.
  • 父協程永遠會等待所有的子協程結束.

值得注意的是, 也可以不啟動協程就創建一個新的scope. 創建scope可以用工廠方法: MainScope()CoroutineScope().

coroutineScope()方法也可以創建scope. 當我們需要以結構化的方式在suspend函數內部啟動新的協程, 我們創建的新的scope, 自動成為suspend函數被調用的外部scope的child.

上面的父子關係, 可以進一步抽象到, 沒有parent協程, 由scope來管理其中所有的子協程.

Scope在實際應用中解決什麼問題呢? 如果我們的應用中, 有一個對象是有自己的生命周期的, 但是這個對象又不是協程, 比如Android應用中的Activity, 其中啟動了一些協程來做異步操作, 更新數據等, 當Activity被銷毀的時候需要取消所有的協程, 來避免內存泄漏. 我們就可以利用CoroutineScope來做這件事: 創建一個CoroutineScope對象和activity的生命周期綁定, 或者讓activity實現CoroutineScope接口.

所以, scope的主要作用就是記錄所有的協程, 並且可以取消它們.

A CoroutineScope keeps track of all your coroutines, and it can cancel all of the coroutines started in it.

Structured Concurrency

這種利用scope將協程結構化組織起來的機制, 被稱為”structured concurrency”.
好處是:

  • scope自動負責子協程, 子協程的生命和scope綁定.
  • scope可以自動取消所有的子協程.
  • scope自動等待所有的子協程結束. 如果scope和一個parent協程綁定, 父協程會等待這個scope中所有的子協程完成.

通過這種結構化的併發模式: 我們可以在創建top級別的協程時, 指定主要的context一次, 所有嵌套的協程會自動繼承這個context, 只在有需要的時候進行修改即可.

GlobalScope: daemon

GlobalScope啟動的協程都是獨立的, 它們的生命只受到application的限制. 即GlobalScope啟動的協程沒有parent, 和它被啟動時所在的外部的scope沒有關係.

launch(Dispatchers.Default) { ... }GlobalScope.launch { ... }用的dispatcher是一樣的.

GlobalScope啟動的協程並不會保持進程活躍. 它們就像daemon threads(守護線程)一樣, 如果JVM發現沒有其他一般的線程, 就會關閉.

參考

第三方博客:

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理【其他文章推薦】

網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線

※廣告預算用在刀口上,網站設計公司幫您達到更多曝光效益

※自行創業 缺乏曝光? 下一步"網站設計"幫您第一時間規劃公司的門面形象