丝袜人妻一区二区三区_少妇福利无码视频_亚洲理论片在线观看_一级毛片国产A级片

當(dāng)前位置:首頁 > 話題廣場(chǎng) > 攻略專題 > 網(wǎng)游攻略

678改動(dòng)日志專題之Flink 最佳實(shí)踐之使用 Canal 同步 MySQL 數(shù)據(jù)至 TiDB

一.背景介紹

本文介紹了將MySQL中的數(shù)據(jù)通過Binlog Canal導(dǎo)入Kafka,然后由Flink消耗的情況。

為了能夠快速的驗(yàn)證整套流程的功能性,所有的組件都以單機(jī)的形式部署。如果手上的物理資源不足,可以將本文中的所有組件一臺(tái) 4G 1U 的虛擬機(jī)環(huán)境中。

如果需要在生產(chǎn)環(huán)境中部署,建議將每一個(gè)組件替換成高可用的集群部署方案。

其中,我們單獨(dú)創(chuàng)建了一套 Zookeeper 單節(jié)點(diǎn)環(huán)境,F(xiàn)link、Kafka、Canal 等組件共用這個(gè) Zookeeper 環(huán)境。

針對(duì)于所有需要 Jre 的組件,如 Flink,Kafka,Canal,Zookeeper,考慮到升級(jí) JRE 可能會(huì)影響到其他的應(yīng)用,我們選擇每個(gè)組件獨(dú)立使用自己的 JRE 環(huán)境。

本文分為兩個(gè)部分,其中,前七小節(jié)主要介紹基礎(chǔ)環(huán)境的搭建,最后一個(gè)小節(jié)介紹了數(shù)據(jù)是如何在各個(gè)組件中流通的。

數(shù)據(jù)的流動(dòng)經(jīng)過以下組件:

  • MySQL 數(shù)據(jù)源生成 Binlog。
  • Canal 讀取 Binlog,生成 Canal json,推送到 Kafka 指定的 Topic 中。
  • Flink 使用 flink-sql-connector-kafka API,消費(fèi) Kafka Topic 中的數(shù)據(jù)。
  • Flink 在通過 flink-connector-jdbc,將數(shù)據(jù)寫入到 TiDB 中。

TiDB + Flink 的結(jié)構(gòu),支持開發(fā)與運(yùn)行多種不同種類的應(yīng)用程序。

目前主要的特性主要包括:

  • 批流一體化。
  • 精密的狀態(tài)管理。
  • 事件時(shí)間支持。
  • 精確的一次狀態(tài)一致性保障。

Flink 可以運(yùn)行在包括 YARN、Mesos、Kubernetes 在內(nèi)的多種資源管理框架上,還支持裸機(jī)集群上獨(dú)立部署。TiDB 可以部署 AWS、Kubernetes、GCP GKE 上,同時(shí)也支持使用 TiUP 在裸機(jī)集群上獨(dú)立部署。

TiDB + Flink 結(jié)構(gòu)常見的幾類應(yīng)用如下:

  • 事件驅(qū)動(dòng)型應(yīng)用:反欺詐。異常檢測(cè)。基于規(guī)則的報(bào)警。業(yè)務(wù)流程監(jiān)控。
  • 數(shù)據(jù)分析應(yīng)用:網(wǎng)絡(luò)質(zhì)量監(jiān)控。產(chǎn)品更新及試驗(yàn)評(píng)估分析。事實(shí)數(shù)據(jù)即席分析。大規(guī)模圖分析。
  • 數(shù)據(jù)管道應(yīng)用:電商實(shí)時(shí)查詢索引構(gòu)建。電商持續(xù) ETL。

二. 環(huán)境介紹

2.1 操作系統(tǒng)環(huán)境

[root@r20 topology]# cat /etc/redhat-release CentOS Stream release 8

2.2 軟件環(huán)境

Item

Version

Download link

TiDB

v4.0.9

1

Kafka

v2.7.0

Flink

v1.12.1

Jre

v1.8.0_281

Zookeeper

v3.6.2

flink-sql-connector-kafka

v1.12.1

flink-connector-jdbc

v1.12.0

MySQL

v8.0.23

Canal

v1.1.4

2.3 機(jī)器分配

Hostname

IP

Component

r21

192.168.12.21

TiDB Cluster

r22

192.168.12.22

Kafka

r23

192.168.12.23

Flink

r24

192.168.12.24

Zookeeper

r25

192.168.12.25

MySQL

r26

192.168.12.26

Canal

三. 部署 TiDB Cluster

與傳統(tǒng)的單機(jī)數(shù)據(jù)庫相比,TiDB 具有以下優(yōu)勢(shì):

  • 純分布式架構(gòu),擁有良好的擴(kuò)展性,支持彈性的擴(kuò)縮容。
  • 支持 SQL,對(duì)外暴露 MySQL 的網(wǎng)絡(luò)協(xié)議,并兼容大多數(shù) MySQL 的語法,在大多數(shù)場(chǎng)景下可以直接替換 MySQL。
  • 默認(rèn)支持高可用,在少數(shù)副本失效的情況下,數(shù)據(jù)庫本身能夠自動(dòng)進(jìn)行數(shù)據(jù)修復(fù)和故障轉(zhuǎn)移,對(duì)業(yè)務(wù)透明。
  • 支持 ACID 事務(wù),對(duì)于一些有強(qiáng)一致需求的場(chǎng)景友好,例如:銀行轉(zhuǎn)賬。
  • 具有豐富的工具鏈生態(tài),覆蓋數(shù)據(jù)遷移、同步、備份等多種場(chǎng)景。

