在現代分布式系統中,確保數據處理的準確性和一致性是至關重要的。Apache Kafka,作為一個廣泛使用的流處理平臺,提供了強大的消息隊列和流處理功能。隨著業務需求的增長,Kafka 的事務消息功能應運而生,它允許應" />
在現代分布式系統中,確保數據處理的準確性和一致性是至關重要的。Apache Kafka,作為一個廣泛使用的流處理平臺,提供了強大的消息隊列和流處理功能。隨著業務需求的增長,Kafka 的事務消息功能應運而生,它允許應用程序以一種原子的方式處理消息,即要么所有消息都被正確處理,要么都不處理。本文將深入剖析 Kafka 的 Exactly-Once 語義實現原理,包括冪等性與事務消息的關鍵概念,以及它們是如何在 Kafka 中實現的。我們將探討 Kafka 事務的流程,事務提供的 ACID 保證,以及在實際應用中可能遇到的一些限制。無論您是 Kafka 的新手還是經驗豐富的開發者,本文都將為您提供有價值的見解和指導。
Kafka 目前用于流處理的場景:相當于一個有向無環圖(DAG,Directed acyclic graph)每個節點是一個 Kafka Topic,每條邊是一個流處理操作。在這樣的場景下,有兩種操作:
? 消費上游消息并提交位點
? 處理消息并發送到下游 Topic
對于由這兩種操作構成的一組處理流程需要具備事務語義,這樣我們就可以不重復(Exactly Once)的處理上游消息并將結果可靠地存儲在下游 Topic 中。
圖片
上圖是一個典型的 Kafka 事務的流程,我們可以看到:MySQL 的 binlog 作為上游數據源將數據寫入到 Kafka 中,Spark Streaming 從 Kafka 中讀取數據并進行處理,最后將處理結果寫入到另外兩個 Topic 中(圖中三個 Topic 位于同一集群中)。其中消費 Topic A 與寫入 Topic B 和 Topic C 的操作具備事務語義。
從上述的場景中我們可以發現,事務消息最主要的動機是在流處理中實現 Exactly Once 的語義,這可以分為:
? 僅發送一次: 單分區僅發送一次由生產者冪等保證,多分區僅發送一次由事務機制保證
? 僅消費一次: Kafka 通過消費位點的提交來控制消費進度,而消費位點的提交被抽象成向系統 topic 發送消息。這就使得發送和消費行為統一起來,只要解決了多分區發送消息的一致性就能實現 Exactly Once 語義
在創建 Kafka 生產者時設置了 enable.idempotence 參數,用于開啟生產者冪等性。
val props = new Properties()props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")val producer = new KafkaProducer(props)
Kafka 的發送冪等是通過序列號來實現的,每個消息都會被分配一個序列號,序列號是遞增的,這樣就可以保證消息的順序性。當生產者發送消息時,會將消息的序列號和消息內容一起寫入到日志文件中,下次收到非預期序列號的消息就會返回 OutOfOrderSequenceException 異常。
設置 enable.idempotence 參數后,生產者會檢查以下三個參數的值是否合法(ProducerConfig#postProcessAndValidateIdempotenceConfigs)
? max.in.flight.requests.per.connection 必須小于 5
? retries 必須大于 0
? acks 必須設置為 all
Kafka 將消息的序列號信息保存在分區維度的 .snapshot 文件中,格式如下(ProducerStateManager#ProducerSnapshotEntrySchema):
圖片
我們可以發現,該文件中保存了 ProducerId、ProducerEpoch 和 LastSequence。所以冪等的約束為:相同分區、相同 Producer(id 和 epoch) 發送的消息序列號需遞增。即 Kafka 的生產者冪等性只在單連接、單分區生效,Producer 重啟或消息發送到其他分區就失去了冪等性的約束。
.snapshot 文件在 log segment 滾動時更新,發生重啟后通過讀取 .snapshot 文件和最新的日志文件即可恢復 Producer 的狀態。Broker 的重啟或分區遷移并不會影響冪等性。
我們首先從 Demo 開始,來看一下如何使用 Kafka 客戶端完成一個事務:
// 事務初始化val props = new Properties()...props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId)props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")val producer = new KafkaProducer(props)producer.initTransactions()producer.beginTransaction()// 消息發送producer.send(RecordUtils.create(topic1, partition1, "message1"))producer.send(RecordUtils.create(topic2, partition2, "message2"))// 事務提交或回滾producer.commitTransaction()
Kafka Producer 啟動后我們使用兩個 API 來初始化事務:initTransactions 和 beginTransaction。
回顧一下我們的 Demo,在發送消息時是發送到兩個不同分區中,這兩個分區可能在不同的 Broker 上,所以我們需要一個全局的協調者 TransactionCoordinator 來記錄事務的狀態。
所以,在 initTransactions 中,Producer 首先發送 ApiKeys.FIND_COORDINATOR 請求獲取 TransactionCoordinator。
之后即可向其發送 ApiKeys.INIT_PRODUCER_ID 請求獲取 ProducerId 及 ProducerEpoch(也是上文中用于冪等的字段)。此步驟生成的 id 和 epoch 會寫入內部 Topic __transaction_state 中,并且將事務的狀態置為 Empty。
__transaction_state 是 compaction Topic,其中消息的 key 為客戶端設置的transactional.id(詳見 TransactionStateManager#appendTransactionToLog)。
區別于 ProducerId 是服務端生成的內部屬性;TransactionId 由用戶設置,用于標識業務視角認為的“同一個應用”,啟動具有相同 TransactionId 的新 Producer 會使得未完成的事務被回滾并且來自舊 Producer(具有較小 epoch)的請求被拒絕掉。
后續 beginTransaction 用于開始一個事務,該方法會創建一個 Producer 內部事務狀態,標識這一個事務的開始,并不會有 RPC 產生。
上一節說到 beginTransaction 只是更改 Producer 內部狀態,那么在第一條消息發送時才隱式開啟了事務:
首先,Producer 會發送 ApiKeys.ADD_PARTITIONS_TO_TXN 請求到 TransactionCoordinator。TransactionCoordinator 會將這個分區加入到事務中,并更改事務的狀態為 Ongoing,這些信息被持久化到 __transaction_state 中。
然后 Producer 使用 ApiKeys.PRODUCE 請求正常發送消息到對應的分區中。這條消息的可見性控制在下文消息消費一節中會詳細討論。
當所有消息發送完成后,Producer 可以選擇提交或回滾事務,此時:
? TransactionCoordinator:具有當前事務所有相關分區的信息
? 其他 Broker:已經將消息持久化到日志文件中
接下來 Producer 調用 commitTransaction 會發送 ApiKeys.END_TXN 請求將事務狀態更改為 PrepareCommit(回滾事務對應狀態 PrepareAbort)并持久化到 __transaction_state 中,此時從 Producer 的視角來看整個事務已經結束了。
TransactionCoordinator 會異步向各個 Broker 發送 ApiKeys.WRITE_TXN_MARKERS 請求,當所有參加事務的 Broker 都返回成功后,TransactionCoordinator 會將事務狀態更改為 CompleteCommit(回滾事務對應狀態 CompleteAbort)并持久化到 __transaction_state 中。
某個分區的消息可能是事務消息與非事務消息混雜的,如下圖所示:
圖片
在 Broker 處理 ApiKeys.PRODUCE 請求時,完成消息持久化會更新 LSO 到第一條未提交的事務消息的 offset。這樣在消費者消費消息時,可以通過 LSO 來判斷消息是否可見:如果設置了 isolation.level 為 read_committed 則只會消費 LSO 之前的消息。
LSO(log stable offset): 它表示的是已經被成功復制到所有副本(replicas)并且可以被消費者安全消費的消息的最大偏移量。
但是我們可以發現 LSO 之前存在已回滾的消息(圖中紅色矩形)這些消息應該被過濾掉:在 Broker 處理 ApiKeys.WRITE_TXN_MARKERS 請求時,會將已回滾的消息索引寫入到 .txnindex 文件中(LogSegmentKafka#updateTxnIndex)。
后續 Consumer 消費消息時還會收到對應區間的已取消事務消息列表,上圖區間中的該列表為:
圖片
代表 offset 在 [2,5] 之間且由 id 為 11 的 Producer 發送的消息都已回滾。
上文我們討論了 __transaction_state 的實現確保同一時間,同一 TransactionId 有且只有一個事務在進行中。所以可以使用 ProducerId 和 offset 區間定位回滾的消息不會發生沖突。
? 原子性(Atomicity)
Kafka 通過對 __transaction_state Topic 的寫入實現了事務狀態的轉移,保證了事務要么同時提交,要么同時回滾。
? 一致性(Consistency)
在事務進入 PrepareCommit 或 PrepareAbort 階段時, TransactionCoordinator 異步向所有參與事務的 Broker 提交或回滾事務。這使得 Kafka 的事務做不到強一致性,只能通過不斷重試保證最終一致性。
? 隔離性(Isolation)
Kafka 通過 LSO 機制和 .txnindex 文件來避免臟讀,實現讀已提交(Read Committed)的隔離級別。
? 持久性(Durability)
Kafka 通過將事務狀態寫入到 __transaction_state Topic 和消息寫入到日志文件中來保證持久性。
從功能上看,Kafka 事務并不能支持業務方事務,強限制上游的消費和下游寫入都需要是同一個 Kafka 集群,否則就不具備原子性保障。
從性能上看,Kafka 事務的性能開銷主要體現在生產側:
開啟事務時需要額外的 RPC 請求定位 TransactionCoordinator 并初始化數據
消息發送需要在發送消息前向 TransactionCoordinator 同步請求添加分區,并將事務狀態的變化寫入到 __transaction_state Topic
事務提交或回滾時需要向所有參與事務的 Broker 發送請求
對于涉及分區較少且消息數量較多的事務,事務的開銷可以被均攤;反之,較多的同步 RPC 帶來的開銷會極大影響性能。并且每個生產者只能有一個事務在進行中,這就意味著事務的吞吐量會受到限制。
消費側也有一定的影響:消費者只能看到 LSO 以下的消息,并且需要額外的索引文件來過濾已回滾的消息,這無疑會增加端到端的延遲。
通過本文的深入分析,我們了解到 Kafka 的事務消息功能是如何在流處理場景中提供 Exactly-Once 語義的。Kafka 通過其事務 API 和內部機制,實現了消息發送的原子性、最終一致性、隔離性和持久性,盡管在實際應用中可能存在一些性能和功能上的限制。開發者和架構師應當充分理解這些概念,并在設計系統時考慮如何有效地利用 Kafka 的事務功能,以構建更加健壯和可靠的數據處理流程。
AutoMQ 是構建于對象存儲之上的云原生 Kafka fork,在解決了 Kafka 已有的成本和彈性問題基礎上對 Kafka 100%兼容,因此在 AutoMQ 上也可以使用 Kafka 事務消息。AutoMQ 作為國內 Kafka 生態的忠實擁護者,我們將持續為 Kafka 技術愛好者帶來優質的 Kafka 技術內容分享,歡迎關注我們。
本文鏈接:http://www.tebozhan.com/showinfo-26-86061-0.html原理剖析| Kafka Exactly Once 語義實現原理:冪等性與事務消息
聲明:本網頁內容旨在傳播知識,若有侵權等問題請及時與本網聯系,我們將在第一時間刪除處理。郵件:2376512515@qq.com