AVt天堂网 手机版,亚洲va久久久噜噜噜久久4399,天天综合亚洲色在线精品,亚洲一级Av无码毛片久久精品

當前位置:首頁 > 科技  > 軟件

Kafka 遷移工具 MirrorMaker2 原理起底

來源: 責編: 時間:2024-04-19 09:29:57 186觀看
導讀注意:本文內容截止到 2024 年 2 月 26 日發布的 Kafka 3.7.0 版本。MirrorMaker2(后文簡稱 MM2)在 2019 年 12 月隨 Kafka 2.4.0 一起推出。顧名思義,是為了解決 Kafka 集群之間數據復制和數據同步的問題而誕生的 Kafka

注意:本文內容截止到 2024 年 2 月 26 日發布的 Kafka 3.7.0 版本。f4Q28資訊網——每日最新資訊28at.com

MirrorMaker2(后文簡稱 MM2)在 2019 年 12 月隨 Kafka 2.4.0 一起推出。顧名思義,是為了解決 Kafka 集群之間數據復制和數據同步的問題而誕生的 Kafka 官方的數據復制工具。在實際生產中,經常被用來實現 Kafka 數據的備份,遷移和災備等目的。f4Q28資訊網——每日最新資訊28at.com

在此也預告一下,AutoMQ 基于 MM2 的遷移產品化功能也即將和大家見面,可以幫助用戶更好更快從自建 Kafka 遷移到 AutoMQ,歡迎大家屆時使用。f4Q28資訊網——每日最新資訊28at.com

1、 安裝部署 

MM2 一共有三種部署模式,dedicated mode,standalone mode 和 Kafka connect mode。f4Q28資訊網——每日最新資訊28at.com

部署模式

Dedicated mode

直接部署 Kafka MM2,啟動命令如下:f4Q28資訊網——每日最新資訊28at.com

./bin/connect-mirror-maker.sh connect-mirror-maker.properties

此時 MM2 依然是基于 Kafka Connect,對外封裝掉了 Kafka Connect 的復雜度,與此同時也支持分布式部署。One-line 直接拉起 MM2 以及背后的 Kafka Connect,不過相比較來說也喪失掉了一些 Kafka Connect 的靈活性(閹割了 Kafka Connect 對外的 RESTful API)。f4Q28資訊網——每日最新資訊28at.com

Standalone mode

Standalone mode 更像是為測試環境設計的,并不支持分布式部署。這一點在 KIP-382[1] 中也有說明。因為不是一個生產可用的版本,在此不作多贅述。f4Q28資訊網——每日最新資訊28at.com

Kafka Connect mode

此時整個 MM2 的部署是需要一個現成的 Kafka Connect 集群的,MM2 會在 Kafka Connect 上部署自己的 Connector 來完成整個遷移過程。因為 Kafka Connect mode 是 MM2 最復雜的部署模式,而且無論是 Dedicated mode 還是 Kafka Connect mode,背后的原理都是一樣,只是前者進行了封裝,因此了解 MM2 在 Kafka Connect 上的工作流程最有利于我們對 MM2 有全局了解。f4Q28資訊網——每日最新資訊28at.com

Kafka Connect 在 Kafka 0.9.0 版本中進行推出,旨在簡化數據集成和數據流管道的構建,同時提供了一種可拓展,可靠的方式來連接 Kafka 與外部系統。基于這樣的設計,MM2 基于 Kafka Connect 進行實現是非常自然的事情。f4Q28資訊網——每日最新資訊28at.com

我們可以把基于 Kafka Connect mode 進行部署的 MM2 里的調度資源分為以下幾種:f4Q28資訊網——每日最新資訊28at.com

? Worker:一個 MM2 或者 Kafka Connect 進程,是進行分布式部署時的基本單位。f4Q28資訊網——每日最新資訊28at.com

? Connector:單個 Worker 內部執行遷移任務的連接器,一個 Worker 內可以有多個 Connector,每個 Connector 負責相對獨立的功能。f4Q28資訊網——每日最新資訊28at.com