在內(nèi)核設(shè)計(jì)上,TiDB 分布式數(shù)據(jù)庫將整體架構(gòu)拆分成了多個(gè)模塊,各模塊之間互相通信,組成完整的 TiDB 系統(tǒng)。對(duì)應(yīng)的架構(gòu)圖如下:

在本文中,我們只做最簡(jiǎn)單的功能測(cè)試,所以部署了一套單節(jié)點(diǎn)但副本的 TiDB,涉及到了以下的三個(gè)模塊:

  • TiDB Server:SQL 層,對(duì)外暴露 MySQL 協(xié)議的連接 endpoint,負(fù)責(zé)接受客戶端的連接,執(zhí)行 SQL 解析和優(yōu)化,最終生成分布式執(zhí)行計(jì)劃。
  • PD (Placement Driver) Server:整個(gè) TiDB 集群的元信息管理模塊,負(fù)責(zé)存儲(chǔ)每個(gè) TiKV 節(jié)點(diǎn)實(shí)時(shí)的數(shù)據(jù)分布情況和集群的整體拓?fù)浣Y(jié)構(gòu),提供 TiDB Dashboard 管控界面,并為分布式事務(wù)分配事務(wù) ID。
  • TiKV Server:負(fù)責(zé)存儲(chǔ)數(shù)據(jù),從外部看 TiKV 是一個(gè)分布式的提供事務(wù)的 Key-Value 存儲(chǔ)引擎。

3.1 TiUP 部署模板文件

# # Global variables are applied to all deployments and used as the default value of # # the deployments if a specific deployment value is missing. global: user: "tidb" SSH_port: 22 deploy_dir: "/opt/tidb-c1/" data_dir: "/opt/tidb-c1/data/" # # Monitored variables are applied to all the machines. #monitored: # node_exporter_port: 19100 # blackbox_exporter_port: 39115 # deploy_dir: "/opt/tidb-c3/monitored" # data_dir: "/opt/tidb-c3/data/monitored" # log_dir: "/opt/tidb-c3/log/monitored" # # Server configs are used to specify the runtime configuration of TiDB components. # # All configuration items can be found in TiDB docs: # # - TiDB: # # - TiKV: # # - PD: # # All configuration items use points to represent the hierarchy, e.g: # # read # # # # You can overwrite this configuration via the instance-level `config` field. server_configs: tidb: log.slow-threshold: 300 binlog.enable: false binlog.ignore-error: false : true tikv: : 4 ra: 2 ra: 2 rock: 1 : "16GB" read: 12 read: false read: true ra: 0 pd: : 4 : 2048 : 64 pd_servers: - host: 192.168.12.21 ssh_port: 22 name: "pd-2" client_port: 12379 peer_port: 12380 deploy_dir: "/opt/tidb-c1/pd-12379" data_dir: "/opt/tidb-c1/data/pd-12379" log_dir: "/opt/tidb-c1/log/pd-12379" numa_node: "0" # # The following configs are used to overwrite the `` values. config: : 20 : 200000 tidb_servers: - host: 192.168.12.21 ssh_port: 22 port: 14000 status_port: 12080 deploy_dir: "/opt/tidb-c1/tidb-14000" log_dir: "/opt/tidb-c1/log/tidb-14000" numa_node: "0" # # The following configs are used to overwrite the `` values. config: log.slow-query-file: : true tikv_servers: - host: 192.168.12.21 ssh_port: 22 port: 12160 status_port: 12180 deploy_dir: "/opt/tidb-c1/tikv-12160" data_dir: "/opt/tidb-c1/data/tikv-12160" log_dir: "/opt/tidb-c1/log/tikv-12160" numa_node: "0" # # The following configs are used to overwrite the `` values. config: : 4 #: { zone: "zone1", dc: "dc1", host: "host1" } #monitoring_servers: # - host: 192.168.12.21 # ssh_port: 22 # port: 19090 # deploy_dir: "/opt/tidb-c1/prometheus-19090" # data_dir: "/opt/tidb-c1/data/prometheus-19090" # log_dir: "/opt/tidb-c1/log/prometheus-19090" #grafana_servers: # - host: 192.168.12.21 # port: 13000 # deploy_dir: "/opt/tidb-c1/grafana-13000" #alertmanager_servers: # - host: 192.168.12.21 # ssh_port: 22 # web_port: 19093 # cluster_port: 19094 # deploy_dir: "/opt/tidb-c1/alertmanager-19093" # data_dir: "/opt/tidb-c1/data/alertmanager-19093" # log_dir: "/opt/tidb-c1/log/alertmanager-19093"

