AVt天堂网 手机版,亚洲va久久久噜噜噜久久4399,天天综合亚洲色在线精品,亚洲一级Av无码毛片久久精品

當前位置:首頁 > 科技  > 軟件

獲取雙異步返回值時,如何保證主線程不阻塞?

來源: 責編: 時間:2024-01-25 10:41:34 267觀看
導讀一、前情提要在上一篇文章中,使用雙異步后,如何保證數據一致性?,通過Future獲取異步返回值,輪詢判斷Future狀態,如果執行完畢或已取消,則通過get()獲取返回值,get()是阻塞的方法,因此會阻塞當前線程,如果通過new Runnable()執行

Lsa28資訊網——每日最新資訊28at.com

一、前情提要

在上一篇文章中,使用雙異步后,如何保證數據一致性?,通過Future獲取異步返回值,輪詢判斷Future狀態,如果執行完畢或已取消,則通過get()獲取返回值,get()是阻塞的方法,因此會阻塞當前線程,如果通過new Runnable()執行get()方法,那么還是需要返回AsyncResult,然后再通過主線程去get()獲取異步線程返回結果。Lsa28資訊網——每日最新資訊28at.com

寫法很繁瑣,還會阻塞主線程。Lsa28資訊網——每日最新資訊28at.com

下面是FutureTask異步執行流程圖:Lsa28資訊網——每日最新資訊28at.com

Lsa28資訊網——每日最新資訊28at.com

二、JDK8的CompletableFuture

1、ForkJoinPool

Java8中引入了CompletableFuture,它實現了對Future的全面升級,可以通過回調的方式,獲取異步線程返回值。Lsa28資訊網——每日最新資訊28at.com

CompletableFuture的異步執行通過ForkJoinPool實現, 它使用守護線程去執行任務。Lsa28資訊網——每日最新資訊28at.com

ForkJoinPool在于可以充分利用多核CPU的優勢,把一個任務拆分成多個小任務,把多個小任務放到多個CPU上并行執行,當多個小任務執行完畢后,再將其執行結果合并起來。Lsa28資訊網——每日最新資訊28at.com

Future的異步執行是通過ThreadPoolExecutor實現的。Lsa28資訊網——每日最新資訊28at.com

Lsa28資訊網——每日最新資訊28at.com

2、從ForkJoinPool和ThreadPoolExecutor探索CompletableFuture和Future的區別

  • ForkJoinPool中的每個線程都會有一個隊列,而ThreadPoolExecutor只有一個隊列,并根據queue類型不同,細分出各種線程池;
  • ForkJoinPool在使用過程中,會創建大量的子任務,會進行大量的gc,但是ThreadPoolExecutor不需要,因為ThreadPoolExecutor是任務分配平均的;
  • ThreadPoolExecutor中每個異步線程之間是相互獨立的,當執行速度快的線程執行完畢后,它就會一直處于空閑的狀態,等待其它線程執行完畢;
  • ForkJoinPool中每個異步線程之間并不是絕對獨立的,在ForkJoinPool線程池中會維護一個隊列來存放需要執行的任務,當線程自身任務執行完畢后,它會從其它線程中獲取未執行的任務并幫助它執行,直至所有線程執行完畢。

因此,在多線程任務分配不均時,ForkJoinPool的執行效率更高。但是,如果任務分配均勻,ThreadPoolExecutor的執行效率更高,因為ForkJoinPool會創建大量子任務,并對其進行大量的GC,比較耗時。Lsa28資訊網——每日最新資訊28at.com

三、通過CompletableFuture優化 “通過Future獲取異步返回值”

1、通過Future獲取異步返回值關鍵代碼

(1)將異步方法的返回值改為Future<Integer>,將返回值放到new AsyncResult<>();中

