AVt天堂网 手机版,亚洲va久久久噜噜噜久久4399,天天综合亚洲色在线精品,亚洲一级Av无码毛片久久精品

當前位置:首頁 > 科技  > 軟件

SpringBoot整合Flink CDC,實時追蹤數據變動,無縫同步至Redis

來源: 責編: 時間:2024-04-09 17:23:02 174觀看
導讀環境:SpringBoot2.7.16 + Flink 1.19.0 + JDK211. 簡介Flink CDC(Flink Change Data Capture)是基于數據庫的日志CDC技術,實現了全增量一體化讀取的數據集成框架。它搭配Flink計算框架,能夠高效實現海量數據的實時集成。Fl

環境:SpringBoot2.7.16 + Flink 1.19.0 + JDK219Q928資訊網——每日最新資訊28at.com

1. 簡介

Flink CDC(Flink Change Data Capture)是基于數據庫的日志CDC技術,實現了全增量一體化讀取的數據集成框架。它搭配Flink計算框架,能夠高效實現海量數據的實時集成。Flink CDC的核心功能在于實時地監視數據庫或數據流中發生的數據變動,并將這些變動抽取出來,以便進一步的處理和分析。通過使用Flink CDC,用戶可以輕松地構建實時數據管道,對數據變動進行實時響應和處理,為實時分析、實時報表和實時決策等場景提供強大的支持。9Q928資訊網——每日最新資訊28at.com

具體來說,Flink CDC的應用場景包括但不限于實時數據倉庫更新、實時數據同步和遷移、實時數據處理等。它還可以確保數據一致性,并在數據發生變更時能夠準確地捕獲和處理。此外,Flink CDC支持與多種數據源進行集成,如MySQL、PostgreSQL、Oracle等,并提供了相應的連接器,方便數據的捕獲和處理。9Q928資訊網——每日最新資訊28at.com

接下來將詳細的介紹關于MySQL CDC的使用。MySQL CDC 連接器允許從 MySQL 數據庫讀取快照數據和增量數據。9Q928資訊網——每日最新資訊28at.com

支持的數據庫9Q928資訊網——每日最新資訊28at.com

Connector9Q928資訊網——每日最新資訊28at.com

Database9Q928資訊網——每日最新資訊28at.com

Driver9Q928資訊網——每日最新資訊28at.com

mysql-cdc9Q928資訊網——每日最新資訊28at.com

  • MySQL:5.6,5.7,8.0.x
  • RDS MYSQL: 5.6,5.7,8.0.x
  • PolarDB MySQL: 5.6,5.7,8.0.x
  • Aurora MySQL 5.6,5.7,8.0.x
  • MariaDB: 10.x
  • PolarDB X: 2.0.1

JDBC Driver 8.0.279Q928資訊網——每日最新資訊28at.com

2. 實戰案例

2.1 MySQL開啟Binlog

在MySQL的配置文件中(如Linux的/etc/my.cnf或Windows的/my.ini),需要在[mysqld]部分設置相關參數以開啟binlog功能,如下:9Q928資訊網——每日最新資訊28at.com

[mysqld]server-id=1# 格式,行級格式binlog-format=Row# binlog 日志文件的前綴log-bin=mysql-bin# 指定哪些數據庫需要記錄二進制日志binlog_do_db=testjpa

除了開啟binlog功能外,Flink CDC還需要其他配置和權限來確保能夠正常連接到MySQL并讀取數據。例如,需要授予Flink CDC連接MySQL的用戶必要的權限,包括SELECT、REPLICATION SLAVE、REPLICATION CLIENT、SHOW VIEW等。這些權限是Flink CDC讀取數據和元數據所必需的。9Q928資訊網——每日最新資訊28at.com

查看是否開啟了binlog功能

mysql> SHOW VARIABLES LIKE 'log_bin';+---------------+-------+| Variable_name | Value |+---------------+-------+| log_bin       | ON    |+---------------+-------+

以上就對mysql相關的配置完成了。9Q928資訊網——每日最新資訊28at.com

2.2 依賴管理

<properties>  <flink.version>1.19.0</flink.version></properties><dependency>  <groupId>org.apache.flink</groupId>  <artifactId>flink-connector-base</artifactId>  <version>${flink.version}</version></dependency><dependency>  <groupId>com.ververica</groupId>  <artifactId>flink-sql-connector-mysql-cdc</artifactId>  <version>3.0.1</version></dependency><dependency>  <groupId>org.apache.flink</groupId>  <artifactId>flink-streaming-java</artifactId>  <version>${flink.version}</version></dependency><dependency>  <groupId>org.apache.flink</groupId>  <artifactId>flink-clients</artifactId>  <version>${flink.version}</version></dependency><dependency>  <groupId>org.apache.flink</groupId>  <artifactId>flink-table-runtime</artifactId>  <version>${flink.version}</version></dependency>