3.2 TiDB Cluster 環(huán)境

本文重點(diǎn)非部署 TiDB Cluster,作為快速實(shí)驗(yàn)環(huán)境,只在一臺(tái)機(jī)器上部署單副本的 TiDB Cluster 集群。不需要部署監(jiān)控環(huán)境。

[root@r20 topology]# tiup cluster display tidb-c1-v409 Starting component `cluster`: /root/.tiup/components/cluster display tidb-c1-v409 Cluster type: tidb Cluster name: tidb-c1-v409 Cluster version: v4.0.9 SSH type: builtin Dashboard URL: ID Role Host Ports OS/Arch Status Data Dir Deploy Dir -- ---- ---- ----- ------- ------ -------- ---------- 192.168.12.21:12379 pd 192.168.12.21 12379/12380 linux/x86_64 Up|L|UI /opt/tidb-c1/data/pd-12379 /opt/tidb-c1/pd-12379 192.168.12.21:14000 tidb 192.168.12.21 14000/12080 linux/x86_64 Up - /opt/tidb-c1/tidb-14000 192.168.12.21:12160 tikv 192.168.12.21 12160/12180 linux/x86_64 Up /opt/tidb-c1/data/tikv-12160 /opt/tidb-c1/tikv-12160 Total nodes: 4

創(chuàng)建用于測(cè)試的表

mysql> show create table t1; +-------+-------------------------------------------------------------------------------------------------------------------------------+ | Table | Create Table | +-------+-------------------------------------------------------------------------------------------------------------------------------+ | t1 | CREATE TABLE `t1` ( `id` int(11) NOT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin | +-------+-------------------------------------------------------------------------------------------------------------------------------+ 1 row in set sec)

四. 部署 Zookeeper 環(huán)境

在本實(shí)驗(yàn)中單獨(dú)配置 Zookeeper 環(huán)境,為 Kafka 和 Flink 環(huán)境提供服務(wù)。

作為實(shí)驗(yàn)演示方案,只部署單機(jī)環(huán)境。

4.1 解壓 Zookeeper 包

[root@r24 soft]# tar vxzf a [root@r24 soft]# mv a /opt/zookeeper

4.2 部署用于 Zookeeper 的 jre

[root@r24 soft]# tar vxzf jre1.8.0_281. [root@r24 soft]# mv jre1.8.0_281 /opt/zookeeper/jre

修改 /opt/zookeeper/bin 文件,增加 JAVA_HOME 環(huán)境變量

## add bellowing env var in the head of zkEnv.sh JAVA_HOME=/opt/zookeeper/jre

4.3 創(chuàng)建 Zookeeper 的配置文件

[root@r24 conf]# cat zoo.cfg | grep -v "#" tickTime=2000 initLimit=10 syncLimit=5 dataDir=/opt/zookeeper/data clientPort=2181

4.4 啟動(dòng) Zookeeper

[root@r24 bin]# /opt/zookeeper/bin start

4.5 檢查 Zookeeper 的狀態(tài)

## check zk status [root@r24 bin]# . status ZooKeeper JMX enabled by default Using config: /opt/zookeeper/bin/../con Client port found: 2181. Client address: localhost. Client SSL: false. Mode: standalone ## check OS port status [root@r24 bin]# netstat -ntlp Active Internet connections (only servers) Proto Recv-Q Send-Q Local Address Foreign Address State PID/Program name tcp 0 0 0.0.0.0:22 0.0.0.0:* LISTEN 942/sshd tcp6 0 0 :::2181 :::* LISTEN 15062/java tcp6 0 0 :::8080 :::* LISTEN 15062/java tcp6 0 0 :::22 :::* LISTEN 942/sshd tcp6 0 0 :::44505 :::* LISTEN 15062/java ## use zkCli tool to check zk connection [root@r24 bin]# . -server 192.168.12.24:2181

4.6 關(guān)于 Zookeeper 的建議

我個(gè)人有一個(gè)關(guān)于 Zookeeper 的不成熟的小建議:

Zookeeper 集群版本一定要開啟網(wǎng)絡(luò)監(jiān)控。特別是要關(guān)注 system metrics 里面的 network bandwidth。

五. 部署 Kafka

Kafka 是一個(gè)分布式流處理平臺(tái),主要應(yīng)用于兩大類的應(yīng)用中:

  • 構(gòu)造實(shí)時(shí)流數(shù)據(jù)管道,它可以在系統(tǒng)或應(yīng)用之間可靠地獲取數(shù)據(jù)。 (相當(dāng)于message queue)
  • 構(gòu)建實(shí)時(shí)流式應(yīng)用程序,對(duì)這些流數(shù)據(jù)進(jìn)行轉(zhuǎn)換或者影響。 (就是流處理,通過kafka stream topic和topic之間內(nèi)部進(jìn)行變化)