? Task:Connector 將需要遷移的任務進行切分,Task 是并發執行的最小單位。f4Q28資訊網——每日最新資訊28at.com

Kafka Connect 集群

在 Kafka Connect Mode 下,我們需要先準備一個 Kafka Connect 集群,在每個節點上執行以下命令即可啟動 Kafka Connect 集群。f4Q28資訊網——每日最新資訊28at.com

./bin/connect-distributed.sh config/connect-distributed.properties

在 Kafka Connect 集群部署完成之后,我們可以利用 Kafka Connect 提供的 RESTful API 來啟動 MM2 所需要的所有 Connectors。默認情況下,Kafka Connect 提供的端口為 8083。即使 Kafka Connect 集群中有多個節點,但是執行下列的命令只需要向集群中的任一節點發起請求即可。f4Q28資訊網——每日最新資訊28at.com

Connector

假設節點 IP 為本機,啟動三個 Connector 的命令如下(實際上向當前 Kafka Connect 集群中的任一節點發起請求即可):f4Q28資訊網——每日最新資訊28at.com

# MirrorSourceConnectorcurl -X POST -H "Content-Type: application/json" --data @mirror-source-connector.properties http://127.0.0.1:8083/connectors# MirrorCheckpointConnectorcurl -X POST -H "Content-Type: application/json" --data @mirror-checkpoint-connector.properties http://127.0.0.1:8083/connectors# MirrorHeartbeatConnectorcurl -X POST -H "Content-Type: application/json" --data @mirror-heartbeat-connector.properties http://127.0.0.1:8083/connectors

其中 mirror-source-connector.properties,mirror-checkpoint-connector.properties 和 mirror-heartbeat-connector.properties 為對應 Connector 的配置文件。f4Q28資訊網——每日最新資訊28at.com

在啟動完 Connector 之后,我們還可以使用以下命令查看當前 Kafka Connect 集群中已經存在的 Connectors。f4Q28資訊網——每日最新資訊28at.com

$ curl http://127.0.0.1:8083/connectors["mm2-heartbeat-connector","mm2-source-connector","mm2-checkpoint-connector"]%

更多關于 Kafka Connect RESTful API 的細節,可以參考 Kafka Connect 101: Kafka Connect's REST API[2]。f4Q28資訊網——每日最新資訊28at.com

2、工作流   

從上文可以看到,在 MM2 中,有三個 Connector,它們負責完成整個副本復制過程,這三個 Connector 包括:f4Q28資訊網——每日最新資訊28at.com

? MirrorSourceConnector:同步源集群中 topic 的消息數據到目標集群。f4Q28資訊網——每日最新資訊28at.com

? MirrorCheckpointConnector:將源集群的消費位點翻譯并同步到目標集群。f4Q28資訊網——每日最新資訊28at.com

? MirrorHeartbeatConnector:定時往源集群中發送心跳,驗證和監控兩個集群之間連接和遷移任務的運行情況。f4Q28資訊網——每日最新資訊28at.com

對于 MirrorSourceConnector 和 MirrorCheckpointConnector 提供有 JMX 監控信息,可以幫助對遷移進度和遷移健康狀況有全局了解。f4Q28資訊網——每日最新資訊28at.com

MM2 會創建以下幾種 Topic(除 heartbeats 之外,所有的 Topic 都會被創建在 target 集群上):f4Q28資訊網——每日最新資訊28at.com

? connect-configs:存儲 MM2 中 connector 的配置信息。f4Q28資訊網——每日最新資訊28at.com

? connect-offsets:存儲 MM2 中 MirrorSourceConnector 和 MirrorCheckpointConnector 的消費位點。f4Q28資訊網——每日最新資訊28at.com

? connect-status:存儲 MM2 中 connector 的狀態信息。f4Q28資訊網——每日最新資訊28at.com

