最近在學習Netty相關的知識,在看到Netty FastThreadLocal章節(jié)中,回想起一起線上詭異問題。
問題描述:外銷業(yè)務獲取用戶信息判斷是否支持https場景下,獲取的用戶信息有時候竟然是錯亂的。
問題分析:使用ThreadLocal保存用戶信息時,未能及時進行remove()操作,而Tomcat工作線程是基于線程池的,會出現(xiàn)線程重用情況,所以獲取的用戶信息可能是之前線程遺留下來的。
問題修復:ThreadLocal使用完之后及時remove()、ThreadLocal使用之前也進行remove()雙重保險操作。
接下來,我們繼續(xù)深入了解下JDK ThreadLocal和Netty FastThreadLocal吧。
ThreadLocal是JDK提供的一個方便對象在本線程內(nèi)不同方法中傳遞、獲取的類。用它定義的變量,僅在本線程中可見,不受其他線程的影響,與其他線程相互隔離。
那具體是如何實現(xiàn)的呢?如圖1所示,每個線程都會有個ThreadLocalMap實例變量,其采用懶加載的方式進行創(chuàng)建,當線程第一次訪問此變量時才會去創(chuàng)建。
ThreadLocalMap使用線性探測法存儲ThreadLocal對象及其維護的數(shù)據(jù),具體操作邏輯如下:
假設有一個新的ThreadLocal對象,通過hash計算它應存儲的位置下標為x。
此時發(fā)現(xiàn)下標x對應位置已經(jīng)存儲了其他的ThreadLocal對象,則它會往后尋找,步長為1,下標變更為x+1。
接下來發(fā)現(xiàn)下標x+1對應位置也已經(jīng)存儲了其他的ThreadLocal對象,同理則它會繼續(xù)往后尋找,下標變更為x+2。
直到尋找到下標為x+3時發(fā)現(xiàn)是空閑的,然后將該ThreadLocal對象及其維護的數(shù)據(jù)構(gòu)建一個entry對象存儲在x+3位置。
在ThreadLocalMap中數(shù)據(jù)很多的情況下,很容易出現(xiàn)hash沖突,解決沖突需要不斷的向下遍歷,該操作的時間復雜度為O(n),效率較低。
從下面的代碼中可以看出:
Entry 的 key 是弱引用,value 是強引用。在 JVM 垃圾回收時,只要發(fā)現(xiàn)弱引用的對象,不管內(nèi)存是否充足,都會被回收。
但是當 ThreadLocal 不再使用被 GC 回收后,ThreadLocalMap 中可能出現(xiàn) Entry 的 key 為 NULL,那么 Entry 的 value 一直會強引用數(shù)據(jù)而得不到釋放,只能等待線程銷毀,從而造成內(nèi)存泄漏。
static class ThreadLocalMap { // 弱引用,在資源緊張的時候可以回收部分不再引用的ThreadLocal變量 static class Entry extends WeakReference<ThreadLocal<?>> { // 當前ThreadLocal對象所維護的數(shù)據(jù) Object value; Entry(ThreadLocal<?> k, Object v) { super(k); value = v; } } // 省略其他代碼}
綜上所述,既然JDK提供的ThreadLocal可能存在效率較低和內(nèi)存泄漏的問題,為啥不做相應的優(yōu)化和改造呢?
1.從ThreadLocal類注釋看,它是JDK1.2版本引入的,早期可能不太關注程序的性能。
2.大部分多線程場景下,線程中的ThreadLocal變量較少,因此出現(xiàn)hash沖突的概率相對較小,及時偶爾出現(xiàn)了hash沖突,對程序的性能影響也相對較小。
3.對于內(nèi)存泄漏問題,ThreadLocal本身已經(jīng)做了一定的保護措施。作為使用者,在線程中某個ThreadLocal對象不再使用或出現(xiàn)異常時,立即調(diào)用 remove() 方法刪除 Entry 對象,養(yǎng)成良好的編碼習慣。
FastThreadLocal是Netty中對JDK提供的ThreadLocal優(yōu)化改造版本,從名稱上來看,它應該比ThreadLocal更快了,以應對Netty處理并發(fā)量大、數(shù)據(jù)吞吐量大的場景。
那具體是如何實現(xiàn)的呢?如圖2所示,每個線程都會有個InternalThreadLocalMap實例變量。
每個FastThreadLocal實例創(chuàng)建時,都會采用AtomicInteger保證順序遞增生成一個不重復的下標index,它是該FastThreadLocal對象維護的數(shù)據(jù)應該存儲的位置。
讀寫數(shù)據(jù)的時候通過FastThreadLocal的下標 index 直接定位到該FastThreadLocal的位置,時間復雜度為 O(1),效率較高。
如果該下標index遞增到特別大,InternalThreadLocalMap維護的數(shù)組也會特別大,所以FastThreadLocal是通過空間換時間來提升讀寫性能的。
public class FastThreadLocal<V> { // FastThreadLocal中的index是記錄了該它維護的數(shù)據(jù)應該存儲的位置 // InternalThreadLocalMap數(shù)組中的下標, 它是在構(gòu)造函數(shù)中確定的 private final int index; public InternalThreadLocal() { index = InternalThreadLocalMap.nextVariableIndex(); } // 省略其他代碼}
public final class InternalThreadLocalMap extends UnpaddedInternalThreadLocalMap { // 自增索引, ?于計算下次存儲到Object數(shù)組中的位置 private static final AtomicInteger nextIndex = new AtomicInteger(); private static final int ARRAY_LIST_CAPACITY_MAX_SIZE = Integer.MAX_VALUE - 8; public static int nextVariableIndex() { int index = nextIndex.getAndIncrement(); if (index >= ARRAY_LIST_CAPACITY_MAX_SIZE || index < 0) { nextIndex.set(ARRAY_LIST_CAPACITY_MAX_SIZE); throw new IllegalStateException("too many thread-local indexed variables"); } return index; } // 省略其他代碼}
上面這兩段代碼在Netty FastThreadLocal介紹中已經(jīng)講解過,這邊就不再重復介紹了。
public class FastThreadLocal<V> { // FastThreadLocal中的index是記錄了該它維護的數(shù)據(jù)應該存儲的位置 private final int index; public final V get() { // 獲取當前線程的InternalThreadLocalMap InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get(); // 根據(jù)當前線程的index從InternalThreadLocalMap中獲取其綁定的數(shù)據(jù) Object v = threadLocalMap.indexedVariable(index); // 如果獲取當前線程綁定的數(shù)據(jù)不為缺省值UNSET,則直接返回;否則進行初始化 if (v != InternalThreadLocalMap.UNSET) { return (V) v; } return initialize(threadLocalMap); } // 省略其他代碼}
public final class InternalThreadLocalMap extends UnpaddedInternalThreadLocalMap { private static final int INDEXED_VARIABLE_TABLE_INITIAL_SIZE = 32; // 未賦值的Object變量(缺省值),當?個與線程綁定的值被刪除之后,會被設置為UNSET public static final Object UNSET = new Object(); // 存儲綁定到當前線程的數(shù)據(jù)的數(shù)組 private Object[] indexedVariables; // slowThreadLocalMap為JDK ThreadLocal存儲InternalThreadLocalMap private static final ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap = new ThreadLocal<InternalThreadLocalMap>(); // 從綁定到當前線程的數(shù)據(jù)的數(shù)組中取出index位置的元素 public Object indexedVariable(int index) { Object[] lookup = indexedVariables; return index < lookup.length? lookup[index] : UNSET; } public static InternalThreadLocalMap get() { Thread thread = Thread.currentThread(); // 判斷當前線程是否是FastThreadLocalThread類型 if (thread instanceof FastThreadLocalThread) { return fastGet((FastThreadLocalThread) thread); } else { return slowGet(); } } private static InternalThreadLocalMap fastGet(FastThreadLocalThread thread) { // 直接獲取當前線程的InternalThreadLocalMap InternalThreadLocalMap threadLocalMap = thread.threadLocalMap(); // 如果當前線程的InternalThreadLocalMap還未創(chuàng)建,則創(chuàng)建并賦值 if (threadLocalMap == null) { thread.setThreadLocalMap(threadLocalMap = new InternalThreadLocalMap()); } return threadLocalMap; } private static InternalThreadLocalMap slowGet() { // 使用JDK ThreadLocal獲取InternalThreadLocalMap InternalThreadLocalMap ret = slowThreadLocalMap.get(); if (ret == null) { ret = new InternalThreadLocalMap(); slowThreadLocalMap.set(ret); } return ret; } private InternalThreadLocalMap() { indexedVariables = newIndexedVariableTable(); } // 初始化一個32位長度的Object數(shù)組,并將其元素全部設置為缺省值UNSET private static Object[] newIndexedVariableTable() { Object[] array = new Object[INDEXED_VARIABLE_TABLE_INITIAL_SIZE]; Arrays.fill(array, UNSET); return array; } // 省略其他代碼}
源碼中 get() 方法主要分為下面3個步驟處理:
通過InternalThreadLocalMap.get()方法獲取當前線程的InternalThreadLocalMap。根據(jù)當前線程的index 從InternalThreadLocalMap中獲取其綁定的數(shù)據(jù)。如果不是缺省值UNSET,直接返回;如果是缺省值,則執(zhí)行initialize方法進行初始化。
下面我們繼續(xù)分析一下
InternalThreadLocalMap.get()方法的實現(xiàn)邏輯。
首先判斷當前線程是否是FastThreadLocalThread類型,如果是FastThreadLocalThread類型則直接使用fastGet方法獲取InternalThreadLocalMap,如果不是FastThreadLocalThread類型則使用slowGet方法獲取InternalThreadLocalMap兜底處理。兜底處理中的slowGet方法會退化成JDK原生的ThreadLocal獲取InternalThreadLocalMap。獲取InternalThreadLocalMap時,如果為null,則會直接創(chuàng)建一個InternalThreadLocalMap返回。其創(chuàng)建過過程中初始化一個32位長度的Object數(shù)組,并將其元素全部設置為缺省值UNSET。
public class FastThreadLocal<V> { // FastThreadLocal初始化時variablesToRemoveIndex被賦值為0 private static final int variablesToRemoveIndex = InternalThreadLocalMap.nextVariableIndex(); public final void set(V value) { // 判斷value值是否是未賦值的Object變量(缺省值) if (value != InternalThreadLocalMap.UNSET) { // 獲取當前線程對應的InternalThreadLocalMap InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get(); // 將InternalThreadLocalMap中數(shù)據(jù)替換為新的value // 并將FastThreadLocal對象保存到待清理的Set中 setKnownNotUnset(threadLocalMap, value); } else { remove(); } } private void setKnownNotUnset(InternalThreadLocalMap threadLocalMap, V value) { // 將InternalThreadLocalMap中數(shù)據(jù)替換為新的value if (threadLocalMap.setIndexedVariable(index, value)) { // 并將當前的FastThreadLocal對象保存到待清理的Set中 addToVariablesToRemove(threadLocalMap, this); } } private static void addToVariablesToRemove(InternalThreadLocalMap threadLocalMap, FastThreadLocal<?> variable) { // 取下標index為0的數(shù)據(jù),用于存儲待清理的FastThreadLocal對象Set集合中 Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex); Set<FastThreadLocal<?>> variablesToRemove; if (v == InternalThreadLocalMap.UNSET || v == null) { // 下標index為0的數(shù)據(jù)為空,則創(chuàng)建FastThreadLocal對象Set集合 variablesToRemove = Collections.newSetFromMap(new IdentityHashMap<FastThreadLocal<?>, Boolean>()); // 將InternalThreadLocalMap中下標為0的數(shù)據(jù),設置成FastThreadLocal對象Set集合 threadLocalMap.setIndexedVariable(variablesToRemoveIndex, variablesToRemove); } else { variablesToRemove = (Set<FastThreadLocal<?>>) v; } // 將FastThreadLocal對象保存到待清理的Set中 variablesToRemove.add(variable); } // 省略其他代碼}
public final class InternalThreadLocalMap extends UnpaddedInternalThreadLocalMap { // 未賦值的Object變量(缺省值),當?個與線程綁定的值被刪除之后,會被設置為UNSET public static final Object UNSET = new Object(); // 存儲綁定到當前線程的數(shù)據(jù)的數(shù)組 private Object[] indexedVariables; // 綁定到當前線程的數(shù)據(jù)的數(shù)組能再次采用x2擴容的最大量 private static final int ARRAY_LIST_CAPACITY_EXPAND_THRESHOLD = 1 << 30; private static final int ARRAY_LIST_CAPACITY_MAX_SIZE = Integer.MAX_VALUE - 8; // 將InternalThreadLocalMap中數(shù)據(jù)替換為新的value public boolean setIndexedVariable(int index, Object value) { Object[] lookup = indexedVariables; if (index < lookup.length) { Object oldValue = lookup[index]; // 直接將數(shù)組 index 位置設置為 value,時間復雜度為 O(1) lookup[index] = value; return oldValue == UNSET; } else { // 綁定到當前線程的數(shù)據(jù)的數(shù)組需要擴容,則擴容數(shù)組并數(shù)組設置新value expandIndexedVariableTableAndSet(index, value); return true; } } private void expandIndexedVariableTableAndSet(int index, Object value) { Object[] oldArray = indexedVariables; final int oldCapacity = oldArray.length; int newCapacity; // 判斷可進行x2方式進行擴容 if (index < ARRAY_LIST_CAPACITY_EXPAND_THRESHOLD) { newCapacity = index; // 位操作,提升擴容效率 newCapacity |= newCapacity >>> 1; newCapacity |= newCapacity >>> 2; newCapacity |= newCapacity >>> 4; newCapacity |= newCapacity >>> 8; newCapacity |= newCapacity >>> 16; newCapacity ++; } else { // 不支持x2方式擴容,則設置綁定到當前線程的數(shù)據(jù)的數(shù)組容量為最大值 newCapacity = ARRAY_LIST_CAPACITY_MAX_SIZE; } // 按擴容后的大小創(chuàng)建新數(shù)組,并將老數(shù)組數(shù)據(jù)copy到新數(shù)組 Object[] newArray = Arrays.copyOf(oldArray, newCapacity); // 新數(shù)組擴容后的部分賦UNSET缺省值 Arrays.fill(newArray, oldCapacity, newArray.length, UNSET); // 新數(shù)組的index位置替換成新的value newArray[index] = value; // 綁定到當前線程的數(shù)據(jù)的數(shù)組用新數(shù)組替換 indexedVariables = newArray; } // 省略其他代碼}
源碼中 set() 方法主要分為下面3個步驟處理:
判斷value是否是缺省值UNSET,如果value不等于缺省值,則會通過InternalThreadLocalMap.get()方法獲取當前線程的InternalThreadLocalMap,具體實現(xiàn)3.2小節(jié)中g(shù)et()方法已做講解。通過FastThreadLocal中的setKnownNotUnset()方法將InternalThreadLocalMap中數(shù)據(jù)替換為新的value,并將當前的FastThreadLocal對象保存到待清理的Set中。如果等于缺省值UNSET或null(else的邏輯),會調(diào)用remove()方法,remove()具體見后面的代碼分析。
接下來我們看下
InternalThreadLocalMap.setIndexedVariable方法的實現(xiàn)邏輯。
判斷index是否超出存儲綁定到當前線程的數(shù)據(jù)的數(shù)組indexedVariables的長度,如果沒有超出,則獲取index位置的數(shù)據(jù),并將該數(shù)組index位置數(shù)據(jù)設置新value。
如果超出了,綁定到當前線程的數(shù)據(jù)的數(shù)組需要擴容,則擴容該數(shù)組并將它index位置的數(shù)據(jù)設置新value。
擴容數(shù)組以index 為基準進行擴容,將數(shù)組擴容后的容量向上取整為 2 的次冪。然后將原數(shù)組內(nèi)容拷貝到新的數(shù)組中,空余部分填充缺省值UNSET,最終把新數(shù)組賦值給 indexedVariables。
下面我們再繼續(xù)看下
FastThreadLocal.addToVariablesToRemove方法的實現(xiàn)邏輯。
1.取下標index為0的數(shù)據(jù)(用于存儲待清理的FastThreadLocal對象Set集合中),如果該數(shù)據(jù)是缺省值UNSET或null,則會創(chuàng)建FastThreadLocal對象Set集合,并將該Set集合填充到下標index為0的數(shù)組位置。
2.如果該數(shù)據(jù)不是缺省值UNSET,說明Set集合已金被填充,直接強轉(zhuǎn)獲取該Set集合。
3.最后將FastThreadLocal對象保存到待清理的Set集合中。
4.4 remove、removeAll方法
public class FastThreadLocal<V> { // FastThreadLocal初始化時variablesToRemoveIndex被賦值為0 private static final int variablesToRemoveIndex = InternalThreadLocalMap.nextVariableIndex(); public final void remove() { // 獲取當前線程的InternalThreadLocalMap // 刪除當前的FastThreadLocal對象及其維護的數(shù)據(jù) remove(InternalThreadLocalMap.getIfSet()); } public final void remove(InternalThreadLocalMap threadLocalMap) { if (threadLocalMap == null) { return; } // 根據(jù)當前線程的index,并將該數(shù)組下標index位置對應的值設置為缺省值UNSET Object v = threadLocalMap.removeIndexedVariable(index); // 存儲待清理的FastThreadLocal對象Set集合中刪除當前FastThreadLocal對象 removeFromVariablesToRemove(threadLocalMap, this); if (v != InternalThreadLocalMap.UNSET) { try { // 空方法,用戶可以繼承實現(xiàn) onRemoval((V) v); } catch (Exception e) { PlatformDependent.throwException(e); } } } public static void removeAll() { InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.getIfSet(); if (threadLocalMap == null) { return; } try { // 取下標index為0的數(shù)據(jù),用于存儲待清理的FastThreadLocal對象Set集合中 Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex); if (v != null && v != InternalThreadLocalMap.UNSET) { @SuppressWarnings("unchecked") Set<FastThreadLocal<?>> variablesToRemove = (Set<FastThreadLocal<?>>) v; // 遍歷所有的FastThreadLocal對象并刪除它們以及它們維護的數(shù)據(jù) FastThreadLocal<?>[] variablesToRemoveArray = variablesToRemove.toArray(new FastThreadLocal[0]); for (FastThreadLocal<?> tlv: variablesToRemoveArray) { tlv.remove(threadLocalMap); } } } finally { // 刪除InternalThreadLocalMap中threadLocalMap和slowThreadLocalMap數(shù)據(jù) InternalThreadLocalMap.remove(); } } private static void removeFromVariablesToRemove( InternalThreadLocalMap threadLocalMap, FastThreadLocal<?> variable) { // 取下標index為0的數(shù)據(jù),用于存儲待清理的FastThreadLocal對象Set集合中 Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex); if (v == InternalThreadLocalMap.UNSET || v == null) { return; } @SuppressWarnings("unchecked") // 存儲待清理的FastThreadLocal對象Set集合中刪除該FastThreadLocal對象 Set<FastThreadLocal<?>> variablesToRemove = (Set<FastThreadLocal<?>>) v; variablesToRemove.remove(variable); } // 省略其他代碼}
public final class InternalThreadLocalMap extends UnpaddedInternalThreadLocalMap { // 根據(jù)當前線程獲取InternalThreadLocalMap public static InternalThreadLocalMap getIfSet() { Thread thread = Thread.currentThread(); if (thread instanceof FastThreadLocalThread) { return ((FastThreadLocalThread) thread).threadLocalMap(); } return slowThreadLocalMap.get(); } // 數(shù)組下標index位置對應的值設置為缺省值UNSET public Object removeIndexedVariable(int index) { Object[] lookup = indexedVariables; if (index < lookup.length) { Object v = lookup[index]; lookup[index] = UNSET; return v; } else { return UNSET; } } // 刪除threadLocalMap和slowThreadLocalMap數(shù)據(jù) public static void remove() { Thread thread = Thread.currentThread(); if (thread instanceof FastThreadLocalThread) { ((FastThreadLocalThread) thread).setThreadLocalMap(null); } else { slowThreadLocalMap.remove(); } } // 省略其他代碼}
源碼中 remove() 方法主要分為下面2個步驟處理:
通過InternalThreadLocalMap.getIfSet()獲取當前線程的InternalThreadLocalMap。具體和3.2小節(jié)get()方法里面獲取當前線程的InternalThreadLocalMap相似,這里就不再重復介紹了。刪除當前的FastThreadLocal對象及其維護的數(shù)據(jù)。
源碼中 removeAll() 方法主要分為下面3個步驟處理:
通過InternalThreadLocalMap.getIfSet()獲取當前線程的InternalThreadLocalMap。取下標index為0的數(shù)據(jù)(用于存儲待清理的FastThreadLocal對象Set集合),然后遍歷所有的FastThreadLocal對象并刪除它們以及它們維護的數(shù)據(jù)。最后會將InternalThreadLocalMap本身從線程中移除。
那么使用ThreadLocal時最佳實踐又如何呢?
每次使用完ThreadLocal實例,在線程運行結(jié)束之前的finally代碼塊中主動調(diào)用它的remove()方法,清除Entry中的數(shù)據(jù),避免操作不當導致的內(nèi)存泄漏。
使?Netty的FastThreadLocal一定比JDK原生的ThreadLocal更快嗎?
不?定。當線程是FastThreadLocalThread,則添加、獲取FastThreadLocal所維護數(shù)據(jù)的時間復雜度是 O(1),?使?ThreadLocal可能存在哈希沖突,相對來說使?FastThreadLocal更?效。但如果是普通線程則可能更慢。
使?FastThreadLocal有哪些優(yōu)點?
正如文章開頭介紹JDK原生ThreadLocal存在的缺點,F(xiàn)astThreadLocal全部優(yōu)化了,它更?效、而且如果使?的是FastThreadLocal,它會在任務執(zhí)?完成后主動調(diào)?removeAll?法清除數(shù)據(jù),避免潛在的內(nèi)存泄露。
本文鏈接:http://www.tebozhan.com/showinfo-26-14347-0.html深入理解 Netty FastThreadLocal
聲明:本網(wǎng)頁內(nèi)容旨在傳播知識,若有侵權(quán)等問題請及時與本網(wǎng)聯(lián)系,我們將在第一時間刪除處理。郵件:2376512515@qq.com
上一篇: ?猜想生成式 AI 對軟件工程的影響