任務編排(Task Orchestration)是指管理和控制多個任務的執行流程,確保它們按照預定的順序正確執行。
在復雜的業務場景中,任務間通常存在依賴關系,也就是某個任務會依賴另一個任務的執行結果,在這種情況下,我們需要通過任務編排,來確保任務按照正確的順序進行執行。
例如,以下任務的執行順序:
其中,任務二要等任務一執行完才能執行,而任務四要等任務二和任務三全部執行完才能執行。
任務編排和控制的主要手段有以下:
但如果是全局線程池,想要實現精準的任務編排,只能使用 Future 或 CompletableFuture。
使用 Future 實現上述 4 個任務的編排(任務二要等任務一執行完才能執行,而任務四要等任務二和任務三全部執行完才能執行):
import java.util.concurrent.*;import java.util.Arrays;public class TaskOrchestrator { public static void main(String[] args) { // 創建一個線程池來執行任務 ExecutorService executor = Executors.newFixedThreadPool(5); // 定義任務一 Future<String> taskOneResult = executor.submit(new Callable<String>() { @Override public String call() throws Exception { Thread.sleep(2000); // 模擬耗時操作 return "Task One Result"; } }); // 定義任務二,依賴任務一 Future<String> taskTwoResult = executor.submit(new Callable<String>() { @Override public String call() throws Exception { String result = taskOneResult.get(); // 阻塞等待任務一完成 Thread.sleep(1000); // 模擬耗時操作 return "Task Two Result, got: " + result; } }); // 定義任務三 Future<String> taskThreeResult = executor.submit(new Callable<String>() { @Override public String call() throws Exception { Thread.sleep(1500); // 模擬耗時操作 return "Task Three Result"; } }); // 定義任務四,依賴任務二和任務三 Future<String> taskFourResult = executor.submit(new Callable<String>() { @Override public String call() throws Exception { String taskTwoOutput = taskTwoResult.get(); // 阻塞等待任務二完成 String taskThreeOutput = taskThreeResult.get(); // 阻塞等待任務三完成 Thread.sleep(500); // 模擬耗時操作 return "Task Four Result, got: " + taskTwoOutput + " and " + taskThreeOutput; } }); // 打印最終結果 try { System.out.println("Final Result: " + taskFourResult.get()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } }}
CompletableFutrue 提供的方法有很多,但最常用和最實用的核心方法只有以下幾個:
接下來,使用 CompletableFuture 實現上述 4 個任務的編排(任務二要等任務一執行完才能執行,而任務四要等任務二和任務三全部執行完才能執行):
import java.util.concurrent.CompletableFuture;import java.util.concurrent.ExecutionException;public class CompletableFutureExample { public static void main(String[] args) { // 任務一:返回 "Task 1 result" CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> { try { // 模擬耗時操作 Thread.sleep(1000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RuntimeException(e); } return "Task 1 result"; }); // 任務二:依賴任務一,返回 "Task 2 result" + 任務一的結果 CompletableFuture<String> task2 = task1.handle((result1, throwable) -> { try { // 模擬耗時操作 Thread.sleep(1000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RuntimeException(e); } return "Task 2 result " + result1; }); // 任務三:和任務一、任務二并行執行,返回 "Task 3 result" CompletableFuture<String> task3 = CompletableFuture.supplyAsync(() -> { try { // 模擬耗時操作 Thread.sleep(800); // 任務三可能比任務二先完成 } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RuntimeException(e); } return "Task 3 result"; }); // 任務四:依賴任務二和任務三,等待它們都完成后執行,返回 "Task 4 result" + 任務二和任務三的結果 CompletableFuture<String> task4 = CompletableFuture.allOf(task2, task3).handle((res, throwable) -> { try { // 這里不需要顯式等待,因為 allOf 已經保證了它們完成 return "Task 4 result with " + task2.get() + " and " + task3.get(); } catch (InterruptedException | ExecutionException e) { throw new RuntimeException(e); } }); // 獲取任務四的結果并打印 String finalResult = task4.join(); System.out.println(finalResult); }}
本文鏈接:http://www.tebozhan.com/showinfo-26-112712-0.html面試官:如何實現線程池任務編排?
聲明:本網頁內容旨在傳播知識,若有侵權等問題請及時與本網聯系,我們將在第一時間刪除處理。郵件:2376512515@qq.com