? mm2-offset-syncs.A.internal:存儲消息在源集群和目標集群之間同步的 offset 映射信息(即 OffsetSync 消息)用于消費位點翻譯。此 Topic 中的消息由 MirrorSourceConnector 發出(Topic 名中 A 表示源集群的 alias)。f4Q28資訊網——每日最新資訊28at.com

? A.checkpoints.internal:存儲 GroupId 同步的消費進度。具體存儲的信息包括 GroupId,Partition 以及在源集群和目標集群的消費位點,此 Topic 中的信息由 MirrorCheckpointConnector 發出(Topic 名中 A 表示源集群的 alias)。f4Q28資訊網——每日最新資訊28at.com

? heartbeats:定期往源集群發送心跳消息,這部分消息會被同步到目標集群。此 Topic 中的消息體主要存儲簡單的時間戳信息,其中的消息由 MirrorHeartbeatConnector 發出。f4Q28資訊網——每日最新資訊28at.com

想要了解具體的 MM2 工作流,弄清楚 mm2-offset-syncs.A.internal 和 A.checkpoints.internal 兩個 Topic 的作用尤為關鍵。f4Q28資訊網——每日最新資訊28at.com

圖片圖片f4Q28資訊網——每日最新資訊28at.com

消息同步與位點映射

MirrorSourceConnector 會從最早位點開始同步消息。在同步消息時會生成 OffsetSync 消息。OffsetSync 消息中記錄了被同步的消息的分區信息,在源集群和目標集群上的位點映射信息。f4Q28資訊網——每日最新資訊28at.com

記錄在 OffsetSync 消息中的位點映射信息是非常必要的,首先一條消息從源集群被同步到目標集群上,前后的 offset 大概率是不同的,而且還有可能會出現消息重復和多個源集群的 topic 被同步到一個目標 topic 上的情況,而位點映射能最大程度上幫助我們將源集群的消息和目標集群的消息對應上。f4Q28資訊網——每日最新資訊28at.com

這個 OffsetSync 消息就被存儲在 mm2-offset-syncs.A.internal 中。但是并不是每同步一條消息就會生成一個 OffsetSync 消息。默認情況下每隔 100 條消息就會生成一個 OffsetSync 消息,這里的參數可以使用 offset.lag.max 來進行調節。關于 OffsetSync 消息的同步判斷,可以參照 org.apache.kafka.connect.mirror.MirrorSourceTask.PartitionState#update 的具體實現細節。f4Q28資訊網——每日最新資訊28at.com

位點翻譯

MirrorCheckpointConnector 則會執行具體的位點翻譯工作,它會消費 mm2-offset-syncs.A.internal 中的 OffsetSync 消息,然后將源集群上的消費位點翻譯成目標集群上的消費位點并執行 alterConsumerGroupOffsets 方法來重置消費者位點。f4Q28資訊網——每日最新資訊28at.com

因為 OffsetSync 沒有按照時間間隔同步的邏輯,導致的結果就是當前分區最新的消息位點距離上一次同步的位點如果沒有超過 100,則不會生成新的 OffsetSync。而 MirrorCheckpointConnector 是根據 OffsetSync 中的消息位點來同步消費進度的,這樣的結果就是目標集群的消費位點基本上不可能被完全同步,最多相比較于源集群會回退 100 個位點。但是在 3.7.0 以及之后的版本中,對 OffsetSync 增加了按照時間同步的兜底邏輯,使得這個問題得到了解決[3]。f4Q28資訊網——每日最新資訊28at.com

詳細來說,如果當前消息距離之前的 OffsetSync 中的最新消息沒有超過 100 個 offset,但是已經有一段時間沒有進行過 OffsetSync 消息的同步了,也會強行進行一次 OffsetSync 消息的同步(由 offset.flush.internal.ms 參數控制,默認為 10S)。f4Q28資訊網——每日最新資訊28at.com

圖片圖片f4Q28資訊網——每日最新資訊28at.com

可以通過以下命令方便地查看 OffsetSync 消息的內容。f4Q28資訊網——每日最新資訊28at.com

