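Prerequisites
Private image registry (used to build the Connect image)
Harbor, original post: [DevOps] Harbor private image registry with HTTPS enabled through a cert-manager self-signed certificate (Helm installation) - Elijah Blog (sreok.cn)
docker registry:
Install OLM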
curl -sL https://github.com/operator-framework/operator-lifecycle-manager/releases/download/v0.20.0/install.sh | bash -s v0.20.0
Install the Operator
kubectl create -f https://operatorhub.io/install/strimzi-kafka-operator.yaml
Deploy the Kafka cluster
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: kafka
spec:
  kafka:
    replicas: 1
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: tls
        port: 9093
        type: internal
        tls: true
        authentication:
          type: tls
      - name: external
        port: 9094
        type: nodeport
        tls: false
    storage:
      type: jbod
      volumes:
        - id: 0
          type: persistent-claim
          size: 20Gi
          deleteClaim: false
    config:
      offsets.topic.replication.factor: 1
      transaction.state.log.replication.factor: 1
      transaction.state.log.min.isr: 1
      default.replication.factor: 1
      min.insync.replicas: 1
  zookeeper:
    replicas: 1
    storage:
      type: persistent-claim
      size: 20Gi
      deleteClaim: false
  entityOperator:
    topicOperator: {}
    userOperator: {}
Deploy the Kafka Connect cluster (build an image containing the MySQL plugin)
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: kafka-connect
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  # Kafka version 3.7.0; the default 3.1.0 is not compatible with MySQL 8.0.x
  version: 3.7.0
  replicas: 1
  bootstrapServers: kafka-kafka-bootstrap:9092
  config:
    config.providers: secrets
    config.providers.secrets.class: io.strimzi.kafka.KubernetesSecretConfigProvider
    group.id: connect-cluster
    offset.storage.topic: connect-cluster-offsets
    config.storage.topic: connect-cluster-configs
    status.storage.topic: connect-cluster-status
    # -1 means it will use the default replication factor configured in the broker
    config.storage.replication.factor: -1
    offset.storage.replication.factor: -1
    status.storage.replication.factor: -1
  build:
    output:
      type: docker
      # Address of an existing private image registry
      image: 10.20.13.140:5000/debezium-connect-mysql:latest
    plugins:
      - name: debezium-mysql-connector
        artifacts:
          - type: tgz
            url: https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/2.7.0.Final/debezium-connector-mysql-2.7.0.Final-plugin.tar.gz
If a usable image already exists, skip the build and reference the image directly:
Deploy the Kafka Connect cluster (use an existing image)
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: kafka-connect
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  version: 3.7.0
  image: 10.20.13.140:5000/debezium-connect-mysql:latest
  replicas: 1
  bootstrapServers: kafka-kafka-bootstrap:9092
  config:
    config.providers: secrets
    config.providers.secrets.class: io.strimzi.kafka.KubernetesSecretConfigProvider
    group.id: connect-cluster
    offset.storage.topic: connect-cluster-offsets
    config.storage.topic: connect-cluster-configs
    status.storage.topic: connect-cluster-status
    # -1 means it will use the default replication factor configured in the broker
    config.storage.replication.factor: -1
    offset.storage.replication.factor: -1
    status.storage.replication.factor: -1
Create the MySQL Connector
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: debezium-connector-mysql
  labels:
    strimzi.io/cluster: kafka-connect
spec:
  class: io.debezium.connector.mysql.MySqlConnector
  tasksMax: 1
  config:
    tasks.max: 1
    database.hostname: mysql
    database.port: 3306
    database.user: root
    database.password: password
    # database.user: ${secrets:debezium-example/debezium-secret:username}
    # database.password: ${secrets:debezium-example/debezium-secret:password}
    database.server.id: 184054
    topic.prefix: mysql
    database.include.list: inventory
    schema.history.internal.kafka.bootstrap.servers: kafka-kafka-bootstrap:9092
    schema.history.internal.kafka.topic: schema-changes.inventory
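The two commented-out `${secrets:...}` lines read the credentials from a Kubernetes Secret through the `KubernetesSecretConfigProvider` that the KafkaConnect resource already configures. A minimal sketch of the supporting objects follows: it writes a Secret, plus a Role and RoleBinding that allow the Connect pods to read it, into a local file. The namespace `debezium-example` and the Secret name `debezium-secret` are taken from the placeholders. Everything else is an assumption to check against your cluster: the file name, the Role and RoleBinding names, the placeholder credentials, the `kafka` namespace for Connect, and the service account name `kafka-connect-connect` (assumed to follow the `<KafkaConnect name>-connect` pattern).

```shell
#!/bin/sh
# Assumed values: adjust to your environment.
SECRET_NS="debezium-example"        # namespace used in the secrets placeholder
SECRET_NAME="debezium-secret"       # Secret name used in the secrets placeholder
CONNECT_NS="kafka"                  # assumed namespace where KafkaConnect runs
CONNECT_SA="kafka-connect-connect"  # assumed: <KafkaConnect name>-connect

# Write the manifests to a file so they can be reviewed before applying.
cat > debezium-secret.yaml <<EOF
apiVersion: v1
kind: Secret
metadata:
  name: ${SECRET_NAME}
  namespace: ${SECRET_NS}
type: Opaque
stringData:
  username: debezium      # placeholder credentials
  password: change-me
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: connector-configuration-role
  namespace: ${SECRET_NS}
rules:
  - apiGroups: [""]
    resources: ["secrets"]
    resourceNames: ["${SECRET_NAME}"]
    verbs: ["get"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: connector-configuration-role-binding
  namespace: ${SECRET_NS}
subjects:
  - kind: ServiceAccount
    name: ${CONNECT_SA}
    namespace: ${CONNECT_NS}
roleRef:
  kind: Role
  name: connector-configuration-role
  apiGroup: rbac.authorization.k8s.io
EOF

echo "wrote debezium-secret.yaml; apply it with: kubectl apply -f debezium-secret.yaml"
```

The target namespace has to exist before the file is applied. Once the Secret is readable, the plain-text `database.user` and `database.password` lines can be swapped for the commented-out ones.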
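Notes
Syncing multiple databases and tables
Edit mysql-connect.yaml,
or run kubectl edit kafkaconnector -n kafka debezium-connector-mysql
Editing the YAML file is recommended, because changes made with kubectl edit do not take effect immediately
Example content: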
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: debezium-connector-mysql
  labels:
    # Must match the name of the KafkaConnect resource
    strimzi.io/cluster: kafka-connect
spec:
  class: io.debezium.connector.mysql.MySqlConnector
  tasksMax: 1
  config:
    tasks.max: 1
    database.hostname: # MySQL Service address; if MySQL runs in another namespace (for example keycloak), use mysql.keycloak.svc
    database.port: 3306
    database.user: # MySQL username
    database.password: # MySQL password
    database.server.id: 184054
    topic.prefix: mysql
    # Edit the two lists below; separate databases, and likewise tables, with ASCII commas
    database.include.list: testdb,testdb1
    table.include.list: testdb.test_tables,testdb1.test_tables
    # Must point at the bootstrap Service of the Kafka cluster
    schema.history.internal.kafka.bootstrap.servers: kafka-kafka-bootstrap:9092
    schema.history.internal.kafka.topic: schema-changes.inventory
Re-apply after editing
kubectl delete -f mysql-connect.yaml -n kafka
kubectl apply -f mysql-connect.yaml -n kafka
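Debezium's MySQL connector writes the change events of each captured table to a topic named `<topic.prefix>.<database>.<table>`, so the include lists decide which topics appear. The sketch below derives the expected topic names from a `table.include.list` value, which helps when picking the `-t` argument for `kcat`. The function name `expected_topics` is hypothetical, and entries are treated as literal `database.table` names even though the connector interprets them as regular expressions.

```shell
#!/bin/sh
# expected_topics PREFIX TABLE_LIST
# Prints one topic name per entry of a comma-separated table.include.list,
# following the <topic.prefix>.<database>.<table> naming.
# The body runs in a subshell, so the IFS and globbing changes stay local.
expected_topics() (
  prefix=$1
  IFS=','   # split the list on commas
  set -f    # disable globbing so regex characters in entries stay literal
  for entry in $2; do
    # Strip whitespace that may surround an entry.
    entry=$(printf '%s' "$entry" | tr -d '[:space:]')
    if [ -n "$entry" ]; then
      printf '%s.%s\n' "$prefix" "$entry"
    fi
  done
)

# Values from the example connector above.
expected_topics mysql "testdb.test_tables,testdb1.test_tables"
```

For the example values this prints `mysql.testdb.test_tables` and `mysql.testdb1.test_tables`, one per line.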
Verify the Kafka Connect status (for troubleshooting)
kubectl get kafkaconnect -n kafka
# READY should be True
# Show details
kubectl describe kafkaconnect -n kafka
Verify the MySQL connector status
kubectl get kafkaconnector -n kafka
# READY should be True
# Show details
kubectl describe kafkaconnector -n kafka
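Rather than polling `kubectl get` by hand, `kubectl wait` can block until a resource reports the `Ready` condition. The helper below is a small sketch of that idea. The function name `wait_ready`, the default namespace `kafka`, and the 300s timeout are assumptions; the `KUBECTL` variable exists only so the command can be previewed without a cluster.

```shell
#!/bin/sh
# KUBECTL can be overridden (for example KUBECTL=echo) to preview the command.
KUBECTL=${KUBECTL:-kubectl}

# wait_ready KIND NAME [NAMESPACE] [TIMEOUT]
# Blocks until the resource reports condition Ready=True or the timeout expires.
# Defaults (assumed): namespace "kafka", timeout "300s".
wait_ready() {
  "$KUBECTL" wait "$1/$2" --for=condition=Ready \
    --timeout="${4:-300s}" -n "${3:-kafka}"
}
```

With the resource names used in this post, a typical call would be `wait_ready kafkaconnect kafka-connect && wait_ready kafkaconnector debezium-connector-mysql`. If the wait times out, `kubectl describe` on the same resource shows the failing condition.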
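Watch table changes
# Start a pod named watcher that follows schema changes (tables created, dropped, or altered) on the topic named after topic.prefix; it only covers the databases listed in database.include.list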
kubectl run -n kafka -it --rm --image=quay.io/debezium/tooling:1.2 --restart=Never watcher -- kcat -b kafka-kafka-bootstrap:9092 -C -o beginning -t mysql
# kubectl run -n kafka -it --rm --image=harbor.basepoint.net/library/tooling:1.2 --restart=Never watcher -- kcat -b kafka.kafka.svc.cluster.local:9092 -C -o beginning -t mysql
Watch data changes
# Start a pod named watcher2 that follows inserts, updates, and deletes on the user table
kubectl run -n kafka -it --rm --image=quay.io/debezium/tooling:1.2 --restart=Never watcher2 -- kcat -b kafka-kafka-bootstrap:9092 -C -o beginning -t mysql.testdb.user
# kubectl run -n kafka -it --rm --image=harbor.basepoint.net/library/tooling:1.2 --restart=Never watcher2 -- kcat -b kafka.kafka.svc.cluster.local:9092 -C -o beginning -t mysql.testdb.user
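Each record printed by watcher2 is a Debezium change event whose `op` field tells which kind of row change it describes. As a reading aid, the sketch below maps those one-letter codes to readable names. The function name `describe_op` is hypothetical, and it assumes the standard Debezium event envelope.

```shell
#!/bin/sh
# describe_op CODE
# Maps the "op" field of a Debezium change event to a readable name.
describe_op() {
  case $1 in
    c) echo "create (INSERT)" ;;
    u) echo "update (UPDATE)" ;;
    d) echo "delete (DELETE)" ;;
    r) echo "read (initial snapshot)" ;;
    t) echo "truncate (TRUNCATE)" ;;
    *) echo "unknown op: $1" ;;
  esac
}

# Print the mapping for the codes most often seen in the watcher output.
for op in r c u d; do
  printf '%s -> %s\n' "$op" "$(describe_op "$op")"
done
```

Rows that already existed when the connector first started show up as `r` events from the initial snapshot; later changes arrive as `c`, `u`, or `d`.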
If the command reports that watcher or watcher2 already exists, delete the pods with the commands below
kubectl delete pod -n kafka watcher --force
kubectl delete pod -n kafka watcher2 --force