@Async("async-executor")public void readXls(String filePath, String filename) {    try {     // 此代碼為簡化關鍵性代碼        List<Future<Integer>> futureList = new ArrayList<>();        for (int time = 0; time < times; time++) {            Future<Integer> sumFuture = readExcelDataAsyncFutureService.readXlsCacheAsync();            futureList.add(sumFuture);        }    }catch (Exception e){        logger.error("readXlsCacheAsync---插入數據異常:",e);    }}
@Async("async-executor")public Future<Integer> readXlsCacheAsync() {    try {        // 此代碼為簡化關鍵性代碼        return new AsyncResult<>(sum);    }catch (Exception e){        return new AsyncResult<>(0);    }}

(2)通過Future<Integer>.get()獲取返回值

public static boolean getFutureResult(List<Future<Integer>> futureList, int excelRow) {    int[] futureSumArr = new int[futureList.size()];    for (int i = 0;i<futureList.size();i++) {        try {            Future<Integer> future = futureList.get(i);            while (true) {                if (future.isDone() && !future.isCancelled()) {                    Integer futureSum = future.get();                    logger.info("獲取Future返回值成功"+"----Future:" + future                            + ",Result:" + futureSum);                    futureSumArr[i] += futureSum;                    break;                } else {                    logger.info("Future正在執行---獲取Future返回值中---等待3秒");                    Thread.sleep(3000);                }            }        } catch (Exception e) {            logger.error("獲取Future返回值異常: ", e);        }    }        boolean insertFlag = getInsertSum(futureSumArr, excelRow);    logger.info("獲取所有異步線程Future的返回值成功,Excel插入結果="+insertFlag);    return insertFlag;}

2、通過CompletableFuture獲取異步返回值關鍵代碼

(1)將異步方法的返回值改為 int

@Async("async-executor")public void readXls(String filePath, String filename) { List<CompletableFuture<Integer>> completableFutureList = new ArrayList<>();    for (int time = 0; time < times; time++) {     // 此代碼為簡化關鍵性代碼        CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(new Supplier<Integer>() {         @Override         public Integer get() {             return readExcelDbJdk8Service.readXlsCacheAsyncMybatis();         }     }).thenApply((result) -> {// 回調方法         return thenApplyTest2(result);// supplyAsync返回值 * 1     }).thenApply((result) -> {         return thenApplyTest5(result);// thenApply返回值 * 1     }).exceptionally((e) -> { // 如果執行異常:         logger.error("CompletableFuture.supplyAsync----異常:", e);         return null;     });      completableFutureList.add(completableFuture);    }}
@Async("async-executor")public int readXlsCacheAsync() {    try {        // 此代碼為簡化關鍵性代碼        return sum;    }catch (Exception e){        return -1;    }}

(2)通過completableFuture.get()獲取返回值

public static boolean getCompletableFutureResult(List<CompletableFuture<Integer>> list, int excelRow){    logger.info("通過completableFuture.get()獲取每個異步線程的插入結果----開始");    int sum = 0;    for (int i = 0; i < list.size(); i++) {        Integer result = list.get(i).get();        sum += result;    }    boolean insertFlag = excelRow == sum;    logger.info("全部執行完畢,excelRow={},入庫={}, 數據是否一致={}",excelRow,sum,insertFlag);    return insertFlag;}

3、效率對比

(1)測試環境

  • 12個邏輯處理器的電腦;
  • Excel中包含10萬條數據;
  • Future的自定義線程池,核心線程數為24;
  • ForkJoinPool的核心線程數為24;

(2)統計四種情況下10萬數據入庫時間

  • 不獲取異步返回值
  • 通過Future獲取異步返回值
  • 通過CompletableFuture獲取異步返回值,默認ForkJoinPool線程池的核心線程數為本機邏輯處理器數量,測試電腦為12;
  • 通過CompletableFuture獲取異步返回值,修改ForkJoinPool線程池的核心線程數為24。

備注:因為CompletableFuture不阻塞主線程,主線程執行時間只有2秒,表格中統計的是異步線程全部執行完成的時間。Lsa28資訊網——每日最新資訊28at.com

(3)設置核心線程數

將核心線程數CorePoolSize設置成CPU的處理器數量,是不是效率最高的?Lsa28資訊網——每日最新資訊28at.com

// 獲取CPU的處理器數量int curSystemThreads = Runtime.getRuntime().availableProcessors() * 2;// 測試電腦是24

因為在接口被調用后,開啟異步線程,執行入庫任務,因為測試機最多同時開啟24線程處理任務,故將10萬條數據拆分成等量的24份,也就是10萬/24 = 4166,那么我設置成4200,是不是效率最佳呢?Lsa28資訊網——每日最新資訊28at.com

測試的過程中發現,好像真的是這樣的。Lsa28資訊網——每日最新資訊28at.com

自定義ForkJoinPool線程池
@Autowired@Qualifier("asyncTaskExecutor")private Executor asyncTaskExecutor;@Overridepublic void readXls(String filePath, String filename) {  List<CompletableFuture<Integer>> completableFutureList = new ArrayList<>();    for (int time = 0; time < times; time++) {  CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(new Supplier<Integer>() {         @Override         public Integer get() {             try {                 return readExcelDbJdk8Service.readXlsCacheAsync(sheet, row, start, finalEnd, insertBuilder);             } catch (Exception e) {                 logger.error("CompletableFuture----readXlsCacheAsync---異常:", e);                 return -1;             }         };     },asyncTaskExecutor);      completableFutureList.add(completableFuture); } // 不會阻塞主線程    CompletableFuture.allOf(completableFutureList.toArray(new CompletableFuture[completableFutureList.size()])).whenComplete((r,e) -> {        try {            int insertSum = getCompletableFutureResult(completableFutureList, excelRow);        } catch (Exception ex) {            return;        }    });}
自定義線程池
/** * 自定義異步線程池 */@Bean("asyncTaskExecutor")public AsyncTaskExecutor asyncTaskExecutor() {    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();    //設置線程名稱    executor.setThreadNamePrefix("asyncTask-Executor");    //設置最大線程數    executor.setMaxPoolSize(200);    //設置核心線程數    executor.setCorePoolSize(24);    //設置線程空閑時間,默認60    executor.setKeepAliveSeconds(200);    //設置隊列容量    executor.setQueueCapacity(50);    /**     * 當線程池的任務緩存隊列已滿并且線程池中的線程數目達到maximumPoolSize,如果還有任務到來就會采取任務拒絕策略     * 通常有以下四種策略:     * ThreadPoolExecutor.AbortPolicy:丟棄任務并拋出RejectedExecutionException異常。     * ThreadPoolExecutor.DiscardPolicy:也是丟棄任務,但是不拋出異常。     * ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列最前面的任務,然后重新嘗試執行任務(重復此過程)     * ThreadPoolExecutor.CallerRunsPolicy:重試添加當前的任務,自動重復調用 execute() 方法,直到成功     */    executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());    executor.initialize();    return executor;}

Lsa28資訊網——每日最新資訊28at.com

(4)統計分析

效率對比:Lsa28資訊網——每日最新資訊28at.com

③通過CompletableFuture獲取異步返回值(12線程) <  ②通過Future獲取異步返回值 <  ④通過CompletableFuture獲取異步返回值(24線程) <  ①不獲取異步返回值Lsa28資訊網——每日最新資訊28at.com

不獲取異步返回值時性能最優,這不廢話嘛~Lsa28資訊網——每日最新資訊28at.com

核心線程數相同的情況下,CompletableFuture的入庫效率要優于Future的入庫效率,10萬條數據大概要快4秒鐘,這還是相當驚人的,優化的價值就在于此。Lsa28資訊網——每日最新資訊28at.com

Lsa28資訊網——每日最新資訊28at.com

四、通過CompletableFuture.allOf解決阻塞主線程問題

1、語法

CompletableFuture.allOf(CompletableFuture的可變數組).whenComplete((r,e) -> {})。Lsa28資訊網——每日最新資訊28at.com

2、代碼實例

getCompletableFutureResult方法在 “3.2.2 通過completableFuture.get()獲取返回值”。Lsa28資訊網——每日最新資訊28at.com

// 不會阻塞主線程CompletableFuture.allOf(completableFutureList.toArray(new   CompletableFuture[completableFutureList.size()])).whenComplete((r,e) -> {    logger.info("全部執行完畢,解決主線程阻塞問題~");    try {        int insertSum = getCompletableFutureResult(completableFutureList, excelRow);    } catch (Exception ex) {        logger.error("全部執行完畢,解決主線程阻塞問題,異常:", ex);        return;    }});// 會阻塞主線程//getCompletableFutureResult(completableFutureList, excelRow);logger.info("CompletableFuture----會阻塞主線程嗎?");

Lsa28資訊網——每日最新資訊28at.com

五、CompletableFuture中花俏的語法糖

1、runAsync

runAsync 方法不支持返回值。Lsa28資訊網——每日最新資訊28at.com

可以通過runAsync執行沒有返回值的異步方法。Lsa28資訊網——每日最新資訊28at.com

不會阻塞主線程。Lsa28資訊網——每日最新資訊28at.com

// 分批異步讀取Excel內容并入庫int finalEnd = end;CompletableFuture.runAsync(() -> readExcelDbJdk8Service.readXlsCacheAsyncMybatis();

2、supplyAsync

supplyAsync也可以異步處理任務,傳入的對象實現了Supplier接口。將Supplier作為參數并返回CompletableFuture結果值,這意味著它不接受任何輸入參數,而是將result作為輸出返回。Lsa28資訊網——每日最新資訊28at.com

會阻塞主線程。Lsa28資訊網——每日最新資訊28at.com

supplyAsync()方法關鍵代碼:Lsa28資訊網——每日最新資訊28at.com

int finalEnd = end;CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(new Supplier<Integer>() {    @Override    public Integer get() {        return readExcelDbJdk8Service.readXlsCacheAsyncMybatis();    }});
@Overridepublic int readXlsCacheAsyncMybatis() {    // 不為人知的操作    // 返回異步方法執行結果即可 return 100;}

六、順序執行異步任務

1、thenRun

thenRun()不接受參數,也沒有返回值,與runAsync()配套使用,恰到好處。Lsa28資訊網——每日最新資訊28at.com

// JDK8的CompletableFutureCompletableFuture.runAsync(() -> readExcelDbJdk8Service.readXlsCacheAsyncMybatis()).thenRun(() -> logger.info("CompletableFuture----.thenRun()方法測試"));

Lsa28資訊網——每日最新資訊28at.com

2、thenAccept

thenAccept()接受參數,沒有返回值。Lsa28資訊網——每日最新資訊28at.com

supplyAsync + thenAcceptLsa28資訊網——每日最新資訊28at.com

  • 異步線程順序執行
  • supplyAsync的異步返回值,可以作為thenAccept的參數使用
  • 不會阻塞主線程
CompletableFuture.supplyAsync(new Supplier<Integer>() {    @Override    public Integer get() {        return readExcelDbJdk8Service.readXlsCacheAsyncMybatis();    }}).thenAccept(x -> logger.info(".thenAccept()方法測試:" + x));

Lsa28資訊網——每日最新資訊28at.com

但是,此時無法通過completableFuture.get()獲取supplyAsync的返回值了。Lsa28資訊網——每日最新資訊28at.com

3、thenApply

thenApply在thenAccept的基礎上,可以再次通過completableFuture.get()獲取返回值。Lsa28資訊網——每日最新資訊28at.com

supplyAsync + thenApply,典型的鏈式編程。Lsa28資訊網——每日最新資訊28at.com

  • 異步線程內方法順序執行。
  • supplyAsync 的返回值,作為第 1 個thenApply的參數,進行業務處理。
  • 第 1 個thenApply的返回值,作為第 2 個thenApply的參數,進行業務處理。
  • 最后,通過future.get()方法獲取最終的返回值。
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(new Supplier<Integer>() { @Override    public Integer get() {        return readExcelDbJdk8Service.readXlsCacheAsyncMybatis();    }}).thenApply((result) -> {    return thenApplyTest2(result);// supplyAsync返回值 * 2}).thenApply((result) -> {    return thenApplyTest5(result);// thenApply返回值 * 5});logger.info("readXlsCacheAsyncMybatis插入數據 * 2 * 5 = " + completableFuture.get());

Lsa28資訊網——每日最新資訊28at.com

七、CompletableFuture合并任務

  • thenCombine,多個異步任務并行處理,有返回值,最后合并結果返回新的CompletableFuture對象。
  • thenAcceptBoth,多個異步任務并行處理,無返回值。
  • acceptEither,多個異步任務并行處理,無返回值。
  • applyToEither,,多個異步任務并行處理,有返回值。

CompletableFuture合并任務的代碼實例,這里就不多贅述了,一些語法糖而已,大家切記陷入低水平勤奮的怪圈。Lsa28資訊網——每日最新資訊28at.com

八、CompletableFuture VS Future總結

本文中以下幾個方面對比了CompletableFuture和Future的差異:Lsa28資訊網——每日最新資訊28at.com

  • ForkJoinPool和ThreadPoolExecutor的實現原理,探索了CompletableFuture和Future的差異。
  • 通過代碼實例的形式簡單介紹了CompletableFuture中花俏的語法糖。
  • 通過CompletableFuture優化了 “通過Future獲取異步返回值”。
  • 通過CompletableFuture.allOf解決阻塞主線程問題。

Future提供了異步執行的能力,但Future.get()會通過輪詢的方式獲取異步返回值,get()方法還會阻塞主線程。Lsa28資訊網——每日最新資訊28at.com

輪詢的方式非常消耗CPU資源,阻塞的方式顯然與我們的異步初衷背道而馳。Lsa28資訊網——每日最新資訊28at.com

JDK8提供的CompletableFuture實現了Future接口,添加了很多Future不具備的功能,比如鏈式編程、異常處理回調函數、獲取異步結果不阻塞不輪詢、合并異步任務等。Lsa28資訊網——每日最新資訊28at.com

獲取異步線程結果后,我們可以通過添加事務的方式,實現Excel入庫操作的數據一致性。Lsa28資訊網——每日最新資訊28at.com

異步多線程情況下如何實現事務?Lsa28資訊網——每日最新資訊28at.com

有的小伙伴可能會說:Lsa28資訊網——每日最新資訊28at.com

這還不簡單?添加@Transactional注解,如果發生異常或入庫數據量不符,直接回滾就可以了~Lsa28資訊網——每日最新資訊28at.com

那么,真的是這樣嗎?我們下期見~Lsa28資訊網——每日最新資訊28at.com

本文鏈接:http://www.tebozhan.com/showinfo-26-67849-0.html獲取雙異步返回值時,如何保證主線程不阻塞?

聲明:本網頁內容旨在傳播知識,若有侵權等問題請及時與本網聯系,我們將在第一時間刪除處理。郵件:2376512515@qq.com

上一篇: Node問題:如何正確安裝nvm?Mac和Win雙教程!

下一篇: lowcode-cms開源社區源碼設計分享

標簽:
  • 熱門焦點
Top