Future模式是一個重要的異步併發模式,在JDK有實現。但JDK實現的Future模式功能比較簡單,使用起來比較複雜。Netty在JDK Future基礎上,加強了Future的能力,具體體現在:
- 更加簡單的結果返回方式。在JDK中,需要用戶自己實現Future對象的執行及返回結果。而在Netty中可以使用Promise簡單地調用方法返回結果。
- 更加靈活的結果處理方式。JDK中只提供了主動得到結果的get方法,要麼阻塞,要麼輪詢。Netty除了支持主動get方法外,還可以使用Listener被動監聽結果。
- 實現了進度監控。Netty提供了ProgressiveFuture、ProgressivePromise和GenericProgressiveFutureListener接口及其實現,支持對執行進程的監控。
吹了那麼多牛,有一個關鍵問題還沒弄清楚:Future到底是幹嘛的?io.netty.util.concurrent.Future代碼的第一行註釋簡潔第回答了這個問題:Future就是異步操作的結果。這裏面有三個關鍵字:異步,操作,結果。首先,Future首先是一個“結果”;其次這個結果產生於一個“操作”,操作具體是什麼可以隨便定義;最後這個操作是”異步”執行的,這就意味着“操作”可能在另一個線程中併發執行,也可能隨後在同一個線程中執行,什麼時候產生結果是一件不確定的事。
異步調用過程的一般過程是:調用方喚起一個異步操作,在接下來的某個恰當的時間點得到的異步操作操作的結果。要正確地完成上述步驟,需要解決以下幾個問題:
- 怎樣維護這個調用狀態?
- 如何獲取異步操作的結果?
- 何時處理結果?
io.netty.util.concurrent.DefaultPromise是Future的默認實現,以上三個問題的答案都能在這個類的代碼中找到。
DefaultPromise的派生體系
下面是DefaultPromis及其父類,接口的聲明:
public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V>
public abstract class AbstractFuture<V> implements Future<V>
public interface Promise<V> extends Future<V>
public interface Future<V> extends java.util.concurrent.Future<V>
可以看出,DefaultPromise派生自AbstractFuture類,並實現了Promise接口。抽象類型AbstractFuture派生自Future, 接口Promise派生自Future。Future派生自JDK的Future接口。
和JDK的Future相比,Netty的Future接口增加一些自己的方法:
/**
當操作成功時返回true*/
boolean isSuccess();
/**
只有當操作可以被取消時返回true
*/
boolean isCancellable();
/**
返回操作的異常*/
Throwable cause();
/**
添加一個監聽器到future。當操作完成(成功或失敗都算完成,此事isDone()返回true)時, 會通知這個監聽器。如果添加時操作已經完成,
這個監聽器會立即被通知。*/
Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
/**
和上個方法一樣,可以同時添加多個監聽器*/
Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
/**
刪除指定的監聽器, 如果這個監聽器還沒被通知的話。*/
Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
/**
功能和上個方法一樣,可以同時刪除多個監聽器。*/
Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
/**
同步等待直到操作完成。會被打斷。
*/
Future<V> sync() throws InterruptedException;
/**
同步等着知道操作完成。不會被打斷。
*/
Future<V> syncUninterruptibly();
/**
同sync*/
Future<V> await() throws InterruptedException;
/**
同synUniterruptibliy*/
Future<V> awaitUninterruptibly();
/**
等待,直到操作完成或超過指定的時間。會被打斷。*/
boolean await(long timeout, TimeUnit unit) throws InterruptedException;
/**
同上*/
boolean await(long timeoutMillis) throws InterruptedException;
/**
同上,不會被打斷。*/
boolean awaitUninterruptibly(long timeout, TimeUnit unit);
/**
同上。*/
boolean awaitUninterruptibly(long timeoutMillis);
/**
立即得到結果,不會阻塞。如果操作沒有完成或沒有成功,返回null*/
V getNow();
Netty的Future最大的特點是增加了Listener被動接收任務完成通知,下面是兩個Listener接口的定義:
public interface GenericFutureListener<F extends Future<?>> extends EventListener {
void operationComplete(F future) throws Exception;
}
public interface GenericProgressiveFutureListener<F extends ProgressiveFuture<?>> extends GenericFutureListener<F> {
void operationProgressed(F future, long progress, long total) throws Exception;
}
把一個listener添加到future之後。當異步操作完成之後,listener會被通知一次,同時會回調operationComplete方法。參數future是當前通知的future,這意味這,一個listener可以被添加到多個future中。
當異步操作進度發送變化時,listener會被通知,同時會回調operationProgressed方法。progress是當前進度,total是總進度。progress==total表示操作完成。如果不知道何時完成操作progress=-1。
Promise定義的方法:
/**
設置結果。把這個future設置為success,通知所有的listener,
如果這個future已經是success或failed(操作已經完成),會拋出IllegalStateException
*/
Promise<V> setSuccess(V result);
/**
同上。只有在操作沒有完成的時候才會生效,且會返回true
*/
boolean trySuccess(V result);
/** 設置異常。把這個future設置為failed狀態,通知所有的listener.
如果這個future已經完成,會拋出IllegalStateException
*/
Promise<V> setFailure(Throwable cause);
/** 同上。只有在操作沒有完成時才會生效,且返回ture
*/
boolean tryFailure(Throwable cause);
/** 設置當前前future的操作不能被取消。這個future沒有完成且可以設置成功或這個future已經完成,返回true。否則返回false
*/
boolean setUncancellable();
DefaultPromise的設計
關鍵屬性
volatile Object result;
異步操作的結果。可以通過它的值知道當前future的狀態。
final EventExecutor executor;
通知listener的線程。
Object listeners;
維護添加到當前future的listener對象。
short waiters;
記錄當前真正等待結果的線程數量。
boolean notifyingListeners;
是否正在通知listener,防止多線程併發執行通知操作。
狀態管理
future有4種狀態: 未完成, 未完成-不能取消,完成-成功,完成-失敗。使用isDone()判斷是否完成,它代碼如下:
1 @Override
2 public boolean isDone() {
3 return isDone0(result);
4 }
5
6 private static boolean isDone0(Object result) {
7 return result != null && result != UNCANCELLABLE;
8 }
第7行是判斷當前完成狀態的。result != null 且 result != UNCANCELLABLE,表示處於完成狀態。
result默認是null, 此時future處於未完成狀態。可以使用setUncancellable方法把它設置成為完成-不能取消狀態。
1 @Override
2 public boolean setUncancellable() {
3 if (RESULT_UPDATER.compareAndSet(this, null, UNCANCELLABLE)) {
4 return true;
5 }
6 Object result = this.result;
7 return !isDone0(result) || !isCancelled0(result);
8 }
第3行,使用原子操作設置result的值,只有result==null時才能把result設置成UNCANCELLABLE。當result==UNCANCELLABLE時,不允許取消異步操作。
使用isSuccess方法判斷future是否處於完成-成功狀態。
1 @Override
2 public boolean isSuccess() {
3 Object result = this.result;
4 return result != null && result != UNCANCELLABLE && !(result instanceof CauseHolder);
5 }
第4行是完成-成功狀態result的取值:除null, UNCANCELLABLE和CauseHolder對象的任何值。
只有滿足isDone() && !isSuccess()時,future處於完成失敗狀態,可以使用cause方法獲取異常。
調用setSuccess和trySuccess方法,能夠把狀態轉換成完成-成功。
1 @Override
2 public Promise<V> setSuccess(V result) {
3 if (setSuccess0(result)) {
4 notifyListeners();
5 return this;
6 }
7 throw new IllegalStateException("complete already: " + this);
8 }
9
10 private boolean setSuccess0(V result) {
11 return setValue0(result == null ? SUCCESS : result);
12 }
第3行嘗試把狀態設置成完成-成功狀態。如果可以,在第4行通知所有的listener。否則第7行拋出錯誤。第11行給出了成功的默認值SUCCESS。trySuccess少了第7行,不會拋出異常。
調用setFailure和tryFailure方法,能夠包狀態轉換成完成-失敗狀態。
1 @Override
2 public Promise<V> setFailure(Throwable cause) {
3 if (setFailure0(cause)) {
4 notifyListeners();
5 return this;
6 }
7 throw new IllegalStateException("complete already: " + this, cause);
8 }
9
10 private boolean setFailure0(Throwable cause) {
11 return setValue0(new CauseHolder(checkNotNull(cause, "cause")));
12 }
第3行嘗試把專題設置成完成-失敗狀態。如果可以,在第4行通知所有listener。否則在第7行拋出異常。第11行把異常包裝成CauseHolder對象。tryFailure少了第7行,不會拋出異常。
獲取異步操作的結果
當異步操作完成時,調用Promise提供的setSuccess和trySuccess設置成功的結果,調用setFailure和tryFailure設置異常結果。不論什麼結果,都會使用setValue0方法保存到result屬性上。
1 private boolean setValue0(Object objResult) {
2 if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
3 RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
4 checkNotifyWaiters();
5 return true;
6 }
7 return false;
8 }
第2,3行,使用原子操作設置result的值,只有result==null或result==UNCANCELLABLE時,才能設置成功。如果設置成功,在第4行喚醒所有等待中的線程。可以使用get方法得到result值。如果isSucess()==true, result的值是SUCCESS或異步操作的結果。否則result的值是CauseHolder對象,此時可以調用cause方法得到異常對象。
使用get或cause,只有在異步操作完成后才能順利得到結果。可以使用listener,被動等待操作完成通知。
使用listener異步通知處理結果
Future的listener是必須實現GenericFutureListener接口,調用方法可以在operationComplete方法中處理異步操作的結果。
listeners屬性用來保存使用addListener,addListeners方法添加到future的listener。listeners可能使用一個GenericFutureListener對象,也可能是一個GenericFutureListener數組。所有添加listener方法都會調用addListener0方法添加listener。
1 private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) {
2 if (listeners == null) {
3 listeners = listener;
4 } else if (listeners instanceof DefaultFutureListeners) {
5 ((DefaultFutureListeners) listeners).add(listener);
6 } else {
7 listeners = new DefaultFutureListeners((GenericFutureListener<? extends Future<V>>) listeners, listener);
8 }
9 }
這段代碼中使用了一個DefaultFutureListeners類,它內部維護了一個GenericFutureListener數組。
當一次操作完成時,會調用notifyListeners方法通知listeners中所有的listener,並調用listener的operationComplete方法。只有當isDone()==true時才會調用notifyListeners方法。觸發點在下面的一些方法中:
addListener, addListeners。
setSuccess, trySuccess。
setFailure, tryFailure。
notifyListeners的代碼如下:
1 private void notifyListeners() {
2 EventExecutor executor = executor();
3 if (executor.inEventLoop()) {
4 final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
5 final int stackDepth = threadLocals.futureListenerStackDepth();
6 if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
7 threadLocals.setFutureListenerStackDepth(stackDepth + 1);
8 try {
9 notifyListenersNow();
10 } finally {
11 threadLocals.setFutureListenerStackDepth(stackDepth);
12 }
13 return;
14 }
15 }
16
17 safeExecute(executor, new Runnable() {
18 @Override
19 public void run() {
20 notifyListenersNow();
21 }
22 });
23 }
這段代碼的作用是調用notifyListenersNow。如果當前線程就是executor的線程,在第9行直接調用notifyListenerNow,否則在第20行,把notifyListnerNow放在executor中執行。第4-7行和11行的作用是防止遞歸調用導致線程棧溢出,MAX_LISTENER_STACK_DEPTH就是listener遞歸調用的最大深度。
notifyListenerNow的作用是,確保沒有併發執行notifyListener0或notifyListners0方法,且所有的listener只能被通知一次。
1 private void notifyListenersNow() {
2 Object listeners;
3 synchronized (this) {
4 // Only proceed if there are listeners to notify and we are not already notifying listeners.
5 if (notifyingListeners || this.listeners == null) {
6 return;
7 }
8 notifyingListeners = true;
9 listeners = this.listeners;
10 this.listeners = null;
11 }
12 for (;;) {
13 if (listeners instanceof DefaultFutureListeners) {
14 notifyListeners0((DefaultFutureListeners) listeners);
15 } else {
16 notifyListener0(this, (GenericFutureListener<? extends Future<V>>) listeners);
17 }
18 synchronized (this) {
19 if (this.listeners == null) {
20 // Nothing can throw from within this method, so setting notifyingListeners back to false does not
21 // need to be in a finally block.
22 notifyingListeners = false;
23 return;
24 }
25 listeners = this.listeners;
26 this.listeners = null;
27 }
28 }
29 }
第3-11行的作用是防止多個線程併發執行11行之後的代碼。
結合第5,9,10行可知, listeners中的所有listener只能被通知一次。
13-17行,通知所有listeners。notifyListener0通知一個listener,notifyListeners0通知所有的listener。
最後,18-27行,檢查在通知listeners的過程中,是否有新的listener被添加進來。如果有,25,26行得到所有新添加的listener並清空listeners屬性,13-17行繼續通知新添加的listener。否則,運行22,23行結束通知過程。
1 private void notifyListeners0(DefaultFutureListeners listeners) {
2 GenericFutureListener<?>[] a = listeners.listeners();
3 int size = listeners.size();
4 for (int i = 0; i < size; i ++) {
5 notifyListener0(this, a[i]);
6 }
7 }
8
9 @SuppressWarnings({ "unchecked", "rawtypes" })
10 private static void notifyListener0(Future future, GenericFutureListener l) {
11 try {
12 l.operationComplete(future);
13 } catch (Throwable t) {
14 logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()", t);
15 }
16 }
1-7行,notifyListeners0對每個listener調用一次notifyListener0,參數是當前的future。
10-16,調用listener的operationComplete方法,捕獲了所有的異常,確保接下來可以繼續通知下一個listener。
使用await機制同步等待結果
可以使用一系列的await,awaitXXX方法同步等待結果。這些方法可以分為: 能被打斷的,不能被打斷的。一直等待的,有超時時間的。await0方法是最複雜的等待實現,所有帶超時時間的await方法都會調用它。
1 private boolean await0(long timeoutNanos, boolean interruptable) throws InterruptedException {
2 if (isDone()) {
3 return true;
4 }
5
6 if (timeoutNanos <= 0) {
7 return isDone();
8 }
9
10 if (interruptable && Thread.interrupted()) {
11 throw new InterruptedException(toString());
12 }
13
14 checkDeadLock();
15
16 long startTime = System.nanoTime();
17 long waitTime = timeoutNanos;
18 boolean interrupted = false;
19 try {
20 for (;;) {
21 synchronized (this) {
22 if (isDone()) {
23 return true;
24 }
25 incWaiters();
26 try {
27 wait(waitTime / 1000000, (int) (waitTime % 1000000));
28 } catch (InterruptedException e) {
29 if (interruptable) {
30 throw e;
31 } else {
32 interrupted = true;
33 }
34 } finally {
35 decWaiters();
36 }
37 }
38 if (isDone()) {
39 return true;
40 } else {
41 waitTime = timeoutNanos - (System.nanoTime() - startTime);
42 if (waitTime <= 0) {
43 return isDone();
44 }
45 }
46 }
47 } finally {
48 if (interrupted) {
49 Thread.currentThread().interrupt();
50 }
51 }
52 }
這個方法返回的條件有: (1)isDone()==true;(2)允許被打斷(interrupted==true)的情況下被打斷;(3)已經超時。2-12行分別檢查了這3種情況。
25,35行管理waiters屬性,這個屬性用來記錄當前正在等待的線程數。inWaiters方法正常情況下會把waiters加1,當檢查到waiters==Short.MAX_VALUE時會拋出異常,防止過多的線程等待。
27行,調用wait等待,經歷waitTime后超時返回。在等待過程中,會被setValue0方法調用notifyAll喚醒。
29-33行,處理被打斷的異常,如果運行被打斷,在30行拋出這個異常返回。
38-45行,不論什麼原因線程被喚醒,檢查是否滿足返回條件,如果不滿足,繼續循環等待。
沒有超時的wait方法實現要簡單一些,只需判讀返回條件(1)(2)。
跟蹤異步操作的執行進度
如果想要跟蹤異步操作的執行進度,future需要換成DefaultProgressivePromise對象,listener需要換成GenericProgressiveFutureListener類型。DefaultProgressivePromise派生自DefaultPromise同時實現了ProgressivePromise接口。GenericProgressiveFutureListener接口派生自GenericFutureListener接口。
ProgressivePromise定義了setProgress和tryProgress方法用來更新進度,是不是很眼熟,和Promise接口定義返回結果的方法很類似。
ProgressivePromise<V> setProgress(long progress, long total);
boolean tryProgress(long progress, long total);
GenericProgressiveFutureListener定義了operationProgressed方法用來處理進度更新通知。
void operationProgressed(F future, long progress, long total) throws Exception;
DefaultProgressivePromise自己只實現了setProgress和tryProgress方法,其它都是復用了DefaultPromise的實現。
1 @Override
2 public ProgressivePromise<V> setProgress(long progress, long total) {
3 if (total < 0) {
4 // total unknown
5 total = -1; // normalize
6 if (progress < 0) {
7 throw new IllegalArgumentException("progress: " + progress + " (expected: >= 0)");
8 }
9 } else if (progress < 0 || progress > total) {
10 throw new IllegalArgumentException(
11 "progress: " + progress + " (expected: 0 <= progress <= total (" + total + "))");
12 }
13
14 if (isDone()) {
15 throw new IllegalStateException("complete already");
16 }
17
18 notifyProgressiveListeners(progress, total);
19 return this;
20 }
3-12行,檢查progress和total的合法性。
14行,如isDone()==true,拋出異常。只有在操作還沒完成的是否更新進度才有意義。
18行,調用notifyProgressiveListeners觸發進度更新通知,這個方法在DefaultPromise中實現。
notifyProgressiveListeners實現了觸發進度更新通知的主要流程:
1 void notifyProgressiveListeners(final long progress, final long total) {
2 final Object listeners = progressiveListeners();
3 if (listeners == null) {
4 return;
5 }
6
7 final ProgressiveFuture<V> self = (ProgressiveFuture<V>) this;
8
9 EventExecutor executor = executor();
10 if (executor.inEventLoop()) {
11 if (listeners instanceof GenericProgressiveFutureListener[]) {
12 notifyProgressiveListeners0(
13 self, (GenericProgressiveFutureListener<?>[]) listeners, progress, total);
14 } else {
15 notifyProgressiveListener0(
16 self, (GenericProgressiveFutureListener<ProgressiveFuture<V>>) listeners, progress, total);
17 }
18 } else {
19 if (listeners instanceof GenericProgressiveFutureListener[]) {
20 final GenericProgressiveFutureListener<?>[] array =
21 (GenericProgressiveFutureListener<?>[]) listeners;
22 safeExecute(executor, new Runnable() {
23 @Override
24 public void run() {
25 notifyProgressiveListeners0(self, array, progress, total);
26 }
27 });
28 } else {
29 final GenericProgressiveFutureListener<ProgressiveFuture<V>> l =
30 (GenericProgressiveFutureListener<ProgressiveFuture<V>>) listeners;
31 safeExecute(executor, new Runnable() {
32 @Override
33 public void run() {
34 notifyProgressiveListener0(self, l, progress, total);
35 }
36 });
37 }
38 }
39 }
第3行,從listeners中選出GenericProgressiveFutureListener類型的listener。
10-38行。調用notifyProgressiveListeners0, notifyProgressiveListener0通知進度跟新。11-17行,在當前線程中調用。
19-37行,在executor中調用。notifyProgressiveListener0隻是簡單地調用listener的operationProgressed方法。notifyProgressiveListeners0是對每個listener調用一次notifyProgressiveListener0。
和完成通知相比,進度更新通知要更加簡單。進度更新通知沒有處理併發問題,沒有處理棧溢出問題。
本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理
【其他文章推薦】
※網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線
※廣告預算用在刀口上,網站設計公司幫您達到更多曝光效益
※自行創業 缺乏曝光? 下一步"網站設計"幫您第一時間規劃公司的門面形象
※台灣寄大陸海運貨物規則及重量限制?
※大陸寄台灣海運費用試算一覽表
※台中搬家,彰化搬家,南投搬家前需注意的眉眉角角,別等搬了再說!