AVt天堂网 手机版,亚洲va久久久噜噜噜久久4399,天天综合亚洲色在线精品,亚洲一级Av无码毛片久久精品

當前位置:首頁 > 科技  > 軟件

火山引擎 ByteHouse:ClickHouse 如何保證海量數據一致性

來源: 責編: 時間:2023-09-18 21:41:04 310觀看
導讀背景ClickHouse是一個開源的OLAP引擎,不僅被全球開發者廣泛使用,在字節各個應用場景中也可以看到它的身影。基于高性能、分布式特點,ClickHouse可以滿足大規模數據的分析和查詢需求,因此字節研發團隊以開源ClickHouse為基

FCj28資訊網——每日最新資訊28at.com

背景

ClickHouse是一個開源的OLAP引擎,不僅被全球開發者廣泛使用,在字節各個應用場景中也可以看到它的身影。基于高性能、分布式特點,ClickHouse可以滿足大規模數據的分析和查詢需求,因此字節研發團隊以開源ClickHouse為基礎,推出火山引擎云原生數據倉庫ByteHouse。FCj28資訊網——每日最新資訊28at.com

在日常工作中,研發人員經常會遇到業務鏈路過長,導致流程穩定性和數據一致性難保障的問題,這在分布式、跨服務的場景中更為明顯。本篇文章提出針對這一問題的解決思路:在火山引擎ByteHouse中構建輕量級流程引擎,來解決數據一致性問題。FCj28資訊網——每日最新資訊28at.com

使用輕量級流程引擎可以幫我們使用統一的標準來解決復雜業務鏈路的編排問題,不僅提高業務代碼的可讀性和復用性,還能更專注業務核心邏輯的開發,讓整體流程更加標準化、規范化。FCj28資訊網——每日最新資訊28at.com

總結來說,使用流程引擎有以下優勢:FCj28資訊網——每日最新資訊28at.com

  • 輕量級,接入方便,內存操作,性能有保障
  • 易維護,流程配置與業務分離,支持熱更新
  • 易擴展,豐富的執行策略及算子支持

大體思路

圖片圖片FCj28資訊網——每日最新資訊28at.com

上圖為ByteHouse企業版管理平臺功能架構圖。從該功能架構圖可以看出,ByteHouse核心能力都是依賴ClickHouse集群,對于集群節點多、數據計算量大的業務場景,容易出現節點狀態不一致的問題,因此保證ClickHouse集群間的狀態一致性是我們的核心訴求。FCj28資訊網——每日最新資訊28at.com

圖片圖片FCj28資訊網——每日最新資訊28at.com

為了保證數據一致性,ByteHouse提供了以下能力:FCj28資訊網——每日最新資訊28at.com

  1. event engine: 事件處理中心
  2. workflow engine:輕量級流程引擎
  3. 對賬系統

保障數據一致性最簡單的方式是通過狀態機來監聽流程執行過程:FCj28資訊網——每日最新資訊28at.com

  • 首先,將所有的任務請求下發到event engine,由event engine將任務分發對應的handler執行,統一管理所有下發任務的生命周期,并提供異步重試、回滾補償等功能。流量匯總到event engine以后,會讓服務后續的業務擴展更加便捷。
  • 其次,對于比較復雜的任務請求,我們可以下發到workflow engine執行,由workflow生成實例,并編排任務隊列,管理流程執行實例的生命周期,統一失敗回滾,失敗重試。
  • 最后,對于服務不可用等特殊場景產生的臟數據,由對賬服務兜底。

圖片圖片FCj28資訊網——每日最新資訊28at.com

架構設計

在流程監控的架構設計中,主要包含以下:FCj28資訊網——每日最新資訊28at.com

  • 流程管理層:主要負責流程配置的解析初始化,并完成編排策略的工作
  • 策略behavior層:編排執行節點,并下發執行任務到執行器
  • 執行器:管理執行節點執行
  • 執行節點:負責業務具體實現

圖片圖片FCj28資訊網——每日最新資訊28at.com

實現方案

執行節點

圖片圖片FCj28資訊網——每日最新資訊28at.com

流程引擎的核心為“責任鏈”,按照責任鏈上的節點順序依次執行所有任務,所以我們需要的三個基本單元分別為:FCj28資訊網——每日最新資訊28at.com

  • request:入參
  • processlist:流程執行節點list
  • response:出參