$ ./bin/kafka-console-consumer.sh --formatter "org.apache.kafka.connect.mirror.formatters.OffsetSyncFormatter" --bootstrap-server 127.0.0.1:9592 --from-beginning --topic mm2-offset-syncs.A.internalOffsetSync{topicPartitinotallow=heartbeats-0, upstreamOffset=0, downstreamOffset=0}OffsetSync{topicPartitinotallow=test-0-0, upstreamOffset=0, downstreamOffset=0}OffsetSync{topicPartitinotallow=test-0-0, upstreamOffset=101, downstreamOffset=101}OffsetSync{topicPartitinotallow=heartbeats-0, upstreamOffset=2, downstreamOffset=2}

針對 MM2 中的 HeartbeatConnector,更多的時候則是起到一個觀測當前 MM2 集群同步狀況的作用。使用以下命令可以查看 HeartbeatTopic 的內容。f4Q28資訊網——每日最新資訊28at.com

$ ./bin/kafka-console-consumer.sh --formatter "org.apache.kafka.connect.mirror.formatters.HeartbeatFormatter"  --bootstrap-server 127.0.0.1:9092 --from-beginning --topic heartbeats --property print.key=trueHeartbeat{sourceClusterAlias=A, targetClusterAlias=B, timestamp=1712564822022}Heartbeat{sourceClusterAlias=A, targetClusterAlias=B, timestamp=1712564842185}Heartbeat{sourceClusterAlias=A, targetClusterAlias=B, timestamp=1712564862192}Heartbeat{sourceClusterAlias=A, targetClusterAlias=B, timestamp=1712564882197}Heartbeat{sourceClusterAlias=A, targetClusterAlias=B, timestamp=1712564902202}

這里每 20 秒會生成一條心跳消息,心跳消息包含一條當時的時間戳。這樣通過在目標集群查看被同步過來的 heartbeat Topic 中的消息,即可查看當前消息同步狀況。f4Q28資訊網——每日最新資訊28at.com

3、負載均衡 

在 Kafka Connect 中,一個獨立的 Kafka Connect 進程我們稱之為一個 worker。在分布式環境下,相同 group.id 的一組 worker 就形成了一個 Kafka Connect 集群。f4Q28資訊網——每日最新資訊28at.com

盡管在負載均衡的過程中,Connector 和 Task 都會參與,但是 Connector 和 Task 并不是正交的。Task 從屬于 Connector。Connector 參與負載均衡只是表示具體的 Connector 類中的邏輯會在哪個 worker 中執行。具體的實現邏輯可以參照 EagerAssigner#performTaskAssignment 中的內容:f4Q28資訊網——每日最新資訊28at.com

