本文將深入解析 CompletableFuture,希望對各位讀者能有所幫助。
CompletableFuture 適用于以下場景
下面是一個演示 CompletableFuture 如何使用的代碼示例:
public class CompletableFutureExample { public static void main(String[] args) { // 創(chuàng)建CompletableFuture對象,并定義異步任務(wù) CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { // 異步任務(wù)的邏輯代碼 // 在這里執(zhí)行耗時操作或其他需要異步執(zhí)行的任務(wù) try { TimeUnit.SECONDS.sleep(2); // 模擬耗時操作 } catch (InterruptedException e) { e.printStackTrace(); } return "Hello, "; }); // 添加任務(wù)完成后的回調(diào)方法 CompletableFuture<String> resultFuture = future.thenApplyAsync(result -> { // 任務(wù)完成后的處理邏輯 // result為上一步任務(wù)的結(jié)果 return result + "World!"; }); // 組合多個CompletableFuture對象 CompletableFuture<String> combinedFuture = future.thenCombine(resultFuture, (result1, result2) -> { // 對多個CompletableFuture的結(jié)果進行組合處理 return result1 + result2 + " Welcome to the CompletableFuture world!"; }); // 異常處理 CompletableFuture<String> exceptionHandledFuture = combinedFuture.exceptionally(ex -> { // 異常處理邏輯 System.out.println("任務(wù)執(zhí)行出現(xiàn)異常:" + ex.getMessage()); return "Fallback Result"; }); // 等待并獲取任務(wù)的結(jié)果 try { String result = exceptionHandledFuture.get(); System.out.println("任務(wù)的最終結(jié)果為:" + result); } catch (InterruptedException | ExecutionException e) { // 處理異常情況 e.printStackTrace(); } }}
結(jié)果輸出:
任務(wù)的最終結(jié)果為:Hello, Hello, World! Welcome to the CompletableFuture world!
首先,我們創(chuàng)建了一個CompletableFuture對象future。在future中,我們使用supplyAsync方法定義了一個異步任務(wù),其中 lambda表達式 中的代碼會在另一個線程中執(zhí)行。在這個例子中,我們模擬了一個耗時操作,通過TimeUnit.SECONDS.sleep(2)暫停了2秒鐘。
然后,我們添加了一個回調(diào)方法resultFuture。在這個回調(diào)方法中,將前一個異步任務(wù)的結(jié)果作為參數(shù)進行處理,并返回處理后的新結(jié)果。在這個例子中,我們將前一個任務(wù)的結(jié)果與字符串 "World!" 連接起來,形成新的結(jié)果。
接下來,我們使用thenCombine方法組合了兩個CompletableFuture對象:future和resultFuture。在這個組合任務(wù)中,我們將兩個任務(wù)的結(jié)果進行組合處理,返回最終的結(jié)果。在這個例子中,我們將前兩個任務(wù)的結(jié)果與字符串 " Welcome to the CompletableFuture world!" 連接起來。
此外,我們還處理了異常情況。通過exceptionally方法,我們定義了一個異常處理回調(diào)方法。如果在任務(wù)執(zhí)行過程中發(fā)生了異常,我們可以在這里對異常進行處理,并返回一個默認值作為結(jié)果。
最后,我們使用get方法等待并獲取最終的任務(wù)結(jié)果。需要注意的是,get方法可能會阻塞當(dāng)前線程,直到任務(wù)完成并返回結(jié)果。在這個例子中,我們使用try-catch塊捕獲可能的異常情況,并打印出最終的任務(wù)結(jié)果。
這個例子只是部分展示了CompletableFuture的功能,實際上它比你想象的還要強大!
CompletableFuture 的源碼非常龐大和復(fù)雜,涉及到并發(fā)、線程池、同步機制等多方面的知識。在這里,我們只重點介紹 CompletableFuture 的核心實現(xiàn)原理。
圖片
CompletableFuture 的作者是大名鼎鼎的 Doug Lea。CompletableFuture 類是實現(xiàn)了 Future 和 CompletionStage 接口的一個關(guān)鍵類。它可以表示異步計算的結(jié)果,并提供了一系列方法來操作和處理這些結(jié)果。
CompletableFuture 內(nèi)部使用了一個屬性result來保存計算結(jié)果,以及若干個屬性waiters來保存等待結(jié)果的任務(wù)。當(dāng)計算完成后,CompletableFuture將會通知所有等待結(jié)果的任務(wù),并將結(jié)果傳遞給它們。
為了實現(xiàn)鏈?zhǔn)讲僮鳎珻ompletableFuture還定義了內(nèi)部類:Completion, UniCompletion, 和 BiCompletion。
Completion, UniCompletion, 和 BiCompletion 是 CompletableFuture 內(nèi)部用于處理異步任務(wù)完成的輔助類。
這些輔助類在 CompletableFuture 的內(nèi)部被使用,以實現(xiàn)異步任務(wù)的執(zhí)行、結(jié)果的處理和組合等操作。它們提供了一種靈活的方式來處理異步任務(wù)的完成情況,并通過回調(diào)方法或其他一些方法來處理任務(wù)的結(jié)果和異常。
圖片
CompletableFuture中包含兩個字段:result 和 stack。result 用于存儲當(dāng)前CF的結(jié)果,stack (Completion)表示當(dāng)前CF完成后需要觸發(fā)的依賴動作(Dependency Actions),去觸發(fā)依賴它的CF的計算,依賴動作可以有多個(表示有多個依賴它的CF),以棧(Treiber stack)的形式存儲,stack表示棧頂元素。
CompletableFuture 在設(shè)計思想上類似 “觀察者模式,每個 CompletableFuture 都可以被看作一個被觀察者,其內(nèi)部有一個Completion類型的鏈表成員變量stack,用來存儲注冊到其中的所有觀察者。當(dāng)被觀察者執(zhí)行完成后會彈棧stack屬性,依次通知注冊到其中的觀察者。
CompletableFuture 的執(zhí)行流程如下:
當(dāng)異步任務(wù)完成時,它會設(shè)置自己的結(jié)果值,將狀態(tài)標(biāo)記為已完成。
如果有其他線程在此之前調(diào)用了complete()、completeExceptionally()、cancel()等方法,可能會影響任務(wù)的最終狀態(tài)。
請注意,以上步驟的順序和具體實現(xiàn)可能略有不同,但大致上反映了CompletableFuture的執(zhí)行流程。在實際應(yīng)用中,我們可以根據(jù)需求選擇適合的方法來處理異步任務(wù)的完成情況、結(jié)果、異常以及任務(wù)之間的關(guān)系。
CompletableFuture類提供了一系列用于處理和組合異步任務(wù)的方法。以下是這些方法的介紹:
創(chuàng)建一個 CompletableFuture 對象有以下幾種方法:
CompletableFuture<String> future = new CompletableFuture<>();
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { // 異步任務(wù)邏輯 return "Result";});CompletableFuture<Void> future = CompletableFuture.runAsync(() -> { // 異步任務(wù)邏輯});
CompletableFuture<Integer> transformedFuture = originalFuture.thenApply(result -> { // 轉(zhuǎn)換邏輯 return result.length();});originalFuture.thenAccept(result -> { // 處理結(jié)果邏輯 System.out.println("Result: " + result);});CompletableFuture<Void> runnableFuture = originalFuture.thenRun(() -> { // 在結(jié)果完成后執(zhí)行的操作});
//CompletableFuture.completedFuture()直接創(chuàng)建一個已完成狀態(tài)的CompletableFutureCompletableFuture<String> cf2 = CompletableFuture.completedFuture("result");//先初始化一個未完成的CompletableFuture,然后通過complete()、completeExceptionally(),也完成該CompletableFutureCompletableFuture<String> cf = new CompletableFuture<>();cf.complete("success");
CompletionStage<Integer> stage = CompletableFuture.supplyAsync(() -> 42);CompletableFuture<Integer> future = stage.toCompletableFuture();
用于將當(dāng)前的 CompletionStage 對象轉(zhuǎn)換為一個 CompletableFuture 對象。
以下是在 CompletableFuture 對象上異步執(zhí)行任務(wù)的一些方法示例:
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { // 異步任務(wù)邏輯 return "Result";});
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> { // 異步任務(wù)邏輯});
CompletableFuture提供了不同的方式來對異步任務(wù)進行鏈?zhǔn)讲僮鳌?span style="display:none">nkf28資訊網(wǎng)——每日最新資訊28at.com
CompletableFuture<Void> executedFuture = future.thenRun(() -> executeTask());
thenRun方法用于在CompletableFuture完成后執(zhí)行一個Runnable任務(wù)。它返回一個新的CompletableFuture對象,該對象沒有返回值。
CompletableFuture<Void> acceptedFuture = future.thenAccept(result -> processResult(result));
thenAccept方法用于在CompletableFuture完成后對結(jié)果進行處理。它接收一個Consumer函數(shù)作為參數(shù),并返回一個新的CompletableFuture對象。
CompletableFuture<U> appliedFuture = future.thenApply(result -> transformResult(result));
thenApply方法用于在CompletableFuture完成后對結(jié)果進行轉(zhuǎn)換。它接收一個Function函數(shù)作為參數(shù),并返回一個新的CompletableFuture對象。
CompletableFuture<U> composedFuture = future.thenCompose(result -> executeAnotherTask(result));
用于對異步任務(wù)的結(jié)果進行處理,并返回一個新的異步任務(wù)。
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 42);CompletableFuture<Void> whenCompleteFuture = future.whenComplete((result, exception) -> { if (exception != null) { System.out.println("Exception occurred: " + exception.getMessage()); } else { System.out.println("Result: " + result); }});whenCompleteFuture.join();
用于在異步任務(wù)完成后執(zhí)行指定的動作。它允許你在任務(wù)完成時處理結(jié)果或處理異常。
CompletableFuture還提供了一系列方法來組合和處理多個異步任務(wù)的結(jié)果。
CompletableFuture<Void> allFuture = CompletableFuture.allOf(future1, future2, future3);
allOf方法接收一組CompletableFuture對象作為參數(shù),并返回一個新的CompletableFuture對象,該對象在所有給定的CompletableFuture都完成時完成。這樣我們可以等待所有任務(wù)都完成后再進行下一步操作。
CompletableFuture<Object> anyFuture = CompletableFuture.anyOf(future1, future2, future3);
anyOf方法與allOf類似,不同之處在于它返回的CompletableFuture對象在任何一個給定的CompletableFuture完成時就完成。這樣我們可以獲取最先完成的任務(wù)的結(jié)果。
CompletableFuture<U> combinedFuture = future1.thenCombine(future2, (result1, result2) -> combineResults(result1, result2));
thenCombine方法接收兩個CompletableFuture對象和一個函數(shù)作為參數(shù),用于指定當(dāng)這兩個CompletableFuture都完成時如何處理它們的結(jié)果。返回的新的CompletableFuture對象將接收到計算后的結(jié)果。
CompletableFuture<U> resultFuture = future1.applyToEither(future2, result -> processResult(result));
applyToEither方法用于獲取兩個CompletableFuture中任意一個完成的結(jié)果,并對該結(jié)果進行處理。它接收一個Function函數(shù)作為參數(shù),并返回一個新的CompletableFuture對象。
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> 10);CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> 20);future1.acceptEither(future2, result -> { System.out.println("Result: " + result);});
用于在兩個 CompletableFuture 對象中任意一個完成時執(zhí)行指定的操作。該方法接收兩個參數(shù):另一個 CompletableFuture 對象和一個消費者函數(shù)(Consumer)。當(dāng)其中任何一個 CompletableFuture 完成時,將其結(jié)果作為參數(shù)傳遞給消費者函數(shù)進行處理。
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> 42);CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "Hello");CompletableFuture<Void> combinedFuture = future1.runAfterBoth(future2, () -> { System.out.println("Both futures completed");});combinedFuture.join();
用于在兩個異步任務(wù)都完成后執(zhí)行指定的動作,需要注意的是,runAfterBoth() 方法是一個非阻塞方法,動作將在兩個異步任務(wù)都完成后立即執(zhí)行。
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } return 42;});CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } return "Hello";});CompletableFuture<Void> eitherFuture = future1.runAfterEither(future2, () -> { System.out.println("One of the futures completed");});eitherFuture.join();
用于在兩個異步任務(wù)中任意一個完成后執(zhí)行指定的動作。
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> 42);CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "Hello");CompletableFuture<Void> thenAcceptBothFuture = future1.thenAcceptBoth(future2, (result1, result2) -> { System.out.println("Action executed with thenAcceptBoth(): " + result1 + ", " + result2);});thenAcceptBothFuture.join();
用于在兩個異步任務(wù)都完成后執(zhí)行指定的動作。它的作用是接收兩個異步任務(wù)的結(jié)果,并將結(jié)果作為參數(shù)傳遞給指定的消費者函數(shù)。
CompletableFuture提供了多種方式來處理異步任務(wù)的異常情況。
CompletableFuture<U> exceptionHandledFuture = future.exceptionally(ex -> handleException(ex));
通過exceptionally方法,我們可以對CompletableFuture的異常情況進行處理。它接收一個Function函數(shù)作為參數(shù),用于處理異常并返回一個新的CompletableFuture對象。
CompletableFuture<U> handledFuture = future.handle((result, ex) -> handleResult(result, ex));
handle方法可以同時處理正常結(jié)果和異常情況。它接收一個BiFunction函數(shù)作為參數(shù),用于處理結(jié)果和異常,并返回一個新的CompletableFuture對象。
future.completeExceptionally();
異常地完成 CompletableFuture,將結(jié)果設(shè)置為一個異常。
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> { throw new RuntimeException("Something went wrong");});boolean completedExceptionally = future.isCompletedExceptionally();System.out.println("Is completed exceptionally: " + completedExceptionally);
該方法返回一個布爾值,表示當(dāng)前異步任務(wù)是否已經(jīng)異常完成。
CompletableFuture<Integer> future = new CompletableFuture<>();future.obtrudeException(new RuntimeException("Something went wrong"));boolean completedExceptionally = future.isCompletedExceptionally();System.out.println("Is completed exceptionally: " + completedExceptionally);
用于強制將指定的異常作為異步任務(wù)的結(jié)果,調(diào)用 obtrudeException(Throwable ex) 方法后,異步任務(wù)將立即完成,并將指定的異常作為結(jié)果返回。
future.join()
join() 方法不會拋出已檢查異常,因為它是基于 CompletableFuture 類設(shè)計的,如果異步任務(wù)拋出異常,join() 方法會將該異常包裝在 CompletionException 中并拋出。
future.get()
get() 方法會拋出一個 InterruptedException 異常和一個 ExecutionException 異常,前者表示獲取結(jié)果時被中斷,后者表示獲取結(jié)果時任務(wù)本身拋出了異常。
future.get(1,TimeUnit.Hours)
有異常則拋出異常,最長等待一個小時,一個小時之后,如果還沒有數(shù)據(jù),則異常。
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> { // 異步任務(wù)邏輯 return 42;});int result = future.getNow(0); // 獲取異步操作的結(jié)果,如果尚未完成,則返回默認值0System.out.println("Result: " + result);
getNow(T value) 是 CompletableFuture 類的一個方法,用于獲取異步操作的結(jié)果,如果異步操作尚未完成,則返回給定的默認值,該方法會立即返回結(jié)果,不會阻塞當(dāng)前線程。
CompletableFuture也支持超時控制和取消操作,以便更好地管理異步任務(wù)的執(zhí)行。
CompletableFuture<U> timeoutFuture = future.completeOnTimeout(defaultResult, timeout, timeUnit);
completeOnTimeout方法在指定的超時時間內(nèi)等待CompletableFuture的完成,如果超時則將其設(shè)置為默認結(jié)果。它返回一個新的CompletableFuture對象。
boolean isCancelled = future.cancel(true);
cancel方法可用于取消CompletableFuture的執(zhí)行。它接收一個boolean參數(shù),指示是否中斷正在執(zhí)行的任務(wù)。返回值表示是否成功取消了任務(wù)。
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> { // 異步任務(wù)邏輯 return 42;});future.cancel(true); // 取消異步任務(wù)boolean isCancelled = future.isCancelled();System.out.println("Is cancelled: " + isCancelled);
isCancelled() 是 CompletableFuture 類的一個方法,用于判斷當(dāng)前異步任務(wù)是否已被取消。如果異步任務(wù)已被取消,則返回 true;否則返回 false。
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> 10);CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> 20);CompletableFuture<Integer> combinedFuture = future1.thenCombine(future2, (result1, result2) -> result1 + result2);int numberOfDependents = combinedFuture.getNumberOfDependents();System.out.println("Number of dependents: " + numberOfDependents);
getNumberOfDependents() 用于獲取當(dāng)前 CompletableFuture 對象所依賴的其他異步任務(wù)的數(shù)量。如果沒有任何依賴任務(wù),或者所有依賴任務(wù)已經(jīng)完成,則返回的數(shù)量為0。
future.complete("米飯");
complete(T value):該方法返回布爾值,表示是否成功地將結(jié)果設(shè)置到 CompletableFuture 中。如果 CompletableFuture 未完成,則將結(jié)果設(shè)置,并返回 true;如果 CompletableFuture 已經(jīng)完成,則不進行任何操作并返回 false。
CompletableFuture<Integer> future = new CompletableFuture<>();future.obtrudeValue(42);boolean completedNormally = future.isDone() && !future.isCompletedExceptionally();System.out.println("Is completed normally: " + completedNormally);
用于強制將指定的值作為異步任務(wù)的結(jié)果,調(diào)用 obtrudeValue(T value) 方法后,異步任務(wù)將立即完成,并將指定的值作為結(jié)果返回。
與 complete() 不同,obtrudeValue() 必須在任務(wù)已經(jīng)完成的情況下調(diào)用,否則會引發(fā) IllegalStateException 異常。并且complete() 方法對于已經(jīng)完成的任務(wù)會忽略額外的完成操作,并返回 false。而obtrudeValue() 方法即使任務(wù)已經(jīng)完成,仍然會強制使用新的結(jié)果值,并返回 true。
CompletableFuture<Integer> future = CompletableFuture.completedFuture(42);boolean done = future.isDone();System.out.println("Is done: " + done);
用于判斷當(dāng)前異步任務(wù)是否已經(jīng)完成(無論是正常完成還是異常完成)。
CompletableFuture也支持并發(fā)限制,以控制同時執(zhí)行的異步任務(wù)數(shù)量。
Executor executor = Executors.newFixedThreadPool(10);CompletableFuture<U> future = CompletableFuture.supplyAsync(() -> doSomething(), executor);
我們可以通過使用線程池來限制CompletableFuture的并發(fā)執(zhí)行數(shù)量。通過創(chuàng)建一個固定大小的線程池,并將其作為參數(shù)傳遞給CompletableFuture,就可以控制并發(fā)執(zhí)行任務(wù)的數(shù)量。
CompletableFuture類提供了許多方法,但實際上常用的方法只有幾個。為了方便記憶,以下是一些總結(jié)的規(guī)律:
掌握以上規(guī)律后,就可以基本記住大部分方法,剩下的其他方法可以單獨記憶。
本文詳細探討了 CompletableFuture 的原理和方法,學(xué)習(xí)了如何在任務(wù)完成后執(zhí)行操作、處理結(jié)果和轉(zhuǎn)換結(jié)果。
CompletableFuture是Java中強大的異步編程工具之一,合理利用它的方法和策略可以更好地處理異步任務(wù)和操作。
本文鏈接:http://www.tebozhan.com/showinfo-26-60983-0.htmlCompletableFuture深度解析
聲明:本網(wǎng)頁內(nèi)容旨在傳播知識,若有侵權(quán)等問題請及時與本網(wǎng)聯(lián)系,我們將在第一時間刪除處理。郵件:2376512515@qq.com