在研發工作中,我們時常會遇到以下問題:FCj28資訊網——每日最新資訊28at.com

  • 如果同時出現了一個問題,node1、node2、node3之間的數據交互如何實現?
  • 如果node1入參、出參與node2,node3不一樣該如何處理?
  • 參數類型不同的node又該如何統一調度?

最簡單的處理辦法,是讓node使用相同的上下文信息,將整個執行node模版化。我們讓所有的執行節點node實現相同的接口Delegation,統一使用相同的上下文executionContext作為執行方法的入參。FCj28資訊網——每日最新資訊28at.com

對于流程中的request和response,我們可以放入executionContext中,讓每個執行節點都可以通過上下文操作response。FCj28資訊網——每日最新資訊28at.com

// Delegation -type Delegation interface {   Execute(ctx context.Context, executionContext ExecutionContextInterface) apperror.AppError   TryExecute(ctx context.Context, executionContext ExecutionContextInterface) apperror.AppError   ConfirmExecute(ctx context.Context, executionContext ExecutionContextInterface) apperror.AppError   CancelExecute(ctx context.Context, executionContext ExecutionContextInterface) apperror.AppError   Code() string   Type() value.DelegationType}

執行策略

如果確定好了最小的執行節點,我們需要考慮到,業務場景并不會永遠順序執行node,再返回結果,流程執行過程中跳轉、循環、并發執行都是比較常見的操作。考慮不同業務場景復用性,我們在執行節點之上加了一層執行策略,用策略behaivor來重新編排觸發執行節點的任務。FCj28資訊網——每日最新資訊28at.com

  • 下圖將流程分成了behavior1和behavior2,分別對應不同的策略。
  • 簡單的策略舉例:按順序執行、并發執行、循環執行、條件跳轉執行等。
  • 我們可以根據自身業務實際需要定制,后續會有實例介紹。

圖片圖片FCj28資訊網——每日最新資訊28at.com

// ActivityBehavior -type ActivityBehavior interface {   Enter(ctx context.Context, executionContext ExecutionContextInterface, pvmActivity PvmActivity) apperror.AppError   Execute(ctx context.Context, executionContext ExecutionContextInterface, pvmActivity PvmActivity) apperror.AppError   Leave(ctx context.Context, executionContext ExecutionContextInterface, pvmActivity PvmActivity) apperror.AppError   Code() value.ActivityBehaviorCode}

策略behavior提供有Enter,Execute,Leave三個接口,Enter負責生成執行節點任務instance,Execute負責編排并觸發執行任務instance操作,Leave負責跳轉到下一個behavior。FCj28資訊網——每日最新資訊28at.com

可以看出來策略behaivor的跳轉方式類似于鏈表,不斷執行next方法,所以編碼過程中需要注意不要出現死循環,小心stackoverflow。FCj28資訊網——每日最新資訊28at.com

Executor

執行器Executor的主要作用是串聯執行策略和執行節點,策略behavior將執行的命令下發給Executor,由Executor對執行節點的觸發操作。這里會根據執行節點的type,映射到三種執行節點的執行方式,包含tcc,執行一次,重試多次。FCj28資訊網——每日最新資訊28at.com

// DelegationExecutor -type DelegationExecutor interface {   execute(ctx context.Context, executionContext ExecutionContextInterface) apperror.AppError   postExecute(ctx context.Context, executionContext ExecutionContextInterface) apperror.AppError}func (de *DefaultDelegationExecutor) execute(ctx context.Context, executionContext ExecutionContextInterface) apperror.AppError {   delegationCode := executionContext.GetExecutionInstance().GetDelegationCode()   if len(delegationCode) == 0 || de.DelegationMap[delegationCode] == nil {      logger.Info(ctx, "DefaultDelegationExecutor delegation code not found,use default delegation", zap.String("delegationCode", delegationCode))      delegationCode = string(value.DefaultDelegation)      executionContext.GetExecutionInstance().SetDelegationCode(delegationCode)   }   return de.dumpExecute(ctx, executionContext, delegationCode)}func (de *DefaultDelegationExecutor) dumpExecute(ctx context.Context, executionContext ExecutionContextInterface, delegationCode string) apperror.AppError {   FireEvent(ctx, executionContext, value.ExecutionStart)   var err apperror.AppError   delegation := de.DelegationMap[delegationCode]   switch delegation.Type() {   case value.TccDelegation:      err = tccExecute(ctx, executionContext, delegation)   case value.SingleDelegation:      err = singleExecute(ctx, executionContext, delegation)   case value.RetryDelegation:      err = retryExecute(ctx, executionContext, delegation)   }   if err != nil {      logger.Error(ctx, "delegation.Execute_err", zap.Error(err))      return apperror.Trace(err)   }   FireEvent(ctx, executionContext, value.ExecutionEnd)   return nil}