private Map<String, ByteBuffer> performTaskAssignment(String leaderId, long maxOffset,                                                      Map<String, ExtendedWorkerState> memberConfigs,                                                      WorkerCoordinator coordinator) {    // 用于記錄 Connector 分配結果    Map<String /* member */, Collection<String /* connector */>> connectorAssignments = new HashMap<>();    // 用于記錄 Task 分配結果    Map<String /* member */, Collection<ConnectorTaskId>> taskAssignments = new HashMap<>();    List<String> connectorsSorted = sorted(coordinator.configSnapshot().connectors());    // 使用一個環形迭代器,將 connector 和 task 分別分配給不同的 worker    CircularIterator<String> memberIt = new CircularIterator<>(sorted(memberConfigs.keySet()));    // 先分配 Connector    for (String connectorId : connectorsSorted) {        String connectorAssignedTo = memberIt.next();        log.trace("Assigning connector {} to {}", connectorId, connectorAssignedTo);        Collection<String> memberConnectors = connectorAssignments.computeIfAbsent(connectorAssignedTo, k -> new ArrayList<>());        memberConnectors.add(connectorId);    }    // 在分配具體的 Task,延續 member 迭代器中的順序    for (String connectorId : connectorsSorted) {        for (ConnectorTaskId taskId : sorted(coordinator.configSnapshot().tasks(connectorId))) {            String taskAssignedTo = memberIt.next();            log.trace("Assigning task {} to {}", taskId, taskAssignedTo);            Collection<ConnectorTaskId> memberTasks = taskAssignments.computeIfAbsent(taskAssignedTo, k -> new ArrayList<>());            memberTasks.add(taskId);        }    }    // 序列化分配結果并返回    ......}

下圖展示了有 3 個 Worker,1 個 Connector 以及 5 個 Task 時以及 Worker2 宕機前后的負載均衡情況。f4Q28資訊網——每日最新資訊28at.com

圖片圖片f4Q28資訊網——每日最新資訊28at.com

不過這種負載均衡方式會引起比較明顯的驚群效應,比如在 Kafka Connect 集群擴縮容的時候,不是新擴縮容的節點也會出現較長的 stop-the-world 問題,在 K8s 環境中如果有節點需要進行滾動升級,也會出現類似的問題。這種負載均衡方式在 Kafka 中稱之為 Eager Rebalance。f4Q28資訊網——每日最新資訊28at.com

后面 Kafka 提出了 Incremental Cooperative Rebalance[4],引入了一個延遲時間延后 rebalance 的過程。進行了這樣的改進之后,當出現節點滾動升級時,負載均衡就不會馬上發生,因為被升級的節點可能很快就回歸了,之前負載均衡的結果也能最大限度得到保留,對整體消息同步流程的影響也盡可能降到了最低。相比較來說,Eager Rebalance 可以很快就達到負載均衡的終態,而 Incremental Cooperative Rebalance 則可以最大程度上降低滾動升級等場景下對負載均衡帶來的全局影響。f4Q28資訊網——每日最新資訊28at.com

參考資料

[1] KIP-382: MirrorMaker 2.0f4Q28資訊網——每日最新資訊28at.com

https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0f4Q28資訊網——每日最新資訊28at.com

[2] COURSE: KAFKA CONNECT 101 Kafka Connect’s REST APIf4Q28資訊網——每日最新資訊28at.com

https://developer.confluent.io/courses/kafka-connect/rest-api/f4Q28資訊網——每日最新資訊28at.com

[3] KAFKA-15906f4Q28資訊網——每日最新資訊28at.com

https://issues.apache.org/jira/browse/KAFKA-15906f4Q28資訊網——每日最新資訊28at.com

[4] Incremental Cooperative Rebalancing in Kafka Connectf4Q28資訊網——每日最新資訊28at.com

https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative+Rebalancing+in+Kafka+Connectf4Q28資訊網——每日最新資訊28at.com

[5] KIP-415: Incremental Cooperative Rebalancing in Kafka Connectf4Q28資訊網——每日最新資訊28at.com

https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative+Rebalancing+in+Kafka+Connectf4Q28資訊網——每日最新資訊28at.com

[6] KIP-545: support automated consumer offset sync across clusters in MM 2.0f4Q28資訊網——每日最新資訊28at.com

https://cwiki.apache.org/confluence/display/KAFKA/KIP-545%3A+support+automated+consumer+offset+sync+across+clusters+in+MM+2.0f4Q28資訊網——每日最新資訊28at.com

[7] KIP-656: MirrorMaker2 Exactly-once Semanticsf4Q28資訊網——每日最新資訊28at.com

https://cwiki.apache.org/confluence/display/KAFKA/KIP-656%3A+MirrorMaker2+Exactly-once+Semanticsf4Q28資訊網——每日最新資訊28at.com

本文鏈接:http://www.tebozhan.com/showinfo-26-84043-0.htmlKafka 遷移工具 MirrorMaker2 原理起底

聲明:本網頁內容旨在傳播知識,若有侵權等問題請及時與本網聯系,我們將在第一時間刪除處理。郵件:2376512515@qq.com

上一篇: 2024年在Web應用程序中實現前沿技術的JavaScript庫

下一篇: Prism:打造WPF項目的MVVM之選,簡化開發流程、提高可維護性

標簽:
  • 熱門焦點
Top