2.3 代碼實現

@Componentpublic class MonitorMySQLCDC implements InitializingBean {  // 該隊列專門用來臨時保存變化的數據(實際生產環境,你應該使用MQ相關的產品)  public static final LinkedBlockingQueue<Map<String, Object>> queue = new LinkedBlockingQueue<>() ;    private final StringRedisTemplate stringRedisTemplate ;  // 保存到redis中key的前綴  private final String PREFIX = "users:" ;  // 數據發生變化后的sink處理  private final CustomSink customSink ;  public MonitorMySQLCDC(CustomSink customSink, StringRedisTemplate stringRedisTemplate) {    this.customSink = customSink ;    this.stringRedisTemplate = stringRedisTemplate ;  }    @Override  public void afterPropertiesSet() throws Exception {    // 啟動異步線程,實時處理隊列中的數據    new Thread(() -> {      while(true) {        try {          Map<String, Object> result = queue.take();          this.doAction(result) ;        } catch (Exception e) {          e.printStackTrace();        }      }    }).start() ;    Properties jdbcProperties = new Properties() ;    jdbcProperties.setProperty("useSSL", "false") ;    MySqlSource<String> source = MySqlSource.<String>builder()        .hostname("127.0.0.1")        .port(3306)        // 可配置多個數據庫        .databaseList("testjpa")        // 可配置多個表        .tableList("testjpa.users")        .username("root")        .password("123123")        .jdbcProperties(jdbcProperties)        // 包括schema的改變        .includeSchemaChanges(true)        // 反序列化設置        // .deserializer(new StringDebeziumDeserializationSchema())        .deserializer(new JsonDebeziumDeserializationSchema(true))        // 啟動模式;關于啟動模式下面詳細介紹        .startupOptions(StartupOptions.initial())        .build() ;    // 環境配置    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment() ;    // 設置 6s 的 checkpoint 間隔    env.enableCheckpointing(6000) ;    // 設置 source 節點的并行度為 4    env.setParallelism(4) ;    env.fromSource(source, WatermarkStrategy.noWatermarks(), "MySQL")        // 添加Sink        .addSink(this.customSink) ;    env.execute() ;  }    @SuppressWarnings("unchecked")  private void doAction(Map<String, Object> result) throws Exception {    Map<String, Object> payload = (Map<String, Object>) result.get("payload") ;    String op = (String) payload.get("op") ;    switch (op) {      // 更新和插入操作      case "u", "c" -> {        Map<String, Object> after = (Map<String, Object>) payload.get("after") ;        String id = after.get("id").toString();        System.out.printf("操作:%s, ID: %s%n", op, id) ;        stringRedisTemplate.opsForValue().set(PREFIX + id, new ObjectMapper().writeValueAsString(after)) ;      }      // 刪除操作      case "d" -> {        Map<String, Object> after = (Map<String, Object>) payload.get("before") ;        String id = after.get("id").toString();        stringRedisTemplate.delete(PREFIX + id) ;      }     }  }  }

啟動模式:9Q928資訊網——每日最新資訊28at.com