ExecutionContext

ExecutionContext上下文是用來記錄了流程執行的所有細節,包含以下:FCj28資訊網——每日最新資訊28at.com

  • ProcessEngineConfigurationInterface: 流程定義信息
  • ExecutionInstanceInterface: 執行節點實例
  • ActivityInstanceInterface: 執行策略實例
  • ProcessInstanceInterface: 流程實例
  • request:入參
  • response:返回值

為了保證整個流程執行的穩定性,這里除了response之外,所以其他的實例參數都不建議開放寫接口,response可以用來存儲流程實例執行過程中會產生的變量信息。FCj28資訊網——每日最新資訊28at.com

對于整個流程的定義ProcessEngineConfiguration,我們可以選擇最簡單的方式,即在數據庫里,將配置信息映射成json字符串。當然也可以選擇讀取配置文件,只要能滿足讀取方便,數據不丟即可。FCj28資訊網——每日最新資訊28at.com

// ExecutionContextInterface -type ExecutionContextInterface interface {   GetProcessEngineConfiguration() ProcessEngineConfigurationInterface   SetProcessEngineConfiguration(processEngineConfiguration ProcessEngineConfigurationInterface)   GetExecutionInstance() instance.ExecutionInstanceInterface   SetExecutionInstance(executionInstance instance.ExecutionInstanceInterface)   GetActivityInstance() instance.ActivityInstanceInterface   SetActivityInstance(activityInstance instance.ActivityInstanceInterface)   GetProcessInstance() instance.ProcessInstanceInterface   SetProcessInstance(processInstance instance.ProcessInstanceInterface)   SetNeedPause(needPause bool)   IsNeedPause() bool   SetActivityIndex(activityIndex int)   GetActivityIndex() int   SetActivityBehaviorCode(activityBehaviorCode value.ActivityBehaviorCode)   GetActivityBehaviorCode() value.ActivityBehaviorCode   SetBizUniqueKey(bizUniqueKey string)   GetBizUniqueKey() string   GetRequest() map[string]interface{}   SetRequest(request map[string]interface{})   GetResponse() map[string]string   SetResponse(response map[string]string)   AtomicAddResponse(key string, value string)}

Listener

監聽器的主要作用是用來監聽流程執行中的重要參數信息。從上述executor接口可以看到fireEvent,它的作用是發送消息event,讓listener監聽到對應的event類型,完成一些定制化的行為。FCj28資訊網——每日最新資訊28at.com

類似于面向切面編程,我們可以在執行節點的前后增加定制化的邏輯,如打日志、監聽節點執行時間,持久化流程中產生的response信息、增加鏈路追蹤等。FCj28資訊網——每日最新資訊28at.com

API

圖片圖片FCj28資訊網——每日最新資訊28at.com

最后,我們將上述的內容拼接串聯起來,主要提供三個接口:FCj28資訊網——每日最新資訊28at.com

  • Start: 啟動流程
  • Signal: 暫停或是異常退出后,繼續執行流程
  • Abort: 強制中斷流程