Kafka 有四個(gè)核心的 API:

  • The Producer API 允許一個(gè)應(yīng)用程序發(fā)布一串流式的數(shù)據(jù)到一個(gè)或者多個(gè)Kafka topic。
  • The Consumer API 允許一個(gè)應(yīng)用程序訂閱一個(gè)或多個(gè) topic ,并且對(duì)發(fā)布給他們的流式數(shù)據(jù)進(jìn)行處理。
  • The Streams API 允許一個(gè)應(yīng)用程序作為一個(gè)流處理器,消費(fèi)一個(gè)或者多個(gè)topic產(chǎn)生的輸入流,然后生產(chǎn)一個(gè)輸出流到一個(gè)或多個(gè)topic中去,在輸入輸出流中進(jìn)行有效的轉(zhuǎn)換。
  • The Connector API 允許構(gòu)建并運(yùn)行可重用的生產(chǎn)者或者消費(fèi)者,將Kafka topics連接到已存在的應(yīng)用程序或者數(shù)據(jù)系統(tǒng)。比如,連接到一個(gè)關(guān)系型數(shù)據(jù)庫,捕捉表(table)的所有變更內(nèi)容。

在本實(shí)驗(yàn)中只做功能性驗(yàn)證,只搭建一個(gè)單機(jī)版的 Kafka 環(huán)境。

5.1 下載并解壓 Kafka

[root@r22 soft]# tar vxzf ka [root@r22 soft]# mv ka /opt/kafka

5.2 部署用于 Kafka 的 jre

[root@r22 soft]# tar vxzf jre1.8.0_281. [root@r22 soft]# mv jre1.8.0_281 /opt/kafka/jre

修改 Kafka 的 jre 環(huán)境變量

[root@r22 bin]# vim /opt/kafka/bin ## add bellowing line in the head of ka JAVA_HOME=/opt/kafka/jre

5.3 修改 Kafka 配置文件

修改 Kafka 配置文件 /opt/kafka/config

## change bellowing variable in /opt/kafka/config broker.id=0 listeners=PLAINTEXT://192.168.12.22:9092 log.dirs=/opt/kafka/logs zookeeper.connect=i192.168.12.24:2181

5.4 啟動(dòng) Kafka

[root@r22 bin]# /opt/kafka/bin /opt/kafka/config

5.5 查看 Kafka 的版本信息

Kafka 并沒有提供 --version 的 optional 來查看 Kafka 的版本信息。

[root@r22 ~]# ll /opt/kafka/libs/ | grep kafka -rw-r--r-- 1 root root 4929521 Dec 16 09:02 ka.jar -rw-r--r-- 1 root root 821 Dec 16 09:03 ka.jar.asc -rw-r--r-- 1 root root 41793 Dec 16 09:02 ka-javadoc.jar -rw-r--r-- 1 root root 821 Dec 16 09:03 ka-javadoc.jar.asc -rw-r--r-- 1 root root 892036 Dec 16 09:02 ka-sources.jar -rw-r--r-- 1 root root 821 Dec 16 09:03 ka-sources.jar.asc ... ...

其中 2.13 是 scale 的版本信息,2.7.0 是 Kafka 的版本信息。

六. 部署 Flink

Apache Flink 是一個(gè)框架和分布式處理引擎,用于在無邊界和有邊界數(shù)據(jù)流上進(jìn)行有狀態(tài)的計(jì)算。Flink 能在所有常見集群環(huán)境中運(yùn)行,并能以內(nèi)存速度和任意規(guī)模進(jìn)行計(jì)算。

支持高吞吐、低延遲、高性能的分布式處理框架 Apache Flink 是一個(gè)框架和分布式處理引擎,用于對(duì)無界和有界數(shù)據(jù)流進(jìn)行有狀態(tài)計(jì)算。Flink被設(shè)計(jì)在所有常見的集群環(huán)境中運(yùn)行,以內(nèi)存執(zhí)行速度和任意規(guī)模來執(zhí)行計(jì)算。

本實(shí)驗(yàn)只做功能性測(cè)試,僅部署單機(jī) Flink 環(huán)境。

6.1 下載并分發(fā) Flink

[root@r23 soft]# tar vxzf [root@r23 soft]# mv /opt/flink

6.2 部署 Flink 的 jre

[root@r23 soft]# tar vxzf jre1.8.0_281. [root@r23 soft]# mv jre1.8.0_281 /opt/flink/jre

6.3 添加 Flink 需要的 lib

Flink 消費(fèi) Kafka 數(shù)據(jù),需要 flink-sql-connector-kafka 包。

Flink 鏈接 MySQL/TiDB,需要 flink-connector-jdbc 包。

[root@r23 soft]# mv /opt/flink/lib/ [root@r23 soft]# mv /opt/flink/lib/

6.4 修改 Flink 配置文件

## add or modify bellowing lines in /opt/flink/con jobmanager.r: 192.168.12.23 env.java.home: /opt/flink/jre

6.5 啟動(dòng) Flink

[root@r23 ~]# /opt/flink/bin Starting cluster. Starting standalonesession daemon on host r23. Starting taskexecutor daemon on host r23.

6.6 查看 Flink GUI

七. 部署 MySQL

7.1 解壓 MySQL package

[root@r25 soft]# tar vxf my [root@r25 soft]# mv my /opt/mysql/

