Official documentation: Deploying Debezium on Kubernetes :: Debezium Documentation

Prerequisites

Install OLM

curl -sL https://github.com/operator-framework/operator-lifecycle-manager/releases/download/v0.20.0/install.sh | bash -s v0.20.0

Install the Operator

kubectl create -f https://operatorhub.io/install/strimzi-kafka-operator.yaml

Deploy the Kafka cluster

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: kafka
spec:
  kafka:
    replicas: 1
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: tls
        port: 9093
        type: internal
        tls: true
        authentication:
          type: tls
      - name: external
        port: 9094
        type: nodeport
        tls: false
    storage:
      type: jbod
      volumes:
      - id: 0
        type: persistent-claim
        size: 20Gi
        deleteClaim: false
    config:
      offsets.topic.replication.factor: 1
      transaction.state.log.replication.factor: 1
      transaction.state.log.min.isr: 1
      default.replication.factor: 1
      min.insync.replicas: 1
  zookeeper:
    replicas: 1
    storage:
      type: persistent-claim
      size: 20Gi
      deleteClaim: false
  entityOperator:
    topicOperator: {}
    userOperator: {}

Deploy the Kafka Connect cluster (build an image with the MySQL plugin)

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: kafka-connect
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  # Kafka version 3.7.0; the default, 3.1.0, is not compatible with MySQL 8.0.x
  version: 3.7.0
  replicas: 1
  bootstrapServers: kafka-kafka-bootstrap:9092
  config:
    config.providers: secrets
    config.providers.secrets.class: io.strimzi.kafka.KubernetesSecretConfigProvider
    group.id: connect-cluster
    offset.storage.topic: connect-cluster-offsets
    config.storage.topic: connect-cluster-configs
    status.storage.topic: connect-cluster-status
    # -1 means it will use the default replication factor configured in the broker
    config.storage.replication.factor: -1
    offset.storage.replication.factor: -1
    status.storage.replication.factor: -1
  build:
    output:
      type: docker
      # Address of an existing private image registry
      image: 10.20.13.140:5000/debezium-connect-mysql:latest
    plugins:
      - name: debezium-mysql-connector
        artifacts:
          - type: tgz
            url: https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/2.7.0.Final/debezium-connector-mysql-2.7.0.Final-plugin.tar.gz

If a usable image already exists, use the following instead.

Deploy the Kafka Connect cluster (using an existing image)

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: kafka-connect
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  version: 3.7.0
  image: 10.20.13.140:5000/debezium-connect-mysql:latest
  replicas: 1
  bootstrapServers: kafka-kafka-bootstrap:9092
  config:
    config.providers: secrets
    config.providers.secrets.class: io.strimzi.kafka.KubernetesSecretConfigProvider
    group.id: connect-cluster
    offset.storage.topic: connect-cluster-offsets
    config.storage.topic: connect-cluster-configs
    status.storage.topic: connect-cluster-status
    # -1 means it will use the default replication factor configured in the broker
    config.storage.replication.factor: -1
    offset.storage.replication.factor: -1
    status.storage.replication.factor: -1

Create the MySQL connector

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: debezium-connector-mysql
  labels:
    strimzi.io/cluster: kafka-connect
spec:
  class: io.debezium.connector.mysql.MySqlConnector
  tasksMax: 1
  config:
    tasks.max: 1
    database.hostname: mysql
    database.port: 3306
    database.user: root
    database.password: password
    # database.user: ${secrets:debezium-example/debezium-secret:username}
    # database.password: ${secrets:debezium-example/debezium-secret:password}
    database.server.id: 184054
    topic.prefix: mysql
    database.include.list: inventory
    schema.history.internal.kafka.bootstrap.servers: kafka-kafka-bootstrap:9092
    schema.history.internal.kafka.topic: schema-changes.inventory

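Notes

Synchronizing multiple databases and tables

Either edit mysql-connect.yaml, or run kubectl edit kafkaconnector -n kafka debezium-connector-mysql.

Editing the YAML file is recommended, because changes made with kubectl edit do not take effect immediately.

Example: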

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: debezium-connector-mysql
  labels:
    strimzi.io/cluster: kafka-connect # must match the name of the KafkaConnect resource
spec:
  class: io.debezium.connector.mysql.MySqlConnector
  tasksMax: 1
  config:
    tasks.max: 1
    database.hostname: mysql # MySQL Service address; if MySQL runs in another namespace (e.g. keycloak), use mysql.keycloak.svc
    database.port: 3306
    database.user: <mysql-username>
    database.password: <mysql-password>
    database.server.id: 184054
    topic.prefix: mysql
    database.include.list: testdb,testdb1 # databases to capture
    table.include.list: testdb.test_tables,testdb1.test_tables # tables to capture, as <database>.<table>
    schema.history.internal.kafka.bootstrap.servers: kafka-kafka-bootstrap:9092 # bootstrap Service of the Kafka cluster deployed above
    schema.history.internal.kafka.topic: schema-changes.inventory

Change the following two settings; separate databases, and likewise tables, with ASCII commas:

database.include.list: testdb,testdb1
table.include.list: testdb.test_tables,testdb1.test_tables
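
When many databases are involved, the two comma-separated values can be generated instead of typed by hand. The following is a minimal sketch, assuming every database contains a table with the same name (test_tables, as in the example above). The variable names are illustrative only.

```shell
# Databases to capture, separated by spaces (values from the example above)
DATABASES="testdb testdb1"
# Table name assumed to exist in every database
TABLE="test_tables"

DB_LIST=""
TABLE_LIST=""
for db in $DATABASES; do
  # ${VAR:+...} adds the comma only when the list is already non-empty
  DB_LIST="${DB_LIST:+$DB_LIST,}${db}"
  TABLE_LIST="${TABLE_LIST:+$TABLE_LIST,}${db}.${TABLE}"
done

echo "database.include.list: ${DB_LIST}"
echo "table.include.list: ${TABLE_LIST}"
```

The two printed lines can be pasted into the config section of the connector manifest.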

Re-apply after making changes

kubectl delete -f mysql-connect.yaml -n kafka
kubectl apply -f mysql-connect.yaml -n kafka

Check the Kafka Connect status (for troubleshooting)

kubectl get kafkaconnect -n kafka
# READY should be True
# Show details
kubectl describe kafkaconnect -n kafka

Check the MySQL connector status

kubectl get kafkaconnector -n kafka
# READY should be True
# Show details
kubectl describe kafkaconnector -n kafka
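
Instead of re-running kubectl get until the resource is ready, kubectl wait can block until the Ready condition is reported. A minimal sketch follows; wait_ready is a hypothetical helper name, and the default namespace (kafka) and timeout (300s) are assumptions based on the commands in this document.

```shell
# Block until a Strimzi custom resource reports the Ready condition.
# wait_ready is a hypothetical helper, not part of kubectl or Strimzi.
# Usage: wait_ready <kind> <name> [namespace] [timeout]
wait_ready() {
  kind="$1"
  name="$2"
  namespace="${3:-kafka}"   # assumed default namespace
  timeout="${4:-300s}"      # assumed default timeout
  kubectl wait "${kind}/${name}" -n "${namespace}" --for=condition=Ready --timeout="${timeout}"
}

# Example calls, using the resource names from this document:
# wait_ready kafkaconnect kafka-connect
# wait_ready kafkaconnector debezium-connector-mysql
```

The function returns a non-zero exit code if the resource does not become ready within the timeout, which makes it usable in deployment scripts.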

View table changes

# Start a pod named watcher that monitors tables being created, dropped or altered; only databases listed in database.include.list are covered
kubectl run -n kafka -it --rm --image=quay.io/debezium/tooling:1.2 --restart=Never watcher -- kcat -b kafka-kafka-bootstrap:9092 -C -o beginning -t mysql

# kubectl run -n kafka -it --rm --image=harbor.basepoint.net/library/tooling:1.2  --restart=Never watcher -- kcat -b kafka.kafka.svc.cluster.local:9092 -C -o beginning -t mysql

View data changes

# Start a pod named watcher2 that monitors inserts, updates and deletes on the user table
kubectl run -n kafka -it --rm --image=quay.io/debezium/tooling:1.2 --restart=Never watcher2 -- kcat -b kafka-kafka-bootstrap:9092 -C -o beginning -t mysql.testdb.user

# kubectl run -n kafka -it --rm --image=harbor.basepoint.net/library/tooling:1.2  --restart=Never watcher2 -- kcat -b kafka.kafka.svc.cluster.local:9092 -C -o beginning -t mysql.testdb.user
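
The topic names passed to kcat follow the Debezium MySQL connector's naming: the schema-change topic is named after topic.prefix alone, and each table's data-change topic is named <topic.prefix>.<database>.<table>. A small sketch that derives the topic name for another table; the values below are the ones used in this document and should be adjusted as needed.

```shell
# Values taken from the connector configuration in this document
TOPIC_PREFIX="mysql"
DATABASE="testdb"
TABLE="user"

# Data-change topic: <topic.prefix>.<database>.<table>
TOPIC="${TOPIC_PREFIX}.${DATABASE}.${TABLE}"
echo "$TOPIC"
```

The printed value is what goes after -t in the kcat command.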

If the command reports that watcher or watcher2 already exists, delete the pod with the commands below:

kubectl delete pod -n kafka watcher --force
kubectl delete pod -n kafka watcher2 --force