我之前在一家餐飲公司待過兩年,每天中午和晚上用餐高峰期,系統的并發量不容小覷。為了保險起見,公司規定各部門都要在吃飯的時間輪流值班,防止出現線上問題時能夠及時處理。
我當時在后廚顯示系統團隊,該系統屬于訂單的下游業務。
用戶點完菜下單后,訂單系統會通過發kafka消息給我們系統,系統讀取消息后,做業務邏輯處理,持久化訂單和菜品數據,然后展示到劃菜客戶端。
這樣廚師就知道哪個訂單要做哪些菜,有些菜做好了,就可以通過該系統出菜。系統自動通知服務員上菜,如果服務員上完菜,修改菜品上菜狀態,用戶就知道哪些菜已經上了,哪些還沒有上。這個系統可以大大提高后廚到用戶的效率。
圖片
這一切的關鍵是消息中間件:kafka,如果它出現問題,將會直接影響到后廚顯示系統的用戶功能使用。
這篇文章跟大家一起聊聊,我們當時出現過的消息積壓問題,希望對你會有所幫助。
剛開始我們的用戶量比較少,上線一段時間,mq的消息通信都沒啥問題。
隨著用戶量逐步增多,每個商家每天都會產生大量的訂單數據,每個訂單都有多個菜品,這樣導致我們劃菜系統的劃菜表的數據越來越多。
在某一天中午,收到商家投訴說用戶下單之后,在平板上出現的菜品列表有延遲。
廚房幾分鐘之后才能看到菜品。
我們馬上開始查原因。
出現這種菜品延遲的問題,必定跟kafka有關,因此,我們先查看kafka。
果然出現了消息積壓。
通常情況下,出現消息積壓的原因有:
我查了一下監控,發現我們的mq消費者,服務在正常運行,沒有異常。
剩下的原因可能是:mq消費者消費消息的速度變慢了。
接下來,我查了一下劃菜表,目前不太多只有幾十萬的數據。
看來需要優化mq消費者的處理邏輯了。
我在代碼中增加了一些日志,把mq消息者中各個關鍵節點的耗時都打印出來了。
發現有兩個地方耗時比較長:
于是,我做了有針對性的優化。
將在for循環中一個個查詢數據庫的代碼,改成通過參數集合,批量查詢數據。
有時候,我們需要從指定的用戶集合中,查詢出有哪些是在數據庫中已經存在的。
實現代碼可以這樣寫:
public List<User> queryUser(List<User> searchList) { if (CollectionUtils.isEmpty(searchList)) { return Collections.emptyList(); } List<User> result = Lists.newArrayList(); searchList.forEach(user -> result.add(userMapper.getUserById(user.getId()))); return result;}
這里如果有50個用戶,則需要循環50次,去查詢數據庫。我們都知道,每查詢一次數據庫,就是一次遠程調用。
如果查詢50次數據庫,就有50次遠程調用,這是非常耗時的操作。
那么,我們如何優化呢?
具體代碼如下:
public List<User> queryUser(List<User> searchList) { if (CollectionUtils.isEmpty(searchList)) { return Collections.emptyList(); } List<Long> ids = searchList.stream().map(User::getId).collect(Collectors.toList()); return userMapper.getUserByIds(ids);}
提供一個根據用戶id集合批量查詢用戶的接口,只遠程調用一次,就能查詢出所有的數據。
多條件查詢數據的地方,增加了一個聯合索引,解決了問題。
這樣優化之后, mq消費者處理消息的速度提升了很多,消息積壓問題被解決了。
沒想到,過了幾個月之后,又開始出現消息積壓的問題了。
但這次是偶爾會積壓,大部分情況不會。
這幾天消息的積壓時間不長,對用戶影響比較小,沒有引起商家的投訴。
我查了一下劃菜表的數據只有幾百萬。
但通過一些監控,和DBA每天發的慢查詢郵件,自己發現了異常。
我發現有些sql語句,執行的where條件是一模一樣的,只有條件后面的參數值不一樣,導致該sql語句走的索引不一樣。
比如:order_id=123走了索引a,而order_id=124走了索引b。
有張表查詢的場景有很多,當時為了滿足不同業務場景,加了多個聯合索引。
MySQL會根據下面幾個因素選擇索引:
綜合1、2、3以及其它的一些因素,MySql優化器會選出它自己認為最合適的索引。
MySQL優化器是通過采樣來預估要掃描的行數的,所謂采樣就是選擇一些數據頁來進行統計預估,這個會有一定的誤差。
由于MVCC會有多個版本的數據頁,比如刪除一些數據,但是這些數據由于還在其它的事務中可能會被看到,索引不是真正的刪除,這種情況也會導致統計不準確,從而影響優化器的判斷。
上面這兩個原因導致MySQL在執行SQL語句時,會選錯索引。
明明使用索引a的時候,執行效率更高,但實際情況卻使用了索引b。
為了解決MySQL選錯索引的問題,我們使用了關鍵字force index,來強制查詢sql走索引a。
這樣優化之后,這次小范圍的消息積壓問題被解決了。
過了半年之后,在某個晚上6點多鐘。
有幾個商家投訴過來,說劃菜系統有延遲,下單之后,幾分鐘才能看到菜品。
我查看了一下監控,發現kafka消息又出現了積壓的情況。
查了一下MySQL的索引,該走的索引都走了,但數據查詢還是有些慢。
此時,我再次查了一下劃菜表,驚奇的發現,短短半年表中有3千萬的數據了。
通常情況下,單表的數據太多,無論是查詢,還是寫入的性能,都會下降。
這次出現查詢慢的原因是數據太多了。
為了解決這個問題,我們必須:
由于現階段做分庫分表的代價太大了,我們的商戶數量還沒有走到這一步。
因此,我們當時果斷選擇了將歷史數據做備份的方案。
當時我跟產品和DBA討論了一下,劃菜表只保留最近30天的數據,超過幾天的數據寫入到歷史表中。
這樣優化之后,劃菜表30天只會產生幾百萬的數據,對性能影響不大。
消息積壓的問題被解決了。
通過上面這幾次優化之后,很長一段時間,系統都沒有出現消息積壓的問題。
但在一年之后的某一天下午,又有一些商家投訴過來了。
此時,我查看公司郵箱,發現kafka消息積壓的監控報警郵件一大堆。
但由于剛剛一直在開會,沒有看到。
這次的時間點就有些特殊。
一般情況下,并發量大的時候,是中午或者晚上的用餐高峰期,而這次出現消息積壓問題的時間是下午。
這就有點奇怪了。
剛開始查詢這個問題一點頭緒都沒有。
我問了一下訂單組的同事,下午有沒有發版,或者執行什么功能?
因為我們的劃菜系統,是他們的下游系統,跟他們有直接的關系。
某位同事說,他們半小時之前,執行了一個批量修改訂單狀態的job,一次性修改了幾萬個訂單的狀態。
而修改了訂單狀態,會自動發送mq消息。
這樣導致,他們的程序在極短的時間內,產生了大量的mq消息。
而我們的mq消費者根本無法處理這些消息,所以才會產生消息積壓的問題。
我們當時一起查了kafka消息的積壓情況,發現當時積壓了幾十萬條消息。
要想快速提升mq消費者的處理速度,我們當時想到了兩個方案:
但考慮到,當時消息已經積壓到幾個已有的partion中了,再新增partion意義不大。
于是,我們只能改造代碼,使用線程池處理消息了。
為了開始消費積壓的消息,我們將線程池的核心線程和最大線程數量調大到了50。
這兩個參數是可以動態配置的。
這樣調整之后,積壓了幾十萬的mq消息,在20分鐘左右被消費完了。
這次突然產生的消息積壓問題被解決了。
解決完這次的問題之后,我們還是保留的線程池消費消息的邏輯,將核心線程數調到8,最大線程數調到10。
當后面出現消息積壓問題,可以及時通過調整線程數量,先臨時解決問題,而不會對用戶造成太大的影響。
注意:使用線程池消費mq消息不是萬能的。該方案也有一些弊端,它有消息順序的問題,也可能會導致服務器的CPU使用率飆升。此外,如果在多線程中調用了第三方接口,可能會導致該第三方接口的壓力太大,而直接掛掉。
總之,MQ的消息積壓問題,不是一個簡單的問題。
雖說產生的根本原因是:MQ生產者生產消息的速度,大于MQ消費者消費消息的速度,但產生的具體原因有多種。
我們在實際工作中,需要針對不同的業務場景,做不同的優化。
我們需要對MQ隊列中的消息積壓情況,進行監控和預警,至少能夠及時發現問題。
沒有最好的方案,只有最合適當前業務場景的方案。
本文鏈接:http://www.tebozhan.com/showinfo-26-88349-0.htmlMQ消息積壓,把我整吐血了
聲明:本網頁內容旨在傳播知識,若有侵權等問題請及時與本網聯系,我們將在第一時間刪除處理。郵件:2376512515@qq.com