一.背景介紹
本文介紹了將MySQL中的數(shù)據(jù)通過Binlog Canal導(dǎo)入Kafka,然后由Flink消耗的情況。
為了能夠快速的驗(yàn)證整套流程的功能性,所有的組件都以單機(jī)的形式部署。如果手上的物理資源不足,可以將本文中的所有組件一臺(tái) 4G 1U 的虛擬機(jī)環(huán)境中。
如果需要在生產(chǎn)環(huán)境中部署,建議將每一個(gè)組件替換成高可用的集群部署方案。
其中,我們單獨(dú)創(chuàng)建了一套 Zookeeper 單節(jié)點(diǎn)環(huán)境,F(xiàn)link、Kafka、Canal 等組件共用這個(gè) Zookeeper 環(huán)境。
針對(duì)于所有需要 Jre 的組件,如 Flink,Kafka,Canal,Zookeeper,考慮到升級(jí) JRE 可能會(huì)影響到其他的應(yīng)用,我們選擇每個(gè)組件獨(dú)立使用自己的 JRE 環(huán)境。
本文分為兩個(gè)部分,其中,前七小節(jié)主要介紹基礎(chǔ)環(huán)境的搭建,最后一個(gè)小節(jié)介紹了數(shù)據(jù)是如何在各個(gè)組件中流通的。
數(shù)據(jù)的流動(dòng)經(jīng)過以下組件:
- MySQL 數(shù)據(jù)源生成 Binlog。
- Canal 讀取 Binlog,生成 Canal json,推送到 Kafka 指定的 Topic 中。
- Flink 使用 flink-sql-connector-kafka API,消費(fèi) Kafka Topic 中的數(shù)據(jù)。
- Flink 在通過 flink-connector-jdbc,將數(shù)據(jù)寫入到 TiDB 中。
TiDB + Flink 的結(jié)構(gòu),支持開發(fā)與運(yùn)行多種不同種類的應(yīng)用程序。
目前主要的特性主要包括:
- 批流一體化。
- 精密的狀態(tài)管理。
- 事件時(shí)間支持。
- 精確的一次狀態(tài)一致性保障。
Flink 可以運(yùn)行在包括 YARN、Mesos、Kubernetes 在內(nèi)的多種資源管理框架上,還支持裸機(jī)集群上獨(dú)立部署。TiDB 可以部署 AWS、Kubernetes、GCP GKE 上,同時(shí)也支持使用 TiUP 在裸機(jī)集群上獨(dú)立部署。
TiDB + Flink 結(jié)構(gòu)常見的幾類應(yīng)用如下:
- 事件驅(qū)動(dòng)型應(yīng)用:反欺詐。異常檢測(cè)。基于規(guī)則的報(bào)警。業(yè)務(wù)流程監(jiān)控。
- 數(shù)據(jù)分析應(yīng)用:網(wǎng)絡(luò)質(zhì)量監(jiān)控。產(chǎn)品更新及試驗(yàn)評(píng)估分析。事實(shí)數(shù)據(jù)即席分析。大規(guī)模圖分析。
- 數(shù)據(jù)管道應(yīng)用:電商實(shí)時(shí)查詢索引構(gòu)建。電商持續(xù) ETL。
二. 環(huán)境介紹
2.1 操作系統(tǒng)環(huán)境
[root@r20 topology]# cat /etc/redhat-release
CentOS Stream release 8
2.2 軟件環(huán)境
Item | Version | Download link |
TiDB | v4.0.9 | 1 |
Kafka | v2.7.0 | |
Flink | v1.12.1 | |
Jre | v1.8.0_281 | |
Zookeeper | v3.6.2 | |
flink-sql-connector-kafka | v1.12.1 | |
flink-connector-jdbc | v1.12.0 | |
MySQL | v8.0.23 | |
Canal | v1.1.4 |
2.3 機(jī)器分配
Hostname | IP | Component |
r21 | 192.168.12.21 | TiDB Cluster |
r22 | 192.168.12.22 | Kafka |
r23 | 192.168.12.23 | Flink |
r24 | 192.168.12.24 | Zookeeper |
r25 | 192.168.12.25 | MySQL |
r26 | 192.168.12.26 | Canal |
三. 部署 TiDB Cluster
與傳統(tǒng)的單機(jī)數(shù)據(jù)庫相比,TiDB 具有以下優(yōu)勢(shì):
- 純分布式架構(gòu),擁有良好的擴(kuò)展性,支持彈性的擴(kuò)縮容。
- 支持 SQL,對(duì)外暴露 MySQL 的網(wǎng)絡(luò)協(xié)議,并兼容大多數(shù) MySQL 的語法,在大多數(shù)場(chǎng)景下可以直接替換 MySQL。
- 默認(rèn)支持高可用,在少數(shù)副本失效的情況下,數(shù)據(jù)庫本身能夠自動(dòng)進(jìn)行數(shù)據(jù)修復(fù)和故障轉(zhuǎn)移,對(duì)業(yè)務(wù)透明。
- 支持 ACID 事務(wù),對(duì)于一些有強(qiáng)一致需求的場(chǎng)景友好,例如:銀行轉(zhuǎn)賬。
- 具有豐富的工具鏈生態(tài),覆蓋數(shù)據(jù)遷移、同步、備份等多種場(chǎng)景。
在內(nèi)核設(shè)計(jì)上,TiDB 分布式數(shù)據(jù)庫將整體架構(gòu)拆分成了多個(gè)模塊,各模塊之間互相通信,組成完整的 TiDB 系統(tǒng)。對(duì)應(yīng)的架構(gòu)圖如下:
在本文中,我們只做最簡(jiǎn)單的功能測(cè)試,所以部署了一套單節(jié)點(diǎn)但副本的 TiDB,涉及到了以下的三個(gè)模塊:
- TiDB Server:SQL 層,對(duì)外暴露 MySQL 協(xié)議的連接 endpoint,負(fù)責(zé)接受客戶端的連接,執(zhí)行 SQL 解析和優(yōu)化,最終生成分布式執(zhí)行計(jì)劃。
- PD (Placement Driver) Server:整個(gè) TiDB 集群的元信息管理模塊,負(fù)責(zé)存儲(chǔ)每個(gè) TiKV 節(jié)點(diǎn)實(shí)時(shí)的數(shù)據(jù)分布情況和集群的整體拓?fù)浣Y(jié)構(gòu),提供 TiDB Dashboard 管控界面,并為分布式事務(wù)分配事務(wù) ID。
- TiKV Server:負(fù)責(zé)存儲(chǔ)數(shù)據(jù),從外部看 TiKV 是一個(gè)分布式的提供事務(wù)的 Key-Value 存儲(chǔ)引擎。
3.1 TiUP 部署模板文件
# # Global variables are applied to all deployments and used as the default value of
# # the deployments if a specific deployment value is missing.
global:
user: "tidb"
SSH_port: 22
deploy_dir: "/opt/tidb-c1/"
data_dir: "/opt/tidb-c1/data/"
# # Monitored variables are applied to all the machines.
#monitored:
# node_exporter_port: 19100
# blackbox_exporter_port: 39115
# deploy_dir: "/opt/tidb-c3/monitored"
# data_dir: "/opt/tidb-c3/data/monitored"
# log_dir: "/opt/tidb-c3/log/monitored"
# # Server configs are used to specify the runtime configuration of TiDB components.
# # All configuration items can be found in TiDB docs:
# # - TiDB:
# # - TiKV:
# # - PD:
# # All configuration items use points to represent the hierarchy, e.g:
# # read
# #
# # You can overwrite this configuration via the instance-level `config` field.
server_configs:
tidb:
log.slow-threshold: 300
binlog.enable: false
binlog.ignore-error: false
: true
tikv:
: 4
ra: 2
ra: 2
rock: 1
: "16GB"
read: 12
read: false
read: true
ra: 0
pd:
: 4
: 2048
: 64
pd_servers:
- host: 192.168.12.21
ssh_port: 22
name: "pd-2"
client_port: 12379
peer_port: 12380
deploy_dir: "/opt/tidb-c1/pd-12379"
data_dir: "/opt/tidb-c1/data/pd-12379"
log_dir: "/opt/tidb-c1/log/pd-12379"
numa_node: "0"
# # The following configs are used to overwrite the `` values.
config:
: 20
: 200000
tidb_servers:
- host: 192.168.12.21
ssh_port: 22
port: 14000
status_port: 12080
deploy_dir: "/opt/tidb-c1/tidb-14000"
log_dir: "/opt/tidb-c1/log/tidb-14000"
numa_node: "0"
# # The following configs are used to overwrite the `` values.
config:
log.slow-query-file:
: true
tikv_servers:
- host: 192.168.12.21
ssh_port: 22
port: 12160
status_port: 12180
deploy_dir: "/opt/tidb-c1/tikv-12160"
data_dir: "/opt/tidb-c1/data/tikv-12160"
log_dir: "/opt/tidb-c1/log/tikv-12160"
numa_node: "0"
# # The following configs are used to overwrite the `` values.
config:
: 4
#: { zone: "zone1", dc: "dc1", host: "host1" }
#monitoring_servers:
# - host: 192.168.12.21
# ssh_port: 22
# port: 19090
# deploy_dir: "/opt/tidb-c1/prometheus-19090"
# data_dir: "/opt/tidb-c1/data/prometheus-19090"
# log_dir: "/opt/tidb-c1/log/prometheus-19090"
#grafana_servers:
# - host: 192.168.12.21
# port: 13000
# deploy_dir: "/opt/tidb-c1/grafana-13000"
#alertmanager_servers:
# - host: 192.168.12.21
# ssh_port: 22
# web_port: 19093
# cluster_port: 19094
# deploy_dir: "/opt/tidb-c1/alertmanager-19093"
# data_dir: "/opt/tidb-c1/data/alertmanager-19093"
# log_dir: "/opt/tidb-c1/log/alertmanager-19093"
3.2 TiDB Cluster 環(huán)境
本文重點(diǎn)非部署 TiDB Cluster,作為快速實(shí)驗(yàn)環(huán)境,只在一臺(tái)機(jī)器上部署單副本的 TiDB Cluster 集群。不需要部署監(jiān)控環(huán)境。
[root@r20 topology]# tiup cluster display tidb-c1-v409
Starting component `cluster`: /root/.tiup/components/cluster display tidb-c1-v409
Cluster type: tidb
Cluster name: tidb-c1-v409
Cluster version: v4.0.9
SSH type: builtin
Dashboard URL:
ID Role Host Ports OS/Arch Status Data Dir Deploy Dir
-- ---- ---- ----- ------- ------ -------- ----------
192.168.12.21:12379 pd 192.168.12.21 12379/12380 linux/x86_64 Up|L|UI /opt/tidb-c1/data/pd-12379 /opt/tidb-c1/pd-12379
192.168.12.21:14000 tidb 192.168.12.21 14000/12080 linux/x86_64 Up - /opt/tidb-c1/tidb-14000
192.168.12.21:12160 tikv 192.168.12.21 12160/12180 linux/x86_64 Up /opt/tidb-c1/data/tikv-12160 /opt/tidb-c1/tikv-12160
Total nodes: 4
創(chuàng)建用于測(cè)試的表
mysql> show create table t1;
+-------+-------------------------------------------------------------------------------------------------------------------------------+
| Table | Create Table |
+-------+-------------------------------------------------------------------------------------------------------------------------------+
| t1 | CREATE TABLE `t1` (
`id` int(11) NOT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin |
+-------+-------------------------------------------------------------------------------------------------------------------------------+
1 row in set sec)
四. 部署 Zookeeper 環(huán)境
在本實(shí)驗(yàn)中單獨(dú)配置 Zookeeper 環(huán)境,為 Kafka 和 Flink 環(huán)境提供服務(wù)。
作為實(shí)驗(yàn)演示方案,只部署單機(jī)環(huán)境。
4.1 解壓 Zookeeper 包
[root@r24 soft]# tar vxzf a
[root@r24 soft]# mv a /opt/zookeeper
4.2 部署用于 Zookeeper 的 jre
[root@r24 soft]# tar vxzf jre1.8.0_281.
[root@r24 soft]# mv jre1.8.0_281 /opt/zookeeper/jre
修改 /opt/zookeeper/bin 文件,增加 JAVA_HOME 環(huán)境變量
## add bellowing env var in the head of zkEnv.sh
JAVA_HOME=/opt/zookeeper/jre
4.3 創(chuàng)建 Zookeeper 的配置文件
[root@r24 conf]# cat zoo.cfg | grep -v "#"
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/opt/zookeeper/data
clientPort=2181
4.4 啟動(dòng) Zookeeper
[root@r24 bin]# /opt/zookeeper/bin start
4.5 檢查 Zookeeper 的狀態(tài)
## check zk status
[root@r24 bin]# . status
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper/bin/../con
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: standalone
## check OS port status
[root@r24 bin]# netstat -ntlp
Active Internet connections (only servers)
Proto Recv-Q Send-Q Local Address Foreign Address State PID/Program name
tcp 0 0 0.0.0.0:22 0.0.0.0:* LISTEN 942/sshd
tcp6 0 0 :::2181 :::* LISTEN 15062/java
tcp6 0 0 :::8080 :::* LISTEN 15062/java
tcp6 0 0 :::22 :::* LISTEN 942/sshd
tcp6 0 0 :::44505 :::* LISTEN 15062/java
## use zkCli tool to check zk connection
[root@r24 bin]# . -server 192.168.12.24:2181
4.6 關(guān)于 Zookeeper 的建議
我個(gè)人有一個(gè)關(guān)于 Zookeeper 的不成熟的小建議:
Zookeeper 集群版本一定要開啟網(wǎng)絡(luò)監(jiān)控。特別是要關(guān)注 system metrics 里面的 network bandwidth。
五. 部署 Kafka
Kafka 是一個(gè)分布式流處理平臺(tái),主要應(yīng)用于兩大類的應(yīng)用中:
- 構(gòu)造實(shí)時(shí)流數(shù)據(jù)管道,它可以在系統(tǒng)或應(yīng)用之間可靠地獲取數(shù)據(jù)。 (相當(dāng)于message queue)
- 構(gòu)建實(shí)時(shí)流式應(yīng)用程序,對(duì)這些流數(shù)據(jù)進(jìn)行轉(zhuǎn)換或者影響。 (就是流處理,通過kafka stream topic和topic之間內(nèi)部進(jìn)行變化)
Kafka 有四個(gè)核心的 API:
- The Producer API 允許一個(gè)應(yīng)用程序發(fā)布一串流式的數(shù)據(jù)到一個(gè)或者多個(gè)Kafka topic。
- The Consumer API 允許一個(gè)應(yīng)用程序訂閱一個(gè)或多個(gè) topic ,并且對(duì)發(fā)布給他們的流式數(shù)據(jù)進(jìn)行處理。
- The Streams API 允許一個(gè)應(yīng)用程序作為一個(gè)流處理器,消費(fèi)一個(gè)或者多個(gè)topic產(chǎn)生的輸入流,然后生產(chǎn)一個(gè)輸出流到一個(gè)或多個(gè)topic中去,在輸入輸出流中進(jìn)行有效的轉(zhuǎn)換。
- The Connector API 允許構(gòu)建并運(yùn)行可重用的生產(chǎn)者或者消費(fèi)者,將Kafka topics連接到已存在的應(yīng)用程序或者數(shù)據(jù)系統(tǒng)。比如,連接到一個(gè)關(guān)系型數(shù)據(jù)庫,捕捉表(table)的所有變更內(nèi)容。
在本實(shí)驗(yàn)中只做功能性驗(yàn)證,只搭建一個(gè)單機(jī)版的 Kafka 環(huán)境。
5.1 下載并解壓 Kafka
[root@r22 soft]# tar vxzf ka
[root@r22 soft]# mv ka /opt/kafka
5.2 部署用于 Kafka 的 jre
[root@r22 soft]# tar vxzf jre1.8.0_281.
[root@r22 soft]# mv jre1.8.0_281 /opt/kafka/jre
修改 Kafka 的 jre 環(huán)境變量
[root@r22 bin]# vim /opt/kafka/bin
## add bellowing line in the head of ka
JAVA_HOME=/opt/kafka/jre
5.3 修改 Kafka 配置文件
修改 Kafka 配置文件 /opt/kafka/config
## change bellowing variable in /opt/kafka/config
broker.id=0
listeners=PLAINTEXT://192.168.12.22:9092
log.dirs=/opt/kafka/logs
zookeeper.connect=i192.168.12.24:2181
5.4 啟動(dòng) Kafka
[root@r22 bin]# /opt/kafka/bin /opt/kafka/config
5.5 查看 Kafka 的版本信息
Kafka 并沒有提供 --version 的 optional 來查看 Kafka 的版本信息。
[root@r22 ~]# ll /opt/kafka/libs/ | grep kafka
-rw-r--r-- 1 root root 4929521 Dec 16 09:02 ka.jar
-rw-r--r-- 1 root root 821 Dec 16 09:03 ka.jar.asc
-rw-r--r-- 1 root root 41793 Dec 16 09:02 ka-javadoc.jar
-rw-r--r-- 1 root root 821 Dec 16 09:03 ka-javadoc.jar.asc
-rw-r--r-- 1 root root 892036 Dec 16 09:02 ka-sources.jar
-rw-r--r-- 1 root root 821 Dec 16 09:03 ka-sources.jar.asc
... ...
其中 2.13 是 scale 的版本信息,2.7.0 是 Kafka 的版本信息。
六. 部署 Flink
Apache Flink 是一個(gè)框架和分布式處理引擎,用于在無邊界和有邊界數(shù)據(jù)流上進(jìn)行有狀態(tài)的計(jì)算。Flink 能在所有常見集群環(huán)境中運(yùn)行,并能以內(nèi)存速度和任意規(guī)模進(jìn)行計(jì)算。
支持高吞吐、低延遲、高性能的分布式處理框架 Apache Flink 是一個(gè)框架和分布式處理引擎,用于對(duì)無界和有界數(shù)據(jù)流進(jìn)行有狀態(tài)計(jì)算。Flink被設(shè)計(jì)在所有常見的集群環(huán)境中運(yùn)行,以內(nèi)存執(zhí)行速度和任意規(guī)模來執(zhí)行計(jì)算。
本實(shí)驗(yàn)只做功能性測(cè)試,僅部署單機(jī) Flink 環(huán)境。
6.1 下載并分發(fā) Flink
[root@r23 soft]# tar vxzf
[root@r23 soft]# mv /opt/flink
6.2 部署 Flink 的 jre
[root@r23 soft]# tar vxzf jre1.8.0_281.
[root@r23 soft]# mv jre1.8.0_281 /opt/flink/jre
6.3 添加 Flink 需要的 lib
Flink 消費(fèi) Kafka 數(shù)據(jù),需要 flink-sql-connector-kafka 包。
Flink 鏈接 MySQL/TiDB,需要 flink-connector-jdbc 包。
[root@r23 soft]# mv /opt/flink/lib/
[root@r23 soft]# mv /opt/flink/lib/
6.4 修改 Flink 配置文件
## add or modify bellowing lines in /opt/flink/con
jobmanager.r: 192.168.12.23
env.java.home: /opt/flink/jre
6.5 啟動(dòng) Flink
[root@r23 ~]# /opt/flink/bin
Starting cluster.
Starting standalonesession daemon on host r23.
Starting taskexecutor daemon on host r23.
6.6 查看 Flink GUI
七. 部署 MySQL
7.1 解壓 MySQL package
[root@r25 soft]# tar vxf my
[root@r25 soft]# mv my /opt/mysql/
7.2 創(chuàng)建 MySQL Service 文件
[root@r25 ~]# touch /opt/mysql/support-file
[root@r25 support-files]# cat my
[Unit]
Description=MySQL 8.0 database server
After=
After=ne
[Service]
Type=simple
User=mysql
Group=mysql
#ExecStartPre=/usr/libexec/mysql-check-socket
#ExecStartPre=/usr/libexec/mysql-prepare-db-dir %n
# Note: we set --basedir to prevent probes that might trigger SELinux alarms,
# per bug #547485
ExecStart=/opt/mysql/bin/mysqld_safe
#ExecStartPost=/opt/mysql/bin/mysql-check-upgrade
#ExecStopPost=/opt/mysql/bin/mysql-wait-stop
# Give a reasonable amount of time for the server to start up/shut down
TimeoutSec=300
# Place temp files in a secure directory, not /tmp
PrivateTmp=true
Restart=on-failure
RestartPreventExitStatus=1
# Sets open_files_limit
LimitNOFILE = 10000
# Set enviroment variable MYSQLD_PARENT_PID. This is required for SQL restart command.
Environment=MYSQLD_PARENT_PID=1
[Install]
WantedBy=mul
## copy my to /usr/lib/systemd/system/
[root@r25 support-files]# cp my /usr/lib/systemd/system/
7.3 創(chuàng)建 my.cnf 文件
[root@r34 opt]# cat /etc
[mysqld]
port=3306
basedir=/opt/mysql
datadir=/opt/mysql/data
socket=/opt/mysql/data
max_connections = 100
default-storage-engine = InnoDB
character-set-server=utf8
log-error = /opt/mysql/log
slow_query_log = 1
long-query-time = 30
slow_query_log_file = /opt/mysql/log
min_examined_row_limit = 1000
log-slow-slave-statements
log-queries-not-using-indexes
#skip-grant-tables
7.4 初始化并啟動(dòng) MySQL
[root@r25 ~]# /opt/mysql/bin/mysqld --initialize --user=mysql --console
[root@r25 ~]# chown -R mysql:mysql /opt/mysql
[root@r25 ~]# systemctl start mysqld
## check mysql temp passord from /opt/mysql/log
2021-02-24T02:45:47.316406Z 6 [Note] [MY-010454] [Server] A temporary password is generated for root@localhost: I?nDjijxa3>-
7.5 創(chuàng)建一個(gè)新的 MySQL 用戶用以連接 Canal
## change mysql temp password firstly
mysql> alter user 'root'@'localhost' identified by 'mysql';
Query OK, 0 rows affected sec)
## create a management user 'root'@'%'
mysql> create user 'root'@'%' identified by 'mysql';
Query OK, 0 rows affected sec)
mysql> grant all privileges on *.* to 'root'@'%';
Query OK, 0 rows affected sec)
## create a canal replication user 'canal'@'%'
mysql> create user 'canal'@'%' identified by 'canal';
Query OK, 0 rows affected sec)
mysql> grant select, replication slave, replication client on *.* to 'canal'@'%';
Query OK, 0 rows affected sec)
mysql> flush privileges;
Query OK, 0 rows affected sec)
7.6 在 MySQL 中創(chuàng)建用于測(cè)試的表
mysql> show create table ;
+-------+----------------------------------------------------------------------------------+
| Table | Create Table |
+-------+----------------------------------------------------------------------------------+
| t2 | CREATE TABLE `t2` (
`id` int DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8 |
+-------+----------------------------------------------------------------------------------+
1 row in set sec)
八. 部署 Canal
Canal 主要用途是基于 MySQL 數(shù)據(jù)庫增量日志解析,提供增量數(shù)據(jù)訂閱和消費(fèi)。
早期阿里巴巴因?yàn)楹贾莺兔绹p機(jī)房部署,存在跨機(jī)房同步的業(yè)務(wù)需求,實(shí)現(xiàn)方式主要是基于業(yè)務(wù) trigger 獲取增量變更。
從 2010 年開始,業(yè)務(wù)逐步嘗試數(shù)據(jù)庫日志解析獲取增量變更進(jìn)行同步,由此衍生出了大量的數(shù)據(jù)庫增量訂閱和消費(fèi)業(yè)務(wù)。
基于日志增量訂閱和消費(fèi)的業(yè)務(wù)包括:
- 數(shù)據(jù)庫鏡像。
- 數(shù)據(jù)庫實(shí)時(shí)備份。
- 索引構(gòu)建和實(shí)時(shí)維護(hù)(拆分異構(gòu)索引、倒排索引等)。
- 業(yè)務(wù) cache 刷新。
- 帶業(yè)務(wù)邏輯的增量數(shù)據(jù)處理。
當(dāng)前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x。
8.1 解壓 Canal 包
[root@r26 soft]# mkdir /opt/canal && tar vxzf canal.deployer-1.1.4. -C /opt/canal
8.2 部署 Canal 的 jre
[root@r26 soft]# tar vxzf jre1.8.0_281.
[root@r26 soft]# mv jre1.8.0_281 /opt/canal/jre
## configue jre, add bellowing line in the head of /opt/canal/bin
JAVA=/opt/canal/jre/bin/java
8.3 修改 Canal 的配置文件
修改 /opt/canal/con 配置文件
## modify bellowing configuration
canal.zkServers =192.168.12.24:2181
canal.serverMode = kafka
canal.destinations = example ## 需要在 /opt/canal/conf 目錄下創(chuàng)建一個(gè) example 文件夾,用于存放 destination 的配置
canal.mq.servers = 192.168.12.22:9092
修改 /opt/canal/conf/example 配置文件
## modify bellowing configuration
canal.instance.master.address=192.168.12.25:3306
canal.in
canal.in
canal.in.*\\..* ## 過濾數(shù)據(jù)庫的表
canal.mq.topic=canal-kafka
九. 配置數(shù)據(jù)流向
9.1 MySQL Binlog -> Canal -> Kafka 通路
9.1.1 查看 MySQL Binlog 信息
查看 MySQL Binlog 信息,確保 Binlog 是正常的。
mysql> show master status;
+---------------+----------+--------------+------------------+-------------------+
| File | Position | Binlog_Do_DB | Binlog_Ignore_DB | Executed_Gtid_Set |
+---------------+----------+--------------+------------------+-------------------+
| binlog.000001 | 2888 | | | |
+---------------+----------+--------------+------------------+-------------------+
1 row in set sec)
9.1.2 在 Kafka 中創(chuàng)建一個(gè) Topic
在 Kafka 中創(chuàng)建一個(gè) Topic canal-kafka,這個(gè)Topic 的名字要與 Canal 配置文件 /opt/canal/conf/example 中的 canal.mq.topic=canal-kafka 對(duì)應(yīng):
[root@r22 kafka]# /opt/kafka/bin --create \
> --zookeeper 192.168.12.24:2181 \
> --config max.me \
> --config \
> --replication-factor 1 \
> --partitions 1 \
> --topic canal-kafka
Created topic canal-kafka.
[2021-02-24 01:51:55,050] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions Set(canal-kafka-0) )
[2021-02-24 01:51:55,052] INFO [Log partition=canal-kafka-0, dir=/opt/kafka/logs] Loading producer state till offset 0 with message format version 2 )
[2021-02-24 01:51:55,053] INFO Created log for partition canal-kafka-0 in /opt/kafka/logs/canal-kafka-0 with properties {com -> producer, me -> true, min.in -> 1, -> 0, cleanup.policy -> [delete], -> 9223372036854775807, -> 1073741824, re -> 604800000, -> 1, me -> 2.7-IV2, -> 60000, max.com -> 9223372036854775807, max.me -> 12800000, min.com -> 0, me -> CreateTime, preallocate -> false, min.cleanable.dir -> 0.5, index.in -> 4096, unclean.leader.elec -> false, re -> -1, delete.re -> 86400000, -> 604800000, me -> 9223372036854775807, -> 10485760}. Manager)
[2021-02-24 01:51:55,053] INFO [Partition canal-kafka-0 broker=0] No checkpointed highwatermark is found for partition canal-kafka-0 )
[2021-02-24 01:51:55,053] INFO [Partition canal-kafka-0 broker=0] Log loaded for partition canal-kafka-0 with initial high watermark 0 )
查看 Kafka 中所有的 Topic:
[root@r22 kafka]# /opt/kafka/bin --list --zookeeper 192.168.12.24:2181
__consumer_offsets
canal-kafka
ticdc-test
查看 Kafka 中 Topic ticdc-test 的信息:
[root@r22 ~]# /opt/kafka/bin --describe --zookeeper 192.168.12.24:2181 --topic canal-kafka
Topic: ticdc-test PartitionCount: 1 ReplicationFactor: 1 Configs: max.me,
Topic: ticdc-test Partition: 0 Leader: 0 Replicas: 0 Isr: 0
9.1.3 啟動(dòng) Canal
在啟動(dòng) Canal 之前,需要在 Canal 節(jié)點(diǎn)上查看一下端口的情況:
## check MySQL 3306 port
## canal.instance.master.address=192.168.12.25:3306
[root@r26 bin]# telnet 192.168.12.25 3306
## check Kafka 9092 port
## canal.mq.servers = 192.168.12.22:9092
[root@r26 bin]# telnet 192.168.12.22 9092
## check zookeeper 2181 port
## canal.zkServers = 192.168.12.24:2181
[root@r26 bin]# telnet 192.168.12.24 2181
啟動(dòng) Canal:
[root@r26 bin]# /opt/canal/bin
cd to /opt/canal/bin for workaround relative path
LOG CONFIGURATION : /opt/canal/bin/../con
canal conf : /opt/canal/bin/../con
CLASSPATH :/opt/canal/bin/../conf:/opt/canal/bin/../lib:
cd to /opt/canal/bin for continue
9.1.4 查看 Canal 日志
查看 /opt/canal/logs/example
2021-02-24 01:41:40.293 [destination = example , address = , EventParser] WARN c.a.o.c.p.inbound.my - ---> begin to find start position, it will be long time for reset or first position
2021-02-24 01:41:40.293 [destination = example , address = , EventParser] WARN c.a.o.c.p.inbound.my - prepare to find start position just show master status
2021-02-24 01:41:40.542 [destination = example , address = , EventParser] WARN c.a.o.c.p.inbound.my - ---> find start position successfully, EntryPosition[included=false,journalName=binlog.000001,position=4,serverId=1,gtid=<null>,timestamp=1614134832000] cost : 244ms , the next step is binlog dump
9.1.5 查看 Kafka 中 consumer 信息
在 MySQL 中插入一條測(cè)試信息:
mysql> insert into t2 values(1);
Query OK, 1 row affected sec)
查看 consumer 的信息,已經(jīng)有了剛才插入的測(cè)試數(shù)據(jù):
/opt/kafka/bin --bootstrap-server 192.168.12.22:9092 --topic canal-kafka --from-beginning
{"data":null,"database":"test","es":1614151725000,"id":2,"isDdl":false,"mysqlType":null,"old":null,"pkNames":null,"sql":"create database test","sqlType":null,"table":"","ts":1614151725890,"type":"QUERY"}
{"data":null,"database":"test","es":1614151746000,"id":3,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"sql":"create table t2(id int)","sqlType":null,"table":"t2","ts":1614151746141,"type":"CREATE"}
{"data":[{"id":"1"}],"database":"test","es":1614151941000,"id":4,"isDdl":false,"mysqlType":{"id":"int"},"old":null,"pkNames":null,"sql":"","sqlType":{"id":4},"table":"t2","ts":1614151941235,"type":"INSERT"}
9.2 Kafka -> Flink 通路
在 Flink 中創(chuàng)建 t2 表,connector 類型為 kafka。
## create a test table t2 in Flink
Flink SQL> create table t2(id int)
> WITH (
> 'connector' = 'kafka',
> 'topic' = 'canal-kafka',
> '; = '192.168.12.22:9092',
> '; = 'canal-kafka-consumer-group',
> 'format' = 'canal-json',
> '; = 'latest-offset'
> );
Flink SQL> select * from t1;
在 MySQL 中在插入一條測(cè)試數(shù)據(jù):
mysql> insert into values(2);
Query OK, 1 row affected sec)
從 Flink 中可以實(shí)時(shí)同步數(shù)據(jù):
Flink SQL> select * from t1;
Refresh: 1 s Page: Last of 1 Updated: 02:49:27.366
id
2
9.3 Flink -> TiDB 通路
9.3.1 在 下游的 TiDB 中創(chuàng)建用于測(cè)試的表
[root@r20 soft]# mysql -uroot -P14000 -hr21
mysql> create table t3 (id int);
Query OK, 0 rows affected sec)
9.3.2 在 Flink 中創(chuàng)建測(cè)試表
Flink SQL> CREATE TABLE t3 (
> id int
> ) with (
> 'connector' = 'jdbc',
> 'url' = 'jdbc:mysql://192.168.12.21:14000/test',
> 'table-name' = 't3',
> 'username' = 'root',
> 'password' = 'mysql'
> );
Flink SQL> insert into t3 values(3);
[INFO] Submitting SQL update statement to the cluster...
[INFO] Table update statement has been successfully submitted to the cluster:
Job ID: a0827487030db177ee7e5c8575ef714e
9.3.3 在下游 TiDB 中查看插入的數(shù)據(jù)
mysql> select * from ;
+------+
| id |
+------+
| 3 |
+------+
1 row in set sec)
本文為阿里云原創(chuàng)內(nèi)容,未經(jīng)允許不得轉(zhuǎn)載。
1.《678改動(dòng)日志專題之Flink 最佳實(shí)踐之使用 Canal 同步 MySQL 數(shù)據(jù)至 TiDB》援引自互聯(lián)網(wǎng),旨在傳遞更多網(wǎng)絡(luò)信息知識(shí),僅代表作者本人觀點(diǎn),與本網(wǎng)站無關(guān),侵刪請(qǐng)聯(lián)系頁腳下方聯(lián)系方式。
2.《678改動(dòng)日志專題之Flink 最佳實(shí)踐之使用 Canal 同步 MySQL 數(shù)據(jù)至 TiDB》僅供讀者參考,本網(wǎng)站未對(duì)該內(nèi)容進(jìn)行證實(shí),對(duì)其原創(chuàng)性、真實(shí)性、完整性、及時(shí)性不作任何保證。
3.文章轉(zhuǎn)載時(shí)請(qǐng)保留本站內(nèi)容來源地址,http://f99ss.com/gl/2075401.html