前面文章我們講解了ArrayBlockingQueue和LinkedBlockingQueue源碼,這篇文章開始講解SynchronousQueue源碼。從名字上就能看到ArrayBlockingQueue是基于數組實現的,而LinkedBlockingQueue是基于鏈表實現,而SynchronousQueue是基于什么數據結構實現的,看不來。
無論是ArrayBlockingQueue還是LinkedBlockingQueue都是起到緩沖隊列的作用,當消費者的消費速度跟不上時,任務就在隊列中堆積,需要等待消費者慢慢消費。
如果我們想要自己的任務快速執行,不要積壓在隊列中,該怎么辦? 今天的主角SynchronousQueue就派上用場了。
SynchronousQueue被稱為同步隊列,當生產者往隊列中放元素的時候,必須等待消費者把這個元素取走,否則一直阻塞。消費者取元素的時候,同理也必須等待生產者放隊列中放元素。
由于SynchronousQueue實現了BlockingQueue接口,而BlockingQueue接口中定義了幾組放數據和取數據的方法,來滿足不同的場景。
操作 | 拋出異常 | 返回特定值 | 一直阻塞 | 阻塞指定時間 |
放數據 | add() | offer() | put() | offer(e, time, unit) |
取數據(同時刪除數據) | remove() | poll() | take() | poll(time, unit) |
取數據(不刪除) | element() | peek() | 不支持 | 不支持 |
SynchronousQueue也會有針對這幾組放數據和取數據方法的具體實現。
Java線程池中的帶緩存的線程池就是基于SynchronousQueue實現的:
// 創建帶緩存的線程池ExecutorService executorService = Executors.newCachedThreadPool();
對應的源碼實現:
// 底層使用SynchronousQueue隊列處理任務public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());}
先看一下SynchronousQueue類里面有哪些屬性:
public class SynchronousQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { /** * 轉接器(棧和隊列的父類) */ abstract static class Transferer<E> { /** * 轉移(put和take都用這一個方法) * * @param e 元素 * @param timed 是否超時 * @param nanos 納秒 */ abstract E transfer(E e, boolean timed, long nanos); } /** * 棧實現類 */ static final class TransferStack<E> extends Transferer<E> { } /** * 隊列實現類 */ static final class TransferQueue<E> extends Transferer<E> { }}
SynchronousQueue底層是基于Transferer抽象類實現的,放數據和取數據的邏輯都耦合在transfer()方法中。而Transferer抽象類又有兩個實現類,分別是基于棧結構實現和基于隊列實現。
SynchronousQueue常用的初始化方法有兩個:
/** * 無參構造方法 */BlockingQueue<Integer> blockingQueue1 = new SynchronousQueue<>();/** * 有參構造方法,指定是否使用公平鎖(默認使用非公平鎖) */BlockingQueue<Integer> blockingQueue2 = new SynchronousQueue<>(true);
再看一下對應的源碼實現:
/** * 無參構造方法 */public SynchronousQueue() { this(false);}/** * 有參構造方法,指定是否使用公平鎖 */public SynchronousQueue(boolean fair) { transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();}
可以看出SynchronousQueue的無參構造方法默認使用的非公平策略,有參構造方法可以指定使用公平策略。操作策略:
/** * 棧實現 */static final class TransferStack<E> extends Transferer<E> { /** * 頭節點(也是棧頂節點) */ volatile SNode head; /** * 棧節點類 */ static final class SNode { /** * 當前操作的線程 */ volatile Thread waiter; /** * 節點值(取數據的時候,該字段為null) */ Object item; /** * 節點模式(也叫操作類型) */ int mode; /** * 后繼節點 */ volatile SNode next; /** * 匹配到的節點 */ volatile SNode match; }}
節點模式有以下三種:
類型值 | 類型描述 | 作用 |
0 | REQUEST | 表示取數據 |
1 | DATA | 表示放數據 |
2 | FULFILLING | 表示正在執行中(比如取數據的線程正在匹配放數據的線程) |
圖片
transfer()方法中,把放數據和取數據的邏輯耦合在一塊了,邏輯有點繞,不過核心邏輯就四點,把握住就能豁然開朗。其實就是從棧頂壓入,從棧頂彈出。
詳細流程如下:
/** * 轉移(put和take都用這一個方法) * * @param e 元素(取數據的時候,元素為null) * @param timed 是否超時 * @param nanos 納秒 */E transfer(E e, boolean timed, long nanos) { SNode s = null; // 1. e為null,表示要取數據,否則是放數據 int mode = (e == null) ? REQUEST : DATA; for (; ; ) { SNode h = head; // 2. 如果本次操作跟棧頂節點模式相同(都是取數據,或者都是放數據),就把本次操作包裝成SNode,壓入棧頂 if (h == null || h.mode == mode) { if (timed && nanos <= 0) { if (h != null && h.isCancelled()) { casHead(h, h.next); } else { return null; } // 3. 把本次操作包裝成SNode,壓入棧頂,并掛起當前線程 } else if (casHead(h, s = snode(s, e, h, mode))) { // 4. 掛起當前線程 SNode m = awaitFulfill(s, timed, nanos); if (m == s) { clean(s); return null; } // 5. 當前線程被喚醒后,如果棧頂有了新節點,就刪除當前節點 if ((h = head) != null && h.next == s) { casHead(h, s.next); } return (E) ((mode == REQUEST) ? m.item : s.item); } // 6. 如果棧頂節點類型跟本次操作不同,并且模式不是FULFILLING類型 } else if (!isFulfilling(h.mode)) { if (h.isCancelled()) { casHead(h, h.next); } // 7. 把本次操作包裝成SNode(類型是FULFILLING),壓入棧頂 else if (casHead(h, s = snode(s, e, h, FULFILLING | mode))) { // 8. 使用死循環,直到匹配到對應的節點 for (; ; ) { // 9. 遍歷下個節點 SNode m = s.next; // 10. 如果節點是null,表示遍歷到末尾,設置棧頂節點是null,結束。 if (m == null) { casHead(s, null); s = null; break; } SNode mn = m.next; // 11. 如果棧頂的后繼節點跟棧頂節點匹配成功,就刪除這兩個節點,結束。 if (m.tryMatch(s)) { casHead(s, mn); return (E) ((mode == REQUEST) ? m.item : s.item); } else { // 12. 如果沒有匹配成功,就刪除棧頂的后繼節點,繼續匹配 s.casNext(m, mn); } } } } else { // 13. 如果棧頂節點類型跟本次操作不同,并且是FULFILLING類型, // 就再執行一遍上面第8步for循環中的邏輯(很少概率出現) SNode m = h.next; if (m == null) { casHead(h, null); } else { SNode mn = m.next; if (m.tryMatch(h)) { casHead(h, mn); } else { h.casNext(m, mn); } } } }}
不用關心細枝末節,把握住代碼核心邏輯即可。 再看一下第4步,掛起線程的代碼邏輯: 核心邏輯就兩條:
/** * 等待執行 * * @param s 節點 * @param timed 是否超時 * @param nanos 超時時間 */SNode awaitFulfill(SNode s, boolean timed, long nanos) { // 1. 計算超時時間 final long deadline = timed ? System.nanoTime() + nanos : 0L; Thread w = Thread.currentThread(); // 2. 計算自旋次數 int spins = (shouldSpin(s) ? (timed ? maxTimedSpins : maxUntimedSpins) : 0); for (; ; ) { if (w.isInterrupted()) s.tryCancel(); // 3. 如果已經匹配到其他節點,直接返回 SNode m = s.match; if (m != null) return m; if (timed) { // 4. 超時時間遞減 nanos = deadline - System.nanoTime(); if (nanos <= 0L) { s.tryCancel(); continue; } } // 5. 自旋次數減一 if (spins > 0) spins = shouldSpin(s) ? (spins - 1) : 0; else if (s.waiter == null) s.waiter = w; // 6. 開始掛起當前線程 else if (!timed) LockSupport.park(this); else if (nanos > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanos); }}
再看一下匹配節點的tryMatch()方法邏輯: 作用就是喚醒棧頂節點,并當前節點傳遞給棧頂節點。
/** * 匹配節點 * * @param s 當前節點 */boolean tryMatch(SNode s) { if (match == null && UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) { Thread w = waiter; if (w != null) { waiter = null; // 1. 喚醒棧頂節點 LockSupport.unpark(w); } return true; } // 2. 把當前節點傳遞給棧頂節點 return match == s;}
/** * 隊列實現 */static final class TransferQueue<E> extends Transferer<E> { /** * 頭節點 */ transient volatile QNode head; /** * 尾節點 */ transient volatile QNode tail; /** * 隊列節點類 */ static final class QNode { /** * 當前操作的線程 */ volatile Thread waiter; /** * 節點值 */ volatile Object item; /** * 后繼節點 */ volatile QNode next; /** * 當前節點是否為數據節點 */ final boolean isData; }}
可以看出TransferQueue隊列是使用帶有頭尾節點的單鏈表實現的。 還有一點需要提一下,TransferQueue默認構造方法,會初始化頭尾節點,默認是空節點。
/** * TransferQueue默認的構造方法 */TransferQueue() { QNode h = new QNode(null, false); head = h; tail = h;}
隊列使用的公平策略,體現在,每次操作的時候,都是從隊尾壓入,從隊頭彈出。 詳細流程如下:
/** * 轉移(put和take都用這一個方法) * * @param e 元素(取數據的時候,元素為null) * @param timed 是否超時 * @param nanos 超時時間 */E transfer(E e, boolean timed, long nanos) { QNode s = null; // 1. e不為null,表示要放數據,否則是取數據 boolean isData = (e != null); for (; ; ) { QNode t = tail; QNode h = head; if (t == null || h == null) { continue; } // 2. 如果本次操作跟隊尾節點模式相同(都是取數據,或者都是放數據),就把本次操作包裝成QNode,壓入隊尾 if (h == t || t.isData == isData) { QNode tn = t.next; if (t != tail) { continue; } if (tn != null) { advanceTail(t, tn); continue; } if (timed && nanos <= 0) { return null; } // 3. 把本次操作包裝成QNode,壓入隊尾 if (s == null) { s = new QNode(e, isData); } if (!t.casNext(null, s)) { continue; } advanceTail(t, s); // 4. 掛起當前線程 Object x = awaitFulfill(s, e, timed, nanos); // 5. 當前線程被喚醒后,返回返回傳遞過來的節點值 if (x == s) { clean(t, s); return null; } if (!s.isOffList()) { advanceHead(t, s); if (x != null) { s.item = s; } s.waiter = null; } return (x != null) ? (E) x : e; } else { // 6. 如果本次操作跟隊尾節點模式不同,就從隊頭結點開始遍歷,找到模式相匹配的節點 QNode m = h.next; if (t != tail || m == null || h != head) { continue; } Object x = m.item; // 7. 把當前節點值e傳遞給匹配到的節點m if (isData == (x != null) || x == m || !m.casItem(x, e)) { advanceHead(h, m); continue; } // 8. 彈出隊頭節點,并喚醒節點m advanceHead(h, m); LockSupport.unpark(m.waiter); return (x != null) ? (E) x : e; } }}
看完了底層源碼,再看一下上層包裝好的工具方法。
放數據的方法有四個:
操作 | 拋出異常 | 返回特定值 | 阻塞 | 阻塞一段時間 |
放數據 | add() | offer() | put() | offer(e, time, unit) |
先看一下offer()方法源碼,其他放數據方法邏輯也是大同小異,底層都是調用的transfer()方法實現。 如果沒有匹配到合適的節點,offer()方法會直接返回false,表示插入失敗。
/** * offer方法入口 * * @param e 元素 * @return 是否插入成功 */public boolean offer(E e) { // 1. 判空,傳參不允許為null if (e == null) { throw new NullPointerException(); } // 2. 調用底層transfer方法 return transferer.transfer(e, true, 0) != null;}
再看一下另外三個添加元素方法源碼:
如果沒有匹配到合適的節點,add()方法會拋出異常,底層基于offer()實現。
/** * add方法入口 * * @param e 元素 * @return 是否添加成功 */public boolean add(E e) { if (offer(e)) { return true; } else { throw new IllegalStateException("Queue full"); }}
如果沒有匹配到合適的節點,put()方法會一直阻塞,直到有其他線程取走數據,才能添加成功。
/** * put方法入口 * * @param e 元素 */public void put(E e) throws InterruptedException { // 1. 判空,傳參不允許為null if (e == null) { throw new NullPointerException(); } // 2. 調用底層transfer方法 if (transferer.transfer(e, false, 0) == null) { Thread.interrupted(); throw new InterruptedException(); }}
再看一下offer(e, time, unit)方法源碼,如果沒有匹配到合適的節點, offer(e, time, unit)方法會阻塞一段時間,然后返回false。
/** * offer方法入口 * * @param e 元素 * @param timeout 超時時間 * @param unit 時間單位 * @return 是否添加成功 */public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { // 1. 判空,傳參不允許為null if (e == null) { throw new NullPointerException(); } // 2. 調用底層transfer方法 if (transferer.transfer(e, true, unit.toNanos(timeout)) != null) { return true; } if (!Thread.interrupted()) { return false; } throw new InterruptedException();}
彈出數據(取出數據并刪除)的方法有四個:
操作 | 拋出異常 | 返回特定值 | 阻塞 | 阻塞一段時間 |
取數據(同時刪除數據) | remove() | poll() | take() | poll(time, unit) |
看一下poll()方法源碼,其他方取數據法邏輯大同小異,底層都是調用的transfer方法實現。 poll()方法在彈出元素的時候,如果沒有匹配到合適的節點,直接返回null,表示彈出失敗。
/** * poll方法入口 */public E poll() { // 調用底層transfer方法 return transferer.transfer(null, true, 0);}
再看一下remove()方法源碼,如果沒有匹配到合適的節點,remove()會拋出異常。
/** * remove方法入口 */public E remove() { // 1. 直接調用poll方法 E x = poll(); // 2. 如果取到數據,直接返回,否則拋出異常 if (x != null) { return x; } else { throw new NoSuchElementException(); }}
再看一下take()方法源碼,如果沒有匹配到合適的節點,take()方法就一直阻塞,直到被喚醒。
/** * take方法入口 */public E take() throws InterruptedException { // 調用底層transfer方法 E e = transferer.transfer(null, false, 0); if (e != null) { return e; } Thread.interrupted(); throw new InterruptedException();}
再看一下poll(time, unit)方法源碼,如果沒有匹配到合適的節點, poll(time, unit)方法會阻塞指定時間,然后然后null。
/** * poll方法入口 * * @param timeout 超時時間 * @param unit 時間單位 * @return 元素 */public E poll(long timeout, TimeUnit unit) throws InterruptedException { // 調用底層transfer方法 E e = transferer.transfer(null, true, unit.toNanos(timeout)); if (e != null || !Thread.interrupted()) { return e; } throw new InterruptedException();}
再看一下查看數據源碼,查看數據,并不刪除數據。
操作 | 拋出異常 | 返回特定值 | 阻塞 | 阻塞一段時間 |
取數據(不刪除) | element() | peek() | 不支持 | 不支持 |
先看一下peek()方法源碼,直接返回null,SynchronousQueue不支持這種操作。
/** * peek方法入口 */public E peek() { return null;}
再看一下element()方法源碼,底層調用的也是peek()方法,也是不支持這種操作。
/** * element方法入口 */public E element() { // 1. 調用peek方法查詢數據 E x = peek(); // 2. 如果查到數據,直接返回 if (x != null) { return x; } else { // 3. 如果沒找到,則拋出異常 throw new NoSuchElementException(); }}
這篇文章講解了SynchronousQueue阻塞隊列的核心源碼,了解到SynchronousQueue隊列具有以下特點:
本文鏈接:http://www.tebozhan.com/showinfo-26-72426-0.html沒研究過SynchronousQueue源碼,就別寫精通線程池
聲明:本網頁內容旨在傳播知識,若有侵權等問題請及時與本網聯系,我們將在第一時間刪除處理。郵件:2376512515@qq.com