Official documentation: Deploying Debezium on Kubernetes :: Debezium Documentation

Prerequisites

Install OLM (Operator Lifecycle Manager)

curl -sL https://github.com/operator-framework/operator-lifecycle-manager/releases/download/v0.20.0/install.sh | bash -s v0.20.0

Install the Strimzi Kafka Operator

kubectl create -f https://operatorhub.io/install/strimzi-kafka-operator.yaml
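The operatorhub.io manifest creates an OLM Subscription for the operator. Below is a sketch of a roughly equivalent Subscription. The values (`operators` namespace, `stable` channel, `operatorhubio-catalog` source in `olm`) are assumptions based on the operatorhub.io defaults and may differ in your catalog. Setting `installPlanApproval: Manual` stops OLM from upgrading the operator on its own, which can matter here because newer Strimzi releases drop support for older Kafka versions such as 3.7.0 and for ZooKeeper-based clusters like the one deployed next. With manual approval, OLM also waits for you to approve the first InstallPlan (its `spec.approved` field).

```yaml
# Sketch of an OLM Subscription for the Strimzi operator (values are assumptions)
apiVersion: operators.coreos.com/v1alpha1
kind: Subscription
metadata:
  name: my-strimzi-kafka-operator
  namespace: operators
spec:
  channel: stable
  name: strimzi-kafka-operator
  source: operatorhubio-catalog
  sourceNamespace: olm
  # Require manual approval of InstallPlans so the operator is not upgraded automatically
  installPlanApproval: Manual
  # Hypothetical: pin a release that still supports Kafka 3.7.0 and ZooKeeper
  # startingCSV: strimzi-cluster-operator.v<version>
```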

Deploy the Kafka cluster

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: kafka
spec:
  kafka:
    replicas: 1
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: tls
        port: 9093
        type: internal
        tls: true
        authentication:
          type: tls
      - name: external
        port: 9094
        type: nodeport
        tls: false
    storage:
      type: jbod
      volumes:
        - id: 0
          type: persistent-claim
          size: 20Gi
          deleteClaim: false
    config:
      offsets.topic.replication.factor: 1
      transaction.state.log.replication.factor: 1
      transaction.state.log.min.isr: 1
      default.replication.factor: 1
      min.insync.replicas: 1
  zookeeper:
    replicas: 1
    storage:
      type: persistent-claim
      size: 20Gi
      deleteClaim: false
  entityOperator:
    topicOperator: {}
    userOperator: {}
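By default Kubernetes assigns random node ports to the `external` listener, so clients outside the cluster have to look them up. A sketch of pinning them through Strimzi's listener `configuration`; the port numbers are hypothetical and must fall inside the cluster's NodePort range (30000-32767 by default).

```yaml
# Fragment of spec.kafka.listeners; port numbers are hypothetical
- name: external
  port: 9094
  type: nodeport
  tls: false
  configuration:
    bootstrap:
      nodePort: 32094    # fixed node port for the bootstrap Service
    brokers:
      - broker: 0        # broker ID; this cluster has a single broker (replicas: 1)
        nodePort: 32095  # fixed node port for broker 0
```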

Deploy the Kafka Connect cluster (building an image with the MySQL connector plugin)

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: kafka-connect
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  # Kafka version 3.7.0; the default, 3.1.0, is not compatible with MySQL 8.0.x
  version: 3.7.0
  replicas: 1
  bootstrapServers: kafka-kafka-bootstrap:9092
  config:
    config.providers: secrets
    config.providers.secrets.class: io.strimzi.kafka.KubernetesSecretConfigProvider
    group.id: connect-cluster
    offset.storage.topic: connect-cluster-offsets
    config.storage.topic: connect-cluster-configs
    status.storage.topic: connect-cluster-status
    # -1 means it will use the default replication factor configured in the broker
    config.storage.replication.factor: -1
    offset.storage.replication.factor: -1
    status.storage.replication.factor: -1
  build:
    output:
      type: docker
      # Address of an existing private image registry
      image: 10.20.13.140:5000/debezium-connect-mysql:latest
    plugins:
      - name: debezium-mysql-connector
        artifacts:
          - type: tgz
            url: https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/2.7.0.Final/debezium-connector-mysql-2.7.0.Final-plugin.tar.gz
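If the registry at `10.20.13.140:5000` requires credentials or serves only plain HTTP, the image build (Strimzi runs Kaniko for `output.type: docker`) may fail at the push step. A sketch of the `output` section using two Strimzi options for this case, assuming a Secret named `registry-credentials` (hypothetical) of type `kubernetes.io/dockerconfigjson` in the same namespace, created for example with `kubectl create secret docker-registry`:

```yaml
# Fragment of the KafkaConnect spec above; other fields unchanged
spec:
  build:
    output:
      type: docker
      image: 10.20.13.140:5000/debezium-connect-mysql:latest
      # Hypothetical Secret name with the registry credentials used for the push
      pushSecret: registry-credentials
      # Passed to Kaniko: allow pushing to a registry over plain HTTP
      additionalKanikoOptions: [--insecure]
```

Kaniko's `--insecure` only affects the push; the Kubernetes nodes that later pull the built image must also be configured to trust the registry.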

If a suitable image already exists, deploy Kafka Connect from it instead:

Deploy the Kafka Connect cluster (using an existing image)

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: kafka-connect
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  version: 3.7.0
  image: 10.20.13.140:5000/debezium-connect-mysql:latest
  replicas: 1
  bootstrapServers: kafka-kafka-bootstrap:9092
  config:
    config.providers: secrets
    config.providers.secrets.class: io.strimzi.kafka.KubernetesSecretConfigProvider
    group.id: connect-cluster
    offset.storage.topic: connect-cluster-offsets
    config.storage.topic: connect-cluster-configs
    status.storage.topic: connect-cluster-status
    # -1 means it will use the default replication factor configured in the broker
    config.storage.replication.factor: -1
    offset.storage.replication.factor: -1
    status.storage.replication.factor: -1
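If pulling `10.20.13.140:5000/debezium-connect-mysql:latest` requires credentials, Strimzi lets you attach image pull secrets to the Connect pods through the `template` section. A sketch, assuming a `kubernetes.io/dockerconfigjson` Secret named `registry-credentials` (hypothetical) in the same namespace:

```yaml
# Fragment of the KafkaConnect spec above; other fields unchanged
spec:
  image: 10.20.13.140:5000/debezium-connect-mysql:latest
  template:
    pod:
      # Hypothetical Secret name holding the registry credentials
      imagePullSecrets:
        - name: registry-credentials
```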

Create the MySQL connector

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: debezium-connector-mysql
  labels:
    strimzi.io/cluster: kafka-connect
spec:
  class: io.debezium.connector.mysql.MySqlConnector
  tasksMax: 1
  config:
    tasks.max: 1
    database.hostname: mysql
    database.port: 3306
    database.user: root
    database.password: password
    # Alternative: read the credentials from a Kubernetes Secret via the "secrets" config provider
    # database.user: ${secrets:debezium-example/debezium-secret:username}
    # database.password: ${secrets:debezium-example/debezium-secret:password}
    database.server.id: 184054
    topic.prefix: mysql
    database.include.list: inventory
    schema.history.internal.kafka.bootstrap.servers: kafka-kafka-bootstrap:9092
    schema.history.internal.kafka.topic: schema-changes.inventory
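The commented-out credential lines rely on Strimzi's `KubernetesSecretConfigProvider`, which the KafkaConnect resource enables through `config.providers`. For it to work, the Secret must exist and the Connect pods' service account must be allowed to read it. A sketch adapted to this guide, assuming everything lives in the `kafka` namespace (so the placeholders would become `${secrets:kafka/debezium-secret:username}` and `${secrets:kafka/debezium-secret:password}`). The service account name `kafka-connect-connect` assumes Strimzi's `<KafkaConnect name>-connect` naming; the Role and RoleBinding names are hypothetical.

```yaml
# Secret holding the MySQL credentials (the example values from the connector above)
apiVersion: v1
kind: Secret
metadata:
  name: debezium-secret
  namespace: kafka
type: Opaque
stringData:
  username: root
  password: password
---
# Allow reading only this one Secret
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: connector-configuration-role
  namespace: kafka
rules:
  - apiGroups: [""]
    resources: ["secrets"]
    resourceNames: ["debezium-secret"]
    verbs: ["get"]
---
# Bind the Role to the service account used by the Kafka Connect pods
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: connector-configuration-role-binding
  namespace: kafka
subjects:
  - kind: ServiceAccount
    name: kafka-connect-connect
    namespace: kafka
roleRef:
  kind: Role
  name: connector-configuration-role
  apiGroup: rbac.authorization.k8s.io
```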

Notes

Syncing multiple databases and tables

Edit mysql-connect.yaml, or run kubectl edit kafkaconnector -n kafka debezium-connector-mysql.

Editing the YAML file is recommended, because changes made with kubectl edit are not applied immediately.

Example:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: debezium-connector-mysql
  labels:
    # Must match the name of the KafkaConnect resource
    strimzi.io/cluster: kafka-connect
spec:
  class: io.debezium.connector.mysql.MySqlConnector
  tasksMax: 1
  config:
    tasks.max: 1
    database.hostname:  # MySQL Service address; if MySQL is in another namespace (e.g. keycloak), use mysql.keycloak.svc
    database.port: 3306
    database.user:      # MySQL username
    database.password:  # MySQL password
    database.server.id: 184054
    topic.prefix: mysql
    database.include.list: testdb,testdb1  # databases to capture
    table.include.list: testdb.test_tables,testdb1.test_tables
    schema.history.internal.kafka.bootstrap.servers: kafka-kafka-bootstrap:9092
    schema.history.internal.kafka.topic: schema-changes.inventory

# Change these properties; separate multiple databases or tables with ASCII commas
database.include.list: testdb,testdb1
table.include.list: testdb.test_tables,testdb1.test_tables
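Debezium treats each comma-separated entry in these properties as a regular expression that must match the whole identifier (for tables, `databaseName.tableName`). An unescaped `.` therefore matches any character, and one pattern can cover a group of tables. A sketch with hypothetical table names:

```yaml
# Fragment of spec.config; the order_ tables are hypothetical
database.include.list: testdb,testdb1
# "\." matches a literal dot; "order_.*" matches every table whose name starts with order_
table.include.list: testdb\.test_tables,testdb1\.order_.*
```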

Re-apply after making changes:

kubectl delete -f mysql-connect.yaml -n kafka
kubectl apply -f mysql-connect.yaml -n kafka

Check the Kafka Connect status (for troubleshooting)

kubectl get kafkaconnect -n kafka
# READY should be True
# Show details
kubectl describe kafkaconnect -n kafka

Check the MySQL connector status

kubectl get kafkaconnector -n kafka
# READY should be True
# Show details
kubectl describe kafkaconnector -n kafka

Watch schema changes

# Start a pod named watcher that shows tables being created, dropped, or altered; only databases listed in database.include.list are covered
kubectl run -n kafka -it --rm --image=quay.io/debezium/tooling:1.2 --restart=Never watcher -- kcat -b kafka-kafka-bootstrap:9092 -C -o beginning -t mysql

# kubectl run -n kafka -it --rm --image=harbor.basepoint.net/library/tooling:1.2 --restart=Never watcher -- kcat -b kafka.kafka.svc.cluster.local:9092 -C -o beginning -t mysql

Watch data changes

# Start a pod named watcher2 that shows rows inserted, updated, or deleted in the testdb.user table
kubectl run -n kafka -it --rm --image=quay.io/debezium/tooling:1.2 --restart=Never watcher2 -- kcat -b kafka-kafka-bootstrap:9092 -C -o beginning -t mysql.testdb.user

# kubectl run -n kafka -it --rm --image=harbor.basepoint.net/library/tooling:1.2 --restart=Never watcher2 -- kcat -b kafka.kafka.svc.cluster.local:9092 -C -o beginning -t mysql.testdb.user

If you get an error saying watcher or watcher2 already exists, delete the leftover pod:

kubectl delete pod -n kafka watcher --force
kubectl delete pod -n kafka watcher2 --force