  • initial (默認):在第一次啟動時對受監視的數據庫表執行初始快照,并繼續讀取最新的 binlog。
  • earliest-offset:跳過快照階段,從可讀取的最早 binlog 位點開始讀取
  • latest-offset:首次啟動時,從不對受監視的數據庫表執行快照, 連接器僅從 binlog 的結尾處開始讀取,這意味著連接器只能讀取在連接器啟動之后的數據更改。
  • specific-offset:跳過快照階段,從指定的 binlog 位點開始讀取。位點可通過 binlog 文件名和位置指定,或者在 GTID 在集群上啟用時通過 GTID 集合指定。
  • timestamp:跳過快照階段,從指定的時間戳開始讀取 binlog 事件。

數據處理Sink

@Componentpublic class CustomSink extends RichSinkFunction<String> {  private ObjectMapper mapper = new ObjectMapper();  @Override  public void invoke(String value, Context context) throws Exception {    System.out.printf("數據發生變化: %s%n", value);    TypeReference<Map<String, Object>> valueType = new TypeReference<Map<String, Object>>() {    };    Map<String, Object> result = mapper.readValue(value, valueType);    Map<String, Object> payload = (Map<String, Object>) result.get("payload");    String op = (String) payload.get("op") ;    // 不對讀操作處理    if (!"r".equals(op)) {      MonitorMySQLCDC.queue.put(result);    }  }}

以上就是實現通過FlinkCDC實時通過數據到Redis的所有代碼。9Q928資訊網——每日最新資訊28at.com

2.4 Web監控頁面

引入flink web依賴9Q928資訊網——每日最新資訊28at.com

<dependency>  <groupId>org.apache.flink</groupId>  <artifactId>flink-runtime-web</artifactId>  <version>${flink.version}</version></dependency>

環境配置9Q928資訊網——每日最新資訊28at.com

Configuration config = new Configuration() ;config.set(RestOptions.PORT, 9090) ;StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config) ;

web監聽9090端口。9Q928資訊網——每日最新資訊28at.com

圖片圖片9Q928資訊網——每日最新資訊28at.com

通過web控制臺你可以管理查看到更多的信息。9Q928資訊網——每日最新資訊28at.com

本文鏈接:http://www.tebozhan.com/showinfo-26-82367-0.htmlSpringBoot整合Flink CDC,實時追蹤數據變動,無縫同步至Redis

聲明:本網頁內容旨在傳播知識,若有侵權等問題請及時與本網聯系,我們將在第一時間刪除處理。郵件:2376512515@qq.com

上一篇: 圖解 CSS Grid 布局,一起來看看 CSS Grid 布局是如何使用的

下一篇: 架構見解:使用Instagram示例設計高效的多層緩存

標簽:
  • 熱門焦點
  • 一加Ace2 Pro官宣:普及16G內存 引領24G

    一加官方今天繼續為本月發布的新機一加Ace2 Pro帶來預熱,公布了內存方面的信息。“淘汰 8GB ,12GB 起步,16GB 普及,24GB 引領,還有呢?#一加Ace2Pro#,2023 年 8 月,敬請期待。”同時
  • 女孩租房開2小時空調用完100元電費引熱議:5級能耗惹不起 月薪過萬電費也交不起

    近日,江蘇蘇州一女孩租房當天充值了100元電費,開著空調不到2小時發現電費已用完。對于為什么這個快,房東表示,電表壞了這種情況很多,之前也遇到過,給租客換
  • Automa-通過連接塊來自動化你的瀏覽器

    1、前言通過瀏覽器插件可實現自動化腳本的錄制與編寫,具有代表性的工具就是:Selenium IDE、Katalon Recorder,對于簡單的業務來說可快速實現自動化的上手工作。Selenium IDEKat
  • 十個簡單但很有用的Python裝飾器

    裝飾器(Decorators)是Python中一種強大而靈活的功能,用于修改或增強函數或類的行為。裝飾器本質上是一個函數,它接受另一個函數或類作為參數,并返回一個新的函數或類。它們通常用
  • 三言兩語說透柯里化和反柯里化

    JavaScript中的柯里化(Currying)和反柯里化(Uncurrying)是兩種很有用的技術,可以幫助我們寫出更加優雅、泛用的函數。本文將首先介紹柯里化和反柯里化的概念、實現原理和應用
  • 只需五步,使用start.spring.io快速入門Spring編程

    步驟1打開https://start.spring.io/,按照屏幕截圖中的內容創建項目,添加 Spring Web 依賴項,并單擊“生成”按鈕下載 .zip 文件,為下一步做準備。請在進入步驟2之前進行解壓。圖
  • 一文掌握 Golang 模糊測試(Fuzz Testing)

    模糊測試(Fuzz Testing)模糊測試(Fuzz Testing)是通過向目標系統提供非預期的輸入并監視異常結果來發現軟件漏洞的方法。可以用來發現應用程序、操作系統和網絡協議等中的漏洞或
  • 每天一道面試題-CPU偽共享

    前言:了不起:又到了每天一到面試題的時候了!學弟,最近學習的怎么樣啊 了不起學弟:最近學習的還不錯,每天都在學習,每天都在進步! 了不起:那你最近學習的什么呢? 了不起學弟:最近在學習C
  • 小米MIX Fold 3配置細節曝光:搭載領先版驍龍8 Gen2+罕見5倍長焦

    這段時間以來,包括三星、一加、榮耀等等有不少品牌旗下的最新折疊屏旗艦都得到了不少爆料,而小米新一代折疊屏旗艦——小米MIX Fold 3此前也屢屢被傳
Top