7.2 創(chuàng)建 MySQL Service 文件

[root@r25 ~]# touch /opt/mysql/support-file [root@r25 support-files]# cat my [Unit] Description=MySQL 8.0 database server After= After=ne [Service] Type=simple User=mysql Group=mysql #ExecStartPre=/usr/libexec/mysql-check-socket #ExecStartPre=/usr/libexec/mysql-prepare-db-dir %n # Note: we set --basedir to prevent probes that might trigger SELinux alarms, # per bug #547485 ExecStart=/opt/mysql/bin/mysqld_safe #ExecStartPost=/opt/mysql/bin/mysql-check-upgrade #ExecStopPost=/opt/mysql/bin/mysql-wait-stop # Give a reasonable amount of time for the server to start up/shut down TimeoutSec=300 # Place temp files in a secure directory, not /tmp PrivateTmp=true Restart=on-failure RestartPreventExitStatus=1 # Sets open_files_limit LimitNOFILE = 10000 # Set enviroment variable MYSQLD_PARENT_PID. This is required for SQL restart command. Environment=MYSQLD_PARENT_PID=1 [Install] WantedBy=mul ## copy my to /usr/lib/systemd/system/ [root@r25 support-files]# cp my /usr/lib/systemd/system/

7.3 創(chuàng)建 my.cnf 文件

[root@r34 opt]# cat /etc [mysqld] port=3306 basedir=/opt/mysql datadir=/opt/mysql/data socket=/opt/mysql/data max_connections = 100 default-storage-engine = InnoDB character-set-server=utf8 log-error = /opt/mysql/log slow_query_log = 1 long-query-time = 30 slow_query_log_file = /opt/mysql/log min_examined_row_limit = 1000 log-slow-slave-statements log-queries-not-using-indexes #skip-grant-tables

7.4 初始化并啟動(dòng) MySQL

[root@r25 ~]# /opt/mysql/bin/mysqld --initialize --user=mysql --console [root@r25 ~]# chown -R mysql:mysql /opt/mysql [root@r25 ~]# systemctl start mysqld ## check mysql temp passord from /opt/mysql/log 2021-02-24T02:45:47.316406Z 6 [Note] [MY-010454] [Server] A temporary password is generated for root@localhost: I?nDjijxa3>-

7.5 創(chuàng)建一個(gè)新的 MySQL 用戶用以連接 Canal

## change mysql temp password firstly mysql> alter user 'root'@'localhost' identified by 'mysql'; Query OK, 0 rows affected sec) ## create a management user 'root'@'%' mysql> create user 'root'@'%' identified by 'mysql'; Query OK, 0 rows affected sec) mysql> grant all privileges on *.* to 'root'@'%'; Query OK, 0 rows affected sec) ## create a canal replication user 'canal'@'%' mysql> create user 'canal'@'%' identified by 'canal'; Query OK, 0 rows affected sec) mysql> grant select, replication slave, replication client on *.* to 'canal'@'%'; Query OK, 0 rows affected sec) mysql> flush privileges; Query OK, 0 rows affected sec)

7.6 在 MySQL 中創(chuàng)建用于測(cè)試的表

mysql> show create table ; +-------+----------------------------------------------------------------------------------+ | Table | Create Table | +-------+----------------------------------------------------------------------------------+ | t2 | CREATE TABLE `t2` ( `id` int DEFAULT NULL ) ENGINE=InnoDB DEFAULT CHARSET=utf8 | +-------+----------------------------------------------------------------------------------+ 1 row in set sec)

八. 部署 Canal

Canal 主要用途是基于 MySQL 數(shù)據(jù)庫增量日志解析,提供增量數(shù)據(jù)訂閱和消費(fèi)。

早期阿里巴巴因?yàn)楹贾莺兔绹p機(jī)房部署,存在跨機(jī)房同步的業(yè)務(wù)需求,實(shí)現(xiàn)方式主要是基于業(yè)務(wù) trigger 獲取增量變更。

從 2010 年開始,業(yè)務(wù)逐步嘗試數(shù)據(jù)庫日志解析獲取增量變更進(jìn)行同步,由此衍生出了大量的數(shù)據(jù)庫增量訂閱和消費(fèi)業(yè)務(wù)。

基于日志增量訂閱和消費(fèi)的業(yè)務(wù)包括:

  • 數(shù)據(jù)庫鏡像。
  • 數(shù)據(jù)庫實(shí)時(shí)備份。
  • 索引構(gòu)建和實(shí)時(shí)維護(hù)(拆分異構(gòu)索引、倒排索引等)。
  • 業(yè)務(wù) cache 刷新。
  • 帶業(yè)務(wù)邏輯的增量數(shù)據(jù)處理。

當(dāng)前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x。

8.1 解壓 Canal 包

[root@r26 soft]# mkdir /opt/canal && tar vxzf canal.deployer-1.1.4. -C /opt/canal

8.2 部署 Canal 的 jre

[root@r26 soft]# tar vxzf jre1.8.0_281. [root@r26 soft]# mv jre1.8.0_281 /opt/canal/jre ## configue jre, add bellowing line in the head of /opt/canal/bin JAVA=/opt/canal/jre/bin/java