process start(){    //1.get and create ProcessEngineConfigurationInterface 解析流程定義    //2.create processInstance 創建流程實例    //3.create ExecutionContext 創建執行上下文        //4. lockstrategy trylock         //5. invoke process start     processinstance.start()    //6. persist processInstance and return    //7. lockstrategy unlock }processinstance start(){    // get behavior        // behavior enter    behavior.Enter(ctx, executionContext)    //behavior execute    behavior.Execute(ctx, executionContext)    //behavior leave    behavior.Leave(ctx, executionContext)}

相比于start,signal需要讀取執行的細節信息,找到之前失敗的執行節點位置,并加載到上下文中,再繼續執行。FCj28資訊網——每日最新資訊28at.com

對于失敗節點信息的持久化有兩種方式:第一,可以選擇在流程執行結束持久化;第二,可以通過listener在每個執行節點結束持久化。具體根據實際業務場景對于性能、數據一致性的要求做出抉擇。FCj28資訊網——每日最新資訊28at.com

并發場景考慮

  1. behavior策略中肯定會出現定制、并發、處理多個執行節點到場景的問題,如果同時修改必定會造成數據錯亂。簡單的方法推薦使用帶鎖的容器存儲,可以被修改的信息(response),此處使用的是github.com/bytedance/gopkg包里面封裝的skipmap。
  2. lockstrategy可以自己定義最適配業務場景的,最簡單的方案是redis鎖,同時也考慮到系統異常退出后的恢復問題。可以參考redis官網解決特殊情況下的鎖異常解決方案:https://redis.io/commands/setnx/

后續的工作

輕量級流程引擎的基本功能到此已經實現,后續的擴展優化可以圍繞以下方向進行:FCj28資訊網——每日最新資訊28at.com

  1. 界面化展示,可以將鏈路執行情況展示出來
  2. 策略behavior維度擴展,適配各種業務場景
  3. 增加子流程的維度,可以復用原先的執行邏輯

Demo示例

以下為簡單的processconfiguration的配置信息,此處使用DefaultBehavior,即同步順序執行策略。FCj28資訊網——每日最新資訊28at.com

{    "ProcessContentList":[        {            "Behavior":"DefaultBehavior",            "DelegationList":[                {                    "Code":"sample1"                },                {                    "Code":"sample2"                },                {                    "Code":"sample3"                }            ]        },        {            "Behavior":"DefaultBehavior",            "DelegationList":[                {                    "Code":"sample4"                },                {                    "Code":"sample5"                }            ]        }    ]}

圖片圖片FCj28資訊網——每日最新資訊28at.com

在listener里面加入日志,這樣可以追溯出整個流程的執行流程,以便更好的監控整個流程的運行狀態。FCj28資訊網——每日最新資訊28at.com

實際使用

以ClickHouse集群縮容為例:FCj28資訊網——每日最新資訊28at.com

圖片圖片FCj28資訊網——每日最新資訊28at.com

{    "ProcessContentList":[        // 查詢所有需要重分布的table        {            "Behavior":"DefaultBehavior",// 順序執行            "DelegationList":[                {                    "Code":"hor_reshard_table_loop"                 }            ]        },        // 遍歷所有table進行數據的重分布         {            "LoopKey":"reshard_table_loop_key",            "Behavior":"NonBlockLoopBehavior",// 非阻塞循環處理            "DelegationList":[                {                    "Code":"hor_reshard_table"                }            ]        },        // 進行刪除節點操作        {            "Behavior":"DefaultBehavior",            "DelegationList":[                {                    "Code":"hor_start_remove_node"                },                {                    "Code":"hor_prepare_node_vcloud",                    "PostCode":"hor_rollback_remove_node_vcloud"http:// 統一失敗回滾處理                },                {                    "Code":"hor_update_config_vcloud",                    "PostCode":"hor_rollback_remove_node_vcloud"                },                {                    "Code":"hor_set_cluster_running",                    "PostCode":"hor_rollback_remove_node_vcloud"                },                {                    "Code":"hor_release_node"                },                {                    "Code":"hor_callback_bill"                }            ]        }    ]}

總結

一個流程引擎適配所有的業務場景幾乎是不可能,除非接受復雜的方案設計,而第三方流程引擎對于日常的業務開發顯得太笨重。輕量級流程引擎則會簡化接入方式,減少了過多http請求帶來的性能損耗,更加靈活多變,追述問題也變得簡單。FCj28資訊網——每日最新資訊28at.com

在ByteHouse中加入流程引擎的能力,能以較小的代價給業務更多重試的可能性,而不需要反復回滾,特別對于耗時很長的任務,能帶來更好用戶使用體驗。除此之外,流程引擎還能將業務流程模版化,增加接口服務的復用性,使得業務代碼的可讀性、擴展性得到提升,方便后期維護。FCj28資訊網——每日最新資訊28at.com

火山引擎云原生數據倉庫ByteHouse是火山引擎旗下的一款云原生數據倉庫,為用戶提供極速分析體驗,能夠支撐實時數據分析和海量數據離線分析,同時還具備便捷的彈性擴縮容能力,極致分析性能和豐富的企業級特性,助力客戶數字化轉型。FCj28資訊網——每日最新資訊28at.com

本文鏈接:http://www.tebozhan.com/showinfo-26-10443-0.html火山引擎 ByteHouse:ClickHouse 如何保證海量數據一致性

聲明:本網頁內容旨在傳播知識,若有侵權等問題請及時與本網聯系,我們將在第一時間刪除處理。郵件:2376512515@qq.com

上一篇: 火山引擎 DataWind 產品可視化能力揭秘

下一篇: Google 2023開發者大會簡單回顧 - Web 平臺新動向

標簽:
  • 熱門焦點
Top