AVt天堂网 手机版,亚洲va久久久噜噜噜久久4399,天天综合亚洲色在线精品,亚洲一级Av无码毛片久久精品

當前位置:首頁 > 科技  > 軟件

深入理解 Netty FastThreadLocal

來源: 責編: 時間:2023-10-20 10:02:57 503觀看
導讀一、前言最近在學習Netty相關的知識,在看到Netty FastThreadLocal章節中,回想起一起線上詭異問題。問題描述:外銷業務獲取用戶信息判斷是否支持https場景下,獲取的用戶信息有時候竟然是錯亂的。問題分析:使用ThreadLocal保

wkO28資訊網——每日最新資訊28at.com

一、前言

最近在學習Netty相關的知識,在看到Netty FastThreadLocal章節中,回想起一起線上詭異問題。wkO28資訊網——每日最新資訊28at.com

問題描述:外銷業務獲取用戶信息判斷是否支持https場景下,獲取的用戶信息有時候竟然是錯亂的。wkO28資訊網——每日最新資訊28at.com

問題分析:使用ThreadLocal保存用戶信息時,未能及時進行remove()操作,而Tomcat工作線程是基于線程池的,會出現線程重用情況,所以獲取的用戶信息可能是之前線程遺留下來的。wkO28資訊網——每日最新資訊28at.com

問題修復:ThreadLocal使用完之后及時remove()、ThreadLocal使用之前也進行remove()雙重保險操作。wkO28資訊網——每日最新資訊28at.com

接下來,我們繼續深入了解下JDK ThreadLocal和Netty FastThreadLocal吧。wkO28資訊網——每日最新資訊28at.com

二、JDK ThreadLocal介紹

ThreadLocal是JDK提供的一個方便對象在本線程內不同方法中傳遞、獲取的類。用它定義的變量,僅在本線程中可見,不受其他線程的影響,與其他線程相互隔離wkO28資訊網——每日最新資訊28at.com

那具體是如何實現的呢?如圖1所示,每個線程都會有個ThreadLocalMap實例變量,其采用懶加載的方式進行創建,當線程第一次訪問此變量時才會去創建。wkO28資訊網——每日最新資訊28at.com

ThreadLocalMap使用線性探測法存儲ThreadLocal對象及其維護的數據,具體操作邏輯如下:wkO28資訊網——每日最新資訊28at.com

假設有一個新的ThreadLocal對象,通過hash計算它應存儲的位置下標為x。wkO28資訊網——每日最新資訊28at.com

此時發現下標x對應位置已經存儲了其他的ThreadLocal對象,則它會往后尋找,步長為1,下標變更為x+1。wkO28資訊網——每日最新資訊28at.com

接下來發現下標x+1對應位置也已經存儲了其他的ThreadLocal對象,同理則它會繼續往后尋找,下標變更為x+2。wkO28資訊網——每日最新資訊28at.com

直到尋找到下標為x+3時發現是空閑的,然后將該ThreadLocal對象及其維護的數據構建一個entry對象存儲在x+3位置。wkO28資訊網——每日最新資訊28at.com

在ThreadLocalMap中數據很多的情況下,很容易出現hash沖突,解決沖突需要不斷的向下遍歷,該操作的時間復雜度為O(n),效率較低wkO28資訊網——每日最新資訊28at.com

圖1wkO28資訊網——每日最新資訊28at.com

從下面的代碼中可以看出:wkO28資訊網——每日最新資訊28at.com

Entry 的 key 是弱引用,value 是強引用。在 JVM 垃圾回收時,只要發現弱引用的對象,不管內存是否充足,都會被回收。wkO28資訊網——每日最新資訊28at.com

但是當 ThreadLocal 不再使用被 GC 回收后,ThreadLocalMap 中可能出現 Entry 的 key 為 NULL,那么 Entry 的 value 一直會強引用數據而得不到釋放,只能等待線程銷毀,從而造成內存泄漏wkO28資訊網——每日最新資訊28at.com

