在現代分布式系統(tǒng)中,確保數據處理的準確性和一致性是至關重要的。Apache Kafka,作為一個廣泛使用的流處理平臺,提供了強大的消息隊列和流處理功能。隨著業(yè)務需求的增長,Kafka 的事務消息功能應運而生,它允許應" />

AVt天堂网 手机版,亚洲va久久久噜噜噜久久4399,天天综合亚洲色在线精品,亚洲一级Av无码毛片久久精品

當前位置:首頁 > 科技  > 軟件

原理剖析| Kafka Exactly Once 語義實現原理:冪等性與事務消息

來源: 責編: 時間:2024-04-28 08:55:47 167觀看
導讀1、前言
在現代分布式系統(tǒng)中,確保數據處理的準確性和一致性是至關重要的。Apache Kafka,作為一個廣泛使用的流處理平臺,提供了強大的消息隊列和流處理功能。隨著業(yè)務需求的增長,Kafka 的事務消息功能應運而生,它允許應

1、前言   

7c428資訊網——每日最新資訊28at.com

在現代分布式系統(tǒng)中,確保數據處理的準確性和一致性是至關重要的。Apache Kafka,作為一個廣泛使用的流處理平臺,提供了強大的消息隊列和流處理功能。隨著業(yè)務需求的增長,Kafka 的事務消息功能應運而生,它允許應用程序以一種原子的方式處理消息,即要么所有消息都被正確處理,要么都不處理。本文將深入剖析 Kafka 的 Exactly-Once 語義實現原理,包括冪等性與事務消息的關鍵概念,以及它們是如何在 Kafka 中實現的。我們將探討 Kafka 事務的流程,事務提供的 ACID 保證,以及在實際應用中可能遇到的一些限制。無論您是 Kafka 的新手還是經驗豐富的開發(fā)者,本文都將為您提供有價值的見解和指導。7c428資訊網——每日最新資訊28at.com

7c428資訊網——每日最新資訊28at.com

7c428資訊網——每日最新資訊28at.com

2、消息隊列的事務場景

7c428資訊網——每日最新資訊28at.com

Kafka 目前用于流處理的場景:相當于一個有向無環(huán)圖(DAG,Directed acyclic graph)每個節(jié)點是一個 Kafka Topic,每條邊是一個流處理操作。在這樣的場景下,有兩種操作:7c428資訊網——每日最新資訊28at.com

? 消費上游消息并提交位點7c428資訊網——每日最新資訊28at.com

? 處理消息并發(fā)送到下游 Topic7c428資訊網——每日最新資訊28at.com

7c428資訊網——每日最新資訊28at.com

對于由這兩種操作構成的一組處理流程需要具備事務語義,這樣我們就可以不重復(Exactly Once)的處理上游消息并將結果可靠地存儲在下游 Topic 中。7c428資訊網——每日最新資訊28at.com

圖片圖片7c428資訊網——每日最新資訊28at.com

上圖是一個典型的 Kafka 事務的流程,我們可以看到:MySQL 的 binlog 作為上游數據源將數據寫入到 Kafka 中,Spark Streaming 從 Kafka 中讀取數據并進行處理,最后將處理結果寫入到另外兩個 Topic 中(圖中三個 Topic 位于同一集群中)。其中消費 Topic A 與寫入 Topic B 和 Topic C 的操作具備事務語義。7c428資訊網——每日最新資訊28at.com

7c428資訊網——每日最新資訊28at.com

7c428資訊網——每日最新資訊28at.com

3、Kafka 的 Exactly Once 語義

7c428資訊網——每日最新資訊28at.com

從上述的場景中我們可以發(fā)現,事務消息最主要的動機是在流處理中實現 Exactly Once 的語義,這可以分為:7c428資訊網——每日最新資訊28at.com

? 僅發(fā)送一次: 單分區(qū)僅發(fā)送一次由生產者冪等保證,多分區(qū)僅發(fā)送一次由事務機制保證7c428資訊網——每日最新資訊28at.com