8.3 修改 Canal 的配置文件

修改 /opt/canal/con 配置文件

## modify bellowing configuration canal.zkServers =192.168.12.24:2181 canal.serverMode = kafka canal.destinations = example ## 需要在 /opt/canal/conf 目錄下創(chuàng)建一個(gè) example 文件夾,用于存放 destination 的配置 canal.mq.servers = 192.168.12.22:9092

修改 /opt/canal/conf/example 配置文件

## modify bellowing configuration canal.instance.master.address=192.168.12.25:3306 canal.in canal.in canal.in.*\\..* ## 過濾數(shù)據(jù)庫的表 canal.mq.topic=canal-kafka

九. 配置數(shù)據(jù)流向

9.1 MySQL Binlog -> Canal -> Kafka 通路

9.1.1 查看 MySQL Binlog 信息

查看 MySQL Binlog 信息,確保 Binlog 是正常的。

mysql> show master status; +---------------+----------+--------------+------------------+-------------------+ | File | Position | Binlog_Do_DB | Binlog_Ignore_DB | Executed_Gtid_Set | +---------------+----------+--------------+------------------+-------------------+ | binlog.000001 | 2888 | | | | +---------------+----------+--------------+------------------+-------------------+ 1 row in set sec)

9.1.2 在 Kafka 中創(chuàng)建一個(gè) Topic

在 Kafka 中創(chuàng)建一個(gè) Topic canal-kafka,這個(gè)Topic 的名字要與 Canal 配置文件 /opt/canal/conf/example 中的 canal.mq.topic=canal-kafka 對(duì)應(yīng):

[root@r22 kafka]# /opt/kafka/bin --create \ > --zookeeper 192.168.12.24:2181 \ > --config max.me \ > --config \ > --replication-factor 1 \ > --partitions 1 \ > --topic canal-kafka Created topic canal-kafka. [2021-02-24 01:51:55,050] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions Set(canal-kafka-0) ) [2021-02-24 01:51:55,052] INFO [Log partition=canal-kafka-0, dir=/opt/kafka/logs] Loading producer state till offset 0 with message format version 2 ) [2021-02-24 01:51:55,053] INFO Created log for partition canal-kafka-0 in /opt/kafka/logs/canal-kafka-0 with properties {com -> producer, me -> true, min.in -> 1, -> 0, cleanup.policy -> [delete], -> 9223372036854775807, -> 1073741824, re -> 604800000, -> 1, me -> 2.7-IV2, -> 60000, max.com -> 9223372036854775807, max.me -> 12800000, min.com -> 0, me -> CreateTime, preallocate -> false, min.cleanable.dir -> 0.5, index.in -> 4096, unclean.leader.elec -> false, re -> -1, delete.re -> 86400000, -> 604800000, me -> 9223372036854775807, -> 10485760}. Manager) [2021-02-24 01:51:55,053] INFO [Partition canal-kafka-0 broker=0] No checkpointed highwatermark is found for partition canal-kafka-0 ) [2021-02-24 01:51:55,053] INFO [Partition canal-kafka-0 broker=0] Log loaded for partition canal-kafka-0 with initial high watermark 0 )

查看 Kafka 中所有的 Topic:

[root@r22 kafka]# /opt/kafka/bin --list --zookeeper 192.168.12.24:2181 __consumer_offsets canal-kafka ticdc-test

查看 Kafka 中 Topic ticdc-test 的信息:

[root@r22 ~]# /opt/kafka/bin --describe --zookeeper 192.168.12.24:2181 --topic canal-kafka Topic: ticdc-test PartitionCount: 1 ReplicationFactor: 1 Configs: max.me, Topic: ticdc-test Partition: 0 Leader: 0 Replicas: 0 Isr: 0

9.1.3 啟動(dòng) Canal

在啟動(dòng) Canal 之前,需要在 Canal 節(jié)點(diǎn)上查看一下端口的情況:

## check MySQL 3306 port ## canal.instance.master.address=192.168.12.25:3306 [root@r26 bin]# telnet 192.168.12.25 3306 ## check Kafka 9092 port ## canal.mq.servers = 192.168.12.22:9092 [root@r26 bin]# telnet 192.168.12.22 9092 ## check zookeeper 2181 port ## canal.zkServers = 192.168.12.24:2181 [root@r26 bin]# telnet 192.168.12.24 2181

啟動(dòng) Canal:

[root@r26 bin]# /opt/canal/bin cd to /opt/canal/bin for workaround relative path LOG CONFIGURATION : /opt/canal/bin/../con canal conf : /opt/canal/bin/../con CLASSPATH :/opt/canal/bin/../conf:/opt/canal/bin/../lib: cd to /opt/canal/bin for continue

9.1.4 查看 Canal 日志

查看 /opt/canal/logs/example