static class ThreadLocalMap {    // 弱引用,在資源緊張的時候可以回收部分不再引用的ThreadLocal變量    static class Entry extends WeakReference<ThreadLocal<?>> {        // 當前ThreadLocal對象所維護的數據        Object value;        Entry(ThreadLocal<?> k, Object v) {            super(k);            value = v;        }    }    // 省略其他代碼}

綜上所述,既然JDK提供的ThreadLocal可能存在效率較低和內存泄漏的問題,為啥不做相應的優化和改造呢?wkO28資訊網——每日最新資訊28at.com

1.從ThreadLocal類注釋看,它是JDK1.2版本引入的,早期可能不太關注程序的性能。wkO28資訊網——每日最新資訊28at.com

2.大部分多線程場景下,線程中的ThreadLocal變量較少,因此出現hash沖突的概率相對較小,及時偶爾出現了hash沖突,對程序的性能影響也相對較小。wkO28資訊網——每日最新資訊28at.com

3.對于內存泄漏問題,ThreadLocal本身已經做了一定的保護措施。作為使用者,在線程中某個ThreadLocal對象不再使用或出現異常時,立即調用 remove() 方法刪除 Entry 對象,養成良好的編碼習慣。wkO28資訊網——每日最新資訊28at.com

三、Netty FastThreadLocal介紹

FastThreadLocal是Netty中對JDK提供的ThreadLocal優化改造版本,從名稱上來看,它應該比ThreadLocal更快了,以應對Netty處理并發量大、數據吞吐量大的場景。wkO28資訊網——每日最新資訊28at.com

那具體是如何實現的呢?如圖2所示,每個線程都會有個InternalThreadLocalMap實例變量。wkO28資訊網——每日最新資訊28at.com

每個FastThreadLocal實例創建時,都會采用AtomicInteger保證順序遞增生成一個不重復的下標index,它是該FastThreadLocal對象維護的數據應該存儲的位置。wkO28資訊網——每日最新資訊28at.com

讀寫數據的時候通過FastThreadLocal的下標 index 直接定位到該FastThreadLocal的位置,時間復雜度為 O(1),效率較高。wkO28資訊網——每日最新資訊28at.com

如果該下標index遞增到特別大,InternalThreadLocalMap維護的數組也會特別大,所以FastThreadLocal是通過空間換時間來提升讀寫性能的。wkO28資訊網——每日最新資訊28at.com

圖2wkO28資訊網——每日最新資訊28at.com

四、Netty FastThreadLocal源碼分析

4.1 構造方法

public class FastThreadLocal<V> {    // FastThreadLocal中的index是記錄了該它維護的數據應該存儲的位置    // InternalThreadLocalMap數組中的下標, 它是在構造函數中確定的    private final int index;    public InternalThreadLocal() {        index = InternalThreadLocalMap.nextVariableIndex();    }    // 省略其他代碼}
public final class InternalThreadLocalMap extends UnpaddedInternalThreadLocalMap {    // 自增索引, ?于計算下次存儲到Object數組中的位置    private static final AtomicInteger nextIndex = new AtomicInteger();    private static final int ARRAY_LIST_CAPACITY_MAX_SIZE = Integer.MAX_VALUE - 8;    public static int nextVariableIndex() {        int index = nextIndex.getAndIncrement();        if (index >= ARRAY_LIST_CAPACITY_MAX_SIZE || index < 0) {            nextIndex.set(ARRAY_LIST_CAPACITY_MAX_SIZE);            throw new IllegalStateException("too many thread-local indexed variables");        }        return index;    }    // 省略其他代碼}

上面這兩段代碼在Netty FastThreadLocal介紹中已經講解過,這邊就不再重復介紹了。wkO28資訊網——每日最新資訊28at.com

4.2 get 方法

public class FastThreadLocal<V> {    // FastThreadLocal中的index是記錄了該它維護的數據應該存儲的位置    private final int index;    public final V get() {        // 獲取當前線程的InternalThreadLocalMap        InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();        // 根據當前線程的index從InternalThreadLocalMap中獲取其綁定的數據        Object v = threadLocalMap.indexedVariable(index);        // 如果獲取當前線程綁定的數據不為缺省值UNSET,則直接返回;否則進行初始化        if (v != InternalThreadLocalMap.UNSET) {            return (V) v;        }        return initialize(threadLocalMap);    }    // 省略其他代碼}
public final class InternalThreadLocalMap extends UnpaddedInternalThreadLocalMap {    private static final int INDEXED_VARIABLE_TABLE_INITIAL_SIZE = 32;    // 未賦值的Object變量(缺省值),當?個與線程綁定的值被刪除之后,會被設置為UNSET    public static final Object UNSET = new Object();    // 存儲綁定到當前線程的數據的數組    private Object[] indexedVariables;    // slowThreadLocalMap為JDK ThreadLocal存儲InternalThreadLocalMap    private static final ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap =            new ThreadLocal<InternalThreadLocalMap>();    // 從綁定到當前線程的數據的數組中取出index位置的元素    public Object indexedVariable(int index) {        Object[] lookup = indexedVariables;        return index < lookup.length? lookup[index] : UNSET;    }    public static InternalThreadLocalMap get() {        Thread thread = Thread.currentThread();        // 判斷當前線程是否是FastThreadLocalThread類型        if (thread instanceof FastThreadLocalThread) {            return fastGet((FastThreadLocalThread) thread);        } else {            return slowGet();        }    }    private static InternalThreadLocalMap fastGet(FastThreadLocalThread thread) {        // 直接獲取當前線程的InternalThreadLocalMap        InternalThreadLocalMap threadLocalMap = thread.threadLocalMap();        // 如果當前線程的InternalThreadLocalMap還未創建,則創建并賦值        if (threadLocalMap == null) {            thread.setThreadLocalMap(threadLocalMap = new InternalThreadLocalMap());        }        return threadLocalMap;    }    private static InternalThreadLocalMap slowGet() {        // 使用JDK ThreadLocal獲取InternalThreadLocalMap        InternalThreadLocalMap ret = slowThreadLocalMap.get();        if (ret == null) {            ret = new InternalThreadLocalMap();            slowThreadLocalMap.set(ret);        }        return ret;    }    private InternalThreadLocalMap() {        indexedVariables = newIndexedVariableTable();    }    // 初始化一個32位長度的Object數組,并將其元素全部設置為缺省值UNSET    private static Object[] newIndexedVariableTable() {        Object[] array = new Object[INDEXED_VARIABLE_TABLE_INITIAL_SIZE];        Arrays.fill(array, UNSET);        return array;    }    // 省略其他代碼}

源碼中 get() 方法主要分為下面3個步驟處理:wkO28資訊網——每日最新資訊28at.com

通過InternalThreadLocalMap.get()方法獲取當前線程的InternalThreadLocalMap。根據當前線程的index 從InternalThreadLocalMap中獲取其綁定的數據。如果不是缺省值UNSET,直接返回;如果是缺省值,則執行initialize方法進行初始化。

下面我們繼續分析一下wkO28資訊網——每日最新資訊28at.com

InternalThreadLocalMap.get()方法的實現邏輯。wkO28資訊網——每日最新資訊28at.com

首先判斷當前線程是否是FastThreadLocalThread類型,如果是FastThreadLocalThread類型則直接使用fastGet方法獲取InternalThreadLocalMap,如果不是FastThreadLocalThread類型則使用slowGet方法獲取InternalThreadLocalMap兜底處理。兜底處理中的slowGet方法會退化成JDK原生的ThreadLocal獲取InternalThreadLocalMap。獲取InternalThreadLocalMap時,如果為null,則會直接創建一個InternalThreadLocalMap返回。其創建過過程中初始化一個32位長度的Object數組,并將其元素全部設置為缺省值UNSET。

4.3 set 方法

public class FastThreadLocal<V> {    // FastThreadLocal初始化時variablesToRemoveIndex被賦值為0    private static final int variablesToRemoveIndex = InternalThreadLocalMap.nextVariableIndex();    public final void set(V value) {        // 判斷value值是否是未賦值的Object變量(缺省值)        if (value != InternalThreadLocalMap.UNSET) {            // 獲取當前線程對應的InternalThreadLocalMap            InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();            // 將InternalThreadLocalMap中數據替換為新的value            // 并將FastThreadLocal對象保存到待清理的Set中            setKnownNotUnset(threadLocalMap, value);        } else {            remove();        }    }    private void setKnownNotUnset(InternalThreadLocalMap threadLocalMap, V value) {        // 將InternalThreadLocalMap中數據替換為新的value        if (threadLocalMap.setIndexedVariable(index, value)) {            // 并將當前的FastThreadLocal對象保存到待清理的Set中            addToVariablesToRemove(threadLocalMap, this);        }    }    private static void addToVariablesToRemove(InternalThreadLocalMap threadLocalMap, FastThreadLocal<?> variable) {        // 取下標index為0的數據,用于存儲待清理的FastThreadLocal對象Set集合中        Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);        Set<FastThreadLocal<?>> variablesToRemove;        if (v == InternalThreadLocalMap.UNSET || v == null) {            // 下標index為0的數據為空,則創建FastThreadLocal對象Set集合            variablesToRemove = Collections.newSetFromMap(new IdentityHashMap<FastThreadLocal<?>, Boolean>());            // 將InternalThreadLocalMap中下標為0的數據,設置成FastThreadLocal對象Set集合            threadLocalMap.setIndexedVariable(variablesToRemoveIndex, variablesToRemove);        } else {            variablesToRemove = (Set<FastThreadLocal<?>>) v;        }        // 將FastThreadLocal對象保存到待清理的Set中        variablesToRemove.add(variable);    }    // 省略其他代碼}
public final class InternalThreadLocalMap extends UnpaddedInternalThreadLocalMap {    // 未賦值的Object變量(缺省值),當?個與線程綁定的值被刪除之后,會被設置為UNSET    public static final Object UNSET = new Object();    // 存儲綁定到當前線程的數據的數組    private Object[] indexedVariables;    // 綁定到當前線程的數據的數組能再次采用x2擴容的最大量    private static final int ARRAY_LIST_CAPACITY_EXPAND_THRESHOLD = 1 << 30;    private static final int ARRAY_LIST_CAPACITY_MAX_SIZE = Integer.MAX_VALUE - 8;    // 將InternalThreadLocalMap中數據替換為新的value    public boolean setIndexedVariable(int index, Object value) {        Object[] lookup = indexedVariables;        if (index < lookup.length) {            Object oldValue = lookup[index];            // 直接將數組 index 位置設置為 value,時間復雜度為 O(1)            lookup[index] = value;            return oldValue == UNSET;        } else { // 綁定到當前線程的數據的數組需要擴容,則擴容數組并數組設置新value            expandIndexedVariableTableAndSet(index, value);            return true;        }    }    private void expandIndexedVariableTableAndSet(int index, Object value) {        Object[] oldArray = indexedVariables;        final int oldCapacity = oldArray.length;        int newCapacity;        // 判斷可進行x2方式進行擴容        if (index < ARRAY_LIST_CAPACITY_EXPAND_THRESHOLD) {            newCapacity = index;            // 位操作,提升擴容效率            newCapacity |= newCapacity >>>  1;            newCapacity |= newCapacity >>>  2;            newCapacity |= newCapacity >>>  4;            newCapacity |= newCapacity >>>  8;            newCapacity |= newCapacity >>> 16;            newCapacity ++;        } else { // 不支持x2方式擴容,則設置綁定到當前線程的數據的數組容量為最大值            newCapacity = ARRAY_LIST_CAPACITY_MAX_SIZE;        }        // 按擴容后的大小創建新數組,并將老數組數據copy到新數組        Object[] newArray = Arrays.copyOf(oldArray, newCapacity);        // 新數組擴容后的部分賦UNSET缺省值        Arrays.fill(newArray, oldCapacity, newArray.length, UNSET);        // 新數組的index位置替換成新的value        newArray[index] = value;        // 綁定到當前線程的數據的數組用新數組替換        indexedVariables = newArray;    }    // 省略其他代碼}

源碼中 set() 方法主要分為下面3個步驟處理:wkO28資訊網——每日最新資訊28at.com

判斷value是否是缺省值UNSET,如果value不等于缺省值,則會通過InternalThreadLocalMap.get()方法獲取當前線程的InternalThreadLocalMap,具體實現3.2小節中get()方法已做講解。通過FastThreadLocal中的setKnownNotUnset()方法將InternalThreadLocalMap中數據替換為新的value,并將當前的FastThreadLocal對象保存到待清理的Set中。如果等于缺省值UNSET或null(else的邏輯),會調用remove()方法,remove()具體見后面的代碼分析。

接下來我們看下wkO28資訊網——每日最新資訊28at.com

InternalThreadLocalMap.setIndexedVariable方法的實現邏輯。wkO28資訊網——每日最新資訊28at.com

判斷index是否超出存儲綁定到當前線程的數據的數組indexedVariables的長度,如果沒有超出,則獲取index位置的數據,并將該數組index位置數據設置新value。wkO28資訊網——每日最新資訊28at.com

如果超出了,綁定到當前線程的數據的數組需要擴容,則擴容該數組并將它index位置的數據設置新value。wkO28資訊網——每日最新資訊28at.com

擴容數組以index 為基準進行擴容,將數組擴容后的容量向上取整為 2 的次冪。然后將原數組內容拷貝到新的數組中,空余部分填充缺省值UNSET,最終把新數組賦值給 indexedVariables。wkO28資訊網——每日最新資訊28at.com

下面我們再繼續看下wkO28資訊網——每日最新資訊28at.com

FastThreadLocal.addToVariablesToRemove方法的實現邏輯。wkO28資訊網——每日最新資訊28at.com

1.取下標index為0的數據(用于存儲待清理的FastThreadLocal對象Set集合中),如果該數據是缺省值UNSET或null,則會創建FastThreadLocal對象Set集合,并將該Set集合填充到下標index為0的數組位置。wkO28資訊網——每日最新資訊28at.com

2.如果該數據不是缺省值UNSET,說明Set集合已金被填充,直接強轉獲取該Set集合。wkO28資訊網——每日最新資訊28at.com

3.最后將FastThreadLocal對象保存到待清理的Set集合中。wkO28資訊網——每日最新資訊28at.com

4.4 remove、removeAll方法wkO28資訊網——每日最新資訊28at.com

public class FastThreadLocal<V> {    // FastThreadLocal初始化時variablesToRemoveIndex被賦值為0    private static final int variablesToRemoveIndex = InternalThreadLocalMap.nextVariableIndex();    public final void remove() {        // 獲取當前線程的InternalThreadLocalMap        // 刪除當前的FastThreadLocal對象及其維護的數據        remove(InternalThreadLocalMap.getIfSet());    }    public final void remove(InternalThreadLocalMap threadLocalMap) {        if (threadLocalMap == null) {            return;        }        // 根據當前線程的index,并將該數組下標index位置對應的值設置為缺省值UNSET        Object v = threadLocalMap.removeIndexedVariable(index);        // 存儲待清理的FastThreadLocal對象Set集合中刪除當前FastThreadLocal對象        removeFromVariablesToRemove(threadLocalMap, this);        if (v != InternalThreadLocalMap.UNSET) {            try {                // 空方法,用戶可以繼承實現                onRemoval((V) v);            } catch (Exception e) {                PlatformDependent.throwException(e);            }        }    }    public static void removeAll() {        InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.getIfSet();        if (threadLocalMap == null) {            return;        }        try {            // 取下標index為0的數據,用于存儲待清理的FastThreadLocal對象Set集合中            Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);            if (v != null && v != InternalThreadLocalMap.UNSET) {                @SuppressWarnings("unchecked")                Set<FastThreadLocal<?>> variablesToRemove = (Set<FastThreadLocal<?>>) v;                // 遍歷所有的FastThreadLocal對象并刪除它們以及它們維護的數據                FastThreadLocal<?>[] variablesToRemoveArray =                        variablesToRemove.toArray(new FastThreadLocal[0]);                for (FastThreadLocal<?> tlv: variablesToRemoveArray) {                    tlv.remove(threadLocalMap);                }            }        } finally {            // 刪除InternalThreadLocalMap中threadLocalMap和slowThreadLocalMap數據            InternalThreadLocalMap.remove();        }    }    private static void removeFromVariablesToRemove(            InternalThreadLocalMap threadLocalMap, FastThreadLocal<?> variable) {        // 取下標index為0的數據,用于存儲待清理的FastThreadLocal對象Set集合中        Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);         if (v == InternalThreadLocalMap.UNSET || v == null) {            return;        }        @SuppressWarnings("unchecked")        // 存儲待清理的FastThreadLocal對象Set集合中刪除該FastThreadLocal對象        Set<FastThreadLocal<?>> variablesToRemove = (Set<FastThreadLocal<?>>) v;        variablesToRemove.remove(variable);    }     // 省略其他代碼}
public final class InternalThreadLocalMap extends UnpaddedInternalThreadLocalMap {    // 根據當前線程獲取InternalThreadLocalMap       public static InternalThreadLocalMap getIfSet() {        Thread thread = Thread.currentThread();        if (thread instanceof FastThreadLocalThread) {            return ((FastThreadLocalThread) thread).threadLocalMap();        }        return slowThreadLocalMap.get();    }    // 數組下標index位置對應的值設置為缺省值UNSET    public Object removeIndexedVariable(int index) {        Object[] lookup = indexedVariables;        if (index < lookup.length) {            Object v = lookup[index];            lookup[index] = UNSET;            return v;        } else {            return UNSET;        }    }    // 刪除threadLocalMap和slowThreadLocalMap數據    public static void remove() {        Thread thread = Thread.currentThread();        if (thread instanceof FastThreadLocalThread) {            ((FastThreadLocalThread) thread).setThreadLocalMap(null);        } else {            slowThreadLocalMap.remove();        }    }    // 省略其他代碼}

源碼中 remove() 方法主要分為下面2個步驟處理:wkO28資訊網——每日最新資訊28at.com

通過InternalThreadLocalMap.getIfSet()獲取當前線程的InternalThreadLocalMap。具體和3.2小節get()方法里面獲取當前線程的InternalThreadLocalMap相似,這里就不再重復介紹了。刪除當前的FastThreadLocal對象及其維護的數據。

源碼中 removeAll() 方法主要分為下面3個步驟處理:wkO28資訊網——每日最新資訊28at.com

通過InternalThreadLocalMap.getIfSet()獲取當前線程的InternalThreadLocalMap。取下標index為0的數據(用于存儲待清理的FastThreadLocal對象Set集合),然后遍歷所有的FastThreadLocal對象并刪除它們以及它們維護的數據。最后會將InternalThreadLocalMap本身從線程中移除。

五、總結

那么使用ThreadLocal時最佳實踐又如何呢?wkO28資訊網——每日最新資訊28at.com