? 僅消費一次: Kafka 通過消費位點的提交來控制消費進度,而消費位點的提交被抽象成向系統(tǒng) topic 發(fā)送消息。這就使得發(fā)送和消費行為統(tǒng)一起來,只要解決了多分區(qū)發(fā)送消息的一致性就能實現 Exactly Once 語義7c428資訊網——每日最新資訊28at.com

7c428資訊網——每日最新資訊28at.com

7c428資訊網——每日最新資訊28at.com

4、生產者冪等性

7c428資訊網——每日最新資訊28at.com

在創(chuàng)建 Kafka 生產者時設置了 enable.idempotence 參數,用于開啟生產者冪等性。7c428資訊網——每日最新資訊28at.com

val props = new Properties()props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")val producer = new KafkaProducer(props)

Kafka 的發(fā)送冪等是通過序列號來實現的,每個消息都會被分配一個序列號,序列號是遞增的,這樣就可以保證消息的順序性。當生產者發(fā)送消息時,會將消息的序列號和消息內容一起寫入到日志文件中,下次收到非預期序列號的消息就會返回 OutOfOrderSequenceException 異常。7c428資訊網——每日最新資訊28at.com

7c428資訊網——每日最新資訊28at.com

設置 enable.idempotence 參數后,生產者會檢查以下三個參數的值是否合法(ProducerConfig#postProcessAndValidateIdempotenceConfigs)7c428資訊網——每日最新資訊28at.com

? max.in.flight.requests.per.connection 必須小于 57c428資訊網——每日最新資訊28at.com

? retries 必須大于 07c428資訊網——每日最新資訊28at.com

? acks 必須設置為 all7c428資訊網——每日最新資訊28at.com

7c428資訊網——每日最新資訊28at.com

Kafka 將消息的序列號信息保存在分區(qū)維度的 .snapshot 文件中,格式如下(ProducerStateManager#ProducerSnapshotEntrySchema):7c428資訊網——每日最新資訊28at.com

圖片圖片7c428資訊網——每日最新資訊28at.com

我們可以發(fā)現,該文件中保存了 ProducerId、ProducerEpoch 和 LastSequence。所以冪等的約束為:相同分區(qū)、相同 Producer(id 和 epoch) 發(fā)送的消息序列號需遞增。即 Kafka 的生產者冪等性只在單連接、單分區(qū)生效,Producer 重啟或消息發(fā)送到其他分區(qū)就失去了冪等性的約束。7c428資訊網——每日最新資訊28at.com

7c428資訊網——每日最新資訊28at.com

.snapshot 文件在 log segment 滾動時更新,發(fā)生重啟后通過讀取 .snapshot 文件和最新的日志文件即可恢復 Producer 的狀態(tài)。Broker 的重啟或分區(qū)遷移并不會影響冪等性。7c428資訊網——每日最新資訊28at.com

7c428資訊網——每日最新資訊28at.com

7c428資訊網——每日最新資訊28at.com

5、事務消息流程

7c428資訊網——每日最新資訊28at.com

我們首先從 Demo 開始,來看一下如何使用 Kafka 客戶端完成一個事務:7c428資訊網——每日最新資訊28at.com

// 事務初始化val props = new Properties()...props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId)props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")val producer = new KafkaProducer(props)producer.initTransactions()producer.beginTransaction()// 消息發(fā)送producer.send(RecordUtils.create(topic1, partition1, "message1"))producer.send(RecordUtils.create(topic2, partition2, "message2"))// 事務提交或回滾producer.commitTransaction()

7c428資訊網——每日最新資訊28at.com

5.1 事務初始化

Kafka Producer 啟動后我們使用兩個 API 來初始化事務:initTransactions 和 beginTransaction。7c428資訊網——每日最新資訊28at.com

7c428資訊網——每日最新資訊28at.com

回顧一下我們的 Demo,在發(fā)送消息時是發(fā)送到兩個不同分區(qū)中,這兩個分區(qū)可能在不同的 Broker 上,所以我們需要一個全局的協(xié)調者 TransactionCoordinator 來記錄事務的狀態(tài)。7c428資訊網——每日最新資訊28at.com

7c428資訊網——每日最新資訊28at.com

所以,在 initTransactions 中,Producer 首先發(fā)送 ApiKeys.FIND_COORDINATOR 請求獲取 TransactionCoordinator。7c428資訊網——每日最新資訊28at.com

7c428資訊網——每日最新資訊28at.com

之后即可向其發(fā)送 ApiKeys.INIT_PRODUCER_ID 請求獲取 ProducerId 及  ProducerEpoch(也是上文中用于冪等的字段)。此步驟生成的 id 和 epoch 會寫入內部 Topic __transaction_state 中,并且將事務的狀態(tài)置為 Empty。7c428資訊網——每日最新資訊28at.com

7c428資訊網——每日最新資訊28at.com

__transaction_state 是 compaction Topic,其中消息的 key 為客戶端設置的transactional.id(詳見 TransactionStateManager#appendTransactionToLog)。7c428資訊網——每日最新資訊28at.com

7c428資訊網——每日最新資訊28at.com

區(qū)別于 ProducerId 是服務端生成的內部屬性;TransactionId 由用戶設置,用于標識業(yè)務視角認為的“同一個應用”,啟動具有相同 TransactionId 的新 Producer 會使得未完成的事務被回滾并且來自舊 Producer(具有較小 epoch)的請求被拒絕掉。7c428資訊網——每日最新資訊28at.com

7c428資訊網——每日最新資訊28at.com

后續(xù) beginTransaction 用于開始一個事務,該方法會創(chuàng)建一個 Producer 內部事務狀態(tài),標識這一個事務的開始,并不會有 RPC 產生。7c428資訊網——每日最新資訊28at.com

7c428資訊網——每日最新資訊28at.com

5.2 消息發(fā)送

上一節(jié)說到 beginTransaction 只是更改 Producer 內部狀態(tài),那么在第一條消息發(fā)送時才隱式開啟了事務:7c428資訊網——每日最新資訊28at.com

7c428資訊網——每日最新資訊28at.com

首先,Producer 會發(fā)送 ApiKeys.ADD_PARTITIONS_TO_TXN 請求到 TransactionCoordinator。TransactionCoordinator 會將這個分區(qū)加入到事務中,并更改事務的狀態(tài)為 Ongoing,這些信息被持久化到 __transaction_state 中。7c428資訊網——每日最新資訊28at.com

7c428資訊網——每日最新資訊28at.com

然后 Producer 使用 ApiKeys.PRODUCE 請求正常發(fā)送消息到對應的分區(qū)中。這條消息的可見性控制在下文消息消費一節(jié)中會詳細討論。7c428資訊網——每日最新資訊28at.com

7c428資訊網——每日最新資訊28at.com

5.3 事務提交與回滾

當所有消息發(fā)送完成后,Producer 可以選擇提交或回滾事務,此時:7c428資訊網——每日最新資訊28at.com

? TransactionCoordinator:具有當前事務所有相關分區(qū)的信息7c428資訊網——每日最新資訊28at.com

? 其他 Broker:已經將消息持久化到日志文件中7c428資訊網——每日最新資訊28at.com

7c428資訊網——每日最新資訊28at.com

接下來 Producer 調用 commitTransaction 會發(fā)送 ApiKeys.END_TXN 請求將事務狀態(tài)更改為 PrepareCommit(回滾事務對應狀態(tài) PrepareAbort)并持久化到 __transaction_state 中,此時從 Producer 的視角來看整個事務已經結束了。7c428資訊網——每日最新資訊28at.com

7c428資訊網——每日最新資訊28at.com

TransactionCoordinator 會異步向各個 Broker 發(fā)送 ApiKeys.WRITE_TXN_MARKERS 請求,當所有參加事務的 Broker 都返回成功后,TransactionCoordinator 會將事務狀態(tài)更改為 CompleteCommit(回滾事務對應狀態(tài) CompleteAbort)并持久化到 __transaction_state 中。7c428資訊網——每日最新資訊28at.com

7c428資訊網——每日最新資訊28at.com

5.4 消息的消費

某個分區(qū)的消息可能是事務消息與非事務消息混雜的,如下圖所示:7c428資訊網——每日最新資訊28at.com

圖片圖片7c428資訊網——每日最新資訊28at.com

在 Broker 處理 ApiKeys.PRODUCE 請求時,完成消息持久化會更新 LSO 到第一條未提交的事務消息的 offset。這樣在消費者消費消息時,可以通過 LSO 來判斷消息是否可見:如果設置了 isolation.level 為 read_committed 則只會消費 LSO 之前的消息。7c428資訊網——每日最新資訊28at.com

LSO(log stable offset): 它表示的是已經被成功復制到所有副本(replicas)并且可以被消費者安全消費的消息的最大偏移量。 7c428資訊網——每日最新資訊28at.com

7c428資訊網——每日最新資訊28at.com

但是我們可以發(fā)現 LSO 之前存在已回滾的消息(圖中紅色矩形)這些消息應該被過濾掉:在 Broker 處理 ApiKeys.WRITE_TXN_MARKERS 請求時,會將已回滾的消息索引寫入到 .txnindex 文件中(LogSegmentKafka#updateTxnIndex)。7c428資訊網——每日最新資訊28at.com

7c428資訊網——每日最新資訊28at.com

后續(xù) Consumer 消費消息時還會收到對應區(qū)間的已取消事務消息列表,上圖區(qū)間中的該列表為:7c428資訊網——每日最新資訊28at.com

圖片圖片7c428資訊網——每日最新資訊28at.com

7c428資訊網——每日最新資訊28at.com

代表 offset 在 [2,5] 之間且由 id 為 11 的 Producer 發(fā)送的消息都已回滾。7c428資訊網——每日最新資訊28at.com

7c428資訊網——每日最新資訊28at.com

上文我們討論了 __transaction_state 的實現確保同一時間,同一 TransactionId 有且只有一個事務在進行中。所以可以使用 ProducerId 和 offset 區(qū)間定位回滾的消息不會發(fā)生沖突。7c428資訊網——每日最新資訊28at.com

7c428資訊網——每日最新資訊28at.com

7c428資訊網——每日最新資訊28at.com

6、Kafka 事務提供的 ACID 保證

7c428資訊網——每日最新資訊28at.com

? 原子性(Atomicity)7c428資訊網——每日最新資訊28at.com

Kafka 通過對 __transaction_state Topic 的寫入實現了事務狀態(tài)的轉移,保證了事務要么同時提交,要么同時回滾。7c428資訊網——每日最新資訊28at.com

7c428資訊網——每日最新資訊28at.com

? 一致性(Consistency)7c428資訊網——每日最新資訊28at.com

在事務進入 PrepareCommit 或 PrepareAbort 階段時, TransactionCoordinator 異步向所有參與事務的 Broker 提交或回滾事務。這使得 Kafka 的事務做不到強一致性,只能通過不斷重試保證最終一致性。7c428資訊網——每日最新資訊28at.com

7c428資訊網——每日最新資訊28at.com

? 隔離性(Isolation)7c428資訊網——每日最新資訊28at.com

Kafka 通過 LSO 機制和 .txnindex 文件來避免臟讀,實現讀已提交(Read Committed)的隔離級別。7c428資訊網——每日最新資訊28at.com

7c428資訊網——每日最新資訊28at.com

? 持久性(Durability)7c428資訊網——每日最新資訊28at.com

Kafka 通過將事務狀態(tài)寫入到 __transaction_state Topic 和消息寫入到日志文件中來保證持久性。7c428資訊網——每日最新資訊28at.com

7c428資訊網——每日最新資訊28at.com

7c428資訊網——每日最新資訊28at.com

7、Kafka 事務的限制

7c428資訊網——每日最新資訊28at.com

從功能上看,Kafka 事務并不能支持業(yè)務方事務,強限制上游的消費和下游寫入都需要是同一個 Kafka 集群,否則就不具備原子性保障。7c428資訊網——每日最新資訊28at.com

7c428資訊網——每日最新資訊28at.com

從性能上看,Kafka 事務的性能開銷主要體現在生產側:7c428資訊網——每日最新資訊28at.com

開啟事務時需要額外的 RPC 請求定位 TransactionCoordinator 并初始化數據7c428資訊網——每日最新資訊28at.com

消息發(fā)送需要在發(fā)送消息前向 TransactionCoordinator 同步請求添加分區(qū),并將事務狀態(tài)的變化寫入到 __transaction_state Topic7c428資訊網——每日最新資訊28at.com

事務提交或回滾時需要向所有參與事務的 Broker 發(fā)送請求7c428資訊網——每日最新資訊28at.com

7c428資訊網——每日最新資訊28at.com

對于涉及分區(qū)較少且消息數量較多的事務,事務的開銷可以被均攤;反之,較多的同步 RPC 帶來的開銷會極大影響性能。并且每個生產者只能有一個事務在進行中,這就意味著事務的吞吐量會受到限制。7c428資訊網——每日最新資訊28at.com

7c428資訊網——每日最新資訊28at.com

消費側也有一定的影響:消費者只能看到 LSO 以下的消息,并且需要額外的索引文件來過濾已回滾的消息,這無疑會增加端到端的延遲。7c428資訊網——每日最新資訊28at.com

7c428資訊網——每日最新資訊28at.com

7c428資訊網——每日最新資訊28at.com

8、總結   

7c428資訊網——每日最新資訊28at.com

通過本文的深入分析,我們了解到 Kafka 的事務消息功能是如何在流處理場景中提供 Exactly-Once 語義的。Kafka 通過其事務 API 和內部機制,實現了消息發(fā)送的原子性、最終一致性、隔離性和持久性,盡管在實際應用中可能存在一些性能和功能上的限制。開發(fā)者和架構師應當充分理解這些概念,并在設計系統(tǒng)時考慮如何有效地利用 Kafka 的事務功能,以構建更加健壯和可靠的數據處理流程。7c428資訊網——每日最新資訊28at.com

7c428資訊網——每日最新資訊28at.com

AutoMQ 是構建于對象存儲之上的云原生 Kafka fork,在解決了 Kafka 已有的成本和彈性問題基礎上對 Kafka 100%兼容,因此在 AutoMQ 上也可以使用 Kafka 事務消息。AutoMQ 作為國內 Kafka 生態(tài)的忠實擁護者,我們將持續(xù)為 Kafka 技術愛好者帶來優(yōu)質的 Kafka 技術內容分享,歡迎關注我們。7c428資訊網——每日最新資訊28at.com

7c428資訊網——每日最新資訊28at.com

本文鏈接:http://www.tebozhan.com/showinfo-26-86061-0.html原理剖析| Kafka Exactly Once 語義實現原理:冪等性與事務消息

聲明:本網頁內容旨在傳播知識,若有侵權等問題請及時與本網聯系,我們將在第一時間刪除處理。郵件:2376512515@qq.com

上一篇: 阿里面試:RabbitMQ如何實現延遲隊列?

下一篇: 代碼質量一塌糊涂,特么離職了,新來的人都不知道從哪里,今天分享高質量命名方法論給你

標簽:
  • 熱門焦點
Top