2021-02-24 01:41:40.293 [destination = example , address = , EventParser] WARN c.a.o.c.p.inbound.my - ---> begin to find start position, it will be long time for reset or first position 2021-02-24 01:41:40.293 [destination = example , address = , EventParser] WARN c.a.o.c.p.inbound.my - prepare to find start position just show master status 2021-02-24 01:41:40.542 [destination = example , address = , EventParser] WARN c.a.o.c.p.inbound.my - ---> find start position successfully, EntryPosition[included=false,journalName=binlog.000001,position=4,serverId=1,gtid=<null>,timestamp=1614134832000] cost : 244ms , the next step is binlog dump

9.1.5 查看 Kafka 中 consumer 信息

在 MySQL 中插入一條測(cè)試信息:

mysql> insert into t2 values(1); Query OK, 1 row affected sec)

查看 consumer 的信息,已經(jīng)有了剛才插入的測(cè)試數(shù)據(jù):

/opt/kafka/bin --bootstrap-server 192.168.12.22:9092 --topic canal-kafka --from-beginning {"data":null,"database":"test","es":1614151725000,"id":2,"isDdl":false,"mysqlType":null,"old":null,"pkNames":null,"sql":"create database test","sqlType":null,"table":"","ts":1614151725890,"type":"QUERY"} {"data":null,"database":"test","es":1614151746000,"id":3,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"sql":"create table t2(id int)","sqlType":null,"table":"t2","ts":1614151746141,"type":"CREATE"} {"data":[{"id":"1"}],"database":"test","es":1614151941000,"id":4,"isDdl":false,"mysqlType":{"id":"int"},"old":null,"pkNames":null,"sql":"","sqlType":{"id":4},"table":"t2","ts":1614151941235,"type":"INSERT"}

9.2 Kafka -> Flink 通路

在 Flink 中創(chuàng)建 t2 表,connector 類型為 kafka。

## create a test table t2 in Flink Flink SQL> create table t2(id int) > WITH ( > 'connector' = 'kafka', > 'topic' = 'canal-kafka', > '; = '192.168.12.22:9092', > '; = 'canal-kafka-consumer-group', > 'format' = 'canal-json', > '; = 'latest-offset' > ); Flink SQL> select * from t1;

在 MySQL 中在插入一條測(cè)試數(shù)據(jù):

mysql> insert into values(2); Query OK, 1 row affected sec)

從 Flink 中可以實(shí)時(shí)同步數(shù)據(jù):

Flink SQL> select * from t1; Refresh: 1 s Page: Last of 1 Updated: 02:49:27.366 id 2

9.3 Flink -> TiDB 通路

9.3.1 在 下游的 TiDB 中創(chuàng)建用于測(cè)試的表

[root@r20 soft]# mysql -uroot -P14000 -hr21 mysql> create table t3 (id int); Query OK, 0 rows affected sec)

9.3.2 在 Flink 中創(chuàng)建測(cè)試表

Flink SQL> CREATE TABLE t3 ( > id int > ) with ( > 'connector' = 'jdbc', > 'url' = 'jdbc:mysql://192.168.12.21:14000/test', > 'table-name' = 't3', > 'username' = 'root', > 'password' = 'mysql' > ); Flink SQL> insert into t3 values(3); [INFO] Submitting SQL update statement to the cluster... [INFO] Table update statement has been successfully submitted to the cluster: Job ID: a0827487030db177ee7e5c8575ef714e

9.3.3 在下游 TiDB 中查看插入的數(shù)據(jù)

mysql> select * from ; +------+ | id | +------+ | 3 | +------+ 1 row in set sec)

本文為阿里云原創(chuàng)內(nèi)容,未經(jīng)允許不得轉(zhuǎn)載。

1.《678改動(dòng)日志專題之Flink 最佳實(shí)踐之使用 Canal 同步 MySQL 數(shù)據(jù)至 TiDB》援引自互聯(lián)網(wǎng),旨在傳遞更多網(wǎng)絡(luò)信息知識(shí),僅代表作者本人觀點(diǎn),與本網(wǎng)站無關(guān),侵刪請(qǐng)聯(lián)系頁腳下方聯(lián)系方式。

2.《678改動(dòng)日志專題之Flink 最佳實(shí)踐之使用 Canal 同步 MySQL 數(shù)據(jù)至 TiDB》僅供讀者參考,本網(wǎng)站未對(duì)該內(nèi)容進(jìn)行證實(shí),對(duì)其原創(chuàng)性、真實(shí)性、完整性、及時(shí)性不作任何保證。

3.文章轉(zhuǎn)載時(shí)請(qǐng)保留本站內(nèi)容來源地址,http://f99ss.com/gl/2075401.html

上一篇

dnf散打加點(diǎn)專題之90版本DNF武極攻略分享 解讀90版本男散打加點(diǎn)配裝

下一篇

dnf紅眼刷圖加點(diǎn)看這里!DNF:紅眼100級(jí)的2種技能加點(diǎn)推薦,狂戰(zhàn)的信仰小蹦你會(huì)點(diǎn)滿嗎?

678改動(dòng)日志看這里!Flink 最佳實(shí)踐之使用 Canal 同步 MySQL 數(shù)據(jù)至 TiDB