 每次使用完ThreadLocal實例,在線程運行結束之前的finally代碼塊中主動調用它的remove()方法,清除Entry中的數據,避免操作不當導致的內存泄漏。wkO28資訊網——每日最新資訊28at.com

使?Netty的FastThreadLocal一定比JDK原生的ThreadLocal更快嗎?wkO28資訊網——每日最新資訊28at.com

不?定。當線程是FastThreadLocalThread,則添加、獲取FastThreadLocal所維護數據的時間復雜度是 O(1),?使?ThreadLocal可能存在哈希沖突,相對來說使?FastThreadLocal更?效。但如果是普通線程則可能更慢。wkO28資訊網——每日最新資訊28at.com

使?FastThreadLocal有哪些優點?wkO28資訊網——每日最新資訊28at.com

正如文章開頭介紹JDK原生ThreadLocal存在的缺點,FastThreadLocal全部優化了,它更?效、而且如果使?的是FastThreadLocal,它會在任務執?完成后主動調?removeAll?法清除數據,避免潛在的內存泄露。wkO28資訊網——每日最新資訊28at.com

本文鏈接:http://www.tebozhan.com/showinfo-26-14343-0.html深入理解 Netty FastThreadLocal

聲明:本網頁內容旨在傳播知識,若有侵權等問題請及時與本網聯系,我們將在第一時間刪除處理。郵件:2376512515@qq.com

上一篇: ?猜想生成式 AI 對軟件工程的影響

下一篇: 圖形編輯器開發:實現縮放圖形

標簽:
  • 熱門焦點
Top