Pulsar3.0 是 Pulsar 社區(qū)推出的第一個 LTS 長期支持版本。
圖片
如圖所示,LTS 版本會最長支持到 36 個月,而 Feature 版本最多只有六個月;類似于我們使用的 JDK11,17,21 都是可以長期使用的;所以也推薦大家都升級到 LTS 版本。
作為首個 LTS 版本,3.0 自然也是自帶了許多新特性,這個會在后續(xù)介紹。
先來看看升級指南:
圖片
在官方的兼容表中會發(fā)現(xiàn):不推薦跨版本升級。
也就是說如果你現(xiàn)在還在使用的是 2.10.x,那么推薦是先升級到 2.11.x 然后再升級到 3.0.x.
而且根據(jù)我們的使用經(jīng)驗來看,首個版本是不保險的,即便是 LTS 版本;所以不推薦直接升級到 3.0.0,而是更推薦 3.0.1+,這個小版本會修復 3.0 所帶來的一些 bug。
先講一下我們的升級流程,大家可以用做參考。
根據(jù)我們的使用場景,為了以防萬一,首先需要將我們的插件依賴升級到對應的版本。
圖片
其實簡單來說就是更新下依賴,然后再重新打包,在后續(xù)的流程進行測試。
之后是預熱鏡像,我們使用 harbor 搭建了自己的 docker 鏡像倉庫,這樣在升級重啟鏡像的時候可以更快的從內(nèi)網(wǎng)拉取鏡像。
畢竟一個 pulsar-all 的鏡像也不小,盡量的縮短啟動時間。
預熱的過程也很簡單:
docker pull apachepulsar/pulsar-all:3.0.1docker tag apachepulsar/pulsar-all:3.0.1 harbor-private.xx.com/pulsar/pulsar-all:3.0.1docker image push harbor-private.xx.com/pulsar/pulsar-all:3.0.1
之后升級的時候就可以使用私服的鏡像了。
我這邊有寫了一個 cli 可以幫我快速創(chuàng)建或升級一個集群,然后觸發(fā)我所編寫的功能測試。
./pulsar-upgrade-cli upgrade pulsar-test ./charts/pulsar --version x.x.x -f charts/pulsar/values.yaml -n pulsar-test
這個 cli 很簡單,一共就做三件事:
之后的效果如下:
圖片
主要就是覆蓋了我們的使用場景,都跑通過之后才會走后續(xù)的流程。
圖片
之后會啟動一個 200 左右的并發(fā)生產(chǎn)和消費數(shù)據(jù),模擬線上的使用情況,會一直讓這個任務跑著,大概一晚上就可以了,第二天通過監(jiān)控查看:
組件的升級步驟這里參考了官方指南:https://pulsar.apache.org/docs/3.1.x/administration-upgrade/#upgrade-zookeeper-optional
圖片
只要一步步按照這個流程走,問題不大,哪一步出現(xiàn)問題后需要及時回滾,回滾流程參考下面的回滾部分。
同時在升級過程中需要一直查看 broker 的 error 日志,如果有明顯的不符合預期的日志一定要注意。
在升級 bookkeeper 的時候,broker 可能會出現(xiàn) bk 連接失敗的異常,這個可以不用在意。
都升級完后就是線上業(yè)務驗證環(huán)節(jié)了:
當出現(xiàn)異常的時候需要立即回滾,這里的異常一般就是消息收發(fā)異常,客戶端掉線等。
經(jīng)過我的測試 3.0.x 的存儲和之前的版本是兼容的,所以 bookkeeper 都能降級其他的組件就沒啥可擔心的了。
需要降級時直接將所有組件降級為上一個版本即可。
因為是從 2.x 升級到 3.x 也是涉及到了跨大版本,所以也準備了災難恢復的方案。
比如極端情況下升級失敗,所有數(shù)據(jù)丟失的情況。
整個災難恢復的主要目的就是恢復后的集群對外提供的域名不發(fā)生變化,同時所有的客戶端可以自動重連上來,也就是最壞的情況下所有的數(shù)據(jù)丟了可以接受,但不能影響業(yè)務正常使用。
所以我們的流程如下:
@SneakyThrows @Test void backup(){ List<String> topicList = pulsarAdmin.topics().getPartitionedTopicList("tenant/namespace"); log.info("topic size={}",topicList.size()); // create a custom thread pool CopyOnWriteArrayList<TopicMeta> dataList = new CopyOnWriteArrayList<>(); ExecutorService customThreadPool = Executors.newFixedThreadPool(10); for (String topicName : topicList) { customThreadPool.execute(()-> { PartitionedTopicMetadata metadata; try { metadata = pulsarAdmin.topics().getPartitionedTopicMetadata(topicName); TopicMeta topicMeta = new TopicMeta(); // backup topic topicMeta.setName(topicName); topicMeta.setPartition(metadata.partitions); // backup permission Map<String, Set<AuthAction>> permissions = pulsarAdmin.topics().getPermissions(topicName); topicMeta.setPermissions(permissions); // back sub List<String> subscriptions = new ArrayList<>(); PartitionedTopicStats topicStats = pulsarAdmin.topics().getPartitionedStats(topicName, true); topicStats.getSubscriptions().forEach((k,v)-> subscriptions.add(k)); topicMeta.setSubscriptions(subscriptions); dataList.add(topicMeta); } catch (PulsarAdminException e) { throw new RuntimeException(e); } }); } customThreadPool.shutdown(); while (!customThreadPool.isTerminated()) { } log.info("{}",dataList.size()); log.info("{}",JSONUtil.toJsonStr(dataList)); }// TopicMetaData@Data public class TopicMeta { private String name; private int partition; Map<String, Set<AuthAction>> permissions; List<String> subscriptions = new ArrayList<>(); }
第一步是備份 topic:
因為我們客戶端使用了 JWT 驗證,所有為了使得恢復的 Pulsar 集群可以讓客戶端無縫切換到新集群,因此必須得使用相同的公私鑰。
這個其實比較簡單,我們使用的是 helm 安裝的集群,所以只需要備份好 Secret 即可。
apiVersion: v1 data: PRIVATEKEY: XXX PUBLICKEY: XXX kind: Secret metadata: name: pulsar-token-asymmetric-key namespace: pulsar type: Opaque # 還有幾個 superUser 的 Secret
首先使用 helm 重新創(chuàng)建一個新集群:
./scripts/pulsar/prepare_helm_release.sh -n pulsar -k pulsarhelm install / --values charts/pulsar/values.yaml / --set namespace=pulsar/ --set initialize=true / pulsar ./charts/pulsar -n pulsar
直接使用剛才備份的公私鑰覆蓋到新集群即可。
進入 toolset pod 創(chuàng)建需要使用的 tenant/namespace
k exec -it pulsar-toolset-0 -n pulsar bashbin/pulsar-admin tenants create tenantbin/pulsar-admin namespaces create tenant/namespace
之后便是最重要的元數(shù)據(jù)恢復了:
@SneakyThrows @Test void restore() { PulsarAdmin pulsarAdmin = PulsarAdmin.builder().serviceHttpUrl("http://url:8080") .authentication(AuthenticationFactory.token(token)) .build(); Path filePath = Path.of("restore-ns.json"); String fileContent = Files.readString(filePath); List<TopicMeta> topicMetaList = JSON.parseArray(fileContent, TopicMeta.class); ExecutorService customThreadPool = Executors.newFixedThreadPool(50); for (TopicMeta topicMeta : topicMetaList) { customThreadPool.execute(() -> { // Create topic try { pulsarAdmin.topics().createPartitionedTopic(topicMeta.getName(), topicMeta.getPartition()); } catch (PulsarAdminException e) { log.error("Create topic error"); } // Create sub for (String subscription : topicMeta.getSubscriptions()) { try { pulsarAdmin.topics().createSubscription(topicMeta.getName(), subscription, MessageId.latest); } catch (PulsarAdminException e) { log.error("createSubscription error"); } } // Grant permission topicMeta.getPermissions().forEach((role, authActions) -> { permission(pulsarAdmin, topicMeta.getName(), role, authActions); }); log.info("topic:{} restore success", topicMeta.getName()); }); } customThreadPool.shutdown(); while (!customThreadPool.isTerminated()) { } log.info("restore success"); }private synchronized void permission(PulsarAdmin pulsarAdmin, String topic, String role, Set<AuthAction> authActions) { try { pulsarAdmin.topics().grantPermission(topic, role, authActions); } catch (PulsarAdminException e) { log.error("grantPermission error", e); } }
流程和備份類似:
因為授權接口限制了并發(fā)調(diào)用,所有需要加鎖,導致整個恢復的流程就會比較慢。
8000 topic 的 namespace 大概恢復時間為 40min 左右。
之后依次恢復其他 namespace 即可。
admin.namespaces().setNamespaceMessageTTL("tenant/namespace", 3600 * 6);admin.namespaces().setBacklogQuota("tenant/namespace", BacklogQuota)
如果之前的集群有設置 TTL 或者是 backlogQuota 時都需要手動恢復。
以上就是整個升級和災難恢復的流程,當然災難恢復希望大家不要碰到。
我會在下一篇詳細介紹 Pulsar 3.0 的新功能以及所碰到的一些坑。
本文鏈接:http://www.tebozhan.com/showinfo-26-53341-0.htmlPulsar3.0 升級指北,你學會些什么?
聲明:本網(wǎng)頁內(nèi)容旨在傳播知識,若有侵權等問題請及時與本網(wǎng)聯(lián)系,我們將在第一時間刪除處理。郵件:2376512515@qq.com