678改動(dòng)日志看這里!Flink 最佳實(shí)踐之使用 Canal 同步 MySQL 數(shù)據(jù)至 TiDB

678改動(dòng)日志相關(guān)介紹,一. 背景介紹 本文將介紹如何將 MySQL 中的數(shù)據(jù),通過 Binlog + Canal 的形式導(dǎo)入到 Kafka 中,繼而被 Flink 消費(fèi)的案例。 達(dá)到當(dāng)天最大量API KEY 超過次數(shù)限制 ...

關(guān)于678改動(dòng)日志我想說第84章 春風(fēng)行動(dòng)

關(guān)于678改動(dòng)日志我想說第84章 春風(fēng)行動(dòng)

678改動(dòng)日志相關(guān)介紹,“謝謝大家熱烈的掌聲,今天面對(duì)各位前輩和老師,雷倩倩的身份首先是一名學(xué)生,請(qǐng)?jiān)试S我向各位尊敬的師長致敬!” 雷倩倩起立,她走到講臺(tái)中央,向在座的廣大教育工作者,深深鞠躬! “正因?yàn)槲沂紫仁且幻麑W(xué)生,所...

678改動(dòng)日志看這里!記一次 .NET 某紡織工廠 MES系統(tǒng) API 掛死分析

678改動(dòng)日志看這里!記一次 .NET 某紡織工廠 MES系統(tǒng) API 掛死分析

678改動(dòng)日志相關(guān)介紹,一、背景 1.講故事 這個(gè)月中旬,一個(gè)朋友向我求助wx的程序線程份額很高,探討了如何解決。屏幕截圖如下: 說實(shí)話,和其他行業(yè)的程序員聊天還是很有趣的,可以交朋友,擴(kuò)大自己的圈子。朋友說,因?yàn)檫@個(gè)bug...

【678改動(dòng)日志】Flink 最佳實(shí)踐之使用 Canal 同步 MySQL 數(shù)據(jù)至 TiDB

【678改動(dòng)日志】Flink 最佳實(shí)踐之使用 Canal 同步 MySQL 數(shù)據(jù)至 TiDB

678改動(dòng)日志相關(guān)介紹,一.背景介紹 本文介紹了將MySQL中的數(shù)據(jù)通過Binlog Canal導(dǎo)入Kafka,然后由Flink消耗的情況。 為了能夠快速的驗(yàn)證整套流程的功能性,所有的組件都以單機(jī)的形式部署。如果手上的物理資...

【678改動(dòng)日志】專題形象一落千丈的徐睿知 韓國演藝圈歷代勁爆 爭(zhēng)議連環(huán)爆

【678改動(dòng)日志】專題形象一落千丈的徐睿知 韓國演藝圈歷代勁爆 爭(zhēng)議連環(huán)爆

678改動(dòng)日志相關(guān)介紹,安靜的外表,聲音低,有磁性的她有望成為一線女演員。 她就是最近爭(zhēng)議連環(huán)爆的徐睿知。1990年4月6日出生于首爾過去曾在街上被星探看中。但因?yàn)橛X得自己聲音低沉比較適合當(dāng)主播,因此拒絕了試鏡邀請(qǐng),之后她到...

【678改動(dòng)日志】2月8日·貴州要聞及抗擊肺炎快報(bào)

【678改動(dòng)日志】2月8日·貴州要聞及抗擊肺炎快報(bào)

678改動(dòng)日志相關(guān)介紹,每天廣播 貴州:2020年2月7日12-24時(shí),全省新型新冠病毒感染肺炎新增確診病例8例,出院患者0例。 其中:新增確診病例中,遵義市3例,畢節(jié)市4例,黔西南州1例。 截至2月7日24時(shí),全省累計(jì)報(bào)告...

【678改動(dòng)日志】專題網(wǎng)絡(luò)流量與Agent_Drable惡意程序深度分析

【678改動(dòng)日志】專題網(wǎng)絡(luò)流量與Agent_Drable惡意程序深度分析

678改動(dòng)日志相關(guān)介紹,在回顧最近的一些網(wǎng)絡(luò)異常時(shí),發(fā)現(xiàn)了使用DNS隧道與C2通信的攻擊組織,并將其命名為“Cold River”。破譯受害者和C2的流量,發(fā)現(xiàn)攻擊者使用的復(fù)雜誘餌文件,與其他未知樣本連接,并發(fā)現(xiàn)攻擊者使用的...

678改動(dòng)日志專題之必收藏 | 寬帶錯(cuò)誤碼含義及處理方法大全

678改動(dòng)日志專題之必收藏 | 寬帶錯(cuò)誤碼含義及處理方法大全

678改動(dòng)日志相關(guān)介紹,文章有點(diǎn)長。但是很耐用! 更多寬帶信息,微信搜索“寬帶軍”,寬帶軍洗干凈了等著你! 最近發(fā)現(xiàn)很多小朋友給我留言 不是為了找對(duì)象 在找對(duì)象的路上。 已經(jīng)有錯(cuò)誤了。 . . . 寬帶錯(cuò)誤 今天寬帶軍給大家...