本篇內容基本已經涵蓋了AQS的全部核心內容,本篇相比于上一篇補充了“中斷”。
實現鎖應該考慮的問題:
由此可以得出實現一把鎖,應該具備哪些邏輯:
帶著上面的思考,我們來看看AQS是怎么處理的。
在最早期java中的同步機制是通過關鍵字synchronized實現,這個鎖是java原生的,jvm層面實現的。在1.6之前synchronized的性能比較低,是一把純重量級鎖。
后來,Doug Lea開發并引入了java.util.concurrent包,這個包基本涵蓋了java并發操作的半壁江山,該包內的并發工具類基本是以AQS為基礎的,AQS提高了同步操作的性能,在性能上遠超當時的synchronized,后來synchronized做了優化,java1.6及之后兩者的性能就差不多了。
AQS的全稱為AbstractQueuedSynchronizer。
AQS其實是一個抽象類,它實現了線程掛起的邏輯,實現了線程存儲機制,實現了鎖的狀態邏輯,實現了線程喚醒的邏輯,卻只定義了線程搶鎖和釋放鎖的抽象,這樣做的目的是將搶鎖和釋放鎖的邏輯交給子類來實現,這樣有助于實現各種不同特性的鎖,比如共享鎖,獨占鎖,公平鎖,非公平鎖,可重入等。并且以模板方法模式將上述上鎖流程和釋放鎖流程封裝為固定模板方法。所以AQS就是一個多線程訪問共享資源的同步器框架。
AQS實現同步機制有兩種模式,一種是獨占模式,一種是共享模式。兩種模式分別提供提供兩個模板方法實現。四個模板方法為acquire,release,acquireShared,releaseShared。
接下來分別介紹這四個方法的邏輯。
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt();}
acquire方法是獨占模式上鎖的整個邏輯,這個方法是一個模板方法,其中的tryAcquire是獲取鎖的邏輯,這個方法是一個抽象方法,由具體的子類實現,如何獲取鎖,怎樣才算獲取到鎖這些問題子類自己決定,AQS不做處理。
addWaiter方法負責是線程存儲的邏輯,aqs里面存儲機制的核心是兩個隊列,等待隊列和條件隊列,它們用來保存被阻塞的線程,在這個方法中通過cas+自旋的方式將線程添加到等待隊列中。
先來介紹等待隊列,等待隊列的結構如下:
等待隊列是一個雙向鏈表,每個節點就是一個node對象,node是aqs類中的一個靜態內部類,它的屬性如下:
node{thread;prev;next;nextWaiter;waitStatus;}。
(1) thread是當前node節點所綁定的線程;
(2) prev是前置節點的引用;
(3) next是后置節點的引用;
(4) nextWaiter如果是等待隊列節點就標示獨占模式節點還是共享模式,如果是條件隊列節點就作為后置節點指針;
(5) waitStatus是節點的狀態,其狀態值如下:
AQS類自身也有幾個比較重要的屬性:
//正在持有鎖的線程private transient Thread exclusiveOwnerThread;//等待隊列的頭節點private transient volatile Node head;//等待隊列的尾節點private transient volatile Node tail;//鎖標識字段private volatile int state;
了解了等待隊列,接下來具體看看addWaiter方法的邏輯。
(1) 首先如果隊列還沒有初始化會先初始化隊列,初始化就是先創建一個空的node節點,把aqs里面的head和tail屬性指向這個空的node,初始化完成;
(2) 先創建一個node節點,默認屬性如下:
node{ thread=當前線程t1;prev;next;nextWaiter=獨占模式;waitStatus=0}
開始入隊操作,入隊就是cas+自旋的方式將tail指針指向新加入的node節點,并且把新加入的node和head建立雙向指針。
cas是保證原子性的,多線程操作的情況下,當前線程可能會操作失敗,自旋是為了失敗重試,保證一定能夠入隊成功。
入隊成功后,就要掛起線程了,acquireQueued方法就是掛起操作。
這個方法比較核心,線程掛起的邏輯和線程喚醒后的邏輯都在此方法中,源碼如下:
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
邏輯解析:
if (p == head && tryAcquire(arg))
上面介紹了等待隊列,等待隊列的head節點永遠是一個不綁定線程的節點,所以拿到前置節點后判斷是否為head節點,如果為head節點才有資格再次獲取鎖,可以發現如果隊列中已經有其他線程處于阻塞等待狀態,新入隊線程是在這個判斷中永遠會返回fasle。
這個判斷加在這里有什么用處呢?
有兩個用處:第一個是入隊后掛起前這個時間段中,可能鎖已經被釋放了,所以這里再次嘗試獲取鎖,這樣就不用阻塞掛起了;第二個用處是,這個判斷處于循環中,阻塞掛起的動作也是在循環中,當被喚醒后,線程會從被掛起的點繼續運行,會再次進入這個判斷,從而實現被喚醒的線程再次嘗試換取鎖的邏輯。
(3) 如果沒有獲取到鎖,那接下來就會進入這個方法shouldParkAfterFailedAcquire,這個方法的源碼如下::
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) return true; if (ws > 0) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
代碼邏輯為:
只有將當前node節點的前置節點設置為-1后,此方法才會返回true,從而會進入后面的parkAndCheckInterrupt()方法,這個方法就很簡單了,就是調用LockSupport類的park方法將線程阻塞掛起。
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
為什么在阻塞前一定要將當前node節點的前置節點置為-1?
waitStatus為-1代表可喚醒狀態,獨占模式下,AQS在喚醒被阻塞線程的時候,總是通過判斷head節點的waitStatus狀態,如果為可喚醒狀態代表head后面的節點可以被喚醒,否則不允許喚醒。
這樣做的好處是,當head節點后面線程獲取到鎖并出隊后,可以直接將head指針移動到第一個線程節點,然后將此節點上的前置指針刪除,將線程屬性刪除,作為新的head節點。
當線程調用park方法后,線程就阻塞在這里,當被喚醒后,線程也是從這個點繼續往下進行,此時依然處在循環中,這個時候會開始新一輪循環,從而再次進入嘗試獲取鎖的判斷,如果獲取到鎖,就出隊,否則再次進入阻塞掛起的方法進行掛起操作。
這里的設計是先搶鎖,搶到鎖后再出隊,避免在沒有搶到鎖的情況下不用再次入隊造成的時間消耗。
//獨占模式的鎖調用的釋放鎖邏輯 public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false;}
這個方法也是一個模板方法,tryRelease是釋放鎖的方法,它是抽象方法,具體由子類來實現。
釋放成功后就要喚醒被阻塞的線程,核心邏輯在下面這個方法中,源碼如下:
private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }
先看下整體邏輯,這兩段代碼的邏輯其實很簡單:
邏輯很容能看懂,但是這里有個問題,為什么前面有這段代碼:
if (h != null && h.waitStatus != 0) unparkSuccessor(h);
后面unparkSuccessor方法又有這一段代碼:
if (ws < 0) compareAndSetWaitStatus(node, ws, 0);
不難看出邏輯是waitStatus不為0進入unparkSuccessor方法,進入方法馬上把waitStatus改為0,這是在阻止后續的線程再進來。
那真正的用意是什么呢?
通過上面代碼可以知道釋放鎖邏輯和喚醒邏輯是分開的,看下面的時間軸:
上面這個場景是非公平鎖的場景,公平鎖說的是所有線程都要按照順序排隊獲取鎖,而非公平鎖說的是新進來的線程可以和剛被喚醒的線程搶鎖。
在非公平鎖的場景中,如果代碼塊中的邏輯執行的足夠快就有可能發生上面的情況,線程1和線程2都是都去喚醒同一個線程,所以這里通過將head節點的waitStatus改為0的方式將其他線程拒之門外,這樣就保證在head節點后面的線程只會由一個線程去喚醒。
//共享模式的鎖調用的上鎖邏輯 public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg);}
此方法同樣是一個模板方法,tryAcquireShared方法是抽象方法,供子類實現搶鎖的邏輯,doAcquireShared方法則是實現阻塞掛起和入隊,doAcquireShared方法源碼如下:
private void doAcquireShared(int arg) { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
private void setHeadAndPropagate(Node node, int propagate) { Node h = head; setHead(node); if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); } }
通過源碼會發現doAcquireShared這個方法合并了入隊和掛起兩個步驟,整體的邏輯基本和獨占模式一樣,接下來只介紹不同的地方。
第一個不同,入隊的時候創建的node節點為共享模式節點,即nextWaiter屬性的值不同。
第二個不同,獨占模式下線程被喚醒重新獲取到鎖后,就要出隊了,而共享模式下除了出隊,還會判斷是否資源充足,如果充足就喚醒下一個節點。
//共享模式的鎖調用的釋放鎖的邏輯 public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false;}
同樣此方法也是模板方法,tryReleaseShared方法是交給子類實現的釋放鎖的邏輯,doReleaseShared方法則是aqs自己實現的喚醒邏輯,喚醒邏輯和獨占模式下的喚醒邏輯大同小異,都是喚醒head節點的下一個節點綁定的線程,不再過多贅述。
總結一下獨占和共享模式在aqs中實現的最大不同是被喚醒的線程出隊后會在資源充足的情況下順便喚醒其后面節點的線程。
上面說過,AQS有兩個隊列,等待隊列和條件隊列,上面介紹了等待隊列,但是條件隊列一直未提,那么條件隊列是做什么的呢?
AQS內部有一個內部類ConditionObject,其內部維護了一個單向鏈表(先進先出),這個內部類內有兩個屬性:firstWaiter和lastWaiter分別指向單向鏈表的頭結點和尾節點,這個單向鏈表就是條件隊列,和等待隊列的不同處是它的頭節點是綁定線程的,條件隊列的結構如下
這個內部類主要的方法是如下三個,這里直接說每個方法的底層邏輯,源碼就不展示了,可以自己去查閱源碼。
首先先說下Condition整體的思維邏輯:
await()的邏輯:
signal()的邏輯:
條件達成后換隊的意思就是將條件隊里的頭節點移動到獨占模式的等待隊列中去,入隊的方式和獨占模式下入隊方式一樣,入隊之后會將當前節點的前一個節點的waitStatus置為-1,代表可喚醒。
signalAll()的邏輯:
看的出來Condition中的條件隊列依賴等待隊列,具體使用可以參考ReentrantLock。你會發現在ReentrantLock鎖里面使用Condition,就相當于在synchronized代碼塊中使用object類的wait方法和nottfyf。
為了更好的理解Condition,一起看下ArrayBlockingQueue的實現,它是一個數組實現的先進先出的有界阻塞隊列,隊列滿,入隊者等待,隊列空,出隊者等待。
這個隊列有兩個重要的特點:先進先出和隊列有界。
為保證先進先出,需要加鎖處理,獲取到鎖的線程才有資格向隊列中放數據或者取出數據。
那如何保證隊列有界的情況下等待處理呢?這個時候就用到Condition了,它的邏輯是這樣的,所有想向隊列添加數據的和所有想從隊列取數據的線程一起競爭鎖,拿到鎖的那個線程才有資格操作,ArrayBlockingQueue維護里兩個Condition對象,也就相當于維護兩個條件隊列,如果是添加數據的某個線程搶到了鎖,在操作添加的時候,發現隊列已滿,此時該線程無法將數據插進去,需要等待有一個數據被取走后才能做添加操作,但是該線程占有鎖資源,取數據的線程進不來,所以就無法進行下去,ArrayBlockingQueue的做法是將該線程放入條件隊列阻塞掛起,等到有一個數據被取走后,再把條件隊列中的掛起的線程搬運到鎖的等待隊里上去,從而再次獲取排隊搶鎖的資格。
之所以維護兩個Condition條件隊列是為了將添加數據的線程和取數據的線程分開,根據不同的條件操作不同的條件隊列。
有沒有發現,這不就是synchronized代碼塊中的object類的wait方法嗎?
但是不同點是調用object類的wait方法阻塞的線程,要么只有一個被釋放,要么全部釋放。
而Condition就不同了,因為你可以聲明多個Condition對象,將不同條件下阻塞的線程放入不同的Condition對象,釋放的時候也按照條件釋放,這就真正意義上實現了按條件釋放。
我說的釋放是重新獲取排隊搶奪資源的資格。
不可中斷說的是阻塞狀態不能被終止。
我們知道synchronized是不可中斷的鎖,當線程因為競爭資源失敗而進入阻塞狀態后,唯一能讓該線程結束阻塞的方式就是持有鎖資源的線程處理完成后,被阻塞的線程被喚醒。
synchronized中的阻塞狀態不可中斷是因為線程的阻塞喚醒是由操作系統來管理,而AQS中的阻塞之所以支持中斷是因為上鎖是通過LockSupport類的park方法來實現的,當線程調用park方法阻塞后,如果調用此線程interrupt方法,阻塞狀態就會中斷,也就是阻塞中的線程會被喚醒。
但是調用acquire上鎖的時候如果沒有獲取到鎖就會被阻塞,此時如果調用被阻塞線程的interrupt方法就會喚醒這個線程,但是此時被喚醒的線程處于循環之中,會重新去搶鎖,如果獲取不到依然會再次阻塞,也就是說acquire方法中被阻塞的線程被中斷后只不過會讓線程提前加入搶鎖,但是并不會增加搶到鎖的概率,因為只有阻塞隊列的頭節點才有資格搶鎖。
這里介紹一個知識點:常見的可中斷方法sleep,wait,park方法,這三個方法都會使得線程處于靜止狀態,此時調用interrupt方法,會中斷其靜止狀態,線程從而處于重新被激活的狀態,不同的是被激活后的線程的中斷狀態是不一樣的,sleep和wait方法被激活后,線程的中斷狀態為false,而park方法被激活后,線程的中斷狀態為true,這是需要注意的。
按照上面的說法AQS雖然支持中斷,但是似乎沒什么用,其實AQS還有一個相對于acquire方法不那么常用的方法tryAcquireNanos方法。
跟一下這個方法進入doAcquireNanos方法,主要邏輯就在這個方法中,其實和tryAcquireNanos和acquire一樣,都是搶鎖,入隊,阻塞,喚醒那一套邏輯。
不同的是tryAcquireNanos方法還具備兩個技能:
private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { if (nanosTimeout <= 0L) return false; final long deadline = System.nanoTime() + nanosTimeout; final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return true; } nanosTimeout = deadline - System.nanoTime(); if (nanosTimeout <= 0L) return false; if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); if (Thread.interrupted()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
上面的代碼可以清楚的看到阻塞操作是通過這段代碼實現:
LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(10));
parkNanos方法相對與park方法的區別就是parkNanos方法可以指定阻塞時間。
而下面這段代碼實現的就是阻塞被中斷的時候主動拋出InterruptedException異常,可以讓方法外部捕獲到這個異常,從而達到真正的阻塞中斷。
if (Thread.interrupted()) throw new InterruptedException();
本文鏈接:http://www.tebozhan.com/showinfo-26-15596-0.html并發編程:你真的能回答好AQS嗎(補充中斷機制)
聲明:本網頁內容旨在傳播知識,若有侵權等問題請及時與本網聯系,我們將在第一時間刪除處理。郵件:2376512515@qq.com